Skip to main content

Chapter 17: Job Queue System

Overview

The Vektagraf Job Queue System is a comprehensive background job processing framework that replaces and extends the original NotificationQueue and ScheduledTaskManager. It provides a single, tenant-aware system for handling all types of background work including notifications, scheduled tasks, delayed jobs, and recurring operations.

Key Features

  • Job Model: Single SystemJob class handles all job types
  • Tenant Isolation: Complete multitenancy with per-tenant limits and monitoring
  • Priority Processing: Priority-based job execution with tenant tier consideration
  • Persistent Storage: Jobs survive server restarts with automatic recovery
  • Real-time Monitoring: Comprehensive analytics and health monitoring
  • Retry Logic: Exponential backoff for failed jobs with configurable limits
  • Extensible Handlers: Plugin architecture for custom job processors

Benefits Over Previous System

Feature Old System New Unified System
Job Types Separate queues for notifications and scheduled tasks Single queue handles all job types
Tenant Support Limited tenant awareness Full tenant isolation with limits
Monitoring Basic status tracking Comprehensive analytics and health scoring
Recovery Manual intervention required Automatic recovery on restart
Scalability Fixed processing capacity Dynamic, tenant-aware scaling
Extensibility Hard-coded handlers Plugin-based handler system

Architecture

Core Components

┌─────────────────────────────────────────────────────────────┐
│                VektagrafJobService                          │
│  ┌─────────────────┐ ┌─────────────────┐ ┌──────────────┐  │
│  │ JobQueueManager │ │ TenantMiddleware│ │ JobMonitoring│  │
│  └─────────────────┘ └─────────────────┘ └──────────────┘  │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│                    SystemJob Model                          │
│  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌────────┐ │
│  │Notification │ │ Scheduled   │ │ Delayed     │ │Recurring│ │
│  │    Jobs     │ │    Jobs     │ │   Jobs      │ │  Jobs  │ │
│  └─────────────┘ └─────────────┘ └─────────────┘ └────────┘ │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│                 Tenant-Aware Processing                     │
│  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌────────┐ │
│  │   Tier 1    │ │   Tier 2    │ │   Tier 3    │ │ Tier 4 │ │
│  │   Tenants   │ │   Tenants   │ │   Tenants   │ │Tenants │ │
│  └─────────────┘ └─────────────┘ └─────────────┘ └────────┘ │
└─────────────────────────────────────────────────────────────┘

Job Lifecycle

graph TD
    A[Job Created] --> B{Scheduled?}
    B -->|Yes| C[Pending Status]
    B -->|No| D[Queued Status]
    C --> E[Timer Triggers]
    E --> D
    D --> F{Tenant Limits OK?}
    F -->|No| G[Limit Exception]
    F -->|Yes| H[Processing Status]
    H --> I[Execute Handler]
    I --> J{Success?}
    J -->|Yes| K[Completed Status]
    J -->|No| L{Retries Left?}
    L -->|Yes| M[Retry with Backoff]
    L -->|No| N[Failed Status]
    M --> D
    K --> O{Recurring?}
    O -->|Yes| P[Schedule Next]
    O -->|No| Q[End]
    P --> C
    N --> Q
    G --> Q

Job Types and Handlers

SystemJob Model

The unified SystemJob model supports all job types:

class SystemJob {
  final Identifier id;
  final String type;              // notification, scheduled, immediate, recurring
  final String handler;           // Handler name to process the job
  final Map<String, dynamic> payload;
  final JobPriority priority;     // low, normal, high, urgent, critical
  final String status;            // pending, queued, processing, completed, failed
  final Identifier? tenantId;    // Tenant isolation
  final DateTime? scheduledFor;   // For delayed/scheduled jobs
  final String? cronSchedule;     // For recurring jobs
  final int maxRetries;
  final int currentRetries;
  final DateTime? lastAttempt;
  final DateTime? completedAt;
  final JobExecutionResult? lastResult;
  final Map<String, dynamic> metadata;
  final DateTime createdAt;
  final DateTime updatedAt;
}

Job Types

1. Notification Jobs

Handle all types of notifications (email, SMS, push, webhook):

await jobService.queueNotification(
  type: 'email',
  recipient: 'user@example.com',
  subject: 'Welcome!',
  body: 'Thank you for signing up',
  tenantId: tenantId,
  priority: JobPriority.high,
);

2. Scheduled Jobs

Execute at specific times using cron expressions:

await jobService.queueScheduledJob(
  name: 'Daily Report',
  cronExpression: '0 9 * * *', // 9 AM daily
  handler: 'report_generator',
  parameters: {'type': 'usage'},
  tenantId: tenantId,
);

3. Delayed Jobs

Execute after a specific delay:

await jobService.queueDelayedJob(
  handler: 'data_processor',
  payload: {'batch_id': 'batch-001'},
  delay: Duration(minutes: 30),
  tenantId: tenantId,
);

4. Recurring Jobs

Execute repeatedly based on cron schedule:

await jobService.queueJob(
  type: JobType.recurring,
  handler: 'cleanup_handler',
  payload: {'retention_days': 30},
  cronSchedule: '0 2 * * 0', // Weekly on Sunday at 2 AM
  tenantId: tenantId,
);

Custom Job Handlers

Create custom handlers by implementing the JobHandlerInterface interface:

class CustomDataProcessor implements JobHandlerInterface {
  @override
  String get handlerName => 'custom_data_processor';

  @override
  Set<String> get supportedJobTypes => {
    JobType.immediate,
    JobType.scheduled,
  };

  @override
  bool validatePayload(Map<String, dynamic> payload) {
    return payload.containsKey('data_id') && 
           payload.containsKey('processing_type');
  }

  @override
  Duration? estimateExecutionTime(Map<String, dynamic> payload) {
    final type = payload['processing_type'] as String;
    switch (type) {
      case 'quick': return Duration(seconds: 30);
      case 'standard': return Duration(minutes: 5);
      case 'deep': return Duration(minutes: 30);
      default: return Duration(minutes: 10);
    }
  }

  @override
  Future<JobExecutionResult> execute(JobExecutionContext context) async {
    final startTime = DateTime.now();
    
    try {
      final payload = context.job.payload;
      final dataId = payload['data_id'] as String;
      final processingType = payload['processing_type'] as String;
      
      // Access tenant context
      final tenantId = context.tenantId;
      final tenantUsage = context.tenantContext['usage'];
      
      // Perform processing
      final result = await processData(dataId, processingType);
      
      return JobExecutionResult(
        success: true,
        executedAt: startTime,
        executionTime: DateTime.now().difference(startTime),
        result: {
          'processed_records': result.recordCount,
          'output_file': result.outputPath,
        },
        metadata: {
          'tenant_id': tenantId,
          'processing_type': processingType,
        },
      );
    } catch (e) {
      return JobExecutionResult(
        success: false,
        errorMessage: e.toString(),
        executedAt: startTime,
        executionTime: DateTime.now().difference(startTime),
      );
    }
  }
}

// Register the handler
jobService.registerHandler(CustomDataProcessor());

Tenant-Aware Job Processing

Tenant Isolation

Jobs are completely isolated by tenant:

  • Queue Separation: Each tenant has its own logical queue
  • Resource Limits: Per-tenant limits on concurrent jobs, queue size, and daily usage
  • Priority Processing: Higher-tier tenants get priority processing
  • Usage Tracking: Detailed analytics per tenant

Tenant Job Limits

Configure job limits per tenant tier:

class JobLimitConfig {
  final int? maxQueuedJobs;        // Max jobs in queue
  final int? maxConcurrentJobs;    // Max concurrent execution
  final int? maxDailyJobs;         // Daily execution limit
  final int? maxJobPayloadSize;    // Max payload size in bytes
  final Set<String> allowedJobTypes; // Allowed job types
  final Duration? maxJobDuration;  // Max execution time
  final JobPriority defaultJobPriority;
}

Tier-Based Limits

Tier Queued Jobs Concurrent Daily Limit Allowed Types
Tier 1 10 2 100 notifications
Tier 2 50 5 1,000 notifications, scheduled, maintenance
Tier 3 200 20 10,000 + cleanup, data_sync
Tier 4 1,000 100 100,000 + file_processing

Limit Enforcement

// Automatic limit checking before job queuing
try {
  await jobService.queueJob(
    type: JobType.notification,
    handler: 'email_handler',
    payload: emailData,
    tenantId: tenantId,
  );
} catch (TenantLimitException e) {
  print('Limit exceeded: ${e.message}');
  print('Limit type: ${e.limitType}');
  print('Current usage: ${e.currentUsage}');
}

Job Limits and Configuration

Configuring Job Limits

Job limits are configured as part of the tenant configuration:

