Skip to main content

Integration Patterns

Database Integrations

Hypermodern's flexible architecture allows seamless integration with various database systems, from traditional SQL databases to modern NoSQL solutions and specialized data stores.

Multi-Database Architecture

/// Dispatches database operations and transactions to named
/// [DatabaseProvider]s.
///
/// Providers are registered under a name via [registerProvider]; the
/// [DatabaseRouter] passed to the constructor decides which name serves a
/// given operation or transaction.
class MultiDatabaseManager {
  final Map<String, DatabaseProvider> _providers = {};
  final DatabaseRouter _router;

  MultiDatabaseManager({required DatabaseRouter router}) : _router = router;

  /// Registers [provider] under [name], replacing any provider previously
  /// registered under the same name.
  void registerProvider(String name, DatabaseProvider provider) {
    _providers[name] = provider;
  }

  /// Executes [operation] on the provider the router selects for it.
  ///
  /// Throws a [DatabaseException] if no provider is registered under the
  /// selected name.
  Future<T> execute<T>(DatabaseOperation<T> operation) async {
    final providerName = _router.routeOperation(operation);
    return await _requireProvider(providerName).execute(operation);
  }

  /// Runs [callback] inside a transaction.
  ///
  /// When the router reports exactly one involved provider, the provider's
  /// own `transaction` method is used. Any other count (including zero) goes
  /// through the multi-provider path.
  ///
  /// Throws a [DatabaseException] if an involved provider is not registered.
  Future<void> transaction(
    Future<void> Function(TransactionContext) callback,
  ) async {
    final involvedProviders = _router.getInvolvedProviders(callback);

    if (involvedProviders.length == 1) {
      // Single database transaction.
      await _requireProvider(involvedProviders.first).transaction(callback);
    } else {
      // Transaction spanning several providers.
      await _executeDistributedTransaction(involvedProviders, callback);
    }
  }

  /// Begins a transaction on every provider in [providers], runs [callback]
  /// against all of them, then commits each one in turn.
  ///
  /// On any failure every opened transaction is asked to roll back (rollback
  /// errors are printed and otherwise ignored) and the original error is
  /// rethrown.
  ///
  /// NOTE(review): there is no separate prepare/vote step here, so this is
  /// not a full two-phase commit. If a commit fails part-way through the
  /// loop, transactions committed earlier in the loop have already been
  /// committed; `rollback()` is still called on them in the catch block.
  Future<void> _executeDistributedTransaction(
    List<String> providers,
    Future<void> Function(TransactionContext) callback,
  ) async {
    final transactions = <String, Transaction>{};

    try {
      // Open one transaction per involved provider. This happens inside the
      // try block so that a failure here rolls back the ones already opened.
      for (final providerName in providers) {
        transactions[providerName] =
            await _requireProvider(providerName).beginTransaction();
      }

      // Execute the caller's operations against all open transactions.
      final context = DistributedTransactionContext(transactions);
      await callback(context);

      // Commit sequentially.
      for (final transaction in transactions.values) {
        await transaction.commit();
      }
    } catch (e) {
      // Best-effort rollback: one failing rollback must not prevent the
      // remaining transactions from being rolled back.
      for (final transaction in transactions.values) {
        try {
          await transaction.rollback();
        } catch (rollbackError) {
          print('Rollback error: $rollbackError');
        }
      }
      rethrow;
    }
  }

  /// Returns the provider registered under [providerName].
  ///
  /// Throws a [DatabaseException] when there is none, so every lookup in this
  /// class fails the same way instead of with a null-check error.
  DatabaseProvider _requireProvider(String providerName) {
    final provider = _providers[providerName];
    if (provider == null) {
      throw DatabaseException('Provider not found: $providerName');
    }
    return provider;
  }
}

/// Maps database operations to the name of the provider that should run them.
///
/// Routes can be registered per model type ([routeModel]) or per operation
/// type name ([routeOperationType]). Lookups fall back to `'default'`.
class DatabaseRouter {
  final Map<Type, String> _modelProviders = {};
  final Map<String, String> _operationProviders = {};

  /// Routes every operation whose `modelType` is [T] to [providerName].
  void routeModel<T>(String providerName) {
    _modelProviders[T] = providerName;
  }

  /// Routes operations whose runtime type name equals [operationType] to
  /// [providerName].
  ///
  /// This registration method previously shared the name `routeOperation`
  /// with the lookup below. Dart does not support method overloading, so the
  /// two declarations could not coexist; the registration side was renamed
  /// and the lookup keeps its name.
  void routeOperationType(String operationType, String providerName) {
    _operationProviders[operationType] = providerName;
  }

  /// Returns the provider name for [operation].
  ///
  /// Resolution order: a route registered for the operation's `modelType`,
  /// then a route registered for `operation.runtimeType.toString()`, then
  /// `'default'`.
  String routeOperation<T>(DatabaseOperation<T> operation) {
    // Route based on model type.
    final modelType = operation.modelType;
    if (modelType != null) {
      final provider = _modelProviders[modelType];
      if (provider != null) return provider;
    }

    // Route based on operation type.
    final provider = _operationProviders[operation.runtimeType.toString()];
    if (provider != null) return provider;

    // Default provider.
    return 'default';
  }

  /// Returns the provider names a transaction [callback] will touch.
  ///
  /// Simplified implementation: the callback is not inspected and the result
  /// is always `['default']`.
  List<String> getInvolvedProviders(
    Future<void> Function(TransactionContext) callback,
  ) {
    return ['default'];
  }
}

// Example usage
/// Example service that creates a user and its related records inside one
/// [MultiDatabaseManager] transaction.
class UserService {
  final MultiDatabaseManager _dbManager;

  UserService(this._dbManager);

  /// Creates the user described by [request], plus its preferences and an
  /// activity-log entry, and returns the created user.
  ///
  /// `MultiDatabaseManager.transaction` completes with `void` and takes a
  /// `Future<void>` callback, so the created user is captured in a local
  /// variable rather than returned from the callback.
  Future<User> createUser(CreateUserRequest request) async {
    late User createdUser;

    await _dbManager.transaction((tx) async {
      // User data goes to PostgreSQL
      final user = await tx.execute(CreateUserOperation(
        username: request.username,
        email: request.email,
      ));

      // User preferences go to MongoDB
      await tx.execute(CreateUserPreferencesOperation(
        userId: user.id,
        preferences: request.preferences,
      ));

      // Activity log goes to ClickHouse
      await tx.execute(LogUserActivityOperation(
        userId: user.id,
        action: 'user_created',
        timestamp: DateTime.now(),
      ));

      createdUser = user;
    });

    return createdUser;
  }
}

Time Series Database Integration

/// Writes metric points to InfluxDB, Prometheus and ClickHouse, and reads
/// them back from whichever store is selected for the metric.
class TimeSeriesIntegration {
  final InfluxDBClient _influxDB;
  final PrometheusClient _prometheus;
  final ClickHouseClient _clickHouse;

  TimeSeriesIntegration({
    required InfluxDBClient influxDB,
    required PrometheusClient prometheus,
    required ClickHouseClient clickHouse,
  }) : _influxDB = influxDB,
       _prometheus = prometheus,
       _clickHouse = clickHouse;

  /// Writes [metrics] to all three stores concurrently.
  ///
  /// The returned future fails if any of the three writes fails.
  Future<void> recordMetrics(List<MetricPoint> metrics) async {
    await Future.wait([
      _writeToInfluxDB(metrics),
      _writeToPrometheus(metrics),
      _writeToClickHouse(metrics),
    ]);
  }

