luci/applications/luci-app-upnp/root/usr/libexec/rpcd/luci.upnp

155 lines
3.6 KiB
Text
Raw Normal View History

#!/usr/bin/env lua
local json = require "luci.jsonc"
local UCI = require "luci.model.uci"
local fs = require "nixio.fs"
local sys = require "luci.sys"
local methods = {
get_status = {
call = function()
local uci = UCI.cursor()
local lease_file = uci:get("upnpd", "config", "upnp_lease_file")
local ipv4_hints = sys.net.ipv4_hints()
local rule = { }
local ipt = io.popen("iptables --line-numbers -t nat -xnvL MINIUPNPD 2>/dev/null")
if ipt then
local upnpf = lease_file and io.open(lease_file, "r")
while true do
local ln = ipt:read("*l")
if not ln then
break
elseif ln:match("^%d+") then
local num, proto, extport, intaddr, intport =
ln:match("^(%d+).-([a-z]+).-dpt:(%d+) to:(%S-):(%d+)")
local descr = ""
if num and proto and extport and intaddr and intport then
extport = tonumber(extport)
intport = tonumber(intport)
if upnpf then
local uln = upnpf:read("*l")
if uln then descr = uln:match(string.format("^%s:%d:%s:%d:%%d*:(.*)$", proto:upper(), extport, intaddr, intport)) end
if not descr then descr = "" end
end
local host_hint, _, e
for _,e in pairs(ipv4_hints) do
if e[1] == intaddr then
host_hint = e[2]
break
end
end
rule[#rule+1] = {
num = num,
proto = proto:upper(),
extport = extport,
intaddr = intaddr,
host_hint = host_hint,
intport = intport,
descr = descr
}
end
end
end
if upnpf then upnpf:close() end
ipt:close()
end
return { rules = rule }
end
},
delete_rule = {
args = { token = "token" },
call = function(args)
local util = require "luci.util"
local idx = args and tonumber(args.token)
local res = {}
if idx and idx > 0 then
local uci = UCI.cursor()
sys.call("iptables -t filter -D MINIUPNPD %d 2>/dev/null" % idx)
sys.call("iptables -t nat -D MINIUPNPD %d 2>/dev/null" % idx)
local lease_file = uci:get("upnpd", "config", "upnp_lease_file")
if lease_file and fs.access(lease_file) then
sys.call("sed -i -e '%dd' %s" %{ idx, util.shellquote(lease_file) })
end
uci.unload()
return { result = "OK" }
end
return { result = "Bad request" }
end
}
}
local function parseInput()
local parse = json.new()
local done, err
while true do
local chunk = io.read(4096)
if not chunk then
break
elseif not done and not err then
done, err = parse:parse(chunk)
end
end
if not done then
print(json.stringify({ error = err or "Incomplete input" }))
os.exit(1)
end
return parse:get()
end
local function validateArgs(func, uargs)
local method = methods[func]
if not method then
print(json.stringify({ error = "Method not found" }))
os.exit(1)
end
if type(uargs) ~= "table" then
print(json.stringify({ error = "Invalid arguments" }))
os.exit(1)
end
uargs.ubus_rpc_session = nil
local k, v
local margs = method.args or {}
for k, v in pairs(uargs) do
if margs[k] == nil or
(v ~= nil and type(v) ~= type(margs[k]))
then
print(json.stringify({ error = "Invalid arguments" }))
os.exit(1)
end
end
return method
end
if arg[1] == "list" then
local _, method, rv = nil, nil, {}
for _, method in pairs(methods) do rv[_] = method.args or {} end
print((json.stringify(rv):gsub(":%[%]", ":{}")))
elseif arg[1] == "call" then
local args = parseInput()
local method = validateArgs(arg[2], args)
local result, code = method.call(args)
print((json.stringify(result):gsub("^%[%]$", "{}")))
os.exit(code or 0)
end