Real-World Examples
AI-Powered Document Search Application
This example demonstrates building a modern document search application using UUID v7 for time-ordered identifiers and vector embeddings for semantic search capabilities.
Application Overview
Our document search application will feature:
- Time-ordered document IDs using UUID v7
- Semantic search using vector embeddings
- Real-time document updates via WebSocket
- Multi-protocol API (HTTP, WebSocket, TCP)
- Similarity-based document recommendations
Schema Definition
{
"models": {
"document": {
"id": {
"type": "uuid_v7",
"primary_key": true,
"description": "Time-ordered UUID for better database performance"
},
"title": "string",
"content": "string",
"summary": "string?",
"author_id": "uuid_v7",
"category": "@document_category",
"tags": ["string"],
"content_embedding": {
"type": "vector",
"dimensions": 1536,
"description": "OpenAI text embedding for semantic search"
},
"title_embedding": {
"type": "vector",
"dimensions": 1536,
"description": "Separate embedding for title-based search"
},
"view_count": "int32",
"like_count": "int32",
"metadata": "map<string, any>",
"created_at": "datetime",
"updated_at": "datetime",
"indexed_at": "datetime?"
},
"user": {
"id": "uuid_v7",
"username": "string",
"email": "string",
"full_name": "string",
"preferences": {
"search_history": ["string"],
"favorite_categories": ["@document_category"],
"embedding_model": "string"
},
"created_at": "datetime"
},
"search_query": {
"id": "uuid_v7",
"user_id": "uuid_v7",
"query_text": "string",
"query_embedding": {
"type": "vector",
"dimensions": 1536
},
"filters": "map<string, any>",
"result_count": "int32",
"execution_time_ms": "int32",
"created_at": "datetime"
},
"document_interaction": {
"id": "uuid_v7",
"user_id": "uuid_v7",
"document_id": "uuid_v7",
"interaction_type": "@interaction_type",
"similarity_score": "float64?",
"created_at": "datetime"
}
},
"enums": {
"document_category": ["technology", "science", "business", "health", "education", "entertainment"],
"interaction_type": ["view", "like", "share", "download", "bookmark"]
},
"endpoints": {
"search_documents": {
"method": "POST",
"path": "/documents/search",
"request": {
"query": "string",
"filters": {
"categories": ["@document_category"],
"date_range": {
"start": "datetime?",
"end": "datetime?"
},
"author_id": "uuid_v7?"
},
"limit": "int32",
"similarity_threshold": "float64"
},
"response": {
"documents": [{
"document": "@document",
"similarity_score": "float64",
"relevance_explanation": "string"
}],
"total_count": "int32",
"execution_time_ms": "int32"
},
"transports": ["http", "websocket", "tcp"]
},
"get_similar_documents": {
"method": "GET",
"path": "/documents/{id}/similar",
"request": {
"id": "uuid_v7",
"limit": "int32"
},
"response": {
"similar_documents": [{
"document": "@document",
"similarity_score": "float64"
}]
},
"transports": ["http", "websocket", "tcp"]
},
"create_document": {
"method": "POST",
"path": "/documents",
"request": {
"title": "string",
"content": "string",
"category": "@document_category",
"tags": ["string"]
},
"response": "@document",
"transports": ["http", "websocket", "tcp"]
},
"watch_document_updates": {
"type": "stream",
"description": "Real-time document updates",
"request": {
"categories": ["@document_category"],
"user_id": "uuid_v7?"
},
"response": {
"event_type": "string",
"document": "@document",
"timestamp": "datetime"
},
"transports": ["websocket", "tcp"]
}
}
}
Database Migrations
// Migration 1: Enable required extensions
class EnableAdvancedExtensions extends Migration {
@override
Future<void> up() async {
await execute('CREATE EXTENSION IF NOT EXISTS "uuid-ossp";');
await execute('CREATE EXTENSION IF NOT EXISTS pgcrypto;');
await execute('CREATE EXTENSION IF NOT EXISTS vector;');
// Create UUID v7 generation function
await execute('''
CREATE OR REPLACE FUNCTION generate_uuid_v7()
RETURNS UUID AS \$\$
DECLARE
unix_ts_ms BIGINT;
uuid_bytes BYTEA;
BEGIN
unix_ts_ms := FLOOR(EXTRACT(EPOCH FROM NOW()) * 1000);
uuid_bytes :=
SUBSTRING(INT8SEND(unix_ts_ms), 3, 6) ||
GEN_RANDOM_BYTES(2) ||
(B'0111' || SUBSTRING(GEN_RANDOM_BYTES(1), 1, 1)::BIT(4))::BIT(8)::BYTEA ||
GEN_RANDOM_BYTES(7);
RETURN ENCODE(uuid_bytes, 'hex')::UUID;
END;
\$\$ LANGUAGE plpgsql;
''');
}
@override
Future<void> down() async {
await execute('DROP FUNCTION IF EXISTS generate_uuid_v7();');
await execute('DROP EXTENSION IF EXISTS vector CASCADE;');
}
}
// Migration 2: Create core tables
class CreateDocumentSearchTables extends Migration {
@override
Future<void> up() async {
// Users table
create('users', (Schema table) {
table.addColumn('id', 'UUID', nullable: false, defaultValue: 'generate_uuid_v7()');
table.primary('id');
table.addColumn('username', 'VARCHAR', length: 50, nullable: false, unique: true);
table.addColumn('email', 'VARCHAR', length: 255, nullable: false, unique: true);
table.addColumn('full_name', 'VARCHAR', length: 255, nullable: false);
table.addColumn('preferences', 'JSONB', defaultValue: '{}');
table.addColumn('created_at', 'TIMESTAMP WITH TIME ZONE', defaultValue: 'NOW()');
});
// Documents table with vector embeddings
create('documents', (Schema table) {
table.addColumn('id', 'UUID', nullable: false, defaultValue: 'generate_uuid_v7()');
table.primary('id');
table.addColumn('title', 'VARCHAR', length: 500, nullable: false);
table.addColumn('content', 'TEXT', nullable: false);
table.addColumn('summary', 'TEXT', nullable: true);
table.addColumn('author_id', 'UUID', nullable: false);
table.addColumn('category', 'VARCHAR', length: 50, nullable: false);
table.addColumn('tags', 'TEXT[]', defaultValue: '{}');
// Vector embeddings for semantic search
table.addColumn('content_embedding', 'VECTOR(1536)', nullable: true);
table.addColumn('title_embedding', 'VECTOR(1536)', nullable: true);
table.addColumn('view_count', 'INTEGER', defaultValue: 0);
table.addColumn('like_count', 'INTEGER', defaultValue: 0);
table.addColumn('metadata', 'JSONB', defaultValue: '{}');
table.addColumn('created_at', 'TIMESTAMP WITH TIME ZONE', defaultValue: 'NOW()');
table.addColumn('updated_at', 'TIMESTAMP WITH TIME ZONE', defaultValue: 'NOW()');
table.addColumn('indexed_at', 'TIMESTAMP WITH TIME ZONE', nullable: true);
// Foreign key to users
table.foreign('author_id', 'users', 'id', onDelete: 'CASCADE');
});
// Search queries table for analytics
create('search_queries', (Schema table) {
table.addColumn('id', 'UUID', nullable: false, defaultValue: 'generate_uuid_v7()');
table.primary('id');
table.addColumn('user_id', 'UUID', nullable: false);
table.addColumn('query_text', 'TEXT', nullable: false);
table.addColumn('query_embedding', 'VECTOR(1536)', nullable: true);
table.addColumn('filters', 'JSONB', defaultValue: '{}');
table.addColumn('result_count', 'INTEGER', nullable: false);
table.addColumn('execution_time_ms', 'INTEGER', nullable: false);
table.addColumn('created_at', 'TIMESTAMP WITH TIME ZONE', defaultValue: 'NOW()');
table.foreign('user_id', 'users', 'id', onDelete: 'CASCADE');
});
// Document interactions for recommendations
create('document_interactions', (Schema table) {
table.addColumn('id', 'UUID', nullable: false, defaultValue: 'generate_uuid_v7()');
table.primary('id');
table.addColumn('user_id', 'UUID', nullable: false);
table.addColumn('document_id', 'UUID', nullable: false);
table.addColumn('interaction_type', 'VARCHAR', length: 20, nullable: false);
table.addColumn('similarity_score', 'DOUBLE PRECISION', nullable: true);
table.addColumn('created_at', 'TIMESTAMP WITH TIME ZONE', defaultValue: 'NOW()');
table.foreign('user_id', 'users', 'id', onDelete: 'CASCADE');
table.foreign('document_id', 'documents', 'id', onDelete: 'CASCADE');
});
}
@override
Future<void> down() async {
await drop('document_interactions');
await drop('search_queries');
await drop('documents');
await drop('users');
}
}
// Migration 3: Create vector indexes for performance
class CreateVectorIndexes extends Migration {
@override
Future<void> up() async {
// HNSW indexes for fast similarity search
await execute('''
CREATE INDEX idx_documents_content_embedding_hnsw
ON documents USING hnsw (content_embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
''');
await execute('''
CREATE INDEX idx_documents_title_embedding_hnsw
ON documents USING hnsw (title_embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
''');
await execute('''
CREATE INDEX idx_search_queries_embedding_hnsw
ON search_queries USING hnsw (query_embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
''');
// Traditional indexes for filtering
await execute('CREATE INDEX idx_documents_category ON documents(category);');
await execute('CREATE INDEX idx_documents_author_id ON documents(author_id);');
await execute('CREATE INDEX idx_documents_created_at ON documents(created_at);');
await execute('CREATE INDEX idx_documents_tags ON documents USING GIN(tags);');
}
@override
Future<void> down() async {
await execute('DROP INDEX IF EXISTS idx_documents_content_embedding_hnsw;');
await execute('DROP INDEX IF EXISTS idx_documents_title_embedding_hnsw;');
await execute('DROP INDEX IF EXISTS idx_search_queries_embedding_hnsw;');
await execute('DROP INDEX IF EXISTS idx_documents_category;');
await execute('DROP INDEX IF EXISTS idx_documents_author_id;');
await execute('DROP INDEX IF EXISTS idx_documents_created_at;');
await execute('DROP INDEX IF EXISTS idx_documents_tags;');
}
}
Server Implementation
import 'package:hypermodern_server/hypermodern_server.dart';
import 'package:openai_dart/openai_dart.dart';
class DocumentSearchServer {
final HypermodernServer _server;
final DocumentService _documentService;
final SearchService _searchService;
final EmbeddingService _embeddingService;
DocumentSearchServer({
required DocumentService documentService,
required SearchService searchService,
required EmbeddingService embeddingService,
}) : _server = HypermodernServer(),
_documentService = documentService,
_searchService = searchService,
_embeddingService = embeddingService;
Future<void> start() async {
_registerEndpoints();
await _server.listen(
httpPort: 8080,
websocketPort: 8082,
tcpPort: 8081,
);
print('🚀 Document Search Server started');
print(' HTTP: http://localhost:8080');
print(' WebSocket: ws://localhost:8082');
print(' TCP: localhost:8081');
}
void _registerEndpoints() {
// Document search with semantic similarity
_server.registerEndpoint<SearchDocumentsRequest, SearchDocumentsResponse>(
'search_documents',
(request) async {
final startTime = DateTime.now();
// Generate embedding for search query
final queryEmbedding = await _embeddingService.generateEmbedding(request.query);
// Perform semantic search
final results = await _searchService.searchDocuments(
queryEmbedding: queryEmbedding,
queryText: request.query,
filters: request.filters,
limit: request.limit,
similarityThreshold: request.similarityThreshold,
);
final executionTime = DateTime.now().difference(startTime).inMilliseconds;
// Log search query for analytics
await _searchService.logSearchQuery(
userId: request.userId,
queryText: request.query,
queryEmbedding: queryEmbedding,
filters: request.filters,
resultCount: results.length,
executionTimeMs: executionTime,
);
return SearchDocumentsResponse(
documents: results,
totalCount: results.length,
executionTimeMs: executionTime,
);
},
);
// Get similar documents
_server.registerEndpoint<GetSimilarDocumentsRequest, GetSimilarDocumentsResponse>(
'get_similar_documents',
(request) async {
final document = await _documentService.getDocument(request.id);
if (document == null) {
throw NotFoundException('Document not found');
}
final similarDocuments = await _searchService.findSimilarDocuments(
documentId: request.id,
limit: request.limit,
);
return GetSimilarDocumentsResponse(
similarDocuments: similarDocuments,
);
},
);
// Create new document with automatic embedding generation
_server.registerEndpoint<CreateDocumentRequest, Document>(
'create_document',
(request) async {
// Generate embeddings for title and content
final titleEmbedding = await _embeddingService.generateEmbedding(request.title);
final contentEmbedding = await _embeddingService.generateEmbedding(request.content);
final document = await _documentService.createDocument(
title: request.title,
content: request.content,
authorId: request.authorId,
category: request.category,
tags: request.tags,
titleEmbedding: titleEmbedding,
contentEmbedding: contentEmbedding,
);
// Broadcast document creation to subscribers
_server.broadcast('document_created', {
'event_type': 'created',
'document': document.toJson(),
'timestamp': DateTime.now().toIso8601String(),
});
return document;
},
);
// Real-time document updates stream
_server.registerStreamingEndpoint<WatchDocumentUpdatesRequest, DocumentUpdateEvent>(
'watch_document_updates',
(request) async* {
await for (final event in _documentService.watchDocumentUpdates(
categories: request.categories,
userId: request.userId,
)) {
yield event;
}
},
);
}
}
class DocumentService {
final Database _db;
final StreamController<DocumentUpdateEvent> _updateController;
DocumentService(this._db) : _updateController = StreamController.broadcast();
Future<Document> createDocument({
required String title,
required String content,
required String authorId,
required DocumentCategory category,
required List<String> tags,
required List<double> titleEmbedding,
required List<double> contentEmbedding,
}) async {
final titleVector = Vector.fromList(titleEmbedding);
final contentVector = Vector.fromList(contentEmbedding);
final result = await _db.query('''
INSERT INTO documents (
title, content, author_id, category, tags,
title_embedding, content_embedding, indexed_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, NOW())
RETURNING *
''', [
title,
content,
authorId,
category.toString(),
tags,
titleVector.toPostgresString(),
contentVector.toPostgresString(),
]);
final document = Document.fromJson(result.first);
// Emit update event
_updateController.add(DocumentUpdateEvent(
eventType: 'created',
document: document,
timestamp: DateTime.now(),
));
return document;
}
Future<Document?> getDocument(String id) async {
if (!UuidV7.isValid(id)) {
throw ArgumentError('Invalid document ID format');
}
final result = await _db.query('''
SELECT d.*, u.username as author_username, u.full_name as author_name
FROM documents d
JOIN users u ON d.author_id = u.id
WHERE d.id = ?
''', [id]);
if (result.isEmpty) return null;
// Increment view count
await _db.query('UPDATE documents SET view_count = view_count + 1 WHERE id = ?', [id]);
return Document.fromJson(result.first);
}
Stream<DocumentUpdateEvent> watchDocumentUpdates({
List<DocumentCategory>? categories,
String? userId,
}) {
return _updateController.stream.where((event) {
if (categories != null && !categories.contains(event.document.category)) {
return false;
}
if (userId != null && event.document.authorId != userId) {
return false;
}
return true;
});
}
}
class SearchService {
final Database _db;
SearchService(this._db);
Future<List<DocumentSearchResult>> searchDocuments({
required List<double> queryEmbedding,
required String queryText,
SearchFilters? filters,
int limit = 20,
double similarityThreshold = 0.7,
}) async {
final queryVector = Vector.fromList(queryEmbedding);
final queryVectorStr = queryVector.toPostgresString();
final whereConditions = <String>['d.content_embedding IS NOT NULL'];
final parameters = <dynamic>[queryVectorStr, queryVectorStr, similarityThreshold];
// Add category filter
if (filters?.categories != null && filters!.categories!.isNotEmpty) {
final categoryPlaceholders = List.filled(filters.categories!.length, '?').join(', ');
whereConditions.add('d.category IN ($categoryPlaceholders)');
parameters.addAll(filters.categories!.map((c) => c.toString()));
}
// Add date range filter
if (filters?.dateRange?.start != null) {
whereConditions.add('d.created_at >= ?');
parameters.add(filters!.dateRange!.start);
}
if (filters?.dateRange?.end != null) {
whereConditions.add('d.created_at <= ?');
parameters.add(filters!.dateRange!.end);
}
// Add author filter
if (filters?.authorId != null) {
whereConditions.add('d.author_id = ?');
parameters.add(filters!.authorId);
}
parameters.addAll([queryVectorStr, limit]);
final result = await _db.query('''
SELECT
d.*,
u.username as author_username,
u.full_name as author_name,
1 - (d.content_embedding <=> ?) as content_similarity,
1 - (d.title_embedding <=> ?) as title_similarity,
GREATEST(
1 - (d.content_embedding <=> ?),
1 - (d.title_embedding <=> ?)
) as max_similarity
FROM documents d
JOIN users u ON d.author_id = u.id
WHERE ${whereConditions.join(' AND ')}
AND GREATEST(
1 - (d.content_embedding <=> ?),
1 - (d.title_embedding <=> ?)
) > ?
ORDER BY max_similarity DESC
LIMIT ?
''', parameters);
return result.map((row) {
final document = Document.fromJson(row);
final contentSimilarity = row['content_similarity'] as double;
final titleSimilarity = row['title_similarity'] as double;
final maxSimilarity = row['max_similarity'] as double;
return DocumentSearchResult(
document: document,
similarityScore: maxSimilarity,
relevanceExplanation: _generateRelevanceExplanation(
queryText,
document,
contentSimilarity,
titleSimilarity,
),
);
}).toList();
}
Future<List<SimilarDocument>> findSimilarDocuments({
required String documentId,
int limit = 10,
}) async {
final result = await _db.query('''
WITH target_doc AS (
SELECT content_embedding, title_embedding
FROM documents
WHERE id = ?
)
SELECT
d.*,
u.username as author_username,
u.full_name as author_name,
GREATEST(
1 - (d.content_embedding <=> (SELECT content_embedding FROM target_doc)),
1 - (d.title_embedding <=> (SELECT title_embedding FROM target_doc))
) as similarity_score
FROM documents d
JOIN users u ON d.author_id = u.id
CROSS JOIN target_doc
WHERE d.id != ?
AND d.content_embedding IS NOT NULL
AND d.title_embedding IS NOT NULL
ORDER BY similarity_score DESC
LIMIT ?
''', [documentId, documentId, limit]);
return result.map((row) => SimilarDocument(
document: Document.fromJson(row),
similarityScore: row['similarity_score'] as double,
)).toList();
}
Future<void> logSearchQuery({
required String userId,
required String queryText,
required List<double> queryEmbedding,
required SearchFilters filters,
required int resultCount,
required int executionTimeMs,
}) async {
final queryVector = Vector.fromList(queryEmbedding);
await _db.query('''
INSERT INTO search_queries (
user_id, query_text, query_embedding, filters,
result_count, execution_time_ms
)
VALUES (?, ?, ?, ?, ?, ?)
''', [
userId,
queryText,
queryVector.toPostgresString(),
jsonEncode(filters.toJson()),
resultCount,
executionTimeMs,
]);
}
String _generateRelevanceExplanation(
String queryText,
Document document,
double contentSimilarity,
double titleSimilarity,
) {
final explanations = <String>[];
if (titleSimilarity > 0.8) {
explanations.add('Strong title match');
} else if (titleSimilarity > 0.6) {
explanations.add('Good title relevance');
}
if (contentSimilarity > 0.8) {
explanations.add('High content similarity');
} else if (contentSimilarity > 0.6) {
explanations.add('Relevant content match');
}
// Check for tag matches
final queryWords = queryText.toLowerCase().split(' ');
final matchingTags = document.tags.where((tag) =>
queryWords.any((word) => tag.toLowerCase().contains(word))
).toList();
if (matchingTags.isNotEmpty) {
explanations.add('Matching tags: ${matchingTags.join(', ')}');
}
return explanations.isEmpty ? 'Semantic similarity' : explanations.join('; ');
}
}
class EmbeddingService {
final OpenAI _openai;
EmbeddingService(String apiKey) : _openai = OpenAI(apiKey: apiKey);
Future<List<double>> generateEmbedding(String text) async {
try {
final response = await _openai.embeddings.create(
model: 'text-embedding-3-small',
input: text,
);
return response.data.first.embedding;
} catch (e) {
throw EmbeddingException('Failed to generate embedding: $e');
}
}
Future<Map<String, List<double>>> generateBatchEmbeddings(List<String> texts) async {
final results = <String, List<double>>{};
// Process in batches to avoid API limits
const batchSize = 100;
for (int i = 0; i < texts.length; i += batchSize) {
final batch = texts.skip(i).take(batchSize).toList();
final response = await _openai.embeddings.create(
model: 'text-embedding-3-small',
input: batch,
);
for (int j = 0; j < batch.length; j++) {
results[batch[j]] = response.data[j].embedding;
}
}
return results;
}
}
Client Implementation
import 'package:hypermodern/hypermodern.dart';
class DocumentSearchClient {
final HypermodernClient _client;
DocumentSearchClient(String baseUrl) : _client = HypermodernClient(baseUrl);
Future<void> connect() async {
await _client.connect();
}
Future<SearchDocumentsResponse> searchDocuments({
required String query,
SearchFilters? filters,
int limit = 20,
double similarityThreshold = 0.7,
}) async {
return await _client.request<SearchDocumentsResponse>(
'search_documents',
SearchDocumentsRequest(
query: query,
filters: filters ?? SearchFilters(),
limit: limit,
similarityThreshold: similarityThreshold,
),
);
}
Future<GetSimilarDocumentsResponse> getSimilarDocuments(String documentId, {int limit = 10}) async {
return await _client.request<GetSimilarDocumentsResponse>(
'get_similar_documents',
GetSimilarDocumentsRequest(id: documentId, limit: limit),
);
}
Future<Document> createDocument({
required String title,
required String content,
required String authorId,
required DocumentCategory category,
List<String> tags = const [],
}) async {
return await _client.request<Document>(
'create_document',
CreateDocumentRequest(
title: title,
content: content,
authorId: authorId,
category: category,
tags: tags,
),
);
}
Stream<DocumentUpdateEvent> watchDocumentUpdates({
List<DocumentCategory>? categories,
String? userId,
}) {
return _client.stream<DocumentUpdateEvent>(
'watch_document_updates',
WatchDocumentUpdatesRequest(
categories: categories,
userId: userId,
),
);
}
Future<void> disconnect() async {
await _client.disconnect();
}
}
// Usage example
void main() async {
final client = DocumentSearchClient('http://localhost:8080');
await client.connect();
try {
// Search for documents
final searchResults = await client.searchDocuments(
query: 'machine learning algorithms',
filters: SearchFilters(
categories: [DocumentCategory.technology, DocumentCategory.science],
dateRange: DateRange(
start: DateTime.now().subtract(Duration(days: 30)),
),
),
limit: 10,
similarityThreshold: 0.75,
);
print('Found ${searchResults.documents.length} documents');
for (final result in searchResults.documents) {
print('${result.document.title} (${result.similarityScore.toStringAsFixed(3)})');
print(' ${result.relevanceExplanation}');
}
// Get similar documents
if (searchResults.documents.isNotEmpty) {
final firstDoc = searchResults.documents.first.document;
final similarDocs = await client.getSimilarDocuments(firstDoc.id);
print('\nSimilar to "${firstDoc.title}":');
for (final similar in similarDocs.similarDocuments) {
print(' ${similar.document.title} (${similar.similarityScore.toStringAsFixed(3)})');
}
}
// Watch for real-time updates
final updateStream = client.watchDocumentUpdates(
categories: [DocumentCategory.technology],
);
await for (final update in updateStream.take(5)) {
print('Document ${update.eventType}: ${update.document.title}');
}
} finally {
await client.disconnect();
}
}
Key Features Demonstrated
-
UUID v7 Benefits:
- Time-ordered IDs for better database performance
- Natural sorting by creation time
- Reduced index fragmentation
-
Vector Embeddings:
- Semantic search capabilities
- Similarity-based recommendations
- Multiple embedding types (title vs. content)
-
Performance Optimizations:
- HNSW indexes for fast vector similarity search
- Batch embedding generation
- Query result caching
-
Real-time Features:
- WebSocket streaming for live updates
- Multi-protocol support (HTTP, WebSocket, TCP)
- Broadcast notifications
-
Analytics and Monitoring:
- Search query logging
- Performance metrics
- User interaction tracking
This example showcases how UUID v7 and vector data types work together to create a modern, high-performance document search application with semantic capabilities.
Chat Application
Let's build a complete real-time chat application that demonstrates Hypermodern's multi-protocol capabilities, real-time streaming, and user management.
Schema Definition
{
"models": {
"user": {
"id": "int64",
"username": "string",
"email": "string",
"display_name": "string",
"avatar_url": "string?",
"status": "@user_status",
"last_seen": "datetime",
"created_at": "datetime"
},
"chat_room": {
"id": "int64",
"name": "string",
"description": "string?",
"type": "@room_type",
"owner_id": "int64",
"created_at": "datetime",
"member_count": "int32"
},
"message": {
"id": "int64",
"room_id": "int64",
"user_id": "int64",
"content": "string",
"message_type": "@message_type",
"reply_to_id": "int64?",
"attachments": ["@attachment"],
"created_at": "datetime",
"edited_at": "datetime?"
},
"attachment": {
"id": "int64",
"filename": "string",
"content_type": "string",
"size": "int64",
"url": "string"
},
"room_member": {
"room_id": "int64",
"user_id": "int64",
"role": "@member_role",
"joined_at": "datetime",
"last_read_message_id": "int64?"
}
},
"enums": {
"user_status": ["online", "away", "busy", "offline"],
"room_type": ["public", "private", "direct"],
"message_type": ["text", "image", "file", "system"],
"member_role": ["owner", "admin", "member"]
},
"endpoints": {
"create_room": {
"method": "POST",
"path": "/rooms",
"request": {
"name": "string",
"description": "string?",
"type": "@room_type"
},
"response": "@chat_room",
"transports": ["http", "websocket"]
},
"join_room": {
"method": "POST",
"path": "/rooms/{room_id}/join",
"request": {
"room_id": "int64"
},
"response": {
"success": "bool",
"room": "@chat_room"
},
"transports": ["http", "websocket"]
},
"send_message": {
"method": "POST",
"path": "/rooms/{room_id}/messages",
"request": {
"room_id": "int64",
"content": "string",
"message_type": "@message_type",
"reply_to_id": "int64?"
},
"response": "@message",
"transports": ["http", "websocket"]
},
"get_messages": {
"method": "GET",
"path": "/rooms/{room_id}/messages",
"request": {
"room_id": "int64",
"limit": "int32?",
"before_id": "int64?"
},
"response": {
"messages": ["@message"],
"has_more": "bool"
},
"transports": ["http", "websocket"]
},
"watch_room": {
"type": "stream",
"request": {
"room_id": "int64"
},
"response": "@room_event",
"transports": ["websocket", "tcp"]
}
}
}
Server Implementation
class ChatServer {
final HypermodernServer _server;
final ChatService _chatService;
final UserService _userService;
final RealTimeBroadcaster _broadcaster;
ChatServer({
required HypermodernServer server,
required ChatService chatService,
required UserService userService,
required RealTimeBroadcaster broadcaster,
}) : _server = server,
_chatService = chatService,
_userService = userService,
_broadcaster = broadcaster;
Future<void> initialize() async {
_registerEndpoints();
_registerStreamingEndpoints();
_setupMiddleware();
await _server.listen();
print('🚀 Chat server running on multiple protocols');
}
void _registerEndpoints() {
// Room management
_server.registerEndpoint<CreateRoomRequest, ChatRoom>(
'create_room',
(request) async {
final userId = _getCurrentUserId(request.context);
final room = await _chatService.createRoom(
name: request.name,
description: request.description,
type: request.type,
ownerId: userId,
);
// Broadcast room creation
_broadcaster.broadcast('global', RoomCreatedEvent(room: room));
return room;
},
);
_server.registerEndpoint<JoinRoomRequest, JoinRoomResponse>(
'join_room',
(request) async {
final userId = _getCurrentUserId(request.context);
final room = await _chatService.joinRoom(
roomId: request.roomId,
userId: userId,
);
// Broadcast user joined
_broadcaster.broadcast(
'room:${request.roomId}',
UserJoinedEvent(roomId: request.roomId, userId: userId),
);
return JoinRoomResponse(success: true, room: room);
},
);
// Message handling
_server.registerEndpoint<SendMessageRequest, Message>(
'send_message',
(request) async {
final userId = _getCurrentUserId(request.context);
// Validate user is member of room
await _chatService.validateRoomMembership(request.roomId, userId);
final message = await _chatService.sendMessage(
roomId: request.roomId,
userId: userId,
content: request.content,
messageType: request.messageType,
replyToId: request.replyToId,
);
// Broadcast message to room members
_broadcaster.broadcast(
'room:${request.roomId}',
MessageSentEvent(message: message),
);
// Update user activity
await _userService.updateLastSeen(userId);
return message;
},
);
_server.registerEndpoint<GetMessagesRequest, GetMessagesResponse>(
'get_messages',
(request) async {
final userId = _getCurrentUserId(request.context);
// Validate access to room
await _chatService.validateRoomAccess(request.roomId, userId);
final result = await _chatService.getMessages(
roomId: request.roomId,
limit: request.limit ?? 50,
beforeId: request.beforeId,
);
return GetMessagesResponse(
messages: result.messages,
hasMore: result.hasMore,
);
},
);
}
void _registerStreamingEndpoints() {
// Real-time room updates
_server.registerStreamingEndpoint<WatchRoomRequest, RoomEvent>(
'watch_room',
(request) async* {
final userId = _getCurrentUserId(request.context);
// Validate access to room
await _chatService.validateRoomAccess(request.roomId, userId);
// Subscribe to room events
final controller = StreamController<RoomEvent>();
final channel = 'room:${request.roomId}';
_broadcaster.subscribe(channel, controller);
// Send initial room state
controller.add(RoomStateEvent(
roomId: request.roomId,
onlineMembers: await _chatService.getOnlineMembers(request.roomId),
));
// Clean up on disconnect
controller.onCancel = () {
_broadcaster.unsubscribe(channel, controller);
};
yield* controller.stream;
},
);
// User presence updates
_server.registerStreamingEndpoint<WatchPresenceRequest, PresenceEvent>(
'watch_presence',
(request) async* {
final userId = _getCurrentUserId(request.context);
final controller = StreamController<PresenceEvent>();
_broadcaster.subscribe('presence', controller);
// Send current presence state
final onlineUsers = await _userService.getOnlineUsers();
controller.add(PresenceStateEvent(onlineUsers: onlineUsers));
// Update user's own presence
await _userService.setUserStatus(userId, UserStatus.online);
_broadcaster.broadcast('presence', UserStatusChangedEvent(
userId: userId,
status: UserStatus.online,
));
// Handle disconnect
controller.onCancel = () async {
await _userService.setUserStatus(userId, UserStatus.offline);
_broadcaster.broadcast('presence', UserStatusChangedEvent(
userId: userId,
status: UserStatus.offline,
));
_broadcaster.unsubscribe('presence', controller);
};
yield* controller.stream;
},
);
}
void _setupMiddleware() {
// Authentication required for all endpoints
_server.middleware.add(AuthenticationMiddleware(
jwtService: _server.container.get<JWTService>(),
publicEndpoints: {},
));
// Rate limiting for message sending
_server.middleware.add(ConditionalMiddleware(
RateLimitMiddleware(
rateLimiter: _server.container.get<RateLimiter>(),
endpointLimits: {
'send_message': RateLimit(requests: 30, window: Duration(minutes: 1)),
},
),
(request) => request.endpoint == 'send_message',
));
// Logging and metrics
_server.middleware.add(LoggingMiddleware());
_server.middleware.add(MetricsMiddleware(_server.container.get<PrometheusClient>()));
}
int _getCurrentUserId(Map<String, dynamic> context) {
final userId = context['user_id'] as int?;
if (userId == null) {
throw UnauthorizedException('User not authenticated');
}
return userId;
}
}
class ChatService {
final Database _db;
final FileStorageService _fileStorage;
ChatService(this._db, this._fileStorage);
Future<ChatRoom> createRoom({
required String name,
String? description,
required RoomType type,
required int ownerId,
}) async {
return await _db.transaction((tx) async {
// Create room
final roomResult = await tx.query('''
INSERT INTO chat_rooms (name, description, type, owner_id, created_at)
VALUES (?, ?, ?, ?, ?)
RETURNING *
''', [name, description, type.toString(), ownerId, DateTime.now()]);
final room = ChatRoom.fromJson(roomResult.first);
// Add owner as member
await tx.query('''
INSERT INTO room_members (room_id, user_id, role, joined_at)
VALUES (?, ?, ?, ?)
''', [room.id, ownerId, MemberRole.owner.toString(), DateTime.now()]);
return room;
});
}
Future<ChatRoom> joinRoom({
required int roomId,
required int userId,
}) async {
return await _db.transaction((tx) async {
// Check if room exists and is joinable
final roomResult = await tx.query(
'SELECT * FROM chat_rooms WHERE id = ?',
[roomId],
);
if (roomResult.isEmpty) {
throw NotFoundException('Room not found');
}
final room = ChatRoom.fromJson(roomResult.first);
if (room.type == RoomType.private) {
throw ForbiddenException('Cannot join private room without invitation');
}
// Check if already a member
final memberResult = await tx.query(
'SELECT 1 FROM room_members WHERE room_id = ? AND user_id = ?',
[roomId, userId],
);
if (memberResult.isNotEmpty) {
return room; // Already a member
}
// Add as member
await tx.query('''
INSERT INTO room_members (room_id, user_id, role, joined_at)
VALUES (?, ?, ?, ?)
''', [roomId, userId, MemberRole.member.toString(), DateTime.now()]);
// Update member count
await tx.query(
'UPDATE chat_rooms SET member_count = member_count + 1 WHERE id = ?',
[roomId],
);
return room;
});
}
Future<Message> sendMessage({
required int roomId,
required int userId,
required String content,
required MessageType messageType,
int? replyToId,
}) async {
// Validate content
if (content.trim().isEmpty) {
throw ValidationException('Message content cannot be empty');
}
if (content.length > 4000) {
throw ValidationException('Message too long');
}
// Process mentions and links
final processedContent = await _processMessageContent(content);
final result = await _db.query('''
INSERT INTO messages (room_id, user_id, content, message_type, reply_to_id, created_at)
VALUES (?, ?, ?, ?, ?, ?)
RETURNING *
''', [roomId, userId, processedContent, messageType.toString(), replyToId, DateTime.now()]);
return Message.fromJson(result.first);
}
Future<String> _processMessageContent(String content) async {
// Process @mentions
final mentionRegex = RegExp(r'@(\w+)');
var processedContent = content;
final mentions = mentionRegex.allMatches(content);
for (final mention in mentions) {
final username = mention.group(1)!;
final user = await _getUserByUsername(username);
if (user != null) {
processedContent = processedContent.replaceAll(
mention.group(0)!,
'<@${user.id}>',
);
}
}
return processedContent;
}
Future<User?> _getUserByUsername(String username) async {
final result = await _db.query(
'SELECT * FROM users WHERE username = ?',
[username],
);
return result.isEmpty ? null : User.fromJson(result.first);
}
}
Client Implementation
class ChatClient {
final HypermodernClient _client;
final StreamController<ChatEvent> _eventController = StreamController.broadcast();
StreamSubscription? _roomSubscription;
StreamSubscription? _presenceSubscription;
ChatClient(String serverUrl) : _client = HypermodernClient(serverUrl);
Stream<ChatEvent> get events => _eventController.stream;
Future<void> connect(String authToken) async {
_client.setAuthToken(authToken);
await _client.connect();
// Start presence monitoring
await _startPresenceMonitoring();
print('✅ Connected to chat server');
}
Future<void> disconnect() async {
await _roomSubscription?.cancel();
await _presenceSubscription?.cancel();
await _client.disconnect();
await _eventController.close();
}
// Room operations
Future<ChatRoom> createRoom({
required String name,
String? description,
required RoomType type,
}) async {
final request = CreateRoomRequest(
name: name,
description: description,
type: type,
);
return await _client.request<ChatRoom>('create_room', request);
}
Future<void> joinRoom(int roomId) async {
final request = JoinRoomRequest(roomId: roomId);
final response = await _client.request<JoinRoomResponse>('join_room', request);
if (response.success) {
await _watchRoom(roomId);
_eventController.add(RoomJoinedEvent(room: response.room));
}
}
Future<void> _watchRoom(int roomId) async {
await _roomSubscription?.cancel();
final request = WatchRoomRequest(roomId: roomId);
final stream = _client.stream<RoomEvent>('watch_room', request);
_roomSubscription = stream.listen(
(event) => _eventController.add(ChatEvent.fromRoomEvent(event)),
onError: (error) => _eventController.addError(error),
);
}
// Message operations
Future<Message> sendMessage({
required int roomId,
required String content,
MessageType type = MessageType.text,
int? replyToId,
}) async {
final request = SendMessageRequest(
roomId: roomId,
content: content,
messageType: type,
replyToId: replyToId,
);
return await _client.request<Message>('send_message', request);
}
Future<List<Message>> getMessages({
required int roomId,
int? limit,
int? beforeId,
}) async {
final request = GetMessagesRequest(
roomId: roomId,
limit: limit,
beforeId: beforeId,
);
final response = await _client.request<GetMessagesResponse>('get_messages', request);
return response.messages;
}
Future<void> _startPresenceMonitoring() async {
final request = WatchPresenceRequest();
final stream = _client.stream<PresenceEvent>('watch_presence', request);
_presenceSubscription = stream.listen(
(event) => _eventController.add(ChatEvent.fromPresenceEvent(event)),
onError: (error) => print('Presence error: $error'),
);
}
}
// Usage example
class ChatApp {
final ChatClient _client;
final Map<int, List<Message>> _roomMessages = {};
final Set<int> _onlineUsers = {};
ChatApp(String serverUrl) : _client = ChatClient(serverUrl);
Future<void> start() async {
// Connect to server
await _client.connect('your-auth-token');
// Listen for events
_client.events.listen(_handleChatEvent);
// Join a room
await _client.joinRoom(1);
// Load message history
final messages = await _client.getMessages(roomId: 1, limit: 50);
_roomMessages[1] = messages;
// Start message loop
_startMessageLoop();
}
void _handleChatEvent(ChatEvent event) {
switch (event.type) {
case ChatEventType.messageReceived:
final messageEvent = event as MessageReceivedEvent;
_roomMessages.putIfAbsent(messageEvent.message.roomId, () => [])
.add(messageEvent.message);
_displayMessage(messageEvent.message);
break;
case ChatEventType.userJoined:
final joinEvent = event as UserJoinedEvent;
print('${joinEvent.userId} joined the room');
break;
case ChatEventType.userLeft:
final leaveEvent = event as UserLeftEvent;
print('${leaveEvent.userId} left the room');
break;
case ChatEventType.userStatusChanged:
final statusEvent = event as UserStatusChangedEvent;
if (statusEvent.status == UserStatus.online) {
_onlineUsers.add(statusEvent.userId);
} else {
_onlineUsers.remove(statusEvent.userId);
}
break;
}
}
void _displayMessage(Message message) {
print('[${message.createdAt}] ${message.userId}: ${message.content}');
}
void _startMessageLoop() {
// Simple console input loop
stdin.transform(utf8.decoder).transform(LineSplitter()).listen((line) async {
if (line.isNotEmpty) {
try {
await _client.sendMessage(roomId: 1, content: line);
} catch (e) {
print('Failed to send message: $e');
}
}
});
}
}
API Gateway
An API gateway that translates between different protocols and provides unified access to multiple backend services.
Gateway Schema
{
"models": {
"gateway_route": {
"id": "int64",
"path": "string",
"method": "string",
"backend_service": "string",
"backend_path": "string",
"protocol": "@backend_protocol",
"auth_required": "bool",
"rate_limit": "@rate_limit_config?",
"cache_config": "@cache_config?",
"created_at": "datetime"
},
"rate_limit_config": {
"requests": "int32",
"window_seconds": "int32",
"burst": "int32?"
},
"cache_config": {
"ttl_seconds": "int32",
"vary_headers": ["string"]
},
"api_key": {
"id": "int64",
"key": "string",
"name": "string",
"permissions": ["string"],
"rate_limit": "@rate_limit_config?",
"expires_at": "datetime?",
"created_at": "datetime"
}
},
"enums": {
"backend_protocol": ["http", "grpc", "graphql", "websocket"]
},
"endpoints": {
"proxy_request": {
"method": "ANY",
"path": "/api/{path:.*}",
"request": {
"path": "string",
"method": "string",
"headers": "map<string, string>",
"query_params": "map<string, string>",
"body": "any?"
},
"response": {
"status_code": "int32",
"headers": "map<string, string>",
"body": "any?"
},
"transports": ["http", "websocket"]
}
}
}
Gateway Implementation
class APIGateway {
final HypermodernServer _server;
final RouteManager _routeManager;
final BackendClientPool _clientPool;
final CacheManager _cacheManager;
final RateLimiter _rateLimiter;
final MetricsCollector _metrics;
APIGateway({
required HypermodernServer server,
required RouteManager routeManager,
required BackendClientPool clientPool,
required CacheManager cacheManager,
required RateLimiter rateLimiter,
required MetricsCollector metrics,
}) : _server = server,
_routeManager = routeManager,
_clientPool = clientPool,
_cacheManager = cacheManager,
_rateLimiter = rateLimiter,
_metrics = metrics;
Future<void> initialize() async {
_setupMiddleware();
_registerProxyEndpoint();
await _server.listen();
print('🌐 API Gateway running');
}
void _setupMiddleware() {
// Request logging
_server.middleware.add(RequestLoggingMiddleware());
// CORS handling
_server.middleware.add(CORSMiddleware(
allowedOrigins: ['*'],
allowedMethods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS'],
allowedHeaders: ['Content-Type', 'Authorization', 'X-API-Key'],
));
// API key authentication
_server.middleware.add(APIKeyAuthMiddleware(_routeManager));
// Rate limiting
_server.middleware.add(GatewayRateLimitMiddleware(_rateLimiter, _routeManager));
// Caching
_server.middleware.add(GatewayCacheMiddleware(_cacheManager, _routeManager));
// Metrics collection
_server.middleware.add(GatewayMetricsMiddleware(_metrics));
}
void _registerProxyEndpoint() {
_server.registerEndpoint<ProxyRequest, ProxyResponse>(
'proxy_request',
(request) async => await _handleProxyRequest(request),
);
}
Future<ProxyResponse> _handleProxyRequest(ProxyRequest request) async {
final stopwatch = Stopwatch()..start();
try {
// Find matching route
final route = await _routeManager.findRoute(request.path, request.method);
if (route == null) {
return ProxyResponse(
statusCode: 404,
headers: {'Content-Type': 'application/json'},
body: {'error': 'Route not found'},
);
}
// Transform request for backend
final backendRequest = await _transformRequest(request, route);
// Get backend client
final client = await _clientPool.getClient(route.backendService, route.protocol);
// Make backend request
final backendResponse = await client.request(backendRequest);
// Transform response for client
final response = await _transformResponse(backendResponse, route);
stopwatch.stop();
// Record metrics
_metrics.recordRequest(
route: route.path,
method: request.method,
statusCode: response.statusCode,
duration: stopwatch.elapsed,
backendService: route.backendService,
);
return response;
} catch (e) {
stopwatch.stop();
_metrics.recordError(
route: request.path,
method: request.method,
error: e.toString(),
duration: stopwatch.elapsed,
);
return _handleError(e);
}
}
Future<BackendRequest> _transformRequest(ProxyRequest request, GatewayRoute route) async {
// Transform path
final transformedPath = _transformPath(request.path, route);
// Add gateway headers
final headers = Map<String, String>.from(request.headers);
headers['X-Gateway-Route'] = route.id.toString();
headers['X-Gateway-Timestamp'] = DateTime.now().toIso8601String();
// Protocol-specific transformations
switch (route.protocol) {
case BackendProtocol.http:
return HttpBackendRequest(
path: transformedPath,
method: request.method,
headers: headers,
queryParams: request.queryParams,
body: request.body,
);
case BackendProtocol.grpc:
return GrpcBackendRequest(
service: route.backendService,
method: _extractGrpcMethod(transformedPath),
metadata: headers,
request: request.body,
);
case BackendProtocol.graphql:
return GraphQLBackendRequest(
query: _transformToGraphQL(request),
variables: request.body is Map ? request.body as Map<String, dynamic> : {},
headers: headers,
);
default:
throw UnsupportedError('Protocol ${route.protocol} not supported');
}
}
String _transformPath(String originalPath, GatewayRoute route) {
// Remove gateway prefix and apply backend path template
final pathWithoutPrefix = originalPath.replaceFirst('/api', '');
return route.backendPath.replaceAll('{path}', pathWithoutPrefix);
}
Future<ProxyResponse> _transformResponse(BackendResponse backendResponse, GatewayRoute route) async {
final headers = Map<String, String>.from(backendResponse.headers);
// Add gateway headers
headers['X-Gateway-Service'] = route.backendService;
headers['X-Gateway-Protocol'] = route.protocol.toString();
// Protocol-specific response transformation
dynamic transformedBody;
switch (route.protocol) {
case BackendProtocol.grpc:
transformedBody = _transformGrpcResponse(backendResponse.body);
break;
case BackendProtocol.graphql:
transformedBody = _transformGraphQLResponse(backendResponse.body);
break;
default:
transformedBody = backendResponse.body;
}
return ProxyResponse(
statusCode: backendResponse.statusCode,
headers: headers,
body: transformedBody,
);
}
ProxyResponse _handleError(dynamic error) {
if (error is TimeoutException) {
return ProxyResponse(
statusCode: 504,
headers: {'Content-Type': 'application/json'},
body: {'error': 'Gateway timeout'},
);
} else if (error is ConnectionException) {
return ProxyResponse(
statusCode: 502,
headers: {'Content-Type': 'application/json'},
body: {'error': 'Bad gateway'},
);
} else {
return ProxyResponse(
statusCode: 500,
headers: {'Content-Type': 'application/json'},
body: {'error': 'Internal server error'},
);
}
}
}
class BackendClientPool {
final Map<String, BackendClient> _clients = {};
final ClientFactory _clientFactory;
BackendClientPool(this._clientFactory);
Future<BackendClient> getClient(String service, BackendProtocol protocol) async {
final key = '$service:$protocol';
return _clients.putIfAbsent(key, () {
return _clientFactory.createClient(service, protocol);
});
}
}
class ClientFactory {
final Map<String, ServiceConfig> _serviceConfigs;
ClientFactory(this._serviceConfigs);
BackendClient createClient(String service, BackendProtocol protocol) {
final config = _serviceConfigs[service];
if (config == null) {
throw ArgumentError('Service configuration not found: $service');
}
switch (protocol) {
case BackendProtocol.http:
return HttpBackendClient(config.httpUrl);
case BackendProtocol.grpc:
return GrpcBackendClient(config.grpcUrl);
case BackendProtocol.graphql:
return GraphQLBackendClient(config.graphqlUrl);
default:
throw UnsupportedError('Protocol $protocol not supported');
}
}
}
Microservices Architecture
A complete microservices setup with service discovery, inter-service communication, and distributed tracing.
Service Registry
class ServiceRegistry {
final Database _db;
final Map<String, ServiceInstance> _services = {};
final StreamController<ServiceEvent> _eventController = StreamController.broadcast();
ServiceRegistry(this._db);
Stream<ServiceEvent> get events => _eventController.stream;
Future<void> registerService(ServiceRegistration registration) async {
final instance = ServiceInstance(
id: registration.id,
name: registration.name,
version: registration.version,
endpoints: registration.endpoints,
healthCheckUrl: registration.healthCheckUrl,
metadata: registration.metadata,
registeredAt: DateTime.now(),
lastHeartbeat: DateTime.now(),
);
_services[registration.id] = instance;
// Persist to database
await _db.query('''
INSERT INTO service_instances (id, name, version, endpoints, health_check_url, metadata, registered_at, last_heartbeat)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (id) DO UPDATE SET
endpoints = EXCLUDED.endpoints,
health_check_url = EXCLUDED.health_check_url,
metadata = EXCLUDED.metadata,
last_heartbeat = EXCLUDED.last_heartbeat
''', [
instance.id,
instance.name,
instance.version,
jsonEncode(instance.endpoints.map((e) => e.toJson()).toList()),
instance.healthCheckUrl,
jsonEncode(instance.metadata),
instance.registeredAt,
instance.lastHeartbeat,
]);
_eventController.add(ServiceRegisteredEvent(instance: instance));
print('✅ Service registered: ${instance.name}@${instance.version}');
}
Future<void> deregisterService(String serviceId) async {
final instance = _services.remove(serviceId);
if (instance != null) {
await _db.query('DELETE FROM service_instances WHERE id = ?', [serviceId]);
_eventController.add(ServiceDeregisteredEvent(instance: instance));
print('❌ Service deregistered: ${instance.name}');
}
}
Future<void> heartbeat(String serviceId) async {
final instance = _services[serviceId];
if (instance != null) {
instance.lastHeartbeat = DateTime.now();
await _db.query(
'UPDATE service_instances SET last_heartbeat = ? WHERE id = ?',
[instance.lastHeartbeat, serviceId],
);
}
}
List<ServiceInstance> discoverServices(String serviceName) {
return _services.values
.where((instance) => instance.name == serviceName)
.where((instance) => _isHealthy(instance))
.toList();
}
bool _isHealthy(ServiceInstance instance) {
final timeSinceHeartbeat = DateTime.now().difference(instance.lastHeartbeat);
return timeSinceHeartbeat < Duration(minutes: 2);
}
Future<void> startHealthChecking() async {
Timer.periodic(Duration(seconds: 30), (_) async {
await _performHealthChecks();
});
}
Future<void> _performHealthChecks() async {
final unhealthyServices = <String>[];
for (final instance in _services.values) {
try {
final client = HttpClient();
final response = await client.get(instance.healthCheckUrl)
.timeout(Duration(seconds: 5));
if (response.statusCode != 200) {
unhealthyServices.add(instance.id);
}
} catch (e) {
unhealthyServices.add(instance.id);
}
}
// Remove unhealthy services
for (final serviceId in unhealthyServices) {
await deregisterService(serviceId);
}
}
}
Inter-Service Communication
class ServiceClient {
final ServiceRegistry _registry;
final LoadBalancer _loadBalancer;
final CircuitBreaker _circuitBreaker;
final Map<String, HypermodernClient> _clients = {};
ServiceClient({
required ServiceRegistry registry,
required LoadBalancer loadBalancer,
required CircuitBreaker circuitBreaker,
}) : _registry = registry,
_loadBalancer = loadBalancer,
_circuitBreaker = circuitBreaker;
Future<T> call<T>({
required String serviceName,
required String endpoint,
required dynamic request,
Duration? timeout,
}) async {
return await _circuitBreaker.execute(() async {
// Discover service instances
final instances = _registry.discoverServices(serviceName);
if (instances.isEmpty) {
throw ServiceUnavailableException('No healthy instances of $serviceName');
}
// Select instance using load balancer
final selectedInstance = _loadBalancer.selectInstance(instances);
// Get or create client
final client = await _getClient(selectedInstance);
// Make request with timeout
final future = client.request<T>(endpoint, request);
final timeoutDuration = timeout ?? Duration(seconds: 30);
return await future.timeout(timeoutDuration);
});
}
Future<HypermodernClient> _getClient(ServiceInstance instance) async {
final clientKey = instance.id;
if (!_clients.containsKey(clientKey)) {
// Find best endpoint for communication
final endpoint = _selectBestEndpoint(instance.endpoints);
final client = HypermodernClient(endpoint.url);
await client.connect();
_clients[clientKey] = client;
}
return _clients[clientKey]!;
}
ServiceEndpoint _selectBestEndpoint(List<ServiceEndpoint> endpoints) {
// Prefer TCP for performance, then WebSocket, then HTTP
final priorities = [
ProtocolType.tcp,
ProtocolType.websocket,
ProtocolType.http,
];
for (final protocol in priorities) {
final endpoint = endpoints.firstWhere(
(e) => e.protocol == protocol,
orElse: () => endpoints.first,
);
if (endpoint.protocol == protocol) {
return endpoint;
}
}
return endpoints.first;
}
}
// Example microservice
class UserService extends MicroService {
final UserRepository _userRepository;
final ServiceClient _serviceClient;
UserService({
required UserRepository userRepository,
required ServiceClient serviceClient,
}) : _userRepository = userRepository,
_serviceClient = serviceClient;
@override
String get serviceName => 'user-service';
@override
String get version => '1.0.0';
@override
List<ServiceEndpoint> get endpoints => [
ServiceEndpoint(
protocol: ProtocolType.http,
url: 'http://localhost:8080',
),
ServiceEndpoint(
protocol: ProtocolType.websocket,
url: 'ws://localhost:8082',
),
ServiceEndpoint(
protocol: ProtocolType.tcp,
url: 'tcp://localhost:8081',
),
];
@override
void registerEndpoints(HypermodernServer server) {
server.registerEndpoint<CreateUserRequest, User>(
'create_user',
(request) async {
// Create user
final user = await _userRepository.create(User(
username: request.username,
email: request.email,
passwordHash: _hashPassword(request.password),
createdAt: DateTime.now(),
updatedAt: DateTime.now(),
));
// Notify other services
await _serviceClient.call<void>(
serviceName: 'notification-service',
endpoint: 'send_welcome_email',
request: SendWelcomeEmailRequest(
userId: user.id,
email: user.email,
username: user.username,
),
);
await _serviceClient.call<void>(
serviceName: 'analytics-service',
endpoint: 'track_user_registration',
request: TrackUserRegistrationRequest(
userId: user.id,
timestamp: DateTime.now(),
),
);
return user;
},
);
server.registerEndpoint<GetUserRequest, User>(
'get_user',
(request) async {
final user = await _userRepository.findById(request.id);
if (user == null) {
throw NotFoundException('User not found');
}
return user;
},
);
}
}
IoT Device Communication
A platform for managing IoT devices with efficient binary communication over TCP.
IoT Schema
{
"models": {
"device": {
"id": "int64",
"device_id": "string",
"name": "string",
"type": "@device_type",
"firmware_version": "string",
"last_seen": "datetime",
"status": "@device_status",
"location": "@location?",
"metadata": "map<string, any>",
"created_at": "datetime"
},
"location": {
"latitude": "float64",
"longitude": "float64",
"altitude": "float64?"
},
"sensor_reading": {
"device_id": "string",
"sensor_type": "@sensor_type",
"value": "float64",
"unit": "string",
"timestamp": "datetime",
"quality": "float32?"
},
"device_command": {
"id": "int64",
"device_id": "string",
"command": "string",
"parameters": "map<string, any>",
"status": "@command_status",
"sent_at": "datetime",
"executed_at": "datetime?"
}
},
"enums": {
"device_type": ["sensor", "actuator", "gateway", "controller"],
"device_status": ["online", "offline", "maintenance", "error"],
"sensor_type": ["temperature", "humidity", "pressure", "motion", "light"],
"command_status": ["pending", "sent", "acknowledged", "executed", "failed"]
},
"endpoints": {
"register_device": {
"method": "POST",
"path": "/devices/register",
"request": {
"device_id": "string",
"name": "string",
"type": "@device_type",
"firmware_version": "string",
"capabilities": ["string"]
},
"response": "@device",
"transports": ["tcp", "http"]
},
"send_readings": {
"method": "POST",
"path": "/devices/{device_id}/readings",
"request": {
"device_id": "string",
"readings": ["@sensor_reading"]
},
"response": {
"accepted": "int32",
"rejected": "int32"
},
"transports": ["tcp"]
},
"get_commands": {
"method": "GET",
"path": "/devices/{device_id}/commands",
"request": {
"device_id": "string",
"since": "datetime?"
},
"response": {
"commands": ["@device_command"]
},
"transports": ["tcp", "http"]
}
}
}
IoT Platform Implementation
class IoTPlatform {
final HypermodernServer _server;
final DeviceManager _deviceManager;
final DataProcessor _dataProcessor;
final CommandQueue _commandQueue;
final TimeSeriesDB _timeSeriesDB;
IoTPlatform({
required HypermodernServer server,
required DeviceManager deviceManager,
required DataProcessor dataProcessor,
required CommandQueue commandQueue,
required TimeSeriesDB timeSeriesDB,
}) : _server = server,
_deviceManager = deviceManager,
_dataProcessor = dataProcessor,
_commandQueue = commandQueue,
_timeSeriesDB = timeSeriesDB;
Future<void> initialize() async {
_setupTcpOptimizations();
_registerEndpoints();
_startBackgroundTasks();
await _server.listen();
print('🌐 IoT Platform running');
}
void _setupTcpOptimizations() {
// Configure TCP server for IoT devices
_server.configureTcp(TcpServerConfig(
port: 8081,
maxConnections: 10000,
keepAlive: true,
keepAliveInterval: Duration(seconds: 30),
noDelay: true, // Disable Nagle's algorithm for low latency
receiveBufferSize: 64 * 1024,
sendBufferSize: 64 * 1024,
));
}
void _registerEndpoints() {
// Device registration
_server.registerEndpoint<RegisterDeviceRequest, Device>(
'register_device',
(request) async {
final device = await _deviceManager.registerDevice(
deviceId: request.deviceId,
name: request.name,
type: request.type,
firmwareVersion: request.firmwareVersion,
capabilities: request.capabilities,
);
print('📱 Device registered: ${device.deviceId}');
return device;
},
);
// Sensor data ingestion
_server.registerEndpoint<SendReadingsRequest, SendReadingsResponse>(
'send_readings',
(request) async {
final result = await _dataProcessor.processReadings(
deviceId: request.deviceId,
readings: request.readings,
);
// Update device last seen
await _deviceManager.updateLastSeen(request.deviceId);
return SendReadingsResponse(
accepted: result.accepted,
rejected: result.rejected,
);
},
);
// Command retrieval
_server.registerEndpoint<GetCommandsRequest, GetCommandsResponse>(
'get_commands',
(request) async {
final commands = await _commandQueue.getCommandsForDevice(
deviceId: request.deviceId,
since: request.since,
);
return GetCommandsResponse(commands: commands);
},
);
}
void _startBackgroundTasks() {
// Device health monitoring
Timer.periodic(Duration(minutes: 1), (_) async {
await _deviceManager.checkDeviceHealth();
});
// Data aggregation
Timer.periodic(Duration(minutes: 5), (_) async {
await _dataProcessor.aggregateData();
});
// Command cleanup
Timer.periodic(Duration(hours: 1), (_) async {
await _commandQueue.cleanupExpiredCommands();
});
}
}
class DataProcessor {
final TimeSeriesDB _timeSeriesDB;
final AlertManager _alertManager;
final Map<String, SensorCalibration> _calibrations = {};
DataProcessor(this._timeSeriesDB, this._alertManager);
Future<ProcessingResult> processReadings({
required String deviceId,
required List<SensorReading> readings,
}) async {
int accepted = 0;
int rejected = 0;
final processedReadings = <SensorReading>[];
for (final reading in readings) {
try {
// Validate reading
if (!_validateReading(reading)) {
rejected++;
continue;
}
// Apply calibration
final calibratedReading = await _applyCalibration(reading);
// Check for anomalies
final anomaly = await _detectAnomaly(calibratedReading);
if (anomaly != null) {
await _alertManager.sendAlert(anomaly);
}
processedReadings.add(calibratedReading);
accepted++;
} catch (e) {
print('Error processing reading: $e');
rejected++;
}
}
// Batch insert to time series database
if (processedReadings.isNotEmpty) {
await _timeSeriesDB.insertReadings(processedReadings);
}
return ProcessingResult(accepted: accepted, rejected: rejected);
}
bool _validateReading(SensorReading reading) {
// Basic validation
if (reading.deviceId.isEmpty) return false;
if (reading.timestamp.isAfter(DateTime.now().add(Duration(minutes: 5)))) return false;
if (reading.timestamp.isBefore(DateTime.now().subtract(Duration(days: 1)))) return false;
// Sensor-specific validation
switch (reading.sensorType) {
case SensorType.temperature:
return reading.value >= -100 && reading.value <= 200; // Celsius
case SensorType.humidity:
return reading.value >= 0 && reading.value <= 100; // Percentage
case SensorType.pressure:
return reading.value >= 0 && reading.value <= 2000; // hPa
default:
return true;
}
}
Future<SensorReading> _applyCalibration(SensorReading reading) async {
final calibration = _calibrations[reading.deviceId];
if (calibration == null) {
return reading;
}
final calibratedValue = reading.value * calibration.multiplier + calibration.offset;
return reading.copyWith(value: calibratedValue);
}
Future<Anomaly?> _detectAnomaly(SensorReading reading) async {
// Get recent readings for comparison
final recentReadings = await _timeSeriesDB.getRecentReadings(
deviceId: reading.deviceId,
sensorType: reading.sensorType,
duration: Duration(hours: 1),
);
if (recentReadings.length < 10) {
return null; // Not enough data for anomaly detection
}
// Calculate statistics
final values = recentReadings.map((r) => r.value).toList();
final mean = values.reduce((a, b) => a + b) / values.length;
final variance = values.map((v) => pow(v - mean, 2)).reduce((a, b) => a + b) / values.length;
final stdDev = sqrt(variance);
// Check if current reading is an outlier (3-sigma rule)
final zScore = (reading.value - mean) / stdDev;
if (zScore.abs() > 3) {
return Anomaly(
deviceId: reading.deviceId,
sensorType: reading.sensorType,
value: reading.value,
expectedRange: (mean - 2 * stdDev, mean + 2 * stdDev),
severity: zScore.abs() > 5 ? AnomalySeverity.critical : AnomalySeverity.warning,
timestamp: reading.timestamp,
);
}
return null;
}
}
// IoT Device Client
class IoTDeviceClient {
final String _deviceId;
final HypermodernClient _client;
final Queue<SensorReading> _readingBuffer = Queue();
Timer? _sendTimer;
IoTDeviceClient({
required String deviceId,
required String serverUrl,
}) : _deviceId = deviceId,
_client = HypermodernClient.tcp(serverUrl);
Future<void> connect() async {
await _client.connect();
// Register device
await _registerDevice();
// Start periodic data sending
_startDataSending();
// Start command polling
_startCommandPolling();
print('🔌 IoT device connected: $_deviceId');
}
Future<void> _registerDevice() async {
final request = RegisterDeviceRequest(
deviceId: _deviceId,
name: 'Temperature Sensor #$_deviceId',
type: DeviceType.sensor,
firmwareVersion: '1.2.3',
capabilities: ['temperature', 'humidity'],
);
await _client.request<Device>('register_device', request);
}
void addReading(SensorReading reading) {
_readingBuffer.add(reading);
// Limit buffer size
while (_readingBuffer.length > 1000) {
_readingBuffer.removeFirst();
}
}
void _startDataSending() {
_sendTimer = Timer.periodic(Duration(seconds: 30), (_) async {
if (_readingBuffer.isNotEmpty) {
await _sendBufferedReadings();
}
});
}
Future<void> _sendBufferedReadings() async {
final readings = <SensorReading>[];
// Drain buffer
while (_readingBuffer.isNotEmpty && readings.length < 100) {
readings.add(_readingBuffer.removeFirst());
}
if (readings.isEmpty) return;
try {
final request = SendReadingsRequest(
deviceId: _deviceId,
readings: readings,
);
final response = await _client.request<SendReadingsResponse>('send_readings', request);
print('📊 Sent ${response.accepted} readings, ${response.rejected} rejected');
} catch (e) {
print('❌ Failed to send readings: $e');
// Re-queue readings for retry
readings.reversed.forEach(_readingBuffer.addFirst);
}
}
void _startCommandPolling() {
Timer.periodic(Duration(seconds: 10), (_) async {
await _pollCommands();
});
}
Future<void> _pollCommands() async {
try {
final request = GetCommandsRequest(deviceId: _deviceId);
final response = await _client.request<GetCommandsResponse>('get_commands', request);
for (final command in response.commands) {
await _executeCommand(command);
}
} catch (e) {
print('❌ Failed to poll commands: $e');
}
}
Future<void> _executeCommand(DeviceCommand command) async {
print('⚡ Executing command: ${command.command}');
try {
switch (command.command) {
case 'reboot':
await _reboot();
break;
case 'update_firmware':
await _updateFirmware(command.parameters['version'] as String);
break;
case 'set_sampling_rate':
await _setSamplingRate(command.parameters['rate'] as int);
break;
default:
print('❓ Unknown command: ${command.command}');
}
} catch (e) {
print('❌ Command execution failed: $e');
}
}
Future<void> _reboot() async {
print('🔄 Rebooting device...');
// Simulate reboot
await Future.delayed(Duration(seconds: 2));
}
Future<void> _updateFirmware(String version) async {
print('📦 Updating firmware to $version...');
// Simulate firmware update
await Future.delayed(Duration(seconds: 10));
}
Future<void> _setSamplingRate(int rate) async {
print('⏱️ Setting sampling rate to ${rate}Hz');
// Update sampling configuration
}
}
What's Next
These real-world examples demonstrate how to build complete applications using Hypermodern's capabilities. In the next chapter, we'll explore the module ecosystem and integration patterns, showing how to extend Hypermodern with third-party services and create reusable components for the community.