pralabhkumar commented on a change in pull request #23447: [SPARK-26462][CORE]
Use ConfigEntry for hardcoded configs for execution categories
URL: https://github.com/apache/spark/pull/23447#discussion_r245732758
##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -780,4 +880,146 @@ package object config {
ConfigBuilder("spark.executor.logs.rolling.enableCompression")
.booleanConf
.createWithDefault(false)
+
// Snappy block size. The quoted diff put the string-concatenation `+` at the start of the
// continuation line; Scala's semicolon inference then parses `+ "..."` as a unary plus and the
// expression fails to compile. Reconstructed with trailing `+` operators.
private[spark] val IO_COMPRESSION_SNAPPY_BLOCKSIZE =
  ConfigBuilder("spark.io.compression.snappy.blockSize")
    .doc("Block size in bytes used in Snappy compression, in the case when " +
      "Snappy compression codec is used. Lowering this block size " +
      "will also lower shuffle memory usage when Snappy is used")
    .bytesConf(ByteUnit.BYTE)
    .createWithDefaultString("32k")
+
// Legacy (Spark 1.4-era) spelling of the Snappy block-size key, kept for backward
// compatibility. The quoted diff split the final string literal across an e-mail line
// wrap (unterminated literal); reconstructed as valid Scala with identical runtime text.
private[spark] val IO_COMPRESSION_SNAPPY_BLOCK_SIZE =
  ConfigBuilder("spark.io.compression.snappy.block.size")
    .doc("Block size in bytes used in Snappy compression, in the case when " +
      "Snappy compression codec is used. Lowering this block size " +
      "will also lower shuffle memory usage when Snappy is used. " +
      "This used in older version 1.4")
    .bytesConf(ByteUnit.BYTE)
    .createWithDefaultString("32k")
+
// LZ4 block size. Two fixes: (1) string literals were split mid-line by e-mail wrapping;
// (2) `"... compression" + "codec ..."` concatenated to "compressioncodec" in the
// user-visible doc — a space is added at the literal boundary.
private[spark] val IO_COMPRESSION_LZ4_BLOCKSIZE =
  ConfigBuilder("spark.io.compression.lz4.blockSize")
    .doc("Block size in bytes used in LZ4 compression, in the case when LZ4 compression " +
      "codec is used. Lowering this block size will also lower shuffle memory " +
      "usage when LZ4 is used.")
    .bytesConf(ByteUnit.BYTE)
    .createWithDefaultString("32k")
+
// Legacy (Spark 1.4-era) spelling of the LZ4 block-size key, kept for backward
// compatibility. Same two fixes as the non-legacy entry: rejoined wrapped literals and a
// space inserted at the "compression"/"codec" literal boundary so the doc reads correctly.
private[spark] val IO_COMPRESSION_LZ4_BLOCK_SIZE =
  ConfigBuilder("spark.io.compression.lz4.block.size")
    .doc("Block size in bytes used in LZ4 compression, in the case when LZ4 compression " +
      "codec is used. Lowering this block size will also lower shuffle memory " +
      "usage when LZ4 is used. This used in older version 1.4")
    .bytesConf(ByteUnit.BYTE)
    .createWithDefaultString("32k")
+
// Codec used for internal data (RDD partitions, event log, broadcasts, shuffle output).
// Reconstructed from e-mail-wrapped lines; runtime doc text unchanged.
private[spark] val IO_COMPRESSION_CODEC =
  ConfigBuilder("spark.io.compression.codec")
    .doc("The codec used to compress internal data such as RDD partitions, event log, " +
      "broadcast variables and shuffle outputs. By default, Spark provides four codecs: " +
      "lz4, lzf, snappy, and zstd. You can also use fully qualified class names to specify " +
      "the codec")
    .stringConf
    .createWithDefaultString("lz4")
+
// Zstd buffer size. Reconstructed from e-mail-wrapped lines; runtime doc text unchanged.
private[spark] val IO_COMPRESSION_ZSTD_BUFFERSIZE =
  ConfigBuilder("spark.io.compression.zstd.bufferSize")
    .doc("Buffer size in bytes used in Zstd compression, in the case when Zstd " +
      "compression codec is used. Lowering this size will lower the shuffle " +
      "memory usage when Zstd is used, but it might increase the compression " +
      "cost because of excessive JNI call overhead")
    .bytesConf(ByteUnit.BYTE)
    .createWithDefaultString("32k")
+
// Zstd compression level (default 1 = fastest). Reconstructed from e-mail-wrapped lines.
private[spark] val IO_COMPRESSION_ZSTD_LEVEL =
  ConfigBuilder("spark.io.compression.zstd.level")
    .doc("Compression level for Zstd compression codec. Increasing the compression" +
      " level will result in better compression at the expense of more CPU and memory")
    .intConf
    .createWithDefault(1)
+
// spark.buffer.size: default 65536 (64 KiB). No `.doc` in the quoted diff — the entry is
// registered without user-facing documentation. Reconstructed without the diff `+` markers.
private[spark] val BUFFER_SIZE =
  ConfigBuilder("spark.buffer.size")
    .intConf
    .createWithDefault(65536)
+
// Process-local locality wait; falls back to LOCALITY_WAIT's default string, so the two
// entries always share one default. Reconstructed without the diff `+` markers.
private[spark] val LOCALITY_WAIT_PROCESS =
  ConfigBuilder("spark.locality.wait.process")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString(LOCALITY_WAIT.defaultValueString)
