// ============================================================
// Xianyan app — File Transfer Assistant database service
// Created: 2026-05-09
// Updated: 2026-05-14
// Purpose: CRUD operations for the File Transfer Assistant database —
//          devices / records / pairings / messages / cloud cache /
//          offline queue
// Last change: v6.5.0 — _rowToDevice/insertDevice/updateDevice support
//              the isFavorite field
// ============================================================

import 'package:drift/drift.dart';
import 'package:xianyan/core/storage/database/app_database.dart';
import '../models/transfer_enums.dart';
import '../models/transfer_device.dart' as model;
import '../models/transfer_task.dart' as model;
import '../models/transfer_message.dart' as model;
import '../models/offline_queue_item.dart';

/// Singleton data-access service for the file-transfer feature.
///
/// Wraps the shared [AppDatabase] and exposes CRUD helpers for devices,
/// transfer records, pairing records, chat-style transfer messages,
/// cloud-cache records, daily statistics and the offline send queue.
class TransferDatabase {
  TransferDatabase._();

  static final TransferDatabase _instance = TransferDatabase._();

  /// The process-wide instance of this service.
  static TransferDatabase get instance => _instance;

  AppDatabase get _db => AppDatabase.instance;

  // ============================================================
  // Device CRUD
  // ============================================================

  /// Inserts [device], or updates the existing row with the same id.
  ///
  /// On conflict every column is refreshed except `createdAt`, which keeps
  /// the value written when the row was first inserted.
  Future<void> insertDevice(model.TransferDevice device) async {
    final now = DateTime.now();
    final companion = TransferDeviceRecordsCompanion.insert(
      id: device.id,
      alias: device.alias,
      deviceModel: Value(device.deviceModel),
      deviceType: Value(device.deviceType.id),
      ip: Value(device.ip),
      port: Value(device.port),
      pairingMethod: Value(device.pairingMethod.id),
      preferredTransport: Value(device.preferredTransport.id),
      isOnline: Value(device.isOnline),
      isVerified: Value(device.isVerified),
      isFavorite: Value(device.isFavorite),
      publicKey: Value(device.publicKey),
      fingerprint: Value(device.fingerprint),
      lastSeen: device.lastSeen,
      createdAt: now,
      updatedAt: now,
    );
    await _db.into(_db.transferDeviceRecords).insert(
          companion,
          onConflict: DoUpdate(
            // Keep the original creation timestamp when the row exists.
            (_) => companion.copyWith(createdAt: const Value.absent()),
          ),
        );
  }

  /// Returns the device with the given [id], or `null` if none is stored.
  Future<model.TransferDevice?> getDevice(String id) async {
    final row = await (_db.select(_db.transferDeviceRecords)
          ..where((t) => t.id.equals(id)))
        .getSingleOrNull();
    if (row == null) return null;
    return _rowToDevice(row);
  }

  /// Returns every stored device.
  Future<List<model.TransferDevice>> getAllDevices() async {
    final rows = await _db.select(_db.transferDeviceRecords).get();
    return rows.map(_rowToDevice).toList();
  }

  /// Returns the devices whose `isOnline` flag is set.
  Future<List<model.TransferDevice>> getOnlineDevices() async {
    final rows = await (_db.select(_db.transferDeviceRecords)
          ..where((t) => t.isOnline.equals(true)))
        .get();
    return rows.map(_rowToDevice).toList();
  }

  /// Updates the non-null fields on the device identified by [id].
  ///
  /// `lastSeen` and `updatedAt` are always set to the current time, even
  /// when only [alias] or [isFavorite] is supplied.
  // NOTE(review): refreshing lastSeen on a pure alias/favorite edit may be
  // unintended — confirm with callers before changing.
  Future<void> updateDevice(
    String id, {
    String? alias,
    String? ip,
    bool? isOnline,
    bool? isVerified,
    bool? isFavorite,
  }) async {
    final now = DateTime.now();
    await (_db.update(_db.transferDeviceRecords)
          ..where((t) => t.id.equals(id)))
        .write(
      TransferDeviceRecordsCompanion(
        alias: alias != null ? Value(alias) : const Value.absent(),
        ip: ip != null ? Value(ip) : const Value.absent(),
        isOnline: isOnline != null ? Value(isOnline) : const Value.absent(),
        isVerified:
            isVerified != null ? Value(isVerified) : const Value.absent(),
        isFavorite:
            isFavorite != null ? Value(isFavorite) : const Value.absent(),
        lastSeen: Value(now),
        updatedAt: Value(now),
      ),
    );
  }

