[beam] branch master updated: Update Dataflow container version to 20190213
This is an automated email from the ASF dual-hosted git repository. ccy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 97cc27d Update Dataflow container version to 20190213 new d49996c Merge pull request #7891 from charlesccychen/update-container-20190213 97cc27d is described below commit 97cc27d1cea9cec8a2714635a5fab79abbcdb7ea Author: Charles Chen AuthorDate: Tue Feb 19 13:12:39 2019 -0800 Update Dataflow container version to 20190213 --- sdks/python/apache_beam/runners/dataflow/internal/names.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py index e42381e..aa60087 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/names.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py @@ -38,7 +38,7 @@ SERIALIZED_SOURCE_KEY = 'serialized_source' # Update this version to the next version whenever there is a change that will # require changes to legacy Dataflow worker execution environment. -BEAM_CONTAINER_VERSION = 'beam-master-20190208' +BEAM_CONTAINER_VERSION = 'beam-master-20190213' # Update this version to the next version whenever there is a change that # requires changes to SDK harness container or SDK harness launcher. BEAM_FNAPI_CONTAINER_VERSION = 'beam-master-20190213'
[beam] branch master updated: [BEAM-6639] Retry pulling clickhouse-server docker image
This is an automated email from the ASF dual-hosted git repository. kenn pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new efe66f1 [BEAM-6639] Retry pulling clickhouse-server docker image new 399de52 Merge pull request #7797: [BEAM-6639] Retry pulling clickhouse-server docker image efe66f1 is described below commit efe66f156fedc0fda412f8737801f4e0777966db Author: Gleb Kanterov AuthorDate: Sat Feb 9 16:59:44 2019 +0100 [BEAM-6639] Retry pulling clickhouse-server docker image --- .../beam/sdk/io/clickhouse/BaseClickHouseTest.java | 61 -- 1 file changed, 58 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/BaseClickHouseTest.java b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/BaseClickHouseTest.java index 44e98f7..edf29a5 100644 --- a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/BaseClickHouseTest.java +++ b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/BaseClickHouseTest.java @@ -18,12 +18,25 @@ package org.apache.beam.sdk.io.clickhouse; import com.github.dockerjava.api.command.CreateContainerCmd; +import com.github.dockerjava.api.model.Image; +import java.io.IOException; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.List; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.BackOffUtils; +import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.Sleeper; +import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Joiner; +import org.joda.time.Duration; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.rnorth.ducttape.unreliables.Unreliables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.DockerClientFactory; import 
org.testcontainers.containers.BindMode; import org.testcontainers.containers.ClickHouseContainer; import org.testcontainers.containers.GenericContainer; @@ -37,8 +50,16 @@ public class BaseClickHouseTest { public static GenericContainer zookeeper; public static ClickHouseContainer clickHouse; + // yandex/clickhouse-server:19.1.6 + // use SHA256 not to pull docker hub for tag if image already exists locally + private static final String CLICKHOUSE_IMAGE = + "yandex/clickhouse-server@" + + "sha256:c75f66f3619ca70a9f7215966505eaed2fc0ca0ee7d6a7b5407d1b14df8ddefc"; + + private static final Logger LOG = LoggerFactory.getLogger(BaseClickHouseTest.class); + @BeforeClass - public static void setup() { + public static void setup() throws IOException, InterruptedException { // network sharing doesn't work with ClassRule network = Network.newNetwork(); @@ -48,11 +69,13 @@ public class BaseClickHouseTest { .withExposedPorts(2181) .withNetwork(network) .withNetworkAliases("zookeeper"); + +// so far zookeeper container always starts successfully, so no extra retries zookeeper.start(); clickHouse = (ClickHouseContainer) -new ClickHouseContainer("yandex/clickhouse-server:19.1") +new ClickHouseContainer(CLICKHOUSE_IMAGE) .withStartupAttempts(10) .withCreateContainerCmdModifier( // type inference for `(CreateContainerCmd) -> cmd.` doesn't work @@ -65,7 +88,39 @@ public class BaseClickHouseTest { "config.d/zookeeper_default.xml", "/etc/clickhouse-server/config.d/zookeeper_default.xml", BindMode.READ_ONLY); -clickHouse.start(); + +BackOff backOff = +FluentBackoff.DEFAULT +.withMaxRetries(3) +.withInitialBackoff(Duration.standardSeconds(15)) +.backoff(); + +// try to start clickhouse-server a couple of times, see BEAM-6639 +while (true) { + try { +Unreliables.retryUntilSuccess( +10, +() -> { + DockerClientFactory.instance() + .checkAndPullImage(DockerClientFactory.instance().client(), CLICKHOUSE_IMAGE); + + return null; +}); + +clickHouse.start(); +break; + } catch (Exception e) { +if 
(!BackOffUtils.next(Sleeper.DEFAULT, backOff)) { + throw e; +} else { + List images = + DockerClientFactory.instance().client().listImagesCmd().withShowAll(true).exec(); + String listImagesOutput = "listImagesCmd:\n" + Joiner.on('\n').join(images) + "\n"; + + LOG.warn("failed to start clickhouse-server\n\n" + listImagesOutput, e); +} + } +} } @AfterClass
[beam] branch master updated: [BEAM-6459] Don't run setupVirtualenv during clean
This is an automated email from the ASF dual-hosted git repository. kenn pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 354bca0 [BEAM-6459] Don't run setupVirtualenv during clean new dc05153 Merge pull request #7897: [BEAM-6459] Don't run setupVirtualenv during clean 354bca0 is described below commit 354bca011543a17cdde2993449b0d4c6174558bc Author: Udi Meiri AuthorDate: Tue Feb 19 16:19:15 2019 -0800 [BEAM-6459] Don't run setupVirtualenv during clean `cleanPython` no longer depends on `setupVirtualenv`. --- .../src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy | 7 +-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 779915d..3fd4435 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -1631,11 +1631,14 @@ class BeamModulePlugin implements Plugin { } project.installGcpTest.mustRunAfter project.sdist - project.task('cleanPython', dependsOn: 'setupVirtualenv') { + project.task('cleanPython') { doLast { + def activate = "${project.ext.envdir}/bin/activate" project.exec { executable 'sh' -args '-c', ". ${project.ext.envdir}/bin/activate && python ${pythonRootDir}/setup.py clean" +args '-c', "if [ -e ${activate} ]; then " + +". ${activate} && python ${pythonRootDir}/setup.py clean; " + +"fi" } project.delete project.buildDir // Gradle build directory project.delete project.ext.envdir // virtualenv directory
[beam] branch master updated (b065099 -> 5bfba4c)
This is an automated email from the ASF dual-hosted git repository. ccy pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from b065099 Merge pull request #7858 from tvalentyn/patch-39 new 880d00b Unskip tests new f87f7b2 Fix typehinting for Python 3 versions < 3.5.3 new 5bfba4c Merge pull request #7873 from RobbeSneyders/tuple-typing The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../typehints/native_type_compatibility.py | 22 +- .../typehints/native_type_compatibility_test.py| 10 -- .../apache_beam/typehints/typed_pipeline_test.py | 6 -- 3 files changed, 21 insertions(+), 17 deletions(-)
[beam] branch master updated: Update FnAPI dev container version.
This is an automated email from the ASF dual-hosted git repository. ccy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new ac6c425 Update FnAPI dev container version. new b065099 Merge pull request #7858 from tvalentyn/patch-39 ac6c425 is described below commit ac6c4256a8258dcd4618e5693fbe1fb16d324a38 Author: tvalentyn AuthorDate: Fri Feb 15 14:05:16 2019 -0800 Update FnAPI dev container version. --- sdks/python/apache_beam/runners/dataflow/internal/names.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py index 5670d6f..e42381e 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/names.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py @@ -41,7 +41,7 @@ SERIALIZED_SOURCE_KEY = 'serialized_source' BEAM_CONTAINER_VERSION = 'beam-master-20190208' # Update this version to the next version whenever there is a change that # requires changes to SDK harness container or SDK harness launcher. -BEAM_FNAPI_CONTAINER_VERSION = 'beam-master-20190208' +BEAM_FNAPI_CONTAINER_VERSION = 'beam-master-20190213' # TODO(BEAM-5939): Remove these shared names once Dataflow worker is updated. PICKLED_MAIN_SESSION_FILE = 'pickled_main_session'
[beam] branch master updated: Revert "[BEAM-6553] A Python SDK sink that supports File Loads into BQ (#7655)"
This is an automated email from the ASF dual-hosted git repository. pabloem pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 583cc5e Revert "[BEAM-6553] A Python SDK sink that supports File Loads into BQ (#7655)" new b25dd39 Merge pull request #7887 from pabloem/fix-bq-iss 583cc5e is described below commit 583cc5eb476335faa46394c13eb7dbd1fda6a2b1 Author: Pablo AuthorDate: Tue Feb 19 11:44:28 2019 -0800 Revert "[BEAM-6553] A Python SDK sink that supports File Loads into BQ (#7655)" This reverts commit cdea885872b3be7de9ba22f22700be89f7d53766. --- .../io/gcp/big_query_query_to_table_it_test.py | 7 - .../io/gcp/big_query_query_to_table_pipeline.py| 7 +- sdks/python/apache_beam/io/gcp/bigquery.py | 97 +--- .../apache_beam/io/gcp/bigquery_file_loads.py | 601 - .../apache_beam/io/gcp/bigquery_file_loads_test.py | 462 sdks/python/apache_beam/io/gcp/bigquery_tools.py | 92 +--- .../apache_beam/io/gcp/tests/bigquery_matcher.py | 58 +- .../runners/dataflow/dataflow_runner.py| 21 + sdks/python/apache_beam/transforms/ptransform.py | 6 +- 9 files changed, 55 insertions(+), 1296 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py index 8980261..43db185 100644 --- a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py +++ b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py @@ -128,11 +128,8 @@ class BigQueryQueryToTableIT(unittest.TestCase): project=self.project, query=verify_query, checksum=expected_checksum)] - -gs_location = 'gs://temp-storage-for-upload-tests/%s' % self.output_table extra_opts = {'query': LEGACY_QUERY, 'output': self.output_table, - 'gs_location': gs_location, 'output_schema': DIALECT_OUTPUT_SCHEMA, 'use_standard_sql': False, 'on_success_matcher': all_of(*pipeline_verifiers)} @@ -147,10 +144,8 @@ class 
BigQueryQueryToTableIT(unittest.TestCase): project=self.project, query=verify_query, checksum=expected_checksum)] -gs_location = 'gs://temp-storage-for-upload-tests/%s' % self.output_table extra_opts = {'query': STANDARD_QUERY, 'output': self.output_table, - 'gs_location': gs_location, 'output_schema': DIALECT_OUTPUT_SCHEMA, 'use_standard_sql': True, 'on_success_matcher': all_of(*pipeline_verifiers)} @@ -191,11 +186,9 @@ class BigQueryQueryToTableIT(unittest.TestCase): query=verify_query, checksum=expected_checksum)] self._setup_new_types_env() -gs_location = 'gs://temp-storage-for-upload-tests/%s' % self.output_table extra_opts = { 'query': NEW_TYPES_QUERY % (self.dataset_id, NEW_TYPES_INPUT_TABLE), 'output': self.output_table, -'gs_location': gs_location, 'output_schema': NEW_TYPES_OUTPUT_SCHEMA, 'use_standard_sql': False, 'on_success_matcher': all_of(*pipeline_verifiers)} diff --git a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py index b35f4e5..26b418a 100644 --- a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py +++ b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py @@ -50,9 +50,6 @@ def run_bq_pipeline(argv=None): help='Output BQ table to write results to.') parser.add_argument('--kms_key', default=None, help='Use this Cloud KMS key with BigQuery.') - parser.add_argument('--gs_location', - default=None, - help='GCS bucket location to use to store files.') known_args, pipeline_args = parser.parse_known_args(argv) table_schema = parse_table_schema_from_json(known_args.output_schema) @@ -65,12 +62,12 @@ def run_bq_pipeline(argv=None): (p | 'read' >> beam.io.Read(beam.io.BigQuerySource( query=known_args.query, use_standard_sql=known_args.use_standard_sql, kms_key=kms_key)) - | 'write' >> beam.io.WriteToBigQuery( + | 'write' >> beam.io.Write(beam.io.BigQuerySink( known_args.output, schema=table_schema, 
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY, - gs_location=known_args.gs_location)) + kms_key=known_args.kms_key))) result = p.run() result.wait_until_finish() diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index b1c1f90..8f3011b 100
[beam] branch master updated: Use same trigger for Py2 and Py3 postcommit test suites.
This is an automated email from the ASF dual-hosted git repository. pabloem pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new ed86afa Use same trigger for Py2 and Py3 postcommit test suites. new 240c723 Merge pull request #7881 from tvalentyn/patch-40 ed86afa is described below commit ed86afaaae4532c52a85a30bd5ddccc68f55becb Author: tvalentyn AuthorDate: Tue Feb 19 09:42:09 2019 -0800 Use same trigger for Py2 and Py3 postcommit test suites. --- .test-infra/jenkins/job_PostCommit_Python3_Verify.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.test-infra/jenkins/job_PostCommit_Python3_Verify.groovy b/.test-infra/jenkins/job_PostCommit_Python3_Verify.groovy index 1eb1556..e51ccce 100644 --- a/.test-infra/jenkins/job_PostCommit_Python3_Verify.groovy +++ b/.test-infra/jenkins/job_PostCommit_Python3_Verify.groovy @@ -20,7 +20,7 @@ import CommonJobProperties as commonJobProperties import PostcommitJobBuilder // This job defines the Python postcommit tests. -PostcommitJobBuilder.postCommitJob('beam_PostCommit_Python3_Verify', 'Run Python3 PostCommit', +PostcommitJobBuilder.postCommitJob('beam_PostCommit_Python3_Verify', 'Run Python PostCommit', 'Python SDK PostCommit Tests on Python 3', this) { description('Runs postcommit tests on the Python SDK on Python 3.')
[beam] branch master updated (0a3b1c3 -> b2fa119)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 0a3b1c3 Merge pull request #7884: Fix job_PreCommit_Java_Examples_Dataflow glob. add b2fa119 Merge pull request #7865: [BEAM-6701] Add logical types to schema No new revisions were added by this update. Summary of changes: .../java/org/apache/beam/sdk/coders/RowCoder.java | 2 + .../apache/beam/sdk/coders/RowCoderGenerator.java | 6 +- .../beam/sdk/schemas/FieldTypeDescriptors.java | 4 + .../org/apache/beam/sdk/schemas/LogicalTypes.java | 114 ++ .../java/org/apache/beam/sdk/schemas/Schema.java | 172 ++--- .../apache/beam/sdk/schemas/utils/AvroUtils.java | 29 ++-- .../main/java/org/apache/beam/sdk/values/Row.java | 27 +++- .../apache/beam/sdk/schemas/AvroSchemaTest.java| 10 +- .../org/apache/beam/sdk/schemas/SchemaTest.java| 24 +++ .../sdk/extensions/sql/impl/rel/BeamCalcRel.java | 42 +++-- .../sql/impl/rel/BeamEnumerableConverter.java | 22 +-- .../extensions/sql/impl/schema/BeamTableUtils.java | 14 +- .../impl/transform/BeamBuiltinAggregations.java| 48 +++--- .../transform/agg/AggregationCombineFnAdapter.java | 2 +- .../extensions/sql/impl/utils/CalciteUtils.java| 156 ++- .../provider/bigquery/BigQueryReadWriteIT.java | 3 +- .../beam/sdk/io/gcp/bigquery/BigQueryUtils.java| 26 ++-- 17 files changed, 555 insertions(+), 146 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/LogicalTypes.java
[beam] branch master updated: Fix job_PreCommit_Java_Examples_Dataflow glob.
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 6089a6c Fix job_PreCommit_Java_Examples_Dataflow glob. new 0a3b1c3 Merge pull request #7884: Fix job_PreCommit_Java_Examples_Dataflow glob. 6089a6c is described below commit 6089a6c041d6af435bd0804d84bac17661bab6c4 Author: David Rieber AuthorDate: Tue Feb 19 10:40:28 2019 -0800 Fix job_PreCommit_Java_Examples_Dataflow glob. --- .test-infra/jenkins/job_PreCommit_Java_Examples_Dataflow.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.test-infra/jenkins/job_PreCommit_Java_Examples_Dataflow.groovy b/.test-infra/jenkins/job_PreCommit_Java_Examples_Dataflow.groovy index 2da2497..e02dd12 100644 --- a/.test-infra/jenkins/job_PreCommit_Java_Examples_Dataflow.groovy +++ b/.test-infra/jenkins/job_PreCommit_Java_Examples_Dataflow.groovy @@ -26,7 +26,7 @@ PrecommitJobBuilder builder = new PrecommitJobBuilder( triggerPathPatterns: [ '^model/.*$', '^sdks/java/.*$', - '^runners/google-cloud-dataflow/.*$', + '^runners/google-cloud-dataflow-java/.*$', '^examples/java/.*$', '^release/.*$', ]
[beam] branch spark-runner_structured-streaming updated: Fix getSideInputs
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push: new 9fcc955 Fix getSideInputs 9fcc955 is described below commit 9fcc955f5722dcc7899f6ec91b9432444a8dd46c Author: Alexey Romanenko AuthorDate: Tue Feb 19 17:01:04 2019 +0100 Fix getSideInputs --- .../translation/TranslationContext.java| 11 ++ .../CreatePCollectionViewTranslatorBatch.java | 40 ++ .../translation/batch/ParDoTranslatorBatch.java| 1 + .../translation/batch/PipelineTranslatorBatch.java | 4 +++ .../translation/batch/ParDoTest.java | 27 +++ 5 files changed, 83 insertions(+) diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java index 6711b1c..013ef75 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java @@ -33,6 +33,7 @@ import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.spark.SparkConf; @@ -61,6 +62,8 @@ public class TranslationContext { @SuppressFBWarnings("URF_UNREAD_FIELD") // make findbug happy private SparkSession sparkSession; + private final Map, Dataset> broadcastDataSets; + public 
TranslationContext(SparkPipelineOptions options) { SparkConf sparkConf = new SparkConf(); sparkConf.setMaster(options.getSparkMaster()); @@ -73,6 +76,7 @@ public class TranslationContext { this.serializablePipelineOptions = new SerializablePipelineOptions(options); this.datasets = new HashMap<>(); this.leaves = new HashSet<>(); +this.broadcastDataSets = new HashMap<>(); } public SparkSession getSparkSession() { @@ -128,6 +132,13 @@ public class TranslationContext { } } + public void setSideInputDataset( + PCollectionView value, Dataset> set) { +if (!broadcastDataSets.containsKey(value)) { + broadcastDataSets.put(value, set); +} + } + // // PCollections methods // diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CreatePCollectionViewTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CreatePCollectionViewTranslatorBatch.java new file mode 100644 index 000..df4d252 --- /dev/null +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CreatePCollectionViewTranslatorBatch.java @@ -0,0 +1,40 @@ +package org.apache.beam.runners.spark.structuredstreaming.translation.batch; + +import org.apache.beam.runners.core.construction.CreatePCollectionViewTranslation; +import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator; +import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.spark.sql.Dataset; + +import java.io.IOException; + +class CreatePCollectionViewTranslatorBatch +implements TransformTranslator, 
PCollection>> { + + @Override + public void translateTransform( + PTransform, PCollection> transform, TranslationContext context) { + +Dataset> inputDataSet = context.getDataset(context.getInput()); + +@SuppressWarnings("unchecked") AppliedPTransform< +PCollection, PCollection, +PTransform, PCollection>> +application = +(AppliedPTransform< +PCollection, PCollection, +PTransform, PCollection>>) +context.getCurrentTransform(); +PCollectionView input; +try { + input = CreatePCollectionViewTranslation.getView(application); +} ca
[beam] branch master updated: [BEAM-6268] Adjust Cassandra ports
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 0fd297e [BEAM-6268] Adjust Cassandra ports new ce64ad5 Merge pull request #7317: [BEAM-6268] Adjust Cassandra ports 0fd297e is described below commit 0fd297ed628ca94bbc342621aceaca0f9ba71172 Author: Alexey Romanenko AuthorDate: Wed Dec 19 16:39:01 2018 +0100 [BEAM-6268] Adjust Cassandra ports --- .../beam/sdk/io/hadoop/format/HadoopFormatIOCassandraTest.java| 6 -- .../sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java | 6 +- .../java/io/hadoop-input-format/src/test/resources/cassandra.yaml | 8 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOCassandraTest.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOCassandraTest.java index 4b88e32..6000c79 100644 --- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOCassandraTest.java +++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOCassandraTest.java @@ -38,20 +38,19 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.InputFormat; import org.junit.AfterClass; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; /** Tests to validate HadoopFormatIO for embedded Cassandra instance. 
*/ -@Ignore("Ignored because of BEAM-6268") @RunWith(JUnit4.class) public class HadoopFormatIOCassandraTest implements Serializable { private static final long serialVersionUID = 1L; private static final String CASSANDRA_KEYSPACE = "beamdb"; private static final String CASSANDRA_HOST = "127.0.0.1"; private static final String CASSANDRA_TABLE = "scientists"; + private static final String CASSANDRA_NATIVE_PORT_PROPERTY = "cassandra.input.native.port"; private static final String CASSANDRA_THRIFT_PORT_PROPERTY = "cassandra.input.thrift.port"; private static final String CASSANDRA_THRIFT_ADDRESS_PROPERTY = "cassandra.input.thrift.address"; private static final String CASSANDRA_PARTITIONER_CLASS_PROPERTY = @@ -60,6 +59,7 @@ public class HadoopFormatIOCassandraTest implements Serializable { private static final String CASSANDRA_KEYSPACE_PROPERTY = "cassandra.input.keyspace"; private static final String CASSANDRA_COLUMNFAMILY_PROPERTY = "cassandra.input.columnfamily"; private static final String CASSANDRA_PORT = "9061"; + private static final String CASSANDRA_NATIVE_PORT = "9042"; private static transient Cluster cluster; private static transient Session session; private static final long TEST_DATA_ROW_COUNT = 10L; @@ -140,6 +140,7 @@ public class HadoopFormatIOCassandraTest implements Serializable { */ private Configuration getConfiguration() { Configuration conf = new Configuration(); +conf.set(CASSANDRA_NATIVE_PORT_PROPERTY, CASSANDRA_NATIVE_PORT); conf.set(CASSANDRA_THRIFT_PORT_PROPERTY, CASSANDRA_PORT); conf.set(CASSANDRA_THRIFT_ADDRESS_PROPERTY, CASSANDRA_HOST); conf.set(CASSANDRA_PARTITIONER_CLASS_PROPERTY, CASSANDRA_PARTITIONER_CLASS_VALUE); @@ -189,6 +190,7 @@ public class HadoopFormatIOCassandraTest implements Serializable { .addContactPoint(CASSANDRA_HOST) .withClusterName("beam") .withSocketOptions(socketOptions) +.withPort(Integer.valueOf(CASSANDRA_NATIVE_PORT)) .build(); session = cluster.connect(); createCassandraData(); diff --git 
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java index 0273261..c8d7d8d 100644 --- a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java +++ b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java @@ -50,6 +50,7 @@ public class HIFIOWithEmbeddedCassandraTest implements Serializable { private static final String CASSANDRA_KEYSPACE = "beamdb"; private static final String CASSANDRA_HOST = "127.0.0.1"; private static final String CASSANDRA_TABLE = "scientists"; + private static final String CASSANDRA_NATIVE_PORT_PROPERTY = "cassandra.input.native.port"; private static final String CASSANDRA_THRIFT_PORT_PROPERTY = "cassandra.input.thrift.port"; private static final String CASSANDRA_THRIFT_ADDRESS_PROPERTY = "cassandra.input.thrift.address";
[beam] branch master updated: [BEAM-6699] Configure artifact server port in DockerizedJobContainer
This is an automated email from the ASF dual-hosted git repository. mxm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 3e5afb5 [BEAM-6699] Configure artifact server port in DockerizedJobContainer new 95791d2 Merge pull request #7870: [BEAM-6699] Configure artifact server port in DockerizedJobContainer 3e5afb5 is described below commit 3e5afb5b18dcc490ce65f2b6bd186b815a19dbf1 Author: Maximilian Michels AuthorDate: Mon Feb 18 10:27:32 2019 +0100 [BEAM-6699] Configure artifact server port in DockerizedJobContainer On Docker-On-Mac, we need to explicitly set a port because we have to configure port forwardings when bringing up the job server container. The artifact server did not configure a port and we tried to use 0 as a port forwarding. --- .../apache_beam/runners/portability/job_server.py | 39 ++ .../runners/portability/portable_runner.py | 2 ++ 2 files changed, 28 insertions(+), 13 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/job_server.py b/sdks/python/apache_beam/runners/portability/job_server.py index 8498deb..844f07e 100644 --- a/sdks/python/apache_beam/runners/portability/job_server.py +++ b/sdks/python/apache_beam/runners/portability/job_server.py @@ -57,9 +57,10 @@ class DockerizedJobServer(object): # "sibling" containers for the SDK harness. "-v", ':'.join([docker_path, "/bin/docker"]), "-v", "/var/run/docker.sock:/var/run/docker.sock"] -self.job_port = DockerizedJobServer._pick_port(self.job_port) -# artifact_port 0 suggest to pick a dynamic port. 
-self.artifact_port = self.artifact_port if self.artifact_port else 0 + +self.job_port, self.artifact_port = \ +DockerizedJobServer._pick_port(self.job_port, self.artifact_port) + args = ['--job-host', self.job_host, '--job-port', str(self.job_port), '--artifact-port', str(self.artifact_port)] @@ -74,7 +75,6 @@ class DockerizedJobServer(object): cmd += ["-p", "{}:{}".format(self.artifact_port, self.artifact_port)] cmd += ["-p", "{0}-{1}:{0}-{1}".format( self.harness_port_range[0], self.harness_port_range[1])] - args += ["--artifact-port", "{}".format(self.artifact_port)] else: # This shouldn't be set for MacOS because it detroys port forwardings, # even though host networking is not supported on MacOS. @@ -109,12 +109,25 @@ class DockerizedJobServer(object): self.docker_process.kill() @staticmethod - def _pick_port(port): -if port: - return port -# Not perfect, but we have to provide a port to the subprocess. -s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -s.bind(('localhost', 0)) -_, port = s.getsockname() -s.close() -return port + def _pick_port(*ports): +""" +Returns a list of ports, same length as input ports list, but replaces +all None or 0 ports with a random free port. 
+""" +sockets = [] + +def find_free_port(port): + if port: +return port + else: +s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +sockets.append(s) +s.bind(('localhost', 0)) +_, free_port = s.getsockname() +return free_port + +ports = list(map(find_free_port, ports)) +# Close sockets only now to avoid the same port to be chosen twice +for s in sockets: + s.close() +return ports diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py index 87f05d3..97d9c01 100644 --- a/sdks/python/apache_beam/runners/portability/portable_runner.py +++ b/sdks/python/apache_beam/runners/portability/portable_runner.py @@ -23,6 +23,7 @@ import json import logging import os import threading +import time from concurrent import futures import grpc @@ -239,6 +240,7 @@ class PortableRunner(runner.PipelineRunner): num_retries += 1 if num_retries > max_retries: raise e + time.sleep(1) options_response = send_options_request()