Multi-Protocol Communication
Understanding Protocol Selection
One of Hypermodern's most powerful features is its ability to serve the same API across multiple protocols. Understanding when and how to use each protocol optimally is key to building high-performance applications.
Protocol Characteristics Comparison
| Feature | HTTP | WebSocket | TCP |
|---|---|---|---|
| Connection Type | Stateless | Persistent | Persistent |
| Overhead | High | Medium | Low |
| Browser Support | Universal | Modern browsers | None |
| Caching | Excellent | None | None |
| Real-time | No | Yes | Yes |
| Streaming | Limited | Yes | Yes |
| Load Balancer Friendly | Yes | Moderate | No |
| Firewall Friendly | Yes | Yes | Moderate |
Automatic Protocol Selection
Hypermodern clients can automatically select the best protocol based on the URL scheme:
// Automatic selection based on URL
final client1 = HypermodernClient('http://api.example.com'); // HTTP
final client2 = HypermodernClient('https://api.example.com'); // HTTPS
final client3 = HypermodernClient('ws://api.example.com'); // WebSocket
final client4 = HypermodernClient('wss://api.example.com'); // Secure WebSocket
final client5 = HypermodernClient('tcp://api.example.com'); // TCP
// Fallback chain
final client = HypermodernClient.withFallback([
'wss://api.example.com:8082', // Try secure WebSocket first
'ws://api.example.com:8082', // Fall back to WebSocket
'https://api.example.com:8080', // Fall back to HTTPS
'http://api.example.com:8080', // Last resort: HTTP
]);
Dynamic Protocol Switching
Switch protocols based on runtime conditions:
class AdaptiveClient {
HypermodernClient? _currentClient;
final List<String> _endpoints;
int _currentEndpointIndex = 0;
AdaptiveClient(this._endpoints);
Future<void> connect() async {
while (_currentEndpointIndex < _endpoints.length) {
try {
_currentClient = HypermodernClient(_endpoints[_currentEndpointIndex]);
await _currentClient!.connect();
print('Connected via ${_endpoints[_currentEndpointIndex]}');
return;
} catch (e) {
print('Failed to connect via ${_endpoints[_currentEndpointIndex]}: $e');
_currentEndpointIndex++;
await _currentClient?.disconnect();
}
}
throw Exception('Failed to connect to any endpoint');
}
Future<T> request<T>(String endpoint, dynamic request) async {
if (_currentClient == null) {
throw StateError('Not connected');
}
try {
return await _currentClient!.request<T>(endpoint, request);
} on NetworkException {
// Try to reconnect with next protocol
await _reconnectWithNextProtocol();
return await _currentClient!.request<T>(endpoint, request);
}
}
Future<void> _reconnectWithNextProtocol() async {
_currentEndpointIndex++;
if (_currentEndpointIndex >= _endpoints.length) {
_currentEndpointIndex = 0; // Wrap around
}
await _currentClient?.disconnect();
await connect();
}
}
HTTP REST API Implementation
HTTP provides the most compatible and cacheable communication method, ideal for traditional web applications and mobile apps.
RESTful URL Mapping
Hypermodern automatically maps endpoints to RESTful URLs:
{
"endpoints": {
"get_user": {
"method": "GET",
"path": "/users/{id}",
"request": {"id": "int64"},
"response": "@user"
},
"list_users": {
"method": "GET",
"path": "/users",
"request": {
"page": "int32?",
"limit": "int32?",
"search": "string?"
},
"response": {"users": ["@user"], "total": "int64"}
},
"create_user": {
"method": "POST",
"path": "/users",
"request": {"username": "string", "email": "string"},
"response": "@user"
},
"update_user": {
"method": "PUT",
"path": "/users/{id}",
"request": {"id": "int64", "username": "string?", "email": "string?"},
"response": "@user"
},
"delete_user": {
"method": "DELETE",
"path": "/users/{id}",
"request": {"id": "int64"},
"response": {"success": "bool"}
}
}
}
HTTP-Specific Optimizations
class HttpOptimizedServer extends HypermodernServer {
@override
void configureHttp(HttpServerConfig config) {
super.configureHttp(config.copyWith(
// Enable compression
compressionEnabled: true,
compressionLevel: 6,
compressionTypes: ['application/json', 'application/x-hypermodern-binary'],
// Caching headers
defaultCacheControl: 'public, max-age=300', // 5 minutes
etagEnabled: true,
// Performance tuning
keepAliveTimeout: Duration(seconds: 30),
maxRequestSize: 10 * 1024 * 1024, // 10MB
// Security headers
securityHeaders: {
'X-Content-Type-Options': 'nosniff',
'X-Frame-Options': 'DENY',
'X-XSS-Protection': '1; mode=block',
},
));
}
@override
void registerHttpMiddleware() {
// HTTP-specific middleware
httpMiddleware.add(CompressionMiddleware());
httpMiddleware.add(CacheControlMiddleware());
httpMiddleware.add(ETagMiddleware());
httpMiddleware.add(SecurityHeadersMiddleware());
}
}
Caching Strategies
Implement intelligent caching for HTTP endpoints:
class CacheControlMiddleware implements Middleware {
final Map<String, CachePolicy> _endpointPolicies = {
'get_user': CachePolicy(
maxAge: Duration(minutes: 15),
staleWhileRevalidate: Duration(hours: 1),
etag: true,
),
'list_users': CachePolicy(
maxAge: Duration(minutes: 5),
etag: true,
varyHeaders: ['Authorization'],
),
'get_user_profile': CachePolicy(
maxAge: Duration(hours: 1),
etag: true,
private: true, // User-specific data
),
};
@override
Future<dynamic> handle(dynamic request, Future<dynamic> Function(dynamic) next) async {
final policy = _endpointPolicies[request.endpoint];
if (policy != null && request.method == 'GET') {
// Check if-none-match header
final ifNoneMatch = request.headers['if-none-match'];
if (ifNoneMatch != null && policy.etag) {
final currentEtag = await _generateEtag(request);
if (ifNoneMatch == currentEtag) {
return HttpResponse.notModified();
}
}
// Execute request
final response = await next(request);
// Add cache headers
response.headers.addAll(_buildCacheHeaders(policy));
// Add ETag if enabled
if (policy.etag) {
response.headers['etag'] = await _generateEtag(request);
}
return response;
}
return await next(request);
}
Map<String, String> _buildCacheHeaders(CachePolicy policy) {
final headers = <String, String>{};
if (policy.private) {
headers['cache-control'] = 'private, max-age=${policy.maxAge.inSeconds}';
} else {
headers['cache-control'] = 'public, max-age=${policy.maxAge.inSeconds}';
}
if (policy.staleWhileRevalidate != null) {
headers['cache-control'] += ', stale-while-revalidate=${policy.staleWhileRevalidate!.inSeconds}';
}
if (policy.varyHeaders.isNotEmpty) {
headers['vary'] = policy.varyHeaders.join(', ');
}
return headers;
}
}
Content Negotiation
Support multiple content types:
class ContentNegotiationMiddleware implements Middleware {
@override
Future<dynamic> handle(dynamic request, Future<dynamic> Function(dynamic) next) async {
final acceptHeader = request.headers['accept'] ?? '';
final response = await next(request);
// Determine response format
if (acceptHeader.contains('application/json')) {
response.contentType = 'application/json';
response.body = jsonEncode(response.data.toJson());
} else if (acceptHeader.contains('application/x-hypermodern-binary')) {
response.contentType = 'application/x-hypermodern-binary';
response.body = response.data.toBinary();
} else {
// Default to binary for better performance
response.contentType = 'application/x-hypermodern-binary';
response.body = response.data.toBinary();
}
return response;
}
}
WebSocket Real-time Communication
WebSocket provides persistent, bidirectional communication ideal for real-time applications.
WebSocket Connection Management
class WebSocketManager {
final Map<String, WebSocketConnection> _connections = {};
final Map<String, Set<String>> _roomSubscriptions = {};
void handleConnection(WebSocketConnection connection) {
_connections[connection.id] = connection;
connection.onMessage.listen((message) async {
try {
await _handleMessage(connection, message);
} catch (e) {
await connection.sendError(e);
}
});
connection.onClose.listen((_) {
_handleDisconnection(connection);
});
print('WebSocket client connected: ${connection.id}');
}
Future<void> _handleMessage(WebSocketConnection connection, WebSocketMessage message) async {
switch (message.type) {
case MessageType.request:
final response = await _processRequest(connection, message);
await connection.send(response);
break;
case MessageType.subscribe:
await _handleSubscription(connection, message);
break;
case MessageType.unsubscribe:
await _handleUnsubscription(connection, message);
break;
case MessageType.ping:
await connection.send(WebSocketMessage.pong(message.id));
break;
}
}
Future<void> _handleSubscription(WebSocketConnection connection, WebSocketMessage message) async {
final endpoint = message.endpoint;
final roomId = '${endpoint}:${message.data.hashCode}';
// Add connection to room
_roomSubscriptions.putIfAbsent(roomId, () => {}).add(connection.id);
// Start streaming for this endpoint
final stream = await _createEndpointStream(endpoint, message.data);
stream.listen(
(data) => _broadcastToRoom(roomId, data),
onError: (error) => _broadcastErrorToRoom(roomId, error),
onDone: () => _cleanupRoom(roomId),
);
}
void _broadcastToRoom(String roomId, dynamic data) {
final connectionIds = _roomSubscriptions[roomId] ?? {};
for (final connectionId in connectionIds) {
final connection = _connections[connectionId];
if (connection != null && connection.isOpen) {
connection.send(WebSocketMessage.data(data));
}
}
}
void _handleDisconnection(WebSocketConnection connection) {
_connections.remove(connection.id);
// Remove from all room subscriptions
for (final roomConnections in _roomSubscriptions.values) {
roomConnections.remove(connection.id);
}
print('WebSocket client disconnected: ${connection.id}');
}
}
Real-time Data Broadcasting
class RealTimeBroadcaster {
final Map<String, Set<StreamController<dynamic>>> _subscribers = {};
void subscribe<T>(String channel, StreamController<T> controller) {
_subscribers.putIfAbsent(channel, () => {}).add(controller);
// Clean up when controller is closed
controller.onCancel = () {
_subscribers[channel]?.remove(controller);
if (_subscribers[channel]?.isEmpty == true) {
_subscribers.remove(channel);
}
};
}
void broadcast(String channel, dynamic data) {
final controllers = _subscribers[channel];
if (controllers != null) {
// Remove closed controllers
controllers.removeWhere((controller) => controller.isClosed);
// Broadcast to active controllers
for (final controller in controllers) {
if (!controller.isClosed) {
controller.add(data);
}
}
}
}
void broadcastError(String channel, dynamic error) {
final controllers = _subscribers[channel];
if (controllers != null) {
for (final controller in controllers) {
if (!controller.isClosed) {
controller.addError(error);
}
}
}
}
int getSubscriberCount(String channel) {
return _subscribers[channel]?.length ?? 0;
}
List<String> getActiveChannels() {
return _subscribers.keys.toList();
}
}
// Usage in streaming endpoints
server.registerStreamingEndpoint<WatchOrdersRequest, OrderUpdate>(
'watch_orders',
(request) async* {
final userId = await getCurrentUserId(request.context);
final channel = 'orders:$userId';
final controller = StreamController<OrderUpdate>();
broadcaster.subscribe(channel, controller);
yield* controller.stream;
},
);
// Broadcast updates when orders change
class OrderService {
final RealTimeBroadcaster broadcaster;
OrderService(this.broadcaster);
Future<Order> updateOrder(int orderId, OrderUpdate update) async {
final order = await orderRepository.update(orderId, update);
// Broadcast update to subscribers
broadcaster.broadcast('orders:${order.userId}', OrderUpdate.fromOrder(order));
return order;
}
}
WebSocket Authentication and Security
class WebSocketAuthMiddleware implements WebSocketMiddleware {
final String jwtSecret;
WebSocketAuthMiddleware(this.jwtSecret);
@override
Future<bool> handleConnection(WebSocketConnection connection) async {
// Check for auth token in query parameters or headers
final token = connection.request.uri.queryParameters['token'] ??
connection.request.headers['authorization']?.replaceFirst('Bearer ', '');
if (token == null) {
await connection.close(WebSocketStatus.unauthorized, 'Authentication required');
return false;
}
try {
final jwt = JWT.verify(token, SecretKey(jwtSecret));
final userId = jwt.payload['user_id'] as int;
// Store user context in connection
connection.context['user_id'] = userId;
connection.context['authenticated'] = true;
return true;
} catch (e) {
await connection.close(WebSocketStatus.unauthorized, 'Invalid token');
return false;
}
}
@override
Future<bool> handleMessage(WebSocketConnection connection, WebSocketMessage message) async {
// Verify authentication for protected endpoints
if (_isProtectedEndpoint(message.endpoint)) {
if (connection.context['authenticated'] != true) {
await connection.sendError(UnauthorizedException('Authentication required'));
return false;
}
}
return true;
}
bool _isProtectedEndpoint(String endpoint) {
return ['create_order', 'update_profile', 'delete_user'].contains(endpoint);
}
}
TCP Direct Socket Communication
TCP provides the highest performance communication method with minimal overhead.
TCP Protocol Design
class HypermodernTcpProtocol {
static const int MAGIC_BYTES = 0x484D; // "HM" in hex
static const int VERSION = 1;
// Message types
static const int MSG_REQUEST = 0x01;
static const int MSG_RESPONSE = 0x02;
static const int MSG_ERROR = 0x03;
static const int MSG_STREAM_START = 0x04;
static const int MSG_STREAM_DATA = 0x05;
static const int MSG_STREAM_END = 0x06;
static const int MSG_PING = 0x07;
static const int MSG_PONG = 0x08;
static Uint8List encodeMessage({
required int type,
required int requestId,
required String endpoint,
required Uint8List payload,
}) {
final endpointBytes = utf8.encode(endpoint);
final totalLength = 2 + 1 + 4 + 1 + 4 + endpointBytes.length + 1 + payload.length;
final buffer = ByteData(totalLength);
int offset = 0;
// Magic bytes
buffer.setUint16(offset, MAGIC_BYTES, Endian.big);
offset += 2;
// Version
buffer.setUint8(offset, VERSION);
offset += 1;
// Total length
buffer.setUint32(offset, totalLength, Endian.big);
offset += 4;
// Message type
buffer.setUint8(offset, type);
offset += 1;
// Request ID
buffer.setUint32(offset, requestId, Endian.big);
offset += 4;
// Endpoint length and data
buffer.setUint32(offset, endpointBytes.length, Endian.big);
offset += 4;
for (int i = 0; i < endpointBytes.length; i++) {
buffer.setUint8(offset + i, endpointBytes[i]);
}
offset += endpointBytes.length;
// Payload
for (int i = 0; i < payload.length; i++) {
buffer.setUint8(offset + i, payload[i]);
}
return buffer.buffer.asUint8List();
}
static TcpMessage? decodeMessage(Uint8List data) {
if (data.length < 11) return null; // Minimum message size
final buffer = ByteData.sublistView(data);
int offset = 0;
// Check magic bytes
final magic = buffer.getUint16(offset, Endian.big);
if (magic != MAGIC_BYTES) return null;
offset += 2;
// Check version
final version = buffer.getUint8(offset);
if (version != VERSION) return null;
offset += 1;
// Get total length
final totalLength = buffer.getUint32(offset, Endian.big);
if (data.length < totalLength) return null; // Incomplete message
offset += 4;
// Get message type
final type = buffer.getUint8(offset);
offset += 1;
// Get request ID
final requestId = buffer.getUint32(offset, Endian.big);
offset += 4;
// Get endpoint
final endpointLength = buffer.getUint32(offset, Endian.big);
offset += 4;
final endpointBytes = data.sublist(offset, offset + endpointLength);
final endpoint = utf8.decode(endpointBytes);
offset += endpointLength;
// Get payload
final payload = data.sublist(offset, totalLength);
return TcpMessage(
type: type,
requestId: requestId,
endpoint: endpoint,
payload: payload,
);
}
}
TCP Connection Pool
class TcpConnectionPool {
final String host;
final int port;
final int maxConnections;
final Duration connectionTimeout;
final Queue<TcpConnection> _availableConnections = Queue();
final Set<TcpConnection> _activeConnections = {};
final Queue<Completer<TcpConnection>> _waitingQueue = Queue();
TcpConnectionPool({
required this.host,
required this.port,
this.maxConnections = 10,
this.connectionTimeout = const Duration(seconds: 10),
});
Future<TcpConnection> acquire() async {
// Return available connection if exists
if (_availableConnections.isNotEmpty) {
final connection = _availableConnections.removeFirst();
_activeConnections.add(connection);
return connection;
}
// Create new connection if under limit
if (_activeConnections.length < maxConnections) {
final connection = await _createConnection();
_activeConnections.add(connection);
return connection;
}
// Wait for available connection
final completer = Completer<TcpConnection>();
_waitingQueue.add(completer);
return completer.future;
}
void release(TcpConnection connection) {
_activeConnections.remove(connection);
if (connection.isHealthy) {
// Fulfill waiting request or return to pool
if (_waitingQueue.isNotEmpty) {
final completer = _waitingQueue.removeFirst();
_activeConnections.add(connection);
completer.complete(connection);
} else {
_availableConnections.add(connection);
}
} else {
// Close unhealthy connection
connection.close();
}
}
Future<TcpConnection> _createConnection() async {
final socket = await Socket.connect(host, port)
.timeout(connectionTimeout);
return TcpConnection(socket);
}
Future<void> close() async {
// Close all connections
for (final connection in [..._availableConnections, ..._activeConnections]) {
await connection.close();
}
_availableConnections.clear();
_activeConnections.clear();
// Complete waiting requests with error
while (_waitingQueue.isNotEmpty) {
final completer = _waitingQueue.removeFirst();
completer.completeError(StateError('Connection pool closed'));
}
}
}
High-Performance TCP Client
class HighPerformanceTcpClient {
final TcpConnectionPool _pool;
final Map<int, Completer<dynamic>> _pendingRequests = {};
int _nextRequestId = 1;
HighPerformanceTcpClient(String host, int port)
: _pool = TcpConnectionPool(host: host, port: port);
Future<T> request<T>(String endpoint, dynamic request) async {
final requestId = _nextRequestId++;
final completer = Completer<T>();
_pendingRequests[requestId] = completer;
final connection = await _pool.acquire();
try {
// Serialize request
final payload = request.toBinary();
// Encode message
final message = HypermodernTcpProtocol.encodeMessage(
type: HypermodernTcpProtocol.MSG_REQUEST,
requestId: requestId,
endpoint: endpoint,
payload: payload,
);
// Send message
connection.socket.add(message);
// Wait for response
final response = await completer.future.timeout(Duration(seconds: 30));
return response;
} finally {
_pool.release(connection);
}
}
void _handleResponse(TcpMessage message) {
final completer = _pendingRequests.remove(message.requestId);
if (completer != null) {
if (message.type == HypermodernTcpProtocol.MSG_RESPONSE) {
// Deserialize response
final response = _deserializeResponse(message.payload);
completer.complete(response);
} else if (message.type == HypermodernTcpProtocol.MSG_ERROR) {
// Handle error response
final error = _deserializeError(message.payload);
completer.completeError(error);
}
}
}
Future<void> close() async {
await _pool.close();
}
}
Protocol Selection and Fallbacks
Implement intelligent protocol selection based on various factors.
Adaptive Protocol Selection
class AdaptiveProtocolSelector {
final Map<String, ProtocolMetrics> _metrics = {};
String selectOptimalProtocol({
required List<String> availableProtocols,
required String endpoint,
required RequestContext context,
}) {
// Consider various factors
final factors = _analyzeFactors(endpoint, context);
// Score each protocol
final scores = <String, double>{};
for (final protocol in availableProtocols) {
scores[protocol] = _scoreProtocol(protocol, factors);
}
// Return highest scoring protocol
return scores.entries
.reduce((a, b) => a.value > b.value ? a : b)
.key;
}
SelectionFactors _analyzeFactors(String endpoint, RequestContext context) {
return SelectionFactors(
isStreaming: _isStreamingEndpoint(endpoint),
requiresRealTime: _requiresRealTime(endpoint),
payloadSize: _estimatePayloadSize(endpoint, context),
networkConditions: _assessNetworkConditions(context),
clientCapabilities: _getClientCapabilities(context),
cacheability: _isCacheable(endpoint),
securityRequirements: _getSecurityRequirements(endpoint),
);
}
double _scoreProtocol(String protocol, SelectionFactors factors) {
double score = 0.0;
switch (protocol) {
case 'tcp':
score += factors.requiresRealTime ? 30 : 0;
score += factors.payloadSize > 1024 ? 25 : 0;
score += factors.networkConditions.latency < 50 ? 20 : 0;
score -= factors.clientCapabilities.supportsTcp ? 0 : 100;
break;
case 'websocket':
score += factors.isStreaming ? 40 : 0;
score += factors.requiresRealTime ? 25 : 0;
score += factors.clientCapabilities.supportsWebSocket ? 20 : -50;
score -= factors.networkConditions.isUnstable ? 15 : 0;
break;
case 'http':
score += factors.cacheability ? 30 : 0;
score += factors.clientCapabilities.supportsHttp ? 25 : 0;
score += factors.networkConditions.isFirewallFriendly ? 20 : 0;
score -= factors.requiresRealTime ? 20 : 0;
break;
}
// Apply historical performance
final metrics = _metrics[protocol];
if (metrics != null) {
score += metrics.successRate * 10;
score -= metrics.averageLatency / 10;
}
return score;
}
void recordMetrics(String protocol, RequestMetrics metrics) {
_metrics.putIfAbsent(protocol, () => ProtocolMetrics()).update(metrics);
}
}
Graceful Degradation
class FallbackClient {
final List<ProtocolClient> _clients;
int _currentClientIndex = 0;
FallbackClient(this._clients);
Future<T> request<T>(String endpoint, dynamic request) async {
Exception? lastException;
for (int attempt = 0; attempt < _clients.length; attempt++) {
final client = _clients[_currentClientIndex];
try {
final response = await client.request<T>(endpoint, request);
// Success - record metrics and return
_recordSuccess(_currentClientIndex);
return response;
} catch (e) {
lastException = e as Exception;
_recordFailure(_currentClientIndex, e);
// Try next client
_currentClientIndex = (_currentClientIndex + 1) % _clients.length;
// Don't retry for certain error types
if (e is ValidationException || e is UnauthorizedException) {
break;
}
}
}
throw lastException ?? Exception('All protocols failed');
}
void _recordSuccess(int clientIndex) {
// Update success metrics
// Potentially adjust client priority
}
void _recordFailure(int clientIndex, Exception error) {
// Update failure metrics
// Consider temporarily disabling problematic clients
}
}
What's Next
You now understand how to leverage Hypermodern's multi-protocol architecture effectively. In the next chapter, we'll dive into the module system, which allows you to create reusable, self-contained components that can be shared across projects and teams.