Skip to main content

Chapter 18: Automation System

Overview

The Vektagraf Automation System provides a powerful, declarative approach to data lifecycle management, enabling fields to automatically modify their values based on time-based triggers, state changes, or conditional logic. This chapter covers the complete automation system, from basic rule definitions to advanced enterprise deployment patterns.

Learning Objectives

By the end of this chapter, you will understand:

  • How to design and implement automation rules in your schema
  • Time-based and state-based trigger mechanisms
  • Action types and their appropriate use cases
  • Compliance and audit requirements for automated data processing
  • Performance optimization and monitoring strategies
  • Enterprise deployment patterns and best practices

Core Concepts

Automation Architecture

The automation system consists of several key components:

  1. Rule Engine: Evaluates automation rules against objects
  2. Trigger System: Monitors for time-based and state-based events
  3. Action Executor: Applies transformations to field values
  4. Job Queue Integration: Schedules and manages automation execution
  5. Audit System: Tracks all automation activities for compliance

Rule Definition Structure

Automation rules are defined directly in your schema using a declarative syntax:

{
  "models": {
    "User": {
      "email": {
        "type": "string",
        "automations": [
          {
            "name": "gdpr_email_anonymization",
            "trigger": {
              "type": "time",
              "dateTimeField": "gdpr_deletion_date"
            },
            "action": {
              "type": "obfuscate",
              "pattern": "***@***.***"
            },
            "condition": "gdpr_deletion_date != null && status != 'active'",
            "enabled": true,
            "mandate": {
              "authority": "GDPR Article 17 - Right to Erasure",
              "description": "Anonymize email addresses when GDPR deletion date is reached",
              "approvedBy": "Data Protection Officer"
            }
          }
        ]
      }
    }
  }
}

Mandate Requirements

All automation rules must include mandate information documenting the legal, policy, or business authority for the automation:

{
  "mandate": {
    "authority": "GDPR Article 17 - Right to Erasure",
    "policy": "Data Retention Policy v2.1",
    "description": "Automatically anonymize email addresses when GDPR deletion date is reached",
    "approvedBy": "Data Protection Officer",
    "approvalDate": "2024-01-15",
    "reviewDate": "2025-01-15",
    "businessJustification": "Ensure compliance with data protection regulations",
    "riskAssessment": "Low - automated process reduces human error"
  }
}

Practical Examples

Example 1: GDPR Compliance Automation

This example demonstrates automatic data anonymization for GDPR compliance:

import 'package:vektagraf/vektagraf.dart';

// Schema definition with GDPR automation
const gdprSchema = '''
{
  "models": {
    "User": {
      "email": {
        "type": "string",
        "automations": [
          {
            "name": "gdpr_email_anonymization",
            "trigger": {
              "type": "time",
              "dateTimeField": "gdpr_deletion_date"
            },
            "action": {
              "type": "obfuscate",
              "pattern": "***@***.***"
            },
            "condition": "gdpr_deletion_date != null && status != 'active'",
            "mandate": {
              "authority": "GDPR Article 17 - Right to Erasure",
              "description": "Anonymize email addresses when GDPR deletion date is reached",
              "approvedBy": "Data Protection Officer"
            }
          }
        ]
      },
      "phone": {
        "type": "string",
        "automations": [
          {
            "name": "gdpr_phone_anonymization",
            "trigger": {
              "type": "time",
              "dateTimeField": "gdpr_deletion_date"
            },
            "action": {
              "type": "obfuscate",
              "pattern": "***-***-****"
            },
            "condition": "gdpr_deletion_date != null",
            "mandate": {
              "authority": "GDPR Article 17 - Right to Erasure",
              "description": "Anonymize phone numbers when GDPR deletion date is reached",
              "approvedBy": "Data Protection Officer"
            }
          }
        ]
      },
      "gdpr_deletion_date": {
        "type": "datetime"
      },
      "status": {
        "type": "string"
      }
    }
  }
}
''';

Future<void> setupGdprAutomation() async {
  // Initialize database and automation engine
  final database = Database();
  final jobQueue = JobQueueManager(database);
  final security = SecurityPolicyEngineInterface(database);
  final engine = AutomationEngine(database, jobQueue, security);
  
  // Load schema with automation rules
  final schema = VektagrafSchema.fromJson(gdprSchema);
  await engine.initialize(schema);
  
  // Create a user with GDPR deletion date
  final user = await database.create('User', {
    'email': 'user@example.com',
    'phone': '555-123-4567',
    'status': 'inactive',
    'gdpr_deletion_date': DateTime.now().add(Duration(days: 30)),
  });
  
  print('User created with GDPR deletion date: ${user.id}');
  
  // The automation will automatically trigger when gdpr_deletion_date is reached
  // You can also test with dry-run mode
  final results = await engine.evaluateObject(user, dryRun: true);
  for (final result in results) {
    print('Rule: ${result.ruleName}');
    print('Would change: ${result.oldValue} → ${result.newValue}');
  }
}

