Adar Dembo has posted comments on this change. ( http://gerrit.cloudera.org:8080/12484 )
Change subject: KUDU-2672: [spark] Optionally repartition to match Kudu partitions ...................................................................... Patch Set 1: (6 comments) http://gerrit.cloudera.org:8080/#/c/12484/1//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/12484/1//COMMIT_MSG@12 PS1, Line 12: : Repartitioning ensures that one task/client is only : writing to a single tablet. This improves throughput : by improving batching especially for tables with a large : number of partitions. Have you had a chance to try this on a real cluster? What sort of improvement should one expect? http://gerrit.cloudera.org:8080/#/c/12484/1/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala File java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala: http://gerrit.cloudera.org:8080/#/c/12484/1/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala@385 PS1, Line 385: tables table's http://gerrit.cloudera.org:8080/#/c/12484/1/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala@386 PS1, Line 386: val keyedRdd = rdd.mapPartitions { rows => Can we hoist the openTable and/or KuduPartitionerBuilder.build() out of the loop? Each one involves sending RPCs, and I don't see why the results can't be shared across iterations of the loop. Indeed, in the case of the partitioner, we _want_ to share state because we want the source of truth for partitioning to be as close to an "atomic snapshot" as possible. Not that the partitioner guarantees that, but using multiple partitioners is likely to increase the window wherein there may be disagreement about how the table is partitioned. http://gerrit.cloudera.org:8080/#/c/12484/1/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala@389 PS1, Line 389: val partitioner = new KuduPartitioner.KuduPartitionerBuilder(table).build() Can we satisfy getPartitionCount() above using the table and partitioner we build here? http://gerrit.cloudera.org:8080/#/c/12484/1/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala@407 PS1, Line 407: val shuffledRDD = if (writeOptions.repartitionSort) { I think Impala will automatically partial sort if the number of rows being inserted is large enough. Maybe we should apply a similar heuristic here too, overrideable by user input? http://gerrit.cloudera.org:8080/#/c/12484/1/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala File java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala: http://gerrit.cloudera.org:8080/#/c/12484/1/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala@454 PS1, Line 454: new KuduWriteOptions(repartition = true, repartitionSort = true)) Should we also test with repartitionSort=false? -- To view, visit http://gerrit.cloudera.org:8080/12484 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: kudu Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I8763615997bccc08901235841149fc3bacb321e7 Gerrit-Change-Number: 12484 Gerrit-PatchSet: 1 Gerrit-Owner: Grant Henke <[email protected]> Gerrit-Reviewer: Adar Dembo <[email protected]> Gerrit-Reviewer: Kudu Jenkins (120) Gerrit-Reviewer: Mike Percy <[email protected]> Gerrit-Reviewer: Will Berkeley <[email protected]> Gerrit-Comment-Date: Thu, 14 Feb 2019 22:51:22 +0000 Gerrit-HasComments: Yes
