I successfully processed my data by manually resetting my topic offsets in
ZK.

In case it helps someone, here are my steps:

Make sure you stop all your consumers before doing this; otherwise they will
overwrite the new offsets you just wrote.

set /consumers/{yourConsumerGroup}/offsets/{yourFancyTopic}/{partitionId}
{newOffset}
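
For example, from the ZooKeeper CLI (zkCli.sh ships with ZooKeeper; the
group, topic, and offset values below are hypothetical, and ZooKeeper is
assumed to be listening on localhost:2181):

./bin/zkCli.sh -server localhost:2181
set /consumers/my-consumer-group/offsets/my-fancy-topic/0 0

The offset is stored as a plain string, so the next consumer started with
this group will resume partition 0 from offset 0.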


Source: https://metabroadcast.com/blog/resetting-kafka-offsets

2016-02-22 11:55 GMT+01:00 Paul Leclercq <paul.lecle...@tabmo.io>:

> Thanks for your quick answer.
>
> If I set "auto.offset.reset" to "smallest" in kafkaParams, like this:
>
> val kafkaParams = Map[String, String](
>   "zookeeper.connect" -> zkQuorum,  // the receiver-based consumer connects through ZK, not a broker list
>   "group.id" -> groupId,
>   "auto.offset.reset" -> "smallest"
> )
>
> And then use:
>
> import kafka.serializer.StringDecoder
>
> val streams = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>   ssc, kafkaParams, kafkaTopics, StorageLevel.MEMORY_AND_DISK_SER_2)
>
> My fear is that, every time I deploy a new version, all of the consumer's
> topics will be read from the beginning. But as Kafka's documentation says:
>
> auto.offset.reset (default: largest)
>
> What to do when there *is no initial offset in ZooKeeper* or if an offset
> is out of range:
> * smallest: automatically reset the offset to the smallest offset
>
> So I will go for this option the next time I need to process a new topic 👍
>
> To fix my problem, as the topic has already been processed and registered in
> ZK, I will use a direct stream from "smallest", remove all DB inserts for this
> topic, and restart a "normal" stream once the lag is caught up.
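>
> A minimal sketch of that recovery stream, assuming the 0.8 direct API and
> hypothetical broker/topic names:
>
> import kafka.serializer.StringDecoder
> import org.apache.spark.streaming.kafka.KafkaUtils
>
> // Direct stream: offsets are tracked by Spark checkpoints, not by ZK,
> // so "auto.offset.reset" -> "smallest" re-reads the topic from the start
> val directParams = Map[String, String](
>   "metadata.broker.list" -> "localhost:9092",  // assumption: local broker
>   "auto.offset.reset" -> "smallest"
> )
> val topics = Set("my-fancy-topic")  // hypothetical topic name
>
> val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>   ssc, directParams, topics)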
>
>
> 2016-02-22 10:57 GMT+01:00 Saisai Shao <sai.sai.s...@gmail.com>:
>
>> You could set the "auto.offset.reset" configuration through the
>> "kafkaParams" parameter, which is provided in some of the other overloaded
>> createStream APIs.
>>
>> By default Kafka will pick up data from the latest offset unless you
>> explicitly set it; this is the behavior of Kafka, not Spark.
>>
>> Thanks
>> Saisai
>>
>> On Mon, Feb 22, 2016 at 5:52 PM, Paul Leclercq <paul.lecle...@tabmo.io>
>> wrote:
>>
>>> Hi,
>>>
>>> Do you know why, with the receiver approach
>>> <http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-1-receiver-based-approach>
>>> and a *consumer group*, a new topic is not read from the beginning but
>>> from the latest offset?
>>>
>>> Code example:
>>>
>>>  val kafkaStream = KafkaUtils.createStream(streamingContext,
>>>      [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
>>>
>>>
>>> Is there a way, *only for a new topic*, to tell it to read from the beginning?
>>>
>>> From the Confluence FAQ:
>>>
>>>> Alternatively, you can configure the consumer by setting
>>>> auto.offset.reset to "earliest" for the new consumer in 0.9 and "smallest"
>>>> for the old consumer.
>>>
>>>
>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whydoesmyconsumernevergetanydata?
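>>>
>>> For comparison, a minimal sketch of the same setting on the new 0.9
>>> consumer (the broker address, group, and deserializers here are
>>> assumptions):
>>>
>>> import java.util.Properties
>>> import org.apache.kafka.clients.consumer.KafkaConsumer
>>>
>>> val props = new Properties()
>>> props.put("bootstrap.servers", "localhost:9092")  // assumed broker address
>>> props.put("group.id", "my-group")                 // hypothetical group
>>> props.put("auto.offset.reset", "earliest")        // new-consumer equivalent of "smallest"
>>> props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
>>> props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
>>> val consumer = new KafkaConsumer[String, String](props)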
>>>
>>> Thanks
>>> --
>>>
>>> Paul Leclercq
>>>
>>
>>
>
>
> --
>
> Paul Leclercq | Data engineer
>
>
>  paul.lecle...@tabmo.io  |  http://www.tabmo.fr/
>



-- 

Paul Leclercq | Data engineer


 paul.lecle...@tabmo.io  |  http://www.tabmo.fr/