  /// Converts each metric to an InfluxDB point with a single `value` field
  /// and writes the batch.
  Future<void> _writeToInfluxDB(List<MetricPoint> metrics) async {
    final points = metrics.map((metric) => Point(
      measurement: metric.name,
      tags: metric.tags,
      fields: {'value': metric.value},
      timestamp: metric.timestamp,
    )).toList();

    await _influxDB.writePoints(points);
  }

  /// Sets the Prometheus metric with the same name to each point's value,
  /// using the point's tags as labels.
  Future<void> _writeToPrometheus(List<MetricPoint> metrics) async {
    for (final metric in metrics) {
      final prometheusMetric = _prometheus.getMetric(metric.name);
      prometheusMetric.set(metric.value, labels: metric.tags);
    }
  }

  /// Inserts one row per metric into the ClickHouse `metrics` table:
  /// epoch milliseconds, name, value, JSON-encoded tags.
  Future<void> _writeToClickHouse(List<MetricPoint> metrics) async {
    final rows = metrics.map((metric) => [
      metric.timestamp.millisecondsSinceEpoch,
      metric.name,
      metric.value,
      jsonEncode(metric.tags),
    ]).toList();

    await _clickHouse.insert('metrics', rows);
  }

  /// Returns the points of [metricName] between [startTime] and [endTime],
  /// optionally restricted to points matching every entry in [tags].
  ///
  /// The store is chosen per metric: InfluxDB for high-frequency metrics,
  /// ClickHouse for aggregated ones, Prometheus otherwise.
  Future<List<MetricPoint>> queryMetrics({
    required String metricName,
    required DateTime startTime,
    required DateTime endTime,
    Map<String, String>? tags,
  }) async {
    if (_isHighFrequencyMetric(metricName)) {
      return await _queryFromInfluxDB(metricName, startTime, endTime, tags);
    } else if (_isAggregatedMetric(metricName)) {
      return await _queryFromClickHouse(metricName, startTime, endTime, tags);
    } else {
      return await _queryFromPrometheus(metricName, startTime, endTime, tags);
    }
  }

  /// Runs an InfluxQL range query for [metricName].
  ///
  /// The measurement name and tag filter are escaped before being placed in
  /// the query text, and the time bounds are converted to UTC so that the
  /// ISO-8601 strings carry an explicit `Z` suffix even when the caller
  /// passes local [DateTime]s.
  Future<List<MetricPoint>> _queryFromInfluxDB(
    String metricName,
    DateTime startTime,
    DateTime endTime,
    Map<String, String>? tags,
  ) async {
    final query = '''
      SELECT value FROM ${_quoteIdentifier(metricName)}
      WHERE time >= '${startTime.toUtc().toIso8601String()}'
        AND time <= '${endTime.toUtc().toIso8601String()}'
      ${_buildTagFilter(tags)}
      ORDER BY time
    ''';

    final result = await _influxDB.query(query);
    return _parseInfluxDBResult(result);
  }

  /// Builds the `AND key = 'value' AND ...` suffix for [tags], or an empty
  /// string when there are no tags.
  ///
  /// Keys are emitted as double-quoted identifiers and values as
  /// single-quoted string literals, both escaped, so tag content cannot
  /// terminate the literal and alter the query.
  String _buildTagFilter(Map<String, String>? tags) {
    if (tags == null || tags.isEmpty) return '';

    final filters = tags.entries
        .map((entry) =>
            '${_quoteIdentifier(entry.key)} = ${_quoteLiteral(entry.value)}')
        .join(' AND ');

    return 'AND $filters';
  }

  /// Wraps [identifier] in double quotes, backslash-escaping embedded
  /// backslashes and double quotes.
  static String _quoteIdentifier(String identifier) {
    final escaped =
        identifier.replaceAll('\\', '\\\\').replaceAll('"', '\\"');
    return '"$escaped"';
  }

  /// Wraps [value] in single quotes, backslash-escaping embedded backslashes
  /// and single quotes.
  static String _quoteLiteral(String value) {
    final escaped = value.replaceAll('\\', '\\\\').replaceAll("'", "\\'");
    return "'$escaped'";
  }
}

Graph Database Integration

/// Stores user relationships in Neo4j and ArangoDB and runs graph queries
/// against Neo4j.
class GraphDatabaseIntegration {
  final Neo4jClient _neo4j;
  final ArangoDBClient _arangoDB;

  GraphDatabaseIntegration({
    required Neo4jClient neo4j,
    required ArangoDBClient arangoDB,
  }) : _neo4j = neo4j,
       _arangoDB = arangoDB;

  /// Creates a relationship of [type] from user [userId1] to user [userId2],
  /// with optional [properties], in both graph stores.
  ///
  /// The two writes are issued one after the other and are not wrapped in a
  /// shared transaction: if the ArangoDB write fails, the Neo4j relationship
  /// has already been created.
  Future<void> createUserRelationship({
    required int userId1,
    required int userId2,
    required RelationshipType type,
    Map<String, dynamic>? properties,
  }) async {
    // The relationship type is interpolated into the query text; ids and
    // properties are passed as query parameters.
    await _neo4j.run('''
      MATCH (u1:User {id: \$userId1}), (u2:User {id: \$userId2})
      CREATE (u1)-[r:${type.name.toUpperCase()} \$properties]->(u2)
      RETURN r
    ''', {
      'userId1': userId1,
      'userId2': userId2,
      'properties': properties ?? {},
    });

    // Also store in ArangoDB for different query patterns.
    await _arangoDB.collection('relationships').save({
      '_from': 'users/$userId1',
      '_to': 'users/$userId2',
      'type': type.name,
      'properties': properties ?? {},
      'created_at': DateTime.now().toIso8601String(),
    });
  }

  /// Returns up to 100 distinct users reachable from [userId] through
  /// between 1 and [maxDepth] `FRIEND` hops, excluding the user itself.
  Future<List<User>> findFriendsOfFriends(int userId, {int maxDepth = 2}) async {
    // [maxDepth] is an int interpolated into the path-length bound; the user
    // id is passed as a query parameter.
    final result = await _neo4j.run('''
      MATCH (u:User {id: \$userId})-[:FRIEND*1..$maxDepth]-(friend:User)
      WHERE friend.id <> \$userId
      RETURN DISTINCT friend
      LIMIT 100
    ''', {'userId': userId});

    return result.records
        .map((record) => User.fromJson(record['friend'].properties))
        .toList();
  }

  /// Returns up to 10 users who are friends of [userId]'s friends but not
  /// yet friends of [userId], ordered by mutual-friend count and then by
  /// shared-company count.
  ///
  /// `u` is carried through the first `WITH` so that the `WORKS_AT` pattern
  /// is matched against the requesting user. Without it, `u` would be out of
  /// scope after the `WITH` and the second `MATCH` would bind a fresh,
  /// unconstrained node.
  ///
  /// NOTE(review): the second clause is a plain `MATCH`, so candidates with
  /// no shared company are dropped rather than ranked with a count of 0.
  /// Confirm whether `OPTIONAL MATCH` is wanted.
  Future<List<RecommendedConnection>> getConnectionRecommendations(int userId) async {
    final result = await _neo4j.run('''
      MATCH (u:User {id: \$userId})-[:FRIEND]-(friend)-[:FRIEND]-(recommendation)
      WHERE NOT (u)-[:FRIEND]-(recommendation) AND recommendation.id <> \$userId
      WITH u, recommendation, COUNT(*) as mutualFriends
      MATCH (recommendation)-[:WORKS_AT]->(company)<-[:WORKS_AT]-(u)
      WITH recommendation, mutualFriends, COUNT(company) as sharedCompanies
      RETURN recommendation, mutualFriends, sharedCompanies
      ORDER BY mutualFriends DESC, sharedCompanies DESC
      LIMIT 10
    ''', {'userId': userId});

    return result.records.map((record) => RecommendedConnection(
      user: User.fromJson(record['recommendation'].properties),
      mutualFriends: record['mutualFriends'].asInt(),
      sharedCompanies: record['sharedCompanies'].asInt(),
    )).toList();
  }
}

