[beam] branch master updated: Merge #8686: [BEAM-7399] Blog post on looping timers

2019-06-11 Thread kenn
This is an automated email from the ASF dual-hosted git repository.

kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 1aee9a8  Merge #8686: [BEAM-7399] Blog post on looping timers
1aee9a8 is described below

commit 1aee9a87616b21a0af0911708c9e0a4ad35f82be
Author: Reza Rokni <7542791+rezaro...@users.noreply.github.com>
AuthorDate: Wed Jun 12 10:41:06 2019 +0800

Merge #8686: [BEAM-7399] Blog post on looping timers
---
 website/src/_data/authors.yml   |   5 +-
 website/src/_posts/2019-06-11-looping-timers.md | 342 
 2 files changed, 346 insertions(+), 1 deletion(-)

diff --git a/website/src/_data/authors.yml b/website/src/_data/authors.yml
index e6ce735..eb37ed5 100644
--- a/website/src/_data/authors.yml
+++ b/website/src/_data/authors.yml
@@ -115,4 +115,7 @@ rfernand:
 mbaetens:
   name: Matthias Baetens
   email: baetensmatth...@gmail.com
-  twitter: matthiasbaetens
\ No newline at end of file
+  twitter: matthiasbaetens
+rez:
+  name: Reza Rokni
+  email: r...@google.com
diff --git a/website/src/_posts/2019-06-11-looping-timers.md 
b/website/src/_posts/2019-06-11-looping-timers.md
new file mode 100644
index 000..ff1cb5d
--- /dev/null
+++ b/website/src/_posts/2019-06-11-looping-timers.md
@@ -0,0 +1,342 @@
+---
+layout: post
+title:  "Looping timers in Apache Beam"
+date:   2019-06-11 00:00:01 -0800
+excerpt_separator: <!--more-->
+categories: blog
+authors:
+  - rez
+  - klk
+---
+
+
+Apache Beam’s primitives let you build expressive data pipelines, suitable for a
+variety of use cases. One specific use case is the analysis of time series data
+in which continuous sequences across window boundaries are important. A few fun
+challenges arise as you tackle this type of data, and in this blog post we will
+explore one of them in more detail, making use of the Timer API
+([blog post]({{ site.baseurl }}/blog/2017/08/28/timely-processing.html))
+with the "looping timer" pattern.
+
+<!--more-->
+
+With Beam in streaming mode, you can take streams of data and build analytical
+transforms to produce results on the data. But for time series data, the absence
+of data is useful information. So how can we produce results in the absence of
+data?
+
+Let's use a more concrete example to illustrate the requirement. Imagine you
+have a simple pipeline that sums the number of events coming from an IoT device
+every minute. We would like to produce the value 0 when no data has been seen
+within a specific time interval. So why can this get tricky? Well, it is easy to
+build a simple pipeline that counts events as they arrive, but when there is no
+event, there is nothing to count!
+
+Let's build a simple pipeline to work with:
+
+```
+  // We will start our timer at 1 sec from the fixed upper boundary of our
+  // minute window
+  Instant now = Instant.parse("2000-01-01T00:00:59Z");
+
+  // - Create some dummy data
+
+  // Create 3 elements, incrementing by 1 minute and leaving a time gap between
+  // element 2 and element 3
+  TimestampedValue<KV<String, Integer>> time_1 =
+      TimestampedValue.of(KV.of("Key_A", 1), now);
+
+  TimestampedValue<KV<String, Integer>> time_2 =
+      TimestampedValue.of(KV.of("Key_A", 2),
+          now.plus(Duration.standardMinutes(1)));
+
+  // No value for start time + 2 mins
+  TimestampedValue<KV<String, Integer>> time_3 =
+      TimestampedValue.of(KV.of("Key_A", 3),
+          now.plus(Duration.standardMinutes(3)));
+
+  // Create pipeline
+  PipelineOptions options =
+      PipelineOptionsFactory.fromArgs(args).withValidation()
+          .as(PipelineOptions.class);
+
+  Pipeline p = Pipeline.create(options);
+
+  // Apply a fixed window of duration 1 min and Sum the results
+  p.apply(Create.timestamped(time_1, time_2, time_3))
+      .apply(
+          Window.<KV<String, Integer>>into(
+              FixedWindows.of(Duration.standardMinutes(1))))
+      .apply(Sum.integersPerKey())
+      .apply(
+          ParDo.of(
+              new DoFn<KV<String, Integer>, KV<String, Integer>>() {
+                @ProcessElement
+                public void process(ProcessContext c) {
+                  LOG.info("Value is {} timestamp is {}", c.element(), c.timestamp());
+                }
+              }));
+
+  p.run();
+```
+
+Running that pipeline will result in the following output:
+
+```
+INFO  LoopingTimer  - Value is KV{Key_A, 1} timestamp is 2000-01-01T00:00:59.999Z
+INFO  LoopingTimer  - Value is KV{Key_A, 3} timestamp is 2000-01-01T00:03:59.999Z
+INFO  LoopingTimer  - Value is KV{Key_A, 2} timestamp is 2000-01-01T00:01:59.999Z
+```
+
+> Note: The lack of order in the output is expected; however, each
+> key-window tuple is correctly computed.
+
+
+As expected, we see output in each of the interval windows that had a data
+point with a timestamp between the minimum and maximum value of the window.
+There was a data point at timestamps 00:00:59, 00:01:59, and 00:03:59, which
+fell into the following interval windows:
+
+*  [00:00:00, 00:00:59.999)
+*  [00:01:00, 00:01:59.999)
+*  [00:03:00, 00:03:59.999)
+
+But as there was no data point in the interval window [00:02:00, 00:02:59.999),
+no result was produced for that window.
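
The archived message is truncated at this point. For context, below is a
minimal sketch of the looping-timer pattern the post goes on to develop: an
event-time timer that re-arms itself each time it fires, so a default value
can be emitted even for intervals in which no data arrives. The sketch is
illustrative only; the class name LoopingTimerFn, the hard-coded key, and the
absence of a stop condition are simplifications, not the post's actual code.

```
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

// A looping timer: each element (re)arms an event-time timer, and the timer
// callback emits a default value and re-arms itself, keeping the loop alive
// even when no further elements arrive for the key.
public class LoopingTimerFn extends DoFn<KV<String, Integer>, KV<String, Integer>> {

  @TimerId("loopingTimer")
  private final TimerSpec loopingTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext c, @TimerId("loopingTimer") Timer loopingTimer) {
    // Arm the timer one minute past the element's timestamp, and pass the
    // element through unchanged.
    loopingTimer.set(c.timestamp().plus(Duration.standardMinutes(1)));
    c.output(c.element());
  }

  @OnTimer("loopingTimer")
  public void onTimer(
      OnTimerContext c, @TimerId("loopingTimer") Timer loopingTimer) {
    // Emit a default value for the interval that just ended, then re-arm the
    // timer one minute ahead to continue the loop. A real implementation
    // would track a stop timestamp in state so the loop can terminate.
    c.output(KV.of("Key_A", 0));
    loopingTimer.set(c.timestamp().plus(Duration.standardMinutes(1)));
  }
}
```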

[beam] branch master updated: Add aliases for machine_type and disk_type flags to match existing documentation (#8603)

2019-06-11 Thread altay
This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 37b76b6  Add aliases for machine_type and disk_type flags to match 
existing documentation (#8603)
37b76b6 is described below

commit 37b76b67b5d0cbd92e6a3fadee67f9fcf93cbc5d
Author: Ahmet Altay 
AuthorDate: Tue Jun 11 14:22:42 2019 -0700

Add aliases for machine_type and disk_type flags to match existing 
documentation (#8603)

* Add aliases for machine_type and disk_type flags to match existing 
documentation
---
 sdks/python/apache_beam/options/pipeline_options.py  |  4 ++--
 sdks/python/apache_beam/options/pipeline_options_test.py | 13 +
 2 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/options/pipeline_options.py 
b/sdks/python/apache_beam/options/pipeline_options.py
index fde1a7e..1155018 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -558,7 +558,7 @@ class WorkerOptions(PipelineOptions):
         help=
         ('If and how to autoscale the workerpool.'))
     parser.add_argument(
-        '--worker_machine_type',
+        '--worker_machine_type', '--machine_type',
         dest='machine_type',
         default=None,
         help=('Machine type to create Dataflow worker VMs as. See '
@@ -574,7 +574,7 @@ class WorkerOptions(PipelineOptions):
         ('Remote worker disk size, in gigabytes, or 0 to use the default size. '
          'If not set, the Dataflow service will use a reasonable default.'))
     parser.add_argument(
-        '--worker_disk_type',
+        '--worker_disk_type', '--disk_type',
         dest='disk_type',
         default=None,
         help=('Specifies what type of persistent disk should be used.'))
diff --git a/sdks/python/apache_beam/options/pipeline_options_test.py 
b/sdks/python/apache_beam/options/pipeline_options_test.py
index 5c51725..e082c8d 100644
--- a/sdks/python/apache_beam/options/pipeline_options_test.py
+++ b/sdks/python/apache_beam/options/pipeline_options_test.py
@@ -28,6 +28,7 @@ from apache_beam.options.pipeline_options import DebugOptions
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import ProfilingOptions
 from apache_beam.options.pipeline_options import TypeOptions
+from apache_beam.options.pipeline_options import WorkerOptions
 from apache_beam.options.value_provider import RuntimeValueProvider
 from apache_beam.options.value_provider import StaticValueProvider
 from apache_beam.transforms.display import DisplayData
@@ -252,6 +253,18 @@ class PipelineOptionsTest(unittest.TestCase):
     options = PipelineOptions(flags=[''])
     self.assertEqual(options.get_all_options()['experiments'], None)
 
+  def test_worker_options(self):
+    options = PipelineOptions(['--machine_type', 'abc', '--disk_type', 'def'])
+    worker_options = options.view_as(WorkerOptions)
+    self.assertEqual(worker_options.machine_type, 'abc')
+    self.assertEqual(worker_options.disk_type, 'def')
+
+    options = PipelineOptions(
+        ['--worker_machine_type', 'abc', '--worker_disk_type', 'def'])
+    worker_options = options.view_as(WorkerOptions)
+    self.assertEqual(worker_options.machine_type, 'abc')
+    self.assertEqual(worker_options.disk_type, 'def')
+
   def test_option_modifications_are_shared_between_views(self):
     pipeline_options = PipelineOptions([
         '--mock_option', 'value', '--mock_flag',



[beam] 01/01: Merge pull request #8812: [BEAM-7044] portable Spark: support stateful dofns

2019-06-11 Thread mxm
This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 4f22c546a9985ae95f65ee19fada1a91e1864aca
Merge: e7aadf5 d779459
Author: Maximilian Michels 
AuthorDate: Tue Jun 11 23:11:42 2019 +0200

Merge pull request #8812: [BEAM-7044] portable Spark: support stateful dofns

 runners/spark/job-server/build.gradle  |  3 +-
 .../beam/runners/spark/coders/CoderHelpers.java| 36 ---
 .../spark/translation/GroupCombineFunctions.java   | 33 +-
 .../SparkBatchPortablePipelineTranslator.java  | 70 +++---
 .../translation/SparkExecutableStageFunction.java  | 25 
 .../spark/translation/TransformTranslator.java |  4 +-
 .../spark/translation/TranslationUtils.java| 31 --
 .../runners/portability/spark_runner_test.py   |  4 --
 8 files changed, 172 insertions(+), 34 deletions(-)

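As background for the change above: a stateful DoFn in the Beam Java SDK
declares per-key, per-window state cells with the @StateId annotation, and
supporting these on the portable Spark runner is what this pull request adds.
The snippet below is an invented minimal example, not code from the pull
request.

```
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Minimal stateful DoFn: keeps a per-key running count in ValueState.
public class RunningCountFn extends DoFn<KV<String, Integer>, KV<String, Long>> {

  @StateId("count")
  private final StateSpec<ValueState<Long>> countSpec = StateSpecs.value();

  @ProcessElement
  public void process(
      ProcessContext c, @StateId("count") ValueState<Long> count) {
    // read() returns null the first time a key is seen in a window.
    Long stored = count.read();
    long updated = (stored == null ? 0L : stored) + 1;
    count.write(updated);
    c.output(KV.of(c.element().getKey(), updated));
  }
}
```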


[beam] branch master updated (e7aadf5 -> 4f22c54)

2019-06-11 Thread mxm
This is an automated email from the ASF dual-hosted git repository.

mxm pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from e7aadf5  Use when/then style for Mockito stubs
 add d779459  [BEAM-7044] portable Spark: support stateful dofns
 new 4f22c54  Merge pull request #8812: [BEAM-7044] portable Spark: support 
stateful dofns

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 runners/spark/job-server/build.gradle  |  3 +-
 .../beam/runners/spark/coders/CoderHelpers.java| 36 ---
 .../spark/translation/GroupCombineFunctions.java   | 33 +-
 .../SparkBatchPortablePipelineTranslator.java  | 70 +++---
 .../translation/SparkExecutableStageFunction.java  | 25 
 .../spark/translation/TransformTranslator.java |  4 +-
 .../spark/translation/TranslationUtils.java| 31 --
 .../runners/portability/spark_runner_test.py   |  4 --
 8 files changed, 172 insertions(+), 34 deletions(-)



[beam] 01/01: Use when/then style for Mockito stubs

2019-06-11 Thread lcwik
This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit e7aadf58f348a17bb2e74781d201e40ccf996a80
Merge: 76a1d51 36df7b4
Author: Lukasz Cwik 
AuthorDate: Tue Jun 11 11:35:15 2019 -0700

Use when/then style for Mockito stubs

 .../extensions/gcp/util/CustomHttpErrorsTest.java  |  8 ++--
 .../io/kinesis/DynamicCheckpointGeneratorTest.java | 28 ++--
 .../beam/sdk/io/kinesis/RecordFilterTest.java  | 12 +++---
 .../beam/sdk/io/kinesis/ShardCheckpointTest.java   |  5 +--
 .../io/kinesis/SimplifiedKinesisClientTest.java| 50 +++---
 .../io/kinesis/StartingPointShardsFinderTest.java  | 38 
 6 files changed, 67 insertions(+), 74 deletions(-)


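The change above rewrites stubs from Mockito's doReturn(...).when(...) form to
the when(...).thenReturn(...) form, which type-checks the stubbed return value
at compile time. A small hypothetical illustration, not taken from the diff:

```
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.List;
import org.junit.Test;

public class StubStyleTest {

  @Test
  @SuppressWarnings("unchecked")
  public void whenThenStyle() {
    // Hypothetical mock; any interface works the same way.
    List<String> shards = mock(List.class);

    // Before (doReturn/when style): doReturn(3).when(shards).size();
    // After (when/then style):
    when(shards.size()).thenReturn(3);

    assertEquals(3, shards.size());
  }
}
```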

[beam] branch master updated (76a1d51 -> e7aadf5)

2019-06-11 Thread lcwik
This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 76a1d51  [BEAM-5995] add Jenkins job with GBK Python load tests (#8151)
 add 36df7b4  Use when/then style for Mockito stubs
 new e7aadf5  Use when/then style for Mockito stubs

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../extensions/gcp/util/CustomHttpErrorsTest.java  |  8 ++--
 .../io/kinesis/DynamicCheckpointGeneratorTest.java | 28 ++--
 .../beam/sdk/io/kinesis/RecordFilterTest.java  | 12 +++---
 .../beam/sdk/io/kinesis/ShardCheckpointTest.java   |  5 +--
 .../io/kinesis/SimplifiedKinesisClientTest.java| 50 +++---
 .../io/kinesis/StartingPointShardsFinderTest.java  | 38 
 6 files changed, 67 insertions(+), 74 deletions(-)



[beam] branch master updated (47ba869 -> 76a1d51)

2019-06-11 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 47ba869  Merge pull request #8783 from robinyqiu/resilience
 add 76a1d51  [BEAM-5995] add Jenkins job with GBK Python load tests (#8151)

No new revisions were added by this update.

Summary of changes:
 .test-infra/jenkins/LoadTestsBuilder.groovy|  12 +-
 .../jenkins/job_LoadTests_Combine_Java.groovy  |   7 +-
 .test-infra/jenkins/job_LoadTests_GBK_Java.groovy  |   9 +-
 .../jenkins/job_LoadTests_Java_Smoke.groovy|   8 +-
 .../jenkins/job_LoadTests_ParDo_Java.groovy|   7 +-
 .test-infra/jenkins/job_LoadTests_Python.groovy| 143 +++--
 ...on.groovy => job_LoadTests_Python_Smoke.groovy} |  21 +--
 .../apache_beam/testing/load_tests/build.gradle|   5 +-
 8 files changed, 128 insertions(+), 84 deletions(-)
 copy .test-infra/jenkins/{job_LoadTests_Python.groovy => 
job_LoadTests_Python_Smoke.groovy} (72%)



[beam] branch master updated: Remove the unnecessary enable_health_checker flag

2019-06-11 Thread altay
This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new a8bdcc5  Remove the unnecessary enable_health_checker flag
 new 47ba869  Merge pull request #8783 from robinyqiu/resilience
a8bdcc5 is described below

commit a8bdcc599aa1686717f76a048e82151d0c2b4fe2
Author: Yueyang Qiu 
AuthorDate: Thu Jun 6 16:54:56 2019 -0700

Remove the unnecessary enable_health_checker flag
---
 .../runners/dataflow/internal/apiclient.py |  7 --
 .../runners/dataflow/internal/apiclient_test.py| 28 --
 2 files changed, 35 deletions(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py 
b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 8b7f9e3..1eec4d1 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -191,13 +191,6 @@ class Environment(object):
       if ('use_multiple_sdk_containers' not in debug_options_experiments and
           'no_use_multiple_sdk_containers' not in debug_options_experiments):
         debug_options_experiments.append('use_multiple_sdk_containers')
-      # Add enable_health_checker flag if it's not already present. Do not
-      # add the flag if 'disable_health_checker' is present.
-      # TODO[BEAM-7466]: Cleanup enable_health_checker once Python SDK 2.13
-      # becomes unsupported.
-      if ('enable_health_checker' not in debug_options_experiments and
-          'disable_health_checker' not in debug_options_experiments):
-        debug_options_experiments.append('enable_health_checker')
     # FlexRS
     if self.google_cloud_options.flexrs_goal == 'COST_OPTIMIZED':
       self.proto.flexResourceSchedulingGoal = (
diff --git 
a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py 
b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index b67d0ce..0c75b4b 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -500,34 +500,6 @@ class UtilTest(unittest.TestCase):
 self.assertNotIn(
 'use_multiple_sdk_containers', environment.proto.experiments)
 
-  def test_experiment_enable_health_checker(self):
-    pipeline_options = PipelineOptions(
-        ['--project', 'test_project', '--job_name', 'test_job_name',
-         '--temp_location', 'gs://test-location/temp',
-         '--experiments', 'beam_fn_api'])
-    environment = apiclient.Environment(
-        [], pipeline_options, 1, FAKE_PIPELINE_URL)
-    self.assertIn('enable_health_checker', environment.proto.experiments)
-
-    pipeline_options = PipelineOptions(
-        ['--project', 'test_project', '--job_name', 'test_job_name',
-         '--temp_location', 'gs://test-location/temp',
-         '--experiments', 'beam_fn_api',
-         '--experiments', 'enable_health_checker'])
-    environment = apiclient.Environment(
-        [], pipeline_options, 1, FAKE_PIPELINE_URL)
-    self.assertIn('enable_health_checker', environment.proto.experiments)
-
-    pipeline_options = PipelineOptions(
-        ['--project', 'test_project', '--job_name', 'test_job_name',
-         '--temp_location', 'gs://test-location/temp',
-         '--experiments', 'beam_fn_api',
-         '--experiments', 'disable_health_checker'])
-    environment = apiclient.Environment(
-        [], pipeline_options, 1, FAKE_PIPELINE_URL)
-    self.assertNotIn('enable_health_checker', environment.proto.experiments)
-    self.assertIn('disable_health_checker', environment.proto.experiments)
-
   @mock.patch(
   'apache_beam.runners.dataflow.internal.apiclient.sys.version_info',
   (3, 5))



[beam] branch master updated: [BEAM-7526] Fix toBeamValue logic in BigQueryUtils

2019-06-11 Thread anton
This is an automated email from the ASF dual-hosted git repository.

anton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 2e17f12  [BEAM-7526] Fix toBeamValue logic in BigQueryUtils
 new 01f14a0  Merge pull request #8814 from charithe/beam-7526
2e17f12 is described below

commit 2e17f12fa8fea43fb93a47f1770e163bac65cb3a
Author: Charith Ellawala 
AuthorDate: Tue Jun 11 11:09:11 2019 +0100

[BEAM-7526] Fix toBeamValue logic in BigQueryUtils
---
 .../java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java  | 7 +--
 .../org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java   | 9 +++--
 2 files changed, 12 insertions(+), 4 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index 1a87875..3ef1507 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -444,12 +444,15 @@ public class BigQueryUtils {
     if (jsonBQValue instanceof List) {
       return ((List) jsonBQValue)
           .stream()
+          .map(v -> ((Map) v).get("v"))
           .map(v -> toBeamValue(fieldType.getCollectionElementType(), v))
           .collect(toList());
     }
 
-    if (jsonBQValue instanceof TableRow) {
-      return toBeamRow(fieldType.getRowSchema(), (TableRow) jsonBQValue);
+    if (jsonBQValue instanceof Map) {
+      TableRow tr = new TableRow();
+      tr.putAll((Map) jsonBQValue);
+      return toBeamRow(fieldType.getRowSchema(), tr);
     }
 
     throw new UnsupportedOperationException(
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
index 3315598..2226be9 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
@@ -131,7 +131,11 @@ public class BigQueryUtilsTest {
       Row.withSchema(ARRAY_TYPE).addValues((Object) Arrays.asList(123L, 124L)).build();
 
   private static final TableRow BQ_ARRAY_ROW =
-      new TableRow().set("ids", Arrays.asList("123", "124"));
+      new TableRow()
+          .set(
+              "ids",
+              Arrays.asList(
+                  Collections.singletonMap("v", "123"), Collections.singletonMap("v", "124")));
 
   private static final Row ROW_ROW = Row.withSchema(ROW_TYPE).addValues(FLAT_ROW).build();
 
@@ -141,7 +145,8 @@ public class BigQueryUtilsTest {
       Row.withSchema(ARRAY_ROW_TYPE).addValues((Object) Arrays.asList(FLAT_ROW)).build();
 
   private static final TableRow BQ_ARRAY_ROW_ROW =
-      new TableRow().set("rows", Collections.singletonList(BQ_FLAT_ROW));
+      new TableRow()
+          .set("rows", Collections.singletonList(Collections.singletonMap("v", BQ_FLAT_ROW)));
 
   private static final TableSchema BQ_FLAT_TYPE =
       new TableSchema().setFields(Arrays.asList(ID, VALUE, NAME, TIMESTAMP, VALID));


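For context on the fix above: the BigQuery REST API returns each element of a
repeated (array) field wrapped as {"v": value} rather than as a bare value,
which is why toBeamValue must unwrap the "v" entry before converting. The
sketch below mirrors the updated test fixtures; the field name "ids" and the
values are illustrative.

```
import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;
import java.util.Collections;

public class RepeatedFieldShape {
  // A repeated field as returned over the BigQuery REST API: each element
  // arrives wrapped in a map under the key "v".
  static final TableRow BQ_ARRAY_ROW =
      new TableRow()
          .set(
              "ids",
              Arrays.asList(
                  Collections.singletonMap("v", "123"),
                  Collections.singletonMap("v", "124")));
}
```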

[beam] branch master updated (bd81d3f -> 2451601)

2019-06-11 Thread iemejia
This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from bd81d3f  Merge pull request #8806 from mxm/release-blog-post
 add bae5db1  [BEAM-6865] refactor InMemoryBagUserStateFactory to 
standalone class
 add 2451601  Merge pull request #8802: [BEAM-6865] refactor 
InMemoryBagUserStateFactory to standalone class

No new revisions were added by this update.

Summary of changes:
 .../functions/FlinkExecutableStageFunction.java|  98 +
 .../state/InMemoryBagUserStateFactory.java | 119 +
 2 files changed, 120 insertions(+), 97 deletions(-)
 create mode 100644 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/InMemoryBagUserStateFactory.java