Chapter 18: Automation System
Overview
The VektaGraf Automation System provides a declarative approach to data lifecycle management: fields can modify their own values in response to time-based triggers, state changes, or conditional logic. This chapter covers the complete automation system, from basic rule definitions to plugin integration and enterprise deployment practices.
Learning Objectives
By the end of this chapter, you will understand:
- How to design and implement automation rules in your schema
- Time-based and state-based trigger mechanisms
- Action types and their appropriate use cases
- Compliance and audit requirements for automated data processing
- Performance optimization and monitoring strategies
- Enterprise deployment patterns and best practices
Core Concepts
Automation Architecture
The automation system consists of several key components:
- Rule Engine: Evaluates automation rules against objects
- Trigger System: Monitors for time-based and state-based events
- Action Executor: Applies transformations to field values
- Job Queue Integration: Schedules and manages automation execution
- Audit System: Tracks all automation activities for compliance
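The way these components cooperate can be sketched in plain Dart. The `Row` typedef, the `Rule` class, and the `evaluate` function below are hypothetical stand-ins written for illustration, not VektaGraf classes; the sketch only shows the order of work: check the trigger, check the condition, run the action, record an audit entry.

```dart
// Hypothetical stand-ins for the engine components; not the VektaGraf API.
typedef Row = Map<String, Object?>;

class Rule {
  final String name;
  final String field;
  final bool Function(Row row, DateTime now) isTriggered; // trigger system
  final bool Function(Row row) condition; // rule engine guard
  final Object? Function(Object? current) action; // action executor

  const Rule(
      this.name, this.field, this.isTriggered, this.condition, this.action);
}

/// Evaluates every rule against [row] and returns one audit line per rule
/// that fired. With [dryRun] set, the row itself is left untouched.
List<String> evaluate(Row row, List<Rule> rules, DateTime now,
    {bool dryRun = false}) {
  final audit = <String>[];
  for (final rule in rules) {
    if (!rule.isTriggered(row, now) || !rule.condition(row)) continue;
    final oldValue = row[rule.field];
    final newValue = rule.action(oldValue);
    if (!dryRun) row[rule.field] = newValue;
    audit.add('${rule.name}: $oldValue -> $newValue');
  }
  return audit;
}

void main() {
  final user = <String, Object?>{
    'email': 'user@example.com',
    'status': 'inactive',
    'gdpr_deletion_date': DateTime(2024, 1, 1),
  };
  final rule = Rule(
    'gdpr_email_anonymization',
    'email',
    (row, now) {
      final due = row['gdpr_deletion_date'];
      return due is DateTime && !now.isBefore(due);
    },
    (row) => row['status'] != 'active',
    (_) => '***@***.***',
  );
  print(evaluate(user, [rule], DateTime(2024, 6, 1), dryRun: true));
  print(user['email']); // still the original address after a dry run
}
```

In the real system the job queue decides when evaluation happens; here the caller passes `now` explicitly, which also makes the logic easy to test.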
Rule Definition Structure
Automation rules are defined directly in your schema using a declarative syntax:
{
  "models": {
    "User": {
      "email": {
        "type": "string",
        "automations": [
          {
            "name": "gdpr_email_anonymization",
            "trigger": {
              "type": "time",
              "dateTimeField": "gdpr_deletion_date"
            },
            "action": {
              "type": "obfuscate",
              "pattern": "***@***.***"
            },
            "condition": "gdpr_deletion_date != null && status != 'active'",
            "enabled": true,
            "mandate": {
              "authority": "GDPR Article 17 - Right to Erasure",
              "description": "Anonymize email addresses when GDPR deletion date is reached",
              "approvedBy": "Data Protection Officer"
            }
          }
        ]
      }
    }
  }
}
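Because rules live inside the schema document, ordinary JSON tooling can inspect them. The following sketch uses only `dart:convert` to list the rules in a schema string; `listAutomations` is a hypothetical helper, and treating a missing `enabled` key as `true` is an assumption based on the later examples, which omit it.

```dart
import 'dart:convert';

/// Collects "Model.field: ruleName" for every enabled automation.
/// Assumes the models -> model -> field -> automations layout shown above.
List<String> listAutomations(String schemaJson) {
  final schema = jsonDecode(schemaJson) as Map<String, dynamic>;
  final models = (schema['models'] as Map<String, dynamic>?) ?? {};
  final found = <String>[];
  models.forEach((modelName, fields) {
    (fields as Map<String, dynamic>).forEach((fieldName, definition) {
      // Fields may use a shorthand such as "content": "string" (no rules).
      if (definition is! Map<String, dynamic>) return;
      final automations = (definition['automations'] as List<dynamic>?) ?? [];
      for (final rule in automations) {
        // Assumption: a rule without an "enabled" key is enabled.
        final enabled = (rule as Map<String, dynamic>)['enabled'] ?? true;
        if (enabled == true) {
          found.add('$modelName.$fieldName: ${rule['name']}');
        }
      }
    });
  });
  return found;
}

void main() {
  const schema = '''
{"models": {"User": {
  "email": {"type": "string", "automations": [
    {"name": "gdpr_email_anonymization", "enabled": true}
  ]},
  "status": {"type": "string"}
}}}''';
  print(listAutomations(schema)); // [User.email: gdpr_email_anonymization]
}
```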
Mandate Requirements
All automation rules must include mandate information documenting the legal, policy, or business authority for the automation:
{
  "mandate": {
    "authority": "GDPR Article 17 - Right to Erasure",
    "policy": "Data Retention Policy v2.1",
    "description": "Automatically anonymize email addresses when GDPR deletion date is reached",
    "approvedBy": "Data Protection Officer",
    "approvalDate": "2024-01-15",
    "reviewDate": "2025-01-15",
    "businessJustification": "Ensure compliance with data protection regulations",
    "riskAssessment": "Low - automated process reduces human error"
  }
}
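A mandate block can be checked before a schema is deployed. The sketch below is a hypothetical pre-deployment check, not part of VektaGraf; it assumes that `authority`, `description`, and `approvedBy` are the required keys (they are the ones present in every example in this chapter) and that the date fields use ISO 8601 strings.

```dart
/// Returns a list of problems found in a rule's mandate block.
/// Assumption: authority, description and approvedBy are the required keys.
List<String> checkMandate(Map<String, dynamic>? mandate, {DateTime? today}) {
  if (mandate == null) return ['mandate block is missing'];
  final problems = <String>[];
  for (final key in const ['authority', 'description', 'approvedBy']) {
    final value = mandate[key];
    if (value is! String || value.trim().isEmpty) {
      problems.add('mandate.$key is required');
    }
  }
  // Optional dates: compare them only when both parse cleanly.
  final approval = DateTime.tryParse('${mandate['approvalDate'] ?? ''}');
  final review = DateTime.tryParse('${mandate['reviewDate'] ?? ''}');
  if (approval != null && review != null && !review.isAfter(approval)) {
    problems.add('mandate.reviewDate must be after approvalDate');
  }
  if (review != null && today != null && review.isBefore(today)) {
    problems.add('mandate review is overdue');
  }
  return problems;
}

void main() {
  final mandate = {
    'authority': 'GDPR Article 17 - Right to Erasure',
    'description': 'Anonymize email addresses',
    'approvedBy': 'Data Protection Officer',
    'approvalDate': '2024-01-15',
    'reviewDate': '2025-01-15',
  };
  print(checkMandate(mandate)); // []
  print(checkMandate(mandate, today: DateTime(2025, 6, 1)));
}
```

Passing `today` turns the review date into an actionable check, which is useful in a scheduled compliance report.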
Practical Examples
Example 1: GDPR Compliance Automation
This example demonstrates automatic data anonymization for GDPR compliance:
import 'package:vektagraf/vektagraf.dart';
// Schema definition with GDPR automation
const gdprSchema = '''
{
"models": {
"User": {
"email": {
"type": "string",
"automations": [
{
"name": "gdpr_email_anonymization",
"trigger": {
"type": "time",
"dateTimeField": "gdpr_deletion_date"
},
"action": {
"type": "obfuscate",
"pattern": "***@***.***"
},
"condition": "gdpr_deletion_date != null && status != 'active'",
"mandate": {
"authority": "GDPR Article 17 - Right to Erasure",
"description": "Anonymize email addresses when GDPR deletion date is reached",
"approvedBy": "Data Protection Officer"
}
}
]
},
"phone": {
"type": "string",
"automations": [
{
"name": "gdpr_phone_anonymization",
"trigger": {
"type": "time",
"dateTimeField": "gdpr_deletion_date"
},
"action": {
"type": "obfuscate",
"pattern": "***-***-****"
},
"condition": "gdpr_deletion_date != null",
"mandate": {
"authority": "GDPR Article 17 - Right to Erasure",
"description": "Anonymize phone numbers when GDPR deletion date is reached",
"approvedBy": "Data Protection Officer"
}
}
]
},
"gdpr_deletion_date": {
"type": "datetime"
},
"status": {
"type": "string"
}
}
}
}
''';
Future<void> setupGdprAutomation() async {
// Initialize database and automation engine
final database = Database();
final jobQueue = JobQueueManager(database);
final security = SecurityPolicyEngineInterface(database);
final engine = AutomationEngine(database, jobQueue, security);
// Load schema with automation rules
final schema = VektagrafSchema.fromJson(gdprSchema);
await engine.initialize(schema);
// Create a user with GDPR deletion date
final user = await database.create('User', {
'email': 'user@example.com',
'phone': '555-123-4567',
'status': 'inactive',
'gdpr_deletion_date': DateTime.now().add(Duration(days: 30)),
});
print('User created with GDPR deletion date: ${user.id}');
// The rule fires once gdpr_deletion_date is reached (30 days from now in this example)
// Dry-run mode previews rule evaluation without modifying the object
final results = await engine.evaluateObject(user, dryRun: true);
for (final result in results) {
print('Rule: ${result.ruleName}');
print('Would change: ${result.oldValue} → ${result.newValue}');
}
}
Example 2: Data Retention Management
Implement automatic data archival based on retention policies:
const retentionSchema = '''
{
"models": {
"Document": {
"status": {
"type": "string",
"automations": [
{
"name": "auto_archive_old_documents",
"trigger": {
"type": "time",
"delay": "7y"
},
"action": {
"type": "set_value",
"value": "archived"
},
"condition": "status != 'active' && status != 'archived'",
"mandate": {
"authority": "Document Retention Policy DRP-2024-001",
"description": "Archive documents after 7 years of inactivity",
"approvedBy": "Records Manager",
"businessJustification": "Comply with legal retention requirements while managing storage costs"
}
}
]
},
"content": {
"type": "string",
"automations": [
{
"name": "purge_archived_content",
"trigger": {
"type": "state",
"watchedField": "status",
"expectedValue": "archived"
},
"action": {
"type": "blank"
},
"condition": "created_at < now() - 10y",
"mandate": {
"authority": "Document Retention Policy DRP-2024-001",
"description": "Remove content from archived documents older than 10 years",
"approvedBy": "Records Manager"
}
}
]
},
"created_at": {
"type": "datetime"
}
}
}
}
''';
Future<void> setupRetentionAutomation() async {
final database = Database();
final jobQueue = JobQueueManager(database);
final security = SecurityPolicyEngineInterface(database);
final engine = AutomationEngine(database, jobQueue, security);
final schema = VektagrafSchema.fromJson(retentionSchema);
await engine.initialize(schema);
// Create a document that will be subject to retention rules
final document = await database.create('Document', {
'status': 'inactive',
'content': 'This is document content that will be archived and eventually purged.',
'created_at': DateTime.now().subtract(Duration(days: 2555)), // ~7 years ago
});
print('Document created: ${document.id}');
// Check what automations would apply
final results = await engine.evaluateObject(document, dryRun: true);
for (final result in results) {
print('Rule: ${result.ruleName} - Success: ${result.success}');
if (result.success) {
print(' ${result.oldValue} → ${result.newValue}');
}
}
}
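The schema above expresses periods as short strings such as "7y" and "10y", and the configuration reference uses the same style for seconds, minutes, and hours. How the engine parses them is not documented here; the sketch below is one plausible reading, with the hypothetical helper `parseDelay` and the assumption that a year means 365 days.

```dart
/// Parses duration strings such as "30s", "5m", "1h", "90d" or "7y".
/// Assumption: "y" means 365 days; the engine's own rule may differ.
Duration parseDelay(String text) {
  final match = RegExp(r'^(\d+)([smhdy])$').firstMatch(text.trim());
  if (match == null) throw FormatException('Unrecognized delay', text);
  final amount = int.parse(match.group(1)!);
  switch (match.group(2)!) {
    case 's':
      return Duration(seconds: amount);
    case 'm':
      return Duration(minutes: amount);
    case 'h':
      return Duration(hours: amount);
    case 'd':
      return Duration(days: amount);
    default: // 'y'
      return Duration(days: 365 * amount);
  }
}

void main() {
  print(parseDelay('7y').inDays); // 2555
  print(parseDelay('90d').inDays); // 90

  // The condition "created_at < now() - 10y" as a plain comparison.
  final createdAt = DateTime.utc(2015, 1, 1);
  final cutoff = DateTime.utc(2025, 6, 1).subtract(parseDelay('10y'));
  print(createdAt.isBefore(cutoff)); // true
}
```

Under this reading, "7y" is exactly the 2555 days used for `created_at` in the example above.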
Example 3: User Lifecycle Management
Automate user account lifecycle based on activity patterns:
const userLifecycleSchema = '''
{
"models": {
"User": {
"status": {
"type": "string",
"automations": [
{
"name": "deactivate_inactive_users",
"trigger": {
"type": "time",
"delay": "90d"
},
"action": {
"type": "set_value",
"value": "inactive"
},
"condition": "last_login_at < now() - 90d && status == 'active'",
"mandate": {
"authority": "Security Policy SEC-2024-003",
"description": "Deactivate users inactive for 90+ days",
"approvedBy": "Security Team",
"businessJustification": "Reduce security risk from dormant accounts"
}
},
{
"name": "suspend_long_inactive_users",
"trigger": {
"type": "time",
"delay": "180d"
},
"action": {
"type": "set_value",
"value": "suspended"
},
"condition": "last_login_at < now() - 180d && status == 'inactive'",
"mandate": {
"authority": "Security Policy SEC-2024-003",
"description": "Suspend users inactive for 180+ days",
"approvedBy": "Security Team"
}
}
]
},
"access_token": {
"type": "string",
"automations": [
{
"name": "revoke_suspended_user_tokens",
"trigger": {
"type": "state",
"watchedField": "status",
"expectedValue": "suspended"
},
"action": {
"type": "blank"
},
"mandate": {
"authority": "Security Policy SEC-2024-003",
"description": "Revoke access tokens when user is suspended",
"approvedBy": "Security Team"
}
}
]
},
"last_login_at": {
"type": "datetime"
}
}
}
}
''';
Future<void> setupUserLifecycleAutomation() async {
final database = Database();
final jobQueue = JobQueueManager(database);
final security = SecurityPolicyEngineInterface(database);
final engine = AutomationEngine(database, jobQueue, security);
final schema = VektagrafSchema.fromJson(userLifecycleSchema);
await engine.initialize(schema);
// Create users with different activity patterns
final activeUser = await database.create('User', {
'status': 'active',
'access_token': 'active_token_123',
'last_login_at': DateTime.now().subtract(Duration(days: 5)),
});
final inactiveUser = await database.create('User', {
'status': 'active',
'access_token': 'inactive_token_456',
'last_login_at': DateTime.now().subtract(Duration(days: 100)),
});
print('Created active user: ${activeUser.id}');
print('Created inactive user: ${inactiveUser.id}');
// Check automation results for inactive user
final results = await engine.evaluateObject(inactiveUser, dryRun: true);
for (final result in results) {
print('Rule: ${result.ruleName}');
if (result.success) {
print(' Would apply: ${result.oldValue} → ${result.newValue}');
}
}
}
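The three rules form a small state machine: active, then inactive, then suspended, with the token cleared on suspension. The sketch below mirrors the rule conditions on a plain map so the transitions can be reasoned about without a database. It is not how the engine is implemented; `applyLifecycle` is hypothetical, the "blank" action is modeled as an empty string, and whether the real engine cascades several rules in one pass is an assumption.

```dart
/// Applies the three lifecycle rules from the schema above to a plain map.
/// Assumption: the "blank" action is modeled as an empty string.
Map<String, Object?> applyLifecycle(Map<String, Object?> user, DateTime now) {
  final result = Map<String, Object?>.of(user);
  final idle = now.difference(result['last_login_at'] as DateTime);

  if (result['status'] == 'active' && idle > const Duration(days: 90)) {
    result['status'] = 'inactive'; // deactivate_inactive_users
  }
  if (result['status'] == 'inactive' && idle > const Duration(days: 180)) {
    result['status'] = 'suspended'; // suspend_long_inactive_users
  }
  if (result['status'] == 'suspended') {
    result['access_token'] = ''; // revoke_suspended_user_tokens
  }
  return result;
}

void main() {
  final now = DateTime.utc(2025, 1, 1);
  final user = {
    'status': 'active',
    'access_token': 'inactive_token_456',
    'last_login_at': now.subtract(const Duration(days: 100)),
  };
  print(applyLifecycle(user, now)['status']); // inactive
}
```

A user idle for 100 days is deactivated but keeps the token; only the later suspension clears it.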
Best Practices
1. Rule Design Principles
Start Simple: Begin with basic time-based triggers before implementing complex conditional logic.
// Good: Simple time-based trigger
{
"trigger": {
"type": "time",
"delay": "30d"
},
"action": {
"type": "set_value",
"value": "archived"
}
}
// Avoid initially: Complex conditions
{
"trigger": {
"type": "time",
"delay": "30d"
},
"condition": "status == 'inactive' && last_activity < now() - 60d && user_type != 'premium' && (region == 'EU' || gdpr_consent == true)",
"action": {
"type": "set_value",
"value": "archived"
}
}
Use Descriptive Names: Choose clear, descriptive names for your automation rules.
// Good
"name": "gdpr_email_anonymization"
"name": "quarterly_data_archive"
"name": "inactive_user_suspension"
// Avoid
"name": "rule1"
"name": "auto_thing"
"name": "cleanup"
2. Testing and Validation
Always Use Dry-Run Mode: Test automations before deploying to production.
// Test automation without applying changes
final results = await engine.evaluateObject(object, dryRun: true);
for (final result in results) {
  print('Rule: ${result.ruleName}');
  print('Would change: ${result.oldValue} → ${result.newValue}');
}
// Only apply if every rule evaluated successfully; add your own checks on
// the proposed values here as needed
final resultsLookGood = results.every((result) => result.success);
if (resultsLookGood) {
  await engine.evaluateObject(object, dryRun: false);
}
Validate Rule Definitions: Use built-in validation tools.
final validator = AutomationRuleValidator();
final issues = validator.validate(rule);
if (issues.isNotEmpty) {
for (final issue in issues) {
print('Validation error: ${issue.message}');
if (issue.suggestion != null) {
print('Suggestion: ${issue.suggestion}');
}
}
}
3. Performance Optimization
Optimize Condition Expressions: Write efficient conditions that minimize database queries.
// Good: Simple field comparisons
"condition": "status == 'inactive' && created_at < now() - 30d"
// Less efficient: a subquery that must run for every object evaluated
"condition": "status == 'inactive' && (SELECT COUNT(*) FROM related_table WHERE user_id = this.id) == 0"
Use Appropriate Batch Sizes: Configure batch processing for bulk operations.
{
"automation": {
"performance": {
"batchSize": 100,
"batchTimeout": "30s"
}
}
}
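The effect of `batchSize` can be pictured as splitting the pending objects into fixed-size chunks that are processed one after another. The sketch below shows that chunking in plain Dart; `inBatches` is a hypothetical helper, and the engine's actual scheduling is not described in this chapter.

```dart
/// Splits [items] into consecutive batches of at most [batchSize] elements.
Iterable<List<T>> inBatches<T>(List<T> items, int batchSize) sync* {
  if (batchSize <= 0) throw ArgumentError.value(batchSize, 'batchSize');
  for (var start = 0; start < items.length; start += batchSize) {
    final end =
        start + batchSize < items.length ? start + batchSize : items.length;
    yield items.sublist(start, end);
  }
}

void main() {
  final ids = List.generate(250, (i) => i);
  final sizes = inBatches(ids, 100).map((batch) => batch.length).toList();
  print(sizes); // [100, 100, 50]
}
```

Smaller batches shorten each unit of work and make timeouts less likely; larger batches reduce per-batch overhead.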
4. Security Considerations
Respect Field Permissions: Ensure automations have appropriate permissions.
// Check permissions before applying automation
final hasPermission = await security.checkFieldPermission(
userId: systemUserId,
modelName: 'User',
fieldName: 'email',
operation: 'write',
);
if (!hasPermission) {
throw AutomationPermissionException(
'Insufficient permissions for automation',
requiredPermission: 'User.email.write',
);
}
Audit Sensitive Operations: Enable comprehensive auditing for sensitive field modifications.
{
"automation": {
"security": {
"auditSensitiveFields": true,
"encryptAuditLogs": true
}
}
}
Advanced Topics
Custom Action Types
Implement custom actions for specialized transformations:
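import 'dart:convert'; // utf8
import 'package:crypto/crypto.dart'; // sha256

class CustomHashAction extends AutomationActionInterface {
  final String algorithm;

  CustomHashAction({this.algorithm = 'SHA-256'});

  @override
  Future<dynamic> execute(
    dynamic currentValue,
    Map<String, dynamic> context,
  ) async {
    if (currentValue == null) return null;
    // Only SHA-256 is implemented; fail loudly instead of silently ignoring
    // a different algorithm setting
    if (algorithm != 'SHA-256') {
      throw UnsupportedError('Unsupported hash algorithm: $algorithm');
    }
    final bytes = utf8.encode(currentValue.toString());
    final digest = sha256.convert(bytes);
    return digest.toString();
  }

  @override
  String get type => 'custom_hash';
}

// Register custom action
engine.registerAction('custom_hash', CustomHashAction());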
Multi-Tenant Automation
Configure tenant-specific automation behavior:
// Set tenant-specific configuration
await configManager.setTenantConfig(tenantId, {
'maxConcurrentAutomations': 5,
'excludedModels': ['SensitiveData'],
'customSettings': {
'gdprMode': true,
'retentionPeriod': '2y',
'anonymizationLevel': 'high'
}
});
// Get tenant-specific rules
final tenantRules = engine.getRulesForTenant(tenantId);
Integration with External Systems
Connect automations to external compliance systems:
import 'dart:convert'; // jsonEncode
import 'package:http/http.dart' as http;

class ComplianceIntegrationAction extends AutomationActionInterface {
  final String complianceSystemUrl;

  ComplianceIntegrationAction(this.complianceSystemUrl);

  @override
  Future<dynamic> execute(
    dynamic currentValue,
    Map<String, dynamic> context,
  ) async {
    // Notify external compliance system
    final response = await http.post(
      Uri.parse('$complianceSystemUrl/data-modification'),
      headers: {'Content-Type': 'application/json'},
      body: jsonEncode({
        'objectId': context['objectId'],
        'fieldName': context['fieldName'],
        'oldValue': currentValue,
        'newValue': context['newValue'],
        'ruleName': context['ruleName'],
        'timestamp': DateTime.now().toIso8601String(),
      }),
    );
    // Treat any 2xx status as success (some services answer 201 or 204)
    if (response.statusCode < 200 || response.statusCode >= 300) {
      throw AutomationExecutionException(
        'Failed to notify compliance system',
        objectId: context['objectId'],
        attemptedAt: DateTime.now(),
      );
    }
    // Pass the new value through unchanged
    return context['newValue'];
  }

  @override
  String get type => 'compliance_integration';
}
Reference
Configuration Options
Complete automation configuration reference:
{
"automation": {
"enabled": true,
"evaluationInterval": "1m",
"maxConcurrentAutomations": 10,
"auditEnabled": true,
"retryAttempts": 3,
"retryDelay": "5m",
"retryBackoffMultiplier": 2.0,
"maxRetryDelay": "1h",
"excludedModels": [],
"performance": {
"cachingEnabled": true,
"cacheTtl": "5m",
"maxCacheSize": 1000,
"batchSize": 100,
"batchTimeout": "30s"
},
"monitoring": {
"metricsEnabled": true,
"healthCheckInterval": "30s",
"alertThresholds": {
"failureRate": 0.05,
"averageExecutionTime": "10s",
"queueDepth": 1000
}
},
"security": {
"enforcePermissions": true,
"auditSensitiveFields": true,
"encryptAuditLogs": false,
"maxExecutionTime": "5m"
}
}
}
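The retry settings interact: `retryDelay` is the first wait, `retryBackoffMultiplier` scales each later wait, and `maxRetryDelay` caps the result. The sketch below computes the resulting schedule, assuming the multiplier is applied exponentially (the usual meaning of a backoff multiplier, though the chapter does not state it). `retrySchedule` is a hypothetical helper.

```dart
/// Delay before each retry, growing by [multiplier] and capped at [maxDelay].
/// Assumption: the multiplier is applied once per attempt (exponential backoff).
List<Duration> retrySchedule({
  required int attempts,
  required Duration initialDelay,
  required double multiplier,
  required Duration maxDelay,
}) {
  final delays = <Duration>[];
  var next = initialDelay;
  for (var i = 0; i < attempts; i++) {
    delays.add(next > maxDelay ? maxDelay : next);
    next = Duration(microseconds: (next.inMicroseconds * multiplier).round());
  }
  return delays;
}

void main() {
  // Values taken from the configuration reference above.
  final schedule = retrySchedule(
    attempts: 3,
    initialDelay: const Duration(minutes: 5),
    multiplier: 2.0,
    maxDelay: const Duration(hours: 1),
  );
  print(schedule.map((d) => d.inMinutes).toList()); // [5, 10, 20]
}
```

With the default values the cap is never reached; it only matters once `retryAttempts` is raised to five or more.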
API Reference
Key automation APIs:
// Engine initialization (the examples earlier in this chapter pass only the application schema)
final engine = AutomationEngine(database, jobQueue, security);
await engine.initialize(applicationSchema, systemSchema);
// Rule execution
final result = await engine.executeRule(rule, object, fieldName);
// Object evaluation
final results = await engine.evaluateObject(object, dryRun: true);
// Configuration management
final configManager = AutomationConfigManager(database);
await configManager.updateConfig({'maxConcurrentAutomations': 20});
// Audit querying
final auditManager = AutomationAuditManager(database);
final logs = await auditManager.getLogsForObject(objectId);
// Performance monitoring
final monitoring = AutomationMonitoring(database);
final health = await monitoring.getSystemHealth();
Error Handling
Common automation exceptions and handling:
try {
await engine.evaluateObject(object);
} on AutomationRuleException catch (e) {
print('Rule validation error: ${e.message}');
print('Rule definition: ${e.ruleDefinition}');
} on AutomationExecutionException catch (e) {
print('Execution error: ${e.message}');
print('Object ID: ${e.objectId}');
print('Attempted at: ${e.attemptedAt}');
} on AutomationPermissionException catch (e) {
print('Permission error: ${e.message}');
print('Required permission: ${e.requiredPermission}');
} on AutomationException catch (e) {
print('General automation error: ${e.message}');
if (e.ruleName != null) print('Rule: ${e.ruleName}');
if (e.fieldName != null) print('Field: ${e.fieldName}');
}
Plugin System Integration
VektaGraf's plugin system extends the automation engine with powerful AI and external service capabilities. Plugins can be used as automation actions to generate embeddings, create content, transform data, and integrate with external services.
Plugin Architecture
The plugin system integrates seamlessly with the automation engine through specialized plugin actions:
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│  Schema Field   │───→│ Automation Rule  │───→│  Plugin Action  │
│ with Automation │    │   with Trigger   │    │                 │
└─────────────────┘    └──────────────────┘    └─────────────────┘
                                                        │
                                                        ▼
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│  Updated Field  │◄───│  Result Handler  │◄───│ Plugin Manager  │
│      Value      │    │                  │    │                 │
└─────────────────┘    └──────────────────┘    └─────────────────┘
                                                        │
                                                        ▼
                                               ┌─────────────────┐
                                               │ Plugin Instance │
                                               │  (Embeddings,   │
                                               │ Text Gen, etc)  │
                                               └─────────────────┘
Built-in Plugins
Embeddings Plugin
Generates vector embeddings from text for semantic search and similarity operations:
{
"models": {
"Document": {
"content": "string",
"content_embedding": {
"type": "[float64]",
"automations": [
{
"name": "generate_content_embedding",
"trigger": {
"type": "on_change",
"field": "content"
},
"action": {
"type": "plugin",
"pluginId": "vektagraf.embeddings",
"parameters": {
"text_field": "content",
"normalize": true,
"model": "sentence_transformer",
"dimensions": 384
}
},
"mandate": {
"authority": "AI Enhancement Policy AIP-2024-001",
"description": "Generate embeddings for semantic search capabilities",
"approvedBy": "AI Team Lead"
}
}
]
}
}
}
}
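The `normalize` parameter most likely refers to scaling each embedding to unit length, which is an assumption here since the chapter does not define it. With unit-length vectors, cosine similarity reduces to a dot product, which is why normalized embeddings are convenient for semantic search. The sketch below shows both operations in plain Dart; `l2Normalize` and `cosine` are hypothetical helpers, not plugin APIs.

```dart
import 'dart:math' as math;

/// Scales [v] to unit length (L2 norm of 1). A zero vector is returned as a copy.
List<double> l2Normalize(List<double> v) {
  final norm = math.sqrt(v.fold<double>(0, (sum, x) => sum + x * x));
  return norm == 0 ? List.of(v) : [for (final x in v) x / norm];
}

/// Cosine similarity; for unit-length vectors this is just the dot product.
double cosine(List<double> a, List<double> b) {
  if (a.length != b.length) throw ArgumentError('dimension mismatch');
  final ua = l2Normalize(a);
  final ub = l2Normalize(b);
  var dot = 0.0;
  for (var i = 0; i < ua.length; i++) {
    dot += ua[i] * ub[i];
  }
  return dot;
}

void main() {
  print(l2Normalize([3, 4])); // [0.6, 0.8]
  print(cosine([1, 0], [0, 1])); // 0.0
}
```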
Text Generation Plugin
Creates content using language models for summarization, translation, and content generation:
{
"models": {
"Article": {
"content": "string",
"summary": {
"type": "string",
"automations": [
{
"name": "generate_article_summary",
"trigger": {
"type": "on_change",
"field": "content"
},
"action": {
"type": "plugin",
"pluginId": "vektagraf.text_generation",
"parameters": {
"task": "summarize",
"text_field": "content",
"max_tokens": 150,
"temperature": 0.7
}
},
"condition": "content.length > 500",
"mandate": {
"authority": "Content Enhancement Policy CEP-2024-002",
"description": "Auto-generate summaries for long articles",
"approvedBy": "Content Team Lead"
}
}
]
}
}
}
}
Plugin Setup and Configuration
Initialize Plugin Manager
import 'dart:io'; // Platform.environment
import 'package:vektagraf/vektagraf.dart';
Future<void> setupPluginSystem() async {
// Initialize plugin manager
final pluginManager = PluginManager();
await pluginManager.initialize();
// Register embeddings plugin
final embeddingsPlugin = EmbeddingsPlugin();
await pluginManager.registerPlugin(
embeddingsPlugin,
config: {
'model': 'sentence_transformer',
'model_name': 'all-MiniLM-L6-v2',
'dimensions': 384,
},
);
// Register text generation plugin
final textGenPlugin = TextGenerationPlugin();
await pluginManager.registerPlugin(
textGenPlugin,
config: {
'model': 'openai',
'api_key': Platform.environment['OPENAI_API_KEY'],
'max_tokens': 1000,
'temperature': 0.7,
},
);
// Initialize automation engine with plugin support
final database = Database();
final jobQueue = JobQueueManager(database);
final security = SecurityPolicyEngineInterface(database);
final engine = AutomationEngine(
database,
jobQueue,
security,
pluginManager: pluginManager,
);
// Load schema with plugin actions; schemaWithPlugins is a JSON schema string
// such as the embeddings schema shown earlier
final schema = VektagrafSchema.fromJson(schemaWithPlugins);
await engine.initialize(schema);
}
Plugin Configuration Management
// Environment-specific plugin configurations
final pluginConfigs = {
'development': {
'vektagraf.embeddings': {
'model': 'local',
'model_path': '/dev/models/embeddings',
'dimensions': 256,
},
'vektagraf.text_generation': {
'model': 'mock',
'max_tokens': 100,
},
},
'production': {
'vektagraf.embeddings': {
'model': 'openai',
'api_key': Platform.environment['OPENAI_API_KEY'],
'dimensions': 1536,
},
'vektagraf.text_generation': {
'model': 'openai',
'api_key': Platform.environment['OPENAI_API_KEY'],
'max_tokens': 2000,
},
},
};
// Apply environment-specific configuration, falling back to development.
// The '!' is needed because a map lookup is nullable under null safety.
final environment = Platform.environment['ENVIRONMENT'] ?? 'development';
final config = pluginConfigs[environment] ?? pluginConfigs['development']!;
for (final entry in config.entries) {
  await pluginManager.reconfigurePlugin(entry.key, entry.value);
}
Advanced Plugin Patterns
Chained Plugin Actions
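Multiple plugins can work together through automation rules. In the schema below, a change to full_text regenerates summary, and the new summary in turn triggers summary_embedding and keywords (mandate blocks are omitted for brevity):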
{
"models": {
"ResearchPaper": {
"full_text": "string",
"summary": {
"type": "string",
"automations": [
{
"name": "generate_summary",
"trigger": {"type": "on_change", "field": "full_text"},
"action": {
"type": "plugin",
"pluginId": "vektagraf.text_generation",
"parameters": {
"task": "summarize",
"text_field": "full_text",
"max_tokens": 300
}
}
}
]
},
"summary_embedding": {
"type": "[float64]",
"automations": [
{
"name": "embed_summary",
"trigger": {"type": "on_change", "field": "summary"},
"action": {
"type": "plugin",
"pluginId": "vektagraf.embeddings",
"parameters": {
"text_field": "summary",
"normalize": true
}
}
}
]
},
"keywords": {
"type": "[string]",
"automations": [
{
"name": "extract_keywords",
"trigger": {"type": "on_change", "field": "summary"},
"action": {
"type": "plugin",
"pluginId": "vektagraf.text_generation",
"parameters": {
"task": "extract_keywords",
"text_field": "summary",
"max_tokens": 50
}
}
}
]
}
}
}
}
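Chained on_change rules form a dependency graph, and it helps to know which fields a single write will touch. The sketch below walks that graph breadth-first from the changed field. `cascadeOrder` is a hypothetical helper, and the order in which the engine actually runs sibling rules is not specified in this chapter.

```dart
/// Returns the fields recomputed after [changed] is written, given a map of
/// derived field -> watched field. Each field is visited once, so a cycle
/// in the rules cannot loop forever.
List<String> cascadeOrder(String changed, Map<String, String> watches) {
  final order = <String>[];
  final queue = <String>[changed];
  final seen = <String>{changed};
  while (queue.isNotEmpty) {
    final source = queue.removeAt(0);
    for (final entry in watches.entries) {
      if (entry.value == source && seen.add(entry.key)) {
        order.add(entry.key);
        queue.add(entry.key);
      }
    }
  }
  return order;
}

void main() {
  // Derived field -> field named in its on_change trigger.
  const watches = {
    'summary': 'full_text',
    'summary_embedding': 'summary',
    'keywords': 'summary',
  };
  print(cascadeOrder('full_text', watches));
  // [summary, summary_embedding, keywords]
}
```

Running such a check over a schema before deployment makes expensive cascades, and accidental cycles, visible early.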
Context-Aware Processing
Plugins can access multiple fields for context-aware processing:
{
"name": "generate_contextual_description",
"trigger": {"type": "on_create"},
"action": {
"type": "plugin",
"pluginId": "vektagraf.text_generation",
"parameters": {
"prompt_template": "Write a {style} description for {title} in the {category} category. Key features: {features}. Target audience: {audience}.",
"context_fields": ["title", "category", "features", "audience", "style"],
"max_tokens": 200,
"temperature": 0.8
}
}
}
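The prompt_template parameter uses {name} placeholders that are filled from the listed context fields. How the plugin performs the substitution is not shown; the sketch below is one straightforward way to do it, with the hypothetical helper `renderPrompt` and the assumption that list values are joined with commas.

```dart
/// Replaces {name} placeholders in [template] with values from [context].
/// Throws if a placeholder has no value, so incomplete prompts are not sent.
String renderPrompt(String template, Map<String, Object?> context) {
  return template.replaceAllMapped(RegExp(r'\{(\w+)\}'), (match) {
    final key = match.group(1)!;
    final value = context[key];
    if (value == null) throw ArgumentError('Missing context field: $key');
    // Assumption: list-valued fields are rendered as comma-separated text.
    return (value is Iterable) ? value.join(', ') : value.toString();
  });
}

void main() {
  print(renderPrompt(
    'Write a {style} description for {title}. Key features: {features}.',
    {
      'style': 'concise',
      'title': 'Trail Shoes',
      'features': ['grip', 'light'],
    },
  ));
  // Write a concise description for Trail Shoes. Key features: grip, light.
}
```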
Conditional Plugin Execution
Use conditions to control when plugins execute:
{
"name": "generate_embedding_for_long_content",
"trigger": {"type": "on_change", "field": "content"},
"condition": "content.length > 100 && language == 'en'",
"action": {
"type": "plugin",
"pluginId": "vektagraf.embeddings",
"parameters": {
"text_field": "content",
"model": "multilingual"
}
}
}
Plugin Development
Creating Custom Plugins
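class CustomAnalyticsPlugin extends AutomationPlugin {
// Assigned in initialize(); AnalyticsService stands in for your own client class
late AnalyticsService _analyticsService;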
@override
String get id => 'custom.analytics';
@override
String get name => 'Custom Analytics Plugin';
@override
String get version => '1.0.0';
@override
String get description => 'Performs custom analytics on text content';
@override
List<String> get capabilities => ['text_analysis', 'sentiment_analysis'];
@override
Future<void> initialize(Map<String, dynamic> config) async {
// Initialize analytics service
final apiKey = config['api_key'] as String?;
if (apiKey == null) {
throw PluginException('Analytics API key is required');
}
_analyticsService = AnalyticsService(apiKey);
await _analyticsService.initialize();
}
@override
Future<PluginResult> execute(PluginInput input) async {
final stopwatch = Stopwatch()..start();
try {
final text = input.currentValue?.toString() ?? '';
if (text.isEmpty) {
return PluginResult.failure(
errorMessage: 'No text provided for analysis',
executionTimeMs: stopwatch.elapsedMilliseconds,
);
}
final analysis = await _analyticsService.analyzeText(text);
return PluginResult.success(
value: {
'sentiment': analysis.sentiment,
'confidence': analysis.confidence,
'topics': analysis.topics,
'language': analysis.detectedLanguage,
},
executionTimeMs: stopwatch.elapsedMilliseconds,
);
} catch (e) {
return PluginResult.failure(
errorMessage: 'Analytics failed: $e',
executionTimeMs: stopwatch.elapsedMilliseconds,
);
}
}
@override
Future<PluginValidationResult> validateInput(Map<String, dynamic> parameters) async {
final errors = <String>[];
if (!parameters.containsKey('text_field') && !parameters.containsKey('text_value')) {
errors.add('Either text_field or text_value must be specified');
}
return errors.isEmpty
? PluginValidationResult.valid()
: PluginValidationResult.invalid(errors: errors);
}
@override
Map<String, dynamic> getInputSchema() {
return {
'type': 'object',
'properties': {
'text_field': {'type': 'string'},
'text_value': {'type': 'string'},
'analysis_type': {
'type': 'string',
'enum': ['sentiment', 'topics', 'full'],
'default': 'full',
},
},
'anyOf': [ // matches validateInput, which accepts either field or both
{'required': ['text_field']},
{'required': ['text_value']},
],
};
}
@override
Map<String, dynamic> getOutputSchema() {
return {
'type': 'object',
'properties': {
'sentiment': {'type': 'string'},
'confidence': {'type': 'number'},
'topics': {'type': 'array', 'items': {'type': 'string'}},
'language': {'type': 'string'},
},
};
}
}
Register Custom Plugin
// Register custom plugin
final customPlugin = CustomAnalyticsPlugin();
await pluginManager.registerPlugin(
customPlugin,
config: {
'api_key': Platform.environment['ANALYTICS_API_KEY'],
'timeout_ms': 30000,
},
);
// Use in automation rule
final rule = AutomationRule(
name: 'analyze_content_sentiment',
trigger: OnChangeTrigger(field: 'content'),
action: PluginAutomationActionInterface(
pluginId: 'custom.analytics',
parameters: {
'text_field': 'content',
'analysis_type': 'sentiment',
},
pluginManager: pluginManager,
),
);
Plugin Monitoring and Management
Health Monitoring
// Monitor plugin health
pluginManager.events.listen((event) {
switch (event.type) {
case 'plugin_executed':
final success = event.data['success'] as bool;
final executionTime = event.data['executionTimeMs'] as int;
if (!success) {
logger.warning('Plugin ${event.pluginId} execution failed');
}
if (executionTime > 10000) {
logger.warning('Plugin ${event.pluginId} slow execution: ${executionTime}ms');
}
break;
case 'plugin_health_changed':
final healthy = event.data['healthy'] as bool;
if (!healthy) {
logger.error('Plugin ${event.pluginId} became unhealthy');
// Implement alerting logic
}
break;
}
});
// Check plugin health status
final healthStatuses = pluginManager.getHealthStatuses();
for (final entry in healthStatuses.entries) {
if (!entry.value) {
logger.error('Plugin ${entry.key} is unhealthy');
}
}
Performance Metrics
// Collect plugin performance metrics
class PluginMetricsCollector {
  final MetricsRegistry _registry;

  // A final field must be initialized, so take the registry in the constructor
  PluginMetricsCollector(this._registry);

  void recordPluginExecution({
    required String pluginId,
    required bool success,
    required Duration executionTime,
    String? tenantId,
  }) {
    // Total executions, split by outcome
    _registry.counter('plugin_executions_total')
        .labels({'plugin_id': pluginId, 'success': success.toString()})
        .increment();
    // Latency distribution per plugin
    _registry.histogram('plugin_execution_duration_ms')
        .labels({'plugin_id': pluginId})
        .observe(executionTime.inMilliseconds.toDouble());
    // Per-tenant usage, recorded only when a tenant is known
    if (tenantId != null) {
      _registry.counter('plugin_executions_by_tenant')
          .labels({'plugin_id': pluginId, 'tenant_id': tenantId})
          .increment();
    }
  }
}
Plugin Security and Compliance
Resource Limits
class ResourceLimitedPlugin extends AutomationPlugin {
@override
Future<PluginResult> execute(PluginInput input) async {
return await ResourceLimiter.executeWithLimits(
maxMemoryMB: 512,
maxExecutionTimeMs: 30000,
maxCpuPercent: 80,
operation: () => _executePlugin(input),
);
}
}
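`ResourceLimiter` is not defined in this chapter. The execution-time part of such a limiter can be approximated with the standard `Future.timeout` method, as in the sketch below; `withTimeLimit` is a hypothetical helper. Memory and CPU limits cannot be enforced this way and would need isolates or an external sandbox.

```dart
import 'dart:async';

/// Runs [operation] and fails with a TimeoutException after [limit].
/// The underlying work is not cancelled; only the wait is abandoned.
Future<T> withTimeLimit<T>(Future<T> Function() operation, Duration limit) {
  return operation().timeout(limit);
}

Future<void> main() async {
  try {
    await withTimeLimit(
      // Simulated slow plugin call.
      () => Future.delayed(const Duration(milliseconds: 200), () => 'done'),
      const Duration(milliseconds: 20),
    );
  } on TimeoutException {
    print('plugin call exceeded its time limit');
  }
}
```

Because the timed-out work keeps running in the background, a plugin that calls an external service should also set a timeout on the request itself.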
Data Sanitization
class PrivacyAwarePlugin extends AutomationPlugin {
  final DataSanitizer _sanitizer;

  // A final field must be initialized, so take the sanitizer in the constructor
  PrivacyAwarePlugin(this._sanitizer);

  @override
  Future<PluginResult> execute(PluginInput input) async {
    // Sanitize input data
    final sanitizedInput = await _sanitizer.sanitize(input);
    // Execute with sanitized data
    final result = await _executeSecurely(sanitizedInput);
    // Ensure no sensitive data in result
    return await _sanitizer.sanitizeResult(result);
  }
}
Audit Logging
class AuditablePlugin extends AutomationPlugin {
@override
Future<PluginResult> execute(PluginInput input) async {
final auditId = _generateAuditId();
// Log execution start
await _auditLogger.logPluginExecution(
auditId: auditId,
pluginId: id,
tenantId: input.tenantId,
action: 'execute_start',
inputHash: _hashInput(input),
timestamp: DateTime.now(),
);
try {
final result = await _executeWithAudit(input);
// Log successful execution
await _auditLogger.logPluginExecution(
auditId: auditId,
pluginId: id,
action: 'execute_success',
resultHash: _hashResult(result),
executionTimeMs: result.executionTimeMs,
timestamp: DateTime.now(),
);
return result;
} catch (e) {
// Log execution failure
await _auditLogger.logPluginExecution(
auditId: auditId,
pluginId: id,
action: 'execute_failure',
error: e.toString(),
timestamp: DateTime.now(),
);
rethrow;
}
}
}
Plugin Best Practices
1. Design Principles
- Single Responsibility: Each plugin should have a focused purpose
- Stateless Design: Plugins should be stateless for better scalability
- Idempotent Operations: Running a plugin again on the same input should produce the same result, so retries are safe
- Error Resilience: Handle failures gracefully with meaningful error messages
2. Performance Optimization
- Caching: Cache expensive operations when appropriate
- Resource Pooling: Reuse expensive resources like ML models
- Batch Processing: Process multiple items efficiently when possible
- Async Operations: Use async/await for non-blocking execution
3. Security Considerations
- Input Validation: Always validate and sanitize inputs
- Resource Limits: Implement timeouts and resource constraints
- Data Privacy: Handle sensitive data according to privacy policies
- Access Control: Respect user permissions and tenant boundaries
4. Testing Strategies
- Unit Testing: Test plugin logic in isolation
- Integration Testing: Test with automation engine
- Mock Testing: Use mocks for external services
- Performance Testing: Validate performance under load
Plugin Configuration Reference
{
"plugins": {
"vektagraf.embeddings": {
"model": "sentence_transformer",
"model_name": "all-MiniLM-L6-v2",
"dimensions": 384,
"normalize": true,
"batch_size": 32,
"timeout_ms": 30000
},
"vektagraf.text_generation": {
"model": "openai",
"model_name": "gpt-3.5-turbo",
"api_key": "${OPENAI_API_KEY}",
"max_tokens": 1000,
"temperature": 0.7,
"timeout_ms": 60000
},
"custom.analytics": {
"api_key": "${ANALYTICS_API_KEY}",
"endpoint": "https://api.analytics.com/v1",
"timeout_ms": 30000,
"retry_attempts": 3
}
},
"plugin_manager": {
"health_check_interval": "5m",
"max_concurrent_executions": 10,
"default_timeout_ms": 30000,
"enable_metrics": true,
"enable_audit_logging": true
}
}
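The "${OPENAI_API_KEY}" values above are placeholders for environment variables. Whether VektaGraf expands them itself is not stated here; if the configuration is loaded by application code, the substitution can be done as in the sketch below. `expandPlaceholders` is a hypothetical helper, and in a real program the map would typically be `Platform.environment` from `dart:io`.

```dart
/// Replaces ${NAME} placeholders with values from [env].
/// Unknown names throw, so a missing secret fails fast at startup.
String expandPlaceholders(String value, Map<String, String> env) {
  return value.replaceAllMapped(RegExp(r'\$\{([A-Z0-9_]+)\}'), (match) {
    final name = match.group(1)!;
    final resolved = env[name];
    if (resolved == null) {
      throw StateError('Environment variable $name is not set');
    }
    return resolved;
  });
}

void main() {
  // Stand-in for Platform.environment so the example runs anywhere.
  final env = {'OPENAI_API_KEY': 'sk-test'};
  print(expandPlaceholders(r'${OPENAI_API_KEY}', env)); // sk-test
}
```

Keeping secrets out of the configuration file and resolving them at load time means the file can be committed to version control safely.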
Summary
The VektaGraf Automation system provides a powerful, declarative approach to data lifecycle management that enables:
- Compliance Automation: Automatically handle GDPR, data retention, and regulatory requirements
- Lifecycle Management: Implement user account lifecycle and data archival policies
- Security Enforcement: Automate security-related data transformations
- AI Integration: Generate embeddings, create content, and perform intelligent data processing
- External Service Integration: Connect to APIs and external systems through plugins
- Operational Efficiency: Reduce manual data management overhead
Key Takeaways
- Schema-Driven: Automation rules are defined directly in your schema for version control and deployment consistency
- Plugin-Extensible: Extend automation capabilities with AI and external service plugins
- Mandate Documentation: All automations require proper authority and approval documentation
- Testing First: Always use dry-run mode and validation tools before production deployment
- Performance Aware: Configure appropriate batch sizes and monitoring for production workloads
- Security Conscious: Respect field permissions and enable comprehensive auditing
- AI-Enhanced: Built-in plugins for embeddings and text generation enable intelligent automation
- Compliance Ready: Built-in audit trails and reporting support regulatory requirements
Next Steps
- Chapter 19: Learn about production deployment patterns for automation systems
- Chapter 20: Explore scaling strategies for high-volume automation workloads
- Appendix II: Reference complete configuration options and API documentation
The automation system integrates seamlessly with VektaGraf's security, multi-tenancy, and job queue systems to provide a comprehensive data management solution that scales from simple use cases to enterprise-grade compliance requirements.