Example 2: Data Retention Management

Implement automatic data archival based on retention policies:

const retentionSchema = '''
{
  "models": {
    "Document": {
      "status": {
        "type": "string",
        "automations": [
          {
            "name": "auto_archive_old_documents",
            "trigger": {
              "type": "time",
              "delay": "7y"
            },
            "action": {
              "type": "set_value",
              "value": "archived"
            },
            "condition": "status != 'active' && status != 'archived'",
            "mandate": {
              "authority": "Document Retention Policy DRP-2024-001",
              "description": "Archive documents after 7 years of inactivity",
              "approvedBy": "Records Manager",
              "businessJustification": "Comply with legal retention requirements while managing storage costs"
            }
          }
        ]
      },
      "content": {
        "type": "string",
        "automations": [
          {
            "name": "purge_archived_content",
            "trigger": {
              "type": "state",
              "watchedField": "status",
              "expectedValue": "archived"
            },
            "action": {
              "type": "blank"
            },
            "condition": "created_at < now() - 10y",
            "mandate": {
              "authority": "Document Retention Policy DRP-2024-001",
              "description": "Remove content from archived documents older than 10 years",
              "approvedBy": "Records Manager"
            }
          }
        ]
      },
      "created_at": {
        "type": "datetime"
      }
    }
  }
}
''';

Future<void> setupRetentionAutomation() async {
  final database = Database();
  final jobQueue = JobQueueManager(database);
  final security = SecurityPolicyEngineInterface(database);
  final engine = AutomationEngine(database, jobQueue, security);
  
  final schema = VektagrafSchema.fromJson(retentionSchema);
  await engine.initialize(schema);
  
  // Create a document that will be subject to retention rules
  final document = await database.create('Document', {
    'status': 'inactive',
    'content': 'This is document content that will be archived and eventually purged.',
    'created_at': DateTime.now().subtract(Duration(days: 2555)), // ~7 years ago
  });
  
  print('Document created: ${document.id}');
  
  // Check what automations would apply
  final results = await engine.evaluateObject(document, dryRun: true);
  for (final result in results) {
    print('Rule: ${result.ruleName} - Success: ${result.success}');
    if (result.success) {
      print('  ${result.oldValue} → ${result.newValue}');
    }
  }
}

Example 3: User Lifecycle Management

Automate user account lifecycle based on activity patterns:

const userLifecycleSchema = '''
{
  "models": {
    "User": {
      "status": {
        "type": "string",
        "automations": [
          {
            "name": "deactivate_inactive_users",
            "trigger": {
              "type": "time",
              "delay": "90d"
            },
            "action": {
              "type": "set_value",
              "value": "inactive"
            },
            "condition": "last_login_at < now() - 90d && status == 'active'",
            "mandate": {
              "authority": "Security Policy SEC-2024-003",
              "description": "Deactivate users inactive for 90+ days",
              "approvedBy": "Security Team",
              "businessJustification": "Reduce security risk from dormant accounts"
            }
          },
          {
            "name": "suspend_long_inactive_users",
            "trigger": {
              "type": "time",
              "delay": "180d"
            },
            "action": {
              "type": "set_value",
              "value": "suspended"
            },
            "condition": "last_login_at < now() - 180d && status == 'inactive'",
            "mandate": {
              "authority": "Security Policy SEC-2024-003",
              "description": "Suspend users inactive for 180+ days",
              "approvedBy": "Security Team"
            }
          }
        ]
      },
      "access_token": {
        "type": "string",
        "automations": [
          {
            "name": "revoke_suspended_user_tokens",
            "trigger": {
              "type": "state",
              "watchedField": "status",
              "expectedValue": "suspended"
            },
            "action": {
              "type": "blank"
            },
            "mandate": {
              "authority": "Security Policy SEC-2024-003",
              "description": "Revoke access tokens when user is suspended",
              "approvedBy": "Security Team"
            }
          }
        ]
      },
      "last_login_at": {
        "type": "datetime"
      }
    }
  }
}
''';

