Skip to main content

Chapter 23: AI/ML Integration Patterns

Overview

Vektagraf's vector-first architecture makes it an ideal platform for AI/ML integration. This chapter demonstrates advanced patterns for embedding generation, model integration, real-time inference, batch processing workflows, model versioning, A/B testing, and feature store implementation.

Learning Objectives

  • Integrate embedding generation models with Vektagraf
  • Implement real-time inference and batch processing workflows
  • Design model versioning and A/B testing systems
  • Build feature stores and ML pipeline integration
  • Create feedback loops for model improvement
  • Optimize ML workloads for production scale

Prerequisites

  • Completed Chapters 1-6 (Foundations and Core Features)
  • Understanding of machine learning concepts and workflows
  • Knowledge of vector embeddings and similarity search
  • Basic understanding of MLOps practices

Core Concepts

AI/ML Architecture with Vektagraf

Vektagraf serves as both a vector database and feature store, enabling seamless AI/ML integration:

class MLModel extends VektaObject {
  late String name;
  late String version;
  late String modelType;
  late Map<String, dynamic> config;
  late List<double> performanceMetrics;
  late DateTime trainedAt;
  late String status; // training, deployed, deprecated
}

class FeatureVector extends VektaObject {
  late String entityId;
  late String entityType;
  late List<double> features;
  late Map<String, dynamic> metadata;
  late DateTime computedAt;
  late String modelVersion;
}

Embedding Integration Patterns

Vektagraf treats embeddings as first-class properties, making ML integration natural:

  1. Real-time Embedding Generation: Generate embeddings on-demand
  2. Batch Embedding Processing: Process large datasets efficiently
  3. Multi-modal Embeddings: Handle text, image, and audio embeddings
  4. Embedding Versioning: Track embedding model versions
  5. Embedding Quality Monitoring: Monitor embedding drift and quality

Practical Examples

Complete AI/ML Integration System

Let's build a comprehensive AI/ML platform with Vektagraf:

1. Schema Definition

{
  "name": "AIMLPlatform",
  "version": "1.0.0",
  "objects": {
    "MLModel": {
      "properties": {
        "name": {"type": "string", "required": true},
        "version": {"type": "string", "required": true},
        "modelType": {"type": "string", "enum": ["embedding", "classification", "regression", "clustering"], "required": true},
        "framework": {"type": "string", "enum": ["tensorflow", "pytorch", "scikit-learn", "transformers"]},
        "config": {
          "type": "object",
          "properties": {
            "architecture": {"type": "string"},
            "hyperparameters": {"type": "object"},
            "inputDimensions": {"type": "integer"},
            "outputDimensions": {"type": "integer"},
            "trainingDataset": {"type": "string"},
            "validationDataset": {"type": "string"}
          }
        },
        "performance": {
          "type": "object",
          "properties": {
            "accuracy": {"type": "number"},
            "precision": {"type": "number"},
            "recall": {"type": "number"},
            "f1Score": {"type": "number"},
            "loss": {"type": "number"},
            "validationLoss": {"type": "number"}
          }
        },
        "deployment": {
          "type": "object",
          "properties": {
            "endpoint": {"type": "string"},
            "replicas": {"type": "integer", "default": 1},
            "resources": {"type": "object"},
            "autoscaling": {"type": "boolean", "default": false}
          }
        },
        "status": {"type": "string", "enum": ["training", "validating", "deployed", "deprecated"], "default": "training"},
        "createdAt": {"type": "datetime", "required": true},
        "trainedAt": {"type": "datetime"},
        "deployedAt": {"type": "datetime"}
      }
    },
    "FeatureVector": {
      "properties": {
        "entityId": {"type": "string", "required": true},
        "entityType": {"type": "string", "required": true},
        "features": {
          "type": "vector",
          "dimensions": 512,
          "algorithm": "hnsw",
          "distance": "cosine"
        },
        "rawFeatures": {"type": "object"},
        "metadata": {
          "type": "object",
          "properties": {
            "modelVersion": {"type": "string"},
            "computationTime": {"type": "number"},
            "confidence": {"type": "number"},
            "source": {"type": "string"}
          }
        },
        "computedAt": {"type": "datetime", "required": true},
        "expiresAt": {"type": "datetime"}
      }
    },
    "Prediction": {
      "properties": {
        "modelId": {"type": "string", "required": true},
        "inputData": {"type": "object", "required": true},
        "prediction": {"type": "object", "required": true},
        "confidence": {"type": "number"},
        "latency": {"type": "number"},
        "features": {
          "type": "vector",
          "dimensions": 512,
          "algorithm": "hnsw",
          "distance": "cosine"
        },
        "metadata": {
          "type": "object",
          "properties": {
            "requestId": {"type": "string"},
            "userId": {"type": "string"},
            "sessionId": {"type": "string"},
            "abTestGroup": {"type": "string"}
          }
        },
        "feedback": {
          "type": "object",
          "properties": {
            "rating": {"type": "number"},
            "correctPrediction": {"type": "boolean"},
            "actualOutcome": {"type": "object"}
          }
        },
        "timestamp": {"type": "datetime", "required": true}
      }
    },
    "TrainingJob": {
      "properties": {
        "modelId": {"type": "string", "required": true},
        "jobType": {"type": "string", "enum": ["training", "retraining", "fine_tuning"], "required": true},
        "config": {
          "type": "object",
          "properties": {
            "datasetPath": {"type": "string"},
            "epochs": {"type": "integer"},
            "batchSize": {"type": "integer"},
            "learningRate": {"type": "number"},
            "validationSplit": {"type": "number"}
          }
        },
        "status": {"type": "string", "enum": ["queued", "running", "completed", "failed"], "default": "queued"},
        "progress": {"type": "number", "default": 0.0},
        "metrics": {"type": "object"},
        "logs": {"type": "array", "items": {"type": "string"}},
        "startedAt": {"type": "datetime"},
        "completedAt": {"type": "datetime"},
        "createdAt": {"type": "datetime", "required": true}
      }
    },
    "ABTest": {
      "properties": {
        "name": {"type": "string", "required": true},
        "description": {"type": "string"},
        "modelIds": {"type": "array", "items": {"type": "string"}},
        "trafficSplit": {"type": "object"},
        "metrics": {"type": "array", "items": {"type": "string"}},
        "status": {"type": "string", "enum": ["draft", "running", "completed", "paused"], "default": "draft"},
        "startDate": {"type": "datetime"},
        "endDate": {"type": "datetime"},
        "results": {"type": "object"},
        "winner": {"type": "string"},
        "createdAt": {"type": "datetime", "required": true}
      }
    }
  },
  "relationships": {
    "ModelPredictions": {
      "from": "MLModel",
      "to": "Prediction",
      "type": "one_to_many",
      "foreignKey": "modelId"
    },
    "ModelTrainingJobs": {
      "from": "MLModel",
      "to": "TrainingJob",
      "type": "one_to_many",
      "foreignKey": "modelId"
    }
  }
}

