Runtime Windows/Aggregation Computations in Beam

2016-09-22 Thread Chawla,Sumit
Hi All

I am trying to code a solution for the following scenario.

1.  I have a stream of tuples with multiple numeric fields (e.g. A, B, C,
D, E, etc.).
2.  I want the ability to do different windowing and aggregation on each
field or group of fields in the tuple, e.g. sum A over a period of 2
minutes, average B over a period of 3 minutes, sum C grouped by D over a
period of 15 minutes.
3.  *These window requirements can be added by the user at runtime.*  My
pipeline should be able to compute a new aggregation at runtime (a sketch
of such a request follows this list).
4.  I plan to support only simple aggregation functions like SUM, AVG, MAX,
MIN, COUNT, etc.
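
As a hedged illustration of point 3 only (none of these names appear in the
original mail), a runtime aggregation request could be represented as plain
data that the running pipeline consumes as configuration, so the DAG itself
never has to change:

// Hypothetical description of one user-requested aggregation, e.g.
// "SUM of A over 2 minutes" or "SUM of C grouped by D over 15 minutes".
public class AggregationSpec implements java.io.Serializable {
    public enum Fn { SUM, AVG, MIN, MAX, COUNT }

    public final Fn fn;                 // which aggregation to apply
    public final String valueField;     // field to aggregate, e.g. "A"
    public final String groupByField;   // optional grouping field, e.g. "D" (may be null)
    public final int windowMinutes;     // window size, a multiple of the 1-minute bucket

    public AggregationSpec(Fn fn, String valueField, String groupByField, int windowMinutes) {
        this.fn = fn;
        this.valueField = valueField;
        this.groupByField = groupByField;
        this.windowMinutes = windowMinutes;
    }
}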


As I understand it, in Beam pipelines (with the Flink runner) the DAG of
computations cannot be altered once the pipeline is deployed.  I am trying
to see how I can support the above use case.  I would love to hear your
feedback on this, and suggestions on doing it in a completely different
way.

*My Design:*

1.  Create 1-minute buckets per field or group of fields and compute basic
aggregations for each bucket (the buckets are highlighted in yellow in the
attached diagram).  For each field I calculate [SUM, COUNT, MAX, MIN] in
the bucket.  (A bucket size of 1 minute is chosen for simplicity, and would
limit the minimum window size to 1 minute.)

2.  Downstream, process these buckets and compute the user-defined
aggregations.  The following diagram depicts tumbling-window computations.
The aggregation functions in GREEN are just native functions consuming
different buckets and aggregating on top of them.  (A code sketch of both
steps follows the diagram placeholder below.)


[Diagram attachments showing the 1-minute buckets and the tumbling-window
aggregations are not included in the archive.]

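A minimal sketch of steps 1 and 2 (my wording, not from the original mail):
a CombineFn that reduces each 1-minute bucket to [SUM, COUNT, MIN, MAX],
from which the user-requested aggregations over larger windows can be
composed (e.g. a 15-minute SUM is the sum of 15 bucket sums; an AVG is the
total sum divided by the total count).  All class and variable names
(BucketStatsFn, fieldValues) are hypothetical, and the snippet is written
loosely against the Beam Java SDK of that era.

import java.io.Serializable;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// Per-bucket statistics from which SUM/COUNT/MIN/MAX/AVG can be derived.
public class BucketStatsFn
        extends Combine.CombineFn<Double, BucketStatsFn.Stats, BucketStatsFn.Stats> {

    public static class Stats implements Serializable {
        double sum = 0;
        long count = 0;
        double min = Double.POSITIVE_INFINITY;
        double max = Double.NEGATIVE_INFINITY;
    }

    @Override
    public Stats createAccumulator() {
        return new Stats();
    }

    @Override
    public Stats addInput(Stats acc, Double value) {
        acc.sum += value;
        acc.count++;
        acc.min = Math.min(acc.min, value);
        acc.max = Math.max(acc.max, value);
        return acc;
    }

    @Override
    public Stats mergeAccumulators(Iterable<Stats> accumulators) {
        Stats merged = new Stats();
        for (Stats s : accumulators) {
            merged.sum += s.sum;
            merged.count += s.count;
            merged.min = Math.min(merged.min, s.min);
            merged.max = Math.max(merged.max, s.max);
        }
        return merged;
    }

    @Override
    public Stats extractOutput(Stats acc) {
        return acc;
    }
}

// Step 1: keyed by field name (or a composite "field|groupValue" key),
// windowed into fixed 1-minute buckets and combined into Stats.
PCollection<KV<String, BucketStatsFn.Stats>> buckets =
    fieldValues  // PCollection<KV<String, Double>> of (fieldName, value)
        .apply("1MinBuckets",
            Window.<KV<String, Double>>into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply("BucketStats",
            Combine.<String, Double, BucketStatsFn.Stats>perKey(new BucketStatsFn()));

// Step 2 (downstream): re-window the bucket stats into the user-requested
// size (e.g. 15 minutes), merge the Stats again, and derive the requested
// function from them (AVG = sum / count, etc.).
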
*P.S.*

* Some of the design choices that I have decided not to go for are:*

1.  Multiple pipelines for doing the computation.  One master pipeline does
the grouping and sends data to a different topic based on the
user-configured window size (e.g. topic_window_by_5_min,
topic_window_by_15_min), and a different pipeline consumes each topic.

2.  A single pipeline doing all the work with predefined windows for
downstream processing, e.g. 5-, 15-, 30-, and 60-minute windows that
consume from different side inputs.  The user is only allowed to select
from these window sizes.  An upstream GroupBy operator routes the data to
the different window functions based on user configuration.



Regards
Sumit Chawla


Re: Support for Flink 1.1.0 in release-0.2.0-incubating

2016-09-18 Thread Chawla,Sumit
Hi Max

Thanks for the information. I agree with you that 0.3.0 is the way ahead,
but I am hesitant to use 0.3.0-SNAPSHOT due to its changing nature.

Regards
Sumit Chawla


On Fri, Sep 16, 2016 at 5:51 AM, Maximilian Michels <m...@apache.org> wrote:

> Hi Sumit,
>
> Thanks for the PR. Your changes looks good. I think there are
> currently no plans for a minor release 0.2.1-incubating. A lot of
> issues were fixed on the latest master which should give you a better
> experience than the 0.2.0-incubating release.
>
> These are the current issues which will be fixed in 0.3.0-incubating:
> https://issues.apache.org/jira/browse/BEAM-102?jql=
> project%20%3D%20BEAM%20AND%20fixVersion%20%3D%200.3.0-
> incubating%20AND%20component%20%3D%20runner-flink
>
> 1. Most notably, it fixes an issue with comparing keys unencoded in
> streaming mode which was an issue if you had not implemented
> equals/hashCode for your objects.
>
> 2. Further, we support side inputs in streaming now. In the course, we
> have also unified execution paths in streaming mode.
>
> 3. There was an issue with checkpointed sources that has been resolved.
>
> If you could try out the latest version, that would be great. If not,
> we can probably merge your PR and think about a minor release.
>
> Best,
> Max
>
> On Fri, Sep 16, 2016 at 6:10 AM, Chawla,Sumit <sumitkcha...@gmail.com>
> wrote:
> > Hi Max
> >
> > I have opened a PR - https://github.com/apache/incubator-beam/pull/963
> for
> > adding support of Flink 1.1.2 in Beam 0.2.0 release.
> >
> > Regards
> > Sumit Chawla
> >
> >
> > On Wed, Sep 14, 2016 at 1:32 PM, Chawla,Sumit <sumitkcha...@gmail.com>
> > wrote:
> >
> >> Hi Max
> >>
> >> I was able to compile 0.2.0 with Flink 1.1.0 with small modification,
> and
> >> run a simple pipeline.
> >>
> >>@Override
> >> -  public void restoreState(StreamTaskState taskState, long
> >> recoveryTimestamp) throws Exception {
> >> -super.restoreState(taskState, recoveryTimestamp);
> >> +  public void restoreState(StreamTaskState taskState) throws Exception
> {
> >> +super.restoreState(taskState);
> >>
> >>
> >> Can i get a sense of the changes that have happened in 0.3.0 for
> Flink?  I
> >> observed some classes completely reworked.  It will be crucial for me to
> >> understand the scope of change and impact before making a move to 0.3.0
> >>
> >>
> >>
> >> Regards
> >> Sumit Chawla
> >>
> >>
> >> On Wed, Sep 14, 2016 at 3:03 AM, Maximilian Michels <m...@apache.org>
> >> wrote:
> >>
> >>> We support Flink 1.1.2 on the latest snapshot version
> >>> 0.3.0-incubating-SNAPSHOT. Would it be possible for you to work with
> >>> this version?
> >>>
> >>> On Tue, Sep 13, 2016 at 11:55 PM, Chawla,Sumit <sumitkcha...@gmail.com
> >
> >>> wrote:
> >>> > When trying to use Beam 0.2.0 with Flink 1.1.0 jar, i am seeing
> >>> following
> >>> > error:
> >>> >
> >>> > java.lang.NoSuchMethodError:
> >>> > org.apache.flink.streaming.api.operators.StreamingRuntimeCon
> >>> text.registerTimer(JLorg/apache/flink/streaming/
> >>> runtime/operators/Triggerable;)V
> >>> > at org.apache.beam.runners.flink.
> translation.wrappers.streaming
> >>> .io.UnboundedSourceWrapper.setNextWatermarkTimer(UnboundedSo
> >>> urceWrapper.java:381)
> >>> > at org.apache.beam.runners.flink.
> translation.wrappers.streaming
> >>> .io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:233)
> >>> > at org.apache.flink.streaming.api.operators.StreamSource.
> run(
> >>> StreamSource.java:80)
> >>> > at org.apache.flink.streaming.api.operators.StreamSource.
> run(
> >>> StreamSource.java:53)
> >>> > at org.apache.flink.streaming.runtime.tasks.
> SourceStreamTask.
> >>> run(SourceStreamTask.java:56)
> >>> > at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(
> >>> StreamTask.java:266)
> >>> > at org.apache.flink.runtime.taskmanager.Task.run(Task.
> java:584)
> >>> > at java.lang.Thread.run(Thread.java:745)
> >>> >
> >>> >
> >>> > Regards
> >>> > Sumit Chawla
> >>> >
> >>> >
> >>> > On Tue, Sep 13, 2016 at 2:20 PM, Chawla,Sumit <
> sumitkcha...@gmail.com>
> >>> > wrote:
> >>> >
> >>> >> Hi All
> >>> >>
> >>> >> The release-0.2.0-incubating supports Flink 1.0.3. With Flink 1.1.0
> >>> out,
> >>> >> is there a plan to support it with any 0.2.0 patch? I tried
> compiling
> >>> 0.2.0
> >>> >> with Flink 1.1.0,
> >>> >> and got couple of compliation errors in
> FlinkGroupAlsoByWindowWrapper.
> >>> java.
> >>> >> Going back to master i see lots of change in Flink translation
> >>> wrappers,
> >>> >> and
> >>> >> FlinkGroupAlsoByWindowWrapper.java being removed.
> >>> >>
> >>> >> Just want to get a sense of things here, on what would it take to
> >>> support Flink
> >>> >> 1.1.0 with release-0.2.0. Would appreciate views of people who are
> >>> already
> >>> >> working on upgrading it to Flink 1.1.0
> >>> >>
> >>> >> Regards
> >>> >> Sumit Chawla
> >>> >>
> >>> >>
> >>>
> >>
> >>
>


Re: Support for Flink 1.1.0 in release-0.2.0-incubating

2016-09-14 Thread Chawla,Sumit
Hi Max

I was able to compile 0.2.0 with Flink 1.1.0 with a small modification, and
ran a simple pipeline.

   @Override
-  public void restoreState(StreamTaskState taskState, long recoveryTimestamp) throws Exception {
-    super.restoreState(taskState, recoveryTimestamp);
+  public void restoreState(StreamTaskState taskState) throws Exception {
+    super.restoreState(taskState);


Can I get a sense of the changes that have happened in 0.3.0 for Flink?  I
observed that some classes were completely reworked.  It will be crucial for
me to understand the scope of the change and its impact before making a move
to 0.3.0.



Regards
Sumit Chawla


On Wed, Sep 14, 2016 at 3:03 AM, Maximilian Michels <m...@apache.org> wrote:

> We support Flink 1.1.2 on the latest snapshot version
> 0.3.0-incubating-SNAPSHOT. Would it be possible for you to work with
> this version?
>
> On Tue, Sep 13, 2016 at 11:55 PM, Chawla,Sumit <sumitkcha...@gmail.com>
> wrote:
> > When trying to use Beam 0.2.0 with Flink 1.1.0 jar, i am seeing following
> > error:
> >
> > java.lang.NoSuchMethodError:
> > org.apache.flink.streaming.api.operators.StreamingRuntimeContext.
> registerTimer(JLorg/apache/flink/streaming/runtime/
> operators/Triggerable;)V
> > at org.apache.beam.runners.flink.translation.wrappers.
> streaming.io.UnboundedSourceWrapper.setNextWatermarkTimer(
> UnboundedSourceWrapper.java:381)
> > at org.apache.beam.runners.flink.translation.wrappers.
> streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:233)
> > at org.apache.flink.streaming.api.operators.StreamSource.
> run(StreamSource.java:80)
> > at org.apache.flink.streaming.api.operators.StreamSource.
> run(StreamSource.java:53)
> > at org.apache.flink.streaming.runtime.tasks.
> SourceStreamTask.run(SourceStreamTask.java:56)
> > at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:266)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> >     at java.lang.Thread.run(Thread.java:745)
> >
> >
> > Regards
> > Sumit Chawla
> >
> >
> > On Tue, Sep 13, 2016 at 2:20 PM, Chawla,Sumit <sumitkcha...@gmail.com>
> > wrote:
> >
> >> Hi All
> >>
> >> The release-0.2.0-incubating supports Flink 1.0.3. With Flink 1.1.0 out,
> >> is there a plan to support it with any 0.2.0 patch? I tried compiling
> 0.2.0
> >> with Flink 1.1.0,
> >> and got couple of compliation errors in FlinkGroupAlsoByWindowWrapper.
> java.
> >> Going back to master i see lots of change in Flink translation wrappers,
> >> and
> >> FlinkGroupAlsoByWindowWrapper.java being removed.
> >>
> >> Just want to get a sense of things here, on what would it take to
> support Flink
> >> 1.1.0 with release-0.2.0. Would appreciate views of people who are
> already
> >> working on upgrading it to Flink 1.1.0
> >>
> >> Regards
> >> Sumit Chawla
> >>
> >>
>


Support for Flink 1.1.0 in release-0.2.0-incubating

2016-09-13 Thread Chawla,Sumit
Hi All

The release-0.2.0-incubating supports Flink 1.0.3. With Flink 1.1.0 out, is
there a plan to support it with any 0.2.0 patch? I tried compiling 0.2.0
with Flink 1.1.0, and got a couple of compilation errors in
FlinkGroupAlsoByWindowWrapper.java. Going back to master, I see a lot of
change in the Flink translation wrappers, and
FlinkGroupAlsoByWindowWrapper.java has been removed.

I just want to get a sense of things here: what would it take to support
Flink 1.1.0 with release-0.2.0?  I would appreciate the views of people who
are already working on upgrading it to Flink 1.1.0.

Regards
Sumit Chawla


Re: KafkaIO Windowing Fn

2016-09-01 Thread Chawla,Sumit
Thanks Aljoscha/Thomas

I will explore the option to upgrade.  Meanwhile, here is what I observed
with the above code on my local Flink cluster.

1.  To start, there are 0 records in Kafka.
2.  Deploy the pipeline.  Two records are received in Kafka at 10:00:00 AM.
3.  The pane with 100 records would not fire because the expected data is
not there.  I would expect the 30-second trigger to fire and the downstream
to receive the records around 10:00:30 AM.
4.  No new records arrive.  The downstream received the above records
around 10 minutes later, at about 10:10:00 AM.

I am not sure what is actually triggering the window firing here (it does
not look like the 30-second trigger).
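
One way to see which firing produced a given output (not from the original
mail; a hedged sketch written against the same old-style DoFn API used
elsewhere in this thread) is to log the pane information in the downstream
DoFn after the GroupByKey:

.apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
    @Override
    public void processElement(ProcessContext c) throws Exception {
        // PaneInfo reveals whether this firing was EARLY/ON_TIME/LATE and
        // its index within the window, which helps identify the trigger.
        System.out.println(String.format("key=%s timing=%s paneIndex=%d",
            c.element().getKey(), c.pane().getTiming(), c.pane().getIndex()));
    }
}));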



Regards
Sumit Chawla


On Wed, Aug 31, 2016 at 11:14 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Ah I see, the Flink Runner had quite some updates in 0.2.0-incubating and
> even more for the upcoming 0.3.0-incubating.
>
> On Thu, 1 Sep 2016 at 04:09 Thomas Groh <tg...@google.com.invalid> wrote:
>
> > In 0.2.0-incubating and beyond we've replaced the DirectPipelineRunner
> with
> > the DirectRunner (formerly InProcessPipelineRunner), which is capable of
> > handling Unbounded Pipelines. Is it possible for you to upgrade?
> >
> > On Wed, Aug 31, 2016 at 5:17 PM, Chawla,Sumit <sumitkcha...@gmail.com>
> > wrote:
> >
> > > @Ajioscha,  My assumption is here that atleast one trigger should fire.
> > > Either the 100 elements or the 30 second since first element.
> (whichever
> > > happens first)
> > >
> > > @Thomas - here is the error i get: I am using 0.1.0-incubating
> > >
> > > *ava.lang.IllegalStateException: no evaluator registered for
> > > Read(UnboundedKafkaSource)*
> > >
> > > * at
> > > org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.
> > > visitPrimitiveTransform(DirectPipelineRunner.java:890)*
> > > * at
> > > org.apache.beam.sdk.runners.TransformTreeNode.visit(
> > > TransformTreeNode.java:225)*
> > > * at
> > > org.apache.beam.sdk.runners.TransformTreeNode.visit(
> > > TransformTreeNode.java:220)*
> > > * at
> > > org.apache.beam.sdk.runners.TransformTreeNode.visit(
> > > TransformTreeNode.java:220)*
> > > * a*
> > >
> > > Regards
> > > Sumit Chawla
> > >
> > >
> > > On Wed, Aug 31, 2016 at 10:19 AM, Aljoscha Krettek <
> aljos...@apache.org>
> > > wrote:
> > >
> > > > Hi,
> > > > could the reason for the second part of the trigger never firing be
> > that
> > > > there are never at least 100 elements per key. The trigger would only
> > > fire
> > > > if it saw 100 elements and with only 540 elements that seems unlikely
> > if
> > > > you have more than 6 keys.
> > > >
> > > > Cheers,
> > > > Aljoscha
> > > >
> > > > On Wed, 31 Aug 2016 at 17:47 Thomas Groh <tg...@google.com.invalid>
> > > wrote:
> > > >
> > > > > KafkaIO is implemented using the UnboundedRead API, which is
> > supported
> > > by
> > > > > the DirectRunner. You should be able to run without the
> > > > withMaxNumRecords;
> > > > > if you can't, I'd be very interested to see the stack trace that
> you
> > > get
> > > > > when you try to run the Pipeline.
> > > > >
> > > > > On Tue, Aug 30, 2016 at 11:24 PM, Chawla,Sumit <
> > sumitkcha...@gmail.com
> > > >
> > > > > wrote:
> > > > >
> > > > > > Yes.  I added it only for DirectRunner as it cannot translate
> > > > > > Read(UnboundedSourceOfKafka)
> > > > > >
> > > > > > Regards
> > > > > > Sumit Chawla
> > > > > >
> > > > > >
> > > > > > On Tue, Aug 30, 2016 at 11:18 PM, Aljoscha Krettek <
> > > > aljos...@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > > > Ah ok, this might be a stupid question but did you remove this
> > line
> > > > > when
> > > > > > > running it with Flink:
> > > > > > >     .withMaxNumRecords(500)
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Aljoscha
> > > > > > >
> > > > > > > On Wed, 31 Aug 2016 at 08:14 Chawla,Sumit <
> > sumitkcha...@gmail.com>
> > > &g

Re: KafkaIO Windowing Fn

2016-08-31 Thread Chawla,Sumit
@Aljoscha,  My assumption here is that at least one trigger should fire:
either the 100 elements, or the 30 seconds since the first element
(whichever happens first).

@Thomas - here is the error I get (I am using 0.1.0-incubating):

*java.lang.IllegalStateException: no evaluator registered for
Read(UnboundedKafkaSource)*

* at
org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitPrimitiveTransform(DirectPipelineRunner.java:890)*
* at
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:225)*
* at
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)*
* at
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)*
* a*
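
A hedged sketch of what the upgrade Thomas suggests (see his reply quoted
earlier in this archive) would look like on 0.2.0-incubating or later,
where the unbounded-capable DirectRunner replaces DirectPipelineRunner (the
package name is assumed, not verified against that release):

PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
// The new runner can evaluate Read(UnboundedKafkaSource), so
// withMaxNumRecords(500) is no longer needed just to make the source bounded.
pipelineOptions.setRunner(org.apache.beam.runners.direct.DirectRunner.class);
Pipeline pipeline = Pipeline.create(pipelineOptions);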

Regards
Sumit Chawla


On Wed, Aug 31, 2016 at 10:19 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> could the reason for the second part of the trigger never firing be that
> there are never at least 100 elements per key. The trigger would only fire
> if it saw 100 elements and with only 540 elements that seems unlikely if
> you have more than 6 keys.
>
> Cheers,
> Aljoscha
>
> On Wed, 31 Aug 2016 at 17:47 Thomas Groh <tg...@google.com.invalid> wrote:
>
> > KafkaIO is implemented using the UnboundedRead API, which is supported by
> > the DirectRunner. You should be able to run without the
> withMaxNumRecords;
> > if you can't, I'd be very interested to see the stack trace that you get
> > when you try to run the Pipeline.
> >
> > On Tue, Aug 30, 2016 at 11:24 PM, Chawla,Sumit <sumitkcha...@gmail.com>
> > wrote:
> >
> > > Yes.  I added it only for DirectRunner as it cannot translate
> > > Read(UnboundedSourceOfKafka)
> > >
> > > Regards
> > > Sumit Chawla
> > >
> > >
> > > On Tue, Aug 30, 2016 at 11:18 PM, Aljoscha Krettek <
> aljos...@apache.org>
> > > wrote:
> > >
> > > > Ah ok, this might be a stupid question but did you remove this line
> > when
> > > > running it with Flink:
> > > > .withMaxNumRecords(500)
> > > >
> > > > Cheers,
> > > > Aljoscha
> > > >
> > > > On Wed, 31 Aug 2016 at 08:14 Chawla,Sumit <sumitkcha...@gmail.com>
> > > wrote:
> > > >
> > > > > Hi Aljoscha
> > > > >
> > > > > The code is not different while running on Flink.  It have removed
> > > > business
> > > > > specific transformations only.
> > > > >
> > > > > Regards
> > > > > Sumit Chawla
> > > > >
> > > > >
> > > > > On Tue, Aug 30, 2016 at 4:49 AM, Aljoscha Krettek <
> > aljos...@apache.org
> > > >
> > > > > wrote:
> > > > >
> > > > > > Hi,
> > > > > > could you maybe also post the complete that you're using with the
> > > > > > FlinkRunner? I could have a look into it.
> > > > > >
> > > > > > Cheers,
> > > > > > Aljoscha
> > > > > >
> > > > > > On Tue, 30 Aug 2016 at 09:01 Chawla,Sumit <
> sumitkcha...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Hi Thomas
> > > > > > >
> > > > > > > Sorry i tried with DirectRunner but ran into some kafka issues.
> > > > > > Following
> > > > > > > is the snippet i am working on, and will post more details
> once i
> > > get
> > > > > it
> > > > > > > working ( as of now i am unable to read messages from Kafka
> using
> > > > > > > DirectRunner)
> > > > > > >
> > > > > > >
> > > > > > > PipelineOptions pipelineOptions =
> > PipelineOptionsFactory.create();
> > > > > > > pipelineOptions.setRunner(DirectPipelineRunner.class);
> > > > > > > Pipeline pipeline = Pipeline.create(pipelineOptions);
> > > > > > > pipeline.apply(KafkaIO.read()
> > > > > > > .withMaxNumRecords(500)
> > > > > > > .withTopics(ImmutableList.of("mytopic"))
> > > > > > > .withBootstrapServers("localhost:9092")
> > > > > > > .updateConsumerProperties(ImmutableMap.of(
> > > > > > > ConsumerConfig.GROUP_ID_CONFIG, "test1",
> > > > > > > Con

Re: KafkaIO Windowing Fn

2016-08-31 Thread Chawla,Sumit
Yes.  I added it only for the DirectRunner, as it cannot translate
Read(UnboundedSourceOfKafka).

Regards
Sumit Chawla


On Tue, Aug 30, 2016 at 11:18 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Ah ok, this might be a stupid question but did you remove this line when
> running it with Flink:
> .withMaxNumRecords(500)
>
> Cheers,
> Aljoscha
>
> On Wed, 31 Aug 2016 at 08:14 Chawla,Sumit <sumitkcha...@gmail.com> wrote:
>
> > Hi Aljoscha
> >
> > The code is not different while running on Flink.  It have removed
> business
> > specific transformations only.
> >
> > Regards
> > Sumit Chawla
> >
> >
> > On Tue, Aug 30, 2016 at 4:49 AM, Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> >
> > > Hi,
> > > could you maybe also post the complete that you're using with the
> > > FlinkRunner? I could have a look into it.
> > >
> > > Cheers,
> > > Aljoscha
> > >
> > > On Tue, 30 Aug 2016 at 09:01 Chawla,Sumit <sumitkcha...@gmail.com>
> > wrote:
> > >
> > > > Hi Thomas
> > > >
> > > > Sorry i tried with DirectRunner but ran into some kafka issues.
> > > Following
> > > > is the snippet i am working on, and will post more details once i get
> > it
> > > > working ( as of now i am unable to read messages from Kafka using
> > > > DirectRunner)
> > > >
> > > >
> > > > PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
> > > > pipelineOptions.setRunner(DirectPipelineRunner.class);
> > > > Pipeline pipeline = Pipeline.create(pipelineOptions);
> > > > pipeline.apply(KafkaIO.read()
> > > > .withMaxNumRecords(500)
> > > > .withTopics(ImmutableList.of("mytopic"))
> > > > .withBootstrapServers("localhost:9092")
> > > > .updateConsumerProperties(ImmutableMap.of(
> > > > ConsumerConfig.GROUP_ID_CONFIG, "test1",
> > > > ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true",
> > > > ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
> > > > ))).apply(ParDo.of(new DoFn<KafkaRecord<byte[], byte[]>,
> > > > KV<String, String>>() {
> > > > @Override
> > > > public void processElement(ProcessContext c) throws Exception {
> > > > KV<byte[], byte[]> record = c.element().getKV();
> > > > c.output(KV.of(new String(record.getKey()), new
> > > > String(record.getValue(;
> > > > }
> > > > }))
> > > > .apply("WindowByMinute", Window.<KV<String,
> > > > String>>into(FixedWindows.of(Duration.standardSeconds(10)))
> > > > .withAllowedLateness(Duration.standardSeconds(1))
> > > > .triggering(
> > > > Repeatedly.forever(
> > > > AfterFirst.of(
> > > >
> > > > AfterProcessingTime.pastFirstElementInPane()
> > > >
> > > > .plusDelayOf(Duration.standardSeconds(30)),
> > > >
>  AfterPane.elementCountAtLeast(
> > > 100)
> > > > )))
> > > > .discardingFiredPanes())
> > > > .apply("GroupByTenant", GroupByKey.create())
> > > > .apply(ParDo.of(new DoFn<KV<String, Iterable>,
> Void>()
> > {
> > > > @Override
> > > > public void processElement(ProcessContext c) throws
> > > Exception {
> > > > KV<String, Iterable> element = c.element();
> > > > Iterator iterator =
> > > element.getValue().iterator();
> > > > int count = 0;
> > > > while (iterator.hasNext()) {
> > > > iterator.next();
> > > > count++;
> > > > }
> > > > System.out.println(String.format("Key %s Value Count
> > > > %d", element.getKey(), count));
> > > > }
> > > > }));
> > > > pipeline.run();
> > > >
> > > >
> > > >
> > > > Regards
> > > > Sumit Chawla
> > > >
> > > >
> &

Re: KafkaIO Windowing Fn

2016-08-30 Thread Chawla,Sumit
Hi Thomas

Sorry, I tried with the DirectRunner but ran into some Kafka issues.
Following is the snippet I am working on; I will post more details once I
get it working (as of now I am unable to read messages from Kafka using the
DirectRunner).


PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
pipelineOptions.setRunner(DirectPipelineRunner.class);
Pipeline pipeline = Pipeline.create(pipelineOptions);
pipeline.apply(KafkaIO.read()
        .withMaxNumRecords(500)
        .withTopics(ImmutableList.of("mytopic"))
        .withBootstrapServers("localhost:9092")
        .updateConsumerProperties(ImmutableMap.of(
            ConsumerConfig.GROUP_ID_CONFIG, "test1",
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true",
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
        )))
    // Convert each KafkaRecord<byte[], byte[]> into a KV<String, String>.
    .apply(ParDo.of(new DoFn<KafkaRecord<byte[], byte[]>, KV<String, String>>() {
        @Override
        public void processElement(ProcessContext c) throws Exception {
            KV<byte[], byte[]> record = c.element().getKV();
            c.output(KV.of(new String(record.getKey()), new String(record.getValue())));
        }
    }))
    // 10-second fixed windows; fire when a pane reaches 100 elements or
    // 30 seconds after the first element in the pane, whichever comes first.
    .apply("WindowByMinute", Window.<KV<String, String>>into(
            FixedWindows.of(Duration.standardSeconds(10)))
        .withAllowedLateness(Duration.standardSeconds(1))
        .triggering(
            Repeatedly.forever(
                AfterFirst.of(
                    AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardSeconds(30)),
                    AfterPane.elementCountAtLeast(100)
                )))
        .discardingFiredPanes())
    .apply("GroupByTenant", GroupByKey.create())
    // Count the grouped values per key.
    .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
        @Override
        public void processElement(ProcessContext c) throws Exception {
            KV<String, Iterable<String>> element = c.element();
            Iterator<String> iterator = element.getValue().iterator();
            int count = 0;
            while (iterator.hasNext()) {
                iterator.next();
                count++;
            }
            System.out.println(String.format("Key %s Value Count %d",
                element.getKey(), count));
        }
    }));
pipeline.run();



Regards
Sumit Chawla


On Fri, Aug 26, 2016 at 9:46 AM, Thomas Groh <tg...@google.com.invalid>
wrote:

> If you use the DirectRunner, do you observe the same behavior?
>
> On Thu, Aug 25, 2016 at 4:32 PM, Chawla,Sumit <sumitkcha...@gmail.com>
> wrote:
>
> > Hi Thomas
> >
> > I am using FlinkRunner.  Yes the second part of trigger never fires for
> me,
> >
> > Regards
> > Sumit Chawla
> >
> >
> > On Thu, Aug 25, 2016 at 4:18 PM, Thomas Groh <tg...@google.com.invalid>
> > wrote:
> >
> > > Hey Sumit;
> > >
> > > What runner are you using? I can set up a test with the same trigger
> > > reading from an unbounded input using the DirectRunner and I get the
> > > expected output panes.
> > >
> > > Just to clarify, the second half of the trigger ('when the first
> element
> > > has been there for at least 30+ seconds') simply never fires?
> > >
> > > On Thu, Aug 25, 2016 at 2:38 PM, Chawla,Sumit <sumitkcha...@gmail.com>
> > > wrote:
> > >
> > > > Hi Thomas
> > > >
> > > > That did not work.
> > > >
> > > > I tried following instead:
> > > >
> > > > .triggering(
> > > > Repeatedly.forever(
> > > > AfterFirst.of(
> > > >   AfterProcessingTime.
> > > pastFirstElementInPane()
> > > > .plusDelayOf(Duration.standard
> > > > Seconds(30)),
> > > >   AfterPane.elementCountAtLeast(100)
> > > > )))
> > > > .discardingFiredPanes()
> > > >
> > > > What i am trying to do here.  This is to make sure that followup
> > > > operations receive batches of records.
> > > >
> > > > 1.  Fire when at Pane has 100+ elements
> > > >
> > > > 2.  Or Fire when the first element has been there for atleast 30
> sec+.
> > > >
> > > > However,  2 point does not seem to work.  e.g. I have 540 records in
> > > > Kafka.  The first 500 records are available immediately,
> > > >
> > > > but the remaining 40 don't pass through. I was expect

Re: KafkaIO Windowing Fn

2016-08-25 Thread Chawla,Sumit
Hi Thomas

I am using FlinkRunner.  Yes the second part of trigger never fires for me,

Regards
Sumit Chawla


On Thu, Aug 25, 2016 at 4:18 PM, Thomas Groh <tg...@google.com.invalid>
wrote:

> Hey Sumit;
>
> What runner are you using? I can set up a test with the same trigger
> reading from an unbounded input using the DirectRunner and I get the
> expected output panes.
>
> Just to clarify, the second half of the trigger ('when the first element
> has been there for at least 30+ seconds') simply never fires?
>
> On Thu, Aug 25, 2016 at 2:38 PM, Chawla,Sumit <sumitkcha...@gmail.com>
> wrote:
>
> > Hi Thomas
> >
> > That did not work.
> >
> > I tried following instead:
> >
> > .triggering(
> > Repeatedly.forever(
> > AfterFirst.of(
> >   AfterProcessingTime.
> pastFirstElementInPane()
> > .plusDelayOf(Duration.standard
> > Seconds(30)),
> >   AfterPane.elementCountAtLeast(100)
> > )))
> > .discardingFiredPanes()
> >
> > What i am trying to do here.  This is to make sure that followup
> > operations receive batches of records.
> >
> > 1.  Fire when at Pane has 100+ elements
> >
> > 2.  Or Fire when the first element has been there for atleast 30 sec+.
> >
> > However,  2 point does not seem to work.  e.g. I have 540 records in
> > Kafka.  The first 500 records are available immediately,
> >
> > but the remaining 40 don't pass through. I was expecting 2nd to
> > trigger to help here.
> >
> >
> >
> >
> >
> >
> >
> > Regards
> > Sumit Chawla
> >
> >
> > On Thu, Aug 25, 2016 at 1:13 PM, Thomas Groh <tg...@google.com.invalid>
> > wrote:
> >
> > > You can adjust the trigger in the windowing transform if your sink can
> > > handle being written to multiple times for the same window. For
> example,
> > if
> > > the sink appends to the output when it receives new data in a window,
> you
> > > could add something like
> > >
> > > Window.into(...).withAllowedLateness(...).triggering(AfterWatermark.
> > > pastEndOfWindow().withEarlyFirings(AfterProcessingTime.
> > > pastFirstElementInPane().withDelayOf(Duration.standardSeconds(5))).
> > > withLateFirings(AfterPane.elementCountAtLeast(1))).discardin
> > gFiredPanes();
> > >
> > > This will cause elements to be output some amount of time after they
> are
> > > first received from Kafka, even if Kafka does not have any new
> elements.
> > > Elements will only be output by the GroupByKey once.
> > >
> > > We should still have a JIRA to improve the KafkaIO watermark tracking
> in
> > > the absence of new records .
> > >
> > > On Thu, Aug 25, 2016 at 10:29 AM, Chawla,Sumit <sumitkcha...@gmail.com
> >
> > > wrote:
> > >
> > > > Thanks Raghu.
> > > >
> > > > I don't have much control over changing KafkaIO properties.  I added
> > > > KafkaIO code for completing the example.  Are there any changes that
> > can
> > > be
> > > > done to Windowing to achieve the same behavior?
> > > >
> > > > Regards
> > > > Sumit Chawla
> > > >
> > > >
> > > > On Wed, Aug 24, 2016 at 5:06 PM, Raghu Angadi
> > <rang...@google.com.invalid
> > > >
> > > > wrote:
> > > >
> > > > > The default implementation returns processing timestamp of the last
> > > > record
> > > > > (in effect. more accurately it returns same as getTimestamp(),
> which
> > > > might
> > > > > overridden by user).
> > > > >
> > > > > As a work around, yes, you can provide your own watermarkFn that
> > > > > essentially returns Now() or Now()-1sec. (usage in javadoc
> > > > > <https://github.com/apache/incubator-beam/blob/master/
> > > > > sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/
> > > > > kafka/KafkaIO.java#L138>
> > > > > )
> > > > >
> > > > > I think default watermark should be smarter. it should advance to
> > > current
> > > > > time if there aren't any records to read from Kafka. Could you
> file a
> > > > jira?
> > > > >
> > > > > thanks,
> > > > > Raghu.
> > > >

Re: KafkaIO Windowing Fn

2016-08-25 Thread Chawla,Sumit
Hi Thomas

That did not work.

I tried the following instead:

.triggering(
    Repeatedly.forever(
        AfterFirst.of(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(30)),
            AfterPane.elementCountAtLeast(100)
        )))
.discardingFiredPanes()

What I am trying to do here is to make sure that follow-up operations
receive batches of records:

1.  Fire when a pane has 100+ elements.

2.  Or fire when the first element has been in the pane for at least 30
seconds.

However, point 2 does not seem to work.  E.g. I have 540 records in Kafka.
The first 500 records are available immediately, but the remaining 40 don't
pass through.  I was expecting the 2nd trigger to help here.







Regards
Sumit Chawla


On Thu, Aug 25, 2016 at 1:13 PM, Thomas Groh <tg...@google.com.invalid>
wrote:

> You can adjust the trigger in the windowing transform if your sink can
> handle being written to multiple times for the same window. For example, if
> the sink appends to the output when it receives new data in a window, you
> could add something like
>
> Window.into(...).withAllowedLateness(...).triggering(AfterWatermark.
> pastEndOfWindow().withEarlyFirings(AfterProcessingTime.
> pastFirstElementInPane().withDelayOf(Duration.standardSeconds(5))).
> withLateFirings(AfterPane.elementCountAtLeast(1))).discardingFiredPanes();
>
> This will cause elements to be output some amount of time after they are
> first received from Kafka, even if Kafka does not have any new elements.
> Elements will only be output by the GroupByKey once.
>
> We should still have a JIRA to improve the KafkaIO watermark tracking in
> the absence of new records .
>
> On Thu, Aug 25, 2016 at 10:29 AM, Chawla,Sumit <sumitkcha...@gmail.com>
> wrote:
>
> > Thanks Raghu.
> >
> > I don't have much control over changing KafkaIO properties.  I added
> > KafkaIO code for completing the example.  Are there any changes that can
> be
> > done to Windowing to achieve the same behavior?
> >
> > Regards
> > Sumit Chawla
> >
> >
> > On Wed, Aug 24, 2016 at 5:06 PM, Raghu Angadi <rang...@google.com.invalid
> >
> > wrote:
> >
> > > The default implementation returns processing timestamp of the last
> > record
> > > (in effect. more accurately it returns same as getTimestamp(), which
> > might
> > > overridden by user).
> > >
> > > As a work around, yes, you can provide your own watermarkFn that
> > > essentially returns Now() or Now()-1sec. (usage in javadoc
> > > <https://github.com/apache/incubator-beam/blob/master/
> > > sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/
> > > kafka/KafkaIO.java#L138>
> > > )
> > >
> > > I think default watermark should be smarter. it should advance to
> current
> > > time if there aren't any records to read from Kafka. Could you file a
> > jira?
> > >
> > > thanks,
> > > Raghu.
> > >
> > > On Wed, Aug 24, 2016 at 2:10 PM, Chawla,Sumit <sumitkcha...@gmail.com>
> > > wrote:
> > >
> > > > Hi All
> > > >
> > > >
> > > > I am trying to do some simple batch processing on KafkaIO records.
> My
> > > beam
> > > > pipeline looks like following:
> > > >
> > > > pipeline.apply(KafkaIO.read()
> > > > .withTopics(ImmutableList.of(s"mytopic"))
> > > > .withBootstrapServers("localhost:9200")
> > > > .apply("ExtractMessage", ParDo.of(new ExtractKVMessage())) // Emits a
> > > > KV<String,String>
> > > >
> > > > .apply("WindowBy10Sec", Window.<KV<String,
> > > > JSONObject>>into(FixedWindows.of(Duration.standardSeconds(
> > > > 10))).withAllowedLateness(Duration.standardSeconds(1)))
> > > >
> > > > .apply("GroupByKey", GroupByKey.create())
> > > >
> > > > .apply("Sink", ParDo.of(new MySink())
> > > >
> > > >
> > > > My Kafka Source already has some messages 1000+, and new messages
> > arrive
> > > > every few minutes.
> > > >
> > > > When i start my pipeline,  i can see that it reads all the 1000+
> > messages
> > > > from Kafka.  However, Window does not fire untill a new message
> arrives
> > > in
> > > > Kafka.  And Sink does not receive any message until that point.  Do i
> > > need
> > > > to override the WaterMarkFn here? Since i am not providing any
> > > timeStampFn
> > > > , i am assuming that timestamps will be assigned as in when message
> > > arrives
> > > > i.e. ingestion time.  What is the default WaterMarkFn implementation?
> > Is
> > > > the Window not supposed to be fired based on Ingestion time?
> > > >
> > > >
> > > >
> > > >
> > > > Regards
> > > > Sumit Chawla
> > > >
> > >
> >
>


Re: KafkaIO Windowing Fn

2016-08-25 Thread Chawla,Sumit
Thanks Raghu.

I don't have much control over changing the KafkaIO properties.  I added
the KafkaIO code just to complete the example.  Are there any changes that
can be done to the windowing to achieve the same behavior?

Regards
Sumit Chawla


On Wed, Aug 24, 2016 at 5:06 PM, Raghu Angadi <rang...@google.com.invalid>
wrote:

> The default implementation returns processing timestamp of the last record
> (in effect. more accurately it returns same as getTimestamp(), which might
> overridden by user).
>
> As a work around, yes, you can provide your own watermarkFn that
> essentially returns Now() or Now()-1sec. (usage in javadoc
> <https://github.com/apache/incubator-beam/blob/master/
> sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/
> kafka/KafkaIO.java#L138>
> )
>
> I think default watermark should be smarter. it should advance to current
> time if there aren't any records to read from Kafka. Could you file a jira?
>
> thanks,
> Raghu.
>
> On Wed, Aug 24, 2016 at 2:10 PM, Chawla,Sumit <sumitkcha...@gmail.com>
> wrote:
>
> > Hi All
> >
> >
> > I am trying to do some simple batch processing on KafkaIO records.  My
> beam
> > pipeline looks like following:
> >
> > pipeline.apply(KafkaIO.read()
> > .withTopics(ImmutableList.of(s"mytopic"))
> > .withBootstrapServers("localhost:9200")
> > .apply("ExtractMessage", ParDo.of(new ExtractKVMessage())) // Emits a
> > KV<String,String>
> >
> > .apply("WindowBy10Sec", Window.<KV<String,
> > JSONObject>>into(FixedWindows.of(Duration.standardSeconds(
> > 10))).withAllowedLateness(Duration.standardSeconds(1)))
> >
> > .apply("GroupByKey", GroupByKey.create())
> >
> > .apply("Sink", ParDo.of(new MySink())
> >
> >
> > My Kafka Source already has some messages 1000+, and new messages arrive
> > every few minutes.
> >
> > When i start my pipeline,  i can see that it reads all the 1000+ messages
> > from Kafka.  However, Window does not fire untill a new message arrives
> in
> > Kafka.  And Sink does not receive any message until that point.  Do i
> need
> > to override the WaterMarkFn here? Since i am not providing any
> timeStampFn
> > , i am assuming that timestamps will be assigned as in when message
> arrives
> > i.e. ingestion time.  What is the default WaterMarkFn implementation? Is
> > the Window not supposed to be fired based on Ingestion time?
> >
> >
> >
> >
> > Regards
> > Sumit Chawla
> >
>


KafkaIO Windowing Fn

2016-08-24 Thread Chawla,Sumit
Hi All


I am trying to do some simple batch processing on KafkaIO records.  My Beam
pipeline looks like the following:

pipeline.apply(KafkaIO.read()
    .withTopics(ImmutableList.of("mytopic"))
    .withBootstrapServers("localhost:9200")
    .apply("ExtractMessage", ParDo.of(new ExtractKVMessage())) // Emits a KV<String, String>

    .apply("WindowBy10Sec", Window.<KV<String, JSONObject>>into(
            FixedWindows.of(Duration.standardSeconds(10)))
        .withAllowedLateness(Duration.standardSeconds(1)))

    .apply("GroupByKey", GroupByKey.create())

    .apply("Sink", ParDo.of(new MySink())


My Kafka source already has 1000+ messages, and new messages arrive
every few minutes.

When I start my pipeline, I can see that it reads all the 1000+ messages
from Kafka.  However, the window does not fire until a new message arrives
in Kafka, and the sink does not receive any message until that point.  Do I
need to override the watermarkFn here?  Since I am not providing any
timestampFn, I am assuming that timestamps are assigned as messages arrive,
i.e. ingestion time.  What is the default watermarkFn implementation?  Is
the window not supposed to fire based on ingestion time?
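
A minimal sketch of the workaround Raghu describes in his reply (further up
in this archive): supplying a watermark function so the watermark keeps
advancing even when no new records arrive.  The withWatermarkFn hook and its
signature are assumed from the KafkaIO javadoc link in that reply and are
not verified against 0.1.0-incubating; treat this as illustrative only.

pipeline.apply(KafkaIO.read()
    .withTopics(ImmutableList.of("mytopic"))
    .withBootstrapServers("localhost:9200")
    // Assumed hook: keep the watermark near "now" (minus a small lag) so
    // that windows can close even while Kafka is idle.
    .withWatermarkFn(record -> Instant.now().minus(Duration.standardSeconds(1))))
    ...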




Regards
Sumit Chawla


Re: java.io.NotSerializableException: org.apache.kafka.common.TopicPartition

2016-08-21 Thread Chawla,Sumit
Thanks Dan.  Today I was able to identify what the issue was: Kafka's
TopicPartition is marked Serializable in kafka-clients-0.9.0.1.jar, but
somehow I was pulling down kafka-clients-0.9.0.0.jar.

Regards
Sumit Chawla


On Sun, Aug 21, 2016 at 10:20 AM, Dan Halperin <dhalp...@google.com.invalid>
wrote:

> Explicit +Raghu
>
> On Fri, Aug 19, 2016 at 4:24 PM, Chawla,Sumit <sumitkcha...@gmail.com>
> wrote:
>
> > Hi All
> >
> > I am trying to use KafkaIO as unbounded source, but the translation is
> > failing.  I am using FlinkRunner for the pipe.  It complains about
> > the org.apache.kafka.common.TopicPartition being not-serializable.
> >
> > pipeline.apply(
> > KafkaIO.read()
> > .withTopics(ImmutableList.of("test-topic))
> > .withBootstrapServers("localhost:9200")
> >
> > Is it a known issue?  Here is the full exception details.
> >
> > Caused by: org.apache.flink.api.common.InvalidProgramException: Object
> > org.apache.beam.runners.flink.translation.wrappers.streaming.io.
> > UnboundedSourceWrapper@2a855331
> > not serializable
> > at
> > org.apache.flink.api.java.ClosureCleaner.ensureSerializable(
> > ClosureCleaner.java:99)
> > ~[flink-java-1.0.3.jar:1.0.3]
> > at org.apache.flink.api.java.ClosureCleaner.clean(
> ClosureCleaner.java:61)
> > ~[flink-java-1.0.3.jar:1.0.3]
> > at
> > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.
> > clean(StreamExecutionEnvironment.java:1219)
> > ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0]
> > at
> > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.
> > addSource(StreamExecutionEnvironment.java:1131)
> > ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0]
> > at
> > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.
> > addSource(StreamExecutionEnvironment.java:1075)
> > ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0]
> > at
> > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.
> > addSource(StreamExecutionEnvironment.java:1057)
> > ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0]
> > at
> > org.apache.beam.runners.flink.translation.FlinkStreamingTransformTransla
> > tors$UnboundedReadSourceTranslator.translateNode(
> > FlinkStreamingTransformTranslators.java:281)
> > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> > at
> > org.apache.beam.runners.flink.translation.FlinkStreamingTransformTransla
> > tors$UnboundedReadSourceTranslator.translateNode(
> > FlinkStreamingTransformTranslators.java:244)
> > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> > at
> > org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslat
> > or.applyStreamingTransform(FlinkStreamingPipelineTranslator.java:106)
> > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> > at
> > org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslat
> > or.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:87)
> > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> > at
> > org.apache.beam.sdk.runners.TransformTreeNode.visit(
> > TransformTreeNode.java:225)
> > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
> > at
> > org.apache.beam.sdk.runners.TransformTreeNode.visit(
> > TransformTreeNode.java:220)
> > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
> > at
> > org.apache.beam.sdk.runners.TransformTreeNode.visit(
> > TransformTreeNode.java:220)
> > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
> > at
> > org.apache.beam.sdk.runners.TransformHierarchy.visit(
> > TransformHierarchy.java:104)
> > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
> > at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:292)
> > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
> > at
> > org.apache.beam.runners.flink.translation.FlinkPipelineTranslator.
> > translate(FlinkPipelineTranslator.java:38)
> > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> > at
> > org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironm
> ent.translate(
> > FlinkPipelineExecutionEnvironment.java:106)
> > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> > at
> > org.apache.beam.runners.flink.FlinkPipelineRunner.run(
> > FlinkPipelineRunner.java:106)
> > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> > at
> > org.apache.b

java.io.NotSerializableException: org.apache.kafka.common.TopicPartition

2016-08-19 Thread Chawla,Sumit
Hi All

I am trying to use KafkaIO as an unbounded source, but the translation is
failing.  I am using the FlinkRunner for the pipeline.  It complains about
org.apache.kafka.common.TopicPartition not being serializable.

pipeline.apply(
    KafkaIO.read()
        .withTopics(ImmutableList.of("test-topic"))
        .withBootstrapServers("localhost:9200")

Is this a known issue?  Here are the full exception details.

Caused by: org.apache.flink.api.common.InvalidProgramException: Object
org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper@2a855331
not serializable
at
org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99)
~[flink-java-1.0.3.jar:1.0.3]
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)
~[flink-java-1.0.3.jar:1.0.3]
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1219)
~[flink-streaming-java_2.10-1.0.0.jar:1.0.0]
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1131)
~[flink-streaming-java_2.10-1.0.0.jar:1.0.0]
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1075)
~[flink-streaming-java_2.10-1.0.0.jar:1.0.0]
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1057)
~[flink-streaming-java_2.10-1.0.0.jar:1.0.0]
at
org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:281)
~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
at
org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:244)
~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
at
org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslator.applyStreamingTransform(FlinkStreamingPipelineTranslator.java:106)
~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
at
org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:87)
~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
at
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:225)
~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
at
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
at
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
at
org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:292)
~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
at
org.apache.beam.runners.flink.translation.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:38)
~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
at
org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:106)
~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
at
org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:106)
~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
at
org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:49)
~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
at
com.cisco.ndp.pipeline.common.runner.flink.PipelineMain.main(PipelineMain.java:70)
~[pipelinecommon-1.0.0.jar:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_92]
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_92]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_92]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_92]
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
~[flink-clients_2.10-1.0.3.jar:1.0.3]
... 9 more
Caused by: java.io.NotSerializableException:
org.apache.kafka.common.TopicPartition
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
~[?:1.8.0_92]
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
~[?:1.8.0_92]
at java.util.ArrayList.writeObject(ArrayList.java:762) ~[?:1.8.0_92]
at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) ~[?:?]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_92]
at 

Re: [PROPOSAL] Splittable DoFn - Replacing the Source API with non-monolithic element processing in DoFn

2016-08-12 Thread Chawla,Sumit
+1

Regards
Sumit Chawla


On Fri, Aug 12, 2016 at 9:29 AM, Aparup Banerjee (apbanerj) <
apban...@cisco.com> wrote:

> + 1, me2
>
>
>
>
> On 8/12/16, 9:27 AM, "Amit Sela"  wrote:
>
> >+1 as in I'll join ;-)
> >
> >On Fri, Aug 12, 2016, 19:14 Eugene Kirpichov  >
> >wrote:
> >
> >> Sounds good, thanks!
> >> Then Friday Aug 19th it is, 8am-9am PST,
> >> https://staging.talkgadget.google.com/hangouts/_/google.
> com/splittabledofn
> >>
> >> On Thu, Aug 11, 2016 at 11:12 PM Jean-Baptiste Onofré 
> >> wrote:
> >>
> >> > Hi
> >> >
> >> > Unfortunately I will be in Ireland on August 15th. What about Friday
> >> 19th ?
> >> >
> >> > Regards
> >> > JB
> >> >
> >> >
> >> >
> >> > On Aug 11, 2016, 23:22, at 23:22, Eugene Kirpichov
> >> >  wrote:
> >> > >Hi JB,
> >> > >
> >> > >Sounds great, does the suggested time over videoconference work for
> >> > >you?
> >> > >
> >> > >On Thu, Aug 11, 2016 at 11:59 AM Jean-Baptiste Onofré <
> j...@nanthrax.net>
> >> > >wrote:
> >> > >
> >> > >> Hi Eugene
> >> > >>
> >> > >> May we talk together next week ? I like the proposal. I would just
> >> > >need
> >> > >> some details for my understanding.
> >> > >>
> >> > >> Thanks
> >> > >> Regards
> >> > >> JB
> >> > >>
> >> > >>
> >> > >>
> >> > >> On Aug 11, 2016, 19:46, at 19:46, Eugene Kirpichov
> >> > >>  wrote:
> >> > >> >Hi JB,
> >> > >> >
> >> > >> >What are your thoughts on this?
> >> > >> >
> >> > >> >I'm also thinking of having a virtual meeting to explain more
> about
> >> > >> >this
> >> > >> >proposal if necessary, since I understand it is a lot to digest.
> >> > >> >
> >> > >> >How about: Monday Aug 15, 8am-9am Pacific time, over Hangouts?
> >> > >> >(link:
> >> > >> >
> >> > >>
> >> > >
> >> >
> >> https://staging.talkgadget.google.com/hangouts/_/google.
> com/splittabledofn
> >> > >> >-
> >> > >> >I confirmed that it can be joined without being logged into a
> Google
> >> > >> >account)
> >> > >> >
> >> > >> >Who'd be interested in attending, and does this time/date work for
> >> > >> >people?
> >> > >> >
> >> > >> >On Fri, Aug 5, 2016 at 10:48 AM Eugene Kirpichov
> >> > >
> >> > >> >wrote:
> >> > >> >
> >> > >> >> Hi JB, thanks for reading and for your comments!
> >> > >> >>
> >> > >> >> It sounds like you are concerned about continued support for
> >> > >existing
> >> > >> >IO's
> >> > >> >> people have developed, and about backward compatibility?
> >> > >> >>
> >> > >> >> We do not need to remove the Source API, and all existing
> >> > >> >Source-based
> >> > >> >> connectors will continue to work [though the document proposes
> at
> >> > >> >some
> >> > >> >> point to make Read.from(Source) to translate to a wrapper SDF
> >> > >under
> >> > >> >the
> >> > >> >> hood, to exercise the feature more and to make sure that it is
> >> > >> >strictly
> >> > >> >> more powerful - but this is an optional implementation detail].
> >> > >> >>
> >> > >> >> Perhaps the document phrases this too strongly - "replacing the
> >> > >> >Source
> >> > >> >> API": a better phrasing would be "introducing a new API so
> >> > >powerful
> >> > >> >and
> >> > >> >> easy-to-use that hopefully people will choose it over the Source
> >> > >API
> >> > >> >all
> >> > >> >> the time, even though they don't have to" :) And we can discuss
> >> > >> >whether or
> >> > >> >> not to actually deprecate/remove the Source API at some point
> down
> >> > >> >the
> >> > >> >> road, once it becomes clear whether this is the case or not.
> >> > >> >>
> >> > >> >> To give more context: this proposal came out of discussions
> within
> >> > >> >the SDK
> >> > >> >> team over the past ~1.5 years, before the Beam project existed,
> on
> >> > >> >how to
> >> > >> >> make major improvements to the Source API; perhaps it will
> clarify
> >> > >> >things
> >> > >> >> if I give a history of the ideas discussed:
> >> > >> >> - The first idea was to introduce a
> Read.from(PCollection)
> >> > >> >> transform while keeping the Source API intact - this, given
> >> > >> >appropriate
> >> > >> >> implementation, would solve most of the scalability and
> >> > >composability
> >> > >> >> issues of IO's. Then most connectors would look like : ParDo >> > >> >Source>
> >> > >> >> + Read.from().
> >> > >> >> - Then we figured that the Source class is an unnecessary
> >> > >> >abstraction, as
> >> > >> >> it simply holds data. What if we only had a Reader class
> >> > >where
> >> > >> >S is
> >> > >> >> the source type and B the output type? Then connectors would be
> >> > >> >something
> >> > >> >> like: ParDo + hypothetical Read.using(Reader).
> >> > >> >> - Then somebody remarked that some of the features of Source are
> >> > >> >useful to
> >> > >> >> ParDo's as well: e.g. ability to report progress when
> processing a
> >> > >> >very
> >> > >> >> heavy element, or ability to produce very large output in

Re: Suggestion for Writing Sink Implementation

2016-07-29 Thread Chawla,Sumit
Any more comments on this pattern suggested by Jean?

Regards
Sumit Chawla


On Thu, Jul 28, 2016 at 1:34 PM, Kenneth Knowles <k...@google.com.invalid>
wrote:

> What I said earlier is not quite accurate, though my advice is the same.
> Here are the corrections:
>
>  - The Write transform actually has a too-general name, and Write.of(Sink)
> only really works for finite data. It re-windows into the global window and
> replaces any triggers.
>  - So the special case in the Flink runner actually just _enables_ a (fake)
> Sink to work.
>
> We should probably rename Write to some more specific name that indicates
> the particular strategy, and make it easier for a user to decide whether
> that pattern is what they want. And the transform as-is should probably
> reject unbounded inputs.
>
> So you should still proceed with implementation via ParDo and your own
> logic. If you want some logic similar to Write (but with different
> windowing and triggering) then it is a pretty simple composite to derive
> something from.
>
> On Thu, Jul 28, 2016 at 12:37 PM, Chawla,Sumit <sumitkcha...@gmail.com>
> wrote:
>
> > Thanks Jean
> >
> > This is an interesting pattern here.  I see that its implemented as
> > PTransform, with constructs ( WriteOperation/Writer)  pretty similar to
> > Sink interface.  Would love to hear more pros/cons of this pattern :)
> .
> > Definitely it gives more control over connection initialization and
> > cleanup.
> >
> > Regards
> > Sumit Chawla
> >
> >
> > On Thu, Jul 28, 2016 at 12:20 PM, Jean-Baptiste Onofré <j...@nanthrax.net>
> > wrote:
> >
> > > Hi Sumit,
> > >
> > > I created a PR containing Cassandra IO with a sink:
> > >
> > > https://github.com/apache/incubator-beam/pull/592
> > >
> > > Maybe it can help you.
> > >
> > > Regards
> > > JB
> > >
> > >
> > > On 07/28/2016 09:00 PM, Chawla,Sumit  wrote:
> > >
> > >> Hi Kenneth
> > >>
> > >> Thanks for looking into it. I am currently trying to implement Sinks
> for
> > >> writing data into Cassandra/Titan DB.  My immediate goal is to run it
> on
> > >> Flink Runner.
> > >>
> > >>
> > >>
> > >> Regards
> > >> Sumit Chawla
> > >>
> > >>
> > >> On Thu, Jul 28, 2016 at 11:56 AM, Kenneth Knowles
> > <k...@google.com.invalid
> > >> >
> > >> wrote:
> > >>
> > >> Hi Sumit,
> > >>>
> > >>> I see what has happened here, from that snippet you pasted from the
> > Flink
> > >>> runner's code [1]. Thanks for looking into it!
> > >>>
> > >>> The Flink runner today appears to reject Write.Bounded transforms in
> > >>> streaming mode if the sink is not an instance of UnboundedFlinkSink.
> > The
> > >>> intent of that code, I believe, was to special case
> UnboundedFlinkSink
> > to
> > >>> make it easy to use an existing Flink sink, not to disable all other
> > >>> Write
> > >>> transforms. What do you think, Max?
> > >>>
> > >>> Until we fix this issue, you should use ParDo transforms to do the
> > >>> writing.
> > >>> If you can share a little about your sink, we may be able to suggest
> > >>> patterns for implementing it. Like Eugene said, the Write.of(Sink)
> > >>> transform is just a specialized pattern of ParDo's, not a Beam
> > primitive.
> > >>>
> > >>> Kenn
> > >>>
> > >>> [1]
> > >>>
> > >>>
> > >>>
> >
> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java#L203
> > >>>
> > >>>
> > >>> On Wed, Jul 27, 2016 at 5:57 PM, Eugene Kirpichov <
> > >>> kirpic...@google.com.invalid> wrote:
> > >>>
> > >>> Thanks Sumit. Looks like your question is, indeed, specific to the
> > Flink
> > >>>> runner, and I'll then defer to somebody familiar with it.
> > >>>>
> > >>>> On Wed, Jul 27, 2016 at 5:25 PM Chawla,Sumit <
> sumitkcha...@gmail.com>
> > >>>> wrote:
> > >>>>
> > >>>> Thanks a lot Eugene.
> > >>>>>
> > >&g

Re: Suggestion for Writing Sink Implementation

2016-07-28 Thread Chawla,Sumit
Thanks Jean

This is an interesting pattern.  I see that it's implemented as a
PTransform, with constructs (WriteOperation/Writer) pretty similar to the
Sink interface.  I would love to hear more pros/cons of this pattern :).
It definitely gives more control over connection initialization and cleanup.

Regards
Sumit Chawla


On Thu, Jul 28, 2016 at 12:20 PM, Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

> Hi Sumit,
>
> I created a PR containing Cassandra IO with a sink:
>
> https://github.com/apache/incubator-beam/pull/592
>
> Maybe it can help you.
>
> Regards
> JB
>
>
> On 07/28/2016 09:00 PM, Chawla,Sumit  wrote:
>
>> Hi Kenneth
>>
>> Thanks for looking into it. I am currently trying to implement Sinks for
>> writing data into Cassandra/Titan DB.  My immediate goal is to run it on
>> Flink Runner.
>>
>>
>>
>> Regards
>> Sumit Chawla
>>
>>
>> On Thu, Jul 28, 2016 at 11:56 AM, Kenneth Knowles <k...@google.com.invalid>
>> wrote:
>>
>> Hi Sumit,
>>>
>>> I see what has happened here, from that snippet you pasted from the Flink
>>> runner's code [1]. Thanks for looking into it!
>>>
>>> The Flink runner today appears to reject Write.Bounded transforms in
>>> streaming mode if the sink is not an instance of UnboundedFlinkSink. The
>>> intent of that code, I believe, was to special case UnboundedFlinkSink to
>>> make it easy to use an existing Flink sink, not to disable all other
>>> Write
>>> transforms. What do you think, Max?
>>>
>>> Until we fix this issue, you should use ParDo transforms to do the
>>> writing.
>>> If you can share a little about your sink, we may be able to suggest
>>> patterns for implementing it. Like Eugene said, the Write.of(Sink)
>>> transform is just a specialized pattern of ParDo's, not a Beam primitive.
>>>
>>> Kenn
>>>
>>> [1]
>>>
>>>
>>> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java#L203
>>>
>>>
>>> On Wed, Jul 27, 2016 at 5:57 PM, Eugene Kirpichov <
>>> kirpic...@google.com.invalid> wrote:
>>>
>>> Thanks Sumit. Looks like your question is, indeed, specific to the Flink
>>>> runner, and I'll then defer to somebody familiar with it.
>>>>
>>>> On Wed, Jul 27, 2016 at 5:25 PM Chawla,Sumit <sumitkcha...@gmail.com>
>>>> wrote:
>>>>
>>>> Thanks a lot Eugene.
>>>>>
>>>>> My immediate requirement is to run this Sink on FlinkRunner. Which
>>>>>>>>
>>>>>>> mandates that my implementation must also implement SinkFunction<>.
>>>>> In
>>>>> that >>>case, none of the Sink<> methods get called anyway.
>>>>>
>>>>> I am using FlinkRunner. The Sink implementation that i was writing by
>>>>> extending Sink<> class had to implement Flink Specific SinkFunction for
>>>>>
>>>> the
>>>>
>>>>> correct translation.
>>>>>
>>>>> private static class WriteSinkStreamingTranslator implements
>>>>>
>>>>>
>>>>
>>> FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound>
>>>
>>>> {
>>>>>
>>>>>@Override
>>>>>public void translateNode(Write.Bound transform,
>>>>> FlinkStreamingTranslationContext context) {
>>>>>  String name = transform.getName();
>>>>>  PValue input = context.getInput(transform);
>>>>>
>>>>>  Sink sink = transform.getSink();
>>>>>  if (!(sink instanceof UnboundedFlinkSink)) {
>>>>>throw new UnsupportedOperationException("At the time, only
>>>>> unbounded Flink sinks are supported.");
>>>>>  }
>>>>>
>>>>>  DataStream<WindowedValue> inputDataSet =
>>>>> context.getInputDataStream(input);
>>>>>
>>>>>  inputDataSet.flatMap(new FlatMapFunction<WindowedValue,
>>>>>
>>>> Object>()
>>>
>>>> {
>>>>
>>>>>@Override
>>>>>public void flatMap(WindowedValue value, Collector
>>>>> out) throws Exception {
>

Re: Suggestion for Writing Sink Implementation

2016-07-28 Thread Chawla,Sumit
Hi Kenneth

Thanks for looking into it. I am currently trying to implement Sinks for
writing data into Cassandra/Titan DB.  My immediate goal is to run it on
Flink Runner.



Regards
Sumit Chawla


On Thu, Jul 28, 2016 at 11:56 AM, Kenneth Knowles <k...@google.com.invalid>
wrote:

> Hi Sumit,
>
> I see what has happened here, from that snippet you pasted from the Flink
> runner's code [1]. Thanks for looking into it!
>
> The Flink runner today appears to reject Write.Bounded transforms in
> streaming mode if the sink is not an instance of UnboundedFlinkSink. The
> intent of that code, I believe, was to special case UnboundedFlinkSink to
> make it easy to use an existing Flink sink, not to disable all other Write
> transforms. What do you think, Max?
>
> Until we fix this issue, you should use ParDo transforms to do the writing.
> If you can share a little about your sink, we may be able to suggest
> patterns for implementing it. Like Eugene said, the Write.of(Sink)
> transform is just a specialized pattern of ParDo's, not a Beam primitive.
>
> Kenn
>
> [1]
>
> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java#L203
>
>
> On Wed, Jul 27, 2016 at 5:57 PM, Eugene Kirpichov <
> kirpic...@google.com.invalid> wrote:
>
> > Thanks Sumit. Looks like your question is, indeed, specific to the Flink
> > runner, and I'll then defer to somebody familiar with it.
> >
> > On Wed, Jul 27, 2016 at 5:25 PM Chawla,Sumit <sumitkcha...@gmail.com>
> > wrote:
> >
> > > Thanks a lot Eugene.
> > >
> > > >>>My immediate requirement is to run this Sink on FlinkRunner. Which
> > > mandates that my implementation must also implement SinkFunction<>.  In
> > > that >>>case, none of the Sink<> methods get called anyway.
> > >
> > > I am using FlinkRunner. The Sink implementation that i was writing by
> > > extending Sink<> class had to implement Flink Specific SinkFunction for
> > the
> > > correct translation.
> > >
> > > private static class WriteSinkStreamingTranslator implements
> > > FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound>
> > > {
> > >
> > >   @Override
> > >   public void translateNode(Write.Bound transform,
> > > FlinkStreamingTranslationContext context) {
> > > String name = transform.getName();
> > > PValue input = context.getInput(transform);
> > >
> > > Sink sink = transform.getSink();
> > > if (!(sink instanceof UnboundedFlinkSink)) {
> > >   throw new UnsupportedOperationException("At the time, only
> > > unbounded Flink sinks are supported.");
> > > }
> > >
> > > DataStream<WindowedValue> inputDataSet =
> > > context.getInputDataStream(input);
> > >
> > > inputDataSet.flatMap(new FlatMapFunction<WindowedValue, Object>() {
> > >   @Override
> > >   public void flatMap(WindowedValue value, Collector
> > > out) throws Exception {
> > > out.collect(value.getValue());
> > >   }
> > > }).addSink(((UnboundedFlinkSink)
> > > sink).getFlinkSource()).name(name);
> > >   }
> > > }
> > >
> > >
> > >
> > >
> > > Regards
> > > Sumit Chawla
> > >
> > >
> > > On Wed, Jul 27, 2016 at 4:53 PM, Eugene Kirpichov <
> > > kirpic...@google.com.invalid> wrote:
> > >
> > > > Hi Sumit,
> > > >
> > > > All reusable parts of a pipeline, including connectors to storage
> > > systems,
> > > > should be packaged as PTransform's.
> > > >
> > > > Sink is an advanced API that you can use under the hood to implement
> > the
> > > > transform, if this particular connector benefits from this API - but
> > you
> > > > don't have to, and many connectors indeed don't need it, and are
> > simpler
> > > to
> > > > implement just as wrappers around a couple of ParDo's writing the
> data.
> > > >
> > > > Even if the connector is implemented using a Sink, packaging the
> > > connector
> > > > as a PTransform is important because it's easier to apply in a
> pipeline
> > > and
> > > > because it's more future-proof (the author of the connector may later
> > > > change

Re: Suggestion for Writing Sink Implementation

2016-07-27 Thread Chawla,Sumit
Thanks a lot Eugene.

>>>My immediate requirement is to run this Sink on FlinkRunner. Which
mandates that my implementation must also implement SinkFunction<>.  In
that >>>case, none of the Sink<> methods get called anyway.

I am using the FlinkRunner. The Sink implementation that I was writing by
extending the Sink<> class had to implement a Flink-specific SinkFunction
for the correct translation.

private static class WriteSinkStreamingTranslator implements
FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound>
{

  @Override
  public void translateNode(Write.Bound transform,
FlinkStreamingTranslationContext context) {
String name = transform.getName();
PValue input = context.getInput(transform);

Sink sink = transform.getSink();
if (!(sink instanceof UnboundedFlinkSink)) {
  throw new UnsupportedOperationException("At the time, only
unbounded Flink sinks are supported.");
}

DataStream<WindowedValue> inputDataSet =
context.getInputDataStream(input);

inputDataSet.flatMap(new FlatMapFunction<WindowedValue, Object>() {
  @Override
  public void flatMap(WindowedValue value, Collector
out) throws Exception {
out.collect(value.getValue());
  }
}).addSink(((UnboundedFlinkSink) sink).getFlinkSource()).name(name);
  }
}
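
In other words, as far as I can tell, the only way through this translator
today is to hand it a native Flink SinkFunction wrapped in UnboundedFlinkSink.
A rough, untested sketch of what that sink side could look like for my case
(TitanSinkFunction is hypothetical, and the connection details are omitted):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class TitanSinkFunction extends RichSinkFunction<String> {

  @Override
  public void open(Configuration parameters) throws Exception {
    // Open the Titan/Cassandra connection here (e.g. via TitanFactory).
  }

  @Override
  public void invoke(String value) throws Exception {
    // Write the element into the graph / table here.
  }

  @Override
  public void close() throws Exception {
    // Release the connection here.
  }
}

On the pipeline side this would then be wrapped with UnboundedFlinkSink
(something like Write.to(UnboundedFlinkSink.of(new TitanSinkFunction())) --
I am guessing at the exact factory method, so please treat that line as
pseudocode and check the runner's source).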




Regards
Sumit Chawla


On Wed, Jul 27, 2016 at 4:53 PM, Eugene Kirpichov <
kirpic...@google.com.invalid> wrote:

> Hi Sumit,
>
> All reusable parts of a pipeline, including connectors to storage systems,
> should be packaged as PTransform's.
>
> Sink is an advanced API that you can use under the hood to implement the
> transform, if this particular connector benefits from this API - but you
> don't have to, and many connectors indeed don't need it, and are simpler to
> implement just as wrappers around a couple of ParDo's writing the data.
>
> Even if the connector is implemented using a Sink, packaging the connector
> as a PTransform is important because it's easier to apply in a pipeline and
> because it's more future-proof (the author of the connector may later
> change it to use something else rather than Sink under the hood without
> breaking existing users).
>
> Sink is, currently, useful in the following case:
> - You're writing a bounded amount of data (we do not yet have an unbounded
> Sink analogue)
> - The location you're writing to is known at pipeline construction time,
> and does not depend on the data itself (support for "data-dependent" sinks
> is on the radar https://issues.apache.org/jira/browse/BEAM-92)
> - The storage system you're writing to has a distinct "initialization" and
> "finalization" step, allowing the write operation to appear atomic (either
> all data is written or none). This mostly applies to files (where writing
> is done by first writing to a temporary directory, and then renaming all
> files to their final location), but there can be other cases too.
>
> Here's an example GCP connector using the Sink API under the hood:
>
> https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1797
> Most other non-file-based connectors, indeed, don't (KafkaIO, DatastoreIO,
> BigtableIO etc.)
>
> I'm not familiar with the Flink API, however I'm a bit confused by your
> last paragraph: the Beam programming model is intentionally
> runner-agnostic, so that you can run exactly the same code on different
> runners.
>
> On Wed, Jul 27, 2016 at 4:30 PM Chawla,Sumit <sumitkcha...@gmail.com>
> wrote:
>
> > Hi
> >
> > Please suggest me on what is the best way to write a Sink in Beam.  I see
> > that there is a Sink abstract class which is in experimental state.
> > What is the expected outcome of this one? Do we have the api frozen, or
> > this could still change?  Most of the existing Sink implementations like
> > KafkaIO.Write are not using this interface, and instead extends
> > PTransform<PCollection<KV<K, V>>, PDone>. Would these be changed to
> extend
> > Sink<>.
> >
> >
> > My immediate requirement is to run this Sink on FlinkRunner. Which
> mandates
> > that my implementation must also implement SinkFunction<>.  In that case,
> > none of the Sink<> methods get called anyway.
> >
> > Regards
> > Sumit Chawla
> >
>


Fwd: Suggestion for Writing Sink Implementation

2016-07-27 Thread Chawla,Sumit
Hi

Please suggest the best way to write a Sink in Beam.  I see that there is a
Sink abstract class which is in an experimental state.  What is the expected
outcome of this one? Is the API frozen, or could it still change?  Most of the
existing Sink implementations, like KafkaIO.Write, do not use this interface
and instead extend PTransform<PCollection<KV<K, V>>, PDone>. Would these be
changed to extend Sink<>?
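
For reference, the shape I mean is roughly the following (a hand-written
sketch of the KafkaIO.Write-style signature, not the actual KafkaIO code;
method names follow the current SDK as I understand it):

public static class Write<K, V> extends PTransform<PCollection<KV<K, V>>, PDone> {

  @Override
  public PDone apply(PCollection<KV<K, V>> input) {
    // Under the hood the writing is done by ordinary ParDo steps.
    input.apply(ParDo.of(new DoFn<KV<K, V>, Void>() {
      @Override
      public void processElement(ProcessContext c) throws Exception {
        // send c.element() to the external system
      }
    }));
    return PDone.in(input.getPipeline());
  }
}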


My immediate requirement is to run this Sink on FlinkRunner, which mandates
that my implementation must also implement SinkFunction<>.  In that case,
none of the Sink<> methods get called anyway.

Regards
Sumit Chawla