  /// Sets the online flag of device [id] and refreshes `lastSeen` and
  /// `updatedAt`.
  Future<void> updateDeviceOnline(String id, bool isOnline) async {
    final now = DateTime.now();
    await (_db.update(_db.transferDeviceRecords)
          ..where((t) => t.id.equals(id)))
        .write(
      TransferDeviceRecordsCompanion(
        isOnline: Value(isOnline),
        lastSeen: Value(now),
        updatedAt: Value(now),
      ),
    );
  }

  /// Deletes the device with the given [id], if present.
  Future<void> deleteDevice(String id) async {
    await (_db.delete(_db.transferDeviceRecords)
          ..where((t) => t.id.equals(id)))
        .go();
  }

  /// Converts a database row into the domain [model.TransferDevice].
  model.TransferDevice _rowToDevice(TransferDeviceRecord row) {
    return model.TransferDevice(
      id: row.id,
      alias: row.alias,
      deviceModel: row.deviceModel,
      deviceType: DeviceType.fromId(row.deviceType),
      ip: row.ip,
      port: row.port,
      pairingMethod: PairingMethod.fromId(row.pairingMethod),
      preferredTransport: TransportType.fromId(row.preferredTransport),
      lastSeen: row.lastSeen,
      isOnline: row.isOnline,
      isVerified: row.isVerified,
      isFavorite: row.isFavorite,
      publicKey: row.publicKey,
      fingerprint: row.fingerprint,
    );
  }

  // ============================================================
  // Transfer record CRUD
  // ============================================================

  /// Inserts the record for [task], or updates the row with the same id.
  ///
  /// On conflict `createdAt` is preserved so age-based cleanup in
  /// [cleanOldRecords] keeps working on the original creation time.
  Future<void> insertRecord(model.TransferTask task) async {
    final now = DateTime.now();
    final companion = TransferRecordsCompanion.insert(
      id: task.id,
      sessionId: task.sessionId,
      peerId: task.peer.id,
      peerAlias: Value(task.peer.alias),
      transport: Value(task.transport.id),
      direction: Value(task.direction.id),
      status: Value(task.status.id),
      fileName: task.fileName,
      fileSize: Value(task.fileSize),
      transferredBytes: Value(task.transferredBytes),
      speed: Value(task.speed),
      mimeType: Value(task.mimeType),
      filePath: Value(task.filePath),
      thumbnailPath: Value(task.thumbnailPath),
      fileSha256: Value(task.fileSha256),
      hashVerified: Value(task.hashVerified),
      errorMessage: Value(task.errorMessage),
      startTime: task.startTime,
      endTime: Value(task.endTime),
      createdAt: now,
      updatedAt: now,
    );
    await _db.into(_db.transferRecords).insert(
          companion,
          onConflict: DoUpdate(
            (_) => companion.copyWith(createdAt: const Value.absent()),
          ),
        );
  }

  /// Returns the transfer record with the given [id], or `null`.
  Future<TransferRecord?> getRecord(String id) {
    return (_db.select(_db.transferRecords)..where((t) => t.id.equals(id)))
        .getSingleOrNull();
  }

  /// Returns all transfer records, newest (by `createdAt`) first.
  Future<List<TransferRecord>> getAllRecords() {
    return (_db.select(_db.transferRecords)
          ..orderBy([(t) => OrderingTerm.desc(t.createdAt)]))
        .get();
  }

  /// Returns the transfer records whose status equals [status].
  Future<List<TransferRecord>> getRecordsByStatus(
    TransferTaskStatus status,
  ) {
    return (_db.select(_db.transferRecords)
          ..where((t) => t.status.equals(status.id)))
        .get();
  }

  /// Updates the supplied progress fields of record [id].
  ///
  /// Null arguments leave the corresponding column untouched; `updatedAt`
  /// is always refreshed.
  Future<void> updateRecordProgress(
    String id, {
    int? transferredBytes,
    double? speed,
    TransferTaskStatus? status,
  }) async {
    await (_db.update(_db.transferRecords)..where((t) => t.id.equals(id)))
        .write(
      TransferRecordsCompanion(
        transferredBytes: transferredBytes != null
            ? Value(transferredBytes)
            : const Value.absent(),
        speed: speed != null ? Value(speed) : const Value.absent(),
        status: status != null ? Value(status.id) : const Value.absent(),
        updatedAt: Value(DateTime.now()),
      ),
    );
  }

