Skip to main content

Background Job Scheduling

The Hypermodern Platform includes a robust job scheduling system that allows you to execute tasks in the background, schedule work for future execution, and handle long-running processes without blocking your main application.

Overview

The scheduling system provides:

  • Persistent Job Storage: Jobs survive server restarts
  • Retry Logic: Failed jobs can be automatically retried with configurable delays
  • Background Execution: Jobs run without blocking the main application
  • Flexible Scheduling: Schedule jobs for specific times or delays
  • Status Tracking: Monitor job execution status and history
  • Error Handling: Comprehensive error handling and logging

Core Components

ScheduledJob

The ScheduledJob abstract class is the foundation for all schedulable tasks:

import 'package:hypermodern_server/hypermodern_server.dart';

class SendWelcomeEmailJob extends ScheduledJob {
  @override
  String get identifier => 'send_welcome_email';

  @override
  Future<void> execute(Map<String, dynamic> parameters) async {
    final email = parameters['email'] as String;
    final name = parameters['name'] as String;
    
    // Your job logic here
    await emailService.sendWelcomeEmail(email, name);
    
    print('Welcome email sent to $email');
  }
}

JobRegistry

The JobRegistry manages job type registration and creation:

final registry = JobRegistry();

// Register job types
registry.register('send_welcome_email', () => SendWelcomeEmailJob());
registry.register('cleanup_old_data', () => CleanupJob());
registry.register('generate_report', () => ReportGenerationJob());

// Check if a job type is registered
if (registry.isRegistered('send_welcome_email')) {
  final job = registry.create('send_welcome_email');
}

JobScheduler

The JobScheduler handles job scheduling and execution:

final scheduler = JobScheduler(
  registry: registry,
  loggerName: 'MyAppScheduler',
  executionInterval: Duration(seconds: 30), // Check for jobs every 30s
  maxTasksPerCycle: 100, // Process up to 100 jobs per cycle
);

// Start the background processor
scheduler.start();

Scheduling Jobs

Schedule for Later Execution

// Schedule a job to run in 5 minutes
await scheduler.scheduleDelayed(
  'send_welcome_email',
  Duration(minutes: 5),
  {
    'email': 'user@example.com',
    'name': 'John Doe',
  },
  description: 'Send welcome email to new user',
);

Schedule for Specific Time

// Schedule a job for a specific date and time
await scheduler.scheduleAt(
  'generate_report',
  DateTime(2024, 12, 31, 23, 59), // New Year's Eve report
  {
    'reportType': 'yearly_summary',
    'year': 2024,
  },
  description: 'Generate yearly summary report',
);

Advanced Scheduling Options

await scheduler.schedule(JobSchedule(
  identifier: 'critical_backup',
  parameters: {'database': 'production'},
  scheduledAt: DateTime.now().add(Duration(hours: 1)),
  description: 'Critical database backup',
  maxRetries: 5, // Retry up to 5 times if it fails
  retryDelay: Duration(minutes: 10), // Wait 10 minutes between retries
));

Database Integration

The scheduling system requires a database table to store job information. Run the included migration:

import 'package:hypermodern_server/src/database/migrations/create_scheduled_tasks_table.dart';

// In your migration runner
await CreateScheduledTasksTable().up();

The migration creates a scheduled_tasks table with the following structure:

CREATE TABLE scheduled_tasks (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  identifier VARCHAR(255) NOT NULL,
  parameters TEXT, -- JSON encoded
  scheduled_at DATETIME NOT NULL,
  status VARCHAR(50) DEFAULT 'pending',
  description TEXT,
  max_retries INTEGER DEFAULT 3,
  retry_count INTEGER DEFAULT 0,
  retry_delay INTEGER, -- milliseconds
  executed_at DATETIME,
  failed_at DATETIME,
  error_message TEXT,
  created_at DATETIME,
  updated_at DATETIME
);

Job Lifecycle

