Skip to main content

Chapter 15: Job Queue System

Overview

The Vektagraf Job Queue System is a comprehensive background job processing framework. It provides a single, tenant-aware system for handling all types of background work including notifications, scheduled tasks, delayed jobs, and recurring operations.

Key Features

  • Unified Job Model: Single SystemJob class handles all job types
  • Tenant Isolation: Complete multitenancy with per-tenant limits and monitoring
  • Priority Processing: Priority-based job execution with tenant tier consideration
  • Persistent Storage: Jobs survive server restarts with automatic recovery
  • Real-time Monitoring: Comprehensive analytics and health monitoring
  • Retry Logic: Exponential backoff for failed jobs with configurable limits
  • Extensible Handlers: Plugin architecture for custom job processors

Architecture

Core Components

┌─────────────────────────────────────────────────────────────┐
│                VektagrafJobService                          │
│  ┌─────────────────┐ ┌─────────────────┐ ┌──────────────┐  │
│  │ JobQueueManager │ │ TenantMiddleware│ │ JobMonitoring│  │
│  └─────────────────┘ └─────────────────┘ └──────────────┘  │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│                    SystemJob Model                          │
│  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌────────┐ │
│  │Notification │ │ Scheduled   │ │ Delayed     │ │Recurring│ │
│  │    Jobs     │ │    Jobs     │ │   Jobs      │ │  Jobs  │ │
│  └─────────────┘ └─────────────┘ └─────────────┘ └────────┘ │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│                 Tenant-Aware Processing                     │
│  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌────────┐ │
│  │   Tier 1    │ │   Tier 2    │ │   Tier 3    │ │ Tier 4 │ │
│  │   Tenants   │ │   Tenants   │ │   Tenants   │ │Tenants │ │
│  └─────────────┘ └─────────────┘ └─────────────┘ └────────┘ │
└─────────────────────────────────────────────────────────────┘

Job Lifecycle

graph TD
    A[Job Created] --> B{Scheduled?}
    B -->|Yes| C[Pending Status]
    B -->|No| D[Queued Status]
    C --> E[Timer Triggers]
    E --> D
    D --> F{Tenant Limits OK?}
    F -->|No| G[Limit Exception]
    F -->|Yes| H[Processing Status]
    H --> I[Execute Handler]
    I --> J{Success?}
    J -->|Yes| K[Completed Status]
    J -->|No| L{Retries Left?}
    L -->|Yes| M[Retry with Backoff]
    L -->|No| N[Failed Status]
    M --> D
    K --> O{Recurring?}
    O -->|Yes| P[Schedule Next]
    O -->|No| Q[End]
    P --> C
    N --> Q
    G --> Q

Job Types and Handlers

SystemJob Model

The unified SystemJob model supports all job types:

class SystemJob {
  final VektagrafId id;
  final String type;              // notification, scheduled, immediate, recurring
  final String handler;           // Handler name to process the job
  final Map<String, dynamic> payload;
  final JobPriority priority;     // low, normal, high, urgent, critical
  final String status;            // pending, queued, processing, completed, failed
  final VektagrafId? tenantId;    // Tenant isolation
  final DateTime? scheduledFor;   // For delayed/scheduled jobs
  final String? cronSchedule;     // For recurring jobs
  final int maxRetries;
  final int currentRetries;
  final DateTime? lastAttempt;
  final DateTime? completedAt;
  final JobExecutionResult? lastResult;
  final Map<String, dynamic> metadata;
  final DateTime createdAt;
  final DateTime updatedAt;
}

Job Types

1. Notification Jobs

Handle all types of notifications (email, SMS, push, webhook):

await jobService.queueNotification(
  type: 'email',
  recipient: 'user@example.com',
  subject: 'Welcome!',
  body: 'Thank you for signing up',
  tenantId: tenantId,
  priority: JobPriority.high,
);

2. Scheduled Jobs

Execute at specific times using cron expressions:

await jobService.queueScheduledJob(
  name: 'Daily Report',
  cronExpression: '0 9 * * *', // 9 AM daily
  handler: 'report_generator',
  parameters: {'type': 'usage'},
  tenantId: tenantId,
);

3. Delayed Jobs

Execute after a specific delay:

await jobService.queueDelayedJob(
  handler: 'data_processor',
  payload: {'batch_id': 'batch-001'},
  delay: Duration(minutes: 30),
  tenantId: tenantId,
);

