Background Job Scheduling
The Hypermodern Platform includes a robust job scheduling system that allows you to execute tasks in the background, schedule work for future execution, and handle long-running processes without blocking your main application.
Overview
The scheduling system provides:
- Persistent Job Storage: Jobs survive server restarts
- Retry Logic: Failed jobs can be automatically retried with configurable delays
- Background Execution: Jobs run without blocking the main application
- Flexible Scheduling: Schedule jobs for specific times or delays
- Status Tracking: Monitor job execution status and history
- Error Handling: Comprehensive error handling and logging
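Putting these capabilities together, the typical flow is: define a job, register it, start the scheduler, and schedule work. The condensed sketch below simply combines the APIs documented in the sections that follow (SendWelcomeEmailJob is the example job defined under Core Components):
import 'package:hypermodern_server/hypermodern_server.dart';
Future<void> bootstrapScheduling() async {
  // 1. Register every job type the application knows how to run.
  final registry = JobRegistry();
  registry.register('send_welcome_email', () => SendWelcomeEmailJob());
  // 2. Create the scheduler and start the background processor.
  final scheduler = JobScheduler(registry: registry);
  scheduler.start();
  // 3. Schedule work for later execution.
  await scheduler.scheduleDelayed(
    'send_welcome_email',
    Duration(minutes: 5),
    {'email': 'user@example.com', 'name': 'John Doe'},
    description: 'Send welcome email to new user',
  );
}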
Core Components
ScheduledJob
The ScheduledJob abstract class is the foundation for all schedulable tasks:
import 'package:hypermodern_server/hypermodern_server.dart';
class SendWelcomeEmailJob extends ScheduledJob {
@override
String get identifier => 'send_welcome_email';
@override
Future<void> execute(Map<String, dynamic> parameters) async {
// Fail fast when required parameters are missing.
final email = parameters['email'] as String?;
final name = parameters['name'] as String?;
if (email == null || name == null) {
  throw ArgumentError('Both "email" and "name" parameters are required');
}
// Your job logic here
await emailService.sendWelcomeEmail(email, name);
print('Welcome email sent to $email');
}
}
JobRegistry
The JobRegistry manages job type registration and creation:
final registry = JobRegistry();
// Register job types
registry.register('send_welcome_email', () => SendWelcomeEmailJob());
registry.register('cleanup_old_data', () => CleanupJob());
registry.register('generate_report', () => ReportGenerationJob());
// Check if a job type is registered
if (registry.isRegistered('send_welcome_email')) {
final job = registry.create('send_welcome_email');
}
JobScheduler
The JobScheduler handles job scheduling and execution:
final scheduler = JobScheduler(
registry: registry,
loggerName: 'MyAppScheduler',
executionInterval: Duration(seconds: 30), // Check for jobs every 30s
maxTasksPerCycle: 100, // Process up to 100 jobs per cycle
);
// Start the background processor
scheduler.start();
Scheduling Jobs
Schedule for Later Execution
// Schedule a job to run in 5 minutes
await scheduler.scheduleDelayed(
'send_welcome_email',
Duration(minutes: 5),
{
'email': 'user@example.com',
'name': 'John Doe',
},
description: 'Send welcome email to new user',
);
Schedule for Specific Time
// Schedule a job for a specific date and time
await scheduler.scheduleAt(
'generate_report',
DateTime(2024, 12, 31, 23, 59), // New Year's Eve report
{
'reportType': 'yearly_summary',
'year': 2024,
},
description: 'Generate yearly summary report',
);
Advanced Scheduling Options
await scheduler.schedule(JobSchedule(
identifier: 'critical_backup',
parameters: {'database': 'production'},
scheduledAt: DateTime.now().add(Duration(hours: 1)),
description: 'Critical database backup',
maxRetries: 5, // Retry up to 5 times if it fails
retryDelay: Duration(minutes: 10), // Wait 10 minutes between retries
));
Database Integration
The scheduling system requires a database table to store job information. Run the included migration:
import 'package:hypermodern_server/src/database/migrations/create_scheduled_tasks_table.dart';
// In your migration runner
await CreateScheduledTasksTable().up();
The migration creates a scheduled_tasks table with the following structure:
CREATE TABLE scheduled_tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
identifier VARCHAR(255) NOT NULL,
parameters TEXT, -- JSON encoded
scheduled_at DATETIME NOT NULL,
status VARCHAR(50) DEFAULT 'pending',
description TEXT,
max_retries INTEGER DEFAULT 3,
retry_count INTEGER DEFAULT 0,
retry_delay INTEGER, -- milliseconds
executed_at DATETIME,
failed_at DATETIME,
error_message TEXT,
created_at DATETIME,
updated_at DATETIME
);
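Because jobs live in an ordinary table, you can also inspect them directly while debugging. The query below is a sketch only; it assumes the same database.query(sql, parameters) helper with positional ? placeholders used in the job examples later in this guide, and that rows come back as maps:
// Sketch: list the 20 most recently failed jobs and their error messages.
final failedJobs = await database.query(
  'SELECT id, identifier, error_message, retry_count, failed_at '
  'FROM scheduled_tasks WHERE status = ? ORDER BY failed_at DESC LIMIT ?',
  ['failed', 20],
);
for (final row in failedJobs) {
  print('${row['identifier']} (#${row['id']}): ${row['error_message']}');
}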
Job Lifecycle
Jobs progress through these states:
- pending: Job is scheduled and waiting to run
- running: Job is currently executing
- completed: Job finished successfully
- failed: Job failed (may be retried)
- cancelled: Job was cancelled before execution
Monitoring Job Status
// Get scheduler statistics
final stats = await scheduler.getStatistics();
print('Pending jobs: ${stats['pending']}');
print('Failed jobs: ${stats['failed']}');
print('Completed jobs: ${stats['completed']}');
// Cancel a specific job
await scheduler.cancel(taskId);
Service Provider Integration
Use the SchedulingServiceProvider for easy integration with your application:
import 'package:hypermodern_server/hypermodern_server.dart';
void main() async {
final container = IoCContainer();
// Register the scheduling service provider
final schedulingProvider = SchedulingServiceProvider();
schedulingProvider.registerServices(container);
schedulingProvider.bootServices(container);
// Get the registry and register your jobs
final jobRegistry = container.resolve<JobRegistry>();
jobRegistry.register('send_welcome_email', () => SendWelcomeEmailJob());
jobRegistry.register('cleanup_old_data', () => CleanupJob());
// The scheduler is automatically started
final scheduler = container.resolve<JobScheduler>();
// Your application code...
}
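On shutdown, stop the scheduler so it does not begin a new execution cycle while the process is exiting; thanks to persistent storage, any still-pending jobs simply wait for the next start. The hook below is a sketch using dart:io signal handling; where you call scheduler.stop() will depend on how your application is hosted:
import 'dart:io';

// Sketch: stop the background processor on Ctrl+C before exiting.
void installShutdownHook(JobScheduler scheduler) {
  ProcessSignal.sigint.watch().listen((_) {
    scheduler.stop(); // Pending jobs stay in scheduled_tasks for the next run.
    exit(0);
  });
}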
Common Job Patterns
Email Jobs
class SendEmailJob extends ScheduledJob {
@override
String get identifier => 'send_email';
@override
Future<void> execute(Map<String, dynamic> parameters) async {
final to = parameters['to'] as String;
final subject = parameters['subject'] as String;
final body = parameters['body'] as String;
final template = parameters['template'] as String?;
if (template != null) {
await emailService.sendTemplatedEmail(to, subject, template, parameters);
} else {
await emailService.sendEmail(to, subject, body);
}
}
}
Data Cleanup Jobs
class CleanupOldDataJob extends ScheduledJob {
@override
String get identifier => 'cleanup_old_data';
@override
Future<void> execute(Map<String, dynamic> parameters) async {
final tableName = parameters['table'] as String;
final daysOld = parameters['days_old'] as int? ?? 30;
final cutoffDate = DateTime.now().subtract(Duration(days: daysOld));
// Clean up old records
await database.execute(
'DELETE FROM $tableName WHERE created_at < ?',
[cutoffDate.toIso8601String()],
);
print('Cleaned up records older than $daysOld days from $tableName');
}
}
Report Generation Jobs
class GenerateReportJob extends ScheduledJob {
@override
String get identifier => 'generate_report';
@override
Future<void> execute(Map<String, dynamic> parameters) async {
final reportType = parameters['report_type'] as String;
final userId = parameters['user_id'] as int?;
// Generate the report
final reportData = await reportService.generateReport(reportType, userId);
// Save to file or send via email
final filePath = await reportService.saveReport(reportData, reportType);
if (userId != null) {
await notificationService.notifyUser(
userId,
'Your $reportType report is ready',
{'download_url': filePath},
);
}
}
}
Error Handling and Retries
Implementing Retry Logic
class ResilientApiCallJob extends ScheduledJob {
@override
String get identifier => 'api_call';
@override
Future<void> execute(Map<String, dynamic> parameters) async {
final url = parameters['url'] as String;
final retryCount = parameters['_retry_count'] as int? ?? 0;
try {
final response = await httpClient.get(url);
if (response.statusCode != 200) {
throw Exception('API call failed with status ${response.statusCode}');
}
// Process successful response
await processApiResponse(response.body);
} catch (e) {
// Log the error with context
print('API call failed (attempt ${retryCount + 1}): $e');
// Re-throw to trigger retry mechanism
rethrow;
}
}
}
Custom Error Handling
class CustomErrorHandlingJob extends ScheduledJob {
@override
String get identifier => 'custom_error_job';
@override
Future<void> execute(Map<String, dynamic> parameters) async {
try {
await riskyOperation(parameters);
} on SpecificException catch (e) {
// Handle specific exceptions differently
await handleSpecificError(e, parameters);
// Don't rethrow - job completes successfully
} on AnotherException catch (e) {
// Log and rethrow to trigger retry
await logError(e, parameters);
rethrow;
}
}
}
Performance Considerations
Batch Processing
class BatchProcessingJob extends ScheduledJob {
@override
String get identifier => 'batch_process';
@override
Future<void> execute(Map<String, dynamic> parameters) async {
final batchSize = parameters['batch_size'] as int? ?? 100;
final tableName = parameters['table'] as String;
int offset = 0;
bool hasMore = true;
while (hasMore) {
final batch = await database.query(
'SELECT * FROM $tableName LIMIT ? OFFSET ?',
[batchSize, offset],
);
if (batch.isEmpty) {
hasMore = false;
break;
}
// Process batch
await processBatch(batch);
offset += batchSize;
// Small delay to prevent overwhelming the system
await Future.delayed(Duration(milliseconds: 100));
}
}
}
Memory Management
import 'dart:io';
class MemoryEfficientJob extends ScheduledJob {
@override
String get identifier => 'memory_efficient';
@override
Future<void> execute(Map<String, dynamic> parameters) async {
// Process data in streams to avoid loading everything into memory
final inputFile = parameters['input_file'] as String;
final outputFile = parameters['output_file'] as String;
final inputStream = File(inputFile).openRead();
final outputSink = File(outputFile).openWrite();
try {
await for (final chunk in inputStream) {
final processedChunk = await processChunk(chunk);
outputSink.add(processedChunk);
}
} finally {
await outputSink.close();
}
}
}
Best Practices
1. Keep Jobs Idempotent
Jobs should be safe to run multiple times:
class IdempotentJob extends ScheduledJob {
@override
String get identifier => 'idempotent_job';
@override
Future<void> execute(Map<String, dynamic> parameters) async {
final recordId = parameters['record_id'] as int;
// Check if work was already done
final existing = await database.query(
'SELECT * FROM processed_records WHERE id = ?',
[recordId],
);
if (existing.isNotEmpty) {
print('Record $recordId already processed, skipping');
return;
}
// Do the work
await processRecord(recordId);
// Mark as processed
await database.execute(
'INSERT INTO processed_records (id, processed_at) VALUES (?, ?)',
[recordId, DateTime.now().toIso8601String()],
);
}
}
2. Use Descriptive Job Names and Parameters
// Good: Clear identifier and well-structured parameters
await scheduler.scheduleDelayed(
'send_password_reset_email',
Duration(minutes: 2),
{
'user_id': userId,
'reset_token': token,
'expires_at': expiryTime.toIso8601String(),
},
description: 'Send password reset email to user $userId',
);
// Avoid: Generic names and unclear parameters
await scheduler.scheduleDelayed(
'email_job',
Duration(minutes: 2),
{'data': someData},
);
3. Implement Proper Logging
class WellLoggedJob extends ScheduledJob {
@override
String get identifier => 'well_logged_job';
@override
Future<void> execute(Map<String, dynamic> parameters) async {
final startTime = DateTime.now();
final jobId = parameters['job_id'] ?? 'unknown';
print('Starting job $identifier for $jobId at $startTime');
try {
await doWork(parameters);
final duration = DateTime.now().difference(startTime);
print('Completed job $identifier for $jobId in ${duration.inMilliseconds}ms');
} catch (e) {
final duration = DateTime.now().difference(startTime);
print('Failed job $identifier for $jobId after ${duration.inMilliseconds}ms: $e');
rethrow;
}
}
}
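The print calls above keep the example short, but in production you will usually want log levels and stack traces. The variant below is a sketch using the standard package:logging package (whether the scheduler's own loggerName option is backed by package:logging is an assumption; doWork is the same placeholder as above):
import 'package:logging/logging.dart';

final _log = Logger('jobs.well_logged_job');

class WellLoggedJobWithLogger extends ScheduledJob {
  @override
  String get identifier => 'well_logged_job';

  @override
  Future<void> execute(Map<String, dynamic> parameters) async {
    // Attach a handler during startup, e.g. Logger.root.onRecord.listen(print).
    final stopwatch = Stopwatch()..start();
    _log.info('Starting $identifier with parameters: $parameters');
    try {
      await doWork(parameters);
      _log.info('Completed $identifier in ${stopwatch.elapsedMilliseconds}ms');
    } catch (e, stackTrace) {
      _log.severe(
        'Failed $identifier after ${stopwatch.elapsedMilliseconds}ms',
        e,
        stackTrace,
      );
      rethrow;
    }
  }
}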
4. Clean Up Old Jobs
Implement periodic cleanup of completed and failed jobs:
class JobCleanupJob extends ScheduledJob {
@override
String get identifier => 'job_cleanup';
@override
Future<void> execute(Map<String, dynamic> parameters) async {
final daysToKeep = parameters['days_to_keep'] as int? ?? 30;
final cutoffDate = DateTime.now().subtract(Duration(days: daysToKeep));
// Clean up completed jobs older than cutoff
await database.execute(
'DELETE FROM scheduled_tasks WHERE status = ? AND executed_at < ?',
['completed', cutoffDate.toIso8601String()],
);
// Clean up failed jobs that have exceeded max retries
await database.execute(
'DELETE FROM scheduled_tasks WHERE status = ? AND retry_count >= max_retries AND failed_at < ?',
['failed', cutoffDate.toIso8601String()],
);
print('Cleaned up old scheduled tasks');
}
}
// Schedule the first cleanup run for 24 hours from now (scheduleDelayed registers a single run)
await scheduler.scheduleDelayed(
'job_cleanup',
Duration(days: 1),
{'days_to_keep': 30},
description: 'Daily cleanup of old scheduled tasks',
);
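Because scheduleDelayed registers only a single run, one way to make the cleanup recur is to have the job reschedule itself at the end of a successful execution. The sketch below assumes you are willing to hand the job a JobScheduler reference (for example from your IoC container) when you register it:
class RecurringJobCleanupJob extends ScheduledJob {
  RecurringJobCleanupJob(this.scheduler);

  final JobScheduler scheduler;

  @override
  String get identifier => 'job_cleanup';

  @override
  Future<void> execute(Map<String, dynamic> parameters) async {
    // ...same cleanup logic as JobCleanupJob above...

    // Schedule the next run one day from now.
    await scheduler.scheduleDelayed(
      identifier,
      Duration(days: 1),
      parameters,
      description: 'Daily cleanup of old scheduled tasks',
    );
  }
}
Register it with a factory that captures the scheduler, for example: registry.register('job_cleanup', () => RecurringJobCleanupJob(scheduler));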
Integration with Hypermodern Modules
You can create modules that provide their own scheduled jobs:
// In your module
class EmailModule extends HypermodernModule {
@override
Future<void> boot() async {
await super.boot();
// Register email-related jobs
final registry = serviceContainer.resolve<JobRegistry>();
registry.register('send_welcome_email', () => SendWelcomeEmailJob());
registry.register('send_newsletter', () => NewsletterJob());
registry.register('cleanup_bounced_emails', () => EmailCleanupJob());
}
}
Testing Scheduled Jobs
Unit Testing Jobs
import 'package:test/test.dart';
void main() {
group('SendWelcomeEmailJob', () {
late SendWelcomeEmailJob job;
setUp(() {
job = SendWelcomeEmailJob();
});
test('has correct identifier', () {
expect(job.identifier, equals('send_welcome_email'));
});
test('executes successfully with valid parameters', () async {
final parameters = {
'email': 'test@example.com',
'name': 'Test User',
};
// Mock your email service here
await expectLater(
job.execute(parameters),
completes,
);
});
test('throws exception with invalid parameters', () async {
final parameters = <String, dynamic>{}; // Missing required fields
await expectLater(
job.execute(parameters),
throwsA(isA<ArgumentError>()),
);
});
});
}
Integration Testing
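The integration test below registers a TestJob that is not defined elsewhere in this guide. A minimal version that records its invocations, so assertions can verify the job actually ran, might look like this:
class TestJob extends ScheduledJob {
  // Collects the parameters of every execution so tests can assert on them.
  static final executions = <Map<String, dynamic>>[];

  @override
  String get identifier => 'test_job';

  @override
  Future<void> execute(Map<String, dynamic> parameters) async {
    executions.add(parameters);
  }
}
With that in place, the integration test itself: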
void main() {
group('Job Scheduling Integration', () {
late JobRegistry registry;
late JobScheduler scheduler;
setUp(() async {
registry = JobRegistry();
scheduler = JobScheduler(registry: registry);
registry.register('test_job', () => TestJob());
});
tearDown(() {
scheduler.stop();
});
test('can schedule and execute job', () async {
final task = await scheduler.scheduleDelayed(
'test_job',
Duration(milliseconds: 100),
{'message': 'test'},
);
expect(task.identifier, equals('test_job'));
expect(task.status, equals(JobStatus.pending));
// Start scheduler and wait for execution
scheduler.start();
await Future.delayed(Duration(milliseconds: 200));
// Verify job was executed (you'd check your test job's side effects)
});
});
}
The background job scheduling system provides a robust foundation for handling asynchronous work in your Hypermodern applications. By following these patterns and best practices, you can build reliable, scalable background processing systems that enhance your application's capabilities without compromising performance.