spotube/lib/services/download_manager/download_manager.dart
Kingkor Roy Tirtho 38dc4beb44
feat: blazingly™ fast download manager (#619)
* feat: concurrent download service & download prorvider

* feat: implement chunked downloader

* fix: no audio-tags in Linux and duration not showing up for local tracks

* feat: show matching tracks in queue as well

* feat: always uses piped api for download to avoid IP block

* fix: invalid downloadCount
2023-08-07 16:49:11 +06:00

408 lines
11 KiB
Dart

import 'dart:async';
import 'dart:collection';
import 'dart:io';
import 'package:catcher/core/catcher.dart';
import 'package:collection/collection.dart';
import 'package:dio/dio.dart';
import 'package:flutter/foundation.dart';
import 'package:spotube/services/download_manager/chunked_download.dart';
import 'package:spotube/services/download_manager/download_request.dart';
import 'package:spotube/services/download_manager/download_status.dart';
import 'package:spotube/services/download_manager/download_task.dart';
export './download_request.dart';
export './download_status.dart';
export './download_task.dart';
typedef DownloadStatusEvent = ({
DownloadStatus status,
DownloadRequest request
});
class DownloadManager {
final Map<String, DownloadTask> _cache = <String, DownloadTask>{};
final Queue<DownloadRequest> _queue = Queue();
var dio = Dio();
static const partialExtension = ".partial";
static const tempExtension = ".temp";
// var tasks = StreamController<DownloadTask>();
final _statusStreamController =
StreamController<DownloadStatusEvent>.broadcast();
Stream<DownloadStatusEvent> get statusStream =>
_statusStreamController.stream;
int maxConcurrentTasks = 2;
int runningTasks = 0;
static final DownloadManager _dm = DownloadManager._internal();
DownloadManager._internal();
factory DownloadManager({int? maxConcurrentTasks}) {
if (maxConcurrentTasks != null) {
_dm.maxConcurrentTasks = maxConcurrentTasks;
}
return _dm;
}
void Function(int, int) createCallback(url, int partialFileLength) =>
(int received, int total) {
getDownload(url)?.progress.value =
(received + partialFileLength) / (total + partialFileLength);
if (total == -1) {}
};
Future<void> download(
String url,
String savePath,
CancelToken cancelToken, {
forceDownload = false,
}) async {
late String partialFilePath;
late File partialFile;
try {
final task = getDownload(url);
if (task == null || task.status.value == DownloadStatus.canceled) {
return;
}
setStatus(task, DownloadStatus.downloading);
debugPrint("[DownloadManager] $url");
final file = File(savePath.toString());
partialFilePath = savePath + partialExtension;
partialFile = File(partialFilePath);
final fileExist = await file.exists();
final partialFileExist = await partialFile.exists();
if (fileExist) {
debugPrint("[DownloadManager] File Exists");
setStatus(task, DownloadStatus.completed);
} else if (partialFileExist) {
debugPrint("[DownloadManager] Partial File Exists");
final partialFileLength = await partialFile.length();
final response = await dio.download(
url,
partialFilePath + tempExtension,
onReceiveProgress: createCallback(url, partialFileLength),
options: Options(
headers: {
HttpHeaders.rangeHeader: 'bytes=$partialFileLength-',
HttpHeaders.connectionHeader: "close",
},
),
cancelToken: cancelToken,
deleteOnError: true,
);
if (response.statusCode == HttpStatus.partialContent) {
final ioSink = partialFile.openWrite(mode: FileMode.writeOnlyAppend);
final partialChunkFile = File(partialFilePath + tempExtension);
await ioSink.addStream(partialChunkFile.openRead());
await partialChunkFile.delete();
await ioSink.close();
await partialFile.rename(savePath);
setStatus(task, DownloadStatus.completed);
}
} else {
final response = await dio.chunkedDownload(
url,
savePath: partialFilePath,
onReceiveProgress: createCallback(url, 0),
cancelToken: cancelToken,
deleteOnError: true,
);
if (response.statusCode == HttpStatus.ok) {
await partialFile.rename(savePath);
setStatus(task, DownloadStatus.completed);
}
}
} catch (e, stackTrace) {
Catcher.reportCheckedError(e, stackTrace);
var task = getDownload(url)!;
if (task.status.value != DownloadStatus.canceled &&
task.status.value != DownloadStatus.paused) {
setStatus(task, DownloadStatus.failed);
runningTasks--;
if (_queue.isNotEmpty) {
_startExecution();
}
rethrow;
} else if (task.status.value == DownloadStatus.paused) {
final ioSink = partialFile.openWrite(mode: FileMode.writeOnlyAppend);
final f = File(partialFilePath + tempExtension);
if (await f.exists()) {
await ioSink.addStream(f.openRead());
}
await ioSink.close();
}
}
runningTasks--;
if (_queue.isNotEmpty) {
_startExecution();
}
}
void disposeNotifiers(DownloadTask task) {
// task.status.dispose();
// task.progress.dispose();
}
void setStatus(DownloadTask? task, DownloadStatus status) {
if (task != null) {
task.status.value = status;
// tasks.add(task);
if (status.isCompleted) {
disposeNotifiers(task);
}
_statusStreamController.add((status: status, request: task.request));
}
}
Future<DownloadTask?> addDownload(String url, String savedPath) async {
if (url.isEmpty) throw Exception("Invalid Url. Url is empty: $url");
return _addDownloadRequest(DownloadRequest(url, savedPath));
}
Future<DownloadTask> _addDownloadRequest(
DownloadRequest downloadRequest,
) async {
if (_cache[downloadRequest.url] != null) {
if (!_cache[downloadRequest.url]!.status.value.isCompleted &&
_cache[downloadRequest.url]!.request == downloadRequest) {
// Do nothing
return _cache[downloadRequest.url]!;
} else {
_queue.remove(_cache[downloadRequest.url]);
}
}
_queue.add(DownloadRequest(downloadRequest.url, downloadRequest.path));
final task = DownloadTask(_queue.last);
_cache[downloadRequest.url] = task;
_startExecution();
return task;
}
Future<void> pauseDownload(String url) async {
debugPrint("[DownloadManager] Pause Download");
var task = getDownload(url)!;
setStatus(task, DownloadStatus.paused);
task.request.cancelToken.cancel();
_queue.remove(task.request);
}
Future<void> cancelDownload(String url) async {
debugPrint("[DownloadManager] Cancel Download");
var task = getDownload(url)!;
setStatus(task, DownloadStatus.canceled);
_queue.remove(task.request);
task.request.cancelToken.cancel();
}
Future<void> resumeDownload(String url) async {
debugPrint("[DownloadManager] Resume Download");
var task = getDownload(url)!;
setStatus(task, DownloadStatus.downloading);
task.request.cancelToken = CancelToken();
_queue.add(task.request);
_startExecution();
}
Future<void> removeDownload(String url) async {
cancelDownload(url);
_cache.remove(url);
}
// Do not immediately call getDownload After addDownload, rather use the returned DownloadTask from addDownload
DownloadTask? getDownload(String url) {
return _cache[url];
}
Future<DownloadStatus> whenDownloadComplete(String url,
{Duration timeout = const Duration(hours: 2)}) async {
DownloadTask? task = getDownload(url);
if (task != null) {
return task.whenDownloadComplete(timeout: timeout);
} else {
return Future.error("Not found");
}
}
List<DownloadTask> getAllDownloads() {
return _cache.values.toList();
}
// Batch Download Mechanism
Future<void> addBatchDownloads(List<String> urls, String savePath) async {
for (final url in urls) {
addDownload(url, savePath);
}
}
List<DownloadTask?> getBatchDownloads(List<String> urls) {
return urls.map((e) => _cache[e]).toList();
}
Future<void> pauseBatchDownloads(List<String> urls) async {
urls.forEach((element) {
pauseDownload(element);
});
}
Future<void> cancelBatchDownloads(List<String> urls) async {
urls.forEach((element) {
cancelDownload(element);
});
}
Future<void> resumeBatchDownloads(List<String> urls) async {
urls.forEach((element) {
resumeDownload(element);
});
}
ValueNotifier<double> getBatchDownloadProgress(List<String> urls) {
ValueNotifier<double> progress = ValueNotifier(0);
var total = urls.length;
if (total == 0) {
return progress;
}
if (total == 1) {
return getDownload(urls.first)?.progress ?? progress;
}
var progressMap = Map<String, double>();
urls.forEach((url) {
DownloadTask? task = getDownload(url);
if (task != null) {
progressMap[url] = 0.0;
if (task.status.value.isCompleted) {
progressMap[url] = 1.0;
progress.value = progressMap.values.sum / total;
}
var progressListener;
progressListener = () {
progressMap[url] = task.progress.value;
progress.value = progressMap.values.sum / total;
};
task.progress.addListener(progressListener);
var listener;
listener = () {
if (task.status.value.isCompleted) {
progressMap[url] = 1.0;
progress.value = progressMap.values.sum / total;
task.status.removeListener(listener);
task.progress.removeListener(progressListener);
}
};
task.status.addListener(listener);
} else {
total--;
}
});
return progress;
}
Future<List<DownloadTask?>?> whenBatchDownloadsComplete(List<String> urls,
{Duration timeout = const Duration(hours: 2)}) async {
var completer = Completer<List<DownloadTask?>?>();
var completed = 0;
var total = urls.length;
for (final url in urls) {
DownloadTask? task = getDownload(url);
if (task != null) {
if (task.status.value.isCompleted) {
completed++;
if (completed == total) {
completer.complete(getBatchDownloads(urls));
}
}
var listener;
listener = () {
if (task.status.value.isCompleted) {
completed++;
if (completed == total) {
completer.complete(getBatchDownloads(urls));
task.status.removeListener(listener);
}
}
};
task.status.addListener(listener);
} else {
total--;
if (total == 0) {
completer.complete(null);
}
}
}
return completer.future.timeout(timeout);
}
void _startExecution() async {
if (runningTasks == maxConcurrentTasks || _queue.isEmpty) {
return;
}
while (_queue.isNotEmpty && runningTasks < maxConcurrentTasks) {
runningTasks++;
debugPrint('Concurrent workers: $runningTasks');
var currentRequest = _queue.removeFirst();
await download(
currentRequest.url,
currentRequest.path,
currentRequest.cancelToken,
);
await Future.delayed(const Duration(milliseconds: 500), null);
}
}
/// This function is used for get file name with extension from url
String getFileNameFromUrl(String url) {
return url.split('/').last;
}
}