I have finally found what is wrong. Readable.push() may call _read() again
synchronously, so the whole thing becomes recursive, and buffers get swapped
because I was adjusting the position after the call to push().
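
The fix, roughly, is to advance the read position before calling push() and to
guard _read() against synchronous re-entry. A simplified sketch of the idea
(_fd, _pos and _waitForMore are placeholder names, not the exact code from the
gist):

CatchStream.prototype._read = function(size) {
  var self = this;
  if (self._reading) return;           // push() below may call _read() again synchronously
  self._reading = true;
  var buf = new Buffer(size);
  fs.read(self._fd, buf, 0, size, self._pos, function(err, bytesRead) {
    self._reading = false;
    if (err) return self.emit('error', err);
    if (bytesRead === 0) return self._waitForMore();  // no new data yet - wait for the file to grow
    self._pos += bytesRead;                           // move the position *before* push(),
    self.push(buf.slice(0, bytesRead));               // so a recursive _read() sees the right offset
  });
};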



2014-02-12 18:50 GMT+02:00 Denys Khanzhyiev <[email protected]>:

> It looks like reading from a growing file is not reliable at all. I have
> tested on the real app again, and reads from the growing file return wrong
> chunks when close to the current end of the file. I cannot even rely on the
> fs.WriteStream.bytesWritten value, as it is always greater than the reading
> stream's position, and reading still gets wrong data.
>
> How can I detect which part of the file is safe to read?
>
>
> 2014-02-12 9:16 GMT+02:00 Denys Khanzhyiev <[email protected]>:
>
>> Thank you, Michael.
>>
>> Yes, a return is missing before setTimeout. I have updated the gist, but
>> that does not change the result.
>> https://gist.github.com/xdenser/8944752
>> I think this is small enough.
>> What I am trying to achieve is a "CatchStream" - a stream that reads from a
>> growing file, where the growing file is being downloaded by an http request.
>> I also need CatchStream to slow down when required by the stream reading
>> from it, so I am trying to implement it as a nodejs Readable stream.
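>>
>> Implemented as a Readable, back-pressure should come for free: _read() is
>> only called when more data is wanted. So the intended usage is roughly
>> this (a sketch; the names here are placeholders, not the real code):
>>
>> var catchStream = new CatchStream('/path/to/growing/file', startOffset);
>> catchStream.pipe(someSlowDestination);  // pipe() handles back-pressure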
>>
>> My gen() function emulates that growing file and works OK: it generates a
>> file with chunks of growing byte values.
>>
>> The problem is with CatchStream: it sometimes reads chunks in the wrong
>> order, which is checked at line #85 of the gist. I have also noted that the
>> swapped chunks appear after a 'readable' event -
>> i.e. after 'readable' I call fs.ReadStream.read() and get chunk N+1, then N,
>> then N+2, then N+3, then fs.ReadStream.read() returns an empty buffer and I
>> wait for the next 'readable'.
>>
>> The problem is not with fs.ReadStream. I have added a plain read function
>> that does not have the problem, but I need a Readable stream implementation.
>>
>>
>> 2014-02-12 5:26 GMT+02:00 Michael Hart <[email protected]>:
>>
>>> Your `writeChunk` call seems a bit fragile with the `setTimeout` there -
>>> I commented on the gist.
>>>
>>> I think a smaller test case would make it easier to read and understand
>>> what you're trying to achieve.
>>>
>>>
>>> On Wednesday, 12 February 2014 08:48:48 UTC+11, Denys Khanzhiyev wrote:
>>>
>>>> OK, I have created a simple gist to test this.
>>>>
>>>> https://gist.github.com/xdenser/8944752
>>>>
>>>> Just run the file and you should see:
>>>>
>>>> WTF! 49152
>>>> WTF! 114688
>>>> WTF! 180224
>>>> WTF! 245760
>>>> ...
>>>> WTF! 4112384
>>>> writeStream finish
>>>> WTF! 4177920
>>>> read Stream Finished
>>>>
>>>> if you see just
>>>>
>>>> writeStream finish
>>>> read Stream Finished
>>>>
>>>> it works as expected.
>>>>
>>>> What I made is two "simultaneous" streams:
>>>>
>>>> 1. one writes growing numbers into a file and slows down from time to time
>>>> 2. my custom read stream implementation catches up on that written file
>>>>
>>>> WTF! means that it read a number which is less than the previous one, and
>>>> that should not be possible, as the numbers are growing.
>>>>
>>>> Can somebody please explain what is wrong with my CatchStream?
>>>>
>>>> Tested on node v0.10.24 and v0.10.16 under Windows 7.
>>>>
>>>>
>>>>
>>>> 2014-02-11 13:37 GMT+02:00 greelgorke <[email protected]>:
>>>>
>>>>> In addition to that, the counter can emit something when a threshold is reached:
>>>>>
>>>>>
>>>>> // count comes from the snippet below; threshold is assumed to be defined elsewhere
>>>>> counter._transform = function(chunk, encoding, callback){
>>>>>   count += (chunk && chunk.length) || 0
>>>>>   this.push(chunk)
>>>>>   if(count >= threshold) this.emit('threshold_reached')
>>>>>   callback()
>>>>> }
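>>>>>
>>>>> For example, a minimal usage sketch (fs, the file path and someDestination
>>>>> are placeholders assumed to exist in the surrounding scope):
>>>>>
>>>>> counter.once('threshold_reached', function(){
>>>>>   // enough bytes are on disk now - start catching up from the file
>>>>>   var reader = fs.createReadStream('/path/to/growing/file', {start: 0})
>>>>>   reader.pipe(someDestination)
>>>>> })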
>>>>>
>>>>>
>>>>> On Tuesday, 11 February 2014 12:36:20 UTC+1, greelgorke wrote:
>>>>>
>>>>>> You can count bytes by putting a pass-through stream in between:
>>>>>>
>>>>>> var stream = require('stream')
>>>>>>
>>>>>> var counter = new stream.Transform()
>>>>>> var count = 0
>>>>>> counter._transform = function(chunk, encoding, callback){
>>>>>>   count += (chunk && chunk.length) || 0   // count every byte that passes through
>>>>>>   this.push(chunk)                        // pass the data on unchanged
>>>>>>   callback()
>>>>>> }
>>>>>>
>>>>>>
>>>>>> slowSourceStream.pipe(counter).pipe(destinationStream)
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Saturday, 8 February 2014 01:12:37 UTC+1, Denys Khanzhiyev wrote:
>>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I have a task where one slow stream is piped to an fs.WriteStream, and
>>>>>>> after some event I need to read from that written file, i.e. read from a
>>>>>>> growing file starting at some position. I have seen `node-growing-file`
>>>>>>> and `tailing-stream`, but nothing seems to solve my problem.
>>>>>>>
>>>>>>> It looks like I do not understand how streams work.
>>>>>>> Here is my helper object (though it is called PxyStream, it is not in
>>>>>>> fact a stream):
>>>>>>>
>>>>>>> var fs = require('fs');
>>>>>>>
>>>>>>> function PxyStream(path,readStream,writeStream,start,end){
>>>>>>>     this.path = path;
>>>>>>>     this.readStream = readStream;
>>>>>>>     this.writeStream = writeStream;
>>>>>>>     this._offset = start;
>>>>>>>     this.endPos = end;
>>>>>>>     this.writeStream.on('finish',function(){
>>>>>>>         this._writeStreamFinished = true;
>>>>>>>         this.nextStream();
>>>>>>>     }.bind(this))
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> PxyStream.prototype.pipe = function(destination){
>>>>>>>     this.destination = destination;
>>>>>>>     this.nextStream();
>>>>>>> }
>>>>>>>
>>>>>>> // Create the next fs.ReadStream from the current offset and pipe it to the destination.
>>>>>>> PxyStream.prototype.nextStream = function(){
>>>>>>>     if(!this._stream){
>>>>>>>         var options = {
>>>>>>>            start: this._offset
>>>>>>>         };
>>>>>>>         var last = this._writeStreamFinished;
>>>>>>>         console.log('new read stream',this._offset, last);
>>>>>>>         this._stream =  fs.createReadStream(this.path,options);
>>>>>>>         this._stream.pipe(this.destination,{end: false});
>>>>>>>         this._stream.on('data',function(data){
>>>>>>>             this._offset += data.length;
>>>>>>>         }.bind(this));
>>>>>>>
>>>>>>>         this._stream.on('end',function(){
>>>>>>>             console.log('read stream end',this._offset, last);
>>>>>>>            this._stream.unpipe();
>>>>>>>            this._stream = null;
>>>>>>>            if(last) {
>>>>>>>                this.destination.emit('end');
>>>>>>>            }
>>>>>>>            this._watch();
>>>>>>>         }.bind(this));
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> // Wait for more data on the source stream before starting the next read pass.
>>>>>>> PxyStream.prototype._watch = function(){
>>>>>>>    this.readStream.once('data',function(){
>>>>>>>        this.nextStream();
>>>>>>>    }.bind(this))
>>>>>>>  }
>>>>>>>
>>>>>>> exports.PxyStream = PxyStream;
>>>>>>>
>>>>>>>
>>>>>>> I am using it as follows:
>>>>>>>
>>>>>>> pxyStream = new PxyStream(filePath,<slowReadStream>,
>>>>>>> <fsWriteStream>,start,null);
>>>>>>> // i need end position too but lets skip it for now
>>>>>>> pxyStream.pipe(<otherSlowStream>);
>>>>>>>
>>>>>>> My problem is that I see the 'read stream end' message far before
>>>>>>> otherSlowStream ends.
>>>>>>> In fact it never ends, but I can see its progress.
>>>>>>> Actually, the destination is an http response stream.
>>>>>>> I thought stream.pipe() should slow down reading in order to keep the
>>>>>>> buffers short.
>>>>>>> Maybe the attached 'data' listener makes it read fast, but how can I
>>>>>>> count bytes then?
>>>>>>> The other problem is that I cannot end the destination properly.
>>>>>>>
>>>>>>>