More splicing stuff

This commit is contained in:
Steven Barth 2009-02-28 21:21:52 +00:00
parent 0a224c5ca8
commit 33fe5b57d7
3 changed files with 175 additions and 77 deletions

View file

@ -14,10 +14,10 @@ $Id$
require "nixio.util" require "nixio.util"
local nixio = require "nixio" local nixio = require "nixio"
local httpclient = require "luci.httpclient" local httpc = require "luci.httpclient"
local ltn12 = require "luci.ltn12" local ltn12 = require "luci.ltn12"
local print = print local print, tonumber, require = print, tonumber, require
module "luci.httpclient.receiver" module "luci.httpclient.receiver"
@ -45,6 +45,132 @@ local function prepare_fd(target)
return file return file
end end
local function splice_async(sock, pipeout, pipein, file, cb)
local ssize = 65536
local smode = nixio.splice_flags("move", "more", "nonblock")
-- Set pipe non-blocking otherwise we might end in a deadlock
local stat, code, msg = pipein:setblocking(false)
if stat then
stat, code, msg = pipeout:setblocking(false)
end
if not stat then
return stat, code, msg
end
local pollsock = {
{fd=sock, events=nixio.poll_flags("in")}
}
local pollfile = {
{fd=file, events=nixio.poll_flags("out")}
}
local done
local active -- Older splice implementations sometimes don't detect EOS
repeat
active = false
-- Socket -> Pipe
repeat
nixio.poll(pollsock, 15000)
stat, code, msg = nixio.splice(sock, pipeout, ssize, smode)
if stat == nil then
return stat, code, msg
elseif stat == 0 then
done = true
break
elseif stat then
active = true
end
until stat == false
-- Pipe -> File
repeat
nixio.poll(pollfile, 15000)
stat, code, msg = nixio.splice(pipein, file, ssize, smode)
if stat == nil then
return stat, code, msg
elseif stat then
active = true
end
until stat == false
if cb then
cb(file)
end
if not active then
-- We did not splice any data, maybe EOS, fallback to default
return false
end
until done
pipein:close()
pipeout:close()
sock:close()
file:close()
return true
end
local function splice_sync(sock, pipeout, pipein, file, cb)
local os = require "os"
local posix = require "posix"
local ssize = 65536
local smode = nixio.splice_flags("move", "more")
local stat
-- This is probably the only forking http-client ;-)
local pid, code, msg = posix.fork()
if not pid then
return pid, code, msg
elseif pid == 0 then
pipein:close()
file:close()
repeat
stat, code = nixio.splice(sock, pipeout, ssize, smode)
until not stat or stat == 0
pipeout:close()
sock:close()
os.exit(stat or code)
else
pipeout:close()
sock:close()
repeat
stat, code, msg = nixio.splice(pipein, file, ssize, smode)
if cb then
cb(file)
end
until not stat or stat == 0
pipein:close()
file:close()
if not stat then
posix.kill(pid)
posix.wait(pid)
return stat, code, msg
else
pid, msg, code = posix.wait(pid)
if msg == "exited" then
if code == 0 then
return true
else
return nil, code, nixio.strerror(code)
end
else
return nil, -0x11, "broken pump"
end
end
end
end
function request_to_file(uri, target, options, cbs) function request_to_file(uri, target, options, cbs)
options = options or {} options = options or {}
@ -64,7 +190,7 @@ function request_to_file(uri, target, options, cbs)
hdr.Range = hdr.Range or ("bytes=" .. off .. "-") hdr.Range = hdr.Range or ("bytes=" .. off .. "-")
end end
local code, resp, buffer, sock = httpclient.request_raw(uri, options) local code, resp, buffer, sock = httpc.request_raw(uri, options)
if not code then if not code then
-- No success -- No success
file:close() file:close()
@ -86,13 +212,13 @@ function request_to_file(uri, target, options, cbs)
end end
local chunked = resp.headers["Transfer-Encoding"] == "chunked" local chunked = resp.headers["Transfer-Encoding"] == "chunked"
local stat
-- Write the buffer to file -- Write the buffer to file
file:writeall(buffer) file:writeall(buffer)
print ("Buffered data: " .. #buffer .. " Byte")
repeat repeat
if not sock:is_socket() or chunked then if not options.splice or not sock:is_socket() or chunked then
break break
end end
@ -106,78 +232,34 @@ function request_to_file(uri, target, options, cbs)
end end
-- Disable blocking for the pipe otherwise we might end in a deadlock
local stat, code, msg = pipein:setblocking(false)
if stat then
stat, code, msg = pipeout:setblocking(false)
end
if not stat then
sock:close()
file:close()
return stat, code, msg
end
-- Adjust splice values -- Adjust splice values
local ssize = 65536 local ssize = 65536
local smode = nixio.splice_flags("move", "more", "nonblock") local smode = nixio.splice_flags("move", "more")
local stat, code, msg = nixio.splice(sock, pipeout, ssize, smode) -- Splicing 512 bytes should never block on a fresh pipe
local stat, code, msg = nixio.splice(sock, pipeout, 512, smode)
if stat == nil then if stat == nil then
break break
end end
local pollsock = { -- Now do the real splicing
{fd=sock, events=nixio.poll_flags("in")} local cb = cbs.on_write
} if options.splice == "asynchronous" then
stat, code, msg = splice_async(sock, pipeout, pipein, file, cb)
elseif options.splice == "synchronous" then
stat, code, msg = splice_sync(sock, pipeout, pipein, file, cb)
else
break
end
local pollfile = { if stat == false then
{fd=file, events=nixio.poll_flags("out")} break
} end
local done return stat, code, msg
repeat
-- Socket -> Pipe
repeat
nixio.poll(pollsock, 15000)
stat, code, msg = nixio.splice(sock, pipeout, ssize, smode)
if stat == nil then
sock:close()
file:close()
return stat, code, msg
elseif stat == 0 then
done = true
break
end
until stat == false
-- Pipe -> File
repeat
nixio.poll(pollfile, 15000)
stat, code, msg = nixio.splice(pipein, file, ssize, smode)
if stat == nil then
sock:close()
file:close()
return stat, code, msg
end
until stat == false
if cbs.on_write then
cbs.on_write(file)
end
until done
file:close()
sock:close()
return true
until true until true
print "Warning: splice() failed, falling back to read/write mode" local src = chunked and httpc.chunksource(sock) or sock:blocksource()
local src = chunked and httpclient.chunksource(sock) or sock:blocksource()
local snk = file:sink() local snk = file:sink()
if cbs.on_write then if cbs.on_write then
@ -188,10 +270,10 @@ function request_to_file(uri, target, options, cbs)
end end
-- Fallback to read/write -- Fallback to read/write
local stat, code, msg = ltn12.pump.all(src, snk) stat, code, msg = ltn12.pump.all(src, snk)
if stat then
file:close() file:close()
sock:close() sock:close()
end return stat and true, code, msg
return stat, code, msg end
end

View file

@ -88,8 +88,14 @@ int nixio__tofd(lua_State *L, int ud) {
return fd; return fd;
} }
static int nixio_strerror(lua_State *L) {
lua_pushstring(L, strerror(luaL_checkinteger(L, 1)));
return 1;
}
/* object table */ /* object table */
static const luaL_reg R[] = { static const luaL_reg R[] = {
{"strerror", nixio_strerror},
{NULL, NULL} {NULL, NULL}
}; };

