[
https://issues.apache.org/jira/browse/STORM-3145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jungtaek Lim closed STORM-3145.
---
Resolution: Won't Fix
Please send any questions to the user mailing list, u...@storm.apache.org.
See http://storm.apache.org/getting-help.html for details on how to access the
mailing list.
> UNCOMMITTED_EARLIEST and UNCOMMITTED_LATEST doesn't work as expected
>
>
> Key: STORM-3145
> URL: https://issues.apache.org/jira/browse/STORM-3145
> Project: Apache Storm
> Issue Type: Question
> Components: storm-kafka-client, trident
> Reporter: Prakash N M
> Priority: Blocker
> Labels: storm-kafka-client, trident
>
> I am using storm-kafka-client 1.2.2.
> When I ran my topology I got the NPE described in
> https://issues.apache.org/jira/browse/STORM-3046.
> So I modified KafkaTridentSpoutEmitter.java#seek with the patch mentioned in
> that Jira issue (STORM-3046). The modified code is below.
> {noformat}
> private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMeta, long transactionId) {
>     if (isFirstPoll(tp, transactionId)) {
>         if (firstPollOffsetStrategy == EARLIEST) {
>             LOG.debug("First poll for topic partition [{}], seeking to partition beginning", tp);
>             kafkaConsumer.seekToBeginning(Collections.singleton(tp));
>         } else if (firstPollOffsetStrategy == LATEST) {
>             LOG.debug("First poll for topic partition [{}], seeking to partition end", tp);
>             kafkaConsumer.seekToEnd(Collections.singleton(tp));
>         } else if (lastBatchMeta != null) {
>             LOG.debug("First poll for topic partition [{}], using last batch metadata", tp);
>             // seek next offset after last offset from previous batch
>             kafkaConsumer.seek(tp, lastBatchMeta.getLastOffset() + 1);
>         } else if (firstPollOffsetStrategy == UNCOMMITTED_EARLIEST) {
>             LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition beginning", tp);
>             kafkaConsumer.seekToBeginning(Collections.singleton(tp));
>         } else if (firstPollOffsetStrategy == UNCOMMITTED_LATEST) {
>             LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition end", tp);
>             kafkaConsumer.seekToEnd(Collections.singleton(tp));
>         }
>         firstPollTransaction.put(tp, transactionId);
>     } else if (lastBatchMeta != null) {
>         // seek next offset after last offset from previous batch
>         kafkaConsumer.seek(tp, lastBatchMeta.getLastOffset() + 1);
>         LOG.debug("First poll for topic partition [{}], using last batch metadata", tp);
>     } else {
>         long initialFetchOffset = firstPollTransaction.get(tp);
>         OffsetAndMetadata lastCommittedOffset = kafkaConsumer.committed(tp);
>         kafkaConsumer.seek(tp, lastCommittedOffset.offset());
>         LOG.debug("First poll for topic partition [{}], no last batch metadata present."
>             + " Using stored initial fetch offset [{}]", tp, initialFetchOffset);
>     }
>     final long fetchOffset = kafkaConsumer.position(tp);
>     LOG.debug("Set [fetchOffset = {}] for partition [{}]", fetchOffset, tp);
>     return fetchOffset;
> }
> {noformat}
> After this change, when the offset strategy is UNCOMMITTED_EARLIEST and it is
> the spout's very first poll, lastBatchMeta is null and the Kafka consumer
> seeks to the beginning offset. Should I instead check for the last committed
> offset and start from there? Likewise, on subsequent fetches (when
> lastBatchMeta is not null), the Kafka consumer seeks to
> lastBatchMeta.getLastOffset() + 1. Should I do the same there, that is, check
> for the last committed offset and start from it?
>
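For readers following the question, the first-poll branch of the quoted seek method can be restated as a small pure function. This is only a sketch of the decision order shown in the quoted code, not the actual storm-kafka-client implementation: the class FirstPollSeekSketch, the method resolveFirstPollOffset, and the parameters lastProcessedOffset, beginningOffset and endOffset are hypothetical names introduced here, and no Kafka API is called. Whether the "last processed offset" should come from Trident batch metadata or from a Kafka commit is exactly what the question asks, so the sketch leaves that choice to the caller.

```java
import java.util.OptionalLong;

/** Hypothetical helper for illustration; not part of storm-kafka-client. */
final class FirstPollSeekSketch {

    /** Mirrors the four strategy names used in the quoted code. */
    enum Strategy { EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST }

    /**
     * Picks the offset to seek to on the first poll of a partition.
     *
     * @param strategy            configured first-poll strategy
     * @param lastProcessedOffset last offset already processed, empty if none is known
     *                            (assumption: the caller decides where this comes from)
     * @param beginningOffset     first offset available in the partition
     * @param endOffset           offset just past the last record in the partition
     */
    static long resolveFirstPollOffset(Strategy strategy,
                                       OptionalLong lastProcessedOffset,
                                       long beginningOffset,
                                       long endOffset) {
        switch (strategy) {
            case EARLIEST:
                // Ignores any earlier progress, as in the quoted code.
                return beginningOffset;
            case LATEST:
                // Ignores any earlier progress, as in the quoted code.
                return endOffset;
            case UNCOMMITTED_EARLIEST:
                // Resume after the last processed offset, else fall back to the beginning.
                return lastProcessedOffset.isPresent()
                        ? lastProcessedOffset.getAsLong() + 1
                        : beginningOffset;
            case UNCOMMITTED_LATEST:
                // Resume after the last processed offset, else fall back to the end.
                return lastProcessedOffset.isPresent()
                        ? lastProcessedOffset.getAsLong() + 1
                        : endOffset;
            default:
                throw new IllegalArgumentException("Unknown strategy: " + strategy);
        }
    }
}
```

Keeping the decision separate from the consumer calls makes it easy to check each strategy in isolation, for example that UNCOMMITTED_EARLIEST with no known progress resolves to the beginning offset.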