2. Embedding Generation Service

class EmbeddingGenerationService {
  final VektaDatabase db;
  final Map<String, EmbeddingModel> _models = {};
  
  EmbeddingGenerationService(this.db);
  
  /// Generate embeddings for text content
  Future<List<double>> generateTextEmbedding(
    String text, {
    String modelName = 'sentence-transformers/all-MiniLM-L6-v2',
    bool cache = true,
  }) async {
    final model = await _getOrLoadModel(modelName);
    
    // Check cache first
    if (cache) {
      final cached = await _getCachedEmbedding(text, modelName);
      if (cached != null) return cached;
    }
    
    // Generate embedding
    final embedding = await model.encode(text);
    
    // Cache result
    if (cache) {
      await _cacheEmbedding(text, modelName, embedding);
    }
    
    return embedding;
  }
  
  /// Batch generate embeddings for multiple texts
  Future<List<List<double>>> batchGenerateEmbeddings(
    List<String> texts, {
    String modelName = 'sentence-transformers/all-MiniLM-L6-v2',
    int batchSize = 32,
  }) async {
    final model = await _getOrLoadModel(modelName);
    final results = <List<double>>[];
    
    // Process in batches for memory efficiency
    for (int i = 0; i < texts.length; i += batchSize) {
      final batch = texts.skip(i).take(batchSize).toList();
      final batchEmbeddings = await model.encodeBatch(batch);
      results.addAll(batchEmbeddings);
      
      // Small delay to prevent overwhelming the system
      if (i + batchSize < texts.length) {
        await Future.delayed(Duration(milliseconds: 100));
      }
    }
    
    return results;
  }
  
  /// Generate multi-modal embeddings (text + image)
  Future<List<double>> generateMultiModalEmbedding(
    String? text,
    String? imagePath, {
    String modelName = 'clip-vit-base-patch32',
  }) async {
    final model = await _getOrLoadModel(modelName);
    
    if (text != null && imagePath != null) {
      // Combine text and image embeddings
      final textEmbedding = await model.encodeText(text);
      final imageEmbedding = await model.encodeImage(imagePath);
      
      // Weighted combination
      return _combineEmbeddings(textEmbedding, imageEmbedding, weights: [0.6, 0.4]);
    } else if (text != null) {
      return await model.encodeText(text);
    } else if (imagePath != null) {
      return await model.encodeImage(imagePath);
    } else {
      throw ArgumentError('Either text or imagePath must be provided');
    }
  }
  
  /// Update embeddings when content changes
  Future<void> updateEmbeddings(
    String entityId,
    String entityType,
    String content, {
    String modelVersion = 'v1.0',
  }) async {
    // Generate new embedding
    final embedding = await generateTextEmbedding(content);
    
    // Store feature vector
    final featureVector = FeatureVector()
      ..entityId = entityId
      ..entityType = entityType
      ..features = embedding
      ..rawFeatures = {'content': content}
      ..metadata = {
        'modelVersion': modelVersion,
        'computationTime': DateTime.now().millisecondsSinceEpoch,
        'source': 'content_update',
      }
      ..computedAt = DateTime.now()
      ..expiresAt = DateTime.now().add(Duration(days: 30));
    
    await db.featureVectors.save(featureVector);
    
    // Update the original entity with new embedding
    await _updateEntityEmbedding(entityId, entityType, embedding);
  }
  
