unetmsg: add support for sending requests/messages to specific hosts
Makes it possible to implement unetmsg APIs for host control Signed-off-by: Felix Fietkau <nbd@nbd.name>
This commit is contained in:
parent
d95e8a59bb
commit
12f4814c41
4 changed files with 56 additions and 19 deletions
|
@ -51,11 +51,14 @@ let obj = ubus.publish("unetmsg", {
|
|||
args: {
|
||||
name: "",
|
||||
type: "",
|
||||
host: "",
|
||||
data: {},
|
||||
},
|
||||
call: function(req) {
|
||||
try {
|
||||
core.handle_request(null, req, req.args, true);
|
||||
let host = req.args.host;
|
||||
delete req.args.host;
|
||||
core.handle_request(null, req, req.args, true, host);
|
||||
} catch (e) {
|
||||
core.exception(e);
|
||||
}
|
||||
|
|
|
@ -44,14 +44,26 @@ function subscribe(name, message_cb, update_cb)
|
|||
this.channel.request("subscribe", { name });
|
||||
}
|
||||
|
||||
function send(name, type, data)
|
||||
function send_ext(data)
|
||||
{
|
||||
this.channel.request({
|
||||
method: "message",
|
||||
return: "ignore",
|
||||
data: {
|
||||
name, type, data
|
||||
},
|
||||
data
|
||||
});
|
||||
}
|
||||
|
||||
function send_host(host, name, type, data)
|
||||
{
|
||||
this.send_ext({
|
||||
host, name, type, data
|
||||
});
|
||||
}
|
||||
|
||||
function send(name, type, data)
|
||||
{
|
||||
this.send_ext({
|
||||
name, type, data
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -59,7 +71,7 @@ function default_complete_cb()
|
|||
{
|
||||
}
|
||||
|
||||
function request(name, type, data, data_cb, complete_cb)
|
||||
function request_ext(data, data_cb, complete_cb)
|
||||
{
|
||||
if (!this.channel)
|
||||
this.connect();
|
||||
|
@ -69,9 +81,7 @@ function request(name, type, data, data_cb, complete_cb)
|
|||
|
||||
let req = this.channel.defer({
|
||||
method: "request",
|
||||
data: {
|
||||
name, type, data
|
||||
},
|
||||
data,
|
||||
data_cb,
|
||||
cb: complete_cb
|
||||
});
|
||||
|
@ -82,6 +92,20 @@ function request(name, type, data, data_cb, complete_cb)
|
|||
req.await();
|
||||
}
|
||||
|
||||
function request_host(host, name, type, data, data_cb, complete_cb)
|
||||
{
|
||||
return this.request_ext({
|
||||
host, name, type, data
|
||||
}, data_cb, complete_cb);
|
||||
}
|
||||
|
||||
function request(name, type, data, data_cb, complete_cb)
|
||||
{
|
||||
return this.request_ext({
|
||||
name, type, data
|
||||
}, data_cb, complete_cb);
|
||||
}
|
||||
|
||||
function connect()
|
||||
{
|
||||
if (this.channel)
|
||||
|
@ -113,7 +137,9 @@ function connect()
|
|||
}
|
||||
|
||||
const client_proto = {
|
||||
connect, publish, subscribe, send, request,
|
||||
connect, publish, subscribe,
|
||||
send, send_ext, send_host,
|
||||
request, request_ext, request_host,
|
||||
close: function() {
|
||||
for (let sub in this.sub_cb) {
|
||||
if (!sub.timer)
|
||||
|
|
|
@ -65,14 +65,14 @@ function client_request(cl, req)
|
|||
if (type(name) != "string" || type(args.type) != "string" || type(args.data) != "object")
|
||||
return libubus.STATUS_INVALID_ARGUMENT;
|
||||
|
||||
let data = prepare_data(req.args);
|
||||
let data = prepare_data(args);
|
||||
let handle;
|
||||
switch (req.type) {
|
||||
case "message":
|
||||
handle = cl.publish[name];
|
||||
if (!handle)
|
||||
return libubus.STATUS_INVALID_ARGUMENT;
|
||||
return core.handle_message(handle, data, true);
|
||||
return core.handle_message(handle, data, true, args.host);
|
||||
case "request":
|
||||
handle = cl.subscribe[name];
|
||||
if (!handle &&
|
||||
|
@ -80,7 +80,7 @@ function client_request(cl, req)
|
|||
return libubus.STATUS_PERMISSION_DENIED;
|
||||
|
||||
handle ??= { client: cl.id };
|
||||
return core.handle_request(handle, req, data, true);
|
||||
return core.handle_request(handle, req, data, true, args.host);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -62,10 +62,15 @@ function pubsub_del(kind, name, data)
|
|||
remote.pubsub_set(kind, name, length(list) > 0);
|
||||
}
|
||||
|
||||
function get_handles(handle, local, remote)
|
||||
function get_handles(handle, local, remote, host)
|
||||
{
|
||||
let handles = [];
|
||||
|
||||
if (host == "")
|
||||
remote = {};
|
||||
else if (host != null)
|
||||
local = {};
|
||||
|
||||
for (let cur_id, cur in local) {
|
||||
if (handle) {
|
||||
if (handle.id == cur_id)
|
||||
|
@ -80,19 +85,22 @@ function get_handles(handle, local, remote)
|
|||
if (!remote)
|
||||
return handles;
|
||||
|
||||
for (let cur_id, cur in remote)
|
||||
for (let cur_id, cur in remote) {
|
||||
if (host != null && cur.name != host)
|
||||
continue;
|
||||
push(handles, cur);
|
||||
}
|
||||
|
||||
return handles;
|
||||
}
|
||||
|
||||
function handle_request(handle, req, data, remote)
|
||||
function handle_request(handle, req, data, remote, host)
|
||||
{
|
||||
let name = data.name;
|
||||
let local = this.publish[name];
|
||||
if (remote)
|
||||
remote = this.remote_publish[name];
|
||||
let handles = get_handles(handle, local, remote);
|
||||
let handles = get_handles(handle, local, remote, host);
|
||||
|
||||
let context = {
|
||||
pending: length(handles),
|
||||
|
@ -134,13 +142,13 @@ function handle_request(handle, req, data, remote)
|
|||
}
|
||||
}
|
||||
|
||||
function handle_message(handle, data, remote)
|
||||
function handle_message(handle, data, remote, host)
|
||||
{
|
||||
let name = data.name;
|
||||
let local = this.subscribe[name];
|
||||
if (remote)
|
||||
remote = this.remote_subscribe[name];
|
||||
let handles = get_handles(handle, local, remote);
|
||||
let handles = get_handles(handle, local, remote, host);
|
||||
for (let cur in handles) {
|
||||
if (!cur || !cur.get_channel)
|
||||
continue;
|
||||
|
|
Loading…
Reference in a new issue