Skip to main content

Real-World Examples

Chat Application

Let's build a complete real-time chat application that demonstrates Hypermodern's multi-protocol capabilities, real-time streaming, and user management.

Schema Definition

{
  "models": {
    "user": {
      "id": "int64",
      "username": "string",
      "email": "string",
      "display_name": "string",
      "avatar_url": "string?",
      "status": "@user_status",
      "last_seen": "datetime",
      "created_at": "datetime"
    },
    "chat_room": {
      "id": "int64",
      "name": "string",
      "description": "string?",
      "type": "@room_type",
      "owner_id": "int64",
      "created_at": "datetime",
      "member_count": "int32"
    },
    "message": {
      "id": "int64",
      "room_id": "int64",
      "user_id": "int64",
      "content": "string",
      "message_type": "@message_type",
      "reply_to_id": "int64?",
      "attachments": ["@attachment"],
      "created_at": "datetime",
      "edited_at": "datetime?"
    },
    "attachment": {
      "id": "int64",
      "filename": "string",
      "content_type": "string",
      "size": "int64",
      "url": "string"
    },
    "room_member": {
      "room_id": "int64",
      "user_id": "int64",
      "role": "@member_role",
      "joined_at": "datetime",
      "last_read_message_id": "int64?"
    }
  },
  "enums": {
    "user_status": ["online", "away", "busy", "offline"],
    "room_type": ["public", "private", "direct"],
    "message_type": ["text", "image", "file", "system"],
    "member_role": ["owner", "admin", "member"]
  },
  "endpoints": {
    "create_room": {
      "method": "POST",
      "path": "/rooms",
      "request": {
        "name": "string",
        "description": "string?",
        "type": "@room_type"
      },
      "response": "@chat_room",
      "transports": ["http", "websocket"]
    },
    "join_room": {
      "method": "POST",
      "path": "/rooms/{room_id}/join",
      "request": {
        "room_id": "int64"
      },
      "response": {
        "success": "bool",
        "room": "@chat_room"
      },
      "transports": ["http", "websocket"]
    },
    "send_message": {
      "method": "POST",
      "path": "/rooms/{room_id}/messages",
      "request": {
        "room_id": "int64",
        "content": "string",
        "message_type": "@message_type",
        "reply_to_id": "int64?"
      },
      "response": "@message",
      "transports": ["http", "websocket"]
    },
    "get_messages": {
      "method": "GET",
      "path": "/rooms/{room_id}/messages",
      "request": {
        "room_id": "int64",
        "limit": "int32?",
        "before_id": "int64?"
      },
      "response": {
        "messages": ["@message"],
        "has_more": "bool"
      },
      "transports": ["http", "websocket"]
    },
    "watch_room": {
      "type": "stream",
      "request": {
        "room_id": "int64"
      },
      "response": "@room_event",
      "transports": ["websocket", "tcp"]
    }
  }
}

Server Implementation

class ChatServer {
  final HypermodernServer _server;
  final ChatService _chatService;
  final UserService _userService;
  final RealTimeBroadcaster _broadcaster;
  
  ChatServer({
    required HypermodernServer server,
    required ChatService chatService,
    required UserService userService,
    required RealTimeBroadcaster broadcaster,
  }) : _server = server,
       _chatService = chatService,
       _userService = userService,
       _broadcaster = broadcaster;
  
  Future<void> initialize() async {
    _registerEndpoints();
    _registerStreamingEndpoints();
    _setupMiddleware();
    
    await _server.listen();
    print('🚀 Chat server running on multiple protocols');
  }
  
  void _registerEndpoints() {
    // Room management
    _server.registerEndpoint<CreateRoomRequest, ChatRoom>(
      'create_room',
      (request) async {
        final userId = _getCurrentUserId(request.context);
        final room = await _chatService.createRoom(
          name: request.name,
          description: request.description,
          type: request.type,
          ownerId: userId,
        );
        
        // Broadcast room creation
        _broadcaster.broadcast('global', RoomCreatedEvent(room: room));
        
        return room;
      },
    );
    
    _server.registerEndpoint<JoinRoomRequest, JoinRoomResponse>(
      'join_room',
      (request) async {
        final userId = _getCurrentUserId(request.context);
        
        final room = await _chatService.joinRoom(
          roomId: request.roomId,
          userId: userId,
        );
        
        // Broadcast user joined
        _broadcaster.broadcast(
          'room:${request.roomId}',
          UserJoinedEvent(roomId: request.roomId, userId: userId),
        );
        
        return JoinRoomResponse(success: true, room: room);
      },
    );
    
    // Message handling
    _server.registerEndpoint<SendMessageRequest, Message>(
      'send_message',
      (request) async {
        final userId = _getCurrentUserId(request.context);
        
        // Validate user is member of room
        await _chatService.validateRoomMembership(request.roomId, userId);
        
        final message = await _chatService.sendMessage(
          roomId: request.roomId,
          userId: userId,
          content: request.content,
          messageType: request.messageType,
          replyToId: request.replyToId,
        );
        
        // Broadcast message to room members
        _broadcaster.broadcast(
          'room:${request.roomId}',
          MessageSentEvent(message: message),
        );
        
        // Update user activity
        await _userService.updateLastSeen(userId);
        
        return message;
      },
    );
    
    _server.registerEndpoint<GetMessagesRequest, GetMessagesResponse>(
      'get_messages',
      (request) async {
        final userId = _getCurrentUserId(request.context);
        
        // Validate access to room
        await _chatService.validateRoomAccess(request.roomId, userId);
        
        final result = await _chatService.getMessages(
          roomId: request.roomId,
          limit: request.limit ?? 50,
          beforeId: request.beforeId,
        );
        
        return GetMessagesResponse(
          messages: result.messages,
          hasMore: result.hasMore,
        );
      },
    );
  }
  
