[jira] [Resolved] (KAFKA-5242) add max_number_of_retries to exponential backoff strategy

2017-08-17 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy resolved KAFKA-5242.
---
Resolution: Won't Do

This has been fixed by KAFKA-5152, which has already been merged to 0.11.0.1.

> add max_number_of_retries to exponential backoff strategy
> --
>
> Key: KAFKA-5242
> URL: https://issues.apache.org/jira/browse/KAFKA-5242
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Lukas Gemela
>Assignee: Matthias J. Sax
>Priority: Minor
> Fix For: 0.10.2.2, 0.11.0.1
>
> Attachments: clio_170511.log
>
>
> From time to time, during rebalance we are getting a lot of exceptions saying 
> {code}
> org.apache.kafka.streams.errors.LockException: task [0_0] Failed to lock the 
> state directory: /app/db/clio/0_0
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
>  ~[kafka-streams-0.10.2.0.jar!/:?]
>   at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>  ~[kafka-streams-0.10.2.0.jar!/:?]
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>  ~[kafka-streams-0.10.2.0.jar!/:?]
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
>  ~[kafka-streams-0.10.2.0.jar!/:?]
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
>  ~[kafka-streams-0.10.2.0.jar!/:?]
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
>  [kafka-streams-0.10.2.0.jar!/:?]
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
>  [kafka-streams-0.10.2.0.jar!/:?]
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
>  [kafka-streams-0.10.2.0.jar!/:?]
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
>  [kafka-streams-0.10.2.0.jar!/:?]
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
>  [kafka-clients-0.10.2.0.jar!/:?]
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
>  [kafka-clients-0.10.2.0.jar!/:?]
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>  [kafka-clients-0.10.2.0.jar!/:?]
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>  [kafka-clients-0.10.2.0.jar!/:?]
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
>  [kafka-clients-0.10.2.0.jar!/:?]
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) 
> [kafka-clients-0.10.2.0.jar!/:?]
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
>  [kafka-streams-0.10.2.0.jar!/:?]
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
>  [kafka-streams-0.10.2.0.jar!/:?]
> {code}
> (see attached logfile)
> It was actually a problem on our side - we ran startStreams() twice and 
> therefore had two threads touching the same folder structure. 
> But what I've noticed is that the backoff strategy in 
> StreamThread$AbstractTaskCreator.retryWithBackoff can run endlessly - after 
> 20 iterations it takes 6 hours until the next attempt to start a task. 
> I've noticed the latest code contains a check for rebalanceTimeoutMs, but that 
> still does not solve the problem, especially in case 
> MAX_POLL_INTERVAL_MS_CONFIG is set to Integer.MAX_VALUE. At this stage Kafka 
> Streams just hangs indefinitely.
> I would personally make that backoff strategy a bit more configurable with a 
> maximum number of retries: if it exceeds the configured value, the exception 
> is propagated like any other exception to the custom client exception handler.
> (I can provide a patch)
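> To illustrate, a minimal sketch of a capped exponential backoff (names and 
> values are illustrative only, not Kafka's actual implementation):
> {code}
> // Sketch only: exponential backoff capped by a configurable number of retries,
> // rethrowing once exhausted instead of retrying forever.
> static void retryWithBackoff(final Runnable task,
>                              final int maxRetries,
>                              final long maxBackoffMs) throws InterruptedException {
>     long backoffMs = 50L;
>     for (int attempt = 1; ; attempt++) {
>         try {
>             task.run();
>             return;
>         } catch (final RuntimeException e) {  // e.g. the LockException above
>             if (attempt >= maxRetries) {
>                 throw e;  // propagate to the custom client exception handler
>             }
>             Thread.sleep(backoffMs);
>             backoffMs = Math.min(backoffMs * 2, maxBackoffMs);  // cap the delay too
>         }
>     }
> }
> {code}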



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [DISCUSS] 0.11.0.1 bug fix release

2017-08-17 Thread Damian Guy
Hi Srikanth,
Optimistically I'm aiming for the end of next week. Though it depends on how
quickly the outstanding issues are closed and any other blockers that arise.

Thanks,
Damian

On Thu, 17 Aug 2017 at 07:59 Srikanth Sampath <ssampath.apa...@gmail.com>
wrote:

> Thanks Damian.  What's the ballpark when 0.11.0.1 will be available?
> -Srikanth
>
> On Wed, Aug 16, 2017 at 5:59 PM, Damian Guy <damian@gmail.com> wrote:
>
> > Hi,
> >
> > It seems like it must be time for 0.11.0.1 bug fix release!
> >
> > Since the 0.11.0.0 release we've fixed 30 JIRAs that
> > are targeted for 0.11.0.1:
> >
> > https://issues.apache.org/jira/browse/KAFKA-5700?jql=
> > project%20%3D%20KAFKA%20AND%20resolution%20%3D%20Fixed%
> > 20AND%20fixVersion%20%3D%200.11.0.1%20ORDER%20BY%
> > 20priority%20DESC%2C%20key%20DESC
> >
> > We have 15 outstanding issues that are targeted at 0.11.0.1:
> >
> > https://issues.apache.org/jira/browse/KAFKA-5567?jql=
> > project%20%3D%20KAFKA%20AND%20resolution%20%3D%20Unresolved%20AND%
> > 20fixVersion%20%3D%200.11.0.1%20ORDER%20BY%20priority%
> > 20DESC%2C%20key%20DESC
> >
> > Can the owners of the remaining issues please resolve them or move them
> to
> > a future release.
> >
> > As soon as the remaining tasks for 0.11.0.1 reach zero I'll create the
> > first RC.
> >
> > Thanks,
> > Damian
> >
>


[DISCUSS] 0.11.0.1 bug fix release

2017-08-16 Thread Damian Guy
Hi,

It seems like it must be time for 0.11.0.1 bug fix release!

Since the 0.11.0.0 release we've fixed 30 JIRAs that
are targeted for 0.11.0.1:

https://issues.apache.org/jira/browse/KAFKA-5700?jql=project%20%3D%20KAFKA%20AND%20resolution%20%3D%20Fixed%20AND%20fixVersion%20%3D%200.11.0.1%20ORDER%20BY%20priority%20DESC%2C%20key%20DESC

We have 15 outstanding issues that are targeted at 0.11.0.1:

https://issues.apache.org/jira/browse/KAFKA-5567?jql=project%20%3D%20KAFKA%20AND%20resolution%20%3D%20Unresolved%20AND%20fixVersion%20%3D%200.11.0.1%20ORDER%20BY%20priority%20DESC%2C%20key%20DESC

Can the owners of the remaining issues please resolve them or move them to
a future release.

As soon as the remaining tasks for 0.11.0.1 reach zero I'll create the
first RC.

Thanks,
Damian


[jira] [Resolved] (KAFKA-5440) Kafka Streams reports state RUNNING even if all threads are dead

2017-08-16 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy resolved KAFKA-5440.
---
Resolution: Duplicate

> Kafka Streams reports state RUNNING even if all threads are dead
> ---
>
> Key: KAFKA-5440
> URL: https://issues.apache.org/jira/browse/KAFKA-5440
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1, 0.11.0.0
>Reporter: Matthias J. Sax
> Fix For: 0.11.0.1, 1.0.0
>
>
> From the mailing list:
> {quote}
> Hi All,
> We recently implemented a health check for a Kafka Streams based application. 
> The health check is simply checking the state of Kafka Streams by calling 
> KafkaStreams.state(). It reports healthy if it’s not in PENDING_SHUTDOWN or 
> NOT_RUNNING states. 
> We truly appreciate having the possibility to easily check the state of Kafka 
> Streams, but to our surprise we noticed that KafkaStreams.state() returns 
> RUNNING even though all StreamThreads have crashed and reached the NOT_RUNNING 
> state. Is this intended behaviour or is it a bug? Semantically it seems weird 
> to me that KafkaStreams would say it’s RUNNING when it is in fact not 
> consuming anything, since all underlying worker threads have crashed. 
> If this is intended behaviour I would appreciate an explanation of why that 
> is the case. Also in that case, how could I determine if the consumption from 
> Kafka hasn’t crashed? 
> If this is not intended behaviour, how fast could I expect it to be fixed? I 
> wouldn’t mind fixing it myself but I’m not sure if this is considered trivial 
> or big enough to require a JIRA. Also, if I were to implement a fix I’d like 
> your input on what would be a reasonable solution. By just inspecting the code 
> I have an idea, but I’m not sure I understand all the implications, so I’d be 
> happy to hear your thoughts first. 
> {quote}
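> For reference, the health check described above amounts to something like this 
> (an illustrative sketch, not the reporter's actual code):
> {code}
> // Report healthy unless the instance is shutting down or stopped.
> final KafkaStreams.State state = streams.state();
> final boolean healthy = state != KafkaStreams.State.PENDING_SHUTDOWN
>         && state != KafkaStreams.State.NOT_RUNNING;
> {code}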



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-4643) Improve test coverage of StreamsKafkaClient

2017-08-14 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy resolved KAFKA-4643.
---
   Resolution: Fixed
Fix Version/s: 1.0.0

Issue resolved by pull request 3663
[https://github.com/apache/kafka/pull/3663]

> Improve test coverage of StreamsKafkaClient
> ---
>
> Key: KAFKA-4643
> URL: https://issues.apache.org/jira/browse/KAFKA-4643
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>        Reporter: Damian Guy
> Fix For: 1.0.0
>
>
> Exception paths are not tested.
> {{getTopicMetadata}} is not tested.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [DISCUSS] KIP-182: Reduce Streams DSL overloads and allow easier use of custom storage engines

2017-08-10 Thread Damian Guy
> Got it, thanks.
>
> Does it still make sense to have one static constructor for each spec,
> with each constructor having only one parameter to make it more usable, i.e.
> as a user I do not need to give all parameters if I only want to override
> one of them? Maybe we can just name the constructors as `with` but I'm not
> sure if Java distinguishes:
>
> public static <K, V> Produced<K, V> with(final Serde<K> keySerde)
> public static <K, V> Produced<K, V> with(final Serde<V> valueSerde)
>
> as two function signatures.
>
>
No that won't work. That is why we have all options, i.e., on Produced:
public static <K, V> Produced<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde)
public static <K, V> Produced<K, V> with(final StreamPartitioner<K, V> partitioner, final Serde<K> keySerde, final Serde<V> valueSerde)
public static <K, V> Produced<K, V> keySerde(final Serde<K> keySerde)
public static <K, V> Produced<K, V> valueSerde(final Serde<V> valueSerde)
public static <K, V> Produced<K, V> streamPartitioner(final StreamPartitioner<K, V> partitioner)

So if you only want to use one you can just use the function that takes one
argument.
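
For example (illustrative usage of the draft signatures above; KIP-182 was
still under discussion, so treat this as a sketch):

stream.to("output-topic", Produced.with(keySerde, valueSerde));
stream.to("output-topic", Produced.keySerde(keySerde));  // override only the key serde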

>
> Guozhang
>
>
> On Wed, Aug 9, 2017 at 6:20 AM, Damian Guy <damian@gmail.com> wrote:
>
> > On Tue, 8 Aug 2017 at 20:11 Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Damian,
> > >
> > > Thanks for the proposal, I had a few comments on the APIs:
> > >
> > > 1. Printed#withFile seems not needed, as users should always specify if
> > > it is to sysOut or to File at the beginning. In addition, as a second
> > > thought, I think serdes are not useful for prints anyway since we assume
> > > `toString` is provided, except for byte arrays, which we will handle
> > > specially.
> > >
> > >
> > +1
> >
> >
> > > Another comment about Printed in general is that it differs from other
> > > options in that it is a required option rather than an optional one,
> > > since it includes the toSysOut / toFile specs; what are the pros and cons
> > > of including these two in the option (hence making it required) versus
> > > leaving them at the API layer and making Printed optional, for mapper /
> > > label only?
> > >
> > >
> > It isn't required as we will still have the no-arg print() which will
> just
> > go to sysout as it does now.
> >
> >
> > >
> > > 2.1 KStream#through / to
> > >
> > > We should have an overloaded function without Produced?
> > >
> >
> > Yes - we already have those so they are not part of the KIP, i.e.,
> > through(topic)
> >
> >
> > >
> > > 2.2 KStream#groupBy / groupByKey
> > >
> > > We should have an overloaded function without Serialized?
> > >
> >
> > Yes, as above
> >
> > >
> > > 2.3 KGroupedStream#count / reduce / aggregate
> > >
> > > We should have an overloaded function without Materialized?
> > >
> >
> > As above
> >
> > >
> > > 2.4 KStream#join
> > >
> > > We should have an overloaded function without Joined?
> > >
> >
> > as above
> >
> > >
> > >
> > > 2.5 Each of KTable's operators:
> > >
> > > We should have an overloaded function without Produced / Serialized /
> > > Materialized?
> > >
> > >
> > as above
> >
> >
> > >
> > >
> > > 3.1 Produced: the static functions have overlaps, which seems not
> > > necessary. I'd suggest just having the following three statics with
> > > another three similar member functions:
> > >
> > > public static <K, V> Produced<K, V> withKeySerde(final Serde<K> keySerde)
> > >
> > > public static <K, V> Produced<K, V> withValueSerde(final Serde<V> valueSerde)
> > >
> > > public static <K, V> Produced<K, V> withStreamPartitioner(final
> > > StreamPartitioner<K, V> partitioner)
> > >
> > > The key idea is that by using the same function name string for static
> > > constructors and member functions, users do not need to remember what
> > > the differences are, but can call these functions in any order they
> > > want, and later calls on the same spec will win over earlier calls.
> > >
> > >
That would be great if Java supported it, but it doesn't. You can't have
static and member functions with the same signature.
Re: [DISCUSS] KIP-182: Reduce Streams DSL overloads and allow easier use of custom storage engines

2017-08-09 Thread Damian Guy
On Wed, 9 Aug 2017 at 20:00 Guozhang Wang <wangg...@gmail.com> wrote:

> >> Another comment about Printed in general is that it differs from other
> >> options in that it is a required option rather than an optional one, since
> >> it includes the toSysOut / toFile specs; what are the pros and cons of
> >> including these two in the option (hence making it required) versus
> >> leaving them at the API layer and making Printed optional, for mapper /
> >> label only?
> >>
> >>
> >It isn't required as we will still have the no-arg print() which will just
> >go to sysout as it does now.
>
> Got it. So just to clarify are we going to deprecate writeAsText or not?
>
>
Correct.


>
> On Wed, Aug 9, 2017 at 11:38 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > >> The key idea is that by using the same function name string for static
> > >> constructors and member functions, users do not need to remember what
> > >> the differences are, but can call these functions in any order they
> > >> want, and later calls on the same spec will win over earlier calls.
> > >>
> > >>
> > >That would be great if Java supported it, but it doesn't. You can't have
> > >static and member functions with the same signature.
> >
> > Got it, thanks.
> >
> > Does it still make sense to have one static constructor for each spec,
> > with each constructor having only one parameter to make it more usable,
> > i.e. as a user I do not need to give all parameters if I only want to
> > override one of them? Maybe we can just name the constructors as `with`
> > but I'm not sure if Java distinguishes:
> >
> > public static <K, V> Produced<K, V> with(final Serde<K> keySerde)
> > public static <K, V> Produced<K, V> with(final Serde<V> valueSerde)
> >
> > as two function signatures.
> >
> >
> > Guozhang
> >
> >
> > On Wed, Aug 9, 2017 at 6:20 AM, Damian Guy <damian@gmail.com> wrote:
> >
> >> On Tue, 8 Aug 2017 at 20:11 Guozhang Wang <wangg...@gmail.com> wrote:
> >>
> >> > Damian,
> >> >
> >> > Thanks for the proposal, I had a few comments on the APIs:
> >> >
> >> > 1. Printed#withFile seems not needed, as users should always specify if
> >> > it is to sysOut or to File at the beginning. In addition, as a second
> >> > thought, I think serdes are not useful for prints anyway since we assume
> >> > `toString` is provided, except for byte arrays, which we will handle
> >> > specially.
> >> >
> >> >
> >> +1
> >>
> >>
> >> > Another comment about Printed in general is that it differs from other
> >> > options in that it is a required option rather than an optional one,
> >> > since it includes the toSysOut / toFile specs; what are the pros and
> >> > cons of including these two in the option (hence making it required)
> >> > versus leaving them at the API layer and making Printed optional, for
> >> > mapper / label only?
> >> >
> >> >
> >> It isn't required as we will still have the no-arg print() which will
> just
> >> go to sysout as it does now.
> >>
> >>
> >> >
> >> > 2.1 KStream#through / to
> >> >
> >> > We should have an overloaded function without Produced?
> >> >
> >>
> >> Yes - we already have those so they are not part of the KIP, i.e.,
> >> through(topic)
> >>
> >>
> >> >
> >> > 2.2 KStream#groupBy / groupByKey
> >> >
> >> > We should have an overloaded function without Serialized?
> >> >
> >>
> >> Yes, as above
> >>
> >> >
> >> > 2.3 KGroupedStream#count / reduce / aggregate
> >> >
> >> > We should have an overloaded function without Materialized?
> >> >
> >>
> >> As above
> >>
> >> >
> >> > 2.4 KStream#join
> >> >
> >> > We should have an overloaded function without Joined?
> >> >
> >>
> >> as above
> >>
> >> >
> >> >
> >> > 2.5 Each of KTable's operators:
> >> >
> >> > We should have an overloaded function without Produced / Serialized /
> >> > Materialized?
> >> >
>

[jira] [Resolved] (KAFKA-5717) [streams] 'null' values in state stores

2017-08-09 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy resolved KAFKA-5717.
---
   Resolution: Fixed
Fix Version/s: 0.11.0.1
   1.0.0

Issue resolved by pull request 3650
[https://github.com/apache/kafka/pull/3650]

> [streams] 'null' values in state stores
> ---
>
> Key: KAFKA-5717
> URL: https://issues.apache.org/jira/browse/KAFKA-5717
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1, 0.11.0.0
>Reporter: Bart Vercammen
>Assignee: Damian Guy
> Fix For: 1.0.0, 0.11.0.1
>
>
> When restoring the state on an in-memory KeyValue store (at startup of the 
> Kafka Streams application), the _deleted_ values are put in the store as 
> _key_ with _value_ {{null}} instead of being removed from the store.
> (this happens when the underlying Kafka topic segment did not get compacted 
> yet)
> After some digging I came across this in {{InMemoryKeyValueStore<K, V>}}:
> {code}
> public synchronized void put(K key, V value) {
>     this.map.put(key, value);
> }
> {code}
> I would assume this implementation misses the check on {{value}} being 
> {{null}} to *delete* the entry instead of just storing it.
> In the RocksDB implementation it is done correctly:
> {code}
> if (rawValue == null) {
>     try {
>         db.delete(wOptions, rawKey);
> {code}
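> A minimal sketch of the kind of fix I would expect (illustrative only, not 
> necessarily what the merged patch does):
> {code}
> public synchronized void put(K key, V value) {
>     if (value == null) {
>         this.map.remove(key);  // treat null as a delete, like the RocksDB store
>     } else {
>         this.map.put(key, value);
>     }
> }
> {code}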



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [DISCUSS] KIP-182: Reduce Streams DSL overloads and allow easier use of custom storage engines

2017-08-09 Thread Damian Guy
On Tue, 8 Aug 2017 at 20:11 Guozhang Wang <wangg...@gmail.com> wrote:

> Damian,
>
> Thanks for the proposal, I had a few comments on the APIs:
>
> 1. Printed#withFile seems not needed, as users should always specify if it is
> to sysOut or to File at the beginning. In addition, as a second thought, I
> think serdes are not useful for prints anyway since we assume `toString`
> is provided, except for byte arrays, which we will handle specially.
>
>
+1


> Another comment about Printed in general is that it differs from other options
> in that it is a required option rather than an optional one, since it includes
> the toSysOut / toFile specs; what are the pros and cons of including these two
> in the option (hence making it required) versus leaving them at the API layer
> and making Printed optional, for mapper / label only?
>
>
It isn't required as we will still have the no-arg print() which will just
go to sysout as it does now.


>
> 2.1 KStream#through / to
>
> We should have an overloaded function without Produced?
>

Yes - we already have those so they are not part of the KIP, i.e.,
through(topic)


>
> 2.2 KStream#groupBy / groupByKey
>
> We should have an overloaded function without Serialized?
>

Yes, as above

>
> 2.3 KGroupedStream#count / reduce / aggregate
>
> We should have an overloaded function without Materialized?
>

As above

>
> 2.4 KStream#join
>
> We should have an overloaded function without Joined?
>

as above

>
>
> 2.5 Each of KTable's operators:
>
> We should have an overloaded function without Produced / Serialized /
> Materialized?
>
>
as above


>
>
> 3.1 Produced: the static functions have overlaps, which seems not
> necessary. I'd suggest just having the following three statics with another
> three similar member functions:
>
> public static <K, V> Produced<K, V> withKeySerde(final Serde<K> keySerde)
>
> public static <K, V> Produced<K, V> withValueSerde(final Serde<V> valueSerde)
>
> public static <K, V> Produced<K, V> withStreamPartitioner(final
> StreamPartitioner<K, V> partitioner)
>
> The key idea is that by using the same function name string for static
> constructors and member functions, users do not need to remember what the
> differences are, but can call these functions in any order they want,
> and later calls on the same spec will win over earlier calls.
>
>
That would be great if Java supported it, but it doesn't. You can't have
static and member functions with the same signature.


>
> 3.2 Serialized: similarly
>
> public static <K, V> Serialized<K, V> withKeySerde(final Serde<K> keySerde)
>
> public static <K, V> Serialized<K, V> withValueSerde(final Serde<V> valueSerde)
>
> public Serialized<K, V> withKeySerde(final Serde<K> keySerde)
>
> public Serialized<K, V> withValueSerde(final Serde<V> valueSerde)
>

as above


>
> Also it has a final Serde otherValueSerde in one of its static
> constructors, is that intentional?
>

Nope: thanks.

>
> 3.3. Joined: similarly, keep the static constructor signatures the same as
> its corresponding member fields.
>
>
As above


> 3.4 Materialized: it is a bit special, and I think we can keep its static
> constructors with only two `as` as they are today.
>
>
> 4. Are there any modifications to StateStoreSupplier? Is it replaced by
> BytesStoreSupplier? It seems some more description is lacking here. Also in
>
>
No modifications to StateStoreSupplier. It is superseded by
BytesStoreSupplier.



> public static <K, V, S extends StateStore> Materialized<K, V, S>
> as(final StateStoreSupplier supplier)
>
> Is the parameter of type BytesStoreSupplier?
>

Yep - thanks


>
>
>
>
> Guozhang
>
>
> On Thu, Jul 27, 2017 at 5:26 AM, Damian Guy <damian@gmail.com> wrote:
>
> > Updated link:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 182%3A+Reduce+Streams+DSL+overloads+and+allow+easier+
> > use+of+custom+storage+engines
> >
> > Thanks,
> > Damian
> >
> > On Thu, 27 Jul 2017 at 13:09 Damian Guy <damian@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > I've put together a KIP to make some changes to the KafkaStreams DSL
> that
> > > will hopefully allow us to:
> > > 1) reduce the explosion of overloads
> > > 2) add new features without having to continue adding more overloads
> > > 3) provide simpler ways for people to use custom storage engines and
> wrap
> > > them with logging, caching etc if desired
> > > 4) enable per-operator caching rather than global caching without
> having
> > > to resort to supplying a StateStoreSupplier when you just want to turn
> > > caching off.
> > >
> > > The KIP is here:
> > > https://cwiki.apache.org/confluence/pages/viewpage.
> > action?pageId=73631309
> > >
> > > Thanks,
> > > Damian
> > >
> >
>
>
>
> --
> -- Guozhang
>


[jira] [Created] (KAFKA-5702) Refactor StreamThread to separate concerns and enable better testability

2017-08-04 Thread Damian Guy (JIRA)
Damian Guy created KAFKA-5702:
-

 Summary: Refactor StreamThread to separate concerns and enable 
better testability
 Key: KAFKA-5702
 URL: https://issues.apache.org/jira/browse/KAFKA-5702
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Damian Guy
Assignee: Damian Guy


{{StreamThread}} does a lot of stuff, e.g., managing and creating tasks, 
getting data from consumers, updating standby tasks, punctuating, rebalancing, 
etc. With the current design it is extremely hard to reason about and is quite 
tightly coupled. 
We need to start to tease out some of the separate concerns from StreamThread, 
e.g., TaskManager, RebalanceListener, etc. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: KStreamPrintTest : no differences in the unit tests

2017-08-02 Thread Damian Guy
Yes - they are basically the same. Feel free to submit a patch to remove
one of them

On Wed, 2 Aug 2017 at 15:28 Paolo Patierno  wrote:

> Hi devs,
>
> taking a look at KStreamPrintTest I can't find any substantial difference
> between the two tests :
>
>
> testPrintStreamWithProvidedKeyValueMapper
>
> testPrintKeyValueWithName
>
>
> The only tiny difference is the mapper output "%d, %s" instead of "(%d,
> %s)" but then all the code seems to be the same (some other "final" missing
> ...).
>
> Is there something that my eyes can't see?
>
>
> Thanks,
>
>
> Paolo Patierno
> Senior Software Engineer (IoT) @ Red Hat
> Microsoft MVP on Windows Embedded & IoT
> Microsoft Azure Advisor
>
> Twitter : @ppatierno
> Linkedin : paolopatierno
> Blog : DevExperience
>


[jira] [Created] (KAFKA-5689) Refactor WindowStore hierarchy so that Metered Store is the outermost store

2017-08-01 Thread Damian Guy (JIRA)
Damian Guy created KAFKA-5689:
-

 Summary: Refactor  WindowStore hierarchy so that Metered Store is 
the outermost store
 Key: KAFKA-5689
 URL: https://issues.apache.org/jira/browse/KAFKA-5689
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: Damian Guy
Assignee: Damian Guy


MeteredWindowStore is currently not the outermost store. Further, it needs to 
have the inner store as <Bytes, byte[]> to allow easy pluggability of custom 
storage engines.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5154) Kafka Streams throws NPE during rebalance

2017-08-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy resolved KAFKA-5154.
---
Resolution: Fixed

> Kafka Streams throws NPE during rebalance
> -
>
> Key: KAFKA-5154
> URL: https://issues.apache.org/jira/browse/KAFKA-5154
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Lukas Gemela
>Assignee: Damian Guy
> Attachments: 5154_problem.log, clio_afa596e9b809.gz, clio_reduced.gz, 
> clio.txt.gz
>
>
> please see attached log, Kafka Streams throws a NullPointerException during 
> rebalance, which is caught by our custom exception handler
> {noformat}
> 2017-04-30T17:44:17,675 INFO  kafka-coordinator-heartbeat-thread | hades 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead()
>  @618 - Marking the coordinator 10.210.200.144:9092 (id: 2147483644 rack: 
> null) dead for group hades
> 2017-04-30T17:44:27,395 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() 
> @573 - Discovered coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) 
> for group hades.
> 2017-04-30T17:44:27,941 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare()
>  @393 - Revoking previously assigned partitions [poseidonIncidentFeed-27, 
> poseidonIncidentFeed-29, poseidonIncidentFeed-30, poseidonIncidentFeed-18] 
> for group hades
> 2017-04-30T17:44:27,947 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:44:48,468 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:44:53,628 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:45:09,587 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:45:11,961 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() 
> @375 - Successfully joined group hades with generation 99
> 2017-04-30T17:45:13,126 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete()
>  @252 - Setting newly assigned partitions [poseidonIncidentFeed-11, 
> poseidonIncidentFeed-27, poseidonIncidentFeed-25, poseidonIncidentFeed-29, 
> poseidonIncidentFeed-19, poseidonIncidentFeed-18] for group hades
> 2017-04-30T17:46:37,254 INFO  kafka-coordinator-heartbeat-thread | hades 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead()
>  @618 - Marking the coordinator 10.210.200.144:9092 (id: 2147483644 rack: 
> null) dead for group hades
> 2017-04-30T18:04:25,993 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() 
> @573 - Discovered coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) 
> for group hades.
> 2017-04-30T18:04:29,401 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare()
>  @393 - Revoking previously assigned partitions [poseidonIncidentFeed-11, 
> poseidonIncidentFeed-27, poseidonIncidentFeed-25, poseidonIncidentFeed-29, 
> poseidonIncidentFeed-19, poseidonIncidentFeed-18] for group hades
> 2017-04-30T18:05:10,877 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-05-01T00:01:55,707 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead()
>  @618 - Marking the coordinator 10.210.200.144:9092 (id: 2147483644 rack: 
> null) dead for group hades
> 2017-05-01T00:01:59,027 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() 
> @573 - Discovered coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) 
> for group hades.
> 2017-05-01T00:01:59,031 ERROR StreamThread-1 
> org.apache.kafka.streams.processor.internals.StreamThread.run() @376 - 
> stream-thread [StreamThread-1] Streams application error during processing:
>  java.lang.NullPointerException
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:619)
>  ~[kafka-streams-0.10.2.0.jar!/:?]
>   at 
> or

Re: Kafka Streams debugging with "no fluent" API choice

2017-08-01 Thread Damian Guy
Yeah, right. Sorry I missed that. The JIRA doesn't need a KIP.

On Tue, 1 Aug 2017 at 15:20 Paolo Patierno <ppatie...@live.com> wrote:

> Hi Damian,
>
>
> changing the print() method for sure needs a KIP but I guess there is some
> reason we don't know why they decided to not have a fluent API for that.
>
> Regarding my JIRA I don't think a KIP is required, it's just internal
> stuff ... no?
>
>
> Thanks
>
>
> Paolo Patierno
> Senior Software Engineer (IoT) @ Red Hat
> Microsoft MVP on Windows Embedded & IoT
> Microsoft Azure Advisor
>
> Twitter : @ppatierno<http://twitter.com/ppatierno>
> Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
> Blog : DevExperience<http://paolopatierno.wordpress.com/>
>
>
> 
> From: Damian Guy <damian@gmail.com>
> Sent: Tuesday, August 1, 2017 2:11 PM
> To: dev@kafka.apache.org
> Subject: Re: Kafka Streams debugging with "no fluent" API choice
>
> Hi Paolo,
>
> The change would require a KIP as it is a public API change.  I don't see
> any harm in making the change, but I also don't think it is that difficult
> to use peek to achieve the same thing.
>
> Thanks,
> Damian
>
> On Tue, 1 Aug 2017 at 13:52 Paolo Patierno <ppatie...@live.com> wrote:
>
> > Thanks Damian,
> >
> >
> > I knew about that but you have to write the code for printing by
> > yourself.
> > Of course you can do that even with print() by not using the default
> > key-value mapper but passing a custom one.
> >
> > At the same time, if you want to print you need Serdes for the key and
> > value if they are bytes, something which happens for free when
> > passing Serdes instances to print().
> >
> >
> > Another point is ...
> >
> > as both the foreach() and peek() methods rely on the KStreamPeek node, it
> > could be the same for the print() method which uses a KStreamPrint that
> is
> > a special KStreamPeek with forwardDownStream = false and providing the
> > usage of Serdes. For this I have opened the following JIRA :
> >
> >
> > https://issues.apache.org/jira/browse/KAFKA-5684
> >
> >
> > What do you think ?
> >
> >
> > Thanks
> >
> >
> > Paolo Patierno
> > Senior Software Engineer (IoT) @ Red Hat
> > Microsoft MVP on Windows Embedded & IoT
> > Microsoft Azure Advisor
> >
> > Twitter : @ppatierno<http://twitter.com/ppatierno>
> > Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
> > Blog : DevExperience<http://paolopatierno.wordpress.com/>
> >
> >
> > 
> > From: Damian Guy <damian@gmail.com>
> > Sent: Tuesday, August 1, 2017 12:11 PM
> > To: dev@kafka.apache.org
> > Subject: Re: Kafka Streams debugging with "no fluent" API choice
> >
> > I don't know specifically why this was removed, however if you want to get
> > the same functionality you can use peek, i.e.:
> >
> > stream.map(...).peek(...).filter(...)
> >
> > You can log the key values out in the peek call.
> >
> > On Tue, 1 Aug 2017 at 11:48 Paolo Patierno <ppatie...@live.com> wrote:
> >
> > > Hi guys,
> > >
> > >
> > > I was thinking about Kafka Streams debug features and why the print()
> > > method overloads return just void instead of a KStream, which would allow
> > > a fluent DSL construction even during debugging.
> > >
> > > Then I came across this PR :
> > >
> > >
> > > https://github.com/apache/kafka/pull/1187
> > >
> > >
> > > May I ask why the form:
> > >
> > >
> > > stream1 = source.map(...).filter(...);
> > > stream1.print();   // this is for debugging, deleted before moving to
> > > production
> > >
> > > stream2 = stream1.transform(...);
> > > stream2.print();   // this is for debugging, deleted before moving to
> > > production
> > >
> > > was considered better than the fluent one:
> > >
> > >
> > > source.map(...).filter(...)
> > >   .print()   // this is for debugging, deleted before moving to
> > > production
> > >   .transform(...)
> > >   .print();   // this is for debugging, deleted before moving to
> > > production
> > >
> > >
> > > In this first case the user has to break the topology for printing and
> > > after that, when the code works fine, he has to rewrite the code to
> > > remove these breaks and restore a fluent construction.
> > >
> > > In the second solution, the user has just to remove the print() calls
> > > without touching the other parts of the code.
> > >
> > > I really liked the first solution (it was something I was going to
> > > propose before coming across the PR :-)).
> > >
> > >
> > > Why this preference on the first one?
> > >
> > >
> > > Thanks
> > >
> > >
> > >
> > > Paolo Patierno
> > > Senior Software Engineer (IoT) @ Red Hat
> > > Microsoft MVP on Windows Embedded & IoT
> > > Microsoft Azure Advisor
> > >
> > > Twitter : @ppatierno<http://twitter.com/ppatierno>
> > > Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
> > > Blog : DevExperience<http://paolopatierno.wordpress.com/>
> > >
> >
>


Re: Kafka Streams debugging with "no fluent" API choice

2017-08-01 Thread Damian Guy
Hi Paolo,

The change would require a KIP as it is a public API change.  I don't see
any harm in making the change, but I also don't think it is that difficult
to use peek to achieve the same thing.

Thanks,
Damian

On Tue, 1 Aug 2017 at 13:52 Paolo Patierno <ppatie...@live.com> wrote:

> Thanks Damian,
>
>
> I knew about that but you have to write the code for printing by yourself.
> Of course you can do that even with print() by not using the default
> key-value mapper but passing a custom one.
>
> At the same time, if you want to print you need Serdes for the key and
> value if they are bytes, something which happens for free when
> passing Serdes instances to print().
>
>
> Another point is ...
>
> as both the foreach() and peek() methods rely on the KStreamPeek node, it
> could be the same for the print() method which uses a KStreamPrint that is
> a special KStreamPeek with forwardDownStream = false and providing the
> usage of Serdes. For this I have opened the following JIRA :
>
>
> https://issues.apache.org/jira/browse/KAFKA-5684
>
>
> What do you think ?
>
>
> Thanks
>
>
> Paolo Patierno
> Senior Software Engineer (IoT) @ Red Hat
> Microsoft MVP on Windows Embedded & IoT
> Microsoft Azure Advisor
>
> Twitter : @ppatierno<http://twitter.com/ppatierno>
> Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
> Blog : DevExperience<http://paolopatierno.wordpress.com/>
>
>
> 
> From: Damian Guy <damian@gmail.com>
> Sent: Tuesday, August 1, 2017 12:11 PM
> To: dev@kafka.apache.org
> Subject: Re: Kafka Streams debugging with "no fluent" API choice
>
> I don't know specifically why this was removed, however if you want to get
> the same functionality you can use peek, i.e.:
>
> stream.map(...).peek(...).filter(...)
>
> You can log the key values out in the peek call.
>
> On Tue, 1 Aug 2017 at 11:48 Paolo Patierno <ppatie...@live.com> wrote:
>
> > Hi guys,
> >
> >
> > I was thinking about Kafka Streams debug features and why the print()
> > method overloads return just void instead of a KStream, which would allow
> > a fluent DSL construction even during debugging.
> >
> > Then I came across this PR :
> >
> >
> > https://github.com/apache/kafka/pull/1187
> >
> >
> > May I ask why the form:
> >
> >
> > stream1 = source.map(...).filter(...);
> > stream1.print();   // this is for debugging, deleted before moving to
> > production
> >
> > stream2 = stream1.transform(...);
> > stream2.print();   // this is for debugging, deleted before moving to
> > production
> >
> > was considered better than the fluent one:
> >
> >
> > source.map(...).filter(...)
> >   .print()   // this is for debugging, deleted before moving to
> > production
> >   .transform(...)
> >   .print();   // this is for debugging, deleted before moving to
> > production
> >
> >
> > In this first case the user has to break the topology for printing and
> > after that, when the code works fine, he has to rewrite the code to
> > remove these breaks and restore a fluent construction.
> >
> > In the second solution, the user has just to remove the print() calls
> > without touching the other parts of the code.
> >
> > I really liked the first solution (it was something I was going to
> > propose before coming across the PR :-)).
> >
> >
> > Why this preference on the first one?
> >
> >
> > Thanks
> >
> >
> >
> > Paolo Patierno
> > Senior Software Engineer (IoT) @ Red Hat
> > Microsoft MVP on Windows Embedded & IoT
> > Microsoft Azure Advisor
> >
> > Twitter : @ppatierno<http://twitter.com/ppatierno>
> > Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
> > Blog : DevExperience<http://paolopatierno.wordpress.com/>
> >
>


Re: Kafka Streams debugging with "no fluent" API choice

2017-08-01 Thread Damian Guy
I don't know specifically why this was removed, however if you want to get
the same functionality you can use peek, i.e.:

stream.map(...).peek(...).filter(...)

You can log the key values out in the peek call.
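
For example (an illustrative topology, assuming String keys and values):

KStream<String, String> stream = builder.stream("input-topic");
stream.mapValues(String::toUpperCase)
      .peek((key, value) -> System.out.println("key=" + key + ", value=" + value))
      .filter((key, value) -> value.startsWith("A"))
      .to("output-topic");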

On Tue, 1 Aug 2017 at 11:48 Paolo Patierno  wrote:

> Hi guys,
>
>
> I was thinking about Kafka Streams debug features and why the print()
> method overloads return just void instead of a KStream, which would allow
> a fluent DSL construction even during debugging.
>
> Then I came across this PR :
>
>
> https://github.com/apache/kafka/pull/1187
>
>
> May I ask why the form:
>
>
> stream1 = source.map(...).filter(...);
> stream1.print();   // this is for debugging, deleted before moving to
> production
>
> stream2 = stream1.transform(...);
> stream2.print();   // this is for debugging, deleted before moving to
> production
>
> was considered better than the fluent one:
>
>
> source.map(...).filter(...)
>   .print()   // this is for debugging, deleted before moving to
> production
>   .transform(...)
>   .print();   // this is for debugging, deleted before moving to
> production
>
>
> In this first case the user has to break the topology for printing and
> after that, when the code works fine, he has to rewrite the code to
> remove these breaks and restore a fluent construction.
>
> In the second solution, the user has just to remove the print() calls
> without touching the other parts of the code.
>
> I really liked the first solution (it was something I was going to propose
> before coming across the PR :-)).
>
>
> Why this preference on the first one?
>
>
> Thanks
>
>
>
> Paolo Patierno
> Senior Software Engineer (IoT) @ Red Hat
> Microsoft MVP on Windows Embedded & IoT
> Microsoft Azure Advisor
>
> Twitter : @ppatierno
> Linkedin : paolopatierno
> Blog : DevExperience
>


[jira] [Created] (KAFKA-5680) Don't materialize physical state stores in KTable filter/map etc operations

2017-07-31 Thread Damian Guy (JIRA)
Damian Guy created KAFKA-5680:
-

 Summary: Don't materialize physical state stores in KTable 
filter/map etc operations
 Key: KAFKA-5680
 URL: https://issues.apache.org/jira/browse/KAFKA-5680
 Project: Kafka
  Issue Type: Bug
Reporter: Damian Guy


Presently, for IQ, we will materialize physical state stores for 
{{KTable#filter}}, {{KTable#mapValues}}, etc. operations if the user provides a 
{{queryableStoreName}}. This results in changelog topics, memory, and disk space 
usage that we can avoid by providing a view over the original state store.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5673) Refactor KeyValueStore hierarchy so that MeteredKeyValueStore is the outermost store

2017-07-28 Thread Damian Guy (JIRA)
Damian Guy created KAFKA-5673:
-

 Summary: Refactor KeyValueStore hierarchy so that 
MeteredKeyValueStore is the outermost store
 Key: KAFKA-5673
 URL: https://issues.apache.org/jira/browse/KAFKA-5673
 Project: Kafka
  Issue Type: Sub-task
Reporter: Damian Guy


MeteredKeyValueStore is currently not the outermost store. Further, it needs to 
have the inner store as {{<Bytes, byte[]>}} to allow easy pluggability of 
custom storage engines. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5672) Move measureLatencyNs from StreamsMetricsImpl to StreamsMetrics

2017-07-28 Thread Damian Guy (JIRA)
Damian Guy created KAFKA-5672:
-

 Summary: Move measureLatencyNs from StreamsMetricsImpl to 
StreamsMetrics 
 Key: KAFKA-5672
 URL: https://issues.apache.org/jira/browse/KAFKA-5672
 Project: Kafka
  Issue Type: Bug
Reporter: Damian Guy


StreamsMetricsImpl currently has the method {{measureLatencyNs}} but it is not 
on {{StreamsMetrics}} - this should be moved to the interface so we can stop 
depending on the impl. Further, the {{Runnable}} argument passed to 
{{measureLatencyNs}} should be changed to some functional interface that can 
also return a value.

As this is a public API change it will require a KIP
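
A sketch of the kind of functional interface that could replace the {{Runnable}} 
argument (hypothetical shape only, to be settled in the KIP):
{code}
// Hypothetical: an action whose latency is measured and whose result is returned.
@FunctionalInterface
public interface TimedAction<V> {
    V execute();
}

// A caller could then do something like:
// final V result = metrics.measureLatencyNs(time, action, sensor);
{code}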



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5549) Explain that `client.id` is just used as a prefix within Streams

2017-07-27 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy resolved KAFKA-5549.
---
   Resolution: Fixed
Fix Version/s: 1.0.0

Issue resolved by pull request 3544
[https://github.com/apache/kafka/pull/3544]

> Explain that `client.id` is just used as a prefix within Streams
> 
>
> Key: KAFKA-5549
> URL: https://issues.apache.org/jira/browse/KAFKA-5549
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Trivial
>  Labels: beginner, newbie
> Fix For: 1.0.0
>
>
> We should explain that {{client.id}} is used as a prefix for the internal 
> consumer, producer, and restore-consumer, and not reuse 
> {{CommonClientConfigs.CLIENT_ID_DOC}} within {{StreamsConfig}}.
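> For illustration (the derived client ids below show the pattern only and are 
> not guaranteed to be exact):
> {code}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
> props.put(StreamsConfig.CLIENT_ID_CONFIG, "my-client");
> // Streams derives client ids from the prefix, e.g.
> // "my-client-StreamThread-1-consumer", "my-client-StreamThread-1-producer",
> // "my-client-StreamThread-1-restore-consumer".
> {code}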



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [DISCUSS] KIP-182: Reduce Streams DSL overloads and allow easier use of custom storage engines

2017-07-27 Thread Damian Guy
Updated link:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-182%3A+Reduce+Streams+DSL+overloads+and+allow+easier+use+of+custom+storage+engines

Thanks,
Damian

On Thu, 27 Jul 2017 at 13:09 Damian Guy <damian@gmail.com> wrote:

> Hi,
>
> I've put together a KIP to make some changes to the KafkaStreams DSL that
> will hopefully allow us to:
> 1) reduce the explosion of overloads
> 2) add new features without having to continue adding more overloads
> 3) provide simpler ways for people to use custom storage engines and wrap
> them with logging, caching etc if desired
> 4) enable per-operator caching rather than global caching without having
> to resort to supplying a StateStoreSupplier when you just want to turn
> caching off.
>
> The KIP is here:
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=73631309
>
> Thanks,
> Damian
>


[DISCUSS] KIP-182: Reduce Streams DSL overloads and allow easier use of custom storage engines

2017-07-27 Thread Damian Guy
Hi,

I've put together a KIP to make some changes to the KafkaStreams DSL that
will hopefully allow us to:
1) reduce the explosion of overloads
2) add new features without having to continue adding more overloads
3) provide simpler ways for people to use custom storage engines and wrap
them with logging, caching etc if desired
4) enable per-operator caching rather than global caching without having to
resort to supplying a StateStoreSupplier when you just want to turn caching
off.

The KIP is here:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=73631309

Thanks,
Damian


[jira] [Created] (KAFKA-5655) Add new API methods to KGroupedTable

2017-07-26 Thread Damian Guy (JIRA)
Damian Guy created KAFKA-5655:
-

 Summary: Add new API methods to KGroupedTable
 Key: KAFKA-5655
 URL: https://issues.apache.org/jira/browse/KAFKA-5655
 Project: Kafka
  Issue Type: Sub-task
Reporter: Damian Guy


Placeholder until API finalized



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5653) Add new API methods to KTable

2017-07-26 Thread Damian Guy (JIRA)
Damian Guy created KAFKA-5653:
-

 Summary: Add new API methods to KTable
 Key: KAFKA-5653
 URL: https://issues.apache.org/jira/browse/KAFKA-5653
 Project: Kafka
  Issue Type: Sub-task
Reporter: Damian Guy


Placeholder until API finalized



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5654) Add new API methods to KGroupedStream

2017-07-26 Thread Damian Guy (JIRA)
Damian Guy created KAFKA-5654:
-

 Summary: Add new API methods to KGroupedStream
 Key: KAFKA-5654
 URL: https://issues.apache.org/jira/browse/KAFKA-5654
 Project: Kafka
  Issue Type: Sub-task
Reporter: Damian Guy
Assignee: Damian Guy


Placeholder until API finalized



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5652) Add new api methods to KStream

2017-07-26 Thread Damian Guy (JIRA)
Damian Guy created KAFKA-5652:
-

 Summary: Add new api methods to KStream
 Key: KAFKA-5652
 URL: https://issues.apache.org/jira/browse/KAFKA-5652
 Project: Kafka
  Issue Type: Sub-task
Reporter: Damian Guy
Assignee: Damian Guy


Add new methods from KIP-182 to {{KStream}}.
Placeholder until finalized.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5651) KIP-182: Reduce Streams DSL overloads and allow easier use of custom storage engines

2017-07-26 Thread Damian Guy (JIRA)
Damian Guy created KAFKA-5651:
-

 Summary: KIP-182: Reduce Streams DSL overloads and allow easier 
use of custom storage engines
 Key: KAFKA-5651
 URL: https://issues.apache.org/jira/browse/KAFKA-5651
 Project: Kafka
  Issue Type: New Feature
Reporter: Damian Guy
Assignee: Damian Guy






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5650) Provide a simple way for custom storage engines to use streams wrapped stores (KIP-182)

2017-07-26 Thread Damian Guy (JIRA)
Damian Guy created KAFKA-5650:
-

 Summary: Provide a simple way for custom storage engines to use 
streams wrapped stores (KIP-182)
 Key: KAFKA-5650
 URL: https://issues.apache.org/jira/browse/KAFKA-5650
 Project: Kafka
  Issue Type: Bug
Reporter: Damian Guy
Assignee: Damian Guy


As per KIP-182:
A new interface will be added:
{code}
/**
 * Implementations of this will provide the ability to wrap a given StateStore
 * with or without caching/logging etc.
 */
public interface StateStoreBuilder<T extends StateStore> {
 
    StateStoreBuilder<T> withCachingEnabled();
    StateStoreBuilder<T> withCachingDisabled();
    StateStoreBuilder<T> withLoggingEnabled(Map<String, String> config);
    StateStoreBuilder<T> withLoggingDisabled();
    T build();
}
{code}

This interface will be used to wrap stores with caching, logging etc.
Additionally some convenience methods on the {{Stores}} class:

{code}
public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> persistentKeyValueStore(final String name,
                                                                                     final Serde<K> keySerde,
                                                                                     final Serde<V> valueSerde)
 
public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> inMemoryKeyValueStore(final String name,
                                                                                   final Serde<K> keySerde,
                                                                                   final Serde<V> valueSerde)
 
public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> lruMap(final String name,
                                                                    final int capacity,
                                                                    final Serde<K> keySerde,
                                                                    final Serde<V> valueSerde)
 
public static <K, V> StateStoreSupplier<WindowStore<K, V>> persistentWindowStore(final String name,
                                                                                 final Windows windows,
                                                                                 final Serde<K> keySerde,
                                                                                 final Serde<V> valueSerde)
 
public static <K, V> StateStoreSupplier<SessionStore<K, V>> persistentSessionStore(final String name,
                                                                                   final SessionWindows windows,
                                                                                   final Serde<K> keySerde,
                                                                                   final Serde<V> valueSerde)
 
/**
 * The following methods are for use with the PAPI. They allow building of
 * StateStores that can be wrapped with caching, logging, and any other
 * convenient wrappers provided by the KafkaStreams library.
 */ 
public <K, V> StateStoreBuilder<WindowStore<K, V>> windowStoreBuilder(final StateStoreSupplier<WindowStore<K, V>> supplier)
 
public <K, V> StateStoreBuilder<KeyValueStore<K, V>> keyValueStoreBuilder(final StateStoreSupplier<KeyValueStore<K, V>> supplier)
 
public <K, V> StateStoreBuilder<SessionStore<K, V>> sessionStoreBuilder(final StateStoreSupplier<SessionStore<K, V>> supplier)
{code}
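
A sketch of how these pieces might compose (illustrative only; the entry points 
and names were still under discussion in KIP-182):
{code}
StateStoreSupplier<KeyValueStore<String, Long>> supplier =
        Stores.persistentKeyValueStore("counts", Serdes.String(), Serdes.Long());

KeyValueStore<String, Long> store =
        stores.keyValueStoreBuilder(supplier)   // wrap the supplier per the draft above
              .withCachingEnabled()
              .withLoggingEnabled(Collections.singletonMap("cleanup.policy", "compact"))
              .build();
{code}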



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [VOTE] KIP-167 (Addendum): Add interface for the state store restoration process

2017-07-26 Thread Damian Guy
+1

On Tue, 25 Jul 2017 at 18:17 Sriram Subramanian  wrote:

> +1
>
> On Fri, Jul 21, 2017 at 12:08 PM, Guozhang Wang 
> wrote:
>
> > +1
> >
> > On Thu, Jul 20, 2017 at 11:00 PM, Matthias J. Sax  >
> > wrote:
> >
> > > +1
> > >
> > > On 7/20/17 4:22 AM, Bill Bejeck wrote:
> > > > Hi,
> > > >
> > > > After working on the PR for this KIP I discovered that we need to add an
> > > > additional parameter (TopicPartition) to the StateRestoreListener
> > > interface
> > > > methods.
> > > >
> > > > The addition of the TopicPartition is required as the
> > > StateRestoreListener
> > > > is for the entire application, thus all tasks with recovering state
> > > stores
> > > > call the same listener instance.  The TopicPartition is needed to
> > > > disambiguate the progress of the state store recovery.
> > > >
> > > > For those that have voted before, please review the updated KIP
> > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-167:+Add+interface+for+the+state+store+restoration+process>
> > > > and
> > > > re-vote.
> > > >
> > > > Thanks,
> > > > Bill
> > > >
> > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


Re: [VOTE]: KIP-149: Enabling key access in ValueTransformer, ValueMapper, and ValueJoiner

2017-07-21 Thread Damian Guy
Hi Jeyhun,

Feel free to close the vote. It has been accepted.

Thanks,
Damian

On Mon, 17 Jul 2017 at 06:18 Guozhang Wang <wangg...@gmail.com> wrote:

> +1. Thanks!
>
> On Sat, Jul 8, 2017 at 1:35 AM, Damian Guy <damian@gmail.com> wrote:
>
> > +1
> > On Fri, 7 Jul 2017 at 16:08, Eno Thereska <eno.there...@gmail.com>
> wrote:
> >
> > > +1 (non-binding) Thanks.
> > >
> > > Eno
> > > > On 6 Jul 2017, at 21:49, Gwen Shapira <g...@confluent.io> wrote:
> > > >
> > > > +1
> > > >
> > > > On Wed, Jul 5, 2017 at 9:25 AM Matthias J. Sax <
> matth...@confluent.io>
> > > > wrote:
> > > >
> > > >> +1
> > > >>
> > > >> On 6/27/17 1:41 PM, Jeyhun Karimov wrote:
> > > >>> Dear all,
> > > >>>
> > > >>> I would like to start the vote on KIP-149 [1].
> > > >>>
> > > >>>
> > > >>> Cheers,
> > > >>> Jeyhun
> > > >>>
> > > >>>
> > > >>> [1]
> > > >>>
> > > >>
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 149%3A+Enabling+key+access+in+ValueTransformer%2C+
> > ValueMapper%2C+and+ValueJoiner
> > > >>>
> > > >>
> > > >>
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>


[jira] [Resolved] (KAFKA-3741) Allow setting of default topic configs via StreamsConfig

2017-07-21 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy resolved KAFKA-3741.
---
   Resolution: Fixed
Fix Version/s: 0.11.1.0

Issue resolved by pull request 3459
[https://github.com/apache/kafka/pull/3459]

> Allow setting of default topic configs via StreamsConfig
> 
>
> Key: KAFKA-3741
> URL: https://issues.apache.org/jira/browse/KAFKA-3741
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Roger Hoover
>Assignee: Damian Guy
>  Labels: api
> Fix For: 0.11.1.0
>
>
> Kafka Streams currently allows you to specify a replication factor for 
> changelog and repartition topics that it creates.  It should also allow you 
> to specify any other TopicConfig. These should be used as defaults when 
> creating internal topics. The defaults should be overridden by any configs 
> provided by the StateStoreSuppliers etc.
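> With the topic-prefix approach from KIP-173 (voted on elsewhere in this 
> digest), this could look like the following (illustrative config values):
> {code}
> Properties props = new Properties();
> props.put(StreamsConfig.topicPrefix("cleanup.policy"), "compact");
> props.put(StreamsConfig.topicPrefix("segment.bytes"), "536870912");
> {code}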



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [VOTE] KIP-173: Add prefix to StreamsConfig to enable setting default internal topic configs

2017-07-21 Thread Damian Guy
Sorry, I mentioned Guozhang twice in the vote.

Actual votes:
3 binding (Guozhang, Damian, Ismael)
2 non-binding (Eno, Matthias)

Thanks,
Damian

On Fri, 21 Jul 2017 at 10:00 Damian Guy <damian@gmail.com> wrote:

> Hi,
> The Vote for this KIP is now closed.
> KIP-173 has been accepted with
> 3 binding (Guozhang, Damian, Ismael)
> 2 non-binding (Eno, Guozhang)
>
> Thanks,
> Damian
>
> On Thu, 20 Jul 2017 at 14:29 Ismael Juma <ism...@juma.me.uk> wrote:
>
>> Yes, I know it is a separate thread in the archives, it's just that
>> Gmail's
>> algorithm decided to place it under the same thread. This is a recurring
>> problem and it affects discoverability of vote threads for Gmail users (I
>> assume there are many such users).
>>
>> Ismael
>>
>> On Thu, Jul 20, 2017 at 1:32 AM, Damian Guy <damian@gmail.com> wrote:
>>
>> > Thanks Ismael, AFAICT it is already a separate thread:
>> > http://mail-archives.apache.org/mod_mbox/kafka-dev/201707.mbox/%
>> > 3ccajiktexrb1hbojrm7tmy1hdm_+t+psrz_6pawhnlbokp46o...@mail.gmail.com%3e
>> >
>> > On Wed, 19 Jul 2017 at 23:24 Guozhang Wang <wangg...@gmail.com> wrote:
>> >
>> > > Per 1. I suggested exposing the constant since we are doing so for
>> > consumer
>> > > and producer configs prefix as well (CONSUMER_PREFIX, etc).
>> > >
>> > > Guozhang
>> > >
>> > > On Wed, Jul 19, 2017 at 6:01 AM, Ismael Juma <ism...@juma.me.uk>
>> wrote:
>> > >
>> > > > Thanks for the KIP, Damian. +1 (binding). A couple of minor
>> comments:
>> > > >
>> > > > 1. Do we need to expose the TOPIC_PREFIX constant?
>> > > > 2. The vote thread ended up inside the discuss thread in Gmail. It
>> may
>> > be
>> > > > worth sending another email to make it clear that the vote is
>> ongoing.
>> > > You
>> > > > can link back to this thread so that the existing votes are still
>> > > counted.
>> > > >
>> > > > Ismael
>> > > >
>> > > > On Mon, Jul 17, 2017 at 4:43 AM, Damian Guy <damian@gmail.com>
>> > > wrote:
>> > > >
>> > > > > Hi,
>> > > > >
>> > > > > I'd like to kick off the vote for KIP-173:
>> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> > > > > 173%3A+Add+prefix+to+StreamsConfig+to+enable+
>> > setting+default+internal+
>> > > > > topic+configs
>> > > > >
>> > > > > A PR for this can be found here: https://github.com/apache/
>> > > > kafka/pull/3459
>> > > > >
>> > > > > Thanks,
>> > > > > Damian
>> > > > >
>> > > >
>> > >
>> > >
>> > >
>> > > --
>> > > -- Guozhang
>> > >
>> >
>>
>


Re: [VOTE] KIP-173: Add prefix to StreamsConfig to enable setting default internal topic configs

2017-07-21 Thread Damian Guy
Hi,
The Vote for this KIP is now closed.
KIP-173 has been accepted with
3 binding (Guozhang, Damian, Ismael)
2 non-binding (Eno, Gouzhang)

Thanks,
Damian

On Thu, 20 Jul 2017 at 14:29 Ismael Juma <ism...@juma.me.uk> wrote:

> Yes, I know it is a separate thread in the archives, it's just that Gmail's
> algorithm decided to place it under the same thread. This is a recurring
> problem and it affects discoverability of vote threads for Gmail users (I
> assume there are many such users).
>
> Ismael
>
> On Thu, Jul 20, 2017 at 1:32 AM, Damian Guy <damian@gmail.com> wrote:
>
> > Thanks Ismael, AFAICT it is already a separate thread:
> > http://mail-archives.apache.org/mod_mbox/kafka-dev/201707.mbox/%
> > 3ccajiktexrb1hbojrm7tmy1hdm_+t+psrz_6pawhnlbokp46o...@mail.gmail.com%3e
> >
> > On Wed, 19 Jul 2017 at 23:24 Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Per 1. I suggested exposing the constant since we are doing so for
> > consumer
> > > and producer configs prefix as well (CONSUMER_PREFIX, etc).
> > >
> > > Guozhang
> > >
> > > On Wed, Jul 19, 2017 at 6:01 AM, Ismael Juma <ism...@juma.me.uk>
> wrote:
> > >
> > > > Thanks for the KIP, Damian. +1 (binding). A couple of minor comments:
> > > >
> > > > 1. Do we need to expose the TOPIC_PREFIX constant?
> > > > 2. The vote thread ended up inside the discuss thread in Gmail. It
> may
> > be
> > > > worth sending another email to make it clear that the vote is
> ongoing.
> > > You
> > > > can link back to this thread so that the existing votes are still
> > > counted.
> > > >
> > > > Ismael
> > > >
> > > > On Mon, Jul 17, 2017 at 4:43 AM, Damian Guy <damian@gmail.com>
> > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I'd like to kick off the vote for KIP-173:
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > 173%3A+Add+prefix+to+StreamsConfig+to+enable+
> > setting+default+internal+
> > > > > topic+configs
> > > > >
> > > > > A PR for this can be found here: https://github.com/apache/
> > > > kafka/pull/3459
> > > > >
> > > > > Thanks,
> > > > > Damian
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>


Re: [DISCUSS] 2017 October release planning and release version

2017-07-20 Thread Damian Guy
+1 on 1.0!
Are we also going to move to java 8?
I also think we should drop the Unstable annotations completely.

Cheers,
Damian

On Wed, 19 Jul 2017 at 21:36 Guozhang Wang  wrote:

> Hi Stevo,
>
> Just trying to add to what Ismael has already replied you:
>
>
> > Practices/"features" like making the protocol version a parameter that
> > defaults to latest, so that it is silently updated along with a dependency
> > update that introduces new protocol/behavior, should not be used in public
> > client APIs. To switch between backward-incompatible APIs (contracts and
> > behaviors), the user should ideally have to change code explicitly, not
> > just a dependency; at the very least it should be clearly communicated
> > that breaking changes are to be expected even from a dependency update
> > alone, e.g. by giving major version releases a clear meaning. If an app's
> > dependency on the Kafka client library is updated to a new minor.patch on
> > the same major, and there is a change in behavior or API requiring app
> > code changes - it's a bug.
> >
> > A change introduced contrary to the SLO is OK to report as a bug.
> > Everything else is an improvement or a feature request.
> >
> > If this were the case, and 1.0.0 were released today with the APIs as they
> > are now, the Scala client APIs, even though deprecated, would not break and
> > require refactoring with every 1.* minor/patch release; they would only be
> > allowed to be broken or removed in a future major release, like 2.0.0
>
> Just to clarify, my proposal is that moving forward beyond the next release
> we will not make any breaking changes to public APIs in any major or minor
> release, but will only mark them as "deprecated", and deprecated public APIs
> will only be considered for removal as early as the next major release: so
> if we mark the Scala consumer APIs as deprecated in 1.0.0, we would only
> consider removing them at 2.0.0 or even later.
>
> > It should also be clear how long each version is supported - e.g. if
> > minor.patch meant no backward-incompatible changes, it would be OK to file
> > a bug only against the current major.minor.patch; the previous major and
> > its last minor.patch could only get patch releases for some period, say
> > 1 to 3 months.
>
> In practice we have never done, for example, a bugfix release on an older
> major/minor release: i.e. once we released, say, 0.10.2.0, we no longer
> released 0.10.1.2. So practically speaking we do not yet have a "support
> period" for older versions, and in the coming release I do not plan to
> propose a concrete policy for that matter.
>
>
> Guozhang
>
>
>
> On Wed, Jul 19, 2017 at 2:12 AM, Ismael Juma  wrote:
>
> > Hi Stevo,
> >
> > Thanks for your feedback. We should definitely do a better job of
> > documenting things. We basically follow semantic versioning, but it's
> > currently a bit confusing because:
> >
> > 1. There are 4 segments in the version. The "0." part should be ignored
> > when deciding what is major, minor and patch at the moment, but many
> people
> > don't know this. Once we move to 1.0.0, that problem goes away.
> >
> > 2. To know what is a public API, you must check the Javadoc (
> > https://kafka.apache.org/0110/javadoc/index.html?org/apache/
> > kafka/clients/consumer/KafkaConsumer.html).
> > If it's not listed there, it's not public API. Ideally, it would be
> obvious
> > from the package name (i.e. there would be "internals" in the name), but
> we
> > are not there yet. The exceptions are the old Scala APIs, but they have
> all
> > been deprecated and they will be removed eventually (the old Scala
> > consumers won't be removed until the June 2018 release at the earliest in
> > order to give people time to migrate).
> >
> > 3. Even though we are following reasonably common practices, we haven't
> > documented them all in one place. It would be great to do it during the
> > next release cycle.
> >
> > A few comments below.
> >
> > On Wed, Jul 19, 2017 at 1:31 AM, Stevo Slavić  wrote:
> >
> > > - APIs not labeled or labeled as stable
> > > -- a change in major version is the only one that can break backward
> > > compatibility (client APIs or behavior)
> > >
> >
> To clarify, stable APIs should not be changed in an incompatible way
> without a deprecation cycle, independently of whether it's a major release
> or not.
> >
> >
> > > -- a change in minor version can introduce new features, but not break
> > > backward compatibility
> > > -- a change in patch version is for bug fixes only.
> > >
> >
> > Right, this has been the case for a while already. Also see annotations
> > below.
> >
> >
> > > - APIs labeled as evolving can be broken in a backward-incompatible way in
> > > any release, but are assumed less likely to be broken than unstable APIs
> > > - APIs labeled as unstable can be broken in a backward-incompatible way in
> > > any release, major, minor or patch
> > >
> >
> > The relevant 
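The message is truncated here. For reference, the stability labels under
discussion are expressed as annotations in the Kafka codebase; below is a
minimal sketch of how one is applied (the annotated class itself is
hypothetical):

{code}
import org.apache.kafka.common.annotation.InterfaceStability;

// Evolving: may break compatibility at minor releases. Per the discussion
// above, Stable APIs should not change incompatibly without a deprecation
// cycle, while Unstable APIs may break at any release.
@InterfaceStability.Evolving
public class ExampleClientApi {
}
{code}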

Re: [VOTE] KIP-173: Add prefix to StreamsConfig to enable setting default internal topic configs

2017-07-20 Thread Damian Guy
Thanks Ismael, AFAICT it is already a separate thread:
http://mail-archives.apache.org/mod_mbox/kafka-dev/201707.mbox/%3ccajiktexrb1hbojrm7tmy1hdm_+t+psrz_6pawhnlbokp46o...@mail.gmail.com%3e

On Wed, 19 Jul 2017 at 23:24 Guozhang Wang <wangg...@gmail.com> wrote:

> Per 1. I suggested exposing the constant since we are doing so for consumer
> and producer configs prefix as well (CONSUMER_PREFIX, etc).
>
> Guozhang
>
> On Wed, Jul 19, 2017 at 6:01 AM, Ismael Juma <ism...@juma.me.uk> wrote:
>
> > Thanks for the KIP, Damian. +1 (binding). A couple of minor comments:
> >
> > 1. Do we need to expose the TOPIC_PREFIX constant?
> > 2. The vote thread ended up inside the discuss thread in Gmail. It may be
> > worth sending another email to make it clear that the vote is ongoing.
> You
> > can link back to this thread so that the existing votes are still
> counted.
> >
> > Ismael
> >
> > On Mon, Jul 17, 2017 at 4:43 AM, Damian Guy <damian@gmail.com>
> wrote:
> >
> > > Hi,
> > >
> > > I'd like to kick off the vote for KIP-173:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 173%3A+Add+prefix+to+StreamsConfig+to+enable+setting+default+internal+
> > > topic+configs
> > >
> > > A PR for this can be found here: https://github.com/apache/
> > kafka/pull/3459
> > >
> > > Thanks,
> > > Damian
> > >
> >
>
>
>
> --
> -- Guozhang
>


Re: KIP-162: Enable topic deletion by default

2017-07-19 Thread Damian Guy
+1

On Wed, 31 May 2017 at 13:36 Jim Jagielski  wrote:

> +1
> > On May 27, 2017, at 9:27 PM, Vahid S Hashemian <
> vahidhashem...@us.ibm.com> wrote:
> >
> > Sure, that sounds good.
> >
> > I suggested that to keep command line behavior consistent.
> > Plus, removal of ACL access is something that can be easily undone, but
> > topic deletion is not reversible.
> > So, perhaps a new follow-up JIRA to this KIP to add the confirmation for
> > topic deletion.
> >
> > Thanks.
> > --Vahid
> >
> >
> >
> > From:   Gwen Shapira 
> > To: dev@kafka.apache.org, us...@kafka.apache.org
> > Date:   05/27/2017 11:04 AM
> > Subject:Re: KIP-162: Enable topic deletion by default
> >
> >
> >
> > Thanks Vahid,
> >
> > Do you mind if we leave the command-line out of scope for this?
> >
> > I can see why adding confirmations, options to bypass confirmations, etc
> > would be an improvement. However, I've seen no complaints about the
> > current
> > behavior of the command-line and the KIP doesn't change it at all. So I'd
> > rather address things separately.
> >
> > Gwen
> >
> > On Fri, May 26, 2017 at 8:10 PM Vahid S Hashemian
> > 
> > wrote:
> >
> >> Gwen, thanks for the KIP.
> >> It looks good to me.
> >>
> >> Just a minor suggestion: It would be great if the command asks for a
> >> confirmation (y/n) before deleting the topic (similar to how removing
> > ACLs
> >> works).
> >>
> >> Thanks.
> >> --Vahid
> >>
> >>
> >>
> >> From:   Gwen Shapira 
> >> To: "dev@kafka.apache.org" , Users
> >> 
> >> Date:   05/26/2017 07:04 AM
> >> Subject:KIP-162: Enable topic deletion by default
> >>
> >>
> >>
> >> Hi Kafka developers, users and friends,
> >>
> >> I've added a KIP to improve our out-of-the-box usability a bit:
> >> KIP-162: Enable topic deletion by default:
> >>
> >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-162+-+Enable+topic+deletion+by+default
> >
> >>
> >>
> >> Pretty simple :) Discussion and feedback are welcome.
> >>
> >> Gwen
> >>
> >>
> >>
> >>
> >>
> >
> >
> >
> >
>
>
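
For context, the broker setting this KIP is about. Before the change,
operators opt in explicitly in server.properties; the KIP proposes making
this value the default:

{code}
# server.properties
delete.topic.enable=true
{code}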


Re: [VOTE] KIP-162: Enable topic deletion by default

2017-07-19 Thread Damian Guy
+1

On Tue, 6 Jun 2017 at 18:19 BigData dev  wrote:

> +1 (non-binding)
>
> Thanks,
> Bharat
>
> On Tue, Jun 6, 2017 at 9:21 AM, Ashwin Sinha 
> wrote:
>
> > +1
> >
> > On Tue, Jun 6, 2017 at 11:20 PM, Mickael Maison <
> mickael.mai...@gmail.com>
> > wrote:
> >
> > > +1 (non binding), thanks
> > >
> > > On Tue, Jun 6, 2017 at 2:16 PM, Bill Bejeck  wrote:
> > > > +1
> > > >
> > > > -Bill
> > > >
> > > > On Tue, Jun 6, 2017 at 9:08 AM, Ismael Juma 
> wrote:
> > > >
> > > >> Thanks for the KIP, Gwen. +1 (binding).
> > > >>
> > > >> Ismael
> > > >>
> > > >> On Tue, Jun 6, 2017 at 5:37 AM, Gwen Shapira 
> > wrote:
> > > >>
> > > >> > Hi,
> > > >> >
> > > >> > The discussion has been quite positive, so I posted a JIRA, a PR
> and
> > > >> > updated the KIP with the latest decisions.
> > > >> >
> > > >> > Lets officially vote on the KIP:
> > > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > >> > 162+-+Enable+topic+deletion+by+default
> > > >> >
> > > >> > JIRA is here: https://issues.apache.org/jira/browse/KAFKA-5384
> > > >> >
> > > >> > Gwen
> > > >> >
> > > >>
> > >
> >
> >
> >
> > --
> > Thanks and Regards,
> > Ashwin
> >
>


[VOTE] KIP-173: Add prefix to StreamsConfig to enable setting default internal topic configs

2017-07-17 Thread Damian Guy
Hi,

I'd like to kick off the vote for KIP-173:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-173%3A+Add+prefix+to+StreamsConfig+to+enable+setting+default+internal+topic+configs

A PR for this can be found here: https://github.com/apache/kafka/pull/3459

Thanks,
Damian


Re: [DISCUSS] KIP-173: Add prefix to StreamsConfig to enable setting default internal topic configs

2017-07-17 Thread Damian Guy
I'm going to leave it as is for now. We can do another KIP later if it is
deemed necessary - which I'm not yet convinced it is.

On Sun, 16 Jul 2017 at 04:39 Matthias J. Sax <matth...@confluent.io> wrote:

> We can use the `admin.` prefix for the current `StreamsKafkaClient`,
> too. As switching to AdminClient does not require a KIP, we can save one
> KIP.
>
> But I am fine either way if you want to exclude it.
>
> -Matthias
>
> On 7/15/17 1:59 PM, Damian Guy wrote:
> > Thanks Matthias. I'll have a look into it. Though I guess we are planning
> > on migrating to the Kafka AdminClient, so it may be worth leaving it
> until
> > then. Not sure
> >
> > On Tue, 11 Jul 2017 at 09:34 Matthias J. Sax <matth...@confluent.io>
> wrote:
> >
> >> I think it might be helpful to add an "admin." prefix, too, for configs
> >> for StreamsKafkaClient and later AdminClient.
> >>
> >> Can we piggyback this on this KIP, or should we do a new KIP?
> >>
> >>
> >> -Matthias
> >>
> >> On 7/10/17 6:45 PM, Guozhang Wang wrote:
> >>> Thanks Damian. LGTM.
> >>>
> >>> Guozhang
> >>>
> >>> On Mon, Jul 10, 2017 at 1:35 PM, Damian Guy <damian@gmail.com>
> >> wrote:
> >>>
> >>>> Thanks Guozhang, i added a couple of example usages to the KIP
> >>>>
> >>>> On Fri, 30 Jun 2017 at 17:06 Guozhang Wang <wangg...@gmail.com>
> wrote:
> >>>>
> >>>>> Thanks for the proposal Damian.
> >>>>>
> >>>>> The PR looks promising to me. One minor comment for the wiki page is
> >> that
> >>>>> you can add some example on how to call the proposed interface (e.g.
> >> the
> >>>>> ones you added in unit test in the PR).
> >>>>>
> >>>>> Another question is that this variable will not be included in the
> web
> >>>> docs
> >>>>> in `toHtmlTable` unfortunately, but I do not have a better approach
> >>>> either.
> >>>>> So maybe we can do no better than manually change the web docs for
> >>>>> educating users.
> >>>>>
> >>>>>
> >>>>> Guozhang
> >>>>>
> >>>>>
> >>>>> On Fri, Jun 30, 2017 at 4:18 AM, Damian Guy <damian@gmail.com>
> >>>> wrote:
> >>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> I've put together what will hopefully be a not too contentious KIP
> to
> >>>>>> enable the setting of default configs for streams internal topics
> via
> >>>>>> StreamsConfig.
> >>>>>>
> >>>>>> You can find the KIP here:
> >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >>>>>>
> 173%3A+Add+prefix+to+StreamsConfig+to+enable+setting+default+internal+
> >>>>>> topic+configs
> >>>>>> There is a PR here: https://github.com/apache/kafka/pull/3459
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Damian
> >>>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> --
> >>>>> -- Guozhang
> >>>>>
> >>>>
> >>>
> >>>
> >>>
> >>
> >>
> >
>
>


Re: [DISCUSS] KIP-173: Add prefix to StreamsConfig to enable setting default internal topic configs

2017-07-15 Thread Damian Guy
Thanks Matthias. I'll have a look into it. Though I guess we are planning
on migrating to the Kafka AdminClient, so it may be worth leaving it until
then. Not sure.

On Tue, 11 Jul 2017 at 09:34 Matthias J. Sax <matth...@confluent.io> wrote:

> I think it might be helpful to add an "admin." prefix, too, for configs
> for StreamsKafkaClient and later AdminClient.
>
> Can we piggyback this on this KIP, or should we do a new KIP?
>
>
> -Matthias
>
> On 7/10/17 6:45 PM, Guozhang Wang wrote:
> > Thanks Damian. LGTM.
> >
> > Guozhang
> >
> > On Mon, Jul 10, 2017 at 1:35 PM, Damian Guy <damian@gmail.com>
> wrote:
> >
> >> Thanks Guozhang, i added a couple of example usages to the KIP
> >>
> >> On Fri, 30 Jun 2017 at 17:06 Guozhang Wang <wangg...@gmail.com> wrote:
> >>
> >>> Thanks for the proposal Damian.
> >>>
> >>> The PR looks promising to me. One minor comment for the wiki page is
> that
> >>> you can add some example on how to call the proposed interface (e.g.
> the
> >>> ones you added in unit test in the PR).
> >>>
> >>> Another question is that this variable will not be included in the web
> >> docs
> >>> in `toHtmlTable` unfortunately, but I do not have a better approach
> >> either.
> >>> So maybe we can do no better than manually change the web docs for
> >>> educating users.
> >>>
> >>>
> >>> Guozhang
> >>>
> >>>
> >>> On Fri, Jun 30, 2017 at 4:18 AM, Damian Guy <damian@gmail.com>
> >> wrote:
> >>>
> >>>> Hi,
> >>>>
> >>>> I've put together what will hopefully be a not too contentious KIP to
> >>>> enable the setting of default configs for streams internal topics via
> >>>> StreamsConfig.
> >>>>
> >>>> You can find the KIP here:
> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >>>> 173%3A+Add+prefix+to+StreamsConfig+to+enable+setting+default+internal+
> >>>> topic+configs
> >>>> There is a PR here: https://github.com/apache/kafka/pull/3459
> >>>>
> >>>> Thanks,
> >>>> Damian
> >>>>
> >>>
> >>>
> >>>
> >>> --
> >>> -- Guozhang
> >>>
> >>
> >
> >
> >
>
>


Re: [VOTE] KIP-167: Add interface for the state store restoration process

2017-07-15 Thread Damian Guy
+1

On Thu, 13 Jul 2017 at 07:13 Eno Thereska  wrote:

> +1 (non-binding).
>
> Thanks Bill.
>
> Eno
> > On 12 Jul 2017, at 09:12, Bill Bejeck  wrote:
> >
> > All,
> >
> > Now that we've concluded a second round of discussion on KIP-167, I'd
> like
> > to start a vote.
> >
> >
> > Thanks,
> > Bill
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-167%3A+Add+interface+for+the+state+store+restoration+process
>
>
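
For readers following along, a sketch of the kind of listener KIP-167
proposes. The interface and method names follow the KIP wiki; treat the
exact signatures as illustrative rather than the released API:

{code}
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

// Logs progress of state store restoration.
public class LoggingRestoreListener implements StateRestoreListener {

    @Override
    public void onRestoreStart(final TopicPartition partition, final String storeName,
                               final long startingOffset, final long endingOffset) {
        System.out.printf("Restoring %s (%s), offsets %d to %d%n",
                storeName, partition, startingOffset, endingOffset);
    }

    @Override
    public void onBatchRestored(final TopicPartition partition, final String storeName,
                                final long batchEndOffset, final long numRestored) {
        System.out.printf("Restored a batch of %d records for %s, now at offset %d%n",
                numRestored, storeName, batchEndOffset);
    }

    @Override
    public void onRestoreEnd(final TopicPartition partition, final String storeName,
                             final long totalRestored) {
        System.out.printf("Finished restoring %s: %d records%n", storeName, totalRestored);
    }
}
{code}

The KIP also discusses registering such a listener on the KafkaStreams
instance (a setter along the lines of setGlobalStateRestoreListener).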


Re: [ANNOUNCE] New Kafka PMC member Jason Gustafson

2017-07-12 Thread Damian Guy
Congratulations Jason

On Wed, 12 Jul 2017 at 08:57 Sriram Subramanian  wrote:

> Congratulations Jason!
>
> On Wed, Jul 12, 2017 at 8:02 AM, Rajini Sivaram 
> wrote:
>
> > Congratulations, Jason!
> >
> > On Wed, Jul 12, 2017 at 3:53 PM, Ismael Juma  wrote:
> >
> > > Congratulations Jason!
> > >
> > > Ismael
> > >
> > > On Tue, Jul 11, 2017 at 10:32 PM, Guozhang Wang 
> > > wrote:
> > >
> > > > Hi Everyone,
> > > >
> > > > Jason Gustafson has been very active in contributing to the Kafka
> > > community
> > > > since he became a Kafka committer last September and has done lots of
> > > > significant work including the most recent exactly-once project. In
> > > > addition, Jason has initiated or participated in the design
> discussion
> > of
> > > > more than 30 KIPs in which he has consistently brought in great
> > judgement
> > > > and insights throughout his communication. I am glad to announce that
> > > Jason
> > > > has now become a PMC member of the project.
> > > >
> > > > Congratulations, Jason!
> > > >
> > > > -- Guozhang
> > > >
> > >
> >
>


Re: [DISCUSS] KIP-173: Add prefix to StreamsConfig to enable setting default internal topic configs

2017-07-10 Thread Damian Guy
Thanks Guozhang, I added a couple of example usages to the KIP

On Fri, 30 Jun 2017 at 17:06 Guozhang Wang <wangg...@gmail.com> wrote:

> Thanks for the proposal Damian.
>
> The PR looks promising to me. One minor comment for the wiki page is that
> you could add some examples of how to call the proposed interface (e.g. the
> ones you added in unit tests in the PR).
>
> Another question is that this variable will not be included in the web docs
> in `toHtmlTable` unfortunately, but I do not have a better approach either.
> So maybe we can do no better than manually changing the web docs to
> educate users.
>
>
> Guozhang
>
>
> On Fri, Jun 30, 2017 at 4:18 AM, Damian Guy <damian@gmail.com> wrote:
>
> > Hi,
> >
> > I've put together what will hopefully be a not too contentious KIP to
> > enable the setting of default configs for streams internal topics via
> > StreamsConfig.
> >
> > You can find the KIP here:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 173%3A+Add+prefix+to+StreamsConfig+to+enable+setting+default+internal+
> > topic+configs
> > There is a PR here: https://github.com/apache/kafka/pull/3459
> >
> > Thanks,
> > Damian
> >
>
>
>
> --
> -- Guozhang
>
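
A sketch of the example usages mentioned above, based on the KIP: defaults
for Streams-created internal topics are supplied through the "topic."
prefix, with StreamsConfig.topicPrefix(...) as the helper the KIP proposes.
The particular topic configs chosen here are just examples:

{code}
import java.util.Properties;

import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

public class TopicPrefixExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Equivalent to setting "topic.segment.bytes" and "topic.cleanup.policy";
        // any configs provided by StateStoreSuppliers would override these defaults.
        props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), 64 * 1024 * 1024);
        props.put(StreamsConfig.topicPrefix(TopicConfig.CLEANUP_POLICY_CONFIG),
                TopicConfig.CLEANUP_POLICY_COMPACT);
    }
}
{code}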


[jira] [Resolved] (KAFKA-5157) Options for handling corrupt data during deserialization

2017-07-10 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy resolved KAFKA-5157.
---
Resolution: Fixed

Issue resolved by pull request 3423
[https://github.com/apache/kafka/pull/3423]

> Options for handling corrupt data during deserialization
> 
>
> Key: KAFKA-5157
> URL: https://issues.apache.org/jira/browse/KAFKA-5157
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Eno Thereska
>Assignee: Eno Thereska
>  Labels: user-experience
> Fix For: 0.11.1.0
>
>
> When there is badly formatted data in the source topics, deserialization will 
> throw a runtime exception all the way up to the users. And since deserialization 
> happens at the very beginning of the topology, before the record is ever 
> processed, today there is no way to handle such errors at the user-app level.
> We should consider allowing users to handle such "poison pills" in a 
> customizable way.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
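
The resolution (KIP-161) makes this handling pluggable. Below is a sketch of
a handler that logs and skips poison pills; the interface and package names
follow the Streams API introduced by the fix, but treat the details as
illustrative:

{code}
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

// Logs the corrupt record and tells Streams to continue processing.
public class LogAndSkipHandler implements DeserializationExceptionHandler {

    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        System.err.printf("Skipping corrupt record at %s-%d, offset %d: %s%n",
                record.topic(), record.partition(), record.offset(), exception);
        return DeserializationHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // no configuration needed for this sketch
    }
}
{code}

Such a handler would be wired in via the default deserialization exception
handler config on StreamsConfig; returning FAIL instead preserves the old
fail-fast behavior.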


[jira] [Created] (KAFKA-5578) Streams Task Assignor should consider the staleness of state directories when allocating tasks

2017-07-10 Thread Damian Guy (JIRA)
Damian Guy created KAFKA-5578:
-

 Summary: Streams Task Assignor should consider the staleness of 
state directories when allocating tasks
 Key: KAFKA-5578
 URL: https://issues.apache.org/jira/browse/KAFKA-5578
 Project: Kafka
  Issue Type: Bug
Reporter: Damian Guy


During task assignment we use the presence of a state directory to decide 
which instances should take precedence for a given task. We first choose 
previous active tasks, then fall back to the existence of a state dir. 
Unfortunately we don't take into account the recency of the data in the 
available state dirs. So in the case where a task has run on many instances, 
we may choose an instance that has relatively old data.

When doing task assignment we should take the age of the data in the state 
dirs into consideration. We could use the data from the checkpoint files to 
determine which instance is most up-to-date and attempt to assign accordingly 
(obviously making sure that tasks are still balanced across the available 
instances).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
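
A hypothetical sketch of the idea in this ticket: rank the candidate
instances by their checkpointed offsets for the task's stores and prefer the
freshest. All names here are illustrative; this is not the actual assignor
code:

{code}
import java.util.Map;

public final class StalenessRanking {

    // Given each candidate instance's checkpointed offset sum for a task's
    // state stores, return the instance whose local state is most up-to-date.
    public static String mostUpToDateInstance(final Map<String, Long> checkpointedOffsets) {
        String best = null;
        long bestOffset = -1L;
        for (final Map.Entry<String, Long> entry : checkpointedOffsets.entrySet()) {
            if (entry.getValue() > bestOffset) {
                bestOffset = entry.getValue();
                best = entry.getKey();
            }
        }
        return best; // still subject to balancing tasks across instances
    }
}
{code}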


Re: [VOTE]: KIP-149: Enabling key access in ValueTransformer, ValueMapper, and ValueJoiner

2017-07-08 Thread Damian Guy
+1
On Fri, 7 Jul 2017 at 16:08, Eno Thereska  wrote:

> +1 (non-binding) Thanks.
>
> Eno
> > On 6 Jul 2017, at 21:49, Gwen Shapira  wrote:
> >
> > +1
> >
> > On Wed, Jul 5, 2017 at 9:25 AM Matthias J. Sax 
> > wrote:
> >
> >> +1
> >>
> >> On 6/27/17 1:41 PM, Jeyhun Karimov wrote:
> >>> Dear all,
> >>>
> >>> I would like to start the vote on KIP-149 [1].
> >>>
> >>>
> >>> Cheers,
> >>> Jeyhun
> >>>
> >>>
> >>> [1]
> >>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner
> >>>
> >>
> >>
>
>
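
For reference, the shape of the key-access interfaces KIP-149 proposes: the
key is passed alongside the value and must be treated as read-only. Names
follow the KIP; the released API may differ:

{code}
// Proposed *WithKey variant (sketch); the key must not be mutated.
public interface ValueMapperWithKey<K, V, VR> {
    VR apply(final K readOnlyKey, final V value);
}

// Example: ValueMapperWithKey<String, Long, String> mapper =
//         (key, count) -> key + ":" + count;
{code}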


Re: Possible deadlock during shutdown in setState in kafka streams 10.2

2017-07-08 Thread Damian Guy
Thanks
On Fri, 7 Jul 2017 at 18:20, Greg Fodor <gfo...@gmail.com> wrote:

> Sure thing: https://issues.apache.org/jira/browse/KAFKA-5571
>
> On Fri, Jul 7, 2017 at 2:59 AM, Damian Guy <damian@gmail.com> wrote:
>
> > Hi Greg,
> >
> > Would you mind creating a JIRA for this with the thread dump ( i don't
> see
> > it attached to your message).
> >
> > Thanks,
> > Damian
> >
> > On Fri, 7 Jul 2017 at 10:36 Greg Fodor <gfo...@gmail.com> wrote:
> >
> > > I'm running a 10.2 job across 5 nodes with 32 stream threads on each node
> > > and find that when we gracefully shut down all of them at once via an
> > > ansible script, some of the nodes end up freezing -- at a glance the attached
> > > thread dump implies a deadlock between stream threads trying to update
> > > their state via setState. We haven't had this problem before but it may or
> > > may not be related to changes in 10.2 (we are upgrading from 10.0 to 10.2)
> > >
> > > When we gracefully shut down all nodes simultaneously, what typically
> > > happens is that some subset of the nodes end up not shutting down completely
> > > but go through a rebalance first. It seems this deadlock requires the
> > > rebalancing to occur simultaneously with the graceful shutdown. If we
> > > happen to shut them down and no rebalance happens, I don't believe this
> > > deadlock is triggered.
> > >
> > > The deadlock appears related to the state change handlers being subscribed
> > > across threads and the fact that StreamThread#setState and
> > > StreamStateListener#onChange are both synchronized methods.
> > >
> >
>


[jira] [Resolved] (KAFKA-5566) Instable test QueryableStateIntegrationTest.shouldAllowToQueryAfterThreadDied

2017-07-07 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy resolved KAFKA-5566.
---
   Resolution: Fixed
Fix Version/s: 0.11.0.1
   0.11.1.0

Issue resolved by pull request 3500
[https://github.com/apache/kafka/pull/3500]

> Instable test QueryableStateIntegrationTest.shouldAllowToQueryAfterThreadDied
> -
>
> Key: KAFKA-5566
> URL: https://issues.apache.org/jira/browse/KAFKA-5566
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Eno Thereska
> Fix For: 0.11.1.0, 0.11.0.1
>
>
> This test failed about 4 times in the last 24h. Always the same stack trace 
> so far:
> {noformat}
> java.lang.AssertionError: Condition not met within timeout 3. wait for 
> agg to be '123'
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:274)
>   at 
> org.apache.kafka.streams.integration.QueryableStateIntegrationTest.shouldAllowToQueryAfterThreadDied(QueryableStateIntegrationTest.java:793)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: Possible deadlock during shutdown in setState in kafka streams 10.2

2017-07-07 Thread Damian Guy
Hi Greg,

Would you mind creating a JIRA for this with the thread dump ( i don't see
it attached to your message).

Thanks,
Damian

On Fri, 7 Jul 2017 at 10:36 Greg Fodor  wrote:

> I'm running a 10.2 job across 5 nodes with 32 stream threads on each node
> and find that when we gracefully shut down all of them at once via an ansible
> script, some of the nodes end up freezing -- at a glance the attached
> thread dump implies a deadlock between stream threads trying to update
> their state via setState. We haven't had this problem before but it may or
> may not be related to changes in 10.2 (we are upgrading from 10.0 to 10.2)
>
> When we gracefully shut down all nodes simultaneously, what typically
> happens is that some subset of the nodes end up not shutting down completely
> but go through a rebalance first. It seems this deadlock requires the
> rebalancing to occur simultaneously with the graceful shutdown. If we
> happen to shut them down and no rebalance happens, I don't believe this
> deadlock is triggered.
>
> The deadlock appears related to the state change handlers being subscribed
> across threads and the fact that StreamThread#setState and
> StreamStateListener#onChange are both synchronized methods.
>
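
An illustrative reduction of the reported cycle, not the actual Streams
code: two objects whose synchronized methods call into each other. If one
thread enters setState() while another is inside onChange(), each holds the
monitor the other needs, and both block forever:

{code}
// Hypothetical sketch of the lock cycle described above.
class ThreadState {
    private StateListener listener;

    void setListener(final StateListener listener) { this.listener = listener; }

    synchronized void setState(final String newState) {
        listener.onChange(newState); // holds this monitor, needs the listener's
    }

    synchronized String state() { return "RUNNING"; }
}

class StateListener {
    private final ThreadState[] threads;

    StateListener(final ThreadState... threads) { this.threads = threads; }

    synchronized void onChange(final String newState) {
        for (final ThreadState t : threads) {
            t.state(); // holds the listener's monitor, needs each thread's
        }
    }
}
{code}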


Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams

2017-07-07 Thread Damian Guy
Hi Jeyhun,

> About overrides, what other alternatives do we have? For
> backwards-compatibility we have to add extra methods to the existing ones.
>
>
It wasn't clear to me in the KIP if these are new methods or replacing
existing ones.
Also, we are currently discussing options for replacing the overrides.

Thanks,
Damian


> About ProcessorContext vs RecordContext, you are right. I think I need to
> implement a prototype to understand the full picture as some parts of the
> KIP might not be as straightforward as I thought.
>
>
> Cheers,
> Jeyhun
>
> On Wed, Jul 5, 2017 at 10:40 AM Damian Guy <damian@gmail.com> wrote:
>
> > HI Jeyhun,
> >
> > Is the intention that these methods are new overloads on the KStream,
> > KTable, etc?
> >
> > It is worth noting that a ProcessorContext is not a RecordContext. A
> > RecordContext, as it stands, only exists during the processing of a
> single
> > record. Whereas the ProcessorContext exists for the lifetime of the
> > Processor. So it doesn't make sense to cast a ProcessorContext to a
> > RecordContext.
> > You mentioned above passing the InternalProcessorContext to the init()
> > calls. It is internal for a reason and i think it should remain that way.
> > It might be better to move the recordContext() method from
> > InternalProcessorContext to ProcessorContext.
> >
> > In the KIP you have an example showing:
> > richMapper.init((RecordContext) processorContext);
> > But the interface is:
> > public interface RichValueMapper<V, VR> {
> > VR apply(final V value, final RecordContext recordContext);
> > }
> > i.e., there is no init(...), besides as above this wouldn't make sense.
> >
> > Thanks,
> > Damian
> >
> > On Tue, 4 Jul 2017 at 23:30 Jeyhun Karimov <je.kari...@gmail.com> wrote:
> >
> > > Hi Matthias,
> > >
> > > Actually my intent was to provide the RichInitializer, and later on we
> > > could provide the context of the record as you also mentioned.
> > > I removed that so as not to confuse the users.
> > > Regarding the RecordContext and ProcessorContext interfaces, I just
> > > realized the InternalProcessorContext class. Can't we pass this as a
> > > parameter to init() method of processors? Then we would be able to get
> > > RecordContext easily with just a method call.
> > >
> > >
> > > Cheers,
> > > Jeyhun
> > >
> > > On Thu, Jun 29, 2017 at 10:14 PM Matthias J. Sax <
> matth...@confluent.io>
> > > wrote:
> > >
> > > > One more thing:
> > > >
> > > > I don't think `RichInitializer` does make sense. As we don't have any
> > > > input record, there is also no context. We could of course provide
> the
> > > > context of the record that triggers the init call, but this seems to
> be
> > > > semantically questionable. Also, the context for this first record
> will
> > > > be provided by the consecutive call to aggregate anyways.
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 6/29/17 1:11 PM, Matthias J. Sax wrote:
> > > > > Thanks for updating the KIP.
> > > > >
> > > > > I have one concern with regard to backward compatibility. You
> suggest
> > > to
> > > > > use RecrodContext as base interface for ProcessorContext. This will
> > > > > break compatibility.
> > > > >
> > > > > I think, we should just have two independent interfaces. Our own
> > > > > ProcessorContextImpl class would implement both. This allows us to
> > cast
> > > > > it to `RecordContext` and thus limit the visible scope.
> > > > >
> > > > >
> > > > > -Matthias
> > > > >
> > > > >
> > > > >
> > > > > On 6/27/17 1:35 PM, Jeyhun Karimov wrote:
> > > > >> Hi all,
> > > > >>
> > > > >> I updated the KIP w.r.t. discussion and comments.
> > > > >> Basically I eliminated overloads for particular method if they are
> > > more
> > > > >> than 3.
> > > > >> As we can see there are a lot of overloads (and more will come
> with
> > > > KIP-149
> > > > >> :) )
> > > > >> So, is it wise to
> > > >> wait for the result of the constructive DSL thread or
> > > > >> extend KIP to address this issue as well or

Re: RocksDB flushing issue on 0.10.2 streams

2017-07-06 Thread Damian Guy
Hi Greg,
I've been able to reproduce it by running multiple instances with standby
tasks and many threads. If I force some rebalances, then I see the failure.
Now to see if I can repro in a test.
I think it is probably the same issue as:
https://issues.apache.org/jira/browse/KAFKA-5070

On Thu, 6 Jul 2017 at 12:43 Damian Guy <damian@gmail.com> wrote:

> Greg, what OS are you running on?
> Are you able to reproduce this in a test at all?
> For instance, based on what you described it would seem that i should be
> able to start a streams app, wait for it to be up and running, run the
> state dir cleanup, see it fail. However, i can't reproduce it.
>
> On Wed, 5 Jul 2017 at 23:23 Damian Guy <damian@gmail.com> wrote:
>
>> Thanks Greg. I'll look into it more tomorrow. Just finding it difficult
>> to reproduce in a test.
>> Thanks for providing the sequence, gives me something to try and repro.
>> Appreciated.
>>
>> Thanks,
>> Damian
>> On Wed, 5 Jul 2017 at 19:57, Greg Fodor <gfo...@gmail.com> wrote:
>>
>>> Also, the sequence of events is:
>>>
>>> - Job starts, rebalance happens, things run along smoothly.
>>> - After 10 minutes (retrospectively) the cleanup task kicks on and
>>> removes
>>> some directories
>>> - Tasks immediately start failing when trying to flush their state stores
>>>
>>>
>>>
>>> On Wed, Jul 5, 2017 at 11:55 AM, Greg Fodor <gfo...@gmail.com> wrote:
>>>
>>> > The issue I am hitting is not the directory locking issues we've seen
>>> in
>>> > the past. The issue seems to be, as you mentioned, that the state dir
>>> is
>>> > getting deleted by the store cleanup process, but there are still tasks
>>> > running that are trying to flush the state store. It seems more than a
>>> > little scary given that right now it seems either a) there are tasks
>>> > running that should have been re-assigned or b) the cleanup job is
>>> removing
>>> > state directories for currently running + assigned tasks (perhaps
>>> during a
>>> > rebalance there is a race condition?) I'm guessing there's probably a
>>> more
>>> > benign explanation, but that is what it looks like right now.
>>> >
>>> > On Wed, Jul 5, 2017 at 7:00 AM, Damian Guy <damian@gmail.com>
>>> wrote:
>>> >
>>> >> BTW - i'm trying to reproduce it, but not having much luck so far...
>>> >>
>>> >> On Wed, 5 Jul 2017 at 09:27 Damian Guy <damian@gmail.com> wrote:
>>> >>
>>> >> > Thanks for the updates Greg. There were some minor changes around
>>> this in
>>> >> > 0.11.0 to make it less likely to happen, but we've only ever seen
>>> the
>>> >> > locking fail in the event of a rebalance. When everything is running
>>> >> state
>>> >> > dirs shouldn't be deleted if they are being used as the lock will
>>> fail.
>>> >> >
>>> >> >
>>> >> > On Wed, 5 Jul 2017 at 08:15 Greg Fodor <gfo...@gmail.com> wrote:
>>> >> >
>>> >> >> I can report that setting state.cleanup.delay.ms to a very large
>>> value
>>> >> >> (effectively disabling it) works around the issue. It seems that
>>> the
>>> >> state
>>> >> >> store cleanup process can somehow get out ahead of another task
>>> that
>>> >> still
>>> >> >> thinks it should be writing to the state store/flushing it. In my
>>> test
>>> >> >> runs, this does not seem to be happening during a rebalancing
>>> event,
>>> >> but
>>> >> >> after the cluster is stable.
>>> >> >>
>>> >> >> On Tue, Jul 4, 2017 at 12:29 PM, Greg Fodor <gfo...@gmail.com>
>>> wrote:
>>> >> >>
>>> >> >> > Upon another run, I see the same error occur during a rebalance,
>>> so
>>> >> >> either
>>> >> >> > my log was showing a rebalance or there is a shared underlying
>>> issue
>>> >> >> with
>>> >> >> > state stores.
>>> >> >> >
>>> >> >> > On Tue, Jul 4, 2017 at 11:35 AM, Greg Fodor <gfo...@gmail.com>
>>> >> wrote:
>>> >> >> >
>>> >> >> >> A

Re: RocksDB flushing issue on 0.10.2 streams

2017-07-06 Thread Damian Guy
Greg, what OS are you running on?
Are you able to reproduce this in a test at all?
For instance, based on what you described, it would seem that I should be
able to start a streams app, wait for it to be up and running, run the
state dir cleanup, and see it fail. However, I can't reproduce it.

On Wed, 5 Jul 2017 at 23:23 Damian Guy <damian@gmail.com> wrote:

> Thanks Greg. I'll look into it more tomorrow. Just finding it difficult to
> reproduce in a test.
> Thanks for providing the sequence, gives me something to try and repro.
> Appreciated.
>
> Thanks,
> Damian
> On Wed, 5 Jul 2017 at 19:57, Greg Fodor <gfo...@gmail.com> wrote:
>
>> Also, the sequence of events is:
>>
>> - Job starts, rebalance happens, things run along smoothly.
>> - After 10 minutes (retrospectively) the cleanup task kicks on and removes
>> some directories
>> - Tasks immediately start failing when trying to flush their state stores
>>
>>
>>
>> On Wed, Jul 5, 2017 at 11:55 AM, Greg Fodor <gfo...@gmail.com> wrote:
>>
>> > The issue I am hitting is not the directory locking issues we've seen in
>> > the past. The issue seems to be, as you mentioned, that the state dir is
>> > getting deleted by the store cleanup process, but there are still tasks
>> > running that are trying to flush the state store. It seems more than a
>> > little scary given that right now it seems either a) there are tasks
>> > running that should have been re-assigned or b) the cleanup job is
>> removing
>> > state directories for currently running + assigned tasks (perhaps
>> during a
>> > rebalance there is a race condition?) I'm guessing there's probably a
>> more
>> > benign explanation, but that is what it looks like right now.
>> >
>> > On Wed, Jul 5, 2017 at 7:00 AM, Damian Guy <damian@gmail.com>
>> wrote:
>> >
>> >> BTW - i'm trying to reproduce it, but not having much luck so far...
>> >>
>> >> On Wed, 5 Jul 2017 at 09:27 Damian Guy <damian@gmail.com> wrote:
>> >>
>> >> > Thanks for the updates Greg. There were some minor changes around
>> this in
>> >> > 0.11.0 to make it less likely to happen, but we've only ever seen the
>> >> > locking fail in the event of a rebalance. When everything is running
>> >> state
>> >> > dirs shouldn't be deleted if they are being used as the lock will
>> fail.
>> >> >
>> >> >
>> >> > On Wed, 5 Jul 2017 at 08:15 Greg Fodor <gfo...@gmail.com> wrote:
>> >> >
>> >> >> I can report that setting state.cleanup.delay.ms to a very large
>> value
>> >> >> (effectively disabling it) works around the issue. It seems that the
>> >> state
>> >> >> store cleanup process can somehow get out ahead of another task that
>> >> still
>> >> >> thinks it should be writing to the state store/flushing it. In my
>> test
>> >> >> runs, this does not seem to be happening during a rebalancing event,
>> >> but
>> >> >> after the cluster is stable.
>> >> >>
>> >> >> On Tue, Jul 4, 2017 at 12:29 PM, Greg Fodor <gfo...@gmail.com>
>> wrote:
>> >> >>
>> >> >> > Upon another run, I see the same error occur during a rebalance,
>> so
>> >> >> either
>> >> >> > my log was showing a rebalance or there is a shared underlying
>> issue
>> >> >> with
>> >> >> > state stores.
>> >> >> >
>> >> >> > On Tue, Jul 4, 2017 at 11:35 AM, Greg Fodor <gfo...@gmail.com>
>> >> wrote:
>> >> >> >
>> >> >> >> Also, I am on 0.10.2.1, so poll interval was already set to
>> >> MAX_VALUE.
>> >> >> >>
>> >> >> >> On Tue, Jul 4, 2017 at 11:28 AM, Greg Fodor <gfo...@gmail.com>
>> >> wrote:
>> >> >> >>
>> >> >> >>> I've nuked the nodes this happened on, but the job had been
>> running
>> >> >> for
>> >> >> >>> about 5-10 minutes across 5 nodes before this happened. Does the
>> >> log
>> >> >> show a
>> >> >> >>> rebalance was happening? It looks to me like the standby task
>> was
>> >> just
>> >> >> >>> committing as part

[jira] [Created] (KAFKA-5562) Do streams state directory cleanup on a single thread

2017-07-06 Thread Damian Guy (JIRA)
Damian Guy created KAFKA-5562:
-

 Summary: Do streams state directory cleanup on a single thread
 Key: KAFKA-5562
 URL: https://issues.apache.org/jira/browse/KAFKA-5562
 Project: Kafka
  Issue Type: Bug
Reporter: Damian Guy
Assignee: Damian Guy


Currently in streams we clean up old state directories every so often (as 
defined by {{state.cleanup.delay.ms}}). However, every StreamThread runs the 
cleanup, which is both unnecessary and can potentially lead to race conditions.

It would be better to perform the state cleanup on a single thread and only 
when the {{KafkaStreams}} instance is in a running state.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
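
Until this lands, the workaround reported in the RocksDB flushing thread in
this digest is to push the cleanup delay out far enough to effectively
disable it. A minimal sketch (the exact value is arbitrary):

{code}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
// Effectively disable periodic state directory cleanup.
props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, Long.MAX_VALUE);
{code}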


[jira] [Resolved] (KAFKA-5508) Documentation for altering topics

2017-07-06 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy resolved KAFKA-5508.
---
   Resolution: Fixed
Fix Version/s: 0.11.0.1
   0.11.1.0

Issue resolved by pull request 3429
[https://github.com/apache/kafka/pull/3429]

> Documentation for altering topics
> -
>
> Key: KAFKA-5508
> URL: https://issues.apache.org/jira/browse/KAFKA-5508
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Reporter: Tom Bentley
>Assignee: huxihx
>Priority: Minor
> Fix For: 0.11.1.0, 0.11.0.1
>
>
> According to the upgrade documentation:
> bq. Altering topic configuration from the kafka-topics.sh script 
> (kafka.admin.TopicCommand) has been deprecated. Going forward, please use the 
> kafka-configs.sh script (kafka.admin.ConfigCommand) for this functionality. 
> But the Operations documentation still tells people to use kafka-topics.sh to 
> alter their topic configurations.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
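
For example, the documented replacement flow looks like this (topic name and
config value are illustrative):

{code}
# Alter a topic config via kafka.admin.ConfigCommand rather than kafka-topics.sh
bin/kafka-configs.sh --zookeeper localhost:2181 \
  --entity-type topics --entity-name my-topic \
  --alter --add-config retention.ms=86400000
{code}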


Re: [ANNOUNCE] New Kafka PMC member Ismael Juma

2017-07-05 Thread Damian Guy
Congratulations Ismael! Very well deserved.
Cheers,
Damian
On Wed, 5 Jul 2017 at 22:54, Dong Lin  wrote:

> Congratulations Ismael!
>
> On Wed, Jul 5, 2017 at 1:55 PM, Jun Rao  wrote:
>
> > Hi, Everyone,
> >
> > Ismael Juma has been active in the Kafka community since he became
> > a Kafka committer about a year ago. I am glad to announce that Ismael is
> > now a member of Kafka PMC.
> >
> > Congratulations, Ismael!
> >
> > Jun
> >
>


Re: RocksDB flushing issue on 0.10.2 streams

2017-07-05 Thread Damian Guy
Thanks Greg. I'll look into it more tomorrow. Just finding it difficult to
reproduce in a test.
Thanks for providing the sequence, gives me something to try and repro.
Appreciated.

Thanks,
Damian
On Wed, 5 Jul 2017 at 19:57, Greg Fodor <gfo...@gmail.com> wrote:

> Also, the sequence of events is:
>
> - Job starts, rebalance happens, things run along smoothly.
> - After 10 minutes (retrospectively) the cleanup task kicks on and removes
> some directories
> - Tasks immediately start failing when trying to flush their state stores
>
>
>
> On Wed, Jul 5, 2017 at 11:55 AM, Greg Fodor <gfo...@gmail.com> wrote:
>
> > The issue I am hitting is not the directory locking issues we've seen in
> > the past. The issue seems to be, as you mentioned, that the state dir is
> > getting deleted by the store cleanup process, but there are still tasks
> > running that are trying to flush the state store. It seems more than a
> > little scary given that right now it seems either a) there are tasks
> > running that should have been re-assigned or b) the cleanup job is
> removing
> > state directories for currently running + assigned tasks (perhaps during
> a
> > rebalance there is a race condition?) I'm guessing there's probably a
> more
> > benign explanation, but that is what it looks like right now.
> >
> > On Wed, Jul 5, 2017 at 7:00 AM, Damian Guy <damian@gmail.com> wrote:
> >
> >> BTW - i'm trying to reproduce it, but not having much luck so far...
> >>
> >> On Wed, 5 Jul 2017 at 09:27 Damian Guy <damian@gmail.com> wrote:
> >>
> >> > Thanks for the updates Greg. There were some minor changes around this
> in
> >> > 0.11.0 to make it less likely to happen, but we've only ever seen the
> >> > locking fail in the event of a rebalance. When everything is running
> >> state
> >> > dirs shouldn't be deleted if they are being used as the lock will
> fail.
> >> >
> >> >
> >> > On Wed, 5 Jul 2017 at 08:15 Greg Fodor <gfo...@gmail.com> wrote:
> >> >
> >> >> I can report that setting state.cleanup.delay.ms to a very large
> value
> >> >> (effectively disabling it) works around the issue. It seems that the
> >> state
> >> >> store cleanup process can somehow get out ahead of another task that
> >> still
> >> >> thinks it should be writing to the state store/flushing it. In my
> test
> >> >> runs, this does not seem to be happening during a rebalancing event,
> >> but
> >> >> after the cluster is stable.
> >> >>
> >> >> On Tue, Jul 4, 2017 at 12:29 PM, Greg Fodor <gfo...@gmail.com>
> wrote:
> >> >>
> >> >> > Upon another run, I see the same error occur during a rebalance, so
> >> >> either
> >> >> > my log was showing a rebalance or there is a shared underlying
> issue
> >> >> with
> >> >> > state stores.
> >> >> >
> >> >> > On Tue, Jul 4, 2017 at 11:35 AM, Greg Fodor <gfo...@gmail.com>
> >> wrote:
> >> >> >
> >> >> >> Also, I am on 0.10.2.1, so poll interval was already set to
> >> MAX_VALUE.
> >> >> >>
> >> >> >> On Tue, Jul 4, 2017 at 11:28 AM, Greg Fodor <gfo...@gmail.com>
> >> wrote:
> >> >> >>
> >> >> >>> I've nuked the nodes this happened on, but the job had been
> running
> >> >> for
> >> >> >>> about 5-10 minutes across 5 nodes before this happened. Does the
> >> log
> >> >> show a
> >> >> >>> rebalance was happening? It looks to me like the standby task was
> >> just
> >> >> >>> committing as part of normal operations.
> >> >> >>>
> >> >> >>> On Tue, Jul 4, 2017 at 7:40 AM, Damian Guy <damian@gmail.com
> >
> >> >> wrote:
> >> >> >>>
> >> >> >>>> Hi Greg,
> >> >> >>>>
> >> >> >>>> Obviously a bit difficult to read the RocksDBException, but my
> >> guess
> >> >> is
> >> >> >>>> it
> >> >> >>>> is because the state directory gets deleted right before the
> flush
> >> >> >>>> happens:
> >> >> >>>> 2017-07-04 10:54:46,829 [myid:] - INFO

Re: RocksDB flushing issue on 0.10.2 streams

2017-07-05 Thread Damian Guy
BTW - I'm trying to reproduce it, but not having much luck so far...

On Wed, 5 Jul 2017 at 09:27 Damian Guy <damian@gmail.com> wrote:

> Thanks for the updates Greg. There were some minor changes around this in
> 0.11.0 to make it less likely to happen, but we've only ever seen the
> locking fail in the event of a rebalance. When everything is running, state
> dirs shouldn't be deleted if they are being used, as the lock will fail.
>
>
> On Wed, 5 Jul 2017 at 08:15 Greg Fodor <gfo...@gmail.com> wrote:
>
>> I can report that setting state.cleanup.delay.ms to a very large value
>> (effectively disabling it) works around the issue. It seems that the state
>> store cleanup process can somehow get out ahead of another task that still
>> thinks it should be writing to the state store/flushing it. In my test
>> runs, this does not seem to be happening during a rebalancing event, but
>> after the cluster is stable.
>>
>> On Tue, Jul 4, 2017 at 12:29 PM, Greg Fodor <gfo...@gmail.com> wrote:
>>
>> > Upon another run, I see the same error occur during a rebalance, so
>> either
>> > my log was showing a rebalance or there is a shared underlying issue
>> with
>> > state stores.
>> >
>> > On Tue, Jul 4, 2017 at 11:35 AM, Greg Fodor <gfo...@gmail.com> wrote:
>> >
>> >> Also, I am on 0.10.2.1, so poll interval was already set to MAX_VALUE.
>> >>
>> >> On Tue, Jul 4, 2017 at 11:28 AM, Greg Fodor <gfo...@gmail.com> wrote:
>> >>
>> >>> I've nuked the nodes this happened on, but the job had been running
>> for
>> >>> about 5-10 minutes across 5 nodes before this happened. Does the log
>> show a
>> >>> rebalance was happening? It looks to me like the standby task was just
>> >>> committing as part of normal operations.
>> >>>
>> >>> On Tue, Jul 4, 2017 at 7:40 AM, Damian Guy <damian@gmail.com>
>> wrote:
>> >>>
>> >>>> Hi Greg,
>> >>>>
>> >>>> Obviously a bit difficult to read the RocksDBException, but my guess
>> is
>> >>>> it
>> >>>> is because the state directory gets deleted right before the flush
>> >>>> happens:
>> >>>> 2017-07-04 10:54:46,829 [myid:] - INFO
>> [StreamThread-21:StateDirector
>> >>>> y@213]
>> >>>> - Deleting obsolete state directory 0_10 for task 0_10
>> >>>>
>> >>>> Yes it looks like it is possibly the same bug as KAFKA-5070.
>> >>>>
>> >>>> It looks like your application is constantly rebalancing during store
>> >>>> initialization, which may be the reason this bug comes about (there
>> is a
>> >>>> chance that the state dir lock is released so when the thread tries
>> to
>> >>>> removes the stale state directory it is able to get the lock). You
>> >>>> probably
>> >>>> want to configure `max.poll.interval.ms` to be a reasonably large
>> >>>> value ( i
>> >>>> think we default to Integer.MAX_VALUE in 0.10.2.1). You can also try
>> >>>> setting `state.cleanup.delay.ms` to a higher value (default is 10
>> >>>> minutes),
>> >>>> to try and avoid it happening during a rebalance (I know this isn't a
>> >>>> fix,
>> >>>> but will make it less likely to happen).
>> >>>>
>> >>>> Thanks,
>> >>>> Damian
>> >>>>
>> >>>> On Tue, 4 Jul 2017 at 12:43 Greg Fodor <gfo...@gmail.com> wrote:
>> >>>>
>> >>>> > Hi all, we are working on upgrading our jobs from 0.10.0 to use
>> Kafka
>> >>>> > Streams 0.10.2.1 and are hitting a problem. We have an ETL job that
>> >>>> has 4
>> >>>> > state stores and runs across a few hundred partitions, and as part
>> of
>> >>>> load
>> >>>> > testing the job we are trying to reload our data out of kafka into
>> a
>> >>>> test
>> >>>> > db. The result is we are able to load about 4M tuples and then this
>> >>>> error
>> >>>> > pops up on all of the stream nodes simultaneously. There are 4
>> rocksdb
>> >>>> > stores in question and there are lots of these errors which takes
>> it
>> >>>> do

Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams

2017-07-05 Thread Damian Guy
Hi Jeyhun,

Is the intention that these methods are new overloads on the KStream,
KTable, etc?

It is worth noting that a ProcessorContext is not a RecordContext. A
RecordContext, as it stands, only exists during the processing of a single
record, whereas the ProcessorContext exists for the lifetime of the
Processor. So it doesn't make sense to cast a ProcessorContext to a
RecordContext.
You mentioned above passing the InternalProcessorContext to the init()
calls. It is internal for a reason and I think it should remain that way.
It might be better to move the recordContext() method from
InternalProcessorContext to ProcessorContext.

In the KIP you have an example showing:
richMapper.init((RecordContext) processorContext);
But the interface is:
public interface RichValueMapper<V, VR> {
VR apply(final V value, final RecordContext recordContext);
}
i.e., there is no init(...); besides, as above, this wouldn't make sense.

Thanks,
Damian

On Tue, 4 Jul 2017 at 23:30 Jeyhun Karimov  wrote:

> Hi Matthias,
>
> Actually my intent was to provide the RichInitializer, and later on we could
> provide the context of the record as you also mentioned.
> I removed that so as not to confuse the users.
> Regarding the RecordContext and ProcessorContext interfaces, I just
> realized the InternalProcessorContext class. Can't we pass this as a
> parameter to init() method of processors? Then we would be able to get
> RecordContext easily with just a method call.
>
>
> Cheers,
> Jeyhun
>
> On Thu, Jun 29, 2017 at 10:14 PM Matthias J. Sax 
> wrote:
>
> > One more thing:
> >
> > I don't think `RichInitializer` does make sense. As we don't have any
> > input record, there is also no context. We could of course provide the
> > context of the record that triggers the init call, but this seems to be
> > semantically questionable. Also, the context for this first record will
> > be provided by the consecutive call to aggregate anyways.
> >
> >
> > -Matthias
> >
> > On 6/29/17 1:11 PM, Matthias J. Sax wrote:
> > > Thanks for updating the KIP.
> > >
> > > I have one concern with regard to backward compatibility. You suggest
> to
> > > use RecrodContext as base interface for ProcessorContext. This will
> > > break compatibility.
> > >
> > > I think, we should just have two independent interfaces. Our own
> > > ProcessorContextImpl class would implement both. This allows us to cast
> > > it to `RecordContext` and thus limit the visible scope.
> > >
> > >
> > > -Matthias
> > >
> > >
> > >
> > > On 6/27/17 1:35 PM, Jeyhun Karimov wrote:
> > >> Hi all,
> > >>
> > >> I updated the KIP w.r.t. discussion and comments.
> > >> Basically I eliminated overloads for particular method if they are
> more
> > >> than 3.
> > >> As we can see there are a lot of overloads (and more will come with
> > KIP-149
> > >> :) )
> > >> So, is it wise to
> > >> wait for the result of the constructive DSL thread or
> > >> extend KIP to address this issue as well or
> > >> continue as it is?
> > >>
> > >> Cheers,
> > >> Jeyhun
> > >>
> > >> On Wed, Jun 14, 2017 at 11:29 PM Guozhang Wang 
> > wrote:
> > >>
> > >>> LGTM. Thanks!
> > >>>
> > >>>
> > >>> Guozhang
> > >>>
> > >>> On Tue, Jun 13, 2017 at 2:20 PM, Jeyhun Karimov <
> je.kari...@gmail.com>
> > >>> wrote:
> > >>>
> >  Thanks for the comment Matthias. After all the discussion (thanks to
> > all
> >  participants), I think this (single method that passes in a
> > RecordContext
> >  object) is the best alternative.
> >  Just a side note: I think KAFKA-3907 [1] can also be integrated into
> > the
> >  KIP by adding related method inside RecordContext interface.
> > 
> > 
> >  [1] https://issues.apache.org/jira/browse/KAFKA-3907
> > 
> > 
> >  Cheers,
> >  Jeyhun
> > 
> >  On Tue, Jun 13, 2017 at 7:50 PM Matthias J. Sax <
> > matth...@confluent.io>
> >  wrote:
> > 
> > > Hi,
> > >
> > > I would like to push this discussion further. It seems we got nice
> > > alternatives (thanks for the summary Jeyhun!).
> > >
> > > With respect to RichFunctions and allowing them to be stateful, I
> > have
> > > my doubt as expressed already. From my understanding, the idea was
> to
> > > give access to record metadata information only. If you want to do
> a
> > > stateful computation you should rather use #transform().
> > >
> > > Furthermore, as pointed out, we would need to switch to a
> > > supplier-pattern introducing many more overloads.
> > >
> > > For those reason, I advocate for a simple interface with a single
> > >>> method
> > > that passes in a RecordContext object.
> > >
> > >
> > > -Matthias
> > >
> > >
> > > On 6/6/17 5:15 PM, Guozhang Wang wrote:
> > >> Thanks for the comprehensive summary!
> > >>
> > >> Personally I'd prefer the option of passing RecordContext as an
> > > additional
> > >> 
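
Tying the KIP-159 discussion together, a sketch of how the RichValueMapper
quoted above might be used. The RecordContext accessors shown
(topic/partition/offset) are assumptions based on the discussion, not a
released API:

{code}
// Hypothetical usage of the proposed interface.
RichValueMapper<String, String> enrich = (value, recordContext) ->
        value + " @ " + recordContext.topic()
              + "/" + recordContext.partition()
              + ":" + recordContext.offset();
{code}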

Re: RocksDB flushing issue on 0.10.2 streams

2017-07-05 Thread Damian Guy
Thanks for the updates Greg. There were some minor changes around this in
0.11.0 to make it less likely to happen, but we've only ever seen the
locking fail in the event of a rebalance. When everything is running, state
dirs shouldn't be deleted if they are being used, as the lock will fail.


On Wed, 5 Jul 2017 at 08:15 Greg Fodor <gfo...@gmail.com> wrote:

> I can report that setting state.cleanup.delay.ms to a very large value
> (effectively disabling it) works around the issue. It seems that the state
> store cleanup process can somehow get out ahead of another task that still
> thinks it should be writing to the state store/flushing it. In my test
> runs, this does not seem to be happening during a rebalancing event, but
> after the cluster is stable.
>
> On Tue, Jul 4, 2017 at 12:29 PM, Greg Fodor <gfo...@gmail.com> wrote:
>
> > Upon another run, I see the same error occur during a rebalance, so
> either
> > my log was showing a rebalance or there is a shared underlying issue with
> > state stores.
> >
> > On Tue, Jul 4, 2017 at 11:35 AM, Greg Fodor <gfo...@gmail.com> wrote:
> >
> >> Also, I am on 0.10.2.1, so poll interval was already set to MAX_VALUE.
> >>
> >> On Tue, Jul 4, 2017 at 11:28 AM, Greg Fodor <gfo...@gmail.com> wrote:
> >>
> >>> I've nuked the nodes this happened on, but the job had been running for
> >>> about 5-10 minutes across 5 nodes before this happened. Does the log
> show a
> >>> rebalance was happening? It looks to me like the standby task was just
> >>> committing as part of normal operations.
> >>>
> >>> On Tue, Jul 4, 2017 at 7:40 AM, Damian Guy <damian@gmail.com>
> wrote:
> >>>
> >>>> Hi Greg,
> >>>>
> >>>> Obviously a bit difficult to read the RocksDBException, but my guess
> is
> >>>> it
> >>>> is because the state directory gets deleted right before the flush
> >>>> happens:
> >>>> 2017-07-04 10:54:46,829 [myid:] - INFO  [StreamThread-21:StateDirector
> >>>> y@213]
> >>>> - Deleting obsolete state directory 0_10 for task 0_10
> >>>>
> >>>> Yes it looks like it is possibly the same bug as KAFKA-5070.
> >>>>
> >>>> It looks like your application is constantly rebalancing during store
> >>>> initialization, which may be the reason this bug comes about (there is
> a
> >>>> chance that the state dir lock is released, so when the thread tries to
> >>>> remove the stale state directory it is able to get the lock). You
> >>>> probably
> >>>> want to configure `max.poll.interval.ms` to be a reasonably large
> >>>> value (I
> >>>> think we default to Integer.MAX_VALUE in 0.10.2.1). You can also try
> >>>> setting `state.cleanup.delay.ms` to a higher value (default is 10
> >>>> minutes),
> >>>> to try and avoid it happening during a rebalance (I know this isn't a
> >>>> fix,
> >>>> but will make it less likely to happen).
> >>>>
> >>>> Thanks,
> >>>> Damian
> >>>>
> >>>> On Tue, 4 Jul 2017 at 12:43 Greg Fodor <gfo...@gmail.com> wrote:
> >>>>
> >>>> > Hi all, we are working on upgrading our jobs from 0.10.0 to use
> Kafka
> >>>> > Streams 0.10.2.1 and are hitting a problem. We have an ETL job that
> >>>> has 4
> >>>> > state stores and runs across a few hundred partitions, and as part
> of
> >>>> load
> >>>> > testing the job we are trying to reload our data out of kafka into a
> >>>> test
> >>>> > db. The result is we are able to load about 4M tuples and then this
> >>>> error
> >>>> > pops up on all of the stream nodes simultaneously. There are 4
> rocksdb
> >>>> > stores in question and there are lots of these errors which takes it
> >>>> down.
> >>>> > This bug *does* not seem to occur on 0.10.1.
> >>>> >
> >>>> > A similar error was mentioned here:
> >>>> > https://issues.apache.org/jira/browse/KAFKA-5070
> >>>> >
> >>>> > Full log attached.
> >>>> >
> >>>> > org.apache.kafka.streams.errors.ProcessorStateException: task [0_10]
> >>>> > Failed to flush state store session-id-start-events
> >>>> &

[jira] [Created] (KAFKA-5556) KafkaConsumer throws: java.lang.IllegalStateException: Attempt to retrieve exception from future which hasn't failed

2017-07-04 Thread Damian Guy (JIRA)
Damian Guy created KAFKA-5556:
-

 Summary: KafkaConsumer throws: java.lang.IllegalStateException:
Attempt to retrieve exception from future which hasn't failed
 Key: KAFKA-5556
 URL: https://issues.apache.org/jira/browse/KAFKA-5556
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.11.0.0, 0.10.2.1
Reporter: Damian Guy


From the user list:
I have been running a streaming application on some data set. Things
usually run ok. Today I was trying to run the same application on Kafka
(ver 0.10.2.1 on Scala 2.12) installed in a Mesos DC/OS cluster. After
running for quite some time, I got the following exception ..
{code}
Exception in thread "StreamThread-1" java.lang.IllegalStateException:
Attempt to retrieve exception from future which hasn't failed
  at org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99)
  at org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89)
  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590)
  at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
  at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
  at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79)
  at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
  at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
  at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
  at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
  at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
  at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
  at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
{code}

Looks like we should check if the future is done, i.e., check the return value
from poll and retry if time remains and {{!future.isDone()}}.
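
A rough sketch of the shape that check could take (a hypothetical
simplification with stand-in names -- {{time}}, {{client}},
{{sendOffsetCommitRequest}} -- not the actual patch):

{code}
// Hypothetical sketch: only treat the future as failed once isDone() reports
// completion; if poll() returned because the timeout elapsed, retry while
// time remains.
long remainingMs = timeoutMs;
final long deadlineMs = time.milliseconds() + timeoutMs;
do {
    final RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
    client.poll(future, remainingMs);
    if (future.succeeded())
        return;
    if (future.isDone() && !future.isRetriable())
        throw future.exception();
    remainingMs = deadlineMs - time.milliseconds();
} while (remainingMs > 0);
{code}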



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: RocksDB flushing issue on 0.10.2 streams

2017-07-04 Thread Damian Guy
Hi Greg,

Obviously a bit difficult to read the RocksDBException, but my guess is it
is because the state directory gets deleted right before the flush happens:
2017-07-04 10:54:46,829 [myid:] - INFO  [StreamThread-21:StateDirectory@213]
- Deleting obsolete state directory 0_10 for task 0_10

Yes it looks like it is possibly the same bug as KAFKA-5070.

It looks like your application is constantly rebalancing during store
initialization, which may be the reason this bug comes about (there is a
chance that the state dir lock is released, so when the thread tries to
remove the stale state directory it is able to get the lock). You probably
want to configure `max.poll.interval.ms` to be a reasonably large value (I
think we default to Integer.MAX_VALUE in 0.10.2.1). You can also try
setting `state.cleanup.delay.ms` to a higher value (default is 10 minutes),
to try and avoid it happening during a rebalance (I know this isn't a fix,
but will make it less likely to happen).
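
Concretely, those two settings together look something like this (a minimal
sketch, assuming an otherwise default StreamsConfig; the application id and
broker list are placeholders):

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");             // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
// already the default in 0.10.2.1, but harmless to set explicitly:
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);
// raise the cleanup delay (default is 10 minutes) so the state-dir cleaner
// is much less likely to race a rebalance:
props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 60 * 60 * 1000L); // 1 hour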

Thanks,
Damian

On Tue, 4 Jul 2017 at 12:43 Greg Fodor  wrote:

> Hi all, we are working on upgrading our jobs from 0.10.0 to use Kafka
> Streams 0.10.2.1 and are hitting a problem. We have an ETL job that has 4
> state stores and runs across a few hundred partitions, and as part of load
> testing the job we are trying to reload our data out of kafka into a test
> db. The result is we are able to load about 4M tuples and then this error
> pops up on all of the stream nodes simultaneously. There are 4 rocksdb
> stores in question and there are lots of these errors which takes it down.
> This bug *does* not seem to occur on 0.10.1.
>
> A similar error was mentioned here:
> https://issues.apache.org/jira/browse/KAFKA-5070
>
> Full log attached.
>
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_10]
> Failed to flush state store session-id-start-events
> at
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
> at
> org.apache.kafka.streams.processor.internals.StandbyTask.commit(StandbyTask.java:94)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:797)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error
> while executing flush from store session-id-start-events
> at
> org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:354)
> at
> org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:345)
> at
> org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
> at
> org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
> at
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore$6.run(MeteredKeyValueStore.java:92)
> at
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> at
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:186)
> at
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)
> ... 6 more
> Caused by: org.rocksdb.RocksDBException: v
> at org.rocksdb.RocksDB.flush(Native Method)
> at org.rocksdb.RocksDB.flush(RocksDB.java:1642)
> at
> org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:352)
> ... 13 more
>
>


Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-07-04 Thread Damian Guy
Hi Jan,

Thanks very much for the input.

On Tue, 4 Jul 2017 at 08:54 Jan Filipiak <jan.filip...@trivago.com> wrote:

> Hi Damian,
>
> I do see your point that something needs to change. But I fully agree with
> Guozhang when he says:
> ---
>
> But since this is an incompatibility change, and we are going to remove the
> compatibility annotations soon, it means we only have one chance and we
> really have to make it right.
> 
>
>
I think we all agree on this one! Hence the discussion.


> I fear none of the suggestions goes far enough to become something that will
> last for very much longer.
> I am currently working on KAFKA-3705 and am trying to find the easiest way for
> the user to give me all the required functionality. The easiest interface I
> could come up with so far can be looked at here.
>
>
> https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L622
>
>
And it's already horribly complicated. I am currently unable to find the
> right abstraction level to have everything falling into place naturally. To
> be honest I already think introducing
>
>
To be fair that is not a particularly easy problem to solve!


>
> https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L493
>
> was not ideal and makes everything a mess.


I'm not sure I agree that it makes everything a mess, but it could have
been done differently.

The JoinType:Whatever approach is also not really flexible. Two things come to mind:
>
> 1. I don't think we should rule out config based decisions say configs like
> streams.$applicationID.joins.$joinname.conf = value
>

Is this just for config? Or are you suggesting that we could somehow "code"
the join in a config file?


> This can allow for tremendous changes without a single API change, and IMO it
> was not considered enough yet.
>
> 2. Push logic from the DSL to the Callback classes. A ValueJoiner for
> example can be used to implement different join types as the user wishes.
>

Do you have an example of how this might look?
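
Something like this is what I could imagine (a hypothetical sketch of my
reading of your point 2 -- not necessarily what you have in mind):

// Hypothetical: a single windowed join() where the joiner itself decides how
// to treat missing matches, so a "left join" needs no JoinType -- the joiner
// just tolerates a null right-hand value.
final ValueJoiner<String, String, String> leftishJoiner =
    (left, right) -> left + "," + (right != null ? right : "<no-match>");
// assumes a (hypothetical) join variant that invokes the joiner with null
// for unmatched right-hand records:
one.join(two, leftishJoiner, JoinWindows.of(10000L));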


> As Guozhang said: not breaking users is very important.


Of course. We want to make it as easy as possible for people to use
streams.


especially with these changes + all the plans I sadly only have in my head,
> but hopefully the first link can give a glimpse.
>
> Thanks for preparing the examples; it made it way clearer to me what exactly
> we are talking about. I would argue to go a bit slower and more careful on
> this one. At some point we need to get it right. Peeking over to the Hadoop
> guys with their huge user base: config files really work well for them.
>
> Best Jan
>
>
>
>
>
> On 30.06.2017 09:31, Damian Guy wrote:
> > Thanks Matthias
> >
> > On Fri, 30 Jun 2017 at 08:05 Matthias J. Sax <matth...@confluent.io>
> wrote:
> >
> >> I am just catching up on this thread, so sorry for the long email in
> >> advance... Also, it's to some extent a dump of thoughts and not always a
> >> clear proposal. Still need to think about this in more detail. But maybe
> >> it helps others to get new ideas :)
> >>
> >>
> >>>> However, I don't understand your argument about putting aggregate()
> >>>> after the withXX() -- all the calls to withXX() set optional
> parameters
> >>>> for aggregate() and not for groupBy() -- but a groupBy().withXX()
> >>>> indicates that the withXX() belongs to the groupBy(). IMHO, this might
> >>>> be quite confusing for developers.
> >>>>
> >>>>
> >>> I see what you are saying, but the grouped stream is effectively a
> no-op
> >>> until you call one of the aggregate/count/reduce etc functions. So the
> >>> optional params are ones that are applicable to any of the operations
> you
> >>> can perform on this grouped stream. Then the final
> >>> count()/reduce()/aggregate() call has any of the params that are
> >>> required/specific to that function.
> >>>
> >> I understand your argument, but I don't share the conclusion. If we
> >> need a "final/terminal" call, the better way might be
> >>
> >> .groupBy().count().withXX().build()
> >>
> >> (with a better name for build() though)
> >>
> >>
> > The point is that all the other calls, i.e., withBlah, windowed, etc. apply
> > to all the aggregate functions. The terminal call being the actual type
> of
> > aggregation you want to

Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-06-30 Thread Damian Guy
On Fri, 30 Jun 2017 at 18:40 Matthias J. Sax <matth...@confluent.io> wrote:

> > Hmmm, I don't agree. Until is a property of the window. It is going to be
> > potentially different for every window operation you do in a streams app.
>
> I am not sure.
>
> (1) for me, it is definitely a config for the operator for how long
> windows are maintained. It's not part of the window definition (as per
> definition there is nothing like a retention time -- that's just a
> practical necessity as we don't have infinite memory).
>

You could also argue that the size of the window is config.


> (2) Maybe you want different values for different windows, but than we
> need operator level configs instead of global configs. But it's not a
> reason to add config methods to the DSL.
>

I don't think we should go down this path. It is pretty simple as it is and
config-driven development is not something I'd like to strive for.


>
> (3) I am actually not too sure if local configs for window retention are
> too useful.
> (a) If you have consecutive windows, there is some dependency anyway, as
> a downstream window cannot exploit a larger retention time than an
> upstream window.
> (b) Shouldn't retention time be more or less applied to the "input
> topics" -- the point is, how long do you want to accept late data? That
> sounds like a global question for the application with regard to the
> input -- not necessarily an operator/window question.
>
>
Sure, within a sub-topology it makes sense for the window retention to be
the same. However, different sub-topologies may have different constraints.
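
(For reference, retention is already expressed per window in the current API --
a minimal sketch:)

import org.apache.kafka.streams.kstream.TimeWindows;

// 1-minute windows whose contents are retained for one day:
final TimeWindows windows = TimeWindows.of(60 * 1000L).until(24 * 60 * 60 * 1000L);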


>
> -Matthias
>
> On 6/30/17 12:31 AM, Damian Guy wrote:
> > Thanks Matthias
> >
> > On Fri, 30 Jun 2017 at 08:05 Matthias J. Sax <matth...@confluent.io>
> wrote:
> >
> >> I am just catching up on this thread, so sorry for the long email in
> >> advance... Also, it's to some extent a dump of thoughts and not always a
> >> clear proposal. Still need to think about this in more detail. But maybe
> >> it helps others to get new ideas :)
> >>
> >>
> >>>> However, I don't understand your argument about putting aggregate()
> >>>> after the withXX() -- all the calls to withXX() set optional
> parameters
> >>>> for aggregate() and not for groupBy() -- but a groupBy().withXX()
> >>>> indicates that the withXX() belongs to the groupBy(). IMHO, this might
> >>>> be quite confusing for developers.
> >>>>
> >>>>
> >>> I see what you are saying, but the grouped stream is effectively a
> no-op
> >>> until you call one of the aggregate/count/reduce etc functions. So the
> >>> optional params are ones that are applicable to any of the operations
> you
> >>> can perform on this grouped stream. Then the final
> >>> count()/reduce()/aggregate() call has any of the params that are
> >>> required/specific to that function.
> >>>
> >>
> >> I understand your argument, but I don't share the conclusion. If we
> >> need a "final/terminal" call, the better way might be
> >>
> >> .groupBy().count().withXX().build()
> >>
> >> (with a better name for build() though)
> >>
> >>
> > The point is that all the other calls, i.e., withBlah, windowed, etc. apply
> > to all the aggregate functions. The terminal call being the actual type
> of
> > aggregation you want to do. I personally find this more natural than
> > groupBy().count().withBlah().build()
> >
> >
> >>> groupedStream.count(/** non windowed count**/)
> >>> groupedStream.windowed(TimeWindows.of(10L)).count(...)
> >>> groupedStream.sessionWindowed(SessionWindows.of(10L)).count(...)
> >>
> >>
> >> I like this. However, I don't see a reason to have windowed() and
> >> sessionWindowed(). We should have one top-level `Windows` interface that
> >> both `TimeWindows` and `SessionWindows` implement and just have a single
> >> windowed() method that accepts all `Windows`. (I did not like the
> >> separation of `SessionWindows` in the first place, and this seems to be
> >> an opportunity to clean this up. It was hard to change when we
> >> introduced session windows)
> >>
> >
> > Yes - true we should look into that.
> >
> >
> >>
> >> Btw: we do use the imperative groupBy() and groupByKey(), and thus we
> >> might also want to use windowBy() (instead of windowed()). Not sure how
> >> important this is, but it seems 

Re: [DISCUSS] KIP-167: Add interface for the state store restoration process

2017-06-30 Thread Damian Guy
Thanks for the updated KIP Bill.

In the PR you have AbstractBatchingRestoreCallback and
AbstractNotifyingRestoreCallback which are both in public packages, so are
part of the API. I think you need to add those to the KIP to round it off.

Otherwise LGTM.

Thanks,
Damian

On Fri, 30 Jun 2017 at 12:39 Bill Bejeck  wrote:

> Hi,
>
> I updated the KIP yesterday and reposted on the original thread, but I
> think it may get lost in the shuffle.  I'd like to have one more round of
> discussion on the KIP found here:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-167%3A+Add+interface+for+the+state+store+restoration+process
>
> There's also an initial PR here :
> https://github.com/apache/kafka/pull/3325
>
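
For anyone following along, the listener side gives applications hooks into
the restore lifecycle roughly like the following (a sketch only -- the exact
names and signatures were still under review on the PR above):

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

// Illustrative only: logs restore progress for each store/partition.
public class LoggingRestoreListener implements StateRestoreListener {

    @Override
    public void onRestoreStart(final TopicPartition topicPartition, final String storeName,
                               final long startingOffset, final long endingOffset) {
        System.out.printf("restoring %s (%s) from offset %d%n", storeName, topicPartition, startingOffset);
    }

    @Override
    public void onBatchRestored(final TopicPartition topicPartition, final String storeName,
                                final long batchEndOffset, final long numRestored) {
        // invoked after each batch of records has been restored
    }

    @Override
    public void onRestoreEnd(final TopicPartition topicPartition, final String storeName,
                             final long totalRestored) {
        System.out.printf("restored %d records into %s%n", totalRestored, storeName);
    }
}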


[DISCUSS] KIP-173: Add prefix to StreamsConfig to enable setting default internal topic configs

2017-06-30 Thread Damian Guy
Hi,

I've put together what will hopefully be a not too contentious KIP to
enable the setting of default configs for streams internal topics via
StreamsConfig.

You can find the KIP here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-173%3A+Add+prefix+to+StreamsConfig+to+enable+setting+default+internal+topic+configs
There is a PR here: https://github.com/apache/kafka/pull/3459

Thanks,
Damian
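
The proposed usage boils down to prefixing ordinary topic configs so that they
become defaults for the repartition and changelog topics Streams creates -- a
sketch from the KIP, subject to change in review:

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
// hypothetical per the KIP: "topic."-prefixed keys are stripped and applied
// to all internal topics
props.put(StreamsConfig.topicPrefix("segment.bytes"), "52428800");  // 50 MB segments
props.put(StreamsConfig.topicPrefix("retention.ms"), "86400000");   // 1 day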


Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-06-30 Thread Damian Guy
n some feedback about "I
> wanted to count a stream, but there was no count() method -- I first
> needed to figure out, that I need to group the stream first to be able
> to count it. It does make sense in hindsight but was not obvious in the
> beginning". Thus, carrying out this thought, we could also do the
> following:
>
> stream.count().groupedBy().windowedBy().table();
>
> -> Note, I use "grouped" and "windowed" instead of imperative here, as
> it comes after the count()
>
> This would be more consistent than your proposal (that has grouping
> before but windowing after count()). It might even allow us to enrich
> the API with a some syntactic sugar like `stream.count().table()` to get
> the overall count of all records (this would obviously not scale, but we
> could support it -- if not now, maybe later).
>
>
I guess I'd prefer
stream.groupBy().windowBy().count()
stream.groupBy().windowBy().reduce()
stream.groupBy().count()

As I said above, everything that happens before the final aggregate call
can be applied to any of them. So it makes sense to me to do those things
ahead of the final aggregate call.


> Last about builder pattern. I am convinced that we need some "terminal"
> operator/method that tells us when to add the processor to the topology.
> But I don't see the need for a plain builder pattern that feels alien to
> me (see my argument about the second join proposal). Using .stream() /
> .table() as used in many examples might work. But maybe a more generic
> name that we can use in all places like build() or apply() might also be
> an option.
>
>
Sure, a generic name might be ok.




>
> -Matthias
>
>
>
> On 6/29/17 7:37 AM, Damian Guy wrote:
> > Thanks Kyle.
> >
> > On Thu, 29 Jun 2017 at 15:11 Kyle Winkelman <winkelman.k...@gmail.com>
> > wrote:
> >
> >> Hi Damian,
> >>
> >>>>>> When trying to program in the fluent API that has been discussed
> most
> >> it
> >>>>>> feels difficult to know when you will actually get an object you can
> >> reuse.
> >>>>>> What if I make one KGroupedStream that I want to reuse, is it legal
> to
> >>>>>> reuse it or does this approach expect you to call grouped each time?
> >>
> >>>> I'd anticipate that once you have a KGroupedStream you can re-use it
> as
> >> you
> >>>> can today.
> >>
> >> You said it yourself in another post that the grouped stream is
> >> effectively a no-op until a count, reduce, or aggregate. The way I see
> it
> >> you wouldn’t be able to reuse anything except KStreams and KTables,
> because
> >> most of this fluent api would continue returning this (this being the
> >> builder object currently being manipulated).
> >
> > So, if you ever store a reference to anything but KStreams and KTables
> and
> >> you use it in two different ways then it's possible you make conflicting
> >> withXXX() calls on the same builder.
> >>
> >>
> > Not necessarily true. It could return a new instance of the builder, i.e.,
> > the builders being immutable. So if you held a reference to the builder
> it
> > would always be the same as it was when it was created.
> >
> >
> >> GroupedStream<K,V> groupedStreamWithDefaultSerdes = kStream.grouped();
> >> GroupedStream<K,V> groupedStreamWithDeclaredSerdes =
> >> groupedStreamsWithDefaultSerdes.withKeySerde(…).withValueSerde(…);
> >>
> >> I’ll admit that this shouldn’t happen but some user is going to do it
> >> eventually…
> >> Depending on implementation uses of groupedStreamWithDefaultSerdes would
> >> most likely be equivalent to the version withDeclaredSerdes. One work
> >> around would be to always make copies of the config objects you are
> >> building, but this approach has its own problem because now we have to
> >> identify which configs are equivalent so we don’t create repeated
> >> processors.
> >>
> >> The point of this long winded example is that we always have to be
> >> thinking about all of the possible ways it could be misused by a user
> >> (causing them to see hard to diagnose problems).
> >>
> >
> > Exactly! That is the point of the discussion really.
> >
> >
> >>
> >> In my attempt at a couple methods with builders I feel that I could
> >> confidently say the user couldn’t really mess it up.
> >>> // Count
> >>> KTable<String, Long> count =
> >>> k

Re: [VOTE] KIP 130: Expose states of active tasks to KafkaStreams public API

2017-06-30 Thread Damian Guy
I know I voted before, but now my vote is binding so...

+1 (binding)

On Thu, 18 May 2017 at 23:46 Matthias J. Sax <matth...@confluent.io> wrote:

> +1
>
> On 5/18/17 8:26 AM, Bill Bejeck wrote:
> > +1
> >
> > On Thu, May 18, 2017 at 6:54 AM, Florian Hussonnois <
> fhussonn...@gmail.com>
> > wrote:
> >
> >> Hi all,
> >>
> >> I've finally found time to update the KIP. The toString() is annotated
> as
> >> deprecated. I have also rebased the PR against the current trunk.
> >> So sorry to have taken so long on this KIP.
> >>
> >> Thanks.
> >>
> >> 2017-04-24 18:56 GMT+02:00 Guozhang Wang <wangg...@gmail.com>:
> >>
> >>> Florian, could you also add the part of deprecating
> >> `KafkaStreams.toString`
> >>> in your KIP as well?
> >>>
> >>>
> >>> Guozhang
> >>>
> >>> On Fri, Apr 21, 2017 at 8:32 AM, Damian Guy <damian@gmail.com>
> >> wrote:
> >>>
> >>>> +1
> >>>>
> >>>> On Fri, 21 Apr 2017 at 09:06 Eno Thereska <eno.there...@gmail.com>
> >>> wrote:
> >>>>
> >>>>> +1 (non-binding)
> >>>>>
> >>>>> Thanks
> >>>>> Eno
> >>>>>
> >>>>>> On 21 Apr 2017, at 05:58, Guozhang Wang <wangg...@gmail.com>
> >> wrote:
> >>>>>>
> >>>>>> +1. Thanks a lot for the KIP!
> >>>>>>
> >>>>>> Guozhang
> >>>>>>
> >>>>>> On Wed, Apr 5, 2017 at 1:57 PM, Florian Hussonnois <
> >>>>> fhussonn...@gmail.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi All,
> >>>>>>>
> >>>>>>> I would like to start the vote for the KIP-130 :
> >>>>>>>
> >>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP+
> >>>>>>> 130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>>
> >>>>>>> --
> >>>>>>> Florian HUSSONNOIS
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> -- Guozhang
> >>>>>
> >>>>>
> >>>>
> >>>
> >>>
> >>>
> >>> --
> >>> -- Guozhang
> >>>
> >>
> >>
> >>
> >> --
> >> Florian HUSSONNOIS
> >>
> >
>
>


Re: [DISCUSS] KIP-171: Extend Consumer Group Reset Offset for Stream Application

2017-06-29 Thread Damian Guy
Hi,

Thanks for the KIP. What is not clear is how this is going to handle state
stores. Right now the streams reset tool resets everything and clears up
the state stores. What are we going to do if we reset to a particular
offset? If we clear the state then we've lost any previously aggregated
values (which may or may not be what is expected). If we don't clear them,
then we will end up with incorrect aggregates.

@matthias, could we remove the ZK dependency from the streams reset tool
now?

Thanks,
Damian

On Thu, 29 Jun 2017 at 15:22 Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:

> You're right, I was not considering Zookeeper dependency.
>
> I'm starting to like the idea to wrap `reset-offset` from
> `streams-application-reset`.
>
> I will wait a bit for more feedback and work on a draft with this changes.
>
>
> On Wed, 28 Jun 2017 at 0:20, Matthias J. Sax ( >)
> wrote:
>
> > I agree, that we should not duplicate functionality.
> >
> > However, I am worried that a non-streams user using the offset reset
> > tool might delete topics unintentionally (even if the chances are pretty
> > small). Also, currently the streams reset tool requires Zookeeper while
> > the offset reset tool does not -- I don't think we should add this
> > dependency to the offset reset tool.
> >
> > Thus, I think it might be better to keep both tools, but internally
> > rewrite the streams reset entry class, to reuse as much as possible from
> > the offset reset tool. I.e., the streams tool would be a wrapper around
> > the offset tool and add some functionality it needs that is Streams
> > specific.
> >
> > I also think that keeping separate tools for consumers and streams is a
> > good thing. We might want to add new features that don't apply to plain
> > consumers -- note, a Streams application is more than just a client.
> >
> > WDYT?
> >
> > Would be good to get some feedback from others, too.
> >
> >
> > -Matthias
> >
> >
> > On 6/27/17 9:05 AM, Jorge Esteban Quilcate Otoya wrote:
> > > Thanks for the feedback Matthias!
> > >
> > > The main idea is to use only one tool to reset offsets and not
> replicate
> > > functionality between tools.
> > > The Reset Offset tool (KIP-122) not only executes the
> reset
> > > but also supports export and import from files, etc.
> > > If we extend the current tool (kafka-streams-application-reset.sh) we
> > will
> > > have to duplicate all this functionality also.
> > > Maybe another option is to move the current implementation into
> > > `kafka-consumer-group` and add a new command `--reset-offset-streams`
> > with
> > > the current implementation functionality and add `--reset-offset`
> options
> > > for input topics. Does this make sense?
> > >
> > >
> > > On Mon, 26 Jun 2017 at 23:32, Matthias J. Sax (<
> > matth...@confluent.io>)
> > > wrote:
> > >
> > >> Jorge,
> > >>
> > >> thanks a lot for this KIP. Allowing the reset of streams applications
> with
> > >> arbitrary start offsets is something we have had multiple requests for already.
> > >>
> > >> Couple of clarification question:
> > >>
> > >>  - why do you want to deprecate the current tool instead of extending
> > >> the current tool with the stuff the offset reset tool can do (ie, use
> > >> the offset reset tool internally)
> > >>
> > >>  - you suggest to extend the offset reset tool to replace the stream
> > >> reset tool: how would the reset tool know if it is resetting a streams
> > >> applications or a regular consumer group?
> > >>
> > >>
> > >>
> > >> -Matthias
> > >>
> > >>
> > >> On 6/26/17 1:28 PM, Jorge Esteban Quilcate Otoya wrote:
> > >>> Hi all,
> > >>>
> > >>> I'd like to start the discussion to add reset offset tooling for
> Stream
> > >>> applications.
> > >>> The KIP can be found here:
> > >>>
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-171+-+Extend+Consumer+Group+Reset+Offset+for+Stream+Application
> > >>>
> > >>> Thanks,
> > >>> Jorge.
> > >>>
> > >>
> > >>
> > >
> >
> >
>


Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-06-29 Thread Damian Guy
Thanks Kyle.

On Thu, 29 Jun 2017 at 15:11 Kyle Winkelman <winkelman.k...@gmail.com>
wrote:

> Hi Damian,
>
> >>>> When trying to program in the fluent API that has been discussed most
> it
> >>>> feels difficult to know when you will actually get an object you can
> reuse.
> >>>> What if I make one KGroupedStream that I want to reuse, is it legal to
> >>>> reuse it or does this approach expect you to call grouped each time?
>
> >> I'd anticipate that once you have a KGroupedStream you can re-use it as
> you
> >> can today.
>
> You said it yourself in another post that the grouped stream is
> effectively a no-op until a count, reduce, or aggregate. The way I see it
> you wouldn’t be able to reuse anything except KStreams and KTables, because
> most of this fluent api would continue returning this (this being the
> builder object currently being manipulated).

So, if you ever store a reference to anything but KStreams and KTables and
> you use it in two different ways then it's possible you make conflicting
> withXXX() calls on the same builder.
>
>
Not necessarily true. It could return a new instance of the builder, i.e.,
the builders being immutable. So if you held a reference to the builder it
would always be the same as it was when it was created.
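
(Concretely, something like this -- a bare-bones hypothetical, just to
illustrate what an immutable builder means here:)

import org.apache.kafka.common.serialization.Serde;

// Every withXXX() returns a fresh instance, so a reference you hold can
// never change underneath you.
final class Grouped<K, V> {
    private final Serde<K> keySerde;
    private final Serde<V> valueSerde;

    private Grouped(final Serde<K> keySerde, final Serde<V> valueSerde) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    static <K, V> Grouped<K, V> grouped() {
        return new Grouped<>(null, null);
    }

    Grouped<K, V> withKeySerde(final Serde<K> keySerde) {
        return new Grouped<>(keySerde, this.valueSerde);
    }

    Grouped<K, V> withValueSerde(final Serde<V> valueSerde) {
        return new Grouped<>(this.keySerde, valueSerde);
    }
}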


> GroupedStream<K,V> groupedStreamWithDefaultSerdes = kStream.grouped();
> GroupedStream<K,V> groupedStreamWithDeclaredSerdes =
> groupedStreamsWithDefaultSerdes.withKeySerde(…).withValueSerde(…);
>
> I’ll admit that this shouldn’t happen but some user is going to do it
> eventually…
> Depending on implementation uses of groupedStreamWithDefaultSerdes would
> most likely be equivalent to the version withDeclaredSerdes. One work
> around would be to always make copies of the config objects you are
> building, but this approach has its own problem because now we have to
> identify which configs are equivalent so we don’t create repeated
> processors.
>
> The point of this long winded example is that we always have to be
> thinking about all of the possible ways it could be misused by a user
> (causing them to see hard to diagnose problems).
>

Exactly! That is the point of the discussion really.


>
> In my attempt at a couple methods with builders I feel that I could
> confidently say the user couldn’t really mess it up.
> > // Count
> > KTable<String, Long> count =
> > kGroupedStream.count(Count.count().withQueryableStoreName("my-store"));
> The kGroupedStream is reusable and if they attempted to reuse the Count
> for some reason it would throw an error message saying that a store named
> “my-store” already exists.
>
>
Yes I agree and I think using builders is my preferred pattern.

Cheers,
Damian


> Thanks,
> Kyle
>
> From: Damian Guy
> Sent: Thursday, June 29, 2017 3:59 AM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] Streams DSL/StateStore Refactoring
>
> Hi Kyle,
>
> Thanks for your input. Really appreciated.
>
> On Thu, 29 Jun 2017 at 06:09 Kyle Winkelman <winkelman.k...@gmail.com>
> wrote:
>
> > I like more of a builder pattern even though others have voiced against
> > it. The reason I like it is because it makes it clear to the user that a
> > call to KGroupedStream#count will return a KTable not some intermediate
> > class that I need to understand.
> >
>
> Yes, that makes sense.
>
>
> > When trying to program in the fluent API that has been discussed most it
> > feels difficult to know when you will actually get an object you can
> reuse.
> > What if I make one KGroupedStream that I want to reuse, is it legal to
> > reuse it or does this approach expect you to call grouped each time?
>
>
> I'd anticipate that once you have a KGroupedStream you can re-use it as you
> can today.
>
>
> > This question doesn’t pop into my head at all in the builder pattern I
> > assume I can reuse everything.
> > Finally, I like .groupByKey and .groupBy(KeyValueMapper) not a big fan of
> > the grouped.
> >
> > Yes, grouped() was more for demonstration and because groupBy() and
> groupByKey() were taken! So I'd imagine the API would actually want to be
> groupByKey(/** no required args***/).withOptionalArg() and
> groupBy(KeyValueMapper m).withOptionalArg(...) -- of course this all depends
> on maintaining backward compatibility.
>
>
> > Unfortunately, the below approach would require at least 2 (probably 3)
> > overloads (one for returning a KTable and one for returning a KTable with
> > Windowed Key, probably would want to split windowed and sessionwindowed
> for
> > ease of implementation) of eac

Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-06-29 Thread Damian Guy
I've updated the experimental code with a couple of ways of doing joins.
One following the fluent approach and one following the builder approach.
The 2 examples can be found here:
https://github.com/dguy/kafka/blob/dsl-experiment/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java#L714

The code looks like:

@Test
public void shouldBeFluentIsh() throws Exception {
    final KStream<String, String> one = null;
    final KStream<String, String> two = null;
    final Serde<String> serde = null;
    final ValueJoiner<String, String, String> vj = null;

    // inner join
    one.join2(two, vj, JoinWindows.of(10))
            .withKeySerde(serde)
            .withThisValueSerde(serde)
            .withOtherValueSerde(serde)
            .stream();

    // left join
    one.join2(two, vj, JoinWindows.of(10))
            .withJoinType(JoinType.LEFT)
            .stream();
}

@Test
public void shouldUseBuilder() throws Exception {
    final KStream<String, String> one = null;
    final KStream<String, String> two = null;
    final Serde<String> serde = null;
    final ValueJoiner<String, String, String> vj = null;

    // inner
    one.join(Joins.streamStreamJoin(two, vj, JoinWindows.of(10)).build());

    // left
    one.join(Joins.streamStreamJoin(two, vj,
            JoinWindows.of(10)).withJoinType(JoinType.LEFT).build());
}


I'm not going to say which way I'm leaning, yet!

Thanks,
Damian

On Thu, 29 Jun 2017 at 11:47 Damian Guy <damian@gmail.com> wrote:

>
>> However, I don't understand your argument about putting aggregate()
>> after the withXX() -- all the calls to withXX() set optional parameters
>> for aggregate() and not for groupBy() -- but a groupBy().withXX()
>> indicates that the withXX() belongs to the groupBy(). IMHO, this might
>> be quite confusing for developers.
>>
>>
> I see what you are saying, but the grouped stream is effectively a no-op
> until you call one of the aggregate/count/reduce etc functions. So the
> optional params are ones that are applicable to any of the operations you
> can perform on this grouped stream. Then the final
> count()/reduce()/aggregate() call has any of the params that are
> required/specific to that function.
>
>
>>
>> -Matthias
>>
>> On 6/28/17 2:55 AM, Damian Guy wrote:
>> >> I also think that mixing optional parameters with configs is a bad
>> idea.
>> >> Have no proposal for this atm but just wanted to mention it. Hope to
>> >> find some time to come up with something.
>> >>
>> >>
>> > Yes, I don't like the mix of config either. But the only real config
>> here
>> > is the logging config - which we don't really need as it can already be
>> > done via a custom StateStoreSupplier.
>> >
>> >
>> >> What I don't like in the current proposal is the
>> >> .grouped().withKeyMapper() -- the current solution with .groupBy(...)
>> >> and .groupByKey() seems better. For clarity, we could rename to
>> >> .groupByNewKey(...) and .groupByCurrentKey() (even if we should find
>> >> some better names).
>> >>
>> >>
>> > it could be groupByKey(), groupBy() or something different bt
>> >
>> >
>> >
>> >> The proposed pattern "chains" grouping and aggregation too close
>> >> together. I would rather separate both more than less, ie, go in the
>> >> opposite direction.
>> >>
>> >> I am also wondering, if we could do something more "fluent". The
>> initial
>> >> proposal was like:
>> >>
>> >>>> groupedStream.count()
>> >>>>.withStoreName("name")
>> >>>>.withCachingEnabled(false)
>> >>>>.withLoggingEnabled(config)
>> >>>>.table()
>> >>
>> >> The .table() statement in the end was kinda alien.
>> >>
>> >
>> > I agree, but then all of the withXXX methods need to be on KTable which
>> is
>> > worse in my opinion. You also need something that is going to "build"
>> the
>> > internal processors and add them to the topology.
>> >
>> >
>> >> The current proposal puts the count() at the end -- ie, the optional
>> >> parameters for count() have to be specified on the .grouped() call -- this
>> >> does not seem to be the best way either.
>> >>
>> >>
>> > I actually prefer this method as you are building a grouped stream that
>> you
>> > will aggregate. So table.grouped(...).withOptionalStuff(

Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-06-29 Thread Damian Guy
> However, I don't understand your argument about putting aggregate()
> after the withXX() -- all the calls to withXX() set optional parameters
> for aggregate() and not for groupBy() -- but a groupBy().withXX()
> indicates that the withXX() belongs to the groupBy(). IMHO, this might
> be quite confusing for developers.
>
>
I see what you are saying, but the grouped stream is effectively a no-op
until you call one of the aggregate/count/reduce etc functions. So the
optional params are ones that are applicable to any of the operations you
can perform on this grouped stream. Then the final
count()/reduce()/aggregate() call has any of the params that are
required/specific to that function.


>
> -Matthias
>
> On 6/28/17 2:55 AM, Damian Guy wrote:
> >> I also think that mixing optional parameters with configs is a bad idea.
> >> Have no proposal for this atm but just wanted to mention it. Hope to
> >> find some time to come up with something.
> >>
> >>
> > Yes, I don't like the mix of config either. But the only real config here
> > is the logging config - which we don't really need as it can already be
> > done via a custom StateStoreSupplier.
> >
> >
> >> What I don't like in the current proposal is the
> >> .grouped().withKeyMapper() -- the current solution with .groupBy(...)
> >> and .groupByKey() seems better. For clarity, we could rename to
> >> .groupByNewKey(...) and .groupByCurrentKey() (even if we should find
> >> some better names).
> >>
> >>
> > it could be groupByKey(), groupBy() or something different bt
> >
> >
> >
> >> The proposed pattern "chains" grouping and aggregation too close
> >> together. I would rather separate both more than less, ie, go in the
> >> opposite direction.
> >>
> >> I am also wondering, if we could do something more "fluent". The initial
> >> proposal was like:
> >>
> >>>> groupedStream.count()
> >>>>.withStoreName("name")
> >>>>.withCachingEnabled(false)
> >>>>.withLoggingEnabled(config)
> >>>>.table()
> >>
> >> The .table() statement in the end was kinda alien.
> >>
> >
> > I agree, but then all of the withXXX methods need to be on KTable which
> is
> > worse in my opinion. You also need something that is going to "build" the
> > internal processors and add them to the topology.
> >
> >
> >> The current proposal puts the count() at the end -- ie, the optional
> >> parameters for count() have to be specified on the .grouped() call -- this
> >> does not seem to be the best way either.
> >>
> >>
> > I actually prefer this method as you are building a grouped stream that
> you
> > will aggregate. So table.grouped(...).withOptionalStuff().aggregate(..)
> etc
> > seems natural to me.
> >
> >
> >> I did not think this through in detail, but can't we just do the initial
> >> proposal with the .table() ?
> >>
> >> groupedStream.count().withStoreName("name").mapValues(...)
> >>
> >> Each .withXXX(...) return the current KTable and all the .withXXX() are
> >> just added to the KTable interface. Or do I miss anything why this won't
> >> work or any obvious disadvantage?
> >>
> >>
> >>
> > See above.
> >
> >
> >>
> >> -Matthias
> >>
> >> On 6/22/17 4:06 AM, Damian Guy wrote:
> >>> Thanks everyone. My latest attempt is below. It builds on the fluent
> >>> approach, but I think it is slightly nicer.
> >>> I agree with some of what Eno said about mixing configy stuff in the
> DSL,
> >>> but I think that enabling caching and enabling logging are things that
> >>> aren't actually config. I'd probably not add withLogConfig(...) (even
> >>> though it is below) as this is actually config and we already have a
> way
> >> of
> >>> doing that, via the StateStoreSupplier. Arguably we could use the
> >>> StateStoreSupplier for disabling caching etc, but as it stands that is
> a
> >>> bit of a tedious process for someone that just wants to use the default
> >>> storage engine, but not have caching enabled.
> >>>
> >>> There is also an orthogonal concern that Guozhang alluded to. If you
> >>> want to plug in a custom storage engine and you want it to be logged
> etc,
> >>> you would currently need to implement that you

Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-06-29 Thread Damian Guy
s, 2) syntax-wise it reads more natural.
>
> I think it is okay to add the APIs in (
>
> https://github.com/dguy/kafka/blob/dsl-experiment/streams/src/main/java/org/apache/kafka/streams/kstream/GroupedStream.java
> )
> in KGroupedStream.
>
>
> 2. For the "withStateStoreSupplier" API, is the user supposed to pass in
> the inner-most state store supplier (e.g. the one whose get() returns
> RocksDBStore), or is it supposed to return the outer-most supplier with
> logging / metrics / etc? I think it would be more useful to only require
> users pass in the inner state store supplier while specifying caching /
> logging through other APIs.
>
> In addition, the "GroupedWithCustomStore" is a bit suspicious to me: we are
> allowing users to call other APIs like "withQueryableName" multiple times,
> but to call "withStateStoreSupplier" only once at the end. Why is that?
>
>
> 3. The current DSL seems to be only for aggregations, what about joins?
>
>
> 4. I think it is okay to keep the "withLogConfig": for the
> StateStoreSupplier it will still be user code specifying the topology so I
> do not see there is a big difference.
>
>
> 5. "WindowedGroupedStream" 's withStateStoreSupplier should take the
> windowed state store supplier to enforce typing?
>
>
> Below are minor ones:
>
> 6. "withQueryableName": maybe better "withQueryableStateName"?
>
> 7. "withLogConfig": maybe better "withLoggingTopicConfig()"?
>
>
>
> Guozhang
>
>
>
> On Wed, Jun 28, 2017 at 3:59 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > I see your point about "when to add the processor to the topology". That
> > is indeed an issue. Not sure if we could allow "updates" to the
> topology...
> >
> > I don't see any problem with having all the withXX() in KTable interface
> > -- but this might be subjective.
> >
> >
> > However, I don't understand your argument about putting aggregate()
> > after the withXX() -- all the calls to withXX() set optional parameters
> > for aggregate() and not for groupBy() -- but a groupBy().withXX()
> > indicates that the withXX() belongs to the groupBy(). IMHO, this might
> > be quite confusing for developers.
> >
> >
> > -Matthias
> >
> > On 6/28/17 2:55 AM, Damian Guy wrote:
> > >> I also think that mixing optional parameters with configs is a bad
> idea.
> > >> Have no proposal for this atm but just wanted to mention it. Hope to
> > >> find some time to come up with something.
> > >>
> > >>
> > > Yes, I don't like the mix of config either. But the only real config
> here
> > > is the logging config - which we don't really need as it can already be
> > > done via a custom StateStoreSupplier.
> > >
> > >
> > >> What I don't like in the current proposal is the
> > >> .grouped().withKeyMapper() -- the current solution with .groupBy(...)
> > >> and .groupByKey() seems better. For clarity, we could rename to
> > >> .groupByNewKey(...) and .groupByCurrentKey() (even if we should find
> > >> some better names).
> > >>
> > >>
> > > it could be groupByKey(), groupBy() or something different bt
> > >
> > >
> > >
> > >> The proposed pattern "chains" grouping and aggregation too close
> > >> together. I would rather separate both more than less, ie, go in the
> > >> opposite direction.
> > >>
> > >> I am also wondering, if we could do something more "fluent". The
> initial
> > >> proposal was like:
> > >>
> > >>>> groupedStream.count()
> > >>>>.withStoreName("name")
> > >>>>.withCachingEnabled(false)
> > >>>>.withLoggingEnabled(config)
> > >>>>    .table()
> > >>
> > >> The .table() statement in the end was kinda alien.
> > >>
> > >
> > > I agree, but then all of the withXXX methods need to be on KTable which
> > is
> > > worse in my opinion. You also need something that is going to "build"
> the
> > > internal processors and add them to the topology.
> > >
> > >
> > >> The current proposal puts the count() at the end -- ie, the optional
> > >> parameters for count() have to be specified on the .grouped() call -- this
> > >> does not seem to be the best way either.
> > >

Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-06-29 Thread Damian Guy
Hi Guozhang,

Thanks for the feedback

On Thu, 29 Jun 2017 at 01:45 Guozhang Wang <wangg...@gmail.com> wrote:

> I played the current proposal a bit with https://github.com/dguy/kafka/
> tree/dsl-experiment <https://github.com/dguy/kafka/tree/dsl-experiment>,
> and here are my observations:
>
> 1. Personally I prefer
>
> "stream.group(mapper) / stream.groupByKey()"
>
> than
>
> "stream.group().withKeyMapper(mapper) / stream.group()"
>
> Since 1) withKeyMapper is not enforced programmatically though it is not
> "really" optional like others, 2) syntax-wise it reads more natural.
>
>
Yes, that is true. The keyMapper is not optional to `group` so should be an
argument to the method


> I think it is okay to add the APIs in (
>
> https://github.com/dguy/kafka/blob/dsl-experiment/streams/src/main/java/org/apache/kafka/streams/kstream/GroupedStream.java
> )
> in KGroupedStream.
>
>
Are you saying add these methods to KGroupedStream? I'm not so sure as
they'd all need to return a KGroupedStream and then we would need to be
able to remove/replace/update processors in the topology (which would be
achievable, but I think not before KIP-120 is done so that we don't expose
any further public APIs that are only to support internal usages - also
updating processors in the topology seems a bit odd IMO). Perhaps you mean
something different.


>
> 2. For the "withStateStoreSupplier" API, is the user supposed to pass in
> the inner-most state store supplier (e.g. the one whose get() returns
> RocksDBStore), or is it supposed to return the outer-most supplier with
> logging / metrics / etc? I think it would be more useful to only require
> users pass in the inner state store supplier while specifying caching /
> logging through other APIs.
>

Yes that is possible, though in that case we might want to use a narrower
interface than StateStoreSupplier, i.e., one that doesn't have `logConfig`
and `loggingEnabled`? Those two methods are exposed for the
`TopologyBuilder`.
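
(Roughly something like this, i.e., a hypothetical narrower supplier -- not an
agreed design:)

import org.apache.kafka.streams.processor.StateStore;

// Hypothetical: supplies only the inner store; logging, caching and metrics
// would be layered on top by the DSL rather than by the supplier itself.
public interface InnerStateStoreSupplier<T extends StateStore> {
    String name();
    T get();
}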


>
> In addition, the "GroupedWithCustomStore" is a bit suspicious to me: we are
> allowing users to call other APIs like "withQueryableName" multiple times,
> but to call "withStateStoreSupplier" only once at the end. Why is that?
>
>
The API is not meant to be complete! It is just an experiment to see what
is possible. However, at some point we need to decide which methods are
valid at any point. So if you have called `withStateStoreSupplier` you no
longer need to call `withQueryableName`, `withKeySerde`, `withValueSerde`
etc as this is already done.  The point being to narrow the interface to
what makes sense. There is work to be done here!


>
> 3. The current DSL seems to be only for aggregations, what about joins?
>
>
You have to start somewhere! I'd like to think we can come to an agreement
on a style and then apply that across the board. So the API should look the
same everywhere. But yes I can add some join examples.



>
> 4. I think it is okay to keep the "withLogConfig": for the
> StateStoreSupplier it will still be user code specifying the topology so I
> do not see there is a big difference.
>
>
I guess this is one of the most contentious points, i.e., mixing config with
topology building. I'm on the fence on this as I get both sides of the
argument. I also don't really like having everything in config.


>
> 5. "WindowedGroupedStream" 's withStateStoreSupplier should take the
> windowed state store supplier to enforce typing?
>
>
Yes it would. I was just doing a bare bones API so at least you could try
it out.


>
> Below are minor ones:
>
> 6. "withQueryableName": maybe better "withQueryableStateName"?
>

I think we currently call it `queryableStoreName`. So maybe we stick with
that?


>
> 7. "withLogConfig": maybe better "withLoggingTopicConfig()"?
>
>
Yep



>
> Guozhang
>
>
>
> On Wed, Jun 28, 2017 at 3:59 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > I see your point about "when to add the processor to the topology". That
> > is indeed an issue. Not sure if we could allow "updates" to the
> topology...
> >
> > I don't see any problem with having all the withXX() in KTable interface
> > -- but this might be subjective.
> >
> >
> > However, I don't understand your argument about putting aggregate()
> > after the withXX() -- all the calls to withXX() set optional parameters
> > for aggregate() and not for groupBy() -- but a groupBy().withXX()
> > indicates that the withXX() belongs to the groupBy(). IMHO, this might
> > be quite confusing for developers.
> >
> >
>

Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-06-28 Thread Damian Guy
> I also think that mixing optional parameters with configs is a bad idea.
> Have no proposal for this atm but just wanted to mention it. Hope to
> find some time to come up with something.
>
>
Yes, i don't like the mix of config either. But the only real config here
is the logging config - which we don't really need as it can already be
done via a custom StateStoreSupplier.


> What I don't like in the current proposal is the
> .grouped().withKeyMapper() -- the current solution with .groupBy(...)
> and .groupByKey() seems better. For clarity, we could rename to
> .groupByNewKey(...) and .groupByCurrentKey() (even if we should find
> some better names).
>
>
it could be groupByKey(), groupBy() or something different bt



> The proposed pattern "chains" grouping and aggregation too close
> together. I would rather separate both more than less, ie, go in the
> opposite direction.
>
> I am also wondering, if we could do something more "fluent". The initial
> proposal was like:
>
> >> groupedStream.count()
> >>.withStoreName("name")
> >>.withCachingEnabled(false)
> >>.withLoggingEnabled(config)
> >>.table()
>
> The .table() statement in the end was kinda alien.
>

I agree, but then all of the withXXX methods need to be on KTable which is
worse in my opinion. You also need something that is going to "build" the
internal processors and add them to the topology.


> The current proposal puts the count() at the end -- ie, the optional
> parameters for count() have to be specified on the .grouped() call -- this
> does not seem to be the best way either.
>
>
I actually prefer this method as you are building a grouped stream that you
will aggregate. So table.grouped(...).withOptionalStuff().aggregate(..) etc
seems natural to me.


> I did not think this through in detail, but can't we just do the initial
> proposal with the .table() ?
>
> groupedStream.count().withStoreName("name").mapValues(...)
>
> Each .withXXX(...) return the current KTable and all the .withXXX() are
> just added to the KTable interface. Or do I miss anything why this won't
> work or any obvious disadvantage?
>
>
>
See above.


>
> -Matthias
>
> On 6/22/17 4:06 AM, Damian Guy wrote:
> > Thanks everyone. My latest attempt is below. It builds on the fluent
> > approach, but I think it is slightly nicer.
> > I agree with some of what Eno said about mixing configy stuff in the DSL,
> > but I think that enabling caching and enabling logging are things that
> > aren't actually config. I'd probably not add withLogConfig(...) (even
> > though it is below) as this is actually config and we already have a way
> of
> > doing that, via the StateStoreSupplier. Arguably we could use the
> > StateStoreSupplier for disabling caching etc, but as it stands that is a
> > bit of a tedious process for someone that just wants to use the default
> > storage engine, but not have caching enabled.
> >
> > There is also an orthogonal concern that Guozhang alluded to. If you
> > want to plug in a custom storage engine and you want it to be logged etc,
> > you would currently need to implement that yourself. Ideally we can
> provide
> > a way where we will wrap the custom store with logging, metrics, etc. I
> > need to think about where this fits, it is probably more appropriate on
> the
> > Stores API.
> >
> > final KeyValueMapper<String, String, Long> keyMapper = null;
> > // count with mapped key
> > final KTable<Long, Long> count = stream.grouped()
> > .withKeyMapper(keyMapper)
> > .withKeySerde(Serdes.Long())
> > .withValueSerde(Serdes.String())
> > .withQueryableName("my-store")
> > .count();
> >
> > // windowed count
> > final KTable<Windowed, Long> windowedCount = stream.grouped()
> > .withQueryableName("my-window-store")
> > .windowed(TimeWindows.of(10L).until(10))
> > .count();
> >
> > // windowed reduce
> > final Reducer windowedReducer = null;
> > final KTable<Windowed, String> windowedReduce = stream.grouped()
> > .withQueryableName("my-window-store")
> > .windowed(TimeWindows.of(10L).until(10))
> > .reduce(windowedReducer);
> >
> > final Aggregator<String, String, Long> aggregator = null;
> > final Initializer init = null;
> >
> > // aggregate
> > final KTable<String, Long> aggregate = stream.grouped()
> > .withQueryableName("my-aggregate-store")
&

[jira] [Resolved] (KAFKA-5487) Rolling upgrade test for streams

2017-06-24 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy resolved KAFKA-5487.
---
   Resolution: Fixed
Fix Version/s: (was: 0.11.0.1)
   0.11.1.0

Issue resolved by pull request 3411
[https://github.com/apache/kafka/pull/3411]

> Rolling upgrade test for streams
> 
>
> Key: KAFKA-5487
> URL: https://issues.apache.org/jira/browse/KAFKA-5487
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Eno Thereska
>Assignee: Eno Thereska
> Fix For: 0.11.1.0
>
>
> We need to do a basic rolling upgrade test for streams, similar to the 
> tests/kafkatest/tests/core/upgrade_test.py test for Kafka core. Basically we 
> need to test the ability of a streams app to use a different JAR from a 
> different version.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: Mirroring documentation improvement

2017-06-23 Thread Damian Guy
Done! Thanks

On Fri, 23 Jun 2017 at 15:14 Paolo Patierno <ppatie...@live.com> wrote:

> Hi Damian,
>
> my username is ppatierno ... thank you very much !
>
>
> Paolo Patierno
> Senior Software Engineer (IoT) @ Red Hat
> Microsoft MVP on Windows Embedded & IoT
> Microsoft Azure Advisor
>
> Twitter : @ppatierno<http://twitter.com/ppatierno>
> Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
> Blog : DevExperience<http://paolopatierno.wordpress.com/>
>
>
> 
> From: Tom Bentley <t.j.bent...@gmail.com>
> Sent: Friday, June 23, 2017 2:10 PM
> To: dev@kafka.apache.org
> Subject: Re: Mirroring documentation improvement
>
> Hi Damian, my username is tombentley
>
> Thanks
>
> Tom
>
> On 23 June 2017 at 14:57, Damian Guy <damian@gmail.com> wrote:
>
> > Evgeniy, you should now have access.
> >
> > Tom & Paolo what are your wiki usernames?
> >
> > Thanks,
> > Damian
> >
> > On Fri, 23 Jun 2017 at 14:52 Tom Bentley <t.j.bent...@gmail.com> wrote:
> >
> > > If any of the committers see this: Could I also have edit access
> please?
> > >
> > > On 23 June 2017 at 14:47, Evgeniy Veretennikov <
> > evg.veretenni...@gmail.com
> > > >
> > > wrote:
> > >
> > > > Hi Tom,
> > > >
> > > > Thanks for the correction; I meant that I want to edit this page, in
> fact.
> > > > Initially I copy-pasted the wrong link.
> > > >
> > > > My confluence username is evis.
> > > >
> > > > Best regards,
> > > > Evgeny
> > > >
> > > > Best regards,
> > > > Evgeniy Veretennikov
> > > >
> > > > 2017-06-23 16:43 GMT+03:00 Tom Bentley <t.j.bent...@gmail.com>:
> > > >
> > > > > Hi Evgeniy,
> > > > >
> > > > > The wiki page you link to there is about contributing improvements
> > for
> > > > > kafka.apache.org. But since you say "mirroring doc in Confluence"
> I
> > > > assume
> > > > > you want to edit
> > > > > https://cwiki.apache.org/confluence/pages/viewpage.
> > > > action?pageId=27846330,
> > > > > if so you will need one of the committers to give you edit access
> to
> > > the
> > > > > wiki. Hopefully one of them will see this and sort it out, but if
> you
> > > > could
> > > > > tell them your confluence username that might help.
> > > > >
> > > > > Cheers,
> > > > >
> > > > > Tom
> > > > >
> > > > > On 23 June 2017 at 14:36, Evgeniy Veretennikov <
> > > > evg.veretenni...@gmail.com
> > > > > >
> > > > > wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I want to improve the mirroring doc in Confluence a bit:
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/
> > > > Contributing+Website+
> > > > > > Documentation+Changes
> > > > > >
> > > > > > How can I change this page? Do I need some specific access
> rights?
> > > > > >
> > > > > > Best regards,
> > > > > > Evgeny Veretennikov
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: Mirroring documentation improvement

2017-06-23 Thread Damian Guy
Evgeniy, you should now have access.

Tom & Paolo what are your wiki usernames?

Thanks,
Damian

On Fri, 23 Jun 2017 at 14:52 Tom Bentley  wrote:

> If any of the committers see this: Could I also have edit access please?
>
> On 23 June 2017 at 14:47, Evgeniy Veretennikov  >
> wrote:
>
> > Hi Tom,
> >
> > Thanks for the correction; I meant that I want to edit this page, in fact.
> > Initially I copy-pasted the wrong link.
> >
> > My confluence username is evis.
> >
> > Best regards,
> > Evgeny
> >
> > Best regards,
> > Evgeniy Veretennikov
> >
> > 2017-06-23 16:43 GMT+03:00 Tom Bentley :
> >
> > > Hi Evgeniy,
> > >
> > > The wiki page you link to there is about contributing improvements for
> > > kafka.apache.org. But since you say "mirroring doc in Confluence" I
> > assume
> > > you want to edit
> > > https://cwiki.apache.org/confluence/pages/viewpage.
> > action?pageId=27846330,
> > > if so you will need one of the committers to give you edit access to
> the
> > > wiki. Hopefully one of them will see this and sort it out, but if you
> > could
> > > tell them your confluence username that might help.
> > >
> > > Cheers,
> > >
> > > Tom
> > >
> > > On 23 June 2017 at 14:36, Evgeniy Veretennikov <
> > evg.veretenni...@gmail.com
> > > >
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > > I want to improve the mirroring doc in Confluence a bit:
> > > > https://cwiki.apache.org/confluence/display/KAFKA/
> > Contributing+Website+
> > > > Documentation+Changes
> > > >
> > > > How can I change this page? Do I need some specific access rights?
> > > >
> > > > Best regards,
> > > > Evgeny Veretennikov
> > > >
> > >
> >
>


Re: [VOTE] KIP-161: streams deserialization exception handlers

2017-06-23 Thread Damian Guy
Thanks for the KIP, Eno.
+1 (binding)

On Fri, 23 Jun 2017 at 11:00 Eno Thereska  wrote:

> Starting voting thread for:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers
> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-161:+streams+deserialization+exception+handlers
> >
>
> Thanks
> Eno


[jira] [Resolved] (KAFKA-4913) creating a window store with one segment throws division by zero error

2017-06-23 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy resolved KAFKA-4913.
---
   Resolution: Fixed
Fix Version/s: (was: 0.11.0.1)
   0.11.1.0

Issue resolved by pull request 3410
[https://github.com/apache/kafka/pull/3410]

> creating a window store with one segment throws division by zero error
> --
>
> Key: KAFKA-4913
> URL: https://issues.apache.org/jira/browse/KAFKA-4913
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Xavier Léauté
>Assignee: Damian Guy
> Fix For: 0.11.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: Kafka streams KStream and ktable join issue

2017-06-23 Thread Damian Guy
My guess is that it is because the record doesn't have a key, i.e., the key
is null. We have a fix for this in 0.11: we will skip records with a
null key during restore.
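
For anyone hitting this on 0.10.2 in the meantime, a minimal workaround
sketch (assuming the 0.10.2 DSL; "raw-cache-input" is a made-up upstream
topic name, and jsonSerde is the serde from the example quoted below):

// Workaround sketch: drop null-keyed records before they reach the table's
// source topic ("cache" below), so the RocksDB restore never sees them.
builder.stream(Serdes.String(), jsonSerde, "raw-cache-input")
       .filter((key, value) -> key != null)
       .to(Serdes.String(), jsonSerde, "cache");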

On Fri, 23 Jun 2017 at 03:57 Matthias J. Sax  wrote:

> Hi,
>
> Can you reproduce the error reliably? Are you using 0.10.2.0 or 0.10.2.1?
>
> It's unclear to me how an NPE can occur. It seems to happen within the
> Streams library. Might be a bug; not sure atm.
>
>
> -Matthias
>
> On 6/22/17 9:43 AM, Shekar Tippur wrote:
> > Hello,
> >
> > I am trying to perform a simple join operation. I am using Kafka 0.10.2.
> >
> > I have a "raw" topic and a "cache" topic, each with just 1 partition in my
> local
> > environment.
> >
> > The KTable has these entries:
> >
> > {"Joe": {"location": "US", "gender": "male"}}
> > {"Julie": {"location": "US", "gender": "female"}}
> > {"Kawasaki": {"location": "Japan", "gender": "male"}}
> >
> > The KStream gets an event:
> >
> > {"user": "Joe", "custom": {"choice":"vegan"}}
> >
> > I want the join output to be:
> >
> > {"user": "Joe", "custom": {"choice":"vegan","enriched":*{"location":
> "US",
> > "gender": "male"}*} }
> >
> > I want to take what's in the KTable and add it to the "enriched" section
> > of the output stream.
> >
> > I have defined a serde:
> >
> > //This is the same serde code from the example.
> >
> > final TestStreamsSerializer jsonSerializer = new
> > TestStreamsSerializer();
> > final TestStreamsDeserialzer jsonDeserializer = new
> > TestStreamsDeserialzer();
> > final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer,
> > jsonDeserializer);
> >
> > //
> >
> > KStream<String, JsonNode> raw = builder.stream(Serdes.String(),
> > jsonSerde, "raw");
> > KTable<String, JsonNode> cache = builder.table("cache", "local-cache");
> >
> > raw.leftJoin(cache,
> > (record1, record2) -> record1.get("user") + "-" +
> record2).to("output");
> >
> > I am having trouble understanding how to call the join API.
> >
> > With the above code, I seem to get an error:
> >
> > [2017-06-22 09:23:31,836] ERROR User provided listener
> > org.apache.kafka.streams.processor.internals.StreamThread$1 for group
> > streams-pipe failed on partition assignment
> > (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> >
> > java.lang.NullPointerException
> >
> > at org.rocksdb.RocksDB.put(RocksDB.java:488)
> >
> > at
> >
> org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:254)
> >
> > at
> >
> org.apache.kafka.streams.state.internals.RocksDBStore.access$000(RocksDBStore.java:67)
> >
> > at
> >
> org.apache.kafka.streams.state.internals.RocksDBStore$1.restore(RocksDBStore.java:164)
> >
> > at
> >
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:242)
> >
> > at
> >
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)
> >
> > at
> >
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
> >
> > at
> >
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:160)
> >
> > at
> >
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)
> >
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> >
> > at
> >
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
> >
> > at
> >
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:63)
> >
> > at
> >
> org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
> >
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
> >
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
> >
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
> >
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
> >
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
> >
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
> >
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
> >
> > at
> >
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
> >
> > at
> >
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
> >
> > at
> >
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> >
> > at
> >
> 

Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-06-22 Thread Damian Guy
Thanks everyone. My latest attempt is below. It builds on the fluent
approach, but I think it is slightly nicer.
I agree with some of what Eno said about mixing configy stuff in the DSL,
but I think that enabling caching and enabling logging are things that
aren't actually config. I'd probably not add withLogConfig(...) (even
though it is below) as this is actually config and we already have a way of
doing that, via the StateStoreSupplier. Arguably we could use the
StateStoreSupplier for disabling caching etc, but as it stands that is a
bit of a tedious process for someone who just wants to use the default
storage engine, but not have caching enabled.

There is also an orthogonal concern that Guozhang alluded to. If you
want to plug in a custom storage engine and you want it to be logged etc,
you would currently need to implement that yourself. Ideally we can provide
a way where we will wrap the custom store with logging, metrics, etc. I
need to think about where this fits; it is probably more appropriate on the
Stores API.

final KeyValueMapper<String, String, Long> keyMapper = null;
// count with mapped key
final KTable<Long, Long> count = stream.grouped()
.withKeyMapper(keyMapper)
.withKeySerde(Serdes.Long())
.withValueSerde(Serdes.String())
.withQueryableName("my-store")
.count();

// windowed count
final KTable<Windowed<String>, Long> windowedCount = stream.grouped()
.withQueryableName("my-window-store")
.windowed(TimeWindows.of(10L).until(10))
.count();

// windowed reduce
final Reducer<String> windowedReducer = null;
final KTable<Windowed<String>, String> windowedReduce = stream.grouped()
.withQueryableName("my-window-store")
.windowed(TimeWindows.of(10L).until(10))
.reduce(windowedReducer);

final Aggregator<String, String, Long> aggregator = null;
final Initializer<Long> init = null;

// aggregate
final KTable<String, Long> aggregate = stream.grouped()
.withQueryableName("my-aggregate-store")
.aggregate(aggregator, init, Serdes.Long());

final StateStoreSupplier<KeyValueStore<String, Long>> stateStoreSupplier = null;

// aggregate with custom store
final KTable<String, Long> aggWithCustomStore = stream.grouped()
.withStateStoreSupplier(stateStoreSupplier)
.aggregate(aggregator, init);

// disable caching
stream.grouped()
.withQueryableName("name")
.withCachingEnabled(false)
.count();

// disable logging
stream.grouped()
.withQueryableName("q")
.withLoggingEnabled(false)
.count();

// override log config
final Reducer<String> reducer = null;
stream.grouped()
.withLogConfig(Collections.singletonMap("segment.size", "10"))
.reduce(reducer);


If anyone wants to play around with this you can find the code here:
https://github.com/dguy/kafka/tree/dsl-experiment

Note: It won't actually work as most of the methods just return null.

Thanks,
Damian


On Thu, 22 Jun 2017 at 11:18 Ismael Juma <ism...@juma.me.uk> wrote:

> Thanks Damian. I think both options have pros and cons. And both are better
> than overload abuse.
>
> The fluent API approach reads better, no mention of builder or build
> anywhere. The main downside is that the method signatures are a little less
> clear. By reading the method signature, one doesn't necessarily know what
> it returns. Also, one needs to figure out the special method (`table()` in
> this case) that gives you what you actually care about (`KTable` in this
> case). Not major issues, but worth mentioning while doing the comparison.
>
> The builder approach avoids the issues mentioned above, but it doesn't read
> as well.
>
> Ismael
>
> On Wed, Jun 21, 2017 at 3:37 PM, Damian Guy <damian@gmail.com> wrote:
>
> > Hi,
> >
> > I'd like to get a discussion going around some of the API choices we've
> > made in the DSL. In particular those that relate to stateful operations
> > (though this could expand).
> > As it stands we lean heavily on overloaded methods in the API, i.e., there
> > are 9 overloads for KGroupedStream.count(..)! It is becoming noisy and I
> > feel it is only going to get worse as we add more optional params. In
> > particular we've had some requests to be able to turn caching off, or
> > change log configs, on a per-operator basis (note this can be done now
> if
> > you pass in a StateStoreSupplier, but this can be a bit cumbersome).
> >
> > So this is a bit of an open question. How can we change the DSL overloads
> > so that it flows, is simple to use and understand, and is easily extended
> > in the future?
> >
> > One option would be to use a fluent API approach for providing the
> op

Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-06-21 Thread Damian Guy
Thanks Eno.

Yes, I agree. We could apply this same approach to most of the operations
where we have multiple overloads, i.e., we have a single method for each
operation that takes the required parameters and everything else is
specified as you have done above.

On Wed, 21 Jun 2017 at 16:24 Eno Thereska <eno.there...@gmail.com> wrote:

> (cc’ing user-list too)
>
> Given that we already have StateStoreSuppliers that are configurable using
> the fluent-like API, probably it’s worth discussing the other examples with
> joins and serdes first since those have many overloads and are in need of
> some TLC.
>
> So following your example, I guess you’d have something like:
> .join()
>.withKeySerdes(…)
>.withValueSerdes(…)
>.withJoinType(“outer”)
>
> etc?
>
> I like the approach since it still remains declarative and it’d reduce the
> number of overloads by quite a bit.
>
> Eno
>
> > On Jun 21, 2017, at 3:37 PM, Damian Guy <damian@gmail.com> wrote:
> >
> > Hi,
> >
> > I'd like to get a discussion going around some of the API choices we've
> > made in the DSL. In particular those that relate to stateful operations
> > (though this could expand).
> > As it stands we lean heavily on overloaded methods in the API, i.e., there
> > are 9 overloads for KGroupedStream.count(..)! It is becoming noisy and I
> > feel it is only going to get worse as we add more optional params. In
> > particular we've had some requests to be able to turn caching off, or
> > change log configs, on a per-operator basis (note this can be done now
> if
> > you pass in a StateStoreSupplier, but this can be a bit cumbersome).
> >
> > So this is a bit of an open question. How can we change the DSL overloads
> > so that it flows, is simple to use and understand, and is easily extended
> > in the future?
> >
> > One option would be to use a fluent API approach for providing the
> optional
> > params, so something like this:
> >
> > groupedStream.count()
> >   .withStoreName("name")
> >   .withCachingEnabled(false)
> >   .withLoggingEnabled(config)
> >   .table()
> >
> >
> >
> > Another option would be to provide a Builder to the count method, so it
> > would look something like this:
> > groupedStream.count(new
> > CountBuilder("storeName").withCachingEnabled(false).build())
> >
> > Another option is to say: Hey we don't need this, what are you on about!
> >
> > The above has focussed on state store related overloads, but the same
> ideas
> > could be applied to joins etc, where we presently have many join methods
> > and many overloads.
> >
> > Anyway, I look forward to hearing your opinions.
> >
> > Thanks,
> > Damian
>
>


Re: confluence permission request

2017-06-21 Thread Damian Guy
Hi,

That should be done.

Thanks,
Damian

On Wed, 21 Jun 2017 at 05:42 Kenji Hayashida  wrote:

> To Kafka Dev Team,
>
> Sorry, I forgot to send my ID.
> My ID is kenjih.
>
> Thanks.
>
> - Kenji Hayashida
>
> 2017-06-21 13:29 GMT+09:00 Kenji Hayashida :
>
> > To Kafka Dev Team,
> >
> > Hi, could you please give me write permission on the Confluence page?
> > https://cwiki.apache.org/confluence/display/KAFKA/
> > Kafka+Improvement+Proposals
> >
> > I'm going to write a KIP.
> > Thanks.
> >
> > - Kenji Hayashida
> >
> >
>
>
> --
>
> ☆---★
> 林田賢二
> MAIL: kenji12...@gmail.com
>
> ☆---★
>


Re: Contributor

2017-06-21 Thread Damian Guy
Done - thanks

On Wed, 21 Jun 2017 at 12:19 Tom Bentley <t.j.bent...@gmail.com> wrote:

> Please can I also be added? My username is tombentley.
>
> Thanks
>
> Tom
>
> On 21 June 2017 at 12:03, Damian Guy <damian@gmail.com> wrote:
>
> > Hi Andras,
> >
> > You should have access now.
> >
> > Thanks,
> > Damian
> >
> > On Wed, 21 Jun 2017 at 10:45 Andras Beni <andrasb...@cloudera.com>
> wrote:
> >
> > > Hi All,
> > >
> > > I'd like to contribute to Apache Kafka.
> > > Can you please add me (username: andrasbeni) to the contributors list
> for
> > > this project at issues.apache.org?
> > >
> > > Thank you,
> > > Andras
> > >
> >
>


[DISCUSS] Streams DSL/StateStore Refactoring

2017-06-21 Thread Damian Guy
Hi,

I'd like to get a discussion going around some of the API choices we've
made in the DLS. In particular those that relate to stateful operations
(though this could expand).
As it stands we lean heavily on overloaded methods in the API, i.e, there
are 9 overloads for KGroupedStream.count(..)! It is becoming noisy and i
feel it is only going to get worse as we add more optional params. In
particular we've had some requests to be able to turn caching off, or
change log configs,  on a per operator basis (note this can be done now if
you pass in a StateStoreSupplier, but this can be a bit cumbersome).

So this is a bit of an open question. How can we change the DSL overloads
so that it flows, is simple to use and understand, and is easily extended
in the future?

One option would be to use a fluent API approach for providing the optional
params, so something like this:

groupedStream.count()
   .withStoreName("name")
   .withCachingEnabled(false)
   .withLoggingEnabled(config)
   .table()



Another option would be to provide a Builder to the count method, so it
would look something like this:
groupedStream.count(new
CountBuilder("storeName").withCachingEnabled(false).build())

Another option is to say: Hey we don't need this, what are you on about!

The above has focussed on state store related overloads, but the same ideas
could  be applied to joins etc, where we presently have many join methods
and many overloads.

Anyway, i look forward to hearing your opinions.

Thanks,
Damian


Re: Contributor

2017-06-21 Thread Damian Guy
Hi Andras,

You should have access now.

Thanks,
Damian

On Wed, 21 Jun 2017 at 10:45 Andras Beni  wrote:

> Hi All,
>
> I'd like to contribute to Apache Kafka.
> Can you please add me (username: andrasbeni) to the contributors list for
> this project at issues.apache.org?
>
> Thank you,
> Andras
>


Re: [ANNOUNCE] New committer: Damian Guy

2017-06-10 Thread Damian Guy
Thanks everyone. Looking forward to making many more contributions
On Sat, 10 Jun 2017 at 02:46, Joe Stein <crypt...@gmail.com> wrote:

> Congrats!
>
>
> ~ Joe Stein
>
> On Fri, Jun 9, 2017 at 6:49 PM, Neha Narkhede <n...@confluent.io> wrote:
>
> > Well deserved. Congratulations Damian!
> >
> > On Fri, Jun 9, 2017 at 1:34 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Hello all,
> > >
> > >
> > > The PMC of Apache Kafka is pleased to announce that we have invited
> > Damian
> > > Guy as a committer to the project.
> > >
> > > Damian has made tremendous contributions to Kafka. He has not only
> > > contributed a lot to the Streams API, but has also been involved in
> > many
> > > other areas like the producer and consumer clients, broker-side
> > > coordinators (group coordinator and the ongoing transaction
> > coordinator).
> > > He has contributed more than 100 patches so far, and has been driving
> > 6
> > > KIP contributions.
> > >
> > > More importantly, Damian has been a very prolific reviewer on open PRs
> > and
> > > has been actively participating in community activities such as mailing
> > lists
> > > and Stack Overflow questions. Through his code contributions and
> reviews,
> > > Damian has demonstrated good judgement on system design and code
> > quality,
> > > especially on thorough unit test coverage. We believe he will make a
> > great
> > > addition to the committers of the community.
> > >
> > >
> > > Thank you for your contributions, Damian!
> > >
> > >
> > > -- Guozhang, on behalf of the Apache Kafka PMC
> > >
> > --
> > Thanks,
> > Neha
> >
>


[jira] [Updated] (KAFKA-3741) KStream config for changelog min.in.sync.replicas

2017-06-07 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-3741:
--
Status: Patch Available  (was: Open)

> KStream config for changelog min.in.sync.replicas
> -
>
> Key: KAFKA-3741
> URL: https://issues.apache.org/jira/browse/KAFKA-3741
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Roger Hoover
>Assignee: Damian Guy
>  Labels: api
>
> Kafka Streams currently allows you to specify a replication factor for 
> changelog and repartition topics that it creates. It should also allow you 
> to specify min.insync.replicas.
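>
> A sketch of how such a pass-through might look from the application side
> (illustrative only; the "topic."-prefixed pass-through below is an
> assumption for this issue, not an existing API):
>
> final Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> // Hypothetical pass-through applied to internal changelog/repartition topics:
> props.put("topic.min.insync.replicas", "2");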



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-3741) KStream config for changelog min.in.sync.replicas

2017-06-07 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy reassigned KAFKA-3741:
-

Assignee: Damian Guy

> KStream config for changelog min.in.sync.replicas
> -
>
> Key: KAFKA-3741
> URL: https://issues.apache.org/jira/browse/KAFKA-3741
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Roger Hoover
>Assignee: Damian Guy
>  Labels: api
>
> Kafka Streams currently allows you to specify a replication factor for 
> changelog and repartition topics that it creates. It should also allow you 
> to specify min.insync.replicas.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [VOTE] KIP-134: Delay initial consumer group rebalance

2017-06-07 Thread Damian Guy
Hi Jun/Ismael,

Sounds good to me.
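
For reference, the quickstart override suggested below is a one-line change
in config/server.properties (property name per KIP-134):

# Remove the initial consumer-group rebalance delay so quickstart console
# consumers see records immediately.
group.initial.rebalance.delay.ms=0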

Thanks,
Damian

On Tue, 6 Jun 2017 at 23:08 Ismael Juma <ism...@juma.me.uk> wrote:

> Hi Jun,
>
> The console consumer issue also came up in a conversation I was having
> recently. Seems like the config/server.properties change is a reasonable
> compromise given that we have other defaults that are for development.
>
> Ismael
>
> On Tue, Jun 6, 2017 at 10:59 PM, Jun Rao <j...@confluent.io> wrote:
>
> > Hi, Everyone,
> >
> > Sorry for being late on this thread. I just came across this thread. I
> have
> > a couple of concerns on this. (1) It seems the amount of delay will be
> > application specific. So, it seems that it's better for the delay to be a
> > client side config instead of a server side one? (2) When running console
> > consumer in quickstart, a minimum of 3 sec delay seems to be a bad
> > experience for our users.
> >
> > Since we are getting late into the release cycle, it may be a bit too
> late
> > to make big changes in the 0.11 release. Perhaps we should at least
> > consider overriding the delay in config/server.properties to 0 to improve
> > the quickstart experience?
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Tue, Apr 11, 2017 at 12:19 AM, Damian Guy <damian@gmail.com>
> wrote:
> >
> > > Hi Onur,
> > >
> > > It was in my previous email. But here it is again.
> > >
> > > 
> > >
> > > 1. Better rebalance timing. We will try to rebalance only when all the
> > > consumers in a group have joined. The challenge would be that someone has
> > > to define what ALL consumers means; it could be either a time or
> number
> > of
> > > consumers, etc.
> > >
> > > 2. Avoid frequent rebalance. For example, if there are 100 consumers
> in a
> > > group, today, in the worst case, we may end up with 100 rebalances even
> > if
> > > all the consumers joined the group in a reasonably small amount of
> time.
> > > Frequent rebalance is also a bad thing for brokers.
> > >
> > > Having a client side configuration may solve problem 1 better because
> > each
> > > consumer group can potentially configure their own timing. However, it
> > does
> > > not really prevent frequent rebalance in general because some of the
> > > consumers can be misconfigured. (This may have something to do with
> > KIP-124
> > > as well. But if quota is applied on the JoinGroup/SyncGroup request it
> > may
> > > cause some unwanted cascading effects.)
> > >
> > > Having a broker side configuration may result in less flexibility for
> > each
> > > consumer group, but it can prevent frequent rebalance better. I think
> > with
> > > some reasonable design, the rebalance timing issue can be resolved on
> the
> > > broker side as well. Matthias had a good point on extending the delay
> > when
> > > a new consumer joins a group (we actually did something similar to
> batch
> > > ISR change propagation). For example, let's say on the broker side, we
> > will
> > > always delay 2 seconds each time we see a new consumer joining a
> consumer
> > > group. This would probably work for most of the consumer groups and
> will
> > > also limit the rebalance frequency to protect the brokers.
> > >
> > > I am not sure about the streams use case here, but if something like 2
> > > seconds of delay is acceptable for streams, I would prefer adding the
> > > configuration to the broker so that we can address both problems.
> > >
> > > On Thu, 6 Apr 2017 at 17:11 Onur Karaman <onurkaraman.apa...@gmail.com
> >
> > > wrote:
> > >
> > > > Hi Damian.
> > > >
> > > > Can you copy the point Becket made earlier that you say isn't
> > addressed?
> > > >
> > > > On Thu, Apr 6, 2017 at 2:51 AM, Damian Guy <damian@gmail.com>
> > wrote:
> > > >
> > > > > Thanks all, the Vote is now closed and the KIP has been accepted
> > with 9
> > > > +1s
> > > > >
> > > > > 3 binding::
> > > > > Guozhang,
> > > > > Jason,
> > > > > Ismael
> > > > >
> > > > > 6 non-binding:
> > > > > Bill,
> > > > > Eno,
> > > > > Mathieu,
> > > > > Matthias,
> > > > > Dong,

Re: [DISCUSS] KIP-167: Add a restoreAll method to StateRestoreCallback

2017-06-07 Thread Damian Guy
I'm largely in agreement with what Guozhang has suggested, i.e.,
StateRestoreContext shouldn't have any setters on it and also needs to have
the end offset available so that people can use it to derive progress.
Slightly differently, maybe the StateRestoreContext interface could be:

long beginOffset()
long endOffset()
long currentOffset()

One further thing: this currently doesn't provide developers the ability to
hook into this information if they are using the built-in StateStores. Is
this something we should be considering?
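
To make the progress point concrete, a quick sketch of how a user could
derive progress from those three accessors (StateRestoreContext here is this
thread's draft, not a shipped API):

// Draft interface from this thread, repeated for context.
public interface StateRestoreContext {
    long beginOffset();
    long endOffset();
    long currentOffset();
}

// Example: fraction of the restore completed so far, in [0, 1].
final class RestoreProgress {
    static double fractionComplete(final StateRestoreContext ctx) {
        final long total = ctx.endOffset() - ctx.beginOffset();
        // Guard against an empty restore range.
        return total <= 0 ? 1.0
            : (double) (ctx.currentOffset() - ctx.beginOffset()) / total;
    }
}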


On Tue, 6 Jun 2017 at 23:32 Guozhang Wang  wrote:

> Thanks for the updated KIP Bill, I have a couple of comments:
>
> 1) I'm assuming beginRestore / endRestore is called only once per store
> throughout the whole restoration process, and restoreAll is called per
> batch. In that case I feel we can set the StateRestoreContext as a second
> parameter in restoreAll and in endRestore as well, and let the library
> set the corresponding values and only let users read them (since the
> collection of key-value pairs does not contain offset information anyway,
> users cannot really set the offset). The "lastOffsetRestored" would be the
> starting offset when called on `beginRestore`.
>
> 2) Users who want to implement their own batch restoration callbacks would
> now need to implement both `restore` and `restoreAll`, where they either let
> `restoreAll` call `restore` or implement the logic in `restoreAll` only
> and never call `restore`. Maybe we can provide two abstract impls of
> BatchingStateRestoreCallbacks which does beginRestore / endRestore as
> no-ops, with one callback implementing `restoreAll` to call abstract
> `restore` while the other implement `restore` to throw "not supported
> exception" and keep `restoreAll` abstract.
>
> 3) I think we can also return the "offset limit" in StateRestoreContext,
> which is important for users to track the restoration progress since
> otherwise they could not tell what percentage of the restoration has
> completed. I.e.:
>
> public interface BatchingStateRestoreCallback extends StateRestoreCallback
> {
>
>void restoreAll(Collection<KeyValue<byte[], byte[]>> records,
> StateRestoreContext
> restoreContext);
>
>void beginRestore(StateRestoreContext restoreContext);
>
>void endRestore(StateRestoreContext restoreContext);
> }
>
> public interface StateRestoreContext {
>
>   long lastOffsetRestored();
>
>   long endOffsetToRestore();
>
>   int numberRestored();
> }
>
>
> Guozhang
>
>
>
> On Fri, Jun 2, 2017 at 9:16 AM, Bill Bejeck  wrote:
>
> > Guozhang, Matthias,
> >
> > Thanks for the comments.  I have updated the KIP, (JIRA title and
> > description as well).
> >
> > I had thought about introducing a separate interface altogether, but
> > extending the current one makes more sense.
> >
> > As for intermediate callbacks based on time or number of records, I think
> > the latest update to the KIP addresses this point of querying for
> > intermediate results, but it would be per batch restored.
> >
> > Thanks,
> > Bill
> >
> >
> >
> >
> >
> > On Fri, Jun 2, 2017 at 8:36 AM, Jim Jagielski  wrote:
> >
> > >
> > > > On Jun 2, 2017, at 12:54 AM, Matthias J. Sax 
> > > wrote:
> > > >
> > > > With regard to backward compatibility, we should not change the
> current
> > > > interface, but add a new interface that extends the current one.
> > > >
> > >
> > > ++1
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>


[jira] [Updated] (KAFKA-4913) creating a window store with one segment throws division by zero error

2017-06-06 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-4913:
--
Status: In Progress  (was: Patch Available)

> creating a window store with one segment throws division by zero error
> --
>
> Key: KAFKA-4913
> URL: https://issues.apache.org/jira/browse/KAFKA-4913
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Xavier Léauté
>Assignee: Damian Guy
> Fix For: 0.11.0.1
>
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Work stopped] (KAFKA-4913) creating a window store with one segment throws division by zero error

2017-06-06 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-4913 stopped by Damian Guy.
-
> creating a window store with one segment throws division by zero error
> --
>
> Key: KAFKA-4913
> URL: https://issues.apache.org/jira/browse/KAFKA-4913
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Xavier Léauté
>Assignee: Damian Guy
> Fix For: 0.11.0.1
>
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: Sink Processor definition

2017-06-03 Thread Damian Guy
Hi Michal,

In this case "Sink Processor" is really referring to a SinkNode, which can only
produce to a Kafka topic. Maybe the terminology is incorrect, as strictly
speaking a processor that writes data to anything could be considered a
Sink Processor.
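
In low-level API terms (org.apache.kafka.streams.processor.TopologyBuilder
in 0.10.2) the restriction looks like this; MyProcessor is a hypothetical
processor class:

// Sketch: a sink node is added via addSink and can only forward to a Kafka
// topic, which is the narrow sense of "Sink Processor" in the docs.
final TopologyBuilder builder = new TopologyBuilder();
builder.addSource("source", "input-topic")
       .addProcessor("process", MyProcessor::new, "source")
       .addSink("sink", "output-topic", "process");

A processor that only updates a state store would be added via addProcessor
plus addStateStore, so under the current wording it wouldn't be a "Sink
Processor" even though nothing flows downstream from it.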

On Sat, 3 Jun 2017 at 09:23 Michal Borowiecki 
wrote:

> Hi all,
>
> Streams docs say:
>
>
>- *Sink Processor*: A sink processor is a special type of stream
>processor that does not have down-stream processors. It sends any received
>records from its up-stream processors to a specified Kafka topic.
>
> Would a processor that doesn't produce to a kafka topic (directly) but
> only updates a state store also be considered a sink processor? I think yes.
>
> I'll submit a PR to that effect unless I hear otherwise.
>
> Cheers,
>
> Michał
> --
>  Michal Borowiecki
> Senior Software Engineer L4
> T: +44 208 742 1600 <+44%2020%208742%201600>
>
>
> +44 203 249 8448 <+44%2020%203249%208448>
>
>
>
> E: michal.borowie...@openbet.com
> W: www.openbet.com
> OpenBet Ltd
>
> Chiswick Park Building 9
>
> 566 Chiswick High Rd
>
> London
>
> W4 5XT
>
> UK
> 
> This message is confidential and intended only for the addressee. If you
> have received this message in error, please immediately notify the
> postmas...@openbet.com and delete it from your system as well as any
> copies. The content of e-mails as well as traffic data may be monitored by
> OpenBet for employment and security purposes. To protect the environment
> please do not print this e-mail unless necessary. OpenBet Ltd. Registered
> Office: Chiswick Park Building 9, 566 Chiswick High Road, London, W4 5XT,
> United Kingdom. A company registered in England and Wales. Registered no.
> 3134634. VAT no. GB927523612
>


Re: [DISCUSS] KIP-147: Add missing type parameters to StateStoreSupplier factories and KGroupedStream/Table methods

2017-06-03 Thread Damian Guy
Hmm, I guess this won't work due to adding the additional <K,V> to the
StateStoreSupplier params on reduce, count, aggregate etc.

On Sat, 3 Jun 2017 at 09:06 Damian Guy <damian@gmail.com> wrote:

> Hi Michal,
>
> Thanks for the KIP - is there a way we can do this without having to
> introduce the new Typed.. interfaces, overloaded methods, etc.? Is it
> possible that we just need to provide a couple of new methods on
> PersistentKeyValueFactory for windowed and sessionWindowed to return
> interfaces like you've introduced in TypedStores?
> I admit I haven't looked in much detail at whether that would work.
>
> My concern is that this is duplicating a bunch of code and increasing the
> surface area for what is minimal benefit. It is one of those cases where
> I'd love to not have to maintain backward compatibility.
>
> Thanks,
> Damian
>
> On Fri, 2 Jun 2017 at 08:20 Michal Borowiecki <
> michal.borowie...@openbet.com> wrote:
>
>> Thanks Matthias,
>>
>> I appreciate people are busy now preparing the 0.11 release.
>>
>> One thing I would also appreciate input on is perhaps a better name for
>> the new TypedStores class, I just picked it quickly but don't really like
>> it.
>>
>> Perhaps StateStores would make for a better name?
>> Cheers,
>> Michal
>>
>>
>> On 02/06/17 07:18, Matthias J. Sax wrote:
>>
>> Thanks for the update Michal.
>>
>> I did skip over the PR. Looks good to me, as far as I can tell. Maybe
>> Damian, Xavier, or Ismael can comment on this. Would be good to get
>> confirmation that the change is backward compatible.
>>
>>
>> -Matthias
>>
>>
>> On 5/27/17 11:11 AM, Michal Borowiecki wrote:
>>
>> Hi all,
>>
>> I've updated the KIP to reflect the proposed backwards-compatible approach:
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=69408481
>>
>>
>> Given the vast area of APIs affected, I think the PR is easier to read
>> than the code excerpts in the KIP 
>> itself:https://github.com/apache/kafka/pull/2992/files
>>
>> Thanks,
>> Michał
>>
>> On 07/05/17 10:16, Eno Thereska wrote:
>>
>> I like this KIP in general and I agree it’s needed. Perhaps Damian can 
>> comment on the session store issue?
>>
>> Thanks
>> Eno
>>
>> On May 6, 2017, at 10:32 PM, Michal Borowiecki 
>> <michal.borowie...@openbet.com> wrote:
>>
>> Hi Matthias,
>>
>> Agreed. I tried your proposal and indeed it would work.
>>
>> However, I think to maintain full backward compatibility we would also need 
>> to deprecate Stores.create() and leave it unchanged, while providing a new 
>> method that returns the more strongly typed Factories.
>>
>> ( This is because PersistentWindowFactory and PersistentSessionFactory 
>> cannot extend the existing PersistentKeyValueFactory interface, since their 
>> build() methods will be returning TypedStateStoreSupplier<WindowStore<K, V>> 
>> and TypedStateStoreSupplier<SessionStore<K, V>> respectively, which are NOT 
>> subclasses of TypedStateStoreSupplier<KeyValueStore<K, V>>. I do not see 
>> another way around it. Admittedly, my type covariance skills are 
>> rudimentary. Does anyone see a better way around this? )
>>
>> Since create() takes only the store name as argument, and I don't see what 
>> we could overload it with, the new method would need to have a different 
>> name.
>>
>> Alternatively, since create(String) is the only method in Stores, we could 
>> deprecate the entire class and provide a new one. That would be my 
>> preference. Any ideas what to call it?
>>
>>
>>
>> All comments and suggestions appreciated.
>>
>>
>>
>> Cheers,
>>
>> Michał
>>
>>
>> On 04/05/17 21:48, Matthias J. Sax wrote:
>>
>> I had a quick look into this.
>>
>> With regard to backward compatibility, I think it would be required to
>> introduce a new type `TypedStateStoreSupplier` (that extends
>> `StateStoreSupplier`) and to overload all methods that take a
>> `StateStoreSupplier` that accept the new type instead of the current one.
>>
>> This would allow `.build` to return a `TypedStateStoreSupplier` and
>> thus, would not break any code. At least if I did not miss anything with
>> regard to some magic of type inference using generics (I am not an
>> expert in this field).
>>
>>
>> -Matthias
>>
>> On 5/4/17 11:32 AM, M

Re: [DISCUSS] KIP-147: Add missing type parameters to StateStoreSupplier factories and KGroupedStream/Table methods

2017-06-03 Thread Damian Guy
Hi Michal,

Thanks for the KIP - is there a way we can do this without having to
introduce the new Typed.. interfaces, overloaded methods, etc.? Is it
possible that we just need to provide a couple of new methods on
PersistentKeyValueFactory for windowed and sessionWindowed to return
interfaces like you've introduced in TypedStores?
I admit I haven't looked in much detail at whether that would work.

My concern is that this is duplicating a bunch of code and increasing the
surface area for what is minimal benefit. It is one of those cases where
I'd love to not have to maintain backward compatibility.
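
For reference, the backwards-compatible shape Matthias describes in the
quoted thread below boils down to covariant narrowing of the supplier type;
a minimal sketch (names follow the KIP-147 draft and are illustrative only):

import org.apache.kafka.streams.state.KeyValueStore;

// Sketch: the typed supplier extends the existing untyped one, so a typed
// factory can narrow build()'s return type while old call sites keep
// compiling against plain StateStoreSupplier.
interface StateStoreSupplier {
    String name();
}

interface TypedStateStoreSupplier<T> extends StateStoreSupplier {
    T get(); // the typed store this supplier creates
}

interface TypedKeyValueFactory<K, V> {
    // Narrowed return type; callers expecting StateStoreSupplier still work.
    TypedStateStoreSupplier<KeyValueStore<K, V>> build();
}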

Thanks,
Damian

On Fri, 2 Jun 2017 at 08:20 Michal Borowiecki 
wrote:

> Thanks Matthias,
>
> I appreciate people are busy now preparing the 0.11 release.
>
> One thing I would also appreciate input on is perhaps a better name for
> the new TypedStores class, I just picked it quickly but don't really like
> it.
>
> Perhaps StateStores would make for a better name?
> Cheers,
> Michal
>
>
> On 02/06/17 07:18, Matthias J. Sax wrote:
>
> Thanks for the update Michal.
>
> I did skip over the PR. Looks good to me, as far as I can tell. Maybe
> Damian, Xavier, or Ismael can comment on this. Would be good to get
> confirmation that the change is backward compatible.
>
>
> -Matthias
>
>
> On 5/27/17 11:11 AM, Michal Borowiecki wrote:
>
> Hi all,
>
> I've updated the KIP to reflect the proposed backwards-compatible approach:
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=69408481
>
>
> Given the vast area of APIs affected, I think the PR is easier to read
> than the code excerpts in the KIP 
> itself:https://github.com/apache/kafka/pull/2992/files
>
> Thanks,
> Michał
>
> On 07/05/17 10:16, Eno Thereska wrote:
>
> I like this KIP in general and I agree it’s needed. Perhaps Damian can 
> comment on the session store issue?
>
> Thanks
> Eno
>
> On May 6, 2017, at 10:32 PM, Michal Borowiecki 
>   wrote:
>
> Hi Matthias,
>
> Agreed. I tried your proposal and indeed it would work.
>
> However, I think to maintain full backward compatibility we would also need 
> to deprecate Stores.create() and leave it unchanged, while providing a new 
> method that returns the more strongly typed Factories.
>
> ( This is because PersistentWindowFactory and PersistentSessionFactory cannot 
> extend the existing PersistentKeyValueFactory interface, since their build() 
> methods will be returning TypedStateStoreSupplier<WindowStore<K, V>> and 
> TypedStateStoreSupplier<SessionStore<K, V>> respectively, which are NOT 
> subclasses of TypedStateStoreSupplier<KeyValueStore<K, V>>. I do not see 
> another way around it. Admittedly, my type covariance skills are rudimentary. 
> Does anyone see a better way around this? )
>
> Since create() takes only the store name as argument, and I don't see what we 
> could overload it with, the new method would need to have a different name.
>
> Alternatively, since create(String) is the only method in Stores, we could 
> deprecate the entire class and provide a new one. That would be my 
> preference. Any ideas what to call it?
>
>
>
> All comments and suggestions appreciated.
>
>
>
> Cheers,
>
> Michał
>
>
> On 04/05/17 21:48, Matthias J. Sax wrote:
>
> I had a quick look into this.
>
> With regard to backward compatibility, I think it would be required to
> introduce a new type `TypedStateStoreSupplier` (that extends
> `StateStoreSupplier`) and to overload all methods that take a
> `StateStoreSupplier` that accept the new type instead of the current one.
>
> This would allow `.build` to return a `TypedStateStoreSupplier` and
> thus, would not break any code. At least if I did not miss anything with
> regard to some magic of type inference using generics (I am not an
> expert in this field).
>
>
> -Matthias
>
> On 5/4/17 11:32 AM, Matthias J. Sax wrote:
>
> Did not have time to have a look. But backward compatibility is a must
> from my point of view.
>
> -Matthias
>
>
> On 5/4/17 12:56 AM, Michal Borowiecki wrote:
>
> Hello,
>
> I've updated the KIP with missing information.
>
> I would especially appreciate some comments on the compatibility aspects
> of this as the proposed change is not fully backwards-compatible.
>
> In the absence of comments I shall call for a vote in the next few days.
>
> Thanks,
>
> Michal
>
>
> On 30/04/17 23:11, Michal Borowiecki wrote:
>
> Hi community!
>
> I have just drafted KIP-147: Add missing type parameters to
> StateStoreSupplier factories and KGroupedStream/Table 
> methods
>   
>  
> 
>
> Please let me know if this a step in the right direction.
>
> All comments welcome.

Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-06-02 Thread Damian Guy
Jan, you have the choice to fail fast if you want. This is about giving
people options, and there are times when you don't want to fail fast.


On Fri, 2 Jun 2017 at 11:00 Jan Filipiak <jan.filip...@trivago.com> wrote:

> Hi
>
> 1.
> That greatly complicates monitoring. Fail fast gives you that when you
> monitor only the lag of all your apps
> you are completely covered. With that sort of new application, monitoring
> is very much more complicated, as
> you now need to monitor the fail % of some special apps as well. In my
> opinion that is a huge downside already.
>
> 2.
> Using a schema registry like the Avro stuff, it might not even be the record
> that is broken; it might just be your app
> unable to fetch a schema it now needs to know. Maybe you got partitioned
> away from that registry.
>
> 3. When you get alerted because of a too-high fail percentage, what are the
> steps you are going to take?
> Shut it down to buy time, fix the problem, spend way too much time to
> find a good reprocess offset.
> Your time windows are in bad shape anyway, and you have pretty much lost.
> This routine is nonsense.
>
> Dead letter queues would be the worst possible addition to the Kafka
> toolkit that I can think of. It just doesn't fit an architecture
> in which having clients fall behind is a valid option.
>
> Further, I mentioned already that the only bad pill I've seen so far is CRC
> errors. Any plans for those?
>
> Best Jan
>
>
>
>
>
>
> On 02.06.2017 11:34, Damian Guy wrote:
> > I agree with what Matthias has said w.r.t failing fast. There are plenty
> of
> > times when you don't want to fail fast and must attempt to make
> progress.
> > The dead-letter queue is exactly for these circumstances. Of course if
> > every record is failing, then you probably do want to give up.
> >
> > On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax <matth...@confluent.io>
> wrote:
> >
> >> First a meta comment. KIP discussion should take place on the dev list
> >> -- if user list is cc'ed please make sure to reply to both lists.
> Thanks.
> >>
> >> Thanks for making the scope of the KIP clear. Makes a lot of sense to
> >> focus on deserialization exceptions for now.
> >>
> >> With regard to corrupted state stores, would it make sense to fail a
> >> task and wipe out the store to repair it via recreation from the
> >> changelog? That's of course a quite advance pattern, but I want to bring
> >> it up to design the first step in a way such that we can get there (if
> >> we think it's a reasonable idea).
> >>
> >> I also want to comment about fail fast vs making progress. I think that
> >> fail-fast must not always be the best option. The scenario I have in
> >> mind is like this: you got a bunch of producers that feed the Streams
> >> input topic. Most producers work find, but maybe one producer miss
> >> behaves and the data it writes is corrupted. You might not even be able
> >> to recover this lost data at any point -- thus, there is no reason to
> >> stop processing but you just skip over those records. Of course, you
> >> need to fix the root cause, and thus you need to alert (either via logs
> >> of the exception handler directly) and you need to start to investigate
> >> to find the bad producer, shut it down and fix it.
> >>
> >> Here the dead letter queue comes into place. From my understanding, the
> >> purpose of this feature is solely to enable post-mortem debugging. I don't think
> >> those record would be fed back at any point in time (so I don't see any
> >> ordering issue -- a skipped record, with this regard, is just "fully
> >> processed"). Thus, the dead letter queue should actually encode the
> >> original records metadata (topic, partition offset etc) to enable such
> >> debugging. I guess, this might also be possible if you just log the bad
> >> records, but it would be harder to access (you first must find the
> >> Streams instance that did write the log and extract the information from
> >> there). Reading it from topic is much simpler.
> >>
> >> I also want to mention the following. Assume you have such a topic with
> >> some bad records and some good records. If we always fail-fast, it's
> >> going to be super hard to process the good data. You would need to write
> >> an extra app that copies the data into a new topic, filtering out the bad
> >> records (or apply the map() workaround within streams). So I don't think
> >> it is necessarily true that failing fast is the best option in
> >> production.

Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-06-02 Thread Damian Guy
I agree with what Matthias has said w.r.t. failing fast. There are plenty of
times when you don't want to fail fast and must attempt to make progress.
The dead-letter queue is exactly for these circumstances. Of course, if
every record is failing, then you probably do want to give up.
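
To make the dead-letter option concrete, a minimal sketch (the handler hook
itself is hypothetical and would be defined by the KIP; only the
consumer-record and producer calls are standard client API):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Illustrative only: forward a corrupted record to a dead-letter topic,
// logging its origin metadata so it can be traced back later, and let
// processing continue.
public class DeadLetterForwarder {

    private final Producer<byte[], byte[]> producer;
    private final String deadLetterTopic;

    public DeadLetterForwarder(final Producer<byte[], byte[]> producer,
                               final String deadLetterTopic) {
        this.producer = producer;
        this.deadLetterTopic = deadLetterTopic;
    }

    // Invoked from a (hypothetical) deserialization exception handler.
    public void handle(final ConsumerRecord<byte[], byte[]> corrupted,
                       final Exception cause) {
        // Log topic/partition/offset so the record can be traced back.
        System.err.printf("Skipping corrupted record %s-%d@%d: %s%n",
                corrupted.topic(), corrupted.partition(), corrupted.offset(), cause);
        producer.send(new ProducerRecord<>(deadLetterTopic,
                corrupted.key(), corrupted.value()));
    }
}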

On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax  wrote:

> First a meta comment. KIP discussion should take place on the dev list
> -- if user list is cc'ed please make sure to reply to both lists. Thanks.
>
> Thanks for making the scope of the KIP clear. Makes a lot of sense to
> focus on deserialization exceptions for now.
>
> With regard to corrupted state stores, would it make sense to fail a
> task and wipe out the store to repair it via recreation from the
> changelog? That's of course quite an advanced pattern, but I want to bring
> it up to design the first step in a way such that we can get there (if
> we think it's a reasonable idea).
>
> I also want to comment about fail fast vs making progress. I think that
> fail-fast must not always be the best option. The scenario I have in
> mind is like this: you got a bunch of producers that feed the Streams
> input topic. Most producers work find, but maybe one producer miss
> behaves and the data it writes is corrupted. You might not even be able
> to recover this lost data at any point -- thus, there is no reason to
> stop processing but you just skip over those records. Of course, you
> need to fix the root cause, and thus you need to alert (either via logs
> of the exception handler directly) and you need to start to investigate
> to find the bad producer, shut it down and fix it.
>
> Here the dead letter queue comes into place. From my understanding, the
> purpose of this feature is solely to enable post-mortem debugging. I don't think
> those record would be fed back at any point in time (so I don't see any
> ordering issue -- a skipped record, with this regard, is just "fully
> processed"). Thus, the dead letter queue should actually encode the
> original records metadata (topic, partition offset etc) to enable such
> debugging. I guess, this might also be possible if you just log the bad
> records, but it would be harder to access (you first must find the
> Streams instance that did write the log and extract the information from
> there). Reading it from topic is much simpler.
>
> I also want to mention the following. Assume you have such a topic with
> some bad records and some good records. If we always fail-fast, it's
> going to be super hard to process the good data. You would need to write
> an extra app that copies the data into a new topic, filtering out the bad
> records (or apply the map() workaround within streams). So I don't think
> it is necessarily true that failing fast is the best option in
> production.
>
> Or do you think there are scenarios, for which you can recover the
> corrupted records successfully? And even if this is possible, it might
> be a case for reprocessing instead of failing the whole application?
> Also, if you think you can "repair" a corrupted record, should the
> handler allow to return a "fixed" record? This would solve the ordering
> problem.
>
>
>
> -Matthias
>
>
>
>
> On 5/30/17 1:47 AM, Michael Noll wrote:
> > Thanks for your work on this KIP, Eno -- much appreciated!
> >
> > - I think it would help to improve the KIP by adding an end-to-end code
> > example that demonstrates, with the DSL and with the Processor API, how
> the
> > user would write a simple application that would then be augmented with
> the
> > proposed KIP changes to handle exceptions.  It should also become much
> > clearer then that e.g. the KIP would lead to different code paths for the
> > happy case and any failure scenarios.
> >
> > - Do we have sufficient information available to make informed decisions
> on
> > what to do next?  For example, do we know in which part of the topology
> the
> > record failed? `ConsumerRecord` gives us access to topic, partition,
> > offset, timestamp, etc., but what about topology-related information
> (e.g.
> > what is the associated state store, if any)?
> >
> > - Only partly on-topic for the scope of this KIP, but this is about the
> > bigger picture: This KIP would give users the option to send corrupted
> > records to dead letter queue (quarantine topic).  But, what pattern would
> > we advocate to process such a dead letter queue then, e.g. how to allow
> for
> > retries with backoff ("If the first record in the dead letter queue fails
> > again, then try the second record for the time being and go back to the
> > first record at a later time").  Jay and Jan already alluded to ordering
> > problems that will be caused by dead letter queues. As I said, retries
> > might be out of scope but perhaps the implications should be considered
> if
> > possible?
> >
> > Also, I wrote the text below before reaching the point in the
> conversation
> > that this KIP's scope will be limited to exceptions in 

[jira] [Work started] (KAFKA-5357) StackOverFlow error in transaction coordinator

2017-06-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-5357 started by Damian Guy.
-
> StackOverFlow error in transaction coordinator
> --
>
> Key: KAFKA-5357
> URL: https://issues.apache.org/jira/browse/KAFKA-5357
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0
>Reporter: Apurva Mehta
>    Assignee: Damian Guy
>Priority: Blocker
>  Labels: exactly-once
> Fix For: 0.11.0.0
>
> Attachments: KAFKA-5357.tar.gz
>
>
> I observed the following in the broker logs: 
> {noformat}
> [2017-06-01 04:10:36,664] ERROR [Replica Manager on Broker 1]: Error 
> processing append operation on partition __transaction_state-37 
> (kafka.server.ReplicaManager)
> [2017-06-01 04:10:36,667] ERROR [TxnMarkerSenderThread-1]: Error due to 
> (kafka.common.InterBrokerSendThread)
> java.lang.StackOverflowError
> at java.security.AccessController.doPrivileged(Native Method)
> at java.io.PrintWriter.<init>(PrintWriter.java:116)
> at java.io.PrintWriter.<init>(PrintWriter.java:100)
> at 
> org.apache.log4j.DefaultThrowableRenderer.render(DefaultThrowableRenderer.java:58)
> at 
> org.apache.log4j.spi.ThrowableInformation.getThrowableStrRep(ThrowableInformation.java:87)
> at 
> org.apache.log4j.spi.LoggingEvent.getThrowableStrRep(LoggingEvent.java:413)
> at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:313)
> at 
> org.apache.log4j.DailyRollingFileAppender.subAppend(DailyRollingFileAppender.java:369)
> at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
> at 
> org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
> at 
> org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
> at org.apache.log4j.Category.callAppenders(Category.java:206)
> at org.apache.log4j.Category.forcedLog(Category.java:391)
> at org.apache.log4j.Category.error(Category.java:322)
> at kafka.utils.Logging$class.error(Logging.scala:105)
> at kafka.server.ReplicaManager.error(ReplicaManager.scala:122)
> at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:557)
> at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:505)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
> at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:505)
> at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:346)
> at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$appendTransactionToLog$1.apply$mcV$sp(TransactionStateManager.scala:589)
> at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$appendTransactionToLog$1.apply(TransactionStateManager.scala:570)
> at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$appendTransactionToLog$1.apply(TransactionStateManager.scala:570)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
> at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:219)
> at 
> kafka.coordinator.transaction.TransactionStateManager.appendTransactionToLog(TransactionStateManager.scala:564)
> at 
> kafka.coordinator.transaction.TransactionMarkerChannelManager.kafka$coordinator$transaction$TransactionMarkerChannelManager$$retryAppendCallback$1(TransactionMarkerChannelManager.scala:225)
> at 
> kafka.coordinator.transaction.TransactionMarkerChannelManager$$anonfun$kafka$coordinator$transaction$TransactionMarkerChannelManager$$retryAppendCallback$1$4.apply(TransactionMarkerChannelManager.scala:225)
> at 
> kafka.coordinator.transaction.TransactionMarkerChannelManager$$anonfun$kafka$coordinator$transaction$TransactionMarkerChannelManager$$retryAppendCallback$1$4.apply(TransactionMarkerChannelManager.scala:225)
> at 
> kafka.coordinator.transaction.TransactionStateManager.kafka$coordinator$transaction$TransactionStateManager$$updateCacheCallback$1(TransactionStateManager.scala:561)
> at 
> kafka.co

[jira] [Assigned] (KAFKA-5357) StackOverFlow error in transaction coordinator

2017-06-01 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy reassigned KAFKA-5357:
-

Assignee: Damian Guy

> StackOverFlow error in transaction coordinator
> --
>
> Key: KAFKA-5357
> URL: https://issues.apache.org/jira/browse/KAFKA-5357
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0
>Reporter: Apurva Mehta
>    Assignee: Damian Guy
>Priority: Blocker
>  Labels: exactly-once
> Fix For: 0.11.0.0
>
> Attachments: KAFKA-5357.tar.gz
>
>
> I observed the following in the broker logs: 
> {noformat}
> [2017-06-01 04:10:36,664] ERROR [Replica Manager on Broker 1]: Error 
> processing append operation on partition __transaction_state-37 
> (kafka.server.ReplicaManager)
> [2017-06-01 04:10:36,667] ERROR [TxnMarkerSenderThread-1]: Error due to 
> (kafka.common.InterBrokerSendThread)
> java.lang.StackOverflowError
> at java.security.AccessController.doPrivileged(Native Method)
> at java.io.PrintWriter.<init>(PrintWriter.java:116)
> at java.io.PrintWriter.<init>(PrintWriter.java:100)
> at 
> org.apache.log4j.DefaultThrowableRenderer.render(DefaultThrowableRenderer.java:58)
> at 
> org.apache.log4j.spi.ThrowableInformation.getThrowableStrRep(ThrowableInformation.java:87)
> at 
> org.apache.log4j.spi.LoggingEvent.getThrowableStrRep(LoggingEvent.java:413)
> at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:313)
> at 
> org.apache.log4j.DailyRollingFileAppender.subAppend(DailyRollingFileAppender.java:369)
> at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
> at 
> org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
> at 
> org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
> at org.apache.log4j.Category.callAppenders(Category.java:206)
> at org.apache.log4j.Category.forcedLog(Category.java:391)
> at org.apache.log4j.Category.error(Category.java:322)
> at kafka.utils.Logging$class.error(Logging.scala:105)
> at kafka.server.ReplicaManager.error(ReplicaManager.scala:122)
> at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:557)
> at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:505)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
> at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:505)
> at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:346)
> at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$appendTransactionToLog$1.apply$mcV$sp(TransactionStateManager.scala:589)
> at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$appendTransactionToLog$1.apply(TransactionStateManager.scala:570)
> at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$appendTransactionToLog$1.apply(TransactionStateManager.scala:570)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
> at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:219)
> at 
> kafka.coordinator.transaction.TransactionStateManager.appendTransactionToLog(TransactionStateManager.scala:564)
> at 
> kafka.coordinator.transaction.TransactionMarkerChannelManager.kafka$coordinator$transaction$TransactionMarkerChannelManager$$retryAppendCallback$1(TransactionMarkerChannelManager.scala:225)
> at 
> kafka.coordinator.transaction.TransactionMarkerChannelManager$$anonfun$kafka$coordinator$transaction$TransactionMarkerChannelManager$$retryAppendCallback$1$4.apply(TransactionMarkerChannelManager.scala:225)
> at 
> kafka.coordinator.transaction.TransactionMarkerChannelManager$$anonfun$kafka$coordinator$transaction$TransactionMarkerChannelManager$$retryAppendCallback$1$4.apply(TransactionMarkerChannelManager.scala:225)
> at 
> kafka.coordinator.transaction.TransactionStateManager.kafka$coordinator$transaction$TransactionStateManager$$updateCacheCallback$1(TransactionStateManager.scala:561)
>   
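The trace above is truncated, but the repeating frames (retryAppendCallback -> appendTransactionToLog -> updateCacheCallback -> retryAppendCallback) suggest the overflow comes from a retry callback that re-enters the append path synchronously, so every failed attempt deepens the stack. Below is a minimal, hypothetical Scala sketch of that pattern and of the usual remedy (re-submitting retries from a loop or queue instead of recursing); all names are simplified stand-ins, not the actual broker code.

{code}
// Minimal, hypothetical sketch of the recursion pattern implied by the trace.
// Names are simplified stand-ins, not the actual Kafka broker code.
object RetryDepthSketch {

  // Stand-in for an append that keeps failing (e.g. the target partition
  // is unavailable) and reports the failure through a callback.
  def appendToLog(attempt: Int)(onError: Int => Unit): Unit =
    onError(attempt)

  // Recursive retry: the error callback re-enters the append path directly,
  // so each failed attempt adds frames until a StackOverflowError.
  def retryRecursively(attempt: Int): Unit =
    appendToLog(attempt) { a => retryRecursively(a + 1) }

  // Iterative retry: the callback only flags that a retry is needed and a
  // driver loop re-submits the append, so stack depth stays constant.
  def retryIteratively(maxAttempts: Int): Int = {
    var pending = true
    var attempt = 0
    while (pending && attempt < maxAttempts) {
      pending = false
      appendToLog(attempt) { _ => pending = true } // record, don't recurse
      attempt += 1
    }
    attempt
  }

  def main(args: Array[String]): Unit = {
    println(s"iterative retry gave up after ${retryIteratively(1000)} attempts")
    // retryRecursively(0) // would eventually throw java.lang.StackOverflowError
  }
}
{code}

A queue-draining approach along these lines keeps stack depth bounded no matter how long the append keeps failing, which is why retries from a sender thread are normally re-enqueued rather than issued inline from the completion callback.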
