Chapter 26: AI/ML Integration Patterns
Overview
Vektagraf's vector-first architecture makes it an ideal platform for AI/ML integration. This chapter demonstrates advanced patterns for embedding generation, model integration, real-time inference, batch processing workflows, model versioning, A/B testing, and feature store implementation.
Learning Objectives
- Integrate embedding generation models with Vektagraf
- Implement real-time inference and batch processing workflows
- Design model versioning and A/B testing systems
- Build feature stores and ML pipeline integration
- Create feedback loops for model improvement
- Optimize ML workloads for production scale
Prerequisites
- Completed Chapters 1-6 (Foundations and Core Features)
- Understanding of machine learning concepts and workflows
- Knowledge of vector embeddings and similarity search
- Basic understanding of MLOps practices
Core Concepts
AI/ML Architecture with Vektagraf
Vektagraf serves as both a vector database and feature store, enabling seamless AI/ML integration:
class MLModel extends VektaObject {
  late String name;
  late String version;
  late String modelType;
  late String framework;
  late Map<String, dynamic> config;
  late Map<String, dynamic> performance;
  late Map<String, dynamic> deployment;
  late String status; // training, validating, validated, deployed, failed, deprecated
  late DateTime createdAt;
  DateTime? trainedAt;
  DateTime? deployedAt;
}
class FeatureVector extends VektaObject {
  late String entityId;
  late String entityType;
  late List<double> features;
  late Map<String, dynamic> rawFeatures;
  late Map<String, dynamic> metadata;
  late DateTime computedAt;
  DateTime? expiresAt;
}
Embedding Integration Patterns
Vektagraf treats embeddings as first-class properties, making ML integration natural. The core patterns are listed below; a short sketch of the first pattern follows the list:
- Real-time Embedding Generation: Generate embeddings on-demand
- Batch Embedding Processing: Process large datasets efficiently
- Multi-modal Embeddings: Handle text, image, and audio embeddings
- Embedding Versioning: Track embedding model versions
- Embedding Quality Monitoring: Monitor embedding drift and quality
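To make the first pattern concrete, here is a minimal sketch of real-time generation feeding a similarity query. It assumes the EmbeddingGenerationService built later in this chapter and the FeatureVector collection from the schema below; nearest is an assumed vector-query method shown for illustration, not a confirmed Vektagraf API.
Future<void> demoSimilaritySearch(
  VektaDatabase db,
  EmbeddingGenerationService embeddingService,
) async {
  // Embed the query text on demand, then find the ten closest product vectors
  final queryEmbedding = await embeddingService.generateTextEmbedding(
    'wireless noise-cancelling headphones',
  );
  final matches = await db.featureVectors
      .where('entityType', 'product')
      .nearest('features', queryEmbedding, limit: 10) // assumed query method
      .find();
  for (final match in matches) {
    print('${match.entityId} computed at ${match.computedAt}');
  }
}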
Practical Examples
Complete AI/ML Integration System
Let's build a comprehensive AI/ML platform with Vektagraf:
1. Schema Definition
{
"name": "AIMLPlatform",
"version": "1.0.0",
"objects": {
"MLModel": {
"properties": {
"name": {"type": "string", "required": true},
"version": {"type": "string", "required": true},
"modelType": {"type": "string", "enum": ["embedding", "classification", "regression", "clustering"], "required": true},
"framework": {"type": "string", "enum": ["tensorflow", "pytorch", "scikit-learn", "transformers"]},
"config": {
"type": "object",
"properties": {
"architecture": {"type": "string"},
"hyperparameters": {"type": "object"},
"inputDimensions": {"type": "integer"},
"outputDimensions": {"type": "integer"},
"trainingDataset": {"type": "string"},
"validationDataset": {"type": "string"}
}
},
"performance": {
"type": "object",
"properties": {
"accuracy": {"type": "number"},
"precision": {"type": "number"},
"recall": {"type": "number"},
"f1Score": {"type": "number"},
"loss": {"type": "number"},
"validationLoss": {"type": "number"}
}
},
"deployment": {
"type": "object",
"properties": {
"endpoint": {"type": "string"},
"replicas": {"type": "integer", "default": 1},
"resources": {"type": "object"},
"autoscaling": {"type": "boolean", "default": false}
}
},
"status": {"type": "string", "enum": ["training", "validating", "deployed", "deprecated"], "default": "training"},
"createdAt": {"type": "datetime", "required": true},
"trainedAt": {"type": "datetime"},
"deployedAt": {"type": "datetime"}
}
},
"FeatureVector": {
"properties": {
"entityId": {"type": "string", "required": true},
"entityType": {"type": "string", "required": true},
"features": {
"type": "vector",
"dimensions": 512,
"algorithm": "hnsw",
"distance": "cosine"
},
"rawFeatures": {"type": "object"},
"metadata": {
"type": "object",
"properties": {
"modelVersion": {"type": "string"},
"computationTime": {"type": "number"},
"confidence": {"type": "number"},
"source": {"type": "string"}
}
},
"computedAt": {"type": "datetime", "required": true},
"expiresAt": {"type": "datetime"}
}
},
"Prediction": {
"properties": {
"modelId": {"type": "string", "required": true},
"inputData": {"type": "object", "required": true},
"prediction": {"type": "object", "required": true},
"confidence": {"type": "number"},
"latency": {"type": "number"},
"features": {
"type": "vector",
"dimensions": 512,
"algorithm": "hnsw",
"distance": "cosine"
},
"metadata": {
"type": "object",
"properties": {
"requestId": {"type": "string"},
"userId": {"type": "string"},
"sessionId": {"type": "string"},
"abTestGroup": {"type": "string"}
}
},
"feedback": {
"type": "object",
"properties": {
"rating": {"type": "number"},
"correctPrediction": {"type": "boolean"},
"actualOutcome": {"type": "object"}
}
},
"timestamp": {"type": "datetime", "required": true}
}
},
"TrainingJob": {
"properties": {
"modelId": {"type": "string", "required": true},
"jobType": {"type": "string", "enum": ["training", "retraining", "fine_tuning"], "required": true},
"config": {
"type": "object",
"properties": {
"datasetPath": {"type": "string"},
"epochs": {"type": "integer"},
"batchSize": {"type": "integer"},
"learningRate": {"type": "number"},
"validationSplit": {"type": "number"}
}
},
"status": {"type": "string", "enum": ["queued", "running", "completed", "failed"], "default": "queued"},
"progress": {"type": "number", "default": 0.0},
"metrics": {"type": "object"},
"logs": {"type": "array", "items": {"type": "string"}},
"startedAt": {"type": "datetime"},
"completedAt": {"type": "datetime"},
"createdAt": {"type": "datetime", "required": true}
}
},
"ABTest": {
"properties": {
"name": {"type": "string", "required": true},
"description": {"type": "string"},
"modelIds": {"type": "array", "items": {"type": "string"}},
"trafficSplit": {"type": "object"},
"metrics": {"type": "array", "items": {"type": "string"}},
"status": {"type": "string", "enum": ["draft", "running", "completed", "paused"], "default": "draft"},
"startDate": {"type": "datetime"},
"endDate": {"type": "datetime"},
"results": {"type": "object"},
"winner": {"type": "string"},
"createdAt": {"type": "datetime", "required": true}
}
}
},
"relationships": {
"ModelPredictions": {
"from": "MLModel",
"to": "Prediction",
"type": "one_to_many",
"foreignKey": "modelId"
},
"ModelTrainingJobs": {
"from": "MLModel",
"to": "TrainingJob",
"type": "one_to_many",
"foreignKey": "modelId"
}
}
}
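With the schema defined, every service below assumes an opened database handle with typed collection accessors (db.mlModels, db.featureVectors, db.predictions, db.trainingJobs, db.abTests). The bootstrap sketch below is hypothetical: VektaDatabase.open and the schema file path are assumptions made for illustration, so consult the reference documentation in Part VII for the actual initialization API.
import 'dart:convert';
import 'dart:io';

Future<VektaDatabase> bootstrapPlatform() async {
  // Path is illustrative; store the schema wherever your project keeps it
  final schemaJson = jsonDecode(
    await File('schemas/aiml_platform.json').readAsString(),
  ) as Map<String, dynamic>;
  // Assumed factory method, mirroring how `db` is used throughout this chapter
  return VektaDatabase.open('aiml_platform', schema: schemaJson);
}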
2. Embedding Generation Service
class EmbeddingGenerationService {
final VektaDatabase db;
final Map<String, EmbeddingModel> _models = {};
EmbeddingGenerationService(this.db);
/// Generate embeddings for text content
Future<List<double>> generateTextEmbedding(
String text, {
// distiluse-base-multilingual-cased-v1 emits 512-dim vectors, matching
// the 512-dim FeatureVector schema; all-MiniLM-L6-v2 would emit 384
String modelName = 'sentence-transformers/distiluse-base-multilingual-cased-v1',
bool cache = true,
}) async {
final model = await _getOrLoadModel(modelName);
// Check cache first
if (cache) {
final cached = await _getCachedEmbedding(text, modelName);
if (cached != null) return cached;
}
// Generate embedding
final embedding = await model.encode(text);
// Cache result
if (cache) {
await _cacheEmbedding(text, modelName, embedding);
}
return embedding;
}
/// Batch generate embeddings for multiple texts
Future<List<List<double>>> batchGenerateEmbeddings(
List<String> texts, {
String modelName = 'sentence-transformers/distiluse-base-multilingual-cased-v1',
int batchSize = 32,
}) async {
final model = await _getOrLoadModel(modelName);
final results = <List<double>>[];
// Process in batches for memory efficiency
for (int i = 0; i < texts.length; i += batchSize) {
final batch = texts.skip(i).take(batchSize).toList();
final batchEmbeddings = await model.encodeBatch(batch);
results.addAll(batchEmbeddings);
// Small delay to prevent overwhelming the system
if (i + batchSize < texts.length) {
await Future.delayed(Duration(milliseconds: 100));
}
}
return results;
}
/// Generate multi-modal embeddings (text + image)
Future<List<double>> generateMultiModalEmbedding(
String? text,
String? imagePath, {
String modelName = 'clip-vit-base-patch32',
}) async {
final model = await _getOrLoadModel(modelName);
if (text != null && imagePath != null) {
// Combine text and image embeddings
final textEmbedding = await model.encodeText(text);
final imageEmbedding = await model.encodeImage(imagePath);
// Weighted combination
return _combineEmbeddings(textEmbedding, imageEmbedding, weights: [0.6, 0.4]);
} else if (text != null) {
return await model.encodeText(text);
} else if (imagePath != null) {
return await model.encodeImage(imagePath);
} else {
throw ArgumentError('Either text or imagePath must be provided');
}
}
/// Update embeddings when content changes
Future<void> updateEmbeddings(
String entityId,
String entityType,
String content, {
String modelVersion = 'v1.0',
}) async {
// Generate new embedding, timing the computation for the metadata record
final stopwatch = Stopwatch()..start();
final embedding = await generateTextEmbedding(content);
stopwatch.stop();
// Store feature vector
final featureVector = FeatureVector()
  ..entityId = entityId
  ..entityType = entityType
  ..features = embedding
  ..rawFeatures = {'content': content}
  ..metadata = {
    'modelVersion': modelVersion,
    'computationTime': stopwatch.elapsedMilliseconds,
    'source': 'content_update',
  }
  ..computedAt = DateTime.now()
  ..expiresAt = DateTime.now().add(Duration(days: 30));
await db.featureVectors.save(featureVector);
// Update the original entity with new embedding
await _updateEntityEmbedding(entityId, entityType, embedding);
}
/// Monitor embedding quality and drift
Future<EmbeddingQualityReport> monitorEmbeddingQuality(
String modelVersion,
Duration period,
) async {
final endTime = DateTime.now();
final startTime = endTime.subtract(period);
final recentEmbeddings = await db.featureVectors
.where('metadata.modelVersion', modelVersion)
.where('computedAt', between: [startTime, endTime])
.find();
if (recentEmbeddings.isEmpty) {
return EmbeddingQualityReport.empty();
}
// Calculate quality metrics
final avgMagnitude = _calculateAverageMagnitude(recentEmbeddings);
final dimensionVariance = _calculateDimensionVariance(recentEmbeddings);
final driftScore = await _calculateEmbeddingDrift(recentEmbeddings, modelVersion);
return EmbeddingQualityReport(
modelVersion: modelVersion,
period: period,
sampleCount: recentEmbeddings.length,
averageMagnitude: avgMagnitude,
dimensionVariance: dimensionVariance,
driftScore: driftScore,
qualityScore: _calculateQualityScore(avgMagnitude, dimensionVariance, driftScore),
);
}
Future<EmbeddingModel> _getOrLoadModel(String modelName) async {
if (!_models.containsKey(modelName)) {
_models[modelName] = await EmbeddingModel.load(modelName);
}
return _models[modelName]!;
}
Future<List<double>?> _getCachedEmbedding(String text, String modelName) async {
final hash = _hashText(text);
final cached = await db.featureVectors
.where('entityId', hash)
.where('entityType', 'text_embedding')
.where('metadata.modelVersion', modelName)
.where('expiresAt', greaterThan: DateTime.now())
.findFirst();
return cached?.features;
}
Future<void> _cacheEmbedding(String text, String modelName, List<double> embedding) async {
final hash = _hashText(text);
final featureVector = FeatureVector()
..entityId = hash
..entityType = 'text_embedding'
..features = embedding
..rawFeatures = {'text': text}
..metadata = {
'modelVersion': modelName,
'source': 'cache',
}
..computedAt = DateTime.now()
..expiresAt = DateTime.now().add(Duration(days: 7));
await db.featureVectors.save(featureVector);
}
}
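The service above calls two private helpers that are not shown: _hashText (for cache keys) and _combineEmbeddings (for the weighted multi-modal combination). A minimal sketch of both, assuming package:crypto for hashing and equal-length input vectors:
import 'dart:convert';
import 'dart:math';
import 'package:crypto/crypto.dart';

/// Derive a stable cache key from input text (any stable content hash works)
String _hashText(String text) =>
    sha256.convert(utf8.encode(text)).toString();

/// Weighted element-wise combination of two equal-length embeddings,
/// renormalized to unit length so cosine distance stays meaningful
List<double> _combineEmbeddings(
  List<double> a,
  List<double> b, {
  List<double> weights = const [0.5, 0.5],
}) {
  final combined = List<double>.generate(
    a.length,
    (i) => a[i] * weights[0] + b[i] * weights[1],
  );
  final magnitude = sqrt(combined.fold<double>(0.0, (s, v) => s + v * v));
  if (magnitude == 0) return combined;
  return combined.map((v) => v / magnitude).toList();
}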
3. Real-Time Inference Engine
class RealTimeInferenceEngine {
  final VektaDatabase db;
  final EmbeddingGenerationService embeddingService;
  final Map<String, MLModelWrapper> _loadedModels = {};
  RealTimeInferenceEngine(this.db, this.embeddingService);
/// Perform real-time prediction
Future<PredictionResult> predict(
String modelId,
Map<String, dynamic> inputData, {
String? requestId,
String? userId,
String? abTestGroup,
}) async {
final startTime = DateTime.now();
// Get model
final model = await _getLoadedModel(modelId);
if (model == null) {
throw Exception('Model $modelId not found or not deployed');
}
// Prepare features
final features = await _prepareFeatures(inputData, model.config);
// Make prediction
final prediction = await model.predict(features);
// Calculate latency
final latency = DateTime.now().difference(startTime).inMilliseconds;
// Store prediction for monitoring and feedback
final predictionRecord = Prediction()
..modelId = modelId
..inputData = inputData
..prediction = prediction.toMap()
..confidence = prediction.confidence
..latency = latency.toDouble()
..features = features
..metadata = {
'requestId': requestId,
'userId': userId,
'abTestGroup': abTestGroup,
}
..timestamp = DateTime.now();
await db.predictions.save(predictionRecord);
// Update model metrics
await _updateModelMetrics(modelId, latency, prediction.confidence);
return PredictionResult(
prediction: prediction.value,
confidence: prediction.confidence,
latency: latency,
modelVersion: model.version,
features: features,
);
}
/// Batch prediction for multiple inputs
Future<List<PredictionResult>> batchPredict(
String modelId,
List<Map<String, dynamic>> inputDataList, {
int batchSize = 32,
}) async {
final model = await _getLoadedModel(modelId);
if (model == null) {
throw Exception('Model $modelId not found or not deployed');
}
final results = <PredictionResult>[];
// Process in batches
for (int i = 0; i < inputDataList.length; i += batchSize) {
final batch = inputDataList.skip(i).take(batchSize).toList();
// Prepare features for batch
final featuresBatch = <List<double>>[];
for (final inputData in batch) {
final features = await _prepareFeatures(inputData, model.config);
featuresBatch.add(features);
}
// Batch prediction
final predictions = await model.batchPredict(featuresBatch);
// Create results
for (int j = 0; j < batch.length; j++) {
results.add(PredictionResult(
prediction: predictions[j].value,
confidence: predictions[j].confidence,
latency: 0, // Batch latency not per-item
modelVersion: model.version,
features: featuresBatch[j],
));
}
}
return results;
}
/// Stream predictions for real-time applications
Stream<PredictionResult> streamPredict(
String modelId,
Stream<Map<String, dynamic>> inputStream,
) async* {
await for (final inputData in inputStream) {
try {
final result = await predict(modelId, inputData);
yield result;
} catch (e) {
// Log error but continue processing
print('Prediction error: $e');
}
}
}
/// Prepare features from raw input data
Future<List<double>> _prepareFeatures(
Map<String, dynamic> inputData,
Map<String, dynamic> modelConfig,
) async {
final features = <double>[];
// Extract features based on model configuration
final featureConfig = modelConfig['features'] as Map<String, dynamic>? ?? {};
for (final entry in featureConfig.entries) {
final featureName = entry.key;
final featureSpec = entry.value as Map<String, dynamic>;
switch (featureSpec['type']) {
case 'numerical':
final value = inputData[featureName] as num? ?? 0.0;
features.add(value.toDouble());
break;
case 'categorical':
final value = inputData[featureName] as String? ?? '';
final encoded = _encodeCategorical(value, featureSpec['categories'] as List);
features.addAll(encoded);
break;
case 'embedding':
final text = inputData[featureName] as String? ?? '';
final embedding = await embeddingService.generateTextEmbedding(text);
features.addAll(embedding);
break;
case 'vector':
final vector = inputData[featureName] as List<double>? ?? [];
features.addAll(vector);
break;
}
}
return features;
}
List<double> _encodeCategorical(String value, List categories) {
final encoded = List<double>.filled(categories.length, 0.0);
final index = categories.indexOf(value);
if (index >= 0) {
encoded[index] = 1.0;
}
return encoded;
}
}
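_getLoadedModel is referenced throughout the engine but not shown. A plausible implementation, assuming MLModelWrapper.load accepts the artifactPath stored by the training service in the next section:
Future<MLModelWrapper?> _getLoadedModel(String modelId) async {
  // Serve from the in-memory cache when possible
  final cached = _loadedModels[modelId];
  if (cached != null) return cached;
  // Only load models that have actually been deployed
  final model = await db.mlModels.findById(modelId);
  if (model == null || model.status != 'deployed') return null;
  final wrapper = await MLModelWrapper.load(
    model.config['artifactPath'] as String,
  );
  _loadedModels[modelId] = wrapper;
  return wrapper;
}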
4. Model Training and Versioning System
class ModelTrainingService {
final VektaDatabase db;
final TrainingOrchestrator orchestrator;
ModelTrainingService(this.db, this.orchestrator);
/// Start model training job
Future<String> startTraining(
String modelId,
TrainingConfig config, {
String jobType = 'training',
}) async {
final model = await db.mlModels.findById(modelId);
if (model == null) throw Exception('Model not found');
// Create training job
final job = TrainingJob()
..modelId = modelId
..jobType = jobType
..config = config.toMap()
..status = 'queued'
..createdAt = DateTime.now();
await db.trainingJobs.save(job);
// Submit to training orchestrator
await orchestrator.submitJob(job);
return job.id;
}
/// Monitor training progress
Future<TrainingProgress> getTrainingProgress(String jobId) async {
final job = await db.trainingJobs.findById(jobId);
if (job == null) throw Exception('Training job not found');
return TrainingProgress(
jobId: jobId,
status: job.status,
progress: job.progress,
currentEpoch: job.metrics['current_epoch'] as int? ?? 0,
totalEpochs: job.config['epochs'] as int? ?? 0,
currentLoss: job.metrics['current_loss'] as double? ?? 0.0,
validationLoss: job.metrics['validation_loss'] as double? ?? 0.0,
estimatedTimeRemaining: _estimateTimeRemaining(job),
);
}
/// Complete training and create new model version
Future<void> completeTraining(
String jobId,
Map<String, dynamic> finalMetrics,
String modelArtifactPath,
) async {
final job = await db.trainingJobs.findById(jobId);
if (job == null) throw Exception('Training job not found');
final model = await db.mlModels.findById(job.modelId);
if (model == null) throw Exception('Model not found');
// Create new model version
final newVersion = _generateNewVersion(model.version);
final newModel = MLModel()
..name = model.name
..version = newVersion
..modelType = model.modelType
..framework = model.framework
..config = {
...model.config,
'artifactPath': modelArtifactPath,
'trainingJobId': jobId,
}
..performance = finalMetrics
..status = 'validating'
..createdAt = DateTime.now()
..trainedAt = DateTime.now();
await db.mlModels.save(newModel);
// Update training job
job.status = 'completed';
job.completedAt = DateTime.now();
job.metrics = finalMetrics;
await db.trainingJobs.save(job);
// Start validation process
await _startModelValidation(newModel.id);
}
/// Validate trained model
Future<ValidationResult> validateModel(String modelId) async {
final model = await db.mlModels.findById(modelId);
if (model == null) throw Exception('Model not found');
// Load validation dataset
final validationData = await _loadValidationDataset(model.config['validationDataset']);
// Load model for validation
final modelWrapper = await MLModelWrapper.load(model.config['artifactPath']);
// Run validation
final results = <ValidationSample>[];
for (final sample in validationData) {
final prediction = await modelWrapper.predict(sample.features);
results.add(ValidationSample(
features: sample.features,
actualLabel: sample.label,
predictedLabel: prediction.value,
confidence: prediction.confidence,
));
}
// Calculate metrics
final metrics = _calculateValidationMetrics(results);
// Update model performance
model.performance = metrics;
model.status = metrics['accuracy'] > 0.8 ? 'validated' : 'failed';
await db.mlModels.save(model);
return ValidationResult(
modelId: modelId,
metrics: metrics,
sampleCount: results.length,
passed: metrics['accuracy'] > 0.8,
);
}
/// Deploy model to production
Future<void> deployModel(String modelId, DeploymentConfig config) async {
final model = await db.mlModels.findById(modelId);
if (model == null) throw Exception('Model not found');
if (model.status != 'validated') {
throw Exception('Model must be validated before deployment');
}
// Deploy to inference service
final endpoint = await orchestrator.deployModel(model, config);
// Update model deployment info
model.deployment = {
'endpoint': endpoint,
'replicas': config.replicas,
'resources': config.resources.toMap(),
'autoscaling': config.autoscaling,
};
model.status = 'deployed';
model.deployedAt = DateTime.now();
await db.mlModels.save(model);
}
String _generateNewVersion(String currentVersion) {
  // Bump the minor version and reset the patch, e.g. 1.2.3 -> 1.3.0
  final parts = currentVersion.split('.');
  final major = int.parse(parts[0]);
  final minor = parts.length > 1 ? int.parse(parts[1]) : 0;
  return '$major.${minor + 1}.0';
}
}
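_estimateTimeRemaining is left undefined above. A simple linear extrapolation from elapsed wall-clock time and the job's reported progress is one reasonable sketch:
Duration? _estimateTimeRemaining(TrainingJob job) {
  // Linear extrapolation: if 25% took 10 minutes, the rest takes ~30
  if (job.startedAt == null || job.progress <= 0 || job.progress >= 1) {
    return null;
  }
  final elapsedMs = DateTime.now().difference(job.startedAt!).inMilliseconds;
  final remainingMs = elapsedMs * (1 - job.progress) / job.progress;
  return Duration(milliseconds: remainingMs.round());
}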
5. A/B Testing Framework for ML Models
class MLABTestingFramework {
final VektaDatabase db;
final RealTimeInferenceEngine inferenceEngine;
MLABTestingFramework(this.db, this.inferenceEngine);
/// Create A/B test for model comparison
Future<String> createABTest(
String name,
List<String> modelIds,
Map<String, double> trafficSplit,
List<String> metrics, {
DateTime? startDate,
DateTime? endDate,
}) async {
// Validate traffic split
final totalTraffic = trafficSplit.values.reduce((a, b) => a + b);
if ((totalTraffic - 1.0).abs() > 0.001) {
throw ArgumentError('Traffic split must sum to 1.0');
}
final abTest = ABTest()
..name = name
..description = 'A/B test comparing models: ${modelIds.join(', ')}'
..modelIds = modelIds
..trafficSplit = trafficSplit
..metrics = metrics
..status = 'draft'
..startDate = startDate ?? DateTime.now()
..endDate = endDate ?? DateTime.now().add(Duration(days: 14))
..createdAt = DateTime.now();
await db.abTests.save(abTest);
return abTest.id;
}
/// Start A/B test
Future<void> startABTest(String testId) async {
final test = await db.abTests.findById(testId);
if (test == null) throw Exception('A/B test not found');
// Validate all models are deployed
for (final modelId in test.modelIds) {
final model = await db.mlModels.findById(modelId);
if (model == null || model.status != 'deployed') {
throw Exception('Model $modelId is not deployed');
}
}
test.status = 'running';
test.startDate = DateTime.now();
await db.abTests.save(test);
}
/// Route prediction request based on A/B test
Future<PredictionResult> routePrediction(
String testId,
Map<String, dynamic> inputData, {
String? userId,
String? requestId,
}) async {
final test = await db.abTests.findById(testId);
if (test == null || test.status != 'running') {
throw Exception('A/B test not found or not running');
}
// Determine which model to use based on traffic split
final selectedModel = _selectModelForUser(userId ?? requestId ?? '', test.trafficSplit);
// Make prediction with selected model
final result = await inferenceEngine.predict(
selectedModel,
inputData,
requestId: requestId,
userId: userId,
abTestGroup: testId,
);
return result;
}
/// Analyze A/B test results
Future<ABTestResults> analyzeABTest(String testId) async {
final test = await db.abTests.findById(testId);
if (test == null) throw Exception('A/B test not found');
// Get predictions for each model in the test
final predictions = await db.predictions
.where('metadata.abTestGroup', testId)
.where('timestamp', between: [test.startDate!, test.endDate!])
.find();
// Group predictions by model
final modelPredictions = <String, List<Prediction>>{};
for (final prediction in predictions) {
final modelId = prediction.modelId;
modelPredictions[modelId] = (modelPredictions[modelId] ?? [])..add(prediction);
}
// Calculate metrics for each model
final modelResults = <String, ModelTestResults>{};
for (final entry in modelPredictions.entries) {
final modelId = entry.key;
final modelPreds = entry.value;
modelResults[modelId] = ModelTestResults(
modelId: modelId,
sampleCount: modelPreds.length,
averageLatency: _calculateAverageLatency(modelPreds),
averageConfidence: _calculateAverageConfidence(modelPreds),
conversionRate: await _calculateConversionRate(modelPreds),
clickThroughRate: await _calculateClickThroughRate(modelPreds),
);
}
// Determine statistical significance
final significance = await _calculateStatisticalSignificance(modelResults);
// Determine winner
final winner = _determineWinner(modelResults, test.metrics);
return ABTestResults(
testId: testId,
testName: test.name,
duration: test.endDate!.difference(test.startDate!),
modelResults: modelResults,
winner: winner,
significance: significance,
recommendations: _generateRecommendations(modelResults, significance),
);
}
/// Complete A/B test and promote winner
Future<void> completeABTest(String testId, {bool promoteWinner = true}) async {
final test = await db.abTests.findById(testId);
if (test == null) throw Exception('A/B test not found');
final results = await analyzeABTest(testId);
test.status = 'completed';
test.results = results.toMap();
test.winner = results.winner;
await db.abTests.save(test);
if (promoteWinner && results.winner != null) {
await _promoteWinnerModel(results.winner!);
}
}
String _selectModelForUser(String identifier, Map<String, double> trafficSplit) {
  // Consistent hashing: the same user always lands in the same bucket.
  // The modulo keeps the value in [0, 1) regardless of hashCode width.
  final hash = (identifier.hashCode.abs() % 100000) / 100000.0;
  double cumulative = 0.0;
  for (final entry in trafficSplit.entries) {
    cumulative += entry.value;
    if (hash <= cumulative) {
      return entry.key;
    }
  }
  // Fallback guards against floating-point rounding in the split
  return trafficSplit.keys.first;
}
}
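_calculateStatisticalSignificance can be implemented many ways; a two-proportion z-test on conversion counts is a common starting point. The sketch below is illustrative and assumes per-model conversion counts have already been extracted from the stored prediction feedback:
import 'dart:math';

/// Two-proportion z-test: how many standard errors apart are two
/// conversion rates? |z| > 1.96 corresponds to p < 0.05 (two-tailed)
double twoProportionZScore(
  int conversionsA, int totalA,
  int conversionsB, int totalB,
) {
  final pA = conversionsA / totalA;
  final pB = conversionsB / totalB;
  final pPooled = (conversionsA + conversionsB) / (totalA + totalB);
  final se = sqrt(pPooled * (1 - pPooled) * (1 / totalA + 1 / totalB));
  return se == 0 ? 0 : (pA - pB) / se;
}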
6. Feature Store Implementation
class FeatureStore {
final VektaDatabase db;
final Map<String, FeatureComputer> _featureComputers = {};
FeatureStore(this.db);
/// Register feature computer
void registerFeatureComputer(String featureName, FeatureComputer computer) {
_featureComputers[featureName] = computer;
}
/// Get features for entity
Future<Map<String, dynamic>> getFeatures(
String entityId,
String entityType,
List<String> featureNames, {
bool allowStale = false,
Duration maxAge = const Duration(hours: 24),
}) async {
final features = <String, dynamic>{};
final missingFeatures = <String>[];
// Try to get cached features first
for (final featureName in featureNames) {
final cached = await _getCachedFeature(entityId, entityType, featureName, maxAge);
if (cached != null) {
features[featureName] = cached;
} else {
missingFeatures.add(featureName);
}
}
// Compute missing features
if (missingFeatures.isNotEmpty && !allowStale) {
final computed = await _computeFeatures(entityId, entityType, missingFeatures);
features.addAll(computed);
}
return features;
}
/// Batch get features for multiple entities
Future<Map<String, Map<String, dynamic>>> batchGetFeatures(
List<String> entityIds,
String entityType,
List<String> featureNames, {
Duration maxAge = const Duration(hours: 24),
}) async {
final results = <String, Map<String, dynamic>>{};
// Get cached features in batch
final cachedFeatures = await _batchGetCachedFeatures(
entityIds, entityType, featureNames, maxAge);
// Identify missing features per entity
final missingFeatures = <String, List<String>>{};
for (final entityId in entityIds) {
final entityFeatures = cachedFeatures[entityId] ?? {};
final missing = featureNames.where((f) => !entityFeatures.containsKey(f)).toList();
if (missing.isNotEmpty) {
missingFeatures[entityId] = missing;
}
results[entityId] = entityFeatures;
}
// Compute missing features in parallel
final computeFutures = missingFeatures.entries.map((entry) async {
final entityId = entry.key;
final missing = entry.value;
final computed = await _computeFeatures(entityId, entityType, missing);
return MapEntry(entityId, computed);
});
final computedResults = await Future.wait(computeFutures);
// Merge computed features
for (final entry in computedResults) {
results[entry.key] = {...results[entry.key]!, ...entry.value};
}
return results;
}
/// Precompute features for entities
Future<void> precomputeFeatures(
List<String> entityIds,
String entityType,
List<String> featureNames,
) async {
const batchSize = 50;
for (int i = 0; i < entityIds.length; i += batchSize) {
final batch = entityIds.skip(i).take(batchSize).toList();
final futures = batch.map((entityId) async {
try {
await _computeFeatures(entityId, entityType, featureNames);
} catch (e) {
print('Error precomputing features for $entityId: $e');
}
});
await Future.wait(futures);
// Small delay between batches
if (i + batchSize < entityIds.length) {
await Future.delayed(Duration(milliseconds: 500));
}
}
}
/// Stream real-time feature updates
Stream<FeatureUpdate> streamFeatureUpdates(
String entityId,
String entityType,
List<String> featureNames,
) async* {
// Set up listeners for entity changes
final entityStream = db.watchEntity(entityId, entityType);
await for (final change in entityStream) {
// Recompute affected features
final affectedFeatures = _getAffectedFeatures(change, featureNames);
if (affectedFeatures.isNotEmpty) {
final updatedFeatures = await _computeFeatures(
entityId, entityType, affectedFeatures);
yield FeatureUpdate(
entityId: entityId,
entityType: entityType,
features: updatedFeatures,
timestamp: DateTime.now(),
);
}
}
}
Future<Map<String, dynamic>> _computeFeatures(
String entityId,
String entityType,
List<String> featureNames,
) async {
final features = <String, dynamic>{};
for (final featureName in featureNames) {
final computer = _featureComputers[featureName];
if (computer == null) {
throw Exception('Feature computer not found for $featureName');
}
try {
final value = await computer.compute(entityId, entityType);
features[featureName] = value;
// Cache the computed feature
await _cacheFeature(entityId, entityType, featureName, value);
} catch (e) {
print('Error computing feature $featureName for $entityId: $e');
features[featureName] = null;
}
}
return features;
}
Future<dynamic> _getCachedFeature(
String entityId,
String entityType,
String featureName,
Duration maxAge,
) async {
final cached = await db.featureVectors
.where('entityId', entityId)
.where('entityType', entityType)
.where('rawFeatures.$featureName', isNotNull: true)
.where('computedAt', greaterThan: DateTime.now().subtract(maxAge))
.orderBy('computedAt', descending: true)
.findFirst();
return cached?.rawFeatures[featureName];
}
Future<void> _cacheFeature(
String entityId,
String entityType,
String featureName,
dynamic value,
) async {
// Try to update existing feature vector
final existing = await db.featureVectors
.where('entityId', entityId)
.where('entityType', entityType)
.orderBy('computedAt', descending: true)
.findFirst();
if (existing != null) {
existing.rawFeatures[featureName] = value;
existing.computedAt = DateTime.now();
await db.featureVectors.save(existing);
} else {
// Create new feature vector
final featureVector = FeatureVector()
..entityId = entityId
..entityType = entityType
..features = [] // Will be populated when needed
..rawFeatures = {featureName: value}
..metadata = {'source': 'feature_store'}
..computedAt = DateTime.now()
..expiresAt = DateTime.now().add(Duration(days: 30));
await db.featureVectors.save(featureVector);
}
}
}
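The FeatureComputer interface used by registerFeatureComputer is not defined in this chapter. A minimal sketch follows, with a hypothetical order-count computer as an example; the db.orders collection and its fields are assumptions for illustration:
/// Contract for pluggable feature computation
abstract class FeatureComputer {
  Future<dynamic> compute(String entityId, String entityType);
}

/// Hypothetical example: count a customer's orders in the last 30 days
class RecentOrderCountComputer implements FeatureComputer {
  final VektaDatabase db;
  RecentOrderCountComputer(this.db);

  @override
  Future<dynamic> compute(String entityId, String entityType) async {
    final since = DateTime.now().subtract(Duration(days: 30));
    final orders = await db.orders
        .where('customerId', entityId)
        .where('createdAt', greaterThan: since)
        .find();
    return orders.length;
  }
}
Registration then looks like featureStore.registerFeatureComputer('recent_order_count', RecentOrderCountComputer(db)).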
Best Practices
1. Model Performance Monitoring
class ModelPerformanceMonitor {
final VektaDatabase db;
ModelPerformanceMonitor(this.db);
/// Monitor model performance metrics
Future<PerformanceReport> generatePerformanceReport(
String modelId,
Duration period,
) async {
final endTime = DateTime.now();
final startTime = endTime.subtract(period);
final predictions = await db.predictions
.where('modelId', modelId)
.where('timestamp', between: [startTime, endTime])
.find();
if (predictions.isEmpty) {
return PerformanceReport.empty(modelId, period);
}
// Calculate performance metrics
final avgLatency = predictions.map((p) => p.latency).reduce((a, b) => a + b) / predictions.length;
final avgConfidence = predictions.map((p) => p.confidence).reduce((a, b) => a + b) / predictions.length;
// Calculate accuracy from feedback
final feedbackPredictions = predictions.where((p) => p.feedback.isNotEmpty).toList();
final accuracy = feedbackPredictions.isEmpty ? null :
feedbackPredictions.where((p) => p.feedback['correctPrediction'] == true).length /
feedbackPredictions.length;
// Detect performance degradation
final degradationAlert = await _detectPerformanceDegradation(modelId, predictions);
return PerformanceReport(
modelId: modelId,
period: period,
totalPredictions: predictions.length,
averageLatency: avgLatency,
averageConfidence: avgConfidence,
accuracy: accuracy,
throughput: predictions.length / period.inHours,
degradationAlert: degradationAlert,
);
}
/// Detect model drift
Future<DriftReport> detectModelDrift(String modelId, Duration period) async {
final endTime = DateTime.now();
final startTime = endTime.subtract(period);
// Get recent predictions
final recentPredictions = await db.predictions
.where('modelId', modelId)
.where('timestamp', between: [startTime, endTime])
.find();
// Get baseline predictions (first week after deployment)
final model = await db.mlModels.findById(modelId);
if (model == null) throw Exception('Model not found');
final baselinePredictions = await db.predictions
.where('modelId', modelId)
.where('timestamp', between: [model.deployedAt!, model.deployedAt!.add(Duration(days: 7))])
.find();
// Calculate drift metrics
final featureDrift = _calculateFeatureDrift(recentPredictions, baselinePredictions);
final predictionDrift = _calculatePredictionDrift(recentPredictions, baselinePredictions);
return DriftReport(
modelId: modelId,
period: period,
featureDrift: featureDrift,
predictionDrift: predictionDrift,
driftScore: (featureDrift + predictionDrift) / 2,
requiresRetraining: (featureDrift + predictionDrift) / 2 > 0.3,
);
}
}
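_calculateFeatureDrift is not shown; the Population Stability Index (PSI) over a single feature's distribution is one widely used choice. A sketch, assuming baseline and recent values for the feature have been extracted from the stored predictions (a PSI above roughly 0.25 is conventionally treated as significant drift):
import 'dart:math';

/// Population Stability Index between a baseline and a recent sample of a
/// single feature; bins span the baseline's observed range
double populationStabilityIndex(
  List<double> baseline,
  List<double> recent, {
  int bins = 10,
}) {
  final minV = baseline.reduce(min);
  final maxV = baseline.reduce(max);
  final width = (maxV - minV) / bins;
  if (width == 0) return 0.0;

  List<double> proportions(List<double> values) {
    final counts = List<int>.filled(bins, 0);
    for (final v in values) {
      var bin = ((v - minV) / width).floor();
      if (bin < 0) bin = 0;
      if (bin >= bins) bin = bins - 1;
      counts[bin]++;
    }
    // Epsilon smoothing avoids log(0) on empty bins
    return counts.map((c) => (c + 1e-6) / values.length).toList();
  }

  final base = proportions(baseline);
  final curr = proportions(recent);
  var psi = 0.0;
  for (var i = 0; i < bins; i++) {
    psi += (curr[i] - base[i]) * log(curr[i] / base[i]);
  }
  return psi;
}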
2. ML Pipeline Orchestration
class MLPipelineOrchestrator {
final VektaDatabase db;
final Map<String, PipelineStep> _steps = {};
MLPipelineOrchestrator(this.db);
/// Define ML pipeline
void definePipeline(String pipelineId, List<PipelineStep> steps) {
for (final step in steps) {
_steps['${pipelineId}_${step.name}'] = step;
}
}
/// Execute ML pipeline
Future<PipelineResult> executePipeline(
String pipelineId,
Map<String, dynamic> parameters,
) async {
final pipeline = await _getPipelineDefinition(pipelineId);
final results = <String, dynamic>{};
for (final step in pipeline.steps) {
try {
print('Executing step: ${step.name}');
final stepResult = await _executeStep(step, parameters, results);
results[step.name] = stepResult;
// Update parameters for next step
if (step.outputParameters != null) {
parameters.addAll(step.outputParameters!(stepResult));
}
} catch (e) {
return PipelineResult(
pipelineId: pipelineId,
status: 'failed',
error: e.toString(),
results: results,
);
}
}
return PipelineResult(
pipelineId: pipelineId,
status: 'completed',
results: results,
);
}
/// Execute pipeline step
Future<dynamic> _executeStep(
PipelineStep step,
Map<String, dynamic> parameters,
Map<String, dynamic> previousResults,
) async {
switch (step.type) {
case 'data_extraction':
return await _executeDataExtraction(step, parameters);
case 'feature_engineering':
return await _executeFeatureEngineering(step, parameters, previousResults);
case 'model_training':
return await _executeModelTraining(step, parameters, previousResults);
case 'model_validation':
return await _executeModelValidation(step, parameters, previousResults);
case 'model_deployment':
return await _executeModelDeployment(step, parameters, previousResults);
default:
throw Exception('Unknown step type: ${step.type}');
}
}
}
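The PipelineStep type used above is assumed rather than defined. A minimal shape consistent with how _executeStep and outputParameters are used (the config field is an additional assumption for per-step settings):
typedef OutputParameterMapper = Map<String, dynamic> Function(dynamic stepResult);

class PipelineStep {
  final String name;
  final String type; // data_extraction, feature_engineering, model_training, ...
  final Map<String, dynamic> config;
  final OutputParameterMapper? outputParameters;

  PipelineStep({
    required this.name,
    required this.type,
    this.config = const {},
    this.outputParameters,
  });
}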
Advanced Topics
Federated Learning Integration
class FederatedLearningCoordinator {
  final VektaDatabase db;
  final int maxRounds;
  FederatedLearningCoordinator(this.db, {this.maxRounds = 10});
/// Coordinate federated learning across multiple clients
Future<String> startFederatedTraining(
String modelId,
List<String> clientIds,
FederatedConfig config,
) async {
final federatedJob = FederatedTrainingJob()
..modelId = modelId
..clientIds = clientIds
..config = config.toMap()
..status = 'initializing'
..createdAt = DateTime.now();
await db.federatedTrainingJobs.save(federatedJob);
// Initialize global model
await _initializeGlobalModel(federatedJob.id, modelId);
// Start training rounds
await _startTrainingRound(federatedJob.id, 1);
return federatedJob.id;
}
/// Aggregate model updates from clients
Future<void> aggregateModelUpdates(
String jobId,
int round,
List<ModelUpdate> clientUpdates,
) async {
// Implement federated averaging
final aggregatedWeights = _federatedAveraging(clientUpdates);
// Update global model
await _updateGlobalModel(jobId, aggregatedWeights);
// Check convergence
final converged = await _checkConvergence(jobId, round);
if (converged || round >= maxRounds) {
await _completeFederatedTraining(jobId);
} else {
await _startTrainingRound(jobId, round + 1);
}
}
}
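_federatedAveraging implements the classic FedAvg aggregation: average the client weight vectors, weighted by how many samples each client trained on. A sketch, assuming ModelUpdate exposes a flat weights list (List<double>) and a sampleCount (int); real deployments aggregate per-layer tensors the same way:
/// Classic FedAvg: sample-count-weighted average of client weight vectors
List<double> _federatedAveraging(List<ModelUpdate> clientUpdates) {
  final totalSamples =
      clientUpdates.fold<int>(0, (sum, u) => sum + u.sampleCount);
  final dimensions = clientUpdates.first.weights.length;
  final averaged = List<double>.filled(dimensions, 0.0);
  for (final update in clientUpdates) {
    final weight = update.sampleCount / totalSamples;
    for (int i = 0; i < dimensions; i++) {
      averaged[i] += update.weights[i] * weight;
    }
  }
  return averaged;
}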
Summary
This chapter demonstrated comprehensive AI/ML integration patterns with Vektagraf. Key takeaways include:
- Embedding Integration: Seamless embedding generation and management
- Real-Time Inference: High-performance prediction serving
- Model Versioning: Complete model lifecycle management
- A/B Testing: Systematic model comparison and optimization
- Feature Store: Centralized feature management and serving
- Performance Monitoring: Comprehensive model observability
- Pipeline Orchestration: End-to-end ML workflow automation
Vektagraf's vector-first architecture makes it particularly well-suited for AI/ML applications, as embeddings and features are treated as first-class database objects.
Next Steps
- Part VII: Reference documentation for complete API coverage
- Chapter 14: Performance Tuning - Optimize ML workloads
- Chapter 13: Monitoring and Observability - Advanced ML monitoring
Related Resources
- Vector Search Documentation (Chapter 5)
- Performance Optimization (Chapter 7)
- Multi-Tenant Architecture (Chapter 11)
- Security and Privacy (Chapters 8-10)