[jira] [Commented] (KAFKA-7362) enable kafka broker to remove orphan partitions automatically
[ https://issues.apache.org/jira/browse/KAFKA-7362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804577#comment-16804577 ] xiongqi wu commented on KAFKA-7362: --- Yes, I am still looking forward to fixing this issue. I put this item on hold because 1) there is no active discussion on KIP-370, and 2) I am also thinking about combining this with topic deletion. Today a topic can only be deleted when all related brokers are online. This causes topic deletion to get stuck if a broker fails and never comes back. I was thinking of combining this feature with a change to the topic deletion state machine so that topic deletion can always succeed and orphan partitions can be safely removed when offline brokers come back online. I will update the ticket and KIP-370 after I think a little more about item 2. > enable kafka broker to remove orphan partitions automatically > -- > > Key: KAFKA-7362 > URL: https://issues.apache.org/jira/browse/KAFKA-7362 > Project: Kafka > Issue Type: Improvement > Components: core, log >Reporter: xiongqi wu >Assignee: xiongqi wu >Priority: Major > > When partition reassignment removes topic partitions from an offline broker, > those removed partitions become orphan partitions to the broker. When the > offline broker comes back online, it is not able to clean up the data or the > folders that belong to orphan partitions. Log manager will scan all dirs > during startup, but the time based retention policy on a topic partition will > not kick in until the broker is either a follower or a leader of the > partition. In addition, we do not have logic to delete folders that belong > to orphan partitions today. > Open this ticket to provide a mechanism (when enabled) to safely remove > orphan partitions automatically. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8157) Missing "key.serializer" exception when setting "segment index bytes"
[ https://issues.apache.org/jira/browse/KAFKA-8157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804539#comment-16804539 ] Travis Brad Ellis commented on KAFKA-8157: -- I'd like to work this. Newbie here. I sent a request to be added as a contributor via [d...@kafka.apache.org|mailto:d...@kafka.apache.org] . Planning to assign it to myself once added. > Missing "key.serializer" exception when setting "segment index bytes" > - > > Key: KAFKA-8157 > URL: https://issues.apache.org/jira/browse/KAFKA-8157 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.2.0 > Environment: ubuntu 18.10, localhost and Aiven too >Reporter: Cristian D >Priority: Major > Labels: beginner, newbie > > As a `kafka-streams` user, > When I set the "segment index bytes" property > Then I would like to have internal topics with the specified allocated disk > space > > At the moment, when setting the "topic.segment.index.bytes" property, the > application is exiting with following exception: > {code:java} > Exception in thread "main" org.apache.kafka.common.config.ConfigException: > Missing required configuration "key.serializer" which has no default value. > {code} > Tested with `kafka-streams` v2.0.0 and v2.2.0. > > Stack trace: > {code:java} > Exception in thread "main" org.apache.kafka.common.config.ConfigException: > Missing required configuration "key.serializer" which has no default value. 
> at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:474)
> at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:464)
> at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
> at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
> at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:392)
> at org.apache.kafka.streams.StreamsConfig.getMainConsumerConfigs(StreamsConfig.java:1014)
> at org.apache.kafka.streams.processor.internals.StreamThread.create(StreamThread.java:666)
> at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:718)
> at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:634)
> at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:544)
> at app.Main.main(Main.java:36)
> {code}
> A demo application simulating the exception:
> https://github.com/razorcd/java-snippets-and-demo-projects/tree/master/kafkastreamsdemo
>
> -- This message was sent by Atlassian JIRA (v7.6.3#76005)
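For readers who want to picture the setup in this report, here is a minimal sketch that builds the kind of configuration described, using only `java.util.Properties`. Only the key `topic.segment.index.bytes` comes from the report; the application id, bootstrap address, the value, and the helper `withPrefixStripped` are assumptions made for illustration. The helper merely mimics the idea of forwarding `topic.`-prefixed entries to internal topics and is not Kafka Streams' actual implementation.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

public class SegmentIndexBytesRepro {
    // Prefix seen in the report's property name ("topic.segment.index.bytes").
    static final String TOPIC_PREFIX = "topic.";

    // Hypothetical values; only the "topic.segment.index.bytes" key is taken from the report.
    static Properties buildConfig() {
        Properties props = new Properties();
        props.put("application.id", "segment-index-demo");
        props.put("bootstrap.servers", "localhost:9092");
        props.put(TOPIC_PREFIX + "segment.index.bytes", "1048576");
        return props;
    }

    // Illustrative helper (not Kafka's code): collect prefixed entries with the prefix removed.
    static Map<String, String> withPrefixStripped(Properties props, String prefix) {
        Map<String, String> result = new TreeMap<>();
        for (String name : props.stringPropertyNames()) {
            if (name.startsWith(prefix)) {
                result.put(name.substring(prefix.length()), props.getProperty(name));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> topicConfig = withPrefixStripped(buildConfig(), TOPIC_PREFIX);
        System.out.println(topicConfig);
    }
}
```

According to the report, passing such a configuration to `KafkaStreams` is what produces the `ConfigException`; the sketch stops before that step because it deliberately avoids the Kafka dependency.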
[jira] [Commented] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
[ https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804533#comment-16804533 ] huxihx commented on KAFKA-7965: --- Part of the failure is caused by the rebalance not finishing as expected: some consumers were not assigned any partitions. I can reproduce this issue consistently but am still trying to figure out what is going wrong with `waitForRebalance`. > Flaky Test > ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup > > > Key: KAFKA-7965 > URL: https://issues.apache.org/jira/browse/KAFKA-7965 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Affects Versions: 2.2.0, 2.3.0 >Reporter: Matthias J. Sax >Assignee: Stanislav Kozlovski >Priority: Critical > Labels: flaky-test > Fix For: 2.3.0, 2.2.1 > > > To get stable nightly builds for the `2.2` release, I create tickets for all > observed test failures. > [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/21/] > {quote}java.lang.AssertionError: Received 0, expected at least 68 at > org.junit.Assert.fail(Assert.java:88) at > org.junit.Assert.assertTrue(Assert.java:41) at > kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:557) > at > kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:320) > at > kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$adapted(ConsumerBounceTest.scala:319) > at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at > kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319){quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8106) Remove unnecessary decompression operation when logValidator do validation.
[ https://issues.apache.org/jira/browse/KAFKA-8106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804532#comment-16804532 ] hackerwin7 commented on KAFKA-8106: --- Maybe we can move the record-level validation to the client side or elsewhere? The record message format could add a validation field to the header, and on the broker side the broker could check this field to skip record-level validation. > Remove unnecessary decompression operation when logValidator do validation. > > > Key: KAFKA-8106 > URL: https://issues.apache.org/jira/browse/KAFKA-8106 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.1 > Environment: Server : > cpu:2*16 ; > MemTotal : 256G; > Ethernet controller:Intel Corporation 82599ES 10-Gigabit SFI/SFP+ Network > Connection ; > SSD. >Reporter: Flower.min >Assignee: Flower.min >Priority: Major > Labels: performance > > We ran performance tests on Kafka in the specific scenarios > described below. We built a Kafka cluster with one broker and created topics > with different numbers of partitions. Then we started many producer processes > to send large amounts of messages to one of the topics in each test run.
> *_Specific Scenario_*
>
> *_1. Main config of Kafka_*
> # Main config of Kafka server: num.network.threads=6; num.io.threads=128; queued.max.requests=500
> # Number of TopicPartition: 50~2000
> # Size of Single Message: 1024B
>
> *_2. Config of KafkaProducer_*
> ||compression.type||linger.ms||batch.size||buffer.memory||
> |lz4|1000ms~5000ms|16KB/10KB/100KB|128MB|
>
> *_3. The best result of performance testing_*
> ||Network inflow rate||CPU used (%)||Disk write speed||Producer throughput||
> |550MB/s~610MB/s|97%~99%|550MB/s~610MB/s|23,000,000 messages/s|
>
> *_4. Phenomenon and my doubt_*
> _CPU usage has reached its upper limit, but the server's network bandwidth has not. *We are unsure what is consuming so much CPU time, and we want to improve performance and reduce the CPU usage of the Kafka server.*_
>
> _*5. Analysis*_
> We analyzed the JFR recording of the Kafka server during performance testing. The hot-spot methods are *_"java.io.DataInputStream.readFully(byte[],int,int)"_* and *_"org.apache.kafka.common.record.KafkaLZ4BlockInputStream.read(byte[],int,int)"_*. When checking thread stack information we also found that most CPU time is occupied by many threads busy decompressing messages. We then read the Kafka source code.
> There is a double-layer nested iterator when LogValidator validates every record, and each message is decompressed while traversing every RecordBatch iterator.
> Decompressing the messages consumes CPU and hurts overall performance. _*The only purpose of decompressing every message is to obtain the total size in bytes of each record and the size in bytes of the record body, when the magic value in use is above 1 and no format conversion or value overwriting is required for compressed messages. This hurts performance in common usage scenarios.*_ Therefore, we suggest *_removing the unnecessary decompression operation_* when validating compressed messages, when the magic value in use is above 1 and no format conversion or value overwriting is required for compressed messages. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8174) Can't call arbitrary SimpleBenchmarks tests from streams_simple_benchmark_test.py
Sophie Blee-Goldman created KAFKA-8174: -- Summary: Can't call arbitrary SimpleBenchmarks tests from streams_simple_benchmark_test.py Key: KAFKA-8174 URL: https://issues.apache.org/jira/browse/KAFKA-8174 Project: Kafka Issue Type: Bug Reporter: Sophie Blee-Goldman When using the script streams_simple_benchmark_test.py you should be able to specify a test name and run that particular method in SimpleBenchmarks. This works for most existing benchmarks, however you can't use this to run the "yahoo" benchmark and you can't add new tests to SimpleBenchmarks and start them successfully. If you try to run yahoo/new test it fails with the error "Not enough parameters are provided; expecting propFileName, testName, numRecords, keySkew, valueSize" in main(); the missing argument turns out to be testName. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7362) enable kafka broker to remove orphan partitions automatically
[ https://issues.apache.org/jira/browse/KAFKA-7362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804470#comment-16804470 ] Dhruvil Shah commented on KAFKA-7362: - [~xiongqiwu] are you planning to work on this JIRA? If not, I could take a stab at fixing it. > enable kafka broker to remove orphan partitions automatically > -- > > Key: KAFKA-7362 > URL: https://issues.apache.org/jira/browse/KAFKA-7362 > Project: Kafka > Issue Type: Improvement > Components: core, log >Reporter: xiongqi wu >Assignee: xiongqi wu >Priority: Major > > When partition reassignment removes topic partitions from an offline broker, > those removed partitions become orphan partitions to the broker. When the > offline broker comes back online, it is not able to clean up the data or the > folders that belong to orphan partitions. Log manager will scan all dirs > during startup, but the time based retention policy on a topic partition will > not kick in until the broker is either a follower or a leader of the > partition. In addition, we do not have logic to delete folders that belong > to orphan partitions today. > Open this ticket to provide a mechanism (when enabled) to safely remove > orphan partitions automatically. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8159) Built-in serdes for signed numbers do not obey lexicographical ordering
[ https://issues.apache.org/jira/browse/KAFKA-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman updated KAFKA-8159: --- Description: Currently we assume consistent ordering between serialized and deserialized keys, e.g. if the objects obey objA < objB < objC then the serialized Bytes will also obey bytesA < bytesB < bytesC. This is not true in general of the built-in serdes for signed numerical types (e.g. Integer, Long). Specifically, it is broken by the negative number representations, which are lexicographically greater than (all) positive number representations. One consequence of this is that an interactive query of a key range with a negative lower bound and positive upper bound (e.g. keyValueStore.range(-1, 1)) will result in "unexpected behavior" depending on the specific store type. For RocksDB stores with caching disabled, an empty iterator will be returned regardless of whether any records exist in that range. For in-memory stores and ANY store with caching enabled, Streams will throw an unchecked exception and crash. was: If a user creates a queryable state store using one of the signed built-in serdes (e.g. Integer) for the key, there is nothing preventing records with negative keys from being inserted and/or fetched individually. However, if the user tries to query the store for a range of keys starting with a negative number, the resulting behavior is unexpected and store-specific. For RocksDB stores with caching disabled, an empty iterator will be returned. For in-memory stores and ANY store with caching enabled, Streams will throw an unchecked exception. This situation should be handled more gracefully, or users should be informed of this limitation and the result should at least be consistent across store types.
> Built-in serdes for signed numbers do not obey lexicographical ordering > --- > > Key: KAFKA-8159 > URL: https://issues.apache.org/jira/browse/KAFKA-8159 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sophie Blee-Goldman >Priority: Major > > Currently we assume consistent ordering between serialized and deserialized > keys, e.g. if the objects obey objA < objB < objC then the serialized Bytes > will also obey bytesA < bytesB < bytesC. This is not true in general of the > built-in serdes for signed numerical types (e.g. Integer, Long). Specifically, > it is broken by the negative number representations, which are > lexicographically greater than (all) positive number representations. > > One consequence of this is that an interactive query of a key range with a > negative lower bound and positive upper bound (e.g. keyValueStore.range(-1, 1)) > will result in "unexpected behavior" depending on the specific store type. > > For RocksDB stores with caching disabled, an empty iterator will be returned > regardless of whether any records exist in that range. > For in-memory stores and ANY store with caching enabled, Streams will throw > an unchecked exception and crash. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
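The ordering mismatch described in this ticket can be reproduced without Kafka. The sketch below assumes that the serializer writes an `int` as 4 big-endian bytes (which is what `ByteBuffer.putInt` does by default) and that the store compares keys as unsigned bytes, lexicographically. The class and method names are hypothetical, and `Arrays.compareUnsigned` requires Java 9 or later.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class SignedSerdeOrdering {
    // Serialize an int as 4 big-endian bytes (ByteBuffer's default byte order).
    static byte[] serialize(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    // Unsigned lexicographic comparison, as a byte-ordered store would apply to keys.
    static int compareBytes(byte[] a, byte[] b) {
        return Arrays.compareUnsigned(a, b);
    }

    public static void main(String[] args) {
        byte[] minusOne = serialize(-1); // FF FF FF FF
        byte[] one = serialize(1);       // 00 00 00 01
        System.out.println("-1 < 1 as ints:  " + (Integer.compare(-1, 1) < 0));
        System.out.println("-1 < 1 as bytes: " + (compareBytes(minusOne, one) < 0));
    }
}
```

Under these assumptions the serialized form of -1 sorts after the serialized form of 1, so a range query from -1 to 1 hands the store a lower bound that is greater than its upper bound.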
[jira] [Commented] (KAFKA-7502) Cleanup KTable materialization logic in a single place
[ https://issues.apache.org/jira/browse/KAFKA-7502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804456#comment-16804456 ] ASF GitHub Bot commented on KAFKA-7502: --- dongjinleekr commented on pull request #6520: KAFKA-7502: Cleanup KTable materialization logic in a single place URL: https://github.com/apache/kafka/pull/6520 This PR is the final part of KAFKA-7502, which cleans up the `KTableImpl#doMapValues` method. (follow up of #6174, #6453, and #6519) ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Cleanup KTable materialization logic in a single place > -- > > Key: KAFKA-7502 > URL: https://issues.apache.org/jira/browse/KAFKA-7502 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Lee Dongjin >Priority: Major > > Today, since we pre-create all the `KTableXXX` operators along with the logical > node, we are effectively duplicating the logic that determines whether the > resulting KTable should be materialized. More specifically, the > materialization principle today is: > 1) If users specify Materialized in the DSL and it contains a queryable > name, we always materialize. > 2) If users specify Materialized in the DSL but it does not contain a queryable > name, or if users do not specify a Materialized object at all, Streams may > choose to materialize or not. In either case, even if the KTable is > materialized it will not be queryable since there is no queryable name (i.e.
> only storeName is not null, but queryableName is null): > 2.a) If the resulting KTable is from an aggregation, we always materialize > since it is needed for storing the aggregation (i.e. we use the > MaterializedInternal constructor with nameProvider != null). > 2.b) If the resulting KTable is from a source topic, we delay the > materialization until the downstream operator requires this KTable to be > materialized or send-old-values (see `KTableSourceNode` and `KTableSource`). > 2.c) If the resulting KTable is from a join, we always materialize if users > create a Materialized object, even without a queryable name. However, this can > be optimized similarly to 2.b), but that is orthogonal to this ticket (see > `KTableImpl#buildJoin` where we always use the constructor with nameProvider != > null). > 2.d) If the resulting KTable is from a stateless operation like filter / > mapValues, we never materialize. > > Now, in all of these cases, we have a logical node like "KTableKTableJoinNode", > as well as a physical node like `ProcessorNode`. Ideally we should always > create the logical plan (i.e. the StreamsGraph), then optimize it if > necessary, and then generate the physical plan (i.e. the Topology). However, > today we create some physical nodes beforehand, and the above logic is hence > duplicated in the creation of both physical nodes and logical nodes. For > example, in `KTableKTableJoinNode` we check if Materialized is null for > adding a state store, and in `KTableImpl#doJoin` we check if materialized is > specified (case 2.c) above). > Another example is TableProcessorNode, which is used for 2.d) above: > it includes the logic, whereas its caller, `KTableImpl#doFilter` for > example, also contains the logic when deciding whether to pass the `queryableName` > parameter to `KTableProcessorSupplier`. > This is error-prone since we may update the logic in one class but forget > to update the other class.
> -- > What we want to have is a cleaner code path similar to what we have for 2.b), > such that when creating the logical nodes we keep track of whether 1) > materialized is specified, and 2) queryable name is provided. And during > optimization phase, we may change the inner physical ProcessorBuilder's > parameters like queryable name etc, and then when it is time to generate the > physical node, we can just blindly take the parameters and go for it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
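The materialization rules listed in this ticket can be summarized as a single decision function. The following is only a sketch of the rules as stated in the description, not the Streams code: the enum `Origin`, the method `shouldMaterialize`, and all parameter names are hypothetical, and case 2.b) is modelled with a simple `downstreamRequires` flag.

```java
public class MaterializationRules {
    // Where the resulting KTable comes from (hypothetical enum mirroring cases 2.a to 2.d).
    enum Origin { AGGREGATION, SOURCE_TOPIC, JOIN, STATELESS }

    static boolean shouldMaterialize(boolean materializedSpecified,
                                     boolean queryableNameProvided,
                                     Origin origin,
                                     boolean downstreamRequires) {
        // Case 1: Materialized with a queryable name always materializes.
        if (materializedSpecified && queryableNameProvided) {
            return true;
        }
        switch (origin) {
            case AGGREGATION:  // 2.a: the store is needed to hold the aggregate
                return true;
            case SOURCE_TOPIC: // 2.b: delayed until a downstream operator needs it
                return downstreamRequires;
            case JOIN:         // 2.c: materialized whenever a Materialized object was passed
                return materializedSpecified;
            case STATELESS:    // 2.d: filter / mapValues never materialize
            default:
                return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(shouldMaterialize(true, false, Origin.JOIN, false));
        System.out.println(shouldMaterialize(false, false, Origin.STATELESS, true));
    }
}
```

Keeping the decision in one function like this is in the spirit of what the ticket asks for: logical nodes would only record whether Materialized was specified and whether a queryable name was provided, and the physical plan would be derived from those flags in one place.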
[jira] [Updated] (KAFKA-8159) Built-in serdes for signed numbers do not obey lexicographical ordering
[ https://issues.apache.org/jira/browse/KAFKA-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman updated KAFKA-8159: --- Summary: Built-in serdes for signed numbers do not obey lexicographical ordering (was: Multi-key range queries with negative keyFrom results in unexpected behavior) > Built-in serdes for signed numbers do not obey lexicographical ordering > --- > > Key: KAFKA-8159 > URL: https://issues.apache.org/jira/browse/KAFKA-8159 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sophie Blee-Goldman >Priority: Major > > If a user creates a queryable state store using one of the signed built-in > serdes (e.g. Integer) for the key, there is nothing preventing records with > negative keys from being inserted and/or fetched individually. However, if the > user tries to query the store for a range of keys starting with a negative > number, the resulting behavior is unexpected and store-specific. > > For RocksDB stores with caching disabled, an empty iterator will be returned. > For in-memory stores and ANY store with caching enabled, Streams will throw > an unchecked exception. > > This situation should be handled more gracefully, or users should be informed > of this limitation and the result should at least be consistent across store > types. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7502) Cleanup KTable materialization logic in a single place
[ https://issues.apache.org/jira/browse/KAFKA-7502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804449#comment-16804449 ] ASF GitHub Bot commented on KAFKA-7502: --- dongjinleekr commented on pull request #6519: KAFKA-7502: Cleanup KTable materialization logic in a single place URL: https://github.com/apache/kafka/pull/6519 This PR is a follow-up of #6174 and #6453, which cleans up the `KTableImpl#doTransformValues` method. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Cleanup KTable materialization logic in a single place > -- > > Key: KAFKA-7502 > URL: https://issues.apache.org/jira/browse/KAFKA-7502 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Lee Dongjin >Priority: Major > > Today, since we pre-create all the `KTableXXX` operators along with the logical > node, we are effectively duplicating the logic that determines whether the > resulting KTable should be materialized. More specifically, the > materialization principle today is: > 1) If users specify Materialized in the DSL and it contains a queryable > name, we always materialize. > 2) If users specify Materialized in the DSL but it does not contain a queryable > name, or if users do not specify a Materialized object at all, Streams may > choose to materialize or not. In either case, even if the KTable is > materialized it will not be queryable since there is no queryable name (i.e.
> only storeName is not null, but queryableName is null): > 2.a) If the resulting KTable is from an aggregation, we always materialize > since it is needed for storing the aggregation (i.e. we use the > MaterializedInternal constructor with nameProvider != null). > 2.b) If the resulting KTable is from a source topic, we delay the > materialization until the downstream operator requires this KTable to be > materialized or send-old-values (see `KTableSourceNode` and `KTableSource`). > 2.c) If the resulting KTable is from a join, we always materialize if users > create a Materialized object, even without a queryable name. However, this can > be optimized similarly to 2.b), but that is orthogonal to this ticket (see > `KTableImpl#buildJoin` where we always use the constructor with nameProvider != > null). > 2.d) If the resulting KTable is from a stateless operation like filter / > mapValues, we never materialize. > > Now, in all of these cases, we have a logical node like "KTableKTableJoinNode", > as well as a physical node like `ProcessorNode`. Ideally we should always > create the logical plan (i.e. the StreamsGraph), then optimize it if > necessary, and then generate the physical plan (i.e. the Topology). However, > today we create some physical nodes beforehand, and the above logic is hence > duplicated in the creation of both physical nodes and logical nodes. For > example, in `KTableKTableJoinNode` we check if Materialized is null for > adding a state store, and in `KTableImpl#doJoin` we check if materialized is > specified (case 2.c) above). > Another example is TableProcessorNode, which is used for 2.d) above: > it includes the logic, whereas its caller, `KTableImpl#doFilter` for > example, also contains the logic when deciding whether to pass the `queryableName` > parameter to `KTableProcessorSupplier`. > This is error-prone since we may update the logic in one class but forget > to update the other class.
> -- > What we want to have is a cleaner code path similar to what we have for 2.b), > such that when creating the logical nodes we keep track of whether 1) > materialized is specified, and 2) queryable name is provided. And during > optimization phase, we may change the inner physical ProcessorBuilder's > parameters like queryable name etc, and then when it is time to generate the > physical node, we can just blindly take the parameters and go for it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8173) Kafka Errors after version upgrade from 0.10.2.2 to 2.1.1
[ https://issues.apache.org/jira/browse/KAFKA-8173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Anand updated KAFKA-8173: -- Description: After Kafka version upgrade from 0.10.2.2 to 2.1.1 Warnings starts coming for all the topics "due to Corrupt time index found, time index file". {code:java} [2019-03-28 17:23:55.877+] WARN [Log partition=FirstTopic-6, dir=/apps/kafka/data] Found a corrupted index file corresponding to log file /apps/kafka/data/FirstTopic-6/0494.log due to Corrupt time index found, time index file (/apps/kafka/data/FirstTopic-6/0494.timeindex) has non-zero size but the last timestamp is 0 which is less than the first timestamp 1553720469480}, recovering segment and rebuilding index files... (kafka.log.Log) }} {{[2019-03-28 17:23:55.877+] WARN [Log partition=NewTopic-3, dir=/apps/kafka/data] Found a corrupted index file corresponding to log file /apps/kafka/data/NewTopic-3/0494.log due to Corrupt time index found, time index file (/apps/kafka/data/NewTopic-3/0494.timeindex) has non-zero size but the last timestamp is 0 which is less than the first timestamp 1553720469480}, recovering segment and rebuilding index files... (kafka.log.Log) [2019-03-28 17:23:55.877+] WARN [Log partition=SecondTopic-3, dir=/apps/kafka/data] Found a corrupted index file corresponding to log file /apps/kafka/data/SecondTopic-3/0494.log due to Corrupt time index found, time index file (/apps/kafka/data/SecondTopic-3/0494.timeindex) has non-zero size but the last timestamp is 0 which is less than the first timestamp 1553720469480}, recovering segment and rebuilding index files... (kafka.log.Log) {code} was: After Kafka version upgrade from 0.10.2.2 to 2.1.1 Warnings starts coming for all the topics "due to Corrupt time index found, time index file". 
{code:java} {{[2019-03-28 17:23:55.877+] WARN [Log partition=FirstTopic-6, dir=/apps/kafka/data] Found a corrupted index file corresponding to log file /apps/kafka/data/FirstTopic-6/0494.log due to Corrupt time index found, time index file (/apps/kafka/data/FirstTopic-6/0494.timeindex) has non-zero size but the last timestamp is 0 which is less than the first timestamp 1553720469480}, recovering segment and rebuilding index files... (kafka.log.Log) }} {{[2019-03-28 17:23:55.877+] WARN [Log partition=NewTopic-3, dir=/apps/kafka/data] Found a corrupted index file corresponding to log file /apps/kafka/data/NewTopic-3/0494.log due to Corrupt time index found, time index file (/apps/kafka/data/NewTopic-3/0494.timeindex) has non-zero size but the last timestamp is 0 which is less than the first timestamp 1553720469480}, recovering segment and rebuilding index files... (kafka.log.Log) [2019-03-28 17:23:55.877+] WARN [Log partition=SecondTopic-3, dir=/apps/kafka/data] Found a corrupted index file corresponding to log file /apps/kafka/data/SecondTopic-3/0494.log due to Corrupt time index found, time index file (/apps/kafka/data/SecondTopic-3/0494.timeindex) has non-zero size but the last timestamp is 0 which is less than the first timestamp 1553720469480}, recovering segment and rebuilding index files... (kafka.log.Log) }} {code} > Kafka Errors after version upgrade from 0.10.2.2 to 2.1.1 > -- > > Key: KAFKA-8173 > URL: https://issues.apache.org/jira/browse/KAFKA-8173 > Project: Kafka > Issue Type: Improvement > Components: offset manager >Reporter: Amit Anand >Priority: Major > > After Kafka version upgrade from 0.10.2.2 to 2.1.1 Warnings starts coming for > all the topics "due to Corrupt time index found, time index file". 
> {code:java} > [2019-03-28 17:23:55.877+] WARN [Log partition=FirstTopic-6, > dir=/apps/kafka/data] Found a corrupted index file corresponding to log file > /apps/kafka/data/FirstTopic-6/0494.log due to Corrupt time > index found, time index file > (/apps/kafka/data/FirstTopic-6/0494.timeindex) has non-zero > size but the last timestamp is 0 which is less than the first timestamp > 1553720469480}, recovering segment and rebuilding index files... > (kafka.log.Log) }} > {{[2019-03-28 17:23:55.877+] WARN [Log partition=NewTopic-3, > dir=/apps/kafka/data] Found a corrupted index file corresponding to log file > /apps/kafka/data/NewTopic-3/0494.log due to Corrupt time > index found, time index file > (/apps/kafka/data/NewTopic-3/0494.timeindex) has non-zero > size but the last timestamp is 0 which is less than the first timestamp > 1553720469480}, recovering segment and rebuilding index files... >
[jira] [Created] (KAFKA-8173) Kafka Errors after version upgrade from 0.10.2.2 to 2.1.1
Amit Anand created KAFKA-8173: - Summary: Kafka Errors after version upgrade from 0.10.2.2 to 2.1.1 Key: KAFKA-8173 URL: https://issues.apache.org/jira/browse/KAFKA-8173 Project: Kafka Issue Type: Improvement Components: offset manager Reporter: Amit Anand After Kafka version upgrade from 0.10.2.2 to 2.1.1 Warnings starts coming for all the topics "due to Corrupt time index found, time index file". {code:java} {{[2019-03-28 17:23:55.877+] WARN [Log partition=FirstTopic-6, dir=/apps/kafka/data] Found a corrupted index file corresponding to log file /apps/kafka/data/FirstTopic-6/0494.log due to Corrupt time index found, time index file (/apps/kafka/data/FirstTopic-6/0494.timeindex) has non-zero size but the last timestamp is 0 which is less than the first timestamp 1553720469480}, recovering segment and rebuilding index files... (kafka.log.Log) }} {{[2019-03-28 17:23:55.877+] WARN [Log partition=NewTopic-3, dir=/apps/kafka/data] Found a corrupted index file corresponding to log file /apps/kafka/data/NewTopic-3/0494.log due to Corrupt time index found, time index file (/apps/kafka/data/NewTopic-3/0494.timeindex) has non-zero size but the last timestamp is 0 which is less than the first timestamp 1553720469480}, recovering segment and rebuilding index files... (kafka.log.Log) [2019-03-28 17:23:55.877+] WARN [Log partition=SecondTopic-3, dir=/apps/kafka/data] Found a corrupted index file corresponding to log file /apps/kafka/data/SecondTopic-3/0494.log due to Corrupt time index found, time index file (/apps/kafka/data/SecondTopic-3/0494.timeindex) has non-zero size but the last timestamp is 0 which is less than the first timestamp 1553720469480}, recovering segment and rebuilding index files... (kafka.log.Log) }} {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7502) Cleanup KTable materialization logic in a single place
[ https://issues.apache.org/jira/browse/KAFKA-7502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804297#comment-16804297 ]

ASF GitHub Bot commented on KAFKA-7502:

guozhangwang commented on pull request #6453: KAFKA-7502: Cleanup KTable materialization logic in a single place (filter)
URL: https://github.com/apache/kafka/pull/6453

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Cleanup KTable materialization logic in a single place
>
> Key: KAFKA-7502
> URL: https://issues.apache.org/jira/browse/KAFKA-7502
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Lee Dongjin
> Priority: Major
>
> Today, since we pre-create all the `KTableXXX` operators along with the logical node, we are effectively duplicating the logic that determines whether the resulting KTable should be materialized. More specifically, the materialization principle today is:
> 1) If users specify Materialized in the DSL and it contains a queryable name, we always materialize.
> 2) If users specify Materialized in the DSL but it does not contain a queryable name, or if users do not specify a Materialized object at all, Streams may choose to materialize or not. In either case, even if the KTable is materialized it will not be queryable, since there is no queryable name (i.e. only storeName is not null, but queryableName is null):
> 2.a) If the resulting KTable is from an aggregation, we always materialize, since it is needed for storing the aggregation (i.e. we use the MaterializedInternal constructor with nameProvider != null).
> 2.b) If the resulting KTable is from a source topic, we delay the materialization until the downstream operator requires this KTable to be materialized or send-old-values (see `KTableSourceNode` and `KTableSource`).
> 2.c) If the resulting KTable is from a join, we always materialize if users create a Materialized object, even without a queryable name. This could be optimized similarly to 2.b), but that is orthogonal to this ticket (see `KTableImpl#buildJoin`, where we always use the constructor with nameProvider != null).
> 2.d) If the resulting KTable is from a stateless operation like filter / mapValues, we never materialize.
>
> Now, in all of these cases, we have a logical node like "KTableKTableJoinNode" as well as a physical node like `ProcessorNode`. Ideally we should always create the logical plan (i.e. the StreamsGraph), then optimize it if necessary, and then generate the physical plan (i.e. the Topology). Today, however, we create some physical nodes beforehand, and the above logic is hence duplicated in the creation of both physical nodes and logical nodes. For example, in `KTableKTableJoinNode` we check if Materialized is null for adding a state store, and in `KTableImpl#doJoin` we check if materialized is specified (case 2.c above).
> Another example is TableProcessorNode, which is used for 2.d) above: it includes the logic, while its caller (`KTableImpl#doFilter`, for example) also contains the logic when deciding whether to pass the `queryableName` parameter to `KTableProcessorSupplier`.
> This is bug-prone, since we may update the logic in one class but forget to update the other class.
> --
> What we want is a cleaner code path similar to what we have for 2.b), such that when creating the logical nodes we keep track of whether 1) materialized is specified, and 2) a queryable name is provided. During the optimization phase, we may change the inner physical ProcessorBuilder's parameters, such as the queryable name, and then when it is time to generate the physical node, we can just blindly take the parameters and go for it.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
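The materialization rules listed in the KAFKA-7502 description can be restated as a single decision function. The following is only a minimal sketch, not Kafka Streams code: `MaterializationSketch`, `Origin`, `Decision`, and `decide` are hypothetical names invented for illustration. The outcome for a join without any Materialized object is an assumption, since the ticket only states what happens when one is supplied.

```java
/**
 * Hypothetical restatement of the rules in KAFKA-7502.
 * None of these types exist in Kafka Streams.
 */
final class MaterializationSketch {

    /** Where the resulting KTable comes from (cases 2.a to 2.d in the ticket). */
    enum Origin { AGGREGATION, SOURCE_TOPIC, JOIN, STATELESS }

    /** Possible outcomes of the decision. */
    enum Decision {
        MATERIALIZE_QUERYABLE,  // case 1: store exists and can be queried
        MATERIALIZE_INTERNAL,   // store exists, but has no queryable name
        DEFER_TO_DOWNSTREAM,    // case 2.b: decided later by downstream operators
        DO_NOT_MATERIALIZE      // case 2.d: no store at all
    }

    static Decision decide(Origin origin, boolean materializedSpecified, String queryableName) {
        // Case 1: Materialized with a queryable name always wins.
        if (materializedSpecified && queryableName != null) {
            return Decision.MATERIALIZE_QUERYABLE;
        }
        switch (origin) {
            case AGGREGATION:
                // Case 2.a: the store is needed to hold the aggregate.
                return Decision.MATERIALIZE_INTERNAL;
            case SOURCE_TOPIC:
                // Case 2.b: wait until a downstream operator needs the store.
                return Decision.DEFER_TO_DOWNSTREAM;
            case JOIN:
                // Case 2.c: materialize whenever a Materialized object was given.
                // Assumption: without one, no store is created.
                return materializedSpecified
                        ? Decision.MATERIALIZE_INTERNAL
                        : Decision.DO_NOT_MATERIALIZE;
            default:
                // Case 2.d: stateless operations such as filter / mapValues.
                return Decision.DO_NOT_MATERIALIZE;
        }
    }
}
```

Keeping the decision in one place is the point the ticket argues for: the code that builds logical nodes and the code that builds physical nodes would both read the same result instead of re-deriving it.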
[jira] [Updated] (KAFKA-8172) FileSystemException: The process cannot access the file because it is being used by another process
[ https://issues.apache.org/jira/browse/KAFKA-8172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bharat Kondeti updated KAFKA-8172:

Description:
Fix to close file handles before renaming files / directories, and reopen them if required.

The file renaming scenarios are:
* Files are renamed to .deleted so they can be deleted
* .cleaned files are renamed to .swap as part of the Log.replaceSegments flow
* .swap files are renamed to the original files as part of the Log.replaceSegments flow

The folder renaming scenarios are:
* When a topic is marked for deletion, its folder is renamed
* As part of replacing current logs with future logs in LogManager

In the above scenarios, if file handles are not closed, we get a file access violation exception.

The idea is to close the logs and file segments before doing a rename, and open them back up if required.

*Segments Deletion Scenario*

[2018-06-01 17:00:07,566] ERROR Error while deleting segments for test4-1 in dir D:\data\Kafka\kafka-logs (kafka.server.LogDirFailureChannel)
java.nio.file.FileSystemException: D:\data\Kafka\kafka-logs\test4-1\.log -> D:\data\Kafka\kafka-logs\test4-1\.log.deleted: The process cannot access the file because it is being used by another process.
 at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)
 at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
 at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:387)
 at sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
 at java.nio.file.Files.move(Files.java:1395)
 at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:697)
 at org.apache.kafka.common.record.FileRecords.renameTo(FileRecords.java:212)
 at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:415)
 at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:1601)
 at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:1588)
 at kafka.log.Log$$anonfun$deleteSegments$1$$anonfun$apply$mcI$sp$1.apply(Log.scala:1170)
 at kafka.log.Log$$anonfun$deleteSegments$1$$anonfun$apply$mcI$sp$1.apply(Log.scala:1170)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at kafka.log.Log$$anonfun$deleteSegments$1.apply$mcI$sp(Log.scala:1170)
 at kafka.log.Log$$anonfun$deleteSegments$1.apply(Log.scala:1161)
 at kafka.log.Log$$anonfun$deleteSegments$1.apply(Log.scala:1161)
 at kafka.log.Log.maybeHandleIOException(Log.scala:1678)
 at kafka.log.Log.deleteSegments(Log.scala:1161)
 at kafka.log.Log.deleteOldSegments(Log.scala:1156)
 at kafka.log.Log.deleteRetentionMsBreachedSegments(Log.scala:1228)
 at kafka.log.Log.deleteOldSegments(Log.scala:1222)
 at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:854)
 at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:852)
 at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
 at scala.collection.immutable.List.foreach(List.scala:392)
 at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
 at kafka.log.LogManager.cleanupLogs(LogManager.scala:852)
 at kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:385)
 at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
 at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
 Suppressed: java.nio.file.FileSystemException: D:\data\Kafka\kafka-logs\test4-1\.log -> D:\data\Kafka\kafka-logs\test4-1\.log.deleted: The process cannot access the file because it is being used by another process.
 at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)
 at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
 at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:301)
 at sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
 at java.nio.file.Files.move(Files.java:1395)
 at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:694)
 ... 32 more

*Topic deletion scenario*

[2018-06-07 15:05:17,805] ERROR Error while renaming dir for test5-1 in log dir D:\data\Kafka\kafka-logs
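The ordering proposed in the KAFKA-8172 description (close the handle, rename, reopen only if needed) can be sketched with plain `java.nio`. This is not the attached patch and not Kafka code: `RenamableSegment` and its methods are hypothetical names, and the fallback from an atomic move to a regular move is an illustrative assumption, loosely in the spirit of the `atomicMoveWithFallback` frame visible in the stack trace.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

/** Hypothetical file wrapper that releases its handle before every rename. */
final class RenamableSegment implements AutoCloseable {
    private Path path;
    private FileChannel channel;

    RenamableSegment(Path path) throws IOException {
        this.path = path;
        this.channel = open(path);
    }

    private static FileChannel open(Path p) throws IOException {
        return FileChannel.open(p, StandardOpenOption.CREATE,
                StandardOpenOption.READ, StandardOpenOption.WRITE);
    }

    /** Appends UTF-8 text at the end of the file. */
    void append(String text) throws IOException {
        ByteBuffer buf = ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
        channel.position(channel.size());
        while (buf.hasRemaining()) {
            channel.write(buf);
        }
    }

    /** Flushes and closes the handle, renames the file, then reopens it if requested. */
    void renameTo(Path target, boolean reopen) throws IOException {
        channel.force(true);
        channel.close(); // release the handle before the rename
        try {
            Files.move(path, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException e) {
            // Assumed fallback for file systems without atomic moves.
            Files.move(path, target, StandardCopyOption.REPLACE_EXISTING);
        }
        path = target;
        channel = reopen ? open(target) : null;
    }

    Path path() {
        return path;
    }

    boolean isOpen() {
        return channel != null && channel.isOpen();
    }

    @Override
    public void close() throws IOException {
        if (channel != null) {
            channel.close();
        }
    }

    /** Usage example: write a record, mark the file as deleted, return the new file name. */
    static String demo() {
        try {
            Path dir = Files.createTempDirectory("segment-sketch");
            try (RenamableSegment segment = new RenamableSegment(dir.resolve("00000000.log"))) {
                segment.append("record");
                segment.renameTo(dir.resolve("00000000.log.deleted"), false);
                return segment.path().getFileName().toString();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Passing `reopen = false` matches the `.deleted` case, where the file is about to be removed anyway; the `.swap` to original rename would pass `true` so that reads can continue. Note that if the move itself fails, this sketch leaves the handle closed, which a real implementation would need to handle.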
[jira] [Commented] (KAFKA-8172) FileSystemException: The process cannot access the file because it is being used by another process
[ https://issues.apache.org/jira/browse/KAFKA-8172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804243#comment-16804243 ] Bharat Kondeti commented on KAFKA-8172: --- Pull request link > FileSystemException: The process cannot access the file because it is being > used by another process > --- > > Key: KAFKA-8172 > URL: https://issues.apache.org/jira/browse/KAFKA-8172 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 1.1.1, 2.2.0, 2.1.1 > Environment: Windows >Reporter: Bharat Kondeti >Priority: Major > Fix For: 1.1.1, 2.2.0, 2.1.1 > > Attachments: > 0001-Fix-to-close-the-handlers-before-renaming-files-and-.patch > > > Fix to close file handlers before renaming files / directories and open them > back if required > Following are the file renaming scenarios: > * Files are renamed to .deleted so they can be deleted > * .cleaned files are renamed to .swap as part of Log.replaceSegments flow > * .swap files are renamed to original files as part of Log.replaceSegments > flow > Following are the folder renaming scenarios: > * When a topic is marked for deletion, folder is renamed > * As part of replacing current logs with future logs in LogManager > In above scenarios, if file handles are not closed, we get file access > violation exception > Idea is to close the logs and file segments before doing a rename and open > them back up if required. > *Segments Deletion Scenario* > [2018-06-01 17:00:07,566] ERROR Error while deleting segments for test4-1 in > dir D:\data\Kafka\kafka-logs (kafka.server.LogDirFailureChannel) > java.nio.file.FileSystemException: > D:\data\Kafka\kafka-logs\test4-1\.log -> > D:\data\Kafka\kafka-logs\test4-1\.log.deleted: The > process cannot access the file because it is being used by another process. 
[jira] [Commented] (KAFKA-8172) FileSystemException: The process cannot access the file because it is being used by another process
[ https://issues.apache.org/jira/browse/KAFKA-8172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804235#comment-16804235 ] Bharat Kondeti commented on KAFKA-8172: --- Pull request > FileSystemException: The process cannot access the file because it is being > used by another process > --- > > Key: KAFKA-8172 > URL: https://issues.apache.org/jira/browse/KAFKA-8172 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 1.1.1, 2.2.0, 2.1.1 > Environment: Windows >Reporter: Bharat Kondeti >Priority: Major > Fix For: 1.1.1, 2.2.0, 2.1.1 > > > Fix to close file handlers before renaming files / directories and open them > back if required > Following are the file renaming scenarios: > * Files are renamed to .deleted so they can be deleted > * .cleaned files are renamed to .swap as part of Log.replaceSegments flow > * .swap files are renamed to original files as part of Log.replaceSegments > flow > Following are the folder renaming scenarios: > * When a topic is marked for deletion, folder is renamed > * As part of replacing current logs with future logs in LogManager > In above scenarios, if file handles are not closed, we get file access > violation exception > Idea is to close the logs and file segments before doing a rename and open > them back up if required. > *Segments Deletion Scenario* > [2018-06-01 17:00:07,566] ERROR Error while deleting segments for test4-1 in > dir D:\data\Kafka\kafka-logs (kafka.server.LogDirFailureChannel) > java.nio.file.FileSystemException: > D:\data\Kafka\kafka-logs\test4-1\.log -> > D:\data\Kafka\kafka-logs\test4-1\.log.deleted: The > process cannot access the file because it is being used by another process. 
[jira] [Created] (KAFKA-8172) FileSystemException: The process cannot access the file because it is being used by another process
Bharat Kondeti created KAFKA-8172: - Summary: FileSystemException: The process cannot access the file because it is being used by another process Key: KAFKA-8172 URL: https://issues.apache.org/jira/browse/KAFKA-8172 Project: Kafka Issue Type: Bug Components: core Affects Versions: 2.1.1, 2.2.0, 1.1.1 Environment: Windows Reporter: Bharat Kondeti Fix For: 2.1.1, 2.2.0, 1.1.1 Fix to close file handlers before renaming files / directories and open them back if required Following are the file renaming scenarios: * Files are renamed to .deleted so they can be deleted * .cleaned files are renamed to .swap as part of Log.replaceSegments flow * .swap files are renamed to original files as part of Log.replaceSegments flow Following are the folder renaming scenarios: * When a topic is marked for deletion, folder is renamed * As part of replacing current logs with future logs in LogManager In above scenarios, if file handles are not closed, we get file access violation exception Idea is to close the logs and file segments before doing a rename and open them back up if required. *Segments Deletion Scenario* [2018-06-01 17:00:07,566] ERROR Error while deleting segments for test4-1 in dir D:\data\Kafka\kafka-logs (kafka.server.LogDirFailureChannel) java.nio.file.FileSystemException: D:\data\Kafka\kafka-logs\test4-1\.log -> D:\data\Kafka\kafka-logs\test4-1\.log.deleted: The process cannot access the file because it is being used by another process. 
[jira] [Commented] (KAFKA-6983) Error while deleting segments - The process cannot access the file because it is being used by another process
[ https://issues.apache.org/jira/browse/KAFKA-6983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804224#comment-16804224 ]

ASF GitHub Bot commented on KAFKA-6983:

kondetibharat commented on pull request #6517: KAFKA-6983: Fix to close file handlers before renaming files/directories
URL: https://github.com/apache/kafka/pull/6517

Fix to close file handles before renaming files / directories, and reopen them if required.

The file renaming scenarios are:
- Files are renamed to .deleted so they can be deleted
- .cleaned files are renamed to .swap as part of the Log.replaceSegments flow
- .swap files are renamed to the original files as part of the Log.replaceSegments flow

The folder renaming scenarios are:
- When a topic is marked for deletion, its folder is renamed
- As part of replacing current logs with future logs in LogManager

In the above scenarios, if file handles are not closed, we get a file access violation exception.

The idea is to close the logs and file segments before doing a rename, and open them back up if required.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

> Error while deleting segments - The process cannot access the file because it is being used by another process
>
> Key: KAFKA-6983
> URL: https://issues.apache.org/jira/browse/KAFKA-6983
> Project: Kafka
> Issue Type: Bug
> Components: log
> Affects Versions: 1.1.0
> Environment: Windows 10
> Reporter: wade wu
> Priority: Major
>
> ..
> [2018-06-01 17:00:07,566] ERROR Error while deleting segments for test4-1 in > dir D:\data\Kafka\kafka-logs (kafka.server.LogDirFailureChannel) > java.nio.file.FileSystemException: > D:\data\Kafka\kafka-logs\test4-1\.log -> > D:\data\Kafka\kafka-logs\test4-1\.log.deleted: The > process cannot access the file because it is being used by another process. > at > sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86) > at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97) > at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:387) > at > sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287) > at java.nio.file.Files.move(Files.java:1395) > at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:697) > at org.apache.kafka.common.record.FileRecords.renameTo(FileRecords.java:212) > at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:415) > at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:1601) > at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:1588) > at > kafka.log.Log$$anonfun$deleteSegments$1$$anonfun$apply$mcI$sp$1.apply(Log.scala:1170) > at > kafka.log.Log$$anonfun$deleteSegments$1$$anonfun$apply$mcI$sp$1.apply(Log.scala:1170) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at kafka.log.Log$$anonfun$deleteSegments$1.apply$mcI$sp(Log.scala:1170) > at kafka.log.Log$$anonfun$deleteSegments$1.apply(Log.scala:1161) > at kafka.log.Log$$anonfun$deleteSegments$1.apply(Log.scala:1161) > at kafka.log.Log.maybeHandleIOException(Log.scala:1678) > at kafka.log.Log.deleteSegments(Log.scala:1161) > at kafka.log.Log.deleteOldSegments(Log.scala:1156) > at kafka.log.Log.deleteRetentionMsBreachedSegments(Log.scala:1228) > at kafka.log.Log.deleteOldSegments(Log.scala:1222) > at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:854) > at 
kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:852) > at > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) > at scala.collection.immutable.List.foreach(List.scala:392) > at > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) > at kafka.log.LogManager.cleanupLogs(LogManager.scala:852) > at kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:385) > at > kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110) > at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62) > at
[jira] [Commented] (KAFKA-7895) Ktable supress operator emitting more than one record for the same key per window
[ https://issues.apache.org/jira/browse/KAFKA-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804179#comment-16804179 ]

John Roesler commented on KAFKA-7895:

Here's the repro I put together. It'll take me a little while to debug it, but I wanted to share my approach. [https://github.com/vvcephei/suppress-demo]
-John

> Ktable supress operator emitting more than one record for the same key per window
>
> Key: KAFKA-7895
> URL: https://issues.apache.org/jira/browse/KAFKA-7895
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.1.0, 2.1.1
> Reporter: prasanthi
> Assignee: John Roesler
> Priority: Major
> Fix For: 2.2.0, 2.1.2
>
> Hi, we are using kstreams to get the aggregated counts per vendor (key) within a specified window.
> Here's how we configured the suppress operator to emit one final record per key/window.
> {code:java}
> KTable<Windowed<Integer>, Long> windowedCount = groupedStream
>     .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L)))
>     .count(Materialized.with(Serdes.Integer(), Serdes.Long()))
>     .suppress(Suppressed.untilWindowCloses(unbounded()));
> {code}
> But we are getting more than one record for the same key/window, as shown below.
> {code:java}
> [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1039
> [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1162
> [KTABLE-TOSTREAM-10]: [9@154906704/154906710], 6584
> [KTABLE-TOSTREAM-10]: [88@154906704/154906710], 107
> [KTABLE-TOSTREAM-10]: [108@154906704/154906710], 315
> [KTABLE-TOSTREAM-10]: [119@154906704/154906710], 119
> [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 746
> [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 809
> {code}
> Could you please take a look?
> Thanks
>
> Added by John:
> Acceptance Criteria:
> * add suppress to system tests, such that it's exercised with crash/shutdown recovery, rebalance, etc.
> ** [https://github.com/apache/kafka/pull/6278]
> * make sure that there's some system test coverage with caching disabled.
> ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7943
> * test with tighter time bounds with windows of say 30 seconds and use system time without adding any extra time for verification
> ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7944

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
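For readers following the KAFKA-7895 report, the arithmetic behind "one final record per key/window" can be sketched without Kafka on the classpath. The ticket configures one-minute windows with a 5 ms grace period. The sketch below assumes epoch-aligned tumbling windows with an exclusive end, and assumes a window counts as closed once stream time reaches window end plus grace; that boundary is my reading of Kafka Streams, not something stated in the thread. `WindowCloseSketch` and its methods are hypothetical names, and the sketch does not explain the duplicate emissions being investigated.

```java
import java.time.Duration;

/** Hypothetical helper showing when a tumbling window would be considered closed. */
final class WindowCloseSketch {

    /** Start (inclusive) of the epoch-aligned window containing the timestamp. */
    static long windowStart(long timestampMs, Duration size) {
        return timestampMs - Math.floorMod(timestampMs, size.toMillis());
    }

    /** End (exclusive) of the window containing the timestamp. */
    static long windowEnd(long timestampMs, Duration size) {
        return windowStart(timestampMs, size) + size.toMillis();
    }

    /** Earliest stream time at which the window no longer accepts records (assumed rule). */
    static long closeTime(long timestampMs, Duration size, Duration grace) {
        return windowEnd(timestampMs, size) + grace.toMillis();
    }

    /** True once stream time has reached the close time of the record's window. */
    static boolean isClosed(long streamTimeMs, long recordTimestampMs, Duration size, Duration grace) {
        return streamTimeMs >= closeTime(recordTimestampMs, size, grace);
    }
}
```

With the settings from the ticket, a record stamped 125000 ms falls into the window [120000, 180000), so under these assumptions a single final count for that key and window would be expected once stream time reaches 180005 ms.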
[jira] [Comment Edited] (KAFKA-7895) Ktable supress operator emitting more than one record for the same key per window
[ https://issues.apache.org/jira/browse/KAFKA-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804035#comment-16804035 ] John Roesler edited comment on KAFKA-7895 at 3/28/19 6:01 PM: -- EDIT: actually, nevermind. After replicating your application, I was able to reproduce what you're seeing. I'll let you know when I figure out what's going on. Thanks! Also, can you elaborate on how `TimestampedValueTimestampExtractor` behaves? Thanks, -John was (Author: vvcephei): Thanks! Also, can you elaborate on how `TimestampedValueTimestampExtractor` behaves? Thanks, -John > Ktable supress operator emitting more than one record for the same key per > window > - > > Key: KAFKA-7895 > URL: https://issues.apache.org/jira/browse/KAFKA-7895 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.0, 2.1.1 >Reporter: prasanthi >Assignee: John Roesler >Priority: Major > Fix For: 2.2.0, 2.1.2 > > > Hi, We are using kstreams to get the aggregated counts per vendor(key) within > a specified window. > Here's how we configured the suppress operator to emit one final record per > key/window. > {code:java} > KTable, Long> windowedCount = groupedStream > .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L))) > .count(Materialized.with(Serdes.Integer(),Serdes.Long())) > .suppress(Suppressed.untilWindowCloses(unbounded())); > {code} > But we are getting more than one record for the same key/window as shown > below. > {code:java} > [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1039 > [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1162 > [KTABLE-TOSTREAM-10]: [9@154906704/154906710], 6584 > [KTABLE-TOSTREAM-10]: [88@154906704/154906710], 107 > [KTABLE-TOSTREAM-10]: [108@154906704/154906710], 315 > [KTABLE-TOSTREAM-10]: [119@154906704/154906710], 119 > [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 746 > [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 809{code} > Could you please take a look? 
> Thanks > > > Added by John: > Acceptance Criteria: > * add suppress to system tests, such that it's exercised with crash/shutdown > recovery, rebalance, etc. > ** [https://github.com/apache/kafka/pull/6278] > * make sure that there's some system test coverage with caching disabled. > ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7943 > * test with tighter time bounds with windows of say 30 seconds and use > system time without adding any extra time for verification > ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7944 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
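One quick way to confirm the symptom in the report is to group the printed output by key and window and list every pair that appears more than once. The following is a minimal sketch under stated assumptions: `SuppressOutputCheck` and `duplicates` are hypothetical names, not part of Kafka Streams, and the regular expression assumes the printed format shown in the report (`[label]: [key@start/end], value`).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Kafka: groups printed suppress() output by
// key and window so that repeated emissions stand out.
class SuppressOutputCheck {
    // Assumed line format: "[KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1039".
    private static final Pattern LINE =
        Pattern.compile("^\\[[^\\]]+\\]:\\s*\\[(\\S+?)@(\\d+)/(\\d+)\\],\\s*(\\S+)$");

    // Returns only the key/window pairs that were printed more than once,
    // mapped to their values in order of appearance.
    static Map<String, List<String>> duplicates(List<String> lines) {
        Map<String, List<String>> byWindow = new LinkedHashMap<>();
        for (String line : lines) {
            Matcher m = LINE.matcher(line.trim());
            if (!m.matches()) {
                continue; // skip anything that is not a printed record
            }
            String keyAndWindow = m.group(1) + "@" + m.group(2) + "/" + m.group(3);
            byWindow.computeIfAbsent(keyAndWindow, k -> new ArrayList<>()).add(m.group(4));
        }
        // Keep only the pairs with two or more emissions.
        byWindow.values().removeIf(values -> values.size() < 2);
        return byWindow;
    }

    public static void main(String[] args) {
        // Sample lines copied from the report above.
        List<String> sample = List.of(
            "[KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1039",
            "[KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1162",
            "[KTABLE-TOSTREAM-10]: [9@154906704/154906710], 6584",
            "[KTABLE-TOSTREAM-10]: [154@154906704/154906710], 746",
            "[KTABLE-TOSTREAM-10]: [154@154906704/154906710], 809");
        duplicates(sample).forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}
```

With final-result suppression working as intended, the returned map should be empty. Any entry points at a key/window pair that was emitted more than once.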
[jira] [Commented] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
[ https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804148#comment-16804148 ] Guozhang Wang commented on KAFKA-7965: -- [~enether] Saw another failure: https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/3484 ``` kafka.api.ConsumerBounceTest > testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup FAILED 08:14:24 java.util.concurrent.ExecutionException: Boxed Error 08:14:24 08:14:24 Caused by: 08:14:24 java.lang.AssertionError: Received 0, expected at least 68 08:14:24 at org.junit.Assert.fail(Assert.java:89) 08:14:24 at org.junit.Assert.assertTrue(Assert.java:42) 08:14:24 at kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:563) 08:14:24 at kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$5(ConsumerBounceTest.scala:347) ``` > Flaky Test > ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup > > > Key: KAFKA-7965 > URL: https://issues.apache.org/jira/browse/KAFKA-7965 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Affects Versions: 2.2.0, 2.3.0 >Reporter: Matthias J. Sax >Assignee: Stanislav Kozlovski >Priority: Critical > Labels: flaky-test > Fix For: 2.3.0, 2.2.1 > > > To get stable nightly builds for `2.2` release, I create tickets for all > observed test failures. 
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/21/] > {quote}java.lang.AssertionError: Received 0, expected at least 68 at > org.junit.Assert.fail(Assert.java:88) at > org.junit.Assert.assertTrue(Assert.java:41) at > kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:557) > at > kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:320) > at > kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$adapted(ConsumerBounceTest.scala:319) > at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at > kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319){quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8171) callback needs to be null when addStopReplicaRequestForBrokers when replica state transits to offline
[ https://issues.apache.org/jira/browse/KAFKA-8171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] kehu updated KAFKA-8171: Description: Problem: In ControllerChannelManager.sendRequestsToBrokers, the controller tries to group STOP_REPLICA requests based on the deletePartition flag and the callback: val (replicasToGroup, replicasToNotGroup) = replicaInfoList.partition(r => !r.deletePartition && r.callback == null) When both conditions are met, the controller is expected to send only one request to the destination broker. However, when the requests are added in ReplicaStateMachine, a non-null callback (_,_)=>() is passed in. Therefore, replicasToGroup is always empty, and the controller always first sends an empty request followed by #partitions requests. Fix: set the callback to null in addStopReplicaRequestForBrokers when the replica state transitions to offline. A PR has been created: https://github.com/apache/kafka/pull/6515 was: Problem: In ControllerChannelManager.sendRequestsToBrokers, the controller tries to group STOP_REPLICA requests based on the deletePartition flag and the callback: val (replicasToGroup, replicasToNotGroup) = replicaInfoList.partition(r => !r.deletePartition && r.callback == null) When both conditions are met, the controller is expected to send only one request to the destination broker. However, when the requests are added in ReplicaStateMachine, a non-null callback (_,_)=>() is passed in. Therefore, replicasToGroup is always empty, and the controller always first sends an empty request followed by #partitions requests. Fix: set the callback to null in addStopReplicaRequestForBrokers when the replica state transitions to offline. 
> callback needs to be null when addStopReplicaRequestForBrokers when replica > state transits to offline > - > > Key: KAFKA-8171 > URL: https://issues.apache.org/jira/browse/KAFKA-8171 > Project: Kafka > Issue Type: Bug > Components: controller >Reporter: kehu >Priority: Major > > Problem: > In ControllerChannelManager.sendRequestsToBrokers, the controller tries to group > STOP_REPLICA requests based on the deletePartition flag and the callback: > val (replicasToGroup, replicasToNotGroup) = replicaInfoList.partition(r => > !r.deletePartition && r.callback == null) > When both conditions are met, the controller is expected to send only one request to > the destination broker. However, when the requests are added in ReplicaStateMachine, > a non-null callback (_,_)=>() is passed in. Therefore, replicasToGroup is > always empty, and the controller always first sends an empty request followed > by #partitions requests. > > Fix: set the callback to null in addStopReplicaRequestForBrokers when the replica > state transitions to offline. A PR has been created: > https://github.com/apache/kafka/pull/6515 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8171) callback needs to be null when addStopReplicaRequestForBrokers when replica state transits to offline
kehu created KAFKA-8171: --- Summary: callback needs to be null when addStopReplicaRequestForBrokers when replica state transits to offline Key: KAFKA-8171 URL: https://issues.apache.org/jira/browse/KAFKA-8171 Project: Kafka Issue Type: Bug Components: controller Reporter: kehu Problem: In ControllerChannelManager.sendRequestsToBrokers, the controller tries to group STOP_REPLICA requests based on the deletePartition flag and the callback: val (replicasToGroup, replicasToNotGroup) = replicaInfoList.partition(r => !r.deletePartition && r.callback == null) When both conditions are met, the controller is expected to send only one request to the destination broker. However, when the requests are added in ReplicaStateMachine, a non-null callback (_,_)=>() is passed in. Therefore, replicasToGroup is always empty, and the controller always first sends an empty request followed by #partitions requests. Fix: set the callback to null in addStopReplicaRequestForBrokers when the replica state transitions to offline. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
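The effect of the predicate quoted above can be reproduced outside Kafka. The following is a minimal sketch in Java rather than the controller's Scala; `StopReplicaGrouping`, `StopReplicaInfo` and `split` are illustrative names, not Kafka's actual types. It only shows that a no-op callback is still non-null, so no request ever lands in the groupable half.

```java
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

// Simplified stand-in for the controller's per-replica stop request; the class
// and field names are illustrative, not Kafka's actual types.
class StopReplicaGrouping {
    static final class StopReplicaInfo {
        final String partition;
        final boolean deletePartition;
        final BiConsumer<String, String> callback; // may be null

        StopReplicaInfo(String partition, boolean deletePartition,
                        BiConsumer<String, String> callback) {
            this.partition = partition;
            this.deletePartition = deletePartition;
            this.callback = callback;
        }
    }

    // Mirrors the quoted predicate: only requests without deletion and without
    // a callback fall into the "true" (groupable) half.
    static Map<Boolean, List<StopReplicaInfo>> split(List<StopReplicaInfo> infos) {
        return infos.stream()
            .collect(Collectors.partitioningBy(r -> !r.deletePartition && r.callback == null));
    }

    public static void main(String[] args) {
        BiConsumer<String, String> noOp = (x, y) -> { }; // does nothing, but is not null
        List<StopReplicaInfo> withNoOp = List.of(
            new StopReplicaInfo("t-0", false, noOp),
            new StopReplicaInfo("t-1", false, noOp));
        List<StopReplicaInfo> withNull = List.of(
            new StopReplicaInfo("t-0", false, null),
            new StopReplicaInfo("t-1", false, null));
        System.out.println("groupable with no-op callback: " + split(withNoOp).get(true).size());
        System.out.println("groupable with null callback: " + split(withNull).get(true).size());
    }
}
```

In this model, passing `null` instead of the no-op callback is what moves the requests into the groupable half, which matches the fix proposed in the ticket.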
[jira] [Commented] (KAFKA-7895) Ktable supress operator emitting more than one record for the same key per window
[ https://issues.apache.org/jira/browse/KAFKA-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804035#comment-16804035 ] John Roesler commented on KAFKA-7895: - Thanks! Also, can you elaborate on how `TimestampedValueTimestampExtractor` behaves? Thanks, -John > Ktable supress operator emitting more than one record for the same key per > window > - > > Key: KAFKA-7895 > URL: https://issues.apache.org/jira/browse/KAFKA-7895 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.0, 2.1.1 >Reporter: prasanthi >Assignee: John Roesler >Priority: Major > Fix For: 2.2.0, 2.1.2 > > > Hi, We are using kstreams to get the aggregated counts per vendor(key) within > a specified window. > Here's how we configured the suppress operator to emit one final record per > key/window. > {code:java} > KTable, Long> windowedCount = groupedStream > .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L))) > .count(Materialized.with(Serdes.Integer(),Serdes.Long())) > .suppress(Suppressed.untilWindowCloses(unbounded())); > {code} > But we are getting more than one record for the same key/window as shown > below. > {code:java} > [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1039 > [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1162 > [KTABLE-TOSTREAM-10]: [9@154906704/154906710], 6584 > [KTABLE-TOSTREAM-10]: [88@154906704/154906710], 107 > [KTABLE-TOSTREAM-10]: [108@154906704/154906710], 315 > [KTABLE-TOSTREAM-10]: [119@154906704/154906710], 119 > [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 746 > [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 809{code} > Could you please take a look? > Thanks > > > Added by John: > Acceptance Criteria: > * add suppress to system tests, such that it's exercised with crash/shutdown > recovery, rebalance, etc. > ** [https://github.com/apache/kafka/pull/6278] > * make sure that there's some system test coverage with caching disabled. 
> ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7943 > * test with tighter time bounds with windows of say 30 seconds and use > system time without adding any extra time for verification > ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7944 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7981) Add Replica Fetcher and Log Cleaner Count Metrics
[ https://issues.apache.org/jira/browse/KAFKA-7981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804012#comment-16804012 ] ASF GitHub Bot commented on KAFKA-7981: --- viktorsomogyi commented on pull request #6514: KAFKA-7981: Add fetcher and log cleaner thread count metrics URL: https://github.com/apache/kafka/pull/6514 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add Replica Fetcher and Log Cleaner Count Metrics > - > > Key: KAFKA-7981 > URL: https://issues.apache.org/jira/browse/KAFKA-7981 > Project: Kafka > Issue Type: Improvement > Components: metrics >Affects Versions: 2.3.0 >Reporter: Viktor Somogyi-Vass >Assignee: Viktor Somogyi-Vass >Priority: Major > Labels: kip > > In some occasions we detected errors where replica fetcher threads or log > cleaners died because of an unrecoverable error and caused more serious > issues in the brokers (from lagging to offline replicas, filling up disks, > etc.). It would often help if the monitoring systems attached to Kafka could > detect these problems early on as it would allow a prompt response from the > user and the greater possibility of capturing the root cause. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
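As a rough illustration of what a thread count metric could report, the sketch below counts the live threads in the current JVM whose names start with a given prefix. It is not the implementation from the pull request: `ThreadCountProbe` and `liveThreads` are hypothetical names, and the prefix `demo-fetcher-thread` is a placeholder. The actual names of Kafka's replica fetcher and log cleaner threads would need to be checked against the broker version in use.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical probe, not Kafka code: a count that drops below the configured
// number of threads would indicate that a thread has died.
class ThreadCountProbe {
    // Counts live threads in this JVM whose name starts with the given prefix.
    static long liveThreads(String namePrefix) {
        return Thread.getAllStackTraces().keySet().stream()
            .filter(Thread::isAlive)
            .filter(t -> t.getName().startsWith(namePrefix))
            .count();
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch stop = new CountDownLatch(1);
        // Placeholder worker standing in for a fetcher or cleaner thread.
        Thread worker = new Thread(() -> {
            try {
                stop.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "demo-fetcher-thread-0");
        worker.setDaemon(true);
        worker.start();

        System.out.println("while running: " + liveThreads("demo-fetcher-thread"));
        stop.countDown();
        worker.join();
        System.out.println("after exit: " + liveThreads("demo-fetcher-thread"));
    }
}
```

A monitoring system could compare such a count with the expected number of threads and raise an alert when it falls short, which is the early detection the ticket asks for.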
[jira] [Commented] (KAFKA-8106) Remove unnecessary decompression operation when logValidator do validation.
[ https://issues.apache.org/jira/browse/KAFKA-8106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804003#comment-16804003 ] hackerwin7 commented on KAFKA-8106: --- `validateRecord()` is called in every branch of `validateMessagesAndAssignOffsets()`. Any ideas on where to move it so that the broker can skip decompression? > Remove unnecessary decompression operation when logValidator do validation. > > > Key: KAFKA-8106 > URL: https://issues.apache.org/jira/browse/KAFKA-8106 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.1 > Environment: Server : > cpu:2*16 ; > MemTotal : 256G; > Ethernet controller:Intel Corporation 82599ES 10-Gigabit SFI/SFP+ Network > Connection ; > SSD. >Reporter: Flower.min >Assignee: Flower.min >Priority: Major > Labels: performance > > We ran performance tests of Kafka in the specific scenario > described below. We built a Kafka cluster with one broker and created topics > with different numbers of partitions. Then we started many producer processes > to send large numbers of messages to one of the topics in each test run. > *_Specific Scenario_* > > *_1.Main config of Kafka_* > # Main config of Kafka > server: num.network.threads=6; num.io.threads=128; queued.max.requests=500 > # Number of TopicPartition : 50~2000 > # Size of Single Message : 1024B > > *_2.Config of KafkaProducer_* > ||compression.type||linger.ms||batch.size||buffer.memory|| > |lz4|1000ms~5000ms|16KB/10KB/100KB|128MB| > *_3.The best result of performance testing_* > ||Network inflow rate||CPU Used (%)||Disk write speed||Performance of > production|| > |550MB/s~610MB/s|97%~99%|550MB/s~610MB/s|23,000,000 messages/s| > *_4.Phenomenon and my doubt_* > _CPU usage has reached its upper limit, but the server's network > bandwidth has not. 
*We are > unsure what is consuming so much CPU time, and we want to improve > performance and reduce the CPU usage of the Kafka server.*_ > > _*5.Analysis*_ > We analyzed the JFR recording of the Kafka server taken during the performance test. > We found that the hot-spot methods are > *_"java.io.DataInputStream.readFully(byte[],int,int)"_* and > *_"org.apache.kafka.common.record.KafkaLZ4BlockInputStream.read(byte[],int,int)"_*. When > we checked the thread stacks, we also found that most of the CPU was > occupied by many threads that were busy decompressing messages. We then > read the Kafka source code. > LogValidator uses a two-level nested iterator to validate > every record, and each message is decompressed while traversing > every RecordBatch iterator. This decompression consumes CPU and hurts overall > performance. _*When the magic value to use is above 1 and no format conversion or value > overwriting is required for compressed messages, the only purpose of decompressing every message is > to obtain the total size in bytes of each record and the size in bytes of the record > body. This hurts > performance in common usage scenarios.*_ Therefore, we suggest > *_removing the unnecessary decompression operation_* when validating > compressed messages, provided that the magic value to use is above 1 and no format > conversion or value overwriting is required. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
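The cost difference described in the analysis can be illustrated with a toy model. The sketch below is not Kafka's record format or `LogValidator`: `BatchValidationSketch`, its `Batch` type and the GZIP payload (used here because LZ4 is not in the Java standard library) are all assumptions made for illustration. It compares obtaining a per-batch figure by inflating and walking every record with reading the same figure from uncompressed batch metadata.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Toy model, not Kafka's record format: a batch keeps its record count in an
// uncompressed header and its records in a GZIP-compressed payload.
class BatchValidationSketch {
    static final class Batch {
        final int recordCount;   // header field, readable without decompression
        final byte[] compressed; // length-prefixed records, GZIP-compressed

        Batch(int recordCount, byte[] compressed) {
            this.recordCount = recordCount;
            this.compressed = compressed;
        }
    }

    static Batch build(List<byte[]> records) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(new GZIPOutputStream(bytes))) {
            for (byte[] record : records) {
                out.writeInt(record.length);
                out.write(record);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new Batch(records.size(), bytes.toByteArray());
    }

    // Expensive path: inflate the payload and walk every record.
    static int countByDecompressing(Batch batch) {
        int count = 0;
        try (DataInputStream in = new DataInputStream(
                new GZIPInputStream(new ByteArrayInputStream(batch.compressed)))) {
            while (true) {
                int length;
                try {
                    length = in.readInt();
                } catch (EOFException endOfPayload) {
                    break; // no more records
                }
                in.readFully(new byte[length]);
                count++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return count;
    }

    // Cheap path: rely on batch-level metadata only.
    static int countFromHeader(Batch batch) {
        return batch.recordCount;
    }

    public static void main(String[] args) {
        Batch batch = build(List.of("a".getBytes(), "bb".getBytes(), "ccc".getBytes()));
        System.out.println(countByDecompressing(batch) + " == " + countFromHeader(batch));
    }
}
```

Whether Kafka's batch header carries everything the broker needs for validation in the case the ticket describes is exactly the open question here. The sketch only shows why skipping the inflate-and-iterate path saves CPU when it does.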
[jira] [Commented] (KAFKA-8103) Kafka SIGSEGV on kafka-network-thread
[ https://issues.apache.org/jira/browse/KAFKA-8103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16803955#comment-16803955 ] Ismael Juma commented on KAFKA-8103: Looks like a JVM JIT bug. I haven't seen this one before, so I wonder what's special about your environment. > Kafka SIGSEGV on kafka-network-thread > - > > Key: KAFKA-8103 > URL: https://issues.apache.org/jira/browse/KAFKA-8103 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.1 > Environment: OS > Amazon Linux > Kernel > 4.14.97-74.72.amzn1.x86_64 #1 SMP Tue Feb 5 20:59:30 UTC 2019 x86_64 x86_64 > x86_64 GNU/Linux > Java > openjdk version "1.8.0_191" > OpenJDK Runtime Environment (build 1.8.0_191-b12) > OpenJDK 64-Bit Server VM (build 25.191-b12, mixed mode) > AWS Instance Type > c5.4xlarge >Reporter: Sean Humbarger >Priority: Major > Attachments: hs_err_pid4345.log > > > We have a 4 node cluster (6 topics, 6 consumer groups) that is processing > 65,000 messages per second and are seeing SIGSEGV crashes at least once a day > (see attachment). Each broker has six disks attached to it to support the > kafka logs. When the crash occurs, we simply restart kafka and everything > seems fine. We don't see anything out of the ordinary in /var/log/messages > or dmesg when the crashes occur. Thus far, we are unable to predict during > the day when the crash will occur or which node it will occur on. > > The problematic frame is as follows: > {code:java} > # Problematic frame: > # J 8628 C2 > org.apache.kafka.common.metrics.stats.Max.update(Lorg/apache/kafka/common/metrics/stats/SampledStat$Sample;Lorg/apache/kafka/common/metrics/MetricConfig;DJ)V > (13 bytes) @ 0x7ff779f9fca0 [0x7ff779f9fc80+0x20] > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8170) To add kafka data at rest encryption
[ https://issues.apache.org/jira/browse/KAFKA-8170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16803940#comment-16803940 ] Sönke Liebau commented on KAFKA-8170: - Hi [~ashelke], I proposed [KIP-317|https://cwiki.apache.org/confluence/display/KAFKA/KIP-317%3A+Add+transparent+data+encryption+functionality] a while ago, which would probably cover this, though it would do so client-side. To be honest, it has been dormant for a while because of many other things on my end, but I plan to revive it very shortly. > To add kafka data at rest encryption > > > Key: KAFKA-8170 > URL: https://issues.apache.org/jira/browse/KAFKA-8170 > Project: Kafka > Issue Type: New Feature > Components: log >Reporter: Akash >Priority: Minor > Labels: features, security > > Kafka has a mechanism for wire encryption of data. > But the Kafka data at rest, which exists in /-, > is still unencrypted. > These directories contain log files with the actual messages and embedded metadata, > and an unauthorised user can still recover messages from these files. > Adding encryption for this data would be valuable for protecting messages > against disk theft and unauthorised user access on servers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
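To make the client-side idea concrete, here is a minimal sketch of encrypting a message payload before it would be handed to a producer and decrypting it after consumption. It is not the design proposed in KIP-317: `PayloadEncryptionSketch` and its methods are hypothetical names, and key management, which is the hard part in practice, is reduced to an in-memory AES key. Only standard `javax.crypto` calls are used.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

// Hypothetical client-side helper: with this approach the broker's log files
// would only ever contain ciphertext.
class PayloadEncryptionSketch {
    private static final int IV_BYTES = 12;  // commonly used IV length for GCM
    private static final int TAG_BITS = 128; // authentication tag length
    private static final SecureRandom RANDOM = new SecureRandom();

    static SecretKey newKey() {
        try {
            KeyGenerator generator = KeyGenerator.getInstance("AES");
            generator.init(128);
            return generator.generateKey();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Output layout: 12-byte random IV followed by ciphertext and GCM tag.
    static byte[] encrypt(SecretKey key, byte[] plaintext) {
        try {
            byte[] iv = new byte[IV_BYTES];
            RANDOM.nextBytes(iv);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
            byte[] ciphertext = cipher.doFinal(plaintext);
            return ByteBuffer.allocate(iv.length + ciphertext.length)
                .put(iv).put(ciphertext).array();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    static byte[] decrypt(SecretKey key, byte[] message) {
        try {
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.DECRYPT_MODE, key,
                new GCMParameterSpec(TAG_BITS, message, 0, IV_BYTES));
            return cipher.doFinal(message, IV_BYTES, message.length - IV_BYTES);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        SecretKey key = newKey();
        byte[] stored = encrypt(key, "order-42".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(decrypt(key, stored), StandardCharsets.UTF_8));
    }
}
```

In an approach like this the broker never holds the key, so someone with access to the log directories or a stolen disk sees only ciphertext. The trade-off is that broker-side features that need to read record contents no longer can.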
[jira] [Commented] (KAFKA-8103) Kafka SIGSEGV on kafka-network-thread
[ https://issues.apache.org/jira/browse/KAFKA-8103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16803933#comment-16803933 ] Sean Humbarger commented on KAFKA-8103: --- We are still seeing random JVM crashes. We've switched over from OpenJDK to Oracle 1.8.202 and see the same thing: {code} # # A fatal error has been detected by the Java Runtime Environment: # # SIGSEGV (0xb) at pc=0x7f6c9cd85100, pid=4550, tid=0x7f6a64792700 # # JRE version: Java(TM) SE Runtime Environment (8.0_202-b08) (build 1.8.0_202-b08) # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.202-b08 mixed mode linux-amd64 compressed oops) # Problematic frame: # V [libjvm.so+0x2c7100] Handle::Handle(Thread*, oopDesc*)+0x0 # # Failed to write core dump. Core dumps have been disabled. To enable core dumping, try "ulimit -c unlimited" before starting Java again # # If you would like to submit a bug report, please visit: # http://bugreport.java.com/bugreport/crash.jsp # --- T H R E A D --- Current thread (0x7f6c99279000): JavaThread "kafka-request-handler-3" daemon [_thread_in_vm, id=4984, stack(0x7f6a64692000,0x7f6a64793000)] siginfo: si_signo: 11 (SIGSEGV), si_code: 2 (SEGV_ACCERR), si_addr: 0x7f6c9cd85100 Registers: RAX=0x7f6c9da7b2ce, RBX=0x7f6c99279000, RCX=0x0005502d3250, RDX=0x0005502d3250 RSP=0x7f6a647913d8, RBP=0x7f6a64791410, RSI=0x7f6c99279000, RDI=0x7f6a647913e8 R8 =0xaa05a64a, R9 =0x000550a3d9b8, R10=0x7f6c9d488af0, R11=0x0002 R12=0x7f6a64791470, R13=0x0005502d3250, R14=0x7f6c9da85f8c, R15=0x7f6c99279000 RIP=0x7f6c9cd85100, EFLAGS=0x00010246, CSGSFS=0x002b0033, ERR=0x0015 TRAPNO=0x000e Top of Stack: (sp=0x7f6a647913d8) 0x7f6a647913d8: 7f6c9d488b38 00700070 0x7f6a647913e8: 7f6c8a4e702c 7f67d003adaa 0x7f6a647913f8: f2e95d57 0x7f6a64791408: a9e2f767 a9e05757 0x7f6a64791418: 7f6c88e78c88 a9e05757 0x7f6a64791428: 7f6c8a0d149c 0005aa05a64a 0x7f6a64791438: 0005502d3250 0007974aeab8 0x7f6a64791448: 00054f02bab8 00054f17bb60 0x7f6a64791458: 7f6c9dbbacdd 0007974adfe0 
0x7f6a64791468: 7f6c9d3cc53f 0003 0x7f6a64791478: 0d70aa32 00054f037260 0x7f6a64791488: 7f6c8ab3cc88 aa147bbef4590578 0x7f6a64791498: 0007a2c82bc0 0007974ae068 0x7f6a647914a8: 0007974adcb0 0007974adfe0 0x7f6a647914b8: 000550a3ddf0 0007974adcf8 0x7f6a647914c8: 0007974adc48 0007a2c83420 0x7f6a647914d8: 7f6cf2e95b69 a9e269d8 0x7f6a647914e8: 7f6c8a2c0934 0007974adba0 0x7f6a647914f8: 7f6c8a000138 7f6a64791550 0x7f6a64791508: 7ffed59c2c60 7f6a64791580 0x7f6a64791518: 0002f4590578 00079d149f38 0x7f6a64791528: 000550a3ddf0 7f6a64791570 0x7f6a64791538: f4590684 0x7f6a64791548: 00079d1b3490 7f6a64791590 0x7f6a64791558: 7f6c9dbbacdd 0007f3a366fd 0x7f6a64791568: 7f6c9dbbacdd 0007974adb60 0x7f6a64791578: 7f6c9d3cc53f 00011172 0x7f6a64791588: 0d7091ab f4590567 0x7f6a64791598: 7f6c8a887a24 0007a2c82bc0 0x7f6a647915a8: 0007a2c832d0 aa147bbe15bd 0x7f6a647915b8: f2e95b56974adb48 0005f2e95b67 0x7f6a647915c8: 0007974adb38 0007974adab0 Instructions: (pc=0x7f6c9cd85100) 0x7f6c9cd850e0: e8 0b 4c 77 00 48 83 c4 30 5b 41 5c 5d c3 66 90 0x7f6c9cd850f0: f3 c3 66 2e 0f 1f 84 00 00 00 00 00 0f 1f 40 00 0x7f6c9cd85100: 48 85 d2 74 63 55 48 89 e5 41 55 41 54 53 49 89 0x7f6c9cd85110: fc 48 89 d3 48 83 ec 08 4c 8b ae 38 01 00 00 49 Register to memory mapping: RAX=0x7f6c9da7b2ce: in /usr/local/java/jdk1.8.0_202/jre/lib/amd64/server/libjvm.so at 0x7f6c9cabe000 RBX=0x7f6c99279000 is a thread RCX=0x0005502d3250 is an oop java.lang.Object - klass: 'java/lang/Object' RDX=0x0005502d3250 is an oop java.lang.Object - klass: 'java/lang/Object' RSP=0x7f6a647913d8 is pointing into the stack for thread: 0x7f6c99279000 RBP=0x7f6a64791410 is pointing into the stack for thread: 0x7f6c99279000 RSI=0x7f6c99279000 is a thread RDI=0x7f6a647913e8 is pointing into the stack for thread: 0x7f6c99279000 R8 =0xaa05a64a is an unknown value R9 =0x000550a3d9b8 is an oop org.apache.kafka.common.utils.KafkaThread - klass: 'org/apache/kafka/common/utils/KafkaThread' R10=0x7f6c9d488af0: in 
/usr/local/java/jdk1.8.0_202/jre/lib/amd64/server/libjvm.so at 0x7f6c9cabe000 R11=0x0002 is an unknown value R12=0x7f6a64791470 is pointing into the stack for thread:
[jira] [Created] (KAFKA-8170) To add kafka data at rest encryption
Akash created KAFKA-8170: Summary: To add kafka data at rest encryption Key: KAFKA-8170 URL: https://issues.apache.org/jira/browse/KAFKA-8170 Project: Kafka Issue Type: New Feature Components: log Reporter: Akash Kafka has a mechanism for wire encryption of data. But the Kafka data at rest, which exists in /-, is still unencrypted. These directories contain log files with the actual messages and embedded metadata, and an unauthorised user can still recover messages from these files. Adding encryption for this data would be valuable for protecting messages against disk theft and unauthorised user access on servers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7895) Ktable supress operator emitting more than one record for the same key per window
[ https://issues.apache.org/jira/browse/KAFKA-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16803813#comment-16803813 ] Andrew Klopper commented on KAFKA-7895: --- Hi John Roesler Yes, my consumer's isolation level is set to read_committed. Regards Andrew > Ktable supress operator emitting more than one record for the same key per > window > - > > Key: KAFKA-7895 > URL: https://issues.apache.org/jira/browse/KAFKA-7895 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.0, 2.1.1 >Reporter: prasanthi >Assignee: John Roesler >Priority: Major > Fix For: 2.2.0, 2.1.2 > > > Hi, We are using kstreams to get the aggregated counts per vendor(key) within > a specified window. > Here's how we configured the suppress operator to emit one final record per > key/window. > {code:java} > KTable, Long> windowedCount = groupedStream > .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L))) > .count(Materialized.with(Serdes.Integer(),Serdes.Long())) > .suppress(Suppressed.untilWindowCloses(unbounded())); > {code} > But we are getting more than one record for the same key/window as shown > below. > {code:java} > [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1039 > [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1162 > [KTABLE-TOSTREAM-10]: [9@154906704/154906710], 6584 > [KTABLE-TOSTREAM-10]: [88@154906704/154906710], 107 > [KTABLE-TOSTREAM-10]: [108@154906704/154906710], 315 > [KTABLE-TOSTREAM-10]: [119@154906704/154906710], 119 > [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 746 > [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 809{code} > Could you please take a look? > Thanks > > > Added by John: > Acceptance Criteria: > * add suppress to system tests, such that it's exercised with crash/shutdown > recovery, rebalance, etc. > ** [https://github.com/apache/kafka/pull/6278] > * make sure that there's some system test coverage with caching disabled. 
> ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7943 > * test with tighter time bounds with windows of say 30 seconds and use > system time without adding any extra time for verification > ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7944 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8165) Streams task causes Out Of Memory after connection issues and store restoration
[ https://issues.apache.org/jira/browse/KAFKA-8165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Di Campo updated KAFKA-8165: Description: Having a Kafka Streams 2.1 application, when Kafka brokers are stable, the (largely stateful) application has been consuming ~160 messages per second at a sustained rate for several hours. However it started having connection issues to the brokers. {code:java} Connection to node 3 (/172.31.36.118:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient){code} Also it began showing a lot of these errors: {code:java} WARN [Consumer clientId=stream-processor-81e1ce17-1765-49f8-9b44-117f983a2d19-StreamThread-2-consumer, groupId=stream-processor] 1 partitions have leader brokers without a matching listener, including [broker-2-health-check-0] (org.apache.kafka.clients.NetworkClient){code} In fact, the _health-check_ topic is in the broker but not consumed by this topology or used in any way by the Streams application (it is just broker healthcheck). It does not complain about topics that are actually consumed by the topology. Some time after these errors (that appear at a rate of 24 appearances per second during ~5 minutes), then the following logs appear: {code:java} [2019-03-27 15:14:47,709] WARN [Consumer clientId=stream-processor-81e1ce17-1765-49f8-9b44-117f983a2d19-StreamThread-1-restore-consumer, groupId=] Connection to node -3 (/ip3:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient){code} In between 6 and then 3 lines of "Connection could not be established" error messages, 3 of these ones slipped in: {code:java} [2019-03-27 15:14:47,723] WARN Started Restoration of visitorCustomerStore partition 15 total records to be restored 17 (com.divvit.dp.streams.applications.monitors.ConsoleGlobalRestoreListener){code} ... 
one for each different KV store I have (I still have another KV that does not appear, and a WindowedStore store that also does not appear). Then I finally see "Restoration Complete" (using a logging ConsoleGlobalRestoreListener as in docs) messages for all of my stores. So it seems it may be fine now to restart the processing. Three minutes later, some events get processed, and I see an OOM error: {code:java} java.lang.OutOfMemoryError: GC overhead limit exceeded{code} ... so given that it usually allows to process during hours under same circumstances, I'm wondering whether there is some memory leak in the connection resources or somewhere in the handling of this scenario. Kafka and KafkaStreams 2.1 was: Having a Kafka Streams 2.1 application, when Kafka brokers are stable, the (largely stateful) application has been consuming ~160 messages per second at a sustained rate for several hours. However it started having connection issues to the brokers. {code:java} Connection to node 3 (/172.31.36.118:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient){code} Also it began showing a lot of these errors: {code:java} WARN [Consumer clientId=stream-processor-81e1ce17-1765-49f8-9b44-117f983a2d19-StreamThread-2-consumer, groupId=stream-processor] 1 partitions have leader brokers without a matching listener, including [broker-2-health-check-0] (org.apache.kafka.clients.NetworkClient){code} In fact, the _health-check_ topic is in the broker but not consumed by this topology or used in any way by the Streams application (it is just broker healthcheck). It does not complain about topics that are actually consumed by the topology. 
Some time after these errors (that appear at a rate of 24 appearances per second during ~5 minutes), then the following logs appear: {code:java} [2019-03-27 15:14:47,709] WARN [Consumer clientId=stream-processor-81e1ce17-1765-49f8-9b44-117f983a2d19-StreamThread-1-restore-consumer, groupId=] Connection to node -3 (/ip3:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient){code} In between 6 and then 3 lines of "Connection could not be established" error messages, 3 of these ones slipped in: [2019-03-27 15:14:47,723] WARN Started Restoration of visitorCustomerStore partition 15 total records to be restored 17 (com.divvit.dp.streams.applications.monitors.ConsoleGlobalRestoreListener) ... one for each different KV store I have (I still have another KV that does not appear, and a WindowedStore store that also does not appear). Then I finally see "Restoration Complete" (using a logging ConsoleGlobalRestoreListener as in docs) messages for all of my stores. So it seems it may be fine now to restart the processing. Three minutes later, some events get processed, and I see an OOM error: java.lang.OutOfMemoryError: GC overhead limit exceeded ... so given that it usually allows to process during hours under
[jira] [Updated] (KAFKA-8165) Streams task causes Out Of Memory after connection issues and store restoration
[ https://issues.apache.org/jira/browse/KAFKA-8165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Di Campo updated KAFKA-8165: Environment: 3 nodes, 22 topics, 16 partitions per topic, 1 window store, 4 KV stores. Kafka Streams application cluster: 3 AWS t2.large instances (8GB mem). 1 application instance, 2 threads per instance. Kafka 2.1, Kafka Streams 2.1 Amazon Linux. Scala application, on Docker based on openJdk9. was: Kafka 2.1, Kafka Streams 2.1 Amazon Linux, on Docker based on wurstmeister/kafka image > Streams task causes Out Of Memory after connection issues and store > restoration > --- > > Key: KAFKA-8165 > URL: https://issues.apache.org/jira/browse/KAFKA-8165 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.0 > Environment: 3 nodes, 22 topics, 16 partitions per topic, 1 window > store, 4 KV stores. > Kafka Streams application cluster: 3 AWS t2.large instances (8GB mem). 1 > application instance, 2 threads per instance. > Kafka 2.1, Kafka Streams 2.1 > Amazon Linux. > Scala application, on Docker based on openJdk9. >Reporter: Di Campo >Priority: Major > > Having a Kafka Streams 2.1 application, when Kafka brokers are stable, the > (largely stateful) application has been consuming ~160 messages per second at > a sustained rate for several hours. > However it started having connection issues to the brokers. > {code:java} > Connection to node 3 (/172.31.36.118:9092) could not be established. Broker > may not be available. 
(org.apache.kafka.clients.NetworkClient){code} > Also it began showing a lot of these errors: > {code:java} > WARN [Consumer > clientId=stream-processor-81e1ce17-1765-49f8-9b44-117f983a2d19-StreamThread-2-consumer, > groupId=stream-processor] 1 partitions have leader brokers without a > matching listener, including [broker-2-health-check-0] > (org.apache.kafka.clients.NetworkClient){code} > In fact, the _health-check_ topic is in the broker but not consumed by this > topology or used in any way by the Streams application (it is just broker > healthcheck). It does not complain about topics that are actually consumed by > the topology. > Some time after these errors (that appear at a rate of 24 appearances per > second during ~5 minutes), then the following logs appear: > {code:java} > [2019-03-27 15:14:47,709] WARN [Consumer > clientId=stream-processor-81e1ce17-1765-49f8-9b44-117f983a2d19-StreamThread-1-restore-consumer, > groupId=] Connection to node -3 (/ip3:9092) could not be established. Broker > may not be available. (org.apache.kafka.clients.NetworkClient){code} > In between 6 and then 3 lines of "Connection could not be established" error > messages, 3 of these ones slipped in: > [2019-03-27 15:14:47,723] WARN Started Restoration of visitorCustomerStore > partition 15 total records to be restored 17 > (com.divvit.dp.streams.applications.monitors.ConsoleGlobalRestoreListener) > > ... one for each different KV store I have (I still have another KV that does > not appear, and a WindowedStore store that also does not appear). > Then I finally see "Restoration Complete" (using a logging > ConsoleGlobalRestoreListener as in docs) messages for all of my stores. So it > seems it may be fine now to restart the processing. > Three minutes later, some events get processed, and I see an OOM error: > java.lang.OutOfMemoryError: GC overhead limit exceeded > > ... 
so given that it usually processes for hours under the same > circumstances, I'm wondering whether there is some memory leak in the > connection resources or somewhere in the handling of this scenario. > Kafka and KafkaStreams 2.1
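The report above estimates the warning rate by eye ("24 appearances per second during ~5 minutes"). A minimal sketch of how that rate could be measured from the application log follows, assuming the log lines keep the `[yyyy-MM-dd HH:mm:ss,SSS] LEVEL message` layout seen in the quoted excerpts. The function name `warn_rate_per_second` and the shortened `clientId=c1` in the sample lines are hypothetical and not part of the original report.

```python
import re
from collections import Counter

# Leading timestamp and level as they appear in the quoted log lines,
# e.g. "[2019-03-27 15:14:47,709] WARN ..."
LINE_RE = re.compile(
    r"^\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),\d{3}\] (\w+) (.*)$"
)


def warn_rate_per_second(lines, needle):
    """Count WARN lines whose message contains `needle`, per whole second."""
    counts = Counter()
    for line in lines:
        match = LINE_RE.match(line)
        if match is None:
            continue  # skip stack traces and other non-timestamped lines
        second, level, message = match.groups()
        if level == "WARN" and needle in message:
            counts[second] += 1
    return counts


# Sample lines modelled on the excerpts in the report (client id shortened).
sample = [
    "[2019-03-27 15:14:47,709] WARN [Consumer clientId=c1, groupId=] "
    "Connection to node -3 (/ip3:9092) could not be established. "
    "Broker may not be available. (org.apache.kafka.clients.NetworkClient)",
    "[2019-03-27 15:14:47,723] WARN Started Restoration of "
    "visitorCustomerStore partition 15 total records to be restored 17 "
    "(com.divvit.dp.streams.applications.monitors.ConsoleGlobalRestoreListener)",
    "[2019-03-27 15:14:48,001] WARN [Consumer clientId=c1, groupId=] "
    "Connection to node -3 (/ip3:9092) could not be established. "
    "Broker may not be available. (org.apache.kafka.clients.NetworkClient)",
]

rates = warn_rate_per_second(sample, "could not be established")
for second, count in sorted(rates.items()):
    print(second, count)
```

Run against the full log, the per-second counts would show when the connection warnings started, how long they lasted, and whether they had stopped before the OutOfMemoryError appeared.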
[jira] [Commented] (KAFKA-8169) Wrong topic on streams quick start documentation
[ https://issues.apache.org/jira/browse/KAFKA-8169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16803656#comment-16803656 ] Tcheutchoua Steve commented on KAFKA-8169: -- This has been adjusted and patched in this [PR|https://github.com/apache/kafka/pull/6513] > Wrong topic on streams quick start documentation > > > Key: KAFKA-8169 > URL: https://issues.apache.org/jira/browse/KAFKA-8169 > Project: Kafka > Issue Type: Improvement > Components: documentation >Affects Versions: 2.1.1 >Reporter: Tcheutchoua Steve >Priority: Minor > Labels: documentation > Original Estimate: 1h > Remaining Estimate: 1h > > [Kafka Streams Quick > Start|https://kafka.apache.org/21/documentation/streams/quickstart] > Throughout the tutorial, the name of the input topic that was created is > `streams-plaintext-input`. However, at some point in the tutorial it is > mistakenly changed to `streams-wordcount-input`.
[jira] [Created] (KAFKA-8169) Wrong topic on streams quick start documentation
Tcheutchoua Steve created KAFKA-8169: Summary: Wrong topic on streams quick start documentation Key: KAFKA-8169 URL: https://issues.apache.org/jira/browse/KAFKA-8169 Project: Kafka Issue Type: Improvement Components: documentation Affects Versions: 2.1.1 Reporter: Tcheutchoua Steve [Kafka Streams Quick Start|https://kafka.apache.org/21/documentation/streams/quickstart] Throughout the tutorial, the name of the input topic that was created is `streams-plaintext-input`. However, at some point in the tutorial it is mistakenly changed to `streams-wordcount-input`.
[jira] [Commented] (KAFKA-6042) Kafka Request Handler deadlocks and brings down the cluster.
[ https://issues.apache.org/jira/browse/KAFKA-6042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16803630#comment-16803630 ] Ismael Juma commented on KAFKA-6042: Fixed. > Kafka Request Handler deadlocks and brings down the cluster. > > > Key: KAFKA-6042 > URL: https://issues.apache.org/jira/browse/KAFKA-6042 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.11.0.0, 0.11.0.1 > Environment: kafka version: 0.11.0.1 > client versions: 0.8.2.1-0.10.2.1 > platform: aws (eu-west-1a) > nodes: 36 x r4.xlarge > disk storage: 2.5 tb per node (~73% usage per node) > topics: 250 > number of partitions: 48k (approx) > os: ubuntu 14.04 > jvm: Java(TM) SE Runtime Environment (build 1.8.0_131-b11), Java HotSpot(TM) > 64-Bit Server VM (build 25.131-b11, mixed mode) >Reporter: Ben Corlett >Assignee: Rajini Sivaram >Priority: Blocker > Fix For: 0.11.0.2, 1.0.0 > > Attachments: heapusage.png, thread_dump.txt.gz > > > We have been experiencing a deadlock that consistently happens on the same > server within our cluster. This currently happens multiple times a week. It > first started happening when we upgraded to 0.11.0.0. Sadly, 0.11.0.1 failed > to resolve the issue. > Sequence of events: > At a seemingly random time broker 125 goes into a deadlock. As soon as it is > deadlocked it will remove all the ISRs for any partition it is the leader > for. 
> [2017-10-10 00:06:10,061] INFO Partition [XX,24] on broker 125: > Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition) > [2017-10-10 00:06:10,073] INFO Partition [XX,974] on broker 125: > Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition) > [2017-10-10 00:06:10,079] INFO Partition [XX,64] on broker 125: > Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition) > [2017-10-10 00:06:10,081] INFO Partition [XX,21] on broker 125: > Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition) > [2017-10-10 00:06:10,084] INFO Partition [XX,12] on broker 125: > Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition) > [2017-10-10 00:06:10,085] INFO Partition [XX,61] on broker 125: > Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition) > [2017-10-10 00:06:10,086] INFO Partition [XX,53] on broker 125: > Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition) > [2017-10-10 00:06:10,088] INFO Partition [XX,27] on broker 125: > Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition) > [2017-10-10 00:06:10,090] INFO Partition [XX,182] on broker 125: > Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition) > [2017-10-10 00:06:10,091] INFO Partition [XX,16] on broker 125: > Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition) > > The other nodes fail to connect to the node 125 > [2017-10-10 00:08:42,318] WARN [ReplicaFetcherThread-0-125]: Error in fetch > to broker 125, request (type=FetchRequest, replicaId=101, maxWait=500, > minBytes=1, maxBytes=10485760, fetchData={XX-94=(offset=0, > logStartOffset=0, maxBytes=1048576), XX-22=(offset=0, > logStartOffset=0, maxBytes=1048576), XX-58=(offset=0, > logStartOffset=0, maxBytes=1048576), XX-11=(offset=78932482, > logStartOffset=50881481, maxBytes=1048576), XX-55=(offset=0, > logStartOffset=0, maxBytes=1048576), XX-19=(offset=0, > logStartOffset=0, maxBytes=1048576), XX-91=(offset=0, > logStartOffset=0, maxBytes=1048576), XX-5=(offset=903857106, > logStartOffset=0, maxBytes=1048576), 
XX-80=(offset=0, > logStartOffset=0, maxBytes=1048576), XX-88=(offset=0, > logStartOffset=0, maxBytes=1048576), XX-34=(offset=308, > logStartOffset=308, maxBytes=1048576), XX-7=(offset=369990, > logStartOffset=369990, maxBytes=1048576), XX-0=(offset=57965795, > logStartOffset=0, maxBytes=1048576)}) (kafka.server.ReplicaFetcherThread) > java.io.IOException: Connection to 125 was disconnected before the response > was read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:93) > at > kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:93) > at > kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:207) > at > kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:151) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64) > As node 125 removed all the ISRs as it was locking up, a failover for any > partition
[jira] [Updated] (KAFKA-6042) Kafka Request Handler deadlocks and brings down the cluster.
[ https://issues.apache.org/jira/browse/KAFKA-6042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-6042: --- Affects Version/s: (was: 1.0.0) > Kafka Request Handler deadlocks and brings down the cluster. > > > Key: KAFKA-6042 > URL: https://issues.apache.org/jira/browse/KAFKA-6042 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.11.0.0, 0.11.0.1 > Environment: kafka version: 0.11.0.1 > client versions: 0.8.2.1-0.10.2.1 > platform: aws (eu-west-1a) > nodes: 36 x r4.xlarge > disk storage: 2.5 tb per node (~73% usage per node) > topics: 250 > number of partitions: 48k (approx) > os: ubuntu 14.04 > jvm: Java(TM) SE Runtime Environment (build 1.8.0_131-b11), Java HotSpot(TM) > 64-Bit Server VM (build 25.131-b11, mixed mode) >Reporter: Ben Corlett >Assignee: Rajini Sivaram >Priority: Blocker > Fix For: 0.11.0.2, 1.0.0 > > Attachments: heapusage.png, thread_dump.txt.gz > > > We have been experiencing a deadlock that consistently happens on the same > server within our cluster. This currently happens multiple times a week. It > first started happening when we upgraded to 0.11.0.0. Sadly, 0.11.0.1 failed > to resolve the issue. > Sequence of events: > At a seemingly random time broker 125 goes into a deadlock. As soon as it is > deadlocked it will remove all the ISRs for any partition it is the leader > for. 
> [2017-10-10 00:06:10,061] INFO Partition [XX,24] on broker 125: > Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition) > [2017-10-10 00:06:10,073] INFO Partition [XX,974] on broker 125: > Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition) > [2017-10-10 00:06:10,079] INFO Partition [XX,64] on broker 125: > Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition) > [2017-10-10 00:06:10,081] INFO Partition [XX,21] on broker 125: > Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition) > [2017-10-10 00:06:10,084] INFO Partition [XX,12] on broker 125: > Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition) > [2017-10-10 00:06:10,085] INFO Partition [XX,61] on broker 125: > Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition) > [2017-10-10 00:06:10,086] INFO Partition [XX,53] on broker 125: > Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition) > [2017-10-10 00:06:10,088] INFO Partition [XX,27] on broker 125: > Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition) > [2017-10-10 00:06:10,090] INFO Partition [XX,182] on broker 125: > Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition) > [2017-10-10 00:06:10,091] INFO Partition [XX,16] on broker 125: > Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition) > > The other nodes fail to connect to the node 125 > [2017-10-10 00:08:42,318] WARN [ReplicaFetcherThread-0-125]: Error in fetch > to broker 125, request (type=FetchRequest, replicaId=101, maxWait=500, > minBytes=1, maxBytes=10485760, fetchData={XX-94=(offset=0, > logStartOffset=0, maxBytes=1048576), XX-22=(offset=0, > logStartOffset=0, maxBytes=1048576), XX-58=(offset=0, > logStartOffset=0, maxBytes=1048576), XX-11=(offset=78932482, > logStartOffset=50881481, maxBytes=1048576), XX-55=(offset=0, > logStartOffset=0, maxBytes=1048576), XX-19=(offset=0, > logStartOffset=0, maxBytes=1048576), XX-91=(offset=0, > logStartOffset=0, maxBytes=1048576), XX-5=(offset=903857106, > logStartOffset=0, maxBytes=1048576), 
XX-80=(offset=0, > logStartOffset=0, maxBytes=1048576), XX-88=(offset=0, > logStartOffset=0, maxBytes=1048576), XX-34=(offset=308, > logStartOffset=308, maxBytes=1048576), XX-7=(offset=369990, > logStartOffset=369990, maxBytes=1048576), XX-0=(offset=57965795, > logStartOffset=0, maxBytes=1048576)}) (kafka.server.ReplicaFetcherThread) > java.io.IOException: Connection to 125 was disconnected before the response > was read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:93) > at > kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:93) > at > kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:207) > at > kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:151) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64) > As node 125 removed all the ISRs as it was locking up, a failover for any > partition without an