Skip to main content

Real-World Examples

AI-Powered Document Search Application

This example demonstrates building a modern document search application using UUID v7 for time-ordered identifiers and vector embeddings for semantic search capabilities.

Application Overview

Our document search application will feature:

  • Time-ordered document IDs using UUID v7
  • Semantic search using vector embeddings
  • Real-time document updates via WebSocket
  • Multi-protocol API (HTTP, WebSocket, TCP)
  • Similarity-based document recommendations

Schema Definition

{
  "models": {
    "document": {
      "id": {
        "type": "uuid_v7",
        "primary_key": true,
        "description": "Time-ordered UUID for better database performance"
      },
      "title": "string",
      "content": "string",
      "summary": "string?",
      "author_id": "uuid_v7",
      "category": "@document_category",
      "tags": ["string"],
      "content_embedding": {
        "type": "vector",
        "dimensions": 1536,
        "description": "OpenAI text embedding for semantic search"
      },
      "title_embedding": {
        "type": "vector", 
        "dimensions": 1536,
        "description": "Separate embedding for title-based search"
      },
      "view_count": "int32",
      "like_count": "int32",
      "metadata": "map<string, any>",
      "created_at": "datetime",
      "updated_at": "datetime",
      "indexed_at": "datetime?"
    },
    "user": {
      "id": "uuid_v7",
      "username": "string",
      "email": "string",
      "full_name": "string",
      "preferences": {
        "search_history": ["string"],
        "favorite_categories": ["@document_category"],
        "embedding_model": "string"
      },
      "created_at": "datetime"
    },
    "search_query": {
      "id": "uuid_v7",
      "user_id": "uuid_v7",
      "query_text": "string",
      "query_embedding": {
        "type": "vector",
        "dimensions": 1536
      },
      "filters": "map<string, any>",
      "result_count": "int32",
      "execution_time_ms": "int32",
      "created_at": "datetime"
    },
    "document_interaction": {
      "id": "uuid_v7",
      "user_id": "uuid_v7",
      "document_id": "uuid_v7",
      "interaction_type": "@interaction_type",
      "similarity_score": "float64?",
      "created_at": "datetime"
    }
  },
  "enums": {
    "document_category": ["technology", "science", "business", "health", "education", "entertainment"],
    "interaction_type": ["view", "like", "share", "download", "bookmark"]
  },
  "endpoints": {
    "search_documents": {
      "method": "POST",
      "path": "/documents/search",
      "request": {
        "query": "string",
        "filters": {
          "categories": ["@document_category"],
          "date_range": {
            "start": "datetime?",
            "end": "datetime?"
          },
          "author_id": "uuid_v7?"
        },
        "limit": "int32",
        "similarity_threshold": "float64"
      },
      "response": {
        "documents": [{
          "document": "@document",
          "similarity_score": "float64",
          "relevance_explanation": "string"
        }],
        "total_count": "int32",
        "execution_time_ms": "int32"
      },
      "transports": ["http", "websocket", "tcp"]
    },
    "get_similar_documents": {
      "method": "GET",
      "path": "/documents/{id}/similar",
      "request": {
        "id": "uuid_v7",
        "limit": "int32"
      },
      "response": {
        "similar_documents": [{
          "document": "@document",
          "similarity_score": "float64"
        }]
      },
      "transports": ["http", "websocket", "tcp"]
    },
    "create_document": {
      "method": "POST",
      "path": "/documents",
      "request": {
        "title": "string",
        "content": "string",
        "category": "@document_category",
        "tags": ["string"]
      },
      "response": "@document",
      "transports": ["http", "websocket", "tcp"]
    },
    "watch_document_updates": {
      "type": "stream",
      "description": "Real-time document updates",
      "request": {
        "categories": ["@document_category"],
        "user_id": "uuid_v7?"
      },
      "response": {
        "event_type": "string",
        "document": "@document",
        "timestamp": "datetime"
      },
      "transports": ["websocket", "tcp"]
    }
  }
}

Database Migrations

// Migration 1: Enable required extensions
class EnableAdvancedExtensions extends Migration {
  @override
  Future<void> up() async {
    await execute('CREATE EXTENSION IF NOT EXISTS "uuid-ossp";');
    await execute('CREATE EXTENSION IF NOT EXISTS pgcrypto;');
    await execute('CREATE EXTENSION IF NOT EXISTS vector;');
    
    // Create UUID v7 generation function
    await execute('''
      CREATE OR REPLACE FUNCTION generate_uuid_v7()
      RETURNS UUID AS \$\$
      DECLARE
          unix_ts_ms BIGINT;
          uuid_bytes BYTEA;
      BEGIN
          unix_ts_ms := FLOOR(EXTRACT(EPOCH FROM NOW()) * 1000);
          uuid_bytes := 
              SUBSTRING(INT8SEND(unix_ts_ms), 3, 6) ||
              GEN_RANDOM_BYTES(2) ||
              (B'0111' || SUBSTRING(GEN_RANDOM_BYTES(1), 1, 1)::BIT(4))::BIT(8)::BYTEA ||
              GEN_RANDOM_BYTES(7);
          RETURN ENCODE(uuid_bytes, 'hex')::UUID;
      END;
      \$\$ LANGUAGE plpgsql;
    ''');
  }

  @override
  Future<void> down() async {
    await execute('DROP FUNCTION IF EXISTS generate_uuid_v7();');
    await execute('DROP EXTENSION IF EXISTS vector CASCADE;');
  }
}

// Migration 2: Create core tables
class CreateDocumentSearchTables extends Migration {
  @override
  Future<void> up() async {
    // Users table
    create('users', (Schema table) {
      table.addColumn('id', 'UUID', nullable: false, defaultValue: 'generate_uuid_v7()');
      table.primary('id');
      table.addColumn('username', 'VARCHAR', length: 50, nullable: false, unique: true);
      table.addColumn('email', 'VARCHAR', length: 255, nullable: false, unique: true);
      table.addColumn('full_name', 'VARCHAR', length: 255, nullable: false);
      table.addColumn('preferences', 'JSONB', defaultValue: '{}');
      table.addColumn('created_at', 'TIMESTAMP WITH TIME ZONE', defaultValue: 'NOW()');
    });

    // Documents table with vector embeddings
    create('documents', (Schema table) {
      table.addColumn('id', 'UUID', nullable: false, defaultValue: 'generate_uuid_v7()');
      table.primary('id');
      table.addColumn('title', 'VARCHAR', length: 500, nullable: false);
      table.addColumn('content', 'TEXT', nullable: false);
      table.addColumn('summary', 'TEXT', nullable: true);
      table.addColumn('author_id', 'UUID', nullable: false);
      table.addColumn('category', 'VARCHAR', length: 50, nullable: false);
      table.addColumn('tags', 'TEXT[]', defaultValue: '{}');
      
      // Vector embeddings for semantic search
      table.addColumn('content_embedding', 'VECTOR(1536)', nullable: true);
      table.addColumn('title_embedding', 'VECTOR(1536)', nullable: true);
      
      table.addColumn('view_count', 'INTEGER', defaultValue: 0);
      table.addColumn('like_count', 'INTEGER', defaultValue: 0);
      table.addColumn('metadata', 'JSONB', defaultValue: '{}');
      table.addColumn('created_at', 'TIMESTAMP WITH TIME ZONE', defaultValue: 'NOW()');
      table.addColumn('updated_at', 'TIMESTAMP WITH TIME ZONE', defaultValue: 'NOW()');
      table.addColumn('indexed_at', 'TIMESTAMP WITH TIME ZONE', nullable: true);
      
      // Foreign key to users
      table.foreign('author_id', 'users', 'id', onDelete: 'CASCADE');
    });

    // Search queries table for analytics
    create('search_queries', (Schema table) {
      table.addColumn('id', 'UUID', nullable: false, defaultValue: 'generate_uuid_v7()');
      table.primary('id');
      table.addColumn('user_id', 'UUID', nullable: false);
      table.addColumn('query_text', 'TEXT', nullable: false);
      table.addColumn('query_embedding', 'VECTOR(1536)', nullable: true);
      table.addColumn('filters', 'JSONB', defaultValue: '{}');
      table.addColumn('result_count', 'INTEGER', nullable: false);
      table.addColumn('execution_time_ms', 'INTEGER', nullable: false);
      table.addColumn('created_at', 'TIMESTAMP WITH TIME ZONE', defaultValue: 'NOW()');
      
      table.foreign('user_id', 'users', 'id', onDelete: 'CASCADE');
    });

    // Document interactions for recommendations
    create('document_interactions', (Schema table) {
      table.addColumn('id', 'UUID', nullable: false, defaultValue: 'generate_uuid_v7()');
      table.primary('id');
      table.addColumn('user_id', 'UUID', nullable: false);
      table.addColumn('document_id', 'UUID', nullable: false);
      table.addColumn('interaction_type', 'VARCHAR', length: 20, nullable: false);
      table.addColumn('similarity_score', 'DOUBLE PRECISION', nullable: true);
      table.addColumn('created_at', 'TIMESTAMP WITH TIME ZONE', defaultValue: 'NOW()');
      
      table.foreign('user_id', 'users', 'id', onDelete: 'CASCADE');
      table.foreign('document_id', 'documents', 'id', onDelete: 'CASCADE');
    });
  }

