[jira] [Closed] (BEAM-5372) [Flink Runner] Make minPauseBetweenCheckpoints setting available in FlinkPipelineOptions

2018-09-14 Thread Aljoscha Krettek (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-5372.
--
Resolution: Fixed

> [Flink Runner] Make minPauseBetweenCheckpoints setting available in 
> FlinkPipelineOptions
> 
>
> Key: BEAM-5372
> URL: https://issues.apache.org/jira/browse/BEAM-5372
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Daniel Harper
>Assignee: Daniel Harper
>Priority: Trivial
> Fix For: 2.7.0
>
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> The configuration setting {{minPauseBetweenCheckpoints}} [1] is available in 
> Flink to allow a grace period when the checkpoint runtime exceeds the 
> checkpoint interval. 
> This should be exposed in {{FlinkPipelineOptions}} and 
> {{FlinkExecutionEnvironments}} to allow users to configure this.
> The default for this value in Flink is 0 ms [2].
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/java/org/apache/flink/streaming/api/environment/CheckpointConfig.html#setMinPauseBetweenCheckpoints-long-
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/java/constant-values.html#org.apache.flink.streaming.api.environment.CheckpointConfig.DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS
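The follow-up commit on this issue ("Updating setting to only set if value is not default") suggests a simple guard: forward the pipeline-option value only when the user changed it from Flink's documented default of 0 ms. The sketch below models that guard in plain Java; `CheckpointSettings` and its method names are illustrative stand-ins, not Flink's real `CheckpointConfig` or Beam's `FlinkPipelineOptions`.

```java
// Stand-in for the real checkpoint configuration; models the
// "only set if value is not default" guard from the follow-up commit.
public class CheckpointSettings {
    // Mirrors Flink's documented default of 0 ms (see reference [2] above).
    public static final long MIN_PAUSE_DEFAULT_MS = 0L;

    private long minPauseBetweenCheckpointsMs = MIN_PAUSE_DEFAULT_MS;

    /** Applies the option value only when the user changed it from the default. */
    public void applyMinPause(long optionValueMs) {
        if (optionValueMs != MIN_PAUSE_DEFAULT_MS) {
            minPauseBetweenCheckpointsMs = optionValueMs;
        }
    }

    public long getMinPauseMs() {
        return minPauseBetweenCheckpointsMs;
    }

    public static void main(String[] args) {
        CheckpointSettings settings = new CheckpointSettings();
        settings.applyMinPause(0L);   // default value: leave Flink's own setting untouched
        System.out.println(settings.getMinPauseMs()); // 0
        settings.applyMinPause(500L); // user-configured grace period in ms
        System.out.println(settings.getMinPauseMs()); // 500
    }
}
```

The point of the guard is that a pipeline option left at its default should not clobber whatever the execution environment already configured.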



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[beam] branch master updated (faa5699 -> 73def89)

2018-09-14 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from faa5699  Merge pull request #6385: [BEAM-5337] Fix flaky test 
UnboundedSourceWrapperTest#testValueEmission
 add 780ca79  BEAM-5372 -> Make minPauseBetweenCheckpoints setting 
available in FlinkPipelineOptions
 add 31d5101  BEAM-5372 -> Updating setting to only set if value is not 
default
 add 73def89  Merge pull request #6378: [BEAM-5372] Make 
minPauseBetweenCheckpoints setting available in FlinkPipelineOptions

No new revisions were added by this update.

Summary of changes:
 .../org/apache/beam/runners/flink/FlinkExecutionEnvironments.java  | 7 +++
 .../java/org/apache/beam/runners/flink/FlinkPipelineOptions.java   | 6 ++
 2 files changed, 13 insertions(+)



[jira] [Updated] (BEAM-5372) [Flink Runner] Make minPauseBetweenCheckpoints setting available in FlinkPipelineOptions

2018-09-14 Thread Aljoscha Krettek (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated BEAM-5372:
---
Fix Version/s: 2.7.0

> [Flink Runner] Make minPauseBetweenCheckpoints setting available in 
> FlinkPipelineOptions
> 
>
> Key: BEAM-5372
> URL: https://issues.apache.org/jira/browse/BEAM-5372
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Daniel Harper
>Assignee: Daniel Harper
>Priority: Trivial
> Fix For: 2.7.0
>
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> The configuration setting {{minPauseBetweenCheckpoints}} [1] is available in 
> Flink to allow a grace period when the checkpoint runtime exceeds the 
> checkpoint interval. 
> This should be exposed in {{FlinkPipelineOptions}} and 
> {{FlinkExecutionEnvironments}} to allow users to configure this.
> The default for this value in Flink is 0 ms [2].
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/java/org/apache/flink/streaming/api/environment/CheckpointConfig.html#setMinPauseBetweenCheckpoints-long-
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/java/constant-values.html#org.apache.flink.streaming.api.environment.CheckpointConfig.DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS





[jira] [Commented] (BEAM-3919) checkpoint can not work with flink 1.4.1,1.4.2

2018-08-22 Thread Aljoscha Krettek (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16588744#comment-16588744
 ] 

Aljoscha Krettek commented on BEAM-3919:


Unfortunately, I don't think we can fix this right now, given the incompatible 
changes between minor Flink versions.

> checkpoint can not work with flink 1.4.1,1.4.2
> --
>
> Key: BEAM-3919
> URL: https://issues.apache.org/jira/browse/BEAM-3919
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.3.0, 2.4.0
>Reporter: eisig
>Assignee: Harshal Tripathi
>Priority: Critical
>
> When submitting an application to a Flink cluster (1.4.1, 1.4.2) with 
> checkpointing enabled, the job fails with the exception:
> java.lang.NoSuchMethodError: 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.snapshotTimersForKeyGroup(Lorg/apache/flink/core/memory/DataOutputViewStreamWrapper;I)V
>  
> It seems that 
> `org.apache.flink.streaming.api.operators.HeapInternalTimerService.snapshotTimersForKeyGroup` 
> was changed in Flink 1.4.1.





[jira] [Closed] (BEAM-4798) IndexOutOfBoundsException when Flink parallelism > 1

2018-08-10 Thread Aljoscha Krettek (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-4798.
--
Resolution: Fixed

> IndexOutOfBoundsException when Flink parallelism > 1
> 
>
> Key: BEAM-4798
> URL: https://issues.apache.org/jira/browse/BEAM-4798
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.4.0, 2.5.0
>Reporter: Alexey Romanenko
>Assignee: Alexey Romanenko
>Priority: Major
> Fix For: 2.7.0
>
>  Time Spent: 4h 40m
>  Remaining Estimate: 0h
>
> Running a job on Flink in streaming mode that reads from a Kafka topic with 
> parallelism > 1 causes an exception:
> {noformat}
> Caused by: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
>   at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>   at java.util.ArrayList.get(ArrayList.java:433)
>   at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:277)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
>   at 
> org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:45)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>   at java.lang.Thread.run(Thread.java:748)
> {noformat}
> It happens when the number of Kafka topic partitions is less than the 
> parallelism (number of task slots).
> So, a workaround for now is to set parallelism <= number of topic 
> partitions; e.g. if parallelism=2 then number_partitions >= 2.
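The workaround amounts to capping the requested parallelism at the partition count, so that no source subtask is left without a partition. The helper below is hypothetical (not part of Beam or Flink); it only illustrates choosing a partition-safe parallelism.

```java
// Hypothetical helper illustrating the workaround: never run the source
// with more parallel subtasks than there are Kafka partitions.
public class ParallelismGuard {
    /** Returns a parallelism value that is safe for the given partition count. */
    public static int safeParallelism(int requested, int numPartitions) {
        if (requested < 1 || numPartitions < 1) {
            throw new IllegalArgumentException("both values must be >= 1");
        }
        return Math.min(requested, numPartitions);
    }

    public static void main(String[] args) {
        // 4 task slots requested but only 2 partitions: run with parallelism 2
        System.out.println(safeParallelism(4, 2)); // 2
        // enough partitions for everyone: keep the requested value
        System.out.println(safeParallelism(2, 8)); // 2
    }
}
```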





[jira] [Updated] (BEAM-4798) IndexOutOfBoundsException when Flink parallelism > 1

2018-08-10 Thread Aljoscha Krettek (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated BEAM-4798:
---
Fix Version/s: 2.7.0

> IndexOutOfBoundsException when Flink parallelism > 1
> 
>
> Key: BEAM-4798
> URL: https://issues.apache.org/jira/browse/BEAM-4798
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.4.0, 2.5.0
>Reporter: Alexey Romanenko
>Assignee: Alexey Romanenko
>Priority: Major
> Fix For: 2.7.0
>
>  Time Spent: 4h 40m
>  Remaining Estimate: 0h
>
> Running a job on Flink in streaming mode that reads from a Kafka topic with 
> parallelism > 1 causes an exception:
> {noformat}
> Caused by: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
>   at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>   at java.util.ArrayList.get(ArrayList.java:433)
>   at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:277)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
>   at 
> org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:45)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>   at java.lang.Thread.run(Thread.java:748)
> {noformat}
> It happens when the number of Kafka topic partitions is less than the 
> parallelism (number of task slots).
> So, a workaround for now is to set parallelism <= number of topic 
> partitions; e.g. if parallelism=2 then number_partitions >= 2.





[jira] [Commented] (BEAM-2930) Flink support for portable side input

2018-07-27 Thread Aljoscha Krettek (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-2930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16559894#comment-16559894
 ] 

Aljoscha Krettek commented on BEAM-2930:


Yes, that sounds like a good approach. I upgraded the Flink Runner to use the 
new broadcast state feature when updating to Flink 1.5.0. There are plans to 
back this with RocksDB, so we will even get support for bigger side inputs 
once Flink has that support.

> Flink support for portable side input
> -
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Priority: Major
>  Labels: portability
>  Time Spent: 20m
>  Remaining Estimate: 0h
>






[jira] [Closed] (BEAM-4311) Enforce ErrorProne analysis in Flink runner project

2018-06-08 Thread Aljoscha Krettek (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-4311.
--
Resolution: Fixed

> Enforce ErrorProne analysis in Flink runner project
> ---
>
> Key: BEAM-4311
> URL: https://issues.apache.org/jira/browse/BEAM-4311
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Scott Wegner
>Assignee: Tim Robertson
>Priority: Minor
>  Labels: errorprone, starter
> Fix For: 2.6.0
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Java ErrorProne static analysis was [recently 
> enabled|https://github.com/apache/beam/pull/5161] in the Gradle build 
> process, but only as warnings. ErrorProne errors are generally useful and 
> easy to fix. Some work was done to [make sdks-java-core 
> ErrorProne-clean|https://github.com/apache/beam/pull/5319] and add 
> enforcement. This task is to clean up ErrorProne warnings and add enforcement in 
> {{beam-runners-flink}}. Additional context discussed on the [dev 
> list|https://lists.apache.org/thread.html/95aae2785c3cd728c2d3378cbdff2a7ba19caffcd4faa2049d2e2f46@%3Cdev.beam.apache.org%3E].
> Fixing this issue will involve:
> # Follow instructions in the [Contribution 
> Guide|https://beam.apache.org/contribute/] to set up a {{beam}} development 
> environment.
> # Run the following command to compile and run ErrorProne analysis on the 
> project: {{./gradlew :beam-runners-flink:assemble}}
> # Fix each ErrorProne warning from the {{runners/flink}} project.
> # In {{runners/flink/build.gradle}}, add {{failOnWarning: true}} to the call 
> to {{applyJavaNature()}} 
> ([example|https://github.com/apache/beam/pull/5319/files#diff-9390c20635aed5f42f83b97506a87333R20]).
> This starter issue is sponsored by [~swegner]. Feel free to [reach 
> out|https://beam.apache.org/community/contact-us/] with questions or code 
> review:
> * JIRA: [~swegner]
> * GitHub: [@swegner|https://github.com/swegner]
> * Slack: [@Scott Wegner|https://s.apache.org/beam-slack-channel]
> * Email: swegner at google dot com





[jira] [Updated] (BEAM-4311) Enforce ErrorProne analysis in Flink runner project

2018-06-08 Thread Aljoscha Krettek (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated BEAM-4311:
---
Fix Version/s: 2.6.0

> Enforce ErrorProne analysis in Flink runner project
> ---
>
> Key: BEAM-4311
> URL: https://issues.apache.org/jira/browse/BEAM-4311
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Scott Wegner
>Assignee: Tim Robertson
>Priority: Minor
>  Labels: errorprone, starter
> Fix For: 2.6.0
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Java ErrorProne static analysis was [recently 
> enabled|https://github.com/apache/beam/pull/5161] in the Gradle build 
> process, but only as warnings. ErrorProne errors are generally useful and 
> easy to fix. Some work was done to [make sdks-java-core 
> ErrorProne-clean|https://github.com/apache/beam/pull/5319] and add 
> enforcement. This task is to clean up ErrorProne warnings and add enforcement in 
> {{beam-runners-flink}}. Additional context discussed on the [dev 
> list|https://lists.apache.org/thread.html/95aae2785c3cd728c2d3378cbdff2a7ba19caffcd4faa2049d2e2f46@%3Cdev.beam.apache.org%3E].
> Fixing this issue will involve:
> # Follow instructions in the [Contribution 
> Guide|https://beam.apache.org/contribute/] to set up a {{beam}} development 
> environment.
> # Run the following command to compile and run ErrorProne analysis on the 
> project: {{./gradlew :beam-runners-flink:assemble}}
> # Fix each ErrorProne warning from the {{runners/flink}} project.
> # In {{runners/flink/build.gradle}}, add {{failOnWarning: true}} to the call 
> to {{applyJavaNature()}} 
> ([example|https://github.com/apache/beam/pull/5319/files#diff-9390c20635aed5f42f83b97506a87333R20]).
> This starter issue is sponsored by [~swegner]. Feel free to [reach 
> out|https://beam.apache.org/community/contact-us/] with questions or code 
> review:
> * JIRA: [~swegner]
> * GitHub: [@swegner|https://github.com/swegner]
> * Slack: [@Scott Wegner|https://s.apache.org/beam-slack-channel]
> * Email: swegner at google dot com





[beam] branch master updated (c70d5be -> 358898e)

2018-06-08 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from c70d5be  Merge pull request #5431 from asethia/master: Update right 
documentation for HCatalogIO.write
 add ef4ea06  [BEAM-4311] Enforce ErrorProne analysis in Flink runner
 new 358898e  Merge pull request #5572: [BEAM-4311] Enforce ErrorProne 
analysis in Flink runner

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 runners/flink/build.gradle |  4 ++-
 .../runners/flink/FlinkExecutionEnvironments.java  | 13 
 .../wrappers/streaming/DoFnOperator.java   | 20 ++--
 .../wrappers/streaming/io/DedupingOperator.java|  5 ++-
 .../streaming/io/UnboundedSocketSource.java|  7 ++--
 .../streaming/io/UnboundedSourceWrapper.java   |  4 ++-
 .../state/FlinkBroadcastStateInternals.java| 38 +++---
 .../state/FlinkKeyGroupStateInternals.java | 19 ++-
 .../streaming/state/FlinkSplitStateInternals.java  | 12 +++
 .../streaming/state/FlinkStateInternals.java   | 17 ++
 .../flink/streaming/DedupingOperatorTest.java  | 31 --
 .../flink/streaming/GroupByNullKeyTest.java|  2 ++
 .../flink/streaming/TestCountingSource.java|  7 ++--
 13 files changed, 105 insertions(+), 74 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
aljos...@apache.org.


[beam] 01/01: Merge pull request #5572: [BEAM-4311] Enforce ErrorProne analysis in Flink runner

2018-06-08 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 358898e6c0574ec97e84081485a6644ba6dd3a30
Merge: c70d5be ef4ea06
Author: Aljoscha Krettek 
AuthorDate: Fri Jun 8 13:21:46 2018 +0200

Merge pull request #5572: [BEAM-4311] Enforce ErrorProne analysis in Flink 
runner

 runners/flink/build.gradle |  4 ++-
 .../runners/flink/FlinkExecutionEnvironments.java  | 13 
 .../wrappers/streaming/DoFnOperator.java   | 20 ++--
 .../wrappers/streaming/io/DedupingOperator.java|  5 ++-
 .../streaming/io/UnboundedSocketSource.java|  7 ++--
 .../streaming/io/UnboundedSourceWrapper.java   |  4 ++-
 .../state/FlinkBroadcastStateInternals.java| 38 +++---
 .../state/FlinkKeyGroupStateInternals.java | 19 ++-
 .../streaming/state/FlinkSplitStateInternals.java  | 12 +++
 .../streaming/state/FlinkStateInternals.java   | 17 ++
 .../flink/streaming/DedupingOperatorTest.java  | 31 --
 .../flink/streaming/GroupByNullKeyTest.java|  2 ++
 .../flink/streaming/TestCountingSource.java|  7 ++--
 13 files changed, 105 insertions(+), 74 deletions(-)



[jira] [Updated] (BEAM-3909) Add tests for Flink DoFnOperator side-input checkpointing

2018-04-30 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated BEAM-3909:
---
Fix Version/s: 2.5.0

> Add tests for Flink DoFnOperator side-input checkpointing
> -
>
> Key: BEAM-3909
> URL: https://issues.apache.org/jira/browse/BEAM-3909
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>    Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
> Fix For: 2.5.0
>
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>






[jira] [Closed] (BEAM-3909) Add tests for Flink DoFnOperator side-input checkpointing

2018-04-30 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-3909.
--
Resolution: Fixed

> Add tests for Flink DoFnOperator side-input checkpointing
> -
>
> Key: BEAM-3909
> URL: https://issues.apache.org/jira/browse/BEAM-3909
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>    Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
> Fix For: 2.5.0
>
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>






[beam] branch master updated (e0aac4e -> c744afd)

2018-04-30 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from e0aac4e  [BEAM-3973] Adds a parameter to the Cloud Spanner read 
connector that can disable batch API (#4946)
 add aaa2ead  [BEAM-3909] Add tests for Flink DoFnOperator side-input 
checkpointing
 new c744afd  Merge pull request #4931: [BEAM-3909] Add tests for Flink 
DoFnOperator side-input checkpointing

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../wrappers/streaming/DoFnOperator.java   |   5 +
 .../runners/flink/streaming/DoFnOperatorTest.java  | 277 +++--
 2 files changed, 266 insertions(+), 16 deletions(-)



[beam] 01/01: Merge pull request #4931: [BEAM-3909] Add tests for Flink DoFnOperator side-input checkpointing

2018-04-30 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit c744afd2603e1d4db8e26762e626769bbbfa271e
Merge: e0aac4e aaa2ead
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
AuthorDate: Mon Apr 30 12:26:40 2018 +0300

Merge pull request #4931: [BEAM-3909] Add tests for Flink DoFnOperator 
side-input checkpointing

 .../wrappers/streaming/DoFnOperator.java   |   5 +
 .../runners/flink/streaming/DoFnOperatorTest.java  | 277 +++--
 2 files changed, 266 insertions(+), 16 deletions(-)



[jira] [Assigned] (BEAM-4063) Flink runner supports cluster-wide artifact deployments through the Distributed Cache

2018-04-18 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned BEAM-4063:
--

Assignee: (was: Aljoscha Krettek)

> Flink runner supports cluster-wide artifact deployments through the 
> Distributed Cache
> -
>
> Key: BEAM-4063
> URL: https://issues.apache.org/jira/browse/BEAM-4063
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Priority: Minor
>
> As of now, Flink effectively has a dependency on an external storage system 
> for artifact management. This is because the Flink Distributed Cache does not 
> actually distribute and cache blobs itself, but rather expects that each node 
> in a running cluster has access to a well-known artifact resource.
> We should get this for free whenever 
> [https://github.com/apache/flink/pull/5580] is merged (likely in 1.5). For 
> now, we will have to defer to external storage systems like GCS or HDFS.





[jira] [Assigned] (BEAM-4067) Add portable Flink test runner

2018-04-18 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned BEAM-4067:
--

Assignee: (was: Aljoscha Krettek)

> Add portable Flink test runner
> --
>
> Key: BEAM-4067
> URL: https://issues.apache.org/jira/browse/BEAM-4067
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Priority: Minor
>
> The portable Flink runner cannot be tested through the normal mechanisms used 
> for ValidatesRunner tests because it requires a job server to be constructed 
> out of band and for pipelines to be run through it. We should implement a 
> shim that acts as a standard Java SDK Runner that spins up the necessary 
> server (possibly in-process) and runs against it.





[jira] [Commented] (BEAM-2927) Python SDK support for portable side input

2018-04-03 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16423602#comment-16423602
 ] 

Aljoscha Krettek commented on BEAM-2927:


The problematic piece is this: 
https://github.com/apache/beam/blob/1f681bb1687aaa1ec23741e87f8622e6a4e59f7d/sdks/python/apache_beam/runners/worker/sdk_worker.py#L72

The Python harness tries to create a state client that uses the control API 
descriptor when starting the worker. However, the worker harness should use the 
state API descriptor that it gets from the {{ProcessBundleDescriptor}} to 
access state and side inputs.

cc [~lcwik]

> Python SDK support for portable side input
> --
>
> Key: BEAM-2927
> URL: https://issues.apache.org/jira/browse/BEAM-2927
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Henning Rohde
>Assignee: Robert Bradshaw
>Priority: Major
>  Labels: portability
>  Time Spent: 2h
>  Remaining Estimate: 0h
>






[jira] [Closed] (BEAM-3087) Extend lock scope in Flink UnboundedSourceWrapper

2018-03-26 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-3087.
--
   Resolution: Fixed
Fix Version/s: 2.5.0

> Extend lock scope in Flink UnboundedSourceWrapper
> -
>
> Key: BEAM-3087
> URL: https://issues.apache.org/jira/browse/BEAM-3087
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>    Reporter: Aljoscha Krettek
>Assignee: Grzegorz Kołakowski
>Priority: Major
> Fix For: 2.5.0
>
>
> In {{UnboundedSourceWrapper}} the lock scope is not big enough: we 
> synchronise in {{emitElement()}} but should instead synchronise inside the 
> reader loop in {{run()}} because the {{Source}} interface does not allow 
> concurrent calls.
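A minimal sketch of the proposed fix, under a simplified reader interface: the checkpoint lock is held across both the state-advancing call and the emission, so a snapshot can never interleave between them. `Reader` and `LockScopeSketch` are stand-ins for illustration, not Beam's actual `UnboundedSourceWrapper` or `UnboundedReader`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Sketch: the lock now spans the whole advance-then-emit step inside run(),
// instead of only wrapping the emission in emitElement().
public class LockScopeSketch {
    interface Reader {
        boolean advance();    // must not run concurrently with a snapshot
        String getCurrent();
    }

    private final Object checkpointLock = new Object();
    final List<String> emitted = new ArrayList<>();

    void run(Reader reader) {
        while (true) {
            synchronized (checkpointLock) { // lock spans advance() + emit
                if (!reader.advance()) {
                    return;
                }
                emitted.add(reader.getCurrent()); // formerly emitElement()
            }
        }
    }

    public static void main(String[] args) {
        Iterator<String> data = List.of("a", "b", "c").iterator();
        Reader reader = new Reader() {
            String current;
            public boolean advance() {
                if (!data.hasNext()) return false;
                current = data.next();
                return true;
            }
            public String getCurrent() { return current; }
        };
        LockScopeSketch wrapper = new LockScopeSketch();
        wrapper.run(reader);
        System.out.println(wrapper.emitted); // [a, b, c]
    }
}
```

With the narrower scope, a checkpoint could observe a reader that had advanced past an element that was not yet emitted; widening the scope makes the two atomic.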





[jira] [Closed] (BEAM-2393) BoundedSource is not fault-tolerant in FlinkRunner Streaming mode

2018-03-26 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-2393.
--
   Resolution: Fixed
Fix Version/s: 2.5.0

> BoundedSource is not fault-tolerant in FlinkRunner Streaming mode
> -
>
> Key: BEAM-2393
> URL: https://issues.apache.org/jira/browse/BEAM-2393
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Jingsong Lee
>Assignee: Grzegorz Kołakowski
>Priority: Major
> Fix For: 2.5.0
>
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> {{BoundedSourceWrapper}} does not implement snapshot() and restore(), so 
> after a failure and restart it will emit duplicate data.
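The missing behaviour can be sketched with a toy source that checkpoints a consumption offset. This models the idea only; the actual fix in PR #4895 deletes `BoundedSourceWrapper` and routes bounded sources through the checkpointable `BoundedToUnboundedSourceAdapter` instead of adding these methods directly.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model: snapshot() records how far the bounded input has been consumed,
// and restore() resumes from that offset, so a restart emits no duplicates.
public class RestorableBoundedSource {
    private final List<String> input;
    private int position = 0; // consumed-element offset; the checkpointed state

    public RestorableBoundedSource(List<String> input) {
        this.input = input;
    }

    public int snapshot() {            // called at checkpoint time
        return position;
    }

    public void restore(int state) {   // called after a failure, before restart
        position = state;
    }

    /** Emits up to n further elements, advancing the checkpointable offset. */
    public List<String> emit(int n) {
        int end = Math.min(position + n, input.size());
        List<String> out = new ArrayList<>(input.subList(position, end));
        position = end;
        return out;
    }

    public static void main(String[] args) {
        RestorableBoundedSource source =
            new RestorableBoundedSource(List.of("a", "b", "c", "d"));
        source.emit(2);                 // "a", "b" emitted before the checkpoint
        int checkpoint = source.snapshot();

        // Simulate a failure and a restart from the checkpoint: no duplicates.
        RestorableBoundedSource restarted =
            new RestorableBoundedSource(List.of("a", "b", "c", "d"));
        restarted.restore(checkpoint);
        System.out.println(restarted.emit(2)); // [c, d]
    }
}
```

Without the checkpointed offset, the restarted source would begin again at position 0 and re-emit "a" and "b", which is exactly the duplication the issue describes.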





[beam] branch master updated (302b247 -> f93a332)

2018-03-26 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 302b247  Merge pull request #4937: [BEAM-622] Add checkpointing tests 
for DoFnOperator and WindowDoFnOperator
 new fef8f54  [BEAM-3087] Make reader state update and element emission 
atomic
 new 0e44feb  [BEAM-2393] Make BoundedSource fault-tolerant
 new f93a332  Merge pull request #4895: [BEAM-2393] Make BoundedSource 
fault-tolerant in FlinkRunner Streaming mode

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../UnboundedReadFromBoundedSource.java|   5 +-
 .../flink/FlinkStreamingTransformTranslators.java  | 138 ++-
 .../streaming/io/BoundedSourceWrapper.java | 259 -
 .../streaming/io/UnboundedSourceWrapper.java   |  67 +++---
 .../flink/streaming/BoundedSourceRestoreTest.java  | 236 +++
 5 files changed, 404 insertions(+), 301 deletions(-)
 delete mode 100644 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
 create mode 100644 
runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/BoundedSourceRestoreTest.java



[beam] 02/03: [BEAM-2393] Make BoundedSource fault-tolerant

2018-03-26 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 0e44feb120a73c36116b2bd6b89e4f1676d7266f
Author: Grzegorz Kołakowski <grzegorz.kolakow...@getindata.com>
AuthorDate: Wed Feb 21 11:11:53 2018 +0100

[BEAM-2393] Make BoundedSource fault-tolerant
---
 .../UnboundedReadFromBoundedSource.java|   5 +-
 .../flink/FlinkStreamingTransformTranslators.java  | 138 ++-
 .../streaming/io/BoundedSourceWrapper.java | 259 -
 .../flink/streaming/BoundedSourceRestoreTest.java  | 236 +++
 4 files changed, 367 insertions(+), 271 deletions(-)

diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
index 09acc82..88119d1 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
@@ -169,8 +169,11 @@ public class UnboundedReadFromBoundedSource extends 
PTransform<PBegin, PColle
   return new CheckpointCoder<>(boundedSource.getDefaultOutputCoder());
 }
 
+/**
+ * A marker representing the progress and state of an {@link 
BoundedToUnboundedSourceAdapter}.
+ */
 @VisibleForTesting
-static class Checkpoint implements UnboundedSource.CheckpointMark {
+public static class Checkpoint implements 
UnboundedSource.CheckpointMark {
   private final @Nullable List<TimestampedValue> residualElements;
   private final @Nullable BoundedSource residualSource;
 
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 970ece1..74ca168 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -42,6 +42,7 @@ import 
org.apache.beam.runners.core.construction.ParDoTranslation;
 import org.apache.beam.runners.core.construction.ReadTranslation;
 import org.apache.beam.runners.core.construction.SplittableParDo;
 import 
org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
+import 
org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
 import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
@@ -51,7 +52,6 @@ import 
org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKey
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDoFnOperator;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector;
-import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.BoundedSourceWrapper;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.DedupingOperator;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
 import org.apache.beam.sdk.coders.Coder;
@@ -87,18 +87,27 @@ import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputSt

[beam] 03/03: Merge pull request #4895: [BEAM-2393] Make BoundedSource fault-tolerant in FlinkRunner Streaming mode

2018-03-26 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit f93a332a20857f425fa1c1b69d2423f3c2e7880d
Merge: 302b247 0e44feb
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
AuthorDate: Tue Mar 27 06:49:21 2018 +0200

Merge pull request #4895: [BEAM-2393] Make BoundedSource fault-tolerant in 
FlinkRunner Streaming mode

 .../UnboundedReadFromBoundedSource.java|   5 +-
 .../flink/FlinkStreamingTransformTranslators.java  | 138 ++-
 .../streaming/io/BoundedSourceWrapper.java | 259 -
 .../streaming/io/UnboundedSourceWrapper.java   |  67 +++---
 .../flink/streaming/BoundedSourceRestoreTest.java  | 236 +++
 5 files changed, 404 insertions(+), 301 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
aljos...@apache.org.


[beam] 01/03: [BEAM-3087] Make reader state update and element emission atomic

2018-03-26 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit fef8f54be417be88730ebf2d0e8db8c55fbd013f
Author: Grzegorz Kołakowski <grzegorz.kolakow...@getindata.com>
AuthorDate: Wed Feb 21 10:31:53 2018 +0100

[BEAM-3087] Make reader state update and element emission atomic

Reader advancement should be considered a reader state update too.
Therefore, the reader's advancement and element emission are in the
same synchronized section.
---
 .../streaming/io/UnboundedSourceWrapper.java   | 67 --
 1 file changed, 37 insertions(+), 30 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index fc23c01..3f04b6c 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -186,7 +186,7 @@ public class UnboundedSourceWrapper<
 
 if (isRestored) {
  // restore the splitSources from the checkpoint to ensure consistent ordering
-  for (KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> restored:
+  for (KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> restored :
   stateForCheckpoint.get()) {
 localSplitSources.add(restored.getKey());
 localReaders.add(restored.getKey().createReader(
@@ -229,19 +229,25 @@ public class UnboundedSourceWrapper<
   // the easy case, we just read from one reader
   UnboundedSource.UnboundedReader reader = localReaders.get(0);
 
-  boolean dataAvailable = readerInvoker.invokeStart(reader);
-  if (dataAvailable) {
-emitElement(ctx, reader);
+  synchronized (ctx.getCheckpointLock()) {
+boolean dataAvailable = readerInvoker.invokeStart(reader);
+if (dataAvailable) {
+  emitElement(ctx, reader);
+}
   }
 
   setNextWatermarkTimer(this.runtimeContext);
 
   while (isRunning) {
-dataAvailable = readerInvoker.invokeAdvance(reader);
+boolean dataAvailable;
+synchronized (ctx.getCheckpointLock()) {
+  dataAvailable = readerInvoker.invokeAdvance(reader);
 
-if (dataAvailable)  {
-  emitElement(ctx, reader);
-} else {
+  if (dataAvailable) {
+emitElement(ctx, reader);
+  }
+}
+if (!dataAvailable) {
   Thread.sleep(50);
 }
   }
@@ -254,9 +260,11 @@ public class UnboundedSourceWrapper<
 
   // start each reader and emit data if immediately available
   for (UnboundedSource.UnboundedReader reader : localReaders) {
-boolean dataAvailable = readerInvoker.invokeStart(reader);
-if (dataAvailable) {
-  emitElement(ctx, reader);
+synchronized (ctx.getCheckpointLock()) {
+  boolean dataAvailable = readerInvoker.invokeStart(reader);
+  if (dataAvailable) {
+emitElement(ctx, reader);
+  }
 }
   }
 
@@ -267,11 +275,13 @@ public class UnboundedSourceWrapper<
   boolean hadData = false;
   while (isRunning) {
 UnboundedSource.UnboundedReader reader = 
localReaders.get(currentReader);
-boolean dataAvailable = readerInvoker.invokeAdvance(reader);
 
-if (dataAvailable) {
-  emitElement(ctx, reader);
-  hadData = true;
+synchronized (ctx.getCheckpointLock()) {
+  boolean dataAvailable = readerInvoker.invokeAdvance(reader);
+  if (dataAvailable) {
+emitElement(ctx, reader);
+hadData = true;
+  }
 }
 
 currentReader = (currentReader + 1) % numReaders;
@@ -321,24 +331,21 @@ public class UnboundedSourceWrapper<
   UnboundedSource.UnboundedReader reader) {
 // make sure that reader state update and element emission are atomic
 // with respect to snapshots
-synchronized (ctx.getCheckpointLock()) {
-
-  OutputT item = reader.getCurrent();
-  byte[] recordId = reader.getCurrentRecordId();
-  Instant timestamp = reader.getCurrentTimestamp();
-
-  WindowedValue<ValueWithRecordId> windowedValue =
-  WindowedValue.of(new ValueWithRecordId<>(item, recordId), timestamp,
-  GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
-  ctx.collectWithTimestamp(windowedValue, timestamp.getMillis());
-}
+OutputT item = reader.getCurrent();
+byte[] recordId = reader.getCurrentRecordId();
+Instant timestamp = reader.getCurrentTimestamp();
+
+WindowedValue<ValueWithRecordId> windowedValue =
+WindowedValue.of(new ValueWithRecordId<>(item, recordId), timestamp,
+  
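The synchronized-on-checkpoint-lock pattern in the diff above can be sketched as follows. This is a minimal illustration with hypothetical names, not the Beam API: the reader is advanced and its current element emitted inside one `synchronized` block on the checkpoint lock, so a snapshot taken under the same lock never observes reader state that is ahead of the emitted elements.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only (hypothetical Reader interface, not Beam code).
public class AtomicEmitSketch {

    interface Reader {
        boolean advance();   // move to the next element; false when drained
        int getCurrent();    // element at the current position
    }

    static final Object CHECKPOINT_LOCK = new Object();

    // Emit every element the reader produces; advancement and emission are
    // atomic with respect to anything else synchronizing on CHECKPOINT_LOCK.
    static List<Integer> drain(Reader reader) {
        List<Integer> out = new ArrayList<>();
        boolean dataAvailable = true;
        while (dataAvailable) {
            synchronized (CHECKPOINT_LOCK) {
                dataAvailable = reader.advance();
                if (dataAvailable) {
                    out.add(reader.getCurrent());
                }
            }
        }
        return out;
    }

    // Simple in-memory reader over an int array, for demonstration only.
    static Reader arrayReader(int[] data) {
        return new Reader() {
            private int pos = -1;
            @Override public boolean advance() { return ++pos < data.length; }
            @Override public int getCurrent() { return data[pos]; }
        };
    }

    public static void main(String[] args) {
        System.out.println(drain(arrayReader(new int[] {1, 2, 3})));
        // prints [1, 2, 3]
    }
}
```

A checkpoint that also synchronizes on `CHECKPOINT_LOCK` will therefore see either "reader not yet advanced, element not emitted" or "reader advanced, element emitted", never a state in between.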

[beam] branch master updated: [BEAM-622] Add checkpointing tests for DoFnOperator and WindowDoFnOperator

2018-03-26 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new ebc0fb0  [BEAM-622] Add checkpointing tests for DoFnOperator and 
WindowDoFnOperator
 new 302b247  Merge pull request #4937: [BEAM-622] Add checkpointing tests 
for DoFnOperator and WindowDoFnOperator
ebc0fb0 is described below

commit ebc0fb0e3d0f7340400fe07d7d508907da351424
Author: Grzegorz Kołakowski <grzegorz.kolakow...@getindata.com>
AuthorDate: Thu Mar 22 09:51:44 2018 +0100

[BEAM-622] Add checkpointing tests for DoFnOperator and WindowDoFnOperator

Timers restore test for DoFnOperator already exists.
---
 .../flink/streaming/DedupingOperatorTest.java  |  30 +--
 .../runners/flink/streaming/DoFnOperatorTest.java  | 177 ++--
 .../flink/streaming/StreamRecordStripper.java  |  49 +
 .../flink/streaming/WindowDoFnOperatorTest.java| 227 +
 4 files changed, 397 insertions(+), 86 deletions(-)

diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java
index 0b376e9..73be0ef 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java
@@ -17,13 +17,11 @@
  */
 package org.apache.beam.runners.flink.streaming;
 
+import static 
org.apache.beam.runners.flink.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
 import static org.junit.Assert.assertThat;
 
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
 import java.nio.ByteBuffer;
-import javax.annotation.Nullable;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.DedupingOperator;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.ValueWithRecordId;
@@ -64,7 +62,7 @@ public class DedupingOperatorTest {
 WindowedValue.valueInGlobalWindow(new ValueWithRecordId<>(key1, 
key1.getBytes();
 
 assertThat(
-this.stripStreamRecordFromWindowedValue(harness.getOutput()),
+stripStreamRecordFromWindowedValue(harness.getOutput()),
 contains(WindowedValue.valueInGlobalWindow(key1),
 WindowedValue.valueInGlobalWindow(key2)));
 
@@ -86,7 +84,7 @@ public class DedupingOperatorTest {
 WindowedValue.valueInGlobalWindow(new ValueWithRecordId<>(key3, 
key3.getBytes();
 
 assertThat(
-this.stripStreamRecordFromWindowedValue(harness.getOutput()),
+stripStreamRecordFromWindowedValue(harness.getOutput()),
 contains(WindowedValue.valueInGlobalWindow(key3)));
 
 harness.close();
@@ -102,26 +100,4 @@ public class DedupingOperatorTest {
 value -> ByteBuffer.wrap(value.getValue().getId()),
 TypeInformation.of(ByteBuffer.class));
   }
-
-  private  Iterable<WindowedValue> stripStreamRecordFromWindowedValue(
-  Iterable input) {
-
-return FluentIterable.from(input)
-.filter(
-o ->
-o instanceof StreamRecord && ((StreamRecord) o).getValue() 
instanceof WindowedValue)
-.transform(
-new Function<Object, WindowedValue>() {
-  @Nullable
-  @Override
-  @SuppressWarnings({"unchecked", "rawtypes"})
-  public WindowedValue apply(@Nullable Object o) {
-if (o instanceof StreamRecord
-&& ((StreamRecord) o).getValue() instanceof WindowedValue) 
{
-  return (WindowedValue) ((StreamRecord) o).getValue();
-}
-throw new RuntimeException("unreachable");
-  }
-});
-  }
 }
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index 73a0a08..4d6fca6 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.flink.streaming;
 
+import static 
org.apache.beam.runners.flink.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue;
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
 import static org.junit.Assert.assertEquals;
@@ -28,6 +29,7 @@ import com.google.common.collect.ImmutableList;
 imp

[jira] [Reopened] (BEAM-622) Add checkpointing tests for DoFnOperator and WindowDoFnOperator

2018-03-26 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reopened BEAM-622:
---

> Add checkpointing tests for DoFnOperator and WindowDoFnOperator 
> 
>
> Key: BEAM-622
> URL: https://issues.apache.org/jira/browse/BEAM-622
> Project: Beam
>  Issue Type: Test
>  Components: runner-flink
>Affects Versions: 0.3.0-incubating
>Reporter: Maximilian Michels
>Assignee: Grzegorz Kołakowski
>Priority: Major
> Fix For: 2.5.0
>
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> Tests which test the correct snapshotting of these two operators are missing. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (BEAM-622) Add checkpointing tests for DoFnOperator and WindowDoFnOperator

2018-03-26 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-622.
-
   Resolution: Fixed
Fix Version/s: 2.5.0

> Add checkpointing tests for DoFnOperator and WindowDoFnOperator 
> 
>
> Key: BEAM-622
> URL: https://issues.apache.org/jira/browse/BEAM-622
> Project: Beam
>  Issue Type: Test
>  Components: runner-flink
>Affects Versions: 0.3.0-incubating
>Reporter: Maximilian Michels
>Assignee: Grzegorz Kołakowski
>Priority: Major
> Fix For: 2.5.0
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Tests which test the correct snapshotting of these two operators are missing. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (BEAM-3800) Set uids on Flink operators

2018-03-26 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-3800.
--
   Resolution: Fixed
Fix Version/s: 2.5.0

> Set uids on Flink operators
> ---
>
> Key: BEAM-3800
> URL: https://issues.apache.org/jira/browse/BEAM-3800
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Grzegorz Kołakowski
>Assignee: Grzegorz Kołakowski
>Priority: Major
> Fix For: 2.5.0
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Flink operators should have unique ids assigned, which are, in turn, used for 
> checkpointing stateful operators. Assigning operator ids is highly 
> recommended according to Flink documentation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
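In Flink, explicit operator ids are normally set with `uid(String)` on the operator, and a uid that depends only on the transform's name stays constant across job graph changes, so checkpointed state can still be matched to its operator on restore. As a dependency-free illustration of that property, a sketch (hypothetical helper, not Beam or Flink code):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper: derive a stable, name-based operator uid.
public class StableUid {

    static String uidFor(String fullTransformName) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            byte[] hash = md.digest(fullTransformName.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < 8; i++) {
                sb.append(String.format("%02x", hash[i])); // first 8 bytes as 16 hex chars
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // SHA-256 is always available on the JVM
        }
    }

    public static void main(String[] args) {
        // Same transform name -> same uid, independent of where the
        // operator sits in the job graph.
        System.out.println(uidFor("ParDo(ExtractWords)").equals(uidFor("ParDo(ExtractWords)")));
        System.out.println(uidFor("ParDo(ExtractWords)").length()); // 16
    }
}
```

Auto-generated uids, by contrast, depend on the topology, which is why Flink's documentation recommends assigning them explicitly for stateful operators.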


[beam] 01/01: Merge pull request #4903: [BEAM-3800] Set uids on Flink operators

2018-03-26 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit ab3c38dcc3b65f850016be3cf68b36943def5645
Merge: 8b72e5a f3dbd8c
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
AuthorDate: Mon Mar 26 04:16:25 2018 -0700

Merge pull request #4903: [BEAM-3800] Set uids on Flink operators

 .../flink/FlinkStreamingTransformTranslators.java  | 37 +-
 1 file changed, 22 insertions(+), 15 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
aljos...@apache.org.


[beam] branch master updated (8b72e5a -> ab3c38d)

2018-03-26 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 8b72e5a  Merge pull request #4940: Update generated files
 add f3dbd8c  [BEAM-3800] Set uids on Flink operators
 new ab3c38d  Merge pull request #4903: [BEAM-3800] Set uids on Flink 
operators

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/FlinkStreamingTransformTranslators.java  | 37 +-
 1 file changed, 22 insertions(+), 15 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
aljos...@apache.org.


[jira] [Assigned] (BEAM-3919) checkpoint can not work with flink 1.4.1,1.4.2

2018-03-26 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned BEAM-3919:
--

Assignee: Harshal Tripathi

> checkpoint can not work with flink 1.4.1,1.4.2
> --
>
> Key: BEAM-3919
> URL: https://issues.apache.org/jira/browse/BEAM-3919
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.3.0, 2.4.0
>Reporter: eisig
>Assignee: Harshal Tripathi
>Priority: Critical
>
> When submitting an application to a Flink cluster (1.4.1, 1.4.2) with 
> checkpointing enabled, the job fails with the exception:
> java.lang.NoSuchMethodError: 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.snapshotTimersForKeyGroup(Lorg/apache/flink/core/memory/DataOutputViewStreamWrapper;I)V
>  
> It seems that 
> `org.apache.flink.streaming.api.operators.HeapInternalTimerService.snapshotTimersForKeyGroup`
> was changed in Flink 1.4.1.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-3919) checkpoint can not work with flink 1.4.1,1.4.2

2018-03-23 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned BEAM-3919:
--

Assignee: (was: Aljoscha Krettek)

> checkpoint can not work with flink 1.4.1,1.4.2
> --
>
> Key: BEAM-3919
> URL: https://issues.apache.org/jira/browse/BEAM-3919
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.3.0, 2.4.0
>Reporter: eisig
>Priority: Critical
>
> When submitting an application to a Flink cluster (1.4.1, 1.4.2) with 
> checkpointing enabled, the job fails with the exception:
> java.lang.NoSuchMethodError: 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.snapshotTimersForKeyGroup(Lorg/apache/flink/core/memory/DataOutputViewStreamWrapper;I)V
>  
> It seems that 
> `org.apache.flink.streaming.api.operators.HeapInternalTimerService.snapshotTimersForKeyGroup`
> was changed in Flink 1.4.1.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (BEAM-3909) Add tests for Flink DoFnOperator side-input checkpointing

2018-03-21 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created BEAM-3909:
--

 Summary: Add tests for Flink DoFnOperator side-input checkpointing
 Key: BEAM-3909
 URL: https://issues.apache.org/jira/browse/BEAM-3909
 Project: Beam
  Issue Type: Improvement
  Components: runner-flink
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (BEAM-3905) Update Flink Runner to Flink 1.5.0

2018-03-21 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created BEAM-3905:
--

 Summary: Update Flink Runner to Flink 1.5.0
 Key: BEAM-3905
 URL: https://issues.apache.org/jira/browse/BEAM-3905
 Project: Beam
  Issue Type: Improvement
  Components: runner-flink
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-2393) BoundedSource is not fault-tolerant in FlinkRunner Streaming mode

2018-03-19 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16405185#comment-16405185
 ] 

Aljoscha Krettek commented on BEAM-2393:


Yes, it's very good to worry about those things! 

I would say that it's ok, because I don't see how else we could do this 
currently.

> BoundedSource is not fault-tolerant in FlinkRunner Streaming mode
> -
>
> Key: BEAM-2393
> URL: https://issues.apache.org/jira/browse/BEAM-2393
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Jingsong Lee
>Assignee: Grzegorz Kołakowski
>Priority: Major
>
> {{BoundedSourceWrapper}} does not implement snapshot() and restore(), so 
> after a failure and restart it will send duplicate data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-2393) BoundedSource is not fault-tolerant in FlinkRunner Streaming mode

2018-03-16 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16402220#comment-16402220
 ] 

Aljoscha Krettek commented on BEAM-2393:


Ah, I think that explains it. This call in the source: 
https://github.com/apache/beam/blob/6148a6d063a0503ee435ab5084fcba3fb864b26f/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L333
 will at some point invoke the ParDo because the operations are chained 
together, i.e. they are being executed in the same thread and "sending" 
elements downstream is just method calls. This means that the source will block 
for about 20s in that synchronized block, which keeps the checkpointing logic 
from acquiring the lock.
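The contention described in this comment can be sketched without Flink internals. In the sketch below (simplified, hypothetical; not Flink code), a "source" thread holds the checkpoint lock while a slow chained call runs, and the "checkpoint" thread must wait for the lock before it could snapshot:

```java
// Simplified model of a source holding the checkpoint lock while a
// chained downstream operator (just a method call in the same thread)
// is slow, delaying the checkpointing logic.
public class LockContentionSketch {

    // Returns roughly how long the "checkpoint" thread waited for the lock
    // while the "source" thread held it for holdMillis.
    static long checkpointDelayMillis(long holdMillis) throws InterruptedException {
        final Object checkpointLock = new Object();
        Thread source = new Thread(() -> {
            synchronized (checkpointLock) {
                try {
                    Thread.sleep(holdMillis); // stands in for a slow chained ParDo call
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        source.start();
        Thread.sleep(50); // let the source grab the lock first
        long start = System.nanoTime();
        synchronized (checkpointLock) {
            // checkpointing would run here, only after the source releases the lock
        }
        source.join();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) throws InterruptedException {
        // With the lock held for ~300 ms, the checkpoint is delayed accordingly.
        System.out.println(checkpointDelayMillis(300) >= 200);
    }
}
```

With a 20 s synchronized section, as in the report above, the checkpoint would be delayed by roughly that long, which matches the observed behavior.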

> BoundedSource is not fault-tolerant in FlinkRunner Streaming mode
> -
>
> Key: BEAM-2393
> URL: https://issues.apache.org/jira/browse/BEAM-2393
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Jingsong Lee
>Assignee: Grzegorz Kołakowski
>Priority: Major
>
> {{BoundedSourceWrapper}} does not implement snapshot() and restore(), so 
> after a failure and restart it will send duplicate data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (BEAM-3043) Set user-specified Transform names on Flink operations

2018-03-13 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-3043.
--
Resolution: Fixed

> Set user-specified Transform names on Flink operations
> --
>
> Key: BEAM-3043
> URL: https://issues.apache.org/jira/browse/BEAM-3043
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>    Reporter: Aljoscha Krettek
>Assignee: Grzegorz Kołakowski
>Priority: Major
> Fix For: 2.5.0
>
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> Currently, we don't always set a name on the generated operations or we set 
> the wrong name. For example, in the batch translation we set the result of 
> {{PTransform.getName()}} as the name, which is only the name of the 
> {{PTransform}} itself, not the name that the user specified when creating a 
> Pipeline.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
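The distinction in the description above can be modeled without Beam dependencies. This sketch uses hypothetical names, not the Beam API: the transform carries only its intrinsic name, while the name shown to users should be the one given at application time (as in `pipeline.apply("ExtractWords", ...)`):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of transform naming, for illustration only.
public class TransformNames {

    static class FakeTransform {
        final String intrinsicName;
        FakeTransform(String intrinsicName) { this.intrinsicName = intrinsicName; }
        String getName() { return intrinsicName; } // only the transform's own name
    }

    static final List<String> operatorNames = new ArrayList<>();

    // Records the name an operator should get: the user-specified name when
    // present, falling back to the transform's intrinsic name otherwise.
    static void translate(String userSpecifiedName, FakeTransform t) {
        operatorNames.add(userSpecifiedName != null ? userSpecifiedName : t.getName());
    }

    public static void main(String[] args) {
        translate("ExtractWords", new FakeTransform("ParDo(Anonymous)"));
        translate(null, new FakeTransform("GroupByKey"));
        System.out.println(operatorNames); // [ExtractWords, GroupByKey]
    }
}
```

Using `getName()` alone, as the batch translation did, would label the first operator `ParDo(Anonymous)` instead of the name the user chose.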


[jira] [Commented] (BEAM-2393) BoundedSource is not fault-tolerant in FlinkRunner Streaming mode

2018-03-13 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16397890#comment-16397890
 ] 

Aljoscha Krettek commented on BEAM-2393:


Have you enabled DEBUG logging in Flink? What do the logs say about the 
checkpointing times on the individual operators?

> BoundedSource is not fault-tolerant in FlinkRunner Streaming mode
> -
>
> Key: BEAM-2393
> URL: https://issues.apache.org/jira/browse/BEAM-2393
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Jingsong Lee
>Assignee: Grzegorz Kołakowski
>Priority: Major
>
> {{BoundedSourceWrapper}} does not implement snapshot() and restore(), so 
> after a failure and restart it will send duplicate data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-3675) FlinkRunner: Logging server

2018-03-07 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned BEAM-3675:
--

Assignee: (was: Aljoscha Krettek)

> FlinkRunner: Logging server
> ---
>
> Key: BEAM-3675
> URL: https://issues.apache.org/jira/browse/BEAM-3675
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Ben Sidhom
>Priority: Major
>
> An implementation of BeamFnLogging that uses the default Flink logging 
> mechanism.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-3673) FlinkRunner: Harness manager for connecting operators to SDK Harnesses

2018-03-07 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned BEAM-3673:
--

Assignee: (was: Aljoscha Krettek)

> FlinkRunner: Harness manager for connecting operators to SDK Harnesses
> --
>
> Key: BEAM-3673
> URL: https://issues.apache.org/jira/browse/BEAM-3673
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Ben Sidhom
>Priority: Major
>
> SDK harnesses require a common set of gRPC services to operate. The role of 
> the harness manager is to provide a uniform interface that multiplexes data 
> streams and auxiliary data between SDK environments and operators within a 
> given job.
> Note that multiple operators may communicate with a single SDK environment to 
> amortize container initialization cost. Environments are _not_ shared between 
> different jobs.
> The initial implementation will shell out to local docker, but the harness 
> manager should eventually support working with externally-managed 
> environments (e.g., created by Kubernetes).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (BEAM-3753) Integration ITCase tests are not executed

2018-03-07 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-3753.
--
   Resolution: Fixed
Fix Version/s: (was: 2.4.0)
   2.5.0

> Integration ITCase tests are not executed
> -
>
> Key: BEAM-3753
> URL: https://issues.apache.org/jira/browse/BEAM-3753
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Grzegorz Kołakowski
>Assignee: Grzegorz Kołakowski
>Priority: Major
> Fix For: 2.5.0
>
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> The flink-runner {{*ITCase.java}} tests are not executed at all, either by 
> the surefire or the failsafe plugin.
>  * org.apache.beam.runners.flink.ReadSourceStreamingITCase
>  * org.apache.beam.runners.flink.ReadSourceITCase
>  * org.apache.beam.runners.flink.streaming.TopWikipediaSessionsITCase
> In addition, two of them fail if run manually from the IDE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (BEAM-3043) Set user-specified Transform names on Flink operations

2018-03-07 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reopened BEAM-3043:


Reopen to change fixVersion

> Set user-specified Transform names on Flink operations
> --
>
> Key: BEAM-3043
> URL: https://issues.apache.org/jira/browse/BEAM-3043
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>    Reporter: Aljoscha Krettek
>Assignee: Grzegorz Kołakowski
>Priority: Major
> Fix For: 2.5.0
>
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> Currently, we don't always set a name on the generated operations or we set 
> the wrong name. For example, in the batch translation we set the result of 
> {{PTransform.getName()}} as the name, which is only the name of the 
> {{PTransform}} itself, not the name that the user specified when creating a 
> Pipeline.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (BEAM-3753) Integration ITCase tests are not executed

2018-03-07 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reopened BEAM-3753:


Reopen to change fixVersion.

> Integration ITCase tests are not executed
> -
>
> Key: BEAM-3753
> URL: https://issues.apache.org/jira/browse/BEAM-3753
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Grzegorz Kołakowski
>Assignee: Grzegorz Kołakowski
>Priority: Major
> Fix For: 2.5.0
>
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> The flink-runner {{*ITCase.java}} tests are not executed at all, either by 
> the surefire or the failsafe plugin.
>  * org.apache.beam.runners.flink.ReadSourceStreamingITCase
>  * org.apache.beam.runners.flink.ReadSourceITCase
>  * org.apache.beam.runners.flink.streaming.TopWikipediaSessionsITCase
> In addition, two of them fail if run manually from the IDE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (BEAM-3043) Set user-specified Transform names on Flink operations

2018-03-07 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated BEAM-3043:
---
Fix Version/s: (was: 2.4.0)
   2.5.0

> Set user-specified Transform names on Flink operations
> --
>
> Key: BEAM-3043
> URL: https://issues.apache.org/jira/browse/BEAM-3043
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>    Reporter: Aljoscha Krettek
>Assignee: Grzegorz Kołakowski
>Priority: Major
> Fix For: 2.5.0
>
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> Currently, we don't always set a name on the generated operations or we set 
> the wrong name. For example, in the batch translation we set the result of 
> {{PTransform.getName()}} as the name, which is only the name of the 
> {{PTransform}} itself, not the name that the user specified when creating a 
> Pipeline.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[beam] 01/01: Merge pull request #4759: [BEAM-3043] Set user-specified PTransform names on Flink operators

2018-03-07 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit d497bfa2cbe752376cc8961ab6e6fb6754485ac2
Merge: 8ec9c81 c6467d8
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
AuthorDate: Wed Mar 7 14:31:27 2018 +0100

Merge pull request #4759: [BEAM-3043] Set user-specified PTransform names 
on Flink operators

 .../flink/FlinkBatchTransformTranslators.java  | 40 +++-
 .../flink/FlinkStreamingTransformTranslators.java  | 43 --
 2 files changed, 46 insertions(+), 37 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
aljos...@apache.org.


[jira] [Closed] (BEAM-3043) Set user-specified Transform names on Flink operations

2018-03-07 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-3043.
--
   Resolution: Fixed
Fix Version/s: 2.4.0

> Set user-specified Transform names on Flink operations
> --
>
> Key: BEAM-3043
> URL: https://issues.apache.org/jira/browse/BEAM-3043
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>    Reporter: Aljoscha Krettek
>Assignee: Grzegorz Kołakowski
>Priority: Major
> Fix For: 2.4.0
>
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> Currently, we don't always set a name on the generated operations or we set 
> the wrong name. For example, in the batch translation we set the result of 
> {{PTransform.getName()}} as the name, which is only the name of the 
> {{PTransform}} itself, not the name that the user specified when creating a 
> Pipeline.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[beam] branch master updated (8ec9c81 -> d497bfa)

2018-03-07 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 8ec9c81  Merge pull request #4767: [BEAM-3753] Fix Flink Runner 
integration tests
 add c6467d8  [BEAM-3043] Set user-specified PTransform names on Flink 
operators
 new d497bfa  Merge pull request #4759: [BEAM-3043] Set user-specified 
PTransform names on Flink operators

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/FlinkBatchTransformTranslators.java  | 40 +++-
 .../flink/FlinkStreamingTransformTranslators.java  | 43 --
 2 files changed, 46 insertions(+), 37 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
aljos...@apache.org.


[jira] [Closed] (BEAM-3753) Integration ITCase tests are not executed

2018-03-07 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-3753.
--
   Resolution: Fixed
Fix Version/s: 2.4.0

> Integration ITCase tests are not executed
> -
>
> Key: BEAM-3753
> URL: https://issues.apache.org/jira/browse/BEAM-3753
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Grzegorz Kołakowski
>Assignee: Grzegorz Kołakowski
>Priority: Major
> Fix For: 2.4.0
>
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> The flink-runner {{*ITCase.java}} tests are not executed at all, either by 
> surefire or by the failsafe plugin.
>  * org.apache.beam.runners.flink.ReadSourceStreamingITCase
>  * org.apache.beam.runners.flink.ReadSourceITCase
>  * org.apache.beam.runners.flink.streaming.TopWikipediaSessionsITCase
> In addition, two of them fail if run manually from the IDE.
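Why renaming {{*ITCase.java}} to {{*Test.java}} (as the linked fix does) makes the tests run: by Maven's documented defaults, surefire only picks up `*Test`-style class names and failsafe only `*IT`-style ones, and failsafe must be explicitly bound to the build to execute at all. A rough standalone sketch of those default patterns, approximated as regexes (an assumption from Maven defaults, not from this thread):

```java
import java.util.List;
import java.util.regex.Pattern;

public class TestNamePatterns {
    // Approximation of maven-surefire-plugin's default inclusion patterns
    static final List<Pattern> SUREFIRE = List.of(
        Pattern.compile("Test.*"), Pattern.compile(".*Test"),
        Pattern.compile(".*Tests"), Pattern.compile(".*TestCase"));

    // Approximation of maven-failsafe-plugin's default inclusion patterns
    static final List<Pattern> FAILSAFE = List.of(
        Pattern.compile("IT.*"), Pattern.compile(".*IT"),
        Pattern.compile(".*ITCase"));

    static boolean matches(List<Pattern> patterns, String className) {
        return patterns.stream().anyMatch(p -> p.matcher(className).matches());
    }

    public static void main(String[] args) {
        // *ITCase classes are failsafe material; surefire ignores them,
        // so they never run unless failsafe is bound to the build.
        System.out.println(matches(SUREFIRE, "ReadSourceITCase")); // false
        System.out.println(matches(FAILSAFE, "ReadSourceITCase")); // true
        // After the rename in this fix, surefire picks them up.
        System.out.println(matches(SUREFIRE, "ReadSourceTest"));   // true
    }
}
```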



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[beam] branch master updated (5c1e49c -> 8ec9c81)

2018-03-07 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 5c1e49c  Merge pull request #4810 from alanmyrvold/alan-version
 add 788c83c  [BEAM-3753] Fix failing integration tests
 add f28d198  [BEAM-3753] Rename *ITCase.java tests files to *Test.java
 new 8ec9c81  Merge pull request #4767: [BEAM-3753] Fix Flink Runner 
integration tests

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 runners/flink/build.gradle|  1 +
 runners/flink/pom.xml |  6 ++
 ...eStreamingITCase.java => ReadSourceStreamingTest.java} | 14 ++
 .../flink/{ReadSourceITCase.java => ReadSourceTest.java}  |  4 ++--
 ...aSessionsITCase.java => TopWikipediaSessionsTest.java} | 15 +++
 5 files changed, 30 insertions(+), 10 deletions(-)
 rename 
runners/flink/src/test/java/org/apache/beam/runners/flink/{ReadSourceStreamingITCase.java
 => ReadSourceStreamingTest.java} (81%)
 rename 
runners/flink/src/test/java/org/apache/beam/runners/flink/{ReadSourceITCase.java
 => ReadSourceTest.java} (96%)
 rename 
runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/{TopWikipediaSessionsITCase.java
 => TopWikipediaSessionsTest.java} (93%)

-- 
To stop receiving notification emails like this one, please contact
aljos...@apache.org.


[beam] 01/01: Merge pull request #4767: [BEAM-3753] Fix Flink Runner integration tests

2018-03-07 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 8ec9c813814b10eb7b8e2e1515086ef5c4dfc353
Merge: 5c1e49c f28d198
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
AuthorDate: Wed Mar 7 14:27:54 2018 +0100

Merge pull request #4767: [BEAM-3753] Fix Flink Runner integration tests

 runners/flink/build.gradle|  1 +
 runners/flink/pom.xml |  6 ++
 ...eStreamingITCase.java => ReadSourceStreamingTest.java} | 14 ++
 .../flink/{ReadSourceITCase.java => ReadSourceTest.java}  |  4 ++--
 ...aSessionsITCase.java => TopWikipediaSessionsTest.java} | 15 +++
 5 files changed, 30 insertions(+), 10 deletions(-)


-- 
To stop receiving notification emails like this one, please contact
aljos...@apache.org.


[jira] [Closed] (BEAM-3768) Compile error for Flink translation

2018-03-05 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-3768.
--
Resolution: Fixed
  Assignee: Thomas Groh  (was: Aljoscha Krettek)

This was resolved via https://github.com/apache/beam/pull/4787

> Compile error for Flink translation
> ---
>
> Key: BEAM-3768
> URL: https://issues.apache.org/jira/browse/BEAM-3768
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Robert Bradshaw
>Assignee: Thomas Groh
>Priority: Blocker
> Fix For: 2.4.0
>
>
> 2018-03-01T21:22:58.234 [INFO] --- maven-compiler-plugin:3.7.0:compile 
> (default-compile) @ beam-runners-flink_2.11 ---
> 2018-03-01T21:22:58.258 [INFO] Changes detected - recompiling the module!
> 2018-03-01T21:22:58.259 [INFO] Compiling 75 source files to 
> /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall@2/src/runners/flink/target/classes
> 2018-03-01T21:22:59.555 [INFO] 
> /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall@2/src/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/DedupingOperator.java:
>  Some input files use or override a deprecated API.
> 2018-03-01T21:22:59.556 [INFO] 
> /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall@2/src/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/DedupingOperator.java:
>  Recompile with -Xlint:deprecation for details.
> 2018-03-01T21:22:59.556 [INFO] 
> /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall@2/src/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java:
>  Some input files use unchecked or unsafe operations.
> 2018-03-01T21:22:59.556 [INFO] 
> /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall@2/src/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java:
>  Recompile with -Xlint:unchecked for details.
> 2018-03-01T21:22:59.556 [INFO] 
> -
> 2018-03-01T21:22:59.556 [ERROR] COMPILATION ERROR : 
> 2018-03-01T21:22:59.556 [INFO] 
> -
> 2018-03-01T21:22:59.557 [ERROR] 
> /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall@2/src/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java:[359,11]
>  cannot infer type arguments for 
> org.apache.beam.runners.core.ProcessFnRunner<>
>   reason: cannot infer type-variable(s) InputT,OutputT,RestrictionT
> (argument mismatch; org.apache.beam.runners.core.DoFnRunner cannot be 
> converted to 
> org.apache.beam.runners.core.SimpleDoFnRunner<org.apache.beam.runners.core.KeyedWorkItem<java.lang.String,org.apache.beam.sdk.values.KV<InputT,RestrictionT>>,OutputT>)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (BEAM-3768) Compile error for Flink translation

2018-03-02 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated BEAM-3768:
---
Issue Type: Bug  (was: Test)

> Compile error for Flink translation
> ---
>
> Key: BEAM-3768
> URL: https://issues.apache.org/jira/browse/BEAM-3768
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Robert Bradshaw
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 2.4.0
>
>
> 2018-03-01T21:22:58.234 [INFO] --- maven-compiler-plugin:3.7.0:compile 
> (default-compile) @ beam-runners-flink_2.11 ---
> 2018-03-01T21:22:58.258 [INFO] Changes detected - recompiling the module!
> 2018-03-01T21:22:58.259 [INFO] Compiling 75 source files to 
> /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall@2/src/runners/flink/target/classes
> 2018-03-01T21:22:59.555 [INFO] 
> /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall@2/src/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/DedupingOperator.java:
>  Some input files use or override a deprecated API.
> 2018-03-01T21:22:59.556 [INFO] 
> /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall@2/src/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/DedupingOperator.java:
>  Recompile with -Xlint:deprecation for details.
> 2018-03-01T21:22:59.556 [INFO] 
> /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall@2/src/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java:
>  Some input files use unchecked or unsafe operations.
> 2018-03-01T21:22:59.556 [INFO] 
> /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall@2/src/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java:
>  Recompile with -Xlint:unchecked for details.
> 2018-03-01T21:22:59.556 [INFO] 
> -
> 2018-03-01T21:22:59.556 [ERROR] COMPILATION ERROR : 
> 2018-03-01T21:22:59.556 [INFO] 
> -
> 2018-03-01T21:22:59.557 [ERROR] 
> /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall@2/src/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java:[359,11]
>  cannot infer type arguments for 
> org.apache.beam.runners.core.ProcessFnRunner<>
>   reason: cannot infer type-variable(s) InputT,OutputT,RestrictionT
> (argument mismatch; org.apache.beam.runners.core.DoFnRunner cannot be 
> converted to 
> org.apache.beam.runners.core.SimpleDoFnRunner<org.apache.beam.runners.core.KeyedWorkItem<java.lang.String,org.apache.beam.sdk.values.KV<InputT,RestrictionT>>,OutputT>)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-3768) Compile error for Flink translation

2018-03-02 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383585#comment-16383585
 ] 

Aljoscha Krettek commented on BEAM-3768:


[~robertwb] & [~rmannibucau] I commented on the PR: 
https://github.com/apache/beam/pull/4372. What do you think?

> Compile error for Flink translation
> ---
>
> Key: BEAM-3768
> URL: https://issues.apache.org/jira/browse/BEAM-3768
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Robert Bradshaw
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 2.4.0
>
>
> 2018-03-01T21:22:58.234 [INFO] --- maven-compiler-plugin:3.7.0:compile 
> (default-compile) @ beam-runners-flink_2.11 ---
> 2018-03-01T21:22:58.258 [INFO] Changes detected - recompiling the module!
> 2018-03-01T21:22:58.259 [INFO] Compiling 75 source files to 
> /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall@2/src/runners/flink/target/classes
> 2018-03-01T21:22:59.555 [INFO] 
> /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall@2/src/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/DedupingOperator.java:
>  Some input files use or override a deprecated API.
> 2018-03-01T21:22:59.556 [INFO] 
> /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall@2/src/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/DedupingOperator.java:
>  Recompile with -Xlint:deprecation for details.
> 2018-03-01T21:22:59.556 [INFO] 
> /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall@2/src/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java:
>  Some input files use unchecked or unsafe operations.
> 2018-03-01T21:22:59.556 [INFO] 
> /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall@2/src/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java:
>  Recompile with -Xlint:unchecked for details.
> 2018-03-01T21:22:59.556 [INFO] 
> -
> 2018-03-01T21:22:59.556 [ERROR] COMPILATION ERROR : 
> 2018-03-01T21:22:59.556 [INFO] 
> -
> 2018-03-01T21:22:59.557 [ERROR] 
> /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall@2/src/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java:[359,11]
>  cannot infer type arguments for 
> org.apache.beam.runners.core.ProcessFnRunner<>
>   reason: cannot infer type-variable(s) InputT,OutputT,RestrictionT
> (argument mismatch; org.apache.beam.runners.core.DoFnRunner cannot be 
> converted to 
> org.apache.beam.runners.core.SimpleDoFnRunner<org.apache.beam.runners.core.KeyedWorkItem<java.lang.String,org.apache.beam.sdk.values.KV<InputT,RestrictionT>>,OutputT>)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[beam] 04/12: [BEAM-2140] Ignore event-time timers in SplittableDoFnOperator

2018-02-23 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 2e0b192efbae72fe5c0ef61abfc7b4d1b3bb75f5
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
AuthorDate: Fri Jan 5 14:16:27 2018 +0100

[BEAM-2140] Ignore event-time timers in SplittableDoFnOperator

SplittableDoFnOperator is only used for executing ProcessFn, which does
not use event-time timers. However, StatefulDoFnRunner does use
event-time timers for state cleanup so this change makes sure that they
don't end up being forwarded to the ProcessFn.
---
 .../runners/flink/translation/wrappers/streaming/DoFnOperator.java  | 1 -
 .../translation/wrappers/streaming/SplittableDoFnOperator.java  | 6 ++
 2 files changed, 6 insertions(+), 1 deletion(-)

diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 688a7cb..dd2f9c4 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -38,7 +38,6 @@ import java.util.concurrent.ScheduledFuture;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
-import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
 import org.apache.beam.runners.core.NullSideInputReader;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.SideInputHandler;
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
index 1a418a0..44be5f3 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
@@ -37,6 +37,7 @@ import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternalsFactory;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -142,6 +143,11 @@ public class SplittableDoFnOperator<
 
   @Override
   public void fireTimer(InternalTimer timer) {
+if (timer.getNamespace().getDomain().equals(TimeDomain.EVENT_TIME)) {
+  // ignore this, it can only be a state cleanup timer from 
StatefulDoFnRunner; ProcessFn
+  // does its own state cleanup and should never set event-time timers.
+  return;
+}
 doFnRunner.processElement(
 WindowedValue.valueInGlobalWindow(
 KeyedWorkItems.timersWorkItem(

-- 
To stop receiving notification emails like this one, please contact
aljos...@apache.org.


[beam] 10/12: [BEAM-2140] Enable SDF tests in gradle for Flink Streaming Runner

2018-02-23 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 6ba3fc1ff2798764756a7188980facc31eb1ac4c
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
AuthorDate: Thu Feb 8 13:00:34 2018 +0100

[BEAM-2140] Enable SDF tests in gradle for Flink Streaming Runner
---
 runners/flink/build.gradle | 24 +---
 1 file changed, 17 insertions(+), 7 deletions(-)

diff --git a/runners/flink/build.gradle b/runners/flink/build.gradle
index dc77bcf..362288e 100644
--- a/runners/flink/build.gradle
+++ b/runners/flink/build.gradle
@@ -97,13 +97,23 @@ def createValidatesRunnerTask(Map m) {
 classpath = configurations.validatesRunner
 testClassesDirs = 
files(project(":sdks:java:core").sourceSets.test.output.classesDirs)
 maxParallelForks 4
-useJUnit {
-  includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
-  excludeCategories 
'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
-  excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
-  excludeCategories 'org.apache.beam.sdk.testing.UsesSplittableParDo'
-  excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
-  excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
+if (config.streaming) {
+  useJUnit {
+includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
+excludeCategories 
'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
+excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
+excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
+excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
+  }
+} else {
+  useJUnit {
+includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
+excludeCategories 
'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
+excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
+excludeCategories 'org.apache.beam.sdk.testing.UsesSplittableParDo'
+excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
+excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
+  }
 }
   }
 }

-- 
To stop receiving notification emails like this one, please contact
aljos...@apache.org.


[beam] 07/12: Make ProcessFnRunner constructor public

2018-02-23 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 4ec4644952c05167f52ae0937e043140d26059ae
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
AuthorDate: Thu Feb 8 13:01:22 2018 +0100

Make ProcessFnRunner constructor public

We need this to be able to instantiate with the constructor in the Flink
Runner.
---
 .../src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
index 88275d6..e4dfd13 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
@@ -43,7 +43,7 @@ public class ProcessFnRunner<InputT, OutputT, RestrictionT>
   private final Collection<PCollectionView> views;
   private final ReadyCheckingSideInputReader sideInputReader;
 
-  ProcessFnRunner(
+  public ProcessFnRunner(
   DoFnRunner<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> 
underlying,
   Collection<PCollectionView> views,
   ReadyCheckingSideInputReader sideInputReader) {

-- 
To stop receiving notification emails like this one, please contact
aljos...@apache.org.


[beam] 06/12: [BEAM-2140] Don't use StatefulDoFnRunner when running SDF in FlinkRunner

2018-02-23 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit d15979f2cf0d6bc47d049a6ea157d9d7b2b97848
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
AuthorDate: Wed Jan 10 06:48:49 2018 +0100

[BEAM-2140] Don't use StatefulDoFnRunner when running SDF in FlinkRunner
---
 .../translation/wrappers/streaming/SplittableDoFnOperator.java | 10 ++
 1 file changed, 10 insertions(+)

diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
index 44be5f3..e088b07 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
 import 
org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker;
@@ -88,6 +89,15 @@ public class SplittableDoFnOperator<
   }
 
   @Override
+  protected DoFnRunner<
+  KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> 
createWrappingDoFnRunner(
+  DoFnRunner<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> 
wrappedRunner) {
+// don't wrap in anything because we don't need state cleanup because 
ProcessFn does
+// all that
+return wrappedRunner;
+  }
+
+  @Override
   public void open() throws Exception {
 super.open();
 

-- 
To stop receiving notification emails like this one, please contact
aljos...@apache.org.


[jira] [Updated] (BEAM-2140) Fix SplittableDoFn ValidatesRunner tests in FlinkRunner

2018-02-23 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated BEAM-2140:
---
Fix Version/s: 2.4.0

> Fix SplittableDoFn ValidatesRunner tests in FlinkRunner
> ---
>
> Key: BEAM-2140
> URL: https://issues.apache.org/jira/browse/BEAM-2140
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>    Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
> Fix For: 2.4.0
>
>  Time Spent: 4h 50m
>  Remaining Estimate: 0h
>
> As discovered as part of BEAM-1763, there is a failing SDF test. We disabled 
> the tests to unblock the open PR for BEAM-1763.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[beam] 08/12: [BEAM-2140] Use ProcessFnRunner in DoFnRunner for executing SDF

2018-02-23 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 74ac703bbfa40d16e7d1115912768c7a63598d52
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
AuthorDate: Thu Feb 8 13:02:26 2018 +0100

[BEAM-2140] Use ProcessFnRunner in DoFnRunner for executing SDF

For this to work, we need to also change how we wrap values in
KeyedWorkItems, because ProcessFnRunner expects them to be in the
GlobalWindow.
---
 .../flink/FlinkStreamingTransformTranslators.java  | 35 ++
 .../wrappers/streaming/DoFnOperator.java   | 11 +--
 2 files changed, 39 insertions(+), 7 deletions(-)

diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index d39b5c1..f5dc3ce 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -1017,10 +1017,8 @@ class FlinkStreamingTransformTranslators {
 
 
   WindowedValue.
-  FullWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>> 
windowedWorkItemCoder =
-  WindowedValue.getFullCoder(
-  workItemCoder,
-  input.getWindowingStrategy().getWindowFn().windowCoder());
+  ValueOnlyWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>> 
windowedWorkItemCoder =
+  WindowedValue.getValueOnlyCoder(workItemCoder);
 
   CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> 
workItemTypeInfo =
   new CoderTypeInformation<>(windowedWorkItemCoder);
@@ -1029,7 +1027,7 @@ class FlinkStreamingTransformTranslators {
 
   DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> 
workItemStream =
   inputDataStream
-  .flatMap(new ToKeyedWorkItem<>())
+  .flatMap(new ToKeyedWorkItemInGlobalWindow<>())
   .returns(workItemTypeInfo)
   .name("ToKeyedWorkItem");
 
@@ -1041,6 +1039,33 @@ class FlinkStreamingTransformTranslators {
 }
   }
 
+  private static class ToKeyedWorkItemInGlobalWindow<K, InputT>
+  extends RichFlatMapFunction<
+  WindowedValue<KV<K, InputT>>,
+  WindowedValue<SingletonKeyedWorkItem<K, InputT>>> {
+
+@Override
+public void flatMap(
+WindowedValue<KV<K, InputT>> inWithMultipleWindows,
+Collector<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> out) 
throws Exception {
+
+  // we need to wrap each one work item per window for now
+  // since otherwise the PushbackSideInputRunner will not correctly
+  // determine whether side inputs are ready
+  //
+  // this is tracked as https://issues.apache.org/jira/browse/BEAM-1850
+  for (WindowedValue<KV<K, InputT>> in : 
inWithMultipleWindows.explodeWindows()) {
+SingletonKeyedWorkItem<K, InputT> workItem =
+new SingletonKeyedWorkItem<>(
+in.getValue().getKey(),
+in.withValue(in.getValue().getValue()));
+
+out.collect(WindowedValue.valueInGlobalWindow(workItem));
+  }
+}
+  }
+
+
   private static class FlattenPCollectionTranslator
   extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
 PTransform<PCollection, PCollection>> {
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 37f56f5..f9b4ee3 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -39,10 +39,12 @@ import javax.annotation.Nullable;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.runners.core.ProcessFnRunner;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.SideInputHandler;
 import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
+import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateNamespaces;
@@ -352,8 +354,13 @@ public cla

[beam] 09/12: [BEAM-2140] Enable SDF tests for Flink Streaming Runner

2018-02-23 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit c920a39b9781ac604f55059e597e327643d58205
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
AuthorDate: Fri Jan 5 14:20:19 2018 +0100

[BEAM-2140] Enable SDF tests for Flink Streaming Runner
---
 runners/flink/pom.xml | 1 -
 1 file changed, 1 deletion(-)

diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index 46917e9..cd4d8c3 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -94,7 +94,6 @@
 org.apache.beam.sdk.testing.LargeKeys$Above100MB,
 org.apache.beam.sdk.testing.UsesCommittedMetrics,
 org.apache.beam.sdk.testing.UsesTestStream,
-org.apache.beam.sdk.testing.UsesSplittableParDo
   
   none
   true

-- 
To stop receiving notification emails like this one, please contact
aljos...@apache.org.


[beam] 05/12: [BEAM-2140] Block DoFnOperator.close() if we have pending timers

2018-02-23 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 3eecb381f3fb412df6844cfc13b12bf265253926
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
AuthorDate: Fri Jan 5 14:17:49 2018 +0100

[BEAM-2140] Block DoFnOperator.close() if we have pending timers

It can happen that the input operation finishes while we still have
pending processing-time timers, for example from processing a Splittable
DoFn. This change makes sure that we block as long as we have pending
timers.

This change also makes sure that we forward a +Inf watermark in close().
We have to do this because it can happen that we get a +Inf watermark on
input while we still have active watermark holds (which will get
resolved when all pending timers are gone). With this change we make
sure to send a +Inf watermark downstream once everything is resolved.
---
 .../wrappers/streaming/DoFnOperator.java   | 43 +-
 1 file changed, 33 insertions(+), 10 deletions(-)

diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index dd2f9c4..37f56f5 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -371,18 +371,41 @@ public class DoFnOperator<InputT, OutputT>
 
   @Override
   public void close() throws Exception {
-super.close();
-
-// sanity check: these should have been flushed out by +Inf watermarks
-if (!sideInputs.isEmpty() && nonKeyedStateInternals != null) {
-  BagState<WindowedValue> pushedBack =
-  nonKeyedStateInternals.state(StateNamespaces.global(), 
pushedBackTag);
+try {
 
-  Iterable<WindowedValue> pushedBackContents = pushedBack.read();
-  if (pushedBackContents != null && 
!Iterables.isEmpty(pushedBackContents)) {
-String pushedBackString = Joiner.on(",").join(pushedBackContents);
+  // This is our last chance to block shutdown of this operator while
+  // there are still remaining processing-time timers. Flink will ignore 
pending
+  // processing-time timers when upstream operators have shut down and 
will also
+  // shut down this operator with pending processing-time timers.
+  while (this.numProcessingTimeTimers() > 0) {
+getContainingTask().getCheckpointLock().wait(100);
+  }
+  if (this.numProcessingTimeTimers() > 0) {
 throw new RuntimeException(
-"Leftover pushed-back data: " + pushedBackString + ". This 
indicates a bug.");
+"There are still processing-time timers left, this indicates a 
bug");
+  }
+
+  // make sure we send a +Inf watermark downstream. It can happen that we 
receive +Inf
+  // in processWatermark*() but have holds, so we have to re-evaluate here.
+  processWatermark(new Watermark(Long.MAX_VALUE));
+  if (currentOutputWatermark < Long.MAX_VALUE) {
+throw new RuntimeException("There are still watermark holds. Watermark 
held at "
++ keyedStateInternals.watermarkHold().getMillis() + ".");
+  }
+} finally {
+  super.close();
+
+  // sanity check: these should have been flushed out by +Inf watermarks
+  if (!sideInputs.isEmpty() && nonKeyedStateInternals != null) {
+BagState<WindowedValue> pushedBack =
+nonKeyedStateInternals.state(StateNamespaces.global(), 
pushedBackTag);
+
+Iterable<WindowedValue> pushedBackContents = pushedBack.read();
+if (pushedBackContents != null && 
!Iterables.isEmpty(pushedBackContents)) {
+  String pushedBackString = Joiner.on(",").join(pushedBackContents);
+  throw new RuntimeException(
+  "Leftover pushed-back data: " + pushedBackString + ". This 
indicates a bug.");
+}
   }
 }
   }
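The `close()` hunk above blocks operator shutdown until pending processing-time timers have drained, waiting on the checkpoint lock in bounded steps. A minimal, self-contained sketch of that drain pattern (hypothetical class and names, not the Beam/Flink API):

```java
// Sketch of the timer-drain-before-close idea from the diff above.
// All names here are illustrative; the real operator waits on Flink's
// checkpoint lock and counts its own processing-time timers.
class TimerDrainSketch {
    private int pendingTimers;
    private final Object lock = new Object();

    TimerDrainSketch(int timers) {
        this.pendingTimers = timers;
    }

    // Simulates a timer firing; wakes any thread blocked in awaitNoTimers().
    void fireOne() {
        synchronized (lock) {
            pendingTimers--;
            lock.notifyAll();
        }
    }

    // Blocks in bounded steps (like getCheckpointLock().wait(100)) until
    // no timers remain; returns how many wait rounds were needed.
    int awaitNoTimers(long stepMillis) throws InterruptedException {
        int rounds = 0;
        synchronized (lock) {
            while (pendingTimers > 0) {
                lock.wait(stepMillis);
                rounds++;
            }
        }
        return rounds;
    }
}
```

The bounded `wait(stepMillis)` mirrors the `wait(100)` in the diff: it re-checks the condition periodically instead of blocking indefinitely.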

-- 
To stop receiving notification emails like this one, please contact
aljos...@apache.org.


[jira] [Closed] (BEAM-2140) Fix SplittableDoFn ValidatesRunner tests in FlinkRunner

2018-02-23 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-2140.
--
Resolution: Fixed

> Fix SplittableDoFn ValidatesRunner tests in FlinkRunner
> ---
>
> Key: BEAM-2140
> URL: https://issues.apache.org/jira/browse/BEAM-2140
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>    Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
> Fix For: 2.4.0
>
>  Time Spent: 4h 50m
>  Remaining Estimate: 0h
>
> As discovered as part of BEAM-1763, there is a failing SDF test. We disabled 
> the tests to unblock the open PR for BEAM-1763.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[beam] branch master updated (51b793a -> 0259636)

2018-02-23 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 51b793a  Merge pull request #4723: [BEAM-3317] Use fixed system time 
for KinesisReaderTest
 new 50c6d93  Make parameter of DoFnRunners.lateDataDroppingRunner() more 
specific
 new c40868f  Allow overriding DoFnRunners in subclasses of Flink 
DoFnOperator
 new 4029e23f Invoke finishBundle() before teardown() in DoFnOperator
 new 2e0b192  [BEAM-2140] Ignore event-time timers in SplittableDoFnOperator
 new 3eecb38  [BEAM-2140] Block DoFnOperator.close() if we have pending 
timers
 new d15979f  [BEAM-2140] Don't use StatefulDoFnRunner when running SDF in 
FlinkRunner
 new 4ec4644  Make ProcessFnRunner constructor public
 new 74ac703  [BEAM-2140] Use ProcessFnRunner in DoFnRunner for executing 
SDF
 new c920a39  [BEAM-2140] Enable SDF tests for Flink Streaming Runner
 new 6ba3fc1  [BEAM-2140] Enable SDF tests in gradle for Flink Streaming 
Runner
 new 6148a6d  [BEAM-3727] Never shutdown sources in Flink Streaming 
execution mode
 new 0259636  Merge pull request #4639: [BEAM-2140] Fix SplittableDoFn 
ValidatesRunner tests in Flink Streaming Runner

The 12 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/beam/runners/core/DoFnRunners.java  |   4 +-
 .../apache/beam/runners/core/ProcessFnRunner.java  |   2 +-
 runners/flink/build.gradle |  24 ++--
 runners/flink/pom.xml  |   1 -
 .../beam/runners/flink/FlinkPipelineOptions.java   |  12 ++
 .../flink/FlinkStreamingTransformTranslators.java  |  35 +-
 .../apache/beam/runners/flink/TestFlinkRunner.java |   1 +
 .../wrappers/streaming/DoFnOperator.java   | 127 +
 .../wrappers/streaming/SplittableDoFnOperator.java |  16 +++
 .../wrappers/streaming/WindowDoFnOperator.java |  18 +++
 .../streaming/io/BoundedSourceWrapper.java |  28 +
 .../streaming/io/UnboundedSourceWrapper.java   |  56 -
 12 files changed, 234 insertions(+), 90 deletions(-)



[beam] 01/12: Make parameter of DoFnRunners.lateDataDroppingRunner() more specific

2018-02-23 Thread aljoscha

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 50c6d9316c8c5089065dd291b097e06b0c80980e
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
AuthorDate: Thu Jan 11 10:56:49 2018 +0100

Make parameter of DoFnRunners.lateDataDroppingRunner() more specific
---
 .../src/main/java/org/apache/beam/runners/core/DoFnRunners.java   | 4 ++--
 .../runners/flink/translation/wrappers/streaming/DoFnOperator.java| 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index 9d3e25d..80c830a 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -80,12 +80,12 @@ public class DoFnRunners {
   public static <K, InputT, OutputT, W extends BoundedWindow>
   DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> 
lateDataDroppingRunner(
   DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> wrappedRunner,
-  StepContext stepContext,
+  TimerInternals timerInternals,
   WindowingStrategy windowingStrategy) {
 return new LateDataDroppingDoFnRunner<>(
 wrappedRunner,
 windowingStrategy,
-stepContext.timerInternals());
+timerInternals);
   }
 
   /**
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 41a35ce..8ccbd8f 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -314,7 +314,7 @@ public class DoFnOperator<InputT, OutputT>
 
   doFnRunner = DoFnRunners.lateDataDroppingRunner(
   (DoFnRunner) doFnRunner,
-  stepContext,
+  timerInternals,
   windowingStrategy);
 } else if (keyCoder != null) {
   // It is a stateful DoFn



[beam] 11/12: [BEAM-3727] Never shutdown sources in Flink Streaming execution mode

2018-02-23 Thread aljoscha

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 6148a6d063a0503ee435ab5084fcba3fb864b26f
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
AuthorDate: Thu Feb 22 14:26:16 2018 +0100

[BEAM-3727] Never shutdown sources in Flink Streaming execution mode

This adds an option that controls whether to shutdown sources or not in
case of reaching the +Inf watermark.

The reason for this is https://issues.apache.org/jira/browse/FLINK-2491,
which causes checkpointing to stop once some source is shut down.
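The keep-alive behavior this commit adds can be sketched with a plain-Java stand-in (hypothetical class name; the real code lives in the source wrappers and is gated on `isShutdownSourcesOnFinalWatermark`):

```java
// Illustrative sketch (not Beam API) of the keep-alive loop: instead of
// returning when the source is exhausted, park on a local lock in bounded
// waits until canceled, so Flink keeps checkpointing (see FLINK-2491).
class KeepAliveSketch {
    private volatile boolean isRunning = true;

    void cancel() {
        isRunning = false;
    }

    // Returns the number of bounded waits performed before cancellation.
    int stayAliveUntilCanceled(long waitMillis) throws InterruptedException {
        final Object waitLock = new Object();
        int rounds = 0;
        while (isRunning) {
            synchronized (waitLock) {
                // bounded wait, in case something goes horribly wrong
                waitLock.wait(waitMillis);
            }
            rounds++;
        }
        return rounds;
    }
}
```

Cancellation flips the `volatile` flag (or interrupts the thread), which is exactly why the loop re-checks `isRunning` on every bounded wait rather than waiting indefinitely.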
---
 .../beam/runners/flink/FlinkPipelineOptions.java   | 12 +
 .../apache/beam/runners/flink/TestFlinkRunner.java |  1 +
 .../streaming/io/BoundedSourceWrapper.java | 28 +++
 .../streaming/io/UnboundedSourceWrapper.java   | 56 --
 4 files changed, 71 insertions(+), 26 deletions(-)

diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index 01f7847..b2cbefb 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -137,4 +137,16 @@ public interface FlinkPipelineOptions
   Long getMaxBundleTimeMills();
   void setMaxBundleTimeMills(Long time);
 
+  /**
+   * Whether to shutdown sources when their watermark reaches {@code +Inf}. 
For production use
+   * cases you want this to be disabled because Flink will currently (versions 
{@literal <=} 1.5)
+   * stop doing checkpoints when any operator (which includes sources) is 
finished.
+   *
+   * Please see <a href="https://issues.apache.org/jira/browse/FLINK-2491">FLINK-2491</a> for
+   * progress on this issue.
+   */
+  @Description("If set, shutdown sources when their watermark reaches +Inf.")
+  @Default.Boolean(false)
+  Boolean isShutdownSourcesOnFinalWatermark();
+  void setShutdownSourcesOnFinalWatermark(Boolean shutdownOnFinalWatermark);
 }
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
index 01b67e5..47d4494 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
@@ -36,6 +36,7 @@ public class TestFlinkRunner extends 
PipelineRunner {
   private TestFlinkRunner(FlinkPipelineOptions options) {
 // We use [auto] for testing since this will make it pick up the Testing 
ExecutionEnvironment
 options.setFlinkMaster("[auto]");
+options.setShutdownSourcesOnFinalWatermark(true);
 this.delegate = FlinkRunner.fromOptions(options);
   }
 
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
index 5ddc46f..6db5426 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
 import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
 import org.apache.beam.sdk.io.BoundedSource;
@@ -180,6 +181,33 @@ public class BoundedSourceWrapper
 
 // emit final Long.MAX_VALUE watermark, just to be sure
 ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
+
+FlinkPipelineOptions options = 
serializedOptions.get().as(FlinkPipelineOptions.class);
+if (!options.isShutdownSourcesOnFinalWatermark()) {
+  // do nothing, but still look busy ...
+  // we can't return here since Flink requires that all operators stay up,
+  // otherwise checkpointing would not work correctly anymore
+  //
+  // See https://issues.apache.org/jira/browse/FLINK-2491 for progress on 
this issue
+
+  // wait until this is canceled
+  final Object waitLock = new Object();
+  while (isRunning) {
+try {
+  // Flink will interrupt us at some point
+  //noinspection SynchronizationOnLocalVariableOrMethodParameter
+  synchronized (waitLock) {
+// don't wait indefinitely, in case something goes horribly wrong
+waitLock.wait(1000);
+  }
+ 

[beam] 03/12: Invoke finishBundle() before teardown() in DoFnOperator

2018-02-23 Thread aljoscha

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 4029e23f3075365eddcdad5187b4fc8a6e590989
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
AuthorDate: Fri Jan 5 14:22:40 2018 +0100

Invoke finishBundle() before teardown() in DoFnOperator
---
 .../runners/flink/translation/wrappers/streaming/DoFnOperator.java | 3 +++
 1 file changed, 3 insertions(+)

diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 2e7f741..688a7cb 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -363,6 +363,9 @@ public class DoFnOperator<InputT, OutputT>
   super.dispose();
   checkFinishBundleTimer.cancel(true);
 } finally {
+  if (bundleStarted) {
+invokeFinishBundle();
+  }
   doFnInvoker.invokeTeardown();
 }
   }



[beam] 02/12: Allow overriding DoFnRunners in subclasses of Flink DoFnOperator

2018-02-23 Thread aljoscha

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit c40868fe2fc550ae97c7b2d9308dd8b58b20edab
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
AuthorDate: Thu Jan 11 10:52:01 2018 +0100

Allow overriding DoFnRunners in subclasses of Flink DoFnOperator
---
 .../wrappers/streaming/DoFnOperator.java   | 69 +++---
 .../wrappers/streaming/WindowDoFnOperator.java | 18 ++
 2 files changed, 52 insertions(+), 35 deletions(-)

diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 8ccbd8f..2e7f741 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -218,6 +218,39 @@ public class DoFnOperator<InputT, OutputT>
 return doFn;
   }
 
+  // allow overriding this, for example SplittableDoFnOperator will not create 
a
+  // stateful DoFn runner because ProcessFn, which is used for executing a 
Splittable DoFn
+  // doesn't play by the normal DoFn rules and WindowDoFnOperator uses 
LateDataDroppingDoFnRunner
+  protected DoFnRunner<InputT, OutputT> createWrappingDoFnRunner(
+  DoFnRunner<InputT, OutputT> wrappedRunner) {
+
+if (keyCoder != null) {
+  StatefulDoFnRunner.CleanupTimer cleanupTimer =
+  new StatefulDoFnRunner.TimeInternalsCleanupTimer(
+  timerInternals, windowingStrategy);
+
+  // we don't know the window type
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  Coder windowCoder = windowingStrategy.getWindowFn().windowCoder();
+
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  StatefulDoFnRunner.StateCleaner stateCleaner =
+  new StatefulDoFnRunner.StateInternalsStateCleaner<>(
+  doFn, keyedStateInternals, windowCoder);
+
+
+  return DoFnRunners.defaultStatefulDoFnRunner(
+  doFn,
+  wrappedRunner,
+  windowingStrategy,
+  cleanupTimer,
+  stateCleaner);
+
+} else {
+  return doFnRunner;
+}
+  }
+
   @Override
   public void setup(
   StreamTask containingTask,
@@ -304,41 +337,7 @@ public class DoFnOperator<InputT, OutputT>
 stepContext,
 windowingStrategy);
 
-if (doFn instanceof GroupAlsoByWindowViaWindowSetNewDoFn) {
-  // When the doFn is this, we know it came from WindowDoFnOperator and
-  //   InputT = KeyedWorkItem<K, V>
-  //   OutputT = KV<K, V>
-  //
-  // for some K, V
-
-
-  doFnRunner = DoFnRunners.lateDataDroppingRunner(
-  (DoFnRunner) doFnRunner,
-  timerInternals,
-  windowingStrategy);
-} else if (keyCoder != null) {
-  // It is a stateful DoFn
-
-  StatefulDoFnRunner.CleanupTimer cleanupTimer =
-  new StatefulDoFnRunner.TimeInternalsCleanupTimer(
-  stepContext.timerInternals(), windowingStrategy);
-
-  // we don't know the window type
-  @SuppressWarnings({"unchecked", "rawtypes"})
-  Coder windowCoder = windowingStrategy.getWindowFn().windowCoder();
-
-  @SuppressWarnings({"unchecked", "rawtypes"})
-  StatefulDoFnRunner.StateCleaner stateCleaner =
-  new StatefulDoFnRunner.StateInternalsStateCleaner<>(
-  doFn, stepContext.stateInternals(), windowCoder);
-
-  doFnRunner = DoFnRunners.defaultStatefulDoFnRunner(
-  doFn,
-  doFnRunner,
-  windowingStrategy,
-  cleanupTimer,
-  stateCleaner);
-}
+doFnRunner = createWrappingDoFnRunner(doFnRunner);
 
 if (options.getEnableMetrics()) {
   doFnRunner = new DoFnRunnerWithMetricsUpdate<>(stepName, doFnRunner, 
getRuntimeContext());
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 7a04238..8447ade 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -23,6 +23,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
 import org.apache.beam.r
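The template-method hook this commit introduces can be sketched in plain Java (hypothetical names; they stand in for `DoFnOperator.createWrappingDoFnRunner` and the `SplittableDoFnOperator` override, with runner types simplified to string functions):

```java
import java.util.function.UnaryOperator;

// Hypothetical sketch of the createWrappingDoFnRunner() hook: the base
// operator wraps its runner with stateful handling by default, and
// subclasses override the hook to opt out or substitute behavior.
class BaseOperatorSketch {
    // Subclasses may override, mirroring DoFnOperator.createWrappingDoFnRunner.
    protected UnaryOperator<String> createWrappingRunner(UnaryOperator<String> wrapped) {
        return in -> "stateful(" + wrapped.apply(in) + ")";
    }

    String run(String input) {
        UnaryOperator<String> base = in -> "base(" + in + ")";
        return createWrappingRunner(base).apply(input);
    }
}

// Mirrors SplittableDoFnOperator, which must skip the stateful wrapper
// because ProcessFn doesn't play by the normal DoFn rules.
class SplittableOperatorSketch extends BaseOperatorSketch {
    @Override
    protected UnaryOperator<String> createWrappingRunner(UnaryOperator<String> wrapped) {
        return wrapped;
    }
}
```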

[beam] 12/12: Merge pull request #4639: [BEAM-2140] Fix SplittableDoFn ValidatesRunner tests in Flink Streaming Runner

2018-02-23 Thread aljoscha

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 0259636661998b8aabd3bf20eff8d519bcb9fb47
Merge: 51b793a 6148a6d
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
AuthorDate: Fri Feb 23 11:59:36 2018 +0100

Merge pull request #4639: [BEAM-2140] Fix SplittableDoFn ValidatesRunner 
tests in Flink Streaming Runner

 .../org/apache/beam/runners/core/DoFnRunners.java  |   4 +-
 .../apache/beam/runners/core/ProcessFnRunner.java  |   2 +-
 runners/flink/build.gradle |  24 ++--
 runners/flink/pom.xml  |   1 -
 .../beam/runners/flink/FlinkPipelineOptions.java   |  12 ++
 .../flink/FlinkStreamingTransformTranslators.java  |  35 +-
 .../apache/beam/runners/flink/TestFlinkRunner.java |   1 +
 .../wrappers/streaming/DoFnOperator.java   | 127 +
 .../wrappers/streaming/SplittableDoFnOperator.java |  16 +++
 .../wrappers/streaming/WindowDoFnOperator.java |  18 +++
 .../streaming/io/BoundedSourceWrapper.java |  28 +
 .../streaming/io/UnboundedSourceWrapper.java   |  56 -
 12 files changed, 234 insertions(+), 90 deletions(-)



[jira] [Assigned] (BEAM-3728) Failing ParDoTest for Flink Runner

2018-02-22 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned BEAM-3728:
--

Assignee: (was: Aljoscha Krettek)

> Failing ParDoTest for Flink Runner
> --
>
> Key: BEAM-3728
> URL: https://issues.apache.org/jira/browse/BEAM-3728
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>    Reporter: Aljoscha Krettek
>Priority: Blocker
> Fix For: 2.4.0
>
>
> It seems the changes for BEAM-3700/BEAM-3701 broke some tests in ParDoTest 
> for the Flink Runner: 
> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/5054/
> I don't currently understand what is causing this. [~lcwik] Do you have any 
> idea?





[jira] [Commented] (BEAM-3728) Failing ParDoTest for Flink Runner

2018-02-22 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372906#comment-16372906
 ] 

Aljoscha Krettek commented on BEAM-3728:


Also cc [~romain.manni-bucau] as the implementor. I thought it was Luke but I 
was confused by the git history.

> Failing ParDoTest for Flink Runner
> --
>
> Key: BEAM-3728
> URL: https://issues.apache.org/jira/browse/BEAM-3728
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>    Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 2.4.0
>
>
> It seems the changes for BEAM-3700/BEAM-3701 broke some tests in ParDoTest 
> for the Flink Runner: 
> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/5054/
> I don't currently understand what is causing this. [~lcwik] Do you have any 
> idea?





[jira] [Created] (BEAM-3728) Failing ParDoTest for Flink Runner

2018-02-22 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created BEAM-3728:
--

 Summary: Failing ParDoTest for Flink Runner
 Key: BEAM-3728
 URL: https://issues.apache.org/jira/browse/BEAM-3728
 Project: Beam
  Issue Type: Bug
  Components: runner-flink
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek
 Fix For: 2.4.0


It seems the changes for BEAM-3700/BEAM-3701 broke some tests in ParDoTest for 
the Flink Runner: 
https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/5054/

I don't currently understand what is causing this. [~lcwik] Do you have any 
idea?





[jira] [Commented] (BEAM-3673) FlinkRunner: Harness manager for connecting operators to SDK Harnesses

2018-02-09 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358208#comment-16358208
 ] 

Aljoscha Krettek commented on BEAM-3673:


[~bsidhom] Am I supposed to be assigned to all of those recently created issues 
or is that just because of the default assignment?

> FlinkRunner: Harness manager for connecting operators to SDK Harnesses
> --
>
> Key: BEAM-3673
> URL: https://issues.apache.org/jira/browse/BEAM-3673
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Aljoscha Krettek
>Priority: Major
>
> SDK harnesses require a common set of gRPC services to operate. The role of 
> the harness manager is to provide a uniform interface that multiplexes data 
> streams and auxiliary data between SDK environments and operators within a 
> given job.
> Note that multiple operators may communicate with a single SDK environment to 
> amortize container initialization cost. Environments are _not_ shared between 
> different jobs.
> The initial implementation will shell out to local docker, but the harness 
> manager should eventually support working with externally-managed 
> environments (e.g., created by Kubernetes).





[jira] [Created] (BEAM-3648) Support Splittable DoFn in Flink Batch Runner

2018-02-08 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created BEAM-3648:
--

 Summary: Support Splittable DoFn in Flink Batch Runner
 Key: BEAM-3648
 URL: https://issues.apache.org/jira/browse/BEAM-3648
 Project: Beam
  Issue Type: New Feature
  Components: runner-flink
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek








[jira] [Commented] (BEAM-2806) support View.CreatePCollectionView in FlinkRunner

2018-02-06 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353712#comment-16353712
 ] 

Aljoscha Krettek commented on BEAM-2806:


[~jbonofre] I merged this into master in commit 
b5c31ed96f7abcf489ccd4d46a8db4fb17476a27. Could we also cherry-pick 
this onto the 2.3.0 branch? It fixes a somewhat bad bug in the Flink 
Runner.

> support View.CreatePCollectionView in FlinkRunner
> -
>
> Key: BEAM-2806
> URL: https://issues.apache.org/jira/browse/BEAM-2806
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Xu Mingmin
>Assignee: Grzegorz Kołakowski
>Priority: Major
> Fix For: 2.3.0
>
>
> Beam version: 2.2.0-SNAPSHOT
> Here's the code
> {code}
> PCollectionView<Map<BeamRecord, Iterable>> rowsView = rightRows
> .apply(View.<BeamRecord, BeamRecord>asMultimap());
> {code}
> And exception when running with {{FlinkRunner}}:
> {code}
> Exception in thread "main" java.lang.UnsupportedOperationException: The 
> transform View.CreatePCollectionView is currently not supported.
>   at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:113)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:594)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:586)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:586)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:268)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:202)
>   at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:440)
>   at 
> org.apache.beam.runners.flink.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:38)
>   at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate(FlinkStreamingPipelineTranslator.java:69)
>   at 
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:104)
>   at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:113)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
> {code}





[jira] [Updated] (BEAM-2806) support View.CreatePCollectionView in FlinkRunner

2018-02-06 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated BEAM-2806:
---
Fix Version/s: 2.3.0

> support View.CreatePCollectionView in FlinkRunner
> -
>
> Key: BEAM-2806
> URL: https://issues.apache.org/jira/browse/BEAM-2806
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Xu Mingmin
>Assignee: Grzegorz Kołakowski
>Priority: Major
> Fix For: 2.3.0
>
>
> Beam version: 2.2.0-SNAPSHOT
> Here's the code
> {code}
> PCollectionView<Map<BeamRecord, Iterable>> rowsView = rightRows
> .apply(View.<BeamRecord, BeamRecord>asMultimap());
> {code}
> And exception when running with {{FlinkRunner}}:
> {code}
> Exception in thread "main" java.lang.UnsupportedOperationException: The 
> transform View.CreatePCollectionView is currently not supported.
>   at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:113)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:594)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:586)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:586)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:268)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:202)
>   at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:440)
>   at 
> org.apache.beam.runners.flink.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:38)
>   at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate(FlinkStreamingPipelineTranslator.java:69)
>   at 
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:104)
>   at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:113)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
> {code}





[jira] [Closed] (BEAM-2806) support View.CreatePCollectionView in FlinkRunner

2018-02-06 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-2806.
--
Resolution: Fixed

> support View.CreatePCollectionView in FlinkRunner
> -
>
> Key: BEAM-2806
> URL: https://issues.apache.org/jira/browse/BEAM-2806
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Xu Mingmin
>Assignee: Grzegorz Kołakowski
>Priority: Major
> Fix For: 2.3.0
>
>
> Beam version: 2.2.0-SNAPSHOT
> Here's the code
> {code}
> PCollectionView<Map<BeamRecord, Iterable>> rowsView = rightRows
> .apply(View.<BeamRecord, BeamRecord>asMultimap());
> {code}
> And exception when running with {{FlinkRunner}}:
> {code}
> Exception in thread "main" java.lang.UnsupportedOperationException: The 
> transform View.CreatePCollectionView is currently not supported.
>   at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:113)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:594)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:586)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:586)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:268)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:202)
>   at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:440)
>   at 
> org.apache.beam.runners.flink.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:38)
>   at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate(FlinkStreamingPipelineTranslator.java:69)
>   at 
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:104)
>   at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:113)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
> {code}





[beam] branch master updated: [BEAM-2806] Fix pipeline translation mode recognition in Flink Runner

2018-02-06 Thread aljoscha

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new b5c31ed  [BEAM-2806] Fix pipeline translation mode recognition in 
Flink Runner
 new f3dba1a  Merge pull request #4558: Fixes streaming mode recognition in 
Flink Runner
b5c31ed is described below

commit b5c31ed96f7abcf489ccd4d46a8db4fb17476a27
Author: Grzegorz Kołakowski <grzegorz.kolakow...@getindata.com>
AuthorDate: Thu Feb 1 09:21:24 2018 +0100

[BEAM-2806] Fix pipeline translation mode recognition in Flink Runner
---
 .../flink/FlinkPipelineExecutionEnvironment.java   |  5 +-
 .../flink/PipelineTranslationOptimizer.java| 16 -
 .../FlinkPipelineExecutionEnvironmentTest.java | 68 ++
 3 files changed, 84 insertions(+), 5 deletions(-)

diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index 7a6c61f..7f7281e 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -93,14 +93,15 @@ class FlinkPipelineExecutionEnvironment {
   throw new RuntimeException(e);
 }
 
-
pipeline.replaceAll(FlinkTransformOverrides.getDefaultOverrides(options.isStreaming()));
-
 PipelineTranslationOptimizer optimizer =
 new PipelineTranslationOptimizer(TranslationMode.BATCH, options);
 
 optimizer.translate(pipeline);
 TranslationMode translationMode = optimizer.getTranslationMode();
 
+pipeline.replaceAll(FlinkTransformOverrides.getDefaultOverrides(
+translationMode == TranslationMode.STREAMING));
+
 FlinkPipelineTranslator translator;
 if (translationMode == TranslationMode.STREAMING) {
   this.flinkStreamEnv = createStreamExecutionEnvironment();
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java
index 3acc3ea..8877f1a 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java
@@ -17,9 +17,11 @@
  */
 package org.apache.beam.runners.flink;
 
-import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,13 +62,21 @@ class PipelineTranslationOptimizer extends 
FlinkPipelineTranslator {
 
   @Override
   public void visitPrimitiveTransform(TransformHierarchy.Node node) {
-Class transformClass = 
node.getTransform().getClass();
-if (transformClass == Read.Unbounded.class) {
+AppliedPTransform appliedPTransform = 
node.toAppliedPTransform(getPipeline());
+if (hasUnboundedOutput(appliedPTransform)) {
+  Class transformClass = 
node.getTransform().getClass();
   LOG.info("Found {}. Switching to streaming execution.", transformClass);
   translationMode = TranslationMode.STREAMING;
 }
   }
 
+  private boolean hasUnboundedOutput(AppliedPTransform transform) {
+return transform.getOutputs().values().stream()
+.filter(value -> value instanceof PCollection)
+.map(value -> (PCollection) value)
+.anyMatch(collection -> collection.isBounded() == IsBounded.UNBOUNDED);
+  }
+
   @Override
   public void visitValue(PValue value, TransformHierarchy.Node producer) {}
 }
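The boundedness check added in the hunk above scans a transform's outputs and switches to streaming translation if any output PCollection is unbounded. As a rough stand-alone sketch of that stream-based check (plain Java; the `IsBounded` enum below is a simplified stand-in for Beam's `PCollection.IsBounded`, not the real SDK type):

```java
import java.util.List;

public class BoundednessCheck {
    // Simplified stand-in for Beam's PCollection.IsBounded (assumption,
    // not the real SDK type).
    enum IsBounded { BOUNDED, UNBOUNDED }

    // Mirrors hasUnboundedOutput() from the patch: a transform forces
    // streaming translation if any of its outputs is unbounded.
    static boolean hasUnboundedOutput(List<IsBounded> outputs) {
        return outputs.stream().anyMatch(b -> b == IsBounded.UNBOUNDED);
    }

    public static void main(String[] args) {
        System.out.println(hasUnboundedOutput(List.of(IsBounded.BOUNDED, IsBounded.UNBOUNDED))); // true
        System.out.println(hasUnboundedOutput(List.of(IsBounded.BOUNDED, IsBounded.BOUNDED)));   // false
    }
}
```

Checking the produced PCollections rather than the transform class is what makes the mode detection robust against wrapped or unknown transforms, which was the root of the original bug.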
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java
new file mode 100644
index 000..0e5ce14
--- /dev/null
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ 

[jira] [Closed] (BEAM-3186) In-flight data loss when restoring from savepoint

2018-02-05 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-3186.
--
Resolution: Fixed

> In-flight data loss when restoring from savepoint
> -
>
> Key: BEAM-3186
> URL: https://issues.apache.org/jira/browse/BEAM-3186
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.0.0, 2.1.0
>Reporter: Pawel Bartoszek
>Assignee: Dawid Wysakowicz
>Priority: Blocker
> Fix For: 2.3.0
>
> Attachments: restore_no_trigger.png, restore_with_trigger.png, 
> restore_with_trigger_b.png
>
>
> *The context:*
> I want to count how many events of a given type (A, B, etc.) I receive 
> every minute, using 1-minute windows and an AfterWatermark trigger with 
> an allowed lateness of 1 min.
> *Data loss case*
> In the case below, if at least one A element with an event time belonging 
> to the window 14:00-14:01 is read from the Kinesis stream after the job 
> is restored from the savepoint, data loss will not be observed for this 
> key and this window.
> !restore_no_trigger.png!
> *No data loss case*
> However, if no new A element is read from the Kinesis stream, then data 
> loss is observable.
> !restore_with_trigger.png!
> *Workaround*
> As a workaround, we could configure early firings every X seconds, which 
> gives up to X seconds of data loss per key on restore.
> *My guess where the issue might be*
> I believe this is a Beam-Flink integration layer bug. From my 
> investigation I don't think the KinesisReader is at fault, or that it 
> couldn't advance the watermark. To prove that, after restoring from the 
> savepoint I sent some records for a different key (B) for the same window 
> as shown in the pictures (14:00-14:01), without seeing the trigger go off 
> for the restored window and key A.
> My guess is that after the job is restored, Beam doesn't register a Flink 
> event-time timer for the restored window unless a new element (key) 
> arrives for that window.
> Please refer to [this 
> gist|https://gist.github.com/pbartoszek/7ab88c8b6538039db1b383358d1d1b5a] for 
> test job that shows this behaviour.
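For reference, the fixed one-minute windows described in the report assign each event to the window whose start is its timestamp rounded down to the minute; an element at 14:00:30 therefore belongs to the 14:00-14:01 window the reporter mentions. A minimal sketch of that assignment (plain Java; `windowStart` is a hypothetical helper for illustration, not a Beam API — Beam computes this internally for `FixedWindows`):

```java
import java.time.Instant;

public class MinuteWindow {
    static final long WINDOW_MILLIS = 60_000L;

    // Start of the fixed 1-minute window containing the given event time.
    // Hypothetical helper; Beam does the equivalent internally when
    // FixedWindows.of(Duration.standardMinutes(1)) is applied.
    static long windowStart(long eventTimeMillis) {
        return eventTimeMillis - Math.floorMod(eventTimeMillis, WINDOW_MILLIS);
    }

    public static void main(String[] args) {
        long ts = Instant.parse("2017-11-01T14:00:30Z").toEpochMilli();
        // An element at 14:00:30 falls into the window starting at 14:00:00.
        System.out.println(Instant.ofEpochMilli(windowStart(ts)));
    }
}
```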



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (BEAM-3589) Flink runner breaks with ClassCastException on UnboundedSource

2018-02-05 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-3589.
--
Resolution: Fixed

> Flink runner breaks with ClassCastException on UnboundedSource
> --
>
> Key: BEAM-3589
> URL: https://issues.apache.org/jira/browse/BEAM-3589
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.3.0
>Reporter: Ismaël Mejía
>Assignee: Grzegorz Kołakowski
>Priority: Blocker
> Fix For: 2.3.0
>
>
> When you execute a pipeline that uses an unbounded source and an empty 
> transform, it produces a ClassCastException:
> {quote}[WARNING] 
> java.lang.reflect.InvocationTargetException
>     at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke 
> (NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke 
> (DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke (Method.java:498)
>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
>     at java.lang.Thread.run (Thread.java:748)
> Caused by: java.lang.ClassCastException: 
> org.apache.beam.runners.core.construction.AutoValue_PTransformTranslation_UnknownRawPTransform
>  cannot be cast to org.apache.beam.sdk.io.Read$Unbounded
>     at 
> org.apache.beam.runners.flink.FlinkStreamingTransformTranslators$ReadSourceTranslator.translateNode
>  (FlinkStreamingTransformTranslators.java:256)
>     at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.applyStreamingTransform
>  (FlinkStreamingPipelineTranslator.java:139)
>     at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.visitPrimitiveTransform
>  (FlinkStreamingPipelineTranslator.java:118)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:670)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:647)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:623)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:623)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:647)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:662)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600 
> (TransformHierarchy.java:311)
>     at org.apache.beam.sdk.runners.TransformHierarchy.visit 
> (TransformHierarchy.java:245)
>     at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:458)
>     at org.apache.beam.runners.flink.FlinkPipelineTranslator.translate 
> (FlinkPipelineTranslator.java:38)
>     at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate 
> (FlinkStreamingPipelineTranslator.java:70)
>     at 
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate 
> (FlinkPipelineExecutionEnvironment.java:113)
>     at org.apache.beam.runners.flink.FlinkRunner.run (FlinkRunner.java:110)
>     at org.apache.beam.sdk.Pipeline.run (Pipeline.java:311)
>     at org.apache.beam.sdk.Pipeline.run (Pipeline.java:297)
>     at org.apache.beam.sdk.nexmark.NexmarkLauncher.run 
> (NexmarkLauncher.java:1139)
>     at org.apache.beam.sdk.nexmark.Main.runAll (Main.java:69)
>     at org.apache.beam.sdk.nexmark.Main.main (Main.java:301)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke 
> (NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke 
> (DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke (Method.java:498)
>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
>     at java.lang.Thread.run (Thread.java:748)
> {quote}
> You can reproduce it quickly by running this command from the nexmark 
> directory:
> {quote}mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main 
> -Pflink-runner "-Dexec.args=--runner=FlinkRunner --query=2 --streaming=true 
> --manageResources=false --monitorJobs=true"
> {quote}





[jira] [Updated] (BEAM-3589) Flink runner breaks with ClassCastException on UnboundedSource

2018-02-05 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated BEAM-3589:
---
Fix Version/s: 2.3.0

> Flink runner breaks with ClassCastException on UnboundedSource
> --
>
> Key: BEAM-3589
> URL: https://issues.apache.org/jira/browse/BEAM-3589
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.3.0
>Reporter: Ismaël Mejía
>Assignee: Grzegorz Kołakowski
>Priority: Blocker
> Fix For: 2.3.0
>
>
> When you execute a pipeline that uses an unbounded source and an empty 
> transform, it produces a ClassCastException:
> {quote}[WARNING] 
> java.lang.reflect.InvocationTargetException
>     at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke 
> (NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke 
> (DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke (Method.java:498)
>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
>     at java.lang.Thread.run (Thread.java:748)
> Caused by: java.lang.ClassCastException: 
> org.apache.beam.runners.core.construction.AutoValue_PTransformTranslation_UnknownRawPTransform
>  cannot be cast to org.apache.beam.sdk.io.Read$Unbounded
>     at 
> org.apache.beam.runners.flink.FlinkStreamingTransformTranslators$ReadSourceTranslator.translateNode
>  (FlinkStreamingTransformTranslators.java:256)
>     at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.applyStreamingTransform
>  (FlinkStreamingPipelineTranslator.java:139)
>     at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.visitPrimitiveTransform
>  (FlinkStreamingPipelineTranslator.java:118)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:670)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:647)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:623)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:623)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:647)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:662)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600 
> (TransformHierarchy.java:311)
>     at org.apache.beam.sdk.runners.TransformHierarchy.visit 
> (TransformHierarchy.java:245)
>     at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:458)
>     at org.apache.beam.runners.flink.FlinkPipelineTranslator.translate 
> (FlinkPipelineTranslator.java:38)
>     at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate 
> (FlinkStreamingPipelineTranslator.java:70)
>     at 
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate 
> (FlinkPipelineExecutionEnvironment.java:113)
>     at org.apache.beam.runners.flink.FlinkRunner.run (FlinkRunner.java:110)
>     at org.apache.beam.sdk.Pipeline.run (Pipeline.java:311)
>     at org.apache.beam.sdk.Pipeline.run (Pipeline.java:297)
>     at org.apache.beam.sdk.nexmark.NexmarkLauncher.run 
> (NexmarkLauncher.java:1139)
>     at org.apache.beam.sdk.nexmark.Main.runAll (Main.java:69)
>     at org.apache.beam.sdk.nexmark.Main.main (Main.java:301)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke 
> (NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke 
> (DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke (Method.java:498)
>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
>     at java.lang.Thread.run (Thread.java:748)
> {quote}
> You can reproduce it quickly by running this command from the nexmark 
> directory:
> {quote}mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main 
> -Pflink-runner "-Dexec.args=--runner=FlinkRunner --query=2 --streaming=true 
> --manageResources=false --monitorJobs=true"
> {quote}





[jira] [Commented] (BEAM-3589) Flink runner breaks with ClassCastException on UnboundedSource

2018-02-05 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352250#comment-16352250
 ] 

Aljoscha Krettek commented on BEAM-3589:


[~jbonofre] This was now fixed as part of another fix in this commit:
01d74b3b8381c04ffba3879281897ea2ef6fbcc5

> Flink runner breaks with ClassCastException on UnboundedSource
> --
>
> Key: BEAM-3589
> URL: https://issues.apache.org/jira/browse/BEAM-3589
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.3.0
>Reporter: Ismaël Mejía
>Assignee: Grzegorz Kołakowski
>Priority: Blocker
> Fix For: 2.3.0
>
>
> When you execute a pipeline that uses an unbounded source and an empty 
> transform, it produces a ClassCastException:
> {quote}[WARNING] 
> java.lang.reflect.InvocationTargetException
>     at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke 
> (NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke 
> (DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke (Method.java:498)
>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
>     at java.lang.Thread.run (Thread.java:748)
> Caused by: java.lang.ClassCastException: 
> org.apache.beam.runners.core.construction.AutoValue_PTransformTranslation_UnknownRawPTransform
>  cannot be cast to org.apache.beam.sdk.io.Read$Unbounded
>     at 
> org.apache.beam.runners.flink.FlinkStreamingTransformTranslators$ReadSourceTranslator.translateNode
>  (FlinkStreamingTransformTranslators.java:256)
>     at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.applyStreamingTransform
>  (FlinkStreamingPipelineTranslator.java:139)
>     at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.visitPrimitiveTransform
>  (FlinkStreamingPipelineTranslator.java:118)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:670)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:647)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:623)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:623)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:647)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:662)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600 
> (TransformHierarchy.java:311)
>     at org.apache.beam.sdk.runners.TransformHierarchy.visit 
> (TransformHierarchy.java:245)
>     at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:458)
>     at org.apache.beam.runners.flink.FlinkPipelineTranslator.translate 
> (FlinkPipelineTranslator.java:38)
>     at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate 
> (FlinkStreamingPipelineTranslator.java:70)
>     at 
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate 
> (FlinkPipelineExecutionEnvironment.java:113)
>     at org.apache.beam.runners.flink.FlinkRunner.run (FlinkRunner.java:110)
>     at org.apache.beam.sdk.Pipeline.run (Pipeline.java:311)
>     at org.apache.beam.sdk.Pipeline.run (Pipeline.java:297)
>     at org.apache.beam.sdk.nexmark.NexmarkLauncher.run 
> (NexmarkLauncher.java:1139)
>     at org.apache.beam.sdk.nexmark.Main.runAll (Main.java:69)
>     at org.apache.beam.sdk.nexmark.Main.main (Main.java:301)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke 
> (NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke 
> (DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke (Method.java:498)
>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
>     at java.lang.Thread.run (Thread.java:748)
> {quote}
> You can reproduce it quickly by running this command from the nexmark 
> directory:
> {quote}mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main 
> -Pflink-runner "-Dexec.args=--runner=FlinkRunner --query=2 --streaming=true 
> --manageResources=false --monitorJobs=true"
> {quote}





[jira] [Commented] (BEAM-3186) In-flight data loss when restoring from savepoint

2018-02-05 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352248#comment-16352248
 ] 

Aljoscha Krettek commented on BEAM-3186:


[~jbonofre] This was fixed on master in these commits:
01d74b3b8381c04ffba3879281897ea2ef6fbcc5
6808b8855f90f69098bd37d2353da7d3ca329eb3

Should I cherry-pick them onto the 2.3.0 release branch or do you want to do 
that?

> In-flight data loss when restoring from savepoint
> -
>
> Key: BEAM-3186
> URL: https://issues.apache.org/jira/browse/BEAM-3186
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.0.0, 2.1.0
>Reporter: Pawel Bartoszek
>Assignee: Dawid Wysakowicz
>Priority: Blocker
> Fix For: 2.3.0
>
> Attachments: restore_no_trigger.png, restore_with_trigger.png, 
> restore_with_trigger_b.png
>
>
> *The context:*
> I want to count how many events of a given type (A, B, etc.) I receive 
> every minute, using 1-minute windows and an AfterWatermark trigger with 
> an allowed lateness of 1 min.
> *Data loss case*
> In the case below, if at least one A element with an event time belonging 
> to the window 14:00-14:01 is read from the Kinesis stream after the job 
> is restored from the savepoint, data loss will not be observed for this 
> key and this window.
> !restore_no_trigger.png!
> *No data loss case*
> However, if no new A element is read from the Kinesis stream, then data 
> loss is observable.
> !restore_with_trigger.png!
> *Workaround*
> As a workaround, we could configure early firings every X seconds, which 
> gives up to X seconds of data loss per key on restore.
> *My guess where the issue might be*
> I believe this is a Beam-Flink integration layer bug. From my 
> investigation I don't think the KinesisReader is at fault, or that it 
> couldn't advance the watermark. To prove that, after restoring from the 
> savepoint I sent some records for a different key (B) for the same window 
> as shown in the pictures (14:00-14:01), without seeing the trigger go off 
> for the restored window and key A.
> My guess is that after the job is restored, Beam doesn't register a Flink 
> event-time timer for the restored window unless a new element (key) 
> arrives for that window.
> Please refer to [this 
> gist|https://gist.github.com/pbartoszek/7ab88c8b6538039db1b383358d1d1b5a] for 
> test job that shows this behaviour.





[beam] branch master updated (24804e9 -> 93818ab)

2018-02-05 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 24804e9  Merge pull request #3978: [BEAM-230] Remove deprecated 
WindowedValue.valueInEmptyWindows
 new 01d74b3  Remove erroneous cast in FlinkStreamingTransformTranslators
 new 6808b88  [BEAM-3186] Correctly use deserialized timerService in Flink 
Runner
 new 93818ab  Merge pull request #4563: Fixes Flink timer service restore

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/FlinkStreamingTransformTranslators.java  |   3 +-
 .../wrappers/streaming/DoFnOperator.java   |  20 ++--
 .../runners/flink/streaming/DoFnOperatorTest.java  | 105 +
 3 files changed, 119 insertions(+), 9 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
aljos...@apache.org.


[beam] 02/03: [BEAM-3186] Correctly use deserialized timerService in Flink Runner

2018-02-05 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 6808b8855f90f69098bd37d2353da7d3ca329eb3
Author: Dawid Wysakowicz <da...@getindata.com>
AuthorDate: Thu Feb 1 13:47:30 2018 +0100

[BEAM-3186] Correctly use deserialized timerService in Flink Runner

Before, DoFnOperator was always creating a new timer service and not
using the one that was checkpointed in state. This was manifesting as
lost timers after a checkpoint/restore cycle.
---
 .../wrappers/streaming/DoFnOperator.java   |  20 ++--
 .../runners/flink/streaming/DoFnOperatorTest.java  | 105 +
 2 files changed, 118 insertions(+), 7 deletions(-)

diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index de3c054..830a718 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -276,8 +276,10 @@ public class DoFnOperator<InputT, OutputT>
   keyedStateInternals = new FlinkStateInternals<>((KeyedStateBackend) 
getKeyedStateBackend(),
   keyCoder);
 
-  timerService = (HeapInternalTimerService)
-  getInternalTimerService("beam-timer", new 
CoderTypeSerializer<>(timerCoder), this);
+  if (timerService == null) {
+timerService = (HeapInternalTimerService)
+getInternalTimerService("beam-timer", new 
CoderTypeSerializer<>(timerCoder), this);
+  }
 
   timerInternals = new FlinkTimerInternals();
 
@@ -730,11 +732,15 @@ public class DoFnOperator<InputT, OutputT>
 // We just initialize our timerService
 if (keyCoder != null) {
   if (timerService == null) {
-timerService = new HeapInternalTimerService<>(
-totalKeyGroups,
-localKeyGroupRange,
-this,
-getRuntimeContext().getProcessingTimeService());
+final HeapInternalTimerService<Object, TimerData> localService =
+new HeapInternalTimerService<>(
+totalKeyGroups,
+localKeyGroupRange,
+this,
+getRuntimeContext().getProcessingTimeService());
+
localService.startTimerService(getKeyedStateBackend().getKeySerializer(),
+new CoderTypeSerializer<>(timerCoder), this);
+timerService = localService;
   }
   timerService.restoreTimersForKeyGroup(div, keyGroupIdx, 
getUserCodeClassloader());
 }
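The fix in the hunk above hinges on not overwriting a timer service that was already deserialized from checkpointed state: a fresh service is created only when none exists yet. That lazy-initialization guard, reduced to a stand-alone sketch (plain Java; `TimerService` here is a hypothetical stand-in, not Flink's `HeapInternalTimerService`):

```java
public class LazyTimerServiceHolder {
    // Hypothetical stand-in for the timer service; tracks where it came from.
    static class TimerService {
        final String origin;
        TimerService(String origin) { this.origin = origin; }
    }

    private TimerService timerService; // may be populated earlier, during state restore

    // Simulates the restore path: state restore installs the deserialized service.
    void restore() {
        timerService = new TimerService("restored");
    }

    // Mirrors the patched DoFnOperator logic: only create a new service
    // if restore did not already provide one.
    TimerService getOrCreate() {
        if (timerService == null) {
            timerService = new TimerService("fresh");
        }
        return timerService;
    }
}
```

With this guard, a checkpoint/restore cycle keeps the restored timers instead of silently replacing them with an empty service, which was the symptom reported in BEAM-3186.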
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index 6c32047..73a0a08 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -58,6 +58,7 @@ import 
org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -609,6 +610,110 @@ public class DoFnOperatorTest {
 
   }
 
+  @Test
+  public void testTimersRestore() throws Exception {
+final Instant timerTimestamp = new Instant(1000);
+final String outputMessage = "Timer fired";
+
+WindowingStrategy<Object, IntervalWindow> windowingStrategy =
+WindowingStrategy.of(FixedWindows.of(new Duration(10_000)));
+
+DoFn<Integer, String> fn = new DoFn<Integer, String>() {
+  private static final String EVENT_TIMER_ID = "eventTimer";
+
+  @TimerId(EVENT_TIMER_ID)
+  private final TimerSpec eventTimer = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+  @ProcessElement
+  public void processElement(ProcessContext context, 
@TimerId(EVENT_TIMER_ID) Timer timer) {
+timer.set(timerTimestamp);
+  }
+
+  @OnTimer(EVENT_TIMER_ID)
+  public void onEventTime(OnTimerContext context) {
+assertEquals(
+"Timer timestamp must match set timestamp.", timerTimestamp, 
context.timestamp());
+context.outputWithTimestamp(outputMessage, context.timestamp());
+  }
+

[beam] 01/03: Remove erroneous cast in FlinkStreamingTransformTranslators

2018-02-05 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 01d74b3b8381c04ffba3879281897ea2ef6fbcc5
Author: Dawid Wysakowicz <da...@getindata.com>
AuthorDate: Mon Feb 5 11:36:33 2018 +0100

Remove erroneous cast in FlinkStreamingTransformTranslators
---
 .../apache/beam/runners/flink/FlinkStreamingTransformTranslators.java  | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 811c159..a2923a9 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -58,7 +58,6 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
@@ -253,7 +252,7 @@ class FlinkStreamingTransformTranslators {
   if 
(context.getOutput(transform).isBounded().equals(PCollection.IsBounded.BOUNDED))
 {
 boundedTranslator.translateNode(transform, context);
   } else {
-unboundedTranslator.translateNode((Read.Unbounded) transform, 
context);
+unboundedTranslator.translateNode(transform, context);
   }
 }
   }



[beam] 03/03: Merge pull request #4563: Fixes Flink timer service restore

2018-02-05 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 93818abf901dd996aa30445457153811b13ece43
Merge: 24804e9 6808b88
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
AuthorDate: Mon Feb 5 11:57:36 2018 +0100

Merge pull request #4563: Fixes Flink timer service restore

 .../flink/FlinkStreamingTransformTranslators.java  |   3 +-
 .../wrappers/streaming/DoFnOperator.java   |  20 ++--
 .../runners/flink/streaming/DoFnOperatorTest.java  | 105 +
 3 files changed, 119 insertions(+), 9 deletions(-)



[jira] [Commented] (BEAM-3589) Flink runner breaks with ClassCastException on UnboundedSource

2018-02-01 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348631#comment-16348631
 ] 

Aljoscha Krettek commented on BEAM-3589:


[~grzegorz_kolakowski] I think I'm too tired today, but does 
https://github.com/apache/beam/pull/4558 actually remove that unnecessary 
{{(Read.Unbounded)}} cast? I couldn't find it anymore. 

> Flink runner breaks with ClassCastException on UnboundedSource
> --
>
> Key: BEAM-3589
> URL: https://issues.apache.org/jira/browse/BEAM-3589
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.3.0
>Reporter: Ismaël Mejía
>Assignee: Grzegorz Kołakowski
>Priority: Blocker
>
> When you execute a pipeline that uses an unbounded source and an empty 
> transform, it produces a ClassCastException:
> {quote}[WARNING] 
> java.lang.reflect.InvocationTargetException
>     at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke 
> (NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke 
> (DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke (Method.java:498)
>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
>     at java.lang.Thread.run (Thread.java:748)
> Caused by: java.lang.ClassCastException: 
> org.apache.beam.runners.core.construction.AutoValue_PTransformTranslation_UnknownRawPTransform
>  cannot be cast to org.apache.beam.sdk.io.Read$Unbounded
>     at 
> org.apache.beam.runners.flink.FlinkStreamingTransformTranslators$ReadSourceTranslator.translateNode
>  (FlinkStreamingTransformTranslators.java:256)
>     at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.applyStreamingTransform
>  (FlinkStreamingPipelineTranslator.java:139)
>     at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.visitPrimitiveTransform
>  (FlinkStreamingPipelineTranslator.java:118)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:670)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:647)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:623)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:623)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:647)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:662)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600 
> (TransformHierarchy.java:311)
>     at org.apache.beam.sdk.runners.TransformHierarchy.visit 
> (TransformHierarchy.java:245)
>     at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:458)
>     at org.apache.beam.runners.flink.FlinkPipelineTranslator.translate 
> (FlinkPipelineTranslator.java:38)
>     at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate 
> (FlinkStreamingPipelineTranslator.java:70)
>     at 
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate 
> (FlinkPipelineExecutionEnvironment.java:113)
>     at org.apache.beam.runners.flink.FlinkRunner.run (FlinkRunner.java:110)
>     at org.apache.beam.sdk.Pipeline.run (Pipeline.java:311)
>     at org.apache.beam.sdk.Pipeline.run (Pipeline.java:297)
>     at org.apache.beam.sdk.nexmark.NexmarkLauncher.run 
> (NexmarkLauncher.java:1139)
>     at org.apache.beam.sdk.nexmark.Main.runAll (Main.java:69)
>     at org.apache.beam.sdk.nexmark.Main.main (Main.java:301)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke 
> (NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke 
> (DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke (Method.java:498)
>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
>     at java.lang.Thread.run (Thread.java:748)
> {quote}
> You can reproduce it quickly by running this command from the nexmark 
> directory:
> {quote}mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main 
> -Pflink-runner "-Dexec.args=--runner=FlinkRunner --query=2 --streaming=true 
> --manageResources=false --monitorJobs=true"
> {quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-2806) support View.CreatePCollectionView in FlinkRunner

2018-02-01 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned BEAM-2806:
--

Assignee: Grzegorz Kołakowski  (was: Aljoscha Krettek)

> support View.CreatePCollectionView in FlinkRunner
> -
>
> Key: BEAM-2806
> URL: https://issues.apache.org/jira/browse/BEAM-2806
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Xu Mingmin
>Assignee: Grzegorz Kołakowski
>Priority: Major
>
> Beam version: 2.2.0-SNAPSHOT
> Here's the code
> {code}
> PCollectionView<Map<BeamRecord, Iterable<BeamRecord>>> rowsView = rightRows
>     .apply(View.<BeamRecord, BeamRecord>asMultimap());
> {code}
> And exception when running with {{FlinkRunner}}:
> {code}
> Exception in thread "main" java.lang.UnsupportedOperationException: The 
> transform View.CreatePCollectionView is currently not supported.
>   at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:113)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:594)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:586)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:586)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:268)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:202)
>   at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:440)
>   at 
> org.apache.beam.runners.flink.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:38)
>   at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate(FlinkStreamingPipelineTranslator.java:69)
>   at 
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:104)
>   at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:113)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
> {code}





[jira] [Assigned] (BEAM-3589) Flink runner breaks with ClassCastException on UnboundedSource

2018-02-01 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned BEAM-3589:
--

Assignee: Grzegorz Kołakowski

> Flink runner breaks with ClassCastException on UnboundedSource
> --
>
> Key: BEAM-3589
> URL: https://issues.apache.org/jira/browse/BEAM-3589
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.3.0
>Reporter: Ismaël Mejía
>Assignee: Grzegorz Kołakowski
>Priority: Blocker
>
> When you execute a pipeline that uses an unbounded source and an empty 
> transform, it produces a ClassCastException:
> {quote}[WARNING] 
> java.lang.reflect.InvocationTargetException
>     at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke 
> (NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke 
> (DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke (Method.java:498)
>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
>     at java.lang.Thread.run (Thread.java:748)
> Caused by: java.lang.ClassCastException: 
> org.apache.beam.runners.core.construction.AutoValue_PTransformTranslation_UnknownRawPTransform
>  cannot be cast to org.apache.beam.sdk.io.Read$Unbounded
>     at 
> org.apache.beam.runners.flink.FlinkStreamingTransformTranslators$ReadSourceTranslator.translateNode
>  (FlinkStreamingTransformTranslators.java:256)
>     at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.applyStreamingTransform
>  (FlinkStreamingPipelineTranslator.java:139)
>     at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.visitPrimitiveTransform
>  (FlinkStreamingPipelineTranslator.java:118)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:670)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:647)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:623)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:623)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:647)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:662)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600 
> (TransformHierarchy.java:311)
>     at org.apache.beam.sdk.runners.TransformHierarchy.visit 
> (TransformHierarchy.java:245)
>     at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:458)
>     at org.apache.beam.runners.flink.FlinkPipelineTranslator.translate 
> (FlinkPipelineTranslator.java:38)
>     at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate 
> (FlinkStreamingPipelineTranslator.java:70)
>     at 
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate 
> (FlinkPipelineExecutionEnvironment.java:113)
>     at org.apache.beam.runners.flink.FlinkRunner.run (FlinkRunner.java:110)
>     at org.apache.beam.sdk.Pipeline.run (Pipeline.java:311)
>     at org.apache.beam.sdk.Pipeline.run (Pipeline.java:297)
>     at org.apache.beam.sdk.nexmark.NexmarkLauncher.run 
> (NexmarkLauncher.java:1139)
>     at org.apache.beam.sdk.nexmark.Main.runAll (Main.java:69)
>     at org.apache.beam.sdk.nexmark.Main.main (Main.java:301)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke 
> (NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke 
> (DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke (Method.java:498)
>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
>     at java.lang.Thread.run (Thread.java:748)
> {quote}
> You can reproduce it quickly by running this command from the nexmark 
> directory:
> {quote}mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main 
> -Pflink-runner "-Dexec.args=--runner=FlinkRunner --query=2 --streaming=true 
> --manageResources=false --monitorJobs=true"
> {quote}





[jira] [Assigned] (BEAM-3186) In-flight data loss when restoring from savepoint

2018-02-01 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned BEAM-3186:
--

Assignee: Dawid Wysakowicz  (was: Aljoscha Krettek)

> In-flight data loss when restoring from savepoint
> -
>
> Key: BEAM-3186
> URL: https://issues.apache.org/jira/browse/BEAM-3186
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.0.0, 2.1.0
>Reporter: Pawel Bartoszek
>Assignee: Dawid Wysakowicz
>Priority: Major
> Attachments: restore_no_trigger.png, restore_with_trigger.png, 
> restore_with_trigger_b.png
>
>
> *The context:*
> I want to count how many events of a given type (A, B, etc.) I receive every 
> minute, using 1-minute windows and an AfterWatermark trigger with an allowed 
> lateness of 1 min.
> *Data loss case*
> In the case below, if at least one A element with an event time belonging to 
> the window 14:00-14:01 is read from the Kinesis stream after the job is 
> restored from a savepoint, data loss will not be observed for this key and 
> this window.
> !restore_no_trigger.png!
> *No data loss case*
> However, if no new A element is read from the Kinesis stream, then data loss 
> is observable.
> !restore_with_trigger.png!
> *Workaround*
> As a workaround we could configure early firings every X seconds, which gives 
> up to X seconds of data loss per key on restore.
> *My guess where the issue might be*
> I believe this is a Beam-Flink integration-layer bug. From my investigation I 
> don't think it's the KinesisReader or a failure to advance the watermark. To 
> prove that, after restoring from the savepoint I sent some records for a 
> different key (B) in the same window shown in the pictures (14:00-14:01), 
> without seeing the trigger fire for the restored window and key A.
> My guess is that after the job is restored, Beam doesn't register a Flink 
> event-time timer for the restored window unless a new element (key) arrives 
> for that window.
> Please refer to [this 
> gist|https://gist.github.com/pbartoszek/7ab88c8b6538039db1b383358d1d1b5a] for 
> a test job that shows this behaviour.





[jira] [Commented] (BEAM-3414) AfterProcessingTime trigger issue with Flink Runner

2018-02-01 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348542#comment-16348542
 ] 

Aljoscha Krettek commented on BEAM-3414:


Still working on getting that PR in, yes.

> AfterProcessingTime trigger issue with Flink Runner
> ---
>
> Key: BEAM-3414
> URL: https://issues.apache.org/jira/browse/BEAM-3414
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core, runner-flink
>Affects Versions: 2.2.0
> Environment: idea, ubuntu 16.04, FlinkRunner
>Reporter: huangjianhuang
>Assignee: Aljoscha Krettek
>Priority: Major
>
> In my demo I read data from Kafka and count globally, finally outputting the 
> total count of received data, as follows:
> {code:java}
> FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
>         .as(FlinkPipelineOptions.class);
> options.setStreaming(true);
> options.setRunner(FlinkRunner.class);
> Pipeline pipeline = Pipeline.create(options);
> pipeline
>         .apply("Read from kafka",
>                 KafkaIO.<String, String>read()
>                         //.withTimestampFn(kafkaData -> TimeUtil.timeMillisToInstant(kafkaData.getKey()))
>                         .withBootstrapServers("localhost:9092")
>                         .withTopic("recharge")
>                         .withKeyDeserializer(StringDeserializer.class)
>                         .withValueDeserializer(StringDeserializer.class)
>                         .withoutMetadata()
>         )
>         .apply(Values.create())
>         .apply(Window.into(new GlobalWindows())
>                 .triggering(Repeatedly.forever(
>                         AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5))))
>                 .accumulatingFiredPanes()
>         )
>         .apply(Count.globally())
>         .apply("output",
>                 ParDo.of(new DoFn<Long, Void>() {
>                     @ProcessElement
>                     public void process(ProcessContext context) {
>                         System.out.println("---get at: " + Instant.now() + "--");
>                         System.out.println(context.element());
>                     }
>                 }));
> {code}
> The result should be displayed 5 s after I send the first data, but sometimes 
> nothing was displayed after I sent data. The pic shows the outputs I got in a 
> test:
> (can't upload a pic, so describing as text)
> {code:java}
> Send 681Msg at: 2018-01-05T06:34:31.436
>   ---get at: 2018-01-05T06:34:36.668Z--
>   681
> Send 681Msg at: 2018-01-05T06:34:47.166
>   ---get at: 2018-01-05T06:34:52.284Z--
>   1362
> Send 681Msg at: 2018-01-05T06:34:55.505
> Send 681Msg at: 2018-01-05T06:35:22.068
>   ---get at: 2018-01-05T06:35:22.112Z--
>   2044
> {code}
> BTW, the code works fine with the direct runner.





[jira] [Commented] (BEAM-3087) Extend lock scope in Flink UnboundedSourceWrapper

2018-01-26 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16341059#comment-16341059
 ] 

Aljoscha Krettek commented on BEAM-3087:


There was one user who had a problem because of this, but I haven't heard about 
it recently, so I think it's OK to push it back.

> Extend lock scope in Flink UnboundedSourceWrapper
> -
>
> Key: BEAM-3087
> URL: https://issues.apache.org/jira/browse/BEAM-3087
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>    Reporter: Aljoscha Krettek
>Priority: Critical
> Fix For: 2.3.0
>
>
> In {{UnboundedSourceWrapper}} the lock scope is not big enough: we 
> synchronise in {{emitElement()}} but should instead synchronise inside the 
> reader loop in {{run()}} because the {{Source}} interface does not allow 
> concurrent calls.
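A minimal, self-contained sketch of the proposed fix, with stand-in names ({{ReaderLoopSketch}}, {{Reader}}) rather than the actual {{UnboundedSourceWrapper}} code: the lock is held across the whole advance-and-emit step of the reader loop, not only inside element emission.

```java
// Illustrative only: Reader is a stand-in for UnboundedSource.UnboundedReader,
// not the real Beam API.
public class ReaderLoopSketch {
    private final Object checkpointLock = new Object();
    private int emitted = 0;

    public interface Reader {
        boolean advance();   // move to the next element, if any
        int getCurrent();    // read the current element
    }

    // Reader loop: the synchronized block covers advance() AND emission,
    // because the Source interface does not allow concurrent calls.
    // (A real loop would also check a running flag; this one simply stops
    // after maxElements for demonstration.)
    public int run(Reader reader, int maxElements) {
        while (emitted < maxElements) {
            synchronized (checkpointLock) {
                if (reader.advance()) {
                    emit(reader.getCurrent());
                }
            }
        }
        return emitted;
    }

    private void emit(int value) {
        emitted++; // emission happens under the same lock as advance()
    }
}
```

Checkpointing code that also synchronizes on {{checkpointLock}} can then never observe the reader between {{advance()}} and {{getCurrent()}}.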





[jira] [Comment Edited] (BEAM-3494) Snapshot state of aggregated data of apache beam project is not maintained in flink's checkpointing

2018-01-26 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16341057#comment-16341057
 ] 

Aljoscha Krettek edited comment on BEAM-3494 at 1/26/18 1:32 PM:
-

How are you enabling checkpointing? Also, could you please reformat your 
posting to put the code inside code tags to make it more readable?


was (Author: aljoscha):
How are you enabling checkpointing? Also, could you please reformat your 
posting to put the code between code tags, like this
{code:java}
{code}
your code ...
{code}{code}
to make it more readable?

> Snapshot state of aggregated data of apache beam project is not maintained in 
> flink's checkpointing 
> 
>
> Key: BEAM-3494
> URL: https://issues.apache.org/jira/browse/BEAM-3494
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: suganya
>Priority: Major
>
> We have a Beam project which consumes events from Kafka, does a groupBy in a 
> 5-minute time window and, after the window elapses, pushes the events 
> downstream for merge. The project is deployed using Flink, and we have 
> enabled checkpointing to recover from failures.
> (windowSize: 5 mins, checkpointingInterval: 5 mins, state.backend: filesystem)
> Offsets from Kafka get checkpointed every 5 mins (checkpointingInterval). The 
> event offsets are checkpointed before the entire DAG (groupBy and merge) 
> finishes, so in case of a restart the task manager starts a new task from the 
> last successful checkpoint, but we could not recover the aggregated snapshot 
> data (data from the groupBy task) from the persisted checkpoint.
> We were able to retrieve the last successfully checkpointed offset from 
> Kafka, but not the aggregated data up to that checkpoint.
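A toy model in plain Java (not Beam or Flink APIs; all names below are illustrative) of the invariant this report is about: a checkpoint must capture the source offset and the in-flight aggregation state together, so that a restore replays from the checkpointed position with the matching aggregate.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model: checkpointing the offset alone would restore the right read
// position but an empty aggregate; a snapshot must carry both.
public class CheckpointSketch {
    public long offset = 0;
    public Map<String, Long> counts = new HashMap<>(); // in-flight groupBy state

    public static class Snapshot {
        public final long offset;
        public final Map<String, Long> counts;
        Snapshot(long offset, Map<String, Long> counts) {
            this.offset = offset;
            this.counts = counts;
        }
    }

    // Snapshot BOTH pieces atomically.
    public Snapshot checkpoint() {
        return new Snapshot(offset, new HashMap<>(counts));
    }

    // Restore BOTH pieces; any progress since the snapshot is replayed.
    public void restore(Snapshot s) {
        offset = s.offset;
        counts = new HashMap<>(s.counts);
    }

    public void consume(String key) {
        offset++;
        counts.merge(key, 1L, Long::sum);
    }
}
```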





[jira] [Commented] (BEAM-3494) Snapshot state of aggregated data of apache beam project is not maintained in flink's checkpointing

2018-01-26 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16341057#comment-16341057
 ] 

Aljoscha Krettek commented on BEAM-3494:


How are you enabling checkpointing? Also, could you please reformat your 
posting to put the code between code tags, like this
{code:java}
{code}
your code ...
{code}{code}
to make it more readable?

> Snapshot state of aggregated data of apache beam project is not maintained in 
> flink's checkpointing 
> 
>
> Key: BEAM-3494
> URL: https://issues.apache.org/jira/browse/BEAM-3494
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: suganya
>Priority: Major
>
> We have a Beam project which consumes events from Kafka, does a groupBy in a 
> 5-minute time window and, after the window elapses, pushes the events 
> downstream for merge. The project is deployed using Flink, and we have 
> enabled checkpointing to recover from failures.
> (windowSize: 5 mins, checkpointingInterval: 5 mins, state.backend: filesystem)
> Offsets from Kafka get checkpointed every 5 mins (checkpointingInterval). The 
> event offsets are checkpointed before the entire DAG (groupBy and merge) 
> finishes, so in case of a restart the task manager starts a new task from the 
> last successful checkpoint, but we could not recover the aggregated snapshot 
> data (data from the groupBy task) from the persisted checkpoint.
> We were able to retrieve the last successfully checkpointed offset from 
> Kafka, but not the aggregated data up to that checkpoint.





[jira] [Assigned] (BEAM-3472) Create a callback triggered at the end of a batch in flink runner

2018-01-26 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned BEAM-3472:
--

Assignee: (was: Aljoscha Krettek)

> Create a callback triggered at the end of a batch in flink runner
> -
>
> Key: BEAM-3472
> URL: https://issues.apache.org/jira/browse/BEAM-3472
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Etienne Chauchot
>Priority: Major
>
> In the future we might add new features to the runners for which we need to 
> do some processing at the end of a batch. Currently there is no single place 
> (a callback) where this processing could be added.





[jira] [Assigned] (BEAM-3494) Snapshot state of aggregated data of apache beam project is not maintained in flink's checkpointing

2018-01-26 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned BEAM-3494:
--

Assignee: (was: Aljoscha Krettek)

> Snapshot state of aggregated data of apache beam project is not maintained in 
> flink's checkpointing 
> 
>
> Key: BEAM-3494
> URL: https://issues.apache.org/jira/browse/BEAM-3494
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: suganya
>Priority: Major
>
> We have a Beam project which consumes events from Kafka, does a groupBy in a 
> 5-minute time window and, after the window elapses, pushes the events 
> downstream for merge. The project is deployed using Flink, and we have 
> enabled checkpointing to recover from failures.
> (windowSize: 5 mins, checkpointingInterval: 5 mins, state.backend: filesystem)
> Offsets from Kafka get checkpointed every 5 mins (checkpointingInterval). The 
> event offsets are checkpointed before the entire DAG (groupBy and merge) 
> finishes, so in case of a restart the task manager starts a new task from the 
> last successful checkpoint, but we could not recover the aggregated snapshot 
> data (data from the groupBy task) from the persisted checkpoint.
> We were able to retrieve the last successfully checkpointed offset from 
> Kafka, but not the aggregated data up to that checkpoint.





[jira] [Commented] (BEAM-3529) Side Input callbacks should fire after a window has expired in addition to when the trigger would fire

2018-01-26 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16341044#comment-16341044
 ] 

Aljoscha Krettek commented on BEAM-3529:


There should probably also be {{ValidatesRunner}} tests about this behaviour to 
ensure that it is uniform across runners, right?

> Side Input callbacks should fire after a window has expired in addition to 
> when the trigger would fire
> --
>
> Key: BEAM-3529
> URL: https://issues.apache.org/jira/browse/BEAM-3529
> Project: Beam
>  Issue Type: Bug
>  Components: runner-direct
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>
> Otherwise a streaming pipeline with elements trying to access a window that 
> has expired will not progress, even though we can be certain that there will 
> never be inputs in that window.





[jira] [Assigned] (BEAM-3370) Add ability to stage directories with compiled classes to Flink

2018-01-05 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned BEAM-3370:
--

Assignee: (was: Aljoscha Krettek)

> Add ability to stage directories with compiled classes to Flink
> ---
>
> Key: BEAM-3370
> URL: https://issues.apache.org/jira/browse/BEAM-3370
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Łukasz Gajowy
>Priority: Minor
>
> Currently, when _filesToStage_ contains a path to a directory with resources, 
> the Flink runner throws a {{"java.io.FileNotFoundException:  (Is 
> a directory)"}}. A way to include directory resources would be helpful. 
> This blocker occurs while trying to run IOITs on the Flink runner, which 
> basically makes them impossible, or very inconvenient, to run. When the tests 
> are run via the "mvn verify" command, a "test-classes" *directory* gets 
> detected by the detectClasspathResourcesToStage() method, which in turn 
> causes the above error. 
> One way to solve this issue is to package the directories to jars with unique 
> names and update the paths accordingly before staging the files on flink. 
> Something similar is already done in the Dataflow runner 
> ([GcsStager|https://github.com/apache/beam/blob/cd186a531aaff0b21cf009b034e1a41f7e7b64af/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java#L74]),
>  more specifically in 
> [PackageUtil|https://github.com/apache/beam/blob/cd186a531aaff0b21cf009b034e1a41f7e7b64af/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java#L280]
>  class. We are able to run the tests on dataflow thanks to that. 
> As I checked in a [small experiment of 
> mine|https://github.com/lgajowy/beam/commits/spark-and-flink-run-tests], 
> providing analogous change makes it possible to run the tests on a Flink 
> cluster. 
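The directory-to-jar step described above can be sketched with the JDK alone (the class and method names here are illustrative, not Beam's actual staging code):

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.jar.JarOutputStream;
import java.util.stream.Stream;

// Hypothetical sketch: package a directory of compiled classes into a single
// jar so it can be staged as one file.
public class DirToJar {

    // Walks `dir` and writes every regular file into a jar at `jarPath`,
    // using the path relative to `dir` as the entry name.
    public static void jarDirectory(Path dir, Path jarPath) throws IOException {
        try (JarOutputStream jar = new JarOutputStream(Files.newOutputStream(jarPath));
             Stream<Path> files = Files.walk(dir)) {
            files.filter(Files::isRegularFile).forEach(file -> {
                try (InputStream in = Files.newInputStream(file)) {
                    // Jar entry names always use forward slashes.
                    String entryName = dir.relativize(file).toString().replace('\\', '/');
                    jar.putNextEntry(new JarEntry(entryName));
                    in.transferTo(jar);
                    jar.closeEntry();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("classes");
        Files.writeString(dir.resolve("Foo.txt"), "stand-in for compiled bytes");
        Path jarPath = dir.resolveSibling(dir.getFileName() + ".jar");
        jarDirectory(dir, jarPath);
        try (JarFile jf = new JarFile(jarPath.toFile())) {
            System.out.println(jf.getEntry("Foo.txt") != null); // prints true
        }
    }
}
```

Staging code would then substitute the generated jar's path for the directory entry in filesToStage before upload.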



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (BEAM-3359) Unable to change "flinkMaster" from "[auto]" in TestFlinkRunner

2018-01-05 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned BEAM-3359:
--

Assignee: (was: Aljoscha Krettek)

> Unable to change "flinkMaster" from "[auto]" in TestFlinkRunner
> ---
>
> Key: BEAM-3359
> URL: https://issues.apache.org/jira/browse/BEAM-3359
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Łukasz Gajowy
>Priority: Minor
>
> In TestFlinkRunner's constructor there is a line like this:
> {{options.setFlinkMaster("\[auto\]");}}
> which ignores any "flinkMaster" provided earlier (e.g. on the command line), 
> leading to errors that are hard to diagnose (for example wondering: "I 
> provided a good URL in the pipeline options... why is it not connecting to my 
> cluster?"). 
> Setting a {{@Default.String("\[auto\]")}} in FlinkPipelineOptions could be 
> one solution, I guess. 





[jira] [Commented] (BEAM-3379) PAssert not successful when running IOITs with Flink runner

2018-01-05 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313338#comment-16313338
 ] 

Aljoscha Krettek commented on BEAM-3379:


Could you check whether this PR fixes it? 
https://github.com/apache/beam/pull/4348

I'm assuming the File IOs use SDF, and this PR fixes problems around that which 
have to do with processing-time timers being dropped.
> PAssert not successful when running IOITs with Flink runner
> ---
>
> Key: BEAM-3379
> URL: https://issues.apache.org/jira/browse/BEAM-3379
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Łukasz Gajowy
>Assignee: Aljoscha Krettek
>
> When running IOIT tests on Flink either with command:
> {{mvn clean verify \-pl sdks/java/io/file-based-io-tests/ \-Pio-it 
> \-DintegrationTestPipelineOptions='\["\-\-runner=FlinkRunner", 
> "\-\-flinkMaster=localhost:6123", "\-\-numberOfRecords=10", 
> "\-\-filenamePrefix=TEST", 
> "\-\-filesToStage=/Users/lukasz/Projects/apache-beam/beam/sdks/java/io/file-based-io-tests/target/beam-sdks-java-io-file-based-io-tests-2.3.0-SNAPSHOT-shaded.jar,/Users/lukasz/Projects/apache-beam/beam/sdks/java/io/file-based-io-tests/target/beam-sdks-java-io-file-based-io-tests-2.3.0-SNAPSHOT-tests.jar"\]'
>  -Dit.test=TextIOIT -Pflink-runner}}
> or without the filesToStage argument but using modifications described in 
> BEAM-3370, we get the following error, even though the assertion is succesful 
> (checked manually):
> java.lang.AssertionError:
> Expected 1 successful assertions, but found 0.
> Expected: is <1L>
>  but: was <0L>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at 
> org.apache.beam.sdk.testing.TestPipeline.verifyPAssertsSucceeded(TestPipeline.java:541)
>   at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:359)
>   at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:340)
>   at 
> org.apache.beam.sdk.io.text.TextIOIT.writeThenReadAll(TextIOIT.java:109)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:329)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at 
> org.apache.maven.surefire.junitcore.pc.Scheduler$1.run(Scheduler.java:393)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)




