Skip to main content

Server Development

Creating Hypermodern Servers

Building a Hypermodern server means creating an application that can simultaneously handle HTTP, WebSocket, and TCP connections while sharing the same business logic across all protocols.

Basic Server Setup

import 'package:hypermodern_server/hypermodern_server.dart';
import 'generated/models.dart';
import 'generated/server.dart';

void main() async {
  final server = HypermodernServer();
  
  // Configure server
  server.configure(ServerConfig(
    // Protocol ports
    httpPort: 8080,
    wsPort: 8082,
    tcpPort: 8081,
    
    // Performance settings
    maxConnections: 1000,
    requestTimeout: Duration(seconds: 30),
    keepAliveTimeout: Duration(seconds: 60),
    
    // Security
    corsEnabled: true,
    corsOrigins: ['http://localhost:3000', 'https://myapp.com'],
    rateLimitEnabled: true,
    rateLimitRequests: 100,
    rateLimitWindow: Duration(minutes: 1),
  ));
  
  // Register endpoints
  _registerEndpoints(server);
  
  // Start all protocol servers
  await server.listen();
  
  print('🚀 Hypermodern server running on:');
  print('   HTTP:      http://localhost:8080');
  print('   WebSocket: ws://localhost:8082');
  print('   TCP:       localhost:8081');
}

Server Configuration Options

class ServerConfig {
  // Protocol configuration
  final int httpPort;
  final int wsPort;
  final int tcpPort;
  final String bindAddress;
  
  // Performance tuning
  final int maxConnections;
  final int maxRequestSize;
  final Duration requestTimeout;
  final Duration keepAliveTimeout;
  final int workerThreads;
  
  // Security settings
  final bool corsEnabled;
  final List<String> corsOrigins;
  final bool rateLimitEnabled;
  final int rateLimitRequests;
  final Duration rateLimitWindow;
  final bool validateRequests;
  
  // Logging and monitoring
  final LogLevel logLevel;
  final bool metricsEnabled;
  final String? metricsEndpoint;
  
  // SSL/TLS (for production)
  final String? sslCertPath;
  final String? sslKeyPath;
  final bool requireSsl;
  
  const ServerConfig({
    this.httpPort = 8080,
    this.wsPort = 8082,
    this.tcpPort = 8081,
    this.bindAddress = '0.0.0.0',
    this.maxConnections = 1000,
    this.maxRequestSize = 1024 * 1024, // 1MB
    this.requestTimeout = const Duration(seconds: 30),
    this.keepAliveTimeout = const Duration(seconds: 60),
    this.workerThreads = 4,
    this.corsEnabled = false,
    this.corsOrigins = const [],
    this.rateLimitEnabled = false,
    this.rateLimitRequests = 100,
    this.rateLimitWindow = const Duration(minutes: 1),
    this.validateRequests = true,
    this.logLevel = LogLevel.info,
    this.metricsEnabled = false,
    this.metricsEndpoint,
    this.sslCertPath,
    this.sslKeyPath,
    this.requireSsl = false,
  });
}

Implementing Endpoint Handlers

Endpoint handlers contain your business logic and are automatically available across all protocols.

Basic Endpoint Implementation

void _registerEndpoints(HypermodernServer server) {
  // Simple request-response endpoint
  server.registerEndpoint<GetUserRequest, User>(
    'get_user',
    (request) async {
      final user = await userService.getUserById(request.id);
      if (user == null) {
        throw NotFoundException('User not found', details: {
          'user_id': request.id.toString(),
        });
      }
      return user;
    },
  );
  
  // Endpoint with validation
  server.registerEndpoint<CreateUserRequest, User>(
    'create_user',
    (request) async {
      // Validate request
      if (request.username.length < 3) {
        throw ValidationException('Username too short', fieldErrors: {
          'username': 'Must be at least 3 characters',
        });
      }
      
      if (!_isValidEmail(request.email)) {
        throw ValidationException('Invalid email format', fieldErrors: {
          'email': 'Must be a valid email address',
        });
      }
      
      // Check for existing user
      final existingUser = await userService.getUserByEmail(request.email);
      if (existingUser != null) {
        throw ConflictException('User already exists', details: {
          'email': request.email,
        });
      }
      
      // Create user
      final user = await userService.createUser(
        username: request.username,
        email: request.email,
        passwordHash: _hashPassword(request.password),
      );
      
      return user;
    },
  );
  
  // Endpoint with complex response
  server.registerEndpoint<SearchUsersRequest, SearchUsersResponse>(
    'search_users',
    (request) async {
      final results = await userService.searchUsers(
        query: request.query,
        filters: request.filters,
        pagination: request.pagination,
        sort: request.sort,
      );
      
      return SearchUsersResponse(
        users: results.users,
        totalCount: results.totalCount,
        page: request.pagination.page,
        hasMore: results.hasMore,
        facets: results.facets,
      );
    },
  );
}

Async and Database Operations

class UserService {
  final Database db;
  
  UserService(this.db);
  
  Future<User?> getUserById(int id) async {
    final result = await db.query(
      'SELECT * FROM users WHERE id = ?',
      [id],
    );
    
    if (result.isEmpty) return null;
    
    return User.fromJson(result.first);
  }
  
  Future<User> createUser({
    required String username,
    required String email,
    required String passwordHash,
  }) async {
    final now = DateTime.now();
    
    final result = await db.query(
      '''
      INSERT INTO users (username, email, password_hash, created_at, updated_at)
      VALUES (?, ?, ?, ?, ?)
      RETURNING *
      ''',
      [username, email, passwordHash, now, now],
    );
    
    return User.fromJson(result.first);
  }
  
