You want to create a Readable stream from a push source.
The general strategy is to create a stream, set _read to the function that
does the resume() logic.
Listen to the push source and call push() whenever you have data.
When push() returns false you should do the pause() logic.
In the UDP case you can either just not implement pause / resume (i.e. no
backpressure, probably a bad idea)
or you can start dropping messages / sending backpressure messages down the
socket.
```js
var Readable = require("stream").Readable
function UDPListener(socket) {
var stream = Readable({ objectMode: true })
var remainder = ""
Readable._read = function () { /* resume */ }
socket.on("message", function (chunk) {
var message = remainder + String(chunk)
var parts = message.split("\n")
remainder = parts.pop()
parts.forEach(function (str) {
var obj = JSON.parse(str)
var writeMore = stream.push(obj)
/* if (!writeMore) { pause() } */
})
})
}
```
Implementing this using prototypical inheritance & semicolons is an
exercise for the user
On Thu, Aug 15, 2013 at 7:25 PM, Pedro Teixeira <[email protected]>wrote:
> The way I do it is by implementing a no-op _read and pushing whenever I
> need to.
>
> --
> Pedro
>
> On Friday, August 16, 2013 at 3:03 AM, Brian Lalor wrote:
>
> I'm trying to implement a Readable stream that emits objects parsed from
> UDP datagrams. I've been looking for some good stream tutorials, but
> nothing's really clicking for me. I think this is compounded by the fact
> that the stream documentation on nodejs.org says that Readable.push() and
> Writable._write() both accept Buffers or strings, not referencing the
> objets that are implied by the presence of the objectMode option.
>
> I'm going to be listening on a UDP socket and I want to emit one or more
> objects per dgram message; the message is newline delimited and could have
> multiple events per message. I then want to use multiple Writable streams
> to distribute the received objects. I will not be polling for data,
> obviously; there's no read() method appropriate for a datagram. At a high
> level, I want to be able to do the following:
> var udpListener = new UDPListener(somePort);
> var frameHandler = new FrameHandler();
> var destination = fs.createWritableStream("targetFile.txt");
>
> udpListener.pipe(frameHandler).pipe(destination);
>
> udpListener should emit objects, frameHandler should be a transformer that
> modifies the object, and destination should accept a string or buffer to be
> written out to a file or over the network to another service. Under the
> hood, UDPListener will create a dgram socket, split received message by
> newline, turn the message into an object, and then send it downstream.
>
> The way the documentation for *_read* reads, it implies that I need to
> maintain my own buffer of data received on the UDP socket, which would then
> be push()'ed when _read is invoked. I would have thought that I'd just be
> able to arbitrarily push() new data, but push()'ing an object results in
> "[Object object]" being received at the next phase of the pipeline.
>
> Is an invocation _read() supposed to result in one or more calls to push()
> by the implementing class? How do I push() an object?
>
> Thanks,
> Brian
>
> --
> Brian Lalor
> [email protected]
> http://github.com/blalor
>
> --
> --
> Job Board: http://jobs.nodejs.org/
> Posting guidelines:
> https://github.com/joyent/node/wiki/Mailing-List-Posting-Guidelines
> You received this message because you are subscribed to the Google
> Groups "nodejs" group.
> To post to this group, send email to [email protected]
> To unsubscribe from this group, send email to
> [email protected]
> For more options, visit this group at
> http://groups.google.com/group/nodejs?hl=en?hl=en
>
> ---
> You received this message because you are subscribed to the Google Groups
> "nodejs" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> For more options, visit https://groups.google.com/groups/opt_out.
>
>
> --
> --
> Job Board: http://jobs.nodejs.org/
> Posting guidelines:
> https://github.com/joyent/node/wiki/Mailing-List-Posting-Guidelines
> You received this message because you are subscribed to the Google
> Groups "nodejs" group.
> To post to this group, send email to [email protected]
> To unsubscribe from this group, send email to
> [email protected]
> For more options, visit this group at
> http://groups.google.com/group/nodejs?hl=en?hl=en
>
> ---
> You received this message because you are subscribed to the Google Groups
> "nodejs" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> For more options, visit https://groups.google.com/groups/opt_out.
>
--
--
Job Board: http://jobs.nodejs.org/
Posting guidelines:
https://github.com/joyent/node/wiki/Mailing-List-Posting-Guidelines
You received this message because you are subscribed to the Google
Groups "nodejs" group.
To post to this group, send email to [email protected]
To unsubscribe from this group, send email to
[email protected]
For more options, visit this group at
http://groups.google.com/group/nodejs?hl=en?hl=en
---
You received this message because you are subscribed to the Google Groups
"nodejs" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
For more options, visit https://groups.google.com/groups/opt_out.