[Spark] Simplify creating range partitioned tables In order to create a range partitioned table a user needs to create PartialRows representing the lower and upper bounds. In the Spark API the Kudu Schema isn't made available which could make matching the expected schema tricky.
This small change makes retrieving the generated Kudu schema possible and also allows for table creation with a Kudu schema directly. Change-Id: Ied45b90b38d9e123868c4b0430dc52c888f033b3 Reviewed-on: http://gerrit.cloudera.org:8080/9379 Tested-by: Kudu Jenkins Reviewed-by: Hao Hao <hao....@cloudera.com> Reviewed-by: Dan Burkert <danburk...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/e94556ef Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/e94556ef Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/e94556ef Branch: refs/heads/master Commit: e94556ef767b11f7508d7dae3a92e124769f800e Parents: 52ca55a Author: Grant Henke <granthe...@gmail.com> Authored: Mon Feb 12 11:06:06 2018 -0600 Committer: Grant Henke <granthe...@gmail.com> Committed: Fri Feb 23 17:48:16 2018 +0000 ---------------------------------------------------------------------- .../apache/kudu/spark/kudu/KuduContext.scala | 30 +++++++++++++++-- .../kudu/spark/kudu/DefaultSourceTest.scala | 35 ++++++++++++++++++++ 2 files changed, 63 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/e94556ef/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala ---------------------------------------------------------------------- diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala index ece4df0..165719a 100644 --- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala +++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala @@ -154,11 +154,38 @@ class KuduContext(val kuduMaster: String, * @param schema struct schema of table * @param keys primary keys of the table * @param options replication and partitioning options for the table + * @return the 
KuduTable that was created */ def createTable(tableName: String, schema: StructType, keys: Seq[String], options: CreateTableOptions): KuduTable = { + val kuduSchema = createSchema(schema, keys) + createTable(tableName, kuduSchema, options) + } + + /** + * Creates a kudu table for the given schema. Partitioning can be specified through options. + * + * @param tableName table to create + * @param schema schema of table + * @param options replication and partitioning options for the table + * @return the KuduTable that was created + */ + def createTable(tableName: String, + schema: Schema, + options: CreateTableOptions): KuduTable = { + syncClient.createTable(tableName, schema, options) + } + + /** + * Creates a kudu schema for the given struct schema. + * + * @param schema struct schema of table + * @param keys primary keys of the table + * @return the Kudu schema + */ + def createSchema(schema: StructType, keys: Seq[String]): Schema = { val kuduCols = new util.ArrayList[ColumnSchema]() // add the key columns first, in the order specified for (key <- keys) { @@ -171,8 +198,7 @@ class KuduContext(val kuduMaster: String, val col = createColumn(field, isKey = false) kuduCols.add(col) } - - syncClient.createTable(tableName, new Schema(kuduCols), options) + new Schema(kuduCols) } private def createColumn(field: StructField, isKey: Boolean): ColumnSchema = { http://git-wip-us.apache.org/repos/asf/kudu/blob/e94556ef/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala ---------------------------------------------------------------------- diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala index 59997d2..3894dd0 100644 --- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala +++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala @@ -101,6 +101,41 @@ class 
DefaultSourceTest extends FunSuite with TestContext with BeforeAndAfter wi assertFalse(kuduContext.tableExists(tableName)) } + test("table creation with partitioning") { + val tableName = "testcreatepartitionedtable" + if (kuduContext.tableExists(tableName)) { + kuduContext.deleteTable(tableName) + } + val df = sqlContext.read.options(kuduOptions).kudu + + val kuduSchema = kuduContext.createSchema(df.schema, Seq("key")) + val lower = kuduSchema.newPartialRow() + lower.addInt("key", 0) + val upper = kuduSchema.newPartialRow() + upper.addInt("key", Integer.MAX_VALUE) + + kuduContext.createTable(tableName, kuduSchema, + new CreateTableOptions() + .addHashPartitions(List("key").asJava, 2) + .setRangePartitionColumns(List("key").asJava) + .addRangePartition(lower, upper) + .setNumReplicas(1)) + kuduContext.insertRows(df, tableName) + + // now use new options to refer to the new table name + val newOptions: Map[String, String] = Map( + "kudu.table" -> tableName, + "kudu.master" -> miniCluster.getMasterAddresses) + val checkDf = sqlContext.read.options(newOptions).kudu + + assert(checkDf.schema === df.schema) + assertTrue(kuduContext.tableExists(tableName)) + assert(checkDf.count == 10) + + kuduContext.deleteTable(tableName) + assertFalse(kuduContext.tableExists(tableName)) + } + test("insertion") { val df = sqlContext.read.options(kuduOptions).kudu val changedDF = df.limit(1).withColumn("key", df("key").plus(100)).withColumn("c2_s", lit("abc"))