Future<void> setupUserLifecycleAutomation() async {
  final database = Database();
  final jobQueue = JobQueueManager(database);
  final security = SecurityPolicyEngineInterface(database);
  final engine = AutomationEngine(database, jobQueue, security);
  
  final schema = VektagrafSchema.fromJson(userLifecycleSchema);
  await engine.initialize(schema);
  
  // Create users with different activity patterns
  final activeUser = await database.create('User', {
    'status': 'active',
    'access_token': 'active_token_123',
    'last_login_at': DateTime.now().subtract(Duration(days: 5)),
  });
  
  final inactiveUser = await database.create('User', {
    'status': 'active',
    'access_token': 'inactive_token_456',
    'last_login_at': DateTime.now().subtract(Duration(days: 100)),
  });
  
  print('Created active user: ${activeUser.id}');
  print('Created inactive user: ${inactiveUser.id}');
  
  // Check automation results for inactive user
  final results = await engine.evaluateObject(inactiveUser, dryRun: true);
  for (final result in results) {
    print('Rule: ${result.ruleName}');
    if (result.success) {
      print('  Would apply: ${result.oldValue} → ${result.newValue}');
    }
  }
}

Best Practices

1. Rule Design Principles

Start Simple: Begin with basic time-based triggers before implementing complex conditional logic.

// Good: Simple time-based trigger
{
  "trigger": {
    "type": "time",
    "delay": "30d"
  },
  "action": {
    "type": "set_value",
    "value": "archived"
  }
}

// Avoid initially: Complex conditions
{
  "trigger": {
    "type": "time",
    "delay": "30d"
  },
  "condition": "status == 'inactive' && last_activity < now() - 60d && user_type != 'premium' && (region == 'EU' || gdpr_consent == true)",
  "action": {
    "type": "set_value",
    "value": "archived"
  }
}

Use Descriptive Names: Choose clear, descriptive names for your automation rules.

// Good
"name": "gdpr_email_anonymization"
"name": "quarterly_data_archive"
"name": "inactive_user_suspension"

// Avoid
"name": "rule1"
"name": "auto_thing"
"name": "cleanup"

2. Testing and Validation

Always Use Dry-Run Mode: Test automations before deploying to production.

// Test automation without applying changes
final results = await engine.evaluateObject(object, dryRun: true);
for (final result in results) {
  print('Rule: ${result.ruleName}');
  print('Would change: ${result.oldValue} → ${result.newValue}');
}

// Only apply if results look correct
if (resultsLookGood) {
  await engine.evaluateObject(object, dryRun: false);
}

Validate Rule Definitions: Use built-in validation tools.

final validator = AutomationRuleValidator();
final issues = validator.validate(rule);

if (issues.isNotEmpty) {
  for (final issue in issues) {
    print('Validation error: ${issue.message}');
    if (issue.suggestion != null) {
      print('Suggestion: ${issue.suggestion}');
    }
  }
}

3. Performance Optimization

Optimize Condition Expressions: Write efficient conditions that minimize database queries.

// Good: Simple field comparisons
"condition": "status == 'inactive' && created_at < now() - 30d"

// Less efficient: Complex nested conditions
"condition": "status == 'inactive' && (SELECT COUNT(*) FROM related_table WHERE user_id = this.id) == 0"

Use Appropriate Batch Sizes: Configure batch processing for bulk operations.

{
  "automation": {
    "performance": {
      "batchSize": 100,
      "batchTimeout": "30s"
    }
  }
}

4. Security Considerations

Respect Field Permissions: Ensure automations have appropriate permissions.

// Check permissions before applying automation
final hasPermission = await security.checkFieldPermission(
  userId: systemUserId,
  modelName: 'User',
  fieldName: 'email',
  operation: 'write',
);

if (!hasPermission) {
  throw AutomationPermissionException(
    'Insufficient permissions for automation',
    requiredPermission: 'User.email.write',
  );
}

Audit Sensitive Operations: Enable comprehensive auditing for sensitive field modifications.

{
  "automation": {
    "security": {
      "auditSensitiveFields": true,
      "encryptAuditLogs": true
    }
  }
}

Advanced Topics

Custom Action Types

Implement custom actions for specialized transformations:

class CustomHashAction extends AutomationActionInterface {
  final String algorithm;
  
  CustomHashAction({this.algorithm = 'SHA-256'});
  
  @override
  Future<dynamic> execute(
    dynamic currentValue,
    Map<String, dynamic> context,
  ) async {
    if (currentValue == null) return null;
    
    final bytes = utf8.encode(currentValue.toString());
    final digest = sha256.convert(bytes);
    return digest.toString();
  }
  
