* libs/httpd: Introduced keep-alive and pipelining support

This commit is contained in:
Steven Barth 2008-06-25 16:38:48 +00:00
parent b85d292bcd
commit 7a4aa85dd6
8 changed files with 353 additions and 226 deletions

View file

@ -134,6 +134,23 @@ function syslog()
end end
-- Generates a random key of length BYTES
function uniqueid(bytes)
local fp = io.open("/dev/urandom")
local chunk = { fp:read(bytes):byte(1, bytes) }
fp:close()
local hex = ""
local pattern = "%02X"
for i, byte in ipairs(chunk) do
hex = hex .. pattern:format(byte)
end
return hex
end
group = {} group = {}
group.getgroup = posix.getgroup group.getgroup = posix.getgroup

View file

@ -114,9 +114,13 @@ local process_states = { }
-- Extract "magic", the first line of a http message. -- Extract "magic", the first line of a http message.
-- Extracts the message type ("get", "post" or "response"), the requested uri -- Extracts the message type ("get", "post" or "response"), the requested uri
-- or the status code if the line descripes a http response. -- or the status code if the line descripes a http response.
process_states['magic'] = function( msg, chunk ) process_states['magic'] = function( msg, chunk, err )
if chunk ~= nil then if chunk ~= nil then
-- ignore empty lines before request
if #chunk == 0 then
return true, nil
end
-- Is it a request? -- Is it a request?
local method, uri, http_ver = chunk:match("^([A-Z]+) ([^ ]+) HTTP/([01]%.[019])$") local method, uri, http_ver = chunk:match("^([A-Z]+) ([^ ]+) HTTP/([01]%.[019])$")
@ -522,6 +526,34 @@ process_states['urldecode-value'] = function( msg, chunk, filecb )
end end
-- Creates a header source from a given socket
function header_source( sock )
return ltn12.source.simplify( function()
local chunk, err, part = sock:receive("*l")
-- Line too long
if chunk == nil then
if err ~= "timeout" then
return nil, part
and "Line exceeds maximum allowed length["..part.."]"
or "Unexpected EOF"
else
return nil, err
end
-- Line ok
elseif chunk ~= nil then
-- Strip trailing CR
chunk = chunk:gsub("\r$","")
return chunk, nil
end
end )
end
-- Decode MIME encoded data. -- Decode MIME encoded data.
function mimedecode_message_body( source, msg, filecb ) function mimedecode_message_body( source, msg, filecb )
@ -617,20 +649,6 @@ function urldecode_message_body( source, msg )
end end
-- Parse a http message
function parse_message( data, filecb )
local reader = _linereader( data, HTTP_MAX_READBUF )
local message = parse_message_header( reader )
if message then
parse_message_body( reader, message, filecb )
end
return message
end
-- Parse a http message header -- Parse a http message header
function parse_message_header( source ) function parse_message_header( source )
@ -673,7 +691,7 @@ function parse_message_header( source )
REQUEST_URI = msg.request_uri; REQUEST_URI = msg.request_uri;
SCRIPT_NAME = msg.request_uri:gsub("?.+$",""); SCRIPT_NAME = msg.request_uri:gsub("?.+$","");
SCRIPT_FILENAME = ""; -- XXX implement me SCRIPT_FILENAME = ""; -- XXX implement me
SERVER_PROTOCOL = "HTTP/" .. msg.http_version SERVER_PROTOCOL = "HTTP/" .. string.format("%.1f", msg.http_version)
} }
-- Populate HTTP_* environment variables -- Populate HTTP_* environment variables
@ -702,19 +720,6 @@ end
-- Parse a http message body -- Parse a http message body
function parse_message_body( source, msg, filecb ) function parse_message_body( source, msg, filecb )
-- Install an additional filter if we're operating on chunked transfer
-- coding and client is HTTP/1.1 capable
if msg.http_version == 1.1 and
msg.headers['Transfer-Encoding'] and
msg.headers['Transfer-Encoding']:find("chunked")
then
source = ltn12.source.chain(
source, luci.http.protocol.filter.decode_chunked
)
end
-- Is it multipart/mime ? -- Is it multipart/mime ?
if msg.env.REQUEST_METHOD == "POST" and msg.env.CONTENT_TYPE and if msg.env.REQUEST_METHOD == "POST" and msg.env.CONTENT_TYPE and
msg.env.CONTENT_TYPE:match("^multipart/form%-data") msg.env.CONTENT_TYPE:match("^multipart/form%-data")
@ -771,33 +776,14 @@ function parse_message_body( source, msg, filecb )
end end
end end
-- Push a response to a socket
function push_response(request, response, sourceout, sinkout, sinkerr)
local code = response.status
sinkout(request.env.SERVER_PROTOCOL .. " " .. code .. " " .. statusmsg[code] .. "\r\n")
-- FIXME: Add support for keep-alive
response.headers["Connection"] = "close"
for k,v in pairs(response.headers) do
sinkout(k .. ": " .. v .. "\r\n")
end
sinkout("\r\n")
if sourceout then
ltn12.pump.all(sourceout, sinkout)
end
end
-- Status codes -- Status codes
statusmsg = { statusmsg = {
[200] = "OK", [200] = "OK",
[400] = "Bad Request", [400] = "Bad Request",
[403] = "Forbidden", [403] = "Forbidden",
[404] = "Not Found", [404] = "Not Found",
[405] = "Method Not Allowed",
[411] = "Length Required",
[500] = "Internal Server Error", [500] = "Internal Server Error",
[503] = "Server Unavailable", [503] = "Server Unavailable",
} }