  Future<SearchResult> searchUsers({
    String? query,
    UserFilters? filters,
    required Pagination pagination,
    required Sort sort,
  }) async {
    final queryBuilder = QueryBuilder('users');
    
    // Add search conditions
    if (query != null && query.isNotEmpty) {
      queryBuilder.where('username ILIKE ? OR email ILIKE ?', 
                        ['%$query%', '%$query%']);
    }
    
    // Add filters
    if (filters?.status != null) {
      queryBuilder.where('status = ?', [filters!.status!.toString()]);
    }
    
    if (filters?.createdAfter != null) {
      queryBuilder.where('created_at > ?', [filters!.createdAfter]);
    }
    
    // Add sorting
    queryBuilder.orderBy('${sort.field} ${sort.direction}');
    
    // Add pagination
    queryBuilder.limit(pagination.limit);
    queryBuilder.offset(pagination.page * pagination.limit);
    
    // Execute queries
    final countQuery = queryBuilder.buildCountQuery();
    final dataQuery = queryBuilder.buildSelectQuery();
    
    final [countResult, dataResult] = await Future.wait([
      db.query(countQuery.sql, countQuery.parameters),
      db.query(dataQuery.sql, dataQuery.parameters),
    ]);
    
    final totalCount = countResult.first['count'] as int;
    final users = dataResult.map((row) => User.fromJson(row)).toList();
    
    return SearchResult(
      users: users,
      totalCount: totalCount,
      hasMore: (pagination.page + 1) * pagination.limit < totalCount,
    );
  }
}

Error Handling in Endpoints

server.registerEndpoint<UpdateUserRequest, User>(
  'update_user',
  (request) async {
    try {
      // Validate permissions
      final currentUser = await getCurrentUser(request.context);
      if (currentUser.id != request.id && !currentUser.isAdmin) {
        throw ForbiddenException('Cannot update other users');
      }
      
      // Get existing user
      final existingUser = await userService.getUserById(request.id);
      if (existingUser == null) {
        throw NotFoundException('User not found');
      }
      
      // Validate updates
      if (request.email != null && !_isValidEmail(request.email!)) {
        throw ValidationException('Invalid email format');
      }
      
      // Check for email conflicts
      if (request.email != null && request.email != existingUser.email) {
        final emailExists = await userService.getUserByEmail(request.email!);
        if (emailExists != null) {
          throw ConflictException('Email already in use');
        }
      }
      
      // Perform update
      final updatedUser = await userService.updateUser(
        id: request.id,
        username: request.username,
        email: request.email,
        profile: request.profile,
      );
      
      // Log the update
      await auditService.logUserUpdate(
        userId: currentUser.id,
        targetUserId: request.id,
        changes: request.getChanges(existingUser),
      );
      
      return updatedUser;
      
    } on DatabaseException catch (e) {
      // Handle database-specific errors
      if (e.isUniqueConstraintViolation) {
        throw ConflictException('Duplicate value detected');
      } else if (e.isForeignKeyViolation) {
        throw ValidationException('Referenced entity does not exist');
      } else {
        throw ServerException('Database operation failed');
      }
    } on ValidationException {
      rethrow; // Re-throw validation errors as-is
    } catch (e) {
      // Log unexpected errors
      logger.error('Unexpected error in update_user', error: e);
      throw ServerException('Internal server error');
    }
  },
);

Middleware and Request Processing

Middleware provides a powerful way to add cross-cutting concerns like authentication, logging, and validation.

Built-in Middleware

void configureMiddleware(HypermodernServer server) {
  // Request logging
  server.middleware.add(LoggingMiddleware(
    logRequests: true,
    logResponses: true,
    logHeaders: false, // Don't log sensitive headers
    logBodies: false,  // Don't log request/response bodies
  ));
  
  // CORS handling
  server.middleware.add(CorsMiddleware(
    allowedOrigins: ['http://localhost:3000', 'https://myapp.com'],
    allowedMethods: ['GET', 'POST', 'PUT', 'DELETE'],
    allowedHeaders: ['Content-Type', 'Authorization'],
    allowCredentials: true,
  ));
  
  // Rate limiting
  server.middleware.add(RateLimitMiddleware(
    requests: 100,
    window: Duration(minutes: 1),
    keyGenerator: (request) => request.clientIp,
  ));
  
  // Request validation
  server.middleware.add(ValidationMiddleware(
    validateRequests: true,
    validateResponses: false, // Disable in production
  ));
  
  // Authentication (applied to specific endpoints)
  server.middleware.add(AuthenticationMiddleware(
    jwtSecret: 'your-secret-key',
    requiredForEndpoints: [
      'create_user',
      'update_user',
      'delete_user',
    ],
  ));
}

Custom Middleware

class TimingMiddleware implements Middleware {
  @override
  Future<dynamic> handle(
    dynamic request,
    Future<dynamic> Function(dynamic) next,
  ) async {
    final stopwatch = Stopwatch()..start();
    
    try {
      final response = await next(request);
      
      final duration = stopwatch.elapsedMilliseconds;
      logger.info('Request completed in ${duration}ms', extra: {
        'endpoint': request.endpoint,
        'duration_ms': duration,
      });
      
      return response;
    } catch (e) {
      final duration = stopwatch.elapsedMilliseconds;
      logger.error('Request failed in ${duration}ms', error: e, extra: {
        'endpoint': request.endpoint,
        'duration_ms': duration,
      });
      rethrow;
    }
  }
}