  @override
  String get type => 'custom_hash';
}

// Register custom action
engine.registerAction('custom_hash', CustomHashAction());

Multi-Tenant Automation

Configure tenant-specific automation behavior:

// Set tenant-specific configuration
await configManager.setTenantConfig(tenantId, {
  'maxConcurrentAutomations': 5,
  'excludedModels': ['SensitiveData'],
  'customSettings': {
    'gdprMode': true,
    'retentionPeriod': '2y',
    'anonymizationLevel': 'high'
  }
});

// Get tenant-specific rules
final tenantRules = engine.getRulesForTenant(tenantId);

Integration with External Systems

Connect automations to external compliance systems:

class ComplianceIntegrationAction extends AutomationActionInterface {
  final String complianceSystemUrl;
  
  ComplianceIntegrationAction(this.complianceSystemUrl);
  
  @override
  Future<dynamic> execute(
    dynamic currentValue,
    Map<String, dynamic> context,
  ) async {
    // Notify external compliance system
    final response = await http.post(
      Uri.parse('$complianceSystemUrl/data-modification'),
      headers: {'Content-Type': 'application/json'},
      body: jsonEncode({
        'objectId': context['objectId'],
        'fieldName': context['fieldName'],
        'oldValue': currentValue,
        'newValue': context['newValue'],
        'ruleName': context['ruleName'],
        'timestamp': DateTime.now().toIso8601String(),
      }),
    );
    
    if (response.statusCode != 200) {
      throw AutomationExecutionException(
        'Failed to notify compliance system',
        objectId: context['objectId'],
        attemptedAt: DateTime.now(),
      );
    }
    
    return context['newValue'];
  }
  
  @override
  String get type => 'compliance_integration';
}

Reference

Configuration Options

Complete automation configuration reference:

{
  "automation": {
    "enabled": true,
    "evaluationInterval": "1m",
    "maxConcurrentAutomations": 10,
    "auditEnabled": true,
    "retryAttempts": 3,
    "retryDelay": "5m",
    "retryBackoffMultiplier": 2.0,
    "maxRetryDelay": "1h",
    "excludedModels": [],
    "performance": {
      "cachingEnabled": true,
      "cacheTtl": "5m",
      "maxCacheSize": 1000,
      "batchSize": 100,
      "batchTimeout": "30s"
    },
    "monitoring": {
      "metricsEnabled": true,
      "healthCheckInterval": "30s",
      "alertThresholds": {
        "failureRate": 0.05,
        "averageExecutionTime": "10s",
        "queueDepth": 1000
      }
    },
    "security": {
      "enforcePermissions": true,
      "auditSensitiveFields": true,
      "encryptAuditLogs": false,
      "maxExecutionTime": "5m"
    }
  }
}

API Reference

Key automation APIs:

// Engine initialization
final engine = AutomationEngine(database, jobQueue, security);
await engine.initialize(applicationSchema, systemSchema);

// Rule execution
final result = await engine.executeRule(rule, object, fieldName);

// Object evaluation
final results = await engine.evaluateObject(object, dryRun: true);

// Configuration management
final configManager = AutomationConfigManager(database);
await configManager.updateConfig({'maxConcurrentAutomations': 20});

// Audit querying
final auditManager = AutomationAuditManager(database);
final logs = await auditManager.getLogsForObject(objectId);

// Performance monitoring
final monitoring = AutomationMonitoring(database);
final health = await monitoring.getSystemHealth();

Error Handling

Common automation exceptions and handling:

try {
  await engine.evaluateObject(object);
} on AutomationRuleException catch (e) {
  print('Rule validation error: ${e.message}');
  print('Rule definition: ${e.ruleDefinition}');
} on AutomationExecutionException catch (e) {
  print('Execution error: ${e.message}');
  print('Object ID: ${e.objectId}');
  print('Attempted at: ${e.attemptedAt}');
} on AutomationPermissionException catch (e) {
  print('Permission error: ${e.message}');
  print('Required permission: ${e.requiredPermission}');
} on AutomationException catch (e) {
  print('General automation error: ${e.message}');
  if (e.ruleName != null) print('Rule: ${e.ruleName}');
  if (e.fieldName != null) print('Field: ${e.fieldName}');
}

Plugin System Integration

VektaGraf's plugin system extends the automation engine with powerful AI and external service capabilities. Plugins can be used as automation actions to generate embeddings, create content, transform data, and integrate with external services.

