[beam] branch master updated: Merge #8686: [BEAM-7399] Blog post on looping timers
This is an automated email from the ASF dual-hosted git repository.

kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new 1aee9a8  Merge #8686: [BEAM-7399] Blog post on looping timers
1aee9a8 is described below

commit 1aee9a87616b21a0af0911708c9e0a4ad35f82be
Author: Reza Rokni <7542791+rezaro...@users.noreply.github.com>
AuthorDate: Wed Jun 12 10:41:06 2019 +0800

    Merge #8686: [BEAM-7399] Blog post on looping timers
---
 website/src/_data/authors.yml                   |   5 +-
 website/src/_posts/2019-06-11-looping-timers.md | 342
 2 files changed, 346 insertions(+), 1 deletion(-)

diff --git a/website/src/_data/authors.yml b/website/src/_data/authors.yml
index e6ce735..eb37ed5 100644
--- a/website/src/_data/authors.yml
+++ b/website/src/_data/authors.yml
@@ -115,4 +115,7 @@ rfernand:
 mbaetens:
   name: Matthias Baetens
   email: baetensmatth...@gmail.com
-  twitter: matthiasbaetens
\ No newline at end of file
+  twitter: matthiasbaetens
+rez:
+  name: Reza Rokni
+  email: r...@google.com
diff --git a/website/src/_posts/2019-06-11-looping-timers.md b/website/src/_posts/2019-06-11-looping-timers.md
new file mode 100644
index 000..ff1cb5d
--- /dev/null
+++ b/website/src/_posts/2019-06-11-looping-timers.md
@@ -0,0 +1,342 @@
+---
+layout: post
+title: "Looping timers in Apache Beam"
+date: 2019-06-11 00:00:01 -0800
+excerpt_separator:
+categories: blog
+authors:
+  - rez
+  - klk
+---
+
+
+Apache Beam’s primitives let you build expressive data pipelines, suitable for a
+variety of use cases. One specific use case is the analysis of time series data
+in which continuous sequences across window boundaries are important. A few fun
+challenges arise as you tackle this type of data and in this blog we will
+explore one of those in more detail and make use of the Timer API
+([blog post]({{ site.baseurl }}/blog/2017/08/28/timely-processing.html))
+using the "looping timer" pattern.
+
+
+
+With Beam in streaming mode, you can take streams of data and build analytical
+transforms to produce results on the data. But for time series data, the absence
+of data is useful information. So how can we produce results in the absence of
+data?
+
+Let's use a more concrete example to illustrate the requirement. Imagine you
+have a simple pipeline that sums the number of events coming from an IoT device
+every minute. We would like to produce the value 0 when no data has been seen
+within a specific time interval. So why can this get tricky? Well it is easy to
+build a simple pipeline that counts events as they arrive, but when there is no
+event, there is nothing to count!
+
+Let's build a simple pipeline to work with:
+
+```
+  // We will start our timer at 1 sec from the fixed upper boundary of our
+  // minute window
+  Instant now = Instant.parse("2000-01-01T00:00:59Z");
+
+  // - Create some dummy data
+
+  // Create 3 elements, incrementing by 1 minute and leaving a time gap between
+  // element 2 and element 3
+  TimestampedValue<KV<String, Integer>> time_1 =
+    TimestampedValue.of(KV.of("Key_A", 1), now);
+
+  TimestampedValue<KV<String, Integer>> time_2 =
+    TimestampedValue.of(KV.of("Key_A", 2),
+    now.plus(Duration.standardMinutes(1)));
+
+  // No Value for start time + 2 mins
+  TimestampedValue<KV<String, Integer>> time_3 =
+    TimestampedValue.of(KV.of("Key_A", 3),
+    now.plus(Duration.standardMinutes(3)));
+
+  // Create pipeline
+  PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
+    .as(PipelineOptions.class);
+
+  Pipeline p = Pipeline.create(options);
+
+  // Apply a fixed window of duration 1 min and Sum the results
+  p.apply(Create.timestamped(time_1, time_2, time_3))
+    .apply(
+      Window.<KV<String, Integer>>into(
+        FixedWindows.of(Duration.standardMinutes(1))))
+    .apply(Sum.integersPerKey())
+    .apply(ParDo.of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {
+
+      @ProcessElement public void process(ProcessContext c) {
+        LOG.info("Value is {} timestamp is {}", c.element(), c.timestamp());
+      }
+    }));
+
+  p.run();
+```
+
+Running that pipeline will result in the following output:
+
+```
+INFO  LoopingTimer  - Value is KV{Key_A, 1} timestamp is 2000-01-01T00:00:59.999Z
+INFO  LoopingTimer  - Value is KV{Key_A, 3} timestamp is 2000-01-01T00:03:59.999Z
+INFO  LoopingTimer  - Value is KV{Key_A, 2} timestamp is 2000-01-01T00:01:59.999Z
+```
+
+> Note: The lack of order in the output should be expected, however the
+> key-window tuple is correctly computed.
+
+
+As expected, we see output in each of the interval windows which had a data
+point with a timestamp between the minimum and maximum value of the window.
+There was a data point at timestamps 00:00:59, 00:01:59 and 00:03:59, which
+fell into the following interval windows.
+
+* [00:00:00, 00:00:59.999)
+* [00:01:00, 00:01:59.999)
+* [00:03:00, 00:03:59.999)
+
+But as there was no
[beam] branch master updated: Add aliases for machine_type and disk_type flags to match existing documentation (#8603)
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new 37b76b6  Add aliases for machine_type and disk_type flags to match existing documentation (#8603)
37b76b6 is described below

commit 37b76b67b5d0cbd92e6a3fadee67f9fcf93cbc5d
Author: Ahmet Altay
AuthorDate: Tue Jun 11 14:22:42 2019 -0700

    Add aliases for machine_type and disk_type flags to match existing documentation (#8603)

    * Add aliases for machine_type and disk_type flags to match existing documentation
---
 sdks/python/apache_beam/options/pipeline_options.py      |  4 ++--
 sdks/python/apache_beam/options/pipeline_options_test.py | 13 +
 2 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index fde1a7e..1155018 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -558,7 +558,7 @@ class WorkerOptions(PipelineOptions):
         help=
         ('If and how to autoscale the workerpool.'))
     parser.add_argument(
-        '--worker_machine_type',
+        '--worker_machine_type', '--machine_type',
         dest='machine_type',
         default=None,
         help=('Machine type to create Dataflow worker VMs as. See '
@@ -574,7 +574,7 @@ class WorkerOptions(PipelineOptions):
         ('Remote worker disk size, in gigabytes, or 0 to use the default size. '
          'If not set, the Dataflow service will use a reasonable default.'))
     parser.add_argument(
-        '--worker_disk_type',
+        '--worker_disk_type', '--disk_type',
         dest='disk_type',
         default=None,
         help=('Specifies what type of persistent disk should be used.'))
diff --git a/sdks/python/apache_beam/options/pipeline_options_test.py b/sdks/python/apache_beam/options/pipeline_options_test.py
index 5c51725..e082c8d 100644
--- a/sdks/python/apache_beam/options/pipeline_options_test.py
+++ b/sdks/python/apache_beam/options/pipeline_options_test.py
@@ -28,6 +28,7 @@ from apache_beam.options.pipeline_options import DebugOptions
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import ProfilingOptions
 from apache_beam.options.pipeline_options import TypeOptions
+from apache_beam.options.pipeline_options import WorkerOptions
 from apache_beam.options.value_provider import RuntimeValueProvider
 from apache_beam.options.value_provider import StaticValueProvider
 from apache_beam.transforms.display import DisplayData
@@ -252,6 +253,18 @@ class PipelineOptionsTest(unittest.TestCase):
     options = PipelineOptions(flags=[''])
     self.assertEqual(options.get_all_options()['experiments'], None)

+  def test_worker_options(self):
+    options = PipelineOptions(['--machine_type', 'abc', '--disk_type', 'def'])
+    worker_options = options.view_as(WorkerOptions)
+    self.assertEqual(worker_options.machine_type, 'abc')
+    self.assertEqual(worker_options.disk_type, 'def')
+
+    options = PipelineOptions(
+        ['--worker_machine_type', 'abc', '--worker_disk_type', 'def'])
+    worker_options = options.view_as(WorkerOptions)
+    self.assertEqual(worker_options.machine_type, 'abc')
+    self.assertEqual(worker_options.disk_type, 'def')
+
   def test_option_modifications_are_shared_between_views(self):
     pipeline_options = PipelineOptions([
         '--mock_option', 'value', '--mock_flag',
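The alias mechanism in this commit relies on standard `argparse` behaviour: several option strings passed to one `add_argument` call all write to the same `dest`. A minimal standalone sketch of that behaviour follows, using only the standard library rather than Beam's `PipelineOptions`; the helper names `build_parser` and `parse_worker_flags` are hypothetical and exist only for this illustration.

```python
import argparse


def build_parser():
    """Build a parser whose flags accept both the long and the short spelling.

    Illustrative only: this is plain argparse, not Beam's WorkerOptions.
    """
    parser = argparse.ArgumentParser()
    # Both option strings store their value in the same destination attribute.
    parser.add_argument(
        '--worker_machine_type', '--machine_type',
        dest='machine_type',
        default=None)
    parser.add_argument(
        '--worker_disk_type', '--disk_type',
        dest='disk_type',
        default=None)
    return parser


def parse_worker_flags(argv):
    """Return (machine_type, disk_type) parsed from a list of flag strings."""
    args = build_parser().parse_args(argv)
    return args.machine_type, args.disk_type
```

Either spelling of a flag should yield the same parsed value, which is the property the new `test_worker_options` test in the diff checks against the real options class.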
[beam] 01/01: Merge pull request #8812: [BEAM-7044] portable Spark: support stateful dofns
mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 4f22c546a9985ae95f65ee19fada1a91e1864aca
Merge: e7aadf5 d779459
Author: Maximilian Michels
AuthorDate: Tue Jun 11 23:11:42 2019 +0200

    Merge pull request #8812: [BEAM-7044] portable Spark: support stateful dofns

 runners/spark/job-server/build.gradle              |  3 +-
 .../beam/runners/spark/coders/CoderHelpers.java    | 36 ---
 .../spark/translation/GroupCombineFunctions.java   | 33 +
 .../SparkBatchPortablePipelineTranslator.java      | 70 +++---
 .../translation/SparkExecutableStageFunction.java  | 25
 .../spark/translation/TransformTranslator.java     |  4 +-
 .../spark/translation/TranslationUtils.java        | 31 --
 .../runners/portability/spark_runner_test.py       |  4 --
 8 files changed, 172 insertions(+), 34 deletions(-)
[beam] branch master updated (e7aadf5 -> 4f22c54)
mxm pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.

    from e7aadf5  Use when/then style for Mockito stubs
     add d779459  [BEAM-7044] portable Spark: support stateful dofns
     new 4f22c54  Merge pull request #8812: [BEAM-7044] portable Spark: support stateful dofns

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.

Summary of changes:
 runners/spark/job-server/build.gradle              |  3 +-
 .../beam/runners/spark/coders/CoderHelpers.java    | 36 ---
 .../spark/translation/GroupCombineFunctions.java   | 33 +
 .../SparkBatchPortablePipelineTranslator.java      | 70 +++---
 .../translation/SparkExecutableStageFunction.java  | 25
 .../spark/translation/TransformTranslator.java     |  4 +-
 .../spark/translation/TranslationUtils.java        | 31 --
 .../runners/portability/spark_runner_test.py       |  4 --
 8 files changed, 172 insertions(+), 34 deletions(-)
[beam] 01/01: Use when/then style for Mockito stubs
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit e7aadf58f348a17bb2e74781d201e40ccf996a80
Merge: 76a1d51 36df7b4
Author: Lukasz Cwik
AuthorDate: Tue Jun 11 11:35:15 2019 -0700

    Use when/then style for Mockito stubs

 .../extensions/gcp/util/CustomHttpErrorsTest.java  |  8 ++--
 .../io/kinesis/DynamicCheckpointGeneratorTest.java | 28 ++--
 .../beam/sdk/io/kinesis/RecordFilterTest.java      | 12 +++---
 .../beam/sdk/io/kinesis/ShardCheckpointTest.java   |  5 +--
 .../io/kinesis/SimplifiedKinesisClientTest.java    | 50 +++---
 .../io/kinesis/StartingPointShardsFinderTest.java  | 38
 6 files changed, 67 insertions(+), 74 deletions(-)
[beam] branch master updated (76a1d51 -> e7aadf5)
lcwik pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.

    from 76a1d51  [BEAM-5995] add Jenkins job with GBK Python load tests (#8151)
     add 36df7b4  Use when/then style for Mockito stubs
     new e7aadf5  Use when/then style for Mockito stubs

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.

Summary of changes:
 .../extensions/gcp/util/CustomHttpErrorsTest.java  |  8 ++--
 .../io/kinesis/DynamicCheckpointGeneratorTest.java | 28 ++--
 .../beam/sdk/io/kinesis/RecordFilterTest.java      | 12 +++---
 .../beam/sdk/io/kinesis/ShardCheckpointTest.java   |  5 +--
 .../io/kinesis/SimplifiedKinesisClientTest.java    | 50 +++---
 .../io/kinesis/StartingPointShardsFinderTest.java  | 38
 6 files changed, 67 insertions(+), 74 deletions(-)
[beam] branch master updated (47ba869 -> 76a1d51)
pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.

    from 47ba869  Merge pull request #8783 from robinyqiu/resilience
     add 76a1d51  [BEAM-5995] add Jenkins job with GBK Python load tests (#8151)

No new revisions were added by this update.

Summary of changes:
 .test-infra/jenkins/LoadTestsBuilder.groovy        |  12 +-
 .../jenkins/job_LoadTests_Combine_Java.groovy      |   7 +-
 .test-infra/jenkins/job_LoadTests_GBK_Java.groovy  |   9 +-
 .../jenkins/job_LoadTests_Java_Smoke.groovy        |   8 +-
 .../jenkins/job_LoadTests_ParDo_Java.groovy        |   7 +-
 .test-infra/jenkins/job_LoadTests_Python.groovy    | 143 +++--
 ...on.groovy => job_LoadTests_Python_Smoke.groovy} |  21 +--
 .../apache_beam/testing/load_tests/build.gradle    |   5 +-
 8 files changed, 128 insertions(+), 84 deletions(-)
 copy .test-infra/jenkins/{job_LoadTests_Python.groovy => job_LoadTests_Python_Smoke.groovy} (72%)
[beam] branch master updated: Remove the unnecessary enable_health_checker flag
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new a8bdcc5  Remove the unnecessary enable_health_checker flag
     new 47ba869  Merge pull request #8783 from robinyqiu/resilience
a8bdcc5 is described below

commit a8bdcc599aa1686717f76a048e82151d0c2b4fe2
Author: Yueyang Qiu
AuthorDate: Thu Jun 6 16:54:56 2019 -0700

    Remove the unnecessary enable_health_checker flag
---
 .../runners/dataflow/internal/apiclient.py         |  7 --
 .../runners/dataflow/internal/apiclient_test.py    | 28 --
 2 files changed, 35 deletions(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 8b7f9e3..1eec4d1 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -191,13 +191,6 @@ class Environment(object):
       if ('use_multiple_sdk_containers' not in debug_options_experiments and
           'no_use_multiple_sdk_containers' not in debug_options_experiments):
         debug_options_experiments.append('use_multiple_sdk_containers')
-      # Add enable_health_checker flag if it's not already present. Do not
-      # add the flag if 'disable_health_checker' is present.
-      # TODO[BEAM-7466]: Cleanup enable_health_checker once Python SDK 2.13
-      # becomes unsupported.
-      if ('enable_health_checker' not in debug_options_experiments and
-          'disable_health_checker' not in debug_options_experiments):
-        debug_options_experiments.append('enable_health_checker')
     # FlexRS
     if self.google_cloud_options.flexrs_goal == 'COST_OPTIMIZED':
       self.proto.flexResourceSchedulingGoal = (
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index b67d0ce..0c75b4b 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -500,34 +500,6 @@ class UtilTest(unittest.TestCase):
     self.assertNotIn(
         'use_multiple_sdk_containers', environment.proto.experiments)

-  def test_experiment_enable_health_checker(self):
-    pipeline_options = PipelineOptions(
-        ['--project', 'test_project', '--job_name', 'test_job_name',
-         '--temp_location', 'gs://test-location/temp',
-         '--experiments', 'beam_fn_api'])
-    environment = apiclient.Environment(
-        [], pipeline_options, 1, FAKE_PIPELINE_URL)
-    self.assertIn('enable_health_checker', environment.proto.experiments)
-
-    pipeline_options = PipelineOptions(
-        ['--project', 'test_project', '--job_name', 'test_job_name',
-         '--temp_location', 'gs://test-location/temp',
-         '--experiments', 'beam_fn_api',
-         '--experiments', 'enable_health_checker'])
-    environment = apiclient.Environment(
-        [], pipeline_options, 1, FAKE_PIPELINE_URL)
-    self.assertIn('enable_health_checker', environment.proto.experiments)
-
-    pipeline_options = PipelineOptions(
-        ['--project', 'test_project', '--job_name', 'test_job_name',
-         '--temp_location', 'gs://test-location/temp',
-         '--experiments', 'beam_fn_api',
-         '--experiments', 'disable_health_checker'])
-    environment = apiclient.Environment(
-        [], pipeline_options, 1, FAKE_PIPELINE_URL)
-    self.assertNotIn('enable_health_checker', environment.proto.experiments)
-    self.assertIn('disable_health_checker', environment.proto.experiments)
-
   @mock.patch(
       'apache_beam.runners.dataflow.internal.apiclient.sys.version_info',
       (3, 5))
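After this commit, the only default experiment visible in the quoted hunk is `use_multiple_sdk_containers`, which is appended unless the user already supplied it or its `no_` opt-out. The sketch below isolates that "add a default unless opted out" pattern as a pure function. It is an illustration modelled on the context lines of the diff, not the actual `apiclient.Environment` code; the function name `with_default_experiments` is hypothetical.

```python
def with_default_experiments(experiments):
    """Return a copy of `experiments` with the default experiment applied.

    Hypothetical helper mirroring the context lines of the diff: the default
    is appended only when neither the flag nor its opt-out is present.
    """
    result = list(experiments)
    if ('use_multiple_sdk_containers' not in result and
            'no_use_multiple_sdk_containers' not in result):
        result.append('use_multiple_sdk_containers')
    # No health-checker flag is added here, matching the state after the commit.
    return result
```

The removed `enable_health_checker` block followed the same shape with `disable_health_checker` as its opt-out, which is why the deleted test exercised three cases: flag absent, flag present, and opt-out present.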
[beam] branch master updated: [BEAM-7526] Fix toBeamValue logic in BigQueryUtils
anton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new 2e17f12  [BEAM-7526] Fix toBeamValue logic in BigQueryUtils
     new 01f14a0  Merge pull request #8814 from charithe/beam-7526
2e17f12 is described below

commit 2e17f12fa8fea43fb93a47f1770e163bac65cb3a
Author: Charith Ellawala
AuthorDate: Tue Jun 11 11:09:11 2019 +0100

    [BEAM-7526] Fix toBeamValue logic in BigQueryUtils
---
 .../java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java     | 7 +--
 .../org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java      | 9 +++--
 2 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index 1a87875..3ef1507 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -444,12 +444,15 @@ public class BigQueryUtils {
     if (jsonBQValue instanceof List) {
       return ((List) jsonBQValue)
           .stream()
+          .map(v -> ((Map) v).get("v"))
           .map(v -> toBeamValue(fieldType.getCollectionElementType(), v))
           .collect(toList());
     }

-    if (jsonBQValue instanceof TableRow) {
-      return toBeamRow(fieldType.getRowSchema(), (TableRow) jsonBQValue);
+    if (jsonBQValue instanceof Map) {
+      TableRow tr = new TableRow();
+      tr.putAll((Map) jsonBQValue);
+      return toBeamRow(fieldType.getRowSchema(), tr);
     }

     throw new UnsupportedOperationException(
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
index 3315598..2226be9 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
@@ -131,7 +131,11 @@ public class BigQueryUtilsTest {
       Row.withSchema(ARRAY_TYPE).addValues((Object) Arrays.asList(123L, 124L)).build();

   private static final TableRow BQ_ARRAY_ROW =
-      new TableRow().set("ids", Arrays.asList("123", "124"));
+      new TableRow()
+          .set(
+              "ids",
+              Arrays.asList(
+                  Collections.singletonMap("v", "123"), Collections.singletonMap("v", "124")));

   private static final Row ROW_ROW = Row.withSchema(ROW_TYPE).addValues(FLAT_ROW).build();

@@ -141,7 +145,8 @@ public class BigQueryUtilsTest {
       Row.withSchema(ARRAY_ROW_TYPE).addValues((Object) Arrays.asList(FLAT_ROW)).build();

   private static final TableRow BQ_ARRAY_ROW_ROW =
-      new TableRow().set("rows", Collections.singletonList(BQ_FLAT_ROW));
+      new TableRow()
+          .set("rows", Collections.singletonList(Collections.singletonMap("v", BQ_FLAT_ROW)));

   private static final TableSchema BQ_FLAT_TYPE =
       new TableSchema().setFields(Arrays.asList(ID, VALUE, NAME, TIMESTAMP, VALID));
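The updated test fixtures show the data shape this fix targets: each element of a repeated field arrives wrapped in a single-key map `{"v": ...}`, and nested records arrive as plain maps rather than `TableRow` instances. The Python sketch below mirrors that unwrapping logic in simplified form. It is an analogy for illustration and not Beam's Java implementation: the function name `unwrap_bq_value` and the `convert` callback are hypothetical, and the schema-driven conversion done by `toBeamRow` is replaced by a per-field recursion.

```python
def unwrap_bq_value(json_value, convert):
    """Convert a BigQuery-style JSON value into plain Python values.

    Simplified, schema-free stand-in for the Java logic in the diff:
      * lists: unwrap each element's "v" key, then convert the inner value
      * dicts: treat as a nested record and convert each field
      * anything else: apply `convert` to the scalar
    """
    if isinstance(json_value, list):
        return [unwrap_bq_value(element['v'], convert) for element in json_value]
    if isinstance(json_value, dict):
        return {name: unwrap_bq_value(field, convert)
                for name, field in json_value.items()}
    return convert(json_value)
```

With `convert=int`, a value shaped like the new `BQ_ARRAY_ROW` fixture, `[{'v': '123'}, {'v': '124'}]`, comes out as a list of two integers, analogous to the `Arrays.asList(123L, 124L)` row the Java test expects.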
[beam] branch master updated (bd81d3f -> 2451601)
iemejia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.

    from bd81d3f  Merge pull request #8806 from mxm/release-blog-post
     add bae5db1  [BEAM-6865] refactor InMemoryBagUserStateFactory to standalone class
     add 2451601  Merge pull request #8802: [BEAM-6865] refactor InMemoryBagUserStateFactory to standalone class

No new revisions were added by this update.

Summary of changes:
 .../functions/FlinkExecutableStageFunction.java    |  98 +
 .../state/InMemoryBagUserStateFactory.java         | 119 +
 2 files changed, 120 insertions(+), 97 deletions(-)
 create mode 100644 runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/InMemoryBagUserStateFactory.java