Real-World Examples
Chat Application
Let's build a complete real-time chat application that demonstrates Hypermodern's multi-protocol capabilities, real-time streaming, and user management.
Schema Definition
{
"models": {
"user": {
"id": "int64",
"username": "string",
"email": "string",
"display_name": "string",
"avatar_url": "string?",
"status": "@user_status",
"last_seen": "datetime",
"created_at": "datetime"
},
"chat_room": {
"id": "int64",
"name": "string",
"description": "string?",
"type": "@room_type",
"owner_id": "int64",
"created_at": "datetime",
"member_count": "int32"
},
"message": {
"id": "int64",
"room_id": "int64",
"user_id": "int64",
"content": "string",
"message_type": "@message_type",
"reply_to_id": "int64?",
"attachments": ["@attachment"],
"created_at": "datetime",
"edited_at": "datetime?"
},
"attachment": {
"id": "int64",
"filename": "string",
"content_type": "string",
"size": "int64",
"url": "string"
},
"room_member": {
"room_id": "int64",
"user_id": "int64",
"role": "@member_role",
"joined_at": "datetime",
"last_read_message_id": "int64?"
}
},
"enums": {
"user_status": ["online", "away", "busy", "offline"],
"room_type": ["public", "private", "direct"],
"message_type": ["text", "image", "file", "system"],
"member_role": ["owner", "admin", "member"]
},
"endpoints": {
"create_room": {
"method": "POST",
"path": "/rooms",
"request": {
"name": "string",
"description": "string?",
"type": "@room_type"
},
"response": "@chat_room",
"transports": ["http", "websocket"]
},
"join_room": {
"method": "POST",
"path": "/rooms/{room_id}/join",
"request": {
"room_id": "int64"
},
"response": {
"success": "bool",
"room": "@chat_room"
},
"transports": ["http", "websocket"]
},
"send_message": {
"method": "POST",
"path": "/rooms/{room_id}/messages",
"request": {
"room_id": "int64",
"content": "string",
"message_type": "@message_type",
"reply_to_id": "int64?"
},
"response": "@message",
"transports": ["http", "websocket"]
},
"get_messages": {
"method": "GET",
"path": "/rooms/{room_id}/messages",
"request": {
"room_id": "int64",
"limit": "int32?",
"before_id": "int64?"
},
"response": {
"messages": ["@message"],
"has_more": "bool"
},
"transports": ["http", "websocket"]
},
"watch_room": {
"type": "stream",
"request": {
"room_id": "int64"
},
"response": "@room_event",
"transports": ["websocket", "tcp"]
}
}
}
Server Implementation
class ChatServer {
final HypermodernServer _server;
final ChatService _chatService;
final UserService _userService;
final RealTimeBroadcaster _broadcaster;
ChatServer({
required HypermodernServer server,
required ChatService chatService,
required UserService userService,
required RealTimeBroadcaster broadcaster,
}) : _server = server,
_chatService = chatService,
_userService = userService,
_broadcaster = broadcaster;
Future<void> initialize() async {
_registerEndpoints();
_registerStreamingEndpoints();
_setupMiddleware();
await _server.listen();
print('🚀 Chat server running on multiple protocols');
}
void _registerEndpoints() {
// Room management
_server.registerEndpoint<CreateRoomRequest, ChatRoom>(
'create_room',
(request) async {
final userId = _getCurrentUserId(request.context);
final room = await _chatService.createRoom(
name: request.name,
description: request.description,
type: request.type,
ownerId: userId,
);
// Broadcast room creation
_broadcaster.broadcast('global', RoomCreatedEvent(room: room));
return room;
},
);
_server.registerEndpoint<JoinRoomRequest, JoinRoomResponse>(
'join_room',
(request) async {
final userId = _getCurrentUserId(request.context);
final room = await _chatService.joinRoom(
roomId: request.roomId,
userId: userId,
);
// Broadcast user joined
_broadcaster.broadcast(
'room:${request.roomId}',
UserJoinedEvent(roomId: request.roomId, userId: userId),
);
return JoinRoomResponse(success: true, room: room);
},
);
// Message handling
_server.registerEndpoint<SendMessageRequest, Message>(
'send_message',
(request) async {
final userId = _getCurrentUserId(request.context);
// Validate user is member of room
await _chatService.validateRoomMembership(request.roomId, userId);
final message = await _chatService.sendMessage(
roomId: request.roomId,
userId: userId,
content: request.content,
messageType: request.messageType,
replyToId: request.replyToId,
);
// Broadcast message to room members
_broadcaster.broadcast(
'room:${request.roomId}',
MessageSentEvent(message: message),
);
// Update user activity
await _userService.updateLastSeen(userId);
return message;
},
);
_server.registerEndpoint<GetMessagesRequest, GetMessagesResponse>(
'get_messages',
(request) async {
final userId = _getCurrentUserId(request.context);
// Validate access to room
await _chatService.validateRoomAccess(request.roomId, userId);
final result = await _chatService.getMessages(
roomId: request.roomId,
limit: request.limit ?? 50,
beforeId: request.beforeId,
);
return GetMessagesResponse(
messages: result.messages,
hasMore: result.hasMore,
);
},
);
}
void _registerStreamingEndpoints() {
// Real-time room updates
_server.registerStreamingEndpoint<WatchRoomRequest, RoomEvent>(
'watch_room',
(request) async* {
final userId = _getCurrentUserId(request.context);
// Validate access to room
await _chatService.validateRoomAccess(request.roomId, userId);
// Subscribe to room events
final controller = StreamController<RoomEvent>();
final channel = 'room:${request.roomId}';
_broadcaster.subscribe(channel, controller);
// Send initial room state
controller.add(RoomStateEvent(
roomId: request.roomId,
onlineMembers: await _chatService.getOnlineMembers(request.roomId),
));
// Clean up on disconnect
controller.onCancel = () {
_broadcaster.unsubscribe(channel, controller);
};
yield* controller.stream;
},
);
// User presence updates
_server.registerStreamingEndpoint<WatchPresenceRequest, PresenceEvent>(
'watch_presence',
(request) async* {
final userId = _getCurrentUserId(request.context);
final controller = StreamController<PresenceEvent>();
_broadcaster.subscribe('presence', controller);
// Send current presence state
final onlineUsers = await _userService.getOnlineUsers();
controller.add(PresenceStateEvent(onlineUsers: onlineUsers));
// Update user's own presence
await _userService.setUserStatus(userId, UserStatus.online);
_broadcaster.broadcast('presence', UserStatusChangedEvent(
userId: userId,
status: UserStatus.online,
));
// Handle disconnect
controller.onCancel = () async {
await _userService.setUserStatus(userId, UserStatus.offline);
_broadcaster.broadcast('presence', UserStatusChangedEvent(
userId: userId,
status: UserStatus.offline,
));
_broadcaster.unsubscribe('presence', controller);
};
yield* controller.stream;
},
);
}
void _setupMiddleware() {
// Authentication required for all endpoints
_server.middleware.add(AuthenticationMiddleware(
jwtService: _server.container.get<JWTService>(),
publicEndpoints: {},
));
// Rate limiting for message sending
_server.middleware.add(ConditionalMiddleware(
RateLimitMiddleware(
rateLimiter: _server.container.get<RateLimiter>(),
endpointLimits: {
'send_message': RateLimit(requests: 30, window: Duration(minutes: 1)),
},
),
(request) => request.endpoint == 'send_message',
));
// Logging and metrics
_server.middleware.add(LoggingMiddleware());
_server.middleware.add(MetricsMiddleware(_server.container.get<PrometheusClient>()));
}
int _getCurrentUserId(Map<String, dynamic> context) {
final userId = context['user_id'] as int?;
if (userId == null) {
throw UnauthorizedException('User not authenticated');
}
return userId;
}
}
class ChatService {
final Database _db;
final FileStorageService _fileStorage;
ChatService(this._db, this._fileStorage);
Future<ChatRoom> createRoom({
required String name,
String? description,
required RoomType type,
required int ownerId,
}) async {
return await _db.transaction((tx) async {
// Create room
final roomResult = await tx.query('''
INSERT INTO chat_rooms (name, description, type, owner_id, created_at)
VALUES (?, ?, ?, ?, ?)
RETURNING *
''', [name, description, type.toString(), ownerId, DateTime.now()]);
final room = ChatRoom.fromJson(roomResult.first);
// Add owner as member
await tx.query('''
INSERT INTO room_members (room_id, user_id, role, joined_at)
VALUES (?, ?, ?, ?)
''', [room.id, ownerId, MemberRole.owner.toString(), DateTime.now()]);
return room;
});
}
Future<ChatRoom> joinRoom({
required int roomId,
required int userId,
}) async {
return await _db.transaction((tx) async {
// Check if room exists and is joinable
final roomResult = await tx.query(
'SELECT * FROM chat_rooms WHERE id = ?',
[roomId],
);
if (roomResult.isEmpty) {
throw NotFoundException('Room not found');
}
final room = ChatRoom.fromJson(roomResult.first);
if (room.type == RoomType.private) {
throw ForbiddenException('Cannot join private room without invitation');
}
// Check if already a member
final memberResult = await tx.query(
'SELECT 1 FROM room_members WHERE room_id = ? AND user_id = ?',
[roomId, userId],
);
if (memberResult.isNotEmpty) {
return room; // Already a member
}
// Add as member
await tx.query('''
INSERT INTO room_members (room_id, user_id, role, joined_at)
VALUES (?, ?, ?, ?)
''', [roomId, userId, MemberRole.member.toString(), DateTime.now()]);
// Update member count
await tx.query(
'UPDATE chat_rooms SET member_count = member_count + 1 WHERE id = ?',
[roomId],
);
return room;
});
}
Future<Message> sendMessage({
required int roomId,
required int userId,
required String content,
required MessageType messageType,
int? replyToId,
}) async {
// Validate content
if (content.trim().isEmpty) {
throw ValidationException('Message content cannot be empty');
}
if (content.length > 4000) {
throw ValidationException('Message too long');
}
// Process mentions and links
final processedContent = await _processMessageContent(content);
final result = await _db.query('''
INSERT INTO messages (room_id, user_id, content, message_type, reply_to_id, created_at)
VALUES (?, ?, ?, ?, ?, ?)
RETURNING *
''', [roomId, userId, processedContent, messageType.toString(), replyToId, DateTime.now()]);
return Message.fromJson(result.first);
}
Future<String> _processMessageContent(String content) async {
// Process @mentions
final mentionRegex = RegExp(r'@(\w+)');
var processedContent = content;
final mentions = mentionRegex.allMatches(content);
for (final mention in mentions) {
final username = mention.group(1)!;
final user = await _getUserByUsername(username);
if (user != null) {
processedContent = processedContent.replaceAll(
mention.group(0)!,
'<@${user.id}>',
);
}
}
return processedContent;
}
Future<User?> _getUserByUsername(String username) async {
final result = await _db.query(
'SELECT * FROM users WHERE username = ?',
[username],
);
return result.isEmpty ? null : User.fromJson(result.first);
}
}
Client Implementation
class ChatClient {
final HypermodernClient _client;
final StreamController<ChatEvent> _eventController = StreamController.broadcast();
StreamSubscription? _roomSubscription;
StreamSubscription? _presenceSubscription;
ChatClient(String serverUrl) : _client = HypermodernClient(serverUrl);
Stream<ChatEvent> get events => _eventController.stream;
Future<void> connect(String authToken) async {
_client.setAuthToken(authToken);
await _client.connect();
// Start presence monitoring
await _startPresenceMonitoring();
print('✅ Connected to chat server');
}
Future<void> disconnect() async {
await _roomSubscription?.cancel();
await _presenceSubscription?.cancel();
await _client.disconnect();
await _eventController.close();
}
// Room operations
Future<ChatRoom> createRoom({
required String name,
String? description,
required RoomType type,
}) async {
final request = CreateRoomRequest(
name: name,
description: description,
type: type,
);
return await _client.request<ChatRoom>('create_room', request);
}
Future<void> joinRoom(int roomId) async {
final request = JoinRoomRequest(roomId: roomId);
final response = await _client.request<JoinRoomResponse>('join_room', request);
if (response.success) {
await _watchRoom(roomId);
_eventController.add(RoomJoinedEvent(room: response.room));
}
}
Future<void> _watchRoom(int roomId) async {
await _roomSubscription?.cancel();
final request = WatchRoomRequest(roomId: roomId);
final stream = _client.stream<RoomEvent>('watch_room', request);
_roomSubscription = stream.listen(
(event) => _eventController.add(ChatEvent.fromRoomEvent(event)),
onError: (error) => _eventController.addError(error),
);
}
// Message operations
Future<Message> sendMessage({
required int roomId,
required String content,
MessageType type = MessageType.text,
int? replyToId,
}) async {
final request = SendMessageRequest(
roomId: roomId,
content: content,
messageType: type,
replyToId: replyToId,
);
return await _client.request<Message>('send_message', request);
}
Future<List<Message>> getMessages({
required int roomId,
int? limit,
int? beforeId,
}) async {
final request = GetMessagesRequest(
roomId: roomId,
limit: limit,
beforeId: beforeId,
);
final response = await _client.request<GetMessagesResponse>('get_messages', request);
return response.messages;
}
Future<void> _startPresenceMonitoring() async {
final request = WatchPresenceRequest();
final stream = _client.stream<PresenceEvent>('watch_presence', request);
_presenceSubscription = stream.listen(
(event) => _eventController.add(ChatEvent.fromPresenceEvent(event)),
onError: (error) => print('Presence error: $error'),
);
}
}
// Usage example
class ChatApp {
final ChatClient _client;
final Map<int, List<Message>> _roomMessages = {};
final Set<int> _onlineUsers = {};
ChatApp(String serverUrl) : _client = ChatClient(serverUrl);
Future<void> start() async {
// Connect to server
await _client.connect('your-auth-token');
// Listen for events
_client.events.listen(_handleChatEvent);
// Join a room
await _client.joinRoom(1);
// Load message history
final messages = await _client.getMessages(roomId: 1, limit: 50);
_roomMessages[1] = messages;
// Start message loop
_startMessageLoop();
}
void _handleChatEvent(ChatEvent event) {
switch (event.type) {
case ChatEventType.messageReceived:
final messageEvent = event as MessageReceivedEvent;
_roomMessages.putIfAbsent(messageEvent.message.roomId, () => [])
.add(messageEvent.message);
_displayMessage(messageEvent.message);
break;
case ChatEventType.userJoined:
final joinEvent = event as UserJoinedEvent;
print('${joinEvent.userId} joined the room');
break;
case ChatEventType.userLeft:
final leaveEvent = event as UserLeftEvent;
print('${leaveEvent.userId} left the room');
break;
case ChatEventType.userStatusChanged:
final statusEvent = event as UserStatusChangedEvent;
if (statusEvent.status == UserStatus.online) {
_onlineUsers.add(statusEvent.userId);
} else {
_onlineUsers.remove(statusEvent.userId);
}
break;
}
}
void _displayMessage(Message message) {
print('[${message.createdAt}] ${message.userId}: ${message.content}');
}
void _startMessageLoop() {
// Simple console input loop
stdin.transform(utf8.decoder).transform(LineSplitter()).listen((line) async {
if (line.isNotEmpty) {
try {
await _client.sendMessage(roomId: 1, content: line);
} catch (e) {
print('Failed to send message: $e');
}
}
});
}
}
API Gateway
An API gateway that translates between different protocols and provides unified access to multiple backend services.
Gateway Schema
{
"models": {
"gateway_route": {
"id": "int64",
"path": "string",
"method": "string",
"backend_service": "string",
"backend_path": "string",
"protocol": "@backend_protocol",
"auth_required": "bool",
"rate_limit": "@rate_limit_config?",
"cache_config": "@cache_config?",
"created_at": "datetime"
},
"rate_limit_config": {
"requests": "int32",
"window_seconds": "int32",
"burst": "int32?"
},
"cache_config": {
"ttl_seconds": "int32",
"vary_headers": ["string"]
},
"api_key": {
"id": "int64",
"key": "string",
"name": "string",
"permissions": ["string"],
"rate_limit": "@rate_limit_config?",
"expires_at": "datetime?",
"created_at": "datetime"
}
},
"enums": {
"backend_protocol": ["http", "grpc", "graphql", "websocket"]
},
"endpoints": {
"proxy_request": {
"method": "ANY",
"path": "/api/{path:.*}",
"request": {
"path": "string",
"method": "string",
"headers": "map<string, string>",
"query_params": "map<string, string>",
"body": "any?"
},
"response": {
"status_code": "int32",
"headers": "map<string, string>",
"body": "any?"
},
"transports": ["http", "websocket"]
}
}
}
Gateway Implementation
class APIGateway {
final HypermodernServer _server;
final RouteManager _routeManager;
final BackendClientPool _clientPool;
final CacheManager _cacheManager;
final RateLimiter _rateLimiter;
final MetricsCollector _metrics;
APIGateway({
required HypermodernServer server,
required RouteManager routeManager,
required BackendClientPool clientPool,
required CacheManager cacheManager,
required RateLimiter rateLimiter,
required MetricsCollector metrics,
}) : _server = server,
_routeManager = routeManager,
_clientPool = clientPool,
_cacheManager = cacheManager,
_rateLimiter = rateLimiter,
_metrics = metrics;
Future<void> initialize() async {
_setupMiddleware();
_registerProxyEndpoint();
await _server.listen();
print('🌐 API Gateway running');
}
void _setupMiddleware() {
// Request logging
_server.middleware.add(RequestLoggingMiddleware());
// CORS handling
_server.middleware.add(CORSMiddleware(
allowedOrigins: ['*'],
allowedMethods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS'],
allowedHeaders: ['Content-Type', 'Authorization', 'X-API-Key'],
));
// API key authentication
_server.middleware.add(APIKeyAuthMiddleware(_routeManager));
// Rate limiting
_server.middleware.add(GatewayRateLimitMiddleware(_rateLimiter, _routeManager));
// Caching
_server.middleware.add(GatewayCacheMiddleware(_cacheManager, _routeManager));
// Metrics collection
_server.middleware.add(GatewayMetricsMiddleware(_metrics));
}
void _registerProxyEndpoint() {
_server.registerEndpoint<ProxyRequest, ProxyResponse>(
'proxy_request',
(request) async => await _handleProxyRequest(request),
);
}
Future<ProxyResponse> _handleProxyRequest(ProxyRequest request) async {
final stopwatch = Stopwatch()..start();
try {
// Find matching route
final route = await _routeManager.findRoute(request.path, request.method);
if (route == null) {
return ProxyResponse(
statusCode: 404,
headers: {'Content-Type': 'application/json'},
body: {'error': 'Route not found'},
);
}
// Transform request for backend
final backendRequest = await _transformRequest(request, route);
// Get backend client
final client = await _clientPool.getClient(route.backendService, route.protocol);
// Make backend request
final backendResponse = await client.request(backendRequest);
// Transform response for client
final response = await _transformResponse(backendResponse, route);
stopwatch.stop();
// Record metrics
_metrics.recordRequest(
route: route.path,
method: request.method,
statusCode: response.statusCode,
duration: stopwatch.elapsed,
backendService: route.backendService,
);
return response;
} catch (e) {
stopwatch.stop();
_metrics.recordError(
route: request.path,
method: request.method,
error: e.toString(),
duration: stopwatch.elapsed,
);
return _handleError(e);
}
}
Future<BackendRequest> _transformRequest(ProxyRequest request, GatewayRoute route) async {
// Transform path
final transformedPath = _transformPath(request.path, route);
// Add gateway headers
final headers = Map<String, String>.from(request.headers);
headers['X-Gateway-Route'] = route.id.toString();
headers['X-Gateway-Timestamp'] = DateTime.now().toIso8601String();
// Protocol-specific transformations
switch (route.protocol) {
case BackendProtocol.http:
return HttpBackendRequest(
path: transformedPath,
method: request.method,
headers: headers,
queryParams: request.queryParams,
body: request.body,
);
case BackendProtocol.grpc:
return GrpcBackendRequest(
service: route.backendService,
method: _extractGrpcMethod(transformedPath),
metadata: headers,
request: request.body,
);
case BackendProtocol.graphql:
return GraphQLBackendRequest(
query: _transformToGraphQL(request),
variables: request.body is Map ? request.body as Map<String, dynamic> : {},
headers: headers,
);
default:
throw UnsupportedError('Protocol ${route.protocol} not supported');
}
}
String _transformPath(String originalPath, GatewayRoute route) {
// Remove gateway prefix and apply backend path template
final pathWithoutPrefix = originalPath.replaceFirst('/api', '');
return route.backendPath.replaceAll('{path}', pathWithoutPrefix);
}
Future<ProxyResponse> _transformResponse(BackendResponse backendResponse, GatewayRoute route) async {
final headers = Map<String, String>.from(backendResponse.headers);
// Add gateway headers
headers['X-Gateway-Service'] = route.backendService;
headers['X-Gateway-Protocol'] = route.protocol.toString();
// Protocol-specific response transformation
dynamic transformedBody;
switch (route.protocol) {
case BackendProtocol.grpc:
transformedBody = _transformGrpcResponse(backendResponse.body);
break;
case BackendProtocol.graphql:
transformedBody = _transformGraphQLResponse(backendResponse.body);
break;
default:
transformedBody = backendResponse.body;
}
return ProxyResponse(
statusCode: backendResponse.statusCode,
headers: headers,
body: transformedBody,
);
}
ProxyResponse _handleError(dynamic error) {
if (error is TimeoutException) {
return ProxyResponse(
statusCode: 504,
headers: {'Content-Type': 'application/json'},
body: {'error': 'Gateway timeout'},
);
} else if (error is ConnectionException) {
return ProxyResponse(
statusCode: 502,
headers: {'Content-Type': 'application/json'},
body: {'error': 'Bad gateway'},
);
} else {
return ProxyResponse(
statusCode: 500,
headers: {'Content-Type': 'application/json'},
body: {'error': 'Internal server error'},
);
}
}
}
class BackendClientPool {
final Map<String, BackendClient> _clients = {};
final ClientFactory _clientFactory;
BackendClientPool(this._clientFactory);
Future<BackendClient> getClient(String service, BackendProtocol protocol) async {
final key = '$service:$protocol';
return _clients.putIfAbsent(key, () {
return _clientFactory.createClient(service, protocol);
});
}
}
class ClientFactory {
final Map<String, ServiceConfig> _serviceConfigs;
ClientFactory(this._serviceConfigs);
BackendClient createClient(String service, BackendProtocol protocol) {
final config = _serviceConfigs[service];
if (config == null) {
throw ArgumentError('Service configuration not found: $service');
}
switch (protocol) {
case BackendProtocol.http:
return HttpBackendClient(config.httpUrl);
case BackendProtocol.grpc:
return GrpcBackendClient(config.grpcUrl);
case BackendProtocol.graphql:
return GraphQLBackendClient(config.graphqlUrl);
default:
throw UnsupportedError('Protocol $protocol not supported');
}
}
}
Microservices Architecture
A complete microservices setup with service discovery, inter-service communication, and distributed tracing.
Service Registry
class ServiceRegistry {
final Database _db;
final Map<String, ServiceInstance> _services = {};
final StreamController<ServiceEvent> _eventController = StreamController.broadcast();
ServiceRegistry(this._db);
Stream<ServiceEvent> get events => _eventController.stream;
Future<void> registerService(ServiceRegistration registration) async {
final instance = ServiceInstance(
id: registration.id,
name: registration.name,
version: registration.version,
endpoints: registration.endpoints,
healthCheckUrl: registration.healthCheckUrl,
metadata: registration.metadata,
registeredAt: DateTime.now(),
lastHeartbeat: DateTime.now(),
);
_services[registration.id] = instance;
// Persist to database
await _db.query('''
INSERT INTO service_instances (id, name, version, endpoints, health_check_url, metadata, registered_at, last_heartbeat)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (id) DO UPDATE SET
endpoints = EXCLUDED.endpoints,
health_check_url = EXCLUDED.health_check_url,
metadata = EXCLUDED.metadata,
last_heartbeat = EXCLUDED.last_heartbeat
''', [
instance.id,
instance.name,
instance.version,
jsonEncode(instance.endpoints.map((e) => e.toJson()).toList()),
instance.healthCheckUrl,
jsonEncode(instance.metadata),
instance.registeredAt,
instance.lastHeartbeat,
]);
_eventController.add(ServiceRegisteredEvent(instance: instance));
print('✅ Service registered: ${instance.name}@${instance.version}');
}
Future<void> deregisterService(String serviceId) async {
final instance = _services.remove(serviceId);
if (instance != null) {
await _db.query('DELETE FROM service_instances WHERE id = ?', [serviceId]);
_eventController.add(ServiceDeregisteredEvent(instance: instance));
print('❌ Service deregistered: ${instance.name}');
}
}
Future<void> heartbeat(String serviceId) async {
final instance = _services[serviceId];
if (instance != null) {
instance.lastHeartbeat = DateTime.now();
await _db.query(
'UPDATE service_instances SET last_heartbeat = ? WHERE id = ?',
[instance.lastHeartbeat, serviceId],
);
}
}
List<ServiceInstance> discoverServices(String serviceName) {
return _services.values
.where((instance) => instance.name == serviceName)
.where((instance) => _isHealthy(instance))
.toList();
}
bool _isHealthy(ServiceInstance instance) {
final timeSinceHeartbeat = DateTime.now().difference(instance.lastHeartbeat);
return timeSinceHeartbeat < Duration(minutes: 2);
}
Future<void> startHealthChecking() async {
Timer.periodic(Duration(seconds: 30), (_) async {
await _performHealthChecks();
});
}
Future<void> _performHealthChecks() async {
final unhealthyServices = <String>[];
for (final instance in _services.values) {
try {
final client = HttpClient();
final response = await client.get(instance.healthCheckUrl)
.timeout(Duration(seconds: 5));
if (response.statusCode != 200) {
unhealthyServices.add(instance.id);
}
} catch (e) {
unhealthyServices.add(instance.id);
}
}
// Remove unhealthy services
for (final serviceId in unhealthyServices) {
await deregisterService(serviceId);
}
}
}
Inter-Service Communication
class ServiceClient {
final ServiceRegistry _registry;
final LoadBalancer _loadBalancer;
final CircuitBreaker _circuitBreaker;
final Map<String, HypermodernClient> _clients = {};
ServiceClient({
required ServiceRegistry registry,
required LoadBalancer loadBalancer,
required CircuitBreaker circuitBreaker,
}) : _registry = registry,
_loadBalancer = loadBalancer,
_circuitBreaker = circuitBreaker;
Future<T> call<T>({
required String serviceName,
required String endpoint,
required dynamic request,
Duration? timeout,
}) async {
return await _circuitBreaker.execute(() async {
// Discover service instances
final instances = _registry.discoverServices(serviceName);
if (instances.isEmpty) {
throw ServiceUnavailableException('No healthy instances of $serviceName');
}
// Select instance using load balancer
final selectedInstance = _loadBalancer.selectInstance(instances);
// Get or create client
final client = await _getClient(selectedInstance);
// Make request with timeout
final future = client.request<T>(endpoint, request);
final timeoutDuration = timeout ?? Duration(seconds: 30);
return await future.timeout(timeoutDuration);
});
}
Future<HypermodernClient> _getClient(ServiceInstance instance) async {
final clientKey = instance.id;
if (!_clients.containsKey(clientKey)) {
// Find best endpoint for communication
final endpoint = _selectBestEndpoint(instance.endpoints);
final client = HypermodernClient(endpoint.url);
await client.connect();
_clients[clientKey] = client;
}
return _clients[clientKey]!;
}
ServiceEndpoint _selectBestEndpoint(List<ServiceEndpoint> endpoints) {
// Prefer TCP for performance, then WebSocket, then HTTP
final priorities = [
ProtocolType.tcp,
ProtocolType.websocket,
ProtocolType.http,
];
for (final protocol in priorities) {
final endpoint = endpoints.firstWhere(
(e) => e.protocol == protocol,
orElse: () => endpoints.first,
);
if (endpoint.protocol == protocol) {
return endpoint;
}
}
return endpoints.first;
}
}
// Example microservice
class UserService extends MicroService {
final UserRepository _userRepository;
final ServiceClient _serviceClient;
UserService({
required UserRepository userRepository,
required ServiceClient serviceClient,
}) : _userRepository = userRepository,
_serviceClient = serviceClient;
@override
String get serviceName => 'user-service';
@override
String get version => '1.0.0';
@override
List<ServiceEndpoint> get endpoints => [
ServiceEndpoint(
protocol: ProtocolType.http,
url: 'http://localhost:8080',
),
ServiceEndpoint(
protocol: ProtocolType.websocket,
url: 'ws://localhost:8082',
),
ServiceEndpoint(
protocol: ProtocolType.tcp,
url: 'tcp://localhost:8081',
),
];
@override
void registerEndpoints(HypermodernServer server) {
server.registerEndpoint<CreateUserRequest, User>(
'create_user',
(request) async {
// Create user
final user = await _userRepository.create(User(
username: request.username,
email: request.email,
passwordHash: _hashPassword(request.password),
createdAt: DateTime.now(),
updatedAt: DateTime.now(),
));
// Notify other services
await _serviceClient.call<void>(
serviceName: 'notification-service',
endpoint: 'send_welcome_email',
request: SendWelcomeEmailRequest(
userId: user.id,
email: user.email,
username: user.username,
),
);
await _serviceClient.call<void>(
serviceName: 'analytics-service',
endpoint: 'track_user_registration',
request: TrackUserRegistrationRequest(
userId: user.id,
timestamp: DateTime.now(),
),
);
return user;
},
);
server.registerEndpoint<GetUserRequest, User>(
'get_user',
(request) async {
final user = await _userRepository.findById(request.id);
if (user == null) {
throw NotFoundException('User not found');
}
return user;
},
);
}
}
IoT Device Communication
A platform for managing IoT devices with efficient binary communication over TCP.
IoT Schema
{
"models": {
"device": {
"id": "int64",
"device_id": "string",
"name": "string",
"type": "@device_type",
"firmware_version": "string",
"last_seen": "datetime",
"status": "@device_status",
"location": "@location?",
"metadata": "map<string, any>",
"created_at": "datetime"
},
"location": {
"latitude": "float64",
"longitude": "float64",
"altitude": "float64?"
},
"sensor_reading": {
"device_id": "string",
"sensor_type": "@sensor_type",
"value": "float64",
"unit": "string",
"timestamp": "datetime",
"quality": "float32?"
},
"device_command": {
"id": "int64",
"device_id": "string",
"command": "string",
"parameters": "map<string, any>",
"status": "@command_status",
"sent_at": "datetime",
"executed_at": "datetime?"
}
},
"enums": {
"device_type": ["sensor", "actuator", "gateway", "controller"],
"device_status": ["online", "offline", "maintenance", "error"],
"sensor_type": ["temperature", "humidity", "pressure", "motion", "light"],
"command_status": ["pending", "sent", "acknowledged", "executed", "failed"]
},
"endpoints": {
"register_device": {
"method": "POST",
"path": "/devices/register",
"request": {
"device_id": "string",
"name": "string",
"type": "@device_type",
"firmware_version": "string",
"capabilities": ["string"]
},
"response": "@device",
"transports": ["tcp", "http"]
},
"send_readings": {
"method": "POST",
"path": "/devices/{device_id}/readings",
"request": {
"device_id": "string",
"readings": ["@sensor_reading"]
},
"response": {
"accepted": "int32",
"rejected": "int32"
},
"transports": ["tcp"]
},
"get_commands": {
"method": "GET",
"path": "/devices/{device_id}/commands",
"request": {
"device_id": "string",
"since": "datetime?"
},
"response": {
"commands": ["@device_command"]
},
"transports": ["tcp", "http"]
}
}
}
IoT Platform Implementation
class IoTPlatform {
final HypermodernServer _server;
final DeviceManager _deviceManager;
final DataProcessor _dataProcessor;
final CommandQueue _commandQueue;
final TimeSeriesDB _timeSeriesDB;
IoTPlatform({
required HypermodernServer server,
required DeviceManager deviceManager,
required DataProcessor dataProcessor,
required CommandQueue commandQueue,
required TimeSeriesDB timeSeriesDB,
}) : _server = server,
_deviceManager = deviceManager,
_dataProcessor = dataProcessor,
_commandQueue = commandQueue,
_timeSeriesDB = timeSeriesDB;
Future<void> initialize() async {
_setupTcpOptimizations();
_registerEndpoints();
_startBackgroundTasks();
await _server.listen();
print('🌐 IoT Platform running');
}
void _setupTcpOptimizations() {
// Configure TCP server for IoT devices
_server.configureTcp(TcpServerConfig(
port: 8081,
maxConnections: 10000,
keepAlive: true,
keepAliveInterval: Duration(seconds: 30),
noDelay: true, // Disable Nagle's algorithm for low latency
receiveBufferSize: 64 * 1024,
sendBufferSize: 64 * 1024,
));
}
void _registerEndpoints() {
// Device registration
_server.registerEndpoint<RegisterDeviceRequest, Device>(
'register_device',
(request) async {
final device = await _deviceManager.registerDevice(
deviceId: request.deviceId,
name: request.name,
type: request.type,
firmwareVersion: request.firmwareVersion,
capabilities: request.capabilities,
);
print('📱 Device registered: ${device.deviceId}');
return device;
},
);
// Sensor data ingestion
_server.registerEndpoint<SendReadingsRequest, SendReadingsResponse>(
'send_readings',
(request) async {
final result = await _dataProcessor.processReadings(
deviceId: request.deviceId,
readings: request.readings,
);
// Update device last seen
await _deviceManager.updateLastSeen(request.deviceId);
return SendReadingsResponse(
accepted: result.accepted,
rejected: result.rejected,
);
},
);
// Command retrieval
_server.registerEndpoint<GetCommandsRequest, GetCommandsResponse>(
'get_commands',
(request) async {
final commands = await _commandQueue.getCommandsForDevice(
deviceId: request.deviceId,
since: request.since,
);
return GetCommandsResponse(commands: commands);
},
);
}
void _startBackgroundTasks() {
// Device health monitoring
Timer.periodic(Duration(minutes: 1), (_) async {
await _deviceManager.checkDeviceHealth();
});
// Data aggregation
Timer.periodic(Duration(minutes: 5), (_) async {
await _dataProcessor.aggregateData();
});
// Command cleanup
Timer.periodic(Duration(hours: 1), (_) async {
await _commandQueue.cleanupExpiredCommands();
});
}
}
class DataProcessor {
final TimeSeriesDB _timeSeriesDB;
final AlertManager _alertManager;
final Map<String, SensorCalibration> _calibrations = {};
DataProcessor(this._timeSeriesDB, this._alertManager);
Future<ProcessingResult> processReadings({
required String deviceId,
required List<SensorReading> readings,
}) async {
int accepted = 0;
int rejected = 0;
final processedReadings = <SensorReading>[];
for (final reading in readings) {
try {
// Validate reading
if (!_validateReading(reading)) {
rejected++;
continue;
}
// Apply calibration
final calibratedReading = await _applyCalibration(reading);
// Check for anomalies
final anomaly = await _detectAnomaly(calibratedReading);
if (anomaly != null) {
await _alertManager.sendAlert(anomaly);
}
processedReadings.add(calibratedReading);
accepted++;
} catch (e) {
print('Error processing reading: $e');
rejected++;
}
}
// Batch insert to time series database
if (processedReadings.isNotEmpty) {
await _timeSeriesDB.insertReadings(processedReadings);
}
return ProcessingResult(accepted: accepted, rejected: rejected);
}
bool _validateReading(SensorReading reading) {
// Basic validation
if (reading.deviceId.isEmpty) return false;
if (reading.timestamp.isAfter(DateTime.now().add(Duration(minutes: 5)))) return false;
if (reading.timestamp.isBefore(DateTime.now().subtract(Duration(days: 1)))) return false;
// Sensor-specific validation
switch (reading.sensorType) {
case SensorType.temperature:
return reading.value >= -100 && reading.value <= 200; // Celsius
case SensorType.humidity:
return reading.value >= 0 && reading.value <= 100; // Percentage
case SensorType.pressure:
return reading.value >= 0 && reading.value <= 2000; // hPa
default:
return true;
}
}
Future<SensorReading> _applyCalibration(SensorReading reading) async {
final calibration = _calibrations[reading.deviceId];
if (calibration == null) {
return reading;
}
final calibratedValue = reading.value * calibration.multiplier + calibration.offset;
return reading.copyWith(value: calibratedValue);
}
Future<Anomaly?> _detectAnomaly(SensorReading reading) async {
// Get recent readings for comparison
final recentReadings = await _timeSeriesDB.getRecentReadings(
deviceId: reading.deviceId,
sensorType: reading.sensorType,
duration: Duration(hours: 1),
);
if (recentReadings.length < 10) {
return null; // Not enough data for anomaly detection
}
// Calculate statistics
final values = recentReadings.map((r) => r.value).toList();
final mean = values.reduce((a, b) => a + b) / values.length;
final variance = values.map((v) => pow(v - mean, 2)).reduce((a, b) => a + b) / values.length;
final stdDev = sqrt(variance);
// Check if current reading is an outlier (3-sigma rule)
final zScore = (reading.value - mean) / stdDev;
if (zScore.abs() > 3) {
return Anomaly(
deviceId: reading.deviceId,
sensorType: reading.sensorType,
value: reading.value,
expectedRange: (mean - 2 * stdDev, mean + 2 * stdDev),
severity: zScore.abs() > 5 ? AnomalySeverity.critical : AnomalySeverity.warning,
timestamp: reading.timestamp,
);
}
return null;
}
}
// IoT Device Client
class IoTDeviceClient {
final String _deviceId;
final HypermodernClient _client;
final Queue<SensorReading> _readingBuffer = Queue();
Timer? _sendTimer;
IoTDeviceClient({
required String deviceId,
required String serverUrl,
}) : _deviceId = deviceId,
_client = HypermodernClient.tcp(serverUrl);
Future<void> connect() async {
await _client.connect();
// Register device
await _registerDevice();
// Start periodic data sending
_startDataSending();
// Start command polling
_startCommandPolling();
print('🔌 IoT device connected: $_deviceId');
}
Future<void> _registerDevice() async {
final request = RegisterDeviceRequest(
deviceId: _deviceId,
name: 'Temperature Sensor #$_deviceId',
type: DeviceType.sensor,
firmwareVersion: '1.2.3',
capabilities: ['temperature', 'humidity'],
);
await _client.request<Device>('register_device', request);
}
void addReading(SensorReading reading) {
_readingBuffer.add(reading);
// Limit buffer size
while (_readingBuffer.length > 1000) {
_readingBuffer.removeFirst();
}
}
void _startDataSending() {
_sendTimer = Timer.periodic(Duration(seconds: 30), (_) async {
if (_readingBuffer.isNotEmpty) {
await _sendBufferedReadings();
}
});
}
Future<void> _sendBufferedReadings() async {
final readings = <SensorReading>[];
// Drain buffer
while (_readingBuffer.isNotEmpty && readings.length < 100) {
readings.add(_readingBuffer.removeFirst());
}
if (readings.isEmpty) return;
try {
final request = SendReadingsRequest(
deviceId: _deviceId,
readings: readings,
);
final response = await _client.request<SendReadingsResponse>('send_readings', request);
print('📊 Sent ${response.accepted} readings, ${response.rejected} rejected');
} catch (e) {
print('❌ Failed to send readings: $e');
// Re-queue readings for retry
readings.reversed.forEach(_readingBuffer.addFirst);
}
}
void _startCommandPolling() {
Timer.periodic(Duration(seconds: 10), (_) async {
await _pollCommands();
});
}
Future<void> _pollCommands() async {
try {
final request = GetCommandsRequest(deviceId: _deviceId);
final response = await _client.request<GetCommandsResponse>('get_commands', request);
for (final command in response.commands) {
await _executeCommand(command);
}
} catch (e) {
print('❌ Failed to poll commands: $e');
}
}
Future<void> _executeCommand(DeviceCommand command) async {
print('⚡ Executing command: ${command.command}');
try {
switch (command.command) {
case 'reboot':
await _reboot();
break;
case 'update_firmware':
await _updateFirmware(command.parameters['version'] as String);
break;
case 'set_sampling_rate':
await _setSamplingRate(command.parameters['rate'] as int);
break;
default:
print('❓ Unknown command: ${command.command}');
}
} catch (e) {
print('❌ Command execution failed: $e');
}
}
Future<void> _reboot() async {
print('🔄 Rebooting device...');
// Simulate reboot
await Future.delayed(Duration(seconds: 2));
}
Future<void> _updateFirmware(String version) async {
print('📦 Updating firmware to $version...');
// Simulate firmware update
await Future.delayed(Duration(seconds: 10));
}
Future<void> _setSamplingRate(int rate) async {
print('⏱️ Setting sampling rate to ${rate}Hz');
// Update sampling configuration
}
}
What's Next
These real-world examples demonstrate how to build complete applications using Hypermodern's capabilities. In the next chapter, we'll explore the module ecosystem and integration patterns, showing how to extend Hypermodern with third-party services and create reusable components for the community.