Plugin Architecture

The plugin system integrates seamlessly with the automation engine through specialized plugin actions:

┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│ Schema Field    │───→│ Automation Rule  │───→│ Plugin Action   │
│ with Automation │    │ with Trigger     │    │                 │
└─────────────────┘    └──────────────────┘    └─────────────────┘
                                                         │
                                                         ▼
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│ Updated Field   │◄───│ Result Handler   │◄───│ Plugin Manager  │
│ Value           │    │                  │    │                 │
└─────────────────┘    └──────────────────┘    └─────────────────┘
                                                         │
                                                         ▼
                                               ┌─────────────────┐
                                               │ Plugin Instance │
                                               │ (Embeddings,    │
                                               │  Text Gen, etc) │
                                               └─────────────────┘

Built-in Plugins

Embeddings Plugin

Generates vector embeddings from text for semantic search and similarity operations:

{
  "models": {
    "Document": {
      "content": "string",
      "content_embedding": {
        "type": "[float64]",
        "automations": [
          {
            "name": "generate_content_embedding",
            "trigger": {
              "type": "on_change",
              "field": "content"
            },
            "action": {
              "type": "plugin",
              "pluginId": "vektagraf.embeddings",
              "parameters": {
                "text_field": "content",
                "normalize": true,
                "model": "sentence_transformer",
                "dimensions": 384
              }
            },
            "mandate": {
              "authority": "AI Enhancement Policy AIP-2024-001",
              "description": "Generate embeddings for semantic search capabilities",
              "approvedBy": "AI Team Lead"
            }
          }
        ]
      }
    }
  }
}

Text Generation Plugin

Creates content using language models for summarization, translation, and content generation:

{
  "models": {
    "Article": {
      "content": "string",
      "summary": {
        "type": "string",
        "automations": [
          {
            "name": "generate_article_summary",
            "trigger": {
              "type": "on_change",
              "field": "content"
            },
            "action": {
              "type": "plugin",
              "pluginId": "vektagraf.text_generation",
              "parameters": {
                "task": "summarize",
                "text_field": "content",
                "max_tokens": 150,
                "temperature": 0.7
              }
            },
            "condition": "content.length > 500",
            "mandate": {
              "authority": "Content Enhancement Policy CEP-2024-002",
              "description": "Auto-generate summaries for long articles",
              "approvedBy": "Content Team Lead"
            }
          }
        ]
      }
    }
  }
}

Plugin Setup and Configuration

Initialize Plugin Manager

import 'package:vektagraf/vektagraf.dart';

Future<void> setupPluginSystem() async {
  // Initialize plugin manager
  final pluginManager = PluginManager();
  await pluginManager.initialize();

  // Register embeddings plugin
  final embeddingsPlugin = EmbeddingsPlugin();
  await pluginManager.registerPlugin(
    embeddingsPlugin,
    config: {
      'model': 'sentence_transformer',
      'model_name': 'all-MiniLM-L6-v2',
      'dimensions': 384,
    },
  );

  // Register text generation plugin
  final textGenPlugin = TextGenerationPlugin();
  await pluginManager.registerPlugin(
    textGenPlugin,
    config: {
      'model': 'openai',
      'api_key': Platform.environment['OPENAI_API_KEY'],
      'max_tokens': 1000,
      'temperature': 0.7,
    },
  );

  // Initialize automation engine with plugin support
  final database = Database();
  final jobQueue = JobQueueManager(database);
  final security = SecurityPolicyEngineInterface(database);
  
  final engine = AutomationEngine(
    database,
    jobQueue,
    security,
    pluginManager: pluginManager,
  );

  // Load schema with plugin actions
  final schema = VektagrafSchema.fromJson(schemaWithPlugins);
  await engine.initialize(schema);
}

Plugin Configuration Management

// Environment-specific plugin configurations
final pluginConfigs = {
  'development': {
    'vektagraf.embeddings': {
      'model': 'local',
      'model_path': '/dev/models/embeddings',
      'dimensions': 256,
    },
    'vektagraf.text_generation': {
      'model': 'mock',
      'max_tokens': 100,
    },
  },
  'production': {
    'vektagraf.embeddings': {
      'model': 'openai',
      'api_key': Platform.environment['OPENAI_API_KEY'],
      'dimensions': 1536,
    },
    'vektagraf.text_generation': {
      'model': 'openai',
      'api_key': Platform.environment['OPENAI_API_KEY'],
      'max_tokens': 2000,
    },
  },
};

