[GitHub] kafka pull request #2291: MINOR: Fix typo

2016-12-23 Thread jeffwidman
GitHub user jeffwidman opened a pull request:

https://github.com/apache/kafka/pull/2291

MINOR: Fix typo



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jeffwidman/kafka patch-1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2291.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2291


commit f631b8fe003abbf348469dbcff0d307de9e3f507
Author: Jeff Widman 
Date:   2016-12-23T23:41:10Z

MINOR: Fix typo




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (KAFKA-4569) Transient failure in org.apache.kafka.clients.consumer.KafkaConsumerTest.testWakeupWithFetchDataAvailable

2016-12-23 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-4569:


 Summary: Transient failure in org.apache.kafka.clients.consumer.KafkaConsumerTest.testWakeupWithFetchDataAvailable
 Key: KAFKA-4569
 URL: https://issues.apache.org/jira/browse/KAFKA-4569
 Project: Kafka
  Issue Type: Sub-task
  Components: unit tests
Reporter: Guozhang Wang


One example is:

https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/370/testReport/junit/org.apache.kafka.clients.consumer/KafkaConsumerTest/testWakeupWithFetchDataAvailable/

{code}
Stacktrace

java.lang.AssertionError
    at org.junit.Assert.fail(Assert.java:86)
    at org.junit.Assert.fail(Assert.java:95)
    at org.apache.kafka.clients.consumer.KafkaConsumerTest.testWakeupWithFetchDataAvailable(KafkaConsumerTest.java:679)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:114)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:57)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:66)
    at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
    at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
    at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
    at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:109)
    at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:377)
    at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:54)
    at org.gradle.internal.concurrent.StoppableExecutorImpl$1.run(StoppableExecutorImpl.java:40)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4568) Simplify multiple mechanism support in SaslTestHarness

2016-12-23 Thread Rajini Sivaram (JIRA)
Rajini Sivaram created KAFKA-4568:
-

 Summary: Simplify multiple mechanism support in SaslTestHarness
 Key: KAFKA-4568
 URL: https://issues.apache.org/jira/browse/KAFKA-4568
 Project: Kafka
  Issue Type: Improvement
  Components: unit tests
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
Priority: Minor


The dynamic JAAS configuration property in KAFKA-4259 can be used for
implementing multiple client mechanisms in a JVM without the hacks in
SaslTestHarness.scala.
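
As a rough illustration of the idea, the sketch below builds per-client property sets that each carry their own `sasl.jaas.config` value, so that two clients in one JVM could use different mechanisms. The `client_props` helper, the usernames, and the passwords are hypothetical; the choice of PLAIN and SCRAM-SHA-256 is also an assumption, and none of this is the SaslTestHarness code.

```python
# Hypothetical helper: builds client properties with an inline JAAS entry.
LOGIN_MODULES = {
    "PLAIN": "org.apache.kafka.common.security.plain.PlainLoginModule",
    "SCRAM-SHA-256": "org.apache.kafka.common.security.scram.ScramLoginModule",
}

def client_props(mechanism, username, password):
    """Return Kafka client properties for one SASL mechanism."""
    module = LOGIN_MODULES[mechanism]
    # JAAS entry layout: <login module class> <control flag> <options>;
    jaas = '{} required username="{}" password="{}";'.format(
        module, username, password)
    return {
        "security.protocol": "SASL_PLAINTEXT",
        "sasl.mechanism": mechanism,
        "sasl.jaas.config": jaas,
    }

# Two clients with different mechanisms; credentials are made up.
plain_client = client_props("PLAIN", "alice", "alice-secret")
scram_client = client_props("SCRAM-SHA-256", "bob", "bob-secret")
```

Because each property set is self-contained, no JVM-wide JAAS file has to be swapped between clients.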





[jira] [Commented] (KAFKA-4547) Consumer.position returns incorrect results for Kafka 0.10.1.0 client

2016-12-23 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15773490#comment-15773490
 ] 

Vahid Hashemian commented on KAFKA-4547:


[~pnakhe] Thanks for sharing the source you used when discovering the issue. I 
was able to reproduce the issue on the 0.10.1.0 release and on trunk on both 
Windows and Ubuntu. I'll investigate the issue and share my findings here.

> Consumer.position returns incorrect results for Kafka 0.10.1.0 client
> -
>
> Key: KAFKA-4547
> URL: https://issues.apache.org/jira/browse/KAFKA-4547
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.0
> Environment: Windows Kafka 0.10.1.0
>Reporter: Pranav Nakhe
>Assignee: Vahid Hashemian
>  Labels: clients
> Fix For: 0.10.2.0
>
> Attachments: issuerep.zip
>
>
> Consider the following code -
>   KafkaConsumer consumer = new KafkaConsumer(props);
>   List<TopicPartition> listOfPartitions = new ArrayList<>();
>   for (int i = 0; i < consumer.partitionsFor("IssueTopic").size(); i++) {
>       listOfPartitions.add(new TopicPartition("IssueTopic", i));
>   }
>   consumer.assign(listOfPartitions);
>   consumer.pause(listOfPartitions);
>   consumer.seekToEnd(listOfPartitions);
>   // consumer.resume(listOfPartitions); -- commented out
>   for (int i = 0; i < listOfPartitions.size(); i++) {
>       System.out.println(consumer.position(listOfPartitions.get(i)));
>   }
>
> I have created a topic IssueTopic with 3 partitions and a single replica on
> my single-node Kafka installation (0.10.1.0).
> The behavior of Kafka client 0.10.1.0 compared with Kafka client 0.10.0.1:
> A) Initially, when there are no messages on IssueTopic, running the above
> program returns
>   0.10.1.0: 0, 0, 0
>   0.10.0.1: 0, 0, 0
> B) Next I send 6 messages and see that the messages have been evenly
> distributed across the three partitions. Running the above program now
> returns
>   0.10.1.0: 0, 0, 2
>   0.10.0.1: 2, 2, 2
> Clearly there is a difference in behavior between the two clients.
> Now, if I call resume after the seekToEnd call (uncomment the resume
> call in the code above), the behavior is
>   0.10.1.0: 2, 2, 2
>   0.10.0.1: 2, 2, 2
> I came across this issue when using the Spark Kafka integration for
> 0.10; I started seeing it when I used Kafka 0.10.1.0. I had raised a
> pull request to resolve it [SPARK-18779], but after looking at the
> Kafka client implementation and documentation, it now seems the issue is with
> Kafka and not with Spark. There does not seem to be any documentation that
> specifies or implies that we need to call resume after seekToEnd for position
> to return the correct value. There is also a clear difference in behavior
> between the two Kafka client implementations.





[VOTE] KIP-48 Support for delegation tokens as an authentication mechanism

2016-12-23 Thread Manikumar
Hi,

I would like to initiate the vote on KIP-48:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-48+Delegation+token+support+for+Kafka


Thanks,
Manikumar


[jira] [Commented] (KAFKA-4558) throttling_test fails if the producer starts too fast.

2016-12-23 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15773299#comment-15773299
 ] 

Ewen Cheslack-Postava commented on KAFKA-4558:
--

There's another case that looks basically the same as this issue:

http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2016-12-23--001.1482484603--apache--trunk--76169f9/report.html

{quote}
test_id:    kafkatest.tests.core.replication_test.ReplicationTest.test_replication_with_broker_failure.security_protocol=SASL_SSL.failure_mode=hard_bounce.broker_type=controller
status: FAIL
run time:   3 minutes 27.556 seconds


9 acked message did not make it to the Consumer. They are: [3425, 3428, 
3404, 3407, 3410, 3413, 3416, 3419, 3422]. We validated that the first 9 of 
these missing messages correctly made it into Kafka's data files. This suggests 
they were lost on their way to the consumer.
Traceback (most recent call last):
  File "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py", line 123, in run
    data = self.run_test()
  File "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py", line 176, in run_test
    return self.test_context.function(self.test)
  File "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/mark/_mark.py", line 321, in wrapper
    return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/tests/core/replication_test.py", line 155, in test_replication_with_broker_failure
    self.run_produce_consume_validate(core_test_action=lambda: failures[failure_mode](self, broker_type))
  File "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/tests/produce_consume_validate.py", line 101, in run_produce_consume_validate
    self.validate()
  File "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/tests/produce_consume_validate.py", line 163, in validate
    assert success, msg
AssertionError: 9 acked message did not make it to the Consumer. They are: 
[3425, 3428, 3404, 3407, 3410, 3413, 3416, 3419, 3422]. We validated that the 
first 9 of these missing messages correctly made it into Kafka's data files. 
This suggests they were lost on their way to the consumer.
{quote}

These are in the middle of the set and are all 3 apart, presumably because the
topic has 3 partitions and we are seeing a piece of one partition missing
rather than all 3. I think this is probably fairly pervasive in the
ProduceConsumeValidate tests, so it may not be "fixable" just by ignoring
individual tests one at a time.
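
The "all 3 apart" observation can be checked directly against the IDs in the assertion message. This is a minimal sketch, assuming the producer's sequence numbers are spread round-robin across the 3 partitions (an assumption about the test producer, not something the report states):

```python
# Missing message IDs copied from the assertion message above.
missing = [3425, 3428, 3404, 3407, 3410, 3413, 3416, 3419, 3422]
NUM_PARTITIONS = 3  # partition count mentioned in the comment

ordered = sorted(missing)
# Differences between consecutive missing IDs.
gaps = {b - a for a, b in zip(ordered, ordered[1:])}
# Under the round-robin assumption, id % NUM_PARTITIONS identifies a partition.
residues = {m % NUM_PARTITIONS for m in missing}

print("gaps:", gaps)
print("residues:", residues)
```

Both sets contain a single element, which is consistent with one contiguous run from a single partition going missing.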

> throttling_test fails if the producer starts too fast.
> --
>
> Key: KAFKA-4558
> URL: https://issues.apache.org/jira/browse/KAFKA-4558
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apurva Mehta
>Assignee: Apurva Mehta
>
> As described in https://issues.apache.org/jira/browse/KAFKA-4526, the 
> throttling test will fail if the producer in the produce-consume-validate 
> loop starts up before the consumer is fully initialized.
> We need to block the start of the producer until the consumer is ready to go. 
> The current plan is to poll the consumer for a particular metric (like, for 
> instance, partition assignment) which will act as a good proxy for successful 
> initialization. Currently, we just check for the existence of a process with 
> the PID, which is not a strong enough check, causing the test to fail 
> intermittently. 
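
The plan described above amounts to polling a readiness predicate with a timeout before the producer is started. A minimal, self-contained sketch follows; `wait_until`, `FakeConsumer`, and `has_partition_assignment` are hypothetical names used for illustration and are not the kafkatest or ducktape API:

```python
import time

def wait_until(condition, timeout_sec, backoff_sec=0.1):
    """Poll condition() until it returns True or timeout_sec elapses."""
    deadline = time.time() + timeout_sec
    while time.time() < deadline:
        if condition():
            return True
        time.sleep(backoff_sec)
    return condition()  # one last check at the deadline

class FakeConsumer:
    """Stand-in for a consumer service; becomes ready after a few polls."""
    def __init__(self, polls_until_ready):
        self._remaining = polls_until_ready

    def has_partition_assignment(self):
        # A real check would inspect a consumer metric or log line instead.
        self._remaining -= 1
        return self._remaining <= 0

consumer = FakeConsumer(polls_until_ready=3)
ready = wait_until(consumer.has_partition_assignment,
                   timeout_sec=5, backoff_sec=0.01)
if not ready:
    raise RuntimeError("consumer did not get a partition assignment in time")
# Only at this point would the producer be started.
```

Unlike a check that only confirms the process exists, the predicate stays false until the consumer reports that it can actually receive data.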





[jira] [Comment Edited] (KAFKA-3240) Replication issues

2016-12-23 Thread Joseph Francis (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15773143#comment-15773143
 ] 

Joseph Francis edited comment on KAFKA-3240 at 12/23/16 3:50 PM:
-

We had a similar issue, but it looks like it was triggered by the kafka-python
library double-compressing the messages:
https://github.com/dpkp/kafka-python/issues/718.
The corrupt messages caused the replica threads to stop, eventually taking the
brokers out of the cluster.
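
To illustrate what double compression means in general terms, the sketch below gzips a payload twice and shows that a single decompression still yields compressed bytes. It is only a generic illustration using Python's `gzip` module; it does not reproduce the kafka-python message format or the exact failure in the linked issue.

```python
import gzip

payload = b"hello kafka"

once = gzip.compress(payload)
twice = gzip.compress(once)  # the mistake: compressing already-compressed bytes

# A reader that expects a single layer decompresses once...
decoded = gzip.decompress(twice)

GZIP_MAGIC = b"\x1f\x8b"
# ...and gets another gzip stream instead of the original payload.
still_compressed = decoded.startswith(GZIP_MAGIC)
print(still_compressed, decoded == payload)
```

A consumer of `decoded` that expects plain message bytes would see data that does not parse as a valid message.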


was (Author: josephfrancis):
We had a similar issue, but it looks like it was triggered by the kafka-python
library double-compressing the messages.
The corrupt messages caused the replica threads to stop, eventually taking the
brokers out of the cluster.

> Replication issues
> --
>
> Key: KAFKA-3240
> URL: https://issues.apache.org/jira/browse/KAFKA-3240
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0, 0.8.2.2, 0.9.0.1
> Environment: FreeBSD 10.2-RELEASE-p9
>Reporter: Jan Omar
>  Labels: reliability
>
> Hi,
> We are trying to replace our 3-broker cluster running on 0.6 with a new 
> cluster on 0.9.0.1 (but tried 0.8.2.2 and 0.9.0.0 as well).
> - 3 kafka nodes with one zookeeper instance on each machine
> - FreeBSD 10.2 p9
> - Nagle off (sysctl net.inet.tcp.delayed_ack=0)
> - all kafka machines write a ZFS ZIL to a dedicated SSD
> - 3 producers on 3 machines, writing to 1 topic with 3 partitions, replication
> factor 3
> - acks all
> - 10 Gigabit Ethernet, all machines on one switch, ping 0.05 ms worst case.
> While using the ProducerPerformance or rdkafka_performance we are seeing very 
> strange Replication errors. Any hint on what's going on would be highly 
> appreciated. Any suggestion on how to debug this properly would help as well.
> This is what our broker config looks like:
> {code}
> broker.id=5
> auto.create.topics.enable=false
> delete.topic.enable=true
> listeners=PLAINTEXT://:9092
> port=9092
> host.name=kafka-five.acc
> advertised.host.name=10.5.3.18
> zookeeper.connect=zookeeper-four.acc:2181,zookeeper-five.acc:2181,zookeeper-six.acc:2181
> zookeeper.connection.timeout.ms=6000
> num.replica.fetchers=1
> replica.fetch.max.bytes=1
> replica.fetch.wait.max.ms=500
> replica.high.watermark.checkpoint.interval.ms=5000
> replica.socket.timeout.ms=30
> replica.socket.receive.buffer.bytes=65536
> replica.lag.time.max.ms=1000
> min.insync.replicas=2
> controller.socket.timeout.ms=3
> controller.message.queue.size=100
> log.dirs=/var/db/kafka
> num.partitions=8
> message.max.bytes=1
> auto.create.topics.enable=false
> log.index.interval.bytes=4096
> log.index.size.max.bytes=10485760
> log.retention.hours=168
> log.flush.interval.ms=1
> log.flush.interval.messages=2
> log.flush.scheduler.interval.ms=2000
> log.roll.hours=168
> log.retention.check.interval.ms=30
> log.segment.bytes=536870912
> zookeeper.connection.timeout.ms=100
> zookeeper.sync.time.ms=5000
> num.io.threads=8
> num.network.threads=4
> socket.request.max.bytes=104857600
> socket.receive.buffer.bytes=1048576
> socket.send.buffer.bytes=1048576
> queued.max.requests=10
> fetch.purgatory.purge.interval.requests=100
> producer.purgatory.purge.interval.requests=100
> replica.lag.max.messages=1000
> {code}
> These are the errors we're seeing:
> {code:borderStyle=solid}
> ERROR [Replica Manager on Broker 5]: Error processing fetch operation on partition [test,0] offset 50727 (kafka.server.ReplicaManager)
> java.lang.IllegalStateException: Invalid message size: 0
>   at kafka.log.FileMessageSet.searchFor(FileMessageSet.scala:141)
>   at kafka.log.LogSegment.translateOffset(LogSegment.scala:105)
>   at kafka.log.LogSegment.read(LogSegment.scala:126)
>   at kafka.log.Log.read(Log.scala:506)
>   at kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:536)
>   at kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:507)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>   at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
>   at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:507)
>   at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:462)
>   at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:431)
>   at kafka.server.KafkaApis.handle(Kaf

[jira] [Commented] (KAFKA-3240) Replication issues

2016-12-23 Thread Joseph Francis (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15773143#comment-15773143
 ] 

Joseph Francis commented on KAFKA-3240:
---

We had a similar issue, but it looks like it was triggered by the kafka-python
library double-compressing the messages.
The corrupt messages caused the replica threads to stop, eventually taking the
brokers out of the cluster.

> Replication issues
> --
>
> Key: KAFKA-3240
> URL: https://issues.apache.org/jira/browse/KAFKA-3240
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0, 0.8.2.2, 0.9.0.1
> Environment: FreeBSD 10.2-RELEASE-p9
>Reporter: Jan Omar
>  Labels: reliability
>
> Hi,
> We are trying to replace our 3-broker cluster running on 0.6 with a new 
> cluster on 0.9.0.1 (but tried 0.8.2.2 and 0.9.0.0 as well).
> - 3 kafka nodes with one zookeeper instance on each machine
> - FreeBSD 10.2 p9
> - Nagle off (sysctl net.inet.tcp.delayed_ack=0)
> - all kafka machines write a ZFS ZIL to a dedicated SSD
> - 3 producers on 3 machines, writing to 1 topic with 3 partitions, replication
> factor 3
> - acks all
> - 10 Gigabit Ethernet, all machines on one switch, ping 0.05 ms worst case.
> While using the ProducerPerformance or rdkafka_performance we are seeing very 
> strange Replication errors. Any hint on what's going on would be highly 
> appreciated. Any suggestion on how to debug this properly would help as well.
> This is what our broker config looks like:
> {code}
> broker.id=5
> auto.create.topics.enable=false
> delete.topic.enable=true
> listeners=PLAINTEXT://:9092
> port=9092
> host.name=kafka-five.acc
> advertised.host.name=10.5.3.18
> zookeeper.connect=zookeeper-four.acc:2181,zookeeper-five.acc:2181,zookeeper-six.acc:2181
> zookeeper.connection.timeout.ms=6000
> num.replica.fetchers=1
> replica.fetch.max.bytes=1
> replica.fetch.wait.max.ms=500
> replica.high.watermark.checkpoint.interval.ms=5000
> replica.socket.timeout.ms=30
> replica.socket.receive.buffer.bytes=65536
> replica.lag.time.max.ms=1000
> min.insync.replicas=2
> controller.socket.timeout.ms=3
> controller.message.queue.size=100
> log.dirs=/var/db/kafka
> num.partitions=8
> message.max.bytes=1
> auto.create.topics.enable=false
> log.index.interval.bytes=4096
> log.index.size.max.bytes=10485760
> log.retention.hours=168
> log.flush.interval.ms=1
> log.flush.interval.messages=2
> log.flush.scheduler.interval.ms=2000
> log.roll.hours=168
> log.retention.check.interval.ms=30
> log.segment.bytes=536870912
> zookeeper.connection.timeout.ms=100
> zookeeper.sync.time.ms=5000
> num.io.threads=8
> num.network.threads=4
> socket.request.max.bytes=104857600
> socket.receive.buffer.bytes=1048576
> socket.send.buffer.bytes=1048576
> queued.max.requests=10
> fetch.purgatory.purge.interval.requests=100
> producer.purgatory.purge.interval.requests=100
> replica.lag.max.messages=1000
> {code}
> These are the errors we're seeing:
> {code:borderStyle=solid}
> ERROR [Replica Manager on Broker 5]: Error processing fetch operation on partition [test,0] offset 50727 (kafka.server.ReplicaManager)
> java.lang.IllegalStateException: Invalid message size: 0
>   at kafka.log.FileMessageSet.searchFor(FileMessageSet.scala:141)
>   at kafka.log.LogSegment.translateOffset(LogSegment.scala:105)
>   at kafka.log.LogSegment.read(LogSegment.scala:126)
>   at kafka.log.Log.read(Log.scala:506)
>   at kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:536)
>   at kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:507)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>   at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
>   at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:507)
>   at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:462)
>   at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:431)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:69)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>   at java.lang.Thread.run(Thread.java:745)0
> {code}
> and 
> {code}
> ERROR Found invalid messages during fetch for partition [test,0] offset 2732 
> error Message found with corrupt size (0) in shallow iterator 
> (kafka.server.ReplicaFetcherThread

[jira] [Updated] (KAFKA-1923) Negative offsets in replication-offset-checkpoint file

2016-12-23 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-1923:
---
Fix Version/s: 0.10.2.0

> Negative offsets in replication-offset-checkpoint file
> --
>
> Key: KAFKA-1923
> URL: https://issues.apache.org/jira/browse/KAFKA-1923
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.2.0
>Reporter: Oleg Golovin
>  Labels: reliability
> Fix For: 0.10.2.0
>
>
> Today was the second time we witnessed negative offsets in 
> replication-offset-checkpoint file. After restart the node stops replicating 
> some of its partitions.
> Unfortunately we can't reproduce it yet. But the two cases we encountered 
> indicate a bug which should be addressed.





[jira] [Updated] (KAFKA-1923) Negative offsets in replication-offset-checkpoint file

2016-12-23 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-1923:
---
Labels: reliability  (was: )






[jira] [Commented] (KAFKA-1923) Negative offsets in replication-offset-checkpoint file

2016-12-23 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15772787#comment-15772787
 ] 

Alexey Ozeritskiy commented on KAFKA-1923:
--

The problem still exists (v0.10.1.1). I am trying to reproduce it. I've read the
code and found that the only place with negative offsets is Replica.logEndOffset
(for a remote Replica), but I don't understand how it can get into the high
watermark.
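
While the root cause is unknown, a checkpoint file can at least be scanned for the symptom. The sketch below is a hedged illustration: it assumes the replication-offset-checkpoint layout is a version line, an entry-count line, and then one `topic partition offset` line per entry. The `find_negative_offsets` helper and the sample content are hypothetical and not taken from a real broker.

```python
def find_negative_offsets(checkpoint_text):
    """Return (topic, partition, offset) entries whose offset is negative."""
    lines = [ln for ln in checkpoint_text.splitlines() if ln.strip()]
    # Assumed layout: lines[0] is a format version, lines[1] an entry count.
    entries = lines[2:]
    bad = []
    for entry in entries:
        # Split from the right so the last two fields are partition and offset.
        topic, partition, offset = entry.rsplit(None, 2)
        if int(offset) < 0:
            bad.append((topic, int(partition), int(offset)))
    return bad

# Made-up sample content for illustration only.
sample = """0
3
test 0 50727
test 1 -1
test 2 1024
"""
negatives = find_negative_offsets(sample)
print(negatives)
```

Running such a check before a restart could help narrow down when the negative value is first written.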

> Negative offsets in replication-offset-checkpoint file
> --
>
> Key: KAFKA-1923
> URL: https://issues.apache.org/jira/browse/KAFKA-1923
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.2.0
>Reporter: Oleg Golovin
>
> Today was the second time we witnessed negative offsets in 
> replication-offset-checkpoint file. After restart the node stops replicating 
> some of its partitions.
> Unfortunately we can't reproduce it yet. But the two cases we encountered 
> indicate a bug which should be addressed.





[jira] [Updated] (KAFKA-4547) Consumer.position returns incorrect results for Kafka 0.10.1.0 client

2016-12-23 Thread Pranav Nakhe (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pranav Nakhe updated KAFKA-4547:

Attachment: issuerep.zip






[jira] [Updated] (KAFKA-4547) Consumer.position returns incorrect results for Kafka 0.10.1.0 client

2016-12-23 Thread Pranav Nakhe (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pranav Nakhe updated KAFKA-4547:

Attachment: (was: issuerep.zip)






[jira] [Comment Edited] (KAFKA-4547) Consumer.position returns incorrect results for Kafka 0.10.1.0 client

2016-12-23 Thread Pranav Nakhe (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15772272#comment-15772272
 ] 

Pranav Nakhe edited comment on KAFKA-4547 at 12/23/16 8:11 AM:
---

The kafka server I am using is kafka_2.11-0.10.1.0 on Windows 10. I am 
attaching the complete source I am using to reproduce the issue. Both the 
sender and receiver applications are included in the issuerep.zip file.


was (Author: pnakhe):
The kafka server I am using is kafka_2.11-0.10.1.0 on Windows 10. I am 
attaching the complete source I am using to reproduce the issue. Both the 
sender and receiver applications are included in the zip file.

> Consumer.position returns incorrect results for Kafka 0.10.1.0 client
> -
>
> Key: KAFKA-4547
> URL: https://issues.apache.org/jira/browse/KAFKA-4547
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.0
> Environment: Windows Kafka 0.10.1.0
>Reporter: Pranav Nakhe
>Assignee: Vahid Hashemian
>  Labels: clients
> Fix For: 0.10.2.0
>
> Attachments: issuerep.zip





[jira] [Updated] (KAFKA-4547) Consumer.position returns incorrect results for Kafka 0.10.1.0 client

2016-12-23 Thread Pranav Nakhe (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pranav Nakhe updated KAFKA-4547:

Attachment: issuerep.zip

The kafka server I am using is kafka_2.11-0.10.1.0 on Windows 10. I am 
attaching the complete source I am using to reproduce the issue. Both the 
sender and receiver applications are included in the zip file.



