Skip to main content

Chapter 12: Transport Layer Integration

Overview

Vektagraf's transport layer provides flexible communication protocols for distributed applications, including HTTP, WebSocket, and TCP transports. This chapter covers transport configuration, connection pooling, streaming operations, and performance optimization for high-throughput applications.

Learning Objectives

  • Understand Vektagraf's transport architecture and protocol support
  • Configure HTTP, WebSocket, and TCP transports for different use cases
  • Implement connection pooling and streaming for optimal performance
  • Monitor transport performance and handle errors effectively
  • Design scalable distributed Vektagraf applications

Prerequisites

  • Understanding of Vektagraf core concepts (Chapters 1-3)
  • Knowledge of network protocols and distributed systems
  • Familiarity with database operations (Chapter 4)

Core Concepts

Transport Architecture Overview

Vektagraf's transport layer provides a unified interface for different communication protocols:

  • HTTP Transport: RESTful API communication with connection pooling
  • WebSocket Transport: Real-time bidirectional streaming with automatic reconnection
  • TCP Transport: High-performance binary protocol for low-latency applications
  • Transport Factory: Automatic protocol selection and configuration management
graph TB
    A[Application Layer] --> B[Transport Interface]
    B --> C[HTTP Transport]
    B --> D[WebSocket Transport]
    B --> E[TCP Transport]
    
    C --> F[Connection Pool]
    C --> G[HTTP Client]
    
    D --> H[WebSocket Client]
    D --> I[Stream Manager]
    
    E --> J[TCP Socket]
    E --> K[Binary Protocol]
    
    F --> L[Server Endpoints]
    H --> L
    J --> L
    
    B --> M[Error Handler]
    B --> N[Metrics Collector]
    B --> O[Retry Logic]

Transport Interface

All transports implement a common interface for consistent usage:

abstract class Transport {
  Future<void> connect(Uri endpoint);
  Future<T> request<T>(String method, dynamic data);
  Stream<T> stream<T>(String method, dynamic data);
  Future<void> disconnect();
  
  bool get isConnected;
  String get transportType;
  bool get supportsStreaming;
  bool get supportsBidirectionalStreaming;
}

Practical Examples

Basic Transport Setup

1. HTTP Transport Configuration

import 'package:vektagraf/vektagraf.dart';

// Configure HTTP transport with connection pooling
final httpTransport = HttpTransport(
  timeout: Duration(seconds: 30),
  maxConnections: 20,
  onError: (error, stackTrace) {
    print('HTTP Transport Error: $error');
  },
  onLog: (message) {
    print('HTTP Transport: $message');
  },
);

// Connect to Vektagraf server
await httpTransport.connect(Uri.parse('https://api.example.com'));

// Make requests
final result = await httpTransport.request<Map<String, dynamic>>(
  'query',
  {
    'collection': 'products',
    'filter': {'category': 'electronics'},
    'limit': 100,
  },
);

print('Query result: ${result['data']}');

2. WebSocket Transport for Real-Time Operations

// Configure WebSocket transport with automatic reconnection
final wsTransport = WebSocketTransport(
  timeout: Duration(seconds: 30),
  onError: (error, stackTrace) {
    print('WebSocket Error: $error');
  },
  onLog: (message) {
    print('WebSocket: $message');
  },
);

// Connect to WebSocket endpoint
await wsTransport.connect(Uri.parse('wss://realtime.example.com/ws'));

// Subscribe to real-time updates
final subscription = wsTransport.subscribe<Map<String, dynamic>>(
  'subscribe_changes',
  {'collection': 'orders', 'tenantId': 'acme_corp'},
  onData: (data) {
    print('Real-time update: $data');
    _handleOrderUpdate(data);
  },
  onError: (error) {
    print('Subscription error: $error');
  },
  onDone: () {
    print('Subscription ended');
  },
);

// Send data to active stream
await wsTransport.sendToStream('order_updates', {
  'type': 'status_change',
  'orderId': 'order_123',
  'newStatus': 'shipped',
});

3. TCP Transport for High Performance

// Configure TCP transport for low-latency operations
final tcpTransport = TcpTransport(
  timeout: Duration(seconds: 10),
  keepAlive: true,
  onError: (error, stackTrace) {
    print('TCP Transport Error: $error');
  },
);

// Connect to TCP server
await tcpTransport.connect(Uri.parse('tcp://database.example.com:9090'));

// High-performance binary requests
final binaryData = Uint8List.fromList([1, 2, 3, 4, 5]);
final response = await tcpTransport.request<Uint8List>('binary_query', binaryData);

print('Binary response length: ${response.length}');

Advanced Transport Configuration

Transport Factory with Automatic Selection

// Configure transport factory for automatic protocol selection
class VektagrafTransportFactory {
  static Transport createTransport(
    String endpoint, {
    TransportType? preferredType,
    Map<String, dynamic>? config,
  }) {
    final uri = Uri.parse(endpoint);
    final transportType = preferredType ?? _detectTransportType(uri);
    
    switch (transportType) {
      case TransportType.http:
        return HttpTransport(
          timeout: Duration(seconds: config?['timeout'] ?? 30),
          maxConnections: config?['maxConnections'] ?? 10,
        );
        
      case TransportType.websocket:
        return WebSocketTransport(
          timeout: Duration(seconds: config?['timeout'] ?? 30),
        );
        
      case TransportType.tcp:
        return TcpTransport(
          timeout: Duration(seconds: config?['timeout'] ?? 10),
          keepAlive: config?['keepAlive'] ?? true,
        );
    }
  }
  
