Github user koeninger commented on a diff in the pull request:
https://github.com/apache/spark/pull/3798#discussion_r23746235
--- Diff: external/kafka/src/main/scala/org/apache/spark/rdd/kafka/OffsetRange.scala ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd.kafka
+
+/** Represents a range of offsets from a single Kafka TopicAndPartition */
+trait OffsetRange {
--- End diff --
Ok, so there are a couple of different concerns here.

First, the easy one: case classes. KafkaRDDPartition isn't a case class. The only case class in the entire PR is LeaderOffset, which isn't public and probably doesn't need to be a case class anyway. No worries.
Second, the question of whether OffsetRange needs to have a host and port. The issue here is that in order to get a meaningful endpoint for the range, you have to make a remote call to find the Kafka leader anyway. So if you give people a constructor that lets them specify an ending offset, but don't let them specify a preferred leader, you are forcing an interface that requires twice the number of remote calls.
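To make that concrete, here is a minimal, Spark-free sketch (the names `Leader` and `RangeWithLeader` are hypothetical, not the PR's actual classes) of a range that can optionally carry a preferred leader, so callers who already looked up the leader don't pay for a second metadata round trip:

```scala
// Hypothetical sketch: an offset range that optionally carries its leader.
final case class Leader(host: String, port: Int)

final case class RangeWithLeader(
    topic: String,
    partition: Int,
    fromOffset: Long,
    untilOffset: Long,
    preferredLeader: Option[Leader]) {
  // Only ranges constructed without leader info need a remote lookup.
  def needsLeaderLookup: Boolean = preferredLeader.isEmpty
}

object Demo {
  def main(args: Array[String]): Unit = {
    val known   = RangeWithLeader("t", 0, 0L, 100L, Some(Leader("broker1", 9092)))
    val unknown = RangeWithLeader("t", 0, 100L, 200L, None)
    println(known.needsLeaderLookup)   // false
    println(unknown.needsLeaderLookup) // true
  }
}
```

A constructor without the `preferredLeader` parameter would force every range back through a metadata call, which is the doubling described above.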
Third, clients not only need to define offset ranges, they also need to obtain offsets from the stream (for exactly-once semantics, ZooKeeper interop, or whatever). The idea of the interface is to provide limited access to the offsets without exposing any concrete implementation classes, so that they can be changed later if need be. That allows clients to do
    stream.foreachRDD { rdd =>
      rdd.mapPartitionsWithIndex { (i, iter) =>
        val offsetRange = rdd.partitions(i).asInstanceOf[OffsetRange]
        // ...
      }
    }
or
    stream.foreachRDD { rdd =>
      val allOffsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // ...
    }
without knowing anything at all about KafkaRDD or its partition class (or any concrete classes, for that matter). I'm pretty sure the same cannot be done with your suggestion, because there is nothing public to cast the RDD or the partition to. I updated the usage examples to show how this works:
https://github.com/koeninger/kafka-exactly-once/commit/d1641718807fc97f46e729e28acaba96ebc94c33
The asInstanceOf is unfortunate, but because of the way DStream is implemented, we cannot say anything at compile time about what the RDDs returned in a DStream are capable of. By this I mean we can make KafkaUtils.createRDD return an "RDD[R] with HasOffsetRanges" instead of KafkaRDD, but we cannot make a corresponding change to KafkaUtils.createNewStream, because foreachRDD hands its callback a plain RDD, not a more specifically parameterized type.
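That static-typing limitation can be sketched without Spark at all. In the toy below (all names hypothetical: `BaseRDD` stands in for RDD, `HasRanges` for the capability trait, and `foreachRDD` mimics the DStream callback, which statically sees only the base type), the runtime cast is the only way back to the capability:

```scala
// Hypothetical sketch of why the asInstanceOf is needed.
trait OffsetRangeLike { def fromOffset: Long; def untilOffset: Long }
trait HasRanges { def offsetRanges: Array[OffsetRangeLike] }

class BaseRDD
class KafkaLikeRDD(val offsetRanges: Array[OffsetRangeLike])
  extends BaseRDD with HasRanges

object CastDemo {
  // Mimics foreachRDD: the callback is only ever given a BaseRDD.
  def foreachRDD(rdd: BaseRDD)(f: BaseRDD => Unit): Unit = f(rdd)

  def main(args: Array[String]): Unit = {
    val r = new KafkaLikeRDD(Array(new OffsetRangeLike {
      val fromOffset = 0L; val untilOffset = 100L
    }))
    foreachRDD(r) { rdd =>
      // Statically rdd is BaseRDD; the cast recovers the capability trait.
      val ranges = rdd.asInstanceOf[HasRanges].offsetRanges
      println(ranges.head.untilOffset) // 100
    }
  }
}
```

Because `HasRanges` is public while `KafkaLikeRDD` need not be, clients can keep using the cast even if the concrete RDD class changes underneath them.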