  void _registerStreamingEndpoints() {
    // Real-time room updates
    _server.registerStreamingEndpoint<WatchRoomRequest, RoomEvent>(
      'watch_room',
      (request) async* {
        final userId = _getCurrentUserId(request.context);
        
        // Validate access to room
        await _chatService.validateRoomAccess(request.roomId, userId);
        
        // Subscribe to room events
        final controller = StreamController<RoomEvent>();
        final channel = 'room:${request.roomId}';
        
        _broadcaster.subscribe(channel, controller);
        
        // Send initial room state
        controller.add(RoomStateEvent(
          roomId: request.roomId,
          onlineMembers: await _chatService.getOnlineMembers(request.roomId),
        ));
        
        // Clean up on disconnect
        controller.onCancel = () {
          _broadcaster.unsubscribe(channel, controller);
        };
        
        yield* controller.stream;
      },
    );
    
    // User presence updates
    _server.registerStreamingEndpoint<WatchPresenceRequest, PresenceEvent>(
      'watch_presence',
      (request) async* {
        final userId = _getCurrentUserId(request.context);
        
        final controller = StreamController<PresenceEvent>();
        _broadcaster.subscribe('presence', controller);
        
        // Send current presence state
        final onlineUsers = await _userService.getOnlineUsers();
        controller.add(PresenceStateEvent(onlineUsers: onlineUsers));
        
        // Update user's own presence
        await _userService.setUserStatus(userId, UserStatus.online);
        _broadcaster.broadcast('presence', UserStatusChangedEvent(
          userId: userId,
          status: UserStatus.online,
        ));
        
        // Handle disconnect
        controller.onCancel = () async {
          await _userService.setUserStatus(userId, UserStatus.offline);
          _broadcaster.broadcast('presence', UserStatusChangedEvent(
            userId: userId,
            status: UserStatus.offline,
          ));
          _broadcaster.unsubscribe('presence', controller);
        };
        
        yield* controller.stream;
      },
    );
  }
  
  void _setupMiddleware() {
    // Authentication required for all endpoints
    _server.middleware.add(AuthenticationMiddleware(
      jwtService: _server.container.get<JWTService>(),
      publicEndpoints: {},
    ));
    
    // Rate limiting for message sending
    _server.middleware.add(ConditionalMiddleware(
      RateLimitMiddleware(
        rateLimiter: _server.container.get<RateLimiter>(),
        endpointLimits: {
          'send_message': RateLimit(requests: 30, window: Duration(minutes: 1)),
        },
      ),
      (request) => request.endpoint == 'send_message',
    ));
    
    // Logging and metrics
    _server.middleware.add(LoggingMiddleware());
    _server.middleware.add(MetricsMiddleware(_server.container.get<PrometheusClient>()));
  }
  
  int _getCurrentUserId(Map<String, dynamic> context) {
    final userId = context['user_id'] as int?;
    if (userId == null) {
      throw UnauthorizedException('User not authenticated');
    }
    return userId;
  }
}

class ChatService {
  final Database _db;
  final FileStorageService _fileStorage;
  
  ChatService(this._db, this._fileStorage);
  
  Future<ChatRoom> createRoom({
    required String name,
    String? description,
    required RoomType type,
    required int ownerId,
  }) async {
    return await _db.transaction((tx) async {
      // Create room
      final roomResult = await tx.query('''
        INSERT INTO chat_rooms (name, description, type, owner_id, created_at)
        VALUES (?, ?, ?, ?, ?)
        RETURNING *
      ''', [name, description, type.toString(), ownerId, DateTime.now()]);
      
      final room = ChatRoom.fromJson(roomResult.first);
      
      // Add owner as member
      await tx.query('''
        INSERT INTO room_members (room_id, user_id, role, joined_at)
        VALUES (?, ?, ?, ?)
      ''', [room.id, ownerId, MemberRole.owner.toString(), DateTime.now()]);
      
      return room;
    });
  }
  
  Future<ChatRoom> joinRoom({
    required int roomId,
    required int userId,
  }) async {
    return await _db.transaction((tx) async {
      // Check if room exists and is joinable
      final roomResult = await tx.query(
        'SELECT * FROM chat_rooms WHERE id = ?',
        [roomId],
      );
      
      if (roomResult.isEmpty) {
        throw NotFoundException('Room not found');
      }
      
      final room = ChatRoom.fromJson(roomResult.first);
      
      if (room.type == RoomType.private) {
        throw ForbiddenException('Cannot join private room without invitation');
      }
      
      // Check if already a member
      final memberResult = await tx.query(
        'SELECT 1 FROM room_members WHERE room_id = ? AND user_id = ?',
        [roomId, userId],
      );
      
      if (memberResult.isNotEmpty) {
        return room; // Already a member
      }
      
      // Add as member
      await tx.query('''
        INSERT INTO room_members (room_id, user_id, role, joined_at)
        VALUES (?, ?, ?, ?)
      ''', [roomId, userId, MemberRole.member.toString(), DateTime.now()]);
      
      // Update member count
      await tx.query(
        'UPDATE chat_rooms SET member_count = member_count + 1 WHERE id = ?',
        [roomId],
      );
      
      return room;
    });
  }
  
  Future<Message> sendMessage({
    required int roomId,
    required int userId,
    required String content,
    required MessageType messageType,
    int? replyToId,
  }) async {
    // Validate content
    if (content.trim().isEmpty) {
      throw ValidationException('Message content cannot be empty');
    }
    
    if (content.length > 4000) {
      throw ValidationException('Message too long');
    }
    
    // Process mentions and links
    final processedContent = await _processMessageContent(content);
    
    final result = await _db.query('''
      INSERT INTO messages (room_id, user_id, content, message_type, reply_to_id, created_at)
      VALUES (?, ?, ?, ?, ?, ?)
      RETURNING *
    ''', [roomId, userId, processedContent, messageType.toString(), replyToId, DateTime.now()]);
    
    return Message.fromJson(result.first);
  }
  
