I wrote my latest parser (for a new binary syncing protocol I'm designing)
in your decoder style.  I really enjoyed it.  I did have two questions,
though:

 - How do you signal errors in the decoder?  What if the input data is
malformed?
 - Also, is there a way to signal end of stream?  I normally emit nil for
EOS, but here nil means more input is wanted.

https://github.com/luvit/lit/blob/d8034de01b90cf6805e85324955045c9c1f4590a/server.lua#L19-L86
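
To make the second question concrete, here is the kind of convention I was
imagining (a hypothetical sketch, not code from lit): keep nil for "need
more input", throw for malformed data, and use a unique sentinel for end of
stream so that nil stays unambiguous.

```lua
-- Hypothetical sketch: EOS and decodeLine are my names, not from lit.
local EOS = {}  -- unique table; can never collide with real buffer data

local function decodeLine(buffer)
    if buffer == EOS then return EOS end      -- propagate end of stream
    local line, rest = buffer:match("^([^\n]*)\n(.*)$")
    if not line then return nil end           -- incomplete: want more input
    if line:find("%z") then error("malformed input: embedded NUL") end
    return line, rest
end
```

Malformed data raises, which the caller can trap with pcall, while EOS
flows through the pipeline as an ordinary value.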

On Mon, Nov 24, 2014 at 8:12 PM, <[email protected]> wrote:

> I am learning about channels these days.
>
> This reader design doesn't care how we write, because blocking writes can
> cause trouble. For example, if we were building an IRC chatroom, we would
> have to start lots of threads just to run send operations at the same
> time. So writing cannot be restricted to a blocking interface the way
> reading can. I can't think of a situation where non-blocking reading is
> necessary.
>
> As for performance, the main cost is string concatenation. That's why
> there is a Peek method that lets the reader hand the pushed chunk straight
> through, avoiding concats. We can use it when we already know how many
> bytes to read, for example from an HTTP Content-Length header.
>
> On Nov 25, 2014, at 3:48, Tim Caswell <[email protected]> wrote:
>
> This is a pretty good design.  I especially like the part about the reader
> simply suspending when it needs more data; it keeps things simple and out
> of callback craziness.
>
> For comparison, have you seen the channel style I've been starting to use
> in the new luvi-up branch of luvit?
>
> Basically an app, codec, or whatever is implemented as a coroutine with
> blocking read and write functions.  Read will return the next chunk from
> its source and write will send data to its output.  If there is nothing
> to read, it will suspend.  It will also suspend on write if the thing
> you're writing to isn't ready for more data.
>
>
> function (read, write)
> end
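>
> For example, a codec in this style might look like this (a sketch of the
> idea, not actual code from the luvi-up branch):
>
> ```
> -- Sketch: a line-splitting codec written against blocking read/write.
> local function lineSplitter(read, write)
>     local buffer = ""
>     for chunk in read do          -- read() suspends until data arrives
>         buffer = buffer .. chunk
>         while true do
>             local line, rest = buffer:match("^([^\n]*)\n(.*)$")
>             if not line then break end
>             buffer = rest
>             write(line)           -- write() suspends if downstream is busy
>         end
>     end
> end
> ```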
>
> The thing I like about your decoder being a separate function is that it
> doesn't involve any coroutines or callbacks.  It's clever how it handles
> too much data and not enough data.  Though I do worry about bad performance
> when input chunks are huge and contain a large number of output events.
> That fear is of course based on gut feeling, not actual use or benchmarks.
> With tail calls in Lua and no trips through the coroutine API, this could
> be quite fast under LuaJIT.
>
>
> On Sat, Nov 22, 2014 at 10:38 PM, Zyxwvu S <[email protected]> wrote:
>
>> I want to share this design with luvit developers because it takes
>> advantage of Lua's strengths when parsing a stream. The design is very
>> flexible and has worked very well in one of my projects.
>>
>> *Stream*
>>
>> A stream is a series of ordered data chunks, like what uv_read_cb
>> provides. We just push those chunks into the reader; the reader
>> concatenates them into its buffer and passes that buffer to the decoder.
>>
>> *Reader*
>>
>> The Reader is the core of this design. It is a Lua object: you push data
>> chunks from a stream into it and read Lua values out of it. When there is
>> not enough data it yields the current coroutine, and once enough data has
>> been decoded it resumes that coroutine.
>>
>> Note: the Reader can only be read from one coroutine at a time. If a
>> queue were implemented this limit would disappear, but the queue never
>> seems to be required. Data chunks can be pushed from any thread.
>>
>> *Decoder*
>>
>> A decoder is a Lua function that receives the Reader's buffer as a
>> string and returns a Lua value, plus the remaining buffer if any. If the
>> buffer doesn't hold enough data yet, it returns nil.  If it encounters a
>> malformed buffer, it should throw a Lua error.
>>
>> *If we want to optimize the program, we can simply implement the decoder
>> in C.*
>>
>> The code below should make this clearer.
>>
>> ```
>> -- Shorthands for the coroutine API used below.
>> local crunning, cyield = coroutine.running, coroutine.yield
>> -- SafeResume (not shown here) is assumed to resume a coroutine and
>> -- surface any error it raises.
>>
>> local MT_Reader = {}
>>
>> -- Tell the reader that no more data will be pushed for now.
>> -- If the reader wants to continue reading, it will call onrestore first.
>> -- Pause is used to avoid buffering too much data.
>> function MT_Reader:Pause(onrestore)
>>     assert(not self.stopped, "already stopped")
>>     self.paused = onrestore
>>     if self.decoder then
>>         SafeResume(self.readco, nil, "paused")
>>     end
>> end
>>
>> -- Push a chunk.
>> -- If str is nil, no more data will be pushed to this reader; all read
>> -- requests will return nil plus "stopped".
>> -- When str is nil and err is given, err will be returned by the
>> -- pending Read instead.
>> -- Returns the length of the buffer; callers can use it to decide
>> -- whether to pause reading.
>> function MT_Reader:Push(str, err)
>>     if not str then
>>         self.stopped = true
>>         if self.decoder then
>>             SafeResume(self.readco, nil, err or "stopped")
>>         end
>>     elseif self.buffer then
>>         self.buffer = self.buffer .. str
>>         if self.decoder then
>>             local s, result, rest = pcall(self.decoder, self.buffer)
>>             if not s then
>>                 SafeResume(self.readco, nil, result)
>>             elseif result then
>>                 if rest and #rest > 0 then
>>                     self.buffer = rest
>>                 else
>>                     self.buffer = nil
>>                 end
>>                 SafeResume(self.readco, result)
>>             end
>>         end
>>     else
>>         if self.decoder then
>>             local s, result, rest = pcall(self.decoder, str)
>>             if not s then
>>                 self.buffer = str
>>                 SafeResume(self.readco, nil, result)
>>             elseif result then
>>                 if rest and #rest > 0 then
>>                     self.buffer = rest
>>                 end
>>                 SafeResume(self.readco, result)
>>             else
>>                 self.buffer = str
>>             end
>>         else
>>             self.buffer = str
>>         end
>>     end
>>     if self.buffer then return #self.buffer else return 0 end
>> end
>>
>> -- Read data with the decoder.
>> function MT_Reader:Read(decoder)
>>     assert(not self.decoder, "already reading")
>>     if self.buffer then
>>         local s, result, rest = pcall(decoder, self.buffer)
>>         if not s then
>>             return nil, result
>>         elseif result then
>>             if rest and #rest > 0 then
>>                 self.buffer = rest
>>             else
>>                 self.buffer = nil
>>             end
>>             return result
>>         end
>>     end
>>     if self.stopped then return nil, "stopped" end
>>     if self.paused then self.paused(self) self.paused = false end
>>     self.readco, self.decoder = crunning(), decoder
>>     local result, err = cyield()
>>     self.readco, self.decoder = nil, nil
>>     return result, err
>> end
>>
>> -- Read data of the length.
>> function MT_Reader:Get(len)
>>     return self:Read(function(buffer)
>>         if #buffer == len then return buffer elseif #buffer > len then
>>             return buffer:sub(1, len), buffer:sub(len + 1, -1)
>>         end
>>     end)
>> end
>>
>> -- This is equivalent to:
>> --    Reader:Read(function(buffer) return buffer end)
>> -- but runs faster if data is already buffered.
>> -- If len is provided, it limits the length of the data read,
>> -- but that path is not optimized.
>> function MT_Reader:Peek(len)
>>     if len then
>>         return self:Read(function(buffer)
>>             if #buffer <= len then return buffer elseif #buffer > len then
>>                 return buffer:sub(1, len), buffer:sub(len + 1, -1)
>>             end
>>         end)
>>     end
>>     assert(not self.decoder, "already reading")
>>     if self.buffer then
>>         local buffer = self.buffer
>>         self.buffer = nil
>>         return buffer
>>     end
>>     if self.stopped then return nil, "stopped" end
>>     if self.paused then self.paused(self) self.paused = false end
>>     self.readco, self.decoder = crunning(), function(buffer)
>>         return buffer
>>     end
>>     local result, err = cyield()
>>     self.readco, self.decoder = nil, nil
>>     return result, err
>> end
>>
>> MT_Reader.__index = MT_Reader
>>
>> return function() return setmetatable({}, MT_Reader) end
>> ```
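>>
>> To show the core mechanism in isolation, here is a stripped-down,
>> self-contained version of the suspend/resume dance (heavily simplified;
>> one pending reader, no error paths):
>>
>> ```
>> -- Simplified model of the Reader: a producer push() and a consumer
>> -- read() that suspends the calling coroutine until data is available.
>> local waiting, buffer = nil, nil
>>
>> local function push(chunk)
>>     buffer = (buffer or "") .. chunk
>>     if waiting then
>>         local co, b = waiting, buffer
>>         waiting, buffer = nil, nil
>>         assert(coroutine.resume(co, b))   -- hand the data to the reader
>>     end
>> end
>>
>> local function read()
>>     if buffer then                        -- data already here: no suspend
>>         local b = buffer
>>         buffer = nil
>>         return b
>>     end
>>     waiting = coroutine.running()
>>     return coroutine.yield()              -- suspend until push() resumes us
>> end
>> ```
>>
>> A consumer just calls read() from inside a coroutine; the Reader above
>> adds the decoder loop, pause/stop handling, and error propagation on top
>> of this.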
>>
>> An HTTP header decoder which reads an HTTP header as a table:
>> ```
>> local function decodeHead(buffer)
>>     local l, r = buffer:find("\r?\n\r?\n")
>>     if l and r then
>>         local head = buffer:sub(1, l - 1)
>>         local result, firstLine = { headers = {} }, true
>>         for l in head:gmatch("([^\r\n]+)") do
>>             if firstLine then
>>                 local verb, resource =
>>                     l:match("^([A-Z]+) ([^%s]+) HTTP/1%.[01]$")
>>                 assert(verb and resource, "bad request")
>>                 result.method, result.resource_orig = verb, resource
>>                 firstLine = false
>>             else
>>                 local k, v = l:match("^([A-Za-z0-9%-]+):%s?(.+)$")
>>                 assert(k and v, "bad request")
>>                 result.headers[k:lower()] = v
>>             end
>>         end
>>         return result, buffer:sub(r + 1, -1)
>>     end
>> end
>> ```
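>>
>> For example, given a complete request line (intended behavior; the
>> request bytes are made up):
>>
>> ```
>> -- A complete header-only request, with leftover bytes after it.
>> local t, rest = decodeHead("GET /index.html HTTP/1.0\r\n\r\nNEXT")
>> -- t.method == "GET", t.resource_orig == "/index.html", rest == "NEXT".
>> -- An incomplete buffer simply yields nil (more input wanted):
>> assert(decodeHead("GET / HTT") == nil)
>> ```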
>>
>> A FastCGI decoder which reads a FastCGI record:
>> ```
>> local function decodeFCGI(buffer)
>>     if #buffer < 8 then return nil end
>>     local dl, pl = buffer:byte(5) * 0x100 + buffer:byte(6), buffer:byte(7)
>>     if #buffer >= dl + pl + 8 then
>>         local result = { buffer:byte(2), buffer:sub(9,  8 + dl) }
>>         return result, buffer:sub(9 + dl + pl, -1)
>>     end
>> end
>> ```
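>>
>> For example, a hand-crafted FCGI_STDOUT record (type 6, request id 1,
>> 5 bytes of content, no padding):
>>
>> ```
>> local record = string.char(1, 6, 0, 1, 0, 5, 0, 0) .. "hello"
>> local result, rest = decodeFCGI(record)
>> -- result[1] == 6 (the record type), result[2] == "hello", rest == ""
>> ```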
>>
>> A WebSocket frame decoder:
>> ```
>> -- Assumed locals (from LuaJIT's bit library and the string/table APIs):
>> local band, bxor = bit.band, bit.bxor
>> local schar, unpack = string.char, table.unpack or unpack
>>
>> function decodeWSF(block)
>>     if #block < 2 then return end
>>     local b1, b2 = block:byte(1, 2)
>>     local skip, len = 2, band(b2, 0x7F)
>>     if len == 126 then
>>         if #block < 4 then return end
>>         local l1, l2 = block:byte(3, 4)
>>         len, skip = l1 * 0x100 + l2, 4
>>     elseif len == 127 then
>>         if #block < 10 then return end
>>         local l1, l2, l3, l4, l5, l6, l7, l8 = block:byte(3, 10)
>>         len = l1 * 0x100000000000000 + l2 * 0x1000000000000 +
>>               l3 * 0x10000000000 + l4 * 0x100000000 +
>>               l5 * 0x1000000 + l6 * 0x10000 + l7 * 0x100 + l8
>>         skip = 10
>>     end
>>     local mask = band(b2, 0x80) == 0x80
>>     if mask then
>>         if #block < skip + 4 then return end
>>         skip = skip + 4
>>         mask = { block:byte(skip - 3, skip) }
>>     end
>>     if #block < skip + len then return end
>>     local data = block:sub(skip + 1, skip + len)
>>     if mask then
>>         local new = {}
>>         for i = 1, #data do
>>             new[i] = bxor(data:byte(i, i), mask[(i - 1) % 4 + 1])
>>         end
>>         data = schar(unpack(new))
>>     end
>>     local result = { band(b1, 0xF), data }
>>     result.FIN = band(b1, 0x80) == 0x80
>>     return result, block:sub(skip + len + 1, -1)
>> end
>> ```
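>>
>> For example, a minimal unmasked text frame (FIN set, opcode 1, 5-byte
>> payload):
>>
>> ```
>> local frame = string.char(0x81, 0x05) .. "Hello"
>> local f, rest = decodeWSF(frame)
>> -- f[1] == 1 (text opcode), f[2] == "Hello", f.FIN == true, rest == ""
>> ```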
>>
>>  --
>> You received this message because you are subscribed to the Google Groups
>> "luvit" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to [email protected].
>> For more options, visit https://groups.google.com/d/optout.
>>
>
