Ah, so I might not have been clear enough with the way I phrased it - I
didn't actually want to know how to write the *truncating* stream (although
thank you!) - I wanted to know how to write the *read* stream.
If I'm writing a module that exposes a read stream that is reading from a
database - and that is piped into a truncating stream of some kind - how
can my readable stream know to stop reading from the database? I don't want
it to keep reading the entire database (and I might not know this at the
time I created the read stream).
Imagine a grep-like transform of some sort - that precludes you from
knowing up front how much you want to read - and then imagine you only want
the first 5 matches.
So:
myReadStream.pipe(grep('hello')).pipe(take(5)).pipe(process.stdout)
If myReadStream is coming from a huge file, or from paging calls to a DB
(in my case), then it will just continue to read and read and read...
Unless in my app code I specifically listen for the `take` stream to end
and then call `close` on myReadStream, or something like that. But that's a
lot of work for the module consumer to write. It stops the
pipe().pipe().pipe() construct from being so useful.
So is there some way *for the read stream itself* to know if it can stop
reading? Or at least some idiomatic way to achieve it (keeping track of how
many pipes have been attached/detached, etc.)?
On Wednesday, 3 July 2013 15:13:08 UTC+10, Isaac Schlueter wrote:
>
> Here's one way to do it: https://github.com/isaacs/truncating-stream
>
> npm install truncating-stream
>
> var Trunc = require('truncating-stream')
> var t = new Trunc({ limit: 100 })
>
> Now no matter how much you t.write(), only 100 bytes will come out.
>
> Of course, the thing you're piping your reader INTO won't be able to
> close the reader. That'd be way too intimate. It's up to you to
> decide when to do that.
>
> But, you can listen for the finish of the truncating stream and
> forcibly destroy the reader if you want to.
>
> reader.pipe(t)
> t.on('finish', function() {
>   reader.destroy()
> })
>
>
>
>
> On Mon, Jul 1, 2013 at 2:54 AM, Michael Hart <[email protected]> wrote:
> > As an update to this (I realise nodeconf has been on... pity I missed it),
> > I've tried a few things and I still can't figure out the idiomatic way to
> > deal with this situation. Here's an example:
> >
> > var stream = require('stream'),
> >     fs = require('fs')
> >
> > var readStream = fs.createReadStream(__filename, {highWaterMark: 5}),
> >     limitStream = new stream.Transform(),
> >     limit = 5
> >
> > // Just an example of a write stream that only wants a certain amount of data
> > limitStream._transform = function(chunk, encoding, cb) {
> >   if (--limit >= 0)
> >     return cb(null, chunk + '\n')
> >
> >   // All we want is 5 chunks, then we want to close
> >   this.end()
> >   // Wait a while and check if the file stream has closed
> >   setTimeout(checkFd, 1000)
> >   cb()
> > }
> >
> > // fd is set to null when the file is closed
> > function checkFd() {
> >   console.log(readStream.fd != null ? 'Bugger. File still open!' : 'All good')
> > }
> >
> > readStream.on('unpipe', function() { console.log('unpipe emitted from readStream') })
> > readStream.on('end', function() { console.log('end emitted from readStream') })
> > readStream.on('close', function() { console.log('close emitted from readStream') })
> >
> > limitStream.on('unpipe', function() { console.log('unpipe emitted from limitStream') })
> > limitStream.on('end', function() { console.log('end emitted from limitStream') })
> > limitStream.on('close', function() { console.log('close emitted from limitStream') })
> >
> > readStream.pipe(limitStream).pipe(process.stdout)
> >
> > When run (in node v0.10.12), this results in:
> >
> > $ node test.js
> > var s
> > tream
> > = re
> > quire
> > ('str
> > unpipe emitted from limitStream
> > end emitted from limitStream
> > Bugger. File still open!
> >
> > And the process exits. Although if the timeout was longer (for example),
> > then the file would have stayed open the whole time with the stream
> > continuing to read even though it's going nowhere (imagine it was
> > /dev/random or similar).
> >
> > So the file descriptor is left open, even though the pipe is closed - and
> > no events seem to be signalled on readStream at all, only on the writable
> > limitStream.
> >
> > Now in my case I'm actually developing the read stream, I'm not using a
> > file stream - so I certainly have more control than this example - but I
> > still have no idea what the read stream needs to look for or listen to to
> > know to stop reading from the source (ie, to know that all destinations
> > have ended).
> >
> > There are a few possibilities:
> >
> > - Check the _readableState property when _read is called - if pipes is
> >   null (or pipesCount is 0) then don't continue to fetch data. Ugly, and
> >   using hidden variables.
> > - Override the unpipe() function and call close() if no pipes left -
> >   again, still need some way to know how many pipes are left - could keep
> >   our own count?
> > - Check to see if there are any listeners left for 'readable' (and/or
> >   'data'?) - if none, then close(). Not sure where to do this. During
> >   _read()? Or override unpipe()?
> > - Override the pipe() function and add listeners to each destination to
> >   see when they end - and hope that they do the same to all streams they
> >   pipe to.
> >
> > Is there a more idiomatic way to achieve this? Is one of these options
> > the "nicest"?
> >
> > Or are readable streams really just supposed to keep reading and reading,
> > never closing (until they eventually reach the end of their source,
> > assuming there is an end), regardless of whether they've been piped to
> > writable streams that subsequently finish?
> >
> > This would be a pity as it would impede the ability to compose streams
> > together in a `dbReadStream.pipe(take(20))` kind of fashion.
> >
> > M
> >
> > On Friday, 28 June 2013 17:01:46 UTC+10, Michael Hart wrote:
> >>
> >> Hi all,
> >>
> >> I'm struggling with a fairly basic problem to do with limiting (ie,
> >> truncating) streams and signalling to the source readable to stop
> >> reading and close.
> >>
> >> Here's a simple example that hopefully illustrates what I mean:
> >>
> >> fs.createReadStream('myHugeFile')
> >>   .pipe(someTransform)
> >>   .pipe(someTruncatingStream)
> >>   .pipe(process.stdout)
> >>
> >> Where someTruncatingStream wants to only take the first n bytes of the
> >> transformed stream, and then signal "hey, I'm done (ie, no need to keep
> >> reading the massive file)".
> >>
> >> Is there a way to do this from someTruncatingStream without it having
> >> direct access to the source read stream? ie, some way to signal up the
> >> pipe to stop reading and close everything down (ie, not just pause)?
> >>
> >> In my case it's actually not a file stream that I want to do this for -
> >> I just figured that was the easiest illustration. Mine is an object
> >> stream I want to create in front of a DB which is being queried a page
> >> at a time - I want to expose a stream for this that can continue paging
> >> behind the scenes and spitting out data, but that knows to stop paging
> >> when the consumers have consumed all the data they want. ie:
> >>
> >> db.createReadStream({some: query})
> >>   .pipe(someOtherFilter)
> >>   .pipe(stringify)
> >>   .pipe(take100)
> >>   .pipe(process.stdout)
> >>
> >> I could use another paradigm for this, a lazy collection for example -
> >> but it seems to me that streams should support this and are more of a
> >> lingua franca to expose from a module API (and indeed can be transformed
> >> into lazy collections themselves, a la lazy.js/node-lazy)
> >>
> >> Cheers,
> >>
> >> Michael
> >
--
Job Board: http://jobs.nodejs.org/
Posting guidelines:
https://github.com/joyent/node/wiki/Mailing-List-Posting-Guidelines
You received this message because you are subscribed to the Google
Groups "nodejs" group.
To post to this group, send email to [email protected]
To unsubscribe from this group, send email to
[email protected]
For more options, visit this group at
http://groups.google.com/group/nodejs?hl=en
---
You received this message because you are subscribed to the Google Groups
"nodejs" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
For more options, visit https://groups.google.com/groups/opt_out.