[GitHub] [beam] pabloem commented on pull request #11560: Auto-inferring project for ReadFromBigQuery

2020-05-01 Thread GitBox


pabloem commented on pull request #11560:
URL: https://github.com/apache/beam/pull/11560#issuecomment-622670110


   Passing PostCommit: 
https://builds.apache.org/job/beam_PostCommit_Python37_PR/132/



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem commented on pull request #11560: Auto-inferring project for ReadFromBigQuery

2020-05-01 Thread GitBox


pabloem commented on pull request #11560:
URL: https://github.com/apache/beam/pull/11560#issuecomment-622669830


   Run Python2_PVR_Flink PreCommit




[GitHub] [beam] rahul8383 commented on pull request #11581: [BEAM-8307] NPE in Calcite dialect when input PCollection has logical…

2020-05-01 Thread GitBox


rahul8383 commented on pull request #11581:
URL: https://github.com/apache/beam/pull/11581#issuecomment-622661943


   I will make the necessary changes as suggested by @TheNeuralBit in 
https://lists.apache.org/thread.html/r281e2913379c9733f6ac5baa08f361cc4ebe880a9880b2d54d6095b0%40%3Cdev.beam.apache.org%3E




[GitHub] [beam] jaketf commented on a change in pull request #11596: [BEAM-9856] [*WIP DO NOT MERGE*] Optimization/hl7v2 io list messages

2020-05-01 Thread GitBox


jaketf commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r418843954



##
File path: 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IOTestUtil.java
##
@@ -59,7 +59,7 @@
   + "AL1|2|allergy|Z91.013^Personal history of allergy to sea food^ZAL|SEVERE|Swollen face|\r"
   + "AL1|3|allergy|Z91.040^Latex allergy^ZAL|MODERATE|Raised, itchy, red rash|",
   // Another ADT Message
-  "MSH|^~\\&|hl7Integration|hl7Integration|ADT^A08|||2.5|\r"
+  "MSH|^~\\&|hl7Integration|hl7Integration|||20190309132544||ADT^A08|||2.5|\r"

Review comment:
   This timestamp will always be present in valid HL7v2 messages.
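Why the position matters: in an MSH segment, MSH-1 is the field separator itself, so splitting the segment on `|` puts MSH-n at index n-1. In the corrected test message the timestamp lands in MSH-7 (date/time of message), the message type ADT^A08 in MSH-9, and the version 2.5 in MSH-12. The sketch below is a hypothetical helper (`MshTimestamp` is not part of HL7v2IO); it assumes the plain 14-digit `YYYYMMDDHHMMSS` form with no offset, treats it as UTC, and expects the trailing `\r` segment terminator to be stripped first.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.regex.Pattern;

/** Hypothetical helper for reading fields out of an HL7v2 MSH segment (illustration only). */
public class MshTimestamp {
  // Assumption: timestamp is YYYYMMDDHHMMSS with no offset or fractional seconds.
  private static final DateTimeFormatter HL7_TS = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

  /** Returns MSH-n for n >= 2; MSH-1 is the separator character itself. */
  static String mshField(String segment, int fieldNumber) {
    if (segment.length() < 4 || !segment.startsWith("MSH")) {
      throw new IllegalArgumentException("not an MSH segment: " + segment);
    }
    if (fieldNumber < 2) {
      throw new IllegalArgumentException("MSH-1 is the field separator; ask for n >= 2");
    }
    String sep = String.valueOf(segment.charAt(3));
    // A limit of -1 keeps empty fields, so field positions are preserved.
    String[] parts = segment.split(Pattern.quote(sep), -1);
    // parts[0] is "MSH" and parts[1] is MSH-2, so MSH-n is at parts[n - 1].
    return fieldNumber - 1 < parts.length ? parts[fieldNumber - 1] : "";
  }

  /** Parses MSH-7 (date/time of message), treating it as UTC by assumption. */
  static Instant messageTime(String segment) {
    return LocalDateTime.parse(mshField(segment, 7), HL7_TS).toInstant(ZoneOffset.UTC);
  }

  public static void main(String[] args) {
    String msh = "MSH|^~\\&|hl7Integration|hl7Integration|||20190309132544||ADT^A08|||2.5|";
    System.out.println(mshField(msh, 9) + " v" + mshField(msh, 12) + " sent at " + messageTime(msh));
  }
}
```

Against the old test string, ADT^A08 sits in MSH-5 and MSH-7 is empty, so `messageTime` would throw a `DateTimeParseException`.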






[GitHub] [beam] jaketf opened a new pull request #11596: [BEAM-9856] [*WIP DO NOT MERGE*] Optimization/hl7v2 io list messages

2020-05-01 Thread GitBox


jaketf opened a new pull request #11596:
URL: https://github.com/apache/beam/pull/11596


   This PR is an experiment to illustrate a potential strategy for implementing 
HL7v2IO.ListMessages as a splittable DoFn that carves the sendTime dimension up 
into time ranges and uses the Messages.List API.
   
   ## DISCLAIMER
   This PR is currently a discussion piece to help drive the decision about the future of HL7v2IO.ListMessages.
   
   It has excessive logging for demonstrating behavior / debugging and is 
currently broken.
   
   CC: @pabloem
   
   ## Questions
   - What are the prototypical tests for validating the stability of a splittable DoFn's various methods (e.g. SplitRestriction)?
   
   ## Current Status: Broken
   I currently get a confusing NPE from OffsetRangeTracker when running
   
   `./gradlew :sdks:java:io:google-cloud-platform:integrationTest --tests "org.apache.beam.sdk.io.gcp.healthcare.HL7v2IOReadIT.testHL7v2IO_ListHL7v2Messages_filtered"`
   
   I seem to get the expected initial daily splits:
   ```
   WARNING: splitting initial sendTime restriction of [minSendTime, now): [2019-03-09T13:24:44.000Z,2020-05-02T02:18:51.850Z), or [1552137884000, 1588385931850). 
   total days: 419 
   into 420 splits. 
   Last split: [1588339484000, 1588385931850)
   ```
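The initial daily split in that log can be sketched in plain Java. This is a hypothetical illustration, not the PR's code: `DailySplits` and its `Range` class are stand-ins for Beam's `OffsetRange`, and the restriction is assumed to be half-open epoch milliseconds. Run on the logged bounds, it yields 420 contiguous ranges whose last one is [1588339484000, 1588385931850), consistent with the log.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits a half-open [start, end) epoch-millis restriction into daily ranges. */
public class DailySplits {
  static final long MILLIS_PER_DAY = 24L * 60 * 60 * 1000;

  /** Stand-in for an offset range: half-open [from, to) in epoch milliseconds. */
  static final class Range {
    final long from;
    final long to;

    Range(long from, long to) {
      this.from = from;
      this.to = to;
    }

    @Override
    public String toString() {
      return "[" + from + ", " + to + ")";
    }
  }

  static List<Range> split(long startMillis, long endMillis) {
    if (endMillis <= startMillis) {
      throw new IllegalArgumentException("empty restriction");
    }
    List<Range> splits = new ArrayList<>();
    for (long from = startMillis; from < endMillis; from += MILLIS_PER_DAY) {
      // Each range is one day long except the last, which is clipped to endMillis,
      // so the ranges are contiguous and cover the restriction exactly once.
      splits.add(new Range(from, Math.min(from + MILLIS_PER_DAY, endMillis)));
    }
    return splits;
  }

  public static void main(String[] args) {
    // Bounds taken from the WARNING log line above.
    List<Range> splits = split(1552137884000L, 1588385931850L);
    System.out.println(splits.size() + " splits, last: " + splits.get(splits.size() - 1));
  }
}
```

The restriction spans about 419.5 days, so 419 whole days plus one partial final day gives the 420 splits reported.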
   
   NPE Error:
   ```
   java.lang.NullPointerException
   org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:348)
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:318)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:213)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
at org.apache.beam.sdk.io.gcp.healthcare.HL7v2IOReadIT.testHL7v2IO_ListHL7v2Messages_filtered(HL7v2IOReadIT.java:135)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:319)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
...
   ```

[GitHub] [beam] rahul8383 edited a comment on pull request #11581: [BEAM-8307] NPE in Calcite dialect when input PCollection has logical…

2020-05-01 Thread GitBox


rahul8383 edited a comment on pull request #11581:
URL: https://github.com/apache/beam/pull/11581#issuecomment-622652488


   JdbcIO.Read -> SqlTransform.query(SELECT COUNT(*) FROM PCOLLECTION /*Any query*/ ) throws an NPE if the input PCollection to SqlTransform has JdbcIO-specific logical types (defined in org.apache.beam.sdk.io.jdbc.LogicalTypes) in its Schema.
   
   Please find the Source Table Schema and the attached exception stack trace 
in the JIRA ticket: BEAM-8307.




[GitHub] [beam] rahul8383 commented on pull request #11581: [BEAM-8307] NPE in Calcite dialect when input PCollection has logical…

2020-05-01 Thread GitBox


rahul8383 commented on pull request #11581:
URL: https://github.com/apache/beam/pull/11581#issuecomment-622652488


   JdbcIO.Read -> SqlTransform.query(SELECT COUNT(*) FROM PCOLLECTION //Any query ) would throw an NPE if the input PCollection to SqlTransform has JdbcIO-specific logical types in its Schema.
   
   I have provided the Source Table Schema and attached the Exception stack 
trace in the JIRA ticket.




[GitHub] [beam] reuvenlax commented on pull request #11559: [BEAM-9836] Excluding spark runner for KeyTests

2020-05-01 Thread GitBox


reuvenlax commented on pull request #11559:
URL: https://github.com/apache/beam/pull/11559#issuecomment-622649935


   Run Java PreCommit




[GitHub] [beam] reuvenlax commented on pull request #11581: [BEAM-8307] NPE in Calcite dialect when input PCollection has logical…

2020-05-01 Thread GitBox


reuvenlax commented on pull request #11581:
URL: https://github.com/apache/beam/pull/11581#issuecomment-622649872


   Where do you see the NPE?




[GitHub] [beam] ibzib commented on a change in pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

2020-05-01 Thread GitBox


ibzib commented on a change in pull request #11492:
URL: https://github.com/apache/beam/pull/11492#discussion_r418813369



##
File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
##
@@ -247,25 +247,27 @@ public void reduce(Iterable> 
iterable, Collector transformAndTimerId, Timer timerValue) -> {
-FnDataReceiver fnTimerReceiver =
-bundle.getTimerReceivers().get(transformAndTimerId);
-Preconditions.checkNotNull(
-fnTimerReceiver, "No FnDataReceiver found for %s", transformAndTimerId);
-try {
-  fnTimerReceiver.accept(timerValue);
-} catch (Exception e) {
-  throw new RuntimeException(
-  String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue));
-}
-  },
-  currentTimerKey);
+while (timerInternals.hasPendingTimers()) {

Review comment:
   Made #11595 to update Spark.






[GitHub] [beam] ibzib commented on pull request #11595: [BEAM-9801] Fire timers set within timers in Spark.

2020-05-01 Thread GitBox


ibzib commented on pull request #11595:
URL: https://github.com/apache/beam/pull/11595#issuecomment-622648458


   Run Python Spark ValidatesRunner




[GitHub] [beam] ibzib opened a new pull request #11595: [BEAM-9801] Fire timers set within timers in Spark.

2020-05-01 Thread GitBox


ibzib opened a new pull request #11595:
URL: https://github.com/apache/beam/pull/11595


   I refactored a bit first: there were so many nested `for` and `try` blocks, plus unnecessary abstraction, that it hurt my eyes.
   
   We should probably consider fixing up the Spark Python ValidatesRunner suite and making it a precommit, so we catch issues like this in the future. I don't think it would be too expensive.
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   

   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)

[GitHub] [beam] ananvay commented on pull request #11593: [BEAM-9870] Always generate Dataflow-compatible FnApi protos.

2020-05-01 Thread GitBox


ananvay commented on pull request #11593:
URL: https://github.com/apache/beam/pull/11593#issuecomment-622631226


   Thanks Robert, LGTM.




[GitHub] [beam] ibzib commented on a change in pull request #11593: [BEAM-9870] Always generate Dataflow-compatible FnApi protos.

2020-05-01 Thread GitBox


ibzib commented on a change in pull request #11593:
URL: https://github.com/apache/beam/pull/11593#discussion_r418781167



##
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
##
@@ -345,7 +345,7 @@ def visit_transform(self, transform_node):
   for ix, side_input in enumerate(transform_node.side_inputs):
 access_pattern = side_input._side_input_data().access_pattern
 if access_pattern == common_urns.side_inputs.ITERABLE.urn:
-  if use_unified_worker:
+  if use_unified_worker or not use_fn_api:

Review comment:
   So `side_input.pvalue.element_type = typehints.Any` is a side effect and 
`new_side_input = _DataflowIterableSideInput(side_input)` has no effect?






[GitHub] [beam] pabloem commented on pull request #11560: Auto-inferring project for ReadFromBigQuery

2020-05-01 Thread GitBox


pabloem commented on pull request #11560:
URL: https://github.com/apache/beam/pull/11560#issuecomment-622611048


   Run Python 3.7 PostCommit




[GitHub] [beam] lukecwik commented on a change in pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

2020-05-01 Thread GitBox


lukecwik commented on a change in pull request #11492:
URL: https://github.com/apache/beam/pull/11492#discussion_r418778828



##
File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
##
@@ -247,25 +247,27 @@ public void reduce(Iterable> 
iterable, Collector transformAndTimerId, Timer timerValue) -> {
-FnDataReceiver fnTimerReceiver =
-bundle.getTimerReceivers().get(transformAndTimerId);
-Preconditions.checkNotNull(
-fnTimerReceiver, "No FnDataReceiver found for %s", transformAndTimerId);
-try {
-  fnTimerReceiver.accept(timerValue);
-} catch (Exception e) {
-  throw new RuntimeException(
-  String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue));
-}
-  },
-  currentTimerKey);
+while (timerInternals.hasPendingTimers()) {

Review comment:
   Also, I made this same comment earlier today and it was lost somehow.






[GitHub] [beam] lukecwik commented on a change in pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

2020-05-01 Thread GitBox


lukecwik commented on a change in pull request #11492:
URL: https://github.com/apache/beam/pull/11492#discussion_r418778740



##
File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
##
@@ -247,25 +247,27 @@ public void reduce(Iterable> 
iterable, Collector transformAndTimerId, Timer timerValue) -> {
-FnDataReceiver fnTimerReceiver =
-bundle.getTimerReceivers().get(transformAndTimerId);
-Preconditions.checkNotNull(
-fnTimerReceiver, "No FnDataReceiver found for %s", transformAndTimerId);
-try {
-  fnTimerReceiver.accept(timerValue);
-} catch (Exception e) {
-  throw new RuntimeException(
-  String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue));
-}
-  },
-  currentTimerKey);
+while (timerInternals.hasPendingTimers()) {

Review comment:
   It makes sense to do this in batch as well, since processing-time timers should get dropped and any newly scheduled event-time timers should be fired continually.
   
   This would require updating FlinkExecutableStageFunction and SparkExecutableStageFunction.
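A simplified, hypothetical model of the batch behavior described in this comment: pending timers sit in a priority queue ordered by timestamp, and the loop keeps polling while any timer is pending, dropping processing-time timers and firing event-time timers, including ones that a callback sets while it is firing. `TimerDrain` and its nested types are illustrative only and do not correspond to Beam's `TimerInternals` API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.BiConsumer;

/** Hypothetical, simplified model of draining timers at the end of a batch bundle. */
public class TimerDrain {
  enum Domain { EVENT_TIME, PROCESSING_TIME }

  static final class Timer {
    final String id;
    final long timestamp;
    final Domain domain;

    Timer(String id, long timestamp, Domain domain) {
      this.id = id;
      this.timestamp = timestamp;
      this.domain = domain;
    }
  }

  // Earliest timestamp first, mirroring how timers fire in timestamp order.
  private final PriorityQueue<Timer> pending =
      new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));

  void setTimer(Timer timer) {
    pending.add(timer);
  }

  boolean hasPendingTimers() {
    return !pending.isEmpty();
  }

  /** Fires event-time timers until none remain; the callback may set new timers. */
  List<String> drain(BiConsumer<Timer, TimerDrain> onTimer) {
    List<String> fired = new ArrayList<>();
    while (hasPendingTimers()) {
      Timer timer = pending.poll();
      if (timer.domain == Domain.PROCESSING_TIME) {
        continue; // In batch, processing-time timers are dropped rather than fired.
      }
      fired.add(timer.id);
      onTimer.accept(timer, this); // May enqueue follow-up timers, which this loop will see.
    }
    return fired;
  }

  public static void main(String[] args) {
    TimerDrain timers = new TimerDrain();
    timers.setTimer(new Timer("a", 10, Domain.EVENT_TIME));
    timers.setTimer(new Timer("p", 5, Domain.PROCESSING_TIME));
    List<String> fired = timers.drain((t, d) -> {
      // Timer "a" schedules a follow-up event-time timer from inside its own callback.
      if (t.id.equals("a")) {
        d.setTimer(new Timer("b", t.timestamp + 1, Domain.EVENT_TIME));
      }
    });
    System.out.println(fired);
  }
}
```

A single pass over a snapshot of the timers present before firing would miss "b"; re-checking `hasPendingTimers()` on every iteration is what picks it up.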






[GitHub] [beam] rohdesamuel opened a new pull request #11594: [BEAM-9692] Replace apply_WriteToBigQuery with PTransformOverride

2020-05-01 Thread GitBox


rohdesamuel opened a new pull request #11594:
URL: https://github.com/apache/beam/pull/11594


   Replace apply_WriteToBigQuery with PTransformOverride
   
   
   

[GitHub] [beam] robertwb commented on a change in pull request #11593: [BEAM-9870] Always generate Dataflow-compatible FnApi protos.

2020-05-01 Thread GitBox


robertwb commented on a change in pull request #11593:
URL: https://github.com/apache/beam/pull/11593#discussion_r418777832



##
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
##
@@ -345,7 +345,7 @@ def visit_transform(self, transform_node):
   for ix, side_input in enumerate(transform_node.side_inputs):
 access_pattern = side_input._side_input_data().access_pattern
 if access_pattern == common_urns.side_inputs.ITERABLE.urn:
-  if use_unified_worker:
+  if use_unified_worker or not use_fn_api:

Review comment:
   The coder gets updated.






[GitHub] [beam] apilloud commented on a change in pull request #11272: [BEAM-9641] Support ZetaSQL DATE type as a Beam LogicalType

2020-05-01 Thread GitBox


apilloud commented on a change in pull request #11272:
URL: https://github.com/apache/beam/pull/11272#discussion_r418773897



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Date.java
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.logicaltypes;
+
+import java.time.LocalDate;
+import org.apache.beam.sdk.schemas.Schema;
+
+/**
+ * A date without a time-zone.
+ *
+ * It cannot represent an instant on the time-line without additional information such as an
+ * offset or time-zone.
+ */
+public class Date implements Schema.LogicalType {

Review comment:
   We should consider not using a JVM; it adds performance overhead too.
   
   I'm reasonably convinced the wire format is good and the conversion here is lossless, so if there isn't an easy drop-in replacement, leave this as is.
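To make the "lossless" point concrete, here is a minimal sketch that assumes the logical type's base representation is an INT64 count of days since 1970-01-01; the `Date` class body is not shown in this excerpt, so treat that representation as an assumption. `DateConversion` is a hypothetical stand-in that round-trips `java.time.LocalDate` through `toEpochDay` and `ofEpochDay`.

```java
import java.time.LocalDate;

/** Hypothetical stand-in for the conversion a DATE logical type might perform. */
public class DateConversion {
  // Assumed base representation: days since 1970-01-01 as a long.
  static long toBaseType(LocalDate date) {
    return date.toEpochDay();
  }

  static LocalDate toInputType(long epochDay) {
    return LocalDate.ofEpochDay(epochDay);
  }

  public static void main(String[] args) {
    LocalDate[] samples = {
      LocalDate.of(1970, 1, 1), LocalDate.of(2020, 5, 1),
      LocalDate.of(1, 1, 1), LocalDate.of(9999, 12, 31)
    };
    for (LocalDate d : samples) {
      long base = toBaseType(d);
      // Lossless means converting back yields an equal LocalDate.
      if (!toInputType(base).equals(d)) {
        throw new AssertionError("lossy for " + d);
      }
      System.out.println(d + " -> " + base);
    }
  }
}
```

Because a `LocalDate` carries no time-of-day or zone, a whole-day count captures all of its information, which is why the round trip holds.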






[GitHub] [beam] robertwb commented on pull request #11593: [BEAM-9870] Always generate Dataflow-compatible FnApi protos.

2020-05-01 Thread GitBox


robertwb commented on pull request #11593:
URL: https://github.com/apache/beam/pull/11593#issuecomment-622596915


   R: @ibzib @ananvay 
   CC: @lukecwik 




[GitHub] [beam] robertwb opened a new pull request #11593: [BEAM-9870] Always generate Dataflow-compatible FnApi protos.

2020-05-01 Thread GitBox


robertwb opened a new pull request #11593:
URL: https://github.com/apache/beam/pull/11593


   For various reasons, Dataflow patches up the proto representations of side 
inputs.
   This change ensures these mutations are reflected regardless of whether 
FnAPI was
   explicitly requested.
   
   After this change the Beam protos produced are identical (though the v1beta3
   representation may change significantly).
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   

[GitHub] [beam] Hannah-Jiang commented on a change in pull request #11584: [BEAM-9136]support isRelease tag for docker build command & update release guide

2020-05-01 Thread GitBox


Hannah-Jiang commented on a change in pull request #11584:
URL: https://github.com/apache/beam/pull/11584#discussion_r418766264



##
File path: website/src/contribute/release-guide.md
##
@@ -688,8 +688,20 @@ Verify that files are [present](https://dist.apache.org/repos/dist/dev/beam).
 * Build Python images and push to DockerHub.
 
 ```
-./gradlew :sdks:python:container:buildAll -Pdocker-tag=${RELEASE}_rc{RC_NUM}
+./gradlew :sdks:python:container:buildAll -Pdocker-pull-licenses -Pdocker-tag=${RELEASE}_rc{RC_NUM}
+```
+
+Verify that third party licenses are included by logging in to the images. For Python SDK images, there should be around 80 ~ 100 dependencies.
+Please note that dependencies for the SDKs with different Python versions vary.
+Need to verify all Python images by replacing `${ver}` in the following command to `python2.7, python3.5, python3.6, python3.7`.

Review comment:
   @ibzib, this part should be verified BEFORE pushing the Python images. I'm not sure when this PR can be merged because of website issues, so I'm explicitly pinging you here.

##
File path: website/src/contribute/release-guide.md
##
@@ -699,7 +711,18 @@ done
 * Build Java images and push to DockerHub.
 
 ```
-./gradlew :sdks:java:container:dockerPush -Pdocker-pull-licenses -Pdocker-tag=${RELEASE}_rc{RC_NUM}
+./gradlew :sdks:java:container:docker -Pdocker-pull-licenses -Pdocker-tag=${RELEASE}_rc{RC_NUM}
+```
+
+Verify that third party licenses are included by logging in to the images. For Java SDK images, there should be around 1400 dependencies.
+```
+docker run -it --entrypoint=/bin/bash apache/beam_java_sdk:${RELEASE}_rc{RC_NUM}
+ls -al /opt/apache/beam/third_party_licenses/ | wc -l
+```
+
+After verifying the third party licenses are included correctly, push the images to DockerHub.
+```

Review comment:
   @ibzib, this part changed slightly. Please note the change from `dockerPush` to `docker` at L714.
   After creating the Java image, we should verify it BEFORE pushing to DockerHub. I'm not sure when this PR can be merged because of website issues, so I'm explicitly pinging you here.
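When reading the `ls -al ... | wc -l` result, note that `ls -al` also prints a `total` line plus the `.` and `..` entries, so the count is three higher than the number of entries in the directory. Below is a minimal Java sketch of an exact count. It assumes a JVM is available wherever the directory is visible. `LicenseCountSketch` is a hypothetical name, and the default path is the one from the release-guide diff above.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

/** Hypothetical sketch: count license entries without the extra lines `ls -al` prints. */
public class LicenseCountSketch {

  static long countEntries(Path dir) throws IOException {
    // Files.list returns only the directory's children; "." and ".." are never included.
    try (Stream<Path> entries = Files.list(dir)) {
      return entries.count();
    }
  }

  public static void main(String[] args) throws IOException {
    // Default path taken from the release-guide diff; pass another path as the first argument.
    Path dir = Paths.get(args.length > 0 ? args[0] : "/opt/apache/beam/third_party_licenses");
    System.out.println(countEntries(dir) + " entries in " + dir);
  }
}
```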









[GitHub] [beam] TheNeuralBit commented on a change in pull request #11272: [BEAM-9641] Support ZetaSQL DATE type as a Beam LogicalType

2020-05-01 Thread GitBox


TheNeuralBit commented on a change in pull request #11272:
URL: https://github.com/apache/beam/pull/11272#discussion_r418765847



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Date.java
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.logicaltypes;
+
+import java.time.LocalDate;
+import org.apache.beam.sdk.schemas.Schema;
+
+/**
+ * A date without a time-zone.
+ *
+ * It cannot represent an instant on the time-line without additional information such as an
+ * offset or time-zone.
+ */
+public class Date implements Schema.LogicalType {

Review comment:
   I guess `java.sql.Date` is another option for a Java type backed by millis.
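If `java.sql.Date` were used, `Date.valueOf(LocalDate)` and `Date#toLocalDate()` convert it to and from `java.time.LocalDate`. The minimal sketch below uses a hypothetical class name. The caveat it illustrates is that the underlying millisecond value is local midnight in the JVM's default time zone. The same date therefore yields different `getTime()` values on machines in different zones, even though a round trip within one JVM returns the original date.

```java
import java.sql.Date;
import java.time.LocalDate;
import java.util.TimeZone;

/** Hypothetical sketch: converting between LocalDate and the millisecond-backed java.sql.Date. */
public class SqlDateSketch {

  static Date toSqlDate(LocalDate date) {
    // Date.valueOf builds a Date at local midnight in the JVM's default time zone.
    return Date.valueOf(date);
  }

  static LocalDate toLocalDate(Date sqlDate) {
    // Reads year/month/day back using the same default time zone.
    return sqlDate.toLocalDate();
  }

  public static void main(String[] args) {
    LocalDate original = LocalDate.of(2020, 5, 1);
    Date sqlDate = toSqlDate(original);
    // getTime() differs between machines in different zones, even for the same date.
    System.out.println("millis=" + sqlDate.getTime() + " in zone " + TimeZone.getDefault().getID());
    System.out.println("round trip equal: " + original.equals(toLocalDate(sqlDate)));
  }
}
```

A millisecond-backed wire format would therefore need a fixed zone convention, such as UTC midnight, to be stable across workers.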









[GitHub] [beam] Hannah-Jiang commented on pull request #11551: Cherrypick to release-2.21.0

2020-05-01 Thread GitBox


Hannah-Jiang commented on pull request #11551:
URL: https://github.com/apache/beam/pull/11551#issuecomment-622595455


   > @Hannah-Jiang I'm not sure what, if anything really needs to be 
cherry-picked here. Java licenses were already cherry-picked in #11421, and the 
remainder of the changes are just usability improvements if I understand 
correctly. As long as licenses are present in the release images, I think we 
should be fine. WDYT?
   
   Yes, this is a great idea. That PR works well, and this cherry-pick is mainly an improvement. I tried creating images from the release branch, and they look good. Please feel free to skip this PR.







[GitHub] [beam] TheNeuralBit commented on a change in pull request #11272: [BEAM-9641] Support ZetaSQL DATE type as a Beam LogicalType

2020-05-01 Thread GitBox


TheNeuralBit commented on a change in pull request #11272:
URL: https://github.com/apache/beam/pull/11272#discussion_r418764453



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Date.java
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.logicaltypes;
+
+import java.time.LocalDate;
+import org.apache.beam.sdk.schemas.Schema;
+
+/**
+ * A date without a time-zone.
+ *
+ * It cannot represent an instant on the time-line without additional information such as an
+ * offset or time-zone.
+ */
+public class Date implements Schema.LogicalType {

Review comment:
   That is unfortunate... but what in-memory type should we use instead? joda.time.LocalDate uses a millisecond long; do we want to add another Joda dependency?
   
   We could access the base type (wire format type) directly in SQL with [Row#getBaseValue](https://github.com/apache/beam/blob/7b890654e6bbfcab79a2f5677f1badd54bd444aa/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java#L424), but unfortunately Rows store logical types as the input type (in-memory format type), so that wouldn't actually avoid a conversion.
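With a millisecond-backed in-memory type, such as the Joda option above, each conversion to an epoch-day count is a division. An example is the `Expressions.divide(millisField, Expressions.constant(MILLIS_PER_DAY))` code removed from `BeamCalcRel` in this same PR. The minimal sketch below assumes millis are measured from 1970-01-01T00:00Z, and the class name is hypothetical. Java's `/` truncates toward zero, so it disagrees with the calendar for negative timestamps that are not exact multiples of a day. `Math.floorDiv` does not have this problem.

```java
/** Hypothetical sketch: deriving an epoch-day count from a millisecond timestamp. */
public class MillisToEpochDaySketch {
  static final long MILLIS_PER_DAY = 24L * 60 * 60 * 1000;

  /** Plain division truncates toward zero, so negative non-midnight values land one day late. */
  static long truncatingDays(long millis) {
    return millis / MILLIS_PER_DAY;
  }

  /** Floor division rounds toward negative infinity, matching the calendar day. */
  static long flooringDays(long millis) {
    return Math.floorDiv(millis, MILLIS_PER_DAY);
  }

  public static void main(String[] args) {
    long noonDec31_1969 = -12L * 60 * 60 * 1000; // 1969-12-31T12:00:00Z
    System.out.println("truncating: " + truncatingDays(noonDec31_1969)); // 0, i.e. 1970-01-01
    System.out.println("flooring:   " + flooringDays(noonDec31_1969)); // -1, i.e. 1969-12-31
  }
}
```

For values that are exactly UTC midnight, the two methods agree. The difference only shows up when a zone offset or time-of-day component leaks into the millis.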









[GitHub] [beam] boyuanzz commented on pull request #11592: [DO NOT REVIEW] Test only

2020-05-01 Thread GitBox


boyuanzz commented on pull request #11592:
URL: https://github.com/apache/beam/pull/11592#issuecomment-622592085


   Run Dataflow PortabilityApi ValidatesRunner







[GitHub] [beam] ibzib commented on a change in pull request #11591: [BEAM-9801] [cherry-pick] Pass in fire timestamp to timer callback

2020-05-01 Thread GitBox


ibzib commented on a change in pull request #11591:
URL: https://github.com/apache/beam/pull/11591#discussion_r418762563



##
File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
##
@@ -247,25 +247,27 @@ public void reduce(Iterable> 
iterable, Collector transformAndTimerId, Timer timerValue) -> {
-FnDataReceiver fnTimerReceiver =
-bundle.getTimerReceivers().get(transformAndTimerId);
-Preconditions.checkNotNull(
-fnTimerReceiver, "No FnDataReceiver found for %s", transformAndTimerId);
-try {
-  fnTimerReceiver.accept(timerValue);
-} catch (Exception e) {
-  throw new RuntimeException(
-  String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue));
-}
-  },
-  currentTimerKey);
+while (timerInternals.hasPendingTimers()) {

Review comment:
   As expected, Spark is failing `test_pardo_timers`









[GitHub] [beam] ibzib commented on a change in pull request #11591: [BEAM-9801] [cherry-pick] Pass in fire timestamp to timer callback

2020-05-01 Thread GitBox


ibzib commented on a change in pull request #11591:
URL: https://github.com/apache/beam/pull/11591#discussion_r418761275



##
File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
##
@@ -247,25 +247,27 @@ public void reduce(Iterable> 
iterable, Collector transformAndTimerId, Timer timerValue) -> {
-FnDataReceiver fnTimerReceiver =
-bundle.getTimerReceivers().get(transformAndTimerId);
-Preconditions.checkNotNull(
-fnTimerReceiver, "No FnDataReceiver found for %s", transformAndTimerId);
-try {
-  fnTimerReceiver.accept(timerValue);
-} catch (Exception e) {
-  throw new RuntimeException(
-  String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue));
-}
-  },
-  currentTimerKey);
+while (timerInternals.hasPendingTimers()) {

Review comment:
   Good question. Spark basically does the same thing here (ideally this code would be shared). @mxm









[GitHub] [beam] ibzib commented on pull request #11591: [BEAM-9801] [cherry-pick] Pass in fire timestamp to timer callback

2020-05-01 Thread GitBox


ibzib commented on pull request #11591:
URL: https://github.com/apache/beam/pull/11591#issuecomment-622589471


   Run Python Spark ValidatesRunner







[GitHub] [beam] ibzib commented on pull request #11591: [BEAM-9801] [cherry-pick] Pass in fire timestamp to timer callback

2020-05-01 Thread GitBox


ibzib commented on pull request #11591:
URL: https://github.com/apache/beam/pull/11591#issuecomment-622589337


   Run Java Spark PortableValidatesRunner Batch



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TheNeuralBit commented on a change in pull request #11272: [BEAM-9641] Support ZetaSQL DATE type as a Beam LogicalType

2020-05-01 Thread GitBox


TheNeuralBit commented on a change in pull request #11272:
URL: https://github.com/apache/beam/pull/11272#discussion_r418759689



##
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
##
@@ -427,17 +430,12 @@ private static Expression value(
 
 private static Expression value(Expression value, Schema.FieldType type) {
   if (type.getTypeName().isLogicalType()) {
-Expression millisField = Expressions.call(value, "getMillis");
 String logicalId = type.getLogicalType().getIdentifier();
 if (logicalId.equals(TimeType.IDENTIFIER)) {
-  return nullOr(value, Expressions.convert_(millisField, int.class));
-} else if (logicalId.equals(DateType.IDENTIFIER)) {
-  value =
-  nullOr(
-  value,
-  Expressions.convert_(
-  Expressions.divide(millisField, Expressions.constant(MILLIS_PER_DAY)),
-  int.class));
+  return nullOr(
+  value, Expressions.convert_(Expressions.call(value, "getMillis"), int.class));
+} else if (logicalId.equals(SqlTypes.DATE.getIdentifier())) {

Review comment:
   What about using a switch statement? Is there any style guidance on using switch on a String in Java?
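Java has supported `switch` on `String` since Java 7, and the comparison uses `equals`. The minimal sketch below uses hypothetical identifiers. One practical constraint for this diff is that case labels must be compile-time constants. A method call such as `SqlTypes.DATE.getIdentifier()` therefore could not be a label directly, although an identifier exposed as a `static final String` initialized from a literal could be. The Google Java Style Guide also asks for a `default` group in every switch.

```java
/** Hypothetical sketch: dispatching on a logical type identifier with a String switch. */
public class LogicalTypeSwitchSketch {
  // Hypothetical identifiers. Case labels must be compile-time constants, which
  // static final String fields initialized from literals are; a method call
  // such as someType.getIdentifier() is not, so it cannot appear after `case`.
  static final String TIME_ID = "example:logical_type:time";
  static final String DATE_ID = "example:logical_type:date";

  static String describe(String logicalId) {
    if (logicalId == null) {
      // Switching on a null String throws NullPointerException, so guard first.
      return "missing identifier";
    }
    switch (logicalId) {
      case TIME_ID:
        return "time";
      case DATE_ID:
        return "date";
      default:
        return "unsupported: " + logicalId;
    }
  }

  public static void main(String[] args) {
    System.out.println(describe(DATE_ID));
    System.out.println(describe("example:logical_type:other"));
  }
}
```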









[GitHub] [beam] lukecwik commented on a change in pull request #11591: [BEAM-9801] [cherry-pick] Pass in fire timestamp to timer callback

2020-05-01 Thread GitBox


lukecwik commented on a change in pull request #11591:
URL: https://github.com/apache/beam/pull/11591#discussion_r418756561



##
File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
##
@@ -247,25 +247,27 @@ public void reduce(Iterable> 
iterable, Collector transformAndTimerId, Timer timerValue) -> {
-FnDataReceiver fnTimerReceiver =
-bundle.getTimerReceivers().get(transformAndTimerId);
-Preconditions.checkNotNull(
-fnTimerReceiver, "No FnDataReceiver found for %s", transformAndTimerId);
-try {
-  fnTimerReceiver.accept(timerValue);
-} catch (Exception e) {
-  throw new RuntimeException(
-  String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue));
-}
-  },
-  currentTimerKey);
+while (timerInternals.hasPendingTimers()) {

Review comment:
   Doesn't it make sense to make this change in batch as well for Spark/Flink, in FlinkExecutableStageFunction and SparkExecutableStageFunction?
   
   Any watermark-based timers should remain eligible and continue to fire in batch, while processing-time timers should be dropped.
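The sketch below illustrates the behavior described in the comment. It is minimal and runner-agnostic, and it uses hypothetical classes rather than Beam's `TimerInternals` or the Flink/Spark stage functions. Processing-time timers are dropped in batch. Event-time timers are fired in a `while` loop until none remain, because firing one timer may set another.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical sketch: draining timers at the end of a bounded (batch) input. */
public class BatchTimerDrainSketch {
  enum Domain { EVENT_TIME, PROCESSING_TIME }

  static final class Timer {
    final String id;
    final long timestamp;
    final Domain domain;

    Timer(String id, long timestamp, Domain domain) {
      this.id = id;
      this.timestamp = timestamp;
      this.domain = domain;
    }
  }

  interface TimerCallback {
    void onTimer(Timer timer, BatchTimerDrainSketch timers);
  }

  private final PriorityQueue<Timer> pending =
      new PriorityQueue<>(Comparator.comparingLong((Timer t) -> t.timestamp));
  final List<String> fired = new ArrayList<>();

  void setTimer(Timer timer) {
    // In batch there is no meaningful processing time, so these timers are dropped.
    if (timer.domain == Domain.EVENT_TIME) {
      pending.add(timer);
    }
  }

  boolean hasPendingTimers() {
    return !pending.isEmpty();
  }

  /**
   * Once all input is consumed the watermark can advance to +infinity, so every event-time
   * timer is eligible. Loop (rather than iterate once) because a callback may set new timers.
   */
  void drainAtEndOfInput(TimerCallback callback) {
    while (hasPendingTimers()) {
      Timer next = pending.poll();
      fired.add(next.id);
      callback.onTimer(next, this);
    }
  }

  public static void main(String[] args) {
    BatchTimerDrainSketch timers = new BatchTimerDrainSketch();
    timers.setTimer(new Timer("a", 10, Domain.EVENT_TIME));
    timers.setTimer(new Timer("p", 5, Domain.PROCESSING_TIME));
    timers.drainAtEndOfInput(
        (timer, t) -> {
          // Firing "a" sets a follow-up event-time timer, which the loop then picks up.
          if (timer.id.equals("a")) {
            t.setTimer(new Timer("b", 20, Domain.EVENT_TIME));
          }
        });
    System.out.println(timers.fired); // [a, b]
  }
}
```

A callback that always re-sets its own timer would keep this loop running forever, so a real runner would need some guard against that.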









[GitHub] [beam] boyuanzz commented on pull request #11592: [DO NOT REVIEW] Test only

2020-05-01 Thread GitBox


boyuanzz commented on pull request #11592:
URL: https://github.com/apache/beam/pull/11592#issuecomment-622583832


   Run Dataflow PortabilityApi ValidatesRunner







[GitHub] [beam] boyuanzz opened a new pull request #11592: [DO NOT REVIEW] Test only

2020-05-01 Thread GitBox


boyuanzz opened a new pull request #11592:
URL: https://github.com/apache/beam/pull/11592


   **Please** add a meaningful description for your change here
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   

[GitHub] [beam] ibzib opened a new pull request #11591: [BEAM-9801] [cherry-pick] Pass in fire timestamp to timer callback

2020-05-01 Thread GitBox


ibzib opened a new pull request #11591:
URL: https://github.com/apache/beam/pull/11591


   I had to resolve a minor merge conflict between 325e0f1 and 
8de324f22ca04b3716abf58ba77c2a3c117263a2.
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   

[GitHub] [beam] allenpradeep commented on pull request #11570: [BEAM-9822] Merge the stages 'Gather and Sort' and 'Create Batches'

2020-05-01 Thread GitBox


allenpradeep commented on pull request #11570:
URL: https://github.com/apache/beam/pull/11570#issuecomment-622580239


   This is great, Niel. With these changes, there are three modes of using SpannerIO write:
   a) Conventional (the behavior until now): with a grouping factor, data is grouped, sorted, batched, and written according to the parameters.
   b) Batching without grouping: set the grouping factor to 1 with a larger batch size in bytes or cells. Data is then only batched, not sorted.
   c) No batching: set any of max rows, max mutations, or batch bytes to 0 or 1.
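A simplified, hypothetical Python model of the three modes above. It treats a single `max_mutations` count as the only batching limit. Real SpannerIO also limits bytes, cells, and rows, and batches `MutationGroup`s, so this is an illustration of the modes, not its implementation:

```python
from typing import List


def make_batches(keys: List[str], max_mutations: int,
                 grouping_factor: int) -> List[List[str]]:
    """Hypothetical model of the three SpannerIO write modes.

    keys: row keys of incoming mutations, in arrival order.
    max_mutations: batch size limit (stand-in for all batching limits).
    grouping_factor: how many batches' worth of input to gather and sort.
    """
    if grouping_factor < 1:
        raise ValueError("grouping_factor must be >= 1")
    if max_mutations <= 1:
        # Mode c: batching disabled, each mutation is written on its own.
        return [[k] for k in keys]
    group_size = max_mutations * grouping_factor
    batches = []
    for start in range(0, len(keys), group_size):
        group = keys[start:start + group_size]
        if grouping_factor > 1:
            # Mode a: sort the gathered group by key before batching.
            group = sorted(group)
        # Mode b (grouping_factor == 1): keep arrival order, only batch.
        for i in range(0, len(group), max_mutations):
            batches.append(group[i:i + max_mutations])
    return batches


if __name__ == "__main__":
    keys = ["d", "a", "c", "b", "f", "e"]
    print(make_batches(keys, 2, 3))  # mode a: grouped, sorted, batched
    print(make_batches(keys, 2, 1))  # mode b: batched only
    print(make_batches(keys, 0, 3))  # mode c: no batching
```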
   
   Questions:
   1) Which mode should our import pipeline use? Should it use option b, since the data in Avro already seems to be sorted?
   2) Where should we document these modes of operation so that customers can use them?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] ibzib commented on pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

2020-05-01 Thread GitBox


ibzib commented on pull request #11492:
URL: https://github.com/apache/beam/pull/11492#issuecomment-622576973


   Flakes are BEAM-9767 and BEAM-8912. I'm going to merge this so we can go 
ahead with the release.







[GitHub] [beam] ibzib commented on pull request #11585: [BEAM-9860] Make job_endpoint required for PortableRunner

2020-05-01 Thread GitBox


ibzib commented on pull request #11585:
URL: https://github.com/apache/beam/pull/11585#issuecomment-622575793


   Test flake: BEAM-9767







[GitHub] [beam] lukecwik opened a new pull request #11590: [BEAM-8944] Improve UnboundedThreadPoolExecutor performance

2020-05-01 Thread GitBox


lukecwik opened a new pull request #11590:
URL: https://github.com/apache/beam/pull/11590


   Existing performance suffered because of timed waits and the increased number of `threading` objects being invoked.
   
   Using the benchmark from https://issues.apache.org/jira/browse/BEAM-8944?focusedCommentId=17074641=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17074641 performance improved from 5.52s down to 1.82s. That is faster than the ThreadPoolExecutor with 12 threads (the default used before) but still slower than the ThreadPoolExecutor with 1 thread.
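The linked JIRA benchmark is not reproduced here. As a rough, hypothetical stand-in, this times many trivial tasks submitted to the standard library's `ThreadPoolExecutor` with 1 and 12 workers; absolute numbers depend on the machine and Python version:

```python
import time
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import List, Tuple


def identity(x: int) -> int:
    return x


def time_submissions(executor: Executor, n_tasks: int = 2000) -> Tuple[float, List[int]]:
    """Submit n_tasks trivial tasks, wait for all of them, return (seconds, results)."""
    start = time.perf_counter()
    futures = [executor.submit(identity, i) for i in range(n_tasks)]
    results = [f.result() for f in futures]
    return time.perf_counter() - start, results


if __name__ == "__main__":
    for workers in (1, 12):
        with ThreadPoolExecutor(max_workers=workers) as pool:
            elapsed, _ = time_submissions(pool)
        print(f"ThreadPoolExecutor(max_workers={workers}): {elapsed:.3f}s")
```

To compare against Beam's executor, an `UnboundedThreadPoolExecutor()` instance (assumed to be importable from `apache_beam.utils.thread_pool_executor` and to implement the `concurrent.futures.Executor` interface) could be passed to `time_submissions` the same way.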
   
   The prior performance was:
   ```
uses 5.52247905731
         584051 function calls in 5.495 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
    86250    4.386    0.000    4.386    0.000 {method 'acquire' of 'thread.lock' objects}
    19800    0.150    0.000    4.773    0.000 Queue.py:150(get)
    29698    0.123    0.000    0.365    0.000 threading.py:373(notify)
     9900    0.114    0.000    0.114    0.000 {method '__enter__' of 'thread.lock' objects}
     9899    0.106    0.000    0.463    0.000 thread_pool_executor.py:103(accepted_work)
     9188    0.089    0.000    4.095    0.000 threading.py:309(wait)
     9903    0.078    0.000    0.129    0.000 threading.py:260(__init__)
     9900    0.061    0.000    1.092    0.000 thread_pool_executor.py:133(submit)
     9899    0.055    0.000    0.358    0.000 threading.py:576(set)
    38886    0.044    0.000    0.285    0.000 threading.py:300(_is_owned)
     9900    0.041    0.000    0.186    0.000 _base.py:318(__init__)
    28987    0.023    0.000    0.031    0.000 Queue.py:200(_qsize)
     9900    0.022    0.000    0.030    0.000 threading.py:132(__init__)
    38887    0.021    0.000    0.021    0.000 threading.py:64(_note)
     9899    0.019    0.000    0.163    0.000 threading.py:400(notifyAll)
    19799    0.018    0.000    0.023    0.000 Queue.py:208(_get)
    38887    0.016    0.000    0.016    0.000 {method 'release' of 'thread.lock' objects}
     9903    0.016    0.000    0.145    0.000 threading.py:242(Condition)
    19806    0.014    0.000    0.014    0.000 threading.py:59(__init__)
     9900    0.014    0.000    0.127    0.000 threading.py:285(__enter__)
     9188    0.013    0.000    0.090    0.000 threading.py:297(_acquire_restore)
     9900    0.013    0.000    0.043    0.000 threading.py:114(RLock)
     9900    0.011    0.000    0.011    0.000 thread_pool_executor.py:34(__init__)
    38886    0.011    0.000    0.011    0.000 {len}
     9900    0.010    0.000    0.012    0.000 threading.py:288(__exit__)
     9188    0.008    0.000    0.012    0.000 threading.py:294(_release_save)
    19092    0.006    0.000    0.006    0.000 {thread.allocate_lock}
    19799    0.005    0.000    0.005    0.000 {method 'popleft' of 'collections.deque' objects}
     9188    0.004    0.000    0.004    0.000 {method 'append' of 'list' objects}
     9899    0.003    0.000    0.003    0.000 {method 'remove' of 'list' objects}
     9900    0.002    0.000    0.002    0.000 {method '__exit__' of 'thread.lock' objects}
        1    0.000    0.000    0.000    0.000 {thread.start_new_thread}
        1    0.000    0.000    0.000    0.000 threading.py:647(__init__)
        1    0.000    0.000    0.000    0.000 threading.py:717(start)
        1    0.000    0.000    0.000    0.000 thread_pool_executor.py:58(__init__)
        1    0.000    0.000    0.000    0.000 threading.py:620(_newname)
        1    0.000    0.000    0.000    0.000 _weakrefset.py:83(add)
        1    0.000    0.000    0.000    0.000 threading.py:597(wait)
        2    0.000    0.000    0.000    0.000 threading.py:561(__init__)
        1    0.000    0.000    0.000    0.000 threading.py:1142(currentThread)
        2    0.000    0.000    0.000    0.000 threading.py:542(Event)
        1    0.000    0.000    0.000    0.000 threading.py:700(_set_daemon)
        1    0.000    0.000    0.000    0.000 threading.py:1014(daemon)
        2    0.000    0.000    0.000    0.000 threading.py:570(isSet)
        1    0.000    0.000    0.000    0.000 threading.py:999(daemon)
        1    0.000    0.000    0.000    0.000 {thread.get_ident}
        1    0.000    0.000    0.000    0.000 {method 'add' of 'set' objects}
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
   ```
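The tables in this PR look like standard `cProfile`/`pstats` output sorted by internal time. A minimal way to produce a comparable report, using a hypothetical stand-in workload rather than the benchmark linked above:

```python
import cProfile
import io
import pstats
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


def workload(n_tasks: int = 1000) -> list:
    """Hypothetical workload: many small tasks on a 12-thread pool."""
    with ThreadPoolExecutor(max_workers=12) as pool:
        futures = [pool.submit(pow, i, 2) for i in range(n_tasks)]
        return [f.result() for f in futures]


def profile_report(func: Callable[[], object], limit: int = 20) -> str:
    """Run func under cProfile and return the stats as text."""
    profiler = cProfile.Profile()
    profiler.enable()
    try:
        func()
    finally:
        profiler.disable()
    stream = io.StringIO()
    # "tottime" (time spent inside the function itself) is what pstats
    # prints as "Ordered by: internal time", as in the tables in this PR.
    pstats.Stats(profiler, stream=stream).sort_stats("tottime").print_stats(limit)
    return stream.getvalue()


if __name__ == "__main__":
    print(profile_report(workload))
```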
   
   The new performance is:
   ```
uses 1.82196497917
504935 function calls in 1.787 seconds
   
  Ordered by: internal time
   
   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
    62233    0.940    0.000    0.940    0.000 {method 'acquire' of 'thread.lock' objects}
     9900    0.142    0.000

[GitHub] [beam] lukecwik commented on pull request #11590: [BEAM-8944] Improve UnboundedThreadPoolExecutor performance

2020-05-01 Thread GitBox


lukecwik commented on pull request #11590:
URL: https://github.com/apache/beam/pull/11590#issuecomment-62261


   R: @mxm @pabloem 
   CC: @robertwb 







[GitHub] [beam] allenpradeep commented on a change in pull request #11532: [BEAM-9822] Disable grouping when streaming

2020-05-01 Thread GitBox


allenpradeep commented on a change in pull request #11532:
URL: https://github.com/apache/beam/pull/11532#discussion_r418711729



##
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
##
@@ -1066,7 +1079,12 @@ public SpannerWriteResult expand(PCollection input) {
   spec.getBatchSizeBytes(),
   spec.getMaxNumMutations(),
   spec.getMaxNumRows(),
-  spec.getGroupingFactor(),
+  // Do not group on streaming unless explicitly set.
+  spec.getGroupingFactor()
+  .orElse(
+  input.isBounded() == IsBounded.BOUNDED

Review comment:
   I was wondering whether this condition should be based on the input passed to this stage or on a parameter set by the user.
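The diff is cut off after the `IsBounded.BOUNDED` check, so the fallback values below are an assumption based on the "Do not group on streaming unless explicitly set" comment. A Python sketch of that resolution order, with a hypothetical `DEFAULT_GROUPING_FACTOR`:

```python
from enum import Enum
from typing import Optional


class IsBounded(Enum):
    BOUNDED = "BOUNDED"
    UNBOUNDED = "UNBOUNDED"


# Hypothetical default for illustration; SpannerIO defines its own value.
DEFAULT_GROUPING_FACTOR = 1000


def effective_grouping_factor(user_value: Optional[int], boundedness: IsBounded) -> int:
    """Resolve the grouping factor the way the diff above appears to.

    An explicitly set value always wins. Otherwise bounded (batch) input gets
    the default factor, and unbounded (streaming) input gets 1, i.e. no grouping.
    """
    if user_value is not None:
        return user_value
    return DEFAULT_GROUPING_FACTOR if boundedness is IsBounded.BOUNDED else 1
```

Under this reading an explicit user setting already takes precedence; the open question is only what the default should be keyed on.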









[GitHub] [beam] TheNeuralBit commented on a change in pull request #11456: [BEAM-7554] Add MillisInstant logical type to replace DATETIME

2020-05-01 Thread GitBox


TheNeuralBit commented on a change in pull request #11456:
URL: https://github.com/apache/beam/pull/11456#discussion_r418708682



##
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/MillisInstant.java
##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.logicaltypes;
+
+import org.joda.time.Instant;
+import org.joda.time.ReadableInstant;
+
+/** A timestamp represented as milliseconds since the epoch. */
+public class MillisInstant extends MillisType {
+  public static final String IDENTIFIER = "beam:logical_type:millis_instant:v1";

Review comment:
   @reuvenlax what do you think about making this type (and maybe NanosInstant as well) parameterized by time zone?
   
   The [Arrow approach](https://github.com/apache/arrow/blob/master/format/Schema.fbs#L178) seems useful: an unspecified time zone indicates a time-zone-naive value (e.g. a Joda-Time `Instant`); otherwise the time zone parameter should reference a value in tzdata (and would map to a Joda-Time `DateTime`).
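A minimal sketch of the proposed semantics, using Python's `datetime` as a stand-in for Joda-Time. The function name is hypothetical, and representing the naive case as a naive UTC wall-clock value is an assumption:

```python
from datetime import datetime, timedelta, timezone, tzinfo
from typing import Optional

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def millis_to_datetime(millis: int, tz: Optional[tzinfo] = None) -> datetime:
    """Decode epoch millis for a hypothetical timezone-parameterized MillisInstant."""
    instant = _EPOCH + timedelta(milliseconds=millis)
    if tz is None:
        # No time zone parameter: a time-zone-naive value (the Instant case).
        return instant.replace(tzinfo=None)
    # Time zone parameter set: the same instant viewed in that zone (the DateTime case).
    return instant.astimezone(tz)
```

In a real implementation the parameter would more likely be a tzdata name such as `"Europe/Berlin"`, resolved with `zoneinfo.ZoneInfo` (Python 3.9+); a fixed-offset `timezone` keeps this sketch independent of the system tz database.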
   
   cc: @alexvanboxel 









[GitHub] [beam] robertwb commented on pull request #11575: [BEAM-9699] Add test verifying we can use ZetaSQL in Python SqlTransform

2020-05-01 Thread GitBox


robertwb commented on pull request #11575:
URL: https://github.com/apache/beam/pull/11575#issuecomment-622533616


   The change looks good to me, once all its prerequisites get in. 







[GitHub] [beam] robertwb commented on a change in pull request #11452: [BEAM-9692] Move apply_Read to PTransformOverride

2020-05-01 Thread GitBox


robertwb commented on a change in pull request #11452:
URL: https://github.com/apache/beam/pull/11452#discussion_r418694513



##
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
##
@@ -117,13 +117,15 @@ class DataflowRunner(PipelineRunner):
   # TODO: Remove the apache_beam.pipeline dependency in CreatePTransformOverride
   from apache_beam.runners.dataflow.ptransform_overrides import CombineValuesPTransformOverride
   from apache_beam.runners.dataflow.ptransform_overrides import CreatePTransformOverride
-  from apache_beam.runners.dataflow.ptransform_overrides import ReadPTransformOverride
   from apache_beam.runners.dataflow.ptransform_overrides import JrhReadPTransformOverride
+  from apache_beam.runners.dataflow.ptransform_overrides import ReadPTransformOverride
+  from apache_beam.runners.dataflow.ptransform_overrides import NativeReadPTransformOverride
 
-  # Thesse overrides should be applied before the proto representation of the
+  # These overrides should be applied before the proto representation of the
   # graph is created.
   _PTRANSFORM_OVERRIDES = [
-  CombineValuesPTransformOverride()
+  CombineValuesPTransformOverride(),

Review comment:
   Correct, this change does not change the existing behavior, but the 
concern is that the existing behavior might be sub-optimal. 









[GitHub] [beam] ibzib commented on pull request #11403: [DO NOT MERGE] Run all PostCommit and PreCommit Tests against Release Branch

2020-05-01 Thread GitBox


ibzib commented on pull request #11403:
URL: https://github.com/apache/beam/pull/11403#issuecomment-622523122


   Run Dataflow PortabilityApi ValidatesRunner







[GitHub] [beam] allenpradeep commented on pull request #11528: [BEAM-9821] Populate all SpannerIO batching parameters in display data.

2020-05-01 Thread GitBox


allenpradeep commented on pull request #11528:
URL: https://github.com/apache/beam/pull/11528#issuecomment-622521247


   LGTM. Thanks.







[GitHub] [beam] ibzib commented on pull request #11551: Cherrypick to release-2.21.0

2020-05-01 Thread GitBox


ibzib commented on pull request #11551:
URL: https://github.com/apache/beam/pull/11551#issuecomment-622517286


   @Hannah-Jiang I'm not sure what, if anything, really needs to be cherry-picked here. Java licenses were already cherry-picked in #11421, and the remaining changes are just usability improvements, if I understand correctly. As long as licenses are present in the release images, I think we should be fine. WDYT?







[GitHub] [beam] Ardagan commented on pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

2020-05-01 Thread GitBox


Ardagan commented on pull request #11582:
URL: https://github.com/apache/beam/pull/11582#issuecomment-622514637


   @rezarokni FYI







[GitHub] [beam] TheNeuralBit commented on pull request #11589: WIP: [BEAM-9623] Support for SQL TableProviders in Python SqlTransform

2020-05-01 Thread GitBox


TheNeuralBit commented on pull request #11589:
URL: https://github.com/apache/beam/pull/11589#issuecomment-622511334


   Run XVR_Flink PostCommit







[GitHub] [beam] TheNeuralBit opened a new pull request #11589: WIP: [BEAM-9623] Support for SQL TableProviders in Python SqlTransform

2020-05-01 Thread GitBox


TheNeuralBit opened a new pull request #11589:
URL: https://github.com/apache/beam/pull/11589


   Currently this just hard-codes support for DataCatalogTableProvider. We 
should instead make it possible to specify and configure HCatalog instances as 
well.
   

[GitHub] [beam] Ardagan commented on pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

2020-05-01 Thread GitBox


Ardagan commented on pull request #11582:
URL: https://github.com/apache/beam/pull/11582#issuecomment-622510185


   > This is looking good. Something I think we should do is change the underlying implementation of ReadFromBigQuery
   
   This would be a bigger change than I'd like to put in this PR. Can we do it in a separate PR?







[GitHub] [beam] Ardagan commented on a change in pull request #11477: [BEAM-9650] Add PeriodicSequence generator.

2020-05-01 Thread GitBox


Ardagan commented on a change in pull request #11477:
URL: https://github.com/apache/beam/pull/11477#discussion_r418673674



##
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java
##
@@ -21,33 +21,69 @@
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
 
-import java.util.List;
+import java.util.Objects;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.sdk.transforms.splittabledofn.Sizes;
 import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
 /**
- * A {@link PTransform} which generates a sequence of timestamped elements at given interval in
- * runtime.
+ * A {@link PTransform} which generates a sequence of timestamped elements at given runtime
+ * interval.
  *
- * Receives a PCollection> where each element triggers the generation of sequence and
- * has following elements: 0: first element timestamp 1: last element timestamp 2: interval
+ * Transform will not output elements prior to target time. Transform can output elements at any
+ * time after target time.
  *
- * All elements that have timestamp in the past will be output right away. Elements that have
- * timestamp in the future will be delayed.
- *
- * Transform will not output elements prior to target timestamp. Transform can output elements at
- * any time after target timestamp.
+ * Multiple elements can be output at given moment if their timestamp is earlier than current
+ * time.
  */
 @Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
-public class PeriodicSequence extends PTransform>, PCollection> {
+public class PeriodicSequence
+    extends PTransform, PCollection> {
+
+  @DefaultSchema(JavaFieldSchema.class)
+  public static class SequenceDefinition {
+    public Instant first;
+    public Instant last;
+    public Long durationMilliSec;
+
+    public SequenceDefinition() {}
+
+    public SequenceDefinition(Instant first, Instant last, Duration duration) {
+      this.first = first;
+      this.last = last;
+      this.durationMilliSec = duration.getMillis();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+
+      if (obj == null || obj.getClass() != this.getClass()) {
+        return false;
+      }
+
+      SequenceDefinition src = (SequenceDefinition) obj;
+      return src.first.equals(this.first)
+          && src.last.equals(this.last)
+          && src.durationMilliSec.equals(this.durationMilliSec);
+    }
+
+    @Override
+    public int hashCode() {
+      int result = Objects.hash(first, last, durationMilliSec);
+      return result;
+    }

Review comment:
   I tried to use it, but didn't manage to get AutoValue to work: it failed to detect the constructor of the generated class. I tried to debug it, but it would be better to handle that in a separate PR.
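A minimal Python sketch of the emission rule described in the new Javadoc, using the same three fields as `SequenceDefinition` (`first`, `last`, `durationMilliSec`) as epoch-millisecond integers. The `emitted` counter is a hypothetical stand-in for the restriction tracker's position; this is not the splittable DoFn implementation:

```python
from typing import List


def due_timestamps(first_ms: int, last_ms: int, interval_ms: int,
                   now_ms: int, emitted: int = 0) -> List[int]:
    """Return the sequence timestamps (epoch millis) that are due at now_ms.

    The sequence is first_ms, first_ms + interval_ms, ... up to last_ms
    (treated as inclusive here, which is an assumption). `emitted` is how
    many elements were already output.
    """
    if interval_ms <= 0:
        raise ValueError("interval_ms must be positive")
    due = []
    ts = first_ms + emitted * interval_ms
    # Never output an element before its target time; output every element
    # whose target time has already passed, possibly several at once.
    while ts <= last_ms and ts <= now_ms:
        due.append(ts)
        ts += interval_ms
    return due
```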









[GitHub] [beam] robertwb commented on a change in pull request #11039: [BEAM-9383] Staging Dataflow artifacts from environment

2020-05-01 Thread GitBox


robertwb commented on a change in pull request #11039:
URL: https://github.com/apache/beam/pull/11039#discussion_r418651280



##
File path: model/pipeline/src/main/proto/beam_runner_api.proto
##
@@ -1271,6 +1271,11 @@ message DeferredArtifactPayload {
 message ArtifactStagingToRolePayload {
   // A generated staged name (relative path under staging directory).
   string staged_name = 1;
+
+  // (Optional) An artifact name when a runner supports it.
+  // For example, DataflowRunner requires predefined names for some artifacts
+  // such as "dataflow-worker.jar", "windmill_main".
+  string alias_name = 2;

Review comment:
   Why does this have to be distinct from staged_name? (Also, eventually we 
hope that designated roles can remove the need for magic names.)

##
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
##
@@ -210,56 +209,55 @@ public static Environment createProcessEnvironment(
 }
   }
 
-  private static List getArtifacts(List stagingFiles) {
-Set pathsToStage = Sets.newHashSet(stagingFiles);
+  public static List getArtifacts(
+  List stagingFiles, StagingFileNameGenerator generator) {
 ImmutableList.Builder artifactsBuilder = ImmutableList.builder();
-for (String path : pathsToStage) {
+for (String path : ImmutableSet.copyOf(stagingFiles)) {

Review comment:
   Isn't order important to preserve? (Also, why do we need to make a copy?)
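For reference on the ordering question: Guava documents `ImmutableSet` as iterating in the order elements were first added, so `ImmutableSet.copyOf(stagingFiles)` keeps the first occurrence of each path in its original position, while the removed `Sets.newHashSet` gave no ordering guarantee. A Python analogue of the two behaviours (function names are hypothetical):

```python
from typing import List


def dedupe_preserving_order(paths: List[str]) -> List[str]:
    # dict keys keep insertion order (Python 3.7+), so this keeps the first
    # occurrence of each path in its original position, like ImmutableSet.copyOf.
    return list(dict.fromkeys(paths))


def dedupe_unordered(paths: List[str]) -> List[str]:
    # A plain set, like HashSet, gives no ordering guarantee.
    return list(set(paths))
```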

##
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##
@@ -784,7 +877,25 @@ public DataflowPipelineJob run(Pipeline pipeline) {
 "Executing pipeline on the Dataflow Service, which will have billing implications "
 + "related to Google Compute Engine usage and other Google Cloud Services.");
 
-List packages = options.getStager().stageDefaultFiles();
+// Capture the sdkComponents for look up during step translations
+SdkComponents sdkComponents = SdkComponents.create();
+
+DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
+String workerHarnessContainerImageURL = DataflowRunner.getContainerImageForJob(dataflowOptions);
+RunnerApi.Environment defaultEnvironmentForDataflow =
+Environments.createDockerEnvironment(workerHarnessContainerImageURL);
+
+sdkComponents.registerEnvironment(
+defaultEnvironmentForDataflow
+.toBuilder()
+.addAllDependencies(getDefaultArtifacts())

Review comment:
   How does this get invoked for cross-language pipelines?

##
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
##
@@ -336,25 +323,26 @@ public DataflowPackage stageToFile(
 final AtomicInteger numCached = new AtomicInteger(0);
 List> destinationPackages = new ArrayList<>();
 
-for (String classpathElement : classpathElements) {
-  DataflowPackage sourcePackage = new DataflowPackage();
-  if (classpathElement.contains("=")) {
-String[] components = classpathElement.split("=", 2);
-sourcePackage.setName(components[0]);
-sourcePackage.setLocation(components[1]);
-  } else {
-sourcePackage.setName(null);
-sourcePackage.setLocation(classpathElement);
+for (StagedFile classpathElement : classpathElements) {
+  DataflowPackage targetPackage = classpathElement.getStagedPackage();
+  String source = classpathElement.getSource();
+  if (source.contains("=")) {

Review comment:
   Why do we have to handle this here and above?
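The removed loop treats a classpath element of the form `name=location` as a named package and anything else as a bare location. A hypothetical Python helper capturing that rule in one place, splitting only on the first `=` as `split("=", 2)` does in Java:

```python
from typing import Optional, Tuple


def parse_staging_entry(entry: str) -> Tuple[Optional[str], str]:
    """Split a staging entry into (name, location).

    "name=location" yields a named package; an entry without '=' has no
    name and is used as the location as-is.
    """
    if "=" in entry:
        # Split on the first '=' only, so locations may themselves contain '='.
        name, location = entry.split("=", 1)
        return name, location
    return None, entry
```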

##
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##
@@ -772,6 +783,88 @@ private Debuggee registerDebuggee(CloudDebugger debuggerClient, String uniquifie
 }
   }
 
+  private List stageArtifacts(RunnerApi.Pipeline pipeline) {
+ImmutableList.Builder filesToStageBuilder = ImmutableList.builder();
+for (Map.Entry entry :
+pipeline.getComponents().getEnvironmentsMap().entrySet()) {
+  for (RunnerApi.ArtifactInformation info : entry.getValue().getDependenciesList()) {
+if (!BeamUrns.getUrn(RunnerApi.StandardArtifacts.Types.FILE).equals(info.getTypeUrn())) {
+  throw new RuntimeException(
+  String.format("unsupported artifact type %s", info.getTypeUrn()));
+}
+RunnerApi.ArtifactFilePayload filePayload;
+try {
+  filePayload = RunnerApi.ArtifactFilePayload.parseFrom(info.getTypePayload());
+} catch (InvalidProtocolBufferException e) {
+  throw new RuntimeException("Error parsing artifact file payload.", e);
+}
+if (!BeamUrns.getUrn(RunnerApi.StandardArtifacts.Roles.STAGING_TO)
+.equals(info.getRoleUrn())) {
+  throw new RuntimeException(
+ 

[GitHub] [beam] ibzib commented on pull request #11585: [BEAM-9860] Make job_endpoint required for PortableRunner

2020-05-01 Thread GitBox


ibzib commented on pull request #11585:
URL: https://github.com/apache/beam/pull/11585#issuecomment-622500284


   Run Python 3.7 PostCommit







[GitHub] [beam] tvalentyn commented on pull request #11470: [BEAM-9791] Add precommit for dataflow runner v2

2020-05-01 Thread GitBox


tvalentyn commented on pull request #11470:
URL: https://github.com/apache/beam/pull/11470#issuecomment-622495831


   nvm







[GitHub] [beam] tvalentyn commented on pull request #11470: [BEAM-9791] Add precommit for dataflow runner v2

2020-05-01 Thread GitBox


tvalentyn commented on pull request #11470:
URL: https://github.com/apache/beam/pull/11470#issuecomment-622494788


   What is Dataflow runner V2? There are no details here or in the JIRA. 







[GitHub] [beam] chamikaramj commented on pull request #11557: [BEAM-9845] Stage artifacts over expansion service.

2020-05-01 Thread GitBox


chamikaramj commented on pull request #11557:
URL: https://github.com/apache/beam/pull/11557#issuecomment-622489105


   Run XVR_Spark PostCommit







[GitHub] [beam] mxm commented on pull request #11558: [BEAM-8742] Add stateful and timely processing benchmarks

2020-05-01 Thread GitBox


mxm commented on pull request #11558:
URL: https://github.com/apache/beam/pull/11558#issuecomment-622489303


   Run Python Load Tests ParDo Flink Streaming







[GitHub] [beam] chamikaramj commented on pull request #11557: [BEAM-9845] Stage artifacts over expansion service.

2020-05-01 Thread GitBox


chamikaramj commented on pull request #11557:
URL: https://github.com/apache/beam/pull/11557#issuecomment-622489032


   Run XVR_Flink PostCommit







[GitHub] [beam] mxm commented on pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

2020-05-01 Thread GitBox


mxm commented on pull request #11492:
URL: https://github.com/apache/beam/pull/11492#issuecomment-622488737


   Run Python2_PVR_Flink PreCommit







[GitHub] [beam] TheNeuralBit commented on a change in pull request #10384: [BEAM-8933] Utilities for converting Arrow schemas and reading Arrow batches as Rows

2020-05-01 Thread GitBox


TheNeuralBit commented on a change in pull request #10384:
URL: https://github.com/apache/beam/pull/10384#discussion_r418649632



##
File path: 
sdks/java/extensions/arrow/src/main/java/org/apache/beam/sdk/extensions/arrow/ArrowConversion.java
##
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.arrow;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.util.Text;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.schemas.CachingFactory;
+import org.apache.beam.sdk.schemas.Factory;
+import org.apache.beam.sdk.schemas.FieldValueGetter;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+
+/**
+ * Utilities to create {@link Iterable}s of Beam {@link Row} instances backed by Arrow record
+ * batches.
+ */
+@Experimental(Experimental.Kind.SCHEMAS)
+public class ArrowConversion {
+  /** Converts Arrow schema to Beam row schema. */
+  public static Schema toBeamSchema(org.apache.arrow.vector.types.pojo.Schema schema) {
+    return toBeamSchema(schema.getFields());
+  }
+
+  public static Schema toBeamSchema(List<org.apache.arrow.vector.types.pojo.Field> fields) {
+    Schema.Builder builder = Schema.builder();
+    for (org.apache.arrow.vector.types.pojo.Field field : fields) {
+      Field beamField = toBeamField(field);
+      builder.addField(beamField);
+    }
+    return builder.build();
+  }
+
+  /** Get Beam Field from Arrow Field. */
+  private static Field toBeamField(org.apache.arrow.vector.types.pojo.Field field) {
+    FieldType beamFieldType = toFieldType(field.getFieldType(), field.getChildren());
+    return Field.of(field.getName(), beamFieldType);
+  }
+
+  /** Converts Arrow FieldType to Beam FieldType. */
+  private static FieldType toFieldType(
+      org.apache.arrow.vector.types.pojo.FieldType arrowFieldType,
+      List<org.apache.arrow.vector.types.pojo.Field> childrenFields) {
+    FieldType fieldType =
+        arrowFieldType
+            .getType()
+            .accept(
+                new ArrowType.ArrowTypeVisitor<FieldType>() {
+                  @Override
+                  public FieldType visit(ArrowType.Null type) {
+                    throw new IllegalArgumentException(
+                        "Type \'" + type.toString() + "\' not supported.");
+                  }
+
+                  @Override
+                  public FieldType visit(ArrowType.Struct type) {
+                    return FieldType.row(toBeamSchema(childrenFields));
+                  }
+
+                  @Override
+                  public FieldType visit(ArrowType.List type) {
+                    checkArgument(
+                        childrenFields.size() == 1,
+                        "Encountered "
+                            + childrenFields.size()
+                            + " child fields for list type, expected 1");
+                    return FieldType.array(toBeamField(childrenFields.get(0)).getType());
+                  }
+
+                  @Override
+                  public FieldType visit(ArrowType.FixedSizeList type) {
+                    throw new IllegalArgumentException(
+                        "Type \'" + type.toString() + "\' not supported.");
+                  }
+
+                  @Override
+                  public FieldType visit(ArrowType.Union type) {
+                    throw new IllegalArgumentException(
+                        "Type \'" + type.toString() + "\' not supported.");
+                  }
+
+                  @Override
+                  public FieldType visit(ArrowType.Map

[GitHub] [beam] mxm commented on pull request #11558: [BEAM-8742] Add stateful and timely processing benchmarks

2020-05-01 Thread GitBox


mxm commented on pull request #11558:
URL: https://github.com/apache/beam/pull/11558#issuecomment-622481501


   Run Seed Job







[GitHub] [beam] mxm commented on pull request #11558: [BEAM-8742] Add stateful and timely processing benchmarks

2020-05-01 Thread GitBox


mxm commented on pull request #11558:
URL: https://github.com/apache/beam/pull/11558#issuecomment-622480730


   Run Seed Job







[GitHub] [beam] tvalentyn commented on pull request #11444: [BEAM-9701] loosen fastavro version restriction for Python 3.8 support

2020-05-01 Thread GitBox


tvalentyn commented on pull request #11444:
URL: https://github.com/apache/beam/pull/11444#issuecomment-622469723


   You would need to install Beam from HEAD, though, since those changes are not yet included in released versions.







[GitHub] [beam] TheNeuralBit commented on a change in pull request #11456: [BEAM-7554] Add MillisInstant logical type to replace DATETIME

2020-05-01 Thread GitBox


TheNeuralBit commented on a change in pull request #11456:
URL: https://github.com/apache/beam/pull/11456#discussion_r418628471



##
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
##
@@ -257,10 +257,8 @@ private Transform(
   // Combining over a single field, so extract just that field.
   combined =
   (combined == null)
-  ? byFields.aggregateFieldBaseValue(
-  inputs.get(0), combineFn, fieldAggregation.outputField)
-  : combined.aggregateFieldBaseValue(
-  inputs.get(0), combineFn, fieldAggregation.outputField);
+  ? byFields.aggregateField(inputs.get(0), combineFn, fieldAggregation.outputField)
+  : combined.aggregateField(inputs.get(0), combineFn, fieldAggregation.outputField);

Review comment:
   What about adding an option for SqlTransform to convert unknown logical types to their base type at the input? I think that behavior would be effectively the same.
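A minimal sketch of that idea, assuming a hypothetical, simplified `LogicalType` interface (loosely modeled on Beam's `Schema.LogicalType`, not its real API) and a made-up `example:millis_instant` identifier. Values whose logical type SQL does not recognize are replaced by their base-type representation before the query sees them; primitive fields and known logical types pass through unchanged.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class BaseTypeFallback {
  /** Hypothetical, simplified logical type; NOT Beam's real Schema.LogicalType API. */
  interface LogicalType {
    String identifier();

    Object toBaseType(Object input);
  }

  /** Example logical type storing an Instant as epoch millis (the identifier is made up). */
  static final LogicalType MILLIS_INSTANT =
      new LogicalType() {
        @Override
        public String identifier() {
          return "example:millis_instant";
        }

        @Override
        public Object toBaseType(Object input) {
          return ((Instant) input).toEpochMilli();
        }
      };

  /**
   * Replaces values of logical types that SQL does not know with their base-type
   * representation. A null entry in {@code types} marks a primitive field.
   */
  static List<Object> toBaseAtInput(
      List<Object> values, List<LogicalType> types, Set<String> knownIdentifiers) {
    List<Object> converted = new ArrayList<>(values.size());
    for (int i = 0; i < values.size(); i++) {
      LogicalType type = types.get(i);
      Object value = values.get(i);
      if (type == null || knownIdentifiers.contains(type.identifier())) {
        converted.add(value); // primitive, or a logical type SQL handles natively
      } else {
        converted.add(type.toBaseType(value)); // unknown: fall back to the base value
      }
    }
    return converted;
  }

  public static void main(String[] args) {
    List<Object> row = Arrays.asList("user-1", Instant.ofEpochMilli(1588291200000L));
    List<LogicalType> types = Arrays.asList(null, MILLIS_INSTANT);
    // Prints [user-1, 1588291200000]: the unknown logical type falls back to epoch millis.
    System.out.println(toBaseAtInput(row, types, new HashSet<>()));
  }
}
```

Doing the conversion once at the input keeps the rest of the planner unaware of logical types it cannot map, at the cost of losing the logical type's semantics downstream.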









[GitHub] [beam] TheNeuralBit commented on a change in pull request #11456: [BEAM-7554] Add MillisInstant logical type to replace DATETIME

2020-05-01 Thread GitBox


TheNeuralBit commented on a change in pull request #11456:
URL: https://github.com/apache/beam/pull/11456#discussion_r418625619



##
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
##
@@ -419,7 +420,6 @@ public int hashCode() {
 FLOAT,
 DOUBLE,
 STRING, // String.
-DATETIME, // Date and time.

Review comment:
   Yeah, we could deprecate it for a release or two before removing it. Would we also need to keep support for the primitive DATETIME in IOs and SQL? I'd need to think about how that transition would work.









[GitHub] [beam] tvalentyn commented on a change in pull request #11086: [BEAM-8910] Make custom BQ source read from Avro

2020-05-01 Thread GitBox


tvalentyn commented on a change in pull request #11086:
URL: https://github.com/apache/beam/pull/11086#discussion_r418609817



##
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##
@@ -45,8 +45,8 @@
 may use some caching techniques to share the side inputs between calls in order
 to avoid excessive reading:::
 
-  main_table = pipeline | 'VeryBig' >> beam.io.Read(beam.io.BigQuerySource()
-  side_table = pipeline | 'NotBig' >> beam.io.Read(beam.io.BigQuerySource()
+  main_table = pipeline | 'VeryBig' >> beam.io.ReadFroBigQuery(...)

Review comment:
   Typo in `From`: `ReadFroBigQuery` should be `ReadFromBigQuery`.

##
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##
@@ -78,6 +72,12 @@
 or a table. Pipeline construction will fail with a validation error if neither
 or both are specified.
 
+When reading from BigQuery using `BigQuerySource`, bytes are returned as

Review comment:
   Can we add some guidance on when to use which?









[GitHub] [beam] tvalentyn commented on pull request #11086: [BEAM-8910] Make custom BQ source read from Avro

2020-05-01 Thread GitBox


tvalentyn commented on pull request #11086:
URL: https://github.com/apache/beam/pull/11086#issuecomment-622455437


   Run Python 2 PostCommit







[GitHub] [beam] mxm commented on pull request #11557: [BEAM-9845] Stage artifacts over expansion service.

2020-05-01 Thread GitBox


mxm commented on pull request #11557:
URL: https://github.com/apache/beam/pull/11557#issuecomment-622450697


   > Good point about needing to remove Kafka, etc. for Flink. (Not sure how 
that would interact with its use via an embedded environment; I'll let that be 
a later PR.)
   
   I think the embedded environment should have all the required dependencies 
in the classpath, similarly to how the expansion service has them at expansion 
time.







[GitHub] [beam] mxm commented on a change in pull request #11557: [BEAM-9845] Stage artifacts over expansion service.

2020-05-01 Thread GitBox


mxm commented on a change in pull request #11557:
URL: https://github.com/apache/beam/pull/11557#discussion_r418609854



##
File path: sdks/python/apache_beam/pipeline.py
##
@@ -211,6 +211,8 @@ def __init__(self, runner=None, options=None, argv=None):
 experiments.append('beam_fn_api')
 self._options.view_as(DebugOptions).experiments = experiments
 
+self.local_tempdir = tempfile.mkdtemp(prefix='beam-pipeline-temp')

Review comment:
   That's fine, then. I wasn't sure whether this is a unique directory.









[GitHub] [beam] suztomo commented on pull request #11586: [BEAM-9444] GCP Library BOM as a Map

2020-05-01 Thread GitBox


suztomo commented on pull request #11586:
URL: https://github.com/apache/beam/pull/11586#issuecomment-622447857


   @iemejia Thank you.







[GitHub] [beam] tvalentyn commented on pull request #11444: [BEAM-9701] loosen fastavro version restriction for Python 3.8 support

2020-05-01 Thread GitBox


tvalentyn commented on pull request #11444:
URL: https://github.com/apache/beam/pull/11444#issuecomment-622447887


   @vishalaj1, yes, please try again.







[GitHub] [beam] robertwb commented on a change in pull request #11554: Website - Migrated Jekyll to Hugo

2020-05-01 Thread GitBox


robertwb commented on a change in pull request #11554:
URL: https://github.com/apache/beam/pull/11554#discussion_r418604535



##
File path: website/www/site/content/en/blog/beam-2.19.0.md
##
@@ -25,7 +24,7 @@ limitations under the License.
 -->
 
 We are happy to present the new 2.19.0 release of Beam. This release includes 
both improvements and new functionality.
-See the [download page]({{ site.baseurl 
}}/get-started/downloads/#2190-2020-02-04) for this release.

Review comment:
   Won't this change cause issues with staging? 

##
File path: website/www/site/content/en/blog/beam-2.20.0.md
##
@@ -43,7 +41,6 @@ Python SDK: . (#10223).
 * [BEAM-8841](https://issues.apache.org/jira/browse/BEAM-8841) Added ability 
to write to BigQuery via Avro file loads
 * [BEAM-9228](https://issues.apache.org/jira/browse/BEAM-9228) Direct runner 
for FnApi supports further parallelism
 * [BEAM-8550](https://issues.apache.org/jira/browse/BEAM-8550) Support for 
@RequiresTimeSortedInput in Flink and Spark
-* [BEAM-6857](https://issues.apache.org/jira/browse/BEAM-6857) Added support 
for dynamic timers

Review comment:
   Why was this removed? 









[GitHub] [beam] mxm commented on pull request #11558: [BEAM-8742] Add stateful and timely processing benchmarks

2020-05-01 Thread GitBox


mxm commented on pull request #11558:
URL: https://github.com/apache/beam/pull/11558#issuecomment-622442663


   Run Python Load Tests ParDo Flink Streaming







[GitHub] [beam] robertwb commented on pull request #11557: [BEAM-9845] Stage artifacts over expansion service.

2020-05-01 Thread GitBox


robertwb commented on pull request #11557:
URL: https://github.com/apache/beam/pull/11557#issuecomment-622441649


   Run Java PreCommit







[GitHub] [beam] robertwb commented on pull request #11557: [BEAM-9845] Stage artifacts over expansion service.

2020-05-01 Thread GitBox


robertwb commented on pull request #11557:
URL: https://github.com/apache/beam/pull/11557#issuecomment-622441715


   Run Python PreCommit







[GitHub] [beam] vishalaj1 commented on pull request #11444: [BEAM-9701] loosen fastavro version restriction for Python 3.8 support

2020-05-01 Thread GitBox


vishalaj1 commented on pull request #11444:
URL: https://github.com/apache/beam/pull/11444#issuecomment-622430319


   Hi,
   
   I had this problem when I tried installing three days ago.
   
   https://github.com/fastavro/fastavro/issues/427
   
   Can you please let me know if this is fixed as well?
   
   Regards,
   Vishal







[GitHub] [beam] kennknowles commented on pull request #11588: [BEAM-9776] Fixes filesystem not found error

2020-05-01 Thread GitBox


kennknowles commented on pull request #11588:
URL: https://github.com/apache/beam/pull/11588#issuecomment-622427418


   @apilloud any pitfalls here?







[GitHub] [beam] sjvanrossum opened a new pull request #11588: [BEAM-9776] Fixes filesystem not found error

2020-05-01 Thread GitBox


sjvanrossum opened a new pull request #11588:
URL: https://github.com/apache/beam/pull/11588


   Registers standard filesystems. This fixes the linked JIRA issue, which 
occurs if a virtual filesystem table is queried before a local filesystem table 
is queried.
   
   This assumes that the Beam SQL Shell is compiled as described at https://beam.apache.org/documentation/dsls/sql/shell/. Without bundling Java IO, the error will still occur.
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   

[GitHub] [beam] mxm commented on pull request #11558: [BEAM-8742] Add stateful and timely processing benchmarks

2020-05-01 Thread GitBox


mxm commented on pull request #11558:
URL: https://github.com/apache/beam/pull/11558#issuecomment-622401634


   Run Python Load Tests ParDo Flink Streaming







[GitHub] [beam] mxm commented on pull request #11558: [BEAM-8742] Add stateful and timely processing benchmarks

2020-05-01 Thread GitBox


mxm commented on pull request #11558:
URL: https://github.com/apache/beam/pull/11558#issuecomment-622401502


   I've created a new dashboard that includes data from the batch tests and the two new streaming tests: https://apache-beam-testing.appspot.com/explore?dashboard=5751884853805056
   
   
![Untitled](https://user-images.githubusercontent.com/837221/80810980-46bf9400-8bc5-11ea-99e4-bb042d23a74d.png)
   
   The streaming tests have data for the runtime and for the checkpoint 
duration (min/max/average).







[GitHub] [beam] suztomo commented on pull request #11586: [BEAM-9444] GCP Library BOM as a Map

2020-05-01 Thread GitBox


suztomo commented on pull request #11586:
URL: https://github.com/apache/beam/pull/11586#issuecomment-622398262


   The Java precommit check failed on org.apache.beam.runners.flink.FlinkSavepointTest.testSavepointRestorePortable with “test timed out after 60 seconds”. This seems to be a transient failure.







[GitHub] [beam] iemejia commented on pull request #11586: [BEAM-9444] GCP Library BOM as a Map

2020-05-01 Thread GitBox


iemejia commented on pull request #11586:
URL: https://github.com/apache/beam/pull/11586#issuecomment-622362982


   retest this please







[GitHub] [beam] iemejia commented on pull request #11586: [BEAM-9444] GCP Library BOM as a Map

2020-05-01 Thread GitBox


iemejia commented on pull request #11586:
URL: https://github.com/apache/beam/pull/11586#issuecomment-622362502


   retest this please







[GitHub] [beam] nielm commented on pull request #11438: [BEAM-9505] Remove spurious error message in SpannerIO when streaming.

2020-05-01 Thread GitBox


nielm commented on pull request #11438:
URL: https://github.com/apache/beam/pull/11438#issuecomment-622348150


   @chamikaramj can you merge please :) 







[GitHub] [beam] mxm commented on a change in pull request #11578: [BEAM-8025] Increase the number of retrials en retrial delay in case of load in CassandraIOTest

2020-05-01 Thread GitBox


mxm commented on a change in pull request #11578:
URL: https://github.com/apache/beam/pull/11578#discussion_r418487133



##
File path: 
sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
##
@@ -153,18 +153,22 @@ public static void beforeClass() throws Exception {
 
   private static Cluster buildCluster(CassandraEmbeddedServerBuilder builder) {
 int tried = 0;

Review comment:
   ```suggestion
   Exception exception = null;
   int tried = 0;
   ```

##
File path: 
sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
##
@@ -153,18 +153,22 @@ public static void beforeClass() throws Exception {
 
   private static Cluster buildCluster(CassandraEmbeddedServerBuilder builder) {
 int tried = 0;
-while (tried < 3) {
+int delay = 5000;
+while (tried < 5) {
   try {
 return builder.buildNativeCluster();
   } catch (NoHostAvailableException e) {

Review comment:
   I couldn't find that this exception of type RuntimeException is thrown 
during cluster startup. We may want to catch all `Exception` here. 

##
File path: 
sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
##
@@ -153,18 +153,22 @@ public static void beforeClass() throws Exception {
 
   private static Cluster buildCluster(CassandraEmbeddedServerBuilder builder) {
 int tried = 0;
-while (tried < 3) {
+int delay = 5000;
+while (tried < 5) {
   try {
 return builder.buildNativeCluster();
   } catch (NoHostAvailableException e) {
 tried++;

Review comment:
   ```suggestion
   if (exception == null) {
     exception = e;
   } else {
     exception.addSuppressed(e);
   }
   tried++;
   ```

##
File path: 
sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
##
@@ -153,18 +153,22 @@ public static void beforeClass() throws Exception {
 
   private static Cluster buildCluster(CassandraEmbeddedServerBuilder builder) {
 int tried = 0;
-while (tried < 3) {
+int delay = 5000;
+while (tried < 5) {
   try {
 return builder.buildNativeCluster();
   } catch (NoHostAvailableException e) {
 tried++;
 try {
-  Thread.sleep(1000L);
+  Thread.sleep(delay);
 } catch (InterruptedException e1) {

Review comment:
   ```suggestion
   } catch (InterruptedException e1) {
     Thread.currentThread().interrupt();
     throw new RuntimeException("interrupted");
   }
   ```

##
File path: 
sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
##
@@ -153,18 +153,22 @@ public static void beforeClass() throws Exception {
 
   private static Cluster buildCluster(CassandraEmbeddedServerBuilder builder) {
 int tried = 0;
-while (tried < 3) {
+int delay = 5000;
+while (tried < 5) {
   try {
 return builder.buildNativeCluster();
   } catch (NoHostAvailableException e) {
 tried++;
 try {
-  Thread.sleep(1000L);
+  Thread.sleep(delay);
 } catch (InterruptedException e1) {
 }
   }
 }
-throw new RuntimeException("Unable to create embedded Cassandra cluster");
+throw new RuntimeException(
+String.format(
+"Unable to create embedded Cassandra cluster: tried %d times with %d delay",
+tried, delay));

Review comment:
   ```suggestion
   throw new RuntimeException(
       String.format(
           "Unable to create embedded Cassandra cluster: tried %d times with %d delay",
           tried, delay),
       exception);
   ```
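Taken together, the suggestions in this review amount to the retry pattern below. This is a sketch only: `retry` is a hypothetical helper and a `Supplier` stands in for `builder.buildNativeCluster()`. It catches all `Exception`s, keeps the first failure as the cause and attaches later ones as suppressed exceptions, restores the interrupt flag if the sleep is interrupted, and reports the attempt count in the final error. Two small deviations from the suggestions, both assumptions: it skips the sleep after the last attempt, and it passes the `InterruptedException` as the cause.

```java
import java.util.function.Supplier;

public class RetrySketch {
  /**
   * Calls {@code attempt} up to {@code maxTries} times, sleeping {@code delayMillis} between
   * failed attempts. The first failure becomes the cause of the final exception and later
   * failures are attached to it as suppressed exceptions.
   */
  static <T> T retry(Supplier<T> attempt, int maxTries, long delayMillis) {
    Exception exception = null;
    int tried = 0;
    while (tried < maxTries) {
      try {
        return attempt.get();
      } catch (Exception e) { // catch broadly rather than one specific exception type
        if (exception == null) {
          exception = e;
        } else {
          exception.addSuppressed(e);
        }
        tried++;
        if (tried < maxTries) { // no point sleeping after the last attempt
          try {
            Thread.sleep(delayMillis);
          } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // preserve the interrupt status for callers
            throw new RuntimeException("interrupted", ie);
          }
        }
      }
    }
    throw new RuntimeException(
        String.format("Operation failed: tried %d times with %d ms delay", tried, delayMillis),
        exception);
  }

  public static void main(String[] args) {
    int[] calls = {0};
    String result =
        retry(
            () -> {
              calls[0]++;
              if (calls[0] < 3) {
                throw new IllegalStateException("no host available");
              }
              return "connected";
            },
            5,
            10L);
    // Prints "connected after 3 attempts".
    System.out.println(result + " after " + calls[0] + " attempts");
  }
}
```

Chaining the failures this way means a test that finally gives up still shows every startup error in its stack trace, instead of only a generic "unable to create" message.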









[GitHub] [beam] rahul8383 commented on pull request #11581: [BEAM-8307] NPE in Calcite dialect when input PCollection has logical…

2020-05-01 Thread GitBox


rahul8383 commented on pull request #11581:
URL: https://github.com/apache/beam/pull/11581#issuecomment-622308296


   I agree that this is not a scalable solution: providing a Calcite RelDataType mapping for every logical type defined by every IO (which is what this PR does) does not scale.
   
   Another approach to solving the problem:
   Provide a Calcite RelDataType mapping based on the base type of the logical type.
   However, since the CHAR and VARCHAR logical types both have STRING as their base type, we would have to choose a default Calcite RelDataType mapping. The same applies to the BINARY and VARBINARY logical types.
   We would also have to choose a default RelDataType mapping when the base type is DATETIME.
   With this approach, I think we might lose some of the built-in aggregation functions that Calcite provides for specific types.
   
   I also considered using the IDENTIFIER of the logical type to determine the corresponding Calcite RelDataType. However, since IDENTIFIER is a String rather than an enum, it cannot be used for this. For example, all the logical types defined by JdbcIO use the java.sql.JDBCType name as their IDENTIFIER.
   
   Please correct me if my understanding is incorrect.   
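Here is a minimal sketch combining the two ideas discussed above. It looks up a mapping registered for the logical type's IDENTIFIER first. Otherwise it falls back to a default keyed by the base type, instead of returning null. Plain strings stand in for identifiers, base types, and SQL type names. `SqlTypeResolver` and `FieldTypeInfo` are hypothetical names, not Beam or Calcite APIs.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified description of a schema field's type. */
final class FieldTypeInfo {
  final String logicalTypeIdentifier; // null when the field is not a logical type
  final String baseTypeName; // e.g. "STRING", "BYTES", "DATETIME"

  FieldTypeInfo(String logicalTypeIdentifier, String baseTypeName) {
    this.logicalTypeIdentifier = logicalTypeIdentifier;
    this.baseTypeName = baseTypeName;
  }
}

/** Hypothetical resolver: identifier mapping first, base-type default second. */
final class SqlTypeResolver {
  private final Map<String, String> byIdentifier = new HashMap<>();
  private final Map<String, String> defaultByBaseType = new HashMap<>();

  SqlTypeResolver() {
    // One default per base type: CHAR/VARCHAR share STRING, BINARY/VARBINARY share BYTES.
    defaultByBaseType.put("STRING", "VARCHAR");
    defaultByBaseType.put("BYTES", "VARBINARY");
    defaultByBaseType.put("DATETIME", "TIMESTAMP");
    defaultByBaseType.put("INT64", "BIGINT");
  }

  void registerLogicalType(String identifier, String sqlTypeName) {
    byIdentifier.put(identifier, sqlTypeName);
  }

  String resolve(FieldTypeInfo field) {
    if (field.logicalTypeIdentifier != null) {
      String registered = byIdentifier.get(field.logicalTypeIdentifier);
      if (registered != null) {
        return registered;
      }
    }
    // Unknown (or no) logical type: fall back to the base type instead of returning null.
    String fallback = defaultByBaseType.get(field.baseTypeName);
    if (fallback == null) {
      throw new IllegalArgumentException("No SQL type mapping for base type " + field.baseTypeName);
    }
    return fallback;
  }
}
```

Because identifiers are free-form strings, two IOs that reuse the same identifier (for example, JDBCType names) would share a single mapping in this sketch.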







[GitHub] [beam] robertwb commented on pull request #11557: [BEAM-9845] Stage artifacts over expansion service.

2020-05-01 Thread GitBox


robertwb commented on pull request #11557:
URL: https://github.com/apache/beam/pull/11557#issuecomment-622304615











[GitHub] [beam] henryken commented on a change in pull request #11564: [Beam-9679] Add Core Transforms section / Map lesson to the Go SDK katas

2020-05-01 Thread GitBox


henryken commented on a change in pull request #11564:
URL: https://github.com/apache/beam/pull/11564#discussion_r418443043



##
File path: learning/katas/go/Core Transforms/Map/ParDo/pkg/task/task.go
##
@@ -18,8 +18,9 @@ package task
 import "github.com/apache/beam/sdks/go/pkg/beam"
 
 func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
-   processFn := func(element int) int {
-   return element * 10
-   }
-   return beam.ParDo(s, processFn, input)
+   return beam.ParDo(s, multiplyBy10Fn, input)
 }
+
+func multiplyBy10Fn(element int) int {
+   return element * 10

Review comment:
   The "* 10" is not covered by the answer placeholder. Is it intentional?
   
   
![image](https://user-images.githubusercontent.com/5459430/80789287-24764800-8bbe-11ea-9594-d279d633e9e7.png)
   

##
File path: learning/katas/go/Core Transforms/Map/lesson-info.yaml
##
@@ -20,5 +20,4 @@
 content:
 - ParDo
 - ParDo OneToMany
-- MapElements
-- FlatMapElements
+- ParDo struct

Review comment:
   Could we slightly rename this to "ParDo Struct"?

##
File path: learning/katas/go/Core Transforms/Map/ParDo struct/pkg/task/task.go
##
@@ -18,10 +18,7 @@ package task
 import "github.com/apache/beam/sdks/go/pkg/beam"
 
 func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
-   processFn := {
-   Factor: 5,
-   }
-   return beam.ParDo(s, processFn, input)
+   return beam.ParDo(s, {Factor: 5}, input)

Review comment:
   How about making the whole "{...}" part of the answer placeholder?
   
![image](https://user-images.githubusercontent.com/5459430/80789407-8171fe00-8bbe-11ea-8ec6-e8ed8989ec2e.png)
   

##
File path: learning/katas/go/Core Transforms/Map/ParDo struct/task.md
##
@@ -16,10 +16,10 @@
 specific language governing permissions and limitations
 under the License.
 -->
-# Mapping Elements using structs
+# Using a struct as a DoFn

Review comment:
   Can we have the ParDo mentioned, e.g. "ParDo - using a struct as a DoFn"?

##
File path: learning/katas/go/Core Transforms/Map/ParDo OneToMany/pkg/task/task.go
##
@@ -21,10 +21,10 @@ import (
 )
 
 func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
-   return beam.ParDo(s, processFn, input)
+   return beam.ParDo(s, tokenizeFn, input)
 }
 
-func processFn(input string, emit func(out string)) {
+func tokenizeFn(input string, emit func(out string)) {

Review comment:
   The answer placeholder(s) seem odd here. Maybe they need a fix?
   Alternatively, we could use an answer placeholder similar to the one in the "ParDo" task, i.e. leaving just an empty "func ApplyTransform"?
   
![image](https://user-images.githubusercontent.com/5459430/80789567-e88fb280-8bbe-11ea-9185-b87c867f1bee.png)
   









[GitHub] [beam] reuvenlax commented on a change in pull request #11456: [BEAM-7554] Add MillisInstant logical type to replace DATETIME

2020-05-01 Thread GitBox


reuvenlax commented on a change in pull request #11456:
URL: https://github.com/apache/beam/pull/11456#discussion_r418443258



##
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
##
@@ -257,10 +257,8 @@ private Transform(
   // Combining over a single field, so extract just that field.
   combined =
   (combined == null)
-  ? byFields.aggregateFieldBaseValue(
-  inputs.get(0), combineFn, fieldAggregation.outputField)
-  : combined.aggregateFieldBaseValue(
-  inputs.get(0), combineFn, fieldAggregation.outputField);
+  ? byFields.aggregateField(inputs.get(0), combineFn, fieldAggregation.outputField)
+  : combined.aggregateField(inputs.get(0), combineFn, fieldAggregation.outputField);

Review comment:
   I'm not sure I agree with this change. I think it's important that SQL work over user logical types by interpreting them as their base values. The user writing the SQL statement usually understands the base type of their logical type and can write the statement accordingly. This change would break that.
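To illustrate the point about base values, here is a minimal sketch built on a hypothetical, simplified logical-type interface. It is not Beam's `Schema.LogicalType`. In this sketch, an aggregation such as MAX only touches base values and never needs to know which logical type it is working over. `SimpleLogicalType`, `EpochMillisType`, and `BaseValueAggregation` are illustrative names.

```java
import java.time.Instant;

/** Hypothetical, simplified logical-type contract (not Beam's Schema.LogicalType). */
interface SimpleLogicalType<InputT, BaseT> {
  String identifier();

  BaseT toBaseType(InputT input);

  InputT toInputType(BaseT base);
}

/** Hypothetical logical type: an Instant whose base value is epoch milliseconds. */
final class EpochMillisType implements SimpleLogicalType<Instant, Long> {
  @Override
  public String identifier() {
    return "example:epoch_millis";
  }

  @Override
  public Long toBaseType(Instant input) {
    return input.toEpochMilli();
  }

  @Override
  public Instant toInputType(Long base) {
    return Instant.ofEpochMilli(base);
  }
}

final class BaseValueAggregation {
  private BaseValueAggregation() {}

  /** MAX computed purely on base values; the logical type is only used to convert. */
  static <InputT> long maxBaseValue(SimpleLogicalType<InputT, Long> type, Iterable<InputT> values) {
    long max = Long.MIN_VALUE;
    boolean sawValue = false;
    for (InputT value : values) {
      max = Math.max(max, type.toBaseType(value));
      sawValue = true;
    }
    if (!sawValue) {
      throw new IllegalArgumentException("MAX of an empty input is undefined here");
    }
    return max;
  }
}
```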

##
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
##
@@ -416,36 +417,33 @@ private static Expression value(
 
   Expression value =
   Expressions.convert_(
-  Expressions.call(
-  expression,
-  "getBaseValue",
-  Expressions.constant(index),
-  Expressions.constant(convertTo)),
-  convertTo);
+  Expressions.call(expression, "getValue", Expressions.constant(index)), convertTo);

Review comment:
   I have the same concern with this change.

##
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
##
@@ -419,7 +420,6 @@ public int hashCode() {
 FLOAT,
 DOUBLE,
 STRING, // String.
-DATETIME, // Date and time.

Review comment:
   I'm a little worried about this. Empirically, many users are using schemas. Maybe we should start off by leaving DATETIME in place and removing it later in a separate PR?
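One way to stage such a removal is sketched below on a hypothetical enum, not Beam's actual `Schema.TypeName`. The constant is kept but marked `@Deprecated`, so existing users get compiler warnings instead of breakage. The constant can then be deleted in a follow-up change.

```java
/** Hypothetical subset of a schema type-name enum, showing a deprecate-first removal path. */
enum ExampleTypeName {
  INT64,
  STRING,
  /**
   * @deprecated Hypothetical: kept so existing users keep compiling; prefer a logical type.
   *     Intended to be removed in a later change.
   */
  @Deprecated
  DATETIME,
  LOGICAL_TYPE
}
```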









[GitHub] [beam] rahul8383 commented on pull request #11581: [BEAM-8307] NPE in Calcite dialect when input PCollection has logical…

2020-05-01 Thread GitBox


rahul8383 commented on pull request #11581:
URL: https://github.com/apache/beam/pull/11581#issuecomment-622277282


   @reuvenlax 
   I have attached the source table schema and the NPE I encountered to [BEAM-8307](https://issues.apache.org/jira/browse/BEAM-8307).
   
   The NPE is thrown because no Calcite RelDataType can be found for the JdbcIO logical type.







[GitHub] [beam] reuvenlax commented on pull request #11581: [BEAM-8307] NPE in Calcite dialect when input PCollection has logical…

2020-05-01 Thread GitBox


reuvenlax commented on pull request #11581:
URL: https://github.com/apache/beam/pull/11581#issuecomment-622273882


   I'm wondering if this is a scalable solution. 
   
   In general, SQL is supposed to be able to handle unknown logical types and simply treat them as their base type. If you're seeing an NPE, maybe we need to fix that. Where do you see the NPE?







[GitHub] [beam] Akshay-Iyangar commented on pull request #10078: [BEAM-8542] Change write to async in AWS SNS IO & remove retry logic

2020-05-01 Thread GitBox


Akshay-Iyangar commented on pull request #10078:
URL: https://github.com/apache/beam/pull/10078#issuecomment-622269940


   @aromanenko-dev I have addressed the feedback, thanks.


