Re: Kafka connector throughput reduction upon Avro schema change
Bumping this. Has anyone here observed this in their Kafka Connect deployments?

Thanks,
Dave

On 5/26/17, 1:44 PM, "Dave Hamilton" <dhamil...@nanigans.com> wrote:

We are currently using the Kafka S3 connector to ship Avro data to S3. We made a change to one of our Avro schemas and have noticed consumer throughput on the Kafka connector drop considerably. I am wondering if there is anything we can do to avoid such issues when we update schemas in the future?

This is what I believe is happening:

· The Avro producer application is running on 12 instances. They are restarted in a rolling fashion, switching from producing schema version 1 before the restart to schema version 2 afterward.
· While the rolling restart is occurring, data on schema version 1 and schema version 2 is simultaneously being written to the topic.
· The Kafka connector has to close the current Avro file for a partition and ship it whenever it detects a schema change, which is happening several times due to the rolling nature of the schema update deployment and the mixture of message versions being written during this time. This process causes the overall consumer throughput to plummet.

Am I reasoning correctly about what we're observing here? Is there any way to avoid this when we change schemas (short of stopping all instances of the service and bringing them up together on the new schema version)?

Thanks,
Dave
Re: Kafka connector throughput reduction upon Avro schema change
Hi, does anyone have advice on how to deal with this issue? Is it possible that changing a schema compatibility setting could correct it?

Thanks,
Dave

On 5/26/17, 1:44 PM, "Dave Hamilton" <dhamil...@nanigans.com> wrote:

We are currently using the Kafka S3 connector to ship Avro data to S3. We made a change to one of our Avro schemas and have noticed consumer throughput on the Kafka connector drop considerably. I am wondering if there is anything we can do to avoid such issues when we update schemas in the future?

This is what I believe is happening:

· The Avro producer application is running on 12 instances. They are restarted in a rolling fashion, switching from producing schema version 1 before the restart to schema version 2 afterward.
· While the rolling restart is occurring, data on schema version 1 and schema version 2 is simultaneously being written to the topic.
· The Kafka connector has to close the current Avro file for a partition and ship it whenever it detects a schema change, which is happening several times due to the rolling nature of the schema update deployment and the mixture of message versions being written during this time. This process causes the overall consumer throughput to plummet.

Am I reasoning correctly about what we're observing here? Is there any way to avoid this when we change schemas (short of stopping all instances of the service and bringing them up together on the new schema version)?

Thanks,
Dave
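For anyone finding this thread later: the setting that governs this behavior in the Confluent storage sinks is schema.compatibility, which defaults to NONE (rotate the file on any schema change). A hedged sketch of the relevant sink properties, assuming the S3 connector exposes the common storage option; the topic name and flush size here are illustrative, not the poster's configuration:

    name=s3-sink
    connector.class=io.confluent.connect.s3.S3SinkConnector
    topics=events
    flush.size=10000
    # NONE (the default): any schema version flip closes and uploads the
    # current file, which is what multiplies commits during a rolling
    # producer restart.
    # BACKWARD: records carrying an older schema are projected to the
    # latest schema seen, so mixed v1/v2 batches can stay in one file.
    schema.compatibility=BACKWARD

This only helps if the registered schema versions really are backward compatible with each other.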
Kafka connector throughput reduction upon Avro schema change
We are currently using the Kafka S3 connector to ship Avro data to S3. We made a change to one of our Avro schemas and have noticed consumer throughput on the Kafka connector drop considerably. I am wondering if there is anything we can do to avoid such issues when we update schemas in the future?

This is what I believe is happening:

· The Avro producer application is running on 12 instances. They are restarted in a rolling fashion, switching from producing schema version 1 before the restart to schema version 2 afterward.
· While the rolling restart is occurring, data on schema version 1 and schema version 2 is simultaneously being written to the topic.
· The Kafka connector has to close the current Avro file for a partition and ship it whenever it detects a schema change, which is happening several times due to the rolling nature of the schema update deployment and the mixture of message versions being written during this time. This process causes the overall consumer throughput to plummet.

Am I reasoning correctly about what we're observing here? Is there any way to avoid this when we change schemas (short of stopping all instances of the service and bringing them up together on the new schema version)?

Thanks,
Dave
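For concreteness, the kind of schema change that compatibility-aware projection can absorb is one where the new version adds a field with a default. A hypothetical v1/v2 pair (record and field names invented for illustration; the poster's actual schemas are not shown):

    // v1
    {"type": "record", "name": "Event", "fields": [
        {"name": "id", "type": "long"}
    ]}

    // v2: the added field carries a default, so v1 records can be read
    // (and projected) using the v2 schema
    {"type": "record", "name": "Event", "fields": [
        {"name": "id", "type": "long"},
        {"name": "source", "type": "string", "default": "unknown"}
    ]}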
Kafka log.message.format.version and consumer client versions
Hi, I have a question about the performance implications of upgrading the Kafka message format, relating to the following from the upgrade documentation:

The message format in 0.10.0 includes a new timestamp field and uses relative offsets for compressed messages. The on disk message format can be configured through log.message.format.version in the server.properties file. The default on-disk message format is 0.10.0. If a consumer client is on a version before 0.10.0.0, it only understands message formats before 0.10.0. In this case, the broker is able to convert messages from the 0.10.0 format to an earlier format before sending the response to the consumer on an older version. However, the broker can't use zero-copy transfer in this case.

Would it be sufficient to upgrade all consumers to the new client version (both the Java and Scala clients) to ensure zero-copy transfer is still used after upgrading the message version? Or do all consumers using the Scala API need to be switched to using the new Java consumer API?

Thanks,
Dave
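For reference, the usual sequence from the upgrade notes, sketched as server.properties fragments (version strings are illustrative; adjust to the actual starting version):

    # Step 1: brokers upgraded, but on-disk format pinned to the old
    # version, so consumers still on pre-0.10 clients read without
    # down-conversion (zero-copy stays in effect):
    inter.broker.protocol.version=0.10.0
    log.message.format.version=0.9.0

    # Step 2: after every consumer is on a 0.10.0.0+ client (old Scala or
    # new Java API), bump the format so timestamps are stored natively:
    # log.message.format.version=0.10.0

My understanding, hedged, is that what matters for avoiding down-conversion is the client's protocol version rather than which consumer API is used, so upgrading the Scala-API consumers to the 0.10 client libraries should suffice without porting them to the new Java consumer.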
Re: Error finding consumer coordinators after restart
Just wanted to close the loop on this. It seems the consumer offset logs might have been corrupted by the system restart. Deleting the topic logs and restarting the Kafka service cleared up the problem.

Thanks,
Dave

On 1/12/17, 2:29 PM, "Dave Hamilton" <dhamil...@nanigans.com> wrote:

Hello, we ran into a memory issue on a Kafka 0.10.0.1 broker we are running that required a system restart. Since bringing Kafka back up it seems the consumers are having issues finding their coordinators. Here are some errors we've seen in our server logs after restarting:

[2017-01-12 19:02:10,178] ERROR [Group Metadata Manager on Broker 0]: Error in loading offsets from [__consumer_offsets,40] (kafka.coordinator.GroupMetadataManager)
java.nio.channels.ClosedChannelException
    at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:99)
    at sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:678)
    at kafka.log.FileMessageSet.searchFor(FileMessageSet.scala:135)
    at kafka.log.LogSegment.translateOffset(LogSegment.scala:106)
    at kafka.log.LogSegment.read(LogSegment.scala:127)
    at kafka.log.Log.read(Log.scala:532)
    at kafka.coordinator.GroupMetadataManager$$anonfun$kafka$coordinator$GroupMetadataManager$$loadGroupsAndOffsets$1$1.apply$mcV$sp(GroupMetadataManager.scala:380)
    at kafka.coordinator.GroupMetadataManager$$anonfun$kafka$coordinator$GroupMetadataManager$$loadGroupsAndOffsets$1$1.apply(GroupMetadataManager.scala:374)
    at kafka.coordinator.GroupMetadataManager$$anonfun$kafka$coordinator$GroupMetadataManager$$loadGroupsAndOffsets$1$1.apply(GroupMetadataManager.scala:374)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:231)
    at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:239)
    at kafka.coordinator.GroupMetadataManager.kafka$coordinator$GroupMetadataManager$$loadGroupsAndOffsets$1(GroupMetadataManager.scala:374)
    at kafka.coordinator.GroupMetadataManager$$anonfun$loadGroupsForPartition$1.apply$mcV$sp(GroupMetadataManager.scala:353)
    at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
    at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:56)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

[2017-01-12 19:03:56,468] ERROR [KafkaApi-0] Error when handling request {topics=[__consumer_offsets]} (kafka.server.KafkaApis)
kafka.admin.AdminOperationException: replication factor: 1 larger than available brokers: 0
    at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:117)
    at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:403)
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$createTopic(KafkaApis.scala:629)
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$createGroupMetadataTopic(KafkaApis.scala:651)
    at kafka.server.KafkaApis$$anonfun$29.apply(KafkaApis.scala:668)
    at kafka.server.KafkaApis$$anonfun$29.apply(KafkaApis.scala:666)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
    at scala.collection.SetLike$class.map(SetLike.scala:92)
    at scala.collection.AbstractSet.map(Set.scala:47)
    at kafka.server.KafkaApis.getTopicMetadata(KafkaApis.scala:666)
    at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:727)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:79)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
    at java.lang.Thread.run(Thread.java:744)
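For anyone searching later, a hedged sketch of the checks that would confirm the coordinator is healthy again after such a restart (0.10.0.x tooling; the group name and addresses are placeholders, not taken from this thread):

    # The offsets topic should describe cleanly, with a leader for every
    # partition:
    bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic __consumer_offsets

    # A group lookup should return assignments instead of
    # NotCoordinatorForGroupException:
    bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 \
      --describe --group my-group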
Error finding consumer coordinators after restart
Hello, we ran into a memory issue on a Kafka 0.10.0.1 broker we are running that required a system restart. Since bringing Kafka back up it seems the consumers are having issues finding their coordinators. Here are some errors we've seen in our server logs after restarting:

[2017-01-12 19:02:10,178] ERROR [Group Metadata Manager on Broker 0]: Error in loading offsets from [__consumer_offsets,40] (kafka.coordinator.GroupMetadataManager)
java.nio.channels.ClosedChannelException
    at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:99)
    at sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:678)
    at kafka.log.FileMessageSet.searchFor(FileMessageSet.scala:135)
    at kafka.log.LogSegment.translateOffset(LogSegment.scala:106)
    at kafka.log.LogSegment.read(LogSegment.scala:127)
    at kafka.log.Log.read(Log.scala:532)
    at kafka.coordinator.GroupMetadataManager$$anonfun$kafka$coordinator$GroupMetadataManager$$loadGroupsAndOffsets$1$1.apply$mcV$sp(GroupMetadataManager.scala:380)
    at kafka.coordinator.GroupMetadataManager$$anonfun$kafka$coordinator$GroupMetadataManager$$loadGroupsAndOffsets$1$1.apply(GroupMetadataManager.scala:374)
    at kafka.coordinator.GroupMetadataManager$$anonfun$kafka$coordinator$GroupMetadataManager$$loadGroupsAndOffsets$1$1.apply(GroupMetadataManager.scala:374)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:231)
    at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:239)
    at kafka.coordinator.GroupMetadataManager.kafka$coordinator$GroupMetadataManager$$loadGroupsAndOffsets$1(GroupMetadataManager.scala:374)
    at kafka.coordinator.GroupMetadataManager$$anonfun$loadGroupsForPartition$1.apply$mcV$sp(GroupMetadataManager.scala:353)
    at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
    at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:56)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

[2017-01-12 19:03:56,468] ERROR [KafkaApi-0] Error when handling request {topics=[__consumer_offsets]} (kafka.server.KafkaApis)
kafka.admin.AdminOperationException: replication factor: 1 larger than available brokers: 0
    at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:117)
    at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:403)
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$createTopic(KafkaApis.scala:629)
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$createGroupMetadataTopic(KafkaApis.scala:651)
    at kafka.server.KafkaApis$$anonfun$29.apply(KafkaApis.scala:668)
    at kafka.server.KafkaApis$$anonfun$29.apply(KafkaApis.scala:666)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
    at scala.collection.SetLike$class.map(SetLike.scala:92)
    at scala.collection.AbstractSet.map(Set.scala:47)
    at kafka.server.KafkaApis.getTopicMetadata(KafkaApis.scala:666)
    at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:727)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:79)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
    at java.lang.Thread.run(Thread.java:744)

Also, running kafka-consumer-groups.sh on a consumer group returns the following:

Error while executing consumer group command This is not the correct coordinator for this group.
org.apache.kafka.common.errors.NotCoordinatorForGroupException: This is not the correct coordinator for this group.

We also see the following logs when trying to restart a Kafka connector:

[2017-01-12 17:44:07,941] INFO Discovered coordinator lxskfkdal501.nanigans.com:9092 (id: 2147483647 rack: null) for
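A side note on the second stack trace: the broker is attempting to auto-create __consumer_offsets before any broker (including itself) has finished re-registering, hence "replication factor: 1 larger than available brokers: 0"; that part is transient. The replication factor it uses is governed by a broker config, sketched here with an illustrative value for a multi-broker cluster (it must not exceed the broker count, so a single-broker setup stays at 1):

    # server.properties (fragment)
    # Replication factor used when the internal offsets topic is
    # auto-created; higher values keep group coordination available
    # when one broker's logs are damaged:
    offsets.topic.replication.factor=3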
Re: Unclaimed partitions
Group                 Topic            Pid  Offset     logSize    Lag  Owner
rtb_targeting_server  compile_request  23   328699002  328699041  39   rtb_targeting_server_lxptedal06.nanigans.com-1430748224863-9f2513a7-1

Thanks for your help,
Dave

On 4/29/15, 11:30 PM, Aditya Auradkar aaurad...@linkedin.com.INVALID wrote:

Hey Dave,

It's hard to say why this is happening without more information. Even if there are no errors in the log, is there anything to indicate that the rebalance process on those hosts even started? Does this happen occasionally or every time you start the consumer group? Can you paste the output of ConsumerOffsetChecker and describe topic?

Thanks,
Aditya

From: Dave Hamilton [dhamil...@nanigans.com]
Sent: Wednesday, April 29, 2015 6:46 PM
To: users@kafka.apache.org
Subject: Re: Unclaimed partitions

Hi, would anyone be able to help me with this issue? Thanks.

- Dave

On Tue, Apr 28, 2015 at 1:32 PM -0700, Dave Hamilton dhamil...@nanigans.com wrote:

1. We're using version 0.8.1.1.
2. No failures in the consumer logs.
3. We're using the ConsumerOffsetChecker to see what partitions are assigned to the consumer group and what their offsets are. 8 of the 12 processes each have been assigned two partitions and they're keeping up with the topic. The other 4 do not get assigned partitions, and no consumers in the group are consuming those 8 partitions.

Thanks for your help,
Dave

On 4/28/15, 1:40 PM, Aditya Auradkar aaurad...@linkedin.com.INVALID wrote:

Couple of questions:
- What version of the consumer API are you using?
- Are you seeing any rebalance failures in the consumer logs?
- How do you determine that some partitions are unassigned? Just confirming that you have partitions that are not being consumed from, as opposed to consumer threads that aren't assigned any partitions.

Aditya

From: Dave Hamilton [dhamil...@nanigans.com]
Sent: Tuesday, April 28, 2015 10:19 AM
To: users@kafka.apache.org
Subject: Re: Unclaimed partitions

I'm sorry, I forgot to specify that these processes are in the same consumer group.

Thanks,
Dave

On 4/28/15, 1:15 PM, Aditya Auradkar aaurad...@linkedin.com.INVALID wrote:

Hi Dave,

The simple consumer doesn't do any state management across consumer instances, so I'm not sure how you are assigning partitions in your application code. Did you mean to say that you are using the high level consumer API?

Thanks,
Aditya

From: Dave Hamilton [dhamil...@nanigans.com]
Sent: Tuesday, April 28, 2015 7:58 AM
To: users@kafka.apache.org
Subject: Unclaimed partitions

Hi, I am trying to consume a 24-partition topic across 12 processes. Each process is using the simple consumer API, and each is being assigned two consumer threads. I have noticed when starting these processes that sometimes some of my processes are not being assigned any partitions, and no rebalance seems to ever be triggered, leaving some of the partitions unclaimed.

When I first tried deploying this yesterday, I noticed 8 of the 24 partitions, for 4 of the consumer processes, went unclaimed. Redeploying shortly later corrected this issue. I tried deploying again today, and now I see a different set of 4 processes not getting assigned partitions.

The processes otherwise appear to be running normally; they are currently running in production and we are working to get the consumers quietly running before enabling them to do any work. I'm not sure if we might be looking at some sort of timing issue.

Does anyone know what might be causing the issues we're observing?

Thanks,
Dave
Re: Unclaimed partitions
Hi, would anyone be able to help me with this issue? Thanks.

- Dave

On Tue, Apr 28, 2015 at 1:32 PM -0700, Dave Hamilton dhamil...@nanigans.com wrote:

1. We're using version 0.8.1.1.
2. No failures in the consumer logs.
3. We're using the ConsumerOffsetChecker to see what partitions are assigned to the consumer group and what their offsets are. 8 of the 12 processes each have been assigned two partitions and they're keeping up with the topic. The other 4 do not get assigned partitions, and no consumers in the group are consuming those 8 partitions.

Thanks for your help,
Dave

On 4/28/15, 1:40 PM, Aditya Auradkar aaurad...@linkedin.com.INVALID wrote:

Couple of questions:
- What version of the consumer API are you using?
- Are you seeing any rebalance failures in the consumer logs?
- How do you determine that some partitions are unassigned? Just confirming that you have partitions that are not being consumed from, as opposed to consumer threads that aren't assigned any partitions.

Aditya

From: Dave Hamilton [dhamil...@nanigans.com]
Sent: Tuesday, April 28, 2015 10:19 AM
To: users@kafka.apache.org
Subject: Re: Unclaimed partitions

I'm sorry, I forgot to specify that these processes are in the same consumer group.

Thanks,
Dave

On 4/28/15, 1:15 PM, Aditya Auradkar aaurad...@linkedin.com.INVALID wrote:

Hi Dave,

The simple consumer doesn't do any state management across consumer instances, so I'm not sure how you are assigning partitions in your application code. Did you mean to say that you are using the high level consumer API?

Thanks,
Aditya

From: Dave Hamilton [dhamil...@nanigans.com]
Sent: Tuesday, April 28, 2015 7:58 AM
To: users@kafka.apache.org
Subject: Unclaimed partitions

Hi, I am trying to consume a 24-partition topic across 12 processes. Each process is using the simple consumer API, and each is being assigned two consumer threads. I have noticed when starting these processes that sometimes some of my processes are not being assigned any partitions, and no rebalance seems to ever be triggered, leaving some of the partitions unclaimed.

When I first tried deploying this yesterday, I noticed 8 of the 24 partitions, for 4 of the consumer processes, went unclaimed. Redeploying shortly later corrected this issue. I tried deploying again today, and now I see a different set of 4 processes not getting assigned partitions.

The processes otherwise appear to be running normally; they are currently running in production and we are working to get the consumers quietly running before enabling them to do any work. I'm not sure if we might be looking at some sort of timing issue.

Does anyone know what might be causing the issues we're observing?

Thanks,
Dave
Re: Unclaimed partitions
1. We're using version 0.8.1.1.
2. No failures in the consumer logs.
3. We're using the ConsumerOffsetChecker to see what partitions are assigned to the consumer group and what their offsets are. 8 of the 12 processes each have been assigned two partitions and they're keeping up with the topic. The other 4 do not get assigned partitions, and no consumers in the group are consuming those 8 partitions.

Thanks for your help,
Dave

On 4/28/15, 1:40 PM, Aditya Auradkar aaurad...@linkedin.com.INVALID wrote:

Couple of questions:
- What version of the consumer API are you using?
- Are you seeing any rebalance failures in the consumer logs?
- How do you determine that some partitions are unassigned? Just confirming that you have partitions that are not being consumed from, as opposed to consumer threads that aren't assigned any partitions.

Aditya

From: Dave Hamilton [dhamil...@nanigans.com]
Sent: Tuesday, April 28, 2015 10:19 AM
To: users@kafka.apache.org
Subject: Re: Unclaimed partitions

I'm sorry, I forgot to specify that these processes are in the same consumer group.

Thanks,
Dave

On 4/28/15, 1:15 PM, Aditya Auradkar aaurad...@linkedin.com.INVALID wrote:

Hi Dave,

The simple consumer doesn't do any state management across consumer instances, so I'm not sure how you are assigning partitions in your application code. Did you mean to say that you are using the high level consumer API?

Thanks,
Aditya

From: Dave Hamilton [dhamil...@nanigans.com]
Sent: Tuesday, April 28, 2015 7:58 AM
To: users@kafka.apache.org
Subject: Unclaimed partitions

Hi, I am trying to consume a 24-partition topic across 12 processes. Each process is using the simple consumer API, and each is being assigned two consumer threads. I have noticed when starting these processes that sometimes some of my processes are not being assigned any partitions, and no rebalance seems to ever be triggered, leaving some of the partitions unclaimed.

When I first tried deploying this yesterday, I noticed 8 of the 24 partitions, for 4 of the consumer processes, went unclaimed. Redeploying shortly later corrected this issue. I tried deploying again today, and now I see a different set of 4 processes not getting assigned partitions.

The processes otherwise appear to be running normally; they are currently running in production and we are working to get the consumers quietly running before enabling them to do any work. I'm not sure if we might be looking at some sort of timing issue.

Does anyone know what might be causing the issues we're observing?

Thanks,
Dave
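For completeness, the invocation for that check under 0.8.x, sketched with placeholder ZooKeeper address and group name:

    bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
      --zkconnect zkhost:2181 --group my-consumer-group

Its output lists Group, Topic, Pid, Offset, logSize, Lag, and Owner per partition, which is how partitions with no owner show up.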
Re: Unclaimed partitions
I'm sorry, I forgot to specify that these processes are in the same consumer group.

Thanks,
Dave

On 4/28/15, 1:15 PM, Aditya Auradkar aaurad...@linkedin.com.INVALID wrote:

Hi Dave,

The simple consumer doesn't do any state management across consumer instances, so I'm not sure how you are assigning partitions in your application code. Did you mean to say that you are using the high level consumer API?

Thanks,
Aditya

From: Dave Hamilton [dhamil...@nanigans.com]
Sent: Tuesday, April 28, 2015 7:58 AM
To: users@kafka.apache.org
Subject: Unclaimed partitions

Hi, I am trying to consume a 24-partition topic across 12 processes. Each process is using the simple consumer API, and each is being assigned two consumer threads. I have noticed when starting these processes that sometimes some of my processes are not being assigned any partitions, and no rebalance seems to ever be triggered, leaving some of the partitions unclaimed.

When I first tried deploying this yesterday, I noticed 8 of the 24 partitions, for 4 of the consumer processes, went unclaimed. Redeploying shortly later corrected this issue. I tried deploying again today, and now I see a different set of 4 processes not getting assigned partitions.

The processes otherwise appear to be running normally; they are currently running in production and we are working to get the consumers quietly running before enabling them to do any work. I'm not sure if we might be looking at some sort of timing issue.

Does anyone know what might be causing the issues we're observing?

Thanks,
Dave
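Since Aditya's reply suggests this is really the high-level consumer (the one that balances partitions across a group), here is a minimal sketch of one process asking for two streams, plus the rebalance tuning that is the usual first mitigation when many instances join a group at once. The group/topic names and tuning values are illustrative, not the poster's settings:

    // Minimal 0.8.x high-level consumer claiming 2 streams per process.
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class TwoStreamConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "zkhost:2181");
            props.put("group.id", "rtb_targeting_server");
            // Give colliding rebalances more room when 12 instances
            // start at roughly the same time:
            props.put("rebalance.max.retries", "10");   // default 4
            props.put("rebalance.backoff.ms", "10000"); // default 2000
            ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            // Request 2 consumer threads (streams) in this process:
            Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(
                    Collections.singletonMap("compile_request", 2));
            // ... hand each stream to its own worker thread ...
        }
    }

If rebalances from simultaneous startup are indeed colliding, raising rebalance.max.retries and rebalance.backoff.ms so their product comfortably exceeds the deployment's startup window is the commonly suggested fix for exactly this symptom.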