As an update to this (I realise nodeconf has been on... pity I missed it), 
I've tried a few things and I still can't figure out the idiomatic way to 
deal with this situation. Here's an example:

var stream = require('stream'),
    fs = require('fs')

var readStream = fs.createReadStream(__filename, {highWaterMark: 5}),
    limitStream = new stream.Transform(),
    limit = 5

// Just an example of a transform stream that only wants a certain amount of data
limitStream._transform = function(chunk, encoding, cb) {
  if (--limit >= 0)
    return cb(null, chunk + '\n')

  // All we want is 5 chunks, then we want to close
  this.end()
  // Wait a while and check if the file stream has closed
  setTimeout(checkFd, 1000)
  cb()
}

// fd is set to null when the file is closed
function checkFd() {
  console.log(readStream.fd != null ? 'Bugger. File still open!' : 'All good')
}

readStream.on('unpipe', function() { console.log('unpipe emitted from readStream') })
readStream.on('end', function() { console.log('end emitted from readStream') })
readStream.on('close', function() { console.log('close emitted from readStream') })

limitStream.on('unpipe', function() { console.log('unpipe emitted from limitStream') })
limitStream.on('end', function() { console.log('end emitted from limitStream') })
limitStream.on('close', function() { console.log('close emitted from limitStream') })

readStream.pipe(limitStream).pipe(process.stdout)

When run (in node v0.10.12), this results in:

$ node test.js
var s
tream
 = re
quire
('str
unpipe emitted from limitStream
end emitted from limitStream
Bugger. File still open!

And the process exits. If the timeout were longer, though, the file would 
stay open the whole time, with the stream continuing to read even though 
the data is going nowhere (imagine it were /dev/random or similar).

So the file descriptor is left open, even though the pipe is closed - and 
no events seem to be signalled on readStream at all, only on the writable 
limitStream.

Now in my case I'm actually developing the read stream myself - I'm not 
using a file stream - so I certainly have more control than in this 
example. But I still have no idea what the read stream needs to look for, 
or listen to, in order to know to stop reading from its source (ie, to 
know that all destinations have ended).

There are a few possibilities:

1. Check the _readableState property when _read is called - if pipes is
   null (or pipesCount is 0) then don't continue to fetch data. Ugly, and
   relies on hidden variables.
2. Override the unpipe() function and call close() if no pipes are left -
   though we'd still need some way to know how many pipes are left - perhaps
   keep our own count?
3. Check whether there are any listeners left for 'readable' (and/or
   'data'?) - if none, then close(). Not sure where to do this - during
   _read()? Or in an overridden unpipe()?
4. Override the pipe() function and add listeners to each destination to
   see when they end - and hope that they do the same for all streams they
   pipe to.

Is there a more idiomatic way to achieve this? Is one of these options 
the "nicest"?

Or are readable streams really just supposed to keep reading and reading, 
never closing (until they eventually reach the end of their source, 
assuming there is an end), regardless of whether they've been piped to 
writable streams that subsequently finish?

This would be a pity as it would impede the ability to compose streams 
together in a `dbReadStream.pipe(take(20))` kind of fashion.

M

On Friday, 28 June 2013 17:01:46 UTC+10, Michael Hart wrote:
>
> Hi all,
>
> I'm struggling with a fairly basic problem to do with limiting (ie, 
> truncating) streams and signalling to the source readable to stop reading 
> and close.
>
> Here's a simple example that hopefully illustrates what I mean:
>
> fs.createReadStream('myHugeFile')
>   .pipe(someTransform)
>   .pipe(someTruncatingStream)
>   .pipe(process.stdout)
>
> Where someTruncatingStream wants to only take the first n bytes of the 
> transformed stream, and then signal "hey, I'm done (ie, no need to keep 
> reading the massive file)".
>
> Is there a way to do this from someTruncatingStream without it having 
> direct access to the source read stream? ie, some way to signal up the pipe 
> to stop reading and close everything down (ie, not just pause)?
>
> In my case it's actually not a file stream that I want to do this for - I 
> just figured that was the easiest illustration. Mine is an object stream I 
> want to create in front of a DB which is being queried a page at a time - I 
> want to expose a stream for this that can continue paging behind the scenes 
> and spitting out data, but that knows to stop paging when the consumers 
> have consumed all the data they want. ie:
>
> db.createReadStream({some: 
> query}).pipe(someOtherFilter).pipe(stringify).pipe(take100).pipe(process.stdout)
>
> I could use another paradigm for this, a lazy collection for example - but 
> it seems to me that streams should support this and are more of a lingua 
> franca to expose from a module API (and indeed can be transformed into lazy 
> collections themselves, a la lazy.js/node-lazy)
>
> Cheers,
>
> Michael
>

-- 
Job Board: http://jobs.nodejs.org/
Posting guidelines: 
https://github.com/joyent/node/wiki/Mailing-List-Posting-Guidelines
You received this message because you are subscribed to the Google
Groups "nodejs" group.
To post to this group, send email to [email protected]
To unsubscribe from this group, send email to
[email protected]
For more options, visit this group at
http://groups.google.com/group/nodejs?hl=en
