[beam] branch master updated: [BEAM-6828] Adding ValueProvider support for BQ transforms.

2019-03-15 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 1c28082  [BEAM-6828] Adding ValueProvider support for BQ transforms.
 new 10d4839  Merge pull request #8045 from pabloem/vp-bq
1c28082 is described below

commit 1c280820a23ad662788183f0a75fab9865acbec0
Author: pabloem 
AuthorDate: Tue Mar 12 16:11:56 2019 -0700

[BEAM-6828] Adding ValueProvider support for BQ transforms.
---
 .../examples/cookbook/bigquery_tornadoes.py|  2 +-
 sdks/python/apache_beam/io/gcp/bigquery.py | 40 +---
 .../apache_beam/io/gcp/bigquery_file_loads.py  |  5 ++-
 .../apache_beam/io/gcp/bigquery_file_loads_test.py | 43 --
 sdks/python/apache_beam/io/gcp/bigquery_test.py| 39 
 sdks/python/apache_beam/io/gcp/bigquery_tools.py   | 27 --
 6 files changed, 117 insertions(+), 39 deletions(-)
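
For illustration, a minimal sketch of what this change enables: handing
WriteToBigQuery a schema wrapped in a ValueProvider instead of a concrete
value. The project, dataset, and field names below are assumed, not taken
from the commit.

    import json

    import apache_beam as beam
    from apache_beam.options.value_provider import StaticValueProvider

    schema_json = json.dumps({'fields': [
        {'name': 'word', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'count', 'type': 'INTEGER', 'mode': 'NULLABLE'},
    ]})

    with beam.Pipeline() as p:
      _ = (p
           | beam.Create([{'word': 'beam', 'count': 1}])
           | beam.io.WriteToBigQuery(
               table='my-project:my_dataset.my_table',
               # The provider is only unwrapped when the write executes,
               # not at pipeline construction time.
               schema=StaticValueProvider(str, schema_json)))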

diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
index 9db0f73..9be3f89 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
@@ -75,7 +75,7 @@ def run(argv=None):
'or DATASET.TABLE.'))
 
   parser.add_argument('--gcs_location',
-  required=True,
+  required=False,
   help=('GCS Location to store files to load '
 'data into Bigquery'))
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index a1b6f99..27ee67a 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -626,6 +626,8 @@ class BigQueryWriteFn(DoFn):
 """
 if schema is None:
   return schema
+elif isinstance(schema, (str, unicode)):
+  return bigquery_tools.parse_table_schema_from_json(schema)
 elif isinstance(schema, dict):
   return bigquery_tools.parse_table_schema_from_json(json.dumps(schema))
 else:
@@ -644,7 +646,7 @@ class BigQueryWriteFn(DoFn):
 num_retries=1,
 max_delay_secs=1500))
 
-  def _create_table_if_needed(self, schema, table_reference):
+  def _create_table_if_needed(self, table_reference, schema=None):
 str_table_reference = '%s:%s.%s' % (
 table_reference.projectId,
 table_reference.datasetId,
@@ -673,8 +675,8 @@ class BigQueryWriteFn(DoFn):
   schema = destination[1]
   destination = destination[0]
   self._create_table_if_needed(
-  schema,
-  bigquery_tools.parse_table_reference(destination))
+  bigquery_tools.parse_table_reference(destination),
+  schema)
 
 row = element[1]
 self._rows_buffer[destination].append(row)
@@ -766,7 +768,7 @@ class WriteToBigQuery(PTransform):
 """Initialize a WriteToBigQuery transform.
 
 Args:
-  table (str, callable): The ID of the table, or a callable
+  table (str, callable, ValueProvider): The ID of the table, or a callable
  that returns it. The ID must contain only letters ``a-z``, ``A-Z``,
  numbers ``0-9``, or underscores ``_``. If dataset argument is
  :data:`None` then the table argument must contain the entire table
@@ -782,10 +784,11 @@ class WriteToBigQuery(PTransform):
   project (str): The ID of the project containing this table or
 :data:`None` if the table reference is specified entirely by the table
 argument.
-  schema (str): The schema to be used if the BigQuery table to write has to
-be created. This can be either specified as a
-:class:`~apache_beam.io.gcp.internal.clients.bigquery.\
-bigquery_v2_messages.TableSchema`
+  schema (str, dict, ValueProvider): The schema to be used if the
+BigQuery table to write has to be created. This can be a python
+dictionary, a JSON string, a `ValueProvider` wrapping either of those,
+or a :class:`~apache_beam.io.gcp.internal.clients.bigquery.\
+bigquery_v2_messages.TableSchema`
 object or a single string  of the form
 ``'field1:type1,field2:type2,field3:type3'`` that defines a comma
 separated list of fields. Here ``'type'`` should specify the BigQuery
@@ -925,6 +928,8 @@ bigquery_v2_messages.TableSchema):
   return WriteToBigQuery.table_schema_to_dict(table_schema)
 elif isinstance(schema, bigquery.TableSchema):
   return WriteToBigQuery.table_schema_to_dict(schema)
+elif isinstance(schema, vp.ValueProvider):
+  return schema
 else:
   raise TypeError('Unexpected schema argument: %s.' % schema)
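
Because a ValueProvider is now passed through untouched here, its resolution
is deferred until the write actually runs, which is what templated pipelines
need. A hypothetical options class supplying such a provider from the command
line (the --output_schema flag name is assumed, not part of this commit):

    from apache_beam.options.pipeline_options import PipelineOptions

    class TemplateOptions(PipelineOptions):
      @classmethod
      def _add_argparse_args(cls, parser):
        # Yields a RuntimeValueProvider when the flag is left unset at
        # template construction time.
        parser.add_value_provider_argument(
            '--output_schema',
            type=str,
            help='JSON-serialized table schema, resolved at run time.')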
 
@@ -934,7 +939,7 @@ bigquery_v2_messages.TableSchema):
 # TODO(pabloem): Use a different 

[beam] branch aaltay-patch-1 deleted (was 6adb5a5)

2019-03-15 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch aaltay-patch-1
in repository https://gitbox.apache.org/repos/asf/beam.git.


 was 6adb5a5  Fix a typo

The revisions that were on this branch are still contained in
other references; therefore, this change does not discard any commits
from the repository.



[beam] branch master updated (652adec -> e08db7f)

2019-03-15 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 652adec  Merge pull request #8044 from amaliujia/rw_add_list_row
 add 6adb5a5  Fix a typo
 add e08db7f  Merge pull request #8073 from apache/aaltay-patch-1

No new revisions were added by this update.

Summary of changes:
 sdks/python/apache_beam/metrics/execution.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)



[beam] branch aaltay-patch-1 created (now 6adb5a5)

2019-03-15 Thread altay
This is an automated email from the ASF dual-hosted git repository.

altay pushed a change to branch aaltay-patch-1
in repository https://gitbox.apache.org/repos/asf/beam.git.


  at 6adb5a5  Fix a typo

This branch includes the following new commits:

 new 6adb5a5  Fix a typo

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.




[beam] 01/01: Fix a typo

2019-03-15 Thread altay
This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch aaltay-patch-1
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 6adb5a5763abf973786e868d103b24d8a30f8862
Author: Ahmet Altay 
AuthorDate: Fri Mar 15 16:23:54 2019 -0700

Fix a typo
---
 sdks/python/apache_beam/metrics/execution.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/metrics/execution.py b/sdks/python/apache_beam/metrics/execution.py
index 57a0a09..4f873b8 100644
--- a/sdks/python/apache_beam/metrics/execution.py
+++ b/sdks/python/apache_beam/metrics/execution.py
@@ -49,7 +49,7 @@ class MetricKey(object):
   """Key used to identify instance of metric cell.
 
   Metrics are internally keyed by the name of the step they're associated with,
-  the name and namespace (if it is a user defined metric) of the the metric,
+  the name and namespace (if it is a user defined metric) of the metric,
   and any extra label metadata added by the runner specific metric collection
   service.
   """



[beam] branch aaltay-patch-1 deleted (was 47db394)

2019-03-15 Thread altay
This is an automated email from the ASF dual-hosted git repository.

altay pushed a change to branch aaltay-patch-1
in repository https://gitbox.apache.org/repos/asf/beam.git.


 was 47db394  Fix a typo

The revisions that were on this branch are still contained in
other references; therefore, this change does not discard any commits
from the repository.



[beam] branch master updated: [BEAM-6814] toListRow in BeamEnumerableConverter.

2019-03-15 Thread anton
This is an automated email from the ASF dual-hosted git repository.

anton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 5b2fb1c  [BEAM-6814] toListRow in BeamEnumerableConverter.
 new 652adec  Merge pull request #8044 from amaliujia/rw_add_list_row
5b2fb1c is described below

commit 5b2fb1c854706369472a7afbde2e8bcabd4df659
Author: amaliujia 
AuthorDate: Tue Mar 12 15:51:17 2019 -0700

[BEAM-6814] toListRow in BeamEnumerableConverter.
---
 .../sql/impl/rel/BeamEnumerableConverter.java  | 63 --
 .../sql/impl/rel/BeamEnumerableConverterTest.java  | 17 ++
 2 files changed, 65 insertions(+), 15 deletions(-)

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
index 3fb9a67..755d589 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
@@ -129,6 +129,15 @@ public class BeamEnumerableConverter extends ConverterImpl implements Enumerable
 return options;
   }
 
+  static List<Row> toRowList(PipelineOptions options, BeamRelNode node) {
+if (node instanceof BeamIOSinkRel) {
+  throw new UnsupportedOperationException("Does not support BeamIOSinkRel in toRowList.");
+} else if (isLimitQuery(node)) {
+  throw new UnsupportedOperationException("Does not support queries with LIMIT in toRowList.");
+}
+return collectRowList(options, node);
+  }
+
   static Enumerable<Object> toEnumerable(PipelineOptions options, BeamRelNode node) {
 if (node instanceof BeamIOSinkRel) {
   return count(options, node);
@@ -143,7 +152,7 @@ public class BeamEnumerableConverter extends ConverterImpl implements Enumerable
   PipelineOptions options,
   BeamRelNode node,
   DoFn<Row, Void> doFn,
-  Queue<Object> values,
+  Queue<Row> values,
   int limitCount) {
 options.as(DirectOptions.class).setBlockOnRun(false);
 Pipeline pipeline = Pipeline.create(options);
@@ -174,9 +183,36 @@ public class BeamEnumerableConverter extends ConverterImpl implements Enumerable
 return result;
   }
 
+  private static void runCollector(PipelineOptions options, BeamRelNode node) {
+Pipeline pipeline = Pipeline.create(options);
+PCollection<Row> resultCollection = BeamSqlRelUtils.toPCollection(pipeline, node);
+resultCollection.apply(ParDo.of(new Collector()));
+PipelineResult result = pipeline.run();
+result.waitUntilFinish();
+  }
+
+  private static List<Row> collectRowList(PipelineOptions options, BeamRelNode node) {
+long id = options.getOptionsId();
+Queue<Row> values = new ConcurrentLinkedQueue<>();
+
+checkArgument(
+options
+.getRunner()
+.getCanonicalName()
+.equals("org.apache.beam.runners.direct.DirectRunner"),
+"collectRowList is only available in direct runner.");
+
+Collector.globalValues.put(id, values);
+
+runCollector(options, node);
+
+Collector.globalValues.remove(id);
+return values.stream().collect(Collectors.toList());
+  }
+
   private static Enumerable<Object> collect(PipelineOptions options, BeamRelNode node) {
 long id = options.getOptionsId();
-Queue<Object> values = new ConcurrentLinkedQueue<>();
+Queue<Row> values = new ConcurrentLinkedQueue<>();
 
 checkArgument(
 options
@@ -187,20 +223,16 @@ public class BeamEnumerableConverter extends ConverterImpl implements Enumerable
 
 Collector.globalValues.put(id, values);
 
-Pipeline pipeline = Pipeline.create(options);
-PCollection<Row> resultCollection = BeamSqlRelUtils.toPCollection(pipeline, node);
-resultCollection.apply(ParDo.of(new Collector()));
-PipelineResult result = pipeline.run();
-result.waitUntilFinish();
+runCollector(options, node);
 
 Collector.globalValues.remove(id);
 
-return Linq4j.asEnumerable(unboxValues(values));
+return Linq4j.asEnumerable(rowToAvaticaAndUnboxValues(values));
   }
 
   private static Enumerable<Object> limitCollect(PipelineOptions options, BeamRelNode node) {
 long id = options.getOptionsId();
-ConcurrentLinkedQueue<Object> values = new ConcurrentLinkedQueue<>();
+ConcurrentLinkedQueue<Row> values = new ConcurrentLinkedQueue<>();
 
 checkArgument(
 options
@@ -220,15 +252,15 @@ public class BeamEnumerableConverter extends ConverterImpl implements Enumerable
   values.remove();
 }
 
-return Linq4j.asEnumerable(unboxValues(values));
+return Linq4j.asEnumerable(rowToAvaticaAndUnboxValues(values));
   }
 
   private static class Collector extends DoFn<Row, Void> {
 
 // This will only work on the direct runner.
-private static final 

[beam] branch master updated: [BEAM-6185] Upgrade Spark to version 2.4.0

2019-03-15 Thread iemejia
This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 8d4ce24  [BEAM-6185] Upgrade Spark to version 2.4.0
 new e68ba74  Merge pull request #7216: [BEAM-6185] Upgrade Spark to version 2.4.0
8d4ce24 is described below

commit 8d4ce24ffaae2cb4c5332c5f7692a48e007fb731
Author: Jean-Baptiste Onofré 
AuthorDate: Thu Dec 6 12:18:56 2018 +0100

[BEAM-6185] Upgrade Spark to version 2.4.0
---
 buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 7c50a52..8ef5246 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -354,7 +354,7 @@ class BeamModulePlugin implements Plugin<Project> {
 def hamcrest_version = "1.3"
 def hadoop_version = "2.7.3"
 def jackson_version = "2.9.8"
-def spark_version = "2.3.2"
+def spark_version = "2.4.0"
 def nemo_version = "0.1"
 def apex_core_version = "3.7.0"
 def apex_malhar_version = "3.4.0"



[beam] branch master updated: [BEAM-6773] Add ValueProvider to CassandraIO.Read

2019-03-15 Thread iemejia
This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 24406c9  [BEAM-6773] Add ValueProvider to CassandraIO.Read
 new 6e96e23  Merge pull request #8024: [BEAM-6773] Add ValueProvider to CassandraIO.Read
24406c9 is described below

commit 24406c9d3259a511646be661c687c73e3c2a531b
Author: Radoslaw Stankiewicz 
AuthorDate: Wed Mar 6 17:37:26 2019 +0100

[BEAM-6773] Add ValueProvider to CassandraIO.Read

Enable parametrization at runtime and enable future integrations with templates
---
 .../apache/beam/sdk/io/cassandra/CassandraIO.java  | 181 +
 .../beam/sdk/io/cassandra/CassandraIOTest.java |  20 +++
 2 files changed, 167 insertions(+), 34 deletions(-)

diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
index 06e6d1a..1e34a27 100644
--- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
+++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
@@ -50,6 +50,7 @@ import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -131,16 +132,16 @@ public class CassandraIO {
   @AutoValue
   public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
 @Nullable
-abstract List<String> hosts();
+abstract ValueProvider<List<String>> hosts();
 
 @Nullable
-abstract Integer port();
+abstract ValueProvider<Integer> port();
 
 @Nullable
-abstract String keyspace();
+abstract ValueProvider<String> keyspace();
 
 @Nullable
-abstract String table();
+abstract ValueProvider<String> table();
 
 @Nullable
 abstract Class<T> entity();
@@ -149,10 +150,10 @@ public class CassandraIO {
 abstract Coder<T> coder();
 
 @Nullable
-abstract String username();
+abstract ValueProvider<String> username();
 
 @Nullable
-abstract String password();
+abstract ValueProvider<String> password();
 
 @Nullable
 abstract String encryptedPassword();
@@ -161,16 +162,16 @@ public class CassandraIO {
 abstract PasswordDecrypter passwordDecrypter();
 
 @Nullable
-abstract String localDc();
+abstract ValueProvider<String> localDc();
 
 @Nullable
-abstract String consistencyLevel();
+abstract ValueProvider<String> consistencyLevel();
 
 @Nullable
-abstract String where();
+abstract ValueProvider<String> where();
 
 @Nullable
-abstract Integer minNumberOfSplits();
+abstract ValueProvider<Integer> minNumberOfSplits();
 
 abstract Builder<T> builder();
 
@@ -178,24 +179,44 @@ public class CassandraIO {
 public Read<T> withHosts(List<String> hosts) {
   checkArgument(hosts != null, "hosts can not be null");
   checkArgument(!hosts.isEmpty(), "hosts can not be empty");
+  return builder().setHosts(ValueProvider.StaticValueProvider.of(hosts)).build();
+}
+
+/** Specify the hosts of the Apache Cassandra instances. */
+public Read<T> withHosts(ValueProvider<List<String>> hosts) {
   return builder().setHosts(hosts).build();
 }
 
 /** Specify the port number of the Apache Cassandra instances. */
 public Read<T> withPort(int port) {
   checkArgument(port > 0, "port must be > 0, but was: %s", port);
+  return builder().setPort(ValueProvider.StaticValueProvider.of(port)).build();
+}
+
+/** Specify the port number of the Apache Cassandra instances. */
+public Read<T> withPort(ValueProvider<Integer> port) {
   return builder().setPort(port).build();
 }
 
 /** Specify the Cassandra keyspace where to read data. */
 public Read<T> withKeyspace(String keyspace) {
   checkArgument(keyspace != null, "keyspace can not be null");
+  return builder().setKeyspace(ValueProvider.StaticValueProvider.of(keyspace)).build();
+}
+
+/** Specify the Cassandra keyspace where to read data. */
+public Read<T> withKeyspace(ValueProvider<String> keyspace) {
   return builder().setKeyspace(keyspace).build();
 }
 
 /** Specify the Cassandra table where to read data. */
 public Read<T> withTable(String table) {
   checkArgument(table != null, "table can not be null");
+  return builder().setTable(ValueProvider.StaticValueProvider.of(table)).build();
+}
+
+/** Specify the Cassandra table where to read data. */
+public Read<T> withTable(ValueProvider<String> table) {
   return builder().setTable(table).build();
 }
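
For illustration, a hypothetical read built on these new overloads. The
Person entity, the options getters, and the host name are assumptions for
the sketch, not part of the commit.

    import java.util.Arrays;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.SerializableCoder;
    import org.apache.beam.sdk.io.cassandra.CassandraIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.options.ValueProvider;
    import org.apache.beam.sdk.values.PCollection;

    // Hypothetical options interface: keyspace and table resolve at run time.
    public interface CassandraReadOptions extends PipelineOptions {
      ValueProvider<String> getKeyspace();
      void setKeyspace(ValueProvider<String> value);

      ValueProvider<String> getTable();
      void setTable(ValueProvider<String> value);
    }

    // Elsewhere, building the read; hosts and port stay static while
    // keyspace and table arrive through the options at run time.
    CassandraReadOptions options =
        PipelineOptionsFactory.fromArgs(args).as(CassandraReadOptions.class);
    Pipeline pipeline = Pipeline.create(options);
    PCollection<Person> rows =
        pipeline.apply(
            CassandraIO.<Person>read()
                .withHosts(Arrays.asList("cassandra.example.com"))
                .withPort(9042)
                .withKeyspace(options.getKeyspace())
                .withTable(options.getTable())
                .withEntity(Person.class)
                .withCoder(SerializableCoder.of(Person.class)));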
 
@@ -218,12 +239,22 @@ public class CassandraIO {
 /** Specify the username for authentication. */

[beam] branch master updated: Add MetricResultMatcher verification for user metrics, GBK element count, and process time to an integration test. (#8038)

2019-03-15 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 2c2b9d2  Add MetricResultMatcher verification for user metrics, GBK element count, and process time to an integration test. (#8038)
2c2b9d2 is described below

commit 2c2b9d2ba7c3cea048688a8fc0abd25030046005
Author: Alex Amato 
AuthorDate: Fri Mar 15 13:26:34 2019 -0700

Add MetricResultMatcher verification for user metrics, GBK element count, and process time to an integration test. (#8038)

[BEAM-6844] Add MetricResultMatcher verification for user metrics, GBK element count, and process time to an integration test.
---
 .../dataflow/dataflow_exercise_metrics_pipeline.py | 199 +
 .../dataflow_exercise_metrics_pipeline_test.py |  68 +++
 .../runners/dataflow/dataflow_metrics.py   |  80 +++--
 .../runners/dataflow/dataflow_metrics_test.py  |  10 +-
 4 files changed, 336 insertions(+), 21 deletions(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_metrics_pipeline.py b/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_metrics_pipeline.py
new file mode 100644
index 000..a77497f
--- /dev/null
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_metrics_pipeline.py
@@ -0,0 +1,199 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A word-counting workflow."""
+
+from __future__ import absolute_import
+
+import time
+
+from hamcrest.library.number.ordering_comparison import greater_than
+
+import apache_beam as beam
+from apache_beam.metrics import Metrics
+from apache_beam.testing.metric_result_matchers import DistributionMatcher
+from apache_beam.testing.metric_result_matchers import MetricResultMatcher
+
+SLEEP_TIME_SECS = 1
+INPUT = [0, 0, 0, 100]
+METRIC_NAMESPACE = ('apache_beam.runners.dataflow.'
+'dataflow_exercise_metrics_pipeline.UserMetricsDoFn')
+
+
+def common_metric_matchers():
+  """MetricResult matchers common to all tests."""
+  # TODO(ajamato): Matcher for the 'metrics' step's ElementCount.
+  # TODO(ajamato): Matcher for the 'metrics' step's MeanByteCount.
+  # TODO(ajamato): Matcher for the start and finish exec times.
+  # TODO(ajamato): Matcher for a gauge metric once implemented in dataflow.
+  matchers = [
+  # User Counter Metrics.
+  MetricResultMatcher(
+  name='total_values',
+  namespace=METRIC_NAMESPACE,
+  step='metrics',
+  attempted=sum(INPUT),
+  committed=sum(INPUT)
+  ),
+  MetricResultMatcher(
+  name='ExecutionTime_StartBundle',
+  step='metrics',
+  attempted=greater_than(0),
+  committed=greater_than(0)
+  ),
+  MetricResultMatcher(
+  name='ExecutionTime_ProcessElement',
+  step='metrics',
+  attempted=greater_than(0),
+  committed=greater_than(0)
+  ),
+  MetricResultMatcher(
+  name='ExecutionTime_FinishBundle',
+  step='metrics',
+  attempted=greater_than(0),
+  committed=greater_than(0)
+  )
+  ]
+
+  pcoll_names = [
+  'GroupByKey/Reify-out0',
+  'GroupByKey/Read-out0',
+  'map_to_common_key-out0',
+  'GroupByKey/GroupByWindow-out0',
+  'GroupByKey/Read-out0',
+  'GroupByKey/Reify-out0'
+  ]
+  for name in pcoll_names:
+matchers.extend([
+MetricResultMatcher(
+name='ElementCount',
+labels={
+'output_user_name': name,
+'original_name': '%s-ElementCount' % name
+},
+attempted=greater_than(0),
+committed=greater_than(0)
+),
+MetricResultMatcher(
+name='MeanByteCount',
+labels={
+'output_user_name': name,
+'original_name': '%s-MeanByteCount' % name
+},
+attempted=greater_than(0),
+committed=greater_than(0)
+),
+])
+  return matchers
+
+
+def fn_api_metric_matchers():
+  """MetricResult matchers with adjusted step names for the FN API DF 

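For illustration, a rough sketch of applying such matchers to a finished
pipeline's metrics. The helper below and the shape of the query() result
are assumptions for the sketch, not taken from this commit.

    import hamcrest as hc

    def assert_metrics(pipeline_result, matchers):
      # MetricResults.query() without a filter returns a dict holding
      # 'counters', 'distributions', and 'gauges' lists of MetricResult.
      results = pipeline_result.metrics().query()
      all_results = (results['counters'] + results['distributions']
                     + results['gauges'])
      for matcher in matchers:
        # Every expected matcher must match at least one MetricResult.
        hc.assert_that(all_results, hc.has_item(matcher))

    # e.g. assert_metrics(result, common_metric_matchers())
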
[beam] branch master updated: [BEAM-6330] KinesisIO, updated AWS SDK version

2019-03-15 Thread iemejia
This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 024199f  [BEAM-6330] KinesisIO, updated AWS SDK version
 new 2e56253  Merge pull request #8072: [BEAM-6330] Update AWS SDK version to version 1.11.519 (for KinesisIO)
024199f is described below

commit 024199f0b418e5c715dda630ad01d80ce068f076
Author: Alexey Romanenko 
AuthorDate: Fri Mar 15 18:44:44 2019 +0100

[BEAM-6330] KinesisIO, updated AWS SDK version
---
 sdks/java/io/kinesis/build.gradle  |  2 +-
 .../beam/sdk/io/kinesis/AmazonKinesisMock.java | 39 ++
 2 files changed, 40 insertions(+), 1 deletion(-)

diff --git a/sdks/java/io/kinesis/build.gradle b/sdks/java/io/kinesis/build.gradle
index 127db4f..8c5cee3 100644
--- a/sdks/java/io/kinesis/build.gradle
+++ b/sdks/java/io/kinesis/build.gradle
@@ -37,7 +37,7 @@ test {
   forkEvery 1
 }
 
-def aws_version = "1.11.255"
+def aws_version = "1.11.519"
 
 dependencies {
   shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
index 4749d2f..dc3fdbe 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
@@ -37,8 +37,12 @@ import com.amazonaws.services.kinesis.model.DecreaseStreamRetentionPeriodRequest
 import com.amazonaws.services.kinesis.model.DecreaseStreamRetentionPeriodResult;
 import com.amazonaws.services.kinesis.model.DeleteStreamRequest;
 import com.amazonaws.services.kinesis.model.DeleteStreamResult;
+import com.amazonaws.services.kinesis.model.DeregisterStreamConsumerRequest;
+import com.amazonaws.services.kinesis.model.DeregisterStreamConsumerResult;
 import com.amazonaws.services.kinesis.model.DescribeLimitsRequest;
 import com.amazonaws.services.kinesis.model.DescribeLimitsResult;
+import com.amazonaws.services.kinesis.model.DescribeStreamConsumerRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamConsumerResult;
 import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
 import com.amazonaws.services.kinesis.model.DescribeStreamResult;
 import com.amazonaws.services.kinesis.model.DescribeStreamSummaryRequest;
@@ -53,6 +57,10 @@ import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
 import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
 import com.amazonaws.services.kinesis.model.IncreaseStreamRetentionPeriodRequest;
 import com.amazonaws.services.kinesis.model.IncreaseStreamRetentionPeriodResult;
+import com.amazonaws.services.kinesis.model.ListShardsRequest;
+import com.amazonaws.services.kinesis.model.ListShardsResult;
+import com.amazonaws.services.kinesis.model.ListStreamConsumersRequest;
+import com.amazonaws.services.kinesis.model.ListStreamConsumersResult;
 import com.amazonaws.services.kinesis.model.ListStreamsRequest;
 import com.amazonaws.services.kinesis.model.ListStreamsResult;
 import com.amazonaws.services.kinesis.model.ListTagsForStreamRequest;
@@ -64,6 +72,8 @@ import com.amazonaws.services.kinesis.model.PutRecordResult;
 import com.amazonaws.services.kinesis.model.PutRecordsRequest;
 import com.amazonaws.services.kinesis.model.PutRecordsResult;
 import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.RegisterStreamConsumerRequest;
+import com.amazonaws.services.kinesis.model.RegisterStreamConsumerResult;
 import com.amazonaws.services.kinesis.model.RemoveTagsFromStreamRequest;
 import com.amazonaws.services.kinesis.model.RemoveTagsFromStreamResult;
 import com.amazonaws.services.kinesis.model.Shard;
@@ -278,6 +288,12 @@ class AmazonKinesisMock implements AmazonKinesis {
   }
 
   @Override
+  public DeregisterStreamConsumerResult deregisterStreamConsumer(
+  DeregisterStreamConsumerRequest deregisterStreamConsumerRequest) {
+throw new RuntimeException("Not implemented");
+  }
+
+  @Override
   public DescribeLimitsResult describeLimits(DescribeLimitsRequest describeLimitsRequest) {
 throw new RuntimeException("Not implemented");
   }
@@ -299,6 +315,12 @@ class AmazonKinesisMock implements AmazonKinesis {
   }
 
   @Override
+  public DescribeStreamConsumerResult describeStreamConsumer(
+  DescribeStreamConsumerRequest describeStreamConsumerRequest) {
+throw new RuntimeException("Not implemented");
+  }
+
+  @Override
   public DescribeStreamSummaryResult describeStreamSummary(
   DescribeStreamSummaryRequest describeStreamSummaryRequest) {
 throw new RuntimeException("Not implemented");
@@ -335,6 +357,17 @@ class AmazonKinesisMock implements AmazonKinesis 

[beam] branch master updated: [BEAM-6771] MetricsContainerStepMap#equals required for Spark.

2019-03-15 Thread iemejia
This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 468cafe  [BEAM-6771] MetricsContainerStepMap#equals required for Spark.
 new e51aa5f  Merge pull request #8032: [BEAM-6771] Add MetricsContainerStepMap equals method
468cafe is described below

commit 468cafed059c16e4c018ea240a04ce54edc5fd0d
Author: Kyle Winkelman 
AuthorDate: Mon Mar 11 17:11:18 2019 -0500

[BEAM-6771] MetricsContainerStepMap#equals required for Spark.
---
 .../beam/runners/core/metrics/CounterCell.java | 18 +++
 .../beam/runners/core/metrics/DirtyState.java  | 16 ++
 .../runners/core/metrics/DistributionCell.java | 18 +++
 .../beam/runners/core/metrics/GaugeCell.java   | 18 +++
 .../runners/core/metrics/MetricsContainerImpl.java | 19 
 .../core/metrics/MetricsContainerStepMap.java  | 19 +---
 .../beam/runners/core/metrics/MetricsMap.java  | 16 ++
 .../beam/runners/core/metrics/CounterCellTest.java | 30 +++
 .../beam/runners/core/metrics/DirtyStateTest.java  | 21 +
 .../runners/core/metrics/DistributionCellTest.java | 31 +++
 .../beam/runners/core/metrics/GaugeCellTest.java   | 30 +++
 .../core/metrics/MetricsContainerImplTest.java | 35 ++
 .../core/metrics/MetricsContainerStepMapTest.java  | 30 +++
 .../beam/runners/core/metrics/MetricsMapTest.java  | 22 ++
 14 files changed, 312 insertions(+), 11 deletions(-)

diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/CounterCell.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/CounterCell.java
index d1abafc..9ca5cdb 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/CounterCell.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/CounterCell.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.core.metrics;
 
+import java.util.Objects;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
@@ -90,4 +91,21 @@ public class CounterCell implements Counter, MetricCell<Long> {
   public MetricName getName() {
 return name;
   }
+
+  @Override
+  public boolean equals(Object object) {
+if (object instanceof CounterCell) {
+  CounterCell counterCell = (CounterCell) object;
+  return Objects.equals(dirty, counterCell.dirty)
+  && Objects.equals(value.get(), counterCell.value.get())
+  && Objects.equals(name, counterCell.name);
+}
+
+return false;
+  }
+
+  @Override
+  public int hashCode() {
+return Objects.hash(dirty, value.get(), name);
+  }
 }
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DirtyState.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DirtyState.java
index 7759d5e..2503d83 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DirtyState.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DirtyState.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.core.metrics;
 
 import java.io.Serializable;
+import java.util.Objects;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
@@ -95,4 +96,19 @@ public class DirtyState implements Serializable {
   public void afterCommit() {
 dirty.compareAndSet(State.COMMITTING, State.CLEAN);
   }
+
+  @Override
+  public boolean equals(Object object) {
+if (object instanceof DirtyState) {
+  DirtyState dirtyState = (DirtyState) object;
+  return Objects.equals(dirty.get(), dirtyState.dirty.get());
+}
+
+return false;
+  }
+
+  @Override
+  public int hashCode() {
+return dirty.get().hashCode();
+  }
 }
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionCell.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionCell.java
index c39fee0..ca85de2 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionCell.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionCell.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.core.metrics;
 
+import java.util.Objects;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
@@ -84,4 +85,21 @@ public class DistributionCell implements Distribution, MetricCell<DistributionData> {
   public MetricName getName() {
 return name;
   }
+
+