  @override
  Future<void> down() async {
    await drop('document_interactions');
    await drop('search_queries');
    await drop('documents');
    await drop('users');
  }
}

// Migration 3: Create vector indexes for performance
class CreateVectorIndexes extends Migration {
  @override
  Future<void> up() async {
    // HNSW indexes for fast similarity search
    await execute('''
      CREATE INDEX idx_documents_content_embedding_hnsw 
      ON documents USING hnsw (content_embedding vector_cosine_ops) 
      WITH (m = 16, ef_construction = 64);
    ''');
    
    await execute('''
      CREATE INDEX idx_documents_title_embedding_hnsw 
      ON documents USING hnsw (title_embedding vector_cosine_ops) 
      WITH (m = 16, ef_construction = 64);
    ''');
    
    await execute('''
      CREATE INDEX idx_search_queries_embedding_hnsw 
      ON search_queries USING hnsw (query_embedding vector_cosine_ops) 
      WITH (m = 16, ef_construction = 64);
    ''');
    
    // Traditional indexes for filtering
    await execute('CREATE INDEX idx_documents_category ON documents(category);');
    await execute('CREATE INDEX idx_documents_author_id ON documents(author_id);');
    await execute('CREATE INDEX idx_documents_created_at ON documents(created_at);');
    await execute('CREATE INDEX idx_documents_tags ON documents USING GIN(tags);');
  }

  @override
  Future<void> down() async {
    await execute('DROP INDEX IF EXISTS idx_documents_content_embedding_hnsw;');
    await execute('DROP INDEX IF EXISTS idx_documents_title_embedding_hnsw;');
    await execute('DROP INDEX IF EXISTS idx_search_queries_embedding_hnsw;');
    await execute('DROP INDEX IF EXISTS idx_documents_category;');
    await execute('DROP INDEX IF EXISTS idx_documents_author_id;');
    await execute('DROP INDEX IF EXISTS idx_documents_created_at;');
    await execute('DROP INDEX IF EXISTS idx_documents_tags;');
  }
}

Server Implementation

import 'package:hypermodern_server/hypermodern_server.dart';
import 'package:openai_dart/openai_dart.dart';

class DocumentSearchServer {
  final HypermodernServer _server;
  final DocumentService _documentService;
  final SearchService _searchService;
  final EmbeddingService _embeddingService;
  
  DocumentSearchServer({
    required DocumentService documentService,
    required SearchService searchService,
    required EmbeddingService embeddingService,
  }) : _server = HypermodernServer(),
       _documentService = documentService,
       _searchService = searchService,
       _embeddingService = embeddingService;
  
  Future<void> start() async {
    _registerEndpoints();
    await _server.listen(
      httpPort: 8080,
      websocketPort: 8082,
      tcpPort: 8081,
    );
    
    print('🚀 Document Search Server started');
    print('   HTTP: http://localhost:8080');
    print('   WebSocket: ws://localhost:8082');
    print('   TCP: localhost:8081');
  }
  
  void _registerEndpoints() {
    // Document search with semantic similarity
    _server.registerEndpoint<SearchDocumentsRequest, SearchDocumentsResponse>(
      'search_documents',
      (request) async {
        final startTime = DateTime.now();
        
        // Generate embedding for search query
        final queryEmbedding = await _embeddingService.generateEmbedding(request.query);
        
        // Perform semantic search
        final results = await _searchService.searchDocuments(
          queryEmbedding: queryEmbedding,
          queryText: request.query,
          filters: request.filters,
          limit: request.limit,
          similarityThreshold: request.similarityThreshold,
        );
        
        final executionTime = DateTime.now().difference(startTime).inMilliseconds;
        
        // Log search query for analytics
        await _searchService.logSearchQuery(
          userId: request.userId,
          queryText: request.query,
          queryEmbedding: queryEmbedding,
          filters: request.filters,
          resultCount: results.length,
          executionTimeMs: executionTime,
        );
        
        return SearchDocumentsResponse(
          documents: results,
          totalCount: results.length,
          executionTimeMs: executionTime,
        );
      },
    );
    
    // Get similar documents
    _server.registerEndpoint<GetSimilarDocumentsRequest, GetSimilarDocumentsResponse>(
      'get_similar_documents',
      (request) async {
        final document = await _documentService.getDocument(request.id);
        if (document == null) {
          throw NotFoundException('Document not found');
        }
        
        final similarDocuments = await _searchService.findSimilarDocuments(
          documentId: request.id,
          limit: request.limit,
        );
        
        return GetSimilarDocumentsResponse(
          similarDocuments: similarDocuments,
        );
      },
    );
    
    // Create new document with automatic embedding generation
    _server.registerEndpoint<CreateDocumentRequest, Document>(
      'create_document',
      (request) async {
        // Generate embeddings for title and content
        final titleEmbedding = await _embeddingService.generateEmbedding(request.title);
        final contentEmbedding = await _embeddingService.generateEmbedding(request.content);
        
        final document = await _documentService.createDocument(
          title: request.title,
          content: request.content,
          authorId: request.authorId,
          category: request.category,
          tags: request.tags,
          titleEmbedding: titleEmbedding,
          contentEmbedding: contentEmbedding,
        );
        
        // Broadcast document creation to subscribers
        _server.broadcast('document_created', {
          'event_type': 'created',
          'document': document.toJson(),
          'timestamp': DateTime.now().toIso8601String(),
        });
        
        return document;
      },
    );
    
    // Real-time document updates stream
    _server.registerStreamingEndpoint<WatchDocumentUpdatesRequest, DocumentUpdateEvent>(
      'watch_document_updates',
      (request) async* {
        await for (final event in _documentService.watchDocumentUpdates(
          categories: request.categories,
          userId: request.userId,
        )) {
          yield event;
        }
      },
    );
  }
}

class DocumentService {
  final Database _db;
  final StreamController<DocumentUpdateEvent> _updateController;
  
  DocumentService(this._db) : _updateController = StreamController.broadcast();
  
  Future<Document> createDocument({
    required String title,
    required String content,
    required String authorId,
    required DocumentCategory category,
    required List<String> tags,
    required List<double> titleEmbedding,
    required List<double> contentEmbedding,
  }) async {
    final titleVector = Vector.fromList(titleEmbedding);
    final contentVector = Vector.fromList(contentEmbedding);
    
    final result = await _db.query('''
      INSERT INTO documents (
        title, content, author_id, category, tags, 
        title_embedding, content_embedding, indexed_at
      )
      VALUES (?, ?, ?, ?, ?, ?, ?, NOW())
      RETURNING *
    ''', [
      title,
      content,
      authorId,
      category.toString(),
      tags,
      titleVector.toPostgresString(),
      contentVector.toPostgresString(),
    ]);
    
    final document = Document.fromJson(result.first);
    
    // Emit update event
    _updateController.add(DocumentUpdateEvent(
      eventType: 'created',
      document: document,
      timestamp: DateTime.now(),
    ));
    
    return document;
  }
  
  Future<Document?> getDocument(String id) async {
    if (!UuidV7.isValid(id)) {
      throw ArgumentError('Invalid document ID format');
    }
    
    final result = await _db.query('''
      SELECT d.*, u.username as author_username, u.full_name as author_name
      FROM documents d
      JOIN users u ON d.author_id = u.id
      WHERE d.id = ?
    ''', [id]);
    
    if (result.isEmpty) return null;
    
    // Increment view count
    await _db.query('UPDATE documents SET view_count = view_count + 1 WHERE id = ?', [id]);
    
    return Document.fromJson(result.first);
  }
  
  Stream<DocumentUpdateEvent> watchDocumentUpdates({
    List<DocumentCategory>? categories,
    String? userId,
  }) {
    return _updateController.stream.where((event) {
      if (categories != null && !categories.contains(event.document.category)) {
        return false;
      }
      
      if (userId != null && event.document.authorId != userId) {
        return false;
      }
      
      return true;
    });
  }
}

class SearchService {
  final Database _db;
  
  SearchService(this._db);
  
  Future<List<DocumentSearchResult>> searchDocuments({
    required List<double> queryEmbedding,
    required String queryText,
    SearchFilters? filters,
    int limit = 20,
    double similarityThreshold = 0.7,
  }) async {
    final queryVector = Vector.fromList(queryEmbedding);
    final queryVectorStr = queryVector.toPostgresString();
    
    final whereConditions = <String>['d.content_embedding IS NOT NULL'];
    final parameters = <dynamic>[queryVectorStr, queryVectorStr, similarityThreshold];
    
    // Add category filter
    if (filters?.categories != null && filters!.categories!.isNotEmpty) {
      final categoryPlaceholders = List.filled(filters.categories!.length, '?').join(', ');
      whereConditions.add('d.category IN ($categoryPlaceholders)');
      parameters.addAll(filters.categories!.map((c) => c.toString()));
    }
    
    // Add date range filter
    if (filters?.dateRange?.start != null) {
      whereConditions.add('d.created_at >= ?');
      parameters.add(filters!.dateRange!.start);
    }
    
    if (filters?.dateRange?.end != null) {
      whereConditions.add('d.created_at <= ?');
      parameters.add(filters!.dateRange!.end);
    }
    
    // Add author filter
    if (filters?.authorId != null) {
      whereConditions.add('d.author_id = ?');
      parameters.add(filters!.authorId);
    }
    
    parameters.addAll([queryVectorStr, limit]);
    
    final result = await _db.query('''
      SELECT 
        d.*,
        u.username as author_username,
        u.full_name as author_name,
        1 - (d.content_embedding <=> ?) as content_similarity,
        1 - (d.title_embedding <=> ?) as title_similarity,
        GREATEST(
          1 - (d.content_embedding <=> ?),
          1 - (d.title_embedding <=> ?)
        ) as max_similarity
      FROM documents d
      JOIN users u ON d.author_id = u.id
      WHERE ${whereConditions.join(' AND ')}
        AND GREATEST(
          1 - (d.content_embedding <=> ?),
          1 - (d.title_embedding <=> ?)
        ) > ?
      ORDER BY max_similarity DESC
      LIMIT ?
    ''', parameters);
    
    return result.map((row) {
      final document = Document.fromJson(row);
      final contentSimilarity = row['content_similarity'] as double;
      final titleSimilarity = row['title_similarity'] as double;
      final maxSimilarity = row['max_similarity'] as double;
      
      return DocumentSearchResult(
        document: document,
        similarityScore: maxSimilarity,
        relevanceExplanation: _generateRelevanceExplanation(
          queryText,
          document,
          contentSimilarity,
          titleSimilarity,
        ),
      );
    }).toList();
  }
  