4. Recurring Jobs

Execute repeatedly based on cron schedule:

await jobService.queueJob(
  type: JobType.recurring,
  handler: 'cleanup_handler',
  payload: {'retention_days': 30},
  cronSchedule: '0 2 * * 0', // Weekly on Sunday at 2 AM
  tenantId: tenantId,
);

Custom Job Handlers

Create custom handlers by implementing the JobHandler interface:

class CustomDataProcessor implements JobHandler {
  @override
  String get handlerName => 'custom_data_processor';

  @override
  Set<String> get supportedJobTypes => {
    JobType.immediate,
    JobType.scheduled,
  };

  @override
  bool validatePayload(Map<String, dynamic> payload) {
    return payload.containsKey('data_id') && 
           payload.containsKey('processing_type');
  }

  @override
  Duration? estimateExecutionTime(Map<String, dynamic> payload) {
    final type = payload['processing_type'] as String;
    switch (type) {
      case 'quick': return Duration(seconds: 30);
      case 'standard': return Duration(minutes: 5);
      case 'deep': return Duration(minutes: 30);
      default: return Duration(minutes: 10);
    }
  }

  @override
  Future<JobExecutionResult> execute(JobExecutionContext context) async {
    final startTime = DateTime.now();
    
    try {
      final payload = context.job.payload;
      final dataId = payload['data_id'] as String;
      final processingType = payload['processing_type'] as String;
      
      // Access tenant context
      final tenantId = context.tenantId;
      final tenantUsage = context.tenantContext['usage'];
      
      // Perform processing
      final result = await processData(dataId, processingType);
      
      return JobExecutionResult(
        success: true,
        executedAt: startTime,
        executionTime: DateTime.now().difference(startTime),
        result: {
          'processed_records': result.recordCount,
          'output_file': result.outputPath,
        },
        metadata: {
          'tenant_id': tenantId,
          'processing_type': processingType,
        },
      );
    } catch (e) {
      return JobExecutionResult(
        success: false,
        errorMessage: e.toString(),
        executedAt: startTime,
        executionTime: DateTime.now().difference(startTime),
      );
    }
  }
}

// Register the handler
jobService.registerHandler(CustomDataProcessor());

Tenant-Aware Job Processing

Tenant Isolation

Jobs are completely isolated by tenant:

  • Queue Separation: Each tenant has its own logical queue
  • Resource Limits: Per-tenant limits on concurrent jobs, queue size, and daily usage
  • Priority Processing: Higher-tier tenants get priority processing
  • Usage Tracking: Detailed analytics per tenant

Tenant Job Limits

Configure job limits per tenant tier:

class JobLimitConfig {
  final int? maxQueuedJobs;        // Max jobs in queue
  final int? maxConcurrentJobs;    // Max concurrent execution
  final int? maxDailyJobs;         // Daily execution limit
  final int? maxJobPayloadSize;    // Max payload size in bytes
  final Set<String> allowedJobTypes; // Allowed job types
  final Duration? maxJobDuration;  // Max execution time
  final JobPriority defaultJobPriority;
}

Tier-Based Limits

Tier Queued Jobs Concurrent Daily Limit Allowed Types
Tier 1 10 2 100 notifications
Tier 2 50 5 1,000 notifications, scheduled, maintenance
Tier 3 200 20 10,000 + cleanup, data_sync
Tier 4 1,000 100 100,000 + file_processing

Limit Enforcement

// Automatic limit checking before job queuing
try {
  await jobService.queueJob(
    type: JobType.notification,
    handler: 'email_handler',
    payload: emailData,
    tenantId: tenantId,
  );
} catch (TenantLimitException e) {
  print('Limit exceeded: ${e.message}');
  print('Limit type: ${e.limitType}');
  print('Current usage: ${e.currentUsage}');
}

Job Limits and Configuration

Configuring Job Limits

Job limits are configured as part of the tenant configuration:

final tenantConfig = TenantConfig.forTier(
  tenantId: 'premium-tenant',
  name: 'Premium Customer',
  tier: TenantLimitTier.tier3,
  customSettings: {
    'jobLimits': {
      'maxQueuedJobs': 500,
      'maxConcurrentJobs': 50,
      'maxDailyJobs': 20000,
      'maxJobPayloadSize': 2 * 1024 * 1024, // 2MB
      'allowedJobTypes': [
        'notification',
        'scheduled',
        'maintenance',
        'cleanup',
        'data_sync',
        'custom_processing'
      ],
      'maxJobDuration': Duration(hours: 2).inMilliseconds,
      'defaultJobPriority': 'high',
    }
  },
  dbConfig: dbConfig,
);

