unetmsg: add subscriber update callback to notify about publish events

When services start publishing on a topic, this can be used to allow
subscribers to query them.

Signed-off-by: Felix Fietkau <nbd@nbd.name>
This commit is contained in:
Felix Fietkau 2025-05-05 13:55:42 +02:00
parent 6fcaf3d589
commit 06f44f69b6
4 changed files with 70 additions and 10 deletions

View file

@ -23,7 +23,7 @@ function publish(name, request_cb)
this.channel.request("publish", { name }); this.channel.request("publish", { name });
} }
function subscribe(name, message_cb) function subscribe(name, message_cb, update_cb)
{ {
if (!this.channel) if (!this.channel)
this.connect(); this.connect();
@ -31,8 +31,12 @@ function subscribe(name, message_cb)
if (type(name) == "string") if (type(name) == "string")
name = [ name ]; name = [ name ];
let cb = {
cb: message_cb,
update: update_cb
};
for (let cur in name) for (let cur in name)
this.cb_sub[cur] = message_cb; this.cb_sub[cur] = cb;
if (!this.channel) if (!this.channel)
return; return;
@ -109,6 +113,12 @@ function connect()
const client_proto = { const client_proto = {
connect, publish, subscribe, send, request, connect, publish, subscribe, send, request,
close: function() { close: function() {
for (let sub in this.sub_cb) {
if (!sub.timer)
continue;
sub.timer.cancel();
delete sub.timer;
}
if (this.channel) if (this.channel)
this.channel.disconnect(); this.channel.disconnect();
this.connect_timer.cancel(); this.connect_timer.cancel();
@ -119,11 +129,29 @@ const client_proto = {
function handle_request(cl, req) function handle_request(cl, req)
{ {
let cb; let data, cb;
switch (req.type) { switch (req.type) {
case "publish":
data = cl.cb_sub[req.args.name];
if (!data || data.timer)
break;
cb = data.update;
if (!cb)
return;
data.timer = uloop.timer(100, () => {
delete data.timer;
cb();
});
break;
case "message": case "message":
cb = cl.cb_sub[req.args.name]; data = cl.cb_sub[req.args.name];
if (!data)
break;
cb = data.cb;
if (cb) if (cb)
return cb(req); return cb(req);
break; break;

View file

@ -40,6 +40,9 @@ function client_pubsub(kind, cl, names)
cl_list[name] = core.pubsub_add(kind, name, proto({ cl_list[name] = core.pubsub_add(kind, name, proto({
client: cl.id, client: cl.id,
}, pubsub_proto)); }, pubsub_proto));
if (kind == "publish")
core.handle_publish(cl_list[name], name);
} }
return 0; return 0;
@ -101,8 +104,11 @@ function client_disconnect(id)
return; return;
for (let kind in [ "publish", "subscribe" ]) for (let kind in [ "publish", "subscribe" ])
for (let name, data in cl[kind]) for (let name, data in cl[kind]) {
if (kind == "publish")
core.handle_publish(data, name);
core.pubsub_del(kind, name, data); core.pubsub_del(kind, name, data);
}
delete clients[id]; delete clients[id];
} }

View file

@ -96,8 +96,10 @@ function network_socket_handle_request(sock_data, req)
if (!name) if (!name)
return; return;
if (args.enabled) { if (args.enabled) {
if (list[name]) if (list[name]) {
core.handle_publish(null, name);
return 0; return 0;
}
let allowed = net.peers[host].allowed == null; let allowed = net.peers[host].allowed == null;
for (let cur in net.peers[host].allowed) { for (let cur in net.peers[host].allowed) {
@ -114,10 +116,12 @@ function network_socket_handle_request(sock_data, req)
network: sock_data.network, network: sock_data.network,
name: host, name: host,
}, pubsub_proto); }, pubsub_proto);
core.handle_publish(null, name);
list[name] = true; list[name] = true;
} else { } else {
if (!list[name]) if (!list[name])
return 0; return 0;
core.handle_publish(null, name);
delete core["remote_" + msgtype][name][host]; delete core["remote_" + msgtype][name][host];
delete list[name]; delete list[name];
} }

View file

@ -47,8 +47,8 @@ function new_handle(list, name, data)
function pubsub_add(kind, name, data) function pubsub_add(kind, name, data)
{ {
let list = this[kind]; let list = this[kind];
if (!length(list[name])) { if (!length(list[name]) || kind == "publish") {
list[name] = {}; list[name] ??= {};
remote.pubsub_set(kind, name, true); remote.pubsub_set(kind, name, true);
} }
return new_handle(this[kind], name, data); return new_handle(this[kind], name, data);
@ -58,8 +58,8 @@ function pubsub_del(kind, name, data)
{ {
let list = this[kind][name]; let list = this[kind][name];
delete list[data._id]; delete list[data._id];
if (!length(list)) if (!length(list) || kind == "publish")
remote.pubsub_set(kind, name, false); remote.pubsub_set(kind, name, length(list) > 0);
} }
function get_handles(handle, local, remote) function get_handles(handle, local, remote)
@ -158,6 +158,27 @@ function handle_message(handle, data, remote)
return 0; return 0;
} }
function handle_publish(handle, name)
{
let local = this.subscribe[name];
let handles = get_handles(handle, local);
for (let cur in handles) {
if (!cur || !cur.get_channel)
continue;
let chan = cur.get_channel();
if (!chan)
continue;
chan.request({
method: "publish",
return: "ignore",
data: { name },
});
}
}
function add_acl(type, user, data) function add_acl(type, user, data)
{ {
if (!data || !user) if (!data || !user)
@ -199,6 +220,7 @@ const core_proto = {
pubsub_del, pubsub_del,
handle_request, handle_request,
handle_message, handle_message,
handle_publish,
dbg: function(msg) { dbg: function(msg) {
if (this.debug_enabled) if (this.debug_enabled)
warn(msg); warn(msg);