Background Job Scheduling
The Hypermodern Platform includes a job scheduling system for running tasks in the background, scheduling work for later execution, and handling long-running processes without blocking your main application.
Overview
The scheduling system provides:
- Persistent Job Storage: Jobs survive server restarts
- Retry Logic: Failed jobs can be automatically retried with configurable delays
- Background Execution: Jobs run without blocking the main application
- Flexible Scheduling: Schedule jobs for specific times or delays
- Status Tracking: Monitor job execution status and history
- Error Handling: Comprehensive error handling and logging
Core Components
ScheduledJob
The ScheduledJob abstract class is the foundation for all schedulable tasks:
import 'package:hypermodern_server/hypermodern_server.dart';
class SendWelcomeEmailJob extends ScheduledJob {
@override
String get identifier => 'send_welcome_email';
@override
Future<void> execute(Map<String, dynamic> parameters) async {
final email = parameters['email'] as String;
final name = parameters['name'] as String;
// Your job logic here
await emailService.sendWelcomeEmail(email, name);
print('Welcome email sent to $email');
}
}
JobRegistry
The JobRegistry manages job type registration and creation:
final registry = JobRegistry();
// Register job types
registry.register('send_welcome_email', () => SendWelcomeEmailJob());
registry.register('cleanup_old_data', () => CleanupOldDataJob());
registry.register('generate_report', () => GenerateReportJob());
// Check if a job type is registered
if (registry.isRegistered('send_welcome_email')) {
final job = registry.create('send_welcome_email');
}
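To make the register/create flow concrete, here is a minimal sketch of a factory registry. It is not the hypermodern_server implementation: SketchJob, SketchRegistry, and NoopJob are hypothetical stand-ins, and throwing ArgumentError for an unknown identifier is an assumption about reasonable behavior.

```dart
// Hypothetical stand-ins for illustration; not the hypermodern_server classes.
abstract class SketchJob {
  String get identifier;
  Future<void> execute(Map<String, dynamic> parameters);
}

typedef SketchJobFactory = SketchJob Function();

class SketchRegistry {
  final Map<String, SketchJobFactory> _factories = {};

  /// Stores a factory under [identifier], replacing any earlier one.
  void register(String identifier, SketchJobFactory factory) {
    _factories[identifier] = factory;
  }

  bool isRegistered(String identifier) => _factories.containsKey(identifier);

  /// Builds a fresh job instance, or throws if the type is unknown.
  SketchJob create(String identifier) {
    final factory = _factories[identifier];
    if (factory == null) {
      throw ArgumentError.value(identifier, 'identifier', 'No job registered');
    }
    return factory();
  }
}

/// A job that does nothing, used only to exercise the registry.
class NoopJob implements SketchJob {
  @override
  String get identifier => 'noop';

  @override
  Future<void> execute(Map<String, dynamic> parameters) async {}
}
```

Registering factories rather than instances means every scheduled run gets its own job object, so state left over from one run cannot leak into the next.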
JobScheduler
The JobScheduler handles job scheduling and execution:
final scheduler = JobScheduler(
registry: registry,
loggerName: 'MyAppScheduler',
executionInterval: Duration(seconds: 30), // Check for jobs every 30s
maxTasksPerCycle: 100, // Process up to 100 jobs per cycle
);
// Start the background processor
scheduler.start();
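The two tuning options above, executionInterval and maxTasksPerCycle, suggest a polling loop that wakes up periodically and handles a bounded number of due tasks. The following is a rough sketch of that idea under stated assumptions: PendingTask, selectDueTasks, and startPolling are hypothetical names, and the real scheduler may select and order tasks differently.

```dart
import 'dart:async';

/// Hypothetical in-memory task record; the real scheduler reads from the database.
class PendingTask {
  PendingTask(this.id, this.scheduledAt);
  final int id;
  final DateTime scheduledAt;
}

/// Returns the tasks due at [now], oldest first, capped at [maxTasksPerCycle].
List<PendingTask> selectDueTasks(
  List<PendingTask> pending,
  DateTime now, {
  int maxTasksPerCycle = 100,
}) {
  final due = pending.where((t) => !t.scheduledAt.isAfter(now)).toList()
    ..sort((a, b) => a.scheduledAt.compareTo(b.scheduledAt));
  return due.take(maxTasksPerCycle).toList();
}

/// Calls [onCycle] every [executionInterval] until the returned timer is cancelled.
Timer startPolling(Duration executionInterval, void Function() onCycle) {
  return Timer.periodic(executionInterval, (_) => onCycle());
}
```

With this model a job can start up to one executionInterval later than its scheduled time, and a backlog larger than maxTasksPerCycle is drained over several cycles.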
Scheduling Jobs
Schedule for Later Execution
// Schedule a job to run in 5 minutes
await scheduler.scheduleDelayed(
'send_welcome_email',
Duration(minutes: 5),
{
'email': 'user@example.com',
'name': 'John Doe',
},
description: 'Send welcome email to new user',
);
Schedule for Specific Time
// Schedule a job for a specific date and time
await scheduler.scheduleAt(
'generate_report',
DateTime(2024, 12, 31, 23, 59), // New Year's Eve report
{
'report_type': 'yearly_summary',
'year': 2024,
},
description: 'Generate yearly summary report',
);
Advanced Scheduling Options
await scheduler.schedule(JobSchedule(
identifier: 'critical_backup',
parameters: {'database': 'production'},
scheduledAt: DateTime.now().add(Duration(hours: 1)),
description: 'Critical database backup',
maxRetries: 5, // Retry up to 5 times if it fails
retryDelay: Duration(minutes: 10), // Wait 10 minutes between retries
));
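How maxRetries and retryDelay might combine can be sketched as a small decision function. This is an illustration only: nextRetryAt is a hypothetical helper, and it assumes a fixed delay between attempts and that maxRetries counts retries after the first attempt, which the scheduler itself may define differently.

```dart
/// Returns when a failed task should run again, or null once retries are
/// exhausted. [retryCount] is the number of retries already made.
/// Hypothetical helper; assumes a fixed delay between attempts.
DateTime? nextRetryAt({
  required int retryCount,
  required int maxRetries,
  required Duration retryDelay,
  required DateTime failedAt,
}) {
  if (retryCount >= maxRetries) return null;
  return failedAt.add(retryDelay);
}
```

Under these assumptions, the critical_backup example above would be attempted at most six times (one initial run plus five retries), with ten minutes between a failure and the next attempt.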
Database Integration
The scheduling system requires a database table to store job information. Run the included migration:
import 'package:hypermodern_server/src/database/migrations/create_scheduled_tasks_table.dart';
// In your migration runner
await CreateScheduledTasksTable().up();
The migration creates a scheduled_tasks table with the following structure:
CREATE TABLE scheduled_tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
identifier VARCHAR(255) NOT NULL,
parameters TEXT, -- JSON encoded
scheduled_at DATETIME NOT NULL,
status VARCHAR(50) DEFAULT 'pending',
description TEXT,
max_retries INTEGER DEFAULT 3,
retry_count INTEGER DEFAULT 0,
retry_delay INTEGER, -- milliseconds
executed_at DATETIME,
failed_at DATETIME,
error_message TEXT,
created_at DATETIME,
updated_at DATETIME
);
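The column comments note that parameters are JSON encoded and retry_delay is stored in milliseconds. A sketch of how a schedule could be mapped onto such a row follows; toScheduledTaskRow is a hypothetical helper, and storing timestamps as UTC ISO 8601 strings is an assumption rather than something the migration guarantees.

```dart
import 'dart:convert';

/// Maps schedule fields onto scheduled_tasks columns.
/// Hypothetical helper; UTC ISO 8601 timestamps are an assumption.
Map<String, Object?> toScheduledTaskRow({
  required String identifier,
  required Map<String, dynamic> parameters,
  required DateTime scheduledAt,
  String? description,
  int maxRetries = 3,
  Duration? retryDelay,
}) {
  final now = DateTime.now().toUtc().toIso8601String();
  return {
    'identifier': identifier,
    'parameters': jsonEncode(parameters), // TEXT column, JSON encoded
    'scheduled_at': scheduledAt.toUtc().toIso8601String(),
    'status': 'pending',
    'description': description,
    'max_retries': maxRetries,
    'retry_count': 0,
    'retry_delay': retryDelay?.inMilliseconds, // INTEGER column, milliseconds
    'created_at': now,
    'updated_at': now,
  };
}
```

Because jsonEncode rejects values such as DateTime, job parameters are best limited to strings, numbers, booleans, lists, and maps; convert dates with toIso8601String() before scheduling.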
Job Lifecycle
Jobs progress through these states:
- pending: Job is scheduled and waiting to run
- running: Job is currently executing
- completed: Job finished successfully
- failed: Job failed (may be retried)
- cancelled: Job was cancelled before execution
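The states above can be read as a small state machine. The sketch below encodes one plausible set of transitions inferred from the descriptions: SketchJobStatus, allowedTransitions, and canTransition are hypothetical names, and the rule that a failed job returns to pending for a retry is an assumption about how retries are recorded.

```dart
/// Hypothetical mirror of the lifecycle states listed above.
enum SketchJobStatus { pending, running, completed, failed, cancelled }

/// Transitions inferred from the state descriptions (an assumption).
const Map<SketchJobStatus, Set<SketchJobStatus>> allowedTransitions = {
  SketchJobStatus.pending: {SketchJobStatus.running, SketchJobStatus.cancelled},
  SketchJobStatus.running: {SketchJobStatus.completed, SketchJobStatus.failed},
  SketchJobStatus.failed: {SketchJobStatus.pending}, // re-queued for a retry
  SketchJobStatus.completed: <SketchJobStatus>{}, // terminal
  SketchJobStatus.cancelled: <SketchJobStatus>{}, // terminal
};

/// Reports whether a job may move from [from] to [to].
bool canTransition(SketchJobStatus from, SketchJobStatus to) =>
    allowedTransitions[from]?.contains(to) ?? false;
```

In this model only pending jobs can be cancelled, which matches the description of cancelled as "cancelled before execution".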
Monitoring Job Status
// Get scheduler statistics
final stats = await scheduler.getStatistics();
print('Pending jobs: ${stats['pending']}');
print('Failed jobs: ${stats['failed']}');
print('Completed jobs: ${stats['completed']}');
// Cancel a specific job
await scheduler.cancel(taskId);
Service Provider Integration
Use the SchedulingServiceProvider for easy integration with your application:
import 'package:hypermodern_server/hypermodern_server.dart';
void main() async {
final container = IoCContainer();
// Register the scheduling service provider
final schedulingProvider = SchedulingServiceProvider();
schedulingProvider.registerServices(container);
schedulingProvider.bootServices(container);
// Get the registry and register your jobs
final jobRegistry = container.resolve<JobRegistry>();
jobRegistry.register('send_welcome_email', () => SendWelcomeEmailJob());
jobRegistry.register('cleanup_old_data', () => CleanupOldDataJob());
// The scheduler is automatically started
final scheduler = container.resolve<JobScheduler>();
// Your application code...
}
Common Job Patterns
Email Jobs
class SendEmailJob extends ScheduledJob {
@override
String get identifier => 'send_email';
@override
Future<void> execute(Map<String, dynamic> parameters) async {
final to = parameters['to'] as String;
final subject = parameters['subject'] as String;
final body = parameters['body'] as String;
final template = parameters['template'] as String?;
if (template != null) {
await emailService.sendTemplatedEmail(to, subject, template, parameters);
} else {
await emailService.sendEmail(to, subject, body);
}
}
}
Data Cleanup Jobs
class CleanupOldDataJob extends ScheduledJob {
@override
String get identifier => 'cleanup_old_data';
@override
Future<void> execute(Map<String, dynamic> parameters) async {
final tableName = parameters['table'] as String;
final daysOld = parameters['days_old'] as int? ?? 30;
final cutoffDate = DateTime.now().subtract(Duration(days: daysOld));
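// Clean up old records. tableName is interpolated into the SQL text, so pass
// only trusted values (for example, check it against an allowlist first)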
await database.execute(
'DELETE FROM $tableName WHERE created_at < ?',
[cutoffDate.toIso8601String()],
);
print('Cleaned up records older than $daysOld days from $tableName');
}
}
Report Generation Jobs
class GenerateReportJob extends ScheduledJob {
@override
String get identifier => 'generate_report';
@override
Future<void> execute(Map<String, dynamic> parameters) async {
final reportType = parameters['report_type'] as String;
final userId = parameters['user_id'] as int?;
// Generate the report
final reportData = await reportService.generateReport(reportType, userId);
// Save to file or send via email
final filePath = await reportService.saveReport(reportData, reportType);
if (userId != null) {
await notificationService.notifyUser(
userId,
'Your $reportType report is ready',
{'download_url': filePath},
);
}
}
}
Error Handling and Retries
Implementing Retry Logic
class ResilientApiCallJob extends ScheduledJob {
@override
String get identifier => 'api_call';
@override
Future<void> execute(Map<String, dynamic> parameters) async {
final url = parameters['url'] as String;
final retryCount = parameters['_retry_count'] as int? ?? 0;
try {
final response = await httpClient.get(url);
if (response.statusCode != 200) {
throw Exception('API call failed with status ${response.statusCode}');
}
// Process successful response
await processApiResponse(response.body);
} catch (e) {
// Log the error with context
print('API call failed (attempt ${retryCount + 1}): $e');
// Re-throw to trigger retry mechanism
rethrow;
}
}
}
Custom Error Handling
class CustomErrorHandlingJob extends ScheduledJob {
@override
String get identifier => 'custom_error_job';
@override
Future<void> execute(Map<String, dynamic> parameters) async {
try {
await riskyOperation(parameters);
} on SpecificException catch (e) {
// Handle specific exceptions differently
await handleSpecificError(e, parameters);
// Don't rethrow - job completes successfully
} on AnotherException catch (e) {
// Log and rethrow to trigger retry
await logError(e, parameters);
rethrow;
}
}
}
Performance Considerations
Batch Processing
class BatchProcessingJob extends ScheduledJob {
@override
String get identifier => 'batch_process';
@override
Future<void> execute(Map<String, dynamic> parameters) async {
final batchSize = parameters['batch_size'] as int? ?? 100;
final tableName = parameters['table'] as String;
int offset = 0;
bool hasMore = true;
while (hasMore) {
final batch = await database.query(
// ORDER BY keeps pages stable between queries (assumes the table has an id column)
'SELECT * FROM $tableName ORDER BY id LIMIT ? OFFSET ?',
[batchSize, offset],
);
if (batch.isEmpty) {
hasMore = false;
break;
}
// Process batch
await processBatch(batch);
offset += batchSize;
// Small delay to prevent overwhelming the system
await Future.delayed(Duration(milliseconds: 100));
}
}
}
Memory Management
import 'dart:io'; // Provides File
class MemoryEfficientJob extends ScheduledJob {
@override
String get identifier => 'memory_efficient';
@override
Future<void> execute(Map<String, dynamic> parameters) async {
// Process data in streams to avoid loading everything into memory
final inputFile = parameters['input_file'] as String;
final outputFile = parameters['output_file'] as String;
final inputStream = File(inputFile).openRead();
final outputSink = File(outputFile).openWrite();
try {
await for (final chunk in inputStream) {
final processedChunk = await processChunk(chunk);
outputSink.add(processedChunk);
}
} finally {
await outputSink.close();
}
}
}
Best Practices
1. Keep Jobs Idempotent
Jobs should be safe to run multiple times:
class IdempotentJob extends ScheduledJob {
@override
String get identifier => 'idempotent_job';
@override
Future<void> execute(Map<String, dynamic> parameters) async {
final recordId = parameters['record_id'] as int;
// Check if work was already done
final existing = await database.query(
'SELECT * FROM processed_records WHERE id = ?',
[recordId],
);
if (existing.isNotEmpty) {
print('Record $recordId already processed, skipping');
return;
}
// Do the work
await processRecord(recordId);
// Mark as processed
await database.execute(
'INSERT INTO processed_records (id, processed_at) VALUES (?, ?)',
[recordId, DateTime.now().toIso8601String()],
);
}
}
2. Use Descriptive Job Names and Parameters
// Good: Clear identifier and well-structured parameters
await scheduler.scheduleDelayed(
'send_password_reset_email',
Duration(minutes: 2),
{
'user_id': userId,
'reset_token': token,
'expires_at': expiryTime.toIso8601String(),
},
description: 'Send password reset email to user $userId',
);
// Avoid: Generic names and unclear parameters
await scheduler.scheduleDelayed(
'email_job',
Duration(minutes: 2),
{'data': someData},
);
3. Implement Proper Logging
class WellLoggedJob extends ScheduledJob {
@override
String get identifier => 'well_logged_job';
@override
Future<void> execute(Map<String, dynamic> parameters) async {
final startTime = DateTime.now();
final jobId = parameters['job_id'] ?? 'unknown';
print('Starting job $identifier for $jobId at $startTime');
try {
await doWork(parameters);
final duration = DateTime.now().difference(startTime);
print('Completed job $identifier for $jobId in ${duration.inMilliseconds}ms');
} catch (e) {
final duration = DateTime.now().difference(startTime);
print('Failed job $identifier for $jobId after ${duration.inMilliseconds}ms: $e');
rethrow;
}
}
}
4. Clean Up Old Jobs
Implement periodic cleanup of completed and failed jobs:
class JobCleanupJob extends ScheduledJob {
@override
String get identifier => 'job_cleanup';
@override
Future<void> execute(Map<String, dynamic> parameters) async {
final daysToKeep = parameters['days_to_keep'] as int? ?? 30;
final cutoffDate = DateTime.now().subtract(Duration(days: daysToKeep));
// Clean up completed jobs older than cutoff
await database.execute(
'DELETE FROM scheduled_tasks WHERE status = ? AND executed_at < ?',
['completed', cutoffDate.toIso8601String()],
);
// Clean up failed jobs that have exceeded max retries
await database.execute(
'DELETE FROM scheduled_tasks WHERE status = ? AND retry_count >= max_retries AND failed_at < ?',
['failed', cutoffDate.toIso8601String()],
);
print('Cleaned up old scheduled tasks');
}
}
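// Schedule the next cleanup run for one day from now. scheduleDelayed queues a
// single run, so schedule it again after each run to repeat the cleanup daily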
await scheduler.scheduleDelayed(
'job_cleanup',
Duration(days: 1),
{'days_to_keep': 30},
description: 'Daily cleanup of old scheduled tasks',
);
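If each scheduled task runs only once (an assumption worth checking against the scheduler API), a recurring cleanup has to queue its own next run. One way to pick that time is to compute the next occurrence of a fixed hour. nextDailyRun is a hypothetical helper, and working in UTC is a deliberate simplification that sidesteps daylight saving shifts.

```dart
/// Returns the next occurrence of [hour]:[minute] UTC strictly after [now].
/// Hypothetical helper; uses UTC to avoid daylight saving surprises.
DateTime nextDailyRun(DateTime now, {int hour = 3, int minute = 0}) {
  final utcNow = now.toUtc();
  final today =
      DateTime.utc(utcNow.year, utcNow.month, utcNow.day, hour, minute);
  return today.isAfter(utcNow) ? today : today.add(const Duration(days: 1));
}
```

The result can be passed to scheduleAt, as in the "Schedule for Specific Time" example. Scheduling at a fixed time of day avoids the drift that builds up when each run is queued 24 hours after the previous one finishes.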
CLI Code Generation
The Hypermodern CLI can generate scheduled jobs for you, so each new job starts with a consistent structure, a test file, and a registration helper.
Generating Jobs with the CLI
# Generate a basic job
hypermodern generate:job send_welcome_email
# Generate with description and parameters
hypermodern generate:job send_welcome_email \
--description "Send welcome email to new users" \
--parameters "user_id,email,name"
# Generate using a template
hypermodern generate:job newsletter_sender \
--template email \
--description "Send newsletter to subscribers"
# List available templates
hypermodern generate:job --list-templates
Available Job Templates
The CLI includes several pre-built templates for common job patterns:
- basic - Simple job template with minimal structure
- email - Email sending job with template support
- cleanup - Data cleanup job with configurable retention
- report - Report generation job with file output
- api - External API call job with retry logic
- batch - Batch processing job for large datasets
- notification - Push notification job with multiple providers
Generated Files
When you generate a job, the CLI creates:
- Job class - The main job implementation
- Test file - Unit tests for the job
- Registry helper - Helper for registering jobs
# Example output structure
lib/src/jobs/
├── send_welcome_email_job.dart # Job implementation
└── job_registry.dart # Registration helper
test/jobs/
└── send_welcome_email_job_test.dart # Unit tests
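The example output implies a naming convention: the snake_case job name becomes a PascalCase class with a Job suffix and a matching file name. A sketch of that mapping, inferred only from the examples on this page, is shown here. jobClassName and jobFileName are hypothetical helpers, not the CLI's actual code.

```dart
/// Converts a snake_case job name to a class name,
/// e.g. 'send_welcome_email' becomes 'SendWelcomeEmailJob'.
/// Hypothetical helper inferred from the example output above.
String jobClassName(String identifier) {
  final words = identifier.split('_').where((w) => w.isNotEmpty);
  final pascal =
      words.map((w) => w[0].toUpperCase() + w.substring(1)).join();
  return '${pascal}Job';
}

/// Returns the file name used for the job implementation.
String jobFileName(String identifier) => '${identifier}_job.dart';
```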
Example Generated Job
// Generated job class
import 'package:hypermodern_server/hypermodern_server.dart';
/// Send welcome email to new users
class SendWelcomeEmailJob extends ScheduledJob {
@override
String get identifier => 'send_welcome_email';
@override
Future<void> execute(Map<String, dynamic> parameters) async {
final to = parameters['to'] as String;
final subject = parameters['subject'] as String;
final body = parameters['body'] as String?;
final template = parameters['template'] as String?;
// Implementation generated based on template
if (template != null) {
await _sendTemplatedEmail(to, subject, template, parameters);
} else if (body != null) {
await _sendPlainEmail(to, subject, body);
}
}
// Helper methods included in template
Future<void> _sendTemplatedEmail(/* ... */) async { /* ... */ }
Future<void> _sendPlainEmail(/* ... */) async { /* ... */ }
}
Custom Templates
You can extend the CLI with custom job templates by modifying the JobTemplates class or creating your own generator plugins.
Integration with Hypermodern Modules
You can create modules that provide their own scheduled jobs:
// In your module
class EmailModule extends HypermodernModule {
@override
Future<void> boot() async {
await super.boot();
// Register email-related jobs
final registry = serviceContainer.resolve<JobRegistry>();
registry.register('send_welcome_email', () => SendWelcomeEmailJob());
registry.register('send_newsletter', () => NewsletterJob());
registry.register('cleanup_bounced_emails', () => EmailCleanupJob());
}
}
Testing Scheduled Jobs
Unit Testing Jobs
import 'package:test/test.dart';
void main() {
group('SendWelcomeEmailJob', () {
late SendWelcomeEmailJob job;
setUp(() {
job = SendWelcomeEmailJob();
});
test('has correct identifier', () {
expect(job.identifier, equals('send_welcome_email'));
});
test('executes successfully with valid parameters', () async {
final parameters = {
'email': 'test@example.com',
'name': 'Test User',
};
// Mock your email service here
await expectLater(
job.execute(parameters),
completes,
);
});
test('throws exception with invalid parameters', () async {
final parameters = <String, dynamic>{}; // Missing required fields
await expectLater(
job.execute(parameters),
// A missing value fails the `as String` cast, which throws a TypeError
throwsA(isA<TypeError>()),
);
});
});
}
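Casting a missing parameter with `as String` raises a TypeError, not an ArgumentError. If you would rather have jobs fail with a descriptive ArgumentError, one option is to validate parameters explicitly before using them. requireParam below is a hypothetical helper, not part of hypermodern_server.

```dart
/// Reads [key] from [parameters] and checks its type.
/// Throws ArgumentError when the value is missing or has the wrong type.
/// Hypothetical helper for illustration.
T requireParam<T extends Object>(
  Map<String, dynamic> parameters,
  String key,
) {
  final value = parameters[key];
  if (value == null) {
    throw ArgumentError.notNull(key);
  }
  if (value is! T) {
    throw ArgumentError.value(value, key, 'Expected a value of type $T');
  }
  return value;
}
```

A job would then read its inputs with, for example, `requireParam<String>(parameters, 'email')`, and a test for invalid parameters could expect `isA<ArgumentError>()`.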
Integration Testing
void main() {
group('Job Scheduling Integration', () {
late JobRegistry registry;
late JobScheduler scheduler;
setUp(() async {
registry = JobRegistry();
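scheduler = JobScheduler(
registry: registry,
// Poll frequently so the 200 ms wait in the test covers at least one cycle
executionInterval: Duration(milliseconds: 50),
);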
registry.register('test_job', () => TestJob());
});
tearDown(() {
scheduler.stop();
});
test('can schedule and execute job', () async {
final task = await scheduler.scheduleDelayed(
'test_job',
Duration(milliseconds: 100),
{'message': 'test'},
);
expect(task.identifier, equals('test_job'));
expect(task.status, equals(JobStatus.pending));
// Start scheduler and wait for execution
scheduler.start();
await Future.delayed(Duration(milliseconds: 200));
// Verify job was executed (you'd check your test job's side effects)
});
});
}
The job scheduling system handles asynchronous work in your Hypermodern applications. Idempotent jobs, explicit retry settings, consistent logging, and periodic cleanup of old tasks keep background processing reliable without slowing the main application.