I want to share this design with luvit developers because it takes advantage 
of Lua's strengths (coroutines and first-class functions) when parsing a 
stream. The design is very flexible and worked very well in one of my projects.

**Stream**

A stream is a series of ordered data chunks, like those uv_read_cb provides. 
We simply push those chunks into the reader, which concatenates them into its 
buffer and passes that buffer to the decoder.

**Reader**

The Reader is the core of this design. It is a Lua object: you push data 
chunks from a stream into it and read Lua values out of it. When there is 
not enough data, reading yields the current coroutine; once enough data has 
been pushed and decoded, the coroutine is resumed.

Note: the Reader can be read from only one coroutine at a time. Implementing 
a queue would remove this limit, but a queue never seems to be needed. Data 
chunks can be pushed from any coroutine (Lua thread).

**Decoder**

The Decoder is a Lua function that receives the Reader's buffer as a string 
and returns a Lua value, plus the rest of the buffer if any. If the buffer 
doesn't contain enough data yet, it returns nil. If it encounters a malformed 
buffer, it should throw a Lua error.

*If we want to optimize the program, we can simply implement the decoder in 
C.*

The following code makes this clearer.

```
-- Metatable shared by every Reader object. __index points back at the
-- metatable itself so that reader:Read(...), reader:Push(...) and the other
-- methods defined on MT_Reader are found through method-call syntax.
local MT_Reader = {}
MT_Reader.__index = MT_Reader

-- Pause the reader: tell it that no more data will be pushed for now.
-- Pausing is used to avoid buffering too much data.
-- A Read that is currently waiting (if any) is woken up and returns
-- nil, "paused". The next Read that finds no decodable buffered data calls
-- onrestore(reader) before waiting, so the producer can resume pushing.
-- Raises if the reader is already paused or has already been stopped.
function MT_Reader:Pause(onrestore)
    -- Each condition gets its own message; pausing twice would otherwise
    -- silently replace the first onrestore callback.
    assert(not self.paused, "already paused")
    assert(not self.stopped, "already stopped")
    self.paused = onrestore
    if self.decoder then
        SafeResume(self.readco, nil, "paused")
    end
end

-- Push a chunk of stream data into the reader.
-- str: the next chunk, or nil to signal that no more data will be pushed.
--   After that, reads still drain whatever is buffered and then return
--   nil plus "stopped".
-- err: only used when str is nil; a pending Read is woken with nil, err
--   (or nil, "stopped" when err is nil).
-- If a Read is pending, its decoder is run on the concatenated buffer and
-- the reading coroutine is resumed with the decoded value, or with nil plus
-- the error message if the decoder raised.
-- Returns the number of bytes left buffered afterwards, which the producer
-- can use to decide whether it needs to Pause.
function MT_Reader:Push(str, err)
    if not str then
        self.stopped = true
        if self.decoder then
            SafeResume(self.readco, nil, err or "stopped")
        end
    else
        -- Append to whatever is still buffered (nil means nothing is).
        local buffer = self.buffer and self.buffer .. str or str
        self.buffer = buffer
        if self.decoder then
            local ok, result, rest = pcall(self.decoder, buffer)
            if not ok then
                -- Keep the bytes buffered and report the decoder's error.
                SafeResume(self.readco, nil, result)
            elseif result then
                -- Store only the unconsumed tail before waking the reader,
                -- since resuming it may immediately trigger another Read.
                if rest and #rest > 0 then
                    self.buffer = rest
                else
                    self.buffer = nil
                end
                SafeResume(self.readco, result)
            end
            -- result == nil: not enough data yet; the reader keeps waiting.
        end
    end
    return self.buffer and #self.buffer or 0
end

-- Read one value from the stream using `decoder`.
-- decoder(buffer) returns a value plus the unconsumed rest, returns nil when
-- more data is needed, or raises on malformed input.
-- Already-buffered data is tried first; if it cannot be decoded yet, the
-- calling coroutine yields until Push, Pause or end-of-stream resumes it.
-- Returns the decoded value, or nil plus an error message ("stopped",
-- "paused", the error passed to Push, or the decoder's error).
-- Only one coroutine may be reading at a time.
function MT_Reader:Read(decoder)
    assert(not self.decoder, "already reading")
    local pending = self.buffer
    if pending then
        local ok, value, remainder = pcall(decoder, pending)
        if not ok then
            return nil, value
        end
        if value then
            if remainder and #remainder > 0 then
                self.buffer = remainder
            else
                self.buffer = nil
            end
            return value
        end
    end
    if self.stopped then
        return nil, "stopped"
    end
    local onrestore = self.paused
    if onrestore then
        -- Let the producer resume pushing before we wait for data.
        onrestore(self)
        self.paused = false
    end
    -- Park this coroutine; Push/Pause resume it through SafeResume.
    self.readco = crunning()
    self.decoder = decoder
    local value, err = cyield()
    self.readco = nil
    self.decoder = nil
    return value, err
end

-- Read exactly `len` bytes from the stream, waiting for more data if needed.
-- Returns the bytes, or nil plus an error message as Read does.
function MT_Reader:Get(len)
    local function takeExactly(buffer)
        local size = #buffer
        if size == len then
            return buffer
        end
        if size > len then
            return buffer:sub(1, len), buffer:sub(len + 1)
        end
        -- Fewer than len bytes: return nothing so Read keeps waiting.
    end
    return self:Read(takeExactly)
end

-- Take everything that is currently buffered, or wait for the next chunk if
-- nothing is buffered. Despite its name, this consumes the data it returns.
-- Equivalent to:
--    Reader:Read(function(buffer) return buffer end)
-- but skips the decoder call when data is already buffered.
-- If len is provided, at most len bytes are returned and the remainder stays
-- buffered; that path simply goes through Read and is not short-circuited.
function MT_Reader:Peek(len)
    if len then
        return self:Read(function(buffer)
            if #buffer <= len then return buffer end
            return buffer:sub(1, len), buffer:sub(len + 1)
        end)
    end
    assert(not self.decoder, "already reading")
    local buffer = self.buffer
    if buffer then
        self.buffer = nil
        -- Return the saved chunk; self.buffer itself was just cleared.
        return buffer
    end
    if self.stopped then return nil, "stopped" end
    if self.paused then self.paused(self) self.paused = false end
    self.readco, self.decoder = crunning(), function(buffer)
        return buffer
    end
    local result, err = cyield()
    self.readco, self.decoder = nil, nil
    return result, err
end

-- The module exports a constructor: each call returns a new, empty Reader.
return function()
    local reader = {}
    return setmetatable(reader, MT_Reader)
end
```