Dynamic Limit Updates

Update tenant limits at runtime:

final updatedConfig = tenantConfig.copyWith(
  jobLimit: JobLimitConfig(
    maxQueuedJobs: 1000,
    maxConcurrentJobs: 100,
    maxDailyJobs: 50000,
    // ... other limits
  ),
);

tenantManager.setTenantConfig(updatedConfig);

Limit Monitoring

Monitor limit usage in real-time:

// Get current usage
final usage = await jobService.getTenantJobUsage(tenantId);
print('Queue usage: ${usage.queuedJobs}/${usage.limits['maxQueuedJobs']}');
print('Daily usage: ${usage.currentUsage['dailyJobs']}/${usage.limits['maxDailyJobs']}');

// Check for warnings
if (usage.warnings.isNotEmpty) {
  print('Warnings: ${usage.warnings.join(', ')}');
}

Monitoring and Analytics

Real-Time Metrics

The system provides comprehensive real-time monitoring:

// Listen to job updates
jobService.jobUpdates.listen((job) {
  print('Job ${job.id} status: ${job.status}');
  if (job.status == JobStatus.failed) {
    print('Error: ${job.lastResult?.errorMessage}');
  }
});

// Listen to tenant metrics
jobService.tenantMetricsUpdates.listen((metrics) {
  print('Tenant ${metrics.tenantId}:');
  print('  Success Rate: ${(metrics.successRate * 100).toStringAsFixed(1)}%');
  print('  Avg Execution: ${metrics.avgExecutionTime.inMilliseconds}ms');
  print('  Queue Length: ${metrics.queuedJobs}');
});

// Listen to system metrics
jobService.systemMetricsUpdates.listen((stats) {
  print('System Stats:');
  print('  Total Jobs: ${stats.totalJobs}');
  print('  Success Rate: ${(stats.systemSuccessRate * 100).toStringAsFixed(1)}%');
  print('  Active Tenants: ${stats.activeTenants}');
});

Tenant Usage Analytics

Get detailed tenant usage statistics:

final usage = await jobService.getTenantJobUsage(
  tenantId,
  startDate: DateTime.now().subtract(Duration(days: 7)),
  endDate: DateTime.now(),
);

print('7-Day Usage Summary:');
print('Total Jobs: ${usage.totalJobs}');
print('Success Rate: ${(usage.successRate * 100).toStringAsFixed(1)}%');
print('Failure Rate: ${(usage.failureRate * 100).toStringAsFixed(1)}%');
print('Avg Execution Time: ${usage.avgExecutionTime.inMilliseconds}ms');

print('\nJobs by Type:');
usage.jobsByType.forEach((type, count) {
  print('  $type: $count');
});

print('\nJobs by Status:');
usage.jobsByStatus.forEach((status, count) {
  print('  $status: $count');
});

Performance Metrics

Analyze job performance by type and handler:

final metrics = await jobService.getJobPerformanceMetrics(
  JobType.notification,
  'email_handler',
  startDate: DateTime.now().subtract(Duration(days: 30)),
);

print('Email Handler Performance (30 days):');
print('Total Executions: ${metrics.totalExecutions}');
print('Success Rate: ${(metrics.successRate * 100).toStringAsFixed(1)}%');
print('Avg Execution Time: ${metrics.avgExecutionTime.inMilliseconds}ms');
print('P95 Execution Time: ${metrics.p95ExecutionTime.inMilliseconds}ms');

print('\nError Breakdown:');
metrics.errorsByType.forEach((error, count) {
  print('  $error: $count');
});

Health Monitoring

Monitor tenant job health:

final health = await jobService.getTenantJobHealth(tenantId);

print('Tenant Health Report:');
print('Health Score: ${health['healthScore']}/100');
print('Health Status: ${health['healthStatus']}');
print('Success Rate: ${(health['successRate'] * 100).toStringAsFixed(1)}%');
print('Queue Length: ${health['queueLength']}');