  /// Sets the status of record [id], optionally storing [errorMessage].
  ///
  /// When [status] reports `isTerminal`, `endTime` is stamped with the
  /// current time as well.
  Future<void> updateRecordStatus(
    String id,
    TransferTaskStatus status, {
    String? errorMessage,
  }) async {
    final now = DateTime.now();
    await (_db.update(_db.transferRecords)..where((t) => t.id.equals(id)))
        .write(
      TransferRecordsCompanion(
        status: Value(status.id),
        errorMessage:
            errorMessage != null ? Value(errorMessage) : const Value.absent(),
        endTime: status.isTerminal ? Value(now) : const Value.absent(),
        updatedAt: Value(now),
      ),
    );
  }

  /// Deletes the transfer record with the given [id], if present.
  Future<void> deleteRecord(String id) async {
    await (_db.delete(_db.transferRecords)..where((t) => t.id.equals(id)))
        .go();
  }

  /// Deletes transfer records created more than [keepDays] days ago.
  Future<void> cleanOldRecords(int keepDays) async {
    final cutoff = DateTime.now().subtract(Duration(days: keepDays));
    await (_db.delete(_db.transferRecords)
          ..where((t) => t.createdAt.isSmallerThanValue(cutoff)))
        .go();
  }

  // ============================================================
  // Pairing record CRUD
  // ============================================================

  /// Inserts the pairing described by [info], keyed by its device id, or
  /// updates the existing row. `createdAt` is preserved on update.
  Future<void> insertPairingRecord(model.PairingInfo info) async {
    final now = DateTime.now();
    final companion = PairingRecordsCompanion.insert(
      id: info.deviceId,
      deviceId: info.deviceId,
      alias: info.alias,
      pairingMethod: Value(info.method.id),
      isTrusted: Value(info.isTrusted),
      ip: Value(info.ip),
      port: Value(info.port),
      fingerprint: Value(info.fingerprint),
      publicKey: Value(info.publicKey),
      pairedAt: info.pairedAt ?? now,
      createdAt: now,
      updatedAt: now,
    );
    await _db.into(_db.pairingRecords).insert(
          companion,
          onConflict: DoUpdate(
            (_) => companion.copyWith(createdAt: const Value.absent()),
          ),
        );
  }

  /// Returns the pairing record with the given [id], or `null`.
  Future<PairingRecord?> getPairingRecord(String id) {
    return (_db.select(_db.pairingRecords)..where((t) => t.id.equals(id)))
        .getSingleOrNull();
  }

  /// Returns the pairing records flagged as trusted.
  Future<List<PairingRecord>> getTrustedPairings() {
    return (_db.select(_db.pairingRecords)
          ..where((t) => t.isTrusted.equals(true)))
        .get();
  }

  /// Returns every pairing record.
  Future<List<PairingRecord>> getAllPairings() {
    return _db.select(_db.pairingRecords).get();
  }

  /// Deletes the pairing record with the given [id], if present.
  Future<void> deletePairingRecord(String id) async {
    await (_db.delete(_db.pairingRecords)..where((t) => t.id.equals(id)))
        .go();
  }

  // ============================================================
  // Transfer message CRUD
  // ============================================================

  /// Inserts [msg], or updates the row with the same id.
  /// `createdAt` is preserved on update.
  Future<void> insertMessage(model.TransferMessage msg) async {
    final companion = TransferMsgRecordsCompanion.insert(
      id: msg.id,
      sessionId: msg.sessionId,
      type: Value(msg.type.id),
      content: msg.content,
      isRemote: Value(msg.isRemote),
      peerDeviceId: Value(msg.peerDeviceId),
      transferTaskId: Value(msg.transferTaskId),
      fileName: Value(msg.fileName),
      fileSize: Value(msg.fileSize),
      mimeType: Value(msg.mimeType),
      thumbnailPath: Value(msg.thumbnailPath),
      filePath: Value(msg.filePath),
      progress: Value(msg.progress),
      transferStatus: Value(msg.transferStatus?.id),
      deviceAlias: Value(msg.deviceAlias),
      deviceEmoji: Value(msg.deviceEmoji),
      timestamp: msg.timestamp,
      createdAt: DateTime.now(),
    );
    await _db.into(_db.transferMsgRecords).insert(
          companion,
          onConflict: DoUpdate(
            (_) => companion.copyWith(createdAt: const Value.absent()),
          ),
        );
  }

