We have made some progress parallelizing our Python code with Beam on Spark.
Following your advice, we are using Spark 3.2.1.
The Spark server and worker are connected OK.
On a third machine (the client), I am running the Docker job server:
$ sudo docker run --net=host apache/beam_spark_job_server:latest \
    --spark-master-url=spark://<SERVER-IP>:7077
Then, on the client:
$ python test_beam.py
In case it matters, test_beam.py contains the following:
options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--save_main_session",
    "--environment_type=DOCKER",
    "--environment_config=docker.io/apache/beam_python3.8_sdk:2.37.0"
])

with beam.Pipeline(options=options) as p:
    lines = (p
             | 'Create words' >> beam.Create(['this is working'])
             | 'Add hostname' >> beam.Map(lambda words: addhost(words))
             | 'Split words' >> beam.FlatMap(lambda words: words.split(' '))
             | 'Build byte array' >> beam.ParDo(ConvertToByteArray())
             | 'Group' >> beam.GroupBy()  # Do future batching here
             | 'print output' >> beam.Map(myprint)
             )
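For completeness, the snippet calls a few helpers (addhost, ConvertToByteArray, myprint) whose definitions I left out; roughly, they look like this (simplified stand-ins, not the exact code):

```python
import socket

def addhost(words):
    # Simplified stand-in: tag the element with the worker's hostname
    return "%s: %s" % (socket.gethostname(), words)

class ConvertToByteArray:
    # In the real pipeline this subclasses beam.DoFn; simplified here
    def process(self, word):
        # Emit the word as a UTF-8 byte array
        yield bytearray(word, "utf-8")

def myprint(x):
    print(x)
```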
I think I got the versions wrong, because the server log shows the following
(192.168.1.252 is the client IP):
22/04/04 11:51:57 DEBUG TransportServer: New connection accepted for remote address /192.168.1.252:53330.
22/04/04 11:51:57 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message.
java.io.InvalidClassException: org.apache.spark.deploy.ApplicationDescription; local class incompatible: stream classdesc serialVersionUID = 6543101073799644159, local class serialVersionUID = 1574364215946805297
….
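My reading of the serialVersionUID mismatch is that the Spark classes bundled inside the job server image are from a different Spark version than the 3.2.1 cluster, since Java serialization refuses to deserialize a class whose serialVersionUID differs between sender and receiver. If that is right, the fix might be to pin the job server image to a Spark 3 build whose tag matches the Beam SDK version, along these lines (the image name and tag are my guess from the Beam Docker Hub listings, not verified):

```shell
sudo docker run --net=host apache/beam_spark3_job_server:2.37.0 \
    --spark-master-url=spark://<SERVER-IP>:7077
```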
In the client logs, I see:
22/04/04 11:51:56 INFO org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService: Staging artifacts for job_9a9667dd-898f-4b9b-94b7-2c8b73f0ac27.
22/04/04 11:51:56 INFO org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService: Resolving artifacts for job_9a9667dd-898f-4b9b-94b7-2c8b73f0ac27.ref_Environment_default_environment_1.
22/04/04 11:51:56 INFO org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService: Getting 1 artifacts for job_9a9667dd-898f-4b9b-94b7-2c8b73f0ac27.null.
22/04/04 11:51:56 INFO org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService: Artifacts fully staged for job_9a9667dd-898f-4b9b-94b7-2c8b73f0ac27.
22/04/04 11:51:56 INFO org.apache.beam.runners.spark.SparkJobInvoker: Invoking job BeamApp-root-0404115156-3cc5f493_e0deca3a-6b67-40ad-bfe8-55ba7efd9038
22/04/04 11:51:56 INFO org.apache.beam.runners.jobsubmission.JobInvocation: Starting job invocation BeamApp-root-0404115156-3cc5f493_e0deca3a-6b67-40ad-bfe8-55ba7efd9038
22/04/04 11:51:56 INFO org.apache.beam.runners.core.construction.resources.PipelineResources: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 6 files. Enable logging at DEBUG level to see which files will be staged.
22/04/04 11:51:56 INFO org.apache.beam.runners.spark.translation.SparkContextFactory: Creating a brand new Spark Context.
22/04/04 11:51:56 WARN org.apache.spark.util.Utils: Your hostname, spark-ml-client resolves to a loopback address: 127.0.0.1; using 192.168.1.252 instead (on interface eth0)
22/04/04 11:51:56 WARN org.apache.spark.util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/04/04 11:51:57 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/04/04 11:52:57 ERROR org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up.
22/04/04 11:52:57 WARN org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend: Application ID is not initialized yet.
22/04/04 11:52:57 WARN org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint: Drop UnregisterApplication(null) because has not yet connected to master
22/04/04 11:52:57 WARN org.apache.spark.metrics.MetricsSystem: Stopping a MetricsSystem that is not running
22/04/04 11:52:57 ERROR org.apache.spark.SparkContext: Error initializing SparkContext.
java.lang.NullPointerException
    at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:64)
    at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:248)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:510)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
    at org.apache.beam.runners.spark.translation.SparkContextFactory.createSparkContext(SparkContextFactory.java:101)
    at org.apache.beam.runners.spark.translation.SparkContextFactory.getSparkContext(SparkContextFactory.java:67)
    at org.apache.beam.runners.spark.SparkPipelineRunner.run(SparkPipelineRunner.java:118)
    at org.apache.beam.runners.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:86)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
22/04/04 11:52:57 ERROR org.apache.beam.runners.jobsubmission.JobInvocation: Error during job invocation BeamApp-root-0404115156-3cc5f493_e0deca3a-6b67-40ad-bfe8-55ba7efd9038.
java.lang.NullPointerException
    ... (same stack trace as above)