[GitHub] [beam] pabloem commented on pull request #11560: Auto-inferring project for ReadFromBigQuery
pabloem commented on pull request #11560: URL: https://github.com/apache/beam/pull/11560#issuecomment-622670110 Passing precommit: https://builds.apache.org/job/beam_PostCommit_Python37_PR/132/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] pabloem commented on pull request #11560: Auto-inferring project for ReadFromBigQuery
pabloem commented on pull request #11560: URL: https://github.com/apache/beam/pull/11560#issuecomment-622669830 Run Python2_PVR_Flink PreCommit
[GitHub] [beam] rahul8383 commented on pull request #11581: [BEAM-8307] NPE in Calcite dialect when input PCollection has logical…
rahul8383 commented on pull request #11581: URL: https://github.com/apache/beam/pull/11581#issuecomment-622661943 I will make the necessary changes as suggested by @TheNeuralBit in https://lists.apache.org/thread.html/r281e2913379c9733f6ac5baa08f361cc4ebe880a9880b2d54d6095b0%40%3Cdev.beam.apache.org%3E
[GitHub] [beam] jaketf commented on a change in pull request #11596: [BEAM-9856] [*WIP DO NOT MERGE*] Optimization/hl7v2 io list messages
jaketf commented on a change in pull request #11596: URL: https://github.com/apache/beam/pull/11596#discussion_r418843954 ## File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IOTestUtil.java ## @@ -59,7 +59,7 @@ + "AL1|2|allergy|Z91.013^Personal history of allergy to sea food^ZAL|SEVERE|Swollen face|\r" + "AL1|3|allergy|Z91.040^Latex allergy^ZAL|MODERATE|Raised, itchy, red rash|", // Another ADT Message - "MSH|^~\\&|hl7Integration|hl7Integration|ADT^A08|||2.5|\r" + "MSH|^~\\&|hl7Integration|hl7Integration|||20190309132544||ADT^A08|||2.5|\r" Review comment: this timestamp will always be present in valid HL7v2 Messages.
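For context on the field the corrected test string adds: in HL7v2, MSH-1 is the field separator `|` itself, so when an MSH segment is split on `|`, field MSH-n lands at index n-1; the new `20190309132544` value fills MSH-7 (Date/Time of Message). A quick plain-Python sketch (not part of the PR) showing where the fields land:

```python
# Sketch: locate fields in the corrected MSH test segment from the diff above.
# MSH-1 is the "|" separator itself, so after split("|"), MSH-n is at index n-1.
msh = "MSH|^~\\&|hl7Integration|hl7Integration|||20190309132544||ADT^A08|||2.5|"
fields = msh.split("|")

sending_app = fields[2]  # MSH-3: Sending Application
timestamp = fields[6]    # MSH-7: Date/Time of Message (the field this PR comment is about)
msg_type = fields[8]     # MSH-9: Message Type

print(timestamp, msg_type)  # 20190309132544 ADT^A08
```

This also makes visible why the old string was malformed: without the empty MSH-5/MSH-6 fields, `ADT^A08` sat in the timestamp's position.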
[GitHub] [beam] jaketf opened a new pull request #11596: [BEAM-9856] [*WIP DO NOT MERGE*] Optimization/hl7v2 io list messages
jaketf opened a new pull request #11596: URL: https://github.com/apache/beam/pull/11596 This PR is an experiment to illustrate a potential strategy for implementing HL7v2IO.ListMessages as a splittable DoFn that carves the sendTime dimension up into time ranges and uses the Messages.List API. ## DISCLAIMER This PR is currently a discussion piece to help drive the decision about the future of HL7v2IO.ListMessages. It has excessive logging for demonstrating behavior / debugging and is currently broken. CC: @pabloem ## Questions - What are the prototypical tests for validating the stability of a splittable DoFn's various methods (e.g. SplitRestriction)? ## Current Status: Broken Currently I get a confusing NPE from OffsetRangeTracker when running `./gradlew :sdks:java:io:google-cloud-platform:integrationTest --tests "org.apache.beam.sdk.io.gcp.healthcare.HL7v2IOReadIT.testHL7v2IO_ListHL7v2Messages_filtered"` I seem to get the expected initial daily splits: ``` WARNING: splitting initial sendTime restriction of [minSendTime, now): [2019-03-09T13:24:44.000Z,2020-05-02T02:18:51.850Z), or [1552137884000, 1588385931850). total days: 419 into 420 splits. 
Last split: [1588339484000, 1588385931850)
```
NPE Error:
```
java.lang.NullPointerException
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException
	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:348)
	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:318)
	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:213)
	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
	at org.apache.beam.sdk.io.gcp.healthcare.HL7v2IOReadIT.testHL7v2IO_ListHL7v2Messages_filtered(HL7v2IOReadIT.java:135)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:319)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
	at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
	at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
```
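The logged figures are consistent with carving the millisecond sendTime range into day-sized half-open sub-ranges. A plain-Python sketch of that arithmetic (a hypothetical helper for illustration, not the PR's actual SplitRestriction code) reproduces the logged split count and last split:

```python
DAY_MS = 24 * 60 * 60 * 1000  # one day in milliseconds

def split_daily(start_ms, end_ms):
    """Split [start_ms, end_ms) into day-sized half-open sub-ranges.

    The final sub-range is clamped to end_ms, so a range spanning
    419.5 days yields 420 splits, matching the WARNING log above.
    """
    splits = []
    lo = start_ms
    while lo < end_ms:
        hi = min(lo + DAY_MS, end_ms)
        splits.append((lo, hi))
        lo = hi
    return splits

# The [minSendTime, now) range from the log: [1552137884000, 1588385931850).
splits = split_daily(1552137884000, 1588385931850)
print(len(splits))  # 420
print(splits[-1])   # (1588339484000, 1588385931850)
```

Note the off-by-one flavor of the log message: 419 whole days produce 420 splits because the trailing partial day gets its own restriction.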
[GitHub] [beam] rahul8383 edited a comment on pull request #11581: [BEAM-8307] NPE in Calcite dialect when input PCollection has logical…
rahul8383 edited a comment on pull request #11581: URL: https://github.com/apache/beam/pull/11581#issuecomment-622652488 JdbcIO.Read -> SqlTransform.query(SELECT COUNT(*) FROM PCOLLECTION /*Any query*/ ) throws an NPE if the input PCollection to SqlTransform has JdbcIO-specific Logical Types (defined in org.apache.beam.sdk.io.jdbc.LogicalTypes) in its Schema. Please find the Source Table Schema and the attached exception stack trace in the JIRA ticket: BEAM-8307.
[GitHub] [beam] rahul8383 commented on pull request #11581: [BEAM-8307] NPE in Calcite dialect when input PCollection has logical…
rahul8383 commented on pull request #11581: URL: https://github.com/apache/beam/pull/11581#issuecomment-622652488 JdbcIO.Read -> SqlTransform.query(SELECT COUNT(*) FROM PCOLLECTION //Any query ) would throw an NPE if the input PCollection to SqlTransform has JdbcIO-specific Logical Types in its Schema. I have provided the Source Table Schema and attached the Exception stack trace in the JIRA ticket.
[GitHub] [beam] reuvenlax commented on pull request #11559: [BEAM-9836] Excluding spark runner for KeyTests
reuvenlax commented on pull request #11559: URL: https://github.com/apache/beam/pull/11559#issuecomment-622649935 Run Java PreCommit
[GitHub] [beam] reuvenlax commented on pull request #11581: [BEAM-8307] NPE in Calcite dialect when input PCollection has logical…
reuvenlax commented on pull request #11581: URL: https://github.com/apache/beam/pull/11581#issuecomment-622649872 Where do you see the NPE?
[GitHub] [beam] ibzib commented on a change in pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback
ibzib commented on a change in pull request #11492: URL: https://github.com/apache/beam/pull/11492#discussion_r418813369 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java ## @@ -247,25 +247,27 @@ public void reduce(Iterable> iterable, Collector transformAndTimerId, Timer timerValue) -> { -FnDataReceiver fnTimerReceiver = -bundle.getTimerReceivers().get(transformAndTimerId); -Preconditions.checkNotNull( -fnTimerReceiver, "No FnDataReceiver found for %s", transformAndTimerId); -try { - fnTimerReceiver.accept(timerValue); -} catch (Exception e) { - throw new RuntimeException( - String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue)); -} - }, - currentTimerKey); +while (timerInternals.hasPendingTimers()) { Review comment: Made #11595 to update Spark.
[GitHub] [beam] ibzib commented on pull request #11595: [BEAM-9801] Fire timers set within timers in Spark.
ibzib commented on pull request #11595: URL: https://github.com/apache/beam/pull/11595#issuecomment-622648458 Run Python Spark ValidatesRunner
[GitHub] [beam] ibzib opened a new pull request #11595: [BEAM-9801] Fire timers set within timers in Spark.
ibzib opened a new pull request #11595: URL: https://github.com/apache/beam/pull/11595 I refactored a bit first because there were so many nested `for` and `try` blocks it hurt my eyes, in addition to unnecessary abstraction. Probably we should consider fixing up Spark Python validates runner and making it a precommit, so we catch things like this in the future. I don't think it's too expensive. Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier). 
Post-Commit Tests Status (on master branch) Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark --- | --- | --- | --- | --- | --- | --- | --- Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/) Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)[![Build
[GitHub] [beam] ananvay commented on pull request #11593: [BEAM-9870] Always generate Dataflow-compatible FnApi protos.
ananvay commented on pull request #11593: URL: https://github.com/apache/beam/pull/11593#issuecomment-622631226 Thanks Robert, LGTM.
[GitHub] [beam] ibzib commented on a change in pull request #11593: [BEAM-9870] Always generate Dataflow-compatible FnApi protos.
ibzib commented on a change in pull request #11593: URL: https://github.com/apache/beam/pull/11593#discussion_r418781167 ## File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py ## @@ -345,7 +345,7 @@ def visit_transform(self, transform_node): for ix, side_input in enumerate(transform_node.side_inputs): access_pattern = side_input._side_input_data().access_pattern if access_pattern == common_urns.side_inputs.ITERABLE.urn: - if use_unified_worker: + if use_unified_worker or not use_fn_api: Review comment: So `side_input.pvalue.element_type = typehints.Any` is a side effect and `new_side_input = _DataflowIterableSideInput(side_input)` has no effect?
[GitHub] [beam] pabloem commented on pull request #11560: Auto-inferring project for ReadFromBigQuery
pabloem commented on pull request #11560: URL: https://github.com/apache/beam/pull/11560#issuecomment-622611048 Run Python 3.7 PostCommit
[GitHub] [beam] lukecwik commented on a change in pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback
lukecwik commented on a change in pull request #11492: URL: https://github.com/apache/beam/pull/11492#discussion_r418778828 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java ## @@ -247,25 +247,27 @@ public void reduce(Iterable> iterable, Collector transformAndTimerId, Timer timerValue) -> { -FnDataReceiver fnTimerReceiver = -bundle.getTimerReceivers().get(transformAndTimerId); -Preconditions.checkNotNull( -fnTimerReceiver, "No FnDataReceiver found for %s", transformAndTimerId); -try { - fnTimerReceiver.accept(timerValue); -} catch (Exception e) { - throw new RuntimeException( - String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue)); -} - }, - currentTimerKey); +while (timerInternals.hasPendingTimers()) { Review comment: Also, I made this same comment earlier today and it was lost somehow.
[GitHub] [beam] lukecwik commented on a change in pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback
lukecwik commented on a change in pull request #11492: URL: https://github.com/apache/beam/pull/11492#discussion_r418778740 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java ## @@ -247,25 +247,27 @@ public void reduce(Iterable> iterable, Collector transformAndTimerId, Timer timerValue) -> { -FnDataReceiver fnTimerReceiver = -bundle.getTimerReceivers().get(transformAndTimerId); -Preconditions.checkNotNull( -fnTimerReceiver, "No FnDataReceiver found for %s", transformAndTimerId); -try { - fnTimerReceiver.accept(timerValue); -} catch (Exception e) { - throw new RuntimeException( - String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue)); -} - }, - currentTimerKey); +while (timerInternals.hasPendingTimers()) { Review comment: It makes sense to do this in batch as well since the processing timers should get dropped and any newly scheduled event time timers should be continually fired. This would require updating FlinkExecutableStageFunction and SparkExecutableStageFunction
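The `while (timerInternals.hasPendingTimers())` loop being discussed matters because firing a timer may set another timer that is itself already eligible, so draining the timer queue once is not enough. A toy Python sketch of that idea (hypothetical names, not Beam's TimerInternals API):

```python
import heapq

class TimerQueue:
    """Toy event-time timer queue whose callbacks may set further timers."""

    def __init__(self):
        self._heap = []
        self._seq = 0  # tie-breaker so the heap never compares callbacks

    def set_timer(self, ts, callback):
        heapq.heappush(self._heap, (ts, self._seq, callback))
        self._seq += 1

    def has_pending(self):
        return bool(self._heap)

    def fire_eligible(self, watermark):
        fired = []
        # Loop until quiescence (like hasPendingTimers in the diff above)
        # rather than snapshotting the queue once: a callback may schedule
        # a new timer that is already at or behind the watermark.
        while self._heap and self._heap[0][0] <= watermark:
            ts, _, cb = heapq.heappop(self._heap)
            fired.append(ts)
            cb(ts)
        return fired

q = TimerQueue()

def chain(ts):
    # A timer set from within a timer callback, the looping-timer case.
    if ts < 3:
        q.set_timer(ts + 1, chain)

q.set_timer(1, chain)
fired = q.fire_eligible(watermark=10)
print(fired)  # [1, 2, 3]
```

With a single one-shot drain instead of the `while` loop, only the first timer would fire and the chained ones would be stranded, which is the batch-runner bug the comment describes.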
[GitHub] [beam] rohdesamuel opened a new pull request #11594: [BEAM-9692] Replace apply_WriteToBigQuery with PTransformOverride
rohdesamuel opened a new pull request #11594: URL: https://github.com/apache/beam/pull/11594 Replace apply_WriteToBigQuery with PTransformOverride
[GitHub] [beam] robertwb commented on a change in pull request #11593: [BEAM-9870] Always generate Dataflow-compatible FnApi protos.
robertwb commented on a change in pull request #11593: URL: https://github.com/apache/beam/pull/11593#discussion_r418777832 ## File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py ## @@ -345,7 +345,7 @@ def visit_transform(self, transform_node): for ix, side_input in enumerate(transform_node.side_inputs): access_pattern = side_input._side_input_data().access_pattern if access_pattern == common_urns.side_inputs.ITERABLE.urn: - if use_unified_worker: + if use_unified_worker or not use_fn_api: Review comment: The coder gets updated.
[GitHub] [beam] apilloud commented on a change in pull request #11272: [BEAM-9641] Support ZetaSQL DATE type as a Beam LogicalType
apilloud commented on a change in pull request #11272: URL: https://github.com/apache/beam/pull/11272#discussion_r418773897 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Date.java ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas.logicaltypes; + +import java.time.LocalDate; +import org.apache.beam.sdk.schemas.Schema; + +/** + * A date without a time-zone. + * + * It cannot represent an instant on the time-line without additional information such as an + * offset or time-zone. + */ +public class Date implements Schema.LogicalType { Review comment: We should consider not using a JVM; it adds performance overhead too. I'm reasonably convinced the wire format is good and the conversion here is lossless, so if there isn't an easy drop-in replacement, leave this as is.
[GitHub] [beam] robertwb commented on pull request #11593: [BEAM-9870] Always generate Dataflow-compatible FnApi protos.
robertwb commented on pull request #11593: URL: https://github.com/apache/beam/pull/11593#issuecomment-622596915 R: @ibzib @ananvay CC: @lukecwik
[GitHub] [beam] robertwb opened a new pull request #11593: [BEAM-9870] Always generate Dataflow-compatible FnApi protos.
robertwb opened a new pull request #11593: URL: https://github.com/apache/beam/pull/11593 For various reasons, Dataflow patches up the proto representations of side inputs. This change ensures these mutations are reflected regardless of whether FnAPI was explicitly requested. After this change the Beam protos produced are identical (though the v1beta3 representation may change significantly).
[GitHub] [beam] Hannah-Jiang commented on a change in pull request #11584: [BEAM-9136]support isRelease tag for docker build command & update release guide
Hannah-Jiang commented on a change in pull request #11584: URL: https://github.com/apache/beam/pull/11584#discussion_r418766264 ## File path: website/src/contribute/release-guide.md ## @@ -688,8 +688,20 @@ Verify that files are [present](https://dist.apache.org/repos/dist/dev/beam). * Build Python images and push to DockerHub. ``` -./gradlew :sdks:python:container:buildAll -Pdocker-tag=${RELEASE}_rc{RC_NUM} +./gradlew :sdks:python:container:buildAll -Pdocker-pull-licenses -Pdocker-tag=${RELEASE}_rc{RC_NUM} +``` + +Verify that third party licenses are included by logging in to the images. For Python SDK images, there should be around 80 ~ 100 dependencies. +Please note that dependencies for the SDKs with different Python versions vary. +Need to verify all Python images by replacing `${ver}` in the following command to `python2.7, python3.5, python3.6, python3.7`. Review comment: @ibzib , this part should be verified BEFORE pushing Python images. I'm not sure when this PR can be merged because of website issues, so explicitly pinging you here. ## File path: website/src/contribute/release-guide.md ## @@ -699,7 +711,18 @@ done * Build Java images and push to DockerHub. ``` -./gradlew :sdks:java:container:dockerPush -Pdocker-pull-licenses -Pdocker-tag=${RELEASE}_rc{RC_NUM} +./gradlew :sdks:java:container:docker -Pdocker-pull-licenses -Pdocker-tag=${RELEASE}_rc{RC_NUM} +``` + +Verify that third party licenses are included by logging in to the images. For Java SDK images, there should be around 1400 dependencies. +``` +docker run -it --entrypoint=/bin/bash apache/beam_java_sdk:${RELEASE}_rc{RC_NUM} +ls -al /opt/apache/beam/third_party_licenses/ | wc -l +``` + +After verifying the third party licenses are included correctly, push the images to DockerHub. +``` Review comment: @ibzib , this part changed slightly. Please note the change from `dockerPush` to `docker` at L714. After creating the Java image, we should verify it BEFORE pushing to DockerHub. 
I'm not sure when this PR can be merged because of website issues, so explicitly pinging you here.
[GitHub] [beam] TheNeuralBit commented on a change in pull request #11272: [BEAM-9641] Support ZetaSQL DATE type as a Beam LogicalType
TheNeuralBit commented on a change in pull request #11272: URL: https://github.com/apache/beam/pull/11272#discussion_r418765847 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Date.java ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas.logicaltypes; + +import java.time.LocalDate; +import org.apache.beam.sdk.schemas.Schema; + +/** + * A date without a time-zone. + * + * It cannot represent an instant on the time-line without additional information such as an + * offset or time-zone. + */ +public class Date implements Schema.LogicalType { Review comment: I guess `java.sql.Date` is another option for a Java type backed by millis.
[GitHub] [beam] Hannah-Jiang commented on pull request #11551: Cherrypick to release-2.21.0
Hannah-Jiang commented on pull request #11551: URL: https://github.com/apache/beam/pull/11551#issuecomment-622595455 > @Hannah-Jiang I'm not sure what, if anything really needs to be cherry-picked here. Java licenses were already cherry-picked in #11421, and the remainder of the changes are just usability improvements if I understand correctly. As long as licenses are present in the release images, I think we should be fine. WDYT? Yes, this is a great idea. That PR functions well, and this cherry-pick is mainly an improvement. I tried to create images from the release branch, and they look good. Please feel free to skip this PR.
[GitHub] [beam] TheNeuralBit commented on a change in pull request #11272: [BEAM-9641] Support ZetaSQL DATE type as a Beam LogicalType
TheNeuralBit commented on a change in pull request #11272: URL: https://github.com/apache/beam/pull/11272#discussion_r418764453 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Date.java ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas.logicaltypes; + +import java.time.LocalDate; +import org.apache.beam.sdk.schemas.Schema; + +/** + * A date without a time-zone. + * + * It cannot represent an instant on the time-line without additional information such as an + * offset or time-zone. + */ +public class Date implements Schema.LogicalType { Review comment: That is unfortunate... but what in-memory type should we use instead? joda.time.LocalDate uses a millisecond long, do we want to add another joda dependency? We could access the base type (wire format type) directly in SQL with [Row#getBaseValue](https://github.com/apache/beam/blob/7b890654e6bbfcab79a2f5677f1badd54bd444aa/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java#L424), but unfortunately Rows store logical types as the input type (in memory format type), so that wouldn't actually avoid a conversion. 
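For readers following the trade-off in this thread: the wire format under discussion stores days since the epoch, while `java.sql.Date` and Joda's `LocalDate` are backed by milliseconds. The two encodings, and the lossless round trip, can be sketched in plain Python (illustrative only — these names are not Beam APIs):

```python
from datetime import date, timedelta

EPOCH = date(1970, 1, 1)
MILLIS_PER_DAY = 24 * 60 * 60 * 1000

def to_epoch_days(d):
    # Wire encoding discussed above: whole days since 1970-01-01.
    return (d - EPOCH).days

def from_epoch_days(n):
    return EPOCH + timedelta(days=n)

def to_millis(d):
    # The millis-backed alternative (java.sql.Date / Joda style):
    # carries a factor of 86,400,000 of redundancy for a date-only value.
    return to_epoch_days(d) * MILLIS_PER_DAY

d = date(2020, 5, 1)
assert from_epoch_days(to_epoch_days(d)) == d   # lossless round trip
assert to_millis(d) % MILLIS_PER_DAY == 0       # millis form is day-aligned
```

Either representation round-trips; the debate is only about which in-memory Java type to pair with the epoch-days wire format without an extra conversion.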
[GitHub] [beam] boyuanzz commented on pull request #11592: [DO NOT REVIEW] Test only
boyuanzz commented on pull request #11592: URL: https://github.com/apache/beam/pull/11592#issuecomment-622592085 Run Dataflow PortabilityApi ValidatesRunner
[GitHub] [beam] ibzib commented on a change in pull request #11591: [BEAM-9801] [cherry-pick] Pass in fire timestamp to timer callback
ibzib commented on a change in pull request #11591: URL: https://github.com/apache/beam/pull/11591#discussion_r418762563 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java ## @@ -247,25 +247,27 @@ public void reduce(Iterable> iterable, Collector transformAndTimerId, Timer timerValue) -> { -FnDataReceiver fnTimerReceiver = -bundle.getTimerReceivers().get(transformAndTimerId); -Preconditions.checkNotNull( -fnTimerReceiver, "No FnDataReceiver found for %s", transformAndTimerId); -try { - fnTimerReceiver.accept(timerValue); -} catch (Exception e) { - throw new RuntimeException( - String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue)); -} - }, - currentTimerKey); +while (timerInternals.hasPendingTimers()) { Review comment: As expected, Spark is failing `test_pardo_timers`
[GitHub] [beam] ibzib commented on a change in pull request #11591: [BEAM-9801] [cherry-pick] Pass in fire timestamp to timer callback
ibzib commented on a change in pull request #11591: URL: https://github.com/apache/beam/pull/11591#discussion_r418761275 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java ## @@ -247,25 +247,27 @@ public void reduce(Iterable> iterable, Collector transformAndTimerId, Timer timerValue) -> { -FnDataReceiver fnTimerReceiver = -bundle.getTimerReceivers().get(transformAndTimerId); -Preconditions.checkNotNull( -fnTimerReceiver, "No FnDataReceiver found for %s", transformAndTimerId); -try { - fnTimerReceiver.accept(timerValue); -} catch (Exception e) { - throw new RuntimeException( - String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue)); -} - }, - currentTimerKey); +while (timerInternals.hasPendingTimers()) { Review comment: Good question. Spark basically does the same thing here (ideally this code should be shared..) @mxm
[GitHub] [beam] ibzib commented on pull request #11591: [BEAM-9801] [cherry-pick] Pass in fire timestamp to timer callback
ibzib commented on pull request #11591: URL: https://github.com/apache/beam/pull/11591#issuecomment-622589471 Run Python Spark ValidatesRunner
[GitHub] [beam] ibzib commented on pull request #11591: [BEAM-9801] [cherry-pick] Pass in fire timestamp to timer callback
ibzib commented on pull request #11591: URL: https://github.com/apache/beam/pull/11591#issuecomment-622589337 Run Java Spark PortableValidatesRunner Batch
[GitHub] [beam] TheNeuralBit commented on a change in pull request #11272: [BEAM-9641] Support ZetaSQL DATE type as a Beam LogicalType
TheNeuralBit commented on a change in pull request #11272: URL: https://github.com/apache/beam/pull/11272#discussion_r418759689 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java ## @@ -427,17 +430,12 @@ private static Expression value( private static Expression value(Expression value, Schema.FieldType type) { if (type.getTypeName().isLogicalType()) { -Expression millisField = Expressions.call(value, "getMillis"); String logicalId = type.getLogicalType().getIdentifier(); if (logicalId.equals(TimeType.IDENTIFIER)) { - return nullOr(value, Expressions.convert_(millisField, int.class)); -} else if (logicalId.equals(DateType.IDENTIFIER)) { - value = - nullOr( - value, - Expressions.convert_( - Expressions.divide(millisField, Expressions.constant(MILLIS_PER_DAY)), - int.class)); + return nullOr( + value, Expressions.convert_(Expressions.call(value, "getMillis"), int.class)); +} else if (logicalId.equals(SqlTypes.DATE.getIdentifier())) { Review comment: What about using a switch statement? Is there any style guidance on using switch on a String in Java?
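For context, the `DateType` branch being removed in the diff above converted a millis-backed value to the SQL `int` date by integer-dividing by `MILLIS_PER_DAY`. A sketch of that conversion, with a dictionary dispatch standing in for the if/else-vs-switch question (the identifiers here are simplified placeholders, not the real Beam ones):

```python
MILLIS_PER_DAY = 24 * 60 * 60 * 1000

def millis_to_epoch_days(millis):
    # The old DateType path: divide a millis timestamp down to whole days.
    return millis // MILLIS_PER_DAY

def convert_logical(logical_id, value):
    # Python analogue of dispatching on the identifier string; in Java 7+
    # the same shape is expressible as a switch on a String.
    handlers = {
        "TIME": lambda millis: millis,        # millis within the day, as-is
        "DATE": millis_to_epoch_days,         # days since the epoch
    }
    return handlers[logical_id](value)

# Three days plus a leftover offset still maps to epoch day 3.
assert convert_logical("DATE", 3 * MILLIS_PER_DAY + 123) == 3
```

The dict-dispatch mirrors why a switch reads better than an if/else chain here: one lookup per identifier, no repeated `equals` calls.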
[GitHub] [beam] lukecwik commented on a change in pull request #11591: [BEAM-9801] [cherry-pick] Pass in fire timestamp to timer callback
lukecwik commented on a change in pull request #11591: URL: https://github.com/apache/beam/pull/11591#discussion_r418756561 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java ## @@ -247,25 +247,27 @@ public void reduce(Iterable> iterable, Collector transformAndTimerId, Timer timerValue) -> { -FnDataReceiver fnTimerReceiver = -bundle.getTimerReceivers().get(transformAndTimerId); -Preconditions.checkNotNull( -fnTimerReceiver, "No FnDataReceiver found for %s", transformAndTimerId); -try { - fnTimerReceiver.accept(timerValue); -} catch (Exception e) { - throw new RuntimeException( - String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue)); -} - }, - currentTimerKey); +while (timerInternals.hasPendingTimers()) { Review comment: Doesn't it make sense to make this change in batch as well for Spark/Flink, in FlinkExecutableStageFunction and SparkExecutableStageFunction? Any watermark-based timers should remain eligible and continue to fire in batch, while processing-time timers should be dropped.
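A rough sketch of the drain loop being discussed — poll while timers are pending (firing one timer may register another), fire event-time timers once the watermark passes them, and drop processing-time timers in batch. The names are hypothetical, not the actual runner code:

```python
import heapq

def drain_pending_timers(timers, watermark, fired):
    """Batch-mode sketch: timers is a list of (timestamp, tag) pairs;
    processing-time timers are modeled with timestamp None and are
    dropped, matching the batch semantics described above. Timers past
    the watermark are discarded here as a simplification; a real
    implementation would leave them pending."""
    heap = [t for t in timers if t[0] is not None]  # drop processing-time timers
    heapq.heapify(heap)
    while heap:                                     # hasPendingTimers() analogue
        ts, tag = heapq.heappop(heap)
        if ts <= watermark:
            fired.append(tag)                       # deliver to the timer receiver

fired = []
drain_pending_timers([(30, "b"), (10, "a"), (None, "proc")], watermark=100, fired=fired)
assert fired == ["a", "b"]
```

In batch the watermark is effectively at the end of time, so every event-time timer eventually becomes eligible, which is the behavior the comment argues should be shared across the Flink and Spark functions.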
[GitHub] [beam] boyuanzz commented on pull request #11592: [DO NOT REVIEW] Test only
boyuanzz commented on pull request #11592: URL: https://github.com/apache/beam/pull/11592#issuecomment-622583832 Run Dataflow PortabilityApi ValidatesRunner
[GitHub] [beam] boyuanzz opened a new pull request #11592: [DO NOT REVIEW] Test only
boyuanzz opened a new pull request #11592: URL: https://github.com/apache/beam/pull/11592 **Please** add a meaningful description for your change here
[GitHub] [beam] ibzib opened a new pull request #11591: [BEAM-9801] [cherry-pick] Pass in fire timestamp to timer callback
ibzib opened a new pull request #11591: URL: https://github.com/apache/beam/pull/11591 I had to resolve a minor merge conflict between 325e0f1 and 8de324f22ca04b3716abf58ba77c2a3c117263a2.
[GitHub] [beam] allenpradeep commented on pull request #11570: [BEAM-9822] Merge the stages 'Gather and Sort' and 'Create Batches'
allenpradeep commented on pull request #11570: URL: https://github.com/apache/beam/pull/11570#issuecomment-622580239 This is great, niel. With these changes, there are 3 modes of using SpannerIO write:
a) Use the conventional way (as it has been until now) with a grouping factor, where data is grouped, sorted, batched, and written according to the parameters.
b) Batching without grouping - Set the grouping factor to 1 with larger batch bytes or cells. This ensures data is batched without being sorted.
c) No batching - Set any of max rows, max mutations, or batch bytes to 0 or 1.
Questions: 1) What mode should our import pipeline use? Should it use option b, since data in AVRO seems already sorted? 2) Where should we document these modes of operation so that customers can use them?
[GitHub] [beam] ibzib commented on pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback
ibzib commented on pull request #11492: URL: https://github.com/apache/beam/pull/11492#issuecomment-622576973 Flakes are BEAM-9767 and BEAM-8912. I'm going to merge this so we can go ahead with the release.
[GitHub] [beam] ibzib commented on pull request #11585: [BEAM-9860] Make job_endpoint required for PortableRunner
ibzib commented on pull request #11585: URL: https://github.com/apache/beam/pull/11585#issuecomment-622575793 Test flake: BEAM-9767
[GitHub] [beam] lukecwik opened a new pull request #11590: [BEAM-8944] Improve UnboundedThreadPoolExecutor performance
lukecwik opened a new pull request #11590: URL: https://github.com/apache/beam/pull/11590 Existing performance suffered because of the use of timed waits and also due to the increase in the number of "threading" objects being invoked. Using the benchmark from https://issues.apache.org/jira/browse/BEAM-8944?focusedCommentId=17074641=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17074641, performance improved from 5.52s down to 1.82s, which is faster than the ThreadPoolExecutor with 12 threads (the default being used before) but still slower than the ThreadPoolExecutor with 1 thread.

The prior performance was:
```
uses 5.52247905731
584051 function calls in 5.495 seconds

Ordered by: internal time

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
 86250    4.386    0.000    4.386    0.000 {method 'acquire' of 'thread.lock' objects}
 19800    0.150    0.000    4.773    0.000 Queue.py:150(get)
 29698    0.123    0.000    0.365    0.000 threading.py:373(notify)
  9900    0.114    0.000    0.114    0.000 {method '__enter__' of 'thread.lock' objects}
  9899    0.106    0.000    0.463    0.000 thread_pool_executor.py:103(accepted_work)
  9188    0.089    0.000    4.095    0.000 threading.py:309(wait)
  9903    0.078    0.000    0.129    0.000 threading.py:260(__init__)
  9900    0.061    0.000    1.092    0.000 thread_pool_executor.py:133(submit)
  9899    0.055    0.000    0.358    0.000 threading.py:576(set)
 38886    0.044    0.000    0.285    0.000 threading.py:300(_is_owned)
  9900    0.041    0.000    0.186    0.000 _base.py:318(__init__)
 28987    0.023    0.000    0.031    0.000 Queue.py:200(_qsize)
  9900    0.022    0.000    0.030    0.000 threading.py:132(__init__)
 38887    0.021    0.000    0.021    0.000 threading.py:64(_note)
  9899    0.019    0.000    0.163    0.000 threading.py:400(notifyAll)
 19799    0.018    0.000    0.023    0.000 Queue.py:208(_get)
 38887    0.016    0.000    0.016    0.000 {method 'release' of 'thread.lock' objects}
  9903    0.016    0.000    0.145    0.000 threading.py:242(Condition)
 19806    0.014    0.000    0.014    0.000 threading.py:59(__init__)
  9900    0.014    0.000    0.127    0.000 threading.py:285(__enter__)
  9188    0.013    0.000    0.090    0.000 threading.py:297(_acquire_restore)
  9900    0.013    0.000    0.043    0.000 threading.py:114(RLock)
  9900    0.011    0.000    0.011    0.000 thread_pool_executor.py:34(__init__)
 38886    0.011    0.000    0.011    0.000 {len}
  9900    0.010    0.000    0.012    0.000 threading.py:288(__exit__)
  9188    0.008    0.000    0.012    0.000 threading.py:294(_release_save)
 19092    0.006    0.000    0.006    0.000 {thread.allocate_lock}
 19799    0.005    0.000    0.005    0.000 {method 'popleft' of 'collections.deque' objects}
  9188    0.004    0.000    0.004    0.000 {method 'append' of 'list' objects}
  9899    0.003    0.000    0.003    0.000 {method 'remove' of 'list' objects}
  9900    0.002    0.000    0.002    0.000 {method '__exit__' of 'thread.lock' objects}
     1    0.000    0.000    0.000    0.000 {thread.start_new_thread}
     1    0.000    0.000    0.000    0.000 threading.py:647(__init__)
     1    0.000    0.000    0.000    0.000 threading.py:717(start)
     1    0.000    0.000    0.000    0.000 thread_pool_executor.py:58(__init__)
     1    0.000    0.000    0.000    0.000 threading.py:620(_newname)
     1    0.000    0.000    0.000    0.000 _weakrefset.py:83(add)
     1    0.000    0.000    0.000    0.000 threading.py:597(wait)
     2    0.000    0.000    0.000    0.000 threading.py:561(__init__)
     1    0.000    0.000    0.000    0.000 threading.py:1142(currentThread)
     2    0.000    0.000    0.000    0.000 threading.py:542(Event)
     1    0.000    0.000    0.000    0.000 threading.py:700(_set_daemon)
     1    0.000    0.000    0.000    0.000 threading.py:1014(daemon)
     2    0.000    0.000    0.000    0.000 threading.py:570(isSet)
     1    0.000    0.000    0.000    0.000 threading.py:999(daemon)
     1    0.000    0.000    0.000    0.000 {thread.get_ident}
     1    0.000    0.000    0.000    0.000 {method 'add' of 'set' objects}
     1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
```

The new performance is:
```
uses 1.82196497917
504935 function calls in 1.787 seconds

Ordered by: internal time

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
 62233    0.940    0.000    0.940    0.000 {method 'acquire' of 'thread.lock' objects}
  9900    0.142    0.000
```
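A rough way to reproduce this style of measurement is to profile a burst of cheap submissions with cProfile. This is a hedged sketch only: the stdlib `ThreadPoolExecutor` stands in for Beam's `UnboundedThreadPoolExecutor`, and the workload shape is illustrative, not the exact BEAM-8944 benchmark.

```python
# Sketch: profile many no-op task submissions to a thread pool, the same
# style of measurement as the profiles quoted above. The stdlib
# ThreadPoolExecutor is a stand-in for Beam's UnboundedThreadPoolExecutor.
import cProfile
import io
import pstats
from concurrent.futures import ThreadPoolExecutor

def benchmark(num_tasks=1000):
    # Submit no-op tasks and wait for every result, as the benchmark does.
    with ThreadPoolExecutor(max_workers=12) as pool:
        futures = [pool.submit(lambda: None) for _ in range(num_tasks)]
        for f in futures:
            f.result()

profiler = cProfile.Profile()
profiler.enable()
benchmark()
profiler.disable()

# Render the same "ncalls tottime ..." table the PR description shows.
out = io.StringIO()
pstats.Stats(profiler, stream=out).sort_stats("tottime").print_stats(10)
report = out.getvalue()
print("function calls" in report)  # the pstats header line is present
```

As in the quoted profiles, lock acquisition and condition waits typically dominate this kind of run, which is what the PR attacks by removing timed waits.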
[GitHub] [beam] lukecwik commented on pull request #11590: [BEAM-8944] Improve UnboundedThreadPoolExecutor performance
lukecwik commented on pull request #11590: URL: https://github.com/apache/beam/pull/11590#issuecomment-62261 R: @mxm @pabloem CC: @robertwb This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] allenpradeep commented on a change in pull request #11532: [BEAM-9822] Disable grouping when streaming
allenpradeep commented on a change in pull request #11532: URL: https://github.com/apache/beam/pull/11532#discussion_r418711729

## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java

```
@@ -1066,7 +1079,12 @@ public SpannerWriteResult expand(PCollection input) {
             spec.getBatchSizeBytes(),
             spec.getMaxNumMutations(),
             spec.getMaxNumRows(),
-            spec.getGroupingFactor(),
+            // Do not group on streaming unless explicitly set.
+            spec.getGroupingFactor()
+                .orElse(
+                    input.isBounded() == IsBounded.BOUNDED
```

Review comment: I was wondering if this condition needs to be based on the input passed to this stage or based on some parameter from the user?
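The defaulting rule under review, where an explicitly configured grouping factor always wins and streaming input otherwise disables grouping, can be sketched in Python. The constant value below is a hypothetical placeholder, not SpannerIO's actual default:

```python
# Sketch of the orElse(...) defaulting discussed above: an explicit setting
# wins; otherwise bounded (batch) input gets the default and unbounded
# (streaming) input gets 1, i.e. grouping disabled.
DEFAULT_GROUPING_FACTOR = 1000  # hypothetical value for illustration

def effective_grouping_factor(configured, input_is_bounded):
    if configured is not None:
        return configured  # user set it explicitly, honor it even when streaming
    return DEFAULT_GROUPING_FACTOR if input_is_bounded else 1

print(effective_grouping_factor(None, True))   # batch: default applies
print(effective_grouping_factor(None, False))  # streaming: grouping disabled
print(effective_grouping_factor(50, False))    # explicit setting wins
```

The review question is exactly about the second argument: whether `input_is_bounded` should come from the `PCollection` reaching this stage or from a user-supplied option.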
[GitHub] [beam] TheNeuralBit commented on a change in pull request #11456: [BEAM-7554] Add MillisInstant logical type to replace DATETIME
TheNeuralBit commented on a change in pull request #11456: URL: https://github.com/apache/beam/pull/11456#discussion_r418708682 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/MillisInstant.java ## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas.logicaltypes; + +import org.joda.time.Instant; +import org.joda.time.ReadableInstant; + +/** A timestamp represented as milliseconds since the epoch. */ +public class MillisInstant extends MillisType { + public static final String IDENTIFIER = "beam:logical_type:millis_instant:v1"; Review comment: @reuvenlax what do you think about making this type (and maybe NanosInstant as well) parameterized by timezone? The [arrow approach](https://github.com/apache/arrow/blob/master/format/Schema.fbs#L178) seems useful: un-specified timezone indicates time-zone naive (e.g. joda time Instant), otherwise time zone parameter should reference a value in tzdata (and would map to joda time DateTime). cc: @alexvanboxel This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
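The Arrow-style convention proposed above, where an unspecified timezone means a time-zone-naive instant and a set timezone means a zone-aware one, can be sketched with stdlib `datetime` standing in for Joda-Time. The class name and API here are illustrative, not Beam's:

```python
# Sketch of a timezone-parameterized millis-since-epoch type, mirroring the
# Arrow Timestamp convention discussed above: tz=None => naive instant
# (like joda Instant), tz set => zone-aware (like joda DateTime).
from datetime import datetime, timezone

class MillisInstantType:
    def __init__(self, tz=None):
        self.tz = tz  # None means time-zone naive

    def to_datetime(self, millis):
        dt = datetime.fromtimestamp(millis / 1000.0, tz=timezone.utc)
        if self.tz is None:
            return dt.replace(tzinfo=None)  # drop the zone: naive semantics
        return dt.astimezone(self.tz)

naive = MillisInstantType()
zoned = MillisInstantType(timezone.utc)
print(naive.to_datetime(0))  # 1970-01-01 00:00:00 (no tzinfo)
print(zoned.to_datetime(0))  # 1970-01-01 00:00:00+00:00
```

The appeal of the convention is that one logical type identifier covers both cases, with the timezone living in the type's parameter rather than in each value.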
[GitHub] [beam] robertwb commented on pull request #11575: [BEAM-9699] Add test verifying we can use ZetaSQL in Python SqlTransform
robertwb commented on pull request #11575: URL: https://github.com/apache/beam/pull/11575#issuecomment-622533616 The change looks good to me, once all its prerequisites get in.
[GitHub] [beam] robertwb commented on a change in pull request #11452: [BEAM-9692] Move apply_Read to PTransformOverride
robertwb commented on a change in pull request #11452: URL: https://github.com/apache/beam/pull/11452#discussion_r418694513

## File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py

```
@@ -117,13 +117,15 @@ class DataflowRunner(PipelineRunner):
   # TODO: Remove the apache_beam.pipeline dependency in CreatePTransformOverride
   from apache_beam.runners.dataflow.ptransform_overrides import CombineValuesPTransformOverride
   from apache_beam.runners.dataflow.ptransform_overrides import CreatePTransformOverride
-  from apache_beam.runners.dataflow.ptransform_overrides import ReadPTransformOverride
   from apache_beam.runners.dataflow.ptransform_overrides import JrhReadPTransformOverride
+  from apache_beam.runners.dataflow.ptransform_overrides import ReadPTransformOverride
+  from apache_beam.runners.dataflow.ptransform_overrides import NativeReadPTransformOverride

-  # Thesse overrides should be applied before the proto representation of the
+  # These overrides should be applied before the proto representation of the
   # graph is created.
   _PTRANSFORM_OVERRIDES = [
-      CombineValuesPTransformOverride()
+      CombineValuesPTransformOverride(),
```

Review comment: Correct, this change does not change the existing behavior, but the concern is that the existing behavior might be sub-optimal.
[GitHub] [beam] ibzib commented on pull request #11403: [DO NOT MERGE] Run all PostCommit and PreCommit Tests against Release Branch
ibzib commented on pull request #11403: URL: https://github.com/apache/beam/pull/11403#issuecomment-622523122 Run Dataflow PortabilityApi ValidatesRunner
[GitHub] [beam] allenpradeep commented on pull request #11528: [BEAM-9821] Populate all SpannerIO batching parameters in display data.
allenpradeep commented on pull request #11528: URL: https://github.com/apache/beam/pull/11528#issuecomment-622521247 LGTM. Thanks.
[GitHub] [beam] ibzib commented on pull request #11551: Cherrypick to release-2.21.0
ibzib commented on pull request #11551: URL: https://github.com/apache/beam/pull/11551#issuecomment-622517286 @Hannah-Jiang I'm not sure what, if anything, really needs to be cherry-picked here. Java licenses were already cherry-picked in #11421, and the remainder of the changes are just usability improvements, if I understand correctly. As long as licenses are present in the release images, I think we should be fine. WDYT?
[GitHub] [beam] Ardagan commented on pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform
Ardagan commented on pull request #11582: URL: https://github.com/apache/beam/pull/11582#issuecomment-622514637 @rezarokni FYI
[GitHub] [beam] TheNeuralBit commented on pull request #11589: WIP: [BEAM-9623] Support for SQL TableProviders in Python SqlTransform
TheNeuralBit commented on pull request #11589: URL: https://github.com/apache/beam/pull/11589#issuecomment-622511334 Run XVR_Flink PostCommit
[GitHub] [beam] TheNeuralBit opened a new pull request #11589: WIP: [BEAM-9623] Support for SQL TableProviders in Python SqlTransform
TheNeuralBit opened a new pull request #11589: URL: https://github.com/apache/beam/pull/11589 Currently this just hard-codes support for DataCatalogTableProvider. We should instead make it possible to specify and configure HCatalog instances as well.
[GitHub] [beam] Ardagan commented on pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform
Ardagan commented on pull request #11582: URL: https://github.com/apache/beam/pull/11582#issuecomment-622510185

> This is looking good. Something I think we should do is change the underlying implementation of ReadFromBigQuery

This would be a bigger change than what I'd like to put in this PR. Can we do it in a separate PR?
[GitHub] [beam] Ardagan commented on a change in pull request #11477: [BEAM-9650] Add PeriodicSequence generator.
Ardagan commented on a change in pull request #11477: URL: https://github.com/apache/beam/pull/11477#discussion_r418673674 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java ## @@ -21,33 +21,69 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; -import java.util.List; +import java.util.Objects; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.schemas.JavaFieldSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; -import org.apache.beam.sdk.transforms.splittabledofn.Sizes; import org.apache.beam.sdk.transforms.splittabledofn.SplitResult; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; import org.joda.time.Duration; import org.joda.time.Instant; /** - * A {@link PTransform} which generates a sequence of timestamped elements at given interval in - * runtime. + * A {@link PTransform} which generates a sequence of timestamped elements at given runtime + * interval. * - * Receives a PCollection> where each element triggers the generation of sequence and - * has following elements: 0: first element timestamp 1: last element timestamp 2: interval + * Transform will not output elements prior to target time. Transform can output elements at any + * time after target time. * - * All elements that have timestamp in the past will be output right away. Elements that have - * timestamp in the future will be delayed. - * - * Transform will not output elements prior to target timestamp. Transform can output elements at - * any time after target timestamp. 
+ * Multiple elements can be output at given moment if their timestamp is earlier than current + * time. */ @Experimental(Experimental.Kind.SPLITTABLE_DO_FN) -public class PeriodicSequence extends PTransform>, PCollection> { +public class PeriodicSequence +extends PTransform, PCollection> { + + @DefaultSchema(JavaFieldSchema.class) + public static class SequenceDefinition { +public Instant first; +public Instant last; +public Long durationMilliSec; + +public SequenceDefinition() {} + +public SequenceDefinition(Instant first, Instant last, Duration duration) { + this.first = first; + this.last = last; + this.durationMilliSec = duration.getMillis(); +} + +@Override +public boolean equals(Object obj) { + if (this == obj) { +return true; + } + + if (obj == null || obj.getClass() != this.getClass()) { +return false; + } + + SequenceDefinition src = (SequenceDefinition) obj; + return src.first.equals(this.first) + && src.last.equals(this.last) + && src.durationMilliSec.equals(this.durationMilliSec); +} + +@Override +public int hashCode() { + int result = Objects.hash(first, last, durationMilliSec); + return result; +} Review comment: I tried to use it, but didn't manage to get AutoValue work. It failed to properly detect constructor for generated class. I tried to debug it, but it would be better to handle in separate PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
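The hand-written `equals`/`hashCode` pair in `SequenceDefinition` follows the standard value-object contract (equal objects must hash equally), which is the boilerplate AutoValue would normally generate. As a sketch of the same shape, a Python frozen dataclass derives both methods automatically; field names below are simplified stand-ins:

```python
# Sketch of the SequenceDefinition value-object pattern from the diff above.
# dataclass(frozen=True) derives __eq__ and __hash__ consistently, the role
# AutoValue would have played for the Java class.
from dataclasses import dataclass

@dataclass(frozen=True)
class SequenceDefinition:
    first_ms: int     # timestamp of the first element
    last_ms: int      # timestamp of the last element
    duration_ms: int  # interval between elements

a = SequenceDefinition(0, 10_000, 1_000)
b = SequenceDefinition(0, 10_000, 1_000)
print(a == b)              # True: field-wise equality
print(hash(a) == hash(b))  # True: equal objects hash equally
```

Keeping equality and hashing generated from the same field list is exactly what avoids the subtle contract violations that hand-rolled implementations risk.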
[GitHub] [beam] robertwb commented on a change in pull request #11039: [BEAM-9383] Staging Dataflow artifacts from environment
robertwb commented on a change in pull request #11039: URL: https://github.com/apache/beam/pull/11039#discussion_r418651280 ## File path: model/pipeline/src/main/proto/beam_runner_api.proto ## @@ -1271,6 +1271,11 @@ message DeferredArtifactPayload { message ArtifactStagingToRolePayload { // A generated staged name (relative path under staging directory). string staged_name = 1; + + // (Optional) An artifact name when a runner supports it. + // For example, DataflowRunner requires predefined names for some artifacts + // such as "dataflow-worker.jar", "windmill_main". + string alias_name = 2; Review comment: Why does this have to be distinct from staged_name? (Also, eventually we hope that designated roles can remove the need for magic names.) ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java ## @@ -210,56 +209,55 @@ public static Environment createProcessEnvironment( } } - private static List getArtifacts(List stagingFiles) { -Set pathsToStage = Sets.newHashSet(stagingFiles); + public static List getArtifacts( + List stagingFiles, StagingFileNameGenerator generator) { ImmutableList.Builder artifactsBuilder = ImmutableList.builder(); -for (String path : pathsToStage) { +for (String path : ImmutableSet.copyOf(stagingFiles)) { Review comment: Isn't order important to preserve? (Also, why do we need to make a copy?) 
## File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ## @@ -784,7 +877,25 @@ public DataflowPipelineJob run(Pipeline pipeline) { "Executing pipeline on the Dataflow Service, which will have billing implications " + "related to Google Compute Engine usage and other Google Cloud Services."); -List packages = options.getStager().stageDefaultFiles(); +// Capture the sdkComponents for look up during step translations +SdkComponents sdkComponents = SdkComponents.create(); + +DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); +String workerHarnessContainerImageURL = DataflowRunner.getContainerImageForJob(dataflowOptions); +RunnerApi.Environment defaultEnvironmentForDataflow = +Environments.createDockerEnvironment(workerHarnessContainerImageURL); + +sdkComponents.registerEnvironment( +defaultEnvironmentForDataflow +.toBuilder() +.addAllDependencies(getDefaultArtifacts()) Review comment: How does this get invoked for cross-language pipelines? ## File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java ## @@ -336,25 +323,26 @@ public DataflowPackage stageToFile( final AtomicInteger numCached = new AtomicInteger(0); List> destinationPackages = new ArrayList<>(); -for (String classpathElement : classpathElements) { - DataflowPackage sourcePackage = new DataflowPackage(); - if (classpathElement.contains("=")) { -String[] components = classpathElement.split("=", 2); -sourcePackage.setName(components[0]); -sourcePackage.setLocation(components[1]); - } else { -sourcePackage.setName(null); -sourcePackage.setLocation(classpathElement); +for (StagedFile classpathElement : classpathElements) { + DataflowPackage targetPackage = classpathElement.getStagedPackage(); + String source = classpathElement.getSource(); + if (source.contains("=")) { Review comment: Why do we have to handle this here and above? 
## File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ## @@ -772,6 +783,88 @@ private Debuggee registerDebuggee(CloudDebugger debuggerClient, String uniquifie } } + private List stageArtifacts(RunnerApi.Pipeline pipeline) { +ImmutableList.Builder filesToStageBuilder = ImmutableList.builder(); +for (Map.Entry entry : +pipeline.getComponents().getEnvironmentsMap().entrySet()) { + for (RunnerApi.ArtifactInformation info : entry.getValue().getDependenciesList()) { +if (!BeamUrns.getUrn(RunnerApi.StandardArtifacts.Types.FILE).equals(info.getTypeUrn())) { + throw new RuntimeException( + String.format("unsupported artifact type %s", info.getTypeUrn())); +} +RunnerApi.ArtifactFilePayload filePayload; +try { + filePayload = RunnerApi.ArtifactFilePayload.parseFrom(info.getTypePayload()); +} catch (InvalidProtocolBufferException e) { + throw new RuntimeException("Error parsing artifact file payload.", e); +} +if (!BeamUrns.getUrn(RunnerApi.StandardArtifacts.Roles.STAGING_TO) +.equals(info.getRoleUrn())) { + throw new RuntimeException( +
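The review question about `ImmutableSet.copyOf(stagingFiles)`, "Isn't order important to preserve?", comes down to the classic set-versus-ordered-dedup distinction. A Python sketch of the order-preserving idiom (file names are made up for illustration):

```python
# Sketch of the ordering concern raised above: a hash set deduplicates but
# need not keep first-seen order of staged artifacts, while an
# insertion-ordered dict does both.
staging_files = ["b.jar", "a.jar", "b.jar", "c.jar"]

# dict.fromkeys preserves the order in which keys first appear.
deduped_ordered = list(dict.fromkeys(staging_files))
print(deduped_ordered)  # ['b.jar', 'a.jar', 'c.jar']
```

If artifact staging order matters to the runner, the Java code would want the analogous `LinkedHashSet`-style copy rather than an arbitrary-order set.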
[GitHub] [beam] ibzib commented on pull request #11585: [BEAM-9860] Make job_endpoint required for PortableRunner
ibzib commented on pull request #11585: URL: https://github.com/apache/beam/pull/11585#issuecomment-622500284 Run Python 3.7 PostCommit
[GitHub] [beam] tvalentyn commented on pull request #11470: [BEAM-9791] Add precommit for dataflow runner v2
tvalentyn commented on pull request #11470: URL: https://github.com/apache/beam/pull/11470#issuecomment-622495831 nvm
[GitHub] [beam] tvalentyn commented on pull request #11470: [BEAM-9791] Add precommit for dataflow runner v2
tvalentyn commented on pull request #11470: URL: https://github.com/apache/beam/pull/11470#issuecomment-622494788 What is Dataflow runner V2? There are no details here or in the JIRA.
[GitHub] [beam] chamikaramj commented on pull request #11557: [BEAM-9845] Stage artifacts over expansion service.
chamikaramj commented on pull request #11557: URL: https://github.com/apache/beam/pull/11557#issuecomment-622489105 Run XVR_Spark PostCommit
[GitHub] [beam] mxm commented on pull request #11558: [BEAM-8742] Add stateful and timely processing benchmarks
mxm commented on pull request #11558: URL: https://github.com/apache/beam/pull/11558#issuecomment-622489303 Run Python Load Tests ParDo Flink Streaming
[GitHub] [beam] chamikaramj commented on pull request #11557: [BEAM-9845] Stage artifacts over expansion service.
chamikaramj commented on pull request #11557: URL: https://github.com/apache/beam/pull/11557#issuecomment-622489032 Run XVR_Flink PostCommit
[GitHub] [beam] mxm commented on pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback
mxm commented on pull request #11492: URL: https://github.com/apache/beam/pull/11492#issuecomment-622488737 Run Python2_PVR_Flink PreCommit
[GitHub] [beam] TheNeuralBit commented on a change in pull request #10384: [BEAM-8933] Utilities for converting Arrow schemas and reading Arrow batches as Rows
TheNeuralBit commented on a change in pull request #10384: URL: https://github.com/apache/beam/pull/10384#discussion_r418649632 ## File path: sdks/java/extensions/arrow/src/main/java/org/apache/beam/sdk/extensions/arrow/ArrowConversion.java ## @@ -0,0 +1,448 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.arrow; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import java.util.Iterator; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.TimeUnit; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.util.Text; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.schemas.CachingFactory; +import org.apache.beam.sdk.schemas.Factory; +import org.apache.beam.sdk.schemas.FieldValueGetter; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.Field; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes; +import org.apache.beam.sdk.values.Row; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; + +/** + * Utilities to create {@link Iterable}s of Beam {@link Row} instances backed by Arrow record + * batches. + */ +@Experimental(Experimental.Kind.SCHEMAS) +public class ArrowConversion { + /** Converts Arrow schema to Beam row schema. */ + public static Schema toBeamSchema(org.apache.arrow.vector.types.pojo.Schema schema) { +return toBeamSchema(schema.getFields()); + } + + public static Schema toBeamSchema(List fields) { +Schema.Builder builder = Schema.builder(); +for (org.apache.arrow.vector.types.pojo.Field field : fields) { + Field beamField = toBeamField(field); + builder.addField(beamField); +} +return builder.build(); + } + + /** Get Beam Field from Arrow Field. 
*/ + private static Field toBeamField(org.apache.arrow.vector.types.pojo.Field field) { +FieldType beamFieldType = toFieldType(field.getFieldType(), field.getChildren()); +return Field.of(field.getName(), beamFieldType); + } + + /** Converts Arrow FieldType to Beam FieldType. */ + private static FieldType toFieldType( + org.apache.arrow.vector.types.pojo.FieldType arrowFieldType, + List childrenFields) { +FieldType fieldType = +arrowFieldType +.getType() +.accept( +new ArrowType.ArrowTypeVisitor() { + @Override + public FieldType visit(ArrowType.Null type) { +throw new IllegalArgumentException( +"Type \'" + type.toString() + "\' not supported."); + } + + @Override + public FieldType visit(ArrowType.Struct type) { +return FieldType.row(toBeamSchema(childrenFields)); + } + + @Override + public FieldType visit(ArrowType.List type) { +checkArgument( +childrenFields.size() == 1, +"Encountered " ++ childrenFields.size() ++ " child fields for list type, expected 1"); +return FieldType.array(toBeamField(childrenFields.get(0)).getType()); + } + + @Override + public FieldType visit(ArrowType.FixedSizeList type) { +throw new IllegalArgumentException( +"Type \'" + type.toString() + "\' not supported."); + } + + @Override + public FieldType visit(ArrowType.Union type) { +throw new IllegalArgumentException( +"Type \'" + type.toString() + "\' not supported."); + } + + @Override + public FieldType visit(ArrowType.Map
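The `ArrowTypeVisitor` in the diff above maps each supported Arrow type to a Beam `FieldType` and raises on unsupported ones. The same dispatch shape can be sketched with a lookup table; the type names and mapping below are simplified stand-ins, not `ArrowConversion`'s real table:

```python
# Sketch of the Arrow-to-Beam type mapping pattern shown in the review diff:
# known types translate, unknown ones raise with the same style of message
# the Java visitor uses. Names are illustrative.
ARROW_TO_BEAM = {
    "int64": "INT64",
    "utf8": "STRING",
    "bool": "BOOLEAN",
}

def to_beam_field_type(arrow_type_name):
    try:
        return ARROW_TO_BEAM[arrow_type_name]
    except KeyError:
        raise ValueError(f"Type '{arrow_type_name}' not supported.")

print(to_beam_field_type("utf8"))  # STRING
```

Composite types (struct, list) recurse into child fields in the real code, which is why the visitor receives `childrenFields` alongside the Arrow type.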
[GitHub] [beam] mxm commented on pull request #11558: [BEAM-8742] Add stateful and timely processing benchmarks
mxm commented on pull request #11558: URL: https://github.com/apache/beam/pull/11558#issuecomment-622481501 Run Seed Job This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] mxm commented on pull request #11558: [BEAM-8742] Add stateful and timely processing benchmarks
mxm commented on pull request #11558: URL: https://github.com/apache/beam/pull/11558#issuecomment-622480730 Run Seed Job
[GitHub] [beam] tvalentyn commented on pull request #11444: [BEAM-9701] loosen fastavro version restriction for Python 3.8 support
tvalentyn commented on pull request #11444: URL: https://github.com/apache/beam/pull/11444#issuecomment-622469723 You would need to install beam from HEAD though, since those changes are not yet included in released versions.
[GitHub] [beam] TheNeuralBit commented on a change in pull request #11456: [BEAM-7554] Add MillisInstant logical type to replace DATETIME
TheNeuralBit commented on a change in pull request #11456: URL: https://github.com/apache/beam/pull/11456#discussion_r418628471 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java ## @@ -257,10 +257,8 @@ private Transform( // Combining over a single field, so extract just that field. combined = (combined == null) - ? byFields.aggregateFieldBaseValue( - inputs.get(0), combineFn, fieldAggregation.outputField) - : combined.aggregateFieldBaseValue( - inputs.get(0), combineFn, fieldAggregation.outputField); + ? byFields.aggregateField(inputs.get(0), combineFn, fieldAggregation.outputField) + : combined.aggregateField(inputs.get(0), combineFn, fieldAggregation.outputField); Review comment: What about adding the option for SqlTransform to convert unknown logical types to their base type at the input? I think that behavior would be effectively the same
[GitHub] [beam] TheNeuralBit commented on a change in pull request #11456: [BEAM-7554] Add MillisInstant logical type to replace DATETIME
TheNeuralBit commented on a change in pull request #11456: URL: https://github.com/apache/beam/pull/11456#discussion_r418625619 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java ## @@ -419,7 +420,6 @@ public int hashCode() { FLOAT, DOUBLE, STRING, // String. -DATETIME, // Date and time. Review comment: Yeah we could deprecate it for a release or two before removing. Would we also need to keep support for the primitive DATETIME in IOs and SQL? I'd need to think about how that transition would work.
[GitHub] [beam] tvalentyn commented on a change in pull request #11086: [BEAM-8910] Make custom BQ source read from Avro
tvalentyn commented on a change in pull request #11086: URL: https://github.com/apache/beam/pull/11086#discussion_r418609817 ## File path: sdks/python/apache_beam/io/gcp/bigquery.py ## @@ -45,8 +45,8 @@ may use some caching techniques to share the side inputs between calls in order to avoid excessive reading::: - main_table = pipeline | 'VeryBig' >> beam.io.Read(beam.io.BigQuerySource() - side_table = pipeline | 'NotBig' >> beam.io.Read(beam.io.BigQuerySource() + main_table = pipeline | 'VeryBig' >> beam.io.ReadFroBigQuery(...) Review comment: Typo in From ## File path: sdks/python/apache_beam/io/gcp/bigquery.py ## @@ -78,6 +72,12 @@ or a table. Pipeline construction will fail with a validation error if neither or both are specified. +When reading from BigQuery using `BigQuerySource`, bytes are returned as Review comment: Can we add some guidance when to use which?
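The doc snippet in this diff is truncated ("bytes are returned as ..."); assuming it refers to the well-known behavior where the JSON-based export path returns BYTES columns as base64 text while an Avro-based read yields raw bytes, the caller-side difference can be shown with the stdlib (this is not Beam code):

```python
import base64

# Illustrates the bytes-handling difference discussed above: with a
# JSON-based export, a BYTES column arrives as base64 text and the
# pipeline has to decode it; an Avro-based read hands back raw bytes.
raw = b"\x00\xffbeam"

as_json_export = base64.b64encode(raw).decode("ascii")  # what the JSON path yields
decoded = base64.b64decode(as_json_export)              # caller-side decode step

assert decoded == raw
print(as_json_export)
```

Pipelines written against the older source therefore often carry an explicit base64 decode that becomes unnecessary after switching read paths.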
[GitHub] [beam] tvalentyn commented on pull request #11086: [BEAM-8910] Make custom BQ source read from Avro
tvalentyn commented on pull request #11086: URL: https://github.com/apache/beam/pull/11086#issuecomment-622455437 Run Python 2 PostCommit
[GitHub] [beam] mxm commented on pull request #11557: [BEAM-9845] Stage artifacts over expansion service.
mxm commented on pull request #11557: URL: https://github.com/apache/beam/pull/11557#issuecomment-622450697 > Good point about needing to remove Kafka, etc. for Flink. (Not sure how that would interact with its use via an embedded environment; I'll let that be a later PR.) I think the embedded environment should have all the required dependencies in the classpath, similarly to how the expansion service has them at expansion time.
[GitHub] [beam] mxm commented on a change in pull request #11557: [BEAM-9845] Stage artifacts over expansion service.
mxm commented on a change in pull request #11557: URL: https://github.com/apache/beam/pull/11557#discussion_r418609854 ## File path: sdks/python/apache_beam/pipeline.py ## @@ -211,6 +211,8 @@ def __init__(self, runner=None, options=None, argv=None): experiments.append('beam_fn_api') self._options.view_as(DebugOptions).experiments = experiments +self.local_tempdir = tempfile.mkdtemp(prefix='beam-pipeline-temp') Review comment: That's fine then, wasn't sure whether this is a unique directory.
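The review question was whether `tempfile.mkdtemp(prefix='beam-pipeline-temp')` yields a unique directory; it does — each call creates a fresh directory on disk and returns its path, and the caller owns cleanup:

```python
import shutil
import tempfile

# mkdtemp creates the directory immediately and guarantees a unique name
# per call, which is why the pipeline-level temp dir in the diff is safe
# even when several pipelines run on the same machine.
a = tempfile.mkdtemp(prefix="beam-pipeline-temp")
b = tempfile.mkdtemp(prefix="beam-pipeline-temp")
assert a != b

# mkdtemp registers no automatic cleanup; the caller must remove the dirs.
shutil.rmtree(a)
shutil.rmtree(b)
```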
[GitHub] [beam] suztomo commented on pull request #11586: [BEAM-9444] GCP Library BOM as a Map
suztomo commented on pull request #11586: URL: https://github.com/apache/beam/pull/11586#issuecomment-622447857 @iemejia Thank you.
[GitHub] [beam] tvalentyn commented on pull request #11444: [BEAM-9701] loosen fastavro version restriction for Python 3.8 support
tvalentyn commented on pull request #11444: URL: https://github.com/apache/beam/pull/11444#issuecomment-622447887 @vishalaj1, yes, please try again.
[GitHub] [beam] robertwb commented on a change in pull request #11554: Website - Migrated Jekyll to Hugo
robertwb commented on a change in pull request #11554: URL: https://github.com/apache/beam/pull/11554#discussion_r418604535 ## File path: website/www/site/content/en/blog/beam-2.19.0.md ## @@ -25,7 +24,7 @@ limitations under the License. --> We are happy to present the new 2.19.0 release of Beam. This release includes both improvements and new functionality. -See the [download page]({{ site.baseurl }}/get-started/downloads/#2190-2020-02-04) for this release. Review comment: Won't this change cause issues with staging? ## File path: website/www/site/content/en/blog/beam-2.20.0.md ## @@ -43,7 +41,6 @@ Python SDK: . (#10223). * [BEAM-8841](https://issues.apache.org/jira/browse/BEAM-8841) Added ability to write to BigQuery via Avro file loads * [BEAM-9228](https://issues.apache.org/jira/browse/BEAM-9228) Direct runner for FnApi supports further parallelism * [BEAM-8550](https://issues.apache.org/jira/browse/BEAM-8550) Support for @RequiresTimeSortedInput in Flink and Spark -* [BEAM-6857](https://issues.apache.org/jira/browse/BEAM-6857) Added support for dynamic timers Review comment: Why was this removed?
[GitHub] [beam] mxm commented on pull request #11558: [BEAM-8742] Add stateful and timely processing benchmarks
mxm commented on pull request #11558: URL: https://github.com/apache/beam/pull/11558#issuecomment-622442663 Run Python Load Tests ParDo Flink Streaming
[GitHub] [beam] robertwb commented on pull request #11557: [BEAM-9845] Stage artifacts over expansion service.
robertwb commented on pull request #11557: URL: https://github.com/apache/beam/pull/11557#issuecomment-622441649 Run Java PreCommit
[GitHub] [beam] robertwb commented on pull request #11557: [BEAM-9845] Stage artifacts over expansion service.
robertwb commented on pull request #11557: URL: https://github.com/apache/beam/pull/11557#issuecomment-622441715 Run Python PreCommit
[GitHub] [beam] vishalaj1 commented on pull request #11444: [BEAM-9701] loosen fastavro version restriction for Python 3.8 support
vishalaj1 commented on pull request #11444: URL: https://github.com/apache/beam/pull/11444#issuecomment-622430319 Hi, I had this problem when I tried installing 3 days back: https://github.com/fastavro/fastavro/issues/427 Can you please let me know if this is fixed as well? Regards, Vishal
[GitHub] [beam] kennknowles commented on pull request #11588: [BEAM-9776] Fixes filesystem not found error
kennknowles commented on pull request #11588: URL: https://github.com/apache/beam/pull/11588#issuecomment-622427418 @apilloud any pitfalls here?
[GitHub] [beam] sjvanrossum opened a new pull request #11588: [BEAM-9776] Fixes filesystem not found error
sjvanrossum opened a new pull request #11588: URL: https://github.com/apache/beam/pull/11588 Registers standard filesystems. This fixes the linked JIRA issue, which occurs if a virtual filesystem table is queried before a local filesystem table is queried. This assumes that Beam SQL Shell is compiled as mentioned at https://beam.apache.org/documentation/dsls/sql/shell/. Without bundling Java IO the error will still occur.
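The failure mode being fixed — a lookup by URI scheme that throws unless a filesystem for that scheme was registered first — can be modeled with a tiny registry; the class and method names below are hypothetical, not Beam's `FileSystems` API:

```python
# Minimal model of the bug fixed above: resolving a table's filesystem by
# URI scheme fails if that scheme was never registered, so registration of
# the standard filesystems has to happen up front.
# Names are hypothetical, not Beam's FileSystems API.

class FileSystemRegistry:
    def __init__(self):
        self._schemes = {}

    def register(self, scheme, fs_name):
        self._schemes[scheme] = fs_name

    def lookup(self, uri):
        scheme = uri.split("://", 1)[0]
        if scheme not in self._schemes:
            raise LookupError("No filesystem registered for scheme: " + scheme)
        return self._schemes[scheme]


registry = FileSystemRegistry()
# Without these registration calls, querying a gs:// table before any
# file:// table would fail with the lookup error above.
registry.register("file", "LocalFileSystem")
registry.register("gs", "GcsFileSystem")
print(registry.lookup("gs://bucket/table.csv"))
```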
[GitHub] [beam] mxm commented on pull request #11558: [BEAM-8742] Add stateful and timely processing benchmarks
mxm commented on pull request #11558: URL: https://github.com/apache/beam/pull/11558#issuecomment-622401634 Run Python Load Tests ParDo Flink Streaming
[GitHub] [beam] mxm commented on pull request #11558: [BEAM-8742] Add stateful and timely processing benchmarks
mxm commented on pull request #11558: URL: https://github.com/apache/beam/pull/11558#issuecomment-622401502 I've created a new dashboard here that includes data from the batch tests and the two new streaming tests: https://apache-beam-testing.appspot.com/explore?dashboard=5751884853805056 ![Untitled](https://user-images.githubusercontent.com/837221/80810980-46bf9400-8bc5-11ea-99e4-bb042d23a74d.png) The streaming tests have data for the runtime and for the checkpoint duration (min/max/average).
[GitHub] [beam] suztomo commented on pull request #11586: [BEAM-9444] GCP Library BOM as a Map
suztomo commented on pull request #11586: URL: https://github.com/apache/beam/pull/11586#issuecomment-622398262 Java precommit check failed on org.apache.beam.runners.flink.FlinkSavepointTest.testSavepointRestorePortable ("test timed out after 60 seconds"). This seems like a transient failure.
[GitHub] [beam] iemejia commented on pull request #11586: [BEAM-9444] GCP Library BOM as a Map
iemejia commented on pull request #11586: URL: https://github.com/apache/beam/pull/11586#issuecomment-622362982 retest this please
[GitHub] [beam] iemejia commented on pull request #11586: [BEAM-9444] GCP Library BOM as a Map
iemejia commented on pull request #11586: URL: https://github.com/apache/beam/pull/11586#issuecomment-622362502 retest this please
[GitHub] [beam] nielm commented on pull request #11438: [BEAM-9505] Remove spurious error message in SpannerIO when streaming.
nielm commented on pull request #11438: URL: https://github.com/apache/beam/pull/11438#issuecomment-622348150 @chamikaramj can you merge please :)
[GitHub] [beam] mxm commented on a change in pull request #11578: [BEAM-8025] Increase the number of retrials en retrial delay in case of load in CassandraIOTest
mxm commented on a change in pull request #11578: URL: https://github.com/apache/beam/pull/11578#discussion_r418487133 ## File path: sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java ## @@ -153,18 +153,22 @@ public static void beforeClass() throws Exception { private static Cluster buildCluster(CassandraEmbeddedServerBuilder builder) { int tried = 0; Review comment: ```suggestion Exception exception = null; int tried = 0; ``` ## File path: sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java ## @@ -153,18 +153,22 @@ public static void beforeClass() throws Exception { private static Cluster buildCluster(CassandraEmbeddedServerBuilder builder) { int tried = 0; -while (tried < 3) { +int delay = 5000; +while (tried < 5) { try { return builder.buildNativeCluster(); } catch (NoHostAvailableException e) { Review comment: I couldn't find that this exception of type RuntimeException is thrown during cluster startup. We may want to catch all `Exception` here. 
## File path: sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java ## @@ -153,18 +153,22 @@ public static void beforeClass() throws Exception { private static Cluster buildCluster(CassandraEmbeddedServerBuilder builder) { int tried = 0; -while (tried < 3) { +int delay = 5000; +while (tried < 5) { try { return builder.buildNativeCluster(); } catch (NoHostAvailableException e) { tried++; Review comment: ```suggestion if (exception == null) { exception = e; } else { exception.addSuppressed(e); } tried++; ``` ## File path: sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java ## @@ -153,18 +153,22 @@ public static void beforeClass() throws Exception { private static Cluster buildCluster(CassandraEmbeddedServerBuilder builder) { int tried = 0; -while (tried < 3) { +int delay = 5000; +while (tried < 5) { try { return builder.buildNativeCluster(); } catch (NoHostAvailableException e) { tried++; try { - Thread.sleep(1000L); + Thread.sleep(delay); } catch (InterruptedException e1) { Review comment: ```suggestion } catch (InterruptedException e1) { Thread.currentThread().interrupt(); throw new RuntimeException("interrupted"); } ``` ## File path: sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java ## @@ -153,18 +153,22 @@ public static void beforeClass() throws Exception { private static Cluster buildCluster(CassandraEmbeddedServerBuilder builder) { int tried = 0; -while (tried < 3) { +int delay = 5000; +while (tried < 5) { try { return builder.buildNativeCluster(); } catch (NoHostAvailableException e) { tried++; try { - Thread.sleep(1000L); + Thread.sleep(delay); } catch (InterruptedException e1) { } } } -throw new RuntimeException("Unable to create embedded Cassandra cluster"); +throw new RuntimeException( +String.format( +"Unable to create embedded Cassandra cluster: tried %d times with %d delay", +tried, delay)); Review comment: ```suggestion throw new RuntimeException( String.format( "Unable to create embedded Cassandra cluster: tried %d times with %d delay", tried, delay), exception); ```
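Taken together, the review suggestions describe a standard retry pattern: keep the first failure, retry after a delay, restore the interrupt status, and report the attempt count with the original exception attached when giving up. A Python sketch under those assumptions — Python chains the cause via `raise ... from` rather than Java's `addSuppressed`:

```python
import time

def retry(build, attempts=5, delay=0.005):
    """Retry `build` up to `attempts` times, preserving the first failure."""
    first_error = None
    for tried in range(1, attempts + 1):
        try:
            return build()
        except Exception as e:  # mirrors catching broad Exception, as suggested
            if first_error is None:
                first_error = e
            if tried < attempts:
                time.sleep(delay)
    # Surface how hard we tried, chaining the original failure for context.
    raise RuntimeError(
        "Unable to create embedded Cassandra cluster: tried %d times "
        "with %s delay" % (attempts, delay)) from first_error


calls = {"n": 0}

def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("no host available yet")
    return "cluster"

print(retry(flaky))
```

Attaching the first exception to the final error is the key improvement over the original code, which silently discarded why every attempt failed.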
[GitHub] [beam] rahul8383 commented on pull request #11581: [BEAM-8307] NPE in Calcite dialect when input PCollection has logical…
rahul8383 commented on pull request #11581: URL: https://github.com/apache/beam/pull/11581#issuecomment-622308296 I agree that this is not a scalable solution. Providing a Calcite RelDataType mapping for every logical type defined by every IO (which is the solution presented in this PR) is not scalable. Another approach to solving the problem is: provide a Calcite RelDataType mapping depending on the base type defined in the logical type. But, as the CHAR and VARCHAR logical types both have base type STRING, we have to choose a default Calcite RelDataType mapping. The same applies to the BINARY and VARBINARY logical types. We also have to choose a default RelDataType mapping if the base type is DATETIME. By using this approach, I think we might be missing some built-in aggregation functions provided by Calcite for specific types. I was also thinking we could use the IDENTIFIER of the logical type to determine the corresponding Calcite RelDataType. But, as the IDENTIFIER type is String and not an enum, it cannot be used. For example, all the logical types defined by JdbcIO use the java.sql.JDBCType name as the IDENTIFIER. Please correct me if my understanding is incorrect.
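The "map by base type with defaults" alternative discussed in this comment boils down to a lookup table where CHAR/VARCHAR collapse to one STRING default and BINARY/VARBINARY to one BYTES default. A sketch of that tradeoff — the SQL type names are illustrative placeholders, not Calcite's RelDataType API:

```python
# Sketch of the "map by base type with defaults" alternative discussed
# above. Type names are illustrative, not Calcite's RelDataType API.
# Note the information loss: both CHAR and VARCHAR logical types map to
# the single STRING default, which is exactly the concern raised.

DEFAULT_SQL_TYPE_FOR_BASE = {
    "STRING": "VARCHAR",    # CHAR and VARCHAR logical types both land here
    "BYTES": "VARBINARY",   # BINARY and VARBINARY both land here
    "DATETIME": "TIMESTAMP",
    "INT64": "BIGINT",
}

def sql_type_for_logical(base_type):
    """Pick a default SQL type for a logical type, keyed only by base type."""
    try:
        return DEFAULT_SQL_TYPE_FOR_BASE[base_type]
    except KeyError:
        raise ValueError("No default SQL mapping for base type: " + base_type)

print(sql_type_for_logical("STRING"))
```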
[GitHub] [beam] robertwb commented on pull request #11557: [BEAM-9845] Stage artifacts over expansion service.
robertwb commented on pull request #11557: URL: https://github.com/apache/beam/pull/11557#issuecomment-622304615
[GitHub] [beam] henryken commented on a change in pull request #11564: [Beam-9679] Add Core Transforms section / Map lesson to the Go SDK katas
henryken commented on a change in pull request #11564: URL: https://github.com/apache/beam/pull/11564#discussion_r418443043 ## File path: learning/katas/go/Core Transforms/Map/ParDo/pkg/task/task.go ## @@ -18,8 +18,9 @@ package task import "github.com/apache/beam/sdks/go/pkg/beam" func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection { - processFn := func(element int) int { - return element * 10 - } - return beam.ParDo(s, processFn, input) + return beam.ParDo(s, multiplyBy10Fn, input) } + +func multiplyBy10Fn(element int) int { + return element * 10 Review comment: The "* 10" is not covered by the answer placeholder. Is it intentional? ![image](https://user-images.githubusercontent.com/5459430/80789287-24764800-8bbe-11ea-9594-d279d633e9e7.png) ## File path: learning/katas/go/Core Transforms/Map/lesson-info.yaml ## @@ -20,5 +20,4 @@ content: - ParDo - ParDo OneToMany -- MapElements -- FlatMapElements +- ParDo struct Review comment: Can maybe slightly rename to "ParDo Struct"? ## File path: learning/katas/go/Core Transforms/Map/ParDo struct/pkg/task/task.go ## @@ -18,10 +18,7 @@ package task import "github.com/apache/beam/sdks/go/pkg/beam" func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection { - processFn := { - Factor: 5, - } - return beam.ParDo(s, processFn, input) + return beam.ParDo(s, {Factor: 5}, input) Review comment: How about having the whole "{...}" be part of the answer placeholder? ![image](https://user-images.githubusercontent.com/5459430/80789407-8171fe00-8bbe-11ea-8ec6-e8ed8989ec2e.png) ## File path: learning/katas/go/Core Transforms/Map/ParDo struct/task.md ## @@ -16,10 +16,10 @@ specific language governing permissions and limitations under the License. --> -# Mapping Elements using structs +# Using a struct as a DoFn Review comment: Can we have the ParDo mentioned, e.g. "ParDo - using a struct as a DoFn"? 
## File path: learning/katas/go/Core Transforms/Map/ParDo OneToMany/pkg/task/task.go ## @@ -21,10 +21,10 @@ import ( ) func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection { - return beam.ParDo(s, processFn, input) + return beam.ParDo(s, tokenizeFn, input) } -func processFn(input string, emit func(out string)) { +func tokenizeFn(input string, emit func(out string)) { Review comment: The answer placeholder(s) seem weird here. Maybe it requires a fix? Or alternatively, we can use a similar answer placeholder as in the "ParDo" task, i.e. having just an empty "func ApplyTransform"? ![image](https://user-images.githubusercontent.com/5459430/80789567-e88fb280-8bbe-11ea-9185-b87c867f1bee.png)
[GitHub] [beam] reuvenlax commented on a change in pull request #11456: [BEAM-7554] Add MillisInstant logical type to replace DATETIME
reuvenlax commented on a change in pull request #11456: URL: https://github.com/apache/beam/pull/11456#discussion_r418443258 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java ## @@ -257,10 +257,8 @@ private Transform( // Combining over a single field, so extract just that field. combined = (combined == null) - ? byFields.aggregateFieldBaseValue( - inputs.get(0), combineFn, fieldAggregation.outputField) - : combined.aggregateFieldBaseValue( - inputs.get(0), combineFn, fieldAggregation.outputField); + ? byFields.aggregateField(inputs.get(0), combineFn, fieldAggregation.outputField) + : combined.aggregateField(inputs.get(0), combineFn, fieldAggregation.outputField); Review comment: I'm not sure I agree with this change. I think it's important that SQL work over user logical types by interpreting it as the base value. The user writing the SQL statement usually understands the base type of their logical type, and can write the SQL statement appropriately. This will break that. ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java ## @@ -416,36 +417,33 @@ private static Expression value( Expression value = Expressions.convert_( - Expressions.call( - expression, - "getBaseValue", - Expressions.constant(index), - Expressions.constant(convertTo)), - convertTo); + Expressions.call(expression, "getValue", Expressions.constant(index)), convertTo); Review comment: I have the same concern with this change. ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java ## @@ -419,7 +420,6 @@ public int hashCode() { FLOAT, DOUBLE, STRING, // String. -DATETIME, // Date and time. Review comment: I'm a little worried about this. Empirically many users are using schemas. Maybe we should start off by leaving DATETIME around and remove it later in another PR?
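The behavior being defended here — SQL aggregating over a logical type's base value so that statements written against the base type keep working even for unknown logical types — can be modeled with a small wrapper; `LogicalValue` and `base_value` are hypothetical names for illustration, not Beam's Row API:

```python
# Models the "aggregate over the base value" behavior debated above.
# LogicalValue and base_value are hypothetical names for illustration.

class LogicalValue:
    """A value of some user logical type, carrying its base representation."""
    def __init__(self, identifier, base_value):
        self.identifier = identifier  # e.g. a URN naming the logical type
        self.base_value = base_value  # the underlying base-type value

def sum_field(values):
    # Aggregating over base values is what lets SQL handle *unknown*
    # logical types: the combine function only ever sees the base
    # representation, never the logical wrapper.
    return sum(v.base_value if isinstance(v, LogicalValue) else v
               for v in values)

prices = [LogicalValue("urn:my:cents", 199), LogicalValue("urn:my:cents", 301)]
print(sum_field(prices))
```

The counter-argument in the thread is that unwrapping at the input (rather than inside every aggregation) would achieve the same effect in one place.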
[GitHub] [beam] rahul8383 commented on pull request #11581: [BEAM-8307] NPE in Calcite dialect when input PCollection has logical…
rahul8383 commented on pull request #11581: URL: https://github.com/apache/beam/pull/11581#issuecomment-622277282 @reuvenlax I have provided the source table schema and attached the NPE that I have faced in [BEAM-8307](https://issues.apache.org/jira/browse/BEAM-8307). The NPE is thrown because Calcite's RelDataType cannot be found for the JdbcIO logical type.
[GitHub] [beam] reuvenlax commented on pull request #11581: [BEAM-8307] NPE in Calcite dialect when input PCollection has logical…
reuvenlax commented on pull request #11581: URL: https://github.com/apache/beam/pull/11581#issuecomment-622273882 I'm wondering if this is a scalable solution. In general SQL is supposed to be able to handle unknown logical types, and simply treat them as the base type. If you're seeing an NPE, maybe we need to fix that. Where do you see the NPE?
[GitHub] [beam] Akshay-Iyangar commented on pull request #10078: [BEAM-8542] Change write to async in AWS SNS IO & remove retry logic
Akshay-Iyangar commented on pull request #10078: URL: https://github.com/apache/beam/pull/10078#issuecomment-622269940 @aromanenko-dev - addressed the feedback. Thanks!