The difference here between stream pipes and Unix pipes is that Unix pipes 
are fixed once they are set up. Stream pipes are dynamic: when all 
destinations are unpiped, it simply means that there are currently no 
receivers to pipe data to, which "pauses" the stream. It is quite possible to 
attach another pipe later, or to consume the stream using the normal API.
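
As a rough illustration of that dynamic behaviour (a minimal sketch only; the 
names source, first and second are made up for the example, and the data is 
pushed by hand rather than read from a real resource):

```javascript
const { Readable, PassThrough } = require('stream')

// A readable whose data is pushed by hand below, so _read is a no-op.
const source = new Readable({ read() {} })

// Attach a destination, then detach it again.
const first = new PassThrough()
source.pipe(first)
source.unpipe(first)

// With no destinations left the source is only paused, not ended or destroyed.
const pausedAfterUnpipe = source.isPaused()
const stillOpen = !source.destroyed

// A new destination can be attached later and receives the data.
const second = new PassThrough()
source.pipe(second)
second.on('data', function (chunk) { console.log('second got: ' + chunk) })

source.push('hello')
source.push(null) // signal end of data so everything shuts down normally
```

Nothing here tells the source that it will never be used again, which is why 
it stays open after the unpipe.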

What you want would require the readable to detect that no one is listening, 
assume no one will use it in the future, and close the resource. This would 
probably make a nice base feature for Readable, perhaps exposed behind an 
autoClose option?
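
Something along these lines could be done in userland today. This is only a 
sketch under assumptions: AutoCloseReadable and its _dests set are 
hypothetical names, not part of node core, and it assumes that a source with 
no remaining destinations will never be consumed again. It keeps its own 
record of piped destinations instead of reading _readableState.

```javascript
const { Readable, PassThrough } = require('stream')

// Hypothetical helper (not a core API): destroys itself once the last
// destination it was piped to has been unpiped.
class AutoCloseReadable extends Readable {
  constructor(options) {
    super(options)
    this._dests = new Set() // our own bookkeeping of piped destinations
  }

  pipe(dest, options) {
    this._dests.add(dest)
    return super.pipe(dest, options)
  }

  unpipe(dest) {
    const hadDests = this._dests.size > 0
    super.unpipe(dest)
    if (dest === undefined) this._dests.clear() // unpipe() detaches everything
    else this._dests.delete(dest)
    // Assumption: nobody will attach again, so release the resource now.
    if (hadDests && this._dests.size === 0 && !this.destroyed) this.destroy()
    return this
  }

  _read() {
    this.push('x') // stand-in for reading from a file or paging a database
  }
}

// Usage: the source stays open while at least one destination is attached.
const autoSource = new AutoCloseReadable()
const destA = new PassThrough()
const destB = new PassThrough()
autoSource.pipe(destA)
autoSource.pipe(destB)
autoSource.unpipe(destA)
const openWithOneDest = !autoSource.destroyed
autoSource.unpipe(destB)
const closedWithNoDests = autoSource.destroyed
```

A real implementation would release the underlying resource in _destroy. The 
trade-off is the one described above: once the source has closed itself, 
attaching another pipe later no longer works.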

On 03/07/2013, at 08.35, Michael Hart <[email protected]> wrote:

> In a convo with @isaacs on Twitter, the example of EPIPE in Unix pipes was 
> brought up - if I understand correctly, this is exactly the mechanism I'm 
> looking for in node.js streams.
> 
> So when you do `cat myfile | grep 'hello' | head -5` - EPIPE will be thrown 
> if there are more than 5 matches and `cat` will shut the file descriptor and 
> close down gracefully (correct me if I'm wrong here - I'm not 100% sure on 
> this).
> 
> So `cat` is the readable stream in our case - and it needs to know when to 
> stop reading and close gracefully - whether it be as a result of a downstream 
> write error, or whatever.
> 
> If `myReadableStream` is the equivalent of `cat`, and the truncating `take` 
> stream is the equivalent of `head` (ignore process.stdout here as it'll 
> probably confuse things given it *can* actually throw EPIPE) - then what can 
> `myReadableStream` listen for to know to close gracefully? (keep in mind that 
> `myReadableStream` won't have a direct reference to `take`, just as `cat` 
> doesn't to `head` either)
> 
> Does that make things a bit clearer?
> 
> On Wednesday, 3 July 2013 16:04:24 UTC+10, Michael Hart wrote:
> Ah, so I might not have been clear enough with the way I phrased it - I 
> didn't actually want to know how to write the *truncating* stream (although 
> thank you!) - I wanted to know how to write the *read* stream.
> 
> If I'm writing a module that exposes a read stream that is reading from a 
> database - and that is piped into a truncating stream of some kind - how can 
> my readable stream know to stop reading from the database? I don't want it to 
> keep reading the entire database (and I might not know this at the time I 
> created the read stream).
> 
> Imagine a grep'ing transform of some sort - that will preclude you from 
> knowing how much you want to read up front - and then imagine you only want 
> the first 5 matches.
> 
> So:
> 
> myReadStream.pipe(grep('hello')).pipe(take(5)).pipe(process.stdout)
> 
> If myReadStream is coming from a huge file, or from paging calls to a DB (in 
> my case), then it will just continue to read and read and read...
> 
> Unless in my app code I specifically listen for the `take` stream to end and 
> then call `close` on myReadStream, or something like that. But that's a lot 
> of work for the module consumer to write. It stops the pipe().pipe().pipe() 
> construct from being so useful.
> 
> So is there some way *for the read stream itself* to know if it can stop 
> reading? Or at least some idiomatic way to achieve it (keeping track of how 
> many pipes have been attached/unattached, etc)
> 
> On Wednesday, 3 July 2013 15:13:08 UTC+10, Isaac Schlueter wrote:
> Here's one way to do it: https://github.com/isaacs/truncating-stream 
> 
> npm install truncating-stream 
> 
> var Trunc = require('truncating-stream') 
> var t = new Trunc({ limit: 100 }) 
> 
> Now no matter how much you t.write() only 100 bytes will come out. 
> 
> Of course, the thing you're piping your reader INTO won't be able to 
> close the reader.  That'd be way too intimate.  It's up to you to 
> decide when to do that. 
> 
> But, you can listen for the finish of the truncating stream and 
> forcibly destroy the reader if you want to. 
> 
> reader.pipe(t) 
> t.on('finish', function() { 
>   reader.destroy() 
> }) 
> 
> 
> 
> 
> On Mon, Jul 1, 2013 at 2:54 AM, Michael Hart <[email protected]> wrote: 
> > As an update to this (I realise nodeconf has been on... pity I missed it), 
> > I've tried a few things and I still can't figure out the idiomatic way to 
> > deal with this situation. Here's an example: 
> > 
> > var stream = require('stream'), 
> >     fs = require('fs') 
> > 
> > var readStream = fs.createReadStream(__filename, {highWaterMark: 5}), 
> >     limitStream = new stream.Transform(), 
> >     limit = 5 
> > 
> > // Just an example of a write stream that only wants a certain amount of data 
> > limitStream._transform = function(chunk, encoding, cb) { 
> >   if (--limit >= 0) 
> >     return cb(null, chunk + '\n') 
> > 
> >   // All we want is 5 chunks, then we want to close 
> >   this.end() 
> >   // Wait a while and check if the file stream has closed 
> >   setTimeout(checkFd, 1000) 
> >   cb() 
> > } 
> > 
> > // fd is set to null when the file is closed 
> > function checkFd() { 
> >   console.log(readStream.fd != null ? 'Bugger. File still open!' : 'All good') 
> > } 
> > 
> > readStream.on('unpipe', function() { console.log('unpipe emitted from readStream') }) 
> > readStream.on('end', function() { console.log('end emitted from readStream') }) 
> > readStream.on('close', function() { console.log('close emitted from readStream') }) 
> > 
> > limitStream.on('unpipe', function() { console.log('unpipe emitted from limitStream') }) 
> > limitStream.on('end', function() { console.log('end emitted from limitStream') }) 
> > limitStream.on('close', function() { console.log('close emitted from limitStream') }) 
> > 
> > readStream.pipe(limitStream).pipe(process.stdout) 
> > 
> > When run (in node v0.10.12), this results in: 
> > 
> > $ node test.js 
> > var s 
> > tream 
> >  = re 
> > quire 
> > ('str 
> > unpipe emitted from limitStream 
> > end emitted from limitStream 
> > Bugger. File still open! 
> > 
> > And the process exits. Although if the timeout was longer (for example), 
> > then the file would have stayed open the whole time with the stream 
> > continuing to read even though it's going nowhere (imagine it was 
> > /dev/random or similar). 
> > 
> > So the file descriptor is left open, even though the pipe is closed - and 
> > no events seem to be signalled on readStream at all, only on the writable 
> > limitStream. 
> > 
> > Now in my case I'm actually developing the read stream, I'm not using a 
> > file stream - so I certainly have more control than this example - but I 
> > still have no idea what the read stream needs to look for or listen to to 
> > know to stop reading from the source (ie, to know that all destinations 
> > have ended). 
> > 
> > There are a few possibilities: 
> > 
> > - Check the _readableState property when _read is called - if pipes is 
> >   null (or pipesCount is 0) then don't continue to fetch data. Ugly, and 
> >   using hidden variables. 
> > - Override the unpipe() function and call close() if no pipes left - 
> >   again, still need some way to know how many pipes are left - could keep 
> >   our own count? 
> > - Check to see if there are any listeners left for 'readable' (and/or 
> >   'data'?) - if none, then close(). Not sure where to do this. During 
> >   _read()? Or override unpipe()? 
> > - Override the pipe() function and add listeners to each destination to 
> >   see when they end - and hope that they do the same to all streams they 
> >   pipe to. 
> > 
> > Is there a more idiomatic way to achieve this? Is one of these options 
> > the "nicest"? 
> > 
> > Or are readable streams really just supposed to keep reading and reading, 
> > never closing (until they eventually reach the end of their source, 
> > assuming there is an end), regardless of whether they've been piped to 
> > writable streams that subsequently finish? 
> > 
> > This would be a pity as it would impede the ability to compose streams 
> > together in a `dbReadStream.pipe(take(20))` kind of fashion. 
> > 
> > M 
> > 
> > On Friday, 28 June 2013 17:01:46 UTC+10, Michael Hart wrote: 
> >> 
> >> Hi all, 
> >> 
> >> I'm struggling with a fairly basic problem to do with limiting (ie, 
> >> truncating) streams and signalling to the source readable to stop reading 
> >> and close. 
> >> 
> >> Here's a simple example that hopefully illustrates what I mean: 
> >> 
> >> fs.createReadStream('myHugeFile') 
> >>   .pipe(someTransform) 
> >>   .pipe(someTruncatingStream) 
> >>   .pipe(process.stdout) 
> >> 
> >> Where someTruncatingStream wants to only take the first n bytes of the 
> >> transformed stream, and then signal "hey, I'm done (ie, no need to keep 
> >> reading the massive file)". 
> >> 
> >> Is there a way to do this from someTruncatingStream without it having 
> >> direct access to the source read stream? ie, some way to signal up the 
> >> pipe to stop reading and close everything down (ie, not just pause)? 
> >> 
> >> In my case it's actually not a file stream that I want to do this for - I 
> >> just figured that was the easiest illustration. Mine is an object stream I 
> >> want to create in front of a DB which is being queried a page at a time - 
> >> I want to expose a stream for this that can continue paging behind the 
> >> scenes and spitting out data, but that knows to stop paging when the 
> >> consumers have consumed all the data they want. ie: 
> >> 
> >> db.createReadStream({some: query}) 
> >>   .pipe(someOtherFilter) 
> >>   .pipe(stringify) 
> >>   .pipe(take100) 
> >>   .pipe(process.stdout) 
> >>  
> >> 
> >> I could use another paradigm for this, a lazy collection for example - but 
> >> it seems to me that streams should support this and are more of a lingua 
> >> franca to expose from a module API (and indeed can be transformed into 
> >> lazy collections themselves, a la lazy.js/node-lazy) 
> >> 
> >> Cheers, 
> >> 
> >> Michael 
> > 
> > -- 
> > -- 
> > Job Board: http://jobs.nodejs.org/ 
> > Posting guidelines: 
> > https://github.com/joyent/node/wiki/Mailing-List-Posting-Guidelines 
> > You received this message because you are subscribed to the Google 
> > Groups "nodejs" group. 
> > To post to this group, send email to [email protected] 
> > To unsubscribe from this group, send email to 
> > [email protected] 
> > For more options, visit this group at 
> > http://groups.google.com/group/nodejs?hl=en?hl=en 
> > 
> > --- 
> > You received this message because you are subscribed to the Google Groups 
> > "nodejs" group. 
> > To unsubscribe from this group and stop receiving emails from it, send an 
> > email to [email protected]. 
> > For more options, visit https://groups.google.com/groups/opt_out. 
> > 
> > 
> 
