Support for Flink 1.1.0 in release-0.2.0-incubating

2016-09-13 Thread Chawla,Sumit
Hi All

The release-0.2.0-incubating supports Flink 1.0.3. With Flink 1.1.0 out, is
there a plan to support it in a 0.2.0 patch release? I tried compiling 0.2.0
with Flink 1.1.0 and got a couple of compilation errors in
FlinkGroupAlsoByWindowWrapper.java. Going back to master, I see lots of
changes in the Flink translation wrappers, and FlinkGroupAlsoByWindowWrapper.java
has been removed.

I just want to get a sense of what it would take to support Flink 1.1.0 with
release-0.2.0. I would appreciate the views of people who are already working
on upgrading to Flink 1.1.0.

Regards
Sumit Chawla


Re: Should UnboundedSource provide a split identifier ?

2016-09-13 Thread Thomas Groh
Yes. We don't currently permit adding (or modifying) partitions in Kafka
while a Pipeline is running (without updates). Our understanding was that
this was a rare occurrence, but it's not impossible to support.

For generateInitialSplits, the UnboundedSource API doesn't require
deterministic splitting (although it's recommended), and a PipelineRunner
should keep track of the initially generated splits.
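The runner-side bookkeeping described above can be sketched as follows. This is a minimal illustration built on hypothetical stand-in types (`SplittableSource`, `SplitState`, `SplitRegistry`), not the Beam `UnboundedSource`/`CheckpointMark` API: the runner splits once, tags each split with an arbitrary key of its own, and keeps each split together with its latest checkpoint, so a checkpoint can only be handed back to the split that produced it, even when splitting is not deterministic.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an unbounded source; this is NOT the Beam API.
interface SplittableSource<C> {
  // May return a different list on each call: deterministic splitting is not assumed.
  List<SplittableSource<C>> split(int desiredSplits);
}

// Runner-side record: a split and its most recent checkpoint, kept together.
final class SplitState<C> {
  final SplittableSource<C> split;
  C checkpoint; // null until the first checkpoint is recorded

  SplitState(SplittableSource<C> split) {
    this.split = split;
  }
}

// Hypothetical runner bookkeeping: split once, key each split, never re-split.
final class SplitRegistry<C> {
  private final Map<Integer, SplitState<C>> byKey = new LinkedHashMap<>();

  SplitRegistry(SplittableSource<C> topLevel, int desiredSplits) {
    List<SplittableSource<C>> splits = topLevel.split(desiredSplits);
    for (int key = 0; key < splits.size(); key++) {
      // The key is arbitrary; it only has to be unique per split.
      byKey.put(key, new SplitState<>(splits.get(key)));
    }
  }

  Map<Integer, SplitState<C>> states() {
    return Collections.unmodifiableMap(byKey);
  }

  // Stores a checkpoint only for a split this registry created.
  void recordCheckpoint(int key, C checkpoint) {
    SplitState<C> state = byKey.get(key);
    if (state == null) {
      throw new IllegalArgumentException("unknown split key " + key);
    }
    state.checkpoint = checkpoint;
  }
}
```

Because the runner never calls `split` a second time, nothing here depends on the splitting being deterministic; the integer keys carry no meaning beyond being unique.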

On Tue, Sep 13, 2016 at 1:49 AM, Amit Sela  wrote:

> If I understand correctly, this will break
> https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L857
> in KafkaIO.
>
> So it's a KafkaIO limitation (for now ?) ?

Re: Should UnboundedSource provide a split identifier ?

2016-09-13 Thread Amit Sela
If I understand correctly, this will break
https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L857
in KafkaIO.

So is it a KafkaIO limitation (for now?)?
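Why adding Kafka partitions while a pipeline runs breaks a running-id scheme can be sketched as follows. This assumes, for illustration only, that a source sorts its (topic, partition) pairs and deals them round-robin across a fixed number of splits; `TopicPartitionId` and `RoundRobinAssigner` are hypothetical names, and KafkaIO's actual assignment code may differ. The assignment is deterministic, so a running id names the same split on every run, but only while the partition set is unchanged.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical (topic, partition) pair; not a Kafka client class.
final class TopicPartitionId {
  final String topic;
  final int partition;

  TopicPartitionId(String topic, int partition) {
    this.topic = topic;
    this.partition = partition;
  }

  @Override
  public String toString() {
    return topic + "-" + partition;
  }
}

// Hypothetical deterministic assignment: sort, then partition i goes to split i % numSplits.
final class RoundRobinAssigner {
  static List<List<String>> assign(List<TopicPartitionId> partitions, int numSplits) {
    List<TopicPartitionId> sorted = new ArrayList<>(partitions);
    sorted.sort(Comparator.comparing((TopicPartitionId p) -> p.topic)
        .thenComparingInt(p -> p.partition));
    List<List<String>> splits = new ArrayList<>();
    for (int i = 0; i < numSplits; i++) {
      splits.add(new ArrayList<>());
    }
    for (int i = 0; i < sorted.size(); i++) {
      splits.get(i % numSplits).add(sorted.get(i).toString());
    }
    return splits;
  }
}
```

With partitions a-0, a-1, b-0, b-1 and two splits, split 1 reads [a-1, b-1]. After a-2 is added, split 1 reads [a-1, b-0], so a checkpoint stored under id 1 would still hold offsets for b-1, a partition that split no longer reads.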

On Tue, Sep 13, 2016 at 11:31 AM Amit Sela  wrote:

> Thanks Thomas, you understood correctly.
> Doing this, or assigning a running id, is basically the same, as long as
> the generateInitialSplits implementation is deterministic (KafkaIO actually
> notes this).
>
> So what if partitions were added at runtime to one (or more) of the topics
> I'm consuming from ?
>
> On Tue, Sep 13, 2016 at 3:51 AM Thomas Groh 
> wrote:
>
>> I'm not sure if I've understood what the problem is - from what I can tell
>> it's about associating UnboundedSource splits with Checkpoints in order to
>> get consistent behavior from the sources. If I'm wrong, the following isn't
>> really relevant to your problem - it's about the expected behavior of a
>> runner interacting with any split of a Source.
>>
>> In the absence of updates, the evaluation of a split of UnboundedSource
>> must obey the general contract for UnboundedSource, which is that
>> createReader(PipelineOptions, CheckpointMarkT) will only ever be called
>> with a CheckpointMark that was generated by an UnboundedReader that was
>> created from the source - i.e., a Source creates Readers and is provided
>> only checkpoints from those readers it creates. Each Source instance (split
>> and top-level) should be independent of all other instances. A split of a
>> Source should generally be indistinguishable from a top-level source (it
>> will just have slightly different configuration).
>>
>> Generally this means that Source splits have to have an associated
>> identifier, but these identifiers are arbitrary and not relevant to the
>> actual evaluation of the Source - so the runner gets to tag splits however
>> it pleases, so long as those tags don't allow splits to bleed into each
>> other.
>>
>> Could you instead store the Source paired with some (arbitrary and unique)
>> key and pull out the checkpoint using the key (or even just store the keys
>> and store the source with the checkpoint)? That way you will always keep
>> the same association between Source and Checkpoint. Flink does something
>> like this where they store the serialized source alongside the
>> CheckpointMark so they're never separated (
>>
>> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L164
>> and
>>
>> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L334
>> )
>>
>> On Mon, Sep 12, 2016 at 11:40 AM, Amit Sela  wrote:
>>
>> > If this issue doesn't make sense for "native" streaming systems, and it's
>> > only a Spark issue (and my implementation of Read.Unbounded), I could keep
>> > doing what I do: use a running id.
>> > I was just wondering... (hence the question mark in the title ;-) )
>> >
>> > On Mon, Sep 12, 2016 at 9:31 PM Amit Sela  wrote:
>> >
>> > > Not sure how it works in Dataflow or Flink, but I'm working on an
>> > > implementation for Spark using the (almost) only stateful operator it
>> > > has - "mapWithState" - and the State needs to correspond to a key.
>> > > Each micro-batch, the Sources recreate the readers and "look up" the
>> > > latest checkpoint.
>> > >
>> > > On Mon, Sep 12, 2016 at 9:15 PM Raghu Angadi wrote:
>> > >
>> > >> On Wed, Sep 7, 2016 at 7:13 AM, Amit Sela  wrote:
>> > >>
>> > >> > @Raghu, hashing  is exactly what I mean, but I'm asking
>> > >> > if it should be abstracted in the Source. Otherwise, runners will have
>> > >> > to do an *instanceof* check on every Source and write their own hash
>> > >> > implementation.
>> > >> > Since splits contain the "split" Source, which contains its own
>> > >> > CheckpointMark, and hashing would probably be tied to that CheckpointMark,
>> > >> > why not abstract it in the UnboundedSource?
>> > >> >
>> > >>
>> > >> I don't quite follow how a runner should be concerned about hashing used
>> > >> by a Source for its own splits. Can you give a concrete example? To me it
>> > >> looks like source and checkpoint objects are completely opaque to the
>> > >> runners.
>> > >>
>> > >>
>> > >> > On Wed, Sep 7, 2016 at 3:02 AM Raghu Angadi wrote:
>> > >> >
>> > >> > > > If splits (UnboundedSources) had an identifier, this could be avoided,
>> > >> > > > and checkpoints could be persisted accordingly.
>> > >> > >
>> > >> > > The order of the splits that