class CachingMiddleware implements Middleware {
  final Map<String, CacheEntry> _cache = {};
  final Duration defaultTtl;
  
  CachingMiddleware({this.defaultTtl = const Duration(minutes: 5)});
  
  @override
  Future<dynamic> handle(
    dynamic request,
    Future<dynamic> Function(dynamic) next,
  ) async {
    // Only cache GET requests
    if (request.method != 'GET') {
      return await next(request);
    }
    
    final cacheKey = _generateCacheKey(request);
    final entry = _cache[cacheKey];
    
    // Return cached response if valid
    if (entry != null && !entry.isExpired) {
      logger.debug('Cache hit for $cacheKey');
      return entry.data;
    }
    
    // Execute request and cache result
    final response = await next(request);
    
    _cache[cacheKey] = CacheEntry(
      data: response,
      expiresAt: DateTime.now().add(defaultTtl),
    );
    
    logger.debug('Cached response for $cacheKey');
    return response;
  }
  
  String _generateCacheKey(dynamic request) {
    return '${request.endpoint}:${request.hashCode}';
  }
}

class SecurityHeadersMiddleware implements Middleware {
  @override
  Future<dynamic> handle(
    dynamic request,
    Future<dynamic> Function(dynamic) next,
  ) async {
    final response = await next(request);
    
    // Add security headers for HTTP responses
    if (request.protocol == 'http') {
      response.headers.addAll({
        'X-Content-Type-Options': 'nosniff',
        'X-Frame-Options': 'DENY',
        'X-XSS-Protection': '1; mode=block',
        'Strict-Transport-Security': 'max-age=31536000; includeSubDomains',
        'Content-Security-Policy': "default-src 'self'",
      });
    }
    
    return response;
  }
}

Conditional Middleware

Apply middleware only to specific endpoints or conditions:

class ConditionalMiddleware implements Middleware {
  final Middleware middleware;
  final bool Function(dynamic request) condition;
  
  ConditionalMiddleware(this.middleware, this.condition);
  
  @override
  Future<dynamic> handle(
    dynamic request,
    Future<dynamic> Function(dynamic) next,
  ) async {
    if (condition(request)) {
      return await middleware.handle(request, next);
    } else {
      return await next(request);
    }
  }
}

// Usage
server.middleware.add(ConditionalMiddleware(
  AuthenticationMiddleware(jwtSecret: 'secret'),
  (request) => ['create_user', 'update_user'].contains(request.endpoint),
));

server.middleware.add(ConditionalMiddleware(
  CachingMiddleware(defaultTtl: Duration(hours: 1)),
  (request) => request.method == 'GET' && request.endpoint.startsWith('get_'),
));

Streaming Endpoints

Streaming endpoints enable real-time communication and are automatically available over WebSocket and TCP protocols.

Basic Streaming

void registerStreamingEndpoints(HypermodernServer server) {
  // Server-to-client streaming
  server.registerStreamingEndpoint<WatchUsersRequest, UserUpdate>(
    'watch_users',
    (request) async* {
      // Validate request
      if (request.userIds.isEmpty) {
        throw ValidationException('At least one user ID required');
      }
      
      // Create stream controller
      final controller = StreamController<UserUpdate>();
      
      // Set up database change listener
      final subscription = userService.watchUsers(request.userIds).listen(
        (update) => controller.add(update),
        onError: (error) => controller.addError(error),
        onDone: () => controller.close(),
      );
      
      // Clean up when client disconnects
      controller.onCancel = () {
        subscription.cancel();
      };
      
      yield* controller.stream;
    },
  );
  
  // Bidirectional streaming (chat example)
  server.registerBidirectionalStreamingEndpoint<ChatMessage, ChatMessage>(
    'chat_messages',
    (incomingMessages, outgoingMessages) async {
      final roomId = await _extractRoomId(incomingMessages);
      
      // Join chat room
      await chatService.joinRoom(roomId);
      
      // Forward incoming messages to chat service
      final incomingSubscription = incomingMessages.listen(
        (message) async {
          // Validate and process message
          final processedMessage = await chatService.processMessage(message);
          
          // Broadcast to all room members
          await chatService.broadcastMessage(roomId, processedMessage);
        },
        onError: (error) => logger.error('Chat message error', error: error),
      );
      
      // Forward room messages to client
      final outgoingSubscription = chatService.getRoomMessages(roomId).listen(
        (message) => outgoingMessages.add(message),
        onError: (error) => outgoingMessages.addError(error),
      );
      
      // Clean up when connection closes
      outgoingMessages.onCancel = () async {
        await incomingSubscription.cancel();
        await outgoingSubscription.cancel();
        await chatService.leaveRoom(roomId);
      };
    },
  );
}

Advanced Streaming Patterns

// Stream with filtering and transformation
server.registerStreamingEndpoint<WatchOrdersRequest, OrderUpdate>(
  'watch_orders',
  (request) async* {
    final userId = await getCurrentUserId(request.context);
    
    // Create filtered stream
    final orderStream = orderService
        .watchAllOrders()
        .where((order) => order.userId == userId)
        .where((order) => request.statuses.isEmpty || 
                         request.statuses.contains(order.status))
        .map((order) => OrderUpdate.fromOrder(order));
    
    // Add rate limiting to prevent overwhelming clients
    final rateLimitedStream = orderStream.throttle(Duration(milliseconds: 100));
    
    yield* rateLimitedStream;
  },
);