  static TransportType _detectTransportType(Uri uri) {
    switch (uri.scheme.toLowerCase()) {
      case 'http':
      case 'https':
        return TransportType.http;
      case 'ws':
      case 'wss':
        return TransportType.websocket;
      case 'tcp':
        return TransportType.tcp;
      default:
        return TransportType.http; // Default fallback
    }
  }
}

enum TransportType { http, websocket, tcp }

// Usage with automatic selection
final transport = VektagrafTransportFactory.createTransport(
  'wss://api.example.com/realtime',
  config: {
    'timeout': 60,
    'maxConnections': 50,
  },
);

await transport.connect(Uri.parse('wss://api.example.com/realtime'));

Connection Pool Management

// Advanced HTTP connection pool configuration
class AdvancedHttpTransport extends HttpTransport {
  final HttpConnectionPool _connectionPool;
  
  AdvancedHttpTransport({
    Duration timeout = const Duration(seconds: 30),
    int maxConnections = 20,
    int maxConnectionsPerHost = 5,
    Duration connectionTimeout = const Duration(seconds: 10),
    Duration idleTimeout = const Duration(minutes: 2),
    bool enableKeepAlive = true,
  }) : _connectionPool = HttpConnectionPool(
         maxConnections: maxConnections,
         maxConnectionsPerHost: maxConnectionsPerHost,
         connectionTimeout: connectionTimeout,
         idleTimeout: idleTimeout,
         enableKeepAlive: enableKeepAlive,
       ),
       super(timeout: timeout, maxConnections: maxConnections);
  
  @override
  Future<T> request<T>(String method, dynamic data) async {
    // Get connection from pool
    final client = await _connectionPool.getClient(_baseUri.host);
    
    try {
      // Use pooled connection for request
      return await _makeRequestWithClient(client, method, data);
    } finally {
      // Return connection to pool (handled automatically by pool)
    }
  }
  
  // Monitor connection pool health
  Map<String, dynamic> getConnectionPoolStats() {
    return _connectionPool.getStats();
  }
  
  // Warm up connections
  Future<void> warmUpConnections(List<String> hosts) async {
    for (final host in hosts) {
      await _connectionPool.preCreateConnections(host, 2);
    }
  }
}

Streaming and Real-Time Operations

Bidirectional Streaming with WebSocket

// Implement bidirectional streaming for collaborative features
class CollaborativeDocumentClient {
  final WebSocketTransport _transport;
  final StreamController<DocumentChange> _changeController;
  
  CollaborativeDocumentClient(this._transport)
      : _changeController = StreamController<DocumentChange>.broadcast();
  
  Stream<DocumentChange> get changes => _changeController.stream;
  
  Future<void> connectToDocument(String documentId) async {
    // Subscribe to document changes
    final subscription = _transport.subscribe<Map<String, dynamic>>(
      'document_changes',
      {'documentId': documentId},
      onData: (data) {
        final change = DocumentChange.fromJson(data);
        _changeController.add(change);
      },
      onError: (error) {
        _changeController.addError(error);
      },
    );
    
    // Handle local changes
    changes.listen((change) async {
      if (change.isLocal) {
        await _transport.sendToStream('document_changes', {
          'documentId': documentId,
          'change': change.toJson(),
          'userId': change.userId,
          'timestamp': change.timestamp.toIso8601String(),
        });
      }
    });
  }
  
  void applyLocalChange(DocumentChange change) {
    change.isLocal = true;
    _changeController.add(change);
  }
}

class DocumentChange {
  final String id;
  final String type;
  final Map<String, dynamic> data;
  final String userId;
  final DateTime timestamp;
  bool isLocal = false;
  
  DocumentChange({
    required this.id,
    required this.type,
    required this.data,
    required this.userId,
    required this.timestamp,
  });
  
  factory DocumentChange.fromJson(Map<String, dynamic> json) {
    return DocumentChange(
      id: json['id'],
      type: json['type'],
      data: json['data'],
      userId: json['userId'],
      timestamp: DateTime.parse(json['timestamp']),
    );
  }
  
  Map<String, dynamic> toJson() {
    return {
      'id': id,
      'type': type,
      'data': data,
      'userId': userId,
      'timestamp': timestamp.toIso8601String(),
    };
  }
}

Stream Processing Pipeline

// Create stream processing pipeline for real-time analytics
class StreamProcessor {
  final Transport _transport;
  final Map<String, StreamSubscription> _subscriptions = {};
  
  StreamProcessor(this._transport);
  
  Future<void> startProcessing() async {
    // Process user events
    await _processUserEvents();
    
    // Process system metrics
    await _processSystemMetrics();
    
    // Process business events
    await _processBusinessEvents();
  }
  