  Future<List<SimilarDocument>> findSimilarDocuments({
    required String documentId,
    int limit = 10,
  }) async {
    final result = await _db.query('''
      WITH target_doc AS (
        SELECT content_embedding, title_embedding 
        FROM documents 
        WHERE id = ?
      )
      SELECT 
        d.*,
        u.username as author_username,
        u.full_name as author_name,
        GREATEST(
          1 - (d.content_embedding <=> (SELECT content_embedding FROM target_doc)),
          1 - (d.title_embedding <=> (SELECT title_embedding FROM target_doc))
        ) as similarity_score
      FROM documents d
      JOIN users u ON d.author_id = u.id
      CROSS JOIN target_doc
      WHERE d.id != ?
        AND d.content_embedding IS NOT NULL
        AND d.title_embedding IS NOT NULL
      ORDER BY similarity_score DESC
      LIMIT ?
    ''', [documentId, documentId, limit]);
    
    return result.map((row) => SimilarDocument(
      document: Document.fromJson(row),
      similarityScore: row['similarity_score'] as double,
    )).toList();
  }
  
  Future<void> logSearchQuery({
    required String userId,
    required String queryText,
    required List<double> queryEmbedding,
    required SearchFilters filters,
    required int resultCount,
    required int executionTimeMs,
  }) async {
    final queryVector = Vector.fromList(queryEmbedding);
    
    await _db.query('''
      INSERT INTO search_queries (
        user_id, query_text, query_embedding, filters, 
        result_count, execution_time_ms
      )
      VALUES (?, ?, ?, ?, ?, ?)
    ''', [
      userId,
      queryText,
      queryVector.toPostgresString(),
      jsonEncode(filters.toJson()),
      resultCount,
      executionTimeMs,
    ]);
  }
  
  String _generateRelevanceExplanation(
    String queryText,
    Document document,
    double contentSimilarity,
    double titleSimilarity,
  ) {
    final explanations = <String>[];
    
    if (titleSimilarity > 0.8) {
      explanations.add('Strong title match');
    } else if (titleSimilarity > 0.6) {
      explanations.add('Good title relevance');
    }
    
    if (contentSimilarity > 0.8) {
      explanations.add('High content similarity');
    } else if (contentSimilarity > 0.6) {
      explanations.add('Relevant content match');
    }
    
    // Check for tag matches
    final queryWords = queryText.toLowerCase().split(' ');
    final matchingTags = document.tags.where((tag) =>
        queryWords.any((word) => tag.toLowerCase().contains(word))
    ).toList();
    
    if (matchingTags.isNotEmpty) {
      explanations.add('Matching tags: ${matchingTags.join(', ')}');
    }
    
    return explanations.isEmpty ? 'Semantic similarity' : explanations.join('; ');
  }
}

class EmbeddingService {
  final OpenAI _openai;
  
  EmbeddingService(String apiKey) : _openai = OpenAI(apiKey: apiKey);
  
  Future<List<double>> generateEmbedding(String text) async {
    try {
      final response = await _openai.embeddings.create(
        model: 'text-embedding-3-small',
        input: text,
      );
      
      return response.data.first.embedding;
    } catch (e) {
      throw EmbeddingException('Failed to generate embedding: $e');
    }
  }
  
  Future<Map<String, List<double>>> generateBatchEmbeddings(List<String> texts) async {
    final results = <String, List<double>>{};
    
    // Process in batches to avoid API limits
    const batchSize = 100;
    for (int i = 0; i < texts.length; i += batchSize) {
      final batch = texts.skip(i).take(batchSize).toList();
      
      final response = await _openai.embeddings.create(
        model: 'text-embedding-3-small',
        input: batch,
      );
      
      for (int j = 0; j < batch.length; j++) {
        results[batch[j]] = response.data[j].embedding;
      }
    }
    
    return results;
  }
}

Client Implementation

import 'package:hypermodern/hypermodern.dart';

class DocumentSearchClient {
  final HypermodernClient _client;
  
  DocumentSearchClient(String baseUrl) : _client = HypermodernClient(baseUrl);
  
  Future<void> connect() async {
    await _client.connect();
  }
  
  Future<SearchDocumentsResponse> searchDocuments({
    required String query,
    SearchFilters? filters,
    int limit = 20,
    double similarityThreshold = 0.7,
  }) async {
    return await _client.request<SearchDocumentsResponse>(
      'search_documents',
      SearchDocumentsRequest(
        query: query,
        filters: filters ?? SearchFilters(),
        limit: limit,
        similarityThreshold: similarityThreshold,
      ),
    );
  }
  
  Future<GetSimilarDocumentsResponse> getSimilarDocuments(String documentId, {int limit = 10}) async {
    return await _client.request<GetSimilarDocumentsResponse>(
      'get_similar_documents',
      GetSimilarDocumentsRequest(id: documentId, limit: limit),
    );
  }
  
  Future<Document> createDocument({
    required String title,
    required String content,
    required String authorId,
    required DocumentCategory category,
    List<String> tags = const [],
  }) async {
    return await _client.request<Document>(
      'create_document',
      CreateDocumentRequest(
        title: title,
        content: content,
        authorId: authorId,
        category: category,
        tags: tags,
      ),
    );
  }
  
  Stream<DocumentUpdateEvent> watchDocumentUpdates({
    List<DocumentCategory>? categories,
    String? userId,
  }) {
    return _client.stream<DocumentUpdateEvent>(
      'watch_document_updates',
      WatchDocumentUpdatesRequest(
        categories: categories,
        userId: userId,
      ),
    );
  }
  
  Future<void> disconnect() async {
    await _client.disconnect();
  }
}

// Usage example
void main() async {
  final client = DocumentSearchClient('http://localhost:8080');
  await client.connect();
  
  try {
    // Search for documents
    final searchResults = await client.searchDocuments(
      query: 'machine learning algorithms',
      filters: SearchFilters(
        categories: [DocumentCategory.technology, DocumentCategory.science],
        dateRange: DateRange(
          start: DateTime.now().subtract(Duration(days: 30)),
        ),
      ),
      limit: 10,
      similarityThreshold: 0.75,
    );
    
    print('Found ${searchResults.documents.length} documents');
    for (final result in searchResults.documents) {
      print('${result.document.title} (${result.similarityScore.toStringAsFixed(3)})');
      print('  ${result.relevanceExplanation}');
    }
    
    // Get similar documents
    if (searchResults.documents.isNotEmpty) {
      final firstDoc = searchResults.documents.first.document;
      final similarDocs = await client.getSimilarDocuments(firstDoc.id);
      
      print('\nSimilar to "${firstDoc.title}":');
      for (final similar in similarDocs.similarDocuments) {
        print('  ${similar.document.title} (${similar.similarityScore.toStringAsFixed(3)})');
      }
    }
    
    // Watch for real-time updates
    final updateStream = client.watchDocumentUpdates(
      categories: [DocumentCategory.technology],
    );
    
    await for (final update in updateStream.take(5)) {
      print('Document ${update.eventType}: ${update.document.title}');
    }
    
  } finally {
    await client.disconnect();
  }
}

Key Features Demonstrated

  1. UUID v7 Benefits:

    • Time-ordered IDs for better database performance
    • Natural sorting by creation time
    • Reduced index fragmentation
  2. Vector Embeddings:

    • Semantic search capabilities
    • Similarity-based recommendations
    • Multiple embedding types (title vs. content)
  3. Performance Optimizations:

    • HNSW indexes for fast vector similarity search
    • Batch embedding generation
    • Query result caching
  4. Real-time Features:

    • WebSocket streaming for live updates
    • Multi-protocol support (HTTP, WebSocket, TCP)
    • Broadcast notifications
  5. Analytics and Monitoring:

    • Search query logging
    • Performance metrics
    • User interaction tracking

This example showcases how UUID v7 and vector data types work together to create a modern, high-performance document search application with semantic capabilities.

Chat Application

Let's build a complete real-time chat application that demonstrates Hypermodern's multi-protocol capabilities, real-time streaming, and user management.

Schema Definition