// Stream with backpressure handling
server.registerStreamingEndpoint<WatchMetricsRequest, MetricUpdate>(
  'watch_metrics',
  (request) async* {
    final controller = StreamController<MetricUpdate>();
    
    // High-frequency metrics source
    final metricsSubscription = metricsService.getMetricsStream().listen(
      (metric) {
        // Handle backpressure
        if (controller.hasListener && !controller.isPaused) {
          controller.add(MetricUpdate.fromMetric(metric));
        }
      },
      onError: (error) => controller.addError(error),
    );
    
    // Pause/resume based on client backpressure
    controller.onPause = () => metricsSubscription.pause();
    controller.onResume = () => metricsSubscription.resume();
    controller.onCancel = () => metricsSubscription.cancel();
    
    yield* controller.stream;
  },
);

// Stream with periodic updates
server.registerStreamingEndpoint<WatchSystemStatusRequest, SystemStatus>(
  'watch_system_status',
  (request) async* {
    while (true) {
      try {
        final status = await systemService.getCurrentStatus();
        yield status;
        
        // Wait for next update interval
        await Future.delayed(Duration(seconds: request.intervalSeconds));
      } catch (e) {
        // Log error but continue streaming
        logger.error('Error getting system status', error: e);
        
        // Send error status
        yield SystemStatus(
          healthy: false,
          error: e.toString(),
          timestamp: DateTime.now(),
        );
        
        // Wait before retrying
        await Future.delayed(Duration(seconds: 5));
      }
    }
  },
);

Stream Connection Management

class StreamManager {
  final Map<String, Set<StreamSubscription>> _activeStreams = {};
  final Map<String, int> _connectionCounts = {};
  
  void registerStream(String clientId, String endpoint, StreamSubscription subscription) {
    _activeStreams.putIfAbsent(clientId, () => {}).add(subscription);
    _connectionCounts[endpoint] = (_connectionCounts[endpoint] ?? 0) + 1;
    
    logger.info('Client $clientId subscribed to $endpoint', extra: {
      'endpoint': endpoint,
      'total_connections': _connectionCounts[endpoint],
    });
  }
  
  Future<void> disconnectClient(String clientId) async {
    final streams = _activeStreams.remove(clientId);
    if (streams != null) {
      for (final stream in streams) {
        await stream.cancel();
      }
      
      logger.info('Disconnected client $clientId', extra: {
        'streams_closed': streams.length,
      });
    }
  }
  
  Map<String, int> getConnectionStats() {
    return Map.from(_connectionCounts);
  }
  
  Future<void> shutdown() async {
    for (final streams in _activeStreams.values) {
      for (final stream in streams) {
        await stream.cancel();
      }
    }
    _activeStreams.clear();
    _connectionCounts.clear();
  }
}

Database Integration and ORM

Hypermodern servers typically need robust database integration for persistent storage.

Database Connection Setup

class DatabaseManager {
  late Database _db;
  late ConnectionPool _pool;
  
  Future<void> initialize() async {
    _pool = ConnectionPool(
      host: 'localhost',
      port: 5432,
      database: 'hypermodern_app',
      username: 'app_user',
      password: 'secure_password',
      minConnections: 5,
      maxConnections: 20,
      connectionTimeout: Duration(seconds: 10),
      idleTimeout: Duration(minutes: 10),
    );
    
    _db = Database(_pool);
    
    // Run migrations
    await _runMigrations();
    
    // Set up connection monitoring
    _pool.onConnectionCreated.listen((conn) {
      logger.debug('Database connection created: ${conn.id}');
    });
    
    _pool.onConnectionClosed.listen((conn) {
      logger.debug('Database connection closed: ${conn.id}');
    });
  }
  
  Database get database => _db;
  
  Future<void> _runMigrations() async {
    final migrator = Migrator(_db);
    await migrator.runPendingMigrations();
  }
  
  Future<void> close() async {
    await _pool.close();
  }
}

Repository Pattern

abstract class Repository<T> {
  Future<T?> findById(int id);
  Future<List<T>> findAll();
  Future<T> create(T entity);
  Future<T> update(T entity);
  Future<void> delete(int id);
}

class UserRepository implements Repository<User> {
  final Database db;
  
  UserRepository(this.db);
  
  @override
  Future<User?> findById(int id) async {
    final result = await db.query(
      'SELECT * FROM users WHERE id = ? AND deleted_at IS NULL',
      [id],
    );
    
    return result.isEmpty ? null : User.fromJson(result.first);
  }
  
  @override
  Future<List<User>> findAll() async {
    final result = await db.query(
      'SELECT * FROM users WHERE deleted_at IS NULL ORDER BY created_at DESC',
    );
    
    return result.map((row) => User.fromJson(row)).toList();
  }
  
  Future<User?> findByEmail(String email) async {
    final result = await db.query(
      'SELECT * FROM users WHERE email = ? AND deleted_at IS NULL',
      [email],
    );
    
    return result.isEmpty ? null : User.fromJson(result.first);
  }
  