  Future<void> _processUserEvents() async {
    final subscription = _transport.subscribe<Map<String, dynamic>>(
      'user_events',
      {'types': ['login', 'logout', 'action']},
      onData: (event) async {
        await _handleUserEvent(event);
      },
    );
    
    _subscriptions['user_events'] = subscription;
  }
  
  Future<void> _processSystemMetrics() async {
    final subscription = _transport.subscribe<Map<String, dynamic>>(
      'system_metrics',
      {'interval': 'realtime'},
      onData: (metrics) async {
        await _handleSystemMetrics(metrics);
      },
    );
    
    _subscriptions['system_metrics'] = subscription;
  }
  
  Future<void> _handleUserEvent(Map<String, dynamic> event) async {
    final eventType = event['type'] as String;
    final userId = event['userId'] as String;
    final timestamp = DateTime.parse(event['timestamp'] as String);
    
    switch (eventType) {
      case 'login':
        await _updateUserSession(userId, timestamp);
        await _trackLoginMetrics(userId, event);
        break;
        
      case 'action':
        await _processUserAction(userId, event);
        await _updateRealtimeAnalytics(event);
        break;
        
      case 'logout':
        await _endUserSession(userId, timestamp);
        break;
    }
  }
  
  Future<void> _handleSystemMetrics(Map<String, dynamic> metrics) async {
    final cpuUsage = metrics['cpu_usage'] as double;
    final memoryUsage = metrics['memory_usage'] as double;
    final activeConnections = metrics['active_connections'] as int;
    
    // Check for performance issues
    if (cpuUsage > 80.0 || memoryUsage > 85.0) {
      await _triggerPerformanceAlert(metrics);
    }
    
    // Update real-time dashboard
    await _updateSystemDashboard(metrics);
  }
}

Error Handling and Retry Logic

Robust Error Handling

// Implement comprehensive error handling for transport operations
class RobustTransportClient {
  final Transport _transport;
  final TransportErrorHandler _errorHandler;
  final TransportMetrics _metrics;
  
  RobustTransportClient(this._transport)
      : _errorHandler = TransportErrorHandler(),
        _metrics = TransportMetrics();
  
  Future<T> executeWithRetry<T>(
    String method,
    dynamic data, {
    int maxRetries = 3,
    Duration baseDelay = const Duration(seconds: 1),
    bool exponentialBackoff = true,
  }) async {
    int attempts = 0;
    Duration delay = baseDelay;
    
    while (attempts < maxRetries) {
      try {
        final stopwatch = Stopwatch()..start();
        final result = await _transport.request<T>(method, data);
        stopwatch.stop();
        
        // Record successful request
        _metrics.recordRequest(
          method: method,
          success: true,
          duration: stopwatch.elapsed,
        );
        
        return result;
      } catch (e) {
        attempts++;
        
        // Record failed request
        _metrics.recordRequest(
          method: method,
          success: false,
          error: e.toString(),
        );
        
        // Check if error is retryable
        if (!_errorHandler.isRetryable(e) || attempts >= maxRetries) {
          throw TransportException('Request failed after $attempts attempts: $e');
        }
        
        // Wait before retry
        await Future.delayed(delay);
        
        // Exponential backoff
        if (exponentialBackoff) {
          delay = Duration(milliseconds: (delay.inMilliseconds * 1.5).round());
        }
      }
    }
    
    throw TransportException('Request failed after $maxRetries attempts');
  }
  
  Future<T> executeWithCircuitBreaker<T>(
    String method,
    dynamic data,
  ) async {
    if (_errorHandler.isCircuitOpen(method)) {
      throw CircuitBreakerOpenException('Circuit breaker is open for $method');
    }
    
    try {
      final result = await _transport.request<T>(method, data);
      _errorHandler.recordSuccess(method);
      return result;
    } catch (e) {
      _errorHandler.recordFailure(method, e);
      rethrow;
    }
  }
}

// Circuit breaker implementation
class TransportErrorHandler {
  final Map<String, CircuitBreakerState> _circuitStates = {};
  final int _failureThreshold = 5;
  final Duration _timeout = Duration(minutes: 1);
  
  bool isRetryable(dynamic error) {
    if (error is TimeoutException) return true;
    if (error is ConnectionException) return true;
    if (error is TransportException && error.code != null) {
      // Retry on 5xx server errors, not on 4xx client errors
      return error.code! >= 500;
    }
    return false;
  }
  
  bool isCircuitOpen(String method) {
    final state = _circuitStates[method];
    if (state == null) return false;
    
    if (state.state == CircuitState.open) {
      if (DateTime.now().isAfter(state.nextAttempt)) {
        state.state = CircuitState.halfOpen;
        return false;
      }
      return true;
    }
    
    return false;
  }
  
  void recordSuccess(String method) {
    final state = _circuitStates[method];
    if (state != null) {
      state.consecutiveFailures = 0;
      state.state = CircuitState.closed;
    }
  }
  
  void recordFailure(String method, dynamic error) {
    final state = _circuitStates.putIfAbsent(
      method,
      () => CircuitBreakerState(),
    );
    
    state.consecutiveFailures++;
    
    if (state.consecutiveFailures >= _failureThreshold) {
      state.state = CircuitState.open;
      state.nextAttempt = DateTime.now().add(_timeout);
    }
  }
}