{
  "models": {
    "user": {
      "id": "int64",
      "username": "string",
      "email": "string",
      "display_name": "string",
      "avatar_url": "string?",
      "status": "@user_status",
      "last_seen": "datetime",
      "created_at": "datetime"
    },
    "chat_room": {
      "id": "int64",
      "name": "string",
      "description": "string?",
      "type": "@room_type",
      "owner_id": "int64",
      "created_at": "datetime",
      "member_count": "int32"
    },
    "message": {
      "id": "int64",
      "room_id": "int64",
      "user_id": "int64",
      "content": "string",
      "message_type": "@message_type",
      "reply_to_id": "int64?",
      "attachments": ["@attachment"],
      "created_at": "datetime",
      "edited_at": "datetime?"
    },
    "attachment": {
      "id": "int64",
      "filename": "string",
      "content_type": "string",
      "size": "int64",
      "url": "string"
    },
    "room_member": {
      "room_id": "int64",
      "user_id": "int64",
      "role": "@member_role",
      "joined_at": "datetime",
      "last_read_message_id": "int64?"
    }
  },
  "enums": {
    "user_status": ["online", "away", "busy", "offline"],
    "room_type": ["public", "private", "direct"],
    "message_type": ["text", "image", "file", "system"],
    "member_role": ["owner", "admin", "member"]
  },
  "endpoints": {
    "create_room": {
      "method": "POST",
      "path": "/rooms",
      "request": {
        "name": "string",
        "description": "string?",
        "type": "@room_type"
      },
      "response": "@chat_room",
      "transports": ["http", "websocket"]
    },
    "join_room": {
      "method": "POST",
      "path": "/rooms/{room_id}/join",
      "request": {
        "room_id": "int64"
      },
      "response": {
        "success": "bool",
        "room": "@chat_room"
      },
      "transports": ["http", "websocket"]
    },
    "send_message": {
      "method": "POST",
      "path": "/rooms/{room_id}/messages",
      "request": {
        "room_id": "int64",
        "content": "string",
        "message_type": "@message_type",
        "reply_to_id": "int64?"
      },
      "response": "@message",
      "transports": ["http", "websocket"]
    },
    "get_messages": {
      "method": "GET",
      "path": "/rooms/{room_id}/messages",
      "request": {
        "room_id": "int64",
        "limit": "int32?",
        "before_id": "int64?"
      },
      "response": {
        "messages": ["@message"],
        "has_more": "bool"
      },
      "transports": ["http", "websocket"]
    },
    "watch_room": {
      "type": "stream",
      "request": {
        "room_id": "int64"
      },
      "response": "@room_event",
      "transports": ["websocket", "tcp"]
    }
  }
}

Server Implementation

class ChatServer {
  final HypermodernServer _server;
  final ChatService _chatService;
  final UserService _userService;
  final RealTimeBroadcaster _broadcaster;
  
  ChatServer({
    required HypermodernServer server,
    required ChatService chatService,
    required UserService userService,
    required RealTimeBroadcaster broadcaster,
  }) : _server = server,
       _chatService = chatService,
       _userService = userService,
       _broadcaster = broadcaster;
  
  Future<void> initialize() async {
    _registerEndpoints();
    _registerStreamingEndpoints();
    _setupMiddleware();
    
    await _server.listen();
    print('🚀 Chat server running on multiple protocols');
  }
  
  void _registerEndpoints() {
    // Room management
    _server.registerEndpoint<CreateRoomRequest, ChatRoom>(
      'create_room',
      (request) async {
        final userId = _getCurrentUserId(request.context);
        final room = await _chatService.createRoom(
          name: request.name,
          description: request.description,
          type: request.type,
          ownerId: userId,
        );
        
        // Broadcast room creation
        _broadcaster.broadcast('global', RoomCreatedEvent(room: room));
        
        return room;
      },
    );
    
    _server.registerEndpoint<JoinRoomRequest, JoinRoomResponse>(
      'join_room',
      (request) async {
        final userId = _getCurrentUserId(request.context);
        
        final room = await _chatService.joinRoom(
          roomId: request.roomId,
          userId: userId,
        );
        
        // Broadcast user joined
        _broadcaster.broadcast(
          'room:${request.roomId}',
          UserJoinedEvent(roomId: request.roomId, userId: userId),
        );
        
        return JoinRoomResponse(success: true, room: room);
      },
    );
    
    // Message handling
    _server.registerEndpoint<SendMessageRequest, Message>(
      'send_message',
      (request) async {
        final userId = _getCurrentUserId(request.context);
        
        // Validate user is member of room
        await _chatService.validateRoomMembership(request.roomId, userId);
        
        final message = await _chatService.sendMessage(
          roomId: request.roomId,
          userId: userId,
          content: request.content,
          messageType: request.messageType,
          replyToId: request.replyToId,
        );
        
        // Broadcast message to room members
        _broadcaster.broadcast(
          'room:${request.roomId}',
          MessageSentEvent(message: message),
        );
        
        // Update user activity
        await _userService.updateLastSeen(userId);
        
        return message;
      },
    );
    
    _server.registerEndpoint<GetMessagesRequest, GetMessagesResponse>(
      'get_messages',
      (request) async {
        final userId = _getCurrentUserId(request.context);
        
        // Validate access to room
        await _chatService.validateRoomAccess(request.roomId, userId);
        
        final result = await _chatService.getMessages(
          roomId: request.roomId,
          limit: request.limit ?? 50,
          beforeId: request.beforeId,
        );
        
        return GetMessagesResponse(
          messages: result.messages,
          hasMore: result.hasMore,
        );
      },
    );
  }
  
  void _registerStreamingEndpoints() {
    // Real-time room updates
    _server.registerStreamingEndpoint<WatchRoomRequest, RoomEvent>(
      'watch_room',
      (request) async* {
        final userId = _getCurrentUserId(request.context);
        
        // Validate access to room
        await _chatService.validateRoomAccess(request.roomId, userId);
        
        // Subscribe to room events
        final controller = StreamController<RoomEvent>();
        final channel = 'room:${request.roomId}';
        
        _broadcaster.subscribe(channel, controller);
        
        // Send initial room state
        controller.add(RoomStateEvent(
          roomId: request.roomId,
          onlineMembers: await _chatService.getOnlineMembers(request.roomId),
        ));
        
        // Clean up on disconnect
        controller.onCancel = () {
          _broadcaster.unsubscribe(channel, controller);
        };
        
        yield* controller.stream;
      },
    );
    
    // User presence updates
    _server.registerStreamingEndpoint<WatchPresenceRequest, PresenceEvent>(
      'watch_presence',
      (request) async* {
        final userId = _getCurrentUserId(request.context);
        
        final controller = StreamController<PresenceEvent>();
        _broadcaster.subscribe('presence', controller);
        
        // Send current presence state
        final onlineUsers = await _userService.getOnlineUsers();
        controller.add(PresenceStateEvent(onlineUsers: onlineUsers));
        
        // Update user's own presence
        await _userService.setUserStatus(userId, UserStatus.online);
        _broadcaster.broadcast('presence', UserStatusChangedEvent(
          userId: userId,
          status: UserStatus.online,
        ));
        
        // Handle disconnect
        controller.onCancel = () async {
          await _userService.setUserStatus(userId, UserStatus.offline);
          _broadcaster.broadcast('presence', UserStatusChangedEvent(
            userId: userId,
            status: UserStatus.offline,
          ));
          _broadcaster.unsubscribe('presence', controller);
        };
        
        yield* controller.stream;
      },
    );
  }
  
  void _setupMiddleware() {
    // Authentication required for all endpoints
    _server.middleware.add(AuthenticationMiddleware(
      jwtService: _server.container.get<JWTService>(),
      publicEndpoints: {},
    ));
    
    // Rate limiting for message sending
    _server.middleware.add(ConditionalMiddleware(
      RateLimitMiddleware(
        rateLimiter: _server.container.get<RateLimiter>(),
        endpointLimits: {
          'send_message': RateLimit(requests: 30, window: Duration(minutes: 1)),
        },
      ),
      (request) => request.endpoint == 'send_message',
    ));
    
    // Logging and metrics
    _server.middleware.add(LoggingMiddleware());
    _server.middleware.add(MetricsMiddleware(_server.container.get<PrometheusClient>()));
  }
  
  int _getCurrentUserId(Map<String, dynamic> context) {
    final userId = context['user_id'] as int?;
    if (userId == null) {
      throw UnauthorizedException('User not authenticated');
    }
    return userId;
  }
}

class ChatService {
  final Database _db;
  final FileStorageService _fileStorage;
  
  ChatService(this._db, this._fileStorage);
  
  Future<ChatRoom> createRoom({
    required String name,
    String? description,
    required RoomType type,
    required int ownerId,
  }) async {
    return await _db.transaction((tx) async {
      // Create room
      final roomResult = await tx.query('''
        INSERT INTO chat_rooms (name, description, type, owner_id, created_at)
        VALUES (?, ?, ?, ?, ?)
        RETURNING *
      ''', [name, description, type.toString(), ownerId, DateTime.now()]);
      
      final room = ChatRoom.fromJson(roomResult.first);
      
      // Add owner as member
      await tx.query('''
        INSERT INTO room_members (room_id, user_id, role, joined_at)
        VALUES (?, ?, ?, ?)
      ''', [room.id, ownerId, MemberRole.owner.toString(), DateTime.now()]);
      
      return room;
    });
  }
  
