[beam] branch master updated: Update Dataflow container version to 20190213

2019-02-19 Thread ccy
This is an automated email from the ASF dual-hosted git repository.

ccy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 97cc27d  Update Dataflow container version to 20190213
 new d49996c  Merge pull request #7891 from charlesccychen/update-container-20190213
97cc27d is described below

commit 97cc27d1cea9cec8a2714635a5fab79abbcdb7ea
Author: Charles Chen 
AuthorDate: Tue Feb 19 13:12:39 2019 -0800

Update Dataflow container version to 20190213
---
 sdks/python/apache_beam/runners/dataflow/internal/names.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py
index e42381e..aa60087 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/names.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py
@@ -38,7 +38,7 @@ SERIALIZED_SOURCE_KEY = 'serialized_source'
 
 # Update this version to the next version whenever there is a change that will
 # require changes to legacy Dataflow worker execution environment.
-BEAM_CONTAINER_VERSION = 'beam-master-20190208'
+BEAM_CONTAINER_VERSION = 'beam-master-20190213'
 # Update this version to the next version whenever there is a change that
 # requires changes to SDK harness container or SDK harness launcher.
 BEAM_FNAPI_CONTAINER_VERSION = 'beam-master-20190213'
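
For context, BEAM_CONTAINER_VERSION and BEAM_FNAPI_CONTAINER_VERSION are date-stamped tags that pin which prebuilt worker image a pipeline runs against; bumping them is how worker-visible changes get rolled out. A hedged sketch of how such a pinned tag composes into a full image reference; the repository name below is a made-up example, not Dataflow's actual registry path:

# Illustration only: a date-stamped version constant composed into a
# container image reference. The repository is a made-up example, not the
# real Dataflow worker registry path.
BEAM_CONTAINER_VERSION = 'beam-master-20190213'

def worker_image(repository='gcr.io/example/beam-worker'):
    return '%s:%s' % (repository, BEAM_CONTAINER_VERSION)

# worker_image() == 'gcr.io/example/beam-worker:beam-master-20190213'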



[beam] branch master updated: [BEAM-6639] Retry pulling clickhouse-server docker image

2019-02-19 Thread kenn
This is an automated email from the ASF dual-hosted git repository.

kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new efe66f1  [BEAM-6639] Retry pulling clickhouse-server docker image
 new 399de52  Merge pull request #7797: [BEAM-6639] Retry pulling clickhouse-server docker image
efe66f1 is described below

commit efe66f156fedc0fda412f8737801f4e0777966db
Author: Gleb Kanterov 
AuthorDate: Sat Feb 9 16:59:44 2019 +0100

[BEAM-6639] Retry pulling clickhouse-server docker image
---
 .../beam/sdk/io/clickhouse/BaseClickHouseTest.java | 61 --
 1 file changed, 58 insertions(+), 3 deletions(-)

diff --git a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/BaseClickHouseTest.java b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/BaseClickHouseTest.java
index 44e98f7..edf29a5 100644
--- a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/BaseClickHouseTest.java
+++ b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/BaseClickHouseTest.java
@@ -18,12 +18,25 @@
 package org.apache.beam.sdk.io.clickhouse;
 
 import com.github.dockerjava.api.command.CreateContainerCmd;
+import com.github.dockerjava.api.model.Image;
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.List;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Joiner;
+import org.joda.time.Duration;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.rnorth.ducttape.unreliables.Unreliables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.DockerClientFactory;
 import org.testcontainers.containers.BindMode;
 import org.testcontainers.containers.ClickHouseContainer;
 import org.testcontainers.containers.GenericContainer;
@@ -37,8 +50,16 @@ public class BaseClickHouseTest {
   public static GenericContainer zookeeper;
   public static ClickHouseContainer clickHouse;
 
+  // yandex/clickhouse-server:19.1.6
+  // use SHA256 not to pull docker hub for tag if image already exists locally
+  private static final String CLICKHOUSE_IMAGE =
+  "yandex/clickhouse-server@"
+  + "sha256:c75f66f3619ca70a9f7215966505eaed2fc0ca0ee7d6a7b5407d1b14df8ddefc";
+
+  private static final Logger LOG = LoggerFactory.getLogger(BaseClickHouseTest.class);
+
   @BeforeClass
-  public static void setup() {
+  public static void setup() throws IOException, InterruptedException {
 // network sharing doesn't work with ClassRule
 network = Network.newNetwork();
 
@@ -48,11 +69,13 @@ public class BaseClickHouseTest {
 .withExposedPorts(2181)
 .withNetwork(network)
 .withNetworkAliases("zookeeper");
+
+// so far zookeeper container always starts successfully, so no extra retries
 zookeeper.start();
 
 clickHouse =
 (ClickHouseContainer)
-new ClickHouseContainer("yandex/clickhouse-server:19.1")
+new ClickHouseContainer(CLICKHOUSE_IMAGE)
 .withStartupAttempts(10)
 .withCreateContainerCmdModifier(
  // type inference for `(CreateContainerCmd) -> cmd.` doesn't work
@@ -65,7 +88,39 @@ public class BaseClickHouseTest {
 "config.d/zookeeper_default.xml",
 "/etc/clickhouse-server/config.d/zookeeper_default.xml",
 BindMode.READ_ONLY);
-clickHouse.start();
+
+BackOff backOff =
+FluentBackoff.DEFAULT
+.withMaxRetries(3)
+.withInitialBackoff(Duration.standardSeconds(15))
+.backoff();
+
+// try to start clickhouse-server a couple of times, see BEAM-6639
+while (true) {
+  try {
+Unreliables.retryUntilSuccess(
+10,
+() -> {
+  DockerClientFactory.instance()
+  .checkAndPullImage(DockerClientFactory.instance().client(), CLICKHOUSE_IMAGE);
+
+  return null;
+});
+
+clickHouse.start();
+break;
+  } catch (Exception e) {
+if (!BackOffUtils.next(Sleeper.DEFAULT, backOff)) {
+  throw e;
+} else {
+  List<Image> images =
+  DockerClientFactory.instance().client().listImagesCmd().withShowAll(true).exec();
+  String listImagesOutput = "listImagesCmd:\n" + Joiner.on('\n').join(images) + "\n";
+
+LOG.warn("failed to start clickhouse-server\n\n" + listImagesOutput, e);
+}
+  }
+}
   }
 
   @AfterClass
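
The retry structure above layers two mechanisms: an inner bounded retry around the image pull (the image is pinned by SHA-256 digest, so a locally cached copy never triggers a Docker Hub lookup) and an outer exponential backoff around container startup. A minimal Python sketch of the same two-layer pattern; pull_image and start_container are hypothetical stand-ins, not Beam APIs:

# Sketch of the two-layer retry used above; illustrative only.
# pull_image() and start_container() are hypothetical stand-ins for the
# Docker image pull and ClickHouseContainer.start() calls.
import time

def retry_until_success(attempts, fn):
    # Inner retry: call fn up to `attempts` times, re-raising the last error.
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise

def start_with_backoff(pull_image, start_container,
                       max_retries=3, initial_backoff_secs=15):
    # Outer retry: back off and retry the pull+start sequence, as the test
    # does with FluentBackoff (3 retries, 15s initial backoff).
    delay = initial_backoff_secs
    for attempt in range(max_retries + 1):
        try:
            retry_until_success(10, pull_image)
            return start_container()
        except Exception:
            if attempt == max_retries:
                raise  # out of retries: surface the failure
            time.sleep(delay)
            delay *= 2  # grow the delay, mirroring exponential backoff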



[beam] branch master updated: [BEAM-6459] Don't run setupVirtualenv during clean

2019-02-19 Thread kenn
This is an automated email from the ASF dual-hosted git repository.

kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 354bca0  [BEAM-6459] Don't run setupVirtualenv during clean
 new dc05153  Merge pull request #7897: [BEAM-6459] Don't run setupVirtualenv during clean
354bca0 is described below

commit 354bca011543a17cdde2993449b0d4c6174558bc
Author: Udi Meiri 
AuthorDate: Tue Feb 19 16:19:15 2019 -0800

[BEAM-6459] Don't run setupVirtualenv during clean

`cleanPython` no longer depends on `setupVirtualenv`.
---
 .../src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy | 7 +--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 779915d..3fd4435 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -1631,11 +1631,14 @@ class BeamModulePlugin implements Plugin<Project> {
   }
   project.installGcpTest.mustRunAfter project.sdist
 
-  project.task('cleanPython', dependsOn: 'setupVirtualenv') {
+  project.task('cleanPython') {
 doLast {
+  def activate = "${project.ext.envdir}/bin/activate"
   project.exec {
 executable 'sh'
-args '-c', ". ${project.ext.envdir}/bin/activate && python ${pythonRootDir}/setup.py clean"
+args '-c', "if [ -e ${activate} ]; then " +
+". ${activate} && python ${pythonRootDir}/setup.py clean; " +
+"fi"
   }
   project.delete project.buildDir // Gradle build directory
   project.delete project.ext.envdir   // virtualenv directory
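
The guard matters because `clean` may run on a tree where the virtualenv was never provisioned; checking for the activate script first means a clean build no longer creates the environment it is about to delete. A rough Python equivalent of the guarded invocation, with illustrative paths:

# Rough Python equivalent of the guarded shell command above; illustrative
# only. setup.py clean runs only when the virtualenv's activate script
# already exists.
import os
import subprocess

def clean_python(envdir, python_root):
    activate = os.path.join(envdir, 'bin', 'activate')
    if os.path.exists(activate):
        # Source the env and clean in a single shell invocation.
        subprocess.check_call(
            ['sh', '-c', '. %s && python %s/setup.py clean' % (activate, python_root)])
    # Deleting the build dir and the envdir themselves is left to the caller,
    # as the Gradle task does with project.delete.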



[beam] branch master updated (b065099 -> 5bfba4c)

2019-02-19 Thread ccy
This is an automated email from the ASF dual-hosted git repository.

ccy pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from b065099  Merge pull request #7858 from tvalentyn/patch-39
 new 880d00b  Unskip tests
 new f87f7b2  Fix typehinting for Python 3 versions < 3.5.3
 new 5bfba4c  Merge pull request #7873 from RobbeSneyders/tuple-typing

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../typehints/native_type_compatibility.py | 22 +-
 .../typehints/native_type_compatibility_test.py| 10 --
 .../apache_beam/typehints/typed_pipeline_test.py   |  6 --
 3 files changed, 21 insertions(+), 17 deletions(-)



[beam] branch master updated: Update FnAPI dev container version.

2019-02-19 Thread ccy
This is an automated email from the ASF dual-hosted git repository.

ccy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new ac6c425  Update FnAPI dev container version.
 new b065099  Merge pull request #7858 from tvalentyn/patch-39
ac6c425 is described below

commit ac6c4256a8258dcd4618e5693fbe1fb16d324a38
Author: tvalentyn 
AuthorDate: Fri Feb 15 14:05:16 2019 -0800

Update FnAPI dev container version.
---
 sdks/python/apache_beam/runners/dataflow/internal/names.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py
index 5670d6f..e42381e 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/names.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py
@@ -41,7 +41,7 @@ SERIALIZED_SOURCE_KEY = 'serialized_source'
 BEAM_CONTAINER_VERSION = 'beam-master-20190208'
 # Update this version to the next version whenever there is a change that
 # requires changes to SDK harness container or SDK harness launcher.
-BEAM_FNAPI_CONTAINER_VERSION = 'beam-master-20190208'
+BEAM_FNAPI_CONTAINER_VERSION = 'beam-master-20190213'
 
 # TODO(BEAM-5939): Remove these shared names once Dataflow worker is updated.
 PICKLED_MAIN_SESSION_FILE = 'pickled_main_session'



[beam] branch master updated: Revert "[BEAM-6553] A Python SDK sink that supports File Loads into BQ (#7655)"

2019-02-19 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 583cc5e  Revert "[BEAM-6553] A Python SDK sink that supports File Loads into BQ (#7655)"
 new b25dd39  Merge pull request #7887 from pabloem/fix-bq-iss
583cc5e is described below

commit 583cc5eb476335faa46394c13eb7dbd1fda6a2b1
Author: Pablo 
AuthorDate: Tue Feb 19 11:44:28 2019 -0800

Revert "[BEAM-6553] A Python SDK sink that supports File Loads into BQ (#7655)"

This reverts commit cdea885872b3be7de9ba22f22700be89f7d53766.
---
 .../io/gcp/big_query_query_to_table_it_test.py |   7 -
 .../io/gcp/big_query_query_to_table_pipeline.py|   7 +-
 sdks/python/apache_beam/io/gcp/bigquery.py |  97 +---
 .../apache_beam/io/gcp/bigquery_file_loads.py  | 601 -
 .../apache_beam/io/gcp/bigquery_file_loads_test.py | 462 
 sdks/python/apache_beam/io/gcp/bigquery_tools.py   |  92 +---
 .../apache_beam/io/gcp/tests/bigquery_matcher.py   |  58 +-
 .../runners/dataflow/dataflow_runner.py|  21 +
 sdks/python/apache_beam/transforms/ptransform.py   |   6 +-
 9 files changed, 55 insertions(+), 1296 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
index 8980261..43db185 100644
--- a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
@@ -128,11 +128,8 @@ class BigQueryQueryToTableIT(unittest.TestCase):
 project=self.project,
 query=verify_query,
 checksum=expected_checksum)]
-
-gs_location = 'gs://temp-storage-for-upload-tests/%s' % self.output_table
 extra_opts = {'query': LEGACY_QUERY,
   'output': self.output_table,
-  'gs_location': gs_location,
   'output_schema': DIALECT_OUTPUT_SCHEMA,
   'use_standard_sql': False,
   'on_success_matcher': all_of(*pipeline_verifiers)}
@@ -147,10 +144,8 @@ class BigQueryQueryToTableIT(unittest.TestCase):
 project=self.project,
 query=verify_query,
 checksum=expected_checksum)]
-gs_location = 'gs://temp-storage-for-upload-tests/%s' % self.output_table
 extra_opts = {'query': STANDARD_QUERY,
   'output': self.output_table,
-  'gs_location': gs_location,
   'output_schema': DIALECT_OUTPUT_SCHEMA,
   'use_standard_sql': True,
   'on_success_matcher': all_of(*pipeline_verifiers)}
@@ -191,11 +186,9 @@ class BigQueryQueryToTableIT(unittest.TestCase):
 query=verify_query,
 checksum=expected_checksum)]
 self._setup_new_types_env()
-gs_location = 'gs://temp-storage-for-upload-tests/%s' % self.output_table
 extra_opts = {
 'query': NEW_TYPES_QUERY % (self.dataset_id, NEW_TYPES_INPUT_TABLE),
 'output': self.output_table,
-'gs_location': gs_location,
 'output_schema': NEW_TYPES_OUTPUT_SCHEMA,
 'use_standard_sql': False,
 'on_success_matcher': all_of(*pipeline_verifiers)}
diff --git a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py
index b35f4e5..26b418a 100644
--- a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py
+++ b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py
@@ -50,9 +50,6 @@ def run_bq_pipeline(argv=None):
   help='Output BQ table to write results to.')
   parser.add_argument('--kms_key', default=None,
   help='Use this Cloud KMS key with BigQuery.')
-  parser.add_argument('--gs_location',
-  default=None,
-  help='GCS bucket location to use to store files.')
   known_args, pipeline_args = parser.parse_known_args(argv)
 
   table_schema = parse_table_schema_from_json(known_args.output_schema)
@@ -65,12 +62,12 @@ def run_bq_pipeline(argv=None):
   (p | 'read' >> beam.io.Read(beam.io.BigQuerySource(
   query=known_args.query, use_standard_sql=known_args.use_standard_sql,
   kms_key=kms_key))
-   | 'write' >> beam.io.WriteToBigQuery(
+   | 'write' >> beam.io.Write(beam.io.BigQuerySink(
known_args.output,
schema=table_schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
-   gs_location=known_args.gs_location))
+   kms_key=known_args.kms_key)))
 
   result = p.run()
   result.wait_until_finish()
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index b1c1f90..8f3011b 100644

[beam] branch master updated: Use same trigger for Py2 and Py3 postcommit test suites.

2019-02-19 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new ed86afa  Use same trigger for Py2 and Py3 postcommit test suites.
 new 240c723  Merge pull request #7881 from tvalentyn/patch-40
ed86afa is described below

commit ed86afaaae4532c52a85a30bd5ddccc68f55becb
Author: tvalentyn 
AuthorDate: Tue Feb 19 09:42:09 2019 -0800

Use same trigger for Py2 and Py3 postcommit test suites.
---
 .test-infra/jenkins/job_PostCommit_Python3_Verify.groovy | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.test-infra/jenkins/job_PostCommit_Python3_Verify.groovy b/.test-infra/jenkins/job_PostCommit_Python3_Verify.groovy
index 1eb1556..e51ccce 100644
--- a/.test-infra/jenkins/job_PostCommit_Python3_Verify.groovy
+++ b/.test-infra/jenkins/job_PostCommit_Python3_Verify.groovy
@@ -20,7 +20,7 @@ import CommonJobProperties as commonJobProperties
 import PostcommitJobBuilder
 
 // This job defines the Python postcommit tests.
-PostcommitJobBuilder.postCommitJob('beam_PostCommit_Python3_Verify', 'Run Python3 PostCommit',
+PostcommitJobBuilder.postCommitJob('beam_PostCommit_Python3_Verify', 'Run Python PostCommit',
 'Python SDK PostCommit Tests on Python 3', this) {
   description('Runs postcommit tests on the Python SDK on Python 3.')
 



[beam] branch master updated (0a3b1c3 -> b2fa119)

2019-02-19 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 0a3b1c3  Merge pull request #7884: Fix job_PreCommit_Java_Examples_Dataflow glob.
 add b2fa119  Merge pull request #7865: [BEAM-6701] Add logical types to schema

No new revisions were added by this update.

Summary of changes:
 .../java/org/apache/beam/sdk/coders/RowCoder.java  |   2 +
 .../apache/beam/sdk/coders/RowCoderGenerator.java  |   6 +-
 .../beam/sdk/schemas/FieldTypeDescriptors.java |   4 +
 .../org/apache/beam/sdk/schemas/LogicalTypes.java  | 114 ++
 .../java/org/apache/beam/sdk/schemas/Schema.java   | 172 ++---
 .../apache/beam/sdk/schemas/utils/AvroUtils.java   |  29 ++--
 .../main/java/org/apache/beam/sdk/values/Row.java  |  27 +++-
 .../apache/beam/sdk/schemas/AvroSchemaTest.java|  10 +-
 .../org/apache/beam/sdk/schemas/SchemaTest.java|  24 +++
 .../sdk/extensions/sql/impl/rel/BeamCalcRel.java   |  42 +++--
 .../sql/impl/rel/BeamEnumerableConverter.java  |  22 +--
 .../extensions/sql/impl/schema/BeamTableUtils.java |  14 +-
 .../impl/transform/BeamBuiltinAggregations.java|  48 +++---
 .../transform/agg/AggregationCombineFnAdapter.java |   2 +-
 .../extensions/sql/impl/utils/CalciteUtils.java| 156 ++-
 .../provider/bigquery/BigQueryReadWriteIT.java |   3 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryUtils.java|  26 ++--
 17 files changed, 555 insertions(+), 146 deletions(-)
 create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/LogicalTypes.java



[beam] branch master updated: Fix job_PreCommit_Java_Examples_Dataflow glob.

2019-02-19 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 6089a6c  Fix job_PreCommit_Java_Examples_Dataflow glob.
 new 0a3b1c3  Merge pull request #7884: Fix job_PreCommit_Java_Examples_Dataflow glob.
6089a6c is described below

commit 6089a6c041d6af435bd0804d84bac17661bab6c4
Author: David Rieber 
AuthorDate: Tue Feb 19 10:40:28 2019 -0800

Fix job_PreCommit_Java_Examples_Dataflow glob.
---
 .test-infra/jenkins/job_PreCommit_Java_Examples_Dataflow.groovy | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.test-infra/jenkins/job_PreCommit_Java_Examples_Dataflow.groovy b/.test-infra/jenkins/job_PreCommit_Java_Examples_Dataflow.groovy
index 2da2497..e02dd12 100644
--- a/.test-infra/jenkins/job_PreCommit_Java_Examples_Dataflow.groovy
+++ b/.test-infra/jenkins/job_PreCommit_Java_Examples_Dataflow.groovy
@@ -26,7 +26,7 @@ PrecommitJobBuilder builder = new PrecommitJobBuilder(
 triggerPathPatterns: [
   '^model/.*$',
   '^sdks/java/.*$',
-  '^runners/google-cloud-dataflow/.*$',
+  '^runners/google-cloud-dataflow-java/.*$',
   '^examples/java/.*$',
   '^release/.*$',
 ]



[beam] branch spark-runner_structured-streaming updated: Fix getSideInputs

2019-02-19 Thread aromanenko
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
 new 9fcc955  Fix getSideInputs
9fcc955 is described below

commit 9fcc955f5722dcc7899f6ec91b9432444a8dd46c
Author: Alexey Romanenko 
AuthorDate: Tue Feb 19 17:01:04 2019 +0100

Fix getSideInputs
---
 .../translation/TranslationContext.java| 11 ++
 .../CreatePCollectionViewTranslatorBatch.java  | 40 ++
 .../translation/batch/ParDoTranslatorBatch.java|  1 +
 .../translation/batch/PipelineTranslatorBatch.java |  4 +++
 .../translation/batch/ParDoTest.java   | 27 +++
 5 files changed, 83 insertions(+)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 6711b1c..013ef75 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.spark.SparkConf;
@@ -61,6 +62,8 @@ public class TranslationContext {
   @SuppressFBWarnings("URF_UNREAD_FIELD") // make findbug happy
   private SparkSession sparkSession;
 
+  private final Map<PCollectionView<?>, Dataset<?>> broadcastDataSets;
+
   public TranslationContext(SparkPipelineOptions options) {
 SparkConf sparkConf = new SparkConf();
 sparkConf.setMaster(options.getSparkMaster());
@@ -73,6 +76,7 @@ public class TranslationContext {
+this.serializablePipelineOptions = new SerializablePipelineOptions(options);
 this.datasets = new HashMap<>();
 this.leaves = new HashSet<>();
+this.broadcastDataSets = new HashMap<>();
   }
 
   public SparkSession getSparkSession() {
@@ -128,6 +132,13 @@ public class TranslationContext {
 }
   }
 
+  public <ViewT, ElemT> void setSideInputDataset(
+  PCollectionView<ViewT> value, Dataset<WindowedValue<ElemT>> set) {
+if (!broadcastDataSets.containsKey(value)) {
+  broadcastDataSets.put(value, set);
+}
+  }
+
   // 

   //  PCollections methods
   // 

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CreatePCollectionViewTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CreatePCollectionViewTranslatorBatch.java
new file mode 100644
index 0000000..df4d252
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CreatePCollectionViewTranslatorBatch.java
@@ -0,0 +1,40 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import org.apache.beam.runners.core.construction.CreatePCollectionViewTranslation;
+import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.spark.sql.Dataset;
+
+import java.io.IOException;
+
+class CreatePCollectionViewTranslatorBatch<ElemT, ViewT>
+    implements TransformTranslator<PTransform<PCollection<ElemT>, PCollection<ElemT>>> {
+
+  @Override
+  public void translateTransform(
+      PTransform<PCollection<ElemT>, PCollection<ElemT>> transform, TranslationContext context) {
+
+    Dataset<WindowedValue<ElemT>> inputDataSet = context.getDataset(context.getInput());
+
+    @SuppressWarnings("unchecked") AppliedPTransform<
+            PCollection<ElemT>, PCollection<ElemT>,
+            PTransform<PCollection<ElemT>, PCollection<ElemT>>>
+        application =
+            (AppliedPTransform<
+                    PCollection<ElemT>, PCollection<ElemT>,
+                    PTransform<PCollection<ElemT>, PCollection<ElemT>>>)
+                context.getCurrentTransform();
+    PCollectionView<ViewT> input;
+    try {
+      input = CreatePCollectionViewTranslation.getView(application);
+    } ca

[beam] branch master updated: [BEAM-6268] Adjust Cassandra ports

2019-02-19 Thread aromanenko
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 0fd297e  [BEAM-6268] Adjust Cassandra ports
 new ce64ad5  Merge pull request #7317: [BEAM-6268] Adjust Cassandra ports
0fd297e is described below

commit 0fd297ed628ca94bbc342621aceaca0f9ba71172
Author: Alexey Romanenko 
AuthorDate: Wed Dec 19 16:39:01 2018 +0100

[BEAM-6268] Adjust Cassandra ports
---
 .../beam/sdk/io/hadoop/format/HadoopFormatIOCassandraTest.java| 6 --
 .../sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java | 6 +-
 .../java/io/hadoop-input-format/src/test/resources/cassandra.yaml | 8 
 3 files changed, 13 insertions(+), 7 deletions(-)

diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOCassandraTest.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOCassandraTest.java
index 4b88e32..6000c79 100644
--- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOCassandraTest.java
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOCassandraTest.java
@@ -38,20 +38,19 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 /** Tests to validate HadoopFormatIO for embedded Cassandra instance. */
-@Ignore("Ignored because of BEAM-6268")
 @RunWith(JUnit4.class)
 public class HadoopFormatIOCassandraTest implements Serializable {
   private static final long serialVersionUID = 1L;
   private static final String CASSANDRA_KEYSPACE = "beamdb";
   private static final String CASSANDRA_HOST = "127.0.0.1";
   private static final String CASSANDRA_TABLE = "scientists";
+  private static final String CASSANDRA_NATIVE_PORT_PROPERTY = "cassandra.input.native.port";
   private static final String CASSANDRA_THRIFT_PORT_PROPERTY = "cassandra.input.thrift.port";
   private static final String CASSANDRA_THRIFT_ADDRESS_PROPERTY = "cassandra.input.thrift.address";
   private static final String CASSANDRA_PARTITIONER_CLASS_PROPERTY =
@@ -60,6 +59,7 @@ public class HadoopFormatIOCassandraTest implements Serializable {
   private static final String CASSANDRA_KEYSPACE_PROPERTY = "cassandra.input.keyspace";
   private static final String CASSANDRA_COLUMNFAMILY_PROPERTY = "cassandra.input.columnfamily";
   private static final String CASSANDRA_PORT = "9061";
+  private static final String CASSANDRA_NATIVE_PORT = "9042";
   private static transient Cluster cluster;
   private static transient Session session;
   private static final long TEST_DATA_ROW_COUNT = 10L;
@@ -140,6 +140,7 @@ public class HadoopFormatIOCassandraTest implements Serializable {
*/
   private Configuration getConfiguration() {
 Configuration conf = new Configuration();
+conf.set(CASSANDRA_NATIVE_PORT_PROPERTY, CASSANDRA_NATIVE_PORT);
 conf.set(CASSANDRA_THRIFT_PORT_PROPERTY, CASSANDRA_PORT);
 conf.set(CASSANDRA_THRIFT_ADDRESS_PROPERTY, CASSANDRA_HOST);
 conf.set(CASSANDRA_PARTITIONER_CLASS_PROPERTY, CASSANDRA_PARTITIONER_CLASS_VALUE);
@@ -189,6 +190,7 @@ public class HadoopFormatIOCassandraTest implements Serializable {
 .addContactPoint(CASSANDRA_HOST)
 .withClusterName("beam")
 .withSocketOptions(socketOptions)
+.withPort(Integer.valueOf(CASSANDRA_NATIVE_PORT))
 .build();
 session = cluster.connect();
 createCassandraData();
diff --git a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java
index 0273261..c8d7d8d 100644
--- a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java
+++ b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java
@@ -50,6 +50,7 @@ public class HIFIOWithEmbeddedCassandraTest implements Serializable {
   private static final String CASSANDRA_KEYSPACE = "beamdb";
   private static final String CASSANDRA_HOST = "127.0.0.1";
   private static final String CASSANDRA_TABLE = "scientists";
+  private static final String CASSANDRA_NATIVE_PORT_PROPERTY = "cassandra.input.native.port";
   private static final String CASSANDRA_THRIFT_PORT_PROPERTY = "cassandra.input.thrift.port";
   private static final String CASSANDRA_THRIFT_ADDRESS_PROPERTY = "cassandra.input.thrift.address";

[beam] branch master updated: [BEAM-6699] Configure artifact server port in DockerizedJobContainer

2019-02-19 Thread mxm
This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 3e5afb5  [BEAM-6699] Configure artifact server port in DockerizedJobContainer
 new 95791d2  Merge pull request #7870: [BEAM-6699] Configure artifact server port in DockerizedJobContainer
3e5afb5 is described below

commit 3e5afb5b18dcc490ce65f2b6bd186b815a19dbf1
Author: Maximilian Michels 
AuthorDate: Mon Feb 18 10:27:32 2019 +0100

[BEAM-6699] Configure artifact server port in DockerizedJobContainer

On Docker-On-Mac, we need to explicitly set a port because we have to configure
port forwardings when bringing up the job server container. The artifact server
did not configure a port and we tried to use 0 as a port forwarding.
---
 .../apache_beam/runners/portability/job_server.py  | 39 ++
 .../runners/portability/portable_runner.py |  2 ++
 2 files changed, 28 insertions(+), 13 deletions(-)

diff --git a/sdks/python/apache_beam/runners/portability/job_server.py b/sdks/python/apache_beam/runners/portability/job_server.py
index 8498deb..844f07e 100644
--- a/sdks/python/apache_beam/runners/portability/job_server.py
+++ b/sdks/python/apache_beam/runners/portability/job_server.py
@@ -57,9 +57,10 @@ class DockerizedJobServer(object):
# "sibling" containers for the SDK harness.
"-v", ':'.join([docker_path, "/bin/docker"]),
"-v", "/var/run/docker.sock:/var/run/docker.sock"]
-self.job_port = DockerizedJobServer._pick_port(self.job_port)
-# artifact_port 0 suggest to pick a dynamic port.
-self.artifact_port = self.artifact_port if self.artifact_port else 0
+
+self.job_port, self.artifact_port = \
+DockerizedJobServer._pick_port(self.job_port, self.artifact_port)
+
 args = ['--job-host', self.job_host,
 '--job-port', str(self.job_port),
 '--artifact-port', str(self.artifact_port)]
@@ -74,7 +75,6 @@ class DockerizedJobServer(object):
   cmd += ["-p", "{}:{}".format(self.artifact_port, self.artifact_port)]
   cmd += ["-p", "{0}-{1}:{0}-{1}".format(
   self.harness_port_range[0], self.harness_port_range[1])]
-  args += ["--artifact-port", "{}".format(self.artifact_port)]
 else:
   # This shouldn't be set for MacOS because it destroys port forwardings,
   # even though host networking is not supported on MacOS.
@@ -109,12 +109,25 @@ class DockerizedJobServer(object):
 self.docker_process.kill()
 
   @staticmethod
-  def _pick_port(port):
-if port:
-  return port
-# Not perfect, but we have to provide a port to the subprocess.
-s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-s.bind(('localhost', 0))
-_, port = s.getsockname()
-s.close()
-return port
+  def _pick_port(*ports):
+"""
+Returns a list of ports, same length as input ports list, but replaces
+all None or 0 ports with a random free port.
+"""
+sockets = []
+
+def find_free_port(port):
+  if port:
+return port
+  else:
+s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+sockets.append(s)
+s.bind(('localhost', 0))
+_, free_port = s.getsockname()
+return free_port
+
+ports = list(map(find_free_port, ports))
+# Close sockets only now to avoid the same port to be chosen twice
+for s in sockets:
+  s.close()
+return ports
diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py
index 87f05d3..97d9c01 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner.py
@@ -23,6 +23,7 @@ import json
 import logging
 import os
 import threading
+import time
 from concurrent import futures
 
 import grpc
@@ -239,6 +240,7 @@ class PortableRunner(runner.PipelineRunner):
   num_retries += 1
   if num_retries > max_retries:
 raise e
+  time.sleep(1)
 
 options_response = send_options_request()
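
The detail worth noting in `_pick_port` above is that every placeholder socket stays open until all ports have been chosen; closing each socket as soon as its port is read could let the OS hand out the same ephemeral port twice. A standalone sketch of that trick, mirroring the helper (illustrative only):

# Standalone sketch of the free-port trick in _pick_port above; illustrative
# only. Ports that are already set pass through; None/0 entries are replaced
# with OS-assigned free ports.
import socket

def pick_ports(*requested):
    held = []    # keep sockets open so no port is handed out twice
    chosen = []
    for port in requested:
        if port:
            chosen.append(port)
        else:
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.bind(('localhost', 0))  # port 0: let the OS pick a free port
            held.append(s)
            chosen.append(s.getsockname()[1])
    for s in held:  # release only after every port has been chosen
        s.close()
    return chosen

# Example: keep an explicit job port, pick a free artifact port.
# job_port, artifact_port = pick_ports(8099, 0)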