final tenantConfig = TenantConfig.forTier(
  tenantId: 'premium-tenant',
  name: 'Premium Customer',
  tier: TenantLimitTier.tier3,
  customSettings: {
    'jobLimits': {
      'maxQueuedJobs': 500,
      'maxConcurrentJobs': 50,
      'maxDailyJobs': 20000,
      'maxJobPayloadSize': 2 * 1024 * 1024, // 2MB
      'allowedJobTypes': [
        'notification',
        'scheduled',
        'maintenance',
        'cleanup',
        'data_sync',
        'custom_processing'
      ],
      'maxJobDuration': Duration(hours: 2).inMilliseconds,
      'defaultJobPriority': 'high',
    }
  },
  dbConfig: dbConfig,
);

Dynamic Limit Updates

Update tenant limits at runtime:

final updatedConfig = tenantConfig.copyWith(
  jobLimit: JobLimitConfig(
    maxQueuedJobs: 1000,
    maxConcurrentJobs: 100,
    maxDailyJobs: 50000,
    // ... other limits
  ),
);

tenantManager.setTenantConfig(updatedConfig);

Limit Monitoring

Monitor limit usage in real-time:

// Get current usage
final usage = await jobService.getTenantJobUsage(tenantId);
print('Queue usage: ${usage.queuedJobs}/${usage.limits['maxQueuedJobs']}');
print('Daily usage: ${usage.currentUsage['dailyJobs']}/${usage.limits['maxDailyJobs']}');

// Check for warnings
if (usage.warnings.isNotEmpty) {
  print('Warnings: ${usage.warnings.join(', ')}');
}

Monitoring and Analytics

Real-Time Metrics

The system provides comprehensive real-time monitoring:

// Listen to job updates
jobService.jobUpdates.listen((job) {
  print('Job ${job.id} status: ${job.status}');
  if (job.status == JobStatus.failed) {
    print('Error: ${job.lastResult?.errorMessage}');
  }
});

// Listen to tenant metrics
jobService.tenantMetricsUpdates.listen((metrics) {
  print('Tenant ${metrics.tenantId}:');
  print('  Success Rate: ${(metrics.successRate * 100).toStringAsFixed(1)}%');
  print('  Avg Execution: ${metrics.avgExecutionTime.inMilliseconds}ms');
  print('  Queue Length: ${metrics.queuedJobs}');
});

// Listen to system metrics
jobService.systemMetricsUpdates.listen((stats) {
  print('System Stats:');
  print('  Total Jobs: ${stats.totalJobs}');
  print('  Success Rate: ${(stats.systemSuccessRate * 100).toStringAsFixed(1)}%');
  print('  Active Tenants: ${stats.activeTenants}');
});

Tenant Usage Analytics

Get detailed tenant usage statistics:

final usage = await jobService.getTenantJobUsage(
  tenantId,
  startDate: DateTime.now().subtract(Duration(days: 7)),
  endDate: DateTime.now(),
);

print('7-Day Usage Summary:');
print('Total Jobs: ${usage.totalJobs}');
print('Success Rate: ${(usage.successRate * 100).toStringAsFixed(1)}%');
print('Failure Rate: ${(usage.failureRate * 100).toStringAsFixed(1)}%');
print('Avg Execution Time: ${usage.avgExecutionTime.inMilliseconds}ms');

print('\nJobs by Type:');
usage.jobsByType.forEach((type, count) {
  print('  $type: $count');
});

print('\nJobs by Status:');
usage.jobsByStatus.forEach((status, count) {
  print('  $status: $count');
});

Performance Metrics

Analyze job performance by type and handler:

final metrics = await jobService.getJobPerformanceMetrics(
  JobType.notification,
  'email_handler',
  startDate: DateTime.now().subtract(Duration(days: 30)),
);

print('Email Handler Performance (30 days):');
print('Total Executions: ${metrics.totalExecutions}');
print('Success Rate: ${(metrics.successRate * 100).toStringAsFixed(1)}%');
print('Avg Execution Time: ${metrics.avgExecutionTime.inMilliseconds}ms');
print('P95 Execution Time: ${metrics.p95ExecutionTime.inMilliseconds}ms');

print('\nError Breakdown:');
metrics.errorsByType.forEach((error, count) {
  print('  $error: $count');
});

Health Monitoring

Monitor tenant job health:

final health = await jobService.getTenantJobHealth(tenantId);

