[jira] [Commented] (KAFKA-4368) Unclean shutdown breaks Kafka cluster

2016-11-02 Thread huxi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15631715#comment-15631715
 ] 

huxi commented on KAFKA-4368:
-

Could you paste the complete stack traces for both the client and the server?

> Unclean shutdown breaks Kafka cluster
> -
>
> Key: KAFKA-4368
> URL: https://issues.apache.org/jira/browse/KAFKA-4368
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.9.0.1, 0.10.0.0
>Reporter: Anukool Rattana
>Priority: Critical
>
> My team has observed that if the broker process dies uncleanly, producers are 
> blocked from sending messages to the Kafka topic.
> Here is how to reproduce the problem:
> 1) Create a Kafka 0.10 cluster with three brokers (A, B and C). 
> 2) Create a topic with replication_factor = 2. 
> 3) Set the producer to send messages with "acks=all", meaning all in-sync 
> replicas must acknowledge a message before the producer proceeds to the next one. 
> 4) Force IEM (IBM Endpoint Manager) to send a patch to broker A and force the 
> server to reboot after the patches are installed.
> Note: min.insync.replicas = 1
> Result: Producers are not able to send messages to the Kafka topic after the 
> broker is rebooted and rejoins the cluster, with the following error messages. 
> [2016-09-28 09:32:41,823] WARN Error while fetching metadata with correlation 
> id 0 : {logstash=LEADER_NOT_AVAILABLE} 
> (org.apache.kafka.clients.NetworkClient)
> We suspected that a replication_factor of 2 is not sufficient for our Kafka 
> environment, but we really need an explanation of what happens when a broker 
> faces an unclean shutdown. 
> The same issue occurred when setting up a cluster with 2 brokers and 
> replication_factor = 1.
> The workaround I used to recover the service is to clean up both the Kafka 
> topic log files and the ZooKeeper data (rmr /brokers/topics/XXX and rmr /consumers/XXX).
> Note:
> Topic list after broker A came back from the reboot:
> Topic:logstash  PartitionCount:3    ReplicationFactor:2 Configs:
>     Topic: logstash Partition: 0    Leader: 1   Replicas: 1,3   Isr: 1,3
>     Topic: logstash Partition: 1    Leader: 2   Replicas: 2,1   Isr: 2,1
>     Topic: logstash Partition: 2    Leader: 3   Replicas: 3,2   Isr: 2,3
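
For context on the settings quoted above: with acks=all the producer waits for all
in-sync replicas to acknowledge a write, and min.insync.replicas is the minimum ISR
size required for such a write to succeed. A commonly recommended combination for
durability looks roughly like the following (illustrative values only, not the
reporter's actual configuration):

# broker / topic settings (illustrative)
default.replication.factor=3
min.insync.replicas=2
unclean.leader.election.enable=false

# producer settings
acks=all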



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-84: Support SASL/SCRAM mechanisms

2016-11-02 Thread Ewen Cheslack-Postava
I think the bump isn't strictly required, but if the client is KIP-35
aware, it can proactively choose a compatible SASL mechanism based on its
initial ApiVersionRequest and avoid an extra connection round trip when
there are client/broker version differences. Without this, a newer client
would have to do 2 sets of requests since the first SASL mechanism might not
be compatible.

I don't think this is a deal breaker, but I do think it would be good to
just standardize on KIP-35 as the way we figure out client/broker
compatibility. The SASL stuff happened in parallel (maybe before?) KIP-35
and ended up with its own mechanism, but I'm in favor of trying to simplify
everything by centralizing those considerations into a single API call. (By
the way, dredging up now ancient history in the KIP-35 discussion, this is
also why "features" vs "API version" is relevant. If we wanted to configure
a newer broker to disable SASL mechanisms we no longer want to allow use
of, this isn't really possible to express via API versions unless we also
explicitly add an API version that doesn't support that mechanism whereas
features would make this easier to toggle on/off. The SaslHandshakeRequest
probably makes it easier to keep things secure compared to the current state
of ApiVersionRequest).
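
To make the above concrete, here is a rough sketch of the kind of client-side
selection logic a KIP-35-aware client could apply (hypothetical class, method and
version numbers for illustration only, not actual Kafka client code):

import java.util.Arrays;
import java.util.List;

// Sketch: pick a SASL mechanism up front using the broker's advertised max
// version of SaslHandshakeRequest (obtained from ApiVersionsRequest), instead
// of discovering an unsupported mechanism only after a failed handshake.
public class SaslMechanismChooser {
    // Assumption for illustration: handshake version 1+ implies SCRAM support.
    private static final short SCRAM_CAPABLE_HANDSHAKE_VERSION = 1;

    public static String chooseMechanism(short brokerMaxHandshakeVersion,
                                         List<String> preferredMechanisms) {
        for (String mechanism : preferredMechanisms) {
            boolean isScram = mechanism.startsWith("SCRAM-");
            if (!isScram || brokerMaxHandshakeVersion >= SCRAM_CAPABLE_HANDSHAKE_VERSION) {
                return mechanism;   // broker can support this one, use it directly
            }
        }
        return "PLAIN";             // simplistic fallback for the sketch
    }

    public static void main(String[] args) {
        // e.g. broker advertises SaslHandshakeRequest max version 0 (pre-SCRAM)
        System.out.println(chooseMechanism((short) 0,
                Arrays.asList("SCRAM-SHA-256", "PLAIN")));   // prints PLAIN
    }
}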

-Ewen

On Tue, Nov 1, 2016 at 2:09 PM, Rajini Sivaram  wrote:

> Gwen,
>
> I had thought the same too and hence I am assuming that Java clients could
> simply use SaslHandshakeRequest. SaslHandshakeRequest returns the list of
> mechanisms enabled in the broker. I think Jun's point was that by
> incrementing the version of SaslHandshakeRequest, clients can use
> ApiVersionsRequest to figure out the mechanisms the broker is capable of
> supporting and use that information to choose a mechanism to send in
> SaslHandshakeRequest. Not sure how useful this actually is, so will wait
> for Jun's response.
>
>
>
> On Tue, Nov 1, 2016 at 8:18 PM, Gwen Shapira  wrote:
>
> > Wait, I thought SaslHandshakeResponse includes a list of mechanisms
> > supported, so I'm not sure why we need to bump the version?
> >
> > I expect clients will send SaslHandshakeRequest_V0, see which mechanisms
> > are supported and make a call based on that? Which means KIP-35 is not
> > required in that case? Am I missing something?
> >
> > On Tue, Nov 1, 2016 at 1:07 PM, Rajini Sivaram <
> > rajinisiva...@googlemail.com
> > > wrote:
> >
> > > Jun,
> > >
> > > I have added the following text to the KIP. Does this match your
> > > expectation?
> > >
> > > *SaslHandshakeRequest version will be increased from 0 to 1 so that
> > clients
> > > can determine if the broker is capable of supporting SCRAM mechanisms
> > using
> > > ApiVersionsRequest. Java clients will not be updated to use
> > > ApiVersionsRequest to choose SASL mechanism under this KIP. Java
> clients
> > > will continue to use their configured SASL mechanism and will fail
> > > connection if the requested mechanism is not enabled in the broker.*
> > >
> > > Thank you,
> > >
> > > Rajini
> > >
> > > On Tue, Nov 1, 2016 at 4:54 PM, Jun Rao  wrote:
> > >
> > > > Hi, Rajini,
> > > >
> > > > One more thing. It seems that we should bump up the version of
> > > > SaslHandshakeRequest? This way, the client can figure out which SASL
> > > > mechanisms the broker is capable of supporting through
> > ApiVersionRequest.
> > > > We discussed this briefly as part of KIP-43.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > >
> > > > On Tue, Nov 1, 2016 at 7:41 AM, Rajini Sivaram <
> > > > rajinisiva...@googlemail.com
> > > > > wrote:
> > > >
> > > > > If there are no more comments, I will start vote on this KIP later
> > this
> > > > > week. In the meantime, please feel free to post any feedback or
> > > > > suggestions. Initial implementation is here:
> > > > > https://github.com/apache/kafka/pull/2086.
> > > > >
> > > > > Thank you,
> > > > >
> > > > > Rajini
> > > > >
> > > > > On Thu, Oct 27, 2016 at 11:18 AM, Rajini Sivaram <
> > > > > rajinisiva...@googlemail.com> wrote:
> > > > >
> > > > > > Jun,
> > > > > >
> > > > > > 4) Agree, it does make the implementation simpler. Updated KIP.
> > > > > > 5) Thank you, that looks neater. Updated KIP.
> > > > > >
> > > > > > On Wed, Oct 26, 2016 at 6:59 PM, Jun Rao 
> wrote:
> > > > > >
> > > > > >> Hi, Rajini,
> > > > > >>
> > > > > >> Thanks for the reply.
> > > > > >>
> > > > > >> 4. Implementation wise, it seems to me that it's simpler to read
> > > from
> > > > > the
> > > > > >> cache than reading directly from ZK since the config manager
> > already
> > > > > >> propagates all config changes through ZK. Also, it's probably a
> > good
> > > > > idea
> > > > > >> to limit the places in the code base that directly accesses ZK.
> > > > > >>
> > > > > >> 5. Yes, it seems that it makes sense to add the new SCRAM
> > > > configurations
> > > > > >> to
> > > > > >> the existing 

[jira] [Commented] (KAFKA-4368) Unclean shutdown breaks Kafka cluster

2016-11-02 Thread Anukool Rattana (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15631475#comment-15631475
 ] 

Anukool Rattana commented on KAFKA-4368:


Hi [~huxi_2b], yes. Although the broken broker came back, the producer is still 
not able to send messages.



> Unclean shutdown breaks Kafka cluster
> -
>
> Key: KAFKA-4368
> URL: https://issues.apache.org/jira/browse/KAFKA-4368
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.9.0.1, 0.10.0.0
>Reporter: Anukool Rattana
>Priority: Critical
>
> My team has observed that if the broker process dies uncleanly, producers are 
> blocked from sending messages to the Kafka topic.
> Here is how to reproduce the problem:
> 1) Create a Kafka 0.10 cluster with three brokers (A, B and C). 
> 2) Create a topic with replication_factor = 2. 
> 3) Set the producer to send messages with "acks=all", meaning all in-sync 
> replicas must acknowledge a message before the producer proceeds to the next one. 
> 4) Force IEM (IBM Endpoint Manager) to send a patch to broker A and force the 
> server to reboot after the patches are installed.
> Note: min.insync.replicas = 1
> Result: Producers are not able to send messages to the Kafka topic after the 
> broker is rebooted and rejoins the cluster, with the following error messages. 
> [2016-09-28 09:32:41,823] WARN Error while fetching metadata with correlation 
> id 0 : {logstash=LEADER_NOT_AVAILABLE} 
> (org.apache.kafka.clients.NetworkClient)
> We suspected that a replication_factor of 2 is not sufficient for our Kafka 
> environment, but we really need an explanation of what happens when a broker 
> faces an unclean shutdown. 
> The same issue occurred when setting up a cluster with 2 brokers and 
> replication_factor = 1.
> The workaround I used to recover the service is to clean up both the Kafka 
> topic log files and the ZooKeeper data (rmr /brokers/topics/XXX and rmr /consumers/XXX).
> Note:
> Topic list after broker A came back from the reboot:
> Topic:logstash  PartitionCount:3    ReplicationFactor:2 Configs:
>     Topic: logstash Partition: 0    Leader: 1   Replicas: 1,3   Isr: 1,3
>     Topic: logstash Partition: 1    Leader: 2   Replicas: 2,1   Isr: 2,1
>     Topic: logstash Partition: 2    Leader: 3   Replicas: 3,2   Isr: 2,3



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4360) Controller may deadLock when autoLeaderRebalance encounter zk expired

2016-11-02 Thread Json Tu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15631474#comment-15631474
 ] 

Json Tu commented on KAFKA-4360:


I'm sorry, I rolled back one comment commit and 
https://github.com/apache/kafka/pull/2085 was closed by GitHub, so I opened a new 
pull request: https://github.com/apache/kafka/pull/2094.

Can someone help to get this change reviewed and landed?
[~becket_qin]  [~guozhang]  [~onurkaraman] [~wushujames] [~gwenshap] [~junrao]


> Controller may deadLock when autoLeaderRebalance encounter zk expired
> -
>
> Key: KAFKA-4360
> URL: https://issues.apache.org/jira/browse/KAFKA-4360
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Json Tu
>  Labels: bugfix
> Attachments: deadlock_patch, yf-mafka2-common02_jstack.txt
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> When the controller has a checkAndTriggerPartitionRebalance task in the 
> autoRebalanceScheduler and the ZK session expires at that time, it can run 
> into a deadlock.
> We can reconstruct the scenario as follows: when the ZK session expires, the 
> ZK thread calls handleNewSession, which is defined in SessionExpirationListener. 
> It acquires controllerContext.controllerLock and then calls 
> autoRebalanceScheduler.shutdown(), which has to wait for all tasks in the 
> autoRebalanceScheduler to complete. But the scheduled task also needs 
> controllerContext.controllerLock, which is already held by the ZK callback 
> thread, so the two threads deadlock.
> Because of that, at least two problems follow. First, the broker's id cannot 
> be registered in ZooKeeper, so the broker is considered dead by the new 
> controller. Second, the process cannot be stopped by kafka-server-stop.sh, 
> because the shutdown function also cannot acquire 
> controllerContext.controllerLock; we cannot shut down Kafka except by using 
> kill -9.
> In my attachment I uploaded a jstack file, which was created when my Kafka 
> process could not be shut down by kafka-server-stop.sh.
> I have run into this scenario several times; I think this may be a bug that 
> is not yet solved in Kafka.
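
The deadlock pattern described above can be reproduced with a minimal,
self-contained sketch (hypothetical names, not the actual controller code): one
thread holds a lock and then waits for an executor task to finish, while that task
is itself blocked trying to acquire the same lock.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class ControllerDeadlockSketch {
    private static final ReentrantLock controllerLock = new ReentrantLock();
    private static final ScheduledExecutorService autoRebalanceScheduler =
            Executors.newSingleThreadScheduledExecutor();

    public static void main(String[] args) throws InterruptedException {
        // The ZK-expiration callback (handleNewSession) grabs the controller lock...
        controllerLock.lock();
        try {
            // ...while a rebalance task that also needs the lock is queued...
            autoRebalanceScheduler.submit(() -> {
                controllerLock.lock();   // blocks: the callback thread holds the lock
                try {
                    // checkAndTriggerPartitionRebalance work would happen here
                } finally {
                    controllerLock.unlock();
                }
            });
            // ...and then the callback shuts the scheduler down and waits for the
            // task to complete. The task is waiting for our lock: deadlock.
            autoRebalanceScheduler.shutdown();
            autoRebalanceScheduler.awaitTermination(1, TimeUnit.DAYS);
        } finally {
            controllerLock.unlock();
        }
    }
}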



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2094: KAFKA-4360:Controller may deadLock when autoLead...

2016-11-02 Thread xiguantiaozhan
GitHub user xiguantiaozhan opened a pull request:

https://github.com/apache/kafka/pull/2094

KAFKA-4360:Controller may deadLock when autoLeaderRebalance encounter zk 
expired



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xiguantiaozhan/kafka rebalance_deadlock

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2094.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2094


commit 477bb3ddb6dc337ba68c7c585dc0cb3afa55e2be
Author: xiguantiaozhan 
Date:   2016-11-01T06:27:20Z

avoid deadlock in autoRebalanceScheduler shutdown

commit 980ec8c7a9d4ce4aa19479bf4d542666f237c9ce
Author: tuyang 
Date:   2016-11-01T12:25:12Z

avoid deadlock in ZookeeperLeaderElector




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #2085: KAFKA-4360:Controller may deadLock when autoLead...

2016-11-02 Thread xiguantiaozhan
Github user xiguantiaozhan closed the pull request at:

https://github.com/apache/kafka/pull/2085


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4368) Unclean shutdown breaks Kafka cluster

2016-11-02 Thread huxi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15631259#comment-15631259
 ] 

huxi commented on KAFKA-4368:
-

Do you mean the producer is still not able to send messages even after the 
broken broker came back to the cluster?

> Unclean shutdown breaks Kafka cluster
> -
>
> Key: KAFKA-4368
> URL: https://issues.apache.org/jira/browse/KAFKA-4368
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.9.0.1, 0.10.0.0
>Reporter: Anukool Rattana
>Priority: Critical
>
> My team has observed that if the broker process dies uncleanly, producers are 
> blocked from sending messages to the Kafka topic.
> Here is how to reproduce the problem:
> 1) Create a Kafka 0.10 cluster with three brokers (A, B and C). 
> 2) Create a topic with replication_factor = 2. 
> 3) Set the producer to send messages with "acks=all", meaning all in-sync 
> replicas must acknowledge a message before the producer proceeds to the next one. 
> 4) Force IEM (IBM Endpoint Manager) to send a patch to broker A and force the 
> server to reboot after the patches are installed.
> Note: min.insync.replicas = 1
> Result: Producers are not able to send messages to the Kafka topic after the 
> broker is rebooted and rejoins the cluster, with the following error messages. 
> [2016-09-28 09:32:41,823] WARN Error while fetching metadata with correlation 
> id 0 : {logstash=LEADER_NOT_AVAILABLE} 
> (org.apache.kafka.clients.NetworkClient)
> We suspected that a replication_factor of 2 is not sufficient for our Kafka 
> environment, but we really need an explanation of what happens when a broker 
> faces an unclean shutdown. 
> The same issue occurred when setting up a cluster with 2 brokers and 
> replication_factor = 1.
> The workaround I used to recover the service is to clean up both the Kafka 
> topic log files and the ZooKeeper data (rmr /brokers/topics/XXX and rmr /consumers/XXX).
> Note:
> Topic list after broker A came back from the reboot:
> Topic:logstash  PartitionCount:3    ReplicationFactor:2 Configs:
>     Topic: logstash Partition: 0    Leader: 1   Replicas: 1,3   Isr: 1,3
>     Topic: logstash Partition: 1    Leader: 2   Replicas: 2,1   Isr: 2,1
>     Topic: logstash Partition: 2    Leader: 3   Replicas: 3,2   Isr: 2,3



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4370) CorruptRecordException when ProducerRecord constructed without key nor partition and send

2016-11-02 Thread huxi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15631227#comment-15631227
 ] 

huxi commented on KAFKA-4370:
-

Yes, compacted topics no longer accept messages without a key, and an exception 
is thrown by the producer if this is attempted. But as you said, it would be 
better to clarify the error message so that it points out this cause explicitly.
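
For reference, the producer-side fix for a compacted topic is simply to supply a
key, e.g. adapting the snippet quoted below (fragment only; it assumes the same
topic and producer fields as in the issue):

ProducerRecord<String, String> record =
        new ProducerRecord<>(topic, "somekey", "somemessage");
return this.producer.send(record).get();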

> CorruptRecordException when ProducerRecord constructed without key nor 
> partition and send
> -
>
> Key: KAFKA-4370
> URL: https://issues.apache.org/jira/browse/KAFKA-4370
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.0
>Reporter: Lars Pfannenschmidt
>Priority: Trivial
>
> According to the JavaDoc of ProducerRecord it should be possible to send 
> messages without a key:
> {quote}
> If neither key nor partition is present a partition will be assigned in a 
> round-robin fashion.
> {quote}
> {code:title=SomeProducer.java|borderStyle=solid}
> ProducerRecord record = new ProducerRecord<>(topic, 
> "somemessage");
> return this.producer.send(record).get();
> {code}
> Unfortunately an Exception is thrown:
> {code}
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.CorruptRecordException: This message has 
> failed its CRC checksum, exceeds the valid size, or is otherwise corrupt.
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:65)
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:52)
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: why cant SslTransportLayer be muted before handshake completion?

2016-11-02 Thread Harsha Chintalapani
Hi Radai,
  One main reason is to keep the handshake details away from
the application layer, i.e. the Kafka network layer that sends Kafka
protocol requests doesn't need to worry about the handshake details; all
it needs is confirmation that the connection is complete, and it can then
start sending Kafka requests over the wire.  So when a client tries to
connect to a broker's SSL port, it goes through the handshake; if we mute
the channel, the Kafka network layer needs to decide when to unmute it,
which means leaking some of the SSL connection details into the Kafka
Selector code. Given that we support multiple secure channels and each has
its own handshake mechanism, we kept the selector code the same
irrespective of which channel/port/security it is trying to use. The
details are handled by the TransportLayer; its job is to finish the
handshake and return ready() as true when it is OK for the client to start
sending requests.
As Joel said, it's possible to pause/resume the handshake, but
I'm not sure why it's needed; you can treat that as a black box and start
sending your requests once channel.ready() returns true. I haven't gone
through the KIP-72 proposal so I might be missing something here.

Thanks,
Harsha

On Wed, Nov 2, 2016 at 5:01 PM Joel Koshy  wrote:

> Sriharsha can validate this, but I think the reason is that if we allow
> muting/unmuting at will (via those public APIs) that can completely mess up
> the handshake itself. It should be possible to pause/resume the handshake
> if that's what you're looking for, but I'm not sure it is worth it for the
> purposes of KIP-72 given the small volumes of reads/writes involved in
> handshaking.
>
> On Wed, Nov 2, 2016 at 4:24 PM, radai  wrote:
>
> > Hi,
> >
> > as part of testing my code for KIP-72 (broker memory control), I ran into
> > the following code snippet in SslTransportLayer:
> >
> > public void removeInterestOps(int ops) {
> > if (!key.isValid())
> > throw new CancelledKeyException();
> > else if (!handshakeComplete)
> > throw new IllegalStateException("handshake is not completed");
> >
> > key.interestOps(key.interestOps() & ~ops);
> > }
> >
> > why can't an SSL socket be muted before the handshake is complete?
> >
>


[jira] [Commented] (KAFKA-4367) MirrorMaker shuts down gracefully without being stopped

2016-11-02 Thread huxi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15631208#comment-15631208
 ] 

huxi commented on KAFKA-4367:
-

Did you close the terminal in which this command was running? Based on the log, 
the JVM shutdown hook thread was triggered to run a clean shutdown.
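
For illustration, the mechanism referred to above boils down to a JVM shutdown
hook: if the terminal session ends, the JVM receives a signal and runs its
registered hooks, which is what produces the "Start clean shutdown" log line. A
minimal, self-contained sketch (not MirrorMaker's actual code):

public class ShutdownHookDemo {
    public static void main(String[] args) throws InterruptedException {
        // The hook runs when the JVM is asked to exit (e.g. SIGTERM/SIGHUP).
        Runtime.getRuntime().addShutdownHook(new Thread(() ->
                System.out.println("Start clean shutdown.")));
        Thread.sleep(Long.MAX_VALUE);   // block until the process is asked to exit
    }
}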

> MirrorMaker shuts down gracefully without being stopped
> ---
>
> Key: KAFKA-4367
> URL: https://issues.apache.org/jira/browse/KAFKA-4367
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.1
> Environment: RHEL 7
>Reporter: Alex
>
> Start:
> bin/kafka-mirror-maker.sh --new.consumer --consumer.config 
> config/ssl_mirroring_consumer.properties --producer.config 
> config/ssl_mirroring_producer.properties --whitelist 
> "TOPIC1|TOPIC2|TOPIC3|TOPIC4" --num.streams 20 &> /dev/null &
> MirrorMaker stops working without being stopped, 30 minutes after start. No 
> clue why this problem occurs.
> 
>   kafka-mirror-maker.log
> 
> [2016-11-01 19:23:32,003] TRACE Produced messages to topic-partition 
> CEP.FS.IN-175 with base offset offset 15015 and error: null. 
> (org.apache.kafka.clients.producer.internals.RecordBatch)
> [2016-11-01 19:23:32,003] TRACE Produced messages to topic-partition 
> CEP.FS.IN-151 with base offset offset 15066 and error: null. 
> (org.apache.kafka.clients.producer.internals.RecordBatch)
> [2016-11-01 19:23:32,003] TRACE Nodes with data ready to send: [Node(8, 
> 10.126.0.2, 9092)] (org.apache.kafka.clients.producer.internals.Sender)
> [2016-11-01 19:23:32,003] TRACE Created 1 produce requests: 
> [ClientRequest(expectResponse=true, 
> callback=org.apache.kafka.clients.producer.internals.Sender$1@483c4c7a, 
> request=RequestSend(header={api_key=0,api_version=1,correlation_id=219685,client_id=producer-1},
>  
> body={acks=-1,timeout=3,topic_data=[{topic=CEP.FS.IN,data=[{partition=133,record_set=java.nio.HeapByteBuffer[pos=0
>  lim=9085 cap=16384]}]}]}), createdTimeMs=1478017412003, sendTimeMs=0)] 
> (org.apache.kafka.clients.producer.internals.Sender)
> [2016-11-01 19:23:32,008] TRACE Returning fetched records for assigned 
> partition CEP.FS.IN-172 and update consumed position to 3869316 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2016-11-01 19:23:32,008] TRACE [mirrormaker-thread-7] Sending message with 
> value size 485 and offset 3869315 (kafka.tools.MirrorMaker$MirrorMakerThread)
> [2016-11-01 19:23:32,008] TRACE Sending record 
> ProducerRecord(topic=CEP.FS.IN, partition=null, key=null, value=[B@12a54f5a 
> with callback kafka.tools.MirrorMaker$MirrorMakerProducerCallback@5ea65b8f to 
> topic CEP.FS.IN partition 160 
> (org.apache.kafka.clients.producer.KafkaProducer)
> [2016-11-01 19:23:32,008] TRACE Allocating a new 16384 byte message buffer 
> for topic CEP.FS.IN partition 160 
> (org.apache.kafka.clients.producer.internals.RecordAccumulator)
> [2016-11-01 19:23:32,008] TRACE Waking up the sender since topic CEP.FS.IN 
> partition 160 is either full or getting a new batch 
> (org.apache.kafka.clients.producer.KafkaProducer)
> [2016-11-01 19:23:32,010] TRACE Received produce response from node 7 with 
> correlation id 219684 (org.apache.kafka.clients.producer.internals.Sender)
> [2016-11-01 19:23:32,010] TRACE Produced messages to topic-partition 
> CEP.FS.IN-106 with base offset offset 15086 and error: null. 
> (org.apache.kafka.clients.producer.internals.RecordBatch)
> [2016-11-01 19:23:32,010] TRACE Produced messages to topic-partition 
> CEP.FS.IN-124 with base offset offset 15095 and error: null. 
> (org.apache.kafka.clients.producer.internals.RecordBatch)
> [2016-11-01 19:23:32,010] TRACE Nodes with data ready to send: [Node(7, 
> 10.126.0.1, 9092)] (org.apache.kafka.clients.producer.internals.Sender)
> [2016-11-01 19:23:32,010] INFO Start clean shutdown. 
> (kafka.tools.MirrorMaker$)
> [2016-11-01 19:23:32,010] TRACE Created 1 produce requests: 
> [ClientRequest(expectResponse=true, 
> callback=org.apache.kafka.clients.producer.internals.Sender$1@44b788c7, 
> request=RequestSend(header={api_key=0,api_version=1,correlation_id=219686,client_id=producer-1},
>  
> body={acks=-1,timeout=3,topic_data=[{topic=CEP.FS.IN,data=[{partition=160,record_set=java.nio.HeapByteBuffer[pos=0
>  lim=511 cap=16384]}]}]}), createdTimeMs=1478017412010, sendTimeMs=0)] 
> (org.apache.kafka.clients.producer.internals.Sender)
> [2016-11-01 19:23:32,010] INFO Shutting down consumer threads. 
> (kafka.tools.MirrorMaker$)
> [2016-11-01 19:23:32,011] INFO [mirrormaker-thread-0] mirrormaker-thread-0 
> shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
> [2016-11-01 19:23:32,011] INFO 

[jira] [Resolved] (KAFKA-4348) On Mac OS, KafkaConsumer.poll returns 0 when there are still messages on Kafka server

2016-11-02 Thread huxi (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxi resolved KAFKA-4348.
-
Resolution: Duplicate
  Assignee: huxi

> On Mac OS, KafkaConsumer.poll returns 0 when there are still messages on 
> Kafka server
> -
>
> Key: KAFKA-4348
> URL: https://issues.apache.org/jira/browse/KAFKA-4348
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.1
> Environment: Mac OS X El Capitan, Java 1.8.0_111
>Reporter: Yiquan Zhou
>Assignee: huxi
>  Labels: consumer, mac, polling
>
> Steps to reproduce:
> 1. start the zookeeper and kafka server using the default properties from the 
> distribution: 
> $ bin/zookeeper-server-start.sh config/zookeeper.properties
> $ bin/kafka-server-start.sh config/server.properties 
> 2. Create a Kafka consumer using the Java API KafkaConsumer.poll(long 
> timeout). It polls the records from the server every second (timeout set to 
> 1000) and prints the number of records polled; a minimal sketch of such a 
> loop is included after the issue description below. The code can be found here: 
> https://gist.github.com/yiquanzhou/a94569a2c4ec8992444c83f3c393f596
> 3. use bin/kafka-verifiable-producer.sh to generate some messages: 
> $ bin/kafka-verifiable-producer.sh --topic connect-test --max-messages 200000 
> --broker-list localhost:9092
> wait until all 200k messages are generated and sent to the server. 
> 4. Run the consumer Java code. In the output console of the consumer, we can 
> see that the consumer starts to poll some records, then it polls 0 records 
> for several seconds before polling some more, like this:
> polled 27160 records
> polled 0 records
> polled 0 records
> polled 0 records
> polled 0 records
> polled 0 records
> polled 26886 records
> polled 26886 records
> polled 0 records
> polled 0 records
> polled 0 records
> polled 0 records
> polled 0 records
> polled 26701 records
> polled 26214 records
> The bug slows down the consumption of messages a lot. And in our use case, 
> the consumer wrongly assumes that all messages are read from the topic.
> It is only reproducible on Mac OS X, not on Linux or Windows.
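
A minimal poll loop along the lines of step 2 above would look roughly like the
following (sketch only; the reporter's actual test code is in the linked gist, and
the property values here are assumptions):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollCountConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "poll-count-test");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("connect-test"));
            while (true) {
                // Poll once per second and report how many records came back.
                ConsumerRecords<String, String> records = consumer.poll(1000);
                System.out.println("polled " + records.count() + " records");
            }
        }
    }
}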



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: why cant SslTransportLayer be muted before handshake completion?

2016-11-02 Thread Joel Koshy
Sriharsha can validate this, but I think the reason is that if we allow
muting/unmuting at will (via those public APIs) that can completely mess up
the handshake itself. It should be possible to pause/resume the handshake
if that's what you're looking for, but I'm not sure it is worth it for the
purposes of KIP-72 given the small volumes of reads/writes involved in
handshaking.

On Wed, Nov 2, 2016 at 4:24 PM, radai  wrote:

> Hi,
>
> as part of testing my code for KIP-72 (broker memory control), I ran into
> the following code snippet in SslTransportLayer:
>
> public void removeInterestOps(int ops) {
> if (!key.isValid())
> throw new CancelledKeyException();
> else if (!handshakeComplete)
> throw new IllegalStateException("handshake is not completed");
>
> key.interestOps(key.interestOps() & ~ops);
> }
>
> why can't an SSL socket be muted before the handshake is complete?
>


Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-02 Thread radai
my biggest issues with a "standard" wrapper format:

1. _ALL_ client _CODE_ (as opposed to kafka lib version) must be updated to
know about the container, because any old naive code trying to directly
deserialize its own payload would keel over and die (it needs to know to
deserialize a container, and then dig in there for its payload).
2. in order to write middleware-friendly clients that utilize such a
container one would basically have to write their own producer/consumer API
on top of the open source kafka one.
3. if you were going to go with a wrapper format you really don't need to
bother with a kip (just open source your own client stack from #2 above so
others could stop re-inventing it)

On Wed, Nov 2, 2016 at 4:25 PM, James Cheng  wrote:

> How exactly would this work? Or maybe that's out of scope for this email.


Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-02 Thread James Cheng

> On Nov 2, 2016, at 2:33 AM, Michael Pearce  wrote:
> 
> Thanks James for taking the time out.
> 
> My comments on each solution you commented on are below. (I note you didn’t 
> comment on the 3rd at all, which is the current proposal in the KIP.)
> 1) 
> a. This forces all clients to have distinct knowledge of platform level 
> implementation detail 
> b. enforces single serialization technology for all apps payloads and 
> platform headers
> i. What if apps need different serialization, e.g. an app team 
> needs to use XML for legacy-system reasons, but at the platform level we force 
> them to use Avro because of our headers?
> c. If we were to have a common Kafka solution, this would force everyone 
> onto a single serialization solution, I think this is something we don’t want 
> to do?
> d. this doesn’t deal with having large payloads as you’ve mentioned http 
> in second solution, think of MIME multipart.
> e. End-to-end encryption: if apps need end-to-end encryption, then platform 
> tooling cannot read the header information without decoding the message, which 
> defeats the purpose of having e2e encryption.
> 2) 
> a. Container is the solution we currently use (we don’t use MIME but it 
> looks like a not bad choice if you don’t care about size, or you have big 
> enough payloads its small overhead)
> i. I think if we don’t go with adding the headers to the message and 
> offset , having an common agreed container format is the next best offering.
> b. The TiVO specific HTTP MIME type message is indeed a good solution in 
> our view
> i. Deals with separating headers and payload
> ii. Allows multipart messaging

How exactly would this work? Or maybe that's out of scope for this email.

> iii. Allows payload to be encrypted yet headers not
> iv. Platform tooling doesn’t care about payload and can quickly read 
> headers
> v. Well established and known container solution
> c. HTTP MIME type headers (String keys) have a large byte overhead though
> i. See Nacho’s and Radai’s previous points on this
> d. If we agree on say a container format being MIME how does a platform 
> team integrate adding its needed headers without enforcing all teams to have 
> to be aware of it? Or is this actually ok?
> i. Would we make a new consumer and producer Kafka API that is 
> container aware?

I don't think we need to change the existing consumer/producer. I think this is 
simply a new serialization format. If a platform team wanted to use this, they 
would create a serializer/deserializer that would perform this serialization. 
It would be an instance of 
org.apache.kafka.common.serialization.Serializer/Deserializer. They would have 
to get the entire org to move over to this. And they may wrap the 
producer/consumer library to use this serializer, in order to have a 
centralized place to add headers. I see this as similar to what Confluent has 
done with io.confluent.kafka.serializers.KafkaAvroSerializer
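
As a rough illustration of that idea, a header-wrapping Serializer might look like
the sketch below. The envelope layout ([int header length][header bytes][payload
bytes]) and the class names are made up for illustration; this is not TiVo's or
Confluent's actual format. A matching Deserializer would read the length prefix,
peel off the headers, and hand the remaining bytes to the inner payload
deserializer.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;

public class HeaderEnvelopeSerializer<T> implements Serializer<T> {
    private final Serializer<T> payloadSerializer;
    private final String headers; // e.g. "Host=host.domain.com;Service=PaymentProcessor"

    public HeaderEnvelopeSerializer(Serializer<T> payloadSerializer, String headers) {
        this.payloadSerializer = payloadSerializer;
        this.headers = headers;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        payloadSerializer.configure(configs, isKey);
    }

    @Override
    public byte[] serialize(String topic, T data) {
        byte[] headerBytes = headers.getBytes(StandardCharsets.UTF_8);
        byte[] payload = payloadSerializer.serialize(topic, data);
        ByteBuffer buf = ByteBuffer.allocate(4 + headerBytes.length + payload.length);
        buf.putInt(headerBytes.length);   // length-prefixed header blob
        buf.put(headerBytes);
        buf.put(payload);                 // untouched inner payload bytes
        return buf.array();
    }

    @Override
    public void close() {
        payloadSerializer.close();
    }
}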

I'm pretty sure LinkedIn has wrappers as well as serializers/deserializers that 
implement their existing solution. LinkedIn might even be able to change their 
implementation to do this the container way, and it might be transparent to 
their producers/consumers. Maybe.

> e. How would this work with the likes of Kafka Streams, where as a 
> platform team we want to add some metadata to every message but we 
> don't want to recode these frameworks?

Same answer as above. I think this is just a serialization format. You would 
use Kafka Streams, but would provide your own serializer/deserializer. Same 
thing applies to Kafka Connect.

-James

> 
> 
> On 10/29/16, 8:09 AM, "James Cheng"  wrote:
> 
>Let me talk about the container format that we are using here at TiVo to 
> add headers to our Kafka messages.
> 
>Just some quick terminology, so that I don't confuse everyone.
>I'm going to use "message body" to refer to the thing returned by 
> ConsumerRecord.value()
>And I'm going to use "payload" to refer to your data after it has been 
> serialized into bytes.
> 
>To recap, during the KIP call, we talked about 3 ways to have headers in 
> Kafka messages:
>1) The message body is your payload, which has headers within it.
>2) The message body is a container, which has headers in it as well your 
> payload.
>3) Extend Kafka to hold headers outside of the message body. The message 
> body holds your payload.
> 
>1) The message body is your payload, which has headers in it
>---
>Here's an example of what this may look like, if it were rendered in JSON:
> 
>{
>"headers" : {
>"Host" : "host.domain.com",
>"Service" : "PaymentProcessor",
>"Timestamp" : "2016-10-28 12:45:56"
>},
>"Field1" : "value",
>"Field2" : "value"
>}
> 
>In 

why cant SslTransportLayer be muted before handshake completion?

2016-11-02 Thread radai
Hi,

as part of testing my code for KIP-72 (broker memory control), I ran into
the following code snippet in SslTransportLayer:

public void removeInterestOps(int ops) {
if (!key.isValid())
throw new CancelledKeyException();
else if (!handshakeComplete)
throw new IllegalStateException("handshake is not completed");

key.interestOps(key.interestOps() & ~ops);
}

why can't an SSL socket be muted before the handshake is complete?


Build failed in Jenkins: kafka-trunk-jdk8 #1017

2016-11-02 Thread Apache Jenkins Server
See 

Changes:

[cshapi] MINOR: Fix NPE when Connect offset contains non-primitive type

--
[...truncated 12422 lines...]
  (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(e).code, 
List[JLong]().asJava))
   ^
:609:
 constructor PartitionData in class PartitionData is deprecated: see 
corresponding Javadoc for more information.
  (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(e).code, 
List[JLong]().asJava))
   ^
:270:
 class PartitionData in object ListOffsetRequest is deprecated: see 
corresponding Javadoc for more information.
val partitions = Map(topicPartition -> new 
ListOffsetRequest.PartitionData(earliestOrLatest, 1))
 ^
:271:
 constructor ListOffsetRequest in class ListOffsetRequest is deprecated: see 
corresponding Javadoc for more information.
(new ListOffsetRequest(consumerId, partitions.asJava), 0)
 ^
:281:
 value offsets in class PartitionData is deprecated: see corresponding Javadoc 
for more information.
  partitionData.offsets.get(0)
^
:298:
 method fromReplica in object FetchRequest is deprecated: see corresponding 
Javadoc for more information.
  else JFetchRequest.fromReplica(replicaId, maxWait, minBytes, requestMap)
 ^
:43:
 class OldProducer in package producer is deprecated: This class has been 
deprecated and will be removed in a future release. Please use 
org.apache.kafka.clients.producer.KafkaProducer instead.
new OldProducer(getOldProducerProps(config))
^
:45:
 class NewShinyProducer in package producer is deprecated: This class has been 
deprecated and will be removed in a future release. Please use 
org.apache.kafka.clients.producer.KafkaProducer instead.
new NewShinyProducer(getNewProducerProps(config))
^
24 warnings found
warning: [options] bootstrap class path not set in conjunction with -source 1.7
1 warning
:core:processResources UP-TO-DATE
:core:classes
:core:copyDependantLibs
:core:jar
:examples:compileJavawarning: [options] bootstrap class path not set in 
conjunction with -source 1.7
1 warning

:examples:processResources UP-TO-DATE
:examples:classes
:examples:checkstyleMain
:examples:compileTestJava UP-TO-DATE
:examples:processTestResources UP-TO-DATE
:examples:testClasses UP-TO-DATE
:examples:checkstyleTest UP-TO-DATE
:examples:test UP-TO-DATE
:log4j-appender:compileJavawarning: [options] bootstrap class path not set in 
conjunction with -source 1.7
1 warning

:log4j-appender:processResources UP-TO-DATE
:log4j-appender:classes
:log4j-appender:checkstyleMain
:log4j-appender:compileTestJavawarning: [options] bootstrap class path not set 
in conjunction with -source 1.7
1 warning

:log4j-appender:processTestResources UP-TO-DATE
:log4j-appender:testClasses
:log4j-appender:checkstyleTest
:log4j-appender:test

org.apache.kafka.log4jappender.KafkaLog4jAppenderTest > testLog4jAppends STARTED

org.apache.kafka.log4jappender.KafkaLog4jAppenderTest > testLog4jAppends PASSED

org.apache.kafka.log4jappender.KafkaLog4jAppenderTest > testKafkaLog4jConfigs 
STARTED

org.apache.kafka.log4jappender.KafkaLog4jAppenderTest > testKafkaLog4jConfigs 
PASSED
:core:compileTestJava UP-TO-DATE
:core:compileTestScala
:194:
 constructor ListOffsetRequest in class ListOffsetRequest is deprecated: see 
corresponding Javadoc for more information.
new requests.ListOffsetRequest(Map(tp -> new 
ListOffsetRequest.PartitionData(0, 100)).asJava)
^
:194:
 class PartitionData in object ListOffsetRequest is deprecated: see 
corresponding Javadoc for more information.
new requests.ListOffsetRequest(Map(tp -> new 
ListOffsetRequest.PartitionData(0, 100)).asJava)
   

[jira] [Commented] (KAFKA-4369) ZkClient is not closed upon streams shutdown

2016-11-02 Thread Andy Chambers (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15630582#comment-15630582
 ] 

Andy Chambers commented on KAFKA-4369:
--

I found this while developing a test fixture that starts up a KafkaStreams 
instance, runs a test, then stops the stream.

I'd like to have a shot at fixing it unless you have plans to get it done 
within the next couple of weeks. If that is cool, I'll make a start this 
weekend. Thanks to Ryan for pinpointing the exact source of the problem. At 
least I can quite easily reproduce the problem :-)

> ZkClient is not closed upon streams shutdown
> 
>
> Key: KAFKA-4369
> URL: https://issues.apache.org/jira/browse/KAFKA-4369
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Ryan P
>Assignee: Guozhang Wang
>
> Kafka Streams' InternalTopicManager creates a new ZkClient but fails to close 
> it as part of its shutdown. 
> https://github.com/confluentinc/kafka/blob/v3.0.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java#L93
> This is likely only an issue when performing testing/debugging where the 
> streams application is shut down but the JVM remains intact. 
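
A sketch of the fix idea (hypothetical shape, not the actual InternalTopicManager
code): keep a reference to the ZkClient and close it when the manager itself is
closed.

import org.I0Itec.zkclient.ZkClient;

public class InternalTopicManagerSketch implements AutoCloseable {
    private final ZkClient zkClient;

    public InternalTopicManagerSketch(String zkConnect, int sessionTimeoutMs,
                                      int connectionTimeoutMs) {
        this.zkClient = new ZkClient(zkConnect, sessionTimeoutMs, connectionTimeoutMs);
    }

    @Override
    public void close() {
        zkClient.close();   // previously left open when the streams app shut down
    }
}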



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Kafka Connect key.converter and value.converter properties for Avro encoding

2016-11-02 Thread Gwen Shapira
Both the Confluent Avro Converter and the Confluent Avro Serializer use the
Schema Registry. The reason is, as Tommy Becker mentioned below, to avoid
storing the entire schema in each record (which the JSON serializer in
Apache Kafka does). It has a few other benefits, such as schema validation.

If you are interested in trying this approach, you will want to use the
Converter, since it was written specifically to integrate with Connect.
If you prefer another approach, without the Schema Registry, you can write
your own Converter - that's why we made them pluggable. Feel free to copy
ours and modify it as fits your Avro approach.
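
For anyone going the custom-Converter route, a bare-bones skeleton (sketch only,
with the actual Avro handling left out) would look roughly like this; it would then
be wired in via the worker config, e.g. key.converter=com.example.MyAvroConverter
(class name hypothetical):

import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;

public class MyAvroConverter implements Converter {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // read converter-specific settings, e.g. where to find schemas
    }

    @Override
    public byte[] fromConnectData(String topic, Schema schema, Object value) {
        // translate the Connect schema/value into your Avro encoding here
        throw new UnsupportedOperationException("sketch only");
    }

    @Override
    public SchemaAndValue toConnectData(String topic, byte[] value) {
        // decode the Avro bytes back into a Connect schema and value here
        throw new UnsupportedOperationException("sketch only");
    }
}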

Gwen

On Wed, Nov 2, 2016 at 2:48 AM,  wrote:

> I am using Kafka Connect in source mode i.e. using it to send events to
> Kafka topics.
>
> With the key.converter and value.converter properties set to
> org.apache.kafka.connect.storage.StringConverter I can attach a consumer
> to the topics and see the events in a readable form.  This is helpful and
> reassuring but it is not the desired representation for my downstream
> consumers - these require the events to be Avro encoded.
>
> It seems that to write the events to Kafka Avro encoded, these properties
> need to be set to io.confluent.kafka.serializers.KafkaAvroSerializer.  Is
> this correct?
>
> I am not using the Confluent platform, merely the standard Kafka 0.10
> download, and have been unable to find out how to get at these from a Maven
> repository jar.  http://docs.confluent.io/3.0.0/app-development.html#java
> suggests that these are available via:
>
> <dependency>
>   <groupId>io.confluent</groupId>
>   <artifactId>kafka-avro-serializer</artifactId>
>   <version>3.0.0</version>
> </dependency>
>
> But it doesn't appear to be true.  The class exists in
> https://raw.githubusercontent.com/confluentinc/schema-
> registry/master/avro-converter/src/main/java/io/confluent/connect/avro/
> AvroConverter.java but this seems to use the Schema Registry which is
> something I'd rather avoid.
>
> I'd be grateful for any pointers on the simplest way of getting Avro
> encoded events written to Kafka from a Kafka Connect source connector/task.
>
> Also in the task which creates SourceRecords, I'm choosing
> Schema.BYTES_SCHEMA for the 4th arg in the constructor.  But I'm not clear
> what this achieves - some light shed on that would also be helpful.
>
> Many thanks,
> David
>



-- 
*Gwen Shapira*
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter  | blog



Build failed in Jenkins: kafka-trunk-jdk7 #1668

2016-11-02 Thread Apache Jenkins Server
See 

Changes:

[cshapi] MINOR: Fix NPE when Connect offset contains non-primitive type

--
[...truncated 14343 lines...]
org.apache.kafka.streams.processor.internals.assignment.TaskAssignorTest > 
testStickiness PASSED

org.apache.kafka.streams.processor.internals.assignment.TaskAssignorTest > 
testAssignWithStandby STARTED

org.apache.kafka.streams.processor.internals.assignment.TaskAssignorTest > 
testAssignWithStandby PASSED

org.apache.kafka.streams.processor.internals.assignment.TaskAssignorTest > 
testAssignWithoutStandby STARTED

org.apache.kafka.streams.processor.internals.assignment.TaskAssignorTest > 
testAssignWithoutStandby PASSED

org.apache.kafka.streams.processor.internals.ProcessorTopologyTest > 
testDrivingMultiplexingTopology STARTED

org.apache.kafka.streams.processor.internals.ProcessorTopologyTest > 
testDrivingMultiplexingTopology PASSED

org.apache.kafka.streams.processor.internals.ProcessorTopologyTest > 
testDrivingStatefulTopology STARTED

org.apache.kafka.streams.processor.internals.ProcessorTopologyTest > 
testDrivingStatefulTopology PASSED

org.apache.kafka.streams.processor.internals.ProcessorTopologyTest > 
testDrivingSimpleTopology STARTED

org.apache.kafka.streams.processor.internals.ProcessorTopologyTest > 
testDrivingSimpleTopology PASSED

org.apache.kafka.streams.processor.internals.ProcessorTopologyTest > 
testDrivingSimpleMultiSourceTopology STARTED

org.apache.kafka.streams.processor.internals.ProcessorTopologyTest > 
testDrivingSimpleMultiSourceTopology PASSED

org.apache.kafka.streams.processor.internals.ProcessorTopologyTest > 
testTopologyMetadata STARTED

org.apache.kafka.streams.processor.internals.ProcessorTopologyTest > 
testTopologyMetadata PASSED

org.apache.kafka.streams.processor.internals.ProcessorTopologyTest > 
testDrivingMultiplexByNameTopology STARTED

org.apache.kafka.streams.processor.internals.ProcessorTopologyTest > 
testDrivingMultiplexByNameTopology PASSED

org.apache.kafka.streams.processor.internals.RecordCollectorTest > 
testSpecificPartition STARTED

org.apache.kafka.streams.processor.internals.RecordCollectorTest > 
testSpecificPartition PASSED

org.apache.kafka.streams.processor.internals.RecordCollectorTest > 
shouldThrowStreamsExceptionAfterMaxAttempts STARTED

org.apache.kafka.streams.processor.internals.RecordCollectorTest > 
shouldThrowStreamsExceptionAfterMaxAttempts PASSED

org.apache.kafka.streams.processor.internals.RecordCollectorTest > 
shouldRetryWhenTimeoutExceptionOccursOnSend STARTED

org.apache.kafka.streams.processor.internals.RecordCollectorTest > 
shouldRetryWhenTimeoutExceptionOccursOnSend PASSED

org.apache.kafka.streams.processor.internals.RecordCollectorTest > 
testStreamPartitioner STARTED

org.apache.kafka.streams.processor.internals.RecordCollectorTest > 
testStreamPartitioner PASSED

org.apache.kafka.streams.processor.internals.PunctuationQueueTest > 
testPunctuationInterval STARTED

org.apache.kafka.streams.processor.internals.PunctuationQueueTest > 
testPunctuationInterval PASSED

org.apache.kafka.streams.processor.internals.AbstractTaskTest > 
shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenAuthorizationException 
STARTED

org.apache.kafka.streams.processor.internals.AbstractTaskTest > 
shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenAuthorizationException 
PASSED

org.apache.kafka.streams.processor.internals.AbstractTaskTest > 
shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenKafkaException STARTED

org.apache.kafka.streams.processor.internals.AbstractTaskTest > 
shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenKafkaException PASSED

org.apache.kafka.streams.processor.internals.AbstractTaskTest > 
shouldThrowWakeupExceptionOnInitializeOffsetsWhenWakeupException STARTED

org.apache.kafka.streams.processor.internals.AbstractTaskTest > 
shouldThrowWakeupExceptionOnInitializeOffsetsWhenWakeupException PASSED

org.apache.kafka.streams.processor.internals.StreamPartitionAssignorTest > 
shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair STARTED

org.apache.kafka.streams.processor.internals.StreamPartitionAssignorTest > 
shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair PASSED

org.apache.kafka.streams.processor.internals.StreamPartitionAssignorTest > 
shouldMapUserEndPointToTopicPartitions STARTED

org.apache.kafka.streams.processor.internals.StreamPartitionAssignorTest > 
shouldMapUserEndPointToTopicPartitions PASSED

org.apache.kafka.streams.processor.internals.StreamPartitionAssignorTest > 
shouldAddUserDefinedEndPointToSubscription STARTED

org.apache.kafka.streams.processor.internals.StreamPartitionAssignorTest > 
shouldAddUserDefinedEndPointToSubscription PASSED

org.apache.kafka.streams.processor.internals.StreamPartitionAssignorTest > 
testAssignWithStandbyReplicas STARTED

org.apache.kafka.streams.processor.internals.StreamPartitionAssignorTest > 

[GitHub] kafka-site pull request #28: Add Becket to the committers page

2016-11-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka-site/pull/28


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (KAFKA-4370) CorruptRecordException when ProducerRecord constructed without key nor partition and send

2016-11-02 Thread Lars Pfannenschmidt (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lars Pfannenschmidt updated KAFKA-4370:
---
Priority: Trivial  (was: Minor)

> CorruptRecordException when ProducerRecord constructed without key nor 
> partition and send
> -
>
> Key: KAFKA-4370
> URL: https://issues.apache.org/jira/browse/KAFKA-4370
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.0
>Reporter: Lars Pfannenschmidt
>Priority: Trivial
>
> According to the JavaDoc of ProducerRecord it should be possible to send 
> messages without a key:
> {quote}
> If neither key nor partition is present a partition will be assigned in a 
> round-robin fashion.
> {quote}
> {code:title=SomeProducer.java|borderStyle=solid}
> ProducerRecord record = new ProducerRecord<>(topic, 
> "somemessage");
> return this.producer.send(record).get();
> {code}
> Unfortunately an Exception is thrown:
> {code}
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.CorruptRecordException: This message has 
> failed its CRC checksum, exceeds the valid size, or is otherwise corrupt.
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:65)
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:52)
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [ANNOUNCE] New committer: Jiangjie (Becket) Qin

2016-11-02 Thread Eno Thereska
Congrats!
Eno

> On 1 Nov 2016, at 05:57, Harsha Chintalapani  wrote:
> 
> Congrats Becket!
> -Harsha
> 
> On Mon, Oct 31, 2016 at 2:13 PM Rajini Sivaram 
> wrote:
> 
>> Congratulations, Becket!
>> 
>> On Mon, Oct 31, 2016 at 8:38 PM, Matthias J. Sax 
>> wrote:
>> 
>>> -BEGIN PGP SIGNED MESSAGE-
>>> Hash: SHA512
>>> 
>>> Congrats!
>>> 
>>> On 10/31/16 11:01 AM, Renu Tewari wrote:
 Congratulations Becket!! Absolutely thrilled to hear this. Well
 deserved!
 
 regards renu
 
 
 On Mon, Oct 31, 2016 at 10:35 AM, Joel Koshy 
 wrote:
 
> The PMC for Apache Kafka has invited Jiangjie (Becket) Qin to
> join as a committer and we are pleased to announce that he has
> accepted!
> 
> Becket has made significant contributions to Kafka over the last
> two years. He has been deeply involved in a broad range of KIP
> discussions and has contributed several major features to the
> project. He recently completed the implementation of a series of
> improvements (KIP-31, KIP-32, KIP-33) to Kafka’s message format
> that address a number of long-standing issues such as avoiding
> server-side re-compression, better accuracy for time-based log
> retention, log roll and time-based indexing of messages.
> 
> Congratulations Becket! Thank you for your many contributions. We
> are excited to have you on board as a committer and look forward
> to your continued participation!
> 
> Joel
> 
 
>>> -BEGIN PGP SIGNATURE-
>>> Comment: GPGTools - https://gpgtools.org
>>> 
>>> iQIcBAEBCgAGBQJYF6uzAAoJECnhiMLycopPBuwP/1N2MtwWw7ms5gAfT/jvVCGi
>>> mdNvdJprSwJHe3qwsc+glsvAqwS6OZfaVzK2qQcaxMX5KjQtwkkOKyErOl9hG7jD
>>> Vw0aDcCbPuV2oEZ4m9K2J4Q3mZIfFrevicVb7oPGf4Yjt1sh9wxP08o7KHP2l5pN
>>> 3mpIBEDp4rZ2pg/jXldyh57dW1btg3gZi1gNczWvXEAKf1ypXRPwPeDbvXADXDv3
>>> 0NgmcXn242geoggnIbL30WgjH0bwHpVjLBr++YQ33FzRoHzASfAYHR/jSDKAytQe
>>> a7Bkc69Bb1NSzkfhiJa+VW9V2DweO8kD+Xfz4dM02GQF0iJkAqare7a6zWedk/+U
>>> hJRPz+tGlDSLePCYdyNj1ivJrFOmIQtyFOI3SBANfaneOmGJhPKtlNQQlNFKDbWS
>>> CD1pBsc1iHNq6rXy21evc/aFk0Rrfs5d4rU9eG6jD8jc1mCbSwtzJI0vweX0r9Y/
>>> 6Ao8cnsmDejYfap5lUMWeQfZOTkNRNpbkL7eoiVpe6wZw1nGL3T7GkrrWGRS3EQO
>>> qp4Jjp+7yY4gIqsLfYouaHTEzAX7yN78QNUNCB4OqUiEL9+a8wTQ7dlTgXinEd8r
>>> Kh9vTfpW7fb4c58aSpzntPUU4YFD3MHMam0iu5UrV9d5DrVTFDMJ83k15Z5DyTMt
>>> 45nPYdjvJgFGWLYFnPwr
>>> =VbpG
>>> -END PGP SIGNATURE-
>>> 
>> 
>> 
>> 
>> --
>> Regards,
>> 
>> Rajini
>> 



[jira] [Commented] (KAFKA-4370) CorruptRecordException when ProducerRecord constructed without key nor partition and send

2016-11-02 Thread Lars Pfannenschmidt (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15630350#comment-15630350
 ] 

Lars Pfannenschmidt commented on KAFKA-4370:


Ah... compaction was enabled on that topic for some reason. Then you obviously 
need a key, my bad. A different error message would be great nonetheless.

Thanks!

> CorruptRecordException when ProducerRecord constructed without key nor 
> partition and send
> -
>
> Key: KAFKA-4370
> URL: https://issues.apache.org/jira/browse/KAFKA-4370
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.0
>Reporter: Lars Pfannenschmidt
>Priority: Minor
>
> According to the JavaDoc of ProducerRecord it should be possible to send 
> messages without a key:
> {quote}
> If neither key nor partition is present a partition will be assigned in a 
> round-robin fashion.
> {quote}
> {code:title=SomeProducer.java|borderStyle=solid}
> ProducerRecord record = new ProducerRecord<>(topic, 
> "somemessage");
> return this.producer.send(record).get();
> {code}
> Unfortunately an Exception is thrown:
> {code}
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.CorruptRecordException: This message has 
> failed its CRC checksum, exceeds the valid size, or is otherwise corrupt.
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:65)
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:52)
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4370) CorruptRecordException when ProducerRecord constructed without key nor partition and send

2016-11-02 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15630310#comment-15630310
 ] 

Ismael Juma commented on KAFKA-4370:


This should work; are there errors in the server log?

> CorruptRecordException when ProducerRecord constructed without key nor 
> partition and send
> -
>
> Key: KAFKA-4370
> URL: https://issues.apache.org/jira/browse/KAFKA-4370
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.0
>Reporter: Lars Pfannenschmidt
>Priority: Minor
>
> According to the JavaDoc of ProducerRecord it should be possible to send 
> messages without a key:
> {quote}
> If neither key nor partition is present a partition will be assigned in a 
> round-robin fashion.
> {quote}
> {code:title=SomeProducer.java|borderStyle=solid}
> ProducerRecord record = new ProducerRecord<>(topic, 
> "somemessage");
> return this.producer.send(record).get();
> {code}
> Unfortunately an Exception is thrown:
> {code}
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.CorruptRecordException: This message has 
> failed its CRC checksum, exceeds the valid size, or is otherwise corrupt.
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:65)
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:52)
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-81: Max in-flight fetches

2016-11-02 Thread Mickael Maison
Thanks for all the feedback.

I agree, throttling the requests sent will most likely result in a
loss of throughput -> BAD !
As suggested, selectively reading from the socket should enable to
control the memory usage without impacting performance. I've had a look
at that today and I can see how that would work.
I'll update the KIP accordingly tomorrow.

@radai: I've not fully followed the KIP-72 discussions, so what
benefits would the memory pool implementation provide? (over
selectively reading from the socket)
Also this thread is badly named, the plan is to introduce a config,
buffer.memory, to specify the memory used in bytes (and NOT the number
of in-flight requests).
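
For illustration, the memory-pool idea boils down to something like the sketch
below (hypothetical names and shape, not the actual KIP-72 code): the network layer
only reads a response off a socket once it can reserve part of a fixed budget for
it, mutes the channel otherwise, and releases the reservation after the application
has consumed the records.

import java.util.concurrent.atomic.AtomicLong;

public class SimpleMemoryPool {
    private final AtomicLong availableBytes;

    public SimpleMemoryPool(long capacityBytes) {
        this.availableBytes = new AtomicLong(capacityBytes);
    }

    /** Try to reserve size bytes; returns false if the pool is exhausted. */
    public boolean tryReserve(long size) {
        while (true) {
            long current = availableBytes.get();
            if (current < size) {
                return false;                 // caller should mute the channel for now
            }
            if (availableBytes.compareAndSet(current, current - size)) {
                return true;                  // safe to read this response off the socket
            }
        }
    }

    /** Return bytes to the pool once poll() has handed the records to the user. */
    public void release(long size) {
        availableBytes.addAndGet(size);
    }
}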

On Wed, Nov 2, 2016 at 6:19 PM, Jay Kreps  wrote:
> Hey Radai,
>
> I think there are a couple discussions here. The first is about what is the
> interface to the user. The other is about what is exposed in the protocol,
> and implementation details of reading requests. I strongly agree with
> giving the user a simple "use X MB of memory" config and we calculate
> everything else off of that is really ideal. 99.9% of the time that is all
> you would care about. We often can't be perfect in this bound, but as long
> as we're close it is fine. I don't think this necessarily implies using a
> pool as in the producer. There may be an opportunity to reuse memory, which
> may or may not help performance, but last i checked we cached a bunch of
> deserialized records too which can't really be reused easily. All we really
> need to do, I think, is bound the bytes read per user-level poll call and
> stop early when the limit is reached, right?
>
> I'm also a big fan of simplifying config. If you think there are other
> areas we could rationalize, I think it'd be good to explore those too. I
> think the issue we always struggle with is that there are areas where you
> need fine grained control. Our current approach is to try to manage that
> with the importance level marking of the configs.
>
> -Jay
>
>
>
> On Wed, Nov 2, 2016 at 10:36 AM, Gwen Shapira  wrote:
>
>> +1
>>
>> On Wed, Nov 2, 2016 at 10:34 AM, radai  wrote:
>>
>> > In my opinion a lot of kafka configuration options were added using the
>> > "minimal diff" approach, which results in very nuanced and complicated
>> > configs required to indirectly achieve some goal. case in point -
>> timeouts.
>> >
>> > The goal here is to control the memory requirement. the 1st config was
>> max
>> > size of a single request, now the proposal is to control the number of
>> > those in flight - which is inaccurate (you dont know the actual size and
>> > must over-estimate), would have an impact on throughput in case of
>> > over-estimation, and also fails to completely achieve the goal (what
>> about
>> > decompression?)
>> >
>> > I think a memory pool in combination with Jay's proposal to only pick up
>> > from socket conditionally when memory is available is the correct
>> approach
>> > - it deals with the problem directly and would result in a simler and
>> more
>> > understandable configuration (a single property for max memory
>> > consumption).
>> >
>> > in the future the accuracy of the limit can be improved by, for example,
>> > declaring both the compressed _AND UNCOMPRESSED_ sizes up front, so that
>> we
>> > can pick up from socket when we have enough memory to decompress as well
>> -
>> > this would obviously be a wire format change and outside the scope here,
>> > but my point is that it could be done without adding any new configs)
>> >
>> > On Mon, Oct 31, 2016 at 10:25 AM, Joel Koshy 
>> wrote:
>> >
>> > > Agreed with this approach.
>> > > One detail to be wary of is that since we multiplex various other
>> > requests
>> > > (e.g., heartbeats, offset commits, metadata, etc.) over the client that
>> > > connects to the coordinator this could delay some of these critical
>> > > requests. Realistically I don't think it will be an issue except in
>> > extreme
>> > > scenarios where someone sets the memory limit to be unreasonably low.
>> > >
>> > > Thanks,
>> > >
>> > > Joel
>> > >
>> > > On Sun, Oct 30, 2016 at 12:32 PM, Jun Rao  wrote:
>> > >
>> > > > Hi, Mickael,
>> > > >
>> > > > I agree with others that it's better to be able to control the bytes
>> > the
>> > > > consumer can read from sockets, instead of limiting the fetch
>> requests.
>> > > > KIP-72 has a proposal to bound the memory size at the socket selector
>> > > > level. Perhaps that can be leveraged in this KIP too.
>> > > >
>> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> > > > 72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests
>> > > >
>> > > > Thanks,
>> > > >
>> > > > Jun
>> > > >
>> > > > On Thu, Oct 27, 2016 at 3:23 PM, Jay Kreps  wrote:
>> > > >
>> > > > > This is a good observation on limiting total memory usage. If I
>> > > > understand
>> > > > > the proposal 

[jira] [Updated] (KAFKA-4370) CorruptRecordException when ProducerRecord constructed without key nor partition and send

2016-11-02 Thread Lars Pfannenschmidt (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lars Pfannenschmidt updated KAFKA-4370:
---
Priority: Minor  (was: Major)

> CorruptRecordException when ProducerRecord constructed without key nor 
> partition and send
> -
>
> Key: KAFKA-4370
> URL: https://issues.apache.org/jira/browse/KAFKA-4370
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.0
>Reporter: Lars Pfannenschmidt
>Priority: Minor
>
> According to the JavaDoc of ProducerRecord it should be possible to send 
> messages without a key:
> {quote}
> If neither key nor partition is present a partition will be assigned in a 
> round-robin fashion.
> {quote}
> {code:title=SomeProducer.java|borderStyle=solid}
> ProducerRecord record = new ProducerRecord<>(topic, 
> "somemessage");
> return this.producer.send(record).get();
> {code}
> Unfortunately an Exception is thrown:
> {code}
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.CorruptRecordException: This message has 
> failed its CRC checksum, exceeds the valid size, or is otherwise corrupt.
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:65)
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:52)
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4370) CorruptRecordException when ProducerRecord constructed without key nor partition and send

2016-11-02 Thread Lars Pfannenschmidt (JIRA)
Lars Pfannenschmidt created KAFKA-4370:
--

 Summary: CorruptRecordException when ProducerRecord constructed 
without key nor partition and send
 Key: KAFKA-4370
 URL: https://issues.apache.org/jira/browse/KAFKA-4370
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.10.1.0
Reporter: Lars Pfannenschmidt


According to the JavaDoc of ProducerRecord it should be possible to send 
messages without a key:
{quote}
If neither key nor partition is present a partition will be assigned in a 
round-robin fashion.
{quote}

{code:title=SomeProducer.java|borderStyle=solid}
ProducerRecord record = new ProducerRecord<>(topic, 
"somemessage");
return this.producer.send(record).get();
{code}

Unfortunately an Exception is thrown:
{code}
java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.CorruptRecordException: This message has failed 
its CRC checksum, exceeds the valid size, or is otherwise corrupt.

at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:65)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:52)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-81: Max in-flight fetches

2016-11-02 Thread Jay Kreps
Hey Radai,

I think there are a couple discussions here. The first is about what is the
interface to the user. The other is about what is exposed in the protocol,
and implementation details of reading requests. I strongly agree that
giving the user a simple "use X MB of memory" config and calculating
everything else off of that is really ideal. 99.9% of the time that is all
you would care about. We often can't be perfect in this bound, but as long
as we're close it is fine. I don't think this necessarily implies using a
pool as in the producer. There may be an opportunity to reuse memory, which
may or may not help performance, but last I checked we cached a bunch of
deserialized records too which can't really be reused easily. All we really
need to do, I think, is bound the bytes read per user-level poll call and
stop early when the limit is reached, right?
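
Roughly this kind of control flow is all I have in mind (a toy sketch with
made-up types, not the actual consumer/Fetcher code):

    import java.util.ArrayList;
    import java.util.Deque;
    import java.util.List;

    // Toy sketch: drain completed fetches until a per-poll byte budget is
    // used up, and leave the rest queued for a later poll.
    final class BoundedDrain {
        static final class CompletedFetch {
            final List<String> records;
            final long sizeInBytes;
            CompletedFetch(List<String> records, long sizeInBytes) {
                this.records = records;
                this.sizeInBytes = sizeInBytes;
            }
        }

        static List<String> drain(Deque<CompletedFetch> completed, long byteBudget) {
            List<String> out = new ArrayList<>();
            long remaining = byteBudget;
            while (!completed.isEmpty()) {
                CompletedFetch next = completed.peekFirst();
                if (next.sizeInBytes > remaining && !out.isEmpty()) {
                    break; // stop early; this fetch stays queued for the next poll
                }
                out.addAll(next.records);
                remaining -= next.sizeInBytes;
                completed.pollFirst();
            }
            return out;
        }
    }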

I'm also a big fan of simplifying config. If you think there are other
areas we could rationalize, I think it'd be good to explore those too. I
think the issue we always struggle with is that there are areas where you
need fine grained control. Our current approach is to try to manage that
with the importance level marking of the configs.

-Jay



On Wed, Nov 2, 2016 at 10:36 AM, Gwen Shapira  wrote:

> +1
>
> On Wed, Nov 2, 2016 at 10:34 AM, radai  wrote:
>
> > In my opinion a lot of kafka configuration options were added using the
> > "minimal diff" approach, which results in very nuanced and complicated
> > configs required to indirectly achieve some goal. case in point -
> timeouts.
> >
> > The goal here is to control the memory requirement. the 1st config was
> max
> > size of a single request, now the proposal is to control the number of
> > those in flight - which is inaccurate (you dont know the actual size and
> > must over-estimate), would have an impact on throughput in case of
> > over-estimation, and also fails to completely achieve the goal (what
> about
> > decompression?)
> >
> > I think a memory pool in combination with Jay's proposal to only pick up
> > from socket conditionally when memory is available is the correct
> approach
> > - it deals with the problem directly and would result in a simler and
> more
> > understandable configuration (a single property for max memory
> > consumption).
> >
> > in the future the accuracy of the limit can be improved by, for example,
> > declaring both the compressed _AND UNCOMPRESSED_ sizes up front, so that
> we
> > can pick up from socket when we have enough memory to decompress as well
> -
> > this would obviously be a wire format change and outside the scope here,
> > but my point is that it could be done without adding any new configs)
> >
> > On Mon, Oct 31, 2016 at 10:25 AM, Joel Koshy 
> wrote:
> >
> > > Agreed with this approach.
> > > One detail to be wary of is that since we multiplex various other
> > requests
> > > (e.g., heartbeats, offset commits, metadata, etc.) over the client that
> > > connects to the coordinator this could delay some of these critical
> > > requests. Realistically I don't think it will be an issue except in
> > extreme
> > > scenarios where someone sets the memory limit to be unreasonably low.
> > >
> > > Thanks,
> > >
> > > Joel
> > >
> > > On Sun, Oct 30, 2016 at 12:32 PM, Jun Rao  wrote:
> > >
> > > > Hi, Mickael,
> > > >
> > > > I agree with others that it's better to be able to control the bytes
> > the
> > > > consumer can read from sockets, instead of limiting the fetch
> requests.
> > > > KIP-72 has a proposal to bound the memory size at the socket selector
> > > > level. Perhaps that can be leveraged in this KIP too.
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > 72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Thu, Oct 27, 2016 at 3:23 PM, Jay Kreps  wrote:
> > > >
> > > > > This is a good observation on limiting total memory usage. If I
> > > > understand
> > > > > the proposal I think it is that the consumer client would stop
> > sending
> > > > > fetch requests once a certain number of in-flight fetch requests is
> > > met.
> > > > I
> > > > > think a better approach would be to always issue one fetch request
> to
> > > > each
> > > > > broker immediately, allow the server to process that request, and
> > send
> > > > data
> > > > > back to the local machine where it would be stored in the socket
> > buffer
> > > > (up
> > > > > to that buffer size). Instead of throttling the requests sent, the
> > > > consumer
> > > > > should ideally throttle the responses read from the socket buffer
> at
> > > any
> > > > > given time. That is, in a single poll call, rather than reading
> from
> > > > every
> > > > > single socket it should just read until it has a given amount of
> > memory
> > > > > used then bail out early. It can come back 

Re: [DISCUSS] KIP-81: Max in-flight fetches

2016-11-02 Thread Gwen Shapira
+1

On Wed, Nov 2, 2016 at 10:34 AM, radai  wrote:

> In my opinion a lot of kafka configuration options were added using the
> "minimal diff" approach, which results in very nuanced and complicated
> configs required to indirectly achieve some goal. case in point - timeouts.
>
> The goal here is to control the memory requirement. the 1st config was max
> size of a single request, now the proposal is to control the number of
> those in flight - which is inaccurate (you dont know the actual size and
> must over-estimate), would have an impact on throughput in case of
> over-estimation, and also fails to completely achieve the goal (what about
> decompression?)
>
> I think a memory pool in combination with Jay's proposal to only pick up
> from socket conditionally when memory is available is the correct approach
> - it deals with the problem directly and would result in a simler and more
> understandable configuration (a single property for max memory
> consumption).
>
> in the future the accuracy of the limit can be improved by, for example,
> declaring both the compressed _AND UNCOMPRESSED_ sizes up front, so that we
> can pick up from socket when we have enough memory to decompress as well -
> this would obviously be a wire format change and outside the scope here,
> but my point is that it could be done without adding any new configs)
>
> On Mon, Oct 31, 2016 at 10:25 AM, Joel Koshy  wrote:
>
> > Agreed with this approach.
> > One detail to be wary of is that since we multiplex various other
> requests
> > (e.g., heartbeats, offset commits, metadata, etc.) over the client that
> > connects to the coordinator this could delay some of these critical
> > requests. Realistically I don't think it will be an issue except in
> extreme
> > scenarios where someone sets the memory limit to be unreasonably low.
> >
> > Thanks,
> >
> > Joel
> >
> > On Sun, Oct 30, 2016 at 12:32 PM, Jun Rao  wrote:
> >
> > > Hi, Mickael,
> > >
> > > I agree with others that it's better to be able to control the bytes
> the
> > > consumer can read from sockets, instead of limiting the fetch requests.
> > > KIP-72 has a proposal to bound the memory size at the socket selector
> > > level. Perhaps that can be leveraged in this KIP too.
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Thu, Oct 27, 2016 at 3:23 PM, Jay Kreps  wrote:
> > >
> > > > This is a good observation on limiting total memory usage. If I
> > > understand
> > > > the proposal I think it is that the consumer client would stop
> sending
> > > > fetch requests once a certain number of in-flight fetch requests is
> > met.
> > > I
> > > > think a better approach would be to always issue one fetch request to
> > > each
> > > > broker immediately, allow the server to process that request, and
> send
> > > data
> > > > back to the local machine where it would be stored in the socket
> buffer
> > > (up
> > > > to that buffer size). Instead of throttling the requests sent, the
> > > consumer
> > > > should ideally throttle the responses read from the socket buffer at
> > any
> > > > given time. That is, in a single poll call, rather than reading from
> > > every
> > > > single socket it should just read until it has a given amount of
> memory
> > > > used then bail out early. It can come back and read more from the
> other
> > > > sockets after those messages are processed.
> > > >
> > > > The advantage of this approach is that you don't incur the additional
> > > > latency.
> > > >
> > > > -Jay
> > > >
> > > > On Mon, Oct 10, 2016 at 6:41 AM, Mickael Maison <
> > > mickael.mai...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I would like to discuss the following KIP proposal:
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > 81%3A+Max+in-flight+fetches
> > > > >
> > > > >
> > > > > Feedback and comments are welcome.
> > > > > Thanks !
> > > > >
> > > > > Mickael
> > > > >
> > > >
> > >
> >
>



-- 
*Gwen Shapira*
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter  | blog



Re: [DISCUSS] KIP-81: Max in-flight fetches

2016-11-02 Thread radai
In my opinion a lot of kafka configuration options were added using the
"minimal diff" approach, which results in very nuanced and complicated
configs required to indirectly achieve some goal. case in point - timeouts.

The goal here is to control the memory requirement. The 1st config was max
size of a single request; now the proposal is to control the number of
those in flight - which is inaccurate (you don't know the actual size and
must over-estimate), would have an impact on throughput in case of
over-estimation, and also fails to completely achieve the goal (what about
decompression?)

I think a memory pool in combination with Jay's proposal to only pick up
from the socket conditionally when memory is available is the correct approach
- it deals with the problem directly and would result in a simpler and more
understandable configuration (a single property for max memory consumption).
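
Something as simple as this captures the idea (a toy sketch, not the actual
KIP-72 classes):

    import java.util.concurrent.Semaphore;

    // Toy sketch of "only pick up from the socket when memory is available":
    // a byte-counting pool the network layer consults before reading a response.
    final class MemoryPoolSketch {
        private final Semaphore availableBytes;

        MemoryPoolSketch(int poolSizeBytes) {
            this.availableBytes = new Semaphore(poolSizeBytes);
        }

        // called before picking a response up off the socket
        boolean tryReserve(int responseSizeBytes) {
            return availableBytes.tryAcquire(responseSizeBytes);
        }

        // called once the application has consumed the buffer
        void release(int responseSizeBytes) {
            availableBytes.release(responseSizeBytes);
        }
    }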

In the future the accuracy of the limit can be improved by, for example,
declaring both the compressed _AND UNCOMPRESSED_ sizes up front, so that we
can pick up from the socket when we have enough memory to decompress as well
(this would obviously be a wire format change and outside the scope here,
but my point is that it could be done without adding any new configs).

On Mon, Oct 31, 2016 at 10:25 AM, Joel Koshy  wrote:

> Agreed with this approach.
> One detail to be wary of is that since we multiplex various other requests
> (e.g., heartbeats, offset commits, metadata, etc.) over the client that
> connects to the coordinator this could delay some of these critical
> requests. Realistically I don't think it will be an issue except in extreme
> scenarios where someone sets the memory limit to be unreasonably low.
>
> Thanks,
>
> Joel
>
> On Sun, Oct 30, 2016 at 12:32 PM, Jun Rao  wrote:
>
> > Hi, Mickael,
> >
> > I agree with others that it's better to be able to control the bytes the
> > consumer can read from sockets, instead of limiting the fetch requests.
> > KIP-72 has a proposal to bound the memory size at the socket selector
> > level. Perhaps that can be leveraged in this KIP too.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests
> >
> > Thanks,
> >
> > Jun
> >
> > On Thu, Oct 27, 2016 at 3:23 PM, Jay Kreps  wrote:
> >
> > > This is a good observation on limiting total memory usage. If I
> > understand
> > > the proposal I think it is that the consumer client would stop sending
> > > fetch requests once a certain number of in-flight fetch requests is
> met.
> > I
> > > think a better approach would be to always issue one fetch request to
> > each
> > > broker immediately, allow the server to process that request, and send
> > data
> > > back to the local machine where it would be stored in the socket buffer
> > (up
> > > to that buffer size). Instead of throttling the requests sent, the
> > consumer
> > > should ideally throttle the responses read from the socket buffer at
> any
> > > given time. That is, in a single poll call, rather than reading from
> > every
> > > single socket it should just read until it has a given amount of memory
> > > used then bail out early. It can come back and read more from the other
> > > sockets after those messages are processed.
> > >
> > > The advantage of this approach is that you don't incur the additional
> > > latency.
> > >
> > > -Jay
> > >
> > > On Mon, Oct 10, 2016 at 6:41 AM, Mickael Maison <
> > mickael.mai...@gmail.com>
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > > I would like to discuss the following KIP proposal:
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > 81%3A+Max+in-flight+fetches
> > > >
> > > >
> > > > Feedback and comments are welcome.
> > > > Thanks !
> > > >
> > > > Mickael
> > > >
> > >
> >
>


Re: Kafka Connect key.converter and value.converter properties for Avro encoding

2016-11-02 Thread Tommy Becker

Although I can't speak to details of the Confluent packaging, anytime you're using 
Avro you need the schemas for the records you're working with. In an Avro data 
file the schema is included in the file itself. But when you're encoding 
individual records like in Kafka, most people instead encode some sort of 
identifier/version number/fingerprint in each message that uniquely identifies the 
schema in some sort of external system (i.e. a schema registry). So I'm not sure 
how you would use Avro in Kafka without some sort of schema registry, unless 
you're planning on either using a static topic -> schema mapping or encoding 
the schema in every message.
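
For illustration, the framing usually looks something like this (just a
sketch; the Confluent serializers use a similar magic byte + 4-byte id layout,
and the class/method names here are made up):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;

    // Sketch of "identifier in each message": prefix the Avro payload with a
    // small header that points at the writer schema in an external registry.
    final class SchemaIdFraming {
        static byte[] frame(int schemaId, byte[] avroPayload) throws IOException {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            out.write(0);                                               // magic byte / format version
            out.write(ByteBuffer.allocate(4).putInt(schemaId).array()); // 4-byte schema id
            out.write(avroPayload);                                     // Avro binary encoding of the record
            return out.toByteArray();
        }
    }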

On 11/02/2016 05:48 AM, david.frank...@bt.com 
wrote:

I am using Kafka Connect in source mode i.e. using it to send events to Kafka 
topics.

With the key.converter and value.converter properties set to 
org.apache.kafka.connect.storage.StringConverter I can attach a consumer to the 
topics and see the events in a readable form.  This is helpful and reassuring 
but it is not the desired representation for my downstream consumers - these 
require the events to be Avro encoded.

It seems that to write the events to Kafka Avro encoded, these properties need 
to be set to io.confluent.kafka.serializers.KafkaAvroSerializer.  Is this 
correct?

I am not using the Confluent platform, merely the standard Kafka 0.10 download, 
and have been unable to find out how to get at these from a Maven repository 
jar.  http://docs.confluent.io/3.0.0/app-development.html#java suggests that 
these are available via:

  
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-avro-serializer</artifactId>
  <version>3.0.0</version>
</dependency>

But it doesn't appear to be true.  The class exists in 
https://raw.githubusercontent.com/confluentinc/schema-registry/master/avro-converter/src/main/java/io/confluent/connect/avro/AvroConverter.java
 but this seems to use the Schema Registry which is something I'd rather avoid.
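
From what I can tell, that converter is normally wired into the Connect worker 
properties roughly as follows (the registry URL is just a placeholder), which 
is why it drags in the Schema Registry:

    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://localhost:8081
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081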

I'd be grateful for any pointers on the simplest way of getting Avro encoded 
events written to Kafka from a Kafka Connect source connector/task.

Also in the task which creates SourceRecords, I'm choosing Schema.BYTES_SCHEMA 
for the 4th arg in the constructor.  But I'm not clear what this achieves - 
some light shed on that would also be helpful.

Many thanks,
David



--
Tommy Becker
Senior Software Engineer
O +1 919.460.4747
tivo.com





[GitHub] kafka pull request #2087: MINOR: Fix NPE when Connect offset contains non-pr...

2016-11-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2087


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka-site issue #28: Add Becket to the committers page

2016-11-02 Thread becketqin
Github user becketqin commented on the issue:

https://github.com/apache/kafka-site/pull/28
  
I'll try to merge this myself :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #2093: MINOR: Add description of how consumer wakeup acts...

2016-11-02 Thread srdo
GitHub user srdo opened a pull request:

https://github.com/apache/kafka/pull/2093

MINOR: Add description of how consumer wakeup acts if no threads are 
awakened

I think the Javadoc should describe what happens if wakeup is called and no 
other thread is currently blocking. This may be important in some cases, e.g. 
trying to shut down a poll thread, followed by manually committing offsets.
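
For context, the shutdown pattern I have in mind looks roughly like this
(a sketch; the topic name, wiring and the processing step are placeholders):

    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.atomic.AtomicBoolean;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.errors.WakeupException;

    public class WakeupShutdownSketch {
        private final AtomicBoolean running = new AtomicBoolean(true);
        private final KafkaConsumer<String, String> consumer;

        public WakeupShutdownSketch(Properties props) {
            this.consumer = new KafkaConsumer<>(props);
            this.consumer.subscribe(Collections.singletonList("some-topic"));
        }

        // Called from another thread. If no call is blocking right now, the
        // *next* blocking call is the one that throws WakeupException.
        public void shutdown() {
            running.set(false);
            consumer.wakeup();
        }

        public void runLoop() {
            try {
                while (running.get()) {
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    // process(records) ...
                }
            } catch (WakeupException e) {
                // shutdown was requested while poll() was blocking
            } finally {
                try {
                    consumer.commitSync();
                } catch (WakeupException e) {
                    // the wakeup arrived between poll() calls and was consumed
                    // by this commit attempt; retrying once should now go through
                    consumer.commitSync();
                } finally {
                    consumer.close();
                }
            }
        }
    }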

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/srdo/kafka minor-expand-wakeup-javadoc

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2093.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2093


commit 5179eec42fcd209fc4ae0772e7307f36419097ff
Author: Stig Rohde Døssing 
Date:   2016-11-02T17:19:21Z

Add description of how consumer wakeup acts if no threads are awakened




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka-site pull request #28: Add Becket to the committers page

2016-11-02 Thread becketqin
GitHub user becketqin opened a pull request:

https://github.com/apache/kafka-site/pull/28

Add Becket to the committers page



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/becketqin/kafka-site asf-site

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka-site/pull/28.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #28


commit 8b967a7c8da8184e3d186c4fe607b2fb3e7e109a
Author: Jiangjie Qin 
Date:   2016-11-02T16:59:15Z

Add Becket to the committers page




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (KAFKA-4369) ZkClient is not closed upon streams shutdown

2016-11-02 Thread Ryan P (JIRA)
Ryan P created KAFKA-4369:
-

 Summary: ZkClient is not closed upon streams shutdown
 Key: KAFKA-4369
 URL: https://issues.apache.org/jira/browse/KAFKA-4369
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Ryan P
Assignee: Guozhang Wang


Kafka Streams' InternalTopicManager creates a new ZkClient but fails to close 
it as part of its shutdown. 

https://github.com/confluentinc/kafka/blob/v3.0.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java#L93

This is likely only an issue when testing/debugging, where the 
streams application is shut down but the JVM remains intact. 
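
The fix is essentially for whoever creates the ZkClient to also own closing it 
on shutdown; a rough sketch (names and timeouts are illustrative, not the 
actual InternalTopicManager code):

{code}
import org.I0Itec.zkclient.ZkClient;

public class TopicManagerSketch implements AutoCloseable {
    private final ZkClient zkClient = new ZkClient("localhost:2181", 30000, 30000);

    @Override
    public void close() {
        // releases the ZooKeeper session and its background threads
        zkClient.close();
    }
}
{code}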





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: 0.10.1.0 - KafkaConsumer.poll() blocks background heartbeat thread causing consumer to be considered dead?

2016-11-02 Thread Jaikiran Pai
Thanks Ismael. Just checked, that one doesn't look like it's the same 
issue, but could be a similar one. In that JIRA it looks like the issue 
was probably addressed for the commitSync call. However, in this 
specific instance the KafkaConsumer.poll(...) itself leads to locking 
the object monitor on the ConsumerNetworkClient. The heartbeat 
thread in the background seems to be waiting to get hold of that object 
monitor and blocks on it.


Leaving aside the implementation details, what are the expected 
semantics of the heartbeat background thread - would it fail to send a 
heartbeat for a consumer if the consumer is currently busy with poll(), 
commitSync() or any similar call? If so, would this lack of heartbeats 
(for a while) cause that member to be considered dead by the 
coordinator? My reading of the logs and my limited knowledge of the Kafka 
code seem to indicate that this is what's happening, either as per the 
expected semantics or due to a possible bug.


-Jaikiran

On Wednesday 02 November 2016 08:39 PM, Ismael Juma wrote:

Maybe https://issues.apache.org/jira/browse/KAFKA-4303?

On 2 Nov 2016 10:15 am, "Jaikiran Pai"  wrote:


We have been trying to narrow down an issue in 0.10.1 of Kafka in our
setups where our consumers are marked as dead very frequently causing
rebalances almost every few seconds. The consumer (Java new API) then
starts seeing exceptions like:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
completed since the group has already rebalanced and assigned the
partitions to another member. This means that the time between subsequent
calls to poll() was longer than the configured max.poll.interval.ms,
which typically implies that the poll loop is spending too much time
message processing. You can address this either by increasing the session
timeout or by reducing the maximum size of batches returned in poll() with
max.poll.records.
 at org.apache.kafka.clients.consumer.internals.ConsumerCoordina
tor$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:674)
~[kafka-clients-0.10.1.0.jar!/:na]
 at org.apache.kafka.clients.consumer.internals.ConsumerCoordina
tor$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:615)
~[kafka-clients-0.10.1.0.jar!/:na]
 at org.apache.kafka.clients.consumer.internals.AbstractCoordina
tor$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742)
~[kafka-clients-0.10.1.0.jar!/:na]
 at org.apache.kafka.clients.consumer.internals.AbstractCoordina
tor$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722)
~[kafka-clients-0.10.1.0.jar!/:na]
 at org.apache.kafka.clients.consumer.internals.RequestFuture$1.
onSuccess(RequestFuture.java:186) ~[kafka-clients-0.10.1.0.jar!/:na]
 at org.apache.kafka.clients.consumer.internals.RequestFuture.
fireSuccess(RequestFuture.java:149) ~[kafka-clients-0.10.1.0.jar!/:na]
 at org.apache.kafka.clients.consumer.internals.RequestFuture.
complete(RequestFuture.java:116) ~[kafka-clients-0.10.1.0.jar!/:na]
 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkC
lient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479)
~[kafka-clients-0.10.1.0.jar!/:na]
 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkC
lient.firePendingCompletedRequests(ConsumerNetworkClient.java:316)
~[kafka-clients-0.10.1.0.jar!/:na]
 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkC
lient.poll(ConsumerNetworkClient.java:256) ~[kafka-clients-0.10.1.0.jar!/
:na]
 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkC
lient.poll(ConsumerNetworkClient.java:180) ~[kafka-clients-0.10.1.0.jar!/
:na]
 at org.apache.kafka.clients.consumer.internals.ConsumerCoordina
tor.commitOffsetsSync(ConsumerCoordinator.java:499)
~[kafka-clients-0.10.1.0.jar!/:na]


Our session and heartbeat timeouts are defaults that ship in Kafka 0.10.1
(i.e. we don't set any specific values). Every few seconds, we see messages
on the broker logs which indicate these consumers are considered dead:

[2016-11-02 06:09:48,103] TRACE [GroupCoordinator 0]: Member
consumer-1-efde1e11-fdc6-4801-8fba-20d58b7a30b6 in group foo-bar has
failed (kafka.coordinator.GroupCoordinator)
[2016-11-02 06:09:48,103] INFO [GroupCoordinator 0]: Preparing to
restabilize group foo-bar with old generation 1034
(kafka.coordinator.GroupCoordinator)
[2016-11-02 06:09:48,103] INFO [GroupCoordinator 0]: Group foo-bar with
generation 1035 is now empty (kafka.coordinator.GroupCoordinator)


These messages keep repeating for almost every other consumer we have (in
different groups).

There's no real logic in our consumers and they just pick up the message
from partitions, commit the offset, and hand it immediately to a different
thread to process the message and go back to polling:

while (!stopped) {
 try {
 final ConsumerRecords consumerRecords =

Re: 0.10.1.0 - KafkaConsumer.poll() blocks background heartbeat thread causing consumer to be considered dead?

2016-11-02 Thread Ismael Juma
Maybe https://issues.apache.org/jira/browse/KAFKA-4303?

On 2 Nov 2016 10:15 am, "Jaikiran Pai"  wrote:

> We have been trying to narrow down an issue in 0.10.1 of Kafka in our
> setups where our consumers are marked as dead very frequently causing
> rebalances almost every few seconds. The consumer (Java new API) then
> starts seeing exceptions like:
>
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> completed since the group has already rebalanced and assigned the
> partitions to another member. This means that the time between subsequent
> calls to poll() was longer than the configured max.poll.interval.ms,
> which typically implies that the poll loop is spending too much time
> message processing. You can address this either by increasing the session
> timeout or by reducing the maximum size of batches returned in poll() with
> max.poll.records.
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordina
> tor$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:674)
> ~[kafka-clients-0.10.1.0.jar!/:na]
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordina
> tor$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:615)
> ~[kafka-clients-0.10.1.0.jar!/:na]
> at org.apache.kafka.clients.consumer.internals.AbstractCoordina
> tor$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742)
> ~[kafka-clients-0.10.1.0.jar!/:na]
> at org.apache.kafka.clients.consumer.internals.AbstractCoordina
> tor$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722)
> ~[kafka-clients-0.10.1.0.jar!/:na]
> at org.apache.kafka.clients.consumer.internals.RequestFuture$1.
> onSuccess(RequestFuture.java:186) ~[kafka-clients-0.10.1.0.jar!/:na]
> at org.apache.kafka.clients.consumer.internals.RequestFuture.
> fireSuccess(RequestFuture.java:149) ~[kafka-clients-0.10.1.0.jar!/:na]
> at org.apache.kafka.clients.consumer.internals.RequestFuture.
> complete(RequestFuture.java:116) ~[kafka-clients-0.10.1.0.jar!/:na]
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkC
> lient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479)
> ~[kafka-clients-0.10.1.0.jar!/:na]
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkC
> lient.firePendingCompletedRequests(ConsumerNetworkClient.java:316)
> ~[kafka-clients-0.10.1.0.jar!/:na]
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkC
> lient.poll(ConsumerNetworkClient.java:256) ~[kafka-clients-0.10.1.0.jar!/
> :na]
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkC
> lient.poll(ConsumerNetworkClient.java:180) ~[kafka-clients-0.10.1.0.jar!/
> :na]
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordina
> tor.commitOffsetsSync(ConsumerCoordinator.java:499)
> ~[kafka-clients-0.10.1.0.jar!/:na]
>
>
> Our session and heartbeat timeouts are defaults that ship in Kafka 0.10.1
> (i.e. we don't set any specific values). Every few seconds, we see messages
> on the broker logs which indicate these consumers are considered dead:
>
> [2016-11-02 06:09:48,103] TRACE [GroupCoordinator 0]: Member
> consumer-1-efde1e11-fdc6-4801-8fba-20d58b7a30b6 in group foo-bar has
> failed (kafka.coordinator.GroupCoordinator)
> [2016-11-02 06:09:48,103] INFO [GroupCoordinator 0]: Preparing to
> restabilize group foo-bar with old generation 1034
> (kafka.coordinator.GroupCoordinator)
> [2016-11-02 06:09:48,103] INFO [GroupCoordinator 0]: Group foo-bar with
> generation 1035 is now empty (kafka.coordinator.GroupCoordinator)
> 
>
> These messages keep repeating for almost every other consumer we have (in
> different groups).
>
> There's no real logic in our consumers and they just pick up the message
> from partitions, commit the offset, and hand it immediately to a different
> thread to process the message and go back to polling:
>
>while (!stopped) {
> try {
> final ConsumerRecords consumerRecords =
> consumer.poll(someValue);
> for (final TopicPartition topicPartition :
> consumerRecords.partitions()) {
> if (stopped) {
> break;
> }
> for (final ConsumerRecord consumerRecord :
> consumerRecords.records(topicPartition)) {
> final long previousOffset =
> consumerRecord.offset();
> // commit the offset and then pass on the
> message for processing (in a separate thread)
> consumer.commitSync(Collections.singletonMap(topicPartition, new
> OffsetAndMetadata(previousOffset + 1)));
>
> this.executor.execute(new Runnable() {
> @Override
> public void run() {
> // process the ConsumerRecord
> }
>   

0.10.1.0 - KafkaConsumer.poll() blocks background heartbeat thread causing consumer to be considered dead?

2016-11-02 Thread Jaikiran Pai
We have been trying to narrow down an issue in 0.10.1 of Kafka in our 
setups where our consumers are marked as dead very frequently causing 
rebalances almost every few seconds. The consumer (Java new API) then 
starts seeing exceptions like:


org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot 
be completed since the group has already rebalanced and assigned the 
partitions to another member. This means that the time between 
subsequent calls to poll() was longer than the configured 
max.poll.interval.ms, which typically implies that the poll loop is 
spending too much time message processing. You can address this either 
by increasing the session timeout or by reducing the maximum size of 
batches returned in poll() with max.poll.records.
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:674) 
~[kafka-clients-0.10.1.0.jar!/:na]
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:615) 
~[kafka-clients-0.10.1.0.jar!/:na]
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742) 
~[kafka-clients-0.10.1.0.jar!/:na]
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722) 
~[kafka-clients-0.10.1.0.jar!/:na]
at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186) 
~[kafka-clients-0.10.1.0.jar!/:na]
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149) 
~[kafka-clients-0.10.1.0.jar!/:na]
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116) 
~[kafka-clients-0.10.1.0.jar!/:na]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479) 
~[kafka-clients-0.10.1.0.jar!/:na]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316) 
~[kafka-clients-0.10.1.0.jar!/:na]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256) 
~[kafka-clients-0.10.1.0.jar!/:na]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180) 
~[kafka-clients-0.10.1.0.jar!/:na]
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:499) 
~[kafka-clients-0.10.1.0.jar!/:na]



Our session and heartbeat timeouts are defaults that ship in Kafka 
0.10.1 (i.e. we don't set any specific values). Every few seconds, we 
see messages on the broker logs which indicate these consumers are 
considered dead:


[2016-11-02 06:09:48,103] TRACE [GroupCoordinator 0]: Member 
consumer-1-efde1e11-fdc6-4801-8fba-20d58b7a30b6 in group foo-bar has 
failed (kafka.coordinator.GroupCoordinator)
[2016-11-02 06:09:48,103] INFO [GroupCoordinator 0]: Preparing to 
restabilize group foo-bar with old generation 1034 
(kafka.coordinator.GroupCoordinator)
[2016-11-02 06:09:48,103] INFO [GroupCoordinator 0]: Group foo-bar with 
generation 1035 is now empty (kafka.coordinator.GroupCoordinator)



These messages keep repeating for almost every other consumer we have 
(in different groups).


There's no real logic in our consumers and they just pick up the message 
from partitions, commit the offset, and hand it immediately to a 
different thread to process the message and go back to polling:


    while (!stopped) {
        try {
            final ConsumerRecords consumerRecords = consumer.poll(someValue);
            for (final TopicPartition topicPartition : consumerRecords.partitions()) {
                if (stopped) {
                    break;
                }
                for (final ConsumerRecord consumerRecord : consumerRecords.records(topicPartition)) {
                    final long previousOffset = consumerRecord.offset();
                    // commit the offset and then pass on the message for processing (in a separate thread)
                    consumer.commitSync(Collections.singletonMap(topicPartition,
                            new OffsetAndMetadata(previousOffset + 1)));
                    this.executor.execute(new Runnable() {
                        @Override
                        public void run() {
                            // process the ConsumerRecord
                        }
                    });
                }
            }
        } catch (Exception e) {
            // log the error and continue
            continue;
        }
    }



We haven't been able to figure out why the 

[jira] [Commented] (KAFKA-4362) Consumer can fail after reassignment of the offsets topic partition

2016-11-02 Thread Andrew Olson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15628871#comment-15628871
 ] 

Andrew Olson commented on KAFKA-4362:
-

Is there documentation advising that, for a COORDINATOR_NOT_AVAILABLE error when 
committing offsets, the KafkaConsumer should be closed and a new instance 
created? Or is the expectation that any CommitFailedException should be handled 
in that way? We have seen some cases where a call to poll() before retrying a 
failed commit due to ILLEGAL_GENERATION or UNKNOWN_MEMBER_ID allows the 
consumer to recover.
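
i.e. something along these lines, just to illustrate the sequence we observed 
(not necessarily a recommendation; consumer and offsets here are the ones from 
the failed commit):

{code}
try {
    consumer.commitSync(offsets);
} catch (CommitFailedException e) {
    consumer.poll(0);              // rejoins the group / picks up the new generation
    consumer.commitSync(offsets);  // in the cases we saw, the retry then succeeds
}
{code}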

> Consumer can fail after reassignment of the offsets topic partition
> ---
>
> Key: KAFKA-4362
> URL: https://issues.apache.org/jira/browse/KAFKA-4362
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Joel Koshy
>Assignee: Mayuresh Gharat
>
> When a consumer offsets topic partition reassignment completes, an offset 
> commit shows this:
> {code}
> java.lang.IllegalArgumentException: Message format version for partition 100 
> not found
> at 
> kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
>  ~[kafka_2.10.jar:?]
> at 
> kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
>  ~[kafka_2.10.jar:?]
> at scala.Option.getOrElse(Option.scala:120) ~[scala-library-2.10.4.jar:?]
> at 
> kafka.coordinator.GroupMetadataManager.kafka$coordinator$GroupMetadataManager$$getMessageFormatVersionAndTimestamp(GroupMetadataManager.scala:632)
>  ~[kafka_2.10.jar:?]
> at 
> ...
> {code}
> The issue is that the replica has been deleted so the 
> {{GroupMetadataManager.getMessageFormatVersionAndTimestamp}} throws this 
> exception instead which propagates as an unknown error.
> Unfortunately consumers don't respond to this and will fail their offset 
> commits.
> One workaround in the above situation is to bounce the cluster - the consumer 
> will be forced to rediscover the group coordinator.
> (Incidentally, the message incorrectly prints the number of partitions 
> instead of the actual partition.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4365) In case async producer closes the TCP connection to Kafka broker, last sent messages might be lost.

2016-11-02 Thread Rajini Sivaram (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15628696#comment-15628696
 ] 

Rajini Sivaram commented on KAFKA-4365:
---

[PR #1836|https://github.com/apache/kafka/pull/1836] for KAFKA-3703 addresses 
this issue.

> In case async producer closes the TCP connection to Kafka broker, last sent 
> messages might be lost.
> ---
>
> Key: KAFKA-4365
> URL: https://issues.apache.org/jira/browse/KAFKA-4365
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.0.1
>Reporter: Ciprian Pascu
>
> I am using kafka-python producer (https://github.com/dpkp/kafka-python). The 
> producer is set as async (acks=0) and sends a burst of, for example, 1000 
> messages. As consumer I use either Logstash or the Kafka console consumer. 
> Quite often it can be seen that the consumer gets less than 1000 messages. 
> Also, by checking the messages written by the brokers on the disk, it can be 
> seen that not all messages are written. Still, by using tcpdump and 
> Wireshark, I can see that all messages have reached the brokers. Also, by 
> adding some test logs in Kafka code, I could see that the messages are added 
> to the staged receives, but not to completed receives 
> (org.apache.kafka.common.network.Selector class). And I believe that happens 
> because of the 'isMute' method in the classes implementing 
> org.apache.kafka.common.network.TransportLayer: they all(both) seem to check 
> also that the 'key' is valid, which doesn't hold true anymore if the TCP 
> connection has been closed; despite that, Kafka has already those messages as 
> staged receives, so it could add them to the log; besides, since acks=0, no 
> responses are needed to be sent. 
> This issue is not visible if acks=1 (synchronous producer) or the producer 
> keeps the TCP connections to brokers all the time up or enough time for Kafka 
> to actually write the logs to disk.
> Proposed solution: remove the 'key.isValid()' check from 'isMute' method in 
> SslTransportLayer and PlaintextTransportLayer classes 
> (org.apache.kafka.common.network package.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3703) Handle close gracefully for consumers and producers with acks=0

2016-11-02 Thread Rajini Sivaram (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-3703:
--
Summary: Handle close gracefully for consumers and producers with acks=0  
(was: Selector.close() doesn't complete outgoing writes)

> Handle close gracefully for consumers and producers with acks=0
> ---
>
> Key: KAFKA-3703
> URL: https://issues.apache.org/jira/browse/KAFKA-3703
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.0.1
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>
> Outgoing writes may be discarded when a connection is closed. For instance, 
> when running a producer with acks=0, a producer that writes data and closes 
> the producer would expect to see all writes to complete if there are no 
> errors. But close() simply closes the channel and socket which could result 
> in outgoing data being discarded.
> This is also an issue in consumers which use commitAsync to commit offsets. 
> Closing the consumer may result in commits being discarded because writes 
> have not completed before close().
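
Until close() drains outstanding writes, a common application-side mitigation 
for the consumer case is to finish with a synchronous commit before closing; a 
sketch:

{code}
void shutdown(KafkaConsumer<String, String> consumer) {
    try {
        consumer.commitSync();   // blocks until the offset commit request completes
    } finally {
        consumer.close();
    }
}
{code}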



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4368) Unclean shutdown breaks Kafka cluster

2016-11-02 Thread Anukool Rattana (JIRA)
Anukool Rattana created KAFKA-4368:
--

 Summary: Unclean shutdown breaks Kafka cluster
 Key: KAFKA-4368
 URL: https://issues.apache.org/jira/browse/KAFKA-4368
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.10.0.0, 0.9.0.1
Reporter: Anukool Rattana
Priority: Critical


My team has observed that if a broker process dies uncleanly, it blocks 
producers from sending messages to a Kafka topic.

Here is how to reproduce the problem:
1) Create a Kafka 0.10 with three brokers (A, B and C). 
2) Create topic with replication_factor = 2 
3) Set the producer to send messages with "acks=all", meaning all in-sync 
replicas must acknowledge a message before the producer proceeds to the next one. 
4) Force IEM (IBM Endpoint Manager) to send patch to broker A and force server 
to reboot after patches installed.
Note: min.insync.replicas = 1


Result: Producers are not able to send messages to the Kafka topic after the 
broker has rebooted and come back to join the cluster, with the following error 
messages. 

[2016-09-28 09:32:41,823] WARN Error while fetching metadata with correlation 
id 0 : {logstash=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

We suspected that the replication_factor (2) is not sufficient for our Kafka 
environment, but we really need an explanation of what happens when a broker 
faces an unclean shutdown. 
The same issue occurred when setting cluster with 2 brokers and 
replication_factor = 1.

The workaround I used to recover the service was to clean up both the Kafka 
topic log files and the ZooKeeper data (rmr /brokers/topics/XXX and rmr 
/consumers/XXX).

Note:
Topic list after A comeback from rebooted.
Topic:logstash  PartitionCount:3ReplicationFactor:2 Configs:
Topic: logstash Partition: 0Leader: 1   Replicas: 1,3   Isr: 1,3
Topic: logstash Partition: 1Leader: 2   Replicas: 2,1   Isr: 2,1
Topic: logstash Partition: 2Leader: 3   Replicas: 3,2   Isr: 2,3
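
For what it's worth, a commonly recommended combination for acks=all durability 
is more replicas with a higher min.insync.replicas, e.g. (a sketch; the 
ZooKeeper address is a placeholder):

bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic logstash \
  --partitions 3 --replication-factor 3 --config min.insync.replicas=2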




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4367) MirrorMaker shuts down gracefully without being stopped

2016-11-02 Thread Alex (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alex updated KAFKA-4367:

Description: 
Start:
bin/kafka-mirror-maker.sh --new.consumer --consumer.config 
config/ssl_mirroring_consumer.properties --producer.config 
config/ssl_mirroring_producer.properties --whitelist 
"TOPIC1|TOPIC2|TOPIC3|TOPIC4" --num.streams 20 &> /dev/null &

MirrorMaker stops working without being stopped, 30 minutes after start. No 
clue why this problem occurs.


  kafka-mirror-maker.log

[2016-11-01 19:23:32,003] TRACE Produced messages to topic-partition 
CEP.FS.IN-175 with base offset offset 15015 and error: null. 
(org.apache.kafka.clients.producer.internals.RecordBatch)
[2016-11-01 19:23:32,003] TRACE Produced messages to topic-partition 
CEP.FS.IN-151 with base offset offset 15066 and error: null. 
(org.apache.kafka.clients.producer.internals.RecordBatch)
[2016-11-01 19:23:32,003] TRACE Nodes with data ready to send: [Node(8, 
10.126.0.2, 9092)] (org.apache.kafka.clients.producer.internals.Sender)
[2016-11-01 19:23:32,003] TRACE Created 1 produce requests: 
[ClientRequest(expectResponse=true, 
callback=org.apache.kafka.clients.producer.internals.Sender$1@483c4c7a, 
request=RequestSend(header={api_key=0,api_version=1,correlation_id=219685,client_id=producer-1},
 
body={acks=-1,timeout=3,topic_data=[{topic=CEP.FS.IN,data=[{partition=133,record_set=java.nio.HeapByteBuffer[pos=0
 lim=9085 cap=16384]}]}]}), createdTimeMs=1478017412003, sendTimeMs=0)] 
(org.apache.kafka.clients.producer.internals.Sender)
[2016-11-01 19:23:32,008] TRACE Returning fetched records for assigned 
partition CEP.FS.IN-172 and update consumed position to 3869316 
(org.apache.kafka.clients.consumer.internals.Fetcher)
[2016-11-01 19:23:32,008] TRACE [mirrormaker-thread-7] Sending message with 
value size 485 and offset 3869315 (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,008] TRACE Sending record ProducerRecord(topic=CEP.FS.IN, 
partition=null, key=null, value=[B@12a54f5a with callback 
kafka.tools.MirrorMaker$MirrorMakerProducerCallback@5ea65b8f to topic CEP.FS.IN 
partition 160 (org.apache.kafka.clients.producer.KafkaProducer)
[2016-11-01 19:23:32,008] TRACE Allocating a new 16384 byte message buffer for 
topic CEP.FS.IN partition 160 
(org.apache.kafka.clients.producer.internals.RecordAccumulator)
[2016-11-01 19:23:32,008] TRACE Waking up the sender since topic CEP.FS.IN 
partition 160 is either full or getting a new batch 
(org.apache.kafka.clients.producer.KafkaProducer)
[2016-11-01 19:23:32,010] TRACE Received produce response from node 7 with 
correlation id 219684 (org.apache.kafka.clients.producer.internals.Sender)
[2016-11-01 19:23:32,010] TRACE Produced messages to topic-partition 
CEP.FS.IN-106 with base offset offset 15086 and error: null. 
(org.apache.kafka.clients.producer.internals.RecordBatch)
[2016-11-01 19:23:32,010] TRACE Produced messages to topic-partition 
CEP.FS.IN-124 with base offset offset 15095 and error: null. 
(org.apache.kafka.clients.producer.internals.RecordBatch)
[2016-11-01 19:23:32,010] TRACE Nodes with data ready to send: [Node(7, 
10.126.0.1, 9092)] (org.apache.kafka.clients.producer.internals.Sender)
[2016-11-01 19:23:32,010] INFO Start clean shutdown. (kafka.tools.MirrorMaker$)
[2016-11-01 19:23:32,010] TRACE Created 1 produce requests: 
[ClientRequest(expectResponse=true, 
callback=org.apache.kafka.clients.producer.internals.Sender$1@44b788c7, 
request=RequestSend(header={api_key=0,api_version=1,correlation_id=219686,client_id=producer-1},
 
body={acks=-1,timeout=3,topic_data=[{topic=CEP.FS.IN,data=[{partition=160,record_set=java.nio.HeapByteBuffer[pos=0
 lim=511 cap=16384]}]}]}), createdTimeMs=1478017412010, sendTimeMs=0)] 
(org.apache.kafka.clients.producer.internals.Sender)
[2016-11-01 19:23:32,010] INFO Shutting down consumer threads. 
(kafka.tools.MirrorMaker$)
[2016-11-01 19:23:32,011] INFO [mirrormaker-thread-0] mirrormaker-thread-0 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,011] INFO [mirrormaker-thread-1] mirrormaker-thread-1 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,011] INFO [mirrormaker-thread-2] mirrormaker-thread-2 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,011] INFO [mirrormaker-thread-3] mirrormaker-thread-3 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,011] INFO [mirrormaker-thread-4] mirrormaker-thread-4 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,011] INFO [mirrormaker-thread-5] mirrormaker-thread-5 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-6] 

[jira] [Updated] (KAFKA-4367) MirrorMaker shuts down gracefully without being stopped

2016-11-02 Thread Alex (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alex updated KAFKA-4367:

Description: 
Start:
bin/kafka-mirror-maker.sh --new.consumer --consumer.config 
config/ssl_mirroring_consumer.properties --producer.config 
config/ssl_mirroring_producer.properties --whitelist 
"TOPIC1|TOPIC2|TOPIC3|TOPIC4" --num.streams 20 &> /dev/null &

MirrorMaker stops working without being stopped. No clue why this problem 
occurs.


  kafka-mirror-maker.log

[2016-11-01 19:23:32,010] INFO Shutting down consumer threads. 
(kafka.tools.MirrorMaker$)
[2016-11-01 19:23:32,011] INFO [mirrormaker-thread-0] mirrormaker-thread-0 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,011] INFO [mirrormaker-thread-1] mirrormaker-thread-1 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,011] INFO [mirrormaker-thread-2] mirrormaker-thread-2 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,011] INFO [mirrormaker-thread-3] mirrormaker-thread-3 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,011] INFO [mirrormaker-thread-4] mirrormaker-thread-4 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,011] INFO [mirrormaker-thread-5] mirrormaker-thread-5 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-6] mirrormaker-thread-6 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-7] mirrormaker-thread-7 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-8] mirrormaker-thread-8 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-9] mirrormaker-thread-9 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-10] mirrormaker-thread-10 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-11] mirrormaker-thread-11 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-12] mirrormaker-thread-12 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-13] mirrormaker-thread-13 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-14] mirrormaker-thread-14 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-15] mirrormaker-thread-15 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-16] mirrormaker-thread-16 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-17] mirrormaker-thread-17 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-18] mirrormaker-thread-18 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,013] TRACE [mirrormaker-thread-9] Caught 
ConsumerWakeupException, continue iteration. 
(kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,013] TRACE [mirrormaker-thread-11] Caught 
ConsumerWakeupException, continue iteration. 
(kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,013] TRACE [mirrormaker-thread-12] Caught 
ConsumerWakeupException, continue iteration. 
(kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,013] TRACE [mirrormaker-thread-6] Caught 
ConsumerWakeupException, continue iteration. 
(kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,014] TRACE [mirrormaker-thread-1] Caught 
ConsumerWakeupException, continue iteration. 
(kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,013] INFO [mirrormaker-thread-19] mirrormaker-thread-19 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,013] TRACE [mirrormaker-thread-13] Caught 
ConsumerWakeupException, continue iteration. 
(kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,014] INFO [mirrormaker-thread-13] Flushing producer. 
(kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,014] TRACE Flushing accumulated records in producer. 
(org.apache.kafka.clients.producer.KafkaProducer)
[2016-11-01 19:23:32,012] TRACE [mirrormaker-thread-4] Caught 
ConsumerWakeupException, continue iteration. 
(kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,014] TRACE [mirrormaker-thread-19] Caught 
ConsumerWakeupException, continue iteration. 
(kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 

[jira] [Updated] (KAFKA-4367) MirrorMaker shuts down gracefully without being stopped

2016-11-02 Thread Alex (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alex updated KAFKA-4367:

Description: 
Start:
bin/kafka-mirror-maker.sh --new.consumer --consumer.config 
config/ssl_mirroring_consumer.properties --producer.config 
config/ssl_mirroring_producer.properties --whitelist 
"TOPIC1|TOPIC2|TOPIC3|TOPIC4" --num.streams 20 &> /dev/null &

MirrorMaker stops working without being stopped, 30 minutes after start. No 
clue why this problem occurs.


  kafka-mirror-maker.log

[2016-11-01 19:23:32,010] INFO Shutting down consumer threads. 
(kafka.tools.MirrorMaker$)
[2016-11-01 19:23:32,011] INFO [mirrormaker-thread-0] mirrormaker-thread-0 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,011] INFO [mirrormaker-thread-1] mirrormaker-thread-1 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,011] INFO [mirrormaker-thread-2] mirrormaker-thread-2 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,011] INFO [mirrormaker-thread-3] mirrormaker-thread-3 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,011] INFO [mirrormaker-thread-4] mirrormaker-thread-4 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,011] INFO [mirrormaker-thread-5] mirrormaker-thread-5 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-6] mirrormaker-thread-6 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-7] mirrormaker-thread-7 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-8] mirrormaker-thread-8 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-9] mirrormaker-thread-9 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-10] mirrormaker-thread-10 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-11] mirrormaker-thread-11 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-12] mirrormaker-thread-12 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-13] mirrormaker-thread-13 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-14] mirrormaker-thread-14 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-15] mirrormaker-thread-15 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-16] mirrormaker-thread-16 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-17] mirrormaker-thread-17 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-18] mirrormaker-thread-18 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,013] TRACE [mirrormaker-thread-9] Caught 
ConsumerWakeupException, continue iteration. 
(kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,013] TRACE [mirrormaker-thread-11] Caught 
ConsumerWakeupException, continue iteration. 
(kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,013] TRACE [mirrormaker-thread-12] Caught 
ConsumerWakeupException, continue iteration. 
(kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,013] TRACE [mirrormaker-thread-6] Caught 
ConsumerWakeupException, continue iteration. 
(kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,014] TRACE [mirrormaker-thread-1] Caught 
ConsumerWakeupException, continue iteration. 
(kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,013] INFO [mirrormaker-thread-19] mirrormaker-thread-19 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,013] TRACE [mirrormaker-thread-13] Caught 
ConsumerWakeupException, continue iteration. 
(kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,014] INFO [mirrormaker-thread-13] Flushing producer. 
(kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,014] TRACE Flushing accumulated records in producer. 
(org.apache.kafka.clients.producer.KafkaProducer)
[2016-11-01 19:23:32,012] TRACE [mirrormaker-thread-4] Caught 
ConsumerWakeupException, continue iteration. 
(kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,014] TRACE [mirrormaker-thread-19] Caught 
ConsumerWakeupException, continue iteration. 

[jira] [Created] (KAFKA-4367) MirrorMaker shuts down gracefully without being stopped

2016-11-02 Thread Alex (JIRA)
Alex created KAFKA-4367:
---

 Summary: MirrorMaker shuts down gracefully without being stopped
 Key: KAFKA-4367
 URL: https://issues.apache.org/jira/browse/KAFKA-4367
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.9.0.1
 Environment: RHEL 7
Reporter: Alex


MirrorMaker stops working without being stopped. No clue why this problem 
occurs.


  kafka-mirror-maker.log

[2016-11-01 19:23:32,010] INFO Shutting down consumer threads. 
(kafka.tools.MirrorMaker$)
[2016-11-01 19:23:32,011] INFO [mirrormaker-thread-0] mirrormaker-thread-0 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,011] INFO [mirrormaker-thread-1] mirrormaker-thread-1 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,011] INFO [mirrormaker-thread-2] mirrormaker-thread-2 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,011] INFO [mirrormaker-thread-3] mirrormaker-thread-3 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,011] INFO [mirrormaker-thread-4] mirrormaker-thread-4 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,011] INFO [mirrormaker-thread-5] mirrormaker-thread-5 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-6] mirrormaker-thread-6 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-7] mirrormaker-thread-7 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-8] mirrormaker-thread-8 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-9] mirrormaker-thread-9 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-10] mirrormaker-thread-10 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-11] mirrormaker-thread-11 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-12] mirrormaker-thread-12 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-13] mirrormaker-thread-13 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-14] mirrormaker-thread-14 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-15] mirrormaker-thread-15 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-16] mirrormaker-thread-16 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-17] mirrormaker-thread-17 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,012] INFO [mirrormaker-thread-18] mirrormaker-thread-18 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,013] TRACE [mirrormaker-thread-9] Caught 
ConsumerWakeupException, continue iteration. 
(kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,013] TRACE [mirrormaker-thread-11] Caught 
ConsumerWakeupException, continue iteration. 
(kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,013] TRACE [mirrormaker-thread-12] Caught 
ConsumerWakeupException, continue iteration. 
(kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,013] TRACE [mirrormaker-thread-6] Caught 
ConsumerWakeupException, continue iteration. 
(kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,014] TRACE [mirrormaker-thread-1] Caught 
ConsumerWakeupException, continue iteration. 
(kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,013] INFO [mirrormaker-thread-19] mirrormaker-thread-19 
shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,013] TRACE [mirrormaker-thread-13] Caught 
ConsumerWakeupException, continue iteration. 
(kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,014] INFO [mirrormaker-thread-13] Flushing producer. 
(kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,014] TRACE Flushing accumulated records in producer. 
(org.apache.kafka.clients.producer.KafkaProducer)
[2016-11-01 19:23:32,012] TRACE [mirrormaker-thread-4] Caught 
ConsumerWakeupException, continue iteration. 
(kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,014] TRACE [mirrormaker-thread-19] Caught 
ConsumerWakeupException, continue iteration. 
(kafka.tools.MirrorMaker$MirrorMakerThread)
[2016-11-01 19:23:32,014] INFO 

[GitHub] kafka pull request #2092: MINOR: remove commented out code and System.out.pr...

2016-11-02 Thread dguy
GitHub user dguy opened a pull request:

https://github.com/apache/kafka/pull/2092

MINOR: remove commented out code and System.out.println

Remove commented out code and System.out.println from 
KTableKTableJoinIntegrationTest

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dguy/kafka cleanup-comments

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2092.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2092


commit bba44b570fb98dbe49c7009b9e96883677bcb215
Author: Damian Guy 
Date:   2016-11-02T10:50:37Z

remove commented out code and sys out




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Work started] (KAFKA-4366) KafkaStreams.close() blocks indefinitely

2016-11-02 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-4366 started by Damian Guy.
-
> KafkaStreams.close() blocks indefinitely
> 
>
> Key: KAFKA-4366
> URL: https://issues.apache.org/jira/browse/KAFKA-4366
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0, 0.10.0.1
>Reporter: Michal Borowiecki
>Assignee: Damian Guy
>
> KafkaStreams.close() method calls join on all its threads without a timeout, 
> meaning indefinitely, which makes it prone to deadlocks and unfit to be used 
> in shutdown hooks.
> (KafkaStreams::close is used in numerous examples by confluent: 
> https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/kafka-streams/src/main/java/io/confluent/examples/streams
>  and 
> https://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple/
>  so we assumed it to be recommended practice)
> A deadlock happens, for instance, if System.exit() is called from within the 
> uncaughtExceptionHandler. (We need to call System.exit() from the 
> uncaughtExceptionHandler because KAFKA-4355 issue shuts down the StreamThread 
> and to recover we want the process to exit, as our infrastructure will then 
> start it up again.)
> The System.exit call (from the uncaughtExceptionHandler, which runs in the 
> StreamThread) will execute the shutdown hook in a new thread and wait for 
> that thread to join. If the shutdown hook calls KafkaStreams.close, it will 
> in turn block waiting for the StreamThread to join, hence the deadlock.
> Runtime.addShutdownHook javadocs state:
> {quote}
> Shutdown hooks run at a delicate time in the life cycle of a virtual machine 
> and should therefore be coded defensively. They should, in particular, be 
> written to be thread-safe and to avoid deadlocks insofar as possible
> {quote}
> and
> {quote}
> Shutdown hooks should also finish their work quickly.
> {quote}
> Therefore the current implementation of KafkaStreams.close() which waits 
> forever for threads to join is completely unsuitable for use in a shutdown 
> hook. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-4366) KafkaStreams.close() blocks indefinitely

2016-11-02 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy reassigned KAFKA-4366:
-

Assignee: Damian Guy  (was: Guozhang Wang)

> KafkaStreams.close() blocks indefinitely
> 
>
> Key: KAFKA-4366
> URL: https://issues.apache.org/jira/browse/KAFKA-4366
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0, 0.10.0.1
>Reporter: Michal Borowiecki
>Assignee: Damian Guy
>
> KafkaStreams.close() method calls join on all its threads without a timeout, 
> meaning indefinitely, which makes it prone to deadlocks and unfit to be used 
> in shutdown hooks.
> (KafkaStreams::close is used in numerous examples by confluent: 
> https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/kafka-streams/src/main/java/io/confluent/examples/streams
>  and 
> https://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple/
>  so we assumed it to be recommended practice)
> A deadlock happens, for instance, if System.exit() is called from within the 
> uncaughtExceptionHandler. (We need to call System.exit() from the 
> uncaughtExceptionHandler because KAFKA-4355 issue shuts down the StreamThread 
> and to recover we want the process to exit, as our infrastructure will then 
> start it up again.)
> The System.exit call (from the uncaughtExceptionHandler, which runs in the 
> StreamThread) will execute the shutdown hook in a new thread and wait for 
> that thread to join. If the shutdown hook calls KafkaStreams.close, it will 
> in turn block waiting for the StreamThread to join, hence the deadlock.
> Runtime.addShutdownHook javadocs state:
> {quote}
> Shutdown hooks run at a delicate time in the life cycle of a virtual machine 
> and should therefore be coded defensively. They should, in particular, be 
> written to be thread-safe and to avoid deadlocks insofar as possible
> {quote}
> and
> {quote}
> Shutdown hooks should also finish their work quickly.
> {quote}
> Therefore the current implementation of KafkaStreams.close() which waits 
> forever for threads to join is completely unsuitable for use in a shutdown 
> hook. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4348) On Mac OS, KafkaConsumer.poll returns 0 when there are still messages on Kafka server

2016-11-02 Thread Yiquan Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15628551#comment-15628551
 ] 

Yiquan Zhou commented on KAFKA-4348:


Yes, it's very likely the same issue as KAFKA-3135. I can reproduce it with the 
code from that issue as well. I've tested with a larger value of 
receive.buffer.bytes (~64K), and it did work around the issue. So we can close this 
issue as a duplicate and I'll follow the other one. Thank you both for your help.
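
For reference, a minimal sketch of a Java consumer applying that workaround; the 
bootstrap servers, group id, topic and buffer size below are placeholders rather 
than values taken from the report:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class LargeBufferConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "example-group");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // The workaround discussed above: enlarge the socket receive buffer.
            props.put("receive.buffer.bytes", "65536");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("connect-test"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                System.out.println("polled " + records.count() + " records");
            }
        }
    }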


> On Mac OS, KafkaConsumer.poll returns 0 when there are still messages on 
> Kafka server
> -
>
> Key: KAFKA-4348
> URL: https://issues.apache.org/jira/browse/KAFKA-4348
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.1
> Environment: Mac OS X El Capitan, Java 1.8.0_111
>Reporter: Yiquan Zhou
>  Labels: consumer, mac, polling
>
> Steps to reproduce:
> 1. start the zookeeper and kafka server using the default properties from the 
> distribution: 
> $ bin/zookeeper-server-start.sh config/zookeeper.properties
> $ bin/kafka-server-start.sh config/server.properties 
> 2. create a Kafka consumer using the Java API KafkaConsumer.poll(long 
> timeout). It polls the records from the server every second (timeout set to 
> 1000) and prints the number of records polled. The code can be found here: 
> https://gist.github.com/yiquanzhou/a94569a2c4ec8992444c83f3c393f596
> 3. use bin/kafka-verifiable-producer.sh to generate some messages: 
> $ bin/kafka-verifiable-producer.sh --topic connect-test --max-messages 20 
> --broker-list localhost:9092
> wait until all 200k messages are generated and sent to the server. 
> 4. Run the consumer Java code. In the output console of the consumer, we can 
> see that the consumer starts to poll some records, then it polls 0 records 
> for several seconds before polling some more. like this:
> polled 27160 records
> polled 0 records
> polled 0 records
> polled 0 records
> polled 0 records
> polled 0 records
> polled 26886 records
> polled 26886 records
> polled 0 records
> polled 0 records
> polled 0 records
> polled 0 records
> polled 0 records
> polled 26701 records
> polled 26214 records
> The bug slows down the consumption of messages a lot. And in our use case, 
> the consumer wrongly assumes that all messages are read from the topic.
> It is only reproducible on Mac OS X but neither on Linux nor Windows.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2091: MINOR: Bug fixed

2016-11-02 Thread himani1
GitHub user himani1 opened a pull request:

https://github.com/apache/kafka/pull/2091

MINOR: Bug fixed



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/himani1/kafka minor_fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2091.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2091


commit 1b7ecc5b046e01b3ba7f2b1b9688bd9ddeeea8cb
Author: himani1 <1himani.ar...@gmail.com>
Date:   2016-11-02T10:14:01Z

minor bug fixed with %s




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4355) StreamThread intermittently dies with "Topic not found during partition assignment" when broker restarted

2016-11-02 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15628442#comment-15628442
 ] 

Michal Borowiecki commented on KAFKA-4355:
--

KAFKA-4366 created for the KafkaStreams.close() hanging issue.

> StreamThread intermittently dies with "Topic not found during partition 
> assignment" when broker restarted
> -
>
> Key: KAFKA-4355
> URL: https://issues.apache.org/jira/browse/KAFKA-4355
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0, 0.10.0.0
> Environment: kafka 0.10.0.0
> kafka 0.10.1.0
> uname -a
> Linux lp02485 4.4.0-34-generic #53~14.04.1-Ubuntu SMP Wed Jul 27 16:56:40 UTC 
> 2016 x86_64 x86_64 x86_64 GNU/Linux
> java -version
> java version "1.8.0_92"
> Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)
>Reporter: Michal Borowiecki
>Assignee: Guozhang Wang
>
> When (a) starting kafka streams app before the broker or
> (b) restarting the broker while the streams app is running:
> the stream thread intermittently dies with "Topic not found during partition 
> assignment" StreamsException.
> This happens about between one in 5 or one in 10 times.
> Stack trace:
> {noformat}
> Exception in thread "StreamThread-2" 
> org.apache.kafka.streams.errors.StreamsException: Topic not found during 
> partition assignment: scheduler
>   at 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper.maxNumPartitions(DefaultPartitionGrouper.java:81)
>   at 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper.partitionGroups(DefaultPartitionGrouper.java:55)
>   at 
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:370)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:313)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:467)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:88)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:419)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:395)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:308)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> {noformat}
> Our app has 2 streams in it, consuming from 2 different topics.
> Sometimes the exception happens on both stream threads. Sometimes only on one 
> of the stream threads.
> The exception is preceded by:
> {noformat}
> [2016-10-28 16:17:55,239] INFO [StreamThread-2] (Re-)joining group 
> pool-scheduler 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2016-10-28 16:17:55,240] INFO 

[jira] [Created] (KAFKA-4366) KafkaStreams.close() blocks indefinitely

2016-11-02 Thread Michal Borowiecki (JIRA)
Michal Borowiecki created KAFKA-4366:


 Summary: KafkaStreams.close() blocks indefinitely
 Key: KAFKA-4366
 URL: https://issues.apache.org/jira/browse/KAFKA-4366
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.0.1, 0.10.1.0
Reporter: Michal Borowiecki
Assignee: Guozhang Wang


KafkaStreams.close() method calls join on all its threads without a timeout, 
meaning indefinitely, which makes it prone to deadlocks and unfit to be used in 
shutdown hooks.

(KafkaStreams::close is used in numerous examples by confluent: 
https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/kafka-streams/src/main/java/io/confluent/examples/streams
 and 
https://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple/
 so we assumed it to be recommended practice)

A deadlock happens, for instance, if System.exit() is called from within the 
uncaughtExceptionHandler. (We need to call System.exit() from the 
uncaughtExceptionHandler because KAFKA-4355 issue shuts down the StreamThread 
and to recover we want the process to exit, as our infrastructure will then 
start it up again.)

The System.exit call (from the uncaughtExceptionHandler, which runs in the 
StreamThread) will execute the shutdown hook in a new thread and wait for that 
thread to join. If the shutdown hook calls KafkaStreams.close, it will in turn 
block waiting for the StreamThread to join, hence the deadlock.

Runtime.addShutdownHook javadocs state:
{quote}
Shutdown hooks run at a delicate time in the life cycle of a virtual machine 
and should therefore be coded defensively. They should, in particular, be 
written to be thread-safe and to avoid deadlocks insofar as possible
{quote}
and
{quote}
Shutdown hooks should also finish their work quickly.
{quote}
Therefore the current implementation of KafkaStreams.close() which waits 
forever for threads to join is completely unsuitable for use in a shutdown 
hook. 
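
A minimal sketch of the deadlock pattern described above, against the 0.10.x 
Streams API; the application id, topics and exit code are illustrative only:

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class ShutdownHookDeadlockSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            KStreamBuilder builder = new KStreamBuilder();
            builder.stream("input-topic").to("output-topic");

            final KafkaStreams streams = new KafkaStreams(builder, props);

            // Shutdown hook, as in the Confluent examples: close() joins the
            // StreamThreads without a timeout.
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                public void run() {
                    streams.close();
                }
            }));

            // Handler running *inside* a StreamThread. System.exit() runs the
            // shutdown hooks and waits for them; the hook waits for this
            // StreamThread to join, hence the deadlock.
            streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                public void uncaughtException(Thread t, Throwable e) {
                    System.exit(1);
                }
            });

            streams.start();
        }
    }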





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4365) In case async producer closes the TCP connection to Kafka broker, last sent messages might be lost.

2016-11-02 Thread Ciprian Pascu (JIRA)
Ciprian Pascu created KAFKA-4365:


 Summary: In case async producer closes the TCP connection to Kafka 
broker, last sent messages might be lost.
 Key: KAFKA-4365
 URL: https://issues.apache.org/jira/browse/KAFKA-4365
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.10.0.1
Reporter: Ciprian Pascu


I am using the kafka-python producer (https://github.com/dpkp/kafka-python). The 
producer is set as async (acks=0) and sends a burst of, for example, 1000 
messages. As the consumer I use either Logstash or the Kafka console consumer. 
Quite often the consumer receives fewer than 1000 messages. 
Also, by checking the messages written by the brokers on the disk, it can be 
seen that not all messages are written. Still, by using tcpdump and Wireshark, 
I can see that all messages have reached the brokers. Also, by adding some test 
logs in Kafka code, I could see that the messages are added to the staged 
receives, but not to the completed receives 
(org.apache.kafka.common.network.Selector class). I believe that happens 
because of the 'isMute' method in the classes implementing 
org.apache.kafka.common.network.TransportLayer: they both seem to also check 
that the 'key' is valid, which no longer holds true once the TCP 
connection has been closed; despite that, Kafka already has those messages as 
staged receives, so it could add them to the log; besides, since acks=0, no 
responses need to be sent. 
This issue is not visible if acks=1 (synchronous producer), or if the producer 
keeps the TCP connections to the brokers up all the time, or up long enough for 
Kafka to actually write the logs to disk.
Proposed solution: remove the 'key.isValid()' check from the 'isMute' method in 
the SslTransportLayer and PlaintextTransportLayer classes 
(org.apache.kafka.common.network package).
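
For context, the fire-and-forget setup described above looks roughly like this 
with the Java producer; note the original report used kafka-python, the topic and 
servers below are placeholders, and the Java client's teardown behaviour is not 
necessarily identical to the python client's:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class FireAndForgetBurst {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            // acks=0: the producer does not wait for any broker acknowledgement.
            props.put("acks", "0");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 1000; i++) {
                producer.send(new ProducerRecord<String, String>("burst-topic", "message-" + i));
            }
            // close() flushes buffered records and then closes the connections.
            producer.close();
        }
    }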



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Kafka Connect key.converter and value.converter properties for Avro encoding

2016-11-02 Thread david.franklin
I am using Kafka Connect in source mode i.e. using it to send events to Kafka 
topics.

With the key.converter and value.converter properties set to 
org.apache.kafka.connect.storage.StringConverter I can attach a consumer to the 
topics and see the events in a readable form.  This is helpful and reassuring 
but it is not the desired representation for my downstream consumers - these 
require the events to be Avro encoded.

It seems that to write the events to Kafka Avro encoded, these properties need 
to be set to io.confluent.kafka.serializers.KafkaAvroSerializer.  Is this 
correct?

I am not using the Confluent platform, merely the standard Kafka 0.10 download, 
and have been unable to find out how to get at these from a Maven repository 
jar.  http://docs.confluent.io/3.0.0/app-development.html#java suggests that 
these are available via:

   
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>3.0.0</version>
</dependency>

But it doesn't appear to be true.  The class exists in 
https://raw.githubusercontent.com/confluentinc/schema-registry/master/avro-converter/src/main/java/io/confluent/connect/avro/AvroConverter.java
 but this seems to use the Schema Registry which is something I'd rather avoid.
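
For reference, if the Confluent AvroConverter is used, the Connect worker 
configuration typically looks like the sketch below; note that this converter 
does require a running Schema Registry, and the URL here is only a placeholder:

    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://localhost:8081
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081

Setting key.converter/value.converter to a Serializer class such as 
KafkaAvroSerializer is not expected to work, since Connect requires a Converter 
implementation for these properties.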

I'd be grateful for any pointers on the simplest way of getting Avro encoded 
events written to Kafka from a Kafka Connect source connector/task.

Also in the task which creates SourceRecords, I'm choosing Schema.BYTES_SCHEMA 
for the 4th arg in the constructor.  But I'm not clear what this achieves - 
some light shed on that would also be helpful.

Many thanks,
David


Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-02 Thread Michael Pearce
Thanks James for taking the time out.

My comments on each solution you described are below. (I note you didn’t comment 
on the 3rd at all, which is the current proposal in the KIP.)
1)
a. This forces all clients to have distinct knowledge of platform-level 
implementation detail.
b. It enforces a single serialization technology for all app payloads and platform 
headers.
i. What if apps need different serialization, e.g. an app team needs to use XML 
for legacy-system reasons but we force Avro at the platform level because of our 
headers?
c. If we were to have a common Kafka solution, this would force everyone onto a 
single serialization technology, and I think that is something we don’t want to do.
d. This doesn’t deal well with large payloads; since you mention HTTP in the 
second solution, think of MIME multipart.
e. End-to-end encryption: if apps need end-to-end encryption then platform tooling 
cannot read the header information without decoding the message, which defeats 
the reasons for having e2e encryption.
2)
a. A container is the solution we currently use (we don’t use MIME, but it looks 
like a reasonable choice if you don’t care about size, or your payloads are big 
enough that the overhead is small).
i. I think if we don’t go with adding the headers to the message and offset, 
having a commonly agreed container format is the next best option.
b. The TiVo-specific HTTP MIME type message is indeed a good solution in our view:
i. Deals with separating headers and payload.
ii. Allows multipart messaging.
iii. Allows the payload to be encrypted yet the headers not.
iv. Platform tooling doesn’t care about the payload and can quickly read headers.
v. Well-established and well-known container solution.
c. HTTP MIME type headers (String keys) have a large byte overhead though.
i. See Nacho’s and Radai’s previous points on this.
d. If we agree on, say, MIME as the container format, how does a platform team 
add the headers it needs without forcing all teams to be aware of it? Or is this 
actually ok?
i. Would we make a new consumer and producer Kafka API that is container aware?
e. How would this work with the likes of Kafka Streams, where as a platform team 
we want to add some metadata to every message but don’t want to recode these 
frameworks?




On 10/29/16, 8:09 AM, "James Cheng"  wrote:

Let me talk about the container format that we are using here at TiVo to 
add headers to our Kafka messages.

Just some quick terminology, so that I don't confuse everyone.
I'm going to use "message body" to refer to the thing returned by 
ConsumerRecord.value()
And I'm going to use "payload" to refer to your data after it has been 
serialized into bytes.

To recap, during the KIP call, we talked about 3 ways to have headers in 
Kafka messages:
1) The message body is your payload, which has headers within it.
2) The message body is a container, which has headers in it as well your 
payload.
3) Extend Kafka to hold headers outside of the message body. The message 
body holds your payload.

1) The message body is your payload, which has headers in it
---
Here's an example of what this may look like, if it were rendered in JSON:

{
"headers" : {
"Host" : "host.domain.com",
"Service" : "PaymentProcessor",
"Timestamp" : "2016-10-28 12:45:56"
},
"Field1" : "value",
"Field2" : "value"
}

In this scenario, headers are really not anything special. They are a part 
of your payload. They may have been auto-included by some mechanism in all of 
your schemas, but they really just are part of your payload. I believe LinkedIn 
uses this mechanism. The "headers" field is a reserved word in all schemas, and 
is somehow auto-inserted into all schemas. The headers schema contains a couple 
fields like "host" and "service" and "timestamp". If LinkedIn decides that a 
new field needs to be added for company-wide infrastructure purposes, then they 
will add it to the schema of "headers", and because "headers" is included 
everywhere, then all schemas will get updated as well.

Because they are simply part of your payload, you need to deserialize your 
payload in order to read the headers.

3) Extend Kafka to hold headers outside of the message body. The message 
body holds your payload.
-
This is what this KIP is discussing. I will let others talk about this.

2) The message body is a container, which has headers in it, as well as 
your payload.
--
At TiVo, we have standardized on a container format that looks very similar 
to HTTP. Let me jump straight to an example:

- example below 
JS/1 123 1024
Host: host.domain.com
Service: SomethingProcessor
Timestamp: 2016-10-28 12:45:56
ObjectTypeInPayload: MyObjectV1

{
"Field1" : "value",
"Field2" : 

Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag

2016-11-02 Thread Michael Pearce
Hi Joel , et al.

Any comments on the below idea to handle roll out / compatibility of this 
feature, using a configuration? 

Does it make sense/clear?
Does it add value?
Do we want to enforce flag by default, or value by default, or both?

Cheers
Mike


On 10/27/16, 4:47 PM, "Michael Pearce"  wrote:

Thanks, James, I think this is a really good addition to the KIP details. Please 
feel free to amend the wiki and add the use cases, plus any others you think of. I 
definitely think it’s worthwhile documenting. If you can’t, let me know and I’ll 
add them next week (I’m just leaving for a long weekend off).

Re Joel and others comments about upgrade and compatibility.

Rather than trying to auto manage this.

Actually, maybe we make a configuration option, both at server and per-topic 
level, to control how the server logic should work out whether a record is a 
tombstone record.

e.g.

key = compaction.tombstone.marker

value options:

value   (continues to use null value as tombstone marker)
flag (expects to use the tombstone flag)
value_or_flag (if either is true it treats the record as a tombstone)

This way, on upgrade, users can keep the current behavior and slowly migrate to 
the new one: a transition period using value_or_flag, and finally flag only, once 
an organization wishes to use null values without them being treated as a 
tombstone marker (use case noted below).

Having it both global broker level and topic override also allows some 
flexibility here.
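
Purely as illustration, if this proposal were adopted, the broker-wide default 
could live in server.properties and a per-topic override might look like the 
command below; the config key is only the one proposed above and does not exist 
in any released Kafka version:

    bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics \
      --entity-name my-compacted-topic --alter \
      --add-config compaction.tombstone.marker=value_or_flag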

Cheers
Mike






On 10/27/16, 8:03 AM, "James Cheng"  wrote:

This KIP would definitely address a gap in the current functionality, 
where you currently can't have a tombstone with any associated content.

That said, I'd like to talk about use cases, to make sure that this is 
in fact useful. The KIP should be updated with whatever use cases we come up 
with.

First of all, an observation: When we speak about log compaction, we 
typically think of "the latest message for a key is retained". In that respect, 
a delete tombstone (i.e. a message with a null payload) is treated the same as 
any other Kafka message: the latest message is retained. It doesn't matter 
whether the latest message is null, or if the latest message has actual 
content. In all cases, the last message is retained.

The only way a delete tombstone is treated differently from other Kafka 
messages is that it automatically disappears after a while. The time of 
deletion is specified using delete.retention.ms.
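
For illustration, a compacted topic with an explicit tombstone retention could be 
created like this (topic name and values are arbitrary):

    bin/kafka-topics.sh --create --zookeeper localhost:2181 \
      --replication-factor 1 --partitions 1 --topic compacted-example \
      --config cleanup.policy=compact --config delete.retention.ms=86400000

With delete.retention.ms=86400000, delete tombstones stick around for roughly a 
day after compaction before disappearing.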

So what we're really talking about is, do we want to support messages 
in a log-compacted topic that auto-delete themselves after a while?

In a thread from 2015, there was a discussion on first-class support of 
headers between Roger Hoover, Felix GV, Jun Rao, and I. See thread at 
https://groups.google.com/d/msg/confluent-platform/8xPbjyUE_7E/yQ1AeCufL_gJ.
In that thread, Jun raised a good question that I didn't have a good answer for 
at the time: If a message is going to auto-delete itself after a while, how 
important was the message? That is, what information did the message contain 
that was important *for a while* but not so important that it needed to be kept 
around forever?

Some use cases that I can think of:

1) Traceability. I would like to know who issued this delete tombstone. 
It might include the hostname or IP of the producer of the delete.
2) Timestamps. I would like to know when this delete was issued. This 
use case is already addressed by the availability of per-message timestamps 
that came in 0.10.0
3) Data provenance. I hope I'm using this phrase correctly, but what I 
mean is, where did this delete come from? What processing job emitted it? What 
input to the processing job caused this delete to be produced? For example, if 
a record in topic A was processed and caused a delete tombstone to be emitted 
to topic B, I might like the offset of the topic A message to be attached to 
the topic B message.
4) Distributed tracing for stream topologies. This might be a slight 
repeat of the above use cases. In the microservices world, we can generate 
call-graphs of webservices using tools like Zipkin/opentracing.io, or something 
homegrown like 
https://engineering.linkedin.com/distributed-service-call-graph/real-time-distributed-tracing-website-performance-and-efficiency.
 I can imagine that you might want to do something similar for stream 
processing topologies, where stream processing jobs carry along and forward 
along a globally 

Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag

2016-11-02 Thread Michael Pearce
Hi James,

We would currently send our message headers wrapper so that we have some 
platform/audit data available even on the delete. We find that having a custom 
wrapper means some of our internal platform pieces currently cannot be open 
sourced or shared, though we would love to (see KIP-82).

This data is still needed on a delete from our message headers wrapper for our 
tooling and platform needs:

General:

Transaction GUIDs used by our APM tooling to track and trace a transaction 
through multiple systems.

ClusterId (we have this custom set on old brokers and look to transition to the 
new clusterId available in 0.10.1.0). This is used by our inter-datacenter 
replication tool that keeps our clusters in multiple DCs and regions in sync 
where needed:
- we can do this because we expect/design our system so that a single key for a 
data piece is transactioned in only one DC (unless there is a DC failure), but 
different keys can be transactioned in other DCs. DCs still need a full global 
view, and from the app perspective apps are agnostic to their DC (the platform 
handles this).
- this system is always on and we alert on any consumer lag; as such we can 
expect to get every event (even with compaction).
- this system reconciles on restart if we find consumer lag > x predetermined 
time value (we set this to sub-minute).
   
We also have the following setup for some flows:

App -> compacted topic -> replicator -> event topic

Apps use Kafka compacted topics as k,v stores instead of the likes of 
Postgres. Just as per the details in Bottled Water or any CDC solution, you 
want to capture every change (including deletes). There is some business 
metadata even on a DB delete that you want to capture (the traditional who, 
what, where and when set of data); that data needs to be captured when deleting, 
and likewise we still need to replicate it to our event topics for storage, 
audit and replay needs.

Cheers
Mike

On 10/28/16, 8:14 AM, "James Cheng"  wrote:

Michael,

What information would you want to include on a delete tombstone, and what 
would you use it for?

-James

Sent from my iPhone

> On Oct 27, 2016, at 9:17 PM, Michael Pearce  wrote:
> 
> Hi Jay,
> 
> I think the use case is the one that Konstantin mentioned in the 
KIP-82 thread, and which we also have at IG; it is a clear use case.
> 
> Many companies are using message wrappers; these are useful because, as 
per the KIP-82 use cases (I don't think I need to reiterate the large 
list here), many of these need the headers even on a null value.
> 
> The issue this then causes is that you cannot send these messages 
onto a compacted topic and ever have a delete/tombstone. So companies are 
doing things like a double send: one message with the envelope so it gets 
transported, followed by an empty message. Or having a separate process 
looking for the empty envelope and pushing back an empty-value record to make 
the broker tombstone it off. As mentioned in the KIP-82 thread, these cause 
nasty race conditions and production issues.
> 
> LinkedIn were also very clear that if you currently use compaction there, 
you cannot use their managed Kafka services that rely on their headers 
implementation. This was also flagged in the KIP-82 discussion.
> 
> For Streams it would be fairly easy to keep its current behaviour by adding 
logic to also set the delete marker when sending a null value. The same would 
apply to any framework built on Kafka that wants to keep the same logic; Spark 
and Samza come to mind.
> 
> Likewise, as noted, we could make this configurable globally and at topic 
level, as per this thread where we are discussing the section about 
compatibility and rollout.
> 
> 
> 
> Rgds
> Mike
> 
> From: Jay Kreps 
> Sent: Thursday, October 27, 2016 10:54:45 PM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag
> 
> I kind of agree with James that it is a bit questionable how valuable any
> data in a delete marker can be since it will be deleted somewhat
> nondeterministically.
> 
> Let's definitely ensure the change is worth the resulting pain and
> additional complexity in the data model.
> 
> I think the two things we maybe conflated in the original compaction work
> was the semantics of the message and its retention policy (I'm not sure,
> but maybe).
> 
> In some sense a normal Kafka topic is a stream of pure appends (inserts). A
> compacted topic is a series of revisions to the keyed entity--updates or
> deletes.
> 
> Currently the semantics of the messages are in the eye of the 
beholder--you
> can choose to interpret a stream as either being appends or revisions as
> you choose. This proposal is changing that so 

[jira] [Commented] (KAFKA-3986) completedReceives can contain closed channels

2016-11-02 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15627907#comment-15627907
 ] 

Jason Gustafson commented on KAFKA-3986:


I think this is handled in the patch for KAFKA-3703: 
https://github.com/apache/kafka/pull/1836/.

> completedReceives can contain closed channels 
> --
>
> Key: KAFKA-3986
> URL: https://issues.apache.org/jira/browse/KAFKA-3986
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Reporter: Ryan P
> Fix For: 0.10.0.2, 0.10.1.1
>
>
> I'm not entirely sure why at this point, but it is possible to throw a 
> NullPointerException when processing completedReceives. This happens when a 
> fairly substantial number of connections are simultaneously initiated 
> with the server. 
> The processor thread does carry on, but it may be worth investigating how the 
> channel could be both closed and in completedReceives. 
> The NPE in question is thrown here:
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/SocketServer.scala#L490
> It can not be consistently reproduced either. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4348) On Mac OS, KafkaConsumer.poll returns 0 when there are still messages on Kafka server

2016-11-02 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15627866#comment-15627866
 ] 

Jason Gustafson commented on KAFKA-4348:


This sounds like the same issue as KAFKA-3135. We never really got to the 
bottom of it, but I think a workaround was increasing the size of the receive 
buffer (receive.buffer.bytes). Maybe we can close this issue and continue the 
discussion there?

> On Mac OS, KafkaConsumer.poll returns 0 when there are still messages on 
> Kafka server
> -
>
> Key: KAFKA-4348
> URL: https://issues.apache.org/jira/browse/KAFKA-4348
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.1
> Environment: Mac OS X El Capitan, Java 1.8.0_111
>Reporter: Yiquan Zhou
>  Labels: consumer, mac, polling
>
> Steps to reproduce:
> 1. start the zookeeper and kafka server using the default properties from the 
> distribution: 
> $ bin/zookeeper-server-start.sh config/zookeeper.properties
> $ bin/kafka-server-start.sh config/server.properties 
> 2. create a Kafka consumer using the Java API KafkaConsumer.poll(long 
> timeout). It polls the records from the server every second (timeout set to 
> 1000) and prints the number of records polled. The code can be found here: 
> https://gist.github.com/yiquanzhou/a94569a2c4ec8992444c83f3c393f596
> 3. use bin/kafka-verifiable-producer.sh to generate some messages: 
> $ bin/kafka-verifiable-producer.sh --topic connect-test --max-messages 20 
> --broker-list localhost:9092
> wait until all 200k messages are generated and sent to the server. 
> 4. Run the consumer Java code. In the output console of the consumer, we can 
> see that the consumer starts to poll some records, then it polls 0 records 
> for several seconds before polling some more. like this:
> polled 27160 records
> polled 0 records
> polled 0 records
> polled 0 records
> polled 0 records
> polled 0 records
> polled 26886 records
> polled 26886 records
> polled 0 records
> polled 0 records
> polled 0 records
> polled 0 records
> polled 0 records
> polled 26701 records
> polled 26214 records
> The bug slows down the consumption of messages a lot. And in our use case, 
> the consumer wrongly assumes that all messages are read from the topic.
> It is only reproducible on Mac OS X but neither on Linux nor Windows.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)