  /// Monitor embedding quality and drift
  Future<EmbeddingQualityReport> monitorEmbeddingQuality(
    String modelVersion,
    Duration period,
  ) async {
    final endTime = DateTime.now();
    final startTime = endTime.subtract(period);
    
    final recentEmbeddings = await db.featureVectors
        .where('metadata.modelVersion', modelVersion)
        .where('computedAt', between: [startTime, endTime])
        .find();
    
    if (recentEmbeddings.isEmpty) {
      return EmbeddingQualityReport.empty();
    }
    
    // Calculate quality metrics
    final avgMagnitude = _calculateAverageMagnitude(recentEmbeddings);
    final dimensionVariance = _calculateDimensionVariance(recentEmbeddings);
    const driftScore = await _calculateEmbeddingDrift(recentEmbeddings, modelVersion);
    
    return EmbeddingQualityReport(
      modelVersion: modelVersion,
      period: period,
      sampleCount: recentEmbeddings.length,
      averageMagnitude: avgMagnitude,
      dimensionVariance: dimensionVariance,
      driftScore: driftScore,
      qualityScore: _calculateQualityScore(avgMagnitude, dimensionVariance, driftScore),
    );
  }
  
  Future<EmbeddingModel> _getOrLoadModel(String modelName) async {
    if (!_models.containsKey(modelName)) {
      _models[modelName] = await EmbeddingModel.load(modelName);
    }
    return _models[modelName]!;
  }
  
  Future<List<double>?> _getCachedEmbedding(String text, String modelName) async {
    final hash = _hashText(text);
    final cached = await db.featureVectors
        .where('entityId', hash)
        .where('entityType', 'text_embedding')
        .where('metadata.modelVersion', modelName)
        .where('expiresAt', greaterThan: DateTime.now())
        .findFirst();
    
    return cached?.features;
  }
  
  Future<void> _cacheEmbedding(String text, String modelName, List<double> embedding) async {
    final hash = _hashText(text);
    final featureVector = FeatureVector()
      ..entityId = hash
      ..entityType = 'text_embedding'
      ..features = embedding
      ..rawFeatures = {'text': text}
      ..metadata = {
        'modelVersion': modelName,
        'source': 'cache',
      }
      ..computedAt = DateTime.now()
      ..expiresAt = DateTime.now().add(Duration(days: 7));
    
    await db.featureVectors.save(featureVector);
  }
}

3. Real-Time Inference Engine

class RealTimeInferenceEngine {
  final VektaDatabase db;
  final Map<String, MLModelWrapper> _loadedModels = {};
  
  RealTimeInferenceEngine(this.db);
  
  /// Perform real-time prediction
  Future<PredictionResult> predict(
    String modelId,
    Map<String, dynamic> inputData, {
    String? requestId,
    String? userId,
    String? abTestGroup,
  }) async {
    final startTime = DateTime.now();
    
    // Get model
    final model = await _getLoadedModel(modelId);
    if (model == null) {
      throw Exception('Model $modelId not found or not deployed');
    }
    
    // Prepare features
    final features = await _prepareFeatures(inputData, model.config);
    
    // Make prediction
    final prediction = await model.predict(features);
    
    // Calculate latency
    final latency = DateTime.now().difference(startTime).inMilliseconds;
    
    // Store prediction for monitoring and feedback
    final predictionRecord = Prediction()
      ..modelId = modelId
      ..inputData = inputData
      ..prediction = prediction.toMap()
      ..confidence = prediction.confidence
      ..latency = latency.toDouble()
      ..features = features
      ..metadata = {
        'requestId': requestId,
        'userId': userId,
        'abTestGroup': abTestGroup,
      }
      ..timestamp = DateTime.now();
    
    await db.predictions.save(predictionRecord);
    
    // Update model metrics
    await _updateModelMetrics(modelId, latency, prediction.confidence);
    
    return PredictionResult(
      prediction: prediction.value,
      confidence: prediction.confidence,
      latency: latency,
      modelVersion: model.version,
      features: features,
    );
  }
  
  /// Batch prediction for multiple inputs
  Future<List<PredictionResult>> batchPredict(
    String modelId,
    List<Map<String, dynamic>> inputDataList, {
    int batchSize = 32,
  }) async {
    final model = await _getLoadedModel(modelId);
    if (model == null) {
      throw Exception('Model $modelId not found or not deployed');
    }
    
    final results = <PredictionResult>[];
    
    // Process in batches
    for (int i = 0; i < inputDataList.length; i += batchSize) {
      final batch = inputDataList.skip(i).take(batchSize).toList();
      
      // Prepare features for batch
      final featuresBatch = <List<double>>[];
      for (final inputData in batch) {
        final features = await _prepareFeatures(inputData, model.config);
        featuresBatch.add(features);
      }
      
      // Batch prediction
      final predictions = await model.batchPredict(featuresBatch);
      
      // Create results
      for (int j = 0; j < batch.length; j++) {
        results.add(PredictionResult(
          prediction: predictions[j].value,
          confidence: predictions[j].confidence,
          latency: 0, // Batch latency not per-item
          modelVersion: model.version,
          features: featuresBatch[j],
        ));
      }
    }
    
    return results;
  }
  
