Re: About Finishing Triggers

2016-09-14 Thread Kenneth Knowles
Caveat: I want to emphasize that I don't have a specific proposal. I
haven't thought through enough of the details to call this a proposal, or
you would have seen it already :-)

On Sep 14, 2016 5:14 AM, "Aljoscha Krettek"  wrote:
>
> Hi,
> I had a chat with Kenn at Flink Forward and he made an off-hand remark
> about how it might be better if triggers were not allowed to mark a
> window as finished

Yes. I've seen many cases of accidental closing and intentional uses of
closing that were wrong, but very rarely intentional uses of closing that
were good.

When we chatted, I think I mentioned this pair of situations, which I
restate for the benefit of the thread and the list:

1. If a trigger's predicate is never satisfied, then we will emit buffered
results at window expiration against the trigger's wishes.
2. If a trigger "finishes" then we throw away the rest of the data.

I don't really like fact #2. It causes silly bugs in user pipelines. I can
only think of one good use case, which is to approximate some custom notion
of completeness other than the watermark.
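
To make the pitfall concrete, here is a minimal sketch in the Java SDK
("input" is a hypothetical PCollection; the exact fluent chain may vary by
version):

  import org.apache.beam.sdk.transforms.Sum;
  import org.apache.beam.sdk.transforms.windowing.AfterPane;
  import org.apache.beam.sdk.transforms.windowing.FixedWindows;
  import org.apache.beam.sdk.transforms.windowing.Window;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.PCollection;
  import org.joda.time.Duration;

  // The trigger fires once after five elements and then finishes: any later
  // elements for the same window are silently dropped rather than emitted
  // in a follow-up pane.
  PCollection<KV<String, Integer>> sums = input
      .apply(Window.<KV<String, Integer>>into(
              FixedWindows.of(Duration.standardMinutes(10)))
          .triggering(AfterPane.elementCountAtLeast(5))
          .withAllowedLateness(Duration.ZERO)
          .discardingFiredPanes())
      .apply(Sum.integersPerKey());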

Compare: we think of allowed lateness as expressing when to drop data, but
we don't think of "AfterEndOfWindow" as expressing when to drop data. A
trigger that finishes in order to express a custom idea of completeness is
like the latter. And moving triggers to a DSL instead of a UDF further
reduces the applicability.

I think a good solution for custom notions of completeness should probably
have all the pieces: (a) the measure of completeness, (b) triggering based
on it in whatever way makes sense, (c) a way of noting special levels of
completeness, like the ON_TIME pane does for the watermark, and (d) a
policy for eventually considering the output complete enough that we can
drop further input. So that is a lot of different things to design
carefully.

I also want to point out that this is not against phase transitions, such
as moving from early firings to late firings when the watermark reaches the
end of the window. That is like a "OnceTrigger", but it is helpful, IMO, to
separate the event that we are interested in from a trigger for controlling
output based on that event.

> and instead always be "Repeatedly" (if I understood correctly).

I don't necessarily mean to have this automatically wrapped, but also to
design the trigger DSL so triggers are all/mostly well-behaved. For
example, EOW + withEarlyFirings + withLateFirings is well crafted to make
only sensible things easy.
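
For instance, a hedged sketch of that shape (durations are illustrative,
"T" stands for the element type):

  // Speculative early panes, a distinguished on-time pane when the
  // watermark passes the end of the window, and late panes afterwards.
  // No pane "finishes" the window before its allowed lateness expires.
  Window.<T>into(FixedWindows.of(Duration.standardMinutes(10)))
      .triggering(AfterWatermark.pastEndOfWindow()
          .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
              .plusDelayOf(Duration.standardSeconds(30)))
          .withLateFirings(AfterPane.elementCountAtLeast(1)))
      .withAllowedLateness(Duration.standardHours(1))
      .accumulatingFiredPanes();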

> Maybe you (Kenn) could go a bit more in depth about what you meant by
> this and if we should actually change this in Beam. Would this mean that
> we then have the opposite of Repeatedly, i.e. Once, or Only.once(T)?

This is exactly a thought I have had sometimes - make it only possible to
use a OnceTrigger as the top level expression if it is explicitly
requested. This would at least quickly prevent the common pitfall.

But the OnceTrigger / Trigger split is not quite right to enforce this
restriction. Instead of distinguishing "at most once" from "any number of
times", we need to distinguish "finishes" from "never finishes". A
vocabulary I have started to favor is "Predicate" and "Trigger", where you
get a OnceTrigger via something like Once.at(Predicate), and every other
trigger you can construct will never lose data. I actually have a branch
sitting around with an experiment along these lines, I think...
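
Roughly the shape I am imagining (purely hypothetical names - nothing
below exists in the SDK):

  // Hypothetical sketch: a Predicate only states when a condition holds;
  // by itself it cannot finish a window or drop data.
  interface Predicate {
    boolean isSatisfied(Object paneState);  // paneState is a stand-in
  }

  // A trigger built from a bare predicate would repeat forever:
  //   .triggering(when(elementCountAtLeast(5)))     // never loses data
  // Finishing - and thus possible data loss - is an explicit opt-in:
  //   .triggering(Once.at(elementCountAtLeast(5)))  // may drop data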

> I also noticed some inconsistencies in when triggers behave as repeated
> triggers and once triggers. For example, AfterPane.elementCountAtLeast(5)
> only fires once if used alone but it fires repeatedly if used as the
> speculative trigger in
> AfterWatermark.pastEndOfWindow().withEarlyFirings(...). (This is true for
> all "once" triggers.)

This is actually by design. The early & late firings are automatically
repeated. But it is a good example to think about: if a user writes
.trigger(AfterCount(n)) they probably don't mean only once, but are
expecting it to fire whenever the predicate is satisfied. So, using the
vocabulary I mentioned, this example seems to encourage making an overload
.triggering(Predicate) the same as
.triggering(Repeatedly.forever(Predicate)). We can separate the Java
overloads/API question from the model, of course.
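
Concretely, under that vocabulary the overload would make these two lines
equivalent (hypothetical semantics, not how the current SDK behaves):

  // Hypothetical sugar: a bare predicate at the top level...
  .triggering(AfterPane.elementCountAtLeast(n))
  // ...defined as shorthand for today's explicit, non-finishing form:
  .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(n)))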

Kenn


Re: Should UnboundedSource provide a split identifier ?

2016-09-14 Thread Amit Sela
On Tue, Sep 13, 2016 at 7:30 PM Thomas Groh 
wrote:

> Yes. We don't currently permit adding (or modifying) partitions in Kafka
> while a Pipeline is running (without updates). Our understanding was that
> this was a rare occurrence, but it's not impossible to support.
>
Don't know how rare it is, but I know that Kafka supports adding
partitions, and Spark can handle this while reading from Kafka.

>
> For generateInitialSplits, the UnboundedSource API doesn't require
> deterministic splitting (although it's recommended), and a PipelineRunner
> should keep track of the initially generated splits.
>
If the splitting were consistent, in such a way that newly added partitions
would be assigned a new "splitId" while existing ones would still be
assigned the same (consistent) splitId, it could support newly added
partitions, no?
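
Something like this trivial sketch is what I have in mind (illustrative
only, not KafkaIO's actual code):

  // Derive the splitId from the topic-partition itself: existing
  // partitions keep a stable id across micro-batches/restarts, and newly
  // added partitions simply introduce new ids instead of shifting the
  // existing assignment.
  static String splitId(String topic, int partition) {
    return topic + "-" + partition;
  }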

>
> On Tue, Sep 13, 2016 at 1:49 AM, Amit Sela  wrote:
>
> > If I understand correctly this will break
> > https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L857
> > in KafkaIO.
> >
> > So it's a KafkaIO limitation (for now?)?
> >
> > On Tue, Sep 13, 2016 at 11:31 AM Amit Sela  wrote:
> >
> > > Thanks Thomas, you did understand correctly.
> > > Doing this, or assigning a running id, is basically the same, as long
> > > as the generateInitialSplits implementation is deterministic (KafkaIO
> > > actually notes this).
> > >
> > > So what if partitions were added at runtime to one (or more) of the
> > > topics I'm consuming from?
> > >
> > > On Tue, Sep 13, 2016 at 3:51 AM Thomas Groh 
> > > wrote:
> > >
> > >> I'm not sure if I've understood what the problem is - from what I can
> > >> tell it's about associating UnboundedSource splits with Checkpoints
> > >> in order to get consistent behavior from the sources. If I'm wrong,
> > >> the following isn't really relevant to your problem - it's about the
> > >> expected behavior of a runner interacting with any split of a Source.
> > >>
> > >> In the absence of updates, the evaluation of a split of
> > >> UnboundedSource must obey the general contract for UnboundedSource,
> > >> which is that createReader(PipelineOptions, CheckpointMarkT) will
> > >> only ever be called with a CheckpointMark that was generated by an
> > >> UnboundedReader that was created from the source - i.e., a Source
> > >> creates Readers and is provided only checkpoints from those readers
> > >> it creates. Each Source instance (split and top-level) should be
> > >> independent of all other instances. A split of a Source should
> > >> generally be indistinguishable from a top-level source (it will just
> > >> have slightly different configuration).
> > >>
> > >> Generally this means that Source splits have to have an associated
> > >> identifier, but these identifiers are arbitrary and not relevant to
> > >> the actual evaluation of the Source - so the runner gets to tag
> > >> splits however it pleases, so long as those tags don't allow splits
> > >> to bleed into each other.
> > >>
> > >> Could you instead store the Source paired with some (arbitrary and
> > >> unique) key and pull out the checkpoint using the key (or even just
> > >> store the keys and store the source with the checkpoint)? That way
> > >> you will always keep the same association between Source and
> > >> Checkpoint. Flink does something like this where they store the
> > >> serialized source alongside the CheckpointMark so they're never
> > >> separated (
> > >> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L164
> > >> and
> > >> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L334
> > >> )
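
For reference, the pairing described above might look roughly like this on
the runner side (a sketch with invented names, not actual runner code):

  import org.apache.beam.sdk.io.UnboundedSource;

  // Keep each split paired with the latest checkpoint produced by its own
  // reader, keyed by an arbitrary unique tag (e.g. in a
  // Map<String, SplitState<?>>), so a checkpoint is never replayed into a
  // reader created from a different split.
  final class SplitState<C extends UnboundedSource.CheckpointMark> {
    final UnboundedSource<?, C> split;  // the split itself
    final C checkpoint;                 // latest checkpoint from its reader

    SplitState(UnboundedSource<?, C> split, C checkpoint) {
      this.split = split;
      this.checkpoint = checkpoint;
    }
  }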
> > >>
> > >> On Mon, Sep 12, 2016 at 11:40 AM, Amit Sela 
> > >> wrote:
> > >>
> > >> > If this issue doesn't make sense for "native" streaming systems,
> > >> > and it's only a Spark issue (and my implementation of
> > >> > Read.Unbounded) - I could keep doing what I do, use a running id.
> > >> > I was just wondering... (hence the question mark in the title ;-) )
> > >> >
> > >> > On Mon, Sep 12, 2016 at 9:31 PM Amit Sela  wrote:
> > >> >
> > >> > > Not sure how it works in Dataflow or Flink, but I'm working on
> > >> > > an implementation for Spark using the (almost) only stateful
> > >> > > operator it has - "mapWithState" - and the State needs to
> > >> > > correspond to a key.
> > >> > > Each micro-batch, the Sources recreate the readers and "look up"
> > >> > > the latest checkpoint.
> > >> > >

Re: Support for Flink 1.1.0 in release-0.2.0-incubating

2016-09-14 Thread Chawla,Sumit
Hi Max

I was able to compile 0.2.0 with Flink 1.1.0 with a small modification, and
run a simple pipeline.

   @Override
-  public void restoreState(StreamTaskState taskState, long recoveryTimestamp) throws Exception {
-    super.restoreState(taskState, recoveryTimestamp);
+  public void restoreState(StreamTaskState taskState) throws Exception {
+    super.restoreState(taskState);


Can I get a sense of the changes that have happened in 0.3.0 for Flink? I
observed that some classes were completely reworked. It will be crucial for
me to understand the scope of the change and its impact before making a
move to 0.3.0.



Regards
Sumit Chawla


On Wed, Sep 14, 2016 at 3:03 AM, Maximilian Michels  wrote:

> We support Flink 1.1.2 on the latest snapshot version
> 0.3.0-incubating-SNAPSHOT. Would it be possible for you to work with
> this version?
>
> On Tue, Sep 13, 2016 at 11:55 PM, Chawla,Sumit 
> wrote:
> > When trying to use Beam 0.2.0 with the Flink 1.1.0 jar, I am seeing the
> > following error:
> >
> > java.lang.NoSuchMethodError:
> > org.apache.flink.streaming.api.operators.StreamingRuntimeContext.registerTimer(JLorg/apache/flink/streaming/runtime/operators/Triggerable;)V
> > at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.setNextWatermarkTimer(UnboundedSourceWrapper.java:381)
> > at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:233)
> > at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:80)
> > at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:53)
> > at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
> > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> > at java.lang.Thread.run(Thread.java:745)
> >
> >
> > Regards
> > Sumit Chawla
> >
> >
> > On Tue, Sep 13, 2016 at 2:20 PM, Chawla,Sumit 
> > wrote:
> >
> >> Hi All
> >>
> >> The release-0.2.0-incubating supports Flink 1.0.3. With Flink 1.1.0
> >> out, is there a plan to support it with any 0.2.0 patch? I tried
> >> compiling 0.2.0 with Flink 1.1.0, and got a couple of compilation
> >> errors in FlinkGroupAlsoByWindowWrapper.java. Going back to master I
> >> see lots of changes in the Flink translation wrappers, and
> >> FlinkGroupAlsoByWindowWrapper.java has been removed.
> >>
> >> Just want to get a sense of things here on what it would take to
> >> support Flink 1.1.0 with release-0.2.0. Would appreciate the views of
> >> people who are already working on upgrading it to Flink 1.1.0.
> >>
> >> Regards
> >> Sumit Chawla
> >>
> >>
>


About Finishing Triggers

2016-09-14 Thread Aljoscha Krettek
Hi,
I had a chat with Kenn at Flink Forward and he made an off-hand remark about
how it might be better if triggers were not allowed to mark a window as
finished and instead always be "Repeatedly" (if I understood correctly).

Maybe you (Kenn) could go a bit more in depth about what you meant by this
and if we should actually change this in Beam. Would this mean that we then
have the opposite of Repeatedly, i.e. Once, or Only.once(T)?

I also noticed some inconsistencies in when triggers behave as repeated
triggers and once triggers. For example, AfterPane.elementCountAtLeast(5)
only fires once if used alone but it fires repeatedly if used as the
speculative trigger in
AfterWatermark.pastEndOfWindow().withEarlyFirings(...). (This is true for
all "once" triggers.)

Cheers,
Aljoscha


Re: Support for Flink 1.1.0 in release-0.2.0-incubating

2016-09-14 Thread Maximilian Michels
We support Flink 1.1.2 on the latest snapshot version
0.3.0-incubating-SNAPSHOT. Would it be possible for you to work with
this version?

On Tue, Sep 13, 2016 at 11:55 PM, Chawla,Sumit  wrote:
> When trying to use Beam 0.2.0 with the Flink 1.1.0 jar, I am seeing the
> following error:
>
> java.lang.NoSuchMethodError:
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.registerTimer(JLorg/apache/flink/streaming/runtime/operators/Triggerable;)V
> at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.setNextWatermarkTimer(UnboundedSourceWrapper.java:381)
> at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:233)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:80)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:53)
> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
>
>
> Regards
> Sumit Chawla
>
>
> On Tue, Sep 13, 2016 at 2:20 PM, Chawla,Sumit 
> wrote:
>
>> Hi All
>>
>> The release-0.2.0-incubating supports Flink 1.0.3. With Flink 1.1.0 out,
>> is there a plan to support it with any 0.2.0 patch? I tried compiling
>> 0.2.0 with Flink 1.1.0, and got a couple of compilation errors in
>> FlinkGroupAlsoByWindowWrapper.java. Going back to master I see lots of
>> changes in the Flink translation wrappers, and
>> FlinkGroupAlsoByWindowWrapper.java has been removed.
>>
>> Just want to get a sense of things here on what it would take to support
>> Flink 1.1.0 with release-0.2.0. Would appreciate the views of people who
>> are already working on upgrading it to Flink 1.1.0.
>>
>> Regards
>> Sumit Chawla
>>
>>