Using Streaming Listener in a Structured Streaming job
In a Structured Streaming job, the listener that is supported is StreamingQueryListener:

```java
spark.streams().addListener(new StreamingQueryListener() { ... });
```

However, there is no straightforward way to use StreamingListener. I have done it like this:

```java
StreamingContext streamingContext = new StreamingContext(spark.sparkContext(), new Duration(1000));
streamingContext.addStreamingListener(new StreamingListener() { ... });
```

Unfortunately, this is not working for me. Is there a way to use StreamingListener in a Structured Streaming query?

Thanks,

Arwin
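For completeness, a minimal sketch of the StreamingQueryListener route, assuming `spark` is the active SparkSession; the println bodies are just placeholders for real handling:

```java
import org.apache.spark.sql.streaming.StreamingQueryListener;

// Register a StreamingQueryListener on the session's StreamingQueryManager.
// The handler bodies below are placeholders.
spark.streams().addListener(new StreamingQueryListener() {
    @Override
    public void onQueryStarted(QueryStartedEvent event) {
        System.out.println("Query started: " + event.id());
    }

    @Override
    public void onQueryProgress(QueryProgressEvent event) {
        System.out.println("Query made progress: " + event.progress());
    }

    @Override
    public void onQueryTerminated(QueryTerminatedEvent event) {
        System.out.println("Query terminated: " + event.id());
    }
});
```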
Storage Partition Joins only works for buckets?
Hey team,

I was reading through the Storage Partition Join SPIP (https://docs.google.com/document/d/1foTkDSM91VxKgkEcBMsuAvEjNybjja-uHk-r3vtXWFE/edit#heading=h.82w8qxfl2uwl), but it seems like it only supports buckets, not partitions. Is that true? And if so, does anybody have an intuition for why - is it simply a bad idea?

Thanks,

Arwin
Parquet 'bucketBy' creates a ton of files
I am trying to use Spark's **bucketBy** feature on a pretty large dataset.

```java
dataframe.write()
    .format("parquet")
    .bucketBy(500, bucketColumn1, bucketColumn2)
    .mode(SaveMode.Overwrite)
    .option("path", "s3://my-bucket")
    .saveAsTable("my_table");
```

The problem is that my Spark cluster has about 500 partitions/tasks/executors (not sure of the terminology), so I end up with files that look like:

```
part-1-{UUID}_1.c000.snappy.parquet
part-1-{UUID}_2.c000.snappy.parquet
...
part-1-{UUID}_00500.c000.snappy.parquet
part-2-{UUID}_1.c000.snappy.parquet
part-2-{UUID}_2.c000.snappy.parquet
...
part-2-{UUID}_00500.c000.snappy.parquet
...
part-00500-{UUID}_1.c000.snappy.parquet
part-00500-{UUID}_2.c000.snappy.parquet
...
part-00500-{UUID}_00500.c000.snappy.parquet
```

That's 500 x 500 = 250,000 bucketed Parquet files! It takes forever for the `FileOutputCommitter` to commit that to S3.

Is there a way to generate **one file per bucket**, like in Hive? Or is there a better way to deal with this problem? As of now it seems like I have to choose between lowering the parallelism of my cluster (reducing the number of writers) or reducing the parallelism of my parquet files (reducing the number of buckets), which will lower the parallelism of my downstream jobs.

Thanks
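One workaround I have been considering (untested, so treat it as a sketch) is to shuffle the data by the bucket columns before writing, so that each write task only holds rows for a single bucket. The assumption is that `repartition`'s hash partitioning lines up with **bucketBy**'s bucket assignment; if it does not, tasks could still touch several buckets each.

```java
import static org.apache.spark.sql.functions.col;

// Sketch: repartition into 500 partitions on the same columns used for
// bucketing, so each task should end up writing (roughly) one bucket's file.
dataframe
    .repartition(500, col(bucketColumn1), col(bucketColumn2))
    .write()
    .format("parquet")
    .bucketBy(500, bucketColumn1, bucketColumn2)
    .mode(SaveMode.Overwrite)
    .option("path", "s3://my-bucket")
    .saveAsTable("my_table");
```

The trade-off is an extra shuffle before the write, but it would cap the file count at roughly the number of buckets instead of writers x buckets.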
Creating custom Spark-Native catalyst/codegen functions
Hi friends,

I am looking into converting some UDFs/UDAFs to Spark-Native functions to leverage Catalyst and codegen.

Looking through some examples (for example: https://github.com/apache/spark/pull/7214/files for Levenshtein) it seems like we need to add these functions to the Spark framework itself.

Is there a way to add custom Spark-Native functions in "userspace"?

Thank you!

Arwin
Re: Creating custom Spark-Native catalyst/codegen functions
Hey,

It seems like the GeoSpark repo is not publicly accessible? But from the filepath it seems like the Spark codebase itself was forked or modified.

The examples that I've seen seem to suggest that you need to register custom Spark-Native functions inside Spark's private namespace, like you said (FunctionRegistry.scala, I believe). I was wondering if it was possible to add the more efficient Spark-Native functions in my user application without having to fork or modify Spark itself.

Thanks,

Arwin

From: Georg Heiler
Sent: Wednesday, August 21, 11:18 PM
Subject: Re: Creating custom Spark-Native catalyst/codegen functions
To: Arwin Tio
Cc: user@spark.apache.org

Look at https://github.com/DataSystemsLab/GeoSpark/tree/master/sql/src/main/scala/org/apache/spark/sql/geosparksql for an example. Using custom function registration and functions residing inside Spark's private namespace should work. But I am not aware of a public user-facing API. Is there any I am missing?

Arwin Tio <arwin@hotmail.com> wrote on Thu, 22 Aug 2019, 04:28:

Hi friends,

I am looking into converting some UDFs/UDAFs to Spark-Native functions to leverage Catalyst and codegen.

Looking through some examples (for example: https://github.com/apache/spark/pull/7214/files for Levenshtein) it seems like we need to add these functions to the Spark framework itself.

Is there a way to add custom Spark-Native functions in "userspace"?

Thank you!

Arwin
In Catalyst expressions, when is it appropriate to use codegen
Hi,

I am exploring the usage of Catalyst expression functions to avoid the performance issues associated with UDFs. One thing that I noticed is that there is a trait called CodegenFallback, and there are some Catalyst expressions in Spark that inherit from it [0].

My question is: is there a technical limitation for some Catalyst expressions, like datetimeExpressions, that makes codegen unsuitable? How do you evaluate whether or not a Catalyst expression should use codegen?

Thanks,

Arwin

[0] https://github.com/apache/spark/blob/3a4afce96c6840431ed45280742f9e969be19639/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala#L95
Re: Spark Executor OOMs when writing Parquet
Okay! I didn't realize you can pump those partition numbers up that high. 15000 partitions still failed. I am trying 3 partitions now. There is still some disk spill but it is not that high.

Thanks,

Arwin

From: Chris Teoh
Sent: January 17, 2020 7:32 PM
To: Arwin Tio
Cc: user @spark
Subject: Re: Spark Executor OOMs when writing Parquet

You also have disk spill, which is a performance hit. Try multiplying the number of partitions by about 20x - 40x and see if you can eliminate shuffle spill.

On Fri, 17 Jan 2020, 10:37 pm Arwin Tio <arwin@hotmail.com> wrote:

Yes, mostly memory spills though (36.9 TiB memory, 895 GiB disk). I was under the impression that memory spill is OK?

(If you're wondering, this is EMR.)

From: Chris Teoh <chris.t...@gmail.com>
Sent: January 17, 2020 10:30 AM
To: Arwin Tio <arwin@hotmail.com>
Cc: user @spark <user@spark.apache.org>
Subject: Re: Spark Executor OOMs when writing Parquet

Sounds like you don't have enough partitions. Try and repartition to 14496 partitions. Are your stages experiencing shuffle spill?

On Fri, 17 Jan 2020, 10:12 pm Arwin Tio <arwin@hotmail.com> wrote:

Hello,

I have a fairly straightforward Spark job that converts CSV to Parquet:

```java
Dataset<Row> df = spark.read(...);

df
  .repartition(5000)
  .write()
  .format("parquet")
  .parquet("s3://mypath/...");
```

For context, there are about 5 billion rows, each with 2000 columns. The entire dataset is about 1 TB (compressed).

The error looks like this:

```
20/01/16 13:08:55 WARN TaskSetManager: Lost task 265.0 in stage 2.0 (TID 24300, ip-172-24-107-37.us-west-2.compute.internal, executor 13): org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError
    at sun.misc.Unsafe.allocateMemory(Native Method)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
    at org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48)
    at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeToOutput(CapacityByteArrayOutputStream.java:227)
    at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:247)
    at org.apache.parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:405)
    at org.apache.parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:296)
    at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:164)
    at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95)
    at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
    at org.apache.parquet.column.impl.ColumnWriterV1.accountForValueWritten(ColumnWriterV1.java:106)
    at org.apache.parquet.column.impl.ColumnWriterV1.writeNull(ColumnWriterV1.java:170)
    at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.writeNull(MessageColumnIO.java:347)
    at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.writeNullForMissingFieldsAtCurrentLevel(MessageColumnIO.java:337)
    at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endMessage(MessageColumnIO.java:299)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.consumeMessage(ParquetWriteSupport.scala:424)
    at org.apache.spark.sql.execution.datasources.parquet.Parquet
```
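For reference, a rough sketch of the kind of change being suggested in this thread: bump the repartition count well above the original 5000 before the Parquet write. The input/output paths, the header option, and the 150000 figure are placeholders, not values from the thread:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

// Sketch of the advice above: use a much higher partition count (in the
// 20x-40x range suggested) so each write task holds less data. Paths and
// the partition count here are illustrative placeholders.
SparkSession spark = SparkSession.builder().appName("csv-to-parquet").getOrCreate();

Dataset<Row> df = spark.read()
    .option("header", "true")       // assumption: the CSV has a header row
    .csv("s3://mypath/input/");     // placeholder input path

df.repartition(150000)              // illustrative count; tune until spill disappears
  .write()
  .mode(SaveMode.Overwrite)
  .parquet("s3://mypath/output/");  // placeholder output path
```

The idea is simply that more, smaller tasks mean less data buffered per Parquet writer at any one time.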