  /// Stream predictions for real-time applications
  Stream<PredictionResult> streamPredict(
    String modelId,
    Stream<Map<String, dynamic>> inputStream,
  ) async* {
    await for (final inputData in inputStream) {
      try {
        final result = await predict(modelId, inputData);
        yield result;
      } catch (e) {
        // Log error but continue processing
        print('Prediction error: $e');
      }
    }
  }
  
  /// Prepare features from raw input data
  Future<List<double>> _prepareFeatures(
    Map<String, dynamic> inputData,
    Map<String, dynamic> modelConfig,
  ) async {
    final features = <double>[];
    
    // Extract features based on model configuration
    final featureConfig = modelConfig['features'] as Map<String, dynamic>? ?? {};
    
    for (final entry in featureConfig.entries) {
      final featureName = entry.key;
      final featureSpec = entry.value as Map<String, dynamic>;
      
      switch (featureSpec['type']) {
        case 'numerical':
          final value = inputData[featureName] as num? ?? 0.0;
          features.add(value.toDouble());
          break;
          
        case 'categorical':
          final value = inputData[featureName] as String? ?? '';
          final encoded = _encodeCategorical(value, featureSpec['categories'] as List);
          features.addAll(encoded);
          break;
          
        case 'embedding':
          final text = inputData[featureName] as String? ?? '';
          final embedding = await embeddingService.generateTextEmbedding(text);
          features.addAll(embedding);
          break;
          
        case 'vector':
          final vector = inputData[featureName] as List<double>? ?? [];
          features.addAll(vector);
          break;
      }
    }
    
    return features;
  }
  
  List<double> _encodeCategorical(String value, List categories) {
    final encoded = List<double>.filled(categories.length, 0.0);
    final index = categories.indexOf(value);
    if (index >= 0) {
      encoded[index] = 1.0;
    }
    return encoded;
  }
}

4. Model Training and Versioning System

class ModelTrainingService {
  final VektaDatabase db;
  final TrainingOrchestrator orchestrator;
  
  ModelTrainingService(this.db, this.orchestrator);
  
  /// Start model training job
  Future<String> startTraining(
    String modelId,
    TrainingConfig config, {
    String jobType = 'training',
  }) async {
    final model = await db.mlModels.findById(modelId);
    if (model == null) throw Exception('Model not found');
    
    // Create training job
    final job = TrainingJob()
      ..modelId = modelId
      ..jobType = jobType
      ..config = config.toMap()
      ..status = 'queued'
      ..createdAt = DateTime.now();
    
    await db.trainingJobs.save(job);
    
    // Submit to training orchestrator
    await orchestrator.submitJob(job);
    
    return job.id;
  }
  
  /// Monitor training progress
  Future<TrainingProgress> getTrainingProgress(String jobId) async {
    final job = await db.trainingJobs.findById(jobId);
    if (job == null) throw Exception('Training job not found');
    
    return TrainingProgress(
      jobId: jobId,
      status: job.status,
      progress: job.progress,
      currentEpoch: job.metrics['current_epoch'] as int? ?? 0,
      totalEpochs: job.config['epochs'] as int? ?? 0,
      currentLoss: job.metrics['current_loss'] as double? ?? 0.0,
      validationLoss: job.metrics['validation_loss'] as double? ?? 0.0,
      estimatedTimeRemaining: _estimateTimeRemaining(job),
    );
  }
  
  /// Complete training and create new model version
  Future<void> completeTraining(
    String jobId,
    Map<String, dynamic> finalMetrics,
    String modelArtifactPath,
  ) async {
    final job = await db.trainingJobs.findById(jobId);
    if (job == null) throw Exception('Training job not found');
    
    final model = await db.mlModels.findById(job.modelId);
    if (model == null) throw Exception('Model not found');
    
    // Create new model version
    final newVersion = _generateNewVersion(model.version);
    final newModel = MLModel()
      ..name = model.name
      ..version = newVersion
      ..modelType = model.modelType
      ..framework = model.framework
      ..config = {
        ...model.config,
        'artifactPath': modelArtifactPath,
        'trainingJobId': jobId,
      }
      ..performance = finalMetrics
      ..status = 'validating'
      ..createdAt = DateTime.now()
      ..trainedAt = DateTime.now();
    
    await db.mlModels.save(newModel);
    
    // Update training job
    job.status = 'completed';
    job.completedAt = DateTime.now();
    job.metrics = finalMetrics;
    await db.trainingJobs.save(job);
    
    // Start validation process
    await _startModelValidation(newModel.id);
  }
  
