Ah yes, so along the lines of the second approach: http://stackoverflow.com/questions/27251055/stopping-spark-streaming-after-reading-first-batch-of-data.
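For reference, a minimal sketch (in Scala) of that second approach, a direct stream started from the smallest offset and stopped from a StreamingListener once the first batch completes, might look like the following. The broker address and topic name are placeholders, and stopping the context from the listener callback is untested, as noted in the quoted thread below:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

    object DrainTopicOnce {
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(new SparkConf().setAppName("drain-topic-once"), Seconds(10))

        // "smallest" starts every partition at its earliest available offset, so the
        // first batch should hold everything currently in the topic (assuming no
        // rate-limiting parameters are set).
        val kafkaParams = Map(
          "metadata.broker.list" -> "broker1:9092",   // placeholder broker list
          "auto.offset.reset" -> "smallest")

        val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, Set("mytopic"))           // placeholder topic name

        stream.map(_._2).foreachRDD(rdd => rdd.foreach(println))   // process message values

        // Request a stop once the first batch has completed. Stopping from inside the
        // callback itself is untested, hence the separate thread.
        ssc.addStreamingListener(new StreamingListener {
          override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
            new Thread("stop-after-first-batch") {
              override def run(): Unit = ssc.stop(stopSparkContext = true, stopGracefully = true)
            }.start()
          }
        })

        ssc.start()
        ssc.awaitTermination()
      }
    }

Stopping from a separate thread avoids blocking the listener bus; whether a graceful stop triggered from onBatchCompleted behaves well in all cases is exactly the open question raised in the thread below.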
On Wed, Apr 29, 2015 at 10:26 AM, Cody Koeninger <c...@koeninger.org> wrote:

> The idea of peek vs. poll doesn't apply to Kafka, because Kafka is not a
> queue.
>
> There are two ways of doing what you want: either use KafkaRDD or a
> direct stream.
>
> The KafkaRDD approach would require you to find the beginning and ending
> offsets for each partition. For an example of this, see
> getEarliestLeaderOffsets and getLatestLeaderOffsets in
> https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
> For usage examples, see the tests. That code isn't public, so you'd need
> to either duplicate it or build a version of Spark with all of the
> 'private[blah]' restrictions removed.
>
> The direct stream approach would require setting the Kafka parameter
> auto.offset.reset to smallest in order to start at the beginning. If you
> haven't set any rate-limiting parameters, then the first batch will
> contain all the messages. You can then kill the job after the first
> batch. It's possible you may be able to kill the job from
> StreamingListener.onBatchCompleted, but I've never tried it and don't
> know what the consequences may be.
>
> On Wed, Apr 29, 2015 at 8:52 AM, Dmitry Goldenberg
> <dgoldenberg...@gmail.com> wrote:
>
>> Part of the issue is, when you read messages in a topic, the messages
>> are peeked, not polled, so there will be no "when the queue is empty",
>> as I understand it.
>>
>> So it would seem I'd want to do KafkaUtils.createRDD, which takes an
>> array of OffsetRanges. Each OffsetRange is characterized by topic,
>> partition, fromOffset, and untilOffset. In my case, I want to read all
>> data, i.e. from all partitions, and I don't know how many partitions
>> there may be, nor do I know the 'untilOffset' values.
>>
>> In essence, I just want something like createRDD(new
>> OffsetRangeAllData());
>>
>> In addition, I'd ideally want the option of not peeking but polling the
>> messages off the topics involved. But I'm not sure whether the Kafka
>> APIs support it, and then whether Spark does/will support that as
>> well...
>>
>> On Wed, Apr 29, 2015 at 1:52 AM, ayan guha <guha.a...@gmail.com> wrote:
>>
>>> I guess what you mean is not streaming. If you create a streaming
>>> context at time t, you will receive data that comes through after
>>> time t, not data from before time t.
>>>
>>> Looks like you want a queue. Let Kafka write to a queue, consume
>>> messages from the queue, and stop when the queue is empty.
>>>
>>> On 29 Apr 2015 14:35, "dgoldenberg" <dgoldenberg...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm wondering about the use case where you're not doing continuous,
>>>> incremental streaming of data out of Kafka, but rather want to
>>>> publish data once with your Producer(s), consume it once in your
>>>> Consumer, and then terminate the consumer Spark job.
>>>>
>>>> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>>>>     Durations.milliseconds(...));
>>>>
>>>> The batchDuration parameter is "the time interval at which streaming
>>>> data will be divided into batches". Can this be worked somehow to
>>>> cause Spark Streaming to just get all the available data, then let
>>>> all the RDDs within the Kafka discretized stream get processed, and
>>>> then just be done and terminate, rather than wait another interval
>>>> and try to process any more data from Kafka?
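For completeness, a minimal sketch (in Scala) of the KafkaRDD approach described above. The topic, partition count, and offsets are hard-coded placeholders; in practice the fromOffset/untilOffset values would come from earliest/latest leader-offset lookups along the lines of getEarliestLeaderOffsets and getLatestLeaderOffsets in KafkaCluster.scala:

    import kafka.serializer.StringDecoder
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

    object ReadTopicOnce {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("read-topic-once"))

        val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")   // placeholder broker list

        // Placeholder ranges, one per partition of an assumed two-partition topic.
        // Real code would look up the earliest and latest offsets for each partition
        // instead of hard-coding them.
        val offsetRanges = Array(
          OffsetRange("mytopic", 0, 0L, 1000L),   // topic, partition, fromOffset, untilOffset
          OffsetRange("mytopic", 1, 0L, 1000L))

        val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
          sc, kafkaParams, offsetRanges)

        rdd.map(_._2).foreach(println)   // process each message value exactly once

        sc.stop()
      }
    }

Since KafkaCluster isn't public, obtaining the real per-partition offsets means duplicating that code or building Spark without the private[...] restrictions, as noted above.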