+
// Node-local locality wait; default mirrors LOCALITY_WAIT. Reconstructed without the
// diff `+` markers.
private[spark] val LOCALITY_WAIT_NODE =
  ConfigBuilder("spark.locality.wait.node")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString(LOCALITY_WAIT.defaultValueString)
+
// Rack-local locality wait; default mirrors LOCALITY_WAIT. Reconstructed without the
// diff `+` markers.
private[spark] val LOCALITY_WAIT_RACK =
  ConfigBuilder("spark.locality.wait.rack")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString(LOCALITY_WAIT.defaultValueString)
+
// Cap on simultaneous map-output fetch size per reduce task (MiB-scaled entry, "48m"
// default). Reconstructed from e-mail-wrapped lines; runtime doc text unchanged.
private[spark] val REDUCER_MAX_SIZE_IN_FLIGHT =
  ConfigBuilder("spark.reducer.maxSizeInFlight")
    .doc("Maximum size of map outputs to fetch simultaneously from each reduce task, " +
      "in MiB unless otherwise specified. Since each output requires us to create a " +
      "buffer to receive it, this represents a fixed memory overhead per reduce task, " +
      "so keep it small unless you have a large amount of memory.")
    .bytesConf(ByteUnit.MiB)
    .createWithDefaultString("48m")
+
// Cap on concurrent remote block-fetch requests; unlimited by default (Int.MaxValue).
// Reconstructed from e-mail-wrapped lines; runtime doc text unchanged.
private[spark] val REDUCER_MAX_REQS_IN_FLIGHT =
  ConfigBuilder("spark.reducer.maxReqsInFlight")
    .doc("This configuration limits the number of remote requests to fetch blocks at " +
      "any given point. When the number of hosts in the cluster increase, " +
      "it might lead to very large number of inbound connections to one or more nodes, " +
      "causing the workers to fail under load. By allowing it to limit the number of " +
      "fetch requests, this scenario can be mitigated")
    .intConf
    .createWithDefault(Int.MaxValue)
+
// Legacy (Spark 1.4-era) spelling of spark.reducer.maxSizeInFlight; default string "48"
// is interpreted as 48 MiB via ByteUnit.MiB. Reconstructed from e-mail-wrapped lines.
private[spark] val REDUCER_MAX_MB_IN_FLIGHT =
  ConfigBuilder("spark.reducer.maxMbInFlight")
    .doc("Maximum size (in megabytes) of map outputs to fetch simultaneously from each " +
      "reduce task. Its for spark 1.4 ")
    .bytesConf(ByteUnit.MiB)
    .createWithDefaultString("48")
+
// Threshold above which a shuffle fetch request spills to disk instead of memory.
// Default Int.MaxValue - 512 leaves headroom below the max Netty frame size; the Int
// widens to the Long that bytesConf produces. Reconstructed without the diff `+` markers.
private[spark] val REDUCER_MAX_REQ_SIZE_SHUFFLE_MEM =
  ConfigBuilder("spark.reducer.maxReqSizeShuffleToMem")
    .bytesConf(ByteUnit.BYTE)
    .createWithDefault(Int.MaxValue - 512)
+
// Whether broadcast variables are compressed before transfer (on by default).
// Reconstructed from e-mail-wrapped lines; runtime doc text unchanged.
private[spark] val BROADCAST_COMPRESS =
  ConfigBuilder("spark.broadcast.compress")
    .doc("Whether to compress broadcast variables before sending them. " +
      "Generally a good idea. Compression will use spark.io.compression.codec")
    .booleanConf
    .createWithDefault(true)
+
// TorrentBroadcast piece size; KiB-scaled entry with a "4m" default string.
// Reconstructed from e-mail-wrapped lines; runtime doc text unchanged.
private[spark] val BROADCAST_BLOCKSIZE =
  ConfigBuilder("spark.broadcast.blockSize")
    .doc("Size of each piece of a block for TorrentBroadcastFactory, in " +
      "KiB unless otherwise specified. Too large a value decreases " +
      "parallelism during broadcast (makes it slower); however, " +
      "if it is too small, BlockManager might take a performance hit")
    .bytesConf(ByteUnit.KiB)
    .createWithDefaultString("4m")
+
// Whether broadcast blocks carry checksums (on by default).
// Reconstructed from e-mail-wrapped lines; runtime doc text unchanged.
private[spark] val BROADCAST_CHECKSUM =
  ConfigBuilder("spark.broadcast.checksum")
    .doc("Whether to enable checksum for broadcast. If enabled, " +
      "broadcasts will include a checksum, which can help detect " +
      "corrupted blocks, at the cost of computing and sending a little " +
      "more data. It's possible to disable it if the network has other " +
      "mechanisms to guarantee data won't be corrupted during broadcast")
    .booleanConf
    .createWithDefault(true)
+
// Whether serialized RDD partitions are compressed (off by default — trades CPU for space).
// Reconstructed without the diff `+` markers; runtime doc text unchanged.
private[spark] val RDD_COMPRESS =
  ConfigBuilder("spark.rdd.compress")
    .doc("Whether to compress serialized RDD partitions " +
      "(e.g. for StorageLevel.MEMORY_ONLY_SER in Scala " +
      "or StorageLevel.MEMORY_ONLY in Python). Can save substantial " +
      "space at the cost of some extra CPU time. " +
      "Compression will use spark.io.compression.codec")
    .booleanConf
    .createWithDefault(false)
+
+ private[spark] val RDD_PARALLEL_LISTING_THRESHOLD =
+ ConfigBuilder("spark.rdd.parallelListingThreshold")
+ .intConf
Review comment:
done
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]