  /// Validate trained model
  Future<ValidationResult> validateModel(String modelId) async {
    final model = await db.mlModels.findById(modelId);
    if (model == null) throw Exception('Model not found');
    
    // Load validation dataset
    final validationData = await _loadValidationDataset(model.config['validationDataset']);
    
    // Load model for validation
    final modelWrapper = await MLModelWrapper.load(model.config['artifactPath']);
    
    // Run validation
    final results = <ValidationSample>[];
    for (final sample in validationData) {
      final prediction = await modelWrapper.predict(sample.features);
      results.add(ValidationSample(
        features: sample.features,
        actualLabel: sample.label,
        predictedLabel: prediction.value,
        confidence: prediction.confidence,
      ));
    }
    
    // Calculate metrics
    final metrics = _calculateValidationMetrics(results);
    
    // Update model performance
    model.performance = metrics;
    model.status = metrics['accuracy'] > 0.8 ? 'deployed' : 'failed';
    await db.mlModels.save(model);
    
    return ValidationResult(
      modelId: modelId,
      metrics: metrics,
      sampleCount: results.length,
      passed: metrics['accuracy'] > 0.8,
    );
  }
  
  /// Deploy model to production
  Future<void> deployModel(String modelId, DeploymentConfig config) async {
    final model = await db.mlModels.findById(modelId);
    if (model == null) throw Exception('Model not found');
    
    if (model.status != 'validated') {
      throw Exception('Model must be validated before deployment');
    }
    
    // Deploy to inference service
    final endpoint = await orchestrator.deployModel(model, config);
    
    // Update model deployment info
    model.deployment = {
      'endpoint': endpoint,
      'replicas': config.replicas,
      'resources': config.resources.toMap(),
      'autoscaling': config.autoscaling,
    };
    model.status = 'deployed';
    model.deployedAt = DateTime.now();
    
    await db.mlModels.save(model);
  }
  
  String _generateNewVersion(String currentVersion) {
    final parts = currentVersion.split('.');
    final major = int.parse(parts[0]);
    final minor = int.parse(parts[1]);
    final patch = parts.length > 2 ? int.parse(parts[2]) : 0;
    
    return '$major.${minor + 1}.0';
  }
}

5. A/B Testing Framework for ML Models

class MLABTestingFramework {
  final VektaDatabase db;
  final RealTimeInferenceEngine inferenceEngine;
  
  MLABTestingFramework(this.db, this.inferenceEngine);
  
  /// Create A/B test for model comparison
  Future<String> createABTest(
    String name,
    List<String> modelIds,
    Map<String, double> trafficSplit,
    List<String> metrics, {
    DateTime? startDate,
    DateTime? endDate,
  }) async {
    // Validate traffic split
    final totalTraffic = trafficSplit.values.reduce((a, b) => a + b);
    if ((totalTraffic - 1.0).abs() > 0.001) {
      throw ArgumentError('Traffic split must sum to 1.0');
    }
    
    final abTest = ABTest()
      ..name = name
      ..description = 'A/B test comparing models: ${modelIds.join(', ')}'
      ..modelIds = modelIds
      ..trafficSplit = trafficSplit
      ..metrics = metrics
      ..status = 'draft'
      ..startDate = startDate ?? DateTime.now()
      ..endDate = endDate ?? DateTime.now().add(Duration(days: 14))
      ..createdAt = DateTime.now();
    
    await db.abTests.save(abTest);
    return abTest.id;
  }
  
  /// Start A/B test
  Future<void> startABTest(String testId) async {
    final test = await db.abTests.findById(testId);
    if (test == null) throw Exception('A/B test not found');
    
    // Validate all models are deployed
    for (final modelId in test.modelIds) {
      final model = await db.mlModels.findById(modelId);
      if (model == null || model.status != 'deployed') {
        throw Exception('Model $modelId is not deployed');
      }
    }
    
    test.status = 'running';
    test.startDate = DateTime.now();
    await db.abTests.save(test);
  }
  
  /// Route prediction request based on A/B test
  Future<PredictionResult> routePrediction(
    String testId,
    Map<String, dynamic> inputData, {
    String? userId,
    String? requestId,
  }) async {
    final test = await db.abTests.findById(testId);
    if (test == null || test.status != 'running') {
      throw Exception('A/B test not found or not running');
    }
    
    // Determine which model to use based on traffic split
    final selectedModel = _selectModelForUser(userId ?? requestId ?? '', test.trafficSplit);
    
    // Make prediction with selected model
    final result = await inferenceEngine.predict(
      selectedModel,
      inputData,
      requestId: requestId,
      userId: userId,
      abTestGroup: testId,
    );
    
    return result;
  }
  
  /// Analyze A/B test results
  Future<ABTestResults> analyzeABTest(String testId) async {
    final test = await db.abTests.findById(testId);
    if (test == null) throw Exception('A/B test not found');
    
    // Get predictions for each model in the test
    final predictions = await db.predictions
        .where('metadata.abTestGroup', testId)
        .where('timestamp', between: [test.startDate!, test.endDate!])
        .find();
    
    // Group predictions by model
    final modelPredictions = <String, List<Prediction>>{};
    for (final prediction in predictions) {
      final modelId = prediction.modelId;
      modelPredictions[modelId] = (modelPredictions[modelId] ?? [])..add(prediction);
    }
    
    // Calculate metrics for each model
    final modelResults = <String, ModelTestResults>{};
    for (final entry in modelPredictions.entries) {
      final modelId = entry.key;
      final modelPreds = entry.value;
      
      modelResults[modelId] = ModelTestResults(
        modelId: modelId,
        sampleCount: modelPreds.length,
        averageLatency: _calculateAverageLatency(modelPreds),
        averageConfidence: _calculateAverageConfidence(modelPreds),
        conversionRate: await _calculateConversionRate(modelPreds),
        clickThroughRate: await _calculateClickThroughRate(modelPreds),
      );
    }
    
    // Determine statistical significance
    final significance = await _calculateStatisticalSignificance(modelResults);
    
    // Determine winner
    final winner = _determineWinner(modelResults, test.metrics);
    
    return ABTestResults(
      testId: testId,
      testName: test.name,
      duration: test.endDate!.difference(test.startDate!),
      modelResults: modelResults,
      winner: winner,
      significance: significance,
      recommendations: _generateRecommendations(modelResults, significance),
    );
  }
  