Third-Party Service Integration

Message Queue Integration

/// Facade that forwards publish and subscribe calls to the
/// [MessageQueueProvider] that the [MessageRouter] selects for a topic.
class MessageQueueIntegration {
  final Map<String, MessageQueueProvider> _providers = {};
  final MessageRouter _router;

  MessageQueueIntegration({required MessageRouter router}) : _router = router;

  /// Registers [provider] under [name], replacing any existing entry.
  void registerProvider(String name, MessageQueueProvider provider) =>
      _providers[name] = provider;

  /// Publishes [message] on [topic] through the provider routed for it.
  ///
  /// Throws a [MessageQueueException] if that provider is not registered.
  Future<void> publishMessage<T>(String topic, T message, {
    Map<String, String>? headers,
    Duration? delay,
  }) async {
    final target = _providerFor(topic);
    await target.publish(topic, message, headers: headers, delay: delay);
  }

  /// Returns the stream of [topic] messages from the provider routed for it,
  /// each converted with [deserializer].
  ///
  /// Throws a [MessageQueueException] synchronously if that provider is not
  /// registered.
  Stream<T> subscribe<T>(
    String topic,
    T Function(Map<String, dynamic>) deserializer,
  ) =>
      _providerFor(topic).subscribe(topic, deserializer);

  /// Looks up the registered provider for [topic], or throws.
  MessageQueueProvider _providerFor(String topic) {
    final providerName = _router.getProviderForTopic(topic);
    final match = _providers[providerName];
    if (match != null) return match;
    throw MessageQueueException('Provider not found: $providerName');
  }
}

/// [MessageQueueProvider] backed by a [RabbitMQClient].
class RabbitMQProvider implements MessageQueueProvider {
  final RabbitMQClient _client;

  RabbitMQProvider(this._client);

  /// Publishes [message] (JSON-encoded via its `toJson()`) to the exchange
  /// and routing key derived from [topic].
  ///
  /// When [delay] is given it is sent as an `x-delay` header in
  /// milliseconds. The headers are copied first, so the caller's [headers]
  /// map is never modified.
  @override
  Future<void> publish<T>(String topic, T message, {
    Map<String, String>? headers,
    Duration? delay,
  }) async {
    final exchange = _getExchangeForTopic(topic);
    final routingKey = _getRoutingKeyForTopic(topic);

    final messageBody = jsonEncode((message as dynamic).toJson());

    // Build a fresh map: writing `x-delay` into `headers ?? {}` would mutate
    // the map the caller passed in.
    final messageHeaders = <String, String>{
      ...?headers,
      if (delay != null) 'x-delay': delay.inMilliseconds.toString(),
    };

    await _client.publish(
      exchange: exchange,
      routingKey: routingKey,
      body: messageBody,
      headers: messageHeaders,
    );
  }

  /// Consumes the queue derived from [topic], yielding each message body
  /// decoded from JSON and passed through [deserializer].
  ///
  /// A message is acknowledged after it has been yielded. If decoding,
  /// deserializing or acknowledging throws, the error is printed and the
  /// message is negatively acknowledged with `requeue: true`.
  ///
  /// NOTE(review): a message that can never be decoded is requeued every
  /// time; confirm whether a retry limit or dead-letter route exists
  /// elsewhere.
  @override
  Stream<T> subscribe<T>(String topic, T Function(Map<String, dynamic>) deserializer) async* {
    final queue = _getQueueForTopic(topic);

    await for (final message in _client.consume(queue)) {
      try {
        final data = jsonDecode(message.body) as Map<String, dynamic>;
        yield deserializer(data);

        // Acknowledge message
        await message.ack();
      } catch (e) {
        print('Error processing message: $e');

        // Reject message and requeue
        await message.nack(requeue: true);
      }
    }
  }
}

/// [MessageQueueProvider] backed by a [KafkaClient].
class KafkaProvider implements MessageQueueProvider {
  final KafkaClient _client;

  KafkaProvider(this._client);

  /// Sends [message] (JSON-encoded via its `toJson()`) to [topic].
  ///
  /// When [delay] is given, the record's timestamp is set to now plus
  /// [delay]; otherwise the timestamp is left null.
  @override
  Future<void> publish<T>(String topic, T message, {
    Map<String, String>? headers,
    Duration? delay,
  }) async {
    final producer = _client.producer();

    final payload = jsonEncode((message as dynamic).toJson());
    final recordTimestamp = delay == null ? null : DateTime.now().add(delay);

    await producer.send(ProducerRecord(
      topic: topic,
      value: payload,
      headers: headers,
      timestamp: recordTimestamp,
    ));
  }

  /// Subscribes to [topic] under a consumer group named after it and yields
  /// each record value decoded from JSON and passed through [deserializer].
  ///
  /// Offsets are committed after a record has been yielded. If processing a
  /// record throws, the error is printed and the loop moves on to the next
  /// record.
  @override
  Stream<T> subscribe<T>(String topic, T Function(Map<String, dynamic>) deserializer) async* {
    final groupId = 'hypermodern-${topic}';
    final consumer = _client.consumer(groupId: groupId);
    await consumer.subscribe([topic]);

    await for (final record in consumer.stream) {
      try {
        final decoded = jsonDecode(record.value) as Map<String, dynamic>;
        yield deserializer(decoded);

        // Commit offset
        await consumer.commitSync();
      } catch (e) {
        print('Error processing message: $e');
        // Handle error (dead letter queue, retry, etc.)
      }
    }
  }
}

// Integration with Hypermodern server
/// Middleware that publishes the domain events attached to a response.
class MessageQueueMiddleware implements Middleware {
  final MessageQueueIntegration _messageQueue;

  MessageQueueMiddleware(this._messageQueue);

  /// Runs the rest of the chain, then, if the response is a
  /// [DomainEventContainer], publishes each of its events in order on the
  /// topic named by the event's type. The response is returned unchanged.
  @override
  Future<dynamic> handle(
    dynamic request,
    Future<dynamic> Function(dynamic) next,
  ) async {
    final response = await next(request);

    // Responses that carry no events pass straight through.
    if (response is! DomainEventContainer) return response;

    for (final event in response.events) {
      final topic = event.eventType;
      final headers = <String, String>{
        'event_id': event.id,
        'event_version': event.version.toString(),
        'correlation_id': request.correlationId,
      };
      await _messageQueue.publishMessage(topic, event, headers: headers);
    }

    return response;
  }
}

Search Engine Integration

/// Indexes documents into, and searches across, the search backends chosen
/// by a [SearchRouter].
class SearchIntegration {
  final ElasticsearchClient _elasticsearch;
  final SolrClient _solr;
  final AlgoliaClient _algolia;
  final SearchRouter _router;