print('Tenant Health Report:');
print('Health Score: ${health['healthScore']}/100');
print('Health Status: ${health['healthStatus']}');
print('Success Rate: ${(health['successRate'] * 100).toStringAsFixed(1)}%');
print('Queue Length: ${health['queueLength']}');

print('\nRecommendations:');
final recommendations = health['recommendations'] as List<String>;
recommendations.forEach((rec) => print('  • $rec'));

Trend Analysis

Usage Examples

Basic Job Operations

// Initialize the job service
final jobService = VektagrafJobService(
  database,
  tenantMiddleware: tenantMiddleware,
  tenantManager: tenantManager,
);

await jobService.initialize();

// Queue a simple notification
final job = await jobService.queueNotification(
  type: 'email',
  recipient: 'user@example.com',
  subject: 'Account Created',
  body: 'Welcome to our platform!',
  tenantId: tenantId,
);

print('Queued job: ${job.id}');

Batch Job Processing

// Queue multiple jobs with tenant limits
final jobSpecs = [
  {
    'type': JobType.notification,
    'handler': 'email_handler',
    'payload': {'recipient': 'user1@example.com', 'template': 'welcome'},
  },
  {
    'type': JobType.notification,
    'handler': 'sms_handler',
    'payload': {'recipient': '+1234567890', 'message': 'Welcome!'},
  },
  {
    'type': JobType.scheduled,
    'handler': 'report_generator',
    'payload': {'report_type': 'usage'},
    'scheduledFor': DateTime.now().add(Duration(hours: 1)),
  },
];

final jobs = await jobService.queueBatchJobs(
  jobSpecs: jobSpecs,
  tenantId: tenantId,
  priority: JobPriority.normal,
);

print('Queued ${jobs.length} jobs');

Error Handling and Retry

// Queue job with custom retry logic
final job = await jobService.queueJob(
  type: JobType.immediate,
  handler: 'external_api_call',
  payload: {'endpoint': 'https://api.example.com/webhook'},
  tenantId: tenantId,
  maxRetries: 5,
  metadata: {
    'retry_strategy': 'exponential_backoff',
    'max_delay': Duration(minutes: 30).inMilliseconds,
  },
);

// Monitor job progress
jobService.jobUpdates
    .where((j) => j.id == job.id)
    .listen((updatedJob) {
  switch (updatedJob.status) {
    case JobStatus.processing:
      print('Job started processing');
      break;
    case JobStatus.completed:
      print('Job completed successfully');
      break;
    case JobStatus.failed:
      if (updatedJob.canRetry) {
        print('Job failed, will retry (${updatedJob.currentRetries}/${updatedJob.maxRetries})');
      } else {
        print('Job failed permanently: ${updatedJob.lastResult?.errorMessage}');
      }
      break;
  }
});

Maintenance Jobs

// Create standard maintenance jobs for a tenant
final maintenanceJobs = await jobService.createMaintenanceJobs(tenantId);

print('Created ${maintenanceJobs.length} maintenance jobs:');
maintenanceJobs.forEach((job) {
  print('  ${job.payload['name']}: ${job.cronSchedule}');
});

// Create custom cleanup job
await jobService.queueScheduledJob(
  name: 'Custom Data Cleanup',
  cronExpression: '0 3 1 * *', // First day of month at 3 AM
  handler: 'custom_cleanup_handler',
  parameters: {
    'data_types': ['logs', 'temp_files', 'cache'],
    'retention_policy': {
      'logs': Duration(days: 90).inDays,
      'temp_files': Duration(days: 7).inDays,
      'cache': Duration(days: 1).inDays,
    },
  },
  tenantId: tenantId,
);

Best Practices

1. Job Design

Keep Jobs Idempotent

// Good: Idempotent job that can be safely retried
class IdempotentEmailHandler implements JobHandlerInterface {
  @override
  Future<JobExecutionResult> execute(JobExecutionContext context) async {
    final payload = context.job.payload;
    final messageId = payload['message_id'] as String;
    
    // Check if already sent
    if (await isMessageAlreadySent(messageId)) {
      return JobExecutionResult(
        success: true,
        executedAt: DateTime.now(),
        executionTime: Duration.zero,
        result: {'status': 'already_sent'},
      );
    }
    
    // Send message
    return await sendMessage(payload);
  }
}

Use Appropriate Timeouts

