Re: Beam's job crashes on cluster
This is not that difficult to implement, but it would be better done once native job submission for Python is integrated. However, I need to fix this last issue, which is the crash. Any idea where I should look?

Sent: Friday, December 13, 2019 at 2:52 PM From: "Kyle Weaver" To: dev Subject: Re: Beam's job crashes on cluster
> I applied some modifications to the code to run Beam tasks on a k8s cluster using spark-submit.
Interesting, how does that work?

On Fri, Dec 13, 2019 at 12:49 PM Matthew K. <softm...@gmx.com> wrote:
I'm not sure if that could be a problem. I'm *not* running standalone Spark. I applied some modifications to the code to run Beam tasks on a k8s cluster using spark-submit. Therefore, worker nodes are spawned when spark-submit is called and connect to the master, and are supposed to be destroyed when the job is finished. Therefore, the crash should have some other cause.

Sent: Friday, December 13, 2019 at 2:37 PM From: "Kyle Weaver" <kcwea...@google.com> To: dev <dev@beam.apache.org> Subject: Re: Beam's job crashes on cluster
Correction: should be formatted `spark://host:port`. Should follow the rules here: https://spark.apache.org/docs/latest/submitting-applications.html#master-urls

On Fri, Dec 13, 2019 at 12:36 PM Kyle Weaver <kcwea...@google.com> wrote:
You probably will want to add argument `-PsparkMasterUrl=localhost:8080` (or whatever host:port your Spark master is on) to the job-server:runShadow command. Without specifying the master URL, the default is to start an embedded Spark master within the same JVM as the job server, rather than using your standalone master.

On Fri, Dec 13, 2019 at 12:15 PM Matthew K. <softm...@gmx.com> wrote:
The job server is started on the master node with: ./gradlew :runners:spark:job-server:runShadow --gradle-user-home `pwd` Spark workers (executors) run on separate nodes, sharing /tmp (1GB size) in order to be able to access the Beam job's MANIFEST. I'm running Python 2.7.
There is no other shared resources between them. A pure Spark job works fine on the cluster (as far as I tested a simple one). If I'm not wrong, beam job executes with no problem when all master and workers run on the same node (but separate containers). Sent: Friday, December 13, 2019 at 1:49 PM From: "Kyle Weaver" <kcwea...@google.com> To: dev@beam.apache.org Subject: Re: Beam's job crashes on cluster > Do workers need to talk to job server independent from spark executors? No, they don't. From the time stamps in your logs, it looks like the sigbus happened after the executor was lost. Some additional info that might help us establish a chain of causation: - the arguments you used to start the job server? - the spark cluster deployment setup? On Fri, Dec 13, 2019 at 8:00 AM Matthew K. <softm...@gmx.com> wrote: Actually the reason for that error is Job Server/JRE crashes at final stages and service becomes unavailable (note: job is on a very small dataset that is the absence of cluster, will be done in a couple of seconds): 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 43 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 295 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 4 19/12/13 15:22:11 INFO BlockManagerInfo: Removed broadcast_13_piece0 on sparkpi-1576249172021-driver-svc.xyz.svc:7079 in memory (size: 14.4 KB, free: 967.8 MB) 19/12/13 15:22:11 INFO BlockManagerInfo: Removed broadcast_13_piece0 on 192.168.102.238:46463 in memory (size: 14.4 KB, free: 3.3 GB) 19/12/13 15:22:11 INFO BlockManagerInfo: Removed broadcast_13_piece0 on 192.168.78.233:35881 in memory (size: 14.4 KB, free: 3.3 GB) 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 222 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 294 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 37 <-> 98% EXECUTING [2m 26s] > IDLE > IDLE > IDLE > :runners:spark:job-server:runShadow # # A fatal error has been detected by the Java Runtime Environment: # 
# SIGBUS (0x7) at pc=0x7f5ad7cd0d5e, pid=825, tid=0x7f5abb886700 # # JRE version: OpenJDK Runtime Environment (8.0_232-b09) (build 1.8.0_232-b09) # Java VM: OpenJDK 64-Bit Server VM (25.232-b09 mixed mode linux-amd64 compressed oops) # Problematic frame: # V [libjvm.so+0x8f8d5e] PerfLongVariant::sample()+0x1e # # Core dump written. Default location: /opt/spark/beam/core or core.825 # # An error report file with more information is saved as: # /opt/spark/beam/hs_err_pid825.log # # If you would like to submit a bug report, please visit: # http://bugreport.java.com/bugreport/crash.jsp # Aborted (core dumped) From /opt/spark/beam/hs_err_pid825.log: Internal exceptions (10 events):
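Kyle's correction about the master URL format can be sketched as a small helper. This is only an illustration for the standalone-master case (which, per the thread, is not the setup actually in use here). The function names, the host name `spark-master` and port 7077 are assumptions made for the example; only the Gradle task and the `-PsparkMasterUrl` property come from the thread.

```python
def spark_master_url(host, port):
    """Format a standalone master address as spark://host:port."""
    return "spark://%s:%d" % (host, port)


def job_server_command(master_url):
    """Build the argument list for starting the Spark job server.

    Hypothetical helper: it only accepts the spark:// form mentioned in the
    thread. Other master URL schemes exist but are out of scope here.
    """
    if not master_url.startswith("spark://"):
        raise ValueError("expected spark://host:port, got %r" % master_url)
    return [
        "./gradlew",
        ":runners:spark:job-server:runShadow",
        "-PsparkMasterUrl=" + master_url,
    ]
```

For example, `job_server_command(spark_master_url("spark-master", 7077))` yields an argument list that could be passed to `subprocess.run` from the Beam source directory.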
Re: Beam's job crashes on cluster
I'm not sure if that could be a problem. I'm *not* running standalone Spark. I applied some modifications to the code to run Beam tasks on a k8s cluster using spark-submit. Therefore, worker nodes are spawned when spark-submit is called and connect to the master, and are supposed to be destroyed when the job is finished. Therefore, the crash should have some other cause.

Sent: Friday, December 13, 2019 at 2:37 PM From: "Kyle Weaver" To: dev Subject: Re: Beam's job crashes on cluster
Correction: should be formatted `spark://host:port`. Should follow the rules here: https://spark.apache.org/docs/latest/submitting-applications.html#master-urls

On Fri, Dec 13, 2019 at 12:36 PM Kyle Weaver <kcwea...@google.com> wrote:
You probably will want to add argument `-PsparkMasterUrl=localhost:8080` (or whatever host:port your Spark master is on) to the job-server:runShadow command. Without specifying the master URL, the default is to start an embedded Spark master within the same JVM as the job server, rather than using your standalone master.

On Fri, Dec 13, 2019 at 12:15 PM Matthew K. <softm...@gmx.com> wrote:
The job server is started on the master node with: ./gradlew :runners:spark:job-server:runShadow --gradle-user-home `pwd` Spark workers (executors) run on separate nodes, sharing /tmp (1GB size) in order to be able to access the Beam job's MANIFEST. I'm running Python 2.7. There are no other shared resources between them. A pure Spark job works fine on the cluster (as far as I tested a simple one). If I'm not mistaken, the Beam job executes with no problem when the master and all workers run on the same node (but in separate containers).

Sent: Friday, December 13, 2019 at 1:49 PM From: "Kyle Weaver" <kcwea...@google.com> To: dev@beam.apache.org Subject: Re: Beam's job crashes on cluster
> Do workers need to talk to job server independent from spark executors?
No, they don't. From the time stamps in your logs, it looks like the sigbus happened after the executor was lost.
Some additional info that might help us establish a chain of causation: - the arguments you used to start the job server? - the spark cluster deployment setup? On Fri, Dec 13, 2019 at 8:00 AM Matthew K. <softm...@gmx.com> wrote: Actually the reason for that error is Job Server/JRE crashes at final stages and service becomes unavailable (note: job is on a very small dataset that is the absence of cluster, will be done in a couple of seconds): 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 43 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 295 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 4 19/12/13 15:22:11 INFO BlockManagerInfo: Removed broadcast_13_piece0 on sparkpi-1576249172021-driver-svc.xyz.svc:7079 in memory (size: 14.4 KB, free: 967.8 MB) 19/12/13 15:22:11 INFO BlockManagerInfo: Removed broadcast_13_piece0 on 192.168.102.238:46463 in memory (size: 14.4 KB, free: 3.3 GB) 19/12/13 15:22:11 INFO BlockManagerInfo: Removed broadcast_13_piece0 on 192.168.78.233:35881 in memory (size: 14.4 KB, free: 3.3 GB) 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 222 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 294 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 37 <-> 98% EXECUTING [2m 26s] > IDLE > IDLE > IDLE > :runners:spark:job-server:runShadow # # A fatal error has been detected by the Java Runtime Environment: # # SIGBUS (0x7) at pc=0x7f5ad7cd0d5e, pid=825, tid=0x7f5abb886700 # # JRE version: OpenJDK Runtime Environment (8.0_232-b09) (build 1.8.0_232-b09) # Java VM: OpenJDK 64-Bit Server VM (25.232-b09 mixed mode linux-amd64 compressed oops) # Problematic frame: # V [libjvm.so+0x8f8d5e] PerfLongVariant::sample()+0x1e # # Core dump written. 
Default location: /opt/spark/beam/core or core.825 # # An error report file with more information is saved as: # /opt/spark/beam/hs_err_pid825.log # # If you would like to submit a bug report, please visit: # http://bugreport.java.com/bugreport/crash.jsp # Aborted (core dumped) From /opt/spark/beam/hs_err_pid825.log: Internal exceptions (10 events): Event: 0.664 Thread 0x7f5ad000a800 Exception (0x000794d72040) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605] Event: 0.664 Thread 0x7f5ad000a800 Exception (0x000794d73e60) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
Re: Beam's job crashes on cluster
The job server is started on the master node with: ./gradlew :runners:spark:job-server:runShadow --gradle-user-home `pwd` Spark workers (executors) run on separate nodes, sharing /tmp (1GB size) in order to be able to access the Beam job's MANIFEST. I'm running Python 2.7. There are no other shared resources between them. A pure Spark job works fine on the cluster (as far as I tested a simple one). If I'm not mistaken, the Beam job executes with no problem when the master and all workers run on the same node (but in separate containers).

Sent: Friday, December 13, 2019 at 1:49 PM From: "Kyle Weaver" To: dev@beam.apache.org Subject: Re: Beam's job crashes on cluster
> Do workers need to talk to job server independent from spark executors?
No, they don't. From the time stamps in your logs, it looks like the sigbus happened after the executor was lost. Some additional info that might help us establish a chain of causation: - the arguments you used to start the job server? - the spark cluster deployment setup?

On Fri, Dec 13, 2019 at 8:00 AM Matthew K.
<softm...@gmx.com> wrote: Actually the reason for that error is Job Server/JRE crashes at final stages and service becomes unavailable (note: job is on a very small dataset that is the absence of cluster, will be done in a couple of seconds): 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 43 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 295 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 4 19/12/13 15:22:11 INFO BlockManagerInfo: Removed broadcast_13_piece0 on sparkpi-1576249172021-driver-svc.xyz.svc:7079 in memory (size: 14.4 KB, free: 967.8 MB) 19/12/13 15:22:11 INFO BlockManagerInfo: Removed broadcast_13_piece0 on 192.168.102.238:46463 in memory (size: 14.4 KB, free: 3.3 GB) 19/12/13 15:22:11 INFO BlockManagerInfo: Removed broadcast_13_piece0 on 192.168.78.233:35881 in memory (size: 14.4 KB, free: 3.3 GB) 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 222 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 294 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 37 <-> 98% EXECUTING [2m 26s] > IDLE > IDLE > IDLE > :runners:spark:job-server:runShadow # # A fatal error has been detected by the Java Runtime Environment: # # SIGBUS (0x7) at pc=0x7f5ad7cd0d5e, pid=825, tid=0x7f5abb886700 # # JRE version: OpenJDK Runtime Environment (8.0_232-b09) (build 1.8.0_232-b09) # Java VM: OpenJDK 64-Bit Server VM (25.232-b09 mixed mode linux-amd64 compressed oops) # Problematic frame: # V [libjvm.so+0x8f8d5e] PerfLongVariant::sample()+0x1e # # Core dump written. 
Default location: /opt/spark/beam/core or core.825 # # An error report file with more information is saved as: # /opt/spark/beam/hs_err_pid825.log # # If you would like to submit a bug report, please visit: # http://bugreport.java.com/bugreport/crash.jsp # Aborted (core dumped) From /opt/spark/beam/hs_err_pid825.log: Internal exceptions (10 events): Event: 0.664 Thread 0x7f5ad000a800 Exception (0x000794d72040) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605] Event: 0.664 Thread 0x7f5ad000a800 Exception (0x000794d73e60) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605] Event: 0.665 Thread 0x7f5ad000a800 Exception (0x000794d885d0) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605] Event: 0.665 Thread 0x7f5ad000a800 Exception (0x000794d8c6d8) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605] Event: 0.673 Thread 0x7f5ad000a800 Exception (0x000794df7b70) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605] Event: 0.674 Thread 0x7f5ad000a800 Exception (0x000794df8f38) thrown at [/home/openjdk/
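Since the "Socket closed" error on the client side follows from the job server process going away, a quick way to tell "job server is down" apart from "job server is up but the job failed" is to probe the job endpoint. The following is a minimal sketch in Python 3 (the thread itself uses Python 2.7); the function name and the default timeout are assumptions for illustration. It only shows that the TCP port accepts connections, not that the job service behind it is healthy.

```python
import socket


def endpoint_reachable(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened.

    Hypothetical helper: a successful connect says nothing about the
    gRPC service itself, only that something is listening on the port.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, unreachable hosts and timeouts.
        return False
```

Calling `endpoint_reachable(ip_address, 8099)` from the machine that submits the pipeline, before and after the failure, would show whether the job server stopped listening.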
Re: Beam's job crashes on cluster
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Sent: Friday, December 13, 2019 at 6:58 AM From: "Matthew K." To: dev@beam.apache.org Cc: dev Subject: Re: Beam's job crashes on cluster
Hi Kyle, These are the pipeline options (I replaced localhost with the actual job server's IP address and still receive the same error. Do workers need to talk to the job server independently of the Spark executors?): options = PipelineOptions([ "--runner=PortableRunner", "--job_endpoint=%s:8099" % ip_address, "--environment_type=PROCESS", "--environment_config={\"command\":\"/opt/spark/beam/sdks/python/container/build/target/launcher/linux_amd64/boot\"}", "" ])

Sent: Thursday, December 12, 2019 at 5:30 PM From: "Kyle Weaver" To: dev Subject: Re: Beam's job crashes on cluster
Can you share the pipeline options you are using? Particularly environment_type and environment_config.

On Thu, Dec 12, 2019 at 2:58 PM Matthew K.
<softm...@gmx.com> wrote: Running Beam on Spark cluster, it crashhes and I get the following error (workers are on separate nodes, it works fine when workers are on the same node as runner): > Task :runners:spark:job-server:runShadow FAILED Exception in thread wait_until_finish_read: Traceback (most recent call last): File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner self.run() File "/usr/lib/python2.7/threading.py", line 754, in run self.__target(*self.__args, **self.__kwargs) File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/portable_runner.py", line 411, in read_messages for message in self._message_stream: File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 395, in next return self._next() File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 561, in _next raise self _Rendezvous: <_Rendezvous of RPC that terminated with: status = StatusCode.UNAVAILABLE details = "Socket closed" debug_error_string = "{"created":"@1576190515.361076583","description":"Error received from peer ipv4:127.0.0.1:8099","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"Socket closed","grpc_status":14}" > Traceback (most recent call last): File "/opt/spark/work-dir/beam_script.py", line 49, in stats = tfdv.generate_statistics_from_csv(data_location=DATA_LOCATION, pipeline_options=options) File "/usr/local/lib/python2.7/dist-packages/tensorflow_data_validation/utils/stats_gen_lib.py", line 197, in generate_statistics_from_csv statistics_pb2.DatasetFeatureStatisticsList))) File "/usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.py", line 427, in __exit__ self.run().wait_until_finish() File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/portable_runner.py", line 429, in wait_until_finish for state_response in self._state_stream: File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 395, in next return self._next() File 
"/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 561, in _next raise self grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with: status = StatusCode.UNAVAILABLE details = "Socket closed" debug_error_string = "{"created":"@1576190515.361053677","description":"Error received from peer ipv4:127.0.0.1:8099","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"Socket closed","grpc_status":14}"
Re: Beam's job crashes on cluster
Hi Kyle,

These are the pipeline options (I replaced localhost with the actual job server's IP address and still receive the same error. Do workers need to talk to the job server independently of the Spark executors?):

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=%s:8099" % ip_address,
    "--environment_type=PROCESS",
    "--environment_config={\"command\":\"/opt/spark/beam/sdks/python/container/build/target/launcher/linux_amd64/boot\"}",
    ""
])

Sent: Thursday, December 12, 2019 at 5:30 PM From: "Kyle Weaver" To: dev Subject: Re: Beam's job crashes on cluster
Can you share the pipeline options you are using? Particularly environment_type and environment_config.

On Thu, Dec 12, 2019 at 2:58 PM Matthew K. <softm...@gmx.com> wrote:
Running Beam on Spark cluster, it crashes and I get the following error (workers are on separate nodes, it works fine when workers are on the same node as runner): > Task :runners:spark:job-server:runShadow FAILED Exception in thread wait_until_finish_read: Traceback (most recent call last): File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner self.run() File "/usr/lib/python2.7/threading.py", line 754, in run self.__target(*self.__args, **self.__kwargs) File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/portable_runner.py", line 411, in read_messages for message in self._message_stream: File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 395, in next return self._next() File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 561, in _next raise self _Rendezvous: <_Rendezvous of RPC that terminated with: status = StatusCode.UNAVAILABLE details = "Socket closed" debug_error_string = "{"created":"@1576190515.361076583","description":"Error received from peer ipv4:127.0.0.1:8099","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"Socket closed","grpc_status":14}" > Traceback (most recent call last): File
"/opt/spark/work-dir/beam_script.py", line 49, in stats = tfdv.generate_statistics_from_csv(data_location=DATA_LOCATION, pipeline_options=options) File "/usr/local/lib/python2.7/dist-packages/tensorflow_data_validation/utils/stats_gen_lib.py", line 197, in generate_statistics_from_csv statistics_pb2.DatasetFeatureStatisticsList))) File "/usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.py", line 427, in __exit__ self.run().wait_until_finish() File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/portable_runner.py", line 429, in wait_until_finish for state_response in self._state_stream: File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 395, in next return self._next() File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 561, in _next raise self grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with: status = StatusCode.UNAVAILABLE details = "Socket closed" debug_error_string = "{"created":"@1576190515.361053677","description":"Error received from peer ipv4:127.0.0.1:8099","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"Socket closed","grpc_status":14}"
Beam's job crashes on cluster
Running Beam on a Spark cluster, the job crashes and I get the following error (the workers are on separate nodes; it works fine when the workers are on the same node as the runner):

> Task :runners:spark:job-server:runShadow FAILED
Exception in thread wait_until_finish_read:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 754, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/portable_runner.py", line 411, in read_messages
    for message in self._message_stream:
  File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 395, in next
    return self._next()
  File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 561, in _next
    raise self
_Rendezvous: <_Rendezvous of RPC that terminated with:
    status = StatusCode.UNAVAILABLE
    details = "Socket closed"
    debug_error_string = "{"created":"@1576190515.361076583","description":"Error received from peer ipv4:127.0.0.1:8099","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"Socket closed","grpc_status":14}"
>

Traceback (most recent call last):
  File "/opt/spark/work-dir/beam_script.py", line 49, in <module>
    stats = tfdv.generate_statistics_from_csv(data_location=DATA_LOCATION, pipeline_options=options)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow_data_validation/utils/stats_gen_lib.py", line 197, in generate_statistics_from_csv
    statistics_pb2.DatasetFeatureStatisticsList)))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.py", line 427, in __exit__
    self.run().wait_until_finish()
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/portable_runner.py", line 429, in wait_until_finish
    for state_response in self._state_stream:
  File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 395, in next
    return self._next()
  File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 561, in _next
    raise self
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with:
    status = StatusCode.UNAVAILABLE
    details = "Socket closed"
    debug_error_string = "{"created":"@1576190515.361053677","description":"Error received from peer ipv4:127.0.0.1:8099","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"Socket closed","grpc_status":14}"
Pipeline parameters for running jobs in a cluster
Hi,

To run a Beam job on a Spark cluster with some number of nodes running:

1. Is it recommended to set pipeline parameters such as --num_workers, --max_num_workers, --autoscaling_algorithms, --worker_machine_type, etc., or will Beam (Spark) figure that out?
2. If it is recommended to set those parameters, what are the recommended values based on the machines and resources in the cluster?

Thanks
Re: Command for Beam worker on Spark cluster
Thanks, but I still have a problem making the remote worker on k8s work. (It is important to point out that I had to create a shared volume between the nodes so that they all have access to the same /tmp, since the Beam runner creates artifact staging files on the machine it is running on and expects workers to read from it.) I get this error from the executor:

INFO AbstractArtifactRetrievalService: GetManifest for /tmp/beam-artifact-staging/job_cca3e889-76d9-4c8a-a942-a64ddbd2dd1f/MANIFEST
INFO AbstractArtifactRetrievalService: GetManifest for /tmp/beam-artifact-staging/job_cca3e889-76d9-4c8a-a942-a64ddbd2dd1f/MANIFEST -> 0 artifacts
INFO ProcessEnvironmentFactory: Still waiting for startup of environment '/opt/spark/beam/sdks/python/container/build/target/launcher/linux_amd64/boot' for worker id 3-1
ERROR Executor: Exception in task 0.1 in stage 0.0 (TID 2) org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Process died with exit code 1
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)

(Note that the job manifest has no artifacts in it.) I can see that the ports for the endpoints (logging, artifact, ...) are open on the worker. Adding some debugging to boot.go and running it manually shows that it doesn't return from the "artifact.Materialize" function. Any idea what could be wrong in the setup?

Sent: Wednesday, November 06, 2019 at 5:45 PM From: "Kyle Weaver" To: dev Subject: Re: Command for Beam worker on Spark cluster
> Where can I extract these parameters from?
These parameters should be passed automatically when the process is run (note the use of $* in the example script): https://github.com/apache/beam/blob/fbc84b61240a3d83d9c19f7ccc17ba22e5d7e2c9/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironmentFactory.java#L115-L121
> Also, how can the Spark executor find the port that the gRPC server is running on?
Not sure which grpc server you mean here.

On Wed, Nov 6, 2019 at 3:32 PM Matthew K. <softm...@gmx.com> wrote:
Thanks. I still need to pass parameters to the boot executable, such as the worker id, control endpoint, logging endpoint, etc. Where can I extract these parameters from? (In the apache_beam Python code, they can be extracted from the StartWorker request parameters.) Also, how can the Spark executor find the port that the gRPC server is running on?

Sent: Wednesday, November 06, 2019 at 5:07 PM From: "Kyle Weaver" <kcwea...@google.com> To: dev <dev@beam.apache.org> Subject: Re: Command for Beam worker on Spark cluster
In Docker mode, most everything's taken care of for you, but in process mode you have to do a lot of setup yourself. The command you're looking for is `sdks/python/container/build/target/launcher/linux_amd64/boot`. You will be required to have both that executable (which you can build from source using `./gradlew :sdks:python:container:build`) and a Python installation including Beam and other dependencies on all of your worker machines. The best example I know of is here: https://github.com/apache/beam/blob/cbf8a900819c52940a0edd90f59bf6aec55c817a/sdks/python/test-suites/portable/py2/build.gradle#L146-L165

On Wed, Nov 6, 2019 at 2:24 PM Matthew K. <softm...@gmx.com> wrote:
Hi all, I am trying to run a *Python* Beam pipeline on a Spark cluster. Since the workers are running on separate nodes, I am using "PROCESS" for "environment_type" in the pipeline options, but I couldn't find any documentation on what "command" I should pass to "environment_config" to run on the worker so that the executor can communicate with it. Can someone help me with that?
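The process-mode requirements Kyle lists (the boot executable plus a Python installation with Beam on every worker) can be checked with a small preflight script run on each worker machine. This is a sketch in Python 3 under stated assumptions: the function name `worker_preflight` and the idea of returning a list of problem strings are made up for illustration and are not part of Beam.

```python
import importlib.util
import os


def worker_preflight(boot_path, required_modules=("apache_beam",)):
    """Return a list of human-readable problems found on this machine.

    Hypothetical helper: checks that the boot executable exists and is
    executable, and that each top-level module in required_modules can be
    located by the current Python interpreter.
    """
    problems = []
    if not os.path.isfile(boot_path):
        problems.append("boot executable not found: %s" % boot_path)
    elif not os.access(boot_path, os.X_OK):
        problems.append("boot executable is not executable: %s" % boot_path)
    for name in required_modules:
        # find_spec returns None when a top-level module cannot be found.
        if importlib.util.find_spec(name) is None:
            problems.append("Python module not importable: %s" % name)
    return problems
```

An empty list means the basic prerequisites are present. It does not verify that the artifact staging directory is readable or that the worker can reach the runner's endpoints.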
Re: Command for Beam worker on Spark cluster
Thanks. I still need to pass parameters to the boot executable, such as the worker id, control endpoint, logging endpoint, etc. Where can I extract these parameters from? (In the apache_beam Python code, they can be extracted from the StartWorker request parameters.) Also, how can the Spark executor find the port that the gRPC server is running on?

Sent: Wednesday, November 06, 2019 at 5:07 PM From: "Kyle Weaver" To: dev Subject: Re: Command for Beam worker on Spark cluster
In Docker mode, most everything's taken care of for you, but in process mode you have to do a lot of setup yourself. The command you're looking for is `sdks/python/container/build/target/launcher/linux_amd64/boot`. You will be required to have both that executable (which you can build from source using `./gradlew :sdks:python:container:build`) and a Python installation including Beam and other dependencies on all of your worker machines. The best example I know of is here: https://github.com/apache/beam/blob/cbf8a900819c52940a0edd90f59bf6aec55c817a/sdks/python/test-suites/portable/py2/build.gradle#L146-L165

On Wed, Nov 6, 2019 at 2:24 PM Matthew K. <softm...@gmx.com> wrote:
Hi all, I am trying to run a *Python* Beam pipeline on a Spark cluster. Since the workers are running on separate nodes, I am using "PROCESS" for "environment_type" in the pipeline options, but I couldn't find any documentation on what "command" I should pass to "environment_config" to run on the worker so that the executor can communicate with it. Can someone help me with that?
Command for Beam worker on Spark cluster
Hi all, I am trying to run a *Python* Beam pipeline on a Spark cluster. Since the workers are running on separate nodes, I am using "PROCESS" for "environment_type" in the pipeline options, but I couldn't find any documentation on what "command" I should pass to "environment_config" to run on the worker so that the executor can communicate with it. Can someone help me with that?
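For reference, the option flags that were later posted on this list for the PROCESS environment can be assembled as below. This is a minimal sketch: the helper name `portable_process_options` is hypothetical, and building the `environment_config` value with `json.dumps` is a convenience chosen here to avoid hand-escaping quotes. The flag names and the PortableRunner value are the ones quoted in the related thread.

```python
import json


def portable_process_options(job_endpoint, boot_path):
    """Return option strings for a portable pipeline in PROCESS mode.

    Hypothetical helper. job_endpoint is "host:port" of the job server;
    boot_path is the path of the boot executable on the worker machines.
    """
    # The environment config is a JSON object with a "command" key.
    environment_config = json.dumps({"command": boot_path})
    return [
        "--runner=PortableRunner",
        "--job_endpoint=%s" % job_endpoint,
        "--environment_type=PROCESS",
        "--environment_config=%s" % environment_config,
    ]
```

The resulting list is intended to be passed to `PipelineOptions(...)` from `apache_beam`, as in the snippet shared in the crash thread.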