  Future<String> _processMessageContent(String content) async {
    // Process @mentions
    final mentionRegex = RegExp(r'@(\w+)');
    var processedContent = content;
    
    final mentions = mentionRegex.allMatches(content);
    for (final mention in mentions) {
      final username = mention.group(1)!;
      final user = await _getUserByUsername(username);
      
      if (user != null) {
        processedContent = processedContent.replaceAll(
          mention.group(0)!,
          '<@${user.id}>',
        );
      }
    }
    
    return processedContent;
  }
  
  Future<User?> _getUserByUsername(String username) async {
    final result = await _db.query(
      'SELECT * FROM users WHERE username = ?',
      [username],
    );
    
    return result.isEmpty ? null : User.fromJson(result.first);
  }
}

Client Implementation

class ChatClient {
  final HypermodernClient _client;
  final StreamController<ChatEvent> _eventController = StreamController.broadcast();
  
  StreamSubscription? _roomSubscription;
  StreamSubscription? _presenceSubscription;
  
  ChatClient(String serverUrl) : _client = HypermodernClient(serverUrl);
  
  Stream<ChatEvent> get events => _eventController.stream;
  
  Future<void> connect(String authToken) async {
    _client.setAuthToken(authToken);
    await _client.connect();
    
    // Start presence monitoring
    await _startPresenceMonitoring();
    
    print('✅ Connected to chat server');
  }
  
  Future<void> disconnect() async {
    await _roomSubscription?.cancel();
    await _presenceSubscription?.cancel();
    await _client.disconnect();
    await _eventController.close();
  }
  
  // Room operations
  Future<ChatRoom> createRoom({
    required String name,
    String? description,
    required RoomType type,
  }) async {
    final request = CreateRoomRequest(
      name: name,
      description: description,
      type: type,
    );
    
    return await _client.request<ChatRoom>('create_room', request);
  }
  
  Future<void> joinRoom(int roomId) async {
    final request = JoinRoomRequest(roomId: roomId);
    final response = await _client.request<JoinRoomResponse>('join_room', request);
    
    if (response.success) {
      await _watchRoom(roomId);
      _eventController.add(RoomJoinedEvent(room: response.room));
    }
  }
  
  Future<void> _watchRoom(int roomId) async {
    await _roomSubscription?.cancel();
    
    final request = WatchRoomRequest(roomId: roomId);
    final stream = _client.stream<RoomEvent>('watch_room', request);
    
    _roomSubscription = stream.listen(
      (event) => _eventController.add(ChatEvent.fromRoomEvent(event)),
      onError: (error) => _eventController.addError(error),
    );
  }
  
  // Message operations
  Future<Message> sendMessage({
    required int roomId,
    required String content,
    MessageType type = MessageType.text,
    int? replyToId,
  }) async {
    final request = SendMessageRequest(
      roomId: roomId,
      content: content,
      messageType: type,
      replyToId: replyToId,
    );
    
    return await _client.request<Message>('send_message', request);
  }
  
  Future<List<Message>> getMessages({
    required int roomId,
    int? limit,
    int? beforeId,
  }) async {
    final request = GetMessagesRequest(
      roomId: roomId,
      limit: limit,
      beforeId: beforeId,
    );
    
    final response = await _client.request<GetMessagesResponse>('get_messages', request);
    return response.messages;
  }
  
  Future<void> _startPresenceMonitoring() async {
    final request = WatchPresenceRequest();
    final stream = _client.stream<PresenceEvent>('watch_presence', request);
    
    _presenceSubscription = stream.listen(
      (event) => _eventController.add(ChatEvent.fromPresenceEvent(event)),
      onError: (error) => print('Presence error: $error'),
    );
  }
}

// Usage example
class ChatApp {
  final ChatClient _client;
  final Map<int, List<Message>> _roomMessages = {};
  final Set<int> _onlineUsers = {};
  
  ChatApp(String serverUrl) : _client = ChatClient(serverUrl);
  
  Future<void> start() async {
    // Connect to server
    await _client.connect('your-auth-token');
    
    // Listen for events
    _client.events.listen(_handleChatEvent);
    
    // Join a room
    await _client.joinRoom(1);
    
    // Load message history
    final messages = await _client.getMessages(roomId: 1, limit: 50);
    _roomMessages[1] = messages;
    
    // Start message loop
    _startMessageLoop();
  }
  
  void _handleChatEvent(ChatEvent event) {
    switch (event.type) {
      case ChatEventType.messageReceived:
        final messageEvent = event as MessageReceivedEvent;
        _roomMessages.putIfAbsent(messageEvent.message.roomId, () => [])
            .add(messageEvent.message);
        _displayMessage(messageEvent.message);
        break;
        
      case ChatEventType.userJoined:
        final joinEvent = event as UserJoinedEvent;
        print('${joinEvent.userId} joined the room');
        break;
        
      case ChatEventType.userLeft:
        final leaveEvent = event as UserLeftEvent;
        print('${leaveEvent.userId} left the room');
        break;
        
      case ChatEventType.userStatusChanged:
        final statusEvent = event as UserStatusChangedEvent;
        if (statusEvent.status == UserStatus.online) {
          _onlineUsers.add(statusEvent.userId);
        } else {
          _onlineUsers.remove(statusEvent.userId);
        }
        break;
    }
  }
  
  void _displayMessage(Message message) {
    print('[${message.createdAt}] ${message.userId}: ${message.content}');
  }
  
  void _startMessageLoop() {
    // Simple console input loop
    stdin.transform(utf8.decoder).transform(LineSplitter()).listen((line) async {
      if (line.isNotEmpty) {
        try {
          await _client.sendMessage(roomId: 1, content: line);
        } catch (e) {
          print('Failed to send message: $e');
        }
      }
    });
  }
}

API Gateway

An API gateway that translates between different protocols and provides unified access to multiple backend services.

Gateway Schema

