[jira] [Commented] (KAFKA-4845) KafkaConsumer.seekToEnd cannot take effect when integrating with spark streaming

2017-03-07 Thread Vahid Hashemian (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-4845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15899634#comment-15899634 ]

Vahid Hashemian commented on KAFKA-4845:


Great, then I'll mark this one as a duplicate of KAFKA-4547. Thanks for 
confirming.

> KafkaConsumer.seekToEnd cannot take effect when integrating with spark 
> streaming
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-4845
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4845
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.10.1.0, 0.10.1.1, 0.10.2.0
>            Reporter: Dan
>            Assignee: Vahid Hashemian
>
> When integrating with Spark Streaming, the Kafka consumer cannot get the 
> latest offsets for any partition except one. The code snippet is as follows:
> {code}
> protected def latestOffsets(): Map[TopicPartition, Long] = {
>   val c = consumer
>   c.poll(0)
>   val parts = c.assignment().asScala
>   val newPartitions = parts.diff(currentOffsets.keySet)
>   currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap
>   c.pause(newPartitions.asJava)
>   c.seekToEnd(currentOffsets.keySet.asJava)
>   parts.map(tp => tp -> c.position(tp)).toMap
> }
> {code}
> Each call to consumer.position(topicPartition) in turn calls 
> updateFetchPositions(Collections.singleton(partition)). The bug lies in 
> updateFetchPositions(Set<TopicPartition> partitions):
> {code}
> fetcher.resetOffsetsIfNeeded(partitions);      // resets the current partition to the latest offset
> if (!subscriptions.hasAllFetchPositions()) {   // seekToEnd was called for all partitions before, so this condition is true
>     coordinator.refreshCommittedOffsetsIfNeeded();
>     fetcher.updateFetchPositions(partitions);  // resets the current partition to the committed offset
> }
> {code}
> So eventually only one partition (the last one in the assignment) gets the 
> latest offset, while all the others get the committed offset.
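
To make the failure mode concrete, here is a minimal Java sketch of the pattern 
the report describes: seekToEnd across the whole assignment, followed by a 
per-partition position() call. The broker address, group id, and topic name are 
illustrative assumptions; only public KafkaConsumer methods are used. On an 
affected 0.10.1.x client, the loop would print the committed offset for every 
partition except the last one queried.
{code}
import java.util.Collections;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SeekToEndRepro {
    public static void main(String[] args) {
        // Illustrative configuration; broker address, group id, and topic name are assumptions.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "seek-to-end-repro");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic"));
            consumer.poll(0);                // join the group and receive an assignment
            Set<TopicPartition> assignment = consumer.assignment();

            consumer.seekToEnd(assignment);  // lazy: only marks each partition for an offset reset

            // Each position() call runs updateFetchPositions() on a singleton set.
            // On 0.10.1.x, the hasAllFetchPositions() check inside ignored which
            // partitions were requested, so the committed-offset refresh could
            // override the seekToEnd request for all but the last partition queried.
            for (TopicPartition tp : assignment)
                System.out.println(tp + " -> " + consumer.position(tp));
        }
    }
}
{code}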





[jira] [Commented] (KAFKA-4845) KafkaConsumer.seekToEnd cannot take effect when integrating with spark streaming

2017-03-07 Thread Dan (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-4845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15899299#comment-15899299 ]

Dan commented on KAFKA-4845:


I checked 0.10.2.0 again and it was indeed fixed. Thanks a lot!






[jira] [Commented] (KAFKA-4845) KafkaConsumer.seekToEnd cannot take effect when integrating with spark streaming

2017-03-06 Thread Vahid Hashemian (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-4845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15898258#comment-15898258 ]

Vahid Hashemian commented on KAFKA-4845:


[~DanC], the issue you raised sounds very similar to the one reported in 
[KAFKA-4547|https://issues.apache.org/jira/browse/KAFKA-4547], which was fixed 
in 0.10.2.0.
Also, the second code snippet and the comments you added there describe the 
code as it was before [KAFKA-4547|https://issues.apache.org/jira/browse/KAFKA-4547] was 
[fixed|https://github.com/apache/kafka/commit/813897a00653351710d37acbbb598235e86db824#diff-267b7c1e68156c1301c56be63ae41dd0].
The function {{updateFetchPositions(Set<TopicPartition> partitions)}} 
currently looks like this:
{code}
fetcher.resetOffsetsIfNeeded(partitions);
if (!subscriptions.hasAllFetchPositions(partitions)) {
coordinator.refreshCommittedOffsetsIfNeeded();
fetcher.updateFetchPositions(partitions);
}
{code}
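
The key difference is that {{hasAllFetchPositions}} now receives the same 
{{partitions}} argument, scoping the check to the partitions actually being 
updated instead of the whole assignment. A paraphrased sketch of that scoped 
check follows; it is not the exact upstream code, and {{hasValidPosition}} is 
an assumed helper name:
{code}
// Paraphrased sketch of the scoped check, not the exact upstream code.
boolean hasAllFetchPositions(Collection<TopicPartition> partitions) {
    for (TopicPartition tp : partitions) {
        if (!hasValidPosition(tp))  // assumed helper: a position is set and no reset is pending
            return false;
    }
    return true;
}
{code}
With the singleton set passed in from position(), a pending seekToEnd reset on 
some other partition can no longer force the committed-offset refresh path for 
the partition being queried.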

So it sounds like you are not running the latest KafkaConsumer code. The issue 
you raised applies to 0.10.1.0 and 0.10.1.1 (a duplicate of 
[KAFKA-4547|https://issues.apache.org/jira/browse/KAFKA-4547]), but not to 
0.10.2.0.

Please advise if I've misunderstood the defect or am missing something. Thanks.






[jira] [Commented] (KAFKA-4845) KafkaConsumer.seekToEnd cannot take effect when integrating with spark streaming

2017-03-06 Thread Vahid Hashemian (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-4845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897780#comment-15897780 ]

Vahid Hashemian commented on KAFKA-4845:


[~ijuma], sure, I'll take a look.






[jira] [Commented] (KAFKA-4845) KafkaConsumer.seekToEnd cannot take effect when integrating with spark streaming

2017-03-06 Thread Ismael Juma (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-4845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897022#comment-15897022 ]

Ismael Juma commented on KAFKA-4845:


[~vahid], you've worked on this code, would you be interested in taking a look?




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)