[jira] [Work logged] (BEAM-5262) JobState support for Reference Runner
[ https://issues.apache.org/jira/browse/BEAM-5262?focusedWorklogId=143888&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-143888 ]

ASF GitHub Bot logged work on BEAM-5262:

                Author: ASF GitHub Bot
            Created on: 13/Sep/18 08:36
            Start Date: 13/Sep/18 08:36
    Worklog Time Spent: 10m
      Work Description: asfgit closed pull request #6301: [BEAM-5262] Add Reference runner support for add state stream
URL: https://github.com/apache/beam/pull/6301

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java
index 9627152c100..b58959b338c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java
@@ -36,6 +36,8 @@
 import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
 import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
 import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse;
 import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
 import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
 import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
@@ -56,6 +58,7 @@
 /** The ReferenceRunner uses the portability framework to execute a Pipeline on a single machine. */
 public class ReferenceRunnerJobService extends JobServiceImplBase implements FnService {
   private static final Logger LOG = LoggerFactory.getLogger(ReferenceRunnerJobService.class);
+  private static final int WAIT_MS = 1000;
 
   public static ReferenceRunnerJobService create(final ServerFactory serverFactory) {
     LOG.info("Starting {}", ReferenceRunnerJobService.class);
@@ -201,6 +204,36 @@ public void getState(
     responseObserver.onCompleted();
   }
 
+  @Override
+  public void getStateStream(
+      GetJobStateRequest request, StreamObserver<GetJobStateResponse> responseObserver) {
+    LOG.trace("{} {}", GetJobStateRequest.class.getSimpleName(), request);
+    String invocationId = request.getJobId();
+    try {
+      Thread.sleep(WAIT_MS);
+      Enum state = jobStates.getOrDefault(request.getJobId(), Enum.UNRECOGNIZED);
+      responseObserver.onNext(GetJobStateResponse.newBuilder().setState(state).build());
+      while (Enum.RUNNING.equals(state)) {
+        Thread.sleep(WAIT_MS);
+        state = jobStates.getOrDefault(request.getJobId(), Enum.UNRECOGNIZED);
+      }
+      responseObserver.onNext(GetJobStateResponse.newBuilder().setState(state).build());
+    } catch (Exception e) {
+      String errMessage =
+          String.format("Encountered Unexpected Exception for Invocation %s", invocationId);
+      LOG.error(errMessage, e);
+      responseObserver.onError(Status.INTERNAL.withCause(e).asException());
+    }
+    responseObserver.onCompleted();
+  }
+
+  @Override
+  public void getMessageStream(
+      JobMessagesRequest request, StreamObserver<JobMessagesResponse> responseObserver) {
+    // Not implemented
+    LOG.trace("{} {}", JobMessagesRequest.class.getSimpleName(), request);
+  }
+
   @Override
   public void cancel(CancelJobRequest request, StreamObserver<CancelJobResponse> responseObserver) {
     LOG.trace("{} {}", CancelJobRequest.class.getSimpleName(), request);

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
-------------------

    Worklog Id:     (was: 143888)
    Time Spent: 1h 20m  (was: 1h 10m)

> JobState support for Reference Runner
> -------------------------------------
>
>                 Key: BEAM-5262
>                 URL: https://issues.apache.org/jira/browse/BEAM-5262
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-direct
>            Reporter: Ankur Goenka
>            Assignee: Ankur Goenka
>            Priority: Minor
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> Reference runner does not support getStateStream which is needed by portable
> SDK

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
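A note on the merged getStateStream: responseObserver.onCompleted() sits after the catch block, so it also runs on the path where onError has already been called. The io.grpc.stub.StreamObserver documentation says that onError, if called, must be the last method called on the observer. The sketch below shows one way to keep the two terminal calls mutually exclusive. It is not the code that was merged. `Observer` is a hypothetical stand-in for the gRPC class, and `streamStates` and its `lookup` parameter are names invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class TerminalCallSketch {
  /** Hypothetical stand-in for io.grpc.stub.StreamObserver; records calls for inspection. */
  static class Observer {
    final List<String> calls = new ArrayList<>();

    void onNext(String state) {
      calls.add("onNext:" + state);
    }

    void onError(Throwable t) {
      calls.add("onError");
    }

    void onCompleted() {
      calls.add("onCompleted");
    }
  }

  /** Publishes one state, then finishes with exactly one of onCompleted or onError. */
  static void streamStates(Supplier<String> lookup, Observer observer) {
    try {
      observer.onNext(lookup.get());
      // Completing inside the try block means the catch path never reaches onCompleted.
      observer.onCompleted();
    } catch (RuntimeException e) {
      observer.onError(e);
    }
  }

  public static void main(String[] args) {
    Observer ok = new Observer();
    streamStates(() -> "DONE", ok);
    System.out.println(ok.calls); // [onNext:DONE, onCompleted]

    Observer failed = new Observer();
    streamStates(
        () -> {
          throw new IllegalStateException("lookup failed");
        },
        failed);
    System.out.println(failed.calls); // [onError]
  }
}
```

Moving the completion call into the try block is only one option; returning from the catch block after onError would have the same effect.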
[jira] [Work logged] (BEAM-5262) JobState support for Reference Runner
[ https://issues.apache.org/jira/browse/BEAM-5262?focusedWorklogId=143885&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-143885 ]

ASF GitHub Bot logged work on BEAM-5262:

                Author: ASF GitHub Bot
            Created on: 13/Sep/18 07:46
            Start Date: 13/Sep/18 07:46
    Worklog Time Spent: 10m
      Work Description: angoenka commented on issue #6301: [BEAM-5262] Add Reference runner support for add state stream
URL: https://github.com/apache/beam/pull/6301#issuecomment-420915357

   @mxm @tweise Ping for merge!

Issue Time Tracking
-------------------

    Worklog Id:     (was: 143885)
    Time Spent: 1h 10m  (was: 1h)
[jira] [Work logged] (BEAM-5262) JobState support for Reference Runner
[ https://issues.apache.org/jira/browse/BEAM-5262?focusedWorklogId=143205&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-143205 ]

ASF GitHub Bot logged work on BEAM-5262:

                Author: ASF GitHub Bot
            Created on: 11/Sep/18 17:31
            Start Date: 11/Sep/18 17:31
    Worklog Time Spent: 10m
      Work Description: youngoli commented on a change in pull request #6301: [BEAM-5262] Add Reference runner support for add state stream
URL: https://github.com/apache/beam/pull/6301#discussion_r216754306

 ##
 File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java
 ##
 @@ -201,6 +204,36 @@ public void getState(
      responseObserver.onCompleted();
    }

 +  @Override
 +  public void getStateStream(
 +      GetJobStateRequest request, StreamObserver<GetJobStateResponse> responseObserver) {
 +    LOG.trace("{} {}", GetJobStateRequest.class.getSimpleName(), request);
 +    String invocationId = request.getJobId();
 +    try {
 +      Thread.sleep(WAIT_MS);
 +      Enum state = jobStates.getOrDefault(request.getJobId(), Enum.UNRECOGNIZED);
 +      responseObserver.onNext(GetJobStateResponse.newBuilder().setState(state).build());
 +      while (Enum.RUNNING.equals(state)) {
 +        Thread.sleep(WAIT_MS);
 +        state = jobStates.getOrDefault(request.getJobId(), Enum.UNRECOGNIZED);
 +      }
 +      responseObserver.onNext(GetJobStateResponse.newBuilder().setState(state).build());

 Review comment:
   Makes sense. And it seems that in the case of it not being in the running state, it's not an issue to republish the state, so there's no performance/correctness issues here.
Issue Time Tracking
-------------------

    Worklog Id:     (was: 143205)
    Time Spent: 1h  (was: 50m)
[jira] [Work logged] (BEAM-5262) JobState support for Reference Runner
[ https://issues.apache.org/jira/browse/BEAM-5262?focusedWorklogId=142943&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-142943 ]

ASF GitHub Bot logged work on BEAM-5262:

                Author: ASF GitHub Bot
            Created on: 11/Sep/18 02:19
            Start Date: 11/Sep/18 02:19
    Worklog Time Spent: 10m
      Work Description: angoenka commented on a change in pull request #6301: [BEAM-5262] Add Reference runner support for add state stream
URL: https://github.com/apache/beam/pull/6301#discussion_r216528105

 ##
 File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java
 ##
 @@ -201,6 +204,36 @@ public void getState(
      responseObserver.onCompleted();
    }

 +  @Override
 +  public void getStateStream(
 +      GetJobStateRequest request, StreamObserver<GetJobStateResponse> responseObserver) {
 +    LOG.trace("{} {}", GetJobStateRequest.class.getSimpleName(), request);
 +    String invocationId = request.getJobId();
 +    try {
 +      Thread.sleep(WAIT_MS);
 +      Enum state = jobStates.getOrDefault(request.getJobId(), Enum.UNRECOGNIZED);
 +      responseObserver.onNext(GetJobStateResponse.newBuilder().setState(state).build());
 +      while (Enum.RUNNING.equals(state)) {
 +        Thread.sleep(WAIT_MS);
 +        state = jobStates.getOrDefault(request.getJobId(), Enum.UNRECOGNIZED);
 +      }
 +      responseObserver.onNext(GetJobStateResponse.newBuilder().setState(state).build());

 Review comment:
   The idea here is that if the pipeline is not running then we simply publish the state in L220 and if it is running then we would have published the state in L215 and we will not republish till the state changes to something other than running.
Issue Time Tracking
-------------------

    Worklog Id:     (was: 142943)
    Time Spent: 50m  (was: 40m)
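The reply above describes publishing once while the job is running and again once it leaves RUNNING. For comparison, here is a minimal sketch of a variant that records each distinct state exactly once, so a job that is already finished yields a single publication. It is not what the PR does. `State`, `publishOnChange`, and `lookup` are hypothetical names, and the list append stands in for a call to onNext.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

public class PublishOnChangeSketch {
  /** Hypothetical stand-in for the JobState.Enum values referenced in the review. */
  enum State { UNRECOGNIZED, RUNNING, DONE }

  /** Polls until the state is no longer RUNNING and records a state only when it changes. */
  static List<State> publishOnChange(Supplier<State> lookup) {
    List<State> published = new ArrayList<>();
    State last = null;
    State state;
    do {
      state = lookup.get();
      if (state != last) {
        published.add(state); // corresponds to one onNext call
        last = state;
      }
      // A real service would wait between polls (the diff sleeps for WAIT_MS).
    } while (state == State.RUNNING);
    return published;
  }

  public static void main(String[] args) {
    Iterator<State> finishing = Arrays.asList(State.RUNNING, State.RUNNING, State.DONE).iterator();
    System.out.println(publishOnChange(finishing::next)); // [RUNNING, DONE]
    System.out.println(publishOnChange(() -> State.DONE)); // [DONE]
  }
}
```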
[jira] [Work logged] (BEAM-5262) JobState support for Reference Runner
[ https://issues.apache.org/jira/browse/BEAM-5262?focusedWorklogId=142926&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-142926 ]

ASF GitHub Bot logged work on BEAM-5262:

                Author: ASF GitHub Bot
            Created on: 11/Sep/18 00:11
            Start Date: 11/Sep/18 00:11
    Worklog Time Spent: 10m
      Work Description: youngoli commented on a change in pull request #6301: [BEAM-5262] Add Reference runner support for add state stream
URL: https://github.com/apache/beam/pull/6301#discussion_r216512517

 ##
 File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java
 ##
 @@ -201,6 +204,36 @@ public void getState(
      responseObserver.onCompleted();
    }

 +  @Override
 +  public void getStateStream(
 +      GetJobStateRequest request, StreamObserver<GetJobStateResponse> responseObserver) {
 +    LOG.trace("{} {}", GetJobStateRequest.class.getSimpleName(), request);
 +    String invocationId = request.getJobId();
 +    try {
 +      Thread.sleep(WAIT_MS);
 +      Enum state = jobStates.getOrDefault(request.getJobId(), Enum.UNRECOGNIZED);
 +      responseObserver.onNext(GetJobStateResponse.newBuilder().setState(state).build());
 +      while (Enum.RUNNING.equals(state)) {
 +        Thread.sleep(WAIT_MS);
 +        state = jobStates.getOrDefault(request.getJobId(), Enum.UNRECOGNIZED);
 +      }
 +      responseObserver.onNext(GetJobStateResponse.newBuilder().setState(state).build());

 Review comment:
   This seems wrong based on my understanding of onNext. I assumed (based on a quick reading of the documentation for StreamObservers) that onNext would have to be called every time the state enum changed. With the call before and after the while loop, it seems like it would be called twice in succession if the state is initially something other than Enum.RUNNING.
Issue Time Tracking
-------------------

    Worklog Id:     (was: 142926)
    Time Spent: 40m  (was: 0.5h)
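The behaviour raised in this review comment can be traced without gRPC. The sketch below keeps the control flow of the reviewed hunk but replaces each onNext call with a list append and omits the sleeps. `State`, `trace`, and `lookup` are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

public class StateStreamTrace {
  /** Hypothetical stand-in for the JobState.Enum values used in the diff. */
  enum State { UNRECOGNIZED, RUNNING, DONE }

  /** Same control flow as the reviewed hunk, with onNext replaced by a list append. */
  static List<State> trace(Supplier<State> lookup) {
    List<State> published = new ArrayList<>();
    State state = lookup.get();
    published.add(state); // first onNext, before the loop
    while (State.RUNNING.equals(state)) {
      state = lookup.get(); // the diff sleeps for WAIT_MS before each lookup
    }
    published.add(state); // second onNext, after the loop
    return published;
  }

  public static void main(String[] args) {
    Iterator<State> finishing = Arrays.asList(State.RUNNING, State.RUNNING, State.DONE).iterator();
    System.out.println(trace(finishing::next)); // [RUNNING, DONE]
    System.out.println(trace(() -> State.DONE)); // [DONE, DONE]
  }
}
```

A job that is running when the stream opens produces RUNNING followed by its final state, while a job that has already finished produces the same state twice, which is the case the comment points out.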
[jira] [Work logged] (BEAM-5262) JobState support for Reference Runner
[ https://issues.apache.org/jira/browse/BEAM-5262?focusedWorklogId=141539&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-141539 ]

ASF GitHub Bot logged work on BEAM-5262:

                Author: ASF GitHub Bot
            Created on: 05/Sep/18 21:38
            Start Date: 05/Sep/18 21:38
    Worklog Time Spent: 10m
      Work Description: angoenka commented on issue #6301: [BEAM-5262] Add Reference runner support for add state stream
URL: https://github.com/apache/beam/pull/6301#issuecomment-418890184

   Ping!

Issue Time Tracking
-------------------

    Worklog Id:     (was: 141539)
    Time Spent: 0.5h  (was: 20m)
[jira] [Work logged] (BEAM-5262) JobState support for Reference Runner
[ https://issues.apache.org/jira/browse/BEAM-5262?focusedWorklogId=139461&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-139461 ]

ASF GitHub Bot logged work on BEAM-5262:

                Author: ASF GitHub Bot
            Created on: 30/Aug/18 00:01
            Start Date: 30/Aug/18 00:01
    Worklog Time Spent: 10m
      Work Description: angoenka commented on issue #6301: [BEAM-5262] Add Reference runner support for add state stream
URL: https://github.com/apache/beam/pull/6301#issuecomment-417146515

   cc: @youngoli

Issue Time Tracking
-------------------

    Worklog Id:     (was: 139461)
    Time Spent: 20m  (was: 10m)
[jira] [Work logged] (BEAM-5262) JobState support for Reference Runner
[ https://issues.apache.org/jira/browse/BEAM-5262?focusedWorklogId=139460&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-139460 ]

ASF GitHub Bot logged work on BEAM-5262:

                Author: ASF GitHub Bot
            Created on: 30/Aug/18 00:01
            Start Date: 30/Aug/18 00:01
    Worklog Time Spent: 10m
      Work Description: angoenka opened a new pull request #6301: [BEAM-5262] Add Reference runner support for add state stream
URL: https://github.com/apache/beam/pull/6301

   **Please** add a meaningful description for your change here

   Follow this checklist to help us incorporate your contribution quickly and easily:

    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

   It will help us expedite review of your Pull Request if you tag someone (e.g. `@username`) to look at it.
   Post-Commit Tests Status (on master branch)

   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_GradleBuild/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_GradleBuild/lastCompletedBuild/) | --- | --- | --- | --- | --- | ---
   Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_GradleBuild/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_GradleBuild/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark_Gradle/lastCompletedBuild/)
   Python | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/) | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/) [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | --- | --- | --- | ---

Issue Time Tracking
-------------------

            Worklog Id:     (was: 139460)
            Time Spent: 10m
    Remaining Estimate: 0h