{
  "models": {
    "gateway_route": {
      "id": "int64",
      "path": "string",
      "method": "string",
      "backend_service": "string",
      "backend_path": "string",
      "protocol": "@backend_protocol",
      "auth_required": "bool",
      "rate_limit": "@rate_limit_config?",
      "cache_config": "@cache_config?",
      "created_at": "datetime"
    },
    "rate_limit_config": {
      "requests": "int32",
      "window_seconds": "int32",
      "burst": "int32?"
    },
    "cache_config": {
      "ttl_seconds": "int32",
      "vary_headers": ["string"]
    },
    "api_key": {
      "id": "int64",
      "key": "string",
      "name": "string",
      "permissions": ["string"],
      "rate_limit": "@rate_limit_config?",
      "expires_at": "datetime?",
      "created_at": "datetime"
    }
  },
  "enums": {
    "backend_protocol": ["http", "grpc", "graphql", "websocket"]
  },
  "endpoints": {
    "proxy_request": {
      "method": "ANY",
      "path": "/api/{path:.*}",
      "request": {
        "path": "string",
        "method": "string",
        "headers": "map<string, string>",
        "query_params": "map<string, string>",
        "body": "any?"
      },
      "response": {
        "status_code": "int32",
        "headers": "map<string, string>",
        "body": "any?"
      },
      "transports": ["http", "websocket"]
    }
  }
}

Gateway Implementation

class APIGateway {
  final HypermodernServer _server;
  final RouteManager _routeManager;
  final BackendClientPool _clientPool;
  final CacheManager _cacheManager;
  final RateLimiter _rateLimiter;
  final MetricsCollector _metrics;
  
  APIGateway({
    required HypermodernServer server,
    required RouteManager routeManager,
    required BackendClientPool clientPool,
    required CacheManager cacheManager,
    required RateLimiter rateLimiter,
    required MetricsCollector metrics,
  }) : _server = server,
       _routeManager = routeManager,
       _clientPool = clientPool,
       _cacheManager = cacheManager,
       _rateLimiter = rateLimiter,
       _metrics = metrics;
  
  Future<void> initialize() async {
    _setupMiddleware();
    _registerProxyEndpoint();
    
    await _server.listen();
    print('🌐 API Gateway running');
  }
  
  void _setupMiddleware() {
    // Request logging
    _server.middleware.add(RequestLoggingMiddleware());
    
    // CORS handling
    _server.middleware.add(CORSMiddleware(
      allowedOrigins: ['*'],
      allowedMethods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS'],
      allowedHeaders: ['Content-Type', 'Authorization', 'X-API-Key'],
    ));
    
    // API key authentication
    _server.middleware.add(APIKeyAuthMiddleware(_routeManager));
    
    // Rate limiting
    _server.middleware.add(GatewayRateLimitMiddleware(_rateLimiter, _routeManager));
    
    // Caching
    _server.middleware.add(GatewayCacheMiddleware(_cacheManager, _routeManager));
    
    // Metrics collection
    _server.middleware.add(GatewayMetricsMiddleware(_metrics));
  }
  
  void _registerProxyEndpoint() {
    _server.registerEndpoint<ProxyRequest, ProxyResponse>(
      'proxy_request',
      (request) async => await _handleProxyRequest(request),
    );
  }
  
  Future<ProxyResponse> _handleProxyRequest(ProxyRequest request) async {
    final stopwatch = Stopwatch()..start();
    
    try {
      // Find matching route
      final route = await _routeManager.findRoute(request.path, request.method);
      if (route == null) {
        return ProxyResponse(
          statusCode: 404,
          headers: {'Content-Type': 'application/json'},
          body: {'error': 'Route not found'},
        );
      }
      
      // Transform request for backend
      final backendRequest = await _transformRequest(request, route);
      
      // Get backend client
      final client = await _clientPool.getClient(route.backendService, route.protocol);
      
      // Make backend request
      final backendResponse = await client.request(backendRequest);
      
      // Transform response for client
      final response = await _transformResponse(backendResponse, route);
      
      stopwatch.stop();
      
      // Record metrics
      _metrics.recordRequest(
        route: route.path,
        method: request.method,
        statusCode: response.statusCode,
        duration: stopwatch.elapsed,
        backendService: route.backendService,
      );
      
      return response;
      
    } catch (e) {
      stopwatch.stop();
      
      _metrics.recordError(
        route: request.path,
        method: request.method,
        error: e.toString(),
        duration: stopwatch.elapsed,
      );
      
      return _handleError(e);
    }
  }
  
  Future<BackendRequest> _transformRequest(ProxyRequest request, GatewayRoute route) async {
    // Transform path
    final transformedPath = _transformPath(request.path, route);
    
    // Add gateway headers
    final headers = Map<String, String>.from(request.headers);
    headers['X-Gateway-Route'] = route.id.toString();
    headers['X-Gateway-Timestamp'] = DateTime.now().toIso8601String();
    
    // Protocol-specific transformations
    switch (route.protocol) {
      case BackendProtocol.http:
        return HttpBackendRequest(
          path: transformedPath,
          method: request.method,
          headers: headers,
          queryParams: request.queryParams,
          body: request.body,
        );
        
      case BackendProtocol.grpc:
        return GrpcBackendRequest(
          service: route.backendService,
          method: _extractGrpcMethod(transformedPath),
          metadata: headers,
          request: request.body,
        );
        
      case BackendProtocol.graphql:
        return GraphQLBackendRequest(
          query: _transformToGraphQL(request),
          variables: request.body is Map ? request.body as Map<String, dynamic> : {},
          headers: headers,
        );
        
      default:
        throw UnsupportedError('Protocol ${route.protocol} not supported');
    }
  }
  
  String _transformPath(String originalPath, GatewayRoute route) {
    // Remove gateway prefix and apply backend path template
    final pathWithoutPrefix = originalPath.replaceFirst('/api', '');
    return route.backendPath.replaceAll('{path}', pathWithoutPrefix);
  }
  
