[jira] [Commented] (KAFKA-6496) NAT and Kafka

2018-02-01 Thread Manikumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349884#comment-16349884
 ] 

Manikumar commented on KAFKA-6496:
--

Try configuring "advertised.listeners" config property and make sure advertised 
host is reachable by zk.
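
For illustration, here is a minimal broker config sketch for a NAT/port-forwarding 
setup (the listener names, hosts, and ports are hypothetical, not taken from this 
issue). The broker binds to local interfaces via "listeners", while 
"advertised.listeners" carries the externally reachable address that is handed 
back to clients in metadata responses:

{code}
# Hypothetical example: one listener for the internal network, one for clients
# that come in through the NAT'd/forwarded public address.
listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
advertised.listeners=INTERNAL://10.0.0.5:9092,EXTERNAL://broker1.example.com:19093
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
inter.broker.listener.name=INTERNAL
{code}

Clients connecting through the NAT then receive broker1.example.com:19093 in the 
metadata response and use it for all subsequent connections.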

> NAT and Kafka
> -
>
> Key: KAFKA-6496
> URL: https://issues.apache.org/jira/browse/KAFKA-6496
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 1.0.0
>Reporter: Ronald van de Kuil
>Priority: Critical
>
> Hi,
> As far as I know, Kafka itself does not support NAT, based on a test that I did 
> with my physical router.
>  
> I can imagine that a real use case exists where NAT is desirable. For 
> example, an OpenStack installation where Kafka hides behind floating IP 
> addresses.
>  
> Are there any plans to make Kafka NAT-friendly?
>  
> Best Regards,
> Ronald



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6496) NAT and Kafka

2018-02-01 Thread Ronald van de Kuil (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349848#comment-16349848
 ] 

Ronald van de Kuil commented on KAFKA-6496:
---

I did a test with Kafka behind a physical router with a NAT port forward.

With Wireshark I saw it return the internal IP address of the Kafka server as 
part of the handshake. I did not get the communication to work. That was some 
months ago.

The floating-IP-to-fixed-IP assignment is like NAT.

That is why I expect accessing Kafka from a place outside the OpenStack tenant 
to behave the same way: the IP addresses returned during the handshake will be 
the internal IP addresses, which will not be resolvable.
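
As a quick way to see which endpoints a broker is actually advertising during 
that handshake, a small sketch using the Java AdminClient from Scala (the 
bootstrap address below is hypothetical); the hosts and ports printed are the 
advertised ones that the client will connect to afterwards:

{code:scala}
import java.util.Properties
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
import scala.collection.JavaConverters._

object ShowAdvertisedEndpoints extends App {
  val props = new Properties()
  // Hypothetical NAT'd address, used only to bootstrap the first connection.
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "router.example.com:19092")

  val admin = AdminClient.create(props)
  try {
    // The nodes returned here come from each broker's advertised.listeners,
    // not from the address we bootstrapped with.
    val nodes = admin.describeCluster().nodes().get().asScala
    nodes.foreach(n => println(s"broker ${n.id} advertises ${n.host}:${n.port}"))
  } finally {
    admin.close()
  }
}
{code}

If the printed host is an internal address that is not routable from outside the 
NAT, clients outside it will fail exactly as described.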

> NAT and Kafka
> -
>
> Key: KAFKA-6496
> URL: https://issues.apache.org/jira/browse/KAFKA-6496
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 1.0.0
>Reporter: Ronald van de Kuil
>Priority: Critical
>
> Hi,
> As far as I know, Kafka itself does not support NAT, based on a test that I did 
> with my physical router.
>  
> I can imagine that a real use case exists where NAT is desirable. For 
> example, an OpenStack installation where Kafka hides behind floating IP 
> addresses.
>  
> Are there any plans to make Kafka NAT-friendly?
>  
> Best Regards,
> Ronald



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6496) NAT and Kafka

2018-02-01 Thread Ronald van de Kuil (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349848#comment-16349848
 ] 

Ronald van de Kuil edited comment on KAFKA-6496 at 2/2/18 6:23 AM:
---

I did a test with Kafka behind a physical router with a NAT port forward.

With Wireshark I saw it return the internal IP address of the Kafka server as 
part of the handshake. I did not get the communication to work. That was some 
months ago.

The floating-IP-to-fixed-IP assignment is like NAT.

That is why I expect accessing Kafka from a place outside the OpenStack tenant 
to behave the same way: the IP addresses returned during the handshake will be 
the internal IP addresses, which will not be reachable.


was (Author: ronald van de kuil):
I did a test with Kafka behind a physical router with a NAT port forward.

With wireshark I saw it return the internal IP address of the kafka server as 
part of the handshake. I did not get the communication to work. That was some 
months ago.

The floating IP to fixed iP assignment is like NAT.

That is why I expect the same for accessing kafka from a place outside the 
openstack tenant is similar. The IP address that will be returned during the 
handshake will be the internal IP adresses which will not be resolvable.

> NAT and Kafka
> -
>
> Key: KAFKA-6496
> URL: https://issues.apache.org/jira/browse/KAFKA-6496
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 1.0.0
>Reporter: Ronald van de Kuil
>Priority: Critical
>
> Hi,
> As far as I know, Kafka itself does not support NAT, based on a test that I did 
> with my physical router.
>  
> I can imagine that a real use case exists where NAT is desirable. For 
> example, an OpenStack installation where Kafka hides behind floating IP 
> addresses.
>  
> Are there any plans to make Kafka NAT-friendly?
>  
> Best Regards,
> Ronald



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6425) Calculating cleanBytes in LogToClean might not be correct

2018-02-01 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349734#comment-16349734
 ] 

huxihx commented on KAFKA-6425:
---

Hi all, any updates for this jira?

> Calculating cleanBytes in LogToClean might not be correct
> -
>
> Key: KAFKA-6425
> URL: https://issues.apache.org/jira/browse/KAFKA-6425
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.0.0
>Reporter: huxihx
>Priority: Major
>
> In class `LogToClean`, the calculation for `cleanBytes` is as below:
> {code:java}
> val cleanBytes = log.logSegments(-1, firstDirtyOffset).map(_.size.toLong).sum
> {code}
> Most of the time, `firstDirtyOffset` is the base offset of the active segment, 
> which works well with log.logSegments, so we can calculate `cleanBytes` by 
> safely summing up the sizes of all log segments whose base offset is less than 
> `firstDirtyOffset`.
> However, things changed after `firstUnstableOffset` was introduced. Users 
> could indirectly change this offset to a non-base offset (by changing the log 
> start offset, for instance). In this case, it's not correct to sum up the total 
> size of a log segment. Instead, we should only sum up the bytes between the 
> base offset and `firstUnstableOffset`.
> Let me show an example.
> Say I have three log segments:
> 0L    --> log segment 1, size: 1000 bytes
> 1234L --> log segment 2, size: 1000 bytes
> 4567L --> active log segment, current size: 500 bytes
> Based on the current code, if `firstUnstableOffset` is deliberately set to 
> 2000L (this is possible, since it's lower-bounded by the log start offset and 
> the user can explicitly change the LSO), then `cleanBytes` is calculated as 
> 2000 bytes, which is wrong. The expected value should be 1000 + (bytes 
> between offset 1234L and 2000L).
> [~junrao] [~ijuma] Do all of these make sense?
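
If it helps, here is a minimal Scala sketch of the corrected calculation 
described above, written against hypothetical Segment objects rather than the 
real Log/LogSegment classes: segments that end before firstDirtyOffset 
contribute their full size, while the segment containing firstDirtyOffset 
contributes only its leading bytes.

{code:scala}
// Hypothetical stand-in for a log segment, for illustration only.
case class Segment(baseOffset: Long, sizeInBytes: Long) {
  // Illustrative offset->bytes mapping (1 byte per offset); the real code would
  // look up the physical position of the offset via the segment's index.
  def bytesBefore(offset: Long): Long =
    math.min(math.max(offset - baseOffset, 0L), sizeInBytes)
}

def cleanBytes(segments: List[Segment], firstDirtyOffset: Long): Long = {
  val sorted = segments.sortBy(_.baseOffset)
  val nextBaseOffsets = sorted.drop(1).map(_.baseOffset) :+ Long.MaxValue
  sorted.zip(nextBaseOffsets).map { case (seg, nextBase) =>
    if (nextBase <= firstDirtyOffset) seg.sizeInBytes        // fully clean segment
    else if (seg.baseOffset < firstDirtyOffset)
      seg.bytesBefore(firstDirtyOffset)                      // partially clean
    else 0L                                                  // entirely dirty
  }.sum
}

// The example from the description, under the 1-byte-per-offset assumption:
// 1000 (segment 1) + 766 (offsets 1234..1999 of segment 2) = 1766, not 2000.
val segs = List(Segment(0L, 1000L), Segment(1234L, 1000L), Segment(4567L, 500L))
println(cleanBytes(segs, firstDirtyOffset = 2000L)) // 1766
{code}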



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6370) MirrorMakerIntegrationTest#testCommaSeparatedRegex may fail due to NullPointerException

2018-02-01 Thread huxihx (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx resolved KAFKA-6370.
---
Resolution: Cannot Reproduce

Although it's harmless to add some defensive checks, this issue should not have 
happened based on the code review. Since it is not easy to reproduce, I am 
closing this Jira; feel free to reopen it if you encounter it again.

> MirrorMakerIntegrationTest#testCommaSeparatedRegex may fail due to 
> NullPointerException
> ---
>
> Key: KAFKA-6370
> URL: https://issues.apache.org/jira/browse/KAFKA-6370
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: huxihx
>Priority: Minor
>  Labels: mirror-maker
>
> From 
> https://builds.apache.org/job/kafka-trunk-jdk8/2277/testReport/junit/kafka.tools/MirrorMakerIntegrationTest/testCommaSeparatedRegex/
>  :
> {code}
> java.lang.NullPointerException
>   at 
> scala.collection.immutable.StringLike.$anonfun$format$1(StringLike.scala:351)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234)
>   at 
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)
>   at 
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
>   at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:234)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:227)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at scala.collection.immutable.StringLike.format(StringLike.scala:351)
>   at scala.collection.immutable.StringLike.format$(StringLike.scala:350)
>   at scala.collection.immutable.StringOps.format(StringOps.scala:29)
>   at 
> kafka.metrics.KafkaMetricsGroup$.$anonfun$toScope$3(KafkaMetricsGroup.scala:170)
>   at scala.collection.immutable.List.map(List.scala:283)
>   at 
> kafka.metrics.KafkaMetricsGroup$.kafka$metrics$KafkaMetricsGroup$$toScope(KafkaMetricsGroup.scala:170)
>   at 
> kafka.metrics.KafkaMetricsGroup.explicitMetricName(KafkaMetricsGroup.scala:67)
>   at 
> kafka.metrics.KafkaMetricsGroup.explicitMetricName$(KafkaMetricsGroup.scala:51)
>   at 
> kafka.network.RequestMetrics.explicitMetricName(RequestChannel.scala:352)
>   at 
> kafka.metrics.KafkaMetricsGroup.metricName(KafkaMetricsGroup.scala:47)
>   at 
> kafka.metrics.KafkaMetricsGroup.metricName$(KafkaMetricsGroup.scala:42)
>   at kafka.network.RequestMetrics.metricName(RequestChannel.scala:352)
>   at 
> kafka.metrics.KafkaMetricsGroup.newHistogram(KafkaMetricsGroup.scala:81)
>   at 
> kafka.metrics.KafkaMetricsGroup.newHistogram$(KafkaMetricsGroup.scala:80)
>   at kafka.network.RequestMetrics.newHistogram(RequestChannel.scala:352)
>   at kafka.network.RequestMetrics.(RequestChannel.scala:364)
>   at 
> kafka.network.RequestChannel$Metrics.$anonfun$new$2(RequestChannel.scala:57)
>   at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:59)
>   at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:52)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at kafka.network.RequestChannel$Metrics.(RequestChannel.scala:56)
>   at kafka.network.RequestChannel.(RequestChannel.scala:243)
>   at kafka.network.SocketServer.(SocketServer.scala:71)
>   at kafka.server.KafkaServer.startup(KafkaServer.scala:238)
>   at kafka.utils.TestUtils$.createServer(TestUtils.scala:135)
>   at 
> kafka.integration.KafkaServerTestHarness.$anonfun$setUp$1(KafkaServerTestHarness.scala:93)
> {code}
> Here is the code from KafkaMetricsGroup.scala:
> {code}
> .map { case (key, value) => "%s.%s".format(key, 
> value.replaceAll("\\.", "_"))}
> {code}
> It seems (some) value was null.
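
A minimal sketch of the kind of defensive check mentioned in the resolution 
comment, written as a stand-alone Scala snippet rather than the actual 
KafkaMetricsGroup code: drop null (or empty) tag values before calling 
replaceAll/format, so a null value cannot trigger a NullPointerException.

{code:scala}
// Hypothetical stand-alone version of the tag-formatting step shown above.
def formatTags(tags: Map[String, String]): List[String] =
  tags.toList.collect {
    // Skip entries whose value is null or empty instead of letting
    // value.replaceAll(...) blow up on a null receiver.
    case (key, value) if value != null && value.nonEmpty =>
      "%s.%s".format(key, value.replaceAll("\\.", "_"))
  }

// Example: the null-valued tag is dropped rather than crashing the caller.
println(formatTags(Map("clientId" -> "mirror-maker", "brokerHost" -> null)))
// prints: List(clientId.mirror-maker)
{code}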



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4739) KafkaConsumer poll going into an infinite loop

2018-02-01 Thread Steve (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349634#comment-16349634
 ] 

Steve commented on KAFKA-4739:
--

Hi, I use the Kafka Connect framework, which uses the Kafka consumer. The same 
problem happened to me. Some consumer threads use 100% CPU even after I stop 
sending new messages.
{code:java}
USER %CPU PRI SCNT WCHAN USER SYSTEM TID TIME 
hdfs 0.0 19 - ep_pol - - 32301 00:14:50
hdfs 0.0 19 - futex_ - - 499 00:00:05
hdfs 0.0 19 - futex_ - - 1060 00:00:24
hdfs 91.6 19 - - - - 24105 2-09:48:19
hdfs 0.0 19 - futex_ - - 7111 00:00:39
[hdfs@ha3 root]$ jstack 32111 | grep 5e29 -A 15
"WorkerSinkTask-t-connector-2" daemon prio=10 tid=0x7fe9c4108800 nid=0x5e29 
runnable [0x7fe97931d000] 
 java.lang.Thread.State: RUNNABLE
 at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
 at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
 at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
 at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
 - locked <0x0007482e6388> (a sun.nio.ch.Util$2)
 - locked <0x0007482e6378> (a java.util.Collections$UnmodifiableSet)
 - locked <0x0007482de340> (a sun.nio.ch.EPollSelectorImpl)
 at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
 at org.apache.kafka.common.network.Selector.select(Selector.java:425)
 at org.apache.kafka.common.network.Selector.poll(Selector.java:254)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:270)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320){code}
I think it was caused by the JDK epoll bug. The bug still happens in JDK 1.7 
and 1.8.

The Kafka client should handle the epoll bug the same way Netty or other NIO 
frameworks do. :)

[https://github.com/netty/netty/issues/327]

THRIFT-4251
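
For reference, a rough Scala sketch of the Netty-style workaround suggested 
above (a hypothetical helper, not Kafka code): count selects that return zero 
ready keys without having actually waited, and once a threshold is reached, 
rebuild the selector by re-registering all channels on a fresh one.

{code:scala}
import java.nio.channels.Selector
import scala.collection.JavaConverters._

// Hypothetical helper illustrating the epoll-bug workaround used by Netty.
class RebuildingSelector(rebuildThreshold: Int = 512) {
  private var selector = Selector.open()
  private var spuriousWakeups = 0

  def select(timeoutMs: Long): Int = {
    val start = System.nanoTime()
    val selected = selector.select(timeoutMs)
    val elapsedMs = (System.nanoTime() - start) / 1000000

    if (selected == 0 && elapsedMs < timeoutMs) {
      // Returned early with nothing selected: looks like a spurious wakeup.
      spuriousWakeups += 1
      if (spuriousWakeups >= rebuildThreshold) rebuildSelector()
    } else {
      spuriousWakeups = 0
    }
    selected
  }

  // Re-register every valid channel (keeping its interest ops and attachment)
  // on a new selector and close the old one, as Netty does on detection.
  private def rebuildSelector(): Unit = {
    val newSelector = Selector.open()
    for (key <- selector.keys().asScala if key.isValid) {
      key.channel().register(newSelector, key.interestOps(), key.attachment())
      key.cancel()
    }
    selector.close()
    selector = newSelector
    spuriousWakeups = 0
  }
}
{code}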

> KafkaConsumer poll going into an infinite loop
> --
>
> Key: KAFKA-4739
> URL: https://issues.apache.org/jira/browse/KAFKA-4739
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.9.0.1
>Reporter: Vipul Singh
>Priority: Major
>
> We are seeing an issue with our kafka consumer where it seems to go into an 
> infinite loop while polling, trying to fetch data from kafka. We are seeing 
> the heartbeat requests on the broker from the consumer, but nothing else from 
> the kafka consumer.
> We enabled debug level logging on the consumer, and see these logs: 
> https://gist.github.com/neoeahit/757bff7acdea62656f065f4dcb8974b4
> And this just goes on. The way we have been able to replicate this issue, is 
> by restarting the process in multiple successions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6522) Retrying leaderEpoch request for partition xxx as the leader reported an error: UNKNOWN_SERVER_ERROR

2018-02-01 Thread Wang Shuxiao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wang Shuxiao updated KAFKA-6522:

Description: 
We have 3 brokers in a Kafka cluster (broker ids: 401, 402, 403). Broker 403 
fails to fetch data from the leader:
{code:java}
[2018-02-02 08:58:26,861] INFO [ReplicaFetcher replicaId=403, leaderId=401, 
fetcherId=0] Retrying leaderEpoch request for partition sub_payone1hour-0 as 
the leader reported an error: UNKNOWN_SERVER_ERROR 
(kafka.server.ReplicaFetcherThread)
[2018-02-02 08:58:26,865] WARN [ReplicaFetcher replicaId=403, leaderId=401, 
fetcherId=3] Error when sending leader epoch request for Map(sub_myshardSinfo-3 
-> -1, sub_myshardUinfo-1 -> -1, sub_videoOnlineResourceType8Test-0 -> -1, 
pub_videoReportEevent-1 -> 9, sub_StreamNofity-3 -> -1, pub_RsVideoInfo-1 -> 
-1, pub_lidaTopic3-15 -> -1, pub_lidaTopic3-3 -> -1, sub_zwbtest-1 -> -1, 
sub_svAdminTagging-5 -> -1, pub_channelinfoupdate-1 -> -1, pub_RsPlayInfo-4 -> 
-1, sub_tinyVideoWatch-4 -> 14, __consumer_offsets-36 -> -1, 
pub_ybusAuditorChannel3-2 -> -1, pub_vipPush-4 -> -1, sub_LivingNotifyOnline-3 
-> -1, sub_baseonline-4 -> -1, __consumer_offsets-24 -> -1, sub_lidaTopic-3 -> 
-1, sub_mobileGuessGameReward-0 -> -1, pub_lidaTopic-6 -> -1, sub_NewUserAlgo-0 
-> -1, __consumer_offsets-48 -> -1, pub_RsUserBehavior-3 -> -1, 
sub_channelinfoupdate-0 -> -1, pub_tinyVideoComment-1 -> -1, pub_bulletin-2 -> 
-1, pub_RecordCompleteNotifition-6 -> -1, sub_lidaTopic2-3 -> -1, smsgateway-10 
-> -1, __consumer_offsets-0 -> -1, pub_baseonlinetest-1 -> -1, 
__consumer_offsets-12 -> -1, pub_myshardUinfo-0 -> -1, pub_baseonline-3 -> -1, 
smsGatewayMarketDbInfo-6 -> -1, sub_tinyVideoComment-0 -> 14) 
(kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 401 was disconnected before the response was 
read
 at 
org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:95)
 at 
kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:96)
 at 
kafka.server.ReplicaFetcherThread.fetchEpochsFromLeader(ReplicaFetcherThread.scala:312)
 at 
kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:130)
 at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:102)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64){code}
 

On the leader (broker 401) side, the log shows:
{code:java}
[2018-02-02 08:58:26,859] ERROR Closing socket for 
192.168.100.101:9099-192.168.100.103:30476 because of error 
(kafka.network.Processor)
org.apache.kafka.common.errors.InvalidRequestException: Error getting request 
for apiKey: 23 and apiVersion: 0
Caused by: java.lang.IllegalArgumentException: Unexpected ApiKeys id `23`, it 
should be between `0` and `20` (inclusive)
 at org.apache.kafka.common.protocol.ApiKeys.forId(ApiKeys.java:73)
 at 
org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:39)
 at kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:96)
 at kafka.network.RequestChannel$Request.(RequestChannel.scala:91)
 at 
kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:492)
 at 
kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:487)
 at scala.collection.Iterator$class.foreach(Iterator.scala:893)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
 at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
 at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
 at kafka.network.Processor.processCompletedReceives(SocketServer.scala:487)
 at kafka.network.Processor.run(SocketServer.scala:417)
 at java.lang.Thread.run(Thread.java:745){code}

  was:
we have 3 brokers in a kafka cluster(brokerid:401,402,403). The broker-403 
fails to fetch data from leader:
{code:java}
[2018-02-02 08:58:26,861] INFO [ReplicaFetcher replicaId=403, leaderId=401, 
fetcherId=0] Retrying leaderEpoch request for partition sub_payone1hour-0 as 
the leader reported an error: UNKNOWN_SERVER_ERROR 
(kafka.server.ReplicaFetcherThread)
[2018-02-02 08:58:26,865] WARN [ReplicaFetcher replicaId=403, leaderId=401, 
fetcherId=3] Error when sending leader epoch request for Map(sub_myshardSinfo-3 
-> -1, sub_myshardUinfo-1 -> -1, sub_videoOnlineResourceType8Test-0 -> -1, 
pub_videoReportEevent-1 -> 9, sub_StreamNofity-3 -> -1, pub_RsVideoInfo-1 -> 
-1, pub_lidaTopic3-15 -> -1, pub_lidaTopic3-3 -> -1, sub_zwbtest-1 -> -1, 
sub_svAdminTagging-5 -> -1, pub_channelinfoupdate-1 -> -1, pub_RsPlayInfo-4 -> 
-1, sub_tinyVideoWatch-4 -> 14, __consumer_offsets-36 -> -1, 
pub_ybusAuditorChannel3-2 -> -1, pub_vipPush-4 -> -1, sub_LivingNotifyOnline-3 
-> -1, sub_baseonline-4 -> -1, __consumer_offsets-24 -> -1, sub_lidaTopic-3 -> 
-1, sub_mobileGuessGameReward-0 -> -1, pub_lidaTopic-6 -> -1, sub_NewUserAlgo-0 
-> -1, __consumer_offsets-48 -> -1, pub_RsUserBehavior-3 

[jira] [Created] (KAFKA-6522) Retrying leaderEpoch request for partition xxx as the leader reported an error: UNKNOWN_SERVER_ERROR

2018-02-01 Thread Wang Shuxiao (JIRA)
Wang Shuxiao created KAFKA-6522:
---

 Summary: Retrying leaderEpoch request for partition xxx as the 
leader reported an error: UNKNOWN_SERVER_ERROR
 Key: KAFKA-6522
 URL: https://issues.apache.org/jira/browse/KAFKA-6522
 Project: Kafka
  Issue Type: New Feature
  Components: core
Affects Versions: 1.0.0
 Environment: Ubuntu 16.04 LTS 64bit-server
Reporter: Wang Shuxiao


We have 3 brokers in a Kafka cluster (broker ids: 401, 402, 403). Broker 403 
fails to fetch data from the leader:
{code:java}
[2018-02-02 08:58:26,861] INFO [ReplicaFetcher replicaId=403, leaderId=401, 
fetcherId=0] Retrying leaderEpoch request for partition sub_payone1hour-0 as 
the leader reported an error: UNKNOWN_SERVER_ERROR 
(kafka.server.ReplicaFetcherThread)
[2018-02-02 08:58:26,865] WARN [ReplicaFetcher replicaId=403, leaderId=401, 
fetcherId=3] Error when sending leader epoch request for Map(sub_myshardSinfo-3 
-> -1, sub_myshardUinfo-1 -> -1, sub_videoOnlineResourceType8Test-0 -> -1, 
pub_videoReportEevent-1 -> 9, sub_StreamNofity-3 -> -1, pub_RsVideoInfo-1 -> 
-1, pub_lidaTopic3-15 -> -1, pub_lidaTopic3-3 -> -1, sub_zwbtest-1 -> -1, 
sub_svAdminTagging-5 -> -1, pub_channelinfoupdate-1 -> -1, pub_RsPlayInfo-4 -> 
-1, sub_tinyVideoWatch-4 -> 14, __consumer_offsets-36 -> -1, 
pub_ybusAuditorChannel3-2 -> -1, pub_vipPush-4 -> -1, sub_LivingNotifyOnline-3 
-> -1, sub_baseonline-4 -> -1, __consumer_offsets-24 -> -1, sub_lidaTopic-3 -> 
-1, sub_mobileGuessGameReward-0 -> -1, pub_lidaTopic-6 -> -1, sub_NewUserAlgo-0 
-> -1, __consumer_offsets-48 -> -1, pub_RsUserBehavior-3 -> -1, 
sub_channelinfoupdate-0 -> -1, pub_tinyVideoComment-1 -> -1, pub_bulletin-2 -> 
-1, pub_RecordCompleteNotifition-6 -> -1, sub_lidaTopic2-3 -> -1, smsgateway-10 
-> -1, __consumer_offsets-0 -> -1, pub_baseonlinetest-1 -> -1, 
__consumer_offsets-12 -> -1, pub_myshardUinfo-0 -> -1, pub_baseonline-3 -> -1, 
smsGatewayMarketDbInfo-6 -> -1, sub_tinyVideoComment-0 -> 14) 
(kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 401 was disconnected before the response was 
read
 at 
org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:95)
 at 
kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:96)
 at 
kafka.server.ReplicaFetcherThread.fetchEpochsFromLeader(ReplicaFetcherThread.scala:312)
 at 
kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:130)
 at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:102)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64){code}
 

On the leader (broker 401) side, the log shows:
{code:java}
[2018-02-02 08:58:26,859] ERROR Closing socket for 
116.31.112.9:9099-116.31.121.159:30476 because of error 
(kafka.network.Processor)
org.apache.kafka.common.errors.InvalidRequestException: Error getting request 
for apiKey: 23 and apiVersion: 0
Caused by: java.lang.IllegalArgumentException: Unexpected ApiKeys id `23`, it 
should be between `0` and `20` (inclusive)
 at org.apache.kafka.common.protocol.ApiKeys.forId(ApiKeys.java:73)
 at 
org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:39)
 at kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:96)
 at kafka.network.RequestChannel$Request.(RequestChannel.scala:91)
 at 
kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:492)
 at 
kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:487)
 at scala.collection.Iterator$class.foreach(Iterator.scala:893)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
 at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
 at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
 at kafka.network.Processor.processCompletedReceives(SocketServer.scala:487)
 at kafka.network.Processor.run(SocketServer.scala:417)
 at java.lang.Thread.run(Thread.java:745){code}
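
A hedged note on the error above, not a confirmed diagnosis: apiKey 23 is the 
OffsetsForLeaderEpoch request, which newer replica fetchers send, while a broker 
that only accepts apiKeys 0 to 20 is running older request-handling code. That 
combination usually indicates brokers on mixed versions (for example, in the 
middle of a rolling upgrade). The usual precaution in that situation is to pin 
the inter-broker protocol until every broker is on the new version, along these 
lines (the version values are placeholders for whatever the older brokers are 
actually running):

{code}
# Set on the upgraded brokers until the whole cluster is on the new version,
# then bump in a second rolling restart.
inter.broker.protocol.version=0.10.2
log.message.format.version=0.10.2
{code}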



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6362) auto commit not work since coordinatorUnknown() is always true.

2018-02-01 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349616#comment-16349616
 ] 

huxihx commented on KAFKA-6362:
---

[~hachikuji] Yes, I do mean the consumer using the assign() API. Consumer 
groups, on the other hand, ensure the coordinator's readiness in every round of 
ConsumerCoordinator#poll() if they find it is not available. Does that make 
sense?

> auto commit not work since coordinatorUnknown() is always true.
> ---
>
> Key: KAFKA-6362
> URL: https://issues.apache.org/jira/browse/KAFKA-6362
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.2.1
>Reporter: Renkai Ge
>Assignee: huxihx
>Priority: Major
>
> {code}
> [2017-12-14 20:09:23.501] [Kafka 0.10 Fetcher for Source: 
> source_bj-docker-large (14/40)] INFO  
> org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
>   auto.commit.interval.ms = 5000
>   auto.offset.reset = latest
>   bootstrap.servers = [11.192.77.42:3002, 11.192.73.43:3002, 
> 11.192.73.66:3002]
>   check.crcs = true
>   client.id = 
>   connections.max.idle.ms = 54
>   enable.auto.commit = true
>   exclude.internal.topics = true
>   fetch.max.bytes = 52428800
>   fetch.max.wait.ms = 500
>   fetch.min.bytes = 1
>   group.id = tcprtdetail_flink
>   heartbeat.interval.ms = 3000
>   interceptor.classes = null
>   key.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
>   max.partition.fetch.bytes = 1048576
>   max.poll.interval.ms = 30
>   max.poll.records = 500
>   metadata.max.age.ms = 30
>   metric.reporters = []
>   metrics.num.samples = 2
>   metrics.recording.level = INFO
>   metrics.sample.window.ms = 3
>   partition.assignment.strategy = [class 
> org.apache.kafka.clients.consumer.RangeAssignor]
>   receive.buffer.bytes = 65536
>   reconnect.backoff.ms = 50
>   request.timeout.ms = 305000
>   retry.backoff.ms = 100
>   sasl.jaas.config = null
>   sasl.kerberos.kinit.cmd = /usr/bin/kinit
>   sasl.kerberos.min.time.before.relogin = 6
>   sasl.kerberos.service.name = null
>   sasl.kerberos.ticket.renew.jitter = 0.05
>   sasl.kerberos.ticket.renew.window.factor = 0.8
>   sasl.mechanism = GSSAPI
>   security.protocol = PLAINTEXT
>   send.buffer.bytes = 131072
>   session.timeout.ms = 1
>   ssl.cipher.suites = null
>   ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>   ssl.endpoint.identification.algorithm = null
>   ssl.key.password = null
>   ssl.keymanager.algorithm = SunX509
>   ssl.keystore.location = null
>   ssl.keystore.password = null
>   ssl.keystore.type = JKS
>   ssl.protocol = TLS
>   ssl.provider = null
>   ssl.secure.random.implementation = null
>   ssl.trustmanager.algorithm = PKIX
>   ssl.truststore.location = null
>   ssl.truststore.password = null
>   ssl.truststore.type = JKS
>   value.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> [2017-12-14 20:09:23.502] [Kafka 0.10 Fetcher for Source: 
> source_bj-docker-large (14/40)] INFO  
> org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.1
> [2017-12-14 20:09:23.502] [Kafka 0.10 Fetcher for Source: 
> source_bj-docker-large (14/40)] INFO  
> org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 
> e89bffd6b2eff799
> {code}
> My Kafka Java client cannot auto commit. After adding some debug logging, I 
> found that the coordinatorUnknown() function in 
> [ConsumerCoordinator.java#L604|https://github.com/apache/kafka/blob/0.10.2.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L604]
> always returns true, and nextAutoCommitDeadline just increases indefinitely. 
> Should there be a lookupCoordinator() call after line 604, like in 
> [ConsumerCoordinator.java#L508|https://github.com/apache/kafka/blob/0.10.2.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L508]?
> After I added lookupCoordinator() next to line 604, the consumer could auto 
> commit offsets properly.
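
For readers following along, a Scala-flavoured, self-contained sketch of the 
control flow the reporter proposes; the names mirror the report, but this is not 
the actual Java ConsumerCoordinator code:

{code:scala}
object AutoCommitSketch {
  var nextAutoCommitDeadline = 0L
  val autoCommitIntervalMs = 5000L
  val retryBackoffMs = 100L

  // Stubs standing in for the real client internals.
  def coordinatorUnknown(): Boolean = true
  def lookupCoordinator(): Unit = println("rediscovering coordinator...")
  def doAutoCommitOffsetsAsync(): Unit = println("committing offsets async")

  def maybeAutoCommitOffsetsAsync(now: Long): Unit =
    if (now >= nextAutoCommitDeadline) {
      if (coordinatorUnknown()) {
        // Proposed fix: actively look the coordinator up again instead of only
        // pushing the deadline forward, which is what starves the auto commit.
        lookupCoordinator()
        nextAutoCommitDeadline = now + retryBackoffMs
      } else {
        doAutoCommitOffsetsAsync()
        nextAutoCommitDeadline = now + autoCommitIntervalMs
      }
    }
}
{code}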



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6492) LogSegment.truncateTo() should always resize the index file

2018-02-01 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-6492.

   Resolution: Fixed
 Assignee: Jason Gustafson
Fix Version/s: (was: 1.2.0)
   1.1.0

> LogSegment.truncateTo() should always resize the index file
> ---
>
> Key: KAFKA-6492
> URL: https://issues.apache.org/jira/browse/KAFKA-6492
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.2, 0.10.1.1, 0.10.2.1, 1.0.0, 0.11.0.2
>Reporter: Jiangjie Qin
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 1.1.0
>
>
> The bug is the following:
>  # Initially on a follower broker there are two segments, segment 0 and 
> segment 1. Segment 0 is empty (maybe due to log compaction).
>  # The log is truncated to 0.
>  # LogSegment.truncateTo() will not find a message to truncate in segment 0, 
> so it will skip resizing the index/timeindex files.
>  # When a new message is fetched, Log.maybeRoll() will try to roll a new 
> segment because the index file of segment 0 is already full (max size is 0).
>  # After creating the new segment 0, the replica fetcher thread finds that a 
> segment 0 already exists, so it just throws an exception and dies.
> The fix would be to let the broker make sure the index files of active 
> segments are always resized properly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6388) Error while trying to roll a segment that already exists

2018-02-01 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349613#comment-16349613
 ] 

Jason Gustafson commented on KAFKA-6388:


[~pdavidson] [~dhay] We have addressed one known case where this problem can 
occur in KAFKA-6492. When you encountered this, do you know whether the topics 
were using compaction?

> Error while trying to roll a segment that already exists
> 
>
> Key: KAFKA-6388
> URL: https://issues.apache.org/jira/browse/KAFKA-6388
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: David Hay
>Priority: Blocker
>
> Recreating this issue from KAFKA-654 as we've been hitting it repeatedly in 
> our attempts to get a stable 1.0 cluster running (upgrading from 0.8.2.2).
> After the broker spends 30 minutes or more spewing log messages like this:
> {noformat}
> [2017-12-19 16:44:28,998] INFO Replica loaded for partition 
> screening.save.results.screening.save.results.processor.error-43 with initial 
> high watermark 0 (kafka.cluster.Replica)
> {noformat}
> Eventually, the replica thread throws the error below (also referenced in the 
> original issue).  If I remove that partition from the data directory and 
> bounce the broker, it eventually rebalances (assuming it doesn't hit a 
> different partition with the same error).
> {noformat}
> 2017-12-19 15:16:24,227] WARN Newly rolled segment file 
> 0002.log already exists; deleting it first (kafka.log.Log)
> [2017-12-19 15:16:24,227] WARN Newly rolled segment file 
> 0002.index already exists; deleting it first (kafka.log.Log)
> [2017-12-19 15:16:24,227] WARN Newly rolled segment file 
> 0002.timeindex already exists; deleting it first 
> (kafka.log.Log)
> [2017-12-19 15:16:24,232] INFO [ReplicaFetcherManager on broker 2] Removed 
> fetcher for partitions __consumer_offsets-20 
> (kafka.server.ReplicaFetcherManager)
> [2017-12-19 15:16:24,297] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: Error processing data for partition 
> sr.new.sr.new.processor.error-38 offset 2
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:172)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:172)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:169)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
> at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:167)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> Caused by: kafka.common.KafkaException: Trying to roll a new log segment for 
> topic partition sr.new.sr.new.processor.error-38 with start offset 2 while it 
> already exists.
> at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1338)
> at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1297)
> at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
> at kafka.log.Log.roll(Log.scala:1297)
> at kafka.log.Log.kafka$log$Log$$maybeRoll(Log.scala:1284)
> at kafka.log.Log$$anonfun$append$2.apply(Log.scala:710)
> at kafka.log.Log$$anonfun$append$2.apply(Log.scala:624)
> at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
> at kafka.log.Log.append(Log.scala:624)
> at kafka.log.Log.appendAsFollower(Log.scala:607)
> at 
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:102)
> at 
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:41)
> at 
> 

[jira] [Commented] (KAFKA-6362) auto commit not work since coordinatorUnknown() is always true.

2018-02-01 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349611#comment-16349611
 ] 

Jason Gustafson commented on KAFKA-6362:


[~huxi_2b] By "standalone," you mean a simple consumer which is not using group 
management (i.e. one using the {{assign()}} API), right? I am just confirming 
that the problem is only for this usage.

> auto commit not work since coordinatorUnknown() is always true.
> ---
>
> Key: KAFKA-6362
> URL: https://issues.apache.org/jira/browse/KAFKA-6362
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.2.1
>Reporter: Renkai Ge
>Assignee: huxihx
>Priority: Major
>
> {code}
> [2017-12-14 20:09:23.501] [Kafka 0.10 Fetcher for Source: 
> source_bj-docker-large (14/40)] INFO  
> org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
>   auto.commit.interval.ms = 5000
>   auto.offset.reset = latest
>   bootstrap.servers = [11.192.77.42:3002, 11.192.73.43:3002, 
> 11.192.73.66:3002]
>   check.crcs = true
>   client.id = 
>   connections.max.idle.ms = 54
>   enable.auto.commit = true
>   exclude.internal.topics = true
>   fetch.max.bytes = 52428800
>   fetch.max.wait.ms = 500
>   fetch.min.bytes = 1
>   group.id = tcprtdetail_flink
>   heartbeat.interval.ms = 3000
>   interceptor.classes = null
>   key.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
>   max.partition.fetch.bytes = 1048576
>   max.poll.interval.ms = 30
>   max.poll.records = 500
>   metadata.max.age.ms = 30
>   metric.reporters = []
>   metrics.num.samples = 2
>   metrics.recording.level = INFO
>   metrics.sample.window.ms = 3
>   partition.assignment.strategy = [class 
> org.apache.kafka.clients.consumer.RangeAssignor]
>   receive.buffer.bytes = 65536
>   reconnect.backoff.ms = 50
>   request.timeout.ms = 305000
>   retry.backoff.ms = 100
>   sasl.jaas.config = null
>   sasl.kerberos.kinit.cmd = /usr/bin/kinit
>   sasl.kerberos.min.time.before.relogin = 6
>   sasl.kerberos.service.name = null
>   sasl.kerberos.ticket.renew.jitter = 0.05
>   sasl.kerberos.ticket.renew.window.factor = 0.8
>   sasl.mechanism = GSSAPI
>   security.protocol = PLAINTEXT
>   send.buffer.bytes = 131072
>   session.timeout.ms = 1
>   ssl.cipher.suites = null
>   ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>   ssl.endpoint.identification.algorithm = null
>   ssl.key.password = null
>   ssl.keymanager.algorithm = SunX509
>   ssl.keystore.location = null
>   ssl.keystore.password = null
>   ssl.keystore.type = JKS
>   ssl.protocol = TLS
>   ssl.provider = null
>   ssl.secure.random.implementation = null
>   ssl.trustmanager.algorithm = PKIX
>   ssl.truststore.location = null
>   ssl.truststore.password = null
>   ssl.truststore.type = JKS
>   value.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> [2017-12-14 20:09:23.502] [Kafka 0.10 Fetcher for Source: 
> source_bj-docker-large (14/40)] INFO  
> org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.1
> [2017-12-14 20:09:23.502] [Kafka 0.10 Fetcher for Source: 
> source_bj-docker-large (14/40)] INFO  
> org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 
> e89bffd6b2eff799
> {code}
> My Kafka Java client cannot auto commit. After adding some debug logging, I 
> found that the coordinatorUnknown() function in 
> [ConsumerCoordinator.java#L604|https://github.com/apache/kafka/blob/0.10.2.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L604]
> always returns true, and nextAutoCommitDeadline just increases indefinitely. 
> Should there be a lookupCoordinator() call after line 604, like in 
> [ConsumerCoordinator.java#L508|https://github.com/apache/kafka/blob/0.10.2.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L508]?
> After I added lookupCoordinator() next to line 604, the consumer could auto 
> commit offsets properly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6362) auto commit not work since coordinatorUnknown() is always true.

2018-02-01 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349609#comment-16349609
 ] 

huxihx commented on KAFKA-6362:
---

[~hachikuji] I reproduced the issue in my local test environment where a 
standalone consumer was used. It seems there is no code path for standalone 
consumers to rediscover the coordinator even after it comes back.

> auto commit not work since coordinatorUnknown() is always true.
> ---
>
> Key: KAFKA-6362
> URL: https://issues.apache.org/jira/browse/KAFKA-6362
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.2.1
>Reporter: Renkai Ge
>Assignee: huxihx
>Priority: Major
>
> {code}
> [2017-12-14 20:09:23.501] [Kafka 0.10 Fetcher for Source: 
> source_bj-docker-large (14/40)] INFO  
> org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
>   auto.commit.interval.ms = 5000
>   auto.offset.reset = latest
>   bootstrap.servers = [11.192.77.42:3002, 11.192.73.43:3002, 
> 11.192.73.66:3002]
>   check.crcs = true
>   client.id = 
>   connections.max.idle.ms = 54
>   enable.auto.commit = true
>   exclude.internal.topics = true
>   fetch.max.bytes = 52428800
>   fetch.max.wait.ms = 500
>   fetch.min.bytes = 1
>   group.id = tcprtdetail_flink
>   heartbeat.interval.ms = 3000
>   interceptor.classes = null
>   key.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
>   max.partition.fetch.bytes = 1048576
>   max.poll.interval.ms = 30
>   max.poll.records = 500
>   metadata.max.age.ms = 30
>   metric.reporters = []
>   metrics.num.samples = 2
>   metrics.recording.level = INFO
>   metrics.sample.window.ms = 3
>   partition.assignment.strategy = [class 
> org.apache.kafka.clients.consumer.RangeAssignor]
>   receive.buffer.bytes = 65536
>   reconnect.backoff.ms = 50
>   request.timeout.ms = 305000
>   retry.backoff.ms = 100
>   sasl.jaas.config = null
>   sasl.kerberos.kinit.cmd = /usr/bin/kinit
>   sasl.kerberos.min.time.before.relogin = 6
>   sasl.kerberos.service.name = null
>   sasl.kerberos.ticket.renew.jitter = 0.05
>   sasl.kerberos.ticket.renew.window.factor = 0.8
>   sasl.mechanism = GSSAPI
>   security.protocol = PLAINTEXT
>   send.buffer.bytes = 131072
>   session.timeout.ms = 1
>   ssl.cipher.suites = null
>   ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>   ssl.endpoint.identification.algorithm = null
>   ssl.key.password = null
>   ssl.keymanager.algorithm = SunX509
>   ssl.keystore.location = null
>   ssl.keystore.password = null
>   ssl.keystore.type = JKS
>   ssl.protocol = TLS
>   ssl.provider = null
>   ssl.secure.random.implementation = null
>   ssl.trustmanager.algorithm = PKIX
>   ssl.truststore.location = null
>   ssl.truststore.password = null
>   ssl.truststore.type = JKS
>   value.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> [2017-12-14 20:09:23.502] [Kafka 0.10 Fetcher for Source: 
> source_bj-docker-large (14/40)] INFO  
> org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.1
> [2017-12-14 20:09:23.502] [Kafka 0.10 Fetcher for Source: 
> source_bj-docker-large (14/40)] INFO  
> org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 
> e89bffd6b2eff799
> {code}
> My Kafka Java client cannot auto commit. After adding some debug logging, I 
> found that the coordinatorUnknown() function in 
> [ConsumerCoordinator.java#L604|https://github.com/apache/kafka/blob/0.10.2.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L604]
> always returns true, and nextAutoCommitDeadline just increases indefinitely. 
> Should there be a lookupCoordinator() call after line 604, like in 
> [ConsumerCoordinator.java#L508|https://github.com/apache/kafka/blob/0.10.2.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L508]?
> After I added lookupCoordinator() next to line 604, the consumer could auto 
> commit offsets properly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6362) auto commit not work since coordinatorUnknown() is always true.

2018-02-01 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reassigned KAFKA-6362:
--

Assignee: huxihx

> auto commit not work since coordinatorUnknown() is always true.
> ---
>
> Key: KAFKA-6362
> URL: https://issues.apache.org/jira/browse/KAFKA-6362
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.2.1
>Reporter: Renkai Ge
>Assignee: huxihx
>Priority: Major
>
> {code}
> [2017-12-14 20:09:23.501] [Kafka 0.10 Fetcher for Source: 
> source_bj-docker-large (14/40)] INFO  
> org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
>   auto.commit.interval.ms = 5000
>   auto.offset.reset = latest
>   bootstrap.servers = [11.192.77.42:3002, 11.192.73.43:3002, 
> 11.192.73.66:3002]
>   check.crcs = true
>   client.id = 
>   connections.max.idle.ms = 54
>   enable.auto.commit = true
>   exclude.internal.topics = true
>   fetch.max.bytes = 52428800
>   fetch.max.wait.ms = 500
>   fetch.min.bytes = 1
>   group.id = tcprtdetail_flink
>   heartbeat.interval.ms = 3000
>   interceptor.classes = null
>   key.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
>   max.partition.fetch.bytes = 1048576
>   max.poll.interval.ms = 30
>   max.poll.records = 500
>   metadata.max.age.ms = 30
>   metric.reporters = []
>   metrics.num.samples = 2
>   metrics.recording.level = INFO
>   metrics.sample.window.ms = 3
>   partition.assignment.strategy = [class 
> org.apache.kafka.clients.consumer.RangeAssignor]
>   receive.buffer.bytes = 65536
>   reconnect.backoff.ms = 50
>   request.timeout.ms = 305000
>   retry.backoff.ms = 100
>   sasl.jaas.config = null
>   sasl.kerberos.kinit.cmd = /usr/bin/kinit
>   sasl.kerberos.min.time.before.relogin = 6
>   sasl.kerberos.service.name = null
>   sasl.kerberos.ticket.renew.jitter = 0.05
>   sasl.kerberos.ticket.renew.window.factor = 0.8
>   sasl.mechanism = GSSAPI
>   security.protocol = PLAINTEXT
>   send.buffer.bytes = 131072
>   session.timeout.ms = 1
>   ssl.cipher.suites = null
>   ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>   ssl.endpoint.identification.algorithm = null
>   ssl.key.password = null
>   ssl.keymanager.algorithm = SunX509
>   ssl.keystore.location = null
>   ssl.keystore.password = null
>   ssl.keystore.type = JKS
>   ssl.protocol = TLS
>   ssl.provider = null
>   ssl.secure.random.implementation = null
>   ssl.trustmanager.algorithm = PKIX
>   ssl.truststore.location = null
>   ssl.truststore.password = null
>   ssl.truststore.type = JKS
>   value.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> [2017-12-14 20:09:23.502] [Kafka 0.10 Fetcher for Source: 
> source_bj-docker-large (14/40)] INFO  
> org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.1
> [2017-12-14 20:09:23.502] [Kafka 0.10 Fetcher for Source: 
> source_bj-docker-large (14/40)] INFO  
> org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 
> e89bffd6b2eff799
> {code}
> My Kafka Java client cannot auto commit. After adding some debug logging, I 
> found that the coordinatorUnknown() function in 
> [ConsumerCoordinator.java#L604|https://github.com/apache/kafka/blob/0.10.2.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L604]
> always returns true, and nextAutoCommitDeadline just increases indefinitely. 
> Should there be a lookupCoordinator() call after line 604, like in 
> [ConsumerCoordinator.java#L508|https://github.com/apache/kafka/blob/0.10.2.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L508]?
> After I added lookupCoordinator() next to line 604, the consumer could auto 
> commit offsets properly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6362) auto commit not work since coordinatorUnknown() is always true.

2018-02-01 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349578#comment-16349578
 ] 

Jason Gustafson commented on KAFKA-6362:


[~RenkaiGe] Thanks for filing the issue and sorry for getting to it so late. As 
far as I can tell, this should only be affecting "simple" consumer usage with 
assign(). Can you confirm whether that's the case?

> auto commit not work since coordinatorUnknown() is always true.
> ---
>
> Key: KAFKA-6362
> URL: https://issues.apache.org/jira/browse/KAFKA-6362
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.2.1
>Reporter: Renkai Ge
>Assignee: huxihx
>Priority: Major
>
> {code}
> [2017-12-14 20:09:23.501] [Kafka 0.10 Fetcher for Source: 
> source_bj-docker-large (14/40)] INFO  
> org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
>   auto.commit.interval.ms = 5000
>   auto.offset.reset = latest
>   bootstrap.servers = [11.192.77.42:3002, 11.192.73.43:3002, 
> 11.192.73.66:3002]
>   check.crcs = true
>   client.id = 
>   connections.max.idle.ms = 54
>   enable.auto.commit = true
>   exclude.internal.topics = true
>   fetch.max.bytes = 52428800
>   fetch.max.wait.ms = 500
>   fetch.min.bytes = 1
>   group.id = tcprtdetail_flink
>   heartbeat.interval.ms = 3000
>   interceptor.classes = null
>   key.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
>   max.partition.fetch.bytes = 1048576
>   max.poll.interval.ms = 30
>   max.poll.records = 500
>   metadata.max.age.ms = 30
>   metric.reporters = []
>   metrics.num.samples = 2
>   metrics.recording.level = INFO
>   metrics.sample.window.ms = 3
>   partition.assignment.strategy = [class 
> org.apache.kafka.clients.consumer.RangeAssignor]
>   receive.buffer.bytes = 65536
>   reconnect.backoff.ms = 50
>   request.timeout.ms = 305000
>   retry.backoff.ms = 100
>   sasl.jaas.config = null
>   sasl.kerberos.kinit.cmd = /usr/bin/kinit
>   sasl.kerberos.min.time.before.relogin = 6
>   sasl.kerberos.service.name = null
>   sasl.kerberos.ticket.renew.jitter = 0.05
>   sasl.kerberos.ticket.renew.window.factor = 0.8
>   sasl.mechanism = GSSAPI
>   security.protocol = PLAINTEXT
>   send.buffer.bytes = 131072
>   session.timeout.ms = 1
>   ssl.cipher.suites = null
>   ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>   ssl.endpoint.identification.algorithm = null
>   ssl.key.password = null
>   ssl.keymanager.algorithm = SunX509
>   ssl.keystore.location = null
>   ssl.keystore.password = null
>   ssl.keystore.type = JKS
>   ssl.protocol = TLS
>   ssl.provider = null
>   ssl.secure.random.implementation = null
>   ssl.trustmanager.algorithm = PKIX
>   ssl.truststore.location = null
>   ssl.truststore.password = null
>   ssl.truststore.type = JKS
>   value.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> [2017-12-14 20:09:23.502] [Kafka 0.10 Fetcher for Source: 
> source_bj-docker-large (14/40)] INFO  
> org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.1
> [2017-12-14 20:09:23.502] [Kafka 0.10 Fetcher for Source: 
> source_bj-docker-large (14/40)] INFO  
> org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 
> e89bffd6b2eff799
> {code}
> My Kafka Java client cannot auto commit. After adding some debug logging, I 
> found that the coordinatorUnknown() function in 
> [ConsumerCoordinator.java#L604|https://github.com/apache/kafka/blob/0.10.2.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L604]
> always returns true, and nextAutoCommitDeadline just increases indefinitely. 
> Should there be a lookupCoordinator() call after line 604, like in 
> [ConsumerCoordinator.java#L508|https://github.com/apache/kafka/blob/0.10.2.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L508]?
> After I added lookupCoordinator() next to line 604, the consumer could auto 
> commit offsets properly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6487) ChangeLoggingKeyValueBytesStore.all() returns null

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349560#comment-16349560
 ] 

ASF GitHub Bot commented on KAFKA-6487:
---

guozhangwang closed pull request #4495: KAFKA-6487: 
ChangeLoggingKeyValueBytesStore does not propagate delete
URL: https://github.com/apache/kafka/pull/4495
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index 8dc457a9949..94ee275a3bf 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -77,8 +77,8 @@ public void putAll(final List> 
entries) {
 
 @Override
 public byte[] delete(final Bytes key) {
-final byte[] oldValue = inner.get(key);
-put(key, null);
+final byte[] oldValue = inner.delete(key);
+changeLogger.logChange(key, null);
 return oldValue;
 }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java
deleted file mode 100644
index ea9f7aa8713..000
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StateSerdes;
-
-import java.util.ArrayList;
-import java.util.List;
-
-class ChangeLoggingKeyValueStore extends 
WrappedStateStore.AbstractStateStore implements KeyValueStore {
-private final ChangeLoggingKeyValueBytesStore innerBytes;
-private final Serde keySerde;
-private final Serde valueSerde;
-private StateSerdes serdes;
-
-
-ChangeLoggingKeyValueStore(final KeyValueStore bytesStore,
-   final Serde keySerde,
-   final Serde valueSerde) {
-this(new ChangeLoggingKeyValueBytesStore(bytesStore), keySerde, 
valueSerde);
-}
-
-private ChangeLoggingKeyValueStore(final ChangeLoggingKeyValueBytesStore 
bytesStore,
-   final Serde keySerde,
-   final Serde valueSerde) {
-super(bytesStore);
-this.innerBytes = bytesStore;
-this.keySerde = keySerde;
-this.valueSerde = valueSerde;
-}
-
-@SuppressWarnings("unchecked")
-@Override
-public void init(final ProcessorContext context, final StateStore root) {
-innerBytes.init(context, root);
-
-serdes = new 
StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(),
 innerBytes.name()),
-   keySerde == null ? (Serde) 
context.keySerde() : keySerde,
-   valueSerde == null ? (Serde) 
context.valueSerde() : valueSerde);
-}
-
-@Override
-public long approximateNumEntries() {
-return innerBytes.approximateNumEntries();
-}
-
-@Override
-public void put(final K key, final V value) {
-final Bytes bytesKey = 

[jira] [Commented] (KAFKA-6504) Connect: Some per-task-metrics not working

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349545#comment-16349545
 ] 

ASF GitHub Bot commented on KAFKA-6504:
---

rayokota opened a new pull request #4514: KAFKA-6504: Fix source task metric 
caused by copy-paste error
URL: https://github.com/apache/kafka/pull/4514
 
 
   This is a simple change to replace "sink-active-record-count" with 
"source-active-record-count" for the 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Connect: Some per-task-metrics not working
> --
>
> Key: KAFKA-6504
> URL: https://issues.apache.org/jira/browse/KAFKA-6504
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Per Steffensen
>Priority: Minor
>
> Some Kafka Connect metrics seem to be wrong at the per-task level - at least 
> it seems like the MBean 
> "kafka.connect:type=source-task-metrics,connector=,task=x" 
> attribute "source-record-active-count" reports the same number for all x 
> tasks running in the same Kafka Connect instance/JVM. E.g. if I have a 
> source connector "my-connector" with 2 tasks that both run in the same 
> Kafka Connect instance, and I know that only one of them actually produces 
> anything (and therefore can have "active" source records), then both 
> "kafka.connect:type=source-task-metrics,connector=my-connector,task=0" and 
> "kafka.connect:type=source-task-metrics,connector=my-connector,task=1" go 
> up (following each other). It should only go up for the one task that 
> actually produces something.
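
For anyone wanting to reproduce the observation, a small Scala sketch that reads 
the attribute for both tasks via JMX from inside the Connect worker JVM (the 
connector name and task numbers are the ones used as examples above; a separate 
process would need a remote JMX connection instead):

{code:scala}
import java.lang.management.ManagementFactory
import javax.management.ObjectName

object CheckPerTaskMetric extends App {
  val server = ManagementFactory.getPlatformMBeanServer
  for (task <- Seq(0, 1)) {
    val name = new ObjectName(
      s"kafka.connect:type=source-task-metrics,connector=my-connector,task=$task")
    // If the per-task metric worked, these two values should differ when only
    // one task is actually producing records.
    val active = server.getAttribute(name, "source-record-active-count")
    println(s"task $task source-record-active-count = $active")
  }
}
{code}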



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6386) Deprecate KafkaStreams constructor taking StreamsConfig parameter

2018-02-01 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6386:
---
Fix Version/s: 1.2.0

> Deprecate KafkaStreams constructor taking StreamsConfig parameter
> -
>
> Key: KAFKA-6386
> URL: https://issues.apache.org/jira/browse/KAFKA-6386
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: Boyang Chen
>Priority: Minor
>  Labels: beginner, kip, newbie
> Fix For: 1.2.0
>
>
> Currently, the {{KafkaStreams}} constructor has overloads that take either 
> {{Properties}} or {{StreamsConfig}} as parameters.
> Because {{StreamsConfig}} is immutable and is created from a {{Properties}} 
> object itself, the constructors accepting {{StreamsConfig}} are not useful 
> and add only boilerplate code. Thus, we should deprecate those constructors 
> in order to remove them eventually.
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-245%3A+Use+Properties+instead+of+StreamsConfig+in+KafkaStreams+constructor
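
For illustration, the {{Properties}} overload that already exists is the one to keep using once the {{StreamsConfig}} overloads are deprecated. A minimal sketch, with placeholder application id, bootstrap servers and topology:

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class PropertiesConstructorExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "deprecation-example");   // placeholder app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // placeholder brokers

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");                        // trivial placeholder topology

        // Preferred: pass the Properties directly.
        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // To be deprecated by KIP-245: wrapping the same Properties in a StreamsConfig first.
        // KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(props));

        streams.start();
    }
}
{code}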



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6058) KIP-222: Add "describe consumer groups" and "list consumer groups" to KafkaAdminClient

2018-02-01 Thread Jeff Widman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6058?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Widman updated KAFKA-6058:
---
Labels: kip-222  (was: needs-kip)

> KIP-222: Add "describe consumer groups" and "list consumer groups" to 
> KafkaAdminClient
> --
>
> Key: KAFKA-6058
> URL: https://issues.apache.org/jira/browse/KAFKA-6058
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Matthias J. Sax
>Assignee: Jorge Quilcate
>Priority: Major
>  Labels: kip-222
>
> {{KafkaAdminClient}} does not allow getting information about consumer groups. 
> This feature is supported by the old {{kafka.admin.AdminClient}}, though.
> We should add {{KafkaAdminClient#describeConsumerGroups()}} and 
> {{KafkaAdminClient#listConsumerGroup()}}.
> Associated KIP: KIP-222
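
A hedged sketch of how the proposed methods might be used once KIP-222 lands. The method names come from this ticket; the return types and exact signatures are assumptions, so the calls are left as comments:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class ConsumerGroupAdminSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            // Proposed by KIP-222 (signatures are assumptions, not yet part of the released API):
            // admin.listConsumerGroups().all().get()
            //         .forEach(group -> System.out.println(group));
            // admin.describeConsumerGroups(java.util.Collections.singleton("my-group")).all().get();
        }
    }
}
{code}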



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6513) New Connect header support doesn't define `converter.type` property correctly

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349517#comment-16349517
 ] 

ASF GitHub Bot commented on KAFKA-6513:
---

rhauch opened a new pull request #4512: KAFKA-6513: Corrected how Converters 
and HeaderConverters are instantiated and configured
URL: https://github.com/apache/kafka/pull/4512
 
 
   The commits for KIP-145 (KAFKA-5142) changed how the Connect workers 
instantiate and configure the Converters, and also added the ability to do the 
same for the new HeaderConverters. However, the last few commits removed the 
default value for the `converter.type` property for Converters and 
HeaderConverters, and this broke how the internal converters were being created.
   
   This change corrects the behavior so that the `converter.type` property is 
always set by the worker (or by the Plugins class), which means the existing 
Converter implementations will not have to do this. The built-in JsonConverter, 
ByteArrayConverter, and StringConverter also implement HeaderConverter which 
implements Configurable, but the Worker and Plugins methods do not yet use the 
`Configurable.configure(Map)` method and instead still use the 
`Converter.configure(Map,boolean)`.
   
   Several tests were modified, and a new PluginsTest was added to verify the 
new behavior in Plugins for instantiating and configuring the Converter and 
HeaderConverter instances.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> New Connect header support doesn't define `converter.type` property correctly
> -
>
> Key: KAFKA-6513
> URL: https://issues.apache.org/jira/browse/KAFKA-6513
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.1.0
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Blocker
> Fix For: 1.1.0
>
>
> The recent feature (KAFKA-5142) added a new {{converter.type}} property as part 
> of making the {{Converter}} implementations implement {{Configurable}}. However, 
> the worker is not correctly setting these new properties and is instead 
> incorrectly assuming the existing {{Converter}} implementations will set 
> them. For example:
> {noformat}
> Exception in thread "main" org.apache.kafka.common.config.ConfigException: 
> Missing required configuration "converter.type" which has no default value.
> at 
> org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:472)
> at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:462)
> at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:62)
> at 
> org.apache.kafka.connect.storage.ConverterConfig.(ConverterConfig.java:48)
> at 
> org.apache.kafka.connect.json.JsonConverterConfig.(JsonConverterConfig.java:59)
> at 
> org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:284)
> at 
> org.apache.kafka.connect.runtime.isolation.Plugins.newConfiguredPlugin(Plugins.java:77)
> at 
> org.apache.kafka.connect.runtime.isolation.Plugins.newConverter(Plugins.java:208)
> at org.apache.kafka.connect.runtime.Worker.(Worker.java:107)
> at 
> io.confluent.connect.replicator.ReplicatorApp.config(ReplicatorApp.java:104)
> at 
> io.confluent.connect.replicator.ReplicatorApp.main(ReplicatorApp.java:60)
> {noformat}
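
For context, a minimal sketch of the contract this fix restores: the worker (or the {{Plugins}} class) injects {{converter.type}} into the config map before calling {{configure()}}, so individual {{Converter}} implementations never set it themselves. The string literals mirror {{ConverterConfig.TYPE_CONFIG}} and the key converter type; the incoming worker properties map is assumed:

{code:java}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.storage.Converter;

public class ConverterTypeSketch {
    public static Converter newKeyConverter(Map<String, Object> workerConverterProps) {
        // The worker/Plugins layer is expected to add converter.type before configure();
        // "converter.type" and "key" mirror ConverterConfig.TYPE_CONFIG and the key converter type.
        Map<String, Object> config = new HashMap<>(workerConverterProps);
        config.put("converter.type", "key");

        Converter converter = new JsonConverter();
        // The worker still calls the legacy Converter.configure(Map, boolean) today.
        converter.configure(config, true);
        return converter;
    }
}
{code}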



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6364) Add Second Check for End Offset During Restore

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349456#comment-16349456
 ] 

ASF GitHub Bot commented on KAFKA-6364:
---

bbejeck opened a new pull request #4511: KAFKA-6364: second check for ensuring 
changelog topic not changed during restore
URL: https://github.com/apache/kafka/pull/4511
 
 
   Added a second check for the race condition where the store changelog topic is 
updated during restore, but not if it is a KTable changelog topic. 
   
   This will be tricky to test, but I wanted to push the PR to get feedback on 
the approach.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add Second Check for End Offset During Restore
> --
>
> Key: KAFKA-6364
> URL: https://issues.apache.org/jira/browse/KAFKA-6364
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 1.0.2
>
>
> We need to re-check the ending offset when restoring a changelog topic, to 
> guard against the race condition where an additional record is appended to the 
> log immediately after the restore starts. We also need to add a check for the 
> KTable source topic and whether an offset limit is set.
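
An illustrative sketch of the guard being added, not the actual {{StoreChangelogReader}} code; the restore consumer and changelog partition are assumed to come from the restore loop:

{code:java}
import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

public class RestoreEndOffsetCheck {
    // Illustrative sketch only: re-check the changelog end offset before declaring the
    // restore complete, since a record may have been appended right after the restore
    // started. For KTable source topics the result would additionally be capped at the
    // configured offset limit.
    public static long recheckEndOffset(Consumer<byte[], byte[]> restoreConsumer,
                                        TopicPartition changelogPartition,
                                        long previouslyObservedEndOffset) {
        long latestEndOffset = restoreConsumer
                .endOffsets(Collections.singleton(changelogPartition))
                .get(changelogPartition);
        return Math.max(previouslyObservedEndOffset, latestEndOffset);
    }
}
{code}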



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6521) Store record timestamps in KTable stores

2018-02-01 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-6521:
--

 Summary: Store record timestamps in KTable stores
 Key: KAFKA-6521
 URL: https://issues.apache.org/jira/browse/KAFKA-6521
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


Currently, KTables store plain key-value pairs. However, it is desirable to 
also store a timestamp for the record.
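
A hypothetical illustration of the kind of wrapper the store would need in order to keep a timestamp next to each value; this is not the actual Kafka Streams design:

{code:java}
// Hypothetical illustration only: pair the stored value with the record timestamp.
public final class ValueAndTimestamp<V> {
    private final V value;
    private final long timestamp;

    public ValueAndTimestamp(final V value, final long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    public V value() {
        return value;
    }

    public long timestamp() {
        return timestamp;
    }

    @Override
    public String toString() {
        return "<" + value + "," + timestamp + ">";
    }
}
{code}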



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6378) NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper returns null

2018-02-01 Thread Andy Bryant (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349439#comment-16349439
 ] 

Andy Bryant commented on KAFKA-6378:


Thanks [~guozhang] - much appreciated!

> NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper 
> returns null
> --
>
> Key: KAFKA-6378
> URL: https://issues.apache.org/jira/browse/KAFKA-6378
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Andy Bryant
>Assignee: Andy Bryant
>Priority: Major
> Fix For: 1.1.0, 1.0.1, 1.2.0
>
>
> On a Stream->GlobalKTable leftJoin if the KeyValueMapper returns null, the 
> stream fails with a NullPointerException (see stacktrace below). On Kafka 
> 0.11.0.0 the stream processes this successfully, calling the ValueJoiner with 
> the table value set to null.
> The use-case for this is joining a stream to a table containing reference 
> data where the stream foreign key may be null. There is no straight-forward 
> workaround in this case with Kafka 1.0.0 without having to resort to either 
> generating a key that will never match or branching the stream for records 
> that don't have the foreign key.
> Exception in thread "workshop-simple-example-client-StreamThread-1" 
> java.lang.NullPointerException
>   at java.base/java.util.Objects.requireNonNull(Objects.java:221)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:136)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:35)
>   at 
> org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:184)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:116)
>   at 
> org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.get(KTableSourceValueGetterSupplier.java:49)
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:56)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
>   at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
>   at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>   at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
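
For reference, a minimal sketch of the join shape that triggers this; the topics and the colon-delimited value format are hypothetical:

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;

public class GlobalKTableLeftJoinExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> orders = builder.stream("orders");                 // hypothetical topic
        GlobalKTable<String, String> customers = builder.globalTable("customers"); // hypothetical topic

        KStream<String, String> enriched = orders.leftJoin(
                customers,
                // Key mapper: may return null when the foreign key is absent, which is the
                // case that throws NullPointerException on 1.0.0.
                (orderId, order) -> order.contains(":") ? order.split(":")[0] : null,
                // On 0.11.0.0 (and after this fix) the joiner is called with customer == null.
                (order, customer) -> order + " -> " + customer);

        enriched.to("enriched-orders");
    }
}
{code}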



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics

2018-02-01 Thread Gunnar Morling (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349409#comment-16349409
 ] 

Gunnar Morling commented on KAFKA-3821:
---

Thanks for the pointers, I'll give it a try.

> Allow Kafka Connect source tasks to produce offset without writing to topics
> 
>
> Key: KAFKA-3821
> URL: https://issues.apache.org/jira/browse/KAFKA-3821
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>Priority: Major
>  Labels: needs-kip
> Fix For: 1.2.0
>
>
> Provide a way for a {{SourceTask}} implementation to record a new offset for 
> a given partition without necessarily writing a source record to a topic.
> Consider a connector task that uses the same offset when producing an unknown 
> number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a 
> database). Once the task completes those records, the connector wants to 
> update the offsets (e.g., the snapshot is complete) but has no more records 
> to be written to a topic. With this change, the task could simply supply an 
> updated offset.
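
An illustrative sketch of the current limitation from a {{SourceTask.poll()}} implementation: today the only way to record the snapshot-complete offset is to attach it to a record that is actually written to a topic. The topic name, partition/offset maps and marker value below are hypothetical:

{code:java}
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;

public class SnapshotOffsetSketch {
    // Illustrative only: the offset can currently only be recorded by emitting a record.
    // The ticket asks for a way to hand ("snapshot_complete" -> true) to the framework
    // without producing this marker record at all.
    public static List<SourceRecord> snapshotCompleted() {
        Map<String, ?> sourcePartition = Collections.singletonMap("database", "inventory"); // hypothetical
        Map<String, ?> sourceOffset = Collections.singletonMap("snapshot_complete", true);  // hypothetical

        return Collections.singletonList(new SourceRecord(
                sourcePartition, sourceOffset,
                "inventory.snapshot-markers",          // hypothetical topic
                Schema.STRING_SCHEMA, "snapshot-complete"));
    }
}
{code}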



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6515) Add toString() method to kafka connect Field class

2018-02-01 Thread Gunnar Morling (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349389#comment-16349389
 ] 

Gunnar Morling commented on KAFKA-6515:
---

Filed a PR as shown above, but can't transition this to "Patch available" due 
to lack of permissions I reckon.

> Add toString() method to kafka connect Field class
> --
>
> Key: KAFKA-6515
> URL: https://issues.apache.org/jira/browse/KAFKA-6515
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Bartłomiej Tartanus
>Priority: Minor
>
> Currently testing is really painful:
> {code:java}
> org.apache.kafka.connect.data.Field@1d51df1f was not equal to 
> org.apache.kafka.connect.data.Field@c0d62cd8{code}
>  
> toString() method would fix this, so please add one. :)
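
A minimal sketch of the kind of output such a {{toString()}} could produce, built from {{Field}}'s existing accessors (this is not the implementation from the linked PR):

{code:java}
import org.apache.kafka.connect.data.Field;

public class FieldToString {
    // Sketch only: the requested Field#toString() could render something like this.
    public static String describe(Field field) {
        return "Field{name=" + field.name()
                + ", index=" + field.index()
                + ", schema=" + field.schema() + "}";
    }
}
{code}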



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6515) Add toString() method to kafka connect Field class

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349386#comment-16349386
 ] 

ASF GitHub Bot commented on KAFKA-6515:
---

gunnarmorling opened a new pull request #4509: KAFKA-6515 Adding toString() 
method to o.a.k.connect.data.Field
URL: https://github.com/apache/kafka/pull/4509
 
 
   https://issues.apache.org/jira/browse/KAFKA-6515
   
   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [x] Verify test coverage and CI build status
   - [x] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add toString() method to kafka connect Field class
> --
>
> Key: KAFKA-6515
> URL: https://issues.apache.org/jira/browse/KAFKA-6515
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Bartłomiej Tartanus
>Priority: Minor
>
> Currently testing is really painful:
> {code:java}
> org.apache.kafka.connect.data.Field@1d51df1f was not equal to 
> org.apache.kafka.connect.data.Field@c0d62cd8{code}
>  
> toString() method would fix this, so please add one. :)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING

2018-02-01 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6520:
---
Labels: user-experience  (was: )

> When a Kafka Stream can't communicate with the server, it's Status stays 
> RUNNING
> 
>
> Key: KAFKA-6520
> URL: https://issues.apache.org/jira/browse/KAFKA-6520
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Michael Kohout
>Priority: Major
>  Labels: user-experience
>
> When you execute the following scenario, the application is always in the 
> RUNNING state:
>   
>  1) start Kafka
>  2) start the app; the app connects to Kafka and starts processing
>  3) kill Kafka (stop the docker container)
>  4) the application doesn't give any indication that it's no longer 
> connected (the Stream State is still RUNNING, and the uncaught exception 
> handler isn't invoked)
>   
>   
>  It would be useful if the Stream State had a DISCONNECTED status.
>   
> See 
> [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] 
> for a discussion on the Google user forum.  
> [This|https://issues.apache.org/jira/browse/KAFKA-4564] is a link to a 
> related issue.
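
For reference, a minimal sketch of where such a state would surface: the existing {{KafkaStreams.StateListener}} is the natural observation point, but today it never fires for this scenario because the state stays RUNNING. The application id, bootstrap servers and topology are placeholders:

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class StateListenerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "disconnect-demo");   // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");                    // trivial placeholder topology

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.setStateListener((newState, oldState) ->
                // Today the state stays at RUNNING when the brokers disappear; the ticket
                // asks for an explicit DISCONNECTED state that could be observed here.
                System.out.println("state " + oldState + " -> " + newState));
        streams.start();
    }
}
{code}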



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING

2018-02-01 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6520:
---
Component/s: streams

> When a Kafka Stream can't communicate with the server, it's Status stays 
> RUNNING
> 
>
> Key: KAFKA-6520
> URL: https://issues.apache.org/jira/browse/KAFKA-6520
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Michael Kohout
>Priority: Major
>  Labels: user-experience
>
> When you execute the following scenario, the application is always in the 
> RUNNING state:
>   
>  1) start Kafka
>  2) start the app; the app connects to Kafka and starts processing
>  3) kill Kafka (stop the docker container)
>  4) the application doesn't give any indication that it's no longer 
> connected (the Stream State is still RUNNING, and the uncaught exception 
> handler isn't invoked)
>   
>   
>  It would be useful if the Stream State had a DISCONNECTED status.
>   
> See 
> [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] 
> for a discussion on the Google user forum.  
> [This|https://issues.apache.org/jira/browse/KAFKA-4564] is a link to a 
> related issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4750) KeyValueIterator returns null values

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349240#comment-16349240
 ] 

ASF GitHub Bot commented on KAFKA-4750:
---

guozhangwang closed pull request #3430: KAFKA-4750: RocksDBStore always deletes 
null values
URL: https://github.com/apache/kafka/pull/3430
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 70345927f24..528da666e6e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -245,7 +245,7 @@ public synchronized void put(K key, V value) {
 Objects.requireNonNull(key, "key cannot be null");
 validateStoreOpen();
 byte[] rawKey = serdes.rawKey(key);
-byte[] rawValue = serdes.rawValue(value);
+byte[] rawValue = value == null ? null : serdes.rawValue(value);
 putInternal(rawKey, rawValue);
 }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreCustomNullSerdeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreCustomNullSerdeTest.java
new file mode 100644
index 000..776747af211
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreCustomNullSerdeTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.NoOpRecordCollector;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test checks RocksDBStore behaviour with serializer, which
+ * serializes null value into non-null byte array.
+ */
+public class RocksDBStoreCustomNullSerdeTest {
+    private RocksDBStore<String, String> subject;
+    private MockProcessorContext context;
+
+    @Before
+    public void setUp() throws Exception {
+        final Serializer<String> serializer = new StringSerializer() {
+            @Override
+            public byte[] serialize(final String topic, final String data) {
+                if (data == null) {
+                    return "null-encoding-that-is-not-just-'null'".getBytes();
+                }
+                return super.serialize(topic, data);
+            }
+        };
+        final Serde<String> serde = Serdes.serdeFrom(serializer, new StringDeserializer());
+        subject = new RocksDBStore<>("test", serde, serde);
+        context = new MockProcessorContext(
+            TestUtils.tempDirectory(),
+            serde,
+            serde,
+            new NoOpRecordCollector(),
+            new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())));
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        subject.close();
+    }
+
+    @Test
+    public void shouldNotReturnDeletedInIterator() {
+        subject.init(context, subject);
+        subject.put("a", "1");
+        subject.put("b", "2");
+        subject.delete("a");
+        final KeyValueIterator<String, String> it = subject.all();
+        while (it.hasNext()) {
+   

[jira] [Commented] (KAFKA-4750) KeyValueIterator returns null values

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349235#comment-16349235
 ] 

ASF GitHub Bot commented on KAFKA-4750:
---

guozhangwang opened a new pull request #4508: KAFKA-4750: Bypass null value and 
treat it as deletes
URL: https://github.com/apache/kafka/pull/4508
 
 
   * If the value is null, bypass the serde and treat it as deletes in the 
inner-most underlying store.
   * Update javadocs, add unit tests accordingly
   
   This is originally contributed by @evis.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KeyValueIterator returns null values
> 
>
> Key: KAFKA-4750
> URL: https://issues.apache.org/jira/browse/KAFKA-4750
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1, 0.10.2.1, 0.11.0.0
>Reporter: Michal Borowiecki
>Assignee: Evgeny Veretennikov
>Priority: Major
>  Labels: newbie
> Attachments: DeleteTest.java
>
>
> The API for the ReadOnlyKeyValueStore.range method promises that the returned 
> iterator will not return null values. However, after upgrading from 0.10.0.0 to 
> 0.10.1.1 we found null values are returned, causing NPEs on our side.
> I found this happens after removing entries from the store, and it resembles 
> the SAMZA-94 defect. The problem seems to be the same as it was there: when 
> deleting entries with a serializer that does not return null when null is 
> passed in, the state store doesn't actually delete that key/value pair, but the 
> iterator will return a null value for that key.
> When I modified our serializer to return null when null is passed in, the 
> problem went away. However, I believe this should be fixed in Kafka Streams, 
> perhaps with a similar approach as SAMZA-94.
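
A sketch of the semantics the fix is meant to guarantee: putting a null value acts as a delete regardless of how the serializer encodes null, and the iterator never returns null values. The store instance is assumed to come from a test harness, for example a {{RocksDBStore}} wired to a mock context as in the PR's test:

{code:java}
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class NullMeansDeleteCheck {
    public static void verify(final KeyValueStore<String, String> store) {
        store.put("a", "1");
        store.put("b", "2");
        store.put("a", null); // should bypass the serde and delete the key

        try (final KeyValueIterator<String, String> it = store.all()) {
            while (it.hasNext()) {
                final KeyValue<String, String> entry = it.next();
                if (entry.value == null) {
                    throw new AssertionError("iterator returned a null value for key " + entry.key);
                }
            }
        }
    }
}
{code}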



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6367) Fix StateRestoreListener To Use Correct Ending Offset

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349206#comment-16349206
 ] 

ASF GitHub Bot commented on KAFKA-6367:
---

bbejeck opened a new pull request #4507: KAFKA-6367: StateRestoreListener use 
actual last restored offset for restored batch
URL: https://github.com/apache/kafka/pull/4507
 
 
   
   Use the last actual restored offset for the 
`StoreRestoreListener.onBatchRestored` method.  Probably should update the docs 
to inform users this could include gaps in sequence due to commit markers.
   
   Updated existing tests
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fix StateRestoreListener To Use Correct Ending Offset
> -
>
> Key: KAFKA-6367
> URL: https://issues.apache.org/jira/browse/KAFKA-6367
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 1.0.0
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 1.0.2
>
>
> {{StateRestoreListener#restoreBatchCompleted}} takes a {{nextPosition}} long, 
> but {{nextPosition}} is not correct: it should be the offset of the latest 
> restored record, while {{nextPosition}} is actually the offset of the first 
> record that has not been restored.
> We can't automatically use {{nextPosition}} - 1 as this could be a commit 
> marker.
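
For reference, a minimal sketch of a {{StateRestoreListener}} that consumes the corrected value; after this fix the {{batchEndOffset}} argument is the offset of the last record actually restored in the batch (gaps are possible because commit markers are never restored). It would be registered via {{KafkaStreams#setGlobalStateRestoreListener}}:

{code:java}
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

public class LoggingRestoreListener implements StateRestoreListener {
    @Override
    public void onRestoreStart(TopicPartition partition, String storeName,
                               long startingOffset, long endingOffset) {
        System.out.println("restore of " + storeName + " starting at " + startingOffset);
    }

    @Override
    public void onBatchRestored(TopicPartition partition, String storeName,
                                long batchEndOffset, long numRestored) {
        // batchEndOffset is the last restored offset of the batch; sequence gaps are
        // possible because commit markers in the changelog are never restored.
        System.out.println("restored " + numRestored + " records of " + storeName
                + " up to offset " + batchEndOffset);
    }

    @Override
    public void onRestoreEnd(TopicPartition partition, String storeName, long totalRestored) {
        System.out.println("restore of " + storeName + " done, " + totalRestored + " records");
    }
}
{code}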



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING

2018-02-01 Thread Michael Kohout (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Kohout updated KAFKA-6520:
--
Description: 
When you execute the following scenario, the application is always in the RUNNING 
state:
  
 1) start Kafka
 2) start the app; the app connects to Kafka and starts processing
 3) kill Kafka (stop the docker container)
 4) the application doesn't give any indication that it's no longer 
connected (the Stream State is still RUNNING, and the uncaught exception handler 
isn't invoked)
  
  
 It would be useful if the Stream State had a DISCONNECTED status.
  
See 
[this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] 
for a discussion on the Google user forum.  
[This|https://issues.apache.org/jira/browse/KAFKA-4564] is a link to a related 
issue.

  was:
When you execute the following scenario the application is always in RUNNING 
state
 
1)start kafka
2)start app, app connects to kafka and starts processing
3)kill kafka(stop docker container)
4)the application doesn't give any indication that it's no longer 
connected(Stream State is still RUNNING, and the uncaught exception handler 
isn't invoked)
 
 
It would be useful if the Stream State had a DISCONNECTED status.
 
see 
[this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] 
for a discussion from the google user forum.


> When a Kafka Stream can't communicate with the server, it's Status stays 
> RUNNING
> 
>
> Key: KAFKA-6520
> URL: https://issues.apache.org/jira/browse/KAFKA-6520
> Project: Kafka
>  Issue Type: Bug
>Reporter: Michael Kohout
>Priority: Major
>
> When you execute the following scenario, the application is always in the 
> RUNNING state:
>   
>  1) start Kafka
>  2) start the app; the app connects to Kafka and starts processing
>  3) kill Kafka (stop the docker container)
>  4) the application doesn't give any indication that it's no longer 
> connected (the Stream State is still RUNNING, and the uncaught exception 
> handler isn't invoked)
>   
>   
>  It would be useful if the Stream State had a DISCONNECTED status.
>   
> See 
> [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] 
> for a discussion on the Google user forum.  
> [This|https://issues.apache.org/jira/browse/KAFKA-4564] is a link to a 
> related issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING

2018-02-01 Thread Michael Kohout (JIRA)
Michael Kohout created KAFKA-6520:
-

 Summary: When a Kafka Stream can't communicate with the server, 
it's Status stays RUNNING
 Key: KAFKA-6520
 URL: https://issues.apache.org/jira/browse/KAFKA-6520
 Project: Kafka
  Issue Type: Bug
Reporter: Michael Kohout


When you execute the following scenario, the application is always in the RUNNING 
state:
 
1) start Kafka
2) start the app; the app connects to Kafka and starts processing
3) kill Kafka (stop the docker container)
4) the application doesn't give any indication that it's no longer 
connected (the Stream State is still RUNNING, and the uncaught exception handler 
isn't invoked)
 
 
It would be useful if the Stream State had a DISCONNECTED status.
 
See 
[this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] 
for a discussion on the Google user forum.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6492) LogSemgent.truncateTo() should always resize the index file

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349179#comment-16349179
 ] 

ASF GitHub Bot commented on KAFKA-6492:
---

becketqin closed pull request #4498: KAFKA-6492: Fix log truncation to empty 
segment
URL: https://github.com/apache/kafka/pull/4498
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/log/LogSegment.scala 
b/core/src/main/scala/kafka/log/LogSegment.scala
index 45c820bff8d..5970f42f6d9 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -28,7 +28,7 @@ import kafka.utils._
 import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.record.FileRecords.LogOffsetPosition
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.utils.{Time}
+import org.apache.kafka.common.utils.Time
 
 import scala.collection.JavaConverters._
 import scala.math._
@@ -345,20 +345,23 @@ class LogSegment private[log] (val log: FileRecords,
*/
   @nonthreadsafe
   def truncateTo(offset: Long): Int = {
+// Do offset translation before truncating the index to avoid needless 
scanning
+// in case we truncate the full index
 val mapping = translateOffset(offset)
-if (mapping == null)
-  return 0
 offsetIndex.truncateTo(offset)
 timeIndex.truncateTo(offset)
 txnIndex.truncateTo(offset)
-// after truncation, reset and allocate more space for the (new currently  
active) index
+
+// After truncation, reset and allocate more space for the (new currently 
active) index
 offsetIndex.resize(offsetIndex.maxIndexSize)
 timeIndex.resize(timeIndex.maxIndexSize)
-val bytesTruncated = log.truncateTo(mapping.position)
-if(log.sizeInBytes == 0) {
+
+val bytesTruncated = if (mapping == null) 0 else 
log.truncateTo(mapping.position)
+if (log.sizeInBytes == 0) {
   created = time.milliseconds
   rollingBasedTimestamp = None
 }
+
 bytesSinceLastIndexEntry = 0
 if (maxTimestampSoFar >= 0)
   loadLargestTimestamp()
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 
b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 469b3cca40e..c45ed0d2986 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -22,7 +22,7 @@ import kafka.utils.TestUtils
 import kafka.utils.TestUtils.checkEquals
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.utils.{MockTime, Time, Utils}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
@@ -36,13 +36,16 @@ class LogSegmentTest {
   var logDir: File = _
 
   /* create a segment with the given base offset */
-  def createSegment(offset: Long, indexIntervalBytes: Int = 10): LogSegment = {
+  def createSegment(offset: Long,
+indexIntervalBytes: Int = 10,
+maxSegmentMs: Int = Int.MaxValue,
+time: Time = Time.SYSTEM): LogSegment = {
 val ms = FileRecords.open(Log.logFile(logDir, offset))
 val idx = new OffsetIndex(Log.offsetIndexFile(logDir, offset), offset, 
maxIndexSize = 1000)
 val timeIdx = new TimeIndex(Log.timeIndexFile(logDir, offset), offset, 
maxIndexSize = 1500)
 val txnIndex = new TransactionIndex(offset, 
Log.transactionIndexFile(logDir, offset))
-val seg = new LogSegment(ms, idx, timeIdx, txnIndex, offset, 
indexIntervalBytes, 0, maxSegmentMs = Int.MaxValue,
-  maxSegmentBytes = Int.MaxValue, Time.SYSTEM)
+val seg = new LogSegment(ms, idx, timeIdx, txnIndex, offset, 
indexIntervalBytes, 0, maxSegmentMs = maxSegmentMs,
+  maxSegmentBytes = Int.MaxValue, time)
 segments += seg
 seg
   }
@@ -157,6 +160,47 @@ class LogSegmentTest {
 }
   }
 
+  @Test
+  def testTruncateEmptySegment() {
+// This tests the scenario in which the follower truncates to an empty 
segment. In this
+// case we must ensure that the index is resized so that the log segment 
is not mistakenly
+// rolled due to a full index
+
+val maxSegmentMs = 30
+val time = new MockTime
+val seg = createSegment(0, maxSegmentMs = maxSegmentMs, time = time)
+seg.close()
+
+val reopened = createSegment(0, maxSegmentMs = maxSegmentMs, time = time)
+assertEquals(0, seg.timeIndex.sizeInBytes)
+assertEquals(0, seg.offsetIndex.sizeInBytes)
+
+time.sleep(500)
+reopened.truncateTo(57)
+assertEquals(0, reopened.timeWaitedForRoll(time.milliseconds(), 
RecordBatch.NO_TIMESTAMP))
+

[jira] [Updated] (KAFKA-6378) NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper returns null

2018-02-01 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-6378:
-
Fix Version/s: 1.2.0
   1.1.0

> NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper 
> returns null
> --
>
> Key: KAFKA-6378
> URL: https://issues.apache.org/jira/browse/KAFKA-6378
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Andy Bryant
>Assignee: Andy Bryant
>Priority: Major
> Fix For: 1.1.0, 1.0.1, 1.2.0
>
>
> On a Stream->GlobalKTable leftJoin if the KeyValueMapper returns null, the 
> stream fails with a NullPointerException (see stacktrace below). On Kafka 
> 0.11.0.0 the stream processes this successfully, calling the ValueJoiner with 
> the table value set to null.
> The use-case for this is joining a stream to a table containing reference 
> data where the stream foreign key may be null. There is no straight-forward 
> workaround in this case with Kafka 1.0.0 without having to resort to either 
> generating a key that will never match or branching the stream for records 
> that don't have the foreign key.
> Exception in thread "workshop-simple-example-client-StreamThread-1" 
> java.lang.NullPointerException
>   at java.base/java.util.Objects.requireNonNull(Objects.java:221)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:136)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:35)
>   at 
> org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:184)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:116)
>   at 
> org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.get(KTableSourceValueGetterSupplier.java:49)
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:56)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
>   at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
>   at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>   at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6378) NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper returns null

2018-02-01 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349046#comment-16349046
 ] 

Guozhang Wang commented on KAFKA-6378:
--

[~kiwiandy] I have added you to the contributor list so that in the future you 
can assign tickets to yourself. Thanks for your contribution!

> NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper 
> returns null
> --
>
> Key: KAFKA-6378
> URL: https://issues.apache.org/jira/browse/KAFKA-6378
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Andy Bryant
>Assignee: Andy Bryant
>Priority: Major
> Fix For: 1.0.1
>
>
> On a Stream->GlobalKTable leftJoin if the KeyValueMapper returns null, the 
> stream fails with a NullPointerException (see stacktrace below). On Kafka 
> 0.11.0.0 the stream processes this successfully, calling the ValueJoiner with 
> the table value set to null.
> The use-case for this is joining a stream to a table containing reference 
> data where the stream foreign key may be null. There is no straight-forward 
> workaround in this case with Kafka 1.0.0 without having to resort to either 
> generating a key that will never match or branching the stream for records 
> that don't have the foreign key.
> Exception in thread "workshop-simple-example-client-StreamThread-1" 
> java.lang.NullPointerException
>   at java.base/java.util.Objects.requireNonNull(Objects.java:221)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:136)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:35)
>   at 
> org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:184)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:116)
>   at 
> org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.get(KTableSourceValueGetterSupplier.java:49)
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:56)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
>   at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
>   at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>   at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6378) NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper returns null

2018-02-01 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-6378:


Assignee: Andy Bryant

> NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper 
> returns null
> --
>
> Key: KAFKA-6378
> URL: https://issues.apache.org/jira/browse/KAFKA-6378
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Andy Bryant
>Assignee: Andy Bryant
>Priority: Major
> Fix For: 1.0.1
>
>
> On a Stream->GlobalKTable leftJoin if the KeyValueMapper returns null, the 
> stream fails with a NullPointerException (see stacktrace below). On Kafka 
> 0.11.0.0 the stream processes this successfully, calling the ValueJoiner with 
> the table value set to null.
> The use-case for this is joining a stream to a table containing reference 
> data where the stream foreign key may be null. There is no straight-forward 
> workaround in this case with Kafka 1.0.0 without having to resort to either 
> generating a key that will never match or branching the stream for records 
> that don't have the foreign key.
> Exception in thread "workshop-simple-example-client-StreamThread-1" 
> java.lang.NullPointerException
>   at java.base/java.util.Objects.requireNonNull(Objects.java:221)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:136)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:35)
>   at 
> org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:184)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:116)
>   at 
> org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.get(KTableSourceValueGetterSupplier.java:49)
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:56)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
>   at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
>   at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>   at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6499) Avoid creating dummy checkpoint files with no state stores

2018-02-01 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-6499.
--
   Resolution: Fixed
Fix Version/s: 1.2.0
   1.1.0

> Avoid creating dummy checkpoint files with no state stores
> --
>
> Key: KAFKA-6499
> URL: https://issues.apache.org/jira/browse/KAFKA-6499
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 1.1.0, 1.2.0
>
>
> Today, for a streams task that contains no state stores, its processor state 
> manager would still write a dummy checkpoint file that contains some 
> characters (version, size). This introduces unnecessary disk IOs and should 
> be avoidable.
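
The intended behaviour, as a hedged sketch; the real change lives in the state manager and checkpoint classes shown in the PR diff below, and the helper method here is only an illustration:

{code:java}
import java.io.IOException;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;

public class CheckpointWriteSketch {
    // Sketch only: skip touching the checkpoint file when there is nothing to checkpoint.
    public static void maybeCheckpoint(OffsetCheckpoint checkpoint,
                                       Map<TopicPartition, Long> offsets) throws IOException {
        if (offsets.isEmpty()) {
            return; // avoid creating or rewriting a dummy checkpoint file
        }
        checkpoint.write(offsets);
    }
}
{code}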



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6499) Avoid creating dummy checkpoint files with no state stores

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349027#comment-16349027
 ] 

ASF GitHub Bot commented on KAFKA-6499:
---

guozhangwang closed pull request #4492: KAFKA-6499: Do not write offset 
checkpoint file with empty offset map
URL: https://github.com/apache/kafka/pull/4492
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
index d38776239a9..b270e03f2e0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
@@ -65,7 +65,7 @@ public void reinitializeStateStoresForPartitions(final Logger 
log,
 try {
 checkpoint.write(checkpointableOffsets);
 } catch (final IOException fatalException) {
-log.error("Failed to update checkpoint file for global stores.", 
fatalException);
+log.error("Failed to write offset checkpoint file to {} while 
re-initializing {}: {}", checkpoint, stateStores, fatalException);
 throw new StreamsException("Failed to reinitialize global store.", 
fatalException);
 }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 2d4ee8fe613..56e6bed0850 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -339,7 +339,7 @@ public void checkpoint(final Map<TopicPartition, Long> offsets) {
 try {
 checkpoint.write(checkpointableOffsets);
 } catch (IOException e) {
-log.warn("Failed to write offsets checkpoint for global 
globalStores", e);
+log.warn("Failed to write offset checkpoint file to {} for 
global stores: {}", checkpoint, e);
 }
 }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 1ee0e146a09..e7a23bd4b5f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -294,7 +294,6 @@ public void close(final Map<TopicPartition, Long> ackedOffsets) throws Processor
 // write the checkpoint
 @Override
 public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) {
-log.trace("Writing checkpoint: {}", ackedOffsets);
 checkpointableOffsets.putAll(changelogReader.restoredOffsets());
 for (final StateStore store : stores.values()) {
 final String storeName = store.name();
@@ -311,14 +310,16 @@ public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) {
 }
 }
 }
-// write the checkpoint file before closing, to indicate clean shutdown
+// write the checkpoint file before closing
+if (checkpoint == null) {
+checkpoint = new OffsetCheckpoint(new File(baseDir, 
CHECKPOINT_FILE_NAME));
+}
+
+log.trace("Writing checkpoint: {}", checkpointableOffsets);
 try {
-if (checkpoint == null) {
-checkpoint = new OffsetCheckpoint(new File(baseDir, 
CHECKPOINT_FILE_NAME));
-}
 checkpoint.write(checkpointableOffsets);
 } catch (final IOException e) {
-log.warn("Failed to write checkpoint file to {}:", new 
File(baseDir, CHECKPOINT_FILE_NAME), e);
+log.warn("Failed to write offset checkpoint file to {}: {}", 
checkpoint, e);
 }
 }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
index 8c147373ccf..9f0e1f8c50c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
@@ -66,6 +66,11 @@ public OffsetCheckpoint(final File file) {
  * @throws IOException if any file operation fails with an IO exception
  */
   

[jira] [Comment Edited] (KAFKA-6475) ConfigException on the broker results in UnknownServerException in the admin client

2018-02-01 Thread JIRA

[ 
https://issues.apache.org/jira/browse/KAFKA-6475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349019#comment-16349019
 ] 

Xavier Léauté edited comment on KAFKA-6475 at 2/1/18 6:04 PM:
--

I was able to reproduce the problem by modifying the topic configuration  
{{file.delete.delay.ms}} to an invalid string, e.g. {{abc123}}

This results in the following exception

{{org.apache.kafka.common.errors.UnknownServerException: Invalid value abc123 
for configuration file.delete.delay.ms: Not a number of type LONG}}


was (Author: xvrl):
I was able to reproduce the problem by modifying the topic configuration  
{{file.delete.delay.ms}} to an invalid string, e.g. {{abc123}}

This results in the following exception

{{org.apache.kafka.common.errors.UnknownServerException: Invalid value 
94310fdaf43 for configuration file.delete.delay.ms: Not a number of type LONG}}

> ConfigException on the broker results in UnknownServerException in the admin 
> client
> ---
>
> Key: KAFKA-6475
> URL: https://issues.apache.org/jira/browse/KAFKA-6475
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: Xavier Léauté
>Assignee: Colin P. McCabe
>Priority: Minor
>
> Calling AdminClient.alterConfigs with an invalid configuration may cause 
> ConfigException to be thrown on the broker side, which results in an 
> UnknownServerException thrown by the admin client. It would probably make 
> more sense for the admin client to throw InvalidConfigurationException in 
> that case.
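
A sketch of reproducing the report from the client side; the bootstrap address and topic name are placeholders:

{code:java}
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class AlterConfigsRepro {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic"); // placeholder
        Config invalid = new Config(
                Collections.singletonList(new ConfigEntry("file.delete.delay.ms", "abc123")));

        try (AdminClient admin = AdminClient.create(props)) {
            admin.alterConfigs(Collections.singletonMap(topic, invalid)).all().get();
        } catch (ExecutionException e) {
            // Today this surfaces as UnknownServerException; the ticket suggests the client
            // should see InvalidConfigurationException instead.
            System.out.println(e.getCause());
        }
    }
}
{code}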



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6475) ConfigException on the broker results in UnknownServerException in the admin client

2018-02-01 Thread JIRA

[ 
https://issues.apache.org/jira/browse/KAFKA-6475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349019#comment-16349019
 ] 

Xavier Léauté commented on KAFKA-6475:
--

I was able to reproduce the problem by modifying the topic configuration  
{{file.delete.delay.ms}} to an invalid string, e.g. {{abc123}}

This results in the following exception

{{org.apache.kafka.common.errors.UnknownServerException: Invalid value 
94310fdaf43 for configuration file.delete.delay.ms: Not a number of type LONG}}

> ConfigException on the broker results in UnknownServerException in the admin 
> client
> ---
>
> Key: KAFKA-6475
> URL: https://issues.apache.org/jira/browse/KAFKA-6475
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: Xavier Léauté
>Assignee: Colin P. McCabe
>Priority: Minor
>
> Calling AdminClient.alterConfigs with an invalid configuration may cause 
> ConfigException to be thrown on the broker side, which results in an 
> UnknownServerException thrown by the admin client. It would probably make 
> more sense for the admin client to throw InvalidConfigurationException in 
> that case.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6489) Fetcher.retrieveOffsetsByTimes() should add all the topics to the metadata refresh topics set.

2018-02-01 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin resolved KAFKA-6489.
-
Resolution: Fixed

> Fetcher.retrieveOffsetsByTimes() should add all the topics to the metadata 
> refresh topics set.
> --
>
> Key: KAFKA-6489
> URL: https://issues.apache.org/jira/browse/KAFKA-6489
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 1.0.0
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Major
> Fix For: 1.1.0
>
>
> Currently, if users call KafkaConsumer.offsetsForTimes() with a large set of 
> partitions, the consumer will add one topic at a time for the metadata 
> refresh. We should add all the topics to the metadata topics and just do one 
> metadata refresh.
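
For reference, the calling pattern this patch targets: a single {{offsetsForTimes()}} call covering partitions from several topics. The bootstrap address, group id, topics and timestamp are placeholders:

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class OffsetsForTimesExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "offsets-for-times-demo");  // placeholder

        long targetTimestamp = System.currentTimeMillis() - 3_600_000L; // one hour ago, placeholder

        try (KafkaConsumer<byte[], byte[]> consumer =
                     new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            Map<TopicPartition, Long> timestamps = new HashMap<>();
            for (String topic : new String[]{"topic-a", "topic-b", "topic-c"}) { // placeholder topics
                for (PartitionInfo p : consumer.partitionsFor(topic)) {
                    timestamps.put(new TopicPartition(p.topic(), p.partition()), targetTimestamp);
                }
            }
            // With this fix, the lookup triggers a single metadata refresh covering all topics
            // instead of one refresh per topic.
            Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
            offsets.forEach((tp, ot) -> System.out.println(tp + " -> " + ot));
        }
    }
}
{code}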



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6514) Add API version as a tag for the RequestsPerSec metric

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349012#comment-16349012
 ] 

ASF GitHub Bot commented on KAFKA-6514:
---

allenxwang opened a new pull request #4506: KAFKA-6514: Add API version as a 
tag for the RequestsPerSec metric
URL: https://github.com/apache/kafka/pull/4506
 
 
   Updated `RequestChannel` to include `version` as a tag for all 
RequestsPerSec metrics. Updated tests to verify that the extra tag exists.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add API version as a tag for the RequestsPerSec metric
> --
>
> Key: KAFKA-6514
> URL: https://issues.apache.org/jira/browse/KAFKA-6514
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 1.0.0
>Reporter: Allen Wang
>Priority: Major
>
> After we upgrade a broker to a new version, one important insight is how many 
> clients have been upgraded, so that we can switch the message format once most 
> of the clients are on the new version and minimize the performance penalty. 
> RequestsPerSec with the version tag will give us that insight.
>  
>  
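
As a sketch of how that insight could be pulled out once the tag exists, the snippet below walks the RequestsPerSec MBeans and prints rates per version tag. The object name pattern, the "version" key and the attribute name are assumptions based on this ticket and the usual Kafka request metrics, not verified against the final implementation, and in practice this would run against the broker's JMX endpoint rather than the local platform MBean server:

{code:java}
import java.lang.management.ManagementFactory;
import java.util.Set;

import javax.management.MBeanServer;
import javax.management.ObjectName;

public class RequestVersionMetrics {
    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Assumed naming: kafka.network:type=RequestMetrics,name=RequestsPerSec,request=...,version=...
        ObjectName pattern = new ObjectName("kafka.network:type=RequestMetrics,name=RequestsPerSec,*");
        Set<ObjectName> names = server.queryNames(pattern, null);
        for (ObjectName name : names) {
            String version = name.getKeyProperty("version");
            if (version == null)
                continue; // broker without the per-version tag
            Object rate = server.getAttribute(name, "OneMinuteRate");
            System.out.println(name.getKeyProperty("request") + " v" + version + ": " + rate);
        }
    }
}
{code}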



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6489) Fetcher.retrieveOffsetsByTimes() should add all the topics to the metadata refresh topics set.

2018-02-01 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348949#comment-16348949
 ] 

Jiangjie Qin commented on KAFKA-6489:
-

[~damianguy] Can we include this in 1.1? This patch is important for usability 
of the consumer group tool.

> Fetcher.retrieveOffsetsByTimes() should add all the topics to the metadata 
> refresh topics set.
> --
>
> Key: KAFKA-6489
> URL: https://issues.apache.org/jira/browse/KAFKA-6489
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 1.0.0
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Major
> Fix For: 1.1.0
>
>
> Currently, if users call KafkaConsumer.offsetsForTimes() with a large set of 
> partitions, the consumer will add one topic at a time for the metadata 
> refresh. We should add all the topics to the metadata refresh topics set and 
> just do one metadata refresh.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6489) Fetcher.retrieveOffsetsByTimes() should add all the topics to the metadata refresh topics set.

2018-02-01 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-6489:

Fix Version/s: (was: 1.2.0)
   1.1.0

> Fetcher.retrieveOffsetsByTimes() should add all the topics to the metadata 
> refresh topics set.
> --
>
> Key: KAFKA-6489
> URL: https://issues.apache.org/jira/browse/KAFKA-6489
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 1.0.0
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Major
> Fix For: 1.1.0
>
>
> Currently, if users call KafkaConsumer.offsetsForTimes() with a large set of 
> partitions, the consumer will add one topic at a time for the metadata 
> refresh. We should add all the topics to the metadata refresh topics set and 
> just do one metadata refresh.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6489) Fetcher.retrieveOffsetsByTimes() should add all the topics to the metadata refresh topics set.

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348905#comment-16348905
 ] 

ASF GitHub Bot commented on KAFKA-6489:
---

becketqin closed pull request #4478: KAFKA-6489; 
Fetcher.retrieveOffsetsByTimes() should batch the metadata fetch.
URL: https://github.com/apache/kafka/pull/4478
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 5dc0b26f8fb..6d56139118a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -604,11 +604,14 @@ private long endTimestamp() {
 final Map<TopicPartition, Long> timestampsToSearch) {
 // Group the partitions by node.
 final Map<Node, Map<TopicPartition, Long>> timestampsToSearchByNode = 
new HashMap<>();
+// Add the topics to the metadata to do a single metadata fetch.
+for (TopicPartition tp : timestampsToSearch.keySet())
+metadata.add(tp.topic());
+
 for (Map.Entry<TopicPartition, Long> entry: 
timestampsToSearch.entrySet()) {
 TopicPartition tp  = entry.getKey();
 PartitionInfo info = metadata.fetch().partition(tp);
 if (info == null) {
-metadata.add(tp.topic());
 log.debug("Partition {} is unknown for fetching offset, wait 
for metadata refresh", tp);
 return RequestFuture.staleMetadata();
 } else if (info.leader() == null) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java 
b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 8b334729247..d843414fd7a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -198,6 +198,12 @@ public void send(ClientRequest request, long now) {
 if (metadataUpdate == null)
 metadata.update(metadata.fetch(), this.unavailableTopics, 
time.milliseconds());
 else {
+if (metadataUpdate.expectMatchRefreshTopics
+&& 
!metadata.topics().equals(metadataUpdate.cluster.topics())) {
+throw new IllegalStateException("The metadata topics does 
not match expectation. "
++ "Expected topics: " 
+ metadataUpdate.cluster.topics()
++ ", asked topics: " + 
metadata.topics());
+}
 this.unavailableTopics = metadataUpdate.unavailableTopics;
 metadata.update(metadataUpdate.cluster, 
metadataUpdate.unavailableTopics, time.milliseconds());
 }
@@ -344,7 +350,13 @@ public void reset() {
 }
 
 public void prepareMetadataUpdate(Cluster cluster, Set<String> 
unavailableTopics) {
-metadataUpdates.add(new MetadataUpdate(cluster, unavailableTopics));
+metadataUpdates.add(new MetadataUpdate(cluster, unavailableTopics, 
false));
+}
+
+public void prepareMetadataUpdate(Cluster cluster,
+  Set<String> unavailableTopics,
+  boolean expectMatchMetadataTopics) {
+metadataUpdates.add(new MetadataUpdate(cluster, unavailableTopics, 
expectMatchMetadataTopics));
 }
 
 public void setNode(Node node) {
@@ -433,9 +445,11 @@ public void setNodeApiVersions(NodeApiVersions 
nodeApiVersions) {
 private static class MetadataUpdate {
 final Cluster cluster;
 final Set<String> unavailableTopics;
-MetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
+final boolean expectMatchRefreshTopics;
+MetadataUpdate(Cluster cluster, Set<String> unavailableTopics, boolean 
expectMatchRefreshTopics) {
 this.cluster = cluster;
 this.unavailableTopics = unavailableTopics;
+this.expectMatchRefreshTopics = expectMatchRefreshTopics;
 }
 }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 26d7a50cf60..a3ea79356f9 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -18,6 +18,7 @@
 
 import org.apache.kafka.clients.ApiVersions;
 import 

[jira] [Updated] (KAFKA-6253) Improve sink connector topic regex validation

2018-02-01 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-6253:
-
Fix Version/s: (was: 1.2.0)
   1.1.0

> Improve sink connector topic regex validation
> -
>
> Key: KAFKA-6253
> URL: https://issues.apache.org/jira/browse/KAFKA-6253
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Ewen Cheslack-Postava
>Assignee: Jeff Klukas
>Priority: Major
> Fix For: 1.1.0
>
>
> KAFKA-3073 adds topic regex support for sink connectors. The addition 
> requires that you only specify one of topics or topics.regex settings. This 
> is being validated in one place, but not during submission of connectors. We 
> should improve this since this means it's possible to get a bad connector 
> config into the config topic.
> For more detailed discussion, see 
> https://github.com/apache/kafka/pull/4151#pullrequestreview-77300221
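
For context, the mutual exclusivity being validated looks like this in a sink connector config (the connector name and class are placeholders); exactly one of the last two settings may be present:

{code}
# A sink connector may set either "topics" or "topics.regex", never both.
name=example-sink
connector.class=com.example.ExampleSinkConnector
tasks.max=1
topics=orders,payments
# topics.regex=orders-.*
{code}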



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3702) SslTransportLayer.close() does not shutdown gracefully

2018-02-01 Thread John Chu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348894#comment-16348894
 ] 

John Chu commented on KAFKA-3702:
-

Has anybody ever encountered this one? WARN: Failed to send SSL Close message 
java.io.IOException: Unexpected status returned by SSLEngine.wrap, expected 
CLOSED, received OK.

Seems similar, but the message is a little bit different. 

https://issues.apache.org/jira/browse/KAFKA-6510

 

> SslTransportLayer.close() does not shutdown gracefully
> --
>
> Key: KAFKA-3702
> URL: https://issues.apache.org/jira/browse/KAFKA-3702
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
>
> The warning "Failed to send SSL Close message" occurs very frequently when 
> SSL connections are closed. Close should write outbound data and shutdown 
> gracefully.
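
For reference, a graceful close on the JSSE side generally means calling closeOutbound() and then flushing the close_notify bytes produced by wrap() before closing the socket. A minimal sketch of that sequence (handshake handling and timeouts omitted):

{code:java}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;

public final class SslClose {
    // Sketch only: flush a TLS close_notify to the peer before closing the channel.
    public static void closeGracefully(SSLEngine engine, SocketChannel channel) throws IOException {
        ByteBuffer empty = ByteBuffer.allocate(0);
        ByteBuffer netBuffer = ByteBuffer.allocate(engine.getSession().getPacketBufferSize());
        engine.closeOutbound();
        while (!engine.isOutboundDone()) {
            // After closeOutbound() the expected wrap status is CLOSED; an unexpected
            // status is what produces the "Failed to send SSL Close message" warning.
            SSLEngineResult result = engine.wrap(empty, netBuffer);
            if (result.getStatus() != SSLEngineResult.Status.CLOSED
                    && result.getStatus() != SSLEngineResult.Status.OK)
                throw new IOException("Unexpected status from wrap: " + result.getStatus());
            netBuffer.flip();
            while (netBuffer.hasRemaining())
                channel.write(netBuffer);
            netBuffer.clear();
        }
        channel.close();
    }
}
{code}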



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6518) Kafka broker get stops after configured log retention time and throws I/O exception in append to log

2018-02-01 Thread Rajendra Jangir (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajendra Jangir updated KAFKA-6518:
---
Attachment: I_O_Exception_In_Append_To_Log.png

> Kafka broker get stops after configured log retention time and throws  I/O 
> exception in append to log
> -
>
> Key: KAFKA-6518
> URL: https://issues.apache.org/jira/browse/KAFKA-6518
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.10.2.0
> Environment: Windows 7, Java version 9.0.4, kafka version- 0.10.2.0, 
> zookeeper version -3.3.6
>Reporter: Rajendra Jangir
>Priority: Major
> Fix For: 0.10.0.2
>
> Attachments: I_O_Exception_In_Append_To_Log.png
>
>
> I am facing a serious issue in Kafka. I have multiple Kafka brokers, and I 
> am producing approximately 100-200 messages per second through a Python script. 
> Initially it works fine without any exception. When the log retention 
> time (5 minutes in my case) is reached, it throws an I/O exception in append to 
> log and finally the Kafka broker stops. It throws the following error -
> [2018-02-01 17:18:39,168] FATAL [Replica Manager on Broker 3]: Halting due to 
> unrecoverable I/O error while handling produce request: 
> (kafka.server.ReplicaManager)
> kafka.common.KafkaStorageException: I/O exception in append to log 
> 'rjangir1-22'
>  at kafka.log.Log.append(Log.scala:349)
>  at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:443)
>  at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:429)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>  at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:240)
>  at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:429)
>  at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:407)
>  at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:393)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>  at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>  at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>  at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>  at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>  at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>  at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:393)
>  at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:330)
>  at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:416)
>  at kafka.server.KafkaApis.handle(KafkaApis.scala:79)
>  at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>  at java.base/java.lang.Thread.run(Thread.java:844)
> Caused by: java.io.IOException: The requested operation cannot be performed 
> on a file with a user-mapped section open
>  at java.base/java.io.RandomAccessFile.setLength(Native Method)
>  at kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:125)
>  at kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:116)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>  at kafka.log.AbstractIndex.resize(AbstractIndex.scala:116)
>  at 
> kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(AbstractIndex.scala:175)
>  at 
> kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:175)
>  at 
> kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:175)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>  at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:174)
>  at kafka.log.Log.roll(Log.scala:774)
>  at kafka.log.Log.maybeRoll(Log.scala:745)
>  at kafka.log.Log.append(Log.scala:405)
>  ... 22 more
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6518) Kafka broker get stops after configured log retention time and throws I/O exception in append to log

2018-02-01 Thread Rajendra Jangir (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajendra Jangir updated KAFKA-6518:
---
Attachment: (was: AppendToLogError.PNG)

> Kafka broker get stops after configured log retention time and throws  I/O 
> exception in append to log
> -
>
> Key: KAFKA-6518
> URL: https://issues.apache.org/jira/browse/KAFKA-6518
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.10.2.0
> Environment: Windows 7, Java version 9.0.4, kafka version- 0.10.2.0, 
> zookeeper version -3.3.6
>Reporter: Rajendra Jangir
>Priority: Major
> Fix For: 0.10.0.2
>
>
> I am facing a serious issue in Kafka. I have multiple Kafka brokers, and I 
> am producing approximately 100-200 messages per second through a Python script. 
> Initially it works fine without any exception. When the log retention 
> time (5 minutes in my case) is reached, it throws an I/O exception in append to 
> log and finally the Kafka broker stops. It throws the following error -
> [2018-02-01 17:18:39,168] FATAL [Replica Manager on Broker 3]: Halting due to 
> unrecoverable I/O error while handling produce request: 
> (kafka.server.ReplicaManager)
> kafka.common.KafkaStorageException: I/O exception in append to log 
> 'rjangir1-22'
>  at kafka.log.Log.append(Log.scala:349)
>  at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:443)
>  at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:429)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>  at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:240)
>  at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:429)
>  at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:407)
>  at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:393)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>  at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>  at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>  at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>  at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>  at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>  at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:393)
>  at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:330)
>  at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:416)
>  at kafka.server.KafkaApis.handle(KafkaApis.scala:79)
>  at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>  at java.base/java.lang.Thread.run(Thread.java:844)
> Caused by: java.io.IOException: The requested operation cannot be performed 
> on a file with a user-mapped section open
>  at java.base/java.io.RandomAccessFile.setLength(Native Method)
>  at kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:125)
>  at kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:116)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>  at kafka.log.AbstractIndex.resize(AbstractIndex.scala:116)
>  at 
> kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(AbstractIndex.scala:175)
>  at 
> kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:175)
>  at 
> kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:175)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>  at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:174)
>  at kafka.log.Log.roll(Log.scala:774)
>  at kafka.log.Log.maybeRoll(Log.scala:745)
>  at kafka.log.Log.append(Log.scala:405)
>  ... 22 more
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics

2018-02-01 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348685#comment-16348685
 ] 

Randall Hauch commented on KAFKA-3821:
--

See also KAFKA-6080, which allows a source connector to expose transaction 
boundaries via new {{SourceRecord}} subclasses. That fits nicely with my 
earlier suggestion to use an {{OffsetRecord}} subclass.
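
To make that concrete, a purely hypothetical sketch of what an offset-only record could look like from a task's point of view is below. Neither the {{OffsetRecord}} class nor support for records without a topic exists in Connect today; this only illustrates the shape of the proposal:

{code:java}
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class SnapshotSourceTask extends SourceTask {

    // Hypothetical subclass: carries only a source partition and offset, no topic/key/value.
    public static class OffsetRecord extends SourceRecord {
        public OffsetRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
            super(sourcePartition, sourceOffset, null, null, null, null);
        }
    }

    @Override
    public List<SourceRecord> poll() {
        // When a snapshot completes there may be no more data records to emit, but the
        // task still wants the framework to commit a final offset. Today the framework
        // has no way to handle a record like this; that is the gap described above.
        Map<String, String> partition = Collections.singletonMap("server", "db1");
        Map<String, Object> offset = Collections.singletonMap("snapshot_complete", true);
        return Collections.singletonList(new OffsetRecord(partition, offset));
    }

    @Override
    public void start(Map<String, String> props) { }

    @Override
    public void stop() { }

    @Override
    public String version() { return "0.1"; }
}
{code}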

> Allow Kafka Connect source tasks to produce offset without writing to topics
> 
>
> Key: KAFKA-3821
> URL: https://issues.apache.org/jira/browse/KAFKA-3821
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>Priority: Major
>  Labels: needs-kip
> Fix For: 1.2.0
>
>
> Provide a way for a {{SourceTask}} implementation to record a new offset for 
> a given partition without necessarily writing a source record to a topic.
> Consider a connector task that uses the same offset when producing an unknown 
> number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a 
> database). Once the task completes those records, the connector wants to 
> update the offsets (e.g., the snapshot is complete) but has no more records 
> to be written to a topic. With this change, the task could simply supply an 
> updated offset.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics

2018-02-01 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348676#comment-16348676
 ] 

Randall Hauch commented on KAFKA-3821:
--

[~gunnar.morling], that'd be great. The 
[CONTRIBUTING.md|https://github.com/apache/kafka/blob/trunk/CONTRIBUTING.md] 
file in the AK git repo has the links to the documentation about how/when to 
create a KIP and what the process is. Let me know if that's not enough.

> Allow Kafka Connect source tasks to produce offset without writing to topics
> 
>
> Key: KAFKA-3821
> URL: https://issues.apache.org/jira/browse/KAFKA-3821
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>Priority: Major
>  Labels: needs-kip
> Fix For: 1.2.0
>
>
> Provide a way for a {{SourceTask}} implementation to record a new offset for 
> a given partition without necessarily writing a source record to a topic.
> Consider a connector task that uses the same offset when producing an unknown 
> number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a 
> database). Once the task completes those records, the connector wants to 
> update the offsets (e.g., the snapshot is complete) but has no more records 
> to be written to a topic. With this change, the task could simply supply an 
> updated offset.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6518) Kafka broker get stops after configured log retention time and throws I/O exception in append to log

2018-02-01 Thread Rajendra Jangir (JIRA)
Rajendra Jangir created KAFKA-6518:
--

 Summary: Kafka broker get stops after configured log retention 
time and throws  I/O exception in append to log
 Key: KAFKA-6518
 URL: https://issues.apache.org/jira/browse/KAFKA-6518
 Project: Kafka
  Issue Type: Bug
  Components: controller
Affects Versions: 0.10.2.0
 Environment: Windows 7, Java version 9.0.4, kafka version- 0.10.2.0, 
zookeeper version -3.3.6
Reporter: Rajendra Jangir
 Fix For: 0.10.0.2
 Attachments: AppendToLogError.PNG

I am facing a serious issue in Kafka. I have multiple Kafka brokers, and I am 
producing approximately 100-200 messages per second through a Python script. 
Initially it works fine without any exception. When the log retention time (5 
minutes in my case) is reached, it throws an I/O exception in append to log and 
finally the Kafka broker stops. It throws the following error -

[2018-02-01 17:18:39,168] FATAL [Replica Manager on Broker 3]: Halting due to 
unrecoverable I/O error while handling produce request: 
(kafka.server.ReplicaManager)
kafka.common.KafkaStorageException: I/O exception in append to log 'rjangir1-22'
 at kafka.log.Log.append(Log.scala:349)
 at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:443)
 at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:429)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
 at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:240)
 at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:429)
 at 
kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:407)
 at 
kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:393)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
 at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.AbstractTraversable.map(Traversable.scala:105)
 at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:393)
 at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:330)
 at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:416)
 at kafka.server.KafkaApis.handle(KafkaApis.scala:79)
 at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
 at java.base/java.lang.Thread.run(Thread.java:844)
Caused by: java.io.IOException: The requested operation cannot be performed on 
a file with a user-mapped section open
 at java.base/java.io.RandomAccessFile.setLength(Native Method)
 at kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:125)
 at kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:116)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
 at kafka.log.AbstractIndex.resize(AbstractIndex.scala:116)
 at 
kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(AbstractIndex.scala:175)
 at 
kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:175)
 at 
kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:175)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
 at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:174)
 at kafka.log.Log.roll(Log.scala:774)
 at kafka.log.Log.maybeRoll(Log.scala:745)
 at kafka.log.Log.append(Log.scala:405)
 ... 22 more

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6516) KafkaProducer retries indefinitely to authenticate on SaslAuthenticationException

2018-02-01 Thread Edoardo Comar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edoardo Comar resolved KAFKA-6516.
--
Resolution: Won't Fix

Thanks, [~rsivaram], so my expectation was invalid then.

> KafkaProducer retries indefinitely to authenticate on 
> SaslAuthenticationException
> -
>
> Key: KAFKA-6516
> URL: https://issues.apache.org/jira/browse/KAFKA-6516
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0
>Reporter: Edoardo Comar
>Priority: Major
>
> Even after https://issues.apache.org/jira/browse/KAFKA-5854
> the producer's (background) polling thread keeps retrying to authenticate
> if the future returned by KafkaProducer.send is not resolved.
> The current test
> {{org.apache.kafka.common.security.authenticator.ClientAuthenticationFailureTest.testProducerWithInvalidCredentials()}}
> passes because it relies on the future being resolved.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6517) ZooKeeperClient holds a lock while waiting for responses, blocking shutdown

2018-02-01 Thread Rajini Sivaram (JIRA)
Rajini Sivaram created KAFKA-6517:
-

 Summary: ZooKeeperClient holds a lock while waiting for responses, 
blocking shutdown
 Key: KAFKA-6517
 URL: https://issues.apache.org/jira/browse/KAFKA-6517
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 1.1.0
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
 Fix For: 1.1.0


Stack traces from a local test run that was deadlocked because shutdown 
couldn't acquire the lock:
 # kafka-scheduler-7: acquired read lock in 
kafka.zookeeper.ZooKeeperClient.handleRequests
 # Test worker-EventThread waiting for write lock to process SessionExpired in 
kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$.process
 # ForkJoinPool-1-worker-11 processing KafkaServer.shutdown is queued behind 2) 
waiting to acquire read lock for 
kafka.zookeeper.ZooKeeperClient.unregisterStateChangeHandler
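
The sequence above reduces to a general read/write lock pattern: a reader that blocks on a latch while holding the read lock starves a queued writer, and any later reader then queues behind that writer. A standalone sketch of that pattern (not the ZooKeeperClient code itself):

{code:java}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockStall {
    public static void main(String[] args) throws Exception {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        CountDownLatch responseLatch = new CountDownLatch(1); // never counted down, like a lost ZK response

        // 1. "handleRequests": holds the read lock while waiting for responses.
        Thread requester = new Thread(() -> {
            lock.readLock().lock();
            try {
                responseLatch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.readLock().unlock();
            }
        });
        requester.start();
        Thread.sleep(100); // let the read lock be acquired

        // 2. "session expiry handler": queues for the write lock behind the reader.
        Thread expiryHandler = new Thread(() -> {
            lock.writeLock().lock();
            lock.writeLock().unlock();
        });
        expiryHandler.start();
        Thread.sleep(100); // let the writer queue up

        // 3. "shutdown": even a read lock cannot be acquired now, because new readers
        //    queue behind the waiting writer.
        boolean acquired = lock.readLock().tryLock(1, TimeUnit.SECONDS);
        System.out.println("shutdown acquired read lock: " + acquired); // prints false

        requester.interrupt(); // unblock the sketch so the JVM can exit
    }
}
{code}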

Stack traces of the relevant threads:

{quote}
"kafka-scheduler-7" daemon prio=5 tid=0x7fade918d800 nid=0xd317 waiting on 
condition [0x7b371000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x0007e4c6e698> (a 
java.util.concurrent.CountDownLatch$Sync)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
    at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
    at 
kafka.zookeeper.ZooKeeperClient$$anonfun$handleRequests$1.apply(ZooKeeperClient.scala:146)
    at 
kafka.zookeeper.ZooKeeperClient$$anonfun$handleRequests$1.apply(ZooKeeperClient.scala:126)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
    at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:256)
    at 
kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:125)
    at 
kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1432)
    at 
kafka.zk.KafkaZkClient.kafka$zk$KafkaZkClient$$retryRequestUntilConnected(KafkaZkClient.scala:1425)
    at kafka.zk.KafkaZkClient.conditionalUpdatePath(KafkaZkClient.scala:583)
    at 
kafka.utils.ReplicationUtils$.updateLeaderAndIsr(ReplicationUtils.scala:33)
    at 
kafka.cluster.Partition.kafka$cluster$Partition$$updateIsr(Partition.scala:665)
    at kafka.cluster.Partition$$anonfun$4.apply$mcZ$sp(Partition.scala:509)
    at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:500)
    at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:500)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
    at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:258)
    at kafka.cluster.Partition.maybeShrinkIsr(Partition.scala:499)
    at 
kafka.server.ReplicaManager$$anonfun$kafka$server$ReplicaManager$$maybeShrinkIsr$2.apply(ReplicaManager.scala:1335)
    at 
kafka.server.ReplicaManager$$anonfun$kafka$server$ReplicaManager$$maybeShrinkIsr$2.apply(ReplicaManager.scala:1335)

..

"Test worker-EventThread" daemon prio=5 tid=0x7fade90cf800 nid=0xef13 
waiting on condition [0x7a23f000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000781847620> (a 
java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1197)
    at 
java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:945)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
    at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:258)
    at 
kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$.process(ZooKeeperClient.scala:355)
    at 
org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:531)
    at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:506)

 

"ForkJoinPool-1-worker-11" daemon prio=5 tid=0x7fade9a83000 nid=0x17907 
waiting on condition [0x700011eaf000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  

[jira] [Commented] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics

2018-02-01 Thread Gunnar Morling (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348515#comment-16348515
 ] 

Gunnar Morling commented on KAFKA-3821:
---

This is a feature which would be very helpful for the Debezium project. If no 
one else is working on it yet, I'd like to give it a try. Is there a 
description of the process to follow? I.e. where could I find some information 
on how I would go about proposing a KIP for this?

> Allow Kafka Connect source tasks to produce offset without writing to topics
> 
>
> Key: KAFKA-3821
> URL: https://issues.apache.org/jira/browse/KAFKA-3821
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>Priority: Major
>  Labels: needs-kip
> Fix For: 1.2.0
>
>
> Provide a way for a {{SourceTask}} implementation to record a new offset for 
> a given partition without necessarily writing a source record to a topic.
> Consider a connector task that uses the same offset when producing an unknown 
> number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a 
> database). Once the task completes those records, the connector wants to 
> update the offsets (e.g., the snapshot is complete) but has no more records 
> to be written to a topic. With this change, the task could simply supply an 
> updated offset.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6516) KafkaProducer retries indefinitely to authenticate on SaslAuthenticationException

2018-02-01 Thread Rajini Sivaram (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348499#comment-16348499
 ] 

Rajini Sivaram commented on KAFKA-6516:
---

[~ecomar] We don't stop the sender thread if authentication fails. The 
intention was to provide feedback via an exception if the producer was 
configured with invalid credentials. But we don't close the producer - we 
expect that the application will see the exception and close the producer and 
hence the sender thread is left running until the producer is closed.

If authentication fails because a producer makes a connection before its 
credentials are available on a broker, we throw an exception to indicate that 
authentication failed, but we allow the application to retry. This should work 
because we don't close the producer and the sender thread is still active.
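
In other words, the feedback arrives through the send future or callback, and closing (and possibly recreating) the producer is left to the application. A minimal sketch of that handling (broker address, SASL settings and topic are placeholders):

{code:java}
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.serialization.StringSerializer;

public class AuthFailureHandling {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9093");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // SASL settings omitted; assume credentials the broker may not recognize yet.

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            RecordMetadata metadata =
                    producer.send(new ProducerRecord<>("test-topic", "key", "value")).get();
            System.out.println("Sent: " + metadata);
        } catch (ExecutionException e) {
            if (e.getCause() instanceof SaslAuthenticationException) {
                // The sender thread keeps running; it is the application's job to close
                // the producer here, or to retry once the credentials become available.
                System.err.println("Authentication failed: " + e.getCause().getMessage());
            }
        } finally {
            producer.close();
        }
    }
}
{code}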

 

 

> KafkaProducer retries indefinitely to authenticate on 
> SaslAuthenticationException
> -
>
> Key: KAFKA-6516
> URL: https://issues.apache.org/jira/browse/KAFKA-6516
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0
>Reporter: Edoardo Comar
>Priority: Major
>
> Even after https://issues.apache.org/jira/browse/KAFKA-5854
> the producer's (background) polling thread keeps retrying to authenticate
> if the future returned by KafkaProducer.send is not resolved.
> The current test
> {{org.apache.kafka.common.security.authenticator.ClientAuthenticationFailureTest.testProducerWithInvalidCredentials()}}
> passes because it relies on the future being resolved.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5327) Console Consumer should only poll for up to max messages

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5327:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> Console Consumer should only poll for up to max messages
> 
>
> Key: KAFKA-5327
> URL: https://issues.apache.org/jira/browse/KAFKA-5327
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Dustin Cote
>Assignee: huxihx
>Priority: Minor
> Fix For: 1.2.0
>
>
> The ConsoleConsumer has a --max-messages flag that can be used to limit the 
> number of messages consumed. However, the number of records actually consumed 
> is governed by max.poll.records. This means you see one message on the 
> console, but your offset has moved forward a default of 500, which is kind of 
> counterintuitive. It would be good to only commit offsets for messages we 
> have printed to the console.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5792) Transient failure in KafkaAdminClientTest.testHandleTimeout

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5792:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> Transient failure in KafkaAdminClientTest.testHandleTimeout
> ---
>
> Key: KAFKA-5792
> URL: https://issues.apache.org/jira/browse/KAFKA-5792
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apurva Mehta
>Assignee: Colin P. McCabe
>Priority: Major
>  Labels: transient-unit-test-failure
> Fix For: 1.2.0
>
>
> The {{KafkaAdminClientTest.testHandleTimeout}} test occasionally fails with 
> the following:
> {noformat}
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node 
> assignment.
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:213)
>   at 
> org.apache.kafka.clients.admin.KafkaAdminClientTest.testHandleTimeout(KafkaAdminClientTest.java:356)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting 
> for a node assignment.
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6240) Support dynamic updates of frequently updated broker configs

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-6240:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> Support dynamic updates of frequently updated broker configs
> 
>
> Key: KAFKA-6240
> URL: https://issues.apache.org/jira/browse/KAFKA-6240
> Project: Kafka
>  Issue Type: New Feature
>  Components: core
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 1.2.0
>
>
> See 
> [KIP-226|https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration]
>  for details.
> Implementation will be done under sub-tasks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6007) Connect can't validate against transforms in plugins.path

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-6007:
--
Fix Version/s: (was: 1.0.2)
   (was: 0.11.0.3)
   (was: 1.1.0)
   1.2.0

> Connect can't validate against transforms in plugins.path
> -
>
> Key: KAFKA-6007
> URL: https://issues.apache.org/jira/browse/KAFKA-6007
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0, 0.11.0.1, 1.0.0
>Reporter: Stephane Maarek
>Assignee: Konstantine Karantasis
>Priority: Major
> Fix For: 1.2.0
>
>
> Kafka Connect can't validate a custom transformation if placed in plugins 
> path.
> Here's the output I get on the validate call:
> {code:java}
> Invalid value com.mycorp.kafka.transforms.impl.FlattenSinkRecord for 
> configuration transforms.Flat.type: Class 
> com.mycorp.kafka.transforms.impl.FlattenSinkRecord could not be found.
> Invalid value null for configuration transforms.Flat.type: Not a 
> Transformation
> "recommended_values": [   
> "com.mycorp.kafka.transforms.Flatten$Key",
> "com.mycorp.kafka.transforms.Flatten$Value",
> "com.mycorp.kafka.transforms.impl.FlattenSinkRecord",
> "org.apache.kafka.connect.transforms.Cast$Key",
> "org.apache.kafka.connect.transforms.Cast$Value",
> "org.apache.kafka.connect.transforms.ExtractField$Key",
> "org.apache.kafka.connect.transforms.ExtractField$Value",
> "org.apache.kafka.connect.transforms.Flatten$Key",
> "org.apache.kafka.connect.transforms.Flatten$Value",
> "org.apache.kafka.connect.transforms.HoistField$Key",
> "org.apache.kafka.connect.transforms.HoistField$Value",
> "org.apache.kafka.connect.transforms.InsertField$Key",
> "org.apache.kafka.connect.transforms.InsertField$Value",
> "org.apache.kafka.connect.transforms.MaskField$Key",
> "org.apache.kafka.connect.transforms.MaskField$Value",
> "org.apache.kafka.connect.transforms.RegexRouter",
> "org.apache.kafka.connect.transforms.ReplaceField$Key",
> "org.apache.kafka.connect.transforms.ReplaceField$Value",
> "org.apache.kafka.connect.transforms.SetSchemaMetadata$Key",
> "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
> "org.apache.kafka.connect.transforms.TimestampConverter$Key",
> "org.apache.kafka.connect.transforms.TimestampConverter$Value",
> "org.apache.kafka.connect.transforms.TimestampRouter",
> "org.apache.kafka.connect.transforms.ValueToKey"],
> {code}
> As you can see, the class appears in the recommended values (!) but can't be 
> picked up on the validate call. 
> I believe it's because the recommender implements class discovery using 
> plugins:
> https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L194
> But the class inference itself doesn't:
> https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L199
> (I'm not an expert in class loading though, just a guess... Unsure how to fix)
> A quick fix is to add the transformations to the classpath itself, but that 
> defeats the point a bit. 
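
As an illustration of the classloading difference guessed at above (the plugin jar path and class name are made up, and this is not Connect's actual plugin loader), resolving the class through the plugin's own classloader succeeds where a plain classpath lookup fails:

{code:java}
import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;

public class PluginLoadingSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical plugin jar, similar to an entry under plugin.path
        URL pluginJar = new File("/opt/connect/plugins/my-transforms.jar").toURI().toURL();
        ClassLoader pluginLoader =
                new URLClassLoader(new URL[]{pluginJar}, PluginLoadingSketch.class.getClassLoader());

        String className = "com.mycorp.kafka.transforms.impl.FlattenSinkRecord";

        // Resolving against the plugin classloader works (this is what discovery does)...
        Class<?> viaPlugin = Class.forName(className, false, pluginLoader);
        System.out.println("plugin loader found " + viaPlugin.getName());

        // ...while resolving against the worker's classpath fails unless the jar is on it,
        // which matches the "could not be found" error seen during validation.
        try {
            Class.forName(className);
        } catch (ClassNotFoundException e) {
            System.out.println("classpath lookup failed: " + e.getMessage());
        }
    }
}
{code}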



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6476) Document dynamic config update

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-6476:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> Document dynamic config update
> --
>
> Key: KAFKA-6476
> URL: https://issues.apache.org/jira/browse/KAFKA-6476
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core, documentation
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 1.2.0
>
>
> Add documentation for dynamic broker config update.
> Include:
>   - Command line options for kafka-configs.sh with examples
>   - Configs that can be updated along with constraints applied



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6452) Add documentation for delegation token authentication mechanism

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-6452:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> Add documentation for delegation token authentication mechanism
> ---
>
> Key: KAFKA-6452
> URL: https://issues.apache.org/jira/browse/KAFKA-6452
> Project: Kafka
>  Issue Type: Sub-task
>  Components: documentation
>Reporter: Manikumar
>Assignee: Manikumar
>Priority: Major
> Fix For: 1.2.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6111) Tests for KafkaZkClient

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-6111:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> Tests for KafkaZkClient
> ---
>
> Key: KAFKA-6111
> URL: https://issues.apache.org/jira/browse/KAFKA-6111
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Priority: Major
> Fix For: 1.2.0
>
>
> Some methods in KafkaZkClient have no tests at the moment and we need to fix 
> that.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5834) AbstractConfig.logUnused() may log confusing warning information.

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5834:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> AbstractConfig.logUnused() may log confusing warning information.
> -
>
> Key: KAFKA-5834
> URL: https://issues.apache.org/jira/browse/KAFKA-5834
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Reporter: Jiangjie Qin
>Priority: Major
> Fix For: 1.2.0
>
>
> Currently {{AbstractConfig.logUnused()}} logs unused configurations at 
> WARN level. It is a little weird because as long as there is a configurable 
> class taking a configuration, that configuration will be logged as unused at 
> WARN level even if it is actually used. It seems better to make it an INFO 
> level logging instead, or maybe it can take a log level argument to allow 
> caller to decide which log level should be used.
> [~hachikuji] [~ijuma] what do you think?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6492) LogSemgent.truncateTo() should always resize the index file

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-6492:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> LogSemgent.truncateTo() should always resize the index file
> ---
>
> Key: KAFKA-6492
> URL: https://issues.apache.org/jira/browse/KAFKA-6492
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.2, 0.10.1.1, 0.10.2.1, 1.0.0, 0.11.0.2
>Reporter: Jiangjie Qin
>Priority: Major
> Fix For: 1.2.0
>
>
> The bug is the following:
>  # Initially on a follower broker there are two segments: segment 0 and segment 
> 1. Segment 0 is empty (maybe due to log compaction).
>  # The log is truncated to 0.
>  # LogSegment.truncateTo() will not find a message to truncate in segment 0, so 
> it will skip resizing the index/timeindex files. 
>  # When a new message is fetched, Log.maybeRoll() will try to roll a new 
> segment because the index file of segment 0 is already full (max size is 0).
>  # After creating the new segment 0, the replica fetcher thread finds that a 
> segment 0 already exists, so it just throws an exception and dies.
> The fix would be to let the broker make sure the index files of active segments 
> are always resized properly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5857) Excessive heap usage on controller node during reassignment

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5857:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> Excessive heap usage on controller node during reassignment
> ---
>
> Key: KAFKA-5857
> URL: https://issues.apache.org/jira/browse/KAFKA-5857
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.11.0.0
> Environment: CentOs 7, Java 1.8
>Reporter: Raoufeh Hashemian
>Priority: Major
>  Labels: reliability
> Fix For: 1.2.0
>
> Attachments: CPU.png, disk_write_x.png, memory.png, 
> reassignment_plan.txt
>
>
> I was trying to expand our kafka cluster of 6 broker nodes to 12 broker 
> nodes. 
> Before expansion, we had a single topic with 960 partitions and a replication 
> factor of 3. So each node had 480 partitions. The size of data in each node 
> was 3TB . 
> To do the expansion, I submitted a partition reassignment plan (see attached 
> file for the current/new assignments). The plan was optimized to minimize 
> data movement and be rack aware. 
> When I submitted the plan, it took approximately 3 hours for moving data from 
> old to new nodes to complete. After that, it started deleting source 
> partitions (I say this based on the number of file descriptors) and 
> rebalancing leaders which has not been successful. Meanwhile, the heap usage 
> in the controller node started to go up with a large slope (along with long 
> GC times) and it took 5 hours for the controller to go out of memory and 
> another controller started to have the same behaviour for another 4 hours. At 
> this time the zookeeper ran out of disk and the service stopped.
> To recover from this condition:
> 1) Removed zk logs to free up disk and restarted all 3 zk nodes
> 2) Deleted /kafka/admin/reassign_partitions node from zk
> 3) Had to do unclean restarts of the Kafka service on the OOM controller nodes, 
> which took 3 hours to complete. After this stage there were still 676 under 
> replicated partitions.
> 4) Do a clean restart on all 12 broker nodes.
> After step 4, the number of under replicated partitions went to 0.
> So I was wondering if this memory footprint from the controller is expected for 
> 1k partitions? Did we do something wrong, or is it a bug?
> Attached are some resource usage graph during this 30 hours event and the 
> reassignment plan. I'll try to add log files as well



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5870) Idempotent producer: a producerId reset causes undesirable behavior for inflight batches to other partitions

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5870:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> Idempotent producer: a producerId reset causes undesirable behavior for 
> inflight batches to other partitions
> 
>
> Key: KAFKA-5870
> URL: https://issues.apache.org/jira/browse/KAFKA-5870
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0
>Reporter: Apurva Mehta
>Assignee: Apurva Mehta
>Priority: Major
>  Labels: exactly-once
> Fix For: 1.2.0
>
>
> Currently, if we have to reset the producer id for any reason (for instance 
> if batches to a partition get expired, if we get an 
> {{OutOfOrderSequenceException}}, etc) we could cause batches to other 
> --healthy-- partitions to fail with a spurious 
> {{OutOfOrderSequenceException}}.
> This is detailed in this PR discussion: 
> https://github.com/apache/kafka/pull/3743#discussion_r137907630
> Ideally, we would want all inflight batches to be handled to completion 
> rather than potentially failing them prematurely. Further, since we want to 
> tighten up the semantics of the {{OutOfOrderSequenceException}}, at the very 
> least we should raise another exception in this case, because there is no 
> data loss on the broker when the client gives up. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6292) KafkaConsumer ran into Unknown error fetching data for topic-partition caused by integer overflow in FileLogInputStream

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-6292:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> KafkaConsumer ran into Unknown error fetching data for topic-partition caused 
> by integer overflow in FileLogInputStream 
> 
>
> Key: KAFKA-6292
> URL: https://issues.apache.org/jira/browse/KAFKA-6292
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.11.0.0, 0.11.0.1, 1.0.0, 0.11.0.2
> Environment: OS:Red Hat Enterprise Linux Server release 7.3 (Maipo)
> Kafka: kafka_2.12-0.11.0.0
> JDK: jdk1.8.0_121
>Reporter: Terence Yi
>Priority: Major
>  Labels: easyfix, reliability
> Fix For: 1.2.0
>
>
> Steps to reproduce:
> * Broker config to reproduce this bug:
> {code:java}
>   # The maximum size of a log segment file. When this size is reached a new 
> log segment will be created.
> #2G
> log.segment.bytes=2147483647
> {code}
> * Setups:
> producer sends messages constantly. 
> consumer polling
> topic has 1 partition and replication factor 1.
> min.insync.replicas=1
> producer has "acks=all"
> consumer has default "enable.auto.commit=false"
> consumer manually commitSync offsets after handling messages.
> kafka in standalone
> * Observe the log on the consumer side (for me, after running for 12 hours)
> {code:java}
> 2017-12-18 07:11:01.013 WARN sep105v1 
> [app-consumer-subscription-pool-4-thread-20] 
> org.apache.kafka.clients.consumer.internals.Fetcher {} Unknown error fetching 
> data for topic-partition DDI.DISPATCHER.P_TVIN.W_SL.P_appx.P_ul.P_pos-0
> {code}
> * Observe server.log in Kafka/logs
> {code:java}
> [2017-12-14 04:52:21,144] ERROR [Replica Manager on Broker 3]: Error 
> processing fetch operation on partition 
> DDI.DISPATCHER.P_TVIN.W_SL.P_appx.P_ul.P_pos-0, offset 4043314339 
> (kafka.server.ReplicaManager)
> org.apache.kafka.common.KafkaException: java.io.EOFException: Failed to read 
> `log header` from file channel `sun.nio.ch.FileChannelImpl@5604ea91`. 
> Expected to read 17 bytes, but reached end of file after reading 0 bytes. 
> Started read from position 2147483643.
> at 
> org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:40)
> at 
> org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:24)
> at 
> org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
> at 
> org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
> at 
> org.apache.kafka.common.record.FileRecords.searchForOffsetWithSize(FileRecords.java:279)
> at kafka.log.LogSegment.translateOffset(LogSegment.scala:176)
> at kafka.log.LogSegment.read(LogSegment.scala:228)
> at kafka.log.Log.read(Log.scala:938)
> at kafka.server.ReplicaManager.read$1(ReplicaManager.scala:719)
> at 
> kafka.server.ReplicaManager.$anonfun$readFromLocalLog$6(ReplicaManager.scala:780)
> at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:59)
> at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:52)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:779)
> at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:617)
> at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:615)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:98)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.EOFException: Failed to read `log header` from file 
> channel `sun.nio.ch.FileChannelImpl@5604ea91`. Expected to read 17 bytes, but 
> reached end of file after reading 0 bytes. Started read from position 
> 2147483643.
> at org.apache.kafka.common.utils.Utils.readFullyOrFail(Utils.java:751)
> at 
> org.apache.kafka.common.record.FileLogInputStream.nextBatch(FileLogInputStream.java:66)
> at 
> org.apache.kafka.common.record.FileLogInputStream.nextBatch(FileLogInputStream.java:40)
> at 
> org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:35)
> ... 18 more
> {code}
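
For illustration, the 32-bit arithmetic pointed at by the summary: with log.segment.bytes at Integer.MAX_VALUE, a read position near the end of the segment plus the 17-byte log header overflows an int. A sketch of the arithmetic only, not the actual FileLogInputStream code:

{code:java}
public class PositionOverflow {
    public static void main(String[] args) {
        int position = 2147483643;       // the position from the error message
        int headerSize = 17;             // "Expected to read 17 bytes"

        int end = position + headerSize; // silently wraps around
        System.out.println(end);         // -2147483636

        long safeEnd = (long) position + headerSize;
        System.out.println(safeEnd);     // 2147483660, past Integer.MAX_VALUE

        // Math.addExact would at least fail loudly instead of wrapping:
        try {
            Math.addExact(position, headerSize);
        } catch (ArithmeticException e) {
            System.out.println("overflow detected: " + e.getMessage());
        }
    }
}
{code}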
> * Impact:
> # After the EOF exception occurs, the consumer fails to consume the remaining 
> messages.
> # After the segment log files which caused the EOF exception have been deleted 
> by the log cleaner thread, the consumer recovers and consumes messages again.
> # No impact from the producer's point of view.
> * Analysis:

[jira] [Updated] (KAFKA-6463) Review logging level for user errors in AdminManager

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-6463:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> Review logging level for user errors in AdminManager
> 
>
> Key: KAFKA-6463
> URL: https://issues.apache.org/jira/browse/KAFKA-6463
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Rajini Sivaram
>Priority: Major
> Fix For: 1.2.0
>
>
> AdminManager currently logs errors due to bad requests at INFO level (e.g. 
> alter configs with bad value). In other components, I think we only log user 
> errors are either not logged or logged at a lower logging level. We should 
> review logging in AdminManager.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6342) Move workaround for JSON parsing of non-escaped strings

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-6342:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> Move workaround for JSON parsing of non-escaped strings
> ---
>
> Key: KAFKA-6342
> URL: https://issues.apache.org/jira/browse/KAFKA-6342
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Rajini Sivaram
>Assignee: Umesh Chaudhary
>Priority: Major
> Fix For: 1.2.0
>
>
> KAFKA-6319 added a workaround to parse invalid JSON persisted using older 
> versions of Kafka because special characters were not escaped. The workaround 
> is required in 1.0.1 to enable parsing invalid JSON from ACL configs in 
> ZooKeeper. We can move the workaround out of kafka.utils.Json#parseFull for 
> 1.1.0 so that it is applied only to ACLs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6028) Improve the quota throttle communication.

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-6028:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> Improve the quota throttle communication.
> -
>
> Key: KAFKA-6028
> URL: https://issues.apache.org/jira/browse/KAFKA-6028
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Affects Versions: 1.0.0
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Major
> Fix For: 1.2.0
>
>
> Currently, if a client is throttled due to a quota violation, the broker will 
> only send back a response to the client after the throttle time has passed. 
> In this case, the client doesn't know how long the response will be throttled 
> and might hit the request timeout before the response is returned. As a result, 
> the client will retry sending the request, which results in an even longer 
> throttle time.
> The above scenario can happen when a large group of clients is sending records 
> to the brokers. We saw this when a MapReduce job pushed data to the Kafka 
> cluster.
> To improve this, the broker can return the response with throttle time 
> immediately after processing the requests. After that, the broker will mute 
> the channel for this client. A correct client implementation should back off 
> for that long before sending the next request. If the client ignores the 
> throttle time and sends the next request immediately, the channel will be 
> muted and the request won't be processed until the throttle time has passed.
> A KIP will follow with more details.
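A client cannot see the broker-side muting described above, but it can observe
throttling through the producer's built-in metrics and back off on its own. A
minimal sketch, assuming the standard "produce-throttle-time-avg" producer
metric name and placeholder broker/topic names; this illustrates the intended
client behaviour, not the KIP implementation:

{code:java}
// Sketch: observe produce throttling via client metrics and back off before
// sending more data. Illustrative only.
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public class ThrottleAwareSender {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 1000; i++) {
                producer.send(new ProducerRecord<>("test", "value-" + i));
                double throttleMs = avgThrottleMs(producer.metrics());
                if (throttleMs > 0) {
                    // Back off roughly as long as the broker reported throttling,
                    // instead of immediately piling on more requests.
                    Thread.sleep((long) throttleMs);
                }
            }
        }
    }

    private static double avgThrottleMs(Map<MetricName, ? extends Metric> metrics) {
        for (Map.Entry<MetricName, ? extends Metric> e : metrics.entrySet()) {
            if ("produce-throttle-time-avg".equals(e.getKey().name())) {
                double v = e.getValue().value();
                return Double.isNaN(v) ? 0.0 : v;
            }
        }
        return 0.0;
    }
}
{code}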



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6415) KafkaLog4jAppender deadlocks when logging from producer network thread

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6415?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-6415:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> KafkaLog4jAppender deadlocks when logging from producer network thread
> --
>
> Key: KAFKA-6415
> URL: https://issues.apache.org/jira/browse/KAFKA-6415
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 1.2.0
>
>
> When a log entry is appended to a Kafka topic using KafkaLog4jAppender, the 
> producer.send operation may block waiting for metadata. This can result in 
> deadlocks in a couple of scenarios if a log entry from the producer network 
> thread is also at a log level that results in the entry being appended to a 
> Kafka topic.
> 1. Producer's network thread will attempt to send data to a Kafka topic and 
> this is unsafe since producer.send may block waiting for metadata, causing a 
> deadlock since the thread will not process the metadata request/response.
> 2. KafkaLog4jAppender#append is invoked while holding the lock of the logger. 
> So the thread waiting for metadata in the initial send will be holding the 
> logger lock. If the producer network thread has a log entry that needs to be 
> appended, it will attempt to acquire the logger lock and deadlock.
> This was probably the case right from the beginning when KafkaLog4jAppender 
> was introduced, but did not cause any issues so far since there were only 
> debug log entries in that path which were not logged to a Kafka topic by any 
> of the tests. A recent info level log entry introduced by the commit 
> https://github.com/apache/kafka/commit/a3aea3cf4dbedb293f2d7859e0298bebc8e2185f
>  is causing system test failures in log4j_appender_test.py due to the 
> deadlock.
> The asynchronous append case can be fixed by moving all send operations to a 
> separate thread. But KafkaLog4jAppender also has a syncSend option which 
> blocks append while holding the logger lock until the send completes. Not 
> sure how this can be fixed if we want to support log appends from the 
> producer network thread.
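As a rough illustration of the first direction (moving all send operations to a
separate thread), the sketch below hands formatted log lines to a bounded queue
and lets a dedicated worker call producer.send(). It is not the actual
KafkaLog4jAppender code; the class and method names are made up, and the broker
address is a placeholder.

{code:java}
// Sketch of the "move sends to a separate thread" direction. The appending
// thread (which may hold a logger lock) only enqueues; a single worker thread
// performs producer.send(), so a blocking metadata fetch cannot deadlock a
// thread that holds the logger lock or the producer network thread itself.
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AsyncLogForwarder implements AutoCloseable {
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10_000);
    private final KafkaProducer<String, String> producer;
    private final Thread worker;
    private volatile boolean running = true;

    public AsyncLogForwarder(String bootstrapServers, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
        worker = new Thread(() -> {
            try {
                while (running || !queue.isEmpty()) {
                    String line = queue.poll(10, TimeUnit.MILLISECONDS);
                    if (line != null) {
                        // May block waiting for metadata, but only on this worker thread.
                        producer.send(new ProducerRecord<>(topic, line));
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "kafka-log-forwarder");
        worker.setDaemon(true);
        worker.start();
    }

    // Called from arbitrary threads (possibly while holding a logger lock):
    // only enqueues, never touches the producer, so it cannot deadlock.
    public void append(String formattedLogLine) {
        queue.offer(formattedLogLine);   // drops the entry if the queue is full
    }

    @Override
    public void close() throws InterruptedException {
        running = false;
        worker.join(5_000);
        producer.close();
    }
}
{code}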



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6438) NSEE while concurrently creating and deleting a topic

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-6438:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> NSEE while concurrently creating and deleting a topic
> -
>
> Key: KAFKA-6438
> URL: https://issues.apache.org/jira/browse/KAFKA-6438
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 1.0.0
> Environment: kafka_2.11-1.0.0.jar
> OpenJDK Runtime Environment (build 1.8.0_102-b14), OpenJDK 64-Bit Server VM 
> (build 25.102-b14, mixed mode)
> CentOS Linux release 7.3.1611 (Core)
>Reporter: Adam Kotwasinski
>Priority: Major
>  Labels: reliability
> Fix For: 1.2.0
>
>
> It appears that deleting a topic and creating it at the same time can cause 
> NSEE, which later results in a forced controller shutdown.
> Most probably topics are being created because consumers/producers are still 
> active (yes, this means the deletion is happening blindly).
> The main problem here (for me) is the controller switch; the data loss and the 
> following unclean election are acceptable (as we admit to deleting blindly).
> Environment description:
> 20 kafka brokers
> 80k partitions (20k topics 4partitions each)
> 3 node ZK
> Incident:
> {code:java}
> [2018-01-09 11:19:05,912] INFO [Topic Deletion Manager 6], Partition deletion 
> callback for mytopic-2,mytopic-0,mytopic-1,mytopic-3 
> (kafka.controller.TopicDeletionManager)
> [2018-01-09 11:19:06,237] INFO [Controller id=6] New leader and ISR for 
> partition mytopic-0 is {"leader":-1,"leader_epoch":1,"isr":[]} 
> (kafka.controller.KafkaController)
> [2018-01-09 11:19:06,412] INFO [Topic Deletion Manager 6], Deletion for 
> replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
> topic mytopic in progress (kafka.controller.TopicDeletionManager)
> [2018-01-09 11:19:07,218] INFO [Topic Deletion Manager 6], Deletion for 
> replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
> topic mytopic in progress (kafka.controller.TopicDeletionManager)
> [2018-01-09 11:19:07,304] INFO [Topic Deletion Manager 6], Deletion for 
> replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
> topic mytopic in progress (kafka.controller.TopicDeletionManager)
> [2018-01-09 11:19:07,383] INFO [Topic Deletion Manager 6], Deletion for 
> replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
> topic mytopic in progress (kafka.controller.TopicDeletionManager)
> [2018-01-09 11:19:07,510] INFO [Topic Deletion Manager 6], Deletion for 
> replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
> topic mytopic in progress (kafka.controller.TopicDeletionManager)
> [2018-01-09 11:19:07,661] INFO [Topic Deletion Manager 6], Deletion for 
> replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
> topic mytopic in progress (kafka.controller.TopicDeletionManager)
> [2018-01-09 11:19:07,728] INFO [Topic Deletion Manager 6], Deletion for 
> replicas 9,10,11 for partition mytopic-0,mytopic-1,mytopic-2 of topic mytopic 
> in progress (kafka.controller.TopicDeletionManager)
> [2018-01-09 11:19:07,924] INFO [PartitionStateMachine controllerId=6] 
> Invoking state change to OfflinePartition for partitions 
> mytopic-2,mytopic-0,mytopic-1,mytopic-3 
> (kafka.controller.PartitionStateMachine)
> [2018-01-09 11:19:07,924] INFO [PartitionStateMachine controllerId=6] 
> Invoking state change to NonExistentPartition for partitions 
> mytopic-2,mytopic-0,mytopic-1,mytopic-3 
> (kafka.controller.PartitionStateMachine)
> [2018-01-09 11:19:08,592] INFO [Controller id=6] New topics: [Set(mytopic, 
> other, other2)], deleted topics: [Set()], new partition replica assignment 
> [Map(other-0 -> Vector(8), mytopic-2 -> Vector(6), mytopic-0 -> Vector(4), 
> other-2 -> Vector(10), mytopic-1 -> Vector(5), mytopic-3 -> Vector(7), 
> other-1 -> Vector(9), other-3 -> Vector(11))] 
> (kafka.controller.KafkaController)
> [2018-01-09 11:19:08,593] INFO [Controller id=6] New topic creation callback 
> for other-0,mytopic-2,mytopic-0,other-2,mytopic-1,mytopic-3,other-1,other-3 
> (kafka.controller.KafkaController)
> [2018-01-09 11:19:08,596] INFO [Controller id=6] New partition creation 
> callback for 
> other-0,mytopic-2,mytopic-0,other-2,mytopic-1,mytopic-3,other-1,other-3 
> (kafka.controller.KafkaController)
> [2018-01-09 11:19:08,596] INFO [PartitionStateMachine controllerId=6] 
> Invoking state change to NewPartition for partitions 
> other-0,mytopic-2,mytopic-0,other-2,mytopic-1,mytopic-3,other-1,other-3 
> (kafka.controller.PartitionStateMachine)
> [2018-01-09 11:19:08,642] INFO [PartitionStateMachine controllerId=6] 
> Invoking state change to OnlinePartition for 

[jira] [Updated] (KAFKA-6512) Java Producer: Excessive memory usage with compression enabled

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-6512:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> Java Producer: Excessive memory usage with compression enabled
> --
>
> Key: KAFKA-6512
> URL: https://issues.apache.org/jira/browse/KAFKA-6512
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0
> Environment: Windows 10
>Reporter: Kyle Tinker
>Priority: Major
> Fix For: 1.2.0
>
> Attachments: KafkaSender.java
>
>
> h2. User Story
> As a user of the Java producer, I want a predictable memory usage for the 
> Kafka client so that I can ensure that my system is sized appropriately and 
> will be stable even under heavy usage.
> As a user of the Java producer, I want a smaller memory footprint so that my 
> systems don't consume as many resources.
> h2. Acceptance Criteria
>  * Enabling Compression in Kafka should not significantly increase the memory 
> usage of Kafka
>  * The memory usage of Kafka's Java Producer should be roughly in line with 
> the buffer size (buffer.memory) and the number of producers declared.
> h2. Additional Information
> I've observed high memory usage in the producer when enabling compression 
> (gzip or lz4).  I don't observe the behavior with compression off, but with 
> it on I'll run out of heap (2GB).  Using a Java profiler, I see the data is 
> in the KafkaLZ4BlockOutputStream (or related class for gzip).   I see that 
> MemoryRecordsBuilder:closeForRecordAppends() is trying to deal with this, but 
> is not successful.  I'm most likely network bottlenecked, so I expect the 
> producer buffers to be full while the job is running and potentially a lot of 
> unacknowledged records.
> I've tried using the default buffer.memory with 20 producers (across 20 
> threads) and sending data as quickly as I can.  I've also tried 1MB of 
> buffer.memory, which seemed to reduce memory consumption but I could still 
> run OOM in certain cases.  I have max.in.flight.requests.per.connection set 
> to 1.  In short, I should only have ~20 MB (20 * 1 MB) of data in buffers, but 
> I can easily exhaust 2000 MB used by Kafka.
> In looking at the code more, it looks like the KafkaLZ4BlockOutputStream 
> doesn't clear the compressedBuffer or buffer when close() is called.  In my 
> heap dump, both of those are ~65k size each, meaning that each batch is 
> taking up ~148k of space, of which 131k is buffers. (buffer.memory=1,000,000 
> and messages are 1k each until the batch fills).
> Kafka tries to manage memory usage by calling 
> MemoryRecordsBuilder:closeForRecordAppends(), which is documented as "Release 
> resources required for record appends (e.g. compression buffers)".  However, 
> this method doesn't actually clear those buffers because 
> KafkaLZ4BlockOutputStream.close() only writes the block and end mark and 
> closes the output stream.  It doesn't actually clear the buffer and 
> compressedBuffer in KafkaLZ4BlockOutputStream.  Those stay allocated in RAM 
> until the block is acknowledged by the broker, processed in 
> Sender:handleProduceResponse(), and the batch is deallocated.  This memory 
> usage therefore increases, possibly without bound.  In my test program, the 
> program died with approximately 345 unprocessed batches per producer (20 
> producers), despite having max.in.flight.requests.per.connection=1.
> h2. Steps to Reproduce
>  # Create a topic test with plenty of storage
>  # Use a connection with a very fast upload pipe and limited download.  This 
> allows the outbound data to go out, but acknowledgements to be delayed 
> flowing in.
>  # Download KafkaSender.java (attached to this ticket)
>  # Set line 17 to reference your Kafka broker
>  # Run the program with a 1GB Xmx value
> h2. Possible solutions
> There are a few possible optimizations I can think of:
>  # We could declare KafkaLZ4BlockOutputStream.buffer and compressedBuffer as 
> non-final and null them in the close() method
>  # We could declare the MemoryRecordsBuilder.appendStream non-final and null 
> it in the closeForRecordAppends() method
>  # We could have the ProducerBatch discard the recordsBuilder in 
> closeForRecordAppends(), however, this is likely a bad idea because the 
> recordsBuilder contains significant metadata that is likely needed after the 
> stream is closed.  It is also final.
>  # We could try to limit the number of non-acknowledged batches in flight.  
> This would bound the maximum memory usage but may negatively impact 
> performance.
>  
> Fix #1 would only improve the LZ4 algorithm, and not any other algorithms.
> Fix #2 would improve all algorithms, compression and otherwise.  Of the 3 
> proposed here, it seems the best.  
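For reference, a minimal sketch of the producer settings described in this
report (this is not the attached KafkaSender.java; the broker address, topic
and payload size are placeholders):

{code:java}
// Minimal sketch of the configuration described above: compression enabled,
// a bounded buffer, and a single in-flight request per connection.
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CompressionMemorySketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");        // placeholder broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("compression.type", "lz4");                     // or "gzip"
        props.put("buffer.memory", "1000000");                    // ~1 MB accounted buffer, as in the report
        props.put("max.in.flight.requests.per.connection", "1");
        props.put("linger.ms", "5");

        char[] chars = new char[1024];                            // ~1 KB messages, as in the report
        Arrays.fill(chars, 'x');
        String value = new String(chars);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // With a fast upload but slow acknowledgements, batches pile up un-acked;
            // their per-batch compression buffers are what shows up in the heap dump.
            for (long i = 0; i < 10_000_000L; i++) {
                producer.send(new ProducerRecord<>("test", value));
            }
        }
    }
}
{code}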

[jira] [Updated] (KAFKA-6335) SimpleAclAuthorizerTest#testHighConcurrencyModificationOfResourceAcls fails intermittently

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-6335:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> SimpleAclAuthorizerTest#testHighConcurrencyModificationOfResourceAcls fails 
> intermittently
> --
>
> Key: KAFKA-6335
> URL: https://issues.apache.org/jira/browse/KAFKA-6335
> Project: Kafka
>  Issue Type: Test
>Reporter: Ted Yu
>Assignee: Manikumar
>Priority: Major
> Fix For: 1.2.0
>
>
> From 
> https://builds.apache.org/job/kafka-pr-jdk9-scala2.12/3045/testReport/junit/kafka.security.auth/SimpleAclAuthorizerTest/testHighConcurrencyModificationOfResourceAcls/
>  :
> {code}
> java.lang.AssertionError: expected acls Set(User:36 has Allow permission for 
> operations: Read from hosts: *, User:7 has Allow permission for operations: 
> Read from hosts: *, User:21 has Allow permission for operations: Read from 
> hosts: *, User:39 has Allow permission for operations: Read from hosts: *, 
> User:43 has Allow permission for operations: Read from hosts: *, User:3 has 
> Allow permission for operations: Read from hosts: *, User:35 has Allow 
> permission for operations: Read from hosts: *, User:15 has Allow permission 
> for operations: Read from hosts: *, User:16 has Allow permission for 
> operations: Read from hosts: *, User:22 has Allow permission for operations: 
> Read from hosts: *, User:26 has Allow permission for operations: Read from 
> hosts: *, User:11 has Allow permission for operations: Read from hosts: *, 
> User:38 has Allow permission for operations: Read from hosts: *, User:8 has 
> Allow permission for operations: Read from hosts: *, User:28 has Allow 
> permission for operations: Read from hosts: *, User:32 has Allow permission 
> for operations: Read from hosts: *, User:25 has Allow permission for 
> operations: Read from hosts: *, User:41 has Allow permission for operations: 
> Read from hosts: *, User:44 has Allow permission for operations: Read from 
> hosts: *, User:48 has Allow permission for operations: Read from hosts: *, 
> User:2 has Allow permission for operations: Read from hosts: *, User:9 has 
> Allow permission for operations: Read from hosts: *, User:14 has Allow 
> permission for operations: Read from hosts: *, User:46 has Allow permission 
> for operations: Read from hosts: *, User:13 has Allow permission for 
> operations: Read from hosts: *, User:5 has Allow permission for operations: 
> Read from hosts: *, User:29 has Allow permission for operations: Read from 
> hosts: *, User:45 has Allow permission for operations: Read from hosts: *, 
> User:6 has Allow permission for operations: Read from hosts: *, User:37 has 
> Allow permission for operations: Read from hosts: *, User:23 has Allow 
> permission for operations: Read from hosts: *, User:19 has Allow permission 
> for operations: Read from hosts: *, User:24 has Allow permission for 
> operations: Read from hosts: *, User:17 has Allow permission for operations: 
> Read from hosts: *, User:34 has Allow permission for operations: Read from 
> hosts: *, User:12 has Allow permission for operations: Read from hosts: *, 
> User:42 has Allow permission for operations: Read from hosts: *, User:4 has 
> Allow permission for operations: Read from hosts: *, User:47 has Allow 
> permission for operations: Read from hosts: *, User:18 has Allow permission 
> for operations: Read from hosts: *, User:31 has Allow permission for 
> operations: Read from hosts: *, User:49 has Allow permission for operations: 
> Read from hosts: *, User:33 has Allow permission for operations: Read from 
> hosts: *, User:1 has Allow permission for operations: Read from hosts: *, 
> User:27 has Allow permission for operations: Read from hosts: *) but got 
> Set(User:36 has Allow permission for operations: Read from hosts: *, User:7 
> has Allow permission for operations: Read from hosts: *, User:21 has Allow 
> permission for operations: Read from hosts: *, User:39 has Allow permission 
> for operations: Read from hosts: *, User:43 has Allow permission for 
> operations: Read from hosts: *, User:3 has Allow permission for operations: 
> Read from hosts: *, User:35 has Allow permission for operations: Read from 
> hosts: *, User:15 has Allow permission for operations: Read from hosts: *, 
> User:16 has Allow permission for operations: Read from hosts: *, User:22 has 
> Allow permission for operations: Read from hosts: *, User:26 has Allow 
> permission for operations: Read from hosts: *, User:11 has Allow permission 
> for operations: Read from hosts: *, User:38 has Allow permission for 
> operations: Read from hosts: *, User:8 has Allow permission for operations: 
> Read from hosts: *, User:28 has Allow permission for 

[jira] [Updated] (KAFKA-5964) Add more unit tests for SslTransportLayer

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5964:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> Add more unit tests for SslTransportLayer
> -
>
> Key: KAFKA-5964
> URL: https://issues.apache.org/jira/browse/KAFKA-5964
> Project: Kafka
>  Issue Type: Test
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 1.2.0
>
>
> Add unit tests for the  edge cases updated in KAFKA-5920:
> 1. Test that handshake failures are propagated as SslAuthenticationException 
> even if there are I/O exceptions in any of the read/write operations
> 2. Test that received data is processed even after an I/O exception



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6494) Extend ConfigCommand to update broker config using new AdminClient

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-6494:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> Extend ConfigCommand to update broker config using new AdminClient
> --
>
> Key: KAFKA-6494
> URL: https://issues.apache.org/jira/browse/KAFKA-6494
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 1.2.0
>
>
> Add --bootstrap-server and --command-config options for new AdminClient. 
> Update ConfigCommand to use new AdminClient for dynamic broker config updates 
> in KIP-226. Full conversion of ConfigCommand to new AdminClient will be done 
> later under KIP-248.
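The intended command-line shape, assuming the existing kafka-configs.sh option
conventions for everything except the two new options named above; the exact
syntax may differ once the KIP-226 work lands:

{code}
# Sketch of a dynamic broker config update via the new AdminClient options
# (--bootstrap-server and --command-config are the additions described above;
# the config key is just an example of a dynamically updatable broker config).
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --command-config admin-client.properties \
  --entity-type brokers --entity-name 0 \
  --alter --add-config log.cleaner.threads=2
{code}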



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5693) TopicCreationPolicy and AlterConfigsPolicy overlap

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5693:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> TopicCreationPolicy and AlterConfigsPolicy overlap
> --
>
> Key: KAFKA-5693
> URL: https://issues.apache.org/jira/browse/KAFKA-5693
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tom Bentley
>Priority: Minor
>  Labels: kip
> Fix For: 1.2.0
>
>
> The administrator of a cluster can configure a {{CreateTopicPolicy}}, which 
> has access to the topic configs as well as other metadata to make its 
> decision about whether a topic creation is allowed. Thus in theory the 
> decision could be based on a combination of the replication factor and 
> the topic configs, for example. 
> Separately there is an AlterConfigPolicy, which only has access to the 
> configs (and can apply to configurable entities other than just topics).
> There are potential issues with this. For example although the 
> CreateTopicPolicy is checked at creation time, it's not checked for any later 
> alterations to the topic config. So policies which depend on both the topic 
> configs and other topic metadata could be worked around by changing the 
> configs after creation.
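A minimal sketch of a CreateTopicPolicy that bases its decision on both the
replication factor and a topic config, i.e. exactly the kind of policy that a
later alter-configs call can bypass. The class name and thresholds are made up
for illustration:

{code:java}
// Sketch of a policy combining replication factor and a topic config. Nothing
// re-checks this policy when the config is altered after creation, which is
// the gap this ticket describes.
import java.util.Map;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.server.policy.CreateTopicPolicy;

public class DurableTopicPolicy implements CreateTopicPolicy {

    @Override
    public void configure(Map<String, ?> configs) {
        // No broker-side configuration needed for this sketch.
    }

    @Override
    public void validate(RequestMetadata requestMetadata) throws PolicyViolationException {
        Short replicationFactor = requestMetadata.replicationFactor();  // may be null if explicit assignments are given
        Map<String, String> topicConfigs = requestMetadata.configs();

        if (replicationFactor != null && replicationFactor < 3) {
            throw new PolicyViolationException(
                "Topic " + requestMetadata.topic() + " must have replication factor >= 3");
        }
        String minIsr = topicConfigs.get("min.insync.replicas");
        if (minIsr != null && Integer.parseInt(minIsr) < 2) {
            throw new PolicyViolationException(
                "Topic " + requestMetadata.topic() + " must set min.insync.replicas >= 2");
        }
        // A later alter-configs on min.insync.replicas is only checked against
        // AlterConfigPolicy (if configured), never against this policy again.
    }

    @Override
    public void close() {
        // Nothing to release.
    }
}
{code}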



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5782) Avoid unnecessary PID reset when expire batches.

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5782?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5782:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> Avoid unnecessary PID reset when expire batches.
> 
>
> Key: KAFKA-5782
> URL: https://issues.apache.org/jira/browse/KAFKA-5782
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Affects Versions: 0.11.0.0
>Reporter: Jiangjie Qin
>Priority: Major
> Fix For: 1.2.0
>
>
> This is more of an efficiency optimization. Currently we will reset PID when 
> batch expiration happens and one of the expired batches is in retry mode. 
> This is assuming that we don't know if the batch in retry has been appended 
> to the broker or not. However, if the batch was in retry due to a retriable 
> exception returned by the broker, the batch is not appended. In this case, we 
> do not need to reset the PID.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5661) Develop an understanding of how to tune transactions for optimal performance

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5661:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> Develop an understanding of how to tune transactions for optimal performance
> -
>
> Key: KAFKA-5661
> URL: https://issues.apache.org/jira/browse/KAFKA-5661
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 0.11.0.0
>Reporter: Apurva Mehta
>Assignee: Apurva Mehta
>Priority: Major
> Fix For: 1.2.0
>
>
> Currently, we don't have an idea of the throughput curve for transactions 
> across a different range of workloads. 
> Thus we would like to understand how to tune transactions so that they are 
> viable across a broad range of workloads. For instance, what knobs can you 
> tweak if you use small messages and still get acceptable transactional 
> performance? We don't understand the performance curve across variables like 
> message size, batch size, transaction duration, linger.ms, etc., and it would 
> be good to get an understanding of this area and publish recommended 
> configurations for different workloads.
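For reference, the knobs mentioned above collected in one producer sketch. The
values are placeholders to sweep in a benchmark, not recommendations (producing
such recommendations is what this ticket asks for); the broker address, topic
and transactional.id are also placeholders:

{code:java}
// Starting point for a transaction-tuning benchmark: the configurable knobs
// discussed above, plus "transaction duration" driven by the application as
// the number of records per commit.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TransactionTuningSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("transactional.id", "bench-producer-1");  // enables transactions (and idempotence)
        props.put("linger.ms", "20");                        // trade latency for larger batches
        props.put("batch.size", "65536");                    // batch size in bytes
        props.put("transaction.timeout.ms", "60000");

        int recordsPerTransaction = 1000;                     // the "transaction duration" knob

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            for (int txn = 0; txn < 100; txn++) {
                producer.beginTransaction();
                for (int i = 0; i < recordsPerTransaction; i++) {
                    producer.send(new ProducerRecord<>("bench-topic", "small-message-" + i));
                }
                // Commit frequency dominates the overhead when messages are small.
                producer.commitTransaction();
            }
        }
    }
}
{code}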



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5736) Improve error message in Connect when all kafka brokers are down

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5736:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> Improve error message in Connect when all kafka brokers are down
> 
>
> Key: KAFKA-5736
> URL: https://issues.apache.org/jira/browse/KAFKA-5736
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Konstantine Karantasis
>Assignee: Konstantine Karantasis
>Priority: Major
> Fix For: 1.2.0
>
>   Original Estimate: 3h
>  Remaining Estimate: 3h
>
> Currently when all the Kafka brokers are down, Kafka Connect is failing with 
> a pretty unintuitive message when it tries to, for instance, reconfigure 
> tasks. 
> Example output: 
> {code:java}
> [2017-08-15 19:12:09,959] ERROR Failed to reconfigure connector's tasks, 
> retrying after backoff: 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> java.lang.IllegalArgumentException: CircularIterator can only be used on 
> non-empty lists
> at 
> org.apache.kafka.common.utils.CircularIterator.<init>(CircularIterator.java:29)
> at 
> org.apache.kafka.clients.consumer.RoundRobinAssignor.assign(RoundRobinAssignor.java:61)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.assign(AbstractPartitionAssignor.java:68)
> at 
> ... (connector code)
> at 
> org.apache.kafka.connect.runtime.Worker.connectorTaskConfigs(Worker.java:230)
> {code}
> The error message needs to be improved, since its root cause is the absence of 
> kafka brokers available for assignment. 
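One possible shape of an improved check, sketched outside the real Connect code
path (the class and method names are made up): detect the empty member or
partition list up front and report the unreachable brokers instead of surfacing
the CircularIterator error.

{code:java}
// Illustrative only, not the actual Connect code: fail with a message that
// names the real problem rather than letting CircularIterator throw
// IllegalArgumentException("can only be used on non-empty lists").
import java.util.List;
import org.apache.kafka.connect.errors.ConnectException;

public class AssignmentPreconditions {
    static void checkAssignable(List<String> members, List<Integer> partitions, String bootstrapServers) {
        if (members.isEmpty() || partitions.isEmpty()) {
            throw new ConnectException(
                "Cannot reconfigure connector tasks: no Kafka brokers are reachable at "
                    + bootstrapServers + ". Check that the cluster is up and that "
                    + "bootstrap.servers is correct; reconfiguration will be retried after backoff.");
        }
    }
}
{code}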



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5527) Idempotent/transactional Producer part 2 (KIP-98)

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5527:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> Idempotent/transactional Producer part 2 (KIP-98)
> -
>
> Key: KAFKA-5527
> URL: https://issues.apache.org/jira/browse/KAFKA-5527
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Ismael Juma
>Priority: Major
> Fix For: 1.2.0
>
>
> KAFKA-4815 tracks the items that were included in 0.11.0.0. This JIRA is for 
> tracking the ones that did not make it. Setting "Fix version" to 0.11.1.0, 
> but that is subject to change.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5883) Run tests on Java 9 with --illegal-access=deny

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5883:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> Run tests on Java 9 with --illegal-access=deny
> -
>
> Key: KAFKA-5883
> URL: https://issues.apache.org/jira/browse/KAFKA-5883
> Project: Kafka
>  Issue Type: Task
>Reporter: Ismael Juma
>Priority: Major
> Fix For: 1.2.0
>
>
> The default was changed from --illegal-access=deny to --illegal-access=warn 
> late in the Java 9 cycle. By using the former, we will ensure that our code 
> is not relying on functionality that will be removed in a future Java version.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5795) Make the idempotent producer the default producer setting

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5795:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> Make the idempotent producer the default producer setting
> -
>
> Key: KAFKA-5795
> URL: https://issues.apache.org/jira/browse/KAFKA-5795
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Apurva Mehta
>Assignee: Apurva Mehta
>Priority: Major
> Fix For: 1.2.0
>
>
> We would like to turn on idempotence by default. The KIP is here: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-185%3A+Make+exactly+once+in+order+delivery+per+partition+the+default+producer+setting



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5543) We don't remove the LastStableOffsetLag metric when a partition is moved away from a broker

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5543:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> We don't remove the LastStableOffsetLag metric when a partition is moved away 
> from a broker
> ---
>
> Key: KAFKA-5543
> URL: https://issues.apache.org/jira/browse/KAFKA-5543
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0
>Reporter: Apurva Mehta
>Assignee: Apurva Mehta
>Priority: Major
> Fix For: 1.2.0
>
>
> Reported by [~junrao], we have a small leak where the `LastStableOffsetLag` 
> metric is not removed along with the other metrics in the 
> `Partition.removeMetrics` method. This could create a leak when partitions 
> are reassigned or a topic is deleted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6276) AdminClient may leave some futures hanging after shutdown

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-6276:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> AdminClient may leave some futures hanging after shutdown
> -
>
> Key: KAFKA-6276
> URL: https://issues.apache.org/jira/browse/KAFKA-6276
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Reporter: Jason Gustafson
>Assignee: Colin P. McCabe
>Priority: Major
> Fix For: 1.2.0
>
>
> When the admin client closes, it should ensure that any pending futures get 
> cancelled. For the most part it does so, but from glancing at the code, it 
> seems possible that some calls which haven't been sent may be left hanging 
> after shutdown. In particular, this collection is not cleaned on shutdown: 
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L961.
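Until this is fixed, callers can protect themselves by bounding every wait on
an AdminClient future, e.g. via KafkaFuture.get(timeout, unit). A small sketch
with placeholder broker address and topic:

{code:java}
// Defensive pattern for the issue above: bound every wait on an AdminClient
// future so a call left hanging after close() cannot block the application forever.
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class BoundedAdminCalls {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic topic = new NewTopic("example-topic", 3, (short) 1);
            try {
                // KafkaFuture.get(timeout, unit) instead of an unbounded get().
                admin.createTopics(Collections.singleton(topic)).all().get(30, TimeUnit.SECONDS);
            } catch (TimeoutException e) {
                System.err.println("createTopics did not complete within 30s: " + e);
            }
        }
    }
}
{code}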



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5780) Long shutdown time when updated to 0.11.0

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5780:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> Long shutdown time when updated to 0.11.0
> -
>
> Key: KAFKA-5780
> URL: https://issues.apache.org/jira/browse/KAFKA-5780
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.11.0.0
> Environment: CentOS Linux release 7.3.1611 , Kernel 3.10
>Reporter: Raoufeh Hashemian
>Assignee: Apurva Mehta
>Priority: Major
> Fix For: 1.2.0
>
> Attachments: broker_shutdown.png, shutdown.log, 
> shutdown_controller.log, shutdown_statechange.log
>
>
> When we switched from Kafka 0.10.2 to Kafka 0.11.0, we faced a problem with 
> stopping the kafka service on a broker node.
> Our cluster consists of 6 broker nodes. We had an existing topic when we 
> switched to Kafka 0.11.0. Since then, gracefully stopping the service on a 
> Kafka broker node results in the following warning message being repeated 
> every 100 ms in the broker log, and the shutdown takes approximately 45 
> minutes to complete.
> {code:java}
> @4000599714da1e582e4c [2017-08-18 16:24:48,509] WARN Connection to node 
> 1002 could not be established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> @4000599714da245483a4 [2017-08-18 16:24:48,609] WARN Connection to node 
> 1002 could not be established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> @4000599714da2a51177c [2017-08-18 16:24:48,709] WARN Connection to node 
> 1002 could not be established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> {code}
> Below is the last log lines when the shutdown is complete :
> {code:java}
> @400059971afd31113dbc [2017-08-18 16:50:59,823] WARN Connection to node 
> 1002 could not be established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> @400059971afd361200bc [2017-08-18 16:50:59,907] INFO Shutdown complete. 
> (kafka.log.LogManager)
> @400059971afd36afa04c [2017-08-18 16:50:59,917] INFO Terminate ZkClient 
> event thread. (org.I0Itec.zkclient.ZkEventThread)
> @400059971afd36dd6edc [2017-08-18 16:50:59,920] INFO Session: 
> 0x35d68c9e76702a4 closed (org.apache.zookeeper.ZooKeeper)
> @400059971afd36deca84 [2017-08-18 16:50:59,920] INFO EventThread shut 
> down for session: 0x35d68c9e76702a4 (org.apache.zookeeper.ClientCnxn)
> @400059971afd36f6afb4 [2017-08-18 16:50:59,922] INFO [Kafka Server 1002], 
> shut down completed (kafka.server.KafkaServer)
> {code}
> I should note that I stopped the producers before shutting down the broker.
> If I repeat the process after bringing up the service, the shutdown takes less 
> than a minute. However, if I start the producers even for a short time and 
> repeat the process, it will again take around 45 minutes to do a graceful 
> shutdown.
> The attached files show the brokers' CPU usage during the shutdown period (the 
> light blue curve is the node on which the broker service is shutting down).
> The size of the topic is 2.3 TB per broker.
> I was wondering whether this is expected behaviour, a bug, or a 
> misconfiguration? 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5682) Consumer should include partition in exceptions raised during record parsing/validation

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5682:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> Consumer should include partition in exceptions raised during record 
> parsing/validation
> ---
>
> Key: KAFKA-5682
> URL: https://issues.apache.org/jira/browse/KAFKA-5682
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Jason Gustafson
>Priority: Major
>  Labels: needs-kip
> Fix For: 1.2.0
>
>
> When we encounter an exception when validating a fetched record or when 
> deserializing it, we raise it to the user and keep the consumer's current 
> position at the offset of the failed record. The expectation is that the user 
> will either propagate the exception and shutdown or seek past the failed 
> record. However, in the latter case, there is no way for the user to know 
> which topic partition had the failed record. We should consider exposing an 
> exception type to expose this information which users can catch. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5482) A CONCURRENT_TRANSACTIONS error for the first AddPartitionsToTxn request slows down transactions significantly

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5482:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> A CONCURRENT_TRANSACTIONS error for the first AddPartitionsToTxn request 
> slows down transactions significantly
> --
>
> Key: KAFKA-5482
> URL: https://issues.apache.org/jira/browse/KAFKA-5482
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0
>Reporter: Apurva Mehta
>Assignee: Apurva Mehta
>Priority: Major
> Fix For: 1.2.0
>
>
> Here is the issue.
> # When we commit a transaction, the producer sends an `EndTxn` request to 
> the coordinator. The coordinator writes the `PrepareCommit` message to the 
> transaction log and then returns the response to the client. It writes the 
> transaction markers and the final 'CompleteCommit' message asynchronously. 
> # In the mean time, if the client starts another transaction, it will send an 
> `AddPartitions` request on the next `Sender.run` loop. If the markers haven't 
> been written yet, then the coordinator will return a retriable 
> `CONCURRENT_TRANSACTIONS` error to the client.
> # The current behavior in the producer is to sleep for `retryBackoffMs` 
> before retrying the request. The current default for this is 100ms. So the 
> producer will sleep for 100ms before sending the `AddPartitions` again. This 
> puts a floor on the latency for back to back transactions.
> This has been worked around in 
> https://issues.apache.org/jira/browse/KAFKA-5477 by reducing the retryBackoff 
> for the first AddPartitions request. But we need a stronger solution: like 
> having the commit block until the transaction is complete, or delaying the 
> addPartitions until batches are actually ready to be sent to the transaction.
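For context, this is what back-to-back transactions look like from the
application side; with the behaviour above, each iteration can pay the
retry.backoff.ms floor between commitTransaction() and the AddPartitionsToTxn
of the next transaction. The broker address, topic and transactional.id are
placeholders:

{code:java}
// Back-to-back short transactions: the AddPartitionsToTxn sent at the start of
// transaction N+1 can hit CONCURRENT_TRANSACTIONS while the markers for
// transaction N are still being written, after which the producer sleeps
// retry.backoff.ms (default 100 ms) before retrying.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BackToBackTransactions {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("transactional.id", "txn-latency-probe");
        props.put("retry.backoff.ms", "100");               // the latency floor discussed above

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            for (int i = 0; i < 50; i++) {
                long start = System.nanoTime();
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("txn-topic", "tick-" + i));
                producer.commitTransaction();                // returns once PrepareCommit is written
                long elapsedMs = (System.nanoTime() - start) / 1_000_000;
                System.out.println("transaction " + i + " took " + elapsedMs + " ms");
            }
        }
    }
}
{code}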



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5945) Improve handling of authentication failures when credentials are removed

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5945:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> Improve handling of authentication failures when credentials are removed
> 
>
> Key: KAFKA-5945
> URL: https://issues.apache.org/jira/browse/KAFKA-5945
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 1.0.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 1.2.0
>
>
> KAFKA-5854 improves the handling of authentication failures. The scope of 
> KAFKA-5854  was limited to a specific scenario - provide better feedback to 
> applications when security is misconfigured. The PR improves diagnostics for 
> this scenario by throwing an AuthenticationException and also avoids retries. 
> To enable this, the first request initiated by any public API was updated to 
> throw authentication exceptions.
> This JIRA is for a more extensive handling of authentication exceptions which 
> also includes proper handling of credential updates at any time. If a 
> credential is removed, then we could see authentication exception from any 
> request and we want to propagate this properly. This needs quite extensive 
> testing and is less likely to be hit by users, so it will be done later under 
> this JIRA.
> The gaps that need covering are:
> 1. Ensure authentication failures are processed in the Network client for any 
> request
> 2. Ensure metadata refresh failures are handled properly at any time
> 3. Heartbeat threads and other background threads should handle 
> authentication failures. Threads should not terminate on failure, but should 
> avoid retries until the application performs a new operation.
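On the application side, the sketch below shows the pattern this work is meant
to support: treat AuthenticationException as non-retriable and stop, rather
than polling again. The broker address, topic, group and security settings are
placeholders:

{code:java}
// Consumer loop that distinguishes authentication failures from transient
// errors: AuthenticationException is surfaced and the loop stops instead of
// retrying blindly.
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.AuthenticationException;

public class AuthAwareConsumerLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9093");   // placeholder broker
        props.put("group.id", "auth-demo");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // ... plus the SASL/SSL settings appropriate for the cluster ...

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("secure-topic"));
            while (true) {
                try {
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(record.key() + " -> " + record.value());
                    }
                } catch (AuthenticationException e) {
                    // Credentials rejected (possibly removed): surface it, do not retry.
                    System.err.println("Authentication failed, stopping consumer: " + e.getMessage());
                    break;
                }
            }
        }
    }
}
{code}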



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5601) Refactor ReassignPartitionsCommand to use AdminClient

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5601:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> Refactor ReassignPartitionsCommand to use AdminClient
> -
>
> Key: KAFKA-5601
> URL: https://issues.apache.org/jira/browse/KAFKA-5601
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Tom Bentley
>Assignee: Tom Bentley
>Priority: Major
>  Labels: kip
> Fix For: 1.2.0
>
>
> Currently the {{ReassignPartitionsCommand}} (used by 
> {{kafka-reassign-partitions.sh}}) talks directly to ZooKeeper. It would be 
> better to have it use the AdminClient API instead. 
> This would entail creating two new protocol APIs, one to initiate the request 
> and another to request the status of an in-progress reassignment. As such 
> this would require a KIP.
> This touches on the work of KIP-166, but that proposes to use the 
> {{ReassignPartitionsCommand}} API, so should not be affected so long as that 
> API is maintained. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5503) Idempotent producer ignores shutdown while fetching ProducerId

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5503:
--
Fix Version/s: (was: 1.0.2)
   (was: 1.1.0)
   1.2.0

> Idempotent producer ignores shutdown while fetching ProducerId
> --
>
> Key: KAFKA-5503
> URL: https://issues.apache.org/jira/browse/KAFKA-5503
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, core, producer 
>Affects Versions: 0.11.0.0
>Reporter: Jason Gustafson
>Assignee: Evgeny Veretennikov
>Priority: Major
> Fix For: 1.2.0
>
>
> When using the idempotent producer, we initially block the sender thread 
> while we attempt to get the ProducerId. During this time, a concurrent call 
> to close() will be ignored.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5907) Support aggregatedJavadoc in Java 9

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5907:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> Support aggregatedJavadoc in Java 9
> ---
>
> Key: KAFKA-5907
> URL: https://issues.apache.org/jira/browse/KAFKA-5907
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Priority: Major
> Fix For: 1.2.0
>
>
> The Java 9 Javadoc tool has some improvements including a search bar. 
> However, it currently fails with a number of errors like:
> {code}
> > Task :aggregatedJavadoc
> /Users/ijuma/src/kafka/streams/src/main/java/org/apache/kafka/streams/Topology.java:29:
>  error: package org.apache.kafka.streams.processor.internals does not exist
> import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
>^
> /Users/ijuma/src/kafka/streams/src/main/java/org/apache/kafka/streams/Topology.java:30:
>  error: package org.apache.kafka.streams.processor.internals does not exist
> import org.apache.kafka.streams.processor.internals.ProcessorNode;
>^
> /Users/ijuma/src/kafka/streams/src/main/java/org/apache/kafka/streams/Topology.java:31:
>  error: package org.apache.kafka.streams.processor.internals does not exist
> import org.apache.kafka.streams.processor.internals.ProcessorTopology;
>^
> /Users/ijuma/src/kafka/streams/src/main/java/org/apache/kafka/streams/Topology.java:32:
>  error: package org.apache.kafka.streams.processor.internals does not exist
> import org.apache.kafka.streams.processor.internals.SinkNode;
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3806) Adjust default values of log.retention.hours and offsets.retention.minutes

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-3806:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> Adjust default values of log.retention.hours and offsets.retention.minutes
> --
>
> Key: KAFKA-3806
> URL: https://issues.apache.org/jira/browse/KAFKA-3806
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Affects Versions: 0.9.0.1, 0.10.0.0
>Reporter: Michal Turek
>Priority: Minor
> Fix For: 1.2.0
>
>
> Combination of default values of log.retention.hours (168 hours = 7 days) and 
> offsets.retention.minutes (1440 minutes = 1 day) may be dangerous in special 
> cases. Offset retention should be always greater than log retention.
> We have observed the following scenario and issue:
> - Producing of data to a topic was disabled two days ago by producer update, 
> topic wasn't deleted.
> - Consumer consumed all data and properly committed offsets to Kafka.
> - Consumer made no more offset commits for that topic because there was no 
> more incoming data and there was nothing to confirm. (We have auto-commit 
> disabled; I'm not sure how enabled auto-commit behaves.)
> - After one day: Kafka cleared too old offsets according to 
> offsets.retention.minutes.
> - After two days: a long-running consumer was restarted after an update; it 
> didn't find any committed offsets for that topic since they were deleted by 
> offsets.retention.minutes, so it started consuming from the beginning.
> - The messages were still in Kafka due to larger log.retention.hours, about 5 
> days of messages were read again.
> Known workaround to solve this issue:
> - Explicitly configure log.retention.hours and offsets.retention.minutes, 
> don't use defaults (see the example after this list).
> Proposals:
> - Prolong the default value of offsets.retention.minutes to be at least twice 
> as long as log.retention.hours.
> - Check these values during Kafka startup and log a warning if 
> offsets.retention.minutes is smaller than log.retention.hours.
> - Add a note to migration guide about differences between storing of offsets 
> in ZooKeeper and Kafka (http://kafka.apache.org/documentation.html#upgrade).
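A server.properties example of the workaround above; the values are
illustrative, the point being that offset retention comfortably exceeds log
retention:

{code}
# Workaround made explicit in server.properties (example values only).

# Keep log data for 7 days (the current default, stated explicitly).
log.retention.hours=168

# Keep committed offsets for 14 days, i.e. longer than the log itself.
offsets.retention.minutes=20160
{code}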



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6304) The controller should allow updating the partition reassignment for the partitions being reassigned

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-6304:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> The controller should allow updating the partition reassignment for the 
> partitions being reassigned
> ---
>
> Key: KAFKA-6304
> URL: https://issues.apache.org/jira/browse/KAFKA-6304
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller, core
>Affects Versions: 1.0.0
>Reporter: Jiangjie Qin
>Priority: Major
> Fix For: 1.2.0
>
>
> Currently the controller will not process the partition reassignment of a 
> partition if the partition is already being reassigned.
> The issue is that if there is a broker failure during the partition 
> reassignment, the partition reassignment may never finish. And the users may 
> want to cancel the partition reassignment. However, the controller will 
> refuse to do that unless the user manually deletes the partition reassignment zk 
> path, forces a controller switch and then issues the revert command. This is 
> pretty involved. It seems reasonable for the controller to drop the 
> ongoing stuck reassignment and replace it with the updated partition 
> assignment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5729) Consumer should verify offset commits are from assigned partitions

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5729:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> Consumer should verify offset commits are from assigned partitions
> --
>
> Key: KAFKA-5729
> URL: https://issues.apache.org/jira/browse/KAFKA-5729
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Jason Gustafson
>Priority: Major
> Fix For: 1.2.0
>
>
> Need to think through the compatibility implications since we currently allow 
> this, but at a minimum, we should verify that only offsets from partitions 
> dynamically assigned can be committed. The lack of this validation tends to 
> mask problems in the partition revocation and assignment process. 
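The sketch below shows the application-side filtering that such a validation
would effectively enforce: commit only offsets for partitions currently in
consumer.assignment(). The surrounding class and the pendingOffsets bookkeeping
are made up for illustration:

{code:java}
// Commit only offsets for partitions currently assigned to this consumer
// instance; offsets for revoked partitions should have been committed in
// onPartitionsRevoked instead.
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class AssignedOnlyCommitter {
    static void commitAssignedOnly(KafkaConsumer<?, ?> consumer,
                                   Map<TopicPartition, OffsetAndMetadata> pendingOffsets) {
        Set<TopicPartition> assigned = consumer.assignment();
        Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : pendingOffsets.entrySet()) {
            if (assigned.contains(entry.getKey())) {
                toCommit.put(entry.getKey(), entry.getValue());
            }
            // Offsets for partitions no longer assigned are silently dropped here.
        }
        if (!toCommit.isEmpty()) {
            consumer.commitSync(toCommit);
        }
    }
}
{code}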



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4693) Consumer subscription change during rebalance causes exception

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-4693:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> Consumer subscription change during rebalance causes exception
> --
>
> Key: KAFKA-4693
> URL: https://issues.apache.org/jira/browse/KAFKA-4693
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Jason Gustafson
>Priority: Minor
> Fix For: 1.2.0
>
>
> After every rebalance, the consumer validates that the assignment received 
> contains only partitions from topics that were subscribed. If not, then we 
> raise an exception to the user. It is possible for a wakeup or an interrupt 
> to leave the consumer with a rebalance in progress (e.g. with a JoinGroup to 
> the coordinator in-flight). If the user then changes the topic subscription, 
> then this validation upon completion of the rebalance will fail. We should 
> probably detect the subscription change, eat the exception, and request 
> another rebalance. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6045) All access to log should fail if log is closed

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-6045:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> All access to log should fail if log is closed
> --
>
> Key: KAFKA-6045
> URL: https://issues.apache.org/jira/browse/KAFKA-6045
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dong Lin
>Priority: Major
> Fix For: 1.2.0
>
>
> After log.close() or log.closeHandlers() is called for a given log, all uses 
> of the Log's API should fail with a proper exception. For example, 
> log.appendAsLeader() should throw KafkaStorageException. APIs such as 
> Log.activeProducersWithLastSequence() should also fail but not necessarily 
> with KafkaStorageException, since the KafkaStorageException indicates failure 
> to access disk.
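A Java sketch of the guard pattern being asked for (the real kafka.log.Log is
Scala, and as noted above the exception type would vary per API); the class and
method names are made up:

{code:java}
// Guard pattern: every public operation checks a closed flag first and fails
// fast instead of touching released resources. Disk-backed operations would
// throw KafkaStorageException; other APIs could use a different exception.
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.errors.KafkaStorageException;

public class GuardedLog {
    private final AtomicBoolean closed = new AtomicBoolean(false);

    private void checkNotClosed(String operation) {
        if (closed.get()) {
            throw new KafkaStorageException("Cannot " + operation + ": log has been closed");
        }
    }

    public void appendAsLeader(byte[] records) {
        checkNotClosed("append as leader");
        // ... append to the active segment ...
    }

    public long readHighWatermark() {
        checkNotClosed("read high watermark");
        // ... return the in-memory high watermark ...
        return 0L;
    }

    public void close() {
        closed.set(true);
        // ... flush and release file handles ...
    }
}
{code}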



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5950) AdminClient should retry based on returned error codes

2018-02-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5950:
--
Fix Version/s: (was: 1.1.0)
   1.2.0

> AdminClient should retry based on returned error codes
> --
>
> Key: KAFKA-5950
> URL: https://issues.apache.org/jira/browse/KAFKA-5950
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ismael Juma
>Assignee: Andrey Dyachkov
>Priority: Major
>  Labels: needs-kip
> Fix For: 1.2.0
>
>
> The AdminClient only retries if the request fails with a retriable error. If 
> a response is returned, then a retry is never attempted. This is inconsistent 
> with other clients that check the error codes in the response and retry for 
> each retriable error code.
> We should consider if it makes sense to adopt this behaviour in the 
> AdminClient so that users don't have to do it themselves.
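For reference, the per-error retry that users currently have to write
themselves might look like the sketch below: inspect the ExecutionException
cause and retry only when it is a RetriableException. The broker address, topic
and retry policy are placeholders:

{code:java}
// User-side retry on retriable error codes returned in an AdminClient response.
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.RetriableException;

public class RetryingTopicCreation {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic topic = new NewTopic("example-topic", 3, (short) 1);
            int attempts = 0;
            while (true) {
                try {
                    admin.createTopics(Collections.singleton(topic)).all().get();
                    break;                                      // success
                } catch (ExecutionException e) {
                    if (e.getCause() instanceof RetriableException && ++attempts < 5) {
                        Thread.sleep(100L * attempts);          // simple backoff, then retry
                    } else {
                        throw e;                                // non-retriable or out of attempts
                    }
                }
            }
        }
    }
}
{code}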



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

