import 'dart:async'; import 'dart:collection'; import 'dart:io'; import 'package:catcher_2/catcher_2.dart'; import 'package:collection/collection.dart'; import 'package:dio/dio.dart'; import 'package:flutter/foundation.dart'; import 'package:path/path.dart' as path; import 'package:path_provider/path_provider.dart'; import 'package:spotube/models/logger.dart'; import 'package:spotube/services/download_manager/chunked_download.dart'; import 'package:spotube/services/download_manager/download_request.dart'; import 'package:spotube/services/download_manager/download_status.dart'; import 'package:spotube/services/download_manager/download_task.dart'; import 'package:spotube/utils/primitive_utils.dart'; export './download_request.dart'; export './download_status.dart'; export './download_task.dart'; typedef DownloadStatusEvent = ({ DownloadStatus status, DownloadRequest request }); class DownloadManager { final logger = getLogger("DownloadManager"); final Map _cache = {}; final Queue _queue = Queue(); var dio = Dio(); static const partialExtension = ".partial"; static const tempExtension = ".temp"; // var tasks = StreamController(); final _statusStreamController = StreamController.broadcast(); Stream get statusStream => _statusStreamController.stream; int maxConcurrentTasks = 2; int runningTasks = 0; static final DownloadManager _dm = DownloadManager._internal(); DownloadManager._internal(); factory DownloadManager({int? maxConcurrentTasks}) { if (maxConcurrentTasks != null) { _dm.maxConcurrentTasks = maxConcurrentTasks; } return _dm; } void Function(int, int) createCallback(url, int partialFileLength) => (int received, int total) { getDownload(url)?.progress.value = (received + partialFileLength) / (total + partialFileLength); if (total == -1) {} }; Future download( String url, String savePath, CancelToken cancelToken, { forceDownload = false, }) async { late String partialFilePath; late File partialFile; try { final task = getDownload(url); if (task == null || task.status.value == DownloadStatus.canceled) { return; } setStatus(task, DownloadStatus.downloading); logger.d("[DownloadManager] $url"); final file = File(savePath.toString()); final tmpDirPath = await Directory( path.join( (await getTemporaryDirectory()).path, "spotube-downloads", ), ).create(recursive: true); partialFilePath = path.join( tmpDirPath.path, path.basename(savePath) + partialExtension, ); partialFile = File(partialFilePath); final fileExist = await file.exists(); final partialFileExist = await partialFile.exists(); if (fileExist) { logger.d("[DownloadManager] File Exists"); setStatus(task, DownloadStatus.completed); } else if (partialFileExist) { logger.d("[DownloadManager] Partial File Exists"); final partialFileLength = await partialFile.length(); final response = await dio.download( url, partialFilePath + tempExtension, onReceiveProgress: createCallback(url, partialFileLength), options: Options( headers: { HttpHeaders.rangeHeader: 'bytes=$partialFileLength-', HttpHeaders.connectionHeader: "close", }, ), cancelToken: cancelToken, deleteOnError: true, ); if (response.statusCode == HttpStatus.partialContent) { final ioSink = partialFile.openWrite(mode: FileMode.writeOnlyAppend); final partialChunkFile = File(partialFilePath + tempExtension); await ioSink.addStream(partialChunkFile.openRead()); await partialChunkFile.delete(); await ioSink.close(); await partialFile.copy(savePath); await partialFile.delete(); setStatus(task, DownloadStatus.completed); } } else { final response = await dio.chunkedDownload( url, savePath: partialFilePath, onReceiveProgress: createCallback(url, 0), cancelToken: cancelToken, deleteOnError: true, ); if (response.statusCode == HttpStatus.ok) { await partialFile.copy(savePath); await partialFile.delete(); setStatus(task, DownloadStatus.completed); } } } catch (e, stackTrace) { Catcher2.reportCheckedError(e, stackTrace); var task = getDownload(url)!; if (task.status.value != DownloadStatus.canceled && task.status.value != DownloadStatus.paused) { setStatus(task, DownloadStatus.failed); runningTasks--; if (_queue.isNotEmpty) { _startExecution(); } rethrow; } else if (task.status.value == DownloadStatus.paused) { final ioSink = partialFile.openWrite(mode: FileMode.writeOnlyAppend); final f = File(partialFilePath + tempExtension); if (await f.exists()) { await ioSink.addStream(f.openRead()); } await ioSink.close(); } } runningTasks--; if (_queue.isNotEmpty) { _startExecution(); } } void disposeNotifiers(DownloadTask task) { // task.status.dispose(); // task.progress.dispose(); } void setStatus(DownloadTask? task, DownloadStatus status) { if (task != null) { task.status.value = status; // tasks.add(task); if (status.isCompleted) { disposeNotifiers(task); } _statusStreamController.add((status: status, request: task.request)); } } Future addDownload(String url, String savedPath) async { if (url.isEmpty) throw Exception("Invalid Url. Url is empty: $url"); return _addDownloadRequest(DownloadRequest(url, savedPath)); } Future _addDownloadRequest( DownloadRequest downloadRequest, ) async { if (_cache[downloadRequest.url] != null) { if (!_cache[downloadRequest.url]!.status.value.isCompleted && _cache[downloadRequest.url]!.request == downloadRequest) { // Do nothing return _cache[downloadRequest.url]!; } else { _queue.remove(_cache[downloadRequest.url]?.request); } } _queue.add(DownloadRequest(downloadRequest.url, downloadRequest.path)); final task = DownloadTask(_queue.last); _cache[downloadRequest.url] = task; _startExecution(); return task; } Future pauseDownload(String url) async { logger.d("[DownloadManager] Pause Download"); var task = getDownload(url)!; setStatus(task, DownloadStatus.paused); task.request.cancelToken.cancel(); _queue.remove(task.request); } Future cancelDownload(String url) async { logger.d("[DownloadManager] Cancel Download"); var task = getDownload(url)!; setStatus(task, DownloadStatus.canceled); _queue.remove(task.request); task.request.cancelToken.cancel(); } Future resumeDownload(String url) async { logger.d("[DownloadManager] Resume Download"); var task = getDownload(url)!; setStatus(task, DownloadStatus.downloading); task.request.cancelToken = CancelToken(); _queue.add(task.request); _startExecution(); } Future removeDownload(String url) async { cancelDownload(url); _cache.remove(url); } // Do not immediately call getDownload After addDownload, rather use the returned DownloadTask from addDownload DownloadTask? getDownload(String url) { return _cache[url]; } Future whenDownloadComplete(String url, {Duration timeout = const Duration(hours: 2)}) async { DownloadTask? task = getDownload(url); if (task != null) { return task.whenDownloadComplete(timeout: timeout); } else { return Future.error("Not found"); } } List getAllDownloads() { return _cache.values.toList(); } // Batch Download Mechanism Future addBatchDownloads(List urls, String savePath) async { for (final url in urls) { addDownload(url, savePath); } } List getBatchDownloads(List urls) { return urls.map((e) => _cache[e]).toList(); } Future pauseBatchDownloads(List urls) async { for (var element in urls) { pauseDownload(element); } } Future cancelBatchDownloads(List urls) async { for (var element in urls) { cancelDownload(element); } } Future resumeBatchDownloads(List urls) async { for (var element in urls) { resumeDownload(element); } } ValueNotifier getBatchDownloadProgress(List urls) { ValueNotifier progress = ValueNotifier(0); var total = urls.length; if (total == 0) { return progress; } if (total == 1) { return getDownload(urls.first)?.progress ?? progress; } var progressMap = {}; for (var url in urls) { DownloadTask? task = getDownload(url); if (task != null) { progressMap[url] = 0.0; if (task.status.value.isCompleted) { progressMap[url] = 1.0; progress.value = progressMap.values.sum / total; } void progressListener() { progressMap[url] = task.progress.value; progress.value = progressMap.values.sum / total; } task.progress.addListener(progressListener); void listener() { if (task.status.value.isCompleted) { progressMap[url] = 1.0; progress.value = progressMap.values.sum / total; task.status.removeListener(listener); task.progress.removeListener(progressListener); } } task.status.addListener(listener); } else { total--; } } return progress; } Future?> whenBatchDownloadsComplete(List urls, {Duration timeout = const Duration(hours: 2)}) async { var completer = Completer?>(); var completed = 0; var total = urls.length; for (final url in urls) { DownloadTask? task = getDownload(url); if (task != null) { if (task.status.value.isCompleted) { completed++; if (completed == total) { completer.complete(getBatchDownloads(urls)); } } void listener() { if (task.status.value.isCompleted) { completed++; if (completed == total) { completer.complete(getBatchDownloads(urls)); task.status.removeListener(listener); } } } task.status.addListener(listener); } else { total--; if (total == 0) { completer.complete(null); } } } return completer.future.timeout(timeout); } void _startExecution() async { if (runningTasks == maxConcurrentTasks || _queue.isEmpty) { return; } while (_queue.isNotEmpty && runningTasks < maxConcurrentTasks) { runningTasks++; logger.d('Concurrent workers: $runningTasks'); var currentRequest = _queue.removeFirst(); await download( currentRequest.url, currentRequest.path, currentRequest.cancelToken, ); await Future.delayed(const Duration(milliseconds: 500), null); } } /// This function is used for get file name with extension from url String getFileNameFromUrl(String url) { return PrimitiveUtils.toSafeFileName(url.split('/').last); } }