Ah, so I might not have been clear enough with the way I phrased it - I 
didn't actually want to know how to write the *truncating* stream (although 
thank you!) - I wanted to know how to write the *read* stream.

If I'm writing a module that exposes a read stream that reads from a 
database - and that stream is piped into a truncating stream of some kind - 
how can my readable stream know to stop reading from the database? I don't 
want it to keep reading the entire database (and I might not know how much 
will be needed at the time I create the read stream).

Imagine a grep-like transform of some sort - one that precludes you from 
knowing up front how much you'll need to read - and then imagine you only 
want the first 5 matches.

So:

myReadStream.pipe(grep('hello')).pipe(take(5)).pipe(process.stdout)

If myReadStream is coming from a huge file, or from paging calls to a DB 
(in my case), then it will just continue to read and read and read...

Unless, in my app code, I specifically listen for the `take` stream ending 
and then call `close` on myReadStream, or something like that. But that's a 
lot of work for the module consumer to write, and it stops the 
pipe().pipe().pipe() construct from being so useful.

So is there some way *for the read stream itself* to know if it can stop 
reading? Or at least some idiomatic way to achieve it (keeping track of how 
many pipes have been attached/detached, etc.)?

On Wednesday, 3 July 2013 15:13:08 UTC+10, Isaac Schlueter wrote:
>
> Here's one way to do it: https://github.com/isaacs/truncating-stream 
>
> npm install truncating-stream 
>
> var Trunc = require('truncating-stream') 
> var t = new Trunc({ limit: 100 }) 
>
> Now no matter how much you t.write() only 100 bytes will come out. 
>
> Of course, the thing you're piping your reader INTO won't be able to 
> close the reader.  That'd be way too intimate.  It's up to you to 
> decide when to do that. 
>
> But, you can listen for the finish of the truncating stream and 
> forcibly destroy the reader if you want to. 
>
> reader.pipe(t) 
> t.on('finish', function() { 
>   reader.destroy() 
> }) 
>
>
>
>
> On Mon, Jul 1, 2013 at 2:54 AM, Michael Hart <[email protected]> wrote: 
> > As an update to this (I realise nodeconf has been on... pity I missed 
> > it), I've tried a few things and I still can't figure out the idiomatic 
> > way to deal with this situation. Here's an example: 
> > 
> > var stream = require('stream'), 
> >     fs = require('fs') 
> > 
> > var readStream = fs.createReadStream(__filename, {highWaterMark: 5}), 
> >     limitStream = new stream.Transform(), 
> >     limit = 5 
> > 
> > // Just an example of a write stream that only wants a certain amount of data 
> > limitStream._transform = function(chunk, encoding, cb) { 
> >   if (--limit >= 0) 
> >     return cb(null, chunk + '\n') 
> > 
> >   // All we want is 5 chunks, then we want to close 
> >   this.end() 
> >   // Wait a while and check if the file stream has closed 
> >   setTimeout(checkFd, 1000) 
> >   cb() 
> > } 
> > 
> > // fd is set to null when the file is closed 
> > function checkFd() { 
> >   console.log(readStream.fd != null ? 'Bugger. File still open!' : 'All good') 
> > } 
> > 
> > readStream.on('unpipe', function() { console.log('unpipe emitted from 
> > readStream') }) 
> > readStream.on('end', function() { console.log('end emitted from 
> > readStream') }) 
> > readStream.on('close', function() { console.log('close emitted from 
> > readStream') }) 
> > 
> > limitStream.on('unpipe', function() { console.log('unpipe emitted from 
> > limitStream') }) 
> > limitStream.on('end', function() { console.log('end emitted from 
> > limitStream') }) 
> > limitStream.on('close', function() { console.log('close emitted from 
> > limitStream') }) 
> > 
> > readStream.pipe(limitStream).pipe(process.stdout) 
> > 
> > When run (in node v0.10.12), this results in: 
> > 
> > $ node test.js 
> > var s 
> > tream 
> >  = re 
> > quire 
> > ('str 
> > unpipe emitted from limitStream 
> > end emitted from limitStream 
> > Bugger. File still open! 
> > 
> > And the process exits. Although if the timeout was longer (for example), 
> > then the file would have stayed open the whole time with the stream 
> > continuing to read even though it's going nowhere (imagine it was 
> > /dev/random or similar). 
> > 
> > So the file descriptor is left open, even though the pipe is closed - 
> > and no events seem to be signalled on readStream at all, only on the 
> > writable limitStream. 
> > 
> > Now in my case I'm actually developing the read stream, I'm not using a 
> > file stream - so I certainly have more control than this example - but 
> > I still have no idea what the read stream needs to look for or listen 
> > to to know to stop reading from the source (ie, to know that all 
> > destinations have ended). 
> > 
> > There are a few possibilities: 
> > 
> > 1. Check the _readableState property when _read is called - if pipes is 
> >    null (or pipesCount is 0) then don't continue to fetch data. Ugly, 
> >    and uses hidden variables. 
> > 2. Override the unpipe() function and call close() if no pipes are 
> >    left - again, we still need some way to know how many pipes are 
> >    left - could we keep our own count? 
> > 3. Check to see if there are any listeners left for 'readable' (and/or 
> >    'data'?) - if none, then close(). Not sure where to do this. During 
> >    _read()? Or in an overridden unpipe()? 
> > 4. Override the pipe() function and add listeners to each destination 
> >    to see when they end - and hope that they do the same to all 
> >    streams they pipe to. 
> > 
> > Is there a more idiomatic way to achieve this? Is one of these options 
> > the "nicest"? 
> > 
> > Or are readable streams really just supposed to keep reading and 
> > reading, never closing (until they eventually reach the end of their 
> > source, assuming there is an end), regardless of whether they've been 
> > piped to writable streams that subsequently finish? 
> > 
> > This would be a pity as it would impede the ability to compose streams 
> > together in a `dbReadStream.pipe(take(20))` kind of fashion. 
> > 
> > M 
> > 
> > On Friday, 28 June 2013 17:01:46 UTC+10, Michael Hart wrote: 
> >> 
> >> Hi all, 
> >> 
> >> I'm struggling with a fairly basic problem to do with limiting (ie, 
> >> truncating) streams and signalling to the source readable to stop 
> >> reading and close. 
> >> 
> >> Here's a simple example that hopefully illustrates what I mean: 
> >> 
> >> fs.createReadStream('myHugeFile') 
> >>   .pipe(someTransform) 
> >>   .pipe(someTruncatingStream) 
> >>   .pipe(process.stdout) 
> >> 
> >> Where someTruncatingStream wants to only take the first n bytes of the 
> >> transformed stream, and then signal "hey, I'm done (ie, no need to keep 
> >> reading the massive file)". 
> >> 
> >> Is there a way to do this from someTruncatingStream without it having 
> >> direct access to the source read stream? ie, some way to signal up 
> >> the pipe to stop reading and close everything down (ie, not just 
> >> pause)? 
> >> 
> >> In my case it's actually not a file stream that I want to do this 
> >> for - I just figured that was the easiest illustration. Mine is an 
> >> object stream I want to create in front of a DB which is being 
> >> queried a page at a time - I want to expose a stream for this that 
> >> can continue paging behind the scenes and spitting out data, but that 
> >> knows to stop paging when the consumers have consumed all the data 
> >> they want. ie: 
> >> 
> >> db.createReadStream({some: query}) 
> >>   .pipe(someOtherFilter) 
> >>   .pipe(stringify) 
> >>   .pipe(take100) 
> >>   .pipe(process.stdout) 
> >> 
> >> I could use another paradigm for this, a lazy collection for example 
> >> - but it seems to me that streams should support this and are more of 
> >> a lingua franca to expose from a module API (and indeed can be 
> >> transformed into lazy collections themselves, a la lazy.js/node-lazy) 
> >> 
> >> Cheers, 
> >> 
> >> Michael 
> > 

-- 
Job Board: http://jobs.nodejs.org/
Posting guidelines: 
https://github.com/joyent/node/wiki/Mailing-List-Posting-Guidelines
You received this message because you are subscribed to the Google
Groups "nodejs" group.
To post to this group, send email to [email protected]
To unsubscribe from this group, send email to
[email protected]
For more options, visit this group at
http://groups.google.com/group/nodejs?hl=en

--- 
You received this message because you are subscribed to the Google Groups 
"nodejs" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
For more options, visit https://groups.google.com/groups/opt_out.