// Apply environment-specific configuration
final environment = Platform.environment['ENVIRONMENT'] ?? 'development';
final config = pluginConfigs[environment] ?? pluginConfigs['development'];

for (final entry in config.entries) {
  await pluginManager.reconfigurePlugin(entry.key, entry.value);
}

Advanced Plugin Patterns

Chained Plugin Actions

Multiple plugins can work together through automation rules:

{
  "models": {
    "ResearchPaper": {
      "full_text": "string",
      "summary": {
        "type": "string",
        "automations": [
          {
            "name": "generate_summary",
            "trigger": {"type": "on_change", "field": "full_text"},
            "action": {
              "type": "plugin",
              "pluginId": "vektagraf.text_generation",
              "parameters": {
                "task": "summarize",
                "text_field": "full_text",
                "max_tokens": 300
              }
            }
          }
        ]
      },
      "summary_embedding": {
        "type": "[float64]",
        "automations": [
          {
            "name": "embed_summary",
            "trigger": {"type": "on_change", "field": "summary"},
            "action": {
              "type": "plugin",
              "pluginId": "vektagraf.embeddings",
              "parameters": {
                "text_field": "summary",
                "normalize": true
              }
            }
          }
        ]
      },
      "keywords": {
        "type": "[string]",
        "automations": [
          {
            "name": "extract_keywords",
            "trigger": {"type": "on_change", "field": "summary"},
            "action": {
              "type": "plugin",
              "pluginId": "vektagraf.text_generation",
              "parameters": {
                "task": "extract_keywords",
                "text_field": "summary",
                "max_tokens": 50
              }
            }
          }
        ]
      }
    }
  }
}

Context-Aware Processing

Plugins can access multiple fields for context-aware processing:

{
  "name": "generate_contextual_description",
  "trigger": {"type": "on_create"},
  "action": {
    "type": "plugin",
    "pluginId": "vektagraf.text_generation",
    "parameters": {
      "prompt_template": "Write a {style} description for {title} in the {category} category. Key features: {features}. Target audience: {audience}.",
      "context_fields": ["title", "category", "features", "audience", "style"],
      "max_tokens": 200,
      "temperature": 0.8
    }
  }
}

Conditional Plugin Execution

Use conditions to control when plugins execute:

{
  "name": "generate_embedding_for_long_content",
  "trigger": {"type": "on_change", "field": "content"},
  "condition": "content.length > 100 && language == 'en'",
  "action": {
    "type": "plugin",
    "pluginId": "vektagraf.embeddings",
    "parameters": {
      "text_field": "content",
      "model": "multilingual"
    }
  }
}

Plugin Development

Creating Custom Plugins

class CustomAnalyticsPlugin extends AutomationPlugin {
  @override
  String get id => 'custom.analytics';

  @override
  String get name => 'Custom Analytics Plugin';

  @override
  String get version => '1.0.0';

  @override
  String get description => 'Performs custom analytics on text content';

  @override
  List<String> get capabilities => ['text_analysis', 'sentiment_analysis'];

  @override
  Future<void> initialize(Map<String, dynamic> config) async {
    // Initialize analytics service
    final apiKey = config['api_key'] as String?;
    if (apiKey == null) {
      throw PluginException('Analytics API key is required');
    }
    
    _analyticsService = AnalyticsService(apiKey);
    await _analyticsService.initialize();
  }

  @override
  Future<PluginResult> execute(PluginInput input) async {
    final stopwatch = Stopwatch()..start();

    try {
      final text = input.currentValue?.toString() ?? '';
      if (text.isEmpty) {
        return PluginResult.failure(
          errorMessage: 'No text provided for analysis',
          executionTimeMs: stopwatch.elapsedMilliseconds,
        );
      }

      final analysis = await _analyticsService.analyzeText(text);

      return PluginResult.success(
        value: {
          'sentiment': analysis.sentiment,
          'confidence': analysis.confidence,
          'topics': analysis.topics,
          'language': analysis.detectedLanguage,
        },
        executionTimeMs: stopwatch.elapsedMilliseconds,
      );
    } catch (e) {
      return PluginResult.failure(
        errorMessage: 'Analytics failed: $e',
        executionTimeMs: stopwatch.elapsedMilliseconds,
      );
    }
  }