class CircuitBreakerState {
  CircuitState state = CircuitState.closed;
  int consecutiveFailures = 0;
  DateTime nextAttempt = DateTime.now();
}

enum CircuitState { closed, open, halfOpen }

Performance Optimization

Connection Pooling Strategies

// Advanced connection pooling with health monitoring
class SmartConnectionPool {
  final Map<String, List<PooledConnection>> _connections = {};
  final Map<String, ConnectionHealth> _healthStats = {};
  final int _maxConnectionsPerHost;
  final Duration _healthCheckInterval;
  
  SmartConnectionPool({
    int maxConnectionsPerHost = 10,
    Duration healthCheckInterval = const Duration(minutes: 1),
  }) : _maxConnectionsPerHost = maxConnectionsPerHost,
       _healthCheckInterval = healthCheckInterval {
    _startHealthMonitoring();
  }
  
  Future<PooledConnection> getConnection(String host) async {
    final connections = _connections[host] ?? [];
    
    // Find available healthy connection
    for (final conn in connections) {
      if (conn.isAvailable && conn.isHealthy) {
        conn.markInUse();
        return conn;
      }
    }
    
    // Create new connection if under limit
    if (connections.length < _maxConnectionsPerHost) {
      final newConn = await _createConnection(host);
      connections.add(newConn);
      _connections[host] = connections;
      return newConn;
    }
    
    // Wait for available connection
    return await _waitForAvailableConnection(host);
  }
  
  void returnConnection(PooledConnection connection) {
    connection.markAvailable();
    _updateConnectionHealth(connection);
  }
  
  Future<PooledConnection> _createConnection(String host) async {
    final connection = PooledConnection(
      host: host,
      createdAt: DateTime.now(),
    );
    
    await connection.connect();
    return connection;
  }
  
  void _startHealthMonitoring() {
    Timer.periodic(_healthCheckInterval, (_) => _performHealthChecks());
  }
  
  Future<void> _performHealthChecks() async {
    for (final entry in _connections.entries) {
      final host = entry.key;
      final connections = entry.value;
      
      for (final conn in connections) {
        if (conn.isAvailable) {
          await _checkConnectionHealth(conn);
        }
      }
      
      // Remove unhealthy connections
      connections.removeWhere((conn) => !conn.isHealthy);
      
      // Update host health statistics
      _updateHostHealth(host, connections);
    }
  }
  
  Future<void> _checkConnectionHealth(PooledConnection connection) async {
    try {
      await connection.ping();
      connection.recordHealthCheck(true);
    } catch (e) {
      connection.recordHealthCheck(false);
    }
  }
}

class PooledConnection {
  final String host;
  final DateTime createdAt;
  bool _inUse = false;
  bool _healthy = true;
  int _successfulChecks = 0;
  int _failedChecks = 0;
  DateTime _lastUsed = DateTime.now();
  
  PooledConnection({
    required this.host,
    required this.createdAt,
  });
  
  bool get isAvailable => !_inUse;
  bool get isHealthy => _healthy && _failedChecks < 3;
  
  void markInUse() {
    _inUse = true;
    _lastUsed = DateTime.now();
  }
  
  void markAvailable() {
    _inUse = false;
  }
  
  void recordHealthCheck(bool success) {
    if (success) {
      _successfulChecks++;
      _failedChecks = 0; // Reset failed count on success
    } else {
      _failedChecks++;
    }
    
    // Update health status
    _healthy = _failedChecks < 3;
  }
  
  Future<void> connect() async {
    // Implement actual connection logic
  }
  
  Future<void> ping() async {
    // Implement ping/health check
  }
}

Transport Performance Monitoring

// Comprehensive transport performance monitoring
class TransportPerformanceMonitor {
  final Map<String, MethodMetrics> _methodMetrics = {};
  final Map<String, HostMetrics> _hostMetrics = {};
  final StreamController<PerformanceAlert> _alertController;
  
  TransportPerformanceMonitor()
      : _alertController = StreamController<PerformanceAlert>.broadcast();
  
  Stream<PerformanceAlert> get alerts => _alertController.stream;
  
  void recordRequest({
    required String method,
    required String host,
    required Duration duration,
    required bool success,
    String? error,
    int? responseSize,
  }) {
    // Record method-level metrics
    final methodMetrics = _methodMetrics.putIfAbsent(
      method,
      () => MethodMetrics(method),
    );
    methodMetrics.recordRequest(duration, success, responseSize);
    
    // Record host-level metrics
    final hostMetrics = _hostMetrics.putIfAbsent(
      host,
      () => HostMetrics(host),
    );
    hostMetrics.recordRequest(duration, success);
    
    // Check for performance issues
    _checkPerformanceThresholds(method, host, duration, success);
  }
  