print('\nRecommendations:');
final recommendations = health['recommendations'] as List<String>;
recommendations.forEach((rec) => print('  • $rec'));

Trend Analysis

Usage Examples

Basic Job Operations

// Initialize the job service
final jobService = VektagrafJobService(
  database,
  tenantMiddleware: tenantMiddleware,
  tenantManager: tenantManager,
);

await jobService.initialize();

// Queue a simple notification
final job = await jobService.queueNotification(
  type: 'email',
  recipient: 'user@example.com',
  subject: 'Account Created',
  body: 'Welcome to our platform!',
  tenantId: tenantId,
);

print('Queued job: ${job.id}');

Batch Job Processing

// Queue multiple jobs with tenant limits
final jobSpecs = [
  {
    'type': JobType.notification,
    'handler': 'email_handler',
    'payload': {'recipient': 'user1@example.com', 'template': 'welcome'},
  },
  {
    'type': JobType.notification,
    'handler': 'sms_handler',
    'payload': {'recipient': '+1234567890', 'message': 'Welcome!'},
  },
  {
    'type': JobType.scheduled,
    'handler': 'report_generator',
    'payload': {'report_type': 'usage'},
    'scheduledFor': DateTime.now().add(Duration(hours: 1)),
  },
];

final jobs = await jobService.queueBatchJobs(
  jobSpecs: jobSpecs,
  tenantId: tenantId,
  priority: JobPriority.normal,
);

print('Queued ${jobs.length} jobs');

Error Handling and Retry

// Queue job with custom retry logic
final job = await jobService.queueJob(
  type: JobType.immediate,
  handler: 'external_api_call',
  payload: {'endpoint': 'https://api.example.com/webhook'},
  tenantId: tenantId,
  maxRetries: 5,
  metadata: {
    'retry_strategy': 'exponential_backoff',
    'max_delay': Duration(minutes: 30).inMilliseconds,
  },
);

// Monitor job progress
jobService.jobUpdates
    .where((j) => j.id == job.id)
    .listen((updatedJob) {
  switch (updatedJob.status) {
    case JobStatus.processing:
      print('Job started processing');
      break;
    case JobStatus.completed:
      print('Job completed successfully');
      break;
    case JobStatus.failed:
      if (updatedJob.canRetry) {
        print('Job failed, will retry (${updatedJob.currentRetries}/${updatedJob.maxRetries})');
      } else {
        print('Job failed permanently: ${updatedJob.lastResult?.errorMessage}');
      }
      break;
  }
});

Maintenance Jobs

// Create standard maintenance jobs for a tenant
final maintenanceJobs = await jobService.createMaintenanceJobs(tenantId);

print('Created ${maintenanceJobs.length} maintenance jobs:');
maintenanceJobs.forEach((job) {
  print('  ${job.payload['name']}: ${job.cronSchedule}');
});

// Create custom cleanup job
await jobService.queueScheduledJob(
  name: 'Custom Data Cleanup',
  cronExpression: '0 3 1 * *', // First day of month at 3 AM
  handler: 'custom_cleanup_handler',
  parameters: {
    'data_types': ['logs', 'temp_files', 'cache'],
    'retention_policy': {
      'logs': Duration(days: 90).inDays,
      'temp_files': Duration(days: 7).inDays,
      'cache': Duration(days: 1).inDays,
    },
  },
  tenantId: tenantId,
);

Best Practices

1. Job Design

Keep Jobs Idempotent

// Good: Idempotent job that can be safely retried
class IdempotentEmailHandler implements JobHandler {
  @override
  Future<JobExecutionResult> execute(JobExecutionContext context) async {
    final payload = context.job.payload;
    final messageId = payload['message_id'] as String;
    
    // Check if already sent
    if (await isMessageAlreadySent(messageId)) {
      return JobExecutionResult(
        success: true,
        executedAt: DateTime.now(),
        executionTime: Duration.zero,
        result: {'status': 'already_sent'},
      );
    }
    
    // Send message
    return await sendMessage(payload);
  }
}

Use Appropriate Timeouts

class TimeoutAwareHandler implements JobHandler {
  @override
  Duration? estimateExecutionTime(Map<String, dynamic> payload) {
    final complexity = payload['complexity'] as String? ?? 'normal';
    switch (complexity) {
      case 'simple': return Duration(seconds: 30);
      case 'normal': return Duration(minutes: 5);
      case 'complex': return Duration(minutes: 30);
      default: return Duration(minutes: 10);
    }
  }
  