  Future<List<User>> search({
    String? query,
    UserStatus? status,
    DateTime? createdAfter,
    int limit = 50,
    int offset = 0,
  }) async {
    final queryBuilder = QueryBuilder('users')
        .where('deleted_at IS NULL');
    
    if (query != null && query.isNotEmpty) {
      queryBuilder.where(
        '(username ILIKE ? OR email ILIKE ?)',
        ['%$query%', '%$query%'],
      );
    }
    
    if (status != null) {
      queryBuilder.where('status = ?', [status.toString()]);
    }
    
    if (createdAfter != null) {
      queryBuilder.where('created_at > ?', [createdAfter]);
    }
    
    final query_result = queryBuilder
        .orderBy('created_at DESC')
        .limit(limit)
        .offset(offset)
        .build();
    
    final result = await db.query(query_result.sql, query_result.parameters);
    return result.map((row) => User.fromJson(row)).toList();
  }
  
  @override
  Future<User> create(User user) async {
    final result = await db.query(
      '''
      INSERT INTO users (username, email, password_hash, status, created_at, updated_at)
      VALUES (?, ?, ?, ?, ?, ?)
      RETURNING *
      ''',
      [
        user.username,
        user.email,
        user.passwordHash,
        user.status.toString(),
        user.createdAt,
        user.updatedAt,
      ],
    );
    
    return User.fromJson(result.first);
  }
  
  @override
  Future<User> update(User user) async {
    final result = await db.query(
      '''
      UPDATE users 
      SET username = ?, email = ?, status = ?, updated_at = ?
      WHERE id = ? AND deleted_at IS NULL
      RETURNING *
      ''',
      [
        user.username,
        user.email,
        user.status.toString(),
        DateTime.now(),
        user.id,
      ],
    );
    
    if (result.isEmpty) {
      throw NotFoundException('User not found');
    }
    
    return User.fromJson(result.first);
  }
  
  @override
  Future<void> delete(int id) async {
    // Soft delete
    final result = await db.query(
      '''
      UPDATE users 
      SET deleted_at = ?
      WHERE id = ? AND deleted_at IS NULL
      ''',
      [DateTime.now(), id],
    );
    
    if (result.affectedRows == 0) {
      throw NotFoundException('User not found');
    }
  }
}

Transaction Management

class UserService {
  final UserRepository userRepo;
  final ProfileRepository profileRepo;
  final Database db;
  
  UserService(this.userRepo, this.profileRepo, this.db);
  
  Future<User> createUserWithProfile({
    required String username,
    required String email,
    required String password,
    required UserProfile profile,
  }) async {
    return await db.transaction((tx) async {
      // Create user
      final user = await userRepo.create(User(
        username: username,
        email: email,
        passwordHash: _hashPassword(password),
        status: UserStatus.active,
        createdAt: DateTime.now(),
        updatedAt: DateTime.now(),
      ));
      
      // Create profile
      final userProfile = await profileRepo.create(profile.copyWith(
        userId: user.id,
      ));
      
      // Return user with profile
      return user.copyWith(profile: userProfile);
    });
  }
  
  Future<void> transferUserData(int fromUserId, int toUserId) async {
    await db.transaction((tx) async {
      // Verify both users exist
      final fromUser = await userRepo.findById(fromUserId);
      final toUser = await userRepo.findById(toUserId);
      
      if (fromUser == null || toUser == null) {
        throw NotFoundException('One or both users not found');
      }
      
      // Transfer posts
      await tx.query(
        'UPDATE posts SET author_id = ? WHERE author_id = ?',
        [toUserId, fromUserId],
      );
      
      // Transfer comments
      await tx.query(
        'UPDATE comments SET user_id = ? WHERE user_id = ?',
        [toUserId, fromUserId],
      );
      
      // Merge user preferences
      await _mergeUserPreferences(tx, fromUserId, toUserId);
      
      // Deactivate source user
      await tx.query(
        'UPDATE users SET status = ?, updated_at = ? WHERE id = ?',
        [UserStatus.inactive.toString(), DateTime.now(), fromUserId],
      );
    });
  }
}

File Upload Handling

Hypermodern servers provide comprehensive file upload support with multipart form data parsing, validation, and storage management.

Basic File Upload Setup

import 'package:hypermodern_server/hypermodern_server.dart';

void configureFileUploads(HypermodernServer server) {
  // Configure file upload middleware
  final uploadConfig = FileUploadConfig(
    uploadDir: 'temp_uploads',
    maxFileSize: 50 * 1024 * 1024, // 50MB
    maxFiles: 10,
    allowedExtensions: ['.jpg', '.png', '.pdf', '.txt', '.doc', '.docx'],
    allowedMimeTypes: [
      'image/jpeg',
      'image/png', 
      'application/pdf',
      'text/plain',
      'application/msword',
    ],
    autoSave: false, // Handle saving manually for more control
  );
  
  server.addMiddleware(FileUploadMiddleware(config: uploadConfig));
  
  // Set up file storage service
  final fileStorage = FileStorageService(
    config: FileStorageConfig(
      baseDirectory: 'storage/uploads',
      organizeByDate: true,
      preserveOriginalNames: true,
      maxFileSize: 50 * 1024 * 1024,
    ),
  );
  
  // Register file upload endpoints
  _registerFileUploadEndpoints(server, fileStorage);
}

File Upload Endpoints