  SearchIntegration({
    required ElasticsearchClient elasticsearch,
    required SolrClient solr,
    required AlgoliaClient algolia,
    required SearchRouter router,
  }) : _elasticsearch = elasticsearch,
       _solr = solr,
       _algolia = algolia,
       _router = router;

  /// Indexes [document] under [id] in every provider the router lists for
  /// [index]. The writes run concurrently.
  ///
  /// NOTE(review): a provider name other than `elasticsearch`, `solr` or
  /// `algolia` is skipped without error, whereas [search] throws for an
  /// unsupported provider. Confirm that skipping is intended.
  Future<void> indexDocument<T>(String index, String id, T document) async {
    final providers = _router.getProvidersForIndex(index);

    await Future.wait(providers.map((provider) async {
      switch (provider) {
        case 'elasticsearch':
          await _indexInElasticsearch(index, id, document);
          break;
        case 'solr':
          await _indexInSolr(index, id, document);
          break;
        case 'algolia':
          await _indexInAlgolia(index, id, document);
          break;
      }
    }));
  }

  /// Searches [index] for [query] using the provider the router considers
  /// best, converting each hit with [deserializer].
  ///
  /// Throws a [SearchException] if the chosen provider is neither
  /// `elasticsearch` nor `algolia`.
  Future<SearchResult<T>> search<T>({
    required String index,
    required String query,
    required T Function(Map<String, dynamic>) deserializer,
    Map<String, dynamic>? filters,
    List<String>? facets,
    int? limit,
    int? offset,
  }) async {
    final provider = _router.getBestProviderForSearch(index, query);

    switch (provider) {
      case 'elasticsearch':
        return await _searchElasticsearch(
          index: index,
          query: query,
          deserializer: deserializer,
          filters: filters,
          facets: facets,
          limit: limit,
          offset: offset,
        );

      case 'algolia':
        return await _searchAlgolia(
          index: index,
          query: query,
          deserializer: deserializer,
          filters: filters,
          facets: facets,
          limit: limit,
          offset: offset,
        );

      default:
        throw SearchException('Unsupported search provider: $provider');
    }
  }

  /// Indexes [document] (via its `toJson()`) into Elasticsearch.
  Future<void> _indexInElasticsearch<T>(String index, String id, T document) async {
    await _elasticsearch.index(
      index: index,
      id: id,
      body: (document as dynamic).toJson(),
    );
  }

  /// Runs a fuzzy multi-field query against Elasticsearch.
  ///
  /// [filters] become `term` filters, [facets] become `terms` aggregations
  /// on `<facet>.keyword`, and [offset]/[limit] default to 0 and 20.
  Future<SearchResult<T>> _searchElasticsearch<T>({
    required String index,
    required String query,
    required T Function(Map<String, dynamic>) deserializer,
    Map<String, dynamic>? filters,
    List<String>? facets,
    int? limit,
    int? offset,
  }) async {
    final searchBody = {
      'query': {
        'bool': {
          'must': [
            {
              'multi_match': {
                'query': query,
                'fields': ['*'],
                'fuzziness': 'AUTO',
              }
            }
          ],
          if (filters != null)
            'filter': filters.entries.map((entry) => {
              'term': {entry.key: entry.value}
            }).toList(),
        }
      },
      if (facets != null)
        'aggs': {
          for (final facet in facets)
            facet: {
              'terms': {'field': '$facet.keyword'}
            }
        },
      'from': offset ?? 0,
      'size': limit ?? 20,
    };

    final response = await _elasticsearch.search(
      index: index,
      body: searchBody,
    );

    final hits = response['hits']['hits'] as List;
    final results = hits
        .map((hit) => deserializer(hit['_source'] as Map<String, dynamic>))
        .toList();

    final aggregations = response['aggregations'] as Map<String, dynamic>?;
    final facetResults = <String, List<FacetValue>>{};

    if (aggregations != null) {
      for (final entry in aggregations.entries) {
        final buckets = entry.value['buckets'] as List;
        facetResults[entry.key] = buckets
            .map((bucket) => FacetValue(
              // Bucket keys are not always strings (numeric and boolean
              // fields produce non-string keys), so convert instead of
              // casting.
              value: bucket['key'].toString(),
              count: bucket['doc_count'] as int,
            ))
            .toList();
      }
    }

    // `hits.total` is either an object with a `value` field or a bare
    // integer, depending on the server version and request options.
    final total = response['hits']['total'];
    final totalCount = total is Map ? total['value'] as int : total as int;

    return SearchResult<T>(
      results: results,
      totalCount: totalCount,
      facets: facetResults,
    );
  }
}

// Hypermodern integration
/// Wraps a [Repository] so that saved entities are also indexed in a search
/// engine, and exposes search over that index.
class SearchableRepository<T> {
  final Repository<T> _repository;
  final SearchIntegration _search;
  final String _indexName;

  SearchableRepository({
    required Repository<T> repository,
    required SearchIntegration search,
    required String indexName,
  }) : _repository = repository,
       _search = search,
       _indexName = indexName;

  /// Saves [entity] to the primary repository, then indexes the saved
  /// entity under its `id` (converted to a string), and returns it.
  ///
  /// Indexing happens after the save has completed; an indexing error
  /// propagates to the caller.
  Future<T> save(T entity) async {
    final persisted = await _repository.save(entity);

    final documentId = (persisted as dynamic).id.toString();
    await _search.indexDocument(_indexName, documentId, persisted);

    return persisted;
  }

  /// Searches this repository's index for [query], converting each hit with
  /// [deserializer].
  Future<SearchResult<T>> search({
    required String query,
    required T Function(Map<String, dynamic>) deserializer,
    Map<String, dynamic>? filters,
    int? limit,
    int? offset,
  }) async {
    final outcome = await _search.search(
      index: _indexName,
      query: query,
      deserializer: deserializer,
      filters: filters,
      limit: limit,
      offset: offset,
    );
    return outcome;
  }
}

Legacy System Migration

Gradual Migration Strategy

/// Drives a four-phase migration from a legacy system to Hypermodern:
/// dual-write setup, historical data sync, gradual traffic shift, and
/// legacy decommission.
class LegacyMigrationManager {
  final LegacySystemAdapter _legacyAdapter;
  final HypermodernServer _hypermodernServer;
  final MigrationConfig _config;
  final DataSynchronizer _dataSynchronizer;

  LegacyMigrationManager({
    required LegacySystemAdapter legacyAdapter,
    required HypermodernServer hypermodernServer,
    required MigrationConfig config,
    required DataSynchronizer dataSynchronizer,
  }) : _legacyAdapter = legacyAdapter,
       _hypermodernServer = hypermodernServer,
       _config = config,
       _dataSynchronizer = dataSynchronizer;

  /// Runs the four migration phases in order.
  ///
  /// NOTE(review): failed batches in phase 2 are recorded but do not stop
  /// the run, so phases 3 and 4 proceed even if some batches failed. Confirm
  /// that retries are handled before decommissioning.
  Future<void> startMigration() async {
    print('🚀 Starting legacy system migration...');

    // Phase 1: Dual-write setup
    await _setupDualWrite();

    // Phase 2: Data synchronization
    await _synchronizeHistoricalData();

    // Phase 3: Gradual traffic migration
    await _migrateTrafficGradually();

    // Phase 4: Legacy system decommission
    await _decommissionLegacySystem();

    print('✅ Migration completed successfully');
  }