  /// Deletes the message with id [messageId], if present.
  Future<void> deleteMessage(String messageId) async {
    await (_db.delete(_db.transferMsgRecords)
          ..where((t) => t.id.equals(messageId)))
        .go();
  }

  /// Returns up to [limit] of the most recent messages of [sessionId],
  /// ordered oldest → newest.
  ///
  /// The query selects newest-first so that [limit] trims the oldest
  /// messages rather than the newest ones, then reverses the page into
  /// chronological order (the same approach as [getRecentMessages]).
  Future<List<model.TransferMessage>> getSessionMessages(
    String sessionId, {
    int limit = 50,
  }) async {
    final newestFirst = await (_db.select(_db.transferMsgRecords)
          ..where((t) => t.sessionId.equals(sessionId))
          ..orderBy([(t) => OrderingTerm.desc(t.timestamp)])
          ..limit(limit))
        .get();
    return newestFirst.reversed.map(_rowToMessage).toList();
  }

  /// Returns up to [limit] of the most recent messages across all
  /// sessions, ordered oldest → newest.
  Future<List<model.TransferMessage>> getRecentMessages({
    int limit = 10,
  }) async {
    final newestFirst = await (_db.select(_db.transferMsgRecords)
          ..orderBy([(t) => OrderingTerm.desc(t.timestamp)])
          ..limit(limit))
        .get();
    return newestFirst.reversed.map(_rowToMessage).toList();
  }

  /// Stores [progress] and [status] on the message with the given [id].
  Future<void> updateMessageProgress(
    String id,
    double progress,
    TransferTaskStatus status,
  ) async {
    await (_db.update(_db.transferMsgRecords)..where((t) => t.id.equals(id)))
        .write(
      TransferMsgRecordsCompanion(
        progress: Value(progress),
        transferStatus: Value(status.id),
      ),
    );
  }

  /// Deletes every message belonging to [sessionId].
  Future<void> deleteSessionMessages(String sessionId) async {
    await (_db.delete(_db.transferMsgRecords)
          ..where((t) => t.sessionId.equals(sessionId)))
        .go();
  }

  /// Converts a database row into the domain [model.TransferMessage].
  model.TransferMessage _rowToMessage(TransferMsgRecord row) {
    return model.TransferMessage(
      id: row.id,
      sessionId: row.sessionId,
      type: model.TransferMessageType.fromId(row.type),
      content: row.content,
      isRemote: row.isRemote,
      peerDeviceId: row.peerDeviceId,
      transferTaskId: row.transferTaskId,
      fileName: row.fileName,
      fileSize: row.fileSize,
      mimeType: row.mimeType,
      thumbnailPath: row.thumbnailPath,
      filePath: row.filePath,
      progress: row.progress,
      transferStatus: TransferTaskStatus.tryFromId(row.transferStatus),
      deviceAlias: row.deviceAlias,
      deviceEmoji: row.deviceEmoji,
      timestamp: row.timestamp,
    );
  }

  // ============================================================
  // Statistics
  // ============================================================

  /// Returns the number of stored transfer records.
  ///
  /// Uses `COUNT(*)` in SQL instead of materialising every row.
  Future<int> getTransferRecordCount() async {
    final total = countAll();
    final query = _db.selectOnly(_db.transferRecords)..addColumns([total]);
    final row = await query.getSingle();
    return row.read(total) ?? 0;
  }

  /// Returns the number of stored pairing records.
  Future<int> getPairedDeviceCount() async {
    final total = countAll();
    final query = _db.selectOnly(_db.pairingRecords)..addColumns([total]);
    final row = await query.getSingle();
    return row.read(total) ?? 0;
  }

  /// Returns the number of transfer records with the `receive` direction.
  Future<int> getReceivedFileCount() async {
    final records = _db.transferRecords;
    final total = countAll();
    final query = _db.selectOnly(records)
      ..addColumns([total])
      ..where(records.direction.equals(TransferDirection.receive.id));
    final row = await query.getSingle();
    return row.read(total) ?? 0;
  }