void _registerFileUploadEndpoints(
  HypermodernServer server, 
  FileStorageService fileStorage,
) {
  // Single file upload
  server.registerEndpoint<Map<String, dynamic>, Map<String, dynamic>>(
    'upload_file',
    (request) async {
      final uploadedFiles = request.getAllUploadedFiles();
      
      if (uploadedFiles.isEmpty) {
        throw ValidationException('No files uploaded');
      }
      
      if (uploadedFiles.length > 1) {
        throw ValidationException('Only one file allowed for this endpoint');
      }
      
      final file = uploadedFiles.first;
      
      // Additional validation
      if (file.size > 10 * 1024 * 1024) { // 10MB limit for this endpoint
        throw ValidationException('File too large (max 10MB)');
      }
      
      // Store the file
      final storedFile = await fileStorage.storeFile(file);
      
      // Save file metadata to database
      final fileRecord = await fileService.createFileRecord(
        storedFile: storedFile,
        uploadedBy: request['user_id'],
        description: request['description'],
        category: request['category'],
      );
      
      return {
        'success': true,
        'file': {
          'id': fileRecord.id,
          'filename': storedFile.originalName,
          'size': storedFile.size,
          'contentType': storedFile.contentType,
          'uploadedAt': storedFile.uploadedAt.toIso8601String(),
        },
      };
    },
  );
  
  // Multiple file upload
  server.registerEndpoint<Map<String, dynamic>, Map<String, dynamic>>(
    'upload_files',
    (request) async {
      final uploadedFiles = request.getAllUploadedFiles();
      
      if (uploadedFiles.isEmpty) {
        throw ValidationException('No files uploaded');
      }
      
      // Validate total size
      final totalSize = uploadedFiles.fold<int>(
        0, 
        (sum, file) => sum + file.size,
      );
      
      if (totalSize > 100 * 1024 * 1024) { // 100MB total limit
        throw ValidationException('Total file size too large (max 100MB)');
      }
      
      // Store all files
      final storedFiles = await fileStorage.storeFiles(uploadedFiles);
      
      // Create database records
      final fileRecords = <Map<String, dynamic>>[];
      for (int i = 0; i < storedFiles.length; i++) {
        final stored = storedFiles[i];
        final uploaded = uploadedFiles[i];
        
        final record = await fileService.createFileRecord(
          storedFile: stored,
          uploadedBy: request['user_id'],
          description: request['descriptions']?[uploaded.fieldName],
          category: request['category'],
        );
        
        fileRecords.add({
          'id': record.id,
          'fieldName': uploaded.fieldName,
          'filename': stored.originalName,
          'size': stored.size,
          'contentType': stored.contentType,
        });
      }
      
      return {
        'success': true,
        'files': fileRecords,
        'totalFiles': fileRecords.length,
        'totalSize': totalSize,
      };
    },
  );
  
  // File download endpoint
  server.registerEndpoint<Map<String, dynamic>, Map<String, dynamic>>(
    'download_file',
    (request) async {
      final fileId = request['file_id'] as String?;
      if (fileId == null) {
        throw ValidationException('file_id is required');
      }
      
      // Get file info from database
      final fileRecord = await fileService.getFileRecord(fileId);
      if (fileRecord == null) {
        throw NotFoundException('File not found');
      }
      
      // Check permissions
      final currentUserId = request['user_id'];
      if (!await fileService.canUserAccessFile(currentUserId, fileRecord.id)) {
        throw ForbiddenException('Access denied');
      }
      
      // Get file from storage
      final file = await fileStorage.getFile(fileRecord.storageId);
      if (file == null || !await file.exists()) {
        throw NotFoundException('File not found in storage');
      }
      
      // For HTTP requests, we'd typically stream the file
      // For this example, we'll return file info and let the client
      // make a separate request to a file serving endpoint
      return {
        'success': true,
        'file': {
          'id': fileRecord.id,
          'filename': fileRecord.originalName,
          'size': fileRecord.size,
          'contentType': fileRecord.contentType,
          'downloadUrl': '/api/files/${fileRecord.id}/download',
        },
      };
    },
  );
}

Advanced File Upload Features

// Image processing endpoint
server.registerEndpoint<Map<String, dynamic>, Map<String, dynamic>>(
  'upload_image',
  (request) async {
    final uploadedFiles = request.getAllUploadedFiles();
    final imageFile = uploadedFiles.firstWhere(
      (f) => f.contentType?.startsWith('image/') == true,
      orElse: () => throw ValidationException('No image file found'),
    );
    
    // Process image
    final processedImages = await imageService.processImage(
      imageFile.data,
      options: ImageProcessingOptions(
        generateThumbnails: true,
        thumbnailSizes: [150, 300, 600],
        optimizeForWeb: true,
        maxWidth: request['max_width'] ?? 1920,
        maxHeight: request['max_height'] ?? 1080,
        quality: request['quality'] ?? 85,
      ),
    );
    
    // Store original and processed versions
    final storedFiles = <StoredFile>[];
    
    // Store original
    final originalStored = await fileStorage.storeFile(imageFile);
    storedFiles.add(originalStored);
    
    // Store thumbnails
    for (final processed in processedImages.thumbnails) {
      final thumbnailFile = UploadedFile(
        fieldName: 'thumbnail_${processed.size}',
        filename: 'thumb_${processed.size}_${imageFile.filename}',
        contentType: imageFile.contentType,
        data: processed.data,
      );
      
      final thumbnailStored = await fileStorage.storeFile(thumbnailFile);
      storedFiles.add(thumbnailStored);
    }
    
    // Create database records
    final imageRecord = await imageService.createImageRecord(
      originalFile: storedFiles.first,
      thumbnails: storedFiles.skip(1).toList(),
      uploadedBy: request['user_id'],
      metadata: processedImages.metadata,
    );
    
    return {
      'success': true,
      'image': {
        'id': imageRecord.id,
        'original': {
          'filename': storedFiles.first.originalName,
          'size': storedFiles.first.size,
          'url': '/api/images/${imageRecord.id}/original',
        },
        'thumbnails': storedFiles.skip(1).map((f) => {
          'size': _extractSizeFromFilename(f.originalName),
          'url': '/api/images/${imageRecord.id}/thumbnail/${_extractSizeFromFilename(f.originalName)}',
        }).toList(),
        'metadata': processedImages.metadata,
      },
    };
  },
);