An HTTP request-head decoder, which reads the request line and headers into a table:
```
-- Decode an HTTP/1.x request head (request line plus header fields) from the
-- front of `buffer`.
-- Returns nothing while the terminating blank line has not arrived yet.
-- Otherwise returns a table
--   { method = VERB, resource_orig = TARGET, headers = { [lowercase name] = value } }
-- plus the bytes after the head (e.g. the start of the body).
-- Raises "bad request" if the request line is missing or malformed, or a
-- header line is malformed. Repeated header names keep the last value.
local function decodeHead(buffer)
    local l, r = buffer:find("\r?\n\r?\n")
    if not l then return end
    local head = buffer:sub(1, l - 1)
    local result, firstLine = { headers = {} }, true
    for line in head:gmatch("[^\r\n]+") do
        if firstLine then
            local verb, resource = line:match("^([A-Z]+) ([^%s]+) HTTP/1%.[01]$")
            assert(verb and resource, "bad request")
            result.method, result.resource_orig = verb, resource
            firstLine = false
        else
            -- Whitespace around the field value is not part of it, and an
            -- empty value is allowed (RFC 7230 section 3.2).
            local k, v = line:match("^([A-Za-z0-9%-]+):%s*(.-)%s*$")
            assert(k and v, "bad request")
            result.headers[k:lower()] = v
        end
    end
    -- A head with no lines at all has no request line.
    assert(not firstLine, "bad request")
    return result, buffer:sub(r + 1)
end
```

