In a conversation with @isaacs on Twitter, the example of EPIPE in Unix pipes
came up - if I understand correctly, this is exactly the mechanism I'm
looking for in node.js streams.
So when you run `cat myfile | grep 'hello' | head -5` and there are more
than 5 matches, `head` exits after printing its 5 lines, the next write to
the now-closed pipe fails with EPIPE, and `cat` shuts its file descriptor
and closes down gracefully (correct me if I'm wrong here - I'm not 100% sure
on this).
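A quick shell sketch of my understanding (my own example - happy to be
corrected):

```shell
# head exits after printing 5 lines, which closes the read end of the
# pipe; the writer's next write fails with EPIPE (delivered as SIGPIPE
# by default), so seq stops early instead of emitting all million lines
seq 1000000 | head -5

# the writer's exit status reflects the signal: 141 = 128 + SIGPIPE (13)
(seq 1000000; echo "writer exit: $?" >&2) | head -5
```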
So `cat` is the readable stream in our case - and it needs to know when to
stop reading and close gracefully, whether as a result of a downstream
write error or anything else.
If `myReadableStream` is the equivalent of `cat`, and the truncating `take`
stream is the equivalent of `head` (ignore process.stdout here, as it'll
probably confuse things given it *can* actually throw EPIPE) - then what
can `myReadableStream` listen for to know to close gracefully? (Keep in
mind that `myReadableStream` won't have a direct reference to `take`, just
as `cat` has no direct reference to `head`.)
Does that make things a bit clearer?
On Wednesday, 3 July 2013 16:04:24 UTC+10, Michael Hart wrote:
>
> Ah, so I might not have been clear enough with the way I phrased it - I
> didn't actually want to know how to write the *truncating* stream (although
> thank you!) - I wanted to know how to write the *read* stream.
>
> If I'm writing a module that exposes a read stream that is reading from a
> database - and that is piped into a truncating stream of some kind - how
> can my readable stream know to stop reading from the database? I don't want
> it to keep reading the entire database (and I might not know this at the
> time I created the read stream).
>
> Imagine a grep'ing transform of some sort - that will preclude you from
> knowing how much you want to read up front - and then imagine you only want
> the first 5 matches.
>
> So:
>
> myReadStream.pipe(grep('hello')).pipe(take(5)).pipe(process.stdout)
>
> If myReadStream is coming from a huge file, or from paging calls to a DB
> (in my case), then it will just continue to read and read and read...
>
> Unless in my app code I specifically listen for the `take` stream to end
> and then call `close` on myReadStream, or something like that. But that's a
> lot of work for the module consumer to write. It stops the
> pipe().pipe().pipe() construct from being so useful.
>
> So is there some way *for the read stream itself* to know if it can stop
> reading? Or at least some idiomatic way to achieve it (keeping track of
> how many pipes have been attached/unattached, etc)?
>
> On Wednesday, 3 July 2013 15:13:08 UTC+10, Isaac Schlueter wrote:
>>
>> Here's one way to do it: https://github.com/isaacs/truncating-stream
>>
>> npm install truncating-stream
>>
>> var Trunc = require('truncating-stream')
>> var t = new Trunc({ limit: 100 })
>>
>> Now, no matter how much you t.write(), only 100 bytes will come out.
>>
>> Of course, the thing you're piping your reader INTO won't be able to
>> close the reader. That'd be way too intimate. It's up to you to
>> decide when to do that.
>>
>> But, you can listen for the finish of the truncating stream and
>> forcibly destroy the reader if you want to.
>>
>> reader.pipe(t)
>> t.on('finish', function() {
>>   reader.destroy()
>> })
>>
>> On Mon, Jul 1, 2013 at 2:54 AM, Michael Hart <[email protected]>
>> wrote:
>> > As an update to this (I realise nodeconf has been on... pity I missed
>> > it), I've tried a few things and I still can't figure out the idiomatic
>> > way to deal with this situation. Here's an example:
>> >
>> > var stream = require('stream'),
>> >     fs = require('fs')
>> >
>> > var readStream = fs.createReadStream(__filename, {highWaterMark: 5}),
>> >     limitStream = new stream.Transform(),
>> >     limit = 5
>> >
>> > // Just an example of a write stream that only wants a certain amount of data
>> > limitStream._transform = function(chunk, encoding, cb) {
>> >   if (--limit >= 0)
>> >     return cb(null, chunk + '\n')
>> >
>> >   // All we want is 5 chunks, then we want to close
>> >   this.end()
>> >   // Wait a while and check if the file stream has closed
>> >   setTimeout(checkFd, 1000)
>> >   cb()
>> > }
>> >
>> > // fd is set to null when the file is closed
>> > function checkFd() {
>> >   console.log(readStream.fd != null ? 'Bugger. File still open!' : 'All good')
>> > }
>> >
>> > readStream.on('unpipe', function() { console.log('unpipe emitted from readStream') })
>> > readStream.on('end', function() { console.log('end emitted from readStream') })
>> > readStream.on('close', function() { console.log('close emitted from readStream') })
>> >
>> > limitStream.on('unpipe', function() { console.log('unpipe emitted from limitStream') })
>> > limitStream.on('end', function() { console.log('end emitted from limitStream') })
>> > limitStream.on('close', function() { console.log('close emitted from limitStream') })
>> >
>> > readStream.pipe(limitStream).pipe(process.stdout)
>> >
>> > When run (in node v0.10.12), this results in:
>> >
>> > $ node test.js
>> > var s
>> > tream
>> > = re
>> > quire
>> > ('str
>> > unpipe emitted from limitStream
>> > end emitted from limitStream
>> > Bugger. File still open!
>> >
>> > And the process exits. Although if the timeout was longer (for example),
>> > then the file would have stayed open the whole time with the stream
>> > continuing to read even though it's going nowhere (imagine it was
>> > /dev/random or similar).
>> >
>> > So the file descriptor is left open, even though the pipe is closed -
>> > and no events seem to be signalled on readStream at all, only on the
>> > writable limitStream.
>> >
>> > Now in my case I'm actually developing the read stream, I'm not using a
>> > file stream - so I certainly have more control than this example - but I
>> > still have no idea what the read stream needs to look for or listen to
>> > to know to stop reading from the source (ie, to know that all
>> > destinations have ended).
>> >
>> > There are a few possibilities:
>> >
>> > - Check the _readableState property when _read is called - if pipes is
>> >   null (or pipesCount is 0) then don't continue to fetch data. Ugly, and
>> >   using hidden variables.
>> > - Override the unpipe() function and call close() if no pipes left -
>> >   again, still need some way to know how many pipes are left - could
>> >   keep our own count?
>> > - Check to see if there are any listeners left for 'readable' (and/or
>> >   'data'?) - if none, then close(). Not sure where to do this. During
>> >   _read()? Or override unpipe()?
>> > - Override the pipe() function and add listeners to each destination to
>> >   see when they end - and hope that they do the same to all streams they
>> >   pipe to.
>> >
>> > Is there a more idiomatic way to achieve this? Is one of these options
>> > the "nicest"?
>> >
>> > Or are readable streams really just supposed to keep reading and
>> > reading, never closing (until they eventually reach the end of their
>> > source, assuming there is an end), regardless of whether they've been
>> > piped to writable streams that subsequently finish?
>> >
>> > This would be a pity as it would impede the ability to compose streams
>> > together in a `dbReadStream.pipe(take(20))` kind of fashion.
>> >
>> > M
>> >
>> > On Friday, 28 June 2013 17:01:46 UTC+10, Michael Hart wrote:
>> >>
>> >> Hi all,
>> >>
>> >> I'm struggling with a fairly basic problem to do with limiting (ie,
>> >> truncating) streams and signalling to the source readable to stop
>> >> reading and close.
>> >>
>> >> Here's a simple example that hopefully illustrates what I mean:
>> >>
>> >> fs.createReadStream('myHugeFile')
>> >> .pipe(someTransform)
>> >> .pipe(someTruncatingStream)
>> >> .pipe(process.stdout)
>> >>
>> >> Where someTruncatingStream wants to only take the first n bytes of the
>> >> transformed stream, and then signal "hey, I'm done (ie, no need to
>> >> keep reading the massive file)".
>> >>
>> >> Is there a way to do this from someTruncatingStream without it having
>> >> direct access to the source read stream? ie, some way to signal up the
>> >> pipe to stop reading and close everything down (ie, not just pause)?
>> >>
>> >> In my case it's actually not a file stream that I want to do this for -
>> >> I just figured that was the easiest illustration. Mine is an object
>> >> stream I want to create in front of a DB which is being queried a page
>> >> at a time - I want to expose a stream for this that can continue paging
>> >> behind the scenes and spitting out data, but that knows to stop paging
>> >> when the consumers have consumed all the data they want. ie:
>> >>
>> >> db.createReadStream({some: query})
>> >>   .pipe(someOtherFilter)
>> >>   .pipe(stringify)
>> >>   .pipe(take100)
>> >>   .pipe(process.stdout)
>> >>
>> >> I could use another paradigm for this, a lazy collection for example -
>> >> but it seems to me that streams should support this and are more of a
>> >> lingua franca to expose from a module API (and indeed can be
>> >> transformed into lazy collections themselves, a la lazy.js/node-lazy)
>> >>
>> >> Cheers,
>> >>
>> >> Michael
>> >
>> > --
>> > --
>> > Job Board: http://jobs.nodejs.org/
>> > Posting guidelines:
>> > https://github.com/joyent/node/wiki/Mailing-List-Posting-Guidelines
>> > You received this message because you are subscribed to the Google
>> > Groups "nodejs" group.
>> > To post to this group, send email to [email protected]
>> > To unsubscribe from this group, send email to
>> > [email protected]
>> > For more options, visit this group at
>> > http://groups.google.com/group/nodejs?hl=en
>> >
>> > ---
>> > You received this message because you are subscribed to the Google
>> Groups
>> > "nodejs" group.
>> > To unsubscribe from this group and stop receiving emails from it, send
>> an
>> > email to [email protected].
>> > For more options, visit https://groups.google.com/groups/opt_out.
>> >
>> >
>>
>