beliefer commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1257916349
##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala:
##########
@@ -17,390 +17,20 @@
package org.apache.spark.sql
-import java.util.concurrent.LinkedBlockingQueue
-
-import scala.collection.immutable.Stream
-import scala.sys.process._
-import scala.util.Try
-
-import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.functions.{col, rpad}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
-
-// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
-
-/**
- * Using ProcessBuilder.lineStream produces a stream, that uses
- * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
- *
- * This causes OOM if the consumer cannot keep up with the producer.
- *
- * See scala.sys.process.ProcessBuilderImpl.lineStream
- */
-object BlockingLineStream {
-
- // See scala.sys.process.Streamed
- private final class BlockingStreamed[T](
- val process: T => Unit,
- val done: Int => Unit,
- val stream: () => Stream[T])
+import org.apache.spark.sql.types.StructType
- // See scala.sys.process.Streamed
- private object BlockingStreamed {
- // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
- // which causes OOMs if the consumer cannot keep up with producer.
- val maxQueueSize = 65536
+class TPCDSTables(sparkSession: SparkSession, config: GenTPCDataConfig)
Review Comment:
```suggestion
class TPCDSTables(val spark: SparkSession, config: GenTPCDataConfig)
```
##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala:
##########
@@ -17,390 +17,20 @@
package org.apache.spark.sql
-import java.util.concurrent.LinkedBlockingQueue
-
-import scala.collection.immutable.Stream
-import scala.sys.process._
-import scala.util.Try
-
-import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.functions.{col, rpad}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
-
-// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
-
-/**
- * Using ProcessBuilder.lineStream produces a stream, that uses
- * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
- *
- * This causes OOM if the consumer cannot keep up with the producer.
- *
- * See scala.sys.process.ProcessBuilderImpl.lineStream
- */
-object BlockingLineStream {
-
- // See scala.sys.process.Streamed
- private final class BlockingStreamed[T](
- val process: T => Unit,
- val done: Int => Unit,
- val stream: () => Stream[T])
+import org.apache.spark.sql.types.StructType
- // See scala.sys.process.Streamed
- private object BlockingStreamed {
- // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
- // which causes OOMs if the consumer cannot keep up with producer.
- val maxQueueSize = 65536
+class TPCDSTables(sparkSession: SparkSession, config: GenTPCDataConfig)
+ extends TableGenerator with TPCDSSchema with Logging with Serializable {
- def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
- val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
-
- def next(): Stream[T] = q.take match {
- case Left(0) => Stream.empty
- case Left(code) =>
- if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
- case Right(s) => Stream.cons(s, next())
- }
-
- new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
- }
- }
-
- // See scala.sys.process.ProcessImpl.Spawn
- private object Spawn {
- def apply(f: => Unit): Thread = apply(f, daemon = false)
- def apply(f: => Unit, daemon: Boolean): Thread = {
- val thread = new Thread() { override def run() = { f } }
- thread.setDaemon(daemon)
- thread.start()
- thread
- }
- }
-
- def apply(command: Seq[String]): Stream[String] = {
- val streamed = BlockingStreamed[String](true)
- val process = command.run(BasicIO(false, streamed.process, None))
- Spawn(streamed.done(process.exitValue()))
- streamed.stream()
- }
-}
-
-class Dsdgen(dsdgenDir: String) extends Serializable {
- private val dsdgen = s"$dsdgenDir/dsdgen"
-
- def generate(
- sparkContext: SparkContext,
- tableName: String,
- partitions: Int,
- scaleFactor: Int): RDD[String] = {
- val generatedData = {
- sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
- val localToolsDir = if (new java.io.File(dsdgen).exists) {
- dsdgenDir
- } else if (new java.io.File(s"/$dsdgen").exists) {
- s"/$dsdgenDir"
- } else {
- throw new IllegalStateException(
- s"Could not find dsdgen at $dsdgen or /$dsdgen. Run install")
- }
-
- // NOTE: RNGSEED is the RNG seed used by the data generator. Right now, it is fixed to 100.
- val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
- val commands = Seq(
- "bash", "-c",
- s"cd $localToolsDir && ./dsdgen -table $tableName -filter Y -scale $scaleFactor " +
- s"-RNGSEED 100 $parallel")
- BlockingLineStream(commands)
- }
- }
-
- generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
- generatedData
- }
-}
-
-class TPCDSTables(spark: SparkSession, dsdgenDir: String, scaleFactor: Int)
- extends TPCDSSchema with Logging with Serializable {
-
- private val dataGenerator = new Dsdgen(dsdgenDir)
-
- private def tables: Seq[Table] = tableColumns.map { case (tableName, schemaString) =>
+ override protected val dataGenerator: DataGenerator = new Dsdgen(config.dsdgenDir)
+ override protected val spark: SparkSession = sparkSession
Review Comment:
```suggestion
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]