Jenkins build is still unstable: beam_PostCommit_Java_ValidatesRunner_Spark #2603

2017-07-10 Thread Apache Jenkins Server
See 




Jenkins build is still unstable: beam_PostCommit_Java_ValidatesRunner_Flink #3369

2017-07-10 Thread Apache Jenkins Server
See 




Jenkins build became unstable: beam_PostCommit_Java_MavenInstall #4346

2017-07-10 Thread Apache Jenkins Server
See 




[GitHub] beam pull request #3503: Split bundle processor into separate class.

2017-07-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/3503




[2/2] beam git commit: Closes #3503

2017-07-10 Thread robertwb
Closes #3503


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/af08f535
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/af08f535
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/af08f535

Branch: refs/heads/master
Commit: af08f5352c8d8f15db9aa187df2de6c83d2a7c9d
Parents: 0e89df3 3099e81
Author: Robert Bradshaw 
Authored: Mon Jul 10 22:23:37 2017 -0700
Committer: Robert Bradshaw 
Committed: Mon Jul 10 22:23:37 2017 -0700

--
 .../runners/portability/fn_api_runner.py     |  20 +-
 .../runners/worker/bundle_processor.py       | 426 +++
 .../apache_beam/runners/worker/sdk_worker.py | 398 +
 3 files changed, 444 insertions(+), 400 deletions(-)
--




[1/2] beam git commit: Split bundle processor into separate class.

2017-07-10 Thread robertwb
Repository: beam
Updated Branches:
  refs/heads/master 0e89df3f6 -> af08f5352


Split bundle processor into separate class.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3099e814
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3099e814
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3099e814

Branch: refs/heads/master
Commit: 3099e81428ea19cdb7e3ef6b35a5de462c598ef8
Parents: 0e89df3
Author: Robert Bradshaw 
Authored: Wed Jun 28 18:20:12 2017 -0700
Committer: Robert Bradshaw 
Committed: Mon Jul 10 22:23:36 2017 -0700

--
 .../runners/portability/fn_api_runner.py     |  20 +-
 .../runners/worker/bundle_processor.py       | 426 +++
 .../apache_beam/runners/worker/sdk_worker.py | 398 +
 3 files changed, 444 insertions(+), 400 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/3099e814/sdks/python/apache_beam/runners/portability/fn_api_runner.py
--
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index f522864..f88fe53 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -38,6 +38,7 @@ from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.runners import pipeline_context
 from apache_beam.runners.portability import maptask_executor_runner
+from apache_beam.runners.worker import bundle_processor
 from apache_beam.runners.worker import data_plane
 from apache_beam.runners.worker import operation_specs
 from apache_beam.runners.worker import sdk_worker
@@ -186,7 +187,7 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
 target_name = only_element(get_inputs(operation).keys())
 runner_sinks[(transform_id, target_name)] = operation
 transform_spec = beam_runner_api_pb2.FunctionSpec(
-urn=sdk_worker.DATA_OUTPUT_URN,
+urn=bundle_processor.DATA_OUTPUT_URN,
 parameter=proto_utils.pack_Any(data_operation_spec))
 
   elif isinstance(operation, operation_specs.WorkerRead):
@@ -200,7 +201,7 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
   operation.source.source.read(None),
   operation.source.source.default_output_coder())
   transform_spec = beam_runner_api_pb2.FunctionSpec(
-  urn=sdk_worker.DATA_INPUT_URN,
+  urn=bundle_processor.DATA_INPUT_URN,
   parameter=proto_utils.pack_Any(data_operation_spec))
 
 else:
@@ -209,7 +210,7 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
   # The Dataflow runner harness strips the base64 encoding. do the same
   # here until we get the same thing back that we sent in.
   transform_spec = beam_runner_api_pb2.FunctionSpec(
-  urn=sdk_worker.PYTHON_SOURCE_URN,
+  urn=bundle_processor.PYTHON_SOURCE_URN,
   parameter=proto_utils.pack_Any(
   wrappers_pb2.BytesValue(
   value=base64.b64decode(
@@ -223,21 +224,22 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
   element_coder = si.source.default_output_coder()
   # TODO(robertwb): Actually flesh out the ViewFn API.
   side_input_extras.append((si.tag, element_coder))
-  side_input_data[sdk_worker.side_input_tag(transform_id, si.tag)] = (
-  self._reencode_elements(
-  si.source.read(si.source.get_range_tracker(None, None)),
-  element_coder))
+  side_input_data[
+  bundle_processor.side_input_tag(transform_id, si.tag)] = (
+  self._reencode_elements(
+  si.source.read(si.source.get_range_tracker(None, None)),
+  element_coder))
 augmented_serialized_fn = pickler.dumps(
 (operation.serialized_fn, side_input_extras))
 transform_spec = beam_runner_api_pb2.FunctionSpec(
-urn=sdk_worker.PYTHON_DOFN_URN,
+urn=bundle_processor.PYTHON_DOFN_URN,
 parameter=proto_utils.pack_Any(
 wrappers_pb2.BytesValue(value=augmented_serialized_fn)))
 
   elif isinstance(operation, operation_specs.WorkerFlatten):
 # Flatten is nice and simple.
 transform_spec = beam_runner_api_pb2.FunctionSpec(
-urn=sdk_worker.IDENTITY_DOFN_URN)
+urn=bundle_processor.IDENTITY_DOFN_URN)
 
   else:
 raise NotImplementedError(operation)


Jenkins build is still unstable: beam_PostCommit_Java_ValidatesRunner_Spark #2602

2017-07-10 Thread Apache Jenkins Server
See 




Jenkins build is back to stable : beam_PostCommit_Java_MavenInstall #4344

2017-07-10 Thread Apache Jenkins Server
See 




[jira] [Commented] (BEAM-2371) Make Java DirectRunner demonstrate language-agnostic Runner API translation wrappers

2017-07-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16081637#comment-16081637
 ] 

ASF GitHub Bot commented on BEAM-2371:
--

Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/3536


> Make Java DirectRunner demonstrate language-agnostic Runner API translation 
> wrappers
> 
>
> Key: BEAM-2371
> URL: https://issues.apache.org/jira/browse/BEAM-2371
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-direct
>Reporter: Kenneth Knowles
>Assignee: Kenneth Knowles
> Fix For: 2.2.0
>
>
> This will complete the PoC for runners-core-construction-java and the Runner 
> API and show other runners the easy path to executing non-Java pipelines, 
> modulo Fn API.





Jenkins build is still unstable: beam_PostCommit_Java_ValidatesRunner_Spark #2601

2017-07-10 Thread Apache Jenkins Server
See 




Jenkins build is still unstable: beam_PostCommit_Java_ValidatesRunner_Flink #3368

2017-07-10 Thread Apache Jenkins Server
See 




beam git commit: [maven-release-plugin] rollback changes from release preparation of v2.1.0-RC1

2017-07-10 Thread jbonofre
Repository: beam
Updated Branches:
  refs/heads/release-2.1.0 0e1eb9d97 -> 948735130


[maven-release-plugin] rollback changes from release preparation of v2.1.0-RC1


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/94873513
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/94873513
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/94873513

Branch: refs/heads/release-2.1.0
Commit: 94873513021ca22c85bbf2ef841fa012e517238d
Parents: 0e1eb9d
Author: Jean-Baptiste Onofré 
Authored: Tue Jul 11 06:40:41 2017 +0200
Committer: Jean-Baptiste Onofré 
Committed: Tue Jul 11 06:40:41 2017 +0200

--
 examples/java/pom.xml   | 2 +-
 examples/java8/pom.xml  | 2 +-
 examples/pom.xml| 2 +-
 pom.xml | 4 ++--
 runners/apex/pom.xml| 2 +-
 runners/core-construction-java/pom.xml  | 2 +-
 runners/core-java/pom.xml   | 2 +-
 runners/direct-java/pom.xml | 2 +-
 runners/flink/pom.xml   | 2 +-
 runners/google-cloud-dataflow-java/pom.xml  | 2 +-
 runners/pom.xml | 2 +-
 runners/spark/pom.xml   | 2 +-
 sdks/common/fn-api/pom.xml  | 2 +-
 sdks/common/pom.xml | 2 +-
 sdks/common/runner-api/pom.xml  | 2 +-
 sdks/java/build-tools/pom.xml   | 2 +-
 sdks/java/core/pom.xml  | 2 +-
 sdks/java/extensions/google-cloud-platform-core/pom.xml | 2 +-
 sdks/java/extensions/jackson/pom.xml| 2 +-
 sdks/java/extensions/join-library/pom.xml   | 2 +-
 sdks/java/extensions/pom.xml| 2 +-
 sdks/java/extensions/protobuf/pom.xml   | 2 +-
 sdks/java/extensions/sorter/pom.xml | 2 +-
 sdks/java/harness/pom.xml   | 2 +-
 sdks/java/io/amqp/pom.xml   | 2 +-
 sdks/java/io/cassandra/pom.xml  | 2 +-
 sdks/java/io/common/pom.xml | 2 +-
 sdks/java/io/elasticsearch/pom.xml  | 2 +-
 sdks/java/io/google-cloud-platform/pom.xml  | 2 +-
 sdks/java/io/hadoop-common/pom.xml  | 2 +-
 sdks/java/io/hadoop-file-system/pom.xml | 2 +-
 sdks/java/io/hadoop/input-format/pom.xml| 2 +-
 sdks/java/io/hadoop/jdk1.8-tests/pom.xml| 2 +-
 sdks/java/io/hadoop/pom.xml | 2 +-
 sdks/java/io/hbase/pom.xml  | 2 +-
 sdks/java/io/hcatalog/pom.xml   | 2 +-
 sdks/java/io/jdbc/pom.xml   | 2 +-
 sdks/java/io/jms/pom.xml| 2 +-
 sdks/java/io/kafka/pom.xml  | 2 +-
 sdks/java/io/kinesis/pom.xml| 2 +-
 sdks/java/io/mongodb/pom.xml| 2 +-
 sdks/java/io/mqtt/pom.xml   | 2 +-
 sdks/java/io/pom.xml| 2 +-
 sdks/java/io/xml/pom.xml| 2 +-
 sdks/java/java8tests/pom.xml| 2 +-
 sdks/java/javadoc/pom.xml   | 2 +-
 sdks/java/maven-archetypes/examples-java8/pom.xml   | 2 +-
 sdks/java/maven-archetypes/examples/pom.xml | 2 +-
 sdks/java/maven-archetypes/pom.xml  | 2 +-
 sdks/java/maven-archetypes/starter/pom.xml  | 2 +-
 sdks/java/pom.xml   | 2 +-
 sdks/pom.xml| 2 +-
 sdks/python/pom.xml | 2 +-
 53 files changed, 54 insertions(+), 54 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/94873513/examples/java/pom.xml
--
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index c04932e..7ae4e6a 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-examples-parent</artifactId>
-    <version>2.1.0</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/94873513/examples/java8/pom.xml
--
diff --git a/examples/java8/pom.xml b/examples/java8/pom.xml
index af18ac0..a0ce708 100644
--- a/examples/java8/pom.xml
+++ 

[beam] Git Push Summary

2017-07-10 Thread jbonofre
Repository: beam
Updated Tags:  refs/tags/v2.1.0-RC1 [created] b8eb20d18


beam git commit: [maven-release-plugin] prepare release v2.1.0-RC1

2017-07-10 Thread jbonofre
Repository: beam
Updated Branches:
  refs/heads/release-2.1.0 5c4ea884d -> 0e1eb9d97


[maven-release-plugin] prepare release v2.1.0-RC1


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0e1eb9d9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0e1eb9d9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0e1eb9d9

Branch: refs/heads/release-2.1.0
Commit: 0e1eb9d970779c6b6b585a6a60ed92a7d377fcc5
Parents: 5c4ea88
Author: Jean-Baptiste Onofré 
Authored: Tue Jul 11 06:40:20 2017 +0200
Committer: Jean-Baptiste Onofré 
Committed: Tue Jul 11 06:40:20 2017 +0200

--
 examples/java/pom.xml   | 2 +-
 examples/java8/pom.xml  | 2 +-
 examples/pom.xml| 2 +-
 pom.xml | 4 ++--
 runners/apex/pom.xml| 2 +-
 runners/core-construction-java/pom.xml  | 2 +-
 runners/core-java/pom.xml   | 2 +-
 runners/direct-java/pom.xml | 2 +-
 runners/flink/pom.xml   | 2 +-
 runners/google-cloud-dataflow-java/pom.xml  | 2 +-
 runners/pom.xml | 2 +-
 runners/spark/pom.xml   | 2 +-
 sdks/common/fn-api/pom.xml  | 2 +-
 sdks/common/pom.xml | 2 +-
 sdks/common/runner-api/pom.xml  | 2 +-
 sdks/java/build-tools/pom.xml   | 2 +-
 sdks/java/core/pom.xml  | 2 +-
 sdks/java/extensions/google-cloud-platform-core/pom.xml | 2 +-
 sdks/java/extensions/jackson/pom.xml| 2 +-
 sdks/java/extensions/join-library/pom.xml   | 2 +-
 sdks/java/extensions/pom.xml| 2 +-
 sdks/java/extensions/protobuf/pom.xml   | 2 +-
 sdks/java/extensions/sorter/pom.xml | 2 +-
 sdks/java/harness/pom.xml   | 2 +-
 sdks/java/io/amqp/pom.xml   | 2 +-
 sdks/java/io/cassandra/pom.xml  | 2 +-
 sdks/java/io/common/pom.xml | 2 +-
 sdks/java/io/elasticsearch/pom.xml  | 2 +-
 sdks/java/io/google-cloud-platform/pom.xml  | 2 +-
 sdks/java/io/hadoop-common/pom.xml  | 2 +-
 sdks/java/io/hadoop-file-system/pom.xml | 2 +-
 sdks/java/io/hadoop/input-format/pom.xml| 2 +-
 sdks/java/io/hadoop/jdk1.8-tests/pom.xml| 2 +-
 sdks/java/io/hadoop/pom.xml | 2 +-
 sdks/java/io/hbase/pom.xml  | 2 +-
 sdks/java/io/hcatalog/pom.xml   | 2 +-
 sdks/java/io/jdbc/pom.xml   | 2 +-
 sdks/java/io/jms/pom.xml| 2 +-
 sdks/java/io/kafka/pom.xml  | 2 +-
 sdks/java/io/kinesis/pom.xml| 2 +-
 sdks/java/io/mongodb/pom.xml| 2 +-
 sdks/java/io/mqtt/pom.xml   | 2 +-
 sdks/java/io/pom.xml| 2 +-
 sdks/java/io/xml/pom.xml| 2 +-
 sdks/java/java8tests/pom.xml| 2 +-
 sdks/java/javadoc/pom.xml   | 2 +-
 sdks/java/maven-archetypes/examples-java8/pom.xml   | 2 +-
 sdks/java/maven-archetypes/examples/pom.xml | 2 +-
 sdks/java/maven-archetypes/pom.xml  | 2 +-
 sdks/java/maven-archetypes/starter/pom.xml  | 2 +-
 sdks/java/pom.xml   | 2 +-
 sdks/pom.xml| 2 +-
 sdks/python/pom.xml | 2 +-
 53 files changed, 54 insertions(+), 54 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/0e1eb9d9/examples/java/pom.xml
--
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index 7ae4e6a..c04932e 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-examples-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.1.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/0e1eb9d9/examples/java8/pom.xml
--
diff --git a/examples/java8/pom.xml b/examples/java8/pom.xml
index a0ce708..af18ac0 100644
--- a/examples/java8/pom.xml
+++ b/examples/java8/pom.xml
@@ 

[GitHub] beam pull request #3539: [BEAM-2353] Unbundle Context and WindowedContext.

2017-07-10 Thread reuvenlax
GitHub user reuvenlax opened a pull request:

https://github.com/apache/beam/pull/3539

[BEAM-2353] Unbundle Context and WindowedContext.

Remove the unnecessary classes, and inline the parameters. This is not a 
backwards-compatible change; however, these classes are marked as @Experimental.

R: @kennknowles 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/reuvenlax/incubator-beam 
unbundle_filename_policy_context

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/3539.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3539








[jira] [Commented] (BEAM-2353) FileNamePolicy context parameters allow backwards compatibility where we really don't want any

2017-07-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16081641#comment-16081641
 ] 

ASF GitHub Bot commented on BEAM-2353:
--

GitHub user reuvenlax opened a pull request:

https://github.com/apache/beam/pull/3539

[BEAM-2353] Unbundle Context and WindowedContext.

Remove the unnecessary classes, and inline the parameters. This is not a 
backwards-compatible change; however, these classes are marked as @Experimental.

R: @kennknowles 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/reuvenlax/incubator-beam 
unbundle_filename_policy_context

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/3539.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3539






> FileNamePolicy context parameters allow backwards compatibility where we 
> really don't want any
> --
>
> Key: BEAM-2353
> URL: https://issues.apache.org/jira/browse/BEAM-2353
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Kenneth Knowles
>Assignee: Reuven Lax
> Fix For: 2.2.0
>
>
> Currently, in {{FileBasedSink}} the {{FileNamePolicy}} object accepts 
> parameters of type {{Context}} and {{WindowedContext}} respectively.
> These contexts are a coding technique to allow easy backwards compatibility 
> when adding new parameters. However, if a new parameter is added to the file 
> name policy, failing to incorporate it likely means data loss for the user, 
> so in fact that is never a safe backwards-compatible change.
> These are brand-new APIs and marked experimental. This is important enough I 
> think we should make the breaking change.
> We should inline all the parameters of the context, so that we _cannot_ add 
> parameters and maintain compatibility. Instead, if we have new ones we want 
> to add, it will have to be a new method or some such.



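A sketch of the inlining described above (a minimal illustration, assuming
hypothetical names; this is not the exact FileBasedSink.FilenamePolicy
signature from PR #3539):

import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;

abstract class InlinedFilenamePolicy {
  // Before (sketch): one context object bundled all parameters, so new fields
  // could be added without breaking implementers -- the silent compatibility
  // the issue argues is unsafe, since a policy that ignores a new parameter
  // can produce colliding or misplaced output files.
  //   abstract String windowedFilename(WindowedContext context);

  // After (sketch): each parameter is explicit, so adding one is a
  // compile-time break that every implementation must address.
  abstract String windowedFilename(
      BoundedWindow window, PaneInfo pane, int shardNumber, int numShards);
}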


[GitHub] beam pull request #3536: [BEAM-2371] Use URNs, not Java classes, in immutabi...

2017-07-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/3536




[2/2] beam git commit: This closes #3536: Use URNs, not Java classes, in immutability enforcements

2017-07-10 Thread kenn
This closes #3536: Use URNs, not Java classes, in immutability enforcements


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0e89df3f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0e89df3f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0e89df3f

Branch: refs/heads/master
Commit: 0e89df3f6db93f99c3e51a2e9d255fa57f3e0aa5
Parents: eeb0432 311547a
Author: Kenneth Knowles 
Authored: Mon Jul 10 21:13:37 2017 -0700
Committer: Kenneth Knowles 
Committed: Mon Jul 10 21:13:37 2017 -0700

--
 .../beam/runners/direct/DirectRunner.java   | 21 
 .../direct/ExecutorServiceParallelExecutor.java | 16 ++-
 2 files changed, 14 insertions(+), 23 deletions(-)
--




[1/2] beam git commit: Use URNs, not Java classes, in immutability enforcements

2017-07-10 Thread kenn
Repository: beam
Updated Branches:
  refs/heads/master eeb043299 -> 0e89df3f6


Use URNs, not Java classes, in immutability enforcements


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/311547aa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/311547aa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/311547aa

Branch: refs/heads/master
Commit: 311547aa561bb314a8fe743b6f4677a2eaaaca50
Parents: 9f904dc
Author: Kenneth Knowles 
Authored: Mon Jul 10 15:25:11 2017 -0700
Committer: Kenneth Knowles 
Committed: Mon Jul 10 15:31:40 2017 -0700

--
 .../beam/runners/direct/DirectRunner.java   | 21 
 .../direct/ExecutorServiceParallelExecutor.java | 16 ++-
 2 files changed, 14 insertions(+), 23 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/311547aa/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
--
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 7a221c4..4621224 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -38,14 +38,11 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineRunner;
-import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
@@ -72,16 +69,17 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
     IMMUTABILITY {
       @Override
       public boolean appliesTo(PCollection<?> collection, DirectGraph graph) {
-        return CONTAINS_UDF.contains(graph.getProducer(collection).getTransform().getClass());
+        return CONTAINS_UDF.contains(
+            PTransformTranslation.urnForTransform(graph.getProducer(collection).getTransform()));
       }
     };
 
     /**
      * The set of {@link PTransform PTransforms} that execute a UDF. Useful for some enforcements.
      */
-    private static final Set<Class<? extends PTransform>> CONTAINS_UDF =
+    private static final Set<String> CONTAINS_UDF =
         ImmutableSet.of(
-            Read.Bounded.class, Read.Unbounded.class, ParDo.SingleOutput.class, MultiOutput.class);
+            PTransformTranslation.READ_TRANSFORM_URN, PTransformTranslation.PAR_DO_TRANSFORM_URN);
 
     public abstract boolean appliesTo(PCollection<?> collection, DirectGraph graph);
 
@@ -110,22 +108,19 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
       return bundleFactory;
     }
 
-    @SuppressWarnings("rawtypes")
-    private static Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
+    private static Map<String, Collection<ModelEnforcementFactory>>
         defaultModelEnforcements(Set<Enforcement> enabledEnforcements) {
-      ImmutableMap.Builder<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
-          enforcements = ImmutableMap.builder();
+      ImmutableMap.Builder<String, Collection<ModelEnforcementFactory>> enforcements =
+          ImmutableMap.builder();
       ImmutableList.Builder<ModelEnforcementFactory> enabledParDoEnforcements =
           ImmutableList.builder();
       if (enabledEnforcements.contains(Enforcement.IMMUTABILITY)) {
         enabledParDoEnforcements.add(ImmutabilityEnforcementFactory.create());
       }
       Collection<ModelEnforcementFactory> parDoEnforcements = enabledParDoEnforcements.build();
-      enforcements.put(ParDo.SingleOutput.class, parDoEnforcements);
-      enforcements.put(MultiOutput.class, parDoEnforcements);
+      enforcements.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, parDoEnforcements);
       return enforcements.build();
     }
-
   }


http://git-wip-us.apache.org/repos/asf/beam/blob/311547aa/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
--
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 2f4d1f6..75e2562 
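The shape of this change can be sketched compactly: enforcement lookups are
keyed by Runner API URN strings instead of concrete Java transform classes,
so they also apply to transforms rehydrated from a Runner API proto rather
than constructed from Java classes. The URN literals below are illustrative
assumptions standing in for the PTransformTranslation constants:

import com.google.common.collect.ImmutableSet;
import java.util.Set;

public class UrnKeyedEnforcements {
  // Stand-ins for PTransformTranslation.READ_TRANSFORM_URN and
  // PTransformTranslation.PAR_DO_TRANSFORM_URN; the actual values may differ.
  static final String READ_URN = "urn:beam:transform:read:v1";
  static final String PAR_DO_URN = "urn:beam:transform:pardo:v1";

  // Transforms that execute user code and therefore need immutability checks.
  static final Set<String> CONTAINS_UDF = ImmutableSet.of(READ_URN, PAR_DO_URN);

  static boolean needsImmutabilityEnforcement(String transformUrn) {
    return CONTAINS_UDF.contains(transformUrn);
  }

  public static void main(String[] args) {
    System.out.println(needsImmutabilityEnforcement(PAR_DO_URN)); // true
  }
}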

Jenkins build is still unstable: beam_PostCommit_Java_ValidatesRunner_Flink #3367

2017-07-10 Thread Apache Jenkins Server
See 




[GitHub] beam pull request #3453: Reflect #assignsToOneWindow in WindowingStrategy

2017-07-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/3453




[1/2] beam git commit: Reflect #assignsToOneWindow in WindowingStrategy

2017-07-10 Thread kenn
Repository: beam
Updated Branches:
  refs/heads/master b6b1c8b7c -> eeb043299


Reflect #assignsToOneWindow in WindowingStrategy


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c6f9fdea
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c6f9fdea
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c6f9fdea

Branch: refs/heads/master
Commit: c6f9fdeadaeda68be86e454377f8c665c22a7c0f
Parents: 9f904dc
Author: Thomas Groh 
Authored: Tue Jun 27 15:03:11 2017 -0700
Committer: Thomas Groh 
Committed: Mon Jul 10 14:54:44 2017 -0700

--
 .../runners/core/construction/WindowingStrategyTranslation.java | 1 +
 .../core/construction/WindowingStrategyTranslationTest.java | 3 +++
 sdks/common/runner-api/src/main/proto/beam_runner_api.proto | 5 +
 3 files changed, 9 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/c6f9fdea/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
--
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
index 88ebc01..1456a3f 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
@@ -307,6 +307,7 @@ public class WindowingStrategyTranslation implements Serializable {
         .setAllowedLateness(windowingStrategy.getAllowedLateness().getMillis())
         .setTrigger(TriggerTranslation.toProto(windowingStrategy.getTrigger()))
         .setWindowFn(windowFnSpec)
+        .setAssignsToOneWindow(windowingStrategy.getWindowFn().assignsToOneWindow())
         .setWindowCoderId(
             components.registerCoder(windowingStrategy.getWindowFn().windowCoder()));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c6f9fdea/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
--
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
index e406545..7a57fd7 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
@@ -116,5 +116,8 @@ public class WindowingStrategyTranslationTest {
 
     protoComponents.getCodersOrThrow(
         components.registerCoder(windowingStrategy.getWindowFn().windowCoder()));
+    assertThat(
+        proto.getAssignsToOneWindow(),
+        equalTo(windowingStrategy.getWindowFn().assignsToOneWindow()));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c6f9fdea/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
--
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index 24e907a..93fea44 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -436,6 +436,11 @@ message WindowingStrategy {
 
   // (Required) Indicate whether empty on-time panes should be omitted.
   OnTimeBehavior OnTimeBehavior = 9;
+
+  // (Required) Whether or not the window fn assigns inputs to exactly one window
+  //
+  // This knowledge is required for some optimizations
+  bool assigns_to_one_window = 10;
 }
 
 // Whether or not a PCollection's WindowFn is non-merging, merging, or


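A quick check of what the new flag carries (a hedged sketch: per the diffs
above, WindowFn exposes assignsToOneWindow(); a partitioning WindowFn such as
FixedWindows should report true, while overlapping SlidingWindows should not):

import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.joda.time.Duration;

public class AssignsToOneWindowDemo {
  public static void main(String[] args) {
    // Fixed windows place each element in exactly one window.
    System.out.println(
        FixedWindows.of(Duration.standardMinutes(1)).assignsToOneWindow());
    // Sliding windows can place one element in several overlapping windows.
    System.out.println(
        SlidingWindows.of(Duration.standardMinutes(10))
            .every(Duration.standardMinutes(1))
            .assignsToOneWindow());
  }
}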

[2/2] beam git commit: This closes #3453: Reflect #assignsToOneWindow in WindowingStrategy

2017-07-10 Thread kenn
This closes #3453: Reflect #assignsToOneWindow in WindowingStrategy


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/eeb04329
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/eeb04329
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/eeb04329

Branch: refs/heads/master
Commit: eeb0432996e2a497ba37b10df5bbe52050833ea9
Parents: b6b1c8b c6f9fde
Author: Kenneth Knowles 
Authored: Mon Jul 10 20:45:30 2017 -0700
Committer: Kenneth Knowles 
Committed: Mon Jul 10 20:45:30 2017 -0700

--
 .../runners/core/construction/WindowingStrategyTranslation.java | 1 +
 .../core/construction/WindowingStrategyTranslationTest.java | 3 +++
 sdks/common/runner-api/src/main/proto/beam_runner_api.proto | 5 +
 3 files changed, 9 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/eeb04329/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
--



Jenkins build is still unstable: beam_PostCommit_Java_ValidatesRunner_Spark #2600

2017-07-10 Thread Apache Jenkins Server
See 




[jira] [Commented] (BEAM-92) Data-dependent sinks

2017-07-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-92?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16081471#comment-16081471
 ] 

ASF GitHub Bot commented on BEAM-92:


Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/3356


> Data-dependent sinks
> 
>
> Key: BEAM-92
> URL: https://issues.apache.org/jira/browse/BEAM-92
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-core
>Reporter: Eugene Kirpichov
>Assignee: Reuven Lax
>
> Current sink API writes all data to a single destination, but there are many 
> use cases where different pieces of data need to be routed to different 
> destinations, and the set of destinations is data-dependent (so this can't be 
> implemented with a Partition transform).
> One internally discussed proposal was an API of the form:
> {code}
> PCollection<Void> PCollection<T>.apply(
>     Write.using(DoFn<T, SinkT> where,
>                 MapFn<SinkT, WriteOperation<T>> how)
> {code}
> so an item T gets written to a destination (or multiple destinations) 
> determined by "where", and the writing strategy is determined by "how", which 
> produces a WriteOperation (current API - global init/write/global finalize 
> hooks) for any given destination.
> This API also has other benefits:
> * allows the SinkT to be computed dynamically (in "where"), rather than 
> specified at pipeline construction time
> * removes the necessity for a Sink class entirely
> * is sequenceable w.r.t. downstream transforms (you can stick transforms onto 
> the returned PCollection, while the current Write.to() returns a PDone)





Jenkins build is still unstable: beam_PostCommit_Java_ValidatesRunner_Flink #3366

2017-07-10 Thread Apache Jenkins Server
See 




[jira] [Commented] (BEAM-2571) Flink ValidatesRunner failing CombineTest.testSlidingWindowsCombineWithContext

2017-07-10 Thread Kenneth Knowles (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16081602#comment-16081602
 ] 

Kenneth Knowles commented on BEAM-2571:
---

I would hold off on making changes here. It seems Dataflow may have similar 
issues. It is unfortunate that there are so few tests catching this, but it may 
be a spec bug. Since it isn't a user-facing API, we don't have to maintain 
compatibility, so we also have some freedom to do the most useful thing here.

> Flink ValidatesRunner failing CombineTest.testSlidingWindowsCombineWithContext
> --
>
> Key: BEAM-2571
> URL: https://issues.apache.org/jira/browse/BEAM-2571
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Aljoscha Krettek
> Fix For: 2.1.0
>
>
> This appears to have been caused by https://github.com/apache/beam/pull/3429 
> which fixes a couple errors in how trigger timers were processed / final 
> panes labeled.
> I am investigating, considering roll back vs forward fix. Since it is an 
> esoteric use case where I would advise users to use a stateful DoFn instead, 
> I think the bug fixed probably outweighs the bug introduced. I would like to 
> fix for 2.1.0 but will report back soon.





[jira] [Resolved] (BEAM-2371) Make Java DirectRunner demonstrate language-agnostic Runner API translation wrappers

2017-07-10 Thread Kenneth Knowles (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kenneth Knowles resolved BEAM-2371.
---
   Resolution: Fixed
Fix Version/s: 2.2.0

I think this is adequately established. We will be fleshing out more APIs as we 
go, but the DirectRunner overrides do lots of SDK-independent things now.

> Make Java DirectRunner demonstrate language-agnostic Runner API translation 
> wrappers
> 
>
> Key: BEAM-2371
> URL: https://issues.apache.org/jira/browse/BEAM-2371
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-direct
>Reporter: Kenneth Knowles
>Assignee: Kenneth Knowles
> Fix For: 2.2.0
>
>
> This will complete the PoC for runners-core-construction-java and the Runner 
> API and show other runners the easy path to executing non-Java pipelines, 
> modulo Fn API.





[jira] [Commented] (BEAM-2371) Make Java DirectRunner demonstrate language-agnostic Runner API translation wrappers

2017-07-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16081600#comment-16081600
 ] 

ASF GitHub Bot commented on BEAM-2371:
--

Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/3509


> Make Java DirectRunner demonstrate language-agnostic Runner API translation 
> wrappers
> 
>
> Key: BEAM-2371
> URL: https://issues.apache.org/jira/browse/BEAM-2371
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-direct
>Reporter: Kenneth Knowles
>Assignee: Kenneth Knowles
>
> This will complete the PoC for runners-core-construction-java and the Runner 
> API and show other runners the easy path to executing non-Java pipelines, 
> modulo Fn API.





[1/8] beam git commit: Add more utilities to ParDoTranslation

2017-07-10 Thread kenn
Repository: beam
Updated Branches:
  refs/heads/master c14a3184e -> b6b1c8b7c


Add more utilities to ParDoTranslation


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/165dfa68
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/165dfa68
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/165dfa68

Branch: refs/heads/master
Commit: 165dfa688beaeb2de9b5041c81f6e02b517f74fd
Parents: 20ce075
Author: Kenneth Knowles 
Authored: Thu Jun 8 13:46:18 2017 -0700
Committer: Kenneth Knowles 
Committed: Mon Jul 10 20:04:14 2017 -0700

--
 .../core/construction/ParDoTranslation.java | 48 
 1 file changed, 48 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/165dfa68/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
--
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index 34e0d86..5f2bcae 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -34,9 +34,11 @@ import com.google.protobuf.BytesValue;
 import com.google.protobuf.InvalidProtocolBufferException;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
@@ -74,6 +76,7 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.WindowingStrategy;
 
 /**
@@ -215,11 +218,56 @@ public class ParDoTranslation {
 return doFnAndMainOutputTagFromProto(payload.getDoFn()).getDoFn();
   }
 
+  public static DoFn<?, ?> getDoFn(AppliedPTransform<?, ?, ?> application) throws IOException {
+    return getDoFn(getParDoPayload(application));
+  }
+
   public static TupleTag<?> getMainOutputTag(ParDoPayload payload)
       throws InvalidProtocolBufferException {
     return doFnAndMainOutputTagFromProto(payload.getDoFn()).getMainOutputTag();
   }
 
+  public static TupleTag<?> getMainOutputTag(AppliedPTransform<?, ?, ?> application)
+      throws IOException {
+    return getMainOutputTag(getParDoPayload(application));
+  }
+
+  public static TupleTagList getAdditionalOutputTags(AppliedPTransform<?, ?, ?> application)
+      throws IOException {
+
+    RunnerApi.PTransform protoTransform =
+        PTransformTranslation.toProto(application, SdkComponents.create());
+
+    ParDoPayload payload = protoTransform.getSpec().getParameter().unpack(ParDoPayload.class);
+    TupleTag<?> mainOutputTag = getMainOutputTag(payload);
+    Set<String> outputTags =
+        Sets.difference(
+            protoTransform.getOutputsMap().keySet(), Collections.singleton(mainOutputTag.getId()));
+
+    ArrayList<TupleTag<?>> additionalOutputTags = new ArrayList<>();
+    for (String outputTag : outputTags) {
+      additionalOutputTags.add(new TupleTag<>(outputTag));
+    }
+    return TupleTagList.of(additionalOutputTags);
+  }
+
+  public static List<PCollectionView<?>> getSideInputs(AppliedPTransform<?, ?, ?> application)
+      throws IOException {
+
+    SdkComponents sdkComponents = SdkComponents.create();
+    RunnerApi.PTransform parDoProto =
+        PTransformTranslation.toProto(application, sdkComponents);
+    ParDoPayload payload = parDoProto.getSpec().getParameter().unpack(ParDoPayload.class);
+
+    List<PCollectionView<?>> views = new ArrayList<>();
+    for (Map.Entry<String, SideInput> sideInput : payload.getSideInputsMap().entrySet()) {
+      views.add(
+          fromProto(
+              sideInput.getValue(), sideInput.getKey(), parDoProto, sdkComponents.toComponents()));
+    }
+    return views;
+  }
+
   public static RunnerApi.PCollection getMainInput(
       RunnerApi.PTransform ptransform, Components components) throws IOException {
     checkArgument(



[7/8] beam git commit: Fix null checks in TransformHierarchy

2017-07-10 Thread kenn
Fix null checks in TransformHierarchy


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1518d732
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1518d732
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1518d732

Branch: refs/heads/master
Commit: 1518d732e74c61d021509d2fc325427cb93e73e8
Parents: be9a387
Author: Kenneth Knowles 
Authored: Mon Jun 12 15:12:18 2017 -0700
Committer: Kenneth Knowles 
Committed: Mon Jul 10 20:18:36 2017 -0700

--
 .../org/apache/beam/sdk/runners/TransformHierarchy.java   | 10 +-
 1 file changed, 1 insertion(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/1518d732/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index 6f1ee94..d8ff59e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -145,14 +145,6 @@ public class TransformHierarchy {
   Node producerNode = getProducer(inputValue);
   PInput input = producerInput.remove(inputValue);
   inputValue.finishSpecifying(input, producerNode.getTransform());
-  checkState(
-  producers.get(inputValue) != null,
-  "Producer unknown for input %s",
-  inputValue);
-  checkState(
-  producers.get(inputValue) != null,
-  "Producer unknown for input %s",
-  inputValue);
 }
   }
 
@@ -201,7 +193,7 @@ public class TransformHierarchy {
   }
 
   Node getProducer(PValue produced) {
-    return producers.get(produced);
+    return checkNotNull(producers.get(produced), "No producer found for %s", produced);
   }
 
   public Set visit(PipelineVisitor visitor) {



[8/8] beam git commit: This closes #3509: [BEAM-2371] Port DirectRunner overrides to SDK-agnostic APIs

2017-07-10 Thread kenn
This closes #3509: [BEAM-2371] Port DirectRunner overrides to SDK-agnostic APIs

  Fix null checks in TransformHierarchy
  Fix misleading comment in TransformHierarchy
  Port DirectRunner ParDo overrides to SDK-agnostic APIs
  Enable SplittableParDo on rehydrated ParDo transform
  Include PCollection in rehydrated PCollectionView
  Add more utilities to ParDoTranslation
  Rehydrate PCollections


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b6b1c8b7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b6b1c8b7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b6b1c8b7

Branch: refs/heads/master
Commit: b6b1c8b7c495df7e9c57a4757b91e4bb2ea82a7c
Parents: c14a318 1518d73
Author: Kenneth Knowles 
Authored: Mon Jul 10 20:21:36 2017 -0700
Committer: Kenneth Knowles 
Committed: Mon Jul 10 20:21:36 2017 -0700

--
 .../construction/PCollectionTranslation.java|  16 +++
 .../core/construction/ParDoTranslation.java | 103 ++-
 .../construction/RunnerPCollectionView.java |  23 -
 .../core/construction/SplittableParDo.java  |  25 +
 .../PCollectionTranslationTest.java |  22 
 .../core/construction/ParDoTranslationTest.java |  28 +++--
 .../direct/ParDoMultiOverrideFactory.java   |  47 +
 .../flink/FlinkStreamingPipelineTranslator.java |   2 +-
 .../beam/sdk/runners/TransformHierarchy.java|  12 +--
 9 files changed, 232 insertions(+), 46 deletions(-)
--




[3/8] beam git commit: Include PCollection in rehydrated PCollectionView

2017-07-10 Thread kenn
Include PCollection in rehydrated PCollectionView


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/de39f324
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/de39f324
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/de39f324

Branch: refs/heads/master
Commit: de39f324c4b0914418894a41c6f75596310bf633
Parents: 165dfa6
Author: Kenneth Knowles 
Authored: Thu Jul 6 09:24:55 2017 -0700
Committer: Kenneth Knowles 
Committed: Mon Jul 10 20:14:27 2017 -0700

--
 .../core/construction/ParDoTranslation.java | 51 +---
 .../construction/RunnerPCollectionView.java |  7 +--
 .../core/construction/ParDoTranslationTest.java | 28 +++
 3 files changed, 67 insertions(+), 19 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/de39f324/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
--
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index 5f2bcae..fe8c5aa 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -40,6 +40,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
@@ -74,6 +75,7 @@ import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
@@ -262,8 +264,12 @@ public class ParDoTranslation {
     List<PCollectionView<?>> views = new ArrayList<>();
     for (Map.Entry<String, SideInput> sideInput : payload.getSideInputsMap().entrySet()) {
       views.add(
-          fromProto(
-              sideInput.getValue(), sideInput.getKey(), parDoProto, sdkComponents.toComponents()));
+          viewFromProto(
+              application.getPipeline(),
+              sideInput.getValue(),
+              sideInput.getKey(),
+              parDoProto,
+              sdkComponents.toComponents()));
     }
     return views;
   }
@@ -495,15 +501,47 @@ public class ParDoTranslation {
     return builder.build();
   }
 
-  public static PCollectionView<?> fromProto(
-      SideInput sideInput, String id, RunnerApi.PTransform parDoTransform, Components components)
+  public static PCollectionView<?> viewFromProto(
+      Pipeline pipeline,
+      SideInput sideInput,
+      String localName,
+      RunnerApi.PTransform parDoTransform,
+      Components components)
       throws IOException {
-    TupleTag<?> tag = new TupleTag<>(id);
+
+    String pCollectionId = parDoTransform.getInputsOrThrow(localName);
+
+    // This may be a PCollection defined in another language, but we should be
+    // able to rehydrate it enough to stick it in a side input. The coder may not
+    // be grokkable in Java.
+    PCollection<?> pCollection =
+        PCollectionTranslation.fromProto(
+            pipeline, components.getPcollectionsOrThrow(pCollectionId), components);
+
+    return viewFromProto(sideInput, localName, pCollection, parDoTransform, components);
+  }
+
+  /**
+   * Create a {@link PCollectionView} from a side input spec and an already-deserialized {@link
+   * PCollection} that should be wired up.
+   */
+  public static PCollectionView<?> viewFromProto(
+      SideInput sideInput,
+      String localName,
+      PCollection<?> pCollection,
+      RunnerApi.PTransform parDoTransform,
+      Components components)
+      throws IOException {
+    checkArgument(
+        localName != null,
+        "%s.viewFromProto: localName must not be null",
+        ParDoTranslation.class.getSimpleName());
+    TupleTag<?> tag = new TupleTag<>(localName);
     WindowMappingFn<?> windowMappingFn = windowMappingFnFromProto(sideInput.getWindowMappingFn());
     ViewFn<?, ?> viewFn = viewFnFromProto(sideInput.getViewFn());
 
     RunnerApi.PCollection inputCollection =
-

[GitHub] beam pull request #3509: [BEAM-2371] Port DirectRunner overrides to SDK-agno...

2017-07-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/3509




[4/8] beam git commit: Enable SplittableParDo on rehydrated ParDo transform

2017-07-10 Thread kenn
Enable SplittableParDo on rehydrated ParDo transform


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fa61ed17
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fa61ed17
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fa61ed17

Branch: refs/heads/master
Commit: fa61ed17424083fa53b8aa8e70908fb6194ad4ad
Parents: de39f32
Author: Kenneth Knowles 
Authored: Thu Jun 8 14:27:02 2017 -0700
Committer: Kenneth Knowles 
Committed: Mon Jul 10 20:15:49 2017 -0700

--
 .../core/construction/SplittableParDo.java  | 25 ++
 .../direct/ParDoMultiOverrideFactory.java   | 36 ++--
 .../flink/FlinkStreamingPipelineTranslator.java |  2 +-
 3 files changed, 52 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/fa61ed17/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
--
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
index f31b495..e71187b 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.core.construction;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -26,6 +27,7 @@ import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -103,6 +105,9 @@ public class SplittableParDo
   public static <InputT, OutputT> SplittableParDo<InputT, OutputT> forJavaParDo(
       ParDo.MultiOutput<InputT, OutputT> parDo) {
     checkArgument(parDo != null, "parDo must not be null");
+    checkArgument(
+        DoFnSignatures.getSignature(parDo.getFn().getClass()).processElement().isSplittable(),
+        "fn must be a splittable DoFn");
     return new SplittableParDo(
         parDo.getFn(),
         parDo.getMainOutputTag(),
@@ -110,6 +115,26 @@ public class SplittableParDo
         parDo.getAdditionalOutputTags());
   }
 
+  /**
+   * Creates the transform for a {@link ParDo}-compatible {@link AppliedPTransform}.
+   *
+   * <p>The input may generally be a deserialized transform so it may not actually be a {@link
+   * ParDo}. Instead {@link ParDoTranslation} will be used to extract fields.
+   */
+  public static SplittableParDo<?, ?> forAppliedParDo(AppliedPTransform<?, ?, ?> parDo) {
+    checkArgument(parDo != null, "parDo must not be null");
+
+    try {
+      return new SplittableParDo<>(
+          ParDoTranslation.getDoFn(parDo),
+          (TupleTag) ParDoTranslation.getMainOutputTag(parDo),
+          ParDoTranslation.getSideInputs(parDo),
+          ParDoTranslation.getAdditionalOutputTags(parDo));
+    } catch (IOException exc) {
+      throw new RuntimeException(exc);
+    }
+  }
+
   @Override
   public PCollectionTuple expand(PCollection input) {
 Coder restrictionCoder =

http://git-wip-us.apache.org/repos/asf/beam/blob/fa61ed17/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
--
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 2904bc1..8881967 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.direct;
 
 import static com.google.common.base.Preconditions.checkState;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.KeyedWorkItem;
@@ -26,6 +27,7 @@ import org.apache.beam.runners.core.KeyedWorkItemCoder;
 import org.apache.beam.runners.core.KeyedWorkItems;
 import 

[6/8] beam git commit: Fix misleading comment in TransformHierarchy

2017-07-10 Thread kenn
Fix misleading comment in TransformHierarchy


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/be9a3879
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/be9a3879
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/be9a3879

Branch: refs/heads/master
Commit: be9a387976adf3424d680778f92ce22f728ffa32
Parents: 1ac4b7e
Author: Kenneth Knowles 
Authored: Mon Jun 12 15:11:49 2017 -0700
Committer: Kenneth Knowles 
Committed: Mon Jul 10 20:18:01 2017 -0700

--
 .../main/java/org/apache/beam/sdk/runners/TransformHierarchy.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/be9a3879/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index 9c5f148..6f1ee94 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -406,7 +406,7 @@ public class TransformHierarchy {
       return fullName;
     }
 
-    /** Returns the transform input, in unexpanded form. */
+    /** Returns the transform input, in fully expanded form. */
    public Map<TupleTag<?>, PValue> getInputs() {
      return inputs == null ? Collections.<TupleTag<?>, PValue>emptyMap() : inputs;
    }



[5/8] beam git commit: Port DirectRunner ParDo overrides to SDK-agnostic APIs

2017-07-10 Thread kenn
Port DirectRunner ParDo overrides to SDK-agnostic APIs


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1ac4b7e6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1ac4b7e6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1ac4b7e6

Branch: refs/heads/master
Commit: 1ac4b7e6f7dbbd68c27c6634cd52767885a42760
Parents: fa61ed1
Author: Kenneth Knowles 
Authored: Thu Jun 8 13:44:52 2017 -0700
Committer: Kenneth Knowles 
Committed: Mon Jul 10 20:17:56 2017 -0700

--
 .../core/construction/ParDoTranslation.java | 16 ++---
 .../construction/RunnerPCollectionView.java | 16 +
 .../direct/ParDoMultiOverrideFactory.java   | 35 +---
 3 files changed, 43 insertions(+), 24 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/1ac4b7e6/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
--
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index fe8c5aa..90c9aad 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -19,6 +19,7 @@
 package org.apache.beam.runners.core.construction;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static org.apache.beam.runners.core.construction.PTransformTranslation.PAR_DO_TRANSFORM_URN;
 
@@ -262,12 +263,19 @@ public class ParDoTranslation {
     ParDoPayload payload = parDoProto.getSpec().getParameter().unpack(ParDoPayload.class);
 
     List<PCollectionView<?>> views = new ArrayList<>();
-    for (Map.Entry<String, SideInput> sideInput : payload.getSideInputsMap().entrySet()) {
+    for (Map.Entry<String, SideInput> sideInputEntry : payload.getSideInputsMap().entrySet()) {
+      String sideInputTag = sideInputEntry.getKey();
+      RunnerApi.SideInput sideInput = sideInputEntry.getValue();
+      PCollection<?> originalPCollection =
+          checkNotNull(
+              (PCollection<?>) application.getInputs().get(new TupleTag<>(sideInputTag)),
+              "no input with tag %s",
+              sideInputTag);
       views.add(
           viewFromProto(
-              application.getPipeline(),
-              sideInput.getValue(),
-              sideInput.getKey(),
+              sideInput,
+              sideInputTag,
+              originalPCollection,
               parDoProto,
               sdkComponents.toComponents()));
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/1ac4b7e6/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
--
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
index b275188..85139e8 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
@@ -19,6 +19,7 @@
 package org.apache.beam.runners.core.construction;
 
 import java.util.Map;
+import java.util.Objects;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput;
@@ -94,4 +95,19 @@ class RunnerPCollectionView<T> extends PValueBase implements PCollectionView<T> {
     throw new UnsupportedOperationException(String.format(
         "A %s cannot be expanded", RunnerPCollectionView.class.getSimpleName()));
   }
+
+  @Override
+  public boolean equals(Object other) {
+if (!(other instanceof PCollectionView)) {
+  return false;
+}
+    @SuppressWarnings("unchecked")
+    PCollectionView<?> otherView = (PCollectionView<?>) other;
+    return tag.equals(otherView.getTagInternal());
+  }
+
+  @Override
+  public int hashCode() {
+return Objects.hash(tag);
+  }
 }
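
Note for reviewers: equality is keyed solely on the view's tag, so a rehydrated RunnerPCollectionView compares equal to the SDK-constructed view carrying the same tag. A minimal sketch of that contract (viewWithTag is a hypothetical factory):

    TupleTag<Iterable<Integer>> tag = new TupleTag<>("side");
    PCollectionView<?> rehydrated = viewWithTag(tag); // hypothetical
    PCollectionView<?> original = viewWithTag(tag);   // hypothetical
    // Equal tags imply equal views and equal hash codes, regardless of other state.
    assert rehydrated.equals(original) && rehydrated.hashCode() == original.hashCode();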

http://git-wip-us.apache.org/repos/asf/beam/blob/1ac4b7e6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java

[2/8] beam git commit: Rehydrate PCollections

2017-07-10 Thread kenn
Rehydrate PCollections


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/20ce0756
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/20ce0756
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/20ce0756

Branch: refs/heads/master
Commit: 20ce0756c97f5ed47ad9c8cb46da574c273b5b46
Parents: c14a318
Author: Kenneth Knowles 
Authored: Thu Jul 6 09:24:22 2017 -0700
Committer: Kenneth Knowles 
Committed: Mon Jul 10 20:04:14 2017 -0700

--
 .../construction/PCollectionTranslation.java| 16 ++
 .../PCollectionTranslationTest.java | 22 
 2 files changed, 38 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/20ce0756/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
--
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
index 968966f..52526bb 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.core.construction;
 
 import com.google.protobuf.InvalidProtocolBufferException;
 import java.io.IOException;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.values.PCollection;
@@ -47,6 +48,21 @@ public class PCollectionTranslation {
 .build();
   }
 
+  public static PCollection<?> fromProto(
+      Pipeline pipeline, RunnerApi.PCollection pCollection, RunnerApi.Components components)
+      throws IOException {
+    return PCollection.createPrimitiveOutputInternal(
+            pipeline,
+            WindowingStrategyTranslation.fromProto(
+                components.getWindowingStrategiesOrThrow(pCollection.getWindowingStrategyId()),
+                components),
+            fromProto(pCollection.getIsBounded()))
+        .setCoder(
+            (Coder)
+                CoderTranslation.fromProto(
+                    components.getCodersOrThrow(pCollection.getCoderId()), components));
+  }
+
   public static IsBounded isBounded(RunnerApi.PCollection pCollection) {
 return fromProto(pCollection.getIsBounded());
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/20ce0756/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java
--
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java
index 3b94220..5c45487 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java
@@ -113,6 +113,28 @@ public class PCollectionTranslationTest {
 
   @Test
   public void testEncodeDecodeCycle() throws Exception {
+    // Encode
+    SdkComponents sdkComponents = SdkComponents.create();
+    RunnerApi.PCollection protoCollection =
+        PCollectionTranslation.toProto(testCollection, sdkComponents);
+    RunnerApi.Components protoComponents = sdkComponents.toComponents();
+
+    // Decode
+    Pipeline pipeline = Pipeline.create();
+    PCollection<?> decodedCollection =
+        PCollectionTranslation.fromProto(pipeline, protoCollection, protoComponents);
+
+    // Verify
+    assertThat(decodedCollection.getCoder(), Matchers.equalTo(testCollection.getCoder()));
+    assertThat(
+        decodedCollection.getWindowingStrategy(),
+        Matchers.equalTo(testCollection.getWindowingStrategy().fixDefaults()));
+    assertThat(decodedCollection.isBounded(), equalTo(testCollection.isBounded()));
+  }
+
+  @Test
+  public void testEncodeDecodeFields() throws Exception {
 SdkComponents sdkComponents = SdkComponents.create();
 RunnerApi.PCollection protoCollection = PCollectionTranslation
 .toProto(testCollection, sdkComponents);



Jenkins build is unstable: beam_PostCommit_Java_MavenInstall #4343

2017-07-10 Thread Apache Jenkins Server
See 




Jenkins build is still unstable: beam_PostCommit_Java_ValidatesRunner_Spark #2599

2017-07-10 Thread Apache Jenkins Server
See 




Jenkins build is still unstable: beam_PostCommit_Java_ValidatesRunner_Flink #3365

2017-07-10 Thread Apache Jenkins Server
See 




[GitHub] beam pull request #3356: [BEAM-92] Allow value-dependent files in FileBasedS...

2017-07-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/3356


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[5/5] beam git commit: This closes #3356: [BEAM-92] Allow value-dependent files in FileBasedSink

2017-07-10 Thread jkff
This closes #3356: [BEAM-92] Allow value-dependent files in FileBasedSink


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c14a3184
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c14a3184
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c14a3184

Branch: refs/heads/master
Commit: c14a3184e69f3ba6d228a61b2218930537008da8
Parents: 9d48bd5 77ba7a3
Author: Eugene Kirpichov 
Authored: Mon Jul 10 18:10:19 2017 -0700
Committer: Eugene Kirpichov 
Committed: Mon Jul 10 18:10:19 2017 -0700

--
 .../examples/common/WriteOneFilePerWindow.java  |  52 +-
 .../beam/examples/WindowedWordCountIT.java  |   4 +-
 .../complete/game/utils/WriteToText.java|  43 +-
 .../construction/WriteFilesTranslation.java |  67 +-
 .../construction/PTransformMatchersTest.java|  22 +-
 .../construction/WriteFilesTranslationTest.java |  62 +-
 .../direct/WriteWithShardingFactory.java|   6 +-
 .../direct/WriteWithShardingFactoryTest.java|  18 +-
 .../beam/runners/dataflow/DataflowRunner.java   |  15 +-
 .../runners/dataflow/DataflowRunnerTest.java|  35 +-
 .../runners/spark/SparkRunnerDebuggerTest.java  |  26 +-
 .../src/main/proto/beam_runner_api.proto|   7 +-
 .../apache/beam/sdk/coders/ShardedKeyCoder.java |  66 ++
 .../java/org/apache/beam/sdk/io/AvroIO.java | 220 ---
 .../java/org/apache/beam/sdk/io/AvroSink.java   |  32 +-
 .../beam/sdk/io/DefaultFilenamePolicy.java  | 274 +---
 .../beam/sdk/io/DynamicFileDestinations.java| 115 
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 513 ---
 .../java/org/apache/beam/sdk/io/TFRecordIO.java |  44 +-
 .../java/org/apache/beam/sdk/io/TextIO.java | 488 ++
 .../java/org/apache/beam/sdk/io/TextSink.java   |  22 +-
 .../java/org/apache/beam/sdk/io/WriteFiles.java | 640 +++
 .../sdk/transforms/SerializableFunctions.java   |  50 ++
 .../org/apache/beam/sdk/values/ShardedKey.java  |  65 ++
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  85 ++-
 .../beam/sdk/io/DefaultFilenamePolicyTest.java  | 135 ++--
 .../sdk/io/DrunkWritableByteChannelFactory.java |   2 +-
 .../apache/beam/sdk/io/FileBasedSinkTest.java   |  93 +--
 .../java/org/apache/beam/sdk/io/SimpleSink.java |  56 +-
 .../java/org/apache/beam/sdk/io/TextIOTest.java | 264 +++-
 .../org/apache/beam/sdk/io/WriteFilesTest.java  | 339 --
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java|   2 +
 .../io/gcp/bigquery/DynamicDestinations.java|  29 +-
 .../io/gcp/bigquery/GenerateShardedTable.java   |   1 +
 .../beam/sdk/io/gcp/bigquery/ShardedKey.java|  67 --
 .../sdk/io/gcp/bigquery/ShardedKeyCoder.java|  74 ---
 .../sdk/io/gcp/bigquery/StreamingWriteFn.java   |   1 +
 .../io/gcp/bigquery/StreamingWriteTables.java   |   2 +
 .../sdk/io/gcp/bigquery/TagWithUniqueIds.java   |   1 +
 .../io/gcp/bigquery/WriteBundlesToFiles.java|   2 +
 .../bigquery/WriteGroupedRecordsToFiles.java|   1 +
 .../sdk/io/gcp/bigquery/WritePartition.java |   1 +
 .../beam/sdk/io/gcp/bigquery/WriteTables.java   |   1 +
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java |   2 +
 .../java/org/apache/beam/sdk/io/xml/XmlIO.java  |   4 +-
 .../org/apache/beam/sdk/io/xml/XmlSink.java |  21 +-
 .../org/apache/beam/sdk/io/xml/XmlSinkTest.java |   4 +-
 47 files changed, 2710 insertions(+), 1363 deletions(-)
--




[4/5] beam git commit: Adds DynamicDestinations support to FileBasedSink

2017-07-10 Thread jkff
Adds DynamicDestinations support to FileBasedSink


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/77ba7a35
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/77ba7a35
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/77ba7a35

Branch: refs/heads/master
Commit: 77ba7a35cdae0b036791cce0682beefeb3fd809b
Parents: 9d48bd5
Author: Reuven Lax 
Authored: Fri Jun 9 17:11:32 2017 -0700
Committer: Eugene Kirpichov 
Committed: Mon Jul 10 18:05:33 2017 -0700

--
 .../examples/common/WriteOneFilePerWindow.java  |  52 +-
 .../beam/examples/WindowedWordCountIT.java  |   4 +-
 .../complete/game/utils/WriteToText.java|  43 +-
 .../construction/WriteFilesTranslation.java |  67 +-
 .../construction/PTransformMatchersTest.java|  22 +-
 .../construction/WriteFilesTranslationTest.java |  62 +-
 .../direct/WriteWithShardingFactory.java|   6 +-
 .../direct/WriteWithShardingFactoryTest.java|  18 +-
 .../beam/runners/dataflow/DataflowRunner.java   |  15 +-
 .../runners/dataflow/DataflowRunnerTest.java|  35 +-
 .../runners/spark/SparkRunnerDebuggerTest.java  |  26 +-
 .../src/main/proto/beam_runner_api.proto|   7 +-
 .../apache/beam/sdk/coders/ShardedKeyCoder.java |  66 ++
 .../java/org/apache/beam/sdk/io/AvroIO.java | 220 ---
 .../java/org/apache/beam/sdk/io/AvroSink.java   |  32 +-
 .../beam/sdk/io/DefaultFilenamePolicy.java  | 274 +---
 .../beam/sdk/io/DynamicFileDestinations.java| 115 
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 513 ---
 .../java/org/apache/beam/sdk/io/TFRecordIO.java |  44 +-
 .../java/org/apache/beam/sdk/io/TextIO.java | 488 ++
 .../java/org/apache/beam/sdk/io/TextSink.java   |  22 +-
 .../java/org/apache/beam/sdk/io/WriteFiles.java | 640 +++
 .../sdk/transforms/SerializableFunctions.java   |  50 ++
 .../org/apache/beam/sdk/values/ShardedKey.java  |  65 ++
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  85 ++-
 .../beam/sdk/io/DefaultFilenamePolicyTest.java  | 135 ++--
 .../sdk/io/DrunkWritableByteChannelFactory.java |   2 +-
 .../apache/beam/sdk/io/FileBasedSinkTest.java   |  93 +--
 .../java/org/apache/beam/sdk/io/SimpleSink.java |  56 +-
 .../java/org/apache/beam/sdk/io/TextIOTest.java | 264 +++-
 .../org/apache/beam/sdk/io/WriteFilesTest.java  | 339 --
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java|   2 +
 .../io/gcp/bigquery/DynamicDestinations.java|  29 +-
 .../io/gcp/bigquery/GenerateShardedTable.java   |   1 +
 .../beam/sdk/io/gcp/bigquery/ShardedKey.java|  67 --
 .../sdk/io/gcp/bigquery/ShardedKeyCoder.java|  74 ---
 .../sdk/io/gcp/bigquery/StreamingWriteFn.java   |   1 +
 .../io/gcp/bigquery/StreamingWriteTables.java   |   2 +
 .../sdk/io/gcp/bigquery/TagWithUniqueIds.java   |   1 +
 .../io/gcp/bigquery/WriteBundlesToFiles.java|   2 +
 .../bigquery/WriteGroupedRecordsToFiles.java|   1 +
 .../sdk/io/gcp/bigquery/WritePartition.java |   1 +
 .../beam/sdk/io/gcp/bigquery/WriteTables.java   |   1 +
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java |   2 +
 .../java/org/apache/beam/sdk/io/xml/XmlIO.java  |   4 +-
 .../org/apache/beam/sdk/io/xml/XmlSink.java |  21 +-
 .../org/apache/beam/sdk/io/xml/XmlSinkTest.java |   4 +-
 47 files changed, 2710 insertions(+), 1363 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
--
diff --git 
a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
 
b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
index 5e6df9c..49865ba 100644
--- 
a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
+++ 
b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
@@ -17,11 +17,12 @@
  */
 package org.apache.beam.examples.common;
 
-import static com.google.common.base.Verify.verifyNotNull;
+import static com.google.common.base.MoreObjects.firstNonNull;
 
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
@@ -53,22 +54,12 @@ public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone> {

[2/5] beam git commit: Adds DynamicDestinations support to FileBasedSink

2017-07-10 Thread jkff
http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
index 511d697..b57b28c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
@@ -34,27 +34,29 @@ import org.apache.beam.sdk.util.MimeTypes;
  * '\n'} represented in {@code UTF-8} format as the record separator. Each 
record (including the
  * last) is terminated.
  */
-class TextSink extends FileBasedSink<String> {
+class TextSink<DestinationT> extends FileBasedSink<String, DestinationT> {
   @Nullable private final String header;
   @Nullable private final String footer;
 
   TextSink(
       ValueProvider<ResourceId> baseOutputFilename,
-      FilenamePolicy filenamePolicy,
+      DynamicDestinations<String, DestinationT> dynamicDestinations,
       @Nullable String header,
       @Nullable String footer,
       WritableByteChannelFactory writableByteChannelFactory) {
-    super(baseOutputFilename, filenamePolicy, writableByteChannelFactory);
+    super(baseOutputFilename, dynamicDestinations, writableByteChannelFactory);
     this.header = header;
     this.footer = footer;
   }
+
   @Override
-  public WriteOperation<String> createWriteOperation() {
-    return new TextWriteOperation(this, header, footer);
+  public WriteOperation<String, DestinationT> createWriteOperation() {
+    return new TextWriteOperation<>(this, header, footer);
   }
 
   /** A {@link WriteOperation WriteOperation} for text files. */
-  private static class TextWriteOperation extends WriteOperation<String> {
+  private static class TextWriteOperation<DestinationT>
+      extends WriteOperation<String, DestinationT> {
     @Nullable private final String header;
     @Nullable private final String footer;
 
@@ -65,20 +67,20 @@ class TextSink extends FileBasedSink<String> {
     }
 
     @Override
-    public Writer<String> createWriter() throws Exception {
-      return new TextWriter(this, header, footer);
+    public Writer<String, DestinationT> createWriter() throws Exception {
+      return new TextWriter<>(this, header, footer);
     }
   }
 
   /** A {@link Writer Writer} for text files. */
-  private static class TextWriter extends Writer<String> {
+  private static class TextWriter<DestinationT> extends Writer<String, DestinationT> {
     private static final String NEWLINE = "\n";
     @Nullable private final String header;
     @Nullable private final String footer;
     private OutputStreamWriter out;
 
     public TextWriter(
-        WriteOperation<String> writeOperation,
+        WriteOperation<String, DestinationT> writeOperation,
         @Nullable String header,
         @Nullable String footer) {
       super(writeOperation, MimeTypes.TEXT);

http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index a220eab..7013044 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -20,9 +20,12 @@ package org.apache.beam.sdk.io;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.hash.Hashing;
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -30,8 +33,11 @@ import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ShardedKeyCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.FileBasedSink.FileResult;
@@ -47,6 +53,7 @@ import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.WithKeys;
 import 

[1/5] beam git commit: Adds DynamicDestinations support to FileBasedSink

2017-07-10 Thread jkff
Repository: beam
Updated Branches:
  refs/heads/master 9d48bd5e8 -> c14a3184e


http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
--
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 9468893..8797ff7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -42,7 +42,9 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import com.google.common.base.Function;
+import com.google.common.base.Functions;
 import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
@@ -69,22 +71,31 @@ import java.util.zip.GZIPOutputStream;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
+import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
 import org.apache.beam.sdk.io.TextIO.CompressionType;
 import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.util.CoderUtils;
@@ -205,7 +216,7 @@ public class TextIOTest {
 });
   }
 
-  private <T> void runTestRead(String[] expected) throws Exception {
+  private void runTestRead(String[] expected) throws Exception {
 File tmpFile = Files.createTempFile(tempFolder, "file", "txt").toFile();
 String filename = tmpFile.getPath();
 
@@ -274,6 +285,213 @@ public class TextIOTest {
     displayData, hasItem(hasDisplayItem(hasValue(startsWith("foobar")))));
   }
 
+  static class TestDynamicDestinations extends DynamicDestinations<String, String> {
+ResourceId baseDir;
+
+TestDynamicDestinations(ResourceId baseDir) {
+  this.baseDir = baseDir;
+}
+
+@Override
+public String getDestination(String element) {
+  // Destination is based on first character of string.
+  return element.substring(0, 1);
+}
+
+@Override
+public String getDefaultDestination() {
+  return "";
+}
+
+@Nullable
+@Override
+    public Coder<String> getDestinationCoder() {
+  return StringUtf8Coder.of();
+}
+
+@Override
+public FilenamePolicy getFilenamePolicy(String destination) {
+  return DefaultFilenamePolicy.fromStandardParameters(
+  StaticValueProvider.of(
+              baseDir.resolve("file_" + destination + ".txt", StandardResolveOptions.RESOLVE_FILE)),
+  null,
+  null,
+  false);
+}
+  }
+
+  class StartsWith implements Predicate<String> {
+String prefix;
+
+StartsWith(String prefix) {
+  this.prefix = prefix;
+}
+
+@Override
+public boolean apply(@Nullable String input) {
+  return input.startsWith(prefix);
+}
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testDynamicDestinations() throws Exception {
+ResourceId baseDir =
+FileSystems.matchNewResource(
+            Files.createTempDirectory(tempFolder, "testDynamicDestinations").toString(), true);
+
+    List<String> elements = Lists.newArrayList("aaaa", "aaab", "baaa", "baab", "caaa", "caab");
+    PCollection<String> input = p.apply(Create.of(elements).withCoder(StringUtf8Coder.of()));
+input.apply(
+TextIO.write()
+.to(new TestDynamicDestinations(baseDir))
+

[3/5] beam git commit: Adds DynamicDestinations support to FileBasedSink

2017-07-10 Thread jkff
http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
new file mode 100644
index 000..e7ef0f6
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io;
+
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy.ParamsCoder;
+import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+
+/** Some helper classes that derive from {@link FileBasedSink.DynamicDestinations}. */
+public class DynamicFileDestinations {
+  /** Always returns a constant {@link FilenamePolicy}. */
+  private static class ConstantFilenamePolicy<T> extends DynamicDestinations<T, Void> {
+private final FilenamePolicy filenamePolicy;
+
+public ConstantFilenamePolicy(FilenamePolicy filenamePolicy) {
+  this.filenamePolicy = filenamePolicy;
+}
+
+@Override
+public Void getDestination(T element) {
+  return (Void) null;
+}
+
+@Override
+    public Coder<Void> getDestinationCoder() {
+  return null;
+}
+
+@Override
+public Void getDefaultDestination() {
+  return (Void) null;
+}
+
+@Override
+public FilenamePolicy getFilenamePolicy(Void destination) {
+  return filenamePolicy;
+}
+
+@Override
+public void populateDisplayData(DisplayData.Builder builder) {
+  filenamePolicy.populateDisplayData(builder);
+}
+  }
+
+  /**
+   * A base class for a {@link DynamicDestinations} object that returns 
differently-configured
+   * instances of {@link DefaultFilenamePolicy}.
+   */
+  private static class DefaultPolicyDestinations<UserT> extends DynamicDestinations<UserT, Params> {
+    SerializableFunction<UserT, Params> destinationFunction;
+    Params emptyDestination;
+
+    public DefaultPolicyDestinations(
+        SerializableFunction<UserT, Params> destinationFunction, Params emptyDestination) {
+  this.destinationFunction = destinationFunction;
+  this.emptyDestination = emptyDestination;
+}
+
+@Override
+public Params getDestination(UserT element) {
+  return destinationFunction.apply(element);
+}
+
+@Override
+public Params getDefaultDestination() {
+  return emptyDestination;
+}
+
+@Nullable
+@Override
+    public Coder<Params> getDestinationCoder() {
+  return ParamsCoder.of();
+}
+
+@Override
+    public FilenamePolicy getFilenamePolicy(DefaultFilenamePolicy.Params params) {
+  return DefaultFilenamePolicy.fromParams(params);
+}
+  }
+
+  /** Returns a {@link DynamicDestinations} that always returns the same {@link FilenamePolicy}. */
+  public static <T> DynamicDestinations<T, Void> constant(FilenamePolicy filenamePolicy) {
+    return new ConstantFilenamePolicy<>(filenamePolicy);
+  }
+
+  /**
+   * Returns a {@link DynamicDestinations} that returns instances of {@link DefaultFilenamePolicy}
+   * configured with the given {@link Params}.
+   */
+  public static <UserT> DynamicDestinations<UserT, Params> toDefaultPolicies(
+      SerializableFunction<UserT, Params> destinationFunction, Params emptyDestination) {
+    return new DefaultPolicyDestinations<>(destinationFunction, emptyDestination);
+  }
+}
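
For quick reference, a hedged usage sketch of the two factories above (MyRecord, paramsFor, someFilenamePolicy, and emptyParams are hypothetical placeholders):

    // Constant destination: every element is written under one FilenamePolicy.
    DynamicDestinations<String, Void> constant =
        DynamicFileDestinations.constant(someFilenamePolicy);

    // Per-element routing: each element selects the Params for a DefaultFilenamePolicy.
    DynamicDestinations<MyRecord, Params> routed =
        DynamicFileDestinations.toDefaultPolicies(
            new SerializableFunction<MyRecord, Params>() {
              @Override
              public Params apply(MyRecord record) {
                return paramsFor(record); // hypothetical routing helper
              }
            },
            emptyParams); // hypothetical Params used when there is no data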

http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java 

Build failed in Jenkins: beam_PostCommit_Java_MavenInstall #4342

2017-07-10 Thread Apache Jenkins Server
See 


--
[...truncated 1.47 MB...]
2017-07-11T00:39:25.361 [INFO] Excluding 
com.google.http-client:google-http-client:jar:1.22.0 from the shaded jar.
2017-07-11T00:39:25.361 [INFO] Excluding 
org.apache.httpcomponents:httpclient:jar:4.0.1 from the shaded jar.
2017-07-11T00:39:25.361 [INFO] Excluding 
org.apache.httpcomponents:httpcore:jar:4.0.1 from the shaded jar.
2017-07-11T00:39:25.361 [INFO] Excluding commons-codec:commons-codec:jar:1.3 
from the shaded jar.
2017-07-11T00:39:25.361 [INFO] Excluding 
com.google.cloud.bigdataoss:util:jar:1.4.5 from the shaded jar.
2017-07-11T00:39:25.361 [INFO] Excluding 
com.google.api-client:google-api-client-java6:jar:1.22.0 from the shaded jar.
2017-07-11T00:39:25.361 [INFO] Excluding 
com.google.api-client:google-api-client-jackson2:jar:1.22.0 from the shaded jar.
2017-07-11T00:39:25.361 [INFO] Excluding 
com.google.oauth-client:google-oauth-client-java6:jar:1.22.0 from the shaded 
jar.
2017-07-11T00:39:25.361 [INFO] Excluding 
com.google.auth:google-auth-library-oauth2-http:jar:0.6.1 from the shaded jar.
2017-07-11T00:39:25.361 [INFO] Excluding 
com.google.auth:google-auth-library-credentials:jar:0.6.1 from the shaded jar.
2017-07-11T00:39:25.361 [INFO] Excluding org.apache.avro:avro:jar:1.8.2 from 
the shaded jar.
2017-07-11T00:39:25.361 [INFO] Excluding 
org.codehaus.jackson:jackson-core-asl:jar:1.9.13 from the shaded jar.
2017-07-11T00:39:25.361 [INFO] Excluding 
org.codehaus.jackson:jackson-mapper-asl:jar:1.9.13 from the shaded jar.
2017-07-11T00:39:25.361 [INFO] Excluding 
com.thoughtworks.paranamer:paranamer:jar:2.7 from the shaded jar.
2017-07-11T00:39:25.361 [INFO] Excluding org.tukaani:xz:jar:1.5 from the shaded 
jar.
2017-07-11T00:39:25.361 [INFO] Excluding 
com.google.apis:google-api-services-pubsub:jar:v1-rev10-1.22.0 from the shaded 
jar.
2017-07-11T00:39:25.361 [INFO] Including com.google.guava:guava:jar:20.0 in the 
shaded jar.
2017-07-11T00:39:25.361 [INFO] Excluding 
com.google.cloud.datastore:datastore-v1-proto-client:jar:1.4.0 from the shaded 
jar.
2017-07-11T00:39:25.361 [INFO] Excluding 
com.google.http-client:google-http-client-protobuf:jar:1.22.0 from the shaded 
jar.
2017-07-11T00:39:25.361 [INFO] Excluding 
com.google.http-client:google-http-client-jackson:jar:1.22.0 from the shaded 
jar.
2017-07-11T00:39:25.361 [INFO] Excluding 
com.google.cloud.datastore:datastore-v1-protos:jar:1.3.0 from the shaded jar.
2017-07-11T00:39:25.361 [INFO] Excluding 
com.google.api.grpc:grpc-google-common-protos:jar:0.1.9 from the shaded jar.
2017-07-11T00:39:25.361 [INFO] Excluding joda-time:joda-time:jar:2.4 from the 
shaded jar.
2017-07-11T00:39:25.361 [INFO] Excluding org.slf4j:slf4j-api:jar:1.7.14 from 
the shaded jar.
2017-07-11T00:39:25.361 [INFO] Excluding org.slf4j:slf4j-jdk14:jar:1.7.14 from 
the shaded jar.
2017-07-11T00:39:25.361 [INFO] Excluding 
org.apache.beam:beam-sdks-common-runner-api:jar:2.2.0-SNAPSHOT from the shaded 
jar.
2017-07-11T00:39:25.361 [INFO] Excluding 
org.apache.beam:beam-runners-core-construction-java:jar:2.2.0-SNAPSHOT from the 
shaded jar.
2017-07-11T00:39:25.361 [INFO] Excluding 
org.apache.beam:beam-runners-google-cloud-dataflow-java:jar:2.2.0-SNAPSHOT from 
the shaded jar.
2017-07-11T00:39:25.361 [INFO] Excluding 
com.google.apis:google-api-services-dataflow:jar:v1b3-rev196-1.22.0 from the 
shaded jar.
2017-07-11T00:39:25.361 [INFO] Excluding 
com.google.apis:google-api-services-clouddebugger:jar:v2-rev8-1.22.0 from the 
shaded jar.
2017-07-11T00:39:27.089 [INFO] Replacing original artifact with shaded artifact.
2017-07-11T00:39:27.089 [INFO] Replacing 

 with 

2017-07-11T00:39:27.089 [INFO] Replacing original test artifact with shaded 
test artifact.
2017-07-11T00:39:27.089 [INFO] Replacing 

 with 

2017-07-11T00:39:27.089 [INFO] Dependency-reduced POM written at: 

2017-07-11T00:39:28.144 [INFO] 
2017-07-11T00:39:28.144 [INFO] --- maven-javadoc-plugin:2.10.4:jar 
(attach-javadocs) @ beam-examples-java ---
2017-07-11T00:39:30.806 [INFO] Building jar: 

2017-07-11T00:39:31.016 [INFO] 
2017-07-11T00:39:31.016 [INFO] --- 

[GitHub] beam pull request #3538: Add a test for Avro write with RVP; fix code

2017-07-10 Thread bjchambers
GitHub user bjchambers opened a pull request:

https://github.com/apache/beam/pull/3538

Add a test for Avro write with RVP; fix code

Add a test for AvroIO using RuntimeValueProvider

Make AvroIO actually work with RuntimeValueProvider. Previously, supplying
one made the transform non-serializable.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [*] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [*] Make sure tests pass via `mvn clean verify`.
 - [*] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [*] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.pdf).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bjchambers/beam avro-io

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/3538.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3538


commit 08f1adafa2792d09eecc173d05cb2650a4f9eedb
Author: bchambers 
Date:   2017-07-10T23:41:15Z

Add a test for Avro write with RVP; fix code

Add a test for AvroIO using RuntimeValueProvider

Make AvroIO actually work with RuntimeValueProvider. Previously, supplying
one made the transform non-serializable.
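
(A hedged sketch of the usage this enables; the option interface and schema are hypothetical, and it assumes AvroIO's write accepts the ValueProvider-backed destination that the new test exercises:)

    public interface MyOptions extends PipelineOptions {
      ValueProvider<String> getOutputPrefix(); // resolved at run time, not at construction
      void setOutputPrefix(ValueProvider<String> value);
    }

    MyOptions options = PipelineOptionsFactory.fromArgs(args).as(MyOptions.class);
    Pipeline p = Pipeline.create(options);
    p.apply(Create.of(records))                    // 'records' assumed in scope
        .apply(AvroIO.writeGenericRecords(schema)  // 'schema' assumed in scope
            .to(options.getOutputPrefix()));       // the transform must stay serializable
    p.run();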




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #3537: TestPipeline should support errors in expand

2017-07-10 Thread bjchambers
GitHub user bjchambers opened a pull request:

https://github.com/apache/beam/pull/3537

TestPipeline should support errors in expand

Writing a test that expects an exception during transform application is
currently not possible with TestPipeline in a NeedsRunner or
ValidatesRunner test. The exception causes the pipeline to be unrunnable.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [*] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [*] Make sure tests pass via `mvn clean verify`.
 - [*] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [*] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.pdf).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bjchambers/beam 
test-pipeline-construction-errors

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/3537.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3537


commit 5d820c690540322f2d3c5869061703b8e460069f
Author: bchambers 
Date:   2017-07-11T00:29:24Z

TestPipeline should support errors in expand

Writing a test that expects an exception during transform application is
currently not possible with TestPipeline in a NeedsRunner or
ValidatesRunner test. The exception causes the pipeline to be unrunnable.
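
(A hedged sketch of the test shape this should unblock; MisconfiguredTransform is a hypothetical transform whose expand() throws:)

    @Rule public final transient TestPipeline p = TestPipeline.create();
    @Rule public transient ExpectedException thrown = ExpectedException.none();

    @Test
    @Category(NeedsRunner.class)
    public void misconfigurationFailsAtApplyTime() {
      thrown.expect(IllegalArgumentException.class);
      // Throws during expand(), i.e. at apply() time, so the pipeline never runs.
      p.apply(Create.of(1, 2, 3)).apply(new MisconfiguredTransform());
      p.run();
    }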




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Jenkins build is still unstable: beam_PostCommit_Java_ValidatesRunner_Spark #2598

2017-07-10 Thread Apache Jenkins Server
See 




Jenkins build is still unstable: beam_PostCommit_Java_ValidatesRunner_Flink #3364

2017-07-10 Thread Apache Jenkins Server
See 




[jira] [Commented] (BEAM-2572) Implement an S3 filesystem for Python SDK

2017-07-10 Thread Dmitry Demeshchuk (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16081374#comment-16081374
 ] 

Dmitry Demeshchuk commented on BEAM-2572:
-

How about the following plan, then?

1. Add the ability to hide pipeline options. For example, extend 
{{_BeamArgumentParser}} by overriding the {{add_argument}} method to accept a 
{{hidden=False}} parameter.
2. Add an {{AWSOptions}} class that inherits from {{PipelineOptions}} and 
provides the hidden options {{aws_access_key_id}}, {{aws_secret_access_key}} and 
{{aws_default_region}}.
3. Add an AWS extra package to {{apache_beam}} (similar to 
{{apache_beam[gcp]}}) that depends on boto and contains all the AWS-related 
code.
4. Make filesystems aware of the pipeline options.
5. Add the actual S3 filesystem.

I can make the corresponding tickets and start working on them.

> Implement an S3 filesystem for Python SDK
> -
>
> Key: BEAM-2572
> URL: https://issues.apache.org/jira/browse/BEAM-2572
> Project: Beam
>  Issue Type: Task
>  Components: sdk-py
>Reporter: Dmitry Demeshchuk
>Assignee: Ahmet Altay
>Priority: Minor
>
> There are two paths worth exploring, to my understanding:
> 1. Sticking to the HDFS-based approach (like it's done in Java).
> 2. Using boto/boto3 for accessing S3 through its common API endpoints.
> I personally prefer the second approach, for a few reasons:
> 1. In real life, HDFS and S3 have different consistency guarantees, therefore 
> their behaviors may contradict each other in some edge cases (say, we write 
> something to S3, but it's not immediately accessible for reading from another 
> end).
> 2. There are other AWS-based sources and sinks we may want to create in the 
> future: DynamoDB, Kinesis, SQS, etc.
> 3. boto3 already provides reasonably good logic for basics such as 
> retrying.
> Whatever path we choose, there's another problem related to this: we 
> currently cannot pass any global settings (say, pipeline options, or just an 
> arbitrary kwarg) to a filesystem. Because of that, we'd have to setup the 
> runner nodes to have AWS keys set up in the environment, which is not trivial 
> to achieve and doesn't look too clean either (I'd rather see one single place 
> for configuring the runner options).
> Also, it's worth mentioning that I already have a janky S3 filesystem 
> implementation that only supports DirectRunner at the moment (because of the 
> previous paragraph). I'm perfectly fine finishing it myself, with some 
> guidance from the maintainers.
> Where should I move on from here, and whose input should I be looking for?
> Thanks!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Jenkins build is still unstable: beam_PostCommit_Java_ValidatesRunner_Spark #2597

2017-07-10 Thread Apache Jenkins Server
See 




Jenkins build is still unstable: beam_PostCommit_Java_ValidatesRunner_Flink #3363

2017-07-10 Thread Apache Jenkins Server
See 




[jira] [Commented] (BEAM-1348) Model the Fn Api

2017-07-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16081311#comment-16081311
 ] 

ASF GitHub Bot commented on BEAM-1348:
--

Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/3482


> Model the Fn Api
> 
>
> Key: BEAM-1348
> URL: https://issues.apache.org/jira/browse/BEAM-1348
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model-fn-api
>Reporter: Luke Cwik
>Assignee: Luke Cwik
>
> Create a proto representation of the services and data types required to 
> execute the Fn Api.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[2/2] beam git commit: [BEAM-1348] Remove deprecated concepts in Fn API (now replaced with Runner API concepts).

2017-07-10 Thread lcwik
[BEAM-1348] Remove deprecated concepts in Fn API (now replaced with Runner API 
concepts).

This closes #3482


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9d48bd5e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9d48bd5e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9d48bd5e

Branch: refs/heads/master
Commit: 9d48bd5e85f568fe0c00f304f9ae022181e25160
Parents: 9f904dc 521488f
Author: Luke Cwik 
Authored: Mon Jul 10 15:53:29 2017 -0700
Committer: Luke Cwik 
Committed: Mon Jul 10 15:53:29 2017 -0700

--
 .../fn-api/src/main/proto/beam_fn_api.proto | 151 +--
 .../harness/control/ProcessBundleHandler.java   |   4 +-
 .../fn/harness/control/RegisterHandler.java |   2 +-
 .../fn/harness/control/RegisterHandlerTest.java |   8 +-
 .../apache_beam/runners/pipeline_context.py |   2 +-
 .../runners/portability/fn_api_runner.py|   2 +-
 .../apache_beam/runners/worker/sdk_worker.py|   4 +-
 .../runners/worker/sdk_worker_test.py   |  16 +-
 8 files changed, 25 insertions(+), 164 deletions(-)
--




[1/2] beam git commit: [BEAM-1348] Remove deprecated concepts in Fn API (now replaced with Runner API concepts).

2017-07-10 Thread lcwik
Repository: beam
Updated Branches:
  refs/heads/master 9f904dc00 -> 9d48bd5e8


[BEAM-1348] Remove deprecated concepts in Fn API (now replaced with Runner API 
concepts).


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/521488f8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/521488f8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/521488f8

Branch: refs/heads/master
Commit: 521488f8239547c7e93c30e75ecb2462ff114cb8
Parents: 9f904dc
Author: Luke Cwik 
Authored: Fri Jun 30 10:21:55 2017 -0700
Committer: Luke Cwik 
Committed: Mon Jul 10 15:53:07 2017 -0700

--
 .../fn-api/src/main/proto/beam_fn_api.proto | 151 +--
 .../harness/control/ProcessBundleHandler.java   |   4 +-
 .../fn/harness/control/RegisterHandler.java |   2 +-
 .../fn/harness/control/RegisterHandlerTest.java |   8 +-
 .../apache_beam/runners/pipeline_context.py |   2 +-
 .../runners/portability/fn_api_runner.py|   2 +-
 .../apache_beam/runners/worker/sdk_worker.py|   4 +-
 .../runners/worker/sdk_worker_test.py   |  16 +-
 8 files changed, 25 insertions(+), 164 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/521488f8/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
--
diff --git a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto 
b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
index 8162bc5..9da5afe 100644
--- a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
+++ b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
@@ -38,7 +38,6 @@ option java_package = "org.apache.beam.fn.v1";
 option java_outer_classname = "BeamFnApi";
 
 import "beam_runner_api.proto";
-import "google/protobuf/any.proto";
 import "google/protobuf/timestamp.proto";
 
 /*
@@ -67,129 +66,6 @@ message Target {
   string name = 2;
 }
 
-// (Deprecated) Information defining a PCollection
-//
-// Migrate to Runner API.
-message PCollection {
-  // (Required) A reference to a coder.
-  string coder_reference = 1 [deprecated = true];
-
-  // TODO: Windowing strategy, ...
-}
-
-// (Deprecated) A primitive transform within Apache Beam.
-//
-// Migrate to Runner API.
-message PrimitiveTransform {
-  // (Required) A pipeline level unique id which can be used as a reference to
-  // refer to this.
-  string id = 1 [deprecated = true];
-
-  // (Required) A function spec that is used by this primitive
-  // transform to process data.
-  FunctionSpec function_spec = 2 [deprecated = true];
-
-  // A map of distinct input names to target definitions.
-  // For example, in CoGbk this represents the tag name associated with each
-  // distinct input name and a list of primitive transforms that are associated
-  // with the specified input.
-  map<string, Target.List> inputs = 3 [deprecated = true];
-
-  // A map from local output name to PCollection definitions. For example, in
-  // DoFn this represents the tag name associated with each distinct output.
-  map<string, PCollection> outputs = 4 [deprecated = true];
-
-  // TODO: Should we model side inputs as a special type of input for a
-  // primitive transform or should it be modeled as the relationship that
-  // the predecessor input will be a view primitive transform.
-  // A map of from side input names to side inputs.
-  map<string, SideInput> side_inputs = 5 [deprecated = true];
-
-  // The user name of this step.
-  // TODO: This should really be in display data and not at this level
-  string step_name = 6 [deprecated = true];
-}
-
-/*
- * User Definable Functions
- *
- * This is still unstable mainly due to how we model the side input.
- */
-
-// (Deprecated) Defines the common elements of user-definable functions,
-// to allow the SDK to express the information the runner needs to execute 
work.
-//
-// Migrate to Runner API.
-message FunctionSpec {
-  // (Required) A pipeline level unique id which can be used as a reference to
-  // refer to this.
-  string id = 1 [deprecated = true];
-
-  // (Required) A globally unique name representing this user definable
-  // function.
-  //
-  // User definable functions use the urn encodings registered such that 
another
-  // may implement the user definable function within another language.
-  //
-  // For example:
-  //urn:org.apache.beam:coder:kv:1.0
-  string urn = 2 [deprecated = true];
-
-  // (Required) Reference to specification of execution environment required to
-  // invoke this function.
-  string environment_reference = 3 [deprecated = true];
-
-  // Data used to parameterize this function. Depending on the urn, this may be
-  // optional or required.
-  google.protobuf.Any data = 4 [deprecated = true];
-}
-
-// (Deprecated) Migrate to Runner API.
-message SideInput {
-  

[GitHub] beam pull request #3482: [BEAM-1348] Remove deprecated concepts in Fn API (n...

2017-07-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/3482


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (BEAM-2371) Make Java DirectRunner demonstrate language-agnostic Runner API translation wrappers

2017-07-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16081293#comment-16081293
 ] 

ASF GitHub Bot commented on BEAM-2371:
--

GitHub user kennknowles opened a pull request:

https://github.com/apache/beam/pull/3536

[BEAM-2371] Use URNs, not Java classes, in immutability enforcements

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [x] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [x] Make sure tests pass via `mvn clean verify`.
 - [x] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [x] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.pdf).

---

R: @tgroh peeled this off #3334 as I think it is separable.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kennknowles/beam DirectRunner-enforcements

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/3536.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3536


commit 311547aa561bb314a8fe743b6f4677a2eaaaca50
Author: Kenneth Knowles 
Date:   2017-07-10T22:25:11Z

Use URNs, not Java classes, in immutability enforcements




> Make Java DirectRunner demonstrate language-agnostic Runner API translation 
> wrappers
> 
>
> Key: BEAM-2371
> URL: https://issues.apache.org/jira/browse/BEAM-2371
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-direct
>Reporter: Kenneth Knowles
>Assignee: Kenneth Knowles
>
> This will complete the PoC for runners-core-construction-java and the Runner 
> API and show other runners the easy path to executing non-Java pipelines, 
> modulo Fn API.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] beam pull request #3536: [BEAM-2371] Use URNs, not Java classes, in immutabi...

2017-07-10 Thread kennknowles
GitHub user kennknowles opened a pull request:

https://github.com/apache/beam/pull/3536

[BEAM-2371] Use URNs, not Java classes, in immutability enforcements

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [x] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [x] Make sure tests pass via `mvn clean verify`.
 - [x] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [x] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.pdf).

---

R: @tgroh peeled this off #3334 as I think it is separable.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kennknowles/beam DirectRunner-enforcements

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/3536.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3536


commit 311547aa561bb314a8fe743b6f4677a2eaaaca50
Author: Kenneth Knowles 
Date:   2017-07-10T22:25:11Z

Use URNs, not Java classes, in immutability enforcements




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (BEAM-2534) KafkaIO should allow gaps in message offsets

2017-07-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16080984#comment-16080984
 ] 

ASF GitHub Bot commented on BEAM-2534:
--

Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/3514


> KafkaIO should allow gaps in message offsets
> 
>
> Key: BEAM-2534
> URL: https://issues.apache.org/jira/browse/BEAM-2534
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions
>Affects Versions: 2.0.0
>Reporter: Raghu Angadi
>Assignee: Raghu Angadi
>Priority: Minor
> Fix For: 2.1.0
>
>
> KafkaIO reader logs a warning when it notices gaps in offsets for messages. 
> While such gaps are not expected for normal Kafka topics, there could be gaps 
> when log compaction is enabled (which deletes older messages for a key). 
> This warning log is not very useful. We should also take such gaps into 
> account while estimating backlog.
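
(A hedged sketch of the offset arithmetic implied above; the values are illustrative:)

    // With log compaction, offsets are not contiguous, so counting consumed
    // messages undercounts progress; estimating backlog from raw offsets
    // naturally tolerates the gaps.
    long latestOffset = 1_000_000L; // partition end offset reported by Kafka
    long nextOffset = 999_400L;     // offset this reader will request next
    long backlogEstimate = Math.max(0L, latestOffset - nextOffset);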



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-2530) Make Beam compatible with Java 9

2017-07-10 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/BEAM-2530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismaël Mejía updated BEAM-2530:
---
Fix Version/s: (was: 2.1.0)
   Not applicable

> Make Beam compatible with Java 9
> 
>
> Key: BEAM-2530
> URL: https://issues.apache.org/jira/browse/BEAM-2530
> Project: Beam
>  Issue Type: Improvement
>  Components: build-system
>Affects Versions: Not applicable
>Reporter: Ismaël Mejía
>Assignee: Ismaël Mejía
>Priority: Minor
> Fix For: Not applicable
>
>
> Java 9 finally seems likely to be released this year; this is a JIRA to keep 
> track of the changes needed to support Beam on Java 9.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2530) Make Beam compatible with Java 9

2017-07-10 Thread JIRA

[ 
https://issues.apache.org/jira/browse/BEAM-2530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16081224#comment-16081224
 ] 

Ismaël Mejía commented on BEAM-2530:


No, it was marked 2.1.0 by mistake; I just changed it. All current 
changes are already in master, but there are upstream fixes (maven-enforcer + 
auto) that we also need first, so this is not blocking at all.

> Make Beam compatible with Java 9
> 
>
> Key: BEAM-2530
> URL: https://issues.apache.org/jira/browse/BEAM-2530
> Project: Beam
>  Issue Type: Improvement
>  Components: build-system
>Affects Versions: Not applicable
>Reporter: Ismaël Mejía
>Assignee: Ismaël Mejía
>Priority: Minor
> Fix For: Not applicable
>
>
> Java 9 finally seems likely to be released this year; this is a JIRA to keep 
> track of the changes needed to support Beam on Java 9.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK

2017-07-10 Thread Dmitry Demeshchuk (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16081179#comment-16081179
 ] 

Dmitry Demeshchuk commented on BEAM-2573:
-

[~altay]: I was doing exactly option 2, yes.

My guess is that there are some cached dependency issues; I've stumbled upon 
cached dependencies before. I'll try cleaning the virtualenv, installing 
everything from scratch, and seeing how it goes.

> Better filesystem discovery mechanism in Python SDK
> ---
>
> Key: BEAM-2573
> URL: https://issues.apache.org/jira/browse/BEAM-2573
> Project: Beam
>  Issue Type: Task
>  Components: runner-dataflow, sdk-py
>Affects Versions: 2.0.0
>Reporter: Dmitry Demeshchuk
>Priority: Minor
>
> It looks like right now custom filesystem classes have to be imported 
> explicitly: 
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L30
> Seems like the current implementation doesn't allow discovering filesystems 
> that come from side packages, not from apache_beam itself. Even if I put a 
> custom FileSystem-inheriting class into a package and explicitly import it in 
> the root __init__.py of that package, it still doesn't make the class 
> discoverable.
> The problems I'm experiencing happen on Dataflow runner, while Direct runner 
> works just fine. Here's an example of Dataflow output:
> {code}
>   (320418708fe777d7): Traceback (most recent call last):
>   File 
> "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 
> 581, in do_work
> work_executor.execute()
>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", 
> line 166, in execute
> op.start()
>   File 
> "/usr/local/lib/python2.7/dist-packages/dataflow_worker/native_operations.py",
>  line 54, in start
> self.output(windowed_value)
>   File "dataflow_worker/operations.py", line 138, in 
> dataflow_worker.operations.Operation.output 
> (dataflow_worker/operations.c:5768)
> def output(self, windowed_value, output_index=0):
>   File "dataflow_worker/operations.py", line 139, in 
> dataflow_worker.operations.Operation.output 
> (dataflow_worker/operations.c:5654)
> cython.cast(Receiver, 
> self.receivers[output_index]).receive(windowed_value)
>   File "dataflow_worker/operations.py", line 72, in 
> dataflow_worker.operations.ConsumerSet.receive 
> (dataflow_worker/operations.c:3506)
> cython.cast(Operation, consumer).process(windowed_value)
>   File "dataflow_worker/operations.py", line 328, in 
> dataflow_worker.operations.DoOperation.process 
> (dataflow_worker/operations.c:11162)
> with self.scoped_process_state:
>   File "dataflow_worker/operations.py", line 329, in 
> dataflow_worker.operations.DoOperation.process 
> (dataflow_worker/operations.c:6)
> self.dofn_receiver.receive(o)
>   File "apache_beam/runners/common.py", line 382, in 
> apache_beam.runners.common.DoFnRunner.receive 
> (apache_beam/runners/common.c:10156)
> self.process(windowed_value)
>   File "apache_beam/runners/common.py", line 390, in 
> apache_beam.runners.common.DoFnRunner.process 
> (apache_beam/runners/common.c:10458)
> self._reraise_augmented(exn)
>   File "apache_beam/runners/common.py", line 431, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented 
> (apache_beam/runners/common.c:11673)
> raise new_exn, None, original_traceback
>   File "apache_beam/runners/common.py", line 388, in 
> apache_beam.runners.common.DoFnRunner.process 
> (apache_beam/runners/common.c:10371)
> self.do_fn_invoker.invoke_process(windowed_value)
>   File "apache_beam/runners/common.py", line 281, in 
> apache_beam.runners.common.PerWindowInvoker.invoke_process 
> (apache_beam/runners/common.c:8626)
> self._invoke_per_window(windowed_value)
>   File "apache_beam/runners/common.py", line 307, in 
> apache_beam.runners.common.PerWindowInvoker._invoke_per_window 
> (apache_beam/runners/common.c:9302)
> windowed_value, self.process_method(*args_for_process))
>   File 
> "/Users/dmitrydemeshchuk/miniconda2/lib/python2.7/site-packages/apache_beam/transforms/core.py",
>  line 749, in 
>   File 
> "/Users/dmitrydemeshchuk/miniconda2/lib/python2.7/site-packages/apache_beam/io/iobase.py",
>  line 891, in 
>   File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/options/value_provider.py",
>  line 109, in _f
> return fnc(self, *args, **kwargs)
>   File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py", 
> line 146, in initialize_write
> tmp_dir = self._create_temp_dir(file_path_prefix)
>   File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py", 
> line 151, in _create_temp_dir
> base_path, last_component = FileSystems.split(file_path_prefix)
>   File 
> 
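
For context, a minimal sketch of what such a plugin looks like on the Python 
side. The module and class names below are hypothetical; the sketch assumes 
only that a plugin subclasses FileSystem and declares its scheme, and that its 
module must be imported before FileSystems can route paths to it:

{code:python}
from apache_beam.io.filesystem import FileSystem


class MyS3FileSystem(FileSystem):
  """Hypothetical third-party filesystem handling s3:// paths."""

  @classmethod
  def scheme(cls):
    # FileSystems picks a FileSystem subclass by matching this scheme
    # against the path prefix, but only among classes already imported.
    return 's3'

  # The remaining abstract methods (join, split, match, create, open,
  # copy, rename, delete, ...) are elided in this sketch.
{code}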

[jira] [Comment Edited] (BEAM-2573) Better filesystem discovery mechanism in Python SDK

2017-07-10 Thread Sourabh Bajaj (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16081145#comment-16081145
 ] 

Sourabh Bajaj edited comment on BEAM-2573 at 7/10/17 9:03 PM:
--

I think the worker is specified in 
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dataflow/internal/dependency.py#L81
 so it is related to the version of the sdk being used.

You don't need the release if you're on master: I can run the following 
command and see logs showing the dataflow package failing to import at head 
and the built-in plugins importing successfully.

{code:none}
python -m apache_beam.examples.wordcount --output $BUCKET/wc/output --project 
$PROJECT --staging_location $BUCKET/wc/staging --temp_location $BUCKET/wc/tmp  
--job_name "sourabhbajaj-wc-4" --runner DataflowRunner --sdk_location 
dist/apache-beam-2.1.0.dev0.tar.gz  --beam_plugin dataflow.s3.aws.S3FS
{code}


{code:none}
13:54:41.574
Failed to import beam plugin dataflow.s3.aws.S3FS
13:54:41.573
Successfully imported beam plugin apache_beam.io.filesystem.FileSystem
13:54:41.573
Successfully imported beam plugin apache_beam.io.localfilesystem.LocalFileSystem
13:54:41.573
Successfully imported beam plugin apache_beam.io.gcp.gcsfilesystem.GCSFileSystem
{code}




was (Author: sb2nov):
I think the worker is specified in 
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dataflow/internal/dependency.py#L81
 so it is related to the version of the sdk being used.

I might be wrong about needing the release if you're on master, as I can run 
the following command and see logs showing the dataflow package failing to 
import at head and the built-in plugins importing successfully.

{code:none}
python -m apache_beam.examples.wordcount --output $BUCKET/wc/output --project 
$PROJECT --staging_location $BUCKET/wc/staging --temp_location $BUCKET/wc/tmp  
--job_name "sourabhbajaj-wc-4" --runner DataflowRunner --sdk_location 
dist/apache-beam-2.1.0.dev0.tar.gz  --beam_plugin dataflow.s3.aws.S3FS
{code}


{code:none}
13:54:41.574
Failed to import beam plugin dataflow.s3.aws.S3FS
13:54:41.573
Successfully imported beam plugin apache_beam.io.filesystem.FileSystem
13:54:41.573
Successfully imported beam plugin apache_beam.io.localfilesystem.LocalFileSystem
13:54:41.573
Successfully imported beam plugin apache_beam.io.gcp.gcsfilesystem.GCSFileSystem
{code}



> Better filesystem discovery mechanism in Python SDK
> ---
>
> Key: BEAM-2573
> URL: https://issues.apache.org/jira/browse/BEAM-2573
> Project: Beam
>  Issue Type: Task
>  Components: runner-dataflow, sdk-py
>Affects Versions: 2.0.0
>Reporter: Dmitry Demeshchuk
>Priority: Minor
>
> It looks like right now custom filesystem classes have to be imported 
> explicitly: 
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L30
> It seems the current implementation doesn't allow discovering filesystems 
> that come from third-party packages rather than from apache_beam itself. 
> Even if I put a 
> custom FileSystem-inheriting class into a package and explicitly import it in 
> the root __init__.py of that package, it still doesn't make the class 
> discoverable.
> The problems I'm experiencing happen on Dataflow runner, while Direct runner 
> works just fine. Here's an example of Dataflow output:
> {code}
>   (320418708fe777d7): Traceback (most recent call last):
>   File 
> "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 
> 581, in do_work
> work_executor.execute()
>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", 
> line 166, in execute
> op.start()
>   File 
> "/usr/local/lib/python2.7/dist-packages/dataflow_worker/native_operations.py",
>  line 54, in start
> self.output(windowed_value)
>   File "dataflow_worker/operations.py", line 138, in 
> dataflow_worker.operations.Operation.output 
> (dataflow_worker/operations.c:5768)
> def output(self, windowed_value, output_index=0):
>   File "dataflow_worker/operations.py", line 139, in 
> dataflow_worker.operations.Operation.output 
> (dataflow_worker/operations.c:5654)
> cython.cast(Receiver, 
> self.receivers[output_index]).receive(windowed_value)
>   File "dataflow_worker/operations.py", line 72, in 
> dataflow_worker.operations.ConsumerSet.receive 
> (dataflow_worker/operations.c:3506)
> cython.cast(Operation, consumer).process(windowed_value)
>   File "dataflow_worker/operations.py", line 328, in 
> dataflow_worker.operations.DoOperation.process 
> (dataflow_worker/operations.c:11162)
> with self.scoped_process_state:
>   File "dataflow_worker/operations.py", line 329, in 
> dataflow_worker.operations.DoOperation.process 
> (dataflow_worker/operations.c:6)
> self.dofn_receiver.receive(o)
>   

[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK

2017-07-10 Thread Ahmet Altay (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16081160#comment-16081160
 ] 

Ahmet Altay commented on BEAM-2573:
---

[~demeshchuk], I believe you are mixing two SDK versions. The SDK version 
installed in your virtual environment should exactly match the SDK version you 
use with the --sdk_location flag.

You have two options:
1. Use the released version: pip install the latest version and do not set the 
--sdk_location flag. You will not have the new features that will be available 
in the next version.
2. Use the head version: build an SDK from head, install it in your virtual 
environment, and also use the --sdk_location flag to point to that SDK. You 
will have all the new features available at head.

> Better filesystem discovery mechanism in Python SDK
> ---
>
> Key: BEAM-2573
> URL: https://issues.apache.org/jira/browse/BEAM-2573
> Project: Beam
>  Issue Type: Task
>  Components: runner-dataflow, sdk-py
>Affects Versions: 2.0.0
>Reporter: Dmitry Demeshchuk
>Priority: Minor
>
> It looks like right now custom filesystem classes have to be imported 
> explicitly: 
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L30
> It seems the current implementation doesn't allow discovering filesystems 
> that come from third-party packages rather than from apache_beam itself. 
> Even if I put a 
> custom FileSystem-inheriting class into a package and explicitly import it in 
> the root __init__.py of that package, it still doesn't make the class 
> discoverable.
> The problems I'm experiencing happen on Dataflow runner, while Direct runner 
> works just fine. Here's an example of Dataflow output:
> {code}
>   (320418708fe777d7): Traceback (most recent call last):
>   File 
> "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 
> 581, in do_work
> work_executor.execute()
>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", 
> line 166, in execute
> op.start()
>   File 
> "/usr/local/lib/python2.7/dist-packages/dataflow_worker/native_operations.py",
>  line 54, in start
> self.output(windowed_value)
>   File "dataflow_worker/operations.py", line 138, in 
> dataflow_worker.operations.Operation.output 
> (dataflow_worker/operations.c:5768)
> def output(self, windowed_value, output_index=0):
>   File "dataflow_worker/operations.py", line 139, in 
> dataflow_worker.operations.Operation.output 
> (dataflow_worker/operations.c:5654)
> cython.cast(Receiver, 
> self.receivers[output_index]).receive(windowed_value)
>   File "dataflow_worker/operations.py", line 72, in 
> dataflow_worker.operations.ConsumerSet.receive 
> (dataflow_worker/operations.c:3506)
> cython.cast(Operation, consumer).process(windowed_value)
>   File "dataflow_worker/operations.py", line 328, in 
> dataflow_worker.operations.DoOperation.process 
> (dataflow_worker/operations.c:11162)
> with self.scoped_process_state:
>   File "dataflow_worker/operations.py", line 329, in 
> dataflow_worker.operations.DoOperation.process 
> (dataflow_worker/operations.c:6)
> self.dofn_receiver.receive(o)
>   File "apache_beam/runners/common.py", line 382, in 
> apache_beam.runners.common.DoFnRunner.receive 
> (apache_beam/runners/common.c:10156)
> self.process(windowed_value)
>   File "apache_beam/runners/common.py", line 390, in 
> apache_beam.runners.common.DoFnRunner.process 
> (apache_beam/runners/common.c:10458)
> self._reraise_augmented(exn)
>   File "apache_beam/runners/common.py", line 431, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented 
> (apache_beam/runners/common.c:11673)
> raise new_exn, None, original_traceback
>   File "apache_beam/runners/common.py", line 388, in 
> apache_beam.runners.common.DoFnRunner.process 
> (apache_beam/runners/common.c:10371)
> self.do_fn_invoker.invoke_process(windowed_value)
>   File "apache_beam/runners/common.py", line 281, in 
> apache_beam.runners.common.PerWindowInvoker.invoke_process 
> (apache_beam/runners/common.c:8626)
> self._invoke_per_window(windowed_value)
>   File "apache_beam/runners/common.py", line 307, in 
> apache_beam.runners.common.PerWindowInvoker._invoke_per_window 
> (apache_beam/runners/common.c:9302)
> windowed_value, self.process_method(*args_for_process))
>   File 
> "/Users/dmitrydemeshchuk/miniconda2/lib/python2.7/site-packages/apache_beam/transforms/core.py",
>  line 749, in 
>   File 
> "/Users/dmitrydemeshchuk/miniconda2/lib/python2.7/site-packages/apache_beam/io/iobase.py",
>  line 891, in 
>   File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/options/value_provider.py",
>  line 109, in _f
> return fnc(self, *args, **kwargs)
>   File 
> 

[jira] [Comment Edited] (BEAM-2573) Better filesystem discovery mechanism in Python SDK

2017-07-10 Thread Sourabh Bajaj (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16081145#comment-16081145
 ] 

Sourabh Bajaj edited comment on BEAM-2573 at 7/10/17 9:03 PM:
--

I think the worker is specified in 
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dataflow/internal/dependency.py#L81
 so it is related to the version of the sdk being used.

I might be wrong about needing the release if you're on master, as I can run 
the following command and see logs showing the dataflow package failing to 
import at head and the built-in plugins importing successfully.

{code:none}
python -m apache_beam.examples.wordcount --output $BUCKET/wc/output --project 
$PROJECT --staging_location $BUCKET/wc/staging --temp_location $BUCKET/wc/tmp  
--job_name "sourabhbajaj-wc-4" --runner DataflowRunner --sdk_location 
dist/apache-beam-2.1.0.dev0.tar.gz  --beam_plugin dataflow.s3.aws.S3FS
{code}


{code:none}
13:54:41.574
Failed to import beam plugin dataflow.s3.aws.S3FS
13:54:41.573
Successfully imported beam plugin apache_beam.io.filesystem.FileSystem
13:54:41.573
Successfully imported beam plugin apache_beam.io.localfilesystem.LocalFileSystem
13:54:41.573
Successfully imported beam plugin apache_beam.io.gcp.gcsfilesystem.GCSFileSystem
{code}




was (Author: sb2nov):
I think the worker is specified in 
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dataflow/internal/dependency.py#L81
 so it is related to the version of the sdk being used.

I might be wrong about needing the release if you're on master, as I can run 
the following command and see logs showing the dataflow package failing to 
import at head and the built-in plugins importing successfully.

{code:shell}
python -m apache_beam.examples.wordcount --output $BUCKET/wc/output --project 
$PROJECT --staging_location $BUCKET/wc/staging --temp_location $BUCKET/wc/tmp  
--job_name "sourabhbajaj-wc-4" --runner DataflowRunner --sdk_location 
dist/apache-beam-2.1.0.dev0.tar.gz  --beam_plugin dataflow.s3.aws.S3FS
{code}


> Better filesystem discovery mechanism in Python SDK
> ---
>
> Key: BEAM-2573
> URL: https://issues.apache.org/jira/browse/BEAM-2573
> Project: Beam
>  Issue Type: Task
>  Components: runner-dataflow, sdk-py
>Affects Versions: 2.0.0
>Reporter: Dmitry Demeshchuk
>Priority: Minor
>
> It looks like right now custom filesystem classes have to be imported 
> explicitly: 
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L30
> It seems the current implementation doesn't allow discovering filesystems 
> that come from third-party packages rather than from apache_beam itself. 
> Even if I put a 
> custom FileSystem-inheriting class into a package and explicitly import it in 
> the root __init__.py of that package, it still doesn't make the class 
> discoverable.
> The problems I'm experiencing happen on Dataflow runner, while Direct runner 
> works just fine. Here's an example of Dataflow output:
> {code}
>   (320418708fe777d7): Traceback (most recent call last):
>   File 
> "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 
> 581, in do_work
> work_executor.execute()
>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", 
> line 166, in execute
> op.start()
>   File 
> "/usr/local/lib/python2.7/dist-packages/dataflow_worker/native_operations.py",
>  line 54, in start
> self.output(windowed_value)
>   File "dataflow_worker/operations.py", line 138, in 
> dataflow_worker.operations.Operation.output 
> (dataflow_worker/operations.c:5768)
> def output(self, windowed_value, output_index=0):
>   File "dataflow_worker/operations.py", line 139, in 
> dataflow_worker.operations.Operation.output 
> (dataflow_worker/operations.c:5654)
> cython.cast(Receiver, 
> self.receivers[output_index]).receive(windowed_value)
>   File "dataflow_worker/operations.py", line 72, in 
> dataflow_worker.operations.ConsumerSet.receive 
> (dataflow_worker/operations.c:3506)
> cython.cast(Operation, consumer).process(windowed_value)
>   File "dataflow_worker/operations.py", line 328, in 
> dataflow_worker.operations.DoOperation.process 
> (dataflow_worker/operations.c:11162)
> with self.scoped_process_state:
>   File "dataflow_worker/operations.py", line 329, in 
> dataflow_worker.operations.DoOperation.process 
> (dataflow_worker/operations.c:6)
> self.dofn_receiver.receive(o)
>   File "apache_beam/runners/common.py", line 382, in 
> apache_beam.runners.common.DoFnRunner.receive 
> (apache_beam/runners/common.c:10156)
> self.process(windowed_value)
>   File "apache_beam/runners/common.py", line 390, in 
> apache_beam.runners.common.DoFnRunner.process 
> (apache_beam/runners/common.c:10458)
> 

[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK

2017-07-10 Thread Sourabh Bajaj (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16081145#comment-16081145
 ] 

Sourabh Bajaj commented on BEAM-2573:
-

I think the worker is specified in 
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dataflow/internal/dependency.py#L81
 so it is related to the version of the sdk being used.

I might be wrong about needing the release if you're on master, as I can run 
the following command and see logs showing the dataflow package failing to 
import at head and the built-in plugins importing successfully.

{code:shell}
python -m apache_beam.examples.wordcount --output $BUCKET/wc/output --project 
$PROJECT --staging_location $BUCKET/wc/staging --temp_location $BUCKET/wc/tmp  
--job_name "sourabhbajaj-wc-4" --runner DataflowRunner --sdk_location 
dist/apache-beam-2.1.0.dev0.tar.gz  --beam_plugin dataflow.s3.aws.S3FS
{code}


> Better filesystem discovery mechanism in Python SDK
> ---
>
> Key: BEAM-2573
> URL: https://issues.apache.org/jira/browse/BEAM-2573
> Project: Beam
>  Issue Type: Task
>  Components: runner-dataflow, sdk-py
>Affects Versions: 2.0.0
>Reporter: Dmitry Demeshchuk
>Priority: Minor
>
> It looks like right now custom filesystem classes have to be imported 
> explicitly: 
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L30
> It seems the current implementation doesn't allow discovering filesystems 
> that come from third-party packages rather than from apache_beam itself. 
> Even if I put a 
> custom FileSystem-inheriting class into a package and explicitly import it in 
> the root __init__.py of that package, it still doesn't make the class 
> discoverable.
> The problems I'm experiencing happen on Dataflow runner, while Direct runner 
> works just fine. Here's an example of Dataflow output:
> {code}
>   (320418708fe777d7): Traceback (most recent call last):
>   File 
> "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 
> 581, in do_work
> work_executor.execute()
>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", 
> line 166, in execute
> op.start()
>   File 
> "/usr/local/lib/python2.7/dist-packages/dataflow_worker/native_operations.py",
>  line 54, in start
> self.output(windowed_value)
>   File "dataflow_worker/operations.py", line 138, in 
> dataflow_worker.operations.Operation.output 
> (dataflow_worker/operations.c:5768)
> def output(self, windowed_value, output_index=0):
>   File "dataflow_worker/operations.py", line 139, in 
> dataflow_worker.operations.Operation.output 
> (dataflow_worker/operations.c:5654)
> cython.cast(Receiver, 
> self.receivers[output_index]).receive(windowed_value)
>   File "dataflow_worker/operations.py", line 72, in 
> dataflow_worker.operations.ConsumerSet.receive 
> (dataflow_worker/operations.c:3506)
> cython.cast(Operation, consumer).process(windowed_value)
>   File "dataflow_worker/operations.py", line 328, in 
> dataflow_worker.operations.DoOperation.process 
> (dataflow_worker/operations.c:11162)
> with self.scoped_process_state:
>   File "dataflow_worker/operations.py", line 329, in 
> dataflow_worker.operations.DoOperation.process 
> (dataflow_worker/operations.c:6)
> self.dofn_receiver.receive(o)
>   File "apache_beam/runners/common.py", line 382, in 
> apache_beam.runners.common.DoFnRunner.receive 
> (apache_beam/runners/common.c:10156)
> self.process(windowed_value)
>   File "apache_beam/runners/common.py", line 390, in 
> apache_beam.runners.common.DoFnRunner.process 
> (apache_beam/runners/common.c:10458)
> self._reraise_augmented(exn)
>   File "apache_beam/runners/common.py", line 431, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented 
> (apache_beam/runners/common.c:11673)
> raise new_exn, None, original_traceback
>   File "apache_beam/runners/common.py", line 388, in 
> apache_beam.runners.common.DoFnRunner.process 
> (apache_beam/runners/common.c:10371)
> self.do_fn_invoker.invoke_process(windowed_value)
>   File "apache_beam/runners/common.py", line 281, in 
> apache_beam.runners.common.PerWindowInvoker.invoke_process 
> (apache_beam/runners/common.c:8626)
> self._invoke_per_window(windowed_value)
>   File "apache_beam/runners/common.py", line 307, in 
> apache_beam.runners.common.PerWindowInvoker._invoke_per_window 
> (apache_beam/runners/common.c:9302)
> windowed_value, self.process_method(*args_for_process))
>   File 
> "/Users/dmitrydemeshchuk/miniconda2/lib/python2.7/site-packages/apache_beam/transforms/core.py",
>  line 749, in 
>   File 
> "/Users/dmitrydemeshchuk/miniconda2/lib/python2.7/site-packages/apache_beam/io/iobase.py",
>  line 891, in 
>   File 
> 

[jira] [Created] (BEAM-2579) Improve component determinism methods in StandardCoder

2017-07-10 Thread Thomas Groh (JIRA)
Thomas Groh created BEAM-2579:
-

 Summary: Improve component determinism methods in StandardCoder
 Key: BEAM-2579
 URL: https://issues.apache.org/jira/browse/BEAM-2579
 Project: Beam
  Issue Type: Bug
  Components: sdk-java-core
Reporter: Thomas Groh
Priority: Trivial


Effectively, add the methods that are introduced in 
https://github.com/apache/beam/pull/3049, while retaining (but deprecating) the 
static verifyDeterministic methods.





[GitHub] beam pull request #3402: Add a SlidingWindows Test for Combine without Conte...

2017-07-10 Thread tgroh
Github user tgroh closed the pull request at:

https://github.com/apache/beam/pull/3402




[GitHub] beam pull request #3049: Update signature for Deterministic Helper

2017-07-10 Thread tgroh
Github user tgroh closed the pull request at:

https://github.com/apache/beam/pull/3049




[jira] [Commented] (BEAM-2571) Flink ValidatesRunner failing CombineTest.testSlidingWindowsCombineWithContext

2017-07-10 Thread Kenneth Knowles (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16081106#comment-16081106
 ] 

Kenneth Knowles commented on BEAM-2571:
---

Since this is longstanding, and other tests of side inputs seem OK, I don't 
feel the same urgency to block 2.1.0 on it. But I am a little surprised we got 
this far. Perhaps it is just because that one test is tighter than others?

> Flink ValidatesRunner failing CombineTest.testSlidingWindowsCombineWithContext
> --
>
> Key: BEAM-2571
> URL: https://issues.apache.org/jira/browse/BEAM-2571
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Aljoscha Krettek
> Fix For: 2.1.0
>
>
> This appears to have been caused by https://github.com/apache/beam/pull/3429, 
> which fixes a couple of errors in how trigger timers were processed and how 
> final panes were labeled.
> I am investigating, considering a rollback vs. a forward fix. Since it is an 
> esoteric use case where I would advise users to use a stateful DoFn instead, 
> I think the bug fixed probably outweighs the bug introduced. I would like to 
> fix this for 2.1.0 but will report back soon.





[jira] [Comment Edited] (BEAM-2571) Flink ValidatesRunner failing CombineTest.testSlidingWindowsCombineWithContext

2017-07-10 Thread Kenneth Knowles (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16081106#comment-16081106
 ] 

Kenneth Knowles edited comment on BEAM-2571 at 7/10/17 8:40 PM:


Since this is longstanding, and other tests of side inputs seem OK, I don't 
feel the same urgency to block 2.1.0 because of this failure. We might 
consider blocking any release based on the danger here...

But I am a little surprised we got this far. Perhaps it is just because that 
one test is tighter than others?


was (Author: kenn):
Since this is longstanding, and other tests of side inputs seem OK, I don't 
feel the same urgency to block 2.1.0 on it. But I am a little surprised we got 
this far. Perhaps it is just because that one test is tighter than others?

> Flink ValidatesRunner failing CombineTest.testSlidingWindowsCombineWithContext
> --
>
> Key: BEAM-2571
> URL: https://issues.apache.org/jira/browse/BEAM-2571
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Aljoscha Krettek
> Fix For: 2.1.0
>
>
> This appears to have been caused by https://github.com/apache/beam/pull/3429, 
> which fixes a couple of errors in how trigger timers were processed and how 
> final panes were labeled.
> I am investigating, considering a rollback vs. a forward fix. Since it is an 
> esoteric use case where I would advise users to use a stateful DoFn instead, 
> I think the bug fixed probably outweighs the bug introduced. I would like to 
> fix this for 2.1.0 but will report back soon.





[GitHub] beam pull request #3124: Serialize all Primitive Transforms in the DirectRun...

2017-07-10 Thread tgroh
Github user tgroh closed the pull request at:

https://github.com/apache/beam/pull/3124




[jira] [Commented] (BEAM-2530) Make Beam compatible with Java 9

2017-07-10 Thread Kenneth Knowles (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16081108#comment-16081108
 ] 

Kenneth Knowles commented on BEAM-2530:
---

This is still on the 2.1.0 burndown. Since it is a meta-issue, is there 
anything actually blocking 2.1.0 that can be cherry-picked?

> Make Beam compatible with Java 9
> 
>
> Key: BEAM-2530
> URL: https://issues.apache.org/jira/browse/BEAM-2530
> Project: Beam
>  Issue Type: Improvement
>  Components: build-system
>Affects Versions: Not applicable
>Reporter: Ismaël Mejía
>Assignee: Ismaël Mejía
>Priority: Minor
> Fix For: 2.1.0
>
>
> Java 9 finally seems set to be released this year; this is a JIRA to keep 
> track of the changes needed to support Beam on Java 9.





[jira] [Resolved] (BEAM-2505) When EOW != GC and the timers fire together, the output is not marked as the final pane

2017-07-10 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/BEAM-2505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jean-Baptiste Onofré resolved BEAM-2505.

Resolution: Fixed

> When EOW != GC and the timers fire together, the output is not marked as 
> the final pane
> --
>
> Key: BEAM-2505
> URL: https://issues.apache.org/jira/browse/BEAM-2505
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Kenneth Knowles
>Assignee: Kenneth Knowles
> Fix For: 2.1.0
>
>






[jira] [Commented] (BEAM-2505) When EOW != GC and the timers fire together, the output is not marked as the final pane

2017-07-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16081002#comment-16081002
 ] 

ASF GitHub Bot commented on BEAM-2505:
--

Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/3511


> When EOW != GC and the timers fire together, the output is not marked as 
> the final pane
> --
>
> Key: BEAM-2505
> URL: https://issues.apache.org/jira/browse/BEAM-2505
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Kenneth Knowles
>Assignee: Kenneth Knowles
> Fix For: 2.1.0
>
>






[jira] [Resolved] (BEAM-2502) Processing time timers for expired windows are not ignored

2017-07-10 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/BEAM-2502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jean-Baptiste Onofré resolved BEAM-2502.

Resolution: Fixed

> Processing time timers for expired windows are not ignored
> --
>
> Key: BEAM-2502
> URL: https://issues.apache.org/jira/browse/BEAM-2502
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Kenneth Knowles
>Assignee: Kenneth Knowles
>Priority: Minor
> Fix For: 2.1.0
>
>
> If the ReduceFnRunner receives a processing time timer for an expired window, 
> it may produce output even though the window is expired (and may have already 
> sent a final output!)
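
The guard that the fix introduces (see the ReduceFnRunner commit later in this 
digest) reduces to a single comparison; a minimal sketch of the condition in 
Python, with names simplified from the Java:

{code:python}
def window_is_expired(input_watermark, window_max_timestamp, allowed_lateness):
    # A window is expired once the input watermark passes its max timestamp
    # plus the allowed lateness; after that only garbage collection should
    # touch it, so late processing-time timers are simply ignored.
    return input_watermark > window_max_timestamp + allowed_lateness
{code}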





[jira] [Resolved] (BEAM-2551) KafkaIO reader blocks indefinitely if servers are not reachable

2017-07-10 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/BEAM-2551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jean-Baptiste Onofré resolved BEAM-2551.

Resolution: Fixed

> KafkaIO reader blocks indefinitely if servers are not reachable
> ---
>
> Key: BEAM-2551
> URL: https://issues.apache.org/jira/browse/BEAM-2551
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions
>Affects Versions: 2.0.0
>Reporter: Raghu Angadi
>Assignee: Raghu Angadi
> Fix For: 2.1.0
>
>
> If the KafkaIO source reader on the worker can't reach the server, the Kafka 
> consumer blocks forever inside {{UnboundedReader.start()}}. Users have no 
> indication of the error; it is better if start() fails with an error. 
> It is easy to reproduce in Kafka. I reported it on the Kafka users list here: 
> https://lists.apache.org/thread.html/98cebefacbd65b0d6c6817fe0b5197e26bc60252e72d05fced91e628@%3Cusers.kafka.apache.org%3E
> It blocks inside the Kafka client. Fortunately, it can be unblocked with 
> KafkaConsumer.wakeup(). We could run initialization in another thread and 
> cancel it if it takes longer than a minute.
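
A sketch of that approach in Python, for illustration only; the actual fix is 
the Java change in the KafkaIO commit later in this digest, and init_fn and 
unblock_fn here are placeholder hooks:

{code:python}
import concurrent.futures


def start_with_timeout(init_fn, unblock_fn, timeout_secs=60):
    # Run the potentially-blocking initialization on a helper thread so the
    # caller can bound how long it waits for the first server interaction.
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = pool.submit(init_fn)
    try:
        return future.result(timeout=timeout_secs)
    except concurrent.futures.TimeoutError:
        # Unblock the stuck client call, e.g. KafkaConsumer.wakeup() in Java.
        unblock_fn()
        raise IOError('Reader initialization timed out after %ds' % timeout_secs)
    finally:
        pool.shutdown(wait=False)
{code}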





[GitHub] beam pull request #3511: [BEAM-2505, BEAM-2502] Fixes to ReduceFnRunner.onTi...

2017-07-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/3511




[3/3] beam git commit: This closes #3511

2017-07-10 Thread jbonofre
This closes #3511


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5c4ea884
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5c4ea884
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5c4ea884

Branch: refs/heads/release-2.1.0
Commit: 5c4ea884dcbb87cacba5ac67de3e4162ed2ffeea
Parents: 2351c7e b1e53d6
Author: Jean-Baptiste Onofré 
Authored: Mon Jul 10 22:04:55 2017 +0200
Committer: Jean-Baptiste Onofré 
Committed: Mon Jul 10 22:04:55 2017 +0200

--
 .../examples/complete/game/LeaderBoardTest.java |   2 +
 .../beam/runners/core/ReduceFnRunner.java   | 106 +--
 .../beam/runners/core/ReduceFnRunnerTest.java   |  81 +-
 3 files changed, 156 insertions(+), 33 deletions(-)
--




[1/3] beam git commit: Process timer firings for a window together

2017-07-10 Thread jbonofre
Repository: beam
Updated Branches:
  refs/heads/release-2.1.0 2351c7e33 -> 5c4ea884d


Process timer firings for a window together


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b1e53d6d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b1e53d6d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b1e53d6d

Branch: refs/heads/release-2.1.0
Commit: b1e53d6d87a14e3116f996d44b14540b6b7bfa59
Parents: 3d7b009
Author: Kenneth Knowles 
Authored: Thu Jun 22 18:43:39 2017 -0700
Committer: Kenneth Knowles 
Committed: Thu Jul 6 14:38:58 2017 -0700

--
 .../examples/complete/game/LeaderBoardTest.java |  2 +
 .../beam/runners/core/ReduceFnRunner.java   | 98 +---
 .../beam/runners/core/ReduceFnRunnerTest.java   | 49 +-
 3 files changed, 115 insertions(+), 34 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/b1e53d6d/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
--
diff --git 
a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
 
b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
index 745c210..611e2b3 100644
--- 
a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
+++ 
b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
@@ -276,6 +276,8 @@ public class LeaderBoardTest implements Serializable {
 .addElements(event(TestUser.RED_ONE, 4, Duration.standardMinutes(2)),
 event(TestUser.BLUE_TWO, 3, Duration.ZERO),
 event(TestUser.BLUE_ONE, 3, Duration.standardMinutes(3)))
+// Move the watermark to the end of the window to output on time
+.advanceWatermarkTo(baseTime.plus(TEAM_WINDOW_DURATION))
 // Move the watermark past the end of the allowed lateness plus the 
end of the window
 .advanceWatermarkTo(baseTime.plus(ALLOWED_LATENESS)
 .plus(TEAM_WINDOW_DURATION).plus(Duration.standardMinutes(1)))

http://git-wip-us.apache.org/repos/asf/beam/blob/b1e53d6d/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
--
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index 0632c05..634a2d1 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -29,7 +29,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -638,11 +637,9 @@ public class ReduceFnRunner {
   }
 
   /**
-   * Enriches TimerData with state necessary for processing a timer as well as
-   * common queries about a timer.
+   * A descriptor of the activation for a window based on a timer.
*/
-  private class EnrichedTimerData {
-public final Instant timestamp;
+  private class WindowActivation {
 public final ReduceFn.Context directContext;
 public final ReduceFn.Context renamedContext;
 // If this is an end-of-window timer then we may need to set a garbage 
collection timer
@@ -653,19 +650,34 @@ public class ReduceFnRunner {
 // end-of-window time to be a signal to garbage collect.
 public final boolean isGarbageCollection;
 
-EnrichedTimerData(
-TimerData timer,
+WindowActivation(
 ReduceFn.Context directContext,
 ReduceFn.Context renamedContext) {
-  this.timestamp = timer.getTimestamp();
   this.directContext = directContext;
   this.renamedContext = renamedContext;
   W window = directContext.window();
-  this.isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
-  && timer.getTimestamp().equals(window.maxTimestamp());
-  Instant cleanupTime = LateDataUtils.garbageCollectionTime(window, 
windowingStrategy);
+
+  // The output watermark is before the end of the window if it is either 
unknown
+  // or it is known to be before it. If it is unknown, that means that 
there hasn't been
+  // enough data to advance it.
+  boolean outputWatermarkBeforeEOW =
+  timerInternals.currentOutputWatermarkTime() == null
+  || 

[2/3] beam git commit: Ignore processing time timers in expired windows

2017-07-10 Thread jbonofre
Ignore processing time timers in expired windows


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3d7b0098
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3d7b0098
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3d7b0098

Branch: refs/heads/release-2.1.0
Commit: 3d7b00983d2ef215639c3fefc3d8df487aac7b2e
Parents: 53b372b
Author: Kenneth Knowles 
Authored: Thu Jun 22 18:09:11 2017 -0700
Committer: Kenneth Knowles 
Committed: Thu Jul 6 14:38:58 2017 -0700

--
 .../beam/runners/core/ReduceFnRunner.java   | 10 ++
 .../beam/runners/core/ReduceFnRunnerTest.java   | 32 
 2 files changed, 42 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/3d7b0098/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
--
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index ef33bef..0632c05 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -693,6 +693,11 @@ public class ReduceFnRunner {
   @SuppressWarnings("unchecked")
 WindowNamespace windowNamespace = (WindowNamespace) 
timer.getNamespace();
   W window = windowNamespace.getWindow();
+
+  if (TimeDomain.PROCESSING_TIME == timer.getDomain() && 
windowIsExpired(window)) {
+continue;
+  }
+
   ReduceFn.Context directContext =
   contextFactory.base(window, StateStyle.DIRECT);
   ReduceFn.Context renamedContext =
@@ -1090,4 +1095,9 @@ public class ReduceFnRunner {
 }
   }
 
+  private boolean windowIsExpired(BoundedWindow w) {
+return timerInternals
+.currentInputWatermarkTime()
+
.isAfter(w.maxTimestamp().plus(windowingStrategy.getAllowedLateness()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/3d7b0098/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
--
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 3a2c220..79ee91b 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -286,6 +286,38 @@ public class ReduceFnRunnerTest {
 
   /**
* Tests that when a processing time timer comes in after a window is expired
+   * it is just ignored.
+   */
+  @Test
+  public void testLateProcessingTimeTimer() throws Exception {
+WindowingStrategy strategy =
+WindowingStrategy.of((WindowFn) 
FixedWindows.of(Duration.millis(100)))
+.withTimestampCombiner(TimestampCombiner.EARLIEST)
+.withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+.withAllowedLateness(Duration.ZERO)
+.withTrigger(
+Repeatedly.forever(
+
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10;
+
+ReduceFnTester tester =
+ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
+
+tester.advanceProcessingTime(new Instant(5000));
+injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100
+injectElement(tester, 5);
+
+// After this advancement, the window is expired and only the GC process
+// should be allowed to touch it
+tester.advanceInputWatermarkNoTimers(new Instant(100));
+
+// This should not output
+tester.advanceProcessingTime(new Instant(6000));
+
+assertThat(tester.extractOutput(), emptyIterable());
+  }
+
+  /**
+   * Tests that when a processing time timer comes in after a window is expired
* but in the same bundle it does not cause a spurious output.
*/
   @Test



[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK

2017-07-10 Thread Dmitry Demeshchuk (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16080997#comment-16080997
 ] 

Dmitry Demeshchuk commented on BEAM-2573:
-

[~sb2nov] Thanks for the explanation! I'll wait till the 2.1.0 release, then 
(or maybe release candidates would be good enough too).

By the way, am I right to assume that full Dataflow compatibility is only 
guaranteed for tagged Beam versions, then?

> Better filesystem discovery mechanism in Python SDK
> ---
>
> Key: BEAM-2573
> URL: https://issues.apache.org/jira/browse/BEAM-2573
> Project: Beam
>  Issue Type: Task
>  Components: runner-dataflow, sdk-py
>Affects Versions: 2.0.0
>Reporter: Dmitry Demeshchuk
>Priority: Minor
>
> It looks like right now custom filesystem classes have to be imported 
> explicitly: 
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L30
> Seems like the current implementation doesn't allow discovering filesystems 
> that come from side packages, not from apache_beam itself. Even if I put a 
> custom FileSystem-inheriting class into a package and explicitly import it in 
> the root __init__.py of that package, it still doesn't make the class 
> discoverable.
> The problems I'm experiencing happen on Dataflow runner, while Direct runner 
> works just fine. Here's an example of Dataflow output:
> {code}
>   (320418708fe777d7): Traceback (most recent call last):
>   File 
> "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 
> 581, in do_work
> work_executor.execute()
>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", 
> line 166, in execute
> op.start()
>   File 
> "/usr/local/lib/python2.7/dist-packages/dataflow_worker/native_operations.py",
>  line 54, in start
> self.output(windowed_value)
>   File "dataflow_worker/operations.py", line 138, in 
> dataflow_worker.operations.Operation.output 
> (dataflow_worker/operations.c:5768)
> def output(self, windowed_value, output_index=0):
>   File "dataflow_worker/operations.py", line 139, in 
> dataflow_worker.operations.Operation.output 
> (dataflow_worker/operations.c:5654)
> cython.cast(Receiver, 
> self.receivers[output_index]).receive(windowed_value)
>   File "dataflow_worker/operations.py", line 72, in 
> dataflow_worker.operations.ConsumerSet.receive 
> (dataflow_worker/operations.c:3506)
> cython.cast(Operation, consumer).process(windowed_value)
>   File "dataflow_worker/operations.py", line 328, in 
> dataflow_worker.operations.DoOperation.process 
> (dataflow_worker/operations.c:11162)
> with self.scoped_process_state:
>   File "dataflow_worker/operations.py", line 329, in 
> dataflow_worker.operations.DoOperation.process 
> (dataflow_worker/operations.c:6)
> self.dofn_receiver.receive(o)
>   File "apache_beam/runners/common.py", line 382, in 
> apache_beam.runners.common.DoFnRunner.receive 
> (apache_beam/runners/common.c:10156)
> self.process(windowed_value)
>   File "apache_beam/runners/common.py", line 390, in 
> apache_beam.runners.common.DoFnRunner.process 
> (apache_beam/runners/common.c:10458)
> self._reraise_augmented(exn)
>   File "apache_beam/runners/common.py", line 431, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented 
> (apache_beam/runners/common.c:11673)
> raise new_exn, None, original_traceback
>   File "apache_beam/runners/common.py", line 388, in 
> apache_beam.runners.common.DoFnRunner.process 
> (apache_beam/runners/common.c:10371)
> self.do_fn_invoker.invoke_process(windowed_value)
>   File "apache_beam/runners/common.py", line 281, in 
> apache_beam.runners.common.PerWindowInvoker.invoke_process 
> (apache_beam/runners/common.c:8626)
> self._invoke_per_window(windowed_value)
>   File "apache_beam/runners/common.py", line 307, in 
> apache_beam.runners.common.PerWindowInvoker._invoke_per_window 
> (apache_beam/runners/common.c:9302)
> windowed_value, self.process_method(*args_for_process))
>   File 
> "/Users/dmitrydemeshchuk/miniconda2/lib/python2.7/site-packages/apache_beam/transforms/core.py",
>  line 749, in 
>   File 
> "/Users/dmitrydemeshchuk/miniconda2/lib/python2.7/site-packages/apache_beam/io/iobase.py",
>  line 891, in 
>   File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/options/value_provider.py",
>  line 109, in _f
> return fnc(self, *args, **kwargs)
>   File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py", 
> line 146, in initialize_write
> tmp_dir = self._create_temp_dir(file_path_prefix)
>   File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py", 
> line 151, in _create_temp_dir
> base_path, last_component = FileSystems.split(file_path_prefix)
>   File 
> 

[GitHub] beam pull request #3513: [BEAM-2551] Cherrypick #3492 to 2.1.0

2017-07-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/3513




[jira] [Commented] (BEAM-2551) KafkaIO reader blocks indefinitely if servers are not reachable

2017-07-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16080995#comment-16080995
 ] 

ASF GitHub Bot commented on BEAM-2551:
--

Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/3513


> KafkaIO reader blocks indefinitely if servers are not reachable
> ---
>
> Key: BEAM-2551
> URL: https://issues.apache.org/jira/browse/BEAM-2551
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions
>Affects Versions: 2.0.0
>Reporter: Raghu Angadi
>Assignee: Raghu Angadi
> Fix For: 2.1.0
>
>
> If the KafkaIO source reader on the worker can't reach the server, the Kafka 
> consumer blocks forever inside {{UnboundedReader.start()}}. Users have no 
> indication of the error; it is better if start() fails with an error. 
> It is easy to reproduce in Kafka. I reported it on the Kafka users list here: 
> https://lists.apache.org/thread.html/98cebefacbd65b0d6c6817fe0b5197e26bc60252e72d05fced91e628@%3Cusers.kafka.apache.org%3E
> It blocks inside the Kafka client. Fortunately, it can be unblocked with 
> KafkaConsumer.wakeup(). We could run initialization in another thread and 
> cancel it if it takes longer than a minute.





[2/2] beam git commit: This closes #3513

2017-07-10 Thread jbonofre
This closes #3513


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2351c7e3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2351c7e3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2351c7e3

Branch: refs/heads/release-2.1.0
Commit: 2351c7e33d2949392d3db3b1bf5a403b504824fa
Parents: 7f2419f 38fc2b2
Author: Jean-Baptiste Onofré 
Authored: Mon Jul 10 22:00:31 2017 +0200
Committer: Jean-Baptiste Onofré 
Committed: Mon Jul 10 22:00:31 2017 +0200

--
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 81 +++-
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 30 
 2 files changed, 92 insertions(+), 19 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/2351c7e3/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
--



[1/2] beam git commit: Add timeout to initialization of partition in KafkaIO

2017-07-10 Thread jbonofre
Repository: beam
Updated Branches:
  refs/heads/release-2.1.0 7f2419f09 -> 2351c7e33


Add timeout to initialization of partition in KafkaIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/38fc2b2e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/38fc2b2e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/38fc2b2e

Branch: refs/heads/release-2.1.0
Commit: 38fc2b2e76fd13c3edd304ffcb5378fb36ab48c0
Parents: 53b372b
Author: Raghu Angadi 
Authored: Mon Jul 3 23:54:10 2017 -0700
Committer: Raghu Angadi 
Committed: Thu Jul 6 22:02:40 2017 -0700

--
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 81 +++-
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 30 
 2 files changed, 92 insertions(+), 19 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/38fc2b2e/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
--
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 702bdd3..28262c9 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -49,9 +49,11 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -1049,8 +1051,32 @@ public class KafkaIO {
   curBatch = Iterators.cycle(nonEmpty);
 }
 
+private void setupInitialOffset(PartitionState pState) {
+  Read spec = source.spec;
+
+  if (pState.nextOffset != UNINITIALIZED_OFFSET) {
+consumer.seek(pState.topicPartition, pState.nextOffset);
+  } else {
+// nextOffset is uninitialized here, meaning start reading from 
latest record as of now
+// ('latest' is the default, and is configurable) or 'look up offset 
by startReadTime.
+// Remember the current position without waiting until the first 
record is read. This
+// ensures checkpoint is accurate even if the reader is closed before 
reading any records.
+Instant startReadTime = spec.getStartReadTime();
+if (startReadTime != null) {
+  pState.nextOffset =
+  consumerSpEL.offsetForTime(consumer, pState.topicPartition, 
spec.getStartReadTime());
+  consumer.seek(pState.topicPartition, pState.nextOffset);
+} else {
+  pState.nextOffset = consumer.position(pState.topicPartition);
+}
+  }
+}
+
 @Override
 public boolean start() throws IOException {
+  final int defaultPartitionInitTimeout = 60 * 1000;
+  final int kafkaRequestTimeoutMultiple = 2;
+
   Read spec = source.spec;
   consumer = spec.getConsumerFactoryFn().apply(spec.getConsumerConfig());
   consumerSpEL.evaluateAssign(consumer, spec.getTopicPartitions());
@@ -1065,25 +1091,38 @@ public class KafkaIO {
   keyDeserializerInstance.configure(spec.getConsumerConfig(), true);
   valueDeserializerInstance.configure(spec.getConsumerConfig(), false);
 
-  for (PartitionState p : partitionStates) {
-if (p.nextOffset != UNINITIALIZED_OFFSET) {
-  consumer.seek(p.topicPartition, p.nextOffset);
-} else {
-  // nextOffset is uninitialized here, meaning start reading from 
latest record as of now
-  // ('latest' is the default, and is configurable) or 'look up offset 
by startReadTime.
-  // Remember the current position without waiting until the first 
record is read. This
-  // ensures checkpoint is accurate even if the reader is closed 
before reading any records.
-  Instant startReadTime = spec.getStartReadTime();
-  if (startReadTime != null) {
-p.nextOffset =
-consumerSpEL.offsetForTime(consumer, p.topicPartition, 
spec.getStartReadTime());
-consumer.seek(p.topicPartition, p.nextOffset);
-  } else {
-p.nextOffset = consumer.position(p.topicPartition);
+  // Seek to start offset for each partition. This is the first 
interaction with the server.
+  // Unfortunately it can block forever in case of network issues like 
incorrect ACLs.
+  // Initialize partition in a separate thread and 

[jira] [Resolved] (BEAM-2534) KafkaIO should allow gaps in message offsets

2017-07-10 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/BEAM-2534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jean-Baptiste Onofré resolved BEAM-2534.

Resolution: Fixed

> KafkaIO should allow gaps in message offsets
> 
>
> Key: BEAM-2534
> URL: https://issues.apache.org/jira/browse/BEAM-2534
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions
>Affects Versions: 2.0.0
>Reporter: Raghu Angadi
>Assignee: Raghu Angadi
>Priority: Minor
> Fix For: 2.1.0
>
>
> KafkaIO reader logs a warning when it notices gaps in offsets for messages. 
> While such gaps are not expected for normal Kafka topics, there could be gaps 
> when log compaction is enabled (which deletes older messages for a key). 
> This warning log is not very useful. Also, we should take such gaps into 
> account while estimating backlog.
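
A Python transcription of the adjusted estimate, for illustration; it mirrors 
the MovingAvg-based change in the commit below. Under compaction consecutive 
records can skip offsets, so dividing by the average observed gap keeps the 
backlog in messages rather than raw offsets:

{code:python}
import math

BACKLOG_UNKNOWN = -1


def backlog_message_count(latest_offset, next_offset, avg_offset_gap):
    # avg_offset_gap is ~0 for normal topics and grows under log compaction.
    if latest_offset < 0 or next_offset < 0:
        return BACKLOG_UNKNOWN
    remaining = (latest_offset - next_offset) / (1.0 + avg_offset_gap)
    return max(0, int(math.ceil(remaining)))


# Example: 600 offsets remain, but compaction kept only every other record,
# so avg_offset_gap is about 1.0 and the backlog is roughly 300 messages.
print(backlog_message_count(1000, 400, 1.0))  # 300
{code}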





[GitHub] beam pull request #3514: [BEAM-2534] Cherry-pick #3461 into 2.1.0.

2017-07-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/3514




[1/2] beam git commit: [BEAM-2534] Handle offset gaps in Kafka messages.

2017-07-10 Thread jbonofre
Repository: beam
Updated Branches:
  refs/heads/release-2.1.0 de7652b2a -> 7f2419f09


[BEAM-2534] Handle offset gaps in Kafka messages.

KafkaIO logged a warning when there is a gap in offsets for messages.
Kafka also supports 'KV store'-style topics where some of the messages
are deleted, leading to gaps in offsets. This PR removes the log and
accounts for offset gaps in the backlog estimate.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9ffaf185
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9ffaf185
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9ffaf185

Branch: refs/heads/release-2.1.0
Commit: 9ffaf185ceefaf7de51b1197e00d2d54bdbb6760
Parents: 53b372b
Author: Raghu Angadi 
Authored: Wed Jun 28 12:07:06 2017 -0700
Committer: Raghu Angadi 
Committed: Thu Jul 6 22:22:13 2017 -0700

--
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 49 
 1 file changed, 29 insertions(+), 20 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/9ffaf185/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
--
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 702bdd3..e520367 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -904,6 +904,22 @@ public class KafkaIO {
   return name;
 }
 
+// Maintains approximate average over last 1000 elements
+private static class MovingAvg {
+  private static final int MOVING_AVG_WINDOW = 1000;
+  private double avg = 0;
+  private long numUpdates = 0;
+
+  void update(double quantity) {
+numUpdates++;
+avg += (quantity - avg) / Math.min(MOVING_AVG_WINDOW, numUpdates);
+  }
+
+  double get() {
+return avg;
+  }
+}
+
 // maintains state of each assigned partition (buffered records, consumed 
offset, etc)
 private static class PartitionState {
   private final TopicPartition topicPartition;
@@ -911,9 +927,8 @@ public class KafkaIO {
   private long latestOffset;
   private Iterator> recordIter = 
Collections.emptyIterator();
 
-  // simple moving average for size of each record in bytes
-  private double avgRecordSize = 0;
-  private static final int movingAvgWindow = 1000; // very roughly avg of last 1000 elements
+  private MovingAvg avgRecordSize = new MovingAvg();
+  private MovingAvg avgOffsetGap = new MovingAvg(); // > 0 only when log compaction is enabled.
 
   PartitionState(TopicPartition partition, long nextOffset) {
 this.topicPartition = partition;
@@ -921,17 +936,13 @@ public class KafkaIO {
 this.latestOffset = UNINITIALIZED_OFFSET;
   }
 
-  // update consumedOffset and avgRecordSize
-  void recordConsumed(long offset, int size) {
+  // Update consumedOffset, avgRecordSize, and avgOffsetGap
+  void recordConsumed(long offset, int size, long offsetGap) {
 nextOffset = offset + 1;
 
-// this is always updated from single thread. probably not worth making it an AtomicDouble
-if (avgRecordSize <= 0) {
-  avgRecordSize = size;
-} else {
-  // initially, first record heavily contributes to average.
-  avgRecordSize += ((size - avgRecordSize) / movingAvgWindow);
-}
+// This is always updated from single thread. Probably not worth making atomic.
+avgRecordSize.update(size);
+avgOffsetGap.update(offsetGap);
   }
 
   synchronized void setLatestOffset(long latestOffset) {
@@ -944,14 +955,15 @@ public class KafkaIO {
 if (backlogMessageCount == UnboundedReader.BACKLOG_UNKNOWN) {
   return UnboundedReader.BACKLOG_UNKNOWN;
 }
-return (long) (backlogMessageCount * avgRecordSize);
+return (long) (backlogMessageCount * avgRecordSize.get());
   }
 
   synchronized long backlogMessageCount() {
 if (latestOffset < 0 || nextOffset < 0) {
   return UnboundedReader.BACKLOG_UNKNOWN;
 }
-return Math.max(0, (latestOffset - nextOffset));
+double remaining = (latestOffset - nextOffset) / (1 + avgOffsetGap.get());
+return Math.max(0, (long) Math.ceil(remaining));
   }
 }
 
@@ -1154,14 +1166,11 @@ public class KafkaIO {
 continue;
   }
 
-  // sanity check
-  if (offset != expected) {
-LOG.warn("{}: gap in offsets for {} at {}. {} records missing.",
- 

[2/2] beam git commit: This closes #3514

2017-07-10 Thread jbonofre
This closes #3514


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7f2419f0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7f2419f0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7f2419f0

Branch: refs/heads/release-2.1.0
Commit: 7f2419f0963be14abdab8cffc4562036101fdbfd
Parents: de7652b 9ffaf18
Author: Jean-Baptiste Onofré 
Authored: Mon Jul 10 21:53:33 2017 +0200
Committer: Jean-Baptiste Onofré 
Committed: Mon Jul 10 21:53:33 2017 +0200

--
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 49 
 1 file changed, 29 insertions(+), 20 deletions(-)
--




[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK

2017-07-10 Thread Sourabh Bajaj (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16080974#comment-16080974
 ] 

Sourabh Bajaj commented on BEAM-2573:
-

I think the plugins are working correctly if they pass a list of class names 
to be imported at startup. You might need to wait for the next release, since 
this required a change to the Dataflow workers: they need to start importing 
the paths specified in the beam_plugins list. There is a release going on 
right now, so that might happen in the next few days.
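
For reference, a minimal sketch of what importing such a list of fully
qualified class names could look like (the helper is illustrative, not the
actual worker code, and it assumes apache_beam is installed):

{code}
import importlib


def import_plugin_classes(class_paths):
    """Import each fully qualified class name so that any module-level
    side effects (e.g. defining a FileSystem subclass) take place."""
    for path in class_paths:
        module_name, _, class_name = path.rpartition('.')
        module = importlib.import_module(module_name)
        getattr(module, class_name)  # fail fast if the class is missing


# Example, using part of the plugin list quoted later in this thread:
import_plugin_classes([
    'apache_beam.io.filesystem.FileSystem',
    'apache_beam.io.localfilesystem.LocalFileSystem',
])
{code}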

I am not sure about the crash loop in windmillio; [~charleschen] might know 
more.



> Better filesystem discovery mechanism in Python SDK
> ---
>
> Key: BEAM-2573
> URL: https://issues.apache.org/jira/browse/BEAM-2573
> Project: Beam
>  Issue Type: Task
>  Components: runner-dataflow, sdk-py
>Affects Versions: 2.0.0
>Reporter: Dmitry Demeshchuk
>Priority: Minor
>
> It looks like right now custom filesystem classes have to be imported 
> explicitly: 
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L30
> Seems like the current implementation doesn't allow discovering filesystems 
> that come from side packages, not from apache_beam itself. Even if I put a 
> custom FileSystem-inheriting class into a package and explicitly import it in 
> the root __init__.py of that package, it still doesn't make the class 
> discoverable.
> The problems I'm experiencing happen on Dataflow runner, while Direct runner 
> works just fine. Here's an example of Dataflow output:
> {code}
>   (320418708fe777d7): Traceback (most recent call last):
>   File 
> "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 
> 581, in do_work
> work_executor.execute()
>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", 
> line 166, in execute
> op.start()
>   File 
> "/usr/local/lib/python2.7/dist-packages/dataflow_worker/native_operations.py",
>  line 54, in start
> self.output(windowed_value)
>   File "dataflow_worker/operations.py", line 138, in 
> dataflow_worker.operations.Operation.output 
> (dataflow_worker/operations.c:5768)
> def output(self, windowed_value, output_index=0):
>   File "dataflow_worker/operations.py", line 139, in 
> dataflow_worker.operations.Operation.output 
> (dataflow_worker/operations.c:5654)
> cython.cast(Receiver, 
> self.receivers[output_index]).receive(windowed_value)
>   File "dataflow_worker/operations.py", line 72, in 
> dataflow_worker.operations.ConsumerSet.receive 
> (dataflow_worker/operations.c:3506)
> cython.cast(Operation, consumer).process(windowed_value)
>   File "dataflow_worker/operations.py", line 328, in 
> dataflow_worker.operations.DoOperation.process 
> (dataflow_worker/operations.c:11162)
> with self.scoped_process_state:
>   File "dataflow_worker/operations.py", line 329, in 
> dataflow_worker.operations.DoOperation.process 
> (dataflow_worker/operations.c:6)
> self.dofn_receiver.receive(o)
>   File "apache_beam/runners/common.py", line 382, in 
> apache_beam.runners.common.DoFnRunner.receive 
> (apache_beam/runners/common.c:10156)
> self.process(windowed_value)
>   File "apache_beam/runners/common.py", line 390, in 
> apache_beam.runners.common.DoFnRunner.process 
> (apache_beam/runners/common.c:10458)
> self._reraise_augmented(exn)
>   File "apache_beam/runners/common.py", line 431, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented 
> (apache_beam/runners/common.c:11673)
> raise new_exn, None, original_traceback
>   File "apache_beam/runners/common.py", line 388, in 
> apache_beam.runners.common.DoFnRunner.process 
> (apache_beam/runners/common.c:10371)
> self.do_fn_invoker.invoke_process(windowed_value)
>   File "apache_beam/runners/common.py", line 281, in 
> apache_beam.runners.common.PerWindowInvoker.invoke_process 
> (apache_beam/runners/common.c:8626)
> self._invoke_per_window(windowed_value)
>   File "apache_beam/runners/common.py", line 307, in 
> apache_beam.runners.common.PerWindowInvoker._invoke_per_window 
> (apache_beam/runners/common.c:9302)
> windowed_value, self.process_method(*args_for_process))
>   File 
> "/Users/dmitrydemeshchuk/miniconda2/lib/python2.7/site-packages/apache_beam/transforms/core.py",
>  line 749, in 
>   File 
> "/Users/dmitrydemeshchuk/miniconda2/lib/python2.7/site-packages/apache_beam/io/iobase.py",
>  line 891, in 
>   File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/options/value_provider.py",
>  line 109, in _f
> return fnc(self, *args, **kwargs)
>   File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py", 
> line 146, in initialize_write
> tmp_dir = self._create_temp_dir(file_path_prefix)
>   File 
> 
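
(The description above mentions a custom FileSystem-inheriting class. For
context, a rough sketch of the shape of such a subclass: the scheme and class
name are taken from this thread, the body is deliberately incomplete, and it
assumes the apache_beam.io.filesystem module of this era.)

{code}
from apache_beam.io.filesystem import FileSystem


class S3FileSystem(FileSystem):
    """Sketch only: a real implementation must override all abstract
    methods (match, create, open, copy, rename, delete, ...)."""

    @classmethod
    def scheme(cls):
        # URIs like s3://bucket/key would be routed to this filesystem.
        return 's3'
{code}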

[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK

2017-07-10 Thread Dmitry Demeshchuk (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16080933#comment-16080933
 ] 

Dmitry Demeshchuk commented on BEAM-2573:
-

Hmm, it looks like it was indeed a dependency issue that setuptools was hiding.

I did {{pip install .}} from the beam repo, and the plugins made it in there.

Interestingly, Beam seems to have automatically converted the plugins list into 
a list of classes. In the Dataflow job's options, {{beam_plugins}} equals 
{{['apache_beam.io.filesystem.FileSystem', 
'apache_beam.io.localfilesystem.LocalFileSystem', 
'apache_beam.io.gcp.gcsfilesystem.GCSFileSystem', 
'dataflow.aws.s3.S3FileSystem']}}.

However, the crash loop described in my previous comment still happens. 
I'll keep looking at the sources to see where I went wrong.

> Better filesystem discovery mechanism in Python SDK
> ---
>
> Key: BEAM-2573
> URL: https://issues.apache.org/jira/browse/BEAM-2573
> Project: Beam
>  Issue Type: Task
>  Components: runner-dataflow, sdk-py
>Affects Versions: 2.0.0
>Reporter: Dmitry Demeshchuk
>Priority: Minor
>

[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK

2017-07-10 Thread Dmitry Demeshchuk (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16080889#comment-16080889
 ] 

Dmitry Demeshchuk commented on BEAM-2573:
-

Hmm, now it gets a bit past that, but my pipeline enters an infinite 
crash-loop backoff. This is what Stackdriver prints out:
{code}
IFile "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", 
line 26, in  
I   
I  from dataflow_worker import windmillio 
IFile 
"/usr/local/lib/python2.7/dist-packages/dataflow_worker/windmillio.py", line 
41, in  
I  class PubSubWindmillSource(pubsub.PubSubSource): 
I  :  
I  'module' object has no attribute 'PubSubSource' 
I  checking backoff for container "python" in pod 
"dataflow-s3-wordcount-example-2-07101139-0c31-harness-3tf8" 
E  Error syncing pod e6169e8537b1bd83321865dafc047ba4, skipping: failed to 
"StartContainer" for "python" with CrashLoopBackOff: "Back-off 5m0s restarting 
failed container=python 
pod=dataflow-s3-wordcount-example-2-07101139-0c31-harness-3tf8_default(e6169e8537b1bd83321865dafc047ba4)"
 
I  Setting node annotation to enable volume controller attach/detach 
I  checking backoff for container "python" in pod 
"dataflow-s3-wordcount-example-2-07101139-0c31-harness-3tf8" 
I  Back-off 5m0s restarting failed container=python 
pod=dataflow-s3-wordcount-example-2-07101139-0c31-harness-3tf8_default(e6169e8537b1bd83321865dafc047ba4)
 
{code}

I'll try to clean up the local dependencies and see if that helps.

> Better filesystem discovery mechanism in Python SDK
> ---
>
> Key: BEAM-2573
> URL: https://issues.apache.org/jira/browse/BEAM-2573
> Project: Beam
>  Issue Type: Task
>  Components: runner-dataflow, sdk-py
>Affects Versions: 2.0.0
>Reporter: Dmitry Demeshchuk
>Priority: Minor
>

[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK

2017-07-10 Thread Dmitry Demeshchuk (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16080867#comment-16080867
 ] 

Dmitry Demeshchuk commented on BEAM-2573:
-

So, I tried to switch to the head version of the SDK and use the 
{{beam_plugins}} option.

Here are the steps I performed:

1. Ran {{python setup.py sdist}} on the master branch of Beam, and also ran 
{{python setup.py install}}.
2. Created the following dictionary in Python (note the {{beam_plugins}} option 
present):
{code}
{'job_name': 's3-wordcount-example-2', 'staging_location': 
'gs://dataflow-test-gc-project-168517/s3-wordcount-example-2/staging_location', 
'runner': 'dataflow', 'streaming': False, 'runtime_type_check': False, 
'temp_location': 
'gs://dataflow-test-gc-project-168517/s3-wordcount-example-2/temporary_location',
 'setup_file': '/tmp/tmpEdRIo2/setup.py', 'dataflow_endpoint': 
'https://dataflow.googleapis.com', 'sdk_location': 
'/Users/dmitrydemeshchuk/beam/sdks/python/dist/apache-beam-2.2.0.dev0.tar.gz', 
'save_main_session': True, 'zone': 'us-west1-a', 'region': 'us-west1', 
'profile_cpu': False, 'bucket': 'gs://dataflow-test-gc-project-168517', 
'profile_memory': False, 'pipeline_type_check': True, 'project': 
'test-gc-project-168517', 'direct_runner_use_stacked_bundle': True, 
'type_check_strictness': 'DEFAULT_TO_ANY', 'beam_plugins': ' dataflow', 
'no_auth': False}
{code}
3. Created a {{PipelineOptions}} object and used it inside a pipeline: 
{{options = PipelineOptions.from_dictionary(options_dict)}} (see the sketch 
after this list).
4. Ran the pipeline in Dataflow.
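
For concreteness, a minimal sketch of steps 3 and 4, assuming the dictionary
above is bound to {{options_dict}} (the transforms are placeholders):

{code}
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Step 3: build options from a plain dictionary.
options = PipelineOptions.from_dictionary(options_dict)

# Step 4: run a trivial pipeline with those options.
p = beam.Pipeline(options=options)
(p
 | 'Create' >> beam.Create(['line one', 'line two'])
 | 'Split' >> beam.FlatMap(lambda line: line.split()))
result = p.run()
result.wait_until_finish()
{code}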

Now, in the Dataflow UI I see some of the pipeline options (for example, 
{{sdk_location}} is correct); however, I don't see {{beam_plugins}} anywhere. 
FWIW, the job's "Dataflow SDK version" is "Google Cloud Dataflow SDK for 
Python 2.0.0", but {{sdk_location}} is 
{{/Users/dmitrydemeshchuk/beam/sdks/python/dist/apache-beam-2.2.0.dev0.tar.gz}} 
(note the 2.2.0 version).

Needless to say, the {{beam_plugins}} option doesn't seem to be applied to my 
pipeline; at least, it fails as if the plugin wasn't imported.

I'm almost sure this has something to do with the Dataflow SDK version, but so 
far I cannot find a way to fix it. Any suggestions?

> Better filesystem discovery mechanism in Python SDK
> ---
>
> Key: BEAM-2573
> URL: https://issues.apache.org/jira/browse/BEAM-2573
> Project: Beam
>  Issue Type: Task
>  Components: runner-dataflow, sdk-py
>Affects Versions: 2.0.0
>Reporter: Dmitry Demeshchuk
>Priority: Minor
>

Jenkins build is still unstable: beam_PostCommit_Java_ValidatesRunner_Spark #2596

2017-07-10 Thread Apache Jenkins Server
See 




Jenkins build is still unstable: beam_PostCommit_Java_ValidatesRunner_Flink #3362

2017-07-10 Thread Apache Jenkins Server
See 




[jira] [Commented] (BEAM-934) Findbugs doesn't pass in Java8 Examples

2017-07-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16080773#comment-16080773
 ] 

ASF GitHub Bot commented on BEAM-934:
-

Github user eralmas7 closed the pull request at:

https://github.com/apache/beam/pull/3526


> Findbugs doesn't pass in Java8 Examples
> ---
>
> Key: BEAM-934
> URL: https://issues.apache.org/jira/browse/BEAM-934
> Project: Beam
>  Issue Type: Bug
>  Components: examples-java
>Reporter: Daniel Halperin
>Assignee: Jean-Baptiste Onofré
>  Labels: newbie, starter
>
> {code}
> [INFO] --- findbugs-maven-plugin:3.0.1:check (default) @ beam-examples-java8 
> ---
> [INFO] BugInstance size is 2
> [INFO] Error size is 0
> [INFO] Total bugs: 2
> [INFO] Result of integer multiplication cast to long in 
> org.apache.beam.examples.complete.game.injector.Injector$TeamInfo.getEndTimeInMillis()
>  [org.apache.beam.examples.complete.game.injector.Injector$TeamInfo] At 
> Injector.java:[line 170]
> [INFO] Format string should use %n rather than \n in 
> org.apache.beam.examples.complete.game.injector.InjectorUtils.createTopic(Pubsub,
>  String) [org.apache.beam.examples.complete.game.injector.InjectorUtils] At 
> InjectorUtils.java:[line 96]
> [INFO]  
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

