From 1e1f2ca82c6dc2c76fa6a5924d35bccd23258c72 Mon Sep 17 00:00:00 2001 From: Kingkor Roy Tirtho Date: Wed, 10 Dec 2025 16:45:44 +0600 Subject: [PATCH] fix: stuck because of authState running in main thread and sse no url-request event captured --- .vscode/launch.json | 2 +- lib/collections/routes.gr.dart | 4 +- lib/models/playback/track_sources.g.dart | 2 +- .../settings/metadata/metadata_form.dart | 2 +- lib/provider/metadata_plugin/core/auth.dart | 9 +- .../metadata_plugin_provider.dart | 14 +- .../updater/update_checker.dart | 58 +++-- .../server/libs/eventsource_publisher.dart | 201 ++++++++++++++++++ .../server/libs/shelf_eventsource.dart | 106 +++++++++ lib/provider/server/router.dart | 13 +- .../routes/plugin_apis/path_provider.dart | 29 --- .../server/routes/plugin_apis/webview.dart | 59 +++-- lib/provider/server/sse_publisher.dart | 14 ++ lib/services/metadata/metadata.dart | 15 +- .../plugin_api/localstorage/localstorage.dart | 0 lib/src/rust/api/plugin/plugin.dart | 5 +- lib/src/rust/frb_generated.dart | 32 +-- rust/src/api/plugin/models/core.rs | 5 +- rust/src/api/plugin/plugin.rs | 107 ++++++---- rust/src/api/plugin/senders.rs | 98 ++++----- rust/src/frb_generated.rs | 70 +++--- rust/src/internal/apis/mod.rs | 28 +-- rust/src/internal/apis/timezone.rs | 2 +- rust/src/internal/apis/webview.rs | 28 ++- rust/src/internal/auth.rs | 14 +- rust/src/internal/utils.rs | 78 +++++-- rust/src/main.rs | 2 +- 27 files changed, 680 insertions(+), 317 deletions(-) create mode 100644 lib/provider/server/libs/eventsource_publisher.dart create mode 100644 lib/provider/server/libs/shelf_eventsource.dart delete mode 100644 lib/provider/server/routes/plugin_apis/path_provider.dart create mode 100644 lib/provider/server/sse_publisher.dart delete mode 100644 lib/src/plugin_api/localstorage/localstorage.dart diff --git a/.vscode/launch.json b/.vscode/launch.json index b81e2eee..a62385f3 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -52,7 +52,7 @@ "--flavor", "dev" ] - } + }, ], "compounds": [] } \ No newline at end of file diff --git a/lib/collections/routes.gr.dart b/lib/collections/routes.gr.dart index 51144ceb..a5209d22 100644 --- a/lib/collections/routes.gr.dart +++ b/lib/collections/routes.gr.dart @@ -632,7 +632,7 @@ class SettingsMetadataProviderFormRoute SettingsMetadataProviderFormRoute({ _i44.Key? key, required String title, - required List fields, + required List fields, List<_i41.PageRouteInfo>? children, }) : super( SettingsMetadataProviderFormRoute.name, @@ -670,7 +670,7 @@ class SettingsMetadataProviderFormRouteArgs { final String title; - final List fields; + final List fields; @override String toString() { diff --git a/lib/models/playback/track_sources.g.dart b/lib/models/playback/track_sources.g.dart index d3c1796e..3088493a 100644 --- a/lib/models/playback/track_sources.g.dart +++ b/lib/models/playback/track_sources.g.dart @@ -7,7 +7,7 @@ part of 'track_sources.dart'; // ************************************************************************** BasicSourcedTrack _$BasicSourcedTrackFromJson(Map json) => BasicSourcedTrack( - query: SpotubeTrackObject.fromJson( + query: SpotubeFullTrackObject.fromJson( Map.from(json['query'] as Map)), source: json['source'] as String, info: SpotubeAudioSourceMatchObject.fromJson( diff --git a/lib/pages/settings/metadata/metadata_form.dart b/lib/pages/settings/metadata/metadata_form.dart index b0aeb8bb..45acdde4 100644 --- a/lib/pages/settings/metadata/metadata_form.dart +++ b/lib/pages/settings/metadata/metadata_form.dart @@ -13,7 +13,7 @@ import 'package:spotube/models/metadata/metadata.dart'; @RoutePage() class SettingsMetadataProviderFormPage extends HookConsumerWidget { final String title; - final List fields; + final List fields; const SettingsMetadataProviderFormPage({ super.key, required this.title, diff --git a/lib/provider/metadata_plugin/core/auth.dart b/lib/provider/metadata_plugin/core/auth.dart index e5b16555..7649e781 100644 --- a/lib/provider/metadata_plugin/core/auth.dart +++ b/lib/provider/metadata_plugin/core/auth.dart @@ -19,16 +19,19 @@ class MetadataPluginAuthenticatedNotifier extends AsyncNotifier { return false; } + /// `authState` can be called once in the SpotubePlugin's lifetime. final sub = defaultPlugin.authState().listen((event) async { - state = AsyncData(await defaultPlugin.auth - .isAuthenticated(mpscTx: defaultPlugin.sender)); + state = AsyncData( + await defaultPlugin.auth.isAuthenticated(mpscTx: defaultPlugin.sender), + ); }); ref.onDispose(() { sub.cancel(); }); - return defaultPlugin.auth.isAuthenticated(mpscTx: defaultPlugin.sender); + return await defaultPlugin.auth + .isAuthenticated(mpscTx: defaultPlugin.sender); } } diff --git a/lib/provider/metadata_plugin/metadata_plugin_provider.dart b/lib/provider/metadata_plugin/metadata_plugin_provider.dart index 16e1bce5..a8bd0536 100644 --- a/lib/provider/metadata_plugin/metadata_plugin_provider.dart +++ b/lib/provider/metadata_plugin/metadata_plugin_provider.dart @@ -16,6 +16,7 @@ import 'package:spotube/services/dio/dio.dart'; import 'package:spotube/services/logger/logger.dart'; import 'package:spotube/services/metadata/errors/exceptions.dart'; import 'package:spotube/services/metadata/metadata.dart'; +import 'package:spotube/src/rust/api/plugin/plugin.dart'; import 'package:spotube/utils/service_utils.dart'; import 'package:archive/archive.dart'; import 'package:pub_semver/pub_semver.dart'; @@ -598,11 +599,16 @@ final _pluginProvider = final pluginsNotifier = ref.read(metadataPluginsProvider.notifier); final pluginSourceCode = await pluginsNotifier.getPluginSourceCode(config); + final spotubePlugin = SpotubePlugin(); final plugin = MetadataPlugin( - pluginScript: pluginSourceCode, - pluginConfig: config, - serverEndpointUrl: "http://${server.address.host}:$port", - serverSecret: serverSecret, + plugin: spotubePlugin, + sender: await spotubePlugin.createContext( + pluginScript: pluginSourceCode, + pluginConfig: config, + serverEndpointUrl: "http://${server.address.host}:$port", + serverSecret: serverSecret, + localStorageDir: (await getApplicationSupportDirectory()).path, + ), ); ref.onDispose(() { diff --git a/lib/provider/metadata_plugin/updater/update_checker.dart b/lib/provider/metadata_plugin/updater/update_checker.dart index 91f98dcd..a70ac786 100644 --- a/lib/provider/metadata_plugin/updater/update_checker.dart +++ b/lib/provider/metadata_plugin/updater/update_checker.dart @@ -1,36 +1,52 @@ import 'package:hooks_riverpod/hooks_riverpod.dart'; +import 'package:shadcn_flutter/shadcn_flutter.dart'; import 'package:spotube/models/metadata/metadata.dart'; import 'package:spotube/provider/metadata_plugin/metadata_plugin_provider.dart'; final metadataPluginUpdateCheckerProvider = FutureProvider((ref) async { - final metadataPluginConfigs = await ref.watch(metadataPluginsProvider.future); - final metadataPlugin = await ref.watch(metadataPluginProvider.future); + try { + final metadataPluginConfigs = + await ref.watch(metadataPluginsProvider.future); + final metadataPlugin = await ref.watch(metadataPluginProvider.future); - if (metadataPlugin == null || - metadataPluginConfigs.defaultMetadataPluginConfig == null) { - return null; + if (metadataPlugin == null || + metadataPluginConfigs.defaultMetadataPluginConfig == null) { + return null; + } + + final res = await metadataPlugin.core.checkUpdate( + pluginConfig: metadataPluginConfigs.defaultMetadataPluginConfig!, + mpscTx: metadataPlugin.sender, + ); + + return res; + } catch (e) { + debugPrint('Error checking metadata plugin update: $e'); + rethrow; } - - return metadataPlugin.core.checkUpdate( - pluginConfig: metadataPluginConfigs.defaultMetadataPluginConfig!, - mpscTx: metadataPlugin.sender, - ); }); final audioSourcePluginUpdateCheckerProvider = FutureProvider((ref) async { - final audioSourcePluginConfigs = - await ref.watch(metadataPluginsProvider.future); - final audioSourcePlugin = await ref.watch(audioSourcePluginProvider.future); + try { + final audioSourcePluginConfigs = + await ref.watch(metadataPluginsProvider.future); + final audioSourcePlugin = await ref.watch(audioSourcePluginProvider.future); - if (audioSourcePlugin == null || - audioSourcePluginConfigs.defaultAudioSourcePluginConfig == null) { - return null; + if (audioSourcePlugin == null || + audioSourcePluginConfigs.defaultAudioSourcePluginConfig == null) { + return null; + } + + final res = await audioSourcePlugin.core.checkUpdate( + pluginConfig: audioSourcePluginConfigs.defaultAudioSourcePluginConfig!, + mpscTx: audioSourcePlugin.sender, + ); + + return res; + } catch (e) { + debugPrint('Error checking audio source plugin update: $e'); + rethrow; } - - return audioSourcePlugin.core.checkUpdate( - pluginConfig: audioSourcePluginConfigs.defaultAudioSourcePluginConfig!, - mpscTx: audioSourcePlugin.sender, - ); }); diff --git a/lib/provider/server/libs/eventsource_publisher.dart b/lib/provider/server/libs/eventsource_publisher.dart new file mode 100644 index 00000000..3c01a282 --- /dev/null +++ b/lib/provider/server/libs/eventsource_publisher.dart @@ -0,0 +1,201 @@ +import "dart:async"; + +import "package:collection/collection.dart"; +import "package:logging/logging.dart" as log; + +/// Just a simple [Sink] implementation that proxies the [add] and [close] +/// methods. +class ProxySink implements Sink { + void Function(T) onAdd; + void Function() onClose; + ProxySink({required this.onAdd, required this.onClose}); + @override + void add(t) => onAdd(t); + @override + void close() => onClose(); +} + +class EventCache { + final int? cacheCapacity; + final bool comparableIds; + final Map> _caches = {}; + + EventCache({this.cacheCapacity, this.comparableIds = true}); + + void replay(Sink sink, String lastEventId, [String channel = ""]) { + List? cache = _caches[channel]; + if (cache == null || cache.isEmpty) { + // nothing to replay + return; + } + // find the location of lastEventId in the queue + int index; + if (comparableIds) { + // if comparableIds, we can use binary search + index = binarySearch(cache, lastEventId); + } else { + // otherwise, we starts from the last one and look one by one + index = cache.length - 1; + while (index > 0 && cache[index].id != lastEventId) { + index--; + } + } + if (index >= 0) { + // add them all to the sink + cache.sublist(index).forEach(sink.add); + } + } + + /// Add a new [Event] to the cache(s) of the specified channel(s). + /// Please note that we assume events are added with increasing values of + /// [Event.id]. + void add(Event event, [Iterable channels = const [""]]) { + for (String channel in channels) { + List cache = _caches.putIfAbsent(channel, () => []); + if (cacheCapacity != null && cache.length >= cacheCapacity!) { + cache.removeAt(0); + } + cache.add(event); + } + } + + void clear([Iterable channels = const [""]]) { + channels.forEach(_caches.remove); + } + + void clearAll() { + _caches.clear(); + } +} + +class Event implements Comparable { + /// An identifier that can be used to allow a client to replay + /// missed Events by returning the Last-Event-Id header. + /// Return empty string if not required. + String? id; + + /// The name of the event. Return empty string if not required. + String? event; + + /// The payload of the event. + String? data; + + Event({this.id, this.event, this.data}); + + Event.message({this.id, this.data}) : event = "message"; + + @override + int compareTo(Event other) => id!.compareTo(other.id!); +} + +/// An EventSource publisher. It can manage different channels of events. +/// This class forms the backbone of an EventSource server. To actually serve +/// a web server, use this together with [shelf_eventsource] or another server +/// implementation. +class EventSourcePublisher implements Sink { + log.Logger? logger; + EventCache? _cache; + + /// Create a new EventSource server. + /// + /// When using a cache, for efficient replaying, it is advisable to use a + /// custom Event implementation that overrides the `Event.compareTo` method. + /// F.e. if integer events are used, sorting should be done on integers and + /// not on the string representations of them. + /// If your Event's id properties are not incremental using + /// [Comparable.compare], set [comparableIds] to false. + EventSourcePublisher({ + int cacheCapacity = 0, + bool comparableIds = false, + bool enableLogging = true, + }) { + if (cacheCapacity > 0) { + _cache = EventCache(cacheCapacity: cacheCapacity); + } + if (enableLogging) { + logger = log.Logger("EventSourceServer"); + } + } + + final Map> _subsByChannel = {}; + + /// Creates a Sink for the specified channel. + /// The `add` and `remove` methods of this channel are equivalent to the + /// respective methods of this class with the specific channel passed along. + Sink channel(String channel) => ProxySink( + onAdd: (e) => add(e, channels: [channel]), + onClose: () => close(channels: [channel])); + + /// Add a publication to the specified channels. + /// By default, only adds to the default channel. + @override + void add(Event event, {Iterable channels = const [""]}) { + for (String channel in channels) { + List? subs = _subsByChannel[channel]; + if (subs == null) { + continue; + } + _logFiner( + "Sending event on channel $channel to ${subs.length} subscribers."); + for (var sub in subs) { + sub.add(event); + } + } + _cache?.add(event, channels); + } + + /// Close the specified channels. + /// All the connections with the subscribers to this channels will be closed. + /// By default only closes the default channel. + @override + void close({Iterable channels = const [""]}) { + for (String channel in channels) { + List? subs = _subsByChannel[channel]; + if (subs == null) { + continue; + } + _logInfo("Closing channel $channel with ${subs.length} subscribers."); + for (var sub in subs) { + sub.close(); + } + } + _cache?.clear(channels); + } + + /// Close all the open channels. + void closeAllChannels() => close(channels: _subsByChannel.keys); + + /// Initialize a new subscription and replay when possible. + /// Should not be used by the user directly. + void newSubscription({ + required void Function(Event) onEvent, + required void Function() onClose, + required String channel, + String? lastEventId, + }) { + _logFine("New subscriber on channel $channel."); + // create a sink for the subscription + ProxySink sub = ProxySink(onAdd: onEvent, onClose: onClose); + // save the subscription + _subsByChannel.putIfAbsent(channel, () => []).add(sub); + // replay past events + if (_cache != null && lastEventId != null) { + scheduleMicrotask(() { + _logFine("Replaying events on channel $channel from id $lastEventId."); + _cache!.replay(sub, lastEventId, channel); + }); + } + } + + void _logInfo(message) { + logger?.log(log.Level.INFO, message); + } + + void _logFine(message) { + logger?.log(log.Level.FINE, message); + } + + void _logFiner(message) { + logger?.log(log.Level.FINER, message); + } +} diff --git a/lib/provider/server/libs/shelf_eventsource.dart b/lib/provider/server/libs/shelf_eventsource.dart new file mode 100644 index 00000000..d7dd519f --- /dev/null +++ b/lib/provider/server/libs/shelf_eventsource.dart @@ -0,0 +1,106 @@ +import "dart:convert"; +import "dart:io"; +import "package:shelf/shelf.dart"; +import "package:spotube/provider/server/libs/eventsource_publisher.dart"; + +class EventSourceEncoder extends Converter> { + final bool compressed; + + const EventSourceEncoder({this.compressed = false}); + + static final Map _fields = { + "id: ": (e) => e.id, + "event: ": (e) => e.event, + "data: ": (e) => e.data, + }; + + @override + List convert(Event event) { + String payload = convertToString(event); + List bytes = utf8.encode(payload); + if (compressed) { + bytes = gzip.encode(bytes); + } + return bytes; + } + + String convertToString(Event event) { + String payload = ""; + for (String prefix in _fields.keys) { + String? value = _fields[prefix]?.call(event); + if (value == null || value.isEmpty) { + continue; + } + // multi-line values need the field prefix on every line + value = value.replaceAll("\n", "\n$prefix"); + payload += "$prefix$value\n"; + } + payload += "\n"; + return payload; + } + + @override + Sink startChunkedConversion(Sink> sink) { + Sink inputSink = sink; + if (compressed) { + inputSink = + gzip.encoder.startChunkedConversion(inputSink as Sink>); + } + inputSink = + utf8.encoder.startChunkedConversion(inputSink as Sink>); + return new ProxySink( + onAdd: (Event event) => inputSink.add(convertToString(event)), + onClose: () => inputSink.close()); + } +} + +/// Create a shelf handler for the specified channel. +/// This handler can be passed to the [shelf.serve] method. +Handler eventSourceHandler( + EventSourcePublisher publisher, { + String channel = "", + bool gzip = false, +}) { + // define the handler + Response shelfHandler(Request request) { + if (request.method != "GET") { + return Response.notFound(null); + } + + if (!request.canHijack) { + throw ArgumentError("eventSourceHandler may only be used with a " + "server that supports request hijacking."); + } + + // set content encoding to gzip if we allow it and the request supports it + bool useGzip = + gzip && (request.headers["Accept-Encoding"] ?? "").contains("gzip"); + + // hijack the raw underlying channel + request.hijack((untypedChannel) { + var socketChannel = (untypedChannel).cast>(); + // create a regular UTF8 sink to write headers + var sink = utf8.encoder.startChunkedConversion(socketChannel.sink); + // write headers + sink.add("HTTP/1.1 200 OK\r\n" + "Content-Type: text/event-stream; charset=utf-8\r\n" + "Cache-Control: no-cache, no-store, must-revalidate\r\n" + "Connection: keep-alive\r\n"); + if (useGzip) sink.add("Content-Encoding: gzip\r\n"); + sink.add("\r\n"); + + // create encoder for this connection + var encodedSink = EventSourceEncoder(compressed: useGzip) + .startChunkedConversion(socketChannel.sink); + + // initialize the new subscription + publisher.newSubscription( + onEvent: encodedSink.add, + onClose: encodedSink.close, + channel: channel, + lastEventId: request.headers["Last-Event-ID"]); + }); + } + + return shelfHandler; +} diff --git a/lib/provider/server/router.dart b/lib/provider/server/router.dart index 9db32145..16a226ed 100644 --- a/lib/provider/server/router.dart +++ b/lib/provider/server/router.dart @@ -1,12 +1,13 @@ import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:shelf/shelf.dart'; import 'package:shelf_router/shelf_router.dart'; +import 'package:spotube/provider/server/libs/shelf_eventsource.dart'; import 'package:spotube/provider/server/routes/connect.dart'; import 'package:spotube/provider/server/routes/playback.dart'; import 'package:spotube/provider/server/routes/plugin_apis/form.dart'; -import 'package:spotube/provider/server/routes/plugin_apis/path_provider.dart'; import 'package:spotube/provider/server/routes/plugin_apis/webview.dart'; import 'package:spotube/provider/server/routes/plugin_apis/yt_engine.dart'; +import 'package:spotube/provider/server/sse_publisher.dart'; Handler pluginApiAuthMiddleware(Handler handler) { return (Request request) { @@ -25,6 +26,8 @@ final serverRouterProvider = Provider((ref) { final webviewRoutes = ref.watch(serverWebviewRoutesProvider); final formRoutes = ref.watch(serverFormRoutesProvider); final ytEngineRoutes = ref.watch(serverYTEngineRoutesProvider); + final publisher = ref.watch(ssePublisherProvider); + final sseHandler = eventSourceHandler(publisher); final router = Router(); @@ -42,8 +45,8 @@ final serverRouterProvider = Provider((ref) { pluginApiAuthMiddleware(webviewRoutes.postCreateWebview), ); router.get( - "/plugin-api/webview//on-url-request", - pluginApiAuthMiddleware(webviewRoutes.getOnUrlRequestStream), + "/plugin-api/webview/events", + pluginApiAuthMiddleware(sseHandler), ); router.post( "/plugin-api/webview/open", @@ -61,10 +64,6 @@ final serverRouterProvider = Provider((ref) { "/plugin-api/form/show", pluginApiAuthMiddleware(formRoutes.showForm), ); - router.get( - "/plugin/localstorage/directories", - pluginApiAuthMiddleware(ServerPathProviderRoutes.getDirectories), - ); router.get( "/plugin-api/yt-engine/search", diff --git a/lib/provider/server/routes/plugin_apis/path_provider.dart b/lib/provider/server/routes/plugin_apis/path_provider.dart deleted file mode 100644 index 55f1ad8b..00000000 --- a/lib/provider/server/routes/plugin_apis/path_provider.dart +++ /dev/null @@ -1,29 +0,0 @@ -import 'dart:convert'; -import 'dart:io'; - -import 'package:path_provider/path_provider.dart' as pp; -import 'package:shelf/shelf.dart'; - -class ServerPathProviderRoutes { - static Future getDirectories(Request request) async { - final directories = { - 'temporary': await Future.value(pp.getTemporaryDirectory()) - .catchError((e) => null), - 'applicationDocuments': - await Future.value(pp.getApplicationDocumentsDirectory()) - .catchError((e) => null), - 'applicationSupport': - await Future.value(pp.getApplicationSupportDirectory()) - .catchError((e) => null), - 'library': await Future.value(pp.getLibraryDirectory()) - .catchError((e) => null), - 'externalStorage': - await pp.getExternalStorageDirectory().catchError((e) => null), - 'downloads': await pp.getDownloadsDirectory().catchError((e) => null), - }.map((key, value) => MapEntry(key, value?.path)); - return Response.ok( - jsonEncode(directories), - headers: {'Content-Type': 'application/json'}, - ); - } -} diff --git a/lib/provider/server/routes/plugin_apis/webview.dart b/lib/provider/server/routes/plugin_apis/webview.dart index 40340a64..bd1421bf 100644 --- a/lib/provider/server/routes/plugin_apis/webview.dart +++ b/lib/provider/server/routes/plugin_apis/webview.dart @@ -7,9 +7,10 @@ import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:shadcn_flutter/shadcn_flutter.dart'; import 'package:shelf/shelf.dart'; import 'package:shelf_router/shelf_router.dart'; +import 'package:spotube/provider/server/libs/eventsource_publisher.dart'; import 'package:spotube/provider/server/server.dart'; +import 'package:spotube/provider/server/sse_publisher.dart'; import 'package:spotube/src/plugin_api/webview/webview.dart'; -import 'package:async/async.dart'; import 'package:encrypt/encrypt.dart' as encrypt; class ServerWebviewRoutes { @@ -17,6 +18,7 @@ class ServerWebviewRoutes { ServerWebviewRoutes({required this.ref}); final Map _webviews = {}; + final Map _eventSubscriptions = {}; String _encryptCookies(dynamic cookies, String secret) { final keyBytes = base64.decode(secret); @@ -39,6 +41,16 @@ class ServerWebviewRoutes { final webview = Webview(uri: uri.toString()); _webviews[webview.uid] = webview; + + _eventSubscriptions[webview.uid] = webview.onUrlRequestStream.listen((url) { + ref.read(ssePublisherProvider).add( + Event( + event: "url-request", + data: jsonEncode({'uid': webview.uid, 'url': url}), + ), + ); + }); + return Response.ok( jsonEncode({'uid': webview.uid}), encoding: utf8, @@ -56,40 +68,16 @@ class ServerWebviewRoutes { return Response.notFound('Webview with uid $uid not found'); } - // Create a stream that merges URL events with keepalive pings - final controller = StreamController>(); - - // Send keepalive comment every 15 seconds to prevent connection timeout - final keepaliveTimer = Stream.periodic( - const Duration(seconds: 15), - (_) => utf8.encode(": keepalive\n\n"), - ); - final urlStream = webview.onUrlRequestStream.map((url) { - return utf8.encode("event: url-request\n" - "data: ${jsonEncode({'url': url})}\n\n"); + final payload = "event: url-request\n" + "data: ${jsonEncode({'url': url})}\n\n"; + + debugPrint('[server][webview] sending:\n$payload'); + return utf8.encode(payload); }); - // Merge both streams - final subscription = StreamGroup.merge([keepaliveTimer, urlStream]).listen( - (data) { - if (!controller.isClosed) { - controller.add(data); - } - }, - onDone: () { - controller.close(); - }, - ); - - // Clean up when client disconnects - controller.onCancel = () { - debugPrint('Webview $uid client disconnected'); - subscription.cancel(); - }; - return Response.ok( - controller.stream, + urlStream, headers: { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', @@ -117,11 +105,14 @@ class ServerWebviewRoutes { final uid = body['uid'] as String; final webview = _webviews[uid]; - if (webview == null) { + final subscription = _eventSubscriptions[uid]; + if (webview == null || subscription == null) { return Response.notFound('Webview with uid $uid not found'); } + subscription.cancel(); await webview.close(); + _eventSubscriptions.remove(uid); _webviews.remove(uid); return Response.ok(null); } @@ -149,6 +140,10 @@ class ServerWebviewRoutes { } Future dispose() async { + for (final subscription in _eventSubscriptions.values) { + await subscription.cancel(); + } + _eventSubscriptions.clear(); for (final webview in _webviews.values) { await webview.close(); } diff --git a/lib/provider/server/sse_publisher.dart b/lib/provider/server/sse_publisher.dart new file mode 100644 index 00000000..b954673c --- /dev/null +++ b/lib/provider/server/sse_publisher.dart @@ -0,0 +1,14 @@ +import 'package:riverpod/riverpod.dart'; +import 'package:spotube/provider/server/libs/eventsource_publisher.dart'; + +final ssePublisherProvider = Provider( + (ref) { + final publisher = EventSourcePublisher(cacheCapacity: 100); + + ref.onDispose(() { + publisher.close(); + }); + + return publisher; + }, +); diff --git a/lib/services/metadata/metadata.dart b/lib/services/metadata/metadata.dart index ffc3a746..de367d14 100644 --- a/lib/services/metadata/metadata.dart +++ b/lib/services/metadata/metadata.dart @@ -1,5 +1,4 @@ import 'package:spotube/src/rust/api/plugin/models/auth.dart'; -import 'package:spotube/src/rust/api/plugin/models/core.dart'; import 'package:spotube/src/rust/api/plugin/plugin.dart'; import 'package:spotube/src/rust/api/plugin/senders.dart'; @@ -7,19 +6,7 @@ class MetadataPlugin { final SpotubePlugin plugin; late final OpaqueSender sender; - MetadataPlugin({ - required String pluginScript, - required PluginConfiguration pluginConfig, - required String serverEndpointUrl, - required String serverSecret, - }) : plugin = SpotubePlugin() { - sender = plugin.createContext( - pluginScript: pluginScript, - pluginConfig: pluginConfig, - serverEndpointUrl: serverEndpointUrl, - serverSecret: serverSecret, - ); - } + MetadataPlugin({required this.sender, required this.plugin}); Stream authState() => plugin.authState(); diff --git a/lib/src/plugin_api/localstorage/localstorage.dart b/lib/src/plugin_api/localstorage/localstorage.dart deleted file mode 100644 index e69de29b..00000000 diff --git a/lib/src/rust/api/plugin/plugin.dart b/lib/src/rust/api/plugin/plugin.dart index 71452169..bb65a571 100644 --- a/lib/src/rust/api/plugin/plugin.dart +++ b/lib/src/rust/api/plugin/plugin.dart @@ -66,11 +66,12 @@ abstract class SpotubePlugin implements RustOpaqueInterface { Future close({required OpaqueSender tx}); - OpaqueSender createContext( + Future createContext( {required String pluginScript, required PluginConfiguration pluginConfig, required String serverEndpointUrl, - required String serverSecret}); + required String serverSecret, + required String localStorageDir}); factory SpotubePlugin() => RustLib.instance.api.crateApiPluginPluginSpotubePluginNew(); diff --git a/lib/src/rust/frb_generated.dart b/lib/src/rust/frb_generated.dart index 19dc01b8..92f21400 100644 --- a/lib/src/rust/frb_generated.dart +++ b/lib/src/rust/frb_generated.dart @@ -170,12 +170,13 @@ abstract class RustLibApi extends BaseApi { Future crateApiPluginPluginSpotubePluginClose( {required SpotubePlugin that, required OpaqueSender tx}); - OpaqueSender crateApiPluginPluginSpotubePluginCreateContext( + Future crateApiPluginPluginSpotubePluginCreateContext( {required SpotubePlugin that, required String pluginScript, required PluginConfiguration pluginConfig, required String serverEndpointUrl, - required String serverSecret}); + required String serverSecret, + required String localStorageDir}); SpotubePlugin crateApiPluginPluginSpotubePluginNew(); @@ -1204,14 +1205,15 @@ class RustLibApiImpl extends RustLibApiImplPlatform implements RustLibApi { ); @override - OpaqueSender crateApiPluginPluginSpotubePluginCreateContext( + Future crateApiPluginPluginSpotubePluginCreateContext( {required SpotubePlugin that, required String pluginScript, required PluginConfiguration pluginConfig, required String serverEndpointUrl, - required String serverSecret}) { - return handler.executeSync(SyncTask( - callFfi: () { + required String serverSecret, + required String localStorageDir}) { + return handler.executeNormal(NormalTask( + callFfi: (port_) { final serializer = SseSerializer(generalizedFrbRustBinding); sse_encode_Auto_Ref_RustOpaque_flutter_rust_bridgefor_generatedRustAutoOpaqueInnerSpotubePlugin( that, serializer); @@ -1219,7 +1221,9 @@ class RustLibApiImpl extends RustLibApiImplPlatform implements RustLibApi { sse_encode_box_autoadd_plugin_configuration(pluginConfig, serializer); sse_encode_String(serverEndpointUrl, serializer); sse_encode_String(serverSecret, serializer); - return pdeCallFfi(generalizedFrbRustBinding, serializer, funcId: 25)!; + sse_encode_String(localStorageDir, serializer); + pdeCallFfi(generalizedFrbRustBinding, serializer, + funcId: 25, port: port_); }, codec: SseCodec( decodeSuccessData: @@ -1232,7 +1236,8 @@ class RustLibApiImpl extends RustLibApiImplPlatform implements RustLibApi { pluginScript, pluginConfig, serverEndpointUrl, - serverSecret + serverSecret, + localStorageDir ], apiImpl: this, )); @@ -1246,7 +1251,8 @@ class RustLibApiImpl extends RustLibApiImplPlatform implements RustLibApi { "pluginScript", "pluginConfig", "serverEndpointUrl", - "serverSecret" + "serverSecret", + "localStorageDir" ], ); @@ -6951,15 +6957,17 @@ class SpotubePluginImpl extends RustOpaque implements SpotubePlugin { Future close({required OpaqueSender tx}) => RustLib.instance.api .crateApiPluginPluginSpotubePluginClose(that: this, tx: tx); - OpaqueSender createContext( + Future createContext( {required String pluginScript, required PluginConfiguration pluginConfig, required String serverEndpointUrl, - required String serverSecret}) => + required String serverSecret, + required String localStorageDir}) => RustLib.instance.api.crateApiPluginPluginSpotubePluginCreateContext( that: this, pluginScript: pluginScript, pluginConfig: pluginConfig, serverEndpointUrl: serverEndpointUrl, - serverSecret: serverSecret); + serverSecret: serverSecret, + localStorageDir: localStorageDir); } diff --git a/rust/src/api/plugin/models/core.rs b/rust/src/api/plugin/models/core.rs index c34a8925..560c385f 100644 --- a/rust/src/api/plugin/models/core.rs +++ b/rust/src/api/plugin/models/core.rs @@ -11,14 +11,11 @@ pub enum PluginApi { } #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] pub enum PluginAbility { - #[serde(rename = "authentication")] Authentication, - #[serde(rename = "scrobbling")] Scrobbling, - #[serde(rename = "metadata")] Metadata, - #[serde(rename = "audio-source")] AudioSource, } diff --git a/rust/src/api/plugin/plugin.rs b/rust/src/api/plugin/plugin.rs index e2a7ae6e..e29bddb3 100644 --- a/rust/src/api/plugin/plugin.rs +++ b/rust/src/api/plugin/plugin.rs @@ -4,7 +4,7 @@ use crate::api::plugin::executors::{ execute_core, execute_playlist, execute_search, execute_track, execute_user, }; use crate::api::plugin::models::auth::{AuthEventObject, AuthEventType}; -use crate::api::plugin::models::core::PluginConfiguration; +use crate::api::plugin::models::core::{PluginAbility, PluginConfiguration}; use crate::api::plugin::senders::{ PluginAlbumSender, PluginArtistSender, PluginAudioSourceSender, PluginAuthSender, PluginBrowseSender, PluginCoreSender, PluginPlaylistSender, PluginSearchSender, @@ -12,9 +12,9 @@ use crate::api::plugin::senders::{ }; use crate::frb_generated::StreamSink; use crate::internal::apis; -use crate::internal::apis::{form, get_platform_directories, timezone, webview, yt_engine}; +use crate::internal::apis::{form, timezone, webview, yt_engine}; use anyhow::anyhow; -use flutter_rust_bridge::{frb, Rust2DartSendError}; +use flutter_rust_bridge::frb; use llrt_modules::module_builder::ModuleBuilder; use llrt_modules::{ abort, buffer, console, crypto, events, exceptions, fetch, navigator, timers, url, util, @@ -37,8 +37,9 @@ async fn create_context( server_endpoint_url: String, server_secret: String, plugin_slug: String, + local_storage_dir: String, ) -> anyhow::Result<(AsyncContext, AsyncRuntime)> { - let runtime = AsyncRuntime::new().expect("Unable to create async runtime"); + let runtime = AsyncRuntime::new()?; let mut module_builder = ModuleBuilder::new(); @@ -64,15 +65,7 @@ async fn create_context( .set_loader((module_resolver,), (module_loader,)) .await; - let context = AsyncContext::full(&runtime) - .await - .expect("Unable to create async context"); - - let directories = - get_platform_directories(server_endpoint_url.clone(), server_secret.clone()).await?; - let local_storage_dir = directories - .application_support - .ok_or_else(|| anyhow!("Application support directory not found"))?; + let context = AsyncContext::full(&runtime).await?; async_with!(context => |ctx| { apis::init(&ctx, server_endpoint_url, server_secret).catch(&ctx).map_err(|e| anyhow!("Failed to initialize APIs: {}", e))?; @@ -80,7 +73,7 @@ async fn create_context( global_attachment.attach(&ctx).catch(&ctx).map_err(|e| anyhow!("Failed to attach global modules: {}", e))?; anyhow::Ok(()) }) - .await?; + .await?; Ok((context, runtime)) } @@ -90,9 +83,7 @@ async fn js_executor_thread( context: &AsyncContext, ) -> anyhow::Result<()> { while let Some(command) = rx.recv().await { - println!("JS Executor thread received command: {:?}", command); if let PluginCommand::Shutdown = command { - println!("JS Executor thread shutting down."); return anyhow::Ok(()); } @@ -134,7 +125,7 @@ pub struct SpotubePlugin { pub track: PluginTrackSender, pub user: PluginUserSender, event_tx: Sender, - event_rx: Receiver, + event_rx: Option>, } impl SpotubePlugin { @@ -154,29 +145,40 @@ impl SpotubePlugin { track: PluginTrackSender::new(), user: PluginUserSender::new(), event_tx, - event_rx, + event_rx: Some(event_rx), } } pub async fn auth_state(&mut self, sink: StreamSink) -> anyhow::Result<()> { - while let Some(event) = self.event_rx.recv().await { - sink.add(event) - .map_err(|e: Rust2DartSendError| anyhow::anyhow!(e))?; - } + let mut receiver = self + .event_rx + .take() + .ok_or_else(|| anyhow::anyhow!("Receiver already consumed"))?; + + tokio::spawn(async move { + while let Some(event) = receiver.recv().await { + if let Err(e) = sink.add(event) { + eprintln!("Failed to send auth event to stream sink: {:?}", e); + break; + } + } + }); Ok(()) } - #[frb(sync)] - pub fn create_context( + pub async fn create_context( &self, plugin_script: String, plugin_config: PluginConfiguration, server_endpoint_url: String, server_secret: String, + local_storage_dir: String, ) -> anyhow::Result { let (command_tx, mut command_rx) = mpsc::channel(32); + let (init_tx, init_rx) = tokio::sync::oneshot::channel::>(); let sender = self.event_tx.clone(); + let _thread_handle = thread::spawn(move || { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() @@ -184,24 +186,36 @@ impl SpotubePlugin { .unwrap(); let local = LocalSet::new(); if let Err(e) = local.block_on(&rt, async { - let (ctx, _) = create_context( + let ctx_res = create_context( server_endpoint_url, server_secret, plugin_config.slug(), - ).await?; + local_storage_dir, + ).await; - let injection = format!( - "globalThis.pluginInstance = new {}();", - plugin_config.entry_point - ); - let script = format!("{}\n{}", plugin_script, injection); + if let Err(e) = ctx_res { + let _ = init_tx.send(Err(e)); + return anyhow::Ok(()); + } - async_with!(ctx => |cx| { + let (ctx, _runtime) = ctx_res.unwrap(); + + let begin_injection = "globalThis.module = {exports: {}};"; + + let end_injection = "globalThis.pluginInstance = new module.exports.default();"; + let script = format!("{}\n{}\n{}", begin_injection, plugin_script, end_injection); + + let script_eval_res = async_with!(ctx => |cx| { cx.eval::<(), _>(script.as_str()) .catch(&cx).map_err(|e| anyhow!("Failed to evaluate supplied plugin script: {}", e)) - }).await?; + }).await; - async_with!(ctx => |ctx|{ + if let Err(e) = script_eval_res { + let _ = init_tx.send(Err(e)); + return anyhow::Ok(()); + } + + let on_auth_event_res = async_with!(ctx => |ctx|{ let globals = ctx.globals(); let callback = Func::new(move |event: Object| -> rquickjs::Result<()>{ let sender_clone = sender.clone(); @@ -223,26 +237,39 @@ impl SpotubePlugin { } }); - if let Err(e) = globals.get::<_, Object>("pluginInstance")?.get::<_, Object>("auth")?.set( - "onAuthEvent", callback - ) { - eprintln!("Error setting auth event handler: {:?}", e); + if plugin_config.abilities.contains(&PluginAbility::Authentication) { + if let Err(e) = globals.get::<_, Object>("pluginInstance")?.get::<_, Object>("auth")?.set( + "onAuthEvent", callback + ) { + eprintln!("Error setting auth event handler: {:?}", e); + } } Ok::<(), Error>(()) }) .await - .map_err(|e| anyhow!("[onAuthEvent] {e}"))?; + .map_err(|e| anyhow!("[onAuthEvent] {e}")); + + if let Err(e) = on_auth_event_res { + let _ = init_tx.send(Err(e)); + return anyhow::Ok(()); + } + + let _ = init_tx.send(Ok(())); if let Err(e) = js_executor_thread(&mut command_rx, &ctx).await { eprintln!("JS executor error: {}", e); } anyhow::Ok(()) }) { - eprintln!("JS Executor thread error: {}", e); + eprintln!("[PluginInitializationError]: {}", e); } }); + init_rx + .await + .map_err(|e| anyhow!("Failed to receive initialization result: {}", e))??; + Ok(OpaqueSender { sender: command_tx }) } diff --git a/rust/src/api/plugin/senders.rs b/rust/src/api/plugin/senders.rs index 8cb6e645..ff75034f 100644 --- a/rust/src/api/plugin/senders.rs +++ b/rust/src/api/plugin/senders.rs @@ -45,7 +45,7 @@ impl PluginArtistSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } pub async fn top_tracks( @@ -66,7 +66,7 @@ impl PluginArtistSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } pub async fn albums( @@ -87,7 +87,7 @@ impl PluginArtistSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } pub async fn related( @@ -108,7 +108,7 @@ impl PluginArtistSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } pub async fn save(&self, mpsc_tx: &OpaqueSender, ids: Vec) -> anyhow::Result<()> { @@ -121,7 +121,7 @@ impl PluginArtistSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } pub async fn unsave(&self, mpsc_tx: &OpaqueSender, ids: Vec) -> anyhow::Result<()> { @@ -134,7 +134,7 @@ impl PluginArtistSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } } @@ -161,7 +161,7 @@ impl PluginAlbumSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } pub async fn tracks( @@ -182,7 +182,7 @@ impl PluginAlbumSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } pub async fn releases( @@ -201,7 +201,7 @@ impl PluginAlbumSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } pub async fn save(&self, mpsc_tx: &OpaqueSender, ids: Vec) -> anyhow::Result<()> { @@ -214,7 +214,7 @@ impl PluginAlbumSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } pub async fn unsave(&self, mpsc_tx: &OpaqueSender, ids: Vec) -> anyhow::Result<()> { @@ -227,7 +227,7 @@ impl PluginAlbumSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } } @@ -252,7 +252,7 @@ impl PluginAudioSourceSender { )) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } pub async fn matches( @@ -269,7 +269,7 @@ impl PluginAudioSourceSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } pub async fn streams( @@ -286,7 +286,7 @@ impl PluginAudioSourceSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } } @@ -308,7 +308,7 @@ impl PluginAuthSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } pub async fn logout(&self, mpsc_tx: &OpaqueSender) -> anyhow::Result<()> { @@ -320,7 +320,7 @@ impl PluginAuthSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } pub async fn is_authenticated(&self, mpsc_tx: &OpaqueSender) -> anyhow::Result { @@ -332,7 +332,7 @@ impl PluginAuthSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } } @@ -361,7 +361,7 @@ impl PluginBrowseSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } pub async fn section_items( @@ -382,7 +382,7 @@ impl PluginBrowseSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } } @@ -409,13 +409,7 @@ impl PluginCoreSender { })) .await?; - rx.await - .map_err(|e| { - eprintln!("RecvError: {}", e); - eprintln!("Stack trace:\n{:?}", Backtrace::capture()); - anyhow!("{e}") - }) - .and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } pub async fn support(&self, mpsc_tx: &OpaqueSender) -> anyhow::Result { @@ -427,7 +421,7 @@ impl PluginCoreSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } pub async fn scrobble( @@ -444,7 +438,7 @@ impl PluginCoreSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } } @@ -471,7 +465,7 @@ impl PluginPlaylistSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } pub async fn tracks( @@ -492,7 +486,7 @@ impl PluginPlaylistSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } pub async fn create_playlist( @@ -517,7 +511,7 @@ impl PluginPlaylistSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } pub async fn update_playlist( @@ -542,7 +536,7 @@ impl PluginPlaylistSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } pub async fn delete_playlist( @@ -559,7 +553,7 @@ impl PluginPlaylistSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } pub async fn add_tracks( @@ -580,7 +574,7 @@ impl PluginPlaylistSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } pub async fn remove_tracks( @@ -599,7 +593,7 @@ impl PluginPlaylistSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } pub async fn save(&self, mpsc_tx: &OpaqueSender, playlist_id: String) -> anyhow::Result<()> { @@ -612,7 +606,7 @@ impl PluginPlaylistSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } pub async fn unsave(&self, mpsc_tx: &OpaqueSender, playlist_id: String) -> anyhow::Result<()> { @@ -625,7 +619,7 @@ impl PluginPlaylistSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } } @@ -647,7 +641,7 @@ impl PluginSearchSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } pub async fn all( @@ -664,7 +658,7 @@ impl PluginSearchSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } pub async fn tracks( @@ -685,7 +679,7 @@ impl PluginSearchSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } pub async fn albums( @@ -706,7 +700,7 @@ impl PluginSearchSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } pub async fn artists( @@ -727,7 +721,7 @@ impl PluginSearchSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } pub async fn playlists( @@ -748,7 +742,7 @@ impl PluginSearchSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } } @@ -775,7 +769,7 @@ impl PluginTrackSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } pub async fn save(&self, mpsc_tx: &OpaqueSender, ids: Vec) -> anyhow::Result<()> { @@ -788,7 +782,7 @@ impl PluginTrackSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } pub async fn unsave(&self, mpsc_tx: &OpaqueSender, ids: Vec) -> anyhow::Result<()> { @@ -801,7 +795,7 @@ impl PluginTrackSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } pub async fn radio( @@ -818,7 +812,7 @@ impl PluginTrackSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } } @@ -838,7 +832,7 @@ impl PluginUserSender { .send(PluginCommand::User(UserCommands::Me { response_tx: tx })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } pub async fn saved_tracks( @@ -857,7 +851,7 @@ impl PluginUserSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } pub async fn saved_albums( @@ -876,7 +870,7 @@ impl PluginUserSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } pub async fn saved_artists( @@ -895,7 +889,7 @@ impl PluginUserSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } pub async fn saved_playlists( @@ -914,6 +908,6 @@ impl PluginUserSender { })) .await?; - rx.await.map_err(|e| anyhow!("{e}")).and_then(|o| o) + rx.await.map_err(|e| anyhow!("{e}"))? } } diff --git a/rust/src/frb_generated.rs b/rust/src/frb_generated.rs index a99de597..69314cc7 100644 --- a/rust/src/frb_generated.rs +++ b/rust/src/frb_generated.rs @@ -1253,15 +1253,16 @@ fn wire__crate__api__plugin__plugin__SpotubePlugin_close_impl( ) } fn wire__crate__api__plugin__plugin__SpotubePlugin_create_context_impl( + port_: flutter_rust_bridge::for_generated::MessagePort, ptr_: flutter_rust_bridge::for_generated::PlatformGeneralizedUint8ListPtr, rust_vec_len_: i32, data_len_: i32, -) -> flutter_rust_bridge::for_generated::WireSyncRust2DartSse { - FLUTTER_RUST_BRIDGE_HANDLER.wrap_sync::( +) { + FLUTTER_RUST_BRIDGE_HANDLER.wrap_async::( flutter_rust_bridge::for_generated::TaskInfo { debug_name: "SpotubePlugin_create_context", - port: None, - mode: flutter_rust_bridge::for_generated::FfiCallMode::Sync, + port: Some(port_), + mode: flutter_rust_bridge::for_generated::FfiCallMode::Normal, }, move || { let message = unsafe { @@ -1283,33 +1284,42 @@ fn wire__crate__api__plugin__plugin__SpotubePlugin_create_context_impl( ); let api_server_endpoint_url = ::sse_decode(&mut deserializer); let api_server_secret = ::sse_decode(&mut deserializer); + let api_local_storage_dir = ::sse_decode(&mut deserializer); deserializer.end(); - transform_result_sse::<_, flutter_rust_bridge::for_generated::anyhow::Error>( - (move || { - let mut api_that_guard = None; - let decode_indices_ = - flutter_rust_bridge::for_generated::lockable_compute_decode_order(vec![ - flutter_rust_bridge::for_generated::LockableOrderInfo::new( - &api_that, 0, false, - ), - ]); - for i in decode_indices_ { - match i { - 0 => api_that_guard = Some(api_that.lockable_decode_sync_ref()), - _ => unreachable!(), + move |context| async move { + transform_result_sse::<_, flutter_rust_bridge::for_generated::anyhow::Error>( + (move || async move { + let mut api_that_guard = None; + let decode_indices_ = + flutter_rust_bridge::for_generated::lockable_compute_decode_order( + vec![flutter_rust_bridge::for_generated::LockableOrderInfo::new( + &api_that, 0, false, + )], + ); + for i in decode_indices_ { + match i { + 0 => { + api_that_guard = + Some(api_that.lockable_decode_async_ref().await) + } + _ => unreachable!(), + } } - } - let api_that_guard = api_that_guard.unwrap(); - let output_ok = crate::api::plugin::plugin::SpotubePlugin::create_context( - &*api_that_guard, - api_plugin_script, - api_plugin_config, - api_server_endpoint_url, - api_server_secret, - )?; - Ok(output_ok) - })(), - ) + let api_that_guard = api_that_guard.unwrap(); + let output_ok = crate::api::plugin::plugin::SpotubePlugin::create_context( + &*api_that_guard, + api_plugin_script, + api_plugin_config, + api_server_endpoint_url, + api_server_secret, + api_local_storage_dir, + ) + .await?; + Ok(output_ok) + })() + .await, + ) + } }, ) } @@ -5824,6 +5834,7 @@ fn pde_ffi_dispatcher_primary_impl( match func_id { 3 => wire__crate__api__plugin__plugin__SpotubePlugin_auth_state_impl(port, ptr, rust_vec_len, data_len), 24 => wire__crate__api__plugin__plugin__SpotubePlugin_close_impl(port, ptr, rust_vec_len, data_len), +25 => wire__crate__api__plugin__plugin__SpotubePlugin_create_context_impl(port, ptr, rust_vec_len, data_len), 27 => wire__crate__api__init_app_impl(port, ptr, rust_vec_len, data_len), 28 => wire__crate__api__plugin__senders__plugin_album_sender_get_album_impl(port, ptr, rust_vec_len, data_len), 29 => wire__crate__api__plugin__senders__plugin_album_sender_releases_impl(port, ptr, rust_vec_len, data_len), @@ -5906,7 +5917,6 @@ fn pde_ffi_dispatcher_sync_impl( 21 => wire__crate__api__plugin__plugin__SpotubePlugin_auto_accessor_set_search_impl(ptr, rust_vec_len, data_len), 22 => wire__crate__api__plugin__plugin__SpotubePlugin_auto_accessor_set_track_impl(ptr, rust_vec_len, data_len), 23 => wire__crate__api__plugin__plugin__SpotubePlugin_auto_accessor_set_user_impl(ptr, rust_vec_len, data_len), -25 => wire__crate__api__plugin__plugin__SpotubePlugin_create_context_impl(ptr, rust_vec_len, data_len), 26 => wire__crate__api__plugin__plugin__SpotubePlugin_new_impl(ptr, rust_vec_len, data_len), 47 => wire__crate__api__plugin__models__core__plugin_configuration_slug_impl(ptr, rust_vec_len, data_len), 75 => wire__crate__api__plugin__models__audio_source__spotube_audio_lossless_container_quality_to_string_fmt_impl(ptr, rust_vec_len, data_len), diff --git a/rust/src/internal/apis/mod.rs b/rust/src/internal/apis/mod.rs index 31c99e9c..1c3902a9 100644 --- a/rust/src/internal/apis/mod.rs +++ b/rust/src/internal/apis/mod.rs @@ -1,10 +1,9 @@ use rquickjs::Ctx; -use serde::{Deserialize, Serialize}; pub mod form; pub mod local_storage; -pub mod webview; pub mod timezone; +pub mod webview; pub mod yt_engine; pub fn init(ctx: &Ctx, endpoint_url: String, secret: String) -> rquickjs::Result<()> { @@ -13,28 +12,3 @@ pub fn init(ctx: &Ctx, endpoint_url: String, secret: String) -> rquickjs::Result Ok(()) } - -#[derive(Debug, Serialize, Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct DirectoriesResponse { - pub temporary: Option, - pub application_documents: Option, - pub application_support: Option, - pub library: Option, - pub external_storage: Option, - pub downloads: Option, -} - -pub async fn get_platform_directories( - server_url: String, - server_secret: String, -) -> anyhow::Result { - let client = reqwest::Client::new(); - Ok(client - .get(format!("{}/plugin/localstorage/directories", server_url).as_str()) - .header("X-Plugin-Secret", server_secret.as_str()) - .send() - .await? - .json::() - .await?) -} diff --git a/rust/src/internal/apis/timezone.rs b/rust/src/internal/apis/timezone.rs index e6f195d0..3a2ee2d1 100644 --- a/rust/src/internal/apis/timezone.rs +++ b/rust/src/internal/apis/timezone.rs @@ -1,5 +1,5 @@ use rquickjs::prelude::Func; -use rquickjs::{Class, Ctx, Object}; +use rquickjs::{Ctx, Object}; pub fn get_local_timezone() -> rquickjs::Result { let timezone = iana_time_zone::get_timezone() diff --git a/rust/src/internal/apis/webview.rs b/rust/src/internal/apis/webview.rs index 5725b7cf..07f73fa3 100644 --- a/rust/src/internal/apis/webview.rs +++ b/rust/src/internal/apis/webview.rs @@ -185,10 +185,7 @@ impl<'js> WebView<'js> { } async fn url_change_task(&self, ctx: Ctx<'js>) { - let endpoint = format!( - "{}/plugin-api/webview/{}/on-url-request", - self.endpoint_url, self.uid - ); + let endpoint = format!("{}/plugin-api/webview/events", self.endpoint_url); let secret = self.secret.clone(); @@ -201,22 +198,39 @@ impl<'js> WebView<'js> { .header("X-Plugin-Secret", &secret) .expect("Failed to set header for EventSourceClient") .build(); + let mut stream = client.stream(); while let Some(event) = stream.next().await { match event { Ok(eventsource_client::SSE::Event(msg)) => { if msg.event_type != "url-request" { + eprintln!( + "[rust][webview] Not expected event-type: {}", + msg.event_type + ); continue; } backoff = 1; if let Ok(data) = serde_json::from_str::>(&msg.data) { let url = data.get("url").cloned().unwrap_or_default(); + let uid = data.get("uid").cloned().unwrap_or_default(); + + if uid != self.uid { + println!( + "[rust][webview] Ignored event for different uid: {}", + uid + ); + continue; + } + for callback in self.callbacks.iter() { match callback.call::<_, Value>((url.clone(),)) { Ok(res) => { if let Some(promise) = res.into_promise() { - if let Err(e) = promise.into_future::<()>().await.catch(&ctx) { + if let Err(e) = + promise.into_future::<()>().await.catch(&ctx) + { eprintln!("Error in onUrlChange promise: {}", e); } } @@ -230,7 +244,9 @@ impl<'js> WebView<'js> { eprintln!("Failed to parse event data: {}", msg.data); } } - Ok(_) => {} + Ok(e) => { + eprintln!("[rust][webview] Ignored non-event message: {:?}", e); + } Err(err) => { eprintln!("Error in EventSource stream: {}", err); } diff --git a/rust/src/internal/auth.rs b/rust/src/internal/auth.rs index f46cb92a..f3d8e94b 100644 --- a/rust/src/internal/auth.rs +++ b/rust/src/internal/auth.rs @@ -1,4 +1,4 @@ -use crate::internal::utils::js_invoke_async_method_to_json; +use crate::internal::utils::{js_invoke_async_method_to_json, js_invoke_method_to_json}; use flutter_rust_bridge::frb; use rquickjs::{async_with, AsyncContext}; @@ -28,16 +28,14 @@ impl<'a> PluginAuthEndpoint<'a> { pub async fn is_authenticated(&self) -> anyhow::Result { async_with!(self.0 => |ctx| { - Ok( - js_invoke_async_method_to_json::<(), bool>( + let s = js_invoke_method_to_json::<(), bool>( ctx.clone(), "auth", - "is_authenticated", + "isAuthenticated", &[] - ) - .await? - .expect("[hey][smartypants] auth.is_authenticated should return a boolean") - ) + )?.expect("[hey][smartypants] auth.isAuthenticated should return a boolean"); + + Ok(s) }) .await } diff --git a/rust/src/internal/utils.rs b/rust/src/internal/utils.rs index 192eabeb..e0e0b24f 100644 --- a/rust/src/internal/utils.rs +++ b/rust/src/internal/utils.rs @@ -5,9 +5,10 @@ use serde::de::DeserializeOwned; use serde::Serialize; use serde_json::{Map, Value}; use std::collections::HashMap; +use std::fmt::Debug; /// Convert a `serde_json::Value` into a Boa `JsValue` -pub fn json_value_to_js<'a>(value: &Value, ctx: Ctx<'a>) -> anyhow::Result> { +pub fn json_value_to_js<'a>(value: &Value, ctx: Ctx<'a>) -> rquickjs::Result> { match value { Value::Null => Ok(rquickjs::Value::new_null(ctx)), Value::Bool(b) => Ok(rquickjs::Value::new_bool(ctx, *b)), @@ -30,7 +31,7 @@ pub fn json_value_to_js<'a>(value: &Value, ctx: Ctx<'a>) -> anyhow::Result { let mut js_obj = HashMap::::with_capacity(obj.len()); @@ -40,12 +41,12 @@ pub fn json_value_to_js<'a>(value: &Value, ctx: Ctx<'a>) -> anyhow::Result(value: rquickjs::Value<'a>, ctx: Ctx<'a>) -> anyhow::Result { if value.is_null() || value.is_undefined() { return Ok(Value::Null); @@ -70,7 +71,9 @@ pub fn js_value_to_json<'a>(value: rquickjs::Value<'a>, ctx: Ctx<'a>) -> anyhow: // Array? if obj.is_array() { - let obj: Array = Array::from_value(obj.into_value()).map_err(|e| anyhow!("{}", e))?; + let obj: Array = Array::from_value(obj.into_value()) + .catch(&ctx) + .map_err(|e| anyhow!("{}", e))?; let length = obj.len(); let mut json_arr = Vec::::with_capacity(length); @@ -111,34 +114,53 @@ pub async fn js_invoke_async_method_to_json<'b, T, R>( ) -> anyhow::Result> where T: Serialize, - R: DeserializeOwned, + R: DeserializeOwned + Debug, { let global = ctx.globals(); - let plugin_instance: Object<'b> = global.get("pluginInstance").map_err(|e| anyhow!("{e}"))?; + let plugin_instance: Object<'b> = global + .get("pluginInstance") + .catch(&ctx) + .map_err(|e| anyhow!("{e}"))?; let core_val: Object<'b> = plugin_instance .get(endpoint_name) + .catch(&ctx) .map_err(|e| anyhow!("{e}"))?; - let js_fn: Function<'b> = core_val.get(name).map_err(|e| anyhow!("{e}"))?; + let js_fn: Function<'b> = core_val.get(name).catch(&ctx).map_err(|e| anyhow!("{e}"))?; let mut args_js = Args::new(ctx.clone(), args.len() as usize); + + args_js + .this(core_val) + .catch(&ctx) + .map_err(|e| anyhow!("{e}"))?; + for arg in args.iter() { let arg_value = serde_json::to_value(arg).map_err(|e| anyhow!("{e}"))?; - let arg_js = json_value_to_js(&arg_value, ctx.clone()).map_err(|e| anyhow!("{e}"))?; - args_js.push_arg(arg_js).map_err(|e| anyhow!("{e}"))?; + let arg_js = json_value_to_js(&arg_value, ctx.clone()) + .catch(&ctx) + .map_err(|e| anyhow!("{e}"))?; + args_js + .push_arg(arg_js) + .catch(&ctx) + .map_err(|e| anyhow!("{e}"))?; } - let result_promise: Promise = js_fn.call_arg(args_js).map_err(|e| anyhow!("{e}"))?; + let result_promise: Promise = js_fn + .call_arg(args_js) + .catch(&ctx) + .map_err(|e| anyhow!("{e}"))?; + + println!("Sync Result: {:?}", result_promise); let result_future: rquickjs::Value = result_promise .into_future() .await .catch(&ctx) .map_err(|e| anyhow!("{e}"))?; - let value = js_value_to_json(result_future, ctx.clone()).map_err(|e| anyhow!("{e}"))?; + let value = js_value_to_json(result_future, ctx.clone())?; if value.is_null() { return Ok(None); } - Ok(Some( serde_json::from_value::(value).map_err(|e| anyhow!("{e}"))?, )) @@ -155,20 +177,38 @@ where R: DeserializeOwned, { let global = ctx.globals(); - let plugin_instance: Object<'b> = global.get("pluginInstance").map_err(|e| anyhow!("{e}"))?; + let plugin_instance: Object<'b> = global + .get("pluginInstance") + .catch(&ctx) + .map_err(|e| anyhow!("{e}"))?; let core_val: Object<'b> = plugin_instance .get(endpoint_name) + .catch(&ctx) .map_err(|e| anyhow!("{e}"))?; - let js_fn: Function<'b> = core_val.get(name).map_err(|e| anyhow!("{e}"))?; + let js_fn: Function<'b> = core_val.get(name).catch(&ctx).map_err(|e| anyhow!("{e}"))?; let mut args_js = Args::new(ctx.clone(), args.len() as usize); + + args_js + .this(core_val) + .catch(&ctx) + .map_err(|e| anyhow!("{e}"))?; + for arg in args.iter().enumerate() { let arg_value = serde_json::to_value(arg).map_err(|e| anyhow!("{e}"))?; - let arg_js = json_value_to_js(&arg_value, ctx.clone()).map_err(|e| anyhow!("{e}"))?; - args_js.push_arg(arg_js).map_err(|e| anyhow!("{e}"))?; + let arg_js = json_value_to_js(&arg_value, ctx.clone()) + .catch(&ctx) + .map_err(|e| anyhow!("{e}"))?; + args_js + .push_arg(arg_js) + .catch(&ctx) + .map_err(|e| anyhow!("{e}"))?; } - let result: rquickjs::Value = js_fn.call_arg(args_js).map_err(|e| anyhow!("{e}"))?; - let value = js_value_to_json(result, ctx.clone()).map_err(|e| anyhow!("{e}"))?; + let result: rquickjs::Value = js_fn + .call_arg(args_js) + .catch(&ctx) + .map_err(|e| anyhow!("{e}"))?; + let value = js_value_to_json(result, ctx.clone())?; if value.is_null() { return Ok(None); diff --git a/rust/src/main.rs b/rust/src/main.rs index c6b92dfa..058f9fe7 100644 --- a/rust/src/main.rs +++ b/rust/src/main.rs @@ -100,7 +100,7 @@ async fn plugin() -> anyhow::Result<()> { repository: None, version: "0.1.0".to_string(), }; - let sender = plugin.create_context(PLUGIN_JS.to_string(), config.clone(), "".to_string(), "".to_string())?; + let sender = plugin.create_context(PLUGIN_JS.to_string(), config.clone(), "".to_string(), "".to_string(), "".into()).await?; let (r1, r2) = tokio::join!( plugin.core.check_update(&sender, config.clone()), plugin.core.check_update(&sender, config.clone())