pan3793 commented on code in PR #2731:
URL: https://github.com/apache/incubator-kyuubi/pull/2731#discussion_r880246710


##########
extensions/spark/kyuubi-spark-connector-tpch/src/main/scala/org/apache/kyuubi/spark/connector/tpch/TPCHTable.scala:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.tpch
+
+import java.util
+import java.util.Optional
+
+import scala.collection.JavaConverters._
+import scala.collection.convert.ImplicitConversions.`list asScalaBuffer`
+
+import io.trino.tpch.{TpchColumnType, TpchEntity, TpchTable}
+import io.trino.tpch.TpchColumnType.Base._
+import org.apache.spark.sql.connector.catalog.{SupportsRead, Table => 
SparkTable, TableCapability}
+import org.apache.spark.sql.connector.read.ScanBuilder
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class TPCHTable(tbl: String, scale: Int, options: CaseInsensitiveStringMap)
+  extends SparkTable with SupportsRead {
+
+  // When true, use CHAR VARCHAR; otherwise use STRING
+  val useAnsiStringType: Boolean = options.getBoolean("useAnsiStringType", 
false)
+
+  val tpchTable: TpchTable[_] = TpchTable.getTable(tbl)
+
+  override def name: String = s"`sf$scale`.`$tbl`"

Review Comment:
   I removed the backquotes from the table name in the TPC-DS module because they are not display-friendly.



##########
extensions/spark/kyuubi-spark-connector-tpch/src/main/scala/org/apache/kyuubi/spark/connector/tpch/TPCHBatchScan.scala:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.tpch
+
+import java.time.LocalDate
+import java.time.format.DateTimeFormatter
+
+import scala.collection.mutable.ArrayBuffer
+
+import io.trino.tpch._
+import io.trino.tpch.GenerateUtils.formatDate
+import io.trino.tpch.TpchColumnType.Base._
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.read._
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+case class TPCHTableChuck(table: String, scale: Int, parallelism: Int, index: 
Int)
+  extends InputPartition
+
+class TPCHBatchScan(
+    @transient table: TpchTable[_],
+    scale: Int,
+    schema: StructType) extends ScanBuilder
+  with Scan with Batch with Serializable {
+
+  private val _numRows: Long = TPCHStatisticsUtils.numRows(table, scale)
+
+  private val rowCountPerTask: Int = 1000000
+
+  private val parallelism: Int =
+    if (table.equals(TpchTable.NATION) || table.equals(TpchTable.REGION)) 1
+    else math.max(
+      SparkSession.active.sparkContext.defaultParallelism,
+      (_numRows / rowCountPerTask.toDouble).ceil.toInt)
+
+  override def build: Scan = this
+
+  override def toBatch: Batch = this
+
+  override def description: String =
+    s"Scan TPC-H sf$scale.${table.getTableName}, count: ${_numRows}, 
parallelism: $parallelism"
+
+  override def readSchema: StructType = schema
+
+  override def planInputPartitions: Array[InputPartition] =
+    (1 to parallelism).map { i =>
+      TPCHTableChuck(table.getTableName, scale, parallelism, i)
+    }.toArray
+
+  def createReaderFactory: PartitionReaderFactory = (partition: 
InputPartition) => {
+    val chuck = partition.asInstanceOf[TPCHTableChuck]
+    new TPCHPartitionReader(chuck.table, chuck.scale, chuck.parallelism, 
chuck.index, schema)
+  }
+
+}
+
+class TPCHPartitionReader(
+    table: String,
+    scale: Int,
+    parallelism: Int,
+    index: Int,
+    schema: StructType) extends PartitionReader[InternalRow] {
+
+  private val tpchTable = TpchTable.getTable(table)
+
+  private val columns = tpchTable.getColumns
+    .asInstanceOf[java.util.List[TpchColumn[TpchEntity]]]
+
+  private lazy val dateFmt: DateTimeFormatter = 
DateTimeFormatter.ofPattern("yyyy-MM-dd")
+
+  private val iterator = tpchTable.createGenerator(scale, index, 
parallelism).iterator
+
+  private var currentRow: InternalRow = _
+
+  override def next(): Boolean = {
+    val hasNext = iterator.hasNext
+    if (hasNext) currentRow = {
+      val row = iterator.next().asInstanceOf[TpchEntity]
+      val rowValue = new ArrayBuffer[String]()
+      columns.stream().forEach(column => {
+        val baseType = column.getType.getBase
+        var value: String = ""
+        baseType match {
+          case IDENTIFIER => value += column.getIdentifier(row)
+          case INTEGER => value += column.getInteger(row)
+          case DATE => value += column.getDate(row)
+          case DOUBLE => value += column.getDouble(row)
+          case VARCHAR => value += column.getString(row)
+        }
+        rowValue += value
+      })
+      val rowAny = new ArrayBuffer[Any]()
+      rowValue.zipWithIndex.map { case (value, i) =>
+        (value, schema(i).dataType) match {
+          case (null, _) => null
+          case ("", _) => null
+          case (value, IntegerType) => rowAny += value.toInt
+          case (value, LongType) => rowAny += value.toLong
+          case (value, DoubleType) => rowAny += value.toDouble
+          case (value, DateType) => rowAny += 
LocalDate.parse(formatDate(value.toInt), dateFmt)
+              .toEpochDay.toInt
+          case (value, StringType) => rowAny += UTF8String.fromString(value)
+          case (value, CharType(_)) => rowAny += UTF8String.fromString(value)
+          case (value, VarcharType(_)) => rowAny += 
UTF8String.fromString(value)
+          case (value, DecimalType()) => rowAny += Decimal(value)
+          case (value, dt) => throw new IllegalArgumentException(s"value: 
$value, type: $dt")
+        }
+      }
+      InternalRow.fromSeq(rowAny)

Review Comment:
   This is a performance-sensitive code block, and we currently iterate over the
   row three times. I think we can learn from Spark's built-in JSON/CSV formats
   and implement the conversion in a single pass.
   Since this is only a performance improvement, you can choose to implement it
   in a follow-up PR if you like.



##########
extensions/spark/kyuubi-spark-connector-tpch/pom.xml:
##########
@@ -0,0 +1,196 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <parent>
+        <groupId>org.apache.kyuubi</groupId>
+        <artifactId>kyuubi-parent</artifactId>
+        <version>1.6.0-SNAPSHOT</version>
+        <relativePath>../../../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>kyuubi-spark-connector-tpch_2.12</artifactId>
+    <name>Kyuubi Spark TPC-H Connector</name>
+    <packaging>jar</packaging>
+    <url>https://kyuubi.apache.org/</url>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.trino.tpch</groupId>
+            <artifactId>tpch</artifactId>
+            <version>1.1</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.scalatestplus</groupId>
+            <artifactId>scalacheck-1-15_${scala.binary.version}</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kyuubi</groupId>
+            <artifactId>kyuubi-common_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kyuubi</groupId>
+            <artifactId>kyuubi-common_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client-runtime</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <!--
+          Spark requires `commons-collections` and `commons-io` but got them 
from transitive
+          dependencies of `hadoop-client`. As we are using Hadoop Shaded 
Client, we need add
+          them explicitly. See more details at SPARK-33212.
+          -->
+        <dependency>
+            <groupId>commons-collections</groupId>
+            <artifactId>commons-collections</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>jakarta.xml.bind</groupId>
+            <artifactId>jakarta.xml.bind-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+        
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <configuration>
+                    <shadedArtifactAttached>false</shadedArtifactAttached>
+                    <artifactSet>
+                        <includes>
+                            <include>io.trino.tpch:tpch</include>

Review Comment:
   We should relocate the tpch dependency as well, not just include it in the shaded jar.



##########
extensions/spark/kyuubi-spark-connector-tpch/src/test/resources/log4j2-test.properties:
##########
@@ -0,0 +1,40 @@
+#

Review Comment:
   Use `log4j.properties` rather than a log4j2 configuration file; @wForget is
   working on the same change in the TPC-DS module.



##########
extensions/spark/kyuubi-spark-connector-tpch/src/main/scala/org/apache/kyuubi/spark/connector/tpch/TPCHCatalog.scala:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.tpch
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import io.trino.tpch.TpchTable
+import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, 
NoSuchTableException}
+import org.apache.spark.sql.connector.catalog.{Identifier, Table => 
SparkTable, TableCatalog, TableChange}
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class TPCHCatalog extends TableCatalog {
+
+  val tables: Array[String] = TpchTable.getTables.asScala
+    .map(_.getTableName).toArray
+
+  val scales: Array[Int] = TPCHStatisticsUtils.SCALES
+
+  val databases: Array[String] = scales.map("sf" + _)
+
+  var options: CaseInsensitiveStringMap = _
+
+  override def name: String = "tpch"

Review Comment:
   The catalog name should be configurable: it should come from the name passed to `initialize` rather than being hard-coded.



##########
extensions/spark/kyuubi-spark-connector-tpch/pom.xml:
##########
@@ -0,0 +1,196 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <parent>
+        <groupId>org.apache.kyuubi</groupId>
+        <artifactId>kyuubi-parent</artifactId>
+        <version>1.6.0-SNAPSHOT</version>
+        <relativePath>../../../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>kyuubi-spark-connector-tpch_2.12</artifactId>
+    <name>Kyuubi Spark TPC-H Connector</name>
+    <packaging>jar</packaging>
+    <url>https://kyuubi.apache.org/</url>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.trino.tpch</groupId>
+            <artifactId>tpch</artifactId>
+            <version>1.1</version>

Review Comment:
   Please manage this dependency version in the parent module's POM.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to