Jobs progress through these states:

  1. pending: Job is scheduled and waiting to run
  2. running: Job is currently executing
  3. completed: Job finished successfully
  4. failed: Job failed (may be retried)
  5. cancelled: Job was cancelled before execution

Monitoring Job Status

// Get scheduler statistics
final stats = await scheduler.getStatistics();
print('Pending jobs: ${stats['pending']}');
print('Failed jobs: ${stats['failed']}');
print('Completed jobs: ${stats['completed']}');

// Cancel a specific job
await scheduler.cancel(taskId);

Service Provider Integration

Use the SchedulingServiceProvider for easy integration with your application:

import 'package:hypermodern_server/hypermodern_server.dart';

void main() async {
  final container = IoCContainer();
  
  // Register the scheduling service provider
  final schedulingProvider = SchedulingServiceProvider();
  schedulingProvider.registerServices(container);
  schedulingProvider.bootServices(container);
  
  // Get the registry and register your jobs
  final jobRegistry = container.resolve<JobRegistry>();
  jobRegistry.register('send_welcome_email', () => SendWelcomeEmailJob());
  jobRegistry.register('cleanup_old_data', () => CleanupJob());
  
  // The scheduler is automatically started
  final scheduler = container.resolve<JobScheduler>();
  
  // Your application code...
}

Common Job Patterns

Email Jobs

class SendEmailJob extends ScheduledJob {
  @override
  String get identifier => 'send_email';

  @override
  Future<void> execute(Map<String, dynamic> parameters) async {
    final to = parameters['to'] as String;
    final subject = parameters['subject'] as String;
    final body = parameters['body'] as String;
    final template = parameters['template'] as String?;
    
    if (template != null) {
      await emailService.sendTemplatedEmail(to, subject, template, parameters);
    } else {
      await emailService.sendEmail(to, subject, body);
    }
  }
}

Data Cleanup Jobs

class CleanupOldDataJob extends ScheduledJob {
  @override
  String get identifier => 'cleanup_old_data';

  @override
  Future<void> execute(Map<String, dynamic> parameters) async {
    final tableName = parameters['table'] as String;
    final daysOld = parameters['days_old'] as int? ?? 30;
    
    final cutoffDate = DateTime.now().subtract(Duration(days: daysOld));
    
    // Clean up old records
    await database.execute(
      'DELETE FROM $tableName WHERE created_at < ?',
      [cutoffDate.toIso8601String()],
    );
    
    print('Cleaned up records older than $daysOld days from $tableName');
  }
}

Report Generation Jobs

class GenerateReportJob extends ScheduledJob {
  @override
  String get identifier => 'generate_report';

  @override
  Future<void> execute(Map<String, dynamic> parameters) async {
    final reportType = parameters['report_type'] as String;
    final userId = parameters['user_id'] as int?;
    
    // Generate the report
    final reportData = await reportService.generateReport(reportType, userId);
    
    // Save to file or send via email
    final filePath = await reportService.saveReport(reportData, reportType);
    
    if (userId != null) {
      await notificationService.notifyUser(
        userId,
        'Your $reportType report is ready',
        {'download_url': filePath},
      );
    }
  }
}

Error Handling and Retries

Implementing Retry Logic

class ResilientApiCallJob extends ScheduledJob {
  @override
  String get identifier => 'api_call';

  @override
  Future<void> execute(Map<String, dynamic> parameters) async {
    final url = parameters['url'] as String;
    final retryCount = parameters['_retry_count'] as int? ?? 0;
    
    try {
      final response = await httpClient.get(url);
      
      if (response.statusCode != 200) {
        throw Exception('API call failed with status ${response.statusCode}');
      }
      
      // Process successful response
      await processApiResponse(response.body);
      
    } catch (e) {
      // Log the error with context
      print('API call failed (attempt ${retryCount + 1}): $e');
      
      // Re-throw to trigger retry mechanism
      rethrow;
    }
  }
}

Custom Error Handling