  /// Installs the dual-write and data-validation middleware on the server.
  Future<void> _setupDualWrite() async {
    print('📝 Setting up dual-write pattern...');

    // Configure proxy to write to both systems
    _hypermodernServer.middleware.add(DualWriteMiddleware(
      legacyAdapter: _legacyAdapter,
      config: _config,
    ));

    // Set up data validation
    _hypermodernServer.middleware.add(DataValidationMiddleware(
      legacyAdapter: _legacyAdapter,
    ));
  }

  /// Builds the batch tasks and executes them one at a time.
  Future<void> _synchronizeHistoricalData() async {
    print('🔄 Synchronizing historical data...');

    final migrationTasks = await _createMigrationTasks();

    for (final task in migrationTasks) {
      await _executeMigrationTask(task);
    }
  }

  /// Splits every legacy table into offset-based batches.
  ///
  /// Throws a [MigrationException] if the configured batch size for a table
  /// is not positive. A zero size would otherwise fail inside the division
  /// below, and a negative size would silently produce no tasks.
  Future<List<MigrationTask>> _createMigrationTasks() async {
    final tasks = <MigrationTask>[];

    // Get data volume estimates
    final dataVolumes = await _legacyAdapter.getDataVolumes();

    for (final entry in dataVolumes.entries) {
      final tableName = entry.key;
      final recordCount = entry.value;

      final batchSize = _config.getBatchSize(tableName);
      if (batchSize <= 0) {
        throw MigrationException(
          'Invalid batch size $batchSize for table $tableName',
        );
      }

      // Round up so a final partial batch is included.
      final batchCount = (recordCount / batchSize).ceil();

      for (int i = 0; i < batchCount; i++) {
        tasks.add(MigrationTask(
          tableName: tableName,
          batchIndex: i,
          batchSize: batchSize,
          offset: i * batchSize,
        ));
      }
    }

    return tasks;
  }

  /// Extracts, transforms, loads and validates one batch.
  ///
  /// Any failure is printed and recorded for retry; it is not rethrown.
  Future<void> _executeMigrationTask(MigrationTask task) async {
    try {
      // Extract data from legacy system
      final data = await _legacyAdapter.extractData(
        table: task.tableName,
        limit: task.batchSize,
        offset: task.offset,
      );

      // Transform data to Hypermodern format
      final transformedData = await _transformData(task.tableName, data);

      // Load data into Hypermodern
      await _loadData(task.tableName, transformedData);

      // Validate data consistency
      await _validateDataConsistency(task.tableName, data, transformedData);

      print('✅ Migrated batch ${task.batchIndex + 1} of ${task.tableName}');
    } catch (e) {
      print('❌ Failed to migrate batch ${task.batchIndex + 1} of ${task.tableName}: $e');

      // Record failure for retry
      await _recordMigrationFailure(task, e);
    }
  }

  /// Transforms [legacyData] with the transformer configured for
  /// [tableName].
  Future<List<Map<String, dynamic>>> _transformData(
    String tableName,
    List<Map<String, dynamic>> legacyData,
  ) async {
    final transformer = _config.getTransformer(tableName);
    return await transformer.transform(legacyData);
  }

  /// Shifts traffic to Hypermodern in steps of 10, 25, 50, 75, 90 and 100
  /// percent.
  ///
  /// After each step, health is monitored for 30 minutes and system
  /// consistency is validated; all steps but the last are followed by a
  /// two-hour pause. A failed health check throws and aborts the remaining
  /// steps.
  Future<void> _migrateTrafficGradually() async {
    print('🔀 Starting gradual traffic migration...');

    const migrationSteps = [10, 25, 50, 75, 90, 100]; // Percentage of traffic

    for (final percentage in migrationSteps) {
      print('📊 Migrating $percentage% of traffic to Hypermodern...');

      // Update traffic routing
      await _updateTrafficRouting(percentage);

      // Monitor for issues
      await _monitorMigrationHealth(const Duration(minutes: 30));

      // Validate data consistency
      await _validateSystemConsistency();

      print('✅ Successfully migrated $percentage% of traffic');

      // Wait before next step
      if (percentage < 100) {
        await Future.delayed(const Duration(hours: 2));
      }
    }
  }

  /// Sends [percentage] of traffic to Hypermodern and the remainder to the
  /// legacy system.
  Future<void> _updateTrafficRouting(int percentage) async {
    // Update load balancer or API gateway configuration
    final routingConfig = TrafficRoutingConfig(
      hypermodernPercentage: percentage,
      legacyPercentage: 100 - percentage,
    );

    await _config.trafficRouter.updateRouting(routingConfig);
  }

  /// Polls health metrics once a minute for [duration].
  ///
  /// Throws a [MigrationException] as soon as a poll reports unhealthy.
  Future<void> _monitorMigrationHealth(Duration duration) async {
    final endTime = DateTime.now().add(duration);

    while (DateTime.now().isBefore(endTime)) {
      final healthMetrics = await _collectHealthMetrics();

      if (!healthMetrics.isHealthy) {
        throw MigrationException('Health check failed: ${healthMetrics.issues}');
      }

      await Future.delayed(const Duration(minutes: 1));
    }
  }

  /// Removes the middleware installed by [_setupDualWrite], archives the
  /// legacy data and updates the documentation.
  Future<void> _decommissionLegacySystem() async {
    print('🗑️ Decommissioning legacy system...');

    // Remove both middleware added during setup. The validation middleware
    // was also constructed with the legacy adapter, so it must not stay
    // installed once the legacy system is gone.
    _hypermodernServer.middleware.removeWhere(
      (m) => m is DualWriteMiddleware || m is DataValidationMiddleware,
    );

    // Archive legacy data
    await _archiveLegacyData();

    // Update documentation
    await _updateSystemDocumentation();

    print('✅ Legacy system decommissioned');
  }
}

/// Middleware that mirrors write requests to the legacy system after
/// Hypermodern has handled them.
class DualWriteMiddleware implements Middleware {
  final LegacySystemAdapter _legacyAdapter;
  final MigrationConfig _config;

  DualWriteMiddleware({
    required LegacySystemAdapter legacyAdapter,
    required MigrationConfig config,
  }) : _legacyAdapter = legacyAdapter,
       _config = config;

  /// Handles [request] in Hypermodern first and returns that response.
  ///
  /// For write operations the request is then replayed against the legacy
  /// system. A failure of that replay is printed and recorded for later
  /// reconciliation rather than rethrown.
  @override
  Future<dynamic> handle(
    dynamic request,
    Future<dynamic> Function(dynamic) next,
  ) async {
    final hypermodernResponse = await next(request);

    // Reads need no mirroring.
    if (!_isWriteOperation(request)) return hypermodernResponse;

    try {
      await _writeToLegacySystem(request, hypermodernResponse);
    } catch (e) {
      // Log error but don't fail the request
      print('Legacy write failed: $e');

      // Record for later reconciliation
      await _recordLegacyWriteFailure(request, e);
    }

    return hypermodernResponse;
  }

  /// Whether the request's endpoint contains `create`, `update` or `delete`
  /// as a substring.
  bool _isWriteOperation(dynamic request) {
    for (final keyword in const ['create', 'update', 'delete']) {
      if (request.endpoint.contains(keyword)) return true;
    }
    return false;
  }

  /// Converts the request/response pair to the legacy format and executes
  /// it on the legacy adapter.
  Future<void> _writeToLegacySystem(dynamic request, dynamic response) async {
    await _legacyAdapter.executeRequest(
      await _transformToLegacyFormat(request, response),
    );
  }
}

Data Transformation Patterns

/// Registry of per-entity [DataTransformer]s, with a batch transform that
/// tolerates individual record failures.
class DataTransformationEngine {
  final Map<String, DataTransformer> _transformers = {};