  @override
  Future<PluginValidationResult> validateInput(Map<String, dynamic> parameters) async {
    final errors = <String>[];
    
    if (!parameters.containsKey('text_field') && !parameters.containsKey('text_value')) {
      errors.add('Either text_field or text_value must be specified');
    }

    return errors.isEmpty
        ? PluginValidationResult.valid()
        : PluginValidationResult.invalid(errors: errors);
  }

  @override
  Map<String, dynamic> getInputSchema() {
    return {
      'type': 'object',
      'properties': {
        'text_field': {'type': 'string'},
        'text_value': {'type': 'string'},
        'analysis_type': {
          'type': 'string',
          'enum': ['sentiment', 'topics', 'full'],
          'default': 'full',
        },
      },
      'oneOf': [
        {'required': ['text_field']},
        {'required': ['text_value']},
      ],
    };
  }

  @override
  Map<String, dynamic> getOutputSchema() {
    return {
      'type': 'object',
      'properties': {
        'sentiment': {'type': 'string'},
        'confidence': {'type': 'number'},
        'topics': {'type': 'array', 'items': {'type': 'string'}},
        'language': {'type': 'string'},
      },
    };
  }
}

Register Custom Plugin

// Register custom plugin
final customPlugin = CustomAnalyticsPlugin();
await pluginManager.registerPlugin(
  customPlugin,
  config: {
    'api_key': Platform.environment['ANALYTICS_API_KEY'],
    'timeout_ms': 30000,
  },
);

// Use in automation rule
final rule = AutomationRule(
  name: 'analyze_content_sentiment',
  trigger: OnChangeTrigger(field: 'content'),
  action: PluginAutomationActionInterface(
    pluginId: 'custom.analytics',
    parameters: {
      'text_field': 'content',
      'analysis_type': 'sentiment',
    },
    pluginManager: pluginManager,
  ),
);

Plugin Monitoring and Management

Health Monitoring

// Monitor plugin health
pluginManager.events.listen((event) {
  switch (event.type) {
    case 'plugin_executed':
      final success = event.data['success'] as bool;
      final executionTime = event.data['executionTimeMs'] as int;
      
      if (!success) {
        logger.warning('Plugin ${event.pluginId} execution failed');
      }
      
      if (executionTime > 10000) {
        logger.warning('Plugin ${event.pluginId} slow execution: ${executionTime}ms');
      }
      break;
      
    case 'plugin_health_changed':
      final healthy = event.data['healthy'] as bool;
      if (!healthy) {
        logger.error('Plugin ${event.pluginId} became unhealthy');
        // Implement alerting logic
      }
      break;
  }
});

// Check plugin health status
final healthStatuses = pluginManager.getHealthStatuses();
for (final entry in healthStatuses.entries) {
  if (!entry.value) {
    logger.error('Plugin ${entry.key} is unhealthy');
  }
}

Performance Metrics

// Collect plugin performance metrics
class PluginMetricsCollector {
  final MetricsRegistry _registry;
  
  void recordPluginExecution({
    required String pluginId,
    required bool success,
    required Duration executionTime,
    String? tenantId,
  }) {
    _registry.counter('plugin_executions_total')
        .labels({'plugin_id': pluginId, 'success': success.toString()})
        .increment();
    
    _registry.histogram('plugin_execution_duration_ms')
        .labels({'plugin_id': pluginId})
        .observe(executionTime.inMilliseconds.toDouble());
    
    if (tenantId != null) {
      _registry.counter('plugin_executions_by_tenant')
          .labels({'plugin_id': pluginId, 'tenant_id': tenantId})
          .increment();
    }
  }
}

Plugin Security and Compliance

Resource Limits

class ResourceLimitedPlugin extends AutomationPlugin {
  @override
  Future<PluginResult> execute(PluginInput input) async {
    return await ResourceLimiter.executeWithLimits(
      maxMemoryMB: 512,
      maxExecutionTimeMs: 30000,
      maxCpuPercent: 80,
      operation: () => _executePlugin(input),
    );
  }
}

Data Sanitization

class PrivacyAwarePlugin extends AutomationPlugin {
  final DataSanitizer _sanitizer;
  
  @override
  Future<PluginResult> execute(PluginInput input) async {
    // Sanitize input data
    final sanitizedInput = await _sanitizer.sanitize(input);
    
    // Execute with sanitized data
    final result = await _executeSecurely(sanitizedInput);
    
    // Ensure no sensitive data in result
    return await _sanitizer.sanitizeResult(result);
  }
}

Audit Logging