  /// Returns the summed `fileSize` of all records with the `receive`
  /// direction, or 0 when there are none.
  Future<int> getReceivedFileSizeBytes() async {
    final records = _db.transferRecords;
    final totalBytes = records.fileSize.sum();
    final query = _db.selectOnly(records)
      ..addColumns([totalBytes])
      ..where(records.direction.equals(TransferDirection.receive.id));
    final row = await query.getSingle();
    // SUM over zero rows yields NULL.
    return row.read(totalBytes) ?? 0;
  }

  // ============================================================
  // Cloud cache CRUD
  // ============================================================

  /// Inserts [record], or updates the row with the same primary key.
  Future<void> insertCloudCacheRecord(CloudCacheRecordsCompanion record) async {
    await _db.into(_db.cloudCacheRecords).insertOnConflictUpdate(record);
  }

  /// Returns the cloud-cache record with the given [id], or `null`.
  Future<CloudCacheRecord?> getCloudCacheRecord(String id) {
    return (_db.select(_db.cloudCacheRecords)..where((t) => t.id.equals(id)))
        .getSingleOrNull();
  }

  /// Returns all cloud-cache records, newest (by `createdAt`) first.
  Future<List<CloudCacheRecord>> getAllCloudCacheRecords() {
    return (_db.select(_db.cloudCacheRecords)
          ..orderBy([(t) => OrderingTerm.desc(t.createdAt)]))
        .get();
  }

  /// Returns the cloud-cache records whose owner is [ownerId].
  Future<List<CloudCacheRecord>> getCloudCacheRecordsByOwner(
    String ownerId,
  ) {
    return (_db.select(_db.cloudCacheRecords)
          ..where((t) => t.ownerId.equals(ownerId)))
        .get();
  }

  /// Returns the cloud-cache records whose `expiresAt` is in the past.
  Future<List<CloudCacheRecord>> getExpiredCloudCacheRecords() {
    final now = DateTime.now();
    return (_db.select(_db.cloudCacheRecords)
          ..where((t) => t.expiresAt.isSmallerThanValue(now)))
        .get();
  }

  /// Applies [updates] to the cloud-cache record with the given [id].
  Future<void> updateCloudCacheRecord(
    String id,
    CloudCacheRecordsCompanion updates,
  ) async {
    await (_db.update(_db.cloudCacheRecords)..where((t) => t.id.equals(id)))
        .write(updates);
  }

  /// Deletes the cloud-cache record with the given [id], if present.
  Future<void> deleteCloudCacheRecord(String id) async {
    await (_db.delete(_db.cloudCacheRecords)..where((t) => t.id.equals(id)))
        .go();
  }

  /// Deletes every cloud-cache record whose `expiresAt` is in the past.
  Future<void> cleanExpiredCloudCacheRecords() async {
    final now = DateTime.now();
    await (_db.delete(_db.cloudCacheRecords)
          ..where((t) => t.expiresAt.isSmallerThanValue(now)))
        .go();
  }

  // ============================================================
  // Transfer statistics CRUD
  // ============================================================

  /// Returns the statistics row for [date] and [transportType], or `null`.
  Future<TransferStatsRecord?> getStatsRecordByDate(
    String date, {
    String transportType = 'all',
  }) {
    return (_db.select(_db.transferStatsRecords)
          ..where(
            (t) => t.date.equals(date) & t.transportType.equals(transportType),
          ))
        .getSingleOrNull();
  }

  /// Inserts a new statistics row.
  Future<void> insertStatsRecord(TransferStatsRecordsCompanion companion) async {
    await _db.into(_db.transferStatsRecords).insert(companion);
  }

  /// Replaces the stored statistics row that shares [record]'s primary key.
  Future<void> updateStatsRecord(TransferStatsRecord record) async {
    await _db.update(_db.transferStatsRecords).replace(record);
  }

  /// Returns up to [limit] statistics rows, latest `date` first.
  Future<List<TransferStatsRecord>> getStatsRecords({
    int limit = 30,
  }) {
    return (_db.select(_db.transferStatsRecords)
          ..orderBy([(t) => OrderingTerm.desc(t.date)])
          ..limit(limit))
        .get();
  }

  // ============================================================
  // Offline queue CRUD (hand-written SQL — no build_runner needed)
  // ============================================================

  static const _offlineQueueTable = 'offline_queue_records';