  /// Complete A/B test and promote winner
  Future<void> completeABTest(String testId, {bool promoteWinner = true}) async {
    final test = await db.abTests.findById(testId);
    if (test == null) throw Exception('A/B test not found');
    
    final results = await analyzeABTest(testId);
    
    test.status = 'completed';
    test.results = results.toMap();
    test.winner = results.winner;
    await db.abTests.save(test);
    
    if (promoteWinner && results.winner != null) {
      await _promoteWinnerModel(results.winner!);
    }
  }
  
  String _selectModelForUser(String identifier, Map<String, double> trafficSplit) {
    // Use consistent hashing to ensure same user gets same model
    final hash = identifier.hashCode.abs() / 0x7FFFFFFF; // Normalize to 0-1
    
    double cumulative = 0.0;
    for (final entry in trafficSplit.entries) {
      cumulative += entry.value;
      if (hash <= cumulative) {
        return entry.key;
      }
    }
    
    // Fallback to first model
    return trafficSplit.keys.first;
  }
}

6. Feature Store Implementation

class FeatureStore {
  final VektaDatabase db;
  final Map<String, FeatureComputer> _featureComputers = {};
  
  FeatureStore(this.db);
  
  /// Register feature computer
  void registerFeatureComputer(String featureName, FeatureComputer computer) {
    _featureComputers[featureName] = computer;
  }
  
  /// Get features for entity
  Future<Map<String, dynamic>> getFeatures(
    String entityId,
    String entityType,
    List<String> featureNames, {
    bool allowStale = false,
    Duration maxAge = const Duration(hours: 24),
  }) async {
    final features = <String, dynamic>{};
    final missingFeatures = <String>[];
    
    // Try to get cached features first
    for (final featureName in featureNames) {
      final cached = await _getCachedFeature(entityId, entityType, featureName, maxAge);
      if (cached != null) {
        features[featureName] = cached;
      } else {
        missingFeatures.add(featureName);
      }
    }
    
    // Compute missing features
    if (missingFeatures.isNotEmpty && !allowStale) {
      final computed = await _computeFeatures(entityId, entityType, missingFeatures);
      features.addAll(computed);
    }
    
    return features;
  }
  
  /// Batch get features for multiple entities
  Future<Map<String, Map<String, dynamic>>> batchGetFeatures(
    List<String> entityIds,
    String entityType,
    List<String> featureNames, {
    Duration maxAge = const Duration(hours: 24),
  }) async {
    final results = <String, Map<String, dynamic>>{};
    
    // Get cached features in batch
    final cachedFeatures = await _batchGetCachedFeatures(
      entityIds, entityType, featureNames, maxAge);
    
    // Identify missing features per entity
    final missingFeatures = <String, List<String>>{};
    for (final entityId in entityIds) {
      final entityFeatures = cachedFeatures[entityId] ?? {};
      final missing = featureNames.where((f) => !entityFeatures.containsKey(f)).toList();
      if (missing.isNotEmpty) {
        missingFeatures[entityId] = missing;
      }
      results[entityId] = entityFeatures;
    }
    
    // Compute missing features in parallel
    final computeFutures = missingFeatures.entries.map((entry) async {
      final entityId = entry.key;
      final missing = entry.value;
      final computed = await _computeFeatures(entityId, entityType, missing);
      return MapEntry(entityId, computed);
    });
    
    final computedResults = await Future.wait(computeFutures);
    
    // Merge computed features
    for (final entry in computedResults) {
      results[entry.key] = {...results[entry.key]!, ...entry.value};
    }
    
    return results;
  }
  
  /// Precompute features for entities
  Future<void> precomputeFeatures(
    List<String> entityIds,
    String entityType,
    List<String> featureNames,
  ) async {
    const batchSize = 50;
    
    for (int i = 0; i < entityIds.length; i += batchSize) {
      final batch = entityIds.skip(i).take(batchSize).toList();
      
      final futures = batch.map((entityId) async {
        try {
          await _computeFeatures(entityId, entityType, featureNames);
        } catch (e) {
          print('Error precomputing features for $entityId: $e');
        }
      });
      
      await Future.wait(futures);
      
      // Small delay between batches
      if (i + batchSize < entityIds.length) {
        await Future.delayed(Duration(milliseconds: 500));
      }
    }
  }
  
