feat: implement chunked downloader

This commit is contained in:
Kingkor Roy Tirtho 2023-08-07 00:42:53 +06:00
parent f0f0abd782
commit b31933f31d
3 changed files with 197 additions and 58 deletions

View File

@ -1,5 +1,4 @@
import 'dart:async'; import 'dart:async';
import 'dart:convert';
import 'dart:io'; import 'dart:io';
import 'package:catcher/catcher.dart'; import 'package:catcher/catcher.dart';
@ -31,6 +30,7 @@ class DownloadManagerProvider extends ChangeNotifier {
if (track == null) return; if (track == null) return;
final savePath = getTrackFileUrl(track); final savePath = getTrackFileUrl(track);
// related to onFileExists
final oldFile = File("$savePath.old"); final oldFile = File("$savePath.old");
if ((status == DownloadStatus.failed || if ((status == DownloadStatus.failed ||
@ -40,12 +40,7 @@ class DownloadManagerProvider extends ChangeNotifier {
} }
if (status != DownloadStatus.completed) return; if (status != DownloadStatus.completed) return;
var file = File(request.path); final file = File(request.path);
file.copySync(savePath);
file.deleteSync();
file = File(savePath);
if (await oldFile.exists()) { if (await oldFile.exists()) {
await oldFile.delete(); await oldFile.delete();
@ -62,13 +57,13 @@ class DownloadManagerProvider extends ChangeNotifier {
album: track.album?.name, album: track.album?.name,
albumArtist: track.artists?.map((a) => a.name).join(", "), albumArtist: track.artists?.map((a) => a.name).join(", "),
year: track.album?.releaseDate != null year: track.album?.releaseDate != null
? int.tryParse(track.album!.releaseDate!) ? int.tryParse(track.album!.releaseDate!) ?? 1969
: null, : 1969,
trackNumber: track.trackNumber, trackNumber: track.trackNumber,
discNumber: track.discNumber, discNumber: track.discNumber,
durationMs: track.durationMs?.toDouble(), durationMs: track.durationMs?.toDouble() ?? 0.0,
fileSize: file.lengthSync(), fileSize: await file.length(),
trackTotal: track.album?.tracks?.length, trackTotal: track.album?.tracks?.length ?? 0,
picture: imageBytes != null picture: imageBytes != null
? Picture( ? Picture(
data: imageBytes, data: imageBytes,

View File

@ -0,0 +1,150 @@
import 'dart:async';
import 'dart:io';
import 'package:dio/dio.dart';
import 'package:flutter/foundation.dart';
/// Downloading by spiting as file in chunks
extension ChunkDownload on Dio {
Future<Response> chunkedDownload(
url, {
Map<String, dynamic>? queryParameters,
required String savePath,
ProgressCallback? onReceiveProgress,
CancelToken? cancelToken,
bool deleteOnError = true,
int chunkSize = 102400, // 100KB
int maxConcurrentChunk = 3,
String tempExtension = ".temp",
}) async {
int total = 0;
var progress = <int>[];
ProgressCallback createCallback(int chunkIndex) {
return (int received, _) {
progress[chunkIndex] = received;
if (onReceiveProgress != null && total != 0) {
onReceiveProgress(progress.reduce((a, b) => a + b), total);
}
};
}
// this is the last response
// status & headers will the last chunk's status & headers
final completer = Completer<Response>();
Future<Response> downloadChunk(
String url, {
required int start,
required int end,
required int chunkIndex,
}) async {
progress.add(0);
--end;
final res = await download(
url,
savePath + tempExtension + chunkIndex.toString(),
onReceiveProgress: createCallback(chunkIndex),
cancelToken: cancelToken,
queryParameters: queryParameters,
deleteOnError: deleteOnError,
options: Options(
responseType: ResponseType.bytes,
headers: {"range": "bytes=$start-$end"},
),
);
return res;
}
Future<void> mergeTempFiles(int chunk) async {
File headFile = File("$savePath${tempExtension}0");
var raf = await headFile.open(mode: FileMode.writeOnlyAppend);
for (int i = 1; i < chunk; ++i) {
File chunkFile = File(savePath + tempExtension + i.toString());
raf = await raf.writeFrom(await chunkFile.readAsBytes());
await chunkFile.delete();
}
await raf.close();
debugPrint("Downloaded file path: ${headFile.path}");
headFile = await headFile.rename(savePath);
debugPrint("Renamed file path: ${headFile.path}");
}
final firstResponse = await downloadChunk(
url,
start: 0,
end: chunkSize,
chunkIndex: 0,
);
final responses = <Response>[firstResponse];
if (firstResponse.statusCode == HttpStatus.partialContent) {
total = int.parse(
firstResponse.headers
.value(HttpHeaders.contentRangeHeader)
?.split("/")
.lastOrNull ??
'0',
);
final reserved = total -
int.parse(
firstResponse.headers.value(HttpHeaders.contentLengthHeader) ??
// since its a partial content, the content length will be the chunk size
chunkSize.toString(),
);
int chunk = (reserved / chunkSize).ceil() + 1;
if (chunk > 1) {
int currentChunkSize = chunkSize;
if (chunk > maxConcurrentChunk + 1) {
chunk = maxConcurrentChunk + 1;
currentChunkSize = (reserved / maxConcurrentChunk).ceil();
}
responses.addAll(
await Future.wait(
List.generate(maxConcurrentChunk, (i) {
int start = chunkSize + i * currentChunkSize;
return downloadChunk(
url,
start: start,
end: start + currentChunkSize,
chunkIndex: i + 1,
);
}),
),
);
}
await mergeTempFiles(chunk).then((_) {
final response = responses.last;
final isPartialStatus =
response.statusCode == HttpStatus.partialContent;
completer.complete(
Response(
data: response.data,
headers: response.headers,
requestOptions: response.requestOptions,
statusCode: isPartialStatus ? HttpStatus.ok : response.statusCode,
statusMessage: isPartialStatus ? 'Ok' : response.statusMessage,
extra: response.extra,
isRedirect: response.isRedirect,
redirects: response.redirects,
),
);
}).catchError((e) {
completer.completeError(e);
});
}
return completer.future;
}
}

View File

@ -1,10 +1,12 @@
import 'dart:async'; import 'dart:async';
import 'dart:collection'; import 'dart:collection';
import 'dart:io'; import 'dart:io';
import 'package:catcher/core/catcher.dart';
import 'package:collection/collection.dart'; import 'package:collection/collection.dart';
import 'package:dio/dio.dart'; import 'package:dio/dio.dart';
import 'package:flutter/foundation.dart'; import 'package:flutter/foundation.dart';
import 'package:spotube/services/download_manager/chunked_download.dart';
import 'package:spotube/services/download_manager/download_request.dart'; import 'package:spotube/services/download_manager/download_request.dart';
import 'package:spotube/services/download_manager/download_status.dart'; import 'package:spotube/services/download_manager/download_status.dart';
import 'package:spotube/services/download_manager/download_task.dart'; import 'package:spotube/services/download_manager/download_task.dart';
@ -54,35 +56,39 @@ class DownloadManager {
if (total == -1) {} if (total == -1) {}
}; };
Future<void> download(String url, String savePath, cancelToken, Future<void> download(
{forceDownload = false}) async { String url,
String savePath,
CancelToken cancelToken, {
forceDownload = false,
}) async {
late String partialFilePath; late String partialFilePath;
late File partialFile; late File partialFile;
try { try {
var task = getDownload(url); final task = getDownload(url);
if (task == null || task.status.value == DownloadStatus.canceled) { if (task == null || task.status.value == DownloadStatus.canceled) {
return; return;
} }
setStatus(task, DownloadStatus.downloading); setStatus(task, DownloadStatus.downloading);
debugPrint(url); debugPrint("[DownloadManager] $url");
var file = File(savePath.toString()); final file = File(savePath.toString());
partialFilePath = savePath + partialExtension; partialFilePath = savePath + partialExtension;
partialFile = File(partialFilePath); partialFile = File(partialFilePath);
var fileExist = await file.exists(); final fileExist = await file.exists();
var partialFileExist = await partialFile.exists(); final partialFileExist = await partialFile.exists();
if (fileExist) { if (fileExist) {
debugPrint("File Exists"); debugPrint("[DownloadManager] File Exists");
setStatus(task, DownloadStatus.completed); setStatus(task, DownloadStatus.completed);
} else if (partialFileExist) { } else if (partialFileExist) {
debugPrint("Partial File Exists"); debugPrint("[DownloadManager] Partial File Exists");
var partialFileLength = await partialFile.length(); final partialFileLength = await partialFile.length();
var response = await dio.download( final response = await dio.download(
url, url,
partialFilePath + tempExtension, partialFilePath + tempExtension,
onReceiveProgress: createCallback(url, partialFileLength), onReceiveProgress: createCallback(url, partialFileLength),
@ -97,23 +103,20 @@ class DownloadManager {
); );
if (response.statusCode == HttpStatus.partialContent) { if (response.statusCode == HttpStatus.partialContent) {
var ioSink = partialFile.openWrite(mode: FileMode.writeOnlyAppend); final ioSink = partialFile.openWrite(mode: FileMode.writeOnlyAppend);
var _f = File(partialFilePath + tempExtension); final partialChunkFile = File(partialFilePath + tempExtension);
await ioSink.addStream(_f.openRead()); await ioSink.addStream(partialChunkFile.openRead());
await _f.delete(); await partialChunkFile.delete();
await ioSink.close(); await ioSink.close();
await partialFile.rename(savePath); await partialFile.rename(savePath);
setStatus(task, DownloadStatus.completed); setStatus(task, DownloadStatus.completed);
} }
} else { } else {
var response = await dio.download( final response = await dio.chunkedDownload(
url, url,
partialFilePath, savePath: partialFilePath,
onReceiveProgress: createCallback(url, 0), onReceiveProgress: createCallback(url, 0),
options: Options(headers: {
HttpHeaders.connectionHeader: "close",
}),
cancelToken: cancelToken, cancelToken: cancelToken,
deleteOnError: false, deleteOnError: false,
); );
@ -123,7 +126,9 @@ class DownloadManager {
setStatus(task, DownloadStatus.completed); setStatus(task, DownloadStatus.completed);
} }
} }
} catch (e) { } catch (e, stackTrace) {
Catcher.reportCheckedError(e, stackTrace);
var task = getDownload(url)!; var task = getDownload(url)!;
if (task.status.value != DownloadStatus.canceled && if (task.status.value != DownloadStatus.canceled &&
task.status.value != DownloadStatus.paused) { task.status.value != DownloadStatus.paused) {
@ -169,21 +174,9 @@ class DownloadManager {
} }
} }
Future<DownloadTask?> addDownload(String url, String savedDir) async { Future<DownloadTask?> addDownload(String url, String savedPath) async {
if (url.isNotEmpty) { if (url.isEmpty) throw Exception("Invalid Url. Url is empty: $url");
if (savedDir.isEmpty) { return _addDownloadRequest(DownloadRequest(url, savedPath));
savedDir = ".";
}
var isDirectory = await Directory(savedDir).exists();
var downloadFilename = isDirectory
? savedDir + Platform.pathSeparator + getFileNameFromUrl(url)
: savedDir;
return _addDownloadRequest(DownloadRequest(url, downloadFilename));
}
return null;
} }
Future<DownloadTask> _addDownloadRequest( Future<DownloadTask> _addDownloadRequest(
@ -200,7 +193,8 @@ class DownloadManager {
} }
_queue.add(DownloadRequest(downloadRequest.url, downloadRequest.path)); _queue.add(DownloadRequest(downloadRequest.url, downloadRequest.path));
var task = DownloadTask(_queue.last);
final task = DownloadTask(_queue.last);
_cache[downloadRequest.url] = task; _cache[downloadRequest.url] = task;
@ -210,7 +204,7 @@ class DownloadManager {
} }
Future<void> pauseDownload(String url) async { Future<void> pauseDownload(String url) async {
debugPrint("Pause Download"); debugPrint("[DownloadManager] Pause Download");
var task = getDownload(url)!; var task = getDownload(url)!;
setStatus(task, DownloadStatus.paused); setStatus(task, DownloadStatus.paused);
task.request.cancelToken.cancel(); task.request.cancelToken.cancel();
@ -219,7 +213,7 @@ class DownloadManager {
} }
Future<void> cancelDownload(String url) async { Future<void> cancelDownload(String url) async {
debugPrint("Cancel Download"); debugPrint("[DownloadManager] Cancel Download");
var task = getDownload(url)!; var task = getDownload(url)!;
setStatus(task, DownloadStatus.canceled); setStatus(task, DownloadStatus.canceled);
_queue.remove(task.request); _queue.remove(task.request);
@ -227,7 +221,7 @@ class DownloadManager {
} }
Future<void> resumeDownload(String url) async { Future<void> resumeDownload(String url) async {
debugPrint("Resume Download"); debugPrint("[DownloadManager] Resume Download");
var task = getDownload(url)!; var task = getDownload(url)!;
setStatus(task, DownloadStatus.downloading); setStatus(task, DownloadStatus.downloading);
task.request.cancelToken = CancelToken(); task.request.cancelToken = CancelToken();
@ -262,10 +256,10 @@ class DownloadManager {
} }
// Batch Download Mechanism // Batch Download Mechanism
Future<void> addBatchDownloads(List<String> urls, String savedDir) async { Future<void> addBatchDownloads(List<String> urls, String savePath) async {
urls.forEach((url) { for (final url in urls) {
addDownload(url, savedDir); addDownload(url, savePath);
}); }
} }
List<DownloadTask?> getBatchDownloads(List<String> urls) { List<DownloadTask?> getBatchDownloads(List<String> urls) {
@ -349,7 +343,7 @@ class DownloadManager {
var completed = 0; var completed = 0;
var total = urls.length; var total = urls.length;
urls.forEach((url) { for (final url in urls) {
DownloadTask? task = getDownload(url); DownloadTask? task = getDownload(url);
if (task != null) { if (task != null) {
@ -381,7 +375,7 @@ class DownloadManager {
completer.complete(null); completer.complete(null);
} }
} }
}); }
return completer.future.timeout(timeout); return completer.future.timeout(timeout);
} }