Build failed in Jenkins: beam_PostCommit_Java_ValidatesRunner_Apex_Gradle #721

2018-06-13 Thread Apache Jenkins Server
See 


Changes:

[swegner] Revert "Merge pull request #5611: [BEAM-4445] Filter pre-commit

--
[...truncated 27.60 MB...]
Jun 14, 2018 5:57:35 AM 
com.datatorrent.stram.StramLocalCluster$LocalStreamingContainerLauncher run
INFO: Container container-14 terminating.
Jun 14, 2018 5:57:35 AM com.datatorrent.bufferserver.server.Server$3 run
INFO: Removing ln 
LogicalNode@7f74f52fidentifier=tcp://localhost:33933/23.output.24, 
upstream=23.output.24, group=stream92/24.input, partitions=[], 
iterator=com.datatorrent.bufferserver.internal.DataList$DataListIterator@2c02098b{da=com.datatorrent.bufferserver.internal.DataList$Block@63930320{identifier=23.output.24,
 data=1048576, readingOffset=0, writingOffset=98, 
starting_window=5b2203cb0001, ending_window=5b2203cb0009, refCount=2, 
uniqueIdentifier=0, next=null, future=null}}} from dl 
DataList@72233497[identifier=23.output.24]
Jun 14, 2018 5:57:35 AM com.datatorrent.bufferserver.server.Server$3 run
INFO: Removing ln 
LogicalNode@62484c89identifier=tcp://localhost:33933/23.output.25, 
upstream=23.output.25, group=stream92/55.data1, partitions=[], 
iterator=com.datatorrent.bufferserver.internal.DataList$DataListIterator@51578ea1{da=com.datatorrent.bufferserver.internal.DataList$Block@6d259b51{identifier=23.output.25,
 data=1048576, readingOffset=0, writingOffset=98, 
starting_window=5b2203cb0001, ending_window=5b2203cb0009, refCount=2, 
uniqueIdentifier=0, next=null, future=null}}} from dl 
DataList@16541fd6[identifier=23.output.25]
Jun 14, 2018 5:57:35 AM com.datatorrent.stram.util.LoggerUtil 
getFileAppender
WARNING: Log information is unavailable. To enable log information 
log4j/logging should be configured with single FileAppender that has 
immediateFlush set to true and log level set to ERROR or greater.
Jun 14, 2018 5:57:35 AM com.datatorrent.stram.util.LoggerUtil 
getFileAppender
WARNING: Log information is unavailable. To enable log information 
log4j/logging should be configured with single FileAppender that has 
immediateFlush set to true and log level set to ERROR or greater.
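The LoggerUtil warnings above state what they need: a log4j configuration with a single FileAppender, immediateFlush enabled, and a threshold of ERROR or higher. A minimal log4j.properties sketch of that shape (the appender name, file path, and layout pattern are illustrative assumptions, not taken from the Beam or Apex repos):

```properties
# Single FileAppender with immediateFlush, threshold ERROR -- the shape
# the LoggerUtil warning asks for (values below are illustrative):
log4j.rootLogger=ERROR, FILE
log4j.appender.FILE=org.apache.log4j.FileAppender
log4j.appender.FILE.File=/tmp/apex-operator.log
log4j.appender.FILE.ImmediateFlush=true
log4j.appender.FILE.Threshold=ERROR
log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
log4j.appender.FILE.layout.ConversionPattern=%d %p %c - %m%n
```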
Jun 14, 2018 5:57:35 AM com.datatorrent.stram.engine.StreamingContainer$2 
run
SEVERE: Operator set 
[OperatorDeployInfo[id=5,name=split4,type=INPUT,checkpoint={, 
0, 
0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=output,streamId=stream143,bufferServer=localhost
 stopped running due to an exception.
com.datatorrent.api.Operator$ShutdownException
at 
com.datatorrent.common.util.BaseOperator.shutdown(BaseOperator.java:96)
at 
org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator.endWindow(ApexReadUnboundedInputOperator.java:116)
at com.datatorrent.stram.engine.InputNode.run(InputNode.java:229)
at 
com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1429)

Jun 14, 2018 5:57:35 AM com.datatorrent.stram.engine.StreamingContainer$2 
run
SEVERE: Operator set 
[OperatorDeployInfo[id=17,name=split8,type=INPUT,checkpoint={, 
0, 
0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=output,streamId=stream139,bufferServer=localhost
 stopped running due to an exception.
com.datatorrent.api.Operator$ShutdownException
at 
com.datatorrent.common.util.BaseOperator.shutdown(BaseOperator.java:96)
at 
org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator.endWindow(ApexReadUnboundedInputOperator.java:116)
at com.datatorrent.stram.engine.InputNode.run(InputNode.java:229)
at 
com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1429)

Jun 14, 2018 5:57:35 AM 
com.datatorrent.stram.StramLocalCluster$UmbilicalProtocolLocalImpl log
INFO: container-25 msg: Stopped running due to an exception. 
com.datatorrent.api.Operator$ShutdownException
at 
com.datatorrent.common.util.BaseOperator.shutdown(BaseOperator.java:96)
at 
org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator.endWindow(ApexReadUnboundedInputOperator.java:116)
at com.datatorrent.stram.engine.InputNode.run(InputNode.java:229)
at 
com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1429)

Jun 14, 2018 5:57:35 AM 
com.datatorrent.stram.StramLocalCluster$UmbilicalProtocolLocalImpl log
INFO: container-38 msg: Stopped running due to an exception. 
com.datatorrent.api.Operator$ShutdownException
at 
com.datatorrent.common.util.BaseOperator.shutdown(BaseOperator.java:96)
at 
org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator.endWindow(ApexReadUnboundedInputOperator.java:116)
at 

[jira] [Resolved] (BEAM-4536) Python SDK: Pubsub reading with_attributes broken for Dataflow

2018-06-13 Thread JIRA


 [ 
https://issues.apache.org/jira/browse/BEAM-4536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jean-Baptiste Onofré resolved BEAM-4536.

   Resolution: Fixed
Fix Version/s: 2.5.0

> Python SDK: Pubsub reading with_attributes broken for Dataflow
> --
>
> Key: BEAM-4536
> URL: https://issues.apache.org/jira/browse/BEAM-4536
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.5.0
>Reporter: Udi Meiri
>Assignee: Udi Meiri
>Priority: Blocker
> Fix For: 2.5.0
>
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> Using
> [ReadFromPubSub|https://github.com/apache/beam/blob/e30e0c807321934e862358e1e3be32dc74374aeb/sdks/python/apache_beam/io/gcp/pubsub.py#L106]
> with with_attributes=True will fail on Dataflow.




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (BEAM-4551) Update Spark runner to Spark version 2.3.1

2018-06-13 Thread JIRA


 [ 
https://issues.apache.org/jira/browse/BEAM-4551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jean-Baptiste Onofré updated BEAM-4551:
---
Fix Version/s: 2.5.0

> Update Spark runner to Spark version 2.3.1
> --
>
> Key: BEAM-4551
> URL: https://issues.apache.org/jira/browse/BEAM-4551
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Ismaël Mejía
>Assignee: Ismaël Mejía
>Priority: Minor
> Fix For: 2.5.0, 2.6.0
>
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> The new Spark release brings the usual stability fixes, but more importantly
> for us it brings a fix for the accumulator issue that hit the runner during
> the migration to Spark 2 (SPARK-23697). It also aligns the Parquet version
> with the one we use in Beam.





[jira] [Resolved] (BEAM-4535) Python tests are failing for Windows

2018-06-13 Thread JIRA


 [ 
https://issues.apache.org/jira/browse/BEAM-4535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jean-Baptiste Onofré resolved BEAM-4535.

   Resolution: Fixed
Fix Version/s: 2.5.0

> Python tests are failing for Windows
> 
>
> Key: BEAM-4535
> URL: https://issues.apache.org/jira/browse/BEAM-4535
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Chamikara Jayalath
>Assignee: Udi Meiri
>Priority: Major
> Fix For: 2.5.0
>
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> Error is:
> Traceback (most recent call last):
>   File 
> "C:\Users\deft-testing-integra\python_sdk_download\apache_beam\io\fileba
> sedsource_test.py", line 532, in test_read_auto_pattern
>     compression_type=CompressionTypes.AUTO))
>   File 
> "C:\Users\deft-testing-integra\python_sdk_download\apache_beam\io\fileba
> sedsource.py", line 119, in __init__
>     self._validate()
>   File 
> "C:\Users\deft-testing-integra\python_sdk_download\apache_beam\options\v
> alue_provider.py", line 133, in _f
>     return fnc(self, *args, **kwargs)
>   File 
> "C:\Users\deft-testing-integra\python_sdk_download\apache_beam\io\fileba
> sedsource.py", line 179, in _validate
>     'No files found based on the file pattern %s' % pattern)
> IOError: No files found based on the file pattern 
> c:\windows\temp\tmpwon5_g\mytemp*
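The IOError above is consistent with a POSIX path helper being applied to a Windows-style path: posixpath.dirname only splits on forward slashes, so the computed directory is empty and the subsequent glob matches nothing. A stdlib-only illustration, independent of Beam:

```python
import ntpath
import posixpath

# The pattern from the traceback above:
win_path = r"c:\windows\temp\tmpwon5_g\mytemp*"

# posixpath only understands "/", so a backslash path has no dirname at all:
print(posixpath.dirname(win_path))  # -> "" (empty string)

# ntpath (what os.path resolves to on Windows) handles backslashes:
print(ntpath.dirname(win_path))  # -> c:\windows\temp\tmpwon5_g
```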





[jira] [Resolved] (BEAM-4555) Pre-commit file include triggering breaks phrase triggering

2018-06-13 Thread JIRA


 [ 
https://issues.apache.org/jira/browse/BEAM-4555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jean-Baptiste Onofré resolved BEAM-4555.

   Resolution: Fixed
Fix Version/s: 2.6.0

> Pre-commit file include triggering breaks phrase triggering
> ---
>
> Key: BEAM-4555
> URL: https://issues.apache.org/jira/browse/BEAM-4555
> Project: Beam
>  Issue Type: Bug
>  Components: testing
>Reporter: Scott Wegner
>Assignee: Scott Wegner
>Priority: Major
> Fix For: 2.6.0
>
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> In BEAM-4445 we changed pre-commit triggering on PRs to only run when
> affected file paths were changed. However, as a side effect, it is no longer
> possible to use trigger phrases to manually run pre-commits when the
> included path files have not changed.
> This ability to run a pre-commit even when files have not changed is useful
> for checking the stability of the codebase before submitting.





[jira] [Work logged] (BEAM-4536) Python SDK: Pubsub reading with_attributes broken for Dataflow

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4536?focusedWorklogId=111786&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111786
 ]

ASF GitHub Bot logged work on BEAM-4536:


Author: ASF GitHub Bot
Created on: 14/Jun/18 05:28
Start Date: 14/Jun/18 05:28
Worklog Time Spent: 10m 
  Work Description: jbonofre closed pull request #5607: [BEAM-4536] Remove 
with_attributes keyword from ReadFromPubSub.
URL: https://github.com/apache/beam/pull/5607
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py 
b/sdks/python/apache_beam/io/gcp/pubsub.py
index e45dd23bfef..6db45bdbfa5 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -108,7 +108,7 @@ class ReadFromPubSub(PTransform):
   # Implementation note: This ``PTransform`` is overridden by Directrunner.
 
   def __init__(self, topic=None, subscription=None, id_label=None,
-   with_attributes=False, timestamp_attribute=None):
+   timestamp_attribute=None):
 """Initializes ``ReadFromPubSub``.
 
 Args:
@@ -118,12 +118,8 @@ def __init__(self, topic=None, subscription=None, 
id_label=None,
 deduplication of messages. If not provided, we cannot guarantee
 that no duplicate data will be delivered on the Pub/Sub stream. In this
 case, deduplication of the stream will be strictly best effort.
-  with_attributes:
-True - output elements will be :class:`~PubsubMessage` objects.
-False - output elements will be of type ``str`` (message payload only).
   timestamp_attribute: Message value to use as element timestamp. If None,
 uses message publishing time as the timestamp.
-Note that this argument doesn't require with_attributes=True.
 
 Timestamp values should be in one of two formats:
 
@@ -135,12 +131,13 @@ def __init__(self, topic=None, subscription=None, 
id_label=None,
   units smaller than milliseconds) may be ignored.
 """
 super(ReadFromPubSub, self).__init__()
-self.with_attributes = with_attributes
+# TODO(BEAM-4536): Add with_attributes to kwargs once fixed.
+self.with_attributes = False
 self._source = _PubSubSource(
 topic=topic,
 subscription=subscription,
 id_label=id_label,
-with_attributes=with_attributes,
+with_attributes=self.with_attributes,
 timestamp_attribute=timestamp_attribute)
 
   def expand(self, pvalue):
@@ -174,8 +171,7 @@ def __init__(self, topic=None, subscription=None, 
id_label=None):
 
   def expand(self, pvalue):
 p = (pvalue.pipeline
- | ReadFromPubSub(self.topic, self.subscription, self.id_label,
-  with_attributes=False)
+ | ReadFromPubSub(self.topic, self.subscription, self.id_label)
  | 'DecodeString' >> Map(lambda b: b.decode('utf-8')))
 p.element_type = text_type
 return p
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py 
b/sdks/python/apache_beam/io/gcp/pubsub_test.py
index f987947d454..165c072abb1 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -63,7 +63,7 @@ def test_expand_with_topic(self):
 p.options.view_as(StandardOptions).streaming = True
 pcoll = (p
  | ReadFromPubSub('projects/fakeprj/topics/a_topic',
-  None, 'a_label', with_attributes=False,
+  None, 'a_label',
   timestamp_attribute=None)
  | beam.Map(lambda x: x))
 self.assertEqual(str, pcoll.element_type)
@@ -87,7 +87,7 @@ def test_expand_with_subscription(self):
 pcoll = (p
  | ReadFromPubSub(
  None, 'projects/fakeprj/subscriptions/a_subscription',
- 'a_label', with_attributes=False, timestamp_attribute=None)
+ 'a_label', timestamp_attribute=None)
  | beam.Map(lambda x: x))
 self.assertEqual(str, pcoll.element_type)
 
@@ -107,16 +107,17 @@ def test_expand_with_subscription(self):
   def test_expand_with_no_topic_or_subscription(self):
 with self.assertRaisesRegexp(
 ValueError, "Either a topic or subscription must be provided."):
-  ReadFromPubSub(None, None, 'a_label', with_attributes=False,
+  ReadFromPubSub(None, None, 'a_label',
  timestamp_attribute=None)
 
   def test_expand_with_both_topic_and_subscription(self):
 with self.assertRaisesRegexp(
 ValueError, "Only one of topic or subscription should be provided."):
   ReadFromPubSub('a_topic', 

[beam] branch release-2.5.0 updated (f97dbbb -> 8f65a68)

2018-06-13 Thread jbonofre
This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a change to branch release-2.5.0
in repository https://gitbox.apache.org/repos/asf/beam.git.


from f97dbbb  Merge pull request #5624 from udim/cherrypick2-2.5.0
 add d194d42  Remove with_attributes keyword from ReadFromPubSub.
 new 8f65a68  Merge pull request #5607 from udim/cherrypick-2.5.0

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 sdks/python/apache_beam/io/gcp/pubsub.py  | 14 +-
 sdks/python/apache_beam/io/gcp/pubsub_test.py | 27 +--
 2 files changed, 22 insertions(+), 19 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
jbono...@apache.org.


[beam] 01/01: Merge pull request #5624 from udim/cherrypick2-2.5.0

2018-06-13 Thread jbonofre
This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch release-2.5.0
in repository https://gitbox.apache.org/repos/asf/beam.git

commit f97dbbb2859a6de898b71c868a601efedeccfb55
Merge: c408f6e 1201ca8
Author: Jean-Baptiste Onofré 
AuthorDate: Thu Jun 14 07:28:24 2018 +0200

Merge pull request #5624 from udim/cherrypick2-2.5.0

[BEAM-4535] Add a custom _url_dirname for local filesystems.

 sdks/python/apache_beam/io/filesystem.py  |  3 +--
 sdks/python/apache_beam/io/localfilesystem.py | 11 +++
 2 files changed, 12 insertions(+), 2 deletions(-)



[beam] branch release-2.5.0 updated (c408f6e -> f97dbbb)

2018-06-13 Thread jbonofre
This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a change to branch release-2.5.0
in repository https://gitbox.apache.org/repos/asf/beam.git.


from c408f6e  Merge pull request #5640 from pabloem/release-unblock-rruner
 add 1201ca8  Add a custom _url_dirname for local filesystems.
 new f97dbbb  Merge pull request #5624 from udim/cherrypick2-2.5.0

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 sdks/python/apache_beam/io/filesystem.py  |  3 +--
 sdks/python/apache_beam/io/localfilesystem.py | 11 +++
 2 files changed, 12 insertions(+), 2 deletions(-)



[jira] [Work logged] (BEAM-4535) Python tests are failing for Windows

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4535?focusedWorklogId=111785&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111785
 ]

ASF GitHub Bot logged work on BEAM-4535:


Author: ASF GitHub Bot
Created on: 14/Jun/18 05:28
Start Date: 14/Jun/18 05:28
Worklog Time Spent: 10m 
  Work Description: jbonofre closed pull request #5624: [BEAM-4535] Add a 
custom _url_dirname for local filesystems.
URL: https://github.com/apache/beam/pull/5624
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/sdks/python/apache_beam/io/filesystem.py 
b/sdks/python/apache_beam/io/filesystem.py
index 752cb31bd72..8ebb6b5f029 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -528,8 +528,7 @@ def _list(self, dir_or_prefix):
 """
 raise NotImplementedError
 
-  @staticmethod
-  def _url_dirname(url_or_path):
+  def _url_dirname(self, url_or_path):
 """Like posixpath.dirname, but preserves scheme:// prefix.
 
 Args:
diff --git a/sdks/python/apache_beam/io/localfilesystem.py 
b/sdks/python/apache_beam/io/localfilesystem.py
index aab68593100..7e7f88d4e45 100644
--- a/sdks/python/apache_beam/io/localfilesystem.py
+++ b/sdks/python/apache_beam/io/localfilesystem.py
@@ -81,6 +81,17 @@ def has_dirs(self):
 """Whether this FileSystem supports directories."""
 return True
 
+  def _url_dirname(self, url_or_path):
+"""Pass through to os.path.dirname.
+
+This version uses os.path instead of posixpath to be compatible with the
+host OS.
+
+Args:
+  url_or_path: A string in the form of /some/path.
+"""
+return os.path.dirname(url_or_path)
+
   def _list(self, dir_or_prefix):
 """List files in a location.
 


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 111785)
Time Spent: 1h 10m  (was: 1h)

> Python tests are failing for Windows
> 
>
> Key: BEAM-4535
> URL: https://issues.apache.org/jira/browse/BEAM-4535
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Chamikara Jayalath
>Assignee: Udi Meiri
>Priority: Major
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> Error is:
> Traceback (most recent call last):
>   File 
> "C:\Users\deft-testing-integra\python_sdk_download\apache_beam\io\fileba
> sedsource_test.py", line 532, in test_read_auto_pattern
>     compression_type=CompressionTypes.AUTO))
>   File 
> "C:\Users\deft-testing-integra\python_sdk_download\apache_beam\io\fileba
> sedsource.py", line 119, in __init__
>     self._validate()
>   File 
> "C:\Users\deft-testing-integra\python_sdk_download\apache_beam\options\v
> alue_provider.py", line 133, in _f
>     return fnc(self, *args, **kwargs)
>   File 
> "C:\Users\deft-testing-integra\python_sdk_download\apache_beam\io\fileba
> sedsource.py", line 179, in _validate
>     'No files found based on the file pattern %s' % pattern)
> IOError: No files found based on the file pattern 
> c:\windows\temp\tmpwon5_g\mytemp*





[beam] 01/01: Merge pull request #5607 from udim/cherrypick-2.5.0

2018-06-13 Thread jbonofre
This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch release-2.5.0
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 8f65a681440b3a1ac660b2cae60699da8c66d587
Merge: f97dbbb d194d42
Author: Jean-Baptiste Onofré 
AuthorDate: Thu Jun 14 07:28:52 2018 +0200

Merge pull request #5607 from udim/cherrypick-2.5.0

[BEAM-4536] Remove with_attributes keyword from ReadFromPubSub.

 sdks/python/apache_beam/io/gcp/pubsub.py  | 14 +-
 sdks/python/apache_beam/io/gcp/pubsub_test.py | 27 +--
 2 files changed, 22 insertions(+), 19 deletions(-)



[beam] branch master updated (8a76405 -> 8d4bc25)

2018-06-13 Thread jbonofre
This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 8a76405  Merge pull request #5623 from charlesccychen/sideinputids
 add 7174df4  Revert "Merge pull request #5611: [BEAM-4445] Filter 
pre-commit triggering based on touched files"
 new 8d4bc25  Merge pull request #5638 from swegner/revert_triggering

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .test-infra/jenkins/common_job_properties.groovy   | 31 +++---
 .../jenkins/job_PreCommit_Go_GradleBuild.groovy|  7 +
 .../jenkins/job_PreCommit_Java_GradleBuild.groovy  |  8 +-
 .../job_PreCommit_Python_GradleBuild.groovy|  7 +
 4 files changed, 7 insertions(+), 46 deletions(-)



[jira] [Work logged] (BEAM-4555) Pre-commit file include triggering breaks phrase triggering

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4555?focusedWorklogId=111783&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111783
 ]

ASF GitHub Bot logged work on BEAM-4555:


Author: ASF GitHub Bot
Created on: 14/Jun/18 05:27
Start Date: 14/Jun/18 05:27
Worklog Time Spent: 10m 
  Work Description: jbonofre closed pull request #5638: [BEAM-4555] Revert 
pull request pre-commit triggering
URL: https://github.com/apache/beam/pull/5638
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/.test-infra/jenkins/common_job_properties.groovy 
b/.test-infra/jenkins/common_job_properties.groovy
index d5e1b26bb47..ec19dd303c2 100644
--- a/.test-infra/jenkins/common_job_properties.groovy
+++ b/.test-infra/jenkins/common_job_properties.groovy
@@ -38,7 +38,6 @@ class common_job_properties {
 branch('${sha1}')
 extensions {
   cleanAfterCheckout()
-  disableRemotePoll() // needed for included regions PR triggering; 
see [JENKINS-23606]
   relativeTargetDirectory(checkoutDir)
 }
   }
@@ -129,8 +128,7 @@ class common_job_properties {
  String commitStatusContext,
  String prTriggerPhrase = '',
  boolean 
onlyTriggerPhraseToggle = true,
- String successComment = 
'--none--',
- List 
triggerPathPatterns = []) {
+ String successComment = 
'--none--') {
 context.triggers {
   githubPullRequest {
 admins(['asfbot'])
@@ -149,9 +147,6 @@ class common_job_properties {
 if (onlyTriggerPhraseToggle) {
   onlyTriggerPhrase()
 }
-if (!triggerPathPatterns.isEmpty()) {
-  includedRegions(triggerPathPatterns.join('\n'))
-}
 
 extensions {
   commitStatus {
@@ -198,31 +193,13 @@ class common_job_properties {
 context.switches("-Dorg.gradle.jvmargs=-Xmx${(int)perWorkerMemoryMb}m")
   }
 
-  /**
-   * Sets common config for PreCommit jobs.
-   *
-   * @param commitStatusName Status displayed in pull request for the job
-   * @param prTriggerPhrase Adding a comment to the PR with this phrase will 
trigger the job to re-run
-   * @param triggerPathPatterns List of path includes regex which will trigger 
the PR. Patterns should
-   *match the entire file path. A default set of 
paths will also be added.
-   */
-
+  // Sets common config for PreCommit jobs.
   static void setPreCommit(context,
String commitStatusName,
String prTriggerPhrase = '',
-   List triggerPathPatterns = []) {
-def defaultPathTriggers = [
-  '^build.gradle$',
-  '^build_rules.gradle$',
-  '^gradle.properties$',
-  '^gradlew$',
-  '^gradle.bat$',
-  '^settings.gradle$'
-]
-
+   String successComment = '--none--') {
 // Set pull request build trigger.
-triggerPathPatterns.addAll(defaultPathTriggers)
-setPullRequestBuildTrigger context, commitStatusName, prTriggerPhrase, 
false, '--none--', triggerPathPatterns
+setPullRequestBuildTrigger(context, commitStatusName, prTriggerPhrase, 
false, successComment)
   }
 
   // Enable triggering postcommit runs against pull requests. Users can 
comment the trigger phrase
diff --git a/.test-infra/jenkins/job_PreCommit_Go_GradleBuild.groovy 
b/.test-infra/jenkins/job_PreCommit_Go_GradleBuild.groovy
index e6a91f0df3c..6a3bc15c74b 100644
--- a/.test-infra/jenkins/job_PreCommit_Go_GradleBuild.groovy
+++ b/.test-infra/jenkins/job_PreCommit_Go_GradleBuild.groovy
@@ -33,12 +33,7 @@ job('beam_PreCommit_Go_GradleBuild') {
 150)
 
   // Sets that this is a PreCommit job.
-  common_job_properties.setPreCommit(delegate, './gradlew :goPreCommit', 'Run 
Go PreCommit', [
-  '^model/.*$',
-  '^sdks/go/.*$',
-  '^runners/.*$',
-  '^release/.*$',
-])
+  common_job_properties.setPreCommit(delegate, './gradlew :goPreCommit', 'Run 
Go PreCommit')
   steps {
 gradle {
   rootBuildScriptDir(common_job_properties.checkoutDir)
diff --git a/.test-infra/jenkins/job_PreCommit_Java_GradleBuild.groovy 
b/.test-infra/jenkins/job_PreCommit_Java_GradleBuild.groovy
index 0bf7cc843bc..ce67d399ea6 100644
--- a/.test-infra/jenkins/job_PreCommit_Java_GradleBuild.groovy
+++ b/.test-infra/jenkins/job_PreCommit_Java_GradleBuild.groovy
@@ -38,13 +38,7 @@ job('beam_PreCommit_Java_GradleBuild') {
   }
 
  

[beam] 01/01: Merge pull request #5638 from swegner/revert_triggering

2018-06-13 Thread jbonofre
This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 8d4bc25b04448cdb987ad28737cf4f13b511fd19
Merge: 8a76405 7174df4
Author: Jean-Baptiste Onofré 
AuthorDate: Thu Jun 14 07:27:28 2018 +0200

Merge pull request #5638 from swegner/revert_triggering

[BEAM-4555] Revert pull request pre-commit triggering

 .test-infra/jenkins/common_job_properties.groovy   | 31 +++---
 .../jenkins/job_PreCommit_Go_GradleBuild.groovy|  7 +
 .../jenkins/job_PreCommit_Java_GradleBuild.groovy  |  8 +-
 .../job_PreCommit_Python_GradleBuild.groovy|  7 +
 4 files changed, 7 insertions(+), 46 deletions(-)



[beam] 01/01: Merge pull request #5640 from pabloem/release-unblock-rruner

2018-06-13 Thread jbonofre
This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch release-2.5.0
in repository https://gitbox.apache.org/repos/asf/beam.git

commit c408f6e80fdf4d5223a79c633c825cba376f32c9
Merge: 04e9ac7 af312ed
Author: Jean-Baptiste Onofré 
AuthorDate: Thu Jun 14 07:26:42 2018 +0200

Merge pull request #5640 from pabloem/release-unblock-rruner

Ignoring reference runner tests, as reference runner is not yet standard

 .../org/apache/beam/runners/direct/portable/ReferenceRunnerTest.java| 2 ++
 1 file changed, 2 insertions(+)



[beam] branch release-2.5.0 updated (04e9ac7 -> c408f6e)

2018-06-13 Thread jbonofre
This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a change to branch release-2.5.0
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 04e9ac7  Merge pull request #5635: [BEAM-4551] Update spark runner to 
Spark version 2.3.1
 add af312ed  Ignoring reference runner tests, as reference runner is not 
yet standard.
 new c408f6e  Merge pull request #5640 from pabloem/release-unblock-rruner

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/beam/runners/direct/portable/ReferenceRunnerTest.java| 2 ++
 1 file changed, 2 insertions(+)



[jira] [Commented] (BEAM-4445) Filter pre-commit triggering based on touched files

2018-06-13 Thread Scott Wegner (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-4445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511997#comment-16511997
 ] 

Scott Wegner commented on BEAM-4445:


I've [submitted a patch|https://github.com/jenkinsci/ghprb-plugin/pull/680] to 
the upstream ghprb-plugin; we'll see if it gets accepted.

> Filter pre-commit triggering based on touched files
> ---
>
> Key: BEAM-4445
> URL: https://issues.apache.org/jira/browse/BEAM-4445
> Project: Beam
>  Issue Type: Sub-task
>  Components: build-system, testing
>Reporter: Scott Wegner
>Assignee: Scott Wegner
>Priority: Minor
>  Labels: beam-site-automation-reliability
> Fix For: Not applicable
>
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> This is discussed in the [Beam-Site Automation 
> Reliability|https://s.apache.org/beam-site-automation] design, under 
> "Pre-Commit Job Filtering"
> The proposal is to filter pre-commit jobs triggered on PRs based on which
> files are touched. The impact is that most PRs will only run one set of
> relevant tests, rather than all three. This will decrease test overhead and
> the impact of flaky tests.
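The filtering described above matches touched file paths against include regexes; as the Groovy helper quoted elsewhere in this thread notes, patterns must match the entire file path. A small sketch of that matching logic in Python, using the include patterns from the Go pre-commit job config in this digest (the `should_trigger` helper name is ours):

```python
import re

# Include patterns from job_PreCommit_Go_GradleBuild.groovy; each must
# match the whole repo-relative file path.
TRIGGER_PATTERNS = [r'^model/.*$', r'^sdks/go/.*$', r'^runners/.*$', r'^release/.*$']

def should_trigger(touched_files, patterns=TRIGGER_PATTERNS):
    """Return True if any touched file matches any include pattern."""
    return any(
        re.match(pattern, path)
        for path in touched_files
        for pattern in patterns
    )

print(should_trigger(['sdks/go/pkg/beam/core.go']))  # True
print(should_trigger(['website/src/index.md']))      # False
```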





Jenkins build is back to normal : beam_PostCommit_Java_GradleBuild #767

2018-06-13 Thread Apache Jenkins Server
See 




[jira] [Work logged] (BEAM-3761) Fix Python 3 cmp function

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3761?focusedWorklogId=111768&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111768
 ]

ASF GitHub Bot logged work on BEAM-3761:


Author: ASF GitHub Bot
Created on: 14/Jun/18 04:54
Start Date: 14/Jun/18 04:54
Worklog Time Spent: 10m 
  Work Description: stale[bot] commented on issue #4774: [BEAM-3761]Fix 
Python 3 cmp usage
URL: https://github.com/apache/beam/pull/4774#issuecomment-397170047
 
 
   This pull request has been closed due to lack of activity. If you think that 
is incorrect, or the pull request requires review, you can revive the PR at any 
time.
   




Issue Time Tracking
---

Worklog Id: (was: 111768)
Time Spent: 7h 40m  (was: 7.5h)

> Fix Python 3 cmp function
> -
>
> Key: BEAM-3761
> URL: https://issues.apache.org/jira/browse/BEAM-3761
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Reporter: holdenk
>Priority: Major
>  Time Spent: 7h 40m
>  Remaining Estimate: 0h
>
> Various functions don't exist in Python 3 that did in python 2. This Jira is 
> to fix the use of cmp (which often will involve rewriting __cmp__ as well).
>  
> Note: there are existing PRs for basestring and unicode
> ( [https://github.com/apache/beam/pull/4697] , [https://github.com/apache/beam/pull/4730] )
>  
> Note: once all of the missing names/functions are fixed we can enable F821 in
> flake8 for Python 3.
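For context, the standard Python 3 replacement for the removed built-in `cmp()` is the expression `(a > b) - (a < b)`, which is exactly the idiom the PR above applies. A minimal standalone sketch (the helper name `cmp3` is illustrative, not from the Beam codebase):

```python
def cmp3(a, b):
    # Python 3 equivalent of the removed built-in cmp():
    # returns -1 if a < b, 0 if a == b, and 1 if a > b.
    return (a > b) - (a < b)

print(cmp3(1, 2))      # -1
print(cmp3(3, 3))      # 0
print(cmp3("b", "a"))  # 1
```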



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3648) Support Splittable DoFn in Flink Batch Runner

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3648?focusedWorklogId=111769&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111769
 ]

ASF GitHub Bot logged work on BEAM-3648:


Author: ASF GitHub Bot
Created on: 14/Jun/18 04:54
Start Date: 14/Jun/18 04:54
Worklog Time Spent: 10m 
  Work Description: stale[bot] commented on issue #4640: [BEAM-3648] 
Support Splittable DoFn in Flink Batch Runner
URL: https://github.com/apache/beam/pull/4640#issuecomment-397170028
 
 
   This pull request has been closed due to lack of activity. If you think that 
is incorrect, or the pull request requires review, you can revive the PR at any 
time.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 111769)
Time Spent: 3h  (was: 2h 50m)

> Support Splittable DoFn in Flink Batch Runner
> -
>
> Key: BEAM-3648
> URL: https://issues.apache.org/jira/browse/BEAM-3648
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
>  Time Spent: 3h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3761) Fix Python 3 cmp function

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3761?focusedWorklogId=111773&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111773
 ]

ASF GitHub Bot logged work on BEAM-3761:


Author: ASF GitHub Bot
Created on: 14/Jun/18 04:56
Start Date: 14/Jun/18 04:56
Worklog Time Spent: 10m 
  Work Description: stale[bot] closed pull request #4774: [BEAM-3761]Fix 
Python 3 cmp usage
URL: https://github.com/apache/beam/pull/4774
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py 
b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
index 87d798bebe3..7a58fe3e212 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
@@ -90,7 +90,7 @@ def compare_path(p1, p2):
   3. If no `id` is defined for both paths, then their `names` are compared.
   """
 
-  result = cmp(p1.kind, p2.kind)
+  result = (p1.kind > p2.kind) - (p1.kind < p2.kind)
   if result != 0:
 return result
 
@@ -98,12 +98,12 @@ def compare_path(p1, p2):
 if not p2.HasField('id'):
   return -1
 
-return cmp(p1.id, p2.id)
+return (p1.id > p2.id) - (p1.id < p2.id)
 
   if p2.HasField('id'):
 return 1
 
-  return cmp(p1.name, p2.name)
+  return (p1.name > p2.name) - (p1.name < p2.name)
 
 
 def get_datastore(project):
diff --git a/sdks/python/apache_beam/io/vcfio_test.py 
b/sdks/python/apache_beam/io/vcfio_test.py
index 029515fe341..a750dc74711 100644
--- a/sdks/python/apache_beam/io/vcfio_test.py
+++ b/sdks/python/apache_beam/io/vcfio_test.py
@@ -70,15 +70,6 @@ def get_full_dir():
   return os.path.join(os.path.dirname(__file__), '..', 'testing', 'data', 
'vcf')
 
 
-# Helper method for comparing variants.
-def _variant_comparator(v1, v2):
-  if v1.reference_name == v2.reference_name:
-if v1.start == v2.start:
-  return cmp(v1.end, v2.end)
-return cmp(v1.start, v2.start)
-  return cmp(v1.reference_name, v2.reference_name)
-
-
 # Helper method for verifying equal count on PCollection.
 def _count_equals_to(expected_count):
   def _count_equal(actual_list):
diff --git a/sdks/python/apache_beam/testing/test_stream.py 
b/sdks/python/apache_beam/testing/test_stream.py
index 8a63e7bd056..db4cc66b1d0 100644
--- a/sdks/python/apache_beam/testing/test_stream.py
+++ b/sdks/python/apache_beam/testing/test_stream.py
@@ -21,7 +21,7 @@
 """
 
 from abc import ABCMeta
-from abc import abstractmethod
+from functools import total_ordering
 
 from apache_beam import coders
 from apache_beam import core
@@ -46,44 +46,65 @@ class Event(object):
 
   __metaclass__ = ABCMeta
 
-  def __cmp__(self, other):
-if type(self) is not type(other):
-  return cmp(type(self), type(other))
-return self._typed_cmp(other)
-
-  @abstractmethod
-  def _typed_cmp(self, other):
-raise NotImplementedError
-
 
+@total_ordering
 class ElementEvent(Event):
   """Element-producing test stream event."""
 
   def __init__(self, timestamped_values):
 self.timestamped_values = timestamped_values
 
-  def _typed_cmp(self, other):
-return cmp(self.timestamped_values, other.timestamped_values)
+  def __eq__(self, other):
+return (type(self) is type(other) and
+self.timestamped_values == other.timestamped_values)
+
+  def __ne__(self, other):
+return not self.__eq__(other)
+
+  def __lt__(self, other):
+if type(self) is not type(other):
+  return type(self) < type(other)
+return self.timestamped_values < other.timestamped_values
 
 
+@total_ordering
 class WatermarkEvent(Event):
   """Watermark-advancing test stream event."""
 
   def __init__(self, new_watermark):
 self.new_watermark = timestamp.Timestamp.of(new_watermark)
 
-  def _typed_cmp(self, other):
-return cmp(self.new_watermark, other.new_watermark)
+  def __eq__(self, other):
+return (type(self) is type(other) and
+self.new_watermark == other.new_watermark)
 
+  def __ne__(self, other):
+return not self.__eq__(other)
 
+  def __lt__(self, other):
+if type(self) is not type(other):
+  return type(self) < type(other)
+return self.new_watermark < other.new_watermark
+
+
+@total_ordering
 class ProcessingTimeEvent(Event):
   """Processing time-advancing test stream event."""
 
   def __init__(self, advance_by):
 self.advance_by = timestamp.Duration.of(advance_by)
 
-  def _typed_cmp(self, other):
-return cmp(self.advance_by, other.advance_by)
+  def __eq__(self, other):
+return (type(self) is type(other) and
+self.advance_by == other.advance_by)
+
+  def __ne__(self, other):
+return not self.__eq__(other)
+
+  def 

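The `__cmp__` removal in the diff above follows a standard Python 3 pattern: define `__eq__` and `__lt__` and let `functools.total_ordering` derive the remaining comparison operators. A self-contained sketch of that pattern (this `Event` class is illustrative, not Beam's):

```python
from functools import total_ordering

@total_ordering
class Event:
    """Minimal event ordered by timestamp, mirroring the diff's pattern."""

    def __init__(self, timestamp):
        self.timestamp = timestamp

    def __eq__(self, other):
        return type(self) is type(other) and self.timestamp == other.timestamp

    def __lt__(self, other):
        if type(self) is not type(other):
            return NotImplemented
        return self.timestamp < other.timestamp

# total_ordering derives __le__, __gt__, and __ge__ from __eq__ and __lt__.
```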
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=111770&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111770
 ]

ASF GitHub Bot logged work on BEAM-2535:


Author: ASF GitHub Bot
Created on: 14/Jun/18 04:54
Start Date: 14/Jun/18 04:54
Worklog Time Spent: 10m 
  Work Description: stale[bot] commented on issue #4700: [BEAM-2535] Added 
control to set output timestamp independent of firing time for event time 
timers. (Direct Runner implementation)
URL: https://github.com/apache/beam/pull/4700#issuecomment-397170038
 
 
   This pull request has been closed due to lack of activity. If you think that 
is incorrect, or the pull request requires review, you can revive the PR at any 
time.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 111770)
Time Spent: 3h 50m  (was: 3h 40m)

> Allow explicit output time independent of firing specification for all timers
> -
>
> Key: BEAM-2535
> URL: https://issues.apache.org/jira/browse/BEAM-2535
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model, sdk-java-core
>Reporter: Kenneth Knowles
>Assignee: Batkhuyag Batsaikhan
>Priority: Major
>  Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> Today, we have insufficient control over the event time timestamp of elements 
> output from a timer callback.
> 1. For an event time timer, it is the timestamp of the timer itself.
>  2. For a processing time timer, it is the current input watermark at the 
> time of processing.
> But for both of these, we may want to reserve the right to output a 
> particular time, aka set a "watermark hold".
> A naive implementation of a {{TimerWithWatermarkHold}} would work for making 
> sure output is not droppable, but does not fully explain window expiration 
> and late data/timer dropping.
> In the natural interpretation of a timer as a feedback loop on a transform, 
> timers should be viewed as another channel of input, with a watermark, and 
> items on that channel _all need event time timestamps even if they are 
> delivered according to a different time domain_.
> I propose that the specification for when a timer should fire should be 
> separated (with nice defaults) from the specification of the event time of 
> resulting outputs. These timestamps will determine a side channel with a new 
> "timer watermark" that constrains the output watermark.
>  - We still need to fire event time timers according to the input watermark, 
> so that event time timers fire.
>  - Late data dropping and window expiration will be in terms of the minimum 
> of the input watermark and the timer watermark. In this way, whenever a timer 
> is set, the window is not going to be garbage collected.
>  - We will need to make sure we have a way to "wake up" a window once it is 
> expired; this may be as simple as exhausting the timer channel as soon as the 
> input watermark indicates expiration of a window
> This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It 
> seems reasonable to use timers as an implementation detail (e.g. in 
> runners-core utilities) without wanting any of this additional machinery. For 
> example, if there is no possibility of output from the timer callback.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3089) Issue with setting the parallelism at client level using Flink runner

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3089?focusedWorklogId=111771&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111771
 ]

ASF GitHub Bot logged work on BEAM-3089:


Author: ASF GitHub Bot
Created on: 14/Jun/18 04:55
Start Date: 14/Jun/18 04:55
Worklog Time Spent: 10m 
  Work Description: stale[bot] closed pull request #4766: [BEAM-3089] Fix 
job parallelism resolution
URL: https://github.com/apache/beam/pull/4766
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/runners/flink/build.gradle b/runners/flink/build.gradle
index d6673ae246d..1ed3ac32ad8 100644
--- a/runners/flink/build.gradle
+++ b/runners/flink/build.gradle
@@ -93,7 +93,11 @@ def createValidatesRunnerTask(Map m) {
 group = "Verification"
 def runnerType = config.streaming ? "streaming" : "batch"
 description = "Validates the ${runnerType} runner"
-def pipelineOptions = JsonOutput.toJson(["--runner=TestFlinkRunner", 
"--streaming=${config.streaming}"])
+def pipelineOptions = JsonOutput.toJson([
+"--runner=TestFlinkRunner",
+"--streaming=${config.streaming}",
+"--parallelism=1"
+])
 systemProperty "beamTestPipelineOptions", pipelineOptions
 classpath = configurations.validatesRunner
 testClassesDirs = 
files(project(":sdks:java:core").sourceSets.test.output.classesDirs)
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index 610bc9d200d..dac092ee44c 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -72,7 +72,8 @@
 
   [
   "--runner=TestFlinkRunner",
-  "--streaming=false"
+  "--streaming=false",
+  "--parallelism=1"
   ]
 
   
@@ -104,7 +105,8 @@
 
   [
   "--runner=TestFlinkRunner",
-  "--streaming=true"
+  "--streaming=true",
+  "--parallelism=1"
   ]
 
   
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java
deleted file mode 100644
index b745f0bd441..000
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.sdk.options.DefaultValueFactory;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
-
-/**
- * {@link DefaultValueFactory} for getting a default value for the parallelism 
option
- * on {@link FlinkPipelineOptions}.
- *
- * This will return either the default value from {@link 
GlobalConfiguration} or {@code 1}.
- * A valid {@link GlobalConfiguration} is only available if the program is 
executed by the Flink
- * run scripts.
- */
-public class DefaultParallelismFactory implements DefaultValueFactory<Integer> {
-  @Override
-  public Integer create(PipelineOptions options) {
-return GlobalConfiguration.loadConfiguration()
-.getInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, 1);
-  }
-}
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index b2cbefbc5b0..51c81650ba9 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -60,7 +60,7 @@
   void setFlinkMaster(String 

Build failed in Jenkins: beam_PostCommit_Java_ValidatesRunner_Apex_Gradle #720

2018-06-13 Thread Apache Jenkins Server
See 


--
[...truncated 27.67 MB...]
com.datatorrent.api.Operator$ShutdownException
at 
com.datatorrent.common.util.BaseOperator.shutdown(BaseOperator.java:96)
at 
org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator.endWindow(ApexReadUnboundedInputOperator.java:116)
at com.datatorrent.stram.engine.InputNode.run(InputNode.java:229)
at 
com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1429)

Jun 14, 2018 4:37:42 AM 
com.datatorrent.stram.StramLocalCluster$UmbilicalProtocolLocalImpl log
INFO: container-5 msg: Stopped running due to an exception. 
com.datatorrent.api.Operator$ShutdownException
at 
com.datatorrent.common.util.BaseOperator.shutdown(BaseOperator.java:96)
at 
org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator.endWindow(ApexReadUnboundedInputOperator.java:116)
at com.datatorrent.stram.engine.InputNode.run(InputNode.java:229)
at 
com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1429)

Jun 14, 2018 4:37:42 AM 
com.datatorrent.stram.StramLocalCluster$UmbilicalProtocolLocalImpl log
INFO: container-52 msg: Stopped running due to an exception. 
com.datatorrent.api.Operator$ShutdownException
at 
com.datatorrent.common.util.BaseOperator.shutdown(BaseOperator.java:96)
at 
org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator.endWindow(ApexReadUnboundedInputOperator.java:116)
at com.datatorrent.stram.engine.InputNode.run(InputNode.java:229)
at 
com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1429)

Jun 14, 2018 4:37:42 AM 
com.datatorrent.stram.StramLocalCluster$LocalStreamingContainerLauncher run
INFO: Container container-52 terminating.
Jun 14, 2018 4:37:42 AM 
com.datatorrent.stram.StramLocalCluster$LocalStreamingContainerLauncher run
INFO: Container container-5 terminating.
Jun 14, 2018 4:37:42 AM com.datatorrent.bufferserver.server.Server$3 run
INFO: Removing ln 
LogicalNode@29190befidentifier=tcp://localhost:33905/12.output.10, 
upstream=12.output.10, group=stream76/13.data1, partitions=[], 
iterator=com.datatorrent.bufferserver.internal.DataList$DataListIterator@48f0010c{da=com.datatorrent.bufferserver.internal.DataList$Block@7ffd43eb{identifier=12.output.10,
 data=1048576, readingOffset=0, writingOffset=2281, 
starting_window=5b21f1120001, ending_window=5b21f112000a, refCount=2, 
uniqueIdentifier=0, next=null, future=null}}} from dl 
DataList@11263ad7[identifier=12.output.10]
Jun 14, 2018 4:37:42 AM com.datatorrent.bufferserver.server.Server$3 run
INFO: Removing ln 
LogicalNode@3fda97bdidentifier=tcp://localhost:33905/2.output.2, 
upstream=2.output.2, group=stream131/3.data1, partitions=[], 
iterator=com.datatorrent.bufferserver.internal.DataList$DataListIterator@7704ab58{da=com.datatorrent.bufferserver.internal.DataList$Block@3f342cd5{identifier=2.output.2,
 data=1048576, readingOffset=0, writingOffset=2381, 
starting_window=5b21f1120001, ending_window=5b21f112000a, refCount=2, 
uniqueIdentifier=0, next=null, future=null}}} from dl 
DataList@2a43d2c3[identifier=2.output.2]
Jun 14, 2018 4:37:42 AM com.datatorrent.stram.util.LoggerUtil 
getFileAppender
WARNING: Log information is unavailable. To enable log information 
log4j/logging should be configured with single FileAppender that has 
immediateFlush set to true and log level set to ERROR or greater.
Jun 14, 2018 4:37:42 AM com.datatorrent.stram.util.LoggerUtil 
getFileAppender
WARNING: Log information is unavailable. To enable log information 
log4j/logging should be configured with single FileAppender that has 
immediateFlush set to true and log level set to ERROR or greater.
Jun 14, 2018 4:37:42 AM com.datatorrent.stram.util.LoggerUtil 
getFileAppender
WARNING: Log information is unavailable. To enable log information 
log4j/logging should be configured with single FileAppender that has 
immediateFlush set to true and log level set to ERROR or greater.
Jun 14, 2018 4:37:42 AM com.datatorrent.stram.util.LoggerUtil 
getFileAppender
WARNING: Log information is unavailable. To enable log information 
log4j/logging should be configured with single FileAppender that has 
immediateFlush set to true and log level set to ERROR or greater.
Jun 14, 2018 4:37:42 AM com.datatorrent.stram.engine.StreamingContainer$2 
run
SEVERE: Operator set 
[OperatorDeployInfo[id=11,name=split1,type=INPUT,checkpoint={, 
0, 
0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=output,streamId=stream66,bufferServer=localhost
 stopped running due to an exception.
com.datatorrent.api.Operator$ShutdownException
at 

[jira] [Work logged] (BEAM-4302) Fix to dependency hell

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4302?focusedWorklogId=111766&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111766
 ]

ASF GitHub Bot logged work on BEAM-4302:


Author: ASF GitHub Bot
Created on: 14/Jun/18 04:20
Start Date: 14/Jun/18 04:20
Worklog Time Spent: 10m 
  Work Description: yifanzou commented on issue #5406: Do Not Merge, 
[BEAM-4302] add beam dependency checks
URL: https://github.com/apache/beam/pull/5406#issuecomment-397165920
 
 
   Run Seed Job


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 111766)
Time Spent: 54h 40m  (was: 54.5h)

> Fix to dependency hell
> --
>
> Key: BEAM-4302
> URL: https://issues.apache.org/jira/browse/BEAM-4302
> Project: Beam
>  Issue Type: New Feature
>  Components: testing
>Reporter: yifan zou
>Assignee: yifan zou
>Priority: Major
>  Time Spent: 54h 40m
>  Remaining Estimate: 0h
>
> # For Java, a daily Jenkins test to compare version of all Beam dependencies 
> to the latest version available in Maven Central.
>  # For Python, a daily Jenkins test to compare versions of all Beam 
> dependencies to the latest version available in PyPI.
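A dependency check like the one described boils down to comparing each pinned version against the latest released version. A hedged sketch of the core comparison (simplified to dotted numeric versions; the real Jenkins job would query Maven Central or PyPI for the latest releases, and `parse_version`/`is_outdated` are hypothetical names for illustration):

```python
def parse_version(v):
    # Simplified: handles dotted numeric versions only (e.g. "2.4.0").
    return tuple(int(part) for part in v.split("."))

def is_outdated(pinned, latest):
    # True when the pinned version lags behind the latest release.
    return parse_version(pinned) < parse_version(latest)

print(is_outdated("2.4.0", "2.5.0"))  # True
print(is_outdated("2.5.0", "2.5.0"))  # False
```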



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Build failed in Jenkins: beam_PerformanceTests_JDBC #720

2018-06-13 Thread Apache Jenkins Server
See 


Changes:

[robinyq] Defer calling formatTimestamp() to achieve better performance

[ccy] [BEAM-4549] Use per-pipeline unique ids for side inputs in

[robbe.sneyders] Futurize utils subpackage

[robinyq] Throws exception directly instead of calling checkState()

[aaltay] Futurize portability subpackage (#5385)

[altay] Futurize unpackaged files

[altay] resolved six.string_types equivalency

[altay] Futurize testing subpackage

[altay] Futurize tools subpackage

[altay] Remove old_div

--
[...truncated 127.27 KB...]
Jun 14, 2018 4:13:51 AM org.postgresql.ds.common.BaseDataSource 
getConnection
SEVERE: Failed to create a Non-Pooling DataSource from PostgreSQL JDBC 
Driver 42.2.2 for postgres at 
jdbc:postgresql://35.184.20.141:5432/postgres?ssl=false: 
org.postgresql.util.PSQLException: The connection attempt failed.

Gradle Test Executor 1 finished executing tests.

> Task :beam-sdks-java-io-jdbc:integrationTest FAILED

org.apache.beam.sdk.io.jdbc.JdbcIOIT > classMethod FAILED
org.postgresql.util.PSQLException: The connection attempt failed.
at 
org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:257)
at 
org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49)
at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:195)
at org.postgresql.Driver.makeConnection(Driver.java:452)
at org.postgresql.Driver.connect(Driver.java:254)
at java.sql.DriverManager.getConnection(DriverManager.java:664)
at java.sql.DriverManager.getConnection(DriverManager.java:247)
at 
org.postgresql.ds.common.BaseDataSource.getConnection(BaseDataSource.java:94)
at 
org.postgresql.ds.common.BaseDataSource.getConnection(BaseDataSource.java:79)
at 
org.apache.beam.sdk.io.common.DatabaseTestHelper.createTable(DatabaseTestHelper.java:46)
at org.apache.beam.sdk.io.jdbc.JdbcIOIT.setup(JdbcIOIT.java:85)

Caused by:
java.net.SocketTimeoutException: connect timed out
at java.net.PlainSocketImpl.socketConnect(Native Method)
at 
java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at 
java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at 
java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at org.postgresql.core.PGStream.<init>(PGStream.java:69)
at 
org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:156)
... 10 more

org.apache.beam.sdk.io.jdbc.JdbcIOIT > classMethod FAILED
org.postgresql.util.PSQLException: The connection attempt failed.
at 
org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:257)
at 
org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49)
at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:195)
at org.postgresql.Driver.makeConnection(Driver.java:452)
at org.postgresql.Driver.connect(Driver.java:254)
at java.sql.DriverManager.getConnection(DriverManager.java:664)
at java.sql.DriverManager.getConnection(DriverManager.java:247)
at 
org.postgresql.ds.common.BaseDataSource.getConnection(BaseDataSource.java:94)
at 
org.postgresql.ds.common.BaseDataSource.getConnection(BaseDataSource.java:79)
at 
org.apache.beam.sdk.io.common.DatabaseTestHelper.deleteTable(DatabaseTestHelper.java:57)
at org.apache.beam.sdk.io.jdbc.JdbcIOIT.tearDown(JdbcIOIT.java:90)

Caused by:
java.net.SocketTimeoutException: connect timed out
at java.net.PlainSocketImpl.socketConnect(Native Method)
at 
java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at 
java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at 
java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at org.postgresql.core.PGStream.<init>(PGStream.java:69)
at 
org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:156)
... 10 more
Finished generating test XML results (0.014 secs) into: 

Generating HTML test report...
Finished generating test html results (0.021 secs) into: 

Build failed in Jenkins: beam_PostCommit_Java_ValidatesRunner_Dataflow_Gradle #496

2018-06-13 Thread Apache Jenkins Server
See 


Changes:

[robinyq] Defer calling formatTimestamp() to achieve better performance

[ccy] [BEAM-4549] Use per-pipeline unique ids for side inputs in

[robinyq] Throws exception directly instead of calling checkState()

--
[...truncated 54.68 MB...]
at 
com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:200)
at 
com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
at 
com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
at 
com.google.cloud.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:391)
at 
com.google.cloud.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:360)
at 
com.google.cloud.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:288)
at 
com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:134)
at 
com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:114)
at 
com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:101)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.beam.sdk.util.UserCodeException: 
java.lang.AssertionError: OutputSideInputs/ParMultiDo(Anonymous).output: 
org/hamcrest/Matchers
at 
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
at 
org.apache.beam.sdk.testing.PAssert$DefaultConcludeFn$DoFnInvoker.invokeProcessElement(Unknown
 Source)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:185)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:149)
at 
com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:323)
at 
com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:43)
at 
com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:48)
at 
com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:271)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:219)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:69)
at 
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:517)
at 
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:505)
at 
org.apache.beam.sdk.testing.PAssert$GroupedValuesCheckerDoFn.processElement(PAssert.java:1215)
at 
org.apache.beam.sdk.testing.PAssert$GroupedValuesCheckerDoFn$DoFnInvoker.invokeProcessElement(Unknown
 Source)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:185)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:149)
at 
com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:323)
at 
com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:43)
at 
com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:48)
at 
com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:271)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:219)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:69)
at 
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:517)
at 
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:42)
at 
org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:131)
at 
org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown
 Source)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:185)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:149)
at 
com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:323)
at 

[jira] [Work logged] (BEAM-4333) Add integration tests for mobile game examples

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4333?focusedWorklogId=111749=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111749
 ]

ASF GitHub Bot logged work on BEAM-4333:


Author: ASF GitHub Bot
Created on: 14/Jun/18 01:24
Start Date: 14/Jun/18 01:24
Worklog Time Spent: 10m 
  Work Description: mariapython commented on a change in pull request 
#5630: [BEAM-4333] Add integration tests for python mobile games
URL: https://github.com/apache/beam/pull/5630#discussion_r195280612
 
 

 ##
 File path: sdks/python/apache_beam/examples/leader_board_it_test.py
 ##
 @@ -0,0 +1,160 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""End-to-end test for the leader board example.
+
+Code: beam/sdks/python/apache_beam/examples/complete/game/leader_board.py
+Usage:
+
+  python setup.py nosetests --test-pipeline-options=" \
+  --runner=TestDataflowRunner \
+  --project=... \
+  --staging_location=gs://... \
+  --temp_location=gs://... \
+  --output=gs://... \
+  --sdk_location=... \
+
+"""
+
+import logging
+import time
+import unittest
+import uuid
+
+from hamcrest.core.core.allof import all_of
+from nose.plugins.attrib import attr
+
+from apache_beam.examples.complete.game import leader_board
+from apache_beam.io.gcp.tests import utils
+from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryMatcher
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing import test_utils
+from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
+from apache_beam.testing.test_pipeline import TestPipeline
+
+
+class LeaderBoardIT(unittest.TestCase):
+
+  # Input event containing user, team, score, processing time, window start.
+  INPUT_EVENT = 'user1,teamA,10,%d,2015-11-02 09:09:28.224'
+  INPUT_TOPIC = 'leader_board_it_input_topic'
+  INPUT_SUB = 'leader_board_it_input_subscription'
+
+  # SHA-1 hash generated from sorted rows reading from BigQuery table
+  DEFAULT_EXPECTED_CHECKSUM = 'de00231fe6730b972c0ff60a99988438911cda53'
+  OUTPUT_DATASET = 'leader_board_it_dataset'
+  OUTPUT_TABLE_USERS = 'leader_board_users'
+  OUTPUT_TABLE_TEAMS = 'leader_board_teams'
+  DEFAULT_INPUT_COUNT = 500
+
+  WAIT_UNTIL_FINISH_DURATION = 10 * 60 * 1000   # in milliseconds
+
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.project = self.test_pipeline.get_option('project')
+    _unique_id = str(uuid.uuid4())
+
+    # Set up PubSub environment.
+    from google.cloud import pubsub
+    self.pubsub_client = pubsub.Client(project=self.project)
+    unique_topic_name = self.INPUT_TOPIC + _unique_id
+    unique_subscrition_name = self.INPUT_SUB + _unique_id
+    self.input_topic = self.pubsub_client.topic(unique_topic_name)
+    self.input_sub = self.input_topic.subscription(unique_subscrition_name)
+
+    self.input_topic.create()
+    test_utils.wait_for_topics_created([self.input_topic])
+    self.input_sub.create()
+
+    # Set up BigQuery environment
+    from google.cloud import bigquery
+    client = bigquery.Client()
+    unique_dataset_name = self.OUTPUT_DATASET + str(int(time.time()))
+    self.dataset = client.dataset(unique_dataset_name, project=self.project)
+    self.dataset.create()
+
+    self._test_timestamp = int(time.time() * 1000)
+
+  def _inject_pubsub_game_events(self, topic, message_count):
+    """Inject game events as test data to PubSub."""
+
+    logging.debug('Injecting %d game events to topic %s',
+                  message_count, topic.full_name)
+
+    for _ in xrange(message_count):
+      topic.publish(self.INPUT_EVENT % self._test_timestamp)
+
+  def _cleanup_pubsub(self):
+    test_utils.cleanup_subscriptions([self.input_sub])
+    test_utils.cleanup_topics([self.input_topic])
+
+  def tearDown(self):
+    self._cleanup_pubsub()
+
+  @attr('IT')
+  def test_leader_board_it(self):
+    state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
+
+    success_condition = 'total_score=5000 LIMIT 1'
+    users_query = ('SELECT total_score FROM [%s:%s.%s] '
+                   'WHERE %s' % (self.project,
+  
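The `DEFAULT_EXPECTED_CHECKSUM` in the quoted test is described as a SHA-1 hash generated from the sorted rows read back from BigQuery. As a hedged illustration only (the helper name and the row-joining format below are assumptions, not Beam's actual matcher code), the idea is:

```python
import hashlib

def compute_checksum(rows):
    # Illustrative only: stringify and sort the rows first so the digest is
    # independent of the order in which BigQuery returns them, then hash.
    concatenated = ','.join(sorted(str(row) for row in rows))
    return hashlib.sha1(concatenated.encode('utf-8')).hexdigest()
```

Sorting before hashing is what lets a single expected checksum stay stable across runs even though row order is nondeterministic.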

[jira] [Work logged] (BEAM-3949) IOIT's setup() and teardown() db connection attempt sometimes fail resulting in test flakiness

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3949?focusedWorklogId=111753&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111753
 ]

ASF GitHub Bot logged work on BEAM-3949:


Author: ASF GitHub Bot
Created on: 14/Jun/18 01:24
Start Date: 14/Jun/18 01:24
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on issue #5434: [BEAM-3949] 
IOIT's setup() and teardown() db connection attempt sometimes fail resulting in 
test flakiness
URL: https://github.com/apache/beam/pull/5434#issuecomment-397137548
 
 
   Also, please fixup to a single (or few) commits for merging.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 111753)
Time Spent: 7h 50m  (was: 7h 40m)

> IOIT's setup() and teardown() db connection attempt sometimes fail resulting 
> in test flakiness
> --
>
> Key: BEAM-3949
> URL: https://issues.apache.org/jira/browse/BEAM-3949
> Project: Beam
>  Issue Type: Sub-task
>  Components: testing
>Reporter: Łukasz Gajowy
>Assignee: Kasia Kucharczyk
>Priority: Major
>  Time Spent: 7h 50m
>  Remaining Estimate: 0h
>
> setup() and teardown() methods sometimes have trouble connecting database in 
> Performance tests. It results in test flakiness. 
> Example logs: 
> [https://builds.apache.org/view/A-D/view/Beam/job/beam_PerformanceTests_HadoopInputFormat/65/console]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3949) IOIT's setup() and teardown() db connection attempt sometimes fail resulting in test flakiness

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3949?focusedWorklogId=111751&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111751
 ]

ASF GitHub Bot logged work on BEAM-3949:


Author: ASF GitHub Bot
Created on: 14/Jun/18 01:24
Start Date: 14/Jun/18 01:24
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on a change in pull request 
#5434: [BEAM-3949] IOIT's setup() and teardown() db connection attempt 
sometimes fail resulting in test flakiness
URL: https://github.com/apache/beam/pull/5434#discussion_r195279925
 
 

 ##
 File path: 
sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOITHelper.java
 ##
 @@ -50,4 +52,46 @@ public static String getHashForRecordCount(int recordCount, 
Map
 
 return PipelineOptionsValidator.validate(optionsType, options);
   }
+
+  /** Interface for passing method to executeWithRetry method. */
+  @FunctionalInterface
+  public interface RetryFunction {
+void run() throws Exception;
+  }
+
+  public static void executeWithRetry(RetryFunction function) throws Exception {
+    executeWithRetry(maxAttempts, minDelay, function);
+  }
+
+  public static void executeWithRetry(int maxAttempts, long minDelay, RetryFunction function)
+      throws Exception {
+    ArrayList<Exception> errorList = new ArrayList<>();
+    int attempts = 1;
+    long delay = minDelay;
+
+    while (attempts <= maxAttempts) {
+      try {
+        function.run();
+        return;
+      } catch (Exception e) {
+        LOG.warn("Attempt #{} of {} failed.", attempts, maxAttempts);
+        errorList.add(e);
+        if (attempts == maxAttempts) {
+          for (int i = 0; i < errorList.size(); i++) {
+            LOG.error(
 
 Review comment:
   I think we should remove this extra LOG.error(). Warning above combined with 
raised exception should be enough.
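For readers following along, the retry helper under review can be sketched in Python. This is a hedged translation, not Beam's code: the delay growth and the re-raise of the last error are assumptions, since the quoted hunk is truncated before the loop body finishes.

```python
import time

def execute_with_retry(function, max_attempts=3, min_delay=1.0, sleep=time.sleep):
    # Blanket retry, as the reviewer notes: any exception is retried,
    # regardless of the error code the service returned.
    errors = []
    delay = min_delay
    for attempt in range(1, max_attempts + 1):
        try:
            return function()
        except Exception as e:
            errors.append(e)
            if attempt == max_attempts:
                raise errors[-1]  # Assumed: surface the last failure.
            sleep(delay)
            delay *= 2  # Assumed backoff; the truncated hunk does not show it.
```

The `sleep` parameter is injected so tests can skip real waiting; it is not part of the quoted Java signature.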


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 111751)
Time Spent: 7h 40m  (was: 7.5h)

> IOIT's setup() and teardown() db connection attempt sometimes fail resulting 
> in test flakiness
> --
>
> Key: BEAM-3949
> URL: https://issues.apache.org/jira/browse/BEAM-3949
> Project: Beam
>  Issue Type: Sub-task
>  Components: testing
>Reporter: Łukasz Gajowy
>Assignee: Kasia Kucharczyk
>Priority: Major
>  Time Spent: 7h 40m
>  Remaining Estimate: 0h
>
> setup() and teardown() methods sometimes have trouble connecting database in 
> Performance tests. It results in test flakiness. 
> Example logs: 
> [https://builds.apache.org/view/A-D/view/Beam/job/beam_PerformanceTests_HadoopInputFormat/65/console]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3949) IOIT's setup() and teardown() db connection attempt sometimes fail resulting in test flakiness

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3949?focusedWorklogId=111752&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111752
 ]

ASF GitHub Bot logged work on BEAM-3949:


Author: ASF GitHub Bot
Created on: 14/Jun/18 01:24
Start Date: 14/Jun/18 01:24
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on a change in pull request 
#5434: [BEAM-3949] IOIT's setup() and teardown() db connection attempt 
sometimes fail resulting in test flakiness
URL: https://github.com/apache/beam/pull/5434#discussion_r195280639
 
 

 ##
 File path: 
sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOITHelper.java
 ##
 @@ -50,4 +52,46 @@ public static String getHashForRecordCount(int recordCount, 
Map
 
 return PipelineOptionsValidator.validate(optionsType, options);
   }
+
+  /** Interface for passing method to executeWithRetry method. */
 
 Review comment:
   Please add more documentation describing the RetryFunction and 
executeWithRetry and when it should be used. Also mention that this is a 
blanket retry that does not consider the error code returned by the service 
(for example, we will retry client errors).


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 111752)
Time Spent: 7h 40m  (was: 7.5h)

> IOIT's setup() and teardown() db connection attempt sometimes fail resulting 
> in test flakiness
> --
>
> Key: BEAM-3949
> URL: https://issues.apache.org/jira/browse/BEAM-3949
> Project: Beam
>  Issue Type: Sub-task
>  Components: testing
>Reporter: Łukasz Gajowy
>Assignee: Kasia Kucharczyk
>Priority: Major
>  Time Spent: 7h 40m
>  Remaining Estimate: 0h
>
> setup() and teardown() methods sometimes have trouble connecting database in 
> Performance tests. It results in test flakiness. 
> Example logs: 
> [https://builds.apache.org/view/A-D/view/Beam/job/beam_PerformanceTests_HadoopInputFormat/65/console]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4546) Implement with hot key fanout for combiners

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4546?focusedWorklogId=111750&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111750
 ]

ASF GitHub Bot logged work on BEAM-4546:


Author: ASF GitHub Bot
Created on: 14/Jun/18 01:24
Start Date: 14/Jun/18 01:24
Worklog Time Spent: 10m 
  Work Description: katsiapis commented on issue #5639: [BEAM-4546] Multi 
level combine
URL: https://github.com/apache/beam/pull/5639#issuecomment-397137448
 
 
   R: @xinzha623 
   
   Xin, could you and Foo review this? It will be useful for TFMA 
implementation.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 111750)
Time Spent: 1h 20m  (was: 1h 10m)

> Implement with hot key fanout for combiners
> ---
>
> Key: BEAM-4546
> URL: https://issues.apache.org/jira/browse/BEAM-4546
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Ahmet Altay
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4483) Update release guide with how to perform release validations

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4483?focusedWorklogId=111739&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111739
 ]

ASF GitHub Bot logged work on BEAM-4483:


Author: ASF GitHub Bot
Created on: 14/Jun/18 00:55
Start Date: 14/Jun/18 00:55
Worklog Time Spent: 10m 
  Work Description: boyuanzz opened a new pull request #468: [BEAM-4483]Add 
instructions about how to perform release validations
URL: https://github.com/apache/beam-site/pull/468
 
 
   r: @aaltay @alanmyrvold
   cc: @pabloem 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 111739)
Time Spent: 10m
Remaining Estimate: 0h

> Update release guide with how to perform release validations
> 
>
> Key: BEAM-4483
> URL: https://issues.apache.org/jira/browse/BEAM-4483
> Project: Beam
>  Issue Type: Bug
>  Components: build-system
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
>  Time Spent: 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4302) Fix to dependency hell

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4302?focusedWorklogId=111735&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111735
 ]

ASF GitHub Bot logged work on BEAM-4302:


Author: ASF GitHub Bot
Created on: 14/Jun/18 00:42
Start Date: 14/Jun/18 00:42
Worklog Time Spent: 10m 
  Work Description: yifanzou commented on issue #5406: Do Not Merge, 
[BEAM-4302] add beam dependency checks
URL: https://github.com/apache/beam/pull/5406#issuecomment-397131161
 
 
   Run Seed Job


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 111735)
Time Spent: 54.5h  (was: 54h 20m)

> Fix to dependency hell
> --
>
> Key: BEAM-4302
> URL: https://issues.apache.org/jira/browse/BEAM-4302
> Project: Beam
>  Issue Type: New Feature
>  Components: testing
>Reporter: yifan zou
>Assignee: yifan zou
>Priority: Major
>  Time Spent: 54.5h
>  Remaining Estimate: 0h
>
> # For Java, a daily Jenkins test to compare version of all Beam dependencies 
> to the latest version available in Maven Central.
>  # For Python, a daily Jenkins test to compare versions of all Beam 
> dependencies to the latest version available in PyPI.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4535) Python tests are failing for Windows

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4535?focusedWorklogId=111734&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111734
 ]

ASF GitHub Bot logged work on BEAM-4535:


Author: ASF GitHub Bot
Created on: 14/Jun/18 00:39
Start Date: 14/Jun/18 00:39
Worklog Time Spent: 10m 
  Work Description: aaltay commented on issue #5624: [BEAM-4535] Add a 
custom _url_dirname for local filesystems.
URL: https://github.com/apache/beam/pull/5624#issuecomment-397130838
 
 
   I would guess that `IllegalStateException` is unrelated to this python only 
change. Is it correct?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 111734)
Time Spent: 1h  (was: 50m)

> Python tests are failing for Windows
> 
>
> Key: BEAM-4535
> URL: https://issues.apache.org/jira/browse/BEAM-4535
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Chamikara Jayalath
>Assignee: Udi Meiri
>Priority: Major
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> Error is:
> Traceback (most recent call last):
>   File 
> "C:\Users\deft-testing-integra\python_sdk_download\apache_beam\io\fileba
> sedsource_test.py", line 532, in test_read_auto_pattern
>     compression_type=CompressionTypes.AUTO))
>   File 
> "C:\Users\deft-testing-integra\python_sdk_download\apache_beam\io\fileba
> sedsource.py", line 119, in __init__
>     self._validate()
>   File 
> "C:\Users\deft-testing-integra\python_sdk_download\apache_beam\options\v
> alue_provider.py", line 133, in _f
>     return fnc(self, *args, **kwargs)
>   File 
> "C:\Users\deft-testing-integra\python_sdk_download\apache_beam\io\fileba
> sedsource.py", line 179, in _validate
>     'No files found based on the file pattern %s' % pattern)
> IOError: No files found based on the file pattern 
> c:\windows\temp\tmpwon5_g\mytemp*



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4546) Implement with hot key fanout for combiners

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4546?focusedWorklogId=111732&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111732
 ]

ASF GitHub Bot logged work on BEAM-4546:


Author: ASF GitHub Bot
Created on: 14/Jun/18 00:35
Start Date: 14/Jun/18 00:35
Worklog Time Spent: 10m 
  Work Description: robertwb commented on a change in pull request #5639: 
[BEAM-4546] Multi level combine
URL: https://github.com/apache/beam/pull/5639#discussion_r195274961
 
 

 ##
 File path: sdks/python/apache_beam/transforms/core.py
 ##
 @@ -1337,6 +1372,75 @@ def default_type_hints(self):
 return hints
 
 
+class _CombinePerKeyWithHotKeyFanout(PTransform):
+
+  def __init__(self, combine_fn, fanout):
+    self._fanout_fn = (
+        (lambda key: fanout) if isinstance(fanout, int) else fanout)
+    self._combine_fn = combine_fn
+
+  def expand(self, pcoll):
+
+    from apache_beam.transforms.trigger import AccumulationMode
+    combine_fn = self._combine_fn
+    fanout_fn = self._fanout_fn
+
+    class SplitHotCold(DoFn):
+      counter = 0
+
+      def process(self, element):
+        key, value = element
+        fanout = fanout_fn(key)
+        if fanout <= 1:
+          # Boolean indicates this is not an accumulator.
+          yield pvalue.TaggedOutput('cold', (key, (False, value)))
+        else:
+          self.counter += 1  # Round-robin should be more even than random.
 
 Review comment:
   More even. But this is ambiguous with "even than." I'll reword.
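The point being reworded above, that a shared counter assigns subkeys round-robin and therefore spreads a hot key's values more evenly than random assignment would, can be checked with a small sketch (function names here are illustrative, not Beam APIs):

```python
from collections import Counter

def round_robin_subkeys(n, fanout):
    # Mimics SplitHotCold's counter: element i goes to subkey i % fanout.
    return [i % fanout for i in range(n)]

def spread(subkeys):
    # Difference between the fullest and emptiest subkey bucket.
    counts = Counter(subkeys)
    return max(counts.values()) - min(counts.values())
```

With round-robin the spread never exceeds 1, whereas independent random draws typically drift further apart.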


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 111732)
Time Spent: 1h 10m  (was: 1h)

> Implement with hot key fanout for combiners
> ---
>
> Key: BEAM-4546
> URL: https://issues.apache.org/jira/browse/BEAM-4546
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Ahmet Altay
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4546) Implement with hot key fanout for combiners

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4546?focusedWorklogId=111730&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111730
 ]

ASF GitHub Bot logged work on BEAM-4546:


Author: ASF GitHub Bot
Created on: 14/Jun/18 00:34
Start Date: 14/Jun/18 00:34
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #5639: 
[BEAM-4546] Multi level combine
URL: https://github.com/apache/beam/pull/5639#discussion_r195274480
 
 

 ##
 File path: sdks/python/apache_beam/transforms/core.py
 ##
 @@ -1337,6 +1372,75 @@ def default_type_hints(self):
 return hints
 
 
+class _CombinePerKeyWithHotKeyFanout(PTransform):
+
+  def __init__(self, combine_fn, fanout):
+    self._fanout_fn = (
+        (lambda key: fanout) if isinstance(fanout, int) else fanout)
+    self._combine_fn = combine_fn
+
+  def expand(self, pcoll):
+
+    from apache_beam.transforms.trigger import AccumulationMode
+    combine_fn = self._combine_fn
+    fanout_fn = self._fanout_fn
+
+    class SplitHotCold(DoFn):
+      counter = 0
+
+      def process(self, element):
+        key, value = element
+        fanout = fanout_fn(key)
+        if fanout <= 1:
+          # Boolean indicates this is not an accumulator.
+          yield pvalue.TaggedOutput('cold', (key, (False, value)))
+        else:
+          self.counter += 1  # Round-robin should be more even than random.
 
 Review comment:
   nit; "should be more" more what?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 111730)
Time Spent: 1h  (was: 50m)

> Implement with hot key fanout for combiners
> ---
>
> Key: BEAM-4546
> URL: https://issues.apache.org/jira/browse/BEAM-4546
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Ahmet Altay
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 1h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4546) Implement with hot key fanout for combiners

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4546?focusedWorklogId=111731&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111731
 ]

ASF GitHub Bot logged work on BEAM-4546:


Author: ASF GitHub Bot
Created on: 14/Jun/18 00:34
Start Date: 14/Jun/18 00:34
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #5639: 
[BEAM-4546] Multi level combine
URL: https://github.com/apache/beam/pull/5639#discussion_r195274812
 
 

 ##
 File path: sdks/python/apache_beam/transforms/core.py
 ##
 @@ -1191,6 +1198,34 @@ class CombinePerKey(PTransformWithSideInputs):
   Returns:
 A PObject holding the result of the combine operation.
   """
+  def with_hot_key_fanout(self, fanout):
+"""A per-key combine operation like self but with two levels of 
aggregation.
+
+If a given key is produced by too many upstream bundles, the final
+reduction can become a bottleneck despite partial combining being lifted
+pre-GroupByKey.  In these cases it can be helpful to perform intermediate
+partial aggregations in parallel and then re-group to peform a final
+(per-key) combine.  This is also useful for high-volume keys in streaming
+where combiners are not generally lifted for latency reasons.
+
+Note that a fanout greater than 1 requires the data to be sent through
+two GroupByKeys, and a high fanout can also result in more shuffle data
+due to less per-bundle combining. Setting the fanout for a key at 1 or less
+places values on the "cold key" path that skip the intermeidate level of
 
 Review comment:
   nit; intermeidate -> intermediate
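Outside Beam, the two-level aggregation that the quoted docstring describes can be illustrated with plain Python: hot keys are split round-robin into subkeys, partially combined per subkey, then re-keyed and combined again, while cold keys take the single-level path. Dicts stand in for the two GroupByKey shuffles; all names below are illustrative, not Beam's implementation.

```python
from collections import defaultdict
from itertools import count

def two_level_sum(pairs, fanout_fn):
    counter = count()
    hot = defaultdict(int)   # (key, subkey) -> partial sum (first level)
    cold = defaultdict(int)  # key -> sum (fanout <= 1 skips the extra level)
    for key, value in pairs:
        fanout = fanout_fn(key)
        if fanout <= 1:
            cold[key] += value
        else:
            subkey = next(counter) % fanout  # Round-robin split of the hot key.
            hot[(key, subkey)] += value
    # Second level: strip the subkeys and combine the partial sums per key.
    result = dict(cold)
    for (key, _subkey), partial in hot.items():
        result[key] = result.get(key, 0) + partial
    return result
```

The first loop corresponds to the pre-GroupByKey partial combine; the final loop is the per-key combine that merges the fanned-out partials.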


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 111731)
Time Spent: 1h  (was: 50m)

> Implement with hot key fanout for combiners
> ---
>
> Key: BEAM-4546
> URL: https://issues.apache.org/jira/browse/BEAM-4546
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Ahmet Altay
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 1h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4546) Implement with hot key fanout for combiners

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4546?focusedWorklogId=111728&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111728
 ]

ASF GitHub Bot logged work on BEAM-4546:


Author: ASF GitHub Bot
Created on: 14/Jun/18 00:20
Start Date: 14/Jun/18 00:20
Worklog Time Spent: 10m 
  Work Description: robertwb commented on a change in pull request #5639: 
[BEAM-4546] Multi level combine
URL: https://github.com/apache/beam/pull/5639#discussion_r195273106
 
 

 ##
 File path: sdks/python/apache_beam/transforms/core.py
 ##
 @@ -1337,6 +1350,74 @@ def default_type_hints(self):
 return hints
 
 
+class _CombinePerKeyWithHotKeyFanout(PTransform):
+
+  def __init__(self, combine_fn, fanout):
+    self._fanout_fn = (
+        (lambda key: fanout) if isinstance(fanout, int) else fanout)
+    self._combine_fn = combine_fn
+
+  def expand(self, pcoll):
+
+    from apache_beam.transforms.trigger import AccumulationMode
+    combine_fn = self._combine_fn
+    fanout_fn = self._fanout_fn
+
+    class SplitHotCold(DoFn):
+      counter = 0
+      def process(self, element):
+        key, value = element
+        fanout = fanout_fn(key)
+        if not fanout or fanout is 1:
 
 Review comment:
   I've changed it to require an integer. 
   
   There are definitely cases where some keys are much more frequent than 
others, and can be detected. (E.g. word distributions in natural languages.) 
However, this also accepts a plain integer, which I expect will be used most of 
the time, which is why I simply called it `fanout`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 111728)
Time Spent: 50m  (was: 40m)

> Implement with hot key fanout for combiners
> ---
>
> Key: BEAM-4546
> URL: https://issues.apache.org/jira/browse/BEAM-4546
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Ahmet Altay
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4546) Implement with hot key fanout for combiners

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4546?focusedWorklogId=111726&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111726
 ]

ASF GitHub Bot logged work on BEAM-4546:


Author: ASF GitHub Bot
Created on: 14/Jun/18 00:18
Start Date: 14/Jun/18 00:18
Worklog Time Spent: 10m 
  Work Description: robertwb commented on a change in pull request #5639: 
[BEAM-4546] Multi level combine
URL: https://github.com/apache/beam/pull/5639#discussion_r195272889
 
 

 ##
 File path: sdks/python/apache_beam/transforms/core.py
 ##
 @@ -1191,6 +1198,12 @@ class CombinePerKey(PTransformWithSideInputs):
   Returns:
 A PObject holding the result of the combine operation.
   """
+  def with_hot_key_fanout(self, fanout):
 
 Review comment:
   Done. See below about fanoutfn.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 111726)
Time Spent: 40m  (was: 0.5h)

> Implement with hot key fanout for combiners
> ---
>
> Key: BEAM-4546
> URL: https://issues.apache.org/jira/browse/BEAM-4546
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Ahmet Altay
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4333) Add integration tests for mobile game examples

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4333?focusedWorklogId=111723&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111723
 ]

ASF GitHub Bot logged work on BEAM-4333:


Author: ASF GitHub Bot
Created on: 13/Jun/18 23:59
Start Date: 13/Jun/18 23:59
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #5630: 
[BEAM-4333] Add integration tests for python mobile games
URL: https://github.com/apache/beam/pull/5630#discussion_r195270143
 
 

 ##
 File path: sdks/python/apache_beam/examples/leader_board_it_test.py
 ##
 @@ -0,0 +1,160 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""End-to-end test for the leader board example.
+
+Code: beam/sdks/python/apache_beam/examples/complete/game/leader_board.py
+Usage:
+
+  python setup.py nosetests --test-pipeline-options=" \
+  --runner=TestDataflowRunner \
+  --project=... \
+  --staging_location=gs://... \
+  --temp_location=gs://... \
+  --output=gs://... \
+  --sdk_location=... \
+
+"""
+
+import logging
+import time
+import unittest
+import uuid
+
+from hamcrest.core.core.allof import all_of
+from nose.plugins.attrib import attr
+
+from apache_beam.examples.complete.game import leader_board
+from apache_beam.io.gcp.tests import utils
+from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryMatcher
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing import test_utils
+from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
+from apache_beam.testing.test_pipeline import TestPipeline
+
+
+class LeaderBoardIT(unittest.TestCase):
+
+  # Input event containing user, team, score, processing time, window start.
+  INPUT_EVENT = 'user1,teamA,10,%d,2015-11-02 09:09:28.224'
+  INPUT_TOPIC = 'leader_board_it_input_topic'
+  INPUT_SUB = 'leader_board_it_input_subscription'
+
+  # SHA-1 hash generated from sorted rows reading from BigQuery table
+  DEFAULT_EXPECTED_CHECKSUM = 'de00231fe6730b972c0ff60a99988438911cda53'
+  OUTPUT_DATASET = 'leader_board_it_dataset'
+  OUTPUT_TABLE_USERS = 'leader_board_users'
+  OUTPUT_TABLE_TEAMS = 'leader_board_teams'
+  DEFAULT_INPUT_COUNT = 500
+
+  WAIT_UNTIL_FINISH_DURATION = 10 * 60 * 1000   # in milliseconds
+
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.project = self.test_pipeline.get_option('project')
+    _unique_id = str(uuid.uuid4())
+
+    # Set up PubSub environment.
+    from google.cloud import pubsub
+    self.pubsub_client = pubsub.Client(project=self.project)
+    unique_topic_name = self.INPUT_TOPIC + _unique_id
+    unique_subscrition_name = self.INPUT_SUB + _unique_id
+    self.input_topic = self.pubsub_client.topic(unique_topic_name)
+    self.input_sub = self.input_topic.subscription(unique_subscrition_name)
+
+    self.input_topic.create()
+    test_utils.wait_for_topics_created([self.input_topic])
+    self.input_sub.create()
+
+    # Set up BigQuery environment
+    from google.cloud import bigquery
+    client = bigquery.Client()
+    unique_dataset_name = self.OUTPUT_DATASET + str(int(time.time()))
+    self.dataset = client.dataset(unique_dataset_name, project=self.project)
+    self.dataset.create()
+
+    self._test_timestamp = int(time.time() * 1000)
+
+  def _inject_pubsub_game_events(self, topic, message_count):
+    """Inject game events as test data to PubSub."""
+
+    logging.debug('Injecting %d game events to topic %s',
+                  message_count, topic.full_name)
+
+    for _ in xrange(message_count):
+      topic.publish(self.INPUT_EVENT % self._test_timestamp)
+
+  def _cleanup_pubsub(self):
+    test_utils.cleanup_subscriptions([self.input_sub])
+    test_utils.cleanup_topics([self.input_topic])
+
+  def tearDown(self):
+    self._cleanup_pubsub()
+
+  @attr('IT')
+  def test_leader_board_it(self):
+    state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
+
+    success_condition = 'total_score=5000 LIMIT 1'
+    users_query = ('SELECT total_score FROM [%s:%s.%s] '
+                   'WHERE %s' % (self.project,
+   
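The `DEFAULT_EXPECTED_CHECKSUM` in the diff above is a SHA-1 over the sorted query results. A minimal sketch of how such a checksum can be computed (the helper name and row formatting here are illustrative, not the exact `test_utils` implementation):

```python
import hashlib

def compute_checksum(rows):
    """SHA-1 over the sorted string representations of result rows.

    Sorting first makes the checksum independent of result ordering,
    which is not guaranteed by the query.
    """
    concatenated = ''.join(sorted(str(row) for row in rows))
    return hashlib.sha1(concatenated.encode('utf-8')).hexdigest()
```

Because the rows are sorted before hashing, the same result set always yields the same 40-character hex digest regardless of the order BigQuery returns it in.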

[jira] [Assigned] (BEAM-4281) GrpcDataServiceTest.testMessageReceivedBySingleClientWhenThereAreMultipleClients is flaky

2018-06-13 Thread Eugene Kirpichov (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eugene Kirpichov reassigned BEAM-4281:
--

Assignee: Eugene Kirpichov  (was: Thomas Groh)

> GrpcDataServiceTest.testMessageReceivedBySingleClientWhenThereAreMultipleClients
>  is flaky
> -
>
> Key: BEAM-4281
> URL: https://issues.apache.org/jira/browse/BEAM-4281
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Eugene Kirpichov
>Assignee: Eugene Kirpichov
>Priority: Major
>
> This test failed twice in two unrelated PRs:
>  
> [https://builds.apache.org/job/beam_PreCommit_Java_GradleBuild/5264/testReport/junit/org.apache.beam.runners.fnexecution.data/GrpcDataServiceTest/testMessageReceivedBySingleClientWhenThereAreMultipleClients/]
>  
> [https://builds.apache.org/job/beam_PreCommit_Java_GradleBuild/5248/testReport/junit/org.apache.beam.runners.fnexecution.data/GrpcDataServiceTest/testMessageReceivedBySingleClientWhenThereAreMultipleClients/]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-4281) GrpcDataServiceTest.testMessageReceivedBySingleClientWhenThereAreMultipleClients is flaky

2018-06-13 Thread Eugene Kirpichov (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-4281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511810#comment-16511810
 ] 

Eugene Kirpichov commented on BEAM-4281:


Will be fixed by https://github.com/apache/beam/pull/5585.

> GrpcDataServiceTest.testMessageReceivedBySingleClientWhenThereAreMultipleClients
>  is flaky
> -
>
> Key: BEAM-4281
> URL: https://issues.apache.org/jira/browse/BEAM-4281
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Eugene Kirpichov
>Assignee: Eugene Kirpichov
>Priority: Major
>
> This test failed twice in two unrelated PRs:
>  
> [https://builds.apache.org/job/beam_PreCommit_Java_GradleBuild/5264/testReport/junit/org.apache.beam.runners.fnexecution.data/GrpcDataServiceTest/testMessageReceivedBySingleClientWhenThereAreMultipleClients/]
>  
> [https://builds.apache.org/job/beam_PreCommit_Java_GradleBuild/5248/testReport/junit/org.apache.beam.runners.fnexecution.data/GrpcDataServiceTest/testMessageReceivedBySingleClientWhenThereAreMultipleClients/]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Build failed in Jenkins: beam_PostCommit_Java_ValidatesRunner_Dataflow_Gradle #495

2018-06-13 Thread Apache Jenkins Server
See 


--
[...truncated 54.97 MB...]
	at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:48)
	at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:200)
	at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
	at com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
	at com.google.cloud.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:391)
	at com.google.cloud.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:360)
	at com.google.cloud.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:288)
	at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:134)
	at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:114)
	at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:101)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.AssertionError: OutputSideInputs/ParMultiDo(Anonymous).output: org/hamcrest/Matchers
	at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
	at org.apache.beam.sdk.testing.PAssert$DefaultConcludeFn$DoFnInvoker.invokeProcessElement(Unknown Source)
	at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:185)
	at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:149)
	at com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:323)
	at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:43)
	at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:48)
	at com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:271)
	at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:219)
	at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:69)
	at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:517)
	at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:505)
	at org.apache.beam.sdk.testing.PAssert$GroupedValuesCheckerDoFn.processElement(PAssert.java:1215)
	at org.apache.beam.sdk.testing.PAssert$GroupedValuesCheckerDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
	at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:185)
	at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:149)
	at com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:323)
	at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:43)
	at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:48)
	at com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:271)
	at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:219)
	at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:69)
	at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:517)
	at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:42)
	at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:131)
	at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
	at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:185)
	at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:149)
	at com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:323)
	at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:43)
	at 

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111718=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111718
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 23:28
Start Date: 13/Jun/18 23:28
Worklog Time Spent: 10m 
  Work Description: robertwb commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195264829
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.protobuf.Struct;
+import java.io.IOException;
+import java.util.UUID;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Job Invoker for the {@link FlinkRunner}.
+ */
+public class FlinkJobInvoker implements JobInvoker {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkJobInvoker.class);
+
+  public static FlinkJobInvoker create(ListeningExecutorService 
executorService) {
+return new FlinkJobInvoker(executorService);
+  }
+
+  private final ListeningExecutorService executorService;
+
+  private FlinkJobInvoker(ListeningExecutorService executorService) {
+this.executorService = executorService;
+  }
+
+  @Override
+  public JobInvocation invoke(
+  RunnerApi.Pipeline pipeline, Struct options, @Nullable String 
artifactToken)
+  throws IOException {
+// TODO: How to make Java/Python agree on names of keys and their values?
+LOG.trace("Parsing pipeline options");
+FlinkPipelineOptions flinkOptions = 
PipelineOptionsTranslation.fromProto(options)
+.as(FlinkPipelineOptions.class);
+
+String invocationId = String.format(
+"%s_%s", flinkOptions.getJobName(), UUID.randomUUID().toString());
+LOG.debug("Invoking job {}", invocationId);
+
+// Set Flink Master to [auto] if no option was specified.
+if (flinkOptions.getFlinkMaster() == null) {
+  flinkOptions.setFlinkMaster("[auto]");
+}
+
+flinkOptions.setRunner(null);
 
 Review comment:
   @axelmagn ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 111718)
Time Spent: 3h 10m  (was: 3h)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Priority: Minor
>  Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We 
> need a main entry point that itself spins up the job service (and artifact 
> staging service). The main program itself should be packaged into an uberjar 
> such that it can be run locally or submitted to a Flink deployment via `flink 
> run`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (BEAM-4290) ArtifactStagingService that stages to a distributed filesystem

2018-06-13 Thread Ankur Goenka (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ankur Goenka resolved BEAM-4290.

   Resolution: Fixed
Fix Version/s: 2.6.0

> ArtifactStagingService that stages to a distributed filesystem
> --
>
> Key: BEAM-4290
> URL: https://issues.apache.org/jira/browse/BEAM-4290
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-core
>Reporter: Eugene Kirpichov
>Assignee: Ankur Goenka
>Priority: Major
> Fix For: 2.6.0
>
>  Time Spent: 15h 40m
>  Remaining Estimate: 0h
>
> Using the job's staging directory from PipelineOptions.
> Physical layout on the distributed filesystem is TBD but it should allow for 
> arbitrary filenames and ideally for eventually avoiding uploading artifacts 
> that are already there.
> Handling credentials is TBD.
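One common way to avoid re-uploading artifacts that are already staged (purely an illustrative sketch of the technique mentioned above, not the layout Beam adopted) is to content-address files by hash and skip the copy when the target already exists:

```python
import hashlib
import os
import shutil

def stage_artifact(local_path, staging_dir):
    """Copy local_path into staging_dir under a content-addressed name.

    Returns the staged path. If an identical artifact (same SHA-256)
    is already present, the copy is skipped entirely.
    """
    sha = hashlib.sha256()
    with open(local_path, 'rb') as f:
        # Hash the file in chunks so large artifacts don't load into memory.
        for chunk in iter(lambda: f.read(8192), b''):
            sha.update(chunk)
    staged = os.path.join(staging_dir, sha.hexdigest())
    if not os.path.exists(staged):
        shutil.copyfile(local_path, staged)
    return staged
```

Because the staged filename is derived from the bytes themselves, repeated staging of the same artifact is idempotent; the open question of arbitrary logical filenames would need a separate manifest mapping names to hashes.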



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111719=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111719
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 23:29
Start Date: 13/Jun/18 23:29
Worklog Time Spent: 10m 
  Work Description: robertwb commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195264045
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Invocation of a Flink Job via {@link FlinkRunner}.
+ */
+public class FlinkJobInvocation implements JobInvocation {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkJobInvocation.class);
+
+  public static FlinkJobInvocation create(String id, ListeningExecutorService 
executorService,
+  RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+return new FlinkJobInvocation(id, executorService, pipeline, 
pipelineOptions);
+  }
+
+  private final String id;
+  private final ListeningExecutorService executorService;
+  private final RunnerApi.Pipeline pipeline;
+  private final FlinkPipelineOptions pipelineOptions;
+  private Enum jobState;
+  private List<Consumer<Enum>> stateObservers;
+
+  @Nullable
+  private ListenableFuture<PipelineResult> invocationFuture;
+
+  private FlinkJobInvocation(String id, ListeningExecutorService 
executorService,
+  RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+this.id = id;
+this.executorService = executorService;
+this.pipeline = pipeline;
+this.pipelineOptions = pipelineOptions;
+this.invocationFuture = null;
+this.jobState = Enum.STOPPED;
+this.stateObservers = new ArrayList<>();
+  }
+
+  private PipelineResult runPipeline() throws Exception {
+LOG.trace("Translating pipeline from proto");
+
+MetricsEnvironment.setMetricsSupported(true);
+
+LOG.info("Translating pipeline to Flink program.");
+// Fused pipeline proto.
+RunnerApi.Pipeline fusedPipeline = 
GreedyPipelineFuser.fuse(pipeline).toPipeline();
+JobInfo jobInfo = JobInfo.create(
+id, pipelineOptions.getJobName(), 
PipelineOptionsTranslation.toProto(pipelineOptions));
+final JobExecutionResult result;
+
+if (!pipelineOptions.isStreaming() && 
!hasUnboundedPCollections(fusedPipeline)) {
+  // TODO: Do we need to inspect for unbounded sources before fusing?
+  // batch translation
+  FlinkBatchPortablePipelineTranslator translator =
+  FlinkBatchPortablePipelineTranslator.createTranslator();
+  FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
+  
FlinkBatchPortablePipelineTranslator.createTranslationContext(jobInfo);
+  

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111720=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111720
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 23:29
Start Date: 13/Jun/18 23:29
Worklog Time Spent: 10m 
  Work Description: robertwb commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195263412
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
 ##
 @@ -0,0 +1,186 @@

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111717=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111717
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 23:28
Start Date: 13/Jun/18 23:28
Worklog Time Spent: 10m 
  Work Description: robertwb commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195263336
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
 ##
 @@ -0,0 +1,186 @@

[jira] [Work logged] (BEAM-3906) Get Python Wheel Validation Automated

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3906?focusedWorklogId=111716=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111716
 ]

ASF GitHub Bot logged work on BEAM-3906:


Author: ASF GitHub Bot
Created on: 13/Jun/18 23:11
Start Date: 13/Jun/18 23:11
Worklog Time Spent: 10m 
  Work Description: yifanzou commented on issue #4943: [BEAM-3906] Automate 
Validation Aganist Python Wheel
URL: https://github.com/apache/beam/pull/4943#issuecomment-397116643
 
 
   @tvalentyn To clarify my earlier comment: we can't verify these code 
changes until the release manager creates the 2.5.0 RC* wheel versions.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 111716)
Time Spent: 23h  (was: 22h 50m)

> Get Python Wheel Validation Automated
> -
>
> Key: BEAM-3906
> URL: https://issues.apache.org/jira/browse/BEAM-3906
> Project: Beam
>  Issue Type: Sub-task
>  Components: examples-python, testing
>Reporter: yifan zou
>Assignee: yifan zou
>Priority: Major
>  Time Spent: 23h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-4559) Post-Commit tests stability

2018-06-13 Thread Mikhail Gryzykhin (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mikhail Gryzykhin reassigned BEAM-4559:
---

Assignee: Mikhail Gryzykhin  (was: Jason Kuster)

> Post-Commit tests stability
> ---
>
> Key: BEAM-4559
> URL: https://issues.apache.org/jira/browse/BEAM-4559
> Project: Beam
>  Issue Type: Improvement
>  Components: testing
>Reporter: Mikhail Gryzykhin
>Assignee: Mikhail Gryzykhin
>Priority: Major
>
> [https://docs.google.com/document/d/1sczGwnCvdHiboVajGVdnZL0rfnr7ViXXAebBAf_uQME/edit#heading=h.t71pj6h7rd0w]
> Follow up on design doc specified above and implement required code work.
>  * Split existing post-commit tests jobs to automatically and manually 
> triggered
>  * Add tracking by JIRA bugs for failing test job
>  * Create document describing post-commit failures handling policies
>  * Add tests status badge to PR template
>  * Create dashboard for post-commit tests
>  * Detect and fix flaky java tests
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-2937) Fn API combiner support w/ lifting to PGBK

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2937?focusedWorklogId=111715=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111715
 ]

ASF GitHub Bot logged work on BEAM-2937:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:55
Start Date: 13/Jun/18 22:55
Worklog Time Spent: 10m 
  Work Description: youngoli commented on a change in pull request #5536: 
[BEAM-2937] Add Combine Grouped Values URN.
URL: https://github.com/apache/beam/pull/5536#discussion_r195260622
 
 

 ##
 File path: model/pipeline/src/main/proto/beam_runner_api.proto
 ##
 @@ -228,9 +228,9 @@ message StandardPTransforms {
 // Payload: CombinePayload
 COMBINE_GLOBALLY = 1 [(beam_urn) = "beam:transform:combine_globally:v1"];
 
-// Represents the Combine.groupedValues() operation.
-// If this is produced by an SDK, it is assumed that the SDK understands
-// each of CombineComponents.
+// Represents the Combine Grouped Values transform, as described in the
+// following document:
+// https://s.apache.org/beam-runner-api-combine-model#heading=h.aj86ew4v1wk
 
 Review comment:
   Yeah, that seems like a good idea. Is it as easy as just deleting it and 
changing the numbers on the existing enum values, or would that cause issues?
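For context on the renumbering question above, a toy sketch (plain Python, not Beam or protobuf code) of why proto enum numbers are part of the wire contract: serialized messages carry only the numeric value, so a reader built against the old numbering misinterprets or drops renumbered values:

```python
# Hypothetical "before" and "after" numberings for an enum.
OLD = {'COMBINE_GLOBALLY': 1, 'COMBINE_GROUPED_VALUES': 2}
NEW = {'COMBINE_GLOBALLY': 1, 'COMBINE_GROUPED_VALUES': 3}  # renumbered

def encode(name, numbering):
    # The wire format carries only the number, never the name.
    return numbering[name]

def decode(number, numbering):
    by_number = {v: k for k, v in numbering.items()}
    return by_number.get(number)  # None if the number is unknown

# A new writer paired with an old reader silently loses the value:
assert decode(encode('COMBINE_GROUPED_VALUES', NEW), OLD) is None
```

This is why protobuf style guides recommend reserving deleted numbers rather than reusing or shifting them.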


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 111715)
Time Spent: 2h 40m  (was: 2.5h)

> Fn API combiner support w/ lifting to PGBK
> --
>
> Key: BEAM-2937
> URL: https://issues.apache.org/jira/browse/BEAM-2937
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model
>Reporter: Henning Rohde
>Assignee: Daniel Oliveira
>Priority: Major
>  Labels: portability
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> The FnAPI should support this optimization. Detailed design: 
> https://s.apache.org/beam-runner-api-combine-model
> Once design is ready, expand subtasks similarly to BEAM-2822.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
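[Editorial aside on the renumbering question in the review comment above: in proto-style serialization it is the numeric wire value, not the enum name, that gets persisted, so shifting the numbers of existing enum values silently changes the meaning of data written earlier (protobuf provides the `reserved` keyword for retired values for exactly this reason). A minimal plain-Java sketch of the hazard; the value names and the number 2 are illustrative, not Beam's actual URN numbering:]

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: the persisted wire value is an int, so renumbering
// existing enum values re-interprets old payloads under the new scheme.
public class EnumRenumberDemo {
  // "v1" numbering, before deleting a value.
  static final Map<Integer, String> V1 = new HashMap<>();
  // "v2" numbering, after deleting a value and shifting the rest down.
  static final Map<Integer, String> V2 = new HashMap<>();

  static {
    V1.put(1, "COMBINE_GLOBALLY");
    V1.put(2, "COMBINE_GROUPED_VALUES");
    V2.put(1, "COMBINE_GROUPED_VALUES"); // shifted: 2 -> 1
  }

  public static void main(String[] args) {
    int persisted = 1; // written by a v1 producer, meaning COMBINE_GLOBALLY
    // A v2 consumer now decodes the same int as a different transform.
    System.out.println(V1.get(persisted).equals(V2.get(persisted))); // prints "false"
  }
}
```

[This is why "just deleting it and changing the numbers" is only safe if no serialized pipelines or cross-version readers exist.]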


[jira] [Work logged] (BEAM-2080) Add custom maven enforcer rules to catch banned classes and dependencies

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2080?focusedWorklogId=111712&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111712
 ]

ASF GitHub Bot logged work on BEAM-2080:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:49
Start Date: 13/Jun/18 22:49
Worklog Time Spent: 10m 
  Work Description: davorbonaci closed pull request #2688: [BEAM-2080]: Add 
a custom enforcer rule to check for banned classes.
URL: https://github.com/apache/beam/pull/2688
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/.test-infra/maven/enforcer-rules/pom.xml 
b/.test-infra/maven/enforcer-rules/pom.xml
new file mode 100644
index 000..af85a773e5f
--- /dev/null
+++ b/.test-infra/maven/enforcer-rules/pom.xml
@@ -0,0 +1,93 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>beam-parent</artifactId>
+    <groupId>org.apache.beam</groupId>
+    <version>2.1.0-SNAPSHOT</version>
+    <relativePath>../../../pom.xml</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>beam-maven-enforcer-rules</artifactId>
+  <name>Apache Beam :: Maven Enforcer Rules</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.maven.enforcer</groupId>
+      <artifactId>enforcer-api</artifactId>
+      <version>1.4.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.maven</groupId>
+      <artifactId>maven-project</artifactId>
+      <version>2.0.11</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.maven</groupId>
+      <artifactId>maven-artifact</artifactId>
+      <version>3.5.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.maven</groupId>
+      <artifactId>maven-plugin-api</artifactId>
+      <version>2.0.11</version>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.plexus</groupId>
+      <artifactId>plexus-container-default</artifactId>
+      <version>1.0-alpha-9</version>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.maven.plugins</groupId>
+      <artifactId>maven-shade-plugin</artifactId>
+      <version>3.0.0</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-enforcer-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>enforce</id>
+            <phase>none</phase>
+          </execution>
+          <execution>
+            <id>enforce-banned-dependencies</id>
+            <phase>none</phase>
+          </execution>
+          <execution>
+            <id>enforce-banned-classes</id>
+            <phase>none</phase>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
\ No newline at end of file
diff --git 
a/.test-infra/maven/enforcer-rules/src/main/java/org/apache/beam/maven/enforcer/rules/BannedClasses.java
 
b/.test-infra/maven/enforcer-rules/src/main/java/org/apache/beam/maven/enforcer/rules/BannedClasses.java
new file mode 100644
index 000..f6cbdd70f46
--- /dev/null
+++ 
b/.test-infra/maven/enforcer-rules/src/main/java/org/apache/beam/maven/enforcer/rules/BannedClasses.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.maven.enforcer.rules;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
+import org.apache.maven.artifact.Artifact;
+import org.apache.maven.enforcer.rule.api.EnforcerRule;
+import org.apache.maven.enforcer.rule.api.EnforcerRuleException;
+import org.apache.maven.enforcer.rule.api.EnforcerRuleHelper;
+import org.apache.maven.plugin.logging.Log;
+import org.apache.maven.plugins.shade.filter.SimpleFilter;
+import org.apache.maven.project.MavenProject;
+import 
org.codehaus.plexus.component.configurator.expression.ExpressionEvaluationException;
+
+/**
+ * A custom {@link EnforcerRule} that looks at the artifact jar to ensure it 
does not contain any
+ * of the banned classes provided as input to this rule.
+ *
+ * <p>The banned class is specified as a relative path in the artifact jar. A 
fully qualified class
+ * file path or a wildcard path (ending with / or **) is allowed. For example:
+ * <ul>
+ *   <li>a/b/c/d.class</li>
+ *   <li>m/n/**</li>
+ *   <li>x/y/z/</li>
+ * </ul>
+ *
+ * <p>Note: This rule is specifically useful for uber jars.
+ */
+public class BannedClasses implements EnforcerRule {
+
+  private String[] excludes;
+
+  @Override
+  public void execute(EnforcerRuleHelper helper) throws EnforcerRuleException {
+Log log = 
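[The rule's path-pattern semantics described in the javadoc above (an exact `.class` path, a trailing `/` prefix, or a trailing `**` prefix) can be sketched with plain string checks. This is an illustrative stand-in, not the PR's actual implementation, which delegates matching to maven-shade's `SimpleFilter`:]

```java
// Hypothetical matcher mirroring the BannedClasses javadoc:
//   a/b/c/d.class  -> exact jar-entry path
//   m/n/**         -> any entry under the m/n/ prefix
//   x/y/z/         -> any entry under the x/y/z/ prefix
public class BannedClassMatcher {
  static boolean isBanned(String jarEntry, String pattern) {
    if (pattern.endsWith("**")) {
      // Wildcard: strip the "**" and match on the remaining prefix.
      return jarEntry.startsWith(pattern.substring(0, pattern.length() - 2));
    }
    if (pattern.endsWith("/")) {
      // Directory pattern: prefix match.
      return jarEntry.startsWith(pattern);
    }
    // Fully qualified class file path: exact match.
    return jarEntry.equals(pattern);
  }

  public static void main(String[] args) {
    System.out.println(isBanned("a/b/c/d.class", "a/b/c/d.class")); // prints "true"
    System.out.println(isBanned("m/n/o/P.class", "m/n/**"));        // prints "true"
    System.out.println(isBanned("x/q/Z.class", "x/y/z/"));          // prints "false"
  }
}
```

[In the real rule this check would run over every `JarEntry` of the built artifact and raise an `EnforcerRuleException` on the first hit.]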

[jira] [Updated] (BEAM-4559) Post-Commit tests stability

2018-06-13 Thread Mikhail Gryzykhin (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mikhail Gryzykhin updated BEAM-4559:

Description: 
[https://docs.google.com/document/d/1sczGwnCvdHiboVajGVdnZL0rfnr7ViXXAebBAf_uQME/edit#heading=h.t71pj6h7rd0w]

Follow up on the design doc above and implement the required code work.
 * Split existing post-commit test jobs into automatically and manually triggered jobs
 * Add tracking of failing test jobs via JIRA bugs
 * Create a document describing post-commit failure handling policies
 * Add a test status badge to the PR template
 * Create a dashboard for post-commit tests
 * Detect and fix flaky Java tests

 

  was:
[https://docs.google.com/document/d/1sczGwnCvdHiboVajGVdnZL0rfnr7ViXXAebBAf_uQME/edit#heading=h.t71pj6h7rd0w]

Follow up on the design doc above and implement the required code work.
 * Split existing post-commit test jobs into automatically and manually triggered jobs
 * Add tracking of failing test jobs via JIRA bugs
 * Create a document describing post-commit failure handling policies
 * Add a test status badge to the PR template
 * Create a dashboard for post-commit tests

 


> Post-Commit tests stability
> ---
>
> Key: BEAM-4559
> URL: https://issues.apache.org/jira/browse/BEAM-4559
> Project: Beam
>  Issue Type: Improvement
>  Components: testing
>Reporter: Mikhail Gryzykhin
>Assignee: Jason Kuster
>Priority: Major
>
> [https://docs.google.com/document/d/1sczGwnCvdHiboVajGVdnZL0rfnr7ViXXAebBAf_uQME/edit#heading=h.t71pj6h7rd0w]
> Follow up on the design doc above and implement the required code work.
>  * Split existing post-commit test jobs into automatically and manually 
> triggered jobs
>  * Add tracking of failing test jobs via JIRA bugs
>  * Create a document describing post-commit failure handling policies
>  * Add a test status badge to the PR template
>  * Create a dashboard for post-commit tests
>  * Detect and fix flaky Java tests
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (BEAM-4559) Post-Commit tests stability

2018-06-13 Thread Mikhail Gryzykhin (JIRA)
Mikhail Gryzykhin created BEAM-4559:
---

 Summary: Post-Commit tests stability
 Key: BEAM-4559
 URL: https://issues.apache.org/jira/browse/BEAM-4559
 Project: Beam
  Issue Type: Improvement
  Components: testing
Reporter: Mikhail Gryzykhin
Assignee: Jason Kuster


[https://docs.google.com/document/d/1sczGwnCvdHiboVajGVdnZL0rfnr7ViXXAebBAf_uQME/edit#heading=h.t71pj6h7rd0w]

Follow up on the design doc above and implement the required code work.
 * Split existing post-commit test jobs into automatically and manually triggered jobs
 * Add tracking of failing test jobs via JIRA bugs
 * Create a document describing post-commit failure handling policies
 * Add a test status badge to the PR template
 * Create a dashboard for post-commit tests

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111711&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111711
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:42
Start Date: 13/Jun/18 22:42
Worklog Time Spent: 10m 
  Work Description: robertwb commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195258050
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Invocation of a Flink Job via {@link FlinkRunner}.
+ */
+public class FlinkJobInvocation implements JobInvocation {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkJobInvocation.class);
+
+  public static FlinkJobInvocation create(String id, ListeningExecutorService 
executorService,
+  RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+return new FlinkJobInvocation(id, executorService, pipeline, 
pipelineOptions);
+  }
+
+  private final String id;
+  private final ListeningExecutorService executorService;
+  private final RunnerApi.Pipeline pipeline;
+  private final FlinkPipelineOptions pipelineOptions;
+  private Enum jobState;
+  private List<Consumer<Enum>> stateObservers;
+
+  @Nullable
+  private ListenableFuture<PipelineResult> invocationFuture;
+
+  private FlinkJobInvocation(String id, ListeningExecutorService 
executorService,
+  RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+this.id = id;
+this.executorService = executorService;
+this.pipeline = pipeline;
+this.pipelineOptions = pipelineOptions;
+this.invocationFuture = null;
+this.jobState = Enum.STOPPED;
+this.stateObservers = new ArrayList<>();
+  }
+
+  private PipelineResult runPipeline() throws Exception {
+LOG.trace("Translating pipeline from proto");
 
 Review comment:
   Done.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 111711)
Time Spent: 2h 50m  (was: 2h 40m)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Priority: Minor
>  Time Spent: 2h 50m
>  
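[The `FlinkJobInvocation` snippet reviewed in this thread pairs an asynchronously running pipeline with a list of state observers. The same shape can be sketched with plain JDK types; this is an assumption-laden stand-in using `CompletableFuture` where the real code uses Guava's `ListenableFuture` plus `Futures` callbacks, and all names here are illustrative:]

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Sketch of the observer pattern in the reviewed code: the invocation keeps a
// job state and pushes every transition to registered observers.
public class JobStateDemo {
  enum State { STOPPED, RUNNING, DONE }

  private State state = State.STOPPED;
  private final List<Consumer<State>> observers = new ArrayList<>();

  void addObserver(Consumer<State> o) {
    observers.add(o);
  }

  private void setState(State s) {
    state = s;
    // Mirror each jobState update out to the observers.
    observers.forEach(o -> o.accept(s));
  }

  CompletableFuture<String> start() {
    setState(State.RUNNING);
    // Run the (stand-in) pipeline off-thread; report terminal state on completion.
    return CompletableFuture.supplyAsync(() -> "result")
        .whenComplete((r, t) -> setState(t == null ? State.DONE : State.STOPPED));
  }

  public static void main(String[] args) {
    JobStateDemo job = new JobStateDemo();
    List<State> seen = new ArrayList<>();
    job.addObserver(seen::add);
    job.start().join(); // join happens-after the whenComplete callback
    System.out.println(seen); // prints "[RUNNING, DONE]"
  }
}
```

[The design point is that callers never poll `jobState`; they subscribe once and receive every transition, which is what the gRPC job service needs to stream `JobState` messages.]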

[jira] [Updated] (BEAM-2855) Implement a python version of the nexmark queries

2018-06-13 Thread JIRA


 [ 
https://issues.apache.org/jira/browse/BEAM-2855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

María GH updated BEAM-2855:
---
Component/s: examples-nexmark

> Implement a python version of the nexmark queries
> -
>
> Key: BEAM-2855
> URL: https://issues.apache.org/jira/browse/BEAM-2855
> Project: Beam
>  Issue Type: Improvement
>  Components: examples-nexmark, sdk-py-core, testing
>Reporter: Ismaël Mejía
>Assignee: María GH
>Priority: Minor
>  Labels: newbie, nexmark, starter
>
> Currently we have a Java-only implementation of Nexmark. A Python-based 
> implementation would be nice to have, both to validate the direct and Dataflow 
> runners and to validate the new support for multiple SDKs in multiple 
> runners via the Runner/Fn API.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-4558) org.apache.beam.runners.direct.portable.ReferenceRunnerTest » pipelineExecution failure in 2.5.0 release branch

2018-06-13 Thread Pablo Estrada (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pablo Estrada reassigned BEAM-4558:
---

Assignee: Pablo Estrada  (was: Thomas Groh)

> org.apache.beam.runners.direct.portable.ReferenceRunnerTest » 
> pipelineExecution failure in 2.5.0 release branch
> ---
>
> Key: BEAM-4558
> URL: https://issues.apache.org/jira/browse/BEAM-4558
> Project: Beam
>  Issue Type: Bug
>  Components: runner-direct
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Blocker
>
> Running in release branch: 
> https://builds.apache.org/job/beam_PreCommit_Java_GradleBuild/6441/consoleFull
> Failure is here: 
> https://scans.gradle.com/s/xwc6k7jffxj3w/tests/jqhvlvf72f7pg-6uohttmt2cl3u



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4536) Python SDK: Pubsub reading with_attributes broken for Dataflow

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4536?focusedWorklogId=111710&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111710
 ]

ASF GitHub Bot logged work on BEAM-4536:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:28
Start Date: 13/Jun/18 22:28
Worklog Time Spent: 10m 
  Work Description: pabloem commented on issue #5607: [BEAM-4536] Remove 
with_attributes keyword from ReadFromPubSub.
URL: https://github.com/apache/beam/pull/5607#issuecomment-397100509
 
 
   I've filed https://issues.apache.org/jira/projects/BEAM/issues/BEAM-4558
   I marked it as a release blocker, but is it? @swegner 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 111710)
Time Spent: 2h 40m  (was: 2.5h)

> Python SDK: Pubsub reading with_attributes broken for Dataflow
> --
>
> Key: BEAM-4536
> URL: https://issues.apache.org/jira/browse/BEAM-4536
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.5.0
>Reporter: Udi Meiri
>Assignee: Udi Meiri
>Priority: Blocker
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> Using 
> [ReadFromPubsub|https://github.com/apache/beam/blob/e30e0c807321934e862358e1e3be32dc74374aeb/sdks/python/apache_beam/io/gcp/pubsub.py#L106](with_attributes=True)
>  will fail on Dataflow.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111701&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111701
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195211177
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Invocation of a Flink Job via {@link FlinkRunner}.
+ */
+public class FlinkJobInvocation implements JobInvocation {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkJobInvocation.class);
+
+  public static FlinkJobInvocation create(String id, ListeningExecutorService 
executorService,
+  RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+return new FlinkJobInvocation(id, executorService, pipeline, 
pipelineOptions);
+  }
+
+  private final String id;
+  private final ListeningExecutorService executorService;
+  private final RunnerApi.Pipeline pipeline;
+  private final FlinkPipelineOptions pipelineOptions;
+  private Enum jobState;
+  private List<Consumer<Enum>> stateObservers;
+
+  @Nullable
+  private ListenableFuture<PipelineResult> invocationFuture;
+
+  private FlinkJobInvocation(String id, ListeningExecutorService 
executorService,
+  RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+this.id = id;
+this.executorService = executorService;
+this.pipeline = pipeline;
+this.pipelineOptions = pipelineOptions;
+this.invocationFuture = null;
+this.jobState = Enum.STOPPED;
+this.stateObservers = new ArrayList<>();
+  }
+
+  private PipelineResult runPipeline() throws Exception {
+LOG.trace("Translating pipeline from proto");
+
+MetricsEnvironment.setMetricsSupported(true);
+
+LOG.info("Translating pipeline to Flink program.");
+// Fused pipeline proto.
+RunnerApi.Pipeline fusedPipeline = 
GreedyPipelineFuser.fuse(pipeline).toPipeline();
+JobInfo jobInfo = JobInfo.create(
+id, pipelineOptions.getJobName(), 
PipelineOptionsTranslation.toProto(pipelineOptions));
+final JobExecutionResult result;
+
+if (!pipelineOptions.isStreaming() && 
!hasUnboundedPCollections(fusedPipeline)) {
+  // TODO: Do we need to inspect for unbounded sources before fusing?
+  // batch translation
+  FlinkBatchPortablePipelineTranslator translator =
+  FlinkBatchPortablePipelineTranslator.createTranslator();
+  FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
+  
FlinkBatchPortablePipelineTranslator.createTranslationContext(jobInfo);
+  

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111688&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111688
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195215990
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
 ##
 @@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.ServerFactory;
+import 
org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService;
+import org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
+import org.kohsuke.args4j.CmdLineException;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/** Driver program that starts a job server. */
+public class FlinkJobServerDriver implements Runnable {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkJobServerDriver.class);
+
+  private static class ServerConfiguration {
+@Option(
+name = "--job-host",
+required = true,
+usage = "The job server host string"
+)
+private String host = "";
+
+@Option(
+name = "--artifacts-dir",
+usage = "The location to store staged artifact files"
+)
+private String artifactStagingPath = "/tmp/beam-artifact-staging";
+  }
+
+  public static void main(String[] args) {
+ServerConfiguration configuration = new ServerConfiguration();
+CmdLineParser parser = new CmdLineParser(configuration);
+try {
+  parser.parseArgument(args);
+} catch (CmdLineException e) {
+  e.printStackTrace(System.err);
 
 Review comment:
   Logger?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 111688)
Time Spent: 50m  (was: 40m)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Priority: Minor
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We 
> need a main entry point that itself spins up the job service (and artifact 
> staging service). The main program itself should be packaged into an uberjar 
> such that it can be run locally or submitted to a Flink deployment via `flink 
> run`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
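[On the reviewer's `Logger?` suggestion above: routing the parse failure through the class logger instead of `e.printStackTrace(System.err)` keeps driver diagnostics in a single configurable stream. A plain-JDK sketch of the idea; the names are illustrative, and the real driver uses args4j and slf4j rather than hand-rolled parsing and `java.util.logging`:]

```java
import java.util.logging.Logger;

// Illustrative stand-in for the FlinkJobServerDriver argument handling:
// fail fast on a missing required flag and report it via the logger.
public class ArgParseDemo {
  private static final Logger LOG = Logger.getLogger(ArgParseDemo.class.getName());

  static String parseJobHost(String[] args) {
    for (int i = 0; i < args.length - 1; i++) {
      if ("--job-host".equals(args[i])) {
        return args[i + 1];
      }
    }
    throw new IllegalArgumentException("--job-host is required");
  }

  public static void main(String[] args) {
    try {
      System.out.println(parseJobHost(new String[] {"--job-host", "localhost:8099"}));
    } catch (IllegalArgumentException e) {
      // Reviewer's point: prefer the logger over printStackTrace(System.err).
      LOG.severe(e.getMessage());
    }
  }
}
```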


[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111694&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111694
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195206698
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Invocation of a Flink Job via {@link FlinkRunner}.
+ */
+public class FlinkJobInvocation implements JobInvocation {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkJobInvocation.class);
+
+  public static FlinkJobInvocation create(String id, ListeningExecutorService 
executorService,
+  RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+return new FlinkJobInvocation(id, executorService, pipeline, 
pipelineOptions);
+  }
+
+  private final String id;
+  private final ListeningExecutorService executorService;
+  private final RunnerApi.Pipeline pipeline;
+  private final FlinkPipelineOptions pipelineOptions;
+  private Enum jobState;
+  private List<Consumer<Enum>> stateObservers;
+
+  @Nullable
+  private ListenableFuture<PipelineResult> invocationFuture;
+
+  private FlinkJobInvocation(String id, ListeningExecutorService 
executorService,
+  RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+this.id = id;
+this.executorService = executorService;
+this.pipeline = pipeline;
+this.pipelineOptions = pipelineOptions;
+this.invocationFuture = null;
+this.jobState = Enum.STOPPED;
+this.stateObservers = new ArrayList<>();
+  }
+
+  private PipelineResult runPipeline() throws Exception {
+LOG.trace("Translating pipeline from proto");
+
+MetricsEnvironment.setMetricsSupported(true);
+
+LOG.info("Translating pipeline to Flink program.");
+// Fused pipeline proto.
+RunnerApi.Pipeline fusedPipeline = 
GreedyPipelineFuser.fuse(pipeline).toPipeline();
+JobInfo jobInfo = JobInfo.create(
+id, pipelineOptions.getJobName(), 
PipelineOptionsTranslation.toProto(pipelineOptions));
+final JobExecutionResult result;
+
+if (!pipelineOptions.isStreaming() && 
!hasUnboundedPCollections(fusedPipeline)) {
+  // TODO: Do we need to inspect for unbounded sources before fusing?
+  // batch translation
+  FlinkBatchPortablePipelineTranslator translator =
+  FlinkBatchPortablePipelineTranslator.createTranslator();
+  FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
+  
FlinkBatchPortablePipelineTranslator.createTranslationContext(jobInfo);
+  

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111691=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111691
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195213110
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.protobuf.Struct;
+import java.io.IOException;
+import java.util.UUID;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Job Invoker for the {@link FlinkRunner}.
+ */
+public class FlinkJobInvoker implements JobInvoker {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkJobInvoker.class);
+
+  public static FlinkJobInvoker create(ListeningExecutorService 
executorService) {
+return new FlinkJobInvoker(executorService);
+  }
+
+  private final ListeningExecutorService executorService;
+
+  private FlinkJobInvoker(ListeningExecutorService executorService) {
+this.executorService = executorService;
+  }
+
+  @Override
+  public JobInvocation invoke(
+  RunnerApi.Pipeline pipeline, Struct options, @Nullable String 
artifactToken)
+  throws IOException {
+// TODO: How to make Java/Python agree on names of keys and their values?
+LOG.trace("Parsing pipeline options");
+FlinkPipelineOptions flinkOptions = 
PipelineOptionsTranslation.fromProto(options)
+.as(FlinkPipelineOptions.class);
+
+String invocationId = String.format(
+"%s_%s", flinkOptions.getJobName(), UUID.randomUUID().toString());
+LOG.debug("Invoking job {}", invocationId);
 
 Review comment:
   Info?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 111691)
Time Spent: 1h 20m  (was: 1h 10m)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Priority: Minor
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We 
> need a main entry point that itself spins up the job service (and artifact 
> staging service). The main program itself should be packaged into an uberjar 
> such that it can be run locally or submitted to a Flink deployment via `flink 
> run`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111704&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111704
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195251040
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
 ##
 @@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.ServerFactory;
+import 
org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService;
+import org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
+import org.kohsuke.args4j.CmdLineException;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/** Driver program that starts a job server. */
+public class FlinkJobServerDriver implements Runnable {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkJobServerDriver.class);
+
+  private static class ServerConfiguration {
+@Option(
+name = "--job-host",
+required = true,
+usage = "The job server host string"
+)
+private String host = "";
+
+@Option(
+name = "--artifacts-dir",
+usage = "The location to store staged artifact files"
+)
+private String artifactStagingPath = "/tmp/beam-artifact-staging";
+  }
+
+  public static void main(String[] args) {
+ServerConfiguration configuration = new ServerConfiguration();
+CmdLineParser parser = new CmdLineParser(configuration);
+try {
+  parser.parseArgument(args);
+} catch (CmdLineException e) {
+  e.printStackTrace(System.err);
+  printUsage(parser);
+  return;
+}
+FlinkJobServerDriver driver = fromConfig(configuration);
+driver.run();
+  }
+
+  private static void printUsage(CmdLineParser parser) {
+System.err.println(
+String.format(
+"Usage: java %s arguments...", 
FlinkJobServerDriver.class.getSimpleName()));
+parser.printUsage(System.err);
+System.err.println();
+  }
+
+  public static FlinkJobServerDriver fromConfig(ServerConfiguration 
configuration) {
+ThreadFactory threadFactory =
+new 
ThreadFactoryBuilder().setNameFormat("flink-runner-job-server").setDaemon(true).build();
+ListeningExecutorService executor =
+
MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(threadFactory));
+ServerFactory serverFactory = ServerFactory.createDefault();
+return create(configuration, executor, serverFactory);
+  }
+
+  public static FlinkJobServerDriver create(
+  ServerConfiguration configuration,
+  ListeningExecutorService executor,
+  ServerFactory serverFactory) {
+return new FlinkJobServerDriver(configuration, executor, serverFactory);
+  }
+
+  private final ListeningExecutorService executor;
 
 Review comment:
   We should move these variables to the top.




Issue Time 

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111695&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111695
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195254596
 
 

 ##
 File path: 
runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactSourceTest.java
 ##
 @@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.artifact;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
+import com.google.protobuf.ByteString;
+import io.grpc.stub.StreamObserver;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for BeamFileSystemArtifactSource.
+ */
+@RunWith(JUnit4.class) public class BeamFileSystemArtifactSourceTest {
+
+  BeamFileSystemArtifactStagingService stagingService = new 
BeamFileSystemArtifactStagingService();
+
+  @Rule public TemporaryFolder stagingDir = new TemporaryFolder();
+
+  @Test public void testStagingService() throws Exception {
+String stagingSession = "stagingSession";
+String stagingSessionToken = BeamFileSystemArtifactStagingService
+.generateStagingSessionToken(stagingSession, 
stagingDir.newFolder().getPath());
+List<ArtifactApi.ArtifactMetadata> metadata = new ArrayList<>();
+
+
metadata.add(ArtifactApi.ArtifactMetadata.newBuilder().setName("file1").build());
+putArtifactContents(stagingSessionToken, "first", "file1");
+
+
metadata.add(ArtifactApi.ArtifactMetadata.newBuilder().setName("file2").build());
+putArtifactContents(stagingSessionToken, "second", "file2");
+
+String stagingToken = commitManifest(stagingSessionToken, metadata);
+
+BeamFileSystemArtifactSource artifactSource = new 
BeamFileSystemArtifactSource(stagingToken);
+Assert.assertEquals("first", getArtifactContents(artifactSource, "file1"));
+Assert.assertEquals("second", getArtifactContents(artifactSource, 
"file2"));
+Assert.assertThat(artifactSource.getManifest().getArtifactList(),
+containsInAnyOrder(metadata.toArray(new 
ArtifactApi.ArtifactMetadata[0])));
+  }
+
+  private String commitManifest(String stagingSessionToken,
+  List<ArtifactApi.ArtifactMetadata> artifacts) {
+String[] stagingTokenHolder = new String[1];
+stagingService.commitManifest(
+
ArtifactApi.CommitManifestRequest.newBuilder().setStagingSessionToken(stagingSessionToken)
+
.setManifest(ArtifactApi.Manifest.newBuilder().addAllArtifact(artifacts)).build(),
+new StreamObserver<ArtifactApi.CommitManifestResponse>() {
+
+  @Override public void onNext(ArtifactApi.CommitManifestResponse 
commitManifestResponse) {
+stagingTokenHolder[0] = commitManifestResponse.getRetrievalToken();
+  }
+
+  @Override public void onError(Throwable throwable) {
+throw new RuntimeException(throwable);
+  }
+
+  @Override public void onCompleted() {
+  }
+});
+
+return stagingTokenHolder[0];
 
 Review comment:
   We should wait for the commit call to finish before returning.
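
The wait the comment asks for — blocking until the observer's `onCompleted` callback has fired before reading the holder — can be done with a `CountDownLatch`. Below is a minimal, self-contained sketch of that pattern; the `Observer` interface and the `asyncCommit` helper are hypothetical stand-ins for gRPC's `StreamObserver` and the `commitManifest` call, used here only so the example runs on its own:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CommitWait {
  // Hypothetical stand-in for io.grpc.stub.StreamObserver.
  interface Observer<T> {
    void onNext(T value);
    void onCompleted();
  }

  // Simulates an asynchronous commit that invokes the observer
  // from another thread, as a gRPC service call might.
  static void asyncCommit(String response, Observer<String> observer) {
    new Thread(() -> {
      observer.onNext(response);
      observer.onCompleted();
    }).start();
  }

  static String commitAndWait() {
    CountDownLatch done = new CountDownLatch(1);
    String[] tokenHolder = new String[1];
    asyncCommit("token-123", new Observer<String>() {
      @Override public void onNext(String value) {
        tokenHolder[0] = value;
      }
      @Override public void onCompleted() {
        done.countDown(); // signal that the commit has finished
      }
    });
    try {
      // Block until onCompleted fires, so the holder is populated
      // before it is read and returned.
      if (!done.await(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("commit did not complete in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
    return tokenHolder[0];
  }

  public static void main(String[] args) {
    System.out.println(commitAndWait());
  }
}
```

In the test itself the same latch would wrap the `stagingService.commitManifest(...)` call, counting down in both `onCompleted` and `onError` so that a failed commit cannot hang the wait.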




Issue Time Tracking
---

Worklog Id: (was: 111695)
Time Spent: 1h 50m  (was: 1h 40m)

> Portable Flink runner 

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111705&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111705
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195213503
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.protobuf.Struct;
+import java.io.IOException;
+import java.util.UUID;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Job Invoker for the {@link FlinkRunner}.
+ */
+public class FlinkJobInvoker implements JobInvoker {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkJobInvoker.class);
+
+  public static FlinkJobInvoker create(ListeningExecutorService 
executorService) {
+return new FlinkJobInvoker(executorService);
+  }
+
+  private final ListeningExecutorService executorService;
+
+  private FlinkJobInvoker(ListeningExecutorService executorService) {
+this.executorService = executorService;
+  }
+
+  @Override
+  public JobInvocation invoke(
+  RunnerApi.Pipeline pipeline, Struct options, @Nullable String 
artifactToken)
+  throws IOException {
+// TODO: How to make Java/Python agree on names of keys and their values?
+LOG.trace("Parsing pipeline options");
 
 Review comment:
  Should we log some info here? If possible, with some pipeline identifier.




Issue Time Tracking
---

Worklog Id: (was: 111705)
Time Spent: 2h 40m  (was: 2.5h)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Priority: Minor
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We 
> need a main entry point that itself spins up the job service (and artifact 
> staging service). The main program itself should be packaged into an uberjar 
> such that it can be run locally or submitted to a Flink deployment via `flink 
> run`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111693&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111693
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195205949
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Invocation of a Flink Job via {@link FlinkRunner}.
+ */
+public class FlinkJobInvocation implements JobInvocation {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkJobInvocation.class);
+
+  public static FlinkJobInvocation create(String id, ListeningExecutorService 
executorService,
+  RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+return new FlinkJobInvocation(id, executorService, pipeline, 
pipelineOptions);
+  }
+
+  private final String id;
+  private final ListeningExecutorService executorService;
+  private final RunnerApi.Pipeline pipeline;
+  private final FlinkPipelineOptions pipelineOptions;
+  private Enum jobState;
+  private List<Consumer<Enum>> stateObservers;
+
+  @Nullable
+  private ListenableFuture invocationFuture;
+
+  private FlinkJobInvocation(String id, ListeningExecutorService 
executorService,
+  RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+this.id = id;
+this.executorService = executorService;
+this.pipeline = pipeline;
+this.pipelineOptions = pipelineOptions;
+this.invocationFuture = null;
+this.jobState = Enum.STOPPED;
+this.stateObservers = new ArrayList<>();
+  }
+
+  private PipelineResult runPipeline() throws Exception {
+LOG.trace("Translating pipeline from proto");
+
+MetricsEnvironment.setMetricsSupported(true);
+
+LOG.info("Translating pipeline to Flink program.");
+// Fused pipeline proto.
+RunnerApi.Pipeline fusedPipeline = 
GreedyPipelineFuser.fuse(pipeline).toPipeline();
+JobInfo jobInfo = JobInfo.create(
+id, pipelineOptions.getJobName(), 
PipelineOptionsTranslation.toProto(pipelineOptions));
+final JobExecutionResult result;
+
+if (!pipelineOptions.isStreaming() && 
!hasUnboundedPCollections(fusedPipeline)) {
+  // TODO: Do we need to inspect for unbounded sources before fusing?
+  // batch translation
+  FlinkBatchPortablePipelineTranslator translator =
+  FlinkBatchPortablePipelineTranslator.createTranslator();
+  FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
 
 Review comment:
   Should we only create the context in the if-else block and do the remaining 

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111703&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111703
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195209989
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Invocation of a Flink Job via {@link FlinkRunner}.
+ */
+public class FlinkJobInvocation implements JobInvocation {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkJobInvocation.class);
+
+  public static FlinkJobInvocation create(String id, ListeningExecutorService 
executorService,
+  RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+return new FlinkJobInvocation(id, executorService, pipeline, 
pipelineOptions);
+  }
+
+  private final String id;
+  private final ListeningExecutorService executorService;
+  private final RunnerApi.Pipeline pipeline;
+  private final FlinkPipelineOptions pipelineOptions;
+  private Enum jobState;
+  private List<Consumer<Enum>> stateObservers;
+
+  @Nullable
+  private ListenableFuture invocationFuture;
+
+  private FlinkJobInvocation(String id, ListeningExecutorService 
executorService,
+  RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+this.id = id;
+this.executorService = executorService;
+this.pipeline = pipeline;
+this.pipelineOptions = pipelineOptions;
+this.invocationFuture = null;
+this.jobState = Enum.STOPPED;
+this.stateObservers = new ArrayList<>();
+  }
+
+  private PipelineResult runPipeline() throws Exception {
+LOG.trace("Translating pipeline from proto");
+
+MetricsEnvironment.setMetricsSupported(true);
+
+LOG.info("Translating pipeline to Flink program.");
+// Fused pipeline proto.
+RunnerApi.Pipeline fusedPipeline = 
GreedyPipelineFuser.fuse(pipeline).toPipeline();
+JobInfo jobInfo = JobInfo.create(
+id, pipelineOptions.getJobName(), 
PipelineOptionsTranslation.toProto(pipelineOptions));
+final JobExecutionResult result;
+
+if (!pipelineOptions.isStreaming() && 
!hasUnboundedPCollections(fusedPipeline)) {
+  // TODO: Do we need to inspect for unbounded sources before fusing?
+  // batch translation
+  FlinkBatchPortablePipelineTranslator translator =
+  FlinkBatchPortablePipelineTranslator.createTranslator();
+  FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
+  
FlinkBatchPortablePipelineTranslator.createTranslationContext(jobInfo);
+  

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111699&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111699
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195207823
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Invocation of a Flink Job via {@link FlinkRunner}.
+ */
+public class FlinkJobInvocation implements JobInvocation {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkJobInvocation.class);
+
+  public static FlinkJobInvocation create(String id, ListeningExecutorService 
executorService,
+  RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+return new FlinkJobInvocation(id, executorService, pipeline, 
pipelineOptions);
+  }
+
+  private final String id;
+  private final ListeningExecutorService executorService;
+  private final RunnerApi.Pipeline pipeline;
+  private final FlinkPipelineOptions pipelineOptions;
+  private Enum jobState;
+  private List<Consumer<Enum>> stateObservers;
+
+  @Nullable
+  private ListenableFuture invocationFuture;
+
+  private FlinkJobInvocation(String id, ListeningExecutorService 
executorService,
+  RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+this.id = id;
+this.executorService = executorService;
+this.pipeline = pipeline;
+this.pipelineOptions = pipelineOptions;
+this.invocationFuture = null;
+this.jobState = Enum.STOPPED;
+this.stateObservers = new ArrayList<>();
+  }
+
+  private PipelineResult runPipeline() throws Exception {
+LOG.trace("Translating pipeline from proto");
+
+MetricsEnvironment.setMetricsSupported(true);
+
+LOG.info("Translating pipeline to Flink program.");
+// Fused pipeline proto.
+RunnerApi.Pipeline fusedPipeline = 
GreedyPipelineFuser.fuse(pipeline).toPipeline();
+JobInfo jobInfo = JobInfo.create(
+id, pipelineOptions.getJobName(), 
PipelineOptionsTranslation.toProto(pipelineOptions));
+final JobExecutionResult result;
+
+if (!pipelineOptions.isStreaming() && 
!hasUnboundedPCollections(fusedPipeline)) {
+  // TODO: Do we need to inspect for unbounded sources before fusing?
+  // batch translation
+  FlinkBatchPortablePipelineTranslator translator =
+  FlinkBatchPortablePipelineTranslator.createTranslator();
+  FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
+  
FlinkBatchPortablePipelineTranslator.createTranslationContext(jobInfo);
+  

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111707&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111707
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195210651
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Invocation of a Flink Job via {@link FlinkRunner}.
+ */
+public class FlinkJobInvocation implements JobInvocation {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkJobInvocation.class);
+
+  public static FlinkJobInvocation create(String id, ListeningExecutorService 
executorService,
+  RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+return new FlinkJobInvocation(id, executorService, pipeline, 
pipelineOptions);
+  }
+
+  private final String id;
+  private final ListeningExecutorService executorService;
+  private final RunnerApi.Pipeline pipeline;
+  private final FlinkPipelineOptions pipelineOptions;
+  private Enum jobState;
+  private List<Consumer<JobMessage>> stateObservers;
+
+  @Nullable
+  private ListenableFuture<PipelineResult> invocationFuture;
+
+  private FlinkJobInvocation(String id, ListeningExecutorService 
executorService,
+  RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+this.id = id;
+this.executorService = executorService;
+this.pipeline = pipeline;
+this.pipelineOptions = pipelineOptions;
+this.invocationFuture = null;
+this.jobState = Enum.STOPPED;
+this.stateObservers = new ArrayList<>();
+  }
+
+  private PipelineResult runPipeline() throws Exception {
+LOG.trace("Translating pipeline from proto");
+
+MetricsEnvironment.setMetricsSupported(true);
+
+LOG.info("Translating pipeline to Flink program.");
+// Fused pipeline proto.
+RunnerApi.Pipeline fusedPipeline = 
GreedyPipelineFuser.fuse(pipeline).toPipeline();
+JobInfo jobInfo = JobInfo.create(
+id, pipelineOptions.getJobName(), 
PipelineOptionsTranslation.toProto(pipelineOptions));
+final JobExecutionResult result;
+
+if (!pipelineOptions.isStreaming() && 
!hasUnboundedPCollections(fusedPipeline)) {
+  // TODO: Do we need to inspect for unbounded sources before fusing?
+  // batch translation
+  FlinkBatchPortablePipelineTranslator translator =
+  FlinkBatchPortablePipelineTranslator.createTranslator();
+  FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
+  
FlinkBatchPortablePipelineTranslator.createTranslationContext(jobInfo);
+  

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111692&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111692
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195208514
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Invocation of a Flink Job via {@link FlinkRunner}.
+ */
+public class FlinkJobInvocation implements JobInvocation {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkJobInvocation.class);
+
+  public static FlinkJobInvocation create(String id, ListeningExecutorService 
executorService,
+  RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+return new FlinkJobInvocation(id, executorService, pipeline, 
pipelineOptions);
+  }
+
+  private final String id;
+  private final ListeningExecutorService executorService;
+  private final RunnerApi.Pipeline pipeline;
+  private final FlinkPipelineOptions pipelineOptions;
+  private Enum jobState;
+  private List<Consumer<JobMessage>> stateObservers;
+
+  @Nullable
+  private ListenableFuture<PipelineResult> invocationFuture;
+
+  private FlinkJobInvocation(String id, ListeningExecutorService 
executorService,
+  RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+this.id = id;
+this.executorService = executorService;
+this.pipeline = pipeline;
+this.pipelineOptions = pipelineOptions;
+this.invocationFuture = null;
+this.jobState = Enum.STOPPED;
+this.stateObservers = new ArrayList<>();
+  }
+
+  private PipelineResult runPipeline() throws Exception {
+LOG.trace("Translating pipeline from proto");
+
+MetricsEnvironment.setMetricsSupported(true);
+
+LOG.info("Translating pipeline to Flink program.");
+// Fused pipeline proto.
+RunnerApi.Pipeline fusedPipeline = 
GreedyPipelineFuser.fuse(pipeline).toPipeline();
+JobInfo jobInfo = JobInfo.create(
+id, pipelineOptions.getJobName(), 
PipelineOptionsTranslation.toProto(pipelineOptions));
+final JobExecutionResult result;
+
+if (!pipelineOptions.isStreaming() && 
!hasUnboundedPCollections(fusedPipeline)) {
+  // TODO: Do we need to inspect for unbounded sources before fusing?
+  // batch translation
+  FlinkBatchPortablePipelineTranslator translator =
+  FlinkBatchPortablePipelineTranslator.createTranslator();
+  FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
+  
FlinkBatchPortablePipelineTranslator.createTranslationContext(jobInfo);
+  

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111700&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111700
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195209379
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Invocation of a Flink Job via {@link FlinkRunner}.
+ */
+public class FlinkJobInvocation implements JobInvocation {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkJobInvocation.class);
+
+  public static FlinkJobInvocation create(String id, ListeningExecutorService 
executorService,
+  RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+return new FlinkJobInvocation(id, executorService, pipeline, 
pipelineOptions);
+  }
+
+  private final String id;
+  private final ListeningExecutorService executorService;
+  private final RunnerApi.Pipeline pipeline;
+  private final FlinkPipelineOptions pipelineOptions;
+  private Enum jobState;
+  private List<Consumer<JobMessage>> stateObservers;
+
+  @Nullable
+  private ListenableFuture<PipelineResult> invocationFuture;
+
+  private FlinkJobInvocation(String id, ListeningExecutorService 
executorService,
+  RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+this.id = id;
+this.executorService = executorService;
+this.pipeline = pipeline;
+this.pipelineOptions = pipelineOptions;
+this.invocationFuture = null;
+this.jobState = Enum.STOPPED;
+this.stateObservers = new ArrayList<>();
+  }
+
+  private PipelineResult runPipeline() throws Exception {
+LOG.trace("Translating pipeline from proto");
+
+MetricsEnvironment.setMetricsSupported(true);
+
+LOG.info("Translating pipeline to Flink program.");
+// Fused pipeline proto.
+RunnerApi.Pipeline fusedPipeline = 
GreedyPipelineFuser.fuse(pipeline).toPipeline();
+JobInfo jobInfo = JobInfo.create(
+id, pipelineOptions.getJobName(), 
PipelineOptionsTranslation.toProto(pipelineOptions));
+final JobExecutionResult result;
+
+if (!pipelineOptions.isStreaming() && 
!hasUnboundedPCollections(fusedPipeline)) {
+  // TODO: Do we need to inspect for unbounded sources before fusing?
+  // batch translation
+  FlinkBatchPortablePipelineTranslator translator =
+  FlinkBatchPortablePipelineTranslator.createTranslator();
+  FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
+  
FlinkBatchPortablePipelineTranslator.createTranslationContext(jobInfo);
+  

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111696&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111696
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195209626
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Invocation of a Flink Job via {@link FlinkRunner}.
+ */
+public class FlinkJobInvocation implements JobInvocation {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkJobInvocation.class);
+
+  public static FlinkJobInvocation create(String id, ListeningExecutorService 
executorService,
+  RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+return new FlinkJobInvocation(id, executorService, pipeline, 
pipelineOptions);
+  }
+
+  private final String id;
+  private final ListeningExecutorService executorService;
+  private final RunnerApi.Pipeline pipeline;
+  private final FlinkPipelineOptions pipelineOptions;
+  private Enum jobState;
+  private List<Consumer<JobMessage>> stateObservers;
+
+  @Nullable
+  private ListenableFuture<PipelineResult> invocationFuture;
+
+  private FlinkJobInvocation(String id, ListeningExecutorService 
executorService,
+  RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+this.id = id;
+this.executorService = executorService;
+this.pipeline = pipeline;
+this.pipelineOptions = pipelineOptions;
+this.invocationFuture = null;
+this.jobState = Enum.STOPPED;
+this.stateObservers = new ArrayList<>();
+  }
+
+  private PipelineResult runPipeline() throws Exception {
+LOG.trace("Translating pipeline from proto");
+
+MetricsEnvironment.setMetricsSupported(true);
+
+LOG.info("Translating pipeline to Flink program.");
+// Fused pipeline proto.
+RunnerApi.Pipeline fusedPipeline = 
GreedyPipelineFuser.fuse(pipeline).toPipeline();
+JobInfo jobInfo = JobInfo.create(
+id, pipelineOptions.getJobName(), 
PipelineOptionsTranslation.toProto(pipelineOptions));
+final JobExecutionResult result;
+
+if (!pipelineOptions.isStreaming() && 
!hasUnboundedPCollections(fusedPipeline)) {
+  // TODO: Do we need to inspect for unbounded sources before fusing?
+  // batch translation
+  FlinkBatchPortablePipelineTranslator translator =
+  FlinkBatchPortablePipelineTranslator.createTranslator();
+  FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
+  
FlinkBatchPortablePipelineTranslator.createTranslationContext(jobInfo);
+  

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111708&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111708
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195214125
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.protobuf.Struct;
+import java.io.IOException;
+import java.util.UUID;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Job Invoker for the {@link FlinkRunner}.
+ */
+public class FlinkJobInvoker implements JobInvoker {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkJobInvoker.class);
+
+  public static FlinkJobInvoker create(ListeningExecutorService 
executorService) {
+return new FlinkJobInvoker(executorService);
+  }
+
+  private final ListeningExecutorService executorService;
+
+  private FlinkJobInvoker(ListeningExecutorService executorService) {
+this.executorService = executorService;
+  }
+
+  @Override
+  public JobInvocation invoke(
+  RunnerApi.Pipeline pipeline, Struct options, @Nullable String 
artifactToken)
+  throws IOException {
+// TODO: How to make Java/Python agree on names of keys and their values?
+LOG.trace("Parsing pipeline options");
+FlinkPipelineOptions flinkOptions = 
PipelineOptionsTranslation.fromProto(options)
+.as(FlinkPipelineOptions.class);
+
+String invocationId = String.format(
+"%s_%s", flinkOptions.getJobName(), UUID.randomUUID().toString());
+LOG.debug("Invoking job {}", invocationId);
+
+// Set Flink Master to [auto] if no option was specified.
+if (flinkOptions.getFlinkMaster() == null) {
+  flinkOptions.setFlinkMaster("[auto]");
+}
+
+flinkOptions.setRunner(null);
 
 Review comment:
  Document why we need to set the runner to null.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 111708)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Priority: Minor
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We 
> need a main entry point that itself spins up the job service (and artifact 
> staging service). The main program itself should be packaged into an uberjar 
> such that it can be run locally or submitted to a Flink deployment via `flink 
> run`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

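The FlinkJobInvoker excerpt above builds its invocation id as `"<jobName>_<random UUID>"`. That scheme can be sketched standalone as follows (the class and method names here are illustrative, not taken from the PR):

```java
import java.util.UUID;

public class InvocationIdSketch {
  // Mirrors the id scheme in the FlinkJobInvoker excerpt:
  // String.format("%s_%s", jobName, UUID.randomUUID()).
  static String invocationId(String jobName) {
    return String.format("%s_%s", jobName, UUID.randomUUID());
  }

  public static void main(String[] args) {
    String id = invocationId("myjob");
    // The id embeds the job name, so concurrent invocations of the same
    // job remain distinguishable while still being globally unique.
    System.out.println(id.startsWith("myjob_"));
  }
}
```

Because the random UUID makes each id unique, two invocations of the same job name never collide.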

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111697&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111697
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195209114
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Invocation of a Flink Job via {@link FlinkRunner}.
+ */
+public class FlinkJobInvocation implements JobInvocation {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkJobInvocation.class);
+
+  public static FlinkJobInvocation create(String id, ListeningExecutorService 
executorService,
+  RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+return new FlinkJobInvocation(id, executorService, pipeline, 
pipelineOptions);
+  }
+
+  private final String id;
+  private final ListeningExecutorService executorService;
+  private final RunnerApi.Pipeline pipeline;
+  private final FlinkPipelineOptions pipelineOptions;
+  private Enum jobState;
+  private List<Consumer<JobMessage>> stateObservers;
+
+  @Nullable
+  private ListenableFuture<PipelineResult> invocationFuture;
+
+  private FlinkJobInvocation(String id, ListeningExecutorService 
executorService,
+  RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+this.id = id;
+this.executorService = executorService;
+this.pipeline = pipeline;
+this.pipelineOptions = pipelineOptions;
+this.invocationFuture = null;
+this.jobState = Enum.STOPPED;
+this.stateObservers = new ArrayList<>();
+  }
+
+  private PipelineResult runPipeline() throws Exception {
+LOG.trace("Translating pipeline from proto");
+
+MetricsEnvironment.setMetricsSupported(true);
+
+LOG.info("Translating pipeline to Flink program.");
+// Fused pipeline proto.
+RunnerApi.Pipeline fusedPipeline = 
GreedyPipelineFuser.fuse(pipeline).toPipeline();
+JobInfo jobInfo = JobInfo.create(
+id, pipelineOptions.getJobName(), 
PipelineOptionsTranslation.toProto(pipelineOptions));
+final JobExecutionResult result;
+
+if (!pipelineOptions.isStreaming() && 
!hasUnboundedPCollections(fusedPipeline)) {
+  // TODO: Do we need to inspect for unbounded sources before fusing?
+  // batch translation
+  FlinkBatchPortablePipelineTranslator translator =
+  FlinkBatchPortablePipelineTranslator.createTranslator();
+  FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
+  
FlinkBatchPortablePipelineTranslator.createTranslationContext(jobInfo);
+  

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111702&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111702
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195254764
 
 

 ##
 File path: 
runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactSourceTest.java
 ##
 @@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.artifact;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
+import com.google.protobuf.ByteString;
+import io.grpc.stub.StreamObserver;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for BeamFileSystemArtifactSource.
+ */
+@RunWith(JUnit4.class) public class BeamFileSystemArtifactSourceTest {
+
+  BeamFileSystemArtifactStagingService stagingService = new 
BeamFileSystemArtifactStagingService();
+
+  @Rule public TemporaryFolder stagingDir = new TemporaryFolder();
+
+  @Test public void testStagingService() throws Exception {
+String stagingSession = "stagingSession";
+String stagingSessionToken = BeamFileSystemArtifactStagingService
+.generateStagingSessionToken(stagingSession, 
stagingDir.newFolder().getPath());
+List<ArtifactApi.ArtifactMetadata> metadata = new ArrayList<>();
+
+
metadata.add(ArtifactApi.ArtifactMetadata.newBuilder().setName("file1").build());
+putArtifactContents(stagingSessionToken, "first", "file1");
+
+
metadata.add(ArtifactApi.ArtifactMetadata.newBuilder().setName("file2").build());
+putArtifactContents(stagingSessionToken, "second", "file2");
+
+String stagingToken = commitManifest(stagingSessionToken, metadata);
+
+BeamFileSystemArtifactSource artifactSource = new 
BeamFileSystemArtifactSource(stagingToken);
+Assert.assertEquals("first", getArtifactContents(artifactSource, "file1"));
+Assert.assertEquals("second", getArtifactContents(artifactSource, 
"file2"));
+Assert.assertThat(artifactSource.getManifest().getArtifactList(),
+containsInAnyOrder(metadata.toArray(new 
ArtifactApi.ArtifactMetadata[0])));
+  }
+
+  private String commitManifest(String stagingSessionToken,
+  List<ArtifactApi.ArtifactMetadata> artifacts) {
+String[] stagingTokenHolder = new String[1];
+stagingService.commitManifest(
+
ArtifactApi.CommitManifestRequest.newBuilder().setStagingSessionToken(stagingSessionToken)
+
.setManifest(ArtifactApi.Manifest.newBuilder().addAllArtifact(artifacts)).build(),
+new StreamObserver<ArtifactApi.CommitManifestResponse>() {
+
+  @Override public void onNext(ArtifactApi.CommitManifestResponse 
commitManifestResponse) {
+stagingTokenHolder[0] = commitManifestResponse.getRetrievalToken();
+  }
+
+  @Override public void onError(Throwable throwable) {
+throw new RuntimeException(throwable);
+  }
+
+  @Override public void onCompleted() {
+  }
+});
+
+return stagingTokenHolder[0];
+  }
+
+  private void putArtifactContents(String stagingSessionToken, String 
contents, String name) {
+StreamObserver outputStreamObserver = 
stagingService
+.putArtifact(new StreamObserver() {
+
+  @Override public void onNext(ArtifactApi.PutArtifactResponse 
putArtifactResponse) {
+  }
+
+  @Override public void onError(Throwable throwable) {
+throw new RuntimeException(throwable);
+  }
+
+  @Override public void onCompleted() {
+  }
+});
+
+

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111706&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111706
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195253607
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
 ##
 @@ -60,17 +60,36 @@
 
   public static InMemoryJobService create(
 
 Review comment:
   Can we remove this method?
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 111706)
Time Spent: 2h 40m  (was: 2.5h)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Priority: Minor
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We 
> need a main entry point that itself spins up the job service (and artifact 
> staging service). The main program itself should be packaged into an uberjar 
> such that it can be run locally or submitted to a Flink deployment via `flink 
> run`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111698&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111698
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195208381
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Invocation of a Flink Job via {@link FlinkRunner}.
+ */
+public class FlinkJobInvocation implements JobInvocation {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkJobInvocation.class);
+
+  public static FlinkJobInvocation create(String id, ListeningExecutorService 
executorService,
+  RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+return new FlinkJobInvocation(id, executorService, pipeline, 
pipelineOptions);
+  }
+
+  private final String id;
+  private final ListeningExecutorService executorService;
+  private final RunnerApi.Pipeline pipeline;
+  private final FlinkPipelineOptions pipelineOptions;
+  private Enum jobState;
+  private List<Consumer<Enum>> stateObservers;
+
+  @Nullable
+  private ListenableFuture<PipelineResult> invocationFuture;
+
+  private FlinkJobInvocation(String id, ListeningExecutorService 
executorService,
+  RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+this.id = id;
+this.executorService = executorService;
+this.pipeline = pipeline;
+this.pipelineOptions = pipelineOptions;
+this.invocationFuture = null;
+this.jobState = Enum.STOPPED;
+this.stateObservers = new ArrayList<>();
+  }
+
+  private PipelineResult runPipeline() throws Exception {
+LOG.trace("Translating pipeline from proto");
+
+MetricsEnvironment.setMetricsSupported(true);
+
+LOG.info("Translating pipeline to Flink program.");
+// Fused pipeline proto.
+RunnerApi.Pipeline fusedPipeline = 
GreedyPipelineFuser.fuse(pipeline).toPipeline();
+JobInfo jobInfo = JobInfo.create(
+id, pipelineOptions.getJobName(), 
PipelineOptionsTranslation.toProto(pipelineOptions));
+final JobExecutionResult result;
+
+if (!pipelineOptions.isStreaming() && 
!hasUnboundedPCollections(fusedPipeline)) {
+  // TODO: Do we need to inspect for unbounded sources before fusing?
+  // batch translation
+  FlinkBatchPortablePipelineTranslator translator =
+  FlinkBatchPortablePipelineTranslator.createTranslator();
+  FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
+  
FlinkBatchPortablePipelineTranslator.createTranslationContext(jobInfo);
+  

[jira] [Work logged] (BEAM-1543) TestPipeline: @Rule error message confusing/javadoc doesn't explain proper usage

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-1543?focusedWorklogId=111709&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111709
 ]

ASF GitHub Bot logged work on BEAM-1543:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: ezmicken commented on a change in pull request #2089: 
[BEAM-1543] Updated TestPipeline's javadoc.
URL: https://github.com/apache/beam/pull/2089#discussion_r195254910
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
 ##
 @@ -292,8 +298,8 @@ public void evaluate() throws Throwable {
   public PipelineResult run() {
 checkState(
 enforcement.isPresent(),
-"Attempted to run a pipeline while it's enforcement level was not set. 
Are you "
-+ "using TestPipeline without a @Rule annotation?");
+"Is your TestPipeline declaration missing a @Rule annotation? Usage: "
 
 Review comment:
   I encountered this error, "Is your TestPipeline declaration missing a @Rule 
annotation?" but I did include a @Rule annotation. To resolve the error I had 
to enable abandoned node enforcement. Perhaps the message should be updated?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 111709)
Time Spent: 10m
Remaining Estimate: 0h

> TestPipeline: @Rule error message confusing/javadoc doesn't explain proper 
> usage
> 
>
> Key: BEAM-1543
> URL: https://issues.apache.org/jira/browse/BEAM-1543
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Stephen Sisk
>Assignee: Stas Levin
>Priority: Major
> Fix For: 0.6.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> I'm working on JdbcIOIT. Trying to run the checked-in version, I encountered 
> this error: 
> "Attempted to run a pipeline while it's enforcement level was not set. Are 
> you using TestPipeline without a @Rule annotation?"
> This error does not give me enough information to diagnose:
> a) what problem is occurring
> b) what I can do to solve it
> Checking the javadocs for TestPipeline, I don't see any information about 
> @Rule or anything of that nature.
> I did some github blame-ing on that string, and found BEAM-1205 and the 
> accompanying PR, however it was also unclear from BEAM-1205 exactly what I'm 
> supposed to do to make this test work.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
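For readers hitting the same error, here is a minimal sketch of the usage under discussion. It assumes Beam's Java SDK and JUnit 4 on the classpath; the class and method names around the rule (`MyTransformTest`, `testCreate`) are illustrative only. The key points from the thread: the `TestPipeline` must be a public, non-static `@Rule` field, and abandoned-node enforcement is a separate knob (`enableAbandonedNodeEnforcement`) that can also trigger the quoted error path.

```java
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;

public class MyTransformTest {
  // Must be a public, non-static @Rule field; without @Rule, run()
  // fails with the enforcement-level error quoted above.
  @Rule public final transient TestPipeline pipeline = TestPipeline.create();

  @Test
  public void testCreate() {
    PCollection<String> output = pipeline.apply(Create.of("a", "b"));
    PAssert.that(output).containsInAnyOrder("a", "b");
    pipeline.run().waitUntilFinish();
  }
}
```

When the rule cannot be used (e.g. a pipeline built outside a JUnit test), `pipeline.enableAbandonedNodeEnforcement(true)` sets the enforcement level explicitly, which matches the workaround reported in the comment above.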


[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111690&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111690
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195199604
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Invocation of a Flink Job via {@link FlinkRunner}.
+ */
+public class FlinkJobInvocation implements JobInvocation {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkJobInvocation.class);
+
+  public static FlinkJobInvocation create(String id, ListeningExecutorService 
executorService,
+  RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+return new FlinkJobInvocation(id, executorService, pipeline, 
pipelineOptions);
+  }
+
+  private final String id;
+  private final ListeningExecutorService executorService;
+  private final RunnerApi.Pipeline pipeline;
+  private final FlinkPipelineOptions pipelineOptions;
+  private Enum jobState;
+  private List<Consumer<Enum>> stateObservers;
+
+  @Nullable
+  private ListenableFuture<PipelineResult> invocationFuture;
+
+  private FlinkJobInvocation(String id, ListeningExecutorService 
executorService,
+  RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+this.id = id;
+this.executorService = executorService;
+this.pipeline = pipeline;
+this.pipelineOptions = pipelineOptions;
+this.invocationFuture = null;
+this.jobState = Enum.STOPPED;
+this.stateObservers = new ArrayList<>();
+  }
+
+  private PipelineResult runPipeline() throws Exception {
+LOG.trace("Translating pipeline from proto");
+
+MetricsEnvironment.setMetricsSupported(true);
+
+LOG.info("Translating pipeline to Flink program.");
+// Fused pipeline proto.
+RunnerApi.Pipeline fusedPipeline = 
GreedyPipelineFuser.fuse(pipeline).toPipeline();
 
 Review comment:
Should we pass the fuser Function as a constructor parameter?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 111690)
Time Spent: 1h 10m  (was: 1h)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> 
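The suggestion above (injecting the fuser rather than calling `GreedyPipelineFuser.fuse` directly inside `runPipeline`) can be sketched with a stand-in type. This is a hypothetical illustration, not Beam's API: `UnaryOperator<String>` stands in for the real `Pipeline -> Pipeline` fusing function, and `JobInvocation` here is a toy class showing only the injection point.

```java
import java.util.function.UnaryOperator;

public class FuserInjectionSketch {

  // Toy stand-in for FlinkJobInvocation: the fusing step is supplied by
  // the caller instead of being hard-wired, so tests can swap it out.
  static final class JobInvocation {
    private final UnaryOperator<String> fuser; // stand-in for GreedyPipelineFuser::fuse

    JobInvocation(UnaryOperator<String> fuser) {
      this.fuser = fuser;
    }

    String run(String pipeline) {
      // The fusing step is now a seam: production passes the real fuser,
      // tests can pass identity or a recording fake.
      return fuser.apply(pipeline);
    }
  }

  public static void main(String[] args) {
    JobInvocation prod = new JobInvocation(p -> "fused(" + p + ")");
    JobInvocation test = new JobInvocation(UnaryOperator.identity());
    System.out.println(prod.run("pipeline")); // fused(pipeline)
    System.out.println(test.run("pipeline")); // pipeline
  }
}
```

The trade-off is the usual one for constructor injection: one more parameter on `create(...)`, in exchange for being able to unit-test the invocation without building a fusable pipeline proto.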

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111689&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111689
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195252523
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactSource.java
 ##
 @@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.artifact;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.util.JsonFormat;
+import io.grpc.stub.StreamObserver;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.charset.StandardCharsets;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
+import org.apache.beam.sdk.io.FileSystems;
+
+/**
+ * An ArtifactSource suitable for retrieving artifacts uploaded via
+ * {@link BeamFileSystemArtifactStagingService}.
+ */
+public class BeamFileSystemArtifactSource implements ArtifactSource {
+
+  private static final int CHUNK_SIZE = 2 * 1024 * 1024;
+
+  private final String retrievalToken;
+  private ArtifactApi.ProxyManifest proxyManifest;
+
+  public BeamFileSystemArtifactSource(String retrievalToken) {
+this.retrievalToken = retrievalToken;
+  }
+
+  public static BeamFileSystemArtifactSource create(String artifactToken) {
+return new BeamFileSystemArtifactSource(artifactToken);
+  }
+
+  @Override
+  public ArtifactApi.Manifest getManifest() throws IOException {
+return getProxyManifest().getManifest();
+  }
+
+  @Override
+  public void getArtifact(String name,
+      StreamObserver<ArtifactApi.ArtifactChunk> responseObserver) throws IOException {
+ReadableByteChannel artifact = FileSystems
+.open(FileSystems.matchNewResource(lookupUri(name), false));
+ByteBuffer buffer = ByteBuffer.allocate(CHUNK_SIZE);
+while (artifact.read(buffer) > -1) {
+  buffer.flip();
+  responseObserver.onNext(
+  
ArtifactApi.ArtifactChunk.newBuilder().setData(ByteString.copyFrom(buffer)).build());
+  buffer.clear();
+}
+  }
+
+  private String lookupUri(String name) throws IOException {
+for (ArtifactApi.ProxyManifest.Location location : 
getProxyManifest().getLocationList()) {
+  if (location.getName().equals(name)) {
+return location.getUri();
+  }
+}
+throw new IllegalArgumentException("No such artifact: " + name);
+  }
+
+  private ArtifactApi.ProxyManifest getProxyManifest() throws IOException {
+if (proxyManifest == null) {
+  ArtifactApi.ProxyManifest.Builder builder = 
ArtifactApi.ProxyManifest.newBuilder();
+  JsonFormat.parser().merge(Channels.newReader(
+  FileSystems.open(FileSystems.matchNewResource(retrievalToken, false 
/* is directory */)),
+  StandardCharsets.UTF_8.name()), builder);
 
 Review comment:
Shall we standardize the character set for all BeamFileSystemArtifact classes?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 111689)
Time Spent: 1h  (was: 50m)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>  
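The chunked-read loop in the diff above follows a standard NIO pattern (fill, `flip()`, consume, `clear()`). A self-contained sketch using only `java.nio`, with no Beam or gRPC types: the 4-byte chunk size is purely for illustration, where the real service uses a 2 MiB `CHUNK_SIZE`, and each chunk is copied out of the buffer the way `ByteString.copyFrom` does.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class ChunkedReadSketch {
  // Tiny chunk size for demonstration; the real code uses 2 * 1024 * 1024.
  private static final int CHUNK_SIZE = 4;

  static List<byte[]> readChunks(ReadableByteChannel channel) throws IOException {
    List<byte[]> chunks = new ArrayList<>();
    ByteBuffer buffer = ByteBuffer.allocate(CHUNK_SIZE);
    // read() returns -1 only at end of stream, hence the > -1 loop condition.
    while (channel.read(buffer) > -1) {
      buffer.flip(); // switch the buffer from write mode to read mode
      byte[] chunk = new byte[buffer.remaining()];
      buffer.get(chunk); // copy the bytes out, as ByteString.copyFrom does
      chunks.add(chunk);
      buffer.clear(); // reset for the next read
    }
    return chunks;
  }

  public static void main(String[] args) throws IOException {
    ReadableByteChannel channel = Channels.newChannel(
        new ByteArrayInputStream("hello world".getBytes(StandardCharsets.UTF_8)));
    // 11 input bytes with a 4-byte buffer yield chunks of 4, 4, and 3 bytes.
    System.out.println(readChunks(channel).size()); // prints 3
  }
}
```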

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111687&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111687
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:24
Start Date: 13/Jun/18 22:24
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195199350
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Invocation of a Flink Job via {@link FlinkRunner}.
+ */
+public class FlinkJobInvocation implements JobInvocation {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkJobInvocation.class);
+
+  public static FlinkJobInvocation create(String id, ListeningExecutorService 
executorService,
+  RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+return new FlinkJobInvocation(id, executorService, pipeline, 
pipelineOptions);
+  }
+
+  private final String id;
+  private final ListeningExecutorService executorService;
+  private final RunnerApi.Pipeline pipeline;
+  private final FlinkPipelineOptions pipelineOptions;
+  private Enum jobState;
+  private List<Consumer<Enum>> stateObservers;
+
+  @Nullable
+  private ListenableFuture<PipelineResult> invocationFuture;
+
+  private FlinkJobInvocation(String id, ListeningExecutorService 
executorService,
+  RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+this.id = id;
+this.executorService = executorService;
+this.pipeline = pipeline;
+this.pipelineOptions = pipelineOptions;
+this.invocationFuture = null;
+this.jobState = Enum.STOPPED;
+this.stateObservers = new ArrayList<>();
+  }
+
+  private PipelineResult runPipeline() throws Exception {
+LOG.trace("Translating pipeline from proto");
 
 Review comment:
This trace is unnecessary.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 111687)
Time Spent: 40m  (was: 0.5h)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Priority: Minor
>  Time 

[jira] [Work logged] (BEAM-4536) Python SDK: Pubsub reading with_attributes broken for Dataflow

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4536?focusedWorklogId=111685&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111685
 ]

ASF GitHub Bot logged work on BEAM-4536:


Author: ASF GitHub Bot
Created on: 13/Jun/18 21:56
Start Date: 13/Jun/18 21:56
Worklog Time Spent: 10m 
  Work Description: pabloem commented on issue #5607: [BEAM-4536] Remove 
with_attributes keyword from ReadFromPubSub.
URL: https://github.com/apache/beam/pull/5607#issuecomment-397100509
 
 
   I've filed https://issues.apache.org/jira/projects/BEAM/issues/BEAM-4558
   I marked it as a release blocker, but is it? @swegner 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 111685)
Time Spent: 2.5h  (was: 2h 20m)

> Python SDK: Pubsub reading with_attributes broken for Dataflow
> --
>
> Key: BEAM-4536
> URL: https://issues.apache.org/jira/browse/BEAM-4536
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.5.0
>Reporter: Udi Meiri
>Assignee: Udi Meiri
>Priority: Blocker
>  Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> Using 
> [ReadFromPubsub|https://github.com/apache/beam/blob/e30e0c807321934e862358e1e3be32dc74374aeb/sdks/python/apache_beam/io/gcp/pubsub.py#L106](with_attributes=True)
>  will fail on Dataflow.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3773) [SQL] Investigate JDBC interface for Beam SQL

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3773?focusedWorklogId=111684&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111684
 ]

ASF GitHub Bot logged work on BEAM-3773:


Author: ASF GitHub Bot
Created on: 13/Jun/18 21:54
Start Date: 13/Jun/18 21:54
Worklog Time Spent: 10m 
  Work Description: apilloud commented on issue #5592: [BEAM-3773] [SQL] 
Add support for setting PipelineOptions
URL: https://github.com/apache/beam/pull/5592#issuecomment-397100075
 
 
   This should now handle BeamSqlCli as well. I also fixed the spotless test 
failure.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 111684)
Time Spent: 16h 40m  (was: 16.5h)

> [SQL] Investigate JDBC interface for Beam SQL
> -
>
> Key: BEAM-3773
> URL: https://issues.apache.org/jira/browse/BEAM-3773
> Project: Beam
>  Issue Type: Improvement
>  Components: dsl-sql
>Reporter: Anton Kedin
>Assignee: Andrew Pilloud
>Priority: Major
>  Time Spent: 16h 40m
>  Remaining Estimate: 0h
>
> JDBC allows integration with a lot of third-party tools, e.g 
> [Zeppelin|https://zeppelin.apache.org/docs/0.7.0/manual/interpreters.html], 
> [sqlline|https://github.com/julianhyde/sqlline]. We should look into how 
> feasible it is to implement a JDBC interface for Beam SQL



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4333) Add integration tests for mobile game examples

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4333?focusedWorklogId=111683&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111683
 ]

ASF GitHub Bot logged work on BEAM-4333:


Author: ASF GitHub Bot
Created on: 13/Jun/18 21:49
Start Date: 13/Jun/18 21:49
Worklog Time Spent: 10m 
  Work Description: mariapython commented on issue #5630: [BEAM-4333] Add 
integration tests for python mobile games
URL: https://github.com/apache/beam/pull/5630#issuecomment-397098965
 
 
   Retest this please


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 111683)
Time Spent: 50m  (was: 40m)

> Add integration tests for mobile game examples
> --
>
> Key: BEAM-4333
> URL: https://issues.apache.org/jira/browse/BEAM-4333
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: María GH
>Assignee: María GH
>Priority: Minor
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> Add integration tests for the 4 [mobile game 
> examples|https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/complete/game].
> Setup a Jenkins test to run the above integration tests at a daily frequency.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4333) Add integration tests for mobile game examples

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4333?focusedWorklogId=111681&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111681
 ]

ASF GitHub Bot logged work on BEAM-4333:


Author: ASF GitHub Bot
Created on: 13/Jun/18 21:49
Start Date: 13/Jun/18 21:49
Worklog Time Spent: 10m 
  Work Description: mariapython opened a new pull request #5630: 
[BEAM-4333] Add integration tests for python mobile games
URL: https://github.com/apache/beam/pull/5630
 
 
- [x] Add IT test for examples 1-3 (user score, hourly team score, leader 
board).
- [ ] Add IT test for example 4 (game stats). Pending on a fix for 
BEAM-4534.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 111681)
Time Spent: 40m  (was: 0.5h)

> Add integration tests for mobile game examples
> --
>
> Key: BEAM-4333
> URL: https://issues.apache.org/jira/browse/BEAM-4333
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: María GH
>Assignee: María GH
>Priority: Minor
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Add integration tests for the 4 [mobile game 
> examples|https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/complete/game].
> Setup a Jenkins test to run the above integration tests at a daily frequency.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4333) Add integration tests for mobile game examples

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4333?focusedWorklogId=111678&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111678
 ]

ASF GitHub Bot logged work on BEAM-4333:


Author: ASF GitHub Bot
Created on: 13/Jun/18 21:48
Start Date: 13/Jun/18 21:48
Worklog Time Spent: 10m 
  Work Description: mariapython commented on issue #5630: [BEAM-4333] Add 
integration tests for python mobile games
URL: https://github.com/apache/beam/pull/5630#issuecomment-397098652
 
 
   R: @aaltay 
   cc: @angoenka (leaving the google.cloud import, like in 
[here](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py#L51),
 per our discussion).




Issue Time Tracking
---

Worklog Id: (was: 111678)
Time Spent: 20m  (was: 10m)

> Add integration tests for mobile game examples
> --
>
> Key: BEAM-4333
> URL: https://issues.apache.org/jira/browse/BEAM-4333
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: María GH
>Assignee: María GH
>Priority: Minor
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Add integration tests for the 4 [mobile game 
> examples|https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/complete/game].
> Set up a Jenkins test to run the above integration tests at a daily frequency.





[jira] [Work logged] (BEAM-4333) Add integration tests for mobile game examples

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4333?focusedWorklogId=111679&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111679
 ]

ASF GitHub Bot logged work on BEAM-4333:


Author: ASF GitHub Bot
Created on: 13/Jun/18 21:48
Start Date: 13/Jun/18 21:48
Worklog Time Spent: 10m 
  Work Description: mariapython closed pull request #5630: [BEAM-4333] Add 
integration tests for python mobile games
URL: https://github.com/apache/beam/pull/5630
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/examples/complete/game/leader_board.py 
b/sdks/python/apache_beam/examples/complete/game/leader_board.py
index 99a8e092822..27ef16dd5d5 100644
--- a/sdks/python/apache_beam/examples/complete/game/leader_board.py
+++ b/sdks/python/apache_beam/examples/complete/game/leader_board.py
@@ -257,6 +257,9 @@ def run(argv=None):
   type=str,
   required=True,
   help='Pub/Sub topic to read from')
+  parser.add_argument('--subscription',
+  type=str,
+  help='Pub/Sub subscription to read from')
   parser.add_argument('--dataset',
   type=str,
   required=True,
@@ -295,9 +298,17 @@ def run(argv=None):
   with beam.Pipeline(options=options) as p:
 # Read game events from Pub/Sub using custom timestamps, which are 
extracted
 # from the pubsub data elements, and parse the data.
+
+# Read from PubSub into a PCollection.
+if args.subscription:
+  scores = p | 'ReadPubSub' >> beam.io.ReadStringsFromPubSub(
+  subscription=args.subscription)
+else:
+  scores = p | 'ReadPubSub' >> beam.io.ReadStringsFromPubSub(
+  topic=args.topic)
+
 events = (
-p
-| 'ReadPubSub' >> beam.io.gcp.pubsub.ReadStringsFromPubSub(args.topic)
+scores
 | 'ParseGameEventFn' >> beam.ParDo(ParseGameEventFn())
 | 'AddEventTimestamps' >> beam.Map(
 lambda elem: beam.window.TimestampedValue(elem, 
elem['timestamp'])))
diff --git a/sdks/python/apache_beam/examples/hourly_team_score_it_test.py 
b/sdks/python/apache_beam/examples/hourly_team_score_it_test.py
new file mode 100644
index 000..ffea48e524e
--- /dev/null
+++ b/sdks/python/apache_beam/examples/hourly_team_score_it_test.py
@@ -0,0 +1,96 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""End-to-end test for the hourly team score example.
+
+Code: beam/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
+Usage:
+
+  python setup.py nosetests --test-pipeline-options=" \
+  --runner=TestDataflowRunner \
+  --project=... \
+  --staging_location=gs://... \
+  --temp_location=gs://... \
+  --output=gs://... \
+  --sdk_location=... \
+
+"""
+
+import logging
+import time
+import unittest
+
+from hamcrest.core.core.allof import all_of
+from nose.plugins.attrib import attr
+
+from apache_beam.examples.complete.game import hourly_team_score
+from apache_beam.io.gcp.tests import utils
+from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryMatcher
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
+from apache_beam.testing.test_pipeline import TestPipeline
+
+
+class HourlyTeamScoreIT(unittest.TestCase):
+
+  DEFAULT_INPUT_FILE = 'gs://dataflow-samples/game/gaming_data*'
+  # SHA-1 hash generated from sorted rows reading from BigQuery table
+  DEFAULT_EXPECTED_CHECKSUM = '4fa761fb5c3341ec573d5d12c6ab75e3b2957a25'
+  OUTPUT_DATASET = 'hourly_team_score_it_dataset'
+  OUTPUT_TABLE = 'leader_board'
+
+  def setUp(self):
+self.test_pipeline = TestPipeline(is_integration_test=True)
+self.project = self.test_pipeline.get_option('project')
+
+# Set up BigQuery environment
+from google.cloud import bigquery
+client = 

[jira] [Work logged] (BEAM-4536) Python SDK: Pubsub reading with_attributes broken for Dataflow

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4536?focusedWorklogId=111680&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111680
 ]

ASF GitHub Bot logged work on BEAM-4536:


Author: ASF GitHub Bot
Created on: 13/Jun/18 21:48
Start Date: 13/Jun/18 21:48
Worklog Time Spent: 10m 
  Work Description: swegner commented on issue #5607: [BEAM-4536] Remove 
with_attributes keyword from ReadFromPubSub.
URL: https://github.com/apache/beam/pull/5607#issuecomment-397098734
 
 
   FYI, I recognize the failing test 
[`org.apache.beam.runners.direct.portable.ReferenceRunnerTest.pipelineExecution`](https://github.com/apache/beam/blob/da22e1808ec372e526c5af7c8bf483e72baef1eb/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ReferenceRunnerTest.java)
 from other PRs as being flaky 
([history](https://builds.apache.org/job/beam_PreCommit_Java_GradleBuild/6441/testReport/junit/org.apache.beam.runners.direct.portable/ReferenceRunnerTest/pipelineExecution/history/)).
 This PR seems unrelated, and it is unlikely to have caused the regression.
   
   /cc @reuvenlax @tgroh as the owners for this test.




Issue Time Tracking
---

Worklog Id: (was: 111680)
Time Spent: 2h 20m  (was: 2h 10m)

> Python SDK: Pubsub reading with_attributes broken for Dataflow
> --
>
> Key: BEAM-4536
> URL: https://issues.apache.org/jira/browse/BEAM-4536
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.5.0
>Reporter: Udi Meiri
>Assignee: Udi Meiri
>Priority: Blocker
>  Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> Using 
> [ReadFromPubsub|https://github.com/apache/beam/blob/e30e0c807321934e862358e1e3be32dc74374aeb/sdks/python/apache_beam/io/gcp/pubsub.py#L106](with_attributes=True)
>  will fail on Dataflow.





[jira] [Created] (BEAM-4558) org.apache.beam.runners.direct.portable.ReferenceRunnerTest » pipelineExecution failure in 2.5.0 release branch

2018-06-13 Thread Pablo Estrada (JIRA)
Pablo Estrada created BEAM-4558:
---

 Summary: 
org.apache.beam.runners.direct.portable.ReferenceRunnerTest » pipelineExecution 
failure in 2.5.0 release branch
 Key: BEAM-4558
 URL: https://issues.apache.org/jira/browse/BEAM-4558
 Project: Beam
  Issue Type: Bug
  Components: runner-direct
Reporter: Pablo Estrada
Assignee: Thomas Groh


Running in release branch: 
https://builds.apache.org/job/beam_PreCommit_Java_GradleBuild/6441/consoleFull

Failure is here: 
https://scans.gradle.com/s/xwc6k7jffxj3w/tests/jqhvlvf72f7pg-6uohttmt2cl3u





[jira] [Commented] (BEAM-4554) Move sdks/java/core's applyJavaNature back to failOnWarnings: true

2018-06-13 Thread Pablo Estrada (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-4554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511697#comment-16511697
 ] 

Pablo Estrada commented on BEAM-4554:
-

https://issues.apache.org/jira/browse/BEAM-4556

> Move sdks/java/core's applyJavaNature back to failOnWarnings: true
> --
>
> Key: BEAM-4554
> URL: https://issues.apache.org/jira/browse/BEAM-4554
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Pablo Estrada
>Assignee: Kenneth Knowles
>Priority: Major
> Fix For: Not applicable
>
>
> This was removed by https://github.com/apache/beam/pull/5609 for the sake of 
> fixing a release, but should be re-enabled ASAP.





[jira] [Closed] (BEAM-4554) Move sdks/java/core's applyJavaNature back to failOnWarnings: true

2018-06-13 Thread Pablo Estrada (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pablo Estrada closed BEAM-4554.
---
   Resolution: Duplicate
Fix Version/s: Not applicable

> Move sdks/java/core's applyJavaNature back to failOnWarnings: true
> --
>
> Key: BEAM-4554
> URL: https://issues.apache.org/jira/browse/BEAM-4554
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Pablo Estrada
>Assignee: Kenneth Knowles
>Priority: Major
> Fix For: Not applicable
>
>
> This was removed by https://github.com/apache/beam/pull/5609 for the sake of 
> fixing a release, but should be re-enabled ASAP.





[jira] [Work logged] (BEAM-4536) Python SDK: Pubsub reading with_attributes broken for Dataflow

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4536?focusedWorklogId=111674&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111674
 ]

ASF GitHub Bot logged work on BEAM-4536:


Author: ASF GitHub Bot
Created on: 13/Jun/18 21:37
Start Date: 13/Jun/18 21:37
Worklog Time Spent: 10m 
  Work Description: pabloem commented on issue #5607: [BEAM-4536] Remove 
with_attributes keyword from ReadFromPubSub.
URL: https://github.com/apache/beam/pull/5607#issuecomment-397095959
 
 
   I triggered a build on the release branch itself, and it failed:
   
https://builds.apache.org/job/beam_PreCommit_Java_GradleBuild/6441/consoleFull




Issue Time Tracking
---

Worklog Id: (was: 111674)
Time Spent: 2h 10m  (was: 2h)

> Python SDK: Pubsub reading with_attributes broken for Dataflow
> --
>
> Key: BEAM-4536
> URL: https://issues.apache.org/jira/browse/BEAM-4536
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.5.0
>Reporter: Udi Meiri
>Assignee: Udi Meiri
>Priority: Blocker
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> Using 
> [ReadFromPubsub|https://github.com/apache/beam/blob/e30e0c807321934e862358e1e3be32dc74374aeb/sdks/python/apache_beam/io/gcp/pubsub.py#L106](with_attributes=True)
>  will fail on Dataflow.





[jira] [Commented] (BEAM-4445) Filter pre-commit triggering based on touched files

2018-06-13 Thread Scott Wegner (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-4445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511686#comment-16511686
 ] 

Scott Wegner commented on BEAM-4445:


The interaction between phrase triggering and includedRegions appears to be a 
deficiency in the ghprb plugin; I've [filed a 
bug|https://github.com/jenkinsci/ghprb-plugin/issues/678], and the fix looks 
simple enough that we may be able to contribute it.

> Filter pre-commit triggering based on touched files
> ---
>
> Key: BEAM-4445
> URL: https://issues.apache.org/jira/browse/BEAM-4445
> Project: Beam
>  Issue Type: Sub-task
>  Components: build-system, testing
>Reporter: Scott Wegner
>Assignee: Scott Wegner
>Priority: Minor
>  Labels: beam-site-automation-reliability
> Fix For: Not applicable
>
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> This is discussed in the [Beam-Site Automation 
> Reliability|https://s.apache.org/beam-site-automation] design, under 
> "Pre-Commit Job Filtering"
> The proposal is to filter pre-commit jobs triggered on PRs based on which 
> files are touched. The impact is that most PRs will run only one set of 
> relevant tests, rather than all three. This will decrease test overhead and 
> the impact of flaky tests.
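
The filtering described above can be sketched as a small path-based dispatcher. The suite names and patterns below are illustrative assumptions, not Beam's actual Jenkins configuration:

```python
import fnmatch

# Hypothetical mapping from touched-file patterns to pre-commit suites.
SUITE_PATTERNS = {
    "Java": ["sdks/java/*", "runners/*", "examples/java/*"],
    "Python": ["sdks/python/*"],
    "Go": ["sdks/go/*"],
}

def suites_to_trigger(touched_files):
    """Return the pre-commit suites relevant to the touched files.

    Files matching no pattern (e.g. build files) trigger every suite,
    a conservative fallback so unknown changes are never under-tested.
    """
    suites = set()
    for path in touched_files:
        matched = False
        for suite, patterns in SUITE_PATTERNS.items():
            # fnmatch's '*' also matches '/', so these act as prefix globs.
            if any(fnmatch.fnmatch(path, p) for p in patterns):
                suites.add(suite)
                matched = True
        if not matched:
            return set(SUITE_PATTERNS)  # unknown file: run all three suites
    return suites
```

The conservative fallback is the design point: a PR touching only `sdks/python/...` runs one suite, while a build-system change still runs everything.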





[jira] [Work logged] (BEAM-4536) Python SDK: Pubsub reading with_attributes broken for Dataflow

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4536?focusedWorklogId=111666&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111666
 ]

ASF GitHub Bot logged work on BEAM-4536:


Author: ASF GitHub Bot
Created on: 13/Jun/18 21:29
Start Date: 13/Jun/18 21:29
Worklog Time Spent: 10m 
  Work Description: udim commented on issue #5607: [BEAM-4536] Remove 
with_attributes keyword from ReadFromPubSub.
URL: https://github.com/apache/beam/pull/5607#issuecomment-397093621
 
 
   Run Java PreCommit




Issue Time Tracking
---

Worklog Id: (was: 111666)
Time Spent: 2h  (was: 1h 50m)

> Python SDK: Pubsub reading with_attributes broken for Dataflow
> --
>
> Key: BEAM-4536
> URL: https://issues.apache.org/jira/browse/BEAM-4536
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.5.0
>Reporter: Udi Meiri
>Assignee: Udi Meiri
>Priority: Blocker
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> Using 
> [ReadFromPubsub|https://github.com/apache/beam/blob/e30e0c807321934e862358e1e3be32dc74374aeb/sdks/python/apache_beam/io/gcp/pubsub.py#L106](with_attributes=True)
>  will fail on Dataflow.





[jira] [Work logged] (BEAM-4546) Implement with hot key fanout for combiners

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4546?focusedWorklogId=111665&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111665
 ]

ASF GitHub Bot logged work on BEAM-4546:


Author: ASF GitHub Bot
Created on: 13/Jun/18 21:24
Start Date: 13/Jun/18 21:24
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #5639: 
[BEAM-4546] Multi level combine
URL: https://github.com/apache/beam/pull/5639#discussion_r195240878
 
 

 ##
 File path: sdks/python/apache_beam/transforms/core.py
 ##
 @@ -1337,6 +1350,74 @@ def default_type_hints(self):
 return hints
 
 
+class _CombinePerKeyWithHotKeyFanout(PTransform):
+
+  def __init__(self, combine_fn, fanout):
+self._fanout_fn = (
+(lambda key: fanout) if isinstance(fanout, int) else fanout)
+self._combine_fn = combine_fn
+
+  def expand(self, pcoll):
+
+from apache_beam.transforms.trigger import AccumulationMode
+combine_fn = self._combine_fn
+fanout_fn = self._fanout_fn
+
+class SplitHotCold(DoFn):
+  counter = 0
+  def process(self, element):
+key, value = element
+fanout = fanout_fn(key)
+if not fanout or fanout is 1:
 
 Review comment:
   Should it be invalid to return None from fanout_fn?
   
   Also a high-level question: do we expect users to know which keys will be 
hot, to the point of knowing how much fanout they will want per key? Would it 
be simpler to just accept a fanout integer that applies to all keys?
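
As background for this discussion, the hot-key fanout technique can be simulated in plain Python. This is an illustrative sketch of the two-level combine under the assumption of an associative combine function; it is not Beam's implementation:

```python
import random
from collections import defaultdict

def combine_with_fanout(pairs, combine_fn, fanout_fn):
    """Two-level combine: shard hot keys, pre-combine shards, merge partials.

    fanout_fn(key) returns the number of shards for that key; keys with a
    fanout of None, 0, or 1 are combined directly. combine_fn must be
    associative for the two-level result to equal a single combine.
    """
    # Level 1: spread values of hot keys across (key, shard) sub-keys so no
    # single worker has to combine all values of a hot key.
    sharded = defaultdict(list)
    for key, value in pairs:
        fanout = fanout_fn(key)
        shard = random.randrange(fanout) if fanout and fanout > 1 else 0
        sharded[(key, shard)].append(value)

    # Level 2: pre-combine each shard, strip the shard, combine the partials.
    partials = defaultdict(list)
    for (key, _), values in sharded.items():
        partials[key].append(combine_fn(values))
    return {key: combine_fn(vals) for key, vals in partials.items()}
```

For example, summing 100 values of a "hot" key with a fanout of 8 first produces up to 8 partial sums, which a final per-key combine merges; "cold" keys skip the extra level entirely.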




Issue Time Tracking
---

Worklog Id: (was: 111665)

> Implement with hot key fanout for combiners
> ---
>
> Key: BEAM-4546
> URL: https://issues.apache.org/jira/browse/BEAM-4546
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Ahmet Altay
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-4546) Implement with hot key fanout for combiners

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4546?focusedWorklogId=111664&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111664
 ]

ASF GitHub Bot logged work on BEAM-4546:


Author: ASF GitHub Bot
Created on: 13/Jun/18 21:24
Start Date: 13/Jun/18 21:24
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #5639: 
[BEAM-4546] Multi level combine
URL: https://github.com/apache/beam/pull/5639#discussion_r195240432
 
 

 ##
 File path: sdks/python/apache_beam/transforms/core.py
 ##
 @@ -1191,6 +1198,12 @@ class CombinePerKey(PTransformWithSideInputs):
   Returns:
 A PObject holding the result of the combine operation.
   """
+  def with_hot_key_fanout(self, fanout):
 
 Review comment:
   Could you add a comment about how this will be used and what it will do? 
Especially, what is expected from the fanout fn? (Would it be better to rename 
fanout to fanoutfn?)




Issue Time Tracking
---

Worklog Id: (was: 111664)
Time Spent: 0.5h  (was: 20m)

> Implement with hot key fanout for combiners
> ---
>
> Key: BEAM-4546
> URL: https://issues.apache.org/jira/browse/BEAM-4546
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Ahmet Altay
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>






Build failed in Jenkins: beam_PostCommit_Java_ValidatesRunner_Dataflow_Gradle #494

2018-06-13 Thread Apache Jenkins Server
See 


Changes:

[robbe.sneyders] Futurize utils subpackage

[github] Remove GPL findbugs dependency (#5609)

[aaltay] Futurize portability subpackage (#5385)

[altay] Futurize unpackaged files

[altay] resolved six.string_types equivalency

[altay] Futurize testing subpackage

[altay] Futurize tools subpackage

[altay] Remove old_div

--
[...truncated 54.54 MB...]
at 
com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:48)
at 
com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:200)
at 
com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
at 
com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
at 
com.google.cloud.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:391)
at 
com.google.cloud.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:360)
at 
com.google.cloud.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:288)
at 
com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:134)
at 
com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:114)
at 
com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:101)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.beam.sdk.util.UserCodeException: 
java.lang.AssertionError: OutputSideInputs/ParMultiDo(Anonymous).output: 
org/hamcrest/Matchers
at 
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
at 
org.apache.beam.sdk.testing.PAssert$DefaultConcludeFn$DoFnInvoker.invokeProcessElement(Unknown
 Source)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:185)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:149)
at 
com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:323)
at 
com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:43)
at 
com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:48)
at 
com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:271)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:219)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:69)
at 
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:517)
at 
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:505)
at 
org.apache.beam.sdk.testing.PAssert$GroupedValuesCheckerDoFn.processElement(PAssert.java:1215)
at 
org.apache.beam.sdk.testing.PAssert$GroupedValuesCheckerDoFn$DoFnInvoker.invokeProcessElement(Unknown
 Source)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:185)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:149)
at 
com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:323)
at 
com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:43)
at 
com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:48)
at 
com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:271)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:219)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:69)
at 
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:517)
at 
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:42)
at 
org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:131)
at 
org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown
 Source)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:185)
at 

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111663&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111663
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 21:16
Start Date: 13/Jun/18 21:16
Worklog Time Spent: 10m 
  Work Description: robertwb commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195238847
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
 ##
 @@ -60,17 +60,28 @@
 
   public static InMemoryJobService create(
   Endpoints.ApiServiceDescriptor stagingServiceDescriptor, JobInvoker 
invoker) {
-return new InMemoryJobService(stagingServiceDescriptor, invoker);
+return new InMemoryJobService(stagingServiceDescriptor, (String session) 
-> "token", invoker);
+  }
+
+  public static InMemoryJobService create(
+  Endpoints.ApiServiceDescriptor stagingServiceDescriptor,
+  Function stagingServiceTokenProvider,
 
 Review comment:
   Good point. 




Issue Time Tracking
---

Worklog Id: (was: 111663)
Time Spent: 0.5h  (was: 20m)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Priority: Minor
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We 
> need a main entry point that itself spins up the job service (and artifact 
> staging service). The main program itself should be packaged into an uberjar 
> such that it can be run locally or submitted to a Flink deployment via `flink 
> run`.





Build failed in Jenkins: beam_PostCommit_Java_GradleBuild #766

2018-06-13 Thread Apache Jenkins Server
See 


Changes:

[ccy] [BEAM-4549] Use per-pipeline unique ids for side inputs in

--
[...truncated 17.61 MB...]

org.apache.beam.sdk.io.gcp.spanner.SpannerWriteIT > testReportFailures 
STANDARD_OUT
Dataflow SDK version: 2.6.0-SNAPSHOT

org.apache.beam.sdk.io.gcp.spanner.SpannerWriteIT > testReportFailures 
STANDARD_ERROR
Jun 13, 2018 9:12:10 PM org.apache.beam.runners.dataflow.DataflowRunner run
INFO: To access the Dataflow monitoring console, please navigate to 
https://console.cloud.google.com/dataflow/jobsDetail/locations/us-central1/jobs/2018-06-13_14_12_09-14940987038604621772?project=apache-beam-testing

org.apache.beam.sdk.io.gcp.spanner.SpannerWriteIT > testReportFailures 
STANDARD_OUT
Submitted job: 2018-06-13_14_12_09-14940987038604621772

org.apache.beam.sdk.io.gcp.spanner.SpannerWriteIT > testReportFailures 
STANDARD_ERROR
Jun 13, 2018 9:12:10 PM org.apache.beam.runners.dataflow.DataflowRunner run
INFO: To cancel the job using the 'gcloud' tool, run:
> gcloud dataflow jobs --project=apache-beam-testing cancel 
--region=us-central1 2018-06-13_14_12_09-14940987038604621772
Jun 13, 2018 9:12:10 PM org.apache.beam.runners.dataflow.TestDataflowRunner 
run
INFO: Running Dataflow job 2018-06-13_14_12_09-14940987038604621772 with 0 
expected assertions.
Jun 13, 2018 9:12:22 PM 
org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2018-06-13T21:12:09.614Z: Autoscaling is enabled for job 
2018-06-13_14_12_09-14940987038604621772. The number of workers will be between 
1 and 1000.
Jun 13, 2018 9:12:22 PM 
org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2018-06-13T21:12:09.646Z: Autoscaling was automatically enabled for 
job 2018-06-13_14_12_09-14940987038604621772.
Jun 13, 2018 9:12:22 PM 
org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2018-06-13T21:12:12.590Z: Checking required Cloud APIs are enabled.
Jun 13, 2018 9:12:22 PM 
org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2018-06-13T21:12:12.900Z: Checking permissions granted to controller 
Service Account.
Jun 13, 2018 9:12:22 PM 
org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2018-06-13T21:12:16.912Z: Worker configuration: n1-standard-1 in 
us-central1-a.
Jun 13, 2018 9:12:22 PM 
org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2018-06-13T21:12:17.373Z: Expanding CoGroupByKey operations into 
optimizable parts.
Jun 13, 2018 9:12:22 PM 
org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2018-06-13T21:12:17.583Z: Expanding GroupByKey operations into 
optimizable parts.
Jun 13, 2018 9:12:22 PM 
org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2018-06-13T21:12:17.607Z: Lifting ValueCombiningMappingFns into 
MergeBucketsMappingFns
Jun 13, 2018 9:12:22 PM 
org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2018-06-13T21:12:17.921Z: Fusing adjacent ParDo, Read, Write, and 
Flatten operations
Jun 13, 2018 9:12:22 PM 
org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2018-06-13T21:12:17.954Z: Elided trivial flatten 
Jun 13, 2018 9:12:22 PM 
org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2018-06-13T21:12:17.987Z: Fusing consumer SpannerIO.Write/Write 
mutations to Cloud Spanner/Wait.OnSignal/Wait/Map into SpannerIO.Write/Write 
mutations to Cloud Spanner/Create seed/Read(CreateSource)
Jun 13, 2018 9:12:22 PM 
org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2018-06-13T21:12:18.033Z: Fusing consumer SpannerIO.Write/Write 
mutations to Cloud Spanner/Read information schema into SpannerIO.Write/Write 
mutations to Cloud Spanner/Wait.OnSignal/Wait/Map
Jun 13, 2018 9:12:22 PM 
org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2018-06-13T21:12:18.069Z: Fusing consumer SpannerIO.Write/Write 
mutations to Cloud Spanner/Schema 
View/Combine.GloballyAsSingletonView/BatchViewOverrides.GroupByWindowHashAsKeyAndWindowAsSortKey/BatchViewOverrides.GroupByKeyAndSortValuesOnly/Write
 into SpannerIO.Write/Write mutations to Cloud Spanner/Schema 
View/Combine.GloballyAsSingletonView/BatchViewOverrides.GroupByWindowHashAsKeyAndWindowAsSortKey/ParDo(UseWindowHashAsKeyAndWindowAsSortKey)
Jun 13, 2018 9:12:22 PM 
org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2018-06-13T21:12:18.112Z: Fusing consumer SpannerIO.Write/Write 
mutations to Cloud Spanner/Schema 
View/Combine.GloballyAsSingletonView/ParDo(IsmRecordForSingularValuePerWindow) 
into 
