Yep, cool, so that's essentially #2 of the suggestions in my second message. 
I'll run with that.

Indeed - I was never suggesting the default behaviour should be to destroy 
- I was merely looking for a mechanism to be notified.

So the only thing I'll need to do is ensure that every transform in the 
pipe chain also exhibits this behaviour; otherwise the pipes will stay open, 
just waiting in case someone wants to attach to them later on...

On Thursday, 4 July 2013 01:24:10 UTC+10, Isaac Schlueter wrote:
>
> This is a terrible idea for a default behavior.  It will generally 
> result in lost data.  However, an easy Readable class for your closing 
> use case might look like this: 
>
>
> util.inherits(AutoClose, stream.Readable); 
>
> function AutoClose(options) { 
>   stream.Readable.call(this, options); 
>
>   var pipes = 0; 
>
>   this.on('pipe', function(dest) { 
>     pipes++; 
>   }); 
>
>   this.on('unpipe', function(dest) { 
>     pipes--; 
>     if (pipes === 0) 
>       this.destroy(); 
>   }); 
> } 
>
> // don't implement _read.  This is still an abstract class 
> // extend it in actual implementations 
>
>
> On Wed, Jul 3, 2013 at 1:38 AM, Gil Pedersen <[email protected]> 
> wrote: 
> > The difference, in this case, between stream pipes and unix pipes, is that 
> > unix pipes are fixed once they are set up. Stream pipes are dynamic, and when 
> > all streams are unpiped, it simply means that there are currently no 
> > receivers to pipe data to, thus "pausing" the stream. It is quite possible 
> > to later attach another pipe, or consume it using the normal API. 
> > 
> > What you want would require the readable to detect that no one is listening, 
> > assume no one will use it in the future, and close the resource. This would 
> > probably make a nice base feature for Readable, perhaps exposing it behind 
> > an autoClose option? 
> > 
> > On 03/07/2013, at 08.35, Michael Hart <[email protected]> wrote: 
> > 
> > In a convo with @isaacs on Twitter, the example of EPIPE in Unix pipes was 
> > brought up - if I understand correctly, this is exactly the mechanism I'm 
> > looking for in node.js streams. 
> > 
> > So when you do `cat myfile | grep 'hello' | head -5` - EPIPE will be thrown 
> > if there are more than 5 matches and `cat` will shut the file descriptor and 
> > close down gracefully (correct me if I'm wrong here - I'm not 100% sure on 
> > this). 
> > 
> > So `cat` is the readable stream in our case - and it needs to know when to 
> > stop reading and close gracefully - whether it be as a result of a 
> > downstream write error, or whatever. 
> > 
> > If `myReadableStream` is the equivalent of `cat`, and the truncating `take` 
> > stream is the equivalent of `head` (ignore process.stdout here as it'll 
> > probably confuse things given it *can* actually throw EPIPE) - then what can 
> > `myReadableStream` listen for to know to close gracefully? (keep in mind 
> > that `myReadableStream` won't have a direct reference to `take`, just as 
> > `cat` doesn't to `head` either) 
> > 
> > Does that make things a bit clearer? 
> > 
> > On Wednesday, 3 July 2013 16:04:24 UTC+10, Michael Hart wrote: 
> >> 
> >> Ah, so I might not have been clear enough with the way I phrased it - I 
> >> didn't actually want to know how to write the *truncating* stream (although 
> >> thank you!) - I wanted to know how to write the *read* stream. 
> >> 
> >> If I'm writing a module that exposes a read stream that is reading from a 
> >> database - and that is piped into a truncating stream of some kind - how can 
> >> my readable stream know to stop reading from the database? I don't want it 
> >> to keep reading the entire database (and I might not know this at the time I 
> >> created the read stream). 
> >> 
> >> Imagine a grep'ing transform of some sort - that will preclude you from 
> >> knowing how much you want to read up front - and then imagine you only want 
> >> the first 5 matches. 
> >> 
> >> So: 
> >> 
> >> myReadStream.pipe(grep('hello')).pipe(take(5)).pipe(process.stdout) 
> >> 
> >> If myReadStream is coming from a huge file, or from paging calls to a DB 
> >> (in my case), then it will just continue to read and read and read... 
> >> 
> >> Unless in my app code I specifically listen for the `take` stream to end 
> >> and then call `close` on myReadStream, or something like that. But that's a 
> >> lot of work for the module consumer to write. It stops the 
> >> pipe().pipe().pipe() construct from being so useful. 
> >> 
> >> So is there some way *for the read stream itself* to know if it can stop 
> >> reading? Or at least some idiomatic way to achieve it (keeping track of how 
> >> many pipes have been attached/unattached, etc) 
> >> 
> >> On Wednesday, 3 July 2013 15:13:08 UTC+10, Isaac Schlueter wrote: 
> >>> 
> >>> Here's one way to do it: https://github.com/isaacs/truncating-stream 
> >>> 
> >>> npm install truncating-stream 
> >>> 
> >>> var Trunc = require('truncating-stream') 
> >>> var t = new Trunc({ limit: 100 }) 
> >>> 
> >>> Now no matter how much you t.write() only 100 bytes will come out. 
> >>> 
> >>> Of course, the thing you're piping your reader INTO won't be able to 
> >>> close the reader.  That'd be way too intimate.  It's up to you to 
> >>> decide when to do that. 
> >>> 
> >>> But, you can listen for the finish of the truncating stream and 
> >>> forcibly destroy the reader if you want to. 
> >>> 
> >>> reader.pipe(t) 
> >>> t.on('finish', function() { 
> >>>   reader.destroy() 
> >>> }) 
> >>> 
> >>> On Mon, Jul 1, 2013 at 2:54 AM, Michael Hart <[email protected]> 
> >>> wrote: 
> >>> > As an update to this (I realise nodeconf has been on... pity I missed 
> >>> > it), I've tried a few things and I still can't figure out the idiomatic 
> >>> > way to deal with this situation. Here's an example: 
> >>> > 
> >>> > var stream = require('stream'), 
> >>> >     fs = require('fs') 
> >>> > 
> >>> > var readStream = fs.createReadStream(__filename, {highWaterMark: 5}), 
> >>> >     limitStream = new stream.Transform(), 
> >>> >     limit = 5 
> >>> > 
> >>> > // Just an example of a write stream that only wants a certain amount 
> >>> > // of data 
> >>> > limitStream._transform = function(chunk, encoding, cb) { 
> >>> >   if (--limit >= 0) 
> >>> >     return cb(null, chunk + '\n') 
> >>> > 
> >>> >   // All we want is 5 chunks, then we want to close 
> >>> >   this.end() 
> >>> >   // Wait a while and check if the file stream has closed 
> >>> >   setTimeout(checkFd, 1000) 
> >>> >   cb() 
> >>> > } 
> >>> > 
> >>> > // fd is set to null when the file is closed 
> >>> > function checkFd() { 
> >>> >   console.log(readStream.fd != null ? 'Bugger. File still open!' : 'All good') 
> >>> > } 
> >>> > 
> >>> > readStream.on('unpipe', function() { console.log('unpipe emitted from readStream') }) 
> >>> > readStream.on('end', function() { console.log('end emitted from readStream') }) 
> >>> > readStream.on('close', function() { console.log('close emitted from readStream') }) 
> >>> > 
> >>> > limitStream.on('unpipe', function() { console.log('unpipe emitted from limitStream') }) 
> >>> > limitStream.on('end', function() { console.log('end emitted from limitStream') }) 
> >>> > limitStream.on('close', function() { console.log('close emitted from limitStream') }) 
> >>> > 
> >>> > readStream.pipe(limitStream).pipe(process.stdout) 
> >>> > 
> >>> > When run (in node v0.10.12), this results in: 
> >>> > 
> >>> > $ node test.js 
> >>> > var s 
> >>> > tream 
> >>> >  = re 
> >>> > quire 
> >>> > ('str 
> >>> > unpipe emitted from limitStream 
> >>> > end emitted from limitStream 
> >>> > Bugger. File still open! 
> >>> > 
> >>> > And the process exits. Although if the timeout was longer (for example), 
> >>> > then the file would have stayed open the whole time, with the stream 
> >>> > continuing to read even though it's going nowhere (imagine it was 
> >>> > /dev/random or similar). 
> >>> > 
> >>> > So the file descriptor is left open, even though the pipe is closed - 
> >>> > and no events seem to be signalled on readStream at all, only on the 
> >>> > writable limitStream. 
> >>> > 
> >>> > Now in my case I'm actually developing the read stream, I'm not using 
> >>> > a file stream - so I certainly have more control than this example - 
> >>> > but I still have no idea what the read stream needs to look for or 
> >>> > listen to to know to stop reading from the source (ie, to know that 
> >>> > all destinations have ended). 
> >>> > 
> >>> > There are a few possibilities: 
> >>> > 
> >>> > - Check the _readableState property when _read is called - if pipes is 
> >>> >   null (or pipesCount is 0) then don't continue to fetch data. Ugly, 
> >>> >   and using hidden variables. 
> >>> > - Override the unpipe() function and call close() if no pipes left - 
> >>> >   again, still need some way to know how many pipes are left - could 
> >>> >   keep our own count? 
> >>> > - Check to see if there are any listeners left for 'readable' (and/or 
> >>> >   'data'?) - if none, then close(). Not sure where to do this. During 
> >>> >   _read()? Or override unpipe()? 
> >>> > - Override the pipe() function and add listeners to each destination 
> >>> >   to see when they end - and hope that they do the same to all streams 
> >>> >   they pipe to. 
> >>> > 
> >>> > Is there a more idiomatic way to achieve this? Is one of these options 
> >>> > the "nicest"? 
> >>> > 
> >>> > Or are readable streams really just supposed to keep reading and 
> >>> > reading, never closing (until they eventually reach the end of their 
> >>> > source, assuming there is an end), regardless of whether they've been 
> >>> > piped to writable streams that subsequently finish? 
> >>> > 
> >>> > This would be a pity as it would impede the ability to compose streams 
> >>> > together in a `dbReadStream.pipe(take(20))` kind of fashion. 
> >>> > 
> >>> > M 
> >>> > 
> >>> > On Friday, 28 June 2013 17:01:46 UTC+10, Michael Hart wrote: 
> >>> >> 
> >>> >> Hi all, 
> >>> >> 
> >>> >> I'm struggling with a fairly basic problem to do with limiting (ie, 
> >>> >> truncating) streams and signalling to the source readable to stop 
> >>> >> reading and close. 
> >>> >> 
> >>> >> Here's a simple example that hopefully illustrates what I mean: 
> >>> >> 
> >>> >> fs.createReadStream('myHugeFile') 
> >>> >>   .pipe(someTransform) 
> >>> >>   .pipe(someTruncatingStream) 
> >>> >>   .pipe(process.stdout) 
> >>> >> 
> >>> >> Where someTruncatingStream wants to only take the first n bytes of 
> >>> >> the transformed stream, and then signal "hey, I'm done (ie, no need 
> >>> >> to keep reading the massive file)". 
> >>> >> 
> >>> >> Is there a way to do this from someTruncatingStream without it having 
> >>> >> direct access to the source read stream? ie, some way to signal up 
> >>> >> the pipe to stop reading and close everything down (ie, not just 
> >>> >> pause)? 
> >>> >> 
> >>> >> In my case it's actually not a file stream that I want to do this for 
> >>> >> - I just figured that was the easiest illustration. Mine is an object 
> >>> >> stream I want to create in front of a DB which is being queried a 
> >>> >> page at a time - I want to expose a stream for this that can continue 
> >>> >> paging behind the scenes and spitting out data, but that knows to 
> >>> >> stop paging when the consumers have consumed all the data they want. 
> >>> >> ie: 
> >>> >> 
> >>> >> db.createReadStream({some: query}).pipe(someOtherFilter).pipe(stringify).pipe(take100).pipe(process.stdout) 
> >>> >> 
> >>> >> I could use another paradigm for this, a lazy collection for example 
> >>> >> - but it seems to me that streams should support this and are more of 
> >>> >> a lingua franca to expose from a module API (and indeed can be 
> >>> >> transformed into lazy collections themselves, a la lazy.js/node-lazy) 
> >>> >> 
> >>> >> Cheers, 
> >>> >> 
> >>> >> Michael 
> >>> > 

-- 
-- 
Job Board: http://jobs.nodejs.org/
Posting guidelines: 
https://github.com/joyent/node/wiki/Mailing-List-Posting-Guidelines
You received this message because you are subscribed to the Google
Groups "nodejs" group.
To post to this group, send email to [email protected]
To unsubscribe from this group, send email to
[email protected]
For more options, visit this group at
http://groups.google.com/group/nodejs?hl=en

--- 
You received this message because you are subscribed to the Google Groups 
"nodejs" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
For more options, visit https://groups.google.com/groups/opt_out.

