[beam] branch master updated: [BEAM-6828] Adding ValueProvider support for BQ transforms.
This is an automated email from the ASF dual-hosted git repository. pabloem pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 1c28082 [BEAM-6828] Adding ValueProvider support for BQ transforms. new 10d4839 Merge pull request #8045 from pabloem/vp-bq 1c28082 is described below commit 1c280820a23ad662788183f0a75fab9865acbec0 Author: pabloem AuthorDate: Tue Mar 12 16:11:56 2019 -0700 [BEAM-6828] Adding ValueProvider support for BQ transforms. --- .../examples/cookbook/bigquery_tornadoes.py| 2 +- sdks/python/apache_beam/io/gcp/bigquery.py | 40 +--- .../apache_beam/io/gcp/bigquery_file_loads.py | 5 ++- .../apache_beam/io/gcp/bigquery_file_loads_test.py | 43 -- sdks/python/apache_beam/io/gcp/bigquery_test.py| 39 sdks/python/apache_beam/io/gcp/bigquery_tools.py | 27 -- 6 files changed, 117 insertions(+), 39 deletions(-) diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py index 9db0f73..9be3f89 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py @@ -75,7 +75,7 @@ def run(argv=None): 'or DATASET.TABLE.')) parser.add_argument('--gcs_location', - required=True, + required=False, help=('GCS Location to store files to load ' 'data into Bigquery')) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index a1b6f99..27ee67a 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -626,6 +626,8 @@ class BigQueryWriteFn(DoFn): """ if schema is None: return schema +elif isinstance(schema, (str, unicode)): + return bigquery_tools.parse_table_schema_from_json(schema) elif isinstance(schema, dict): return bigquery_tools.parse_table_schema_from_json(json.dumps(schema)) else: @@ -644,7 +646,7 
@@ class BigQueryWriteFn(DoFn): num_retries=1, max_delay_secs=1500)) - def _create_table_if_needed(self, schema, table_reference): + def _create_table_if_needed(self, table_reference, schema=None): str_table_reference = '%s:%s.%s' % ( table_reference.projectId, table_reference.datasetId, @@ -673,8 +675,8 @@ class BigQueryWriteFn(DoFn): schema = destination[1] destination = destination[0] self._create_table_if_needed( - schema, - bigquery_tools.parse_table_reference(destination)) + bigquery_tools.parse_table_reference(destination), + schema) row = element[1] self._rows_buffer[destination].append(row) @@ -766,7 +768,7 @@ class WriteToBigQuery(PTransform): """Initialize a WriteToBigQuery transform. Args: - table (str, callable): The ID of the table, or a callable + table (str, callable, ValueProvider): The ID of the table, or a callable that returns it. The ID must contain only letters ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. If dataset argument is :data:`None` then the table argument must contain the entire table @@ -782,10 +784,11 @@ class WriteToBigQuery(PTransform): project (str): The ID of the project containing this table or :data:`None` if the table reference is specified entirely by the table argument. - schema (str): The schema to be used if the BigQuery table to write has to -be created. This can be either specified as a -:class:`~apache_beam.io.gcp.internal.clients.bigquery.\ -bigquery_v2_messages.TableSchema` + schema (str,dict,ValueProvider): The schema to be used if the +BigQuery table to write has to be created. This can be either specified +as a :class:`~apache_beam.io.gcp.internal.clients.bigquery.\ +bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string, +or a python dictionary, or the string or dictionary itself. object or a single string of the form ``'field1:type1,field2:type2,field3:type3'`` that defines a comma separated list of fields. 
Here ``'type'`` should specify the BigQuery @@ -925,6 +928,8 @@ bigquery_v2_messages.TableSchema): return WriteToBigQuery.table_schema_to_dict(table_schema) elif isinstance(schema, bigquery.TableSchema): return WriteToBigQuery.table_schema_to_dict(schema) +elif isinstance(schema, vp.ValueProvider): + return schema else: raise TypeError('Unexpected schema argument: %s.' % schema) @@ -934,7 +939,7 @@ bigquery_v2_messages.TableSchema): # TODO(pabloem): Use a different
[beam] branch aaltay-patch-1 deleted (was 6adb5a5)
This is an automated email from the ASF dual-hosted git repository. pabloem pushed a change to branch aaltay-patch-1 in repository https://gitbox.apache.org/repos/asf/beam.git. was 6adb5a5 Fix a typo The revisions that were on this branch are still contained in other references; therefore, this change does not discard any commits from the repository.
[beam] branch master updated (652adec -> e08db7f)
This is an automated email from the ASF dual-hosted git repository. pabloem pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 652adec Merge pull request #8044 from amaliujia/rw_add_list_row add 6adb5a5 Fix a typo add e08db7f Merge pull request #8073 from apache/aaltay-patch-1 No new revisions were added by this update. Summary of changes: sdks/python/apache_beam/metrics/execution.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[beam] branch aaltay-patch-1 created (now 6adb5a5)
This is an automated email from the ASF dual-hosted git repository. altay pushed a change to branch aaltay-patch-1 in repository https://gitbox.apache.org/repos/asf/beam.git. at 6adb5a5 Fix a typo This branch includes the following new commits: new 6adb5a5 Fix a typo The 1 revision listed above as "new" is entirely new to this repository and will be described in a separate email. The revisions listed as "add" were already present in the repository and have only been added to this reference.
[beam] 01/01: Fix a typo
This is an automated email from the ASF dual-hosted git repository. altay pushed a commit to branch aaltay-patch-1 in repository https://gitbox.apache.org/repos/asf/beam.git commit 6adb5a5763abf973786e868d103b24d8a30f8862 Author: Ahmet Altay AuthorDate: Fri Mar 15 16:23:54 2019 -0700 Fix a typo --- sdks/python/apache_beam/metrics/execution.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/metrics/execution.py b/sdks/python/apache_beam/metrics/execution.py index 57a0a09..4f873b8 100644 --- a/sdks/python/apache_beam/metrics/execution.py +++ b/sdks/python/apache_beam/metrics/execution.py @@ -49,7 +49,7 @@ class MetricKey(object): """Key used to identify instance of metric cell. Metrics are internally keyed by the name of the step they're associated with, - the name and namespace (if it is a user defined metric) of the the metric, + the name and namespace (if it is a user defined metric) of the metric, and any extra label metadata added by the runner specific metric collection service. """
[beam] branch aaltay-patch-1 deleted (was 47db394)
This is an automated email from the ASF dual-hosted git repository. altay pushed a change to branch aaltay-patch-1 in repository https://gitbox.apache.org/repos/asf/beam.git. was 47db394 Fix a typo The revisions that were on this branch are still contained in other references; therefore, this change does not discard any commits from the repository.
[beam] branch master updated: [BEAM-6814] toListRow in BeamEnumerableConverter.
This is an automated email from the ASF dual-hosted git repository. anton pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 5b2fb1c [BEAM-6814] toListRow in BeamEnumerableConverter. new 652adec Merge pull request #8044 from amaliujia/rw_add_list_row 5b2fb1c is described below commit 5b2fb1c854706369472a7afbde2e8bcabd4df659 Author: amaliujia AuthorDate: Tue Mar 12 15:51:17 2019 -0700 [BEAM-6814] toListRow in BeamEnumerableConverter. --- .../sql/impl/rel/BeamEnumerableConverter.java | 63 -- .../sql/impl/rel/BeamEnumerableConverterTest.java | 17 ++ 2 files changed, 65 insertions(+), 15 deletions(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java index 3fb9a67..755d589 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java @@ -129,6 +129,15 @@ public class BeamEnumerableConverter extends ConverterImpl implements Enumerable return options; } + static List toRowList(PipelineOptions options, BeamRelNode node) { +if (node instanceof BeamIOSinkRel) { + throw new UnsupportedOperationException("Does not support BeamIOSinkRel in toRowList."); +} else if (isLimitQuery(node)) { + throw new UnsupportedOperationException("Does not support queries with LIMIT in toRowList."); +} +return collectRowList(options, node); + } + static Enumerable toEnumerable(PipelineOptions options, BeamRelNode node) { if (node instanceof BeamIOSinkRel) { return count(options, node); @@ -143,7 +152,7 @@ public class BeamEnumerableConverter extends ConverterImpl implements Enumerable PipelineOptions options, BeamRelNode 
node, DoFn doFn, - Queue values, + Queue values, int limitCount) { options.as(DirectOptions.class).setBlockOnRun(false); Pipeline pipeline = Pipeline.create(options); @@ -174,9 +183,36 @@ public class BeamEnumerableConverter extends ConverterImpl implements Enumerable return result; } + private static void runCollector(PipelineOptions options, BeamRelNode node) { +Pipeline pipeline = Pipeline.create(options); +PCollection resultCollection = BeamSqlRelUtils.toPCollection(pipeline, node); +resultCollection.apply(ParDo.of(new Collector())); +PipelineResult result = pipeline.run(); +result.waitUntilFinish(); + } + + private static List collectRowList(PipelineOptions options, BeamRelNode node) { +long id = options.getOptionsId(); +Queue values = new ConcurrentLinkedQueue<>(); + +checkArgument( +options +.getRunner() +.getCanonicalName() +.equals("org.apache.beam.runners.direct.DirectRunner"), +"collectRowList is only available in direct runner."); + +Collector.globalValues.put(id, values); + +runCollector(options, node); + +Collector.globalValues.remove(id); +return values.stream().collect(Collectors.toList()); + } + private static Enumerable collect(PipelineOptions options, BeamRelNode node) { long id = options.getOptionsId(); -Queue values = new ConcurrentLinkedQueue<>(); +Queue values = new ConcurrentLinkedQueue<>(); checkArgument( options @@ -187,20 +223,16 @@ public class BeamEnumerableConverter extends ConverterImpl implements Enumerable Collector.globalValues.put(id, values); -Pipeline pipeline = Pipeline.create(options); -PCollection resultCollection = BeamSqlRelUtils.toPCollection(pipeline, node); -resultCollection.apply(ParDo.of(new Collector())); -PipelineResult result = pipeline.run(); -result.waitUntilFinish(); +runCollector(options, node); Collector.globalValues.remove(id); -return Linq4j.asEnumerable(unboxValues(values)); +return Linq4j.asEnumerable(rowToAvaticaAndUnboxValues(values)); } private static Enumerable limitCollect(PipelineOptions options, 
BeamRelNode node) { long id = options.getOptionsId(); -ConcurrentLinkedQueue values = new ConcurrentLinkedQueue<>(); +ConcurrentLinkedQueue values = new ConcurrentLinkedQueue<>(); checkArgument( options @@ -220,15 +252,15 @@ public class BeamEnumerableConverter extends ConverterImpl implements Enumerable values.remove(); } -return Linq4j.asEnumerable(unboxValues(values)); +return Linq4j.asEnumerable(rowToAvaticaAndUnboxValues(values)); } private static class Collector extends DoFn { // This will only work on the direct runner. -private static final
[beam] branch master updated: [BEAM-6185] Upgrade Spark to version 2.4.0
This is an automated email from the ASF dual-hosted git repository. iemejia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 8d4ce24 [BEAM-6185] Upgrade Spark to version 2.4.0 new e68ba74 Merge pull request #7216: [BEAM-6185] Upgrade Spark to version 2.4.0 8d4ce24 is described below commit 8d4ce24ffaae2cb4c5332c5f7692a48e007fb731 Author: Jean-Baptiste Onofré AuthorDate: Thu Dec 6 12:18:56 2018 +0100 [BEAM-6185] Upgrade Spark to version 2.4.0 --- buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 7c50a52..8ef5246 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -354,7 +354,7 @@ class BeamModulePlugin implements Plugin { def hamcrest_version = "1.3" def hadoop_version = "2.7.3" def jackson_version = "2.9.8" -def spark_version = "2.3.2" +def spark_version = "2.4.0" def nemo_version = "0.1" def apex_core_version = "3.7.0" def apex_malhar_version = "3.4.0"
[beam] branch master updated: [BEAM-6773] Add ValueProvider to CassandraIO.Read
This is an automated email from the ASF dual-hosted git repository. iemejia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 24406c9 [BEAM-6773] Add ValueProvider to CassandraIO.Read new 6e96e23 Merge pull request #8024: [BEAM-6773] Add ValueProvider to CassandraIO.Read 24406c9 is described below commit 24406c9d3259a511646be661c687c73e3c2a531b Author: Radoslaw Stankiewicz AuthorDate: Wed Mar 6 17:37:26 2019 +0100 [BEAM-6773] Add ValueProvider to CassandraIO.Read Enable parametrization at runtime and enable future integrations with templates --- .../apache/beam/sdk/io/cassandra/CassandraIO.java | 181 + .../beam/sdk/io/cassandra/CassandraIOTest.java | 20 +++ 2 files changed, 167 insertions(+), 34 deletions(-) diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java index 06e6d1a..1e34a27 100644 --- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java +++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java @@ -50,6 +50,7 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -131,16 +132,16 @@ public class CassandraIO { @AutoValue public abstract static class Read extends PTransform> { @Nullable -abstract List hosts(); +abstract ValueProvider> hosts(); @Nullable -abstract Integer port(); +abstract ValueProvider port(); @Nullable -abstract String keyspace(); +abstract ValueProvider keyspace(); @Nullable -abstract String 
table(); +abstract ValueProvider table(); @Nullable abstract Class entity(); @@ -149,10 +150,10 @@ public class CassandraIO { abstract Coder coder(); @Nullable -abstract String username(); +abstract ValueProvider username(); @Nullable -abstract String password(); +abstract ValueProvider password(); @Nullable abstract String encryptedPassword(); @@ -161,16 +162,16 @@ public class CassandraIO { abstract PasswordDecrypter passwordDecrypter(); @Nullable -abstract String localDc(); +abstract ValueProvider localDc(); @Nullable -abstract String consistencyLevel(); +abstract ValueProvider consistencyLevel(); @Nullable -abstract String where(); +abstract ValueProvider where(); @Nullable -abstract Integer minNumberOfSplits(); +abstract ValueProvider minNumberOfSplits(); abstract Builder builder(); @@ -178,24 +179,44 @@ public class CassandraIO { public Read withHosts(List hosts) { checkArgument(hosts != null, "hosts can not be null"); checkArgument(!hosts.isEmpty(), "hosts can not be empty"); + return builder().setHosts(ValueProvider.StaticValueProvider.of(hosts)).build(); +} + +/** Specify the hosts of the Apache Cassandra instances. */ +public Read withHosts(ValueProvider> hosts) { return builder().setHosts(hosts).build(); } /** Specify the port number of the Apache Cassandra instances. */ public Read withPort(int port) { checkArgument(port > 0, "port must be > 0, but was: %s", port); + return builder().setPort(ValueProvider.StaticValueProvider.of(port)).build(); +} + +/** Specify the port number of the Apache Cassandra instances. */ +public Read withPort(ValueProvider port) { return builder().setPort(port).build(); } /** Specify the Cassandra keyspace where to read data. */ public Read withKeyspace(String keyspace) { checkArgument(keyspace != null, "keyspace can not be null"); + return builder().setKeyspace(ValueProvider.StaticValueProvider.of(keyspace)).build(); +} + +/** Specify the Cassandra keyspace where to read data. 
*/ +public Read withKeyspace(ValueProvider keyspace) { return builder().setKeyspace(keyspace).build(); } /** Specify the Cassandra table where to read data. */ public Read withTable(String table) { checkArgument(table != null, "table can not be null"); + return builder().setTable(ValueProvider.StaticValueProvider.of(table)).build(); +} + +/** Specify the Cassandra table where to read data. */ +public Read withTable(ValueProvider table) { return builder().setTable(table).build(); } @@ -218,12 +239,22 @@ public class CassandraIO { /** Specify the username for authentication. */
[beam] branch master updated: Add MetricResultMatcher verification for user metrics, GBK element count, and process time to an integration test. (#8038)
This is an automated email from the ASF dual-hosted git repository. pabloem pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 2c2b9d2 Add MetricResultMatcher verification for user metrics, GBK element count, and process time to an integration test. (#8038) 2c2b9d2 is described below commit 2c2b9d2ba7c3cea048688a8fc0abd25030046005 Author: Alex Amato AuthorDate: Fri Mar 15 13:26:34 2019 -0700 Add MetricResultMatcher verification for user metrics, GBK element count, and process time to an integration test. (#8038) [BEAM-6844] Add MetricResultMatcher verification for user metrics, GBK element count, and process time to an integration test. --- .../dataflow/dataflow_exercise_metrics_pipeline.py | 199 + .../dataflow_exercise_metrics_pipeline_test.py | 68 +++ .../runners/dataflow/dataflow_metrics.py | 80 +++-- .../runners/dataflow/dataflow_metrics_test.py | 10 +- 4 files changed, 336 insertions(+), 21 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_metrics_pipeline.py b/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_metrics_pipeline.py new file mode 100644 index 000..a77497f --- /dev/null +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_metrics_pipeline.py @@ -0,0 +1,199 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""A word-counting workflow.""" + +from __future__ import absolute_import + +import time + +from hamcrest.library.number.ordering_comparison import greater_than + +import apache_beam as beam +from apache_beam.metrics import Metrics +from apache_beam.testing.metric_result_matchers import DistributionMatcher +from apache_beam.testing.metric_result_matchers import MetricResultMatcher + +SLEEP_TIME_SECS = 1 +INPUT = [0, 0, 0, 100] +METRIC_NAMESPACE = ('apache_beam.runners.dataflow.' +'dataflow_exercise_metrics_pipeline.UserMetricsDoFn') + + +def common_metric_matchers(): + """MetricResult matchers common to all tests.""" + # TODO(ajamato): Matcher for the 'metrics' step's ElementCount. + # TODO(ajamato): Matcher for the 'metrics' step's MeanByteCount. + # TODO(ajamato): Matcher for the start and finish exec times. + # TODO(ajamato): Matcher for a gauge metric once implemented in dataflow. + matchers = [ + # User Counter Metrics. 
+ MetricResultMatcher( + name='total_values', + namespace=METRIC_NAMESPACE, + step='metrics', + attempted=sum(INPUT), + committed=sum(INPUT) + ), + MetricResultMatcher( + name='ExecutionTime_StartBundle', + step='metrics', + attempted=greater_than(0), + committed=greater_than(0) + ), + MetricResultMatcher( + name='ExecutionTime_ProcessElement', + step='metrics', + attempted=greater_than(0), + committed=greater_than(0) + ), + MetricResultMatcher( + name='ExecutionTime_FinishBundle', + step='metrics', + attempted=greater_than(0), + committed=greater_than(0) + ) + ] + + pcoll_names = [ + 'GroupByKey/Reify-out0', + 'GroupByKey/Read-out0', + 'map_to_common_key-out0', + 'GroupByKey/GroupByWindow-out0', + 'GroupByKey/Read-out0', + 'GroupByKey/Reify-out0' + ] + for name in pcoll_names: +matchers.extend([ +MetricResultMatcher( +name='ElementCount', +labels={ +'output_user_name': name, +'original_name': '%s-ElementCount' % name +}, +attempted=greater_than(0), +committed=greater_than(0) +), +MetricResultMatcher( +name='MeanByteCount', +labels={ +'output_user_name': name, +'original_name': '%s-MeanByteCount' % name +}, +attempted=greater_than(0), +committed=greater_than(0) +), +]) + return matchers + + +def fn_api_metric_matchers(): + """MetricResult matchers with adjusted step names for the FN API DF
[beam] branch master updated: [BEAM-6330] KinesisIO, updated AWS SDK version
This is an automated email from the ASF dual-hosted git repository. iemejia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 024199f [BEAM-6330] KinesisIO, updated AWS SDK version new 2e56253 Merge pull request #8072: [BEAM-6330] Update AWS SDK version to version 1.11.519 (for KinesisIO) 024199f is described below commit 024199f0b418e5c715dda630ad01d80ce068f076 Author: Alexey Romanenko AuthorDate: Fri Mar 15 18:44:44 2019 +0100 [BEAM-6330] KinesisIO, updated AWS SDK version --- sdks/java/io/kinesis/build.gradle | 2 +- .../beam/sdk/io/kinesis/AmazonKinesisMock.java | 39 ++ 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/kinesis/build.gradle b/sdks/java/io/kinesis/build.gradle index 127db4f..8c5cee3 100644 --- a/sdks/java/io/kinesis/build.gradle +++ b/sdks/java/io/kinesis/build.gradle @@ -37,7 +37,7 @@ test { forkEvery 1 } -def aws_version = "1.11.255" +def aws_version = "1.11.519" dependencies { shadow project(path: ":beam-sdks-java-core", configuration: "shadow") diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java index 4749d2f..dc3fdbe 100644 --- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java @@ -37,8 +37,12 @@ import com.amazonaws.services.kinesis.model.DecreaseStreamRetentionPeriodRequest import com.amazonaws.services.kinesis.model.DecreaseStreamRetentionPeriodResult; import com.amazonaws.services.kinesis.model.DeleteStreamRequest; import com.amazonaws.services.kinesis.model.DeleteStreamResult; +import com.amazonaws.services.kinesis.model.DeregisterStreamConsumerRequest; +import com.amazonaws.services.kinesis.model.DeregisterStreamConsumerResult; 
import com.amazonaws.services.kinesis.model.DescribeLimitsRequest; import com.amazonaws.services.kinesis.model.DescribeLimitsResult; +import com.amazonaws.services.kinesis.model.DescribeStreamConsumerRequest; +import com.amazonaws.services.kinesis.model.DescribeStreamConsumerResult; import com.amazonaws.services.kinesis.model.DescribeStreamRequest; import com.amazonaws.services.kinesis.model.DescribeStreamResult; import com.amazonaws.services.kinesis.model.DescribeStreamSummaryRequest; @@ -53,6 +57,10 @@ import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; import com.amazonaws.services.kinesis.model.GetShardIteratorResult; import com.amazonaws.services.kinesis.model.IncreaseStreamRetentionPeriodRequest; import com.amazonaws.services.kinesis.model.IncreaseStreamRetentionPeriodResult; +import com.amazonaws.services.kinesis.model.ListShardsRequest; +import com.amazonaws.services.kinesis.model.ListShardsResult; +import com.amazonaws.services.kinesis.model.ListStreamConsumersRequest; +import com.amazonaws.services.kinesis.model.ListStreamConsumersResult; import com.amazonaws.services.kinesis.model.ListStreamsRequest; import com.amazonaws.services.kinesis.model.ListStreamsResult; import com.amazonaws.services.kinesis.model.ListTagsForStreamRequest; @@ -64,6 +72,8 @@ import com.amazonaws.services.kinesis.model.PutRecordResult; import com.amazonaws.services.kinesis.model.PutRecordsRequest; import com.amazonaws.services.kinesis.model.PutRecordsResult; import com.amazonaws.services.kinesis.model.Record; +import com.amazonaws.services.kinesis.model.RegisterStreamConsumerRequest; +import com.amazonaws.services.kinesis.model.RegisterStreamConsumerResult; import com.amazonaws.services.kinesis.model.RemoveTagsFromStreamRequest; import com.amazonaws.services.kinesis.model.RemoveTagsFromStreamResult; import com.amazonaws.services.kinesis.model.Shard; @@ -278,6 +288,12 @@ class AmazonKinesisMock implements AmazonKinesis { } @Override + public 
DeregisterStreamConsumerResult deregisterStreamConsumer( + DeregisterStreamConsumerRequest deregisterStreamConsumerRequest) { +throw new RuntimeException("Not implemented"); + } + + @Override public DescribeLimitsResult describeLimits(DescribeLimitsRequest describeLimitsRequest) { throw new RuntimeException("Not implemented"); } @@ -299,6 +315,12 @@ class AmazonKinesisMock implements AmazonKinesis { } @Override + public DescribeStreamConsumerResult describeStreamConsumer( + DescribeStreamConsumerRequest describeStreamConsumerRequest) { +throw new RuntimeException("Not implemented"); + } + + @Override public DescribeStreamSummaryResult describeStreamSummary( DescribeStreamSummaryRequest describeStreamSummaryRequest) { throw new RuntimeException("Not implemented"); @@ -335,6 +357,17 @@ class AmazonKinesisMock implements AmazonKinesis
[beam] branch master updated: [BEAM-6771] MetricsContainerStepMap#equals required for Spark.
This is an automated email from the ASF dual-hosted git repository. iemejia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 468cafe [BEAM-6771] MetricsContainerStepMap#equals required for Spark. new e51aa5f Merge pull request #8032: [BEAM-6771] Add MetricsContainerStepMap equals method 468cafe is described below commit 468cafed059c16e4c018ea240a04ce54edc5fd0d Author: Kyle Winkelman AuthorDate: Mon Mar 11 17:11:18 2019 -0500 [BEAM-6771] MetricsContainerStepMap#equals required for Spark. --- .../beam/runners/core/metrics/CounterCell.java | 18 +++ .../beam/runners/core/metrics/DirtyState.java | 16 ++ .../runners/core/metrics/DistributionCell.java | 18 +++ .../beam/runners/core/metrics/GaugeCell.java | 18 +++ .../runners/core/metrics/MetricsContainerImpl.java | 19 .../core/metrics/MetricsContainerStepMap.java | 19 +--- .../beam/runners/core/metrics/MetricsMap.java | 16 ++ .../beam/runners/core/metrics/CounterCellTest.java | 30 +++ .../beam/runners/core/metrics/DirtyStateTest.java | 21 + .../runners/core/metrics/DistributionCellTest.java | 31 +++ .../beam/runners/core/metrics/GaugeCellTest.java | 30 +++ .../core/metrics/MetricsContainerImplTest.java | 35 ++ .../core/metrics/MetricsContainerStepMapTest.java | 30 +++ .../beam/runners/core/metrics/MetricsMapTest.java | 22 ++ 14 files changed, 312 insertions(+), 11 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/CounterCell.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/CounterCell.java index d1abafc..9ca5cdb 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/CounterCell.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/CounterCell.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.core.metrics; +import java.util.Objects; import 
java.util.concurrent.atomic.AtomicLong; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; @@ -90,4 +91,21 @@ public class CounterCell implements Counter, MetricCell { public MetricName getName() { return name; } + + @Override + public boolean equals(Object object) { +if (object instanceof CounterCell) { + CounterCell counterCell = (CounterCell) object; + return Objects.equals(dirty, counterCell.dirty) + && Objects.equals(value.get(), counterCell.value.get()) + && Objects.equals(name, counterCell.name); +} + +return false; + } + + @Override + public int hashCode() { +return Objects.hash(dirty, value.get(), name); + } } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DirtyState.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DirtyState.java index 7759d5e..2503d83 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DirtyState.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DirtyState.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.core.metrics; import java.io.Serializable; +import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; @@ -95,4 +96,19 @@ public class DirtyState implements Serializable { public void afterCommit() { dirty.compareAndSet(State.COMMITTING, State.CLEAN); } + + @Override + public boolean equals(Object object) { +if (object instanceof DirtyState) { + DirtyState dirtyState = (DirtyState) object; + return Objects.equals(dirty.get(), dirtyState.dirty.get()); +} + +return false; + } + + @Override + public int hashCode() { +return dirty.get().hashCode(); + } } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionCell.java 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionCell.java index c39fee0..ca85de2 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionCell.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionCell.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.core.metrics; +import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; @@ -84,4 +85,21 @@ public class DistributionCell implements Distribution, MetricCell { public MetricName getName() { return name; } + +