Thank you Michael,

Yes return is missing before setTimeout. I have updated gist. But that does
not change result.
https://gist.github.com/xdenser/8944752
I think this is small enough.
What I am trying to acheeve is to create "CatchStream" - a stream reading
from growing file, where growing file is downloaded by http request.
I also need CatchStream to slow down when needed by stream reading from it.
So I try to create implementation of nodejs Readable stream.

My gen() function emulates that growing file and is working Ok - it
generates file with chunks of growing bytes.

The problem is with CatchStream it sometimes reads chunks with wrong order
it is checked  at line #85 of gist. I also have noted that swapped chunks
apear after 'readable' event -
i.e after 'readable' I call fs.ReadStream.read and get chunk N+1 then N
then N+2 then N+3 then fs.ReadStream.read returns empty buffer and I wait
for next 'readable'.

The problem is not with fs.ReadStream. I have added read function that does
not have the problem. But I need Readable stream implementation.


2014-02-12 5:26 GMT+02:00 Michael Hart <[email protected]>:

> Your `writeChunk` call seems a bit fragile with the `setTimeout` there - I
> commented on the gist.
>
> I think a smaller test case might be easier to read/understand what you're
> trying to achieve.
>
>
> On Wednesday, 12 February 2014 08:48:48 UTC+11, Denys Khanzhiyev wrote:
>
>> Ok I have created simple gist to test this.
>>
>> https://gist.github.com/xdenser/8944752
>>
>> Just run file and you should see
>>
>> WTF! 49152
>> WTF! 114688
>> WTF! 180224
>> WTF! 245760
>> ...
>> WTF! 4112384
>> writeStream finish
>> WTF! 4177920
>> read Stream Finished
>>
>> if you see just
>>
>> writeStream finish
>> read Stream Finished
>>
>> it works as expected.
>>
>> what I made is two "simultanous" streams
>>
>> 1. writes growing numbers into file and slows down from time to time
>> 2. my custom read stream implementation to catch on that written file
>>
>> WTF! mean that it read a number which is less than previous. And that is
>> not possible as numbers are growing.
>>
>> Somebody can explain,please, what is wrong with my CatchStream?
>>
>> tested on node v0.10.24 and v0.10.16 under W7
>>
>>
>>
>> 2014-02-11 13:37 GMT+02:00 greelgorke <[email protected]>:
>>
>>> In Addition to that, whenthe counter can emit something
>>>
>>>
>>> counter._transform = function(chunk, encoding, callback){
>>>   count += (chunk && chunk.length) || 0
>>>   this.push(chunk)
>>>   if(count >= threshhold) this.emit('threshhold_reached')
>>>   callback()
>>> }
>>>
>>>
>>> Am Dienstag, 11. Februar 2014 12:36:20 UTC+1 schrieb greelgorke:
>>>
>>>> you can count bytes with by putting a passthrough stream in between
>>>>
>>>> var counter = new stream.Transform
>>>> var count = 0
>>>> counter._transform = function(chunk, encoding, callback){
>>>>   count += (chunk && chunk.length) || 0
>>>>   this.push(chunk)
>>>>   callback()
>>>> }
>>>>
>>>>
>>>> slowSourceStream.pipe(counter).pipe(destinationStream)
>>>>
>>>>
>>>>
>>>> Am Samstag, 8. Februar 2014 01:12:37 UTC+1 schrieb Denys Khanzhiyev:
>>>>>
>>>>> Hello,
>>>>>
>>>>> I have a task where one slow stream is piped to fs.writeStream and
>>>>> after some event I need to read from that writen file, i.e. read from
>>>>> growing file from some position. A have seen `node-growing-file`, and
>>>>> `tailing-stream` nothing seems to solve my problem.
>>>>>
>>>>> It looks like I do not understand how streams work
>>>>> Here is my helper object (though it is called PxyStream it is not
>>>>> stream in fact),
>>>>>
>>>>> var
>>>>>    fs = require('fs');
>>>>>
>>>>> function PxyStream(path,readStream,writeStream,start,end){
>>>>>     this.path = path;
>>>>>     this.readStream = readStream;
>>>>>     this.writeStream = writeStream;
>>>>>     this._offset = start;
>>>>>     this.endPos = end;
>>>>>     this.writeStream.on('finish',function(){
>>>>>         this._writeStreamFinished = true;
>>>>>         this.nextStream();
>>>>>     }.bind(this))
>>>>> }
>>>>>
>>>>>
>>>>> PxyStream.prototype.pipe = function(destination){
>>>>>     this.destination = destination;
>>>>>     this.nextStream();
>>>>> }
>>>>>
>>>>> PxyStream.prototype.nextStream = function(){
>>>>>     if(!this._stream){
>>>>>         var options = {
>>>>>            start: this._offset
>>>>>         };
>>>>>         var last = this._writeStreamFinished;
>>>>>         console.log('new read stream',this._offset, last);
>>>>>         this._stream =  fs.createReadStream(this.path,options);
>>>>>         this._stream.pipe(this.destination,{end: false});
>>>>>         this._stream.on('data',function(data){
>>>>>             this._offset += data.length;
>>>>>         }.bind(this));
>>>>>
>>>>>         this._stream.on('end',function(){
>>>>>             console.log('read stream end',this._offset, last);
>>>>>            this._stream.unpipe();
>>>>>            this._stream = null;
>>>>>            if(last) {
>>>>>                this.destination.emit('end');
>>>>>            }
>>>>>            this._watch();
>>>>>         }.bind(this));
>>>>>     }
>>>>> }
>>>>>
>>>>> PxyStream.prototype._watch = function(){
>>>>>    this.readStream.once('data',function(){
>>>>>        this.nextStream();
>>>>>    }.bind(this))
>>>>>  }
>>>>>
>>>>> exports.PxyStream = PxyStream;
>>>>>
>>>>>
>>>>> I am using it as
>>>>>
>>>>> pxyStream = new PxyStream(filePath,<slowReadStream>,
>>>>> <fsWriteStream>,start,null);
>>>>> // i need end position too but lets skip it for now
>>>>> pxyStream.pipe(<otherSlowStream>);
>>>>>
>>>>> my problem is I see 'read stream end' message far before
>>>>> otherSlowStream ends.
>>>>> In fact it never ends, but i can see its progress.
>>>>> Actually destination is http.response stream.
>>>>> I thought stream.pipe should slow down reading in order to keep
>>>>> buffers short.
>>>>> Maybe attached 'data' event makes it read fast, but how can I count
>>>>> bytes then?
>>>>> The other problem is I can not end destination properly.
>>>>>
>>>>>
>>>>>
>>>>>  --
>>> --
>>> Job Board: http://jobs.nodejs.org/
>>> Posting guidelines: https://github.com/joyent/node/wiki/Mailing-List-
>>> Posting-Guidelines
>>> You received this message because you are subscribed to the Google
>>> Groups "nodejs" group.
>>> To post to this group, send email to [email protected]
>>>
>>> To unsubscribe from this group, send email to
>>> [email protected]
>>>
>>> For more options, visit this group at
>>> http://groups.google.com/group/nodejs?hl=en?hl=en
>>>
>>> ---
>>> You received this message because you are subscribed to the Google
>>> Groups "nodejs" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to [email protected].
>>>
>>> For more options, visit https://groups.google.com/groups/opt_out.
>>>
>>
>>  --
> --
> Job Board: http://jobs.nodejs.org/
> Posting guidelines:
> https://github.com/joyent/node/wiki/Mailing-List-Posting-Guidelines
> You received this message because you are subscribed to the Google
> Groups "nodejs" group.
> To post to this group, send email to [email protected]
> To unsubscribe from this group, send email to
> [email protected]
> For more options, visit this group at
> http://groups.google.com/group/nodejs?hl=en?hl=en
>
> ---
> You received this message because you are subscribed to the Google Groups
> "nodejs" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> For more options, visit https://groups.google.com/groups/opt_out.
>

-- 
-- 
Job Board: http://jobs.nodejs.org/
Posting guidelines: 
https://github.com/joyent/node/wiki/Mailing-List-Posting-Guidelines
You received this message because you are subscribed to the Google
Groups "nodejs" group.
To post to this group, send email to [email protected]
To unsubscribe from this group, send email to
[email protected]
For more options, visit this group at
http://groups.google.com/group/nodejs?hl=en?hl=en

--- 
You received this message because you are subscribed to the Google Groups 
"nodejs" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
For more options, visit https://groups.google.com/groups/opt_out.

Reply via email to