Thank you Michael, Yes return is missing before setTimeout. I have updated gist. But that does not change result. https://gist.github.com/xdenser/8944752 I think this is small enough. What I am trying to acheeve is to create "CatchStream" - a stream reading from growing file, where growing file is downloaded by http request. I also need CatchStream to slow down when needed by stream reading from it. So I try to create implementation of nodejs Readable stream.
My gen() function emulates that growing file and is working Ok - it generates file with chunks of growing bytes. The problem is with CatchStream it sometimes reads chunks with wrong order it is checked at line #85 of gist. I also have noted that swapped chunks apear after 'readable' event - i.e after 'readable' I call fs.ReadStream.read and get chunk N+1 then N then N+2 then N+3 then fs.ReadStream.read returns empty buffer and I wait for next 'readable'. The problem is not with fs.ReadStream. I have added read function that does not have the problem. But I need Readable stream implementation. 2014-02-12 5:26 GMT+02:00 Michael Hart <[email protected]>: > Your `writeChunk` call seems a bit fragile with the `setTimeout` there - I > commented on the gist. > > I think a smaller test case might be easier to read/understand what you're > trying to achieve. > > > On Wednesday, 12 February 2014 08:48:48 UTC+11, Denys Khanzhiyev wrote: > >> Ok I have created simple gist to test this. >> >> https://gist.github.com/xdenser/8944752 >> >> Just run file and you should see >> >> WTF! 49152 >> WTF! 114688 >> WTF! 180224 >> WTF! 245760 >> ... >> WTF! 4112384 >> writeStream finish >> WTF! 4177920 >> read Stream Finished >> >> if you see just >> >> writeStream finish >> read Stream Finished >> >> it works as expected. >> >> what I made is two "simultanous" streams >> >> 1. writes growing numbers into file and slows down from time to time >> 2. my custom read stream implementation to catch on that written file >> >> WTF! mean that it read a number which is less than previous. And that is >> not possible as numbers are growing. >> >> Somebody can explain,please, what is wrong with my CatchStream? >> >> tested on node v0.10.24 and v0.10.16 under W7 >> >> >> >> 2014-02-11 13:37 GMT+02:00 greelgorke <[email protected]>: >> >>> In Addition to that, whenthe counter can emit something >>> >>> >>> counter._transform = function(chunk, encoding, callback){ >>> count += (chunk && chunk.length) || 0 >>> this.push(chunk) >>> if(count >= threshhold) this.emit('threshhold_reached') >>> callback() >>> } >>> >>> >>> Am Dienstag, 11. Februar 2014 12:36:20 UTC+1 schrieb greelgorke: >>> >>>> you can count bytes with by putting a passthrough stream in between >>>> >>>> var counter = new stream.Transform >>>> var count = 0 >>>> counter._transform = function(chunk, encoding, callback){ >>>> count += (chunk && chunk.length) || 0 >>>> this.push(chunk) >>>> callback() >>>> } >>>> >>>> >>>> slowSourceStream.pipe(counter).pipe(destinationStream) >>>> >>>> >>>> >>>> Am Samstag, 8. Februar 2014 01:12:37 UTC+1 schrieb Denys Khanzhiyev: >>>>> >>>>> Hello, >>>>> >>>>> I have a task where one slow stream is piped to fs.writeStream and >>>>> after some event I need to read from that writen file, i.e. read from >>>>> growing file from some position. A have seen `node-growing-file`, and >>>>> `tailing-stream` nothing seems to solve my problem. >>>>> >>>>> It looks like I do not understand how streams work >>>>> Here is my helper object (though it is called PxyStream it is not >>>>> stream in fact), >>>>> >>>>> var >>>>> fs = require('fs'); >>>>> >>>>> function PxyStream(path,readStream,writeStream,start,end){ >>>>> this.path = path; >>>>> this.readStream = readStream; >>>>> this.writeStream = writeStream; >>>>> this._offset = start; >>>>> this.endPos = end; >>>>> this.writeStream.on('finish',function(){ >>>>> this._writeStreamFinished = true; >>>>> this.nextStream(); >>>>> }.bind(this)) >>>>> } >>>>> >>>>> >>>>> PxyStream.prototype.pipe = function(destination){ >>>>> this.destination = destination; >>>>> this.nextStream(); >>>>> } >>>>> >>>>> PxyStream.prototype.nextStream = function(){ >>>>> if(!this._stream){ >>>>> var options = { >>>>> start: this._offset >>>>> }; >>>>> var last = this._writeStreamFinished; >>>>> console.log('new read stream',this._offset, last); >>>>> this._stream = fs.createReadStream(this.path,options); >>>>> this._stream.pipe(this.destination,{end: false}); >>>>> this._stream.on('data',function(data){ >>>>> this._offset += data.length; >>>>> }.bind(this)); >>>>> >>>>> this._stream.on('end',function(){ >>>>> console.log('read stream end',this._offset, last); >>>>> this._stream.unpipe(); >>>>> this._stream = null; >>>>> if(last) { >>>>> this.destination.emit('end'); >>>>> } >>>>> this._watch(); >>>>> }.bind(this)); >>>>> } >>>>> } >>>>> >>>>> PxyStream.prototype._watch = function(){ >>>>> this.readStream.once('data',function(){ >>>>> this.nextStream(); >>>>> }.bind(this)) >>>>> } >>>>> >>>>> exports.PxyStream = PxyStream; >>>>> >>>>> >>>>> I am using it as >>>>> >>>>> pxyStream = new PxyStream(filePath,<slowReadStream>, >>>>> <fsWriteStream>,start,null); >>>>> // i need end position too but lets skip it for now >>>>> pxyStream.pipe(<otherSlowStream>); >>>>> >>>>> my problem is I see 'read stream end' message far before >>>>> otherSlowStream ends. >>>>> In fact it never ends, but i can see its progress. >>>>> Actually destination is http.response stream. >>>>> I thought stream.pipe should slow down reading in order to keep >>>>> buffers short. >>>>> Maybe attached 'data' event makes it read fast, but how can I count >>>>> bytes then? >>>>> The other problem is I can not end destination properly. >>>>> >>>>> >>>>> >>>>> -- >>> -- >>> Job Board: http://jobs.nodejs.org/ >>> Posting guidelines: https://github.com/joyent/node/wiki/Mailing-List- >>> Posting-Guidelines >>> You received this message because you are subscribed to the Google >>> Groups "nodejs" group. >>> To post to this group, send email to [email protected] >>> >>> To unsubscribe from this group, send email to >>> [email protected] >>> >>> For more options, visit this group at >>> http://groups.google.com/group/nodejs?hl=en?hl=en >>> >>> --- >>> You received this message because you are subscribed to the Google >>> Groups "nodejs" group. >>> To unsubscribe from this group and stop receiving emails from it, send >>> an email to [email protected]. >>> >>> For more options, visit https://groups.google.com/groups/opt_out. >>> >> >> -- > -- > Job Board: http://jobs.nodejs.org/ > Posting guidelines: > https://github.com/joyent/node/wiki/Mailing-List-Posting-Guidelines > You received this message because you are subscribed to the Google > Groups "nodejs" group. > To post to this group, send email to [email protected] > To unsubscribe from this group, send email to > [email protected] > For more options, visit this group at > http://groups.google.com/group/nodejs?hl=en?hl=en > > --- > You received this message because you are subscribed to the Google Groups > "nodejs" group. > To unsubscribe from this group and stop receiving emails from it, send an > email to [email protected]. > For more options, visit https://groups.google.com/groups/opt_out. > -- -- Job Board: http://jobs.nodejs.org/ Posting guidelines: https://github.com/joyent/node/wiki/Mailing-List-Posting-Guidelines You received this message because you are subscribed to the Google Groups "nodejs" group. To post to this group, send email to [email protected] To unsubscribe from this group, send email to [email protected] For more options, visit this group at http://groups.google.com/group/nodejs?hl=en?hl=en --- You received this message because you are subscribed to the Google Groups "nodejs" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. For more options, visit https://groups.google.com/groups/opt_out.
