[GitHub] [beam] boyuanzz commented on pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner

2020-05-20 Thread GitBox


boyuanzz commented on pull request #11756:
URL: https://github.com/apache/beam/pull/11756#issuecomment-631891047


   Run Java Flink PortableValidatesRunner Streaming



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] boyuanzz commented on pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner

2020-05-20 Thread GitBox


boyuanzz commented on pull request #11756:
URL: https://github.com/apache/beam/pull/11756#issuecomment-631890990


   Run Java Flink PortableValidatesRunner Batch







[GitHub] [beam] tweise commented on pull request #11722: Bump Flink 1.10 version

2020-05-20 Thread GitBox


tweise commented on pull request #11722:
URL: https://github.com/apache/beam/pull/11722#issuecomment-631884230


   Unrelated test failure 
`org.apache.beam.sdk.extensions.ml.VideoIntelligenceIT.annotateVideoFromURINoContext`







[GitHub] [beam] tweise merged pull request #11722: Bump Flink 1.10 version

2020-05-20 Thread GitBox


tweise merged pull request #11722:
URL: https://github.com/apache/beam/pull/11722


   







[GitHub] [beam] boyuanzz commented on a change in pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner

2020-05-20 Thread GitBox


boyuanzz commented on a change in pull request #11756:
URL: https://github.com/apache/beam/pull/11756#discussion_r428438971



##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -962,16 +971,25 @@ private Progress getProgress() {
 .build());
   }
 
-  private  void processTimer(String timerId, TimeDomain timeDomain, Timer timer) {
+  private  void processTimer(
+  String timerIdOrTimerFamilyId, TimeDomain timeDomain, Timer timer) {
 currentTimer = timer;
 currentTimeDomain = timeDomain;
 onTimerContext = new OnTimerContext<>(timer.getUserKey());
+String timerId =
+timerIdOrTimerFamilyId.startsWith(TimerFamilyDeclaration.PREFIX)

Review comment:
   Ack. Thanks!

##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -1207,15 +1224,51 @@ private void output(Instant scheduledTime) {
 }
   }
 
-  private static class FnApiTimerMap implements TimerMap {
-FnApiTimerMap() {}
+  private class FnApiTimerMap implements TimerMap {
+private final String timerFamilyId;
+private final K userKey;
+private final TimeDomain timeDomain;
+private final Instant elementTimestampOrTimerHoldTimestamp;
+private final Instant elementTimestampOrTimerFireTimestamp;
+private final BoundedWindow boundedWindow;
+private final PaneInfo paneInfo;
+
+FnApiTimerMap(
+String timerFamilyId,
+K userKey,
+BoundedWindow boundedWindow,
+Instant elementTimestampOrTimerHoldTimestamp,
+Instant elementTimestampOrTimerFireTimestamp,
+PaneInfo paneInfo) {
+  this.timerFamilyId = timerFamilyId;
+  this.userKey = userKey;
+  this.elementTimestampOrTimerHoldTimestamp = elementTimestampOrTimerHoldTimestamp;
+  this.elementTimestampOrTimerFireTimestamp = elementTimestampOrTimerFireTimestamp;
+  this.boundedWindow = boundedWindow;
+  this.paneInfo = paneInfo;
+
+  TimerFamilyDeclaration timerFamilyDeclaration =
+  doFnSignature.timerFamilyDeclarations().get(timerFamilyId);
+  this.timeDomain =
+  DoFnSignatures.getTimerFamilySpecOrThrow(timerFamilyDeclaration, doFn).getTimeDomain();
+}
 
 @Override
-public void set(String timerId, Instant absoluteTime) {}
+public void set(String dynamicTimerTag, Instant absoluteTime) {

Review comment:
   I thought it would be a good time to revisit the dynamic timer API design, 
but that's out of scope for this PR. Let's leave it as is for now. Thanks!
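For readers following the timer-family change, here is a minimal plain-Java sketch of what `FnApiTimerMap` models: one timer family (constructed per user key, as the `FnApiTimerMap` constructor takes both `timerFamilyId` and `userKey`) holding many timers addressed by a dynamic tag, with `set(dynamicTimerTag, absoluteTime)` recording a firing time per tag, plus the prefix check `processTimer` uses to tell a timer-family id from a plain timer id. All names are hypothetical, the `"tfs-"` value is an assumption standing in for `TimerFamilyDeclaration.PREFIX`, times are epoch millis rather than Joda `Instant`s, and the sketch assumes re-setting a tag replaces its earlier firing time. It does not reproduce the harness, which hands timers to a timer client.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: one timer family (for one user key) whose timers are addressed
// by a dynamic tag. Not the Beam API; times are epoch millis instead of Joda Instants.
final class TimerFamilySketch {
  // Assumed prefix marking a timer-family id; stands in for TimerFamilyDeclaration.PREFIX,
  // whose real value may differ.
  static final String FAMILY_PREFIX = "tfs-";

  private final String timerFamilyId;
  // Latest firing time per dynamic tag; setting the same tag again overwrites it.
  private final Map<String, Long> firingTimes = new LinkedHashMap<>();

  TimerFamilySketch(String timerFamilyId) {
    this.timerFamilyId = timerFamilyId;
  }

  // Analogous in shape to TimerMap.set(dynamicTimerTag, absoluteTime).
  void set(String dynamicTimerTag, long absoluteTimeMillis) {
    firingTimes.put(dynamicTimerTag, absoluteTimeMillis);
  }

  // Returns the firing time for a tag, or null if that tag was never set.
  Long firingTime(String dynamicTimerTag) {
    return firingTimes.get(dynamicTimerTag);
  }

  String timerFamilyId() {
    return timerFamilyId;
  }

  // Classifies an incoming id the way the diff's startsWith(...) check does:
  // a prefixed id names a timer family, anything else is a plain timer id.
  static boolean isTimerFamilyId(String timerIdOrTimerFamilyId) {
    return timerIdOrTimerFamilyId.startsWith(FAMILY_PREFIX);
  }
}
```

Keeping one map per family and key mirrors the constructor arguments in the diff; a real runner would also need the hold timestamp, window, and pane that `FnApiTimerMap` carries.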

##
File path: 
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
##
@@ -947,49 +910,213 @@ public void testTimers() throws Exception {
 timerInGlobalWindow("A", new Instant(1600L), new Instant(10012L)),
 timerInGlobalWindow("X", new Instant(1700L), new Instant(10022L)),
 timerInGlobalWindow("C", new Instant(1800L), new Instant(10022L)),
-timerInGlobalWindow("B", new Instant(1900L), new Instant(10022L))));
+timerInGlobalWindow("B", new Instant(1900L), new Instant(10022L)),
+timerInGlobalWindow("B", new Instant(2000L), new Instant(10032L)),
+timerInGlobalWindow("Y", new Instant(2100L), new Instant(10042L))));
+assertThat(
+fakeTimerClient.getTimers(eventFamilyTimer),
+contains(
+timerInGlobalWindow("X", "event-timer1", new Instant(1000L), new Instant(1003L)),
+timerInGlobalWindow("Y", "event-timer1", new Instant(1100L), new Instant(1103L)),
+timerInGlobalWindow("X", "event-timer1", new Instant(1200L), new Instant(1203L)),
+timerInGlobalWindow("Y", "event-timer1", new Instant(1300L), new Instant(1303L)),
+timerInGlobalWindow("A", "event-timer1", new Instant(1400L), new Instant(2413L)),
+timerInGlobalWindow("B", "event-timer1", new Instant(1500L), new Instant(2513L)),
+timerInGlobalWindow("A", "event-timer1", new Instant(1600L), new Instant(2613L)),
+timerInGlobalWindow("X", "event-timer1", new Instant(1700L), new Instant(1723L)),
+timerInGlobalWindow("C", "event-timer1", new Instant(1800L), new Instant(1823L)),
+timerInGlobalWindow("B", "event-timer1", new Instant(1900L), new Instant(1923L)),
+timerInGlobalWindow("B", "event-timer1", new Instant(2000L), new Instant(2033L)),
+timerInGlobalWindow("Y", "event-timer1", new Instant(2100L), new Instant(2143L))));
+assertThat(
+fakeTimerClient.getTimers(processingFamilyTimer),
+contains(
+timerInGlobalWindow("X", "processing-timer1", new Instant(1000L), new Instant(10004L)),
+timerInGlobalWindow("Y", "processing-timer1", new Instant(1100L), new Instant(10004L)),
+timerInGlobalWindow("X", "processing-timer1", new Instant(1200L), new Instant(10004L)),

[GitHub] [beam] veblush commented on pull request #11651: [BEAM-8889] Upgrades gcsio to 2.1.3

2020-05-20 Thread GitBox


veblush commented on pull request #11651:
URL: https://github.com/apache/beam/pull/11651#issuecomment-631882467


   Thanks, Brian and Chamikara!







[GitHub] [beam] boyuanzz commented on pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner

2020-05-20 Thread GitBox


boyuanzz commented on pull request #11756:
URL: https://github.com/apache/beam/pull/11756#issuecomment-631882129


   retest all please







[GitHub] [beam] chamikaramj commented on pull request #11757: [BEAM-8019] Clarifies Dataflow execution environment model

2020-05-20 Thread GitBox


chamikaramj commented on pull request #11757:
URL: https://github.com/apache/beam/pull/11757#issuecomment-631880290











[GitHub] [beam] chamikaramj merged pull request #11651: [BEAM-8889] Upgrades gcsio to 2.1.3

2020-05-20 Thread GitBox


chamikaramj merged pull request #11651:
URL: https://github.com/apache/beam/pull/11651


   







[GitHub] [beam] tweise commented on pull request #11722: Bump Flink 1.10 version

2020-05-20 Thread GitBox


tweise commented on pull request #11722:
URL: https://github.com/apache/beam/pull/11722#issuecomment-631867326


   Run Java PreCommit







[GitHub] [beam] TheNeuralBit closed pull request #11769: [DO NOT MERGE] Start snapshot build for release process

2020-05-20 Thread GitBox


TheNeuralBit closed pull request #11769:
URL: https://github.com/apache/beam/pull/11769


   







[GitHub] [beam] TheNeuralBit commented on pull request #11770: [DO NOT MERGE] Run all PostCommit and PreCommit Tests against Release Branch

2020-05-20 Thread GitBox


TheNeuralBit commented on pull request #11770:
URL: https://github.com/apache/beam/pull/11770#issuecomment-631860763











[GitHub] [beam] TheNeuralBit commented on pull request #11770: [DO NOT MERGE] Run all PostCommit and PreCommit Tests against Release Branch

2020-05-20 Thread GitBox


TheNeuralBit commented on pull request #11770:
URL: https://github.com/apache/beam/pull/11770#issuecomment-631859266











[GitHub] [beam] TheNeuralBit opened a new pull request #11770: [DO NOT MERGE] Run all PostCommit and PreCommit Tests against Release Branch

2020-05-20 Thread GitBox


TheNeuralBit opened a new pull request #11770:
URL: https://github.com/apache/beam/pull/11770


   **Please** add a meaningful description for your change here
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   

   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/)
   Python | [![Build 

[GitHub] [beam] rionmonster edited a comment on pull request #11761: [BEAM-10027] Support for Kotlin-based Beam Katas

2020-05-20 Thread GitBox


rionmonster edited a comment on pull request #11761:
URL: https://github.com/apache/beam/pull/11761#issuecomment-631728007


   @henryken 
   
   I was able to export this course to Stepik and generate all of the 
expected new metadata files associated with it (e.g. the `*-remote-info.yaml` 
files). A draft of the course is now on the site and, as far as I can tell, it 
parallels the existing Java course as expected.
   
   You can [access the current iteration of the Kotlin course on Stepik as it 
stands here](https://stepik.org/course/72488); I'm currently working through it 
to make sure everything works as expected. I've fixed a handful of issues found 
after publication and included those fixes here as well (along with the updated 
`*-remote-info.yaml` files that match the new course). 
   
   Let me know if anything sounds off, or if there's a better path to follow 
for publishing the course, handling the course-related metadata in the repo, 
etc.
   
   Thanks again!







[GitHub] [beam] chamikaramj commented on pull request #11360: [BEAM-9722] added SnowflakeIO with Read operation

2020-05-20 Thread GitBox


chamikaramj commented on pull request #11360:
URL: https://github.com/apache/beam/pull/11360#issuecomment-631854098











[GitHub] [beam] chamikaramj commented on pull request #11360: [BEAM-9722] added SnowflakeIO with Read operation

2020-05-20 Thread GitBox


chamikaramj commented on pull request #11360:
URL: https://github.com/apache/beam/pull/11360#issuecomment-631854178


   Run Python2_PVR_Flink PreCommit







[GitHub] [beam] chamikaramj commented on pull request #11360: [BEAM-9722] added SnowflakeIO with Read operation

2020-05-20 Thread GitBox


chamikaramj commented on pull request #11360:
URL: https://github.com/apache/beam/pull/11360#issuecomment-631854055


   Run Python PreCommit







[GitHub] [beam] chamikaramj commented on pull request #11360: [BEAM-9722] added SnowflakeIO with Read operation

2020-05-20 Thread GitBox


chamikaramj commented on pull request #11360:
URL: https://github.com/apache/beam/pull/11360#issuecomment-631853980


   Run Python PreCommit







[GitHub] [beam] chamikaramj commented on pull request #11757: [BEAM-8019] Clarifies Dataflow execution environment model

2020-05-20 Thread GitBox


chamikaramj commented on pull request #11757:
URL: https://github.com/apache/beam/pull/11757#issuecomment-631853840


   Run Python PreCommit







[GitHub] [beam] lukecwik merged pull request #11746: [BEAM-10028] Add support for the state backed iterable coder to the Java SDK harness.

2020-05-20 Thread GitBox


lukecwik merged pull request #11746:
URL: https://github.com/apache/beam/pull/11746


   







[GitHub] [beam] aaltay commented on pull request #11682: [BEAM-9946] | added new api in Partition Transform

2020-05-20 Thread GitBox


aaltay commented on pull request #11682:
URL: https://github.com/apache/beam/pull/11682#issuecomment-631853304


   retest this please







[GitHub] [beam] TheNeuralBit commented on pull request #11769: [DO NOT MERGE] Start snapshot build for release process

2020-05-20 Thread GitBox


TheNeuralBit commented on pull request #11769:
URL: https://github.com/apache/beam/pull/11769#issuecomment-631849940


   Run Gradle Publish







[GitHub] [beam] TheNeuralBit opened a new pull request #11769: [DO NOT MERGE] Start snapshot build for release process

2020-05-20 Thread GitBox


TheNeuralBit opened a new pull request #11769:
URL: https://github.com/apache/beam/pull/11769


   **Please** add a meaningful description for your change here
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   

   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/)
   Python | [![Build 

[GitHub] [beam] TheNeuralBit commented on pull request #11769: [DO NOT MERGE] Start snapshot build for release process

2020-05-20 Thread GitBox


TheNeuralBit commented on pull request #11769:
URL: https://github.com/apache/beam/pull/11769#issuecomment-631838720


   Run Gradle Publish







[GitHub] [beam] tweise commented on pull request #11722: Bump Flink 1.10 version

2020-05-20 Thread GitBox


tweise commented on pull request #11722:
URL: https://github.com/apache/beam/pull/11722#issuecomment-631828652


   Run Java PreCommit







[GitHub] [beam] darshanj commented on a change in pull request #11682: [BEAM-9946] | added new api in Partition Transform

2020-05-20 Thread GitBox


darshanj commented on a change in pull request #11682:
URL: https://github.com/apache/beam/pull/11682#discussion_r428399929



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
##
@@ -124,21 +190,26 @@ private Partition(PartitionDoFn partitionDoFn) {
 
   private static class PartitionDoFn extends DoFn {
 private final int numPartitions;
-private final PartitionFn partitionFn;
 private final TupleTagList outputTags;
+private Contextful> ctxFn;
+private Object originalFnForDisplayData;
 
 /**
  * Constructs a PartitionDoFn.
  *
  * @throws IllegalArgumentException if {@code numPartitions <= 0}
  */
-public PartitionDoFn(int numPartitions, PartitionFn partitionFn) {
+public PartitionDoFn(
+int numPartitions,
+Contextful> ctxFn,
+Object originalFnForDisplayData) {

Review comment:
   Done, thanks.
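The `PartitionDoFn` Javadoc above says construction fails with `IllegalArgumentException` when `numPartitions <= 0`, and a partition function is expected to return an index in `[0..numPartitions-1]`. Below is a hypothetical plain-Java sketch of both checks. The class name is invented, and `ToIntBiFunction` stands in for the partition function. The real DoFn emits each element to one of its `outputTags` rather than returning an index, and the exception type used for a bad index here is an assumption.

```java
import java.util.function.ToIntBiFunction;

// Hypothetical sketch of the invariants around PartitionDoFn; not Beam code.
final class PartitionRouterSketch<T> {
  private final int numPartitions;
  // (element, numPartitions) -> partition index, like PartitionFn.partitionFor.
  private final ToIntBiFunction<T, Integer> partitionFn;

  PartitionRouterSketch(int numPartitions, ToIntBiFunction<T, Integer> partitionFn) {
    // Mirrors the documented "@throws IllegalArgumentException if numPartitions <= 0".
    if (numPartitions <= 0) {
      throw new IllegalArgumentException("numPartitions must be > 0, got " + numPartitions);
    }
    this.numPartitions = numPartitions;
    this.partitionFn = partitionFn;
  }

  // Returns the output index for an element, rejecting results outside [0..numPartitions-1].
  int route(T element) {
    int partition = partitionFn.applyAsInt(element, numPartitions);
    if (partition < 0 || partition >= numPartitions) {
      throw new IndexOutOfBoundsException(
          "Partition function returned " + partition
              + ", not in [0.." + (numPartitions - 1) + "]");
    }
    return partition;
  }
}
```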









[GitHub] [beam] darshanj commented on a change in pull request #11682: [BEAM-9946] | added new api in Partition Transform

2020-05-20 Thread GitBox


darshanj commented on a change in pull request #11682:
URL: https://github.com/apache/beam/pull/11682#discussion_r428399738



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
##
@@ -124,21 +190,26 @@ private Partition(PartitionDoFn partitionDoFn) {
 
   private static class PartitionDoFn extends DoFn {
 private final int numPartitions;
-private final PartitionFn partitionFn;
 private final TupleTagList outputTags;
+private Contextful> ctxFn;

Review comment:
   Done, thanks.









[GitHub] [beam] darshanj commented on a change in pull request #11682: [BEAM-9946] | added new api in Partition Transform

2020-05-20 Thread GitBox


darshanj commented on a change in pull request #11682:
URL: https://github.com/apache/beam/pull/11682#discussion_r428399634



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
##
@@ -85,7 +141,14 @@
* @throws IllegalArgumentException if {@code numPartitions <= 0}
*/
   public static  Partition of(int numPartitions, PartitionFn partitionFn) {
-return new Partition<>(new PartitionDoFn(numPartitions, partitionFn));
+
+Contextful ctfFn =
+Contextful.fn(
+(T element, Contextful.Fn.Context c) ->
+partitionFn.partitionFor(element, numPartitions),
+Requirements.empty());
+Object aClass = partitionFn;

Review comment:
   Hi, I don't follow your suggestion here. I need to wrap the interface 
function in a `Contextful.Fn`. Could you elaborate, please?
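The wrapping discussed here adapts the two-argument `PartitionFn` to a context-taking shape so both `Partition.of` overloads can share one code path. A plain-Java sketch of that adapter follows. The interfaces are hypothetical stand-ins for `PartitionFn` and a context-taking `Contextful.Fn`, with `Object` in place of `Contextful.Fn.Context`. The diff itself does the wrapping with `Contextful.fn(...)` and `Requirements.empty()`.

```java
// Hypothetical stand-in for Partition.PartitionFn (two arguments, no context).
interface TwoArgPartitionFn<T> {
  int partitionFor(T elem, int numPartitions);
}

// Hypothetical stand-in for a context-taking function; Object replaces Contextful.Fn.Context.
interface ContextualPartitionFn<T> {
  int partitionFor(T elem, int numPartitions, Object context);
}

final class PartitionFnAdapter {
  private PartitionFnAdapter() {}

  // Wraps a two-argument function so it fits the context-taking shape.
  // The context is ignored because a plain PartitionFn needs no side inputs.
  static <T> ContextualPartitionFn<T> ignoringContext(TwoArgPartitionFn<T> fn) {
    return (elem, numPartitions, context) -> fn.partitionFor(elem, numPartitions);
  }
}
```

With this shape, the DoFn only ever stores and calls the context-taking form, and the original function object can still be kept separately for display data, as `originalFnForDisplayData` does in the diff.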









[GitHub] [beam] darshanj commented on a change in pull request #11682: [BEAM-9946] | added new api in Partition Transform

2020-05-20 Thread GitBox


darshanj commented on a change in pull request #11682:
URL: https://github.com/apache/beam/pull/11682#discussion_r428399671



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
##
@@ -76,6 +93,45 @@
 int partitionFor(T elem, int numPartitions);
   }
 
+  /**
+   * A function object that chooses an output partition for an element.
+   *
+   * @param  the type of the elements being partitioned
+   */
+  public interface PartitionWithSideInputsFn extends Serializable {
+/**
+ * Chooses the partition into which to put the given element.
+ *
+ * @param elem the element to be partitioned
+ * @param numPartitions the total number of partitions ({@code >= 1})
+ * @param c the {@link Contextful.Fn.Context} needed to access sideInputs.
+ * @return index of the selected partition (in the range {@code [0..numPartitions-1]})
+ */
+int partitionFor(T elem, int numPartitions, Contextful.Fn.Context c);
+  }
+
+  /**
+   * Returns a new {@code Partition} {@code PTransform} that divides its input {@code PCollection}
+   * into the given number of partitions, using the given partitioning function.
+   *
+   * @param numPartitions the number of partitions to divide the input {@code PCollection} into
+   * @param partitionFn the function to invoke on each element to choose its output partition
+   * @param requirements the {@link Requirements} needed to run it.
+   * @throws IllegalArgumentException if {@code numPartitions <= 0}
+   */
+  public static  Partition of(
+  int numPartitions,
+  PartitionWithSideInputsFn partitionFn,
+  Requirements requirements) {
+Contextful ctfFn =
+Contextful.fn(
+(T element, Contextful.Fn.Context c) ->
+partitionFn.partitionFor(element, numPartitions, c),
+requirements);
+Object aClass = partitionFn;

Review comment:
   Hi, I don't understand your suggestion here. I need to wrap the interface function in Contextful.Fn. Can you elaborate, please?
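The wrapping discussed here can be illustrated without the Beam SDK: the three-argument partition function is adapted to a two-argument (element, context) function by closing over `numPartitions`, which is what the lambda passed to `Contextful.fn` in the diff does. The following is a minimal plain-Java sketch under assumed names; `SideContext`, `ContextFn`, `PartitionWithContextFn` and `PartitionAdapter` are hypothetical stand-ins for `Contextful.Fn.Context`, `Contextful.Fn` and `PartitionWithSideInputsFn`, not Beam's API.

```java
import java.io.Serializable;

// Hypothetical stand-in for Contextful.Fn.Context: exposes side-input values by name.
interface SideContext {
  Object sideInput(String name);
}

// Hypothetical stand-in for Contextful.Fn<InputT, OutputT> (two-argument shape).
interface ContextFn<InputT, OutputT> extends Serializable {
  OutputT apply(InputT element, SideContext c);
}

// Hypothetical mirror of the three-argument PartitionWithSideInputsFn from the diff.
interface PartitionWithContextFn<T> extends Serializable {
  int partitionFor(T elem, int numPartitions, SideContext c);
}

final class PartitionAdapter {
  private PartitionAdapter() {}

  // Closes over numPartitions so the user function fits the two-argument shape.
  static <T> ContextFn<T, Integer> wrap(PartitionWithContextFn<T> fn, int numPartitions) {
    if (numPartitions <= 0) {
      throw new IllegalArgumentException("numPartitions must be > 0, got " + numPartitions);
    }
    return (element, c) -> fn.partitionFor(element, numPartitions, c);
  }
}
```

For example, a partition function that reads a threshold from the context can route smaller elements to partition 0 and the rest to partition 1; the `numPartitions <= 0` check mirrors the `@throws IllegalArgumentException` contract documented for `Partition.of`.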









[GitHub] [beam] darshanj commented on a change in pull request #11682: [BEAM-9946] | added new api in Partition Transform

2020-05-20 Thread GitBox


darshanj commented on a change in pull request #11682:
URL: https://github.com/apache/beam/pull/11682#discussion_r428399770



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
##
@@ -124,21 +190,26 @@ private Partition(PartitionDoFn partitionDoFn) {
 
   private static class PartitionDoFn extends DoFn {
 private final int numPartitions;
-private final PartitionFn partitionFn;
 private final TupleTagList outputTags;
+private Contextful> ctxFn;
+private Object originalFnForDisplayData;
 
 /**
  * Constructs a PartitionDoFn.
  *
  * @throws IllegalArgumentException if {@code numPartitions <= 0}
  */
-public PartitionDoFn(int numPartitions, PartitionFn partitionFn) {
+public PartitionDoFn(

Review comment:
   Done. Thanks.

##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
##
@@ -124,21 +190,26 @@ private Partition(PartitionDoFn partitionDoFn) {
 
   private static class PartitionDoFn extends DoFn {
 private final int numPartitions;
-private final PartitionFn partitionFn;
 private final TupleTagList outputTags;
+private Contextful> ctxFn;
+private Object originalFnForDisplayData;

Review comment:
   Done. Thanks.









[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner

2020-05-20 Thread GitBox


pulasthi commented on a change in pull request #10888:
URL: https://github.com/apache/beam/pull/10888#discussion_r428399477



##
File path: 
runners/twister2/src/main/java/org/apache/beam/runners/twister2/BeamBatchWorker.java
##
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.twister2;
+
+import edu.iu.dsc.tws.api.config.Config;
+import edu.iu.dsc.tws.api.tset.TBase;
+import edu.iu.dsc.tws.api.tset.sets.TSet;
+import edu.iu.dsc.tws.api.tset.sets.batch.BatchTSet;
+import edu.iu.dsc.tws.tset.TBaseGraph;
+import edu.iu.dsc.tws.tset.env.BatchTSetEnvironment;
+import edu.iu.dsc.tws.tset.links.BaseTLink;
+import edu.iu.dsc.tws.tset.sets.BaseTSet;
+import edu.iu.dsc.tws.tset.sets.BuildableTSet;
+import edu.iu.dsc.tws.tset.sets.batch.CachedTSet;
+import edu.iu.dsc.tws.tset.sets.batch.ComputeTSet;
+import edu.iu.dsc.tws.tset.sets.batch.SinkTSet;
+import edu.iu.dsc.tws.tset.worker.BatchTSetIWorker;
+import java.io.Serializable;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.runners.twister2.translators.functions.DoFnFunction;
+import org.apache.beam.runners.twister2.translators.functions.Twister2SinkFunction;
+
+/**
+ * The Twister2 worker that will execute the job logic once the job is submitted from the run
+ * method.
+ */
+public class BeamBatchWorker implements Serializable, BatchTSetIWorker {
+
+  private static final String SIDEINPUTS = "sideInputs";
+  private static final String LEAVES = "leaves";
+  private static final String GRAPH = "graph";
+  private HashMap> sideInputDataSets;
+  private Set leaves;

Review comment:
   I might need a little time on this to make sure I get the type information correct. Would it be OK to create an improvement issue for this for now, so I can come back to it after the code is merged?









[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner

2020-05-20 Thread GitBox


pulasthi commented on a change in pull request #10888:
URL: https://github.com/apache/beam/pull/10888#discussion_r428399112



##
File path: 
runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2PipelineResult.java
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.twister2;
+
+import java.io.IOException;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.joda.time.Duration;
+
+/** Represents a Twister2 pipeline execution result. */
+public class Twister2PipelineResult implements PipelineResult {
+
+  PipelineResult.State state = State.RUNNING;
+
+  @Override
+  public State getState() {
+return state;
+  }
+
+  @Override
+  public State cancel() throws IOException {
+throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override
+  public State waitUntilFinish(Duration duration) {
+return State.DONE;

Review comment:
   Those operations are not currently supported; I forgot to mark them as such. Fixed them.
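Marking unsupported operations explicitly, as `cancel()` already does in the diff, avoids a `waitUntilFinish` that reports `DONE` regardless of the job's real state. A minimal sketch of that pattern follows; `RunState` and `UnsupportedOpsResult` are hypothetical stand-ins for `PipelineResult.State` and the runner's result class, and `java.time.Duration` is used in place of Joda's `Duration` so the example has no dependencies. It is not the actual change made in the PR.

```java
import java.time.Duration;

// Hypothetical stand-in for PipelineResult.State.
enum RunState {
  RUNNING,
  DONE
}

// Hypothetical result class illustrating the pattern; not Beam's PipelineResult.
final class UnsupportedOpsResult {
  private final RunState state;

  UnsupportedOpsResult(RunState state) {
    this.state = state;
  }

  RunState getState() {
    return state;
  }

  // Fail loudly instead of pretending the job was cancelled.
  RunState cancel() {
    throw new UnsupportedOperationException("cancel() is not supported by this runner");
  }

  // Fail loudly instead of unconditionally reporting DONE.
  RunState waitUntilFinish(Duration duration) {
    throw new UnsupportedOperationException(
        "waitUntilFinish(Duration) is not supported by this runner");
  }
}
```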









[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner

2020-05-20 Thread GitBox


pulasthi commented on a change in pull request #10888:
URL: https://github.com/apache/beam/pull/10888#discussion_r428397620



##
File path: 
runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2LegacyRunner.java
##
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.twister2;
+
+import static org.apache.beam.runners.core.construction.resources.PipelineResources.detectClassPathResourcesToStage;
+
+import edu.iu.dsc.tws.api.JobConfig;
+import edu.iu.dsc.tws.api.Twister2Job;
+import edu.iu.dsc.tws.api.config.Config;
+import edu.iu.dsc.tws.api.driver.DriverJobState;
+import edu.iu.dsc.tws.api.exceptions.Twister2RuntimeException;
+import edu.iu.dsc.tws.api.scheduler.Twister2JobState;
+import edu.iu.dsc.tws.api.tset.sets.TSet;
+import edu.iu.dsc.tws.api.tset.sets.batch.BatchTSet;
+import edu.iu.dsc.tws.local.LocalSubmitter;
+import edu.iu.dsc.tws.rsched.core.ResourceAllocator;
+import edu.iu.dsc.tws.rsched.job.Twister2Submitter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.logging.LogManager;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.SplittableParDo;
+import org.apache.beam.runners.core.construction.SplittableParDoNaiveBounded;
+import org.apache.beam.runners.core.construction.resources.PipelineResources;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+
+/**
+ * A {@link PipelineRunner} that executes the operations in the pipeline by first translating them
+ * to a Twister2 Plan and then executing them either locally or on a Twister2 cluster, depending on
+ */
+public class Twister2LegacyRunner extends PipelineRunner {
+
+  private static final Logger LOG = Logger.getLogger(Twister2LegacyRunner.class.getName());
+  private static final String SIDEINPUTS = "sideInputs";
+  private static final String LEAVES = "leaves";
+  private static final String GRAPH = "graph";
+  /** Provided options. */
+  private final Twister2PipelineOptions options;
+
+  public Twister2LegacyRunner(Twister2PipelineOptions options) {

Review comment:
   Changed to protected 
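Making the constructor protected steers callers through the static `fromOptions` factory, which is where the runner validates the options and defaults `filesToStage` to the detected classpath. A dependency-free sketch of that shape follows; `RunnerOptionsSketch` and `RunnerSketch` are hypothetical, options validation is omitted, and classpath detection is replaced by a caller-supplied list.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for Twister2PipelineOptions; not the real options interface.
final class RunnerOptionsSketch {
  private List<String> filesToStage; // null means "not specified"

  List<String> getFilesToStage() {
    return filesToStage;
  }

  void setFilesToStage(List<String> files) {
    this.filesToStage = files;
  }
}

class RunnerSketch {
  private final RunnerOptionsSketch options;

  // Protected: outside the package only subclasses can call it; others use fromOptions().
  protected RunnerSketch(RunnerOptionsSketch options) {
    this.options = options;
  }

  // Factory that fills in defaults before constructing the runner.
  static RunnerSketch fromOptions(RunnerOptionsSketch options, List<String> classpathDefaults) {
    if (options.getFilesToStage() == null) {
      options.setFilesToStage(classpathDefaults);
    }
    return new RunnerSketch(options);
  }

  List<String> filesToStage() {
    return Collections.unmodifiableList(options.getFilesToStage());
  }
}
```

Explicitly configured files are left untouched; only a missing `filesToStage` is replaced by the defaults.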









[GitHub] [beam] TheNeuralBit commented on pull request #11651: [BEAM-8889] Upgrades gcsio to 2.1.3

2020-05-20 Thread GitBox


TheNeuralBit commented on pull request #11651:
URL: https://github.com/apache/beam/pull/11651#issuecomment-631825554


   Run Dataflow ValidatesRunner







[GitHub] [beam] TheNeuralBit merged pull request #11529: [BEAM-9822] Simplify pipeline when batching is disabled.

2020-05-20 Thread GitBox


TheNeuralBit merged pull request #11529:
URL: https://github.com/apache/beam/pull/11529


   







[GitHub] [beam] veblush commented on pull request #11651: [BEAM-8889] Upgrades gcsio to 2.1.3

2020-05-20 Thread GitBox


veblush commented on pull request #11651:
URL: https://github.com/apache/beam/pull/11651#issuecomment-631815316


   Thanks for running the test. The result doesn't make sense, because this change doesn't introduce any runtime behavior beyond the dependency upgrade. Could it be caused by a flaky environment? Can we rerun the test?







[GitHub] [beam] lostluck commented on pull request #11768: [BEAM-10051] Move closed reader check after sentinel.

2020-05-20 Thread GitBox


lostluck commented on pull request #11768:
URL: https://github.com/apache/beam/pull/11768#issuecomment-631812306


   R: @youngoli 







[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner

2020-05-20 Thread GitBox


pulasthi commented on a change in pull request #10888:
URL: https://github.com/apache/beam/pull/10888#discussion_r428385521



##
File path: 
runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/AssignWindowsFunction.java
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.twister2.translators.functions;
+
+import edu.iu.dsc.tws.api.tset.TSetContext;
+import edu.iu.dsc.tws.api.tset.fn.ComputeCollectorFunc;
+import edu.iu.dsc.tws.api.tset.fn.RecordCollector;
+import java.io.ObjectStreamException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.logging.Logger;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.Environments;
+import org.apache.beam.runners.core.construction.SdkComponents;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.runners.twister2.utils.Twister2AssignContext;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.InvalidProtocolBufferException;
+
+/** Assign Windows function. */
+public class AssignWindowsFunction
+implements ComputeCollectorFunc, Iterator>> {
+  private static final Logger LOG = Logger.getLogger(AssignWindowsFunction.class.getName());
+
+  private transient boolean isInitilized = false;

Review comment:
   Thank you for catching that; fixed all the occurrences.









[GitHub] [beam] tweise commented on pull request #11722: Bump Flink 1.10 version

2020-05-20 Thread GitBox


tweise commented on pull request #11722:
URL: https://github.com/apache/beam/pull/11722#issuecomment-631811467


   Run Java PreCommit







[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner

2020-05-20 Thread GitBox


pulasthi commented on a change in pull request #10888:
URL: https://github.com/apache/beam/pull/10888#discussion_r428384829



##
File path: 
runners/twister2/src/main/java/org/apache/beam/runners/twister2/package-info.java
##
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Internal implementation of the Beam runner for Apache Flink. */

Review comment:
   I wonder how I missed that; I was sure I had searched for Spark and Flink :smile:. Fixed all the instances.









[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner

2020-05-20 Thread GitBox


pulasthi commented on a change in pull request #10888:
URL: https://github.com/apache/beam/pull/10888#discussion_r428384075



##
File path: 
runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2StreamTranslationContext.java
##
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.twister2;
+
+/** Twister2StreamTranslationContext. */
+public class Twister2StreamTranslationContext {}

Review comment:
   Yes, it is :smile:. There was a duplicate class; fixed it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner

2020-05-20 Thread GitBox


pulasthi commented on a change in pull request #10888:
URL: https://github.com/apache/beam/pull/10888#discussion_r428381476



##
File path: runners/twister2/build.gradle
##
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import groovy.json.JsonOutput
+
+plugins { id 'org.apache.beam.module' }
+
+applyJavaNature(automaticModuleName: 'org.apache.beam.runners.twister2')
+evaluationDependsOn(":sdks:java:core")
+configurations {
+validatesRunner
+}
+description = "Apache Beam :: Runners :: Twister2"
+
+repositories {

Review comment:
   Thanks for catching that; removed it.









[GitHub] [beam] pabloem merged pull request #11339: [BEAM-9468] Fhir io

2020-05-20 Thread GitBox


pabloem merged pull request #11339:
URL: https://github.com/apache/beam/pull/11339


   







[GitHub] [beam] lostluck opened a new pull request #11768: [BEAM-10051] Move closed reader check after sentinel.

2020-05-20 Thread GitBox


lostluck opened a new pull request #11768:
URL: https://github.com/apache/beam/pull/11768


   The closed reader check, in its current position, prevents the "normal teardown" that the reader expects. This means that readers for instructions that terminate early, such as due to splitting, stay resident in memory and never close.
   
   In practice this is benign, as the buffer would already be closed, but with streaming this memory leak would become noticeable.
   
   This PR moves the check to after the sentinel check, and additionally checks there for early termination to avoid closing the buffer twice.
   
   Since this largely affects the goroutine local cache, no test changes are 
possible.
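The ordering change can be sketched in Java (the PR itself touches the Go SDK): the reader checks for the end-of-stream sentinel before the closed flag, so teardown (deregistering from the cache) always runs, and the sentinel branch only closes the buffer if it was not already closed early. Everything here is hypothetical: `ChunkReader`, the identity-compared `SENTINEL`, the map standing in for the local reader cache, and the choice to drain leftover chunks after an early close. It is not the Go SDK's code.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;

// Hypothetical reader illustrating the check ordering; not the Go SDK implementation.
final class ChunkReader {
  // End-of-stream marker, compared by identity.
  static final byte[] SENTINEL = new byte[0];

  private final String instructionId;
  private final Map<String, ChunkReader> cache; // stands in for the local reader cache
  private final Deque<byte[]> buffer = new ArrayDeque<>();
  private boolean closed;
  private int closeCount; // times the buffer was closed; should never exceed 1

  ChunkReader(String instructionId, Map<String, ChunkReader> cache) {
    this.instructionId = instructionId;
    this.cache = cache;
    cache.put(instructionId, this);
  }

  void offer(byte[] chunk) {
    buffer.addLast(chunk);
  }

  // Early termination, e.g. after a split.
  void close() {
    closeBufferOnce();
  }

  int closeCount() {
    return closeCount;
  }

  /** Returns the next data chunk, or null when no more data will be returned. */
  byte[] read() {
    while (true) {
      byte[] next = buffer.pollFirst();
      if (next == null) {
        return null; // nothing buffered; a real reader would block here
      }
      // Sentinel check first: normal teardown runs even for early-closed readers.
      if (next == SENTINEL) {
        closeBufferOnce(); // no second close if close() already ran
        cache.remove(instructionId);
        return null;
      }
      // Closed check second: drop remaining data but keep draining toward the sentinel.
      if (closed) {
        continue;
      }
      return next;
    }
  }

  private void closeBufferOnce() {
    if (!closed) {
      closed = true;
      closeCount++;
    }
  }
}
```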
   
   
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
- [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   

[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner

2020-05-20 Thread GitBox


pulasthi commented on a change in pull request #10888:
URL: https://github.com/apache/beam/pull/10888#discussion_r428380317



##
File path: 
runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2LegacyRunner.java
##
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.twister2;
+
+import static org.apache.beam.runners.core.construction.resources.PipelineResources.detectClassPathResourcesToStage;
+
+import edu.iu.dsc.tws.api.JobConfig;
+import edu.iu.dsc.tws.api.Twister2Job;
+import edu.iu.dsc.tws.api.config.Config;
+import edu.iu.dsc.tws.api.driver.DriverJobState;
+import edu.iu.dsc.tws.api.exceptions.Twister2RuntimeException;
+import edu.iu.dsc.tws.api.scheduler.Twister2JobState;
+import edu.iu.dsc.tws.api.tset.sets.TSet;
+import edu.iu.dsc.tws.api.tset.sets.batch.BatchTSet;
+import edu.iu.dsc.tws.local.LocalSubmitter;
+import edu.iu.dsc.tws.rsched.core.ResourceAllocator;
+import edu.iu.dsc.tws.rsched.job.Twister2Submitter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.logging.LogManager;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.SplittableParDo;
+import org.apache.beam.runners.core.construction.SplittableParDoNaiveBounded;
+import org.apache.beam.runners.core.construction.resources.PipelineResources;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+
+/**
+ * A {@link PipelineRunner} that executes the operations in the pipeline by first translating them
+ * to a Twister2 Plan and then executing them either locally or on a Twister2 cluster, depending on
+ */
+public class Twister2LegacyRunner extends PipelineRunner {
+
+  private static final Logger LOG = Logger.getLogger(Twister2LegacyRunner.class.getName());
+  private static final String SIDEINPUTS = "sideInputs";
+  private static final String LEAVES = "leaves";
+  private static final String GRAPH = "graph";
+  /** Provided options. */
+  private final Twister2PipelineOptions options;
+
+  public Twister2LegacyRunner(Twister2PipelineOptions options) {
+    this.options = options;
+  }
+
+  public static Twister2LegacyRunner fromOptions(PipelineOptions options) {
+    Twister2PipelineOptions pipelineOptions =
+        PipelineOptionsValidator.validate(Twister2PipelineOptions.class, options);
+    if (pipelineOptions.getFilesToStage() == null) {
+      pipelineOptions.setFilesToStage(
+          detectClassPathResourcesToStage(
+              Twister2LegacyRunner.class.getClassLoader(), pipelineOptions));
+      LOG.info(
+          "PipelineOptions.filesToStage was not specified. "
+              + "Defaulting to files from the classpath: will stage {} files. "
+              + "Enable logging at DEBUG level to see which files will be staged"
+              + pipelineOptions.getFilesToStage().size());
+    }
+    return new Twister2LegacyRunner(pipelineOptions);
+  }
+
+  @Override
+  public PipelineResult run(Pipeline pipeline) {
+    // create a worker and pass in the pipeline and then do the translation
+    Twister2PipelineExecutionEnvironment env = new Twister2PipelineExecutionEnvironment(options);
+LOG.info("Translating pipeline to 
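
A note on the `LOG.info(...)` call in `fromOptions` above: it uses an SLF4J-style `{}` placeholder with a `java.util.logging.Logger`. `Logger.info(String)` performs no parameter substitution, so the logged text keeps the literal `{}` and the file count is glued directly onto "staged". The following is a minimal stdlib-only sketch, assuming the intent is to log the staged-file count: JUL substitutes MessageFormat-style `{0}` parameters passed through `Logger.log(Level, String, Object)`, and it has no DEBUG level (the closest is `FINE`). The class `StagingLogDemo` and its helpers are hypothetical, not part of the PR.

```java
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;

public class StagingLogDemo {
  private static final Logger LOG = Logger.getLogger(StagingLogDemo.class.getName());

  // MessageFormat-style placeholder: {0} is replaced by the first parameter.
  // Avoid apostrophes here; MessageFormat treats ' as a quote character.
  static final String STAGING_MESSAGE =
      "PipelineOptions.filesToStage was not specified. "
          + "Defaulting to files from the classpath: will stage {0} files. "
          + "Enable logging at FINE level to see which files will be staged.";

  /** Formats a pattern the way a JUL Formatter would, so the result can be inspected. */
  static String render(String pattern, Object... params) {
    LogRecord record = new LogRecord(Level.INFO, pattern);
    record.setParameters(params);
    return new SimpleFormatter().formatMessage(record);
  }

  /** Logs the staged-file count as a parameter instead of concatenating it. */
  static void logStagedFiles(int fileCount) {
    LOG.log(Level.INFO, STAGING_MESSAGE, fileCount);
  }

  public static void main(String[] args) {
    logStagedFiles(42);
    // The rendered message contains "will stage 42 files".
    System.out.println(render(STAGING_MESSAGE, 42));
    // An SLF4J-style {} placeholder is left as-is by JUL.
    System.out.println(render("will stage {} files", 42));
  }
}
```

Passing the count as a parameter also defers formatting until a handler actually publishes the record.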

[GitHub] [beam] jaketf commented on pull request #11339: [BEAM-9468] Fhir io

2020-05-20 Thread GitBox


jaketf commented on pull request #11339:
URL: https://github.com/apache/beam/pull/11339#issuecomment-631805319


    



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner

2020-05-20 Thread GitBox


pulasthi commented on a change in pull request #10888:
URL: https://github.com/apache/beam/pull/10888#discussion_r428378347



##
File path: 
runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2PipelineOptions.java
##
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.twister2;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import edu.iu.dsc.tws.tset.env.TSetEnvironment;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
+
+/** Twister2PipelineOptions. */
+public interface Twister2PipelineOptions extends PipelineOptions, StreamingOptions {
+  @Description("set unique application name for Twister2 runner")

Review comment:
   Was not aware of that convention, thanks for pointing that out. Reformatted according to this rule.









[GitHub] [beam] apilloud commented on a change in pull request #11682: [BEAM-9946] | added new api in Partition Transform

2020-05-20 Thread GitBox


apilloud commented on a change in pull request #11682:
URL: https://github.com/apache/beam/pull/11682#discussion_r428371418



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
##
@@ -124,21 +190,26 @@ private Partition(PartitionDoFn partitionDoFn) {
 
   private static class PartitionDoFn extends DoFn {
     private final int numPartitions;
-    private final PartitionFn partitionFn;
     private final TupleTagList outputTags;
+    private Contextful> ctxFn;

Review comment:
   nit: this can be `final`

##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
##
@@ -85,7 +141,14 @@
* @throws IllegalArgumentException if {@code numPartitions <= 0}
*/
   public static  Partition of(int numPartitions, PartitionFn partitionFn) {
-    return new Partition<>(new PartitionDoFn(numPartitions, partitionFn));
+
+    Contextful ctfFn =
+        Contextful.fn(
+            (T element, Contextful.Fn.Context c) ->
+                partitionFn.partitionFor(element, numPartitions),
+            Requirements.empty());
+    Object aClass = partitionFn;

Review comment:
   This is a no-op, please remove.

##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
##
@@ -76,6 +93,45 @@
 int partitionFor(T elem, int numPartitions);
   }
 
+  /**
+   * A function object that chooses an output partition for an element.
+   *
+   * @param  the type of the elements being partitioned
+   */
+  public interface PartitionWithSideInputsFn extends Serializable {
+    /**
+     * Chooses the partition into which to put the given element.
+     *
+     * @param elem the element to be partitioned
+     * @param numPartitions the total number of partitions ({@code >= 1})
+     * @param c the {@link Contextful.Fn.Context} needed to access sideInputs.
+     * @return index of the selected partition (in the range {@code [0..numPartitions-1]})
+     */
+    int partitionFor(T elem, int numPartitions, Contextful.Fn.Context c);
+  }
+
+  /**
+   * Returns a new {@code Partition} {@code PTransform} that divides its input {@code PCollection}
+   * into the given number of partitions, using the given partitioning function.
+   *
+   * @param numPartitions the number of partitions to divide the input {@code PCollection} into
+   * @param partitionFn the function to invoke on each element to choose its output partition
+   * @param requirements the {@link Requirements} needed to run it.
+   * @throws IllegalArgumentException if {@code numPartitions <= 0}
+   */
+  public static  Partition of(
+      int numPartitions,
+      PartitionWithSideInputsFn partitionFn,
+      Requirements requirements) {
+    Contextful ctfFn =
+        Contextful.fn(
+            (T element, Contextful.Fn.Context c) ->
+                partitionFn.partitionFor(element, numPartitions, c),
+            requirements);
+    Object aClass = partitionFn;

Review comment:
   This is a no-op, please remove.
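
The new `PartitionWithSideInputsFn` in the diff lets the partitioning decision consult side inputs through a `Contextful.Fn.Context`. Below is a plain-Java sketch of the same idea outside Beam, assuming a hypothetical `Context` interface that stands in for side-input access; it is not Beam's implementation. It routes each element to the index returned by the function and rejects `numPartitions <= 0` and out-of-range indices, mirroring the documented contract (result in `[0..numPartitions-1]`).

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class PartitionWithContextSketch {
  /** Hypothetical stand-in for side-input access; not Beam's Contextful.Fn.Context. */
  interface Context {
    Map<String, Integer> sideInput();
  }

  /** Chooses a partition for an element, optionally consulting the context. */
  interface PartitionWithContextFn<T> extends Serializable {
    int partitionFor(T elem, int numPartitions, Context c);
  }

  static <T> List<List<T>> partition(
      List<T> input, int numPartitions, PartitionWithContextFn<? super T> fn, Context c) {
    if (numPartitions <= 0) {
      throw new IllegalArgumentException("numPartitions must be > 0, got " + numPartitions);
    }
    List<List<T>> partitions = new ArrayList<>();
    for (int i = 0; i < numPartitions; i++) {
      partitions.add(new ArrayList<>());
    }
    for (T elem : input) {
      int index = fn.partitionFor(elem, numPartitions, c);
      if (index < 0 || index >= numPartitions) {
        throw new IndexOutOfBoundsException("Partition function returned " + index);
      }
      partitions.get(index).add(elem);
    }
    return partitions;
  }

  public static void main(String[] args) {
    // The "side input" maps a word to a preferred bucket; unknown words go to the last bucket.
    Context ctx = () -> Map.of("apple", 0, "banana", 1);
    PartitionWithContextFn<String> byLookup =
        (word, n, c) -> c.sideInput().getOrDefault(word, n - 1);
    System.out.println(partition(List.of("apple", "banana", "cherry"), 3, byLookup, ctx));
    // [[apple], [banana], [cherry]]
  }
}
```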

##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
##
@@ -124,21 +190,26 @@ private Partition(PartitionDoFn partitionDoFn) {
 
   private static class PartitionDoFn extends DoFn {
     private final int numPartitions;
-    private final PartitionFn partitionFn;
     private final TupleTagList outputTags;
+    private Contextful> ctxFn;
+    private Object originalFnForDisplayData;
 
     /**
      * Constructs a PartitionDoFn.
      *
      * @throws IllegalArgumentException if {@code numPartitions <= 0}
      */
-    public PartitionDoFn(int numPartitions, PartitionFn partitionFn) {
+    public PartitionDoFn(

Review comment:
   nit: drop the `public`.

##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
##
@@ -124,21 +190,26 @@ private Partition(PartitionDoFn partitionDoFn) {
 
   private static class PartitionDoFn extends DoFn {
     private final int numPartitions;
-    private final PartitionFn partitionFn;
     private final TupleTagList outputTags;
+    private Contextful> ctxFn;
+    private Object originalFnForDisplayData;

Review comment:
   nit: this can be `final`

##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
##
@@ -124,21 +190,26 @@ private Partition(PartitionDoFn partitionDoFn) {
 
   private static class PartitionDoFn extends DoFn {
     private final int numPartitions;
-    private final PartitionFn partitionFn;
     private final TupleTagList outputTags;
+    private Contextful> ctxFn;
+    private Object originalFnForDisplayData;
 
     /**
      * Constructs a PartitionDoFn.
      *
      * @throws IllegalArgumentException if {@code numPartitions <= 0}
      */
-    public PartitionDoFn(int numPartitions, PartitionFn partitionFn) {
+    public PartitionDoFn(
+        int numPartitions,

[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner

2020-05-20 Thread GitBox


pulasthi commented on a change in pull request #10888:
URL: https://github.com/apache/beam/pull/10888#discussion_r428376950



##
File path: 
runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2PipelineOptions.java
##
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.twister2;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import edu.iu.dsc.tws.tset.env.TSetEnvironment;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
+
+/** Twister2PipelineOptions. */
+public interface Twister2PipelineOptions extends PipelineOptions, StreamingOptions {
+  @Description("set unique application name for Twister2 runner")
+  void setApplicationName(String name);
+
+  String getApplicationName();

Review comment:
   I initially added them for completeness, but ended up not using some of them. Removed the unused options, since they can be added later if needed.









[GitHub] [beam] lukecwik commented on pull request #11746: [BEAM-10028] Add support for the state backed iterable coder to the Java SDK harness.

2020-05-20 Thread GitBox


lukecwik commented on pull request #11746:
URL: https://github.com/apache/beam/pull/11746#issuecomment-631800147


   Run Java PreCommit







[GitHub] [beam] boyuanzz merged pull request #11715: [BEAM-9977] Implement GrowableOffsetRangeTracker

2020-05-20 Thread GitBox


boyuanzz merged pull request #11715:
URL: https://github.com/apache/beam/pull/11715


   







[GitHub] [beam] boyuanzz commented on pull request #11715: [BEAM-9977] Implement GrowableOffsetRangeTracker

2020-05-20 Thread GitBox


boyuanzz commented on pull request #11715:
URL: https://github.com/apache/beam/pull/11715#issuecomment-631794981


   Thanks for your help!







[GitHub] [beam] TheNeuralBit commented on pull request #11529: [BEAM-9822] Simplify pipeline when batching is disabled.

2020-05-20 Thread GitBox


TheNeuralBit commented on pull request #11529:
URL: https://github.com/apache/beam/pull/11529#issuecomment-631794665


   Run Java PreCommit







[GitHub] [beam] TheNeuralBit commented on pull request #11529: [BEAM-9822] Simplify pipeline when batching is disabled.

2020-05-20 Thread GitBox


TheNeuralBit commented on pull request #11529:
URL: https://github.com/apache/beam/pull/11529#issuecomment-631792217


   Run Java PostCommit







[GitHub] [beam] pabloem merged pull request #11762: [BEAM-10044] Remove curly quotes from documentation code samples

2020-05-20 Thread GitBox


pabloem merged pull request #11762:
URL: https://github.com/apache/beam/pull/11762


   







[GitHub] [beam] pabloem commented on pull request #11762: [BEAM-10044] Remove curly quotes from documentation code samples

2020-05-20 Thread GitBox


pabloem commented on pull request #11762:
URL: https://github.com/apache/beam/pull/11762#issuecomment-631791451


   thanks @epicfaace !







[GitHub] [beam] amaliujia merged pull request #11724: Updated Videos and Podcasts page

2020-05-20 Thread GitBox


amaliujia merged pull request #11724:
URL: https://github.com/apache/beam/pull/11724


   







[GitHub] [beam] TheNeuralBit commented on pull request #11529: [BEAM-9822] Simplify pipeline when batching is disabled.

2020-05-20 Thread GitBox


TheNeuralBit commented on pull request #11529:
URL: https://github.com/apache/beam/pull/11529#issuecomment-631789673


   Run Java PostCommit







[GitHub] [beam] pabloem commented on pull request #11339: [BEAM-9468] Fhir io

2020-05-20 Thread GitBox


pabloem commented on pull request #11339:
URL: https://github.com/apache/beam/pull/11339#issuecomment-631786516


   Run Java PreCommit







[GitHub] [beam] TheNeuralBit merged pull request #11701: [BEAM-9899] Fix some issues around storing schema `id` on user types

2020-05-20 Thread GitBox


TheNeuralBit merged pull request #11701:
URL: https://github.com/apache/beam/pull/11701


   







[GitHub] [beam] jkff commented on a change in pull request #11406: [BEAM-9748] Refactor Reparallelize as an alternative Reshuffle implementation

2020-05-20 Thread GitBox


jkff commented on a change in pull request #11406:
URL: https://github.com/apache/beam/pull/11406#discussion_r428356257



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
##
@@ -107,10 +108,57 @@ public void processElement(
 
   /** Implementation of {@link #viaRandomKey()}. */
   public static class ViaRandomKey extends PTransform, PCollection> {
+    private boolean isHighFanoutAndLimitedInputParallelism;
+
     private ViaRandomKey() {}
 
+    /**
+     * Use a different strategy that materializes the input and prepares it to be consumed in a
+     * highly parallel fashion.
+     *
+     * It is tailored to the case when input was produced in an extremely sequential way -
+     * typically by a ParDo that emits millions of outputs _per input element_, e.g., executing a
+     * large database query or a large simulation and emitting all of their results.
+     *
+     * Internally, it materializes the input at a moderate cost before reshuffling it, making the
+     * reshuffling itself significantly cheaper in these extreme cases on some runners. Use this
+     * only if your benchmarks show an improvement.
+     */
+    public ViaRandomKey withHintHighFanoutAndLimitedInputParallelism() {
+      this.isHighFanoutAndLimitedInputParallelism = true;
+      return this;
+    }
+
     @Override
     public PCollection expand(PCollection input) {
+      if (isHighFanoutAndLimitedInputParallelism) {
+        // See https://issues.apache.org/jira/browse/BEAM-2803
+        // We use a combined approach to "break fusion" here:
+        // (see https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion)
+        // 1) force the data to be materialized by passing it as a side input to an identity fn,
+        // then 2) reshuffle it with a random key. Initial materialization provides some parallelism
+        // and ensures that data to be shuffled can be generated in parallel, while reshuffling
+        // provides perfect parallelism.
+        // In most cases where a "fusion break" is needed, a simple reshuffle would be sufficient.
+        // The current approach is necessary only to support the particular case of JdbcIO where
+        // a single query may produce many gigabytes of query results.
+        PCollectionView> empty =
+            input
+                .apply("Consume", Filter.by(SerializableFunctions.constant(false)))
+                .apply(View.asIterable());
+        PCollection materialized =
+            input.apply(
+                "Identity",
+                ParDo.of(

Review comment:
   nit: here you could use a `MapElements.via(Contextful.of(t -> t, requiresSideInputs(empty)))`

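The comment block in `expand` describes breaking fusion by first materializing the input and then reshuffling it with a random key. The sketch below illustrates only the random-key step, as an in-memory analogy in plain Java: each element gets a random key in `[0, numKeys)`, elements are grouped by key, and the keys are dropped again, so the output holds the same elements redistributed across keys. The class name, `numKeys`, and the seeded `Random` are assumptions for illustration; a real runner shuffles across workers rather than into a `TreeMap`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

public class RandomKeyReshuffleSketch {
  /** Assigns each element a random key in [0, numKeys), groups by key, then drops the keys. */
  static <T> List<T> reshuffleViaRandomKey(List<T> input, int numKeys, Random random) {
    Map<Integer, List<T>> groups = new TreeMap<>();
    for (T elem : input) {
      int key = random.nextInt(numKeys);
      groups.computeIfAbsent(key, k -> new ArrayList<>()).add(elem);
    }
    List<T> output = new ArrayList<>(input.size());
    // Each group stands in for the data one downstream worker would receive.
    for (List<T> group : groups.values()) {
      output.addAll(group);
    }
    return output;
  }

  public static void main(String[] args) {
    List<Integer> input = List.of(1, 2, 3, 4, 5, 6, 7, 8);
    // Same elements, possibly in a different order.
    System.out.println(reshuffleViaRandomKey(input, 4, new Random(7)));
  }
}
```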
##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
##
@@ -107,10 +108,57 @@ public void processElement(
 
   /** Implementation of {@link #viaRandomKey()}. */
   public static class ViaRandomKey extends PTransform, PCollection> {
+    private boolean isHighFanoutAndLimitedInputParallelism;

Review comment:
   Please use an immutable AutoValue with a builder here. PTransforms can't have mutable member variables.

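The review asks for an immutable AutoValue with a builder instead of a mutable flag. AutoValue needs its annotation processor, so here is a hand-written, stdlib-only sketch of the same immutability pattern, assuming a hypothetical `ViaRandomKeyConfig` class: the `with...` method returns a new instance instead of mutating `this`, so a configured object can be shared and reused safely.

```java
import java.util.Objects;

public final class ViaRandomKeyConfig {
  private final boolean highFanoutAndLimitedInputParallelism;

  private ViaRandomKeyConfig(boolean highFanoutAndLimitedInputParallelism) {
    this.highFanoutAndLimitedInputParallelism = highFanoutAndLimitedInputParallelism;
  }

  /** Default configuration: the hint is off. */
  public static ViaRandomKeyConfig create() {
    return new ViaRandomKeyConfig(false);
  }

  /** Returns a copy with the hint enabled; the receiver is left unchanged. */
  public ViaRandomKeyConfig withHintHighFanoutAndLimitedInputParallelism() {
    return new ViaRandomKeyConfig(true);
  }

  public boolean isHighFanoutAndLimitedInputParallelism() {
    return highFanoutAndLimitedInputParallelism;
  }

  @Override
  public boolean equals(Object o) {
    return o instanceof ViaRandomKeyConfig
        && ((ViaRandomKeyConfig) o).highFanoutAndLimitedInputParallelism
            == highFanoutAndLimitedInputParallelism;
  }

  @Override
  public int hashCode() {
    return Objects.hash(highFanoutAndLimitedInputParallelism);
  }

  public static void main(String[] args) {
    ViaRandomKeyConfig base = ViaRandomKeyConfig.create();
    ViaRandomKeyConfig hinted = base.withHintHighFanoutAndLimitedInputParallelism();
    System.out.println(base.isHighFanoutAndLimitedInputParallelism()); // false
    System.out.println(hinted.isHighFanoutAndLimitedInputParallelism()); // true
  }
}
```

With AutoValue, the constructor, `equals`, and `hashCode` would be generated, and a builder would replace the hand-written factory methods.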








[GitHub] [beam] pabloem commented on pull request #11339: [BEAM-9468] Fhir io

2020-05-20 Thread GitBox


pabloem commented on pull request #11339:
URL: https://github.com/apache/beam/pull/11339#issuecomment-631782829


   Run Java PreCommit







[GitHub] [beam] ibzib commented on pull request #11755: [BEAM-10038] Add script to mass-comment Jenkins triggers on PR

2020-05-20 Thread GitBox


ibzib commented on pull request #11755:
URL: https://github.com/apache/beam/pull/11755#issuecomment-631781814


   > > This looks pretty neat. We could maybe specialize this for some particular common cases. I found myself many times requiring to run something like "Run All Spark Runners ValidatesRunner Tests" WDYT @ibzib ?
   > 
   > If you want to use it as-is, all you have to do is modify the `COMMENTS_TO_ADD`.
   
   Actually, for that use case, maybe it would be better to make a single Jenkins uber-job that runs all the Spark test suites we have.







[GitHub] [beam] pabloem commented on pull request #11339: [BEAM-9468] Fhir io

2020-05-20 Thread GitBox


pabloem commented on pull request #11339:
URL: https://github.com/apache/beam/pull/11339#issuecomment-631778141


   Run Java PostCommit







[GitHub] [beam] jaketf commented on pull request #11339: [BEAM-9468] Fhir io

2020-05-20 Thread GitBox


jaketf commented on pull request #11339:
URL: https://github.com/apache/beam/pull/11339#issuecomment-631777614


   are we able to run post commit?
   







[GitHub] [beam] TheNeuralBit commented on pull request #11651: [BEAM-8889] Upgrades gcsio to 2.1.3

2020-05-20 Thread GitBox


TheNeuralBit commented on pull request #11651:
URL: https://github.com/apache/beam/pull/11651#issuecomment-631776883


   It looks like Dataflow VR tests are going to fail:
   ```
   15:10:31 org.apache.beam.sdk.transforms.join.CoGroupByKeyTest > testCoGroupByKeyWithWindowing FAILED
   15:10:31 java.lang.RuntimeException at CoGroupByKeyTest.java:494
   15:10:31 Caused by: java.lang.OutOfMemoryError
   15:10:31 
   15:10:31 org.apache.beam.sdk.transforms.FlattenTest > testFlattenIterablesLists FAILED
   15:10:31 java.lang.RuntimeException at FlattenTest.java:270
   15:10:31 Caused by: java.lang.OutOfMemoryError
   15:10:31 
   15:10:31 org.apache.beam.sdk.transforms.CreateTest > testCreateWithVoidType FAILED
   15:10:31 java.lang.RuntimeException at CreateTest.java:326
   15:10:31 Caused by: java.lang.OutOfMemoryError
   15:10:31 
   15:10:31 org.apache.beam.sdk.transforms.ReshuffleTest > testReshuffleAfterSlidingWindows FAILED
   15:10:31 java.lang.RuntimeException at ReshuffleTest.java:251
   15:10:31 Caused by: java.lang.RuntimeException
   15:10:31 Caused by: java.lang.IllegalArgumentException
   15:10:31 Caused by: java.lang.OutOfMemoryError
   ```







[GitHub] [beam] TheNeuralBit commented on pull request #11339: [BEAM-9468] Fhir io

2020-05-20 Thread GitBox


TheNeuralBit commented on pull request #11339:
URL: https://github.com/apache/beam/pull/11339#issuecomment-631773038


   That definitely seems like a flake to me







[GitHub] [beam] tysonjh commented on a change in pull request #11331: [BEAM-9646] Add Google Cloud vision integration transform

2020-05-20 Thread GitBox


tysonjh commented on a change in pull request #11331:
URL: https://github.com/apache/beam/pull/11331#discussion_r428351403



##
File path: 
sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateImages.java
##
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.cloud.vision.v1.AnnotateImageRequest;
+import com.google.cloud.vision.v1.AnnotateImageResponse;
+import com.google.cloud.vision.v1.BatchAnnotateImagesResponse;
+import com.google.cloud.vision.v1.Feature;
+import com.google.cloud.vision.v1.ImageAnnotatorClient;
+import com.google.cloud.vision.v1.ImageContext;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupIntoBatches;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Parent class for transform utilizing Cloud Vision API.
+ *
+ * @param  Type of input PCollection.
+ */
+public abstract class AnnotateImages
+    extends PTransform, PCollection>> {
+
+  private static final Long MIN_BATCH_SIZE = 1L;
+  private static final Long MAX_BATCH_SIZE = 5L;
+
+  protected final PCollectionView> contextSideInput;
+  protected final List featureList;
+  private long batchSize;
+
+  public AnnotateImages(

Review comment:
   A summary fragment would be nice here, even something simple like 'Constructs an AnnotateImages transform.'

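`AnnotateImages` bounds its batch size with `MIN_BATCH_SIZE = 1` and `MAX_BATCH_SIZE = 5` and imports `GroupIntoBatches`. The constructor body is cut off in the diff, so how it validates `batchSize` is not shown. The sketch below assumes a simple range check against those two constants, then splits a list into consecutive batches of at most `batchSize` elements, an in-memory analogy for what `GroupIntoBatches` does per key. The class `BatchingSketch` is hypothetical; its constructor carries the kind of summary fragment the review asks for.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchingSketch {
  private static final long MIN_BATCH_SIZE = 1L;
  private static final long MAX_BATCH_SIZE = 5L;

  private final long batchSize;

  /**
   * Constructs a BatchingSketch.
   *
   * @param batchSize number of elements per request, in [MIN_BATCH_SIZE, MAX_BATCH_SIZE]
   * @throws IllegalArgumentException if batchSize is outside that range
   */
  public BatchingSketch(long batchSize) {
    if (batchSize < MIN_BATCH_SIZE || batchSize > MAX_BATCH_SIZE) {
      throw new IllegalArgumentException(
          "batchSize must be between " + MIN_BATCH_SIZE + " and " + MAX_BATCH_SIZE
              + ", got " + batchSize);
    }
    this.batchSize = batchSize;
  }

  /** Splits the input into consecutive batches of at most batchSize elements. */
  public <T> List<List<T>> batch(List<T> input) {
    List<List<T>> batches = new ArrayList<>();
    for (int start = 0; start < input.size(); start += (int) batchSize) {
      int end = Math.min(start + (int) batchSize, input.size());
      batches.add(new ArrayList<>(input.subList(start, end)));
    }
    return batches;
  }

  public static void main(String[] args) {
    BatchingSketch sketch = new BatchingSketch(2);
    System.out.println(sketch.batch(List.of("a.jpg", "b.jpg", "c.jpg")));
    // [[a.jpg, b.jpg], [c.jpg]]
  }
}
```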








[GitHub] [beam] jaketf commented on pull request #11339: [BEAM-9468] Fhir io

2020-05-20 Thread GitBox


jaketf commented on pull request #11339:
URL: https://github.com/apache/beam/pull/11339#issuecomment-631768508


   Seems pre-commit is stuck reporting back to GitHub.
   There's one unrelated test that failed:
   
https://builds.apache.org/job/beam_PreCommit_Java_Commit/11508/testReport/org.apache.beam.sdk.extensions.ml/VideoIntelligenceIT/annotateVideoFromURINoContext/







[GitHub] [beam] ibzib commented on pull request #11755: [BEAM-10038] Add script to mass-comment Jenkins triggers on PR

2020-05-20 Thread GitBox


ibzib commented on pull request #11755:
URL: https://github.com/apache/beam/pull/11755#issuecomment-631766935


   > This looks pretty neat. We could maybe specialize this for some particular common cases. I found myself many times requiring to run something like "Run All Spark Runners ValidatesRunner Tests" WDYT @ibzib ?
   
   If you want to use it as-is, all you have to do is modify the `COMMENTS_TO_ADD`.







[GitHub] [beam] ibzib commented on pull request #11764: [BEAM-10048] Clean up release guide.

2020-05-20 Thread GitBox


ibzib commented on pull request #11764:
URL: https://github.com/apache/beam/pull/11764#issuecomment-631766210


   > Will take a look once this PR becomes stable. (Seems still evolving).
   
   I'm done with it for now, sorry for the churn.







[GitHub] [beam] amaliujia commented on pull request #11764: [BEAM-10048] Clean up release guide.

2020-05-20 Thread GitBox


amaliujia commented on pull request #11764:
URL: https://github.com/apache/beam/pull/11764#issuecomment-631764165


   Will take a look once this PR becomes stable. (Seems still evolving).







[GitHub] [beam] boyuanzz commented on a change in pull request #11715: [BEAM-9977] Implement GrowableOffsetRangeTracker

2020-05-20 Thread GitBox


boyuanzz commented on a change in pull request #11715:
URL: https://github.com/apache/beam/pull/11715#discussion_r428343813



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
##
@@ -120,13 +136,27 @@ public String toString() {
   public Progress getProgress() {
     // If we have never attempted an offset, we return the length of the entire range as work
     // remaining.
+    // Convert to BigDecimal in computation to prevent overflow, which may result in loss of
+    // precision.
     if (lastAttemptedOffset == null) {
-      return Progress.from(0, range.getTo() - range.getFrom());
+      return Progress.from(
+          0,
+          BigDecimal.valueOf(range.getTo())
+              .subtract(BigDecimal.valueOf(range.getFrom()), MathContext.DECIMAL128)
+              .doubleValue());
     }
 
     // Compute the amount of work remaining from where we are to where we are attempting to get to
     // with a minimum of zero in case we have claimed beyond the end of the range.
-    long workRemaining = Math.max(range.getTo() - lastAttemptedOffset, 0);
-    return Progress.from(range.getTo() - range.getFrom() - workRemaining, workRemaining);
+    BigDecimal workRemaining =
+        BigDecimal.valueOf(range.getTo())
+            .subtract(BigDecimal.valueOf(lastAttemptedOffset), MathContext.DECIMAL128)
+            .max(BigDecimal.ZERO);
+    BigDecimal wholeWork =

Review comment:
   Done. Thanks!
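
   A minimal, stdlib-only sketch of the overflow this change guards against, using an extreme range from `Long.MIN_VALUE` to `Long.MAX_VALUE` chosen purely for illustration. The class and method names are hypothetical. `bigDecimalProgress` mirrors the pre-change `long` formula (remaining work clamped at zero, completed work as whole minus remaining) but evaluates it in `BigDecimal`; because the tail of the new code is cut off in this message, the "whole minus remaining" step is an assumption.

```java
import java.math.BigDecimal;
import java.math.MathContext;

public class ProgressOverflowSketch {
  /** Naive long arithmetic: wraps around when to - from exceeds Long.MAX_VALUE. */
  static double naiveWholeWork(long from, long to) {
    return to - from;
  }

  /**
   * Returns {workCompleted, workRemaining} for the range [from, to). A null lastAttempted means
   * nothing has been claimed yet, so the whole range counts as remaining work.
   */
  static double[] bigDecimalProgress(long from, long to, Long lastAttempted) {
    BigDecimal whole =
        BigDecimal.valueOf(to).subtract(BigDecimal.valueOf(from), MathContext.DECIMAL128);
    if (lastAttempted == null) {
      return new double[] {0, whole.doubleValue()};
    }
    // Clamp at zero in case an offset beyond the end of the range was claimed.
    BigDecimal remaining =
        BigDecimal.valueOf(to)
            .subtract(BigDecimal.valueOf(lastAttempted), MathContext.DECIMAL128)
            .max(BigDecimal.ZERO);
    BigDecimal done = whole.subtract(remaining, MathContext.DECIMAL128);
    return new double[] {done.doubleValue(), remaining.doubleValue()};
  }

  public static void main(String[] args) {
    // -1.0: (2^63 - 1) - (-2^63) wraps around in long arithmetic.
    System.out.println(naiveWholeWork(Long.MIN_VALUE, Long.MAX_VALUE));
    // 1.8446744073709552E19: no wrap-around, only double rounding.
    System.out.println(bigDecimalProgress(Long.MIN_VALUE, Long.MAX_VALUE, null)[1]);
    double[] p = bigDecimalProgress(0, 100, 30L);
    System.out.println(p[0] + " done, " + p[1] + " remaining"); // 30.0 done, 70.0 remaining
  }
}
```

   The trade-off named in the diff comment is visible here: the `BigDecimal` path never goes negative, but converting back to `double` can lose precision for very large offsets.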






[GitHub] [beam] boyuanzz commented on a change in pull request #11715: [BEAM-9977] Implement GrowableOffsetRangeTracker

2020-05-20 Thread GitBox


boyuanzz commented on a change in pull request #11715:
URL: https://github.com/apache/beam/pull/11715#discussion_r428343752



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
##
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import java.math.BigDecimal;
+import java.math.MathContext;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+
+/**
+ * An {@link OffsetRangeTracker} for tracking a growable offset range. {@code Long.MAX_VALUE} is
+ * used as the end of the range to indicate infinity.
+ *
+ * <p>An offset range is considered growable when the end offset could grow (or change) during
+ * execution time (e.g., Kafka topic partition offset, appended file, ...).
+ *
+ * <p>The growable range is marked as done by claiming {@code Long.MAX_VALUE}.
+ */
+@Experimental(Kind.SPLITTABLE_DO_FN)
+public class GrowableOffsetRangeTracker extends OffsetRangeTracker {
+  /**
+   * Provides the estimated end offset of the range.
+   *
+   * <p>{@link #estimate} is called to give the end offset when {@link #trySplit} or {@link
+   * #getProgress} is invoked. The end offset is exclusive for the range. The estimated end is not
+   * required to monotonically increase as it will only be taken into consideration when the
+   * estimated end offset is larger than the current position. Returning {@code Long.MAX_VALUE} as
+   * the estimate implies the largest possible position for the range is {@code Long.MAX_VALUE - 1}.
+   * Return {@code Long.MIN_VALUE} if an estimate can not be provided.
+   *
+   * <p>Providing a good estimate is important for an accurate progress signal and will impact
+   * splitting decisions by the runner.
+   *
+   * <p>If {@link #estimate} is expensive to compute, consider wrapping the implementation with
+   * {@link Suppliers#memoizeWithExpiration} or equivalent as an optimization.
+   *
+   * <p>TODO(BEAM-10032): Also consider using {@link RangeEndEstimator} when the range is not ended
+   * with {@code Long.MAX_VALUE}.
+   */
+  @FunctionalInterface
+  public interface RangeEndEstimator {
+    long estimate();
+  }
+
+  private final RangeEndEstimator rangeEndEstimator;
+
+  public GrowableOffsetRangeTracker(long start, RangeEndEstimator rangeEndEstimator) {
+    super(new OffsetRange(start, Long.MAX_VALUE));
+    this.rangeEndEstimator = checkNotNull(rangeEndEstimator);
+  }
+
+  @Override
+  public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
+    // If current tracking range is no longer growable, split it as a normal range.
+    if (range.getTo() != Long.MAX_VALUE || range.getTo() == range.getFrom()) {
+      return super.trySplit(fractionOfRemainder);
+    }
+    // If current range has been done, there is no more space to split.
+    if (lastAttemptedOffset != null && lastAttemptedOffset == Long.MAX_VALUE) {
+      return null;
+    }
+    BigDecimal cur =
+        (lastAttemptedOffset == null)
+            ? BigDecimal.valueOf(range.getFrom()).subtract(BigDecimal.ONE, MathContext.DECIMAL128)
+            : BigDecimal.valueOf(lastAttemptedOffset);
+
+    // Fetch the estimated end offset. If the estimated end is smaller than the next offset, use
+    // the next offset as end.
+    BigDecimal estimateRangeEnd =
+        BigDecimal.valueOf(rangeEndEstimator.estimate())
+            .max(cur.add(BigDecimal.ONE, MathContext.DECIMAL128));
+
+    // Convert to BigDecimal in computation to prevent overflow, which may result in loss of
+    // precision.
+    // split = cur + max(1, (estimateRangeEnd - cur) * fractionOfRemainder)
+    BigDecimal splitPos =
+        cur.add(
+            estimateRangeEnd
+                .subtract(cur, MathContext.DECIMAL128)
+                .multiply(BigDecimal.valueOf(fractionOfRemainder), MathContext.DECIMAL128)
+
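
The `RangeEndEstimator` Javadoc suggests caching an expensive estimate with Guava's `Suppliers.memoizeWithExpiration`. Below is a stdlib-only sketch of the same idea. The local `RangeEndEstimator` interface only mirrors the shape shown in the diff; `memoize` and the one-minute time-to-live are hypothetical and not part of Beam.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

public class MemoizedEstimatorSketch {
  /** Local stand-in with the same shape as GrowableOffsetRangeTracker.RangeEndEstimator. */
  @FunctionalInterface
  interface RangeEndEstimator {
    long estimate();
  }

  /** Hypothetical helper: reuses the delegate's last estimate until the time-to-live elapses. */
  static RangeEndEstimator memoize(LongSupplier expensive, long ttl, TimeUnit unit) {
    final long ttlNanos = unit.toNanos(ttl);
    return new RangeEndEstimator() {
      private boolean hasValue = false;
      private long cached;
      private long expiresAtNanos;

      @Override
      public synchronized long estimate() {
        long now = System.nanoTime();
        // Compare via subtraction so the check stays correct if nanoTime() wraps.
        if (!hasValue || now - expiresAtNanos >= 0) {
          cached = expensive.getAsLong();
          expiresAtNanos = now + ttlNanos;
          hasValue = true;
        }
        return cached;
      }
    };
  }

  public static void main(String[] args) {
    int[] calls = {0};
    RangeEndEstimator estimator =
        memoize(
            () -> {
              calls[0]++;
              return 1_000L; // e.g. the latest offset reported by some external system
            },
            1,
            TimeUnit.MINUTES);
    estimator.estimate();
    estimator.estimate();
    System.out.println(calls[0]); // 1: the second call is served from the cache
  }
}
```

Because the Javadoc says the estimate need not increase monotonically, serving a slightly stale cached value is safe; it only makes progress and split decisions a little less precise.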

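The split point in the truncated `trySplit` follows the formula in its comment, `split = cur + max(1, (estimateRangeEnd - cur) * fractionOfRemainder)`. Below is a standalone sketch of that arithmetic. It assumes the fractional product is rounded down to a whole offset, because the rounding the PR actually uses is not visible in this message. `GrowableSplitSketch` and `splitPosition` are hypothetical names.

```java
import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;

public class GrowableSplitSketch {
  /**
   * Computes cur + max(1, (end - cur) * fraction), where end is the estimate clamped to at least
   * cur + 1. Rounding the product down (FLOOR) is an assumption of this sketch.
   */
  static long splitPosition(long cur, long estimatedEnd, double fraction) {
    BigDecimal curBd = BigDecimal.valueOf(cur);
    // An estimate below the next offset is replaced by the next offset.
    BigDecimal end = BigDecimal.valueOf(estimatedEnd).max(curBd.add(BigDecimal.ONE));
    BigDecimal step =
        end.subtract(curBd, MathContext.DECIMAL128)
            .multiply(BigDecimal.valueOf(fraction), MathContext.DECIMAL128)
            .setScale(0, RoundingMode.FLOOR)
            .max(BigDecimal.ONE);
    return curBd.add(step).longValueExact();
  }

  public static void main(String[] args) {
    System.out.println(splitPosition(9, 110, 0.5)); // 59: (110 - 9) * 0.5 = 50.5, floored to 50
    System.out.println(splitPosition(9, 5, 0.5)); // 10: end clamped to 10, step at least 1
  }
}
```

The `max(1, ...)` term guarantees the split point always moves past the current position, even when the estimator lags behind.
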
[GitHub] [beam] iemejia commented on pull request #11755: [BEAM-10038] Add script to mass-comment Jenkins triggers on PR

2020-05-20 Thread GitBox


iemejia commented on pull request #11755:
URL: https://github.com/apache/beam/pull/11755#issuecomment-631761505


   This looks pretty neat. We could maybe specialize this for some particular common cases. I have often needed to run something like "Run All Spark Runners ValidatesRunner Tests". WDYT @ibzib?




[GitHub] [beam] TheNeuralBit commented on pull request #11701: [BEAM-9899] Fix some issues around storing schema `id` on user types

2020-05-20 Thread GitBox


TheNeuralBit commented on pull request #11701:
URL: https://github.com/apache/beam/pull/11701#issuecomment-631760558


   Run Python2_PVR_Flink PreCommit




[GitHub] [beam] pabloem commented on pull request #11762: [BEAM-10044] Remove curly quotes from documentation code samples

2020-05-20 Thread GitBox


pabloem commented on pull request #11762:
URL: https://github.com/apache/beam/pull/11762#issuecomment-631758390


   retest this please




[GitHub] [beam] reuvenlax opened a new pull request #11767: Vortex sink

2020-05-20 Thread GitBox


reuvenlax opened a new pull request #11767:
URL: https://github.com/apache/beam/pull/11767


   




[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner

2020-05-20 Thread GitBox


pulasthi commented on a change in pull request #10888:
URL: https://github.com/apache/beam/pull/10888#discussion_r428335678



##
File path: 
runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2LegacyRunner.java
##
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.twister2;
+
+import static org.apache.beam.runners.core.construction.resources.PipelineResources.detectClassPathResourcesToStage;
+
+import edu.iu.dsc.tws.api.JobConfig;
+import edu.iu.dsc.tws.api.Twister2Job;
+import edu.iu.dsc.tws.api.config.Config;
+import edu.iu.dsc.tws.api.driver.DriverJobState;
+import edu.iu.dsc.tws.api.exceptions.Twister2RuntimeException;
+import edu.iu.dsc.tws.api.scheduler.Twister2JobState;
+import edu.iu.dsc.tws.api.tset.sets.TSet;
+import edu.iu.dsc.tws.api.tset.sets.batch.BatchTSet;
+import edu.iu.dsc.tws.local.LocalSubmitter;
+import edu.iu.dsc.tws.rsched.core.ResourceAllocator;
+import edu.iu.dsc.tws.rsched.job.Twister2Submitter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.logging.LogManager;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.SplittableParDo;
+import org.apache.beam.runners.core.construction.SplittableParDoNaiveBounded;
+import org.apache.beam.runners.core.construction.resources.PipelineResources;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+
+/**
+ * A {@link PipelineRunner} that executes the operations in the pipeline by first translating them
+ * to a Twister2 Plan and then executing them either locally or on a Twister2 cluster, depending on
+ * the configuration.
+ */
+public class Twister2LegacyRunner extends PipelineRunner<PipelineResult> {
+
+  private static final Logger LOG = Logger.getLogger(Twister2LegacyRunner.class.getName());
+  private static final String SIDEINPUTS = "sideInputs";
+  private static final String LEAVES = "leaves";
+  private static final String GRAPH = "graph";
+  /** Provided options. */
+  private final Twister2PipelineOptions options;
+
+  public Twister2LegacyRunner(Twister2PipelineOptions options) {
+    this.options = options;
+  }
+
+  public static Twister2LegacyRunner fromOptions(PipelineOptions options) {
+    Twister2PipelineOptions pipelineOptions =
+        PipelineOptionsValidator.validate(Twister2PipelineOptions.class, options);
+    if (pipelineOptions.getFilesToStage() == null) {
+      pipelineOptions.setFilesToStage(
+          detectClassPathResourcesToStage(
+              Twister2LegacyRunner.class.getClassLoader(), pipelineOptions));
+      LOG.info(
+          "PipelineOptions.filesToStage was not specified. "
+              + "Defaulting to files from the classpath: will stage "
+              + pipelineOptions.getFilesToStage().size()
+              + " files. "
+              + "Enable logging at DEBUG level to see which files will be staged.");
+    }
+    return new Twister2LegacyRunner(pipelineOptions);
+  }
+
+  @Override
+  public PipelineResult run(Pipeline pipeline) {
+    // create a worker and pass in the pipeline and then do the translation
+    Twister2PipelineExecutionEnvironment env = new Twister2PipelineExecutionEnvironment(options);
+    LOG.info("Translating pipeline to 

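The runner above logs through `java.util.logging`. Its `Formatter` substitutes MessageFormat-style `{0}` placeholders in parameterized records and does not recognize SLF4J-style `{}`, so a `{}` passed to `Logger.info(String)` is printed literally. Below is a minimal sketch of the parameterized form. The class name and the file count are hypothetical.

```java
import java.text.MessageFormat;
import java.util.logging.Level;
import java.util.logging.Logger;

public class JulPlaceholderSketch {
  private static final Logger LOG = Logger.getLogger(JulPlaceholderSketch.class.getName());

  /** Applies the same MessageFormat substitution JUL's Formatter uses for parameterized records. */
  static String stagingMessage(int fileCount) {
    return MessageFormat.format("will stage {0} files", fileCount);
  }

  public static void main(String[] args) {
    int fileCount = 42; // hypothetical value
    // Parameterized JUL call: {0} is replaced with fileCount when the record is formatted.
    LOG.log(Level.INFO, "Defaulting to files from the classpath: will stage {0} files.", fileCount);
    System.out.println(stagingMessage(fileCount)); // will stage 42 files
  }
}
```

Plain string concatenation, as an alternative, also works with `Logger.info(String)` but builds the message even when INFO is disabled.
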
[GitHub] [beam] HuangLED commented on a change in pull request #11746: [BEAM-10028] Add support for the state backed iterable coder to the Java SDK harness.

2020-05-20 Thread GitBox


HuangLED commented on a change in pull request #11746:
URL: https://github.com/apache/beam/pull/11746#discussion_r428333755



##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java
##
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static org.apache.beam.runners.core.construction.ModelCoders.STATE_BACKED_ITERABLE_CODER_URN;
+
+import com.google.auto.service.AutoService;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.runners.core.construction.CoderTranslation.TranslationContext;
+import org.apache.beam.runners.core.construction.CoderTranslator;
+import org.apache.beam.runners.core.construction.CoderTranslatorRegistrar;
+import org.apache.beam.sdk.coders.IterableLikeCoder;
+import org.apache.beam.sdk.fn.stream.DataStreams;
+import org.apache.beam.sdk.util.BufferedElementCountingOutputStream;
+import org.apache.beam.sdk.util.VarInt;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
+
+/**
+ * A {@link BeamFnStateClient state} backed iterable which allows for fetching elements over the
+ * portability state API. See <a
+ * href="https://s.apache.org/beam-fn-state-api-and-bundle-processing">remote references</a> for
+ * additional details.
+ *
+ * <p>One must supply a {@link StateBackedIterableTranslationContext} when using {@link
+ * CoderTranslator#fromComponents} to be able to create a {@link StateBackedIterable.Coder}.
+ */
+public class StateBackedIterable<T> implements Iterable<T> {
+
+  private final BeamFnStateClient beamFnStateClient;
+  private final org.apache.beam.sdk.coders.Coder<T> elemCoder;
+  @VisibleForTesting final StateRequest request;
+  @VisibleForTesting final List<T> prefix;
+
+  public StateBackedIterable(
+      BeamFnStateClient beamFnStateClient,
+      String instructionId,
+      ByteString runnerKey,
+      org.apache.beam.sdk.coders.Coder<T> elemCoder,
+      List<T> prefix) {
+    this.beamFnStateClient = beamFnStateClient;
+    this.elemCoder = elemCoder;
+
+    StateRequest.Builder requestBuilder = StateRequest.newBuilder();
+    requestBuilder
+        .setInstructionId(instructionId)
+        .getStateKeyBuilder()
+        .getRunnerBuilder()
+        .setKey(runnerKey);
+    this.request = requestBuilder.build();
+    this.prefix = prefix;
+  }
+
+  @Override
+  public Iterator<T> iterator() {
+    return Iterators.concat(
+        prefix.iterator(),
+        new DataStreams.DataStreamDecoder(
+            elemCoder,
+            DataStreams.inbound(StateFetchingIterators.forFirstChunk(beamFnStateClient, request))));
+  }
+
+  /**
+   * Decodes an {@link Iterable} that might be backed by state. If the terminator at the end of the
+   * value stream is {@code -1} then we return a {@link StateBackedIterable} otherwise we return an
+   * {@link Iterable}.
+   */
+  public static class Coder<T> extends IterableLikeCoder<T, Iterable<T>> {
+
+    private final BeamFnStateClient beamFnStateClient;
+    private final Supplier<String> instructionId;
+
+    public Coder(
+        BeamFnStateClient beamFnStateClient,
+        Supplier<String> instructionId,
+        org.apache.beam.sdk.coders.Coder<T> elemCoder) {
+      super(elemCoder, "StateBackedIterable");
+      this.beamFnStateClient = beamFnStateClient;
+      this.instructionId = instructionId;
+    }
+
+    @Override
+    protected Iterable<T> decodeToIterable(List<T> decodedElements) {
+      return decodedElements;
+    }
+
+    @Override

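`StateBackedIterable.iterator()` yields the in-memory `prefix` first and then streams the remaining elements from the runner through the state API. Below is a stdlib-only sketch of that shape. It assumes a hypothetical chunk-fetching function in place of `BeamFnStateClient` and the coder-driven decoding. As in the diff, each call to `iterator()` starts a fresh fetch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.IntFunction;

/** Yields an in-memory prefix, then lazily pulls further chunks on demand. */
public class PrefixThenLazyIterable<T> implements Iterable<T> {
  private final List<T> prefix;
  // Hypothetical fetcher standing in for the state API: chunk i, or null when exhausted.
  private final IntFunction<List<T>> fetchChunk;

  public PrefixThenLazyIterable(List<T> prefix, IntFunction<List<T>> fetchChunk) {
    this.prefix = prefix;
    this.fetchChunk = fetchChunk;
  }

  @Override
  public Iterator<T> iterator() {
    // A fresh iterator re-reads the remote chunks from the beginning.
    return new Iterator<T>() {
      private Iterator<T> current = prefix.iterator();
      private int nextChunk = 0;
      private boolean exhausted = false;

      @Override
      public boolean hasNext() {
        // Fetch chunks only when the current one runs out; skip empty chunks.
        while (!current.hasNext() && !exhausted) {
          List<T> chunk = fetchChunk.apply(nextChunk++);
          if (chunk == null) {
            exhausted = true;
          } else {
            current = chunk.iterator();
          }
        }
        return current.hasNext();
      }

      @Override
      public T next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        return current.next();
      }
    };
  }

  public static void main(String[] args) {
    List<List<Integer>> remote = Arrays.asList(Arrays.asList(3, 4), Arrays.asList(5));
    PrefixThenLazyIterable<Integer> values =
        new PrefixThenLazyIterable<>(
            Arrays.asList(1, 2), i -> i < remote.size() ? remote.get(i) : null);
    List<Integer> out = new ArrayList<>();
    for (int v : values) {
      out.add(v);
    }
    System.out.println(out); // [1, 2, 3, 4, 5]
  }
}
```

Keeping the prefix in memory lets small iterables avoid any state round-trip, while large ones are fetched only as far as the consumer actually reads.
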
[GitHub] [beam] TheNeuralBit edited a comment on pull request #11701: [BEAM-9899] Fix some issues around storing schema `id` on user types

2020-05-20 Thread GitBox


TheNeuralBit edited a comment on pull request #11701:
URL: https://github.com/apache/beam/pull/11701#issuecomment-631751672


   Python PreCommit failure is a flake due to 
https://issues.apache.org/jira/browse/BEAM-9975




[GitHub] [beam] TheNeuralBit commented on pull request #11701: [BEAM-9899] Fix some issues around storing schema `id` on user types

2020-05-20 Thread GitBox


TheNeuralBit commented on pull request #11701:
URL: https://github.com/apache/beam/pull/11701#issuecomment-631751672


   Python PreCommit failure is due to 
https://issues.apache.org/jira/browse/BEAM-9975




[GitHub] [beam] TheNeuralBit commented on pull request #11754: [BEAM-10037] BeamSqlExample.java fails to build

2020-05-20 Thread GitBox


TheNeuralBit commented on pull request #11754:
URL: https://github.com/apache/beam/pull/11754#issuecomment-631750861


   I'm going to go ahead and merge since the CI failure is unrelated.




[GitHub] [beam] TheNeuralBit merged pull request #11754: [BEAM-10037] BeamSqlExample.java fails to build

2020-05-20 Thread GitBox


TheNeuralBit merged pull request #11754:
URL: https://github.com/apache/beam/pull/11754


   




[GitHub] [beam] TheNeuralBit merged pull request #11575: [BEAM-9699] Add test verifying we can use ZetaSQL in Python SqlTransform

2020-05-20 Thread GitBox


TheNeuralBit merged pull request #11575:
URL: https://github.com/apache/beam/pull/11575


   




[GitHub] [beam] robertwb opened a new pull request #11766: [BEAM-10036] More flexible dataframes partitioning.

2020-05-20 Thread GitBox


robertwb opened a new pull request #11766:
URL: https://github.com/apache/beam/pull/11766


   Also adds (naive) dataframe.agg() that uses this.
   
   
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make the review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   

   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/)
   Python | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)[![Build
 

[GitHub] [beam] rohdesamuel opened a new pull request #11765: [BEAM-9322] Turn new PCollection naming schemes to True by default

2020-05-20 Thread GitBox


rohdesamuel opened a new pull request #11765:
URL: https://github.com/apache/beam/pull/11765


   Change-Id: I8c2d660b175442d1917fe2b1ae166c0f4a1caaca
   
   This turns "passthrough_pcollection_output_ids" and 
"force_generated_pcollection_output_ids" to True by default.
   
   
   

[GitHub] [beam] ibzib merged pull request #11755: [BEAM-10038] Add script to mass-comment Jenkins triggers on PR

2020-05-20 Thread GitBox


ibzib merged pull request #11755:
URL: https://github.com/apache/beam/pull/11755


   




[GitHub] [beam] TheNeuralBit commented on pull request #11651: [BEAM-8889] Upgrades gcsio to 2.1.3

2020-05-20 Thread GitBox


TheNeuralBit commented on pull request #11651:
URL: https://github.com/apache/beam/pull/11651#issuecomment-631748270


   whoops




[GitHub] [beam] TheNeuralBit commented on pull request #11651: [BEAM-8889] Upgrades gcsio to 2.1.3

2020-05-20 Thread GitBox


TheNeuralBit commented on pull request #11651:
URL: https://github.com/apache/beam/pull/11651#issuecomment-631748120


   Run Java PostCommit




[GitHub] [beam] TheNeuralBit commented on pull request #11529: [BEAM-9822] Simplify pipeline when batching is disabled.

2020-05-20 Thread GitBox


TheNeuralBit commented on pull request #11529:
URL: https://github.com/apache/beam/pull/11529#issuecomment-631747496


   Retest this please




[GitHub] [beam] chamikaramj commented on pull request #11651: [BEAM-8889] Upgrades gcsio to 2.1.3

2020-05-20 Thread GitBox


chamikaramj commented on pull request #11651:
URL: https://github.com/apache/beam/pull/11651#issuecomment-631745259








[GitHub] [beam] chamikaramj commented on pull request #11360: [BEAM-9722] added SnowflakeIO with Read operation

2020-05-20 Thread GitBox


chamikaramj commented on pull request #11360:
URL: https://github.com/apache/beam/pull/11360#issuecomment-631745574


   Retest this please




[GitHub] [beam] lostluck commented on a change in pull request #11763: [BEAM-9978] Adding functionality and tests to Go offset range tracker.

2020-05-20 Thread GitBox


lostluck commented on a change in pull request #11763:
URL: https://github.com/apache/beam/pull/11763#discussion_r428305778



##
File path: sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go
##
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package offsetrange
+
+import (
+   "fmt"
+   "github.com/google/go-cmp/cmp"
+   "testing"
+)
+
+// TestRestriction_EvenSplits tests various splits and checks that they all
+// follow the contract for EvenSplits. This means that all restrictions are
+// evenly split, that each restriction has at least one element, and that each
+// element is present in the split restrictions.
+func TestRestriction_EvenSplits(t *testing.T) {
+	tests := []struct {
+		rest Restriction
+		num  int64
+	}{
+		{rest: Restriction{Start: 0, End: 21}, num: 4},
+		{rest: Restriction{Start: 21, End: 42}, num: 4},
+		{rest: Restriction{Start: 0, End: 5}, num: 10},
+		{rest: Restriction{Start: 0, End: 21}, num: -1},
+	}
+	for _, test := range tests {
+		test := test
+		t.Run(fmt.Sprintf("(rest[%v, %v], splits = %v)",
+			test.rest.Start, test.rest.End, test.num), func(t *testing.T) {
+			r := test.rest
+
+			// Get the minimum size that a split restriction can be. Max size
+			// should be min + 1. This way we can check the size of each split.
+			num := test.num
+			if num <= 1 {
+				num = 1
+			}
+			min := (r.End - r.Start) / num
+
+			splits := r.EvenSplits(test.num)
+			prevEnd := r.Start
+			for _, split := range splits {
+				size := split.End - split.Start
+				// Check: Each restriction has at least 1 element.
+				if size == 0 {
+					t.Errorf("split restriction [%v, %v] is empty, size must be greater than 0.",
+						split.Start, split.End)
+				}
+				// Check: Restrictions are evenly split.
+				if size != min && size != min+1 {
+					t.Errorf("split restriction [%v, %v] has unexpected size. got: %v, want: %v or %v",
+						split.Start, split.End, size, min, min+1)
+				}
+				// Check: All elements are still in the split restrictions. This
+				// logic assumes that the splits are returned in order which
+				// isn't guaranteed by EvenSplits, but this check is way easier
+				// with the assumption.
+				if split.Start != prevEnd {
+					t.Errorf("restriction range [%v, %v] missing after splits.",
+						prevEnd, split.Start)
+				} else {
+					prevEnd = split.End
+				}
+			}
+			if prevEnd != r.End {
+				t.Errorf("restriction range [%v, %v] missing after splits.",
+					prevEnd, r.End)
+			}
+		})
+	}
+}
+
+// TestTracker_TryClaim validates both success and failure cases for TryClaim.
+func TestTracker_TryClaim(t *testing.T) {
+	// Test that TryClaim works as expected when called correctly.
+	t.Run("Correctness", func(t *testing.T) {
+		tests := []struct {
+			rest   Restriction
+			claims []int64
+		}{
+			{rest: Restriction{Start: 0, End: 3}, claims: []int64{0, 1, 2, 3}},
+			{rest: Restriction{Start: 10, End:
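
The contract that `TestRestriction_EvenSplits` checks can be restated as a small Python sketch. The test requires that every split is non-empty, that sizes are `min` or `min + 1`, and that the splits cover the original range contiguously. `even_splits` below is a hypothetical reimplementation for illustration only, not the Go `Restriction.EvenSplits` under review. Capping the number of pieces at the number of elements is an assumption, chosen so that the `{Start: 0, End: 5}, num: 10` case still yields non-empty pieces.

```python
from typing import List, NamedTuple


class Restriction(NamedTuple):
    """Half-open offset range [start, end), mirroring the Go struct's fields."""
    start: int
    end: int


def even_splits(rest: Restriction, num: int) -> List[Restriction]:
    """Hypothetical sketch: split rest into at most num contiguous, non-empty
    pieces whose sizes differ by at most one."""
    size = rest.end - rest.start
    if size <= 0:
        return [rest]
    # Treat num <= 1 as "no split", as the Go test does when computing min.
    n = max(num, 1)
    # Never create more pieces than elements, so no piece is empty.
    n = min(n, size)
    base, extra = divmod(size, n)
    splits = []
    start = rest.start
    for i in range(n):
        # The first `extra` pieces absorb the remainder, one element each.
        length = base + (1 if i < extra else 0)
        splits.append(Restriction(start, start + length))
        start += length
    return splits


def check_contract(rest: Restriction, num: int) -> None:
    """Re-states the three checks from TestRestriction_EvenSplits."""
    min_size = (rest.end - rest.start) // max(num, 1)
    prev_end = rest.start
    for split in even_splits(rest, num):
        size = split.end - split.start
        assert size > 0, f"empty split {split}"
        assert size in (min_size, min_size + 1), f"uneven split {split}"
        assert split.start == prev_end, f"gap before {split}"
        prev_end = split.end
    assert prev_end == rest.end, "range not fully covered"


if __name__ == "__main__":
    # Same cases as the Go table test.
    for rest, num in [(Restriction(0, 21), 4), (Restriction(21, 42), 4),
                      (Restriction(0, 5), 10), (Restriction(0, 21), -1)]:
        check_contract(rest, num)
    print(even_splits(Restriction(0, 21), 4))
```

Handing the remainder to the leading pieces is what keeps every size within `min` and `min + 1`. A different distribution, such as giving the whole remainder to the last piece, would violate the "evenly split" check.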

[GitHub] [beam] chamikaramj commented on pull request #11757: [BEAM-8019] Clarifies Dataflow execution environment model

2020-05-20 Thread GitBox


chamikaramj commented on pull request #11757:
URL: https://github.com/apache/beam/pull/11757#issuecomment-631736712


   Retest this please




[GitHub] [beam] chamikaramj commented on pull request #11757: [BEAM-8019] Clarifies Dataflow execution environment model

2020-05-20 Thread GitBox


chamikaramj commented on pull request #11757:
URL: https://github.com/apache/beam/pull/11757#issuecomment-631736587


   Run Python PreCommit




[GitHub] [beam] ihji commented on a change in pull request #11757: [BEAM-8019] Clarifies Dataflow execution environment model

2020-05-20 Thread GitBox


ihji commented on a change in pull request #11757:
URL: https://github.com/apache/beam/pull/11757#discussion_r428314117



##
File path: sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
##
@@ -314,8 +314,15 @@ def __init__(
 if container_image_url in already_added_containers:
   # Do not add the pipeline environment again.
 
+  # Currently, Dataflow uses Docker container images to uniquely
+  # identify execution environments. Hence Dataflow executes all
+  # transforms that specify the same Docker container image in the
+  # same container. Dependencies of all environments that specify a
+  # given container image will be staged in the container for that

Review comment:
   If you pick an alternative sentence above: `container instance` here too.

##
File path: sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
##
@@ -314,8 +314,15 @@ def __init__(
 if container_image_url in already_added_containers:
   # Do not add the pipeline environment again.
 
+  # Currently, Dataflow uses Docker container images to uniquely
+  # identify execution environments. Hence Dataflow executes all
+  # transforms that specify the same Docker container image in the
+  # same container. Dependencies of all environments that specify a

Review comment:
   `the same Docker container image in the same container` sounds a little confusing to me. How about something like `the same Docker container image in the single container instance`?
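
The comment under review says three things. Dataflow identifies an execution environment by its Docker container image. Transforms whose environments name the same image therefore run in one container instance. The dependencies of all those environments are staged into that container. A minimal Python sketch of that grouping follows. It assumes a simplified, hypothetical `Environment` record with `container_image` and `dependencies` fields; it is not the real Beam environment proto or the `apiclient.py` logic, and the image and artifact names are placeholders.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Environment:
    """Hypothetical, simplified stand-in for a Beam execution environment."""
    env_id: str
    container_image: str
    dependencies: List[str] = field(default_factory=list)


def group_by_container_image(envs: List[Environment]) -> Dict[str, List[str]]:
    """Map each container image to the merged, de-duplicated dependency list
    of every environment that specifies that image."""
    staged: Dict[str, List[str]] = {}
    for env in envs:
        deps = staged.setdefault(env.container_image, [])
        for dep in env.dependencies:
            # Keep first-seen order and stage each artifact only once per image.
            if dep not in deps:
                deps.append(dep)
    return staged


if __name__ == "__main__":
    envs = [
        Environment("py", "python-sdk-image", ["a.tar.gz"]),
        Environment("xlang", "java-sdk-image", ["expansion.jar"]),
        Environment("py2", "python-sdk-image", ["b.whl", "a.tar.gz"]),
    ]
    for image, deps in group_by_container_image(envs).items():
        print(image, deps)
```

Under this model, the environments `py` and `py2` collapse into a single container instance that receives `a.tar.gz` and `b.whl`. That collapse is the behavior the reviewer wants the wording to make clear.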






[GitHub] [beam] rionmonster commented on pull request #11761: [BEAM-10027] Support for Kotlin-based Beam Katas

2020-05-20 Thread GitBox


rionmonster commented on pull request #11761:
URL: https://github.com/apache/beam/pull/11761#issuecomment-631728007


   @henryken 
   
   I was able to successfully export the content of this course to Stepik and generate all of the expected new metadata associated with it (e.g. the `*-remote-info.yaml` files). I have a draft of the course on the site now that, as far as I can tell, parallels the existing Java one as expected.
   
   I wasn't sure about publishing the actual course itself, what that process entails, and whether it warrants another push to this branch to commit the course metadata to support future updates. Anyway, I just thought I'd ask.
   
   Thanks again!




[GitHub] [beam] lukecwik commented on a change in pull request #11715: [BEAM-9977] Implement GrowableOffsetRangeTracker

2020-05-20 Thread GitBox


lukecwik commented on a change in pull request #11715:
URL: https://github.com/apache/beam/pull/11715#discussion_r428304723



##
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
##
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import java.math.BigDecimal;
+import java.math.MathContext;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+
+/**
+ * An {@link OffsetRangeTracker} for tracking a growable offset range. {@code Long.MAX_VALUE} is
+ * used as the end of the range to indicate infinity.
+ *
+ * An offset range is considered growable when the end offset could grow (or change) during
+ * execution time (e.g., Kafka topic partition offset, appended file, ...).
+ *
+ * The growable range is marked as done by claiming {@code Long.MAX_VALUE}.
+ */
+@Experimental(Kind.SPLITTABLE_DO_FN)
+public class GrowableOffsetRangeTracker extends OffsetRangeTracker {
+  /**
+   * Provides the estimated end offset of the range.
+   *
+   * {@link #estimate} is called to give the end offset when {@link #trySplit} or {@link
+   * #getProgress} is invoked. The end offset is exclusive for the range. The estimated end is not
+   * required to monotonically increase as it will only be taken into consideration when the
+   * estimated end offset is larger than the current position. Returning {@code Long.MAX_VALUE} as
+   * the estimate implies the largest possible position for the range is {@code Long.MAX_VALUE - 1}.
+   * Return {@code Long.MIN_VALUE} if an estimate can not be provided.
+   *
+   * Providing a good estimate is important for an accurate progress signal and will impact
+   * splitting decisions by the runner.
+   *
+   * If {@link #estimate} is expensive to compute, consider wrapping the implementation with
+   * {@link Suppliers#memoizeWithExpiration} or equivalent as an optimization.
+   *
+   * TODO(BEAM-10032): Also consider using {@link RangeEndEstimator} when the range is not ended
+   * with {@code Long.MAX_VALUE}.
+   */
+  @FunctionalInterface
+  public interface RangeEndEstimator {
+    long estimate();
+  }
+
+  private final RangeEndEstimator rangeEndEstimator;
+
+  public GrowableOffsetRangeTracker(long start, RangeEndEstimator rangeEndEstimator) {
+    super(new OffsetRange(start, Long.MAX_VALUE));
+    this.rangeEndEstimator = checkNotNull(rangeEndEstimator);
+  }
+
+  @Override
+  public SplitResult trySplit(double fractionOfRemainder) {
+    // If current tracking range is no longer growable, split it as a normal range.
+    if (range.getTo() != Long.MAX_VALUE || range.getTo() == range.getFrom()) {
+      return super.trySplit(fractionOfRemainder);
+    }
+    // If current range has been done, there is no more space to split.
+    if (lastAttemptedOffset != null && lastAttemptedOffset == Long.MAX_VALUE) {
+      return null;
+    }
+    BigDecimal cur =
+        (lastAttemptedOffset == null)
+            ? BigDecimal.valueOf(range.getFrom()).subtract(BigDecimal.ONE, MathContext.DECIMAL128)
+            : BigDecimal.valueOf(lastAttemptedOffset);
+
+    // Fetch the estimated end offset. If the estimated end is smaller than the next offset, use
+    // the next offset as end.
+    BigDecimal estimateRangeEnd =
+        BigDecimal.valueOf(rangeEndEstimator.estimate())
+            .max(cur.add(BigDecimal.ONE, MathContext.DECIMAL128));
+
+    // Convert to BigDecimal in computation to prevent overflow, which may result in loss of
+    // precision.
+    // split = cur + max(1, (estimateRangeEnd - cur) * fractionOfRemainder)
+    BigDecimal splitPos =
+        cur.add(
+            estimateRangeEnd
+                .subtract(cur, MathContext.DECIMAL128)
+                .multiply(BigDecimal.valueOf(fractionOfRemainder), MathContext.DECIMAL128)
+
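
The split arithmetic in `trySplit` follows the formula in its own comment: `split = cur + max(1, (estimateRangeEnd - cur) * fractionOfRemainder)`. It can be traced with a small Python sketch that uses `decimal` in place of `BigDecimal`. This is not the continuation of the truncated Java method. `growable_split_position` is a hypothetical name, and rounding the result up to a whole offset is an assumption, because the quoted diff stops before showing how `splitPos` becomes a `long`.

```python
from decimal import ROUND_CEILING, Context, Decimal
from typing import Optional

# Java's Long.MAX_VALUE; claiming it marks a growable range as done.
LONG_MAX = 2**63 - 1
# Rough analogue of Java's MathContext.DECIMAL128: 34 digits, round half-even.
DECIMAL128 = Context(prec=34)


def growable_split_position(range_from: int, last_attempted: Optional[int],
                            estimated_end: int, fraction: float) -> Optional[int]:
    """Sketch of: split = cur + max(1, (estimateRangeEnd - cur) * fraction)."""
    if last_attempted == LONG_MAX:
        # The range has been claimed through Long.MAX_VALUE: nothing left to split.
        return None
    # cur is the last attempted offset, or from - 1 if nothing was attempted yet.
    if last_attempted is None:
        cur = DECIMAL128.subtract(Decimal(range_from), Decimal(1))
    else:
        cur = Decimal(last_attempted)
    # Estimates at or below cur (e.g. Long.MIN_VALUE, "no estimate") fall back
    # to the next offset, cur + 1.
    end = max(Decimal(estimated_end), DECIMAL128.add(cur, Decimal(1)))
    # repr() roughly mirrors BigDecimal.valueOf(double), which goes through a
    # decimal string rather than the exact binary value of the double.
    step = DECIMAL128.multiply(DECIMAL128.subtract(end, cur), Decimal(repr(fraction)))
    split = DECIMAL128.add(cur, max(Decimal(1), step))
    # Assumption: round up to a whole offset.
    return int(split.to_integral_value(rounding=ROUND_CEILING))
```

Consider a tracker whose last attempted offset is 9, with an estimated end of 20 and a fraction of 0.25. The split lands at 9 + 11 * 0.25 = 11.75, which becomes 12 under the rounding assumption. With `Long.MIN_VALUE` as the estimate, the `max(1, ...)` term still moves the split one offset past `cur`.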

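
The `RangeEndEstimator` Javadoc suggests wrapping an expensive `estimate()` with Guava's `Suppliers#memoizeWithExpiration`. The same caching idea is sketched below in Python. `memoize_with_expiration` is a hypothetical helper, not a Beam or Guava API, and the injectable `clock` parameter exists only to make the expiry behavior easy to test.

```python
import threading
import time
from typing import Callable, Optional


def memoize_with_expiration(estimate: Callable[[], int], ttl_seconds: float,
                            clock: Callable[[], float] = time.monotonic
                            ) -> Callable[[], int]:
    """Return a callable that reuses estimate()'s result for ttl_seconds,
    similar in spirit to Guava's Suppliers.memoizeWithExpiration."""
    lock = threading.Lock()
    cached: Optional[int] = None
    expires_at = 0.0

    def memoized() -> int:
        nonlocal cached, expires_at
        # Serialize refreshes so concurrent callers don't all hit the backend.
        with lock:
            now = clock()
            if cached is None or now >= expires_at:
                cached = estimate()
                expires_at = now + ttl_seconds
            return cached

    return memoized
```

Because the Javadoc states that the estimate need not increase monotonically, serving a slightly stale cached value between refreshes is acceptable. The TTL trades estimate freshness for fewer calls to the underlying source.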