class TimeoutAwareHandler implements JobHandlerInterface {
  @override
  Duration? estimateExecutionTime(Map<String, dynamic> payload) {
    final complexity = payload['complexity'] as String? ?? 'normal';
    switch (complexity) {
      case 'simple': return Duration(seconds: 30);
      case 'normal': return Duration(minutes: 5);
      case 'complex': return Duration(minutes: 30);
      default: return Duration(minutes: 10);
    }
  }
  
  @override
  Future<JobExecutionResult> execute(JobExecutionContext context) async {
    final timeout = estimateExecutionTime(context.job.payload) ?? Duration(minutes: 10);
    
    return await Future.timeout(
      performWork(context),
      timeout,
      onTimeout: () => JobExecutionResult(
        success: false,
        errorMessage: 'Job timed out after ${timeout.inMinutes} minutes',
        executedAt: DateTime.now(),
        executionTime: timeout,
      ),
    );
  }
}

2. Error Handling

Categorize Errors for Retry Logic

class SmartRetryHandler implements JobHandlerInterface {
  @override
  Future<JobExecutionResult> execute(JobExecutionContext context) async {
    try {
      return await performOperation(context);
    } catch (e) {
      final shouldRetry = _shouldRetryError(e);
      
      return JobExecutionResult(
        success: false,
        errorMessage: e.toString(),
        executedAt: DateTime.now(),
        executionTime: DateTime.now().difference(context.actualStartTime),
        metadata: {
          'error_type': e.runtimeType.toString(),
          'retryable': shouldRetry,
        },
      );
    }
  }
  
  bool _shouldRetryError(dynamic error) {
    // Retry on network errors, timeouts, temporary failures
    if (error is SocketException || 
        error is TimeoutException ||
        error is HttpException) {
      return true;
    }
    
    // Don't retry on validation errors, auth failures
    if (error is ArgumentError || 
        error is UnauthorizedException) {
      return false;
    }
    
    return true; // Default to retry
  }
}

3. Performance Optimization

Batch Similar Operations

class BatchEmailHandler implements JobHandlerInterface {
  @override
  Future<JobExecutionResult> execute(JobExecutionContext context) async {
    final payload = context.job.payload;
    final recipients = payload['recipients'] as List<String>;
    
    // Process in batches of 50
    const batchSize = 50;
    final results = <String, bool>{};
    
    for (int i = 0; i < recipients.length; i += batchSize) {
      final batch = recipients.skip(i).take(batchSize).toList();
      final batchResults = await sendEmailBatch(batch, payload);
      results.addAll(batchResults);
    }
    
    final successCount = results.values.where((success) => success).length;
    
    return JobExecutionResult(
      success: successCount == recipients.length,
      executedAt: DateTime.now(),
      executionTime: DateTime.now().difference(context.actualStartTime),
      result: {
        'total_recipients': recipients.length,
        'successful_sends': successCount,
        'failed_sends': recipients.length - successCount,
      },
    );
  }
}

Use Connection Pooling

class DatabaseJobHandler implements JobHandlerInterface {
  static final ConnectionPool _pool = ConnectionPool();
  
  @override
  Future<JobExecutionResult> execute(JobExecutionContext context) async {
    final connection = await _pool.getConnection();
    
    try {
      final result = await performDatabaseOperation(connection, context.job.payload);
      return JobExecutionResult(
        success: true,
        executedAt: DateTime.now(),
        executionTime: DateTime.now().difference(context.actualStartTime),
        result: result,
      );
    } finally {
      _pool.releaseConnection(connection);
    }
  }
}

4. Monitoring and Alerting

Set Up Health Checks

// Monitor system health
Timer.periodic(Duration(minutes: 5), (_) async {
  final queueStatus = await jobService.getJobQueueStatus();
  final systemStats = await jobService.getSystemJobStatistics();
  
  // Alert on high queue backlog
  if (queueStatus['queuedJobs'] > 1000) {
    await sendAlert('High queue backlog: ${queueStatus['queuedJobs']} jobs');
  }
  
  // Alert on low success rate
  if (systemStats.systemSuccessRate < 0.9) {
    await sendAlert('Low system success rate: ${(systemStats.systemSuccessRate * 100).toStringAsFixed(1)}%');
  }
});

Monitor Tenant Health

// Check tenant health scores
for (final tenantId in activeTenants) {
  final health = await jobService.getTenantJobHealth(tenantId);
  final healthScore = health['healthScore'] as double;
  
  if (healthScore < 70) {
    await notifyTenantAdmin(tenantId, {
      'health_score': healthScore,
      'recommendations': health['recommendations'],
    });
  }
}

Troubleshooting

