Integration Patterns
Database Integrations
Hypermodern's provider-based architecture integrates with a wide range of database systems, from traditional SQL databases to NoSQL solutions and specialized stores such as time series and graph engines.
Multi-Database Architecture
class MultiDatabaseManager {
final Map<String, DatabaseProvider> _providers = {};
final DatabaseRouter _router;
MultiDatabaseManager({required DatabaseRouter router}) : _router = router;
void registerProvider(String name, DatabaseProvider provider) {
_providers[name] = provider;
}
Future<T> execute<T>(DatabaseOperation<T> operation) async {
final providerName = _router.routeOperation(operation);
final provider = _providers[providerName];
if (provider == null) {
throw DatabaseException('Provider not found: $providerName');
}
return await provider.execute(operation);
}
Future<void> transaction(Future<void> Function(TransactionContext) callback) async {
final involvedProviders = _router.getInvolvedProviders(callback);
if (involvedProviders.length == 1) {
// Single database transaction
final provider = _providers[involvedProviders.first]!;
await provider.transaction(callback);
} else {
// Distributed transaction using 2PC
await _executeDistributedTransaction(involvedProviders, callback);
}
}
Future<void> _executeDistributedTransaction(
List<String> providers,
Future<void> Function(TransactionContext) callback,
) async {
final transactions = <String, Transaction>{};
try {
// Phase 1: Begin a transaction on every involved provider. (A full
// 2PC implementation would also collect prepare votes here.)
for (final providerName in providers) {
final provider = _providers[providerName]!;
final transaction = await provider.beginTransaction();
transactions[providerName] = transaction;
}
// Execute operations
final context = DistributedTransactionContext(transactions);
await callback(context);
// Phase 2: Commit all transactions. A failure partway through this
// loop can leave some providers committed; real 2PC mitigates this
// with a durable transaction log and recovery.
for (final transaction in transactions.values) {
await transaction.commit();
}
} catch (e) {
// Rollback all transactions
for (final transaction in transactions.values) {
try {
await transaction.rollback();
} catch (rollbackError) {
print('Rollback error: $rollbackError');
}
}
rethrow;
}
}
}
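The manager relies on a handful of abstractions that are not shown above. A minimal sketch of what they might look like, inferred from how they are used here (the exact shapes in Hypermodern may differ):
// Sketch of the contracts MultiDatabaseManager depends on; names and
// shapes are inferred from usage, not taken from a published API.
abstract class DatabaseOperation<T> {
// Model type used for routing; null when not tied to a single model.
Type? get modelType;
}
abstract class TransactionContext {
// Run an operation inside the current transaction.
Future<T> execute<T>(DatabaseOperation<T> operation);
}
abstract class Transaction {
Future<void> commit();
Future<void> rollback();
}
abstract class DatabaseProvider {
Future<T> execute<T>(DatabaseOperation<T> operation);
Future<void> transaction(Future<void> Function(TransactionContext) callback);
Future<Transaction> beginTransaction();
}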
class DatabaseRouter {
final Map<Type, String> _modelProviders = {};
final Map<String, String> _operationProviders = {};
void routeModel<T>(String providerName) {
_modelProviders[T] = providerName;
}
void routeOperationType(String operationType, String providerName) {
_operationProviders[operationType] = providerName;
}
String routeOperation<T>(DatabaseOperation<T> operation) {
// Route based on model type
if (operation.modelType != null) {
final provider = _modelProviders[operation.modelType];
if (provider != null) return provider;
}
// Route based on operation type
final provider = _operationProviders[operation.runtimeType.toString()];
if (provider != null) return provider;
// Default provider
return 'default';
}
List<String> getInvolvedProviders(Future<void> Function(TransactionContext) callback) {
// A closure cannot be statically inspected, so a real implementation
// would derive the provider set from declared operations (or record
// them in a dry run). This simplified version assumes the default.
return ['default'];
}
}
// Example usage
class UserService {
final MultiDatabaseManager _dbManager;
UserService(this._dbManager);
Future<User> createUser(CreateUserRequest request) async {
return await _dbManager.transaction((tx) async {
// User data goes to PostgreSQL
final user = await tx.execute(CreateUserOperation(
username: request.username,
email: request.email,
));
// User preferences go to MongoDB
await tx.execute(CreateUserPreferencesOperation(
userId: user.id,
preferences: request.preferences,
));
// Activity log goes to ClickHouse
await tx.execute(LogUserActivityOperation(
userId: user.id,
action: 'user_created',
timestamp: DateTime.now(),
));
return user;
});
}
}
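For this example to route as described, providers and routes must be registered at startup. A hypothetical wiring sketch; the PostgresProvider, MongoProvider, and ClickHouseProvider classes are assumed implementations, not part of the API shown above:
// Hypothetical setup; the three provider classes are assumed to exist.
void configureDatabases(MultiDatabaseManager db, DatabaseRouter router) {
db.registerProvider('postgres', PostgresProvider(connectionString: '...'));
db.registerProvider('mongo', MongoProvider(uri: '...'));
db.registerProvider('clickhouse', ClickHouseProvider(host: '...'));
// Route by model type: User rows live in PostgreSQL.
router.routeModel<User>('postgres');
// Route by operation runtime type for data without an owning model.
router.routeOperationType('CreateUserPreferencesOperation', 'mongo');
router.routeOperationType('LogUserActivityOperation', 'clickhouse');
}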
Time Series Database Integration
class TimeSeriesIntegration {
final InfluxDBClient _influxDB;
final PrometheusClient _prometheus;
final ClickHouseClient _clickHouse;
TimeSeriesIntegration({
required InfluxDBClient influxDB,
required PrometheusClient prometheus,
required ClickHouseClient clickHouse,
}) : _influxDB = influxDB,
_prometheus = prometheus,
_clickHouse = clickHouse;
Future<void> recordMetrics(List<MetricPoint> metrics) async {
// Write to multiple time series databases
await Future.wait([
_writeToInfluxDB(metrics),
_writeToPrometheus(metrics),
_writeToClickHouse(metrics),
]);
}
Future<void> _writeToInfluxDB(List<MetricPoint> metrics) async {
final points = metrics.map((metric) => Point(
measurement: metric.name,
tags: metric.tags,
fields: {'value': metric.value},
timestamp: metric.timestamp,
)).toList();
await _influxDB.writePoints(points);
}
Future<void> _writeToPrometheus(List<MetricPoint> metrics) async {
// Prometheus is pull-based: this updates in-process metrics that the
// Prometheus server scrapes, rather than pushing samples to it.
for (final metric in metrics) {
final prometheusMetric = _prometheus.getMetric(metric.name);
prometheusMetric.set(metric.value, labels: metric.tags);
}
}
Future<void> _writeToClickHouse(List<MetricPoint> metrics) async {
final rows = metrics.map((metric) => [
metric.timestamp.millisecondsSinceEpoch,
metric.name,
metric.value,
jsonEncode(metric.tags),
]).toList();
await _clickHouse.insert('metrics', rows);
}
Future<List<MetricPoint>> queryMetrics({
required String metricName,
required DateTime startTime,
required DateTime endTime,
Map<String, String>? tags,
}) async {
// Query from the most appropriate database
if (_isHighFrequencyMetric(metricName)) {
return await _queryFromInfluxDB(metricName, startTime, endTime, tags);
} else if (_isAggregatedMetric(metricName)) {
return await _queryFromClickHouse(metricName, startTime, endTime, tags);
} else {
return await _queryFromPrometheus(metricName, startTime, endTime, tags);
}
}
Future<List<MetricPoint>> _queryFromInfluxDB(
String metricName,
DateTime startTime,
DateTime endTime,
Map<String, String>? tags,
) async {
// Values are interpolated into the query for brevity; production code
// should escape or validate them to prevent injection.
final query = '''
SELECT value FROM $metricName
WHERE time >= '${startTime.toIso8601String()}'
AND time <= '${endTime.toIso8601String()}'
${_buildTagFilter(tags)}
ORDER BY time
''';
final result = await _influxDB.query(query);
return _parseInfluxDBResult(result);
}
String _buildTagFilter(Map<String, String>? tags) {
if (tags == null || tags.isEmpty) return '';
final filters = tags.entries
.map((entry) => "${entry.key} = '${entry.value}'")
.join(' AND ');
return 'AND $filters';
}
}
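The MetricPoint type shared by all three writers is not defined above; a minimal sketch of the shape they assume:
class MetricPoint {
final String name; // Measurement name, e.g. 'http_requests'
final double value; // Sampled value
final Map<String, String> tags; // Dimensions, e.g. {'region': 'eu-west-1'}
final DateTime timestamp; // When the sample was taken
const MetricPoint({
required this.name,
required this.value,
required this.tags,
required this.timestamp,
});
}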
Graph Database Integration
class GraphDatabaseIntegration {
final Neo4jClient _neo4j;
final ArangoDBClient _arangoDB;
GraphDatabaseIntegration({
required Neo4jClient neo4j,
required ArangoDBClient arangoDB,
}) : _neo4j = neo4j,
_arangoDB = arangoDB;
Future<void> createUserRelationship({
required int userId1,
required int userId2,
required RelationshipType type,
Map<String, dynamic>? properties,
}) async {
// Create the relationship in Neo4j. Relationship types cannot be
// parameterized in Cypher, so the enum name is interpolated; this is
// safe only because it comes from a closed enum, not user input.
await _neo4j.run('''
MATCH (u1:User {id: \$userId1}), (u2:User {id: \$userId2})
CREATE (u1)-[r:${type.name.toUpperCase()} \$properties]->(u2)
RETURN r
''', {
'userId1': userId1,
'userId2': userId2,
'properties': properties ?? {},
});
// Also store in ArangoDB for different query patterns
await _arangoDB.collection('relationships').save({
'_from': 'users/$userId1',
'_to': 'users/$userId2',
'type': type.name,
'properties': properties ?? {},
'created_at': DateTime.now().toIso8601String(),
});
}
Future<List<User>> findFriendsOfFriends(int userId, {int maxDepth = 2}) async {
// Use Neo4j for complex graph traversal
final result = await _neo4j.run('''
MATCH (u:User {id: \$userId})-[:FRIEND*1..$maxDepth]-(friend:User)
WHERE friend.id <> \$userId
RETURN DISTINCT friend
LIMIT 100
''', {'userId': userId});
return result.records
.map((record) => User.fromJson(record['friend'].properties))
.toList();
}
Future<List<RecommendedConnection>> getConnectionRecommendations(int userId) async {
// Complex recommendation query using graph algorithms
final result = await _neo4j.run('''
MATCH (u:User {id: \$userId})-[:FRIEND]-(friend)-[:FRIEND]-(recommendation)
WHERE NOT (u)-[:FRIEND]-(recommendation) AND recommendation.id <> \$userId
WITH u, recommendation, COUNT(*) as mutualFriends
OPTIONAL MATCH (recommendation)-[:WORKS_AT]->(company)<-[:WORKS_AT]-(u)
WITH recommendation, mutualFriends, COUNT(company) as sharedCompanies
RETURN recommendation, mutualFriends, sharedCompanies
ORDER BY mutualFriends DESC, sharedCompanies DESC
LIMIT 10
''', {'userId': userId});
return result.records.map((record) => RecommendedConnection(
user: User.fromJson(record['recommendation'].properties),
mutualFriends: record['mutualFriends'].asInt(),
sharedCompanies: record['sharedCompanies'].asInt(),
)).toList();
}
}
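A brief usage sketch, assuming the two clients are already constructed and a RelationshipType enum with a friend value (both assumptions for illustration):
Future<void> demoGraphQueries(Neo4jClient neo4j, ArangoDBClient arangoDB) async {
final graph = GraphDatabaseIntegration(neo4j: neo4j, arangoDB: arangoDB);
// Record a friendship in both stores.
await graph.createUserRelationship(
userId1: 42,
userId2: 99,
type: RelationshipType.friend,
properties: {'since': '2024-01-15'},
);
// Traverse up to three hops of the friendship graph.
final network = await graph.findFriendsOfFriends(42, maxDepth: 3);
print('Found ${network.length} nearby users');
}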
Third-Party Service Integration
Message Queue Integration
class MessageQueueIntegration {
final Map<String, MessageQueueProvider> _providers = {};
final MessageRouter _router;
MessageQueueIntegration({required MessageRouter router}) : _router = router;
void registerProvider(String name, MessageQueueProvider provider) {
_providers[name] = provider;
}
Future<void> publishMessage<T>(String topic, T message, {
Map<String, String>? headers,
Duration? delay,
}) async {
final providerName = _router.getProviderForTopic(topic);
final provider = _providers[providerName];
if (provider == null) {
throw MessageQueueException('Provider not found: $providerName');
}
await provider.publish(topic, message, headers: headers, delay: delay);
}
Stream<T> subscribe<T>(String topic, T Function(Map<String, dynamic>) deserializer) {
final providerName = _router.getProviderForTopic(topic);
final provider = _providers[providerName];
if (provider == null) {
throw MessageQueueException('Provider not found: $providerName');
}
return provider.subscribe(topic, deserializer);
}
}
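Both providers below implement a common contract. A sketch of that interface, inferred from how MessageQueueIntegration calls it:
abstract class MessageQueueProvider {
Future<void> publish<T>(String topic, T message, {
Map<String, String>? headers,
Duration? delay,
});
Stream<T> subscribe<T>(String topic, T Function(Map<String, dynamic>) deserializer);
}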
class RabbitMQProvider implements MessageQueueProvider {
final RabbitMQClient _client;
RabbitMQProvider(this._client);
@override
Future<void> publish<T>(String topic, T message, {
Map<String, String>? headers,
Duration? delay,
}) async {
final exchange = _getExchangeForTopic(topic);
final routingKey = _getRoutingKeyForTopic(topic);
final messageBody = jsonEncode((message as dynamic).toJson());
final messageHeaders = headers ?? {};
if (delay != null) {
messageHeaders['x-delay'] = delay.inMilliseconds.toString();
}
await _client.publish(
exchange: exchange,
routingKey: routingKey,
body: messageBody,
headers: messageHeaders,
);
}
@override
Stream<T> subscribe<T>(String topic, T Function(Map<String, dynamic>) deserializer) async* {
final queue = _getQueueForTopic(topic);
await for (final message in _client.consume(queue)) {
try {
final data = jsonDecode(message.body) as Map<String, dynamic>;
yield deserializer(data);
// Acknowledge message
await message.ack();
} catch (e) {
print('Error processing message: $e');
// Reject message and requeue
await message.nack(requeue: true);
}
}
}
}
class KafkaProvider implements MessageQueueProvider {
final KafkaClient _client;
KafkaProvider(this._client);
@override
Future<void> publish<T>(String topic, T message, {
Map<String, String>? headers,
Duration? delay,
}) async {
final producer = _client.producer();
final record = ProducerRecord(
topic: topic,
value: jsonEncode((message as dynamic).toJson()),
headers: headers,
// Kafka has no native delayed delivery; a future timestamp merely
// annotates the record. Real delays need a delay topic or scheduler.
timestamp: delay != null ? DateTime.now().add(delay) : null,
);
await producer.send(record);
}
@override
Stream<T> subscribe<T>(String topic, T Function(Map<String, dynamic>) deserializer) async* {
final consumer = _client.consumer(groupId: 'hypermodern-$topic');
await consumer.subscribe([topic]);
await for (final record in consumer.stream) {
try {
final data = jsonDecode(record.value) as Map<String, dynamic>;
yield deserializer(data);
// Commit offset
await consumer.commitSync();
} catch (e) {
print('Error processing message: $e');
// Handle error (dead letter queue, retry, etc.)
}
}
}
}
// Integration with Hypermodern server
class MessageQueueMiddleware implements Middleware {
final MessageQueueIntegration _messageQueue;
MessageQueueMiddleware(this._messageQueue);
@override
Future<dynamic> handle(
dynamic request,
Future<dynamic> Function(dynamic) next,
) async {
final response = await next(request);
// Publish domain events
if (response is DomainEventContainer) {
for (final event in response.events) {
await _messageQueue.publishMessage(
event.eventType,
event,
headers: {
'event_id': event.id,
'event_version': event.version.toString(),
'correlation_id': request.correlationId,
},
);
}
}
return response;
}
}
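The MessageRouter consulted by the integration is left abstract above. One plausible sketch routes topics to providers by name prefix; routeTopic and the default provider name are illustrative, not an established API:
class MessageRouter {
final Map<String, String> _topicProviders = {};
final String _defaultProvider;
MessageRouter({String defaultProvider = 'rabbitmq'})
: _defaultProvider = defaultProvider;
// Pin a topic name or prefix to a specific provider.
void routeTopic(String topicPrefix, String providerName) {
_topicProviders[topicPrefix] = providerName;
}
String getProviderForTopic(String topic) {
for (final entry in _topicProviders.entries) {
if (topic.startsWith(entry.key)) return entry.value;
}
return _defaultProvider;
}
}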
Search Engine Integration
class SearchIntegration {
final ElasticsearchClient _elasticsearch;
final SolrClient _solr;
final AlgoliaClient _algolia;
final SearchRouter _router;
SearchIntegration({
required ElasticsearchClient elasticsearch,
required SolrClient solr,
required AlgoliaClient algolia,
required SearchRouter router,
}) : _elasticsearch = elasticsearch,
_solr = solr,
_algolia = algolia,
_router = router;
Future<void> indexDocument<T>(String index, String id, T document) async {
final providers = _router.getProvidersForIndex(index);
await Future.wait(providers.map((provider) async {
switch (provider) {
case 'elasticsearch':
await _indexInElasticsearch(index, id, document);
break;
case 'solr':
await _indexInSolr(index, id, document);
break;
case 'algolia':
await _indexInAlgolia(index, id, document);
break;
}
}));
}
Future<SearchResult<T>> search<T>({
required String index,
required String query,
required T Function(Map<String, dynamic>) deserializer,
Map<String, dynamic>? filters,
List<String>? facets,
int? limit,
int? offset,
}) async {
final provider = _router.getBestProviderForSearch(index, query);
switch (provider) {
case 'elasticsearch':
return await _searchElasticsearch(
index: index,
query: query,
deserializer: deserializer,
filters: filters,
facets: facets,
limit: limit,
offset: offset,
);
case 'algolia':
return await _searchAlgolia(
index: index,
query: query,
deserializer: deserializer,
filters: filters,
facets: facets,
limit: limit,
offset: offset,
);
default:
// A Solr search path is omitted from this sketch; extend the
// switch with a _searchSolr branch to support it.
throw SearchException('Unsupported search provider: $provider');
}
}
Future<void> _indexInElasticsearch<T>(String index, String id, T document) async {
await _elasticsearch.index(
index: index,
id: id,
body: (document as dynamic).toJson(),
);
}
Future<SearchResult<T>> _searchElasticsearch<T>({
required String index,
required String query,
required T Function(Map<String, dynamic>) deserializer,
Map<String, dynamic>? filters,
List<String>? facets,
int? limit,
int? offset,
}) async {
final searchBody = {
'query': {
'bool': {
'must': [
{
'multi_match': {
'query': query,
'fields': ['*'],
'fuzziness': 'AUTO',
}
}
],
if (filters != null)
'filter': filters.entries.map((entry) => {
'term': {entry.key: entry.value}
}).toList(),
}
},
if (facets != null)
'aggs': {
for (final facet in facets)
facet: {
'terms': {'field': '$facet.keyword'}
}
},
'from': offset ?? 0,
'size': limit ?? 20,
};
final response = await _elasticsearch.search(
index: index,
body: searchBody,
);
final hits = response['hits']['hits'] as List;
final results = hits
.map((hit) => deserializer(hit['_source'] as Map<String, dynamic>))
.toList();
final aggregations = response['aggregations'] as Map<String, dynamic>?;
final facetResults = <String, List<FacetValue>>{};
if (aggregations != null) {
for (final entry in aggregations.entries) {
final buckets = entry.value['buckets'] as List;
facetResults[entry.key] = buckets
.map((bucket) => FacetValue(
value: bucket['key'] as String,
count: bucket['doc_count'] as int,
))
.toList();
}
}
return SearchResult<T>(
results: results,
totalCount: response['hits']['total']['value'] as int,
facets: facetResults,
);
}
}
// Hypermodern integration
class SearchableRepository<T> {
final Repository<T> _repository;
final SearchIntegration _search;
final String _indexName;
SearchableRepository({
required Repository<T> repository,
required SearchIntegration search,
required String indexName,
}) : _repository = repository,
_search = search,
_indexName = indexName;
Future<T> save(T entity) async {
// Save to primary database
final savedEntity = await _repository.save(entity);
// Index in search engine
final id = (savedEntity as dynamic).id.toString();
await _search.indexDocument(_indexName, id, savedEntity);
return savedEntity;
}
Future<SearchResult<T>> search({
required String query,
required T Function(Map<String, dynamic>) deserializer,
Map<String, dynamic>? filters,
int? limit,
int? offset,
}) async {
return await _search.search(
index: _indexName,
query: query,
deserializer: deserializer,
filters: filters,
limit: limit,
offset: offset,
);
}
}
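Callers then get persistence and search behind one interface. A hypothetical product-catalog example, assuming a Product model with a fromJson factory (both names are illustrative):
Future<void> demoProductSearch(
Repository<Product> productRepository,
SearchIntegration searchIntegration,
Product newProduct,
) async {
final products = SearchableRepository<Product>(
repository: productRepository,
search: searchIntegration,
indexName: 'products',
);
// Writes hit the primary store and the search index together.
await products.save(newProduct);
// Full-text search with a structured filter.
final result = await products.search(
query: 'wireless headphones',
deserializer: Product.fromJson,
filters: {'category': 'audio'},
limit: 10,
);
print('${result.totalCount} matches');
}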
Legacy System Migration
Gradual Migration Strategy
class LegacyMigrationManager {
final LegacySystemAdapter _legacyAdapter;
final HypermodernServer _hypermodernServer;
final MigrationConfig _config;
final DataSynchronizer _dataSynchronizer;
LegacyMigrationManager({
required LegacySystemAdapter legacyAdapter,
required HypermodernServer hypermodernServer,
required MigrationConfig config,
required DataSynchronizer dataSynchronizer,
}) : _legacyAdapter = legacyAdapter,
_hypermodernServer = hypermodernServer,
_config = config,
_dataSynchronizer = dataSynchronizer;
Future<void> startMigration() async {
print('🚀 Starting legacy system migration...');
// Phase 1: Dual-write setup
await _setupDualWrite();
// Phase 2: Data synchronization
await _synchronizeHistoricalData();
// Phase 3: Gradual traffic migration
await _migrateTrafficGradually();
// Phase 4: Legacy system decommission
await _decommissionLegacySystem();
print('✅ Migration completed successfully');
}
Future<void> _setupDualWrite() async {
print('📝 Setting up dual-write pattern...');
// Configure proxy to write to both systems
_hypermodernServer.middleware.add(DualWriteMiddleware(
legacyAdapter: _legacyAdapter,
config: _config,
));
// Set up data validation
_hypermodernServer.middleware.add(DataValidationMiddleware(
legacyAdapter: _legacyAdapter,
));
}
Future<void> _synchronizeHistoricalData() async {
print('🔄 Synchronizing historical data...');
final migrationTasks = await _createMigrationTasks();
for (final task in migrationTasks) {
await _executeMigrationTask(task);
}
}
Future<List<MigrationTask>> _createMigrationTasks() async {
final tasks = <MigrationTask>[];
// Get data volume estimates
final dataVolumes = await _legacyAdapter.getDataVolumes();
for (final entry in dataVolumes.entries) {
final tableName = entry.key;
final recordCount = entry.value;
// Create batched migration tasks
final batchSize = _config.getBatchSize(tableName);
final batchCount = (recordCount / batchSize).ceil();
for (int i = 0; i < batchCount; i++) {
tasks.add(MigrationTask(
tableName: tableName,
batchIndex: i,
batchSize: batchSize,
offset: i * batchSize,
));
}
}
return tasks;
}
Future<void> _executeMigrationTask(MigrationTask task) async {
try {
// Extract data from legacy system
final data = await _legacyAdapter.extractData(
table: task.tableName,
limit: task.batchSize,
offset: task.offset,
);
// Transform data to Hypermodern format
final transformedData = await _transformData(task.tableName, data);
// Load data into Hypermodern
await _loadData(task.tableName, transformedData);
// Validate data consistency
await _validateDataConsistency(task.tableName, data, transformedData);
print('✅ Migrated batch ${task.batchIndex + 1} of ${task.tableName}');
} catch (e) {
print('❌ Failed to migrate batch ${task.batchIndex + 1} of ${task.tableName}: $e');
// Record failure for retry
await _recordMigrationFailure(task, e);
}
}
Future<List<Map<String, dynamic>>> _transformData(
String tableName,
List<Map<String, dynamic>> legacyData,
) async {
final transformer = _config.getTransformer(tableName);
// DataTransformer operates on single records, so map the batch row by row.
final transformed = <Map<String, dynamic>>[];
for (final record in legacyData) {
transformed.add(await transformer.transform(record));
}
return transformed;
}
Future<void> _migrateTrafficGradually() async {
print('🔀 Starting gradual traffic migration...');
final migrationSteps = [10, 25, 50, 75, 90, 100]; // Percentage of traffic
for (final percentage in migrationSteps) {
print('📊 Migrating $percentage% of traffic to Hypermodern...');
// Update traffic routing
await _updateTrafficRouting(percentage);
// Monitor for issues
await _monitorMigrationHealth(Duration(minutes: 30));
// Validate data consistency
await _validateSystemConsistency();
print('✅ Successfully migrated $percentage% of traffic');
// Wait before next step
if (percentage < 100) {
await Future.delayed(Duration(hours: 2));
}
}
}
Future<void> _updateTrafficRouting(int percentage) async {
// Update load balancer or API gateway configuration
final routingConfig = TrafficRoutingConfig(
hypermodernPercentage: percentage,
legacyPercentage: 100 - percentage,
);
await _config.trafficRouter.updateRouting(routingConfig);
}
Future<void> _monitorMigrationHealth(Duration duration) async {
final endTime = DateTime.now().add(duration);
while (DateTime.now().isBefore(endTime)) {
final healthMetrics = await _collectHealthMetrics();
if (!healthMetrics.isHealthy) {
throw MigrationException('Health check failed: ${healthMetrics.issues}');
}
await Future.delayed(Duration(minutes: 1));
}
}
Future<void> _decommissionLegacySystem() async {
print('🗑️ Decommissioning legacy system...');
// Stop dual-write
_hypermodernServer.middleware.removeWhere((m) => m is DualWriteMiddleware);
// Archive legacy data
await _archiveLegacyData();
// Update documentation
await _updateSystemDocumentation();
print('✅ Legacy system decommissioned');
}
}
class DualWriteMiddleware implements Middleware {
final LegacySystemAdapter _legacyAdapter;
final MigrationConfig _config;
DualWriteMiddleware({
required LegacySystemAdapter legacyAdapter,
required MigrationConfig config,
}) : _legacyAdapter = legacyAdapter,
_config = config;
@override
Future<dynamic> handle(
dynamic request,
Future<dynamic> Function(dynamic) next,
) async {
// Execute in Hypermodern first
final hypermodernResponse = await next(request);
// Write to legacy system if it's a write operation
if (_isWriteOperation(request)) {
try {
await _writeToLegacySystem(request, hypermodernResponse);
} catch (e) {
// Log error but don't fail the request
print('Legacy write failed: $e');
// Record for later reconciliation
await _recordLegacyWriteFailure(request, e);
}
}
return hypermodernResponse;
}
bool _isWriteOperation(dynamic request) {
// Crude heuristic for this sketch; production code should inspect the
// HTTP method or an explicit operation type on the request.
const writeOperations = ['create', 'update', 'delete'];
return writeOperations.any((op) => request.endpoint.contains(op));
}
Future<void> _writeToLegacySystem(dynamic request, dynamic response) async {
final legacyRequest = await _transformToLegacyFormat(request, response);
await _legacyAdapter.executeRequest(legacyRequest);
}
}
Data Transformation Patterns
class DataTransformationEngine {
final Map<String, DataTransformer> _transformers = {};
void registerTransformer(String entityType, DataTransformer transformer) {
_transformers[entityType] = transformer;
}
Future<List<Map<String, dynamic>>> transform(
String entityType,
List<Map<String, dynamic>> legacyData,
) async {
final transformer = _transformers[entityType];
if (transformer == null) {
throw TransformationException('No transformer found for $entityType');
}
final transformedData = <Map<String, dynamic>>[];
for (final record in legacyData) {
try {
final transformed = await transformer.transform(record);
transformedData.add(transformed);
} catch (e) {
print('Failed to transform record: $e');
// Record transformation failure
await _recordTransformationFailure(entityType, record, e);
}
}
return transformedData;
}
}
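Each registered transformer implements a single-record contract; a minimal sketch:
abstract class DataTransformer {
// Map one legacy record into the Hypermodern schema; implementations
// may throw to mark a record as unrecoverable.
Future<Map<String, dynamic>> transform(Map<String, dynamic> legacyRecord);
}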
class UserDataTransformer implements DataTransformer {
@override
Future<Map<String, dynamic>> transform(Map<String, dynamic> legacyRecord) async {
return {
'id': legacyRecord['user_id'],
'username': legacyRecord['login_name'],
'email': legacyRecord['email_address'],
'first_name': legacyRecord['fname'],
'last_name': legacyRecord['lname'],
'created_at': _parseDate(legacyRecord['created_date']),
'updated_at': _parseDate(legacyRecord['modified_date']),
'status': _mapStatus(legacyRecord['user_status']),
'profile': await _transformProfile(legacyRecord),
'preferences': await _transformPreferences(legacyRecord),
};
}
DateTime _parseDate(dynamic dateValue) {
if (dateValue is String) {
return DateTime.parse(dateValue);
} else if (dateValue is int) {
return DateTime.fromMillisecondsSinceEpoch(dateValue * 1000);
}
throw ArgumentError('Invalid date format: $dateValue');
}
String _mapStatus(dynamic statusValue) {
final statusMap = {
'A': 'active',
'I': 'inactive',
'S': 'suspended',
'D': 'deleted',
};
return statusMap[statusValue] ?? 'unknown';
}
Future<Map<String, dynamic>?> _transformProfile(Map<String, dynamic> record) async {
if (record['profile_data'] == null) return null;
final profileData = jsonDecode(record['profile_data']) as Map<String, dynamic>;
return {
'bio': profileData['biography'],
'avatar_url': profileData['avatar_image_url'],
'location': profileData['location'],
'website': profileData['website_url'],
};
}
Future<Map<String, dynamic>> _transformPreferences(Map<String, dynamic> record) async {
return {
'theme': record['ui_theme'] ?? 'light',
'language': record['preferred_language'] ?? 'en',
'notifications': {
'email': record['email_notifications'] == 1,
'push': record['push_notifications'] == 1,
'sms': record['sms_notifications'] == 1,
},
'privacy': {
'profile_public': record['public_profile'] == 1,
'show_email': record['show_email'] == 1,
},
};
}
}
Hybrid Architectures
Microservices Integration
class MicroservicesOrchestrator {
final ServiceRegistry _serviceRegistry;
final MessageBus _messageBus;
final CircuitBreakerManager _circuitBreakers;
final LoadBalancer _loadBalancer;
MicroservicesOrchestrator({
required ServiceRegistry serviceRegistry,
required MessageBus messageBus,
required CircuitBreakerManager circuitBreakers,
required LoadBalancer loadBalancer,
}) : _serviceRegistry = serviceRegistry,
_messageBus = messageBus,
_circuitBreakers = circuitBreakers,
_loadBalancer = loadBalancer;
Future<T> callService<T>({
required String serviceName,
required String operation,
required dynamic request,
Duration? timeout,
RetryPolicy? retryPolicy,
}) async {
final circuitBreaker = _circuitBreakers.getCircuitBreaker(serviceName);
return await circuitBreaker.execute(() async {
final serviceInstance = await _selectServiceInstance(serviceName);
final client = await _getServiceClient(serviceInstance);
return await _executeWithRetry(
() => client.request<T>(operation, request),
retryPolicy ?? RetryPolicy.defaultPolicy,
timeout ?? Duration(seconds: 30),
);
});
}
Future<void> publishEvent(DomainEvent event) async {
await _messageBus.publish(
topic: event.eventType,
message: event,
headers: {
'event_id': event.id,
'event_version': event.version.toString(),
'correlation_id': event.correlationId,
'causation_id': event.causationId,
},
);
}
Stream<T> subscribeToEvents<T>(
String eventType,
T Function(Map<String, dynamic>) deserializer,
) {
return _messageBus.subscribe(eventType, deserializer);
}
Future<ServiceInstance> _selectServiceInstance(String serviceName) async {
final instances = await _serviceRegistry.getHealthyInstances(serviceName);
if (instances.isEmpty) {
throw ServiceUnavailableException('No healthy instances of $serviceName');
}
return _loadBalancer.selectInstance(instances);
}
Future<T> _executeWithRetry<T>(
Future<T> Function() operation,
RetryPolicy retryPolicy,
Duration timeout,
) async {
int attempt = 0;
Duration delay = retryPolicy.initialDelay;
while (attempt < retryPolicy.maxAttempts) {
try {
return await operation().timeout(timeout);
} catch (e) {
attempt++;
if (attempt >= retryPolicy.maxAttempts || !retryPolicy.shouldRetry(e)) {
rethrow;
}
await Future.delayed(delay);
delay = Duration(
milliseconds: (delay.inMilliseconds * retryPolicy.backoffMultiplier).round(),
);
if (delay > retryPolicy.maxDelay) {
delay = retryPolicy.maxDelay;
}
}
}
throw StateError('This should never be reached');
}
}
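The RetryPolicy consumed by _executeWithRetry is not defined above. A sketch consistent with the fields and methods used there; the default values are assumptions:
class RetryPolicy {
final int maxAttempts;
final Duration initialDelay;
final Duration maxDelay;
final double backoffMultiplier;
const RetryPolicy({
this.maxAttempts = 3,
this.initialDelay = const Duration(milliseconds: 200),
this.maxDelay = const Duration(seconds: 10),
this.backoffMultiplier = 2.0,
});
static const RetryPolicy defaultPolicy = RetryPolicy();
// Retry everything by default; narrow this to transient errors
// (timeouts, connection resets, 503s) in real deployments.
bool shouldRetry(Object error) => true;
}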
// Saga pattern implementation for distributed transactions
class SagaOrchestrator {
final MicroservicesOrchestrator _orchestrator;
final SagaRepository _sagaRepository;
SagaOrchestrator({
required MicroservicesOrchestrator orchestrator,
required SagaRepository sagaRepository,
}) : _orchestrator = orchestrator,
_sagaRepository = sagaRepository;
Future<SagaResult> executeSaga(Saga saga) async {
final sagaInstance = SagaInstance(
id: _generateSagaId(),
sagaType: saga.runtimeType.toString(),
status: SagaStatus.started,
currentStep: 0,
data: saga.initialData,
createdAt: DateTime.now(),
);
await _sagaRepository.save(sagaInstance);
try {
for (int i = 0; i < saga.steps.length; i++) {
final step = saga.steps[i];
// Update saga state
sagaInstance.currentStep = i;
sagaInstance.status = SagaStatus.executing;
await _sagaRepository.save(sagaInstance);
// Execute step
final stepResult = await _executeStep(step, sagaInstance.data);
// Update saga data with step result
sagaInstance.data.addAll(stepResult);
}
// Saga completed successfully
sagaInstance.status = SagaStatus.completed;
await _sagaRepository.save(sagaInstance);
return SagaResult.success(sagaInstance.data);
} catch (e) {
// Saga failed, start compensation
sagaInstance.status = SagaStatus.compensating;
await _sagaRepository.save(sagaInstance);
await _compensateSaga(saga, sagaInstance);
sagaInstance.status = SagaStatus.compensated;
await _sagaRepository.save(sagaInstance);
return SagaResult.failure(e.toString());
}
}
Future<Map<String, dynamic>> _executeStep(
SagaStep step,
Map<String, dynamic> sagaData,
) async {
return await _orchestrator.callService<Map<String, dynamic>>(
serviceName: step.serviceName,
operation: step.operation,
request: step.buildRequest(sagaData),
timeout: step.timeout,
);
}
Future<void> _compensateSaga(Saga saga, SagaInstance sagaInstance) async {
// Execute compensation steps in reverse order, starting from the step
// that failed (it may have partially executed) back to the first.
for (int i = sagaInstance.currentStep; i >= 0; i--) {
final step = saga.steps[i];
if (step.compensationOperation != null) {
try {
await _orchestrator.callService(
serviceName: step.serviceName,
operation: step.compensationOperation!,
request: step.buildCompensationRequest(sagaInstance.data),
);
} catch (e) {
print('Compensation failed for step $i: $e');
// Log compensation failure but continue
}
}
}
}
}
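The Saga and SagaStep types the orchestrator drives might be declared as follows. The default buildRequest behavior here, forwarding the accumulated saga data as the request body, is an assumption of this sketch:
abstract class Saga {
List<SagaStep> get steps;
Map<String, dynamic> get initialData;
}
class SagaStep {
final String serviceName;
final String operation;
final String? compensationOperation;
final Duration timeout;
const SagaStep({
required this.serviceName,
required this.operation,
this.compensationOperation,
required this.timeout,
});
// Default: forward the accumulated saga data as the request payload.
// Concrete steps can subclass to shape service-specific payloads.
Map<String, dynamic> buildRequest(Map<String, dynamic> sagaData) => sagaData;
Map<String, dynamic> buildCompensationRequest(Map<String, dynamic> sagaData) => sagaData;
}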
// Example saga for order processing
class OrderProcessingSaga extends Saga {
final String orderId;
final String customerId;
final List<OrderItem> items;
final double totalAmount;
OrderProcessingSaga({
required this.orderId,
required this.customerId,
required this.items,
required this.totalAmount,
});
@override
List<SagaStep> get steps => [
SagaStep(
serviceName: 'inventory-service',
operation: 'reserve_items',
compensationOperation: 'release_items',
timeout: Duration(seconds: 10),
),
SagaStep(
serviceName: 'payment-service',
operation: 'charge_payment',
compensationOperation: 'refund_payment',
timeout: Duration(seconds: 30),
),
SagaStep(
serviceName: 'shipping-service',
operation: 'create_shipment',
compensationOperation: 'cancel_shipment',
timeout: Duration(seconds: 15),
),
SagaStep(
serviceName: 'notification-service',
operation: 'send_confirmation',
timeout: Duration(seconds: 5),
),
];
@override
Map<String, dynamic> get initialData => {
'order_id': orderId,
'customer_id': customerId,
'items': items,
'total_amount': totalAmount,
};
}
Event-Driven Architecture
class EventDrivenArchitecture {
final EventStore _eventStore;
final EventBus _eventBus;
final ProjectionManager _projectionManager;
final SnapshotStore _snapshotStore;
EventDrivenArchitecture({
required EventStore eventStore,
required EventBus eventBus,
required ProjectionManager projectionManager,
required SnapshotStore snapshotStore,
}) : _eventStore = eventStore,
_eventBus = eventBus,
_projectionManager = projectionManager,
_snapshotStore = snapshotStore;
Future<void> saveAggregate<T extends AggregateRoot>(T aggregate) async {
final uncommittedEvents = aggregate.getUncommittedEvents();
if (uncommittedEvents.isEmpty) return;
// Save events to event store
await _eventStore.saveEvents(
aggregateId: aggregate.id,
aggregateType: T.toString(),
expectedVersion: aggregate.version - uncommittedEvents.length,
events: uncommittedEvents,
);
// Publish events to event bus
for (final event in uncommittedEvents) {
await _eventBus.publish(event);
}
// Create snapshot if needed
if (_shouldCreateSnapshot(aggregate)) {
await _snapshotStore.saveSnapshot(aggregate);
}
// Mark events as committed
aggregate.markEventsAsCommitted();
}
Future<T?> loadAggregate<T extends AggregateRoot>(
String aggregateId,
T Function() factory,
) async {
// Try to load from snapshot first
final snapshot = await _snapshotStore.getSnapshot<T>(aggregateId);
T? aggregate;
int fromVersion = 0;
if (snapshot != null) {
aggregate = snapshot.aggregate;
fromVersion = snapshot.version;
}
// Load events since snapshot
final events = await _eventStore.getEvents(
aggregateId: aggregateId,
fromVersion: fromVersion,
);
if (events.isEmpty && aggregate == null) {
return null; // Aggregate doesn't exist
}
// Create aggregate if no snapshot
aggregate ??= factory();
// Apply events
for (final event in events) {
aggregate.applyEvent(event, isNew: false);
}
return aggregate;
}
bool _shouldCreateSnapshot<T extends AggregateRoot>(T aggregate) {
// Create snapshot every 100 events
return aggregate.version % 100 == 0;
}
}
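The AggregateRoot base class assumed here tracks a version counter and uncommitted events; a minimal sketch (the mutate hook is an illustrative name):
abstract class AggregateRoot {
final List<DomainEvent> _uncommittedEvents = [];
int version = 0;
String get id;
// Subclasses update their in-memory state from the event here.
void mutate(DomainEvent event);
void applyEvent(DomainEvent event, {required bool isNew}) {
mutate(event);
version++;
if (isNew) _uncommittedEvents.add(event);
}
List<DomainEvent> getUncommittedEvents() => List.unmodifiable(_uncommittedEvents);
void markEventsAsCommitted() => _uncommittedEvents.clear();
}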
class EventStore {
final Database _database;
EventStore(this._database);
Future<void> saveEvents({
required String aggregateId,
required String aggregateType,
required int expectedVersion,
required List<DomainEvent> events,
}) async {
await _database.transaction((tx) async {
// Check for concurrency conflicts
final currentVersion = await _getCurrentVersion(tx, aggregateId);
if (currentVersion != expectedVersion) {
throw ConcurrencyException(
'Expected version $expectedVersion, but current version is $currentVersion',
);
}
// Save events
for (int i = 0; i < events.length; i++) {
final event = events[i];
final version = expectedVersion + i + 1;
await tx.query('''
INSERT INTO event_store (
aggregate_id, aggregate_type, version, event_type,
event_data, metadata, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?)
''', [
aggregateId,
aggregateType,
version,
event.eventType,
jsonEncode(event.toJson()),
jsonEncode(event.metadata),
event.occurredAt,
]);
}
});
}
Future<List<DomainEvent>> getEvents({
required String aggregateId,
int fromVersion = 0,
int? toVersion,
}) async {
final query = '''
SELECT event_type, event_data, metadata, created_at, version
FROM event_store
WHERE aggregate_id = ? AND version > ?
${toVersion != null ? 'AND version <= ?' : ''}
ORDER BY version
''';
final params = [aggregateId, fromVersion];
if (toVersion != null) params.add(toVersion);
final result = await _database.query(query, params);
return result.map((row) {
final eventData = jsonDecode(row['event_data']) as Map<String, dynamic>;
final metadata = jsonDecode(row['metadata']) as Map<String, dynamic>;
return DomainEvent.fromJson({
'event_type': row['event_type'],
'occurred_at': row['created_at'],
'version': row['version'],
'metadata': metadata,
...eventData,
});
}).toList();
}
Future<int> _getCurrentVersion(Transaction tx, String aggregateId) async {
final result = await tx.query(
'SELECT COALESCE(MAX(version), 0) as version FROM event_store WHERE aggregate_id = ?',
[aggregateId],
);
return result.first['version'] as int;
}
}
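The store assumes an event_store table with a uniqueness guarantee on (aggregate_id, version), which turns concurrent-write races into hard errors rather than silently interleaved histories. A hypothetical migration, assuming PostgreSQL:
// Hypothetical schema migration for the event store (PostgreSQL syntax).
Future<void> createEventStoreTable(Database database) async {
await database.query('''
CREATE TABLE IF NOT EXISTS event_store (
id BIGSERIAL PRIMARY KEY,
aggregate_id VARCHAR(64) NOT NULL,
aggregate_type VARCHAR(128) NOT NULL,
version INT NOT NULL,
event_type VARCHAR(128) NOT NULL,
event_data TEXT NOT NULL,
metadata TEXT NOT NULL,
created_at TIMESTAMP NOT NULL,
-- The unique constraint backs the optimistic concurrency check
-- even when two writers race past the version read.
UNIQUE (aggregate_id, version)
)
''', []);
}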
Conclusion
You now have comprehensive knowledge of integration patterns for Hypermodern applications. These patterns enable you to:
- Integrate with multiple databases using unified interfaces while maintaining data consistency
- Connect to third-party services through standardized adapters and middleware
- Migrate from legacy systems using proven patterns like dual-write and gradual traffic migration
- Build hybrid architectures that combine Hypermodern with microservices and event-driven patterns
The integration patterns covered in this chapter provide the foundation for building complex, real-world systems that can evolve and scale while maintaining reliability and performance. Whether you're modernizing existing systems or building new distributed architectures, these patterns will help you leverage Hypermodern's multi-protocol capabilities effectively.
This concludes our comprehensive journey through the Hypermodern platform. You now have the knowledge and tools to build sophisticated, scalable applications that can communicate across multiple protocols, integrate with diverse systems, and evolve with your business needs.