[Spark] Simplify creating range partitioned tables

In order to create a range-partitioned table, a user
needs to create PartialRows representing the
lower and upper bounds. In the Spark API,
the Kudu Schema isn’t made available, which
can make it tricky to build rows that match
the expected schema.

This small change makes retrieving the generated
Kudu schema possible and also allows for table
creation with a Kudu schema directly.

Change-Id: Ied45b90b38d9e123868c4b0430dc52c888f033b3
Reviewed-on: http://gerrit.cloudera.org:8080/9379
Tested-by: Kudu Jenkins
Reviewed-by: Hao Hao <hao....@cloudera.com>
Reviewed-by: Dan Burkert <danburk...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/e94556ef
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/e94556ef
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/e94556ef

Branch: refs/heads/master
Commit: e94556ef767b11f7508d7dae3a92e124769f800e
Parents: 52ca55a
Author: Grant Henke <granthe...@gmail.com>
Authored: Mon Feb 12 11:06:06 2018 -0600
Committer: Grant Henke <granthe...@gmail.com>
Committed: Fri Feb 23 17:48:16 2018 +0000

----------------------------------------------------------------------
 .../apache/kudu/spark/kudu/KuduContext.scala    | 30 +++++++++++++++--
 .../kudu/spark/kudu/DefaultSourceTest.scala     | 35 ++++++++++++++++++++
 2 files changed, 63 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/e94556ef/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
----------------------------------------------------------------------
diff --git 
a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala 
b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index ece4df0..165719a 100644
--- 
a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ 
b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -154,11 +154,38 @@ class KuduContext(val kuduMaster: String,
     * @param schema struct schema of table
     * @param keys primary keys of the table
     * @param options replication and partitioning options for the table
+    * @return the KuduTable that was created
     */
   def createTable(tableName: String,
                   schema: StructType,
                   keys: Seq[String],
                   options: CreateTableOptions): KuduTable = {
+    val kuduSchema = createSchema(schema, keys)
+    createTable(tableName, kuduSchema, options)
+  }
+
+  /**
+    * Creates a kudu table for the given schema. Partitioning can be specified 
through options.
+    *
+    * @param tableName table to create
+    * @param schema schema of table
+    * @param options replication and partitioning options for the table
+    * @return the KuduTable that was created
+    */
+  def createTable(tableName: String,
+                  schema: Schema,
+                  options: CreateTableOptions): KuduTable = {
+    syncClient.createTable(tableName, schema, options)
+  }
+
+  /**
+    * Creates a kudu schema for the given struct schema.
+    *
+    * @param schema struct schema of table
+    * @param keys primary keys of the table
+    * @return the Kudu schema
+    */
+  def createSchema(schema: StructType, keys: Seq[String]): Schema = {
     val kuduCols = new util.ArrayList[ColumnSchema]()
     // add the key columns first, in the order specified
     for (key <- keys) {
@@ -171,8 +198,7 @@ class KuduContext(val kuduMaster: String,
       val col = createColumn(field, isKey = false)
       kuduCols.add(col)
     }
-
-    syncClient.createTable(tableName, new Schema(kuduCols), options)
+    new Schema(kuduCols)
   }
 
   private def createColumn(field: StructField, isKey: Boolean): ColumnSchema = 
{

http://git-wip-us.apache.org/repos/asf/kudu/blob/e94556ef/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
----------------------------------------------------------------------
diff --git 
a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
 
b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index 59997d2..3894dd0 100644
--- 
a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ 
b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -101,6 +101,41 @@ class DefaultSourceTest extends FunSuite with TestContext 
with BeforeAndAfter wi
     assertFalse(kuduContext.tableExists(tableName))
   }
 
+  test("table creation with partitioning") {
+    val tableName = "testcreatepartitionedtable"
+    if (kuduContext.tableExists(tableName)) {
+      kuduContext.deleteTable(tableName)
+    }
+    val df = sqlContext.read.options(kuduOptions).kudu
+
+    val kuduSchema = kuduContext.createSchema(df.schema, Seq("key"))
+    val lower = kuduSchema.newPartialRow()
+    lower.addInt("key", 0)
+    val upper = kuduSchema.newPartialRow()
+    upper.addInt("key", Integer.MAX_VALUE)
+
+    kuduContext.createTable(tableName, kuduSchema,
+      new CreateTableOptions()
+        .addHashPartitions(List("key").asJava, 2)
+        .setRangePartitionColumns(List("key").asJava)
+        .addRangePartition(lower, upper)
+        .setNumReplicas(1))
+    kuduContext.insertRows(df, tableName)
+
+    // now use new options to refer to the new table name
+    val newOptions: Map[String, String] = Map(
+      "kudu.table" -> tableName,
+      "kudu.master" -> miniCluster.getMasterAddresses)
+    val checkDf = sqlContext.read.options(newOptions).kudu
+
+    assert(checkDf.schema === df.schema)
+    assertTrue(kuduContext.tableExists(tableName))
+    assert(checkDf.count == 10)
+
+    kuduContext.deleteTable(tableName)
+    assertFalse(kuduContext.tableExists(tableName))
+  }
+
   test("insertion") {
     val df = sqlContext.read.options(kuduOptions).kudu
     val changedDF = df.limit(1).withColumn("key", 
df("key").plus(100)).withColumn("c2_s", lit("abc"))

Reply via email to