  Future<ChatRoom> joinRoom({
    required int roomId,
    required int userId,
  }) async {
    return await _db.transaction((tx) async {
      // Check if room exists and is joinable
      final roomResult = await tx.query(
        'SELECT * FROM chat_rooms WHERE id = ?',
        [roomId],
      );
      
      if (roomResult.isEmpty) {
        throw NotFoundException('Room not found');
      }
      
      final room = ChatRoom.fromJson(roomResult.first);
      
      if (room.type == RoomType.private) {
        throw ForbiddenException('Cannot join private room without invitation');
      }
      
      // Check if already a member
      final memberResult = await tx.query(
        'SELECT 1 FROM room_members WHERE room_id = ? AND user_id = ?',
        [roomId, userId],
      );
      
      if (memberResult.isNotEmpty) {
        return room; // Already a member
      }
      
      // Add as member
      await tx.query('''
        INSERT INTO room_members (room_id, user_id, role, joined_at)
        VALUES (?, ?, ?, ?)
      ''', [roomId, userId, MemberRole.member.toString(), DateTime.now()]);
      
      // Update member count
      await tx.query(
        'UPDATE chat_rooms SET member_count = member_count + 1 WHERE id = ?',
        [roomId],
      );
      
      return room;
    });
  }
  
  Future<Message> sendMessage({
    required int roomId,
    required int userId,
    required String content,
    required MessageType messageType,
    int? replyToId,
  }) async {
    // Validate content
    if (content.trim().isEmpty) {
      throw ValidationException('Message content cannot be empty');
    }
    
    if (content.length > 4000) {
      throw ValidationException('Message too long');
    }
    
    // Process mentions and links
    final processedContent = await _processMessageContent(content);
    
    final result = await _db.query('''
      INSERT INTO messages (room_id, user_id, content, message_type, reply_to_id, created_at)
      VALUES (?, ?, ?, ?, ?, ?)
      RETURNING *
    ''', [roomId, userId, processedContent, messageType.toString(), replyToId, DateTime.now()]);
    
    return Message.fromJson(result.first);
  }
  
  Future<String> _processMessageContent(String content) async {
    // Process @mentions
    final mentionRegex = RegExp(r'@(\w+)');
    var processedContent = content;
    
    final mentions = mentionRegex.allMatches(content);
    for (final mention in mentions) {
      final username = mention.group(1)!;
      final user = await _getUserByUsername(username);
      
      if (user != null) {
        processedContent = processedContent.replaceAll(
          mention.group(0)!,
          '<@${user.id}>',
        );
      }
    }
    
    return processedContent;
  }
  
  Future<User?> _getUserByUsername(String username) async {
    final result = await _db.query(
      'SELECT * FROM users WHERE username = ?',
      [username],
    );
    
    return result.isEmpty ? null : User.fromJson(result.first);
  }
}

Client Implementation

class ChatClient {
  final HypermodernClient _client;
  final StreamController<ChatEvent> _eventController = StreamController.broadcast();
  
  StreamSubscription? _roomSubscription;
  StreamSubscription? _presenceSubscription;
  
  ChatClient(String serverUrl) : _client = HypermodernClient(serverUrl);
  
  Stream<ChatEvent> get events => _eventController.stream;
  
  Future<void> connect(String authToken) async {
    _client.setAuthToken(authToken);
    await _client.connect();
    
    // Start presence monitoring
    await _startPresenceMonitoring();
    
    print('✅ Connected to chat server');
  }
  
  Future<void> disconnect() async {
    await _roomSubscription?.cancel();
    await _presenceSubscription?.cancel();
    await _client.disconnect();
    await _eventController.close();
  }
  
  // Room operations
  Future<ChatRoom> createRoom({
    required String name,
    String? description,
    required RoomType type,
  }) async {
    final request = CreateRoomRequest(
      name: name,
      description: description,
      type: type,
    );
    
    return await _client.request<ChatRoom>('create_room', request);
  }
  
  Future<void> joinRoom(int roomId) async {
    final request = JoinRoomRequest(roomId: roomId);
    final response = await _client.request<JoinRoomResponse>('join_room', request);
    
    if (response.success) {
      await _watchRoom(roomId);
      _eventController.add(RoomJoinedEvent(room: response.room));
    }
  }
  
  Future<void> _watchRoom(int roomId) async {
    await _roomSubscription?.cancel();
    
    final request = WatchRoomRequest(roomId: roomId);
    final stream = _client.stream<RoomEvent>('watch_room', request);
    
    _roomSubscription = stream.listen(
      (event) => _eventController.add(ChatEvent.fromRoomEvent(event)),
      onError: (error) => _eventController.addError(error),
    );
  }
  
  // Message operations
  Future<Message> sendMessage({
    required int roomId,
    required String content,
    MessageType type = MessageType.text,
    int? replyToId,
  }) async {
    final request = SendMessageRequest(
      roomId: roomId,
      content: content,
      messageType: type,
      replyToId: replyToId,
    );
    
    return await _client.request<Message>('send_message', request);
  }
  
  Future<List<Message>> getMessages({
    required int roomId,
    int? limit,
    int? beforeId,
  }) async {
    final request = GetMessagesRequest(
      roomId: roomId,
      limit: limit,
      beforeId: beforeId,
    );
    
    final response = await _client.request<GetMessagesResponse>('get_messages', request);
    return response.messages;
  }
  
  Future<void> _startPresenceMonitoring() async {
    final request = WatchPresenceRequest();
    final stream = _client.stream<PresenceEvent>('watch_presence', request);
    
    _presenceSubscription = stream.listen(
      (event) => _eventController.add(ChatEvent.fromPresenceEvent(event)),
      onError: (error) => print('Presence error: $error'),
    );
  }
}

// Usage example
class ChatApp {
  final ChatClient _client;
  final Map<int, List<Message>> _roomMessages = {};
  final Set<int> _onlineUsers = {};
  
  ChatApp(String serverUrl) : _client = ChatClient(serverUrl);
  
  Future<void> start() async {
    // Connect to server
    await _client.connect('your-auth-token');
    
    // Listen for events
    _client.events.listen(_handleChatEvent);
    
    // Join a room
    await _client.joinRoom(1);
    
    // Load message history
    final messages = await _client.getMessages(roomId: 1, limit: 50);
    _roomMessages[1] = messages;
    
    // Start message loop
    _startMessageLoop();
  }
  
  void _handleChatEvent(ChatEvent event) {
    switch (event.type) {
      case ChatEventType.messageReceived:
        final messageEvent = event as MessageReceivedEvent;
        _roomMessages.putIfAbsent(messageEvent.message.roomId, () => [])
            .add(messageEvent.message);
        _displayMessage(messageEvent.message);
        break;
        
      case ChatEventType.userJoined:
        final joinEvent = event as UserJoinedEvent;
        print('${joinEvent.userId} joined the room');
        break;
        
      case ChatEventType.userLeft:
        final leaveEvent = event as UserLeftEvent;
        print('${leaveEvent.userId} left the room');
        break;
        
      case ChatEventType.userStatusChanged:
        final statusEvent = event as UserStatusChangedEvent;
        if (statusEvent.status == UserStatus.online) {
          _onlineUsers.add(statusEvent.userId);
        } else {
          _onlineUsers.remove(statusEvent.userId);
        }
        break;
    }
  }
  
  void _displayMessage(Message message) {
    print('[${message.createdAt}] ${message.userId}: ${message.content}');
  }
  
  void _startMessageLoop() {
    // Simple console input loop
    stdin.transform(utf8.decoder).transform(LineSplitter()).listen((line) async {
      if (line.isNotEmpty) {
        try {
          await _client.sendMessage(roomId: 1, content: line);
        } catch (e) {
          print('Failed to send message: $e');
        }
      }
    });
  }
}

API Gateway

An API gateway that translates between different protocols and provides unified access to multiple backend services.

Gateway Schema

{
  "models": {
    "gateway_route": {
      "id": "int64",
      "path": "string",
      "method": "string",
      "backend_service": "string",
      "backend_path": "string",
      "protocol": "@backend_protocol",
      "auth_required": "bool",
      "rate_limit": "@rate_limit_config?",
      "cache_config": "@cache_config?",
      "created_at": "datetime"
    },
    "rate_limit_config": {
      "requests": "int32",
      "window_seconds": "int32",
      "burst": "int32?"
    },
    "cache_config": {
      "ttl_seconds": "int32",
      "vary_headers": ["string"]
    },
    "api_key": {
      "id": "int64",
      "key": "string",
      "name": "string",
      "permissions": ["string"],
      "rate_limit": "@rate_limit_config?",
      "expires_at": "datetime?",
      "created_at": "datetime"
    }
  },
  "enums": {
    "backend_protocol": ["http", "grpc", "graphql", "websocket"]
  },
  "endpoints": {
    "proxy_request": {
      "method": "ANY",
      "path": "/api/{path:.*}",
      "request": {
        "path": "string",
        "method": "string",
        "headers": "map<string, string>",
        "query_params": "map<string, string>",
        "body": "any?"
      },
      "response": {
        "status_code": "int32",
        "headers": "map<string, string>",
        "body": "any?"
      },
      "transports": ["http", "websocket"]
    }
  }
}

