[jira] [Created] (KAFKA-7434) DeadLetterQueueReporter throws NPE if transform throws NPE

2018-09-24 Thread Michal Borowiecki (JIRA)
Michal Borowiecki created KAFKA-7434:


 Summary: DeadLetterQueueReporter throws NPE if transform throws NPE
 Key: KAFKA-7434
 URL: https://issues.apache.org/jira/browse/KAFKA-7434
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.0.0
 Environment: jdk 8
Reporter: Michal Borowiecki


An NPE thrown from a transform in a connector configured with

errors.deadletterqueue.context.headers.enable=true

causes DeadLetterQueueReporter to fail with an NPE of its own.
{code}
Executing stage 'TRANSFORMATION' with class 'org.apache.kafka.connect.transforms.Flatten$Value', where consumed record is {topic='', partition=1, offset=0, timestamp=1537370573366, timestampType=CreateTime}. (org.apache.kafka.connect.runtime.errors.LogReporter)
java.lang.NullPointerException
Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
java.lang.NullPointerException
	at org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.toBytes(DeadLetterQueueReporter.java:202)
	at org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.populateContextHeaders(DeadLetterQueueReporter.java:172)
	at org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.report(DeadLetterQueueReporter.java:146)
	at org.apache.kafka.connect.runtime.errors.ProcessingContext.report(ProcessingContext.java:137)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:108)
	at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:44)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:532)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
{code}
 

This is caused by populateContextHeaders checking only that the Throwable is not 
null, but not checking that the Throwable's message is non-null, before 
trying to serialize the message:

[https://github.com/apache/kafka/blob/cfd33b313c9856ae2b4b45ed3d4aac41d6ef5a6b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java#L170-L177]

{code:java}
if (context.error() != null) {
    headers.add(ERROR_HEADER_EXCEPTION, toBytes(context.error().getClass().getName()));
    headers.add(ERROR_HEADER_EXCEPTION_MESSAGE, toBytes(context.error().getMessage()));
{code}


toBytes throws an NPE if passed null as the parameter.
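For illustration, a minimal null-safe guard could look like the sketch below. The names mirror the excerpt above, but this is a standalone, hypothetical reshaping for clarity, not the actual upstream patch.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class NullSafeHeaders {
    // Null-safe variant: return null rather than dereferencing a null value.
    static byte[] toBytes(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // An NPE thrown with no message has getMessage() == null,
        // which is exactly the case that trips the reporter.
        Throwable error = new NullPointerException();
        List<byte[]> headers = new ArrayList<>();
        headers.add(toBytes(error.getClass().getName()));
        // Guard: only add the message header when a message actually exists.
        if (error.getMessage() != null) {
            headers.add(toBytes(error.getMessage()));
        }
        System.out.println(headers.size()); // prints 1
    }
}
```

Either guarding the call site or making toBytes tolerate null would avoid the crash; which one fits the codebase is a design choice for the actual fix.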

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-5677) Remove deprecated punctuate method

2017-07-29 Thread Michal Borowiecki (JIRA)
Michal Borowiecki created KAFKA-5677:


 Summary: Remove deprecated punctuate method
 Key: KAFKA-5677
 URL: https://issues.apache.org/jira/browse/KAFKA-5677
 Project: Kafka
  Issue Type: Task
Reporter: Michal Borowiecki


Task to track the removal of the punctuate method that got deprecated in 
KAFKA-5233 and associated unit tests.
(not sure of the fix version number at this point)





[jira] [Commented] (KAFKA-5245) KStream builder should capture serdes

2017-06-13 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16047608#comment-16047608
 ] 

Michal Borowiecki commented on KAFKA-5245:
--

Just wanted to say it's great to see there's a ticket for this :-) Always found 
it counter-intuitive that the default serdes are taken from config instead of 
upstream in these cases.

> KStream builder should capture serdes 
> --
>
> Key: KAFKA-5245
> URL: https://issues.apache.org/jira/browse/KAFKA-5245
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0, 0.10.2.1
>Reporter: Yeva Byzek
>Assignee: anugrah
>Priority: Minor
>  Labels: beginner, newbie
>
> Even if one specifies a serdes in `builder.stream`, later a call to 
> `groupByKey` may require the serdes again if it differs from the configured 
> streams app serdes. The preferred behavior is that if no serdes is provided 
> to `groupByKey`, it should use whatever was provided in `builder.stream` and 
> not what was in the app.
> From the current docs:
> “When to set explicit serdes: Variants of groupByKey exist to override the 
> configured default serdes of your application, which you must do if the key 
> and/or value types of the resulting KGroupedStream do not match the 
> configured default serdes.”





[jira] [Comment Edited] (KAFKA-5419) Console consumer --key-deserializer and --value-deserializer are always overwritten by ByteArrayDeserializer

2017-06-09 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16044557#comment-16044557
 ] 

Michal Borowiecki edited comment on KAFKA-5419 at 6/9/17 3:42 PM:
--

I think it's a duplicate of KAFKA-2526 and KAFKA-5149


was (Author: mihbor):
I think it's a duplicate of KAFKA-5149

> Console consumer --key-deserializer and --value-deserializer are always 
> overwritten by ByteArrayDeserializer
> 
>
> Key: KAFKA-5419
> URL: https://issues.apache.org/jira/browse/KAFKA-5419
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Reporter: Paolo Patierno
>Priority: Minor
>
> Hi,
> the --key-deserializer and  --value-deserializer options passed to the 
> command line are always overwritten here :
> {code}
> props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
> "org.apache.kafka.common.serialization.ByteArrayDeserializer")
> props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
> "org.apache.kafka.common.serialization.ByteArrayDeserializer")
> {code}
> Thanks,
> Paolo.





[jira] [Comment Edited] (KAFKA-5419) Console consumer --key-deserializer and --value-deserializer are always overwritten by ByteArrayDeserializer

2017-06-09 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16044557#comment-16044557
 ] 

Michal Borowiecki edited comment on KAFKA-5419 at 6/9/17 3:41 PM:
--

I think it's a duplicate of KAFKA-5149


was (Author: mihbor):
I think it's a duplicate of KAFKA-2526

> Console consumer --key-deserializer and --value-deserializer are always 
> overwritten by ByteArrayDeserializer
> 
>
> Key: KAFKA-5419
> URL: https://issues.apache.org/jira/browse/KAFKA-5419
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Reporter: Paolo Patierno
>Priority: Minor
>
> Hi,
> the --key-deserializer and  --value-deserializer options passed to the 
> command line are always overwritten here :
> {code}
> props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
> "org.apache.kafka.common.serialization.ByteArrayDeserializer")
> props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
> "org.apache.kafka.common.serialization.ByteArrayDeserializer")
> {code}
> Thanks,
> Paolo.





[jira] [Commented] (KAFKA-5419) Console consumer --key-deserializer and --value-deserializer are always overwritten by ByteArrayDeserializer

2017-06-09 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16044557#comment-16044557
 ] 

Michal Borowiecki commented on KAFKA-5419:
--

I think it's a duplicate of KAFKA-2526

> Console consumer --key-deserializer and --value-deserializer are always 
> overwritten by ByteArrayDeserializer
> 
>
> Key: KAFKA-5419
> URL: https://issues.apache.org/jira/browse/KAFKA-5419
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Reporter: Paolo Patierno
>Priority: Minor
>
> Hi,
> the --key-deserializer and  --value-deserializer options passed to the 
> command line are always overwritten here :
> {code}
> props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
> "org.apache.kafka.common.serialization.ByteArrayDeserializer")
> props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
> "org.apache.kafka.common.serialization.ByteArrayDeserializer")
> {code}
> Thanks,
> Paolo.





[jira] [Commented] (KAFKA-5246) Remove backdoor that allows any client to produce to internal topics

2017-06-09 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16044179#comment-16044179
 ] 

Michal Borowiecki commented on KAFKA-5246:
--

Would it instead perhaps make sense to document this special client id, 
together with its use-cases listed in the PR comments?

>  Remove backdoor that allows any client to produce to internal topics
> -
>
> Key: KAFKA-5246
> URL: https://issues.apache.org/jira/browse/KAFKA-5246
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1, 0.10.2.0, 
> 0.10.2.1
>Reporter: Andy Coates
>Assignee: Andy Coates
>Priority: Minor
>
> kafka.admin.AdminUtils defines an ‘AdminClientId' val, which looks to be 
> unused in the code, with the exception of a single use in KafkaApis.scala in 
> handleProducerRequest, where it looks to allow any client, using the special 
> ‘__admin_client' client id, to append to internal topics.
> This looks like a security risk to me, as it would allow any client to 
> produce either rogue offsets or even a record containing something other than 
> group/offset info.
> Can we remove this please?





[jira] [Commented] (KAFKA-5155) Messages can be deleted prematurely when some producers use timestamps and some not

2017-05-27 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027521#comment-16027521
 ] 

Michal Borowiecki commented on KAFKA-5155:
--

Hi [~plavjanik], do you care to submit a pull request with the test and the fix?

> Messages can be deleted prematurely when some producers use timestamps and 
> some not
> ---
>
> Key: KAFKA-5155
> URL: https://issues.apache.org/jira/browse/KAFKA-5155
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.2.0
>Reporter: Petr Plavjaník
>
> Some messages can be deleted prematurely and never read in the following 
> scenario. One producer uses timestamps and produces messages that are appended 
> to the beginning of a log segment. Another producer produces messages without a 
> timestamp. In that case the largest timestamp is determined by the old messages 
> that carry a timestamp; the new messages without one do not influence it, so the 
> log segment with old and new messages can be deleted immediately after the 
> last new message with no timestamp is appended. When all appended messages 
> have no timestamp, they are not deleted, because the {{lastModified}} 
> attribute of the {{LogSegment}} is used instead.
> New test case for {{kafka.log.LogTest}} that fails:
> {code}
>   @Test
>   def shouldNotDeleteTimeBasedSegmentsWhenTimestampIsNotProvidedForSomeMessages() {
>     val retentionMs = 1000
>     val old = TestUtils.singletonRecords("test".getBytes, timestamp = 0)
>     val set = TestUtils.singletonRecords("test".getBytes, timestamp = -1, magicValue = 0)
>     val log = createLog(set.sizeInBytes, retentionMs = retentionMs)
>     // append some messages to create some segments
>     log.append(old)
>     for (_ <- 0 until 12)
>       log.append(set)
>     assertEquals("No segment should be deleted", 0, log.deleteOldSegments())
>   }
> {code}
> It can be prevented by using {{def largestTimestamp = 
> Math.max(maxTimestampSoFar, lastModified)}} in LogSegment, or by using 
> current timestamp when messages with timestamp {{-1}} are appended.





[jira] [Commented] (KAFKA-5319) Add a tool to make cluster replica and leader balance

2017-05-24 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16022695#comment-16022695
 ] 

Michal Borowiecki commented on KAFKA-5319:
--

[~markTC], shouldn't this be in "Patch Available" instead of "Resolved" status 
until the PR is merged?

> Add a tool to make cluster replica and leader balance
> -
>
> Key: KAFKA-5319
> URL: https://issues.apache.org/jira/browse/KAFKA-5319
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Affects Versions: 0.10.2.1
>Reporter: Ma Tianchi
> Attachments: ClusterBalanceCommand.scala
>
>
> When a new broker is added to a cluster, it holds no topics. When we use the 
> console command to create a topic without 'replicaAssignmentOpt', Kafka uses 
> AdminUtils.assignReplicasToBrokers to compute the replica assignment. Even if 
> this is balanced at creation time, as more and more brokers are added to the 
> cluster the replica balance degrades. We can use 'kafka-reassign-partitions.sh' 
> to rebalance, but a large number of topics makes that cumbersome. Also, at 
> topic creation time Kafka chooses a preferred replica leader, placed first in 
> the AR, to balance leaders, but that balance is destroyed when the cluster 
> changes. Using 'kafka-reassign-partitions.sh' for partition reassignment may 
> likewise destroy the leader balance, because users can change a partition's 
> AR. The cluster may then be unbalanced, yet Kafka considers leader balance 
> good as long as every leader is first in its AR.
> So we created a tool that balances the number of replicas and the number of 
> leaders on every broker. It uses an algorithm to compute a balanced replica 
> and leader reassignment, then uses ReassignPartitionsCommand to actually 
> rebalance the cluster.
> It can be used when brokers are added to the cluster or the cluster is 
> otherwise unbalanced. It does not move replicas from a dead broker to a live 
> one; it only balances replicas and leaders across live brokers.





[jira] [Commented] (KAFKA-5243) Request to add row limit in ReadOnlyKeyValueStore range function

2017-05-23 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16020911#comment-16020911
 ] 

Michal Borowiecki commented on KAFKA-5243:
--

Just a note: replacing the second argument is not an option IMO, as it would 
clash with the current range(K from, K to) method. K is a type parameter that 
could itself be an int, making the two indistinguishable, I think.

Secondly, the existing range() and all() methods expressly do not guarantee 
ordering of the returned iterator. I think the new range(from, to, limit) 
method would only make sense if the order of the returned iterator is 
consistent across invocations. This is probably not a problem for the built-in 
stores, but given these stores are meant to be pluggable, perhaps it would be 
better not to force other store implementations to take on those guarantees? 
Instead a new interface with stronger guarantees could be added, e.g. 
ReadOnlyOrderedKeyValueStore extending ReadOnlyKeyValueStore and adding this 
extra method. It could also add the consistent-ordering promise on the 
inherited range(from, to) and all() methods. Just a thought.

Probably best to raise a KIP and discuss on the mailing list. Since this is a 
public API change a KIP is required anyway:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals 
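A sketch of the interface split suggested above. All names and signatures here are hypothetical, and simplified List-returning methods stand in for the real Kafka Streams KeyValueIterator types:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Base contract: no ordering guarantee (mirrors the existing API's stance).
interface ReadOnlyKeyValueStore<K, V> {
    List<SimpleEntry<K, V>> range(K from, K to);
}

// Stores opting in promise a consistent iteration order across invocations,
// which is what makes a row limit meaningful for pagination.
interface ReadOnlyOrderedKeyValueStore<K, V> extends ReadOnlyKeyValueStore<K, V> {
    List<SimpleEntry<K, V>> range(K from, K to, int limit);
}

// Toy in-memory store showing that an ordered backing structure
// satisfies the stronger contract trivially.
class TreeMapStore implements ReadOnlyOrderedKeyValueStore<String, String> {
    private final TreeMap<String, String> map = new TreeMap<>();
    void put(String k, String v) { map.put(k, v); }

    public List<SimpleEntry<String, String>> range(String from, String to) {
        List<SimpleEntry<String, String>> out = new ArrayList<>();
        map.subMap(from, true, to, true)
           .forEach((k, v) -> out.add(new SimpleEntry<>(k, v)));
        return out;
    }

    public List<SimpleEntry<String, String>> range(String from, String to, int limit) {
        List<SimpleEntry<String, String>> all = range(from, to);
        return all.subList(0, Math.min(limit, all.size()));
    }
}

public class OrderedStoreSketch {
    public static void main(String[] args) {
        TreeMapStore store = new TreeMapStore();
        store.put("a", "1"); store.put("b", "2"); store.put("c", "3");
        System.out.println(store.range("a", "c", 2).size()); // prints 2
    }
}
```

The point of the split is that plain ReadOnlyKeyValueStore implementations take on no new obligations; only stores that implement the ordered sub-interface promise stable ordering.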

> Request to add row limit in ReadOnlyKeyValueStore range function
> 
>
> Key: KAFKA-5243
> URL: https://issues.apache.org/jira/browse/KAFKA-5243
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.1.1
>Reporter: Joe Wood
>
> When using distributed queries across a cluster of stream stores it's quite 
> common to use query pagination to limit the number of rows returned. The 
> {{range}} function on {{ReadOnlyKeyValueStore}} only accepts the {{to}} and 
> {{from}} keys. This means that the query either unnecessarily 
> retrieves the entire range and manually limits the rows, or estimates the 
> range based on the key values. Neither option is ideal for processing 
> distributed queries.
> This suggestion is to add an overload of the {{range}} function with a 
> third (or replacement second) argument as a suggested row limit count. This 
> means that the range of keys returned will not exceed the supplied count.
> {code:java}
> // Get an iterator over a given range of keys, limiting to limit elements.
> KeyValueIterator<K, V> range(K from, K to, int limit)
> {code}





[jira] [Updated] (KAFKA-5233) Changes to punctuate semantics (KIP-138)

2017-05-23 Thread Michal Borowiecki (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michal Borowiecki updated KAFKA-5233:
-
Fix Version/s: (was: 0.11.0.0)
   0.11.1.0

> Changes to punctuate semantics (KIP-138)
> 
>
> Key: KAFKA-5233
> URL: https://issues.apache.org/jira/browse/KAFKA-5233
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Michal Borowiecki
>Assignee: Michal Borowiecki
>  Labels: kip
> Fix For: 0.11.1.0
>
>
> This ticket is to track implementation of 
> [KIP-138: Change punctuate 
> semantics|https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics]





[jira] [Updated] (KAFKA-5233) Changes to punctuate semantics (KIP-138)

2017-05-15 Thread Michal Borowiecki (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michal Borowiecki updated KAFKA-5233:
-
Status: Patch Available  (was: In Progress)

Pull request here:
https://github.com/apache/kafka/pull/3055
Looking forward to feedback.

> Changes to punctuate semantics (KIP-138)
> 
>
> Key: KAFKA-5233
> URL: https://issues.apache.org/jira/browse/KAFKA-5233
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Michal Borowiecki
>Assignee: Michal Borowiecki
>  Labels: kip
> Fix For: 0.11.0.0
>
>
> This ticket is to track implementation of 
> [KIP-138: Change punctuate 
> semantics|https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics]





[jira] [Work started] (KAFKA-5233) Changes to punctuate semantics (KIP-138)

2017-05-15 Thread Michal Borowiecki (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-5233 started by Michal Borowiecki.

> Changes to punctuate semantics (KIP-138)
> 
>
> Key: KAFKA-5233
> URL: https://issues.apache.org/jira/browse/KAFKA-5233
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Michal Borowiecki
>Assignee: Michal Borowiecki
>  Labels: kip
> Fix For: 0.11.0.0
>
>
> This ticket is to track implementation of 
> [KIP-138: Change punctuate 
> semantics|https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics]





[jira] [Commented] (KAFKA-3455) Connect custom processors with the streams DSL

2017-05-15 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010530#comment-16010530
 ] 

Michal Borowiecki commented on KAFKA-3455:
--

Hi [~bobbycalderwood],

Can you please describe your use-case, where it would be useful to re-use 
Processor/Transformer implementations?

As to Transformer.punctuate return value having to be null, the javadoc was in 
error but has been fixed on trunk (to be released).

Changing the method signature of Transformer.punctuate would be a 
backward-incompatible change; however, 
[KIP-138|https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics]
 will deprecate both methods in favour of a functional interface passed to 
ProcessorContext.schedule(), so it's a small step in the direction you're 
suggesting.

I think AutoCloseable is a false friend in this case. The intention behind 
AutoCloseable is for objects created in a try-with-resources statement to be 
closed when execution exits that statement. However, the Processor is created 
when you are *defining* the topology and must not be closed from that same 
block of code, since it's used for as long as the topology is actually 
*running*, which happens on different threads.

As to init() and close() I think it would make sense to have them pulled out, 
however, again due to backwards-compatibility it's not as simple as it sounds.
Fortunately, once Java 7 compatibility is dropped, it will be possible to 
change their definition to a default method with an empty body. I think that 
would be backwards-compatible. That would leave only one abstract method for 
Processor and Transformer, process() and transform(), respectively. Since these 
are actually *different* from each other, I'd say that then there'd be no 
repetition.

Would that help your use-cases?
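The default-method idea above can be sketched as follows. This is a hypothetical reshaping of the interface (with a simplified stand-in context type), not the actual Kafka Streams API:

```java
// Hypothetical sketch: with Java 8+ default methods, init() and close()
// get empty default bodies, leaving process() as the only abstract method.
// The interface then becomes lambda-friendly.
interface ProcessorContext {}

@FunctionalInterface
interface Processor<K, V> {
    void process(K key, V value);                  // the one abstract method
    default void init(ProcessorContext context) {} // empty default body
    default void close() {}                        // empty default body
}

public class DefaultMethodSketch {
    public static void main(String[] args) {
        // A processor can now be a one-liner; init()/close() are inherited.
        Processor<String, Long> p = (k, v) -> System.out.println(k + "=" + v);
        p.init(null);
        p.process("count", 42L);
        p.close();
    }
}
```

Existing implementations that already override init() and close() keep compiling unchanged, which is why this reshaping can be backwards-compatible.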

> Connect custom processors with the streams DSL
> --
>
> Key: KAFKA-3455
> URL: https://issues.apache.org/jira/browse/KAFKA-3455
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Jonathan Bender
>  Labels: user-experience
> Fix For: 0.11.0.0
>
>
> From the kafka users email thread, we discussed the idea of connecting custom 
> processors with topologies defined from the Streams DSL (and being able to 
> sink data from the processor).  Possibly this could involve exposing the 
> underlying processor's name in the streams DSL so it can be connected with 
> the standard processor API.
> {quote}
> Thanks for the feedback. This is definitely something we wanted to support
> in the Streams DSL.
> One tricky thing, though, is that some operations do not translate to a
> single processor, but a sub-graph of processors (think of a stream-stream
> join, which is translated to actually 5 processors for windowing / state
> queries / merging, each with a different internal name). So how to define
> the API to return the processor name needs some more thinking.
> {quote}





[jira] [Updated] (KAFKA-5233) Changes to punctuate semantics (KIP-138)

2017-05-13 Thread Michal Borowiecki (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michal Borowiecki updated KAFKA-5233:
-
Labels: kip  (was: )

> Changes to punctuate semantics (KIP-138)
> 
>
> Key: KAFKA-5233
> URL: https://issues.apache.org/jira/browse/KAFKA-5233
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Michal Borowiecki
>Assignee: Michal Borowiecki
>  Labels: kip
> Fix For: 0.11.0.0
>
>
> This ticket is to track implementation of 
> [KIP-138: Change punctuate 
> semantics|https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics]





[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-05-13 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009365#comment-16009365
 ] 

Michal Borowiecki commented on KAFKA-3514:
--

I've created KAFKA-5233 to track work related to KIP-138. As noted above, the 
considerations on this ticket span beyond the scope of KIP-138, which is 
agnostic to how the stream time gets advanced.

> Stream timestamp computation needs some further thoughts
> 
>
> Key: KAFKA-3514
> URL: https://issues.apache.org/jira/browse/KAFKA-3514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Eno Thereska
>  Labels: architecture
> Fix For: 0.11.0.0
>
>
> Our current stream task's timestamp is used for punctuate function as well as 
> selecting which stream to process next (i.e. best effort stream 
> synchronization). And it is defined as the smallest timestamp over all 
> partitions in the task's partition group. This results in two unintuitive 
> corner cases:
> 1) observing a late-arrived record would keep that stream's timestamp low for 
> a period of time, and hence that stream keeps being processed until that late 
> record is reached. For example, take two partitions within the same task, 
> annotated by their timestamps:
> {code}
> Stream A: 5, 6, 7, 8, 9, 1, 10
> {code}
> {code}
> Stream B: 2, 3, 4, 5
> {code}
> The late arrived record with timestamp "1" will cause stream A to be selected 
> continuously in the thread loop, i.e. messages with timestamp 5, 6, 7, 8, 9 
> until the record itself is dequeued and processed, then stream B will be 
> selected starting with timestamp 2.
> 2) an empty buffered partition will cause its timestamp to be not advanced, 
> and hence the task timestamp as well since it is the smallest among all 
> partitions. This may not be a severe problem compared with 1) above though.
> *Update*
> There is one more thing to consider (full discussion found here: 
> http://search-hadoop.com/m/Kafka/uyzND1iKZJN1yz0E5?subj=Order+of+punctuate+and+process+in+a+stream+processor)
> {quote}
> Let's assume the following case.
> - a stream processor that uses the Processor API
> - context.schedule(1000) is called in the init()
> - the processor reads only one topic that has one partition
> - using custom timestamp extractor, but that timestamp is just a wall 
> clock time
> Imagine the following events:
> 1., for 10 seconds I send in 5 messages / second
> 2., I do not send any messages for 3 seconds
> 3., I start the 5 messages / second again
> I see that punctuate() is not called during the 3 seconds when I do not 
> send any messages. This is ok according to the documentation, because 
> there are no new messages to trigger the punctuate() call. When the 
> first few messages arrive after I restart the sending (point 3. above) I 
> see the following sequence of method calls:
> 1., process() on the 1st message
> 2., punctuate() is called 3 times
> 3., process() on the 2nd message
> 4., process() on each following message
> What I would expect instead is that punctuate() is called first and then 
> process() is called on the messages, because the first message's timestamp 
> is already 3 seconds older than when the last punctuate() was called, so the 
> first message belongs after the 3 punctuate() calls.
> {quote}





[jira] [Created] (KAFKA-5233) Changes to punctuate semantics (KIP-138)

2017-05-13 Thread Michal Borowiecki (JIRA)
Michal Borowiecki created KAFKA-5233:


 Summary: Changes to punctuate semantics (KIP-138)
 Key: KAFKA-5233
 URL: https://issues.apache.org/jira/browse/KAFKA-5233
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Michal Borowiecki
Assignee: Michal Borowiecki
 Fix For: 0.11.0.0


This ticket is to track implementation of 
[KIP-138: Change punctuate 
semantics|https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics]





[jira] [Commented] (KAFKA-5201) Compacted topic could be misused to fill up a disk but deletion policy can't retain legitimate keys

2017-05-09 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16002311#comment-16002311
 ] 

Michal Borowiecki commented on KAFKA-5201:
--

Hi Edoardo,
From the broker's perspective there is no concept of legitimate vs 
illegitimate keys AFAIK. What keys are legitimate depends on the use-case, so 
should the broker really care about that?
If this is to prevent the disk being filled up by producers you don't trust to 
produce legitimate keys, perhaps a quota plus compact,delete cleanup can guard 
against that? What do you think?
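For illustration, the quota-plus-mixed-cleanup guard suggested above could be configured along these lines with standard Kafka tooling (the ZooKeeper address, client id, topic name, and byte values below are placeholders):

```shell
# Cap how fast an untrusted client can produce (client-id quota):
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type clients --entity-name untrusted-app \
  --add-config 'producer_byte_rate=1048576'

# Bound disk usage even for unique keys: compact *and* delete old segments.
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type topics --entity-name my-compacted-topic \
  --add-config 'cleanup.policy=[compact,delete],retention.bytes=1073741824'
```

As the comment notes, this bounds disk usage but cannot guarantee that the latest "legitimate" keys survive the delete pass.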

> Compacted topic could be misused to fill up a disk but deletion policy can't 
> retain legitimate keys 
> 
>
> Key: KAFKA-5201
> URL: https://issues.apache.org/jira/browse/KAFKA-5201
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Edoardo Comar
>
> Misuse of a topic with cleanup policy = compact
> could lead to a disk being filled if a misbehaving producer keeps producing 
> messages with unique keys.
> The mixed cleanup policy compact,delete could be adopted, but would not 
> guarantee that the latest "legitimate" keys will be kept.
> It would be desirable to have a cleanup policy that attempts to preserve 
> messages with 'legitimate' keys
> This issue needs a KIP but I have no proposed solution yet at the time of 
> writing.





[jira] [Comment Edited] (KAFKA-5155) Messages can be deleted prematurely when some producers use timestamps and some not

2017-05-03 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15994575#comment-15994575
 ] 

Michal Borowiecki edited comment on KAFKA-5155 at 5/3/17 9:44 AM:
--

Hi [~huxi_2b],
Personally, I feel the similarity is superficial. KAFKA-4398 is about consuming 
messages in timestamp order, which challenges the current design and basically 
calls out for a new feature.

This ticket on the other hand is reporting a defect, with potential data loss, 
which violates the at-least-once semantics.
However, it does not challenge the design, simply points out that one line of 
code needs changing to cater for the case when msgs with and without timestamps 
are appended to the same segment, which IMHO is a non-contentious bugfix.



was (Author: mihbor):
Hi @huxi,
Personally, I feel the similarity is superficial. KAFKA-4398 is about consuming 
messages in timestamp order, which challenges the current design and basically 
calls out for a new feature.

This ticket on the other hand is reporting a defect, with potential data loss, 
which violates the at-least-once semantics.
However, it does not challenge the design, simply points out that one line of 
code needs changing to cater for the case when msgs with and without timestamps 
are appended to the same segment, which IMHO is a non-contentious bugfix.


> Messages can be deleted prematurely when some producers use timestamps and 
> some not
> ---
>
> Key: KAFKA-5155
> URL: https://issues.apache.org/jira/browse/KAFKA-5155
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.2.0
>Reporter: Petr Plavjaník
>
> Some messages can be deleted prematurely and never read in the following 
> scenario. One producer uses timestamps and produces messages that are appended 
> to the beginning of a log segment, while another producer produces messages 
> without a timestamp. In that case the segment's largest timestamp is 
> determined solely by the old messages that carry a timestamp; the new 
> messages without one do not influence it, so the log segment with old and new 
> messages can be deleted immediately after the last new message with no 
> timestamp is appended. When all appended messages have no timestamp, they are 
> not deleted, because the {{lastModified}} attribute of the {{LogSegment}} is 
> used instead.
> New test case to {{kafka.log.LogTest}} that fails:
> {code}
>   @Test
>   def 
> shouldNotDeleteTimeBasedSegmentsWhenTimestampIsNotProvidedForSomeMessages() {
> val retentionMs = 1000
> val old = TestUtils.singletonRecords("test".getBytes, timestamp = 0)
> val set = TestUtils.singletonRecords("test".getBytes, timestamp = -1, 
> magicValue = 0)
> val log = createLog(set.sizeInBytes, retentionMs = retentionMs)
> // append some messages to create some segments
> log.append(old)
> for (_ <- 0 until 12)
>   log.append(set)
> assertEquals("No segment should be deleted", 0, log.deleteOldSegments())
>   }
> {code}
> It can be prevented by using {{def largestTimestamp = 
> Math.max(maxTimestampSoFar, lastModified)}} in LogSegment, or by using 
> current timestamp when messages with timestamp {{-1}} are appended.
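The one-line fix suggested above can be illustrated with a small, self-contained sketch. This is a toy model with hypothetical names, not the actual {{LogSegment}} code; records without a timestamp carry -1, as in the report:

```java
// Toy model of the retention check described in the report (hypothetical
// names, not the actual LogSegment code): the segment tracks the largest
// record timestamp seen plus a file-modification time.
public class SegmentRetentionSketch {
    long maxTimestampSoFar = -1L; // largest record timestamp seen; -1 if none
    long lastModified;            // advanced on every append

    void append(long recordTimestamp, long now) {
        if (recordTimestamp > maxTimestampSoFar) maxTimestampSoFar = recordTimestamp;
        lastModified = now;
    }

    // Behaviour described in the report: lastModified is only consulted when
    // NO appended record carried a timestamp.
    long largestTimestampReported() {
        return maxTimestampSoFar >= 0 ? maxTimestampSoFar : lastModified;
    }

    // The suggested one-line fix: always take the max of both.
    long largestTimestampFixed() {
        return Math.max(maxTimestampSoFar, lastModified);
    }

    public static void main(String[] args) {
        long retentionMs = 1000, now = 1_000_000L;
        SegmentRetentionSketch seg = new SegmentRetentionSketch();
        seg.append(0L, now);  // old message WITH timestamp 0
        seg.append(-1L, now); // fresh message WITHOUT a timestamp
        // Reported behaviour: the fresh append does not protect the segment,
        // because the old timestamp 0 dominates the retention decision.
        System.out.println(now - seg.largestTimestampReported() > retentionMs); // eligible for deletion
        System.out.println(now - seg.largestTimestampFixed() > retentionMs);    // retained
    }
}
```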





[jira] [Commented] (KAFKA-5144) MinTimestampTracker does not correctly add timestamps lower than the current max

2017-05-01 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15990683#comment-15990683
 ] 

Michal Borowiecki commented on KAFKA-5144:
--

I understand now, thanks a lot for the thorough explanation!
Closed #2947 with the invalid tests.
Leaving #2948 open as I think it is still of value.

> MinTimestampTracker does not correctly add timestamps lower than the current 
> max
> 
>
> Key: KAFKA-5144
> URL: https://issues.apache.org/jira/browse/KAFKA-5144
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Michal Borowiecki
>Assignee: Michal Borowiecki
>
> When adding elements MinTimestampTracker removes all existing elements 
> greater than the added element.
> Perhaps I've missed something and this is intended behaviour but I can't find 
> any evidence for that in comments or tests.





[jira] [Commented] (KAFKA-5144) MinTimestampTracker does not correctly add timestamps lower than the current max

2017-04-30 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15990335#comment-15990335
 ] 

Michal Borowiecki commented on KAFKA-5144:
--

Added a second [PR|https://github.com/apache/kafka/pull/2948] which is just a 
refactoring (does not fix the issue!) to make reasoning about the code easier.
It renames variables to express their true meaning and adds comments where it 
matters.
This is a separate PR since it's independent of the tests. If I got it wrong 
somehow and the test cases are indeed invalid, this refactoring is still useful 
as it better documents what the code actually does.

> MinTimestampTracker does not correctly add timestamps lower than the current 
> max
> 
>
> Key: KAFKA-5144
> URL: https://issues.apache.org/jira/browse/KAFKA-5144
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Michal Borowiecki
>Assignee: Michal Borowiecki
>
> When adding elements MinTimestampTracker removes all existing elements 
> greater than the added element.
> Perhaps I've missed something and this is intended behaviour but I can't find 
> any evidence for that in comments or tests.





[jira] [Commented] (KAFKA-5144) MinTimestampTracker does not correctly add timestamps lower than the current max

2017-04-30 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15990329#comment-15990329
 ] 

Michal Borowiecki commented on KAFKA-5144:
--

[PR|https://github.com/apache/kafka/pull/2947/commits/a8b223b92c0aec31498e0d6043c8deffb1ea21ef]
 contains the cases that are failing, which I think are valid.
If someone can please confirm I'm not talking nonsense here, I'll proceed with 
a fix.

> MinTimestampTracker does not correctly add timestamps lower than the current 
> max
> 
>
> Key: KAFKA-5144
> URL: https://issues.apache.org/jira/browse/KAFKA-5144
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Michal Borowiecki
>Assignee: Michal Borowiecki
>
> When adding elements MinTimestampTracker removes all existing elements 
> greater than the added element.
> Perhaps I've missed something and this is intended behaviour but I can't find 
> any evidence for that in comments or tests.





[jira] [Created] (KAFKA-5144) MinTimestampTracker does not correctly add timestamps lower than the current max

2017-04-30 Thread Michal Borowiecki (JIRA)
Michal Borowiecki created KAFKA-5144:


 Summary: MinTimestampTracker does not correctly add timestamps 
lower than the current max
 Key: KAFKA-5144
 URL: https://issues.apache.org/jira/browse/KAFKA-5144
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.2.1
Reporter: Michal Borowiecki
Assignee: Michal Borowiecki


When adding elements MinTimestampTracker removes all existing elements greater 
than the added element.
Perhaps I've missed something and this is intended behaviour but I can't find 
any evidence for that in comments or tests.
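For context, removing buffered elements greater than the newcomer is characteristic of a monotonic-queue minimum tracker: an element larger than the newcomer can never again be the minimum while the newcomer is buffered. A minimal sketch of that idea follows (a simplified illustration, not the actual MinTimestampTracker source):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of a monotonic-queue minimum tracker (simplified illustration, not
// the actual Kafka source): the deque is kept non-decreasing, so its head is
// always the minimum of the currently buffered elements.
public class MinTrackerSketch {
    private final Deque<Long> candidates = new ArrayDeque<>();

    public void add(long ts) {
        // Any buffered candidate larger than ts can never be the minimum
        // again while ts is present, so it is safe to drop it.
        while (!candidates.isEmpty() && candidates.peekLast() > ts)
            candidates.pollLast();
        candidates.addLast(ts);
    }

    public void remove(long ts) {
        // Only the head can be the departing minimum.
        if (!candidates.isEmpty() && candidates.peekFirst() == ts)
            candidates.pollFirst();
    }

    public long min() {
        return candidates.isEmpty() ? -1L : candidates.peekFirst();
    }
}
```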





[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-04-30 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15990288#comment-15990288
 ] 

Michal Borowiecki commented on KAFKA-3514:
--

Agreed. That's what I meant. Beyond the scope of KIP-138 and let's keep it that 
way.

> Stream timestamp computation needs some further thoughts
> 
>
> Key: KAFKA-3514
> URL: https://issues.apache.org/jira/browse/KAFKA-3514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Eno Thereska
>  Labels: architecture
> Fix For: 0.11.0.0
>
>
> Our current stream task's timestamp is used for punctuate function as well as 
> selecting which stream to process next (i.e. best effort stream 
> synchronization). And it is defined as the smallest timestamp over all 
> partitions in the task's partition group. This results in two unintuitive 
> corner cases:
> 1) observing a late-arrived record would keep that stream's timestamp low for 
> a period of time, and hence that stream keeps being processed until that late 
> record is reached. For example, take two partitions within the same task, 
> annotated by their timestamps:
> {code}
> Stream A: 5, 6, 7, 8, 9, 1, 10
> {code}
> {code}
> Stream B: 2, 3, 4, 5
> {code}
> The late arrived record with timestamp "1" will cause stream A to be selected 
> continuously in the thread loop, i.e. messages with timestamp 5, 6, 7, 8, 9 
> until the record itself is dequeued and processed, then stream B will be 
> selected starting with timestamp 2.
> 2) an empty buffered partition will cause its timestamp to be not advanced, 
> and hence the task timestamp as well since it is the smallest among all 
> partitions. This may not be a severe problem compared with 1) above though.





[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-04-30 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15990176#comment-15990176
 ] 

Michal Borowiecki commented on KAFKA-3514:
--

I think the description of this ticket is missing an important detail.
If my understanding is correct, it will behave as described if all the records 
arrive in a single batch.
However, if the records preceding the record with timestamp "1" come in a 
separate batch (I'll use brackets to depict batch boundaries):
{code}
Stream A: [5, 6, 7, 8, 9], [1, 10]

Stream B: [2, 3, 4, 5]
{code}
then initially the timestamp for stream A is going to be set to "5" (minimum of 
the first batch) and since it's not allowed to move back, the second batch 
containing the late arriving record "1" is not going to change that. Stream B 
is going to be drained first until "5".
However, if the batch boundaries are different by just one record and the late 
arriving "1" is in the first batch:
{code}
Stream A: [5, 6, 7, 8, 9, 1], [10]

Stream B: [2, 3, 4, 5]
{code}
 then it's going to behave as currently described.

Please correct me if I got this wrong.
But if that is the case, it feels all too non-deterministic and I think the 
timestamp computation deserves further thought beyond the scope of 
[KIP-138|https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics],
 which is limited to punctuate semantics, but not stream time semantics in 
general.
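The batch-boundary effect described above can be modelled with a toy partition clock. This is an assumption-laden sketch, not the streams code: the clock is set from the minimum of each fetched batch and is never allowed to move backwards.

```java
import java.util.List;

// Toy model of the batch-boundary effect (illustrative only): a partition's
// timestamp is set from the minimum of each fetched batch and never moves back.
public class PartitionClockSketch {
    long ts = Long.MIN_VALUE;

    void onBatch(List<Long> batch) {
        long batchMin = batch.stream().mapToLong(Long::longValue).min().orElse(ts);
        if (batchMin > ts) ts = batchMin; // monotonic: late records cannot lower it
    }

    public static void main(String[] args) {
        PartitionClockSketch a = new PartitionClockSketch();
        a.onBatch(List.of(5L, 6L, 7L, 8L, 9L));
        a.onBatch(List.of(1L, 10L));      // late "1" arrives in a second batch
        System.out.println(a.ts);         // stays at 5

        PartitionClockSketch b = new PartitionClockSketch();
        b.onBatch(List.of(5L, 6L, 7L, 8L, 9L, 1L)); // late "1" in the first batch
        System.out.println(b.ts);         // 1, as in the ticket description
    }
}
```

The two runs differ only by where one record falls relative to the batch boundary, which is the non-determinism being pointed out.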

> Stream timestamp computation needs some further thoughts
> 
>
> Key: KAFKA-3514
> URL: https://issues.apache.org/jira/browse/KAFKA-3514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Eno Thereska
>  Labels: architecture
> Fix For: 0.11.0.0
>
>
> Our current stream task's timestamp is used for punctuate function as well as 
> selecting which stream to process next (i.e. best effort stream 
> synchronization). And it is defined as the smallest timestamp over all 
> partitions in the task's partition group. This results in two unintuitive 
> corner cases:
> 1) observing a late-arrived record would keep that stream's timestamp low for 
> a period of time, and hence that stream keeps being processed until that late 
> record is reached. For example, take two partitions within the same task, 
> annotated by their timestamps:
> {code}
> Stream A: 5, 6, 7, 8, 9, 1, 10
> {code}
> {code}
> Stream B: 2, 3, 4, 5
> {code}
> The late arrived record with timestamp "1" will cause stream A to be selected 
> continuously in the thread loop, i.e. messages with timestamp 5, 6, 7, 8, 9 
> until the record itself is dequeued and processed, then stream B will be 
> selected starting with timestamp 2.
> 2) an empty buffered partition will cause its timestamp to be not advanced, 
> and hence the task timestamp as well since it is the smallest among all 
> partitions. This may not be a severe problem compared with 1) above though.





[jira] [Updated] (KAFKA-4593) Task migration during rebalance callback process could lead the obsoleted task's IllegalStateException

2017-04-29 Thread Michal Borowiecki (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michal Borowiecki updated KAFKA-4593:
-
Description: 
1. Assume 2 running threads A and B, and one task t1, just for simplicity. 
Threads A and B are on different machines, so their local state dirs are not 
shared.
2. A first rebalance is triggered; task t1 is assigned to A (B has no assigned 
task).
3. During the first rebalance callback, task t1's state store needs to be 
restored on thread A; this is done in "restoreActiveState" of 
"createStreamTask".
4. Now suppose thread A has a long GC pause causing it to stall. A second 
rebalance is then triggered, kicking A out of the group; B gets task t1 and 
performs the same restoration process. Afterwards, thread B continues to 
process data and update the state store, at the same time writing more 
messages to the changelog (so its log end offset has incremented).

5. After a while A resumes from the long GC pause, not knowing it has actually 
been kicked out of the group and that task t1 is no longer owned by it. It 
continues the restoration process but then realizes that the log end offset 
has advanced. When this happens, we will see the following exception on thread 
A:

{code}
java.lang.IllegalStateException: task XXX Log end offset of
YYY-table_stream-changelog-ZZ should not change while
restoring: old end offset .., current offset ..

at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:248)

at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)

at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:122)

at
org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:200)

at
org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:65)

at
org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:65)

at
org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)

at
org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:120)

at
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:794)

at
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1222)

at
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1195)

at
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:897)

at
org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:71)

at
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:240)

at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:230)

at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:314)

at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:278)

at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:261)

at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1039)

at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1004)

at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:570)

at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:359)
{code}
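The invariant that trips in step 5 can be reduced to a one-method sketch (a hypothetical helper, not the actual ProcessorStateManager code): a thread that began restoring must observe an unchanged changelog end offset, otherwise another owner has taken over.

```java
// Hypothetical reduction of the check that throws in restoreActiveState: a
// zombie thread (step 5 above) sees the changelog end offset advanced by the
// new owner and must not continue restoring.
public class RestoreCheckSketch {
    static boolean safeToContinueRestoring(long endOffsetAtStart, long currentEndOffset) {
        return currentEndOffset == endOffsetAtStart;
    }

    public static void main(String[] args) {
        System.out.println(safeToContinueRestoring(100L, 100L)); // true: no one else wrote
        System.out.println(safeToContinueRestoring(100L, 150L)); // false: the real code raises IllegalStateException here
    }
}
```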

  was:
1. Assume 2 running threads A and B, and one task t1 jut for simplicity. Thread 
A and B are not different machines so their local state dir are not shared.
2. First rebalance is triggered, task t1 is assigned to A (B has no assigned 
task).
3. During the first rebalance callback, task t1's state store need to be 
restored on thread A, and this is called in "restoreActiveState" of 
"createStreamTask".
4. Now suppose thread A has a long GC causing it to stall, a second rebalance 
then will be triggered and kicked A out of the group; B gets the task t1 and 
did the same restoration process, after the process thread B continues to 
process data and update the state store, while at the same time writes more 
messages to the changelog (so its log end offset has incremented).

5. After a while A resumes from the long GC, not knowing it has actually be 
kicked out of the group and task t1 is no longer owned to itself, it continues 
the restoration process but then realize that the log end offset has advanced. 
When this happens, we will see the following 

[jira] [Updated] (KAFKA-5090) Kafka Streams SessionStore.findSessions javadoc broken

2017-04-19 Thread Michal Borowiecki (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michal Borowiecki updated KAFKA-5090:
-
Affects Version/s: 0.10.2.1

> Kafka Streams SessionStore.findSessions javadoc broken
> --
>
> Key: KAFKA-5090
> URL: https://issues.apache.org/jira/browse/KAFKA-5090
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0, 0.10.2.1
>Reporter: Michal Borowiecki
>Priority: Trivial
>
> {code}
> /**
>  * Fetch any sessions with the matching key and the sessions end is  
> earliestEndTime and the sessions
>  * start is  latestStartTime
>  */
> KeyValueIterator findSessions(final K key, long 
> earliestSessionEndTime, final long latestSessionStartTime);
> {code}
> The conditions in the javadoc comment are inverted (le should be ge and ge 
> should be le), since this is what the code does. They were correct in the 
> original KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-94+Session+Windows
> {code}
> /**
>  * Find any aggregated session values with the matching key and where the
>  * session’s end time is >= earliestSessionEndTime, i.e, the oldest 
> session to
>  * merge with, and the session’s start time is <= latestSessionStartTime, 
> i.e,
>  * the newest session to merge with.
>  */
>KeyValueIterator findSessionsToMerge(final K key, final 
> long earliestSessionEndTime, final long latestSessionStartTime);
> {code}
> Also, the escaped html character references are missing the trailing 
> semicolon making them render as-is.
> Happy to have this assigned to me to fix as it seems trivial.





[jira] [Issue Comment Deleted] (KAFKA-5090) Kafka Streams SessionStore.findSessions javadoc broken

2017-04-19 Thread Michal Borowiecki (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michal Borowiecki updated KAFKA-5090:
-
Comment: was deleted

(was: https://github.com/apache/kafka/pull/2874)

> Kafka Streams SessionStore.findSessions javadoc broken
> --
>
> Key: KAFKA-5090
> URL: https://issues.apache.org/jira/browse/KAFKA-5090
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Michal Borowiecki
>Priority: Trivial
>
> {code}
> /**
>  * Fetch any sessions with the matching key and the sessions end is  
> earliestEndTime and the sessions
>  * start is  latestStartTime
>  */
> KeyValueIterator findSessions(final K key, long 
> earliestSessionEndTime, final long latestSessionStartTime);
> {code}
> The conditions in the javadoc comment are inverted (le should be ge and ge 
> should be le), since this is what the code does. They were correct in the 
> original KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-94+Session+Windows
> {code}
> /**
>  * Find any aggregated session values with the matching key and where the
>  * session’s end time is >= earliestSessionEndTime, i.e, the oldest 
> session to
>  * merge with, and the session’s start time is <= latestSessionStartTime, 
> i.e,
>  * the newest session to merge with.
>  */
>KeyValueIterator findSessionsToMerge(final K key, final 
> long earliestSessionEndTime, final long latestSessionStartTime);
> {code}
> Also, the escaped html character references are missing the trailing 
> semicolon making them render as-is.
> Happy to have this assigned to me to fix as it seems trivial.





[jira] [Updated] (KAFKA-5090) Kafka Streams SessionStore.findSessions javadoc broken

2017-04-19 Thread Michal Borowiecki (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michal Borowiecki updated KAFKA-5090:
-
Status: Patch Available  (was: Open)

https://github.com/apache/kafka/pull/2874

> Kafka Streams SessionStore.findSessions javadoc broken
> --
>
> Key: KAFKA-5090
> URL: https://issues.apache.org/jira/browse/KAFKA-5090
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Michal Borowiecki
>Priority: Trivial
>
> {code}
> /**
>  * Fetch any sessions with the matching key and the sessions end is  
> earliestEndTime and the sessions
>  * start is  latestStartTime
>  */
> KeyValueIterator findSessions(final K key, long 
> earliestSessionEndTime, final long latestSessionStartTime);
> {code}
> The conditions in the javadoc comment are inverted (le should be ge and ge 
> should be le), since this is what the code does. They were correct in the 
> original KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-94+Session+Windows
> {code}
> /**
>  * Find any aggregated session values with the matching key and where the
>  * session’s end time is >= earliestSessionEndTime, i.e, the oldest 
> session to
>  * merge with, and the session’s start time is <= latestSessionStartTime, 
> i.e,
>  * the newest session to merge with.
>  */
>KeyValueIterator findSessionsToMerge(final K key, final 
> long earliestSessionEndTime, final long latestSessionStartTime);
> {code}
> Also, the escaped html character references are missing the trailing 
> semicolon making them render as-is.
> Happy to have this assigned to me to fix as it seems trivial.





[jira] [Created] (KAFKA-5090) Kafka Streams SessionStore.findSessions javadoc broken

2017-04-19 Thread Michal Borowiecki (JIRA)
Michal Borowiecki created KAFKA-5090:


 Summary: Kafka Streams SessionStore.findSessions javadoc broken
 Key: KAFKA-5090
 URL: https://issues.apache.org/jira/browse/KAFKA-5090
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.2.0
Reporter: Michal Borowiecki
Priority: Trivial


{code}
/**
 * Fetch any sessions with the matching key and the sessions end is  
earliestEndTime and the sessions
 * start is  latestStartTime
 */
KeyValueIterator findSessions(final K key, long 
earliestSessionEndTime, final long latestSessionStartTime);
{code}

The conditions in the javadoc comment are inverted (le should be ge and ge 
should be le), since this is what the code does. They were correct in the 
original KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-94+Session+Windows
{code}

/**
 * Find any aggregated session values with the matching key and where the
 * session’s end time is >= earliestSessionEndTime, i.e, the oldest session 
to
 * merge with, and the session’s start time is <= latestSessionStartTime, 
i.e,
 * the newest session to merge with.
 */
   KeyValueIterator findSessionsToMerge(final K key, final 
long earliestSessionEndTime, final long latestSessionStartTime);
{code}

Also, the escaped html character references are missing the trailing semicolon 
making them render as-is.

Happy to have this assigned to me to fix as it seems trivial.
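To make the corrected conditions concrete, here is a minimal predicate matching the KIP wording (hypothetical types; the real method returns an iterator over windowed sessions rather than a boolean):

```java
// Minimal stand-in for the session-matching predicate, following the KIP text:
// end >= earliestSessionEndTime (the oldest session to merge with) and
// start <= latestSessionStartTime (the newest session to merge with).
public class SessionMatchSketch {
    static final class Session {
        final long start, end;
        Session(long start, long end) { this.start = start; this.end = end; }
    }

    static boolean matches(Session s, long earliestSessionEndTime, long latestSessionStartTime) {
        return s.end >= earliestSessionEndTime && s.start <= latestSessionStartTime;
    }

    public static void main(String[] args) {
        // A session [10, 20] is a merge candidate when it ends no earlier than
        // earliestSessionEndTime and starts no later than latestSessionStartTime.
        System.out.println(matches(new Session(10, 20), 15, 12)); // true
        System.out.println(matches(new Session(10, 20), 25, 12)); // false: ended too early
    }
}
```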





[jira] [Commented] (KAFKA-4971) Why is there no difference between kafka benchmark tests on SSD and HDD?

2017-04-04 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15955260#comment-15955260
 ] 

Michal Borowiecki commented on KAFKA-4971:
--

I'd venture a guess that you are limited by something other than your HDD/SSD 
performance.
Is 1G the total memory in the VM? How much of it is allocated to the Kafka JVM 
process?
Some things I can think of:
Is there a lot of activity in the gc.log?
Is the OS swapping heavily due to over-allocation of memory, by any chance?

Hope that helps.

> Why is there no difference between kafka benchmark tests on SSD and HDD? 
> -
>
> Key: KAFKA-4971
> URL: https://issues.apache.org/jira/browse/KAFKA-4971
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 0.10.0.0
> Environment: Oracle VM VirtualBox
> OS : CentOs 7
> Memory : 1G
> Disk : 8GB
>Reporter: Dasol Kim
>
> I installed the OS and Kafka on two SSDs and two HDDs to perform a Kafka 
> benchmark test based on the disk difference. As expected, the SSD should show 
> faster results, but according to my experimental results, there is no big 
> difference between SSD and HDD. Why? Other settings have been left at defaults.
> *test settings
> zookeeper node  : 1, producer node : 2, broker node : 2(SSD 1, HDD 1)
> test scenario : Two producers send messages to the broker and compare the 
> throughput per second of Kafka installed on SSD and Kafka on HDD
> command : ./bin/kafka-producer-perf-test.sh --num-records 100 
> --record-size 2000 --topic test --throughput 10 --producer-props 
> bootstrap.servers=SN02:9092
>  





[jira] [Comment Edited] (KAFKA-4971) Why is there no difference between kafka benchmark tests on SSD and HDD?

2017-04-03 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953608#comment-15953608
 ] 

Michal Borowiecki edited comment on KAFKA-4971 at 4/3/17 2:54 PM:
--

I think your question would be easier to respond to if you quantified it by 
providing your test results and the drive specs.
Kafka IO access patterns are designed to be sequential for good reason. 
Spinning disks and OS-level buffering are optimised for such IO patterns, but I 
don't know if that alone can account for the mismatch between your 
expectations and the results you are getting on your hardware.


was (Author: mihbor):
I think your question would be easier to respond to if you quantified it by 
providing your test results and the drive specs.
Kafka IO access patterns are designed to be sequential for good reason. 
Spinning disks and OS level buffering are optimised for such IO patterns, but I 
don't know if that alone can account for the miss-match between your 
expectations and the results your getting on your hardware.

> Why is there no difference between kafka benchmark tests on SSD and HDD? 
> -
>
> Key: KAFKA-4971
> URL: https://issues.apache.org/jira/browse/KAFKA-4971
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 0.10.0.0
> Environment: Oracle VM VirtualBox
> OS : CentOs 7
> Memory : 1G
> Disk : 8GB
>Reporter: Dasol Kim
>
> I installed the OS and Kafka on two SSDs and two HDDs to perform a Kafka 
> benchmark test based on the disk difference. As expected, the SSD should show 
> faster results, but according to my experimental results, there is no big 
> difference between SSD and HDD. Why? Other settings have been left at defaults.
> *test settings
> zookeeper node  : 1, producer node : 2, broker node : 2(SSD 1, HDD 1)
> test scenario : Two producers send messages to the broker and compare the 
> throughput per second of Kafka installed on SSD and Kafka on HDD
> command : ./bin/kafka-producer-perf-test.sh --num-records 100 
> --record-size 2000 --topic test --throughput 10 --producer-props 
> bootstrap.servers=SN02:9092
>  





[jira] [Commented] (KAFKA-4971) Why is there no difference between kafka benchmark tests on SSD and HDD?

2017-04-03 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953608#comment-15953608
 ] 

Michal Borowiecki commented on KAFKA-4971:
--

I think your question would be easier to respond to if you quantified it by 
providing your test results and the drive specs.
Kafka IO access patterns are designed to be sequential for good reason. 
Spinning disks and OS level buffering are optimised for such IO patterns, but I 
don't know if that alone can account for the miss-match between your 
expectations and the results your getting on your hardware.

> Why is there no difference between kafka benchmark tests on SSD and HDD? 
> -
>
> Key: KAFKA-4971
> URL: https://issues.apache.org/jira/browse/KAFKA-4971
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 0.10.0.0
> Environment: Oracle VM VirtualBox
> OS : CentOs 7
> Memory : 1G
> Disk : 8GB
>Reporter: Dasol Kim
>
> I installed the OS and Kafka on two SSDs and two HDDs to perform a Kafka 
> benchmark test based on the disk difference. As expected, the SSD should show 
> faster results, but according to my experimental results, there is no big 
> difference between SSD and HDD. Why? Other settings have been left at defaults.
> *test settings
> zookeeper node  : 1, producer node : 2, broker node : 2(SSD 1, HDD 1)
> test scenario : Two producers send messages to the broker and compare the 
> throughput per second of Kafka installed on SSD and Kafka on HDD
> command : ./bin/kafka-producer-perf-test.sh --num-records 100 
> --record-size 2000 --topic test --throughput 10 --producer-props 
> bootstrap.servers=SN02:9092
>  





[jira] [Commented] (KAFKA-4835) Allow users control over repartitioning

2017-03-13 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15907523#comment-15907523
 ] 

Michal Borowiecki commented on KAFKA-4835:
--

My point above was that if the customerRef (which is what we partition by) was 
part of the key (not the whole key) then we'd still need to modify the key for 
the purpose of the join operation.
We'd need to do that for both streams, even though they would both be 
partitioned by the same part of the key, hence the re-partitioning (forced 
automatically by kafka streams) would be totally unnecessary.

In more generic terms, I think this can be a common use case. Let's consider it 
using DDD concepts. We have an aggregate comprised of multiple entities. We 
send messages for each entity (not the whole aggregate) but to ensure 
sequential processing for entities belonging to the same aggregate, the 
messages are partitioned by the aggregate id. The entity id is still important, 
especially for compacted topics it would be needed for deletion markers, as the 
key is all there is in that case. Hence it comes naturally to compose the 
message key as ::
Then, if you want to join two such streams by aggregate id, you should be able 
to do it without repartitioning (since both partitioned by the aggregate-id 
part of the msg key). However, since joins are only supported on the whole msg 
key, you're forced to re-map the key to just  prior to the join 
which in turn currently forces repartitioning.
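The partition-by-part-of-the-key idea can be sketched as follows. Here 'type:aggregateId:entityId' is a hypothetical key layout standing in for the composite keys described above, and a plain hashCode stands in for Kafka's murmur2-based default partitioner:

```java
// Illustrative sketch: partition a composite key by its aggregate-id segment
// only, so every entity of one aggregate lands on the same partition.
// 'type:aggregateId:entityId' is a hypothetical layout; hashCode stands in
// for Kafka's murmur2-based default partitioner.
public class AggregatePartitionerSketch {
    static int partitionFor(String compositeKey, int numPartitions) {
        String aggregateId = compositeKey.split(":")[1];
        return Math.floorMod(aggregateId.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        // Two different entities of the same customer map to the same
        // partition, so a join keyed on the customer needs no repartitioning.
        int p1 = partitionFor("login:cust-42:evt-1", 8);
        int p2 = partitionFor("balance:cust-42:evt-9", 8);
        System.out.println(p1 == p2); // true
    }
}
```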

> Allow users control over repartitioning
> ---
>
> Key: KAFKA-4835
> URL: https://issues.apache.org/jira/browse/KAFKA-4835
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Michal Borowiecki
>  Labels: needs-kip
>
> From 
> https://issues.apache.org/jira/browse/KAFKA-4601?focusedCommentId=15881030=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15881030
> ...it would be good to provide users more control over the repartitioning. 
> My use case is as follows (unrelated bits omitted for brevity):
> {code}
>   KTable loggedInCustomers = builder
>   .stream("customerLogins")
>   .groupBy((key, activity) -> 
>   activity.getCustomerRef())
>   .reduce((first,second) -> second, loginStore());
>   
>   builder
>   .stream("balanceUpdates")
>   .map((key, activity) -> new KeyValue<>(
>   activity.getCustomerRef(),
>   activity))
>   .join(loggedInCustomers, (activity, session) -> ...
>   .to("sessions");
> {code}
> Both "groupBy" and "map" in the underlying implementation set the 
> repartitionRequired flag (since the key changes), and the aggregation/join 
> that follows will create the repartitioned topic.
> However, in our case I know that both input streams are already partitioned 
> by the customerRef value, which I'm mapping into the key (because it's 
> required by the join operation).
> So there are 2 unnecessary intermediate topics created with their associated 
> overhead, while the ultimate goal is simply to do a join on a value that we 
> already use to partition the original streams anyway.
> (Note, we don't have the option to re-implement the original input streams to 
> make customerRef the message key.)
> I think it would be better to allow the user to decide (from their knowledge 
> of the incoming streams) whether a repartition is mandatory on aggregation 
> and join operations (overloaded version of the methods with the 
> repartitionRequired flag exposed maybe?)
> An alternative would be to allow users to perform a join on a value other 
> than the key (a keyValueMapper parameter to join, like the one used for joins 
> with global tables), but I expect that to be more involved and error-prone to 
> use for people who don't understand the partitioning requirements well 
> (whereas it's safe for global tables).
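As an illustration of why the repartition is redundant in the scenario above, here is a small stdlib-only sketch (class and method names are made up for this example; Kafka's default partitioner actually hashes serialized key bytes with murmur2, not `String.hashCode`): if the producer already partitioned by customerRef, re-keying each record by the same customerRef recomputes the same partition, so the repartition topic reshuffles nothing.

```java
// Illustrative sketch (not Kafka code): when the input topic was produced
// with customerRef as the partitioning key, re-keying each record by a
// customerRef taken from the value cannot move it to another partition.
public class RedundantRepartition {

    static int partition(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        String customerRef = "cust-123";
        int partitions = 12;

        // Producer side: record partitioned by customerRef.
        int before = partition(customerRef, partitions);

        // After map((k, v) -> KeyValue(v.getCustomerRef(), v)):
        // the new key is the same customerRef, so the partition a
        // repartition topic would assign is unchanged.
        int after = partition(customerRef, partitions);

        System.out.println(before == after); // prints "true"
    }
}
```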



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4835) Allow users control over repartitioning

2017-03-10 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904785#comment-15904785
 ] 

Michal Borowiecki commented on KAFKA-4835:
--

Yes, that's the case. Message key is a concatenation of activity type + 
activity id but the partitioning is done on the customer.

NB. I don't think it was wise of us not to put the partitioning key in the 
message key; however, that ship has sailed, I'm afraid. 
That said, my understanding is that even if the partitioning key were part of 
the message key (e.g. activity type + customerRef + activity id), we'd still be 
using a custom partitioner and we'd still have this issue.



[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-03-06 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897771#comment-15897771
 ] 

Michal Borowiecki commented on KAFKA-3514:
--

Oh, I wouldn't mind that at all. Just thought that you wanted to stick to event 
time semantics for this, but if you're not precious about that then I'm all for 
it :)

> Stream timestamp computation needs some further thoughts
> 
>
> Key: KAFKA-3514
> URL: https://issues.apache.org/jira/browse/KAFKA-3514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Eno Thereska
>  Labels: architecture
> Fix For: 0.11.0.0
>
>
> Our current stream task's timestamp is used for punctuate function as well as 
> selecting which stream to process next (i.e. best effort stream 
> synchronization). And it is defined as the smallest timestamp over all 
> partitions in the task's partition group. This results in two unintuitive 
> corner cases:
> 1) observing a late-arrived record would keep that stream's timestamp low for 
> a period of time, and hence that stream keeps being processed until the late record is reached. For 
> example take two partitions within the same task annotated by their 
> timestamps:
> {code}
> Stream A: 5, 6, 7, 8, 9, 1, 10
> {code}
> {code}
> Stream B: 2, 3, 4, 5
> {code}
> The late arrived record with timestamp "1" will cause stream A to be selected 
> continuously in the thread loop, i.e. messages with timestamp 5, 6, 7, 8, 9 
> until the record itself is dequeued and processed, then stream B will be 
> selected starting with timestamp 2.
> 2) an empty buffered partition will cause its timestamp to be not advanced, 
> and hence the task timestamp as well since it is the smallest among all 
> partitions. This may not be a severe problem compared with 1) above though.
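The min-over-partitions rule described above can be sketched in a few lines of plain Java (hypothetical names, no Kafka code) to show how an empty buffered partition pins the task timestamp and stalls event-time punctuation:

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;

// Stdlib sketch of the task-timestamp rule: task time is the minimum
// head timestamp over all buffered partitions, so an empty partition
// holds the whole task back.
public class TaskTimestampSketch {

    // An empty partition contributes "unknown" (Long.MIN_VALUE here),
    // pinning the minimum until a record arrives.
    static long taskTimestamp(List<ArrayDeque<Long>> partitions) {
        long min = Long.MAX_VALUE;
        for (ArrayDeque<Long> p : partitions) {
            Long head = p.peek();
            min = Math.min(min, head == null ? Long.MIN_VALUE : head);
        }
        return min;
    }

    public static void main(String[] args) {
        ArrayDeque<Long> a = new ArrayDeque<>(Arrays.asList(5L, 6L, 7L));
        ArrayDeque<Long> b = new ArrayDeque<>();  // no messages yet
        // With b empty, task time cannot advance past "unknown",
        // so event-time punctuation would never fire.
        System.out.println(taskTimestamp(Arrays.asList(a, b)) == Long.MIN_VALUE);
    }
}
```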





[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-03-06 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897413#comment-15897413
 ] 

Michal Borowiecki commented on KAFKA-3514:
--

Thank you for responding.
Just now I had a thought about the semantics of event time.
It is already possible to provide a TimestampExtractor that determines what the 
event time is, given a message.
It's not far-fetched to assume users would also want a way to specify what the 
event time is, given the absence of messages (on one or more input partitions), 
possibly by providing an implementation other than what 
PartitionGroup.timestamp() does based on the timestamps of its partitions.



[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-03-06 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897371#comment-15897371
 ] 

Michal Borowiecki commented on KAFKA-3514:
--

Hi [~enothereska],
I have to disagree. It is perfectly clear to me (from the documentation) that 
punctuate is based on event time, not system time. However, the problem is 
event time is not advanced reliably, since a single input stream that doesn't 
receive messages will cause the event time to not be advanced. In an extreme 
case of a poorly partitioned topic, I can imagine some partition may never get 
a message. That would cause a topology that has that partition as input to not 
advance event time ever, hence not fire punctuate ever, regardless of the 
presence of messages on its other input topics. In my opinion, if the purpose 
of punctuate is to perform periodic operations, then this flaw makes it unfit 
for that purpose. 



[jira] [Commented] (KAFKA-4601) Avoid duplicated repartitioning in KStream DSL

2017-03-03 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15894498#comment-15894498
 ] 

Michal Borowiecki commented on KAFKA-4601:
--

Created KAFKA-4835.

> Avoid duplicated repartitioning in KStream DSL
> --
>
> Key: KAFKA-4601
> URL: https://issues.apache.org/jira/browse/KAFKA-4601
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: performance
>
> Consider the following DSL:
> {code}
> KStream<String, String> source = builder.stream(Serdes.String(), 
> Serdes.String(), "topic1").map(..);
> KTable<String, Long> counts = source
> .groupByKey()
> .count("Counts");
> KStream<String, String> sink = source.leftJoin(counts, ..);
> {code}
> The resulting topology looks like this:
> {code}
> ProcessorTopology:
>   KSTREAM-SOURCE-00:
>   topics: [topic1]
>   children:   [KSTREAM-MAP-01]
>   KSTREAM-MAP-01:
>   children:   
> [KSTREAM-FILTER-04, KSTREAM-FILTER-07]
>   KSTREAM-FILTER-04:
>   children:   
> [KSTREAM-SINK-03]
>   KSTREAM-SINK-03:
>   topic:  X-Counts-repartition
>   KSTREAM-FILTER-07:
>   children:   
> [KSTREAM-SINK-06]
>   KSTREAM-SINK-06:
>   topic:  
> X-KSTREAM-MAP-01-repartition
> ProcessorTopology:
>   KSTREAM-SOURCE-08:
>   topics: 
> [X-KSTREAM-MAP-01-repartition]
>   children:   
> [KSTREAM-LEFTJOIN-09]
>   KSTREAM-LEFTJOIN-09:
>   states: [Counts]
>   KSTREAM-SOURCE-05:
>   topics: [X-Counts-repartition]
>   children:   
> [KSTREAM-AGGREGATE-02]
>   KSTREAM-AGGREGATE-02:
>   states: [Counts]
> {code}
> I.e. there are two repartition topics, one for the aggregate and one for the 
> join, which not only introduce unnecessary overhead but also mess up the 
> processing ordering (users expect each record to go through the aggregation 
> first and then the join operator). And in order to get the following simpler 
> topology, users today need to add a {{through}} operator after {{map}} 
> manually to enforce a single repartitioning.
> {code}
> ProcessorTopology:
>   KSTREAM-SOURCE-00:
>   topics: [topic1]
>   children:   [KSTREAM-MAP-01]
>   KSTREAM-MAP-01:
>   children:   
> [KSTREAM-SINK-02]
>   KSTREAM-SINK-02:
>   topic:  topic 2
> ProcessorTopology:
>   KSTREAM-SOURCE-03:
>   topics: [topic 2]
>   children:   
> [KSTREAM-AGGREGATE-04, KSTREAM-LEFTJOIN-05]
>   KSTREAM-AGGREGATE-04:
>   states: [Counts]
>   KSTREAM-LEFTJOIN-05:
>   states: [Counts]
> {code} 
> This kind of optimization should be automatic in Streams, which we can 
> consider doing when extending from one-operator-at-a-time translation.





[jira] [Created] (KAFKA-4835) Allow users control over repartitioning

2017-03-03 Thread Michal Borowiecki (JIRA)
Michal Borowiecki created KAFKA-4835:


 Summary: Allow users control over repartitioning
 Key: KAFKA-4835
 URL: https://issues.apache.org/jira/browse/KAFKA-4835
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 0.10.2.0
Reporter: Michal Borowiecki


From 
https://issues.apache.org/jira/browse/KAFKA-4601?focusedCommentId=15881030&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15881030

...it would be good to provide users more control over the repartitioning. 
My use case is as follows (unrelated bits omitted for brevity):
{code}
KTable loggedInCustomers = builder
.stream("customerLogins")
.groupBy((key, activity) -> 
activity.getCustomerRef())
.reduce((first,second) -> second, loginStore());

builder
.stream("balanceUpdates")
.map((key, activity) -> new KeyValue<>(
activity.getCustomerRef(),
activity))
.join(loggedInCustomers, (activity, session) -> ...
.to("sessions");
{code}
Both "groupBy" and "map" in the underlying implementation set the 
repartitionRequired flag (since the key changes), and the aggregation/join that 
follows will create the repartitioned topic.
However, in our case I know that both input streams are already partitioned by 
the customerRef value, which I'm mapping into the key (because it's required by 
the join operation).
So there are 2 unnecessary intermediate topics created with their associated 
overhead, while the ultimate goal is simply to do a join on a value that we 
already use to partition the original streams anyway.
(Note, we don't have the option to re-implement the original input streams to 
make customerRef the message key.)

I think it would be better to allow the user to decide (from their knowledge of 
the incoming streams) whether a repartition is mandatory on aggregation and 
join operations (overloaded version of the methods with the repartitionRequired 
flag exposed maybe?)
An alternative would be to allow users to perform a join on a value other than 
the key (a keyValueMapper parameter to join, like the one used for joins with 
global tables), but I expect that to be more involved and error-prone to use 
for people who don't understand the partitioning requirements well (whereas 
it's safe for global tables).






[jira] [Commented] (KAFKA-4601) Avoid duplicated repartitioning in KStream DSL

2017-02-23 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15881030#comment-15881030
 ] 

Michal Borowiecki commented on KAFKA-4601:
--

Don't know if this belongs in this ticket or warrants a separate one, but I'd 
suggest, instead of trying to rely on kstreams doing more automatic 
optimization, it would be good to provide users more control over the 
repartitioning. 
My use case is as follows (unrelated bits omitted for brevity):
{code}
KTable loggedInCustomers = builder
.stream("customerLogins")
.groupBy((key, activity) -> 
activity.getCustomerRef())
.reduce((first,second) -> second, loginStore());

builder
.stream("balanceUpdates")
.map((key, activity) -> new KeyValue<>(
activity.getCustomerRef(),
activity))
.join(loggedInCustomers, (activity, session) -> ...
.to("sessions");
{code}
Both "groupBy" and "map" in the underlying implementation set the 
repartitionRequired flag (since the key changes), and the aggregation/join that 
follows will create the repartitioned topic.
However, in our case I know that both input streams are already partitioned by 
the customerRef value, which I'm mapping into the key (because it's required by 
the join operation).
So there are 2 unnecessary intermediate topics created with their associated 
overhead, while the ultimate goal is simply to do a join on a value that we 
already use to partition the original streams anyway.
(Note, we don't have the option to re-implement the original input streams to 
make customerRef the message key.)

I think it would be better to allow the user to decide (from their knowledge of 
the incoming streams) whether a repartition is mandatory on aggregation and 
join operations (overloaded version of the methods with the repartitionRequired 
flag exposed maybe?)
An alternative would be to allow users to perform a join on a value other than 
the key (a keyValueMapper parameter to join, like the one used for joins with 
global tables), but I expect that to be more involved and error-prone to use 
for people who don't understand the partitioning requirements well (whereas 
it's safe for global tables).


[jira] [Created] (KAFKA-4750) KeyValueIterator returns null values

2017-02-09 Thread Michal Borowiecki (JIRA)
Michal Borowiecki created KAFKA-4750:


 Summary: KeyValueIterator returns null values
 Key: KAFKA-4750
 URL: https://issues.apache.org/jira/browse/KAFKA-4750
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.1.1
Reporter: Michal Borowiecki


The API for the ReadOnlyKeyValueStore.range method promises that the returned 
iterator will not return null values. However, after upgrading from 0.10.0.0 to 
0.10.1.1 we found null values are returned, causing NPEs on our side.

I found this happens after removing entries from the store, and I noticed a 
resemblance to the SAMZA-94 defect. The problem seems to be the same as it was 
there: when deleting entries with a serializer that does not return null when 
null is passed in, the state store doesn't actually delete that key/value pair, 
but the iterator will return a null value for that key.

When I modified our serializer to return null when null is passed in, the 
problem went away. However, I believe this should be fixed in Kafka Streams, 
perhaps with a similar approach as SAMZA-94.
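A stdlib-only sketch of this failure mode (deliberately simplified: the "store" is a HashMap and the "serializer" a String function, both hypothetical) showing how a serializer that never returns null turns a delete into a put:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Sketch of the failure mode described above: deletes are written as
// (key, null); if the serializer turns null into a non-null byte form,
// the store sees a regular put instead of a delete, and iterators later
// surface an entry whose value deserializes back to null.
public class NullSerializerSketch {

    static void put(Map<String, String> store,
                    Function<String, String> serializer,
                    String key, String value) {
        String bytes = serializer.apply(value);
        if (bytes == null) {
            store.remove(key);        // proper tombstone handling
        } else {
            store.put(key, bytes);    // "deleted" entry lingers
        }
    }

    public static void main(String[] args) {
        // Buggy serializer: never returns null.
        Function<String, String> buggy = v -> v == null ? "" : v;
        // Fixed serializer: passes null through.
        Function<String, String> fixed = v -> v;

        Map<String, String> s1 = new HashMap<>();
        put(s1, buggy, "k", "v");
        put(s1, buggy, "k", null);
        System.out.println(s1.containsKey("k")); // prints "true": key survives

        Map<String, String> s2 = new HashMap<>();
        put(s2, fixed, "k", "v");
        put(s2, fixed, "k", null);
        System.out.println(s2.containsKey("k")); // prints "false": deleted
    }
}
```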





[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2016-11-21 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683056#comment-15683056
 ] 

Michal Borowiecki commented on KAFKA-3514:
--

IMO, 2) *is* a severe problem. Punctuate methods (as described by their API) 
are meant to perform periodic operations. As it currently stands, if any of the 
input topics doesn't receive messages regularly, the punctuate method won't be 
called regularly either (due to the minimum timestamp across all partitions not 
advancing), which violates what the API promises. 
We've worked around it in our app by creating an independent stream and a 
scheduler sending ticks regularly to an input topic feeding a Transformer, so 
that its punctuate method is called predictably, but this is far from ideal.
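The workaround's intent can be sketched with the JDK scheduler alone (the Kafka tick topic and Transformer plumbing are omitted here, and the names are hypothetical): drive periodic work from wall-clock time, so it fires even when some input partition is silent.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Stdlib sketch: periodic work driven by wall-clock time instead of
// stream time, so it fires regardless of input-topic traffic.
public class WallClockPunctuate {

    static ScheduledExecutorService schedulePunctuate(Runnable punctuate,
                                                      long intervalMs) {
        ScheduledExecutorService ticker =
                Executors.newSingleThreadScheduledExecutor();
        ticker.scheduleAtFixedRate(punctuate, intervalMs, intervalMs,
                TimeUnit.MILLISECONDS);
        return ticker;
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch threeTicks = new CountDownLatch(3);
        ScheduledExecutorService ticker =
                schedulePunctuate(threeTicks::countDown, 10);
        // Fires three times within the timeout regardless of any "input".
        boolean fired = threeTicks.await(2, TimeUnit.SECONDS);
        ticker.shutdownNow();
        System.out.println(fired);
    }
}
```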



[jira] [Commented] (KAFKA-4355) StreamThread intermittently dies with "Topic not found during partition assignment" when broker restarted

2016-11-02 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15628442#comment-15628442
 ] 

Michal Borowiecki commented on KAFKA-4355:
--

KAFKA-4366 created for the KafkaSteams.close() hanging issue.

> StreamThread intermittently dies with "Topic not found during partition 
> assignment" when broker restarted
> -
>
> Key: KAFKA-4355
> URL: https://issues.apache.org/jira/browse/KAFKA-4355
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0, 0.10.0.0
> Environment: kafka 0.10.0.0
> kafka 0.10.1.0
> uname -a
> Linux lp02485 4.4.0-34-generic #53~14.04.1-Ubuntu SMP Wed Jul 27 16:56:40 UTC 
> 2016 x86_64 x86_64 x86_64 GNU/Linux
> java -version
> java version "1.8.0_92"
> Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)
>Reporter: Michal Borowiecki
>Assignee: Guozhang Wang
>
> When (a) starting kafka streams app before the broker or
> (b) restarting the broker while the streams app is running:
> the stream thread intermittently dies with "Topic not found during partition 
> assignment" StreamsException.
> This happens roughly between one in five and one in ten times.
> Stack trace:
> {noformat}
> Exception in thread "StreamThread-2" 
> org.apache.kafka.streams.errors.StreamsException: Topic not found during 
> partition assignment: scheduler
>   at 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper.maxNumPartitions(DefaultPartitionGrouper.java:81)
>   at 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper.partitionGroups(DefaultPartitionGrouper.java:55)
>   at 
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:370)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:313)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:467)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:88)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:419)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:395)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:308)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> {noformat}
> Our app has 2 streams in it, consuming from 2 different topics.
> Sometimes the exception happens on both stream threads. Sometimes only on one 
> of the stream threads.
> The exception is preceded by:
> {noformat}
> [2016-10-28 16:17:55,239] INFO [StreamThread-2] (Re-)joining group 
> pool-scheduler 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2016-10-28 16:17:55,240] INFO 

[jira] [Created] (KAFKA-4366) KafkaStreams.close() blocks indefinitely

2016-11-02 Thread Michal Borowiecki (JIRA)
Michal Borowiecki created KAFKA-4366:


 Summary: KafkaStreams.close() blocks indefinitely
 Key: KAFKA-4366
 URL: https://issues.apache.org/jira/browse/KAFKA-4366
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.0.1, 0.10.1.0
Reporter: Michal Borowiecki
Assignee: Guozhang Wang


KafkaStreams.close() method calls join on all its threads without a timeout, 
meaning indefinitely, which makes it prone to deadlocks and unfit to be used in 
shutdown hooks.

(KafkaStreams::close is used in numerous examples by confluent: 
https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/kafka-streams/src/main/java/io/confluent/examples/streams
 and 
https://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple/
 so we assumed it to be recommended practice)

A deadlock happens, for instance, if System.exit() is called from within the 
uncaughtExceptionHandler. (We need to call System.exit() from the 
uncaughtExceptionHandler because KAFKA-4355 issue shuts down the StreamThread 
and to recover we want the process to exit, as our infrastructure will then 
start it up again.)

The System.exit call (from the uncaughtExceptionHandler, which runs in the 
StreamThread) will execute the shutdown hook in a new thread and wait for that 
thread to join. If the shutdown hook calls KafkaStreams.close, it will in turn 
block waiting for the StreamThread to join, hence the deadlock.

Runtime.addShutdownHook javadocs state:
{quote}
Shutdown hooks run at a delicate time in the life cycle of a virtual machine 
and should therefore be coded defensively. They should, in particular, be 
written to be thread-safe and to avoid deadlocks insofar as possible
{quote}
and
{quote}
Shutdown hooks should also finish their work quickly.
{quote}
Therefore the current implementation of KafkaStreams.close() which waits 
forever for threads to join is completely unsuitable for use in a shutdown 
hook. 
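A sketch of the bounded close this ticket argues for (a hypothetical helper, not the actual KafkaStreams API at the time): join each worker thread against a deadline and report failure instead of hanging, so a shutdown hook cannot deadlock on a thread that will never finish.

```java
// Hypothetical bounded-shutdown helper: unlike a bare join(), each join
// is given the remaining share of an overall deadline.
public class BoundedClose {

    // Returns true if every thread terminated within timeoutMs overall.
    static boolean closeWithTimeout(Iterable<Thread> threads, long timeoutMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        for (Thread t : threads) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining > 0) {
                t.join(remaining); // bounded wait, unlike a bare join()
            }
            if (t.isAlive()) {
                return false;      // report failure instead of hanging
            }
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread quick = new Thread(() -> { });
        quick.start();
        Thread stuck = new Thread(() -> {
            try { Thread.sleep(60_000); } catch (InterruptedException e) { }
        });
        stuck.start();
        System.out.println(closeWithTimeout(java.util.List.of(quick), 1_000));
        System.out.println(closeWithTimeout(java.util.List.of(stuck), 100));
        stuck.interrupt();
    }
}
```

A caller (for example a shutdown hook) could log the `false` result and exit anyway, instead of blocking the JVM forever.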







[jira] [Comment Edited] (KAFKA-4355) StreamThread intermittently dies with "Topic not found during partition assignment" when broker restarted

2016-10-29 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15618009#comment-15618009
 ] 

Michal Borowiecki edited comment on KAFKA-4355 at 10/29/16 12:18 PM:
-

Trying to work around this issue by calling System.exit from the 
UncaughtExceptionHandler (once the app dies, it will be re-started by our 
infrastructure).
We are adding a shutdown hook as per example here: 
http://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple/

{code:java}
Runtime.getRuntime().addShutdownHook(new 
Thread(schedulerStreams::close));
{code}

However, even though both stream threads report completion of shutdown:
{noformat}
[2016-10-29 12:32:10,616] INFO [StreamThread-2] stream-thread [StreamThread-2] 
Stream thread shutdown complete 
(org.apache.kafka.streams.processor.internals.StreamThread)
[2016-10-29 12:32:20,490] INFO [StreamThread-1] stream-thread [StreamThread-1] 
Stream thread shutdown complete 
(org.apache.kafka.streams.processor.internals.StreamThread)
{noformat}
and, before that, report closing their producers and consumers, the app does 
not stop.
At least the following 2 threads remain active and keep logging:
{noformat}
[2016-10-29 12:37:05,625] DEBUG [main-SendThread(localhost:19374)] Got ping 
response for sessionid: 0x158101fc9590021 after 0ms 
(org.apache.zookeeper.ClientCnxn)
[2016-10-29 12:37:09,815] DEBUG [kafka-producer-network-thread | producer-1] 
Sending metadata request {topics=[scheduler]} to node 0 
(org.apache.kafka.clients.NetworkClient)
[2016-10-29 12:37:09,818] DEBUG [kafka-producer-network-thread | producer-1] 
Updated cluster metadata version 15 to Cluster(id = enenZ_SbQKaRlOyJKQMn_g, 
nodes = [lp02485.openbet:19373 (id: 0 rack: null)], partitions = 
[Partition(topic = scheduler, partition = 0, leader = 0, replicas = [0,], isr = 
[0,])]) (org.apache.kafka.clients.Metadata)
[2016-10-29 12:37:12,945] DEBUG [main-SendThread(localhost:19374)] Got ping 
response for sessionid: 0x158101fc9590022 after 0ms 
(org.apache.zookeeper.ClientCnxn)
{noformat}

"Stopped Kafka Stream process" is never logged, so the close method remains 
blocked on the join here, I suspect:
https://github.com/apache/kafka/blob/e876df8b37fc6ea54b0a0571306c4a833c919cda/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L227

PS. When we don't add the shutdown hook to call close(), the app exits 
correctly on System.exit(). I think it is pretty bad behaviour that the close() 
method can block indefinitely, so I'll raise a separate ticket unless I find one 
already exists. Hopefully that will be easier to reproduce.


was (Author: mihbor):
Trying to work around this issue by calling System.exit from the 
UncaughtExceptionHandler (once the app dies, it will be re-started by our 
infrastructure).
We are adding a shutdown hook as per example here: 
http://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple/

{code:java}
Runtime.getRuntime().addShutdownHook(new 
Thread(schedulerStreams::close));
{code}

However, even though both stream threads report completion of shutdown:
{noformat}
[2016-10-29 12:32:10,616] INFO [StreamThread-2] stream-thread [StreamThread-2] 
Stream thread shutdown complete 
(org.apache.kafka.streams.processor.internals.StreamThread)
[2016-10-29 12:32:20,490] INFO [StreamThread-1] stream-thread [StreamThread-1] 
Stream thread shutdown complete 
(org.apache.kafka.streams.processor.internals.StreamThread)
{noformat}
and, before that, report closing their producers and consumers, the app does 
not stop.
At least the following 2 threads remain active and keep logging:
{noformat}
[2016-10-29 12:37:05,625] DEBUG [main-SendThread(localhost:19374)] Got ping 
response for sessionid: 0x158101fc9590021 after 0ms 
(org.apache.zookeeper.ClientCnxn)
[2016-10-29 12:37:09,815] DEBUG [kafka-producer-network-thread | producer-1] 
Sending metadata request {topics=[scheduler]} to node 0 
(org.apache.kafka.clients.NetworkClient)
[2016-10-29 12:37:09,818] DEBUG [kafka-producer-network-thread | producer-1] 
Updated cluster metadata version 15 to Cluster(id = enenZ_SbQKaRlOyJKQMn_g, 
nodes = [lp02485.openbet:19373 (id: 0 rack: null)], partitions = 
[Partition(topic = scheduler, partition = 0, leader = 0, replicas = [0,], isr = 
[0,])]) (org.apache.kafka.clients.Metadata)
[2016-10-29 12:37:12,945] DEBUG [main-SendThread(localhost:19374)] Got ping 
response for sessionid: 0x158101fc9590022 after 0ms 
(org.apache.zookeeper.ClientCnxn)
{noformat}

"Stopped Kafka Stream process" is never logged, so the close method remains 
blocked on the join here, I suspect:
https://github.com/apache/kafka/blob/e876df8b37fc6ea54b0a0571306c4a833c919cda/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L227

> StreamThread intermittently dies with "Topic not found during partition 
> assignment" when broker restarted

[jira] [Commented] (KAFKA-4355) StreamThread intermittently dies with "Topic not found during partition assignment" when broker restarted

2016-10-29 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15618009#comment-15618009
 ] 

Michal Borowiecki commented on KAFKA-4355:
--------------------------------------------------------------------

Trying to work around this issue by calling System.exit from the 
UncaughtExceptionHandler (once the app dies, it will be re-started by our 
infrastructure).
We are adding a shutdown hook as per example here: 
http://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple/

{code:java}
Runtime.getRuntime().addShutdownHook(new 
Thread(schedulerStreams::close));
{code}

However, even though both stream threads report completion of shutdown:
{noformat}
[2016-10-29 12:32:10,616] INFO [StreamThread-2] stream-thread [StreamThread-2] 
Stream thread shutdown complete 
(org.apache.kafka.streams.processor.internals.StreamThread)
[2016-10-29 12:32:20,490] INFO [StreamThread-1] stream-thread [StreamThread-1] 
Stream thread shutdown complete 
(org.apache.kafka.streams.processor.internals.StreamThread)
{noformat}
and, before that, report closing their producers and consumers, the app does 
not stop.
At least the following 2 threads remain active and keep logging:
{noformat}
[2016-10-29 12:37:05,625] DEBUG [main-SendThread(localhost:19374)] Got ping 
response for sessionid: 0x158101fc9590021 after 0ms 
(org.apache.zookeeper.ClientCnxn)
[2016-10-29 12:37:09,815] DEBUG [kafka-producer-network-thread | producer-1] 
Sending metadata request {topics=[scheduler]} to node 0 
(org.apache.kafka.clients.NetworkClient)
[2016-10-29 12:37:09,818] DEBUG [kafka-producer-network-thread | producer-1] 
Updated cluster metadata version 15 to Cluster(id = enenZ_SbQKaRlOyJKQMn_g, 
nodes = [lp02485.openbet:19373 (id: 0 rack: null)], partitions = 
[Partition(topic = scheduler, partition = 0, leader = 0, replicas = [0,], isr = 
[0,])]) (org.apache.kafka.clients.Metadata)
[2016-10-29 12:37:12,945] DEBUG [main-SendThread(localhost:19374)] Got ping 
response for sessionid: 0x158101fc9590022 after 0ms 
(org.apache.zookeeper.ClientCnxn)
{noformat}

"Stopped Kafka Stream process" is never logged, so the close method remains 
blocked on the join here, I suspect:
https://github.com/apache/kafka/blob/e876df8b37fc6ea54b0a0571306c4a833c919cda/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L227

> StreamThread intermittently dies with "Topic not found during partition 
> assignment" when broker restarted
> --------------------------------------------------------------------
>
> Key: KAFKA-4355
> URL: https://issues.apache.org/jira/browse/KAFKA-4355
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0, 0.10.0.0
> Environment: kafka 0.10.0.0
> kafka 0.10.1.0
> uname -a
> Linux lp02485 4.4.0-34-generic #53~14.04.1-Ubuntu SMP Wed Jul 27 16:56:40 UTC 
> 2016 x86_64 x86_64 x86_64 GNU/Linux
> java -version
> java version "1.8.0_92"
> Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)
>Reporter: Michal Borowiecki
>Assignee: Guozhang Wang
>
> When (a) starting kafka streams app before the broker or
> (b) restarting the broker while the streams app is running:
> the stream thread intermittently dies with "Topic not found during partition 
> assignment" StreamsException.
> This happens roughly one in five to one in ten times.
> Stack trace:
> {noformat}
> Exception in thread "StreamThread-2" 
> org.apache.kafka.streams.errors.StreamsException: Topic not found during 
> partition assignment: scheduler
>   at 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper.maxNumPartitions(DefaultPartitionGrouper.java:81)
>   at 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper.partitionGroups(DefaultPartitionGrouper.java:55)
>   at 
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:370)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:313)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:467)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:88)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:419)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:395)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742)
>   at 
> 

[jira] [Commented] (KAFKA-4355) StreamThread intermittently dies with "Topic not found during partition assignment" when broker restarted

2016-10-28 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15616118#comment-15616118
 ] 

Michal Borowiecki commented on KAFKA-4355:
--------------------------------------------------------------------

Perhaps the DefaultPartitionGrouper here:
https://github.com/apache/kafka/blob/e7663a306f40e9fcbc3096d17fb0f99fa3d11d1d/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java#L81
should throw a RetriableException instead of a StreamsException?
AbstractCoordinator would then keep looping instead of re-throwing it:
https://github.com/apache/kafka/blob/d092673838173d9dedbf5acf3f4e2cd8c736294f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L320
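A minimal sketch of the proposed behaviour, using stand-in classes rather than the real Kafka types (RetriableException here is a local class, and maxNumPartitions/assignWithRetries are hypothetical analogues of the real methods): a retriable failure makes the caller loop, while the current StreamsException-style failure would propagate and kill the thread.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

// Stand-in classes only; these are not the real Kafka client types.
public class RetryLoopSketch {
    static class RetriableException extends RuntimeException { }

    // Analogue of DefaultPartitionGrouper.maxNumPartitions: right after a
    // broker restart the metadata may not contain the topic yet.
    static int maxNumPartitions(Optional<Integer> partitionsInMetadata) {
        return partitionsInMetadata.orElseThrow(RetriableException::new);
    }

    // Analogue of the coordinator loop: retry on RetriableException
    // instead of letting the exception kill the StreamThread.
    static int assignWithRetries(Iterator<Optional<Integer>> metadataLookups) {
        while (true) {
            try {
                return maxNumPartitions(metadataLookups.next());
            } catch (RetriableException e) {
                // metadata not fresh yet; poll again instead of dying
            }
        }
    }

    public static void main(String[] args) {
        // First two lookups miss the topic, the third succeeds.
        List<Optional<Integer>> lookups =
                List.of(Optional.empty(), Optional.empty(), Optional.of(4));
        System.out.println("partitions=" + assignWithRetries(lookups.iterator())); // prints partitions=4
    }
}
```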

> StreamThread intermittently dies with "Topic not found during partition 
> assignment" when broker restarted
> -
>
> Key: KAFKA-4355
> URL: https://issues.apache.org/jira/browse/KAFKA-4355
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0, 0.10.0.0
> Environment: kafka 0.10.0.0
> kafka 0.10.1.0
> uname -a
> Linux lp02485 4.4.0-34-generic #53~14.04.1-Ubuntu SMP Wed Jul 27 16:56:40 UTC 
> 2016 x86_64 x86_64 x86_64 GNU/Linux
> java -version
> java version "1.8.0_92"
> Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)
>Reporter: Michal Borowiecki
>Assignee: Guozhang Wang
>
> When (a) starting kafka streams app before the broker or
> (b) restarting the broker while the streams app is running:
> the stream thread intermittently dies with "Topic not found during partition 
> assignment" StreamsException.
> This happens roughly one in five to one in ten times.
> Stack trace:
> {noformat}
> Exception in thread "StreamThread-2" 
> org.apache.kafka.streams.errors.StreamsException: Topic not found during 
> partition assignment: scheduler
>   at 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper.maxNumPartitions(DefaultPartitionGrouper.java:81)
>   at 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper.partitionGroups(DefaultPartitionGrouper.java:55)
>   at 
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:370)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:313)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:467)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:88)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:419)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:395)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:308)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
>   at 
> 

[jira] [Commented] (KAFKA-4355) StreamThread intermittently dies with "Topic not found during partition assignment" when broker restarted

2016-10-28 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15615967#comment-15615967
 ] 

Michal Borowiecki commented on KAFKA-4355:
--------------------------------------------------------------------

My first suspect so far is the ConsumerCoordinator.
In this line: 
https://github.com/apache/kafka/blob/d092673838173d9dedbf5acf3f4e2cd8c736294f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L301
it sets topics on the metadata from subscriptions, which the debugger shows to 
contain the correct topic name.
4 lines later: 
https://github.com/apache/kafka/blob/d092673838173d9dedbf5acf3f4e2cd8c736294f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L305
it calls client.ensureFreshMetadata(), which can override the topics list.

Debugger shows that in the problematic case in 
https://github.com/apache/kafka/blob/d092673838173d9dedbf5acf3f4e2cd8c736294f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L313
 the passed metadata object already has an empty set of topics, while the 
subscriptions object contains the topic name.

So I think the topic was removed from the metadata in line 305. 
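The suspected race can be modelled with a toy metadata class (not the real client code; the line references in the comments point at the ConsumerCoordinator lines linked above): the subscription records the topic, but a later refresh replaces the topic set with whatever the broker returned, which is empty right after a restart.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of the suspected race; ToyMetadata is not the real
// org.apache.kafka.clients.Metadata class.
public class MetadataRaceSketch {
    static class ToyMetadata {
        private Set<String> topics = new HashSet<>();
        // Analogue of ConsumerCoordinator line ~301: record subscriptions.
        void setTopics(Set<String> t) { topics = new HashSet<>(t); }
        // Analogue of the refresh at line ~305: replaces the topic set.
        void refreshFromBroker(Set<String> brokerView) { topics = new HashSet<>(brokerView); }
        Set<String> topics() { return topics; }
    }

    public static void main(String[] args) {
        ToyMetadata metadata = new ToyMetadata();
        metadata.setTopics(Set.of("scheduler"));   // subscriptions know the topic
        metadata.refreshFromBroker(Set.of());      // restarted broker reports nothing yet
        // By the time performAssignment runs, the topic is gone:
        System.out.println("topics=" + metadata.topics()); // prints topics=[]
    }
}
```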

> StreamThread intermittently dies with "Topic not found during partition 
> assignment" when broker restarted
> --------------------------------------------------------------------
>
> Key: KAFKA-4355
> URL: https://issues.apache.org/jira/browse/KAFKA-4355
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0, 0.10.0.0
> Environment: kafka 0.10.0.0
> kafka 0.10.1.0
> uname -a
> Linux lp02485 4.4.0-34-generic #53~14.04.1-Ubuntu SMP Wed Jul 27 16:56:40 UTC 
> 2016 x86_64 x86_64 x86_64 GNU/Linux
> java -version
> java version "1.8.0_92"
> Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)
>Reporter: Michal Borowiecki
>Assignee: Guozhang Wang
>
> When (a) starting kafka streams app before the broker or
> (b) restarting the broker while the streams app is running:
> the stream thread intermittently dies with "Topic not found during partition 
> assignment" StreamsException.
> This happens roughly one in five to one in ten times.
> Stack trace:
> {noformat}
> Exception in thread "StreamThread-2" 
> org.apache.kafka.streams.errors.StreamsException: Topic not found during 
> partition assignment: scheduler
>   at 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper.maxNumPartitions(DefaultPartitionGrouper.java:81)
>   at 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper.partitionGroups(DefaultPartitionGrouper.java:55)
>   at 
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:370)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:313)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:467)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:88)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:419)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:395)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:308)
>   at 
> 

[jira] [Created] (KAFKA-4355) StreamThread intermittently dies with "Topic not found during partition assignment" when broker restarted

2016-10-28 Thread Michal Borowiecki (JIRA)
Michal Borowiecki created KAFKA-4355:


 Summary: StreamThread intermittently dies with "Topic not found 
during partition assignment" when broker restarted
 Key: KAFKA-4355
 URL: https://issues.apache.org/jira/browse/KAFKA-4355
 Project: Kafka
  Issue Type: Bug
  Components: streams
 Environment: kafka 0.10.0.0
kafka 0.10.1.0
Reporter: Michal Borowiecki
Assignee: Guozhang Wang


When (a) starting kafka streams app before the broker or
(b) restarting the broker while the streams app is running:
the stream thread intermittently dies with "Topic not found during partition 
assignment" StreamsException.
This happens roughly one in five to one in ten times.
Stack trace:
{noformat}
Exception in thread "StreamThread-2" 
org.apache.kafka.streams.errors.StreamsException: Topic not found during 
partition assignment: scheduler
at 
org.apache.kafka.streams.processor.DefaultPartitionGrouper.maxNumPartitions(DefaultPartitionGrouper.java:81)
at 
org.apache.kafka.streams.processor.DefaultPartitionGrouper.partitionGroups(DefaultPartitionGrouper.java:55)
at 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:370)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:313)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:467)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:88)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:419)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:395)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:308)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
{noformat}

Our app has 2 streams in it, consuming from 2 different topics.
Sometimes the exception happens on both stream threads. Sometimes only on one 
of the stream threads.

The exception is preceded by:
{noformat}
[2016-10-28 16:17:55,239] INFO [StreamThread-2] (Re-)joining group 
pool-scheduler (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2016-10-28 16:17:55,240] INFO [StreamThread-2] Marking the coordinator 
lp02485.openbet:19373 (id: 2147483647 rack: null) dead for group pool-scheduler 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2016-10-28 16:17:55,342] INFO [StreamThread-2] Discovered coordinator 
lp02485.openbet:19373 (id: 2147483647 rack: null) for group pool-scheduler. 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2016-10-28 16:17:55,342] INFO [StreamThread-2] (Re-)joining group 
pool-scheduler (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2016-10-28 16:17:55,357] INFO [StreamThread-2] stream-thread [StreamThread-2] 
Completed validating internal topics in partition assignor 

[jira] [Updated] (KAFKA-4355) StreamThread intermittently dies with "Topic not found during partition assignment" when broker restarted

2016-10-28 Thread Michal Borowiecki (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michal Borowiecki updated KAFKA-4355:
--------------------------------------------------------------------
Environment: 
kafka 0.10.0.0
kafka 0.10.1.0

uname -a
Linux lp02485 4.4.0-34-generic #53~14.04.1-Ubuntu SMP Wed Jul 27 16:56:40 UTC 
2016 x86_64 x86_64 x86_64 GNU/Linux

java -version
java version "1.8.0_92"
Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)


  was:
kafka 0.10.0.0
kafka 0.10.1.0


> StreamThread intermittently dies with "Topic not found during partition 
> assignment" when broker restarted
> --------------------------------------------------------------------
>
> Key: KAFKA-4355
> URL: https://issues.apache.org/jira/browse/KAFKA-4355
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0, 0.10.0.0
> Environment: kafka 0.10.0.0
> kafka 0.10.1.0
> uname -a
> Linux lp02485 4.4.0-34-generic #53~14.04.1-Ubuntu SMP Wed Jul 27 16:56:40 UTC 
> 2016 x86_64 x86_64 x86_64 GNU/Linux
> java -version
> java version "1.8.0_92"
> Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)
>Reporter: Michal Borowiecki
>Assignee: Guozhang Wang
>
> When (a) starting kafka streams app before the broker or
> (b) restarting the broker while the streams app is running:
> the stream thread intermittently dies with "Topic not found during partition 
> assignment" StreamsException.
> This happens roughly one in five to one in ten times.
> Stack trace:
> {noformat}
> Exception in thread "StreamThread-2" 
> org.apache.kafka.streams.errors.StreamsException: Topic not found during 
> partition assignment: scheduler
>   at 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper.maxNumPartitions(DefaultPartitionGrouper.java:81)
>   at 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper.partitionGroups(DefaultPartitionGrouper.java:55)
>   at 
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:370)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:313)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:467)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:88)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:419)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:395)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:308)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> {noformat}
> Our app has 2 streams in it, consuming from 2 different topics.
> Sometimes the exception happens on both stream threads. Sometimes only on one 

[jira] [Updated] (KAFKA-4355) StreamThread intermittently dies with "Topic not found during partition assignment" when broker restarted

2016-10-28 Thread Michal Borowiecki (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michal Borowiecki updated KAFKA-4355:
--------------------------------------------------------------------
Description: 
When (a) starting kafka streams app before the broker or
(b) restarting the broker while the streams app is running:
the stream thread intermittently dies with "Topic not found during partition 
assignment" StreamsException.
This happens roughly one in five to one in ten times.
Stack trace:
{noformat}
Exception in thread "StreamThread-2" 
org.apache.kafka.streams.errors.StreamsException: Topic not found during 
partition assignment: scheduler
at 
org.apache.kafka.streams.processor.DefaultPartitionGrouper.maxNumPartitions(DefaultPartitionGrouper.java:81)
at 
org.apache.kafka.streams.processor.DefaultPartitionGrouper.partitionGroups(DefaultPartitionGrouper.java:55)
at 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:370)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:313)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:467)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:88)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:419)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:395)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:308)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
{noformat}

Our app has 2 streams in it, consuming from 2 different topics.
Sometimes the exception happens on both stream threads. Sometimes only on one 
of the stream threads.

The exception is preceded by:
{noformat}
[2016-10-28 16:17:55,239] INFO [StreamThread-2] (Re-)joining group pool-scheduler (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2016-10-28 16:17:55,240] INFO [StreamThread-2] Marking the coordinator lp02485.openbet:19373 (id: 2147483647 rack: null) dead for group pool-scheduler (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2016-10-28 16:17:55,342] INFO [StreamThread-2] Discovered coordinator lp02485.openbet:19373 (id: 2147483647 rack: null) for group pool-scheduler. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2016-10-28 16:17:55,342] INFO [StreamThread-2] (Re-)joining group pool-scheduler (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2016-10-28 16:17:55,357] INFO [StreamThread-2] stream-thread [StreamThread-2] Completed validating internal topics in partition assignor (org.apache.kafka.streams.processor.internals.StreamPartitionAssignor)
[2016-10-28 16:17:55,357] INFO [StreamThread-2] stream-thread [StreamThread-2] Shutting down (org.apache.kafka.streams.processor.internals.StreamThread)
{noformat}
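Because the StreamThread shuts down rather than retrying the assignment, one client-side mitigation (an assumption for illustration, not something proposed in this ticket) is to wait until all source topics are visible before starting, or restarting, the streams app. A minimal Python sketch of such a bounded wait; `fetch_topics` is a hypothetical callable standing in for a metadata lookup (e.g. an admin-client topic listing):

```python
import time


def wait_for_topics(fetch_topics, required, timeout_s=60.0, backoff_s=1.0):
    """Poll until every topic in `required` is visible, or raise.

    `fetch_topics` is a hypothetical stand-in for a cluster metadata
    lookup; it returns the set of currently known topic names. Polling
    with a deadline avoids starting the app against a broker that has
    not finished re-registering its topics after a restart.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        missing = set(required) - set(fetch_topics())
        if not missing:
            return
        if time.monotonic() >= deadline:
            raise TimeoutError("topics never appeared: %s" % sorted(missing))
        time.sleep(backoff_s)
```

This only narrows the window; the race can still occur if the broker restarts after the check, so it is a workaround rather than a fix for the assignor's behaviour.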

[jira] [Updated] (KAFKA-4355) StreamThread intermittently dies with "Topic not found during partition assignment" when broker restarted

2016-10-28 Thread Michal Borowiecki (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michal Borowiecki updated KAFKA-4355:
-
Affects Version/s: 0.10.1.0
   0.10.0.0
