This is a terrible idea for a default behavior. It will generally
result in lost data. However, an easy readable class for your closing
use case might look like this:
util.inherits(AutoClose, stream.Readable);
function AutoClose(options) {
stream.Readable.call(this, options);
var pipes = 0;
this.on('pipe', function(dest) {
pipes++;
});
this.on('unpipe', function(dest) {
pipes--;
if (pipes === 0)
this.destroy();
});
}
// don't implement _read. This is still an abstract class
// extend it in actual implementations
On Wed, Jul 3, 2013 at 1:38 AM, Gil Pedersen <[email protected]> wrote:
> The difference, in this case, between stream pipes and unix pipes, is that
> unix pipes are fixed once they are set up. Stream pipes are dynamic and when
> all streams are unpiped, it simply means that there are currently no
> receivers to pipe data to, thus "pausing" the stream. It is quite possible
> to later attach another pipe, or consume it using the normal API.
>
> What you want would require the readable to detect that no one is listening,
> assume no one will use it the future, and close the resource. This would
> probably make a nice base feature for Readable, perhaps exposing it behind
> an autoClose option?
>
> On 03/07/2013, at 08.35, Michael Hart <[email protected]> wrote:
>
> In a convo with @isaacs on Twitter, the example of EPIPE in Unix pipes was
> brought up - if I understand correctly, this is exactly the mechanism I'm
> looking for in node.js streams.
>
> So when you do `cat myfile | grep 'hello' | head -5` - EPIPE will be thrown
> if there are more than 5 matches and `cat` will shut the file descriptor and
> close down gracefully (correct me if I'm wrong here - I'm not 100% sure on
> this).
>
> So `cat` is the readable stream in our case - and it needs to know when to
> stop reading and close gracefully - whether it be as a result of a
> downstream write error, or whatever.
>
> If `myReadableStream` is the equivalent of `cat`, and the truncating `take`
> stream is the equivalent of `head` (ignore process.stdout here as it'll
> probably confuse things given it *can* actually throw EPIPE) - then what can
> `myReadableStream` listen for to know to close gracefully? (keep in mind
> that `myReadableStream` won't have a direct reference to `take`, just as
> `cat` doesn't to `head` either)
>
> Does that make things a bit clearer?
>
> On Wednesday, 3 July 2013 16:04:24 UTC+10, Michael Hart wrote:
>>
>> Ah, so I might not have been clear enough with the way I phrased it - I
>> didn't actually want to know how to write the *truncating* stream (although
>> thank you!) - I wanted to know how to write the *read* stream.
>>
>> If I'm writing a module that exposes a read stream that is reading from a
>> database - and that is piped into a truncating stream of some kind - how can
>> my readable stream know to stop reading from the database? I don't want it
>> to keep reading the entire database (and I might not know this at the time I
>> created the read stream).
>>
>> Imagine a grep'ing transform of some sort - that will preclude you from
>> knowing how much you want to read up front - and then imagine you only want
>> the first 5 matches.
>>
>> So:
>>
>> myReadStream.pipe(grep('hello')).pipe(take(5)).pipe(process.stdout)
>>
>> If myReadStream is coming from a huge file, or from paging calls to a DB
>> (in my case), then it will just continue to read and read and read...
>>
>> Unless in my app code I specifically listen for the `take` stream to end
>> and then call `close` on myReadStream, or something like that. But that's a
>> lot of work for the module consumer to write. It stops the
>> pipe().pipe().pipe() construct from being so useful.
>>
>> So is there some way *for the read stream itself* to know if it can stop
>> reading? Or at least some idiomatic way to achieve it (keeping track of how
>> many pipes have been attached/unattached, etc)
>>
>> On Wednesday, 3 July 2013 15:13:08 UTC+10, Isaac Schlueter wrote:
>>>
>>> Here's one way to do it: https://github.com/isaacs/truncating-stream
>>>
>>> npm install truncating-stream
>>>
>>> var Trunc = require('truncating-stream')
>>> var t = new Trunc({ limit: 100 })
>>>
>>> Now no matter how much you t.write() only 100 bytes will come out.
>>>
>>> Of course, the thing you're piping your reader INTO won't be able to
>>> close the reader. That'd be way too intimate. It's up to you to
>>> decide when to do that.
>>>
>>> But, you can listen for the finish of the truncating stream and
>>> forcibly destroy the reader if you want to.
>>>
>>> reader.pipe(t)
>>> t.on('finish', function() {
>>> reader.destroy()
>>> })
>>>
>>>
>>>
>>>
>>> On Mon, Jul 1, 2013 at 2:54 AM, Michael Hart <[email protected]>
>>> wrote:
>>> > As an update to this (I realise nodeconf has been on... pity I missed
>>> > it),
>>> > I've tried a few things and I still can't figure out the idiomatic way
>>> > to
>>> > deal with this situation. Here's an example:
>>> >
>>> > var stream = require('stream'),
>>> > fs = require('fs')
>>> >
>>> > var readStream = fs.createReadStream(__filename, {highWaterMark: 5}),
>>> > limitStream = new stream.Transform(),
>>> > limit = 5
>>> >
>>> > // Just an example of a write stream that only wants a certain amount
>>> > of
>>> > data
>>> > limitStream._transform = function(chunk, encoding, cb) {
>>> > if (--limit >= 0)
>>> > return cb(null, chunk + '\n')
>>> >
>>> > // All we want is 5 chunks, then we want to close
>>> > this.end()
>>> > // Wait a while and check if the file stream has closed
>>> > setTimeout(checkFd, 1000)
>>> > cb()
>>> > }
>>> >
>>> > // fd is set to null when the file is closed
>>> > function checkFd() {
>>> > console.log(readStream.fd != null ? 'Bugger. File still open!' : 'All
>>> > good')
>>> > }
>>> >
>>> > readStream.on('unpipe', function() { console.log('unpipe emitted from
>>> > readStream') })
>>> > readStream.on('end', function() { console.log('end emitted from
>>> > readStream')
>>> > })
>>> > readStream.on('close', function() { console.log('close emitted from
>>> > readStream') })
>>> >
>>> > limitStream.on('unpipe', function() { console.log('unpipe emitted from
>>> > limitStream') })
>>> > limitStream.on('end', function() { console.log('end emitted from
>>> > limitStream') })
>>> > limitStream.on('close', function() { console.log('close emitted from
>>> > limitStream') })
>>> >
>>> > readStream.pipe(limitStream).pipe(process.stdout)
>>> >
>>> > When run (in node v0.10.12), this results in:
>>> >
>>> > $ node test.js
>>> > var s
>>> > tream
>>> > = re
>>> > quire
>>> > ('str
>>> > unpipe emitted from limitStream
>>> > end emitted from limitStream
>>> > Bugger. File still open!
>>> >
>>> > And the process exits. Although if the timeout was longer (for
>>> > example),
>>> > then the file would have stayed open the whole time with the stream
>>> > continuing to read even though it's going nowhere (imagine it was
>>> > /dev/random or similar).
>>> >
>>> > So the file descriptor is left open, even though the pipe is closed -
>>> > and no
>>> > events seem to be signalled on readStream at all, only on the writable
>>> > limitStream.
>>> >
>>> > Now in my case I'm actually developing the read stream, I'm not using a
>>> > file
>>> > stream - so I certainly have more control than this example - but I
>>> > still
>>> > have no idea what the read stream needs to look for or listen to to
>>> > know to
>>> > stop reading from the source (ie, to know that all destinations have
>>> > ended).
>>> >
>>> > There are a few possibilities:
>>> >
>>> > Check the _readableState property when _read is called - if pipes is
>>> > null
>>> > (or pipesCount is 0) then don't continue to fetch data. Ugly, and using
>>> > hidden variables.
>>> > Override the unpipe() function and call close() if no pipes left -
>>> > again,
>>> > still need some way to know how many pipes are left - could keep our
>>> > own
>>> > count?
>>> > Check to see if there are any listeners left for 'readable' (and/or
>>> > 'data'?)
>>> > - if none, then close(). Not sure where to do this. During _read()? Or
>>> > override unpipe()?
>>> > Override the pipe() function and add listeners to each destination to
>>> > see
>>> > when they end - and hope that they do the same to all streams they pipe
>>> > to.
>>> >
>>> > Is there a more idiomatic way to do achieve this? Is one of these
>>> > options
>>> > the "nicest"?
>>> >
>>> > Or are readable streams really just supposed to keep reading and
>>> > reading,
>>> > never closing (until they eventually reach the end of their source,
>>> > assuming
>>> > there is an end), regardless of whether they've been piped to writable
>>> > streams that subsequently finish?
>>> >
>>> > This would be a pity as it would impede the ability to compose streams
>>> > together in a `dbReadStream.pipe(take(20))` kind of fashion.
>>> >
>>> > M
>>> >
>>> > On Friday, 28 June 2013 17:01:46 UTC+10, Michael Hart wrote:
>>> >>
>>> >> Hi all,
>>> >>
>>> >> I'm struggling with a fairly basic problem to do with limiting (ie,
>>> >> truncating) streams and signalling to the source readable to stop
>>> >> reading
>>> >> and close.
>>> >>
>>> >> Here's a simple example that hopefully illustrates what I mean:
>>> >>
>>> >> fs.createReadStream('myHugeFile')
>>> >> .pipe(someTransform)
>>> >> .pipe(someTruncatingStream)
>>> >> .pipe(process.stdout)
>>> >>
>>> >> Where someTruncatingStream wants to only take the first n bytes of the
>>> >> transformed stream, and then signal "hey, I'm done (ie, no need to
>>> >> keep
>>> >> reading the massive file)".
>>> >>
>>> >> Is there a way to do this from someTruncatingStream without it having
>>> >> direct access to the source read stream? ie, some way to signal up the
>>> >> pipe
>>> >> to stop reading and close everything down (ie, not just pause)?
>>> >>
>>> >> In my case it's actually not a file stream that I want to do this for
>>> >> - I
>>> >> just figured that was the easiest illustration. Mine is an object
>>> >> stream I
>>> >> want to create in front of a DB which is being queried a page at a
>>> >> time - I
>>> >> want to expose a stream for this that can continue paging behind the
>>> >> scenes
>>> >> and spitting out data, but that knows to stop paging when the
>>> >> consumers have
>>> >> consumed all the data they want. ie:
>>> >>
>>> >> db.createReadStream({some:
>>> >>
>>> >> query}).pipe(someOtherFilter).pipe(stringify).pipe(take100).pipe(process.stdout)
>>> >>
>>> >> I could use another paradigm for this, a lazy collection for example -
>>> >> but
>>> >> it seems to me that streams should support this and are more of a
>>> >> lingua
>>> >> franca to expose from a module API (and indeed can be transformed into
>>> >> lazy
>>> >> collections themselves, a la lazy.js/node-lazy)
>>> >>
>>> >> Cheers,
>>> >>
>>> >> Michael
>>> >
>>> > --
>>> > --
>>> > Job Board: http://jobs.nodejs.org/
>>> > Posting guidelines:
>>> > https://github.com/joyent/node/wiki/Mailing-List-Posting-Guidelines
>>> > You received this message because you are subscribed to the Google
>>> > Groups "nodejs" group.
>>> > To post to this group, send email to [email protected]
>>> > To unsubscribe from this group, send email to
>>> > [email protected]
>>> > For more options, visit this group at
>>> > http://groups.google.com/group/nodejs?hl=en?hl=en
>>> >
>>> > ---
>>> > You received this message because you are subscribed to the Google
>>> > Groups
>>> > "nodejs" group.
>>> > To unsubscribe from this group and stop receiving emails from it, send
>>> > an
>>> > email to [email protected].
>>> > For more options, visit https://groups.google.com/groups/opt_out.
>>> >
>>> >
>
>
> --
> --
> Job Board: http://jobs.nodejs.org/
> Posting guidelines:
> https://github.com/joyent/node/wiki/Mailing-List-Posting-Guidelines
> You received this message because you are subscribed to the Google
> Groups "nodejs" group.
> To post to this group, send email to [email protected]
>
> To unsubscribe from this group, send email to
> [email protected]
>
> For more options, visit this group at
> http://groups.google.com/group/nodejs?hl=en?hl=en
>
> ---
> You received this message because you are subscribed to the Google Groups
> "nodejs" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
>
> For more options, visit https://groups.google.com/groups/opt_out.
>
>
>
>
> --
> --
> Job Board: http://jobs.nodejs.org/
> Posting guidelines:
> https://github.com/joyent/node/wiki/Mailing-List-Posting-Guidelines
> You received this message because you are subscribed to the Google
> Groups "nodejs" group.
> To post to this group, send email to [email protected]
>
> To unsubscribe from this group, send email to
> [email protected]
>
> For more options, visit this group at
> http://groups.google.com/group/nodejs?hl=en?hl=en
>
> ---
> You received this message because you are subscribed to the Google Groups
> "nodejs" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
>
> For more options, visit https://groups.google.com/groups/opt_out.
>
>
--
--
Job Board: http://jobs.nodejs.org/
Posting guidelines:
https://github.com/joyent/node/wiki/Mailing-List-Posting-Guidelines
You received this message because you are subscribed to the Google
Groups "nodejs" group.
To post to this group, send email to [email protected]
To unsubscribe from this group, send email to
[email protected]
For more options, visit this group at
http://groups.google.com/group/nodejs?hl=en?hl=en
---
You received this message because you are subscribed to the Google Groups
"nodejs" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
For more options, visit https://groups.google.com/groups/opt_out.