[jira] [Closed] (BEAM-5372) [Flink Runner] Make minPauseBetweenCheckpoints setting available in FlinkPipelineOptions

2018-09-14 Thread Aljoscha Krettek (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-5372.
--
Resolution: Fixed

> [Flink Runner] Make minPauseBetweenCheckpoints setting available in 
> FlinkPipelineOptions
> 
>
> Key: BEAM-5372
> URL: https://issues.apache.org/jira/browse/BEAM-5372
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Daniel Harper
>Assignee: Daniel Harper
>Priority: Trivial
> Fix For: 2.7.0
>
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> The configuration setting {{minPauseBetweenCheckpoints}} [1] is available in 
> Flink to allow a grace period when the checkpoint runtime exceeds the 
> checkpoint interval. 
> This should be exposed in {{FlinkPipelineOptions}} and 
> {{FlinkExecutionEnvironments}} so that users can configure it.
> The default for this value in Flink is 0ms [2].
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/java/org/apache/flink/streaming/api/environment/CheckpointConfig.html#setMinPauseBetweenCheckpoints-long-
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/java/constant-values.html#org.apache.flink.streaming.api.environment.CheckpointConfig.DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS
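
A minimal sketch of the requested wiring, assuming a {{minPauseBetweenCheckpoints}} option on {{FlinkPipelineOptions}} (the option plumbing is illustrative; the {{CheckpointConfig}} calls are Flink's real API [1]):

{code}
FlinkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);

StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000L); // checkpoint every 60s
// Leave at least 30s between the end of one checkpoint and the start of
// the next; Flink's default is 0ms [2].
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000L);
{code}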



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (BEAM-5372) [Flink Runner] Make minPauseBetweenCheckpoints setting available in FlinkPipelineOptions

2018-09-14 Thread Aljoscha Krettek (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated BEAM-5372:
---
Fix Version/s: 2.7.0

> [Flink Runner] Make minPauseBetweenCheckpoints setting available in 
> FlinkPipelineOptions
> 
>
> Key: BEAM-5372
> URL: https://issues.apache.org/jira/browse/BEAM-5372
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Daniel Harper
>Assignee: Daniel Harper
>Priority: Trivial
> Fix For: 2.7.0
>
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> The configuration setting {{minPauseBetweenCheckpoints}} [1] is available in 
> Flink to allow a grace period when the checkpoint runtime exceeds the 
> checkpoint interval. 
> This should be exposed in {{FlinkPipelineOptions}} and 
> {{FlinkExecutionEnvironments}} so that users can configure it.
> The default for this value in Flink is 0ms [2].
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/java/org/apache/flink/streaming/api/environment/CheckpointConfig.html#setMinPauseBetweenCheckpoints-long-
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/java/constant-values.html#org.apache.flink.streaming.api.environment.CheckpointConfig.DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-3919) checkpoint can not work with flink 1.4.1,1.4.2

2018-08-22 Thread Aljoscha Krettek (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16588744#comment-16588744
 ] 

Aljoscha Krettek commented on BEAM-3919:


Unfortunately, I don't think we can fix this right now with the incompatible 
changes in minor Flink versions.

> checkpoint can not work with flink 1.4.1,1.4.2
> --
>
> Key: BEAM-3919
> URL: https://issues.apache.org/jira/browse/BEAM-3919
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.3.0, 2.4.0
>Reporter: eisig
>Assignee: Harshal Tripathi
>Priority: Critical
>
> When submitting an application to a Flink cluster (1.4.1, 1.4.2) with 
> checkpointing enabled, the job fails with the exception:
> java.lang.NoSuchMethodError: 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.snapshotTimersForKeyGroup(Lorg/apache/flink/core/memory/DataOutputViewStreamWrapper;I)V
> It seems that 
> `org.apache.flink.streaming.api.operators.HeapInternalTimerService.snapshotTimersForKeyGroup` 
> was changed in Flink 1.4.1.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (BEAM-4798) IndexOutOfBoundsException when Flink parallelism > 1

2018-08-10 Thread Aljoscha Krettek (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-4798.
--
Resolution: Fixed

> IndexOutOfBoundsException when Flink parallelism > 1
> 
>
> Key: BEAM-4798
> URL: https://issues.apache.org/jira/browse/BEAM-4798
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.4.0, 2.5.0
>Reporter: Alexey Romanenko
>Assignee: Alexey Romanenko
>Priority: Major
> Fix For: 2.7.0
>
>  Time Spent: 4h 40m
>  Remaining Estimate: 0h
>
> Running a job on Flink in streaming mode, reading data from a Kafka topic 
> with parallelism > 1, causes an exception:
> {noformat}
> Caused by: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
>   at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>   at java.util.ArrayList.get(ArrayList.java:433)
>   at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:277)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
>   at 
> org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:45)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>   at java.lang.Thread.run(Thread.java:748)
> {noformat}
> It happens when the number of Kafka topic partitions is less than the 
> parallelism (number of task slots).
> So, a workaround for now is to set parallelism <= number of topic 
> partitions; e.g., with parallelism=2 the topic needs number_partitions >= 2.
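
A minimal sketch of the workaround, assuming the topic's partition count is known up front (names and values are illustrative):

{code}
int numPartitions = 2; // partitions of the input topic
FlinkPipelineOptions options =
    PipelineOptionsFactory.as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
// Keep parallelism <= number of Kafka partitions to avoid the exception.
options.setParallelism(numPartitions);
Pipeline p = Pipeline.create(options);
{code}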



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (BEAM-4798) IndexOutOfBoundsException when Flink parallelism > 1

2018-08-10 Thread Aljoscha Krettek (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated BEAM-4798:
---
Fix Version/s: 2.7.0

> IndexOutOfBoundsException when Flink parallelism > 1
> 
>
> Key: BEAM-4798
> URL: https://issues.apache.org/jira/browse/BEAM-4798
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.4.0, 2.5.0
>Reporter: Alexey Romanenko
>Assignee: Alexey Romanenko
>Priority: Major
> Fix For: 2.7.0
>
>  Time Spent: 4h 40m
>  Remaining Estimate: 0h
>
> Running a job on Flink in streaming mode, reading data from a Kafka topic 
> with parallelism > 1, causes an exception:
> {noformat}
> Caused by: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
>   at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>   at java.util.ArrayList.get(ArrayList.java:433)
>   at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:277)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
>   at 
> org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:45)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>   at java.lang.Thread.run(Thread.java:748)
> {noformat}
> It happens when the number of Kafka topic partitions is less than the 
> parallelism (number of task slots).
> So, a workaround for now is to set parallelism <= number of topic 
> partitions; e.g., with parallelism=2 the topic needs number_partitions >= 2.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-2930) Flink support for portable side input

2018-07-27 Thread Aljoscha Krettek (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-2930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16559894#comment-16559894
 ] 

Aljoscha Krettek commented on BEAM-2930:


Yes, that sounds like a good approach. I upgraded the Flink Runner to use the 
new broadcast state feature when updating to Flink 1.5.0, and there are plans 
to back this with RocksDB, so we will even get support for bigger side inputs 
once Flink has that support.
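
For reference, a minimal sketch of the Flink 1.5 broadcast state API that the runner builds on ({{mainStream}}, {{sideInputStream}}, and all types here are illustrative, not runner code):

{code}
MapStateDescriptor<String, String> desc =
    new MapStateDescriptor<>("side-input",
        BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

// Broadcast the side-input stream to all parallel instances.
BroadcastStream<String> broadcast = sideInputStream.broadcast(desc);

DataStream<String> result = mainStream
    .connect(broadcast)
    .process(new BroadcastProcessFunction<String, String, String>() {
      @Override
      public void processElement(String value, ReadOnlyContext ctx,
          Collector<String> out) throws Exception {
        // Read the broadcast ("side input") state for a main-input element.
        String side = ctx.getBroadcastState(desc).get("latest");
        out.collect(value + "/" + side);
      }

      @Override
      public void processBroadcastElement(String value, Context ctx,
          Collector<String> out) throws Exception {
        // Update the broadcast state; Flink checkpoints it for us.
        ctx.getBroadcastState(desc).put("latest", value);
      }
    });
{code}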

> Flink support for portable side input
> -
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Priority: Major
>  Labels: portability
>  Time Spent: 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (BEAM-4311) Enforce ErrorProne analysis in Flink runner project

2018-06-08 Thread Aljoscha Krettek (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-4311.
--
Resolution: Fixed

> Enforce ErrorProne analysis in Flink runner project
> ---
>
> Key: BEAM-4311
> URL: https://issues.apache.org/jira/browse/BEAM-4311
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Scott Wegner
>Assignee: Tim Robertson
>Priority: Minor
>  Labels: errorprone, starter
> Fix For: 2.6.0
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Java ErrorProne static analysis was [recently 
> enabled|https://github.com/apache/beam/pull/5161] in the Gradle build 
> process, but only as warnings. ErrorProne errors are generally useful and 
> easy to fix. Some work was done to [make sdks-java-core 
> ErrorProne-clean|https://github.com/apache/beam/pull/5319] and add 
> enforcement. This task is to clean up ErrorProne warnings and add enforcement in 
> {{beam-runners-flink}}. Additional context discussed on the [dev 
> list|https://lists.apache.org/thread.html/95aae2785c3cd728c2d3378cbdff2a7ba19caffcd4faa2049d2e2f46@%3Cdev.beam.apache.org%3E].
> Fixing this issue will involve:
> # Follow instructions in the [Contribution 
> Guide|https://beam.apache.org/contribute/] to set up a {{beam}} development 
> environment.
> # Run the following command to compile and run ErrorProne analysis on the 
> project: {{./gradlew :beam-runners-flink:assemble}}
> # Fix each ErrorProne warning from the {{runners/flink}} project.
> # In {{runners/flink/build.gradle}}, add {{failOnWarning: true}} to the call 
> to {{applyJavaNature()}} 
> ([example|https://github.com/apache/beam/pull/5319/files#diff-9390c20635aed5f42f83b97506a87333R20]).
> This starter issue is sponsored by [~swegner]. Feel free to [reach 
> out|https://beam.apache.org/community/contact-us/] with questions or code 
> review:
> * JIRA: [~swegner]
> * GitHub: [@swegner|https://github.com/swegner]
> * Slack: [@Scott Wegner|https://s.apache.org/beam-slack-channel]
> * Email: swegner at google dot com
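
A sketch of the {{build.gradle}} change from step 4 of the quoted issue, assuming {{applyJavaNature()}} is already called in {{runners/flink/build.gradle}} (Gradle/Groovy, matching the linked example):

{code}
// runners/flink/build.gradle — fail the build on ErrorProne (and other)
// compiler warnings for this module.
applyJavaNature(failOnWarning: true)
{code}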



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (BEAM-4311) Enforce ErrorProne analysis in Flink runner project

2018-06-08 Thread Aljoscha Krettek (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated BEAM-4311:
---
Fix Version/s: 2.6.0

> Enforce ErrorProne analysis in Flink runner project
> ---
>
> Key: BEAM-4311
> URL: https://issues.apache.org/jira/browse/BEAM-4311
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Scott Wegner
>Assignee: Tim Robertson
>Priority: Minor
>  Labels: errorprone, starter
> Fix For: 2.6.0
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Java ErrorProne static analysis was [recently 
> enabled|https://github.com/apache/beam/pull/5161] in the Gradle build 
> process, but only as warnings. ErrorProne errors are generally useful and 
> easy to fix. Some work was done to [make sdks-java-core 
> ErrorProne-clean|https://github.com/apache/beam/pull/5319] and add 
> enforcement. This task is to clean up ErrorProne warnings and add enforcement in 
> {{beam-runners-flink}}. Additional context discussed on the [dev 
> list|https://lists.apache.org/thread.html/95aae2785c3cd728c2d3378cbdff2a7ba19caffcd4faa2049d2e2f46@%3Cdev.beam.apache.org%3E].
> Fixing this issue will involve:
> # Follow instructions in the [Contribution 
> Guide|https://beam.apache.org/contribute/] to set up a {{beam}} development 
> environment.
> # Run the following command to compile and run ErrorProne analysis on the 
> project: {{./gradlew :beam-runners-flink:assemble}}
> # Fix each ErrorProne warning from the {{runners/flink}} project.
> # In {{runners/flink/build.gradle}}, add {{failOnWarning: true}} to the call 
> to {{applyJavaNature()}} 
> ([example|https://github.com/apache/beam/pull/5319/files#diff-9390c20635aed5f42f83b97506a87333R20]).
> This starter issue is sponsored by [~swegner]. Feel free to [reach 
> out|https://beam.apache.org/community/contact-us/] with questions or code 
> review:
> * JIRA: [~swegner]
> * GitHub: [@swegner|https://github.com/swegner]
> * Slack: [@Scott Wegner|https://s.apache.org/beam-slack-channel]
> * Email: swegner at google dot com



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (BEAM-3909) Add tests for Flink DoFnOperator side-input checkpointing

2018-04-30 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated BEAM-3909:
---
Fix Version/s: 2.5.0

> Add tests for Flink DoFnOperator side-input checkpointing
> -
>
> Key: BEAM-3909
> URL: https://issues.apache.org/jira/browse/BEAM-3909
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
> Fix For: 2.5.0
>
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (BEAM-3909) Add tests for Flink DoFnOperator side-input checkpointing

2018-04-30 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-3909.
--
Resolution: Fixed

> Add tests for Flink DoFnOperator side-input checkpointing
> -
>
> Key: BEAM-3909
> URL: https://issues.apache.org/jira/browse/BEAM-3909
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
> Fix For: 2.5.0
>
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-4063) Flink runner supports cluster-wide artifact deployments through the Distributed Cache

2018-04-18 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned BEAM-4063:
--

Assignee: (was: Aljoscha Krettek)

> Flink runner supports cluster-wide artifact deployments through the 
> Distributed Cache
> -
>
> Key: BEAM-4063
> URL: https://issues.apache.org/jira/browse/BEAM-4063
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Priority: Minor
>
> As of now, Flink effectively has a dependency on an external storage system 
> for artifact management. This is because the Flink Distributed Cache does not 
> actually distribute and cache blobs itself, but rather expects that each node 
> in a running cluster has access to a well-known artifact resource.
> We should get this for free whenever 
> [https://github.com/apache/flink/pull/5580] is merged (likely in 1.5). For 
> now, we will have to defer to external storage systems like GCS or HDFS.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-4067) Add portable Flink test runner

2018-04-18 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned BEAM-4067:
--

Assignee: (was: Aljoscha Krettek)

> Add portable Flink test runner
> --
>
> Key: BEAM-4067
> URL: https://issues.apache.org/jira/browse/BEAM-4067
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Priority: Minor
>
> The portable Flink runner cannot be tested through the normal mechanisms used 
> for ValidatesRunner tests because it requires a job server to be constructed 
> out of band and for pipelines to be run through it. We should implement a 
> shim that acts as a standard Java SDK Runner that spins up the necessary 
> server (possibly in-process) and runs against it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-2927) Python SDK support for portable side input

2018-04-03 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16423602#comment-16423602
 ] 

Aljoscha Krettek commented on BEAM-2927:


The problematic piece is this: 
https://github.com/apache/beam/blob/1f681bb1687aaa1ec23741e87f8622e6a4e59f7d/sdks/python/apache_beam/runners/worker/sdk_worker.py#L72

The Python harness tries to create a state client that uses the control API 
descriptor when starting the worker. However, the worker harness should use the 
state API descriptor that it gets from the {{ProcessBundleDescriptor}} to 
access state and side inputs.

cc [~lcwik]

> Python SDK support for portable side input
> --
>
> Key: BEAM-2927
> URL: https://issues.apache.org/jira/browse/BEAM-2927
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Henning Rohde
>Assignee: Robert Bradshaw
>Priority: Major
>  Labels: portability
>  Time Spent: 2h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (BEAM-3087) Extend lock scope in Flink UnboundedSourceWrapper

2018-03-26 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-3087.
--
   Resolution: Fixed
Fix Version/s: 2.5.0

> Extend lock scope in Flink UnboundedSourceWrapper
> -
>
> Key: BEAM-3087
> URL: https://issues.apache.org/jira/browse/BEAM-3087
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Grzegorz Kołakowski
>Priority: Major
> Fix For: 2.5.0
>
>
> In {{UnboundedSourceWrapper}} the lock scope is not big enough: we 
> synchronise in {{emitElement()}} but should instead synchronise inside the 
> reader loop in {{run()}} because the {{Source}} interface does not allow 
> concurrent calls.
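
A simplified sketch of the intended scope, assuming the wrapper holds Flink's checkpoint lock across the whole advance-and-emit step (illustrative, not the actual runner code):

{code}
final Object checkpointLock = ctx.getCheckpointLock();
while (isRunning) {
  synchronized (checkpointLock) {
    // advance() and emit must happen under one lock acquisition, since
    // the Source interface does not allow concurrent calls.
    if (reader.advance()) {
      emitElement(ctx, reader.getCurrent());
    }
  }
}
{code}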



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (BEAM-2393) BoundedSource is not fault-tolerant in FlinkRunner Streaming mode

2018-03-26 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-2393.
--
   Resolution: Fixed
Fix Version/s: 2.5.0

> BoundedSource is not fault-tolerant in FlinkRunner Streaming mode
> -
>
> Key: BEAM-2393
> URL: https://issues.apache.org/jira/browse/BEAM-2393
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Jingsong Lee
>Assignee: Grzegorz Kołakowski
>Priority: Major
> Fix For: 2.5.0
>
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> {{BoundedSourceWrapper}} does not implement snapshot() and restore(), so 
> when it restarts after a failure, it will send duplicate data.
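
A simplified sketch of the missing pieces, using Flink's {{ListCheckpointed}} interface (illustrative, not the actual Beam implementation):

{code}
class BoundedSourceWrapper<T> extends RichParallelSourceFunction<T>
    implements ListCheckpointed<Long> {

  private volatile boolean isRunning = true;
  private volatile long elementsEmitted; // progress to checkpoint

  @Override
  public List<Long> snapshotState(long checkpointId, long timestamp) {
    // snapshot(): record how far the source has read.
    return Collections.singletonList(elementsEmitted);
  }

  @Override
  public void restoreState(List<Long> state) {
    // restore(): resume from the checkpointed position instead of
    // re-sending everything from the beginning.
    for (Long emitted : state) {
      elementsEmitted = emitted;
    }
  }

  @Override
  public void run(SourceContext<T> ctx) {
    // Skip the first `elementsEmitted` elements, then emit the rest.
  }

  @Override
  public void cancel() {
    isRunning = false;
  }
}
{code}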



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (BEAM-622) Add checkpointing tests for DoFnOperator and WindowDoFnOperator

2018-03-26 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reopened BEAM-622:
---

> Add checkpointing tests for DoFnOperator and WindowDoFnOperator 
> 
>
> Key: BEAM-622
> URL: https://issues.apache.org/jira/browse/BEAM-622
> Project: Beam
>  Issue Type: Test
>  Components: runner-flink
>Affects Versions: 0.3.0-incubating
>Reporter: Maximilian Michels
>Assignee: Grzegorz Kołakowski
>Priority: Major
> Fix For: 2.5.0
>
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> Tests that verify the correct snapshotting of these two operators are missing. 
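
A sketch of the usual Flink pattern for such tests, using the operator test harness (API details vary across Flink versions; {{operator}} and {{restoredOperator}} stand in for {{DoFnOperator}} instances):

{code}
OneInputStreamOperatorTestHarness<String, String> harness =
    new OneInputStreamOperatorTestHarness<>(operator);
harness.open();
harness.processElement(new StreamRecord<>("a", 0L));

// Snapshot the operator state, then restore a fresh operator from it
// and assert it behaves as if it had seen the earlier elements.
OperatorStateHandles snapshot = harness.snapshot(0L, 0L);

OneInputStreamOperatorTestHarness<String, String> restored =
    new OneInputStreamOperatorTestHarness<>(restoredOperator);
restored.initializeState(snapshot);
restored.open();
{code}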



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (BEAM-622) Add checkpointing tests for DoFnOperator and WindowDoFnOperator

2018-03-26 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-622.
-
   Resolution: Fixed
Fix Version/s: 2.5.0

> Add checkpointing tests for DoFnOperator and WindowDoFnOperator 
> 
>
> Key: BEAM-622
> URL: https://issues.apache.org/jira/browse/BEAM-622
> Project: Beam
>  Issue Type: Test
>  Components: runner-flink
>Affects Versions: 0.3.0-incubating
>Reporter: Maximilian Michels
>Assignee: Grzegorz Kołakowski
>Priority: Major
> Fix For: 2.5.0
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Tests that verify the correct snapshotting of these two operators are missing. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (BEAM-3800) Set uids on Flink operators

2018-03-26 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-3800.
--
   Resolution: Fixed
Fix Version/s: 2.5.0

> Set uids on Flink operators
> ---
>
> Key: BEAM-3800
> URL: https://issues.apache.org/jira/browse/BEAM-3800
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Grzegorz Kołakowski
>Assignee: Grzegorz Kołakowski
>Priority: Major
> Fix For: 2.5.0
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Flink operators should have unique ids assigned, which are, in turn, used for 
> checkpointing stateful operators. Assigning operator ids is highly 
> recommended according to the Flink documentation.
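
A minimal sketch of what this looks like in the Flink DataStream API ({{MySource}} and the id strings are illustrative):

{code}
DataStream<String> words = env
    .addSource(new MySource())
    .uid("beam-source")   // stable id: state is matched by uid on restore
    .name("Read");        // display name only, safe to change

words
    .map(String::toUpperCase)
    .uid("beam-map")
    .name("ToUpperCase");
{code}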



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-3919) checkpoint can not work with flink 1.4.1,1.4.2

2018-03-26 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned BEAM-3919:
--

Assignee: Harshal Tripathi

> checkpoint can not work with flink 1.4.1,1.4.2
> --
>
> Key: BEAM-3919
> URL: https://issues.apache.org/jira/browse/BEAM-3919
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.3.0, 2.4.0
>Reporter: eisig
>Assignee: Harshal Tripathi
>Priority: Critical
>
> When submitting an application to a Flink cluster (1.4.1, 1.4.2) with 
> checkpointing enabled, the job fails with the exception:
> java.lang.NoSuchMethodError: 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.snapshotTimersForKeyGroup(Lorg/apache/flink/core/memory/DataOutputViewStreamWrapper;I)V
> It seems that 
> `org.apache.flink.streaming.api.operators.HeapInternalTimerService.snapshotTimersForKeyGroup` 
> was changed in Flink 1.4.1.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-3919) checkpoint can not work with flink 1.4.1,1.4.2

2018-03-23 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned BEAM-3919:
--

Assignee: (was: Aljoscha Krettek)

> checkpoint can not work with flink 1.4.1,1.4.2
> --
>
> Key: BEAM-3919
> URL: https://issues.apache.org/jira/browse/BEAM-3919
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.3.0, 2.4.0
>Reporter: eisig
>Priority: Critical
>
> When submitting an application to a Flink cluster (1.4.1, 1.4.2) with 
> checkpointing enabled, the job fails with the exception:
> java.lang.NoSuchMethodError: 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.snapshotTimersForKeyGroup(Lorg/apache/flink/core/memory/DataOutputViewStreamWrapper;I)V
> It seems that 
> `org.apache.flink.streaming.api.operators.HeapInternalTimerService.snapshotTimersForKeyGroup` 
> was changed in Flink 1.4.1.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (BEAM-3909) Add tests for Flink DoFnOperator side-input checkpointing

2018-03-21 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created BEAM-3909:
--

 Summary: Add tests for Flink DoFnOperator side-input checkpointing
 Key: BEAM-3909
 URL: https://issues.apache.org/jira/browse/BEAM-3909
 Project: Beam
  Issue Type: Improvement
  Components: runner-flink
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (BEAM-3905) Update Flink Runner to Flink 1.5.0

2018-03-21 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created BEAM-3905:
--

 Summary: Update Flink Runner to Flink 1.5.0
 Key: BEAM-3905
 URL: https://issues.apache.org/jira/browse/BEAM-3905
 Project: Beam
  Issue Type: Improvement
  Components: runner-flink
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-2393) BoundedSource is not fault-tolerant in FlinkRunner Streaming mode

2018-03-19 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16405185#comment-16405185
 ] 

Aljoscha Krettek commented on BEAM-2393:


Yes, it's very good to worry about those things! 

I would say that it's ok, because I don't see how else we could do this 
currently.

> BoundedSource is not fault-tolerant in FlinkRunner Streaming mode
> -
>
> Key: BEAM-2393
> URL: https://issues.apache.org/jira/browse/BEAM-2393
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Jingsong Lee
>Assignee: Grzegorz Kołakowski
>Priority: Major
>
> {{BoundedSourceWrapper}} does not implement snapshot() and restore(), so 
> when it restarts after a failure, it will send duplicate data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-2393) BoundedSource is not fault-tolerant in FlinkRunner Streaming mode

2018-03-16 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16402220#comment-16402220
 ] 

Aljoscha Krettek commented on BEAM-2393:


Ah, I think that explains it. This call in the source: 
https://github.com/apache/beam/blob/6148a6d063a0503ee435ab5084fcba3fb864b26f/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L333
 will at some point invoke the ParDo because the operations are chained 
together, i.e. they are being executed in the same thread and "sending" 
elements downstream is just method calls. This means that the source will block 
for about 20s in that synchronized block, which keeps the checkpointing logic 
from acquiring the lock.
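
A simplified illustration of the effect (not the actual runner code; {{checkpointLock}} and {{ctx}} as in Flink's source API):

{code}
synchronized (checkpointLock) {
  // With chaining, "sending" an element downstream is a direct method
  // call into the chained ParDo, so its ~20s of work happens while the
  // checkpoint lock is held...
  ctx.collect(element);
}
// ...and the checkpoint thread, which also needs checkpointLock, waits.
{code}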

> BoundedSource is not fault-tolerant in FlinkRunner Streaming mode
> -
>
> Key: BEAM-2393
> URL: https://issues.apache.org/jira/browse/BEAM-2393
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Jingsong Lee
>Assignee: Grzegorz Kołakowski
>Priority: Major
>
> {{BoundedSourceWrapper}} does not implement snapshot() and restore(), so 
> when it restarts after a failure, it will send duplicate data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (BEAM-3043) Set user-specified Transform names on Flink operations

2018-03-13 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-3043.
--
Resolution: Fixed

> Set user-specified Transform names on Flink operations
> --
>
> Key: BEAM-3043
> URL: https://issues.apache.org/jira/browse/BEAM-3043
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Grzegorz Kołakowski
>Priority: Major
> Fix For: 2.5.0
>
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> Currently, we don't always set a name on the generated operations or we set 
> the wrong name. For example, in the batch translation we set the result of 
> {{PTransform.getName()}} as the name, which is only the name of the 
> {{PTransform}} itself, not the name that the user specified when creating a 
> Pipeline.
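
To illustrate the distinction (transform and path names are made up):

{code}
Pipeline p = Pipeline.create(options);
p.apply("ReadUserEvents", TextIO.read().from("/tmp/events*"));
// PTransform.getName() returns the transform's own name (e.g. "TextIO.Read"),
// while the user-specified application name "ReadUserEvents" is what should
// be set on the generated Flink operation.
{code}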



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-2393) BoundedSource is not fault-tolerant in FlinkRunner Streaming mode

2018-03-13 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397890#comment-16397890
 ] 

Aljoscha Krettek commented on BEAM-2393:


Have you enabled DEBUG logging in Flink? What do the logs say about the 
checkpointing times on the individual operators?

> BoundedSource is not fault-tolerant in FlinkRunner Streaming mode
> -
>
> Key: BEAM-2393
> URL: https://issues.apache.org/jira/browse/BEAM-2393
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Jingsong Lee
>Assignee: Grzegorz Kołakowski
>Priority: Major
>
> {{BoundedSourceWrapper}} does not implement snapshot() and restore(), so 
> when it restarts after a failure, it will send duplicate data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-3675) FlinkRunner: Logging server

2018-03-07 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned BEAM-3675:
--

Assignee: (was: Aljoscha Krettek)

> FlinkRunner: Logging server
> ---
>
> Key: BEAM-3675
> URL: https://issues.apache.org/jira/browse/BEAM-3675
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Ben Sidhom
>Priority: Major
>
> An implementation of BeamFnLogging that uses the default Flink logging 
> mechanism.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-3673) FlinkRunner: Harness manager for connecting operators to SDK Harnesses

2018-03-07 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned BEAM-3673:
--

Assignee: (was: Aljoscha Krettek)

> FlinkRunner: Harness manager for connecting operators to SDK Harnesses
> --
>
> Key: BEAM-3673
> URL: https://issues.apache.org/jira/browse/BEAM-3673
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Ben Sidhom
>Priority: Major
>
> SDK harnesses require a common set of gRPC services to operate. The role of 
> the harness manager is to provide a uniform interface that multiplexes data 
> streams and auxiliary data between SDK environments and operators within a 
> given job.
> Note that multiple operators may communicate with a single SDK environment to 
> amortize container initialization cost. Environments are _not_ shared between 
> different jobs.
> The initial implementation will shell out to local docker, but the harness 
> manager should eventually support working with externally-managed 
> environments (e.g., created by Kubernetes).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (BEAM-3753) Integration ITCase tests are not executed

2018-03-07 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-3753.
--
   Resolution: Fixed
Fix Version/s: (was: 2.4.0)
   2.5.0

> Integration ITCase tests are not executed
> -
>
> Key: BEAM-3753
> URL: https://issues.apache.org/jira/browse/BEAM-3753
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Grzegorz Kołakowski
>Assignee: Grzegorz Kołakowski
>Priority: Major
> Fix For: 2.5.0
>
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> The flink-runner {{*ITCase.java}} tests are not executed at all, either by 
> the surefire or the failsafe plugin.
>  * org.apache.beam.runners.flink.ReadSourceStreamingITCase
>  * org.apache.beam.runners.flink.ReadSourceITCase
>  * org.apache.beam.runners.flink.streaming.TopWikipediaSessionsITCase
> In addition, two of them fail if run manually from the IDE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (BEAM-3043) Set user-specified Transform names on Flink operations

2018-03-07 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reopened BEAM-3043:


Reopen to change fixVersion

> Set user-specified Transform names on Flink operations
> --
>
> Key: BEAM-3043
> URL: https://issues.apache.org/jira/browse/BEAM-3043
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Grzegorz Kołakowski
>Priority: Major
> Fix For: 2.5.0
>
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> Currently, we don't always set a name on the generated operations or we set 
> the wrong name. For example, in the batch translation we set the result of 
> {{PTransform.getName()}} as the name, which is only the name of the 
> {{PTransform}} itself, not the name that the user specified when creating a 
> Pipeline.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (BEAM-3753) Integration ITCase tests are not executed

2018-03-07 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reopened BEAM-3753:


Reopen to change fixVersion.

> Integration ITCase tests are not executed
> -
>
> Key: BEAM-3753
> URL: https://issues.apache.org/jira/browse/BEAM-3753
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Grzegorz Kołakowski
>Assignee: Grzegorz Kołakowski
>Priority: Major
> Fix For: 2.5.0
>
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> The flink-runner {{*ITCase.java}} tests are not executed at all, either by 
> the surefire or the failsafe plugin.
>  * org.apache.beam.runners.flink.ReadSourceStreamingITCase
>  * org.apache.beam.runners.flink.ReadSourceITCase
>  * org.apache.beam.runners.flink.streaming.TopWikipediaSessionsITCase
> In addition, two of them fail if run manually from the IDE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (BEAM-3043) Set user-specified Transform names on Flink operations

2018-03-07 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated BEAM-3043:
---
Fix Version/s: (was: 2.4.0)
   2.5.0

> Set user-specified Transform names on Flink operations
> --
>
> Key: BEAM-3043
> URL: https://issues.apache.org/jira/browse/BEAM-3043
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Grzegorz Kołakowski
>Priority: Major
> Fix For: 2.5.0
>
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> Currently, we don't always set a name on the generated operations or we set 
> the wrong name. For example, in the batch translation we set the result of 
> {{PTransform.getName()}} as the name, which is only the name of the 
> {{PTransform}} itself, not the name that the user specified when creating a 
> Pipeline.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (BEAM-3043) Set user-specified Transform names on Flink operations

2018-03-07 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-3043.
--
   Resolution: Fixed
Fix Version/s: 2.4.0

> Set user-specified Transform names on Flink operations
> --
>
> Key: BEAM-3043
> URL: https://issues.apache.org/jira/browse/BEAM-3043
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Grzegorz Kołakowski
>Priority: Major
> Fix For: 2.4.0
>
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> Currently, we don't always set a name on the generated operations or we set 
> the wrong name. For example, in the batch translation we set the result of 
> {{PTransform.getName()}} as the name, which is only the name of the 
> {{PTransform}} itself, not the name that the user specified when creating a 
> Pipeline.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (BEAM-3753) Integration ITCase tests are not executed

2018-03-07 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-3753.
--
   Resolution: Fixed
Fix Version/s: 2.4.0

> Integration ITCase tests are not executed
> -
>
> Key: BEAM-3753
> URL: https://issues.apache.org/jira/browse/BEAM-3753
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Grzegorz Kołakowski
>Assignee: Grzegorz Kołakowski
>Priority: Major
> Fix For: 2.4.0
>
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> The flink-runner {{*ITCase.java}} tests are not executed at all, either by 
> the surefire or the failsafe plugin.
>  * org.apache.beam.runners.flink.ReadSourceStreamingITCase
>  * org.apache.beam.runners.flink.ReadSourceITCase
>  * org.apache.beam.runners.flink.streaming.TopWikipediaSessionsITCase
> In addition, two of them fail if run manually from the IDE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (BEAM-3768) Compile error for Flink translation

2018-03-05 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-3768.
--
Resolution: Fixed
  Assignee: Thomas Groh  (was: Aljoscha Krettek)

This was resolved via https://github.com/apache/beam/pull/4787

> Compile error for Flink translation
> ---
>
> Key: BEAM-3768
> URL: https://issues.apache.org/jira/browse/BEAM-3768
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Robert Bradshaw
>Assignee: Thomas Groh
>Priority: Blocker
> Fix For: 2.4.0
>
>
> 2018-03-01T21:22:58.234 [INFO] --- maven-compiler-plugin:3.7.0:compile 
> (default-compile) @ beam-runners-flink_2.11 ---
> 2018-03-01T21:22:58.258 [INFO] Changes detected - recompiling the module!
> 2018-03-01T21:22:58.259 [INFO] Compiling 75 source files to 
> /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall@2/src/runners/flink/target/classes
> 2018-03-01T21:22:59.555 [INFO] 
> /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall@2/src/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/DedupingOperator.java:
>  Some input files use or override a deprecated API.
> 2018-03-01T21:22:59.556 [INFO] 
> /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall@2/src/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/DedupingOperator.java:
>  Recompile with -Xlint:deprecation for details.
> 2018-03-01T21:22:59.556 [INFO] 
> /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall@2/src/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java:
>  Some input files use unchecked or unsafe operations.
> 2018-03-01T21:22:59.556 [INFO] 
> /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall@2/src/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java:
>  Recompile with -Xlint:unchecked for details.
> 2018-03-01T21:22:59.556 [INFO] 
> -
> 2018-03-01T21:22:59.556 [ERROR] COMPILATION ERROR : 
> 2018-03-01T21:22:59.556 [INFO] 
> -
> 2018-03-01T21:22:59.557 [ERROR] 
> /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall@2/src/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java:[359,11]
>  cannot infer type arguments for 
> org.apache.beam.runners.core.ProcessFnRunner<>
>   reason: cannot infer type-variable(s) InputT,OutputT,RestrictionT
> (argument mismatch; org.apache.beam.runners.core.DoFnRunner cannot be 
> converted to 
> org.apache.beam.runners.core.SimpleDoFnRunner>,OutputT>)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (BEAM-3768) Compile error for Flink translation

2018-03-02 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated BEAM-3768:
---
Issue Type: Bug  (was: Test)

> Compile error for Flink translation
> ---
>
> Key: BEAM-3768
> URL: https://issues.apache.org/jira/browse/BEAM-3768
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Robert Bradshaw
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 2.4.0
>
>
> 2018-03-01T21:22:58.234 [INFO] --- maven-compiler-plugin:3.7.0:compile 
> (default-compile) @ beam-runners-flink_2.11 ---
> 2018-03-01T21:22:58.258 [INFO] Changes detected - recompiling the module!
> 2018-03-01T21:22:58.259 [INFO] Compiling 75 source files to 
> /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall@2/src/runners/flink/target/classes
> 2018-03-01T21:22:59.555 [INFO] 
> /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall@2/src/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/DedupingOperator.java:
>  Some input files use or override a deprecated API.
> 2018-03-01T21:22:59.556 [INFO] 
> /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall@2/src/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/DedupingOperator.java:
>  Recompile with -Xlint:deprecation for details.
> 2018-03-01T21:22:59.556 [INFO] 
> /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall@2/src/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java:
>  Some input files use unchecked or unsafe operations.
> 2018-03-01T21:22:59.556 [INFO] 
> /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall@2/src/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java:
>  Recompile with -Xlint:unchecked for details.
> 2018-03-01T21:22:59.556 [INFO] 
> -
> 2018-03-01T21:22:59.556 [ERROR] COMPILATION ERROR : 
> 2018-03-01T21:22:59.556 [INFO] 
> -
> 2018-03-01T21:22:59.557 [ERROR] 
> /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall@2/src/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java:[359,11]
>  cannot infer type arguments for 
> org.apache.beam.runners.core.ProcessFnRunner<>
>   reason: cannot infer type-variable(s) InputT,OutputT,RestrictionT
> (argument mismatch; org.apache.beam.runners.core.DoFnRunner cannot be 
> converted to 
> org.apache.beam.runners.core.SimpleDoFnRunner>,OutputT>)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-3768) Compile error for Flink translation

2018-03-02 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383585#comment-16383585
 ] 

Aljoscha Krettek commented on BEAM-3768:


[~robertwb] & [~rmannibucau] I commented on the PR: 
https://github.com/apache/beam/pull/4372. What do you think?

> Compile error for Flink translation
> ---
>
> Key: BEAM-3768
> URL: https://issues.apache.org/jira/browse/BEAM-3768
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Robert Bradshaw
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 2.4.0
>
>
> 2018-03-01T21:22:58.234 [INFO] --- maven-compiler-plugin:3.7.0:compile 
> (default-compile) @ beam-runners-flink_2.11 ---
> 2018-03-01T21:22:58.258 [INFO] Changes detected - recompiling the module!
> 2018-03-01T21:22:58.259 [INFO] Compiling 75 source files to 
> /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall@2/src/runners/flink/target/classes
> 2018-03-01T21:22:59.555 [INFO] 
> /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall@2/src/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/DedupingOperator.java:
>  Some input files use or override a deprecated API.
> 2018-03-01T21:22:59.556 [INFO] 
> /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall@2/src/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/DedupingOperator.java:
>  Recompile with -Xlint:deprecation for details.
> 2018-03-01T21:22:59.556 [INFO] 
> /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall@2/src/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java:
>  Some input files use unchecked or unsafe operations.
> 2018-03-01T21:22:59.556 [INFO] 
> /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall@2/src/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java:
>  Recompile with -Xlint:unchecked for details.
> 2018-03-01T21:22:59.556 [INFO] 
> -
> 2018-03-01T21:22:59.556 [ERROR] COMPILATION ERROR : 
> 2018-03-01T21:22:59.556 [INFO] 
> -
> 2018-03-01T21:22:59.557 [ERROR] 
> /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall@2/src/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java:[359,11]
>  cannot infer type arguments for 
> org.apache.beam.runners.core.ProcessFnRunner<>
>   reason: cannot infer type-variable(s) InputT,OutputT,RestrictionT
> (argument mismatch; org.apache.beam.runners.core.DoFnRunner cannot be 
> converted to 
> org.apache.beam.runners.core.SimpleDoFnRunner>,OutputT>)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (BEAM-2140) Fix SplittableDoFn ValidatesRunner tests in FlinkRunner

2018-02-23 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated BEAM-2140:
---
Fix Version/s: 2.4.0

> Fix SplittableDoFn ValidatesRunner tests in FlinkRunner
> ---
>
> Key: BEAM-2140
> URL: https://issues.apache.org/jira/browse/BEAM-2140
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
> Fix For: 2.4.0
>
>  Time Spent: 4h 50m
>  Remaining Estimate: 0h
>
> As discovered as part of BEAM-1763, there is a failing SDF test. We disabled 
> the tests to unblock the open PR for BEAM-1763.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (BEAM-2140) Fix SplittableDoFn ValidatesRunner tests in FlinkRunner

2018-02-23 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-2140.
--
Resolution: Fixed

> Fix SplittableDoFn ValidatesRunner tests in FlinkRunner
> ---
>
> Key: BEAM-2140
> URL: https://issues.apache.org/jira/browse/BEAM-2140
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
> Fix For: 2.4.0
>
>  Time Spent: 4h 50m
>  Remaining Estimate: 0h
>
> As discovered as part of BEAM-1763, there is a failing SDF test. We disabled 
> the tests to unblock the open PR for BEAM-1763.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-3728) Failing ParDoTest for Flink Runner

2018-02-22 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned BEAM-3728:
--

Assignee: (was: Aljoscha Krettek)

> Failing ParDoTest for Flink Runner
> --
>
> Key: BEAM-3728
> URL: https://issues.apache.org/jira/browse/BEAM-3728
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Priority: Blocker
> Fix For: 2.4.0
>
>
> It seems the changes for BEAM-3700/BEAM-3701 broke some tests in ParDoTest 
> for the Flink Runner: 
> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/5054/
> I don't currently understand what is causing this. [~lcwik] Do you have any 
> idea?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-3728) Failing ParDoTest for Flink Runner

2018-02-22 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372906#comment-16372906
 ] 

Aljoscha Krettek commented on BEAM-3728:


Also cc [~romain.manni-bucau] as the implementor. I thought it was Luke but I 
was confused by the git history.

> Failing ParDoTest for Flink Runner
> --
>
> Key: BEAM-3728
> URL: https://issues.apache.org/jira/browse/BEAM-3728
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 2.4.0
>
>
> It seems the changes for BEAM-3700/BEAM-3701 broke some tests in ParDoTest 
> for the Flink Runner: 
> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/5054/
> I don't currently understand what is causing this. [~lcwik] Do you have any 
> idea?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (BEAM-3728) Failing ParDoTest for Flink Runner

2018-02-22 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created BEAM-3728:
--

 Summary: Failing ParDoTest for Flink Runner
 Key: BEAM-3728
 URL: https://issues.apache.org/jira/browse/BEAM-3728
 Project: Beam
  Issue Type: Bug
  Components: runner-flink
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek
 Fix For: 2.4.0


It seems the changes for BEAM-3700/BEAM-3701 broke some tests in ParDoTest for 
the Flink Runner: 
https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/5054/

I don't currently understand what is causing this. [~lcwik] Do you have any 
idea?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-3673) FlinkRunner: Harness manager for connecting operators to SDK Harnesses

2018-02-09 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358208#comment-16358208
 ] 

Aljoscha Krettek commented on BEAM-3673:


[~bsidhom] Am I supposed to be assigned to all of those recently created issues 
or is that just because of the default assignment?

> FlinkRunner: Harness manager for connecting operators to SDK Harnesses
> --
>
> Key: BEAM-3673
> URL: https://issues.apache.org/jira/browse/BEAM-3673
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Aljoscha Krettek
>Priority: Major
>
> SDK harnesses require a common set of gRPC services to operate. The role of 
> the harness manager is to provide a uniform interface that multiplexes data 
> streams and auxiliary data between SDK environments and operators within a 
> given job.
> Note that multiple operators may communicate with a single SDK environment to 
> amortize container initialization cost. Environments are _not_ shared between 
> different jobs.
> The initial implementation will shell out to local docker, but the harness 
> manager should eventually support working with externally-managed 
> environments (e.g., created by Kubernetes).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (BEAM-3648) Support Splittable DoFn in Flink Batch Runner

2018-02-08 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created BEAM-3648:
--

 Summary: Support Splittable DoFn in Flink Batch Runner
 Key: BEAM-3648
 URL: https://issues.apache.org/jira/browse/BEAM-3648
 Project: Beam
  Issue Type: New Feature
  Components: runner-flink
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-2806) support View.CreatePCollectionView in FlinkRunner

2018-02-06 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353712#comment-16353712
 ] 

Aljoscha Krettek commented on BEAM-2806:


[~jbonofre] I merged this into master in commit 
b5c31ed96f7abcf489ccd4d46a8db4fb17476a27. Do you think we can also cherry-pick 
this onto the 2.3.0 branch, since it fixes a somewhat bad bug in the Flink 
Runner?

> support View.CreatePCollectionView in FlinkRunner
> -
>
> Key: BEAM-2806
> URL: https://issues.apache.org/jira/browse/BEAM-2806
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Xu Mingmin
>Assignee: Grzegorz Kołakowski
>Priority: Major
> Fix For: 2.3.0
>
>
> Beam version: 2.2.0-SNAPSHOT
> Here's the code
> {code}
> PCollectionView> rowsView = rightRows
> .apply(View.asMultimap());
> {code}
> And exception when running with {{FlinkRunner}}:
> {code}
> Exception in thread "main" java.lang.UnsupportedOperationException: The 
> transform View.CreatePCollectionView is currently not supported.
>   at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:113)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:594)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:586)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:586)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:268)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:202)
>   at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:440)
>   at 
> org.apache.beam.runners.flink.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:38)
>   at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate(FlinkStreamingPipelineTranslator.java:69)
>   at 
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:104)
>   at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:113)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (BEAM-2806) support View.CreatePCollectionView in FlinkRunner

2018-02-06 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated BEAM-2806:
---
Fix Version/s: 2.3.0

> support View.CreatePCollectionView in FlinkRunner
> -
>
> Key: BEAM-2806
> URL: https://issues.apache.org/jira/browse/BEAM-2806
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Xu Mingmin
>Assignee: Grzegorz Kołakowski
>Priority: Major
> Fix For: 2.3.0
>
>
> Beam version: 2.2.0-SNAPSHOT
> Here's the code
> {code}
> PCollectionView> rowsView = rightRows
> .apply(View.asMultimap());
> {code}
> And exception when running with {{FlinkRunner}}:
> {code}
> Exception in thread "main" java.lang.UnsupportedOperationException: The 
> transform View.CreatePCollectionView is currently not supported.
>   at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:113)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:594)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:586)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:586)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:268)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:202)
>   at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:440)
>   at 
> org.apache.beam.runners.flink.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:38)
>   at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate(FlinkStreamingPipelineTranslator.java:69)
>   at 
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:104)
>   at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:113)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (BEAM-2806) support View.CreatePCollectionView in FlinkRunner

2018-02-06 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-2806.
--
Resolution: Fixed

> support View.CreatePCollectionView in FlinkRunner
> -
>
> Key: BEAM-2806
> URL: https://issues.apache.org/jira/browse/BEAM-2806
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Xu Mingmin
>Assignee: Grzegorz Kołakowski
>Priority: Major
> Fix For: 2.3.0
>
>
> Beam version: 2.2.0-SNAPSHOT
> Here's the code
> {code}
> PCollectionView> rowsView = rightRows
> .apply(View.asMultimap());
> {code}
> And exception when running with {{FlinkRunner}}:
> {code}
> Exception in thread "main" java.lang.UnsupportedOperationException: The 
> transform View.CreatePCollectionView is currently not supported.
>   at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:113)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:594)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:586)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:586)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:268)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:202)
>   at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:440)
>   at 
> org.apache.beam.runners.flink.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:38)
>   at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate(FlinkStreamingPipelineTranslator.java:69)
>   at 
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:104)
>   at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:113)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (BEAM-3186) In-flight data loss when restoring from savepoint

2018-02-05 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-3186.
--
Resolution: Fixed

> In-flight data loss when restoring from savepoint
> -
>
> Key: BEAM-3186
> URL: https://issues.apache.org/jira/browse/BEAM-3186
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.0.0, 2.1.0
>Reporter: Pawel Bartoszek
>Assignee: Dawid Wysakowicz
>Priority: Blocker
> Fix For: 2.3.0
>
> Attachments: restore_no_trigger.png, restore_with_trigger.png, 
> restore_with_trigger_b.png
>
>
> *The context:*
> I want to count how many events of a given type (A, B, etc.) I receive every 
> minute, using 1-minute windows and an AfterWatermark trigger with an allowed 
> lateness of 1 min.
> *No data loss case*
> In the case below, if at least one A element with an event time belonging to 
> the window 14:00-14:01 is read from the Kinesis stream after the job is 
> restored from the savepoint, data loss is not observed for this key and 
> this window.
> !restore_no_trigger.png!
> *Data loss case*
> However, if no new A element is read from the Kinesis stream, then data 
> loss is observable.
> !restore_with_trigger.png!
> *Workaround*
> As a workaround, we could configure early firings every X seconds, which 
> limits the data loss to at most X seconds per key on restore (see the sketch 
> after this report).
> *My guess where the issue might be*
> I believe this is a bug in the Beam-Flink integration layer. From my 
> investigation I don't think it's the KinesisReader failing to advance the 
> watermark. To prove that, after restoring from the savepoint I sent some 
> records for a different key (B) in the same window shown in the 
> pictures (14:00-14:01), without seeing the trigger go off for the restored 
> window and key A.
> My guess is that, after the job is restored, Beam doesn't register a Flink 
> event-time timer for the restored window unless a new element (key) arrives 
> for the restored window.
> Please refer to [this 
> gist|https://gist.github.com/pbartoszek/7ab88c8b6538039db1b383358d1d1b5a] for 
> a test job that shows this behaviour.
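For illustration, a minimal sketch of the early-firing workaround described in 
the report, assuming 1-minute fixed windows and a 5-second early-firing delay 
(both assumptions, not values taken from the report):

{code:java}
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

// Early firings every 5 seconds of processing time bound the per-key data
// loss on restore to roughly the firing interval.
Window<KV<String, Long>> windowing =
    Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(1)))
        .triggering(
            AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(
                    AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardSeconds(5))))
        .withAllowedLateness(Duration.standardMinutes(1))
        .accumulatingFiredPanes();
{code}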



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (BEAM-3589) Flink runner breaks with ClassCastException on UnboundedSource

2018-02-05 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-3589.
--
Resolution: Fixed

> Flink runner breaks with ClassCastException on UnboundedSource
> --
>
> Key: BEAM-3589
> URL: https://issues.apache.org/jira/browse/BEAM-3589
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.3.0
>Reporter: Ismaël Mejía
>Assignee: Grzegorz Kołakowski
>Priority: Blocker
> Fix For: 2.3.0
>
>
> When you execute a pipeline that uses an unbounded source and an empty 
> transform, it produces a ClassCastException:
> {quote}[WARNING] 
> java.lang.reflect.InvocationTargetException
>     at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke 
> (NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke 
> (DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke (Method.java:498)
>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
>     at java.lang.Thread.run (Thread.java:748)
> Caused by: java.lang.ClassCastException: 
> org.apache.beam.runners.core.construction.AutoValue_PTransformTranslation_UnknownRawPTransform
>  cannot be cast to org.apache.beam.sdk.io.Read$Unbounded
>     at 
> org.apache.beam.runners.flink.FlinkStreamingTransformTranslators$ReadSourceTranslator.translateNode
>  (FlinkStreamingTransformTranslators.java:256)
>     at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.applyStreamingTransform
>  (FlinkStreamingPipelineTranslator.java:139)
>     at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.visitPrimitiveTransform
>  (FlinkStreamingPipelineTranslator.java:118)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:670)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:647)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:623)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:623)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:647)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:662)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600 
> (TransformHierarchy.java:311)
>     at org.apache.beam.sdk.runners.TransformHierarchy.visit 
> (TransformHierarchy.java:245)
>     at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:458)
>     at org.apache.beam.runners.flink.FlinkPipelineTranslator.translate 
> (FlinkPipelineTranslator.java:38)
>     at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate 
> (FlinkStreamingPipelineTranslator.java:70)
>     at 
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate 
> (FlinkPipelineExecutionEnvironment.java:113)
>     at org.apache.beam.runners.flink.FlinkRunner.run (FlinkRunner.java:110)
>     at org.apache.beam.sdk.Pipeline.run (Pipeline.java:311)
>     at org.apache.beam.sdk.Pipeline.run (Pipeline.java:297)
>     at org.apache.beam.sdk.nexmark.NexmarkLauncher.run 
> (NexmarkLauncher.java:1139)
>     at org.apache.beam.sdk.nexmark.Main.runAll (Main.java:69)
>     at org.apache.beam.sdk.nexmark.Main.main (Main.java:301)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke 
> (NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke 
> (DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke (Method.java:498)
>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
>     at java.lang.Thread.run (Thread.java:748)
> {quote}
> You can reproduce it quickly by running this command from the nexmark 
> directory:
> {quote}mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main 
> -Pflink-runner "-Dexec.args=--runner=FlinkRunner --query=2 --streaming=true 
> --manageResources=false --monitorJobs=true"
> {quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (BEAM-3589) Flink runner breaks with ClassCastException on UnboundedSource

2018-02-05 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated BEAM-3589:
---
Fix Version/s: 2.3.0

> Flink runner breaks with ClassCastException on UnboundedSource
> --
>
> Key: BEAM-3589
> URL: https://issues.apache.org/jira/browse/BEAM-3589
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.3.0
>Reporter: Ismaël Mejía
>Assignee: Grzegorz Kołakowski
>Priority: Blocker
> Fix For: 2.3.0
>
>
> When you execute a pipeline that uses an unbounded source and an empty 
> transform, it produces a ClassCastException:
> {quote}[WARNING] 
> java.lang.reflect.InvocationTargetException
>     at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke 
> (NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke 
> (DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke (Method.java:498)
>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
>     at java.lang.Thread.run (Thread.java:748)
> Caused by: java.lang.ClassCastException: 
> org.apache.beam.runners.core.construction.AutoValue_PTransformTranslation_UnknownRawPTransform
>  cannot be cast to org.apache.beam.sdk.io.Read$Unbounded
>     at 
> org.apache.beam.runners.flink.FlinkStreamingTransformTranslators$ReadSourceTranslator.translateNode
>  (FlinkStreamingTransformTranslators.java:256)
>     at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.applyStreamingTransform
>  (FlinkStreamingPipelineTranslator.java:139)
>     at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.visitPrimitiveTransform
>  (FlinkStreamingPipelineTranslator.java:118)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:670)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:647)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:623)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:623)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:647)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:662)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600 
> (TransformHierarchy.java:311)
>     at org.apache.beam.sdk.runners.TransformHierarchy.visit 
> (TransformHierarchy.java:245)
>     at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:458)
>     at org.apache.beam.runners.flink.FlinkPipelineTranslator.translate 
> (FlinkPipelineTranslator.java:38)
>     at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate 
> (FlinkStreamingPipelineTranslator.java:70)
>     at 
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate 
> (FlinkPipelineExecutionEnvironment.java:113)
>     at org.apache.beam.runners.flink.FlinkRunner.run (FlinkRunner.java:110)
>     at org.apache.beam.sdk.Pipeline.run (Pipeline.java:311)
>     at org.apache.beam.sdk.Pipeline.run (Pipeline.java:297)
>     at org.apache.beam.sdk.nexmark.NexmarkLauncher.run 
> (NexmarkLauncher.java:1139)
>     at org.apache.beam.sdk.nexmark.Main.runAll (Main.java:69)
>     at org.apache.beam.sdk.nexmark.Main.main (Main.java:301)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke 
> (NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke 
> (DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke (Method.java:498)
>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
>     at java.lang.Thread.run (Thread.java:748)
> {quote}
> You can reproduce it quickly by running this command from the nexmark 
> directory:
> {quote}mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main 
> -Pflink-runner "-Dexec.args=--runner=FlinkRunner --query=2 --streaming=true 
> --manageResources=false --monitorJobs=true"
> {quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-3589) Flink runner breaks with ClassCastException on UnboundedSource

2018-02-05 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352250#comment-16352250
 ] 

Aljoscha Krettek commented on BEAM-3589:


[~jbonofre] This was now fixed as part of another fix in this commit:
01d74b3b8381c04ffba3879281897ea2ef6fbcc5

> Flink runner breaks with ClassCastException on UnboundedSource
> --
>
> Key: BEAM-3589
> URL: https://issues.apache.org/jira/browse/BEAM-3589
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.3.0
>Reporter: Ismaël Mejía
>Assignee: Grzegorz Kołakowski
>Priority: Blocker
> Fix For: 2.3.0
>
>
> When you execute a pipeline that uses an unbounded source and an empty 
> transform, it produces a ClassCastException:
> {quote}[WARNING] 
> java.lang.reflect.InvocationTargetException
>     at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke 
> (NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke 
> (DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke (Method.java:498)
>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
>     at java.lang.Thread.run (Thread.java:748)
> Caused by: java.lang.ClassCastException: 
> org.apache.beam.runners.core.construction.AutoValue_PTransformTranslation_UnknownRawPTransform
>  cannot be cast to org.apache.beam.sdk.io.Read$Unbounded
>     at 
> org.apache.beam.runners.flink.FlinkStreamingTransformTranslators$ReadSourceTranslator.translateNode
>  (FlinkStreamingTransformTranslators.java:256)
>     at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.applyStreamingTransform
>  (FlinkStreamingPipelineTranslator.java:139)
>     at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.visitPrimitiveTransform
>  (FlinkStreamingPipelineTranslator.java:118)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:670)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:647)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:623)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:623)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:647)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:662)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600 
> (TransformHierarchy.java:311)
>     at org.apache.beam.sdk.runners.TransformHierarchy.visit 
> (TransformHierarchy.java:245)
>     at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:458)
>     at org.apache.beam.runners.flink.FlinkPipelineTranslator.translate 
> (FlinkPipelineTranslator.java:38)
>     at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate 
> (FlinkStreamingPipelineTranslator.java:70)
>     at 
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate 
> (FlinkPipelineExecutionEnvironment.java:113)
>     at org.apache.beam.runners.flink.FlinkRunner.run (FlinkRunner.java:110)
>     at org.apache.beam.sdk.Pipeline.run (Pipeline.java:311)
>     at org.apache.beam.sdk.Pipeline.run (Pipeline.java:297)
>     at org.apache.beam.sdk.nexmark.NexmarkLauncher.run 
> (NexmarkLauncher.java:1139)
>     at org.apache.beam.sdk.nexmark.Main.runAll (Main.java:69)
>     at org.apache.beam.sdk.nexmark.Main.main (Main.java:301)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke 
> (NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke 
> (DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke (Method.java:498)
>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
>     at java.lang.Thread.run (Thread.java:748)
> {quote}
> You can reproduce it quickly by running this command from the nexmark 
> directory:
> {quote}mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main 
> -Pflink-runner "-Dexec.args=--runner=FlinkRunner --query=2 --streaming=true 
> --manageResources=false --monitorJobs=true"
> {quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-3186) In-flight data loss when restoring from savepoint

2018-02-05 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352248#comment-16352248
 ] 

Aljoscha Krettek commented on BEAM-3186:


[~jbonofre] This was fixed on master in these commits:
01d74b3b8381c04ffba3879281897ea2ef6fbcc5
6808b8855f90f69098bd37d2353da7d3ca329eb3

Should I cherry-pick them onto the 2.3.0 release branch or do you want to do 
that?

> In-flight data loss when restoring from savepoint
> -
>
> Key: BEAM-3186
> URL: https://issues.apache.org/jira/browse/BEAM-3186
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.0.0, 2.1.0
>Reporter: Pawel Bartoszek
>Assignee: Dawid Wysakowicz
>Priority: Blocker
> Fix For: 2.3.0
>
> Attachments: restore_no_trigger.png, restore_with_trigger.png, 
> restore_with_trigger_b.png
>
>
> *The context:*
> I want to count how many events of a given type (A, B, etc.) I receive every 
> minute, using 1-minute windows and an AfterWatermark trigger with an allowed 
> lateness of 1 min.
> *No data loss case*
> In the case below, if at least one A element with an event time belonging to 
> the window 14:00-14:01 is read from the Kinesis stream after the job is 
> restored from the savepoint, data loss is not observed for this key and 
> this window.
> !restore_no_trigger.png!
> *Data loss case*
> However, if no new A element is read from the Kinesis stream, then data 
> loss is observable.
> !restore_with_trigger.png!
> *Workaround*
> As a workaround, we could configure early firings every X seconds, which 
> limits the data loss to at most X seconds per key on restore.
> *My guess where the issue might be*
> I believe this is a bug in the Beam-Flink integration layer. From my 
> investigation I don't think it's the KinesisReader failing to advance the 
> watermark. To prove that, after restoring from the savepoint I sent some 
> records for a different key (B) in the same window shown in the 
> pictures (14:00-14:01), without seeing the trigger go off for the restored 
> window and key A.
> My guess is that, after the job is restored, Beam doesn't register a Flink 
> event-time timer for the restored window unless a new element (key) arrives 
> for the restored window.
> Please refer to [this 
> gist|https://gist.github.com/pbartoszek/7ab88c8b6538039db1b383358d1d1b5a] for 
> a test job that shows this behaviour.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-3589) Flink runner breaks with ClassCastException on UnboundedSource

2018-02-01 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348631#comment-16348631
 ] 

Aljoscha Krettek commented on BEAM-3589:


[~grzegorz_kolakowski] I think I'm too tired today, but does 
https://github.com/apache/beam/pull/4558 actually remove that unnecessary 
{{(Read.Unbounded)}} cast? I couldn't find it anymore. 

> Flink runner breaks with ClassCastException on UnboundedSource
> --
>
> Key: BEAM-3589
> URL: https://issues.apache.org/jira/browse/BEAM-3589
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.3.0
>Reporter: Ismaël Mejía
>Assignee: Grzegorz Kołakowski
>Priority: Blocker
>
> When you execute a pipeline that uses an unbounded source and an empty 
> transform, it produces a ClassCastException:
> {quote}[WARNING] 
> java.lang.reflect.InvocationTargetException
>     at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke 
> (NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke 
> (DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke (Method.java:498)
>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
>     at java.lang.Thread.run (Thread.java:748)
> Caused by: java.lang.ClassCastException: 
> org.apache.beam.runners.core.construction.AutoValue_PTransformTranslation_UnknownRawPTransform
>  cannot be cast to org.apache.beam.sdk.io.Read$Unbounded
>     at 
> org.apache.beam.runners.flink.FlinkStreamingTransformTranslators$ReadSourceTranslator.translateNode
>  (FlinkStreamingTransformTranslators.java:256)
>     at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.applyStreamingTransform
>  (FlinkStreamingPipelineTranslator.java:139)
>     at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.visitPrimitiveTransform
>  (FlinkStreamingPipelineTranslator.java:118)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:670)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:647)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:623)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:623)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:647)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:662)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600 
> (TransformHierarchy.java:311)
>     at org.apache.beam.sdk.runners.TransformHierarchy.visit 
> (TransformHierarchy.java:245)
>     at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:458)
>     at org.apache.beam.runners.flink.FlinkPipelineTranslator.translate 
> (FlinkPipelineTranslator.java:38)
>     at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate 
> (FlinkStreamingPipelineTranslator.java:70)
>     at 
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate 
> (FlinkPipelineExecutionEnvironment.java:113)
>     at org.apache.beam.runners.flink.FlinkRunner.run (FlinkRunner.java:110)
>     at org.apache.beam.sdk.Pipeline.run (Pipeline.java:311)
>     at org.apache.beam.sdk.Pipeline.run (Pipeline.java:297)
>     at org.apache.beam.sdk.nexmark.NexmarkLauncher.run 
> (NexmarkLauncher.java:1139)
>     at org.apache.beam.sdk.nexmark.Main.runAll (Main.java:69)
>     at org.apache.beam.sdk.nexmark.Main.main (Main.java:301)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke 
> (NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke 
> (DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke (Method.java:498)
>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
>     at java.lang.Thread.run (Thread.java:748)
> {quote}
> You can reproduce it quickly by running this command from the nexmark 
> directory:
> {quote}mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main 
> -Pflink-runner "-Dexec.args=--runner=FlinkRunner --query=2 --streaming=true 
> --manageResources=false --monitorJobs=true"
> {quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-2806) support View.CreatePCollectionView in FlinkRunner

2018-02-01 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned BEAM-2806:
--

Assignee: Grzegorz Kołakowski  (was: Aljoscha Krettek)

> support View.CreatePCollectionView in FlinkRunner
> -
>
> Key: BEAM-2806
> URL: https://issues.apache.org/jira/browse/BEAM-2806
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Xu Mingmin
>Assignee: Grzegorz Kołakowski
>Priority: Major
>
> Beam version: 2.2.0-SNAPSHOT
> Here's the code
> {code}
> PCollectionView> rowsView = rightRows
> .apply(View.asMultimap());
> {code}
> And exception when running with {{FlinkRunner}}:
> {code}
> Exception in thread "main" java.lang.UnsupportedOperationException: The 
> transform View.CreatePCollectionView is currently not supported.
>   at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:113)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:594)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:586)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:586)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:268)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:202)
>   at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:440)
>   at 
> org.apache.beam.runners.flink.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:38)
>   at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate(FlinkStreamingPipelineTranslator.java:69)
>   at 
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:104)
>   at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:113)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-3589) Flink runner breaks with ClassCastException on UnboundedSource

2018-02-01 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned BEAM-3589:
--

Assignee: Grzegorz Kołakowski

> Flink runner breaks with ClassCastException on UnboundedSource
> --
>
> Key: BEAM-3589
> URL: https://issues.apache.org/jira/browse/BEAM-3589
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.3.0
>Reporter: Ismaël Mejía
>Assignee: Grzegorz Kołakowski
>Priority: Blocker
>
> When you execute a pipeline that uses an unbounded source and an empty 
> transform, it produces a ClassCastException:
> {quote}[WARNING] 
> java.lang.reflect.InvocationTargetException
>     at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke 
> (NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke 
> (DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke (Method.java:498)
>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
>     at java.lang.Thread.run (Thread.java:748)
> Caused by: java.lang.ClassCastException: 
> org.apache.beam.runners.core.construction.AutoValue_PTransformTranslation_UnknownRawPTransform
>  cannot be cast to org.apache.beam.sdk.io.Read$Unbounded
>     at 
> org.apache.beam.runners.flink.FlinkStreamingTransformTranslators$ReadSourceTranslator.translateNode
>  (FlinkStreamingTransformTranslators.java:256)
>     at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.applyStreamingTransform
>  (FlinkStreamingPipelineTranslator.java:139)
>     at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.visitPrimitiveTransform
>  (FlinkStreamingPipelineTranslator.java:118)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:670)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:647)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:623)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:623)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:647)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit 
> (TransformHierarchy.java:662)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600 
> (TransformHierarchy.java:311)
>     at org.apache.beam.sdk.runners.TransformHierarchy.visit 
> (TransformHierarchy.java:245)
>     at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:458)
>     at org.apache.beam.runners.flink.FlinkPipelineTranslator.translate 
> (FlinkPipelineTranslator.java:38)
>     at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate 
> (FlinkStreamingPipelineTranslator.java:70)
>     at 
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate 
> (FlinkPipelineExecutionEnvironment.java:113)
>     at org.apache.beam.runners.flink.FlinkRunner.run (FlinkRunner.java:110)
>     at org.apache.beam.sdk.Pipeline.run (Pipeline.java:311)
>     at org.apache.beam.sdk.Pipeline.run (Pipeline.java:297)
>     at org.apache.beam.sdk.nexmark.NexmarkLauncher.run 
> (NexmarkLauncher.java:1139)
>     at org.apache.beam.sdk.nexmark.Main.runAll (Main.java:69)
>     at org.apache.beam.sdk.nexmark.Main.main (Main.java:301)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke 
> (NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke 
> (DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke (Method.java:498)
>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
>     at java.lang.Thread.run (Thread.java:748)
> {quote}
> You can reproduce it quickly by running this command from the nexmark 
> directory:
> {quote}mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main 
> -Pflink-runner "-Dexec.args=--runner=FlinkRunner --query=2 --streaming=true 
> --manageResources=false --monitorJobs=true"
> {quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-3186) In-flight data loss when restoring from savepoint

2018-02-01 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned BEAM-3186:
--

Assignee: Dawid Wysakowicz  (was: Aljoscha Krettek)

> In-flight data loss when restoring from savepoint
> -
>
> Key: BEAM-3186
> URL: https://issues.apache.org/jira/browse/BEAM-3186
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.0.0, 2.1.0
>Reporter: Pawel Bartoszek
>Assignee: Dawid Wysakowicz
>Priority: Major
> Attachments: restore_no_trigger.png, restore_with_trigger.png, 
> restore_with_trigger_b.png
>
>
> *The context:*
> I want to count how many events of a given type (A, B, etc.) I receive every 
> minute, using 1-minute windows and an AfterWatermark trigger with an allowed 
> lateness of 1 min.
> *No data loss case*
> In the case below, if at least one A element with an event time belonging to 
> the window 14:00-14:01 is read from the Kinesis stream after the job is 
> restored from the savepoint, data loss is not observed for this key and 
> this window.
> !restore_no_trigger.png!
> *Data loss case*
> However, if no new A element is read from the Kinesis stream, then data 
> loss is observable.
> !restore_with_trigger.png!
> *Workaround*
> As a workaround, we could configure early firings every X seconds, which 
> limits the data loss to at most X seconds per key on restore.
> *My guess where the issue might be*
> I believe this is a bug in the Beam-Flink integration layer. From my 
> investigation I don't think it's the KinesisReader failing to advance the 
> watermark. To prove that, after restoring from the savepoint I sent some 
> records for a different key (B) in the same window shown in the 
> pictures (14:00-14:01), without seeing the trigger go off for the restored 
> window and key A.
> My guess is that, after the job is restored, Beam doesn't register a Flink 
> event-time timer for the restored window unless a new element (key) arrives 
> for the restored window.
> Please refer to [this 
> gist|https://gist.github.com/pbartoszek/7ab88c8b6538039db1b383358d1d1b5a] for 
> a test job that shows this behaviour.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-3414) AfterProcessingTime trigger issue with Flink Runner

2018-02-01 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348542#comment-16348542
 ] 

Aljoscha Krettek commented on BEAM-3414:


Still working on getting that PR in, yes.

> AfterProcessingTime trigger issue with Flink Runner
> ---
>
> Key: BEAM-3414
> URL: https://issues.apache.org/jira/browse/BEAM-3414
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core, runner-flink
>Affects Versions: 2.2.0
> Environment: idea, ubuntu 16.04, FlinkRunner
>Reporter: huangjianhuang
>Assignee: Aljoscha Krettek
>Priority: Major
>
> In my demo, I read data from Kafka and count globally, finally outputting the 
> total count of received data, as follows:
> {code:java}
> FlinkPipelineOptions options = 
> PipelineOptionsFactory.fromArgs(args).withValidation()
> .as(FlinkPipelineOptions.class);
> options.setStreaming(true);
> options.setRunner(FlinkRunner.class);
> Pipeline pipeline = Pipeline.create(options);
> pipeline
> .apply("Read from kafka",
> KafkaIO.read()
> //.withTimestampFn(kafkaData -> 
> TimeUtil.timeMillisToInstant(kafkaData.getKey()))
> .withBootstrapServers("localhost:9092")
> .withTopic("recharge")
> .withKeyDeserializer(StringDeserializer.class)
> 
> .withValueDeserializer(StringDeserializer.class)
> .withoutMetadata()
> )
> .apply(Values.create())
> .apply(Window.into(new GlobalWindows())
> .triggering(Repeatedly.forever(
> 
> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5
> .accumulatingFiredPanes()
> )
> .apply(Count.globally())
> .apply("output",
> ParDo.of(new DoFn() {
> @ProcessElement
> public void process(ProcessContext context) {
> System.out.println("---get at: " + 
> Instant.now() + "--");
> System.out.println(context.element());
> }
> }));
> {code}
> The result should be displayed 5s after I send the first data, but sometimes 
> nothing was displayed after I sent data. The listing below shows the outputs 
> I got in a test:
> (can't upload a pic, described as text)
> {code:java}
> Send 681Msg at: 2018-01-05T06:34:31.436
>   ---get at: 2018-01-05T06:34:36.668Z--
>   681
> Send 681Msg at: 2018-01-05T06:34:47.166
>   ---get at: 2018-01-05T06:34:52.284Z--
>   1362
> Send 681Msg at: 2018-01-05T06:34:55.505
> Send 681Msg at: 2018-01-05T06:35:22.068
>   ---get at: 2018-01-05T06:35:22.112Z--
>   2044
> {code}
> BTW, the code works fine with the direct runner.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-3087) Extend lock scope in Flink UnboundedSourceWrapper

2018-01-26 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16341059#comment-16341059
 ] 

Aljoscha Krettek commented on BEAM-3087:


There was one user who had a problem because of this, but I haven't heard about 
it recently, so I think it's OK to push it back.

> Extend lock scope in Flink UnboundedSourceWrapper
> -
>
> Key: BEAM-3087
> URL: https://issues.apache.org/jira/browse/BEAM-3087
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Priority: Critical
> Fix For: 2.3.0
>
>
> In {{UnboundedSourceWrapper}} the lock scope is not big enough: we 
> synchronise in {{emitElement()}} but should instead synchronise inside the 
> reader loop in {{run()}} because the {{Source}} interface does not allow 
> concurrent calls.
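A schematic sketch of the proposed fix; the names ({{isRunning}}, 
{{checkpointLock}}, {{emitElement}}) are assumptions for illustration, not the 
actual wrapper code:

{code:java}
// Before: only emitElement() was synchronized, so reader calls could race
// with checkpoint snapshots. After: one lock spans advance() and the emit,
// since the Source interface does not allow concurrent calls.
private void runReaderLoop(
    UnboundedSource.UnboundedReader<String> reader) throws Exception {
  boolean available = reader.start();
  while (isRunning) {
    synchronized (checkpointLock) {
      if (available) {
        emitElement(reader.getCurrent(), reader.getCurrentTimestamp());
      }
      available = reader.advance();  // reader call now under the lock
    }
  }
}
{code}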



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (BEAM-3494) Snapshot state of aggregated data of apache beam project is not maintained in flink's checkpointing

2018-01-26 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16341057#comment-16341057
 ] 

Aljoscha Krettek edited comment on BEAM-3494 at 1/26/18 1:32 PM:
-

How are you enabling checkpointing? Also, could you please reformat your 
posting to put the code inside code tags to make it more readable?


was (Author: aljoscha):
How are you enabling checkpointing? Also, could you please reformat your 
posting to put the code between code tags, like this
{code:java}
{code}
your code ...
{code}{code}
to make it more readable?

> Snapshot state of aggregated data of apache beam project is not maintained in 
> flink's checkpointing 
> 
>
> Key: BEAM-3494
> URL: https://issues.apache.org/jira/browse/BEAM-3494
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: suganya
>Priority: Major
>
> We have a Beam project which consumes events from Kafka, does a groupBy in a 
> time window (5 mins), and after the window elapses pushes the events 
> downstream for a merge. This project is deployed using Flink; we have enabled 
> checkpointing to recover from a failed state.
> (windowSize: 5 mins, checkpointingInterval: 5 mins, state.backend: filesystem)
> Offsets from Kafka get checkpointed every 5 
> mins (checkpointingInterval). Event offsets are checkpointed before the 
> entire DAG (groupBy and merge) finishes. So in case of any restart of the 
> task manager, the new task starts from the last successful checkpoint, but 
> we couldn't get the aggregated snapshot data (data from the groupBy task) 
> from the persisted checkpoint.
> We are able to retrieve the last successfully checkpointed offset from Kafka, 
> but couldn't get the aggregated data up to the checkpoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-3494) Snapshot state of aggregated data of apache beam project is not maintained in flink's checkpointing

2018-01-26 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16341057#comment-16341057
 ] 

Aljoscha Krettek commented on BEAM-3494:


How are you enabling checkpointing? Also, could you please reformat your 
posting to put the code between code tags, like this
{code:java}
{code}
your code ...
{code}{code}
to make it more readable?
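For reference, one way to enable checkpointing through the Flink runner; a 
sketch assuming the 5-minute interval from the report:

{code:java}
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

FlinkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation()
        .as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
options.setStreaming(true);
// A positive interval makes the runner enable checkpointing on the Flink
// environment; 300000 ms matches the 5-minute interval mentioned above.
options.setCheckpointingInterval(300000L);
{code}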

> Snapshot state of aggregated data of apache beam project is not maintained in 
> flink's checkpointing 
> 
>
> Key: BEAM-3494
> URL: https://issues.apache.org/jira/browse/BEAM-3494
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: suganya
>Priority: Major
>
> We have a Beam project which consumes events from Kafka, does a groupBy in a 
> time window (5 mins), and after the window elapses pushes the events 
> downstream for a merge. This project is deployed using Flink; we have enabled 
> checkpointing to recover from a failed state.
> (windowSize: 5 mins, checkpointingInterval: 5 mins, state.backend: filesystem)
> Offsets from Kafka get checkpointed every 5 
> mins (checkpointingInterval). Event offsets are checkpointed before the 
> entire DAG (groupBy and merge) finishes. So in case of any restart of the 
> task manager, the new task starts from the last successful checkpoint, but 
> we couldn't get the aggregated snapshot data (data from the groupBy task) 
> from the persisted checkpoint.
> We are able to retrieve the last successfully checkpointed offset from Kafka, 
> but couldn't get the aggregated data up to the checkpoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-3472) Create a callback triggered at the end of a batch in flink runner

2018-01-26 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned BEAM-3472:
--

Assignee: (was: Aljoscha Krettek)

> Create a callback triggered at the end of a batch in flink runner
> -
>
> Key: BEAM-3472
> URL: https://issues.apache.org/jira/browse/BEAM-3472
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Etienne Chauchot
>Priority: Major
>
> In the future we might add new features to the runners for which we might 
> need to do some processing at the end of a batch. Currently there is no 
> unique place (a callback) to add this processing.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-3494) Snapshot state of aggregated data of apache beam project is not maintained in flink's checkpointing

2018-01-26 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned BEAM-3494:
--

Assignee: (was: Aljoscha Krettek)

> Snapshot state of aggregated data of apache beam project is not maintained in 
> flink's checkpointing 
> 
>
> Key: BEAM-3494
> URL: https://issues.apache.org/jira/browse/BEAM-3494
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: suganya
>Priority: Major
>
> We have a Beam project which consumes events from Kafka, does a groupBy in a 
> time window (5 mins), and after the window elapses pushes the events 
> downstream for a merge. This project is deployed using Flink; we have enabled 
> checkpointing to recover from a failed state.
> (windowSize: 5 mins, checkpointingInterval: 5 mins, state.backend: filesystem)
> Offsets from Kafka get checkpointed every 5 
> mins (checkpointingInterval). Event offsets are checkpointed before the 
> entire DAG (groupBy and merge) finishes. So in case of any restart of the 
> task manager, the new task starts from the last successful checkpoint, but 
> we couldn't get the aggregated snapshot data (data from the groupBy task) 
> from the persisted checkpoint.
> We are able to retrieve the last successfully checkpointed offset from Kafka, 
> but couldn't get the aggregated data up to the checkpoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-3529) Side Input callbacks should fire after a window has expired in addition to when the trigger would fire

2018-01-26 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16341044#comment-16341044
 ] 

Aljoscha Krettek commented on BEAM-3529:


There should probably also be {{ValidatesRunner}} tests about this behaviour to 
ensure that it is uniform across runners, right?

> Side Input callbacks should fire after a window has expired in addition to 
> when the trigger would fire
> --
>
> Key: BEAM-3529
> URL: https://issues.apache.org/jira/browse/BEAM-3529
> Project: Beam
>  Issue Type: Bug
>  Components: runner-direct
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>
> Otherwise a streaming pipeline where elements trying to access a window that 
> has expired will not progress, even though we can be certain that there will 
> never be inputs in that window.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-3370) Add ability to stage directories with compiled classes to Flink

2018-01-05 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned BEAM-3370:
--

Assignee: (was: Aljoscha Krettek)

> Add ability to stage directories with compiled classes to Flink
> ---
>
> Key: BEAM-3370
> URL: https://issues.apache.org/jira/browse/BEAM-3370
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Łukasz Gajowy
>Priority: Minor
>
> Currently, when _filesToStage_ contains a path to a directory with resources, 
> the Flink runner throws a {{"java.io.FileNotFoundException:  (Is 
> a directory)"}}. A way to include directory resources would be helpful. 
> This "blocker" occurs while trying to run IOITs on the Flink runner, which 
> basically makes them impossible or very inconvenient to run. When the tests 
> are run via the "mvn verify" command, a "test-classes" *directory* gets 
> detected by the detectClasspathResourcesToStage() method, which in turn 
> causes the above error. 
> One way to solve this issue is to package the directories into jars with 
> unique names and update the paths accordingly before staging the files on 
> Flink. 
> Something similar is already done in the Dataflow runner 
> ([GcsStager|https://github.com/apache/beam/blob/cd186a531aaff0b21cf009b034e1a41f7e7b64af/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java#L74]),
>  more specifically in 
> [PackageUtil|https://github.com/apache/beam/blob/cd186a531aaff0b21cf009b034e1a41f7e7b64af/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java#L280]
>  class. We are able to run the tests on dataflow thanks to that. 
> As I checked in a [small experiment of 
> mine|https://github.com/lgajowy/beam/commits/spark-and-flink-run-tests], 
> providing an analogous change makes it possible to run the tests on a Flink 
> cluster. 
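A rough sketch of the jar-packaging approach suggested above (a hypothetical 
helper, not the Dataflow or Flink runner code):

{code:java}
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import java.util.stream.Stream;

class DirectoryStager {
  // Zips a classpath directory (e.g. target/test-classes) into a uniquely
  // named temp jar so it can be staged like any other file.
  static Path packageDirectory(Path dir) throws IOException {
    Path jar = Files.createTempFile("staged-classes-", ".jar");
    try (JarOutputStream out =
             new JarOutputStream(Files.newOutputStream(jar));
         Stream<Path> files = Files.walk(dir)) {
      files.filter(Files::isRegularFile).forEach(file -> {
        try {
          // Jar entries use '/' separators relative to the directory root.
          out.putNextEntry(new JarEntry(
              dir.relativize(file).toString().replace('\\', '/')));
          Files.copy(file, out);
          out.closeEntry();
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      });
    }
    return jar;
  }
}
{code}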



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (BEAM-3359) Unable to change "flinkMaster" from "[auto]" in TestFlinkRunner

2018-01-05 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned BEAM-3359:
--

Assignee: (was: Aljoscha Krettek)

> Unable to change "flinkMaster" from "[auto]" in TestFlinkRunner
> ---
>
> Key: BEAM-3359
> URL: https://issues.apache.org/jira/browse/BEAM-3359
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Łukasz Gajowy
>Priority: Minor
>
> In TestFlinkRunner's constructor there is a line like this:
> {{options.setFlinkMaster("\[auto\]");}}
> which basically ignores any "flinkMaster" provided earlier (eg. using command 
> line) leading to  errors that are hard to find (for example wondering: "i 
> provided good url in pipeline options... why is it not connecting to my 
> cluster?). 
> Setting a {{@Default.String("\[auto\]")}} in FlinkPipelineOptions could be 
> one solution I guess. 
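A minimal sketch of that suggestion (abbreviated, not the full 
FlinkPipelineOptions interface):

{code:java}
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

public interface FlinkPipelineOptions extends PipelineOptions {
  // Declaring the default here means TestFlinkRunner no longer has to
  // overwrite whatever flinkMaster the user passed on the command line.
  @Description("Address of the Flink Master where the pipeline should run.")
  @Default.String("[auto]")
  String getFlinkMaster();

  void setFlinkMaster(String value);
}
{code}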



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-3379) PAssert not successful when running IOITs with Flink runner

2018-01-05 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313338#comment-16313338
 ] 

Aljoscha Krettek commented on BEAM-3379:


Could you check whether this PR fixes it? 
https://github.com/apache/beam/pull/4348

I'm assuming the file IOs use SDF, and this PR fixes problems in that area 
that have to do with processing-time timers being dropped.

> PAssert not successful when running IOITs with Flink runner
> ---
>
> Key: BEAM-3379
> URL: https://issues.apache.org/jira/browse/BEAM-3379
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Łukasz Gajowy
>Assignee: Aljoscha Krettek
>
> When running IOIT tests on Flink either with command:
> {{mvn clean verify \-pl sdks/java/io/file-based-io-tests/ \-Pio-it 
> \-DintegrationTestPipelineOptions='\["\-\-runner=FlinkRunner", 
> "\-\-flinkMaster=localhost:6123", "\-\-numberOfRecords=10", 
> "\-\-filenamePrefix=TEST", 
> "\-\-filesToStage=/Users/lukasz/Projects/apache-beam/beam/sdks/java/io/file-based-io-tests/target/beam-sdks-java-io-file-based-io-tests-2.3.0-SNAPSHOT-shaded.jar,/Users/lukasz/Projects/apache-beam/beam/sdks/java/io/file-based-io-tests/target/beam-sdks-java-io-file-based-io-tests-2.3.0-SNAPSHOT-tests.jar"\]'
>  -Dit.test=TextIOIT -Pflink-runner}}
> or without the filesToStage argument but using modifications described in 
> BEAM-3370, we get the following error, even though the assertion is successful 
> (checked manually):
> java.lang.AssertionError:
> Expected 1 successful assertions, but found 0.
> Expected: is <1L>
>  but: was <0L>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at 
> org.apache.beam.sdk.testing.TestPipeline.verifyPAssertsSucceeded(TestPipeline.java:541)
>   at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:359)
>   at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:340)
>   at 
> org.apache.beam.sdk.io.text.TextIOIT.writeThenReadAll(TextIOIT.java:109)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:329)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at 
> org.apache.maven.surefire.junitcore.pc.Scheduler$1.run(Scheduler.java:393)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-3414) AfterProcessingTime trigger issue with Flink Runner

2018-01-05 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313329#comment-16313329
 ] 

Aljoscha Krettek commented on BEAM-3414:


I opened a PR for a related bug and I think this issue should also be fixed by 
the change: https://github.com/apache/beam/pull/4348

The reason for the issue here is that processing-time timers don't block 
shutdown of a pipeline.

> AfterProcessingTime trigger issue with Flink Runner
> ---
>
> Key: BEAM-3414
> URL: https://issues.apache.org/jira/browse/BEAM-3414
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core, runner-flink
>Affects Versions: 2.2.0
> Environment: idea, ubuntu 16.04, FlinkRunner
>Reporter: huangjianhuang
>Assignee: Kenneth Knowles
>
> In my demo, I read data from Kafka and count globally, finally outputting the 
> total count of received data, as follows:
> {code:java}
> FlinkPipelineOptions options = 
> PipelineOptionsFactory.fromArgs(args).withValidation()
> .as(FlinkPipelineOptions.class);
> options.setStreaming(true);
> options.setRunner(FlinkRunner.class);
> Pipeline pipeline = Pipeline.create(options);
> pipeline
> .apply("Read from kafka",
> KafkaIO.read()
> //.withTimestampFn(kafkaData -> 
> TimeUtil.timeMillisToInstant(kafkaData.getKey()))
> .withBootstrapServers("localhost:9092")
> .withTopic("recharge")
> .withKeyDeserializer(StringDeserializer.class)
> 
> .withValueDeserializer(StringDeserializer.class)
> .withoutMetadata()
> )
> .apply(Values.create())
> .apply(Window.into(new GlobalWindows())
> .triggering(Repeatedly.forever(
> 
> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5
> .accumulatingFiredPanes()
> )
> .apply(Count.globally())
> .apply("output",
> ParDo.of(new DoFn() {
> @ProcessElement
> public void process(ProcessContext context) {
> System.out.println("---get at: " + 
> Instant.now() + "--");
> System.out.println(context.element());
> }
> }));
> {code}
> The result should be displayed 5s after I send the first data, but sometimes 
> nothing was displayed after I sent data. The listing below shows the outputs 
> I got in a test:
> (can't upload a pic, described as text)
> {code:java}
> Send 681Msg at: 2018-01-05T06:34:31.436
>   ---get at: 2018-01-05T06:34:36.668Z--
>   681
> Send 681Msg at: 2018-01-05T06:34:47.166
>   ---get at: 2018-01-05T06:34:52.284Z--
>   1362
> Send 681Msg at: 2018-01-05T06:34:55.505
> Send 681Msg at: 2018-01-05T06:35:22.068
>   ---get at: 2018-01-05T06:35:22.112Z--
>   2044
> {code}
> BTW, the code works fine with the direct runner.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (BEAM-3043) Set user-specified Transform names on Flink operations

2018-01-05 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned BEAM-3043:
--

Assignee: (was: Aljoscha Krettek)

> Set user-specified Transform names on Flink operations
> --
>
> Key: BEAM-3043
> URL: https://issues.apache.org/jira/browse/BEAM-3043
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>
> Currently, we don't always set a name on the generated operations or we set 
> the wrong name. For example, in the batch translation we set the result of 
> {{PTransform.getName()}} as the name, which is only the name of the 
> {{PTransform}} itself, not the name that the user specified when creating a 
> Pipeline.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-2873) Detect number of shards for file sink in Flink Streaming Runner

2018-01-05 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated BEAM-2873:
---
Summary: Detect number of shards for file sink in Flink Streaming Runner  
(was: Detect number of sharsd for file sink in Flink Streaming Runner)

> Detect number of shards for file sink in Flink Streaming Runner
> ---
>
> Key: BEAM-2873
> URL: https://issues.apache.org/jira/browse/BEAM-2873
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>
> [~reuvenlax] mentioned that this is done for the Dataflow Runner and the 
> default behaviour on Flink can be somewhat surprising for users.
> ML entry: https://www.mail-archive.com/dev@beam.apache.org/msg02665.html:
> This is how the file sink has always worked in Beam. If no sharding is 
> specified, then this means runner-determined sharding, and by default that is 
> one file per bundle. If Flink has small bundles, then I suggest using the 
> withNumShards method to explicitly pick the number of output shards.
> The Flink runner can detect that runner-determined sharding has been chosen, 
> and override it with a specific number of shards. For example, the Dataflow 
> streaming runner (which as you mentioned also has small bundles) detects this 
> case and sets the number of output file shards based on the number of workers 
> in the worker pool 
> [Here|https://github.com/apache/beam/blob/9e6530adb00669b7cf0f01cb8b128be0a21fd721/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L354]
>  is the code that does this; it should be quite simple to do something 
> similar for Flink, and then there will be no need for users to explicitly 
> call withNumShards themselves.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (BEAM-2873) Detect number of sharsd for file sink in Flink Streaming Runner

2018-01-05 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned BEAM-2873:
--

Assignee: (was: Aljoscha Krettek)

> Detect number of sharsd for file sink in Flink Streaming Runner
> ---
>
> Key: BEAM-2873
> URL: https://issues.apache.org/jira/browse/BEAM-2873
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>
> [~reuvenlax] mentioned that this is done for the Dataflow Runner and the 
> default behaviour on Flink can be somewhat surprising for users.
> ML entry: https://www.mail-archive.com/dev@beam.apache.org/msg02665.html:
> This is how the file sink has always worked in Beam. If no sharding is 
> specified, then this means runner-determined sharding, and by default that is 
> one file per bundle. If Flink has small bundles, then I suggest using the 
> withNumShards method to explicitly pick the number of output shards.
> The Flink runner can detect that runner-determined sharding has been chosen, 
> and override it with a specific number of shards. For example, the Dataflow 
> streaming runner (which as you mentioned also has small bundles) detects this 
> case and sets the number of output file shards based on the number of workers 
> in the worker pool 
> [Here|https://github.com/apache/beam/blob/9e6530adb00669b7cf0f01cb8b128be0a21fd721/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L354]
>  is the code that does this; it should be quite simple to do something 
> similar for Flink, and then there will be no need for users to explicitly 
> call withNumShards themselves.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (BEAM-2377) Cross compile flink runner to scala 2.11

2018-01-04 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-2377.
--
   Resolution: Invalid
Fix Version/s: (was: 2.3.0)
   Not applicable

Now that we always build against Flink's Scala 2.11 artifacts, this becomes obsolete.

> Cross compile flink runner to scala 2.11
> 
>
> Key: BEAM-2377
> URL: https://issues.apache.org/jira/browse/BEAM-2377
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Ole Langbehn
>Assignee: Aljoscha Krettek
> Fix For: Not applicable
>
>
> The flink runner is compiled for flink built against scala 2.10. flink cross 
> compiles its scala artifacts against 2.10 and 2.11.
> In order to make it possible to use beam with the flink runner in scala 2.11 
> projects, it would be nice if you could publish the flink runner for 2.11 
> next to 2.10.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-1601) Flink Runner support for PerfKit Benchmarker

2017-11-06 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16240093#comment-16240093
 ] 

Aljoscha Krettek commented on BEAM-1601:


I'm not aware of any work on this, no.

> Flink Runner support for PerfKit Benchmarker 
> -
>
> Key: BEAM-1601
> URL: https://issues.apache.org/jira/browse/BEAM-1601
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink, testing
>Reporter: Jason Kuster
>Assignee: Aljoscha Krettek
>
> See 
> https://docs.google.com/document/d/1PsjGPSN6FuorEEPrKEP3u3m16tyOzph5FnL2DhaRDz0/edit?ts=58a78e73#heading=h.exn0s6jsm24q
>  for more details on what this entails. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (BEAM-3087) Extend lock scope in Flink UnboundedSourceWrapper

2017-10-21 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created BEAM-3087:
--

 Summary: Extend lock scope in Flink UnboundedSourceWrapper
 Key: BEAM-3087
 URL: https://issues.apache.org/jira/browse/BEAM-3087
 Project: Beam
  Issue Type: Bug
  Components: runner-flink
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek
 Fix For: 2.3.0


In {{UnboundedSourceWrapper}} the lock scope is not big enough: we synchronise 
in {{emitElement()}} but should instead synchronise inside the reader loop in 
{{run()}} because the {{Source}} interface does not allow concurrent calls.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (BEAM-3087) Extend lock scope in Flink UnboundedSourceWrapper

2017-10-21 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned BEAM-3087:
--

Assignee: (was: Aljoscha Krettek)

> Extend lock scope in Flink UnboundedSourceWrapper
> -
>
> Key: BEAM-3087
> URL: https://issues.apache.org/jira/browse/BEAM-3087
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Aljoscha Krettek
> Fix For: 2.3.0
>
>
> In {{UnboundedSourceWrapper}} the lock scope is not big enough: we 
> synchronise in {{emitElement()}} but should instead synchronise inside the 
> reader loop in {{run()}} because the {{Source}} interface does not allow 
> concurrent calls.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (BEAM-3043) Set user-specified Transform names on Flink operations

2017-10-11 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created BEAM-3043:
--

 Summary: Set user-specified Transform names on Flink operations
 Key: BEAM-3043
 URL: https://issues.apache.org/jira/browse/BEAM-3043
 Project: Beam
  Issue Type: Improvement
  Components: runner-flink
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek


Currently, we don't always set a name on the generated operations or we set the 
wrong name. For example, in the batch translation we set the result of 
{{PTransform.getName()}} as the name, which is only the name of the 
{{PTransform}} itself, not the name that the user specified when creating a 
Pipeline.
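To illustrate the difference (a minimal sketch; the names are made up for the 
example):

{code:java}
// The user-specified name is the string passed to apply(); this is the name
// that should end up on the generated Flink operation.
PCollection<KV<String, Long>> counts =
    words.apply("CountWords", Count.perElement());

// In contrast, PTransform.getName() on the Count transform only returns the
// transform's own default name, not "CountWords".
{code}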



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (BEAM-3027) Output type of SourceID-stripper is not correctly set

2017-10-09 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-3027.
--
Resolution: Fixed

> Output type of SourceID-stripper is not correctly set
> -
>
> Key: BEAM-3027
> URL: https://issues.apache.org/jira/browse/BEAM-3027
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 2.2.0
>
>
> This code: 
> https://github.com/apache/beam/blob/be9fb29901cf4a1ae7b4a9d8e9f25f4ea78359fd/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L193
> Does not set the output type of the flatMap to {{outputTypeInfo}}, which is 
> the correct {{TypeInfo}} that uses the Beam coder. This means that Flink will 
> use Kryo as a fallback, which can be problematic in some cases.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (BEAM-3027) Output type of SourceID-stripper is not correctly set

2017-10-06 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created BEAM-3027:
--

 Summary: Output type of SourceID-stripper is not correctly set
 Key: BEAM-3027
 URL: https://issues.apache.org/jira/browse/BEAM-3027
 Project: Beam
  Issue Type: Bug
  Components: runner-flink
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek
Priority: Blocker
 Fix For: 2.2.0


This code: 
https://github.com/apache/beam/blob/be9fb29901cf4a1ae7b4a9d8e9f25f4ea78359fd/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L193

Does not set the output type of the flatMap to {{outputTypeInfo}}, which is the 
correct {{TypeInfo}} that uses the Beam coder. This means that Flink will use 
Kryo as a fallback, which can be problematic in some cases.
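A minimal sketch of the missing call (assuming {{stripIdsFn}} is the 
source-ID-stripping flatMap function and {{outputTypeInfo}} the coder-backed 
{{TypeInformation}}; names are illustrative):

{code:java}
// Explicitly setting the output type keeps Flink from falling back to Kryo.
DataStream<WindowedValue<T>> stripped =
    source
        .flatMap(stripIdsFn)
        .returns(outputTypeInfo);
{code}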



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2995) can't read/write hdfs in Flink CLUSTER(Standalone)

2017-09-28 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16184169#comment-16184169
 ] 

Aljoscha Krettek commented on BEAM-2995:


How are you starting your cluster? There was also some discussion about this on 
BEAM-2457.

> can't read/write hdfs in Flink CLUSTER(Standalone)
> --
>
> Key: BEAM-2995
> URL: https://issues.apache.org/jira/browse/BEAM-2995
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.2.0
>Reporter: huangjianhuang
>Assignee: Aljoscha Krettek
>
> I just wrote a simple demo like:
> {code:java}
> Configuration conf = new Configuration();
> conf.set("fs.default.name", "hdfs://localhost:9000");
> //other codes
> p.apply("ReadLines", 
> TextIO.read().from("hdfs://localhost:9000/tmp/words"))
> 
> .apply(TextIO.write().to("hdfs://localhost:9000/tmp/hdfsout"));
> {code}
> it works in Flink local mode with this cmd:
> {code:java}
> mvn exec:java -Dexec.mainClass=com.joe.FlinkWithHDFS -Pflink-runner 
> -Dexec.args="--runner=FlinkRunner 
> --filesToStage=target/flinkBeam-2.2.0-SNAPSHOT-shaded.jar"
> {code}
> but it does not work in CLUSTER mode:
> {code:java}
> mvn exec:java -Dexec.mainClass=com.joe.FlinkWithHDFS -Pflink-runner 
> -Dexec.args="--runner=FlinkRunner 
> --filesToStage=target/flinkBeam-2.2.0-SNAPSHOT-shaded.jar 
> --flinkMaster=localhost:6123 "
> {code}
> It seems the Flink cluster regards HDFS as the local file system. 
> The input log from flink-jobmanager.log is:
> {code:java}
> 2017-09-27 20:17:37,962 INFO  org.apache.flink.runtime.jobmanager.JobManager  
>   - Successfully ran initialization on master in 136 ms.
> 2017-09-27 20:17:37,968 INFO  org.apache.beam.sdk.io.FileBasedSource  
>   - {color:red}Filepattern hdfs://localhost:9000/tmp/words2 
> matched 0 files with total size 0{color}
> 2017-09-27 20:17:37,968 INFO  org.apache.beam.sdk.io.FileBasedSource  
>   - Splitting filepattern hdfs://localhost:9000/tmp/words2 into 
> bundles of size 0 took 0 ms and produced 0 files and 0 bundles
> {code}
> The output error message is:
> {code:java}
> Caused by: java.lang.ClassCastException: 
> {color:red}org.apache.beam.sdk.io.hdfs.HadoopResourceId cannot be cast to 
> org.apache.beam.sdk.io.LocalResourceId{color}
> at 
> org.apache.beam.sdk.io.LocalFileSystem.create(LocalFileSystem.java:77)
> at org.apache.beam.sdk.io.FileSystems.create(FileSystems.java:256)
> at org.apache.beam.sdk.io.FileSystems.create(FileSystems.java:243)
> at 
> org.apache.beam.sdk.io.FileBasedSink$Writer.open(FileBasedSink.java:922)
> at 
> org.apache.beam.sdk.io.FileBasedSink$Writer.openUnwindowed(FileBasedSink.java:884)
> at 
> org.apache.beam.sdk.io.WriteFiles.finalizeForDestinationFillEmptyShards(WriteFiles.java:909)
> at org.apache.beam.sdk.io.WriteFiles.access$900(WriteFiles.java:110)
> at 
> org.apache.beam.sdk.io.WriteFiles$2.processElement(WriteFiles.java:858)
> {code}
> Can somebody help me? I've tried everything and just can't work it out. [cry]
> https://issues.apache.org/jira/browse/BEAM-2457
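As a side note, a minimal sketch of passing the Hadoop configuration through 
Beam's pipeline options instead (assuming the 
{{beam-sdks-java-io-hadoop-file-system}} module is on the classpath), so that 
the HDFS filesystem is also registered on the cluster side:

{code:java}
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000");

HadoopFileSystemOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation().as(HadoopFileSystemOptions.class);
// Register the Hadoop configuration with the pipeline options so that the
// Hadoop filesystem registrar can serve hdfs:// paths on the workers, too.
options.setHdfsConfiguration(Collections.singletonList(conf));
{code}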



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2975) Results of ReadableState.read() should be snapshots of the underlying state

2017-09-27 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182151#comment-16182151
 ] 

Aljoscha Krettek commented on BEAM-2975:


These changes also broke the PostCommit tests for the Flink Runner: 
https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/org.apache.beam$beam-runners-flink_2.10/3936/#showFailuresLink

It seems there is still need for some discussion so what should we do? Revert 
the changes for now or fix the Flink Runner to work with the new semantics to 
get the signal back?

> Results of ReadableState.read() should be snapshots of the underlying state
> ---
>
> Key: BEAM-2975
> URL: https://issues.apache.org/jira/browse/BEAM-2975
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Daniel Mills
>Assignee: Daniel Mills
>Priority: Minor
>
> Future modification of state should not be reflected in previous calls to 
> read().  For example:
> @StateId("tag") BagState<Integer> state;
> Iterable<Integer> ints = state.read();
> state.add(17);
> // ints should still be empty here.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2980) BagState.isEmpty needs a tighter spec

2017-09-22 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176119#comment-16176119
 ] 

Aljoscha Krettek commented on BEAM-2980:


I think this is a more specific version of BEAM-2975.

> BagState.isEmpty needs a tighter spec
> -
>
> Key: BEAM-2980
> URL: https://issues.apache.org/jira/browse/BEAM-2980
> Project: Beam
>  Issue Type: Bug
>  Components: beam-model
>Reporter: Kenneth Knowles
>Assignee: Kenneth Knowles
>
> Consider the following:
> {code}
> BagState myBag = ... // empty
> ReadableState<Boolean> isMyBagEmpty = myBag.isEmpty();
> myBag.add(bizzle);
> boolean empty = isMyBagEmpty.read();
> {code}
> Should {{empty}} be true or false? We need a consistent answer, across all 
> kinds of state, when snapshots are required.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Reopened] (BEAM-2377) Cross compile flink runner to scala 2.11

2017-09-21 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reopened BEAM-2377:


Commit was reverted.

> Cross compile flink runner to scala 2.11
> 
>
> Key: BEAM-2377
> URL: https://issues.apache.org/jira/browse/BEAM-2377
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Ole Langbehn
>Assignee: Aljoscha Krettek
> Fix For: 2.2.0
>
>
> The flink runner is compiled for flink built against scala 2.10. flink cross 
> compiles its scala artifacts against 2.10 and 2.11.
> In order to make it possible to use beam with the flink runner in scala 2.11 
> projects, it would be nice if you could publish the flink runner for 2.11 
> next to 2.10.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (BEAM-2973) Jenkins PreCommit broken due to missing dependency

2017-09-21 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-2973.
--
   Resolution: Fixed
Fix Version/s: Not applicable

This was fixed in the revert commit e1548435c45b1e4b349f55df1e37e1b6de8fc500.

Sorry for the inconvenience (I merged the PR that caused the breakage).

> Jenkins PreCommit broken due to missing dependency
> --
>
> Key: BEAM-2973
> URL: https://issues.apache.org/jira/browse/BEAM-2973
> Project: Beam
>  Issue Type: Bug
>  Components: build-system
>Reporter: Daniel Oliveira
>Assignee: Daniel Oliveira
>Priority: Blocker
> Fix For: Not applicable
>
>
> Jenkins seems to be failing nearly immediately this morning for multiple 
> builds. The error is:
> ERROR: Failed to parse POMs
> java.io.IOException: remote file operation failed: 
> /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall at 
> hudson.remoting.Channel@7b1b4155:beam7: hudson.remoting.ProxyException: 
> hudson.maven.MavenModuleSetBuild$MavenExecutionException: 
> org.apache.maven.project.ProjectBuildingException: Some problems were 
> encountered while processing the POMs:
> [ERROR] 'dependencies.dependency.version' for 
> org.apache.beam:beam-runners-flink_2.10:jar is missing. @ line 68, column 21
> [WARNING] 'artifactId' contains an expression but should be a constant. @ 
> org.apache.beam:beam-runners-flink_${flink.scala.version}:2.2.0-SNAPSHOT, 
> /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall/runners/flink/pom.xml,
>  line 29, column 15
> https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/14497/console
> I suspect this commit is causing the issue:
> https://github.com/apache/beam/commit/ab975317e1aa532053b68ccc105e13afff0c0b1a



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (BEAM-2377) Cross compile flink runner to scala 2.11

2017-09-20 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-2377.
--
   Resolution: Fixed
Fix Version/s: 2.2.0

Implemented in ab975317e1aa532053b68ccc105e13afff0c0b1a

> Cross compile flink runner to scala 2.11
> 
>
> Key: BEAM-2377
> URL: https://issues.apache.org/jira/browse/BEAM-2377
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Ole Langbehn
>Assignee: Aljoscha Krettek
> Fix For: 2.2.0
>
>
> The flink runner is compiled for flink built against scala 2.10. flink cross 
> compiles its scala artifacts against 2.10 and 2.11.
> In order to make it possible to use beam with the flink runner in scala 2.11 
> projects, it would be nice if you could publish the flink runner for 2.11 
> next to 2.10.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (BEAM-2948) Unable to find registrar when restoring flink job from savepoint

2017-09-19 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2948?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-2948.
--
   Resolution: Fixed
Fix Version/s: 2.2.0

Fixed in 14ea5abe3c3900e5cb423a8580fbdcc6e28fe376

> Unable to find registrar when restoring flink job from savepoint
> 
>
> Key: BEAM-2948
> URL: https://issues.apache.org/jira/browse/BEAM-2948
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
> Environment: Flink v1.2.1 job on a EMR cluster using Beam v2.0.0
>Reporter: Luke Cwik
>Assignee: Aljoscha Krettek
>Priority: Minor
> Fix For: 2.2.0
>
>
> Reported: 
> https://lists.apache.org/thread.html/d113707e84d7562e6d0d891830a8d85d76a497435105fe3ed1e06e13@%3Cdev.beam.apache.org%3E
> When I try to restore job from savepoint on one task manager I get the
> exception *Unable to find registrar for s3n*.
> The job can write files to s3 acting as a sink. So S3 access works except
> when restoring from savepoint.
> I am passing the following configuration as the  pipeline
> HadoopFileSystemOptions
> options:
> Configuration configuration = new Configuration();
> configuration.set("fs.defaultFS", String.format("s3n://%s",
> jobOptions.getOutputFileSystemRoot()));
> configuration.set("fs.default.name", String.format("s3n://%s",
> jobOptions.getOutputFileSystemRoot()));
> configuration.set("fs.s3.impl",
> "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
> configuration.set("fs.s3n.awsAccessKeyId",
> jobOptions.getAwsAccessKey());
> configuration.set("fs.s3n.awsSecretAccessKey",
> jobOptions.getAwsSecretKey());
> From my investigation it looks like the Beam
> FileSystems.setDefaultPipelineOptions method is not called before
> org.apache.beam.sdk.io.FileBasedSink$FileResultCoder relies on the initialised
> FileSystems.SCHEME_TO_FILESYSTEM map
> java.lang.IllegalStateException: Could not initialize keyed state backend.
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:204)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:653)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:640)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:246)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalStateException: Unable to find registrar for s3n
> at
> org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:447)
> at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:523)
> at
> org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1059)
> at
> org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1020)
> at
> org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:87)
> at
> org.apache.flink.runtime.state.ArrayListSerializer.deserialize(ArrayListSerializer.java:87)
> at
> org.apache.flink.runtime.state.ArrayListSerializer.deserialize(ArrayListSerializer.java:27)
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateTableForKeyGroup(HeapKeyedStateBackend.java:370)
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:340)
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:243)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:788)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:284)
> ... 6 more



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2140) Fix SplittableDoFn ValidatesRunner tests in FlinkRunner

2017-09-18 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16170437#comment-16170437
 ] 

Aljoscha Krettek commented on BEAM-2140:


As I commented on the PR, I have a simpler solution, but it also doesn't work 
because there is currently no way of getting around the fact that Flink will 
shut down the timer service before calling close() on an operator, meaning that 
there is no way of blocking on "in-flight" timers. I'll try to get such a 
feature into Flink; only then can we fix this issue.

> Fix SplittableDoFn ValidatesRunner tests in FlinkRunner
> ---
>
> Key: BEAM-2140
> URL: https://issues.apache.org/jira/browse/BEAM-2140
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> As discovered as part of BEAM-1763, there is a failing SDF test. We disabled 
> the tests to unblock the open PR for BEAM-1763.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (BEAM-1688) Getting listener timeout after waiting for [10000] ms from ElasticsearchIO

2017-09-15 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-1688.
--
   Resolution: Fixed
Fix Version/s: 2.2.0

> Getting listener timeout after waiting for [10000] ms from ElasticsearchIO
> --
>
> Key: BEAM-1688
> URL: https://issues.apache.org/jira/browse/BEAM-1688
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions
>Affects Versions: 0.5.0
> Environment: Linux,  ElasticSearch 5.2.1
>Reporter: Sathish Jayaraman
>Assignee: Etienne Chauchot
>Priority: Minor
> Fix For: 2.2.0
>
>
> I am trying to test a simple Apache Beam program with Elasticsearch as the 
> source. I modified Beam's MinimalWordCount example to use Elasticsearch as 
> the source instead of TextIO.
> When I run the code, I get the error below:
> An exception occurred while executing the Java class. null: 
> InvocationTargetException: java.io.IOException: listener timeout after 
> waiting for [10000] ms
> I debugged ElasticsearchIO.java and could see that everything works fine: the 
> Elasticsearch client is built and data from the index is retrieved by the 
> code. But the ParDo function after the read transform does not get executed 
> at all. The Elasticsearch client keeps waiting and finally gets a timeout 
> error.
> Any idea about the issue? 
> I have raised the issue here too: 
> http://stackoverflow.com/questions/42720857/getting-listener-timeout-after-waiting-for-1-ms-from-elasticsearchio



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2712) SerializablePipelineOptions should not call FileSystems.setDefaultPipelineOptions.

2017-09-15 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167598#comment-16167598
 ] 

Aljoscha Krettek commented on BEAM-2712:


[~jkff] See my comment on BEAM-2948, I think for Flink we can resolve this by 
calling it in the {{setup()}} method of the stream operator because this is 
invoked before any user code/state is touched.

> SerializablePipelineOptions should not call 
> FileSystems.setDefaultPipelineOptions.
> --
>
> Key: BEAM-2712
> URL: https://issues.apache.org/jira/browse/BEAM-2712
> Project: Beam
>  Issue Type: Bug
>  Components: runner-apex, runner-core, runner-flink, runner-spark
>Reporter: Eugene Kirpichov
>
> https://github.com/apache/beam/pull/3654 introduces 
> SerializablePipelineOptions, which on deserialization calls 
> FileSystems.setDefaultPipelineOptions.
> This is obviously problematic and racy in case the same process uses 
> SerializablePipelineOptions with different filesystem-related options in them.
> The reason the PR does this is, Flink and Apex runners were already doing it 
> in their respective SerializablePipelineOptions-like classes (being removed 
> in the PR); and Spark wasn't but probably should have.
> I believe this is done for the sake of having the proper filesystem options 
> automatically available on workers in all places where any kind of 
> PipelineOptions are used. Instead, all 3 runners should pick a better place 
> to initialize their workers, and explicitly call 
> FileSystems.setDefaultPipelineOptions there.
> It would be even better if FileSystems.setDefaultPipelineOptions didn't exist 
> at all, but that's a topic for a separate JIRA.
> CC'ing runner contributors [~aljoscha] [~aviemzur] [~thw]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2948) Unable to find registrar when restoring flink job from savepoint

2017-09-15 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167532#comment-16167532
 ] 

Aljoscha Krettek commented on BEAM-2948:


Yes, that's true, but we probably have to wait a few more versions until that 
lands. In the meantime, I think we can solve this issue by initialising the 
{{FileSystems}} earlier, in {{DoFnOperator.setup()}}, which is called before 
state is eagerly deserialised.
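A minimal sketch of that idea (the {{setup()}} signature comes from Flink's 
{{AbstractStreamOperator}}; the options field name is illustrative):

{code:java}
@Override
public void setup(
    StreamTask<?, ?> containingTask,
    StreamConfig config,
    Output<StreamRecord<WindowedValue<OutputT>>> output) {
  // Register the configured FileSystems (e.g. s3n) before Flink eagerly
  // deserialises any state that may contain FileResults.
  FileSystems.setDefaultPipelineOptions(serializedOptions.getPipelineOptions());
  super.setup(containingTask, config, output);
}
{code}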

> Unable to find registrar when restoring flink job from savepoint
> 
>
> Key: BEAM-2948
> URL: https://issues.apache.org/jira/browse/BEAM-2948
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
> Environment: Flink v1.2.1 job on a EMR cluster using Beam v2.0.0
>Reporter: Luke Cwik
>Assignee: Aljoscha Krettek
>Priority: Minor
>
> Reported: 
> https://lists.apache.org/thread.html/d113707e84d7562e6d0d891830a8d85d76a497435105fe3ed1e06e13@%3Cdev.beam.apache.org%3E
> When I try to restore job from savepoint on one task manager I get the
> exception *Unable to find registrar for s3n*.
> The job can write files to s3 acting as a sink. So S3 access works except
> when restoring from savepoint.
> I am passing the following configuration as the  pipeline
> HadoopFileSystemOptions
> options:
> Configuration configuration = new Configuration();
> configuration.set("fs.defaultFS", String.format("s3n://%s",
> jobOptions.getOutputFileSystemRoot()));
> configuration.set("fs.default.name", String.format("s3n://%s",
> jobOptions.getOutputFileSystemRoot()));
> configuration.set("fs.s3.impl",
> "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
> configuration.set("fs.s3n.awsAccessKeyId",
> jobOptions.getAwsAccessKey());
> configuration.set("fs.s3n.awsSecretAccessKey",
> jobOptions.getAwsSecretKey());
> From my investigation it looks like the Beam
> FileSystems.setDefaultPipelineOptions method is not called before
> org.apache.beam.sdk.io.FileBasedSink$FileResultCoder relies on the initialised
> FileSystems.SCHEME_TO_FILESYSTEM map
> java.lang.IllegalStateException: Could not initialize keyed state backend.
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:204)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:653)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:640)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:246)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalStateException: Unable to find registrar for s3n
> at
> org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:447)
> at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:523)
> at
> org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1059)
> at
> org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1020)
> at
> org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:87)
> at
> org.apache.flink.runtime.state.ArrayListSerializer.deserialize(ArrayListSerializer.java:87)
> at
> org.apache.flink.runtime.state.ArrayListSerializer.deserialize(ArrayListSerializer.java:27)
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateTableForKeyGroup(HeapKeyedStateBackend.java:370)
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:340)
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:243)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:788)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:284)
> ... 6 more



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (BEAM-2807) NullPointerException during checkpoint on FlinkRunner

2017-09-15 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed BEAM-2807.
--
   Resolution: Fixed
Fix Version/s: 2.2.0

Fixed in 81d0c436691d33a7ea1bd808413e1aa08e1c4d10

> NullPointerException during checkpoint on FlinkRunner
> -
>
> Key: BEAM-2807
> URL: https://issues.apache.org/jira/browse/BEAM-2807
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.1.0
>Reporter: Daniel Harper
>Assignee: Daniel Harper
>Priority: Blocker
> Fix For: 2.2.0
>
>
> *Beam version:* 2.1.0
> *Runner:* FlinkRunner
> We're seeing the following exception when checkpointing, which is causing our 
> job to restart
> {code}
> 2017-08-25 09:42:17,658 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Combine.perKey(Count) -> ParMultiDo(ToConcurrentStreamResult) 
> -> ParMultiDo(Anonymous) -> ParMultiDo(ApplyShardingKey) -> ToKeyedWorkItem 
> (7/32) (f00a31b722a31030f18d83ac613de21d) switched from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 
> for operator Combine.perKey(Count) -> ParMultiDo(ToConcurrentStreamResult) -> 
> ParMultiDo(Anonymous) -> ParMultiDo(ApplyShardingKey) -> ToKeyedWorkItem 
> (7/32).}
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:966)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 2 for 
> operator Combine.perKey(Count) -> ParMultiDo(ToConcurrentStreamResult) -> 
> ParMultiDo(Anonymous) -> ParMultiDo(ApplyShardingKey) -> ToKeyedWorkItem 
> (7/32).
> ... 6 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.NullPointerException
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:893)
> ... 5 more
> Suppressed: java.lang.Exception: Could not properly cancel managed keyed 
> state future.
> at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1018)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:957)
> ... 5 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.NullPointerException
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
> at 
> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
> at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
> ... 7 more
> Caused by: java.lang.NullPointerException
> at java.io.DataOutputStream.writeUTF(DataOutputStream.java:347)
> at java.io.DataOutputStream.writeUTF(DataOutputStream.java:323)
> at 
> org.apache.beam.runners.flink.translation.types.CoderTypeSerializer$CoderTypeSerializerConfigSnapshot.write(CoderTypeSerializer.java:189)
> at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.write(TypeSerializerSerializationUtil.java:413)
> at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:229)
> at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:151)
> at 
> org.apache.flink.runtime.state.KeyedBackendStateMetaInfoSnapshotReaderWriters$KeyedBackendStateMetaInfoWriterV3.writeStateMetaInfo(KeyedBackendStateMetaInfoSnapshotReaderWriters.java:107)
> at 
> org.apache.flink.runtime.state.KeyedBackendSerializationProxy.write(KeyedBackendSerializationProxy.java:104)
> at 
> 

[jira] [Commented] (BEAM-2943) Beam Flink deployment results in ClassNotFoundException

2017-09-14 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166276#comment-16166276
 ] 

Aljoscha Krettek commented on BEAM-2943:


Could you verify that 
{{org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat}} is in 
the jar file? I just want to rule out the easy cases but it could still be a 
more involved problem.
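For example (assuming the bundled jar from the quoted command):

{code}
jar tf target/word-count-beam-bundled-0.1.jar \
  | grep 'org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat'
{code}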

> Beam Flink deployment results in ClassNotFoundException
> ---
>
> Key: BEAM-2943
> URL: https://issues.apache.org/jira/browse/BEAM-2943
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.1.0
> Environment: Debian 9.1 / 4.9.0-3-amd64 #1 SMP Debian 4.9.30-2+deb9u3 
> (2017-08-06) x86_64 GNU/Linux
>Reporter: Guenther Grill
>Assignee: Aljoscha Krettek
>  Labels: flink
>
> Hi,
> I followed the guide https://beam.apache.org/documentation/runners/flink/ to 
> run beam program within a flink cluster. 
> The output of the dependency-command is:
> {code}
> mvn dependency:tree -Pflink-runner |grep flink
>   
> [INFO] \- org.apache.beam:beam-runners-flink_2.10:jar:2.1.0:runtime
> [INFO]+- org.apache.flink:flink-clients_2.10:jar:1.3.0:runtime
> [INFO]|  +- org.apache.flink:flink-optimizer_2.10:jar:1.3.0:runtime
> [INFO]|  \- org.apache.flink:force-shading:jar:1.3.0:runtime
> [INFO]+- org.apache.flink:flink-core:jar:1.3.0:runtime
> [INFO]|  +- org.apache.flink:flink-annotations:jar:1.3.0:runtime
> [INFO]+- org.apache.flink:flink-metrics-core:jar:1.3.0:runtime
> [INFO]+- org.apache.flink:flink-java:jar:1.3.0:runtime
> [INFO]|  +- org.apache.flink:flink-shaded-hadoop2:jar:1.3.0:runtime
> [INFO]+- org.apache.flink:flink-runtime_2.10:jar:1.3.0:runtime
> [INFO]+- org.apache.flink:flink-streaming-java_2.10:jar:1.3.0:runtime
> {code}
> Then I started the flink cluster with the correct version with docker-compose
> {code}
> export JOB_MANAGER_RPC_ADDRESS=[HOST_IP]
> export FLINK_DOCKER_IMAGE_NAME=flink:1.3.0-hadoop27-scala_2.10
> docker-compose up -d
> {code}
> The compose file looks like this:
> {code}
> version: '3.3'
> services:
>   jobmanager:
> image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
> expose:
>   - "6123"
> ports:
>   - "6123:6123"
>   - "8081:8081"
> volumes:
>   - /tmp:/tmp
> command: jobmanager
> environment:
>   - JOB_MANAGER_RPC_ADDRESS=[HOST_IP]
>   taskmanager:
> image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
> expose:
>   - "6121"
>   - "6122"
> depends_on:
>   - jobmanager
> command: taskmanager
> environment:
>   - JOB_MANAGER_RPC_ADDRESS=[HOST_IP]
> {code}
> The flink cluster works, but when I execute 
> {code}
> mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
> -Pflink-runner \
> -Dexec.args="--runner=FlinkRunner \
>   --inputFile=pom.xml \
>   --output=/path/to/counts \
>   --flinkMaster=[HOST_IP]:6123 \
>   --filesToStage=target/word-count-beam-bundled-0.1.jar"
> {code}
> I get:
> {code}
> 2017-09-12 06:39:57,226 INFO  org.apache.flink.runtime.jobmanager.JobManager  
>   - Submitting job a913f922506053e65e732eeb8336b3bd 
> (wordcount-grg-0912063956-c7ea6199).
> 2017-09-12 06:39:57,227 INFO  org.apache.flink.runtime.jobmanager.JobManager  
>   - Using restart strategy NoRestartStrategy for 
> a913f922506053e65e732eeb8336b3bd.
> 2017-09-12 06:39:57,227 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job recovers 
> via failover strategy: full graph restart
> 2017-09-12 06:39:57,229 INFO  org.apache.flink.runtime.jobmanager.JobManager  
>   - Running initialization on master for job 
> wordcount-grg-0912063956-c7ea6199 (a913f922506053e65e732eeb8336b3bd).
> 2017-09-12 06:39:57,230 ERROR org.apache.flink.runtime.jobmanager.JobManager  
>   - Failed to submit job a913f922506053e65e732eeb8336b3bd 
> (wordcount-grg-0912063956-c7ea6199)
> org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 
> 'DataSource (at Read(CompressedSource) 
> (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat))': 
> Deserializing the InputFormat 
> (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat@58e7a91a)
>  failed: Could not read the user code wrapper: 
> org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:153)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1315)
>   at 
> 

[jira] [Commented] (BEAM-2948) Unable to find registrar when restoring flink job from savepoint

2017-09-14 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166124#comment-16166124
 ] 

Aljoscha Krettek commented on BEAM-2948:


I think the problem is that Beam only "instantiates" the Hadoop options that 
are passed via pipeline options at a point when it is too late (this is done 
in {{FileSystems.setDefaultPipelineOptions()}}).

The {{FileBasedSink}} uses a {{GroupByKey}} internally, which requires storing 
a {{FileResult}} in state. Decoding this (using {{FileResultCoder}}) will try 
and resolve the Hadoop FileSystem. When using the Heap state backend (or 
{{FsStateBackend}}) in Flink, state is eagerly deserialised when the 
pipeline/operator is started. 

I see some possible solutions:
 1. Use the {{RocksDBStateBackend}}, which deserialises state lazily, so 
decoding should happen only after the Hadoop options have been instantiated.
 2. Change {{FileResultCoder}} so that it does not eagerly get a Hadoop 
FileSystem but instead does this lazily, when required.
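For option 1, switching the state backend is a configuration change in 
{{flink-conf.yaml}} (keys as of Flink 1.2/1.3; the checkpoint path is 
illustrative):

{code}
state.backend: rocksdb
# Where the RocksDB snapshots are written on checkpoints/savepoints:
state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
{code}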

> Unable to find registrar when restoring flink job from savepoint
> 
>
> Key: BEAM-2948
> URL: https://issues.apache.org/jira/browse/BEAM-2948
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
> Environment: Flink v1.2.1 job on a EMR cluster using Beam v2.0.0
>Reporter: Luke Cwik
>Assignee: Aljoscha Krettek
>Priority: Minor
>
> Reported: 
> https://lists.apache.org/thread.html/d113707e84d7562e6d0d891830a8d85d76a497435105fe3ed1e06e13@%3Cdev.beam.apache.org%3E
> When I try to restore job from savepoint on one task manager I get the
> exception *Unable to find registrar for s3n*.
> The job can write files to s3 acting as a sink. So S3 access works except
> when restoring from savepoint.
> I am passing the following configuration as the  pipeline
> HadoopFileSystemOptions
> options:
> Configuration configuration = new Configuration();
> configuration.set("fs.defaultFS", String.format("s3n://%s",
> jobOptions.getOutputFileSystemRoot()));
> configuration.set("fs.default.name", String.format("s3n://%s",
> jobOptions.getOutputFileSystemRoot()));
> configuration.set("fs.s3.impl",
> "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
> configuration.set("fs.s3n.awsAccessKeyId",
> jobOptions.getAwsAccessKey());
> configuration.set("fs.s3n.awsSecretAccessKey",
> jobOptions.getAwsSecretKey());
> From my investigation it looks like the Beam
> FileSystems.setDefaultPipelineOptions method is not called before
> org.apache.beam.sdk.io.FileBasedSink$FileResultCoder relies on the initialised
> FileSystems.SCHEME_TO_FILESYSTEM map
> java.lang.IllegalStateException: Could not initialize keyed state backend.
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:204)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:653)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:640)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:246)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalStateException: Unable to find registrar for s3n
> at
> org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:447)
> at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:523)
> at
> org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1059)
> at
> org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1020)
> at
> org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:87)
> at
> org.apache.flink.runtime.state.ArrayListSerializer.deserialize(ArrayListSerializer.java:87)
> at
> org.apache.flink.runtime.state.ArrayListSerializer.deserialize(ArrayListSerializer.java:27)
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateTableForKeyGroup(HeapKeyedStateBackend.java:370)
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:340)
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:243)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:788)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:284)
> ... 6 more



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (BEAM-2873) Detect number of sharsd for file sink in Flink Streaming Runner

2017-09-10 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created BEAM-2873:
--

 Summary: Detect number of sharsd for file sink in Flink Streaming 
Runner
 Key: BEAM-2873
 URL: https://issues.apache.org/jira/browse/BEAM-2873
 Project: Beam
  Issue Type: Improvement
  Components: runner-flink
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek


[~reuvenlax] mentioned that this is done for the Dataflow Runner and the 
default behaviour on Flink can be somewhat surprising for users.

ML entry: https://www.mail-archive.com/dev@beam.apache.org/msg02665.html:

This is how the file sink has always worked in Beam. If no sharding is 
specified, then this means runner-determined sharding, and by default that is 
one file per bundle. If Flink has small bundles, then I suggest using the 
withNumShards method to explicitly pick the number of output shards.

The Flink runner can detect that runner-determined sharding has been chosen, 
and override it with a specific number of shards. For example, the Dataflow 
streaming runner (which as you mentioned also has small bundles) detects this 
case and sets the number of output file shards based on the number of workers in 
the worker pool 
[Here|https://github.com/apache/beam/blob/9e6530adb00669b7cf0f01cb8b128be0a21fd721/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L354]
 is the code that does this; it should be quite simple to do something similar 
for Flink, and then there will be no need for users to explicitly call 
withNumShards themselves.
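For instance, until the runner does this automatically, users can pin the shard 
count themselves (a minimal sketch; assumes {{lines}} is a 
{{PCollection<String>}} and the output path is illustrative):

{code:java}
// Explicitly request 4 output shards instead of runner-determined sharding,
// which on the Flink streaming runner means one file per (small) bundle.
lines.apply(TextIO.write()
    .to("/path/to/counts")
    .withNumShards(4));
{code}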



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2831) Possible bug in Beam+Flink memory management, disk spillover

2017-09-01 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16150656#comment-16150656
 ] 

Aljoscha Krettek commented on BEAM-2831:


[~kenn] Do you think we can change {{SerializableCoder}} to pass through the 
exception instead of wrapping it in a {{CoderException}}? Not sure what 
implications this could have for other runners.

> Possible bug in Beam+Flink memory management, disk spillover
> 
>
> Key: BEAM-2831
> URL: https://issues.apache.org/jira/browse/BEAM-2831
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.0.0, 2.1.0
> Environment: Flink 1.2.1 and 1.3.0, Java HotSpot and OpenJDK 8, macOS 
> 10.12.6 and unknown Linux
>Reporter: Reinier Kip
>Assignee: Aljoscha Krettek
>
> I’ve been running a Beam pipeline on Flink. Depending on the dataset size and 
> the heap memory configuration of the jobmanager and taskmanager, I may run 
> into an EOFException, which causes the job to fail.
> As [discussed on Flink's 
> mailinglist|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/EOFException-related-to-memory-segments-during-run-of-Beam-pipeline-on-Flink-td15255.html]
>  (stacktrace enclosed), Flink catches these EOFExceptions and activates disk 
> spillover. Because Beam wraps these exceptions, this mechanism fails, the 
> exception travels up the stack, and the job aborts.
> Hopefully this is enough information and this is something that can be 
> adjusted for in Beam. I'd be glad to provide more information where needed.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (BEAM-2831) Possible bug in Beam+Flink memory management, disk spillover

2017-08-31 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16148985#comment-16148985
 ] 

Aljoscha Krettek edited comment on BEAM-2831 at 8/31/17 1:41 PM:
-

Could you try running it with this modified {{SerializableCoder}}:
{code}
public class SerializableCoder<T extends Serializable> extends CustomCoder<T> {

  /**
   * Returns a {@link SerializableCoder} instance for the provided element type.
   * @param <T> the element type
   */
  public static <T extends Serializable> SerializableCoder<T> of(TypeDescriptor<T> type) {
    @SuppressWarnings("unchecked")
    Class<T> clazz = (Class<T>) type.getRawType();
    return new SerializableCoder<>(clazz, type);
  }

  /**
   * Returns a {@link SerializableCoder} instance for the provided element class.
   * @param <T> the element type
   */
  public static <T extends Serializable> SerializableCoder<T> of(Class<T> clazz) {
    return new SerializableCoder<>(clazz, TypeDescriptor.of(clazz));
  }

  /**
   * Returns a {@link CoderProvider} which uses the {@link SerializableCoder} if possible for
   * all types.
   *
   * <p>This method is invoked reflectively from {@link DefaultCoder}.
   */
  @SuppressWarnings("unused")
  public static CoderProvider getCoderProvider() {
    return new SerializableCoderProvider();
  }

  /**
   * A {@link CoderProviderRegistrar} which registers a {@link CoderProvider} which can handle
   * serializable types.
   */
  public static class SerializableCoderProviderRegistrar implements CoderProviderRegistrar {

    @Override
    public List<CoderProvider> getCoderProviders() {
      return ImmutableList.of(getCoderProvider());
    }
  }

  /**
   * A {@link CoderProvider} that constructs a {@link SerializableCoder} for any class that
   * implements serializable.
   */
  static class SerializableCoderProvider extends CoderProvider {
    @Override
    public <T> Coder<T> coderFor(TypeDescriptor<T> typeDescriptor,
        List<? extends Coder<?>> componentCoders) throws CannotProvideCoderException {
      if (Serializable.class.isAssignableFrom(typeDescriptor.getRawType())) {
        return SerializableCoder.of((TypeDescriptor) typeDescriptor);
      }
      throw new CannotProvideCoderException(
          "Cannot provide SerializableCoder because " + typeDescriptor
          + " does not implement Serializable");
    }
  }

  private final Class<T> type;
  private transient TypeDescriptor<T> typeDescriptor;

  protected SerializableCoder(Class<T> type, TypeDescriptor<T> typeDescriptor) {
    this.type = type;
    this.typeDescriptor = typeDescriptor;
  }

  public Class<T> getRecordType() {
    return type;
  }

  @Override
  public void encode(T value, OutputStream outStream)
      throws IOException {
    ObjectOutputStream oos = new ObjectOutputStream(outStream);
    oos.writeObject(value);
    oos.flush();
  }

  @Override
  public T decode(InputStream inStream)
      throws IOException, CoderException {
    try {
      ObjectInputStream ois = new ObjectInputStream(inStream);
      return type.cast(ois.readObject());
    } catch (ClassNotFoundException e) {
      throw new CoderException("unable to deserialize record", e);
    }
  }

  /**
   * {@inheritDoc}
   *
   * @throws NonDeterministicException always. Java serialization is not
   * deterministic with respect to {@link Object#equals} for all types.
   */
  @Override
  public void verifyDeterministic() throws NonDeterministicException {
    throw new NonDeterministicException(this,
        "Java Serialization may be non-deterministic.");
  }

  @Override
  public boolean equals(Object other) {
    return !(other == null || getClass() != other.getClass())
        && type == ((SerializableCoder<?>) other).type;
  }

  @Override
  public int hashCode() {
    return type.hashCode();
  }

  @Override
  public TypeDescriptor<T> getEncodedTypeDescriptor() {
    if (typeDescriptor == null) {
      typeDescriptor = TypeDescriptor.of(type);
    }
    return typeDescriptor;
  }

  // This coder inherits isRegisterByteSizeObserverCheap,
  // getEncodedElementByteSize and registerByteSizeObserver
  // from StructuredCoder. Looks like we cannot do much better
  // in this case.
}
{code}

The only change is in {{encode()}} where we don't wrap the {{EOFException}} 
anymore. I think this should fix the problem and if it indeed does we should 
include this change in Beam.


was (Author: aljoscha):
Could you try running it with this modified {{SerializableCoder}}:
{coder}
public class SerializableCoder<T extends Serializable> extends CustomCoder<T> {

  /**
   * Returns a {@link SerializableCoder} instance for the provided element type.
   * @param <T> the element type
   */
  public static <T extends Serializable> SerializableCoder<T> of(TypeDescriptor<T> type) {
    @SuppressWarnings("unchecked")
    Class<T> clazz = (Class<T>) type.getRawType();
    return new SerializableCoder<>(clazz, type);
  }

  /**
   * Returns a {@link SerializableCoder} instance for the provided element class.
   * @param <T> the element type
   */
  public static <T extends Serializable> SerializableCoder<T> of(Class

[jira] [Commented] (BEAM-2831) Possible bug in Beam+Flink memory management, disk spillover

2017-08-31 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16148985#comment-16148985
 ] 

Aljoscha Krettek commented on BEAM-2831:


Could you try running it with this modified {{SerializableCoder}}:
{code}
public class SerializableCoder<T extends Serializable> extends CustomCoder<T> {

  /**
   * Returns a {@link SerializableCoder} instance for the provided element type.
   * @param <T> the element type
   */
  public static <T extends Serializable> SerializableCoder<T> of(TypeDescriptor<T> type) {
    @SuppressWarnings("unchecked")
    Class<T> clazz = (Class<T>) type.getRawType();
    return new SerializableCoder<>(clazz, type);
  }

  /**
   * Returns a {@link SerializableCoder} instance for the provided element class.
   * @param <T> the element type
   */
  public static <T extends Serializable> SerializableCoder<T> of(Class<T> clazz) {
    return new SerializableCoder<>(clazz, TypeDescriptor.of(clazz));
  }

  /**
   * Returns a {@link CoderProvider} which uses the {@link SerializableCoder} if possible for
   * all types.
   *
   * <p>This method is invoked reflectively from {@link DefaultCoder}.
   */
  @SuppressWarnings("unused")
  public static CoderProvider getCoderProvider() {
    return new SerializableCoderProvider();
  }

  /**
   * A {@link CoderProviderRegistrar} which registers a {@link CoderProvider} which can handle
   * serializable types.
   */
  public static class SerializableCoderProviderRegistrar implements CoderProviderRegistrar {

    @Override
    public List<CoderProvider> getCoderProviders() {
      return ImmutableList.of(getCoderProvider());
    }
  }

  /**
   * A {@link CoderProvider} that constructs a {@link SerializableCoder} for any class that
   * implements serializable.
   */
  static class SerializableCoderProvider extends CoderProvider {
    @Override
    public <T> Coder<T> coderFor(TypeDescriptor<T> typeDescriptor,
        List<? extends Coder<?>> componentCoders) throws CannotProvideCoderException {
      if (Serializable.class.isAssignableFrom(typeDescriptor.getRawType())) {
        return SerializableCoder.of((TypeDescriptor) typeDescriptor);
      }
      throw new CannotProvideCoderException(
          "Cannot provide SerializableCoder because " + typeDescriptor
          + " does not implement Serializable");
    }
  }

  private final Class<T> type;
  private transient TypeDescriptor<T> typeDescriptor;

  protected SerializableCoder(Class<T> type, TypeDescriptor<T> typeDescriptor) {
    this.type = type;
    this.typeDescriptor = typeDescriptor;
  }

  public Class<T> getRecordType() {
    return type;
  }

  @Override
  public void encode(T value, OutputStream outStream)
      throws IOException {
    ObjectOutputStream oos = new ObjectOutputStream(outStream);
    oos.writeObject(value);
    oos.flush();
  }

  @Override
  public T decode(InputStream inStream)
      throws IOException, CoderException {
    try {
      ObjectInputStream ois = new ObjectInputStream(inStream);
      return type.cast(ois.readObject());
    } catch (ClassNotFoundException e) {
      throw new CoderException("unable to deserialize record", e);
    }
  }

  /**
   * {@inheritDoc}
   *
   * @throws NonDeterministicException always. Java serialization is not
   * deterministic with respect to {@link Object#equals} for all types.
   */
  @Override
  public void verifyDeterministic() throws NonDeterministicException {
    throw new NonDeterministicException(this,
        "Java Serialization may be non-deterministic.");
  }

  @Override
  public boolean equals(Object other) {
    return !(other == null || getClass() != other.getClass())
        && type == ((SerializableCoder<?>) other).type;
  }

  @Override
  public int hashCode() {
    return type.hashCode();
  }

  @Override
  public TypeDescriptor<T> getEncodedTypeDescriptor() {
    if (typeDescriptor == null) {
      typeDescriptor = TypeDescriptor.of(type);
    }
    return typeDescriptor;
  }

  // This coder inherits isRegisterByteSizeObserverCheap,
  // getEncodedElementByteSize and registerByteSizeObserver
  // from StructuredCoder. Looks like we cannot do much better
  // in this case.
}
{code}

The only change is in {{encode()}} where we don't wrap the {{EOFException}} 
anymore. I think this should fix the problem and if it indeed does we should 
include this change in Beam.
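
For reference, here is {{encode()}} before and after the change. The "before" variant is reconstructed from memory of the released coder, so treat it as an approximation:

{code}
// Before (approximate): any IOException from the stream -- including the
// EOFException that Flink uses to signal a full spilling buffer -- was
// wrapped in a CoderException, so Flink's memory management could no
// longer recognize it and failed instead of spilling to disk.
@Override
public void encode(T value, OutputStream outStream)
    throws IOException, CoderException {
  try {
    ObjectOutputStream oos = new ObjectOutputStream(outStream);
    oos.writeObject(value);
    oos.flush();
  } catch (IOException exn) {
    throw new CoderException("unable to serialize record " + value, exn);
  }
}

// After: the IOException (and thus the EOFException) propagates as-is.
@Override
public void encode(T value, OutputStream outStream)
    throws IOException {
  ObjectOutputStream oos = new ObjectOutputStream(outStream);
  oos.writeObject(value);
  oos.flush();
}
{code}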

> Possible bug in Beam+Flink memory management, disk spillover
> 
>
> Key: BEAM-2831
> URL: https://issues.apache.org/jira/browse/BEAM-2831
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.0.0, 2.1.0
> Environment: Flink 1.2.1 and 1.3.0, Java HotSpot and OpenJDK 8, macOS 
> 10.12.6 and unknown Linux
>Reporter: Reinier Kip
>Assignee: Aljoscha Krettek
>
> I’ve been running a Beam pipeline on Flink. Depending on the dataset size and 
> the heap memory configuration of the jobmanager and 

[jira] [Commented] (BEAM-2457) Error: "Unable to find registrar for hdfs" - need to prevent/improve error message

2017-08-30 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16146873#comment-16146873
 ] 

Aljoscha Krettek commented on BEAM-2457:


(This is on the Cloudera Quickstart VM)

I noticed that this doesn't work:
{code}
$ java -cp word-count-beam-bundled-0.1.jar org.apache.beam.examples.WordCount 
--inputFile=hdfs:///user/aljoscha/wc-in --output=hdfs:///tmp/wc-out-beam 
--runner=DirectRunner
{code}

This also doesn't work:
{code}
$ export HADOOP_CONF_DIR=/etc/hadoop/conf
$ java -cp word-count-beam-bundled-0.1.jar org.apache.beam.examples.WordCount 
--inputFile=hdfs:///user/aljoscha/wc-in --output=hdfs:///tmp/wc-out-beam 
--runner=DirectRunner
{code}

This one also doesn't work:
{code}
$ java -cp word-count-beam-bundled-0.1.jar:/etc/hadoop/conf 
org.apache.beam.examples.WordCount --inputFile=hdfs:///user/aljoscha/wc-in 
--output=hdfs:///tmp/wc-out-beam --runner=DirectRunner
{code}

But this works:
{code}
$ export HADOOP_CONF_DIR=/etc/hadoop/conf
$ java -cp word-count-beam-bundled-0.1.jar:/etc/hadoop/conf 
org.apache.beam.examples.WordCount --inputFile=hdfs:///user/aljoscha/wc-in 
--output=hdfs:///tmp/wc-out-beam --runner=DirectRunner
{code}
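
For anyone debugging this, a quick way to check whether a given jar still carries the filesystem registrars is to list them via {{ServiceLoader}}, which is the mechanism {{FileSystems}} uses for discovery. This is a diagnostic sketch, not part of the quickstart, and the class name {{RegistrarCheck}} is made up:

{code}
import java.util.ServiceLoader;

import org.apache.beam.sdk.io.FileSystemRegistrar;

public class RegistrarCheck {
  public static void main(String[] args) {
    // If the HDFS registrar does not show up here, the uber jar most likely
    // lost its META-INF/services entries during shading.
    for (FileSystemRegistrar registrar
        : ServiceLoader.load(FileSystemRegistrar.class)) {
      System.out.println(registrar.getClass().getName());
    }
  }
}
{code}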

> Error: "Unable to find registrar for hdfs" - need to prevent/improve error 
> message
> --
>
> Key: BEAM-2457
> URL: https://issues.apache.org/jira/browse/BEAM-2457
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Affects Versions: 2.0.0
>Reporter: Stephen Sisk
>Assignee: Flavio Fiszman
>
> I've noticed a number of user reports where jobs are failing with the error 
> message "Unable to find registrar for hdfs": 
> * 
> https://stackoverflow.com/questions/44497662/apache-beamunable-to-find-registrar-for-hdfs/44508533?noredirect=1#comment76026835_44508533
> * 
> https://lists.apache.org/thread.html/144c384e54a141646fcbe854226bb3668da091c5dc7fa2d471626e9b@%3Cuser.beam.apache.org%3E
> * 
> https://lists.apache.org/thread.html/e4d5ac744367f9d036a1f776bba31b9c4fe377d8f11a4b530be9f829@%3Cuser.beam.apache.org%3E
>  
> This isn't too many reports, but it is the only time I can recall so many 
> users reporting the same error message in such a short amount of time. 
> We believe the problem is one of two things: 
> 1) bad uber jar creation
> 2) incorrect HDFS configuration
> However, it's highly possible this could have some other root cause. 
> It seems like it'd be useful to:
> 1) Follow up with the above reports to see if they've resolved the issue, and 
> if so what fixed it. There may be another root cause out there.
> 2) Improve the error message to include more information about how to resolve 
> it
> 3) See if we can improve detection of the error cases to give more specific 
> information (specifically, if HDFS is misconfigured, can we detect that 
> somehow and tell the user exactly that?)
> 4) update documentation



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2457) Error: "Unable to find registrar for hdfs" - need to prevent/improve error message

2017-08-29 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16145017#comment-16145017
 ] 

Aljoscha Krettek commented on BEAM-2457:


The error in 2.2.0-SNAPSHOT occurs because we no longer throw the "Unable to 
find registrar.." exception but simply use the (default) local filesystem. It 
just masks the problem.
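
Until the fallback behavior is fixed, a pipeline author can at least fail fast instead of silently writing to the local filesystem. A sketch against the Beam 2.x API ({{matchNewResource}} is the call that throws in 2.1.0; the scheme check is illustrative and assumes {{ResourceId#getScheme}} is available):

{code}
// Resolve the output path up front and verify the scheme really is hdfs.
// Without a registrar on the classpath this either throws (2.1.0) or
// resolves against the local filesystem (2.2.0-SNAPSHOT), which the
// scheme check detects.
FileSystems.setDefaultPipelineOptions(options);
ResourceId out =
    FileSystems.matchNewResource("hdfs:///tmp/wc-out", true /* isDirectory */);
if (!"hdfs".equals(out.getScheme())) {
  throw new IllegalStateException(
      "hdfs is not registered, got scheme: " + out.getScheme());
}
{code}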



> Error: "Unable to find registrar for hdfs" - need to prevent/improve error 
> message
> --
>
> Key: BEAM-2457
> URL: https://issues.apache.org/jira/browse/BEAM-2457
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Affects Versions: 2.0.0
>Reporter: Stephen Sisk
>Assignee: Flavio Fiszman
>
> I've noticed a number of user reports where jobs are failing with the error 
> message "Unable to find registrar for hdfs": 
> * 
> https://stackoverflow.com/questions/44497662/apache-beamunable-to-find-registrar-for-hdfs/44508533?noredirect=1#comment76026835_44508533
> * 
> https://lists.apache.org/thread.html/144c384e54a141646fcbe854226bb3668da091c5dc7fa2d471626e9b@%3Cuser.beam.apache.org%3E
> * 
> https://lists.apache.org/thread.html/e4d5ac744367f9d036a1f776bba31b9c4fe377d8f11a4b530be9f829@%3Cuser.beam.apache.org%3E
>  
> This isn't too many reports, but it is the only time I can recall so many 
> users reporting the same error message in such a short amount of time. 
> We believe the problem is one of two things: 
> 1) bad uber jar creation
> 2) incorrect HDFS configuration
> However, it's highly possible this could have some other root cause. 
> It seems like it'd be useful to:
> 1) Follow up with the above reports to see if they've resolved the issue, and 
> if so what fixed it. There may be another root cause out there.
> 2) Improve the error message to include more information about how to resolve 
> it
> 3) See if we can improve detection of the error cases to give more specific 
> information (specifically, if HDFS is misconfigured, can we detect that 
> somehow and tell the user exactly that?)
> 4) update documentation



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2806) support View.CreatePCollectionView in FlinkRunner

2017-08-28 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16143897#comment-16143897
 ] 

Aljoscha Krettek commented on BEAM-2806:


I tried this on the Flink Runner (both batch and streaming):
{code}
Pipeline p = Pipeline.create(options);

PCollection<String> streamingPCollection = p.apply("src1", Create.of("1", "2"));
PCollection<String> lkpPCollection = p.apply("src2", Create.of("1", "2", "3"));

final PCollectionView<Map<Integer, Iterable<String>>> lkpAsView =
    lkpPCollection
        .apply(WithKeys.of(new SerializableFunction<String, Integer>() {
          @Override
          public Integer apply(String input) {
            return 0;
          }
        }))
        .apply(View.asMultimap());

PCollection<String> ret = streamingPCollection.apply(
    ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext context) {
        String drvRow = context.element();
        Map<Integer, Iterable<String>> key2Rows = context.sideInput(lkpAsView);
        int pageId = Integer.parseInt(drvRow);
        if (key2Rows.get(pageId) != null) {
          System.out.println("Record Pass: " + drvRow);
        }
      }
    }).withSideInputs(lkpAsView)
);

p.run().waitUntilFinish();
{code}

Note that I only replaced {{BeamRecord}} by {{String}}. This seems to work. Are 
you running this on the master branch or some other version? I checked with 
Beam 2.1.0.

> support View.CreatePCollectionView in FlinkRunner
> -
>
> Key: BEAM-2806
> URL: https://issues.apache.org/jira/browse/BEAM-2806
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Xu Mingmin
>Assignee: Aljoscha Krettek
>
> Beam version: 2.2.0-SNAPSHOT
> Here's the code
> {code}
> PCollectionView<Map<BeamRecord, Iterable<BeamRecord>>> rowsView = rightRows
> .apply(View.asMultimap());
> {code}
> And exception when running with {{FlinkRunner}}:
> {code}
> Exception in thread "main" java.lang.UnsupportedOperationException: The 
> transform View.CreatePCollectionView is currently not supported.
>   at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:113)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:594)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:586)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:586)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:268)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:202)
>   at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:440)
>   at 
> org.apache.beam.runners.flink.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:38)
>   at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate(FlinkStreamingPipelineTranslator.java:69)
>   at 
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:104)
>   at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:113)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2457) Error: "Unable to find registrar for hdfs" - need to prevent/improve error message

2017-08-28 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16143846#comment-16143846
 ] 

Aljoscha Krettek commented on BEAM-2457:


Is there any update on this? I have a jar file that I built from the quickstart 
that exhibits this problem. This is the output I get:
{code}
$ java -cp word-count-beam-0.1-DIRECT.jar org.apache.beam.examples.WordCount 
--runner=DirectRunner  --inputFile=hdfs:///tmp/wc-in  
--output=hdfs:///tmp/wc-out
Exception in thread "main" java.lang.IllegalStateException: Unable to find 
registrar for hdfs
at 
org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:447)
at 
org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:517)
at 
org.apache.beam.sdk.io.FileBasedSink.convertToFileResourceIfPossible(FileBasedSink.java:204)
at org.apache.beam.sdk.io.TextIO$Write.to(TextIO.java:296)
at org.apache.beam.examples.WordCount.main(WordCount.java:182)
{code}

This is with Beam 2.1.0, the project was created from the Beam 2.1.0 examples 
archetype. I also tried this with the Flink Runner before and get the same 
results.

On Beam 2.2.0-SNAPSHOT I get this instead:
{code}
$ java -cp word-count-beam-22-0.1-DIRECT.jar org.apache.beam.examples.WordCount 
--runner=DirectRunner  --inputFile=hdfs:///tmp/wc-in  
--output=hdfs:///tmp/wc-out
Aug 28, 2017 2:46:34 PM org.apache.beam.sdk.io.FileBasedSource 
getEstimatedSizeBytes
INFO: Filepattern hdfs:///tmp/wc-in matched 0 files with total size 0
Aug 28, 2017 2:46:34 PM org.apache.beam.sdk.io.FileBasedSource split
INFO: Splitting filepattern hdfs:///tmp/wc-in into bundles of size 0 took 0 ms 
and produced 0 files and 0 bundles
Aug 28, 2017 2:46:35 PM org.apache.beam.sdk.io.WriteFiles 
finalizeForDestinationFillEmptyShards
INFO: Finalizing write operation 
TextWriteOperation{tempDirectory=/home/hadoop/hdfs:/tmp/.temp-beam-2017-08-240_14-46-34-1/,
 windowedWrites=false} for destination null num shards 0.
Aug 28, 2017 2:46:35 PM org.apache.beam.sdk.io.WriteFiles 
finalizeForDestinationFillEmptyShards
INFO: Creating 1 empty output shards in addition to 0 written for a total of 1 
for destination null.
{code}

i.e. it's writing to my local filesystem under the path {{hdfs:}}.

> Error: "Unable to find registrar for hdfs" - need to prevent/improve error 
> message
> --
>
> Key: BEAM-2457
> URL: https://issues.apache.org/jira/browse/BEAM-2457
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Affects Versions: 2.0.0
>Reporter: Stephen Sisk
>Assignee: Flavio Fiszman
>
> I've noticed a number of user reports where jobs are failing with the error 
> message "Unable to find registrar for hdfs": 
> * 
> https://stackoverflow.com/questions/44497662/apache-beamunable-to-find-registrar-for-hdfs/44508533?noredirect=1#comment76026835_44508533
> * 
> https://lists.apache.org/thread.html/144c384e54a141646fcbe854226bb3668da091c5dc7fa2d471626e9b@%3Cuser.beam.apache.org%3E
> * 
> https://lists.apache.org/thread.html/e4d5ac744367f9d036a1f776bba31b9c4fe377d8f11a4b530be9f829@%3Cuser.beam.apache.org%3E
>  
> This isn't too many reports, but it is the only time I can recall so many 
> users reporting the same error message in such a short amount of time. 
> We believe the problem is one of two things: 
> 1) bad uber jar creation
> 2) incorrect HDFS configuration
> However, it's highly possible this could have some other root cause. 
> It seems like it'd be useful to:
> 1) Follow up with the above reports to see if they've resolved the issue, and 
> if so what fixed it. There may be another root cause out there.
> 2) Improve the error message to include more information about how to resolve 
> it
> 3) See if we can improve detection of the error cases to give more specific 
> information (specifically, if HDFS is misconfigured, can we detect that 
> somehow and tell the user exactly that?)
> 4) update documentation



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2806) support View.CreatePCollectionView in FlinkRunner

2017-08-25 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16141378#comment-16141378
 ] 

Aljoscha Krettek commented on BEAM-2806:


Could you please provide a complete minimal example that provokes the problem? 
Normally, the Flink Runner should support side input views, for example, the 
tests in {{ViewTest}} are executed for the Flink Runner as well.

> support View.CreatePCollectionView in FlinkRunner
> -
>
> Key: BEAM-2806
> URL: https://issues.apache.org/jira/browse/BEAM-2806
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Xu Mingmin
>Assignee: Aljoscha Krettek
>
> Beam version: 2.2.0-SNAPSHOT
> Here's the code
> {code}
> PCollectionView> rowsView = rightRows
> .apply(View.asMultimap());
> {code}
> And exception when running with {{FlinkRunner}}:
> {code}
> Exception in thread "main" java.lang.UnsupportedOperationException: The 
> transform View.CreatePCollectionView is currently not supported.
>   at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:113)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:594)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:586)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:586)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:268)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:202)
>   at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:440)
>   at 
> org.apache.beam.runners.flink.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:38)
>   at 
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate(FlinkStreamingPipelineTranslator.java:69)
>   at 
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:104)
>   at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:113)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