  void _checkPerformanceThresholds(
    String method,
    String host,
    Duration duration,
    bool success,
  ) {
    // Check latency threshold
    if (duration.inMilliseconds > 5000) {
      _alertController.add(PerformanceAlert(
        type: AlertType.highLatency,
        message: 'High latency detected for $method on $host: ${duration.inMilliseconds}ms',
        method: method,
        host: host,
        value: duration.inMilliseconds.toDouble(),
        threshold: 5000.0,
      ));
    }
    
    // Check error rate
    final methodMetrics = _methodMetrics[method];
    if (methodMetrics != null && methodMetrics.errorRate > 0.1) {
      _alertController.add(PerformanceAlert(
        type: AlertType.highErrorRate,
        message: 'High error rate for $method: ${(methodMetrics.errorRate * 100).toStringAsFixed(1)}%',
        method: method,
        host: host,
        value: methodMetrics.errorRate,
        threshold: 0.1,
      ));
    }
  }
  
  Map<String, dynamic> getPerformanceReport() {
    return {
      'methods': _methodMetrics.map((k, v) => MapEntry(k, v.toJson())),
      'hosts': _hostMetrics.map((k, v) => MapEntry(k, v.toJson())),
      'summary': _generateSummary(),
    };
  }
  
  Map<String, dynamic> _generateSummary() {
    final totalRequests = _methodMetrics.values
        .fold<int>(0, (sum, metrics) => sum + metrics.totalRequests);
    
    final totalErrors = _methodMetrics.values
        .fold<int>(0, (sum, metrics) => sum + metrics.errorCount);
    
    final averageLatency = _methodMetrics.values
        .where((m) => m.totalRequests > 0)
        .map((m) => m.averageLatency)
        .fold<double>(0.0, (sum, latency) => sum + latency) /
        _methodMetrics.length;
    
    return {
      'totalRequests': totalRequests,
      'totalErrors': totalErrors,
      'overallErrorRate': totalRequests > 0 ? totalErrors / totalRequests : 0.0,
      'averageLatency': averageLatency,
      'activeHosts': _hostMetrics.length,
      'activeMethods': _methodMetrics.length,
    };
  }
}

class MethodMetrics {
  final String method;
  int totalRequests = 0;
  int errorCount = 0;
  Duration totalLatency = Duration.zero;
  int totalResponseSize = 0;
  final List<Duration> recentLatencies = [];
  
  MethodMetrics(this.method);
  
  void recordRequest(Duration latency, bool success, int? responseSize) {
    totalRequests++;
    totalLatency += latency;
    
    if (!success) {
      errorCount++;
    }
    
    if (responseSize != null) {
      totalResponseSize += responseSize;
    }
    
    // Keep recent latencies for percentile calculations
    recentLatencies.add(latency);
    if (recentLatencies.length > 1000) {
      recentLatencies.removeAt(0);
    }
  }
  
  double get errorRate => totalRequests > 0 ? errorCount / totalRequests : 0.0;
  double get averageLatency => totalRequests > 0 
      ? totalLatency.inMicroseconds / totalRequests / 1000.0 : 0.0;
  
  Map<String, dynamic> toJson() {
    final sortedLatencies = List<Duration>.from(recentLatencies)..sort();
    
    return {
      'method': method,
      'totalRequests': totalRequests,
      'errorCount': errorCount,
      'errorRate': errorRate,
      'averageLatency': averageLatency,
      'p50Latency': _percentile(sortedLatencies, 0.5),
      'p95Latency': _percentile(sortedLatencies, 0.95),
      'p99Latency': _percentile(sortedLatencies, 0.99),
      'averageResponseSize': totalRequests > 0 
          ? totalResponseSize / totalRequests : 0,
    };
  }
  
  double _percentile(List<Duration> sorted, double percentile) {
    if (sorted.isEmpty) return 0.0;
    final index = (sorted.length * percentile).floor();
    return sorted[index.clamp(0, sorted.length - 1)].inMicroseconds / 1000.0;
  }
}

class HostMetrics {
  final String host;
  int totalRequests = 0;
  int errorCount = 0;
  Duration totalLatency = Duration.zero;
  DateTime lastRequest = DateTime.now();
  
  HostMetrics(this.host);
  
  void recordRequest(Duration latency, bool success) {
    totalRequests++;
    totalLatency += latency;
    lastRequest = DateTime.now();
    
    if (!success) {
      errorCount++;
    }
  }
  
  double get errorRate => totalRequests > 0 ? errorCount / totalRequests : 0.0;
  double get averageLatency => totalRequests > 0 
      ? totalLatency.inMicroseconds / totalRequests / 1000.0 : 0.0;
  
  Map<String, dynamic> toJson() {
    return {
      'host': host,
      'totalRequests': totalRequests,
      'errorCount': errorCount,
      'errorRate': errorRate,
      'averageLatency': averageLatency,
      'lastRequest': lastRequest.toIso8601String(),
    };
  }
}

enum AlertType { highLatency, highErrorRate, connectionFailure }

class PerformanceAlert {
  final AlertType type;
  final String message;
  final String method;
  final String host;
  final double value;
  final double threshold;
  final DateTime timestamp;
  
  PerformanceAlert({
    required this.type,
    required this.message,
    required this.method,
    required this.host,
    required this.value,
    required this.threshold,
  }) : timestamp = DateTime.now();
}

Best Practices

Transport Selection Guidelines

1. Protocol Selection Matrix