A FastCGI decoder, which reads one FastCGI record:
```
-- Decode one FastCGI record from the front of `buffer`.
-- The record starts with an 8-byte header: byte 2 is the record type,
-- bytes 5-6 the big-endian content length and byte 7 the padding length.
-- The content follows the header, and the padding follows the content.
-- Returns nil while the record is incomplete; otherwise returns
-- { type, content } and the bytes that follow the record.
local function decodeFCGI(buffer)
    local HEADER_SIZE = 8
    if #buffer < HEADER_SIZE then return nil end
    local lenHi, lenLo, padding = buffer:byte(5, 7)
    local contentLength = lenHi * 0x100 + lenLo
    local recordEnd = HEADER_SIZE + contentLength + padding
    if #buffer < recordEnd then return end
    local record = {
        buffer:byte(2),
        buffer:sub(HEADER_SIZE + 1, HEADER_SIZE + contentLength),
    }
    return record, buffer:sub(recordEnd + 1)
end
```

A WebSocket frame decoder:
```
-- Decode one WebSocket frame (RFC 6455 section 5.2) from the front of `block`.
-- Returns nothing while the frame is incomplete. Otherwise returns
--   { opcode, payload, FIN = boolean }, rest
-- where payload has been unmasked if the MASK bit was set, and rest is the
-- data after this frame. RSV bits are not checked.
-- Uses band, bxor, schar and unpack, which this snippet assumes are bound
-- elsewhere (presumably bit.band, bit.bxor, string.char, unpack) -- confirm.
-- NOTE(review): this defines a global; make it `local` if nothing relies
-- on the global name.
function decodeWSF(block)
    if #block < 2 then return end
    local b1, b2 = block:byte(1, 2)
    local skip, len = 2, band(b2, 0x7F)
    if len == 126 then
        -- 16-bit big-endian extended payload length.
        if #block < 4 then return end
        local l1, l2 = block:byte(3, 4)
        len, skip = l1 * 0x100 + l2, 4
    elseif len == 127 then
        -- 64-bit big-endian extended payload length.
        if #block < 10 then return end
        local l1, l2, l3, l4, l5, l6, l7, l8 = block:byte(3, 10)
        len = l1 * 0x100000000000000 + l2 * 0x1000000000000 +
              l3 * 0x10000000000 + l4 * 0x100000000 +
              l5 * 0x1000000 + l6 * 0x10000 + l7 * 0x100 + l8
        skip = 10
    end
    local mask = band(b2, 0x80) == 0x80
    if mask then
        -- The 4-byte masking key follows the length field.
        if #block < skip + 4 then return end
        skip = skip + 4
        mask = { block:byte(skip - 3, skip) }
    end
    if #block < skip + len then return end
    local data = block:sub(skip + 1, skip + len)
    if mask then
        local new = {}
        for i = 1, #data do
            new[i] = bxor(data:byte(i, i), mask[(i - 1) % 4 + 1])
        end
        -- unpack() places every element on the C stack, which fails with
        -- "too many results to unpack" for large payloads (Lua 5.1 allows
        -- about 8000), so convert in bounded slices and join them.
        local chunk, count, parts = 4096, #new, {}
        for first = 1, count, chunk do
            local last = first + chunk - 1
            if last > count then last = count end
            parts[#parts + 1] = schar(unpack(new, first, last))
        end
        data = table.concat(parts)
    end
    local result = { band(b1, 0xF), data }
    result.FIN = band(b1, 0x80) == 0x80
    return result, block:sub(skip + len + 1, -1)
end
```

-- 
You received this message because you are subscribed to the Google Groups 
"luvit" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
For more options, visit https://groups.google.com/d/optout.

Reply via email to