Gateway Implementation

class APIGateway {
  final HypermodernServer _server;
  final RouteManager _routeManager;
  final BackendClientPool _clientPool;
  final CacheManager _cacheManager;
  final RateLimiter _rateLimiter;
  final MetricsCollector _metrics;
  
  APIGateway({
    required HypermodernServer server,
    required RouteManager routeManager,
    required BackendClientPool clientPool,
    required CacheManager cacheManager,
    required RateLimiter rateLimiter,
    required MetricsCollector metrics,
  }) : _server = server,
       _routeManager = routeManager,
       _clientPool = clientPool,
       _cacheManager = cacheManager,
       _rateLimiter = rateLimiter,
       _metrics = metrics;
  
  Future<void> initialize() async {
    _setupMiddleware();
    _registerProxyEndpoint();
    
    await _server.listen();
    print('🌐 API Gateway running');
  }
  
  void _setupMiddleware() {
    // Request logging
    _server.middleware.add(RequestLoggingMiddleware());
    
    // CORS handling
    _server.middleware.add(CORSMiddleware(
      allowedOrigins: ['*'],
      allowedMethods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS'],
      allowedHeaders: ['Content-Type', 'Authorization', 'X-API-Key'],
    ));
    
    // API key authentication
    _server.middleware.add(APIKeyAuthMiddleware(_routeManager));
    
    // Rate limiting
    _server.middleware.add(GatewayRateLimitMiddleware(_rateLimiter, _routeManager));
    
    // Caching
    _server.middleware.add(GatewayCacheMiddleware(_cacheManager, _routeManager));
    
    // Metrics collection
    _server.middleware.add(GatewayMetricsMiddleware(_metrics));
  }
  
  void _registerProxyEndpoint() {
    _server.registerEndpoint<ProxyRequest, ProxyResponse>(
      'proxy_request',
      (request) async => await _handleProxyRequest(request),
    );
  }
  
  Future<ProxyResponse> _handleProxyRequest(ProxyRequest request) async {
    final stopwatch = Stopwatch()..start();
    
    try {
      // Find matching route
      final route = await _routeManager.findRoute(request.path, request.method);
      if (route == null) {
        return ProxyResponse(
          statusCode: 404,
          headers: {'Content-Type': 'application/json'},
          body: {'error': 'Route not found'},
        );
      }
      
      // Transform request for backend
      final backendRequest = await _transformRequest(request, route);
      
      // Get backend client
      final client = await _clientPool.getClient(route.backendService, route.protocol);
      
      // Make backend request
      final backendResponse = await client.request(backendRequest);
      
      // Transform response for client
      final response = await _transformResponse(backendResponse, route);
      
      stopwatch.stop();
      
      // Record metrics
      _metrics.recordRequest(
        route: route.path,
        method: request.method,
        statusCode: response.statusCode,
        duration: stopwatch.elapsed,
        backendService: route.backendService,
      );
      
      return response;
      
    } catch (e) {
      stopwatch.stop();
      
      _metrics.recordError(
        route: request.path,
        method: request.method,
        error: e.toString(),
        duration: stopwatch.elapsed,
      );
      
      return _handleError(e);
    }
  }
  
  Future<BackendRequest> _transformRequest(ProxyRequest request, GatewayRoute route) async {
    // Transform path
    final transformedPath = _transformPath(request.path, route);
    
    // Add gateway headers
    final headers = Map<String, String>.from(request.headers);
    headers['X-Gateway-Route'] = route.id.toString();
    headers['X-Gateway-Timestamp'] = DateTime.now().toIso8601String();
    
    // Protocol-specific transformations
    switch (route.protocol) {
      case BackendProtocol.http:
        return HttpBackendRequest(
          path: transformedPath,
          method: request.method,
          headers: headers,
          queryParams: request.queryParams,
          body: request.body,
        );
        
      case BackendProtocol.grpc:
        return GrpcBackendRequest(
          service: route.backendService,
          method: _extractGrpcMethod(transformedPath),
          metadata: headers,
          request: request.body,
        );
        
      case BackendProtocol.graphql:
        return GraphQLBackendRequest(
          query: _transformToGraphQL(request),
          variables: request.body is Map ? request.body as Map<String, dynamic> : {},
          headers: headers,
        );
        
      default:
        throw UnsupportedError('Protocol ${route.protocol} not supported');
    }
  }
  
  String _transformPath(String originalPath, GatewayRoute route) {
    // Remove gateway prefix and apply backend path template
    final pathWithoutPrefix = originalPath.replaceFirst('/api', '');
    return route.backendPath.replaceAll('{path}', pathWithoutPrefix);
  }
  
  Future<ProxyResponse> _transformResponse(BackendResponse backendResponse, GatewayRoute route) async {
    final headers = Map<String, String>.from(backendResponse.headers);
    
    // Add gateway headers
    headers['X-Gateway-Service'] = route.backendService;
    headers['X-Gateway-Protocol'] = route.protocol.toString();
    
    // Protocol-specific response transformation
    dynamic transformedBody;
    
    switch (route.protocol) {
      case BackendProtocol.grpc:
        transformedBody = _transformGrpcResponse(backendResponse.body);
        break;
      case BackendProtocol.graphql:
        transformedBody = _transformGraphQLResponse(backendResponse.body);
        break;
      default:
        transformedBody = backendResponse.body;
    }
    
    return ProxyResponse(
      statusCode: backendResponse.statusCode,
      headers: headers,
      body: transformedBody,
    );
  }
  
  ProxyResponse _handleError(dynamic error) {
    if (error is TimeoutException) {
      return ProxyResponse(
        statusCode: 504,
        headers: {'Content-Type': 'application/json'},
        body: {'error': 'Gateway timeout'},
      );
    } else if (error is ConnectionException) {
      return ProxyResponse(
        statusCode: 502,
        headers: {'Content-Type': 'application/json'},
        body: {'error': 'Bad gateway'},
      );
    } else {
      return ProxyResponse(
        statusCode: 500,
        headers: {'Content-Type': 'application/json'},
        body: {'error': 'Internal server error'},
      );
    }
  }
}

class BackendClientPool {
  final Map<String, BackendClient> _clients = {};
  final ClientFactory _clientFactory;
  
  BackendClientPool(this._clientFactory);
  
  Future<BackendClient> getClient(String service, BackendProtocol protocol) async {
    final key = '$service:$protocol';
    
    return _clients.putIfAbsent(key, () {
      return _clientFactory.createClient(service, protocol);
    });
  }
}

class ClientFactory {
  final Map<String, ServiceConfig> _serviceConfigs;
  
  ClientFactory(this._serviceConfigs);
  
  BackendClient createClient(String service, BackendProtocol protocol) {
    final config = _serviceConfigs[service];
    if (config == null) {
      throw ArgumentError('Service configuration not found: $service');
    }
    
    switch (protocol) {
      case BackendProtocol.http:
        return HttpBackendClient(config.httpUrl);
      case BackendProtocol.grpc:
        return GrpcBackendClient(config.grpcUrl);
      case BackendProtocol.graphql:
        return GraphQLBackendClient(config.graphqlUrl);
      default:
        throw UnsupportedError('Protocol $protocol not supported');
    }
  }
}

Microservices Architecture

A complete microservices setup with service discovery, inter-service communication, and distributed tracing.

Service Registry

class ServiceRegistry {
  final Database _db;
  final Map<String, ServiceInstance> _services = {};
  final StreamController<ServiceEvent> _eventController = StreamController.broadcast();
  
  ServiceRegistry(this._db);
  
  Stream<ServiceEvent> get events => _eventController.stream;
  
  Future<void> registerService(ServiceRegistration registration) async {
    final instance = ServiceInstance(
      id: registration.id,
      name: registration.name,
      version: registration.version,
      endpoints: registration.endpoints,
      healthCheckUrl: registration.healthCheckUrl,
      metadata: registration.metadata,
      registeredAt: DateTime.now(),
      lastHeartbeat: DateTime.now(),
    );
    
    _services[registration.id] = instance;
    
    // Persist to database
    await _db.query('''
      INSERT INTO service_instances (id, name, version, endpoints, health_check_url, metadata, registered_at, last_heartbeat)
      VALUES (?, ?, ?, ?, ?, ?, ?, ?)
      ON CONFLICT (id) DO UPDATE SET
        endpoints = EXCLUDED.endpoints,
        health_check_url = EXCLUDED.health_check_url,
        metadata = EXCLUDED.metadata,
        last_heartbeat = EXCLUDED.last_heartbeat
    ''', [
      instance.id,
      instance.name,
      instance.version,
      jsonEncode(instance.endpoints.map((e) => e.toJson()).toList()),
      instance.healthCheckUrl,
      jsonEncode(instance.metadata),
      instance.registeredAt,
      instance.lastHeartbeat,
    ]);
    
    _eventController.add(ServiceRegisteredEvent(instance: instance));
    print('✅ Service registered: ${instance.name}@${instance.version}');
  }
  
  Future<void> deregisterService(String serviceId) async {
    final instance = _services.remove(serviceId);
    if (instance != null) {
      await _db.query('DELETE FROM service_instances WHERE id = ?', [serviceId]);
      _eventController.add(ServiceDeregisteredEvent(instance: instance));
      print('❌ Service deregistered: ${instance.name}');
    }
  }
  