View file

@ -27,13 +27,59 @@ function Socket(ip, port)
return sock, err return sock, err
end end
Thread = luci.util.class()
function Thread.__init__(self, socket, func)
self.socket = socket
self.routine = coroutine.create(func)
self.stamp = os.time()
self.waiting = false
end
function Thread.getidletime(self)
return os.difftime(os.time(), self.stamp)
end
function Thread.iswaiting(self)
return self.waiting
end
function Thread.receive(self, ...)
local chunk, err, part
self.waiting = true
repeat
coroutine.yield()
chunk, err, part = self.socket:receive(...)
until err ~= "timeout"
self.waiting = false
return chunk, err, part
end
function Thread.resume(self, ...)
return coroutine.resume(self.routine, self, ...)
end
function Thread.status(self)
return coroutine.status(self.routine)
end
function Thread.touch(self)
self.stamp = os.time()
end
Daemon = luci.util.class() Daemon = luci.util.class()
function Daemon.__init__(self, threadlimit, timeout) function Daemon.__init__(self, threadlimit, timeout)
self.reading = {} self.reading = {}
self.running = {} self.threads = {}
self.handler = {} self.handler = {}
self.waiting = {}
self.threadc = 0
setmetatable(self.waiting, {__mode = "v"})
self.debug = false self.debug = false
self.threadlimit = threadlimit self.threadlimit = threadlimit
self.timeout = timeout or 0.1 self.timeout = timeout or 0.1
@ -58,10 +104,7 @@ end
function Daemon.step(self) function Daemon.step(self)
local input, output, err = socket.select( self.reading, nil, 0 ) local input, output, err = socket.select( self.reading, nil, 0 )
local working = false
if err == "timeout" and #self.running == 0 then
socket.sleep(self.timeout)
end
-- accept new connections -- accept new connections
for i, connection in ipairs(input) do for i, connection in ipairs(input) do
@ -70,19 +113,18 @@ function Daemon.step(self)
if sock then if sock then
-- check capacity -- check capacity
if not self.threadlimit or #self.running < self.threadlimit then if not self.threadlimit or self.threadc < self.threadlimit then
if self.debug then if self.debug then
self:dprint("Accepted incoming connection from " .. sock:getpeername()) self:dprint("Accepted incoming connection from " .. sock:getpeername())
end end
table.insert( self.running, { local t = Thread(sock, self.handler[connection].clhandler)
coroutine.create( self.handler[connection].clhandler ), self.threads[sock] = t
sock self.threadc = self.threadc + 1
} )
if self.debug then if self.debug then
self:dprint("Created " .. tostring(self.running[#self.running][1])) self:dprint("Created " .. tostring(t))
end end
-- reject client -- reject client
@ -101,27 +143,62 @@ function Daemon.step(self)
end end
-- create client handler -- create client handler
for i, client in ipairs( self.running ) do for sock, thread in pairs( self.threads ) do
-- reap dead clients -- reap dead clients
if coroutine.status( client[1] ) == "dead" then if thread:status() == "dead" then
if self.debug then if self.debug then
self:dprint("Completed " .. tostring(client[1])) self:dprint("Completed " .. tostring(thread))
end end
table.remove( self.running, i ) sock:close()
self.threadc = self.threadc - 1
self.threads[sock] = nil
-- resume working threads
elseif not thread:iswaiting() then
if self.debug then
self:dprint("Resuming " .. tostring(thread))
end
local stat, err = thread:resume()
if stat then
thread:touch()
if not thread:iswaiting() then
working = true
else else
if self.debug then table.insert(self.waiting, sock)
self:dprint("Resuming " .. tostring(client[1])) end
end end
local stat, err = coroutine.resume( client[1], client[2] )
if self.debug then if self.debug then
self:dprint(tostring(client[1]) .. " returned") self:dprint(tostring(thread) .. " returned")
if not stat then if not stat then
self:dprint("Error in " .. tostring(client[1]) .. " " .. err) self:dprint("Error in " .. tostring(thread) .. " " .. err)
end end
end end
end end
end end
-- check for data on waiting threads
input, output, err = socket.select( self.waiting, nil, 0 )
for i, sock in ipairs(input) do
self.threads[sock]:resume()
self.threads[sock]:touch()
if not self.threads[sock]:iswaiting() then
for i, s in ipairs(self.waiting) do
if s == sock then
table.remove(self.waiting, i)
break
end
end
if not working then
working = true
end
end
end
if err == "timeout" and not working then
socket.sleep(self.timeout)
end
end end

View file

@ -11,18 +11,28 @@ function Simple.__init__(self, docroot)
self.docroot = docroot self.docroot = docroot
end end
function Simple.handle(self, request, sourcein, sinkerr) function Simple.getfile(self, uri)
local uri = request.env.PATH_INFO
local file = self.docroot .. uri:gsub("%.%./", "") local file = self.docroot .. uri:gsub("%.%./", "")
local stat = luci.fs.stat(file) local stat = luci.fs.stat(file)
return file, stat
end
function Simple.handle_get(self, request, sourcein, sinkerr)
local file, stat = self:getfile(request.env.PATH_INFO)
if stat then if stat then
if stat.type == "regular" then if stat.type == "regular" then
return Response(200, {["Content-Length"] = stat.size}), ltn12.source.file(io.open(file)) return Response(200, {["Content-Length"] = stat.size}), ltn12.source.file(io.open(file))
else else
return self:failure(403, "Unable to transmit " .. stat.type .. " " .. uri) return self:failure(403, "Unable to transmit " .. stat.type .. " " .. request.env.PATH_INFO)
end end
else else
return self:failure(404, "No such file: " .. uri) return self:failure(404, "No such file: " .. uri)
end end
end end
function Simple.handle_head(self, ...)
local response, sourceout = self:handle_get(...)
return response
end

View file

@ -10,7 +10,16 @@ function Luci.__init__(self)
luci.httpd.module.Handler.__init__(self) luci.httpd.module.Handler.__init__(self)
end end
function Luci.handle(self, request, sourcein, sinkerr) function Luci.handle_head(self, ...)
local response, sourceout = self:handle_get(...)
return response
end
function Luci.handle_post(self, ...)
return self:handle_get(...)
end
function Luci.handle_get(self, request, sourcein, sinkerr)
local r = luci.http.Request( local r = luci.http.Request(
request.env, request.env,
sourcein, sourcein,
@ -22,7 +31,7 @@ function Luci.handle(self, request, sourcein, sinkerr)
local status = 200 local status = 200
local x = coroutine.create(luci.dispatcher.httpdispatch) local x = coroutine.create(luci.dispatcher.httpdispatch)
while id < 3 do while not id or id < 3 do
coroutine.yield() coroutine.yield()
res, id, data1, data2 = coroutine.resume(x, r) res, id, data1, data2 = coroutine.resume(x, r)
@ -45,6 +54,8 @@ function Luci.handle(self, request, sourcein, sinkerr)
local res, id, data = coroutine.resume(x) local res, id, data = coroutine.resume(x)
if not res then if not res then
return nil, id return nil, id
elseif not id then
return true
elseif id == 5 then elseif id == 5 then
return nil return nil
else else

View file

@ -24,6 +24,7 @@ Handler = luci.util.class()
-- Constructor -- Constructor
function Handler.__init__(self) function Handler.__init__(self)
self.filters = {} self.filters = {}
self.handler = {}
end end
@ -41,9 +42,10 @@ function Handler.failure(self, code, message)
return response, sourceout return response, sourceout
end end
-- Processes a request -- Processes a request
function Handler.process(self, request, sourcein, sinkout, sinkerr) function Handler.process(self, request, sourcein, sinkerr, ...)
local stat, response, sourceout
-- Process incoming filters -- Process incoming filters
for i, f in ipairs(self.filters) do for i, f in ipairs(self.filters) do
local i = f:get("input") local i = f:get("input")
@ -57,15 +59,21 @@ function Handler.process(self, request, sourcein, sinkout, sinkerr)
end end
end end
-- Detect request Method
local hname = "handle_" .. request.request_method
if self[hname] then
-- Run the handler -- Run the handler
local stat, response, sourceout = luci.util.copcall( stat, response, sourceout = luci.util.copcall(
self.handle, self, request, sourcein, sinkerr self[hname], self, request, sourcein, sinkerr, ...
) )
-- Check for any errors -- Check for any errors
if not stat then if not stat then
response, sourceout = self:failure(500, response) response, sourceout = self:failure(500, response)
end end
else
response, sourceout = self:failure(405, luci.http.protocol.statusmsg[405])
end
-- Check data -- Check data
if not luci.util.instanceof(response, Response) then if not luci.util.instanceof(response, Response) then
@ -85,7 +93,7 @@ function Handler.process(self, request, sourcein, sinkout, sinkerr)
end end
end end
luci.http.protocol.push_response(request, response, sourceout, sinkout, sinkerr) return response, sourceout
end end

View file

@ -14,6 +14,8 @@ $Id$
]]-- ]]--
module("luci.httpd.server", package.seeall) module("luci.httpd.server", package.seeall)
require("socket")
require("socket.http")
require("luci.util") require("luci.util")
READ_BUFSIZE = 1024 READ_BUFSIZE = 1024
@ -26,7 +28,7 @@ function VHost.__init__(self, handler)
self.dhandler = {} self.dhandler = {}
end end
function VHost.process(self, request, sourcein, sinkout, sinkerr) function VHost.process(self, request, sourcein, sinkerr, ...)
local handler = self.handler local handler = self.handler
local uri = request.env.REQUEST_URI:match("^([^?]*)") local uri = request.env.REQUEST_URI:match("^([^?]*)")
@ -47,10 +49,7 @@ function VHost.process(self, request, sourcein, sinkout, sinkerr)
end end
if handler then if handler then
handler:process(request, sourcein, sinkout, sinkerr) return handler:process(request, sourcein, sinkerr, ...)
return true
else
return false
end end
end end
@ -69,8 +68,6 @@ end
Server = luci.util.class() Server = luci.util.class()
function Server.__init__(self, host) function Server.__init__(self, host)
self.clhandler = client_handler
self.errhandler = error503
self.host = host self.host = host
self.vhosts = {} self.vhosts = {}
end end
@ -86,109 +83,67 @@ end
function Server.create_daemon_handlers(self) function Server.create_daemon_handlers(self)
return function(...) return self:process(...) end, return function(...) return self:process(...) end,
function(...) return self:error503(...) end function(...) return self:error_overload(...) end
end
function Server.create_client_sources(self, client)
-- Create LTN12 block source
local block_source = function()
-- Yielding here may cause chaos in coroutine based modules, be careful
-- coroutine.yield()
local chunk, err, part = client:receive( READ_BUFSIZE )
if chunk == nil and err == "timeout" then
return part
elseif chunk ~= nil then
return chunk
else
return nil, err
end
end end
-- Create LTN12 line source function Server.error(self, socket, code, msg)
local line_source = ltn12.source.simplify( function() hcode = tostring(code)
coroutine.yield() socket:send( "HTTP/1.1 " .. hcode .. " " ..
luci.http.protocol.statusmsg[code] .. "\r\n" )
local chunk, err, part = client:receive("*l") socket:send( "Connection: close\r\n" )
-- Line too long
if chunk == nil and err ~= "timeout" then
return nil, part
and "Line exceeds maximum allowed length["..part.."]"
or "Unexpected EOF"
-- Line ok
elseif chunk ~= nil then
-- Strip trailing CR
chunk = chunk:gsub("\r$","")
-- We got end of headers, switch to dummy source
if #chunk == 0 then
return "", function()
return nil
end
else
return chunk, nil
end
end
end )
return block_source, line_source
end
function Server.error400(self, socket, msg)
socket:send( "HTTP/1.0 400 Bad request\r\n" )
socket:send( "Content-Type: text/plain\r\n\r\n" ) socket:send( "Content-Type: text/plain\r\n\r\n" )
if msg then if msg then
socket:send( msg .. "\r\n" ) socket:send( "HTTP-Error " .. code .. ": " .. msg .. "\r\n" )
end
end end
socket:close() function Server.error_overload(self, socket)
end self:error(socket, 503, "Too many simultaneous connections")
function Server.error500(self, socket, msg)
socket:send( "HTTP/1.0 500 Internal Server Error\r\n" )
socket:send( "Content-Type: text/plain\r\n\r\n" )
if msg then
socket:send( msg .. "\r\n" )
end
socket:close()
end
function Server.error503(self, socket)
socket:send( "HTTP/1.0 503 Server unavailable\r\n" )
socket:send( "Content-Type: text/plain\r\n\r\n" )
socket:send( "There are too many clients connected, try again later\r\n" )
socket:close()
end end
function Server.process(self, client) function Server.process( self, thread )
-- Setup sockets and sources
local client = thread.socket
client:settimeout( 0 ) client:settimeout( 0 )
local sourcein, sourcehdr = self:create_client_sources(client) local sourcein = ltn12.source.empty()
local sourcehdr = luci.http.protocol.header_source( thread )
local sinkerr = ltn12.sink.file( io.stderr ) local sinkerr = ltn12.sink.file( io.stderr )
-- FIXME: Add keep-alive support local close = false
local sinkout = socket.sink("close-when-done", client)
local reading = { client }
local message, err
socket.sleep(5)
repeat
-- parse headers
message, err = luci.http.protocol.parse_message_header( sourcehdr )
if not message then
self:error( client, 400, err )
break
end
coroutine.yield() coroutine.yield()
-- parse headers -- keep-alive
local message, err = luci.http.protocol.parse_message_header( sourcehdr ) if message.http_version == 1.1 then
close = (message.env.HTTP_CONNECTION == "close")
else
close = not message.env.HTTP_CONNECTION or message.env.HTTP_CONNECTION == "close"
end
if message then if message.request_method == "get" or message.request_method == "head" then
-- Be happy
elseif message.request_method == "post" then
-- If we have a HTTP/1.1 client and an Expect: 100-continue header then -- If we have a HTTP/1.1 client and an Expect: 100-continue header then
-- respond with HTTP 100 Continue message -- respond with HTTP 100 Continue message
if message.http_version == 1.1 and message.headers['Expect'] and if message.http_version == 1.1 and message.headers['Expect'] and
@ -197,18 +152,84 @@ function Server.process(self, client)
client:send("HTTP/1.1 100 Continue\r\n\r\n") client:send("HTTP/1.1 100 Continue\r\n\r\n")
end end
if message.headers['Transfer-Encoding'] and
message.headers['Transfer-Encoding'] ~= "identity" then
sourcein = socket.source("http-chunked", thread)
elseif message.env.CONTENT_LENGTH then
sourcein = socket.source("by-length", thread,
tonumber(message.env.CONTENT_LENGTH))
else
self:error( client, 411, luci.http.protocol.statusmsg[411] )
break;
end
else
self:error( client, 405, luci.http.protocol.statusmsg[405] )
break;
end
local host = self.vhosts[message.env.HTTP_HOST] or self.host local host = self.vhosts[message.env.HTTP_HOST] or self.host
if host then if not host then
if host:process(message, sourcein, sinkout, sinkerr) then self:error( client, 500, "Unable to find matching host" )
sinkout() break;
end
coroutine.yield()
local response, sourceout = host:process(
message, sourcein, sinkerr,
client, io.stderr
)
if not response then
self:error( client, 500, "Error processing handler" )
end
coroutine.yield()
-- Post process response
local sinkmode = close and "close-when-done" or "keep-open"
if sourceout then
if not response.headers["Content-Length"] then
if message.http_version == 1.1 then
response.headers["Transfer-Encoding"] = "chunked"
sinkmode = "http-chunked"
else else
self:error500( client, "No suitable path handler found" ) close = true
end sinkmode = "close-when-done"
else
self:error500( client, "No suitable host handler found" )
end
else
self:error400( client, err )
return nil
end end
end end
end
if close then
response.headers["Connection"] = "close"
end
local sinkout = socket.sink(sinkmode, client)
local header =
message.env.SERVER_PROTOCOL .. " " ..
tostring(response.status) .. " " ..
luci.http.protocol.statusmsg[response.status] .. "\r\n"
for k,v in pairs(response.headers) do
header = header .. k .. ": " .. v .. "\r\n"
end
client:send(header .. "\r\n")
if sourceout then
local eof = false
repeat
coroutine.yield()
eof = not ltn12.pump.step(sourceout, sinkout)
until eof
end
until close
client:close()
end

View file

@ -7,9 +7,6 @@ HTTP-Header manipulator and form variable preprocessor
FileId: FileId:
$Id$ $Id$
ToDo:
- Cookie handling
License: License:
Copyright 2008 Steven Barth <steven@midlink.org> Copyright 2008 Steven Barth <steven@midlink.org>
@ -51,22 +48,14 @@ function Request.__init__(self, env, sourcein, sinkerr)
params = luci.http.protocol.urldecode_params(env.QUERY_STRING or ""), params = luci.http.protocol.urldecode_params(env.QUERY_STRING or ""),
} }
setmetatable(self.message.params, {__index = self.parsed_input = false
function(tbl, key)
setmetatable(tbl, nil)
luci.http.protocol.parse_message_body(
self.input,
self.message,
self.filehandler
)
return rawget(tbl, key)
end
})
end end
function Request.formvalue(self, name, default) function Request.formvalue(self, name, default)
if not self.parsed_input then
self:_parse_input()
end
if name then if name then
return self.message.params[name] and tostring(self.message.params[name]) or default return self.message.params[name] and tostring(self.message.params[name]) or default
else else
@ -78,6 +67,10 @@ function Request.formvaluetable(self, prefix)
local vals = {} local vals = {}
prefix = prefix and prefix .. "." or "." prefix = prefix and prefix .. "." or "."
if not self.parsed_input then
self:_parse_input()
end
local void = self.message.params[nil] local void = self.message.params[nil]
for k, v in pairs(self.message.params) do for k, v in pairs(self.message.params) do
if k:find(prefix, 1, true) == 1 then if k:find(prefix, 1, true) == 1 then
@ -88,6 +81,13 @@ function Request.formvaluetable(self, prefix)
return vals return vals
end end
function Request.getcookie(self, name)
local c = string.gsub(";" .. (self:getenv("HTTP_COOKIE") or "") .. ";", "%s*;%s*", ";")
local p = ";" .. name .. "=(.-);"
local i, j, value = cookies:find(p)
return value and urldecode(value)
end
function Request.getenv(self, name) function Request.getenv(self, name)
if name then if name then
return self.message.env[name] return self.message.env[name]
@ -100,6 +100,15 @@ function Request.setfilehandler(self, callback)
self.filehandler = callback self.filehandler = callback
end end
function Request._parse_input(self)
luci.http.protocol.parse_message_body(
self.input,
self.message,
self.filehandler
)
self.parsed_input = true
end
function close() function close()
if not context.eoh then if not context.eoh then
@ -177,18 +186,6 @@ function write(content)
coroutine.yield(4, content) coroutine.yield(4, content)
end end
function basic_auth(realm, errorpage)
header("Status", "401 Unauthorized")
header("WWW-Authenticate", string.format('Basic realm="%s"', realm or ""))
if errorpage then
errorpage()
end
close()
end
function redirect(url) function redirect(url)
header("Status", "302 Found") header("Status", "302 Found")
header("Location", url) header("Location", url)