I am learning about channels these days. 

This reader design doesn't care how we write, because blocking writes can 
cause trouble. For example, if we were building an IRC chatroom, we would have 
to start lots of threads just to run send operations concurrently. So writing, 
unlike reading, cannot be limited to a blocking style; conversely, I can't 
think of a situation where non-blocking reading is necessary.

As for performance, the main cost is string concatenation. That's why there is 
a Peek method: it lets the reader hand the pushed chunk straight through, 
avoiding concats. We can use it whenever we already know how many bytes to 
read, for example from an HTTP Content-Length header.
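For instance, with a known Content-Length we can drain the body chunk by chunk 
and concatenate only once at the end, instead of concatenating on every push. 
A minimal self-contained sketch, where `peek` is just a stand-in for the 
Reader's Peek(limit) described below:

```
-- Drain a body of known length, concatenating only once at the end.
-- peek(limit) stands in for reader:Peek(limit): it returns the next
-- chunk, truncated to at most limit bytes.
local chunks, i = { "hel", "lo ", "world" }, 0
local function peek(limit)
    i = i + 1
    local c = chunks[i]
    if c and #c > limit then c = c:sub(1, limit) end
    return c
end

local length, parts, got = 8, {}, 0
while got < length do
    local chunk = peek(length - got)
    if not chunk then break end
    parts[#parts + 1] = chunk
    got = got + #chunk
end
local body = table.concat(parts)
assert(body == "hello wo")
```

With the real Peek, anything beyond the limit stays in the Reader's buffer for 
the next read.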

> On Nov 25, 2014, at 3:48, Tim Caswell <[email protected]> wrote:
> 
> This is a pretty good design.  I especially like the part about the reader 
> simply suspending when it needs more data to keep things simple and out of 
> callback craziness.
> 
> For comparison, have you seen the channel style I've been starting to use in 
> the new luvi-up branch of luvit?
> 
> Basically an app or codec, or whatever is implemented as a coroutine with a 
> blocking read and write function.  Read will return the next chunk from its 
> source and write will send data to its output.  If there is nothing to read, 
> it will suspend.  It will also suspend on write if the thing you're writing 
> to isn't ready for more data.
> 
> 
> function (read, write)
> end
> 
> The thing I like about your decoder being a separate function is it doesn't 
> involve any coroutines or callbacks.  It's clever how it handles too much 
> data and not enough data.  Though I do worry about bad performance when input 
> chunks are huge and contain a large number of output events.  That fear is of 
> course based on gut feeling rather than actual use or benchmarks.  With tail 
> calls in Lua and not hitting coroutine APIs, this could be quite fast in 
> LuaJIT.
> 
> 
>> On Sat, Nov 22, 2014 at 10:38 PM, Zyxwvu S <[email protected]> wrote:
>> I want to share this design with luvit developers because it takes 
>> advantage of Lua's strengths when parsing a stream. The design is very 
>> flexible and has worked very well in one of my projects.
>> 
>> * Stream
>> 
>> A stream is a series of ordered data chunks, like what uv_read_cb provides. 
>> We just push those data chunks into the reader; the reader concatenates them 
>> into its buffer and passes the buffer to the decoder.
>> 
>> * Reader
>> 
>> The Reader is the core of this design. It's a Lua object: you push data 
>> chunks from a stream into it and read Lua values out of it. When there is 
>> not enough data it yields the current coroutine, and once enough data has 
>> been decoded, it resumes that coroutine.
>> 
>> Note: the Reader can only be read from one coroutine at a time. If a queue 
>> were implemented this limit would disappear, but a queue never seems to be 
>> required. Data chunks can be pushed from any coroutine.
>> 
>> * Decoder
>> 
>> A decoder is a Lua function that receives the Reader's buffer as a string 
>> and returns a Lua value, plus the rest of the buffer if any. If the buffer 
>> doesn't hold enough data yet, it returns nil. If it encounters a bad buffer, 
>> it should throw a Lua error.
>> 
>> If we want to optimize the program, we can simply implement the decoder in C.
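>> For example, a minimal line decoder obeying this contract (a hypothetical 
>> illustration, not part of the design itself):
>> 
>> ```
>> -- Decode one "\n"-terminated line: return the line plus the rest of the
>> -- buffer, or nil when the buffer doesn't hold a complete line yet.
>> local function decodeLine(buffer)
>>     local nl = buffer:find("\n", 1, true)
>>     if nl then
>>         return buffer:sub(1, nl - 1), buffer:sub(nl + 1, -1)
>>     end
>> end
>> 
>> assert(decodeLine("partial") == nil)
>> local line, rest = decodeLine("hello\nworld")
>> assert(line == "hello" and rest == "world")
>> ```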
>> 
>> The following code should make this clearer.
>> 
>> ```
>> local MT_Reader = {}
>> MT_Reader.__index = MT_Reader -- so instances created below find these methods
>> 
>> -- Shorthands used below. SafeResume (assumed to be a guarded wrapper
>> -- around coroutine.resume) is defined elsewhere and not shown here.
>> local crunning, cyield = coroutine.running, coroutine.yield
>> 
>> -- Tell the reader that no more data will be pushed for now.
>> -- If the reader wants to continue reading, it will call onrestore first.
>> -- Pause is used to avoid buffering too much data.
>> function MT_Reader:Pause(onrestore)
>>     assert(not self.stopped, "already stopped")
>>     self.paused = onrestore
>>     if self.decoder then
>>         SafeResume(self.readco, nil, "paused")
>>     end
>> end
>> 
>> -- Push a chunk.
>> -- If str is nil, no more data will be pushed to this reader; all further
>> -- read requests will return nil plus "stopped".
>> -- When str is nil and err is also given, err will be returned by the
>> -- pending Read.
>> -- This returns the length of the buffer, which can be used to decide
>> -- whether reading needs to be paused.
>> function MT_Reader:Push(str, err)
>>     if not str then
>>         self.stopped = true
>>         if self.decoder then
>>             SafeResume(self.readco, nil, err or "stopped")
>>         end
>>     elseif self.buffer then
>>         self.buffer = self.buffer .. str
>>         if self.decoder then
>>             local s, result, rest = pcall(self.decoder, self.buffer)
>>             if not s then
>>                 SafeResume(self.readco, nil, result)
>>             elseif result then
>>                 if rest and #rest > 0 then
>>                     self.buffer = rest
>>                 else
>>                     self.buffer = nil
>>                 end
>>                 SafeResume(self.readco, result)
>>             end
>>         end
>>     else
>>         if self.decoder then
>>             local s, result, rest = pcall(self.decoder, str)
>>             if not s then
>>                 self.buffer = str
>>                 SafeResume(self.readco, nil, result)
>>             elseif result then
>>                 if rest and #rest > 0 then
>>                     self.buffer = rest
>>                 end
>>                 SafeResume(self.readco, result)
>>             else
>>                 self.buffer = str
>>             end
>>         else
>>             self.buffer = str
>>         end
>>     end
>>     if self.buffer then return #self.buffer else return 0 end
>> end
>> 
>> -- Read data with the decoder.
>> function MT_Reader:Read(decoder)
>>     assert(not self.decoder, "already reading")
>>     if self.buffer then
>>         local s, result, rest = pcall(decoder, self.buffer)
>>         if not s then
>>             return nil, result
>>         elseif result then
>>             if rest and #rest > 0 then
>>                 self.buffer = rest
>>             else
>>                 self.buffer = nil
>>             end
>>             return result
>>         end
>>     end
>>     if self.stopped then return nil, "stopped" end
>>     if self.paused then self.paused(self) self.paused = false end
>>     self.readco, self.decoder = crunning(), decoder
>>     local result, err = cyield()
>>     self.readco, self.decoder = nil, nil
>>     return result, err
>> end
>> 
>> -- Read exactly len bytes.
>> function MT_Reader:Get(len)
>>     return self:Read(function(buffer)
>>         if #buffer >= len then
>>             return buffer:sub(1, len), buffer:sub(len + 1, -1)
>>         end
>>     end)
>> end
>> 
>> -- This is equivalent to:
>> --    Reader:Read(function(buffer) return buffer end)
>> -- but runs faster when there is already buffered data.
>> -- If len is provided, it limits the length of the data read,
>> -- but that path is not optimized.
>> function MT_Reader:Peek(len)
>>     if len then
>>         return self:Read(function(buffer)
>>             if #buffer <= len then return buffer else
>>                 return buffer:sub(1, len), buffer:sub(len + 1, -1)
>>             end
>>         end)
>>     end
>>     assert(not self.decoder, "already reading")
>>     if self.buffer then
>>         local buffer = self.buffer
>>         self.buffer = nil
>>         return buffer
>>     end
>>     if self.stopped then return nil, "stopped" end
>>     if self.paused then self.paused(self) self.paused = false end
>>     self.readco, self.decoder = crunning(), function(buffer)
>>         return buffer
>>     end
>>     local result, err = cyield()
>>     self.readco, self.decoder = nil, nil
>>     return result, err
>> end
>> 
>> return function() return setmetatable({}, MT_Reader) end
>> ```
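>> To show how the pieces fit together end-to-end, here is a stripped-down, 
>> self-contained sketch of the push/read flow (simplified from the Reader 
>> above: no pause, stop, or error handling):
>> 
>> ```
>> local function newMiniReader()
>>     local self = { buffer = "" }
>>     function self:Read(decoder)
>>         local value, rest = decoder(self.buffer)
>>         if value then self.buffer = rest or ""; return value end
>>         -- Not enough data: remember who is waiting and suspend.
>>         self.readco, self.decoder = coroutine.running(), decoder
>>         return coroutine.yield()
>>     end
>>     function self:Push(str)
>>         self.buffer = self.buffer .. str
>>         if self.decoder then
>>             local value, rest = self.decoder(self.buffer)
>>             if value then
>>                 self.buffer = rest or ""
>>                 local co = self.readco
>>                 self.readco, self.decoder = nil, nil
>>                 coroutine.resume(co, value)
>>             end
>>         end
>>     end
>>     return self
>> end
>> 
>> -- A decoder that waits for a 4-byte frame.
>> local function decode4(buffer)
>>     if #buffer >= 4 then return buffer:sub(1, 4), buffer:sub(5) end
>> end
>> 
>> local r, got = newMiniReader(), nil
>> coroutine.wrap(function()
>>     got = r:Read(decode4)  -- suspends: the buffer is still empty
>> end)()
>> r:Push("ab")               -- still not enough data
>> r:Push("cdef")             -- enough now: the reader resumes with "abcd"
>> assert(got == "abcd" and r.buffer == "ef")
>> ```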
>> 
>> An HTTP header decoder that reads an HTTP request header into a table:
>> ```
>> local function decodeHead(buffer)
>>     local l, r = buffer:find("\r?\n\r?\n")
>>     if l and r then
>>         local head = buffer:sub(1, l - 1)
>>         local result, firstLine = { headers = {} }, true
>>         for l in head:gmatch("([^\r\n]+)") do
>>             if firstLine then
>>                 local verb, resource =
>>                     l:match("^([A-Z]+) ([^%s]+) HTTP/1%.[01]$")
>>                 assert(verb and resource, "bad request")
>>                 result.method, result.resource_orig = verb, resource
>>                 firstLine = false
>>             else
>>                 local k, v = l:match("^([A-Za-z0-9%-]+):%s?(.+)$")
>>                 assert(k and v, "bad request")
>>                 result.headers[k:lower()] = v
>>             end
>>         end
>>         return result, buffer:sub(r + 1, -1)
>>     end
>> end
>> ```
>> 
>> A FastCGI decoder which reads one FastCGI record:
>> ```
>> local function decodeFCGI(buffer)
>>     if #buffer < 8 then return nil end
>>     local dl, pl = buffer:byte(5) * 0x100 + buffer:byte(6), buffer:byte(7)
>>     if #buffer >= dl + pl + 8 then
>>         local result = { buffer:byte(2), buffer:sub(9,  8 + dl) }
>>         return result, buffer:sub(9 + dl + pl, -1)
>>     end
>> end
>> ```
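>> As a sanity check of the record layout the decoder assumes (version, type, 
>> request id, content length, padding length, reserved, then content and 
>> padding), it can be exercised standalone; the function is repeated here so 
>> the example is self-contained:
>> 
>> ```
>> local function decodeFCGI(buffer)
>>     if #buffer < 8 then return nil end
>>     local dl, pl = buffer:byte(5) * 0x100 + buffer:byte(6), buffer:byte(7)
>>     if #buffer >= dl + pl + 8 then
>>         local result = { buffer:byte(2), buffer:sub(9,  8 + dl) }
>>         return result, buffer:sub(9 + dl + pl, -1)
>>     end
>> end
>> 
>> -- An FCGI_STDOUT (type 6) record carrying "hi" with one byte of padding:
>> local record = string.char(1, 6, 0, 1, 0, 2, 1, 0) .. "hi" .. "\0"
>> local result, rest = decodeFCGI(record .. "next")
>> assert(result[1] == 6 and result[2] == "hi" and rest == "next")
>> ```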
>> 
>> A WebSocket frame decoder:
>> ```
>> -- band/bxor come from LuaJIT's bit library (available in luvit);
>> -- schar/unpack are the usual string.char and unpack.
>> local bit = require("bit")
>> local band, bxor = bit.band, bit.bxor
>> local schar = string.char
>> local function decodeWSF(block)
>>     if #block < 2 then return end
>>     local b1, b2 = block:byte(1, 2)
>>     local skip, len = 2, band(b2, 0x7F)
>>     if len == 126 then
>>         if #block < 4 then return end
>>         local l1, l2 = block:byte(3, 4)
>>         len, skip = l1 * 0x100 + l2, 4
>>     elseif len == 127 then
>>         if #block < 10 then return end
>>         local l1, l2, l3, l4, l5, l6, l7, l8 = block:byte(3, 10)
>>         len = l1 * 0x100000000000000 + l2 * 0x1000000000000 +
>>               l3 * 0x10000000000 + l4 * 0x100000000 +
>>               l5 * 0x1000000 + l6 * 0x10000 + l7 * 0x100 + l8
>>         skip = 10
>>     end
>>     local mask = band(b2, 0x80) == 0x80
>>     if mask then
>>         if #block < skip + 4 then return end
>>         skip = skip + 4
>>         mask = { block:byte(skip - 3, skip) }
>>     end
>>     if #block < skip + len then return end
>>     local data = block:sub(skip + 1, skip + len)
>>     if mask then
>>         local new = {}
>>         for i = 1, #data do
>>             new[i] = bxor(data:byte(i, i), mask[(i - 1) % 4 + 1])
>>         end
>>         data = schar(unpack(new))
>>     end
>>     local result = { band(b1, 0xF), data }
>>     result.FIN = band(b1, 0x80) == 0x80
>>     return result, block:sub(skip + len + 1, -1)
>> end
>> ```
>> 
>> -- 
>> You received this message because you are subscribed to the Google Groups 
>> "luvit" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to [email protected].
>> For more options, visit https://groups.google.com/d/optout.
> 