  /// Stream real-time feature updates
  Stream<FeatureUpdate> streamFeatureUpdates(
    String entityId,
    String entityType,
    List<String> featureNames,
  ) async* {
    // Set up listeners for entity changes
    final entityStream = db.watchEntity(entityId, entityType);
    
    await for (final change in entityStream) {
      // Recompute affected features
      final affectedFeatures = _getAffectedFeatures(change, featureNames);
      
      if (affectedFeatures.isNotEmpty) {
        final updatedFeatures = await _computeFeatures(
          entityId, entityType, affectedFeatures);
        
        yield FeatureUpdate(
          entityId: entityId,
          entityType: entityType,
          features: updatedFeatures,
          timestamp: DateTime.now(),
        );
      }
    }
  }
  
  Future<Map<String, dynamic>> _computeFeatures(
    String entityId,
    String entityType,
    List<String> featureNames,
  ) async {
    final features = <String, dynamic>{};
    
    for (final featureName in featureNames) {
      final computer = _featureComputers[featureName];
      if (computer == null) {
        throw Exception('Feature computer not found for $featureName');
      }
      
      try {
        final value = await computer.compute(entityId, entityType);
        features[featureName] = value;
        
        // Cache the computed feature
        await _cacheFeature(entityId, entityType, featureName, value);
      } catch (e) {
        print('Error computing feature $featureName for $entityId: $e');
        features[featureName] = null;
      }
    }
    
    return features;
  }
  
  Future<dynamic> _getCachedFeature(
    String entityId,
    String entityType,
    String featureName,
    Duration maxAge,
  ) async {
    final cached = await db.featureVectors
        .where('entityId', entityId)
        .where('entityType', entityType)
        .where('rawFeatures.$featureName', isNotNull: true)
        .where('computedAt', greaterThan: DateTime.now().subtract(maxAge))
        .orderBy('computedAt', descending: true)
        .findFirst();
    
    return cached?.rawFeatures[featureName];
  }
  
  Future<void> _cacheFeature(
    String entityId,
    String entityType,
    String featureName,
    dynamic value,
  ) async {
    // Try to update existing feature vector
    final existing = await db.featureVectors
        .where('entityId', entityId)
        .where('entityType', entityType)
        .orderBy('computedAt', descending: true)
        .findFirst();
    
    if (existing != null) {
      existing.rawFeatures[featureName] = value;
      existing.computedAt = DateTime.now();
      await db.featureVectors.save(existing);
    } else {
      // Create new feature vector
      final featureVector = FeatureVector()
        ..entityId = entityId
        ..entityType = entityType
        ..features = [] // Will be populated when needed
        ..rawFeatures = {featureName: value}
        ..metadata = {'source': 'feature_store'}
        ..computedAt = DateTime.now()
        ..expiresAt = DateTime.now().add(Duration(days: 30));
      
      await db.featureVectors.save(featureVector);
    }
  }
}

Best Practices

1. Model Performance Monitoring

class ModelPerformanceMonitor {
  final VektaDatabase db;
  
  ModelPerformanceMonitor(this.db);
  
  /// Monitor model performance metrics
  Future<PerformanceReport> generatePerformanceReport(
    String modelId,
    Duration period,
  ) async {
    final endTime = DateTime.now();
    final startTime = endTime.subtract(period);
    
    final predictions = await db.predictions
        .where('modelId', modelId)
        .where('timestamp', between: [startTime, endTime])
        .find();
    
    if (predictions.isEmpty) {
      return PerformanceReport.empty(modelId, period);
    }
    
    // Calculate performance metrics
    final avgLatency = predictions.map((p) => p.latency).reduce((a, b) => a + b) / predictions.length;
    final avgConfidence = predictions.map((p) => p.confidence).reduce((a, b) => a + b) / predictions.length;
    
    // Calculate accuracy from feedback
    final feedbackPredictions = predictions.where((p) => p.feedback.isNotEmpty).toList();
    final accuracy = feedbackPredictions.isEmpty ? null :
        feedbackPredictions.where((p) => p.feedback['correctPrediction'] == true).length / 
        feedbackPredictions.length;
    
    // Detect performance degradation
    final degradationAlert = await _detectPerformanceDegradation(modelId, predictions);
    
    return PerformanceReport(
      modelId: modelId,
      period: period,
      totalPredictions: predictions.length,
      averageLatency: avgLatency,
      averageConfidence: avgConfidence,
      accuracy: accuracy,
      throughput: predictions.length / period.inHours,
      degradationAlert: degradationAlert,
    );
  }
  
  /// Detect model drift
  Future<DriftReport> detectModelDrift(String modelId, Duration period) async {
    final endTime = DateTime.now();
    final startTime = endTime.subtract(period);
    
    // Get recent predictions
    final recentPredictions = await db.predictions
        .where('modelId', modelId)
        .where('timestamp', between: [startTime, endTime])
        .find();
    
    // Get baseline predictions (from model training period)
    final model = await db.mlModels.findById(modelId);
    if (model == null) throw Exception('Model not found');
    
    final baselinePredictions = await db.predictions
        .where('modelId', modelId)
        .where('timestamp', between: [model.deployedAt!, model.deployedAt!.add(Duration(days: 7))])
        .find();
    
    // Calculate drift metrics
    final featureDrift = _calculateFeatureDrift(recentPredictions, baselinePredictions);
    final predictionDrift = _calculatePredictionDrift(recentPredictions, baselinePredictions);
    
    return DriftReport(
      modelId: modelId,
      period: period,
      featureDrift: featureDrift,
      predictionDrift: predictionDrift,
      driftScore: (featureDrift + predictionDrift) / 2,
      requiresRetraining: (featureDrift + predictionDrift) / 2 > 0.3,
    );
  }
}

