[GitHub] [beam] boyuanzz commented on pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner
boyuanzz commented on pull request #11756: URL: https://github.com/apache/beam/pull/11756#issuecomment-631891047 Run Java Flink PortableValidatesRunner Streaming This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] boyuanzz commented on pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner
boyuanzz commented on pull request #11756: URL: https://github.com/apache/beam/pull/11756#issuecomment-631890990 Run Java Flink PortableValidatesRunner Batch This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] tweise commented on pull request #11722: Bump Flink 1.10 version
tweise commented on pull request #11722: URL: https://github.com/apache/beam/pull/11722#issuecomment-631884230 Unrelated test failure `org.apache.beam.sdk.extensions.ml.VideoIntelligenceIT.annotateVideoFromURINoContext` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] tweise merged pull request #11722: Bump Flink 1.10 version
tweise merged pull request #11722: URL: https://github.com/apache/beam/pull/11722 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] boyuanzz commented on a change in pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner
boyuanzz commented on a change in pull request #11756: URL: https://github.com/apache/beam/pull/11756#discussion_r428438971 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ## @@ -962,16 +971,25 @@ private Progress getProgress() { .build()); } - private void processTimer(String timerId, TimeDomain timeDomain, Timer timer) { + private void processTimer( + String timerIdOrTimerFamilyId, TimeDomain timeDomain, Timer timer) { currentTimer = timer; currentTimeDomain = timeDomain; onTimerContext = new OnTimerContext<>(timer.getUserKey()); +String timerId = +timerIdOrTimerFamilyId.startsWith(TimerFamilyDeclaration.PREFIX) Review comment: Ack. Thanks! ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ## @@ -1207,15 +1224,51 @@ private void output(Instant scheduledTime) { } } - private static class FnApiTimerMap implements TimerMap { -FnApiTimerMap() {} + private class FnApiTimerMap implements TimerMap { +private final String timerFamilyId; +private final K userKey; +private final TimeDomain timeDomain; +private final Instant elementTimestampOrTimerHoldTimestamp; +private final Instant elementTimestampOrTimerFireTimestamp; +private final BoundedWindow boundedWindow; +private final PaneInfo paneInfo; + +FnApiTimerMap( +String timerFamilyId, +K userKey, +BoundedWindow boundedWindow, +Instant elementTimestampOrTimerHoldTimestamp, +Instant elementTimestampOrTimerFireTimestamp, +PaneInfo paneInfo) { + this.timerFamilyId = timerFamilyId; + this.userKey = userKey; + this.elementTimestampOrTimerHoldTimestamp = elementTimestampOrTimerHoldTimestamp; + this.elementTimestampOrTimerFireTimestamp = elementTimestampOrTimerFireTimestamp; + this.boundedWindow = boundedWindow; + this.paneInfo = paneInfo; + + TimerFamilyDeclaration timerFamilyDeclaration = + doFnSignature.timerFamilyDeclarations().get(timerFamilyId); + this.timeDomain = + DoFnSignatures.getTimerFamilySpecOrThrow(timerFamilyDeclaration, 
doFn).getTimeDomain(); +} @Override -public void set(String timerId, Instant absoluteTime) {} +public void set(String dynamicTimerTag, Instant absoluteTime) { Review comment: I thought it would be good time to revisit the dynamic timer API design but it's not in the scope of this PR. Let's leave it as now. Thanks! ## File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java ## @@ -947,49 +910,213 @@ public void testTimers() throws Exception { timerInGlobalWindow("A", new Instant(1600L), new Instant(10012L)), timerInGlobalWindow("X", new Instant(1700L), new Instant(10022L)), timerInGlobalWindow("C", new Instant(1800L), new Instant(10022L)), -timerInGlobalWindow("B", new Instant(1900L), new Instant(10022L; +timerInGlobalWindow("B", new Instant(1900L), new Instant(10022L)), +timerInGlobalWindow("B", new Instant(2000L), new Instant(10032L)), +timerInGlobalWindow("Y", new Instant(2100L), new Instant(10042L; +assertThat( +fakeTimerClient.getTimers(eventFamilyTimer), +contains( +timerInGlobalWindow("X", "event-timer1", new Instant(1000L), new Instant(1003L)), +timerInGlobalWindow("Y", "event-timer1", new Instant(1100L), new Instant(1103L)), +timerInGlobalWindow("X", "event-timer1", new Instant(1200L), new Instant(1203L)), +timerInGlobalWindow("Y", "event-timer1", new Instant(1300L), new Instant(1303L)), +timerInGlobalWindow("A", "event-timer1", new Instant(1400L), new Instant(2413L)), +timerInGlobalWindow("B", "event-timer1", new Instant(1500L), new Instant(2513L)), +timerInGlobalWindow("A", "event-timer1", new Instant(1600L), new Instant(2613L)), +timerInGlobalWindow("X", "event-timer1", new Instant(1700L), new Instant(1723L)), +timerInGlobalWindow("C", "event-timer1", new Instant(1800L), new Instant(1823L)), +timerInGlobalWindow("B", "event-timer1", new Instant(1900L), new Instant(1923L)), +timerInGlobalWindow("B", "event-timer1", new Instant(2000L), new Instant(2033L)), +timerInGlobalWindow("Y", "event-timer1", new Instant(2100L), 
new Instant(2143L; +assertThat( +fakeTimerClient.getTimers(processingFamilyTimer), +contains( +timerInGlobalWindow("X", "processing-timer1", new Instant(1000L), new Instant(10004L)), +timerInGlobalWindow("Y", "processing-timer1", new Instant(1100L), new Instant(10004L)), +timerInGlobalWindow("X", "processing-timer1", new Instant(1200L), new Instant(10004L)),
[GitHub] [beam] veblush commented on pull request #11651: [BEAM-8889] Upgrades gcsio to 2.1.3
veblush commented on pull request #11651: URL: https://github.com/apache/beam/pull/11651#issuecomment-631882467 Thanks, Brian and Chamikara! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] boyuanzz commented on pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner
boyuanzz commented on pull request #11756: URL: https://github.com/apache/beam/pull/11756#issuecomment-631882129 retest all please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] chamikaramj commented on pull request #11757: [BEAM-8019] Clarifies Dataflow execution environment model
chamikaramj commented on pull request #11757: URL: https://github.com/apache/beam/pull/11757#issuecomment-631880290 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] chamikaramj merged pull request #11651: [BEAM-8889] Upgrades gcsio to 2.1.3
chamikaramj merged pull request #11651: URL: https://github.com/apache/beam/pull/11651 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] tweise commented on pull request #11722: Bump Flink 1.10 version
tweise commented on pull request #11722: URL: https://github.com/apache/beam/pull/11722#issuecomment-631867326 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] TheNeuralBit closed pull request #11769: [DO NOT MERGE] Start snapshot build for release process
TheNeuralBit closed pull request #11769: URL: https://github.com/apache/beam/pull/11769 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] TheNeuralBit commented on pull request #11770: [DO NOT MERGE] Run all PostCommit and PreCommit Tests against Release Branch
TheNeuralBit commented on pull request #11770: URL: https://github.com/apache/beam/pull/11770#issuecomment-631860763 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] TheNeuralBit commented on pull request #11770: [DO NOT MERGE] Run all PostCommit and PreCommit Tests against Release Branch
TheNeuralBit commented on pull request #11770: URL: https://github.com/apache/beam/pull/11770#issuecomment-631859266 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] TheNeuralBit opened a new pull request #11770: [DO NOT MERGE] Run all PostCommit and PreCommit Tests against Release Branch
TheNeuralBit opened a new pull request #11770: URL: https://github.com/apache/beam/pull/11770 **Please** add a meaningful description for your change here Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier). 
Post-Commit Tests Status (on master branch) Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark --- | --- | --- | --- | --- | --- | --- | --- Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/) Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/) Python | [![Build
[GitHub] [beam] rionmonster edited a comment on pull request #11761: [BEAM-10027] Support for Kotlin-based Beam Katas
rionmonster edited a comment on pull request #11761: URL: https://github.com/apache/beam/pull/11761#issuecomment-631728007 @henryken I was able to successfully export the content of this course over to Stepik and generate all of the expected new metadata associated with it (e.g. `*-remote-info.yaml` files). I have a draft of the course on the site now that as far as I can tell seems to parallel the existing Java one as expected. You can [access the current iteration of the Kotlin course on Stepik as it stands here](https://stepik.org/course/72488) as I'm currently working through it to make sure things are working as expected. I've managed to fix a handful of issues that I've found post-publication which I've included as well (along with the updated `*-remote-info.yaml` files that align with the new course). Let me know if anything sounds off with that or there's a better path to follow in terms of publishing the course, handling the course-related metadata in the repo, etc. Thanks again! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] chamikaramj commented on pull request #11360: [BEAM-9722] added SnowflakeIO with Read operation
chamikaramj commented on pull request #11360: URL: https://github.com/apache/beam/pull/11360#issuecomment-631854098 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] chamikaramj commented on pull request #11360: [BEAM-9722] added SnowflakeIO with Read operation
chamikaramj commented on pull request #11360: URL: https://github.com/apache/beam/pull/11360#issuecomment-631854178 Run Python2_PVR_Flink PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] chamikaramj commented on pull request #11360: [BEAM-9722] added SnowflakeIO with Read operation
chamikaramj commented on pull request #11360: URL: https://github.com/apache/beam/pull/11360#issuecomment-631854055 Run Python PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] chamikaramj commented on pull request #11360: [BEAM-9722] added SnowflakeIO with Read operation
chamikaramj commented on pull request #11360: URL: https://github.com/apache/beam/pull/11360#issuecomment-631853980 Run Python PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] chamikaramj commented on pull request #11757: [BEAM-8019] Clarifies Dataflow execution environment model
chamikaramj commented on pull request #11757: URL: https://github.com/apache/beam/pull/11757#issuecomment-631853840 Run Python PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] lukecwik merged pull request #11746: [BEAM-10028] Add support for the state backed iterable coder to the Java SDK harness.
lukecwik merged pull request #11746: URL: https://github.com/apache/beam/pull/11746 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] aaltay commented on pull request #11682: [BEAM-9946] | added new api in Partition Transform
aaltay commented on pull request #11682: URL: https://github.com/apache/beam/pull/11682#issuecomment-631853304 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] TheNeuralBit commented on pull request #11769: [DO NOT MERGE] Start snapshot build for release process
TheNeuralBit commented on pull request #11769: URL: https://github.com/apache/beam/pull/11769#issuecomment-631849940 Run Gradle Publish This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] TheNeuralBit opened a new pull request #11769: [DO NOT MERGE] Start snapshot build for release process
TheNeuralBit opened a new pull request #11769: URL: https://github.com/apache/beam/pull/11769 **Please** add a meaningful description for your change here Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier). 
Post-Commit Tests Status (on master branch) Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark --- | --- | --- | --- | --- | --- | --- | --- Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/) Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/) Python | [![Build
[GitHub] [beam] TheNeuralBit commented on pull request #11769: [DO NOT MERGE] Start snapshot build for release process
TheNeuralBit commented on pull request #11769: URL: https://github.com/apache/beam/pull/11769#issuecomment-631838720 Run Gradle Publish This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] tweise commented on pull request #11722: Bump Flink 1.10 version
tweise commented on pull request #11722: URL: https://github.com/apache/beam/pull/11722#issuecomment-631828652 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] darshanj commented on a change in pull request #11682: [BEAM-9946] | added new api in Partition Transform
darshanj commented on a change in pull request #11682: URL: https://github.com/apache/beam/pull/11682#discussion_r428399929 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java ## @@ -124,21 +190,26 @@ private Partition(PartitionDoFn partitionDoFn) { private static class PartitionDoFn extends DoFn { private final int numPartitions; -private final PartitionFn partitionFn; private final TupleTagList outputTags; +private Contextful> ctxFn; +private Object originalFnForDisplayData; /** * Constructs a PartitionDoFn. * * @throws IllegalArgumentException if {@code numPartitions <= 0} */ -public PartitionDoFn(int numPartitions, PartitionFn partitionFn) { +public PartitionDoFn( +int numPartitions, +Contextful> ctxFn, +Object originalFnForDisplayData) { Review comment: Done thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] darshanj commented on a change in pull request #11682: [BEAM-9946] | added new api in Partition Transform
darshanj commented on a change in pull request #11682: URL: https://github.com/apache/beam/pull/11682#discussion_r428399738 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java ## @@ -124,21 +190,26 @@ private Partition(PartitionDoFn partitionDoFn) { private static class PartitionDoFn extends DoFn { private final int numPartitions; -private final PartitionFn partitionFn; private final TupleTagList outputTags; +private Contextful> ctxFn; Review comment: Done. thanks This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] darshanj commented on a change in pull request #11682: [BEAM-9946] | added new api in Partition Transform
darshanj commented on a change in pull request #11682: URL: https://github.com/apache/beam/pull/11682#discussion_r428399634 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java ## @@ -85,7 +141,14 @@ * @throws IllegalArgumentException if {@code numPartitions <= 0} */ public static Partition of(int numPartitions, PartitionFn partitionFn) { -return new Partition<>(new PartitionDoFn(numPartitions, partitionFn)); + +Contextful ctfFn = +Contextful.fn( +(T element, Contextful.Fn.Context c) -> +partitionFn.partitionFor(element, numPartitions), +Requirements.empty()); +Object aClass = partitionFn; Review comment: Hi, I don't get your suggestion here. I will need to wrap the interface function in Contextful.Fn. Can you elaborate please? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] darshanj commented on a change in pull request #11682: [BEAM-9946] | added new api in Partition Transform
darshanj commented on a change in pull request #11682: URL: https://github.com/apache/beam/pull/11682#discussion_r428399671 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java ## @@ -76,6 +93,45 @@ int partitionFor(T elem, int numPartitions); } + /** + * A function object that chooses an output partition for an element. + * + * @param the type of the elements being partitioned + */ + public interface PartitionWithSideInputsFn extends Serializable { +/** + * Chooses the partition into which to put the given element. + * + * @param elem the element to be partitioned + * @param numPartitions the total number of partitions ({@code >= 1}) + * @param c the {@link Contextful.Fn.Context} needed to access sideInputs. + * @return index of the selected partition (in the range {@code [0..numPartitions-1]}) + */ +int partitionFor(T elem, int numPartitions, Contextful.Fn.Context c); + } + + /** + * Returns a new {@code Partition} {@code PTransform} that divides its input {@code PCollection} + * into the given number of partitions, using the given partitioning function. + * + * @param numPartitions the number of partitions to divide the input {@code PCollection} into + * @param partitionFn the function to invoke on each element to choose its output partition + * @param requirements the {@link Requirements} needed to run it. + * @throws IllegalArgumentException if {@code numPartitions <= 0} + */ + public static Partition of( + int numPartitions, + PartitionWithSideInputsFn partitionFn, + Requirements requirements) { +Contextful ctfFn = +Contextful.fn( +(T element, Contextful.Fn.Context c) -> +partitionFn.partitionFor(element, numPartitions, c), +requirements); +Object aClass = partitionFn; Review comment: Hi, I don't get your suggestion here. I will need to wrap the interface function in Contextful.Fn. Can you elaborate please? This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] darshanj commented on a change in pull request #11682: [BEAM-9946] | added new api in Partition Transform
darshanj commented on a change in pull request #11682: URL: https://github.com/apache/beam/pull/11682#discussion_r428399770 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java ## @@ -124,21 +190,26 @@ private Partition(PartitionDoFn partitionDoFn) { private static class PartitionDoFn extends DoFn { private final int numPartitions; -private final PartitionFn partitionFn; private final TupleTagList outputTags; +private Contextful> ctxFn; +private Object originalFnForDisplayData; /** * Constructs a PartitionDoFn. * * @throws IllegalArgumentException if {@code numPartitions <= 0} */ -public PartitionDoFn(int numPartitions, PartitionFn partitionFn) { +public PartitionDoFn( Review comment: Done.Thanks ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java ## @@ -124,21 +190,26 @@ private Partition(PartitionDoFn partitionDoFn) { private static class PartitionDoFn extends DoFn { private final int numPartitions; -private final PartitionFn partitionFn; private final TupleTagList outputTags; +private Contextful> ctxFn; +private Object originalFnForDisplayData; Review comment: Done Thanks This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner
pulasthi commented on a change in pull request #10888: URL: https://github.com/apache/beam/pull/10888#discussion_r428399477 ## File path: runners/twister2/src/main/java/org/apache/beam/runners/twister2/BeamBatchWorker.java ## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.twister2; + +import edu.iu.dsc.tws.api.config.Config; +import edu.iu.dsc.tws.api.tset.TBase; +import edu.iu.dsc.tws.api.tset.sets.TSet; +import edu.iu.dsc.tws.api.tset.sets.batch.BatchTSet; +import edu.iu.dsc.tws.tset.TBaseGraph; +import edu.iu.dsc.tws.tset.env.BatchTSetEnvironment; +import edu.iu.dsc.tws.tset.links.BaseTLink; +import edu.iu.dsc.tws.tset.sets.BaseTSet; +import edu.iu.dsc.tws.tset.sets.BuildableTSet; +import edu.iu.dsc.tws.tset.sets.batch.CachedTSet; +import edu.iu.dsc.tws.tset.sets.batch.ComputeTSet; +import edu.iu.dsc.tws.tset.sets.batch.SinkTSet; +import edu.iu.dsc.tws.tset.worker.BatchTSetIWorker; +import java.io.Serializable; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; +import org.apache.beam.runners.twister2.translators.functions.DoFnFunction; +import org.apache.beam.runners.twister2.translators.functions.Twister2SinkFunction; + +/** + * The Twister2 worker that will execute the job logic once the job is submitted from the run + * method. + */ +public class BeamBatchWorker implements Serializable, BatchTSetIWorker { + + private static final String SIDEINPUTS = "sideInputs"; + private static final String LEAVES = "leaves"; + private static final String GRAPH = "graph"; + private HashMap> sideInputDataSets; + private Set leaves; Review comment: I might have to take a little time on this and make sure i get the type information correct. Would it be OK to create improvement issue for this for now, so i can get back to it later after the code is merged? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner
pulasthi commented on a change in pull request #10888: URL: https://github.com/apache/beam/pull/10888#discussion_r428399112 ## File path: runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2PipelineResult.java ## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.twister2; + +import java.io.IOException; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.metrics.MetricResults; +import org.joda.time.Duration; + +/** Represents a Twister2 pipeline execution result. */ +public class Twister2PipelineResult implements PipelineResult { + + PipelineResult.State state = State.RUNNING; + + @Override + public State getState() { +return state; + } + + @Override + public State cancel() throws IOException { +throw new UnsupportedOperationException("Operation not supported"); + } + + @Override + public State waitUntilFinish(Duration duration) { +return State.DONE; Review comment: Those operations are not supported currently, I missed to mark them as such. Fixed them This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner
pulasthi commented on a change in pull request #10888: URL: https://github.com/apache/beam/pull/10888#discussion_r428397620 ## File path: runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2LegacyRunner.java ## @@ -0,0 +1,339 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.twister2; + +import static org.apache.beam.runners.core.construction.resources.PipelineResources.detectClassPathResourcesToStage; + +import edu.iu.dsc.tws.api.JobConfig; +import edu.iu.dsc.tws.api.Twister2Job; +import edu.iu.dsc.tws.api.config.Config; +import edu.iu.dsc.tws.api.driver.DriverJobState; +import edu.iu.dsc.tws.api.exceptions.Twister2RuntimeException; +import edu.iu.dsc.tws.api.scheduler.Twister2JobState; +import edu.iu.dsc.tws.api.tset.sets.TSet; +import edu.iu.dsc.tws.api.tset.sets.batch.BatchTSet; +import edu.iu.dsc.tws.local.LocalSubmitter; +import edu.iu.dsc.tws.rsched.core.ResourceAllocator; +import edu.iu.dsc.tws.rsched.job.Twister2Submitter; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.logging.LogManager; +import java.util.logging.Logger; +import java.util.stream.Collectors; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; +import org.apache.beam.runners.core.construction.PTransformMatchers; +import org.apache.beam.runners.core.construction.PTransformTranslation; +import org.apache.beam.runners.core.construction.SplittableParDo; +import org.apache.beam.runners.core.construction.SplittableParDoNaiveBounded; +import org.apache.beam.runners.core.construction.resources.PipelineResources; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.PipelineRunner; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsValidator; +import org.apache.beam.sdk.runners.PTransformOverride; +import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; + +/** + * A {@link PipelineRunner} that executes the operations in the pipeline by first translating them + * to a Twister2 Plan and then executing them either locally or on a Twister2 cluster, depending on + * the configuration. + */ +public class Twister2LegacyRunner extends PipelineRunner { + + private static final Logger LOG = Logger.getLogger(Twister2LegacyRunner.class.getName()); + private static final String SIDEINPUTS = "sideInputs"; + private static final String LEAVES = "leaves"; + private static final String GRAPH = "graph"; + /** Provided options. */ + private final Twister2PipelineOptions options; + + public Twister2LegacyRunner(Twister2PipelineOptions options) { Review comment: Changed to protected This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] TheNeuralBit commented on pull request #11651: [BEAM-8889] Upgrades gcsio to 2.1.3
TheNeuralBit commented on pull request #11651: URL: https://github.com/apache/beam/pull/11651#issuecomment-631825554 Run Dataflow ValidatesRunner This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] TheNeuralBit merged pull request #11529: [BEAM-9822] Simplify pipeline when batching is disabled.
TheNeuralBit merged pull request #11529: URL: https://github.com/apache/beam/pull/11529 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] veblush commented on pull request #11651: [BEAM-8889] Upgrades gcsio to 2.1.3
veblush commented on pull request #11651: URL: https://github.com/apache/beam/pull/11651#issuecomment-631815316 Thanks for running the test. The result doesn't make sense, because this change won't introduce any change in runtime behavior other than the dependency upgrade. Could it have been caused by a flaky environment? Can we rerun the test? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] lostluck commented on pull request #11768: [BEAM-10051] Move closed reader check after sentinel.
lostluck commented on pull request #11768: URL: https://github.com/apache/beam/pull/11768#issuecomment-631812306 R: @youngoli This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner
pulasthi commented on a change in pull request #10888: URL: https://github.com/apache/beam/pull/10888#discussion_r428385521 ## File path: runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/AssignWindowsFunction.java ## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.twister2.translators.functions; + +import edu.iu.dsc.tws.api.tset.TSetContext; +import edu.iu.dsc.tws.api.tset.fn.ComputeCollectorFunc; +import edu.iu.dsc.tws.api.tset.fn.RecordCollector; +import java.io.ObjectStreamException; +import java.util.Collection; +import java.util.Iterator; +import java.util.logging.Logger; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.core.construction.Environments; +import org.apache.beam.runners.core.construction.SdkComponents; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; +import org.apache.beam.runners.core.construction.WindowingStrategyTranslation; +import org.apache.beam.runners.twister2.utils.Twister2AssignContext; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PortablePipelineOptions; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.InvalidProtocolBufferException; + +/** Assign Windows function. */ +public class AssignWindowsFunction +implements ComputeCollectorFunc, Iterator>> { + private static final Logger LOG = Logger.getLogger(AssignWindowsFunction.class.getName()); + + private transient boolean isInitilized = false; Review comment: Thank you for catching that, fixed all the occurrences This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] tweise commented on pull request #11722: Bump Flink 1.10 version
tweise commented on pull request #11722: URL: https://github.com/apache/beam/pull/11722#issuecomment-631811467 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner
pulasthi commented on a change in pull request #10888: URL: https://github.com/apache/beam/pull/10888#discussion_r428384829 ## File path: runners/twister2/src/main/java/org/apache/beam/runners/twister2/package-info.java ## @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Internal implementation of the Beam runner for Apache Flink. */ Review comment: wonder how i missed that, i was sure i searched for spark and flink :smile: . Fixed all the instances This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner
pulasthi commented on a change in pull request #10888: URL: https://github.com/apache/beam/pull/10888#discussion_r428384075 ## File path: runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2StreamTranslationContext.java ## @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.twister2; + +/** Twister2StreamTranslationContext. */ +public class Twister2StreamTranslationContext {} Review comment: Yes it is :smile: , there was a duplicate class fixed it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner
pulasthi commented on a change in pull request #10888: URL: https://github.com/apache/beam/pull/10888#discussion_r428381476 ## File path: runners/twister2/build.gradle ## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import groovy.json.JsonOutput + +plugins { id 'org.apache.beam.module' } + +applyJavaNature(automaticModuleName: 'org.apache.beam.runners.twister2') +evaluationDependsOn(":sdks:java:core") +configurations { +validatesRunner +} +description = "Apache Beam :: Runners :: Twister2" + +repositories { Review comment: Thanks for catching that, removed it This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] pabloem merged pull request #11339: [BEAM-9468] Fhir io
pabloem merged pull request #11339: URL: https://github.com/apache/beam/pull/11339 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] lostluck opened a new pull request #11768: [BEAM-10051] Move closed reader check after sentinel.
lostluck opened a new pull request #11768: URL: https://github.com/apache/beam/pull/11768 The closed reader check, in its current position, prevents the "normal teardown" that the reader expects. This means that readers for instructions that terminate early, such as due to splitting, stay resident in memory and never close. In practice this is benign as the buffer would already be closed, but with streaming this memory leak would become noticeable. This PR moves the check to after the sentinel check, and additionally checks there for early termination to avoid closing the buffer twice. Since this largely affects the goroutine local cache, no test changes are possible. Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier). 
Post-Commit Tests Status (on master branch) Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark --- | --- | --- | --- | --- | --- | --- | --- Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/) Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)[![Build
[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner
pulasthi commented on a change in pull request #10888: URL: https://github.com/apache/beam/pull/10888#discussion_r428380317 ## File path: runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2LegacyRunner.java ## @@ -0,0 +1,339 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.twister2; + +import static org.apache.beam.runners.core.construction.resources.PipelineResources.detectClassPathResourcesToStage; + +import edu.iu.dsc.tws.api.JobConfig; +import edu.iu.dsc.tws.api.Twister2Job; +import edu.iu.dsc.tws.api.config.Config; +import edu.iu.dsc.tws.api.driver.DriverJobState; +import edu.iu.dsc.tws.api.exceptions.Twister2RuntimeException; +import edu.iu.dsc.tws.api.scheduler.Twister2JobState; +import edu.iu.dsc.tws.api.tset.sets.TSet; +import edu.iu.dsc.tws.api.tset.sets.batch.BatchTSet; +import edu.iu.dsc.tws.local.LocalSubmitter; +import edu.iu.dsc.tws.rsched.core.ResourceAllocator; +import edu.iu.dsc.tws.rsched.job.Twister2Submitter; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.logging.LogManager; +import java.util.logging.Logger; +import java.util.stream.Collectors; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; +import org.apache.beam.runners.core.construction.PTransformMatchers; +import org.apache.beam.runners.core.construction.PTransformTranslation; +import org.apache.beam.runners.core.construction.SplittableParDo; +import org.apache.beam.runners.core.construction.SplittableParDoNaiveBounded; +import org.apache.beam.runners.core.construction.resources.PipelineResources; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.PipelineRunner; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsValidator; +import org.apache.beam.sdk.runners.PTransformOverride; +import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; + +/** + * A {@link PipelineRunner} that executes the operations in the pipeline by first translating them + * to a Twister2 Plan and then executing them either locally or on a Twister2 cluster, depending on + * the configuration. + */ +public class Twister2LegacyRunner extends PipelineRunner { + + private static final Logger LOG = Logger.getLogger(Twister2LegacyRunner.class.getName()); + private static final String SIDEINPUTS = "sideInputs"; + private static final String LEAVES = "leaves"; + private static final String GRAPH = "graph"; + /** Provided options. */ + private final Twister2PipelineOptions options; + + public Twister2LegacyRunner(Twister2PipelineOptions options) { +this.options = options; + } + + public static Twister2LegacyRunner fromOptions(PipelineOptions options) { +Twister2PipelineOptions pipelineOptions = +PipelineOptionsValidator.validate(Twister2PipelineOptions.class, options); +if (pipelineOptions.getFilesToStage() == null) { + pipelineOptions.setFilesToStage( + detectClassPathResourcesToStage( + Twister2LegacyRunner.class.getClassLoader(), pipelineOptions)); + LOG.info( + "PipelineOptions.filesToStage was not specified. " + + "Defaulting to files from the classpath: will stage {} files. " + + "Enable logging at DEBUG level to see which files will be staged" + + pipelineOptions.getFilesToStage().size()); +} +return new Twister2LegacyRunner(pipelineOptions); + } + + @Override + public PipelineResult run(Pipeline pipeline) { +// create a worker and pass in the pipeline and then do the translation +Twister2PipelineExecutionEnvironment env = new Twister2PipelineExecutionEnvironment(options); +LOG.info("Translating pipeline to
[GitHub] [beam] jaketf commented on pull request #11339: [BEAM-9468] Fhir io
jaketf commented on pull request #11339: URL: https://github.com/apache/beam/pull/11339#issuecomment-631805319  This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner
pulasthi commented on a change in pull request #10888: URL: https://github.com/apache/beam/pull/10888#discussion_r428378347 ## File path: runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2PipelineOptions.java ## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.twister2; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import edu.iu.dsc.tws.tset.env.TSetEnvironment; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.StreamingOptions; + +/** Twister2PipelineOptions. */ +public interface Twister2PipelineOptions extends PipelineOptions, StreamingOptions { + @Description("set unique application name for Twister2 runner") Review comment: Was not aware of that convention, thanks for pointing that out. reformatted according this rule This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] apilloud commented on a change in pull request #11682: [BEAM-9946] | added new api in Partition Transform
apilloud commented on a change in pull request #11682: URL: https://github.com/apache/beam/pull/11682#discussion_r428371418 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java ## @@ -124,21 +190,26 @@ private Partition(PartitionDoFn partitionDoFn) { private static class PartitionDoFn extends DoFn { private final int numPartitions; -private final PartitionFn partitionFn; private final TupleTagList outputTags; +private Contextful> ctxFn; Review comment: nit: this can be `final` ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java ## @@ -85,7 +141,14 @@ * @throws IllegalArgumentException if {@code numPartitions <= 0} */ public static Partition of(int numPartitions, PartitionFn partitionFn) { -return new Partition<>(new PartitionDoFn(numPartitions, partitionFn)); + +Contextful ctfFn = +Contextful.fn( +(T element, Contextful.Fn.Context c) -> +partitionFn.partitionFor(element, numPartitions), +Requirements.empty()); +Object aClass = partitionFn; Review comment: This is a no-op, please remove. ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java ## @@ -76,6 +93,45 @@ int partitionFor(T elem, int numPartitions); } + /** + * A function object that chooses an output partition for an element. + * + * @param the type of the elements being partitioned + */ + public interface PartitionWithSideInputsFn extends Serializable { +/** + * Chooses the partition into which to put the given element. + * + * @param elem the element to be partitioned + * @param numPartitions the total number of partitions ({@code >= 1}) + * @param c the {@link Contextful.Fn.Context} needed to access sideInputs. 
+ * @return index of the selected partition (in the range {@code [0..numPartitions-1]}) + */ +int partitionFor(T elem, int numPartitions, Contextful.Fn.Context c); + } + + /** + * Returns a new {@code Partition} {@code PTransform} that divides its input {@code PCollection} + * into the given number of partitions, using the given partitioning function. + * + * @param numPartitions the number of partitions to divide the input {@code PCollection} into + * @param partitionFn the function to invoke on each element to choose its output partition + * @param requirements the {@link Requirements} needed to run it. + * @throws IllegalArgumentException if {@code numPartitions <= 0} + */ + public static Partition of( + int numPartitions, + PartitionWithSideInputsFn partitionFn, + Requirements requirements) { +Contextful ctfFn = +Contextful.fn( +(T element, Contextful.Fn.Context c) -> +partitionFn.partitionFor(element, numPartitions, c), +requirements); +Object aClass = partitionFn; Review comment: This is a no-op, please remove. ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java ## @@ -124,21 +190,26 @@ private Partition(PartitionDoFn partitionDoFn) { private static class PartitionDoFn extends DoFn { private final int numPartitions; -private final PartitionFn partitionFn; private final TupleTagList outputTags; +private Contextful> ctxFn; +private Object originalFnForDisplayData; /** * Constructs a PartitionDoFn. * * @throws IllegalArgumentException if {@code numPartitions <= 0} */ -public PartitionDoFn(int numPartitions, PartitionFn partitionFn) { +public PartitionDoFn( Review comment: nit: drop the `public`. 
## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java ## @@ -124,21 +190,26 @@ private Partition(PartitionDoFn partitionDoFn) { private static class PartitionDoFn extends DoFn { private final int numPartitions; -private final PartitionFn partitionFn; private final TupleTagList outputTags; +private Contextful> ctxFn; +private Object originalFnForDisplayData; Review comment: nit: this can be `final` ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java ## @@ -124,21 +190,26 @@ private Partition(PartitionDoFn partitionDoFn) { private static class PartitionDoFn extends DoFn { private final int numPartitions; -private final PartitionFn partitionFn; private final TupleTagList outputTags; +private Contextful> ctxFn; +private Object originalFnForDisplayData; /** * Constructs a PartitionDoFn. * * @throws IllegalArgumentException if {@code numPartitions <= 0} */ -public PartitionDoFn(int numPartitions, PartitionFn partitionFn) { +public PartitionDoFn( +int numPartitions, +
[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner
pulasthi commented on a change in pull request #10888: URL: https://github.com/apache/beam/pull/10888#discussion_r428376950 ## File path: runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2PipelineOptions.java ## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.twister2; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import edu.iu.dsc.tws.tset.env.TSetEnvironment; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.StreamingOptions; + +/** Twister2PipelineOptions. */ +public interface Twister2PipelineOptions extends PipelineOptions, StreamingOptions { + @Description("set unique application name for Twister2 runner") + void setApplicationName(String name); + + String getApplicationName(); Review comment: I initially added them for completeness, but ended up not using some of them. Removed the unused options since they can be added if needed later. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] lukecwik commented on pull request #11746: [BEAM-10028] Add support for the state backed iterable coder to the Java SDK harness.
lukecwik commented on pull request #11746: URL: https://github.com/apache/beam/pull/11746#issuecomment-631800147 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] boyuanzz merged pull request #11715: [BEAM-9977] Implement GrowableOffsetRangeTracker
boyuanzz merged pull request #11715: URL: https://github.com/apache/beam/pull/11715 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] boyuanzz commented on pull request #11715: [BEAM-9977] Implement GrowableOffsetRangeTracker
boyuanzz commented on pull request #11715: URL: https://github.com/apache/beam/pull/11715#issuecomment-631794981 Thanks for your help! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] TheNeuralBit commented on pull request #11529: [BEAM-9822] Simplify pipeline when batching is disabled.
TheNeuralBit commented on pull request #11529: URL: https://github.com/apache/beam/pull/11529#issuecomment-631794665 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] TheNeuralBit commented on pull request #11529: [BEAM-9822] Simplify pipeline when batching is disabled.
TheNeuralBit commented on pull request #11529: URL: https://github.com/apache/beam/pull/11529#issuecomment-631792217 Run Java PostCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] pabloem merged pull request #11762: [BEAM-10044] Remove curly quotes from documentation code samples
pabloem merged pull request #11762: URL: https://github.com/apache/beam/pull/11762 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] pabloem commented on pull request #11762: [BEAM-10044] Remove curly quotes from documentation code samples
pabloem commented on pull request #11762: URL: https://github.com/apache/beam/pull/11762#issuecomment-631791451 thanks @epicfaace ! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] amaliujia merged pull request #11724: Updated Videos and Podcasts page
amaliujia merged pull request #11724: URL: https://github.com/apache/beam/pull/11724 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] TheNeuralBit commented on pull request #11529: [BEAM-9822] Simplify pipeline when batching is disabled.
TheNeuralBit commented on pull request #11529: URL: https://github.com/apache/beam/pull/11529#issuecomment-631789673 Run Java PostCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] pabloem commented on pull request #11339: [BEAM-9468] Fhir io
pabloem commented on pull request #11339: URL: https://github.com/apache/beam/pull/11339#issuecomment-631786516 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] TheNeuralBit merged pull request #11701: [BEAM-9899] Fix some issues around storing schema `id` on user types
TheNeuralBit merged pull request #11701: URL: https://github.com/apache/beam/pull/11701 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] jkff commented on a change in pull request #11406: [BEAM-9748] Refactor Reparallelize as an alternative Reshuffle implementation
jkff commented on a change in pull request #11406: URL: https://github.com/apache/beam/pull/11406#discussion_r428356257 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java ## @@ -107,10 +108,57 @@ public void processElement( /** Implementation of {@link #viaRandomKey()}. */ public static class ViaRandomKey extends PTransform, PCollection> { +private boolean isHighFanoutAndLimitedInputParallelism; + private ViaRandomKey() {} +/** + * Use a different strategy that materializes the input and prepares it to be consumed in a + * highly parallel fashion. + * + * It is tailored to the case when input was produced in an extremely sequential way - + * typically by a ParDo that emits millions of outputs _per input element_, e.g., executing a + * large database query or a large simulation and emitting all of their results. + * + * Internally, it materializes the input at a moderate cost before reshuffling it, making the + * reshuffling itself significantly cheaper in these extreme cases on some runners. Use this + * only if your benchmarks show an improvement. + */ +public ViaRandomKey withHintHighFanoutAndLimitedInputParallelism() { + this.isHighFanoutAndLimitedInputParallelism = true; + return this; +} + @Override public PCollection expand(PCollection input) { + if (isHighFanoutAndLimitedInputParallelism) { +// See https://issues.apache.org/jira/browse/BEAM-2803 +// We use a combined approach to "break fusion" here: +// (see https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion) +// 1) force the data to be materialized by passing it as a side input to an identity fn, +// then 2) reshuffle it with a random key. Initial materialization provides some parallelism +// and ensures that data to be shuffled can be generated in parallel, while reshuffling +// provides perfect parallelism. +// In most cases where a "fusion break" is needed, a simple reshuffle would be sufficient. 
+// The current approach is necessary only to support the particular case of JdbcIO where +// a single query may produce many gigabytes of query results. +PCollectionView> empty = +input +.apply("Consume", Filter.by(SerializableFunctions.constant(false))) +.apply(View.asIterable()); +PCollection materialized = +input.apply( +"Identity", +ParDo.of( Review comment: nit: here you could use a `MapElements.via(Contextful.of(t -> t, requiresSideInputs(empty)))` ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java ## @@ -107,10 +108,57 @@ public void processElement( /** Implementation of {@link #viaRandomKey()}. */ public static class ViaRandomKey extends PTransform, PCollection> { +private boolean isHighFanoutAndLimitedInputParallelism; Review comment: Please use an immutable AutoValue with a builder here. PTransforms can't have mutable member variables. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] pabloem commented on pull request #11339: [BEAM-9468] Fhir io
pabloem commented on pull request #11339: URL: https://github.com/apache/beam/pull/11339#issuecomment-631782829 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] ibzib commented on pull request #11755: [BEAM-10038] Add script to mass-comment Jenkins triggers on PR
ibzib commented on pull request #11755: URL: https://github.com/apache/beam/pull/11755#issuecomment-631781814 > > This looks pretty neat. We could maybe specialize this for some particular common cases. I found myself many times requiring to run something like "Run All Spark Runners ValidatesRunner Tests" WDYT @ibzib ? > > If you want to use it as-is, all you have to do is modify the `COMMENTS_TO_ADD`. Actually, for that use case, maybe it would be better to make a single Jenkins uber-job that runs all the Spark test suites we have. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] pabloem commented on pull request #11339: [BEAM-9468] Fhir io
pabloem commented on pull request #11339: URL: https://github.com/apache/beam/pull/11339#issuecomment-631778141 Run Java PostCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] jaketf commented on pull request #11339: [BEAM-9468] Fhir io
jaketf commented on pull request #11339: URL: https://github.com/apache/beam/pull/11339#issuecomment-631777614 Are we able to run post-commit? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] TheNeuralBit commented on pull request #11651: [BEAM-8889] Upgrades gcsio to 2.1.3
TheNeuralBit commented on pull request #11651: URL: https://github.com/apache/beam/pull/11651#issuecomment-631776883 It looks like Dataflow VR tests are going to fail: ``` 15:10:31 org.apache.beam.sdk.transforms.join.CoGroupByKeyTest > testCoGroupByKeyWithWindowing FAILED 15:10:31 java.lang.RuntimeException at CoGroupByKeyTest.java:494 15:10:31 Caused by: java.lang.OutOfMemoryError 15:10:31 15:10:31 org.apache.beam.sdk.transforms.FlattenTest > testFlattenIterablesLists FAILED 15:10:31 java.lang.RuntimeException at FlattenTest.java:270 15:10:31 Caused by: java.lang.OutOfMemoryError 15:10:31 15:10:31 org.apache.beam.sdk.transforms.CreateTest > testCreateWithVoidType FAILED 15:10:31 java.lang.RuntimeException at CreateTest.java:326 15:10:31 Caused by: java.lang.OutOfMemoryError 15:10:31 15:10:31 org.apache.beam.sdk.transforms.ReshuffleTest > testReshuffleAfterSlidingWindows FAILED 15:10:31 java.lang.RuntimeException at ReshuffleTest.java:251 15:10:31 Caused by: java.lang.RuntimeException 15:10:31 Caused by: java.lang.IllegalArgumentException 15:10:31 Caused by: java.lang.OutOfMemoryError ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] TheNeuralBit commented on pull request #11339: [BEAM-9468] Fhir io
TheNeuralBit commented on pull request #11339: URL: https://github.com/apache/beam/pull/11339#issuecomment-631773038 That definitely seems like a flake to me This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] tysonjh commented on a change in pull request #11331: [BEAM-9646] Add Google Cloud vision integration transform
tysonjh commented on a change in pull request #11331: URL: https://github.com/apache/beam/pull/11331#discussion_r428351403 ## File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateImages.java ## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.cloud.vision.v1.AnnotateImageRequest; +import com.google.cloud.vision.v1.AnnotateImageResponse; +import com.google.cloud.vision.v1.BatchAnnotateImagesResponse; +import com.google.cloud.vision.v1.Feature; +import com.google.cloud.vision.v1.ImageAnnotatorClient; +import com.google.cloud.vision.v1.ImageContext; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupIntoBatches; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; + +/** + * Parent class for transform utilizing Cloud Vision API. 
+ * + * @param Type of input PCollection. + */ +public abstract class AnnotateImages +extends PTransform, PCollection>> { + + private static final Long MIN_BATCH_SIZE = 1L; + private static final Long MAX_BATCH_SIZE = 5L; + + protected final PCollectionView> contextSideInput; + protected final List featureList; + private long batchSize; + + public AnnotateImages( Review comment: A summary fragment would be nice here, even something simple like 'Constructs an AnnotateImages transform.' This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] jaketf commented on pull request #11339: [BEAM-9468] Fhir io
jaketf commented on pull request #11339: URL: https://github.com/apache/beam/pull/11339#issuecomment-631768508 Seems pre-commit is stuck reporting back to github. There's one unrelated test that failed https://builds.apache.org/job/beam_PreCommit_Java_Commit/11508/testReport/org.apache.beam.sdk.extensions.ml/VideoIntelligenceIT/annotateVideoFromURINoContext/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] ibzib commented on pull request #11755: [BEAM-10038] Add script to mass-comment Jenkins triggers on PR
ibzib commented on pull request #11755: URL: https://github.com/apache/beam/pull/11755#issuecomment-631766935 > This looks pretty neat. We could maybe specialize this for some particular common cases. I found myself many times requiring to run something like "Run All Spark Runners ValidatesRunner Tests" WDYT @ibzib ? If you want to use it as-is, all you have to do is modify the `COMMENTS_TO_ADD`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] ibzib commented on pull request #11764: [BEAM-10048] Clean up release guide.
ibzib commented on pull request #11764: URL: https://github.com/apache/beam/pull/11764#issuecomment-631766210 > Will take a look once this PR becomes stable. (Seems still evolving). I'm done with it for now; sorry for the churn. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] amaliujia commented on pull request #11764: [BEAM-10048] Clean up release guide.
amaliujia commented on pull request #11764: URL: https://github.com/apache/beam/pull/11764#issuecomment-631764165 Will take a look once this PR becomes stable. (Seems still evolving). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] boyuanzz commented on a change in pull request #11715: [BEAM-9977] Implement GrowableOffsetRangeTracker
boyuanzz commented on a change in pull request #11715: URL: https://github.com/apache/beam/pull/11715#discussion_r428343813 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java ## @@ -120,13 +136,27 @@ public String toString() { public Progress getProgress() { // If we have never attempted an offset, we return the length of the entire range as work // remaining. +// Convert to BigDecimal in computation to prevent overflow, which may result in loss of +// precision. if (lastAttemptedOffset == null) { - return Progress.from(0, range.getTo() - range.getFrom()); + return Progress.from( + 0, + BigDecimal.valueOf(range.getTo()) + .subtract(BigDecimal.valueOf(range.getFrom()), MathContext.DECIMAL128) + .doubleValue()); } // Compute the amount of work remaining from where we are to where we are attempting to get to // with a minimum of zero in case we have claimed beyond the end of the range. -long workRemaining = Math.max(range.getTo() - lastAttemptedOffset, 0); -return Progress.from(range.getTo() - range.getFrom() - workRemaining, workRemaining); +BigDecimal workRemaining = +BigDecimal.valueOf(range.getTo()) +.subtract(BigDecimal.valueOf(lastAttemptedOffset), MathContext.DECIMAL128) +.max(BigDecimal.ZERO); +BigDecimal wholeWork = Review comment: Done. Thanks! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] boyuanzz commented on a change in pull request #11715: [BEAM-9977] Implement GrowableOffsetRangeTracker
boyuanzz commented on a change in pull request #11715: URL: https://github.com/apache/beam/pull/11715#discussion_r428343752 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java ## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.splittabledofn; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; + +import java.math.BigDecimal; +import java.math.MathContext; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers; + +/** + * An {@link OffsetRangeTracker} for tracking a growable offset range. {@code Long.MAX_VALUE} is + * used as the end of the range to indicate infinity. + * + * An offset range is considered growable when the end offset could grow (or change) during + * execution time (e.g., Kafka topic partition offset, appended file, ...). + * + * The growable range is marked as done by claiming {@code Long.MAX_VALUE}. 
+ */ +@Experimental(Kind.SPLITTABLE_DO_FN) +public class GrowableOffsetRangeTracker extends OffsetRangeTracker { + /** + * Provides the estimated end offset of the range. + * + * {@link #estimate} is called to give the end offset when {@link #trySplit} or {@link + * #getProgress} is invoked. The end offset is exclusive for the range. The estimated end is not + * required to monotonically increase as it will only be taken into consideration when the + * estimated end offset is larger than the current position. Returning {@code Long.MAX_VALUE} as + * the estimate implies the largest possible position for the range is {@code Long.MAX_VALUE - 1}. + * Return {@code Long.MIN_VALUE} if an estimate can not be provided. + * + * Providing a good estimate is important for an accurate progress signal and will impact + * splitting decisions by the runner. + * + * If {@link #estimate} is expensive to compute, consider wrapping the implementation with + * {@link Suppliers#memoizeWithExpiration} or equivalent as an optimization. + * + * TODO(BEAM-10032): Also consider using {@link RangeEndEstimator} when the range is not ended + * with {@code Long.MAX_VALUE}. + */ + @FunctionalInterface + public interface RangeEndEstimator { +long estimate(); + } + + private final RangeEndEstimator rangeEndEstimator; + + public GrowableOffsetRangeTracker(long start, RangeEndEstimator rangeEndEstimator) { +super(new OffsetRange(start, Long.MAX_VALUE)); +this.rangeEndEstimator = checkNotNull(rangeEndEstimator); + } + + @Override + public SplitResult trySplit(double fractionOfRemainder) { +// If current tracking range is no longer growable, split it as a normal range. +if (range.getTo() != Long.MAX_VALUE || range.getTo() == range.getFrom()) { + return super.trySplit(fractionOfRemainder); +} +// If current range has been done, there is no more space to split. 
+if (lastAttemptedOffset != null && lastAttemptedOffset == Long.MAX_VALUE) { + return null; +} +BigDecimal cur = +(lastAttemptedOffset == null) +? BigDecimal.valueOf(range.getFrom()).subtract(BigDecimal.ONE, MathContext.DECIMAL128) +: BigDecimal.valueOf(lastAttemptedOffset); + +// Fetch the estimated end offset. If the estimated end is smaller than the next offset, use +// the next offset as end. +BigDecimal estimateRangeEnd = +BigDecimal.valueOf(rangeEndEstimator.estimate()) +.max(cur.add(BigDecimal.ONE, MathContext.DECIMAL128)); + +// Convert to BigDecimal in computation to prevent overflow, which may result in loss of +// precision. +// split = cur + max(1, (estimateRangeEnd - cur) * fractionOfRemainder) +BigDecimal splitPos = +cur.add( +estimateRangeEnd +.subtract(cur, MathContext.DECIMAL128) +.multiply(BigDecimal.valueOf(fractionOfRemainder), MathContext.DECIMAL128) +
[GitHub] [beam] iemejia commented on pull request #11755: [BEAM-10038] Add script to mass-comment Jenkins triggers on PR
iemejia commented on pull request #11755: URL: https://github.com/apache/beam/pull/11755#issuecomment-631761505 This looks pretty neat. We could maybe specialize this for some particular common cases. I found myself many times requiring to run something like "Run All Spark Runners ValidatesRunner Tests" WDYT @ibzib ? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] TheNeuralBit commented on pull request #11701: [BEAM-9899] Fix some issues around storing schema `id` on user types
TheNeuralBit commented on pull request #11701: URL: https://github.com/apache/beam/pull/11701#issuecomment-631760558 Run Python2_PVR_Flink PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] pabloem commented on pull request #11762: [BEAM-10044] Remove curly quotes from documentation code samples
pabloem commented on pull request #11762: URL: https://github.com/apache/beam/pull/11762#issuecomment-631758390 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] reuvenlax opened a new pull request #11767: Vortex sink
reuvenlax opened a new pull request #11767: URL: https://github.com/apache/beam/pull/11767 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner
pulasthi commented on a change in pull request #10888: URL: https://github.com/apache/beam/pull/10888#discussion_r428335678 ## File path: runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2LegacyRunner.java ## @@ -0,0 +1,339 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.twister2; + +import static org.apache.beam.runners.core.construction.resources.PipelineResources.detectClassPathResourcesToStage; + +import edu.iu.dsc.tws.api.JobConfig; +import edu.iu.dsc.tws.api.Twister2Job; +import edu.iu.dsc.tws.api.config.Config; +import edu.iu.dsc.tws.api.driver.DriverJobState; +import edu.iu.dsc.tws.api.exceptions.Twister2RuntimeException; +import edu.iu.dsc.tws.api.scheduler.Twister2JobState; +import edu.iu.dsc.tws.api.tset.sets.TSet; +import edu.iu.dsc.tws.api.tset.sets.batch.BatchTSet; +import edu.iu.dsc.tws.local.LocalSubmitter; +import edu.iu.dsc.tws.rsched.core.ResourceAllocator; +import edu.iu.dsc.tws.rsched.job.Twister2Submitter; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.logging.LogManager; +import java.util.logging.Logger; +import java.util.stream.Collectors; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; +import org.apache.beam.runners.core.construction.PTransformMatchers; +import org.apache.beam.runners.core.construction.PTransformTranslation; +import org.apache.beam.runners.core.construction.SplittableParDo; +import org.apache.beam.runners.core.construction.SplittableParDoNaiveBounded; +import org.apache.beam.runners.core.construction.resources.PipelineResources; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.PipelineRunner; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsValidator; +import org.apache.beam.sdk.runners.PTransformOverride; +import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; + +/** + * A {@link PipelineRunner} that executes the operations in the pipeline by first translating them + * to a Twister2 Plan and then executing them either locally or on a Twister2 cluster, depending on + * the configuration. + */ +public class Twister2LegacyRunner extends PipelineRunner { + + private static final Logger LOG = Logger.getLogger(Twister2LegacyRunner.class.getName()); + private static final String SIDEINPUTS = "sideInputs"; + private static final String LEAVES = "leaves"; + private static final String GRAPH = "graph"; + /** Provided options. */ + private final Twister2PipelineOptions options; + + public Twister2LegacyRunner(Twister2PipelineOptions options) { +this.options = options; + } + + public static Twister2LegacyRunner fromOptions(PipelineOptions options) { +Twister2PipelineOptions pipelineOptions = +PipelineOptionsValidator.validate(Twister2PipelineOptions.class, options); +if (pipelineOptions.getFilesToStage() == null) { + pipelineOptions.setFilesToStage( + detectClassPathResourcesToStage( + Twister2LegacyRunner.class.getClassLoader(), pipelineOptions)); + LOG.info( + "PipelineOptions.filesToStage was not specified. " + + "Defaulting to files from the classpath: will stage {} files. " + + "Enable logging at DEBUG level to see which files will be staged" + + pipelineOptions.getFilesToStage().size()); +} +return new Twister2LegacyRunner(pipelineOptions); + } + + @Override + public PipelineResult run(Pipeline pipeline) { +// create a worker and pass in the pipeline and then do the translation +Twister2PipelineExecutionEnvironment env = new Twister2PipelineExecutionEnvironment(options); +LOG.info("Translating pipeline to
[GitHub] [beam] HuangLED commented on a change in pull request #11746: [BEAM-10028] Add support for the state backed iterable coder to the Java SDK harness.
HuangLED commented on a change in pull request #11746: URL: https://github.com/apache/beam/pull/11746#discussion_r428333755 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java ## @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.fn.harness.state; + +import static org.apache.beam.runners.core.construction.ModelCoders.STATE_BACKED_ITERABLE_CODER_URN; + +import com.google.auto.service.AutoService; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest; +import org.apache.beam.runners.core.construction.CoderTranslation.TranslationContext; +import org.apache.beam.runners.core.construction.CoderTranslator; +import org.apache.beam.runners.core.construction.CoderTranslatorRegistrar; +import org.apache.beam.sdk.coders.IterableLikeCoder; +import org.apache.beam.sdk.fn.stream.DataStreams; +import org.apache.beam.sdk.util.BufferedElementCountingOutputStream; +import org.apache.beam.sdk.util.VarInt; +import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams; + +/** + * A {@link BeamFnStateClient state} backed iterable which allows for fetching elements over the + * portability state API. See https://s.apache.org/beam-fn-state-api-and-bundle-processing;>remote references for + * additional details. + * + * One must supply a {@link StateBackedIterableTranslationContext} when using {@link + * CoderTranslator#fromComponents} to be able to create a {@link StateBackedIterable.Coder}. 
+ */ +public class StateBackedIterable implements Iterable { + + private final BeamFnStateClient beamFnStateClient; + private final org.apache.beam.sdk.coders.Coder elemCoder; + @VisibleForTesting final StateRequest request; + @VisibleForTesting final List prefix; + + public StateBackedIterable( + BeamFnStateClient beamFnStateClient, + String instructionId, + ByteString runnerKey, + org.apache.beam.sdk.coders.Coder elemCoder, + List prefix) { +this.beamFnStateClient = beamFnStateClient; +this.elemCoder = elemCoder; + +StateRequest.Builder requestBuilder = StateRequest.newBuilder(); +requestBuilder +.setInstructionId(instructionId) +.getStateKeyBuilder() +.getRunnerBuilder() +.setKey(runnerKey); +this.request = requestBuilder.build(); +this.prefix = prefix; + } + + @Override + public Iterator iterator() { +return Iterators.concat( +prefix.iterator(), +new DataStreams.DataStreamDecoder( +elemCoder, + DataStreams.inbound(StateFetchingIterators.forFirstChunk(beamFnStateClient, request; + } + + /** + * Decodes an {@link Iterable} that might be backed by state. If the terminator at the end of the + * value stream is {@code -1} then we return a {@link StateBackedIterable} otherwise we return an + * {@link Iterable}. + */ + public static class Coder extends IterableLikeCoder> { + +private final BeamFnStateClient beamFnStateClient; +private final Supplier instructionId; + +public Coder( +BeamFnStateClient beamFnStateClient, +Supplier instructionId, +org.apache.beam.sdk.coders.Coder elemCoder) { + super(elemCoder, "StateBackedIterable"); + this.beamFnStateClient = beamFnStateClient; + this.instructionId = instructionId; +} + +@Override +protected Iterable decodeToIterable(List decodedElements) { + return decodedElements; +} + +@Override
[GitHub] [beam] TheNeuralBit edited a comment on pull request #11701: [BEAM-9899] Fix some issues around storing schema `id` on user types
TheNeuralBit edited a comment on pull request #11701: URL: https://github.com/apache/beam/pull/11701#issuecomment-631751672 Python PreCommit failure is a flake due to https://issues.apache.org/jira/browse/BEAM-9975 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] TheNeuralBit commented on pull request #11701: [BEAM-9899] Fix some issues around storing schema `id` on user types
TheNeuralBit commented on pull request #11701: URL: https://github.com/apache/beam/pull/11701#issuecomment-631751672 Python PreCommit failure is due to https://issues.apache.org/jira/browse/BEAM-9975 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] TheNeuralBit commented on pull request #11754: [BEAM-10037] BeamSqlExample.java fails to build
TheNeuralBit commented on pull request #11754: URL: https://github.com/apache/beam/pull/11754#issuecomment-631750861 I'm going to go ahead and merge since CI failure is unrelated This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] TheNeuralBit merged pull request #11754: [BEAM-10037] BeamSqlExample.java fails to build
TheNeuralBit merged pull request #11754: URL: https://github.com/apache/beam/pull/11754 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] TheNeuralBit merged pull request #11575: [BEAM-9699] Add test verifying we can use ZetaSQL in Python SqlTransform
TheNeuralBit merged pull request #11575: URL: https://github.com/apache/beam/pull/11575 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] robertwb opened a new pull request #11766: [BEAM-10036] More flexible dataframes partitioning.
robertwb opened a new pull request #11766: URL: https://github.com/apache/beam/pull/11766 Also adds (naive) dataframe.agg() that uses this. Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier). Post-Commit Tests Status (on master branch) Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark --- | --- | --- | --- | --- | --- | --- | --- Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/) Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/) Python | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)[![Build
[GitHub] [beam] rohdesamuel opened a new pull request #11765: [BEAM-9322] Turn new PCollection naming schemes to True by default
rohdesamuel opened a new pull request #11765: URL: https://github.com/apache/beam/pull/11765 Change-Id: I8c2d660b175442d1917fe2b1ae166c0f4a1caaca This turns "passthrough_pcollection_output_ids" and "force_generated_pcollection_output_ids" to True by default. Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier). 
Post-Commit Tests Status (on master branch) Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark --- | --- | --- | --- | --- | --- | --- | --- Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/) Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/) Python | [![Build
[GitHub] [beam] ibzib merged pull request #11755: [BEAM-10038] Add script to mass-comment Jenkins triggers on PR
ibzib merged pull request #11755: URL: https://github.com/apache/beam/pull/11755 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] TheNeuralBit commented on pull request #11651: [BEAM-8889] Upgrades gcsio to 2.1.3
TheNeuralBit commented on pull request #11651: URL: https://github.com/apache/beam/pull/11651#issuecomment-631748270 whoops This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] TheNeuralBit commented on pull request #11651: [BEAM-8889] Upgrades gcsio to 2.1.3
TheNeuralBit commented on pull request #11651: URL: https://github.com/apache/beam/pull/11651#issuecomment-631748120 Run Java PostCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] TheNeuralBit commented on pull request #11529: [BEAM-9822] Simplify pipeline when batching is disabled.
TheNeuralBit commented on pull request #11529: URL: https://github.com/apache/beam/pull/11529#issuecomment-631747496 Retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] chamikaramj commented on pull request #11651: [BEAM-8889] Upgrades gcsio to 2.1.3
chamikaramj commented on pull request #11651: URL: https://github.com/apache/beam/pull/11651#issuecomment-631745259 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] chamikaramj commented on pull request #11360: [BEAM-9722] added SnowflakeIO with Read operation
chamikaramj commented on pull request #11360: URL: https://github.com/apache/beam/pull/11360#issuecomment-631745574 Retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] lostluck commented on a change in pull request #11763: [BEAM-9978] Adding functionality and tests to Go offset range tracker.
lostluck commented on a change in pull request #11763: URL: https://github.com/apache/beam/pull/11763#discussion_r428305778 ## File path: sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go ## @@ -0,0 +1,212 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package offsetrange + +import ( + "fmt" + "github.com/google/go-cmp/cmp" + "testing" +) + +// TestRestriction_EvenSplits tests various splits and checks that they all +// follow the contract for EvenSplits. This means that all restrictions are +// evenly split, that each restriction has at least one element, and that each +// element is present in the split restrictions. +func TestRestriction_EvenSplits(t *testing.T) { + tests := []struct { + rest Restriction + num int64 + }{ + {rest: Restriction{Start: 0, End: 21}, num: 4}, + {rest: Restriction{Start: 21, End: 42}, num: 4}, + {rest: Restriction{Start: 0, End: 5}, num: 10}, + {rest: Restriction{Start: 0, End: 21}, num: -1}, + } + for _, test := range tests { + test := test + t.Run(fmt.Sprintf("(rest[%v, %v], splits = %v)", + test.rest.Start, test.rest.End, test.num), func(t *testing.T) { + r := test.rest + + // Get the minimum size that a split restriction can be. Max size + // should be min + 1. 
This way we can check the size of each split. + num := test.num + if num <= 1 { + num = 1 + } + min := (r.End - r.Start) / num + + splits := r.EvenSplits(test.num) + prevEnd := r.Start + for _, split := range splits { + size := split.End - split.Start + // Check: Each restriction has at least 1 element. + if size == 0 { + t.Errorf("split restriction [%v, %v] is empty, size must be greater than 0.", + split.Start, split.End) + } + // Check: Restrictions are evenly split. + if size != min && size != min+1 { + t.Errorf("split restriction [%v, %v] has unexpected size. got: %v, want: %v or %v", + split.Start, split.End, size, min, min+1) + } + // Check: All elements are still in a split restrictions. This + // logic assumes that the splits are returned in order which + // isn't guaranteed by EvenSplits, but this check is way easier + // with the assumption. + if split.Start != prevEnd { + t.Errorf("restriction range [%v, %v] missing after splits.", + prevEnd, split.Start) + } else { + prevEnd = split.End + } + } + if prevEnd != r.End { + t.Errorf("restriction range [%v, %v] missing after splits.", + prevEnd, r.End) + } + }) + } +} + +// TestTracker_TryClaim validates both success and failure cases for TryClaim. +func TestTracker_TryClaim(t *testing.T) { + // Test that TryClaim works as expected when called correctly. + t.Run("Correctness", func(t *testing.T) { + tests := []struct { + rest Restriction + claims []int64 + }{ + {rest: Restriction{Start: 0, End: 3}, claims: []int64{0, 1, 2, 3}}, + {rest: Restriction{Start: 10, End:
[GitHub] [beam] chamikaramj commented on pull request #11757: [BEAM-8019] Clarifies Dataflow execution environment model
chamikaramj commented on pull request #11757: URL: https://github.com/apache/beam/pull/11757#issuecomment-631736712 Retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] chamikaramj commented on pull request #11757: [BEAM-8019] Clarifies Dataflow execution environment model
chamikaramj commented on pull request #11757: URL: https://github.com/apache/beam/pull/11757#issuecomment-631736587 Run Python PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] ihji commented on a change in pull request #11757: [BEAM-8019] Clarifies Dataflow execution environment model
ihji commented on a change in pull request #11757: URL: https://github.com/apache/beam/pull/11757#discussion_r428314117 ## File path: sdks/python/apache_beam/runners/dataflow/internal/apiclient.py ## @@ -314,8 +314,15 @@ def __init__( if container_image_url in already_added_containers: # Do not add the pipeline environment again. + # Currently, Dataflow uses Docker container images to uniquely + # identify execution environments. Hence Dataflow executes all + # transforms that specify the same Docker container image in the + # same container. Dependencies of all environments that specify a + # given container image will be staged in the container for that Review comment: If you pick the alternative sentence above, use `container instance` here too. ## File path: sdks/python/apache_beam/runners/dataflow/internal/apiclient.py ## @@ -314,8 +314,15 @@ def __init__( if container_image_url in already_added_containers: # Do not add the pipeline environment again. + # Currently, Dataflow uses Docker container images to uniquely + # identify execution environments. Hence Dataflow executes all + # transforms that specify the same Docker container image in the + # same container. Dependencies of all environments that specify a Review comment: `the same Docker container image in the same container` sounds a little confusing to me. How about something like `the same Docker container image in a single container instance`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] rionmonster commented on pull request #11761: [BEAM-10027] Support for Kotlin-based Beam Katas
rionmonster commented on pull request #11761: URL: https://github.com/apache/beam/pull/11761#issuecomment-631728007 @henryken I was able to successfully export the content of this course over to Stepik and generate all of the expected new metadata files associated with it (e.g. `*-remote-info.yaml`). I have a draft of the course on the site now that, as far as I can tell, seems to parallel the existing Java one as expected. I wasn't sure regarding the publication of the actual course itself and what that process entailed (and if it warranted another push to this branch to actually commit the course metadata to support future updates or not). Anyways - I just thought I'd ask. Thanks again! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] lukecwik commented on a change in pull request #11715: [BEAM-9977] Implement GrowableOffsetRangeTracker
lukecwik commented on a change in pull request #11715: URL: https://github.com/apache/beam/pull/11715#discussion_r428304723 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java ## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.splittabledofn; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; + +import java.math.BigDecimal; +import java.math.MathContext; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers; + +/** + * An {@link OffsetRangeTracker} for tracking a growable offset range. {@code Long.MAX_VALUE} is + * used as the end of the range to indicate infinity. + * + * An offset range is considered growable when the end offset could grow (or change) during + * execution time (e.g., Kafka topic partition offset, appended file, ...). + * + * The growable range is marked as done by claiming {@code Long.MAX_VALUE}. 
+ */ +@Experimental(Kind.SPLITTABLE_DO_FN) +public class GrowableOffsetRangeTracker extends OffsetRangeTracker { + /** + * Provides the estimated end offset of the range. + * + * {@link #estimate} is called to give the end offset when {@link #trySplit} or {@link + * #getProgress} is invoked. The end offset is exclusive for the range. The estimated end is not + * required to monotonically increase as it will only be taken into consideration when the + * estimated end offset is larger than the current position. Returning {@code Long.MAX_VALUE} as + * the estimate implies the largest possible position for the range is {@code Long.MAX_VALUE - 1}. + * Return {@code Long.MIN_VALUE} if an estimate can not be provided. + * + * Providing a good estimate is important for an accurate progress signal and will impact + * splitting decisions by the runner. + * + * If {@link #estimate} is expensive to compute, consider wrapping the implementation with + * {@link Suppliers#memoizeWithExpiration} or equivalent as an optimization. + * + * TODO(BEAM-10032): Also consider using {@link RangeEndEstimator} when the range is not ended + * with {@code Long.MAX_VALUE}. + */ + @FunctionalInterface + public interface RangeEndEstimator { +long estimate(); + } + + private final RangeEndEstimator rangeEndEstimator; + + public GrowableOffsetRangeTracker(long start, RangeEndEstimator rangeEndEstimator) { +super(new OffsetRange(start, Long.MAX_VALUE)); +this.rangeEndEstimator = checkNotNull(rangeEndEstimator); + } + + @Override + public SplitResult trySplit(double fractionOfRemainder) { +// If current tracking range is no longer growable, split it as a normal range. +if (range.getTo() != Long.MAX_VALUE || range.getTo() == range.getFrom()) { + return super.trySplit(fractionOfRemainder); +} +// If current range has been done, there is no more space to split. 
+if (lastAttemptedOffset != null && lastAttemptedOffset == Long.MAX_VALUE) { + return null; +} +BigDecimal cur = +(lastAttemptedOffset == null) +? BigDecimal.valueOf(range.getFrom()).subtract(BigDecimal.ONE, MathContext.DECIMAL128) +: BigDecimal.valueOf(lastAttemptedOffset); + +// Fetch the estimated end offset. If the estimated end is smaller than the next offset, use +// the next offset as end. +BigDecimal estimateRangeEnd = +BigDecimal.valueOf(rangeEndEstimator.estimate()) +.max(cur.add(BigDecimal.ONE, MathContext.DECIMAL128)); + +// Convert to BigDecimal in computation to prevent overflow, which may result in loss of +// precision. +// split = cur + max(1, (estimateRangeEnd - cur) * fractionOfRemainder) +BigDecimal splitPos = +cur.add( +estimateRangeEnd +.subtract(cur, MathContext.DECIMAL128) +.multiply(BigDecimal.valueOf(fractionOfRemainder), MathContext.DECIMAL128) +