I wrote my latest parser (for a new binary syncing protocol I'm designing) in your decoder style, and I really enjoyed it. I did have two questions though:
- How do you signify errors in the decoder? What if the input data is malformed?
- Also, is there a way to signify end of stream? I normally emit nil for EOS, but here nil means more input is wanted.

https://github.com/luvit/lit/blob/d8034de01b90cf6805e85324955045c9c1f4590a/server.lua#L19-L86

On Mon, Nov 24, 2014 at 8:12 PM, <[email protected]> wrote:

> I am learning about channels these days.
>
> This reader design doesn't care how we write, because blocking writes
> sometimes cause trouble. For example, in an IRC chatroom we would have to
> start lots of threads to run the send operations at the same time. So,
> unlike reading, writing cannot be restricted to blocking calls. I can't
> think of a situation where non-blocking reading is necessary.
>
> As for performance, the main cost is string concatenation. That is why
> there is a Peek method, which lets the reader pass the pushed chunk straight
> through and avoid the concatenation. We can use it when we already know how
> many bytes to read, for example from the HTTP Content-Length header.
>
> On Nov 25, 2014, at 3:48, Tim Caswell <[email protected]> wrote:
>
> This is a pretty good design. I especially like the part about the reader
> simply suspending when it needs more data to keep things simple and out of
> callback craziness.
>
> For comparison, have you seen the channel style I've been starting to use
> in the new luvi-up branch of luvit?
>
> Basically an app or codec, or whatever, is implemented as a coroutine with
> a blocking read and write function. Read will return the next chunk from
> its source and write will send data to its output. If there is nothing
> to read, it will suspend. It will also suspend on write if the thing
> you're writing to isn't ready for more data.
>
> function (read, write)
> end
>
> The thing I like about your decoder being a separate function is that it
> doesn't involve any coroutines or callbacks. It's clever how it handles
> too much data and not enough data.
> Though I do worry about bad performance when input chunks are huge and
> contain a large number of output events. That fear is of course based on
> gut feeling and not on actual benchmarks. With tail calls in Lua and not
> hitting coroutine APIs, this could be quite fast in LuaJIT.
>
> On Sat, Nov 22, 2014 at 10:38 PM, Zyxwvu S <[email protected]> wrote:
>
>> I want to share this design with luvit developers because it takes
>> advantage of Lua when parsing a stream. This design is very flexible and
>> has worked very well in one of my projects.
>>
>> *Stream*
>>
>> A Stream is a series of ordered data chunks, like what uv_read_cb provides.
>> We just push those data chunks to the reader; the reader concatenates them
>> into its buffer and passes the buffer to the decoder.
>>
>> *Reader*
>>
>> The Reader is the core of this design. It is a Lua object: you push data
>> chunks from a stream into it and read Lua values out of it. When there is
>> not enough data, it yields the reading coroutine; once enough data has
>> arrived to decode a value, it resumes the coroutine.
>>
>> Note: the Reader can be read from only one coroutine at a time. Implementing
>> a queue would remove this limit, but a queue never seems to be required.
>> Data chunks can be pushed from any thread.
>>
>> *Decoder*
>>
>> A Decoder is a Lua function that receives the Reader's buffer as a string
>> and returns a Lua value, plus the rest of the buffer if any. If the buffer
>> doesn't hold enough data, it returns nil. If it encounters a bad buffer, it
>> should throw a Lua error.
>>
>> *If we want to optimize the program, we can simply implement the decoder
>> in C.*
>>
>> The code expresses this more clearly.
>>
>> ```
>> local MT_Reader = {}
>> -- Methods are looked up through the metatable; without this line the
>> -- objects created by the constructor would have no methods.
>> MT_Reader.__index = MT_Reader
>>
>> -- Not shown in the original post; assumed to be aliases for the coroutine API.
>> -- SafeResume is also not shown; presumably a wrapper around coroutine.resume.
>> local crunning, cyield = coroutine.running, coroutine.yield
>>
>> -- Tell the reader no more data will be pushed for now.
>> -- If the reader wants to continue reading, it will call onrestore first.
>> -- Pause is used to avoid buffering too much data.
>> function MT_Reader:Pause(onrestore)
>>   assert(not self.stopped, "already paused")
>>   self.paused = onrestore
>>   if self.decoder then
>>     SafeResume(self.readco, nil, "paused")
>>   end
>> end
>>
>> -- Push a chunk.
>> -- If str is nil, no more data will be pushed to this reader. All read
>> -- requests will return nil plus "stopped".
>> -- When str is nil and an err is also given, the err is returned by
>> -- the pending Read.
>> -- This returns the length of the buffer. We can use it to check whether we
>> -- need to pause reading.
>> function MT_Reader:Push(str, err)
>>   if not str then
>>     self.stopped = true
>>     if self.decoder then
>>       SafeResume(self.readco, nil, err or "stopped")
>>     end
>>   elseif self.buffer then
>>     self.buffer = self.buffer .. str
>>     if self.decoder then
>>       local s, result, rest = pcall(self.decoder, self.buffer)
>>       if not s then
>>         SafeResume(self.readco, nil, result)
>>       elseif result then
>>         if rest and #rest > 0 then
>>           self.buffer = rest
>>         else
>>           self.buffer = nil
>>         end
>>         SafeResume(self.readco, result)
>>       end
>>     end
>>   else
>>     if self.decoder then
>>       local s, result, rest = pcall(self.decoder, str)
>>       if not s then
>>         self.buffer = str
>>         SafeResume(self.readco, nil, result)
>>       elseif result then
>>         if rest and #rest > 0 then
>>           self.buffer = rest
>>         end
>>         SafeResume(self.readco, result)
>>       else
>>         self.buffer = str
>>       end
>>     else
>>       self.buffer = str
>>     end
>>   end
>>   if self.buffer then return #self.buffer else return 0 end
>> end
>>
>> -- Read data with the decoder.
>> function MT_Reader:Read(decoder)
>>   assert(not self.decoder, "already reading")
>>   if self.buffer then
>>     local s, result, rest = pcall(decoder, self.buffer)
>>     if not s then
>>       return nil, result
>>     elseif result then
>>       if rest and #rest > 0 then
>>         self.buffer = rest
>>       else
>>         self.buffer = nil
>>       end
>>       return result
>>     end
>>   end
>>   if self.stopped then return nil, "stopped" end
>>   if self.paused then self.paused(self) self.paused = false end
>>   self.readco, self.decoder = crunning(), decoder
>>   local result, err = cyield()
>>   self.readco, self.decoder = nil, nil
>>   return result, err
>> end
>>
>> -- Read exactly len bytes.
>> function MT_Reader:Get(len)
>>   return self:Read(function(buffer)
>>     if #buffer == len then
>>       return buffer
>>     elseif #buffer > len then
>>       return buffer:sub(1, len), buffer:sub(len + 1, -1)
>>     end
>>   end)
>> end
>>
>> -- This is equivalent to:
>> --   Reader:Read(function(buffer) return buffer end)
>> -- but is faster when data is already buffered.
>> -- If len is provided, it limits the length of the data read,
>> -- but that path is not optimized.
>> function MT_Reader:Peek(len)
>>   if len then
>>     return self:Read(function(buffer)
>>       if #buffer <= len then
>>         return buffer
>>       elseif #buffer > len then
>>         return buffer:sub(1, len), buffer:sub(len + 1, -1)
>>       end
>>     end)
>>   end
>>   assert(not self.decoder, "already reading")
>>   if self.buffer then
>>     local buffer = self.buffer
>>     self.buffer = nil
>>     -- Return the saved local; self.buffer is already nil at this point.
>>     return buffer
>>   end
>>   if self.stopped then return nil, "stopped" end
>>   if self.paused then self.paused(self) self.paused = false end
>>   self.readco, self.decoder = crunning(), function(buffer)
>>     return buffer
>>   end
>>   local result, err = cyield()
>>   self.readco, self.decoder = nil, nil
>>   return result, err
>> end
>>
>> return function() return setmetatable({}, MT_Reader) end
>> ```
>>
>> An HTTP header decoder, which reads an HTTP header into a table:
>> ```
>> local function decodeHead(buffer)
>>   local l, r = buffer:find("\r?\n\r?\n")
>>   if l and r then
>>     local head = buffer:sub(1, l - 1)
>>     -- headers must exist before the loop assigns into it
>>     local result, firstLine = { headers = {} }, true
>>     for l in head:gmatch("([^\r\n]+)") do
>>       if firstLine then
>>         local verb, resource = l:match("^([A-Z]+) ([^%s]+) HTTP/1%.[01]$")
>>         assert(verb and resource, "bad request")
>>         result.method, result.resource_orig = verb, resource
>>         firstLine = false
>>       else
>>         local k, v = l:match("^([A-Za-z0-9%-]+):%s?(.+)$")
>>         assert(k and v, "bad request")
>>         result.headers[k:lower()] = v
>>       end
>>     end
>>     return result, buffer:sub(r + 1, -1)
>>   end
>> end
>> ```
>>
>> A FastCGI decoder, which reads a FastCGI payload:
>> ```
>> local function decodeFCGI(buffer)
>>   if #buffer < 8 then return nil end
>>   -- dl: content length (bytes 5-6), pl: padding length (byte 7)
>>   local dl, pl = buffer:byte(5) * 0x100 + buffer:byte(6), buffer:byte(7)
>>   if #buffer >= dl + pl + 8 then
>>     local result = { buffer:byte(2), buffer:sub(9, 8 + dl) }
>>     return result, buffer:sub(9 + dl + pl, -1)
>>   end
>> end
>> ```
>>
>> A WebSocket frame decoder:
>> ```
>> -- Not shown in the original post; assumed LuaJIT aliases.
>> local bit = require("bit")
>> local band, bxor, schar = bit.band, bit.bxor, string.char
>>
>> function decodeWSF(block)
>>   if #block < 2 then return end
>>   local b1, b2 = block:byte(1, 2)
>>   local skip, len = 2, band(b2, 0x7F)
>>   if len == 126 then
>>     if #block < 4 then return end
>>     local l1, l2 = block:byte(3, 4)
>>     len, skip = l1 * 0x100 + l2, 4
>>   elseif len == 127 then
>>     if #block < 10 then return end
>>     local l1, l2, l3, l4, l5, l6, l7, l8 = block:byte(3, 10)
>>     len = l1 * 0x100000000000000 + l2 * 0x1000000000000 +
>>           l3 * 0x10000000000 + l4 * 0x100000000 +
>>           l5 * 0x1000000 + l6 * 0x10000 + l7 * 0x100 + l8
>>     skip = 10
>>   end
>>   local mask = band(b2, 0x80) == 0x80
>>   if mask then
>>     if #block < skip + 4 then return end
>>     skip = skip + 4
>>     mask = { block:byte(skip - 3, skip) }
>>   end
>>   if #block < skip + len then return end
>>   local data = block:sub(skip + 1, skip + len)
>>   if mask then
>>     local new = {}
>>     for i = 1, #data do
>>       new[i] = bxor(data:byte(i, i), mask[(i - 1) % 4 + 1])
>>     end
>>     data = schar(unpack(new))
>>   end
>>   local result = { band(b1, 0xF), data }
>>   result.FIN = band(b1, 0x80) == 0x80
>>   return result, block:sub(skip + len + 1, -1)
>> end
>> ```
>>
>> --
>> You received this message because you are subscribed to the Google Groups
>> "luvit" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to [email protected].
>> For more options, visit https://groups.google.com/d/optout.
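
P.S. To make the two questions concrete, here is a minimal sketch of how I read the decoder contract in the quoted post: a decoder returns nil when it wants more input, raises a Lua error when the input is malformed, and end of stream is signalled outside the decoder (the quoted Reader does it with Push(nil), after which Read returns nil plus "stopped"). The names decodeFrame and classify, and the one-byte length-prefixed frame format, are hypothetical and only for illustration; they are not part of the quoted code.

```lua
-- Hypothetical frame format: one length byte followed by that many payload bytes.
-- Decoder contract as described in the quoted post:
--   returns value, rest   when a full frame is available
--   returns nil           when more input is needed
--   raises an error       when the input is malformed
local function decodeFrame(buffer)
  if #buffer < 1 then return nil end
  local len = buffer:byte(1)
  if len == 0 then error("bad frame: zero length") end
  if #buffer < 1 + len then return nil end
  return buffer:sub(2, 1 + len), buffer:sub(2 + len)
end

-- Hypothetical helper: run a decoder once and name the outcome.
-- eof is true when the source has ended, which is tracked outside the decoder.
local function classify(decoder, buffer, eof)
  local ok, value, rest = pcall(decoder, buffer)
  if not ok then return "error", value end
  if value ~= nil then return "value", value, rest end
  if eof then return "stopped" end
  return "more"
end

print(classify(decodeFrame, "\3ab", false))    -- incomplete frame
print(classify(decodeFrame, "\3abcXY", false)) -- full frame plus leftover bytes
print(classify(decodeFrame, "\0", false))      -- malformed frame
print(classify(decodeFrame, "", true))         -- nothing buffered, source ended
```

With this reading, nil from the decoder never has to double as EOS: the decoder only ever says "value", "more", or throws, and the reader layer decides when the stream has stopped.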
