Support for Flink 1.1.0 in release-0.2.0-incubating
Hi All,

The release-0.2.0-incubating supports Flink 1.0.3. With Flink 1.1.0 out, is there a plan to support it with a 0.2.0 patch? I tried compiling 0.2.0 with Flink 1.1.0 and got a couple of compilation errors in FlinkGroupAlsoByWindowWrapper.java. Going back to master, I see a lot of changes in the Flink translation wrappers, with FlinkGroupAlsoByWindowWrapper.java being removed.

I just want to get a sense of what it would take to support Flink 1.1.0 with release-0.2.0. I would appreciate the views of people who are already working on upgrading it to Flink 1.1.0.

Regards,
Sumit Chawla
Re: Should UnboundedSource provide a split identifier?
Yes. We don't currently permit adding (or modifying) partitions in Kafka while a Pipeline is running (without updates). Our understanding was that this was a rare occurrence, but it's not impossible to support.

For generateInitialSplits, the UnboundedSource API doesn't require deterministic splitting (although it's recommended), and a PipelineRunner should keep track of the initially generated splits.

On Tue, Sep 13, 2016 at 1:49 AM, Amit Sela wrote:
> If I understand correctly this will break
> https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L857
> in KafkaIO.
>
> So it's a KafkaIO limitation (for now?)?
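The split-tagging approach discussed in this thread — pairing each split with an arbitrary, unique runner-assigned key and storing its CheckpointMark under that key, so checkpoints never bleed between splits — could be sketched roughly as below. This is a simplified model: the names (SplitCheckpointStore, register, resumeFrom) and the use of plain strings for the serialized split and mark are illustrative assumptions, not the actual Beam or runner API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical stand-in for a runner's split/checkpoint bookkeeping;
// not the real Beam API.
final class SplitCheckpointStore {
  // Arbitrary, unique runner-assigned key per split -> last CheckpointMark.
  private final Map<String, String> checkpointsByKey = new HashMap<>();
  // Key -> the (serialized) split it tags, so source and checkpoint
  // stay associated, similar to what the Flink wrapper does.
  private final Map<String, String> splitsByKey = new HashMap<>();

  /** Registers a split under a fresh, arbitrary key; the key's value is irrelevant to evaluation. */
  String register(String serializedSplit) {
    String key = UUID.randomUUID().toString();
    splitsByKey.put(key, serializedSplit);
    return key;
  }

  /** Persists the latest checkpoint for the split tagged with {@code key}. */
  void checkpoint(String key, String serializedMark) {
    checkpointsByKey.put(key, serializedMark);
  }

  /** Looks up the checkpoint to resume from; null on the first run. */
  String resumeFrom(String key) {
    return checkpointsByKey.get(key);
  }
}

public class SplitKeyDemo {
  public static void main(String[] args) {
    SplitCheckpointStore store = new SplitCheckpointStore();
    String keyA = store.register("kafka-split-topic0-partition0");
    String keyB = store.register("kafka-split-topic0-partition1");

    // Each reader checkpoints independently; marks never cross splits.
    store.checkpoint(keyA, "offset=42");
    store.checkpoint(keyB, "offset=7");

    System.out.println(store.resumeFrom(keyA)); // offset=42
    System.out.println(store.resumeFrom(keyB)); // offset=7
  }
}
```

Because the keys are opaque, the runner is free to tag splits however it pleases, as long as a checkpoint is only ever handed back to a reader created from the split it was stored under.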
Re: Should UnboundedSource provide a split identifier?
If I understand correctly this will break
https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L857
in KafkaIO.

So it's a KafkaIO limitation (for now?)?

On Tue, Sep 13, 2016 at 11:31 AM Amit Sela wrote:
> Thanks Thomas, you understood correctly.
> Doing this, or assigning a running id, is basically the same, as long as
> the generateInitialSplits implementation is deterministic (KafkaIO
> actually notes this).
>
> So what if partitions were added at runtime to one (or more) of the
> topics I'm consuming from?
>
> On Tue, Sep 13, 2016 at 3:51 AM Thomas Groh wrote:
> > I'm not sure if I've understood what the problem is - from what I can
> > tell, it's about associating UnboundedSource splits with Checkpoints in
> > order to get consistent behavior from the sources. If I'm wrong, the
> > following isn't really relevant to your problem - it's about the
> > expected behavior of a runner interacting with any split of a Source.
> >
> > In the absence of updates, the evaluation of a split of UnboundedSource
> > must obey the general contract for UnboundedSource, which is that
> > createReader(PipelineOptions, CheckpointMarkT) will only ever be called
> > with a CheckpointMark that was generated by an UnboundedReader that was
> > created from the source - i.e., a Source creates Readers and is
> > provided only checkpoints from those readers it creates. Each Source
> > instance (split and top-level) should be independent of all other
> > instances. A split of a Source should generally be indistinguishable
> > from a top-level source (it will just have slightly different
> > configuration).
> >
> > Generally this means that Source splits have to have an associated
> > identifier, but these identifiers are arbitrary and not relevant to the
> > actual evaluation of the Source - so the runner gets to tag splits
> > however it pleases, so long as those tags don't allow splits to bleed
> > into each other.
> >
> > Could you instead store the Source paired with some (arbitrary and
> > unique) key and pull out the checkpoint using the key (or even just
> > store the keys and store the source with the checkpoint)? That way you
> > will always keep the same association between Source and Checkpoint.
> > Flink does something like this, where they store the serialized source
> > alongside the CheckpointMark so they're never separated (
> > https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L164
> > and
> > https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L334
> > )
> >
> > On Mon, Sep 12, 2016 at 11:40 AM, Amit Sela wrote:
> > > If this issue doesn't make sense for "native" streaming systems, and
> > > it's only a Spark issue (and my implementation of Read.Unbounded) - I
> > > could keep doing what I do, use a running id.
> > > I was just wondering... (hence the question mark in the title ;-))
> > >
> > > On Mon, Sep 12, 2016 at 9:31 PM Amit Sela wrote:
> > > > Not sure how it works in Dataflow or Flink, but I'm working on an
> > > > implementation for Spark using the (almost) only stateful operator
> > > > it has - "mapWithState" - and the State needs to correspond to a
> > > > key. Each micro-batch, the Sources recreate the readers and
> > > > "look up" the latest checkpoint.
> > > >
> > > > On Mon, Sep 12, 2016 at 9:15 PM Raghu Angadi wrote:
> > > > > On Wed, Sep 7, 2016 at 7:13 AM, Amit Sela wrote:
> > > > > > @Raghu, hashing is exactly what I mean, but I'm asking if it
> > > > > > should be abstracted in the Source. Otherwise, runners will
> > > > > > have to *instanceof* on every Source, and write their own hash
> > > > > > implementation. Since splits contain the "splitted" Source, and
> > > > > > it contains its own CheckpointMark, and hashing would probably
> > > > > > be tied to that CheckpointMark, why not abstract it in the
> > > > > > UnboundedSource?
> > > > >
> > > > > I don't quite follow how a runner should be concerned about
> > > > > hashing used by a Source for its own splits. Can you give a
> > > > > concrete example? To me it looks like source and checkpoint
> > > > > objects are completely opaque to the runners.
> > > > >
> > > > > > On Wed, Sep 7, 2016 at 3:02 AM Raghu Angadi wrote:
> > > > > > > > If splits (UnboundedSources) had an identifier, this could
> > > > > > > > be avoided, and checkpoints could be persisted accordingly.
> > > > > > >
> > > > > > > The order of the splits that
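The hashing idea debated in this thread — deriving a stable split identifier from the split's own configuration, so the same split maps to the same key on every micro-batch as long as generateInitialSplits is deterministic — could look roughly like the sketch below. The method name splitId, the SHA-256 choice, and the topic/partition parameters are illustrative assumptions (loosely modeled on a Kafka split), not part of the actual UnboundedSource API.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Sketch only: a hypothetical stable identifier for a split, computed
// from the split's configuration rather than assigned by the runner.
public class SplitIdDemo {
  /** Stable hex identifier derived from the split's configuration. */
  static String splitId(String topic, int partition) {
    try {
      MessageDigest md = MessageDigest.getInstance("SHA-256");
      byte[] digest =
          md.digest((topic + "/" + partition).getBytes(StandardCharsets.UTF_8));
      StringBuilder hex = new StringBuilder();
      for (byte b : digest) {
        hex.append(String.format("%02x", b));
      }
      return hex.toString();
    } catch (NoSuchAlgorithmException e) {
      throw new AssertionError("SHA-256 is required on every JVM", e);
    }
  }

  public static void main(String[] args) {
    // Same configuration -> same id on every micro-batch.
    System.out.println(splitId("topic0", 0).equals(splitId("topic0", 0))); // true
    // Different partitions -> different ids, so checkpoints don't collide.
    System.out.println(splitId("topic0", 0).equals(splitId("topic0", 1))); // false
  }
}
```

Note the caveat raised above still applies: a scheme like this only identifies splits that exist at graph construction; partitions added at runtime would produce new ids rather than remap existing ones.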