  /// Registers [transformer] for [entityType], replacing any existing one.
  void registerTransformer(String entityType, DataTransformer transformer) =>
      _transformers[entityType] = transformer;

  /// Transforms each record of [legacyData] with the transformer registered
  /// for [entityType] and returns the successfully transformed records in
  /// their original order.
  ///
  /// A record whose transformation throws is printed, recorded as a failure
  /// and left out of the result.
  ///
  /// Throws a [TransformationException] if no transformer is registered for
  /// [entityType].
  Future<List<Map<String, dynamic>>> transform(
    String entityType,
    List<Map<String, dynamic>> legacyData,
  ) async {
    final transformer = _transformers[entityType] ??
        (throw TransformationException('No transformer found for $entityType'));

    final converted = <Map<String, dynamic>>[];

    for (final record in legacyData) {
      try {
        converted.add(await transformer.transform(record));
      } catch (e) {
        print('Failed to transform record: $e');
        // Record transformation failure
        await _recordTransformationFailure(entityType, record, e);
      }
    }

    return converted;
  }
}

/// Converts a legacy user row into the Hypermodern user shape.
class UserDataTransformer implements DataTransformer {
  /// Legacy single-letter status codes and their Hypermodern names.
  static const _statusNames = {
    'A': 'active',
    'I': 'inactive',
    'S': 'suspended',
    'D': 'deleted',
  };

  /// Returns the transformed record for [legacyRecord].
  ///
  /// Throws an [ArgumentError] if either date column is neither a string
  /// nor an int.
  @override
  Future<Map<String, dynamic>> transform(Map<String, dynamic> legacyRecord) async {
    // Derived values are computed in the same order as they appear in the
    // resulting map.
    final createdAt = _parseDate(legacyRecord['created_date']);
    final updatedAt = _parseDate(legacyRecord['modified_date']);
    final status = _mapStatus(legacyRecord['user_status']);
    final profile = await _transformProfile(legacyRecord);
    final preferences = await _transformPreferences(legacyRecord);

    return {
      'id': legacyRecord['user_id'],
      'username': legacyRecord['login_name'],
      'email': legacyRecord['email_address'],
      'first_name': legacyRecord['fname'],
      'last_name': legacyRecord['lname'],
      'created_at': createdAt,
      'updated_at': updatedAt,
      'status': status,
      'profile': profile,
      'preferences': preferences,
    };
  }

  /// Parses a legacy date: strings go through [DateTime.parse], ints are
  /// multiplied by 1000 and read as milliseconds since the epoch.
  DateTime _parseDate(dynamic dateValue) {
    if (dateValue is String) return DateTime.parse(dateValue);
    if (dateValue is int) {
      return DateTime.fromMillisecondsSinceEpoch(dateValue * 1000);
    }
    throw ArgumentError('Invalid date format: $dateValue');
  }

  /// Maps a legacy status code to its name, or `'unknown'` if unmapped.
  String _mapStatus(dynamic statusValue) =>
      _statusNames[statusValue] ?? 'unknown';

  /// Decodes the JSON in `profile_data` and renames its fields, or returns
  /// null when the column is null.
  Future<Map<String, dynamic>?> _transformProfile(Map<String, dynamic> record) async {
    final rawProfile = record['profile_data'];
    if (rawProfile == null) return null;

    final decoded = jsonDecode(rawProfile) as Map<String, dynamic>;

    return {
      'bio': decoded['biography'],
      'avatar_url': decoded['avatar_image_url'],
      'location': decoded['location'],
      'website': decoded['website_url'],
    };
  }

  /// Builds the preferences map. Theme and language default to `'light'`
  /// and `'en'`; each flag is true only when its column equals `1`.
  Future<Map<String, dynamic>> _transformPreferences(Map<String, dynamic> record) async {
    bool isSet(String column) => record[column] == 1;

    return {
      'theme': record['ui_theme'] ?? 'light',
      'language': record['preferred_language'] ?? 'en',
      'notifications': {
        'email': isSet('email_notifications'),
        'push': isSet('push_notifications'),
        'sms': isSet('sms_notifications'),
      },
      'privacy': {
        'profile_public': isSet('public_profile'),
        'show_email': isSet('show_email'),
      },
    };
  }
}

Hybrid Architectures

Microservices Integration

/// Calls other services through a circuit breaker, load balancer and retry
/// loop, and publishes or subscribes to domain events on a [MessageBus].
class MicroservicesOrchestrator {
  final ServiceRegistry _serviceRegistry;
  final MessageBus _messageBus;
  final CircuitBreakerManager _circuitBreakers;
  final LoadBalancer _loadBalancer;

  MicroservicesOrchestrator({
    required ServiceRegistry serviceRegistry,
    required MessageBus messageBus,
    required CircuitBreakerManager circuitBreakers,
    required LoadBalancer loadBalancer,
  }) : _serviceRegistry = serviceRegistry,
       _messageBus = messageBus,
       _circuitBreakers = circuitBreakers,
       _loadBalancer = loadBalancer;

  /// Invokes [operation] on [serviceName] with [request].
  ///
  /// The whole call — instance selection, client lookup and the retry
  /// loop — runs inside the service's circuit breaker. [timeout] applies to
  /// each attempt and defaults to 30 seconds; [retryPolicy] defaults to
  /// `RetryPolicy.defaultPolicy`.
  Future<T> callService<T>({
    required String serviceName,
    required String operation,
    required dynamic request,
    Duration? timeout,
    RetryPolicy? retryPolicy,
  }) async {
    final breaker = _circuitBreakers.getCircuitBreaker(serviceName);

    return await breaker.execute(() async {
      final instance = await _selectServiceInstance(serviceName);
      final client = await _getServiceClient(instance);

      return await _executeWithRetry(
        () => client.request<T>(operation, request),
        retryPolicy ?? RetryPolicy.defaultPolicy,
        timeout ?? Duration(seconds: 30),
      );
    });
  }

  /// Publishes [event] on the topic named by its event type, with its id,
  /// version, correlation id and causation id as headers.
  Future<void> publishEvent(DomainEvent event) async {
    final eventHeaders = {
      'event_id': event.id,
      'event_version': event.version.toString(),
      'correlation_id': event.correlationId,
      'causation_id': event.causationId,
    };

    await _messageBus.publish(
      topic: event.eventType,
      message: event,
      headers: eventHeaders,
    );
  }

  /// Returns the message bus stream for [eventType], converted with
  /// [deserializer].
  Stream<T> subscribeToEvents<T>(
    String eventType,
    T Function(Map<String, dynamic>) deserializer,
  ) =>
      _messageBus.subscribe(eventType, deserializer);

  /// Picks one healthy instance of [serviceName] via the load balancer.
  ///
  /// Throws a [ServiceUnavailableException] if there are none.
  Future<ServiceInstance> _selectServiceInstance(String serviceName) async {
    final healthy = await _serviceRegistry.getHealthyInstances(serviceName);

    if (healthy.isNotEmpty) return _loadBalancer.selectInstance(healthy);

    throw ServiceUnavailableException('No healthy instances of $serviceName');
  }