// Helper class for transport protocol selection
class TransportSelector {
  static TransportRecommendation selectOptimalTransport({
    required ApplicationRequirements requirements,
    required NetworkConditions network,
    required PerformanceTargets performance,
  }) {
    // Analyze requirements and recommend transport
    if (requirements.needsRealTime && requirements.needsBidirectional) {
      return TransportRecommendation(
        type: TransportType.websocket,
        reason: 'Real-time bidirectional communication required',
        configuration: _getWebSocketConfig(performance),
      );
    }
    
    if (performance.latencyTarget.inMilliseconds < 10 && 
        requirements.throughputRequirement > 10000) {
      return TransportRecommendation(
        type: TransportType.tcp,
        reason: 'Ultra-low latency and high throughput required',
        configuration: _getTcpConfig(performance),
      );
    }
    
    if (requirements.needsRestCompatibility || network.hasFirewallRestrictions) {
      return TransportRecommendation(
        type: TransportType.http,
        reason: 'REST compatibility or firewall restrictions',
        configuration: _getHttpConfig(performance),
      );
    }
    
    // Default recommendation
    return TransportRecommendation(
      type: TransportType.http,
      reason: 'General purpose usage',
      configuration: _getHttpConfig(performance),
    );
  }
  
  static Map<String, dynamic> _getWebSocketConfig(PerformanceTargets performance) {
    return {
      'timeout': performance.latencyTarget.inSeconds * 2,
      'pingInterval': 30,
      'reconnectDelay': 1000,
      'maxReconnectAttempts': 5,
    };
  }
  
  static Map<String, dynamic> _getTcpConfig(PerformanceTargets performance) {
    return {
      'timeout': performance.latencyTarget.inSeconds,
      'keepAlive': true,
      'noDelay': true,
      'bufferSize': 65536,
    };
  }
  
  static Map<String, dynamic> _getHttpConfig(PerformanceTargets performance) {
    return {
      'timeout': performance.latencyTarget.inSeconds * 3,
      'maxConnections': performance.concurrentRequests,
      'connectionTimeout': 10,
      'idleTimeout': 120,
    };
  }
}

class ApplicationRequirements {
  final bool needsRealTime;
  final bool needsBidirectional;
  final bool needsRestCompatibility;
  final int throughputRequirement;
  final bool needsStreaming;
  
  ApplicationRequirements({
    required this.needsRealTime,
    required this.needsBidirectional,
    required this.needsRestCompatibility,
    required this.throughputRequirement,
    required this.needsStreaming,
  });
}

class NetworkConditions {
  final bool hasFirewallRestrictions;
  final bool hasProxyServers;
  final Duration averageLatency;
  final double packetLossRate;
  
  NetworkConditions({
    required this.hasFirewallRestrictions,
    required this.hasProxyServers,
    required this.averageLatency,
    required this.packetLossRate,
  });
}

class PerformanceTargets {
  final Duration latencyTarget;
  final int concurrentRequests;
  final int throughputRequirement;
  
  PerformanceTargets({
    required this.latencyTarget,
    required this.concurrentRequests,
    required this.throughputRequirement,
  });
}

class TransportRecommendation {
  final TransportType type;
  final String reason;
  final Map<String, dynamic> configuration;
  
  TransportRecommendation({
    required this.type,
    required this.reason,
    required this.configuration,
  });
}

2. Configuration Management

// Centralized transport configuration management
class TransportConfigurationManager {
  final Map<String, TransportConfig> _configs = {};
  final Map<String, Transport> _activeTransports = {};
  
  void registerConfig(String name, TransportConfig config) {
    _configs[name] = config;
  }
  
  Future<Transport> getTransport(String name) async {
    // Return existing transport if available
    final existing = _activeTransports[name];
    if (existing != null && existing.isConnected) {
      return existing;
    }
    
    // Create new transport from configuration
    final config = _configs[name];
    if (config == null) {
      throw ArgumentError('Transport configuration not found: $name');
    }
    
    final transport = await _createTransportFromConfig(config);
    _activeTransports[name] = transport;
    
    return transport;
  }
  
  Future<Transport> _createTransportFromConfig(TransportConfig config) async {
    final transport = TransportFactory.create(config.type, config.options);
    await transport.connect(config.endpoint);
    return transport;
  }
  
  Future<void> closeAllTransports() async {
    for (final transport in _activeTransports.values) {
      await transport.disconnect();
    }
    _activeTransports.clear();
  }
}

class TransportConfig {
  final String name;
  final TransportType type;
  final Uri endpoint;
  final Map<String, dynamic> options;
  final Duration timeout;
  final int maxRetries;
  
  TransportConfig({
    required this.name,
    required this.type,
    required this.endpoint,
    required this.options,
    required this.timeout,
    required this.maxRetries,
  });
  
  factory TransportConfig.fromJson(Map<String, dynamic> json) {
    return TransportConfig(
      name: json['name'],
      type: TransportType.values.firstWhere(
        (t) => t.toString().split('.').last == json['type'],
      ),
      endpoint: Uri.parse(json['endpoint']),
      options: Map<String, dynamic>.from(json['options'] ?? {}),
      timeout: Duration(seconds: json['timeout'] ?? 30),
      maxRetries: json['maxRetries'] ?? 3,
    );
  }
}

