Hi,
right now, writing to a Beam "Sink" is only supported for bounded streams,
as you discovered. An unbounded stream cannot be transformed into a bounded
stream using a window; windowing just "chunks" the stream differently, but
the stream is still unbounded.
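
To make the windowing point concrete, here is a small sketch in plain Java.
It deliberately does not use the Beam API; the class and method names
(WindowingSketch, windowStart) and the 1-second window size are made up for
illustration. It only shows that assigning elements to fixed windows labels
them, while the input itself never ends.

```java
import java.util.Iterator;
import java.util.stream.LongStream;

// Dependency-free model, NOT the Beam API. All names here are hypothetical.
class WindowingSketch {
    static final long WINDOW_MILLIS = 1000L;

    // Start of the fixed window containing the timestamp (non-negative timestamps assumed).
    static long windowStart(long timestampMillis) {
        return timestampMillis - (timestampMillis % WINDOW_MILLIS);
    }

    public static void main(String[] args) {
        // An endless source of event timestamps, one every 400 ms.
        Iterator<Long> timestamps = LongStream.iterate(0L, t -> t + 400L).iterator();

        // We can only ever inspect a prefix; hasNext() never becomes false.
        for (int i = 0; i < 6; i++) {
            long t = timestamps.next();
            long start = windowStart(t);
            System.out.println("event@" + t + " -> window [" + start + ", "
                    + (start + WINDOW_MILLIS) + ")");
        }
        System.out.println("more input pending: " + timestamps.hasNext());
    }
}
```

Each element lands in exactly one window, but new windows keep appearing for
as long as the source produces data, which is why a write that requires a
bounded input still rejects the windowed collection.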

The options you have right now for writing are to write to your external
datastore from a DoFn, to use KafkaIO to write to a Kafka topic, or to use
UnboundedFlinkSink to wrap a Flink sink for use in a Beam pipeline. The
latter would allow you to use, for example, BucketingSink or RollingSink
from Flink. I'm only mentioning UnboundedFlinkSink for completeness; I
would not recommend using it, since your program would then only work on
the Flink runner. The way to go, IMHO, would be to write to Kafka and then
take the data from there and ship it to some final location such as HDFS.
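
For the first option (writing from a DoFn), the usual shape is to buffer
records as elements arrive and flush them in batches, including a final
flush when a bundle ends. The sketch below models that pattern in plain
Java without any Beam dependency. Store, BufferingWriter, processElement
and finishBundle are hypothetical names chosen to mirror roughly how a DoFn
is driven per element and per bundle; a real DoFn would call an actual
datastore client instead of the stand-in interface.

```java
import java.util.ArrayList;
import java.util.List;

// Dependency-free model of DoFn-style writing, NOT the Beam API.
// All names here are hypothetical.
class BufferingWriterSketch {

    // Stand-in for an external datastore client.
    interface Store {
        void writeBatch(List<String> records);
    }

    static class BufferingWriter {
        private final Store store;
        private final int maxBatchSize;
        private final List<String> buffer = new ArrayList<>();

        BufferingWriter(Store store, int maxBatchSize) {
            this.store = store;
            this.maxBatchSize = maxBatchSize;
        }

        // Called once per element: buffer it and flush when the batch is full.
        void processElement(String record) {
            buffer.add(record);
            if (buffer.size() >= maxBatchSize) {
                flush();
            }
        }

        // Called at the end of a bundle so that nothing stays buffered.
        void finishBundle() {
            flush();
        }

        private void flush() {
            if (buffer.isEmpty()) {
                return;
            }
            // Hand over a copy so clearing the buffer does not affect the store.
            store.writeBatch(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        List<List<String>> written = new ArrayList<>();
        BufferingWriter writer = new BufferingWriter(written::add, 2);
        writer.processElement("a");
        writer.processElement("b");
        writer.processElement("c");
        writer.finishBundle();
        System.out.println(written);
    }
}
```

Because this kind of writer never needs to see the "end" of the input, it
works on an unbounded collection. Note that a plain batched write like this
gives no exactly-once guarantee if a bundle is retried, so the target store
should tolerate duplicate writes.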

Cheers,
Aljoscha

On Sun, 18 Sep 2016 at 23:17 Emanuele Cesena <emanu...@shopkick.com> wrote:

> Thanks I’ll look into it, even if it’s not really the feature I need
> (exactly because it will stop execution).
>
>
> > On Sep 18, 2016, at 2:11 PM, Chawla,Sumit <sumitkcha...@gmail.com>
> wrote:
> >
> > Hi Emanuele
> >
> > KafkaIO supports withMaxNumRecords(X), which will create a bounded
> > source from Kafka. However, the pipeline will finish once X records
> > have been read.
> >
> > Regards
> > Sumit Chawla
> >
> >
> > On Sun, Sep 18, 2016 at 2:00 PM, Emanuele Cesena <emanu...@shopkick.com>
> wrote:
> > Hi,
> >
> > Thanks for the hint - I’ll debug better but I thought I did that:
> >
> https://github.com/ecesena/beam-starter/blob/master/src/main/java/com/dataradiant/beam/examples/StreamWordCount.java#L140
> >
> > Best,
> >
> >
> > > On Sep 18, 2016, at 1:54 PM, Jean-Baptiste Onofré <j...@nanthrax.net>
> wrote:
> > >
> > > Hi Emanuele
> > >
> > > You have to use a window to create a bounded collection from an
> unbounded source.
> > >
> > > Regards
> > > JB
> > >
> > > On Sep 18, 2016, at 21:04, Emanuele Cesena <emanu...@shopkick.com>
> wrote:
> > > Hi,
> > >
> > > I wrote a while ago about a simple example I was building to test
> KafkaIO:
> > >
> https://github.com/ecesena/beam-starter/blob/master/src/main/java/com/dataradiant/beam/examples/StreamWordCount.java
> > >
> > > Issues with Flink should be fixed now, and I’m trying to run the
> > > example on master and Flink 1.1.2.
> > > I’m currently getting:
> > > Caused by: java.lang.IllegalArgumentException: Write can only be
> applied to a Bounded PCollection
> > >
> > > What is the recommended way to go here?
> > > - is there a way to create a bounded collection from an unbounded one?
> > > - is there a plan to let TextIO write unbounded collections?
> > > - is there another recommended “simple sink” to use?
> > >
> > > Thank you much!
> > >
> > > Best,
> >
> > --
> > Emanuele Cesena, Data Eng.
> > http://www.shopkick.com
> >
> > Il corpo non ha ideali
> >
> >
> >
> >
> >
>
> --
> Emanuele Cesena, Data Eng.
> http://www.shopkick.com
>
> Il corpo non ha ideali
