Chapter 14: Transport Layer Integration
Overview
Vektagraf's transport layer provides flexible communication protocols for distributed applications, including HTTP, WebSocket, and TCP transports. This chapter covers transport configuration, connection pooling, streaming operations, and performance optimization for high-throughput applications.
Learning Objectives
- Understand Vektagraf's transport architecture and protocol support
- Configure HTTP, WebSocket, and TCP transports for different use cases
- Implement connection pooling and streaming for optimal performance
- Monitor transport performance and handle errors effectively
- Design scalable distributed Vektagraf applications
Prerequisites
- Understanding of Vektagraf core concepts (Chapters 1-3)
- Knowledge of network protocols and distributed systems
- Familiarity with database operations (Chapter 4)
Core Concepts
Transport Architecture Overview
Vektagraf's transport layer provides a unified interface for different communication protocols:
- HTTP Transport: RESTful API communication with connection pooling
- WebSocket Transport: Real-time bidirectional streaming with automatic reconnection
- TCP Transport: High-performance binary protocol for low-latency applications
- Transport Factory: Automatic protocol selection and configuration management
graph TB
A[Application Layer] --> B[Transport Interface]
B --> C[HTTP Transport]
B --> D[WebSocket Transport]
B --> E[TCP Transport]
C --> F[Connection Pool]
C --> G[HTTP Client]
D --> H[WebSocket Client]
D --> I[Stream Manager]
E --> J[TCP Socket]
E --> K[Binary Protocol]
F --> L[Server Endpoints]
H --> L
J --> L
B --> M[Error Handler]
B --> N[Metrics Collector]
B --> O[Retry Logic]
Transport Interface
All transports implement a common interface for consistent usage:
abstract class Transport {
Future<void> connect(Uri endpoint);
Future<T> request<T>(String method, dynamic data);
Stream<T> stream<T>(String method, dynamic data);
Future<void> disconnect();
bool get isConnected;
String get transportType;
bool get supportsStreaming;
bool get supportsBidirectionalStreaming;
}
Practical Examples
Basic Transport Setup
1. HTTP Transport Configuration
import 'package:vektagraf/vektagraf.dart';
// Configure HTTP transport with connection pooling
final httpTransport = HttpTransport(
timeout: Duration(seconds: 30),
maxConnections: 20,
onError: (error, stackTrace) {
print('HTTP Transport Error: $error');
},
onLog: (message) {
print('HTTP Transport: $message');
},
);
// Connect to Vektagraf server
await httpTransport.connect(Uri.parse('https://api.example.com'));
// Make requests
final result = await httpTransport.request<Map<String, dynamic>>(
'query',
{
'collection': 'products',
'filter': {'category': 'electronics'},
'limit': 100,
},
);
print('Query result: ${result['data']}');
2. WebSocket Transport for Real-Time Operations
// Configure WebSocket transport with automatic reconnection
final wsTransport = WebSocketTransport(
timeout: Duration(seconds: 30),
onError: (error, stackTrace) {
print('WebSocket Error: $error');
},
onLog: (message) {
print('WebSocket: $message');
},
);
// Connect to WebSocket endpoint
await wsTransport.connect(Uri.parse('wss://realtime.example.com/ws'));
// Subscribe to real-time updates
final subscription = wsTransport.subscribe<Map<String, dynamic>>(
'subscribe_changes',
{'collection': 'orders', 'tenantId': 'acme_corp'},
onData: (data) {
print('Real-time update: $data');
_handleOrderUpdate(data);
},
onError: (error) {
print('Subscription error: $error');
},
onDone: () {
print('Subscription ended');
},
);
// Send data to active stream
await wsTransport.sendToStream('order_updates', {
'type': 'status_change',
'orderId': 'order_123',
'newStatus': 'shipped',
});
3. TCP Transport for High Performance
// Configure TCP transport for low-latency operations
final tcpTransport = TcpTransport(
timeout: Duration(seconds: 10),
keepAlive: true,
onError: (error, stackTrace) {
print('TCP Transport Error: $error');
},
);
// Connect to TCP server
await tcpTransport.connect(Uri.parse('tcp://database.example.com:9090'));
// High-performance binary requests
final binaryData = Uint8List.fromList([1, 2, 3, 4, 5]);
final response = await tcpTransport.request<Uint8List>('binary_query', binaryData);
print('Binary response length: ${response.length}');
Advanced Transport Configuration
Transport Factory with Automatic Selection
// Configure transport factory for automatic protocol selection
class VektagrafTransportFactory {
static Transport createTransport(
String endpoint, {
TransportType? preferredType,
Map<String, dynamic>? config,
}) {
final uri = Uri.parse(endpoint);
final transportType = preferredType ?? _detectTransportType(uri);
switch (transportType) {
case TransportType.http:
return HttpTransport(
timeout: Duration(seconds: config?['timeout'] ?? 30),
maxConnections: config?['maxConnections'] ?? 10,
);
case TransportType.websocket:
return WebSocketTransport(
timeout: Duration(seconds: config?['timeout'] ?? 30),
);
case TransportType.tcp:
return TcpTransport(
timeout: Duration(seconds: config?['timeout'] ?? 10),
keepAlive: config?['keepAlive'] ?? true,
);
}
}
static TransportType _detectTransportType(Uri uri) {
switch (uri.scheme.toLowerCase()) {
case 'http':
case 'https':
return TransportType.http;
case 'ws':
case 'wss':
return TransportType.websocket;
case 'tcp':
return TransportType.tcp;
default:
return TransportType.http; // Default fallback
}
}
}
enum TransportType { http, websocket, tcp }
// Usage with automatic selection
final transport = VektagrafTransportFactory.createTransport(
'wss://api.example.com/realtime',
config: {
'timeout': 60,
'maxConnections': 50,
},
);
await transport.connect(Uri.parse('wss://api.example.com/realtime'));
Connection Pool Management
// Advanced HTTP connection pool configuration
class AdvancedHttpTransport extends HttpTransport {
final HttpConnectionPool _connectionPool;
AdvancedHttpTransport({
Duration timeout = const Duration(seconds: 30),
int maxConnections = 20,
int maxConnectionsPerHost = 5,
Duration connectionTimeout = const Duration(seconds: 10),
Duration idleTimeout = const Duration(minutes: 2),
bool enableKeepAlive = true,
}) : _connectionPool = HttpConnectionPool(
maxConnections: maxConnections,
maxConnectionsPerHost: maxConnectionsPerHost,
connectionTimeout: connectionTimeout,
idleTimeout: idleTimeout,
enableKeepAlive: enableKeepAlive,
),
super(timeout: timeout, maxConnections: maxConnections);
@override
Future<T> request<T>(String method, dynamic data) async {
// Get connection from pool
final client = await _connectionPool.getClient(_baseUri.host);
try {
// Use pooled connection for request
return await _makeRequestWithClient(client, method, data);
} finally {
// Return connection to pool (handled automatically by pool)
}
}
// Monitor connection pool health
Map<String, dynamic> getConnectionPoolStats() {
return _connectionPool.getStats();
}
// Warm up connections
Future<void> warmUpConnections(List<String> hosts) async {
for (final host in hosts) {
await _connectionPool.preCreateConnections(host, 2);
}
}
}
Streaming and Real-Time Operations
Bidirectional Streaming with WebSocket
// Implement bidirectional streaming for collaborative features
class CollaborativeDocumentClient {
final WebSocketTransport _transport;
final StreamController<DocumentChange> _changeController;
CollaborativeDocumentClient(this._transport)
: _changeController = StreamController<DocumentChange>.broadcast();
Stream<DocumentChange> get changes => _changeController.stream;
Future<void> connectToDocument(String documentId) async {
// Subscribe to document changes
final subscription = _transport.subscribe<Map<String, dynamic>>(
'document_changes',
{'documentId': documentId},
onData: (data) {
final change = DocumentChange.fromJson(data);
_changeController.add(change);
},
onError: (error) {
_changeController.addError(error);
},
);
// Handle local changes
changes.listen((change) async {
if (change.isLocal) {
await _transport.sendToStream('document_changes', {
'documentId': documentId,
'change': change.toJson(),
'userId': change.userId,
'timestamp': change.timestamp.toIso8601String(),
});
}
});
}
void applyLocalChange(DocumentChange change) {
change.isLocal = true;
_changeController.add(change);
}
}
class DocumentChange {
final String id;
final String type;
final Map<String, dynamic> data;
final String userId;
final DateTime timestamp;
bool isLocal = false;
DocumentChange({
required this.id,
required this.type,
required this.data,
required this.userId,
required this.timestamp,
});
factory DocumentChange.fromJson(Map<String, dynamic> json) {
return DocumentChange(
id: json['id'],
type: json['type'],
data: json['data'],
userId: json['userId'],
timestamp: DateTime.parse(json['timestamp']),
);
}
Map<String, dynamic> toJson() {
return {
'id': id,
'type': type,
'data': data,
'userId': userId,
'timestamp': timestamp.toIso8601String(),
};
}
}
Stream Processing Pipeline
// Create stream processing pipeline for real-time analytics
class StreamProcessor {
final Transport _transport;
final Map<String, StreamSubscription> _subscriptions = {};
StreamProcessor(this._transport);
Future<void> startProcessing() async {
// Process user events
await _processUserEvents();
// Process system metrics
await _processSystemMetrics();
// Process business events
await _processBusinessEvents();
}
Future<void> _processUserEvents() async {
final subscription = _transport.subscribe<Map<String, dynamic>>(
'user_events',
{'types': ['login', 'logout', 'action']},
onData: (event) async {
await _handleUserEvent(event);
},
);
_subscriptions['user_events'] = subscription;
}
Future<void> _processSystemMetrics() async {
final subscription = _transport.subscribe<Map<String, dynamic>>(
'system_metrics',
{'interval': 'realtime'},
onData: (metrics) async {
await _handleSystemMetrics(metrics);
},
);
_subscriptions['system_metrics'] = subscription;
}
Future<void> _handleUserEvent(Map<String, dynamic> event) async {
final eventType = event['type'] as String;
final userId = event['userId'] as String;
final timestamp = DateTime.parse(event['timestamp'] as String);
switch (eventType) {
case 'login':
await _updateUserSession(userId, timestamp);
await _trackLoginMetrics(userId, event);
break;
case 'action':
await _processUserAction(userId, event);
await _updateRealtimeAnalytics(event);
break;
case 'logout':
await _endUserSession(userId, timestamp);
break;
}
}
Future<void> _handleSystemMetrics(Map<String, dynamic> metrics) async {
final cpuUsage = metrics['cpu_usage'] as double;
final memoryUsage = metrics['memory_usage'] as double;
final activeConnections = metrics['active_connections'] as int;
// Check for performance issues
if (cpuUsage > 80.0 || memoryUsage > 85.0) {
await _triggerPerformanceAlert(metrics);
}
// Update real-time dashboard
await _updateSystemDashboard(metrics);
}
}
Error Handling and Retry Logic
Robust Error Handling
// Implement comprehensive error handling for transport operations
class RobustTransportClient {
final Transport _transport;
final TransportErrorHandler _errorHandler;
final TransportMetrics _metrics;
RobustTransportClient(this._transport)
: _errorHandler = TransportErrorHandler(),
_metrics = TransportMetrics();
Future<T> executeWithRetry<T>(
String method,
dynamic data, {
int maxRetries = 3,
Duration baseDelay = const Duration(seconds: 1),
bool exponentialBackoff = true,
}) async {
int attempts = 0;
Duration delay = baseDelay;
while (attempts < maxRetries) {
try {
final stopwatch = Stopwatch()..start();
final result = await _transport.request<T>(method, data);
stopwatch.stop();
// Record successful request
_metrics.recordRequest(
method: method,
success: true,
duration: stopwatch.elapsed,
);
return result;
} catch (e) {
attempts++;
// Record failed request
_metrics.recordRequest(
method: method,
success: false,
error: e.toString(),
);
// Check if error is retryable
if (!_errorHandler.isRetryable(e) || attempts >= maxRetries) {
throw TransportException('Request failed after $attempts attempts: $e');
}
// Wait before retry
await Future.delayed(delay);
// Exponential backoff
if (exponentialBackoff) {
delay = Duration(milliseconds: (delay.inMilliseconds * 1.5).round());
}
}
}
throw TransportException('Request failed after $maxRetries attempts');
}
Future<T> executeWithCircuitBreaker<T>(
String method,
dynamic data,
) async {
if (_errorHandler.isCircuitOpen(method)) {
throw CircuitBreakerOpenException('Circuit breaker is open for $method');
}
try {
final result = await _transport.request<T>(method, data);
_errorHandler.recordSuccess(method);
return result;
} catch (e) {
_errorHandler.recordFailure(method, e);
rethrow;
}
}
}
// Circuit breaker implementation
class TransportErrorHandler {
final Map<String, CircuitBreakerState> _circuitStates = {};
final int _failureThreshold = 5;
final Duration _timeout = Duration(minutes: 1);
bool isRetryable(dynamic error) {
if (error is TimeoutException) return true;
if (error is ConnectionException) return true;
if (error is TransportException && error.code != null) {
// Retry on 5xx server errors, not on 4xx client errors
return error.code! >= 500;
}
return false;
}
bool isCircuitOpen(String method) {
final state = _circuitStates[method];
if (state == null) return false;
if (state.state == CircuitState.open) {
if (DateTime.now().isAfter(state.nextAttempt)) {
state.state = CircuitState.halfOpen;
return false;
}
return true;
}
return false;
}
void recordSuccess(String method) {
final state = _circuitStates[method];
if (state != null) {
state.consecutiveFailures = 0;
state.state = CircuitState.closed;
}
}
void recordFailure(String method, dynamic error) {
final state = _circuitStates.putIfAbsent(
method,
() => CircuitBreakerState(),
);
state.consecutiveFailures++;
if (state.consecutiveFailures >= _failureThreshold) {
state.state = CircuitState.open;
state.nextAttempt = DateTime.now().add(_timeout);
}
}
}
class CircuitBreakerState {
CircuitState state = CircuitState.closed;
int consecutiveFailures = 0;
DateTime nextAttempt = DateTime.now();
}
enum CircuitState { closed, open, halfOpen }
Performance Optimization
Connection Pooling Strategies
// Advanced connection pooling with health monitoring
class SmartConnectionPool {
final Map<String, List<PooledConnection>> _connections = {};
final Map<String, ConnectionHealth> _healthStats = {};
final int _maxConnectionsPerHost;
final Duration _healthCheckInterval;
SmartConnectionPool({
int maxConnectionsPerHost = 10,
Duration healthCheckInterval = const Duration(minutes: 1),
}) : _maxConnectionsPerHost = maxConnectionsPerHost,
_healthCheckInterval = healthCheckInterval {
_startHealthMonitoring();
}
Future<PooledConnection> getConnection(String host) async {
final connections = _connections[host] ?? [];
// Find available healthy connection
for (final conn in connections) {
if (conn.isAvailable && conn.isHealthy) {
conn.markInUse();
return conn;
}
}
// Create new connection if under limit
if (connections.length < _maxConnectionsPerHost) {
final newConn = await _createConnection(host);
connections.add(newConn);
_connections[host] = connections;
return newConn;
}
// Wait for available connection
return await _waitForAvailableConnection(host);
}
void returnConnection(PooledConnection connection) {
connection.markAvailable();
_updateConnectionHealth(connection);
}
Future<PooledConnection> _createConnection(String host) async {
final connection = PooledConnection(
host: host,
createdAt: DateTime.now(),
);
await connection.connect();
return connection;
}
void _startHealthMonitoring() {
Timer.periodic(_healthCheckInterval, (_) => _performHealthChecks());
}
Future<void> _performHealthChecks() async {
for (final entry in _connections.entries) {
final host = entry.key;
final connections = entry.value;
for (final conn in connections) {
if (conn.isAvailable) {
await _checkConnectionHealth(conn);
}
}
// Remove unhealthy connections
connections.removeWhere((conn) => !conn.isHealthy);
// Update host health statistics
_updateHostHealth(host, connections);
}
}
Future<void> _checkConnectionHealth(PooledConnection connection) async {
try {
await connection.ping();
connection.recordHealthCheck(true);
} catch (e) {
connection.recordHealthCheck(false);
}
}
}
class PooledConnection {
final String host;
final DateTime createdAt;
bool _inUse = false;
bool _healthy = true;
int _successfulChecks = 0;
int _failedChecks = 0;
DateTime _lastUsed = DateTime.now();
PooledConnection({
required this.host,
required this.createdAt,
});
bool get isAvailable => !_inUse;
bool get isHealthy => _healthy && _failedChecks < 3;
void markInUse() {
_inUse = true;
_lastUsed = DateTime.now();
}
void markAvailable() {
_inUse = false;
}
void recordHealthCheck(bool success) {
if (success) {
_successfulChecks++;
_failedChecks = 0; // Reset failed count on success
} else {
_failedChecks++;
}
// Update health status
_healthy = _failedChecks < 3;
}
Future<void> connect() async {
// Implement actual connection logic
}
Future<void> ping() async {
// Implement ping/health check
}
}
Transport Performance Monitoring
// Comprehensive transport performance monitoring
class TransportPerformanceMonitor {
final Map<String, MethodMetrics> _methodMetrics = {};
final Map<String, HostMetrics> _hostMetrics = {};
final StreamController<PerformanceAlert> _alertController;
TransportPerformanceMonitor()
: _alertController = StreamController<PerformanceAlert>.broadcast();
Stream<PerformanceAlert> get alerts => _alertController.stream;
void recordRequest({
required String method,
required String host,
required Duration duration,
required bool success,
String? error,
int? responseSize,
}) {
// Record method-level metrics
final methodMetrics = _methodMetrics.putIfAbsent(
method,
() => MethodMetrics(method),
);
methodMetrics.recordRequest(duration, success, responseSize);
// Record host-level metrics
final hostMetrics = _hostMetrics.putIfAbsent(
host,
() => HostMetrics(host),
);
hostMetrics.recordRequest(duration, success);
// Check for performance issues
_checkPerformanceThresholds(method, host, duration, success);
}
void _checkPerformanceThresholds(
String method,
String host,
Duration duration,
bool success,
) {
// Check latency threshold
if (duration.inMilliseconds > 5000) {
_alertController.add(PerformanceAlert(
type: AlertType.highLatency,
message: 'High latency detected for $method on $host: ${duration.inMilliseconds}ms',
method: method,
host: host,
value: duration.inMilliseconds.toDouble(),
threshold: 5000.0,
));
}
// Check error rate
final methodMetrics = _methodMetrics[method];
if (methodMetrics != null && methodMetrics.errorRate > 0.1) {
_alertController.add(PerformanceAlert(
type: AlertType.highErrorRate,
message: 'High error rate for $method: ${(methodMetrics.errorRate * 100).toStringAsFixed(1)}%',
method: method,
host: host,
value: methodMetrics.errorRate,
threshold: 0.1,
));
}
}
Map<String, dynamic> getPerformanceReport() {
return {
'methods': _methodMetrics.map((k, v) => MapEntry(k, v.toJson())),
'hosts': _hostMetrics.map((k, v) => MapEntry(k, v.toJson())),
'summary': _generateSummary(),
};
}
Map<String, dynamic> _generateSummary() {
final totalRequests = _methodMetrics.values
.fold<int>(0, (sum, metrics) => sum + metrics.totalRequests);
final totalErrors = _methodMetrics.values
.fold<int>(0, (sum, metrics) => sum + metrics.errorCount);
final averageLatency = _methodMetrics.values
.where((m) => m.totalRequests > 0)
.map((m) => m.averageLatency)
.fold<double>(0.0, (sum, latency) => sum + latency) /
_methodMetrics.length;
return {
'totalRequests': totalRequests,
'totalErrors': totalErrors,
'overallErrorRate': totalRequests > 0 ? totalErrors / totalRequests : 0.0,
'averageLatency': averageLatency,
'activeHosts': _hostMetrics.length,
'activeMethods': _methodMetrics.length,
};
}
}
class MethodMetrics {
final String method;
int totalRequests = 0;
int errorCount = 0;
Duration totalLatency = Duration.zero;
int totalResponseSize = 0;
final List<Duration> recentLatencies = [];
MethodMetrics(this.method);
void recordRequest(Duration latency, bool success, int? responseSize) {
totalRequests++;
totalLatency += latency;
if (!success) {
errorCount++;
}
if (responseSize != null) {
totalResponseSize += responseSize;
}
// Keep recent latencies for percentile calculations
recentLatencies.add(latency);
if (recentLatencies.length > 1000) {
recentLatencies.removeAt(0);
}
}
double get errorRate => totalRequests > 0 ? errorCount / totalRequests : 0.0;
double get averageLatency => totalRequests > 0
? totalLatency.inMicroseconds / totalRequests / 1000.0 : 0.0;
Map<String, dynamic> toJson() {
final sortedLatencies = List<Duration>.from(recentLatencies)..sort();
return {
'method': method,
'totalRequests': totalRequests,
'errorCount': errorCount,
'errorRate': errorRate,
'averageLatency': averageLatency,
'p50Latency': _percentile(sortedLatencies, 0.5),
'p95Latency': _percentile(sortedLatencies, 0.95),
'p99Latency': _percentile(sortedLatencies, 0.99),
'averageResponseSize': totalRequests > 0
? totalResponseSize / totalRequests : 0,
};
}
double _percentile(List<Duration> sorted, double percentile) {
if (sorted.isEmpty) return 0.0;
final index = (sorted.length * percentile).floor();
return sorted[index.clamp(0, sorted.length - 1)].inMicroseconds / 1000.0;
}
}
class HostMetrics {
final String host;
int totalRequests = 0;
int errorCount = 0;
Duration totalLatency = Duration.zero;
DateTime lastRequest = DateTime.now();
HostMetrics(this.host);
void recordRequest(Duration latency, bool success) {
totalRequests++;
totalLatency += latency;
lastRequest = DateTime.now();
if (!success) {
errorCount++;
}
}
double get errorRate => totalRequests > 0 ? errorCount / totalRequests : 0.0;
double get averageLatency => totalRequests > 0
? totalLatency.inMicroseconds / totalRequests / 1000.0 : 0.0;
Map<String, dynamic> toJson() {
return {
'host': host,
'totalRequests': totalRequests,
'errorCount': errorCount,
'errorRate': errorRate,
'averageLatency': averageLatency,
'lastRequest': lastRequest.toIso8601String(),
};
}
}
enum AlertType { highLatency, highErrorRate, connectionFailure }
class PerformanceAlert {
final AlertType type;
final String message;
final String method;
final String host;
final double value;
final double threshold;
final DateTime timestamp;
PerformanceAlert({
required this.type,
required this.message,
required this.method,
required this.host,
required this.value,
required this.threshold,
}) : timestamp = DateTime.now();
}
Best Practices
Transport Selection Guidelines
1. Protocol Selection Matrix
// Helper class for transport protocol selection
class TransportSelector {
static TransportRecommendation selectOptimalTransport({
required ApplicationRequirements requirements,
required NetworkConditions network,
required PerformanceTargets performance,
}) {
// Analyze requirements and recommend transport
if (requirements.needsRealTime && requirements.needsBidirectional) {
return TransportRecommendation(
type: TransportType.websocket,
reason: 'Real-time bidirectional communication required',
configuration: _getWebSocketConfig(performance),
);
}
if (performance.latencyTarget.inMilliseconds < 10 &&
requirements.throughputRequirement > 10000) {
return TransportRecommendation(
type: TransportType.tcp,
reason: 'Ultra-low latency and high throughput required',
configuration: _getTcpConfig(performance),
);
}
if (requirements.needsRestCompatibility || network.hasFirewallRestrictions) {
return TransportRecommendation(
type: TransportType.http,
reason: 'REST compatibility or firewall restrictions',
configuration: _getHttpConfig(performance),
);
}
// Default recommendation
return TransportRecommendation(
type: TransportType.http,
reason: 'General purpose usage',
configuration: _getHttpConfig(performance),
);
}
static Map<String, dynamic> _getWebSocketConfig(PerformanceTargets performance) {
return {
'timeout': performance.latencyTarget.inSeconds * 2,
'pingInterval': 30,
'reconnectDelay': 1000,
'maxReconnectAttempts': 5,
};
}
static Map<String, dynamic> _getTcpConfig(PerformanceTargets performance) {
return {
'timeout': performance.latencyTarget.inSeconds,
'keepAlive': true,
'noDelay': true,
'bufferSize': 65536,
};
}
static Map<String, dynamic> _getHttpConfig(PerformanceTargets performance) {
return {
'timeout': performance.latencyTarget.inSeconds * 3,
'maxConnections': performance.concurrentRequests,
'connectionTimeout': 10,
'idleTimeout': 120,
};
}
}
class ApplicationRequirements {
final bool needsRealTime;
final bool needsBidirectional;
final bool needsRestCompatibility;
final int throughputRequirement;
final bool needsStreaming;
ApplicationRequirements({
required this.needsRealTime,
required this.needsBidirectional,
required this.needsRestCompatibility,
required this.throughputRequirement,
required this.needsStreaming,
});
}
class NetworkConditions {
final bool hasFirewallRestrictions;
final bool hasProxyServers;
final Duration averageLatency;
final double packetLossRate;
NetworkConditions({
required this.hasFirewallRestrictions,
required this.hasProxyServers,
required this.averageLatency,
required this.packetLossRate,
});
}
class PerformanceTargets {
final Duration latencyTarget;
final int concurrentRequests;
final int throughputRequirement;
PerformanceTargets({
required this.latencyTarget,
required this.concurrentRequests,
required this.throughputRequirement,
});
}
class TransportRecommendation {
final TransportType type;
final String reason;
final Map<String, dynamic> configuration;
TransportRecommendation({
required this.type,
required this.reason,
required this.configuration,
});
}
2. Configuration Management
// Centralized transport configuration management
class TransportConfigurationManager {
final Map<String, TransportConfig> _configs = {};
final Map<String, Transport> _activeTransports = {};
void registerConfig(String name, TransportConfig config) {
_configs[name] = config;
}
Future<Transport> getTransport(String name) async {
// Return existing transport if available
final existing = _activeTransports[name];
if (existing != null && existing.isConnected) {
return existing;
}
// Create new transport from configuration
final config = _configs[name];
if (config == null) {
throw ArgumentError('Transport configuration not found: $name');
}
final transport = await _createTransportFromConfig(config);
_activeTransports[name] = transport;
return transport;
}
Future<Transport> _createTransportFromConfig(TransportConfig config) async {
final transport = TransportFactory.create(config.type, config.options);
await transport.connect(config.endpoint);
return transport;
}
Future<void> closeAllTransports() async {
for (final transport in _activeTransports.values) {
await transport.disconnect();
}
_activeTransports.clear();
}
}
class TransportConfig {
final String name;
final TransportType type;
final Uri endpoint;
final Map<String, dynamic> options;
final Duration timeout;
final int maxRetries;
TransportConfig({
required this.name,
required this.type,
required this.endpoint,
required this.options,
required this.timeout,
required this.maxRetries,
});
factory TransportConfig.fromJson(Map<String, dynamic> json) {
return TransportConfig(
name: json['name'],
type: TransportType.values.firstWhere(
(t) => t.toString().split('.').last == json['type'],
),
endpoint: Uri.parse(json['endpoint']),
options: Map<String, dynamic>.from(json['options'] ?? {}),
timeout: Duration(seconds: json['timeout'] ?? 30),
maxRetries: json['maxRetries'] ?? 3,
);
}
}
Security and Authentication
Transport-Level Security
// Secure transport wrapper with authentication
class SecureTransportWrapper implements Transport {
final Transport _underlying;
final AuthenticationProvider _auth;
final EncryptionProvider _encryption;
SecureTransportWrapper(
this._underlying,
this._auth,
this._encryption,
);
@override
Future<void> connect(Uri endpoint) async {
await _underlying.connect(endpoint);
await _authenticateConnection();
}
@override
Future<T> request<T>(String method, dynamic data) async {
// Add authentication headers
final authenticatedData = await _addAuthentication(method, data);
// Encrypt sensitive data
final encryptedData = await _encryption.encrypt(authenticatedData);
try {
final response = await _underlying.request<T>(method, encryptedData);
// Decrypt response
return await _encryption.decrypt(response);
} catch (e) {
// Handle authentication errors
if (_isAuthenticationError(e)) {
await _refreshAuthentication();
return await request<T>(method, data); // Retry once
}
rethrow;
}
}
Future<void> _authenticateConnection() async {
final token = await _auth.getAccessToken();
// Send authentication request
await _underlying.request('authenticate', {
'token': token,
'timestamp': DateTime.now().toIso8601String(),
});
}
Future<Map<String, dynamic>> _addAuthentication(
String method,
dynamic data,
) async {
final token = await _auth.getAccessToken();
return {
'method': method,
'data': data,
'auth': {
'token': token,
'timestamp': DateTime.now().toIso8601String(),
},
};
}
bool _isAuthenticationError(dynamic error) {
if (error is TransportException) {
return error.code == 401 || error.code == 403;
}
return false;
}
Future<void> _refreshAuthentication() async {
await _auth.refreshToken();
}
// Delegate other methods to underlying transport
@override
Stream<T> stream<T>(String method, dynamic data) =>
_underlying.stream<T>(method, data);
@override
Future<void> disconnect() => _underlying.disconnect();
@override
bool get isConnected => _underlying.isConnected;
@override
String get transportType => 'secure_${_underlying.transportType}';
@override
bool get supportsStreaming => _underlying.supportsStreaming;
@override
bool get supportsBidirectionalStreaming =>
_underlying.supportsBidirectionalStreaming;
}
abstract class AuthenticationProvider {
Future<String> getAccessToken();
Future<void> refreshToken();
}
abstract class EncryptionProvider {
Future<dynamic> encrypt(dynamic data);
Future<T> decrypt<T>(dynamic encryptedData);
}
Advanced Topics
Custom Transport Implementation
Building a Custom Transport
// Example custom transport for specialized protocols
class CustomBinaryTransport extends BaseTransport {
Socket? _socket;
late BinaryProtocolHandler _protocolHandler;
final Map<String, Completer<dynamic>> _pendingRequests = {};
int _requestId = 0;
CustomBinaryTransport({
Duration timeout = const Duration(seconds: 30),
void Function(String, [StackTrace?])? onError,
void Function(String)? onLog,
}) : super(timeout: timeout, onError: onError, onLog: onLog) {
_protocolHandler = BinaryProtocolHandler();
}
@override
String get transportType => 'custom_binary';
@override
bool get supportsStreaming => true;
@override
bool get supportsBidirectionalStreaming => true;
@override
Future<void> connect(Uri endpoint) async {
try {
_socket = await Socket.connect(endpoint.host, endpoint.port);
_socket!.listen(_handleData, onError: _handleError, onDone: _handleDone);
setConnected(true);
log('Connected to ${endpoint.host}:${endpoint.port}');
} catch (e) {
throw ConnectionException('Failed to connect: $e');
}
}
@override
Future<T> request<T>(String method, dynamic data) async {
if (!isConnected) {
throw ConnectionException('Transport not connected');
}
final requestId = _generateRequestId();
final completer = Completer<T>();
_pendingRequests[requestId] = completer;
try {
final message = _protocolHandler.encodeRequest(requestId, method, data);
_socket!.add(message);
return await withTimeout(completer.future);
} catch (e) {
_pendingRequests.remove(requestId);
rethrow;
}
}
@override
Stream<T> stream<T>(String method, dynamic data) {
final controller = StreamController<T>();
final streamId = _generateRequestId();
// Register stream
registerStreamController(streamId, controller);
// Send stream request
final message = _protocolHandler.encodeStreamRequest(streamId, method, data);
_socket!.add(message);
return controller.stream;
}
@override
Future<void> disconnect() async {
if (_socket != null) {
await _socket!.close();
_socket = null;
}
// Complete pending requests with error
for (final completer in _pendingRequests.values) {
if (!completer.isCompleted) {
completer.completeError(ConnectionException('Transport disconnected'));
}
}
_pendingRequests.clear();
await cleanupStreams();
setConnected(false);
}
void _handleData(Uint8List data) {
try {
final messages = _protocolHandler.decodeMessages(data);
for (final message in messages) {
_processMessage(message);
}
} catch (e) {
logError('Error processing data: $e');
}
}
void _processMessage(BinaryMessage message) {
switch (message.type) {
case MessageType.response:
_handleResponse(message);
break;
case MessageType.streamData:
_handleStreamData(message);
break;
case MessageType.error:
_handleErrorMessage(message);
break;
}
}
void _handleResponse(BinaryMessage message) {
final completer = _pendingRequests.remove(message.id);
if (completer != null && !completer.isCompleted) {
completer.complete(message.data);
}
}
void _handleStreamData(BinaryMessage message) {
final controller = getStreamController(message.id);
if (controller != null) {
controller.add(message.data);
}
}
void _handleErrorMessage(BinaryMessage message) {
final error = TransportException(message.data.toString());
final completer = _pendingRequests.remove(message.id);
if (completer != null && !completer.isCompleted) {
completer.completeError(error);
return;
}
final controller = getStreamController(message.id);
if (controller != null) {
controller.addError(error);
}
}
String _generateRequestId() {
return 'req_${++_requestId}';
}
}
// Binary protocol handler for custom transport
class BinaryProtocolHandler {
static const int _headerSize = 12; // 4 bytes type + 4 bytes id + 4 bytes length
Uint8List encodeRequest(String id, String method, dynamic data) {
final payload = _encodePayload({'method': method, 'data': data});
return _encodeMessage(MessageType.request, id, payload);
}
Uint8List encodeStreamRequest(String id, String method, dynamic data) {
final payload = _encodePayload({'method': method, 'data': data});
return _encodeMessage(MessageType.streamRequest, id, payload);
}
List<BinaryMessage> decodeMessages(Uint8List data) {
final messages = <BinaryMessage>[];
int offset = 0;
while (offset + _headerSize <= data.length) {
final header = data.sublist(offset, offset + _headerSize);
final type = MessageType.values[_readInt32(header, 0)];
final id = _readString(header, 4);
final length = _readInt32(header, 8);
if (offset + _headerSize + length > data.length) {
break; // Incomplete message
}
final payload = data.sublist(
offset + _headerSize,
offset + _headerSize + length,
);
messages.add(BinaryMessage(
type: type,
id: id,
data: _decodePayload(payload),
));
offset += _headerSize + length;
}
return messages;
}
Uint8List _encodeMessage(MessageType type, String id, Uint8List payload) {
final header = Uint8List(_headerSize);
_writeInt32(header, 0, type.index);
_writeString(header, 4, id);
_writeInt32(header, 8, payload.length);
final message = Uint8List(_headerSize + payload.length);
message.setRange(0, _headerSize, header);
message.setRange(_headerSize, message.length, payload);
return message;
}
Uint8List _encodePayload(dynamic data) {
final json = jsonEncode(data);
return utf8.encode(json);
}
dynamic _decodePayload(Uint8List payload) {
final json = utf8.decode(payload);
return jsonDecode(json);
}
int _readInt32(Uint8List data, int offset) {
return ByteData.sublistView(data, offset, offset + 4).getInt32(0);
}
void _writeInt32(Uint8List data, int offset, int value) {
ByteData.sublistView(data, offset, offset + 4).setInt32(0, value);
}
String _readString(Uint8List data, int offset) {
// Simplified - in practice you'd encode string length
return 'id'; // Placeholder
}
void _writeString(Uint8List data, int offset, String value) {
// Simplified - in practice you'd encode string properly
}
}
enum MessageType { request, response, streamRequest, streamData, error }
class BinaryMessage {
final MessageType type;
final String id;
final dynamic data;
BinaryMessage({
required this.type,
required this.id,
required this.data,
});
}
Summary
This chapter covered Vektagraf's comprehensive transport layer integration, including:
Key Takeaways
- Transport Flexibility: Support for HTTP, WebSocket, and TCP protocols with unified interface
- Performance Optimization: Connection pooling, streaming, and performance monitoring
- Error Handling: Robust retry logic, circuit breakers, and error recovery
- Real-Time Features: Bidirectional streaming and real-time data synchronization
- Security Integration: Transport-level authentication and encryption
Next Steps
- Chapter 15: Learn about monitoring and observability for transport operations
- Chapter 16: Explore performance tuning strategies for transport optimization
- Chapter 19: Discover production deployment patterns for distributed systems