Chapter 17: Job Queue System
Overview
The Vektagraf Job Queue System is a background job processing framework that replaces and extends the original NotificationQueue and ScheduledTaskManager. It provides a single, tenant-aware system for all background work: notifications, scheduled tasks, delayed jobs, and recurring operations.
Key Features
- Job Model: Single SystemJob class handles all job types
- Tenant Isolation: Complete multitenancy with per-tenant limits and monitoring
- Priority Processing: Priority-based job execution with tenant tier consideration
- Persistent Storage: Jobs survive server restarts with automatic recovery
- Real-time Monitoring: Comprehensive analytics and health monitoring
- Retry Logic: Exponential backoff for failed jobs with configurable limits
- Extensible Handlers: Plugin architecture for custom job processors
Benefits Over Previous System
| Feature | Old System | New Unified System |
|---|---|---|
| Job Types | Separate queues for notifications and scheduled tasks | Single queue handles all job types |
| Tenant Support | Limited tenant awareness | Full tenant isolation with limits |
| Monitoring | Basic status tracking | Comprehensive analytics and health scoring |
| Recovery | Manual intervention required | Automatic recovery on restart |
| Scalability | Fixed processing capacity | Dynamic, tenant-aware scaling |
| Extensibility | Hard-coded handlers | Plugin-based handler system |
Architecture
Core Components
┌─────────────────────────────────────────────────────────────┐
│ VektagrafJobService │
│ ┌─────────────────┐ ┌─────────────────┐ ┌──────────────┐ │
│ │ JobQueueManager │ │ TenantMiddleware│ │ JobMonitoring│ │
│ └─────────────────┘ └─────────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ SystemJob Model │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌────────┐ │
│ │Notification │ │ Scheduled │ │ Delayed │ │Recurring│ │
│ │ Jobs │ │ Jobs │ │ Jobs │ │ Jobs │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └────────┘ │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Tenant-Aware Processing │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌────────┐ │
│ │ Tier 1 │ │ Tier 2 │ │ Tier 3 │ │ Tier 4 │ │
│ │ Tenants │ │ Tenants │ │ Tenants │ │Tenants │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └────────┘ │
└─────────────────────────────────────────────────────────────┘
Job Lifecycle
graph TD
A[Job Created] --> B{Scheduled?}
B -->|Yes| C[Pending Status]
B -->|No| D[Queued Status]
C --> E[Timer Triggers]
E --> D
D --> F{Tenant Limits OK?}
F -->|No| G[Limit Exception]
F -->|Yes| H[Processing Status]
H --> I[Execute Handler]
I --> J{Success?}
J -->|Yes| K[Completed Status]
J -->|No| L{Retries Left?}
L -->|Yes| M[Retry with Backoff]
L -->|No| N[Failed Status]
M --> D
K --> O{Recurring?}
O -->|Yes| P[Schedule Next]
O -->|No| Q[End]
P --> C
N --> Q
G --> Q
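The lifecycle diagram can also be read as a table of allowed status transitions. The following is a minimal sketch of that reading, not the library's implementation: the `Status` enum, `allowedTransitions`, and `canTransition` are hypothetical names introduced for illustration.

```dart
/// Status names follow the lifecycle diagram; the transition table is an
/// illustrative reading of it (assumption), not the library's own code.
enum Status { pending, queued, processing, completed, failed }

const Map<Status, Set<Status>> allowedTransitions = {
  Status.pending: {Status.queued}, // timer triggers
  Status.queued: {Status.processing}, // tenant limits OK
  Status.processing: {
    Status.completed, // handler succeeded
    Status.queued, // retry with backoff
    Status.failed, // no retries left
  },
  Status.completed: {Status.pending}, // recurring job: schedule next run
  Status.failed: <Status>{}, // terminal
};

/// Returns true when moving from [from] to [to] is an edge in the table.
bool canTransition(Status from, Status to) =>
    allowedTransitions[from]?.contains(to) ?? false;
```

A guard such as `canTransition(job.status, next)` before every status write would reject updates that skip a step, for example moving a queued job straight to completed.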
Job Types and Handlers
SystemJob Model
The unified SystemJob model supports all job types:
class SystemJob {
final Identifier id;
final String type; // notification, scheduled, immediate, recurring
final String handler; // Handler name to process the job
final Map<String, dynamic> payload;
final JobPriority priority; // low, normal, high, urgent, critical
final String status; // pending, queued, processing, completed, failed
final Identifier? tenantId; // Tenant isolation
final DateTime? scheduledFor; // For delayed/scheduled jobs
final String? cronSchedule; // For recurring jobs
final int maxRetries;
final int currentRetries;
final DateTime? lastAttempt;
final DateTime? completedAt;
final JobExecutionResult? lastResult;
final Map<String, dynamic> metadata;
final DateTime createdAt;
final DateTime updatedAt;
}
Job Types
1. Notification Jobs
Handle all types of notifications (email, SMS, push, webhook):
await jobService.queueNotification(
type: 'email',
recipient: 'user@example.com',
subject: 'Welcome!',
body: 'Thank you for signing up',
tenantId: tenantId,
priority: JobPriority.high,
);
2. Scheduled Jobs
Execute at specific times using cron expressions:
await jobService.queueScheduledJob(
name: 'Daily Report',
cronExpression: '0 9 * * *', // 9 AM daily
handler: 'report_generator',
parameters: {'type': 'usage'},
tenantId: tenantId,
);
3. Delayed Jobs
Execute after a specific delay:
await jobService.queueDelayedJob(
handler: 'data_processor',
payload: {'batch_id': 'batch-001'},
delay: Duration(minutes: 30),
tenantId: tenantId,
);
4. Recurring Jobs
Execute repeatedly based on cron schedule:
await jobService.queueJob(
type: JobType.recurring,
handler: 'cleanup_handler',
payload: {'retention_days': 30},
cronSchedule: '0 2 * * 0', // Weekly on Sunday at 2 AM
tenantId: tenantId,
);
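To make the cron expressions used in these examples concrete, here is a minimal sketch of matching a five-field expression against a timestamp. `cronMatches` is a hypothetical helper, not part of the job service, and it supports only `*` and single integers (no ranges, lists, steps, or `7` for Sunday).

```dart
/// Returns true when [time] matches a five-field cron [expression]
/// (minute, hour, day-of-month, month, day-of-week).
/// Sketch only: each field must be `*` or a single integer.
bool cronMatches(String expression, DateTime time) {
  final fields = expression.trim().split(RegExp(r'\s+'));
  if (fields.length != 5) {
    throw FormatException('Expected 5 cron fields', expression);
  }
  final actual = [
    time.minute,
    time.hour,
    time.day,
    time.month,
    time.weekday % 7, // Dart: Monday=1..Sunday=7; cron: Sunday=0
  ];
  for (var i = 0; i < 5; i++) {
    if (fields[i] == '*') continue; // wildcard matches any value
    if (int.parse(fields[i]) != actual[i]) return false;
  }
  return true;
}
```

Note that this sketch requires every restricted field to match, whereas classic cron treats day-of-month and day-of-week as alternatives when both are restricted.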
Custom Job Handlers
Create custom handlers by implementing JobHandlerInterface:
class CustomDataProcessor implements JobHandlerInterface {
@override
String get handlerName => 'custom_data_processor';
@override
Set<String> get supportedJobTypes => {
JobType.immediate,
JobType.scheduled,
};
@override
bool validatePayload(Map<String, dynamic> payload) {
return payload.containsKey('data_id') &&
payload.containsKey('processing_type');
}
@override
Duration? estimateExecutionTime(Map<String, dynamic> payload) {
final type = payload['processing_type'] as String;
switch (type) {
case 'quick': return Duration(seconds: 30);
case 'standard': return Duration(minutes: 5);
case 'deep': return Duration(minutes: 30);
default: return Duration(minutes: 10);
}
}
@override
Future<JobExecutionResult> execute(JobExecutionContext context) async {
final startTime = DateTime.now();
try {
final payload = context.job.payload;
final dataId = payload['data_id'] as String;
final processingType = payload['processing_type'] as String;
// Access tenant context
final tenantId = context.tenantId;
final tenantUsage = context.tenantContext['usage'];
// Perform processing
final result = await processData(dataId, processingType);
return JobExecutionResult(
success: true,
executedAt: startTime,
executionTime: DateTime.now().difference(startTime),
result: {
'processed_records': result.recordCount,
'output_file': result.outputPath,
},
metadata: {
'tenant_id': tenantId,
'processing_type': processingType,
},
);
} catch (e) {
return JobExecutionResult(
success: false,
errorMessage: e.toString(),
executedAt: startTime,
executionTime: DateTime.now().difference(startTime),
);
}
}
}
// Register the handler
jobService.registerHandler(CustomDataProcessor());
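Registration by name suggests a lookup table keyed on the handler name. The sketch below shows one way such a registry could work. `Handler`, `HandlerRegistry`, and `EchoHandler` are simplified, hypothetical stand-ins and do not reflect the actual JobHandlerInterface or service internals.

```dart
/// Hypothetical, reduced handler contract used only in this sketch.
abstract class Handler {
  String get handlerName;
  bool validatePayload(Map<String, dynamic> payload);
  Future<bool> execute(Map<String, dynamic> payload);
}

class HandlerRegistry {
  final Map<String, Handler> _handlers = {};

  /// Registers [handler] under its name; duplicate names are rejected.
  void register(Handler handler) {
    if (_handlers.containsKey(handler.handlerName)) {
      throw StateError('Handler already registered: ${handler.handlerName}');
    }
    _handlers[handler.handlerName] = handler;
  }

  bool has(String name) => _handlers.containsKey(name);

  /// Looks up the handler by name, validates the payload, then executes it.
  Future<bool> dispatch(String name, Map<String, dynamic> payload) async {
    final handler = _handlers[name];
    if (handler == null) throw ArgumentError('No handler named "$name"');
    if (!handler.validatePayload(payload)) return false;
    return handler.execute(payload);
  }
}

class EchoHandler implements Handler {
  @override
  String get handlerName => 'echo';

  @override
  bool validatePayload(Map<String, dynamic> payload) =>
      payload.containsKey('text');

  @override
  Future<bool> execute(Map<String, dynamic> payload) async => true;
}
```

Validating the payload before execution means a malformed job fails fast instead of consuming a processing slot.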
Tenant-Aware Job Processing
Tenant Isolation
Jobs are completely isolated by tenant:
- Queue Separation: Each tenant has its own logical queue
- Resource Limits: Per-tenant limits on concurrent jobs, queue size, and daily usage
- Priority Processing: Higher-tier tenants get priority processing
- Usage Tracking: Detailed analytics per tenant
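The chapter does not spell out how job priority and tenant tier combine when picking the next job. As a minimal sketch, assume priority is compared first, then tenant tier (higher tier first), then age (oldest first). `Priority`, `QueuedJob`, `compareJobs`, and `nextJob` are hypothetical names for illustration.

```dart
/// Priority levels in ascending order, mirroring the JobPriority comment.
enum Priority { low, normal, high, urgent, critical }

class QueuedJob {
  final String id;
  final Priority priority;
  final int tenantTier; // 1..4; higher tier is served first (assumption)
  final DateTime createdAt;
  const QueuedJob(this.id, this.priority, this.tenantTier, this.createdAt);
}

/// Negative when [a] should run before [b].
int compareJobs(QueuedJob a, QueuedJob b) {
  final byPriority = b.priority.index.compareTo(a.priority.index);
  if (byPriority != 0) return byPriority;
  final byTier = b.tenantTier.compareTo(a.tenantTier);
  if (byTier != 0) return byTier;
  return a.createdAt.compareTo(b.createdAt); // oldest first
}

/// Picks the job that should run next, or null for an empty queue.
QueuedJob? nextJob(List<QueuedJob> queue) {
  if (queue.isEmpty) return null;
  return queue.reduce((best, job) => compareJobs(job, best) < 0 ? job : best);
}
```

With this ordering a high-priority job from a Tier 1 tenant still runs before a normal-priority job from a Tier 4 tenant; swapping the first two comparisons would favor tier over priority instead.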
Tenant Job Limits
Configure job limits per tenant tier:
class JobLimitConfig {
final int? maxQueuedJobs; // Max jobs in queue
final int? maxConcurrentJobs; // Max concurrent execution
final int? maxDailyJobs; // Daily execution limit
final int? maxJobPayloadSize; // Max payload size in bytes
final Set<String> allowedJobTypes; // Allowed job types
final Duration? maxJobDuration; // Max execution time
final JobPriority defaultJobPriority;
}
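The maxJobPayloadSize limit is expressed in bytes. One plausible way to measure a payload, assuming it is stored as UTF-8 encoded JSON (the chapter does not say), is sketched below. `payloadSizeBytes` and `fitsPayloadLimit` are hypothetical helpers, and treating a null limit as "unlimited" is also an assumption.

```dart
import 'dart:convert';

/// Size of [payload] in bytes when serialized as UTF-8 JSON (assumption:
/// this is how the limit is measured).
int payloadSizeBytes(Map<String, dynamic> payload) =>
    utf8.encode(jsonEncode(payload)).length;

/// A null [maxBytes] is treated as "no limit" in this sketch.
bool fitsPayloadLimit(Map<String, dynamic> payload, int? maxBytes) =>
    maxBytes == null || payloadSizeBytes(payload) <= maxBytes;
```

Counting encoded bytes rather than string length matters for non-ASCII content, where one character can occupy several bytes.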
Tier-Based Limits
| Tier | Queued Jobs | Concurrent | Daily Limit | Allowed Types |
|---|---|---|---|---|
| Tier 1 | 10 | 2 | 100 | notifications |
| Tier 2 | 50 | 5 | 1,000 | notifications, scheduled, maintenance |
| Tier 3 | 200 | 20 | 10,000 | + cleanup, data_sync |
| Tier 4 | 1,000 | 100 | 100,000 | + file_processing |
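As a worked example of the table above, the sketch below encodes the numeric limits and checks whether one more job could be queued or started. `TierLimits`, `tierLimits`, `blockingLimit`, and `canStart` are hypothetical names, and the real enforcement logic may differ.

```dart
class TierLimits {
  final int maxQueued;
  final int maxConcurrent;
  final int maxDaily;
  const TierLimits(this.maxQueued, this.maxConcurrent, this.maxDaily);
}

/// Numeric values copied from the tier table.
const Map<int, TierLimits> tierLimits = {
  1: TierLimits(10, 2, 100),
  2: TierLimits(50, 5, 1000),
  3: TierLimits(200, 20, 10000),
  4: TierLimits(1000, 100, 100000),
};

/// Name of the first limit that blocks queuing one more job, or null.
String? blockingLimit(int tier,
    {required int queued, required int dailyCount}) {
  final limits = tierLimits[tier];
  if (limits == null) throw ArgumentError.value(tier, 'tier', 'unknown tier');
  if (queued >= limits.maxQueued) return 'maxQueuedJobs';
  if (dailyCount >= limits.maxDaily) return 'maxDailyJobs';
  return null;
}

/// Whether a tenant with [running] active jobs may start another one.
bool canStart(int tier, int running) {
  final limits = tierLimits[tier];
  if (limits == null) throw ArgumentError.value(tier, 'tier', 'unknown tier');
  return running < limits.maxConcurrent;
}
```

For instance, a Tier 1 tenant with 10 queued jobs is blocked by the queue limit, while a Tier 2 tenant with 49 queued jobs and 999 jobs today can still queue one more.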
Limit Enforcement
// Automatic limit checking before job queuing
try {
await jobService.queueJob(
type: JobType.notification,
handler: 'email_handler',
payload: emailData,
tenantId: tenantId,
);
} on TenantLimitException catch (e) {
print('Limit exceeded: ${e.message}');
print('Limit type: ${e.limitType}');
print('Current usage: ${e.currentUsage}');
}
Job Limits and Configuration
Configuring Job Limits
Job limits are configured as part of the tenant configuration:
final tenantConfig = TenantConfig.forTier(
tenantId: 'premium-tenant',
name: 'Premium Customer',
tier: TenantLimitTier.tier3,
customSettings: {
'jobLimits': {
'maxQueuedJobs': 500,
'maxConcurrentJobs': 50,
'maxDailyJobs': 20000,
'maxJobPayloadSize': 2 * 1024 * 1024, // 2MB
'allowedJobTypes': [
'notification',
'scheduled',
'maintenance',
'cleanup',
'data_sync',
'custom_processing'
],
'maxJobDuration': Duration(hours: 2).inMilliseconds,
'defaultJobPriority': 'high',
}
},
dbConfig: dbConfig,
);
Dynamic Limit Updates
Update tenant limits at runtime:
final updatedConfig = tenantConfig.copyWith(
jobLimit: JobLimitConfig(
maxQueuedJobs: 1000,
maxConcurrentJobs: 100,
maxDailyJobs: 50000,
// ... other limits
),
);
tenantManager.setTenantConfig(updatedConfig);
Limit Monitoring
Monitor limit usage in real-time:
// Get current usage
final usage = await jobService.getTenantJobUsage(tenantId);
print('Queue usage: ${usage.queuedJobs}/${usage.limits['maxQueuedJobs']}');
print('Daily usage: ${usage.currentUsage['dailyJobs']}/${usage.limits['maxDailyJobs']}');
// Check for warnings
if (usage.warnings.isNotEmpty) {
print('Warnings: ${usage.warnings.join(', ')}');
}
Monitoring and Analytics
Real-Time Metrics
The job service exposes streams for job, tenant, and system-level updates:
// Listen to job updates
jobService.jobUpdates.listen((job) {
print('Job ${job.id} status: ${job.status}');
if (job.status == JobStatus.failed) {
print('Error: ${job.lastResult?.errorMessage}');
}
});
// Listen to tenant metrics
jobService.tenantMetricsUpdates.listen((metrics) {
print('Tenant ${metrics.tenantId}:');
print(' Success Rate: ${(metrics.successRate * 100).toStringAsFixed(1)}%');
print(' Avg Execution: ${metrics.avgExecutionTime.inMilliseconds}ms');
print(' Queue Length: ${metrics.queuedJobs}');
});
// Listen to system metrics
jobService.systemMetricsUpdates.listen((stats) {
print('System Stats:');
print(' Total Jobs: ${stats.totalJobs}');
print(' Success Rate: ${(stats.systemSuccessRate * 100).toStringAsFixed(1)}%');
print(' Active Tenants: ${stats.activeTenants}');
});
Tenant Usage Analytics
Get detailed tenant usage statistics:
final usage = await jobService.getTenantJobUsage(
tenantId,
startDate: DateTime.now().subtract(Duration(days: 7)),
endDate: DateTime.now(),
);
print('7-Day Usage Summary:');
print('Total Jobs: ${usage.totalJobs}');
print('Success Rate: ${(usage.successRate * 100).toStringAsFixed(1)}%');
print('Failure Rate: ${(usage.failureRate * 100).toStringAsFixed(1)}%');
print('Avg Execution Time: ${usage.avgExecutionTime.inMilliseconds}ms');
print('\nJobs by Type:');
usage.jobsByType.forEach((type, count) {
print(' $type: $count');
});
print('\nJobs by Status:');
usage.jobsByStatus.forEach((status, count) {
print(' $status: $count');
});
Performance Metrics
Analyze job performance by type and handler:
final metrics = await jobService.getJobPerformanceMetrics(
JobType.notification,
'email_handler',
startDate: DateTime.now().subtract(Duration(days: 30)),
);
print('Email Handler Performance (30 days):');
print('Total Executions: ${metrics.totalExecutions}');
print('Success Rate: ${(metrics.successRate * 100).toStringAsFixed(1)}%');
print('Avg Execution Time: ${metrics.avgExecutionTime.inMilliseconds}ms');
print('P95 Execution Time: ${metrics.p95ExecutionTime.inMilliseconds}ms');
print('\nError Breakdown:');
metrics.errorsByType.forEach((error, count) {
print(' $error: $count');
});
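For readers unfamiliar with the P95 figure, the sketch below shows one common definition, the nearest-rank percentile, together with a simple average. How the job service actually computes these metrics is not documented here, so `nearestRankPercentile` and `averageDuration` are illustrative, hypothetical helpers.

```dart
/// Nearest-rank percentile: the smallest sample such that at least [p]
/// percent of all samples are less than or equal to it.
Duration nearestRankPercentile(List<Duration> samples, double p) {
  if (samples.isEmpty) throw ArgumentError('samples must not be empty');
  if (p <= 0 || p > 100) throw ArgumentError.value(p, 'p', 'must be in (0, 100]');
  final sorted = [...samples]..sort(); // Duration is Comparable
  final rank = (p * sorted.length / 100).ceil(); // 1-based rank
  return sorted[rank - 1];
}

/// Arithmetic mean of [samples], truncated to whole microseconds.
Duration averageDuration(List<Duration> samples) {
  if (samples.isEmpty) throw ArgumentError('samples must not be empty');
  final total = samples.fold<Duration>(Duration.zero, (a, b) => a + b);
  return total ~/ samples.length;
}
```

With 20 samples of 1 ms through 20 ms, the 95th percentile under this definition is the 19th sorted sample, 19 ms, while the average is 10.5 ms.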
Health Monitoring
Monitor tenant job health:
final health = await jobService.getTenantJobHealth(tenantId);
print('Tenant Health Report:');
print('Health Score: ${health['healthScore']}/100');
print('Health Status: ${health['healthStatus']}');
print('Success Rate: ${(health['successRate'] * 100).toStringAsFixed(1)}%');
print('Queue Length: ${health['queueLength']}');
print('\nRecommendations:');
// cast<String>() avoids a runtime failure if the list is typed List<dynamic>
final recommendations = (health['recommendations'] as List).cast<String>();
recommendations.forEach((rec) => print(' • $rec'));
Trend Analysis
Analyze usage trends over time:
final trends = await jobService.getTenantJobTrends(
tenantId,
period: Duration(days: 7),
interval: Duration(hours: 1),
);
print('Hourly Job Trends (7 days):');
for (final trend in trends) {
print('${trend.periodStart.hour}:00 - ${trend.totalJobs} jobs, '
'${(trend.successRate * 100).toStringAsFixed(1)}% success');
}
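Trend data of this shape can be produced by grouping finished jobs into fixed-width time buckets. The following is a minimal sketch under that assumption. `JobRecord`, `TrendBucket`, and `bucketJobs` are hypothetical names and not the service's own types.

```dart
class JobRecord {
  final DateTime finishedAt;
  final bool success;
  const JobRecord(this.finishedAt, this.success);
}

class TrendBucket {
  final DateTime periodStart;
  int totalJobs = 0;
  int succeeded = 0;
  TrendBucket(this.periodStart);

  double get successRate => totalJobs == 0 ? 0.0 : succeeded / totalJobs;
}

/// Groups [jobs] into buckets of width [interval], counted from [start].
/// Jobs before [start] are ignored; empty buckets are not emitted.
List<TrendBucket> bucketJobs(
    List<JobRecord> jobs, DateTime start, Duration interval) {
  if (interval <= Duration.zero) {
    throw ArgumentError.value(interval, 'interval', 'must be positive');
  }
  final buckets = <int, TrendBucket>{};
  for (final job in jobs) {
    final offset = job.finishedAt.difference(start);
    if (offset.isNegative) continue; // outside the window
    final index = offset.inMicroseconds ~/ interval.inMicroseconds;
    final bucket = buckets.putIfAbsent(
        index, () => TrendBucket(start.add(interval * index)));
    bucket.totalJobs++;
    if (job.success) bucket.succeeded++;
  }
  final keys = buckets.keys.toList()..sort();
  return [for (final k in keys) buckets[k]!];
}
```

Using UTC timestamps for `start` and the job records keeps bucket boundaries stable across daylight-saving changes.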
Usage Examples
Basic Job Operations
// Initialize the job service
final jobService = VektagrafJobService(
database,
tenantMiddleware: tenantMiddleware,
tenantManager: tenantManager,
);
await jobService.initialize();
// Queue a simple notification
final job = await jobService.queueNotification(
type: 'email',
recipient: 'user@example.com',
subject: 'Account Created',
body: 'Welcome to our platform!',
tenantId: tenantId,
);
print('Queued job: ${job.id}');
Batch Job Processing
// Queue multiple jobs with tenant limits
final jobSpecs = [
{
'type': JobType.notification,
'handler': 'email_handler',
'payload': {'recipient': 'user1@example.com', 'template': 'welcome'},
},
{
'type': JobType.notification,
'handler': 'sms_handler',
'payload': {'recipient': '+1234567890', 'message': 'Welcome!'},
},
{
'type': JobType.scheduled,
'handler': 'report_generator',
'payload': {'report_type': 'usage'},
'scheduledFor': DateTime.now().add(Duration(hours: 1)),
},
];
final jobs = await jobService.queueBatchJobs(
jobSpecs: jobSpecs,
tenantId: tenantId,
priority: JobPriority.normal,
);
print('Queued ${jobs.length} jobs');
Error Handling and Retry
// Queue job with custom retry logic
final job = await jobService.queueJob(
type: JobType.immediate,
handler: 'external_api_call',
payload: {'endpoint': 'https://api.example.com/webhook'},
tenantId: tenantId,
maxRetries: 5,
metadata: {
'retry_strategy': 'exponential_backoff',
'max_delay': Duration(minutes: 30).inMilliseconds,
},
);
// Monitor job progress
jobService.jobUpdates
.where((j) => j.id == job.id)
.listen((updatedJob) {
switch (updatedJob.status) {
case JobStatus.processing:
print('Job started processing');
break;
case JobStatus.completed:
print('Job completed successfully');
break;
case JobStatus.failed:
if (updatedJob.canRetry) {
print('Job failed, will retry (${updatedJob.currentRetries}/${updatedJob.maxRetries})');
} else {
print('Job failed permanently: ${updatedJob.lastResult?.errorMessage}');
}
break;
}
});
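The metadata above names an exponential backoff strategy with a 30-minute maximum delay, but the chapter does not show how delays are derived. A minimal sketch, assuming a hypothetical 5-second base delay that doubles with each attempt and is capped at the maximum, could look like this (`backoffDelay` and `hasRetriesLeft` are illustrative names):

```dart
/// Delay before retry number [attempt] (1-based). The delay doubles with
/// each attempt and is capped at [maxDelay]. Defaults are assumptions.
Duration backoffDelay(
  int attempt, {
  Duration baseDelay = const Duration(seconds: 5),
  Duration maxDelay = const Duration(minutes: 30),
}) {
  if (attempt < 1) {
    throw ArgumentError.value(attempt, 'attempt', 'must be >= 1');
  }
  // Cap the exponent so large attempt counts cannot overflow the shift.
  final exponent = attempt - 1 > 20 ? 20 : attempt - 1;
  final delay = baseDelay * (1 << exponent);
  return delay > maxDelay ? maxDelay : delay;
}

/// A job may be retried while it has used fewer than [maxRetries] retries.
bool hasRetriesLeft(int currentRetries, int maxRetries) =>
    currentRetries < maxRetries;
```

Under these assumptions the first three retries wait 5, 10, and 20 seconds, and from the tenth attempt onward the delay stays at the 30-minute cap. Production systems often add random jitter so that many failed jobs do not retry at the same instant.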
Maintenance Jobs
// Create standard maintenance jobs for a tenant
final maintenanceJobs = await jobService.createMaintenanceJobs(tenantId);
print('Created ${maintenanceJobs.length} maintenance jobs:');
maintenanceJobs.forEach((job) {
print(' ${job.payload['name']}: ${job.cronSchedule}');
});
// Create custom cleanup job
await jobService.queueScheduledJob(
name: 'Custom Data Cleanup',
cronExpression: '0 3 1 * *', // First day of month at 3 AM
handler: 'custom_cleanup_handler',
parameters: {
'data_types': ['logs', 'temp_files', 'cache'],
'retention_policy': {
'logs': Duration(days: 90).inDays,
'temp_files': Duration(days: 7).inDays,
'cache': Duration(days: 1).inDays,
},
},
tenantId: tenantId,
);
Best Practices
1. Job Design
Keep Jobs Idempotent
// Good: Idempotent job that can be safely retried
// (other JobHandlerInterface members omitted for brevity)
class IdempotentEmailHandler implements JobHandlerInterface {
@override
Future<JobExecutionResult> execute(JobExecutionContext context) async {
final payload = context.job.payload;
final messageId = payload['message_id'] as String;
// Check if already sent
if (await isMessageAlreadySent(messageId)) {
return JobExecutionResult(
success: true,
executedAt: DateTime.now(),
executionTime: Duration.zero,
result: {'status': 'already_sent'},
);
}
// Send message
return await sendMessage(payload);
}
}
Use Appropriate Timeouts
class TimeoutAwareHandler implements JobHandlerInterface {
@override
Duration? estimateExecutionTime(Map<String, dynamic> payload) {
final complexity = payload['complexity'] as String? ?? 'normal';
switch (complexity) {
case 'simple': return Duration(seconds: 30);
case 'normal': return Duration(minutes: 5);
case 'complex': return Duration(minutes: 30);
default: return Duration(minutes: 10);
}
}
@override
Future<JobExecutionResult> execute(JobExecutionContext context) async {
final timeout = estimateExecutionTime(context.job.payload) ?? Duration(minutes: 10);
// timeout() is an instance method on Future, not a static method
return performWork(context).timeout(
timeout,
onTimeout: () => JobExecutionResult(
success: false,
errorMessage: 'Job timed out after ${timeout.inSeconds} seconds',
executedAt: DateTime.now(),
executionTime: timeout,
),
);
}
}
2. Error Handling
Categorize Errors for Retry Logic
import 'dart:async'; // TimeoutException
import 'dart:io'; // SocketException, HttpException
class SmartRetryHandler implements JobHandlerInterface {
@override
Future<JobExecutionResult> execute(JobExecutionContext context) async {
try {
return await performOperation(context);
} catch (e) {
final shouldRetry = _shouldRetryError(e);
return JobExecutionResult(
success: false,
errorMessage: e.toString(),
executedAt: DateTime.now(),
executionTime: DateTime.now().difference(context.actualStartTime),
metadata: {
'error_type': e.runtimeType.toString(),
'retryable': shouldRetry,
},
);
}
}
bool _shouldRetryError(dynamic error) {
// Retry on network errors, timeouts, temporary failures
if (error is SocketException ||
error is TimeoutException ||
error is HttpException) {
return true;
}
// Don't retry on validation errors, auth failures
if (error is ArgumentError ||
error is UnauthorizedException) {
return false;
}
return true; // Default to retry
}
}
3. Performance Optimization
Batch Similar Operations
class BatchEmailHandler implements JobHandlerInterface {
@override
Future<JobExecutionResult> execute(JobExecutionContext context) async {
final payload = context.job.payload;
// cast<String>() avoids a runtime failure if the payload list is List<dynamic>
final recipients = (payload['recipients'] as List).cast<String>();
// Process in batches of 50
const batchSize = 50;
final results = <String, bool>{};
for (int i = 0; i < recipients.length; i += batchSize) {
final batch = recipients.skip(i).take(batchSize).toList();
final batchResults = await sendEmailBatch(batch, payload);
results.addAll(batchResults);
}
final successCount = results.values.where((success) => success).length;
return JobExecutionResult(
success: successCount == recipients.length,
executedAt: DateTime.now(),
executionTime: DateTime.now().difference(context.actualStartTime),
result: {
'total_recipients': recipients.length,
'successful_sends': successCount,
'failed_sends': recipients.length - successCount,
},
);
}
}
Use Connection Pooling
class DatabaseJobHandler implements JobHandlerInterface {
static final ConnectionPool _pool = ConnectionPool();
@override
Future<JobExecutionResult> execute(JobExecutionContext context) async {
final connection = await _pool.getConnection();
try {
final result = await performDatabaseOperation(connection, context.job.payload);
return JobExecutionResult(
success: true,
executedAt: DateTime.now(),
executionTime: DateTime.now().difference(context.actualStartTime),
result: result,
);
} finally {
_pool.releaseConnection(connection);
}
}
}
4. Monitoring and Alerting
Set Up Health Checks
// Monitor system health (Timer requires: import 'dart:async';)
Timer.periodic(Duration(minutes: 5), (_) async {
final queueStatus = await jobService.getJobQueueStatus();
final systemStats = await jobService.getSystemJobStatistics();
// Alert on high queue backlog
if (queueStatus['queuedJobs'] > 1000) {
await sendAlert('High queue backlog: ${queueStatus['queuedJobs']} jobs');
}
// Alert on low success rate
if (systemStats.systemSuccessRate < 0.9) {
await sendAlert('Low system success rate: ${(systemStats.systemSuccessRate * 100).toStringAsFixed(1)}%');
}
});
Monitor Tenant Health
// Check tenant health scores
for (final tenantId in activeTenants) {
final health = await jobService.getTenantJobHealth(tenantId);
// Cast via num so an integer score (for example 70) does not fail the cast
final healthScore = (health['healthScore'] as num).toDouble();
if (healthScore < 70) {
await notifyTenantAdmin(tenantId, {
'health_score': healthScore,
'recommendations': health['recommendations'],
});
}
}
Troubleshooting
Common Issues
1. Jobs Stuck in Queue
Symptoms:
- Jobs remain in queued status for extended periods
- Queue length continuously growing
Diagnosis:
final queueStatus = await jobService.getJobQueueStatus();
print('Queue backlog: ${queueStatus['queuedJobs']}');
print('Oldest job age: ${queueStatus['oldestQueuedJobAge']}ms');
// Check tenant limits
final usage = await jobService.getTenantJobUsage(tenantId);
if (usage.warnings.isNotEmpty) {
print('Tenant warnings: ${usage.warnings}');
}
Solutions:
- Check tenant concurrent job limits
- Verify job handlers are registered
- Increase processing capacity
- Review job priorities
2. High Failure Rate
Symptoms:
- Many jobs failing with similar errors
- Low success rate in metrics
Diagnosis:
final metrics = await jobService.getJobPerformanceMetrics(
jobType,
handlerName,
);
print('Failure rate: ${(metrics.failureRate * 100).toStringAsFixed(1)}%');
print('Common errors:');
metrics.errorsByType.forEach((error, count) {
print(' $error: $count occurrences');
});
Solutions:
- Review error patterns in job results
- Update retry logic for transient failures
- Fix handler implementation issues
- Adjust timeout values
3. Memory Issues
Symptoms:
- Increasing memory usage over time
- Out of memory errors
Diagnosis:
// Monitor job queue sizes
final systemStats = await jobService.getSystemJobStatistics();
print('Total active jobs: ${systemStats.totalRunningJobs + systemStats.totalQueuedJobs}');
// Check for job leaks
final oldJobs = await jobService.getJobsByStatus(JobStatus.processing);
final stuckJobs = oldJobs.where((job) =>
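// Compare Durations directly: inHours truncates, so "inHours > 1" only matches after 2 hours
DateTime.now().difference(job.updatedAt) > const Duration(hours: 1)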
).toList();
if (stuckJobs.isNotEmpty) {
print('Found ${stuckJobs.length} stuck jobs');
}
Solutions:
- Implement job cleanup policies
- Set appropriate job timeouts
- Monitor for stuck jobs
- Optimize job payload sizes
4. Tenant Limit Issues
Symptoms:
- TenantLimitException errors
- Jobs rejected due to limits
Diagnosis:
try {
await jobService.queueJob(/* ... */);
} on TenantLimitException catch (e) {
print('Limit exceeded: ${e.limitType}');
print('Current usage: ${e.currentUsage}');
final usage = await jobService.getTenantJobUsage(e.tenantId);
print('Daily usage: ${usage.currentUsage['dailyJobs']}/${usage.limits['maxDailyJobs']}');
print('Queue usage: ${usage.queuedJobs}/${usage.limits['maxQueuedJobs']}');
}
Solutions:
- Review tenant tier assignments
- Adjust limits for high-usage tenants
- Implement job prioritization
- Optimize job efficiency
Debugging Tools
Job Inspection
// Get detailed job information
final job = await jobService.getJobById(jobId);
if (job != null) {
print('Job Details:');
print(' Status: ${job.status}');
print(' Created: ${job.createdAt}');
print(' Last Attempt: ${job.lastAttempt}');
print(' Retries: ${job.currentRetries}/${job.maxRetries}');
if (job.lastResult != null) {
print(' Last Result: ${job.lastResult!.success ? 'Success' : 'Failed'}');
if (!job.lastResult!.success) {
print(' Error: ${job.lastResult!.errorMessage}');
}
}
}
System Health Check
Future<Map<String, dynamic>> performHealthCheck() async {
final queueStatus = await jobService.getJobQueueStatus();
final systemStats = await jobService.getSystemJobStatistics();
return {
'queue_health': {
'queued_jobs': queueStatus['queuedJobs'],
'processing_jobs': queueStatus['processingJobs'],
'oldest_job_age_minutes': queueStatus['oldestQueuedJobAge'] / (1000 * 60),
},
'system_health': {
'success_rate': systemStats.systemSuccessRate,
'total_jobs_24h': systemStats.totalJobs,
'active_tenants': systemStats.activeTenants,
},
'warnings': systemStats.systemWarnings,
'timestamp': DateTime.now().toIso8601String(),
};
}
The job queue system handles all background processing in a multi-tenant environment, with built-in monitoring, analytics, and operational tooling.