[jira] [Commented] (KAFKA-7192) State-store can desynchronise with changelog
[ https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16560622#comment-16560622 ]

ASF GitHub Bot commented on KAFKA-7192:
---

guozhangwang closed pull request #5430: KAFKA-7192 Follow-up: update checkpoint to the reset beginning offset
URL: https://github.com/apache/kafka/pull/5430

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
index c1a41cefc23..3bbf42ead27 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
@@ -26,13 +26,13 @@ static final int NO_CHECKPOINT = -1;
-private final Long checkpoint;
 private final long offsetLimit;
 private final boolean persistent;
 private final String storeName;
 private final TopicPartition partition;
 private final CompositeRestoreListener compositeRestoreListener;
+private long checkpointOffset;
 private long restoredOffset;
 private long startingOffset;
 private long endingOffset;
@@ -45,7 +45,7 @@ final String storeName) {
 this.partition = partition;
 this.compositeRestoreListener = compositeRestoreListener;
-this.checkpoint = checkpoint;
+this.checkpointOffset = checkpoint == null ? NO_CHECKPOINT : checkpoint;
 this.offsetLimit = offsetLimit;
 this.persistent = persistent;
 this.storeName = storeName;
@@ -60,7 +60,11 @@ public String storeName() {
 }
 long checkpoint() {
-return checkpoint == null ? NO_CHECKPOINT : checkpoint;
+return checkpointOffset;
+}
+
+void setCheckpointOffset(final long checkpointOffset) {
+this.checkpointOffset = checkpointOffset;
 }
 void restoreStarted() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 1927b5a7af7..9185920f242 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -48,8 +48,9 @@ private final Map endOffsets = new HashMap<>();
 private final Map> partitionInfo = new HashMap<>();
 private final Map stateRestorers = new HashMap<>();
-private final Map needsRestoring = new HashMap<>();
-private final Map needsInitializing = new HashMap<>();
+private final Set needsRestoring = new HashSet<>();
+private final Set needsInitializing = new HashSet<>();
+private final Set completedRestorers = new HashSet<>();
 private final Duration pollTime;
 public StoreChangelogReader(final Consumer restoreConsumer,
@@ -64,9 +65,14 @@ public StoreChangelogReader(final Consumer restoreConsumer,
 @Override
 public void register(final StateRestorer restorer) {
-restorer.setUserRestoreListener(userStateRestoreListener);
-stateRestorers.put(restorer.partition(), restorer);
-needsInitializing.put(restorer.partition(), restorer);
+if (!stateRestorers.containsKey(restorer.partition())) {
+restorer.setUserRestoreListener(userStateRestoreListener);
+stateRestorers.put(restorer.partition(), restorer);
+
+log.trace("Added restorer for changelog {}", restorer.partition());
+}
+
+needsInitializing.add(restorer.partition());
 }
 public Collection restore(final RestoringTasks active) {
@@ -81,16 +87,15 @@ public void register(final StateRestorer restorer) {
 try {
 final ConsumerRecords records = restoreConsumer.poll(pollTime);
-final Iterator iterator = needsRestoring.keySet().iterator();
-while (iterator.hasNext()) {
-final TopicPartition partition = iterator.next();
+
+for (final TopicPartition partition : needsRestoring) {
 final StateRestorer restorer = stateRestorers.get(partition);
 final long pos = processNext(records.records(partition), restorer, endOffsets.get(partition));
 restorer.setRestoredOffset(pos);
 if (restorer.hasCompleted(pos, endOffsets.get(partition))) {
 restorer.restoreDone();
 endOffsets.remove(partition);
-iterator.remove();
+
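The diff swaps an explicit `Iterator` (with `iterator.remove()`) for a for-each loop over the `needsRestoring` set. Removing elements from a `HashSet` while iterating it with for-each would typically throw `ConcurrentModificationException`, which is presumably why a `completedRestorers` set is introduced; the hunk that uses it is cut off above, so that is an inference. A minimal sketch of the collect-then-remove pattern, plus a mutable checkpoint that falls back to `NO_CHECKPOINT`. `RestorerSketch`, `ReaderSketch`, `restoreSome` and `restoreOnce` are hypothetical names, partitions are plain strings, and registration goes straight into `needsRestoring` (skipping the real `needsInitializing` step) for brevity.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for StateRestorer: the checkpoint is mutable so it can be moved,
// e.g. to a reset beginning offset, after construction.
class RestorerSketch {
    static final long NO_CHECKPOINT = -1L;
    private long checkpointOffset;
    private final long endOffset;
    private long restoredOffset;

    RestorerSketch(Long checkpoint, long endOffset) {
        this.checkpointOffset = checkpoint == null ? NO_CHECKPOINT : checkpoint;
        this.endOffset = endOffset;
        this.restoredOffset = checkpointOffset == NO_CHECKPOINT ? 0L : checkpointOffset;
    }

    long checkpoint() {
        return checkpointOffset;
    }

    void setCheckpointOffset(long checkpointOffset) {
        this.checkpointOffset = checkpointOffset;
    }

    // Pretends to restore up to `count` records; returns true once the end offset is reached.
    boolean restoreSome(long count) {
        restoredOffset = Math.min(endOffset, restoredOffset + count);
        return restoredOffset >= endOffset;
    }
}

// Hypothetical stand-in for the reader loop.
class ReaderSketch {
    final Map<String, RestorerSketch> stateRestorers = new HashMap<>();
    final Set<String> needsRestoring = new HashSet<>();
    final Set<String> completedRestorers = new HashSet<>();

    void register(String partition, RestorerSketch restorer) {
        stateRestorers.putIfAbsent(partition, restorer); // register each partition only once
        needsRestoring.add(partition);
    }

    void restoreOnce(long batchSize) {
        for (String partition : needsRestoring) {
            if (stateRestorers.get(partition).restoreSome(batchSize)) {
                completedRestorers.add(partition); // do not mutate needsRestoring mid-iteration
            }
        }
        needsRestoring.removeAll(completedRestorers); // safe: the loop has finished
    }
}
```

Collecting first and removing afterwards keeps the loop body free of iterator bookkeeping, at the cost of one extra set.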
[jira] [Commented] (KAFKA-7190) Under low traffic conditions purging repartition topics cause WARN statements about UNKNOWN_PRODUCER_ID
[ https://issues.apache.org/jira/browse/KAFKA-7190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16560409#comment-16560409 ]

Guozhang Wang commented on KAFKA-7190:
--

What you described looks reasonable to me. I'd like [~hachikuji] to chime in here as well, since he originally implemented this logic and can shed some light on it.

> Under low traffic conditions purging repartition topics cause WARN statements
> about UNKNOWN_PRODUCER_ID
> -
>
> Key: KAFKA-7190
> URL: https://issues.apache.org/jira/browse/KAFKA-7190
> Project: Kafka
> Issue Type: Improvement
> Components: core, streams
>Affects Versions: 1.1.0, 1.1.1
>Reporter: Bill Bejeck
>Assignee: lambdaliu
>Priority: Major
>
>
> When a streams application has little traffic, it is possible that
> consumer purging deletes even the last message sent by a producer (i.e., all
> the messages sent by this producer have been consumed and committed), and as a
> result, the broker deletes that producer's ID. The next time this producer
> tries to send, it gets the UNKNOWN_PRODUCER_ID error code. In this case,
> however, the error is retriable: the producer just gets a new producer ID and
> retries, and this time it succeeds.
>
> Possible fixes could be on the broker side, i.e., delaying the deletion of
> the producer IDs for a longer period, or on the streams side, developing
> a more conservative approach to deleting offsets from repartition topics.
>
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
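A toy model may make the mechanism described above concrete. It assumes, in simplified form, that the broker keeps per-producer state only while at least one of that producer's records is still in the log, so advancing the log start offset past a producer's last record makes the broker forget it. `ToyPartitionLog` and `ConservativePurge` are hypothetical names, not broker or Streams code, and "never purge the newest record" is just one illustrative reading of a "more conservative approach" on the Streams side. With many producers writing to the same partition, keeping only the newest record protects just the producer that wrote it.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (hypothetical, not the broker's implementation).
class ToyPartitionLog {
    private final Map<Long, Long> lastOffsetByProducer = new HashMap<>();
    private long logStartOffset = 0L;
    private long logEndOffset = 0L;

    // Appends one record for the given producer and returns its offset.
    long append(long producerId) {
        long offset = logEndOffset++;
        lastOffsetByProducer.put(producerId, offset);
        return offset;
    }

    // Purges records before `beforeOffset`; producers whose last record is gone are forgotten.
    void deleteRecordsBefore(long beforeOffset) {
        logStartOffset = Math.max(logStartOffset, Math.min(beforeOffset, logEndOffset));
        lastOffsetByProducer.values().removeIf(last -> last < logStartOffset);
    }

    // A producer the broker no longer knows would be answered with UNKNOWN_PRODUCER_ID.
    boolean knowsProducer(long producerId) {
        return lastOffsetByProducer.containsKey(producerId);
    }

    long logEndOffset() {
        return logEndOffset;
    }
}

// Hypothetical purge policy: purge up to the committed offset, but keep the newest record.
class ConservativePurge {
    static long purgeOffset(long committedOffset, long logEndOffset) {
        if (logEndOffset <= 0L) {
            return 0L; // empty partition: nothing to purge
        }
        return Math.min(committedOffset, logEndOffset - 1);
    }
}
```

Purging everything up to the committed offset (2 after two records) drops the producer in this model, while the conservative offset (1) keeps it known.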
[jira] [Commented] (KAFKA-7213) NullPointerException during state restoration in kafka streams
[ https://issues.apache.org/jira/browse/KAFKA-7213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16560405#comment-16560405 ]

Guozhang Wang commented on KAFKA-7213:
--

Thanks for reporting this issue, [~abhishek.agarwal]. Before you start, I'd suggest checking out the latest trunk (to be released as 2.0 soon) to see whether this issue has already been resolved.

> NullPointerException during state restoration in kafka streams
> --
>
> Key: KAFKA-7213
> URL: https://issues.apache.org/jira/browse/KAFKA-7213
> Project: Kafka
> Issue Type: Bug
> Components: streams
>Affects Versions: 1.0.0
>Reporter: Abhishek Agarwal
>Assignee: Abhishek Agarwal
>Priority: Major
>
> I had written a custom state store which has a batch restoration callback
> registered. What I have observed is that when multiple consumer instances are
> restarted, the application keeps failing with a NullPointerException. The stack
> trace is:
> {noformat}
> java.lang.NullPointerException: null
> at org.apache.kafka.streams.state.internals.RocksDBStore.putAll(RocksDBStore.java:351) ~[kafka-streams-1.0.0.jar:?]
> at org.apache.kafka.streams.state.internals.RocksDBSlotKeyValueBytesStore.putAll(RocksDBSlotKeyValueBytesStore.java:100) ~[streams-core-1.0.0.297.jar:?]
> at org.apache.kafka.streams.state.internals.RocksDBSlotKeyValueBytesStore$SlotKeyValueBatchRestoreCallback.restoreAll(RocksDBSlotKeyValueBytesStore.java:303) ~[streams-core-1.0.0.297.jar:?]
> at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreAll(CompositeRestoreListener.java:89) ~[kafka-streams-1.0.0.jar:?]
> at org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:75) ~[kafka-streams-1.0.0.jar:?]
> at org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:277) ~[kafka-streams-1.0.0.jar:?]
> at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restorePartition(StoreChangelogReader.java:238) ~[kafka-streams-1.0.0.jar:?]
> at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:83) ~[kafka-streams-1.0.0.jar:?]
> at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:263) ~[kafka-streams-1.0.0.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:803) ~[kafka-streams-1.0.0.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774) ~[kafka-streams-1.0.0.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744) ~[kafka-streams-1.0.0.jar:?]
> {noformat}
> The faulty line in question is
> {noformat}
> db.write(wOptions, batch);
> {noformat}
> in RocksDBStore.java, which would mean that the db variable is null. Probably the
> store has been closed while restoration is still being done on it. After going
> through the code, I think the problem occurs when the state transitions from
> PARTITIONS_ASSIGNED to PARTITIONS_REVOKED while restoration is still in
> progress.
> In such a state transition, while the active tasks themselves are closed, the
> changelog reader is not reset. It tries to restore the tasks that have
> already been closed, db is null, and this results in an NPE.
> I will put in a fix to see if that fixes the issue.
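The reporter's hypothesis (not a confirmed root cause) is that on PARTITIONS_REVOKED the task's stores are closed but the changelog reader still holds them, so the next restore call writes into a closed store. A toy sketch of that sequence and of resetting the reader during revocation. `ToyStore`, `ToyChangelogReader` and `RevocationSketch` are hypothetical; a null map stands in for the null RocksDB handle, and the toy throws `IllegalStateException` where the real store hit an NPE.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical store: `db` becomes null on close(), like the RocksDB handle.
class ToyStore {
    private Map<String, String> db = new HashMap<>();

    void putAll(List<String[]> batch) {
        if (db == null) {
            // The real store dereferenced the null handle and threw an NPE instead.
            throw new IllegalStateException("store is closed");
        }
        for (String[] kv : batch) {
            db.put(kv[0], kv[1]);
        }
    }

    void close() {
        db = null;
    }
}

// Hypothetical changelog reader that restores batches into registered stores.
class ToyChangelogReader {
    private final Map<String, ToyStore> restoring = new HashMap<>();

    void register(String changelogPartition, ToyStore store) {
        restoring.put(changelogPartition, store);
    }

    // Writes the batch into every store still registered; returns how many stores were written.
    int restore(List<String[]> batch) {
        for (ToyStore store : restoring.values()) {
            store.putAll(batch);
        }
        return restoring.size();
    }

    // Forgets all in-flight restorations.
    void reset() {
        restoring.clear();
    }
}

class RevocationSketch {
    // Close the stores AND reset the reader, so a later restore() cannot reach a closed store.
    static void onPartitionsRevoked(List<ToyStore> stores, ToyChangelogReader reader) {
        for (ToyStore store : stores) {
            store.close();
        }
        reader.reset();
    }
}
```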
[jira] [Commented] (KAFKA-6690) Priorities for Source Topics
[ https://issues.apache.org/jira/browse/KAFKA-6690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16560402#comment-16560402 ]

Guozhang Wang commented on KAFKA-6690:
--

If we add this to the consumer API, Streams may get the feature automatically, since it is built on top of consumers. (Streams would need some additional consideration, because today its choice of which record to process next depends purely on timestamp synchronization, but that can be deferred to a follow-up discussion.) Maybe you can start a discussion thread on the mailing list and ask for the community's opinion on this feature request.

> Priorities for Source Topics
>
>
> Key: KAFKA-6690
> URL: https://issues.apache.org/jira/browse/KAFKA-6690
> Project: Kafka
> Issue Type: New Feature
> Components: streams
>Reporter: Bala Prassanna I
>Assignee: Nick Afshartous
>Priority: Major
>
> We often encounter use cases where we need to prioritise source topics. If a
> consumer is listening to more than one topic, say, HighPriorityTopic and
> LowPriorityTopic, it should consume events from LowPriorityTopic only when
> all the events from HighPriorityTopic are consumed. This is needed in Kafka
> Streams processor topologies as well.
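The requested semantics are strict priority: hand out low-priority records only when nothing from the high-priority topic is pending. A toy sketch in plain Java, where two in-memory buffers stand in for already-fetched records; `PrioritySelector` is hypothetical and not a Kafka API. With the real consumer, something similar could perhaps be approximated with `KafkaConsumer#pause` and `resume` on the low-priority partitions, though "all events consumed" would then only mean "nothing currently fetched", not zero lag.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical strict-priority selector over two buffers of fetched records.
class PrioritySelector {
    private final Deque<String> highPriority = new ArrayDeque<>();
    private final Deque<String> lowPriority = new ArrayDeque<>();

    void addHigh(String record) {
        highPriority.addLast(record);
    }

    void addLow(String record) {
        lowPriority.addLast(record);
    }

    // Returns up to maxRecords records; low-priority records are only taken
    // once the high-priority buffer has been drained.
    List<String> nextBatch(int maxRecords) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < maxRecords && !highPriority.isEmpty()) {
            batch.add(highPriority.pollFirst());
        }
        while (batch.size() < maxRecords && !lowPriority.isEmpty()) {
            batch.add(lowPriority.pollFirst());
        }
        return batch;
    }
}
```

Strict priority can starve the low-priority topic indefinitely under sustained high-priority load, which is one of the trade-offs a mailing-list discussion would likely need to settle.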
[jira] [Commented] (KAFKA-7192) State-store can desynchronise with changelog
[ https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16560326#comment-16560326 ]

ASF GitHub Bot commented on KAFKA-7192:
---

guozhangwang opened a new pull request #5430: KAFKA-7192 Follow-up: update checkpoint to the reset beginning offset
URL: https://github.com/apache/kafka/pull/5430

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> State-store can desynchronise with changelog
>
>
> Key: KAFKA-7192
> URL: https://issues.apache.org/jira/browse/KAFKA-7192
> Project: Kafka
> Issue Type: Bug
> Components: streams
>Affects Versions: 1.1.1
>Reporter: Jon Bates
>Assignee: Guozhang Wang
>Priority: Critical
> Labels: bugs
> Fix For: 2.1.0
>
>
> n.b. this bug has been verified with exactly-once processing enabled
> Consider the following scenario:
> * A record, N, is read into a Kafka topology
> * the state store is updated
> * the topology crashes
> h3. *Expected behaviour:*
> # Node is restarted
> # Offset was never updated, so record N is reprocessed
> # State-store is reset to position N-1
> # Record is reprocessed
> h3. *Actual Behaviour*
> # Node is restarted
> # Record N is reprocessed (good)
> # The state store has the state from the previous processing
> I'd consider this a corruption of the state-store, hence the critical
> Priority, although High may be more appropriate.
> I wrote a proof-of-concept here, which demonstrates the problem on Linux:
> [https://github.com/spadger/kafka-streams-sad-state-store]
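The expected behaviour amounts to: after a crash with exactly-once enabled, the store should reflect only committed changelog writes, so reprocessing record N starts from the state at N-1. A minimal sketch, assuming a toy changelog whose entries carry a `committed` flag as a stand-in for the transactional markers a `read_committed` consumer honours. `ChangelogEntry` and `EosRestoreSketch` are hypothetical, and this is not how Streams actually implements restoration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical changelog entry: a key/value write plus whether its transaction committed.
class ChangelogEntry {
    final String key;
    final long value;
    final boolean committed;

    ChangelogEntry(String key, long value, boolean committed) {
        this.key = key;
        this.value = value;
        this.committed = committed;
    }
}

class EosRestoreSketch {
    // Rebuilds the store from an empty map, skipping writes whose transaction never
    // committed, instead of trusting whatever the crashed instance left on local disk.
    static Map<String, Long> restoreFromChangelog(List<ChangelogEntry> changelog) {
        Map<String, Long> store = new HashMap<>();
        for (ChangelogEntry entry : changelog) {
            if (entry.committed) {
                store.put(entry.key, entry.value);
            }
        }
        return store;
    }
}
```

In the scenario from the report, record N-1 committed `count = 1` and record N wrote `count = 2` before the crash; restoring this way yields 1, so reprocessing N produces 2 again rather than 3.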
[jira] [Commented] (KAFKA-7209) Kafka stream does not rebalance when one node gets down
[ https://issues.apache.org/jira/browse/KAFKA-7209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16560143#comment-16560143 ]

Yogesh BG commented on KAFKA-7209:
--

Even with offsets.topic.replication.factor set to 1, I still receive something like the following:

{{ Received GroupCoordinator response ClientResponse(receivedTimeMs=1532716985393, latencyMs=15, disconnected=false, requestHeader=\{api_key=10,api_version=1,correlation_id=6157,client_id=ks_0_inst_THUNDER_METRICS-StreamThread-37-consumer}, responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=COORDINATOR_NOT_AVAILABLE, node=:-1 (id: -1 rack: null))) for group aggregation-framework030_THUNDER_METRICS}}
{{18:43:05.394 [ks_0_inst_THUNDER_LOG_L4-StreamThread-70] DEBUG o.a.k.c.consumer.internals.Fetcher - Fetch READ_UNCOMMITTED at offset 0 for partition THUNDER_LOG_L4_PC-17 returned fetch data (error=NONE, highWaterMark=0, lastStableOffset = -1, logStartOffset = 0, abortedTransactions = null, recordsSizeInBytes=0)}}

> Kafka stream does not rebalance when one node gets down
> ---
>
> Key: KAFKA-7209
> URL: https://issues.apache.org/jira/browse/KAFKA-7209
> Project: Kafka
> Issue Type: Bug
> Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Yogesh BG
>Priority: Critical
>
> I use Kafka Streams 0.11.0.1 with session timeout 6, retries set to int.max and
> the default backoff time.
>
> I have 3 nodes running a Kafka cluster of 3 brokers,
> and I am running 3 Kafka Streams instances with the same application.id.
> Each node has one broker and one Kafka Streams application.
> Everything works fine during setup.
> I bring down one node, so one Kafka broker and one streaming app are down.
> Now I see exceptions in the other two streaming apps, and they never get
> rebalanced; I waited for hours and they never came back to normal.
> Is there anything I am missing?
> I also tried calling stream.close, cleanup and restart when one broker is down;
> this also doesn't help.
> Can anyone help me?
>
>
>
> One thing I observed lately is that Kafka topics with one partition get
> reassigned, but I have topics with 16 partitions and replication factor 3, and
> those never settle.
[jira] [Comment Edited] (KAFKA-5407) Mirrormaker dont start after upgrade
[ https://issues.apache.org/jira/browse/KAFKA-5407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16560099#comment-16560099 ]

Fernando Vega edited comment on KAFKA-5407 at 7/27/18 6:31 PM:
---

[~omkreddy] [~hachikuji] [~huxi_2b]
Just double-checking. I tried this again and found a few things:

a- Once I upgraded the cluster, I attempted to use the new consumer file again for our MirrorMakers, whitelisting the same topics, and I get the same exception.
b- However, I did another test using exactly the same configs as the production topics; the only difference was that I created a single topic to check whether the issue was related to Kafka or to the installed package. I was able to mirror my dummy messages using all the new files and configs that we have for production, and it worked just fine. But with the current production topics it doesn't.
c- We have also seen the MirrorMaker threads sometimes die for no apparent reason. I see messages in the logs stating that MirrorMaker was shut down successfully, even though we haven't stopped or restarted it.
d- Sometimes when we use the consumer group script to check the consumption lag, we see the list of topics and their consumers, but in some cases the topics show no consumers. What we do then is stop MirrorMaker, remove the consumer group, and start MirrorMaker again, and that seems to fix it.

If you can provide any suggestions, that would be great; any tool you suggest that we can use to check, monitor, or troubleshoot this behavior would be great as well.

Listed below are the current configs:

{noformat}
###
### This file is managed by Puppet.
###
# See http://kafka.apache.org/documentation.html#brokerconfigs for default values.
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=31
# The port the socket server listens on
port=9092
# A comma separated list of directories under which to store log files
log.dirs=/kafka1/datalog,/kafka2/datalog,/kafka3/datalog,/kafka4/datalog,/kafka5/datalog,/kafka6/datalog,/kafka7/datalog,/kafka8/datalog,/kafka9/datalog,/kafka10/datalog
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=zookeeper1-repl:2181,zookeeper2-repl:2181,zookeeper3-repl:2181,zookeeper4-repl:2181,zookeeper5-repl:2181/replication/kafka
# Additional configuration options may follow here
auto.leader.rebalance.enable=true
delete.topic.enable=true
socket.receive.buffer.bytes=1048576
socket.send.buffer.bytes=1048576
default.replication.factor=2
auto.create.topics.enable=true
num.partitions=1
num.network.threads=8
num.io.threads=40
log.retention.hours=1
log.roll.hours=1
num.replica.fetchers=8
zookeeper.connection.timeout.ms=3
zookeeper.session.timeout.ms=3
inter.broker.protocol.version=0.10.2
log.message.format.version=0.8.2
{noformat}

Producer
{noformat}
# Producer
# sjc2
bootstrap.servers=app454.sjc2.com:9092,app455.sjc2.com:9092,app456.sjc2.com:9092,app457.sjc2.com:9092,app458.sjc2.com:9092,app459.sjc2.com:9092
# Producer Configurations
acks=0
buffer.memory=67108864
compression.type=gzip
linger.ms=10
reconnect.backoff.ms=100
request.timeout.ms=12
retry.backoff.ms=1000
{noformat}

Consumer
{noformat}
bootstrap.servers=app043.atl2.com:9092,app044.atl2.com:9092,app045.atl2.com:9092,app046.atl2.com:9092,app047.atl2.com:9092,app048.atl2.com:9092
group.id=MirrorMaker_atl1
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
receive.buffer.bytes=1048576
send.buffer.bytes=1048576
session.timeout.ms=25
key.deserializer=org.apache.kafka.common.serialization.Deserializer
value.deserializer=org.apache.kafka.common.serialization.Deserializer
{noformat}

was (Author: fvegaucr):
[~omkreddy] [~hachikuji] [~huxi_2b]
Just double-checking. I tried this again and found a few things:

a- Once I upgraded the cluster, I attempted to use the new consumer file again for our MirrorMakers, whitelisting the same topics, and I get the same exception.
b- However, I did another test using exactly the same configs as the production topics; the only difference was that I created a single topic to check whether the issue was related to Kafka or to the installed package, and mirrored using all the new files, and it worked just fine. But with the current production topics it doesn't.
c- We have also seen the MirrorMaker threads sometimes die for no apparent reason. I see messages in the logs stating that MirrorMaker was shut down successfully, even though we haven't stopped or restarted them
[jira] [Commented] (KAFKA-5407) Mirrormaker dont start after upgrade
[ https://issues.apache.org/jira/browse/KAFKA-5407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16560099#comment-16560099 ]

Fernando Vega commented on KAFKA-5407:
--

[~omkreddy] [~hachikuji] [~huxi_2b]
Just double-checking. I tried this again and found a few things:

a- Once I upgraded the cluster, I attempted to use the new consumer file again for our MirrorMakers, whitelisting the same topics, and I get the same exception.
b- However, I did another test using exactly the same configs as the production topics; the only difference was that I created a single topic to check whether the issue was related to Kafka or to the installed package, and mirrored using all the new files, and it worked just fine. But with the current production topics it doesn't.
c- We have also seen the MirrorMaker threads sometimes die for no apparent reason. I see messages in the logs stating that MirrorMaker was shut down successfully, even though we haven't stopped or restarted it.
d- Sometimes when we use the consumer group script to check the consumption LAG, we see the list of topics and their consumers, but in some cases the topics show no consumers. What we do then is stop MirrorMaker, remove the consumer group, and start MirrorMaker again, and that seems to fix it.

If you can provide any suggestions, that would be great; any tool you suggest that we can use to check, monitor, or troubleshoot this behavior would be great as well.
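For the lag checks in (d): the describe output of `kafka-consumer-groups.sh` reports per-partition columns such as CURRENT-OFFSET, LOG-END-OFFSET and LAG, where lag is the log-end offset minus the group's committed offset. If those offsets are collected into a custom monitor, the arithmetic is as follows. `LagSketch`, the string partition keys and the `-1` marker for "no committed offset" are assumptions for illustration only.

```java
import java.util.Map;
import java.util.TreeMap;

class LagSketch {
    // Lag per partition = log-end offset - committed offset.
    // Partitions with no committed offset are reported as -1 (hypothetical convention),
    // which is also a quick way to spot partitions nobody is consuming.
    static Map<String, Long> lagPerPartition(Map<String, Long> logEndOffsets,
                                             Map<String, Long> committedOffsets) {
        Map<String, Long> lag = new TreeMap<>(); // sorted for stable output
        for (Map.Entry<String, Long> e : logEndOffsets.entrySet()) {
            Long committed = committedOffsets.get(e.getKey());
            lag.put(e.getKey(), committed == null ? -1L : e.getValue() - committed);
        }
        return lag;
    }
}
```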
> Mirrormaker dont start after upgrade
>
>
> Key: KAFKA-5407
> URL: https://issues.apache.org/jira/browse/KAFKA-5407
> Project: Kafka
> Issue Type: Bug
> Components: consumer
>Affects Versions: 0.10.2.1
> Environment: Operating system
> CentOS 6.8
> HW
> Board Mfg : HP
> Board Product : ProLiant DL380p Gen8
> CPUs x2
> Product Manufacturer : Intel
> Product Name : Intel(R) Xeon(R) CPU E5-2660 v2 @ 2.20GHz
> Memory Type : DDR3 SDRAM
> SDRAM Capacity: 2048 MB
> Total Memory: 64GB
> Hard drive size and layout:
> 9 drives using JBOD
> drive size 3.6TB each
>Reporter: Fernando Vega
>Priority: Critical
> Attachments: broker.hkg1.new, debug.hkg1.new,
> mirrormaker-repl-sjc2-to-hkg1.log.8
>
>
> Currently I'm upgrading the cluster from 0.8.2-beta to 0.10.2.1,
> so I followed the rolling procedure.
> Here are the config files:
> Consumer
> {noformat}
> #
> # Cluster: repl
> # Topic list(goes into command line):
> REPL-ams1-global,REPL-atl1-global,REPL-sjc2-global,REPL-ams1-global-PN_HXIDMAP_.*,REPL-atl1-global-PN_HXIDMAP_.*,REPL-sjc2-global-PN_HXIDMAP_.*,REPL-ams1-global-PN_HXCONTEXTUALV2_.*,REPL-atl1-global-PN_HXCONTEXTUALV2_.*,REPL-sjc2-global-PN_HXCONTEXTUALV2_.*
> bootstrap.servers=app001:9092,app002:9092,app003:9092,app004:9092
> group.id=hkg1_cluster
> auto.commit.interval.ms=6
> partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
> {noformat}
> Producer
> {noformat}
> hkg1
> # # Producer
> # # hkg1
> bootstrap.servers=app001:9092,app002:9092,app003:9092,app004:9092
> compression.type=gzip
> acks=0
> {noformat}
> Broker
> {noformat}
> auto.leader.rebalance.enable=true
> delete.topic.enable=true
> socket.receive.buffer.bytes=1048576
> socket.send.buffer.bytes=1048576
> default.replication.factor=2
> auto.create.topics.enable=true
> num.partitions=1
> num.network.threads=8
> num.io.threads=40
> log.retention.hours=1
> log.roll.hours=1
> num.replica.fetchers=8
> zookeeper.connection.timeout.ms=3
> zookeeper.session.timeout.ms=3
> inter.broker.protocol.version=0.10.2
> log.message.format.version=0.8.2
> {noformat}
> I also tried using the stock configuration, with no luck.
> The error that I get is this:
> {noformat}
> 2017-06-07 12:24:45,476] INFO ConsumerConfig values:
> auto.commit.interval.ms = 6
> auto.offset.reset = latest
> bootstrap.servers = [app454.sjc2.mytest.com:9092, app455.sjc2.mytest.com:9092, app456.sjc2.mytest.com:9092, app457.sjc2.mytest.com:9092, app458.sjc2.mytest.com:9092, app459.sjc2.mytest.com:9092]
> check.crcs = true
> client.id = MirrorMaker_hkg1-1
> connections.max.idle.ms = 54
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = MirrorMaker_hkg1
> heartbeat.interval.ms = 3000
> interceptor.classes = null
> key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 30
> max.poll.records = 500
> metadata.max.age.ms = 30
>
[jira] [Commented] (KAFKA-7209) Kafka stream does not rebalance when one node gets down
[ https://issues.apache.org/jira/browse/KAFKA-7209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559935#comment-16559935 ]

Yogesh BG commented on KAFKA-7209:
--

Hi, can you suggest anything I am missing? Our product release is blocked by this bug. Is there any way I can safely clean up the Kafka Streams instances and restart them with the same application.id? Some data loss during this process is acceptable. Alternatively, a confirmation that this is a bug in the Streams app would help me decide on an alternative restart process.
[jira] [Created] (KAFKA-7213) NullPointerException during state restoration in kafka streams
Abhishek Agarwal created KAFKA-7213:
---

Summary: NullPointerException during state restoration in kafka streams
Key: KAFKA-7213
URL: https://issues.apache.org/jira/browse/KAFKA-7213
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 1.0.0
Reporter: Abhishek Agarwal
Assignee: Abhishek Agarwal

I had written a custom state store which has a batch restoration callback registered. What I have observed is that when multiple consumer instances are restarted, the application keeps failing with a NullPointerException. The stack trace is:

{noformat}
java.lang.NullPointerException: null
at org.apache.kafka.streams.state.internals.RocksDBStore.putAll(RocksDBStore.java:351) ~[kafka-streams-1.0.0.jar:?]
at org.apache.kafka.streams.state.internals.RocksDBSlotKeyValueBytesStore.putAll(RocksDBSlotKeyValueBytesStore.java:100) ~[streams-core-1.0.0.297.jar:?]
at org.apache.kafka.streams.state.internals.RocksDBSlotKeyValueBytesStore$SlotKeyValueBatchRestoreCallback.restoreAll(RocksDBSlotKeyValueBytesStore.java:303) ~[streams-core-1.0.0.297.jar:?]
at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreAll(CompositeRestoreListener.java:89) ~[kafka-streams-1.0.0.jar:?]
at org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:75) ~[kafka-streams-1.0.0.jar:?]
at org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:277) ~[kafka-streams-1.0.0.jar:?]
at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restorePartition(StoreChangelogReader.java:238) ~[kafka-streams-1.0.0.jar:?]
at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:83) ~[kafka-streams-1.0.0.jar:?]
at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:263) ~[kafka-streams-1.0.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:803) ~[kafka-streams-1.0.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774) ~[kafka-streams-1.0.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744) ~[kafka-streams-1.0.0.jar:?]
{noformat}

The faulty line in question is

{noformat}
db.write(wOptions, batch);
{noformat}

in RocksDBStore.java, which would mean that the db variable is null. Probably the store has been closed while restoration is still being done on it. After going through the code, I think the problem occurs when the state transitions from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED while restoration is still in progress. In such a state transition, while the active tasks themselves are closed, the changelog reader is not reset. It tries to restore the tasks that have already been closed, db is null, and this results in an NPE.

I will put in a fix to see if that fixes the issue.
[jira] [Commented] (KAFKA-7209) Kafka stream does not rebalance when one node gets down
[ https://issues.apache.org/jira/browse/KAFKA-7209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559492#comment-16559492 ]

Yogesh BG commented on KAFKA-7209:
--

I tried setting these configurations, but no luck:

{{conf.put("retries",Integer.MAX_VALUE);}}
{{conf.put("rebalance.max.retries",Integer.MAX_VALUE);}}
{{conf.put("zookeeper.session.timeout.ms",1000);}}
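A hedged note on the settings above: `rebalance.max.retries` and `zookeeper.session.timeout.ms` are, as far as we can tell, keys of the old ZooKeeper-based consumer (the latter also a broker setting); the Java consumer that Kafka Streams uses does not define them, so they would most likely just be reported as unused. A setting that does seem relevant when a broker goes down is the Streams `replication.factor`, which controls the replication of the internal changelog and repartition topics Streams creates and defaults to 1 in 0.11. A sketch using plain string keys so it compiles without the Kafka jars; the application ID and bootstrap servers are hypothetical placeholders.

```java
import java.util.Properties;

class StreamsConfigSketch {
    // Builds Streams properties with plain string keys (no Kafka dependency needed here).
    static Properties build() {
        Properties conf = new Properties();
        conf.put("application.id", "my-streams-app");                     // hypothetical
        conf.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092"); // hypothetical
        // Producer retries, as in the comment above.
        conf.put("retries", Integer.MAX_VALUE);
        // Replication of the internal changelog/repartition topics; with 1, partitions
        // hosted on a downed broker have no other replica to take over as leader.
        conf.put("replication.factor", 3);
        // "rebalance.max.retries" and "zookeeper.session.timeout.ms" are deliberately
        // omitted: they belong to the old ZooKeeper-based consumer, not the Java consumer.
        return conf;
    }
}
```

Internal topics that already exist keep their current replication factor; a changed setting is assumed to apply only to topics Streams creates afterwards.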
[jira] [Created] (KAFKA-7212) Bad exception message on failed serialization
D T created KAFKA-7212:
--

Summary: Bad exception message on failed serialization
Key: KAFKA-7212
URL: https://issues.apache.org/jira/browse/KAFKA-7212
Project: Kafka
Issue Type: Bug
Components: clients, producer
Affects Versions: 1.1.1, 1.0.1
Reporter: D T

I use Spring-Kafka to connect to a Kafka server. While trying to use Spring's MessageConverter, I encountered strange error messages that did not make any sense to me.

{noformat}
org.apache.kafka.common.errors.SerializationException: Can't convert value of class org.springframework.messaging.support.GenericMessage to class org.apache.kafka.common.serialization.ByteArraySerializer specified in value.serializer
Caused by: java.lang.ClassCastException: org.springframework.messaging.support.GenericMessage cannot be cast to [B
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65)
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:791)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:768)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:285)
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:349)
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:182)
{noformat}

My question was: why would Kafka try to cast or convert Spring's GenericMessage to Kafka's ByteArraySerializer? After quite some time trying various config options, I debugged the code and found that the exception message itself was misleading: the failing cast is to byte[] ("[B" in the ClassCastException is the JVM name for byte[]) inside ByteArraySerializer.serialize, not to the serializer class.

The message should be something like:

{noformat}
Can't convert value of class org.springframework.messaging.support.GenericMessage to byte[] in class org.apache.kafka.common.serialization.ByteArraySerializer specified in value.serializer
{noformat}

The issue is caused by these lines: [https://github.com/apache/kafka/blob/1.1.1/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L801] and [https://github.com/apache/kafka/blob/1.1.1/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L809]
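The proposed wording can be sketched without the Kafka jars. Below, `ByteArraySerializerStandIn` is a hypothetical stand-in that fails the same way as `ByteArraySerializer.serialize` (by casting the value to `byte[]`), and `conversionFailureMessage` is a hypothetical helper that builds the suggested message; neither is Kafka's API. The target type `byte[]` is passed in explicitly because the sketch does not try to recover it from the serializer's generic parameter.

```java
public class SerializationMessageSketch {

    // Hypothetical stand-in: like ByteArraySerializer, it simply casts the value to byte[].
    public static final class ByteArraySerializerStandIn {
        public byte[] serialize(Object value) {
            return (byte[]) value; // throws ClassCastException for anything but byte[]
        }
    }

    // Hypothetical helper producing the wording proposed in KAFKA-7212.
    public static String conversionFailureMessage(Object value, String targetType,
                                                  Class<?> serializerClass, String configKey) {
        return "Can't convert value of class " + value.getClass().getName()
                + " to " + targetType
                + " in class " + serializerClass.getName()
                + " specified in " + configKey;
    }

    // Returns "ok" if serialization succeeds, otherwise the clearer message.
    public static String trySerialize(Object value) {
        ByteArraySerializerStandIn serializer = new ByteArraySerializerStandIn();
        try {
            serializer.serialize(value);
            return "ok";
        } catch (ClassCastException cce) {
            return conversionFailureMessage(value, "byte[]", serializer.getClass(), "value.serializer");
        }
    }
}
```

In the real producer the message would be wrapped in a SerializationException with the ClassCastException as its cause, as the stack trace in this report shows; the sketch returns it as a String only to keep it easy to check.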
[jira] [Commented] (KAFKA-6690) Priorities for Source Topics
[ https://issues.apache.org/jira/browse/KAFKA-6690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559356#comment-16559356 ]

Bala Prassanna I commented on KAFKA-6690:
-

[~guozhang] We would need this in both Consumer API and also in Streams API

> Priorities for Source Topics
>
> Key: KAFKA-6690
> URL: https://issues.apache.org/jira/browse/KAFKA-6690
> Project: Kafka
> Issue Type: New Feature
> Components: streams
> Reporter: Bala Prassanna I
> Assignee: Nick Afshartous
> Priority: Major
>
> We often encounter use cases where we need to prioritise source topics. If a
> consumer is listening to more than one topic, say, HighPriorityTopic and
> LowPriorityTopic, it should consume events from LowPriorityTopic only when
> all the events from HighPriorityTopic are consumed. This is needed in Kafka
> Streams processor topologies as well.
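The requested semantics can be illustrated without a broker. This sketch uses two in-memory queues as hypothetical stand-ins for HighPriorityTopic and LowPriorityTopic and fills each batch from the low-priority queue only once the high-priority queue is drained. It is not an existing Kafka feature; with the real KafkaConsumer, a similar effect might be approximated by calling pause() on the low-priority partitions while high-priority records are still arriving and resume() afterwards.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class PriorityPollSketch {

    // Returns up to maxRecords events. Events from `low` are taken only after
    // `high` is empty, so pending high-priority events always go first.
    public static List<String> poll(Deque<String> high, Deque<String> low, int maxRecords) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < maxRecords && !high.isEmpty()) {
            batch.add(high.poll());
        }
        // Reached with room left in the batch only if `high` was fully drained.
        while (batch.size() < maxRecords && !low.isEmpty()) {
            batch.add(low.poll());
        }
        return batch;
    }
}
```

Draining per batch keeps the sketch simple; a real implementation would also have to decide how long to wait for new high-priority data before serving the low-priority topic.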
[jira] [Comment Edited] (KAFKA-6690) Priorities for Source Topics
[ https://issues.apache.org/jira/browse/KAFKA-6690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559356#comment-16559356 ]

Bala Prassanna I edited comment on KAFKA-6690 at 7/27/18 7:08 AM:
--

[~guozhang] We would need this in both Consumer API and Streams API

was (Author: balaprassanna):
[~guozhang] We would need this in both Consumer API and also in Streams API
[jira] [Commented] (KAFKA-7134) KafkaLog4jAppender - Appender exceptions are propagated to caller
[ https://issues.apache.org/jira/browse/KAFKA-7134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559353#comment-16559353 ]

Andras Katona commented on KAFKA-7134:
--

When using the Kafka appender, logging from the org.apache.kafka.* packages should be disabled.

> KafkaLog4jAppender - Appender exceptions are propagated to caller
> -
>
> Key: KAFKA-7134
> URL: https://issues.apache.org/jira/browse/KAFKA-7134
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Reporter: venkata praveen
> Assignee: Andras Katona
> Priority: Major
>
> KafkaLog4jAppender exceptions are propagated to the caller when Kafka is
> down, slow, or otherwise failing, which may crash the application. Ideally, the
> appender should print and ignore the exception,
> or it should provide an option to ignore or throw the exceptions, like the
> 'ignoreExceptions' property of
> https://logging.apache.org/log4j/2.x/manual/appenders.html#KafkaAppender
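Both suggestions on this ticket, ignoring appender failures and not forwarding the Kafka client's own log events, can be sketched as a wrapper around whatever actually sends the event. `Appender` and `GuardedAppender` below are hypothetical, library-free types, not the log4j or KafkaLog4jAppender API; the `ignoreExceptions` flag only mimics the intent of the log4j 2 property linked in the issue.

```java
import java.util.ArrayList;
import java.util.List;

public class SafeAppenderSketch {

    // Hypothetical minimal appender contract (not the log4j API).
    public interface Appender {
        void append(String loggerName, String message);
    }

    public static final class GuardedAppender implements Appender {
        private final Appender delegate;          // e.g. something that sends to Kafka
        private final boolean ignoreExceptions;   // mimics the intent of log4j 2's ignoreExceptions
        private final List<RuntimeException> suppressed = new ArrayList<>();

        public GuardedAppender(Appender delegate, boolean ignoreExceptions) {
            this.delegate = delegate;
            this.ignoreExceptions = ignoreExceptions;
        }

        @Override
        public void append(String loggerName, String message) {
            // Drop events from the Kafka client's own loggers so the producer's
            // logging is not routed back into the appender that uses it.
            if (loggerName.startsWith("org.apache.kafka.")) {
                return;
            }
            try {
                delegate.append(loggerName, message);
            } catch (RuntimeException e) {
                if (!ignoreExceptions) {
                    throw e; // behavior described in the issue: propagate to the caller
                }
                suppressed.add(e);
                System.err.println("Log event dropped, appender failed: " + e);
            }
        }

        public int suppressedCount() {
            return suppressed.size();
        }
    }
}
```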