Server Development
Creating Hypermodern Servers
Building a Hypermodern server means creating an application that can simultaneously handle HTTP, WebSocket, and TCP connections while sharing the same business logic across all protocols.
Basic Server Setup
import 'package:hypermodern_server/hypermodern_server.dart';
import 'generated/models.dart';
import 'generated/server.dart';
void main() async {
final server = HypermodernServer();
// Configure server
server.configure(ServerConfig(
// Protocol ports
httpPort: 8080,
wsPort: 8082,
tcpPort: 8081,
// Performance settings
maxConnections: 1000,
requestTimeout: Duration(seconds: 30),
keepAliveTimeout: Duration(seconds: 60),
// Security
corsEnabled: true,
corsOrigins: ['http://localhost:3000', 'https://myapp.com'],
rateLimitEnabled: true,
rateLimitRequests: 100,
rateLimitWindow: Duration(minutes: 1),
));
// Register endpoints
_registerEndpoints(server);
// Start all protocol servers
await server.listen();
print('🚀 Hypermodern server running on:');
print(' HTTP: http://localhost:8080');
print(' WebSocket: ws://localhost:8082');
print(' TCP: localhost:8081');
}
Server Configuration Options
class ServerConfig {
// Protocol configuration
final int httpPort;
final int wsPort;
final int tcpPort;
final String bindAddress;
// Performance tuning
final int maxConnections;
final int maxRequestSize;
final Duration requestTimeout;
final Duration keepAliveTimeout;
final int workerThreads;
// Security settings
final bool corsEnabled;
final List<String> corsOrigins;
final bool rateLimitEnabled;
final int rateLimitRequests;
final Duration rateLimitWindow;
final bool validateRequests;
// Logging and monitoring
final LogLevel logLevel;
final bool metricsEnabled;
final String? metricsEndpoint;
// SSL/TLS (for production)
final String? sslCertPath;
final String? sslKeyPath;
final bool requireSsl;
const ServerConfig({
this.httpPort = 8080,
this.wsPort = 8082,
this.tcpPort = 8081,
this.bindAddress = '0.0.0.0',
this.maxConnections = 1000,
this.maxRequestSize = 1024 * 1024, // 1MB
this.requestTimeout = const Duration(seconds: 30),
this.keepAliveTimeout = const Duration(seconds: 60),
this.workerThreads = 4,
this.corsEnabled = false,
this.corsOrigins = const [],
this.rateLimitEnabled = false,
this.rateLimitRequests = 100,
this.rateLimitWindow = const Duration(minutes: 1),
this.validateRequests = true,
this.logLevel = LogLevel.info,
this.metricsEnabled = false,
this.metricsEndpoint,
this.sslCertPath,
this.sslKeyPath,
this.requireSsl = false,
});
}
Implementing Endpoint Handlers
Endpoint handlers contain your business logic and are automatically available across all protocols.
Basic Endpoint Implementation
void _registerEndpoints(HypermodernServer server) {
// Simple request-response endpoint
server.registerEndpoint<GetUserRequest, User>(
'get_user',
(request) async {
final user = await userService.getUserById(request.id);
if (user == null) {
throw NotFoundException('User not found', details: {
'user_id': request.id.toString(),
});
}
return user;
},
);
// Endpoint with validation
server.registerEndpoint<CreateUserRequest, User>(
'create_user',
(request) async {
// Validate request
if (request.username.length < 3) {
throw ValidationException('Username too short', fieldErrors: {
'username': 'Must be at least 3 characters',
});
}
if (!_isValidEmail(request.email)) {
throw ValidationException('Invalid email format', fieldErrors: {
'email': 'Must be a valid email address',
});
}
// Check for existing user
final existingUser = await userService.getUserByEmail(request.email);
if (existingUser != null) {
throw ConflictException('User already exists', details: {
'email': request.email,
});
}
// Create user
final user = await userService.createUser(
username: request.username,
email: request.email,
passwordHash: _hashPassword(request.password),
);
return user;
},
);
// Endpoint with complex response
server.registerEndpoint<SearchUsersRequest, SearchUsersResponse>(
'search_users',
(request) async {
final results = await userService.searchUsers(
query: request.query,
filters: request.filters,
pagination: request.pagination,
sort: request.sort,
);
return SearchUsersResponse(
users: results.users,
totalCount: results.totalCount,
page: request.pagination.page,
hasMore: results.hasMore,
facets: results.facets,
);
},
);
}
Async and Database Operations
class UserService {
final Database db;
UserService(this.db);
Future<User?> getUserById(int id) async {
final result = await db.query(
'SELECT * FROM users WHERE id = ?',
[id],
);
if (result.isEmpty) return null;
return User.fromJson(result.first);
}
Future<User> createUser({
required String username,
required String email,
required String passwordHash,
}) async {
final now = DateTime.now();
final result = await db.query(
'''
INSERT INTO users (username, email, password_hash, created_at, updated_at)
VALUES (?, ?, ?, ?, ?)
RETURNING *
''',
[username, email, passwordHash, now, now],
);
return User.fromJson(result.first);
}
Future<SearchResult> searchUsers({
String? query,
UserFilters? filters,
required Pagination pagination,
required Sort sort,
}) async {
final queryBuilder = QueryBuilder('users');
// Add search conditions
if (query != null && query.isNotEmpty) {
queryBuilder.where('username ILIKE ? OR email ILIKE ?',
['%$query%', '%$query%']);
}
// Add filters
if (filters?.status != null) {
queryBuilder.where('status = ?', [filters!.status!.toString()]);
}
if (filters?.createdAfter != null) {
queryBuilder.where('created_at > ?', [filters!.createdAfter]);
}
// Add sorting
queryBuilder.orderBy('${sort.field} ${sort.direction}');
// Add pagination
queryBuilder.limit(pagination.limit);
queryBuilder.offset(pagination.page * pagination.limit);
// Execute queries
final countQuery = queryBuilder.buildCountQuery();
final dataQuery = queryBuilder.buildSelectQuery();
final [countResult, dataResult] = await Future.wait([
db.query(countQuery.sql, countQuery.parameters),
db.query(dataQuery.sql, dataQuery.parameters),
]);
final totalCount = countResult.first['count'] as int;
final users = dataResult.map((row) => User.fromJson(row)).toList();
return SearchResult(
users: users,
totalCount: totalCount,
hasMore: (pagination.page + 1) * pagination.limit < totalCount,
);
}
}
Error Handling in Endpoints
server.registerEndpoint<UpdateUserRequest, User>(
'update_user',
(request) async {
try {
// Validate permissions
final currentUser = await getCurrentUser(request.context);
if (currentUser.id != request.id && !currentUser.isAdmin) {
throw ForbiddenException('Cannot update other users');
}
// Get existing user
final existingUser = await userService.getUserById(request.id);
if (existingUser == null) {
throw NotFoundException('User not found');
}
// Validate updates
if (request.email != null && !_isValidEmail(request.email!)) {
throw ValidationException('Invalid email format');
}
// Check for email conflicts
if (request.email != null && request.email != existingUser.email) {
final emailExists = await userService.getUserByEmail(request.email!);
if (emailExists != null) {
throw ConflictException('Email already in use');
}
}
// Perform update
final updatedUser = await userService.updateUser(
id: request.id,
username: request.username,
email: request.email,
profile: request.profile,
);
// Log the update
await auditService.logUserUpdate(
userId: currentUser.id,
targetUserId: request.id,
changes: request.getChanges(existingUser),
);
return updatedUser;
} on DatabaseException catch (e) {
// Handle database-specific errors
if (e.isUniqueConstraintViolation) {
throw ConflictException('Duplicate value detected');
} else if (e.isForeignKeyViolation) {
throw ValidationException('Referenced entity does not exist');
} else {
throw ServerException('Database operation failed');
}
} on ValidationException {
rethrow; // Re-throw validation errors as-is
} catch (e) {
// Log unexpected errors
logger.error('Unexpected error in update_user', error: e);
throw ServerException('Internal server error');
}
},
);
Middleware and Request Processing
Middleware provides a powerful way to add cross-cutting concerns like authentication, logging, and validation.
Built-in Middleware
void configureMiddleware(HypermodernServer server) {
// Request logging
server.middleware.add(LoggingMiddleware(
logRequests: true,
logResponses: true,
logHeaders: false, // Don't log sensitive headers
logBodies: false, // Don't log request/response bodies
));
// CORS handling
server.middleware.add(CorsMiddleware(
allowedOrigins: ['http://localhost:3000', 'https://myapp.com'],
allowedMethods: ['GET', 'POST', 'PUT', 'DELETE'],
allowedHeaders: ['Content-Type', 'Authorization'],
allowCredentials: true,
));
// Rate limiting
server.middleware.add(RateLimitMiddleware(
requests: 100,
window: Duration(minutes: 1),
keyGenerator: (request) => request.clientIp,
));
// Request validation
server.middleware.add(ValidationMiddleware(
validateRequests: true,
validateResponses: false, // Disable in production
));
// Authentication (applied to specific endpoints)
server.middleware.add(AuthenticationMiddleware(
jwtSecret: 'your-secret-key',
requiredForEndpoints: [
'create_user',
'update_user',
'delete_user',
],
));
}
Custom Middleware
class TimingMiddleware implements Middleware {
@override
Future<dynamic> handle(
dynamic request,
Future<dynamic> Function(dynamic) next,
) async {
final stopwatch = Stopwatch()..start();
try {
final response = await next(request);
final duration = stopwatch.elapsedMilliseconds;
logger.info('Request completed in ${duration}ms', extra: {
'endpoint': request.endpoint,
'duration_ms': duration,
});
return response;
} catch (e) {
final duration = stopwatch.elapsedMilliseconds;
logger.error('Request failed in ${duration}ms', error: e, extra: {
'endpoint': request.endpoint,
'duration_ms': duration,
});
rethrow;
}
}
}
class CachingMiddleware implements Middleware {
final Map<String, CacheEntry> _cache = {};
final Duration defaultTtl;
CachingMiddleware({this.defaultTtl = const Duration(minutes: 5)});
@override
Future<dynamic> handle(
dynamic request,
Future<dynamic> Function(dynamic) next,
) async {
// Only cache GET requests
if (request.method != 'GET') {
return await next(request);
}
final cacheKey = _generateCacheKey(request);
final entry = _cache[cacheKey];
// Return cached response if valid
if (entry != null && !entry.isExpired) {
logger.debug('Cache hit for $cacheKey');
return entry.data;
}
// Execute request and cache result
final response = await next(request);
_cache[cacheKey] = CacheEntry(
data: response,
expiresAt: DateTime.now().add(defaultTtl),
);
logger.debug('Cached response for $cacheKey');
return response;
}
String _generateCacheKey(dynamic request) {
return '${request.endpoint}:${request.hashCode}';
}
}
class SecurityHeadersMiddleware implements Middleware {
@override
Future<dynamic> handle(
dynamic request,
Future<dynamic> Function(dynamic) next,
) async {
final response = await next(request);
// Add security headers for HTTP responses
if (request.protocol == 'http') {
response.headers.addAll({
'X-Content-Type-Options': 'nosniff',
'X-Frame-Options': 'DENY',
'X-XSS-Protection': '1; mode=block',
'Strict-Transport-Security': 'max-age=31536000; includeSubDomains',
'Content-Security-Policy': "default-src 'self'",
});
}
return response;
}
}
Conditional Middleware
Apply middleware only to specific endpoints or conditions:
class ConditionalMiddleware implements Middleware {
final Middleware middleware;
final bool Function(dynamic request) condition;
ConditionalMiddleware(this.middleware, this.condition);
@override
Future<dynamic> handle(
dynamic request,
Future<dynamic> Function(dynamic) next,
) async {
if (condition(request)) {
return await middleware.handle(request, next);
} else {
return await next(request);
}
}
}
// Usage
server.middleware.add(ConditionalMiddleware(
AuthenticationMiddleware(jwtSecret: 'secret'),
(request) => ['create_user', 'update_user'].contains(request.endpoint),
));
server.middleware.add(ConditionalMiddleware(
CachingMiddleware(defaultTtl: Duration(hours: 1)),
(request) => request.method == 'GET' && request.endpoint.startsWith('get_'),
));
Streaming Endpoints
Streaming endpoints enable real-time communication and are automatically available over WebSocket and TCP protocols.
Basic Streaming
void registerStreamingEndpoints(HypermodernServer server) {
// Server-to-client streaming
server.registerStreamingEndpoint<WatchUsersRequest, UserUpdate>(
'watch_users',
(request) async* {
// Validate request
if (request.userIds.isEmpty) {
throw ValidationException('At least one user ID required');
}
// Create stream controller
final controller = StreamController<UserUpdate>();
// Set up database change listener
final subscription = userService.watchUsers(request.userIds).listen(
(update) => controller.add(update),
onError: (error) => controller.addError(error),
onDone: () => controller.close(),
);
// Clean up when client disconnects
controller.onCancel = () {
subscription.cancel();
};
yield* controller.stream;
},
);
// Bidirectional streaming (chat example)
server.registerBidirectionalStreamingEndpoint<ChatMessage, ChatMessage>(
'chat_messages',
(incomingMessages, outgoingMessages) async {
final roomId = await _extractRoomId(incomingMessages);
// Join chat room
await chatService.joinRoom(roomId);
// Forward incoming messages to chat service
final incomingSubscription = incomingMessages.listen(
(message) async {
// Validate and process message
final processedMessage = await chatService.processMessage(message);
// Broadcast to all room members
await chatService.broadcastMessage(roomId, processedMessage);
},
onError: (error) => logger.error('Chat message error', error: error),
);
// Forward room messages to client
final outgoingSubscription = chatService.getRoomMessages(roomId).listen(
(message) => outgoingMessages.add(message),
onError: (error) => outgoingMessages.addError(error),
);
// Clean up when connection closes
outgoingMessages.onCancel = () async {
await incomingSubscription.cancel();
await outgoingSubscription.cancel();
await chatService.leaveRoom(roomId);
};
},
);
}
Advanced Streaming Patterns
// Stream with filtering and transformation
server.registerStreamingEndpoint<WatchOrdersRequest, OrderUpdate>(
'watch_orders',
(request) async* {
final userId = await getCurrentUserId(request.context);
// Create filtered stream
final orderStream = orderService
.watchAllOrders()
.where((order) => order.userId == userId)
.where((order) => request.statuses.isEmpty ||
request.statuses.contains(order.status))
.map((order) => OrderUpdate.fromOrder(order));
// Add rate limiting to prevent overwhelming clients
final rateLimitedStream = orderStream.throttle(Duration(milliseconds: 100));
yield* rateLimitedStream;
},
);
// Stream with backpressure handling
server.registerStreamingEndpoint<WatchMetricsRequest, MetricUpdate>(
'watch_metrics',
(request) async* {
final controller = StreamController<MetricUpdate>();
// High-frequency metrics source
final metricsSubscription = metricsService.getMetricsStream().listen(
(metric) {
// Handle backpressure
if (controller.hasListener && !controller.isPaused) {
controller.add(MetricUpdate.fromMetric(metric));
}
},
onError: (error) => controller.addError(error),
);
// Pause/resume based on client backpressure
controller.onPause = () => metricsSubscription.pause();
controller.onResume = () => metricsSubscription.resume();
controller.onCancel = () => metricsSubscription.cancel();
yield* controller.stream;
},
);
// Stream with periodic updates
server.registerStreamingEndpoint<WatchSystemStatusRequest, SystemStatus>(
'watch_system_status',
(request) async* {
while (true) {
try {
final status = await systemService.getCurrentStatus();
yield status;
// Wait for next update interval
await Future.delayed(Duration(seconds: request.intervalSeconds));
} catch (e) {
// Log error but continue streaming
logger.error('Error getting system status', error: e);
// Send error status
yield SystemStatus(
healthy: false,
error: e.toString(),
timestamp: DateTime.now(),
);
// Wait before retrying
await Future.delayed(Duration(seconds: 5));
}
}
},
);
Stream Connection Management
class StreamManager {
final Map<String, Set<StreamSubscription>> _activeStreams = {};
final Map<String, int> _connectionCounts = {};
void registerStream(String clientId, String endpoint, StreamSubscription subscription) {
_activeStreams.putIfAbsent(clientId, () => {}).add(subscription);
_connectionCounts[endpoint] = (_connectionCounts[endpoint] ?? 0) + 1;
logger.info('Client $clientId subscribed to $endpoint', extra: {
'endpoint': endpoint,
'total_connections': _connectionCounts[endpoint],
});
}
Future<void> disconnectClient(String clientId) async {
final streams = _activeStreams.remove(clientId);
if (streams != null) {
for (final stream in streams) {
await stream.cancel();
}
logger.info('Disconnected client $clientId', extra: {
'streams_closed': streams.length,
});
}
}
Map<String, int> getConnectionStats() {
return Map.from(_connectionCounts);
}
Future<void> shutdown() async {
for (final streams in _activeStreams.values) {
for (final stream in streams) {
await stream.cancel();
}
}
_activeStreams.clear();
_connectionCounts.clear();
}
}
Database Integration and ORM
Hypermodern servers typically need robust database integration for persistent storage.
Database Connection Setup
class DatabaseManager {
late Database _db;
late ConnectionPool _pool;
Future<void> initialize() async {
_pool = ConnectionPool(
host: 'localhost',
port: 5432,
database: 'hypermodern_app',
username: 'app_user',
password: 'secure_password',
minConnections: 5,
maxConnections: 20,
connectionTimeout: Duration(seconds: 10),
idleTimeout: Duration(minutes: 10),
);
_db = Database(_pool);
// Run migrations
await _runMigrations();
// Set up connection monitoring
_pool.onConnectionCreated.listen((conn) {
logger.debug('Database connection created: ${conn.id}');
});
_pool.onConnectionClosed.listen((conn) {
logger.debug('Database connection closed: ${conn.id}');
});
}
Database get database => _db;
Future<void> _runMigrations() async {
final migrator = Migrator(_db);
await migrator.runPendingMigrations();
}
Future<void> close() async {
await _pool.close();
}
}
Repository Pattern
abstract class Repository<T> {
Future<T?> findById(int id);
Future<List<T>> findAll();
Future<T> create(T entity);
Future<T> update(T entity);
Future<void> delete(int id);
}
class UserRepository implements Repository<User> {
final Database db;
UserRepository(this.db);
@override
Future<User?> findById(int id) async {
final result = await db.query(
'SELECT * FROM users WHERE id = ? AND deleted_at IS NULL',
[id],
);
return result.isEmpty ? null : User.fromJson(result.first);
}
@override
Future<List<User>> findAll() async {
final result = await db.query(
'SELECT * FROM users WHERE deleted_at IS NULL ORDER BY created_at DESC',
);
return result.map((row) => User.fromJson(row)).toList();
}
Future<User?> findByEmail(String email) async {
final result = await db.query(
'SELECT * FROM users WHERE email = ? AND deleted_at IS NULL',
[email],
);
return result.isEmpty ? null : User.fromJson(result.first);
}
Future<List<User>> search({
String? query,
UserStatus? status,
DateTime? createdAfter,
int limit = 50,
int offset = 0,
}) async {
final queryBuilder = QueryBuilder('users')
.where('deleted_at IS NULL');
if (query != null && query.isNotEmpty) {
queryBuilder.where(
'(username ILIKE ? OR email ILIKE ?)',
['%$query%', '%$query%'],
);
}
if (status != null) {
queryBuilder.where('status = ?', [status.toString()]);
}
if (createdAfter != null) {
queryBuilder.where('created_at > ?', [createdAfter]);
}
final query_result = queryBuilder
.orderBy('created_at DESC')
.limit(limit)
.offset(offset)
.build();
final result = await db.query(query_result.sql, query_result.parameters);
return result.map((row) => User.fromJson(row)).toList();
}
@override
Future<User> create(User user) async {
final result = await db.query(
'''
INSERT INTO users (username, email, password_hash, status, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
RETURNING *
''',
[
user.username,
user.email,
user.passwordHash,
user.status.toString(),
user.createdAt,
user.updatedAt,
],
);
return User.fromJson(result.first);
}
@override
Future<User> update(User user) async {
final result = await db.query(
'''
UPDATE users
SET username = ?, email = ?, status = ?, updated_at = ?
WHERE id = ? AND deleted_at IS NULL
RETURNING *
''',
[
user.username,
user.email,
user.status.toString(),
DateTime.now(),
user.id,
],
);
if (result.isEmpty) {
throw NotFoundException('User not found');
}
return User.fromJson(result.first);
}
@override
Future<void> delete(int id) async {
// Soft delete
final result = await db.query(
'''
UPDATE users
SET deleted_at = ?
WHERE id = ? AND deleted_at IS NULL
''',
[DateTime.now(), id],
);
if (result.affectedRows == 0) {
throw NotFoundException('User not found');
}
}
}
Transaction Management
class UserService {
final UserRepository userRepo;
final ProfileRepository profileRepo;
final Database db;
UserService(this.userRepo, this.profileRepo, this.db);
Future<User> createUserWithProfile({
required String username,
required String email,
required String password,
required UserProfile profile,
}) async {
return await db.transaction((tx) async {
// Create user
final user = await userRepo.create(User(
username: username,
email: email,
passwordHash: _hashPassword(password),
status: UserStatus.active,
createdAt: DateTime.now(),
updatedAt: DateTime.now(),
));
// Create profile
final userProfile = await profileRepo.create(profile.copyWith(
userId: user.id,
));
// Return user with profile
return user.copyWith(profile: userProfile);
});
}
Future<void> transferUserData(int fromUserId, int toUserId) async {
await db.transaction((tx) async {
// Verify both users exist
final fromUser = await userRepo.findById(fromUserId);
final toUser = await userRepo.findById(toUserId);
if (fromUser == null || toUser == null) {
throw NotFoundException('One or both users not found');
}
// Transfer posts
await tx.query(
'UPDATE posts SET author_id = ? WHERE author_id = ?',
[toUserId, fromUserId],
);
// Transfer comments
await tx.query(
'UPDATE comments SET user_id = ? WHERE user_id = ?',
[toUserId, fromUserId],
);
// Merge user preferences
await _mergeUserPreferences(tx, fromUserId, toUserId);
// Deactivate source user
await tx.query(
'UPDATE users SET status = ?, updated_at = ? WHERE id = ?',
[UserStatus.inactive.toString(), DateTime.now(), fromUserId],
);
});
}
}
File Upload Handling
Hypermodern servers provide comprehensive file upload support with multipart form data parsing, validation, and storage management.
Basic File Upload Setup
import 'package:hypermodern_server/hypermodern_server.dart';
void configureFileUploads(HypermodernServer server) {
// Configure file upload middleware
final uploadConfig = FileUploadConfig(
uploadDir: 'temp_uploads',
maxFileSize: 50 * 1024 * 1024, // 50MB
maxFiles: 10,
allowedExtensions: ['.jpg', '.png', '.pdf', '.txt', '.doc', '.docx'],
allowedMimeTypes: [
'image/jpeg',
'image/png',
'application/pdf',
'text/plain',
'application/msword',
],
autoSave: false, // Handle saving manually for more control
);
server.addMiddleware(FileUploadMiddleware(config: uploadConfig));
// Set up file storage service
final fileStorage = FileStorageService(
config: FileStorageConfig(
baseDirectory: 'storage/uploads',
organizeByDate: true,
preserveOriginalNames: true,
maxFileSize: 50 * 1024 * 1024,
),
);
// Register file upload endpoints
_registerFileUploadEndpoints(server, fileStorage);
}
File Upload Endpoints
void _registerFileUploadEndpoints(
HypermodernServer server,
FileStorageService fileStorage,
) {
// Single file upload
server.registerEndpoint<Map<String, dynamic>, Map<String, dynamic>>(
'upload_file',
(request) async {
final uploadedFiles = request.getAllUploadedFiles();
if (uploadedFiles.isEmpty) {
throw ValidationException('No files uploaded');
}
if (uploadedFiles.length > 1) {
throw ValidationException('Only one file allowed for this endpoint');
}
final file = uploadedFiles.first;
// Additional validation
if (file.size > 10 * 1024 * 1024) { // 10MB limit for this endpoint
throw ValidationException('File too large (max 10MB)');
}
// Store the file
final storedFile = await fileStorage.storeFile(file);
// Save file metadata to database
final fileRecord = await fileService.createFileRecord(
storedFile: storedFile,
uploadedBy: request['user_id'],
description: request['description'],
category: request['category'],
);
return {
'success': true,
'file': {
'id': fileRecord.id,
'filename': storedFile.originalName,
'size': storedFile.size,
'contentType': storedFile.contentType,
'uploadedAt': storedFile.uploadedAt.toIso8601String(),
},
};
},
);
// Multiple file upload
server.registerEndpoint<Map<String, dynamic>, Map<String, dynamic>>(
'upload_files',
(request) async {
final uploadedFiles = request.getAllUploadedFiles();
if (uploadedFiles.isEmpty) {
throw ValidationException('No files uploaded');
}
// Validate total size
final totalSize = uploadedFiles.fold<int>(
0,
(sum, file) => sum + file.size,
);
if (totalSize > 100 * 1024 * 1024) { // 100MB total limit
throw ValidationException('Total file size too large (max 100MB)');
}
// Store all files
final storedFiles = await fileStorage.storeFiles(uploadedFiles);
// Create database records
final fileRecords = <Map<String, dynamic>>[];
for (int i = 0; i < storedFiles.length; i++) {
final stored = storedFiles[i];
final uploaded = uploadedFiles[i];
final record = await fileService.createFileRecord(
storedFile: stored,
uploadedBy: request['user_id'],
description: request['descriptions']?[uploaded.fieldName],
category: request['category'],
);
fileRecords.add({
'id': record.id,
'fieldName': uploaded.fieldName,
'filename': stored.originalName,
'size': stored.size,
'contentType': stored.contentType,
});
}
return {
'success': true,
'files': fileRecords,
'totalFiles': fileRecords.length,
'totalSize': totalSize,
};
},
);
// File download endpoint
server.registerEndpoint<Map<String, dynamic>, Map<String, dynamic>>(
'download_file',
(request) async {
final fileId = request['file_id'] as String?;
if (fileId == null) {
throw ValidationException('file_id is required');
}
// Get file info from database
final fileRecord = await fileService.getFileRecord(fileId);
if (fileRecord == null) {
throw NotFoundException('File not found');
}
// Check permissions
final currentUserId = request['user_id'];
if (!await fileService.canUserAccessFile(currentUserId, fileRecord.id)) {
throw ForbiddenException('Access denied');
}
// Get file from storage
final file = await fileStorage.getFile(fileRecord.storageId);
if (file == null || !await file.exists()) {
throw NotFoundException('File not found in storage');
}
// For HTTP requests, we'd typically stream the file
// For this example, we'll return file info and let the client
// make a separate request to a file serving endpoint
return {
'success': true,
'file': {
'id': fileRecord.id,
'filename': fileRecord.originalName,
'size': fileRecord.size,
'contentType': fileRecord.contentType,
'downloadUrl': '/api/files/${fileRecord.id}/download',
},
};
},
);
}
Advanced File Upload Features
// Image processing endpoint
server.registerEndpoint<Map<String, dynamic>, Map<String, dynamic>>(
'upload_image',
(request) async {
final uploadedFiles = request.getAllUploadedFiles();
final imageFile = uploadedFiles.firstWhere(
(f) => f.contentType?.startsWith('image/') == true,
orElse: () => throw ValidationException('No image file found'),
);
// Process image
final processedImages = await imageService.processImage(
imageFile.data,
options: ImageProcessingOptions(
generateThumbnails: true,
thumbnailSizes: [150, 300, 600],
optimizeForWeb: true,
maxWidth: request['max_width'] ?? 1920,
maxHeight: request['max_height'] ?? 1080,
quality: request['quality'] ?? 85,
),
);
// Store original and processed versions
final storedFiles = <StoredFile>[];
// Store original
final originalStored = await fileStorage.storeFile(imageFile);
storedFiles.add(originalStored);
// Store thumbnails
for (final processed in processedImages.thumbnails) {
final thumbnailFile = UploadedFile(
fieldName: 'thumbnail_${processed.size}',
filename: 'thumb_${processed.size}_${imageFile.filename}',
contentType: imageFile.contentType,
data: processed.data,
);
final thumbnailStored = await fileStorage.storeFile(thumbnailFile);
storedFiles.add(thumbnailStored);
}
// Create database records
final imageRecord = await imageService.createImageRecord(
originalFile: storedFiles.first,
thumbnails: storedFiles.skip(1).toList(),
uploadedBy: request['user_id'],
metadata: processedImages.metadata,
);
return {
'success': true,
'image': {
'id': imageRecord.id,
'original': {
'filename': storedFiles.first.originalName,
'size': storedFiles.first.size,
'url': '/api/images/${imageRecord.id}/original',
},
'thumbnails': storedFiles.skip(1).map((f) => {
'size': _extractSizeFromFilename(f.originalName),
'url': '/api/images/${imageRecord.id}/thumbnail/${_extractSizeFromFilename(f.originalName)}',
}).toList(),
'metadata': processedImages.metadata,
},
};
},
);
// Chunked upload for large files
server.registerEndpoint<Map<String, dynamic>, Map<String, dynamic>>(
'upload_chunk',
(request) async {
final chunkIndex = request['chunk_index'] as int;
final totalChunks = request['total_chunks'] as int;
final fileId = request['file_id'] as String;
final uploadedFiles = request.getAllUploadedFiles();
if (uploadedFiles.isEmpty) {
throw ValidationException('No chunk data uploaded');
}
final chunkFile = uploadedFiles.first;
// Store chunk temporarily
await chunkStorage.storeChunk(fileId, chunkIndex, chunkFile.data);
// Check if all chunks are uploaded
if (await chunkStorage.areAllChunksUploaded(fileId, totalChunks)) {
// Assemble final file
final assembledFile = await chunkStorage.assembleFile(fileId, totalChunks);
// Store the complete file
final storedFile = await fileStorage.storeFile(assembledFile);
// Clean up chunks
await chunkStorage.cleanupChunks(fileId);
// Create database record
final fileRecord = await fileService.createFileRecord(
storedFile: storedFile,
uploadedBy: request['user_id'],
);
return {
'success': true,
'completed': true,
'file': {
'id': fileRecord.id,
'filename': storedFile.originalName,
'size': storedFile.size,
},
};
} else {
return {
'success': true,
'completed': false,
'chunksReceived': await chunkStorage.getReceivedChunkCount(fileId),
'totalChunks': totalChunks,
};
}
},
);
File Upload Security
class SecureFileUploadMiddleware implements Middleware {
final FileUploadConfig config;
final VirusScanner? virusScanner;
SecureFileUploadMiddleware({
required this.config,
this.virusScanner,
});
@override
Future<dynamic> handle(
dynamic request,
Future<dynamic> Function(dynamic) next,
) async {
if (request is! HttpRequestContext) {
return await next(request);
}
final contentType = request.request.headers.contentType;
if (contentType?.mimeType != 'multipart/form-data') {
return await next(request);
}
try {
// Parse multipart data
final formData = await MultipartParser.parse(request.request);
// Security validations
await _validateFiles(formData.getAllFiles());
// Virus scanning (if configured)
if (virusScanner != null) {
await _scanFiles(formData.getAllFiles());
}
// Add form data to request context
request.data['_formData'] = formData;
request.data['_files'] = formData.files;
request.data['_fields'] = formData.fields;
// Add form fields to main data
for (final field in formData.fields.values) {
request.data[field.name] = field.value;
}
return await next(request);
} catch (e) {
// Log security violations
logger.warning('File upload security violation', extra: {
'client_ip': request.request.connectionInfo?.remoteAddress.address,
'user_agent': request.request.headers.value('user-agent'),
'error': e.toString(),
});
throw HttpException('File upload rejected: $e', 400);
}
}
Future<void> _validateFiles(List<UploadedFile> files) async {
for (final file in files) {
// Check file signature (magic bytes)
if (!_isValidFileSignature(file)) {
throw SecurityException('Invalid file signature for ${file.filename}');
}
// Check for executable files
if (_isExecutableFile(file)) {
throw SecurityException('Executable files not allowed');
}
// Check filename for path traversal
if (_containsPathTraversal(file.filename)) {
throw SecurityException('Invalid filename: ${file.filename}');
}
// Additional content validation
await _validateFileContent(file);
}
}
bool _isValidFileSignature(UploadedFile file) {
if (file.data.length < 4) return false;
final signature = file.data.take(4).toList();
final contentType = file.contentType?.toLowerCase();
// Check common file signatures
if (contentType?.startsWith('image/jpeg') == true) {
return signature[0] == 0xFF && signature[1] == 0xD8;
} else if (contentType?.startsWith('image/png') == true) {
return signature[0] == 0x89 && signature[1] == 0x50 &&
signature[2] == 0x4E && signature[3] == 0x47;
} else if (contentType == 'application/pdf') {
return signature[0] == 0x25 && signature[1] == 0x50 &&
signature[2] == 0x44 && signature[3] == 0x46;
}
return true; // Allow other types for now
}
bool _isExecutableFile(UploadedFile file) {
final filename = file.filename?.toLowerCase() ?? '';
final dangerousExtensions = [
'.exe', '.bat', '.cmd', '.com', '.scr', '.pif',
'.js', '.vbs', '.jar', '.app', '.deb', '.rpm',
];
return dangerousExtensions.any((ext) => filename.endsWith(ext));
}
bool _containsPathTraversal(String? filename) {
if (filename == null) return false;
return filename.contains('..') || filename.contains('/') || filename.contains('\\');
}
Future<void> _scanFiles(List<UploadedFile> files) async {
for (final file in files) {
final scanResult = await virusScanner!.scanFile(file.data);
if (scanResult.isInfected) {
throw SecurityException('Virus detected: ${scanResult.threatName}');
}
}
}
}
File Storage Management
class FileStorageService {
final FileStorageConfig config;
final Map<String, StoredFile> _fileCache = {};
FileStorageService({required this.config});
Future<StoredFile> storeFile(UploadedFile uploadedFile) async {
await _validateFile(uploadedFile);
final fileId = _generateFileId();
final storedPath = await _getStoragePath(fileId, uploadedFile.filename);
// Ensure directory exists
final file = File(storedPath);
await file.parent.create(recursive: true);
// Save file with atomic operation
final tempPath = '$storedPath.tmp';
final tempFile = File(tempPath);
await tempFile.writeAsBytes(uploadedFile.data);
await tempFile.rename(storedPath);
final storedFile = StoredFile(
id: fileId,
originalName: uploadedFile.filename ?? 'unknown',
storedPath: storedPath,
contentType: uploadedFile.contentType,
size: uploadedFile.size,
uploadedAt: DateTime.now(),
);
// Cache file info
_fileCache[fileId] = storedFile;
return storedFile;
}
Future<void> deleteFile(String fileId) async {
final storedFile = _fileCache[fileId];
if (storedFile != null) {
final file = File(storedFile.storedPath);
if (await file.exists()) {
await file.delete();
}
_fileCache.remove(fileId);
}
}
Future<void> cleanupOldFiles({Duration? olderThan}) async {
olderThan ??= Duration(days: 30);
final cutoffDate = DateTime.now().subtract(olderThan);
final directory = Directory(config.baseDirectory);
if (!await directory.exists()) return;
await for (final entity in directory.list(recursive: true)) {
if (entity is File) {
final stat = await entity.stat();
if (stat.modified.isBefore(cutoffDate)) {
await entity.delete();
logger.info('Cleaned up old file: ${entity.path}');
}
}
}
}
}
Background Job Scheduling
Hypermodern servers include a built-in job scheduling system for handling background tasks, delayed execution, and long-running processes without blocking your main application.
Quick Start with Scheduling
import 'package:hypermodern_server/hypermodern_server.dart';
// Define a scheduled job
class SendWelcomeEmailJob extends ScheduledJob {
@override
String get identifier => 'send_welcome_email';
@override
Future<void> execute(Map<String, dynamic> parameters) async {
final email = parameters['email'] as String;
final name = parameters['name'] as String;
await emailService.sendWelcomeEmail(email, name);
}
}
void main() async {
final server = HypermodernServer();
// Set up job scheduling
final jobRegistry = JobRegistry();
final scheduler = JobScheduler(registry: jobRegistry);
// Register jobs
jobRegistry.register('send_welcome_email', () => SendWelcomeEmailJob());
// Start the scheduler
scheduler.start();
// In your endpoint handlers, schedule jobs
server.registerEndpoint<CreateUserRequest, User>(
'create_user',
(request) async {
final user = await userService.createUser(request);
// Schedule welcome email for 5 minutes later
await scheduler.scheduleDelayed(
'send_welcome_email',
Duration(minutes: 5),
{
'email': user.email,
'name': user.username,
},
description: 'Send welcome email to new user',
);
return user;
},
);
await server.listen();
}
Integration with Server Lifecycle
class HypermodernServerWithScheduling extends HypermodernServer {
late JobScheduler _scheduler;
late JobRegistry _jobRegistry;
@override
Future<void> initialize() async {
await super.initialize();
// Initialize scheduling system
_jobRegistry = JobRegistry();
_scheduler = JobScheduler(
registry: _jobRegistry,
executionInterval: Duration(seconds: 30),
maxTasksPerCycle: 50,
);
// Register your jobs
_registerJobs();
// Start scheduler
_scheduler.start();
}
void _registerJobs() {
_jobRegistry.register('send_welcome_email', () => SendWelcomeEmailJob());
_jobRegistry.register('cleanup_old_data', () => CleanupJob());
_jobRegistry.register('generate_report', () => ReportJob());
}
JobScheduler get scheduler => _scheduler;
@override
Future<void> shutdown() async {
_scheduler.stop();
await super.shutdown();
}
}
The scheduling system provides persistent job storage, automatic retries, error handling, and comprehensive monitoring. For detailed information on creating complex jobs, handling failures, and performance optimization, see Chapter 11.5: Background Job Scheduling.
What's Next
You now have a solid foundation in Hypermodern server development, including background job scheduling. In the next chapter, we'll explore the multi-protocol communication system in detail, learning how to optimize for each protocol's strengths while maintaining a unified codebase.