// Chunked upload for large files
server.registerEndpoint<Map<String, dynamic>, Map<String, dynamic>>(
  'upload_chunk',
  (request) async {
    final chunkIndex = request['chunk_index'] as int;
    final totalChunks = request['total_chunks'] as int;
    final fileId = request['file_id'] as String;
    final uploadedFiles = request.getAllUploadedFiles();
    
    if (uploadedFiles.isEmpty) {
      throw ValidationException('No chunk data uploaded');
    }
    
    final chunkFile = uploadedFiles.first;
    
    // Store chunk temporarily
    await chunkStorage.storeChunk(fileId, chunkIndex, chunkFile.data);
    
    // Check if all chunks are uploaded
    if (await chunkStorage.areAllChunksUploaded(fileId, totalChunks)) {
      // Assemble final file
      final assembledFile = await chunkStorage.assembleFile(fileId, totalChunks);
      
      // Store the complete file
      final storedFile = await fileStorage.storeFile(assembledFile);
      
      // Clean up chunks
      await chunkStorage.cleanupChunks(fileId);
      
      // Create database record
      final fileRecord = await fileService.createFileRecord(
        storedFile: storedFile,
        uploadedBy: request['user_id'],
      );
      
      return {
        'success': true,
        'completed': true,
        'file': {
          'id': fileRecord.id,
          'filename': storedFile.originalName,
          'size': storedFile.size,
        },
      };
    } else {
      return {
        'success': true,
        'completed': false,
        'chunksReceived': await chunkStorage.getReceivedChunkCount(fileId),
        'totalChunks': totalChunks,
      };
    }
  },
);

File Upload Security

class SecureFileUploadMiddleware implements Middleware {
  final FileUploadConfig config;
  final VirusScanner? virusScanner;
  
  SecureFileUploadMiddleware({
    required this.config,
    this.virusScanner,
  });
  
  @override
  Future<dynamic> handle(
    dynamic request,
    Future<dynamic> Function(dynamic) next,
  ) async {
    if (request is! HttpRequestContext) {
      return await next(request);
    }
    
    final contentType = request.request.headers.contentType;
    if (contentType?.mimeType != 'multipart/form-data') {
      return await next(request);
    }
    
    try {
      // Parse multipart data
      final formData = await MultipartParser.parse(request.request);
      
      // Security validations
      await _validateFiles(formData.getAllFiles());
      
      // Virus scanning (if configured)
      if (virusScanner != null) {
        await _scanFiles(formData.getAllFiles());
      }
      
      // Add form data to request context
      request.data['_formData'] = formData;
      request.data['_files'] = formData.files;
      request.data['_fields'] = formData.fields;
      
      // Add form fields to main data
      for (final field in formData.fields.values) {
        request.data[field.name] = field.value;
      }
      
      return await next(request);
    } catch (e) {
      // Log security violations
      logger.warning('File upload security violation', extra: {
        'client_ip': request.request.connectionInfo?.remoteAddress.address,
        'user_agent': request.request.headers.value('user-agent'),
        'error': e.toString(),
      });
      
      throw HttpException('File upload rejected: $e', 400);
    }
  }
  
  Future<void> _validateFiles(List<UploadedFile> files) async {
    for (final file in files) {
      // Check file signature (magic bytes)
      if (!_isValidFileSignature(file)) {
        throw SecurityException('Invalid file signature for ${file.filename}');
      }
      
      // Check for executable files
      if (_isExecutableFile(file)) {
        throw SecurityException('Executable files not allowed');
      }
      
      // Check filename for path traversal
      if (_containsPathTraversal(file.filename)) {
        throw SecurityException('Invalid filename: ${file.filename}');
      }
      
      // Additional content validation
      await _validateFileContent(file);
    }
  }
  
  bool _isValidFileSignature(UploadedFile file) {
    if (file.data.length < 4) return false;
    
    final signature = file.data.take(4).toList();
    final contentType = file.contentType?.toLowerCase();
    
    // Check common file signatures
    if (contentType?.startsWith('image/jpeg') == true) {
      return signature[0] == 0xFF && signature[1] == 0xD8;
    } else if (contentType?.startsWith('image/png') == true) {
      return signature[0] == 0x89 && signature[1] == 0x50 && 
             signature[2] == 0x4E && signature[3] == 0x47;
    } else if (contentType == 'application/pdf') {
      return signature[0] == 0x25 && signature[1] == 0x50 && 
             signature[2] == 0x44 && signature[3] == 0x46;
    }
    
    return true; // Allow other types for now
  }
  
  bool _isExecutableFile(UploadedFile file) {
    final filename = file.filename?.toLowerCase() ?? '';
    final dangerousExtensions = [
      '.exe', '.bat', '.cmd', '.com', '.scr', '.pif',
      '.js', '.vbs', '.jar', '.app', '.deb', '.rpm',
    ];
    
    return dangerousExtensions.any((ext) => filename.endsWith(ext));
  }
  
