ORM and Data Management
Built-in ORM Features
Hypermodern includes a powerful Object-Relational Mapping (ORM) system that seamlessly integrates with the schema-driven development approach. The ORM automatically generates database schemas from your JSON models and provides type-safe database operations.
ORM Architecture
The Hypermodern ORM consists of several key components:
- Schema Generator: Creates database schemas from JSON model definitions
- Query Builder: Provides a fluent API for building complex queries
- Migration System: Manages database schema evolution over time
- Connection Pool: Efficiently manages database connections
- Transaction Manager: Handles database transactions with proper isolation
- Relationship Mapper: Manages relationships between models
Model-to-Database Mapping
JSON schema models are automatically mapped to database tables:
{
"models": {
"user": {
"id": "int64",
"username": "string",
"email": "string",
"profile": "@user_profile?",
"posts": ["@post"],
"created_at": "datetime",
"updated_at": "datetime"
},
"user_profile": {
"id": "int64",
"user_id": "int64",
"first_name": "string?",
"last_name": "string?",
"bio": "string?",
"avatar_url": "string?",
"preferences": "map<string, any>"
},
"post": {
"id": "int64",
"author_id": "int64",
"title": "string",
"content": "string",
"status": "@post_status",
"tags": ["string"],
"metadata": "map<string, any>",
"created_at": "datetime",
"updated_at": "datetime"
}
},
"enums": {
"post_status": ["draft", "published", "archived"]
}
}
This generates the following database schema:
-- Users table
CREATE TABLE users (
id BIGSERIAL PRIMARY KEY,
username VARCHAR(255) NOT NULL UNIQUE,
email VARCHAR(255) NOT NULL UNIQUE,
created_at TIMESTAMP WITH TIME ZONE NOT NULL,
updated_at TIMESTAMP WITH TIME ZONE NOT NULL
);
-- User profiles table
CREATE TABLE user_profiles (
id BIGSERIAL PRIMARY KEY,
user_id BIGINT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
first_name VARCHAR(255),
last_name VARCHAR(255),
bio TEXT,
avatar_url VARCHAR(500),
preferences JSONB,
UNIQUE(user_id)
);
-- Posts table
CREATE TABLE posts (
id BIGSERIAL PRIMARY KEY,
author_id BIGINT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
title VARCHAR(255) NOT NULL,
content TEXT NOT NULL,
status VARCHAR(50) NOT NULL CHECK (status IN ('draft', 'published', 'archived')),
tags TEXT[] DEFAULT '{}',
metadata JSONB DEFAULT '{}',
created_at TIMESTAMP WITH TIME ZONE NOT NULL,
updated_at TIMESTAMP WITH TIME ZONE NOT NULL
);
-- Indexes for performance
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_users_username ON users(username);
CREATE INDEX idx_posts_author_id ON posts(author_id);
CREATE INDEX idx_posts_status ON posts(status);
CREATE INDEX idx_posts_created_at ON posts(created_at);
Database Integration
Connection Configuration
class DatabaseConfig {
final String host;
final int port;
final String database;
final String username;
final String password;
final int minConnections;
final int maxConnections;
final Duration connectionTimeout;
final Duration idleTimeout;
final Duration maxLifetime;
final bool sslEnabled;
final String? sslCertPath;
const DatabaseConfig({
required this.host,
required this.port,
required this.database,
required this.username,
required this.password,
this.minConnections = 5,
this.maxConnections = 20,
this.connectionTimeout = const Duration(seconds: 10),
this.idleTimeout = const Duration(minutes: 10),
this.maxLifetime = const Duration(hours: 1),
this.sslEnabled = false,
this.sslCertPath,
});
}
class DatabaseManager {
late ConnectionPool _pool;
late Database _database;
Future<void> initialize(DatabaseConfig config) async {
_pool = ConnectionPool(
host: config.host,
port: config.port,
database: config.database,
username: config.username,
password: config.password,
minConnections: config.minConnections,
maxConnections: config.maxConnections,
connectionTimeout: config.connectionTimeout,
idleTimeout: config.idleTimeout,
maxLifetime: config.maxLifetime,
sslEnabled: config.sslEnabled,
sslCertPath: config.sslCertPath,
);
_database = Database(_pool);
// Test connection
await _testConnection();
// Set up monitoring
_setupMonitoring();
print('✅ Database connected successfully');
}
Database get database => _database;
Future<void> _testConnection() async {
try {
await _database.query('SELECT 1');
} catch (e) {
throw DatabaseException('Failed to connect to database: $e');
}
}
void _setupMonitoring() {
_pool.onConnectionCreated.listen((connection) {
logger.debug('Database connection created', extra: {
'connection_id': connection.id,
'active_connections': _pool.activeConnections,
});
});
_pool.onConnectionClosed.listen((connection) {
logger.debug('Database connection closed', extra: {
'connection_id': connection.id,
'active_connections': _pool.activeConnections,
});
});
_pool.onConnectionError.listen((error) {
logger.error('Database connection error', error: error);
});
}
Future<void> close() async {
await _pool.close();
print('Database connections closed');
}
}
Query Builder
The query builder provides a fluent, type-safe API for database operations:
class QueryBuilder<T> {
final String _tableName;
final List<String> _selectFields = [];
final List<WhereClause> _whereClauses = [];
final List<JoinClause> _joinClauses = [];
final List<String> _orderByClauses = [];
final List<String> _groupByClauses = [];
String? _havingClause;
int? _limitValue;
int? _offsetValue;
QueryBuilder(this._tableName);
QueryBuilder<T> select(List<String> fields) {
_selectFields.addAll(fields);
return this;
}
QueryBuilder<T> where(String condition, [List<dynamic>? parameters]) {
_whereClauses.add(WhereClause(condition, parameters ?? []));
return this;
}
QueryBuilder<T> whereEquals(String field, dynamic value) {
return where('$field = ?', [value]);
}
QueryBuilder<T> whereIn(String field, List<dynamic> values) {
final placeholders = List.filled(values.length, '?').join(', ');
return where('$field IN ($placeholders)', values);
}
QueryBuilder<T> whereNotNull(String field) {
return where('$field IS NOT NULL');
}
QueryBuilder<T> whereNull(String field) {
return where('$field IS NULL');
}
QueryBuilder<T> whereBetween(String field, dynamic start, dynamic end) {
return where('$field BETWEEN ? AND ?', [start, end]);
}
QueryBuilder<T> whereLike(String field, String pattern) {
return where('$field LIKE ?', [pattern]);
}
QueryBuilder<T> join(String table, String condition, {JoinType type = JoinType.inner}) {
_joinClauses.add(JoinClause(table, condition, type));
return this;
}
QueryBuilder<T> leftJoin(String table, String condition) {
return join(table, condition, type: JoinType.left);
}
QueryBuilder<T> rightJoin(String table, String condition) {
return join(table, condition, type: JoinType.right);
}
QueryBuilder<T> orderBy(String field, {SortDirection direction = SortDirection.asc}) {
_orderByClauses.add('$field ${direction.name.toUpperCase()}');
return this;
}
QueryBuilder<T> groupBy(List<String> fields) {
_groupByClauses.addAll(fields);
return this;
}
QueryBuilder<T> having(String condition) {
_havingClause = condition;
return this;
}
QueryBuilder<T> limit(int count) {
_limitValue = count;
return this;
}
QueryBuilder<T> offset(int count) {
_offsetValue = count;
return this;
}
QueryResult build() {
final buffer = StringBuffer();
final parameters = <dynamic>[];
// SELECT clause
buffer.write('SELECT ');
if (_selectFields.isEmpty) {
buffer.write('*');
} else {
buffer.write(_selectFields.join(', '));
}
// FROM clause
buffer.write(' FROM $_tableName');
// JOIN clauses
for (final join in _joinClauses) {
buffer.write(' ${join.type.sql} JOIN ${join.table} ON ${join.condition}');
}
// WHERE clause
if (_whereClauses.isNotEmpty) {
buffer.write(' WHERE ');
buffer.write(_whereClauses.map((w) => w.condition).join(' AND '));
parameters.addAll(_whereClauses.expand((w) => w.parameters));
}
// GROUP BY clause
if (_groupByClauses.isNotEmpty) {
buffer.write(' GROUP BY ${_groupByClauses.join(', ')}');
}
// HAVING clause
if (_havingClause != null) {
buffer.write(' HAVING $_havingClause');
}
// ORDER BY clause
if (_orderByClauses.isNotEmpty) {
buffer.write(' ORDER BY ${_orderByClauses.join(', ')}');
}
// LIMIT clause
if (_limitValue != null) {
buffer.write(' LIMIT $_limitValue');
}
// OFFSET clause
if (_offsetValue != null) {
buffer.write(' OFFSET $_offsetValue');
}
return QueryResult(buffer.toString(), parameters);
}
}
// Usage example
final users = await QueryBuilder<User>('users')
.select(['id', 'username', 'email'])
.where('created_at > ?', [DateTime.now().subtract(Duration(days: 30))])
.whereEquals('status', 'active')
.orderBy('created_at', direction: SortDirection.desc)
.limit(50)
.execute();
Repository Pattern Implementation
abstract class Repository<T, ID> {
Future<T?> findById(ID id);
Future<List<T>> findAll();
Future<T> save(T entity);
Future<T> update(T entity);
Future<void> delete(ID id);
Future<bool> exists(ID id);
Future<int> count();
}
class UserRepository extends Repository<User, int> {
final Database _db;
UserRepository(this._db);
@override
Future<User?> findById(int id) async {
final result = await QueryBuilder<User>('users')
.whereEquals('id', id)
.whereNull('deleted_at')
.limit(1)
.execute();
return result.isEmpty ? null : User.fromJson(result.first);
}
@override
Future<List<User>> findAll() async {
final result = await QueryBuilder<User>('users')
.whereNull('deleted_at')
.orderBy('created_at', direction: SortDirection.desc)
.execute();
return result.map((row) => User.fromJson(row)).toList();
}
Future<User?> findByEmail(String email) async {
final result = await QueryBuilder<User>('users')
.whereEquals('email', email)
.whereNull('deleted_at')
.limit(1)
.execute();
return result.isEmpty ? null : User.fromJson(result.first);
}
Future<User?> findByUsername(String username) async {
final result = await QueryBuilder<User>('users')
.whereEquals('username', username)
.whereNull('deleted_at')
.limit(1)
.execute();
return result.isEmpty ? null : User.fromJson(result.first);
}
Future<List<User>> findByStatus(UserStatus status, {int? limit, int? offset}) async {
final query = QueryBuilder<User>('users')
.whereEquals('status', status.toString())
.whereNull('deleted_at')
.orderBy('created_at', direction: SortDirection.desc);
if (limit != null) query.limit(limit);
if (offset != null) query.offset(offset);
final result = await query.execute();
return result.map((row) => User.fromJson(row)).toList();
}
Future<List<User>> search({
String? query,
UserStatus? status,
DateTime? createdAfter,
DateTime? createdBefore,
int limit = 50,
int offset = 0,
}) async {
final queryBuilder = QueryBuilder<User>('users')
.whereNull('deleted_at');
if (query != null && query.isNotEmpty) {
queryBuilder.where(
'(username ILIKE ? OR email ILIKE ? OR first_name ILIKE ? OR last_name ILIKE ?)',
['%$query%', '%$query%', '%$query%', '%$query%'],
);
}
if (status != null) {
queryBuilder.whereEquals('status', status.toString());
}
if (createdAfter != null) {
queryBuilder.where('created_at >= ?', [createdAfter]);
}
if (createdBefore != null) {
queryBuilder.where('created_at <= ?', [createdBefore]);
}
final result = await queryBuilder
.orderBy('created_at', direction: SortDirection.desc)
.limit(limit)
.offset(offset)
.execute();
return result.map((row) => User.fromJson(row)).toList();
}
@override
Future<User> save(User user) async {
final now = DateTime.now();
final result = await _db.query(
'''
INSERT INTO users (username, email, password_hash, status, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
RETURNING *
''',
[
user.username,
user.email,
user.passwordHash,
user.status.toString(),
now,
now,
],
);
return User.fromJson(result.first);
}
@override
Future<User> update(User user) async {
final result = await _db.query(
'''
UPDATE users
SET username = ?, email = ?, status = ?, updated_at = ?
WHERE id = ? AND deleted_at IS NULL
RETURNING *
''',
[
user.username,
user.email,
user.status.toString(),
DateTime.now(),
user.id,
],
);
if (result.isEmpty) {
throw NotFoundException('User not found or already deleted');
}
return User.fromJson(result.first);
}
@override
Future<void> delete(int id) async {
// Soft delete
final result = await _db.query(
'UPDATE users SET deleted_at = ? WHERE id = ? AND deleted_at IS NULL',
[DateTime.now(), id],
);
if (result.affectedRows == 0) {
throw NotFoundException('User not found or already deleted');
}
}
Future<void> hardDelete(int id) async {
// Hard delete - use with caution
await _db.query('DELETE FROM users WHERE id = ?', [id]);
}
@override
Future<bool> exists(int id) async {
final result = await _db.query(
'SELECT 1 FROM users WHERE id = ? AND deleted_at IS NULL',
[id],
);
return result.isNotEmpty;
}
@override
Future<int> count() async {
final result = await _db.query(
'SELECT COUNT(*) as count FROM users WHERE deleted_at IS NULL',
);
return result.first['count'] as int;
}
}
Migrations and Schema Evolution
Migration System
abstract class Migration {
String get id;
String get description;
DateTime get timestamp;
Future<void> up(Database db);
Future<void> down(Database db);
}
class CreateUsersTableMigration extends Migration {
@override
String get id => '001_create_users_table';
@override
String get description => 'Create users table with basic fields';
@override
DateTime get timestamp => DateTime(2024, 1, 1, 10, 0, 0);
@override
Future<void> up(Database db) async {
await db.execute('''
CREATE TABLE users (
id BIGSERIAL PRIMARY KEY,
username VARCHAR(255) NOT NULL UNIQUE,
email VARCHAR(255) NOT NULL UNIQUE,
password_hash VARCHAR(255) NOT NULL,
status VARCHAR(50) NOT NULL DEFAULT 'active',
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
deleted_at TIMESTAMP WITH TIME ZONE
)
''');
await db.execute('CREATE INDEX idx_users_email ON users(email)');
await db.execute('CREATE INDEX idx_users_username ON users(username)');
await db.execute('CREATE INDEX idx_users_status ON users(status)');
await db.execute('CREATE INDEX idx_users_deleted_at ON users(deleted_at)');
}
@override
Future<void> down(Database db) async {
await db.execute('DROP TABLE IF EXISTS users CASCADE');
}
}
class AddUserProfilesMigration extends Migration {
@override
String get id => '002_add_user_profiles';
@override
String get description => 'Add user profiles table';
@override
DateTime get timestamp => DateTime(2024, 1, 2, 10, 0, 0);
@override
Future<void> up(Database db) async {
await db.execute('''
CREATE TABLE user_profiles (
id BIGSERIAL PRIMARY KEY,
user_id BIGINT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
first_name VARCHAR(255),
last_name VARCHAR(255),
bio TEXT,
avatar_url VARCHAR(500),
preferences JSONB DEFAULT '{}',
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
UNIQUE(user_id)
)
''');
await db.execute('CREATE INDEX idx_user_profiles_user_id ON user_profiles(user_id)');
}
@override
Future<void> down(Database db) async {
await db.execute('DROP TABLE IF EXISTS user_profiles CASCADE');
}
}
class MigrationManager {
final Database _db;
final List<Migration> _migrations;
MigrationManager(this._db, this._migrations);
Future<void> initialize() async {
await _createMigrationsTable();
}
Future<void> _createMigrationsTable() async {
await _db.execute('''
CREATE TABLE IF NOT EXISTS schema_migrations (
id VARCHAR(255) PRIMARY KEY,
description TEXT NOT NULL,
executed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
)
''');
}
Future<void> runPendingMigrations() async {
final executedMigrations = await _getExecutedMigrations();
final pendingMigrations = _migrations
.where((m) => !executedMigrations.contains(m.id))
.toList()
..sort((a, b) => a.timestamp.compareTo(b.timestamp));
if (pendingMigrations.isEmpty) {
print('No pending migrations');
return;
}
print('Running ${pendingMigrations.length} pending migrations...');
for (final migration in pendingMigrations) {
await _runMigration(migration);
}
print('✅ All migrations completed successfully');
}
Future<void> _runMigration(Migration migration) async {
print('Running migration: ${migration.id} - ${migration.description}');
await _db.transaction((tx) async {
try {
await migration.up(tx);
await tx.query(
'INSERT INTO schema_migrations (id, description) VALUES (?, ?)',
[migration.id, migration.description],
);
print('✅ Migration ${migration.id} completed');
} catch (e) {
print('❌ Migration ${migration.id} failed: $e');
rethrow;
}
});
}
Future<void> rollbackMigration(String migrationId) async {
final migration = _migrations.firstWhere(
(m) => m.id == migrationId,
orElse: () => throw ArgumentError('Migration not found: $migrationId'),
);
print('Rolling back migration: ${migration.id}');
await _db.transaction((tx) async {
try {
await migration.down(tx);
await tx.query(
'DELETE FROM schema_migrations WHERE id = ?',
[migration.id],
);
print('✅ Migration ${migration.id} rolled back');
} catch (e) {
print('❌ Rollback failed for ${migration.id}: $e');
rethrow;
}
});
}
Future<Set<String>> _getExecutedMigrations() async {
final result = await _db.query('SELECT id FROM schema_migrations');
return result.map((row) => row['id'] as String).toSet();
}
Future<List<MigrationStatus>> getMigrationStatus() async {
final executedMigrations = await _getExecutedMigrations();
return _migrations.map((migration) {
return MigrationStatus(
id: migration.id,
description: migration.description,
timestamp: migration.timestamp,
executed: executedMigrations.contains(migration.id),
);
}).toList()
..sort((a, b) => a.timestamp.compareTo(b.timestamp));
}
}
Schema Evolution Strategies
class SchemaEvolutionManager {
final Database _db;
final SchemaComparator _comparator;
SchemaEvolutionManager(this._db, this._comparator);
Future<List<Migration>> generateMigrations({
required Map<String, dynamic> currentSchema,
required Map<String, dynamic> targetSchema,
}) async {
final differences = await _comparator.compare(currentSchema, targetSchema);
final migrations = <Migration>[];
// Generate migrations for each difference
for (final diff in differences) {
switch (diff.type) {
case DifferenceType.tableAdded:
migrations.add(_generateCreateTableMigration(diff));
break;
case DifferenceType.tableRemoved:
migrations.add(_generateDropTableMigration(diff));
break;
case DifferenceType.columnAdded:
migrations.add(_generateAddColumnMigration(diff));
break;
case DifferenceType.columnRemoved:
migrations.add(_generateDropColumnMigration(diff));
break;
case DifferenceType.columnModified:
migrations.add(_generateModifyColumnMigration(diff));
break;
case DifferenceType.indexAdded:
migrations.add(_generateCreateIndexMigration(diff));
break;
case DifferenceType.indexRemoved:
migrations.add(_generateDropIndexMigration(diff));
break;
}
}
return migrations;
}
Migration _generateCreateTableMigration(SchemaDifference diff) {
return GeneratedMigration(
id: 'create_${diff.tableName}_${DateTime.now().millisecondsSinceEpoch}',
description: 'Create ${diff.tableName} table',
upSql: _buildCreateTableSql(diff.tableDefinition),
downSql: 'DROP TABLE IF EXISTS ${diff.tableName} CASCADE',
);
}
Migration _generateAddColumnMigration(SchemaDifference diff) {
final column = diff.columnDefinition!;
final nullable = column.nullable ? '' : ' NOT NULL';
final defaultValue = column.defaultValue != null ? ' DEFAULT ${column.defaultValue}' : '';
return GeneratedMigration(
id: 'add_${diff.tableName}_${diff.columnName}_${DateTime.now().millisecondsSinceEpoch}',
description: 'Add ${diff.columnName} to ${diff.tableName}',
upSql: 'ALTER TABLE ${diff.tableName} ADD COLUMN ${diff.columnName} ${column.type}$nullable$defaultValue',
downSql: 'ALTER TABLE ${diff.tableName} DROP COLUMN IF EXISTS ${diff.columnName}',
);
}
String _buildCreateTableSql(TableDefinition table) {
final buffer = StringBuffer();
buffer.writeln('CREATE TABLE ${table.name} (');
final columns = table.columns.map((col) {
final nullable = col.nullable ? '' : ' NOT NULL';
final defaultValue = col.defaultValue != null ? ' DEFAULT ${col.defaultValue}' : '';
final primaryKey = col.primaryKey ? ' PRIMARY KEY' : '';
final unique = col.unique ? ' UNIQUE' : '';
return ' ${col.name} ${col.type}$nullable$defaultValue$primaryKey$unique';
}).join(',\n');
buffer.writeln(columns);
// Add foreign key constraints
for (final fk in table.foreignKeys) {
buffer.writeln(',');
buffer.write(' FOREIGN KEY (${fk.column}) REFERENCES ${fk.referencedTable}(${fk.referencedColumn})');
if (fk.onDelete != null) buffer.write(' ON DELETE ${fk.onDelete}');
if (fk.onUpdate != null) buffer.write(' ON UPDATE ${fk.onUpdate}');
}
buffer.writeln('\n)');
return buffer.toString();
}
}
class GeneratedMigration extends Migration {
@override
final String id;
@override
final String description;
@override
final DateTime timestamp;
final String upSql;
final String downSql;
GeneratedMigration({
required this.id,
required this.description,
required this.upSql,
required this.downSql,
}) : timestamp = DateTime.now();
@override
Future<void> up(Database db) async {
await db.execute(upSql);
}
@override
Future<void> down(Database db) async {
await db.execute(downSql);
}
}
Relationship Modeling
Defining Relationships in Schemas
{
"models": {
"user": {
"id": "int64",
"username": "string",
"email": "string",
"profile": {
"type": "@user_profile",
"relationship": "has_one",
"foreign_key": "user_id"
},
"posts": {
"type": ["@post"],
"relationship": "has_many",
"foreign_key": "author_id"
},
"roles": {
"type": ["@role"],
"relationship": "many_to_many",
"through": "user_roles",
"foreign_key": "user_id",
"other_key": "role_id"
}
},
"post": {
"id": "int64",
"title": "string",
"content": "string",
"author_id": "int64",
"author": {
"type": "@user",
"relationship": "belongs_to",
"foreign_key": "author_id"
},
"comments": {
"type": ["@comment"],
"relationship": "has_many",
"foreign_key": "post_id"
},
"tags": {
"type": ["@tag"],
"relationship": "many_to_many",
"through": "post_tags"
}
},
"comment": {
"id": "int64",
"content": "string",
"post_id": "int64",
"user_id": "int64",
"parent_id": "int64?",
"post": {
"type": "@post",
"relationship": "belongs_to",
"foreign_key": "post_id"
},
"user": {
"type": "@user",
"relationship": "belongs_to",
"foreign_key": "user_id"
},
"parent": {
"type": "@comment",
"relationship": "belongs_to",
"foreign_key": "parent_id"
},
"replies": {
"type": ["@comment"],
"relationship": "has_many",
"foreign_key": "parent_id"
}
}
}
}
Relationship Loading Strategies
class RelationshipLoader {
final Database _db;
RelationshipLoader(this._db);
// Eager loading - load relationships in initial query
Future<List<User>> getUsersWithProfiles() async {
final result = await _db.query('''
SELECT
u.id, u.username, u.email, u.created_at,
p.id as profile_id, p.first_name, p.last_name, p.bio, p.avatar_url
FROM users u
LEFT JOIN user_profiles p ON u.id = p.user_id
WHERE u.deleted_at IS NULL
ORDER BY u.created_at DESC
''');
return _mapUsersWithProfiles(result);
}
// Lazy loading - load relationships on demand
Future<UserProfile?> loadUserProfile(int userId) async {
final result = await _db.query(
'SELECT * FROM user_profiles WHERE user_id = ?',
[userId],
);
return result.isEmpty ? null : UserProfile.fromJson(result.first);
}
// Batch loading - load relationships for multiple entities
Future<Map<int, List<Post>>> loadPostsForUsers(List<int> userIds) async {
if (userIds.isEmpty) return {};
final placeholders = List.filled(userIds.length, '?').join(', ');
final result = await _db.query(
'SELECT * FROM posts WHERE author_id IN ($placeholders) ORDER BY created_at DESC',
userIds,
);
final postsByUser = <int, List<Post>>{};
for (final row in result) {
final post = Post.fromJson(row);
postsByUser.putIfAbsent(post.authorId, () => []).add(post);
}
return postsByUser;
}
// N+1 query prevention
Future<List<User>> getUsersWithPostCounts() async {
final result = await _db.query('''
SELECT
u.id, u.username, u.email, u.created_at,
COUNT(p.id) as post_count
FROM users u
LEFT JOIN posts p ON u.id = p.author_id AND p.deleted_at IS NULL
WHERE u.deleted_at IS NULL
GROUP BY u.id, u.username, u.email, u.created_at
ORDER BY u.created_at DESC
''');
return result.map((row) {
final user = User.fromJson(row);
return user.copyWith(
metadata: {'post_count': row['post_count']},
);
}).toList();
}
List<User> _mapUsersWithProfiles(List<Map<String, dynamic>> result) {
final users = <User>[];
for (final row in result) {
final user = User.fromJson({
'id': row['id'],
'username': row['username'],
'email': row['email'],
'created_at': row['created_at'],
});
UserProfile? profile;
if (row['profile_id'] != null) {
profile = UserProfile.fromJson({
'id': row['profile_id'],
'user_id': row['id'],
'first_name': row['first_name'],
'last_name': row['last_name'],
'bio': row['bio'],
'avatar_url': row['avatar_url'],
});
}
users.add(user.copyWith(profile: profile));
}
return users;
}
}
Advanced Relationship Patterns
class AdvancedRelationshipQueries {
final Database _db;
AdvancedRelationshipQueries(this._db);
// Polymorphic relationships
Future<List<Comment>> getCommentsWithCommentable() async {
final result = await _db.query('''
SELECT
c.*,
CASE
WHEN c.commentable_type = 'post' THEN p.title
WHEN c.commentable_type = 'article' THEN a.title
END as commentable_title
FROM comments c
LEFT JOIN posts p ON c.commentable_type = 'post' AND c.commentable_id = p.id
LEFT JOIN articles a ON c.commentable_type = 'article' AND c.commentable_id = a.id
ORDER BY c.created_at DESC
''');
return result.map((row) => Comment.fromJson(row)).toList();
}
// Self-referencing relationships (tree structures)
Future<List<Comment>> getCommentThread(int rootCommentId) async {
final result = await _db.query('''
WITH RECURSIVE comment_tree AS (
-- Base case: root comment
SELECT id, content, parent_id, user_id, post_id, 0 as depth
FROM comments
WHERE id = ? AND deleted_at IS NULL
UNION ALL
-- Recursive case: child comments
SELECT c.id, c.content, c.parent_id, c.user_id, c.post_id, ct.depth + 1
FROM comments c
INNER JOIN comment_tree ct ON c.parent_id = ct.id
WHERE c.deleted_at IS NULL
)
SELECT * FROM comment_tree ORDER BY depth, id
''', [rootCommentId]);
return result.map((row) => Comment.fromJson(row)).toList();
}
// Many-to-many with pivot data
Future<List<UserRole>> getUserRolesWithPermissions(int userId) async {
final result = await _db.query('''
SELECT
ur.user_id,
ur.role_id,
ur.assigned_at,
ur.assigned_by,
r.name as role_name,
r.description as role_description,
array_agg(p.name) as permissions
FROM user_roles ur
JOIN roles r ON ur.role_id = r.id
LEFT JOIN role_permissions rp ON r.id = rp.role_id
LEFT JOIN permissions p ON rp.permission_id = p.id
WHERE ur.user_id = ?
GROUP BY ur.user_id, ur.role_id, ur.assigned_at, ur.assigned_by, r.name, r.description
''', [userId]);
return result.map((row) => UserRole.fromJson(row)).toList();
}
// Aggregated relationships
Future<List<User>> getUsersWithStats() async {
final result = await _db.query('''
SELECT
u.*,
COUNT(DISTINCT p.id) as post_count,
COUNT(DISTINCT c.id) as comment_count,
COUNT(DISTINCT l.id) as like_count,
AVG(pr.rating) as average_rating
FROM users u
LEFT JOIN posts p ON u.id = p.author_id AND p.deleted_at IS NULL
LEFT JOIN comments c ON u.id = c.user_id AND c.deleted_at IS NULL
LEFT JOIN likes l ON u.id = l.user_id
LEFT JOIN post_ratings pr ON p.id = pr.post_id
WHERE u.deleted_at IS NULL
GROUP BY u.id
ORDER BY post_count DESC, u.created_at DESC
''');
return result.map((row) {
final user = User.fromJson(row);
return user.copyWith(
metadata: {
'post_count': row['post_count'],
'comment_count': row['comment_count'],
'like_count': row['like_count'],
'average_rating': row['average_rating'],
},
);
}).toList();
}
}
Performance Optimization
Query Optimization
class QueryOptimizer {
final Database _db;
QueryOptimizer(this._db);
// Index analysis and recommendations
Future<List<IndexRecommendation>> analyzeQueries() async {
final slowQueries = await _getSlowQueries();
final recommendations = <IndexRecommendation>[];
for (final query in slowQueries) {
final analysis = await _analyzeQuery(query);
if (analysis.needsIndex) {
recommendations.add(IndexRecommendation(
table: analysis.table,
columns: analysis.columns,
type: analysis.indexType,
reason: analysis.reason,
estimatedImprovement: analysis.estimatedImprovement,
));
}
}
return recommendations;
}
// Connection pool monitoring
Future<ConnectionPoolStats> getConnectionPoolStats() async {
return ConnectionPoolStats(
activeConnections: await _getActiveConnectionCount(),
idleConnections: await _getIdleConnectionCount(),
totalConnections: await _getTotalConnectionCount(),
averageWaitTime: await _getAverageWaitTime(),
peakConnections: await _getPeakConnectionCount(),
);
}
// Query caching
final Map<String, CachedQuery> _queryCache = {};
Future<List<Map<String, dynamic>>> cachedQuery(
String sql,
List<dynamic> parameters, {
Duration ttl = const Duration(minutes: 5),
}) async {
final cacheKey = _generateCacheKey(sql, parameters);
final cached = _queryCache[cacheKey];
if (cached != null && !cached.isExpired) {
return cached.result;
}
final result = await _db.query(sql, parameters);
_queryCache[cacheKey] = CachedQuery(
result: result,
expiresAt: DateTime.now().add(ttl),
);
return result;
}
// Batch operations
Future<void> batchInsert<T>(
String table,
List<T> entities,
Map<String, dynamic> Function(T) mapper, {
int batchSize = 1000,
}) async {
if (entities.isEmpty) return;
final fields = mapper(entities.first).keys.toList();
final placeholders = List.filled(fields.length, '?').join(', ');
for (int i = 0; i < entities.length; i += batchSize) {
final batch = entities.skip(i).take(batchSize).toList();
final values = <dynamic>[];
final valuePlaceholders = <String>[];
for (final entity in batch) {
final mapped = mapper(entity);
values.addAll(fields.map((field) => mapped[field]));
valuePlaceholders.add('($placeholders)');
}
final sql = '''
INSERT INTO $table (${fields.join(', ')})
VALUES ${valuePlaceholders.join(', ')}
''';
await _db.query(sql, values);
}
}
// Prepared statements
final Map<String, PreparedStatement> _preparedStatements = {};
Future<PreparedStatement> prepare(String sql) async {
if (_preparedStatements.containsKey(sql)) {
return _preparedStatements[sql]!;
}
final statement = await _db.prepare(sql);
_preparedStatements[sql] = statement;
return statement;
}
String _generateCacheKey(String sql, List<dynamic> parameters) {
return '$sql:${parameters.join(':')}';
}
}
class CachedQuery {
final List<Map<String, dynamic>> result;
final DateTime expiresAt;
CachedQuery({required this.result, required this.expiresAt});
bool get isExpired => DateTime.now().isAfter(expiresAt);
}
Database Monitoring
class DatabaseMonitor {
final Database _db;
final Duration _monitoringInterval;
Timer? _monitoringTimer;
DatabaseMonitor(this._db, {
this._monitoringInterval = const Duration(minutes: 1),
});
void startMonitoring() {
_monitoringTimer = Timer.periodic(_monitoringInterval, (_) async {
await _collectMetrics();
});
}
void stopMonitoring() {
_monitoringTimer?.cancel();
_monitoringTimer = null;
}
Future<void> _collectMetrics() async {
try {
final metrics = await _gatherDatabaseMetrics();
await _recordMetrics(metrics);
await _checkAlerts(metrics);
} catch (e) {
logger.error('Failed to collect database metrics', error: e);
}
}
Future<DatabaseMetrics> _gatherDatabaseMetrics() async {
final results = await Future.wait([
_getConnectionStats(),
_getQueryStats(),
_getTableStats(),
_getIndexStats(),
]);
return DatabaseMetrics(
connectionStats: results[0] as ConnectionStats,
queryStats: results[1] as QueryStats,
tableStats: results[2] as List<TableStats>,
indexStats: results[3] as List<IndexStats>,
timestamp: DateTime.now(),
);
}
Future<ConnectionStats> _getConnectionStats() async {
final result = await _db.query('''
SELECT
count(*) as total_connections,
count(*) FILTER (WHERE state = 'active') as active_connections,
count(*) FILTER (WHERE state = 'idle') as idle_connections
FROM pg_stat_activity
WHERE datname = current_database()
''');
final row = result.first;
return ConnectionStats(
total: row['total_connections'] as int,
active: row['active_connections'] as int,
idle: row['idle_connections'] as int,
);
}
Future<QueryStats> _getQueryStats() async {
final result = await _db.query('''
SELECT
calls,
total_time,
mean_time,
rows
FROM pg_stat_statements
ORDER BY total_time DESC
LIMIT 10
''');
return QueryStats(
totalQueries: result.fold<int>(0, (sum, row) => sum + (row['calls'] as int)),
averageTime: result.fold<double>(0, (sum, row) => sum + (row['mean_time'] as double)) / result.length,
slowQueries: result.length,
);
}
Future<void> _checkAlerts(DatabaseMetrics metrics) async {
// Check connection pool utilization
final connectionUtilization = metrics.connectionStats.active / metrics.connectionStats.total;
if (connectionUtilization > 0.8) {
await _sendAlert(AlertType.highConnectionUsage, {
'utilization': connectionUtilization,
'active_connections': metrics.connectionStats.active,
'total_connections': metrics.connectionStats.total,
});
}
// Check for slow queries
if (metrics.queryStats.averageTime > 1000) { // 1 second
await _sendAlert(AlertType.slowQueries, {
'average_time': metrics.queryStats.averageTime,
'slow_query_count': metrics.queryStats.slowQueries,
});
}
// Check table sizes
for (final tableStats in metrics.tableStats) {
if (tableStats.sizeBytes > 1024 * 1024 * 1024) { // 1GB
await _sendAlert(AlertType.largeTable, {
'table_name': tableStats.tableName,
'size_mb': tableStats.sizeBytes / (1024 * 1024),
});
}
}
}
Future<void> _sendAlert(AlertType type, Map<String, dynamic> data) async {
logger.warning('Database alert: ${type.name}', extra: data);
// Send to monitoring system
// await monitoringService.sendAlert(type, data);
}
}
What's Next
You now have a comprehensive understanding of Hypermodern's ORM and data management capabilities. The next chapter will cover authentication and security, showing you how to implement robust security measures including JWT authentication, authorization, and security best practices across all transport protocols.