Hi Mike,

From the documentation on
https://beam.apache.org/documentation/runners/spark/#pipeline-options-for-the-spark-runner:

    storageLevel
        The StorageLevel to use when caching RDDs in batch pipelines. The
        Spark Runner automatically caches RDDs that are evaluated
        repeatedly. This is a batch-only property as streaming pipelines in
        Beam are stateful, which requires Spark DStream's StorageLevel to
        be MEMORY_ONLY.
        Default: MEMORY_ONLY

So I think you are out of luck here.
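For reference, the flag is only consulted on the batch side of the runner.
Here is a minimal sketch of what that path boils down to - the class and
method names below are mine, not the actual 2.6.0 source, so treat it as an
approximation rather than the runner's real code:

    import org.apache.beam.runners.spark.SparkPipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.storage.StorageLevel;

    class StorageLevelSketch {
      // Hypothetical helper: read --storageLevel from the pipeline options
      // and persist the RDD with it. The real logic lives in the runner's
      // dataset-caching code (e.g. BoundedDataset).
      static <T> JavaRDD<T> cacheWithConfiguredLevel(PipelineOptions opts,
                                                     JavaRDD<T> rdd) {
        String level = opts.as(SparkPipelineOptions.class).getStorageLevel();
        return rdd.persist(StorageLevel.fromString(level));
      }
    }

The streaming path never goes through anything like this - as the docs
above say, stateful streaming pipelines pin the DStream StorageLevel to
MEMORY_ONLY.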
On Thu, Oct 11, 2018 at 10:05 PM Mike Kaplinskiy <[email protected]> wrote:

> Hey folks,
>
> Admittedly I may be a bit on the bleeding edge here, but I'm attempting to
> run a Beam pipeline on Spark which is running on top of Kubernetes.
> Specifically Beam 2.6.0 with Spark 2.4.0-rc2 running in client mode with a
> Kubernetes (1.11) driver. It's actually pretty cool - from a Kubernetes
> perspective, I start a pod which starts a ton of workers to do the parallel
> stuff and then cleans up after itself.
>
> One thing I can't seem to get working is setting the storage level for
> Spark RDDs via Beam. Specifically, passing --storageLevel=MEMORY_AND_DISK
> seems not to work - the RDD still shows up as "Memory Deserialized 1x
> Replicated" in the Spark UI. I would expect it to be something closer to
> "Disk Memory Deserialized 1x Replicated." It *seems* to be serialized only
> in the sense that less memory is used (I assume it gets encoded).
>
> I even tried hardcoding storageLevel in BoundedDataset.java (based on the
> line number in the DAG viz). Unfortunately it still shows up as
> memory-only.
>
> Am I missing something that would let me spill data to disk?
>
> For reference, here's my exact command line:
>
> /opt/spark/bin/spark-submit
>     --master 'k8s://https://kubernetes:443'
>     --deploy-mode client
>     --name $(MY_POD_NAME)
>     --conf spark.executor.instances=20
>     --conf spark.driver.host=$(MY_POD_IP)
>     --conf spark.driver.port=7077
>     --conf spark.kubernetes.container.image=$(MY_IMAGE)
>     --conf spark.kubernetes.driver.pod.name=$(MY_POD_NAME)
>     --conf spark.kubernetes.executor.podNamePrefix=$(MY_POD_NAME)
>     --conf spark.executor.memory=5500m
>     --conf spark.executor.memoryOverhead=1300m
>     --conf spark.memory.fraction=0.45
>     --conf spark.executor.cores=3
>     --conf spark.kubernetes.executor.limit.cores=3
>     --conf spark.default.parallelism=60
>     --conf spark.kubernetes.allocation.batch.size=20
>     --conf spark.kubernetes.driver.label.app=beam-datomic-smoketest
>     --conf spark.kubernetes.node.selector.node.ladderlife.com/group=etl
>     --conf spark.kubernetes.executor.annotation.iam.amazonaws.com/role=etl-role
>     --conf spark.kubernetes.executor.secrets.google-cloud=/google-cloud-secrets
>     --conf spark.kubernetes.executor.secretKeyRef.SENTRY_DSN=sentry-secrets:dsn
>     --conf spark.executorEnv.STATSD_HOST=169.254.168.253
>     --class ladder.my_beam_job
>     local:///srv/beam_job.jar
>     --runner=SparkRunner
>     --storageLevel=MEMORY_AND_DISK
>
> Thanks,
> Mike.
>
> Ladder <http://bit.ly/1VRtWfS>. The smart, modern way to insure your life.

--
JC