  bool _containsPathTraversal(String? filename) {
    if (filename == null) return false;
    return filename.contains('..') || filename.contains('/') || filename.contains('\\');
  }
  
  Future<void> _scanFiles(List<UploadedFile> files) async {
    for (final file in files) {
      final scanResult = await virusScanner!.scanFile(file.data);
      if (scanResult.isInfected) {
        throw SecurityException('Virus detected: ${scanResult.threatName}');
      }
    }
  }
}

File Storage Management

class FileStorageService {
  final FileStorageConfig config;
  final Map<String, StoredFile> _fileCache = {};
  
  FileStorageService({required this.config});
  
  Future<StoredFile> storeFile(UploadedFile uploadedFile) async {
    await _validateFile(uploadedFile);
    
    final fileId = _generateFileId();
    final storedPath = await _getStoragePath(fileId, uploadedFile.filename);
    
    // Ensure directory exists
    final file = File(storedPath);
    await file.parent.create(recursive: true);
    
    // Save file with atomic operation
    final tempPath = '$storedPath.tmp';
    final tempFile = File(tempPath);
    await tempFile.writeAsBytes(uploadedFile.data);
    await tempFile.rename(storedPath);
    
    final storedFile = StoredFile(
      id: fileId,
      originalName: uploadedFile.filename ?? 'unknown',
      storedPath: storedPath,
      contentType: uploadedFile.contentType,
      size: uploadedFile.size,
      uploadedAt: DateTime.now(),
    );
    
    // Cache file info
    _fileCache[fileId] = storedFile;
    
    return storedFile;
  }
  
  Future<void> deleteFile(String fileId) async {
    final storedFile = _fileCache[fileId];
    if (storedFile != null) {
      final file = File(storedFile.storedPath);
      if (await file.exists()) {
        await file.delete();
      }
      _fileCache.remove(fileId);
    }
  }
  
  Future<void> cleanupOldFiles({Duration? olderThan}) async {
    olderThan ??= Duration(days: 30);
    final cutoffDate = DateTime.now().subtract(olderThan);
    
    final directory = Directory(config.baseDirectory);
    if (!await directory.exists()) return;
    
    await for (final entity in directory.list(recursive: true)) {
      if (entity is File) {
        final stat = await entity.stat();
        if (stat.modified.isBefore(cutoffDate)) {
          await entity.delete();
          logger.info('Cleaned up old file: ${entity.path}');
        }
      }
    }
  }
}

Background Job Scheduling

Hypermodern servers include a built-in job scheduling system for handling background tasks, delayed execution, and long-running processes without blocking your main application.

Quick Start with Scheduling

import 'package:hypermodern_server/hypermodern_server.dart';

// Define a scheduled job
class SendWelcomeEmailJob extends ScheduledJob {
  @override
  String get identifier => 'send_welcome_email';

  @override
  Future<void> execute(Map<String, dynamic> parameters) async {
    final email = parameters['email'] as String;
    final name = parameters['name'] as String;
    
    await emailService.sendWelcomeEmail(email, name);
  }
}

void main() async {
  final server = HypermodernServer();
  
  // Set up job scheduling
  final jobRegistry = JobRegistry();
  final scheduler = JobScheduler(registry: jobRegistry);
  
  // Register jobs
  jobRegistry.register('send_welcome_email', () => SendWelcomeEmailJob());
  
  // Start the scheduler
  scheduler.start();
  
  // In your endpoint handlers, schedule jobs
  server.registerEndpoint<CreateUserRequest, User>(
    'create_user',
    (request) async {
      final user = await userService.createUser(request);
      
      // Schedule welcome email for 5 minutes later
      await scheduler.scheduleDelayed(
        'send_welcome_email',
        Duration(minutes: 5),
        {
          'email': user.email,
          'name': user.username,
        },
        description: 'Send welcome email to new user',
      );
      
      return user;
    },
  );
  
  await server.listen();
}

Integration with Server Lifecycle

class HypermodernServerWithScheduling extends HypermodernServer {
  late JobScheduler _scheduler;
  late JobRegistry _jobRegistry;
  
  @override
  Future<void> initialize() async {
    await super.initialize();
    
    // Initialize scheduling system
    _jobRegistry = JobRegistry();
    _scheduler = JobScheduler(
      registry: _jobRegistry,
      executionInterval: Duration(seconds: 30),
      maxTasksPerCycle: 50,
    );
    
    // Register your jobs
    _registerJobs();
    
    // Start scheduler
    _scheduler.start();
  }
  
  void _registerJobs() {
    _jobRegistry.register('send_welcome_email', () => SendWelcomeEmailJob());
    _jobRegistry.register('cleanup_old_data', () => CleanupJob());
    _jobRegistry.register('generate_report', () => ReportJob());
  }
  
  JobScheduler get scheduler => _scheduler;
  
  @override
  Future<void> shutdown() async {
    _scheduler.stop();
    await super.shutdown();
  }
}

The scheduling system provides persistent job storage, automatic retries, error handling, and comprehensive monitoring. For detailed information on creating complex jobs, handling failures, and performance optimization, see PartChapter III: Advanced Features:11.5: Background Job Scheduling.

What's Next

You now have a solid foundation in Hypermodern server development, including background job scheduling. In the next chapter, we'll explore the multi-protocol communication system in detail, learning how to optimize for each protocol's strengths while maintaining a unified codebase.