[jira] [Created] (KAFKA-5510) Streams should commit all offsets regularly

2017-06-24 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-5510:
--

 Summary: Streams should commit all offsets regularly
 Key: KAFKA-5510
 URL: https://issues.apache.org/jira/browse/KAFKA-5510
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Matthias J. Sax


Currently, Streams commits offsets only for partitions it actually processed records 
for. Thus, if a partition does not receive any data for longer than 
{{offsets.retention.minutes}} (default 1 day), the latest committed offset gets 
lost. On failure or restart, {{auto.offset.reset}} kicks in, potentially resulting 
in reprocessing old data.

Thus, Streams should commit _all_ offsets on a regular basis. Not sure what the 
overhead of a commit is -- if it's too expensive to commit all offsets on every 
regular commit, we could also have a second config that specifies a 
"commit.all.interval".

This relates to https://issues.apache.org/jira/browse/KAFKA-3806, so we should 
sync to get a solid overall solution.




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Reopened] (KAFKA-4262) Intermittent unit test failure ReassignPartitionsClusterTest.shouldExecuteThrottledReassignment

2017-06-24 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-4262:


This happened again 
(https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/5618/testReport/junit/kafka.admin/ReassignPartitionsClusterTest/shouldExecuteThrottledReassignment/)
 with:

{noformat}
java.lang.AssertionError: Expected replication to be < 1 but was 10076
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.assertTrue(Assert.java:41)
at 
kafka.admin.ReassignPartitionsClusterTest.shouldExecuteThrottledReassignment(ReassignPartitionsClusterTest.scala:183)
{noformat}

> Intermittent unit test failure 
> ReassignPartitionsClusterTest.shouldExecuteThrottledReassignment
> ---
>
> Key: KAFKA-4262
> URL: https://issues.apache.org/jira/browse/KAFKA-4262
> Project: Kafka
>  Issue Type: Bug
>  Components: unit tests
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
> Fix For: 0.10.1.0
>
>
> Unit test seen in PR build where PR did not contain any code changes: 
> https://builds.apache.org/job/kafka-trunk-git-pr-jdk7/6086/testReport/junit/kafka.admin/ReassignPartitionsClusterTest/shouldExecuteThrottledReassignment/
> {quote}
> java.lang.AssertionError: Expected replication to be > 4500.0 but was 210
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at 
> kafka.admin.ReassignPartitionsClusterTest.shouldExecuteThrottledReassignment(ReassignPartitionsClusterTest.scala:141)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> ...
> {quote}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [DISCUSS] KIP-147: Add missing type parameters to StateStoreSupplier factories and KGroupedStream/Table methods

2017-06-24 Thread Michal Borowiecki
I think the discussion on Streams DSL refactoring will render this KIP 
obsolete.


I'll leave it as under discussion until something is agreed and then 
move it to discarded.


Cheers,

Michał


On 03/06/17 10:02, Michal Borowiecki wrote:


I agree maintaining backwards-compatibility here adds a lot of overhead.

I haven't so far found a way to reconcile these elegantly.

Whichever way we go it's better to take the pain sooner rather than 
later. Kafka 0.11.0.0 (through KAFKA-5045/KIP-114) noticeably increased 
the surface affected by the lack of fully type-parametrised suppliers.


Cheers,

Michał


On 03/06/17 09:43, Damian Guy wrote:
Hmm, I guess this won't work due to adding the additional type parameter to 
the StateStoreSupplier params on reduce, count, aggregate etc.


On Sat, 3 Jun 2017 at 09:06 Damian Guy wrote:


Hi Michal,

Thanks for the KIP - is there a way we can do this without having
to introduce the new Typed.. Interfaces, overloaded methods etc?
Is it possible that we just need to provide a couple of new
methods on PersistentKeyValueFactory for windowed and
sessionWindowed to return interfaces like you've introduced in
TypedStores?
I admit I haven't looked in much detail at whether that would work.

My concern is that this is duplicating a bunch of code and
increasing the surface area for what is minimal benefit. It is
one of those cases where I'd love to not have to maintain
backward compatibility.

Thanks,
Damian

On Fri, 2 Jun 2017 at 08:20 Michal Borowiecki wrote:

Thanks Matthias,

I appreciate people are busy now preparing the 0.11 release.

One thing I would also appreciate input on is perhaps a
better name for the new TypedStores class; I just picked it
quickly but don't really like it.

Perhaps StateStores would make for a better name?

Cheers,
Michal


On 02/06/17 07:18, Matthias J. Sax wrote:

Thanks for the update Michal.

I did skip over the PR. Looks good to me, as far as I can tell. Maybe
Damian, Xavier, or Ismael can comment on this. Would be good to get
confirmation that the change is backward compatible.


-Matthias


On 5/27/17 11:11 AM, Michal Borowiecki wrote:

Hi all,

I've updated the KIP to reflect the proposed backwards-compatible 
approach:


https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=69408481


Given the vast area of APIs affected, I think the PR is easier to read
than the code excerpts in the KIP itself:
https://github.com/apache/kafka/pull/2992/files

Thanks,
Michał

On 07/05/17 10:16, Eno Thereska wrote:

I like this KIP in general and I agree it’s needed. Perhaps Damian can 
comment on the session store issue?

Thanks
Eno

On May 6, 2017, at 10:32 PM, Michal Borowiecki wrote:

Hi Matthias,

Agreed. I tried your proposal and indeed it would work.

However, I think to maintain full backward compatibility we would also 
need to deprecate Stores.create() and leave it unchanged, while providing a new 
method that returns the more strongly typed factories.

(This is because PersistentWindowFactory and PersistentSessionFactory cannot extend the existing 
PersistentKeyValueFactory interface, since their build() methods will be returning 
TypedStateStoreSupplier<WindowStore<...>> and TypedStateStoreSupplier<SessionStore<...>> 
respectively, which are NOT subclasses of TypedStateStoreSupplier<KeyValueStore<...>>. I do not see 
another way around it. Admittedly, my type covariance skills are rudimentary. Does anyone see a better way around 
this?)
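
For what it's worth, the underlying problem is just that Java generics are invariant; here is a tiny, self-contained illustration without any Kafka types (names are purely for demonstration):

import java.util.ArrayList;
import java.util.List;

// List<Integer> is not a subtype of List<Number>, just like
// TypedStateStoreSupplier<WindowStore<...>> is not a subtype of
// TypedStateStoreSupplier<KeyValueStore<...>>, even though the store types are related.
public class InvarianceDemo {
    public static void main(final String[] args) {
        final List<Integer> ints = new ArrayList<>();
        // List<Number> nums = ints;              // does not compile: generics are invariant
        final List<? extends Number> nums = ints; // compiles: use-site variance via a wildcard
        System.out.println(nums.isEmpty());       // prints "true"
    }
}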

Since create() takes only the store name as argument, and I don't see 
what we could overload it with, the new method would need to have a different 
name.

Alternatively, since create(String) is the only method in Stores, we 
could deprecate the entire class and provide a new one. That would be my 
preference. Any ideas what to call it?



All comments and suggestions appreciated.



Cheers,

Michał


On 04/05/17 21:48, Matthias J. Sax wrote:

I had a quick look into this.

With regard to backward compatibility, I think it would be required to
introduce a new type `TypedStateStoreSupplier` (that extends
`StateStoreSupplier`) and to overload all methods that take a
`StateStoreSupplier` so that they accept the new type instead of the current 
one.
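
For illustration only, a minimal sketch of that idea -- the name and shape are placeholders, assuming the current non-generic StateStoreSupplier with a StateStore get():

import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreSupplier;

// Placeholder sketch, not the actual KIP/PR code. The typed supplier stays usable
// wherever a plain StateStoreSupplier is accepted, while narrowing get() for callers
// that know the concrete store type.
public interface TypedStateStoreSupplier<T extends StateStore> extends StateStoreSupplier {
    @Override
    T get(); // covariant override of StateStoreSupplier#get()
}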

This would allow `.build` to return a `TypedStateStoreSupplier` and
thus, would not break any code. As 

Re: Re: [DISCUSS] KIP-165: Extend Interactive Queries for return latest update timestamp per key

2017-06-24 Thread Michal Borowiecki

Hi Jeyhun,

Could the proposed KeyContext.keyTs() be made more descriptive?

e.g. lastUpdated() or similar? So that users don't have to read the docs 
to know it isn't, for instance, the creation timestamp.
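
For illustration, a rough sketch of how the more descriptive accessor could read (interface and method names here are placeholders; the KIP defines the actual API):

// Placeholder sketch only -- names are illustrative, not the KIP's API.
public interface KeyContext<V> {
    V value();          // the latest value for the key
    long lastUpdated(); // timestamp of the last update for this key,
                        // rather than a generic keyTs()
}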


Cheers,
Michał

On 04/06/17 01:24, Jeyhun Karimov wrote:

Hi Matthias,

Thanks for comments.

  - why do you only consider get() and not range() and all() ?


The corresponding JIRA concentrates on single-key lookups. Moreover, I
could not find a use case for range queries that return records with
timestamps. However, theoretically we can include range() and all() as well.

  - we cannot have a second get() (this would be ambiguous) but need

another name like getWithTs() (or something better)

  - what use case do you have in mind for getKeyTs() ? Would a single new

method returning KeyContext not be sufficient?


Thanks for correction, this is my bad.

  - for backward compatibility, we will also need a new interface and

cannot just extend the existing one


  I will correct the KIP accordingly.

Thanks,
Jeyhun

On Fri, Jun 2, 2017 at 7:36 AM, Matthias J. Sax wrote:


Thanks for the KIP Jeyhun.

Some comments:
  - why do you only consider get() and not range() and all() ?
  - we cannot have a second get() (this would be ambiguous) but need
another name like getWithTs() (or something better)
  - what use case do you have in mind for getKeyTs() ? Would a single new
method returning KeyContext not be sufficient?
  - for backward compatibility, we will also need a new interface and
cannot just extend the existing one



-Matthias

On 5/29/17 4:55 PM, Jeyhun Karimov wrote:

Dear community,

I want to share KIP-165 [1] based on issue KAFKA-4304 [2].
I would like to get your comments.

[1]
https://cwiki.apache.org/confluence/display/KAFKA/KIP-165%3A+Extend+Interactive+Queries+for+return+latest+update+timestamp+per+key

[2] https://issues.apache.org/jira/browse/KAFKA-4304

Cheers,
Jeyhun







Build failed in Jenkins: kafka-trunk-jdk8 #1759

2017-06-24 Thread Apache Jenkins Server
See 


Changes:

[ismael] KAFKA-5506; Fix NPE in OffsetFetchRequest.toString and logging

--
[...truncated 4.26 MB...]
kafka.integration.PrimitiveApiTest > testMultiProduce PASSED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch STARTED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch PASSED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize 
STARTED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize PASSED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests STARTED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests PASSED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch STARTED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch PASSED

kafka.integration.PrimitiveApiTest > 
testDefaultEncoderProducerAndFetchWithCompression STARTED

kafka.integration.PrimitiveApiTest > 
testDefaultEncoderProducerAndFetchWithCompression PASSED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic STARTED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic PASSED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest STARTED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest PASSED

kafka.integration.MetricsDuringTopicCreationDeletionTest > 
testMetricsDuringTopicCreateDelete STARTED

kafka.integration.MetricsDuringTopicCreationDeletionTest > 
testMetricsDuringTopicCreateDelete PASSED

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooLow 
STARTED

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooLow PASSED

kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooLow 
STARTED

kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooLow 
PASSED

kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooHigh 
STARTED

kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooHigh 
PASSED

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooHigh 
STARTED

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooHigh 
PASSED

kafka.integration.TopicMetadataTest > testIsrAfterBrokerShutDownAndJoinsBack 
STARTED

kafka.integration.TopicMetadataTest > testIsrAfterBrokerShutDownAndJoinsBack 
PASSED

kafka.integration.TopicMetadataTest > testAutoCreateTopicWithCollision STARTED

kafka.integration.TopicMetadataTest > testAutoCreateTopicWithCollision PASSED

kafka.integration.TopicMetadataTest > testAliveBrokerListWithNoTopics STARTED

kafka.integration.TopicMetadataTest > testAliveBrokerListWithNoTopics PASSED

kafka.integration.TopicMetadataTest > testAutoCreateTopic STARTED

kafka.integration.TopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.TopicMetadataTest > testGetAllTopicMetadata STARTED

kafka.integration.TopicMetadataTest > testGetAllTopicMetadata PASSED

kafka.integration.TopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup STARTED

kafka.integration.TopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup PASSED

kafka.integration.TopicMetadataTest > testBasicTopicMetadata STARTED

kafka.integration.TopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.TopicMetadataTest > testAutoCreateTopicWithInvalidReplication 
STARTED

kafka.integration.TopicMetadataTest > testAutoCreateTopicWithInvalidReplication 
PASSED

kafka.integration.TopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown STARTED

kafka.integration.TopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown PASSED

kafka.integration.FetcherTest > testFetcher STARTED

kafka.integration.FetcherTest > testFetcher PASSED

kafka.integration.UncleanLeaderElectionTest > testUncleanLeaderElectionEnabled 
STARTED

kafka.integration.UncleanLeaderElectionTest > testUncleanLeaderElectionEnabled 
PASSED

kafka.integration.UncleanLeaderElectionTest > 
testCleanLeaderElectionDisabledByTopicOverride STARTED

kafka.integration.UncleanLeaderElectionTest > 
testCleanLeaderElectionDisabledByTopicOverride SKIPPED

kafka.integration.UncleanLeaderElectionTest > testUncleanLeaderElectionDisabled 
STARTED

kafka.integration.UncleanLeaderElectionTest > testUncleanLeaderElectionDisabled 
SKIPPED

kafka.integration.UncleanLeaderElectionTest > 
testUncleanLeaderElectionInvalidTopicOverride STARTED

kafka.integration.UncleanLeaderElectionTest > 
testUncleanLeaderElectionInvalidTopicOverride PASSED

kafka.integration.UncleanLeaderElectionTest > 
testUncleanLeaderElectionEnabledByTopicOverride STARTED

kafka.integration.UncleanLeaderElectionTest > 
testUncleanLeaderElectionEnabledByTopicOverride PASSED

kafka.integration.MinIsrConfigTest > testDefaultKafkaConfig STARTED

kafka.integration.MinIsrConfigTest > testDefaultKafkaConfig PASSED


[GitHub] kafka pull request #3427: KAFKA-5501 [WIP]: use async zookeeper apis everywh...

2017-06-24 Thread onurkaraman
GitHub user onurkaraman opened a pull request:

https://github.com/apache/kafka/pull/3427

KAFKA-5501 [WIP]: use async zookeeper apis everywhere

Synchronous zookeeper writes means that we wait an entire round trip before 
doing the next write. With respect to the controller, these synchronous writes 
are happening at a per-partition granularity in several places, so 
partition-heavy clusters suffer from the controller doing many sequential round 
trips to zookeeper.
- PartitionStateMachine.electLeaderForPartition updates leaderAndIsr in 
zookeeper on transition to OnlinePartition. This gets triggered per-partition 
sequentially with synchronous writes during controlled shutdown of the shutting 
down broker's replicas for which it is the leader.
- ReplicaStateMachine updates leaderAndIsr in zookeeper on transition to 
OfflineReplica when calling KafkaController.removeReplicaFromIsr. This gets 
triggered per-partition sequentially with synchronous writes for failed or 
controlled shutdown brokers.
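
For illustration, a small standalone sketch of the difference between the two write styles against the raw ZooKeeper client (the actual patch changes the controller and state-machine code paths, not a helper like this):

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class ZkWriteStyles {

    // Synchronous: each call blocks for a full round trip before the next write starts,
    // so N znodes cost roughly N round trips.
    static void syncWrites(final ZooKeeper zk, final String[] paths, final byte[] data)
            throws KeeperException, InterruptedException {
        for (final String path : paths) {
            zk.setData(path, data, -1); // -1 matches any version
        }
    }

    // Asynchronous: all writes are issued immediately and results arrive via callbacks,
    // so the requests are pipelined instead of serialized on round trips.
    static void asyncWrites(final ZooKeeper zk, final String[] paths, final byte[] data) {
        final AsyncCallback.StatCallback onDone =
            (int rc, String path, Object ctx, Stat stat) ->
                System.out.println("setData on " + path + " returned rc=" + rc);
        for (final String path : paths) {
            zk.setData(path, data, -1, onDone, null);
        }
    }
}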

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/onurkaraman/kafka KAFKA-5501

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3427.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3427


commit d118a542c4e08ea1e14fc577f64c746946ac3dea
Author: Onur Karaman 
Date:   2017-06-01T20:14:34Z

use async zookeeper apis everywhere

Synchronous zookeeper writes means that we wait an entire round trip before 
doing the next write. With respect to the controller, these synchronous writes 
are happening at a per-partition granularity in several places, so 
partition-heavy clusters suffer from the controller doing many sequential round 
trips to zookeeper.
- PartitionStateMachine.electLeaderForPartition updates leaderAndIsr in 
zookeeper on transition to OnlinePartition. This gets triggered per-partition 
sequentially with synchronous writes during controlled shutdown of the shutting 
down broker's replicas for which it is the leader.
- ReplicaStateMachine updates leaderAndIsr in zookeeper on transition to 
OfflineReplica when calling KafkaController.removeReplicaFromIsr. This gets 
triggered per-partition sequentially with synchronous writes for failed or 
controlled shutdown brokers.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: Kafka streams KStream and ktable join issue

2017-06-24 Thread Matthias J. Sax
I thought we drop records with a null key? No?

-Matthias

On 6/23/17 12:25 AM, Damian Guy wrote:
> My guess is it is because the record doesn't have a key, i.e., the key is
> null. We have a fix for this in 0.11, in that we will skip records with a
> null key during restore.
> 
> On Fri, 23 Jun 2017 at 03:57 Matthias J. Sax  wrote:
> 
>> Hi,
>>
>> can you reproduce the error reliably? Are you using 0.10.2.0 or 0.10.2.1?
>>
>> It's unclear to me, how an NPE can occur. It seems to happen within
>> Streams library. Might be a bug. Not sure atm.
>>
>>
>> -Matthias
>>
>> On 6/22/17 9:43 AM, Shekar Tippur wrote:
>>> Hello,
>>>
>>> I am trying to perform a simple join operation. I am using Kafka 0.10.2
>>>
>>> I have a "raw" table and a "cache" topics and just 1 partition in my
>> local
>>> environment.
>>>
>>> ktable has these entries
>>>
>>> {"Joe": {"location": "US", "gender": "male"}}
>>> {"Julie": {"location": "US", "gender": "female"}}
>>> {"Kawasaki": {"location": "Japan", "gender": "male"}}
>>>
>>> The KStream gets an event
>>>
>>> {"user": "Joe", "custom": {"choice":"vegan"}}
>>>
>>> I want the output of the join to be
>>>
>>> {"user": "Joe", "custom": {"choice":"vegan","enriched":*{"location":
>> "US",
>>> "gender": "male"}*} }
>>>
>>> I want to take what's in the KTable and add it to the enriched section of the output
>>> stream.
>>>
>>> I have defined serde
>>>
>>> //This is the same serde code from the example.
>>>
>>> final TestStreamsSerializer jsonSerializer = new
>>> TestStreamsSerializer();
>>> final TestStreamsDeserialzer jsonDeserializer = new
>>> TestStreamsDeserialzer();
>>> final Serde jsonSerde = Serdes.serdeFrom(jsonSerializer,
>>> jsonDeserializer);
>>>
>>> //
>>>
>>> KStream raw = builder.stream(Serdes.String(),
>>> jsonSerde, "raw");
>>> KTable  cache = builder.table("cache", "local-cache");
>>>
>>> raw.leftJoin(cache,
>>> (record1, record2) -> record1.get("user") + "-" +
>> record2).to("output");
>>>
>>> I am having trouble understanding how to call the join API.
>>>
>>> With the above code, I seem to get a error:
>>>
>>> [2017-06-22 09:23:31,836] ERROR User provided listener
>>> org.apache.kafka.streams.processor.internals.StreamThread$1 for group
>>> streams-pipe failed on partition assignment
>>> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
>>>
>>> java.lang.NullPointerException
>>>
>>> at org.rocksdb.RocksDB.put(RocksDB.java:488)
>>>
>>> at
>>>
>> org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:254)
>>>
>>> at
>>>
>> org.apache.kafka.streams.state.internals.RocksDBStore.access$000(RocksDBStore.java:67)
>>>
>>> at
>>>
>> org.apache.kafka.streams.state.internals.RocksDBStore$1.restore(RocksDBStore.java:164)
>>>
>>> at
>>>
>> org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:242)
>>>
>>> at
>>>
>> org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)
>>>
>>> at
>>>
>> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
>>>
>>> at
>>>
>> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:160)
>>>
>>> at
>>>
>> org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)
>>>
>>> at
>>>
>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>>
>>> at
>>>
>> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
>>>
>>> at
>>>
>> org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:63)
>>>
>>> at
>>>
>> org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
>>>
>>> at
>>>
>> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:141)
>>>
>>> at
>>>
>> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>>>
>>> at
>>>
>> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>>>
>>> at
>>>
>> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>>>
>>> at
>>>
>> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>>>
>>> at
>>>
>> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>>>
>>> at
>>>
>> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>>>
>>> at
>>>
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>>>
>>> at
>>>
>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>>>
>>> at
>>>
>> 

Re: [VOTE] KIP-161: streams deserialization exception handlers

2017-06-24 Thread Matthias J. Sax
+1

On 6/23/17 9:45 AM, Guozhang Wang wrote:
> +1.
> 
> On Fri, Jun 23, 2017 at 6:42 AM, Bill Bejeck  wrote:
> 
>> Thanks for the KIP!
>>
>> +1
>>
>> -Bill
>>
>> On Fri, Jun 23, 2017 at 7:15 AM, Damian Guy  wrote:
>>
>>> Thanks for the KIP Eno.
>>> +1 (binding)
>>>
>>> On Fri, 23 Jun 2017 at 11:00 Eno Thereska 
>> wrote:
>>>
 Starting voting thread for:

 https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+
>>> deserialization+exception+handlers
 <
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>> 161:+streams+deserialization+exception+handlers
>

 Thanks
 Eno
>>>
>>
> 
> 
> 





Re: [VOTE] KIP-160: Augment KStream.print() to allow users pass in extra parameters in the printed string

2017-06-24 Thread Matthias J. Sax
+1

On 6/23/17 9:43 PM, Guozhang Wang wrote:
> +1
> 
> On Fri, Jun 23, 2017 at 3:16 AM, Eno Thereska 
> wrote:
> 
>> +1 thanks!
>>
>> Eno
>>> On 23 Jun 2017, at 05:29, James Chain  wrote:
>>>
>>> Hi all,
>>>
>>> I applied the original idea to KStream#writeAsText() and also updated my pull
>>> request.
>>> Please re-review and re-cast the vote.
>>>
>>> James Chien
>>
>>
> 
> 





Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-06-24 Thread Matthias J. Sax
Damian,

thanks for starting this discussion.

I am not a fan of the builder pattern. It's too clumsy to use IMHO and
raises the entry-level bar.

I also think that mixing optional parameters with configs is a bad idea.
I have no proposal for this atm but just wanted to mention it. Hope to
find some time to come up with something.

What I don't like in the current proposal is the
.grouped().withKeyMapper() -- the current solution with .groupBy(...)
and .groupByKey() seems better. For clarity, we could rename to
.groupByNewKey(...) and .groupByCurrentKey() (even if we should find
some better names).

The proposed pattern "chains" grouping and aggregation too close
together. I would rather separate both more than less, ie, do into the
opposite direction.

I am also wondering if we could do something more "fluent". The initial
proposal was like:

>> groupedStream.count()
>>.withStoreName("name")
>>.withCachingEnabled(false)
>>.withLoggingEnabled(config)
>>.table()

The .table() statement in the end was kinda alien.

The current proposal puts the count() at the end -- i.e., the optional
parameters for count() have to be specified on the .grouped() call -- and this
does not seem to be the best way either.

I did not think this through in detail, but can't we just do the initial
proposal with the .table() ?

groupedStream.count().withStoreName("name").mapValues(...)

Each .withXXX(...) returns the current KTable and all the .withXXX() methods are
just added to the KTable interface. Or do I miss anything about why this won't
work, or any obvious disadvantage?
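
For illustration, a purely hypothetical sketch of what that would mean for the interface (method names are only placeholders for the discussion, not an actual API proposal):

// Hypothetical sketch for the discussion -- not the actual Streams API.
// Each withXXX(...) lives on the result type itself and returns the same table,
// so the optional store parameters trail the aggregation instead of preceding it.
public interface KTable<K, V> {
    KTable<K, V> withStoreName(String name);
    KTable<K, V> withCachingEnabled(boolean enabled);
    KTable<K, V> withLoggingEnabled(boolean enabled);
    // ... plus the existing KTable operations (mapValues, filter, join, ...)
}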



-Matthias

On 6/22/17 4:06 AM, Damian Guy wrote:
> Thanks everyone. My latest attempt is below. It builds on the fluent
> approach, but I think it is slightly nicer.
> I agree with some of what Eno said about mixing configy stuff in the DSL,
> but I think that enabling caching and enabling logging are things that
> aren't actually config. I'd probably not add withLogConfig(...) (even
> though it is below) as this is actually config and we already have a way of
> doing that, via the StateStoreSupplier. Arguably we could use the
> StateStoreSupplier for disabling caching etc, but as it stands that is a
> bit of a tedious process for someone that just wants to use the default
> storage engine, but not have caching enabled.
> 
> There is also an orthogonal concern that Guozhang alluded to: if you
> want to plug in a custom storage engine and you want it to be logged etc,
> you would currently need to implement that yourself. Ideally we can provide
> a way where we will wrap the custom store with logging, metrics, etc. I
> need to think about where this fits, it is probably more appropriate on the
> Stores API.
> 
> final KeyValueMapper keyMapper = null;
> // count with mapped key
> final KTable count = stream.grouped()
> .withKeyMapper(keyMapper)
> .withKeySerde(Serdes.Long())
> .withValueSerde(Serdes.String())
> .withQueryableName("my-store")
> .count();
> 
> // windowed count
> final KTable windowedCount = stream.grouped()
> .withQueryableName("my-window-store")
> .windowed(TimeWindows.of(10L).until(10))
> .count();
> 
> // windowed reduce
> final Reducer windowedReducer = null;
> final KTable windowedReduce = stream.grouped()
> .withQueryableName("my-window-store")
> .windowed(TimeWindows.of(10L).until(10))
> .reduce(windowedReducer);
> 
> final Aggregator aggregator = null;
> final Initializer init = null;
> 
> // aggregate
> final KTable aggregate = stream.grouped()
> .withQueryableName("my-aggregate-store")
> .aggregate(aggregator, init, Serdes.Long());
> 
> final StateStoreSupplier> stateStoreSupplier = 
> null;
> 
> // aggregate with custom store
> final KTable aggWithCustomStore = stream.grouped()
> .withStateStoreSupplier(stateStoreSupplier)
> .aggregate(aggregator, init);
> 
> // disable caching
> stream.grouped()
> .withQueryableName("name")
> .withCachingEnabled(false)
> .count();
> 
> // disable logging
> stream.grouped()
> .withQueryableName("q")
> .withLoggingEnabled(false)
> .count();
> 
> // override log config
> final Reducer reducer = null;
> stream.grouped()
> .withLogConfig(Collections.singletonMap("segment.size", "10"))
> .reduce(reducer);
> 
> 
> If anyone wants to play around with this you can find the code here:
> https://github.com/dguy/kafka/tree/dsl-experiment
> 
> Note: It won't actually work as most of the methods just return null.
> 
> Thanks,
> Damian
> 
> 
> On Thu, 22 Jun 2017 at 11:18 Ismael Juma  wrote:
> 
>> Thanks Damian. I think both options have pros and cons. And both are better
>> than overload abuse.
>>
>> The fluent API approach reads better, no mention of builder or build
>> 

[jira] [Created] (KAFKA-5509) add gradle option for building tarball with test jars

2017-06-24 Thread Eno Thereska (JIRA)
Eno Thereska created KAFKA-5509:
---

 Summary: add gradle option for building tarball with test jars
 Key: KAFKA-5509
 URL: https://issues.apache.org/jira/browse/KAFKA-5509
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.11.0.0
Reporter: Eno Thereska
Priority: Minor
 Fix For: 0.11.1.0


Currently we can build releaseTarGz; however, the resulting tarball does not include 
the test jars. It would be good to have another option, releaseAllTestTarGz, that 
includes the test jars too.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] kafka pull request #3420: KAFKA-5506: Fix NPE in OffsetFetchRequest.toString

2017-06-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3420


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (KAFKA-5487) Rolling upgrade test for streams

2017-06-24 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy resolved KAFKA-5487.
---
   Resolution: Fixed
Fix Version/s: (was: 0.11.0.1)
   0.11.1.0

Issue resolved by pull request 3411
[https://github.com/apache/kafka/pull/3411]

> Rolling upgrade test for streams
> 
>
> Key: KAFKA-5487
> URL: https://issues.apache.org/jira/browse/KAFKA-5487
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Eno Thereska
>Assignee: Eno Thereska
> Fix For: 0.11.1.0
>
>
> We need to do a basic rolling upgrade test for streams, similar to the 
> tests/kafkatest/tests/core/upgrade_test.py test for Kafka core. Basically we 
> need to test the ability of a streams app to use a different JAR from a 
> different version.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] kafka pull request #3411: KAFKA-5487: upgrade and downgrade streams app syst...

2017-06-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3411


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #3424: MINOR: Typo in comments makes debug confusing

2017-06-24 Thread enothereska
Github user enothereska closed the pull request at:

https://github.com/apache/kafka/pull/3424


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---