class CustomErrorHandlingJob extends ScheduledJob {
  @override
  String get identifier => 'custom_error_job';

  @override
  Future<void> execute(Map<String, dynamic> parameters) async {
    try {
      await riskyOperation(parameters);
    } on SpecificException catch (e) {
      // Handle specific exceptions differently
      await handleSpecificError(e, parameters);
      // Don't rethrow - job completes successfully
    } on AnotherException catch (e) {
      // Log and rethrow to trigger retry
      await logError(e, parameters);
      rethrow;
    }
  }
}

Performance Considerations

Batch Processing

class BatchProcessingJob extends ScheduledJob {
  @override
  String get identifier => 'batch_process';

  @override
  Future<void> execute(Map<String, dynamic> parameters) async {
    final batchSize = parameters['batch_size'] as int? ?? 100;
    final tableName = parameters['table'] as String;
    
    int offset = 0;
    bool hasMore = true;
    
    while (hasMore) {
      final batch = await database.query(
        'SELECT * FROM $tableName LIMIT ? OFFSET ?',
        [batchSize, offset],
      );
      
      if (batch.isEmpty) {
        hasMore = false;
        break;
      }
      
      // Process batch
      await processBatch(batch);
      
      offset += batchSize;
      
      // Small delay to prevent overwhelming the system
      await Future.delayed(Duration(milliseconds: 100));
    }
  }
}

Memory Management

class MemoryEfficientJob extends ScheduledJob {
  @override
  String get identifier => 'memory_efficient';

  @override
  Future<void> execute(Map<String, dynamic> parameters) async {
    // Process data in streams to avoid loading everything into memory
    final inputFile = parameters['input_file'] as String;
    final outputFile = parameters['output_file'] as String;
    
    final inputStream = File(inputFile).openRead();
    final outputSink = File(outputFile).openWrite();
    
    try {
      await for (final chunk in inputStream) {
        final processedChunk = await processChunk(chunk);
        outputSink.add(processedChunk);
      }
    } finally {
      await outputSink.close();
    }
  }
}

Best Practices

1. Keep Jobs Idempotent

Jobs should be safe to run multiple times:

class IdempotentJob extends ScheduledJob {
  @override
  String get identifier => 'idempotent_job';

  @override
  Future<void> execute(Map<String, dynamic> parameters) async {
    final recordId = parameters['record_id'] as int;
    
    // Check if work was already done
    final existing = await database.query(
      'SELECT * FROM processed_records WHERE id = ?',
      [recordId],
    );
    
    if (existing.isNotEmpty) {
      print('Record $recordId already processed, skipping');
      return;
    }
    
    // Do the work
    await processRecord(recordId);
    
    // Mark as processed
    await database.execute(
      'INSERT INTO processed_records (id, processed_at) VALUES (?, ?)',
      [recordId, DateTime.now().toIso8601String()],
    );
  }
}

2. Use Descriptive Job Names and Parameters

// Good: Clear identifier and well-structured parameters
await scheduler.scheduleDelayed(
  'send_password_reset_email',
  Duration(minutes: 2),
  {
    'user_id': userId,
    'reset_token': token,
    'expires_at': expiryTime.toIso8601String(),
  },
  description: 'Send password reset email to user $userId',
);

// Avoid: Generic names and unclear parameters
await scheduler.scheduleDelayed(
  'email_job',
  Duration(minutes: 2),
  {'data': someData},
);

3. Implement Proper Logging

class WellLoggedJob extends ScheduledJob {
  @override
  String get identifier => 'well_logged_job';

  @override
  Future<void> execute(Map<String, dynamic> parameters) async {
    final startTime = DateTime.now();
    final jobId = parameters['job_id'] ?? 'unknown';
    
    print('Starting job $identifier for $jobId at $startTime');
    
    try {
      await doWork(parameters);
      
      final duration = DateTime.now().difference(startTime);
      print('Completed job $identifier for $jobId in ${duration.inMilliseconds}ms');
      
    } catch (e) {
      final duration = DateTime.now().difference(startTime);
      print('Failed job $identifier for $jobId after ${duration.inMilliseconds}ms: $e');
      rethrow;
    }
  }
}