  Future<ProxyResponse> _transformResponse(BackendResponse backendResponse, GatewayRoute route) async {
    final headers = Map<String, String>.from(backendResponse.headers);
    
    // Add gateway headers
    headers['X-Gateway-Service'] = route.backendService;
    headers['X-Gateway-Protocol'] = route.protocol.toString();
    
    // Protocol-specific response transformation
    dynamic transformedBody;
    
    switch (route.protocol) {
      case BackendProtocol.grpc:
        transformedBody = _transformGrpcResponse(backendResponse.body);
        break;
      case BackendProtocol.graphql:
        transformedBody = _transformGraphQLResponse(backendResponse.body);
        break;
      default:
        transformedBody = backendResponse.body;
    }
    
    return ProxyResponse(
      statusCode: backendResponse.statusCode,
      headers: headers,
      body: transformedBody,
    );
  }
  
  ProxyResponse _handleError(dynamic error) {
    if (error is TimeoutException) {
      return ProxyResponse(
        statusCode: 504,
        headers: {'Content-Type': 'application/json'},
        body: {'error': 'Gateway timeout'},
      );
    } else if (error is ConnectionException) {
      return ProxyResponse(
        statusCode: 502,
        headers: {'Content-Type': 'application/json'},
        body: {'error': 'Bad gateway'},
      );
    } else {
      return ProxyResponse(
        statusCode: 500,
        headers: {'Content-Type': 'application/json'},
        body: {'error': 'Internal server error'},
      );
    }
  }
}

class BackendClientPool {
  final Map<String, BackendClient> _clients = {};
  final ClientFactory _clientFactory;
  
  BackendClientPool(this._clientFactory);
  
  Future<BackendClient> getClient(String service, BackendProtocol protocol) async {
    final key = '$service:$protocol';
    
    return _clients.putIfAbsent(key, () {
      return _clientFactory.createClient(service, protocol);
    });
  }
}

class ClientFactory {
  final Map<String, ServiceConfig> _serviceConfigs;
  
  ClientFactory(this._serviceConfigs);
  
  BackendClient createClient(String service, BackendProtocol protocol) {
    final config = _serviceConfigs[service];
    if (config == null) {
      throw ArgumentError('Service configuration not found: $service');
    }
    
    switch (protocol) {
      case BackendProtocol.http:
        return HttpBackendClient(config.httpUrl);
      case BackendProtocol.grpc:
        return GrpcBackendClient(config.grpcUrl);
      case BackendProtocol.graphql:
        return GraphQLBackendClient(config.graphqlUrl);
      default:
        throw UnsupportedError('Protocol $protocol not supported');
    }
  }
}

Microservices Architecture

A complete microservices setup with service discovery, inter-service communication, and distributed tracing.

Service Registry

class ServiceRegistry {
  final Database _db;
  final Map<String, ServiceInstance> _services = {};
  final StreamController<ServiceEvent> _eventController = StreamController.broadcast();
  
  ServiceRegistry(this._db);
  
  Stream<ServiceEvent> get events => _eventController.stream;
  
  Future<void> registerService(ServiceRegistration registration) async {
    final instance = ServiceInstance(
      id: registration.id,
      name: registration.name,
      version: registration.version,
      endpoints: registration.endpoints,
      healthCheckUrl: registration.healthCheckUrl,
      metadata: registration.metadata,
      registeredAt: DateTime.now(),
      lastHeartbeat: DateTime.now(),
    );
    
    _services[registration.id] = instance;
    
    // Persist to database
    await _db.query('''
      INSERT INTO service_instances (id, name, version, endpoints, health_check_url, metadata, registered_at, last_heartbeat)
      VALUES (?, ?, ?, ?, ?, ?, ?, ?)
      ON CONFLICT (id) DO UPDATE SET
        endpoints = EXCLUDED.endpoints,
        health_check_url = EXCLUDED.health_check_url,
        metadata = EXCLUDED.metadata,
        last_heartbeat = EXCLUDED.last_heartbeat
    ''', [
      instance.id,
      instance.name,
      instance.version,
      jsonEncode(instance.endpoints.map((e) => e.toJson()).toList()),
      instance.healthCheckUrl,
      jsonEncode(instance.metadata),
      instance.registeredAt,
      instance.lastHeartbeat,
    ]);
    
    _eventController.add(ServiceRegisteredEvent(instance: instance));
    print('✅ Service registered: ${instance.name}@${instance.version}');
  }
  
  Future<void> deregisterService(String serviceId) async {
    final instance = _services.remove(serviceId);
    if (instance != null) {
      await _db.query('DELETE FROM service_instances WHERE id = ?', [serviceId]);
      _eventController.add(ServiceDeregisteredEvent(instance: instance));
      print('❌ Service deregistered: ${instance.name}');
    }
  }
  
  Future<void> heartbeat(String serviceId) async {
    final instance = _services[serviceId];
    if (instance != null) {
      instance.lastHeartbeat = DateTime.now();
      await _db.query(
        'UPDATE service_instances SET last_heartbeat = ? WHERE id = ?',
        [instance.lastHeartbeat, serviceId],
      );
    }
  }
  
  List<ServiceInstance> discoverServices(String serviceName) {
    return _services.values
        .where((instance) => instance.name == serviceName)
        .where((instance) => _isHealthy(instance))
        .toList();
  }
  
  bool _isHealthy(ServiceInstance instance) {
    final timeSinceHeartbeat = DateTime.now().difference(instance.lastHeartbeat);
    return timeSinceHeartbeat < Duration(minutes: 2);
  }
  
  Future<void> startHealthChecking() async {
    Timer.periodic(Duration(seconds: 30), (_) async {
      await _performHealthChecks();
    });
  }
  
