Bucketed joins on multiple data frames
Sorry if this has been answered before, but I have a question about bucketed joins that I can't seem to find the answer to online.

I have a number of PySpark DataFrames (call them df1, df2, ... df10) that I need to join together on the same key:

    joined = df1.join(df2, "key", "full")
    joined = joined.join(df3, "key", "full")
    joined = joined.join(df4, "key", "full")
    ...

I saw that bucketed joins can help in this situation, but when I try it, I only get a bucketed join on the first join. After that, I have to re-create a bucketed table from the joined result after each join, otherwise the next join is not bucketed. Re-creating the joined table each time only slows things down, so I don't see any overall performance gain.

Doesn't work (pseudocode; write_bucketed() stands for a bucketed write followed by reading the table back):

    df1.write_bucketed(); t1 = spark.table("df1")
    df2.write_bucketed(); t2 = spark.table("df2")
    df3.write_bucketed(); t3 = spark.table("df3")
    joined = t1.join(t2, "key", "full")
    joined = joined.join(t3, "key", "full")

Works, but is slow (pseudocode):

    df1.write_bucketed(); t1 = spark.table("df1")
    df2.write_bucketed(); t2 = spark.table("df2")
    df3.write_bucketed(); t3 = spark.table("df3")
    joined = t1.join(t2, "key", "full")
    joined.write_bucketed(); joined = spark.table("joined")
    joined = joined.join(t3, "key", "full")

I'm wondering whether there is a way to get a performance gain here, either with bucketing or some other way. Also, if this isn't what bucketed joins are for, I'm curious what they are actually for.

Thanks,
Adrian
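For concreteness, here is a minimal runnable version of the pseudocode above using the real DataFrameWriter bucketing API. This is my own sketch: the bucket count of 64, the table names, and the final explain() check are placeholder assumptions, not part of the original setup. The key point is that every table must be written with the same bucket column and bucket count, and explain() shows whether an Exchange node (a shuffle) still appears in the plan:

    from functools import reduce
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # df1, df2, df3 are the frames from the question; assumed to already exist.
    dfs = {"t1": df1, "t2": df2, "t3": df3}

    for name, df in dfs.items():
        (df.write
           .bucketBy(64, "key")   # same bucket count and column for every table
           .sortBy("key")         # pre-sorted buckets help the sort-merge join
           .mode("overwrite")
           .saveAsTable(name))

    tables = [spark.table(name) for name in dfs]
    joined = reduce(lambda left, right: left.join(right, "key", "full"), tables)
    joined.explain()  # look for Exchange nodes: each one is a shuffle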
Is BindingParquetOutputCommitter still used?
Hi,

Per https://spark.apache.org/docs/latest/cloud-integration.html, when using S3 storage one is advised to set these options:

    spark.sql.sources.commitProtocolClass = org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
    spark.sql.parquet.output.committer.class = org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter

However, looking at the code and trying simple tests suggests that BindingParquetOutputCommitter is not used at all. Specifically, I used this code:

    import org.apache.log4j.{Level, Logger}

    Logger.getLogger("org.apache.spark.internal.io.cloud").setLevel(Level.TRACE)
    Logger.getLogger("org.apache.hadoop.mapreduce.lib.output").setLevel(Level.DEBUG)

    val spark = SparkSession.builder().master("local[*]")
      .config("spark.sql.sources.outputCommitterClass",
        "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
      .config("spark.sql.parquet.output.committer.class",
        "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
      .config("spark.sql.sources.commitProtocolClass",
        "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
      .config("fs.s3a.committer.magic.enabled", "true")
      .config("fs.s3.committer.magic.enabled", "true")
      .config("spark.hadoop.fs.s3a.committer.name", "magic")
      .config("spark.hadoop.fs.s3.committer.name", "magic")
      .getOrCreate()

    import spark.implicits._
    val df = Seq("foo", "bar").toDF("s")
    df.write.mode("overwrite").parquet("s3:///2021-09-07-parquet")

I observe that the magic committer is used, and I get trace log messages from PathOutputCommitProtocol, but not from BindingParquetOutputCommitter. If I remove the configuration options that set BindingParquetOutputCommitter, I still see the magic committer being used.

The spark.sql.parquet.output.committer.class option is only used in ParquetFileFormat, where it is copied to spark.sql.sources.outputCommitterClass, and that option, in turn, is only used by SQLHadoopMapReduceCommitProtocol, which we don't use here.

So it sounds like setting spark.sql.parquet.output.committer.class to org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter is no longer necessary? Or is there some code path where it matters?

--
Vladimir Prus
http://vladimirprus.com
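An aside on verifying this sort of thing: the S3A committers write the _SUCCESS marker as a small JSON manifest that records, among other things, which committer actually committed the job, whereas a zero-byte _SUCCESS means the classic FileOutputCommitter ran. A quick check, sketched here in PySpark (the Scala equivalent is analogous) against the same bucket-redacted output path used in the test above:

    # Print the _SUCCESS manifest left by the write above. With an S3A
    # committer this is a JSON document whose "committer" field names the
    # committer that was used; a zero-byte file would mean the classic
    # FileOutputCommitter ran instead.
    success = spark.read.text("s3:///2021-09-07-parquet/_SUCCESS")
    success.show(truncate=False)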
Fwd: issue in Apache Spark install
Dear members, I am a learner at https://learning.oreilly.com and I have a problem installing Apache Spark. I tried it both from CMD and from a Jupyter notebook and get the same issue:

    Exception: Java gateway process exited before sending its port number

Please help me resolve this issue; the Jupyter output is in the attached PDF. From CMD:

    C:\Users\User>pyspark
    Python 3.8.8 (default, Apr 13 2021, 15:08:03) [MSC v.1916 64 bit (AMD64)] :: Anaconda, Inc. on win32

    Warning: This Python interpreter is in a conda environment, but the environment has
    not been activated. Libraries may fail to load. To activate this environment
    please see https://conda.io/activation

    Type "help", "copyright", "credits" or "license" for more information.
    Exception in thread "main" java.lang.ExceptionInInitializerError
        at org.apache.spark.unsafe.array.ByteArrayMethods.<clinit>(ByteArrayMethods.java:54)
        at org.apache.spark.internal.config.package$.<init>(package.scala:1095)
        at org.apache.spark.internal.config.package$.<clinit>(package.scala)
        at org.apache.spark.deploy.SparkSubmitArguments.$anonfun$loadEnvironmentArguments$3(SparkSubmitArguments.scala:157)
        at scala.Option.orElse(Option.scala:447)
        at org.apache.spark.deploy.SparkSubmitArguments.loadEnvironmentArguments(SparkSubmitArguments.scala:157)
        at org.apache.spark.deploy.SparkSubmitArguments.<init>(SparkSubmitArguments.scala:115)
        at org.apache.spark.deploy.SparkSubmit$$anon$2$$anon$3.<init>(SparkSubmit.scala:1022)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.parseArguments(SparkSubmit.scala:1022)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:85)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make private java.nio.DirectByteBuffer(long,int) accessible: module java.base does not "opens java.nio" to unnamed module @71e9ddb4
        at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:357)
        at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
        at java.base/java.lang.reflect.Constructor.checkCanSetAccessible(Constructor.java:188)
        at java.base/java.lang.reflect.Constructor.setAccessible(Constructor.java:181)
        at org.apache.spark.unsafe.Platform.<clinit>(Platform.java:56)
        ... 13 more
    Traceback (most recent call last):
      File "C:\Spark\spark-3.1.2-bin-hadoop2.7\python\pyspark\shell.py", line 35, in <module>
        SparkContext._ensure_initialized()  # type: ignore
      File "C:\Spark\spark-3.1.2-bin-hadoop2.7\python\pyspark\context.py", line 331, in _ensure_initialized
        SparkContext._gateway = gateway or launch_gateway(conf)
      File "C:\Spark\spark-3.1.2-bin-hadoop2.7\python\pyspark\java_gateway.py", line 108, in launch_gateway
        raise Exception("Java gateway process exited before sending its port number")
    Exception: Java gateway process exited before sending its port number

[Attachment: SparkTest - Jupyter Notebook.pdf]
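A note on the trace above: the underlying InaccessibleObjectException ("module java.base does not 'opens java.nio' to unnamed module") is what Spark 3.1.x produces when it is launched on Java 16 or newer, while Spark 3.1.2 supports Java 8 and 11. A minimal sketch of a notebook-side workaround, assuming a Java 11 installation is available (the path below is a placeholder, not a real location on the reporter's machine):

    import os

    # Placeholder path: point this at an actual Java 8 or 11 installation
    # before the JVM gateway is launched.
    os.environ["JAVA_HOME"] = r"C:\Program Files\Java\jdk-11.0.12"
    os.environ["PATH"] = os.environ["JAVA_HOME"] + r"\bin;" + os.environ["PATH"]

    # Import and start Spark only after JAVA_HOME is set.
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.master("local[*]").getOrCreate()
    print(spark.version)  # expect 3.1.2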