  /// Creates the offline-queue table and its lookup index if missing.
  ///
  /// Both statements use `IF NOT EXISTS`, so calling this repeatedly is
  /// harmless; every offline-queue method calls it first.
  Future<void> ensureOfflineQueueTable() async {
    await _db.customStatement('''
      CREATE TABLE IF NOT EXISTS $_offlineQueueTable (
        id TEXT PRIMARY KEY,
        target_device_id TEXT NOT NULL,
        type TEXT NOT NULL DEFAULT 'text',
        status TEXT NOT NULL DEFAULT 'pending',
        content TEXT DEFAULT NULL,
        file_path TEXT DEFAULT NULL,
        file_name TEXT DEFAULT NULL,
        file_size INTEGER DEFAULT NULL,
        retry_count INTEGER NOT NULL DEFAULT 0,
        error_message TEXT DEFAULT NULL,
        message_id TEXT DEFAULT NULL,
        device_alias TEXT DEFAULT NULL,
        created_at TEXT NOT NULL,
        last_attempt_at TEXT DEFAULT NULL
      )
    ''');
    await _db.customStatement('''
      CREATE INDEX IF NOT EXISTS idx_offline_queue_target
      ON $_offlineQueueTable (target_device_id, status)
    ''');
  }

  /// Inserts [item] into the offline queue, replacing any row with the
  /// same id. Timestamps are stored as ISO-8601 strings.
  Future<void> insertOfflineQueueItem(OfflineQueueItem item) async {
    await ensureOfflineQueueTable();
    await _db.customStatement(
      'INSERT OR REPLACE INTO $_offlineQueueTable '
      '(id, target_device_id, type, status, content, file_path, file_name, '
      'file_size, retry_count, error_message, message_id, device_alias, '
      'created_at, last_attempt_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)',
      [
        item.id,
        item.targetDeviceId,
        item.type.id,
        item.status.id,
        item.content,
        item.filePath,
        item.fileName,
        item.fileSize,
        item.retryCount,
        item.errorMessage,
        item.messageId,
        item.deviceAlias,
        item.createdAt.toIso8601String(),
        item.lastAttemptAt?.toIso8601String(),
      ],
    );
  }

  /// Returns the pending and failed queue items addressed to [deviceId],
  /// oldest first.
  Future<List<OfflineQueueItem>> getPendingOfflineItems(
    String deviceId,
  ) async {
    await ensureOfflineQueueTable();
    final rows = await _db.customSelect(
      'SELECT * FROM $_offlineQueueTable '
      'WHERE target_device_id = ? AND status IN (?, ?) '
      'ORDER BY created_at ASC',
      variables: [
        Variable.withString(deviceId),
        Variable.withString(OfflineQueueItemStatus.pending.id),
        Variable.withString(OfflineQueueItemStatus.failed.id),
      ],
    ).get();
    return rows.map(_rowToOfflineQueueItem).toList();
  }

  /// Returns the pending and failed queue items for all devices, oldest
  /// first.
  Future<List<OfflineQueueItem>> getAllPendingOfflineItems() async {
    await ensureOfflineQueueTable();
    final rows = await _db.customSelect(
      'SELECT * FROM $_offlineQueueTable '
      'WHERE status IN (?, ?) '
      'ORDER BY created_at ASC',
      variables: [
        Variable.withString(OfflineQueueItemStatus.pending.id),
        Variable.withString(OfflineQueueItemStatus.failed.id),
      ],
    ).get();
    return rows.map(_rowToOfflineQueueItem).toList();
  }

  /// Returns every queue item regardless of status, newest first.
  Future<List<OfflineQueueItem>> getAllOfflineItems() async {
    await ensureOfflineQueueTable();
    final rows = await _db
        .customSelect(
          'SELECT * FROM $_offlineQueueTable ORDER BY created_at DESC',
        )
        .get();
    return rows.map(_rowToOfflineQueueItem).toList();
  }

  /// Returns how many pending or failed items are queued for [deviceId].
  Future<int> getPendingOfflineCount(String deviceId) async {
    await ensureOfflineQueueTable();
    final rows = await _db.customSelect(
      'SELECT COUNT(*) AS cnt FROM $_offlineQueueTable '
      'WHERE target_device_id = ? AND status IN (?, ?)',
      variables: [
        Variable.withString(deviceId),
        Variable.withString(OfflineQueueItemStatus.pending.id),
        Variable.withString(OfflineQueueItemStatus.failed.id),
      ],
    ).get();
    return rows.first.read<int>('cnt');
  }

