In a convo with @isaacs on Twitter, the example of EPIPE in Unix pipes was 
brought up - if I understand correctly, this is exactly the mechanism I'm 
looking for in node.js streams.

So when you do `cat myfile | grep 'hello' | head -5` - EPIPE will be thrown 
if there are more than 5 matches and `cat` will shut the file descriptor 
and close down gracefully (correct me if I'm wrong here - I'm not 100% sure 
on this).

So `cat` is the readable stream in our case - and it needs to know when to 
stop reading and close gracefully - whether it be as a result of a 
downstream write error, or whatever.

If `myReadableStream` is the equivalent of `cat`, and the truncating `take` 
stream is the equivalent of `head` (ignore process.stdout here as it'll 
probably confuse things given it *can* actually throw EPIPE) - then what 
can `myReadableStream` listen for to know to close gracefully? (keep in 
mind that `myReadableStream` won't have a direct reference to `take`, just 
as `cat` doesn't to `head` either)

Does that make things a bit clearer?

On Wednesday, 3 July 2013 16:04:24 UTC+10, Michael Hart wrote:
>
> Ah, so I might not have been clear enough with the way I phrased it - I 
> didn't actually want to know how to write the *truncating* stream (although 
> thank you!) - I wanted to know how to write the *read* stream.
>
> If I'm writing a module that exposes a read stream that is reading from a 
> database - and that is piped into a truncating stream of some kind - how 
> can my readable stream know to stop reading from the database? I don't want 
> it to keep reading the entire database (and I might not know this at the 
> time I created the read stream).
>
> Imagine a grep'ing transform of some sort - that will preclude you from 
> knowing how much you want to read up front - and then imagine you only want 
> the first 5 matches.
>
> So:
>
> myReadStream.pipe(grep('hello')).pipe(take(5)).pipe(process.stdout)
>
> If myReadStream is coming from a huge file, or from paging calls to a DB 
> (in my case), then it will just continue to read and read and read...
>
> Unless in my app code I specifically listen for the `take` stream to end 
> and then call `close` on myReadStream, or something like that. But that's a 
> lot of work for the module consumer to write. It stops the 
> pipe().pipe().pipe() construct from being so useful.
>
> So is there some way *for the read stream itself* to know if it can stop 
> reading? Or at least some idiomatic way to achieve it (keeping track of how 
> many pipes have been attached/unattached, etc)
>
> On Wednesday, 3 July 2013 15:13:08 UTC+10, Isaac Schlueter wrote:
>>
>> Here's one way to do it: https://github.com/isaacs/truncating-stream 
>>
>> npm install truncating-stream 
>>
>> var Trunc = require('truncating-stream') 
>> var t = new Trunc({ limit: 100 }) 
>>
>> Now no matter how much you t.write() only 100 bytes will come out. 
>>
>> Of course, the thing you're piping your reader INTO won't be able to 
>> close the reader.  That'd be way too intimate.  It's up to you to 
>> decide when to do that. 
>>
>> But, you can listen for the finish of the truncating stream and 
>> forcibly destroy the reader if you want to. 
>>
>> reader.pipe(t) 
>> t.on('finish', function() { 
>>   reader.destroy() 
>> }) 
>>
>>
>>
>>
>> On Mon, Jul 1, 2013 at 2:54 AM, Michael Hart <[email protected]> 
>> wrote: 
>> > As an update to this (I realise nodeconf has been on... pity I missed 
>> it), 
>> > I've tried a few things and I still can't figure out the idiomatic way 
>> to 
>> > deal with this situation. Here's an example: 
>> > 
>> > var stream = require('stream'), 
>> >     fs = require('fs') 
>> > 
>> > var readStream = fs.createReadStream(__filename, {highWaterMark: 5}), 
>> >     limitStream = new stream.Transform(), 
>> >     limit = 5 
>> > 
>> > // Just an example of a write stream that only wants a certain amount 
>> of 
>> > data 
>> > limitStream._transform = function(chunk, encoding, cb) { 
>> >   if (--limit >= 0) 
>> >     return cb(null, chunk + '\n') 
>> > 
>> >   // All we want is 5 chunks, then we want to close 
>> >   this.end() 
>> >   // Wait a while and check if the file stream has closed 
>> >   setTimeout(checkFd, 1000) 
>> >   cb() 
>> > } 
>> > 
>> > // fd is set to null when the file is closed 
>> > function checkFd() { 
>> >   console.log(readStream.fd != null ? 'Bugger. File still open!' : 'All 
>> > good') 
>> > } 
>> > 
>> > readStream.on('unpipe', function() { console.log('unpipe emitted from 
>> > readStream') }) 
>> > readStream.on('end', function() { console.log('end emitted from 
>> readStream') 
>> > }) 
>> > readStream.on('close', function() { console.log('close emitted from 
>> > readStream') }) 
>> > 
>> > limitStream.on('unpipe', function() { console.log('unpipe emitted from 
>> > limitStream') }) 
>> > limitStream.on('end', function() { console.log('end emitted from 
>> > limitStream') }) 
>> > limitStream.on('close', function() { console.log('close emitted from 
>> > limitStream') }) 
>> > 
>> > readStream.pipe(limitStream).pipe(process.stdout) 
>> > 
>> > When run (in node v0.10.12), this results in: 
>> > 
>> > $ node test.js 
>> > var s 
>> > tream 
>> >  = re 
>> > quire 
>> > ('str 
>> > unpipe emitted from limitStream 
>> > end emitted from limitStream 
>> > Bugger. File still open! 
>> > 
>> > And the process exits. Although if the timeout was longer (for 
>> example), 
>> > then the file would have stayed open the whole time with the stream 
>> > continuing to read even though it's going nowhere (imagine it was 
>> > /dev/random or similar). 
>> > 
>> > So the file descriptor is left open, even though the pipe is closed - 
>> and no 
>> > events seem to be signalled on readStream at all, only on the writable 
>> > limitStream. 
>> > 
>> > Now in my case I'm actually developing the read stream, I'm not using a 
>> file 
>> > stream - so I certainly have more control than this example - but I 
>> still 
>> > have no idea what the read stream needs to look for or listen to to 
>> know to 
>> > stop reading from the source (ie, to know that all destinations have 
>> ended). 
>> > 
>> > There are a few possibilities: 
>> > 
>> > Check the _readableState property when _read is called - if pipes is 
>> null 
>> > (or pipesCount is 0) then don't continue to fetch data. Ugly, and using 
>> > hidden variables. 
>> > Override the unpipe() function and call close() if no pipes left - 
>> again, 
>> > still need some way to know how many pipes are left - could keep our 
>> own 
>> > count? 
>> > Check to see if there are any listeners left for 'readable' (and/or 
>> 'data'?) 
>> > - if none, then close(). Not sure where to do this. During _read()? Or 
>> > override unpipe()? 
>> > Override the pipe() function and add listeners to each destination to 
>> see 
>> > when they end - and hope that they do the same to all streams they pipe 
>> to. 
>> > 
>> > Is there a more idiomatic way to do achieve this? Is one of these 
>> options 
>> > the "nicest"? 
>> > 
>> > Or are readable streams really just supposed to keep reading and 
>> reading, 
>> > never closing (until they eventually reach the end of their source, 
>> assuming 
>> > there is an end), regardless of whether they've been piped to writable 
>> > streams that subsequently finish? 
>> > 
>> > This would be a pity as it would impede the ability to compose streams 
>> > together in a `dbReadStream.pipe(take(20))` kind of fashion. 
>> > 
>> > M 
>> > 
>> > On Friday, 28 June 2013 17:01:46 UTC+10, Michael Hart wrote: 
>> >> 
>> >> Hi all, 
>> >> 
>> >> I'm struggling with a fairly basic problem to do with limiting (ie, 
>> >> truncating) streams and signalling to the source readable to stop 
>> reading 
>> >> and close. 
>> >> 
>> >> Here's a simple example that hopefully illustrates what I mean: 
>> >> 
>> >> fs.createReadStream('myHugeFile') 
>> >>   .pipe(someTransform) 
>> >>   .pipe(someTruncatingStream) 
>> >>   .pipe(process.stdout) 
>> >> 
>> >> Where someTruncatingStream wants to only take the first n bytes of the 
>> >> transformed stream, and then signal "hey, I'm done (ie, no need to 
>> keep 
>> >> reading the massive file)". 
>> >> 
>> >> Is there a way to do this from someTruncatingStream without it having 
>> >> direct access to the source read stream? ie, some way to signal up the 
>> pipe 
>> >> to stop reading and close everything down (ie, not just pause)? 
>> >> 
>> >> In my case it's actually not a file stream that I want to do this for 
>> - I 
>> >> just figured that was the easiest illustration. Mine is an object 
>> stream I 
>> >> want to create in front of a DB which is being queried a page at a 
>> time - I 
>> >> want to expose a stream for this that can continue paging behind the 
>> scenes 
>> >> and spitting out data, but that knows to stop paging when the 
>> consumers have 
>> >> consumed all the data they want. ie: 
>> >> 
>> >> db.createReadStream({some: 
>> >> 
>> query}).pipe(someOtherFilter).pipe(stringify).pipe(take100).pipe(process.stdout)
>>  
>>
>> >> 
>> >> I could use another paradigm for this, a lazy collection for example - 
>> but 
>> >> it seems to me that streams should support this and are more of a 
>> lingua 
>> >> franca to expose from a module API (and indeed can be transformed into 
>> lazy 
>> >> collections themselves, a la lazy.js/node-lazy) 
>> >> 
>> >> Cheers, 
>> >> 
>> >> Michael 
>> > 
>> > -- 
>> > -- 
>> > Job Board: http://jobs.nodejs.org/ 
>> > Posting guidelines: 
>> > https://github.com/joyent/node/wiki/Mailing-List-Posting-Guidelines 
>> > You received this message because you are subscribed to the Google 
>> > Groups "nodejs" group. 
>> > To post to this group, send email to [email protected] 
>> > To unsubscribe from this group, send email to 
>> > [email protected] 
>> > For more options, visit this group at 
>> > http://groups.google.com/group/nodejs?hl=en?hl=en 
>> > 
>> > --- 
>> > You received this message because you are subscribed to the Google 
>> Groups 
>> > "nodejs" group. 
>> > To unsubscribe from this group and stop receiving emails from it, send 
>> an 
>> > email to [email protected]. 
>> > For more options, visit https://groups.google.com/groups/opt_out. 
>> > 
>> > 
>>
>

-- 
-- 
Job Board: http://jobs.nodejs.org/
Posting guidelines: 
https://github.com/joyent/node/wiki/Mailing-List-Posting-Guidelines
You received this message because you are subscribed to the Google
Groups "nodejs" group.
To post to this group, send email to [email protected]
To unsubscribe from this group, send email to
[email protected]
For more options, visit this group at
http://groups.google.com/group/nodejs?hl=en?hl=en

--- 
You received this message because you are subscribed to the Google Groups 
"nodejs" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
For more options, visit https://groups.google.com/groups/opt_out.


Reply via email to