  Future<void> _performHealthChecks() async {
    final unhealthyServices = <String>[];
    
    for (final instance in _services.values) {
      try {
        final client = HttpClient();
        final response = await client.get(instance.healthCheckUrl)
            .timeout(Duration(seconds: 5));
        
        if (response.statusCode != 200) {
          unhealthyServices.add(instance.id);
        }
      } catch (e) {
        unhealthyServices.add(instance.id);
      }
    }
    
    // Remove unhealthy services
    for (final serviceId in unhealthyServices) {
      await deregisterService(serviceId);
    }
  }
}

Inter-Service Communication

class ServiceClient {
  final ServiceRegistry _registry;
  final LoadBalancer _loadBalancer;
  final CircuitBreaker _circuitBreaker;
  final Map<String, HypermodernClient> _clients = {};
  
  ServiceClient({
    required ServiceRegistry registry,
    required LoadBalancer loadBalancer,
    required CircuitBreaker circuitBreaker,
  }) : _registry = registry,
       _loadBalancer = loadBalancer,
       _circuitBreaker = circuitBreaker;
  
  Future<T> call<T>({
    required String serviceName,
    required String endpoint,
    required dynamic request,
    Duration? timeout,
  }) async {
    return await _circuitBreaker.execute(() async {
      // Discover service instances
      final instances = _registry.discoverServices(serviceName);
      if (instances.isEmpty) {
        throw ServiceUnavailableException('No healthy instances of $serviceName');
      }
      
      // Select instance using load balancer
      final selectedInstance = _loadBalancer.selectInstance(instances);
      
      // Get or create client
      final client = await _getClient(selectedInstance);
      
      // Make request with timeout
      final future = client.request<T>(endpoint, request);
      final timeoutDuration = timeout ?? Duration(seconds: 30);
      
      return await future.timeout(timeoutDuration);
    });
  }
  
  Future<HypermodernClient> _getClient(ServiceInstance instance) async {
    final clientKey = instance.id;
    
    if (!_clients.containsKey(clientKey)) {
      // Find best endpoint for communication
      final endpoint = _selectBestEndpoint(instance.endpoints);
      final client = HypermodernClient(endpoint.url);
      
      await client.connect();
      _clients[clientKey] = client;
    }
    
    return _clients[clientKey]!;
  }
  
  ServiceEndpoint _selectBestEndpoint(List<ServiceEndpoint> endpoints) {
    // Prefer TCP for performance, then WebSocket, then HTTP
    final priorities = [
      ProtocolType.tcp,
      ProtocolType.websocket,
      ProtocolType.http,
    ];
    
    for (final protocol in priorities) {
      final endpoint = endpoints.firstWhere(
        (e) => e.protocol == protocol,
        orElse: () => endpoints.first,
      );
      if (endpoint.protocol == protocol) {
        return endpoint;
      }
    }
    
    return endpoints.first;
  }
}

// Example microservice
class UserService extends MicroService {
  final UserRepository _userRepository;
  final ServiceClient _serviceClient;
  
  UserService({
    required UserRepository userRepository,
    required ServiceClient serviceClient,
  }) : _userRepository = userRepository,
       _serviceClient = serviceClient;
  
  @override
  String get serviceName => 'user-service';
  
  @override
  String get version => '1.0.0';
  
  @override
  List<ServiceEndpoint> get endpoints => [
    ServiceEndpoint(
      protocol: ProtocolType.http,
      url: 'http://localhost:8080',
    ),
    ServiceEndpoint(
      protocol: ProtocolType.websocket,
      url: 'ws://localhost:8082',
    ),
    ServiceEndpoint(
      protocol: ProtocolType.tcp,
      url: 'tcp://localhost:8081',
    ),
  ];
  
  @override
  void registerEndpoints(HypermodernServer server) {
    server.registerEndpoint<CreateUserRequest, User>(
      'create_user',
      (request) async {
        // Create user
        final user = await _userRepository.create(User(
          username: request.username,
          email: request.email,
          passwordHash: _hashPassword(request.password),
          createdAt: DateTime.now(),
          updatedAt: DateTime.now(),
        ));
        
        // Notify other services
        await _serviceClient.call<void>(
          serviceName: 'notification-service',
          endpoint: 'send_welcome_email',
          request: SendWelcomeEmailRequest(
            userId: user.id,
            email: user.email,
            username: user.username,
          ),
        );
        
        await _serviceClient.call<void>(
          serviceName: 'analytics-service',
          endpoint: 'track_user_registration',
          request: TrackUserRegistrationRequest(
            userId: user.id,
            timestamp: DateTime.now(),
          ),
        );
        
        return user;
      },
    );
    
    server.registerEndpoint<GetUserRequest, User>(
      'get_user',
      (request) async {
        final user = await _userRepository.findById(request.id);
        if (user == null) {
          throw NotFoundException('User not found');
        }
        return user;
      },
    );
  }
}

IoT Device Communication

A platform for managing IoT devices with efficient binary communication over TCP.

IoT Schema

{
  "models": {
    "device": {
      "id": "int64",
      "device_id": "string",
      "name": "string",
      "type": "@device_type",
      "firmware_version": "string",
      "last_seen": "datetime",
      "status": "@device_status",
      "location": "@location?",
      "metadata": "map<string, any>",
      "created_at": "datetime"
    },
    "location": {
      "latitude": "float64",
      "longitude": "float64",
      "altitude": "float64?"
    },
    "sensor_reading": {
      "device_id": "string",
      "sensor_type": "@sensor_type",
      "value": "float64",
      "unit": "string",
      "timestamp": "datetime",
      "quality": "float32?"
    },
    "device_command": {
      "id": "int64",
      "device_id": "string",
      "command": "string",
      "parameters": "map<string, any>",
      "status": "@command_status",
      "sent_at": "datetime",
      "executed_at": "datetime?"
    }
  },
  "enums": {
    "device_type": ["sensor", "actuator", "gateway", "controller"],
    "device_status": ["online", "offline", "maintenance", "error"],
    "sensor_type": ["temperature", "humidity", "pressure", "motion", "light"],
    "command_status": ["pending", "sent", "acknowledged", "executed", "failed"]
  },
  "endpoints": {
    "register_device": {
      "method": "POST",
      "path": "/devices/register",
      "request": {
        "device_id": "string",
        "name": "string",
        "type": "@device_type",
        "firmware_version": "string",
        "capabilities": ["string"]
      },
      "response": "@device",
      "transports": ["tcp", "http"]
    },
    "send_readings": {
      "method": "POST",
      "path": "/devices/{device_id}/readings",
      "request": {
        "device_id": "string",
        "readings": ["@sensor_reading"]
      },
      "response": {
        "accepted": "int32",
        "rejected": "int32"
      },
      "transports": ["tcp"]
    },
    "get_commands": {
      "method": "GET",
      "path": "/devices/{device_id}/commands",
      "request": {
        "device_id": "string",
        "since": "datetime?"
      },
      "response": {
        "commands": ["@device_command"]
      },
      "transports": ["tcp", "http"]
    }
  }
}