  /// Returns how many pending or failed items are queued in total.
  Future<int> getTotalPendingCount() async {
    await ensureOfflineQueueTable();
    final rows = await _db.customSelect(
      'SELECT COUNT(*) AS cnt FROM $_offlineQueueTable '
      'WHERE status IN (?, ?)',
      variables: [
        Variable.withString(OfflineQueueItemStatus.pending.id),
        Variable.withString(OfflineQueueItemStatus.failed.id),
      ],
    ).get();
    return rows.first.read<int>('cnt');
  }

  /// Sets the status of queue item [id].
  ///
  /// [errorMessage] and [retryCount] are written only when non-null.
  /// Moving to `sending` or `failed` also stamps `last_attempt_at` with the
  /// current time.
  Future<void> updateOfflineQueueItemStatus(
    String id,
    OfflineQueueItemStatus status, {
    String? errorMessage,
    int? retryCount,
  }) async {
    await ensureOfflineQueueTable();

    // The SET clause and its bound arguments are built in lock-step.
    final parts = <String>['status = ?'];
    final args = <Object?>[status.id];
    if (errorMessage != null) {
      parts.add('error_message = ?');
      args.add(errorMessage);
    }
    if (retryCount != null) {
      parts.add('retry_count = ?');
      args.add(retryCount);
    }
    if (status == OfflineQueueItemStatus.sending ||
        status == OfflineQueueItemStatus.failed) {
      parts.add('last_attempt_at = ?');
      args.add(DateTime.now().toIso8601String());
    }
    args.add(id);

    await _db.customStatement(
      'UPDATE $_offlineQueueTable SET ${parts.join(', ')} WHERE id = ?',
      args,
    );
  }

  /// Deletes the queue item with the given [id], if present.
  Future<void> deleteOfflineQueueItem(String id) async {
    await ensureOfflineQueueTable();
    await _db.customStatement(
      'DELETE FROM $_offlineQueueTable WHERE id = ?',
      [id],
    );
  }

  /// Deletes the pending and failed queue items addressed to [deviceId].
  Future<void> deleteOfflineItemsByDevice(String deviceId) async {
    await ensureOfflineQueueTable();
    await _db.customStatement(
      'DELETE FROM $_offlineQueueTable WHERE target_device_id = ? AND status IN (?, ?)',
      [
        deviceId,
        OfflineQueueItemStatus.pending.id,
        OfflineQueueItemStatus.failed.id,
      ],
    );
  }

  /// Deletes sent and failed queue items created more than [maxAgeDays]
  /// days ago.
  ///
  /// The comparison is done on the stored ISO-8601 text.
  // NOTE(review): text comparison assumes all created_at values share one
  // format (all local or all UTC) — confirm against writers.
  Future<void> cleanOldOfflineItems({int maxAgeDays = 7}) async {
    await ensureOfflineQueueTable();
    final cutoff = DateTime.now().subtract(Duration(days: maxAgeDays));
    await _db.customStatement(
      'DELETE FROM $_offlineQueueTable WHERE status IN (?, ?) '
      'AND created_at < ?',
      [
        OfflineQueueItemStatus.sent.id,
        OfflineQueueItemStatus.failed.id,
        cutoff.toIso8601String(),
      ],
    );
  }

  /// Converts a raw query row into an [OfflineQueueItem].
  OfflineQueueItem _rowToOfflineQueueItem(QueryRow row) {
    // Read once; the column is nullable.
    final lastAttempt = row.read<String?>('last_attempt_at');
    return OfflineQueueItem(
      id: row.read<String>('id'),
      targetDeviceId: row.read<String>('target_device_id'),
      type: OfflineQueueItemType.fromId(row.read<String>('type')),
      status: OfflineQueueItemStatus.fromId(row.read<String>('status')),
      content: row.read<String?>('content'),
      filePath: row.read<String?>('file_path'),
      fileName: row.read<String?>('file_name'),
      fileSize: row.read<int?>('file_size'),
      retryCount: row.read<int>('retry_count'),
      errorMessage: row.read<String?>('error_message'),
      messageId: row.read<String?>('message_id'),
      deviceAlias: row.read<String?>('device_alias'),
      createdAt: DateTime.parse(row.read<String>('created_at')),
      lastAttemptAt: lastAttempt != null ? DateTime.parse(lastAttempt) : null,
    );
  }
}