Security and Authentication

Transport-Level Security

// Secure transport wrapper with authentication
class SecureTransportWrapper implements Transport {
  final Transport _underlying;
  final AuthenticationProvider _auth;
  final EncryptionProvider _encryption;
  
  SecureTransportWrapper(
    this._underlying,
    this._auth,
    this._encryption,
  );
  
  @override
  Future<void> connect(Uri endpoint) async {
    await _underlying.connect(endpoint);
    await _authenticateConnection();
  }
  
  @override
  Future<T> request<T>(String method, dynamic data) async {
    // Add authentication headers
    final authenticatedData = await _addAuthentication(method, data);
    
    // Encrypt sensitive data
    final encryptedData = await _encryption.encrypt(authenticatedData);
    
    try {
      final response = await _underlying.request<T>(method, encryptedData);
      
      // Decrypt response
      return await _encryption.decrypt(response);
    } catch (e) {
      // Handle authentication errors
      if (_isAuthenticationError(e)) {
        await _refreshAuthentication();
        return await request<T>(method, data); // Retry once
      }
      rethrow;
    }
  }
  
  Future<void> _authenticateConnection() async {
    final token = await _auth.getAccessToken();
    
    // Send authentication request
    await _underlying.request('authenticate', {
      'token': token,
      'timestamp': DateTime.now().toIso8601String(),
    });
  }
  
  Future<Map<String, dynamic>> _addAuthentication(
    String method,
    dynamic data,
  ) async {
    final token = await _auth.getAccessToken();
    
    return {
      'method': method,
      'data': data,
      'auth': {
        'token': token,
        'timestamp': DateTime.now().toIso8601String(),
      },
    };
  }
  
  bool _isAuthenticationError(dynamic error) {
    if (error is TransportException) {
      return error.code == 401 || error.code == 403;
    }
    return false;
  }
  
  Future<void> _refreshAuthentication() async {
    await _auth.refreshToken();
  }
  
  // Delegate other methods to underlying transport
  @override
  Stream<T> stream<T>(String method, dynamic data) => 
      _underlying.stream<T>(method, data);
  
  @override
  Future<void> disconnect() => _underlying.disconnect();
  
  @override
  bool get isConnected => _underlying.isConnected;
  
  @override
  String get transportType => 'secure_${_underlying.transportType}';
  
  @override
  bool get supportsStreaming => _underlying.supportsStreaming;
  
  @override
  bool get supportsBidirectionalStreaming => 
      _underlying.supportsBidirectionalStreaming;
}

abstract class AuthenticationProvider {
  Future<String> getAccessToken();
  Future<void> refreshToken();
}

abstract class EncryptionProvider {
  Future<dynamic> encrypt(dynamic data);
  Future<T> decrypt<T>(dynamic encryptedData);
}

Advanced Topics

Custom Transport Implementation

Building a Custom Transport

// Example custom transport for specialized protocols
class CustomBinaryTransport extends BaseTransport {
  Socket? _socket;
  late BinaryProtocolHandler _protocolHandler;
  final Map<String, Completer<dynamic>> _pendingRequests = {};
  int _requestId = 0;
  
  CustomBinaryTransport({
    Duration timeout = const Duration(seconds: 30),
    void Function(String, [StackTrace?])? onError,
    void Function(String)? onLog,
  }) : super(timeout: timeout, onError: onError, onLog: onLog) {
    _protocolHandler = BinaryProtocolHandler();
  }
  
  @override
  String get transportType => 'custom_binary';
  
  @override
  bool get supportsStreaming => true;
  
  @override
  bool get supportsBidirectionalStreaming => true;
  
  @override
  Future<void> connect(Uri endpoint) async {
    try {
      _socket = await Socket.connect(endpoint.host, endpoint.port);
      _socket!.listen(_handleData, onError: _handleError, onDone: _handleDone);
      setConnected(true);
      log('Connected to ${endpoint.host}:${endpoint.port}');
    } catch (e) {
      throw ConnectionException('Failed to connect: $e');
    }
  }
  
  @override
  Future<T> request<T>(String method, dynamic data) async {
    if (!isConnected) {
      throw ConnectionException('Transport not connected');
    }
    
    final requestId = _generateRequestId();
    final completer = Completer<T>();
    _pendingRequests[requestId] = completer;
    
    try {
      final message = _protocolHandler.encodeRequest(requestId, method, data);
      _socket!.add(message);
      
      return await withTimeout(completer.future);
    } catch (e) {
      _pendingRequests.remove(requestId);
      rethrow;
    }
  }
  
  @override
  Stream<T> stream<T>(String method, dynamic data) {
    final controller = StreamController<T>();
    final streamId = _generateRequestId();
    
    // Register stream
    registerStreamController(streamId, controller);
    
    // Send stream request
    final message = _protocolHandler.encodeStreamRequest(streamId, method, data);
    _socket!.add(message);
    
    return controller.stream;
  }
  