class AuditablePlugin extends AutomationPlugin {
  @override
  Future<PluginResult> execute(PluginInput input) async {
    final auditId = _generateAuditId();
    
    // Log execution start
    await _auditLogger.logPluginExecution(
      auditId: auditId,
      pluginId: id,
      tenantId: input.tenantId,
      action: 'execute_start',
      inputHash: _hashInput(input),
      timestamp: DateTime.now(),
    );
    
    try {
      final result = await _executeWithAudit(input);
      
      // Log successful execution
      await _auditLogger.logPluginExecution(
        auditId: auditId,
        pluginId: id,
        action: 'execute_success',
        resultHash: _hashResult(result),
        executionTimeMs: result.executionTimeMs,
        timestamp: DateTime.now(),
      );
      
      return result;
    } catch (e) {
      // Log execution failure
      await _auditLogger.logPluginExecution(
        auditId: auditId,
        pluginId: id,
        action: 'execute_failure',
        error: e.toString(),
        timestamp: DateTime.now(),
      );
      
      rethrow;
    }
  }
}

Plugin Best Practices

1. Design Principles

  • Single Responsibility: Each plugin should have a focused purpose
  • Stateless Design: Plugins should be stateless for better scalability
  • Idempotent Operations: Same input should produce same output
  • Error Resilience: Handle failures gracefully with meaningful error messages

2. Performance Optimization

  • Caching: Cache expensive operations when appropriate
  • Resource Pooling: Reuse expensive resources like ML models
  • Batch Processing: Process multiple items efficiently when possible
  • Async Operations: Use async/await for non-blocking execution

3. Security Considerations

  • Input Validation: Always validate and sanitize inputs
  • Resource Limits: Implement timeouts and resource constraints
  • Data Privacy: Handle sensitive data according to privacy policies
  • Access Control: Respect user permissions and tenant boundaries

4. Testing Strategies

  • Unit Testing: Test plugin logic in isolation
  • Integration Testing: Test with automation engine
  • Mock Testing: Use mocks for external services
  • Performance Testing: Validate performance under load

Plugin Configuration Reference

{
  "plugins": {
    "vektagraf.embeddings": {
      "model": "sentence_transformer",
      "model_name": "all-MiniLM-L6-v2",
      "dimensions": 384,
      "normalize": true,
      "batch_size": 32,
      "timeout_ms": 30000
    },
    "vektagraf.text_generation": {
      "model": "openai",
      "model_name": "gpt-3.5-turbo",
      "api_key": "${OPENAI_API_KEY}",
      "max_tokens": 1000,
      "temperature": 0.7,
      "timeout_ms": 60000
    },
    "custom.analytics": {
      "api_key": "${ANALYTICS_API_KEY}",
      "endpoint": "https://api.analytics.com/v1",
      "timeout_ms": 30000,
      "retry_attempts": 3
    }
  },
  "plugin_manager": {
    "health_check_interval": "5m",
    "max_concurrent_executions": 10,
    "default_timeout_ms": 30000,
    "enable_metrics": true,
    "enable_audit_logging": true
  }
}

Summary

The VektaGraf Automation system provides a powerful, declarative approach to data lifecycle management that enables:

  • Compliance Automation: Automatically handle GDPR, data retention, and regulatory requirements
  • Lifecycle Management: Implement user account lifecycle and data archival policies
  • Security Enforcement: Automate security-related data transformations
  • AI Integration: Generate embeddings, create content, and perform intelligent data processing
  • External Service Integration: Connect to APIs and external systems through plugins
  • Operational Efficiency: Reduce manual data management overhead

Key Takeaways

  1. Schema-Driven: Automation rules are defined directly in your schema for version control and deployment consistency
  2. Plugin-Extensible: Extend automation capabilities with AI and external service plugins
  3. Mandate Documentation: All automations require proper authority and approval documentation
  4. Testing First: Always use dry-run mode and validation tools before production deployment
  5. Performance Aware: Configure appropriate batch sizes and monitoring for production workloads
  6. Security Conscious: Respect field permissions and enable comprehensive auditing
  7. AI-Enhanced: Built-in plugins for embeddings and text generation enable intelligent automation
  8. Compliance Ready: Built-in audit trails and reporting support regulatory requirements

Next Steps

  • Chapter 19: Learn about production deployment patterns for automation systems
  • Chapter 20: Explore scaling strategies for high-volume automation workloads
  • Appendix II: Reference complete configuration options and API documentation

The automation system integrates seamlessly with VektaGraf's security, multi-tenancy, and job queue systems to provide a comprehensive data management solution that scales from simple use cases to enterprise-grade compliance requirements.