[beam] branch master updated: [BEAM-7059] SamzaRunner: fix the job.id inconsistency in the new Samza version (#8279)

2019-04-11 Thread xinyu
This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 540327e  [BEAM-7059] SamzaRunner: fix the job.id inconsistency in the new Samza version (#8279)
540327e is described below

commit 540327eab201ede710681cd07de8f8105a506730
Author: xinyuiscool 
AuthorDate: Thu Apr 11 18:29:32 2019 -0700

[BEAM-7059] SamzaRunner: fix the job.id inconsistency in the new Samza version (#8279)
---
 .../beam/runners/samza/SamzaExecutionContext.java  | 33 --
 .../org/apache/beam/runners/samza/SamzaRunner.java |  5 ++--
 .../samza/runtime/SamzaTimerInternalsFactory.java  | 10 ++-
 .../runners/samza/translation/ConfigBuilder.java   |  2 ++
 4 files changed, 38 insertions(+), 12 deletions(-)

diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaExecutionContext.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaExecutionContext.java
index af65135..0867e51 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaExecutionContext.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaExecutionContext.java
@@ -59,6 +59,7 @@ public class SamzaExecutionContext implements ApplicationContainerContext {
   private GrpcFnServer fnDataServer;
   private GrpcFnServer fnStateServer;
   private ControlClientPool controlClientPool;
+  private ExecutorService dataExecutor;
   private IdGenerator idGenerator = IdGenerators.incrementingLongs();
 
   public SamzaExecutionContext(SamzaPipelineOptions options) {
@@ -92,7 +93,7 @@ public class SamzaExecutionContext implements ApplicationContainerContext {
 if (SamzaRunnerOverrideConfigs.isPortableMode(options)) {
   try {
 controlClientPool = MapControlClientPool.create();
-final ExecutorService dataExecutor = Executors.newCachedThreadPool();
+dataExecutor = Executors.newCachedThreadPool();
 
 fnControlServer =
 GrpcFnServer.allocatePortAndCreateFor(
@@ -100,18 +101,23 @@ public class SamzaExecutionContext implements ApplicationContainerContext {
 controlClientPool.getSink(), () -> SAMZA_WORKER_ID),
 ServerFactory.createWithPortSupplier(
 () -> SamzaRunnerOverrideConfigs.getFnControlPort(options)));
+LOG.info("Started control server on port {}", fnControlServer.getServer().getPort());
 
 fnDataServer =
 GrpcFnServer.allocatePortAndCreateFor(
 GrpcDataService.create(dataExecutor, OutboundObserverFactory.serverDirect()),
 ServerFactory.createDefault());
+LOG.info("Started data server on port {}", fnDataServer.getServer().getPort());
 
 fnStateServer =
 GrpcFnServer.allocatePortAndCreateFor(
 GrpcStateService.create(), ServerFactory.createDefault());
+LOG.info("Started state server on port {}", 
fnStateServer.getServer().getPort());
 
 final long waitTimeoutMs =
 SamzaRunnerOverrideConfigs.getControlClientWaitTimeoutMs(options);
+LOG.info("Control client wait timeout config: " + waitTimeoutMs);
+
 final InstructionRequestHandler instructionHandler =
 controlClientPool.getSource().take(SAMZA_WORKER_ID, Duration.ofMillis(waitTimeoutMs));
 final EnvironmentFactory environmentFactory =
@@ -120,6 +126,7 @@ public class SamzaExecutionContext implements ApplicationContainerContext {
 jobBundleFactory =
 SingleEnvironmentInstanceJobBundleFactory.create(
 environmentFactory, fnDataServer, fnStateServer, idGenerator);
+LOG.info("Started job bundle factory");
   } catch (Exception e) {
 throw new RuntimeException(
 "Running samza in Beam portable mode but failed to create job 
bundle factory", e);
@@ -131,19 +138,29 @@ public class SamzaExecutionContext implements 
ApplicationContainerContext {
 
   @Override
   public void stop() {
-closeFnServer(fnControlServer);
+closeAutoClosable(fnControlServer, "controlServer");
 fnControlServer = null;
-closeFnServer(fnDataServer);
+closeAutoClosable(fnDataServer, "dataServer");
 fnDataServer = null;
-closeFnServer(fnStateServer);
+closeAutoClosable(fnStateServer, "stateServer");
 fnStateServer = null;
+if (dataExecutor != null) {
+  dataExecutor.shutdown();
+  dataExecutor = null;
+}
+controlClientPool = null;
+closeAutoClosable(jobBundleFactory, "jobBundle");
+jobBundleFactory = null;
   }
 
-  private void closeFnServer(GrpcFnServer fnServer) {
-try (AutoCloseable closer = fnServer) {
-  // do nothing
+  private static void closeAutoClosable(AutoCloseable closeable, String name) {
+try (AutoCloseable closer = closeable) {
+  LOG.info("Closed {

[beam] branch master updated: [BEAM-7046] Restore os.environ in HttpClientTest

2019-04-11 Thread altay
This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 34a4e37  [BEAM-7046] Restore os.environ in HttpClientTest
 new e93cdf1  Merge pull request #8266 from udim/http_proxy
34a4e37 is described below

commit 34a4e37f10eab0d1d83188739cdf7baf58d2922c
Author: Udi Meiri 
AuthorDate: Tue Apr 9 15:57:39 2019 -0700

[BEAM-7046] Restore os.environ in HttpClientTest

This was causing warnings about parsing http_proxy in other tests that
use grpc.
---
 .../apache_beam/internal/http_client_test.py   | 87 +++---
 1 file changed, 44 insertions(+), 43 deletions(-)

diff --git a/sdks/python/apache_beam/internal/http_client_test.py b/sdks/python/apache_beam/internal/http_client_test.py
index 460b1db..9755edf 100644
--- a/sdks/python/apache_beam/internal/http_client_test.py
+++ b/sdks/python/apache_beam/internal/http_client_test.py
@@ -21,6 +21,7 @@ from __future__ import absolute_import
 import os
 import unittest
 
+import mock
 from httplib2 import ProxyInfo
 
 from apache_beam.internal.http_client import DEFAULT_HTTP_TIMEOUT_SECONDS
@@ -31,52 +32,52 @@ from apache_beam.internal.http_client import proxy_info_from_environment_var
 class HttpClientTest(unittest.TestCase):
 
   def test_proxy_from_env_http_with_port(self):
-os.environ['http_proxy'] = 'http://localhost:9000'
-proxy_info = proxy_info_from_environment_var('http_proxy')
-expected = ProxyInfo(3, 'localhost', 9000)
-self.assertEquals(str(expected), str(proxy_info))
+with mock.patch.dict(os.environ, http_proxy='http://localhost:9000'):
+  proxy_info = proxy_info_from_environment_var('http_proxy')
+  expected = ProxyInfo(3, 'localhost', 9000)
+  self.assertEquals(str(expected), str(proxy_info))
 
   def test_proxy_from_env_https_with_port(self):
-os.environ['https_proxy'] = 'https://localhost:9000'
-proxy_info = proxy_info_from_environment_var('https_proxy')
-expected = ProxyInfo(3, 'localhost', 9000)
-self.assertEquals(str(expected), str(proxy_info))
+with mock.patch.dict(os.environ, https_proxy='https://localhost:9000'):
+  proxy_info = proxy_info_from_environment_var('https_proxy')
+  expected = ProxyInfo(3, 'localhost', 9000)
+  self.assertEquals(str(expected), str(proxy_info))
 
   def test_proxy_from_env_http_without_port(self):
-os.environ['http_proxy'] = 'http://localhost'
-proxy_info = proxy_info_from_environment_var('http_proxy')
-expected = ProxyInfo(3, 'localhost', 80)
-self.assertEquals(str(expected), str(proxy_info))
+with mock.patch.dict(os.environ, http_proxy='http://localhost'):
+  proxy_info = proxy_info_from_environment_var('http_proxy')
+  expected = ProxyInfo(3, 'localhost', 80)
+  self.assertEquals(str(expected), str(proxy_info))
 
   def test_proxy_from_env_https_without_port(self):
-os.environ['https_proxy'] = 'https://localhost'
-proxy_info = proxy_info_from_environment_var('https_proxy')
-expected = ProxyInfo(3, 'localhost', 443)
-self.assertEquals(str(expected), str(proxy_info))
+with mock.patch.dict(os.environ, https_proxy='https://localhost'):
+  proxy_info = proxy_info_from_environment_var('https_proxy')
+  expected = ProxyInfo(3, 'localhost', 443)
+  self.assertEquals(str(expected), str(proxy_info))
 
   def test_proxy_from_env_http_without_method(self):
-os.environ['http_proxy'] = 'localhost:8000'
-proxy_info = proxy_info_from_environment_var('http_proxy')
-expected = ProxyInfo(3, 'localhost', 8000)
-self.assertEquals(str(expected), str(proxy_info))
+with mock.patch.dict(os.environ, http_proxy='localhost:8000'):
+  proxy_info = proxy_info_from_environment_var('http_proxy')
+  expected = ProxyInfo(3, 'localhost', 8000)
+  self.assertEquals(str(expected), str(proxy_info))
 
   def test_proxy_from_env_https_without_method(self):
-os.environ['https_proxy'] = 'localhost:8000'
-proxy_info = proxy_info_from_environment_var('https_proxy')
-expected = ProxyInfo(3, 'localhost', 8000)
-self.assertEquals(str(expected), str(proxy_info))
+with mock.patch.dict(os.environ, https_proxy='localhost:8000'):
+  proxy_info = proxy_info_from_environment_var('https_proxy')
+  expected = ProxyInfo(3, 'localhost', 8000)
+  self.assertEquals(str(expected), str(proxy_info))
 
   def test_proxy_from_env_http_without_port_without_method(self):
-os.environ['http_proxy'] = 'localhost'
-proxy_info = proxy_info_from_environment_var('http_proxy')
-expected = ProxyInfo(3, 'localhost', 80)
-self.assertEquals(str(expected), str(proxy_info))
+with mock.patch.dict(os.environ, http_proxy='localhost'):
+  proxy_info = proxy_info_from_environment_var('http_proxy')
+  expected = ProxyInfo(3, 'localhost', 80)
+  self.assertEqu

[beam] branch dot-log created (now c33dd9e)

2019-04-11 Thread jaku
This is an automated email from the ASF dual-hosted git repository.

jaku pushed a change to branch dot-log
in repository https://gitbox.apache.org/repos/asf/beam.git.


  at c33dd9e  Add the appropriate extension to log files.

This branch includes the following new commits:

 new c33dd9e  Add the appropriate extension to log files.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.




[beam] 01/01: Add the appropriate extension to log files.

2019-04-11 Thread jaku
This is an automated email from the ASF dual-hosted git repository.

jaku pushed a commit to branch dot-log
in repository https://gitbox.apache.org/repos/asf/beam.git

commit c33dd9eae2185014cf4bd6f5bc7ecd6405378ad7
Author: jasonkuster 
AuthorDate: Thu Apr 11 17:12:46 2019 -0700

Add the appropriate extension to log files.

Files are created with a bare "filename." extension, which is not a valid
filetype and can cause difficulties for people when trying to open logs. Set
it to add an appropriate extension instead.
---
 .../runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java
index 7428390..0fdb4ae 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java
@@ -273,7 +273,7 @@ public class DataflowWorkerLoggingHandler extends Handler {
 @Override
 public OutputStream get() {
   try {
-String filename = filepath + "." + formatter.format(new Date());
+String filename = filepath + "." + formatter.format(new Date()) + ".log";
 return new BufferedOutputStream(
 new FileOutputStream(new File(filename), true /* append */));
   } catch (IOException e) {
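
For illustration, the one-line change above appends a proper ".log" suffix to
the dated file name the handler builds. A minimal stand-alone sketch of the
same construction (the path and date pattern are stand-ins, not the handler's
actual configured values):

    import java.text.SimpleDateFormat;
    import java.util.Date;

    public class LogFileNameDemo {
      public static void main(String[] args) {
        String filepath = "/var/log/dataflow/worker";  // stand-in for the handler's configured path
        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");  // assumed pattern
        // Before the fix the name ended at the date stamp; the fix adds ".log"
        // so tools recognize the file type.
        String filename = filepath + "." + formatter.format(new Date()) + ".log";
        System.out.println(filename);  // e.g. /var/log/dataflow/worker.2019-04-11_17-12-46.log
      }
    }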



[beam] branch master updated (8334eb5 -> e4cf63b)

2019-04-11 Thread markliu
This is an automated email from the ASF dual-hosted git repository.

markliu pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 8334eb5  Merge pull request #8283 from ibzib/options-exception
 new 9cb9175  Add Python 3.6 and 3.7 test suites
 new 92af51a  Skip tests failing on Python 3.7
 new e9d50a0  Deactivate Python 3.6 and 3.7 cython test suites.
 new e4cf63b  Merge pull request #8036 from RobbeSneyders/python-3.6-3.7

The 20947 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 build.gradle   |  1 +
 .../apache_beam/transforms/ptransform_test.py  |  4 ++
 .../typehints/native_type_compatibility_test.py|  6 +++
 .../apache_beam/typehints/typed_pipeline_test.py   | 10 
 .../python/apache_beam/typehints/typehints_test.py |  6 +++
 sdks/python/test-suites/tox/py36/build.gradle  | 15 +-
 .../test-suites/tox/{py35 => py37}/build.gradle| 36 ++
 sdks/python/tox.ini| 56 +-
 settings.gradle|  2 +
 9 files changed, 113 insertions(+), 23 deletions(-)
 copy sdks/python/test-suites/tox/{py35 => py37}/build.gradle (63%)



[beam] branch master updated: [BEAM-7053] prevent errors in Spark options

2019-04-11 Thread goenka
This is an automated email from the ASF dual-hosted git repository.

goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new d0f1968  [BEAM-7053] prevent errors in Spark options
 new 8334eb5  Merge pull request #8283 from ibzib/options-exception
d0f1968 is described below

commit d0f19682e62d090fe96175805198f198ed313391
Author: Kyle Weaver 
AuthorDate: Wed Apr 10 14:22:25 2019 -0700

[BEAM-7053] prevent errors in Spark options
---
 .../main/java/org/apache/beam/runners/spark/SparkJobInvoker.java  | 8 
 1 file changed, 8 insertions(+)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobInvoker.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobInvoker.java
index cea4b07..e47c851 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobInvoker.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobInvoker.java
@@ -56,6 +56,14 @@ public class SparkJobInvoker extends JobInvoker {
 String.format("%s_%s", sparkOptions.getJobName(), 
UUID.randomUUID().toString());
 LOG.info("Invoking job {}", invocationId);
 
+// Options can't be translated to proto if runner class is unresolvable, so set it to null.
+sparkOptions.setRunner(null);
+
+if (sparkOptions.getAppName() == null) {
+  LOG.debug("App name was null. Using invocationId {}", invocationId);
+  sparkOptions.setAppName(invocationId);
+}
+
 return createJobInvocation(
 invocationId, retrievalToken, executorService, pipeline, sparkOptions);
   }



[beam] branch spark-runner_structured-streaming updated (3533c89 -> 573cf0c)

2019-04-11 Thread echauchot
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a change to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 3533c89  Add source streaming test
 new 6b84c2f  Specify checkpointLocation at the pipeline start
 new dbe88d4  Clean unneeded 0 arg constructor in batch source
 new 5e584ce  Remove unneeded 0 arg constructor in batch source
 new 980a922  Clean streaming source
 new 4562a89  Follow up on offsets for streaming source
 new 573cf0c  Deal with checkpoint and offset based read

The 19459 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../structuredstreaming/SparkPipelineOptions.java  |  12 ---
 .../translation/TranslationContext.java|  14 ++-
 .../translation/batch/DatasetSourceBatch.java  |   2 -
 .../streaming/DatasetSourceStreaming.java  | 111 +
 .../streaming/ReadSourceTranslatorStreaming.java   |  22 +---
 5 files changed, 105 insertions(+), 56 deletions(-)



[beam] branch master updated (51a13f0 -> c1841e2)

2019-04-11 Thread goenka
This is an automated email from the ASF dual-hosted git repository.

goenka pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 51a13f0  Merge pull request #8272 from ajamato/mean_byte_count_cy_combiner_only
 new 9d4bab5  [BEAM-6677] Pulling job server in beam init action
 new 0419113  [BEAM-6677] Quickfix: Prevent not finding nodes during Flink configuration on Dataproc
 new a5da15f  [BEAM-6677] Add GCLOUD_ZONE and DATAPROC_VERSION for easier customization
 new 1943416  [BEAM-6677] Add JobServer support to create_flink_cluster.sh
 new c1841e2  Merge pull request #8250 from lgajowy/BEAM-6677_flink-job-server-dataproc

The 20941 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .test-infra/dataproc/create_flink_cluster.sh | 45 ++--
 .test-infra/dataproc/init-actions/beam.sh| 34 ++---
 .test-infra/dataproc/init-actions/flink.sh   |  9 +++---
 3 files changed, 58 insertions(+), 30 deletions(-)



[beam] branch master updated: [BEAM-4374] Add Beam Distribution Accumulator to use in python's counter factory.

2019-04-11 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new f876166  [BEAM-4374] Add Beam Distribution Accumulator to use in python's counter factory.
 new 51a13f0  Merge pull request #8272 from ajamato/mean_byte_count_cy_combiner_only
f876166 is described below

commit f876166ba02a94ea66954a586810f1f15d36d98e
Author: Alex Amato 
AuthorDate: Wed Mar 13 17:56:17 2019 -0700

[BEAM-4374] Add Beam Distribution Accumulator to use in python's counter factory.
---
 .../runners/dataflow/internal/apiclient.py | 21 ---
 .../runners/dataflow/internal/apiclient_test.py| 44 +-
 .../python/apache_beam/transforms/cy_combiners.pxd | 10 +
 sdks/python/apache_beam/transforms/cy_combiners.py | 36 ++
 sdks/python/apache_beam/utils/counters.py  |  1 +
 5 files changed, 105 insertions(+), 7 deletions(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index e0a1a56..b0b1325 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -829,8 +829,8 @@ def translate_distribution(distribution_update, metric_update_proto):
   """Translate metrics DistributionUpdate to dataflow distribution update.
 
   Args:
-distribution_update: Instance of DistributionData or
-DataflowDistributionCounter.
+distribution_update: Instance of DistributionData,
+DistributionInt64Accumulator or DataflowDistributionCounter.
 metric_update_proto: Used for report metrics.
   """
   dist_update_proto = dataflow.DistributionUpdate()
@@ -838,7 +838,7 @@ def translate_distribution(distribution_update, metric_update_proto):
   dist_update_proto.max = to_split_int(distribution_update.max)
   dist_update_proto.count = to_split_int(distribution_update.count)
   dist_update_proto.sum = to_split_int(distribution_update.sum)
-  # DatadflowDistributionCounter needs to translate histogram
+  # DataflowDistributionCounter needs to translate histogram
   if isinstance(distribution_update, DataflowDistributionCounter):
 dist_update_proto.histogram = dataflow.Histogram()
 distribution_update.translate_to_histogram(dist_update_proto.histogram)
@@ -969,6 +969,11 @@ def _verify_interpreter_version_is_supported(pipeline_options):
 
 
 # To enable a counter on the service, add it to this dictionary.
+# This is required for the legacy python dataflow runner, as portability
+# does not communicate to the service via python code, but instead via a
+# a runner harness (in C++ or Java).
+# TODO(BEAM-7050) : Remove this antipattern, legacy dataflow python
+# pipelines will break whenever a new cy_combiner type is used.
 structured_counter_translations = {
 cy_combiners.CountCombineFn: (
 dataflow.CounterMetadata.KindValueValuesEnum.SUM,
@@ -1005,7 +1010,10 @@ structured_counter_translations = {
 MetricUpdateTranslators.translate_boolean),
 cy_combiners.DataflowDistributionCounterFn: (
 dataflow.CounterMetadata.KindValueValuesEnum.DISTRIBUTION,
-translate_distribution)
+translate_distribution),
+cy_combiners.DistributionInt64Fn: (
+dataflow.CounterMetadata.KindValueValuesEnum.DISTRIBUTION,
+translate_distribution),
 }
 
 
@@ -1045,5 +1053,8 @@ counter_translations = {
 MetricUpdateTranslators.translate_boolean),
 cy_combiners.DataflowDistributionCounterFn: (
 dataflow.NameAndKind.KindValueValuesEnum.DISTRIBUTION,
-translate_distribution)
+translate_distribution),
+cy_combiners.DistributionInt64Fn: (
+dataflow.CounterMetadata.KindValueValuesEnum.DISTRIBUTION,
+translate_distribution),
 }
diff --git 
a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py 
b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index 77eba7c..2f65716 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -163,7 +163,24 @@ class UtilTest(unittest.TestCase):
 self.assertEqual((split_number.lowBits, split_number.highBits),
  (0, number))
 
-  def test_translate_distribution(self):
+  def test_translate_distribution_using_accumulator(self):
+metric_update = dataflow.CounterUpdate()
+accumulator = mock.Mock()
+accumulator.min = 1
+accumulator.max = 15
+accumulator.sum = 16
+accumulator.count = 2
+apiclient.translate_distribution(accumulator, metric_update)
+self.assertEqual(metric_update.distribution.min.lowBits,
+ accumulator.min)
+self.assertEqual(metric_update.distribution.max.lowBits,
+  

[beam] branch master updated: Refactoring code from direct runner, and adding unit test for processing time timers. (#8271)

2019-04-11 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new cbe4dfb  Refactoring code from direct runner, and adding unit test for processing time timers. (#8271)
cbe4dfb is described below

commit cbe4dfbdbe5d0da5152568853ee5e17334dd1b54
Author: Pablo 
AuthorDate: Thu Apr 11 11:35:25 2019 -0700

Refactoring code from direct runner, and adding unit test for processing time timers. (#8271)

* Small refactor of direct runner code, and adding unit test.

* Fixing lint issue
---
 sdks/python/apache_beam/runners/common.py  |  8 +--
 .../apache_beam/runners/direct/direct_runner.py| 11 ++--
 .../runners/direct/evaluation_context.py   | 28 ++
 .../apache_beam/transforms/userstate_test.py   | 59 ++
 4 files changed, 85 insertions(+), 21 deletions(-)

diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index f1fda35..84ac116 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -547,7 +547,7 @@ class PerWindowInvoker(DoFnInvoker):
   try:
 self.current_windowed_value = windowed_value
 self.restriction_tracker = restriction_tracker
-return self._invoke_per_window(
+return self._invoke_process_per_window(
 windowed_value, additional_args, additional_kwargs,
 output_processor)
   finally:
@@ -556,14 +556,14 @@ class PerWindowInvoker(DoFnInvoker):
 
 elif self.has_windowed_inputs and len(windowed_value.windows) != 1:
   for w in windowed_value.windows:
-self._invoke_per_window(
+self._invoke_process_per_window(
 WindowedValue(windowed_value.value, windowed_value.timestamp, 
(w,)),
 additional_args, additional_kwargs, output_processor)
 else:
-  self._invoke_per_window(
+  self._invoke_process_per_window(
   windowed_value, additional_args, additional_kwargs, output_processor)
 
-  def _invoke_per_window(
+  def _invoke_process_per_window(
   self, windowed_value, additional_args,
   additional_kwargs, output_processor):
 if self.has_windowed_inputs:
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 43e8c7f..e880460 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -69,11 +69,6 @@ class SwitchingDirectRunner(PipelineRunner):
   """
 
   def run_pipeline(self, pipeline, options):
-use_fnapi_runner = True
-
-# Streaming mode is not yet supported on the FnApiRunner.
-if options.view_as(StandardOptions).streaming:
-  use_fnapi_runner = False
 
 from apache_beam.pipeline import PipelineVisitor
 from apache_beam.runners.dataflow.native_io.iobase import NativeSource
@@ -113,8 +108,10 @@ class SwitchingDirectRunner(PipelineRunner):
   self.supported_by_fnapi_runner = False
 
 # Check whether all transforms used in the pipeline are supported by the
-# FnApiRunner.
-use_fnapi_runner = _FnApiRunnerSupportVisitor().accept(pipeline)
+# FnApiRunner, and the pipeline was not meant to be run as streaming.
+use_fnapi_runner = (
+_FnApiRunnerSupportVisitor().accept(pipeline)
+and not options.view_as(StandardOptions).streaming)
 
 # Also ensure grpc is available.
 try:
diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index 24b05b6..a042ded 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -274,16 +274,7 @@ class EvaluationContext(object):
result.logical_metric_updates)
 
   # If the result is for a view, update side inputs container.
-  if (result.uncommitted_output_bundles
-  and result.uncommitted_output_bundles[0].pcollection
-  in self._pcollection_to_views):
-for view in self._pcollection_to_views[
-result.uncommitted_output_bundles[0].pcollection]:
-  for committed_bundle in committed_bundles:
-# side_input must be materialized.
-self._side_inputs_container.add_values(
-view,
-committed_bundle.get_elements_iterable(make_copy=True))
+  self._update_side_inputs_container(committed_bundles, result)
 
   # Tasks generated from unblocked side inputs as the watermark progresses.
   tasks = self._watermark_manager.update_watermarks(
@@ -304,6 +295,23 @@ class EvaluationContext(object):
 existing_keyed_state[k] = v
   return committed_bundles
 
+  def _upd

[beam] branch master updated: Update dataflow worker container version (#8275)

2019-04-11 Thread lcwik
This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 9893e2c  Update dataflow worker container version (#8275)
9893e2c is described below

commit 9893e2c52ee8a0c1e83de043af628c1a09b39892
Author: Boyuan Zhang <36090911+boyua...@users.noreply.github.com>
AuthorDate: Thu Apr 11 09:39:06 2019 -0700

Update dataflow worker container version (#8275)
---
 runners/google-cloud-dataflow-java/build.gradle | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle
index c9cefa5..558813c 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -39,7 +39,7 @@ processResources {
   filter org.apache.tools.ant.filters.ReplaceTokens, tokens: [
 'dataflow.legacy_environment_major_version' : '7',
 'dataflow.fnapi_environment_major_version' : '7',
-'dataflow.container_version' : 'beam-master-20190308'
+'dataflow.container_version' : 'beam-master-20190410'
   ]
 }
 



[beam] branch master updated: Update the spec for extract_output and make it clear the input accumulator is mutable

2019-04-11 Thread altay
This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 0100149  Update the spec for extract_output and make it clear the input accumulator is mutable
 new cc903a4  Merge pull request #8264 from robinyqiu/typo
0100149 is described below

commit 0100149c44bf07c9ac8dc93fde44289d97d2ede5
Author: Yueyang Qiu 
AuthorDate: Tue Apr 9 15:03:41 2019 -0700

Update the spec for extract_output and make it clear the input accumulator is mutable
---
 .../core/src/main/java/org/apache/beam/sdk/transforms/Combine.java | 2 ++
 sdks/python/apache_beam/transforms/core.py | 3 ++-
 2 files changed, 4 insertions(+), 1 deletion(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 54e6f08..7444062 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -359,6 +359,8 @@ public class Combine {
 /**
 * Returns the output value that is the result of combining all the input values represented by
  * the given accumulator.
+ *
+ * @param accumulator can be modified for efficiency
  */
 public abstract OutputT extractOutput(AccumT accumulator);
 
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index a445152..a159abd 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -667,7 +667,8 @@ class CombineFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
 
 Args:
   accumulator: the final accumulator value computed by this CombineFn
-for the entire input key or PCollection.
+for the entire input key or PCollection. Can be modified for
+efficiency.
   *args: Additional arguments and side inputs.
   **kwargs: Additional arguments and side inputs.
 """



[beam] branch master updated: [BEAM-6935] Spark portable runner: implement side inputs

2019-04-11 Thread iemejia
This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new c45a50f  [BEAM-6935] Spark portable runner: implement side inputs
 new 16b58ad  Merge pull request #8220: [BEAM-6935] Spark portable runner: implement side inputs
c45a50f is described below

commit c45a50fb092171ce4fa5f8b0758a584911d4f50d
Author: Kyle Weaver 
AuthorDate: Thu Mar 28 19:16:51 2019 -0700

[BEAM-6935] Spark portable runner: implement side inputs
---
 .../functions/FlinkExecutableStageFunction.java|  4 +-
 .../translation/BatchSideInputHandlerFactory.java} | 35 ++---
 .../BatchSideInputHandlerFactoryTest.java} | 40 +++
 .../runners/spark/translation/BoundedDataset.java  |  9 
 .../SparkBatchPortablePipelineTranslator.java  | 47 +++--
 .../translation/SparkExecutableStageFunction.java  | 59 +++---
 .../runners/spark/SparkPortableExecutionTest.java  | 36 +
 .../SparkExecutableStageFunctionTest.java  | 15 +++---
 8 files changed, 181 insertions(+), 64 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
index e7dafa8..c02aa65 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
@@ -54,6 +54,7 @@ import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
 import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
+import org.apache.beam.runners.fnexecution.translation.BatchSideInputHandlerFactory;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.io.FileSystems;
@@ -167,7 +168,8 @@ public class FlinkExecutableStageFunction extends AbstractRichFunction
   RuntimeContext runtimeContext) {
 final StateRequestHandler sideInputHandler;
 StateRequestHandlers.SideInputHandlerFactory sideInputHandlerFactory =
-FlinkBatchSideInputHandlerFactory.forStage(executableStage, runtimeContext);
+BatchSideInputHandlerFactory.forStage(
+executableStage, runtimeContext::getBroadcastVariable);
 try {
   sideInputHandler =
   StateRequestHandlers.forSideInputHandlerFactory(
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchSideInputHandlerFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/BatchSideInputHandlerFactory.java
similarity index 87%
rename from runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchSideInputHandlerFactory.java
rename to runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/BatchSideInputHandlerFactory.java
index 798c32b..5460898 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchSideInputHandlerFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/BatchSideInputHandlerFactory.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.flink.translation.functions;
+package org.apache.beam.runners.fnexecution.translation;
 
 import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
 
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMultimap;
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMultimap;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Multimap;
-import org.apache.flink.api.common.functions.RuntimeContext;
 
-/**
- * {@link StateRequestHandler} that uses a Flink {@link RuntimeContext} to access Flink broadcast
- * variable that represent side inputs.
- */
-class FlinkBatchSideInputHandlerFactory implements SideInputHandlerFactory {
+/** {@link StateRequestHandler} that uses a {@link SideInputGetter} to access side inputs. */
+public class BatchSideInputHandlerFactory implements SideInputHandlerFactory {
 
   // Map from side input id to global PCollection id.
   private final Map sideInputToCollection;
-  private final RuntimeContext runtimeContext;
+  private final SideInputGetter sideInputGetter;

[beam] branch master updated: [BEAM-7041] Let the user control if he wants to wrap the provided DataSource as a poolable one or not

2019-04-11 Thread aromanenko
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new bc9aa73  [BEAM-7041] Let the user control if he wants to wrap the provided DataSource as a poolable one or not
 new 702df1b  Merge pull request #8257: [BEAM-7041] Let the user control if he wants to wrap the provided DataSource as a poolable one or not
bc9aa73 is described below

commit bc9aa730009909d9c632fce669bff5ce25d9d81a
Author: Jean-Baptiste Onofré 
AuthorDate: Tue Apr 9 17:15:21 2019 +0200

[BEAM-7041] Let the user control if he wants to wrap the provided DataSource as a poolable one or not
---
 .../java/org/apache/beam/sdk/io/jdbc/JdbcIO.java   | 45 ++
 .../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java| 14 +++
 2 files changed, 44 insertions(+), 15 deletions(-)

diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index e6f2699..8c824a8 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -242,6 +242,8 @@ public class JdbcIO {
 @Nullable
 abstract DataSource getDataSource();
 
+abstract boolean isPoolingDataSource();
+
 abstract Builder builder();
 
 @AutoValue.Builder
@@ -258,14 +260,22 @@ public class JdbcIO {
 
   abstract Builder setDataSource(DataSource dataSource);
 
+  abstract Builder setPoolingDataSource(boolean poolingDataSource);
+
   abstract DataSourceConfiguration build();
 }
 
 public static DataSourceConfiguration create(DataSource dataSource) {
+  return create(dataSource, true);
+}
+
+public static DataSourceConfiguration create(
+DataSource dataSource, boolean isPoolingDataSource) {
   checkArgument(dataSource != null, "dataSource can not be null");
  checkArgument(dataSource instanceof Serializable, "dataSource must be Serializable");
   return new AutoValue_JdbcIO_DataSourceConfiguration.Builder()
   .setDataSource(dataSource)
+  .setPoolingDataSource(isPoolingDataSource)
   .build();
 }
 
@@ -284,6 +294,7 @@ public class JdbcIO {
   return new AutoValue_JdbcIO_DataSourceConfiguration.Builder()
   .setDriverClassName(driverClassName)
   .setUrl(url)
+  .setPoolingDataSource(true)
   .build();
 }
 
@@ -356,21 +367,25 @@ public class JdbcIO {
 current = basicDataSource;
   }
 
-  // wrapping the datasource as a pooling datasource
-  DataSourceConnectionFactory connectionFactory = new DataSourceConnectionFactory(current);
-  PoolableConnectionFactory poolableConnectionFactory =
-  new PoolableConnectionFactory(connectionFactory, null);
-  GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
-  poolConfig.setMaxTotal(1);
-  poolConfig.setMinIdle(0);
-  poolConfig.setMinEvictableIdleTimeMillis(1);
-  poolConfig.setSoftMinEvictableIdleTimeMillis(3);
-  GenericObjectPool connectionPool =
-  new GenericObjectPool(poolableConnectionFactory, poolConfig);
-  poolableConnectionFactory.setPool(connectionPool);
-  poolableConnectionFactory.setDefaultAutoCommit(false);
-  poolableConnectionFactory.setDefaultReadOnly(false);
-  return new PoolingDataSource(connectionPool);
+  if (isPoolingDataSource()) {
+// wrapping the datasource as a pooling datasource
+DataSourceConnectionFactory connectionFactory = new DataSourceConnectionFactory(current);
+PoolableConnectionFactory poolableConnectionFactory =
+new PoolableConnectionFactory(connectionFactory, null);
+GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
+poolConfig.setMaxTotal(1);
+poolConfig.setMinIdle(0);
+poolConfig.setMinEvictableIdleTimeMillis(1);
+poolConfig.setSoftMinEvictableIdleTimeMillis(3);
+GenericObjectPool connectionPool =
+new GenericObjectPool(poolableConnectionFactory, poolConfig);
+poolableConnectionFactory.setPool(connectionPool);
+poolableConnectionFactory.setDefaultAutoCommit(false);
+poolableConnectionFactory.setDefaultReadOnly(false);
+return new PoolingDataSource(connectionPool);
+  } else {
+return current;
+  }
 }
   }
 
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
index 0e9127a..3e45363 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
@@ -45,6 +45,7 @@ i
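
Based on the new overload visible in the diff, callers can now keep their own
pooling: create(dataSource) preserves the old behavior (pooling enabled), while
the two-argument form opts out. A short usage sketch (myDataSource is a
placeholder for any Serializable DataSource):

    import javax.sql.DataSource;
    import org.apache.beam.sdk.io.jdbc.JdbcIO;

    public class JdbcConfigExample {
      // Pass false when the supplied DataSource already manages its own pool,
      // e.g. a container-provided pooled DataSource; true keeps JdbcIO's DBCP
      // wrapping as before.
      public static JdbcIO.DataSourceConfiguration configure(DataSource myDataSource) {
        return JdbcIO.DataSourceConfiguration.create(myDataSource, false);
      }
    }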