[ 
https://issues.apache.org/jira/browse/BEAM-4473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16506482#comment-16506482
 ] 

Eugene Kirpichov commented on BEAM-4473:
----------------------------------------

I think the bug is that at 
[https://github.com/apache/beam/blob/69fb5c97a91ec9be3f481e827300cf796cbbfa19/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/GrpcDataService.java#L83]
 we don't wrap the outbound observer into any sort of synchronization, unlike 
at 
[https://github.com/apache/beam/blob/69fb5c97a91ec9be3f481e827300cf796cbbfa19/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java#L130,]
 where we end up wrapping it into 
[https://github.com/apache/beam/blob/master/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/StreamObserverFactory.java]
 .

> Flaky 
> org.apache.beam.runners.direct.portable.ReferenceRunnerTest.pipelineExecution
> -----------------------------------------------------------------------------------
>
>                 Key: BEAM-4473
>                 URL: https://issues.apache.org/jira/browse/BEAM-4473
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-direct
>            Reporter: Luke Cwik
>            Assignee: Luke Cwik
>            Priority: Major
>
> Example run: 
> [https://builds.apache.org/job/beam_PreCommit_Java_GradleBuild/6062/testReport/junit/org.apache.beam.runners.direct.portable/ReferenceRunnerTest/pipelineExecution/]
>  
> {code:java}
> Error Message
> java.lang.IllegalStateException: sendHeaders has already been called
> Stacktrace
> java.lang.IllegalStateException: sendHeaders has already been called
>       at 
> org.apache.beam.repackaged.beam_sdks_java_harness.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
>       at io.grpc.internal.ServerCallImpl.sendHeaders(ServerCallImpl.java:104)
>       at 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:282)
>       at 
> org.apache.beam.sdk.fn.data.BeamFnDataBufferingOutboundObserver.close(BeamFnDataBufferingOutboundObserver.java:112)
>       at 
> org.apache.beam.runners.fnexecution.control.SdkHarnessClient$ActiveBundle.close(SdkHarnessClient.java:224)
>       at 
> org.apache.beam.runners.direct.portable.RemoteStageEvaluatorFactory$RemoteStageEvaluator.finishBundle(RemoteStageEvaluatorFactory.java:85)
>       at 
> org.apache.beam.runners.direct.portable.DirectTransformExecutor.finishBundle(DirectTransformExecutor.java:154)
>       at 
> org.apache.beam.runners.direct.portable.DirectTransformExecutor.run(DirectTransformExecutor.java:103)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
>       Suppressed: java.lang.IllegalStateException: Processing bundle failed, 
> TODO: [BEAM-3962] abort bundle.
>               at 
> org.apache.beam.runners.fnexecution.control.SdkHarnessClient$ActiveBundle.close(SdkHarnessClient.java:234)
>               ... 8 more
> Standard Output
> Shutting SDK harness down.
> Standard Error
> Jun 04, 2018 9:34:41 PM org.apache.beam.sdk.coders.SerializableCoder 
> checkEqualsMethodDefined
> WARNING: Can't verify serialized elements of type BoundedSource have well 
> defined equals method. This may produce incorrect results on some 
> PipelineRunner
> Jun 04, 2018 9:34:41 PM org.apache.beam.sdk.coders.SerializableCoder 
> checkEqualsMethodDefined
> WARNING: Can't verify serialized elements of type BoundedSource have well 
> defined equals method. This may produce incorrect results on some 
> PipelineRunner
> Jun 04, 2018 9:34:45 PM 
> org.apache.beam.runners.fnexecution.control.FnApiControlClient 
> closeAndTerminateOutstandingRequests
> SEVERE: FnApiControlClient closed, clearing outstanding requests 
> {5=java.util.concurrent.CompletableFuture@1051ec6e[Not completed, 1 
> dependents], 6=java.util.concurrent.CompletableFuture@341889cc[Not completed, 
> 1 dependents]}
> Jun 04, 2018 9:34:45 PM 
> org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter log
> INFO: Beam Fn Control client connected with id 
> Jun 04, 2018 9:34:45 PM 
> org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter log
> INFO: Fn Harness started
> Jun 04, 2018 9:34:45 PM 
> org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter log
> INFO: Beam Fn Logging client connected.
> Jun 04, 2018 9:34:45 PM 
> org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter log
> INFO: Entering instruction processing loop
> Jun 04, 2018 9:34:45 PM 
> org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter log
> INFO: Beam Fn Data client connected.
> Jun 04, 2018 9:34:45 PM 
> org.apache.beam.runners.fnexecution.logging.GrpcLoggingService$InboundObserver
>  onCompleted
> INFO: Logging client hanged up.
> java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: 
> CANCELLED: Runner closed connection
>       at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>       at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>       at 
> org.apache.beam.fn.harness.control.BeamFnControlClient.processInstructionRequests(BeamFnControlClient.java:158)
>       at org.apache.beam.fn.harness.FnHarness.main(FnHarness.java:157)
>       at 
> org.apache.beam.runners.fnexecution.environment.InProcessEnvironmentFactory.lambda$createEnvironment$0(InProcessEnvironmentFactory.java:90)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> Jun 04, 2018 9:34:45 PM 
> org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver onError
> SEVERE: Failed to handle for url: "InProcessServer_4"
> io.grpc.StatusRuntimeException: CANCELLED: Multiplexer hanging up
>       at io.grpc.Status.asRuntimeException(Status.java:540)
>       at 
> io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:392)
>       at 
> io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:428)
>       at io.grpc.internal.ClientCallImpl.access$100(ClientCallImpl.java:76)
>       at 
> io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:514)
>       at 
> io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$700(ClientCallImpl.java:431)
>       at 
> io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:546)
>       at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:52)
>       at 
> io.grpc.internal.SerializingExecutor$TaskRunner.run(SerializingExecutor.java:152)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> Jun 04, 2018 9:34:45 PM 
> org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver 
> onCompleted
> WARNING: Hanged up for unknown endpoint.
> Caused by: io.grpc.StatusRuntimeException: CANCELLED: Runner closed connection
>       at io.grpc.Status.asRuntimeException(Status.java:540)
>       at 
> io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:392)
>       at 
> io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:428)
>       at io.grpc.internal.ClientCallImpl.access$100(ClientCallImpl.java:76)
>       at 
> io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:514)
>       at 
> io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$700(ClientCallImpl.java:431)
>       at 
> io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:546)
>       at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:52)
>       at 
> io.grpc.internal.SerializingExecutor$TaskRunner.run(SerializingExecutor.java:152)
>       ... 3 more
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to