  @override
  Future<void> disconnect() async {
    if (_socket != null) {
      await _socket!.close();
      _socket = null;
    }
    
    // Complete pending requests with error
    for (final completer in _pendingRequests.values) {
      if (!completer.isCompleted) {
        completer.completeError(ConnectionException('Transport disconnected'));
      }
    }
    _pendingRequests.clear();
    
    await cleanupStreams();
    setConnected(false);
  }
  
  void _handleData(Uint8List data) {
    try {
      final messages = _protocolHandler.decodeMessages(data);
      
      for (final message in messages) {
        _processMessage(message);
      }
    } catch (e) {
      logError('Error processing data: $e');
    }
  }
  
  void _processMessage(BinaryMessage message) {
    switch (message.type) {
      case MessageType.response:
        _handleResponse(message);
        break;
      case MessageType.streamData:
        _handleStreamData(message);
        break;
      case MessageType.error:
        _handleErrorMessage(message);
        break;
    }
  }
  
  void _handleResponse(BinaryMessage message) {
    final completer = _pendingRequests.remove(message.id);
    if (completer != null && !completer.isCompleted) {
      completer.complete(message.data);
    }
  }
  
  void _handleStreamData(BinaryMessage message) {
    final controller = getStreamController(message.id);
    if (controller != null) {
      controller.add(message.data);
    }
  }
  
  void _handleErrorMessage(BinaryMessage message) {
    final error = TransportException(message.data.toString());
    
    final completer = _pendingRequests.remove(message.id);
    if (completer != null && !completer.isCompleted) {
      completer.completeError(error);
      return;
    }
    
    final controller = getStreamController(message.id);
    if (controller != null) {
      controller.addError(error);
    }
  }
  
  String _generateRequestId() {
    return 'req_${++_requestId}';
  }
}

// Binary protocol handler for custom transport
class BinaryProtocolHandler {
  static const int _headerSize = 12; // 4 bytes type + 4 bytes id + 4 bytes length
  
  Uint8List encodeRequest(String id, String method, dynamic data) {
    final payload = _encodePayload({'method': method, 'data': data});
    return _encodeMessage(MessageType.request, id, payload);
  }
  
  Uint8List encodeStreamRequest(String id, String method, dynamic data) {
    final payload = _encodePayload({'method': method, 'data': data});
    return _encodeMessage(MessageType.streamRequest, id, payload);
  }
  
  List<BinaryMessage> decodeMessages(Uint8List data) {
    final messages = <BinaryMessage>[];
    int offset = 0;
    
    while (offset + _headerSize <= data.length) {
      final header = data.sublist(offset, offset + _headerSize);
      final type = MessageType.values[_readInt32(header, 0)];
      final id = _readString(header, 4);
      final length = _readInt32(header, 8);
      
      if (offset + _headerSize + length > data.length) {
        break; // Incomplete message
      }
      
      final payload = data.sublist(
        offset + _headerSize,
        offset + _headerSize + length,
      );
      
      messages.add(BinaryMessage(
        type: type,
        id: id,
        data: _decodePayload(payload),
      ));
      
      offset += _headerSize + length;
    }
    
    return messages;
  }
  
  Uint8List _encodeMessage(MessageType type, String id, Uint8List payload) {
    final header = Uint8List(_headerSize);
    _writeInt32(header, 0, type.index);
    _writeString(header, 4, id);
    _writeInt32(header, 8, payload.length);
    
    final message = Uint8List(_headerSize + payload.length);
    message.setRange(0, _headerSize, header);
    message.setRange(_headerSize, message.length, payload);
    
    return message;
  }
  
  Uint8List _encodePayload(dynamic data) {
    final json = jsonEncode(data);
    return utf8.encode(json);
  }
  
  dynamic _decodePayload(Uint8List payload) {
    final json = utf8.decode(payload);
    return jsonDecode(json);
  }
  
  int _readInt32(Uint8List data, int offset) {
    return ByteData.sublistView(data, offset, offset + 4).getInt32(0);
  }
  
  void _writeInt32(Uint8List data, int offset, int value) {
    ByteData.sublistView(data, offset, offset + 4).setInt32(0, value);
  }
  
  String _readString(Uint8List data, int offset) {
    // Simplified - in practice you'd encode string length
    return 'id'; // Placeholder
  }
  
  void _writeString(Uint8List data, int offset, String value) {
    // Simplified - in practice you'd encode string properly
  }
}

enum MessageType { request, response, streamRequest, streamData, error }

class BinaryMessage {
  final MessageType type;
  final String id;
  final dynamic data;
  
  BinaryMessage({
    required this.type,
    required this.id,
    required this.data,
  });
}

Summary

This chapter covered Vektagraf's comprehensive transport layer integration, including:

Key Takeaways

  1. Transport Flexibility: Support for HTTP, WebSocket, and TCP protocols with unified interface
  2. Performance Optimization: Connection pooling, streaming, and performance monitoring
  3. Error Handling: Robust retry logic, circuit breakers, and error recovery
  4. Real-Time Features: Bidirectional streaming and real-time data synchronization
  5. Security Integration: Transport-level authentication and encryption

Next Steps

  • Chapter 13: Learn about monitoring and observability for transport operations
  • Chapter 14: Explore performance tuning strategies for transport optimization
  • Chapter 15: Discover production deployment patterns for distributed systems