View file

@ -21,12 +21,13 @@
#include "nixio.h" #include "nixio.h"
#include <fcntl.h> #include <fcntl.h>
#include <string.h> #include <string.h>
#include <errno.h>
#include <unistd.h>
#include <sys/sendfile.h> #include <sys/sendfile.h>
/* guess what sucks... */ /* guess what sucks... */
#ifdef __UCLIBC__ #ifdef __UCLIBC__
#include <unistd.h> #include <unistd.h>
#include <errno.h>
#include <sys/syscall.h> #include <sys/syscall.h>
ssize_t splice(int __fdin, __off64_t *__offin, int __fdout, ssize_t splice(int __fdin, __off64_t *__offin, int __fdout,
__off64_t *__offout, size_t __len, unsigned int __flags) { __off64_t *__offout, size_t __len, unsigned int __flags) {
@ -77,9 +78,11 @@ static int nixio_splice(lua_State *L) {
int fd_out = nixio__checkfd(L, 2); int fd_out = nixio__checkfd(L, 2);
size_t len = luaL_checkinteger(L, 3); size_t len = luaL_checkinteger(L, 3);
int flags = luaL_optinteger(L, 4, 0); int flags = luaL_optinteger(L, 4, 0);
long spliced;
do {
long spliced = splice(fd_in, NULL, fd_out, NULL, len, flags); spliced = splice(fd_in, NULL, fd_out, NULL, len, flags);
} while (spliced == -1 && errno == EINTR);
if (spliced < 0) { if (spliced < 0) {
return nixio__perror(L); return nixio__perror(L);
@ -89,6 +92,12 @@ static int nixio_splice(lua_State *L) {
return 1; return 1;
} }
static int nixio_splice_avail(lua_State *L) {
splice(-1, 0, -1, 0, 0, 0);
lua_pushboolean(L, errno != ENOSYS);
return 1;
}
/** /**
* sendfile(outfd, infd, length) * sendfile(outfd, infd, length)
*/ */
@ -111,6 +120,7 @@ static int nixio_sendfile(lua_State *L) {
static const luaL_reg R[] = { static const luaL_reg R[] = {
{"splice", nixio_splice}, {"splice", nixio_splice},
{"splice_flags", nixio_splice_flags}, {"splice_flags", nixio_splice_flags},
{"splice_avail", nixio_splice_avail},
{"sendfile", nixio_sendfile}, {"sendfile", nixio_sendfile},
{NULL, NULL} {NULL, NULL}
}; };