[jira] [Created] (KAFKA-7434) DeadLetterQueueReporter throws NPE if transform throws NPE
Michal Borowiecki created KAFKA-7434:

Summary: DeadLetterQueueReporter throws NPE if transform throws NPE
Key: KAFKA-7434
URL: https://issues.apache.org/jira/browse/KAFKA-7434
Project: Kafka
Issue Type: Bug
Components: KafkaConnect
Affects Versions: 2.0.0
Environment: jdk 8
Reporter: Michal Borowiecki

An NPE thrown from a transform in a connector configured with errors.deadletterqueue.context.headers.enable=true causes DeadLetterQueueReporter to break with an NPE.

{quote}
Executing stage 'TRANSFORMATION' with class 'org.apache.kafka.connect.transforms.Flatten$Value', where consumed record is {topic='', partition=1, offset=0, timestamp=1537370573366, timestampType=CreateTime}. (org.apache.kafka.connect.runtime.errors.LogReporter)
java.lang.NullPointerException
Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
java.lang.NullPointerException
	at org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.toBytes(DeadLetterQueueReporter.java:202)
	at org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.populateContextHeaders(DeadLetterQueueReporter.java:172)
	at org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.report(DeadLetterQueueReporter.java:146)
	at org.apache.kafka.connect.runtime.errors.ProcessingContext.report(ProcessingContext.java:137)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:108)
	at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:44)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:532)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
{quote}

This is caused by populateContextHeaders checking only that the Throwable is not null, but not checking that the Throwable's message is non-null before trying to serialize it: https://github.com/apache/kafka/blob/cfd33b313c9856ae2b4b45ed3d4aac41d6ef5a6b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java#L170-L177

{code}
if (context.error() != null) {
    headers.add(ERROR_HEADER_EXCEPTION, toBytes(context.error().getClass().getName()));
    headers.add(ERROR_HEADER_EXCEPTION_MESSAGE, toBytes(context.error().getMessage()));
{code}

toBytes throws an NPE if passed null as the parameter. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
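The failure mode and the obvious guard can be shown with a standalone sketch (plain Java with simplified, hypothetical names; this is not the actual Connect source): `toBytes` dereferences its argument, so the message header must only be written when `getMessage()` is non-null.

```java
import java.nio.charset.StandardCharsets;

public class DlqHeaderSketch {
    // Stands in for DeadLetterQueueReporter.toBytes: dereferences its argument,
    // so it throws NPE when given null.
    static byte[] toBytes(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Reported behaviour: serializes the message unconditionally and blows up
    // when the Throwable has no message (e.g. a bare NullPointerException).
    static byte[] messageHeaderUnsafe(Throwable error) {
        return toBytes(error.getMessage());
    }

    // Guarded behaviour: skip the header (modelled here as returning null)
    // when the Throwable carries no message.
    static byte[] messageHeaderSafe(Throwable error) {
        String message = error.getMessage();
        return message == null ? null : toBytes(message);
    }
}
```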
[jira] [Created] (KAFKA-5677) Remove deprecated punctuate method
Michal Borowiecki created KAFKA-5677:

Summary: Remove deprecated punctuate method
Key: KAFKA-5677
URL: https://issues.apache.org/jira/browse/KAFKA-5677
Project: Kafka
Issue Type: Task
Reporter: Michal Borowiecki

Task to track the removal of the punctuate method deprecated in KAFKA-5233, along with its associated unit tests. (not sure of the fix version number at this point) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5245) KStream builder should capture serdes
[ https://issues.apache.org/jira/browse/KAFKA-5245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16047608#comment-16047608 ] Michal Borowiecki commented on KAFKA-5245: -- Just wanted to say it's great to see there's a ticket for this :-) Always found it counter-intuitive that the default serdes are taken from config instead of upstream in these cases. > KStream builder should capture serdes > -- > > Key: KAFKA-5245 > URL: https://issues.apache.org/jira/browse/KAFKA-5245 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.2.0, 0.10.2.1 >Reporter: Yeva Byzek >Assignee: anugrah >Priority: Minor > Labels: beginner, newbie > > Even if one specifies a serdes in `builder.stream`, later a call to > `groupByKey` may require the serdes again if it differs from the configured > streams app serdes. The preferred behavior is that if no serdes is provided > to `groupByKey`, it should use whatever was provided in `builder.stream` and > not what was in the app. > From the current docs: > “When to set explicit serdes: Variants of groupByKey exist to override the > configured default serdes of your application, which you must do if the key > and/or value types of the resulting KGroupedStream do not match the > configured default serdes.” -- This message was sent by Atlassian JIRA (v6.4.14#64029)
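The defaulting behaviour argued for above can be modelled as a three-step resolution (a toy sketch with plain strings standing in for serde instances; names are illustrative, not the Streams API):

```java
public class SerdeResolutionSketch {
    // Resolution order preferred in the ticket: an explicit serde passed to
    // the operation (e.g. groupByKey) wins; otherwise the serde captured
    // upstream (e.g. the one given to builder.stream); only then the
    // application-wide default from config.
    static String resolveSerde(String explicit, String upstream, String configDefault) {
        if (explicit != null) return explicit;
        if (upstream != null) return upstream;
        return configDefault;
    }
}
```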
[jira] [Comment Edited] (KAFKA-5419) Console consumer --key-deserializer and --value-deserializer are always overwritten by ByteArrayDeserializer
[ https://issues.apache.org/jira/browse/KAFKA-5419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16044557#comment-16044557 ] Michal Borowiecki edited comment on KAFKA-5419 at 6/9/17 3:42 PM: -- I think it's a duplicate of KAFKA-2526 and KAFKA-5149 was (Author: mihbor): I think it's a duplicate of KAFKA-5149 > Console consumer --key-deserializer and --value-deserializer are always > overwritten by ByteArrayDeserializer > > > Key: KAFKA-5419 > URL: https://issues.apache.org/jira/browse/KAFKA-5419 > Project: Kafka > Issue Type: Bug > Components: tools >Reporter: Paolo Patierno >Priority: Minor > > Hi, > the --key-deserializer and --value-deserializer options passed to the > command line are always overwritten here : > {code} > props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, > "org.apache.kafka.common.serialization.ByteArrayDeserializer") > props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, > "org.apache.kafka.common.serialization.ByteArrayDeserializer") > {code} > Thanks, > Paolo. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (KAFKA-5419) Console consumer --key-deserializer and --value-deserializer are always overwritten by ByteArrayDeserializer
[ https://issues.apache.org/jira/browse/KAFKA-5419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16044557#comment-16044557 ] Michal Borowiecki edited comment on KAFKA-5419 at 6/9/17 3:41 PM: -- I think it's a duplicate of KAFKA-5149 was (Author: mihbor): I think it's a duplicate of KAFKA-2526 > Console consumer --key-deserializer and --value-deserializer are always > overwritten by ByteArrayDeserializer > > > Key: KAFKA-5419 > URL: https://issues.apache.org/jira/browse/KAFKA-5419 > Project: Kafka > Issue Type: Bug > Components: tools >Reporter: Paolo Patierno >Priority: Minor > > Hi, > the --key-deserializer and --value-deserializer options passed to the > command line are always overwritten here : > {code} > props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, > "org.apache.kafka.common.serialization.ByteArrayDeserializer") > props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, > "org.apache.kafka.common.serialization.ByteArrayDeserializer") > {code} > Thanks, > Paolo. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-5419) Console consumer --key-deserializer and --value-deserializer are always overwritten by ByteArrayDeserializer
[ https://issues.apache.org/jira/browse/KAFKA-5419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16044557#comment-16044557 ] Michal Borowiecki commented on KAFKA-5419: -- I think it's a duplicate of KAFKA-2526 > Console consumer --key-deserializer and --value-deserializer are always > overwritten by ByteArrayDeserializer > > > Key: KAFKA-5419 > URL: https://issues.apache.org/jira/browse/KAFKA-5419 > Project: Kafka > Issue Type: Bug > Components: tools >Reporter: Paolo Patierno >Priority: Minor > > Hi, > the --key-deserializer and --value-deserializer options passed to the > command line are always overwritten here : > {code} > props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, > "org.apache.kafka.common.serialization.ByteArrayDeserializer") > props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, > "org.apache.kafka.common.serialization.ByteArrayDeserializer") > {code} > Thanks, > Paolo. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
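The fix direction is simply to treat ByteArrayDeserializer as a fallback rather than an unconditional overwrite. A standalone Java sketch (the real tool is Scala; method names here are hypothetical):

```java
import java.util.Properties;

public class ConsumerPropsSketch {
    static final String KEY_DESERIALIZER = "key.deserializer";
    static final String BYTE_ARRAY =
        "org.apache.kafka.common.serialization.ByteArrayDeserializer";

    // Buggy behaviour from the report: put(...) clobbers whatever the user
    // passed via --key-deserializer.
    static String effectiveDeserializerBuggy(Properties userProps) {
        Properties props = new Properties();
        props.putAll(userProps);
        props.put(KEY_DESERIALIZER, BYTE_ARRAY);
        return props.getProperty(KEY_DESERIALIZER);
    }

    // Fixed behaviour: only fall back to the default when the user set nothing.
    static String effectiveDeserializerFixed(Properties userProps) {
        Properties props = new Properties();
        props.putAll(userProps);
        props.putIfAbsent(KEY_DESERIALIZER, BYTE_ARRAY);
        return props.getProperty(KEY_DESERIALIZER);
    }
}
```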
[jira] [Commented] (KAFKA-5246) Remove backdoor that allows any client to produce to internal topics
[ https://issues.apache.org/jira/browse/KAFKA-5246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16044179#comment-16044179 ] Michal Borowiecki commented on KAFKA-5246: -- Would it instead perhaps make sense to document this special client id, together with its use-cases listed in the PR comments? > Remove backdoor that allows any client to produce to internal topics > - > > Key: KAFKA-5246 > URL: https://issues.apache.org/jira/browse/KAFKA-5246 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1, 0.10.2.0, > 0.10.2.1 >Reporter: Andy Coates >Assignee: Andy Coates >Priority: Minor > > kafka.admin.AdminUtils defines an ‘AdminClientId' val, which looks to be > unused in the code, with the exception of a single use in KafkaApis.scala in > handleProducerRequest, where it looks to allow any client, using the special > ‘__admin_client' client id, to append to internal topics. > This looks like a security risk to me, as it would allow any client to > produce either rogue offsets or even a record containing something other than > group/offset info. > Can we remove this please? -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-5155) Messages can be deleted prematurely when some producers use timestamps and some not
[ https://issues.apache.org/jira/browse/KAFKA-5155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027521#comment-16027521 ] Michal Borowiecki commented on KAFKA-5155: -- Hi [~plavjanik], do you care to submit a pull request with the test and the fix? > Messages can be deleted prematurely when some producers use timestamps and > some not > --- > > Key: KAFKA-5155 > URL: https://issues.apache.org/jira/browse/KAFKA-5155 > Project: Kafka > Issue Type: Bug > Components: log >Affects Versions: 0.10.2.0 >Reporter: Petr Plavjaník > > Some messages can be deleted prematurely and never read in the following > scenario. A producer uses timestamps and produces messages that are appended > to the beginning of a log segment. Another producer produces messages without a > timestamp. In that case the largest timestamp comes from the old messages > that carry a timestamp, the new messages without a timestamp do not influence it, and > the log segment containing both old and new messages can be deleted immediately after the > last new message with no timestamp is appended. When all appended messages > have no timestamp, they are not deleted, because the {{lastModified}} > attribute of the {{LogSegment}} is used. 
> New test case to {{kafka.log.LogTest}} that fails: > {code} > @Test > def > shouldNotDeleteTimeBasedSegmentsWhenTimestampIsNotProvidedForSomeMessages() { > val retentionMs = 1000 > val old = TestUtils.singletonRecords("test".getBytes, timestamp = 0) > val set = TestUtils.singletonRecords("test".getBytes, timestamp = -1, > magicValue = 0) > val log = createLog(set.sizeInBytes, retentionMs = retentionMs) > // append some messages to create some segments > log.append(old) > for (_ <- 0 until 12) > log.append(set) > assertEquals("No segment should be deleted", 0, log.deleteOldSegments()) > } > {code} > It can be prevented by using {{def largestTimestamp = > Math.max(maxTimestampSoFar, lastModified)}} in LogSegment, or by using > current timestamp when messages with timestamp {{-1}} are appended. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
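The proposed one-line fix can be checked in isolation (a Java sketch of the retention decision, not the actual LogSegment code; names are illustrative):

```java
public class SegmentRetentionSketch {
    // Proposed fix from the report: a segment's effective timestamp is the
    // later of the largest record timestamp seen and the file's mtime, so a
    // freshly written segment can never look ancient just because an old
    // timestamped record landed at its start.
    static long largestTimestamp(long maxTimestampSoFar, long lastModifiedMs) {
        return Math.max(maxTimestampSoFar, lastModifiedMs);
    }

    // Retention decision: delete only once the effective timestamp has
    // fallen out of the retention window.
    static boolean shouldDelete(long nowMs, long retentionMs,
                                long maxTimestampSoFar, long lastModifiedMs) {
        return nowMs - largestTimestamp(maxTimestampSoFar, lastModifiedMs) > retentionMs;
    }
}
```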
[jira] [Commented] (KAFKA-5319) Add a tool to make cluster replica and leader balance
[ https://issues.apache.org/jira/browse/KAFKA-5319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16022695#comment-16022695 ] Michal Borowiecki commented on KAFKA-5319: -- [~markTC], shouldn't this be in "Patch Available" instead of "Resolved" status until the PR is merged? > Add a tool to make cluster replica and leader balance > - > > Key: KAFKA-5319 > URL: https://issues.apache.org/jira/browse/KAFKA-5319 > Project: Kafka > Issue Type: Improvement > Components: admin >Affects Versions: 0.10.2.1 >Reporter: Ma Tianchi > Attachments: ClusterBalanceCommand.scala > > > When a new broker is added to the cluster, it holds no topics. When we use the console command to create a topic without > 'replicaAssignmentOpt', Kafka uses AdminUtils.assignReplicasToBrokers to get the > replica assignment. Even if that is balanced at creation time, as more and more brokers are > added to the cluster the replica balance degrades. We can also use 'kafka-reassign-partitions.sh' > to rebalance, but the large number of topics makes that hard. And at topic creation time, > Kafka chooses a PreferredReplicaLeader, placed at the first position of the AR, to keep > leaders balanced. But that balance is destroyed when the cluster changes. Using 'kafka-reassign-partitions.sh' for > partition reassignment may also destroy the leader balance, because the user > can change the AR of the partition. The cluster may then not be balanced, yet Kafka > believes leader balance is fine as long as every leader is the first one in the AR. > So we created a tool that balances the number of replicas and the number of leaders on > every broker. It uses an algorithm to compute a balanced replica and > leader reassignment, then uses ReassignPartitionsCommand to actually rebalance the > cluster. 
> It can be used to rebalance when brokers are added to the cluster or the cluster is otherwise not > balanced. It does not deal with moving replicas of a dead broker to an alive > broker; it only balances replicas and leaders across alive brokers. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-5243) Request to add row limit in ReadOnlyKeyValueStore range function
[ https://issues.apache.org/jira/browse/KAFKA-5243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16020911#comment-16020911 ] Michal Borowiecki commented on KAFKA-5243: -- Just a note, replacing the second argument is not an option IMO, as it would clash with the current range(K from, K to) method. K is a type parameter that itself could be an int, making the two indistinguishable, I think. Secondly, the existing range() and all() methods expressly do not guarantee ordering of the returned iterator. I think the new range(from, to, limit) method would only make sense if order in the returned iterator is consistent across invocations. This is probably not a problem for the built-in stores, but given these stores are meant to be pluggable, perhaps it would be better to not force other store implementations to take on those guarantees? Instead a new interface with stronger guarantees could be added, e.g. ReadOnlyOrderedKeyValueStore extending ReadOnlyKeyValueStore and adding this extra method. It could also add the consistent ordering promise on the inherited range(from, to) and all() methods. Just a thought. Probably best to raise a KIP and discuss on the mailing list. Since this is a public API change a KIP is required anyway: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals > Request to add row limit in ReadOnlyKeyValueStore range function > > > Key: KAFKA-5243 > URL: https://issues.apache.org/jira/browse/KAFKA-5243 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.1.1 >Reporter: Joe Wood > > When using distributed queries across a cluster of stream stores it's quite > common to use query pagination to limit the number of rows returned. The > {{range}} function on {{ReadOnlyKeyValueStore}} only accepts the {{to}} and > {{from}} keys. 
This means that the query either > unnecessarily retrieves the entire range and manually limits the rows, or estimates the > range based on the key values. Neither option is ideal for processing > distributed queries. > This suggestion is to add an overload to the {{range}} function by adding a > third (or replacement second) argument as a suggested row limit count. This > means that the range of keys returned will not exceed the supplied count. > {code:java} > // Get an iterator over a given range of keys, limiting to limit elements. > KeyValueIterator<K, V> range(K from, K to, int limit) > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
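The proposed overload can be modelled on an ordered in-memory map (a toy sketch; a NavigableMap stands in for an ordered store and a plain list for KeyValueIterator, to keep it self-contained):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;

public class RangeLimitSketch {
    // Sketch of range(K from, K to, int limit): iterate the inclusive key
    // range in order and stop after limit entries, instead of materialising
    // the whole range and trimming it client-side.
    static <K, V> List<Map.Entry<K, V>> range(NavigableMap<K, V> store,
                                              K from, K to, int limit) {
        List<Map.Entry<K, V>> page = new ArrayList<>();
        for (Map.Entry<K, V> entry : store.subMap(from, true, to, true).entrySet()) {
            if (page.size() >= limit) break; // early exit is the point of the overload
            page.add(entry);
        }
        return page;
    }
}
```

Note that this only makes sense on a store with consistent iteration order, which is exactly the guarantee the comment above suggests confining to a new interface.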
[jira] [Updated] (KAFKA-5233) Changes to punctuate semantics (KIP-138)
[ https://issues.apache.org/jira/browse/KAFKA-5233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michal Borowiecki updated KAFKA-5233: - Fix Version/s: (was: 0.11.0.0) 0.11.1.0 > Changes to punctuate semantics (KIP-138) > > > Key: KAFKA-5233 > URL: https://issues.apache.org/jira/browse/KAFKA-5233 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Michal Borowiecki >Assignee: Michal Borowiecki > Labels: kip > Fix For: 0.11.1.0 > > > This ticket is to track implementation of > [KIP-138: Change punctuate > semantics|https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics] -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (KAFKA-5233) Changes to punctuate semantics (KIP-138)
[ https://issues.apache.org/jira/browse/KAFKA-5233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michal Borowiecki updated KAFKA-5233: - Status: Patch Available (was: In Progress) Pull request here: https://github.com/apache/kafka/pull/3055 Looking forward to feedback. > Changes to punctuate semantics (KIP-138) > > > Key: KAFKA-5233 > URL: https://issues.apache.org/jira/browse/KAFKA-5233 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Michal Borowiecki >Assignee: Michal Borowiecki > Labels: kip > Fix For: 0.11.0.0 > > > This ticket is to track implementation of > [KIP-138: Change punctuate > semantics|https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics] -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Work started] (KAFKA-5233) Changes to punctuate semantics (KIP-138)
[ https://issues.apache.org/jira/browse/KAFKA-5233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-5233 started by Michal Borowiecki. > Changes to punctuate semantics (KIP-138) > > > Key: KAFKA-5233 > URL: https://issues.apache.org/jira/browse/KAFKA-5233 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Michal Borowiecki >Assignee: Michal Borowiecki > Labels: kip > Fix For: 0.11.0.0 > > > This ticket is to track implementation of > [KIP-138: Change punctuate > semantics|https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics] -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-3455) Connect custom processors with the streams DSL
[ https://issues.apache.org/jira/browse/KAFKA-3455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010530#comment-16010530 ] Michal Borowiecki commented on KAFKA-3455: -- Hi [~bobbycalderwood], Can you please describe your use-case, where it would be useful to re-use Processor/Transformer implementations? As to Transformer.punctuate return value having to be null, the javadoc was in error but has been fixed on trunk (to be released). Changing the method signature of Transformer.punctuate would be a backward-incompatible change, however, [KIP-138|https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics] will deprecate both methods in favour of a functional interface passed to ProcessorContext.schedule(), so it's a small step in the direction you're suggesting. I think AutoCloseable is a false friend in this case. The intention behind AutoCloseable is for objects created in a try-with-resources statement to be closed when execution exits that statement. However, the Processor is being created when you are *defining* the topology and must not be closed from that same block of code, since it's used as long as the topology is actually *running*, which happens on different threads. As to init() and close() I think it would make sense to have them pulled out, however, again due to backwards-compatibility it's not as simple as it sounds. Fortunately, once Java 7 compatibility is dropped, it will be possible to change their definition to a default method with an empty body. I think that would be backwards-compatible. That would leave only one abstract method for Processor and Transformer, process() and transform(), respectively. Since these are actually *different* from each other, I'd say that then there'd be no repetition. Would that help your use-cases? 
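The default-method idea is easy to sketch (a simplified stand-in interface, not the real org.apache.kafka.streams.processor.Processor): with empty default bodies for init() and close(), process() is the only abstract method left, so the interface even becomes functional.

```java
// Hypothetical stand-in for the Processor interface discussed above.
interface SimpleProcessor<K, V> {
    default void init() { }          // optional lifecycle hook
    void process(K key, V value);    // the single remaining abstract method
    default void close() { }         // optional lifecycle hook
}

public class ProcessorDefaultsSketch {
    // Drives the full lifecycle; implementations no longer have to repeat
    // empty init()/close() bodies.
    static void runOnce(SimpleProcessor<String, Integer> p, String key, int value) {
        p.init();
        p.process(key, value);
        p.close();
    }
}
```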
> Connect custom processors with the streams DSL > -- > > Key: KAFKA-3455 > URL: https://issues.apache.org/jira/browse/KAFKA-3455 > Project: Kafka > Issue Type: Sub-task > Components: streams >Affects Versions: 0.10.0.1 >Reporter: Jonathan Bender > Labels: user-experience > Fix For: 0.11.0.0 > > > From the kafka users email thread, we discussed the idea of connecting custom > processors with topologies defined from the Streams DSL (and being able to > sink data from the processor). Possibly this could involve exposing the > underlying processor's name in the streams DSL so it can be connected with > the standard processor API. > {quote} > Thanks for the feedback. This is definitely something we wanted to support > in the Streams DSL. > One tricky thing, though, is that some operations do not translate to a > single processor, but a sub-graph of processors (think of a stream-stream > join, which is translated to actually 5 processors for windowing / state > queries / merging, each with a different internal name). So how to define > the API to return the processor name needs some more thinking. > {quote} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (KAFKA-5233) Changes to punctuate semantics (KIP-138)
[ https://issues.apache.org/jira/browse/KAFKA-5233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michal Borowiecki updated KAFKA-5233: - Labels: kip (was: ) > Changes to punctuate semantics (KIP-138) > > > Key: KAFKA-5233 > URL: https://issues.apache.org/jira/browse/KAFKA-5233 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Michal Borowiecki >Assignee: Michal Borowiecki > Labels: kip > Fix For: 0.11.0.0 > > > This ticket is to track implementation of > [KIP-138: Change punctuate > semantics|https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics] -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts
[ https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009365#comment-16009365 ] Michal Borowiecki commented on KAFKA-3514: -- I've created KAFKA-5233 to track work related to KIP-138. As noted above, the considerations on this ticket span beyond the scope of KIP-138, which is agnostic to how the stream time gets advanced. > Stream timestamp computation needs some further thoughts > > > Key: KAFKA-3514 > URL: https://issues.apache.org/jira/browse/KAFKA-3514 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Assignee: Eno Thereska > Labels: architecture > Fix For: 0.11.0.0 > > > Our current stream task's timestamp is used for punctuate function as well as > selecting which stream to process next (i.e. best effort stream > synchronization). And it is defined as the smallest timestamp over all > partitions in the task's partition group. This results in two unintuitive > corner cases: > 1) observing a late arrived record would keep that stream's timestamp low for > a period of time, and hence keep being process until that late record. For > example take two partitions within the same task annotated by their > timestamps: > {code} > Stream A: 5, 6, 7, 8, 9, 1, 10 > {code} > {code} > Stream B: 2, 3, 4, 5 > {code} > The late arrived record with timestamp "1" will cause stream A to be selected > continuously in the thread loop, i.e. messages with timestamp 5, 6, 7, 8, 9 > until the record itself is dequeued and processed, then stream B will be > selected starting with timestamp 2. > 2) an empty buffered partition will cause its timestamp to be not advanced, > and hence the task timestamp as well since it is the smallest among all > partitions. This may not be a severe problem compared with 1) above though. 
> *Update* > There is one more thing to consider (full discussion found here: > http://search-hadoop.com/m/Kafka/uyzND1iKZJN1yz0E5?subj=Order+of+punctuate+and+process+in+a+stream+processor) > {quote} > Let's assume the following case. > - a stream processor that uses the Processor API > - context.schedule(1000) is called in the init() > - the processor reads only one topic that has one partition > - using custom timestamp extractor, but that timestamp is just a wall > clock time > Imagine the following events: > 1., for 10 seconds I send in 5 messages / second > 2., does not send any messages for 3 seconds > 3., starts the 5 messages / second again > I see that punctuate() is not called during the 3 seconds when I do not > send any messages. This is ok according to the documentation, because > there are not any new messages to trigger the punctuate() call. When the > first few messages arrive after restarting the sending (point 3. above) I > see the following sequence of method calls: > 1., process() on the 1st message > 2., punctuate() is called 3 times > 3., process() on the 2nd message > 4., process() on each following message > What I would expect instead is that punctuate() is called first and then > process() is called on the messages, because the first message's timestamp > is already 3 seconds older than the last punctuate() was called, so the > first message belongs after the 3 punctuate() calls. > {quote} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
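The expected ordering in the quoted example can be stated precisely with a toy stream-time model (illustrative names, not the actual punctuation scheduling code): before processing a record, fire one punctuation per whole interval elapsed since the last one.

```java
public class PunctuateOrderSketch {
    // Number of punctuations due strictly before a record with timestamp
    // recordTs is processed, given a schedule(intervalMs) that last fired
    // at stream time lastPunctuateTs.
    static int punctuationsDue(long lastPunctuateTs, long intervalMs, long recordTs) {
        if (recordTs <= lastPunctuateTs) return 0;
        return (int) ((recordTs - lastPunctuateTs) / intervalMs);
    }
}
```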
[jira] [Created] (KAFKA-5233) Changes to punctuate semantics (KIP-138)
Michal Borowiecki created KAFKA-5233:

Summary: Changes to punctuate semantics (KIP-138)
Key: KAFKA-5233
URL: https://issues.apache.org/jira/browse/KAFKA-5233
Project: Kafka
Issue Type: Improvement
Components: streams
Reporter: Michal Borowiecki
Assignee: Michal Borowiecki
Fix For: 0.11.0.0

This ticket is to track implementation of [KIP-138: Change punctuate semantics|https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics] -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-5201) Compacted topic could be misused to fill up a disk but deletion policy can't retain legitimate keys
[ https://issues.apache.org/jira/browse/KAFKA-5201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16002311#comment-16002311 ] Michal Borowiecki commented on KAFKA-5201: -- Hi Edoardo, From the broker's perspective there is no concept of legitimate vs illegitimate keys AFAIK. What keys are legitimate depends on the use-case, so should the broker really care about that? If this is to prevent a disk being filled up by producers you don't trust to produce legitimate keys, perhaps a quota plus compact,delete cleanup can guard against that? What do you think about that? > Compacted topic could be misused to fill up a disk but deletion policy can't > retain legitimate keys > > > Key: KAFKA-5201 > URL: https://issues.apache.org/jira/browse/KAFKA-5201 > Project: Kafka > Issue Type: Improvement >Reporter: Edoardo Comar > > Misuse of a topic with cleanup policy = compact > could lead to a disk being filled if a misbehaving producer keeps producing > messages with unique keys. > The mixed cleanup policy compact,delete could be adopted, but would not > guarantee that the latest "legitimate" keys will be kept. > It would be desirable to have a cleanup policy that attempts to preserve > messages with 'legitimate' keys > This issue needs a KIP but I have no proposed solution yet at the time of > writing. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
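The mixed-policy guard suggested in the comment would look roughly like this as per-topic configuration (a sketch; the values are examples only, not recommendations):

```properties
# compact, but also delete segments beyond the retention limits, so a flood
# of unique keys cannot grow the log without bound
cleanup.policy=compact,delete
retention.ms=604800000
retention.bytes=1073741824
```

A producer byte-rate quota on untrusted client ids would additionally cap how fast such a flood can arrive in the first place.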
[jira] [Comment Edited] (KAFKA-5155) Messages can be deleted prematurely when some producers use timestamps and some not
[ https://issues.apache.org/jira/browse/KAFKA-5155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15994575#comment-15994575 ] Michal Borowiecki edited comment on KAFKA-5155 at 5/3/17 9:44 AM: -- Hi [~huxi_2b], Personally, I feel the similarity is superficial. KAFKA-4398 is about consuming messages in timestamp order, which challenges the current design and basically calls out for a new feature. This ticket on the other hand is reporting a defect, with potential data loss, which violates the at-least-once semantics. However, it does not challenge the design, simply points out that one line of code needs changing to cater for the case when msgs with and without timestamps are appended to the same segment, which IMHO is a non-contentious bugfix. was (Author: mihbor): Hi @huxi, Personally, I feel the similarity is superficial. KAFKA-4398 is about consuming messages in timestamp order, which challenges the current design and basically calls out for a new feature. This ticket on the other hand is reporting a defect, with potential data loss, which violates the at-least-once semantics. However, it does not challenge the design, simply points out that one line of code needs changing to cater for the case when msgs with and without timestamps are appended to the same segment, which IMHO is a non-contentious bugfix. > Messages can be deleted prematurely when some producers use timestamps and > some not > --- > > Key: KAFKA-5155 > URL: https://issues.apache.org/jira/browse/KAFKA-5155 > Project: Kafka > Issue Type: Bug > Components: log >Affects Versions: 0.10.2.0 >Reporter: Petr Plavjaník > > Some messages can be deleted prematurely and never read in following > scenario. A producer uses timestamps and produces messages that are appended > to the beginning of a log segment. Other producer produces messages without a > timestamp. 
In that case the largest timestamp comes from the old messages > that carry a timestamp, the new messages without a timestamp do not influence it, and > the log segment containing both old and new messages can be deleted immediately after the > last new message with no timestamp is appended. When all appended messages > have no timestamp, they are not deleted, because the {{lastModified}} > attribute of the {{LogSegment}} is used. > New test case to {{kafka.log.LogTest}} that fails: > {code} > @Test > def > shouldNotDeleteTimeBasedSegmentsWhenTimestampIsNotProvidedForSomeMessages() { > val retentionMs = 1000 > val old = TestUtils.singletonRecords("test".getBytes, timestamp = 0) > val set = TestUtils.singletonRecords("test".getBytes, timestamp = -1, > magicValue = 0) > val log = createLog(set.sizeInBytes, retentionMs = retentionMs) > // append some messages to create some segments > log.append(old) > for (_ <- 0 until 12) > log.append(set) > assertEquals("No segment should be deleted", 0, log.deleteOldSegments()) > } > {code} > It can be prevented by using {{def largestTimestamp = > Math.max(maxTimestampSoFar, lastModified)}} in LogSegment, or by using > current timestamp when messages with timestamp {{-1}} are appended. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-5144) MinTimestampTracker does not correctly add timestamps lower than the current max
[ https://issues.apache.org/jira/browse/KAFKA-5144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15990683#comment-15990683 ] Michal Borowiecki commented on KAFKA-5144: -- I understand now, thanks a lot for the thorough explanation! Closed #2947 with the invalid tests. Leaving #2948 open as I think it is still of value. > MinTimestampTracker does not correctly add timestamps lower than the current > max > > > Key: KAFKA-5144 > URL: https://issues.apache.org/jira/browse/KAFKA-5144 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1 >Reporter: Michal Borowiecki >Assignee: Michal Borowiecki > > When adding elements MinTimestampTracker removes all existing elements > greater than the added element. > Perhaps I've missed something and this is intended behaviour but I can't find > any evidence for that in comments or tests. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-5144) MinTimestampTracker does not correctly add timestamps lower than the current max
[ https://issues.apache.org/jira/browse/KAFKA-5144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15990335#comment-15990335 ] Michal Borowiecki commented on KAFKA-5144: -- Added a second [PR|https://github.com/apache/kafka/pull/2948] which is just a refactoring (does not fix the issue!) to make reasoning about the code easier. It renames variables to express their true meaning and adds comments where it matters. This is a separate PR since it's independent of the tests. If I got it wrong somehow and the test cases are indeed invalid, this refactoring is still useful as it better documents what the code actually does. > MinTimestampTracker does not correctly add timestamps lower than the current > max > > > Key: KAFKA-5144 > URL: https://issues.apache.org/jira/browse/KAFKA-5144 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1 >Reporter: Michal Borowiecki >Assignee: Michal Borowiecki > > When adding elements MinTimestampTracker removes all existing elements > greater than the added element. > Perhaps I've missed something and this is intended behaviour but I can't find > any evidence for that in comments or tests. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-5144) MinTimestampTracker does not correctly add timestamps lower than the current max
[ https://issues.apache.org/jira/browse/KAFKA-5144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15990329#comment-15990329 ] Michal Borowiecki commented on KAFKA-5144: -- [PR|https://github.com/apache/kafka/pull/2947/commits/a8b223b92c0aec31498e0d6043c8deffb1ea21ef] contains the cases that are failing, which I think are valid. If someone can please confirm I'm not talking nonsense here, I'll proceed with a fix. > MinTimestampTracker does not correctly add timestamps lower than the current > max > > > Key: KAFKA-5144 > URL: https://issues.apache.org/jira/browse/KAFKA-5144 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1 >Reporter: Michal Borowiecki >Assignee: Michal Borowiecki > > When adding elements MinTimestampTracker removes all existing elements > greater than the added element. > Perhaps I've missed something and this is intended behaviour but I can't find > any evidence for that in comments or tests. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (KAFKA-5144) MinTimestampTracker does not correctly add timestamps lower than the current max
Michal Borowiecki created KAFKA-5144: Summary: MinTimestampTracker does not correctly add timestamps lower than the current max Key: KAFKA-5144 URL: https://issues.apache.org/jira/browse/KAFKA-5144 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 0.10.2.1 Reporter: Michal Borowiecki Assignee: Michal Borowiecki When adding elements MinTimestampTracker removes all existing elements greater than the added element. Perhaps I've missed something and this is intended behaviour but I can't find any evidence for that in comments or tests. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
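For context, the behaviour questioned in this ticket (and accepted as intended in the comments above) is the standard monotonic-deque technique for tracking a minimum over a sliding buffer. A simplified sketch, not the actual MinTimestampTracker class:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified sketch of a MinTimestampTracker-style structure (not the actual
// Kafka Streams class). It keeps a deque of candidate minima: when an element
// is added, every retained element greater than it is dropped, because those
// can never be the minimum again - the new, smaller element was added later
// and will therefore also be removed later.
public class MinTrackerSketch {

    private final Deque<Long> candidates = new ArrayDeque<>();

    public void addElement(long timestamp) {
        // drop all candidates greater than the new element
        while (!candidates.isEmpty() && candidates.peekLast() > timestamp) {
            candidates.removeLast();
        }
        candidates.addLast(timestamp);
    }

    public void removeElement(long timestamp) {
        // called when the oldest record leaves the buffer; it only affects the
        // tracker if it is still the current minimum candidate
        if (!candidates.isEmpty() && candidates.peekFirst() == timestamp) {
            candidates.removeFirst();
        }
    }

    public long get() {
        return candidates.isEmpty() ? -1L : candidates.peekFirst();
    }

    public static void main(String[] args) {
        MinTrackerSketch tracker = new MinTrackerSketch();
        tracker.addElement(5);
        tracker.addElement(3); // 5 is dropped: it can never be the minimum again
        tracker.addElement(7);
        System.out.println(tracker.get()); // 3
    }
}
```

Dropping the larger elements is what makes both `addElement` and `get` amortised O(1) while still always reporting the true minimum of the live elements.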
[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts
[ https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15990288#comment-15990288 ] Michal Borowiecki commented on KAFKA-3514: -- Agreed. That's what I meant. Beyond the scope of KIP-138 and let's keep it that way. > Stream timestamp computation needs some further thoughts > > > Key: KAFKA-3514 > URL: https://issues.apache.org/jira/browse/KAFKA-3514 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Assignee: Eno Thereska > Labels: architecture > Fix For: 0.11.0.0 > > > Our current stream task's timestamp is used for punctuate function as well as > selecting which stream to process next (i.e. best effort stream > synchronization). And it is defined as the smallest timestamp over all > partitions in the task's partition group. This results in two unintuitive > corner cases: > 1) observing a late arrived record would keep that stream's timestamp low for > a period of time, and hence keep being process until that late record. For > example take two partitions within the same task annotated by their > timestamps: > {code} > Stream A: 5, 6, 7, 8, 9, 1, 10 > {code} > {code} > Stream B: 2, 3, 4, 5 > {code} > The late arrived record with timestamp "1" will cause stream A to be selected > continuously in the thread loop, i.e. messages with timestamp 5, 6, 7, 8, 9 > until the record itself is dequeued and processed, then stream B will be > selected starting with timestamp 2. > 2) an empty buffered partition will cause its timestamp to be not advanced, > and hence the task timestamp as well since it is the smallest among all > partitions. This may not be a severe problem compared with 1) above though. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts
[ https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15990176#comment-15990176 ] Michal Borowiecki commented on KAFKA-3514: -- I think the description of this ticket is missing an important detail. If my understanding is correct, it will behave as described if all the records arrive in a single batch. However, if the records preceding the record with timestamp "1" come in a separate batch (I'll use brackets to depict batch boundaries): {code} Stream A: [5, 6, 7, 8, 9], [1, 10] Stream B: [2, 3, 4, 5] {code} then initially the timestamp for stream A is going to be set to "5" (minimum of the first batch) and since it's not allowed to move back, the second batch containing the late arriving record "1" is not going to change that. Stream B is going to be drained first until "5". However, if the batch boundaries are different by just one record and the late arriving "1" is in the first batch: {code} Stream A: [5, 6, 7, 8, 9, 1], [10] Stream B: [2, 3, 4, 5] {code} then it's going to behave as currently described. Please correct me if I got this wrong. But if that is the case, it feels all too non-deterministic and I think the timestamp computation deserves further thought beyond the scope of [KIP-138|https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics], which is limited to punctuate semantics, but not stream time semantics in general. > Stream timestamp computation needs some further thoughts > > > Key: KAFKA-3514 > URL: https://issues.apache.org/jira/browse/KAFKA-3514 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Assignee: Eno Thereska > Labels: architecture > Fix For: 0.11.0.0 > > > Our current stream task's timestamp is used for punctuate function as well as > selecting which stream to process next (i.e. best effort stream > synchronization). 
And it is defined as the smallest timestamp over all > partitions in the task's partition group. This results in two unintuitive > corner cases: > 1) observing a late-arrived record would keep that stream's timestamp low for > a period of time, and hence that stream keeps being processed until that late record. For > example take two partitions within the same task annotated by their > timestamps: > {code} > Stream A: 5, 6, 7, 8, 9, 1, 10 > {code} > {code} > Stream B: 2, 3, 4, 5 > {code} > The late-arrived record with timestamp "1" will cause stream A to be selected > continuously in the thread loop, i.e. messages with timestamps 5, 6, 7, 8, 9 > until the record itself is dequeued and processed; then stream B will be > selected starting with timestamp 2. > 2) an empty buffered partition will cause its timestamp not to be advanced, > and hence the task timestamp as well, since it is the smallest among all > partitions. This may not be as severe a problem as 1) above, though.
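The batch sensitivity raised in the comment above can be shown with a toy model. It assumes, as the comment does, that a partition's timestamp is the minimum of the buffered batch and is never allowed to move backwards; this is a sketch, not the actual Kafka Streams implementation:

```java
import java.util.Arrays;

// Toy model of per-partition stream time (not the actual Kafka Streams code):
// after buffering a batch, the partition timestamp becomes
// max(previousTimestamp, min(batch)) - the minimum of the new records, but
// never moving backwards.
public class StreamTimeSketch {

    static long nextTimestamp(long current, long[] batch) {
        long batchMin = Arrays.stream(batch).min().orElse(current);
        return Math.max(current, batchMin); // the timestamp may not move back
    }

    public static void main(String[] args) {
        long init = Long.MIN_VALUE;

        // Late record "1" arrives in a second batch: the first batch pins the
        // timestamp at 5, and the late record cannot lower it.
        long afterFirstSplit = nextTimestamp(init, new long[]{5, 6, 7, 8, 9});
        long afterLateBatch = nextTimestamp(afterFirstSplit, new long[]{1, 10});

        // Same records, but "1" lands in the first batch: timestamp is 1.
        long afterFirstJoined = nextTimestamp(init, new long[]{5, 6, 7, 8, 9, 1});

        System.out.println(afterFirstSplit);  // 5
        System.out.println(afterLateBatch);   // 5 - late "1" had no effect
        System.out.println(afterFirstJoined); // 1
    }
}
```

Shifting the batch boundary by one record changes the observed stream timestamp from 5 to 1, which is the non-determinism the comment describes.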
[jira] [Updated] (KAFKA-4593) Task migration during rebalance callback process could lead the obsoleted task's IllegalStateException
[ https://issues.apache.org/jira/browse/KAFKA-4593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michal Borowiecki updated KAFKA-4593: - Description: 1. Assume 2 running threads A and B, and one task t1 just for simplicity. Threads A and B are on different machines so their local state dirs are not shared. 2. First rebalance is triggered, task t1 is assigned to A (B has no assigned task). 3. During the first rebalance callback, task t1's state store needs to be restored on thread A, and this is called in "restoreActiveState" of "createStreamTask". 4. Now suppose thread A has a long GC causing it to stall; a second rebalance will then be triggered, kicking A out of the group. B gets task t1 and does the same restoration process; afterwards thread B continues to process data and update the state store, while at the same time writing more messages to the changelog (so its log end offset has incremented). 5. After a while A resumes from the long GC, not knowing it has actually been kicked out of the group and that task t1 is no longer owned by it; it continues the restoration process but then realizes that the log end offset has advanced. When this happens, we will see the following exception on thread A: {code} java.lang.IllegalStateException: task XXX Log end offset of YYY-table_stream-changelog-ZZ should not change while restoring: old end offset .., current offset ..
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:248) at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:122) at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:200) at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:65) at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:65) at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86) at org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:120) at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:794) at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1222) at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1195) at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:897) at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:71) at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:240) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:230) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:314) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:278) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:261) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1039) at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1004) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:570) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:359) {code} was: 1. Assume 2 running threads A and B, and one task t1 jut for simplicity. Thread A and B are not different machines so their local state dir are not shared. 2. First rebalance is triggered, task t1 is assigned to A (B has no assigned task). 3. During the first rebalance callback, task t1's state store need to be restored on thread A, and this is called in "restoreActiveState" of "createStreamTask". 4. Now suppose thread A has a long GC causing it to stall, a second rebalance then will be triggered and kicked A out of the group; B gets the task t1 and did the same restoration process, after the process thread B continues to process data and update the state store, while at the same time writes more messages to the changelog (so its log end offset has incremented). 5. After a while A resumes from the long GC, not knowing it has actually be kicked out of the group and task t1 is no longer owned to itself, it continues the restoration process but then realize that the log end offset has advanced. When this happens, we will see the following
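The guard that throws in step 5 can be sketched as follows. This is a simplified stand-in with hypothetical names, not the actual ProcessorStateManager code: restoration snapshots the changelog's end offset up front and expects it not to move, an assumption that breaks once another thread owns the task and keeps appending to the changelog.

```java
// Simplified stand-in (hypothetical names, not the actual
// ProcessorStateManager.restoreActiveState code) for the end-offset check
// described in the ticket: a zombie thread resuming restoration after a long
// GC finds the changelog's end offset has advanced, because the new task
// owner has been appending to it in the meantime.
public class RestoreCheckSketch {

    static void verifyEndOffsetUnchanged(String changelog, long offsetAtStart, long offsetNow) {
        if (offsetNow != offsetAtStart) {
            throw new IllegalStateException(
                "Log end offset of " + changelog + " should not change while restoring: "
                + "old end offset " + offsetAtStart + ", current offset " + offsetNow);
        }
    }

    public static void main(String[] args) {
        // Thread A snapshotted the end offset before its long GC pause...
        long endOffsetAtStart = 42L;
        // ...but thread B has since taken over the task and appended records.
        long endOffsetNow = 57L;
        try {
            verifyEndOffsetUnchanged("some-table-changelog-0", endOffsetAtStart, endOffsetNow);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```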
[jira] [Updated] (KAFKA-5090) Kafka Streams SessionStore.findSessions javadoc broken
[ https://issues.apache.org/jira/browse/KAFKA-5090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michal Borowiecki updated KAFKA-5090: - Affects Version/s: 0.10.2.1 > Kafka Streams SessionStore.findSessions javadoc broken > -- > > Key: KAFKA-5090 > URL: https://issues.apache.org/jira/browse/KAFKA-5090 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.0, 0.10.2.1 >Reporter: Michal Borowiecki >Priority: Trivial > > {code} > /** > * Fetch any sessions with the matching key and the sessions end is &le earliestEndTime and the sessions > * start is &ge latestStartTime > */ > KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, long earliestSessionEndTime, final long latestSessionStartTime); > {code} > The conditions in the javadoc comment are inverted (le should be ge and ge > should be le), since this is what the code does. They were correct in the > original KIP: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-94+Session+Windows > {code} > /** > * Find any aggregated session values with the matching key and where the > * session’s end time is >= earliestSessionEndTime, i.e, the oldest session to > * merge with, and the session’s start time is <= latestSessionStartTime, i.e, > * the newest session to merge with. > */ > KeyValueIterator<Windowed<K>, AGG> findSessionsToMerge(final K key, final long earliestSessionEndTime, final long latestSessionStartTime); > {code} > Also, the escaped html character references are missing the trailing > semicolon, making them render as-is. > Happy to have this assigned to me to fix as it seems trivial.
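The corrected condition pair from the KIP is an interval-overlap test. A minimal sketch with hypothetical names, not the actual SessionStore API:

```java
// Minimal sketch of the corrected javadoc semantics (hypothetical names, not
// the actual SessionStore implementation): a session matches when its end
// time is >= earliestSessionEndTime AND its start time is
// <= latestSessionStartTime, i.e. the session overlaps the queried range.
public class SessionMatchSketch {

    static boolean matches(long sessionStart, long sessionEnd,
                           long earliestSessionEndTime, long latestSessionStartTime) {
        return sessionEnd >= earliestSessionEndTime
            && sessionStart <= latestSessionStartTime;
    }

    public static void main(String[] args) {
        // Session [10, 20] against query range [15, 25]: overlaps, so it matches.
        System.out.println(matches(10, 20, 15, 25)); // true
        // Session [10, 20] ended before earliestSessionEndTime = 21: no match.
        System.out.println(matches(10, 20, 21, 25)); // false
    }
}
```

Flipping the comparisons, as the broken javadoc does, would instead select only sessions fully contained in the range's complement, which is not what the code implements.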
[jira] [Issue Comment Deleted] (KAFKA-5090) Kafka Streams SessionStore.findSessions javadoc broken
[ https://issues.apache.org/jira/browse/KAFKA-5090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michal Borowiecki updated KAFKA-5090: - Comment: was deleted (was: https://github.com/apache/kafka/pull/2874) > Kafka Streams SessionStore.findSessions javadoc broken > -- > > Key: KAFKA-5090 > URL: https://issues.apache.org/jira/browse/KAFKA-5090 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Michal Borowiecki >Priority: Trivial > > {code} > /** > * Fetch any sessions with the matching key and the sessions end is > earliestEndTime and the sessions > * start is latestStartTime > */ > KeyValueIteratorfindSessions(final K key, long > earliestSessionEndTime, final long latestSessionStartTime); > {code} > The conditions in the javadoc comment are inverted (le should be ge and ge > shoudl be le), since this is what the code does. They were correct in the > original KIP: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-94+Session+Windows > {code} > /** > * Find any aggregated session values with the matching key and where the > * session’s end time is >= earliestSessionEndTime, i.e, the oldest > session to > * merge with, and the session’s start time is <= latestSessionStartTime, > i.e, > * the newest session to merge with. > */ >KeyValueIterator findSessionsToMerge(final K key, final > long earliestSessionEndTime, final long latestSessionStartTime); > {code} > Also, the escaped html character references are missing the trailing > semicolon making them render as-is. > Happy to have this assigned to me to fix as it seems trivial. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (KAFKA-5090) Kafka Streams SessionStore.findSessions javadoc broken
[ https://issues.apache.org/jira/browse/KAFKA-5090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michal Borowiecki updated KAFKA-5090: - Status: Patch Available (was: Open) https://github.com/apache/kafka/pull/2874 > Kafka Streams SessionStore.findSessions javadoc broken > -- > > Key: KAFKA-5090 > URL: https://issues.apache.org/jira/browse/KAFKA-5090 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Michal Borowiecki >Priority: Trivial > > {code} > /** > * Fetch any sessions with the matching key and the sessions end is > earliestEndTime and the sessions > * start is latestStartTime > */ > KeyValueIteratorfindSessions(final K key, long > earliestSessionEndTime, final long latestSessionStartTime); > {code} > The conditions in the javadoc comment are inverted (le should be ge and ge > shoudl be le), since this is what the code does. They were correct in the > original KIP: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-94+Session+Windows > {code} > /** > * Find any aggregated session values with the matching key and where the > * session’s end time is >= earliestSessionEndTime, i.e, the oldest > session to > * merge with, and the session’s start time is <= latestSessionStartTime, > i.e, > * the newest session to merge with. > */ >KeyValueIterator findSessionsToMerge(final K key, final > long earliestSessionEndTime, final long latestSessionStartTime); > {code} > Also, the escaped html character references are missing the trailing > semicolon making them render as-is. > Happy to have this assigned to me to fix as it seems trivial. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (KAFKA-5090) Kafka Streams SessionStore.findSessions javadoc broken
Michal Borowiecki created KAFKA-5090: Summary: Kafka Streams SessionStore.findSessions javadoc broken Key: KAFKA-5090 URL: https://issues.apache.org/jira/browse/KAFKA-5090 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 0.10.2.0 Reporter: Michal Borowiecki Priority: Trivial {code} /** * Fetch any sessions with the matching key and the sessions end is &le earliestEndTime and the sessions * start is &ge latestStartTime */ KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, long earliestSessionEndTime, final long latestSessionStartTime); {code} The conditions in the javadoc comment are inverted (le should be ge and ge should be le), since this is what the code does. They were correct in the original KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-94+Session+Windows {code} /** * Find any aggregated session values with the matching key and where the * session’s end time is >= earliestSessionEndTime, i.e, the oldest session to * merge with, and the session’s start time is <= latestSessionStartTime, i.e, * the newest session to merge with. */ KeyValueIterator<Windowed<K>, AGG> findSessionsToMerge(final K key, final long earliestSessionEndTime, final long latestSessionStartTime); {code} Also, the escaped html character references are missing the trailing semicolon, making them render as-is. Happy to have this assigned to me to fix as it seems trivial.
[jira] [Commented] (KAFKA-4971) Why is there no difference between kafka benchmark tests on SSD and HDD?
[ https://issues.apache.org/jira/browse/KAFKA-4971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15955260#comment-15955260 ] Michal Borowiecki commented on KAFKA-4971: -- I'd venture a guess that you are limited by something other than your hdd/ssd performance. Is 1g your total memory in the VM? How much of it is allocated to the kafka jvm process? Some things I can think of: Is there a lot of activity in the gc.log? Is the OS swapping ferociously due to over-allocation of memory by any chance? Hope that helps. > Why is there no difference between kafka benchmark tests on SSD and HDD? > - > > Key: KAFKA-4971 > URL: https://issues.apache.org/jira/browse/KAFKA-4971 > Project: Kafka > Issue Type: Test >Affects Versions: 0.10.0.0 > Environment: Oracle VM VirtualBox > OS : CentOs 7 > Memory : 1G > Disk : 8GB >Reporter: Dasol Kim > > I installed the OS and kafka on two SSDs and two HDDs to perform the kafka > benchmark test based on the disk difference. As expected, the SSD should show > faster results, but according to my experimental results, there is no big > difference between SSD and HDD. Why? Other settings have been set to default. > *test settings > zookeeper node : 1, producer node : 2, broker node : 2 (SSD 1, HDD 1) > test scenario : Two producers send messages to the broker and compare the > throughput per second of kafka installed on SSD and kafka on HDD > command : ./bin/kafka-producer-perf-test.sh --num-records 100 > --record-size 2000 --topic test --throughput 10 --producer-props > bootstrap.servers=SN02:9092 >
[jira] [Comment Edited] (KAFKA-4971) Why is there no difference between kafka benchmark tests on SSD and HDD?
[ https://issues.apache.org/jira/browse/KAFKA-4971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953608#comment-15953608 ] Michal Borowiecki edited comment on KAFKA-4971 at 4/3/17 2:54 PM: -- I think your question would be easier to respond to if you quantified it by providing your test results and the drive specs. Kafka IO access patterns are designed to be sequential for good reason. Spinning disks and OS level buffering are optimised for such IO patterns, but I don't know if that alone can account for the miss-match between your expectations and the results you are getting on your hardware. was (Author: mihbor): I think your question would be easier to respond to if you quantified it by providing your test results and the drive specs. Kafka IO access patterns are designed to be sequential for good reason. Spinning disks and OS level buffering are optimised for such IO patterns, but I don't know if that alone can account for the miss-match between your expectations and the results your getting on your hardware. > Why is there no difference between kafka benchmark tests on SSD and HDD? > - > > Key: KAFKA-4971 > URL: https://issues.apache.org/jira/browse/KAFKA-4971 > Project: Kafka > Issue Type: Test >Affects Versions: 0.10.0.0 > Environment: Oracle VM VirtualBox > OS : CentOs 7 > Memory : 1G > Disk : 8GB >Reporter: Dasol Kim > > I installed OS and kafka in the two SSD and two HDDs to perform the kafka > benchmark test based on the disc difference. As expected, the SSD should show > faster results, but according to my experimental results, there is no big > difference between SSD and HDD. why? Ohter settings have been set to default. 
> *test settings > zookeeper node : 1, producer node : 2, broker node : 2 (SSD 1, HDD 1) > test scenario : Two producers send messages to the broker and compare the > throughput per second of kafka installed on SSD and kafka on HDD > command : ./bin/kafka-producer-perf-test.sh --num-records 100 > --record-size 2000 --topic test --throughput 10 --producer-props > bootstrap.servers=SN02:9092 >
[jira] [Commented] (KAFKA-4971) Why is there no difference between kafka benchmark tests on SSD and HDD?
[ https://issues.apache.org/jira/browse/KAFKA-4971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15953608#comment-15953608 ] Michal Borowiecki commented on KAFKA-4971: -- I think your question would be easier to respond to if you quantified it by providing your test results and the drive specs. Kafka IO access patterns are designed to be sequential for good reason. Spinning disks and OS level buffering are optimised for such IO patterns, but I don't know if that alone can account for the mismatch between your expectations and the results you are getting on your hardware. > Why is there no difference between kafka benchmark tests on SSD and HDD? > - > > Key: KAFKA-4971 > URL: https://issues.apache.org/jira/browse/KAFKA-4971 > Project: Kafka > Issue Type: Test >Affects Versions: 0.10.0.0 > Environment: Oracle VM VirtualBox > OS : CentOs 7 > Memory : 1G > Disk : 8GB >Reporter: Dasol Kim > > I installed the OS and kafka on two SSDs and two HDDs to perform the kafka > benchmark test based on the disk difference. As expected, the SSD should show > faster results, but according to my experimental results, there is no big > difference between SSD and HDD. Why? Other settings have been set to default. > *test settings > zookeeper node : 1, producer node : 2, broker node : 2 (SSD 1, HDD 1) > test scenario : Two producers send messages to the broker and compare the > throughput per second of kafka installed on SSD and kafka on HDD > command : ./bin/kafka-producer-perf-test.sh --num-records 100 > --record-size 2000 --topic test --throughput 10 --producer-props > bootstrap.servers=SN02:9092 >
[jira] [Commented] (KAFKA-4835) Allow users control over repartitioning
[ https://issues.apache.org/jira/browse/KAFKA-4835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15907523#comment-15907523 ] Michal Borowiecki commented on KAFKA-4835: -- My point above was that if the customerRef (which is what we partition by) was part of the key (not the whole key) then we'd still need to modify the key for the purpose of the join operation. We'd need to do that for both streams, even though they would both be partitioned by the same part of the key, hence the re-partitioning (forced automatically by kafka streams) would be totally unnecessary. In more generic terms, I think this can be a common use case. Let's consider it using DDD concepts. We have an aggregate composed of multiple entities. We send messages for each entity (not the whole aggregate), but to ensure sequential processing for entities belonging to the same aggregate, the messages are partitioned by the aggregate id. The entity id is still important, especially for compacted topics, where it would be needed for deletion markers, as the key is all there is in that case. Hence it comes naturally to compose the message key as {{<aggregate-id>::<entity-id>}}. Then, if you want to join two such streams by aggregate id, you should be able to do it without repartitioning (since both are partitioned by the aggregate-id part of the msg key). However, since joins are only supported on the whole msg key, you're forced to re-map the key to just the aggregate id prior to the join, which in turn currently forces repartitioning. > Allow users control over repartitioning > --- > > Key: KAFKA-4835 > URL: https://issues.apache.org/jira/browse/KAFKA-4835 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Michal Borowiecki > Labels: needs-kip > > From > https://issues.apache.org/jira/browse/KAFKA-4601?focusedCommentId=15881030&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15881030 > ...it would be good to provide users more control over the repartitioning.
> My use case is as follows (unrelated bits omitted for brevity): > {code} > KTableloggedInCustomers = builder > .stream("customerLogins") > .groupBy((key, activity) -> > activity.getCustomerRef()) > .reduce((first,second) -> second, loginStore()); > > builder > .stream("balanceUpdates") > .map((key, activity) -> new KeyValue<>( > activity.getCustomerRef(), > activity)) > .join(loggedInCustomers, (activity, session) -> ... > .to("sessions"); > {code} > Both "groupBy" and "map" in the underlying implementation set the > repartitionRequired flag (since the key changes), and the aggregation/join > that follows will create the repartitioned topic. > However, in our case I know that both input streams are already partitioned > by the customerRef value, which I'm mapping into the key (because it's > required by the join operation). > So there are 2 unnecessary intermediate topics created with their associated > overhead, while the ultimate goal is simply to do a join on a value that we > already use to partition the original streams anyway. > (Note, we don't have the option to re-implement the original input streams to > make customerRef the message key.) > I think it would be better to allow the user to decide (from their knowledge > of the incoming streams) whether a repartition is mandatory on aggregation > and join operations (overloaded version of the methods with the > repartitionRequired flag exposed maybe?) > An alternative would be to allow users to perform a join on a value other > than the key (a keyValueMapper parameter to join, like the one used for joins > with global tables), but I expect that to be more involved and error-prone to > use for people who don't understand the partitioning requirements well > (whereas it's safe for global tables). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
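The co-partitioning argument in the comments above can be sketched with a toy partitioner. The composite key format and all names here are hypothetical illustrations, not actual Kafka Streams code:

```java
// Toy sketch (hypothetical key format "<activityType>::<customerRef>::<activityId>",
// not actual Kafka Streams code): a custom partitioner that hashes only the
// customerRef component, so records for the same customer land on the same
// partition regardless of which stream or activity type they come from.
public class CustomerRefPartitionerSketch {

    static String customerRef(String compositeKey) {
        // middle component of "<activityType>::<customerRef>::<activityId>"
        return compositeKey.split("::")[1];
    }

    static int partition(String compositeKey, int numPartitions) {
        // hash only the customerRef part, mirroring the partition-by-part-of-key
        // scheme described in the comments above
        return Math.floorMod(customerRef(compositeKey).hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 12;
        // A login event and a balance update for the same customer...
        int p1 = partition("login::cust-42::evt-001", partitions);
        int p2 = partition("balanceUpdate::cust-42::evt-777", partitions);
        // ...land on the same partition, so a join keyed on customerRef would
        // not actually need the repartition step the DSL forces.
        System.out.println(p1 == p2); // true
    }
}
```

Since both input topics are already partitioned this way, the repartition topics created by `groupBy`/`map` before the join carry no new placement information, which is the overhead the ticket asks to avoid.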
[jira] [Commented] (KAFKA-4835) Allow users control over repartitioning
[ https://issues.apache.org/jira/browse/KAFKA-4835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15904785#comment-15904785 ] Michal Borowiecki commented on KAFKA-4835: -- Yes, that's the case. Message key is a concatenation of activity type + activity id but the partitioning is done on the customer. NB. I don't think it was wise for us to not put the key partitioned on in the msg key, however, that ship has sailed, I'm afraid. However, my understanding is that even if the partitioning key was part of the msg key (e.g. activity type + customerRef + activity id), we'd still be using a custom partitioner and we'd still have this issue. > Allow users control over repartitioning > --- > > Key: KAFKA-4835 > URL: https://issues.apache.org/jira/browse/KAFKA-4835 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Michal Borowiecki > Labels: needs-kip > > From > https://issues.apache.org/jira/browse/KAFKA-4601?focusedCommentId=15881030=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15881030 > ...it would be good to provide users more control over the repartitioning. > My use case is as follows (unrelated bits omitted for brevity): > {code} > KTableloggedInCustomers = builder > .stream("customerLogins") > .groupBy((key, activity) -> > activity.getCustomerRef()) > .reduce((first,second) -> second, loginStore()); > > builder > .stream("balanceUpdates") > .map((key, activity) -> new KeyValue<>( > activity.getCustomerRef(), > activity)) > .join(loggedInCustomers, (activity, session) -> ... > .to("sessions"); > {code} > Both "groupBy" and "map" in the underlying implementation set the > repartitionRequired flag (since the key changes), and the aggregation/join > that follows will create the repartitioned topic. 
> However, in our case I know that both input streams are already partitioned > by the customerRef value, which I'm mapping into the key (because it's > required by the join operation). > So there are 2 unnecessary intermediate topics created with their associated > overhead, while the ultimate goal is simply to do a join on a value that we > already use to partition the original streams anyway. > (Note, we don't have the option to re-implement the original input streams to > make customerRef the message key.) > I think it would be better to allow the user to decide (from their knowledge > of the incoming streams) whether a repartition is mandatory on aggregation > and join operations (overloaded version of the methods with the > repartitionRequired flag exposed maybe?) > An alternative would be to allow users to perform a join on a value other > than the key (a keyValueMapper parameter to join, like the one used for joins > with global tables), but I expect that to be more involved and error-prone to > use for people who don't understand the partitioning requirements well > (whereas it's safe for global tables). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts
[ https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897771#comment-15897771 ] Michal Borowiecki commented on KAFKA-3514: -- Oh, I wouldn't mind that at all. Just thought that you wanted to stick to event time semantics for this, but if you're not precious about that then I'm all for it :) > Stream timestamp computation needs some further thoughts > > > Key: KAFKA-3514 > URL: https://issues.apache.org/jira/browse/KAFKA-3514 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Assignee: Eno Thereska > Labels: architecture > Fix For: 0.11.0.0 > > > Our current stream task's timestamp is used for punctuate function as well as > selecting which stream to process next (i.e. best effort stream > synchronization). And it is defined as the smallest timestamp over all > partitions in the task's partition group. This results in two unintuitive > corner cases: > 1) observing a late arrived record would keep that stream's timestamp low for > a period of time, and hence keep being process until that late record. For > example take two partitions within the same task annotated by their > timestamps: > {code} > Stream A: 5, 6, 7, 8, 9, 1, 10 > {code} > {code} > Stream B: 2, 3, 4, 5 > {code} > The late arrived record with timestamp "1" will cause stream A to be selected > continuously in the thread loop, i.e. messages with timestamp 5, 6, 7, 8, 9 > until the record itself is dequeued and processed, then stream B will be > selected starting with timestamp 2. > 2) an empty buffered partition will cause its timestamp to be not advanced, > and hence the task timestamp as well since it is the smallest among all > partitions. This may not be a severe problem compared with 1) above though. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts
[ https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897413#comment-15897413 ] Michal Borowiecki commented on KAFKA-3514: -- Thank you for responding. Just now I had a thought about the semantics of event time. It is already possible to provide a TimestampExtractor that determines what the event time is, given a message. It's not far-fetched to assume a user would also want a way to specify what the event time is in the absence of messages (on one or more input partitions), possibly by providing an implementation other than what PartitionGroup.timestamp() does based on the timestamps of its partitions. > Stream timestamp computation needs some further thoughts > > > Key: KAFKA-3514 > URL: https://issues.apache.org/jira/browse/KAFKA-3514 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Assignee: Eno Thereska > Labels: architecture > Fix For: 0.11.0.0 > > > Our current stream task's timestamp is used for punctuate function as well as > selecting which stream to process next (i.e. best effort stream > synchronization). And it is defined as the smallest timestamp over all > partitions in the task's partition group. This results in two unintuitive > corner cases: > 1) observing a late arrived record would keep that stream's timestamp low for > a period of time, and hence keep being process until that late record. For > example take two partitions within the same task annotated by their > timestamps: > {code} > Stream A: 5, 6, 7, 8, 9, 1, 10 > {code} > {code} > Stream B: 2, 3, 4, 5 > {code} > The late arrived record with timestamp "1" will cause stream A to be selected > continuously in the thread loop, i.e. messages with timestamp 5, 6, 7, 8, 9 > until the record itself is dequeued and processed, then stream B will be > selected starting with timestamp 2. 
> 2) an empty buffered partition will cause its timestamp to be not advanced, > and hence the task timestamp as well since it is the smallest among all > partitions. This may not be a severe problem compared with 1) above though. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts
[ https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897371#comment-15897371 ] Michal Borowiecki commented on KAFKA-3514: -- Hi [~enothereska], I have to disagree. It is perfectly clear to me (from the documentation) that punctuate is based on event time, not system time. However, the problem is event time is not advanced reliably, since a single input stream that doesn't receive messages will cause the event time to not be advanced. In an extreme case of a poorly partitioned topic, I can imagine some partition may never get a message. That would cause a topology that has that partition as input to not advance event time ever, hence not fire punctuate ever, regardless of the presence of messages on its other input topics. In my opinion, if the purpose of punctuate is to perform periodic operations, then this flaw makes it unfit for that purpose. > Stream timestamp computation needs some further thoughts > > > Key: KAFKA-3514 > URL: https://issues.apache.org/jira/browse/KAFKA-3514 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Assignee: Eno Thereska > Labels: architecture > Fix For: 0.11.0.0 > > > Our current stream task's timestamp is used for punctuate function as well as > selecting which stream to process next (i.e. best effort stream > synchronization). And it is defined as the smallest timestamp over all > partitions in the task's partition group. This results in two unintuitive > corner cases: > 1) observing a late arrived record would keep that stream's timestamp low for > a period of time, and hence keep being process until that late record. For > example take two partitions within the same task annotated by their > timestamps: > {code} > Stream A: 5, 6, 7, 8, 9, 1, 10 > {code} > {code} > Stream B: 2, 3, 4, 5 > {code} > The late arrived record with timestamp "1" will cause stream A to be selected > continuously in the thread loop, i.e. 
messages with timestamp 5, 6, 7, 8, 9 > until the record itself is dequeued and processed, then stream B will be > selected starting with timestamp 2. > 2) an empty buffered partition will cause its timestamp to be not advanced, > and hence the task timestamp as well since it is the smallest among all > partitions. This may not be a severe problem compared with 1) above though. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
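Corner case 1 above can be reproduced with a toy model of the selection rule the ticket describes: a stream's timestamp is taken as the smallest timestamp buffered for it, and the stream with the smallest timestamp is polled next. This is an illustrative sketch, not the actual StreamTask/PartitionGroup code; in particular, the real runtime stalls on an empty partition (corner case 2), whereas here an exhausted stream is simply skipped, which is enough to show case 1:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class StreamSyncSketch {
    // Returns the order in which records are processed when, on each step, we
    // poll the head of the stream whose buffered minimum timestamp is smallest.
    public static List<Integer> processingOrder(List<Integer> a, List<Integer> b) {
        ArrayDeque<Integer> qa = new ArrayDeque<>(a);
        ArrayDeque<Integer> qb = new ArrayDeque<>(b);
        List<Integer> order = new ArrayList<>();
        while (!qa.isEmpty() || !qb.isEmpty()) {
            long tsA = qa.isEmpty() ? Long.MAX_VALUE : Collections.min(qa);
            long tsB = qb.isEmpty() ? Long.MAX_VALUE : Collections.min(qb);
            order.add(tsA <= tsB ? qa.poll() : qb.poll());
        }
        return order;
    }

    public static void main(String[] args) {
        // Stream A contains the late record "1", which pins A's timestamp at 1,
        // so A keeps winning until that record is dequeued; only then is B drained.
        System.out.println(processingOrder(
                Arrays.asList(5, 6, 7, 8, 9, 1, 10),
                Arrays.asList(2, 3, 4, 5)));
        // -> [5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 10], matching the ticket's example.
    }
}
```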
[jira] [Commented] (KAFKA-4601) Avoid duplicated repartitioning in KStream DSL
[ https://issues.apache.org/jira/browse/KAFKA-4601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15894498#comment-15894498 ] Michal Borowiecki commented on KAFKA-4601: -- Created KAFKA-4835. > Avoid duplicated repartitioning in KStream DSL > -- > > Key: KAFKA-4601 > URL: https://issues.apache.org/jira/browse/KAFKA-4601 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang > Labels: performance > > Consider the following DSL: > {code} > Stream source = builder.stream(Serdes.String(), > Serdes.String(), "topic1").map(..); > KTable counts = source > .groupByKey() > .count("Counts"); > KStream sink = source.leftJoin(counts, ..); > {code} > The resulted topology looks like this: > {code} > ProcessorTopology: > KSTREAM-SOURCE-00: > topics: [topic1] > children: [KSTREAM-MAP-01] > KSTREAM-MAP-01: > children: > [KSTREAM-FILTER-04, KSTREAM-FILTER-07] > KSTREAM-FILTER-04: > children: > [KSTREAM-SINK-03] > KSTREAM-SINK-03: > topic: X-Counts-repartition > KSTREAM-FILTER-07: > children: > [KSTREAM-SINK-06] > KSTREAM-SINK-06: > topic: > X-KSTREAM-MAP-01-repartition > ProcessorTopology: > KSTREAM-SOURCE-08: > topics: > [X-KSTREAM-MAP-01-repartition] > children: > [KSTREAM-LEFTJOIN-09] > KSTREAM-LEFTJOIN-09: > states: [Counts] > KSTREAM-SOURCE-05: > topics: [X-Counts-repartition] > children: > [KSTREAM-AGGREGATE-02] > KSTREAM-AGGREGATE-02: > states: [Counts] > {code} > I.e. there are two repartition topics, one for the aggregate and one for the > join, which not only introduce unnecessary overheads but also mess up the > processing ordering (users are expecting each record to go through > aggregation first then the join operator). And in order to get the following > simpler topology users today need to add a {{through}} operator after {{map}} > manually to enforce repartitioning. 
> {code} > ProcessorTopology: > KSTREAM-SOURCE-00: > topics: [topic1] > children: [KSTREAM-MAP-01] > KSTREAM-MAP-01: > children: > [KSTREAM-SINK-02] > KSTREAM-SINK-02: > topic: topic 2 > ProcessorTopology: > KSTREAM-SOURCE-03: > topics: [topic 2] > children: > [KSTREAM-AGGREGATE-04, KSTREAM-LEFTJOIN-05] > KSTREAM-AGGREGATE-04: > states: [Counts] > KSTREAM-LEFTJOIN-05: > states: [Counts] > {code} > This kind of optimization should be automatic in Streams, which we can > consider doing when extending from one-operator-at-a-time translation. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (KAFKA-4835) Allow users control over repartitioning
Michal Borowiecki created KAFKA-4835: Summary: Allow users control over repartitioning Key: KAFKA-4835 URL: https://issues.apache.org/jira/browse/KAFKA-4835 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 0.10.2.0 Reporter: Michal Borowiecki From https://issues.apache.org/jira/browse/KAFKA-4601?focusedCommentId=15881030=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15881030 ...it would be good to provide users more control over the repartitioning. My use case is as follows (unrelated bits omitted for brevity): {code} KTable loggedInCustomers = builder .stream("customerLogins") .groupBy((key, activity) -> activity.getCustomerRef()) .reduce((first,second) -> second, loginStore()); builder .stream("balanceUpdates") .map((key, activity) -> new KeyValue<>( activity.getCustomerRef(), activity)) .join(loggedInCustomers, (activity, session) -> ... .to("sessions"); {code} Both "groupBy" and "map" in the underlying implementation set the repartitionRequired flag (since the key changes), and the aggregation/join that follows will create the repartitioned topic. However, in our case I know that both input streams are already partitioned by the customerRef value, which I'm mapping into the key (because it's required by the join operation). So there are 2 unnecessary intermediate topics created with their associated overhead, while the ultimate goal is simply to do a join on a value that we already use to partition the original streams anyway. (Note, we don't have the option to re-implement the original input streams to make customerRef the message key.) I think it would be better to allow the user to decide (from their knowledge of the incoming streams) whether a repartition is mandatory on aggregation and join operations (overloaded version of the methods with the repartitionRequired flag exposed maybe?) 
An alternative would be to allow users to perform a join on a value other than the key (a keyValueMapper parameter to join, like the one used for joins with global tables), but I expect that to be more involved and error-prone to use for people who don't understand the partitioning requirements well (whereas it's safe for global tables). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-4601) Avoid duplicated repartitioning in KStream DSL
[ https://issues.apache.org/jira/browse/KAFKA-4601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881030#comment-15881030 ] Michal Borowiecki commented on KAFKA-4601: -- Don't know if this belongs in this ticket or warrants a separate one, but I'd suggest, instead of trying to rely on kstreams doing more automatic optimization, it would be good to provide users more control over the repartitioning. My use case is as follows (unrelated bits omitted for brevity): {code} KTable loggedInCustomers = builder .stream("customerLogins") .groupBy((key, activity) -> activity.getCustomerRef()) .reduce((first,second) -> second, loginStore()); builder .stream("balanceUpdates") .map((key, activity) -> new KeyValue<>( activity.getCustomerRef(), activity)) .join(loggedInCustomers, (activity, session) -> ... .to("sessions"); {code} Both "groupBy" and "map" in the underlying implementation set the repartitionRequired flag (since the key changes), and the aggregation/join that follows will create the repartitioned topic. However, in our case I know that both input streams are already partitioned by the customerRef value, which I'm mapping into the key (because it's required by the join operation). So there are 2 unnecessary intermediate topics created with their associated overhead, while the ultimate goal is simply to do a join on a value that we already use to partition the original streams anyway. (Note, we don't have the option to re-implement the original input streams to make customerRef the message key.) I think it would be better to allow the user to decide (from their knowledge of the incoming streams) whether a repartition is mandatory on aggregation and join operations (overloaded version of the methods with the repartitionRequired flag exposed maybe?) 
An alternative would be to allow users to perform a join on a value other than the key (a keyValueMapper parameter to join, like the one used for joins with global tables), but I expect that to be more involved and error-prone to use for people who don't understand the partitioning requirements well (whereas it's safe for global tables). > Avoid duplicated repartitioning in KStream DSL > -- > > Key: KAFKA-4601 > URL: https://issues.apache.org/jira/browse/KAFKA-4601 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang > Labels: performance > > Consider the following DSL: > {code} > Stream source = builder.stream(Serdes.String(), > Serdes.String(), "topic1").map(..); > KTable counts = source > .groupByKey() > .count("Counts"); > KStream sink = source.leftJoin(counts, ..); > {code} > The resulted topology looks like this: > {code} > ProcessorTopology: > KSTREAM-SOURCE-00: > topics: [topic1] > children: [KSTREAM-MAP-01] > KSTREAM-MAP-01: > children: > [KSTREAM-FILTER-04, KSTREAM-FILTER-07] > KSTREAM-FILTER-04: > children: > [KSTREAM-SINK-03] > KSTREAM-SINK-03: > topic: X-Counts-repartition > KSTREAM-FILTER-07: > children: > [KSTREAM-SINK-06] > KSTREAM-SINK-06: > topic: > X-KSTREAM-MAP-01-repartition > ProcessorTopology: > KSTREAM-SOURCE-08: > topics: > [X-KSTREAM-MAP-01-repartition] > children: > [KSTREAM-LEFTJOIN-09] > KSTREAM-LEFTJOIN-09: > states: [Counts] > KSTREAM-SOURCE-05: > topics: [X-Counts-repartition] > children: > [KSTREAM-AGGREGATE-02] > KSTREAM-AGGREGATE-02: > states:
[jira] [Created] (KAFKA-4750) KeyValueIterator returns null values
Michal Borowiecki created KAFKA-4750: Summary: KeyValueIterator returns null values Key: KAFKA-4750 URL: https://issues.apache.org/jira/browse/KAFKA-4750 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 0.10.1.1 Reporter: Michal Borowiecki The API for the ReadOnlyKeyValueStore.range method promises the returned iterator will not return null values. However, after upgrading from 0.10.0.0 to 0.10.1.1 we found null values are returned, causing NPEs on our side. I found this happens after removing entries from the store, and I found a resemblance to the SAMZA-94 defect. The problem seems to be, as it was there, that when deleting entries with a serializer that does not return null when null is passed in, the state store doesn't actually delete that key/value pair, but the iterator will return a null value for that key. When I modified our serializer to return null when null is passed in, the problem went away. However, I believe this should be fixed in kafka streams, perhaps with a similar approach as SAMZA-94. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
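The failure mode described in KAFKA-4750 can be modelled outside Kafka: if the value serializer maps null to a non-null byte array, a put(key, null) intended as a delete is stored as a real (empty) entry instead of a tombstone, and the iterator later yields a null value for that key after deserialization. A minimal sketch, with a plain Function standing in for the real Serializer interface (the store simulation and method names here are hypothetical, not Kafka's):

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class NullSerdeSketch {
    // Simulates a store delete: a null serialized value removes the key (a
    // tombstone); anything else, including an empty array, stays a live entry.
    public static boolean keySurvivesDelete(Function<String, byte[]> serializer) {
        Map<String, byte[]> store = new HashMap<>();
        store.put("k", serializer.apply("v"));
        byte[] serialized = serializer.apply(null); // the delete attempt
        if (serialized == null) {
            store.remove("k");
        } else {
            store.put("k", serialized);
        }
        return store.containsKey("k");
    }

    public static void main(String[] args) {
        // Buggy: serializes null to an empty array, so the "delete" leaves an
        // entry whose value deserializes back to null -- the NPE trap reported.
        Function<String, byte[]> buggy =
                s -> s == null ? new byte[0] : s.getBytes(StandardCharsets.UTF_8);
        // Fixed: passes null through, producing a proper tombstone.
        Function<String, byte[]> fixed =
                s -> s == null ? null : s.getBytes(StandardCharsets.UTF_8);
        System.out.println(keySurvivesDelete(buggy)); // true: key not deleted
        System.out.println(keySurvivesDelete(fixed)); // false: key deleted
    }
}
```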
[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts
[ https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15683056#comment-15683056 ] Michal Borowiecki commented on KAFKA-3514: -- IMO, 2) *is* a severe problem. Punctuate methods (as described by their API) are meant to perform periodic operations. As it currently stands, if any of the input topics doesn't receive messages regularly, the punctuate method won't be called regularly either (due to the min timestamp across all partitions not advancing), which violates what the API promises. We've worked around it in our app by creating an independent stream and a scheduler sending ticks regularly to an input topic feeding a Transformer, so that its punctuate method is called predictably, but this is far from ideal. > Stream timestamp computation needs some further thoughts > > > Key: KAFKA-3514 > URL: https://issues.apache.org/jira/browse/KAFKA-3514 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang > Labels: architecture > Fix For: 0.10.2.0 > > > Our current stream task's timestamp is used for punctuate function as well as > selecting which stream to process next (i.e. best effort stream > synchronization). And it is defined as the smallest timestamp over all > partitions in the task's partition group. This results in two unintuitive > corner cases: > 1) observing a late arrived record would keep that stream's timestamp low for > a period of time, and hence keep being process until that late record. For > example take two partitions within the same task annotated by their > timestamps: > {code} > Stream A: 5, 6, 7, 8, 9, 1, 10 > {code} > {code} > Stream B: 2, 3, 4, 5 > {code} > The late arrived record with timestamp "1" will cause stream A to be selected > continuously in the thread loop, i.e. messages with timestamp 5, 6, 7, 8, 9 > until the record itself is dequeued and processed, then stream B will be > selected starting with timestamp 2. 
> 2) an empty buffered partition will cause its timestamp to be not advanced, > and hence the task timestamp as well since it is the smallest among all > partitions. This may not be a severe problem compared with 1) above though. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
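The starvation complained about in this comment follows directly from the definition quoted above: the task timestamp is the minimum over all partitions, and a partition that receives no messages never advances, so a punctuation scheduled for some event time is never reached. A toy model of that arithmetic (assumed semantics for illustration, not the real PartitionGroup code):

```java
public class PunctuateStarvationSketch {
    // Task timestamp as described in the ticket: the smallest timestamp
    // across all partitions in the task's partition group.
    public static long taskTimestamp(long[] partitionTimestamps) {
        long min = Long.MAX_VALUE;
        for (long ts : partitionTimestamps) {
            min = Math.min(min, ts);
        }
        return min;
    }

    // A punctuation can only fire once the task timestamp reaches its schedule.
    public static boolean punctuateFires(long[] partitionTimestamps, long scheduledAt) {
        return taskTimestamp(partitionTimestamps) >= scheduledAt;
    }

    public static void main(String[] args) {
        // Partition 1 is busy and has advanced to t=1000; partition 2 is idle
        // and stuck at t=2. A punctuation scheduled for t=5 never fires,
        // no matter how much data the busy partition receives.
        System.out.println(punctuateFires(new long[] {1000, 2}, 5)); // false
        // Only once the idle partition advances past the schedule does it fire.
        System.out.println(punctuateFires(new long[] {1000, 6}, 5)); // true
    }
}
```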
[jira] [Commented] (KAFKA-4355) StreamThread intermittently dies with "Topic not found during partition assignment" when broker restarted
[ https://issues.apache.org/jira/browse/KAFKA-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15628442#comment-15628442 ] Michal Borowiecki commented on KAFKA-4355: -- KAFKA-4366 created for the KafkaStreams.close() hanging issue. > StreamThread intermittently dies with "Topic not found during partition > assignment" when broker restarted > - > > Key: KAFKA-4355 > URL: https://issues.apache.org/jira/browse/KAFKA-4355 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.1.0, 0.10.0.0 > Environment: kafka 0.10.0.0 > kafka 0.10.1.0 > uname -a > Linux lp02485 4.4.0-34-generic #53~14.04.1-Ubuntu SMP Wed Jul 27 16:56:40 UTC > 2016 x86_64 x86_64 x86_64 GNU/Linux > java -version > java version "1.8.0_92" > Java(TM) SE Runtime Environment (build 1.8.0_92-b14) > Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode) >Reporter: Michal Borowiecki >Assignee: Guozhang Wang > > When (a) starting kafka streams app before the broker or > (b) restarting the broker while the streams app is running: > the stream thread intermittently dies with "Topic not found during partition > assignment" StreamsException. > This happens about between one in 5 or one in 10 times. 
> Stack trace: > {noformat} > Exception in thread "StreamThread-2" > org.apache.kafka.streams.errors.StreamsException: Topic not found during > partition assignment: scheduler > at > org.apache.kafka.streams.processor.DefaultPartitionGrouper.maxNumPartitions(DefaultPartitionGrouper.java:81) > at > org.apache.kafka.streams.processor.DefaultPartitionGrouper.partitionGroups(DefaultPartitionGrouper.java:55) > at > org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:370) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:313) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:467) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:88) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:419) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:395) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316) > 
at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:308) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) > {noformat} > Our app has 2 streams in it, consuming from 2 different topics. > Sometimes the exception happens on both stream threads. Sometimes only on one > of the stream threads. > The exception is preceded by: > {noformat} > [2016-10-28 16:17:55,239] INFO [StreamThread-2] (Re-)joining group > pool-scheduler > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > [2016-10-28 16:17:55,240] INFO
[jira] [Created] (KAFKA-4366) KafkaStreams.close() blocks indefinitely
Michal Borowiecki created KAFKA-4366: Summary: KafkaStreams.close() blocks indefinitely Key: KAFKA-4366 URL: https://issues.apache.org/jira/browse/KAFKA-4366 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 0.10.0.1, 0.10.1.0 Reporter: Michal Borowiecki Assignee: Guozhang Wang KafkaStreams.close() method calls join on all its threads without a timeout, meaning indefinitely, which makes it prone to deadlocks and unfit to be used in shutdown hooks. (KafkaStreams::close is used in numerous examples by confluent: https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/kafka-streams/src/main/java/io/confluent/examples/streams and https://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple/ so we assumed it to be recommended practice) A deadlock happens, for instance, if System.exit() is called from within the uncaughtExceptionHandler. (We need to call System.exit() from the uncaughtExceptionHandler because KAFKA-4355 issue shuts down the StreamThread and to recover we want the process to exit, as our infrastructure will then start it up again.) The System.exit call (from the uncaughtExceptionHandler, which runs in the StreamThread) will execute the shutdown hook in a new thread and wait for that thread to join. If the shutdown hook calls KafkaStreams.close, it will in turn block waiting for the StreamThread to join, hence the deadlock. Runtime.addShutdownHook javadocs state: {quote} Shutdown hooks run at a delicate time in the life cycle of a virtual machine and should therefore be coded defensively. They should, in particular, be written to be thread-safe and to avoid deadlocks insofar as possible {quote} and {quote} Shutdown hooks should also finish their work quickly. {quote} Therefore the current implementation of KafkaStreams.close() which waits forever for threads to join is completely unsuitable for use in a shutdown hook. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
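A common defensive pattern for the situation this ticket describes (not part of the KafkaStreams API at the time) is to run the potentially blocking close() on its own thread and bound the wait with Thread.join(timeout), so a shutdown hook cannot hang the JVM forever. A generic, self-contained sketch; the Runnable stands in for KafkaStreams::close, and the helper name is made up for illustration:

```java
public class BoundedCloseSketch {
    // Runs a potentially blocking close action on its own thread and waits at
    // most timeoutMs for it to finish. Returns true if it completed in time.
    public static boolean closeWithTimeout(Runnable blockingClose, long timeoutMs) {
        Thread closer = new Thread(blockingClose, "bounded-closer");
        closer.setDaemon(true); // never keeps the JVM alive on its own
        closer.start();
        try {
            closer.join(timeoutMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !closer.isAlive();
    }

    public static void main(String[] args) {
        // A close() that never returns, standing in for a close blocked on a
        // stream-thread join; the shutdown hook still finishes after 200 ms.
        Runnable hangs = () -> {
            try {
                Thread.sleep(Long.MAX_VALUE);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        System.out.println(closeWithTimeout(hangs, 200));      // false: timed out
        System.out.println(closeWithTimeout(() -> { }, 2000)); // true: completed
    }
}
```

Used from a shutdown hook, this caps the shutdown delay instead of deadlocking when close() itself waits on the thread that triggered System.exit().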
[jira] [Comment Edited] (KAFKA-4355) StreamThread intermittently dies with "Topic not found during partition assignment" when broker restarted
[ https://issues.apache.org/jira/browse/KAFKA-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15618009#comment-15618009 ] Michal Borowiecki edited comment on KAFKA-4355 at 10/29/16 12:18 PM: - Trying to work around this issue by calling System.exit from the UncaughtExceptionHandler (once the app dies, it will be re-started by our infrastructure). We are adding a shutdown hook as per example here: http://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple/ {code:java} Runtime.getRuntime().addShutdownHook(new Thread(schedulerStreams::close)); {code} However, even though both stream threads report completion of shutdown: {noformat} [2016-10-29 12:32:10,616] INFO [StreamThread-2] stream-thread [StreamThread-2] Stream thread shutdown complete (org.apache.kafka.streams.processor.internals.StreamThread) [2016-10-29 12:32:20,490] INFO [StreamThread-1] stream-thread [StreamThread-1] Stream thread shutdown complete (org.apache.kafka.streams.processor.internals.StreamThread) {noformat} and before that report the closing of their producers and consumers, the app is not stopped. 
At least the following 2 threads remain active and keep logging: {noformat} [2016-10-29 12:37:05,625] DEBUG [main-SendThread(localhost:19374)] Got ping response for sessionid: 0x158101fc9590021 after 0ms (org.apache.zookeeper.ClientCnxn) [2016-10-29 12:37:09,815] DEBUG [kafka-producer-network-thread | producer-1] Sending metadata request {topics=[scheduler]} to node 0 (org.apache.kafka.clients.NetworkClient) [2016-10-29 12:37:09,818] DEBUG [kafka-producer-network-thread | producer-1] Updated cluster metadata version 15 to Cluster(id = enenZ_SbQKaRlOyJKQMn_g, nodes = [lp02485.openbet:19373 (id: 0 rack: null)], partitions = [Partition(topic = scheduler, partition = 0, leader = 0, replicas = [0,], isr = [0,])]) (org.apache.kafka.clients.Metadata) [2016-10-29 12:37:12,945] DEBUG [main-SendThread(localhost:19374)] Got ping response for sessionid: 0x158101fc9590022 after 0ms (org.apache.zookeeper.ClientCnxn) {noformat} "Stopped Kafka Stream process" is never logged, so the close method remains blocked on the join here, I suspect: https://github.com/apache/kafka/blob/e876df8b37fc6ea54b0a0571306c4a833c919cda/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L227 PS. When we don't add the shutdown hook to call close(), then the app exits correctly on System.exit(). I think it is pretty bad behaviour if the close() method blocks indefinitely, so I'll raise a separate ticket, unless I find one exists for that already. It should be easier to reproduce hopefully. was (Author: mihbor): Trying to work around this issue by calling System.exit from the UncaughtExceptionHandler (once the app dies, it will be re-started by our infrastructure). 
We are adding a shutdown hook as per example here: http://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple/ {code:java} Runtime.getRuntime().addShutdownHook(new Thread(schedulerStreams::close)); {code} However, even though both stream threads report completion of shutdown: {noformat} [2016-10-29 12:32:10,616] INFO [StreamThread-2] stream-thread [StreamThread-2] Stream thread shutdown complete (org.apache.kafka.streams.processor.internals.StreamThread) [2016-10-29 12:32:20,490] INFO [StreamThread-1] stream-thread [StreamThread-1] Stream thread shutdown complete (org.apache.kafka.streams.processor.internals.StreamThread) {noformat} and before that report the closing of their producers and consumers, the app is not stopped. At least the following 2 threads remain active and keep logging: {noformat} [2016-10-29 12:37:05,625] DEBUG [main-SendThread(localhost:19374)] Got ping response for sessionid: 0x158101fc9590021 after 0ms (org.apache.zookeeper.ClientCnxn) [2016-10-29 12:37:09,815] DEBUG [kafka-producer-network-thread | producer-1] Sending metadata request {topics=[scheduler]} to node 0 (org.apache.kafka.clients.NetworkClient) [2016-10-29 12:37:09,818] DEBUG [kafka-producer-network-thread | producer-1] Updated cluster metadata version 15 to Cluster(id = enenZ_SbQKaRlOyJKQMn_g, nodes = [lp02485.openbet:19373 (id: 0 rack: null)], partitions = [Partition(topic = scheduler, partition = 0, leader = 0, replicas = [0,], isr = [0,])]) (org.apache.kafka.clients.Metadata) [2016-10-29 12:37:12,945] DEBUG [main-SendThread(localhost:19374)] Got ping response for sessionid: 0x158101fc9590022 after 0ms (org.apache.zookeeper.ClientCnxn) {noformat} "Stopped Kafka Stream process" is never logged, so the close method remains blocked on the join here, I suspect: https://github.com/apache/kafka/blob/e876df8b37fc6ea54b0a0571306c4a833c919cda/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L227 > StreamThread intermittently dies with "Topic not 
found during partition >
[jira] [Commented] (KAFKA-4355) StreamThread intermittently dies with "Topic not found during partition assignment" when broker restarted
[ https://issues.apache.org/jira/browse/KAFKA-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15618009#comment-15618009 ] Michal Borowiecki commented on KAFKA-4355:
--
Trying to work around this issue by calling System.exit from the UncaughtExceptionHandler (once the app dies, it will be re-started by our infrastructure). We are adding a shutdown hook, following the example here: http://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple/
{code:java}
Runtime.getRuntime().addShutdownHook(new Thread(schedulerStreams::close));
{code}
However, even though both stream threads report completion of shutdown:
{noformat}
[2016-10-29 12:32:10,616] INFO [StreamThread-2] stream-thread [StreamThread-2] Stream thread shutdown complete (org.apache.kafka.streams.processor.internals.StreamThread)
[2016-10-29 12:32:20,490] INFO [StreamThread-1] stream-thread [StreamThread-1] Stream thread shutdown complete (org.apache.kafka.streams.processor.internals.StreamThread)
{noformat}
and before that report the closing of their producers and consumers, the app is not stopped. At least the following 2 threads remain active and keep logging:
{noformat}
[2016-10-29 12:37:05,625] DEBUG [main-SendThread(localhost:19374)] Got ping response for sessionid: 0x158101fc9590021 after 0ms (org.apache.zookeeper.ClientCnxn)
[2016-10-29 12:37:09,815] DEBUG [kafka-producer-network-thread | producer-1] Sending metadata request {topics=[scheduler]} to node 0 (org.apache.kafka.clients.NetworkClient)
[2016-10-29 12:37:09,818] DEBUG [kafka-producer-network-thread | producer-1] Updated cluster metadata version 15 to Cluster(id = enenZ_SbQKaRlOyJKQMn_g, nodes = [lp02485.openbet:19373 (id: 0 rack: null)], partitions = [Partition(topic = scheduler, partition = 0, leader = 0, replicas = [0,], isr = [0,])]) (org.apache.kafka.clients.Metadata)
[2016-10-29 12:37:12,945] DEBUG [main-SendThread(localhost:19374)] Got ping response for sessionid: 0x158101fc9590022 after 0ms (org.apache.zookeeper.ClientCnxn)
{noformat}
"Stopped Kafka Stream process" is never logged, so I suspect the close() method remains blocked on the thread join here: https://github.com/apache/kafka/blob/e876df8b37fc6ea54b0a0571306c4a833c919cda/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L227
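The workaround above pairs a shutdown hook with an uncaught-exception handler. A minimal, Kafka-free sketch of that pattern (the class and method names here are illustrative stand-ins, not from the Kafka codebase; the worker thread plays the role of a dying StreamThread):

```java
import java.util.concurrent.atomic.AtomicReference;

public class ShutdownSketch {
    // Stand-in for a dying StreamThread: runs a worker that throws, and
    // returns the message observed by its uncaught-exception handler.
    static String runDoomedWorker() throws InterruptedException {
        AtomicReference<String> seen = new AtomicReference<>();
        Thread worker = new Thread(() -> { throw new RuntimeException("boom"); });
        worker.setUncaughtExceptionHandler((t, e) -> {
            seen.set(e.getMessage());
            // In the real workaround this is where System.exit(1) would go,
            // so the infrastructure restarts the app. Note System.exit runs
            // the shutdown hooks, so if the hook's close() blocks on a thread
            // join, the JVM hangs here too; Runtime.halt(status) skips hooks.
        });
        worker.start();
        worker.join();
        return seen.get();
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for addShutdownHook(new Thread(schedulerStreams::close)):
        Runtime.getRuntime().addShutdownHook(
                new Thread(() -> System.out.println("closing streams")));
        System.out.println("handler saw: " + runDoomedWorker());
    }
}
```

If close() can block indefinitely, one option is to invoke it on a separate thread and force termination with Runtime.halt after a grace period, so the restart does not hang on the hook.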
[jira] [Commented] (KAFKA-4355) StreamThread intermittently dies with "Topic not found during partition assignment" when broker restarted
[ https://issues.apache.org/jira/browse/KAFKA-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15616118#comment-15616118 ] Michal Borowiecki commented on KAFKA-4355:
--
Perhaps the DefaultPartitionGrouper here: https://github.com/apache/kafka/blob/e7663a306f40e9fcbc3096d17fb0f99fa3d11d1d/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java#L81 should throw a RetriableException instead of a StreamsException? AbstractCoordinator would then keep looping instead of re-throwing it: https://github.com/apache/kafka/blob/d092673838173d9dedbf5acf3f4e2cd8c736294f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L320
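To make the suggestion concrete, here is a hedged sketch of the retriable-vs-fatal distinction. RetriableTopicException, maxNumPartitions, and assignWithRetry are illustrative stand-ins, not the real Kafka internals; the point is only that a retriable failure lets the join loop try again with fresher metadata instead of killing the thread:

```java
import java.util.List;
import java.util.Map;

public class RetrySketch {
    // Stand-in for org.apache.kafka.common.errors.RetriableException.
    static class RetriableTopicException extends RuntimeException {
        RetriableTopicException(String m) { super(m); }
    }

    // Stand-in for DefaultPartitionGrouper.maxNumPartitions: an unknown
    // topic signals "retry later" rather than a fatal StreamsException.
    static int maxNumPartitions(Map<String, Integer> partitionsByTopic, List<String> topics) {
        int max = 0;
        for (String topic : topics) {
            Integer partitions = partitionsByTopic.get(topic);
            if (partitions == null)
                throw new RetriableTopicException(
                        "Topic not found during partition assignment: " + topic);
            max = Math.max(max, partitions);
        }
        return max;
    }

    // Stand-in for the coordinator join loop: retriable failures loop
    // again; anything else would propagate and kill the thread.
    static int assignWithRetry(Map<String, Integer> metadata, List<String> topics, int attempts) {
        for (int i = 0; i < attempts; i++) {
            try {
                return maxNumPartitions(metadata, topics);
            } catch (RetriableTopicException e) {
                // metadata may refresh between attempts; here we just retry
            }
        }
        throw new RuntimeException("gave up after " + attempts + " attempts");
    }
}
```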
[jira] [Commented] (KAFKA-4355) StreamThread intermittently dies with "Topic not found during partition assignment" when broker restarted
[ https://issues.apache.org/jira/browse/KAFKA-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15615967#comment-15615967 ] Michal Borowiecki commented on KAFKA-4355:
--
My first suspect so far is the ConsumerCoordinator. In this line: https://github.com/apache/kafka/blob/d092673838173d9dedbf5acf3f4e2cd8c736294f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L301 it sets the topics on the metadata from the subscriptions, which the debugger shows to contain the correct topic name. Four lines later: https://github.com/apache/kafka/blob/d092673838173d9dedbf5acf3f4e2cd8c736294f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L305 it calls client.ensureFreshMetadata(), which can override the topics list. The debugger shows that in the problematic case, at https://github.com/apache/kafka/blob/d092673838173d9dedbf5acf3f4e2cd8c736294f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L313 the passed metadata object already has an empty set of topics, while the subscriptions object still contains the topic name. So I think the topic was removed from the metadata in line 305.
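The check-then-act hazard described above can be reduced to a few lines. All names below are stand-ins for illustration, not the actual client internals: the topic list is written from the subscriptions, then a metadata refresh replaces it before the assignor reads it:

```java
import java.util.HashSet;
import java.util.Set;

public class MetadataRaceSketch {
    // Stand-in for the Metadata object's topic list.
    static Set<String> metadataTopics = new HashSet<>();

    // Like ConsumerCoordinator line ~301: metadata.setTopics(subscriptions...)
    static void setTopicsFromSubscriptions(Set<String> subscribed) {
        metadataTopics = new HashSet<>(subscribed);
    }

    // Like line ~305: client.ensureFreshMetadata() can override the list
    // with whatever the (restarting) broker currently reports.
    static void ensureFreshMetadata(Set<String> brokerView) {
        metadataTopics = new HashSet<>(brokerView);
    }

    public static void main(String[] args) {
        setTopicsFromSubscriptions(Set.of("scheduler"));
        ensureFreshMetadata(Set.of());  // broker mid-restart: empty topic view
        // Like line ~313: the assignment now sees no topics, even though the
        // subscriptions object still names "scheduler".
        System.out.println("topics seen by assignor: " + metadataTopics);
    }
}
```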
[jira] [Created] (KAFKA-4355) StreamThread intermittently dies with "Topic not found during partition assignment" when broker restarted
Michal Borowiecki created KAFKA-4355:
Summary: StreamThread intermittently dies with "Topic not found during partition assignment" when broker restarted
Key: KAFKA-4355
URL: https://issues.apache.org/jira/browse/KAFKA-4355
Project: Kafka
Issue Type: Bug
Components: streams
Environment: kafka 0.10.0.0
kafka 0.10.1.0
Reporter: Michal Borowiecki
Assignee: Guozhang Wang

When (a) starting the kafka streams app before the broker or (b) restarting the broker while the streams app is running, the stream thread intermittently dies with a "Topic not found during partition assignment" StreamsException. This happens roughly between one in 5 and one in 10 times.
Stack trace:
{noformat}
Exception in thread "StreamThread-2" org.apache.kafka.streams.errors.StreamsException: Topic not found during partition assignment: scheduler
    at org.apache.kafka.streams.processor.DefaultPartitionGrouper.maxNumPartitions(DefaultPartitionGrouper.java:81)
    at org.apache.kafka.streams.processor.DefaultPartitionGrouper.partitionGroups(DefaultPartitionGrouper.java:55)
    at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:370)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:313)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:467)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:88)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:419)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:395)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:308)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
{noformat}
Our app has 2 streams in it, consuming from 2 different topics. Sometimes the exception happens on both stream threads, sometimes only on one of them.
The exception is preceded by:
{noformat}
[2016-10-28 16:17:55,239] INFO [StreamThread-2] (Re-)joining group pool-scheduler (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2016-10-28 16:17:55,240] INFO [StreamThread-2] Marking the coordinator lp02485.openbet:19373 (id: 2147483647 rack: null) dead for group pool-scheduler (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2016-10-28 16:17:55,342] INFO [StreamThread-2] Discovered coordinator lp02485.openbet:19373 (id: 2147483647 rack: null) for group pool-scheduler. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2016-10-28 16:17:55,342] INFO [StreamThread-2] (Re-)joining group pool-scheduler (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2016-10-28 16:17:55,357] INFO [StreamThread-2] stream-thread [StreamThread-2] Completed validating internal topics in partition assignor
[jira] [Updated] (KAFKA-4355) StreamThread intermittently dies with "Topic not found during partition assignment" when broker restarted
[ https://issues.apache.org/jira/browse/KAFKA-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michal Borowiecki updated KAFKA-4355:
--
Environment:
kafka 0.10.0.0
kafka 0.10.1.0
uname -a
Linux lp02485 4.4.0-34-generic #53~14.04.1-Ubuntu SMP Wed Jul 27 16:56:40 UTC 2016 x86_64 x86_64 x86_64 GNU/Linux
java -version
java version "1.8.0_92"
Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)

was:
kafka 0.10.0.0
kafka 0.10.1.0
[jira] [Updated] (KAFKA-4355) StreamThread intermittently dies with "Topic not found during partition assignment" when broker restarted
[ https://issues.apache.org/jira/browse/KAFKA-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michal Borowiecki updated KAFKA-4355:
--
Description: When (a) starting the kafka streams app before the broker or (b) restarting the broker while the streams app is running, the stream thread intermittently dies with a "Topic not found during partition assignment" StreamsException. This happens roughly between one in 5 and one in 10 times.
Stack trace: (unchanged; see the issue description above)
Our app has 2 streams in it, consuming from 2 different topics. Sometimes the exception happens on both stream threads, sometimes only on one of them.
The exception is preceded by:
{noformat}
[2016-10-28 16:17:55,239] INFO [StreamThread-2] (Re-)joining group pool-scheduler (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2016-10-28 16:17:55,240] INFO [StreamThread-2] Marking the coordinator lp02485.openbet:19373 (id: 2147483647 rack: null) dead for group pool-scheduler (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2016-10-28 16:17:55,342] INFO [StreamThread-2] Discovered coordinator lp02485.openbet:19373 (id: 2147483647 rack: null) for group pool-scheduler. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2016-10-28 16:17:55,342] INFO [StreamThread-2] (Re-)joining group pool-scheduler (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2016-10-28 16:17:55,357] INFO [StreamThread-2] stream-thread [StreamThread-2] Completed validating internal topics in partition assignor (org.apache.kafka.streams.processor.internals.StreamPartitionAssignor)
[2016-10-28 16:17:55,357] INFO [StreamThread-2] stream-thread [StreamThread-2] Shutting down (org.apache.kafka.streams.processor.internals.StreamThread)
[2016-10-28 16:17:55,357] INFO [StreamThread-2] stream-thread [StreamThread-2] Shutting down
[jira] [Updated] (KAFKA-4355) StreamThread intermittently dies with "Topic not found during partition assignment" when broker restarted
[ https://issues.apache.org/jira/browse/KAFKA-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michal Borowiecki updated KAFKA-4355:
--
Affects Version/s: 0.10.1.0
0.10.0.0