I have finally found what is wrong. Readable.push() may call _read() again synchronously, so the whole thing becomes recursive, and the buffers get swapped because I was adjusting the position after the call to push().
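The fix can be sketched roughly as below. This is only a minimal illustration, not the code from the gist: SliceStream, its in-memory source buffer and the chunk size are hypothetical stand-ins for CatchStream and the growing file, and it is written for a current Node.js rather than v0.10. The only point it makes is the ordering: the position is advanced before push() is called, so a nested _read() triggered from inside push() already sees the new offset.

```javascript
const { Readable } = require('stream');

// Hypothetical stand-in for CatchStream: it serves slices of an in-memory
// Buffer instead of reading from a growing file.
class SliceStream extends Readable {
  constructor(source, chunkSize) {
    super();
    this._source = source;       // assumption: all data is already available
    this._chunkSize = chunkSize; // size of each pushed chunk
    this._pos = 0;               // offset of the next unread byte
  }

  _read() {
    if (this._pos >= this._source.length) {
      this.push(null);           // nothing left: signal end of stream
      return;
    }
    const start = this._pos;
    const end = Math.min(start + this._chunkSize, this._source.length);
    // Advance the position BEFORE calling push(). If push() ends up calling
    // _read() again synchronously, the nested call starts at the new offset
    // instead of re-reading or reordering the same range.
    this._pos = end;
    this.push(this._source.slice(start, end));
  }
}

// Usage: serve bytes 0..7 in 3-byte chunks and collect them again.
const source = Buffer.from([0, 1, 2, 3, 4, 5, 6, 7]);
const stream = new SliceStream(source, 3);
const received = [];
let chunk;
// The source is synchronous here, so read() hands back data immediately
// and returns null once the stream has ended.
while ((chunk = stream.read()) !== null) received.push(chunk);
const result = Buffer.concat(received);
console.log(result.equals(source));
```

With a real file the read is asynchronous, so the same idea applies inside the read callback: compute the new position first, then push.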

2014-02-12 18:50 GMT+02:00 Denys Khanzhyiev <[email protected]>:

> It looks like reading from a growing file is not reliable at all. I have
> tested on the real app again, and reads from the growing file return wrong
> chunks when close to the current end of the file. I cannot even rely on the
> fs.writableStream.bytesWritten value, as it is always greater than the
> reading stream position, and reading still gets wrong data.
>
> How can I detect what part of the file is safe to read?
>
>
> 2014-02-12 9:16 GMT+02:00 Denys Khanzhyiev <[email protected]>:
>
>> Thank you Michael,
>>
>> Yes, return is missing before setTimeout. I have updated the gist. But that
>> does not change the result.
>> https://gist.github.com/xdenser/8944752
>> I think this is small enough.
>> What I am trying to achieve is to create "CatchStream" - a stream reading
>> from a growing file, where the growing file is downloaded by an http request.
>> I also need CatchStream to slow down when the stream reading from it
>> requires that. So I am trying to create an implementation of the nodejs
>> Readable stream.
>>
>> My gen() function emulates that growing file and is working OK - it
>> generates a file with chunks of growing bytes.
>>
>> The problem is with CatchStream: it sometimes reads chunks in the wrong
>> order; this is checked at line #85 of the gist. I have also noted that the
>> swapped chunks appear after the 'readable' event -
>> i.e. after 'readable' I call fs.ReadStream.read and get chunk N+1, then N,
>> then N+2, then N+3, then fs.ReadStream.read returns an empty buffer and I
>> wait for the next 'readable'.
>>
>> The problem is not with fs.ReadStream. I have added a read function that
>> does not have the problem. But I need a Readable stream implementation.
>>
>>
>> 2014-02-12 5:26 GMT+02:00 Michael Hart <[email protected]>:
>>
>>> Your `writeChunk` call seems a bit fragile with the `setTimeout` there -
>>> I commented on the gist.
>>>
>>> I think a smaller test case might be easier to read/understand what
>>> you're trying to achieve.
>>>
>>>
>>> On Wednesday, 12 February 2014 08:48:48 UTC+11, Denys Khanzhiyev wrote:
>>>
>>>> OK, I have created a simple gist to test this.
>>>>
>>>> https://gist.github.com/xdenser/8944752
>>>>
>>>> Just run the file and you should see
>>>>
>>>> WTF! 49152
>>>> WTF! 114688
>>>> WTF! 180224
>>>> WTF! 245760
>>>> ...
>>>> WTF! 4112384
>>>> writeStream finish
>>>> WTF! 4177920
>>>> read Stream Finished
>>>>
>>>> If you see just
>>>>
>>>> writeStream finish
>>>> read Stream Finished
>>>>
>>>> it works as expected.
>>>>
>>>> What I made is two "simultaneous" streams:
>>>>
>>>> 1. writes growing numbers into a file and slows down from time to time
>>>> 2. my custom read stream implementation to catch up on that written file
>>>>
>>>> WTF! means that it read a number which is less than the previous one. And
>>>> that is not possible, as the numbers are growing.
>>>>
>>>> Can somebody please explain what is wrong with my CatchStream?
>>>>
>>>> Tested on node v0.10.24 and v0.10.16 under W7.
>>>>
>>>>
>>>>
>>>> 2014-02-11 13:37 GMT+02:00 greelgorke <[email protected]>:
>>>>
>>>>> In addition to that, the counter can then emit something:
>>>>>
>>>>>
>>>>> counter._transform = function(chunk, encoding, callback){
>>>>>   count += (chunk && chunk.length) || 0
>>>>>   this.push(chunk)
>>>>>   if(count >= threshhold) this.emit('threshhold_reached')
>>>>>   callback()
>>>>> }
>>>>>
>>>>>
>>>>> On Tuesday, 11 February 2014 12:36:20 UTC+1, greelgorke wrote:
>>>>>
>>>>>> You can count bytes by putting a passthrough stream in between:
>>>>>>
>>>>>> var counter = new stream.Transform
>>>>>> var count = 0
>>>>>> counter._transform = function(chunk, encoding, callback){
>>>>>>   count += (chunk && chunk.length) || 0
>>>>>>   this.push(chunk)
>>>>>>   callback()
>>>>>> }
>>>>>>
>>>>>>
>>>>>> slowSourceStream.pipe(counter).pipe(destinationStream)
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Saturday, 8 February 2014 01:12:37 UTC+1, Denys Khanzhiyev wrote:
>>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I have a task where one slow stream is piped to fs.writeStream, and
>>>>>>> after some event I need to read from that written file, i.e. read from
>>>>>>> a growing file from some position. I have seen `node-growing-file` and
>>>>>>> `tailing-stream`; neither seems to solve my problem.
>>>>>>>
>>>>>>> It looks like I do not understand how streams work.
>>>>>>> Here is my helper object (though it is called PxyStream, it is not in
>>>>>>> fact a stream):
>>>>>>>
>>>>>>> var
>>>>>>>   fs = require('fs');
>>>>>>>
>>>>>>> function PxyStream(path,readStream,writeStream,start,end){
>>>>>>>   this.path = path;
>>>>>>>   this.readStream = readStream;
>>>>>>>   this.writeStream = writeStream;
>>>>>>>   this._offset = start;
>>>>>>>   this.endPos = end;
>>>>>>>   this.writeStream.on('finish',function(){
>>>>>>>     this._writeStreamFinished = true;
>>>>>>>     this.nextStream();
>>>>>>>   }.bind(this))
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> PxyStream.prototype.pipe = function(destination){
>>>>>>>   this.destination = destination;
>>>>>>>   this.nextStream();
>>>>>>> }
>>>>>>>
>>>>>>> PxyStream.prototype.nextStream = function(){
>>>>>>>   if(!this._stream){
>>>>>>>     var options = {
>>>>>>>       start: this._offset
>>>>>>>     };
>>>>>>>     var last = this._writeStreamFinished;
>>>>>>>     console.log('new read stream',this._offset, last);
>>>>>>>     this._stream = fs.createReadStream(this.path,options);
>>>>>>>     this._stream.pipe(this.destination,{end: false});
>>>>>>>     this._stream.on('data',function(data){
>>>>>>>       this._offset += data.length;
>>>>>>>     }.bind(this));
>>>>>>>
>>>>>>>     this._stream.on('end',function(){
>>>>>>>       console.log('read stream end',this._offset, last);
>>>>>>>       this._stream.unpipe();
>>>>>>>       this._stream = null;
>>>>>>>       if(last) {
>>>>>>>         this.destination.emit('end');
>>>>>>>       }
>>>>>>>       this._watch();
>>>>>>>     }.bind(this));
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> PxyStream.prototype._watch = function(){
>>>>>>>   this.readStream.once('data',function(){
>>>>>>>     this.nextStream();
>>>>>>>   }.bind(this))
>>>>>>> }
>>>>>>>
>>>>>>> exports.PxyStream = PxyStream;
>>>>>>>
>>>>>>>
>>>>>>> I am using it as
>>>>>>>
>>>>>>> pxyStream = new PxyStream(filePath,<slowReadStream>,
>>>>>>> <fsWriteStream>,start,null);
>>>>>>> // I need the end position too, but let's skip it for now
>>>>>>> pxyStream.pipe(<otherSlowStream>);
>>>>>>>
>>>>>>> My problem is that I see the 'read stream end' message long before
>>>>>>> otherSlowStream ends.
>>>>>>> In fact it never ends, but I can see its progress.
>>>>>>> The destination is actually an http.response stream.
>>>>>>> I thought stream.pipe should slow down reading in order to keep the
>>>>>>> buffers short.
>>>>>>> Maybe the attached 'data' event makes it read fast, but how can I count
>>>>>>> bytes then?
>>>>>>> The other problem is that I cannot end the destination properly.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>> --
>>>>> Job Board: http://jobs.nodejs.org/
>>>>> Posting guidelines:
>>>>> https://github.com/joyent/node/wiki/Mailing-List-Posting-Guidelines
>>>>> You received this message because you are subscribed to the Google
>>>>> Groups "nodejs" group.
>>>>> To post to this group, send email to [email protected]
>>>>>
>>>>> To unsubscribe from this group, send email to
>>>>> [email protected]
>>>>>
>>>>> For more options, visit this group at
>>>>> http://groups.google.com/group/nodejs?hl=en?hl=en
>>>>>
>>>>> ---
>>>>> You received this message because you are subscribed to the Google
>>>>> Groups "nodejs" group.
>>>>> To unsubscribe from this group and stop receiving emails from it, send
>>>>> an email to [email protected].
>>>>>
>>>>> For more options, visit https://groups.google.com/groups/opt_out.