4. Clean Up Old Jobs

Implement periodic cleanup of completed and failed jobs:

class JobCleanupJob extends ScheduledJob {
  @override
  String get identifier => 'job_cleanup';

  @override
  Future<void> execute(Map<String, dynamic> parameters) async {
    final daysToKeep = parameters['days_to_keep'] as int? ?? 30;
    final cutoffDate = DateTime.now().subtract(Duration(days: daysToKeep));
    
    // Clean up completed jobs older than cutoff
    await database.execute(
      'DELETE FROM scheduled_tasks WHERE status = ? AND completed_at < ?',
      ['completed', cutoffDate.toIso8601String()],
    );
    
    // Clean up failed jobs that have exceeded max retries
    await database.execute(
      'DELETE FROM scheduled_tasks WHERE status = ? AND retry_count >= max_retries AND failed_at < ?',
      ['failed', cutoffDate.toIso8601String()],
    );
    
    print('Cleaned up old scheduled tasks');
  }
}

// Schedule the cleanup job to run daily
await scheduler.scheduleDelayed(
  'job_cleanup',
  Duration(days: 1),
  {'days_to_keep': 30},
  description: 'Daily cleanup of old scheduled tasks',
);

Integration with Hypermodern Modules

You can create modules that provide their own scheduled jobs:

// In your module
class EmailModule extends HypermodernModule {
  @override
  Future<void> boot() async {
    await super.boot();
    
    // Register email-related jobs
    final registry = serviceContainer.resolve<JobRegistry>();
    registry.register('send_welcome_email', () => SendWelcomeEmailJob());
    registry.register('send_newsletter', () => NewsletterJob());
    registry.register('cleanup_bounced_emails', () => EmailCleanupJob());
  }
}

Testing Scheduled Jobs

Unit Testing Jobs

import 'package:test/test.dart';

void main() {
  group('SendWelcomeEmailJob', () {
    late SendWelcomeEmailJob job;
    
    setUp(() {
      job = SendWelcomeEmailJob();
    });
    
    test('has correct identifier', () {
      expect(job.identifier, equals('send_welcome_email'));
    });
    
    test('executes successfully with valid parameters', () async {
      final parameters = {
        'email': 'test@example.com',
        'name': 'Test User',
      };
      
      // Mock your email service here
      await expectLater(
        job.execute(parameters),
        completes,
      );
    });
    
    test('throws exception with invalid parameters', () async {
      final parameters = <String, dynamic>{}; // Missing required fields
      
      await expectLater(
        job.execute(parameters),
        throwsA(isA<ArgumentError>()),
      );
    });
  });
}

Integration Testing

void main() {
  group('Job Scheduling Integration', () {
    late JobRegistry registry;
    late JobScheduler scheduler;
    
    setUp(() async {
      registry = JobRegistry();
      scheduler = JobScheduler(registry: registry);
      
      registry.register('test_job', () => TestJob());
    });
    
    tearDown(() {
      scheduler.stop();
    });
    
    test('can schedule and execute job', () async {
      final task = await scheduler.scheduleDelayed(
        'test_job',
        Duration(milliseconds: 100),
        {'message': 'test'},
      );
      
      expect(task.identifier, equals('test_job'));
      expect(task.status, equals(JobStatus.pending));
      
      // Start scheduler and wait for execution
      scheduler.start();
      await Future.delayed(Duration(milliseconds: 200));
      
      // Verify job was executed (you'd check your test job's side effects)
    });
  });
}

The background job scheduling system provides a robust foundation for handling asynchronous work in your Hypermodern applications. By following these patterns and best practices, you can build reliable, scalable background processing systems that enhance your application's capabilities without compromising performance.