[jira] [Commented] (KAFKA-5233) Changes to punctuate semantics (KIP-138)

2017-07-30 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16106404#comment-16106404
 ] 

Matthias J. Sax commented on KAFKA-5233:


I agree with [~mihbor] that we might want to keep the old tests. That's why I 
just added the {{@suppress}} annotation in the other PR you mentioned. Once we 
decide to remove the deprecated APIs completely, all the old code will go away 
anyway.
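
A minimal sketch of how such a test can be kept (assuming {{@suppress}} refers 
to Java's {{@SuppressWarnings}}; the class and test names below are 
illustrative, and JUnit 4 is assumed):

{code:java}
import org.junit.Test;

public class DeprecatedPunctuateTest {

    // Suppress the deprecation warning locally, so the old-API test can stay
    // in the code base until the deprecated method is actually removed.
    @SuppressWarnings("deprecation")
    @Test
    public void shouldStillCoverOldPunctuateSemantics() {
        // exercise the deprecated punctuate(long) code path here
    }
}
{code}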

> Changes to punctuate semantics (KIP-138)
> 
>
> Key: KAFKA-5233
> URL: https://issues.apache.org/jira/browse/KAFKA-5233
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Michal Borowiecki
>Assignee: Michal Borowiecki
>  Labels: kip
> Fix For: 1.0.0
>
>
> This ticket is to track implementation of 
> [KIP-138: Change punctuate 
> semantics|https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5677) Remove deprecated punctuate method

2017-07-30 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16106405#comment-16106405
 ] 

Matthias J. Sax commented on KAFKA-5677:


Not sure if we should do this. See my comment on KAFKA-5233.

> Remove deprecated punctuate method
> --
>
> Key: KAFKA-5677
> URL: https://issues.apache.org/jira/browse/KAFKA-5677
> Project: Kafka
>  Issue Type: Task
>Reporter: Michal Borowiecki
>
> Task to track the removal of the punctuate method that got deprecated in 
> KAFKA-5233 and associated unit tests.
> (not sure the fix version number at this point)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5571) Possible deadlock during shutdown in setState in kafka streams 10.2

2017-07-07 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16078420#comment-16078420
 ] 

Matthias J. Sax commented on KAFKA-5571:


[~enothereska] did some rework on state management in {{trunk}} recently. I 
guess he should know best :)

> Possible deadlock during shutdown in setState in kafka streams 10.2
> ---
>
> Key: KAFKA-5571
> URL: https://issues.apache.org/jira/browse/KAFKA-5571
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Greg Fodor
> Attachments: kafka-streams.deadlock.log
>
>
> I'm running a 10.2 job across 5 nodes with 32 stream threads on each node and 
> find that when we gracefully shut all of them down at once via an Ansible 
> script, some of the nodes end up freezing -- at a glance, the attached thread 
> dump implies a deadlock between stream threads trying to update their state 
> via setState. We haven't had this problem before, but it may or may not be 
> related to changes in 10.2 (we are upgrading from 10.0 to 10.2).
> When we gracefully shut down all nodes simultaneously, what typically happens 
> is that some subset of the nodes does not shut down completely but goes 
> through a rebalance first. It seems this deadlock requires the rebalance to 
> occur simultaneously with the graceful shutdown. If we happen to shut them 
> down and no rebalance happens, I don't believe this deadlock is triggered.
> The deadlock appears related to the state change handlers being subscribed 
> across threads and the fact that StreamThread#setState and 
> StreamStateListener#onChange are both synchronized methods.
> Another thing worth mentioning is that one of the transformers used in the 
> job has a close() method that can take 10-15 seconds to finish, since it needs 
> to flush some data to a database. A long close() method combined with 
> a rebalance during a shutdown across many threads may be necessary for 
> reproduction.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5603) Streams should not abort transaction when closing zombie task

2017-07-17 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-5603:
--

 Summary: Streams should not abort transaction when closing zombie 
task
 Key: KAFKA-5603
 URL: https://issues.apache.org/jira/browse/KAFKA-5603
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.11.0.0
Reporter: Matthias J. Sax
Assignee: Matthias J. Sax
Priority: Critical
 Fix For: 0.11.0.1


The contract of the transactional producer API is to not call any transactional 
method after a {{ProducerFenced}} exception was thrown.

Streams, however, makes an unconditional call to {{abortTransaction()}} within 
{{StreamTask#close()}} in case of unclean shutdown. We need to distinguish 
between a {{ProducerFenced}} exception and other unclean shutdown cases.
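
A simplified sketch of that distinction (illustrative only, not the actual 
{{StreamTask}} code; the fenced flag is a stand-in for however Streams would 
track the {{ProducerFenced}} case):

{code:java}
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.errors.ProducerFencedException;

public class UncleanClose {

    // After a ProducerFencedException, no transactional method may be called,
    // so the abort is skipped for a fenced producer.
    static void closeUnclean(final Producer<byte[], byte[]> producer,
                             final boolean producerWasFenced) {
        try {
            if (!producerWasFenced) {
                producer.abortTransaction();
            }
        } catch (final ProducerFencedException fenced) {
            // fenced concurrently: swallow, per the producer contract
        } finally {
            producer.close();
        }
    }
}
{code}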



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5167) streams task gets stuck after re-balance due to LockException

2017-07-11 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5167:
---
Fix Version/s: 0.10.2.2

> streams task gets stuck after re-balance due to LockException
> -
>
> Key: KAFKA-5167
> URL: https://issues.apache.org/jira/browse/KAFKA-5167
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.11.0.0
>Reporter: Narendra Kumar
>Assignee: Matthias J. Sax
> Fix For: 0.10.2.2, 0.11.0.1, 0.11.1.0
>
> Attachments: BugTest.java, DebugTransformer.java, logs.txt
>
>
> During rebalance, a processor node's close() method gets called twice: once 
> from StreamThread.suspendTasksAndState() and once from 
> StreamThread.closeNonAssignedSuspendedTasks(). I have an instance field 
> which I close in the processor's close method. This instance's close method 
> throws an exception if I call close more than once. Because of this 
> exception, Kafka Streams does not attempt to close the state manager, i.e., 
> task.closeStateManager(true) is never called. When a task moves from one 
> thread to another within the same machine, the task blocks trying to get a 
> lock on the state directory, which is still held by the unclosed state 
> manager, and keeps throwing the warning message below:
> 2017-04-30 12:34:17 WARN  StreamThread:1214 - Could not create task 0_1. Will 
> retry.
> org.apache.kafka.streams.errors.LockException: task [0_1] Failed to lock the 
> state directory for task 0_1
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>   at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5577) WindowedStreamPartitioner does not provide topic name to serializer

2017-07-10 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-5577.

Resolution: Fixed

Fixed via https://github.com/apache/kafka/pull/2776

> WindowedStreamPartitioner does not provide topic name to serializer
> ---
>
> Key: KAFKA-5577
> URL: https://issues.apache.org/jira/browse/KAFKA-5577
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
> Fix For: 0.11.0.0, 0.10.2.1
>
>
> WindowedStreamPartitioner does not provide topic name to serializer.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5577) WindowedStreamPartitioner does not provide topic name to serializer

2017-07-10 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-5577:
--

 Summary: WindowedStreamPartitioner does not provide topic name to 
serializer
 Key: KAFKA-5577
 URL: https://issues.apache.org/jira/browse/KAFKA-5577
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.2.0
Reporter: Matthias J. Sax
Assignee: Matthias J. Sax
 Fix For: 0.11.0.0, 0.10.2.1


WindowedStreamPartitioner does not provide topic name to serializer.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4490) Add Global Table support to Kafka Streams

2017-07-06 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16076901#comment-16076901
 ] 

Matthias J. Sax commented on KAFKA-4490:


See: https://issues.apache.org/jira/browse/KAFKA-4628

> Add Global Table support to Kafka Streams
> -
>
> Key: KAFKA-4490
> URL: https://issues.apache.org/jira/browse/KAFKA-4490
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 0.10.2.0
>
>
> As per KIP-99 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-99%3A+Add+Global+Tables+to+Kafka+Streams
> Add support for Global Tables



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5566) Unstable test QueryableStateIntegrationTest.shouldAllowToQueryAfterThreadDied

2017-07-06 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-5566:
--

 Summary: Unstable test 
QueryableStateIntegrationTest.shouldAllowToQueryAfterThreadDied
 Key: KAFKA-5566
 URL: https://issues.apache.org/jira/browse/KAFKA-5566
 Project: Kafka
  Issue Type: Bug
  Components: streams, unit tests
Reporter: Matthias J. Sax
Assignee: Eno Thereska


This test failed about 4 times in the last 24h. Always the same stack trace so 
far:
{noformat}
java.lang.AssertionError: Condition not met within timeout 3. wait for agg 
to be '123'
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:274)
at 
org.apache.kafka.streams.integration.QueryableStateIntegrationTest.shouldAllowToQueryAfterThreadDied(QueryableStateIntegrationTest.java:793)
{noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-4819) Expose states of active tasks to public API

2017-07-12 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4819:
---
Fix Version/s: 0.11.1.0

> Expose states of active tasks to public API
> ---
>
> Key: KAFKA-4819
> URL: https://issues.apache.org/jira/browse/KAFKA-4819
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Florian Hussonnois
>Assignee: Florian Hussonnois
>Priority: Minor
>  Labels: kip
> Fix For: 0.11.1.0
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5386) [Kafka Streams] - custom name for state-store change-log topic

2017-07-18 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16091762#comment-16091762
 ] 

Matthias J. Sax commented on KAFKA-5386:


For internal repartitioning topics, you can always avoid them by doing manual 
repartitioning via {{through("my-custom-topic-name")}}. For changelog topics, no 
such workaround exists atm. Hope this helps.
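
A minimal sketch of that workaround with the 0.11-era API (topic and store 
names are illustrative; the {{through()}} topic must be created by the user 
with the desired partition count):

{code:java}
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class ManualRepartition {
    public static void main(final String[] args) {
        final KStreamBuilder builder = new KStreamBuilder();

        final KStream<String, Long> input = builder.stream("input-topic");

        input.selectKey((key, value) -> value.toString()) // new key -> repartitioning needed
             .through("my-custom-topic-name")             // explicitly named repartition topic
             .groupByKey()
             .count("my-counts");                         // named store -> predictable changelog
    }
}
{code}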

> [Kafka Streams] - custom name for state-store change-log topic
> --
>
> Key: KAFKA-5386
> URL: https://issues.apache.org/jira/browse/KAFKA-5386
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Bart Vercammen
>
> Currently, when working with Kafka backed state stores in Kafka Streams, 
> these log compacted topics are given a hardcoded name :  
> _my.app.id-storename-changelog_
> {noformat}public static String storeChangelogTopic(String applicationId, 
> String storeName) {
> return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
> }{noformat}
> It would be nice if somehow I would be able to override this functionality 
> and provide the topic-name myself when creating the state-store.
> Any comments?
> Would it be OK to submit a PR for this?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4750) KeyValueIterator returns null values

2017-07-20 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16094568#comment-16094568
 ] 

Matthias J. Sax commented on KAFKA-4750:


As mentioned above, I would go with option (1), as null-as-delete semantics also 
align with changelog delete semantics. Also, if we allowed putting {{null}} 
as a regular value, we would have the same issue Java has in general: if a {{get}} 
returns {{null}}, it's unclear whether {{null}} is a regular value or the key is 
just not there. Within Java (eg. {{HashMap}}) you can still check with 
{{containsKey}}, but we don't have an API for this on the stores.
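
A small sketch of what option (1) means for store usage (key and value are 
illustrative):

{code:java}
import org.apache.kafka.streams.state.KeyValueStore;

public class NullAsDelete {

    // Under option (1), put(key, null) is a delete -- mirroring a changelog
    // tombstone -- so a null from get() always means "key not present".
    static void deleteViaNull(final KeyValueStore<String, String> store) {
        store.put("some-key", "some-value");
        store.put("some-key", null);           // equivalent to store.delete("some-key")
        assert store.get("some-key") == null;  // unambiguous: the key is gone
    }
}
{code}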

> KeyValueIterator returns null values
> 
>
> Key: KAFKA-4750
> URL: https://issues.apache.org/jira/browse/KAFKA-4750
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1, 0.10.2.1, 0.11.0.0
>Reporter: Michal Borowiecki
>Assignee: Evgeny Veretennikov
>  Labels: newbie
> Attachments: DeleteTest.java
>
>
> The API for the ReadOnlyKeyValueStore.range method promises that the returned 
> iterator will not return null values. However, after upgrading from 0.10.0.0 to 
> 0.10.1.1, we found that null values are returned, causing NPEs on our side.
> I found this happens after removing entries from the store, and I found a 
> resemblance to the SAMZA-94 defect. The problem seems to be, as it was there, 
> that when deleting entries with a serializer that does not return null when null 
> is passed in, the state store doesn't actually delete that key/value pair, but 
> the iterator will return a null value for that key.
> When I modified our serializer to return null when null is passed in, the 
> problem went away. However, I believe this should be fixed in Kafka Streams, 
> perhaps with a similar approach as in SAMZA-94.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4750) KeyValueIterator returns null values

2017-07-20 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16095776#comment-16095776
 ] 

Matthias J. Sax commented on KAFKA-4750:


I understand what you are saying, but encoding an empty collection as 
{{null}} would not work after all: if we write this to the changelog, it 
might get deleted during compaction, resulting in potential data loss. Thus, 
Streams is limited by the underlying broker semantics, and users need to 
encode empty collections as non-null values anyway.
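
A hedged sketch of the user-side encoding this implies (names are made up): an 
"empty" aggregate must be a real, non-null value so that compaction cannot drop 
it:

{code:java}
import java.util.Collections;
import java.util.List;

public class EmptyAggregateEncoding {

    // A non-null empty list serializes to a real record, so log compaction
    // does not treat it as a tombstone and delete it.
    static List<String> emptyAggregate() {
        return Collections.emptyList();
    }

    static boolean isEmpty(final List<String> aggregate) {
        return aggregate != null && aggregate.isEmpty();
    }
}
{code}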

> KeyValueIterator returns null values
> 
>
> Key: KAFKA-4750
> URL: https://issues.apache.org/jira/browse/KAFKA-4750
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1, 0.10.2.1, 0.11.0.0
>Reporter: Michal Borowiecki
>Assignee: Evgeny Veretennikov
>  Labels: newbie
> Attachments: DeleteTest.java
>
>
> The API for the ReadOnlyKeyValueStore.range method promises that the returned 
> iterator will not return null values. However, after upgrading from 0.10.0.0 to 
> 0.10.1.1, we found that null values are returned, causing NPEs on our side.
> I found this happens after removing entries from the store, and I found a 
> resemblance to the SAMZA-94 defect. The problem seems to be, as it was there, 
> that when deleting entries with a serializer that does not return null when null 
> is passed in, the state store doesn't actually delete that key/value pair, but 
> the iterator will return a null value for that key.
> When I modified our serializer to return null when null is passed in, the 
> problem went away. However, I believe this should be fixed in Kafka Streams, 
> perhaps with a similar approach as in SAMZA-94.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4468) Correctly calculate the window end timestamp after read from state stores

2017-07-20 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16095769#comment-16095769
 ] 

Matthias J. Sax commented on KAFKA-4468:


[~guozhang] We have to distinguish two cases: (1) specifying a window in the 
DSL via `TimeWindows` (plural). For this case, the window size is the same for 
all windows within the store. (2) instantiating a new concrete window via `new 
TimeWindow` (singular). This case could happen if someone uses the PAPI with a 
custom window store and custom window logic.

I am not sure how common the second case is, but if we want to support it, we 
would need to store the window size. Let me know if I am missing anything here. 
Or do you think the second case could be covered by the session store?
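
The two cases as a short sketch (window sizes are illustrative; {{TimeWindow}} 
is the internal class a PAPI user would instantiate directly):

{code:java}
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.internals.TimeWindow;

public class TwoWindowCases {
    public static void main(final String[] args) {
        // Case (1): DSL windows -- one fixed size for all windows in the
        // store, so end = start + size is always recoverable.
        final TimeWindows dslWindows = TimeWindows.of(60_000L);

        // Case (2): a concrete window created directly, e.g. from the PAPI
        // with a custom window store; sizes may differ per window.
        final TimeWindow concreteWindow = new TimeWindow(0L, 42_000L);
    }
}
{code}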

> Correctly calculate the window end timestamp after read from state stores
> -
>
> Key: KAFKA-4468
> URL: https://issues.apache.org/jira/browse/KAFKA-4468
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: architecture
>
> When storing the WindowedStore on the persistent KV store, we only use the 
> start timestamp of the window as part of the combo-key as (start-timestamp, 
> key). The reason that we do not add the end-timestamp as well is that we can 
> always calculate it from the start timestamp + window_length, and hence we 
> can save 8 bytes per key on the persistent KV store.
> However, after read it (via {{WindowedDeserializer}}) we do not set its end 
> timestamp correctly but just read it as an {{UnlimitedWindow}}. We should fix 
> this by calculating its end timestamp as mentioned above.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5618) Kafka stream not receive any topic/partitions/records info

2017-07-20 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16095784#comment-16095784
 ] 

Matthias J. Sax commented on KAFKA-5618:


Thanks for following up. If it hangs and the threads never start up, it might be 
good to get DEBUG logs to see where it got stuck. Btw: half an hour might be a 
recovery time that could happen, depending on the number of stores and store 
sizes (ie, record size, number of distinct keys).

> Kafka stream not receive any topic/partitions/records info
> --
>
> Key: KAFKA-5618
> URL: https://issues.apache.org/jira/browse/KAFKA-5618
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: rtp-kafkastreams2.log, rtp-kafkastreams3.log, 
> rtp-kafkastreams.log
>
>
> I have 3 brokers and 3 stream consumers.
> There are 360 partitions, and I am not able to bring up the streams 
> successfully even after several retries.
> I have attached the logs. 
> There are other topics which have around 16 partitions, and they are 
> successfully consumed by the Kafka client.
> When trying to get a thread dump using jstack, the process is not responding:
> Attaching to process ID 10663, please wait...
> Debugger attached successfully.
> Server compiler detected.
> JVM version is 24.141-b02
> Deadlock Detection:
> java.lang.RuntimeException: Unable to deduce type of thread from address 
> 0x7fdac4009000 (expected type JavaThread, CompilerThread, ServiceThread, 
> JvmtiAgentThread, or SurrogateLockerThread)
> at 
> sun.jvm.hotspot.runtime.Threads.createJavaThreadWrapper(Threads.java:162)
> at sun.jvm.hotspot.runtime.Threads.first(Threads.java:150)
> at 
> sun.jvm.hotspot.runtime.DeadlockDetector.createThreadTable(DeadlockDetector.java:149)
> at 
> sun.jvm.hotspot.runtime.DeadlockDetector.print(DeadlockDetector.java:56)
> at 
> sun.jvm.hotspot.runtime.DeadlockDetector.print(DeadlockDetector.java:39)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4327) Move Reset Tool from core to streams

2017-07-25 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16100120#comment-16100120
 ] 

Matthias J. Sax commented on KAFKA-4327:


[~jeqo] I assigned this to you, as it will be covered by KIP-171.

> Move Reset Tool from core to streams
> 
>
> Key: KAFKA-4327
> URL: https://issues.apache.org/jira/browse/KAFKA-4327
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Jorge Quilcate
>Priority: Minor
>
> This is a follow-up on https://issues.apache.org/jira/browse/KAFKA-4008
> Currently, the Kafka Streams Application Reset Tool is part of the {{core}} 
> module due to a ZK dependency. After KIP-4 got merged, this dependency can be 
> dropped and the Reset Tool can be moved to the {{streams}} module.
> This should also update {{InternalTopicManager#filterExistingTopics}}, which 
> refers to the Reset Tool in an exception message:
> {{"Use 'kafka.tools.StreamsResetter' tool"}}
> -> {{"Use '" + kafka.tools.StreamsResetter.getClass().getName() + "' tool"}}
> Doing this JIRA also requires updating the docs with regard to broker 
> backward compatibility -- not all brokers support the "topic delete request" 
> and thus the reset tool will not be backward compatible with all broker 
> versions.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-4327) Move Reset Tool from core to streams

2017-07-25 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-4327:
--

Assignee: Jorge Quilcate  (was: Matthias J. Sax)

> Move Reset Tool from core to streams
> 
>
> Key: KAFKA-4327
> URL: https://issues.apache.org/jira/browse/KAFKA-4327
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Jorge Quilcate
>Priority: Minor
>
> This is a follow-up on https://issues.apache.org/jira/browse/KAFKA-4008
> Currently, the Kafka Streams Application Reset Tool is part of the {{core}} 
> module due to a ZK dependency. After KIP-4 got merged, this dependency can be 
> dropped and the Reset Tool can be moved to the {{streams}} module.
> This should also update {{InternalTopicManager#filterExistingTopics}}, which 
> refers to the Reset Tool in an exception message:
> {{"Use 'kafka.tools.StreamsResetter' tool"}}
> -> {{"Use '" + kafka.tools.StreamsResetter.getClass().getName() + "' tool"}}
> Doing this JIRA also requires updating the docs with regard to broker 
> backward compatibility -- not all brokers support the "topic delete request" 
> and thus the reset tool will not be backward compatible with all broker 
> versions.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5386) [Kafka Streams] - custom name for state-store change-log topic

2017-07-24 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16098591#comment-16098591
 ] 

Matthias J. Sax edited comment on KAFKA-5386 at 7/24/17 3:39 PM:
-

Well. With regard to ACLs, you can still know the names of the changelog topics: 
they follow the pattern {{<applicationId>-<storeName>-changelog}} -- thus, as 
long as you specify a store name for each `builder.table()` and 
count/reduce/aggregate and join, you would know the changelog topic names and 
could adjust the ACLs accordingly. (Only if you omit a store name does Streams 
generate one.)

ATM, this feature request does not seem to be high priority; it always depends 
on how many people ask for it. Of course, we are more than happy if anybody 
picks this up :) I guess we would need a KIP, though, as this change impacts the 
public API.


was (Author: mjsax):
Well. With regard to ACLs, you can still know the names of the changelog topics: 
they follow the pattern `<applicationId>-<storeName>-changelog` -- thus, as 
long as you specify a store name for each `builder.table()` and 
count/reduce/aggregate and join, you would know the changelog topic names and 
could adjust the ACLs accordingly. (Only if you omit a store name does Streams 
generate one.)

ATM, this feature request does not seem to be high priority; it always depends 
on how many people ask for it. Of course, we are more than happy if anybody 
picks this up :) I guess we would need a KIP, though, as this change impacts the 
public API.

> [Kafka Streams] - custom name for state-store change-log topic
> --
>
> Key: KAFKA-5386
> URL: https://issues.apache.org/jira/browse/KAFKA-5386
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Bart Vercammen
>  Labels: needs-kip
>
> Currently, when working with Kafka backed state stores in Kafka Streams, 
> these log compacted topics are given a hardcoded name :  
> _my.app.id-storename-changelog_
> {noformat}public static String storeChangelogTopic(String applicationId, 
> String storeName) {
> return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
> }{noformat}
> It would be nice if somehow I would be able to override this functionality 
> and provide the topic-name myself when creating the state-store.
> Any comments?
> Would it be OK to submit a PR for this?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5386) [Kafka Streams] - custom name for state-store change-log topic

2017-07-24 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16098591#comment-16098591
 ] 

Matthias J. Sax commented on KAFKA-5386:


Well. With regard to ACLs, you can still know the names of the changelog topics: 
they follow the pattern `<applicationId>-<storeName>-changelog` -- thus, as 
long as you specify a store name for each `builder.table()` and 
count/reduce/aggregate and join, you would know the changelog topic names and 
could adjust the ACLs accordingly. (Only if you omit a store name does Streams 
generate one.)

ATM, this feature request does not seem to be high priority; it always depends 
on how many people ask for it. Of course, we are more than happy if anybody 
picks this up :) I guess we would need a KIP, though, as this change impacts the 
public API.
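
A small sketch of that naming rule, based on the snippet quoted in the ticket 
below (application id and store name are examples):

{code:java}
public class ChangelogTopicNames {

    // The convention from the ticket: applicationId + "-" + storeName + "-changelog"
    static String changelogTopic(final String applicationId, final String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(final String[] args) {
        // With an explicit store name, the changelog name is predictable,
        // so the ACLs can be prepared up front.
        System.out.println(changelogTopic("my.app.id", "my-counts"));
        // prints: my.app.id-my-counts-changelog
    }
}
{code}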

> [Kafka Streams] - custom name for state-store change-log topic
> --
>
> Key: KAFKA-5386
> URL: https://issues.apache.org/jira/browse/KAFKA-5386
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Bart Vercammen
>  Labels: needs-kip
>
> Currently, when working with Kafka backed state stores in Kafka Streams, 
> these log compacted topics are given a hardcoded name :  
> _my.app.id-storename-changelog_
> {noformat}public static String storeChangelogTopic(String applicationId, 
> String storeName) {
> return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
> }{noformat}
> It would be nice if somehow I would be able to override this functionality 
> and provide the topic-name myself when creating the state-store.
> Any comments?
> Would it be OK to submit a PR for this?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5648) make Merger extend Aggregator

2017-07-27 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16103319#comment-16103319
 ] 

Matthias J. Sax commented on KAFKA-5648:


The issue is if one implements the old {{Merger}} and does a hotswap of the 
Streams library without recompiling the code. The class ID will not match, as 
the class hierarchy changed: the existing class would only implement 
{{Merger}} but not {{Merger extends Aggregator}}.

You are right that if one recompiles the code, no code change is required to 
pass the build.
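
For reference, a sketch of the two functional shapes involved (the concrete 
type parameters are illustrative):

{code:java}
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Merger;

public class MergerVsAggregator {
    public static void main(final String[] args) {
        // Aggregator<K, V, VA>: folds a value of type V into an aggregate of type VA.
        final Aggregator<String, Integer, Long> aggregator =
            (key, value, aggregate) -> aggregate + value;

        // Merger<K, V>: combines two aggregates of the SAME type V -- the shape
        // of an Aggregator<K, V, V>, i.e., the proposed "extends" only fits
        // when value and aggregate types coincide.
        final Merger<String, Long> merger =
            (key, aggOne, aggTwo) -> aggOne + aggTwo;
    }
}
{code}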

> make Merger extend Aggregator
> -
>
> Key: KAFKA-5648
> URL: https://issues.apache.org/jira/browse/KAFKA-5648
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Clemens Valiente
>Assignee: Clemens Valiente
>Priority: Minor
>
> Hi,
> I suggest that Merger should extend Aggregator.
> reason:
> Both classes usually do very similar things. A merger takes two sessions and 
> combines them, an aggregator takes an existing session and aggregates new 
> values into it.
> in some use cases it is actually the same thing, e.g., a .map() followed by 
> .groupByKey().aggregate() where the aggregated values are lists.
> In this case both merger and aggregator do the same thing: take two lists and 
> combine them into one.
> With the proposed change we could pass the Merger as both the merger and 
> aggregator to the .aggregate() method and keep our business logic within one 
> merger class.
> Or in other words: The Merger is simply an Aggregator that happens to 
> aggregate two objects of the same class



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5671) Add StreamsBuilder and deprecate KStreamBuilder

2017-07-27 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-5671:
--

 Summary: Add StreamsBuilder and deprecate KStreamBuilder
 Key: KAFKA-5671
 URL: https://issues.apache.org/jira/browse/KAFKA-5671
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: Matthias J. Sax
Assignee: Matthias J. Sax






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4327) Move Reset Tool from core to streams

2017-07-26 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101659#comment-16101659
 ] 

Matthias J. Sax commented on KAFKA-4327:


It's a Streams tool, so it belongs to the package `o.a.k.streams.tools` -- we 
only put it into core because of the ZK dependency, and we did not want to add a 
ZK dependency to the streams module. \cc [~ijuma] [~guozhang] [~ewencp]

> Move Reset Tool from core to streams
> 
>
> Key: KAFKA-4327
> URL: https://issues.apache.org/jira/browse/KAFKA-4327
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Jorge Quilcate
>Priority: Minor
>
> This is a follow-up on https://issues.apache.org/jira/browse/KAFKA-4008
> Currently, the Kafka Streams Application Reset Tool is part of the {{core}} 
> module due to a ZK dependency. After KIP-4 got merged, this dependency can be 
> dropped and the Reset Tool can be moved to the {{streams}} module.
> This should also update {{InternalTopicManager#filterExistingTopics}}, which 
> refers to the Reset Tool in an exception message:
> {{"Use 'kafka.tools.StreamsResetter' tool"}}
> -> {{"Use '" + kafka.tools.StreamsResetter.getClass().getName() + "' tool"}}
> Doing this JIRA also requires updating the docs with regard to broker 
> backward compatibility -- not all brokers support the "topic delete request" 
> and thus the reset tool will not be backward compatible with all broker 
> versions.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5386) [Kafka Streams] - custom name for state-store change-log topic

2017-07-26 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101652#comment-16101652
 ] 

Matthias J. Sax commented on KAFKA-5386:


Understood. Technically, it would be possible to allow users to create the 
changelog topics manually, but there are some strings attached. We have had the 
naming-convention issue raised multiple times already, so maybe we need to do 
something about it. If you want to work on this, we would be more than happy. 
However, this change would require a KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals 
Let us know if you need any help preparing a KIP in case you want to pick it 
up. We can also discuss a little more on this JIRA. \cc [~miguno] [~guozhang] 
[~damianguy] [~enothereska] [~bbejeck]

> [Kafka Streams] - custom name for state-store change-log topic
> --
>
> Key: KAFKA-5386
> URL: https://issues.apache.org/jira/browse/KAFKA-5386
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Bart Vercammen
>  Labels: needs-kip
>
> Currently, when working with Kafka backed state stores in Kafka Streams, 
> these log compacted topics are given a hardcoded name :  
> _my.app.id-storename-changelog_
> {noformat}public static String storeChangelogTopic(String applicationId, 
> String storeName) {
> return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
> }{noformat}
> It would be nice if somehow I would be able to override this functionality 
> and provide the topic-name myself when creating the state-store.
> Any comments?
> Would it be OK to submit a PR for this?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5648) make Merger extend Aggregator

2017-07-26 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16102119#comment-16102119
 ] 

Matthias J. Sax commented on KAFKA-5648:


Your observation is correct that {{Merger}} and {{Aggregator}} are similar. 
You also stated correctly that the types are different, though: the 
{{Merger}} merges two aggregates of the same type, while the {{Aggregator}} in 
general merges a single value (of type A) into an aggregate (of type B). Thus, 
{{Merger}} extending {{Aggregator}} only fits the special case in which both 
types are the same.

> make Merger extend Aggregator
> -----------------------------
>
> Key: KAFKA-5648
> URL: https://issues.apache.org/jira/browse/KAFKA-5648
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Clemens Valiente
>Assignee: Clemens Valiente
>Priority: Minor
>
> Hi,
> I suggest that Merger should extend Aggregator.
> reason:
> Both classes usually do very similar things. A merger takes two sessions and 
> combines them, an aggregator takes an existing session and aggregates new 
> values into it.
> in some use cases it is actually the same thing, e.g., a .map() followed by 
> .groupByKey().aggregate() where the aggregated values are lists.
> In this case both merger and aggregator do the same thing: take two lists and 
> combine them into one.
> With the proposed change we could pass the Merger as both the merger and 
> aggregator to the .aggregate() method and keep our business logic within one 
> merger class.
> Or in other words: The Merger is simply an Aggregator that happens to 
> aggregate two objects of the same class



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-4750) KeyValueIterator returns null values

2017-07-22 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16097368#comment-16097368
 ] 

Matthias J. Sax edited comment on KAFKA-4750 at 7/22/17 3:41 PM:
-

IMHO, we can also improve the code by not calling the serializer in the first 
place if we get a {{put(key, null)}}. Thus, we can ensure that {{rawValue}} 
will be {{null}} even if the serializer might not return {{null}}.


was (Author: mjsax):
IMHO, we can also improve the code by not calling the serializer in the first 
place if we get a `put(key, null)`. Thus, we can ensure that `rawValue` will be 
`null` even if the serializer might not return `null`.

> KeyValueIterator returns null values
> 
>
> Key: KAFKA-4750
> URL: https://issues.apache.org/jira/browse/KAFKA-4750
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1, 0.10.2.1, 0.11.0.0
>Reporter: Michal Borowiecki
>Assignee: Evgeny Veretennikov
>  Labels: newbie
> Attachments: DeleteTest.java
>
>
> The API for the ReadOnlyKeyValueStore.range method promises that the returned 
> iterator will not return null values. However, after upgrading from 0.10.0.0 to 
> 0.10.1.1, we found that null values are returned, causing NPEs on our side.
> I found this happens after removing entries from the store, and I found a 
> resemblance to the SAMZA-94 defect. The problem seems to be, as it was there, 
> that when deleting entries with a serializer that does not return null when null 
> is passed in, the state store doesn't actually delete that key/value pair, but 
> the iterator will return a null value for that key.
> When I modified our serializer to return null when null is passed in, the 
> problem went away. However, I believe this should be fixed in Kafka Streams, 
> perhaps with a similar approach as in SAMZA-94.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4750) KeyValueIterator returns null values

2017-07-22 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16097368#comment-16097368
 ] 

Matthias J. Sax commented on KAFKA-4750:


IMHO, we can also improve the code by not calling the serializer in the first 
place if we get a `put(key, null)`. Thus, we can ensure that `rawValue` will be 
`null` even if the serializer might not return `null`.
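
A minimal sketch of that short-circuit (an illustrative helper, not the actual 
store code):

{code:java}
import org.apache.kafka.common.serialization.Serializer;

public class NullShortCircuit {

    // Never hand a null value to the serializer: rawValue is then guaranteed
    // to be null, so the entry is really deleted even if a user serializer
    // would map null to some non-null byte array.
    static <V> byte[] toRawValue(final Serializer<V> serializer,
                                 final String topic,
                                 final V value) {
        return value == null ? null : serializer.serialize(topic, value);
    }
}
{code}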

> KeyValueIterator returns null values
> 
>
> Key: KAFKA-4750
> URL: https://issues.apache.org/jira/browse/KAFKA-4750
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1, 0.10.2.1, 0.11.0.0
>Reporter: Michal Borowiecki
>Assignee: Evgeny Veretennikov
>  Labels: newbie
> Attachments: DeleteTest.java
>
>
> The API for the ReadOnlyKeyValueStore.range method promises that the returned 
> iterator will not return null values. However, after upgrading from 0.10.0.0 to 
> 0.10.1.1, we found that null values are returned, causing NPEs on our side.
> I found this happens after removing entries from the store, and I found a 
> resemblance to the SAMZA-94 defect. The problem seems to be, as it was there, 
> that when deleting entries with a serializer that does not return null when null 
> is passed in, the state store doesn't actually delete that key/value pair, but 
> the iterator will return a null value for that key.
> When I modified our serializer to return null when null is passed in, the 
> problem went away. However, I believe this should be fixed in Kafka Streams, 
> perhaps with a similar approach as in SAMZA-94.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5530) Balancer is dancing with KStream all the time, and due to that Kafka cannot work :-)

2017-06-29 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16068951#comment-16068951
 ] 

Matthias J. Sax commented on KAFKA-5530:


Also, Kafka {{0.11.0.0}} was released today. It would be great if you could try 
it there too, to know whether the new release is affected by this issue as 
well. Thanks a lot.


> Balancer is dancing with KStream all the time, and due to that Kafka cannot 
> work :-)
> 
>
> Key: KAFKA-5530
> URL: https://issues.apache.org/jira/browse/KAFKA-5530
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.1
> Environment: Linux, Windows
>Reporter: Seweryn Habdank-Wojewodzki
> Attachments: streamer-2.zip, streamer.zip
>
>
> Dears,
> There are problems with the balancer in KStreams when _num.stream.threads_ is 
> bigger than 1 and the number of input topics is bigger than 1.
> I am doing more or less such a setup in the code:
> {code:java}
> // loop over the inTopicName(s) {
> KStream<String, String> stringInput = kBuilder.stream( STRING_SERDE, 
> STRING_SERDE, inTopicName );
> stringInput.filter( streamFilter::passOrFilterMessages ).map( ndmNormalizer 
> ).to( outTopicName );
> // } end of loop
> streams = new KafkaStreams( kBuilder, streamsConfig );
> streams.cleanUp();
> streams.start();
> {code}
> And if *_num.stream.threads=4_* but there are 2 or more (but fewer than 
> num.stream.threads) inTopicNames, then the complete application startup is 
> totally self-blocked, writing endless strange things in the log and not 
> starting.
> Even more problematic is when the number of topics is higher than 
> _num.stream.threads_, which I had commented on in *KAFKA-5167 streams task 
> gets stuck after re-balance due to LockException*.
> I am attaching logs for two scenarios:
>  * when: 1 < num.stream.threads < number of topics (KAFKA-5167)
>  * when: 1 < number of topics < num.stream.threads (this ticket).
> I can fully reproduce the behaviour. I even found a workaround for it, but it 
> is not desired. When _num.stream.threads=1_ then all works fine :-(.
> {code:bash}
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:466 - stream-thread 
> [StreamThread-3] Assigned tasks to clients as 
> {de0ead97-89d8-49b0-be84-876ca5b41cd8=[activeTasks: ([]) assignedTasks: ([]) 
> prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 2.0 cost: 0.0]}.
> 2017-06-27 19:45:00 INFO AbstractCoordinator:375 - Successfully joined group 
> stream with generation 2701
> 2017-06-27 19:45:00 INFO AbstractCoordinator:375 - Successfully joined group 
> stream with generation 2701
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:252 - Setting newly assigned 
> partitions [] for group stream
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:252 - Setting newly assigned 
> partitions [] for group stream
> 2017-06-27 19:45:00 INFO StreamThread:228 - stream-thread [StreamThread-3] 
> New partitions [[]] assigned at the end of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:228 - stream-thread [StreamThread-1] 
> New partitions [[]] assigned at the end of consumer rebalance.
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:393 - Revoking previously 
> assigned partitions [] for group stream
> 2017-06-27 19:45:00 INFO StreamThread:254 - stream-thread [StreamThread-1] 
> partitions [[]] revoked at the beginning of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:1012 - stream-thread [StreamThread-1] 
> Updating suspended tasks to contain active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1019 - stream-thread [StreamThread-1] 
> Removing all active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1034 - stream-thread [StreamThread-1] 
> Removing all standby tasks [[]]
> 2017-06-27 19:45:00 INFO AbstractCoordinator:407 - (Re-)joining group stream
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:290 - stream-thread 
> [StreamThread-1] Constructed client metadata 
> {de0ead97-89d8-49b0-be84-876ca5b41cd8=ClientMetadata{hostInfo=null, 
> consumers=[stream-de0ead97-89d8-49b0-be84-876ca5b41cd8-StreamThread-1-consumer-ab798efe-16a6-4686-bdee-ccd50c937cd7],
>  state=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) 
> prevAssignedTasks: ([]) capacity: 1.0 cost: 0.0]}} from the member 
> subscriptions.
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:630 - stream-thread 
> [StreamThread-1] Completed validating internal topics in partition assignor
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:630 - stream-thread 
> [StreamThread-1] Completed validating internal topics in partition assignor
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:466 - stream-thread 
> [StreamThread-1] Assigned tasks to clients as 
> {de0ead97-89d8-49b0-be84-876ca5b41cd8=[activeTasks: ([]) assignedTasks: ([]) 
> prevActiveTasks: ([]) prevAssignedTasks: 

[jira] [Commented] (KAFKA-4468) Correctly calculate the window end timestamp after read from state stores

2017-06-29 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16068935#comment-16068935
 ] 

Matthias J. Sax commented on KAFKA-4468:


[~evis] In 
{{org.apache.kafka.streams.kstream.internals.WindowedDeserializer#deserialize}} 
we instantiate an {{UnlimitedWindow}} as the return object. However, we should 
instantiate a {{TimeWindow}} (or {{SessionWindow}}?). For this, we need to get 
the window end timestamp. We (you :)) need to figure out how we can get the 
time window size (and window type?) into the deserializer to allow us to 
compute the window end timestamp (based on start and size) to return the correct 
window. Thinking about this, we might even need different (de)serializers for 
different window types. Off the top of my head, I am not sure how we 
handle/encode {{SessionWindows}} (\cc [~damianguy]). I am also wondering how we 
should handle different window types (ie, {{TimeWindow}} vs {{UnlimitedWindow}}).
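
A sketch of the intended computation, assuming the window size can be made 
available to the deserializer somehow (which is exactly the open question 
above):

{code:java}
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.internals.TimeWindow;

public class WindowEndReconstruction {

    // Given the start timestamp stored in the key and a known window size,
    // return a TimeWindow with the correct end instead of an UnlimitedWindow.
    static Window reconstruct(final long startMs, final long windowSizeMs) {
        return new TimeWindow(startMs, startMs + windowSizeMs);
    }
}
{code}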

> Correctly calculate the window end timestamp after read from state stores
> -
>
> Key: KAFKA-4468
> URL: https://issues.apache.org/jira/browse/KAFKA-4468
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: architecture
>
> When storing the WindowedStore on the persistent KV store, we only use the 
> start timestamp of the window as part of the combo-key as (start-timestamp, 
> key). The reason that we do not add the end-timestamp as well is that we can 
> always calculate it from the start timestamp + window_length, and hence we 
> can save 8 bytes per key on the persistent KV store.
> However, after read it (via {{WindowedDeserializer}}) we do not set its end 
> timestamp correctly but just read it as an {{UnlimitedWindow}}. We should fix 
> this by calculating its end timestamp as mentioned above.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5541) Streams should not re-throw if suspending/closing task fails

2017-06-29 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-5541:
--

 Summary: Streams should not re-throw if suspending/closing task 
fails
 Key: KAFKA-5541
 URL: https://issues.apache.org/jira/browse/KAFKA-5541
 Project: Kafka
  Issue Type: Bug
Reporter: Matthias J. Sax
Assignee: Matthias J. Sax


Currently, if Streams suspends a task on rebalance or closes a suspended task 
that got revoked, it re-throws any exception that might occur and the thread 
dies. However, this is not really necessary, as the task was suspended/closed 
anyway and we can just clean up the task and carry on with the rebalance.

(cf comments https://github.com/apache/kafka/pull/3449#discussion_r124437816)
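
A simplified sketch of the proposed behavior ({{Task}} below is a stand-in 
interface, not the actual {{StreamTask}} API):

{code:java}
public class CloseSuspendedQuietly {

    interface Task {
        void close();
        String id();
    }

    // Log and swallow failures while closing an already-suspended/revoked
    // task instead of re-throwing and killing the stream thread.
    static void closeQuietly(final Task task) {
        try {
            task.close();
        } catch (final RuntimeException e) {
            System.err.println("Failed to close task " + task.id()
                    + " cleanly; continuing with the rebalance: " + e);
        }
    }
}
{code}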



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5167) streams task gets stuck after re-balance due to LockException

2017-06-28 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16067362#comment-16067362
 ] 

Matthias J. Sax commented on KAFKA-5167:


[~habdank] Seems you created https://issues.apache.org/jira/browse/KAFKA-5530 
for this. Let's take it from there.

> streams task gets stuck after re-balance due to LockException
> -
>
> Key: KAFKA-5167
> URL: https://issues.apache.org/jira/browse/KAFKA-5167
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.11.0.0
>Reporter: Narendra Kumar
>Assignee: Matthias J. Sax
> Fix For: 0.10.2.2, 0.11.0.1, 0.11.1.0
>
> Attachments: BugTest.java, DebugTransformer.java, logs.txt
>
>
> During rebalance, a processor node's close() method gets called twice: once 
> from StreamThread.suspendTasksAndState() and once from 
> StreamThread.closeNonAssignedSuspendedTasks(). I have an instance field 
> which I close in the processor's close method. This instance's close method 
> throws an exception if I call close more than once. Because of this 
> exception, Kafka Streams does not attempt to close the state manager, i.e., 
> task.closeStateManager(true) is never called. When a task moves from one 
> thread to another within the same machine, the task blocks trying to get a 
> lock on the state directory, which is still held by the unclosed state 
> manager, and keeps throwing the warning message below:
> 2017-04-30 12:34:17 WARN  StreamThread:1214 - Could not create task 0_1. Will 
> retry.
> org.apache.kafka.streams.errors.LockException: task [0_1] Failed to lock the 
> state directory for task 0_1
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>   at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-06-30 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16070474#comment-16070474
 ] 

Matthias J. Sax edited comment on KAFKA-5545 at 6/30/17 5:54 PM:
-

Hi. The messages with the stack trace

{noformat}
11:04:09.034 [StreamThread-38] WARN o.a.k.s.p.internals.StreamThread - Could 
not create task 0_5. Will retry.
org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the 
state directory for task 0_5
{noformat}

are WARN level messages. Those are expected during rebalance, and they should 
resolve automatically. And it seems that it did resolve automatically, as you 
get 

{noformat}
11:04:13.642 [StreamThread-44] INFO o.a.k.s.p.internals.StreamThread - 
stream-thread [StreamThread-44] Committing all tasks because the commit 
interval 1ms has elapsed
{noformat}

later on. The logs don't show any ERROR entries, so I am not sure what the 
actual issue is that you observe. Does your app not work properly after the 
WARN logs about the lock go away?

One more question:

{quote}
we cleanup the stream, rebuild topology(tried with reusing topology) and start 
the stream again
{quote}

What exact steps are you doing here? Sending a `kill` signal? Calling 
`KafkaStreams#close()`? Btw: you should be able to reuse the topology -- you 
just need to create a new KafkaStreams instance with a new config. If this does 
not work for you, we should have a look into this too.
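
A sketch of that restart sequence (the application id and bootstrap address are 
placeholders; {{TopologyBuilder}} is the 0.10.2-era type):

{code:java}
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TopologyBuilder;

public class RestartWithNewConfig {

    static KafkaStreams restart(final KafkaStreams oldInstance,
                                final TopologyBuilder topology,
                                final String newBootstrapServers) {
        oldInstance.close();  // shut down the old instance first

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, newBootstrapServers);

        // The same topology can be reused; only the KafkaStreams instance
        // (with its new config) must be created fresh.
        final KafkaStreams streams = new KafkaStreams(topology, props);
        streams.cleanUp();  // as in the report: wipe local state before starting
        streams.start();
        return streams;
    }
}
{code}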


was (Author: mjsax):
Hi. The messages with the stack trace

{noformat}
11:04:09.034 [StreamThread-38] WARN o.a.k.s.p.internals.StreamThread - Could 
not create task 0_5. Will retry.
org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the 
state directory for task 0_5
{noformat}

are WARN level messages. Those are expected during rebalance, and they should 
resolve automatically. And it seems that it did resolve automatically, as you 
get 

{noformat}
11:04:13.642 [StreamThread-44] INFO o.a.k.s.p.internals.StreamThread - 
stream-thread [StreamThread-44] Committing all tasks because the commit 
interval 1ms has elapsed
{noformat}

later on. The logs don't show any ERROR entries, so I am not sure what the 
actual issue is that you observe. Does your app not work properly after the 
WARN logs about the lock go away?

> Kafka Stream not able to successfully restart over new broker ip
> 
>
> Key: KAFKA-5545
> URL: https://issues.apache.org/jira/browse/KAFKA-5545
> Project: Kafka
>  Issue Type: Bug
>Reporter: Yogesh BG
>Priority: Critical
>
> Hi,
> I have one Kafka broker and one Kafka Streams application.
> Initially, the stream connects and starts processing data. Then I restart 
> the broker. When the broker restarts, a new IP will be assigned.
> In the Kafka Streams app, I have a 5-min interval thread which checks if the 
> broker IP changed; if it changed, we clean up the stream, rebuild the topology 
> (tried with reusing the topology) and start the stream again. I end up with 
> the following exceptions:
> 11:04:08.032 [StreamThread-38] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-38] Creating active task 0_5 with assigned 
> partitions [PR-5]
> 11:04:08.033 [StreamThread-41] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-41] Creating active task 0_1 with assigned 
> partitions [PR-1]
> 11:04:08.036 [StreamThread-34] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-34] Creating active task 0_7 with assigned 
> partitions [PR-7]
> 11:04:08.036 [StreamThread-37] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-37] Creating active task 0_3 with assigned 
> partitions [PR-3]
> 11:04:08.036 [StreamThread-45] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-45] Creating active task 0_0 with assigned 
> partitions [PR-0]
> 11:04:08.037 [StreamThread-36] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-36] Creating active task 0_4 with assigned 
> partitions [PR-4]
> 11:04:08.037 [StreamThread-43] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-43] Creating active task 0_6 with assigned 
> partitions [PR-6]
> 11:04:08.038 [StreamThread-48] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-48] Creating active task 0_2 with assigned 
> partitions [PR-2]
> 11:04:09.034 [StreamThread-38] WARN  o.a.k.s.p.internals.StreamThread - Could 
> not create task 0_5. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the 
> state directory for task 0_5
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> 

[jira] [Commented] (KAFKA-5530) Balancer is dancing with KStream all the time, and due to that Kafka cannot work :-)

2017-07-03 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16072596#comment-16072596
 ] 

Matthias J. Sax commented on KAFKA-5530:


Glad you could resolve your problem. We did change the default of 
{{max.poll.interval.ms}} to infinite (i.e., {{Integer.MAX_VALUE}}) already. And 
we are working on some more improvements so Streams should be more robust if the 
value is small, too (cf. https://issues.apache.org/jira/browse/KAFKA-5152)
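
For reference, a minimal sketch of setting the consumer timeout explicitly 
through the Streams config (application id and bootstrap servers are 
placeholders; consumer configs passed here are forwarded to the internal 
consumers):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class ConfigSketch {
    public static Properties props() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // An effectively infinite poll interval avoids spurious rebalances
        // when processing a batch takes long.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);
        return props;
    }
}
{code}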

> Balancer is dancing with KStream all the time, and due to that Kafka cannot 
> work :-)
> 
>
> Key: KAFKA-5530
> URL: https://issues.apache.org/jira/browse/KAFKA-5530
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.11.0.0
> Environment: Linux, Windows
>Reporter: Seweryn Habdank-Wojewodzki
> Attachments: streamer_20-topics_1-thread-K-0.11.0.0.log.zip, 
> streamer_20-topics_4-threads-K-0.11.0.0.log.zip, 
> streamer_2-topics_1-thread-K-0.11.0.0.log.zip, 
> streamer_2-topics_4_threads-K-0.11.0.0.log.zip
>
>
> Dears,
> There are problems with the balancer in KStreams (v. 0.10.2.x) when 
> _num.stream.threads_ is bigger than 1 and the number of input topics is 
> bigger than 1.
> I am doing more or less such a setup in the code:
> {code:java}
> // loop over the inTopicName(s) {
> KStream<String, String> stringInput = kBuilder.stream( STRING_SERDE, 
> STRING_SERDE, inTopicName );
> stringInput.filter( streamFilter::passOrFilterMessages ).map( ndmNormalizer 
> ).to( outTopicName );
> // } end of loop
> streams = new KafkaStreams( kBuilder, streamsConfig );
> streams.cleanUp();
> streams.start();
> {code}
> And if *_num.stream.threads=4_* is set, but there are 2 or more (yet fewer 
> than num.stream.threads) inTopicNames, then the complete application startup 
> is totally self-blocked: it writes endless strange things to the log and does 
> not start.
> Even more problematic is when the number of topics is higher than 
> _num.stream.threads_, which I had commented on in *KAFKA-5167 streams task 
> gets stuck after re-balance due to LockException*.
> I am attaching logs for two scenarios:
>  * when: 1 < num.stream.threads < number of topics (KAFKA-5167)
>  * when: 1 < number of topics < num.stream.threads (this ticket).
> I can fully reproduce the behaviour. I even found a workaround for it, but it 
> is not desired: when _num.stream.threads=1_ all works fine :-( (for Kafka 
> v. 0.10.2.x; v. 0.11.0.0 does not work at all).
> {code:bash}
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:466 - stream-thread 
> [StreamThread-3] Assigned tasks to clients as 
> {de0ead97-89d8-49b0-be84-876ca5b41cd8=[activeTasks: ([]) assignedTasks: ([]) 
> prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 2.0 cost: 0.0]}.
> 2017-06-27 19:45:00 INFO AbstractCoordinator:375 - Successfully joined group 
> stream with generation 2701
> 2017-06-27 19:45:00 INFO AbstractCoordinator:375 - Successfully joined group 
> stream with generation 2701
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:252 - Setting newly assigned 
> partitions [] for group stream
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:252 - Setting newly assigned 
> partitions [] for group stream
> 2017-06-27 19:45:00 INFO StreamThread:228 - stream-thread [StreamThread-3] 
> New partitions [[]] assigned at the end of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:228 - stream-thread [StreamThread-1] 
> New partitions [[]] assigned at the end of consumer rebalance.
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:393 - Revoking previously 
> assigned partitions [] for group stream
> 2017-06-27 19:45:00 INFO StreamThread:254 - stream-thread [StreamThread-1] 
> partitions [[]] revoked at the beginning of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:1012 - stream-thread [StreamThread-1] 
> Updating suspended tasks to contain active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1019 - stream-thread [StreamThread-1] 
> Removing all active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1034 - stream-thread [StreamThread-1] 
> Removing all standby tasks [[]]
> 2017-06-27 19:45:00 INFO AbstractCoordinator:407 - (Re-)joining group stream
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:290 - stream-thread 
> [StreamThread-1] Constructed client metadata 
> {de0ead97-89d8-49b0-be84-876ca5b41cd8=ClientMetadata{hostInfo=null, 
> consumers=[stream-de0ead97-89d8-49b0-be84-876ca5b41cd8-StreamThread-1-consumer-ab798efe-16a6-4686-bdee-ccd50c937cd7],
>  state=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) 
> prevAssignedTasks: ([]) capacity: 1.0 cost: 0.0]}} from the member 
> subscriptions.
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:630 - stream-thread 
> [StreamThread-1] Completed validating internal topics in partition 

[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-07-03 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16072587#comment-16072587
 ] 

Matthias J. Sax commented on KAFKA-5545:


Can you do me a favor and attach the log file instead of copy & pasting it into 
the comments? It is quite hard to keep an overview. Thanks a lot!

> Kafka Stream not able to successfully restart over new broker ip
> 
>
> Key: KAFKA-5545
> URL: https://issues.apache.org/jira/browse/KAFKA-5545
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Yogesh BG
>Priority: Critical
>
> Hi
> I have one Kafka broker and one Kafka Streams application.
> Initially the Kafka Streams app connects and starts processing data. Then I 
> restart the broker. When the broker restarts, a new IP will be assigned.
> In the Kafka Streams app I have a 5min interval thread which checks if the 
> broker IP changed; if so, we clean up the stream, rebuild the topology (also 
> tried reusing the topology) and start the stream again. I end up with the 
> following exceptions.
> 11:04:08.032 [StreamThread-38] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-38] Creating active task 0_5 with assigned 
> partitions [PR-5]
> 11:04:08.033 [StreamThread-41] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-41] Creating active task 0_1 with assigned 
> partitions [PR-1]
> 11:04:08.036 [StreamThread-34] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-34] Creating active task 0_7 with assigned 
> partitions [PR-7]
> 11:04:08.036 [StreamThread-37] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-37] Creating active task 0_3 with assigned 
> partitions [PR-3]
> 11:04:08.036 [StreamThread-45] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-45] Creating active task 0_0 with assigned 
> partitions [PR-0]
> 11:04:08.037 [StreamThread-36] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-36] Creating active task 0_4 with assigned 
> partitions [PR-4]
> 11:04:08.037 [StreamThread-43] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-43] Creating active task 0_6 with assigned 
> partitions [PR-6]
> 11:04:08.038 [StreamThread-48] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-48] Creating active task 0_2 with assigned 
> partitions [PR-2]
> 11:04:09.034 [StreamThread-38] WARN  o.a.k.s.p.internals.StreamThread - Could 
> not create task 0_5. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the 
> state directory for task 0_5
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> 

[jira] [Commented] (KAFKA-5528) Error while reading topic, offset, partition info from process method

2017-07-03 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16072622#comment-16072622
 ] 

Matthias J. Sax commented on KAFKA-5528:


[~nravi] I guess the problem is that you are using a single instance of 
{{GenericProcessor}}. In {{addProcessor}} you return the same object on each 
call, but you need to return a new instance each time. Can you try changing it 
to {{addProcessor("PROCESS", () => new GenericProcessor[T](serDe, decrypt, 
config), "SOURCE")}}? Let us know if this fixes your problem or not.

We see this question regularly lately. Going to add an FAQ :)
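
For reference, the same fix sketched in Java against the low-level 
{{TopologyBuilder}} API ({{MyProcessor}} is a hypothetical stand-in for the 
user's processor); the key point is that the supplier passed to 
{{addProcessor}} must create a fresh instance on every call:

{code:java}
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.TopologyBuilder;

public class SupplierSketch {
    // Hypothetical stand-in for the user's GenericProcessor.
    static class MyProcessor extends AbstractProcessor<byte[], byte[]> {
        @Override
        public void process(final byte[] key, final byte[] value) {
            // context() is safe here because every task owns its own instance
            context().forward(key, value);
        }
    }

    public static void main(final String[] args) {
        final TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("SOURCE", "input-topic");
        // Correct: a new processor instance per get() call.
        builder.addProcessor("PROCESS", MyProcessor::new, "SOURCE");
        // Incorrect (shared instance; fails with more than one task):
        //   final MyProcessor shared = new MyProcessor();
        //   builder.addProcessor("PROCESS", () -> shared, "SOURCE");
    }
}
{code}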

> Error while reading topic, offset, partition info from process method
> -
>
> Key: KAFKA-5528
> URL: https://issues.apache.org/jira/browse/KAFKA-5528
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Nishkam Ravi
>
> We are encountering an {{IllegalStateException}} while trying to access 
> {{context.topic()}} from process function. The code is written in Scala and 
> is being launched using sbt (spring isn't involved). Here's the code sketch:
> {noformat}
> class GenericProcessor[T <: ThriftStruct](serDe: ThriftStructSerDe[T], 
> decrypt: Boolean, config: Config) extends AbstractProcessor[Array[Byte], 
> Array[Byte]] with LazyLogging {
>   private var hsmClient: HSMClient = _
>   override def init(processorContext: ProcessorContext): Unit = { 
> super.init(processorContext) 
> hsmClient = HSMClient(config).getOrElse(null) 
>   }
>   override def process(key: Array[Byte], value: Array[Byte]): Unit = { 
> val topic: String = this.context.topic() 
> val partition: Int = this.context.partition() 
> val offset: Long = this.context.offset() 
> val timestamp: Long = this.context.timestamp() 
> // business logic 
>   }
> }
> {noformat}
> The exception is thrown only for the multi-consumer case (when number of 
> partitions for a topic > 1 and parallelism > 1). 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4468) Correctly calculate the window end timestamp after read from state stores

2017-07-06 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16076830#comment-16076830
 ] 

Matthias J. Sax commented on KAFKA-4468:


Thanks for following up. About storing the window size -- it would be good if 
we could avoid this. The example you give uses windows with different sizes -- 
but at the DSL level, we know that all windows of a single operator have the 
same size. Thus, we might be able to avoid storing the size. Of course, it 
would be more flexible if we just store the size and sacrifice the space 
saving. I don't have a strong opinion on this. Would be nice to get the opinion 
of others, too.
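
A tiny sketch of the reconstruction being discussed: since the store key only 
carries the start timestamp, the end would be derived from the operator's 
fixed window size (the size value here is a made-up example):

{code:java}
public class WindowEndSketch {
    // Derive the window end from the stored start timestamp plus the
    // operator's fixed window size, instead of treating it as unlimited.
    static long windowEnd(final long startMs, final long windowSizeMs) {
        return startMs + windowSizeMs;
    }

    public static void main(final String[] args) {
        final long sizeMs = 60_000L; // hypothetical per-operator window size
        System.out.println(windowEnd(1_000L, sizeMs)); // prints 61000
    }
}
{code}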

> Correctly calculate the window end timestamp after read from state stores
> -
>
> Key: KAFKA-4468
> URL: https://issues.apache.org/jira/browse/KAFKA-4468
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: architecture
>
> When storing the WindowedStore on the persistent KV store, we only use the 
> start timestamp of the window as part of the combo-key as (start-timestamp, 
> key). The reason that we do not add the end-timestamp as well is that we can 
> always calculate it from the start timestamp + window_length, and hence we 
> can save 8 bytes per key on the persistent KV store.
> However, after reading it (via {{WindowedDeserializer}}) we do not set its 
> end timestamp correctly, but just read it as an {{UnlimitedWindow}}. We 
> should fix this by calculating its end timestamp as mentioned above.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5070) org.apache.kafka.streams.errors.LockException: task [0_18] Failed to lock the state directory: /opt/rocksdb/pulse10/0_18

2017-07-05 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16075056#comment-16075056
 ] 

Matthias J. Sax commented on KAFKA-5070:


I was working on https://issues.apache.org/jira/browse/KAFKA-5167 and hoping 
that it will cover this JIRA as well. Thoughts?

> org.apache.kafka.streams.errors.LockException: task [0_18] Failed to lock the 
> state directory: /opt/rocksdb/pulse10/0_18
> 
>
> Key: KAFKA-5070
> URL: https://issues.apache.org/jira/browse/KAFKA-5070
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
> Environment: Linux Version
>Reporter: Dhana
>Assignee: Matthias J. Sax
> Attachments: RocksDB_LockStateDirec.7z
>
>
> Notes: we run two instance of consumer in two difference machines/nodes.
> we have 400 partitions. 200  stream threads/consumer, with 2 consumer.
> We perform HA test(on rebalance - shutdown of one of the consumer/broker), we 
> see this happening
> Error:
> 2017-04-05 11:36:09.352 WARN  StreamThread:1184 StreamThread-66 - Could not 
> create task 0_115. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_115] Failed to lock 
> the state directory: /opt/rocksdb/pulse10/0_115
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
>   at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5528) Error while reading topic, offset, partition info from process method

2017-07-05 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-5528.

Resolution: Not A Bug

> Error while reading topic, offset, partition info from process method
> -
>
> Key: KAFKA-5528
> URL: https://issues.apache.org/jira/browse/KAFKA-5528
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Nishkam Ravi
>
> We are encountering an {{IllegalStateException}} while trying to access 
> {{context.topic()}} from process function. The code is written in Scala and 
> is being launched using sbt (spring isn't involved). Here's the code sketch:
> {noformat}
> class GenericProcessor[T <: ThriftStruct](serDe: ThriftStructSerDe[T], 
> decrypt: Boolean, config: Config) extends AbstractProcessor[Array[Byte], 
> Array[Byte]] with LazyLogging {
>   private var hsmClient: HSMClient = _
>   override def init(processorContext: ProcessorContext): Unit = { 
> super.init(processorContext) 
> hsmClient = HSMClient(config).getOrElse(null) 
>   }
>   override def process(key: Array[Byte], value: Array[Byte]): Unit = { 
> val topic: String = this.context.topic() 
> val partition: Int = this.context.partition() 
> val offset: Long = this.context.offset() 
> val timestamp: Long = this.context.timestamp() 
> // business logic 
>   }
> }
> {noformat}
> The exception is thrown only for the multi-consumer case (when number of 
> partitions for a topic > 1 and parallelism > 1). 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5528) Error while reading topic, offset, partition info from process method

2017-07-05 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16075191#comment-16075191
 ] 

Matthias J. Sax commented on KAFKA-5528:


Added FAQ: 
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-WhydoIgetanIllegalStateExceptionwhenaccessingrecordmetadata?

> Error while reading topic, offset, partition info from process method
> -
>
> Key: KAFKA-5528
> URL: https://issues.apache.org/jira/browse/KAFKA-5528
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Nishkam Ravi
>
> We are encountering an {{IllegalStateException}} while trying to access 
> {{context.topic()}} from process function. The code is written in Scala and 
> is being launched using sbt (spring isn't involved). Here's the code sketch:
> {noformat}
> class GenericProcessor[T <: ThriftStruct](serDe: ThriftStructSerDe[T], 
> decrypt: Boolean, config: Config) extends AbstractProcessor[Array[Byte], 
> Array[Byte]] with LazyLogging {
>   private var hsmClient: HSMClient = _
>   override def init(processorContext: ProcessorContext): Unit = { 
> super.init(processorContext) 
> hsmClient = HSMClient(config).getOrElse(null) 
>   }
>   override def process(key: Array[Byte], value: Array[Byte]): Unit = { 
> val topic: String = this.context.topic() 
> val partition: Int = this.context.partition() 
> val offset: Long = this.context.offset() 
> val timestamp: Long = this.context.timestamp() 
> // business logic 
>   }
> }
> {noformat}
> The exception is thrown only for the multi-consumer case (when number of 
> partitions for a topic > 1 and parallelism > 1). 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Reopened] (KAFKA-5167) streams task gets stuck after re-balance due to LockException

2017-06-27 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-5167:


This seems to have been broken again via 
https://github.com/apache/kafka/commit/f7b7b4745541a576eb0219468263487b07bac959 
Thus, it is not fixed for {{0.11.0.0}}.

> streams task gets stuck after re-balance due to LockException
> -
>
> Key: KAFKA-5167
> URL: https://issues.apache.org/jira/browse/KAFKA-5167
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0, 0.11.0.0, 0.10.2.1
>Reporter: Narendra Kumar
>Assignee: Matthias J. Sax
> Fix For: 0.11.1.0, 0.10.2.2, 0.11.0.1
>
> Attachments: BugTest.java, DebugTransformer.java, logs.txt
>
>
> During rebalance, the processor node's close() method gets called two times: 
> once from StreamThread.suspendTasksAndState() and once from 
> StreamThread.closeNonAssignedSuspendedTasks(). I have an instance field which 
> I am closing in the processor's close method. This instance's close method 
> throws an exception if I call close more than once. Because of this 
> exception, Kafka Streams does not attempt to close the state manager, i.e. 
> task.closeStateManager(true) is never called. When a task moves from one 
> thread to another within the same machine, the task blocks trying to get a 
> lock on the state directory, which is still held by the unclosed state 
> manager, and keeps throwing the below warning message:
> 2017-04-30 12:34:17 WARN  StreamThread:1214 - Could not create task 0_1. Will 
> retry.
> org.apache.kafka.streams.errors.LockException: task [0_1] Failed to lock the 
> state directory for task 0_1
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>   at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4344) Exception when accessing partition, offset and timestamp in processor class

2017-06-27 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16065505#comment-16065505
 ] 

Matthias J. Sax commented on KAFKA-4344:


What version are you using?

> Exception when accessing partition, offset and timestamp in processor class
> ---
>
> Key: KAFKA-4344
> URL: https://issues.apache.org/jira/browse/KAFKA-4344
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: saiprasad mishra
>Assignee: Guozhang Wang
>Priority: Minor
>
> I have a Kafka Streams pipeline like below:
> source topic stream -> filter for null value -> map to make it keyed by id 
> -> custom processor to mystore -> to another topic -> ktable
> I am hitting the below type of exception in a custom processor class if I try 
> to access offset() or partition() or timestamp() from the ProcessorContext in 
> the process() method. I was hoping it would return the partition and offset 
> for the enclosing topic (in this case the source topic) it is consuming from, 
> or -1, based on the API docs.
> java.lang.IllegalStateException: This should not happen as offset() should 
> only be called while a record is processed
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.offset(ProcessorContextImpl.java:181)
>  ~[kafka-streams-0.10.1.0.jar!/:?]
>   at com.sai.repo.MyStore.process(MyStore.java:72) ~[classes!/:?]
>   at com.sai.repo.MyStore.process(MyStore.java:39) ~[classes!/:?]
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>  ~[kafka-streams-0.10.1.0.jar!/:?]
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>  ~[kafka-streams-0.10.1.0.jar!/:?]
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
>  ~[kafka-streams-0.10.1.0.jar!/:?]
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>  ~[kafka-streams-0.10.1.0.jar!/:?]
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>  ~[kafka-streams-0.10.1.0.jar!/:?]
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
>  ~[kafka-streams-0.10.1.0.jar!/:?]
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>  ~[kafka-streams-0.10.1.0.jar!/:?]
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>  ~[kafka-streams-0.10.1.0.jar!/:?]
>   at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
>  ~[kafka-streams-0.10.1.0.jar!/:?]
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181)
>  ~[kafka-streams-0.10.1.0.jar!/:?]
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
>  ~[kafka-streams-0.10.1.0.jar!/:?]
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
>  [kafka-streams-0.10.1.0.jar!/:?]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5528) Error while reading topic, offset, partition info from process method

2017-06-27 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5528:
---
Description: 
We are encountering an {{IllegalStateException}} while trying to access 
{{context.topic()}} from process function. The code is written in Scala and is 
being launched using sbt (spring isn't involved). Here's the code sketch:

{noformat}
class GenericProcessor[T <: ThriftStruct](serDe: ThriftStructSerDe[T], decrypt: 
Boolean, config: Config) extends AbstractProcessor[Array[Byte], Array[Byte]] 
with LazyLogging {
  private var hsmClient: HSMClient = _
  override def init(processorContext: ProcessorContext): Unit = { 
super.init(processorContext) 
hsmClient = HSMClient(config).getOrElse(null) 
  }
  override def process(key: Array[Byte], value: Array[Byte]): Unit = { 
val topic: String = this.context.topic() 
val partition: Int = this.context.partition() 
val offset: Long = this.context.offset() 
val timestamp: Long = this.context.timestamp() 
// business logic 
  }
}
{noformat}

The exception is thrown only for the multi-consumer case (when number of 
partitions for a topic > 1 and parallelism > 1). 

  was:
We are encountering an IllegalStateException while trying to access 
context.topic() from process function. The code is written in Scala and is 
being launched using sbt (spring isn't involved). Here's the code sketch:

class GenericProcessor[T <: ThriftStruct](serDe: ThriftStructSerDe[T], decrypt: 
Boolean, config: Config) extends AbstractProcessor[Array[Byte], Array[Byte]] 
with LazyLogging {
  private var hsmClient: HSMClient = _
  override def init(processorContext: ProcessorContext): Unit = { 
super.init(processorContext) 
hsmClient = HSMClient(config).getOrElse(null) 
  }
  override def process(key: Array[Byte], value: Array[Byte]): Unit = { 
val topic: String = this.context.topic() 
val partition: Int = this.context.partition() 
val offset: Long = this.context.offset() 
val timestamp: Long = this.context.timestamp() 
// business logic 
  }
}
The exception is thrown only for the multi-consumer case (when number of 
partitions for a topic > 1 and parallelism > 1). 


> Error while reading topic, offset, partition info from process method
> -
>
> Key: KAFKA-5528
> URL: https://issues.apache.org/jira/browse/KAFKA-5528
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Nishkam Ravi
>
> We are encountering an {{IllegalStateException}} while trying to access 
> {{context.topic()}} from process function. The code is written in Scala and 
> is being launched using sbt (spring isn't involved). Here's the code sketch:
> {noformat}
> class GenericProcessor[T <: ThriftStruct](serDe: ThriftStructSerDe[T], 
> decrypt: Boolean, config: Config) extends AbstractProcessor[Array[Byte], 
> Array[Byte]] with LazyLogging {
>   private var hsmClient: HSMClient = _
>   override def init(processorContext: ProcessorContext): Unit = { 
> super.init(processorContext) 
> hsmClient = HSMClient(config).getOrElse(null) 
>   }
>   override def process(key: Array[Byte], value: Array[Byte]): Unit = { 
> val topic: String = this.context.topic() 
> val partition: Int = this.context.partition() 
> val offset: Long = this.context.offset() 
> val timestamp: Long = this.context.timestamp() 
> // business logic 
>   }
> }
> {noformat}
> The exception is thrown only for the multi-consumer case (when number of 
> partitions for a topic > 1 and parallelism > 1). 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4344) Exception when accessing partition, offset and timestamp in processor class

2017-06-27 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16065609#comment-16065609
 ] 

Matthias J. Sax commented on KAFKA-4344:


You did create https://issues.apache.org/jira/browse/KAFKA-5528 for this, 
right? So let's track it there.

> Exception when accessing partition, offset and timestamp in processor class
> ---
>
> Key: KAFKA-4344
> URL: https://issues.apache.org/jira/browse/KAFKA-4344
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: saiprasad mishra
>Assignee: Guozhang Wang
>Priority: Minor
>
> I have a Kafka Streams pipeline like below:
> source topic stream -> filter for null value -> map to make it keyed by id 
> -> custom processor to mystore -> to another topic -> ktable
> I am hitting the below type of exception in a custom processor class if I try 
> to access offset() or partition() or timestamp() from the ProcessorContext in 
> the process() method. I was hoping it would return the partition and offset 
> for the enclosing topic (in this case the source topic) it is consuming from, 
> or -1, based on the API docs.
> java.lang.IllegalStateException: This should not happen as offset() should 
> only be called while a record is processed
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.offset(ProcessorContextImpl.java:181)
>  ~[kafka-streams-0.10.1.0.jar!/:?]
>   at com.sai.repo.MyStore.process(MyStore.java:72) ~[classes!/:?]
>   at com.sai.repo.MyStore.process(MyStore.java:39) ~[classes!/:?]
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>  ~[kafka-streams-0.10.1.0.jar!/:?]
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>  ~[kafka-streams-0.10.1.0.jar!/:?]
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
>  ~[kafka-streams-0.10.1.0.jar!/:?]
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>  ~[kafka-streams-0.10.1.0.jar!/:?]
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>  ~[kafka-streams-0.10.1.0.jar!/:?]
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
>  ~[kafka-streams-0.10.1.0.jar!/:?]
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>  ~[kafka-streams-0.10.1.0.jar!/:?]
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>  ~[kafka-streams-0.10.1.0.jar!/:?]
>   at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
>  ~[kafka-streams-0.10.1.0.jar!/:?]
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181)
>  ~[kafka-streams-0.10.1.0.jar!/:?]
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
>  ~[kafka-streams-0.10.1.0.jar!/:?]
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
>  [kafka-streams-0.10.1.0.jar!/:?]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-3745) Consider adding join key to ValueJoiner interface

2017-07-06 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-3745.

Resolution: Duplicate

See 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner

> Consider adding join key to ValueJoiner interface
> -
>
> Key: KAFKA-3745
> URL: https://issues.apache.org/jira/browse/KAFKA-3745
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Jeyhun Karimov
>Priority: Minor
>  Labels: api, needs-kip, newbie
>
> In working with Kafka Streams joins, it's sometimes the case that a join key 
> is not actually present in the values of the joins themselves (if, for 
> example, a previous transform generated an ephemeral join key). In such 
> cases, the actual key of the join is not available in the ValueJoiner 
> implementation to be used to construct the final joined value. This can be 
> worked around by explicitly threading the join key into the value if needed, 
> but it seems like extending the interface to pass the join key along as well 
> would be helpful.
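
A sketch of the interface shape this asks for (hypothetical signature; the 
KIP-149 page referenced in the resolution covers the actual design):

{code:java}
// Hypothetical key-aware joiner: the join key is passed through read-only.
interface ValueJoinerWithKey<K, V1, V2, VR> {
    VR apply(K key, V1 value1, V2 value2);
}
{code}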



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-4726) ValueMapper should have (read) access to key

2017-07-06 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-4726:
--

Assignee: Jeyhun Karimov

> ValueMapper should have (read) access to key
> 
>
> Key: KAFKA-4726
> URL: https://issues.apache.org/jira/browse/KAFKA-4726
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.1.1
>Reporter: Steven Schlansker
>Assignee: Jeyhun Karimov
>  Labels: kip
>
> {{ValueMapper}} should have read-only access to the key for the value it is 
> mapping.  Sometimes the value transformation will depend on the key.
> It is possible to do this with a full blown {{KeyValueMapper}} but that loses 
> the promise that you won't change the key -- so you might introduce a 
> re-keying phase that is totally unnecessary.  It also requires you to return 
> an identity KeyValue object which costs something to construct (unless we are 
> lucky and the optimizer elides it).
> [ If mapValues() is guaranteed to be no less efficient than map() the issue 
> may be moot, but I presume there are some optimizations that are valid with 
> the former but not latter. ]
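
A sketch of the workaround described above: falling back to {{map()}} and 
re-emitting the unchanged key, at the cost of an identity {{KeyValue}} and the 
lost "key unchanged" promise (types here are illustrative):

{code:java}
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;

public class MapValuesWorkaround {
    // Key-dependent value transformation via map(); the key is re-emitted
    // unchanged, but Streams can no longer know that it is unchanged.
    static KStream<String, Integer> valueLengths(final KStream<String, String> in) {
        return in.map((key, value) -> KeyValue.pair(key, value.length()));
    }
}
{code}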



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5318) Streams state may be misleading

2017-07-06 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16077336#comment-16077336
 ] 

Matthias J. Sax commented on KAFKA-5318:


Is this covered by KAFKA-5372? \cc [~enothereska]

> Streams state may be misleading
> ---
>
> Key: KAFKA-5318
> URL: https://issues.apache.org/jira/browse/KAFKA-5318
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Joao
>
> I rely on "org.apache.kafka.streams.KafkaStreams#state" to know if my current 
> stream instance is properly running. If it becomes unhealthy my provisioning 
> system (Kubernetes) automatically restarts/replaces the instance.
> One such instance encountered bug 
> https://issues.apache.org/jira/browse/KAFKA-5167.
> The issue is that during the whole time my instance was affected by the 
> linked bug, the stream state was considered healthy when in fact it was not. 
> My instance did not recover automatically from the LockException, and I only 
> happened to notice something was wrong because I monitor the stream delay, 
> which went into abnormal values.
> This ultimately means that the Kafka Streams state is unreliable at 
> describing whether an instance is actually running as intended.
> There are some improvements in the works from what I was told, such as 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
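
For completeness, a minimal sketch of reacting to state transitions via the 
existing listener API instead of polling {{state()}} (the reaction itself is a 
placeholder; {{streams}} is assumed to be an already-built instance):

{code:java}
import org.apache.kafka.streams.KafkaStreams;

public class HealthSketch {
    // Attach a listener so the orchestrator can be signalled on transitions.
    static void watch(final KafkaStreams streams) {
        streams.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.NOT_RUNNING) {
                // placeholder: fail a liveness probe, alert, restart, etc.
                System.err.println("streams instance stopped");
            }
        });
    }
}
{code}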



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-4125) Provide low-level Processor API meta data in DSL layer

2017-07-06 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4125:
---
Labels: kip  (was: )

> Provide low-level Processor API meta data in DSL layer
> --
>
> Key: KAFKA-4125
> URL: https://issues.apache.org/jira/browse/KAFKA-4125
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Bill Bejeck
>Priority: Minor
>  Labels: kip
> Fix For: 0.11.1.0
>
>
> For the Processor API, users can get metadata like record offset, timestamp, 
> etc. via the provided {{Context}} object. It might be useful to allow users 
> to access this information in the DSL layer, too.
> The idea would be to do it "the Flink way", i.e., by providing
> RichFunctions; {{mapValues()}} for example.
> It takes a {{ValueMapper}} that only has the method
> {noformat}
> V2 apply(V1 value);
> {noformat}
> Thus, you cannot get any metadata within apply (it's completely "blind").
> We would add two more interfaces: {{RichFunction}} with a method
> {{open(Context context)}} and
> {noformat}
> RichValueMapper<V1, V2> extends ValueMapper<V1, V2>, RichFunction
> {noformat}
> This way, the user can choose to implement a Rich- or Standard-function and
> we do not need to change existing APIs. Both can be handed into
> {{KStream.mapValues()}} for example. Internally, we check if a Rich
> function is provided, and if yes, hand in the {{Context}} object once, to
> make it available to the user who can now access it within {{apply()}} -- of
> course, the user must set a member variable in {{open()}} to hold the
> reference to the Context object.
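
A sketch of the interfaces described above (proposal only; {{Context}} is the 
name used in the description, stubbed here, not an existing Streams type):

{code:java}
import org.apache.kafka.streams.kstream.ValueMapper;

// Hypothetical stand-in for the Context type from the description.
interface Context {
    long offset();
}

// Proposal sketch: a hook that hands the Context object in once.
interface RichFunction {
    void open(Context context);
}

interface RichValueMapper<V1, V2> extends ValueMapper<V1, V2>, RichFunction { }

// User code stashes the Context in open() and reads metadata in apply().
class OffsetTaggingMapper implements RichValueMapper<String, String> {
    private Context context;

    @Override
    public void open(final Context context) {
        this.context = context;
    }

    @Override
    public String apply(final String value) {
        return value + "@" + context.offset(); // offset() per the proposal
    }
}
{code}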



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-4125) Provide low-level Processor API meta data in DSL layer

2017-07-06 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4125:
---
Description: 
For the Processor API, users can get metadata like record offset, timestamp, 
etc. via the provided {{Context}} object. It might be useful to allow users to 
access this information in the DSL layer, too.

The idea would be to do it "the Flink way", i.e., by providing
RichFunctions; {{mapValues()}} for example.

It takes a {{ValueMapper}} that only has the method

{noformat}
V2 apply(V1 value);
{noformat}

Thus, you cannot get any metadata within apply (it's completely "blind").

We would add two more interfaces: {{RichFunction}} with a method
{{open(Context context)}} and

{noformat}
RichValueMapper<V1, V2> extends ValueMapper<V1, V2>, RichFunction
{noformat}

This way, the user can choose to implement a Rich- or Standard-function and
we do not need to change existing APIs. Both can be handed into
{{KStream.mapValues()}} for example. Internally, we check if a Rich
function is provided, and if yes, hand in the {{Context}} object once, to
make it available to the user who can now access it within {{apply()}} -- of
course, the user must set a member variable in {{open()}} to hold the
reference to the Context object.

KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams

  was:
For the Processor API, users can get metadata like record offset, timestamp, 
etc. via the provided {{Context}} object. It might be useful to allow users to 
access this information in the DSL layer, too.

The idea would be to do it "the Flink way", i.e., by providing
RichFunctions; {{mapValues()}} for example.

It takes a {{ValueMapper}} that only has the method

{noformat}
V2 apply(V1 value);
{noformat}

Thus, you cannot get any metadata within apply (it's completely "blind").

We would add two more interfaces: {{RichFunction}} with a method
{{open(Context context)}} and

{noformat}
RichValueMapper<V1, V2> extends ValueMapper<V1, V2>, RichFunction
{noformat}

This way, the user can choose to implement a Rich- or Standard-function and
we do not need to change existing APIs. Both can be handed into
{{KStream.mapValues()}} for example. Internally, we check if a Rich
function is provided, and if yes, hand in the {{Context}} object once, to
make it available to the user who can now access it within {{apply()}} -- of
course, the user must set a member variable in {{open()}} to hold the
reference to the Context object.


> Provide low-level Processor API meta data in DSL layer
> --
>
> Key: KAFKA-4125
> URL: https://issues.apache.org/jira/browse/KAFKA-4125
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Jeyhun Karimov
>Priority: Minor
>  Labels: kip
> Fix For: 0.11.1.0
>
>
> For the Processor API, users can get metadata like record offset, timestamp, 
> etc. via the provided {{Context}} object. It might be useful to allow users 
> to access this information in the DSL layer, too.
> The idea would be to do it "the Flink way", i.e., by providing
> RichFunctions; {{mapValues()}} for example.
> It takes a {{ValueMapper}} that only has the method
> {noformat}
> V2 apply(V1 value);
> {noformat}
> Thus, you cannot get any metadata within apply (it's completely "blind").
> We would add two more interfaces: {{RichFunction}} with a method
> {{open(Context context)}} and
> {noformat}
> RichValueMapper<V1, V2> extends ValueMapper<V1, V2>, RichFunction
> {noformat}
> This way, the user can choose to implement a Rich- or Standard-function and
> we do not need to change existing APIs. Both can be handed into
> {{KStream.mapValues()}} for example. Internally, we check if a Rich
> function is provided, and if yes, hand in the {{Context}} object once, to
> make it available to the user who can now access it within {{apply()}} -- of
> course, the user must set a member variable in {{open()}} to hold the
> reference to the Context object.
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5660) Don't throw TopologyBuilderException during runtime

2017-07-27 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16104122#comment-16104122
 ] 

Matthias J. Sax commented on KAFKA-5660:


{{TopologyBuilderException}} got deprecated via KIP-120 and a new 
{{TopologyException}} was introduced. Thus, we need to double check 
{{TopologyException}}, too.

> Don't throw TopologyBuilderException during runtime
> ---
>
> Key: KAFKA-5660
> URL: https://issues.apache.org/jira/browse/KAFKA-5660
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Matthias J. Sax
>
> {{TopologyBuilderException}} is a pre-runtime exception that should only be 
> thrown before {{KafkaStreams#start()}} is called.
> However, we do throw {{TopologyBuilderException}} within
> - `SourceNodeFactory#getTopics`
> - `ProcessorContextImpl#getStateStore`
> (and maybe somewhere else: we should double check if there are other places 
> in the code like those).
> We should replace those exception with either {{StreamsException}} or with a 
> new exception type.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5576) Support Power platform by updating rocksdb

2017-07-27 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5576:
---
Component/s: streams

> Support Power platform by updating rocksdb
> --
>
> Key: KAFKA-5576
> URL: https://issues.apache.org/jira/browse/KAFKA-5576
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
> Environment: $ cat /etc/lsb-release
> DISTRIB_ID=Ubuntu
> DISTRIB_RELEASE=14.04
> DISTRIB_CODENAME=trusty
> DISTRIB_DESCRIPTION="Ubuntu 14.04.2 LTS"
> $ uname -a
> Linux pts00432-vm20 3.16.0-30-generic #40~14.04.1-Ubuntu SMP Thu Jan 15 
> 17:42:36 UTC 2015 ppc64le ppc64le ppc64le GNU/Linux
>Reporter: Yussuf Shaikh
>Assignee: Yussuf Shaikh
>Priority: Minor
> Fix For: 1.0.0
>
> Attachments: KAFKA-5576.patch, kafka-stream.txt
>
>
> Many test cases are failing with one of the following exceptions related to 
> rocksdb.
> 1. java.lang.NoClassDefFoundError: Could not initialize class 
> org.rocksdb.Options
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:119)
> at 
> org.apache.kafka.streams.state.internals.Segment.openDB(Segment.java:40)
> 2. java.lang.UnsatisfiedLinkError: /tmp/librocksdbjni4427030040392983276.so: 
> /lib64/ld64.so.2: version `GLIBC_2.22' not found (required by 
> /tmp/librocksdbjni4427030040392983276.so)
> at java.lang.ClassLoader$NativeLibrary.load(Native Method)
> at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
> at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
> at java.lang.Runtime.load0(Runtime.java:809)
> at java.lang.System.load(System.java:1086)
> at 
> org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)
> 3. java.lang.AssertionError: Condition not met within timeout 3. 
> Expecting 3 records from topic output-topic-2 while only received 0: []
> at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:274)
> at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:160)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-5156) Options for handling exceptions in streams

2017-08-22 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-5156:
--

Assignee: Matthias J. Sax  (was: Eno Thereska)

> Options for handling exceptions in streams
> --
>
> Key: KAFKA-5156
> URL: https://issues.apache.org/jira/browse/KAFKA-5156
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Eno Thereska
>Assignee: Matthias J. Sax
>  Labels: user-experience
> Fix For: 1.0.0
>
>
> This is a task around options for handling exceptions in streams. It focuses 
> on options for dealing with corrupt data (keep going, stop streams, log, 
> retry, etc.).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5765) Move merge() from StreamsBuilder to KStream

2017-08-22 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-5765:
--

 Summary: Move merge() from StreamsBuilder to KStream
 Key: KAFKA-5765
 URL: https://issues.apache.org/jira/browse/KAFKA-5765
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.11.0.0
Reporter: Matthias J. Sax
Assignee: Matthias J. Sax


Merging multiple {{KStream}}s is done via {{StreamsBuilder#merge()}} (formerly 
{{KStreamBuilder#merge()}}). This is quite unnatural and should be done via 
{{KStream#merge()}}.

As {{StreamsBuilder}} is not released yet, this is not a backward incompatible 
change (and KStreamBuilder is already deprecated). We still need a KIP, as we 
add a new method to the public {{KStream}} API.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5070) org.apache.kafka.streams.errors.LockException: task [0_18] Failed to lock the state directory: /opt/rocksdb/pulse10/0_18

2017-08-20 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-5070.

Resolution: Duplicate

> org.apache.kafka.streams.errors.LockException: task [0_18] Failed to lock the 
> state directory: /opt/rocksdb/pulse10/0_18
> 
>
> Key: KAFKA-5070
> URL: https://issues.apache.org/jira/browse/KAFKA-5070
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
> Environment: Linux Version
>Reporter: Dhana
>Assignee: Matthias J. Sax
> Attachments: RocksDB_LockStateDirec.7z
>
>
> Notes: we run two instance of consumer in two difference machines/nodes.
> we have 400 partitions. 200  stream threads/consumer, with 2 consumer.
> We perform HA test(on rebalance - shutdown of one of the consumer/broker), we 
> see this happening
> Error:
> 2017-04-05 11:36:09.352 WARN  StreamThread:1184 StreamThread-66 - Could not 
> create task 0_115. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_115] Failed to lock 
> the state directory: /opt/rocksdb/pulse10/0_115
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
>   at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5603) Streams should not abort transaction when closing zombie task

2017-08-22 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5603:
---
Fix Version/s: (was: 0.11.0.2)
   0.11.0.1

> Streams should not abort transaction when closing zombie task
> -
>
> Key: KAFKA-5603
> URL: https://issues.apache.org/jira/browse/KAFKA-5603
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Critical
> Fix For: 0.11.0.1
>
>
> The contract of the transactional producer API is to not call any 
> transactional method after a {{ProducerFenced}} exception was thrown.
> Streams, however, makes an unconditional call to {{abortTransaction()}} 
> within {{StreamTask#close()}} in case of unclean shutdown. We need to 
> distinguish between {{ProducerFenced}} and other unclean shutdown cases.
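
A sketch of the required distinction on a plain transactional producer (the 
{{fenced}} flag stands in for however the task tracks that a 
{{ProducerFencedException}} was seen):

{code:java}
import org.apache.kafka.clients.producer.KafkaProducer;

public class DirtyCloseSketch {
    // On unclean shutdown, only abort if the producer was NOT fenced:
    // after ProducerFencedException, no transactional call is allowed.
    static void closeDirty(final KafkaProducer<byte[], byte[]> producer,
                           final boolean fenced) {
        try {
            if (!fenced) {
                producer.abortTransaction();
            }
        } finally {
            producer.close();
        }
    }
}
{code}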



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5474) Streams StandbyTask should no checkpoint on commit if EOS is enabled

2017-06-20 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5474:
---
Fix Version/s: (was: 0.11.0.0)

> Streams StandbyTask should no checkpoint on commit if EOS is enabled
> 
>
> Key: KAFKA-5474
> URL: https://issues.apache.org/jira/browse/KAFKA-5474
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.1.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Blocker
>  Labels: exactly-once
>
> Discovered by system test {{streams_eos_test#test_failure_and_recovery}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5474) Streams StandbyTask should no checkpoint on commit if EOS is enabled

2017-06-20 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5474:
---
Affects Version/s: (was: 0.11.0.0)
   0.11.1.0

> Streams StandbyTask should no checkpoint on commit if EOS is enabled
> 
>
> Key: KAFKA-5474
> URL: https://issues.apache.org/jira/browse/KAFKA-5474
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.1.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Blocker
>  Labels: exactly-once
> Fix For: 0.11.0.0
>
>
> Discovered by system test {{streams_eos_test#test_failure_and_recovery}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5485) Streams should not suspend tasks twice

2017-06-20 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-5485:
--

 Summary: Streams should not suspend tasks twice
 Key: KAFKA-5485
 URL: https://issues.apache.org/jira/browse/KAFKA-5485
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 0.11.1.0
Reporter: Matthias J. Sax
Assignee: Matthias J. Sax
Priority: Minor


Currently, Streams suspends tasks on rebalance and closes suspended tasks if 
not reassigned. During close, {{suspend()}} is called a second time, also 
calling {{Processor.close()}} for all nodes again.

It would be safer to call {{suspend()}} only once, as users might have 
non-idempotent operations in their {{Processor.close()}} method that would fail 
on a second invocation; a minimal guard is sketched below. (cf. KAFKA-5167)
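To illustrate, a minimal sketch of such a guard (a hypothetical wrapper, not 
the actual Streams internals):

{code:lang=java}
// A second close() becomes a no-op, so non-idempotent user logic in
// Processor.close() only ever runs once.
class CloseOnce implements AutoCloseable {
    private final AutoCloseable delegate;
    private boolean closed = false;

    CloseOnce(final AutoCloseable delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized void close() throws Exception {
        if (closed) {
            return;
        }
        closed = true;
        delegate.close();
    }
}
{code}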



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5488) KStream.branch should not return an Array of streams we have to access by known index

2017-06-21 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16058103#comment-16058103
 ] 

Matthias J. Sax commented on KAFKA-5488:


Thanks for creating this JIRA. I agree that the index access is a little clumsy 
and support the basic idea. Details to be determined. Can you do us one favor 
and copy the proposal into the JIRA itself? It's easier to track if the ticket is 
self-contained and does not link to external resources.

Btw: this will require a KIP. Do you want to work on this by yourself? That 
would be great. :)

> KStream.branch should not return an Array of streams we have to access by 
> known index
> 
>
> Key: KAFKA-5488
> URL: https://issues.apache.org/jira/browse/KAFKA-5488
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.11.0.0, 0.10.2.1
>Reporter: Marcel Silberhorn
>  Labels: needs-kip
>
> some details and thoughts about:
> https://gitlab.com/childno.de/apache_kafka/snippets/1665655
> long story short: it's a mess to get a {{KStream<>[]}} out of 
> {{KStream#branch(Predicate<>...)}}. It breaks the fluent API and it produces 
> bad code that is hard to maintain, since you have to know the right 
> index for an unnamed branching stream.
> Quick idea: something like {{void branch(final BranchDefinition<Predicate<K, V>, 
> Consumer<KStream<K, V>>>... branchPredicatesAndHandlers);}} where you can write 
> branches/streams code nested where it belongs



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-4750) KeyValueIterator returns null values

2017-06-24 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4750:
---
Affects Version/s: (was: 0.10.1.1)

> KeyValueIterator returns null values
> 
>
> Key: KAFKA-4750
> URL: https://issues.apache.org/jira/browse/KAFKA-4750
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Michal Borowiecki
>Assignee: Evgeny Veretennikov
>  Labels: newbie
>
> The API for the ReadOnlyKeyValueStore.range method promises that the returned 
> iterator will not return null values. However, after upgrading from 0.10.0.0 to 
> 0.10.1.1 we found null values are returned, causing NPEs on our side.
> I found this happens after removing entries from the store, and it resembles 
> the SAMZA-94 defect. The problem seems to be the same as there: when deleting 
> entries with a serializer that does not return null when null is passed in, the 
> state store doesn't actually delete that key/value pair, but the iterator will 
> return a null value for that key.
> When I modified our serializer to return null when null is passed in, the 
> problem went away. However, I believe this should be fixed in Kafka Streams, 
> perhaps with a similar approach as in SAMZA-94.
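To illustrate the workaround, a minimal sketch of a serializer that preserves 
delete semantics (a plain String serializer as a stand-in for the user's serde):

{code:lang=java}
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;

class NullSafeStringSerializer implements Serializer<String> {
    @Override
    public void configure(final Map<String, ?> configs, final boolean isKey) {}

    @Override
    public byte[] serialize(final String topic, final String data) {
        // null must map to null: otherwise a delete is written as a non-null
        // value, the entry is never removed, and range() later yields null values
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void close() {}
}
{code}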



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-4750) KeyValueIterator returns null values

2017-06-24 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4750:
---
Affects Version/s: 0.10.2.1

> KeyValueIterator returns null values
> 
>
> Key: KAFKA-4750
> URL: https://issues.apache.org/jira/browse/KAFKA-4750
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1, 0.10.2.1
>Reporter: Michal Borowiecki
>Assignee: Evgeny Veretennikov
>  Labels: newbie
>
> The API for the ReadOnlyKeyValueStore.range method promises that the returned 
> iterator will not return null values. However, after upgrading from 0.10.0.0 to 
> 0.10.1.1 we found null values are returned, causing NPEs on our side.
> I found this happens after removing entries from the store, and it resembles 
> the SAMZA-94 defect. The problem seems to be the same as there: when deleting 
> entries with a serializer that does not return null when null is passed in, the 
> state store doesn't actually delete that key/value pair, but the iterator will 
> return a null value for that key.
> When I modified our serializer to return null when null is passed in, the 
> problem went away. However, I believe this should be fixed in Kafka Streams, 
> perhaps with a similar approach as in SAMZA-94.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5510) Streams should commit all offsets regularly

2017-06-24 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5510:
---
Description: 
Currently, Streams commits only the offsets of partitions it actually processed 
records for. Thus, if a partition does not have any data for longer than 
{{offsets.retention.minutes}} (default 1 day), the latest committed offset gets 
lost. On failure or restart, {{auto.offset.reset}} kicks in, potentially 
resulting in reprocessing of old data.

Thus, Streams should commit _all_ offsets on a regular basis. Not sure what the 
overhead of a commit is -- if it's too expensive to commit all offsets on every 
regular commit, we could also have a second config that specifies a 
"commit.all.interval".

This relates to https://issues.apache.org/jira/browse/KAFKA-3806, so we should 
sync to get a solid overall solution.

At the same time, it might be better to change the semantics of 
{{offsets.retention.minutes}} in the first place.


  was:
Currently, Streams commits only the offsets of partitions it actually processed 
records for. Thus, if a partition does not have any data for longer than 
{{offsets.retention.minutes}} (default 1 day), the latest committed offset gets 
lost. On failure or restart, {{auto.offset.reset}} kicks in, potentially 
resulting in reprocessing of old data.

Thus, Streams should commit _all_ offsets on a regular basis. Not sure what the 
overhead of a commit is -- if it's too expensive to commit all offsets on every 
regular commit, we could also have a second config that specifies a 
"commit.all.interval".

This relates to https://issues.apache.org/jira/browse/KAFKA-3806, so we should 
sync to get a solid overall solution.



> Streams should commit all offsets regularly
> ---
>
> Key: KAFKA-5510
> URL: https://issues.apache.org/jira/browse/KAFKA-5510
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>
> Currently, Streams commits only the offsets of partitions it actually processed 
> records for. Thus, if a partition does not have any data for longer than 
> {{offsets.retention.minutes}} (default 1 day), the latest committed offset 
> gets lost. On failure or restart, {{auto.offset.reset}} kicks in, potentially 
> resulting in reprocessing of old data.
> Thus, Streams should commit _all_ offsets on a regular basis. Not sure what 
> the overhead of a commit is -- if it's too expensive to commit all offsets on 
> every regular commit, we could also have a second config that specifies a 
> "commit.all.interval".
> This relates to https://issues.apache.org/jira/browse/KAFKA-3806, so we 
> should sync to get a solid overall solution.
> At the same time, it might be better to change the semantics of 
> {{offsets.retention.minutes}} in the first place.
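To make the idea concrete, a sketch of what such a setup might look like 
({{commit.all.interval.ms}} is purely hypothetical and does not exist; the other 
keys are real {{StreamsConfig}} settings, values are placeholders):

{code:lang=java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

class CommitConfigSketch {
    static Properties props() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // existing behavior: commit processed offsets every 30 seconds
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000);
        // hypothetical: additionally commit ALL assigned offsets once a day,
        // so idle partitions don't fall victim to offsets.retention.minutes
        props.put("commit.all.interval.ms", 24 * 60 * 60 * 1000L);
        return props;
    }
}
{code}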



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5510) Streams should commit all offsets regularly

2017-06-24 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5510:
---
Description: 
Currently, Streams commits only the offsets of partitions it actually processed 
records for. Thus, if a partition does not have any data for longer than 
{{offsets.retention.minutes}} (default 1 day), the latest committed offset gets 
lost. On failure or restart, {{auto.offset.reset}} kicks in, potentially 
resulting in reprocessing of old data.

Thus, Streams should commit _all_ offsets on a regular basis. Not sure what the 
overhead of a commit is -- if it's too expensive to commit all offsets on every 
regular commit, we could also have a second config that specifies a 
"commit.all.interval".

This relates to https://issues.apache.org/jira/browse/KAFKA-3806, so we should 
sync to get a solid overall solution.

At the same time, it might be better to change the semantics of 
{{offsets.retention.minutes}} in the first place. It might be better to apply 
this setting only if the consumer group is completely dead (and not on a "last 
commit" and "per partition" basis). Thus, this JIRA would be a workaround fix 
if core cannot be changed quickly enough.


  was:
Currently, Streams commits only the offsets of partitions it actually processed 
records for. Thus, if a partition does not have any data for longer than 
{{offsets.retention.minutes}} (default 1 day), the latest committed offset gets 
lost. On failure or restart, {{auto.offset.reset}} kicks in, potentially 
resulting in reprocessing of old data.

Thus, Streams should commit _all_ offsets on a regular basis. Not sure what the 
overhead of a commit is -- if it's too expensive to commit all offsets on every 
regular commit, we could also have a second config that specifies a 
"commit.all.interval".

This relates to https://issues.apache.org/jira/browse/KAFKA-3806, so we should 
sync to get a solid overall solution.

At the same time, it might be better to change the semantics of 
{{offsets.retention.minutes}} in the first place.



> Streams should commit all offsets regularly
> ---
>
> Key: KAFKA-5510
> URL: https://issues.apache.org/jira/browse/KAFKA-5510
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>
> Currently, Streams commits only the offsets of partitions it actually processed 
> records for. Thus, if a partition does not have any data for longer than 
> {{offsets.retention.minutes}} (default 1 day), the latest committed offset 
> gets lost. On failure or restart, {{auto.offset.reset}} kicks in, potentially 
> resulting in reprocessing of old data.
> Thus, Streams should commit _all_ offsets on a regular basis. Not sure what 
> the overhead of a commit is -- if it's too expensive to commit all offsets on 
> every regular commit, we could also have a second config that specifies a 
> "commit.all.interval".
> This relates to https://issues.apache.org/jira/browse/KAFKA-3806, so we 
> should sync to get a solid overall solution.
> At the same time, it might be better to change the semantics of 
> {{offsets.retention.minutes}} in the first place. It might be better to apply 
> this setting only if the consumer group is completely dead (and not on a "last 
> commit" and "per partition" basis). Thus, this JIRA would be a workaround fix 
> if core cannot be changed quickly enough.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4750) KeyValueIterator returns null values

2017-06-24 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16062112#comment-16062112
 ] 

Matthias J. Sax commented on KAFKA-4750:


Your observation is correct. I simplified the call chain. The question then is: 
is it a bug in the {{Serde}} that it does not return {{null}} if the value is 
{{null}}? Or should we just make Streams itself more robust and change {{byte[] 
rawValue = serdes.rawValue(value);}} to {{byte[] rawValue = value == null ? 
null : serdes.rawValue(value);}}?

> KeyValueIterator returns null values
> 
>
> Key: KAFKA-4750
> URL: https://issues.apache.org/jira/browse/KAFKA-4750
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1, 0.11.0.0, 0.10.2.1
>Reporter: Michal Borowiecki
>Assignee: Evgeny Veretennikov
>  Labels: newbie
> Attachments: DeleteTest.java
>
>
> The API for the ReadOnlyKeyValueStore.range method promises that the returned 
> iterator will not return null values. However, after upgrading from 0.10.0.0 to 
> 0.10.1.1 we found null values are returned, causing NPEs on our side.
> I found this happens after removing entries from the store, and it resembles 
> the SAMZA-94 defect. The problem seems to be the same as there: when deleting 
> entries with a serializer that does not return null when null is passed in, the 
> state store doesn't actually delete that key/value pair, but the iterator will 
> return a null value for that key.
> When I modified our serializer to return null when null is passed in, the 
> problem went away. However, I believe this should be fixed in Kafka Streams, 
> perhaps with a similar approach as in SAMZA-94.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Reopened] (KAFKA-4262) Intermittent unit test failure ReassignPartitionsClusterTest.shouldExecuteThrottledReassignment

2017-06-24 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-4262:


This happened again 
(https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/5618/testReport/junit/kafka.admin/ReassignPartitionsClusterTest/shouldExecuteThrottledReassignment/)
 with:

{noformat}
java.lang.AssertionError: Expected replication to be < 1 but was 10076
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.assertTrue(Assert.java:41)
at 
kafka.admin.ReassignPartitionsClusterTest.shouldExecuteThrottledReassignment(ReassignPartitionsClusterTest.scala:183)
{noformat}

> Intermittent unit test failure 
> ReassignPartitionsClusterTest.shouldExecuteThrottledReassignment
> ---
>
> Key: KAFKA-4262
> URL: https://issues.apache.org/jira/browse/KAFKA-4262
> Project: Kafka
>  Issue Type: Bug
>  Components: unit tests
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
> Fix For: 0.10.1.0
>
>
> Unit test seen in PR build where PR did not contain any code changes: 
> https://builds.apache.org/job/kafka-trunk-git-pr-jdk7/6086/testReport/junit/kafka.admin/ReassignPartitionsClusterTest/shouldExecuteThrottledReassignment/
> {quote}
> java.lang.AssertionError: Expected replication to be > 4500.0 but was 210
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at 
> kafka.admin.ReassignPartitionsClusterTest.shouldExecuteThrottledReassignment(ReassignPartitionsClusterTest.scala:141)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> ...
> {quote}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5464) StreamsKafkaClient should not use StreamsConfig.POLL_MS_CONFIG

2017-06-26 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5464:
---
Fix Version/s: 0.11.1.0

> StreamsKafkaClient should not use StreamsConfig.POLL_MS_CONFIG
> --
>
> Key: KAFKA-5464
> URL: https://issues.apache.org/jira/browse/KAFKA-5464
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.10.2.1
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
> Fix For: 0.11.1.0, 0.10.2.2, 0.11.0.1
>
>
> In {{StreamsKafkaClient}} we use a {{NetworkClient}} internally and call 
> {{poll}} with {{StreamsConfig.POLL_MS_CONFIG}} as the timeout.
> However, {{StreamsConfig.POLL_MS_CONFIG}} is solely meant to be applied to 
> {{KafkaConsumer.poll()}}, and it's incorrect to use it for the 
> {{NetworkClient}}. If the config is increased, this can lead to an infinite 
> rebalance: the effective poll interval on the client side grows and thus the 
> client is not able to meet broker-enforced timeouts anymore.
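For context, a small sketch of the scope {{poll.ms}} is supposed to have (the 
config keys are from the public {{StreamsConfig}}; values are placeholders):

{code:lang=java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

class PollMsScopeSketch {
    static Properties props() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // poll.ms should only bound KafkaConsumer.poll() in the processing loop;
        // raising it must not slow down the internal admin client's network polling
        props.put(StreamsConfig.POLL_MS_CONFIG, 100);
        return props;
    }
}
{code}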



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5464) StreamsKafkaClient should not use StreamsConfig.POLL_MS_CONFIG

2017-06-26 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5464:
---
Fix Version/s: 0.11.0.1
   0.10.2.2

> StreamsKafkaClient should not use StreamsConfig.POLL_MS_CONFIG
> --
>
> Key: KAFKA-5464
> URL: https://issues.apache.org/jira/browse/KAFKA-5464
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.10.2.1
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
> Fix For: 0.10.2.2, 0.11.0.1
>
>
> In {{StreamsKafkaClient}} we use a {{NetworkClient}} internally and call 
> {{poll}} with {{StreamsConfig.POLL_MS_CONFIG}} as the timeout.
> However, {{StreamsConfig.POLL_MS_CONFIG}} is solely meant to be applied to 
> {{KafkaConsumer.poll()}}, and it's incorrect to use it for the 
> {{NetworkClient}}. If the config is increased, this can lead to an infinite 
> rebalance: the effective poll interval on the client side grows and thus the 
> client is not able to meet broker-enforced timeouts anymore.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4750) KeyValueIterator returns null values

2017-06-26 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16063956#comment-16063956
 ] 

Matthias J. Sax commented on KAFKA-4750:


I think [~damianguy] or [~enothereska] can comment best on this. AFAIK, we use 
{{put(key,null)}} with delete semantics all over the place, also for {{KTable}} 
caches. As it aligns with changelog delete semantics, I also think it makes 
sense to keep it this way. I would rather educate users that plug in a Serde to 
not return {{null}} if the input is not {{null}}. We can also add checks around 
all {{Serde}} calls: (1) never call the Serde for {{null}}, as we know the 
result must be {{null}} anyway; (2) if we call the Serde with a non-null value, 
make sure it does not return {{null}} -- otherwise throw an exception.
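A minimal sketch of such guards (a hypothetical helper, not the actual Streams 
code):

{code:lang=java}
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.errors.StreamsException;

class SerdeGuards {
    static <T> byte[] rawValue(final Serde<T> serde, final String topic, final T value) {
        if (value == null) {
            return null; // (1) never call the Serde for null
        }
        final byte[] raw = serde.serializer().serialize(topic, value);
        if (raw == null) {
            // (2) a non-null value must not serialize to null
            throw new StreamsException("Serializer returned null for non-null value");
        }
        return raw;
    }
}
{code}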

> KeyValueIterator returns null values
> 
>
> Key: KAFKA-4750
> URL: https://issues.apache.org/jira/browse/KAFKA-4750
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1, 0.11.0.0, 0.10.2.1
>Reporter: Michal Borowiecki
>Assignee: Evgeny Veretennikov
>  Labels: newbie
> Attachments: DeleteTest.java
>
>
> The API for the ReadOnlyKeyValueStore.range method promises that the returned 
> iterator will not return null values. However, after upgrading from 0.10.0.0 to 
> 0.10.1.1 we found null values are returned, causing NPEs on our side.
> I found this happens after removing entries from the store, and it resembles 
> the SAMZA-94 defect. The problem seems to be the same as there: when deleting 
> entries with a serializer that does not return null when null is passed in, the 
> state store doesn't actually delete that key/value pair, but the iterator will 
> return a null value for that key.
> When I modified our serializer to return null when null is passed in, the 
> problem went away. However, I believe this should be fixed in Kafka Streams, 
> perhaps with a similar approach as in SAMZA-94.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5145) Remove task close() call from closeNonAssignedSuspendedTasks method

2017-06-25 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16062424#comment-16062424
 ] 

Matthias J. Sax commented on KAFKA-5145:


[~mingleizhang] Thanks for starting to contribute to Kafka! I just realized 
that this is a duplicate and there is already a PR for the other Jira (cf. 
KAFKA-5485). There should be other "newbie" Jiras, though, that you can pick up.

[~damianguy] Can you add [~mingleizhang] to the contributor list, so he can 
assign Jiras to himself? Thx.

> Remove task close() call from closeNonAssignedSuspendedTasks method
> ---
>
> Key: KAFKA-5145
> URL: https://issues.apache.org/jira/browse/KAFKA-5145
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Narendra Kumar
>  Labels: newbie
> Attachments: BugTest.java, DebugTransformer.java, logs.txt
>
>
> While rebalancing, ProcessorNode.close() can be called twice: once from 
> StreamThread.suspendTasksAndState() and once from 
> StreamThread.closeNonAssignedSuspendedTasks(). If ProcessorNode.close() 
> throws an exception because close() was called multiple times (e.g. an 
> IllegalStateException from a KafkaConsumer instance being used by some 
> processor for a lookup), we fail to close the task's state manager (i.e. the 
> call to task.closeStateManager(true); fails). After the rebalance, if the same 
> task id is launched on the same application instance but in a different 
> thread, the task gets stuck because it fails to acquire the lock on the task's 
> state directory.
> Since the processor's close() is already called from 
> StreamThread.suspendTasksAndState(), we don't need to call it again from 
> StreamThread.closeNonAssignedSuspendedTasks().



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4750) KeyValueIterator returns null values

2017-06-25 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16062423#comment-16062423
 ] 

Matthias J. Sax commented on KAFKA-4750:


Sounds good to me.

> KeyValueIterator returns null values
> 
>
> Key: KAFKA-4750
> URL: https://issues.apache.org/jira/browse/KAFKA-4750
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1, 0.11.0.0, 0.10.2.1
>Reporter: Michal Borowiecki
>Assignee: Evgeny Veretennikov
>  Labels: newbie
> Attachments: DeleteTest.java
>
>
> The API for the ReadOnlyKeyValueStore.range method promises that the returned 
> iterator will not return null values. However, after upgrading from 0.10.0.0 to 
> 0.10.1.1 we found null values are returned, causing NPEs on our side.
> I found this happens after removing entries from the store, and it resembles 
> the SAMZA-94 defect. The problem seems to be the same as there: when deleting 
> entries with a serializer that does not return null when null is passed in, the 
> state store doesn't actually delete that key/value pair, but the iterator will 
> return a null value for that key.
> When I modified our serializer to return null when null is passed in, the 
> problem went away. However, I believe this should be fixed in Kafka Streams, 
> perhaps with a similar approach as in SAMZA-94.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5474) Streams StandbyTask should not checkpoint on commit if EOS is enabled

2017-06-19 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-5474:
--

 Summary: Streams StandbyTask should not checkpoint on commit if EOS 
is enabled
 Key: KAFKA-5474
 URL: https://issues.apache.org/jira/browse/KAFKA-5474
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.11.0.0
Reporter: Matthias J. Sax
Assignee: Matthias J. Sax
Priority: Blocker
 Fix For: 0.11.0.0


Discovered by system test {{streams_eos_test#test_failure_and_recovery}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5488) KStream.branch should not return an Array of streams we have to access by known index

2017-06-22 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16059527#comment-16059527
 ] 

Matthias J. Sax commented on KAFKA-5488:


You can always start a discussion on the mailing list. For this to be 
productive, it's usually best to propose an initial design and point out its 
strengths and weaknesses. This helps people give constructive feedback. As the 
wiki is a living document, I would just start a KIP page, dump my thoughts 
there, and start a KIP discussion thread on the dev list (as described on the 
KIP wiki page).

> KStream.branch should not return an Array of streams we have to access by 
> known index
> 
>
> Key: KAFKA-5488
> URL: https://issues.apache.org/jira/browse/KAFKA-5488
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.11.0.0, 0.10.2.1
>Reporter: Marcel "childNo͡.de" Trautwein
>  Labels: needs-kip
>
> long story short: it's a mess to get a {{KStream<>[]}} out of 
> {{KStream#branch(Predicate<>...)}}. It breaks the fluent API and it produces 
> bad code that is hard to maintain, since you have to know the right 
> index for an unnamed branching stream.
> Example
> {code:lang=java}
> import org.apache.kafka.streams.kstream.KStreamBuilder;
> import org.apache.kafka.streams.kstream.KStream;
> public class StreamAppWithBranches {
> public static void main(String... args) {
> KStream[] branchedStreams = new KStreamBuilder()
>     .stream("eventTopic")
>     .branch(
>         (k, v) -> EventType.validData(v),
>         (k, v) -> true
>     );
> 
> branchedStreams[0]
> .to("topicValidData");
> 
> branchedStreams[1]
> .to("topicInvalidData");
> }
> }
> {code}
> Quick idea: something like {{void branch(final BranchDefinition<Predicate<K, V>, 
> Consumer<KStream<K, V>>>... branchPredicatesAndHandlers);}} where you can write 
> branches/streams code nested where it belongs
> so it would be possible to write code like
> {code:lang=java}
> new KStreamBuilder()
> .stream("eventTopic")
> .branch(
> Branch.create(
> (k, v) -> EventType.validData(v),
> stream -> stream.to("topicValidData")
> ),
> Branch.create(
> (k, v) -> true,
> stream -> stream.to("topicInvalidData")
> )
> );
> {code}
> I'll go ahead and evaluate some ideas:
> https://gitlab.com/childno.de/apache_kafka/snippets/1665655



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4857) Use AdminClient in Kafka Streams

2017-06-21 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16057842#comment-16057842
 ] 

Matthias J. Sax commented on KAFKA-4857:


I meant what Ismael said. This Jira was intended to add a public API to 
{{KafkaClientSupplier}} to allow returning the {{AdminClient}}. It requires 
that we have the {{AdminClient}} in place internally first. Btw: [~guozhang] 
mentioned to me that he might want to work on the internal swap. We should 
create a JIRA for this and link both to each other, with this JIRA being 
blocked by the other.

> Use AdminClient in Kafka Streams
> 
>
> Key: KAFKA-4857
> URL: https://issues.apache.org/jira/browse/KAFKA-4857
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Sharad
>  Labels: needs-kip
>
> Streams uses {{KafkaClientSupplier}} to get 
> consumer/restore-consumer/producer clients. Streams also uses one more client 
> for admin purposes, namely {{StreamsKafkaClient}}, which is instantiated 
> "manually".
> With the newly upcoming {{AdminClient}} from KIP-117, we can simplify (or 
> even replace) {{StreamsKafkaClient}} with the new {{AdminClient}}. We 
> furthermore want to unify how the client is generated and extend 
> {{KafkaClientSupplier}} with a method that returns this client.
> As this is a public API change, a KIP is required.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5464) StreamsKafkaClient should not use StreamsConfig.POLL_MS_CONFIG

2017-06-16 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-5464:
--

 Summary: StreamsKafkaClient should not use 
StreamsConfig.POLL_MS_CONFIG
 Key: KAFKA-5464
 URL: https://issues.apache.org/jira/browse/KAFKA-5464
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.2.1, 0.11.0.0
Reporter: Matthias J. Sax
Assignee: Matthias J. Sax


In {{StreamsKafkaClient}} we use a {{NetworkClient}} internally and call 
{{poll}} with {{StreamsConfig.POLL_MS_CONFIG}} as the timeout.

However, {{StreamsConfig.POLL_MS_CONFIG}} is solely meant to be applied to 
{{KafkaConsumer.poll()}}, and it's incorrect to use it for the 
{{NetworkClient}}. If the config is increased, this can lead to an infinite 
rebalance: the effective poll interval on the client side grows and thus the 
client is not able to meet broker-enforced timeouts anymore.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5882) NullPointerException in ConsumerCoordinator

2017-09-13 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16165085#comment-16165085
 ] 

Matthias J. Sax commented on KAFKA-5882:


I had a look into the code and don't understand how an NPE could occur 
here. Is this issue reproducible? Do you have any logs?

Btw: it seems to be unrelated to KAFKA-5073 from my current understanding.

> NullPointerException in ConsumerCoordinator
> ---
>
> Key: KAFKA-5882
> URL: https://issues.apache.org/jira/browse/KAFKA-5882
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Seweryn Habdank-Wojewodzki
>
> It seems bugfix [KAFKA-5073|https://issues.apache.org/jira/browse/KAFKA-5073] 
> was made, but introduced some other issue.
> In some cases (I am not sure which ones) I got an NPE (below).
> I would expect that even in case of a FATAL error, anything other than an NPE 
> is thrown.
> {code}
> 2017-09-12 23:34:54 ERROR ConsumerCoordinator:269 - User provided listener 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener 
> for group streamer failed on partition assignment
> java.lang.NullPointerException: null
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:123)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) 
> [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
>  [myapp-streamer.jar:?]
> 2017-09-12 23:34:54 INFO  StreamThread:1040 - stream-thread 
> [streamer-3a44578b-faa8-4b5b-bbeb-7a7f04639563-StreamThread-1] Shutting down
> 2017-09-12 23:34:54 INFO  KafkaProducer:972 - Closing the Kafka producer with 
> timeoutMillis = 9223372036854775807 ms.
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5882) NullPointerException in StreamTask

2017-09-14 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166549#comment-16166549
 ] 

Matthias J. Sax commented on KAFKA-5882:


We actually do have system tests for broker bounces. Thus, in general it should 
work. But of course, there can always be a bug. Without more information, it 
will be hard to figure out...

> NullPointerException in StreamTask
> --
>
> Key: KAFKA-5882
> URL: https://issues.apache.org/jira/browse/KAFKA-5882
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Seweryn Habdank-Wojewodzki
>
> It seems bugfix [KAFKA-5073|https://issues.apache.org/jira/browse/KAFKA-5073] 
> was made, but introduced some other issue.
> In some cases (I am not sure which ones) I got an NPE (below).
> I would expect that even in case of a FATAL error, anything other than an NPE 
> is thrown.
> {code}
> 2017-09-12 23:34:54 ERROR ConsumerCoordinator:269 - User provided listener 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener 
> for group streamer failed on partition assignment
> java.lang.NullPointerException: null
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:123)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) 
> [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
>  [myapp-streamer.jar:?]
> 2017-09-12 23:34:54 INFO  StreamThread:1040 - stream-thread 
> [streamer-3a44578b-faa8-4b5b-bbeb-7a7f04639563-StreamThread-1] Shutting down
> 2017-09-12 23:34:54 INFO  KafkaProducer:972 - Closing the Kafka producer with 
> timeoutMillis = 9223372036854775807 ms.
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5765) Move merge() from StreamsBuilder to KStream

2017-09-15 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16168170#comment-16168170
 ] 

Matthias J. Sax commented on KAFKA-5765:


Sure. If you need any help with the KIP, let us know. Have a look here to get 
started: 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals 
(If you don't have wiki access, let us know your wiki ID so we can give you 
write permission there. \cc [~guozhang] [~damianguy] -- can you also add 
[~Yohan123] to the contributor list so we can assign this JIRA to him? This 
will also allow you to assign JIRAs to yourself.) Thanks a lot!

> Move merge() from StreamsBuilder to KStream
> ---
>
> Key: KAFKA-5765
> URL: https://issues.apache.org/jira/browse/KAFKA-5765
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 1.0.0
>Reporter: Matthias J. Sax
>  Labels: needs-kip, newbie
> Fix For: 1.1.0
>
>
> Merging multiple {{KStream}} is done via {{StreamsBuilder#merge()}} (formerly 
> {{KStreamBuilder#merge()}}). This is quite unnatural and should be done via 
> {{KStream#merge()}}.
> We need a KIP as we add a new method to a public {{KStreams}} API and 
> deprecate the old {{merge()}} method.
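To illustrate the proposed direction, a sketch of the target signature 
(hypothetical; the instance-level {{merge()}} does not exist until the KIP 
lands):

{code:lang=java}
import org.apache.kafka.streams.kstream.KStream;

// today:    KStream<K, V> merged = builder.merge(left, right);
// proposed: keep the fluent style, e.g. left.merge(right).filter(...).to("out")
interface KStreamWithMerge<K, V> extends KStream<K, V> {
    KStream<K, V> merge(final KStream<K, V> other);
}
{code}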



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5916) Upgrade rocksdb dependency to 5.7.3

2017-09-16 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5916:
---
Affects Version/s: 0.11.0.0

> Upgrade rocksdb dependency to 5.7.3
> ---
>
> Key: KAFKA-5916
> URL: https://issues.apache.org/jira/browse/KAFKA-5916
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Ted Yu
>Priority: Minor
>
> Currently we use 5.3.6.
> The latest release is 5.7.3 :
> https://github.com/facebook/rocksdb/releases
> We should upgrade to the latest rocksdb release.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-5765) Move merge() from StreamsBuilder to KStream

2017-09-16 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-5765:
--

Assignee: Richard Yu

> Move merge() from StreamsBuilder to KStream
> ---
>
> Key: KAFKA-5765
> URL: https://issues.apache.org/jira/browse/KAFKA-5765
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 1.0.0
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>  Labels: needs-kip, newbie
> Fix For: 1.1.0
>
> Attachments: 5765.v1.patch
>
>
> Merging multiple {{KStream}} is done via {{StreamsBuilder#merge()}} (formerly 
> {{KStreamBuilder#merge()}}). This is quite unnatural and should be done via 
> {{KStream#merge()}}.
> We need a KIP as we add a new method to a public {{KStreams}} API and 
> deprecate the old {{merge()}} method.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5893) ResetIntegrationTest fails

2017-09-18 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5893:
---
Description: 
{noformat}
java.lang.AssertionError: expected:<0> but was:<1>
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.failNotEquals(Assert.java:834)
at org.junit.Assert.assertEquals(Assert.java:645)
at org.junit.Assert.assertEquals(Assert.java:631)
at 
org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:363)
at 
org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172)
{noformat}

One issue with debugging is that we catch exceptions and print the exception 
message, which is {{null}}:
{noformat}
Standard Error
ERROR: null
ERROR: null
{noformat}

After printing the stack trace in case of failure, we got:
{noformat}
ERROR: java.lang.NullPointerException
java.lang.NullPointerException
at 
kafka.tools.StreamsResetter.maybeResetInputAndSeekToEndIntermediateTopicOffsets(StreamsResetter.java:194)
at kafka.tools.StreamsResetter.run(StreamsResetter.java:121)
at 
org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:362)
at 
org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172)
{noformat}

  was:
{noformat}
java.lang.AssertionError: expected:<0> but was:<1>
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.failNotEquals(Assert.java:834)
at org.junit.Assert.assertEquals(Assert.java:645)
at org.junit.Assert.assertEquals(Assert.java:631)
at 
org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:363)
at 
org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172)
{noformat}

One issue with debugging is that we catch exceptions and print the exception 
message, which is {{null}}:
{noformat}
Standard Error
ERROR: null
ERROR: null
{noformat}

After printing the stack trace in case of failure, we got:
{noformat}
ERROR: java.lang.NullPointerException
java.lang.NullPointerException
at 
kafka.tools.StreamsResetter.maybeResetInputAndSeekToEndIntermediateTopicOffsets(StreamsResetter.java:194)
at kafka.tools.StreamsResetter.run(StreamsResetter.java:121)
at 
org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:362)
at 
org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172)
{noformat}


> ResetIntegrationTest fails
> --
>
> Key: KAFKA-5893
> URL: https://issues.apache.org/jira/browse/KAFKA-5893
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 0.11.0.0, 0.11.0.1
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>
> {noformat}
> java.lang.AssertionError: expected:<0> but was:<1>
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:834)
>   at org.junit.Assert.assertEquals(Assert.java:645)
>   at org.junit.Assert.assertEquals(Assert.java:631)
>   at 
> org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:363)
>   at 
> org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172)
> {noformat}
> One issue with debugging is that we catch exceptions and print the exception 
> message, which is {{null}}:
> {noformat}
> Standard Error
> ERROR: null
> ERROR: null
> {noformat}
> After printing the stack trace in case of failure, we got:
> {noformat}
> ERROR: java.lang.NullPointerException
> java.lang.NullPointerException
>   at 
> kafka.tools.StreamsResetter.maybeResetInputAndSeekToEndIntermediateTopicOffsets(StreamsResetter.java:194)
>   at kafka.tools.StreamsResetter.run(StreamsResetter.java:121)
>   at 
> org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:362)
>   at 
> org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5893) ResetIntegrationTest fails

2017-09-18 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5893:
---
Description: 
{noformat}
java.lang.AssertionError: expected:<0> but was:<1>
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.failNotEquals(Assert.java:834)
at org.junit.Assert.assertEquals(Assert.java:645)
at org.junit.Assert.assertEquals(Assert.java:631)
at 
org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:363)
at 
org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172)
{noformat}

-One issue with debugging is that we catch exceptions and print the exception 
message, which is {{null}}:
{noformat}
Standard Error
ERROR: null
ERROR: null
{noformat}-

After printing the stack trace in case of failure, we got:
{noformat}
ERROR: java.lang.NullPointerException
java.lang.NullPointerException
at 
kafka.tools.StreamsResetter.maybeResetInputAndSeekToEndIntermediateTopicOffsets(StreamsResetter.java:194)
at kafka.tools.StreamsResetter.run(StreamsResetter.java:121)
at 
org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:362)
at 
org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172)
{noformat}

  was:
{noformat}
java.lang.AssertionError: expected:<0> but was:<1>
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.failNotEquals(Assert.java:834)
at org.junit.Assert.assertEquals(Assert.java:645)
at org.junit.Assert.assertEquals(Assert.java:631)
at 
org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:363)
at 
org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172)
{noformat}

One issue with debugging is that we catch exceptions and print the exception 
message, which is {{null}}:
{noformat}
Standard Error
ERROR: null
ERROR: null
{noformat}


> ResetIntegrationTest fails
> --
>
> Key: KAFKA-5893
> URL: https://issues.apache.org/jira/browse/KAFKA-5893
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 0.11.0.0, 0.11.0.1
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>
> {noformat}
> java.lang.AssertionError: expected:<0> but was:<1>
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:834)
>   at org.junit.Assert.assertEquals(Assert.java:645)
>   at org.junit.Assert.assertEquals(Assert.java:631)
>   at 
> org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:363)
>   at 
> org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172)
> {noformat}
> -One issue with debugging is that we catch exceptions and print the 
> exception message, which is {{null}}:
> {noformat}
> Standard Error
> ERROR: null
> ERROR: null
> {noformat}-
> After printing the stack trace in case of failure, we got:
> {noformat}
> ERROR: java.lang.NullPointerException
> java.lang.NullPointerException
>   at 
> kafka.tools.StreamsResetter.maybeResetInputAndSeekToEndIntermediateTopicOffsets(StreamsResetter.java:194)
>   at kafka.tools.StreamsResetter.run(StreamsResetter.java:121)
>   at 
> org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:362)
>   at 
> org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5893) ResetIntegrationTest fails

2017-09-18 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5893:
---
Description: 
{noformat}
java.lang.AssertionError: expected:<0> but was:<1>
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.failNotEquals(Assert.java:834)
at org.junit.Assert.assertEquals(Assert.java:645)
at org.junit.Assert.assertEquals(Assert.java:631)
at 
org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:363)
at 
org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172)
{noformat}

One issue with debugging is that we catch exceptions and print the exception 
message, which is {{null}}:
{noformat}
Standard Error
ERROR: null
ERROR: null
{noformat}

After printing the stack trace in case of failure, we got:
{noformat}
ERROR: java.lang.NullPointerException
java.lang.NullPointerException
at 
kafka.tools.StreamsResetter.maybeResetInputAndSeekToEndIntermediateTopicOffsets(StreamsResetter.java:194)
at kafka.tools.StreamsResetter.run(StreamsResetter.java:121)
at 
org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:362)
at 
org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172)
{noformat}

  was:
{noformat}
java.lang.AssertionError: expected:<0> but was:<1>
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.failNotEquals(Assert.java:834)
at org.junit.Assert.assertEquals(Assert.java:645)
at org.junit.Assert.assertEquals(Assert.java:631)
at 
org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:363)
at 
org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172)
{noformat}

-One issue with debugging is that we catch exceptions and print the exception 
message, which is {{null}}:
{noformat}
Standard Error
ERROR: null
ERROR: null
{noformat}-

After printing the stack trace in case of failure, we got:
{noformat}
ERROR: java.lang.NullPointerException
java.lang.NullPointerException
at 
kafka.tools.StreamsResetter.maybeResetInputAndSeekToEndIntermediateTopicOffsets(StreamsResetter.java:194)
at kafka.tools.StreamsResetter.run(StreamsResetter.java:121)
at 
org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:362)
at 
org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172)
{noformat}


> ResetIntegrationTest fails
> --
>
> Key: KAFKA-5893
> URL: https://issues.apache.org/jira/browse/KAFKA-5893
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 0.11.0.0, 0.11.0.1
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>
> {noformat}
> java.lang.AssertionError: expected:<0> but was:<1>
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:834)
>   at org.junit.Assert.assertEquals(Assert.java:645)
>   at org.junit.Assert.assertEquals(Assert.java:631)
>   at 
> org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:363)
>   at 
> org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172)
> {noformat}
> One issue with debugging is that we catch exceptions and print the exception 
> message, which is {{null}}:
> {noformat}
> Standard Error
> ERROR: null
> ERROR: null
> {noformat}
> After printing the stack trace in case of failure, we got:
> {noformat}
> ERROR: java.lang.NullPointerException
> java.lang.NullPointerException
>   at 
> kafka.tools.StreamsResetter.maybeResetInputAndSeekToEndIntermediateTopicOffsets(StreamsResetter.java:194)
>   at kafka.tools.StreamsResetter.run(StreamsResetter.java:121)
>   at 
> org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:362)
>   at 
> org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5825) Streams not processing when exactly once is set

2017-09-18 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16170462#comment-16170462
 ] 

Matthias J. Sax commented on KAFKA-5825:


[~apurva] Maybe you can provide some input here?

> Streams not processing when exactly once is set
> ---
>
> Key: KAFKA-5825
> URL: https://issues.apache.org/jira/browse/KAFKA-5825
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0
> Environment: EmbeddedKafka running on Windows.  Relevant files 
> attached.
>Reporter: Ryan Worsley
> Attachments: build.sbt, log4j.properties, log-output.txt, Tests.scala
>
>
> +Set-up+
> I'm using [EmbeddedKafka|https://github.com/manub/scalatest-embedded-kafka/] 
> for ScalaTest.
> This spins up a single broker internally on a random port.
> I've written two tests - the first without transactions, the second with.  
> They're nearly identical apart from the config and the transactional 
> semantics.  I've written the transactional version based on Neha's 
> [blog|https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/]
>  which is the closest thing I could find to instructions.
> The tests wait until a single message is processed by the streams topology; 
> they use this message to complete a promise that the test is waiting on.  
> Once the promise completes, the test verifies the value of the promise as 
> being the expected value of the message.
> +Observed behaviour+
> The first test passes fine, the second test times out, the stream processor 
> never seems to read the transactional message.
> +Notes+
> I've attached my build.sbt, log4j.properties and my Tests.scala file in order 
> to make it as easy as possible for someone to re-create.  I'm running on 
> Windows and using Scala as this reflects my workplace.  I completely expect 
> there to be some configuration issue that's causing this, but am unable to 
> proceed at this time.
> Related information: 
> https://github.com/manub/scalatest-embedded-kafka/issues/82



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5825) Streams not processing when exactly once is set

2017-09-18 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16170564#comment-16170564
 ] 

Matthias J. Sax commented on KAFKA-5825:


I just had one more look at your code. Maybe it's just a test setup issue: (1) 
it seems you never close your {{KafkaStreams}} instance, and (2) you use the 
same {{application.id}} for both runs. Thus, if you run both tests in parallel, 
both instances would form a consumer group, and this would mess up the test.
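A sketch of the suggested test hygiene (topology and names are placeholders):

{code:lang=java}
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

class EosTestSetupSketch {
    static void runOnce(final KStreamBuilder builder, final String bootstrapServers) {
        final Properties props = new Properties();
        // unique application.id per test, so parallel tests never form one group
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "eos-test-" + UUID.randomUUID());
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        final KafkaStreams streams = new KafkaStreams(builder, props);
        try {
            streams.start();
            // ... await the promise and assert on the processed message ...
        } finally {
            streams.close(); // always close, even if the assertion fails
        }
    }
}
{code}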

> Streams not processing when exactly once is set
> ---
>
> Key: KAFKA-5825
> URL: https://issues.apache.org/jira/browse/KAFKA-5825
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0
> Environment: EmbeddedKafka running on Windows.  Relevant files 
> attached.
>Reporter: Ryan Worsley
> Attachments: build.sbt, log4j.properties, log-output.txt, Tests.scala
>
>
> +Set-up+
> I'm using [EmbeddedKafka|https://github.com/manub/scalatest-embedded-kafka/] 
> for ScalaTest.
> This spins up a single broker internally on a random port.
> I've written two tests - the first without transactions, the second with.  
> They're nearly identical apart from the config and the transactional 
> semantics.  I've written the transactional version based on Neha's 
> [blog|https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/]
>  which is the closest thing I could find to instructions.
> The tests wait until a single message is processed by the streams topology; 
> they use this message to complete a promise that the test is waiting on.  
> Once the promise completes, the test verifies the value of the promise as 
> being the expected value of the message.
> +Observed behaviour+
> The first test passes fine, the second test times out, the stream processor 
> never seems to read the transactional message.
> +Notes+
> I've attached my build.sbt, log4j.properties and my Tests.scala file in order 
> to make it as easy as possible for someone to re-create.  I'm running on 
> Windows and using Scala as this reflects my workplace.  I completely expect 
> there to be some configuration issue that's causing this, but am unable to 
> proceed at this time.
> Related information: 
> https://github.com/manub/scalatest-embedded-kafka/issues/82



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5937) Improve ProcessorStateManager exception handling

2017-09-19 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5937:
---
Description: 
We identified the following issues:

- the constructor calls {{StateDirectory#directoryForTask()}} and wraps a thrown 
{{ProcessorStateException}} with a {{LockException}} and rethrows -> we should not 
change the exception type here
- {{ProcessorStateManager#flush()}} loops through all stores and calls 
{{StateStore#flush()}}; it stops on the first {{Exception}} -> we should keep 
looping and flush all remaining stores before rethrowing at the end


  was:
We identified the following issues:

- the constructor calls {{StateDirectory#directoryForTask()}} and wraps a thrown 
{{ProcessorStateException}} with a {{LockException}} and rethrows -> we should not 
change the exception type here
- {{ProcessorStateManager#close()}} loops through all stores and calls 
{{StateStore#close()}}; it catches {{Exception}} and moves on to close all 
stores -> rethrows the first exception eventually as-is -> we should wrap it with 
{{ProcessorStateException}} before rethrowing
- {{ProcessorStateManager#flush()}} loops through all stores and calls 
{{StateStore#flush()}}; it stops on the first {{Exception}} -> we should keep 
looping and flush all remaining stores before rethrowing at the end



> Improve ProcessorStateManager exception handling
> 
>
> Key: KAFKA-5937
> URL: https://issues.apache.org/jira/browse/KAFKA-5937
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
> Fix For: 1.0.0
>
>
> We identified the following issues:
> - the constructor calls {{StateDirectory#directoryForTask()}} and wraps a 
> thrown {{ProcessorStateException}} with a {{LockException}} and rethrows -> we 
> should not change the exception type here
> - {{ProcessorStateManager#flush()}} loops through all stores and calls 
> {{StateStore#flush()}}; it stops on the first {{Exception}} -> we should keep 
> looping and flush all remaining stores before rethrowing at the end
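
A minimal sketch of the intended {{flush()}} behavior (simplified; it assumes 
the {{stores}} map and {{log}} field of the real class and is not the actual 
{{ProcessorStateManager}} code):

{code:java}
// Flush every store; remember the first failure, and rethrow it only
// after all remaining stores have been flushed.
ProcessorStateException firstException = null;
for (final StateStore store : stores.values()) {
    try {
        store.flush();
    } catch (final Exception e) {
        if (firstException == null) {
            firstException = new ProcessorStateException(
                "Failed to flush state store " + store.name(), e);
        }
        log.error("Failed to flush state store {}", store.name(), e);
    }
}
if (firstException != null) {
    throw firstException;
}
{code}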



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5958) User StoreListener not available for global stores

2017-09-21 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-5958:
--

 Summary: User StoreListener not available for global stores
 Key: KAFKA-5958
 URL: https://issues.apache.org/jira/browse/KAFKA-5958
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 1.0.0
Reporter: Matthias J. Sax


In 1.0 we added the ability to register a state restore listener, such that 
users can monitor state restore progress via this callback.

However, we do not use this listener for global stores. Strictly speaking, 
global stores are never recovered; however, at startup they are bootstrapped. 
We might want to consider using the same listener callback to allow users to 
monitor the bootstrapping of global stores, too.
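
For reference, a minimal sketch of how users register the listener today; the 
suggestion is to fire the same callbacks while global stores bootstrap 
({{topology}} and {{props}} are assumed to be set up elsewhere):

{code:java}
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.StateRestoreListener;

// ...

final KafkaStreams streams = new KafkaStreams(topology, props);
streams.setGlobalStateRestoreListener(new StateRestoreListener() {
    @Override
    public void onRestoreStart(final TopicPartition partition, final String storeName,
                               final long startingOffset, final long endingOffset) {
        System.out.println("Restore of " + storeName + " started");
    }

    @Override
    public void onBatchRestored(final TopicPartition partition, final String storeName,
                                final long batchEndOffset, final long numRestored) {
        // per-batch progress could be reported here
    }

    @Override
    public void onRestoreEnd(final TopicPartition partition, final String storeName,
                             final long totalRestored) {
        System.out.println("Restore of " + storeName + " finished");
    }
});
streams.start();
{code}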



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5949) User Callback Exceptions need to be handled properly

2017-09-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5949:
---
Description: 
In Streams, we allow users to register multiple callbacks. We need to handle 
exceptions thrown by those callbacks gracefully, by catching and wrapping them 
with a {{StreamsException}}:

- TimestampExtractor
- DeserializationHandler
- StateRestoreListener

  was:
In Streams, we allow users to register multiple callbacks. We need to handle 
exceptions thrown by those callbacks gracefully, by catching and wrapping them 
with a {{StreamsException}}:

- TimestampExtractor
- DeserailizationHandler
- StateRestoreListener


> User Callback Exceptions need to be handled properly
> 
>
> Key: KAFKA-5949
> URL: https://issues.apache.org/jira/browse/KAFKA-5949
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
> Fix For: 1.0.0
>
>
> In Streams, we allow users to register multiple callbacks. We need to handle 
> exceptions thrown by those callbacks gracefully, by catching and wrapping 
> them with a {{StreamsException}}:
> - TimestampExtractor
> - DeserializationHandler
> - StateRestoreListener
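
A sketch of the intended guard, using {{TimestampExtractor}} as an example 
(simplified and illustrative, not the actual Streams internals; {{record}} and 
{{previousTimestamp}} are assumed in scope):

{code:java}
final long timestamp;
try {
    timestamp = timestampExtractor.extract(record, previousTimestamp);
} catch (final Exception userCodeError) {
    // surface user-callback failures as a StreamsException
    throw new StreamsException(
        "Fatal user code error in TimestampExtractor callback", userCodeError);
}
{code}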



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5961) NullPointerException when consumer restore read messages with null key.

2017-09-22 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177246#comment-16177246
 ] 

Matthias J. Sax commented on KAFKA-5961:


[~agomez] Thanks for reporting this. Looking into this, the stack trace you 
shared is for {{0.10.2.1}}. Can you also share the stack trace for {{0.11.0.0}}? 
It seems to be fixed there already -- but maybe we are looking at the wrong code 
(as we don't have the corresponding stack trace) (cf. 
https://github.com/apache/kafka/blob/0.11.0.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java#L215)
 -- thanks to [~bbejeck] for pointing this out!
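
For reference, the {{0.11.0.0}} restore path guards against null keys roughly 
like this (simplified from {{StoreChangelogReader}}, not the exact code):

{code:java}
for (final ConsumerRecord<byte[], byte[]> record : records) {
    if (record.key() != null) {
        restoreRecords.add(KeyValue.pair(record.key(), record.value()));
    }
    // records with a null key are skipped instead of reaching RocksDB#put
}
{code}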

> NullPointerException when consumer restore read messages with null key.
> ---
>
> Key: KAFKA-5961
> URL: https://issues.apache.org/jira/browse/KAFKA-5961
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1, 0.11.0.0
>Reporter: Andres Gomez Ferrer
>
> If you have a Kafka Streams application that uses:
> {code:java}
> builder.table("topicA")
> {code}
> When the application is running, if you send a message with a null key, it 
> works fine. Later, if you restart the application, when the restore consumer 
> starts to read topicA from the beginning, it crashes because it doesn't 
> filter out the null key.
> I know that it isn't normal to send a null key to a topic that backs a table, 
> but it can happen sometimes, and I think Kafka Streams could protect itself.
> This is the stack trace:
> {code}
> ConsumerCoordinator [ERROR] User provided listener 
> org.apache.kafka.streams.processor.internals.StreamThread$1 for group 
> my-cep-app_enricher failed on partition assignment
> java.lang.NullPointerException
>   at org.rocksdb.RocksDB.put(RocksDB.java:488)
>   at 
> org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:254)
>   at 
> org.apache.kafka.streams.state.internals.RocksDBStore.access$000(RocksDBStore.java:67)
>   at 
> org.apache.kafka.streams.state.internals.RocksDBStore$1.restore(RocksDBStore.java:164)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:242)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)
>   at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
>   at 
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:160)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)
>   at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:63)
>   at 
> org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5966) Support ByteBuffer serialization in Kafka Streams

2017-09-23 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5966:
---
Component/s: streams

> Support ByteBuffer serialization in Kafka Streams
> -
>
> Key: KAFKA-5966
> URL: https://issues.apache.org/jira/browse/KAFKA-5966
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Xavier Léauté
>
> Currently Kafka Streams only supports serialization using byte arrays. This 
> means we generate a lot of garbage and spend unnecessary time copying bytes, 
> especially when working with windowed state stores that rely on composite 
> keys. In many places in the code we have to extract parts of the composite 
> key to deserialize either the timestamp or the message key from the state 
> store key (e.g. the methods in WindowStoreUtils).
> Having support for serdes into/from ByteBuffers would allow us to reuse the 
> underlying byte arrays and just pass around slices of the underlying buffers 
> to avoid the unnecessary copying.
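
A sketch of what such a serde could look like; the interfaces below are 
hypothetical, not an existing Kafka API:

{code:java}
import java.nio.ByteBuffer;

// Hypothetical: serialize straight into a caller-provided buffer and
// deserialize from a slice, so no intermediate byte[] is allocated.
interface ByteBufferSerializer<T> {
    void serialize(String topic, T data, ByteBuffer buffer);
}

interface ByteBufferDeserializer<T> {
    T deserialize(String topic, ByteBuffer buffer);
}
{code}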



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-5489) Failing test: InternalTopicIntegrationTest.shouldCompactTopicsForStateChangelogs

2017-09-20 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-5489:
--

Assignee: Damian Guy  (was: Matthias J. Sax)

> Failing test: 
> InternalTopicIntegrationTest.shouldCompactTopicsForStateChangelogs
> 
>
> Key: KAFKA-5489
> URL: https://issues.apache.org/jira/browse/KAFKA-5489
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Damian Guy
>  Labels: test
>
> Test failed with
> {noformat}
> java.lang.AssertionError: expected: but was:
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:834)
>   at org.junit.Assert.assertEquals(Assert.java:118)
>   at org.junit.Assert.assertEquals(Assert.java:144)
>   at 
> org.apache.kafka.streams.integration.InternalTopicIntegrationTest.shouldCompactTopicsForStateChangelogs(InternalTopicIntegrationTest.java:173)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5225) StreamsResetter doesn't allow custom Consumer properties

2017-09-20 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5225:
---
Labels: kip  (was: needs-kip)

> StreamsResetter doesn't allow custom Consumer properties
> 
>
> Key: KAFKA-5225
> URL: https://issues.apache.org/jira/browse/KAFKA-5225
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, tools
>Affects Versions: 0.10.2.1
>Reporter: Dustin Cote
>Assignee: Bharat Viswanadham
>  Labels: kip
>
> The StreamsResetter doesn't let the user pass in any configuration for the 
> embedded consumer. This is a problem in secured environments because you 
> can't configure the embedded consumer to talk to the cluster. The tool should 
> take an approach similar to {{kafka.admin.ConsumerGroupCommand}}, which allows 
> a config file to be passed on the command line for such operations.
> cc [~mjsax]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5156) Options for handling exceptions in streams

2017-09-17 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169481#comment-16169481
 ] 

Matthias J. Sax commented on KAFKA-5156:


A first overview of current exception handling can be found in the wiki: 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Architecture#KafkaStreamsArchitecture-ExceptionHandling

> Options for handling exceptions in streams
> --
>
> Key: KAFKA-5156
> URL: https://issues.apache.org/jira/browse/KAFKA-5156
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Eno Thereska
>Assignee: Matthias J. Sax
>  Labels: user-experience
> Fix For: 1.0.0
>
>
> This is a task around options for handling exceptions in streams. It focuses 
> on options for dealing with corrupt data (keep going, stop streams, log, 
> retry, etc.).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5156) Options for handling exceptions in streams

2017-09-17 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169520#comment-16169520
 ] 

Matthias J. Sax commented on KAFKA-5156:


Thx. The doc is still WIP :)

> Options for handling exceptions in streams
> --
>
> Key: KAFKA-5156
> URL: https://issues.apache.org/jira/browse/KAFKA-5156
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Eno Thereska
>Assignee: Matthias J. Sax
>  Labels: user-experience
> Fix For: 1.0.0
>
>
> This is a task around options for handling exceptions in streams. It focuses 
> on options for dealing with corrupt data (keep going, stop streams, log, 
> retry, etc.).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5876) IQ should throw different exceptions for different errors

2017-09-14 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5876:
---
Labels: needs-kip newbie++  (was: needs-kip)

> IQ should throw different exceptions for different errors
> -
>
> Key: KAFKA-5876
> URL: https://issues.apache.org/jira/browse/KAFKA-5876
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>  Labels: needs-kip, newbie++
> Fix For: 1.0.0
>
>
> Currently, IQ only throws {{InvalidStateStoreException}} for all errors that 
> occur. However, we have different types of errors and should throw different 
> exceptions for those types.
> For example, if a store was migrated it must be rediscovered, while if a 
> store cannot be queried yet because it is still being re-created after a 
> rebalance, the user just needs to wait until the store's recreation is 
> finished.
> There might be other examples, too.
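
A sketch of the kind of hierarchy this suggests (class names are hypothetical 
and would be subject to the KIP):

{code:java}
import org.apache.kafka.streams.errors.InvalidStateStoreException;

// Store was migrated to another instance: the caller must rediscover it.
class StateStoreMigratedException extends InvalidStateStoreException {
    StateStoreMigratedException(final String message) {
        super(message);
    }
}

// Store is still being re-created after a rebalance: the caller should retry.
class StateStoreNotAvailableException extends InvalidStateStoreException {
    StateStoreNotAvailableException(final String message) {
        super(message);
    }
}
{code}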



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-5660) Don't throw TopologyBuilderException during runtime

2017-09-14 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-5660:
--

Assignee: Matthias J. Sax

> Don't throw TopologyBuilderException during runtime
> ---
>
> Key: KAFKA-5660
> URL: https://issues.apache.org/jira/browse/KAFKA-5660
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>
> {{TopologyBuilderException}} is a pre-runtime exception that should only be 
> thrown before {{KafkaStreams#start()}} is called.
> However, we do throw {{TopologyBuilderException}} within
> - {{SourceNodeFactory#getTopics}}
> - {{ProcessorContextImpl#getStateStore}}
> (and maybe somewhere else: we should double-check whether there are other 
> places in the code like those).
> We should replace those exceptions with either {{StreamsException}} or with a 
> new exception type.
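
A sketch of the replacement at one of those call sites (simplified and 
illustrative; {{topics}} and {{sourceName}} stand in for the real fields):

{code:java}
// inside SourceNodeFactory#getTopics (simplified):
if (topics.isEmpty()) {
    // old: throw new TopologyBuilderException(...) -- wrong for a runtime error
    throw new StreamsException(
        "Topics for source node " + sourceName + " could not be resolved");
}
{code}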



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5660) Don't throw TopologyBuilderException during runtime

2017-09-14 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5660:
---
Issue Type: Sub-task  (was: Bug)
Parent: KAFKA-5156

> Don't throw TopologyBuilderException during runtime
> ---
>
> Key: KAFKA-5660
> URL: https://issues.apache.org/jira/browse/KAFKA-5660
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>
> {{TopologyBuilderException}} is a pre-runtime exception that should only be 
> thrown before {{KafkaStreams#start()}} is called.
> However, we do throw {{TopologyBuilderException}} within
> - {{SourceNodeFactory#getTopics}}
> - {{ProcessorContextImpl#getStateStore}}
> (and maybe somewhere else: we should double-check whether there are other 
> places in the code like those).
> We should replace those exceptions with either {{StreamsException}} or with a 
> new exception type.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-4932) Add UUID Serde

2017-09-14 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4932:
---
Labels: newbie  (was: )

> Add UUID Serde
> --
>
> Key: KAFKA-4932
> URL: https://issues.apache.org/jira/browse/KAFKA-4932
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, streams
>Reporter: Jeff Klukas
>Priority: Minor
>  Labels: needs-kip, newbie
>
> I propose adding serializers and deserializers for the java.util.UUID class.
> I have many use cases where I want to set the key of a Kafka message to be a 
> UUID. Currently, I need to turn UUIDs into strings or byte arrays and use 
> their associated Serdes, but it would be more convenient to serialize and 
> deserialize UUIDs directly.
> I'd propose that the serializer and deserializer use the 36-byte string 
> representation, calling UUID.toString and UUID.fromString. We would also wrap 
> these in a Serde and modify the streams Serdes class to include this in the 
> list of supported types.
> Optionally, we could have the deserializer support a 16-byte representation 
> and check the size of the input byte array to determine whether it's a binary 
> or a string representation of the UUID. It's not well defined whether the 
> most significant or the least significant bits go first, so this deserializer 
> would have to support only one or the other.
> Similarly, if the deserializer supported a 16-byte representation, there 
> could be two variants of the serializer, a UUIDStringSerializer and a 
> UUIDBytesSerializer.
> I would be willing to write this PR, but am looking for feedback about 
> whether there are significant concerns here around the ambiguity of what the 
> byte representation of a UUID should be, or if there's a desire to keep the 
> list of built-in Serdes minimal such that a PR would be unlikely to be 
> accepted.
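
A minimal sketch of the string-based variant, assuming the current 
{{Serializer}} interface (where {{configure()}} and {{close()}} are not 
defaulted); the deserializer would mirror this with {{UUID.fromString()}}:

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import org.apache.kafka.common.serialization.Serializer;

public class UUIDSerializer implements Serializer<UUID> {
    @Override
    public void configure(final Map<String, ?> configs, final boolean isKey) { }

    @Override
    public byte[] serialize(final String topic, final UUID data) {
        // keep null as null so tombstone records still work
        return data == null ? null : data.toString().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void close() { }
}
{code}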



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-4932) Add UUID Serde

2017-09-14 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4932:
---
Labels: needs-kip newbie  (was: newbie)

> Add UUID Serde
> --
>
> Key: KAFKA-4932
> URL: https://issues.apache.org/jira/browse/KAFKA-4932
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, streams
>Reporter: Jeff Klukas
>Priority: Minor
>  Labels: needs-kip, newbie
>
> I propose adding serializers and deserializers for the java.util.UUID class.
> I have many use cases where I want to set the key of a Kafka message to be a 
> UUID. Currently, I need to turn UUIDs into strings or byte arrays and use 
> their associated Serdes, but it would be more convenient to serialize and 
> deserialize UUIDs directly.
> I'd propose that the serializer and deserializer use the 36-byte string 
> representation, calling UUID.toString and UUID.fromString. We would also wrap 
> these in a Serde and modify the streams Serdes class to include this in the 
> list of supported types.
> Optionally, we could have the deserializer support a 16-byte representation 
> and check the size of the input byte array to determine whether it's a binary 
> or a string representation of the UUID. It's not well defined whether the 
> most significant or the least significant bits go first, so this deserializer 
> would have to support only one or the other.
> Similarly, if the deserializer supported a 16-byte representation, there 
> could be two variants of the serializer, a UUIDStringSerializer and a 
> UUIDBytesSerializer.
> I would be willing to write this PR, but am looking for feedback about 
> whether there are significant concerns here around the ambiguity of what the 
> byte representation of a UUID should be, or if there's a desire to keep the 
> list of built-in Serdes minimal such that a PR would be unlikely to be 
> accepted.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5765) Move merge() from StreamsBuilder to KStream

2017-09-14 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5765:
---
Labels: needs-kip newbie  (was: needs-kip)

> Move merge() from StreamsBuilder to KStream
> ---
>
> Key: KAFKA-5765
> URL: https://issues.apache.org/jira/browse/KAFKA-5765
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>  Labels: needs-kip, newbie
> Fix For: 1.0.0
>
>
> Merging multiple {{KStream}}s is done via {{StreamsBuilder#merge()}} (formerly 
> {{KStreamBuilder#merge()}}). This is quite unnatural and should be done via 
> {{KStream#merge()}}.
> As {{StreamsBuilder}} has not been released yet, this is not a 
> backward-incompatible change (and {{KStreamBuilder}} is already deprecated). 
> We still need a KIP as we add a new method to the public {{KStream}} API.
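
A sketch of how the proposed API would read; the {{KStream#merge()}} call is 
the new method the KIP would add:

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> left = builder.stream("topic-a");
final KStream<String, String> right = builder.stream("topic-b");

// proposed: merge on the stream itself instead of on the builder
final KStream<String, String> merged = left.merge(right);
merged.to("merged-topic");
{code}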



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-5158) Options for handling exceptions during processing

2017-09-14 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-5158:
--

Assignee: Matthias J. Sax

> Options for handling exceptions during processing
> -
>
> Key: KAFKA-5158
> URL: https://issues.apache.org/jira/browse/KAFKA-5158
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Eno Thereska
>Assignee: Matthias J. Sax
> Fix For: 1.0.0
>
>
> Imagine the app-level processing of a (non-corrupted) record fails (e.g., the 
> user attempted an RPC to an external system, and this call failed). How can 
> you process such failed records in a scalable way? For example, imagine you 
> need to implement a retry policy such as "retry with exponential backoff". 
> Here, you have the problem that (1) you can't really pause processing a 
> single record because this would pause the processing of the full stream 
> (bottleneck!), and (2) there is no straightforward way to "sort" failed 
> records based on their "next retry time".



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5158) Options for handling exceptions during processing

2017-09-14 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5158:
---
Issue Type: Sub-task  (was: New Feature)
Parent: KAFKA-5156

> Options for handling exceptions during processing
> -
>
> Key: KAFKA-5158
> URL: https://issues.apache.org/jira/browse/KAFKA-5158
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Eno Thereska
>Assignee: Matthias J. Sax
> Fix For: 1.0.0
>
>
> Imagine the app-level processing of a (non-corrupted) record fails (e.g., the 
> user attempted an RPC to an external system, and this call failed). How can 
> you process such failed records in a scalable way? For example, imagine you 
> need to implement a retry policy such as "retry with exponential backoff". 
> Here, you have the problem that (1) you can't really pause processing a 
> single record because this would pause the processing of the full stream 
> (bottleneck!), and (2) there is no straightforward way to "sort" failed 
> records based on their "next retry time".



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5765) Move merge() from StreamsBuilder to KStream

2017-09-14 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5765:
---
Description: 
Merging multiple {{KStream}}s is done via {{StreamsBuilder#merge()}} (formerly 
{{KStreamBuilder#merge()}}). This is quite unnatural and should be done via 
{{KStream#merge()}}.

We need a KIP as we add a new method to the public {{KStream}} API and deprecate 
the old {{merge()}} method.

  was:
Merging multiple {{KStream}}s is done via {{StreamsBuilder#merge()}} (formerly 
{{KStreamBuilder#merge()}}). This is quite unnatural and should be done via 
{{KStream#merge()}}.

As {{StreamsBuilder}} has not been released yet, this is not a 
backward-incompatible change (and {{KStreamBuilder}} is already deprecated). We 
still need a KIP as we add a new method to the public {{KStream}} API.


> Move merge() from StreamsBuilder to KStream
> ---
>
> Key: KAFKA-5765
> URL: https://issues.apache.org/jira/browse/KAFKA-5765
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 1.0.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>  Labels: needs-kip, newbie
> Fix For: 1.1.0
>
>
> Merging multiple {{KStream}}s is done via {{StreamsBuilder#merge()}} (formerly 
> {{KStreamBuilder#merge()}}). This is quite unnatural and should be done via 
> {{KStream#merge()}}.
> We need a KIP as we add a new method to the public {{KStream}} API and 
> deprecate the old {{merge()}} method.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