  Future<void> heartbeat(String serviceId) async {
    final instance = _services[serviceId];
    if (instance != null) {
      instance.lastHeartbeat = DateTime.now();
      await _db.query(
        'UPDATE service_instances SET last_heartbeat = ? WHERE id = ?',
        [instance.lastHeartbeat, serviceId],
      );
    }
  }
  
  List<ServiceInstance> discoverServices(String serviceName) {
    return _services.values
        .where((instance) => instance.name == serviceName)
        .where((instance) => _isHealthy(instance))
        .toList();
  }
  
  bool _isHealthy(ServiceInstance instance) {
    final timeSinceHeartbeat = DateTime.now().difference(instance.lastHeartbeat);
    return timeSinceHeartbeat < Duration(minutes: 2);
  }
  
  Future<void> startHealthChecking() async {
    Timer.periodic(Duration(seconds: 30), (_) async {
      await _performHealthChecks();
    });
  }
  
  Future<void> _performHealthChecks() async {
    final unhealthyServices = <String>[];
    
    for (final instance in _services.values) {
      try {
        final client = HttpClient();
        final response = await client.get(instance.healthCheckUrl)
            .timeout(Duration(seconds: 5));
        
        if (response.statusCode != 200) {
          unhealthyServices.add(instance.id);
        }
      } catch (e) {
        unhealthyServices.add(instance.id);
      }
    }
    
    // Remove unhealthy services
    for (final serviceId in unhealthyServices) {
      await deregisterService(serviceId);
    }
  }
}

Inter-Service Communication

class ServiceClient {
  final ServiceRegistry _registry;
  final LoadBalancer _loadBalancer;
  final CircuitBreaker _circuitBreaker;
  final Map<String, HypermodernClient> _clients = {};
  
  ServiceClient({
    required ServiceRegistry registry,
    required LoadBalancer loadBalancer,
    required CircuitBreaker circuitBreaker,
  }) : _registry = registry,
       _loadBalancer = loadBalancer,
       _circuitBreaker = circuitBreaker;
  
  Future<T> call<T>({
    required String serviceName,
    required String endpoint,
    required dynamic request,
    Duration? timeout,
  }) async {
    return await _circuitBreaker.execute(() async {
      // Discover service instances
      final instances = _registry.discoverServices(serviceName);
      if (instances.isEmpty) {
        throw ServiceUnavailableException('No healthy instances of $serviceName');
      }
      
      // Select instance using load balancer
      final selectedInstance = _loadBalancer.selectInstance(instances);
      
      // Get or create client
      final client = await _getClient(selectedInstance);
      
      // Make request with timeout
      final future = client.request<T>(endpoint, request);
      final timeoutDuration = timeout ?? Duration(seconds: 30);
      
      return await future.timeout(timeoutDuration);
    });
  }
  
  Future<HypermodernClient> _getClient(ServiceInstance instance) async {
    final clientKey = instance.id;
    
    if (!_clients.containsKey(clientKey)) {
      // Find best endpoint for communication
      final endpoint = _selectBestEndpoint(instance.endpoints);
      final client = HypermodernClient(endpoint.url);
      
      await client.connect();
      _clients[clientKey] = client;
    }
    
    return _clients[clientKey]!;
  }
  
  ServiceEndpoint _selectBestEndpoint(List<ServiceEndpoint> endpoints) {
    // Prefer TCP for performance, then WebSocket, then HTTP
    final priorities = [
      ProtocolType.tcp,
      ProtocolType.websocket,
      ProtocolType.http,
    ];
    
    for (final protocol in priorities) {
      final endpoint = endpoints.firstWhere(
        (e) => e.protocol == protocol,
        orElse: () => endpoints.first,
      );
      if (endpoint.protocol == protocol) {
        return endpoint;
      }
    }
    
    return endpoints.first;
  }
}

// Example microservice
class UserService extends MicroService {
  final UserRepository _userRepository;
  final ServiceClient _serviceClient;
  
  UserService({
    required UserRepository userRepository,
    required ServiceClient serviceClient,
  }) : _userRepository = userRepository,
       _serviceClient = serviceClient;
  
  @override
  String get serviceName => 'user-service';
  
  @override
  String get version => '1.0.0';
  
  @override
  List<ServiceEndpoint> get endpoints => [
    ServiceEndpoint(
      protocol: ProtocolType.http,
      url: 'http://localhost:8080',
    ),
    ServiceEndpoint(
      protocol: ProtocolType.websocket,
      url: 'ws://localhost:8082',
    ),
    ServiceEndpoint(
      protocol: ProtocolType.tcp,
      url: 'tcp://localhost:8081',
    ),
  ];
  
  @override
  void registerEndpoints(HypermodernServer server) {
    server.registerEndpoint<CreateUserRequest, User>(
      'create_user',
      (request) async {
        // Create user
        final user = await _userRepository.create(User(
          username: request.username,
          email: request.email,
          passwordHash: _hashPassword(request.password),
          createdAt: DateTime.now(),
          updatedAt: DateTime.now(),
        ));
        
        // Notify other services
        await _serviceClient.call<void>(
          serviceName: 'notification-service',
          endpoint: 'send_welcome_email',
          request: SendWelcomeEmailRequest(
            userId: user.id,
            email: user.email,
            username: user.username,
          ),
        );
        
        await _serviceClient.call<void>(
          serviceName: 'analytics-service',
          endpoint: 'track_user_registration',
          request: TrackUserRegistrationRequest(
            userId: user.id,
            timestamp: DateTime.now(),
          ),
        );
        
        return user;
      },
    );
    
    server.registerEndpoint<GetUserRequest, User>(
      'get_user',
      (request) async {
        final user = await _userRepository.findById(request.id);
        if (user == null) {
          throw NotFoundException('User not found');
        }
        return user;
      },
    );
  }
}

IoT Device Communication

A platform for managing IoT devices with efficient binary communication over TCP.

IoT Schema

{
  "models": {
    "device": {
      "id": "int64",
      "device_id": "string",
      "name": "string",
      "type": "@device_type",
      "firmware_version": "string",
      "last_seen": "datetime",
      "status": "@device_status",
      "location": "@location?",
      "metadata": "map<string, any>",
      "created_at": "datetime"
    },
    "location": {
      "latitude": "float64",
      "longitude": "float64",
      "altitude": "float64?"
    },
    "sensor_reading": {
      "device_id": "string",
      "sensor_type": "@sensor_type",
      "value": "float64",
      "unit": "string",
      "timestamp": "datetime",
      "quality": "float32?"
    },
    "device_command": {
      "id": "int64",
      "device_id": "string",
      "command": "string",
      "parameters": "map<string, any>",
      "status": "@command_status",
      "sent_at": "datetime",
      "executed_at": "datetime?"
    }
  },
  "enums": {
    "device_type": ["sensor", "actuator", "gateway", "controller"],
    "device_status": ["online", "offline", "maintenance", "error"],
    "sensor_type": ["temperature", "humidity", "pressure", "motion", "light"],
    "command_status": ["pending", "sent", "acknowledged", "executed", "failed"]
  },
  "endpoints": {
    "register_device": {
      "method": "POST",
      "path": "/devices/register",
      "request": {
        "device_id": "string",
        "name": "string",
        "type": "@device_type",
        "firmware_version": "string",
        "capabilities": ["string"]
      },
      "response": "@device",
      "transports": ["tcp", "http"]
    },
    "send_readings": {
      "method": "POST",
      "path": "/devices/{device_id}/readings",
      "request": {
        "device_id": "string",
        "readings": ["@sensor_reading"]
      },
      "response": {
        "accepted": "int32",
        "rejected": "int32"
      },
      "transports": ["tcp"]
    },
    "get_commands": {
      "method": "GET",
      "path": "/devices/{device_id}/commands",
      "request": {
        "device_id": "string",
        "since": "datetime?"
      },
      "response": {
        "commands": ["@device_command"]
      },
      "transports": ["tcp", "http"]
    }
  }
}

IoT Platform Implementation

class IoTPlatform {
  final HypermodernServer _server;
  final DeviceManager _deviceManager;
  final DataProcessor _dataProcessor;
  final CommandQueue _commandQueue;
  final TimeSeriesDB _timeSeriesDB;
  
  IoTPlatform({
    required HypermodernServer server,
    required DeviceManager deviceManager,
    required DataProcessor dataProcessor,
    required CommandQueue commandQueue,
    required TimeSeriesDB timeSeriesDB,
  }) : _server = server,
       _deviceManager = deviceManager,
       _dataProcessor = dataProcessor,
       _commandQueue = commandQueue,
       _timeSeriesDB = timeSeriesDB;
  
  Future<void> initialize() async {
    _setupTcpOptimizations();
    _registerEndpoints();
    _startBackgroundTasks();
    
    await _server.listen();
    print('🌐 IoT Platform running');
  }
  
  void _setupTcpOptimizations() {
    // Configure TCP server for IoT devices
    _server.configureTcp(TcpServerConfig(
      port: 8081,
      maxConnections: 10000,
      keepAlive: true,
      keepAliveInterval: Duration(seconds: 30),
      noDelay: true, // Disable Nagle's algorithm for low latency
      receiveBufferSize: 64 * 1024,
      sendBufferSize: 64 * 1024,
    ));
  }
  