2. ML Pipeline Orchestration

class MLPipelineOrchestrator {
  final VektaDatabase db;
  final Map<String, PipelineStep> _steps = {};
  
  MLPipelineOrchestrator(this.db);
  
  /// Define ML pipeline
  void definePipeline(String pipelineId, List<PipelineStep> steps) {
    for (final step in steps) {
      _steps['${pipelineId}_${step.name}'] = step;
    }
  }
  
  /// Execute ML pipeline
  Future<PipelineResult> executePipeline(
    String pipelineId,
    Map<String, dynamic> parameters,
  ) async {
    final pipeline = await _getPipelineDefinition(pipelineId);
    final results = <String, dynamic>{};
    
    for (final step in pipeline.steps) {
      try {
        print('Executing step: ${step.name}');
        
        final stepResult = await _executeStep(step, parameters, results);
        results[step.name] = stepResult;
        
        // Update parameters for next step
        if (step.outputParameters != null) {
          parameters.addAll(step.outputParameters!(stepResult));
        }
        
      } catch (e) {
        return PipelineResult(
          pipelineId: pipelineId,
          status: 'failed',
          error: e.toString(),
          results: results,
        );
      }
    }
    
    return PipelineResult(
      pipelineId: pipelineId,
      status: 'completed',
      results: results,
    );
  }
  
  /// Execute pipeline step
  Future<dynamic> _executeStep(
    PipelineStep step,
    Map<String, dynamic> parameters,
    Map<String, dynamic> previousResults,
  ) async {
    switch (step.type) {
      case 'data_extraction':
        return await _executeDataExtraction(step, parameters);
      case 'feature_engineering':
        return await _executeFeatureEngineering(step, parameters, previousResults);
      case 'model_training':
        return await _executeModelTraining(step, parameters, previousResults);
      case 'model_validation':
        return await _executeModelValidation(step, parameters, previousResults);
      case 'model_deployment':
        return await _executeModelDeployment(step, parameters, previousResults);
      default:
        throw Exception('Unknown step type: ${step.type}');
    }
  }
}

Advanced Topics

Federated Learning Integration

class FederatedLearningCoordinator {
  final VektaDatabase db;
  
  FederatedLearningCoordinator(this.db);
  
  /// Coordinate federated learning across multiple clients
  Future<String> startFederatedTraining(
    String modelId,
    List<String> clientIds,
    FederatedConfig config,
  ) async {
    final federatedJob = FederatedTrainingJob()
      ..modelId = modelId
      ..clientIds = clientIds
      ..config = config.toMap()
      ..status = 'initializing'
      ..createdAt = DateTime.now();
    
    await db.federatedTrainingJobs.save(federatedJob);
    
    // Initialize global model
    await _initializeGlobalModel(federatedJob.id, modelId);
    
    // Start training rounds
    await _startTrainingRound(federatedJob.id, 1);
    
    return federatedJob.id;
  }
  
  /// Aggregate model updates from clients
  Future<void> aggregateModelUpdates(
    String jobId,
    int round,
    List<ModelUpdate> clientUpdates,
  ) async {
    // Implement federated averaging
    final aggregatedWeights = _federatedAveraging(clientUpdates);
    
    // Update global model
    await _updateGlobalModel(jobId, aggregatedWeights);
    
    // Check convergence
    final converged = await _checkConvergence(jobId, round);
    
    if (converged || round >= maxRounds) {
      await _completeFederatedTraining(jobId);
    } else {
      await _startTrainingRound(jobId, round + 1);
    }
  }
}

Summary

This chapter demonstrated comprehensive AI/ML integration patterns with Vektagraf. Key takeaways include:

  • Embedding Integration: Seamless embedding generation and management
  • Real-Time Inference: High-performance prediction serving
  • Model Versioning: Complete model lifecycle management
  • A/B Testing: Systematic model comparison and optimization
  • Feature Store: Centralized feature management and serving
  • Performance Monitoring: Comprehensive model observability
  • Pipeline Orchestration: End-to-end ML workflow automation

Vektagraf's vector-first architecture makes it particularly well-suited for AI/ML applications, as embeddings and features are treated as first-class database objects.

Next Steps

  • Part VII: Reference documentation for complete API coverage
  • Chapter 14: Performance Tuning - Optimize ML workloads
  • Chapter 13: Monitoring and Observability - Advanced ML monitoring
  • Vector Search Documentation (Chapter 5)
  • Performance Optimization (Chapter 7)
  • Multi-Tenant Architecture (Chapter 11)
  • Security and Privacy (Chapters 8-10)