Re: Client Offset Deleted by Broker without cause
Right, they are:

http://cupenya.com/files/tmp/hgd23121/consumer-logs.png
http://cupenya.com/files/tmp/hgd23121/kafka-broker-logs.png

(it's a bit more than in the pasted mail)

On 11/01/2017 5:35 PM, Ted Yu wrote:
> bq. attached screenshots from the log viewer
>
> The screenshots didn't go through. Consider using a 3rd-party site.
>
> On Wed, Nov 1, 2017 at 9:18 AM, Elmar Weber <i...@elmarweber.org> wrote:
> [...]
Client Offset Deleted by Broker without cause
Hello,

this morning I had an issue where a client offset was apparently deleted by a broker (Kafka 0.11.0.1 with the patch for KAFKA-6030 on top). It happened in a test environment where the pipeline stage got re-deployed multiple times. During one restart, when the consumer reconnected, it didn't get any offset and started reading from the beginning, as per its configuration. The topic didn't receive any new events in the last 48h, so a restart should not have changed anything.

Here are the relevant consumer logs:

2017-10-31 17:15:08.453 Successfully joined group events-inst-agg-stream.aggregation.v1 with generation 5
2017-11-01 14:29:46.554 Successfully joined group events-inst-agg-stream.aggregation.v1 with generation 7
2017-11-01 14:51:14.639 Successfully joined group events-inst-agg-stream.aggregation.v1 with generation 9
2017-11-01 14:51:19.068 Committing Map(GroupTopicPartition(events-inst-agg-stream.aggregation.v1,events.lg,0) -> 3830)
2017-11-01 14:51:24.083 Committing Map(GroupTopicPartition(events-inst-agg-stream.aggregation.v1,events.lg,0) -> 11339)

You can see the restarts at 17:15 yesterday and 14:29 today were normal. The restart at 14:51 then started reading from the beginning (we log committed offsets). The leading broker did something different between 14:29 and 14:51; the full logs are below. From what I can see, it deleted a segment from the consumer offsets log, and the next time the consumer connected it got no offset. I can provide the logs of the other Kafka nodes if that's useful. I also attached screenshots from the log viewer in case they're easier to read.

I found https://issues.apache.org/jira/browse/KAFKA-5600 which looked related, but it's fixed in 0.11.0.1. Any other ideas what the issue could be?

[2017-11-01 14:09:46,805] INFO [Group Metadata Manager on Broker 1]: Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2017-11-01 14:19:46,805] INFO [Group Metadata Manager on Broker 1]: Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2017-11-01 14:24:23,627] INFO [GroupCoordinator 1]: Preparing to rebalance group events-inst-agg-stream.aggregation.v1 with old generation 5 (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2017-11-01 14:24:23,627] INFO [GroupCoordinator 1]: Group events-inst-agg-stream.aggregation.v1 with generation 6 is now empty (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2017-11-01 14:29:46,540] INFO [GroupCoordinator 1]: Preparing to rebalance group events-inst-agg-stream.aggregation.v1 with old generation 6 (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2017-11-01 14:29:46,546] INFO [GroupCoordinator 1]: Stabilized group events-inst-agg-stream.aggregation.v1 generation 7 (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2017-11-01 14:29:46,551] INFO [GroupCoordinator 1]: Assignment received from leader for group events-inst-agg-stream.aggregation.v1 for generation 7 (kafka.coordinator.group.GroupCoordinator)
[2017-11-01 14:29:46,832] INFO Rolled new log segment for '__consumer_offsets-5' in 26 ms. (kafka.log.Log)
[2017-11-01 14:29:46,833] INFO [Group Metadata Manager on Broker 1]: Removed 3 expired offsets in 29 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2017-11-01 14:30:23,642] INFO [GroupCoordinator 1]: Preparing to rebalance group key-stats-kafka-stream.stats.v1 with old generation 11 (__consumer_offsets-5) (kafka.coordinator.group.GroupCoordinator)
[2017-11-01 14:30:23,644] INFO [GroupCoordinator 1]: Group key-stats-kafka-stream.stats.v1 with generation 12 is now empty (__consumer_offsets-5) (kafka.coordinator.group.GroupCoordinator)
[2017-11-01 14:30:54,731] INFO Deleting segment 0 from log __consumer_offsets-5. (kafka.log.Log)
[2017-11-01 14:30:54,767] INFO Deleting index /var/lib/kafka/data/topics/__consumer_offsets-5/.index.deleted (kafka.log.OffsetIndex)
[2017-11-01 14:30:54,767] INFO Deleting index /var/lib/kafka/data/topics/__consumer_offsets-5/.timeindex.deleted (kafka.log.TimeIndex)
[2017-11-01 14:36:15,590] INFO [GroupCoordinator 1]: Preparing to rebalance group key-stats-kafka-stream.stats.v1 with old generation 12 (__consumer_offsets-5) (kafka.coordinator.group.GroupCoordinator)
[2017-11-01 14:36:15,594] INFO [GroupCoordinator 1]: Stabilized group key-stats-kafka-stream.stats.v1 generation 13 (__consumer_offsets-5) (kafka.coordinator.group.GroupCoordinator)
[2017-11-01 14:36:15,612] INFO [GroupCoordinator 1]: Assignment received from leader for group key-stats-kafka-stream.stats.v1 for generation 13 (kafka.coordinator.group.GroupCoordinator)
[2017-11-01 14:39:46,805] INFO [Group Metadata Manager on Broker 1]: Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2017-11-01 14:49:46,804] INFO [Group Metadata Manager on
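One way to sanity-check the "Removed 3 expired offsets" line: the broker startup config later in this thread shows offsets.retention.minutes = 1440 (the 0.11 default, i.e. 24 h), while the topic had been idle for ~48 h, so no new commits had refreshed the offsets. A minimal sketch of that expiry condition (the timestamps here are illustrative, not taken from the actual commit log, and this is an assumption about the mechanism, not broker code):

```python
from datetime import datetime, timedelta

# Assumed mechanism: a committed offset whose last commit is older than
# offsets.retention.minutes becomes eligible for the periodic
# "Removed N expired offsets" pass of the GroupMetadataManager.
OFFSETS_RETENTION = timedelta(minutes=1440)  # broker default in 0.11

def offset_expired(last_commit: datetime, now: datetime) -> bool:
    """True if a committed offset is past the retention window."""
    return now - last_commit > OFFSETS_RETENTION

# Hypothetical: no events (and hence no commits) for ~45 h before the
# 14:29:46,833 expiry run seen in the broker log above.
last_commit = datetime(2017, 10, 30, 17, 0)
expiry_run = datetime(2017, 11, 1, 14, 29)
print(offset_expired(last_commit, expiry_run))  # True: ~45 h > 24 h
```

If that reading is right, any consumer group that commits nothing for more than a day on this config can lose its offsets across a restart.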
Re: Log Compaction Not Picking up Topic [solved]
Hello Xin, hello Jan,

worked perfectly. I did a build of an image based on 0.11.0.1 and applied the missing patch; cleaning went through and resulted in the expected size.

Thanks a lot for the quick help,
Elmar

On 10/25/2017 1:03 PM, Xin Li wrote:
> Hey Elmar,
>
> The only thing you need to do is upgrade; Kafka tracks the cleaned offset using the cleaner-offset-checkpoint file.
>
> Best,
> Xin
>
> On 25.10.17, 12:34, "Elmar Weber" <i...@elmarweber.org> wrote:
> [...]
Re: Log Compaction Not Picking up Topic
Hi,

thanks, I'll give it a try. We run on Kubernetes, so it's not a big issue to replicate the whole env including data. One question I'd have left:

- How can I force a re-compaction over the whole topic? Since I guess the log cleaner has marked everything so far as not cleanable, how will it recheck the whole log?

Best,
Elmar

On 10/25/2017 12:29 PM, Jan Filipiak wrote:
> Hi,
>
> unfortunately there is nothing trivial you could do here. Without upgrading your Kafkas you can only bounce the partition back and forth between brokers so they compact while they're still small.
>
> With upgrading you could also just cherry-pick this very commit or put a log statement in to verify.
>
> Given the log sizes you're dealing with, I am very confident that this is your issue.
>
> Best Jan
>
> On 25.10.2017 12:21, Elmar Weber wrote:
> [...]
Re: Log Compaction Not Picking up Topic
Hi,

On 10/25/2017 12:15 PM, Xin Li wrote:
> I think that is a bug, and should be fixed in this task: https://issues.apache.org/jira/browse/KAFKA-6030. We experienced that in our Kafka cluster; we just checked out the 0.11.0.2 version and built it ourselves.

Thanks for the hint. As it looks like a calculation issue, would it be possible to verify this by manually changing the clean ratio or some other settings?

Best,
Elmar
Re: Log Compaction Not Picking up Topic
On 10/25/2017 12:03 PM, Manikumar wrote:
> any errors in log cleaner logs?

Not as far as I can see:

root@kafka-1:/opt/kafka/logs# cat log-cleaner.log* | grep -i error
(empty)

However, I've seen that it actually did clean the whole topic (excerpts below), but it didn't seem to find anything worth cleaning (the size stayed the same). Are there any global settings that could affect this? I'm running a default Kafka config; the only things changed are the message size, topic creation/deletion, and the defaults for retention, which are all overridden for this topic (see original post).

I'm writing a simple script to read and confirm the key distribution to debug further, but as the same data (without duplicates) is also written to a DB, I'm pretty sure the size is too big for the topic not to have been compacted.

root@kafka-1:/opt/kafka/logs# cat log-cleaner.log* | grep events.lg.aggregated
[2017-10-24 11:08:43,462] INFO Cleaner 0: Beginning cleaning of log events.lg.aggregated-0. (kafka.log.LogCleaner)
[2017-10-24 11:08:43,462] INFO Cleaner 0: Building offset map for events.lg.aggregated-0... (kafka.log.LogCleaner)
[2017-10-24 11:08:43,688] INFO Cleaner 0: Building offset map for log events.lg.aggregated-0 for 1 segments in offset range [0, 81308). (kafka.log.LogCleaner)
[2017-10-24 11:08:44,163] INFO Cleaner 0: Offset map for log events.lg.aggregated-0 complete. (kafka.log.LogCleaner)
[2017-10-24 11:08:44,165] INFO Cleaner 0: Cleaning log events.lg.aggregated-0 (cleaning prior to Tue Oct 24 11:08:30 UTC 2017, discarding tombstones prior to Thu Jan 01 00:00:00 UTC 1970)... (kafka.log.LogCleaner)
[2017-10-24 11:08:44,166] INFO Cleaner 0: Cleaning segment 0 in log events.lg.aggregated-0 (largest timestamp Tue Oct 24 11:08:30 UTC 2017) into 0, retaining deletes. (kafka.log.LogCleaner)
[2017-10-24 11:08:47,865] INFO Cleaner 0: Swapping in cleaned segment 0 for segment(s) 0 in log events.lg.aggregated-0. (kafka.log.LogCleaner)
Log cleaner thread 0 cleaned log events.lg.aggregated-0 (dirty section = [0, 0])
[2017-10-24 11:10:47,875] INFO Cleaner 0: Beginning cleaning of log events.lg.aggregated-0. (kafka.log.LogCleaner)
[2017-10-24 11:10:47,875] INFO Cleaner 0: Building offset map for events.lg.aggregated-0... (kafka.log.LogCleaner)
[2017-10-24 11:10:47,910] INFO Cleaner 0: Building offset map for log events.lg.aggregated-0 for 1 segments in offset range [81308, 154902). (kafka.log.LogCleaner)
[2017-10-24 11:10:48,410] INFO Cleaner 0: Offset map for log events.lg.aggregated-0 complete. (kafka.log.LogCleaner)
[2017-10-24 11:10:48,411] INFO Cleaner 0: Cleaning log events.lg.aggregated-0 (cleaning prior to Tue Oct 24 11:10:32 UTC 2017, discarding tombstones prior to Mon Oct 23 11:08:30 UTC 2017)... (kafka.log.LogCleaner)
[2017-10-24 11:10:48,411] INFO Cleaner 0: Cleaning segment 0 in log events.lg.aggregated-0 (largest timestamp Tue Oct 24 11:08:30 UTC 2017) into 0, retaining deletes. (kafka.log.LogCleaner)
[2017-10-24 11:10:50,308] INFO Cleaner 0: Swapping in cleaned segment 0 for segment(s) 0 in log events.lg.aggregated-0. (kafka.log.LogCleaner)
[2017-10-24 11:10:50,309] INFO Cleaner 0: Cleaning segment 81308 in log events.lg.aggregated-0 (largest timestamp Tue Oct 24 11:10:32 UTC 2017) into 81308, retaining deletes. (kafka.log.LogCleaner)
[2017-10-24 11:10:53,389] INFO Cleaner 0: Swapping in cleaned segment 81308 for segment(s) 81308 in log events.lg.aggregated-0. (kafka.log.LogCleaner)
Log cleaner thread 0 cleaned log events.lg.aggregated-0 (dirty section = [81308, 81308])
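The telling detail in the excerpts above is the dirty section: both cleaner passes report a zero-width range ([0, 0] and [81308, 81308]), i.e. the cleaner believed there was essentially nothing dirty to compact. A small sketch for pulling those ranges out of the cleaner log (the regex and helper name are mine, not a Kafka tool):

```python
import re

# Extract "(dirty section = [start, end])" ranges from log cleaner output.
DIRTY_RE = re.compile(r"dirty section = \[(\d+), (\d+)\]")

def dirty_sections(log_text: str) -> list:
    """Return a list of (start, end) offset pairs reported by the cleaner."""
    return [(int(a), int(b)) for a, b in DIRTY_RE.findall(log_text)]

log = """
Log cleaner thread 0 cleaned log events.lg.aggregated-0 (dirty section = [0, 0])
Log cleaner thread 0 cleaned log events.lg.aggregated-0 (dirty section = [81308, 81308])
"""
for start, end in dirty_sections(log):
    width = end - start
    print(start, end, "empty" if width == 0 else f"{width} offsets")
```

An always-empty dirty section despite a log far above its compacted size is consistent with the cleanable-ratio calculation issue around KAFKA-6030 mentioned elsewhere in this thread.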
Log Compaction Not Picking up Topic
Hello,

I'm having trouble getting Kafka to compact a topic. It's over 300 GB and has enough segments to warrant cleaning; it should only be about 40 GB (there is a copy in a DB that is unique on the key). Below are the configs we have (broker defaults) and the topic override. Is there something I'm missing about which setting overrides which, or is something else still wrong? retention.ms and delete.retention.ms I set manually on the topic after creation, when some segments had already been created.

Kafka version: 0.11

Server defaults for new segments of the topic (the settings used when a new log was created for the topic):

{compression.type -> producer,
 message.format.version -> 0.11.0-IV2,
 file.delete.delay.ms -> 6,
 max.message.bytes -> 2097152,
 min.compaction.lag.ms -> 0,
 message.timestamp.type -> CreateTime,
 min.insync.replicas -> 1,
 segment.jitter.ms -> 0,
 preallocate -> false,
 min.cleanable.dirty.ratio -> 0.5,
 index.interval.bytes -> 4096,
 unclean.leader.election.enable -> false,
 retention.bytes -> -1,
 delete.retention.ms -> 8640,
 cleanup.policy -> compact,
 flush.ms -> 9223372036854775807,
 segment.ms -> 60480,
 segment.bytes -> 1073741824,
 retention.ms -> -1,
 message.timestamp.difference.max.ms -> 9223372036854775807,
 segment.index.bytes -> 10485760,
 flush.messages -> 9223372036854775807}

Topic overrides (overridden after creation):
{retention.ms=360,
 delete.retention.ms=360,
 max.message.bytes=10485760,
 cleanup.policy=compact}

The full server startup config:

advertised.host.name = null
advertised.listeners = null
advertised.port = null
alter.config.policy.class.name = null
authorizer.class.name =
auto.create.topics.enable = false
auto.leader.rebalance.enable = true
background.threads = 10
broker.id = 1
broker.id.generation.enable = true
broker.rack = europe-west1-c
compression.type = producer
connections.max.idle.ms = 60
controlled.shutdown.enable = true
controlled.shutdown.max.retries = 3
controlled.shutdown.retry.backoff.ms = 5000
controller.socket.timeout.ms = 3
create.topic.policy.class.name = null
default.replication.factor = 1
delete.records.purgatory.purge.interval.requests = 1
delete.topic.enable = true
fetch.purgatory.purge.interval.requests = 1000
group.initial.rebalance.delay.ms = 0
group.max.session.timeout.ms = 30
group.min.session.timeout.ms = 6000
host.name =
inter.broker.listener.name = null
inter.broker.protocol.version = 0.11.0-IV2
leader.imbalance.check.interval.seconds = 300
leader.imbalance.per.broker.percentage = 10
listener.security.protocol.map = SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,TRACE:TRACE,SASL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT
listeners = null
log.cleaner.backoff.ms = 15000
log.cleaner.dedupe.buffer.size = 134217728
log.cleaner.delete.retention.ms = 8640
log.cleaner.enable = true
log.cleaner.io.buffer.load.factor = 0.9
log.cleaner.io.buffer.size = 524288
log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
log.cleaner.min.cleanable.ratio = 0.5
log.cleaner.min.compaction.lag.ms = 0
log.cleaner.threads = 1
log.cleanup.policy = [delete]
log.dir = /tmp/kafka-logs
log.dirs = /var/lib/kafka/data/topics
log.flush.interval.messages = 9223372036854775807
log.flush.interval.ms = null
log.flush.offset.checkpoint.interval.ms = 6
log.flush.scheduler.interval.ms = 9223372036854775807
log.flush.start.offset.checkpoint.interval.ms = 6
log.index.interval.bytes = 4096
log.index.size.max.bytes = 10485760
log.message.format.version = 0.11.0-IV2
log.message.timestamp.difference.max.ms = 9223372036854775807
log.message.timestamp.type = CreateTime
log.preallocate = false
log.retention.bytes = -1
log.retention.check.interval.ms = 30
log.retention.hours = -1
log.retention.minutes = null
log.retention.ms = null
log.roll.hours = 168
log.roll.jitter.hours = 0
log.roll.jitter.ms = null
log.roll.ms = null
log.segment.bytes = 1073741824
log.segment.delete.delay.ms = 6
max.connections.per.ip = 2147483647
max.connections.per.ip.overrides =
message.max.bytes = 2097152
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 3
min.insync.replicas = 1
num.io.threads = 8
num.network.threads = 3
num.partitions = 1
num.recovery.threads.per.data.dir = 1
num.replica.fetchers = 1
offset.metadata.max.bytes = 4096
offsets.commit.required.acks = -1
offsets.commit.timeout.ms = 5000
offsets.load.buffer.size = 5242880
offsets.retention.check.interval.ms = 60
offsets.retention.minutes = 1440
offsets.topic.compression.codec = 0
offsets.topic.num.partitions = 50
offsets.topic.replication.factor = 1
offsets.topic.segment.bytes = 104857600
port = 9092
principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
producer.purgatory.purge.interval.requests = 1000
queued.max.requests = 500
quota.consumer.default = 9223372036854775807
quota.producer.default = 9223372036854775807
quota.window.num = 11
quota.window.size.seconds = 1
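For context on the "calculation issue" discussed in the replies above: the cleaner only selects a log for compaction once its dirty ratio reaches log.cleaner.min.cleanable.ratio / min.cleanable.dirty.ratio (0.5 in the configs here), and KAFKA-6030 concerned a bug in that ratio calculation. A rough, illustrative sketch of the selection check (hypothetical byte counts, not broker code):

```python
# Illustrative sketch of the compaction-eligibility check: the cleaner
# compares dirty bytes / total bytes against the configured threshold.
MIN_CLEANABLE_DIRTY_RATIO = 0.5  # log.cleaner.min.cleanable.ratio above

def should_clean(clean_bytes: int, dirty_bytes: int) -> bool:
    """True if the log's dirty ratio reaches the cleanable threshold."""
    total = clean_bytes + dirty_bytes
    if total == 0:
        return False
    return dirty_bytes / total >= MIN_CLEANABLE_DIRTY_RATIO

# Hypothetical numbers for a ~300 GB log that should compact to ~40 GB:
clean = 40 * 1024**3
dirty = 260 * 1024**3
print(should_clean(clean, dirty))  # True: 260/300 is about 0.87
```

With a correct ratio a log like the one in this thread should be selected immediately; a miscomputed (too small) ratio on an affected broker would explain the topic never being picked up.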