  @override
  Future<JobExecutionResult> execute(JobExecutionContext context) async {
    final timeout = estimateExecutionTime(context.job.payload) ?? Duration(minutes: 10);
    
    return await Future.timeout(
      performWork(context),
      timeout,
      onTimeout: () => JobExecutionResult(
        success: false,
        errorMessage: 'Job timed out after ${timeout.inMinutes} minutes',
        executedAt: DateTime.now(),
        executionTime: timeout,
      ),
    );
  }
}

2. Error Handling

Categorize Errors for Retry Logic

class SmartRetryHandler implements JobHandler {
  @override
  Future<JobExecutionResult> execute(JobExecutionContext context) async {
    try {
      return await performOperation(context);
    } catch (e) {
      final shouldRetry = _shouldRetryError(e);
      
      return JobExecutionResult(
        success: false,
        errorMessage: e.toString(),
        executedAt: DateTime.now(),
        executionTime: DateTime.now().difference(context.actualStartTime),
        metadata: {
          'error_type': e.runtimeType.toString(),
          'retryable': shouldRetry,
        },
      );
    }
  }
  
  bool _shouldRetryError(dynamic error) {
    // Retry on network errors, timeouts, temporary failures
    if (error is SocketException || 
        error is TimeoutException ||
        error is HttpException) {
      return true;
    }
    
    // Don't retry on validation errors, auth failures
    if (error is ArgumentError || 
        error is UnauthorizedException) {
      return false;
    }
    
    return true; // Default to retry
  }
}

3. Performance Optimization

Batch Similar Operations

class BatchEmailHandler implements JobHandler {
  @override
  Future<JobExecutionResult> execute(JobExecutionContext context) async {
    final payload = context.job.payload;
    final recipients = payload['recipients'] as List<String>;
    
    // Process in batches of 50
    const batchSize = 50;
    final results = <String, bool>{};
    
    for (int i = 0; i < recipients.length; i += batchSize) {
      final batch = recipients.skip(i).take(batchSize).toList();
      final batchResults = await sendEmailBatch(batch, payload);
      results.addAll(batchResults);
    }
    
    final successCount = results.values.where((success) => success).length;
    
    return JobExecutionResult(
      success: successCount == recipients.length,
      executedAt: DateTime.now(),
      executionTime: DateTime.now().difference(context.actualStartTime),
      result: {
        'total_recipients': recipients.length,
        'successful_sends': successCount,
        'failed_sends': recipients.length - successCount,
      },
    );
  }
}

Use Connection Pooling

class DatabaseJobHandler implements JobHandler {
  static final ConnectionPool _pool = ConnectionPool();
  
  @override
  Future<JobExecutionResult> execute(JobExecutionContext context) async {
    final connection = await _pool.getConnection();
    
    try {
      final result = await performDatabaseOperation(connection, context.job.payload);
      return JobExecutionResult(
        success: true,
        executedAt: DateTime.now(),
        executionTime: DateTime.now().difference(context.actualStartTime),
        result: result,
      );
    } finally {
      _pool.releaseConnection(connection);
    }
  }
}

4. Monitoring and Alerting

Set Up Health Checks

// Monitor system health
Timer.periodic(Duration(minutes: 5), (_) async {
  final queueStatus = await jobService.getJobQueueStatus();
  final systemStats = await jobService.getSystemJobStatistics();
  
  // Alert on high queue backlog
  if (queueStatus['queuedJobs'] > 1000) {
    await sendAlert('High queue backlog: ${queueStatus['queuedJobs']} jobs');
  }
  
  // Alert on low success rate
  if (systemStats.systemSuccessRate < 0.9) {
    await sendAlert('Low system success rate: ${(systemStats.systemSuccessRate * 100).toStringAsFixed(1)}%');
  }
});

Monitor Tenant Health

// Check tenant health scores
for (final tenantId in activeTenants) {
  final health = await jobService.getTenantJobHealth(tenantId);
  final healthScore = health['healthScore'] as double;
  
  if (healthScore < 70) {
    await notifyTenantAdmin(tenantId, {
      'health_score': healthScore,
      'recommendations': health['recommendations'],
    });
  }
}

Troubleshooting

Common Issues

1. Jobs Stuck in Queue