IoT Platform Implementation

class IoTPlatform {
  final HypermodernServer _server;
  final DeviceManager _deviceManager;
  final DataProcessor _dataProcessor;
  final CommandQueue _commandQueue;
  final TimeSeriesDB _timeSeriesDB;
  
  IoTPlatform({
    required HypermodernServer server,
    required DeviceManager deviceManager,
    required DataProcessor dataProcessor,
    required CommandQueue commandQueue,
    required TimeSeriesDB timeSeriesDB,
  }) : _server = server,
       _deviceManager = deviceManager,
       _dataProcessor = dataProcessor,
       _commandQueue = commandQueue,
       _timeSeriesDB = timeSeriesDB;
  
  Future<void> initialize() async {
    _setupTcpOptimizations();
    _registerEndpoints();
    _startBackgroundTasks();
    
    await _server.listen();
    print('🌐 IoT Platform running');
  }
  
  void _setupTcpOptimizations() {
    // Configure TCP server for IoT devices
    _server.configureTcp(TcpServerConfig(
      port: 8081,
      maxConnections: 10000,
      keepAlive: true,
      keepAliveInterval: Duration(seconds: 30),
      noDelay: true, // Disable Nagle's algorithm for low latency
      receiveBufferSize: 64 * 1024,
      sendBufferSize: 64 * 1024,
    ));
  }
  
  void _registerEndpoints() {
    // Device registration
    _server.registerEndpoint<RegisterDeviceRequest, Device>(
      'register_device',
      (request) async {
        final device = await _deviceManager.registerDevice(
          deviceId: request.deviceId,
          name: request.name,
          type: request.type,
          firmwareVersion: request.firmwareVersion,
          capabilities: request.capabilities,
        );
        
        print('📱 Device registered: ${device.deviceId}');
        return device;
      },
    );
    
    // Sensor data ingestion
    _server.registerEndpoint<SendReadingsRequest, SendReadingsResponse>(
      'send_readings',
      (request) async {
        final result = await _dataProcessor.processReadings(
          deviceId: request.deviceId,
          readings: request.readings,
        );
        
        // Update device last seen
        await _deviceManager.updateLastSeen(request.deviceId);
        
        return SendReadingsResponse(
          accepted: result.accepted,
          rejected: result.rejected,
        );
      },
    );
    
    // Command retrieval
    _server.registerEndpoint<GetCommandsRequest, GetCommandsResponse>(
      'get_commands',
      (request) async {
        final commands = await _commandQueue.getCommandsForDevice(
          deviceId: request.deviceId,
          since: request.since,
        );
        
        return GetCommandsResponse(commands: commands);
      },
    );
  }
  
  void _startBackgroundTasks() {
    // Device health monitoring
    Timer.periodic(Duration(minutes: 1), (_) async {
      await _deviceManager.checkDeviceHealth();
    });
    
    // Data aggregation
    Timer.periodic(Duration(minutes: 5), (_) async {
      await _dataProcessor.aggregateData();
    });
    
    // Command cleanup
    Timer.periodic(Duration(hours: 1), (_) async {
      await _commandQueue.cleanupExpiredCommands();
    });
  }
}

class DataProcessor {
  final TimeSeriesDB _timeSeriesDB;
  final AlertManager _alertManager;
  final Map<String, SensorCalibration> _calibrations = {};
  
  DataProcessor(this._timeSeriesDB, this._alertManager);
  
  Future<ProcessingResult> processReadings({
    required String deviceId,
    required List<SensorReading> readings,
  }) async {
    int accepted = 0;
    int rejected = 0;
    
    final processedReadings = <SensorReading>[];
    
    for (final reading in readings) {
      try {
        // Validate reading
        if (!_validateReading(reading)) {
          rejected++;
          continue;
        }
        
        // Apply calibration
        final calibratedReading = await _applyCalibration(reading);
        
        // Check for anomalies
        final anomaly = await _detectAnomaly(calibratedReading);
        if (anomaly != null) {
          await _alertManager.sendAlert(anomaly);
        }
        
        processedReadings.add(calibratedReading);
        accepted++;
        
      } catch (e) {
        print('Error processing reading: $e');
        rejected++;
      }
    }
    
    // Batch insert to time series database
    if (processedReadings.isNotEmpty) {
      await _timeSeriesDB.insertReadings(processedReadings);
    }
    
    return ProcessingResult(accepted: accepted, rejected: rejected);
  }
  
  bool _validateReading(SensorReading reading) {
    // Basic validation
    if (reading.deviceId.isEmpty) return false;
    if (reading.timestamp.isAfter(DateTime.now().add(Duration(minutes: 5)))) return false;
    if (reading.timestamp.isBefore(DateTime.now().subtract(Duration(days: 1)))) return false;
    
    // Sensor-specific validation
    switch (reading.sensorType) {
      case SensorType.temperature:
        return reading.value >= -100 && reading.value <= 200; // Celsius
      case SensorType.humidity:
        return reading.value >= 0 && reading.value <= 100; // Percentage
      case SensorType.pressure:
        return reading.value >= 0 && reading.value <= 2000; // hPa
      default:
        return true;
    }
  }
  