  /// Runs [operation], retrying on failure according to [retryPolicy].
  ///
  /// Each attempt is limited to [timeout]. The error is rethrown once the
  /// attempt budget is used up or the policy declines to retry it. The wait
  /// between attempts starts at the policy's initial delay, is multiplied
  /// by its backoff multiplier after each wait, and is capped at its
  /// maximum delay.
  Future<T> _executeWithRetry<T>(
    Future<T> Function() operation,
    RetryPolicy retryPolicy,
    Duration timeout,
  ) async {
    var failures = 0;
    var backoff = retryPolicy.initialDelay;

    while (failures < retryPolicy.maxAttempts) {
      try {
        return await operation().timeout(timeout);
      } catch (e) {
        failures += 1;

        final exhausted = failures >= retryPolicy.maxAttempts;
        if (exhausted || !retryPolicy.shouldRetry(e)) rethrow;

        await Future.delayed(backoff);

        // Grow the delay for the next round, never beyond the cap.
        final grown = Duration(
          milliseconds:
              (backoff.inMilliseconds * retryPolicy.backoffMultiplier).round(),
        );
        backoff = grown > retryPolicy.maxDelay ? retryPolicy.maxDelay : grown;
      }
    }

    throw StateError('This should never be reached');
  }
}

/// Saga pattern implementation for distributed transactions.
///
/// Runs the steps of a [Saga] in order through the
/// [MicroservicesOrchestrator]. When something fails, the steps that already
/// completed are undone by calling their compensation operations in reverse
/// order. The saga's state is written to the [SagaRepository] before each
/// step and on every status change.
class SagaOrchestrator {
  final MicroservicesOrchestrator _orchestrator;
  final SagaRepository _sagaRepository;

  SagaOrchestrator({
    required MicroservicesOrchestrator orchestrator,
    required SagaRepository sagaRepository,
  })  : _orchestrator = orchestrator,
        _sagaRepository = sagaRepository;

  /// Executes [saga] from its first step and reports the outcome.
  ///
  /// Returns `SagaResult.success` carrying the accumulated saga data when
  /// every step completes. If a step (or a state save inside the step loop)
  /// throws, the completed steps are compensated and `SagaResult.failure` is
  /// returned with the error's string form.
  ///
  /// Errors thrown while persisting the `compensating`/`compensated` states
  /// are not caught here and propagate to the caller.
  Future<SagaResult> executeSaga(Saga saga) async {
    final sagaInstance = SagaInstance(
      id: _generateSagaId(),
      sagaType: saga.runtimeType.toString(),
      status: SagaStatus.started,
      currentStep: 0,
      data: saga.initialData,
      createdAt: DateTime.now(),
    );

    await _sagaRepository.save(sagaInstance);

    // Index of the most recent step whose service call returned successfully;
    // -1 while nothing has completed. Compensation must start here rather
    // than at `currentStep`, because `currentStep` points at the step that
    // failed (or never started) and has nothing to undo.
    var lastCompletedStep = -1;

    try {
      // `steps` may be a getter that rebuilds the list; read it once.
      final steps = saga.steps;

      for (var i = 0; i < steps.length; i++) {
        final step = steps[i];

        // Persist which step is about to run.
        sagaInstance.currentStep = i;
        sagaInstance.status = SagaStatus.executing;
        await _sagaRepository.save(sagaInstance);

        final stepResult = await _executeStep(step, sagaInstance.data);
        lastCompletedStep = i;

        // Make the step's output available to the following steps.
        sagaInstance.data.addAll(stepResult);
      }

      sagaInstance.status = SagaStatus.completed;
      await _sagaRepository.save(sagaInstance);

      return SagaResult.success(sagaInstance.data);
    } catch (e) {
      // Something failed: undo what was done so far.
      sagaInstance.status = SagaStatus.compensating;
      await _sagaRepository.save(sagaInstance);

      await _compensateSaga(saga, sagaInstance, fromStep: lastCompletedStep);

      sagaInstance.status = SagaStatus.compensated;
      await _sagaRepository.save(sagaInstance);

      return SagaResult.failure(e.toString());
    }
  }

  /// Calls the service operation of [step], building the request from
  /// [sagaData] and applying the step's timeout.
  Future<Map<String, dynamic>> _executeStep(
    SagaStep step,
    Map<String, dynamic> sagaData,
  ) async {
    return await _orchestrator.callService<Map<String, dynamic>>(
      serviceName: step.serviceName,
      operation: step.operation,
      request: step.buildRequest(sagaData),
      timeout: step.timeout,
    );
  }

  /// Runs compensation operations in reverse order, from step [fromStep] down
  /// to step 0.
  ///
  /// [fromStep] is the index of the last step that should be undone; a
  /// negative value means there is nothing to compensate. When omitted it
  /// falls back to `sagaInstance.currentStep`. Steps without a compensation
  /// operation are skipped. A failing compensation is printed and the
  /// remaining steps are still attempted.
  Future<void> _compensateSaga(
    Saga saga,
    SagaInstance sagaInstance, {
    int? fromStep,
  }) async {
    final steps = saga.steps;

    for (var i = fromStep ?? sagaInstance.currentStep; i >= 0; i--) {
      final step = steps[i];

      if (step.compensationOperation != null) {
        try {
          await _orchestrator.callService(
            serviceName: step.serviceName,
            operation: step.compensationOperation!,
            request: step.buildCompensationRequest(sagaInstance.data),
          );
        } catch (e) {
          // Best effort: report the failure but keep undoing earlier steps.
          print('Compensation failed for step $i: $e');
        }
      }
    }
  }
}

/// Example saga for order processing.
///
/// Declares four steps in order: reserve inventory, charge the payment,
/// create the shipment and send a confirmation. The first three name a
/// compensation operation; the notification step does not.
class OrderProcessingSaga extends Saga {
  final String orderId;
  final String customerId;
  final List<OrderItem> items;
  final double totalAmount;

  OrderProcessingSaga({
    required this.orderId,
    required this.customerId,
    required this.items,
    required this.totalAmount,
  });

  /// Seed data for the saga, built from the order fields.
  @override
  Map<String, dynamic> get initialData {
    return {
      'order_id': orderId,
      'customer_id': customerId,
      'items': items,
      'total_amount': totalAmount,
    };
  }

  /// The ordered steps of the saga, each with its own timeout.
  @override
  List<SagaStep> get steps {
    return [
      SagaStep(
        serviceName: 'inventory-service',
        operation: 'reserve_items',
        compensationOperation: 'release_items',
        timeout: const Duration(seconds: 10),
      ),
      SagaStep(
        serviceName: 'payment-service',
        operation: 'charge_payment',
        compensationOperation: 'refund_payment',
        timeout: const Duration(seconds: 30),
      ),
      SagaStep(
        serviceName: 'shipping-service',
        operation: 'create_shipment',
        compensationOperation: 'cancel_shipment',
        timeout: const Duration(seconds: 15),
      ),
      SagaStep(
        serviceName: 'notification-service',
        operation: 'send_confirmation',
        timeout: const Duration(seconds: 5),
      ),
    ];
  }
}

Event-Driven Architecture

/// Event-sourcing facade that ties together the event store, the event bus,
/// the projection manager and the snapshot store.
///
/// Aggregates are persisted as the events they emitted and are rebuilt by
/// replaying those events on top of the latest snapshot, when one exists.
class EventDrivenArchitecture {
  /// Number of events between two snapshots of the same aggregate.
  static const int _snapshotInterval = 100;

  final EventStore _eventStore;
  final EventBus _eventBus;
  final ProjectionManager _projectionManager;
  final SnapshotStore _snapshotStore;

  EventDrivenArchitecture({
    required EventStore eventStore,
    required EventBus eventBus,
    required ProjectionManager projectionManager,
    required SnapshotStore snapshotStore,
  })  : _eventStore = eventStore,
        _eventBus = eventBus,
        _projectionManager = projectionManager,
        _snapshotStore = snapshotStore;

