From 06f44f69b6fe33b07d85c5b6ed0eb49a199a532e Mon Sep 17 00:00:00 2001 From: Felix Fietkau Date: Mon, 5 May 2025 13:55:42 +0200 Subject: [PATCH] unetmsg: add subscriber update callback to notify about publish events When services start publishing on a topic, this can be used to allow subscribers to query them. Signed-off-by: Felix Fietkau --- .../files/usr/share/ucode/unetmsg/client.uc | 36 ++++++++++++++++--- .../share/ucode/unetmsg/unetmsgd-client.uc | 8 ++++- .../share/ucode/unetmsg/unetmsgd-remote.uc | 6 +++- .../files/usr/share/ucode/unetmsg/unetmsgd.uc | 30 +++++++++++++--- 4 files changed, 70 insertions(+), 10 deletions(-) diff --git a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/client.uc b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/client.uc index 06c927297e0..293763572fa 100644 --- a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/client.uc +++ b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/client.uc @@ -23,7 +23,7 @@ function publish(name, request_cb) this.channel.request("publish", { name }); } -function subscribe(name, message_cb) +function subscribe(name, message_cb, update_cb) { if (!this.channel) this.connect(); @@ -31,8 +31,12 @@ function subscribe(name, message_cb) if (type(name) == "string") name = [ name ]; + let cb = { + cb: message_cb, + update: update_cb + }; for (let cur in name) - this.cb_sub[cur] = message_cb; + this.cb_sub[cur] = cb; if (!this.channel) return; @@ -109,6 +113,12 @@ function connect() const client_proto = { connect, publish, subscribe, send, request, close: function() { + for (let sub in this.sub_cb) { + if (!sub.timer) + continue; + sub.timer.cancel(); + delete sub.timer; + } if (this.channel) this.channel.disconnect(); this.connect_timer.cancel(); @@ -119,11 +129,29 @@ const client_proto = { function handle_request(cl, req) { - let cb; + let data, cb; switch (req.type) { + case "publish": + data = cl.cb_sub[req.args.name]; + if (!data || data.timer) + break; + + cb = data.update; + if (!cb) + return; + + data.timer = uloop.timer(100, () => { + delete data.timer; + cb(); + }); + break; case "message": - cb = cl.cb_sub[req.args.name]; + data = cl.cb_sub[req.args.name]; + if (!data) + break; + + cb = data.cb; if (cb) return cb(req); break; diff --git a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-client.uc b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-client.uc index 8b428821520..6da745a7709 100644 --- a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-client.uc +++ b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-client.uc @@ -40,6 +40,9 @@ function client_pubsub(kind, cl, names) cl_list[name] = core.pubsub_add(kind, name, proto({ client: cl.id, }, pubsub_proto)); + + if (kind == "publish") + core.handle_publish(cl_list[name], name); } return 0; @@ -101,8 +104,11 @@ function client_disconnect(id) return; for (let kind in [ "publish", "subscribe" ]) - for (let name, data in cl[kind]) + for (let name, data in cl[kind]) { + if (kind == "publish") + core.handle_publish(data, name); core.pubsub_del(kind, name, data); + } delete clients[id]; } diff --git a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-remote.uc b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-remote.uc index edc034343b0..18ee2a3684c 100644 --- a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-remote.uc +++ b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-remote.uc @@ -96,8 +96,10 @@ function network_socket_handle_request(sock_data, req) if (!name) return; if (args.enabled) { - if (list[name]) + if (list[name]) { + core.handle_publish(null, name); return 0; + } let allowed = net.peers[host].allowed == null; for (let cur in net.peers[host].allowed) { @@ -114,10 +116,12 @@ function network_socket_handle_request(sock_data, req) network: sock_data.network, name: host, }, pubsub_proto); + core.handle_publish(null, name); list[name] = true; } else { if (!list[name]) return 0; + core.handle_publish(null, name); delete core["remote_" + msgtype][name][host]; delete list[name]; } diff --git a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd.uc b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd.uc index 393a6ea47ad..b81acb908eb 100644 --- a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd.uc +++ b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd.uc @@ -47,8 +47,8 @@ function new_handle(list, name, data) function pubsub_add(kind, name, data) { let list = this[kind]; - if (!length(list[name])) { - list[name] = {}; + if (!length(list[name]) || kind == "publish") { + list[name] ??= {}; remote.pubsub_set(kind, name, true); } return new_handle(this[kind], name, data); @@ -58,8 +58,8 @@ function pubsub_del(kind, name, data) { let list = this[kind][name]; delete list[data._id]; - if (!length(list)) - remote.pubsub_set(kind, name, false); + if (!length(list) || kind == "publish") + remote.pubsub_set(kind, name, length(list) > 0); } function get_handles(handle, local, remote) @@ -158,6 +158,27 @@ function handle_message(handle, data, remote) return 0; } +function handle_publish(handle, name) +{ + let local = this.subscribe[name]; + let handles = get_handles(handle, local); + + for (let cur in handles) { + if (!cur || !cur.get_channel) + continue; + + let chan = cur.get_channel(); + if (!chan) + continue; + + chan.request({ + method: "publish", + return: "ignore", + data: { name }, + }); + } +} + function add_acl(type, user, data) { if (!data || !user) @@ -199,6 +220,7 @@ const core_proto = { pubsub_del, handle_request, handle_message, + handle_publish, dbg: function(msg) { if (this.debug_enabled) warn(msg);