Symptoms:

  • Jobs remain in queued status for extended periods
  • Queue length continuously growing

Diagnosis:

final queueStatus = await jobService.getJobQueueStatus();
print('Queue backlog: ${queueStatus['queuedJobs']}');
print('Oldest job age: ${queueStatus['oldestQueuedJobAge']}ms');

// Check tenant limits
final usage = await jobService.getTenantJobUsage(tenantId);
if (usage.warnings.isNotEmpty) {
  print('Tenant warnings: ${usage.warnings}');
}

Solutions:

  • Check tenant concurrent job limits
  • Verify job handlers are registered
  • Increase processing capacity
  • Review job priorities

2. High Failure Rate

Symptoms:

  • Many jobs failing with similar errors
  • Low success rate in metrics

Diagnosis:

final metrics = await jobService.getJobPerformanceMetrics(
  jobType,
  handlerName,
);

print('Failure rate: ${(metrics.failureRate * 100).toStringAsFixed(1)}%');
print('Common errors:');
metrics.errorsByType.forEach((error, count) {
  print('  $error: $count occurrences');
});

Solutions:

  • Review error patterns in job results
  • Update retry logic for transient failures
  • Fix handler implementation issues
  • Adjust timeout values

3. Memory Issues

Symptoms:

  • Increasing memory usage over time
  • Out of memory errors

Diagnosis:

// Monitor job queue sizes
final systemStats = await jobService.getSystemJobStatistics();
print('Total active jobs: ${systemStats.totalRunningJobs + systemStats.totalQueuedJobs}');

// Check for job leaks
final oldJobs = await jobService.getJobsByStatus(JobStatus.processing);
final stuckJobs = oldJobs.where((job) => 
  DateTime.now().difference(job.updatedAt).inHours > 1
).toList();

if (stuckJobs.isNotEmpty) {
  print('Found ${stuckJobs.length} stuck jobs');
}

Solutions:

  • Implement job cleanup policies
  • Set appropriate job timeouts
  • Monitor for stuck jobs
  • Optimize job payload sizes

4. Tenant Limit Issues

Symptoms:

  • TenantLimitException errors
  • Jobs rejected due to limits

Diagnosis:

try {
  await jobService.queueJob(/* ... */);
} catch (TenantLimitException e) {
  print('Limit exceeded: ${e.limitType}');
  print('Current usage: ${e.currentUsage}');
  
  final usage = await jobService.getTenantJobUsage(e.tenantId);
  print('Daily usage: ${usage.currentUsage['dailyJobs']}/${usage.limits['maxDailyJobs']}');
  print('Queue usage: ${usage.queuedJobs}/${usage.limits['maxQueuedJobs']}');
}

Solutions:

  • Review tenant tier assignments
  • Adjust limits for high-usage tenants
  • Implement job prioritization
  • Optimize job efficiency

Debugging Tools

Job Inspection

// Get detailed job information
final job = await jobService.getJobById(jobId);
if (job != null) {
  print('Job Details:');
  print('  Status: ${job.status}');
  print('  Created: ${job.createdAt}');
  print('  Last Attempt: ${job.lastAttempt}');
  print('  Retries: ${job.currentRetries}/${job.maxRetries}');
  
  if (job.lastResult != null) {
    print('  Last Result: ${job.lastResult!.success ? 'Success' : 'Failed'}');
    if (!job.lastResult!.success) {
      print('  Error: ${job.lastResult!.errorMessage}');
    }
  }
}

System Health Check

Future<Map<String, dynamic>> performHealthCheck() async {
  final queueStatus = await jobService.getJobQueueStatus();
  final systemStats = await jobService.getSystemJobStatistics();
  
  return {
    'queue_health': {
      'queued_jobs': queueStatus['queuedJobs'],
      'processing_jobs': queueStatus['processingJobs'],
      'oldest_job_age_minutes': queueStatus['oldestQueuedJobAge'] / (1000 * 60),
    },
    'system_health': {
      'success_rate': systemStats.systemSuccessRate,
      'total_jobs_24h': systemStats.totalJobs,
      'active_tenants': systemStats.activeTenants,
    },
    'warnings': systemStats.systemWarnings,
    'timestamp': DateTime.now().toIso8601String(),
  };
}

This comprehensive job queue system provides a robust foundation for handling all background processing needs in a multi-tenant environment with full monitoring, analytics, and operational capabilities.