  void _registerEndpoints() {
    // Device registration
    _server.registerEndpoint<RegisterDeviceRequest, Device>(
      'register_device',
      (request) async {
        final device = await _deviceManager.registerDevice(
          deviceId: request.deviceId,
          name: request.name,
          type: request.type,
          firmwareVersion: request.firmwareVersion,
          capabilities: request.capabilities,
        );
        
        print('📱 Device registered: ${device.deviceId}');
        return device;
      },
    );
    
    // Sensor data ingestion
    _server.registerEndpoint<SendReadingsRequest, SendReadingsResponse>(
      'send_readings',
      (request) async {
        final result = await _dataProcessor.processReadings(
          deviceId: request.deviceId,
          readings: request.readings,
        );
        
        // Update device last seen
        await _deviceManager.updateLastSeen(request.deviceId);
        
        return SendReadingsResponse(
          accepted: result.accepted,
          rejected: result.rejected,
        );
      },
    );
    
    // Command retrieval
    _server.registerEndpoint<GetCommandsRequest, GetCommandsResponse>(
      'get_commands',
      (request) async {
        final commands = await _commandQueue.getCommandsForDevice(
          deviceId: request.deviceId,
          since: request.since,
        );
        
        return GetCommandsResponse(commands: commands);
      },
    );
  }
  
  void _startBackgroundTasks() {
    // Device health monitoring
    Timer.periodic(Duration(minutes: 1), (_) async {
      await _deviceManager.checkDeviceHealth();
    });
    
    // Data aggregation
    Timer.periodic(Duration(minutes: 5), (_) async {
      await _dataProcessor.aggregateData();
    });
    
    // Command cleanup
    Timer.periodic(Duration(hours: 1), (_) async {
      await _commandQueue.cleanupExpiredCommands();
    });
  }
}

class DataProcessor {
  final TimeSeriesDB _timeSeriesDB;
  final AlertManager _alertManager;
  final Map<String, SensorCalibration> _calibrations = {};
  
  DataProcessor(this._timeSeriesDB, this._alertManager);
  
  Future<ProcessingResult> processReadings({
    required String deviceId,
    required List<SensorReading> readings,
  }) async {
    int accepted = 0;
    int rejected = 0;
    
    final processedReadings = <SensorReading>[];
    
    for (final reading in readings) {
      try {
        // Validate reading
        if (!_validateReading(reading)) {
          rejected++;
          continue;
        }
        
        // Apply calibration
        final calibratedReading = await _applyCalibration(reading);
        
        // Check for anomalies
        final anomaly = await _detectAnomaly(calibratedReading);
        if (anomaly != null) {
          await _alertManager.sendAlert(anomaly);
        }
        
        processedReadings.add(calibratedReading);
        accepted++;
        
      } catch (e) {
        print('Error processing reading: $e');
        rejected++;
      }
    }
    
    // Batch insert to time series database
    if (processedReadings.isNotEmpty) {
      await _timeSeriesDB.insertReadings(processedReadings);
    }
    
    return ProcessingResult(accepted: accepted, rejected: rejected);
  }
  
  bool _validateReading(SensorReading reading) {
    // Basic validation
    if (reading.deviceId.isEmpty) return false;
    if (reading.timestamp.isAfter(DateTime.now().add(Duration(minutes: 5)))) return false;
    if (reading.timestamp.isBefore(DateTime.now().subtract(Duration(days: 1)))) return false;
    
    // Sensor-specific validation
    switch (reading.sensorType) {
      case SensorType.temperature:
        return reading.value >= -100 && reading.value <= 200; // Celsius
      case SensorType.humidity:
        return reading.value >= 0 && reading.value <= 100; // Percentage
      case SensorType.pressure:
        return reading.value >= 0 && reading.value <= 2000; // hPa
      default:
        return true;
    }
  }
  
  Future<SensorReading> _applyCalibration(SensorReading reading) async {
    final calibration = _calibrations[reading.deviceId];
    if (calibration == null) {
      return reading;
    }
    
    final calibratedValue = reading.value * calibration.multiplier + calibration.offset;
    
    return reading.copyWith(value: calibratedValue);
  }
  
  Future<Anomaly?> _detectAnomaly(SensorReading reading) async {
    // Get recent readings for comparison
    final recentReadings = await _timeSeriesDB.getRecentReadings(
      deviceId: reading.deviceId,
      sensorType: reading.sensorType,
      duration: Duration(hours: 1),
    );
    
    if (recentReadings.length < 10) {
      return null; // Not enough data for anomaly detection
    }
    
    // Calculate statistics
    final values = recentReadings.map((r) => r.value).toList();
    final mean = values.reduce((a, b) => a + b) / values.length;
    final variance = values.map((v) => pow(v - mean, 2)).reduce((a, b) => a + b) / values.length;
    final stdDev = sqrt(variance);
    
    // Check if current reading is an outlier (3-sigma rule)
    final zScore = (reading.value - mean) / stdDev;
    
    if (zScore.abs() > 3) {
      return Anomaly(
        deviceId: reading.deviceId,
        sensorType: reading.sensorType,
        value: reading.value,
        expectedRange: (mean - 2 * stdDev, mean + 2 * stdDev),
        severity: zScore.abs() > 5 ? AnomalySeverity.critical : AnomalySeverity.warning,
        timestamp: reading.timestamp,
      );
    }
    
    return null;
  }
}

// IoT Device Client
class IoTDeviceClient {
  final String _deviceId;
  final HypermodernClient _client;
  final Queue<SensorReading> _readingBuffer = Queue();
  Timer? _sendTimer;
  
  IoTDeviceClient({
    required String deviceId,
    required String serverUrl,
  }) : _deviceId = deviceId,
       _client = HypermodernClient.tcp(serverUrl);
  
  Future<void> connect() async {
    await _client.connect();
    
    // Register device
    await _registerDevice();
    
    // Start periodic data sending
    _startDataSending();
    
    // Start command polling
    _startCommandPolling();
    
    print('🔌 IoT device connected: $_deviceId');
  }
  
  Future<void> _registerDevice() async {
    final request = RegisterDeviceRequest(
      deviceId: _deviceId,
      name: 'Temperature Sensor #$_deviceId',
      type: DeviceType.sensor,
      firmwareVersion: '1.2.3',
      capabilities: ['temperature', 'humidity'],
    );
    
    await _client.request<Device>('register_device', request);
  }
  
  void addReading(SensorReading reading) {
    _readingBuffer.add(reading);
    
    // Limit buffer size
    while (_readingBuffer.length > 1000) {
      _readingBuffer.removeFirst();
    }
  }
  
  void _startDataSending() {
    _sendTimer = Timer.periodic(Duration(seconds: 30), (_) async {
      if (_readingBuffer.isNotEmpty) {
        await _sendBufferedReadings();
      }
    });
  }
  
  Future<void> _sendBufferedReadings() async {
    final readings = <SensorReading>[];
    
    // Drain buffer
    while (_readingBuffer.isNotEmpty && readings.length < 100) {
      readings.add(_readingBuffer.removeFirst());
    }
    
    if (readings.isEmpty) return;
    
    try {
      final request = SendReadingsRequest(
        deviceId: _deviceId,
        readings: readings,
      );
      
      final response = await _client.request<SendReadingsResponse>('send_readings', request);
      
      print('📊 Sent ${response.accepted} readings, ${response.rejected} rejected');
      
    } catch (e) {
      print('❌ Failed to send readings: $e');
      
      // Re-queue readings for retry
      readings.reversed.forEach(_readingBuffer.addFirst);
    }
  }
  
  void _startCommandPolling() {
    Timer.periodic(Duration(seconds: 10), (_) async {
      await _pollCommands();
    });
  }
  
  Future<void> _pollCommands() async {
    try {
      final request = GetCommandsRequest(deviceId: _deviceId);
      final response = await _client.request<GetCommandsResponse>('get_commands', request);
      
      for (final command in response.commands) {
        await _executeCommand(command);
      }
      
    } catch (e) {
      print('❌ Failed to poll commands: $e');
    }
  }
  
  Future<void> _executeCommand(DeviceCommand command) async {
    print('⚡ Executing command: ${command.command}');
    
    try {
      switch (command.command) {
        case 'reboot':
          await _reboot();
          break;
        case 'update_firmware':
          await _updateFirmware(command.parameters['version'] as String);
          break;
        case 'set_sampling_rate':
          await _setSamplingRate(command.parameters['rate'] as int);
          break;
        default:
          print('❓ Unknown command: ${command.command}');
      }
      
    } catch (e) {
      print('❌ Command execution failed: $e');
    }
  }
  
  Future<void> _reboot() async {
    print('🔄 Rebooting device...');
    // Simulate reboot
    await Future.delayed(Duration(seconds: 2));
  }
  
  Future<void> _updateFirmware(String version) async {
    print('📦 Updating firmware to $version...');
    // Simulate firmware update
    await Future.delayed(Duration(seconds: 10));
  }
  
  Future<void> _setSamplingRate(int rate) async {
    print('⏱️ Setting sampling rate to ${rate}Hz');
    // Update sampling configuration
  }
}

What's Next

These real-world examples demonstrate how to build complete applications using Hypermodern's capabilities. In the next chapter, we'll explore the module ecosystem and integration patterns, showing how to extend Hypermodern with third-party services and create reusable components for the community.