  /// Persists the uncommitted events of [aggregate] and publishes them.
  ///
  /// Does nothing when the aggregate has no uncommitted events. Otherwise the
  /// events are, in this order: appended to the event store (guarded by the
  /// expected version), published one by one on the event bus, optionally
  /// followed by a snapshot, and finally marked as committed on the
  /// aggregate.
  ///
  /// If publishing or snapshotting throws, the events have already been
  /// stored but are not marked as committed on the aggregate.
  ///
  /// The aggregate type is recorded as the static type argument `T`.
  Future<void> saveAggregate<T extends AggregateRoot>(T aggregate) async {
    final uncommittedEvents = aggregate.getUncommittedEvents();

    if (uncommittedEvents.isEmpty) return;

    // The aggregate's version already counts the uncommitted events, so the
    // version expected in the store is the one before they were applied.
    await _eventStore.saveEvents(
      aggregateId: aggregate.id,
      aggregateType: T.toString(),
      expectedVersion: aggregate.version - uncommittedEvents.length,
      events: uncommittedEvents,
    );

    for (final event in uncommittedEvents) {
      await _eventBus.publish(event);
    }

    if (_shouldCreateSnapshot(
      aggregate,
      newEventCount: uncommittedEvents.length,
    )) {
      await _snapshotStore.saveSnapshot(aggregate);
    }

    aggregate.markEventsAsCommitted();
  }

  /// Rebuilds the aggregate identified by [aggregateId].
  ///
  /// Starts from the stored snapshot when there is one, otherwise from a
  /// fresh instance produced by [factory], and replays every event recorded
  /// after the snapshot version. Returns `null` when there is neither a
  /// snapshot nor any event for the id.
  Future<T?> loadAggregate<T extends AggregateRoot>(
    String aggregateId,
    T Function() factory,
  ) async {
    final snapshot = await _snapshotStore.getSnapshot<T>(aggregateId);

    T? aggregate;
    int fromVersion = 0;

    if (snapshot != null) {
      aggregate = snapshot.aggregate;
      fromVersion = snapshot.version;
    }

    // Only events newer than the snapshot need to be replayed.
    final events = await _eventStore.getEvents(
      aggregateId: aggregateId,
      fromVersion: fromVersion,
    );

    if (events.isEmpty && aggregate == null) {
      return null; // Aggregate doesn't exist
    }

    // A non-null local removes the need for a `!` inside the loop.
    final target = aggregate ?? factory();

    for (final event in events) {
      target.applyEvent(event, isNew: false);
    }

    return target;
  }

  /// Whether saving the last [newEventCount] events carried [aggregate] onto
  /// or across a snapshot boundary (every [_snapshotInterval] events).
  ///
  /// A plain `version % interval == 0` test misses the boundary whenever
  /// several events are saved at once and the version jumps over a multiple
  /// of the interval (for example 99 -> 102), so the versions before and
  /// after the save are compared instead.
  bool _shouldCreateSnapshot<T extends AggregateRoot>(
    T aggregate, {
    int newEventCount = 1,
  }) {
    final previousVersion = aggregate.version - newEventCount;
    return aggregate.version ~/ _snapshotInterval >
        previousVersion ~/ _snapshotInterval;
  }
}

/// Stores and reads [DomainEvent]s in the `event_store` table, one row per
/// event, ordered per aggregate by a version number.
class EventStore {
  final Database _database;

  EventStore(this._database);

  /// Appends [events] for [aggregateId] inside a single database transaction.
  ///
  /// The highest stored version for the aggregate must equal
  /// [expectedVersion]; otherwise a [ConcurrencyException] is thrown and
  /// nothing is inserted by this call. The events receive consecutive
  /// versions starting at `expectedVersion + 1`.
  Future<void> saveEvents({
    required String aggregateId,
    required String aggregateType,
    required int expectedVersion,
    required List<DomainEvent> events,
  }) async {
    await _database.transaction((tx) async {
      // Optimistic concurrency check against the stored stream.
      final currentVersion = await _getCurrentVersion(tx, aggregateId);

      if (currentVersion != expectedVersion) {
        throw ConcurrencyException(
          'Expected version $expectedVersion, but current version is $currentVersion',
        );
      }

      // Insert one row per event, numbering them consecutively.
      var version = expectedVersion;
      for (final event in events) {
        version += 1;

        await tx.query('''
          INSERT INTO event_store (
            aggregate_id, aggregate_type, version, event_type, 
            event_data, metadata, created_at
          ) VALUES (?, ?, ?, ?, ?, ?, ?)
        ''', [
          aggregateId,
          aggregateType,
          version,
          event.eventType,
          jsonEncode(event.toJson()),
          jsonEncode(event.metadata),
          event.occurredAt,
        ]);
      }
    });
  }

  /// Reads the events of [aggregateId] in ascending version order.
  ///
  /// Only events with a version strictly greater than [fromVersion] are
  /// returned; when [toVersion] is given, versions above it are excluded.
  /// Each row is rebuilt through `DomainEvent.fromJson` from the row columns
  /// merged with the decoded event payload. Payload keys are spread last, so
  /// they win over same-named row keys.
  Future<List<DomainEvent>> getEvents({
    required String aggregateId,
    int fromVersion = 0,
    int? toVersion,
  }) async {
    final query = '''
      SELECT event_type, event_data, metadata, created_at, version
      FROM event_store
      WHERE aggregate_id = ? AND version > ?
      ${toVersion != null ? 'AND version <= ?' : ''}
      ORDER BY version
    ''';

    // The upper bound is bound only when the matching clause is present.
    final params = [
      aggregateId,
      fromVersion,
      if (toVersion != null) toVersion,
    ];

    final rows = await _database.query(query, params);

    return rows.map((row) {
      final payload = jsonDecode(row['event_data']) as Map<String, dynamic>;
      final metadata = jsonDecode(row['metadata']) as Map<String, dynamic>;

      return DomainEvent.fromJson({
        'event_type': row['event_type'],
        'occurred_at': row['created_at'],
        'version': row['version'],
        'metadata': metadata,
        ...payload,
      });
    }).toList();
  }

  /// Returns the highest stored version for [aggregateId] as seen by [tx],
  /// or 0 when the aggregate has no events yet.
  Future<int> _getCurrentVersion(Transaction tx, String aggregateId) async {
    final rows = await tx.query(
      'SELECT COALESCE(MAX(version), 0) as version FROM event_store WHERE aggregate_id = ?',
      [aggregateId],
    );

    final row = rows.first;
    return row['version'] as int;
  }
}

Conclusion

You now have comprehensive knowledge of integration patterns for Hypermodern applications. These patterns enable you to:

  • Integrate with multiple databases using unified interfaces while maintaining data consistency
  • Connect to third-party services through standardized adapters and middleware
  • Migrate from legacy systems using proven patterns like dual-write and gradual traffic migration
  • Build hybrid architectures that combine Hypermodern with microservices and event-driven patterns

The integration patterns covered in this chapter provide the foundation for building complex, real-world systems that can evolve and scale while maintaining reliability and performance. Whether you're modernizing existing systems or building new distributed architectures, these patterns will help you leverage Hypermodern's multi-protocol capabilities effectively.

This concludes our comprehensive journey through the Hypermodern platform. You now have the knowledge and tools to build sophisticated, scalable applications that can communicate across multiple protocols, integrate with diverse systems, and evolve with your business needs.