[jira] [Commented] (BEAM-1640) data file missing when submit a job on Flink

2017-03-09 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15903038#comment-15903038
 ] 

Aljoscha Krettek commented on BEAM-1640:


Which Flink version are you using?

> data file missing when submit a job on Flink
> 
>
> Key: BEAM-1640
> URL: https://issues.apache.org/jira/browse/BEAM-1640
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 0.6.0
>Reporter: Xu Mingmin
>Assignee: Aljoscha Krettek
>
> I have one file with path 'META-INF/jaas/kafka_jaas.conf' in my jar package. It 
> works with Beam 0.5.0, but when I re-package it with 0.6.0-SNAPSHOT, it fails to 
> submit with the bin/flink command. --Both run on YARN.
> The error is shown below; I guess this file may be lost in the Flink Runner. 
> {code}
> Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka 
> consumer
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:702)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:557)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:540)
>   at org.apache.beam.sdk.io.kafka.KafkaIO$Read$2.apply(KafkaIO.java:503)
>   at org.apache.beam.sdk.io.kafka.KafkaIO$Read$2.apply(KafkaIO.java:501)
>   at 
> org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.generateInitialSplits(KafkaIO.java:620)
>   at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.(UnboundedSourceWrapper.java:159)
>   at 
> org.apache.beam.runners.flink.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:267)
>   ... 33 more
> Caused by: org.apache.kafka.common.KafkaException: 
> java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in 
> jaas config.
>   at 
> org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
>   at 
> org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
>   at 
> org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:623)
>   ... 40 more
> Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' 
> entry in jaas config.
>   at io.ebay.rheos.kafka.security.iaf.IAFLogin.login(IAFLogin.java:54)
>   at 
> org.apache.kafka.common.security.authenticator.LoginManager.(LoginManager.java:53)
>   at 
> org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:75)
>   at 
> org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:78)
>   ... 43 more
> {code}
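
For reference, a common workaround for this class of error (not necessarily the fix for the packaging regression itself) is to materialize the bundled JAAS file and point the JVM at it before the consumer is constructed; a minimal sketch, assuming the resource path above and that this code runs on the JVM that actually builds the KafkaConsumer:

{code}
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class JaasSetup {
  /** Copies the bundled JAAS config to a temp file and registers it with the JVM. */
  public static void installJaasConfig() throws Exception {
    try (InputStream in = JaasSetup.class.getClassLoader()
        .getResourceAsStream("META-INF/jaas/kafka_jaas.conf")) {
      if (in == null) {
        throw new IllegalStateException("kafka_jaas.conf not found on the classpath");
      }
      Path target = Files.createTempFile("kafka_jaas", ".conf");
      Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
      // Kafka's SASL login reads its 'KafkaClient' entry from this standard JVM property.
      System.setProperty("java.security.auth.login.config", target.toAbsolutePath().toString());
    }
  }
}
{code}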



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1641) Support synchronized processing time in Flink runner

2017-03-09 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902802#comment-15902802
 ] 

Aljoscha Krettek commented on BEAM-1641:


Wow, that's a tough one. I think we might first have to introduce such a 
concept in Flink if we want to use it from Beam.

> Support synchronized processing time in Flink runner
> 
>
> Key: BEAM-1641
> URL: https://issues.apache.org/jira/browse/BEAM-1641
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Aljoscha Krettek
>
> The "continuation trigger" for a processing time trigger is a synchronized 
> processing time trigger. Today, this throws an exception in the FlinkRunner.
> Synchronized processing time supports the following scenario:
>  - GBK1
>  - GBK2
> When GBK1 fires because processing time has passed the first element in the pane, 
> and that output arrives at GBK2, GBK2 will wait until all the other upstream keys 
> have also been processed and have emitted their corresponding data.
> Sorry for the terseness of explanation - writing quickly so I don't forget.
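
For context, a sketch of the kind of trigger whose continuation is a synchronized processing-time trigger; {{input}}, {{SumValuesFn}} and the window/delay parameters are assumptions, not from the issue:

{code}
// GBK1 fires based on processing time past the first element in the pane.
// The continuation trigger that GBK2 inherits for these firings is a
// synchronized processing-time trigger, which the FlinkRunner rejects today.
PCollection<KV<String, Long>> result =
    input
        .apply(Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(10)))
            .triggering(Repeatedly.forever(
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(30))))
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes())
        .apply(GroupByKey.<String, Long>create())    // GBK1
        .apply(ParDo.of(new SumValuesFn()))          // assumed DoFn: sums each group
        .apply(GroupByKey.<String, Long>create())    // GBK2 sees the continuation trigger
        .apply(ParDo.of(new SumValuesFn()));
{code}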



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1631) Flink runner: submit job to a Flink-on-YARN cluster

2017-03-07 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899797#comment-15899797
 ] 

Aljoscha Krettek commented on BEAM-1631:


[~davor] Yes, this analysis is correct! I'll look into how hard it would be to 
let the Flink Runner submit directly to YARN without bin/flink.

[~mingmxu] You can submit a Beam job using the Flink Runner on a YARN cluster 
using the bin/flink command. This page in the Flink doc describes how to do 
that: 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/yarn_setup.html.
 To Flink, a jar with a Beam on Flink Runner program looks just like a normal 
Flink program, so everything that you can do with Flink programs you can also 
do with Beam programs.

> Flink runner: submit job to a Flink-on-YARN cluster
> ---
>
> Key: BEAM-1631
> URL: https://issues.apache.org/jira/browse/BEAM-1631
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Davor Bonaci
>Assignee: Aljoscha Krettek
>
> As far as I understand, running Beam pipelines on a Flink cluster can be done 
> in two ways:
> * Run directly with a Flink runner, and specifying {{--flinkMaster}} pipeline 
> option via, say, {{mvn exec}}.
> * Produce a bundled JAR, and use {{bin/flink}} to submit the same pipeline.
> These two ways are equivalent, and work well on a standalone Flink cluster.
> Submitting to a Flink-on-YARN cluster is more complicated. You can still produce 
> a bundled JAR, and use {{bin/flink -yid }} to submit such a job. 
> However, that seems impossible with the Flink runner directly.
> If so, we should add the ability to the Flink runner to submit a job to a 
> Flink-on-YARN cluster directly.
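
For the first option, a minimal sketch of direct submission through the runner ({{FlinkPipelineOptions}} as provided by the Flink runner; the master address is an assumption):

{code}
FlinkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
// Address of the Flink JobManager; "[local]" and "[auto]" are also accepted.
options.setFlinkMaster("jobmanager-host:6123");

Pipeline p = Pipeline.create(options);
// ... build the pipeline ...
p.run();
{code}

The second option is the {{bin/flink}} path described in the comment above.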



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1612) Support real Bundle in Flink runner

2017-03-03 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15894345#comment-15894345
 ] 

Aljoscha Krettek commented on BEAM-1612:


Yes, I think finding a solution for bundling is very important.

Right now, I'm afraid we have to manually induce bundling using either count or 
time (or both). Making bundling dependent on snapshotting doesn't work because 
Flink operators cannot (right now) emit data while snapshotting, whereas a Beam 
{{DoFn}} can emit data in the {{@FinishBundle}} method.

> Support real Bundle in Flink runner
> ---
>
> Key: BEAM-1612
> URL: https://issues.apache.org/jira/browse/BEAM-1612
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>
> The Bundle is very important in the Beam model. Users can use the bundle to 
> flush buffers and to reuse heavyweight resources across the elements of a 
> bundle. Most IO plugins use the bundle to flush. 
> Moreover, the FlinkRunner could also use bundles to reduce access to Flink 
> state, e.g. buffer values on the Java heap first and only flush them into the 
> RocksDB state when finishBundle is invoked; this reduces the number of 
> serializations.
> But right now the FlinkRunner calls finishBundle after every processElement. We 
> need to support real bundles.
> I think we can have the following implementations:
> 1. Invoke finishBundle and the next startBundle in Flink's {{snapshot}}. But 
> sometimes this "bundle" may be too big; it depends on the user's checkpoint 
> configuration.
> 2. Manually control the size of the bundle. The half-open bundle is flushed to 
> a complete bundle by count, event time, processing time, or {{snapshot}}. We do 
> not need to wait; we just call startBundle and finishBundle at the right time.
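
To illustrate why bundle semantics matter, a sketch of the buffering pattern many IOs rely on (DoFn annotations roughly as of Beam 0.6; {{flushToExternalSystem}} is an assumed helper):

{code}
class BufferingWriteFn extends DoFn<String, Void> {
  private transient List<String> buffer;

  @StartBundle
  public void startBundle(Context c) {
    buffer = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    // Nothing is written yet; elements are only buffered within the bundle.
    buffer.add(c.element());
  }

  @FinishBundle
  public void finishBundle(Context c) {
    // If the runner calls finishBundle after every element, this flush happens
    // per element and the buffering is pointless.
    flushToExternalSystem(buffer);
    buffer = null;
  }

  private void flushToExternalSystem(List<String> records) {
    // assumed helper: batch write to some external system
  }
}
{code}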



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1589) Add OnWindowExpiration method to Stateful DoFn

2017-03-03 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15894162#comment-15894162
 ] 

Aljoscha Krettek commented on BEAM-1589:


This will probably take the form of a {{@OnWindowExpiration}} annotation. I'm 
guessing from some of his earlier comments elsewhere that [~kenn] already has 
an opinion about how this could be done, right?
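
Purely as an illustration of that idea (the annotation does not exist yet, so its name, parameters, and the Beam 0.6-era state signatures below are assumptions):

{code}
class CountFn extends DoFn<KV<String, Long>, KV<String, Long>> {

  @StateId("count")
  private final StateSpec<Object, ValueState<Long>> countSpec =
      StateSpecs.value(VarLongCoder.of());

  @ProcessElement
  public void processElement(
      ProcessContext c, @StateId("count") ValueState<Long> count) {
    Long current = count.read();
    count.write((current == null ? 0L : current) + c.element().getValue());
  }

  // Hypothetical annotation: invoked once per key and window right before the
  // runner garbage-collects the state, giving a last chance to emit it.
  @OnWindowExpiration
  public void onWindowExpiration(
      ProcessContext c, @StateId("count") ValueState<Long> count) {
    Long current = count.read();
    if (current != null) {
      c.output(KV.of("final", current));  // key is illustrative only
    }
  }
}
{code}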

> Add OnWindowExpiration method to Stateful DoFn
> --
>
> Key: BEAM-1589
> URL: https://issues.apache.org/jira/browse/BEAM-1589
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core, sdk-java-core
>Reporter: Jingsong Lee
>
> See BEAM-1517
> This allows the user to do some work before the state's garbage collection.
> It seems kind of annoying, but on the other hand, forgetting to set a final 
> timer to flush state probably means data loss most of the time.
> The FlinkRunner can do this very simply, but other runners, such as the 
> DirectRunner, need to traverse all the states to do this, and that may be a 
> little hard.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (BEAM-760) Validation needs to exist that @NeedsRunner / @RunnableOnService tests execute

2017-03-01 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated BEAM-760:
--
Fix Version/s: First stable release

> Validation needs to exist that @NeedsRunner / @RunnableOnService tests execute
> --
>
> Key: BEAM-760
> URL: https://issues.apache.org/jira/browse/BEAM-760
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-core, runner-dataflow, runner-direct, 
> runner-flink, runner-gearpump, runner-spark, sdk-java-core
>Reporter: Luke Cwik
>Assignee: Jason Kuster
> Fix For: First stable release
>
>
> We lack validation that tests that were supposed to execute actually 
> executed as part of pre/post commit.
> This is worrisome in an automated test environment since it's difficult to 
> know whether all the tests that were supposed to run did run.
> Repro steps:
> checkout apache/master @ b8e6eea691b48e14c4e2c3e84609d750769e09ee
> mvn clean integration-test -T 1C -pl runners/direct-java -am
> Note that the SplittableParDoTest part of beam-runners-core-java doesn't 
> execute.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (BEAM-622) Add checkpointing tests for DoFnOperator and WindowDoFnOperator

2017-03-01 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated BEAM-622:
--
Fix Version/s: First stable release

> Add checkpointing tests for DoFnOperator and WindowDoFnOperator 
> 
>
> Key: BEAM-622
> URL: https://issues.apache.org/jira/browse/BEAM-622
> Project: Beam
>  Issue Type: Test
>  Components: runner-flink
>Affects Versions: 0.3.0-incubating
>Reporter: Maximilian Michels
> Fix For: First stable release
>
>
> Tests which test the correct snapshotting of these two operators are missing. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (BEAM-720) Run WindowedWordCount Integration Test in Flink

2017-03-01 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-720?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated BEAM-720:
--
Fix Version/s: First stable release

> Run WindowedWordCount Integration Test in Flink
> ---
>
> Key: BEAM-720
> URL: https://issues.apache.org/jira/browse/BEAM-720
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Mark Liu
>Assignee: Aljoscha Krettek
> Fix For: First stable release
>
>
> In order to have coverage of streaming pipeline tests in pre-commit, it's 
> important for the TestFlinkRunner to be able to run WindowedWordCountIT 
> successfully. 
> Relevant work in TestDataflowRunner:
> https://github.com/apache/incubator-beam/pull/1045



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (BEAM-1389) Remove or update Flink Runner README.md

2017-03-01 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-1389.
--
   Resolution: Fixed
Fix Version/s: 0.6.0

> Remove or update Flink Runner README.md
> ---
>
> Key: BEAM-1389
> URL: https://issues.apache.org/jira/browse/BEAM-1389
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 0.6.0
>
>
> The readme is outdated by now. We could either remove it or just put a link 
> to the Runner documentation on the website.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (BEAM-1517) Garbage collect user state in Flink Runner

2017-03-01 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-1517.
--
Resolution: Fixed
  Assignee: Jingsong Lee  (was: Aljoscha Krettek)

> Garbage collect user state in Flink Runner
> --
>
> Key: BEAM-1517
> URL: https://issues.apache.org/jira/browse/BEAM-1517
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 0.6.0
>Reporter: Aljoscha Krettek
>Assignee: Jingsong Lee
>Priority: Blocker
> Fix For: 0.6.0
>
>
> User facing state/timers in Beam are bound to the key/window of the data. 
> Right now, the Flink Runner does not clean up user state when the watermark 
> passes the GC horizon for the state associated with a given window.
> Neither {{StateInternals}} nor the Flink state API support discarding state 
> for a whole namespace (which is the window in this case) so we might have to 
> manually set a GC timer for each window/key combination, as is done in the 
> {{ReduceFnRunner}}. For this we have to know all states a user can possibly 
> use, which we can get from the {{DoFn}} signature.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (BEAM-1517) Garbage collect user state in Flink Runner

2017-02-28 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated BEAM-1517:
---
Fix Version/s: 0.6.0

> Garbage collect user state in Flink Runner
> --
>
> Key: BEAM-1517
> URL: https://issues.apache.org/jira/browse/BEAM-1517
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 0.6.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 0.6.0
>
>
> User facing state/timers in Beam are bound to the key/window of the data. 
> Right now, the Flink Runner does not clean up user state when the watermark 
> passes the GC horizon for the state associated with a given window.
> Neither {{StateInternals}} nor the Flink state API support discarding state 
> for a whole namespace (which is the window in this case) so we might have to 
> manually set a GC timer for each window/key combination, as is done in the 
> {{ReduceFnRunner}}. For this we have to know all states a user can possibly 
> use, which we can get from the {{DoFn}} signature.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (BEAM-1517) Garbage collect user state in Flink Runner

2017-02-28 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated BEAM-1517:
---
Priority: Blocker  (was: Major)

> Garbage collect user state in Flink Runner
> --
>
> Key: BEAM-1517
> URL: https://issues.apache.org/jira/browse/BEAM-1517
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 0.6.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 0.6.0
>
>
> User facing state/timers in Beam are bound to the key/window of the data. 
> Right now, the Flink Runner does not clean up user state when the watermark 
> passes the GC horizon for the state associated with a given window.
> Neither {{StateInternals}} nor the Flink state API support discarding state 
> for a whole namespace (which is the window in this case) so we might have to 
> manually set a GC timer for each window/key combination, as is done in the 
> {{ReduceFnRunner}}. For this we have to know all states a user can possibly 
> use, which we can get from the {{DoFn}} signature.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (BEAM-1517) Garbage collect user state in Flink Runner

2017-02-28 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated BEAM-1517:
---
Affects Version/s: 0.6.0

> Garbage collect user state in Flink Runner
> --
>
> Key: BEAM-1517
> URL: https://issues.apache.org/jira/browse/BEAM-1517
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 0.6.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 0.6.0
>
>
> User facing state/timers in Beam are bound to the key/window of the data. 
> Right now, the Flink Runner does not clean up user state when the watermark 
> passes the GC horizon for the state associated with a given window.
> Neither {{StateInternals}} nor the Flink state API support discarding state 
> for a whole namespace (which is the window in this case) so we might have to 
> manually set a GC timer for each window/key combination, as is done in the 
> {{ReduceFnRunner}}. For this we have to know all states a user can possibly 
> use, which we can get from the {{DoFn}} signature.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (BEAM-1116) Support for new Timer API in Flink runner

2017-02-28 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-1116.
--
   Resolution: Fixed
Fix Version/s: 0.6.0

> Support for new Timer API in Flink runner
> -
>
> Key: BEAM-1116
> URL: https://issues.apache.org/jira/browse/BEAM-1116
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Jingsong Lee
> Fix For: 0.6.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-315) Flink Runner compares keys unencoded which may produce incorrect results

2017-02-28 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15887826#comment-15887826
 ] 

Aljoscha Krettek commented on BEAM-315:
---

Strange... are you running in Streaming or Batch mode?

> Flink Runner compares keys unencoded which may produce incorrect results
> 
>
> Key: BEAM-315
> URL: https://issues.apache.org/jira/browse/BEAM-315
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 0.1.0-incubating, 0.2.0-incubating
>Reporter: Pawel Szczur
>Assignee: Aljoscha Krettek
> Fix For: 0.3.0-incubating
>
> Attachments: CiteGroupPatentNumberUpdateDataFlowOptions.java, 
> CiteGroupPatentNumberUpdateFnRunner.java, CoGroupPipelineStringKey.java, 
> execution.log, execution_split.log, execution_split_sorted.log, 
> NcUniPatToCiteGroupFn.java, PacUniPatToCiteGroupFn.java, 
> UniPatNumToLineFn.java, UniPatNumToOrigNumFn.java
>
>
> Same keys are processed multiple times.
> A repo to reproduce the bug:
> https://github.com/orian/cogroup-wrong-grouping
> Discussion:
> http://mail-archives.apache.org/mod_mbox/incubator-beam-user/201605.mbox/%3CCAB2uKkG2xHsWpLFUkYnt8eEzdxU%3DB_nu6crTwVi-ZuUpugxkPQ%40mail.gmail.com%3E
> Notice: I haven't tested other runners (didn't manage to configure Spark).
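
For background, Beam semantics group by the encoded (structural) form of the key, so comparing or hashing the raw key objects can split one logical group. A sketch of the encoded comparison, assuming the key coder is at hand:

{code}
// Beam groups by structural equality of the encoded key bytes, not Object.equals().
static <K> boolean sameGroup(Coder<K> keyCoder, K keyA, K keyB) throws CoderException {
  byte[] encodedA = CoderUtils.encodeToByteArray(keyCoder, keyA);
  byte[] encodedB = CoderUtils.encodeToByteArray(keyCoder, keyB);
  return Arrays.equals(encodedA, encodedB);
}
// Comparing keyA.equals(keyB) (or hashing the raw objects) can disagree with the
// encoded form for some coders, which is how one logical key lands in several groups.
{code}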



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (BEAM-1036) Support for new State API in FlinkRunner

2017-02-28 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-1036.
--
   Resolution: Fixed
Fix Version/s: 0.6.0

> Support for new State API in FlinkRunner
> 
>
> Key: BEAM-1036
> URL: https://issues.apache.org/jira/browse/BEAM-1036
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Jingsong Lee
> Fix For: 0.6.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1558) jetty-util 6.1.26.cloudera.4 absent from repository list

2017-02-27 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886382#comment-15886382
 ] 

Aljoscha Krettek commented on BEAM-1558:


On my machine, when I run your command I get this

{code}
$ mvn -f runners/flink/runner/pom.xml dependency:tree -Dincludes="*:jetty-util"
[INFO] Scanning for projects...
[INFO] 
[INFO] Detecting the operating system and CPU architecture
[INFO] 
[INFO] os.detected.name: osx
[INFO] os.detected.arch: x86_64
[INFO] os.detected.classifier: osx-x86_64
[INFO]
[INFO] 
[INFO] Building Apache Beam :: Runners :: Flink :: Core 0.6.0-SNAPSHOT
[INFO] 
[INFO]
[INFO] --- maven-dependency-plugin:3.0.0:tree (default-cli) @ 
beam-runners-flink_2.10 ---
[INFO] org.apache.beam:beam-runners-flink_2.10:jar:0.6.0-SNAPSHOT
[INFO] \- org.apache.flink:flink-java:jar:1.2.0:compile
[INFO]\- org.apache.flink:flink-shaded-hadoop2:jar:1.2.0:compile
[INFO]   \- org.mortbay.jetty:jetty-util:jar:6.1.26:compile
{code}

Having that shaded dependency there is still messy, but it's not Cloudera-specific, 
is it? Do you maybe have a custom-built Flink somewhere in your deps or 
something else Cloudera-related in some of your settings?

> jetty-util 6.1.26.cloudera.4 absent from repository list
> 
>
> Key: BEAM-1558
> URL: https://issues.apache.org/jira/browse/BEAM-1558
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 0.6.0
>Reporter: Tom Haines
>Assignee: Aljoscha Krettek
>Priority: Minor
>
> When building beam from source with a fresh local repository, the 
> org.mortbay.jetty:jetty-util:jar:6.1.26.cloudera.4 dependency is not 
> available in the repository list.
> The dependency is transitive via org.apache.flink:flink-java:jar:1.2.0
> {code}
> $ mvn -f runners/flink/runner/pom.xml dependency:tree 
> -Dincludes="*:jetty-util"
> [INFO] 
> 
> [INFO] Building Apache Beam :: Runners :: Flink :: Core 0.6.0-SNAPSHOT
> [INFO] 
> 
> [INFO] --- maven-dependency-plugin:3.0.0:tree (default-cli) @ 
> beam-runners-flink_2.10 ---
> [INFO] org.apache.beam:beam-runners-flink_2.10:jar:0.6.0-SNAPSHOT
> [INFO] \- org.apache.flink:flink-java:jar:1.2.0:compile
> [INFO]\- org.apache.flink:flink-shaded-hadoop2:jar:1.2.0:compile
> [INFO]   \- org.mortbay.jetty:jetty-util:jar:6.1.26.cloudera.4:compile
> [INFO] 
> 
> {code}
> Error log:
> {code}
> [INFO] 
> 
> [INFO] Building Apache Beam :: Runners :: Flink :: Core 0.6.0-SNAPSHOT
> [INFO] 
> 
> [WARNING] The POM for org.mortbay.jetty:jetty-util:jar:6.1.26.cloudera.4 is 
> missing, no dependency information available
> [INFO] 
> 
> [INFO] Reactor Summary:
> [INFO]
> [INFO] Apache Beam :: Runners :: Flink  SUCCESS [  0.873 
> s]
> [INFO] Apache Beam :: Runners :: Flink :: Core  FAILURE [  0.884 
> s]
> [INFO] 
> 
> [INFO] BUILD FAILURE
> [INFO] 
> 
> [INFO] Total time: 2.334 s
> [INFO] 
> 
> [ERROR] Failed to execute goal on project beam-runners-flink_2.10: Could not 
> resolve dependencies for project 
> org.apache.beam:beam-runners-flink_2.10:jar:0.6.0-SNAPSHOT: Failure to find 
> org.mortbay.jetty:jetty-util:jar:6.1.26.cloudera.4 in 
> https://repo.maven.apache.org/maven2 was cached in the local repository, 
> resolution will not be reattempted until the update interval of central has 
> elapsed or updates are forced -> [Help 1]
> {code}
> By adding a Cloudera repo matching the repo listed in the Flink 1.2.0 pom.xml, 
> the dependency can be located and the build passes.  
> See 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (BEAM-1560) Use provided Function Runners in Flink Batch Runner

2017-02-27 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-1560.
--
Resolution: Fixed

> Use provided Function Runners in Flink Batch Runner
> ---
>
> Key: BEAM-1560
> URL: https://issues.apache.org/jira/browse/BEAM-1560
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
> Fix For: 0.6.0
>
>
> This is a prerequisite for supporting the user-facing State API and Timer API 
> in Flink Batch Runner.
> This requires executing {{DoFn}} using a {{DoFnRunner}} and executing 
> {{CombineFn}} using the {{PerKeyCombineFnRunner}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (BEAM-1560) Use provided Function Runners in Flink Batch Runner

2017-02-27 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated BEAM-1560:
---
Summary: Use provided Function Runners in Flink Batch Runner  (was: Use 
DoFnRunner in Flink Batch Runner)

> Use provided Function Runners in Flink Batch Runner
> ---
>
> Key: BEAM-1560
> URL: https://issues.apache.org/jira/browse/BEAM-1560
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
> Fix For: 0.6.0
>
>
> This is a prerequisite for supporting the user-facing State API and Timer API 
> in Flink Batch Runner.
> This requires executing {{DoFn}} using a {{DoFnRunner}} and executing 
> {{CombineFn}} using the {{PerKeyCombineFnRunner}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (BEAM-1560) Use DoFnRunner in Flink Batch Runner

2017-02-27 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated BEAM-1560:
---
Description: 
This is a prerequisite for supporting the user-facing State API and Timer API 
in Flink Batch Runner.

This requires executing {{DoFn}} using a {{DoFnRunner}} and executing 
{{CombineFn}} using the {{PerKeyCombineFnRunner}}.

  was:To support the user-facing State API and Timer API in Flink runner (batch)


> Use DoFnRunner in Flink Batch Runner
> 
>
> Key: BEAM-1560
> URL: https://issues.apache.org/jira/browse/BEAM-1560
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
> Fix For: 0.6.0
>
>
> This is a prerequisite for supporting the user-facing State API and Timer API 
> in Flink Batch Runner.
> This requires executing {{DoFn}} using a {{DoFnRunner}} and executing 
> {{CombineFn}} using the {{PerKeyCombineFnRunner}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (BEAM-1560) Use DoFnRunner in Flink Batch Runner

2017-02-27 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated BEAM-1560:
---
Summary: Use DoFnRunner in Flink Batch Runner  (was: Use SimpleDoFnRunner 
to invoke and use new CombineFnRunner in batch mode of Flink runner)

> Use DoFnRunner in Flink Batch Runner
> 
>
> Key: BEAM-1560
> URL: https://issues.apache.org/jira/browse/BEAM-1560
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
> Fix For: 0.6.0
>
>
> To support the user-facing State API and Timer API in Flink runner (batch)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (BEAM-1560) Use SimpleDoFnRunner to invoke and use new CombineFnRunner in batch mode of Flink runner

2017-02-27 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated BEAM-1560:
---
Description: To support the user-facing State API and Timer API in Flink 
runner (batch)  (was: To support new StateApi and TimerApi in Flink runner 
(batch))

> Use SimpleDoFnRunner to invoke and use new CombineFnRunner in batch mode of 
> Flink runner
> 
>
> Key: BEAM-1560
> URL: https://issues.apache.org/jira/browse/BEAM-1560
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
> Fix For: 0.6.0
>
>
> To support the user-facing State API and Timer API in Flink runner (batch)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (BEAM-1456) Make UnboundedSourceWrapper snapshot to rescalable operator state in Flink Runner

2017-02-24 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-1456.
--
   Resolution: Fixed
Fix Version/s: 0.6.0

> Make UnboundedSourceWrapper snapshot to rescalable operator state in Flink 
> Runner
> -
>
> Key: BEAM-1456
> URL: https://issues.apache.org/jira/browse/BEAM-1456
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
> Fix For: 0.6.0
>
>
> By using the SPLIT_DISTRIBUTE OperatorState in Flink to snapshot source 
> checkpoints, we make UnboundedSourceWrapper operators rescalable.
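
For reference, the Flink-side mechanism this refers to, sketched with the user-facing {{ListCheckpointed}} interface (the actual wrapper code differs; the offsets and the sleep are placeholders):

{code}
// SPLIT_DISTRIBUTE (list-style) operator state: each subtask checkpoints a list of
// partial states; on restore with a different parallelism, Flink redistributes the
// list entries across the new subtasks instead of replicating them.
class SplitSource extends RichParallelSourceFunction<Long>
    implements ListCheckpointed<Long> {

  private final List<Long> splitOffsets = new ArrayList<>();  // one entry per owned split
  private volatile boolean running = true;

  @Override
  public List<Long> snapshotState(long checkpointId, long timestamp) {
    return new ArrayList<>(splitOffsets);
  }

  @Override
  public void restoreState(List<Long> state) {
    // After rescaling this subtask may receive more or fewer splits than before.
    splitOffsets.clear();
    splitOffsets.addAll(state);
  }

  @Override
  public void run(SourceContext<Long> ctx) throws Exception {
    while (running) {
      synchronized (ctx.getCheckpointLock()) {
        // read from the owned splits and advance splitOffsets ...
      }
      Thread.sleep(10);
    }
  }

  @Override
  public void cancel() {
    running = false;
  }
}
{code}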



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1517) Garbage collect user state in Flink Runner

2017-02-22 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15877903#comment-15877903
 ] 

Aljoscha Krettek commented on BEAM-1517:


[~lzljs3620320] We cannot depend on users always cleaning up the state. That's 
why we need the GC failsafe to ensure that we don't grow state indefinitely.

[~kenn] I was afraid you might say that and anticipated as much in the 
description of the issue. :-( We'll probably add something like a 
{{StatefulDoFnRunner}} that registers a GC timer in {{processElement()}} and 
does cleanup in {{onTimer()}}.
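
At the user-facing level, the pattern such a {{StatefulDoFnRunner}} would automate looks roughly like this (a sketch with Beam 0.6-era signatures; state/timer names are assumptions and allowed lateness is ignored):

{code}
class GcAwareFn extends DoFn<KV<String, Long>, KV<String, Long>> {

  @StateId("sum")
  private final StateSpec<Object, ValueState<Long>> sumSpec =
      StateSpecs.value(VarLongCoder.of());

  @TimerId("gc")
  private final TimerSpec gcSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void processElement(
      ProcessContext c,
      BoundedWindow window,
      @StateId("sum") ValueState<Long> sum,
      @TimerId("gc") Timer gcTimer) {
    Long current = sum.read();
    sum.write((current == null ? 0L : current) + c.element().getValue());
    // Register a timer at the end of the window so state is cleaned up when the
    // watermark passes the GC horizon (allowed lateness omitted for brevity).
    gcTimer.set(window.maxTimestamp());
  }

  @OnTimer("gc")
  public void onGcTimer(OnTimerContext c, @StateId("sum") ValueState<Long> sum) {
    Long current = sum.read();
    if (current != null) {
      c.output(KV.of("total", current));  // emit before dropping; key is illustrative
    }
    sum.clear();
  }
}
{code}

A runner-level implementation would perform the same registration and cleanup per key/window without requiring any user code.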

> Garbage collect user state in Flink Runner
> --
>
> Key: BEAM-1517
> URL: https://issues.apache.org/jira/browse/BEAM-1517
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> User facing state/timers in Beam are bound to the key/window of the data. 
> Right now, the Flink Runner does not clean up user state when the watermark 
> passes the GC horizon for the state associated with a given window.
> Neither {{StateInternals}} nor the Flink state API support discarding state 
> for a whole namespace (which is the window in this case) so we might have to 
> manually set a GC timer for each window/key combination, as is done in the 
> {{ReduceFnRunner}}. For this we have to know all states a user can possibly 
> use, which we can get from the {{DoFn}} signature.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1116) Support for new Timer API in Flink runner

2017-02-21 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875726#comment-15875726
 ] 

Aljoscha Krettek commented on BEAM-1116:


This is resolved for the Flink Streaming Runner; I'm leaving it open until it's 
resolved for the Batch runner as well.

> Support for new Timer API in Flink runner
> -
>
> Key: BEAM-1116
> URL: https://issues.apache.org/jira/browse/BEAM-1116
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Jingsong Lee
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1517) Garbage collect user state in Flink Runner

2017-02-21 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875634#comment-15875634
 ] 

Aljoscha Krettek commented on BEAM-1517:


[~tgroh] or [~kenn], do you have any ideas about this? Should we make a generic 
version of this, similar to how {{ReduceFnRunner}} is used for {{GroupByKey}}?

[~lzljs3620320] I thought you might also find this interesting.

> Garbage collect user state in Flink Runner
> --
>
> Key: BEAM-1517
> URL: https://issues.apache.org/jira/browse/BEAM-1517
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> User facing state/timers in Beam are bound to the key/window of the data. 
> Right now, the Flink Runner does not clean up user state when the watermark 
> passes the GC horizon for the state associated with a given window.
> Neither {{StateInternals}} nor the Flink state API support discarding state 
> for a whole namespace (which is the window in this case) so we might have to 
> manually set a GC timer for each window/key combination, as is done in the 
> {{ReduceFnRunner}}. For this we have to know all states a user can possibly 
> use, which we can get from the {{DoFn}} signature.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (BEAM-1517) Garbage collect user state in Flink Runner

2017-02-21 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created BEAM-1517:
--

 Summary: Garbage collect user state in Flink Runner
 Key: BEAM-1517
 URL: https://issues.apache.org/jira/browse/BEAM-1517
 Project: Beam
  Issue Type: Bug
  Components: runner-flink
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek


User facing state/timers in Beam are bound to the key/window of the data. Right 
now, the Flink Runner does not clean up user state when the watermark passes 
the GC horizon for the state associated with a given window.

Neither {{StateInternals}} nor the Flink state API support discarding state for 
a whole namespace (which is the window in this case) so we might have to 
manually set a GC timer for each window/key combination, as is done in the 
{{ReduceFnRunner}}. For this we have to know all states a user can possibly 
use, which we can get from the {{DoFn}} signature.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1498) Use Flink-native side outputs

2017-02-16 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15869941#comment-15869941
 ] 

Aljoscha Krettek commented on BEAM-1498:


[~lzljs3620320] I thought you might find this interesting. I'm currently 
working on this with a contributor and we should make it in time for Flink 1.3. 
:-)

> Use Flink-native side outputs
> -
>
> Key: BEAM-1498
> URL: https://issues.apache.org/jira/browse/BEAM-1498
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>
> Once Flink has support for side outputs we should use them instead of 
> manually dealing with the {{RawUnionValues}}.
> Side outputs for Flink is being tracked in 
> https://issues.apache.org/jira/browse/FLINK-4460.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (BEAM-1498) Use Flink-native side outputs

2017-02-16 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created BEAM-1498:
--

 Summary: Use Flink-native side outputs
 Key: BEAM-1498
 URL: https://issues.apache.org/jira/browse/BEAM-1498
 Project: Beam
  Issue Type: Improvement
  Components: runner-flink
Reporter: Aljoscha Krettek


Once Flink has support for side outputs we should use them instead of manually 
dealing with the {{RawUnionValues}}.

Side outputs for Flink is being tracked in 
https://issues.apache.org/jira/browse/FLINK-4460.
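
For reference, a sketch of the Flink-native side-output API as it is shaping up for Flink 1.3 ({{input}} is an assumed {{DataStream<String>}}; exact names may still change):

{code}
// Side outputs in Flink 1.3: tag extra records and emit them from a ProcessFunction.
final OutputTag<String> extraTag = new OutputTag<String>("extra") {};

SingleOutputStreamOperator<String> main = input
    .process(new ProcessFunction<String, String>() {
      @Override
      public void processElement(String value, Context ctx, Collector<String> out) {
        out.collect(value);              // main output
        ctx.output(extraTag, value);     // side output, no RawUnionValue wrapping needed
      }
    });

DataStream<String> extra = main.getSideOutput(extraTag);
{code}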



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (BEAM-1394) Use Flink InternalTimerService for TimerInternals

2017-02-16 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-1394.
--
   Resolution: Fixed
Fix Version/s: 0.6.0

> Use Flink InternalTimerService for TimerInternals
> -
>
> Key: BEAM-1394
> URL: https://issues.apache.org/jira/browse/BEAM-1394
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Jingsong Lee
> Fix For: 0.6.0
>
>
> When updating our Flink version to 1.2 we can use the new internal timer API 
> for both the windowing and for wiring in the Beam user-facing Timer API.
> By using the internal timer API we make operators rescalable, that is, we can 
> change the parallelism of a running Beam on Flink job by performing a 
> savepoint and then restarting with a different parallelism.
> An {{InternalTimerService}} can be retrieved by a Flink operator (mostly in 
> {{open()}} using:
> {code}
> /**
>  * Returns a {@link InternalTimerService} that can be used to query current 
> processing time
>  * and event time and to set timers. An operator can have several timer 
> services, where
>  * each has its own namespace serializer. Timer services are differentiated 
> by the string
>  * key that is given when requesting them, if you call this method with the 
> same key
>  * multiple times you will get the same timer service instance in subsequent 
> requests.
>  *
>  * Timers are always scoped to a key, the currently active key of a keyed 
> stream operation.
>  * When a timer fires, this key will also be set as the currently active key.
>  *
>  * Each timer has attached metadata, the namespace. Different timer 
> services
>  * can have a different namespace type. If you don't need namespace 
> differentiation you
>  * can use {@link VoidNamespaceSerializer} as the namespace serializer.
>  *
>  * @param name The name of the requested timer service. If no service exists 
> under the given
>  * name a new one will be created and returned.
>  * @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
>  * @param triggerable The {@link Triggerable} that should be invoked when 
> timers fire
>  *
>  * @param <N> The type of the timer namespace.
>  */
> public <K, N> InternalTimerService<N> getInternalTimerService(
>     String name,
>     TypeSerializer<N> namespaceSerializer,
>     Triggerable<K, N> triggerable);
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (BEAM-1445) Use Flink broadcast state to store side-input data

2017-02-16 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-1445.
--
   Resolution: Fixed
Fix Version/s: 0.6.0

> Use Flink broadcast state to store side-input data
> --
>
> Key: BEAM-1445
> URL: https://issues.apache.org/jira/browse/BEAM-1445
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
> Fix For: 0.6.0
>
>
> By using the broadcast state to store side-input data we make operators 
> rescalable. What BROADCAST does is collect all checkpointed states into one 
> "list" and then send out that list to all parallel subtasks when restoring. 
> The way we would use it is to only checkpoint anything from the operator with 
> subtask index 0, because we assume that the state is the same on all parallel 
> instances of the operator.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (BEAM-1393) Update Flink Runner to Flink 1.2.0

2017-02-16 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-1393.
--
   Resolution: Fixed
Fix Version/s: 0.6.0

> Update Flink Runner to Flink 1.2.0
> --
>
> Key: BEAM-1393
> URL: https://issues.apache.org/jira/browse/BEAM-1393
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Jingsong Lee
> Fix For: 0.6.0
>
>
> When we update to 1.2.0 we can use the new internal Timer API that is 
> available to Flink operators: {{InternalTimerService}} and also use broadcast 
> state to store side-input data.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1346) Drop Late Data in ReduceFnRunner

2017-02-13 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1346?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865303#comment-15865303
 ] 

Aljoscha Krettek commented on BEAM-1346:


Ah dammit, you're right, I forgot about BEAM-696. In fact the Flink Runner also 
[disables that 
optimization|https://github.com/apache/beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java#L850].

> Drop Late Data in ReduceFnRunner
> 
>
> Key: BEAM-1346
> URL: https://issues.apache.org/jira/browse/BEAM-1346
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Affects Versions: 0.5.0
>Reporter: Aljoscha Krettek
>
> I think these two commits recently broke late-data dropping for the Flink 
> Runner (and maybe for other runners as well):
> - https://github.com/apache/beam/commit/2b26ec8
> - https://github.com/apache/beam/commit/8989473
> It boils down to the {{LateDataDroppingDoFnRunner}} not being used anymore  
> because {{DoFnRunners.lateDataDroppingRunner()}} is not called anymore when a 
> {{DoFn}} is a {{ReduceFnExecutor}} (because that interface was removed).
> Maybe we should think about dropping late data in another place, my 
> suggestion is {{ReduceFnRunner}} but that's open for discussion.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1461) duplication with StartBundle and prepareForProcessing in DoFn

2017-02-13 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15863967#comment-15863967
 ] 

Aljoscha Krettek commented on BEAM-1461:


{{DoFn.prepareForProcessing()}} is only invoked once, before executing a 
pipeline using a Runner. The {{@StartBundle}} method is invoked before 
processing a bundle, which can happen several times if you have many bundles.

[~davor] do you think we should maybe add better JavaDoc to 
{{DoFn.prepareForProcessing()}}? If not, I think we should close this issue. 
What do you think, [~mingmxu]?

> duplication with StartBundle and prepareForProcessing in DoFn
> -
>
> Key: BEAM-1461
> URL: https://issues.apache.org/jira/browse/BEAM-1461
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Xu Mingmin
>Assignee: Davor Bonaci
>
> There is one annotation `StartBundle` and one public function 
> `prepareForProcessing` in DoFn, both of which are called before 
> `ProcessElement`. It's confusing which one should be implemented in a subclass.
> The call sequence seems to be:
> prepareForProcessing -> StartBundle -> processElement



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1346) Drop Late Data in ReduceFnRunner

2017-02-13 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1346?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15863721#comment-15863721
 ] 

Aljoscha Krettek commented on BEAM-1346:


[~kenn] another thing that crossed my mind is elements being pushed back due to 
their side input not being ready. Think {{PushbackSideInputRunner}} and similar 
implementations for other runners, if they have it. It's similar to this issue 
but in the end we probably need a separate issue.

The problem occurs when you have a special implementation for "combine" that 
doesn't simply do {{GroupByKey | ParDo(CombineFn)}} where the first one is 
{{GroupByKey: KV<K, V> → KV<K, Iterable<V>>}}. The {{CombineFn}} can access side 
inputs and the side input that it can access is determined by the window that 
the value has after merging (as evident from the proper definition of combine 
given above). {{PushbackSideInputRunner}}, however, only considers the 
(proto-)window that the value has before merging so the pushing back and 
determining when a side input is ready is based on the wrong information.

Do you agree or is that just me getting a little paranoid with the whole 
merging stuff? ;-)

> Drop Late Data in ReduceFnRunner
> 
>
> Key: BEAM-1346
> URL: https://issues.apache.org/jira/browse/BEAM-1346
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Affects Versions: 0.5.0
>Reporter: Aljoscha Krettek
>
> I think these two commits recently broke late-data dropping for the Flink 
> Runner (and maybe for other runners as well):
> - https://github.com/apache/beam/commit/2b26ec8
> - https://github.com/apache/beam/commit/8989473
> It boils down to the {{LateDataDroppingDoFnRunner}} not being used anymore  
> because {{DoFnRunners.lateDataDroppingRunner()}} is not called anymore when a 
> {{DoFn}} is a {{ReduceFnExecutor}} (because that interface was removed).
> Maybe we should think about dropping late data in another place, my 
> suggestion is {{ReduceFnRunner}} but that's open for discussion.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1458) Checkpoint support in Beam

2017-02-10 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15861511#comment-15861511
 ] 

Aljoscha Krettek commented on BEAM-1458:


Flink snapshots/checkpoints are different from how the term is used in 
Scalding. In Flink, they are used to make the state of a streaming program 
fault tolerant, i.e. Flink periodically performs checkpoints and in case a 
failure happens we restore state from a checkpoint.

That being said, Scalding-style checkpoints could still be a nice feature but I 
imagine that it can be tricky to implement support for that across all runners. 
For now, I think you can implement a poor man's version of that by simply 
writing intermediate results to files and reading them with sources, thereby 
splitting up your pipeline.
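
A sketch of that poor man's approach (the file path, the two-run split, and the helper DoFns are assumptions; any IO works):

{code}
// Run 1: compute the expensive intermediate result once and persist it.
PCollection<String> intermediate =
    input.apply("ExpensiveStep", ParDo.of(new ExpensiveFn()));
intermediate.apply(TextIO.Write.to("gs://my-bucket/checkpoints/intermediate"));
pipeline1.run();

// Run 2 (a separate pipeline): start from the persisted result instead of recomputing.
PCollection<String> restored =
    pipeline2.apply(TextIO.Read.from("gs://my-bucket/checkpoints/intermediate*"));
restored.apply("DebugStep", ParDo.of(new StepUnderDebugFn()));
pipeline2.run();
{code}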

> Checkpoint support in Beam
> --
>
> Key: BEAM-1458
> URL: https://issues.apache.org/jira/browse/BEAM-1458
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model
>Affects Versions: 0.5.0
>Reporter: Rafal Wojdyla
>Assignee: Rafal Wojdyla
>  Labels: features
>
> Beam could support checkpoints - similar to:
>  * flink's snapshots
>  * scalding's checkpoints
> Checkpoints should provide a simple mechanism to read and write intermediate 
> results. They would be useful for debugging one part of a long flow, when you 
> would otherwise have to run many steps to get to the one you care about.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1393) Update Flink Runner to Flink 1.2.0

2017-02-10 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15861079#comment-15861079
 ] 

Aljoscha Krettek commented on BEAM-1393:


Yes, your analysis is spot on!

I think what we can do is completely copy the code of 
{{AbstractStreamOperator}} to the Beam code, then try and come up with a good 
interface that works for checkpointing key-groups. And then contribute that 
back to Flink for the next release so that we can again remove 
{{AbstractStreamOperator}} from the Beam code base. What do you think?

I had in mind something like:

{code}
interface KeyGroupCheckpointer {
  void checkpointKeyGroup(int keyGroupIndex, OutputStream out);
}
{code}

Our Beam operators would implement this and our custom 
{{AbstractStreamOperator}} would call this hook in 
{{AbstractStreamOperator.snapshotState()}} while iterating over the key groups.

If we find that this interface works well we can easily add it to Flink proper.

We would also need a new implementation of {{StateInternals}} that can 
checkpoint itself to a stream and restore from a stream. We could then use this 
to either write to key-group checkpoints or to SPLIT_DISTRIBUTE streams for the 
non-keyed case.

> Update Flink Runner to Flink 1.2.0
> --
>
> Key: BEAM-1393
> URL: https://issues.apache.org/jira/browse/BEAM-1393
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Jingsong Lee
>
> When we update to 1.2.0 we can use the new internal Timer API that is 
> available to Flink operators: {{InternalTimerService}} and also use broadcast 
> state to store side-input data.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1393) Update Flink Runner to Flink 1.2.0

2017-02-09 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15859777#comment-15859777
 ] 

Aljoscha Krettek commented on BEAM-1393:


That last part is actually a bit more complicated. (I saw what you did in your 
PR and you're definitely going in the right direction.)

We have to differentiate between where we store side-input state and 
pushed-back events. The reason for this is that there can be keyed operators 
that have state, and for those we need to ensure that the pushed-back elements 
end up on the correct operator when restoring, which only happens for keyed 
state. In the non-keyed case, we need to store the elements in state that is 
not BROADCAST; they can just be reshuffled to any operator.

> Update Flink Runner to Flink 1.2.0
> --
>
> Key: BEAM-1393
> URL: https://issues.apache.org/jira/browse/BEAM-1393
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Jingsong Lee
>
> When we update to 1.2.0 we can use the new internal Timer API that is 
> available to Flink operators: {{InternalTimerService}} and also use broadcast 
> state to store side-input data.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (BEAM-1394) Use Flink InternalTimerService for TimerInternals

2017-02-05 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned BEAM-1394:
--

Assignee: (was: Aljoscha Krettek)

> Use Flink InternalTimerService for TimerInternals
> -
>
> Key: BEAM-1394
> URL: https://issues.apache.org/jira/browse/BEAM-1394
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>
> When updating our Flink version to 1.2 we can use the new internal timer API 
> for both the windowing and for wiring in the Beam user-facing Timer API.
> By using the internal timer API we make operators rescalable, that is, we can 
> change the parallelism of a running Beam on Flink job by performing a 
> savepoint and then restarting with a different parallelism.
> An {{InternalTimerService}} can be retrieved by a Flink operator (mostly in 
> {{open()}} using:
> {code}
> /**
>  * Returns a {@link InternalTimerService} that can be used to query current 
> processing time
>  * and event time and to set timers. An operator can have several timer 
> services, where
>  * each has its own namespace serializer. Timer services are differentiated 
> by the string
>  * key that is given when requesting them, if you call this method with the 
> same key
>  * multiple times you will get the same timer service instance in subsequent 
> requests.
>  *
>  * Timers are always scoped to a key, the currently active key of a keyed 
> stream operation.
>  * When a timer fires, this key will also be set as the currently active key.
>  *
>  * Each timer has attached metadata, the namespace. Different timer 
> services
>  * can have a different namespace type. If you don't need namespace 
> differentiation you
>  * can use {@link VoidNamespaceSerializer} as the namespace serializer.
>  *
>  * @param name The name of the requested timer service. If no service exists 
> under the given
>  * name a new one will be created and returned.
>  * @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
>  * @param triggerable The {@link Triggerable} that should be invoked when 
> timers fire
>  *
>  * @param <N> The type of the timer namespace.
>  */
> public <K, N> InternalTimerService<N> getInternalTimerService(
>     String name,
>     TypeSerializer<N> namespaceSerializer,
>     Triggerable<K, N> triggerable);
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (BEAM-1394) Use Flink InternalTimerService for TimerInternals

2017-02-05 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated BEAM-1394:
---
Description: 
When updating our Flink version to 1.2 we can use the new internal timer API 
for both the windowing and for wiring in the Beam user-facing Timer API.

By using the internal timer API we make operators rescalable, that is, we can 
change the parallelism of a running Beam on Flink job by performing a savepoint 
and then restarting with a different parallelism.

An {{InternalTimerService}} can be retrieved by a Flink operator (mostly in 
{{open()}} using:
{code}
/**
 * Returns a {@link InternalTimerService} that can be used to query current 
processing time
 * and event time and to set timers. An operator can have several timer 
services, where
 * each has its own namespace serializer. Timer services are differentiated by 
the string
 * key that is given when requesting them, if you call this method with the 
same key
 * multiple times you will get the same timer service instance in subsequent 
requests.
 *
 * Timers are always scoped to a key, the currently active key of a keyed 
stream operation.
 * When a timer fires, this key will also be set as the currently active key.
 *
 * Each timer has attached metadata, the namespace. Different timer services
 * can have a different namespace type. If you don't need namespace 
differentiation you
 * can use {@link VoidNamespaceSerializer} as the namespace serializer.
 *
 * @param name The name of the requested timer service. If no service exists 
under the given
 * name a new one will be created and returned.
 * @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
 * @param triggerable The {@link Triggerable} that should be invoked when 
timers fire
 *
 * @param <N> The type of the timer namespace.
 */
public <K, N> InternalTimerService<N> getInternalTimerService(
    String name,
    TypeSerializer<N> namespaceSerializer,
    Triggerable<K, N> triggerable);
{code}

  was:
When updating our Flink version to 1.2 we can use the new internal timer API 
for both the windowing and for wiring in the Beam user-facing Timer API.

An {{InternalTimerService}} can be retrieved by a Flink operator (mostly in 
{{open()}} using:
{code}
/**
 * Returns a {@link InternalTimerService} that can be used to query current processing time
 * and event time and to set timers. An operator can have several timer services, where
 * each has its own namespace serializer. Timer services are differentiated by the string
 * key that is given when requesting them, if you call this method with the same key
 * multiple times you will get the same timer service instance in subsequent requests.
 *
 * Timers are always scoped to a key, the currently active key of a keyed stream operation.
 * When a timer fires, this key will also be set as the currently active key.
 *
 * Each timer has attached metadata, the namespace. Different timer services
 * can have a different namespace type. If you don't need namespace differentiation you
 * can use {@link VoidNamespaceSerializer} as the namespace serializer.
 *
 * @param name The name of the requested timer service. If no service exists under the given
 *             name a new one will be created and returned.
 * @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
 * @param triggerable The {@link Triggerable} that should be invoked when timers fire
 *
 * @param <N> The type of the timer namespace.
 */
public <K, N> InternalTimerService<N> getInternalTimerService(
    String name,
    TypeSerializer<N> namespaceSerializer,
    Triggerable<K, N> triggerable);
{code}


> Use Flink InternalTimerService for TimerInternals
> -
>
> Key: BEAM-1394
> URL: https://issues.apache.org/jira/browse/BEAM-1394
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> When updating our Flink version to 1.2 we can use the new internal timer API 
> for both the windowing and for wiring in the Beam user-facing Timer API.
> By using the internal timer API we make operators rescalable, that is, we can 
> change the parallelism of a running Beam on Flink job by performing a 
> savepoint and then restarting with a different parallelism.
> An {{InternalTimerService}} can be retrieved by a Flink operator (mostly in 
> {{open()}} using:
> {code}
> /**
>  * Returns a {@link InternalTimerService} that can be used to query current 
> processing time
>  * and event time and to set timers. An operator can have several timer 
> services, where
>  * each has its own namespace serializer. Timer services are differentiated 
> by the string
>  * key that is given when requesting them, if you call this method with the 
> same key
>  

[jira] [Created] (BEAM-1394) Use Flink InternalTimerService for TimerInternals

2017-02-05 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created BEAM-1394:
--

 Summary: Use Flink InternalTimerService for TimerInternals
 Key: BEAM-1394
 URL: https://issues.apache.org/jira/browse/BEAM-1394
 Project: Beam
  Issue Type: Improvement
  Components: runner-flink
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek


When updating our Flink version to 1.2 we can use the new internal timer API 
for both the windowing and for wiring in the Beam user-facing Timer API.

An {{InternalTimerService}} can be retrieved by a Flink operator (mostly in {{open()}}) using:
{code}
/**
 * Returns a {@link InternalTimerService} that can be used to query current processing time
 * and event time and to set timers. An operator can have several timer services, where
 * each has its own namespace serializer. Timer services are differentiated by the string
 * key that is given when requesting them, if you call this method with the same key
 * multiple times you will get the same timer service instance in subsequent requests.
 *
 * Timers are always scoped to a key, the currently active key of a keyed stream operation.
 * When a timer fires, this key will also be set as the currently active key.
 *
 * Each timer has attached metadata, the namespace. Different timer services
 * can have a different namespace type. If you don't need namespace differentiation you
 * can use {@link VoidNamespaceSerializer} as the namespace serializer.
 *
 * @param name The name of the requested timer service. If no service exists under the given
 *             name a new one will be created and returned.
 * @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
 * @param triggerable The {@link Triggerable} that should be invoked when timers fire
 *
 * @param <N> The type of the timer namespace.
 */
public <K, N> InternalTimerService<N> getInternalTimerService(
    String name,
    TypeSerializer<N> namespaceSerializer,
    Triggerable<K, N> triggerable);
{code}
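
For illustration, here is a minimal sketch of how a keyed Flink operator could request such a timer service in {{open()}} and react to firing timers. This is not the actual runner code; the service name, key/namespace types and callback bodies are invented:

{code}
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class TimerSketchOperator
    extends AbstractStreamOperator<String>
    implements OneInputStreamOperator<String, String>, Triggerable<String, VoidNamespace> {

  private transient InternalTimerService<VoidNamespace> timerService;

  @Override
  public void open() throws Exception {
    super.open();
    // Request the timer service by name; asking again with the same name
    // returns the same instance.
    timerService = getInternalTimerService(
        "sketch-timers", VoidNamespaceSerializer.INSTANCE, this);
  }

  @Override
  public void processElement(StreamRecord<String> element) throws Exception {
    // Register an event-time timer 10 seconds after the element's timestamp,
    // scoped to the currently active key.
    timerService.registerEventTimeTimer(
        VoidNamespace.INSTANCE, element.getTimestamp() + 10_000L);
  }

  @Override
  public void onEventTime(InternalTimer<String, VoidNamespace> timer) throws Exception {
    // The key that registered the timer is the active key again when it fires.
    output.collect(new StreamRecord<>("timer fired", timer.getTimestamp()));
  }

  @Override
  public void onProcessingTime(InternalTimer<String, VoidNamespace> timer) throws Exception {
    // Processing-time timers arrive here.
  }
}
{code}

The operator has to be applied on a keyed stream for the key scoping to apply; the registered timers are checkpointed by Flink per key group, which is what allows changing the parallelism via a savepoint.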



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (BEAM-1393) Update Flink Runner to Flink 1.2.0

2017-02-05 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created BEAM-1393:
--

 Summary: Update Flink Runner to Flink 1.2.0
 Key: BEAM-1393
 URL: https://issues.apache.org/jira/browse/BEAM-1393
 Project: Beam
  Issue Type: Improvement
  Components: runner-flink
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek


When we update to 1.2.0 we can use the new internal Timer API that is available to Flink operators ({{InternalTimerService}}) and also use broadcast state to store side-input data.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (BEAM-1389) Remove or update Flink Runner README.md

2017-02-04 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created BEAM-1389:
--

 Summary: Remove or update Flink Runner README.md
 Key: BEAM-1389
 URL: https://issues.apache.org/jira/browse/BEAM-1389
 Project: Beam
  Issue Type: Improvement
  Components: runner-flink
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek


The readme is outdated by now. We could either remove it or just put a link to 
the Runner documentation on the website.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1335) ValueState could use an initial/default value

2017-02-02 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15850086#comment-15850086
 ] 

Aljoscha Krettek commented on BEAM-1335:


No, only the internal implementation is harder to get right. (One of the difficulties is that Flink doesn't just have a {{Coder}} but a {{TypeInformation}} that can be asked to give a {{TypeSerializer}}, where the latter is similar to a {{Coder}}. The {{TypeSerializer}} that is returned can depend on pipeline-level configuration settings, so it's hard to get that always working correctly because of serialisation of the user function, where you also have to serialise the state spec.)

Since we tend to put ease of use before ease of internal development, I think it's good to have this option, also since {{Coders}} are easier to deal with.
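
For context, the boilerplate this issue refers to looks roughly like the following helper (illustrative only, not code from the SDK; the {{ValueState}} package differs between SDK versions):

{code}
import com.google.common.base.MoreObjects;

import org.apache.beam.sdk.state.ValueState;

class CounterStateSketch {

  // Without a declared default, read() returns null until the first write(),
  // so stateful DoFn code typically guards every read:
  static int readOrZero(ValueState<Integer> count) {
    return MoreObjects.firstNonNull(count.read(), 0);
  }

  static void increment(ValueState<Integer> count) {
    count.write(readOrZero(count) + 1);
  }
}
{code}

A declared default baked into the state declaration (say, a hypothetical {{StateSpecs.value(VarIntCoder.of(), 0)}}) would make the {{firstNonNull}} guard unnecessary.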

> ValueState could use an initial/default value
> -
>
> Key: BEAM-1335
> URL: https://issues.apache.org/jira/browse/BEAM-1335
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Kenneth Knowles
>Priority: Minor
>  Labels: starter
>
> In writing example state code with {{ValueState}} there is almost always a 
> use of {{firstNonNull(state.read(), defaultValue)}}. It would be nice to bake 
> this into the declaration.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] (BEAM-1346) Drop Late Data in ReduceFnRunner

2017-01-31 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1346?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15847474#comment-15847474
 ] 

Aljoscha Krettek commented on BEAM-1346:


Yep, that fact (the Apex runner never using the DoFnRunners) is part of the reason why I suggested putting it into {{ReduceFnRunner}}.

Wherever we put it, in the end we need some good tests to ensure that dropping (and triggering) works correctly in all runners. :-)

> Drop Late Data in ReduceFnRunner
> 
>
> Key: BEAM-1346
> URL: https://issues.apache.org/jira/browse/BEAM-1346
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Affects Versions: 0.5.0
>Reporter: Aljoscha Krettek
>
> I think these two commits recently broke late-data dropping for the Flink 
> Runner (and maybe for other runners as well):
> - https://github.com/apache/beam/commit/2b26ec8
> - https://github.com/apache/beam/commit/8989473
> It boils down to the {{LateDataDroppingDoFnRunner}} not being used anymore  
> because {{DoFnRunners.lateDataDroppingRunner()}} is not called anymore when a 
> {{DoFn}} is a {{ReduceFnExecutor}} (because that interface was removed).
> Maybe we should think about dropping late data in another place, my 
> suggestion is {{ReduceFnRunner}} but that's open for discussion.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] (BEAM-1346) Drop Late Data in ReduceFnRunner

2017-01-31 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1346?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15847134#comment-15847134
 ] 

Aljoscha Krettek commented on BEAM-1346:


Also, isn't dropping late data before giving it to the {{ReduceFnRunner}} wrong when we have a merging {{WindowFn}}? When an element arrives with a window that would be late with respect to the watermark, but that would end up getting merged into a bigger window that is not late (and which will therefore still be holding back the output watermark), we wouldn't have to drop that element.
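
As a concrete illustration of that point (made-up timestamps, 10-minute sessions; not code from the runner):

{code}
import org.joda.time.Instant;

import org.apache.beam.sdk.transforms.windowing.IntervalWindow;

class MergingLatenessSketch {

  // With the watermark at 12:15, an element at 12:00 alone would sit in
  // [12:00, 12:10), which looks late. If an existing session [12:05, 12:30)
  // merges with it, the merged window [12:00, 12:30) is not late, so dropping
  // the element before the merge would be wrong.
  static IntervalWindow mergedWindow() {
    IntervalWindow fromLateElement = new IntervalWindow(
        Instant.parse("2017-01-31T12:00:00Z"), Instant.parse("2017-01-31T12:10:00Z"));
    IntervalWindow existingSession = new IntervalWindow(
        Instant.parse("2017-01-31T12:05:00Z"), Instant.parse("2017-01-31T12:30:00Z"));
    return fromLateElement.span(existingSession);
  }
}
{code}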

> Drop Late Data in ReduceFnRunner
> 
>
> Key: BEAM-1346
> URL: https://issues.apache.org/jira/browse/BEAM-1346
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Affects Versions: 0.5.0
>Reporter: Aljoscha Krettek
>
> I think these two commits recently broke late-data dropping for the Flink 
> Runner (and maybe for other runners as well):
> - https://github.com/apache/beam/commit/2b26ec8
> - https://github.com/apache/beam/commit/8989473
> It boils down to the {{LateDataDroppingDoFnRunner}} not being used anymore  
> because {{DoFnRunners.lateDataDroppingRunner()}} is not called anymore when a 
> {{DoFn}} is a {{ReduceFnExecutor}} (because that interface was removed).
> Maybe we should think about dropping late data in another place, my 
> suggestion is {{ReduceFnRunner}} but that's open for discussion.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] (BEAM-1346) Drop Late Data in ReduceFnRunner

2017-01-31 Thread Aljoscha Krettek (JIRA)

Aljoscha Krettek commented on BEAM-1346:
---

Re: Drop Late Data in ReduceFnRunner

+ Thomas Weise and Amit Sela

AFAIK this is not just an issue for the Flink Runner; from a quick glance it looks as if the Apex runner and the Spark runner also don't drop late elements. For the Dataflow runner I don't know, but I assume that there is some internal code that does the right thing.

If we want to ensure that the behaviour of the supported features is always correct across the different runners, then I would consider this a blocker.

--
This message was sent by Atlassian JIRA (v6.3.15#6346-sha1:dbc023d)


[jira] (BEAM-1346) Drop Late Data in ReduceFnRunner

2017-01-30 Thread Aljoscha Krettek (JIRA)

Aljoscha Krettek assigned an issue to Unassigned

Beam / BEAM-1346: Drop Late Data in ReduceFnRunner

Change By: Aljoscha Krettek
Assignee: Kenneth Knowles

--
This message was sent by Atlassian JIRA (v6.3.15#6346-sha1:dbc023d)



[jira] (BEAM-1346) Drop Late Data in ReduceFnRunner

2017-01-30 Thread Aljoscha Krettek (JIRA)

Aljoscha Krettek created an issue

Beam / BEAM-1346: Drop Late Data in ReduceFnRunner

Issue Type: Bug
Affects Versions: 0.5.0
Assignee: Kenneth Knowles
Components: runner-core
Created: 30/Jan/17 11:39
Priority: Major
Reporter: Aljoscha Krettek

I think these two commits recently broke late-data dropping for the Flink Runner (and maybe for other runners as well):
- https://github.com/apache/beam/commit/2b26ec8
- https://github.com/apache/beam/commit/8989473

It boils down to the LateDataDroppingDoFnRunner not being used anymore because DoFnRunners.lateDataDroppingRunner() is not called anymore when a DoFn is a ReduceFnExecutor (because that interface was removed).

Maybe we should think about dropping late data in another place, my suggestion is ReduceFnRunner but that's open for discussion.
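
For reference, the check that "dropping late data" boils down to is roughly the following (an illustrative sketch, not the actual LateDataDroppingDoFnRunner code; package locations are approximate for this SDK version):

{code}
import org.joda.time.Instant;

import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowingStrategy;

class LateDataSketch {

  // A window is expired, and its elements droppable, once the input watermark
  // has passed the window's max timestamp plus the allowed lateness.
  static boolean isDroppable(
      BoundedWindow window, WindowingStrategy<?, ?> strategy, Instant inputWatermark) {
    Instant expiry = window.maxTimestamp().plus(strategy.getAllowedLateness());
    return expiry.isBefore(inputWatermark);
  }
}
{code}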
 
 
   

[jira] (BEAM-1346) Drop Late Data in ReduceFnRunner

2017-01-30 Thread Aljoscha Krettek (JIRA)

Aljoscha Krettek commented on BEAM-1346:
---

Re: Drop Late Data in ReduceFnRunner

Kenneth Knowles, this is related to BEAM-241.

--
This message was sent by Atlassian JIRA (v6.3.15#6346-sha1:dbc023d)



[jira] (BEAM-843) Use New DoFn Directly in Flink Runner

2017-01-30 Thread Aljoscha Krettek (JIRA)

Aljoscha Krettek commented on BEAM-843:
---

Re: Use New DoFn Directly in Flink Runner

Jingsong Lee: I assigned the issue to you and am now merging your PR. In the future it would be good to comment on an issue before you start implementing, to avoid duplicate work.

--
This message was sent by Atlassian JIRA (v6.3.15#6346-sha1:dbc023d)



[jira] [Commented] (BEAM-773) Implement Metrics support for Flink runner

2017-01-09 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15811610#comment-15811610
 ] 

Aljoscha Krettek commented on BEAM-773:
---

[~bchambers] is correct on both issues: AFAIK no one is working on this right now, and wiring the Beam Metrics to the Flink Metrics will only provide "attempted metrics".

[~celix] If you're interested in working on this, I'm happy to help.
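
To make the "attempted metrics" point concrete, a wiring along these lines (purely illustrative; the helper and names are invented) would mirror whatever a DoFn reports through the Beam metrics API into Flink's metric system. Since Flink has no notion of committed-per-bundle results, only attempted values can be exposed this way:

{code}
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;

class MetricsWiringSketch {

  // Pipeline side: a DoFn reports a counter through the Beam metrics API.
  static class CountingFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      Metrics.counter(CountingFn.class, "elements").inc();
      c.output(c.element());
    }
  }

  // Runner side (invented helper): the operator mirrors the attempted value
  // into a Flink counter, e.g. one Flink counter per Beam counter name.
  static Counter mirror(MetricGroup operatorGroup, String beamNamespace, String beamName) {
    return operatorGroup.addGroup(beamNamespace).counter(beamName);
  }
}
{code}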

> Implement Metrics support for Flink runner
> --
>
> Key: BEAM-773
> URL: https://issues.apache.org/jira/browse/BEAM-773
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Ben Chambers
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

