I want to share this design with luvit developers because it takes
advantage of Lua's strengths when parsing a stream. The design is very
flexible and has worked very well in one of my projects.
**Stream**
Stream is a series of ordered data-chunks like what uv_read_cb provides. We
should just push those data-chunks to the reader and they will be
concatenated by the reader as the buffer and passed to the decoder.
**Reader**
Reader is the core of this design. It's a Lua object: you push data chunks
from a stream into it and read Lua values out of it. When there is not
enough data, it yields the reading coroutine; once enough data has arrived
to decode a value, it resumes that coroutine.
Note: the Reader can be read from only one coroutine at a time. If a queue
were implemented this limit would disappear, but a queue never seems to be
required. Data chunks can be pushed from any thread.
**Decoder**
Decoder is a Lua function that receives the Reader's buffer as a string and
returns a Lua value, plus the rest of the buffer if any. If the buffer
doesn't contain enough data, it returns nil. If it encounters a malformed
buffer, it should throw a Lua error.
*If we want to optimize the program, we can simply implement the decoder in
C.*
The following code should make this clearer.
```
-- Table holding the Reader methods; the constructor at the end of this
-- module passes it to setmetatable() for every new reader.
local MT_Reader = {}
-- Pause the reader: tell it that no more data will be pushed for now.
-- Pause is used to avoid buffering too much data.
--
-- onrestore: function stored on the reader; the next Read/Peek that has to
--   wait for data calls it first (with the reader as its only argument) so
--   the producer can start pushing again.
--
-- If a read is pending when Pause is called, the waiting coroutine is
-- resumed with nil, "paused".
-- Raises an error if the reader is already stopped or already paused.
function MT_Reader:Pause(onrestore)
  -- The guard used to test self.stopped while reporting "already paused";
  -- check each state separately so the message matches the condition.
  assert(not self.stopped, "already stopped")
  assert(not self.paused, "already paused")
  self.paused = onrestore
  if self.decoder then
    -- SafeResume is defined outside this snippet; presumably a protected
    -- coroutine.resume -- confirm against its definition.
    SafeResume(self.readco, nil, "paused")
  end
end
-- Feed one chunk of stream data into the reader.
--
-- str: the next chunk, or nil to signal that no more data will be pushed
--   to this reader. After that, reads that cannot be satisfied from the
--   buffer return nil plus "stopped".
-- err: only used when str is nil; a Read pending at that moment receives
--   err (instead of "stopped") as its second return value.
--
-- If a Read is pending, its decoder is tried on the buffered data right
-- away. The waiting coroutine is resumed with the decoded value, or with
-- nil plus the error message if the decoder raised.
--
-- Returns the length of the buffer. We can use it to check if we need to
-- pause reading.
function MT_Reader:Push(str, err)
  if str then
    -- Append to whatever is already buffered; with an empty reader the
    -- buffer field stays untouched until the decoder has had its turn.
    local data = str
    if self.buffer then
      data = self.buffer .. str
      self.buffer = data
    end

    if not self.decoder then
      -- Nobody is waiting: just keep the data for a later Read.
      self.buffer = data
    else
      local ok, value, remainder = pcall(self.decoder, data)
      if not ok then
        -- Decoder raised: keep the data and hand the error to the reader.
        self.buffer = data
        SafeResume(self.readco, nil, value)
      elseif value then
        -- A value was decoded: keep only the unconsumed tail, if any.
        if remainder and #remainder > 0 then
          self.buffer = remainder
        else
          self.buffer = nil
        end
        SafeResume(self.readco, value)
      else
        -- Not enough data yet: keep everything and wait for more.
        self.buffer = data
      end
    end
  else
    -- End of stream.
    self.stopped = true
    if self.decoder then
      SafeResume(self.readco, nil, err or "stopped")
    end
  end

  if self.buffer then
    return #self.buffer
  end
  return 0
end
-- Read one value from the stream using `decoder`.
--
-- decoder: function(buffer) -> value [, rest]. It returns nothing/nil when
--   the buffer does not hold a complete value yet and raises on bad data.
--
-- Returns the decoded value, or nil plus a reason: the decoder's error
-- message, "stopped", or whatever the resuming side passes along.
-- Only one read may be pending at a time ("already reading" otherwise).
function MT_Reader:Read(decoder)
  assert(not self.decoder, "already reading")

  -- Fast path: try to satisfy the request from already-buffered data.
  local buffered = self.buffer
  if buffered then
    local ok, value, remainder = pcall(decoder, buffered)
    if not ok then
      return nil, value
    end
    if value then
      if remainder and #remainder > 0 then
        self.buffer = remainder
      else
        self.buffer = nil
      end
      return value
    end
  end

  if self.stopped then
    return nil, "stopped"
  end

  -- More data is needed: let a paused producer know before going to sleep.
  if self.paused then
    self.paused(self)
    self.paused = false
  end

  -- Park the current coroutine until Push/Pause resumes it.
  -- crunning/cyield are defined outside this snippet (presumably
  -- coroutine.running / coroutine.yield).
  self.decoder = decoder
  self.readco = (crunning())
  local value, reason = cyield()
  self.readco = nil
  self.decoder = nil
  return value, reason
end
-- Read exactly `len` bytes from the stream.
-- Returns the bytes as a string, or nil plus a reason (see Read).
function MT_Reader:Get(len)
  -- Decoder: wait until at least `len` bytes are buffered, then split.
  local function take(buffer)
    local size = #buffer
    if size == len then
      return buffer
    end
    if size > len then
      return buffer:sub(1, len), buffer:sub(len + 1, -1)
    end
  end
  return self:Read(take)
end
-- Take whatever data is available, waiting for the next chunk if the
-- buffer is empty. Despite the name, the returned data is removed from
-- the buffer.
--
-- Without `len` this is equivalent to
--   Reader:Read(function(buffer) return buffer end)
-- but faster when data is already buffered.
-- With `len`, at most `len` bytes are returned and the remainder stays
-- buffered; this path simply goes through Read and is not optimized.
--
-- Returns the data, or nil plus a reason (see Read).
function MT_Reader:Peek(len)
  if len then
    return self:Read(function(buffer)
      if #buffer <= len then
        return buffer
      end
      return buffer:sub(1, len), buffer:sub(len + 1, -1)
    end)
  end

  assert(not self.decoder, "already reading")

  local buffer = self.buffer
  if buffer then
    self.buffer = nil
    -- Return the saved local: self.buffer was just cleared, so returning
    -- it (as the code used to) always yielded nil and lost the data.
    return buffer
  end

  if self.stopped then
    return nil, "stopped"
  end

  -- More data is needed: let a paused producer know before going to sleep.
  if self.paused then
    self.paused(self)
    self.paused = false
  end

  -- Park the current coroutine; the identity decoder accepts any chunk.
  -- crunning/cyield are defined outside this snippet (presumably
  -- coroutine.running / coroutine.yield).
  self.readco, self.decoder = crunning(), function(chunk)
    return chunk
  end
  local result, err = cyield()
  self.readco, self.decoder = nil, nil
  return result, err
end
-- Module export: a constructor returning a fresh, empty Reader.
-- __index has to point at the method table; without it, method lookups
-- such as reader:Push(...) on a new instance resolve to nil.
MT_Reader.__index = MT_Reader
return function() return setmetatable({}, MT_Reader) end
```
An HTTP header decoder which reads an HTTP request header into a table:
```
-- Decoder for an HTTP/1.x request head.
--
-- buffer: the data received so far.
--
-- Returns nothing while the blank line that terminates the head has not
-- arrived yet. Otherwise returns
--   { method = ..., resource_orig = ..., headers = { [lowercased name] = value } }
-- plus the bytes following the head.
-- Raises "bad request" if the request line or a header line is malformed,
-- or if the head is empty.
local function decodeHead(buffer)
  local headEnd, sepEnd = buffer:find("\r?\n\r?\n")
  if not headEnd then
    return
  end

  -- headers must exist before the loop stores into it.
  local result = { headers = {} }
  local firstLine = true
  for line in buffer:sub(1, headEnd - 1):gmatch("([^\r\n]+)") do
    if firstLine then
      -- The pattern has to be a single-line literal: "METHOD target HTTP/1.x".
      local verb, resource = line:match("^([A-Z]+) ([^%s]+) HTTP/1%.[01]$")
      assert(verb and resource, "bad request")
      result.method, result.resource_orig = verb, resource
      firstLine = false
    else
      local k, v = line:match("^([A-Za-z0-9%-]+):%s?(.+)$")
      assert(k and v, "bad request")
      result.headers[k:lower()] = v
    end
  end
  -- An empty head would otherwise yield a result without a method.
  assert(not firstLine, "bad request")

  return result, buffer:sub(sepEnd + 1, -1)
end
```
A FastCGI decoder which reads a FastCGI record:
```
-- Decoder for one FastCGI record.
--
-- Reads the 8-byte record header (byte 2 = type, bytes 5-6 = big-endian
-- content length, byte 7 = padding length) and, once the whole record is
-- buffered, returns { type, content } plus the bytes after the padding.
-- Returns nil / nothing while the record is still incomplete.
local function decodeFCGI(buffer)
  local HEADER_SIZE = 8
  if #buffer < HEADER_SIZE then
    return nil
  end

  local lenHi, lenLo, padding = buffer:byte(5, 7)
  local contentLen = lenHi * 0x100 + lenLo
  local recordEnd = HEADER_SIZE + contentLen + padding

  if #buffer >= recordEnd then
    local recordType = buffer:byte(2)
    local content = buffer:sub(HEADER_SIZE + 1, HEADER_SIZE + contentLen)
    return { recordType, content }, buffer:sub(recordEnd + 1, -1)
  end
end
```
A WebSocket frame decoder:
```
-- Decoder for one WebSocket frame.
--
-- block: the data received so far.
--
-- Returns nothing while the frame is incomplete. Otherwise returns
--   { low 4 bits of byte 1 (the opcode), payload, FIN = boolean }
-- plus the bytes following the frame. Masked payloads are unmasked.
--
-- band, bxor and schar are defined outside this snippet (presumably
-- bitwise and/xor and string.char).
-- NOTE(review): unlike the other decoders this one is defined as a global;
-- kept that way so existing callers keep working.
function decodeWSF(block)
  if #block < 2 then return end
  local b1, b2 = block:byte(1, 2)

  -- Payload length: 7 bits, or 126/127 announcing a 16/64-bit length.
  local skip, len = 2, band(b2, 0x7F)
  if len == 126 then
    if #block < 4 then return end
    local l1, l2 = block:byte(3, 4)
    len, skip = l1 * 0x100 + l2, 4
  elseif len == 127 then
    if #block < 10 then return end
    local l1, l2, l3, l4, l5, l6, l7, l8 = block:byte(3, 10)
    len = l1 * 0x100000000000000 + l2 * 0x1000000000000 +
      l3 * 0x10000000000 + l4 * 0x100000000 +
      l5 * 0x1000000 + l6 * 0x10000 + l7 * 0x100 + l8
    skip = 10
  end

  -- Optional 4-byte masking key right after the length field.
  local mask = band(b2, 0x80) == 0x80
  if mask then
    if #block < skip + 4 then return end
    skip = skip + 4
    mask = { block:byte(skip - 3, skip) }
  end

  if #block < skip + len then return end
  local data = block:sub(skip + 1, skip + len)
  if mask then
    -- Convert byte by byte and join with table.concat: passing the whole
    -- payload through unpack() fails on large frames because of the limit
    -- on the number of values a call can take.
    local pieces = {}
    for i = 1, #data do
      pieces[i] = schar(bxor(data:byte(i), mask[(i - 1) % 4 + 1]))
    end
    data = table.concat(pieces)
  end

  local result = { band(b1, 0xF), data }
  result.FIN = band(b1, 0x80) == 0x80
  return result, block:sub(skip + len + 1, -1)
end
```
--
You received this message because you are subscribed to the Google Groups
"luvit" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
For more options, visit https://groups.google.com/d/optout.