Common Issues

1. Jobs Stuck in Queue

Symptoms:

  • Jobs remain in queued status for extended periods
  • Queue length continuously growing

Diagnosis:

final queueStatus = await jobService.getJobQueueStatus();
print('Queue backlog: ${queueStatus['queuedJobs']}');
print('Oldest job age: ${queueStatus['oldestQueuedJobAge']}ms');

// Check tenant limits
final usage = await jobService.getTenantJobUsage(tenantId);
if (usage.warnings.isNotEmpty) {
  print('Tenant warnings: ${usage.warnings}');
}

Solutions:

  • Check tenant concurrent job limits
  • Verify job handlers are registered
  • Increase processing capacity
  • Review job priorities

2. High Failure Rate

Symptoms:

  • Many jobs failing with similar errors
  • Low success rate in metrics

Diagnosis:

final metrics = await jobService.getJobPerformanceMetrics(
  jobType,
  handlerName,
);

print('Failure rate: ${(metrics.failureRate * 100).toStringAsFixed(1)}%');
print('Common errors:');
metrics.errorsByType.forEach((error, count) {
  print('  $error: $count occurrences');
});

Solutions:

  • Review error patterns in job results
  • Update retry logic for transient failures
  • Fix handler implementation issues
  • Adjust timeout values

3. Memory Issues

Symptoms:

  • Increasing memory usage over time
  • Out of memory errors

Diagnosis:

// Monitor job queue sizes
final systemStats = await jobService.getSystemJobStatistics();
print('Total active jobs: ${systemStats.totalRunningJobs + systemStats.totalQueuedJobs}');

// Check for job leaks
final oldJobs = await jobService.getJobsByStatus(JobStatus.processing);
final stuckJobs = oldJobs.where((job) => 
  DateTime.now().difference(job.updatedAt).inHours > 1
).toList();

if (stuckJobs.isNotEmpty) {
  print('Found ${stuckJobs.length} stuck jobs');
}

Solutions:

  • Implement job cleanup policies
  • Set appropriate job timeouts
  • Monitor for stuck jobs
  • Optimize job payload sizes

4. Tenant Limit Issues

Symptoms:

  • TenantLimitException errors
  • Jobs rejected due to limits

Diagnosis:

try {
  await jobService.queueJob(/* ... */);
} catch (TenantLimitException e) {
  print('Limit exceeded: ${e.limitType}');
  print('Current usage: ${e.currentUsage}');
  
  final usage = await jobService.getTenantJobUsage(e.tenantId);
  print('Daily usage: ${usage.currentUsage['dailyJobs']}/${usage.limits['maxDailyJobs']}');
  print('Queue usage: ${usage.queuedJobs}/${usage.limits['maxQueuedJobs']}');
}

Solutions:

  • Review tenant tier assignments
  • Adjust limits for high-usage tenants
  • Implement job prioritization
  • Optimize job efficiency

Debugging Tools

Job Inspection

// Get detailed job information
final job = await jobService.getJobById(jobId);
if (job != null) {
  print('Job Details:');
  print('  Status: ${job.status}');
  print('  Created: ${job.createdAt}');
  print('  Last Attempt: ${job.lastAttempt}');
  print('  Retries: ${job.currentRetries}/${job.maxRetries}');
  
  if (job.lastResult != null) {
    print('  Last Result: ${job.lastResult!.success ? 'Success' : 'Failed'}');
    if (!job.lastResult!.success) {
      print('  Error: ${job.lastResult!.errorMessage}');
    }
  }
}

System Health Check

Future<Map<String, dynamic>> performHealthCheck() async {
  final queueStatus = await jobService.getJobQueueStatus();
  final systemStats = await jobService.getSystemJobStatistics();
  
  return {
    'queue_health': {
      'queued_jobs': queueStatus['queuedJobs'],
      'processing_jobs': queueStatus['processingJobs'],
      'oldest_job_age_minutes': queueStatus['oldestQueuedJobAge'] / (1000 * 60),
    },
    'system_health': {
      'success_rate': systemStats.systemSuccessRate,
      'total_jobs_24h': systemStats.totalJobs,
      'active_tenants': systemStats.activeTenants,
    },
    'warnings': systemStats.systemWarnings,
    'timestamp': DateTime.now().toIso8601String(),
  };
}

This comprehensive job queue system provides a robust foundation for handling all background processing needs in a multi-tenant environment with full monitoring, analytics, and operational capabilities.