Github user ijuma commented on a diff in the pull request:
https://github.com/apache/spark/pull/21488#discussion_r192961847
--- Diff:
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
---
@@ -203,7 +215,13 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
   /** Add new partitions to a Kafka topic */
   def addPartitions(topic: String, partitions: Int): Unit = {
-    AdminUtils.addPartitions(zkUtils, topic, partitions)
+    val existingAssignment = zkClient.getReplicaAssignmentForTopics(
+      collection.immutable.Set(topic)).map {
+        case (topicPartition, replicas) => topicPartition.partition -> replicas
+    }
--- End diff --
We can get replica assignment information via AdminClient too. I think we
should try to avoid the internal `ZkUtils` and `KafkaZkClient` as much as we
can.
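
As a rough illustration of the AdminClient route, the sketch below reads the replica assignment with `describeTopics` and grows the topic with `createPartitions`. The object name `AdminClientPartitions` and the helper `replicaAssignment` are hypothetical, and the caller is assumed to pass in an already configured `AdminClient`. It is not the change made in this PR, only one way the suggestion could look:

```scala
import java.util.Collections

import scala.collection.JavaConverters._

import org.apache.kafka.clients.admin.{AdminClient, NewPartitions}

// Hypothetical helper object; not part of KafkaTestUtils.
object AdminClientPartitions {

  /** Returns partition id -> replica broker ids for `topic`, read through AdminClient. */
  def replicaAssignment(admin: AdminClient, topic: String): Map[Int, Seq[Int]] = {
    // Assumption: `all()` is available on DescribeTopicsResult; newer client
    // versions expose the same result as `allTopicNames()`.
    val description = admin
      .describeTopics(Collections.singleton(topic))
      .all()
      .get()
      .get(topic)

    description.partitions().asScala.map { info =>
      info.partition() -> info.replicas().asScala.map(_.id()).toSeq
    }.toMap
  }

  /** Increases `topic` to `partitions` total partitions without touching ZooKeeper classes. */
  def addPartitions(admin: AdminClient, topic: String, partitions: Int): Unit = {
    admin
      .createPartitions(Collections.singletonMap(topic, NewPartitions.increaseTo(partitions)))
      .all()
      .get() // block until the brokers have accepted the change
  }
}
```

With `NewPartitions.increaseTo(partitions)` the brokers choose the replica placement for the new partitions, so the existing assignment may not need to be fetched at all for this use case.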
---