Chapter 14: Monitoring and Observability

Overview

Comprehensive monitoring and observability are essential for maintaining healthy Vektagraf applications in production. This chapter covers metrics collection, structured logging, distributed tracing, alerting strategies, and integration with popular observability platforms.

Learning Objectives

  • Implement comprehensive metrics collection and performance monitoring
  • Configure structured logging with proper log levels and correlation
  • Set up distributed tracing for complex operations
  • Design effective alerting and incident response procedures
  • Integrate with observability platforms such as Prometheus, Grafana, and the ELK stack

Prerequisites

  • Understanding of Vektagraf core concepts (Chapters 1-3)
  • Knowledge of system administration and DevOps practices
  • Familiarity with monitoring concepts and tools

Core Concepts

Observability Pillars

Vektagraf's observability strategy is built on three pillars:

  1. Metrics: Quantitative measurements of system behavior
  2. Logs: Detailed records of events and operations
  3. Traces: Request flow tracking across distributed components

graph TB
    A[Application] --> B[Metrics Collector]
    A --> C[Logger]
    A --> D[Tracer]
    
    B --> E[Prometheus]
    C --> F[Structured Logs]
    D --> G[Jaeger/Zipkin]
    
    E --> H[Grafana Dashboard]
    F --> I[ELK Stack]
    G --> J[Trace Viewer]
    
    H --> K[Alerting]
    I --> K
    J --> K
    
    K --> L[Incident Response]

Built-in Monitoring Components

Vektagraf ships with four built-in monitoring components:

  • SystemMetricsCollector: Automatic performance metrics collection
  • SystemLogger: Structured logging with correlation IDs
  • SystemMonitoringIntegration: Unified observability interface
  • AuditLogger: Security and compliance event tracking

Practical Examples

Basic Monitoring Setup

1. Enable System Metrics Collection

import 'package:vektagraf/vektagraf.dart';

// Configure database with monitoring enabled
final config = VektagrafConfig(
  monitoring: MonitoringConfig(
    enabled: true,
    metricsFlushInterval: Duration(seconds: 30),
    logLevel: LogLevel.info,
    enableTracing: true,
    enableAuditLogging: true,
  ),
);

final database = await VektagrafDatabase.open(
  'monitored_app.db',
  config: config,
);

// Initialize monitoring system
final metricsCollector = SystemMetricsCollector(database);
final monitoringIntegration = SystemMonitoringIntegration(database);

// Start monitoring
metricsCollector.start(flushInterval: Duration(seconds: 30));
monitoringIntegration.start(
  metricsFlushInterval: Duration(seconds: 30),
  logFlushInterval: Duration(seconds: 10),
  monitoringInterval: Duration(minutes: 1),
);

2. Custom Metrics Collection

// Implement application-specific metrics
class ApplicationMetrics {
  final SystemMetricsCollector _collector;
  
  ApplicationMetrics(this._collector);
  
  // Business metrics
  void recordUserRegistration(String tenantId) {
    _collector.incrementCounter(
      'app_user_registrations_total',
      labels: {'tenant': tenantId},
      tenantId: VektagrafId.fromString(tenantId),
    );
  }
  
  void recordOrderPlaced(String tenantId, double amount) {
    _collector.incrementCounter(
      'app_orders_total',
      labels: {'tenant': tenantId},
      tenantId: VektagrafId.fromString(tenantId),
    );
    
    _collector.recordHistogram(
      'app_order_amount',
      amount,
      labels: {'tenant': tenantId},
      tenantId: VektagrafId.fromString(tenantId),
    );
  }
  
  void recordSearchLatency(Duration latency, String searchType) {
    _collector.recordHistogram(
      'app_search_duration_seconds',
      latency.inMicroseconds / 1000000.0,
      labels: {'search_type': searchType},
    );
  }
  
  // System health metrics
  void recordDatabaseHealth(bool healthy) {
    _collector.setGauge(
      'app_database_healthy',
      healthy ? 1.0 : 0.0,
    );
  }
  
  void recordActiveUsers(int count, String tenantId) {
    _collector.setGauge(
      'app_active_users',
      count.toDouble(),
      labels: {'tenant': tenantId},
      tenantId: VektagrafId.fromString(tenantId),
    );
  }
}

// Usage in application code
final appMetrics = ApplicationMetrics(metricsCollector);

// Record business events
await database.transaction((txn) async {
  final user = User(
    id: VektagrafId.generate(),
    email: 'user@example.com',
    tenantId: 'acme_corp',
  );
  
  await txn.save(user);
});

// Record the metric only after the transaction has committed,
// so a rolled-back registration is never counted
appMetrics.recordUserRegistration('acme_corp');
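
Counters and gauges like the ones above are typically scraped by Prometheus in its text exposition format. How Vektagraf exports its metrics is not shown in this chapter, so the following is only a minimal sketch of that format for a single counter or gauge sample. `MetricSample`, `escapeLabelValue`, and `renderPrometheus` are hypothetical names, not part of the Vektagraf API.

```dart
/// Hypothetical value type for one counter or gauge sample.
class MetricSample {
  final String name;
  final double value;
  final Map<String, String> labels;

  const MetricSample(this.name, this.value, [this.labels = const {}]);
}

/// Escapes a label value as the Prometheus text format requires:
/// backslash, double quote, and newline must be escaped.
String escapeLabelValue(String value) => value
    .replaceAll('\\', '\\\\')
    .replaceAll('"', '\\"')
    .replaceAll('\n', '\\n');

/// Renders one sample as a single exposition line of the form
/// `name{label="value",...} value`.
String renderPrometheus(MetricSample sample) {
  if (sample.labels.isEmpty) return '${sample.name} ${sample.value}';
  final labels = sample.labels.entries
      .map((e) => '${e.key}="${escapeLabelValue(e.value)}"')
      .join(',');
  return '${sample.name}{$labels} ${sample.value}';
}
```

Keeping label values low-cardinality matters here: every distinct label combination becomes its own time series, so a label such as `tenant` is reasonable only while the number of tenants stays bounded.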

Structured Logging

1. Configure Structured Logging

// Set up structured logging with correlation
class StructuredLogger {
  final SystemLogger _systemLogger;
  final String _component;
  
  StructuredLogger(this._systemLogger, this._component);
  
  void info(
    String message, {
    Map<String, dynamic>? context,
    String? correlationId,
    VektagrafId? tenantId,
  }) {
    _systemLogger.info(
      message,
      component: _component,
      metadata: {
        if (context != null) ...context,
        if (correlationId != null) 'correlation_id': correlationId,
        'timestamp': DateTime.now().toIso8601String(),
      },
      tenantId: tenantId,
    );
  }
  
  void warn(
    String message, {
    Map<String, dynamic>? context,
    String? correlationId,
    VektagrafId? tenantId,
  }) {
    _systemLogger.warn(
      message,
      component: _component,
      metadata: {
        if (context != null) ...context,
        if (correlationId != null) 'correlation_id': correlationId,
        'timestamp': DateTime.now().toIso8601String(),
      },
      tenantId: tenantId,
    );
  }
  
  void error(
    String message, {
    dynamic error,
    StackTrace? stackTrace,
    Map<String, dynamic>? context,
    String? correlationId,
    VektagrafId? tenantId,
  }) {
    _systemLogger.error(
      message,
      error: error,
      stackTrace: stackTrace,
      component: _component,
      metadata: {
        if (context != null) ...context,
        if (correlationId != null) 'correlation_id': correlationId,
        'timestamp': DateTime.now().toIso8601String(),
      },
      tenantId: tenantId,
    );
  }
}

// Create component-specific loggers
final loggerFactory = SystemLoggerFactory(database);
final apiLogger = StructuredLogger(
  loggerFactory.getLogger('api'),
  'api_server',
);
final dbLogger = StructuredLogger(
  loggerFactory.getLogger('database'),
  'database_operations',
);
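
Structured logs are most useful when each entry is a single machine-parseable record. The output format of `SystemLogger` is not specified in this chapter, so as an illustration only, this sketch serializes one entry as a JSON line with `dart:convert`. `formatLogLine` and its field names are assumptions.

```dart
import 'dart:convert';

/// Hypothetical helper: renders one log entry as a single JSON line,
/// a shape that log shippers (for example, in an ELK pipeline) can parse.
String formatLogLine({
  required String level,
  required String component,
  required String message,
  required DateTime timestamp,
  String? correlationId,
  Map<String, Object?> context = const {},
}) {
  return jsonEncode({
    // UTC timestamps keep entries from different hosts comparable
    'timestamp': timestamp.toUtc().toIso8601String(),
    'level': level,
    'component': component,
    'message': message,
    if (correlationId != null) 'correlation_id': correlationId,
    // Context is nested so its keys cannot overwrite the fixed fields
    'context': context,
  });
}
```

Nesting the caller's context under its own key is a deliberate choice in this sketch: spreading it into the top level, as the wrapper above does with `metadata`, lets a context key such as `timestamp` silently be replaced by the fixed field.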

2. Request Correlation and Context

// Implement request correlation for distributed tracing
class RequestContext {
  final String correlationId;
  final String userId;
  final String tenantId;
  final DateTime startTime;
  final Map<String, dynamic> metadata;
  
  RequestContext({
    required this.correlationId,
    required this.userId,
    required this.tenantId,
    Map<String, dynamic>? metadata,
    DateTime? startTime,
  }) : startTime = startTime ?? DateTime.now(),
       metadata = metadata ?? {};
  
  Duration get duration => DateTime.now().difference(startTime);
  
  RequestContext copyWith({
    Map<String, dynamic>? additionalMetadata,
  }) {
    return RequestContext(
      correlationId: correlationId,
      userId: userId,
      tenantId: tenantId,
      // Preserve the original start time so `duration` keeps measuring
      // from the beginning of the request, not from the copy
      startTime: startTime,
      metadata: {
        ...metadata,
        if (additionalMetadata != null) ...additionalMetadata,
      },
    );
  }
}

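// Request context middleware.
// The context is stored as a Zone value (Zone and runZoned come from
// dart:async, so this snippet needs `import 'dart:async';`). A plain static
// field would be shared by all in-flight requests and overwritten whenever
// two requests interleave at an await.
class RequestContextMiddleware {
  static final Object _contextKey = Object();
  
  static RequestContext? get current =>
      Zone.current[_contextKey] as RequestContext?;
  
  static Future<T> runWithContext<T>(
    RequestContext context,
    Future<T> Function() operation,
  ) {
    // Everything started inside `operation`, including code that resumes
    // after an await, runs in this zone and sees `context`
    return runZoned(operation, zoneValues: {_contextKey: context});
  }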
  
  static RequestContext createContext({
    required String userId,
    required String tenantId,
    String? correlationId,
    Map<String, dynamic>? metadata,
  }) {
    return RequestContext(
      correlationId: correlationId ?? _generateCorrelationId(),
      userId: userId,
      tenantId: tenantId,
      metadata: metadata,
    );
  }
  
  static String _generateCorrelationId() {
    return 'req_${DateTime.now().millisecondsSinceEpoch}_${VektagrafId.generate()}';
  }
}

// Usage with automatic context propagation
Future<List<Product>> searchProducts(
  String query,
  String userId,
  String tenantId,
) async {
  final context = RequestContextMiddleware.createContext(
    userId: userId,
    tenantId: tenantId,
    metadata: {'operation': 'search_products', 'query': query},
  );
  
  return await RequestContextMiddleware.runWithContext(context, () async {
    apiLogger.info(
      'Starting product search',
      context: {
        'query': query,
        'user_id': userId,
        'tenant_id': tenantId,
      },
      correlationId: context.correlationId,
      tenantId: VektagrafId.fromString(tenantId),
    );
    
    try {
      final stopwatch = Stopwatch()..start();
      
      final results = await database.products()
          .where((p) => p.tenantId == tenantId)
          .vectorSearch(query: query, limit: 50);
      
      stopwatch.stop();
      
      apiLogger.info(
        'Product search completed',
        context: {
          'query': query,
          'result_count': results.length,
          'duration_ms': stopwatch.elapsedMilliseconds,
        },
        correlationId: context.correlationId,
        tenantId: VektagrafId.fromString(tenantId),
      );
      
      // Record metrics
      appMetrics.recordSearchLatency(stopwatch.elapsed, 'vector_search');
      
      return results;
    } catch (e, stackTrace) {
      apiLogger.error(
        'Product search failed',
        error: e,
        stackTrace: stackTrace,
        context: {
          'query': query,
          'user_id': userId,
          'tenant_id': tenantId,
        },
        correlationId: context.correlationId,
        tenantId: VektagrafId.fromString(tenantId),
      );
      
      rethrow;
    }
  });
}
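
Dart runs many requests on one isolate and interleaves them at every `await`, so request-scoped state has to follow the asynchronous call chain rather than live in a shared variable. One common way to do that is a Zone value from `dart:async`. The sketch below illustrates the mechanism on its own; `_requestIdKey`, `runWithRequestId`, `currentRequestId`, `handle`, and `demo` are hypothetical names.

```dart
import 'dart:async';

// Private key object, so no other code can read or shadow the value
final Object _requestIdKey = Object();

/// Runs [body] in a zone that carries [requestId]. Callbacks and
/// `await` continuations started inside [body] inherit the same zone.
R runWithRequestId<R>(String requestId, R Function() body) {
  return runZoned(body, zoneValues: {_requestIdKey: requestId});
}

/// The request ID of the current zone, or null outside any request.
String? get currentRequestId => Zone.current[_requestIdKey] as String?;

/// Simulates a handler that yields to the event loop before reading
/// its request ID.
Future<String?> handle(Duration delay) async {
  await Future<void>.delayed(delay);
  return currentRequestId;
}

/// Starts two overlapping requests; each future completes with the ID
/// of the zone it was started in, regardless of which finishes first.
Future<List<String?>> demo() {
  return Future.wait([
    runWithRequestId('req_a', () => handle(const Duration(milliseconds: 20))),
    runWithRequestId('req_b', () => handle(const Duration(milliseconds: 10))),
  ]);
}
```

With a single static field instead of a zone value, the second request would overwrite the first one's ID before the first handler resumed, and log lines from both requests would carry the same correlation ID.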

Performance Monitoring

1. Database Operation Monitoring

// Comprehensive database operation monitoring
class DatabaseOperationMonitor {
  final SystemMonitoringIntegration _monitoring;
  final Map<String, OperationMetrics> _operationMetrics = {};
  
  DatabaseOperationMonitor(this._monitoring);
  
  Future<T> monitorOperation<T>(
    String operation,
    String collection,
    Future<T> Function() action, {
    VektagrafId? objectId,
    VektagrafId? tenantId,
    String? userId,
    Map<String, dynamic>? changes,
  }) async {
    final stopwatch = Stopwatch()..start();
    
    try {
      final result = await action();
      stopwatch.stop();
      
      // Record successful operation
      await _monitoring.recordDatabaseOperation(
        operation,
        collection,
        stopwatch.elapsed,
        success: true,
        objectId: objectId,
        tenantId: tenantId,
        userId: userId,
        changes: changes,
      );
      
      // Update operation metrics
      _updateOperationMetrics(operation, stopwatch.elapsed, true);
      
      return result;
    } catch (e) {
      stopwatch.stop();
      
      // Record failed operation
      await _monitoring.recordDatabaseOperation(
        operation,
        collection,
        stopwatch.elapsed,
        success: false,
        objectId: objectId,
        tenantId: tenantId,
        userId: userId,
        errorMessage: e.toString(),
      );
      
      // Update operation metrics
      _updateOperationMetrics(operation, stopwatch.elapsed, false);
      
      rethrow;
    }
  }
  
  void _updateOperationMetrics(String operation, Duration duration, bool success) {
    final metrics = _operationMetrics.putIfAbsent(
      operation,
      () => OperationMetrics(operation),
    );
    
    metrics.recordOperation(duration, success);
  }
  
  Map<String, OperationMetrics> get operationMetrics => 
      Map.unmodifiable(_operationMetrics);
}

class OperationMetrics {
  final String operation;
  int totalOperations = 0;
  int successfulOperations = 0;
  Duration totalDuration = Duration.zero;
  Duration minDuration = Duration(days: 1);
  Duration maxDuration = Duration.zero;
  final List<Duration> recentDurations = [];
  
  OperationMetrics(this.operation);
  
  void recordOperation(Duration duration, bool success) {
    totalOperations++;
    totalDuration += duration;
    
    if (success) {
      successfulOperations++;
    }
    
    if (duration < minDuration) {
      minDuration = duration;
    }
    
    if (duration > maxDuration) {
      maxDuration = duration;
    }
    
    recentDurations.add(duration);
    if (recentDurations.length > 1000) {
      recentDurations.removeAt(0);
    }
  }
  
  double get successRate => totalOperations > 0 
      ? successfulOperations / totalOperations : 0.0;
  
  Duration get averageDuration => totalOperations > 0 
      ? Duration(microseconds: totalDuration.inMicroseconds ~/ totalOperations)
      : Duration.zero;
  
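  Duration get p95Duration {
    if (recentDurations.isEmpty) return Duration.zero;
    
    final sorted = List<Duration>.from(recentDurations)..sort();
    // Nearest-rank percentile: the 1-based rank is ceil(0.95 * n), so the
    // 0-based index is one less. Using floor(0.95 * n) as the index would
    // return the maximum for any window of 20 or fewer samples.
    // For n >= 1 the index always lies within [0, n - 1].
    final index = (sorted.length * 95 / 100).ceil() - 1;
    return sorted[index];
  }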
  
  Map<String, dynamic> toJson() {
    return {
      'operation': operation,
      'total_operations': totalOperations,
      'successful_operations': successfulOperations,
      'success_rate': successRate,
      'average_duration_ms': averageDuration.inMilliseconds,
      'min_duration_ms': minDuration.inMilliseconds,
      'max_duration_ms': maxDuration.inMilliseconds,
      'p95_duration_ms': p95Duration.inMilliseconds,
    };
  }
}

// Usage with automatic monitoring
final dbMonitor = DatabaseOperationMonitor(monitoringIntegration);

Future<void> createProduct(Product product) async {
  await dbMonitor.monitorOperation(
    'create',
    'products',
    () async {
      await database.transaction((txn) async {
        await txn.save(product);
      });
    },
    objectId: product.id,
    tenantId: VektagrafId.fromString(product.tenantId),
    userId: RequestContextMiddleware.current?.userId,
    changes: {'action': 'create', 'product_name': product.name},
  );
}
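
Percentile definitions differ slightly between tools. As a worked illustration, the sketch below uses the nearest-rank definition, in which the p-th percentile of n sorted samples is the value at 1-based rank ceil(p × n / 100). `percentile` is a hypothetical standalone helper, not a Vektagraf function.

```dart
/// Nearest-rank percentile of [samples] for 0 < [p] <= 100.
/// Returns Duration.zero for an empty list.
Duration percentile(List<Duration> samples, double p) {
  if (samples.isEmpty) return Duration.zero;
  if (p <= 0 || p > 100) {
    throw ArgumentError.value(p, 'p', 'must be in (0, 100]');
  }
  // Sort a copy so the caller's list is left untouched
  final sorted = List<Duration>.of(samples)..sort();
  // The rank is 1-based, so subtract 1 to get the list index
  final rank = (p * sorted.length / 100).ceil();
  return sorted[rank - 1];
}
```

For 20 samples of 1 ms through 20 ms, the rank for p = 95 is ceil(19) = 19, so the result is 19 ms rather than the 20 ms maximum.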

2. Vector Search Performance Monitoring

// Specialized monitoring for vector operations
class VectorSearchMonitor {
  final SystemMetricsCollector _metricsCollector;
  final StructuredLogger _logger;
  
  VectorSearchMonitor(this._metricsCollector, this._logger);
  
  Future<List<T>> monitorVectorSearch<T>(
    String operation,
    int dimensions,
    int vectorCount,
    Future<List<T>> Function() searchFunction,
  ) async {
    final stopwatch = Stopwatch()..start();
    final context = RequestContextMiddleware.current;
    
    _logger.info(
      'Starting vector search',
      context: {
        'operation': operation,
        'dimensions': dimensions,
        'vector_count': vectorCount,
      },
      correlationId: context?.correlationId,
    );
    
    try {
      final results = await searchFunction();
      stopwatch.stop();
      
      // Record vector operation metrics
      _metricsCollector.recordVectorOperation(
        operation,
        vectorCount,
        dimensions,
        duration: stopwatch.elapsed,
      );
      
      // Record search performance
      _metricsCollector.recordHistogram(
        'vector_search_duration_seconds',
        stopwatch.elapsed.inMicroseconds / 1000000.0,
        labels: {
          'operation': operation,
          'dimensions': dimensions.toString(),
        },
      );
      
      _metricsCollector.setGauge(
        'vector_search_result_count',
        results.length.toDouble(),
        labels: {'operation': operation},
      );
      
      _logger.info(
        'Vector search completed',
        context: {
          'operation': operation,
          'result_count': results.length,
          'duration_ms': stopwatch.elapsedMilliseconds,
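          // Guard against a zero elapsed time, which would yield infinity
          'throughput_vectors_per_second': stopwatch.elapsedMicroseconds > 0
              ? vectorCount * 1000000.0 / stopwatch.elapsedMicroseconds
              : 0.0,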
        },
        correlationId: context?.correlationId,
      );
      
      return results;
    } catch (e, stackTrace) {
      stopwatch.stop();
      
      _logger.error(
        'Vector search failed',
        error: e,
        stackTrace: stackTrace,
        context: {
          'operation': operation,
          'dimensions': dimensions,
          'vector_count': vectorCount,
          'duration_ms': stopwatch.elapsedMilliseconds,
        },
        correlationId: context?.correlationId,
      );
      
      // Record error metrics
      _metricsCollector.incrementCounter(
        'vector_search_errors_total',
        labels: {
          'operation': operation,
          'error_type': e.runtimeType.toString(),
        },
      );
      
      rethrow;
    }
  }
}

Health Checks and System Status

1. Comprehensive Health Monitoring

// System health monitoring and reporting
class HealthMonitor {
  final VektagrafDatabase _database;
  final SystemMonitoringIntegration _monitoring;
  final Map<String, HealthCheck> _healthChecks = {};
  
  HealthMonitor(this._database, this._monitoring) {
    _registerDefaultHealthChecks();
  }
  
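  void _registerDefaultHealthChecks() {
    registerHealthCheck('database', DatabaseHealthCheck(_database));
    registerHealthCheck('memory', MemoryHealthCheck());
    // DiskHealthCheck and ConnectionHealthCheck are not shown in this
    // chapter; they are assumed to implement HealthCheck in the same way
    // as MemoryHealthCheck below
    registerHealthCheck('disk', DiskHealthCheck());
    registerHealthCheck('connections', ConnectionHealthCheck());
  }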
  
  void registerHealthCheck(String name, HealthCheck check) {
    _healthChecks[name] = check;
  }
  
  Future<SystemHealthReport> getSystemHealth() async {
    final results = <String, HealthCheckResult>{};
    
    for (final entry in _healthChecks.entries) {
      try {
        results[entry.key] = await entry.value.check();
      } catch (e) {
        results[entry.key] = HealthCheckResult(
          status: HealthStatus.unhealthy,
          message: 'Health check failed: $e',
          details: {'error': e.toString()},
        );
      }
    }
    
    final overallStatus = _calculateOverallStatus(results.values);
    
    return SystemHealthReport(
      status: overallStatus,
      timestamp: DateTime.now(),
      checks: results,
      uptime: _getUptime(),
      version: _getVersion(),
    );
  }
  
  HealthStatus _calculateOverallStatus(Iterable<HealthCheckResult> results) {
    if (results.any((r) => r.status == HealthStatus.unhealthy)) {
      return HealthStatus.unhealthy;
    }
    
    if (results.any((r) => r.status == HealthStatus.degraded)) {
      return HealthStatus.degraded;
    }
    
    return HealthStatus.healthy;
  }
  
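  // Started when the monitor is constructed, so uptime is measured from
  // monitor creation (a close proxy for process start when the monitor
  // is created during application startup)
  final Stopwatch _uptime = Stopwatch()..start();
  
  Duration _getUptime() => _uptime.elapsed;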
  
  String _getVersion() {
    return '1.0.0'; // Would get from package info
  }
}

abstract class HealthCheck {
  Future<HealthCheckResult> check();
}

class DatabaseHealthCheck implements HealthCheck {
  final VektagrafDatabase _database;
  
  DatabaseHealthCheck(this._database);
  
  @override
  Future<HealthCheckResult> check() async {
    try {
      final stopwatch = Stopwatch()..start();
      
      // Perform a simple query to test database connectivity
      await _database.query((_) => false).take(1).toList();
      
      stopwatch.stop();
      
      final responseTime = stopwatch.elapsedMilliseconds;
      
      if (responseTime > 5000) {
        return HealthCheckResult(
          status: HealthStatus.degraded,
          message: 'Database response time is slow',
          details: {'response_time_ms': responseTime},
        );
      }
      
      return HealthCheckResult(
        status: HealthStatus.healthy,
        message: 'Database is responsive',
        details: {'response_time_ms': responseTime},
      );
    } catch (e) {
      return HealthCheckResult(
        status: HealthStatus.unhealthy,
        message: 'Database connection failed',
        details: {'error': e.toString()},
      );
    }
  }
}

class MemoryHealthCheck implements HealthCheck {
  @override
  Future<HealthCheckResult> check() async {
    // Simplified memory check - in practice would use platform-specific APIs
    final memoryUsage = _getMemoryUsage();
    
    if (memoryUsage > 0.9) {
      return HealthCheckResult(
        status: HealthStatus.unhealthy,
        message: 'Memory usage critically high',
        details: {'memory_usage_ratio': memoryUsage},
      );
    }
    
    if (memoryUsage > 0.8) {
      return HealthCheckResult(
        status: HealthStatus.degraded,
        message: 'Memory usage high',
        details: {'memory_usage_ratio': memoryUsage},
      );
    }
    
    return HealthCheckResult(
      status: HealthStatus.healthy,
      message: 'Memory usage normal',
      details: {'memory_usage_ratio': memoryUsage},
    );
  }
  
  double _getMemoryUsage() {
    // Placeholder - would implement actual memory monitoring
    return 0.6; // 60% usage
  }
}

enum HealthStatus { healthy, degraded, unhealthy }

class HealthCheckResult {
  final HealthStatus status;
  final String message;
  final Map<String, dynamic> details;
  final DateTime timestamp;
  
  HealthCheckResult({
    required this.status,
    required this.message,
    required this.details,
  }) : timestamp = DateTime.now();
}

class SystemHealthReport {
  final HealthStatus status;
  final DateTime timestamp;
  final Map<String, HealthCheckResult> checks;
  final Duration uptime;
  final String version;
  
  SystemHealthReport({
    required this.status,
    required this.timestamp,
    required this.checks,
    required this.uptime,
    required this.version,
  });
  
  Map<String, dynamic> toJson() {
    return {
      'status': status.name,
      'timestamp': timestamp.toIso8601String(),
      'uptime_seconds': uptime.inSeconds,
      'version': version,
      'checks': checks.map((k, v) => MapEntry(k, {
        'status': v.status.name,
        'message': v.message,
        'details': v.details,
        'timestamp': v.timestamp.toIso8601String(),
      })),
    };
  }
}
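
A health endpoint is only useful if it answers promptly, even when a dependency hangs. As a sketch of one way to bound each check, the following runs the checks concurrently and converts a timeout or an exception into an unhealthy result. `CheckOutcome`, `runChecks`, and the two-second default are assumptions made for illustration and are separate from the classes above.

```dart
import 'dart:async';

/// Hypothetical result type: whether the check passed, plus a short note.
class CheckOutcome {
  final bool healthy;
  final String message;
  const CheckOutcome(this.healthy, this.message);
}

/// Runs all [checks] concurrently. A check that throws or exceeds
/// [timeout] is reported as unhealthy instead of failing the whole report.
Future<Map<String, CheckOutcome>> runChecks(
  Map<String, Future<CheckOutcome> Function()> checks, {
  Duration timeout = const Duration(seconds: 2),
}) async {
  final names = checks.keys.toList();
  final outcomes = await Future.wait(names.map((name) async {
    try {
      return await checks[name]!().timeout(timeout);
    } on TimeoutException {
      return CheckOutcome(
        false,
        'timed out after ${timeout.inMilliseconds} ms',
      );
    } catch (e) {
      return CheckOutcome(false, 'check failed: $e');
    }
  }));
  // Future.wait preserves input order, so names and outcomes line up
  return Map.fromIterables(names, outcomes);
}
```

Running the checks concurrently bounds the total response time by the slowest single check (at most the timeout) rather than by the sum of all checks.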

Alerting and Incident Response

1. Alert Configuration and Management

import 'dart:async'; // StreamController

// Comprehensive alerting system
class AlertManager {
  final List<AlertRule> _rules = [];
  final List<AlertChannel> _channels = [];
  final Map<String, Alert> _activeAlerts = {};
  final StreamController<Alert> _alertController;
  
  AlertManager() : _alertController = StreamController<Alert>.broadcast();
  
  Stream<Alert> get alerts => _alertController.stream;
  
  void addRule(AlertRule rule) {
    _rules.add(rule);
  }
  
  void addChannel(AlertChannel channel) {
    _channels.add(channel);
  }
  
  Future<void> evaluateMetrics(Map<String, dynamic> metrics) async {
    for (final rule in _rules) {
      final shouldAlert = await rule.evaluate(metrics);
      
      if (shouldAlert && !_activeAlerts.containsKey(rule.name)) {
        // Create new alert
        final alert = Alert(
          id: VektagrafId.generate().toString(),
          rule: rule,
          triggeredAt: DateTime.now(),
          metrics: metrics,
        );
        
        _activeAlerts[rule.name] = alert;
        _alertController.add(alert);
        
        // Send notifications
        await _sendAlert(alert);
      } else if (!shouldAlert && _activeAlerts.containsKey(rule.name)) {
        // Resolve alert
        final alert = _activeAlerts.remove(rule.name)!;
        alert.resolvedAt = DateTime.now();
        
        await _sendResolution(alert);
      }
    }
  }
  
  Future<void> _sendAlert(Alert alert) async {
    for (final channel in _channels) {
      if (channel.shouldSend(alert)) {
        try {
          await channel.send(alert);
        } catch (e) {
          print('Failed to send alert via ${channel.name}: $e');
        }
      }
    }
  }
  
  Future<void> _sendResolution(Alert alert) async {
    for (final channel in _channels) {
      if (channel.shouldSend(alert)) {
        try {
          await channel.sendResolution(alert);
        } catch (e) {
          print('Failed to send resolution via ${channel.name}: $e');
        }
      }
    }
  }
}

abstract class AlertRule {
  String get name;
  AlertSeverity get severity;
  Duration get evaluationInterval;
  
  Future<bool> evaluate(Map<String, dynamic> metrics);
}

class MetricThresholdRule implements AlertRule {
  @override
  final String name;
  
  @override
  final AlertSeverity severity;
  
  @override
  final Duration evaluationInterval;
  
  final String metricName;
  final double threshold;
  final ThresholdComparison comparison;
  final Duration duration;
  
  // Start of the current unbroken run of violations, or null if the
  // most recent sample was within the threshold
  DateTime? _violationStart;
  
  MetricThresholdRule({
    required this.name,
    required this.severity,
    required this.metricName,
    required this.threshold,
    required this.comparison,
    this.evaluationInterval = const Duration(minutes: 1),
    this.duration = const Duration(minutes: 5),
  });
  
  @override
  Future<bool> evaluate(Map<String, dynamic> metrics) async {
    // Accept both int and double samples
    final value = (metrics[metricName] as num?)?.toDouble();
    if (value == null) return false;
    
    final now = DateTime.now();
    
    if (!_checkThreshold(value)) {
      // The metric recovered, so the run of violations is broken
      _violationStart = null;
      return false;
    }
    
    _violationStart ??= now;
    
    // Alert only once the violation has persisted for the full duration
    return now.difference(_violationStart!) >= duration;
  }
  
  bool _checkThreshold(double value) {
    switch (comparison) {
      case ThresholdComparison.greaterThan:
        return value > threshold;
      case ThresholdComparison.lessThan:
        return value < threshold;
      case ThresholdComparison.equals:
        return value == threshold;
    }
  }
}

class ErrorRateRule implements AlertRule {
  @override
  final String name;
  
  @override
  final AlertSeverity severity;
  
  @override
  final Duration evaluationInterval;
  
  final double errorRateThreshold;
  final Duration timeWindow;
  
  ErrorRateRule({
    required this.name,
    required this.severity,
    required this.errorRateThreshold,
    this.evaluationInterval = const Duration(minutes: 1),
    this.timeWindow = const Duration(minutes: 5),
  });
  
  @override
  Future<bool> evaluate(Map<String, dynamic> metrics) async {
    final totalRequests = metrics['total_requests'] as int? ?? 0;
    final errorRequests = metrics['error_requests'] as int? ?? 0;
    
    if (totalRequests == 0) return false;
    
    final errorRate = errorRequests / totalRequests;
    return errorRate > errorRateThreshold;
  }
}

enum AlertSeverity { info, warning, critical }
enum ThresholdComparison { greaterThan, lessThan, equals }

class Alert {
  final String id;
  final AlertRule rule;
  final DateTime triggeredAt;
  final Map<String, dynamic> metrics;
  DateTime? resolvedAt;
  
  Alert({
    required this.id,
    required this.rule,
    required this.triggeredAt,
    required this.metrics,
  });
  
  Duration? get duration => resolvedAt?.difference(triggeredAt);
  bool get isResolved => resolvedAt != null;
}

abstract class AlertChannel {
  String get name;
  
  bool shouldSend(Alert alert);
  Future<void> send(Alert alert);
  Future<void> sendResolution(Alert alert);
}

class EmailAlertChannel implements AlertChannel {
  @override
  String get name => 'email';
  
  final List<String> recipients;
  final AlertSeverity minimumSeverity;
  
  EmailAlertChannel({
    required this.recipients,
    this.minimumSeverity = AlertSeverity.warning,
  });
  
  @override
  bool shouldSend(Alert alert) {
    return alert.rule.severity.index >= minimumSeverity.index;
  }
  
  @override
  Future<void> send(Alert alert) async {
    final subject = '[${alert.rule.severity.name.toUpperCase()}] ${alert.rule.name}';
    final body = _formatAlertEmail(alert);
    
    // Send email (implementation would use actual email service)
    print('Sending email alert: $subject');
    print(body);
  }
  
  @override
  Future<void> sendResolution(Alert alert) async {
    final subject = '[RESOLVED] ${alert.rule.name}';
    final body = _formatResolutionEmail(alert);
    
    print('Sending resolution email: $subject');
    print(body);
  }
  
  String _formatAlertEmail(Alert alert) {
    return '''
Alert: ${alert.rule.name}
Severity: ${alert.rule.severity.name}
Triggered: ${alert.triggeredAt}

Metrics:
${alert.metrics.entries.map((e) => '${e.key}: ${e.value}').join('\n')}
''';
  }
  
  String _formatResolutionEmail(Alert alert) {
    return '''
Alert Resolved: ${alert.rule.name}
Duration: ${alert.duration}
Resolved: ${alert.resolvedAt}
''';
  }
}

class SlackAlertChannel implements AlertChannel {
  @override
  String get name => 'slack';
  
  final String webhookUrl;
  final String channel;
  final AlertSeverity minimumSeverity;
  
  SlackAlertChannel({
    required this.webhookUrl,
    required this.channel,
    this.minimumSeverity = AlertSeverity.warning,
  });
  
  @override
  bool shouldSend(Alert alert) {
    return alert.rule.severity.index >= minimumSeverity.index;
  }
  
  @override
  Future<void> send(Alert alert) async {
    final message = _formatSlackMessage(alert);
    
    // Send to Slack (implementation would use HTTP client)
    print('Sending Slack alert to $channel: $message');
  }
  
  @override
  Future<void> sendResolution(Alert alert) async {
    final message = _formatSlackResolution(alert);
    
    print('Sending Slack resolution to $channel: $message');
  }
  
  String _formatSlackMessage(Alert alert) {
    final emoji = _getSeverityEmoji(alert.rule.severity);
    return '$emoji Alert: ${alert.rule.name} - ${alert.rule.severity.name}';
  }
  
  String _formatSlackResolution(Alert alert) {
    return '✅ Resolved: ${alert.rule.name} (Duration: ${alert.duration})';
  }
  
  String _getSeverityEmoji(AlertSeverity severity) {
    switch (severity) {
      case AlertSeverity.info:
        return 'ℹ️';
      case AlertSeverity.warning:
        return '⚠️';
      case AlertSeverity.critical:
        return '🚨';
    }
  }
}
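
The `send` methods above only print. For Slack, an incoming webhook accepts an HTTP POST whose JSON body carries a `text` field. The sketch below shows that request using `dart:io`; `buildSlackPayload` and `postToSlack` are hypothetical helpers, and the webhook URL is whatever your Slack workspace issued.

```dart
import 'dart:convert';
import 'dart:io';

/// Builds the JSON body for a Slack incoming webhook.
String buildSlackPayload(String text) => jsonEncode({'text': text});

/// Posts [text] to [webhookUrl]. Throws an [HttpException] if the
/// response status is not 2xx.
Future<void> postToSlack(Uri webhookUrl, String text) async {
  final client = HttpClient();
  try {
    final request = await client.postUrl(webhookUrl);
    // ContentType.json declares UTF-8, so emoji in the text survive
    request.headers.contentType = ContentType.json;
    request.write(buildSlackPayload(text));
    final response = await request.close();
    // Drain the body so the connection can be released
    await response.drain<void>();
    if (response.statusCode < 200 || response.statusCode >= 300) {
      throw HttpException(
        'Slack webhook returned ${response.statusCode}',
        uri: webhookUrl,
      );
    }
  } finally {
    client.close();
  }
}
```

Throwing on a non-2xx status fits the `AlertManager` loop above, which already catches per-channel failures and continues with the remaining channels.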

2. Incident Response Automation

// Automated incident response system
class IncidentManager {
  final AlertManager _alertManager;
  final List<IncidentResponse> _responses = [];
  final Map<String, Incident> _activeIncidents = {};
  
  IncidentManager(this._alertManager) {
    _alertManager.alerts.listen(_handleAlert);
  }
  
  void addResponse(IncidentResponse response) {
    _responses.add(response);
  }
  
  Future<void> _handleAlert(Alert alert) async {
    // Check if this should create an incident
    if (alert.rule.severity == AlertSeverity.critical) {
      await _createIncident(alert);
    }
    
    // Execute automated responses
    for (final response in _responses) {
      if (response.shouldRespond(alert)) {
        try {
          await response.execute(alert);
        } catch (e) {
          print('Incident response failed: $e');
        }
      }
    }
  }
  
  Future<void> _createIncident(Alert alert) async {
    final incident = Incident(
      id: VektagrafId.generate().toString(),
      title: 'Critical Alert: ${alert.rule.name}',
      description: 'Automated incident created from critical alert',
      severity: IncidentSeverity.high,
      status: IncidentStatus.open,
      createdAt: DateTime.now(),
      alerts: [alert],
    );
    
    _activeIncidents[incident.id] = incident;
    
    // Notify incident response team
    await _notifyIncidentTeam(incident);
  }
  
  Future<void> _notifyIncidentTeam(Incident incident) async {
    // Implementation would integrate with incident management tools
    print('Incident created: ${incident.title}');
  }
}

abstract class IncidentResponse {
  String get name;
  
  bool shouldRespond(Alert alert);
  Future<void> execute(Alert alert);
}

class AutoScaleResponse implements IncidentResponse {
  @override
  String get name => 'auto_scale';
  
  @override
  bool shouldRespond(Alert alert) {
    return alert.rule.name.contains('cpu_usage') || 
           alert.rule.name.contains('memory_usage');
  }
  
  @override
  Future<void> execute(Alert alert) async {
    print('Executing auto-scale response for ${alert.rule.name}');
    
    // Implementation would trigger scaling actions
    // - Scale up compute resources
    // - Add more database connections
    // - Enable caching layers
  }
}

class CircuitBreakerResponse implements IncidentResponse {
  @override
  String get name => 'circuit_breaker';
  
  @override
  bool shouldRespond(Alert alert) {
    return alert.rule.name.contains('error_rate');
  }
  
  @override
  Future<void> execute(Alert alert) async {
    print('Activating circuit breaker for ${alert.rule.name}');
    
    // Implementation would:
    // - Enable circuit breakers
    // - Redirect traffic to backup systems
    // - Enable degraded mode
  }
}

enum IncidentSeverity { low, medium, high, critical }
enum IncidentStatus { open, investigating, resolved, closed }

class Incident {
  final String id;
  final String title;
  final String description;
  final IncidentSeverity severity;
  IncidentStatus status;
  final DateTime createdAt;
  DateTime? resolvedAt;
  final List<Alert> alerts;
  final List<String> updates = [];
  
  Incident({
    required this.id,
    required this.title,
    required this.description,
    required this.severity,
    required this.status,
    required this.createdAt,
    required this.alerts,
  });
  
  void addUpdate(String update) {
    updates.add('${DateTime.now().toIso8601String()}: $update');
  }
  
  void resolve() {
    status = IncidentStatus.resolved;
    resolvedAt = DateTime.now();
    addUpdate('Incident resolved');
  }
}
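
Because `_handleAlert` runs every matching response each time an alert fires, a flapping alert can trigger the same automation repeatedly. One way to limit this is a cooldown guard. The following is a minimal sketch, assuming a hypothetical `ResponseCooldown` helper that is not part of Vektagraf; the key format and the five-minute window in the usage note are illustrative choices only.

```dart
// Hypothetical helper (not a Vektagraf API): suppresses repeated automated
// responses for the same key within a cooldown window.
class ResponseCooldown {
  final Duration window;
  final Map<String, DateTime> _lastRun = {};

  ResponseCooldown(this.window);

  /// Returns true and records the run if a response identified by [key]
  /// may execute at [now]. Returns false while the cooldown is still active.
  bool tryAcquire(String key, DateTime now) {
    final last = _lastRun[key];
    if (last != null && now.difference(last) < window) {
      return false;
    }
    _lastRun[key] = now;
    return true;
  }
}
```

Inside `_handleAlert`, a guard such as `cooldown.tryAcquire('${response.name}:${alert.rule.name}', DateTime.now())` could be checked before calling `response.execute(alert)`. Passing the current time in as a parameter keeps the helper easy to test.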

Best Practices

Monitoring Strategy

1. Layered Monitoring Approach

import 'dart:async'; // Timer, used by the periodic collectors below

// Implement layered monitoring for comprehensive coverage
class LayeredMonitoringStrategy {
  final InfrastructureMonitoring _infrastructure;
  final ApplicationMonitoring _application;
  final BusinessMonitoring _business;
  final UserExperienceMonitoring _userExperience;
  
  LayeredMonitoringStrategy({
    required InfrastructureMonitoring infrastructure,
    required ApplicationMonitoring application,
    required BusinessMonitoring business,
    required UserExperienceMonitoring userExperience,
  }) : _infrastructure = infrastructure,
       _application = application,
       _business = business,
       _userExperience = userExperience;
  
  Future<void> startMonitoring() async {
    await _infrastructure.start();
    await _application.start();
    await _business.start();
    await _userExperience.start();
  }
  
  Future<ComprehensiveReport> generateReport() async {
    final infraReport = await _infrastructure.getReport();
    final appReport = await _application.getReport();
    final businessReport = await _business.getReport();
    final uxReport = await _userExperience.getReport();
    
    return ComprehensiveReport(
      infrastructure: infraReport,
      application: appReport,
      business: businessReport,
      userExperience: uxReport,
      generatedAt: DateTime.now(),
    );
  }
}

// Infrastructure monitoring (system resources)
class InfrastructureMonitoring {
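  final SystemMetricsCollector _collector;
  Timer? _timer;
  
  InfrastructureMonitoring(this._collector);
  
  Future<void> start() async {
    // Keep a handle on the timer so collection can be stopped later
    _timer?.cancel();
    _timer = Timer.periodic(
        const Duration(seconds: 30), (_) => _collectMetrics());
  }
  
  void stop() {
    _timer?.cancel();
    _timer = null;
  }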
  
  Future<void> _collectMetrics() async {
    // CPU usage
    final cpuUsage = await _getCpuUsage();
    _collector.setGauge('system_cpu_usage_percent', cpuUsage);
    
    // Memory usage
    final memoryUsage = await _getMemoryUsage();
    _collector.setGauge('system_memory_usage_percent', memoryUsage * 100);
    
    // Disk usage
    final diskUsage = await _getDiskUsage();
    _collector.setGauge('system_disk_usage_percent', diskUsage * 100);
    
    // Network I/O
    final networkStats = await _getNetworkStats();
    _collector.setGauge('system_network_bytes_sent', networkStats.bytesSent.toDouble());
    _collector.setGauge('system_network_bytes_received', networkStats.bytesReceived.toDouble());
  }
  
  Future<InfrastructureReport> getReport() async {
    // Generate infrastructure health report
    return InfrastructureReport(
      cpuUsage: await _getCpuUsage(),
      memoryUsage: await _getMemoryUsage(),
      diskUsage: await _getDiskUsage(),
      networkStats: await _getNetworkStats(),
    );
  }
  
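  // Placeholder implementations. Note the units: CPU usage is already a
  // percentage (0-100), while memory and disk usage are fractions (0-1)
  // that _collectMetrics() multiplies by 100 before recording.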
  Future<double> _getCpuUsage() async => 45.0;
  Future<double> _getMemoryUsage() async => 0.6;
  Future<double> _getDiskUsage() async => 0.3;
  Future<NetworkStats> _getNetworkStats() async => 
      NetworkStats(bytesSent: 1024000, bytesReceived: 2048000);
}

// Application monitoring (Vektagraf-specific metrics)
class ApplicationMonitoring {
  final DatabaseOperationMonitor _dbMonitor;
  final VectorSearchMonitor _vectorMonitor;
  
  ApplicationMonitoring(this._dbMonitor, this._vectorMonitor);
  
  Future<void> start() async {
    // Application monitoring is event-driven through operation monitoring
  }
  
  Future<ApplicationReport> getReport() async {
    return ApplicationReport(
      operationMetrics: _dbMonitor.operationMetrics,
      vectorSearchMetrics: await _getVectorSearchMetrics(),
    );
  }
  
  Future<Map<String, dynamic>> _getVectorSearchMetrics() async {
    // Collect vector search performance metrics
    return {
      'total_searches': 1000,
      'average_latency_ms': 50.0,
      'p95_latency_ms': 120.0,
    };
  }
}

// Business monitoring (KPIs and business metrics)
class BusinessMonitoring {
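  final SystemMetricsCollector _collector;
  Timer? _timer;
  
  BusinessMonitoring(this._collector);
  
  Future<void> start() async {
    // Keep a handle on the timer so collection can be stopped later
    _timer?.cancel();
    _timer = Timer.periodic(
        const Duration(minutes: 5), (_) => _collectBusinessMetrics());
  }
  
  void stop() {
    _timer?.cancel();
    _timer = null;
  }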
  
  Future<void> _collectBusinessMetrics() async {
    // Active users
    final activeUsers = await _getActiveUsers();
    _collector.setGauge('business_active_users', activeUsers.toDouble());
    
    // Revenue metrics
    final revenue = await _getCurrentRevenue();
    _collector.setGauge('business_revenue', revenue);
    
    // Conversion rates
    final conversionRate = await _getConversionRate();
    _collector.setGauge('business_conversion_rate', conversionRate);
  }
  
  Future<BusinessReport> getReport() async {
    return BusinessReport(
      activeUsers: await _getActiveUsers(),
      revenue: await _getCurrentRevenue(),
      conversionRate: await _getConversionRate(),
    );
  }
  
  // Placeholder implementations
  Future<int> _getActiveUsers() async => 500;
  Future<double> _getCurrentRevenue() async => 10000.0;
  Future<double> _getConversionRate() async => 0.15;
}

// User experience monitoring (performance from user perspective)
class UserExperienceMonitoring {
  final SystemMetricsCollector _collector;
  
  UserExperienceMonitoring(this._collector);
  
  Future<void> start() async {
    // UX monitoring is typically done client-side;
    // this class is only a server-side approximation
  }
  
  Future<UserExperienceReport> getReport() async {
    return UserExperienceReport(
      averageResponseTime: 150.0,
      errorRate: 0.02,
      satisfactionScore: 4.2,
    );
  }
}

2. Metric Naming and Organization

// Standardized metric naming conventions
class MetricNamingConvention {
  static const String _prefix = 'vektagraf';
  
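  // Database metrics
  // Note: these helpers embed the operation in the metric name, whereas the
  // Grafana queries later in this chapter assume a single
  // vektagraf_database_operations_total series with an `operation` label.
  // Pick one scheme and use it consistently.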
  static String databaseOperation(String operation) => 
      '${_prefix}_database_${operation}_total';
  
  static String databaseLatency(String operation) => 
      '${_prefix}_database_${operation}_duration_seconds';
  
  static String databaseConnections() => 
      '${_prefix}_database_connections_active';
  
  // Vector search metrics
  static String vectorSearchLatency() => 
      '${_prefix}_vector_search_duration_seconds';
  
  static String vectorSearchResults() => 
      '${_prefix}_vector_search_results_count';
  
  static String vectorIndexSize() => 
      '${_prefix}_vector_index_size_bytes';
  
  // System metrics
  static String systemMemory() => 
      '${_prefix}_system_memory_usage_bytes';
  
  static String systemCpu() => 
      '${_prefix}_system_cpu_usage_percent';
  
  // Business metrics
  static String businessUsers() => 
      '${_prefix}_business_active_users';
  
  static String businessRevenue() => 
      '${_prefix}_business_revenue';
  
  // Error metrics
  static String errorRate(String component) => 
      '${_prefix}_${component}_error_rate';
  
  static String errorCount(String component) => 
      '${_prefix}_${component}_errors_total';
}

// Metric labels standardization
class MetricLabels {
  static Map<String, String> database({
    String? operation,
    String? collection,
    String? tenant,
  }) {
    return {
      if (operation != null) 'operation': operation,
      if (collection != null) 'collection': collection,
      if (tenant != null) 'tenant': tenant,
    };
  }
  
  static Map<String, String> vector({
    String? algorithm,
    int? dimensions,
    String? operation,
  }) {
    return {
      if (algorithm != null) 'algorithm': algorithm,
      if (dimensions != null) 'dimensions': dimensions.toString(),
      if (operation != null) 'operation': operation,
    };
  }
  
  static Map<String, String> error({
    String? type,
    String? component,
    String? severity,
  }) {
    return {
      if (type != null) 'error_type': type,
      if (component != null) 'component': component,
      if (severity != null) 'severity': severity,
    };
  }
}
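
Names produced by a convention like the one above still have to satisfy the Prometheus data model if they are exported there: metric names must match `[a-zA-Z_:][a-zA-Z0-9_:]*`, label names must match `[a-zA-Z_][a-zA-Z0-9_]*`, and label names starting with `__` are reserved. The sketch below shows one way to check this before registering a metric. The function names are hypothetical and not part of Vektagraf; `databaseCounterName` simply mirrors the pattern used by `MetricNamingConvention.databaseOperation`.

```dart
// Hypothetical helpers for checking names against the Prometheus data model.
final RegExp _metricNamePattern = RegExp(r'^[a-zA-Z_:][a-zA-Z0-9_:]*$');
final RegExp _labelNamePattern = RegExp(r'^[a-zA-Z_][a-zA-Z0-9_]*$');

/// True if [name] is a syntactically valid Prometheus metric name.
bool isValidMetricName(String name) => _metricNamePattern.hasMatch(name);

/// True if [name] is a valid, non-reserved Prometheus label name.
bool isValidLabelName(String name) =>
    _labelNamePattern.hasMatch(name) && !name.startsWith('__');

/// Builds a database counter name in the same shape as
/// MetricNamingConvention.databaseOperation, rejecting operation strings
/// that would produce an invalid metric name.
String databaseCounterName(String operation) {
  final name = 'vektagraf_database_${operation}_total';
  if (!isValidMetricName(name)) {
    throw ArgumentError.value(
        operation, 'operation', 'yields an invalid metric name');
  }
  return name;
}
```

For example, `databaseCounterName('insert')` yields `vektagraf_database_insert_total`, while an operation such as `'bulk-insert'` is rejected because hyphens are not allowed in metric names.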

Advanced Topics

Integration with External Monitoring Systems

Prometheus Integration

import 'dart:io'; // HttpServer, HttpRequest, ContentType, InternetAddress

// Prometheus metrics exporter
class PrometheusExporter {
  final SystemMetricsCollector _collector;
  final int _port;
  HttpServer? _server;
  
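  // Note: 9090 is also the Prometheus server's own default port, so choose
  // a different port if the exporter and Prometheus share a host
  PrometheusExporter(this._collector, {int port = 9090}) : _port = port;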
  
  Future<void> start() async {
    _server = await HttpServer.bind(InternetAddress.anyIPv4, _port);
    
    _server!.listen((request) async {
      if (request.uri.path == '/metrics') {
        await _handleMetricsRequest(request);
      } else {
        request.response.statusCode = 404;
        await request.response.close();
      }
    });
    
    print('Prometheus exporter listening on port $_port');
  }
  
  Future<void> stop() async {
    await _server?.close();
  }
  
  Future<void> _handleMetricsRequest(HttpRequest request) async {
    try {
      final metrics = await _generatePrometheusMetrics();
      
      // Prometheus text exposition format: text/plain; version=0.0.4
      request.response.headers.contentType = ContentType(
          'text', 'plain',
          charset: 'utf-8', parameters: {'version': '0.0.4'});
      request.response.write(metrics);
      await request.response.close();
    } catch (e) {
      request.response.statusCode = 500;
      request.response.write('Error generating metrics: $e');
      await request.response.close();
    }
  }
  
  Future<String> _generatePrometheusMetrics() async {
    final buffer = StringBuffer();
    
    // Get all metrics from collector
    final metrics = await _collector.queryMetrics();
    
    // Group metrics by name
    final groupedMetrics = <String, List<SystemMetric>>{};
    for (final metric in metrics) {
      groupedMetrics.putIfAbsent(metric.name, () => []).add(metric);
    }
    
    // Generate Prometheus format
    for (final entry in groupedMetrics.entries) {
      final metricName = entry.key;
      final metricList = entry.value;
      
      // Add help and type comments
      buffer.writeln('# HELP $metricName ${_getMetricHelp(metricName)}');
      buffer.writeln('# TYPE $metricName ${_getMetricType(metricList.first.type)}');
      
      // Add metric values
      for (final metric in metricList) {
        final labels = _formatLabels(metric.labels);
        buffer.writeln('$metricName$labels ${metric.value}');
      }
      
      buffer.writeln();
    }
    
    return buffer.toString();
  }
  
  String _getMetricHelp(String metricName) {
    // Return appropriate help text for metric
    return 'Vektagraf metric: $metricName';
  }
  
  String _getMetricType(String type) {
    switch (type) {
      case 'counter':
        return 'counter';
      case 'gauge':
        return 'gauge';
      case 'histogram':
        return 'histogram';
      default:
        return 'gauge';
    }
  }
  
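  String _formatLabels(Map<String, String> labels) {
    if (labels.isEmpty) return '';
    
    // Label values must have backslashes, double quotes, and newlines
    // escaped in the Prometheus text exposition format
    final labelPairs = labels.entries
        .map((e) => '${e.key}="${_escapeLabelValue(e.value)}"')
        .join(',');
    
    return '{$labelPairs}';
  }
  
  String _escapeLabelValue(String value) => value
      .replaceAll('\\', '\\\\')
      .replaceAll('"', '\\"')
      .replaceAll('\n', '\\n');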
}
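
To make the exporter's output concrete, the sketch below renders a single metric family in the Prometheus text exposition format: one `# HELP` line, one `# TYPE` line, then one line per sample. `Sample` and `renderFamily` are hypothetical names introduced for illustration and are independent of `SystemMetricsCollector`; the help text and values are made up.

```dart
/// One sample of a metric family: a label set and a value.
/// Hypothetical type used only for this illustration.
class Sample {
  final Map<String, String> labels;
  final num value;
  const Sample(this.labels, this.value);
}

/// Escapes backslashes, double quotes, and newlines in a label value.
String _escape(String v) =>
    v.replaceAll('\\', '\\\\').replaceAll('"', '\\"').replaceAll('\n', '\\n');

/// Renders one metric family as text exposition lines.
String renderFamily(
    String name, String help, String type, List<Sample> samples) {
  final buffer = StringBuffer()
    ..writeln('# HELP $name $help')
    ..writeln('# TYPE $name $type');
  for (final s in samples) {
    final pairs = s.labels.entries
        .map((e) => '${e.key}="${_escape(e.value)}"')
        .join(',');
    final labelText = pairs.isEmpty ? '' : '{$pairs}';
    buffer.writeln('$name$labelText ${s.value}');
  }
  return buffer.toString();
}
```

Calling `renderFamily('vektagraf_database_operations_total', 'Total database operations', 'counter', [Sample({'operation': 'insert'}, 42)])` produces a `# HELP` line, a `# TYPE ... counter` line, and the sample line `vektagraf_database_operations_total{operation="insert"} 42`.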

Grafana Dashboard Configuration

// Grafana dashboard generator
class GrafanaDashboardGenerator {
  static Map<String, dynamic> generateVektagrafDashboard() {
    return {
      'dashboard': {
        'id': null,
        'title': 'Vektagraf Monitoring',
        'tags': ['vektagraf', 'database', 'monitoring'],
        'timezone': 'browser',
        'panels': [
          _createDatabaseOperationsPanel(),
          _createVectorSearchPanel(),
          _createSystemResourcesPanel(),
          _createErrorRatePanel(),
        ],
        'time': {
          'from': 'now-1h',
          'to': 'now',
        },
        'refresh': '30s',
      },
    };
  }
  
  static Map<String, dynamic> _createDatabaseOperationsPanel() {
    return {
      'id': 1,
      'title': 'Database Operations',
      'type': 'graph',
      'targets': [
        {
          'expr': 'rate(vektagraf_database_operations_total[5m])',
          'legendFormat': '{{operation}} operations/sec',
        },
      ],
      'yAxes': [
        {'label': 'Operations per second'},
      ],
      'gridPos': {'h': 8, 'w': 12, 'x': 0, 'y': 0},
    };
  }
  
  static Map<String, dynamic> _createVectorSearchPanel() {
    return {
      'id': 2,
      'title': 'Vector Search Performance',
      'type': 'graph',
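      'targets': [
        {
          // histogram_quantile needs the per-bucket rate, aggregated by `le`
          'expr': 'histogram_quantile(0.95, sum(rate(vektagraf_vector_search_duration_seconds_bucket[5m])) by (le))',
          'legendFormat': 'P95 Latency',
        },
        {
          'expr': 'histogram_quantile(0.50, sum(rate(vektagraf_vector_search_duration_seconds_bucket[5m])) by (le))',
          'legendFormat': 'P50 Latency',
        },
      ],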
      'yAxes': [
        {'label': 'Seconds'},
      ],
      'gridPos': {'h': 8, 'w': 12, 'x': 12, 'y': 0},
    };
  }
  
  static Map<String, dynamic> _createSystemResourcesPanel() {
    return {
      'id': 3,
      'title': 'System Resources',
      'type': 'graph',
      'targets': [
        {
          'expr': 'vektagraf_system_cpu_usage_percent',
          'legendFormat': 'CPU Usage %',
        },
        {
          'expr': 'vektagraf_system_memory_usage_bytes / 1024 / 1024 / 1024',
          'legendFormat': 'Memory Usage GB',
        },
      ],
      'gridPos': {'h': 8, 'w': 12, 'x': 0, 'y': 8},
    };
  }
  
  static Map<String, dynamic> _createErrorRatePanel() {
    return {
      'id': 4,
      'title': 'Error Rate',
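      // Legacy panel type; newer Grafana versions use 'stat' instead
      'type': 'singlestat',
      'targets': [
        {
          // Aggregate both sides so differing label sets do not break the division
          'expr': 'sum(rate(vektagraf_database_errors_total[5m])) / sum(rate(vektagraf_database_operations_total[5m]))',
          'legendFormat': 'Error Rate',
        },
      ],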
      ],
      'valueName': 'current',
      'format': 'percentunit',
      'thresholds': '0.01,0.05',
      'colorBackground': true,
      'gridPos': {'h': 8, 'w': 12, 'x': 12, 'y': 8},
    };
  }
}
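
The map returned by `generateVektagrafDashboard()` already has a top-level `dashboard` key, which is the shape Grafana's dashboard HTTP API (`POST /api/dashboards/db`) expects alongside optional fields such as `overwrite`. The sketch below serializes such a map to JSON. `toGrafanaImportJson` is a hypothetical helper name, and the sketch assumes a Grafana version that still accepts this payload shape; sending the request and authenticating are left out.

```dart
import 'dart:convert';

/// Hypothetical helper: adds the `overwrite` flag to a generated dashboard
/// map and serializes the result as pretty-printed JSON.
String toGrafanaImportJson(Map<String, dynamic> generated,
    {bool overwrite = true}) {
  final payload = <String, dynamic>{
    ...generated, // expected to contain the 'dashboard' key
    'overwrite': overwrite,
  };
  return const JsonEncoder.withIndent('  ').convert(payload);
}
```

With the generator above, `toGrafanaImportJson(GrafanaDashboardGenerator.generateVektagrafDashboard())` would give a JSON document that can be written to a file or sent as a request body. The `'id': null` entry is preserved as JSON `null`, which tells Grafana to create a new dashboard rather than update an existing one.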

Summary

This chapter covered comprehensive monitoring and observability for Vektagraf applications.

Key Takeaways

  1. Three Pillars: Metrics, logs, and traces provide complete observability
  2. Built-in Monitoring: Vektagraf includes comprehensive monitoring capabilities
  3. Structured Logging: Proper correlation and context tracking
  4. Health Monitoring: Proactive system health checks and alerting
  5. Integration: Seamless integration with popular monitoring platforms

Next Steps

  • Chapter 16: Learn performance tuning and optimization strategies
  • Chapter 19: Explore production deployment patterns
  • Chapter 20: Discover scaling and high availability techniques