Thanks for the report. Is this with 2.12.0? If so,
https://github.com/apache/beam/blob/release-2.12.0/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java#L293
seems a strange place to get a NullPointerException. Is there perhaps
an exception earlier in the code (which could be the root cause)?

On Tue, May 28, 2019 at 4:52 AM 青雉(祁明良) <[email protected]> wrote:
>
> Hi Robert,
>
> When I set the --artifacts-dir to an HDFS location, I got a 
> NullPointerException. The URL is accessible via the Hadoop client.
>
> ---------------
> ./beam-runners-flink_2.11-job-server-shadow-2.12.0-SNAPSHOT/bin/beam-runners-flink_2.11-job-server --flink-master-url test-mqi-job1-hl:8081 --artifacts-dir hdfs://10.53.48.6:4007/algo-emr/k8s_flink/beam/
> [main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - ArtifactStagingService started on localhost:8098
> [main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - Java ExpansionService started on localhost:8097
> [main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - JobService started on localhost:8099
> May 28, 2019 2:43:56 AM org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor run
> SEVERE: Exception while executing runnable org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@44065193
> java.lang.NullPointerException
>         at org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService$PutArtifactStreamObserver.onCompleted(BeamFileSystemArtifactStagingService.java:293)
>         at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onHalfClose(ServerCalls.java:259)
>         at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
>         at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
>         at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
>         at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
>         at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:283)
>         at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:707)
>         at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>         at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
>
> On 27 May 2019, at 9:49 PM, Robert Bradshaw <[email protected]> wrote:
>
> On Mon, May 27, 2019 at 2:24 PM 青雉(祁明良) <[email protected]> wrote:
>
>
> Just now I tried the PROCESS environment type, and the Flink taskmanager 
> complains about "/tmp/beam-artifact-staging/job_xxx" not found. I found 
> this directory is only created on the machine with the Beam job endpoint. I guess 
> maybe I should set the artifacts dir to an HDFS location, but no luck for me :(
>
>
> Yes, you need to set your artifact staging directory (the
> --artifacts-dir flag) to something visible to both the job server and
> the workers. Did you try that?
>
> I don’t know if the following error message from job endpoint is related when 
> submitting the job.
>
> Error from job endpoint:
> ---------------
> [grpc-default-executor-0] ERROR org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService - Encountered Unexpected Exception for Invocation job_09aa2abd-0bc0-4994-a8b7-130156e4c13c
> org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusException: NOT_FOUND
>         at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Status.asException(Status.java:534)
>         at org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.getInvocation(InMemoryJobService.java:341)
>         at org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.getStateStream(InMemoryJobService.java:262)
>         at org.apache.beam.model.jobmanagement.v1.JobServiceGrpc$MethodHandlers.invoke(JobServiceGrpc.java:770)
>         at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:171)
>         at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
>         at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
>         at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
>         at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
>         at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:283)
>         at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:707)
>         at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>         at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
>
> On 27 May 2019, at 6:53 PM, Maximilian Michels <[email protected]> wrote:
>
> Hi Mingliang,
>
> The environment is created for each TaskManager.
>
>   For docker, will it create one Docker container per Flink taskmanager?
>
>
> Yes.
>
>   For process, does it mean starting a Python process to run the user code? 
> And it seems "command" should be set in the environment config, but what 
> should it be?
>
>
> You will have to start the same Python SDK Harness which would run inside a 
> Docker container if you had chosen Docker. This is a more manual approach 
> which should only be chosen if you cannot use Docker.
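> As a sketch, the "command" is typically passed as JSON in the environment 
> config. The boot path below is an assumption for illustration; point it at 
> the SDK harness boot executable actually available on your taskmanagers:

```python
# Sketch of portable-runner pipeline options for the PROCESS environment.
# "/opt/apache/beam/boot" is an ASSUMED path to the SDK harness boot
# executable; replace it with the binary built for your workers.
import json

environment_config = json.dumps({"command": "/opt/apache/beam/boot"})

options = [
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=PROCESS",
    "--environment_config=" + environment_config,
]
print(options[-1])
```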
>
>   For external(loopback), does it mean the Flink operator calls an external 
> service, which by default is set to the place where I submit the Beam job? It 
> looks like all the data will be shifted to a single machine and processed there.
>
>
> This is intended for a long-running SDK Harness that is already running when 
> you run your pipeline. Thus, you only provide the address of the already 
> running SDK Harness.
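> For example (the address below is an assumption; use wherever your 
> long-running SDK harness actually listens):

```python
# Sketch of portable-runner pipeline options for the EXTERNAL environment.
# "localhost:50000" is an ASSUMED address of an already-running SDK
# harness; substitute the real endpoint.
harness_address = "localhost:50000"

options = [
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=EXTERNAL",
    "--environment_config=" + harness_address,
]
print(options[-1])
```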
>
> Cheers,
> Max
>
> On 26.05.19 13:51, 青雉(祁明良) wrote:
>
> Hi All,
> I'm currently trying the Python portable runner with Flink. I see there are 3 
> kinds of environment_type available ("docker/process/external(loopback)") when 
> submitting a job, but I didn't find much material explaining them.
> 1. For docker, will it create one Docker container per Flink taskmanager?
> 2. For process, does it mean starting a Python process to run the user
>   code? And it seems "command" should be set in the environment
>   config, but what should it be?
> 3. For external(loopback), does it mean the Flink operator calls an
>   external service, which by default is set to the place where I submit
>   the Beam job? It looks like all the data will be shifted to a single
>   machine and processed there.
> Thanks,
> Mingliang
> This communication may contain privileged or other confidential information 
> of Red. If you have received it in error, please advise the sender by reply 
> e-mail and immediately delete the message and any attachments without copying 
> or disclosing the contents. Thank you.
>
>
>
>
>
>
