Thanks, thats defenetly my problem. On Tue, Jul 5, 2016 at 5:35 PM, Ismael Juma <[email protected]> wrote:
> Sorry, I meant (the edit comment button is too near the permanent link > button): > > > https://issues.apache.org/jira/browse/KAFKA-3358?focusedCommentId=15239013&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15239013 > > Ismael > > On Tue, Jul 5, 2016 at 12:20 PM, Igor Kuzmenko <[email protected]> wrote: > > > Thanks for reply. Can you provide issue id. Link above doesn't open. > > > > On Tue, Jul 5, 2016 at 2:03 PM, Ismael Juma <[email protected]> wrote: > > > > > This looks like the following which was fixed in 0.10.0.0: > > > > > > > > > > > > https://issues.apache.org/jira/secure/EditComment!default.jspa?id=12948347&commentId=15239013 > > > > > > Ismael > > > > > > On Tue, Jul 5, 2016 at 11:51 AM, Igor Kuzmenko <[email protected]> > > wrote: > > > > > > > Hello I'm using kafka 0.9.0 and sending messages to kafka topic via > > > > KafkaProducer. > > > > My application constantly waits for a new file in directory, reads > it, > > > than > > > > send every line of a file as a message to kafka. > > > > > > > > KafkaProducer creates at start of application and in logs I can see > > that > > > > every 5 min it request cluster metadata. But after a few minutes > > metadata > > > > requests starting to send too often. Here's log: > > > > > > > > 13:06:53.094 DEBUG o.a.kafka.clients.NetworkClient - Sending metadata > > > > request ClientRequest(expectResponse=true, callback=null, > > > > > > > > > > > > > > request=RequestSend(header={api_key=3,api_version=0,correlation_id=689920,client_id=producer-1}, > > > > body={topics=[mobile_connections]})) to node 1001 > > > > 13:06:53.095 DEBUG org.apache.kafka.clients.Metadata - Updated > cluster > > > > metadata version 686315 to Cluster(nodes = [Node(1001, jupiter.bss, > > > 6667)], > > > > partitions = [Partition(topic = mobile_connections, partition = 4, > > > leader = > > > > 1001, replicas = [1001,], isr = [1001,], Partition(topic = > > > > mobile_connections, partition = 3, leader = 1001, replicas = [1001,], > > > isr = > > > > [1001,], Partition(topic = mobile_connections, partition = 2, leader > = > > > > 1001, replicas = [1001,], isr = [1001,], Partition(topic = > > > > mobile_connections, partition = 1, leader = 1001, replicas = [1001,], > > > isr = > > > > [1001,], Partition(topic = mobile_connections, partition = 0, leader > = > > > > 1001, replicas = [1001,], isr = [1001,]]) > > > > 13:06:53.194 DEBUG o.a.kafka.clients.NetworkClient - Sending metadata > > > > request ClientRequest(expectResponse=true, callback=null, > > > > > > > > > > > > > > request=RequestSend(header={api_key=3,api_version=0,correlation_id=689921,client_id=producer-1}, > > > > body={topics=[mobile_connections]})) to node 1001 > > > > 13:06:53.196 DEBUG org.apache.kafka.clients.Metadata - Updated > cluster > > > > metadata version 686316 to Cluster(nodes = [Node(1001, jupiter.bss, > > > 6667)], > > > > partitions = [Partition(topic = mobile_connections, partition = 4, > > > leader = > > > > 1001, replicas = [1001,], isr = [1001,], Partition(topic = > > > > mobile_connections, partition = 3, leader = 1001, replicas = [1001,], > > > isr = > > > > [1001,], Partition(topic = mobile_connections, partition = 2, leader > = > > > > 1001, replicas = [1001,], isr = [1001,], Partition(topic = > > > > mobile_connections, partition = 1, leader = 1001, replicas = [1001,], > > > isr = > > > > [1001,], Partition(topic = mobile_connections, partition = 0, leader > = > > > > 1001, replicas = [1001,], isr = [1001,]]) > > > > 13:06:53.294 DEBUG o.a.kafka.clients.NetworkClient - Sending metadata > > > > request ClientRequest(expectResponse=true, callback=null, > > > > > > > > > > > > > > request=RequestSend(header={api_key=3,api_version=0,correlation_id=689922,client_id=producer-1}, > > > > body={topics=[mobile_connections]})) to node 1001 > > > > 13:06:53.295 DEBUG org.apache.kafka.clients.Metadata - Updated > cluster > > > > metadata version 686317 to Cluster(nodes = [Node(1001, jupiter.bss, > > > 6667)], > > > > partitions = [Partition(topic = mobile_connections, partition = 4, > > > leader = > > > > 1001, replicas = [1001,], isr = [1001,], Partition(topic = > > > > mobile_connections, partition = 3, leader = 1001, replicas = [1001,], > > > isr = > > > > [1001,], Partition(topic = mobile_connections, partition = 2, leader > = > > > > 1001, replicas = [1001,], isr = [1001,], Partition(topic = > > > > mobile_connections, partition = 1, leader = 1001, replicas = [1001,], > > > isr = > > > > [1001,], Partition(topic = mobile_connections, partition = 0, leader > = > > > > 1001, replicas = [1001,], isr = [1001,]]) > > > > 13:06:53.394 DEBUG o.a.kafka.clients.NetworkClient - Sending metadata > > > > request ClientRequest(expectResponse=true, callback=null, > > > > > > > > > > > > > > request=RequestSend(header={api_key=3,api_version=0,correlation_id=689923,client_id=producer-1}, > > > > body={topics=[mobile_connections]})) to node 1001 > > > > 13:06:53.395 DEBUG org.apache.kafka.clients.Metadata - Updated > cluster > > > > metadata version 686318 to Cluster(nodes = [Node(1001, jupiter.bss, > > > 6667)], > > > > partitions = [Partition(topic = mobile_connections, partition = 4, > > > leader = > > > > 1001, replicas = [1001,], isr = [1001,], Partition(topic = > > > > mobile_connections, partition = 3, leader = 1001, replicas = [1001,], > > > isr = > > > > [1001,], Partition(topic = mobile_connections, partition = 2, leader > = > > > > 1001, replicas = [1001,], isr = [1001,], Partition(topic = > > > > mobile_connections, partition = 1, leader = 1001, replicas = [1001,], > > > isr = > > > > [1001,], Partition(topic = mobile_connections, partition = 0, leader > = > > > > 1001, replicas = [1001,], isr = [1001,]]) > > > > 13:06:53.494 DEBUG o.a.kafka.clients.NetworkClient - Sending metadata > > > > request ClientRequest(expectResponse=true, callback=null, > > > > > > > > > > > > > > request=RequestSend(header={api_key=3,api_version=0,correlation_id=689924,client_id=producer-1}, > > > > body={topics=[mobile_connections]})) to node 1001 > > > > 13:06:53.495 DEBUG org.apache.kafka.clients.Metadata - Updated > cluster > > > > metadata version 686319 to Cluster(nodes = [Node(1001, jupiter.bss, > > > 6667)], > > > > partitions = [Partition(topic = mobile_connections, partition = 4, > > > leader = > > > > 1001, replicas = [1001,], isr = [1001,], Partition(topic = > > > > mobile_connections, partition = 3, leader = 1001, replicas = [1001,], > > > isr = > > > > [1001,], Partition(topic = mobile_connections, partition = 2, leader > = > > > > 1001, replicas = [1001,], isr = [1001,], Partition(topic = > > > > mobile_connections, partition = 1, leader = 1001, replicas = [1001,], > > > isr = > > > > [1001,], Partition(topic = mobile_connections, partition = 0, leader > = > > > > 1001, replicas = [1001,], isr = [1001,]]) > > > > 13:06:53.594 DEBUG o.a.kafka.clients.NetworkClient - Sending metadata > > > > request ClientRequest(expectResponse=true, callback=null, > > > > > > > > > > > > > > request=RequestSend(header={api_key=3,api_version=0,correlation_id=689925,client_id=producer-1}, > > > > body={topics=[mobile_connections]})) to node 1001 > > > > 13:06:53.595 DEBUG org.apache.kafka.clients.Metadata - Updated > cluster > > > > metadata version 686320 to Cluster(nodes = [Node(1001, jupiter.bss, > > > 6667)], > > > > partitions = [Partition(topic = mobile_connections, partition = 4, > > > leader = > > > > 1001, replicas = [1001,], isr = [1001,], Partition(topic = > > > > mobile_connections, partition = 3, leader = 1001, replicas = [1001,], > > > isr = > > > > [1001,], Partition(topic = mobile_connections, partition = 2, leader > = > > > > 1001, replicas = [1001,], isr = [1001,], Partition(topic = > > > > mobile_connections, partition = 1, leader = 1001, replicas = [1001,], > > > isr = > > > > [1001,], Partition(topic = mobile_connections, partition = 0, leader > = > > > > 1001, replicas = [1001,], isr = [1001,]]) > > > > 13:06:53.694 DEBUG o.a.kafka.clients.NetworkClient - Sending metadata > > > > request ClientRequest(expectResponse=true, callback=null, > > > > > > > > > > > > > > request=RequestSend(header={api_key=3,api_version=0,correlation_id=689926,client_id=producer-1}, > > > > body={topics=[mobile_connections]})) to node 1001 > > > > > > > > It doesn't seem right to refresh meta data so often. > > > > Maybe problem is that this cluster info is not full. In logs there's > > > > information about one topic only, but cluster has three topics. > > > > > > > > My questions are: > > > > Am I using KafkaProducer correctly? Maybe I should close it and > create > > > new > > > > one on each file? > > > > Why metadata refresh happens so often? > > > > > > > > > >