  Future<SensorReading> _applyCalibration(SensorReading reading) async {
    final calibration = _calibrations[reading.deviceId];
    if (calibration == null) {
      return reading;
    }
    
    final calibratedValue = reading.value * calibration.multiplier + calibration.offset;
    
    return reading.copyWith(value: calibratedValue);
  }
  
  Future<Anomaly?> _detectAnomaly(SensorReading reading) async {
    // Get recent readings for comparison
    final recentReadings = await _timeSeriesDB.getRecentReadings(
      deviceId: reading.deviceId,
      sensorType: reading.sensorType,
      duration: Duration(hours: 1),
    );
    
    if (recentReadings.length < 10) {
      return null; // Not enough data for anomaly detection
    }
    
    // Calculate statistics
    final values = recentReadings.map((r) => r.value).toList();
    final mean = values.reduce((a, b) => a + b) / values.length;
    final variance = values.map((v) => pow(v - mean, 2)).reduce((a, b) => a + b) / values.length;
    final stdDev = sqrt(variance);
    
    // Check if current reading is an outlier (3-sigma rule)
    final zScore = (reading.value - mean) / stdDev;
    
    if (zScore.abs() > 3) {
      return Anomaly(
        deviceId: reading.deviceId,
        sensorType: reading.sensorType,
        value: reading.value,
        expectedRange: (mean - 2 * stdDev, mean + 2 * stdDev),
        severity: zScore.abs() > 5 ? AnomalySeverity.critical : AnomalySeverity.warning,
        timestamp: reading.timestamp,
      );
    }
    
    return null;
  }
}

// IoT Device Client
class IoTDeviceClient {
  final String _deviceId;
  final HypermodernClient _client;
  final Queue<SensorReading> _readingBuffer = Queue();
  Timer? _sendTimer;
  
  IoTDeviceClient({
    required String deviceId,
    required String serverUrl,
  }) : _deviceId = deviceId,
       _client = HypermodernClient.tcp(serverUrl);
  
  Future<void> connect() async {
    await _client.connect();
    
    // Register device
    await _registerDevice();
    
    // Start periodic data sending
    _startDataSending();
    
    // Start command polling
    _startCommandPolling();
    
    print('🔌 IoT device connected: $_deviceId');
  }
  
  Future<void> _registerDevice() async {
    final request = RegisterDeviceRequest(
      deviceId: _deviceId,
      name: 'Temperature Sensor #$_deviceId',
      type: DeviceType.sensor,
      firmwareVersion: '1.2.3',
      capabilities: ['temperature', 'humidity'],
    );
    
    await _client.request<Device>('register_device', request);
  }
  
  void addReading(SensorReading reading) {
    _readingBuffer.add(reading);
    
    // Limit buffer size
    while (_readingBuffer.length > 1000) {
      _readingBuffer.removeFirst();
    }
  }
  
  void _startDataSending() {
    _sendTimer = Timer.periodic(Duration(seconds: 30), (_) async {
      if (_readingBuffer.isNotEmpty) {
        await _sendBufferedReadings();
      }
    });
  }
  
  Future<void> _sendBufferedReadings() async {
    final readings = <SensorReading>[];
    
    // Drain buffer
    while (_readingBuffer.isNotEmpty && readings.length < 100) {
      readings.add(_readingBuffer.removeFirst());
    }
    
    if (readings.isEmpty) return;
    
    try {
      final request = SendReadingsRequest(
        deviceId: _deviceId,
        readings: readings,
      );
      
      final response = await _client.request<SendReadingsResponse>('send_readings', request);
      
      print('📊 Sent ${response.accepted} readings, ${response.rejected} rejected');
      
    } catch (e) {
      print('❌ Failed to send readings: $e');
      
      // Re-queue readings for retry
      readings.reversed.forEach(_readingBuffer.addFirst);
    }
  }
  
  void _startCommandPolling() {
    Timer.periodic(Duration(seconds: 10), (_) async {
      await _pollCommands();
    });
  }
  
  Future<void> _pollCommands() async {
    try {
      final request = GetCommandsRequest(deviceId: _deviceId);
      final response = await _client.request<GetCommandsResponse>('get_commands', request);
      
      for (final command in response.commands) {
        await _executeCommand(command);
      }
      
    } catch (e) {
      print('❌ Failed to poll commands: $e');
    }
  }
  
  Future<void> _executeCommand(DeviceCommand command) async {
    print('⚡ Executing command: ${command.command}');
    
    try {
      switch (command.command) {
        case 'reboot':
          await _reboot();
          break;
        case 'update_firmware':
          await _updateFirmware(command.parameters['version'] as String);
          break;
        case 'set_sampling_rate':
          await _setSamplingRate(command.parameters['rate'] as int);
          break;
        default:
          print('❓ Unknown command: ${command.command}');
      }
      
    } catch (e) {
      print('❌ Command execution failed: $e');
    }
  }
  
  Future<void> _reboot() async {
    print('🔄 Rebooting device...');
    // Simulate reboot
    await Future.delayed(Duration(seconds: 2));
  }
  
  Future<void> _updateFirmware(String version) async {
    print('📦 Updating firmware to $version...');
    // Simulate firmware update
    await Future.delayed(Duration(seconds: 10));
  }
  
  Future<void> _setSamplingRate(int rate) async {
    print('⏱️ Setting sampling rate to ${rate}Hz');
    // Update sampling configuration
  }
}

What's Next

These real-world examples demonstrate how to build complete applications using Hypermodern's capabilities. In the next chapter, we'll explore the module ecosystem and integration patterns, showing how to extend Hypermodern with third-party services and create reusable components for the community.