[jira] [Created] (KAFKA-7434) DeadLetterQueueReporter throws NPE if transform throws NPE

2018-09-24 Thread Michal Borowiecki (JIRA)
Michal Borowiecki created KAFKA-7434:


 Summary: DeadLetterQueueReporter throws NPE if transform throws NPE
 Key: KAFKA-7434
 URL: https://issues.apache.org/jira/browse/KAFKA-7434
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.0.0
 Environment: jdk 8
Reporter: Michal Borowiecki


An NPE thrown by a transform in a connector configured with

errors.deadletterqueue.context.headers.enable=true

causes the DeadLetterQueueReporter itself to fail with an NPE.
{quote}
Executing stage 'TRANSFORMATION' with class 'org.apache.kafka.connect.transforms.Flatten$Value', where consumed record is {topic='', partition=1, offset=0, timestamp=1537370573366, timestampType=CreateTime}. (org.apache.kafka.connect.runtime.errors.LogReporter)
java.lang.NullPointerException
Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
java.lang.NullPointerException
    at org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.toBytes(DeadLetterQueueReporter.java:202)
    at org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.populateContextHeaders(DeadLetterQueueReporter.java:172)
    at org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.report(DeadLetterQueueReporter.java:146)
    at org.apache.kafka.connect.runtime.errors.ProcessingContext.report(ProcessingContext.java:137)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:108)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:44)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:532)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
{quote}
 

This is caused by populateContextHeaders only checking if the Throwable is not 
null, but not checking that the message in the Throwable is not null before 
trying to serialize the message:

[https://github.com/apache/kafka/blob/cfd33b313c9856ae2b4b45ed3d4aac41d6ef5a6b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java#L170-L177]

if (context.error() != null) {
    headers.add(ERROR_HEADER_EXCEPTION, toBytes(context.error().getClass().getName()));
    headers.add(ERROR_HEADER_EXCEPTION_MESSAGE, toBytes(context.error().getMessage()));



toBytes throws an NPE if passed null as the parameter.
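A guard like the following would avoid the failure. This is only a sketch of the shape of a possible fix inside DeadLetterQueueReporter, not the actual patch, and the method signature is simplified; the point is simply that getMessage() is typically null for an NPE:

{code}
// Sketch of a defensive populateContextHeaders; the ERROR_HEADER_* constants and
// toBytes() are the existing members of DeadLetterQueueReporter.
void populateContextHeaders(Headers headers, ProcessingContext context) {
    if (context.error() != null) {
        headers.add(ERROR_HEADER_EXCEPTION, toBytes(context.error().getClass().getName()));
        String message = context.error().getMessage();
        if (message != null) {                 // an NPE usually carries a null message
            headers.add(ERROR_HEADER_EXCEPTION_MESSAGE, toBytes(message));
        }
        // Alternatively, toBytes(null) could simply return null instead of throwing.
    }
}
{code}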

 





Re: [VOTE] KIP-138: Change punctuate semantics

2017-08-31 Thread Michal Borowiecki

+0 - ambivalent about the naming, but I do agree it should be kept consistent


On 31/08/17 00:43, Matthias J. Sax wrote:

+1

On 8/30/17 12:00 PM, Bill Bejeck wrote:

+1

On Wed, Aug 30, 2017 at 1:06 PM, Damian Guy <damian@gmail.com> wrote:


+1

On Wed, 30 Aug 2017 at 17:49 Guozhang Wang <wangg...@gmail.com> wrote:


Hello Michal and community:

While working on updating the web docs and Javadocs for this KIP, I felt
that the term SYSTEM_TIME is a bit confusing from a reader's
perspective, as we are actually talking about wall-clock time. I'd hence
like to propose a minor addendum to this adopted KIP before the release
to rename this enum:

SYSTEM_TIME

to

WALL_CLOCK_TIME


For people who have voted on this KIP, could you vote again for this
addendum (detailed discussions can be found in this PR:
https://github.com/apache/kafka/pull/3732#issuecomment-326043657)?
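
For reference, a rough sketch of how the renamed constant would read in the new
schedule() API introduced by this KIP (the processor class below is made up purely
for illustration):

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

public class WallClockPunctuateExample extends AbstractProcessor<String, String> {
    @Override
    public void init(ProcessorContext context) {
        super.init(context);
        // Punctuate every 60s of wall-clock time, independent of stream time.
        context.schedule(60_000L, PunctuationType.WALL_CLOCK_TIME,
                timestamp -> System.out.println("wall-clock punctuation at " + timestamp));
    }

    @Override
    public void process(String key, String value) {
        // no-op; only the punctuation scheduling matters for this example
    }
}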


Guozhang




On Sat, May 13, 2017 at 8:13 AM, Michal Borowiecki <
michal.borowie...@openbet.com> wrote:


Thank you all!

This KIP passed the vote with 3 binding and 5 non-binding +1s:

+1 (binding) from Guozhang Wang, Ismael Juma and Ewen Cheslack-Postava

+1 (non-binding) from Matthias J. Sax, Bill Bejeck, Eno Thereska, Arun
Mathew and Thomas Becker


KAFKA-5233 <https://issues.apache.org/jira/browse/KAFKA-5233> has been created
to track the implementation.
It's been a fantastic experience for me working with this great community
to produce a KIP for the first time.
Big thank you to everyone who contributed!

Cheers,
Michał


On 12/05/17 02:01, Ismael Juma wrote:

Michal, you have enough votes, would you like to close the vote?

Ismael

On Thu, May 11, 2017 at 4:49 PM, Ewen Cheslack-Postava <

e...@confluent.io> <e...@confluent.io>

wrote:


+1 (binding)

-Ewen

On Thu, May 11, 2017 at 7:12 AM, Ismael Juma <ism...@juma.me.uk> <

ism...@juma.me.uk> wrote:


Thanks for the KIP, Michal. +1(binding) from me.

Ismael

On Sat, May 6, 2017 at 6:18 PM, Michal Borowiecki <

michal.borowie...@openbet.com> wrote:


Hi all,

Given I'm not seeing any contentious issues remaining on the discussion
thread, I'd like to initiate the vote for:

KIP-138: Change punctuate semantics
https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics


Thanks,
Michał







--
-- Guozhang




[jira] [Created] (KAFKA-5677) Remove deprecated punctuate method

2017-07-29 Thread Michal Borowiecki (JIRA)
Michal Borowiecki created KAFKA-5677:


 Summary: Remove deprecated punctuate method
 Key: KAFKA-5677
 URL: https://issues.apache.org/jira/browse/KAFKA-5677
 Project: Kafka
  Issue Type: Task
Reporter: Michal Borowiecki


Task to track the removal of the punctuate method that was deprecated in 
KAFKA-5233, together with its associated unit tests.
(not sure of the fix version number at this point)





Re: [DISCUSS] 2017 October release planning and release version

2017-07-21 Thread Michal Borowiecki
>> APIs labeled as unstable can be broken in backward incompatible way in any
>> release, major, minor or patch
>
> The relevant annotations do explain this:
>
> https://kafka.apache.org/0110/javadoc/org/apache/kafka/common/annotation/InterfaceStability.html
> https://kafka.apache.org/0110/javadoc/org/apache/kafka/common/annotation/InterfaceStability.Stable.html
> https://kafka.apache.org/0110/javadoc/org/apache/kafka/common/annotation/InterfaceStability.Evolving.html
> https://kafka.apache.org/0110/javadoc/org/apache/kafka/common/annotation/InterfaceStability.Unstable.html
>
> But we should have a section in our documentation as well.
>
>> - deprecated stable APIs are treated as any stable APIs, they can be removed
>> only in major release, are not allowed to be changed in backward incompatible
>> way in either patch or minor version release
>
> Right, but note that stable non-deprecated APIs provide stronger guarantees in
> major releases (they can't be changed in an incompatible way).
>
>> This means one should be able to upgrade server and recompile/deploy apps
>> with clients to new minor.patch release with dependency version change being
>> only change needed and there would be no drama.
>
> That should have been the case for a while as long as you are using stable
> public APIs.
>
>> Practice/"features" like protocol version being a parameter, and defaulting
>> to latest so auto updated with dependency update which introduces new
>> protocol/behavior should not be used in public client APIs. To switch between
>> backward incompatible APIs (contract and behaviors), ideally user should
>> explicitly have to change code and not dependency only, but at least it
>> should be clearly communicated that there are breaking changes to expect even
>> with just dependency update by e.g. giving major version release clear
>> meaning. If app dependency on Kafka client library minor.patch on same major
>> is updated, and if there's a change in behavior or API requiring app code
>> change - it's a bug.
>
> Hmm, if the protocol bump provides improved behaviour, that is not a backwards
> incompatible change though. So, I don't think I agree with this. Of course,
> it does mean that _downgrading_ may cause loss of functionality. That's OK,
> in my opinion.
>
>> Change introduced contrary to the SLO, is OK to be reported as bug.
>> Everything else is improvement or feature request.
>>
>> If this was the case, and 1.0.0 was released toda

Re: [DISCUSS] KIP-147: Add missing type parameters to StateStoreSupplier factories and KGroupedStream/Table methods

2017-06-24 Thread Michal Borowiecki
I think the discussion on Streams DSL refactoring will render this KIP 
obsolete.


I'll leave it as under discussion until something is agreed and then 
move it to discarded.


Cheers,

Michał


On 03/06/17 10:02, Michal Borowiecki wrote:


I agree maintaining backwards-compatibility here adds a lot of overhead.

I haven't so far found a way to reconcile these elegantly.

Whichever way we go it's better to take the pain sooner rather than 
later. Kafka 0.11.0.0 (through KAFKA-5045 
<https://issues.apache.org/jira/browse/KAFKA-5045>/KIP-114) increased 
the surface affected by the lack of fully type-parametrised suppliers 
noticeably.


Cheers,

Michał


On 03/06/17 09:43, Damian Guy wrote:
Hmm, i guess this won't work due to adding the additional <K,V> to 
the StateStoreSupplier params on reduce, count, aggregate etc.


On Sat, 3 Jun 2017 at 09:06 Damian Guy <damian@gmail.com 
<mailto:damian@gmail.com>> wrote:


Hi Michal,

Thanks for the KIP - is there a way we can do this without having
to introduce the new Typed.. Interfaces, overloaded methods etc?
Is it possible that we just need to provide a couple of new
methods on PersistentKeyValueFactory for windowed and
sessionWindowed to return interfaces like you've introduced in
TypedStores?
I admit i haven't looked in much detail if that would work.

My concern is that this is duplicating a bunch of code and
increasing the surface area for what is minimal benefit. It is
one of those cases where i'd love to not have to maintain
backward compatibility.

Thanks,
Damian

On Fri, 2 Jun 2017 at 08:20 Michal Borowiecki
<michal.borowie...@openbet.com
<mailto:michal.borowie...@openbet.com>> wrote:

Thanks Matthias,

I appreciate people are busy now preparing the 0.11 release.

One thing I would also appreciate input on is perhaps a
better name for the new TypedStores class, I just picked it
quickly but don't really like it.

Perhaps StateStores would make for a better name?

Cheers,
Michal


On 02/06/17 07:18, Matthias J. Sax wrote:

Thanks for the update Michal.

I did skip over the PR. Looks good to me, as far as I can tell. Maybe
Damian, Xavier, or Ismael can comment on this. Would be good to get
confirmation that the change is backward compatible.


-Matthias


On 5/27/17 11:11 AM, Michal Borowiecki wrote:

Hi all,

I've updated the KIP to reflect the proposed backwards-compatible 
approach:


https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=69408481


Given the vast area of APIs affected, I think the PR is easier to read
than the code excerpts in the KIP itself:
https://github.com/apache/kafka/pull/2992/files

Thanks,
Michał

On 07/05/17 10:16, Eno Thereska wrote:

I like this KIP in general and I agree it’s needed. Perhaps Damian can 
comment on the session store issue?

Thanks
Eno

    On May 6, 2017, at 10:32 PM, Michal 
Borowiecki<michal.borowie...@openbet.com>
<mailto:michal.borowie...@openbet.com>  wrote:

Hi Matthias,

Agreed. I tried your proposal and indeed it would work.

However, I think to maintain full backward compatibility we would also 
need to deprecate Stores.create() and leave it unchanged, while providing a new 
method that returns the more strongly typed Factories.

( This is because PersistentWindowFactory and PersistentSessionFactory cannot extend the existing 
PersistentKeyValueFactory interface, since their build() methods will be returning 
TypedStateStoreSupplier<WindowStore<K, V>> and TypedStateStoreSupplier<SessionStore<K, V>> 
respectively, which are NOT subclasses of TypedStateStoreSupplier<KeyValueStore<K, V>>. I do not see 
another way around it. Admittedly, my type covariance skills are rudimentary. Does anyone see a better way around 
this? )
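
A minimal standalone sketch of the variance constraint described above, using
simplified stand-in interfaces rather than the real Kafka Streams types:

// Hypothetical, simplified types -- only to show why the factories cannot share a parent.
interface StateStore {}
interface KeyValueStore<K, V> extends StateStore {}
interface WindowStore<K, V> extends StateStore {}

interface TypedStateStoreSupplier<T extends StateStore> {
    T get();
}

interface PersistentKeyValueFactory<K, V> {
    TypedStateStoreSupplier<KeyValueStore<K, V>> build();
}

// Cannot extend PersistentKeyValueFactory<K, V>: its build() would have to return
// TypedStateStoreSupplier<WindowStore<K, V>>, which is not a subtype of
// TypedStateStoreSupplier<KeyValueStore<K, V>> because generics are invariant in Java.
interface PersistentWindowFactory<K, V> {
    TypedStateStoreSupplier<WindowStore<K, V>> build();
}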

Since create() takes only the store name as argument, and I don't see 
what we could overload it with, the new method would need to have a different 
name.

Alternatively, since create(String) is the only method in Stores, we 
could deprecate the entire class and provide a new one. That would be my 
preference. Any ideas what to call it?



All comments and suggestions appreciated.



Cheers,

Michał


On 04/05/17 21:48, Matthias J. Sax wrote:

I had a quick look into this.

With regard to backward compatibility, I think it would be required to
introduce a new type `TypedStateStoreSupplier` (that extends
`StateStoreSupplier`) and to overload all methods that take a
`StateStoreSupplier` so that they accept the new type instead of the current 
one.

This would allow `.bui

Re: Re: [DISCUSS] KIP-165: Extend Interactive Queries for return latest update timestamp per key

2017-06-24 Thread Michal Borowiecki

Hi Jeyhun,

Could the proposed KeyContext.keyTs() be made more descriptive?

e.g. lastUpdated() or similar? So that users don't have to read the docs 
to know it isn't the creation timestamp for instance.
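
For illustration only, the kind of shape being discussed (the names are merely
suggestions, not the KIP's final API):

// Hypothetical sketch of the interface under discussion in KIP-165.
public interface KeyContext<V> {
    V value();          // latest value stored for the key
    long lastUpdated(); // timestamp of the record that last updated this key
}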


Cheers,
Michał

On 04/06/17 01:24, Jeyhun Karimov wrote:

Hi Matthias,

Thanks for comments.

  - why do you only consider get() and not range() and all() ?


The corresponding JIRA concentrates on single-key lookups. Moreover, I
could not find a use-case for including range queries that return records with
timestamps. However, theoretically we can include range() and all() as well.

  - we cannot have a second get() (this would be ambiguous) but need

another name like getWithTs() (or something better)

  - what use case do you have in mind for getKeyTs() ? Would a single new

method returning KeyContext not be sufficient?


Thanks for correction, this is my bad.

  - for backward compatibility, we will also need a new interface and

cannot just extend the existing one


  I will correct the KIP accordingly.

Thanks,
Jeyhun

On Fri, Jun 2, 2017 at 7:36 AM, Matthias J. Sax <matth...@confluent.io>
wrote:


Thanks for the KIP Jeyhun.

Some comments:
  - why do you only consider get() and not range() and all() ?
  - we cannot have a second get() (this would be ambiguous) but need
another name like getWithTs() (or something better)
  - what use case do you have in mind for getKeyTs() ? Would a single new
method returning KeyContext not be sufficient?
  - for backward compatibility, we will also need a new interface and
cannot just extend the existing one



-Matthias

On 5/29/17 4:55 PM, Jeyhun Karimov wrote:

Dear community,

I want to share KIP-165 [1] based on issue KAFKA-4304 [2].
I would like to get your comments.

[1]
https://cwiki.apache.org/confluence/display/KAFKA/KIP-

165%3A+Extend+Interactive+Queries+for+return+latest+
update+timestamp+per+key

[2] https://issues.apache.org/jira/browse/KAFKA-4304

Cheers,
Jeyhun









Re: [DISCUSS] KIP-163: Lower the Minimum Required ACL Permission of OffsetFetch

2017-06-20 Thread Michal Borowiecki

+1


On 19/06/17 21:31, Vahid S Hashemian wrote:

Thanks everyone. Great discussion.

Because these Read or Write actions are interpreted in conjunction with
particular resources (Topic, Group, ...) it would also make more sense to
me that for committing offsets the ACL should be (Group, Write).
So, a consumer would be required to have (Topic, Read), (Group, Write)
ACLs in order to function.

--Vahid




From:   Colin McCabe <cmcc...@apache.org>
To: us...@kafka.apache.org
Date:   06/19/2017 11:01 AM
Subject:Re: [DISCUSS] KIP-163: Lower the Minimum Required ACL
Permission of OffsetFetch



Thanks for the explanation.  I still think it would be better to have
the mutation operations require write ACLs, though.  It might not be
100% intuitive for novice users, but the current split between Describe
and Read is not intuitive for either novice or experienced users.

In any case, I am +1 on the incremental improvement discussed in
KIP-163.

cheers,
Colin


On Sat, Jun 17, 2017, at 11:11, Hans Jespersen wrote:

Offset commit is something that is done in the act of consuming (or
reading) Kafka messages.
Yes technically it is a write to the Kafka consumer offset topic but

it's

much easier for
administers to think of ACLs in terms of whether the user is allowed to
write (Produce) or
read (Consume) messages and not the lower level semantics that are that
consuming is actually
reading AND writing (albeit only to the offset topic).

-hans





On Jun 17, 2017, at 10:59 AM, Viktor Somogyi

<viktor.somo...@cloudera.com> wrote:

Hi Vahid,

+1 for OffsetFetch from me too.

I also wanted to ask the strangeness of the permissions, like why is
OffsetCommit a Read operation instead of Write which would intuitively

make

more sense to me. Perhaps any expert could shed some light on this? :)

Viktor

On Tue, Jun 13, 2017 at 2:38 PM, Vahid S Hashemian <
vahidhashem...@us.ibm.com <mailto:vahidhashem...@us.ibm.com>> wrote:


Hi Michal,

Thanks a lot for your feedback.

Your statement about Heartbeat is fair and makes sense. I'll update

the

KIP accordingly.

--Vahid




From:   Michal Borowiecki <michal.borowie...@openbet.com>
To:     us...@kafka.apache.org, Vahid S Hashemian <vahidhashem...@us.ibm.com>, dev@kafka.apache.org
Date:   06/13/2017 01:35 AM
Subject: Re: [DISCUSS] KIP-163: Lower the Minimum Required ACL Permission of OffsetFetch
--



Hi Vahid,

+1 wrt OffsetFetch.

The "Additional Food for Thought" mentions Heartbeat as a

non-mutating

action. I don't think that's true as the GroupCoordinator updates the
latestHeartbeat field for the member and adds a new object to the
heartbeatPurgatory, see completeAndScheduleNextHeartbeatExpiration()
called from handleHeartbeat()

NB added dev mailing list back into CC as it seems to have been lost

along

the way.

Cheers,

Michał


On 12/06/17 18:47, Vahid S Hashemian wrote:
Hi Colin,

Thanks for the feedback.

To be honest, I'm not sure either why Read was selected instead of

Write

for mutating APIs in the initial design (I asked Ewen on the

corresponding

JIRA and he seemed unsure too).
Perhaps someone who was involved in the design can clarify.

Thanks.
--Vahid




From:   Colin McCabe <cmcc...@apache.org>
To: us...@kafka.apache.org

Date:   06/12/2017 10:11 AM
Subject:Re: [DISCUSS] KIP-163: Lower the Minimum Required ACL
Permission of OffsetFetch



Hi Vahid,

I think you make a valid point that the ACLs controlling group
operations are not very intuitive.

This is probably a dumb question, but why are we using Read for

mutating

APIs?  Shouldn't that be Write?

The distinction between Describe and Read makes a lot of sense for
Topics.  A group isn't really something that you "read" from in the

same

way as a topic, so it always felt kind of weird there.

best,
Colin


On Thu, Jun 8, 2017, at 11:29, Vahid S Hashemian wrote:

Hi all,

I'm resending my earlier note hoping it would spark some conversation
this
time around :)

Thanks.
--Vahid




From:   "Vahid S Hashemian" <vahidhashem...@us.ibm.com>
To: dev <dev@kafka.apache.org>, "Kafka User" <us...@kafka.apache.org>

Date:   05/30/2017 08:33 AM
Subject:KIP-163: Lower the Minimum Required ACL Permission of
OffsetFetch



Hi,

I started a new KIP to improve the minimum required ACL permissions

of

some of th

Re: Kafka Streams vs Spark Streaming : reduce by window

2017-06-18 Thread Michal Borowiecki
If confusion is the problem, then totally agree no point adding more 
knobs. Perhaps you're right that users don't /really/ want 
processing-time semantics. Just /think/ they want them until they start 
considering replay/catch-up scenarios. I guess people rarely think about 
those from the start (I sure didn't).


Cheers,

Michał


On 16/06/17 17:54, Jay Kreps wrote:
I think the question is when do you actually /want/ processing time 
semantics? There are definitely times when its safe to assume the two 
are close enough that a little lossiness doesn't matter much but it is 
pretty hard to make assumptions about when the processing time is and 
has been hard for us to think of a use case where its actually desirable.


I think mostly what we've seen is confusion about the core concepts:

  * stream -- immutable events that occur
  * tables (including windows) -- current state of the world

If the root problem is confusion adding knobs never makes it better. 
If the root problem is we're missing important use cases that justify 
the additional knobs then i think it's good to try to really 
understand them. I think there could be use cases around systems that 
don't take updates, example would be email, twitter, and some metrics 
stores.


One solution that would be less complexity inducing than allowing new 
semantics, but might help with the use cases we need to collect, would 
be to add a new operator in the DSL. Something like .freezeAfter(30, 
TimeUnit.SECONDS) that collects all updates for a given window and 
both emits and enforces a single output after 30 seconds after the 
advancement of stream time and remembers that it is omitted, 
suppressing all further output (so the output is actually a KStream). 
This might or might not depend on wall clock time. Perhaps this is in 
fact what you are proposing?


-Jay



On Fri, Jun 16, 2017 at 2:38 AM, Michal Borowiecki 
<michal.borowie...@openbet.com <mailto:michal.borowie...@openbet.com>> 
wrote:


I wonder if it's a frequent enough use case that Kafka Streams
should consider providing this out of the box - this was asked for
multiple times, right?

Personally, I agree totally with the philosophy of "no final
aggregation", as expressed by Eno's post, but IMO that is
predicated totally on event-time semantics.

If users want processing-time semantics then, as the docs already
point out, there is no such thing as a late-arriving record -
every record just falls in the currently open window(s), hence the
notion of final aggregation makes perfect sense, from the
usability point of view.

The single abstraction of "stream time" proves leaky in some cases
(e.g. for punctuate method - being addressed in KIP-138). Perhaps
this is another case where processing-time semantics warrant
explicit handling in the api - but of course, only if there's
sufficient user demand for this.

What I could imagine is a new type of time window
(ProcessingTimeWindow?), that if used in an aggregation, the
underlying processor would force the WallclockTimestampExtractor
(KAFKA-4144 enables that) and would use the system-time
punctuation (KIP-138) to send the final aggregation value once the
window has expired and could be configured to not send
intermediate updates while the window was open.

Of course this is just a helper for the users, since they can
implement it all themselves using the low-level API, as Matthias
pointed out already. Just seems there's recurring interest in this.

Again, this only makes sense for processing time semantics. For
event-time semantics I find the arguments for "no final
aggregation" totally convincing.


Cheers,

Michał


On 16/06/17 00:08, Matthias J. Sax wrote:

Hi Paolo,

This SO question might help, too:

https://stackoverflow.com/questions/38935904/how-to-send-final-kafka-streams-aggregation-result-of-a-time-windowed-ktable

<https://stackoverflow.com/questions/38935904/how-to-send-final-kafka-streams-aggregation-result-of-a-time-windowed-ktable>

For Streams, the basic model is based on "change" and we report updates
to the "current" result immediately reducing latency to a minimum.

Last, if you say it's going to fall into the next window, you won't get
event time semantics but you fall back processing time semantics, that
cannot provide exact results

If you really want to trade-off correctness version getting (late)
updates and want to use processing time semantics, you should configure
WallclockTimestampExtractor and implement a "update deduplication"
operator using table.toStream().transform(). You can attached a state to
your transformer and store all update there (ie, newer update overwrite
older updates). Punctuations allow you to emit "final" results for
wi
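
A rough sketch of the update-deduplication transformer described above, assuming a
later Streams API (2.1+); the store name, types and the 30-second interval are
illustrative, and the "dedup-store" key-value store is assumed to be registered with
the topology:

import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class DedupTransformer implements Transformer<String, Long, KeyValue<String, Long>> {
    private ProcessorContext context;
    private KeyValueStore<String, Long> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        this.store = (KeyValueStore<String, Long>) context.getStateStore("dedup-store");
        // Emit the buffered "final" value per key on a wall-clock schedule
        // instead of forwarding every intermediate update.
        context.schedule(Duration.ofSeconds(30), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            try (KeyValueIterator<String, Long> it = store.all()) {
                while (it.hasNext()) {
                    KeyValue<String, Long> entry = it.next();
                    context.forward(entry.key, entry.value);
                    store.delete(entry.key);
                }
            }
        });
    }

    @Override
    public KeyValue<String, Long> transform(String key, Long value) {
        store.put(key, value); // newer updates overwrite older ones
        return null;           // suppress per-update output
    }

    @Override
    public void close() {}
}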

Re: Kafka Streams vs Spark Streaming : reduce by window

2017-06-16 Thread Michal Borowiecki
ploring Kafka Streams and it's very powerful imho even because the usage 
is pretty simple but this scenario could have a lack against Spark.


Thanks,

Paolo.


Paolo Patierno
Senior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoT
Microsoft Azure Advisor

Twitter : @ppatierno<http://twitter.com/ppatierno>
Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
Blog : DevExperience<http://paolopatierno.wordpress.com/>



From: Eno Thereska <eno.there...@gmail.com>
Sent: Thursday, June 15, 2017 1:45 PM
To: us...@kafka.apache.org
Subject: Re: Kafka Streams vs Spark Streaming : reduce by window

Hi Paolo,

That is indeed correct. We don’t believe in closing windows in Kafka Streams.
You could reduce the number of downstream records by using record caches: 
http://docs.confluent.io/current/streams/developer-guide.html#record-caches-in-the-dsl
 
<http://docs.confluent.io/current/streams/developer-guide.html#record-caches-in-the-dsl>.

Alternatively you can just query the KTable whenever you want using the Interactive 
Query APIs (so when you query dictates what  data you receive), see this 
https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
 
<https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/>

Thanks
Eno

On Jun 15, 2017, at 2:38 PM, Paolo Patierno <ppatie...@live.com> wrote:

Hi,


using the streams library I noticed a difference (or there is a lack of 
knowledge on my side)with Apache Spark.

Imagine following scenario ...


I have a source topic where numeric values come in and I want to check the 
maximum value in the latest 5 seconds but ... putting the max value into a 
destination topic every 5 seconds.

This is what happens with reduceByWindow method in Spark.

I'm using reduce on a KStream here that process the max value taking into 
account previous values in the latest 5 seconds but the final value is put into 
the destination topic for each incoming value.


For example ...


An application sends numeric values every 1 second.

With Spark ... the source gets values every 1 second, process max in a window 
of 5 seconds, puts the max into the destination every 5 seconds (so when the 
window ends). If the sequence is 21, 25, 22, 20, 26 the output will be just 26.

With Kafka Streams ... the source gets values every 1 second, process max in a 
window of 5 seconds, puts the max into the destination every 1 seconds (so 
every time an incoming value arrives). Of course, if for example the sequence 
is 21, 25, 22, 20, 26 ... the output will be 21, 25, 25, 25, 26.
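
For reference, the per-record updates described here are exactly what a plain windowed
reduce produces; a minimal sketch in a later (2.x) Streams DSL, with made-up topic
names, would be roughly:

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;

public class WindowedMaxExample {
    public static StreamsBuilder build() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("numbers", Consumed.with(Serdes.String(), Serdes.Long()))
               .groupByKey()
               .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
               .reduce((v1, v2) -> Math.max(v1, v2)) // running max per 5-second window
               .toStream()                           // emits an update for every incoming record
               .map((windowedKey, max) -> KeyValue.pair(windowedKey.key(), max))
               .to("max-out", Produced.with(Serdes.String(), Serdes.Long()));
        return builder;
    }
}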


Is it possible with Kafka Streams ? Or it's something to do at application 
level ?


Thanks,

Paolo


Paolo Patierno
Senior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoT
Microsoft Azure Advisor

Twitter : @ppatierno<http://twitter.com/ppatierno>
Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
Blog : DevExperience<http://paolopatierno.wordpress.com/>








Re: [DISCUSS] KIP-150 - Kafka-Streams Cogroup

2017-06-13 Thread Michal Borowiecki

You're right, I haven't thought of that.

Cheers,

Michał


On 13/06/17 13:00, Kyle Winkelman wrote:

First, I would prefer not calling it aggregate because there are already
plenty of aggregate methods.

Second, I dont think this would really work because after each aggregate
you now have a unique KTable (someone may want a table with 4 streams and
reuse those 4 in another table but with one more stream added) and unless
we completely duplicate everything every time this isnt really possible.
Additionally, the cogroup way just requires 1 more call to create two
different tables (normal, windowed, and session windowed) this new way
would require copying the aggregate chain.

Another way to think about it is with cogroup we know that when they call
aggregate they arent going to be adding any more aggregators to that table
but your way requires us to assume they are done adding aggregators after
each call so we must return a ktable just to possibly not need to have
created it.

On Jun 13, 2017 5:20 AM, "Michal Borowiecki" <michal.borowie...@openbet.com>
wrote:


Actually, just had a thought. It started with naming.

Are we actually co-grouping these streams or are we co-aggregating them?

After all, in each of the cogroup calls we are providing an Aggregator
implementation.


If they are really co-aggregated, why don't we turn this around:
KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();

KTable<K, CG> coagg = grouped1.aggregate(initializer1, aggregator1,
aggValueSerde1) // this is the unchanged aggregate method
 .aggregate(grouped2, aggregator2)  // this is a new method
 .aggregate(grouped3, aggregator3); // ditto

This means instead of adding cogroup methods on KGroupStream interface,
adding aggregate method on KTable interface.

Is that feasible?

Cheers,
Michał

On 13/06/17 10:56, Michal Borowiecki wrote:

Also, I still feel that putting initializer on the first cogroup can
mislead users into thinking the first stream is in some way special.
Just my 5c.
Michał

On 13/06/17 09:54, Michal Borowiecki wrote:

Agree completely with the argument for serdes belonging in the same place
as the state store name, which is in the aggregate method.

Cheers,

Michał

On 12/06/17 18:20, Xavier Léauté wrote:

I think we are discussing two separate things here, so it might be worth
clarifying:

1) the position of the initializer with respect to the aggregators. If I
understand correctly, Guozhang seems to think it is more natural to specify
the initializer first, despite it not bearing any relation to the first
aggregator. I can see the argument for specifying the initializer first,
but I think it is debatable whether mixing it into the first cogroup call
leads to a cleaner API or not.

2) where the serde should be defined (if necessary). Looking at our
existing APIs in KGroupedStreams, we always offer two aggregate()
methods. The first one takes the name of the store and associated aggregate
value serde e.g. KGroupedStream.aggregate(Initializer initializer,
Aggregator aggregator, Serde aggValueSerde,
String queryableStoreName)
The second one only takes a state store supplier, and does not specify any
serde, e.g. KGroupedStream.aggregate(Initializer
initializer, Aggregator aggregator, final
StateStoreSupplier storeSupplier)
Presumably, when specifying a state store supplier it shouldn't be
necessary to specify an aggregate value serde, since the provided
statestore might not need to serialize the values (e.g. it may just keep
them as regular objects in heap) or it may have its own
internal serialization format.

For consistency I think it would be valuable to preserve the same two
aggregate methods for cogroup as well. Since the serde is only required in
one of the two cases, I believe the serde has no place in the first
cogroup() call and should only have to be specified as part of the
aggregate() method that takes a state store name. In the case of a state
store supplier, no serde would be necessary.


On Sat, Jun 10, 2017 at 4:09 PM Guozhang Wang <wangg...@gmail.com> wrote:


I'd agree that the aggregate value serde and the initializer does not
bear direct relationship with the first `cogroup` calls, but after I tried
to write some example code with these two different set of APIs I felt the
current APIs just program more naturally.

I know it is kinda subjective, but I do think that user experience may be
more important as a deciding factor than the logical argument for public
interfaces. So I'd recommend people to also try out writing some example
lines also and we can circle back and discuss which one feels more natural
to write code.


Guozhang

On Fri, Jun 9, 2017 at 1:59 AM, Michal Borowiecki <
michal.borowie...@openbet.com> wrote:


I feel it would 

[jira] [Commented] (KAFKA-5245) KStream builder should capture serdes

2017-06-13 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16047608#comment-16047608
 ] 

Michal Borowiecki commented on KAFKA-5245:
--

Just wanted to say it's great to see there's a ticket for this :-) Always found 
it counter-intuitive that the default serdes are taken from config instead of 
upstream in these cases.
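
For illustration, roughly what this looks like in the DSL of that era (topic and
store names are made up):

{code}
// Serdes are given to stream() here...
KStream<String, Long> input = builder.stream(Serdes.String(), Serdes.Long(), "input");
// ...but groupByKey() does not pick them up: they have to be repeated, otherwise
// the application-wide default serdes from the config are used.
KTable<String, Long> counts =
    input.groupByKey(Serdes.String(), Serdes.Long()).count("counts");
{code}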

> KStream builder should capture serdes 
> --
>
> Key: KAFKA-5245
> URL: https://issues.apache.org/jira/browse/KAFKA-5245
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0, 0.10.2.1
>Reporter: Yeva Byzek
>Assignee: anugrah
>Priority: Minor
>  Labels: beginner, newbie
>
> Even if one specifies a serdes in `builder.stream`, later a call to 
> `groupByKey` may require the serdes again if it differs from the configured 
> streams app serdes. The preferred behavior is that if no serdes is provided 
> to `groupByKey`, it should use whatever was provided in `builder.stream` and 
> not what was in the app.
> From the current docs:
> “When to set explicit serdes: Variants of groupByKey exist to override the 
> configured default serdes of your application, which you must do if the key 
> and/or value types of the resulting KGroupedStream do not match the 
> configured default serdes.”





Re: [DISCUSS] KIP-163: Lower the Minimum Required ACL Permission of OffsetFetch

2017-06-13 Thread Michal Borowiecki

Hi Vahid,

+1 wrt OffsetFetch.

The "Additional Food for Thought" mentions Heartbeat as a non-mutating 
action. I don't think that's true as the GroupCoordinator updates the 
latestHeartbeat field for the member and adds a new object to the 
heartbeatPurgatory, see completeAndScheduleNextHeartbeatExpiration() 
called from handleHeartbeat()



NB added dev mailing list back into CC as it seems to have been lost 
along the way.


Cheers,

Michał


On 12/06/17 18:47, Vahid S Hashemian wrote:

Hi Colin,

Thanks for the feedback.

To be honest, I'm not sure either why Read was selected instead of Write
for mutating APIs in the initial design (I asked Ewen on the corresponding
JIRA and he seemed unsure too).
Perhaps someone who was involved in the design can clarify.

Thanks.
--Vahid




From:   Colin McCabe <cmcc...@apache.org>
To: us...@kafka.apache.org
Date:   06/12/2017 10:11 AM
Subject:Re: [DISCUSS] KIP-163: Lower the Minimum Required ACL
Permission of OffsetFetch



Hi Vahid,

I think you make a valid point that the ACLs controlling group
operations are not very intuitive.

This is probably a dumb question, but why are we using Read for mutating
APIs?  Shouldn't that be Write?

The distinction between Describe and Read makes a lot of sense for
Topics.  A group isn't really something that you "read" from in the same
way as a topic, so it always felt kind of weird there.

best,
Colin


On Thu, Jun 8, 2017, at 11:29, Vahid S Hashemian wrote:

Hi all,

I'm resending my earlier note hoping it would spark some conversation
this
time around :)

Thanks.
--Vahid




From:   "Vahid S Hashemian" <vahidhashem...@us.ibm.com>
To: dev <dev@kafka.apache.org>, "Kafka User"

<us...@kafka.apache.org>

Date:   05/30/2017 08:33 AM
Subject:KIP-163: Lower the Minimum Required ACL Permission of
OffsetFetch



Hi,

I started a new KIP to improve the minimum required ACL permissions of
some of the APIs:


https://cwiki.apache.org/confluence/display/KAFKA/KIP-163%3A+Lower+the+Minimum+Required+ACL+Permission+of+OffsetFetch


The KIP is to address KAFKA-4585.

Feedback and suggestions are welcome!

Thanks.
--Vahid

















Re: Test fail reason on doc fix

2017-06-12 Thread Michal Borowiecki
Some tests are intermittently failing, it may go away when you retest. 
You can trigger a retest by putting a comment "Retest this please" on 
your PR.


Hope that helps,

Michał


On 12/06/17 10:19, Paolo Patierno wrote:

Hi Tom,


this is the problem I noticed for this reason asked here. Btw I don't 
understand the reason for a failure on a simple doc fix (inside the HTML file) 
:-)


Thanks,

Paolo


Paolo Patierno
Senior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoT
Microsoft Azure Advisor

Twitter : @ppatierno<http://twitter.com/ppatierno>
Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
Blog : DevExperience<http://paolopatierno.wordpress.com/>



From: Tom Bentley <t.j.bent...@gmail.com>
Sent: Monday, June 12, 2017 9:17 AM
To: dev@kafka.apache.org
Subject: Re: Test fail reason on doc fix

Hi Paolo,

Usually you can just follow the links added by asfbot on the PR, but these
are currently giving 404 (and not just for your failures, so maybe an ASF
infrastructure problem?)

Cheers,

Tom

On 12 June 2017 at 09:20, Paolo Patierno <ppatie...@live.com> wrote:


Hi all,


I opened this JIRA with a related PR few days ago :


https://github.com/apache/kafka/pull/3269
https://issues.apache.org/jira/browse/KAFKA-5410


It's just a fix on a documentation file but in the PR I see a Test FAILed
(JDK 7 and Scala 2.11) with no more information about that.

Because it sounds strange to me, because I didn't change any Java/Scala
code, what is the way to go to check the failure reason ?


Thanks,

Paolo.


Paolo Patierno
Senior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoT
Microsoft Azure Advisor

Twitter : @ppatierno<http://twitter.com/ppatierno>
Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
Blog : DevExperience<http://paolopatierno.wordpress.com/>







[jira] [Comment Edited] (KAFKA-5419) Console consumer --key-deserializer and --value-deserializer are always overwritten by ByteArrayDeserializer

2017-06-09 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16044557#comment-16044557
 ] 

Michal Borowiecki edited comment on KAFKA-5419 at 6/9/17 3:42 PM:
--

I think it's a duplicate of KAFKA-2526 and KAFKA-5149


was (Author: mihbor):
I think it's a duplicate of KAFKA-5149

> Console consumer --key-deserializer and --value-deserializer are always 
> overwritten by ByteArrayDeserializer
> 
>
> Key: KAFKA-5419
> URL: https://issues.apache.org/jira/browse/KAFKA-5419
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Reporter: Paolo Patierno
>Priority: Minor
>
> Hi,
> the --key-deserializer and  --value-deserializer options passed to the 
> command line are always overwritten here :
> {code}
> props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
> "org.apache.kafka.common.serialization.ByteArrayDeserializer")
> props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
> "org.apache.kafka.common.serialization.ByteArrayDeserializer")
> {code}
> Thanks,
> Paolo.





[jira] [Comment Edited] (KAFKA-5419) Console consumer --key-deserializer and --value-deserializer are always overwritten by ByteArrayDeserializer

2017-06-09 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16044557#comment-16044557
 ] 

Michal Borowiecki edited comment on KAFKA-5419 at 6/9/17 3:41 PM:
--

I think it's a duplicate of KAFKA-5149


was (Author: mihbor):
I think it's a duplicate of KAFKA-2526

> Console consumer --key-deserializer and --value-deserializer are always 
> overwritten by ByteArrayDeserializer
> 
>
> Key: KAFKA-5419
> URL: https://issues.apache.org/jira/browse/KAFKA-5419
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Reporter: Paolo Patierno
>Priority: Minor
>
> Hi,
> the --key-deserializer and  --value-deserializer options passed to the 
> command line are always overwritten here :
> {code}
> props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
> "org.apache.kafka.common.serialization.ByteArrayDeserializer")
> props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
> "org.apache.kafka.common.serialization.ByteArrayDeserializer")
> {code}
> Thanks,
> Paolo.





[jira] [Commented] (KAFKA-5419) Console consumer --key-deserializer and --value-deserializer are always overwritten by ByteArrayDeserializer

2017-06-09 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16044557#comment-16044557
 ] 

Michal Borowiecki commented on KAFKA-5419:
--

I think it's a duplicate of KAFKA-2526

> Console consumer --key-deserializer and --value-deserializer are always 
> overwritten by ByteArrayDeserializer
> 
>
> Key: KAFKA-5419
> URL: https://issues.apache.org/jira/browse/KAFKA-5419
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Reporter: Paolo Patierno
>Priority: Minor
>
> Hi,
> the --key-deserializer and  --value-deserializer options passed to the 
> command line are always overwritten here :
> {code}
> props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
> "org.apache.kafka.common.serialization.ByteArrayDeserializer")
> props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
> "org.apache.kafka.common.serialization.ByteArrayDeserializer")
> {code}
> Thanks,
> Paolo.





Re: Re: synchronous request response using kafka

2017-06-09 Thread Michal Borowiecki

cc-ed users mailing list, as I think it's more appropriate for this thread.

Sanjay, if what you're after is the following pattern:

http://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReplyJmsExample.html

then yes, you can do this in Kafka. The outline would be similar to the 
JMS example above, but the specifics are of course different.


Also, currently you would have to put the reply topic name and 
correlation id into the message value itself, but from v0.11 you can use 
custom headers for that.
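
A rough sketch of the requesting side, with made-up topic names and header keys:

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class RequestReplyExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String correlationId = UUID.randomUUID().toString();
            ProducerRecord<String, String> request =
                    new ProducerRecord<>("requests", "order-123", "request payload");
            // Record headers exist from 0.11 onwards; before that, the reply topic and
            // correlation id would have to be carried inside the message value itself.
            request.headers().add("reply-topic", "replies-service-a".getBytes(StandardCharsets.UTF_8));
            request.headers().add("correlation-id", correlationId.getBytes(StandardCharsets.UTF_8));
            producer.send(request);
            // The replying service echoes the correlation id; the requester consumes
            // "replies-service-a" and matches replies on the "correlation-id" header.
        }
    }
}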


Hope that helps,
Michał


On 12/05/17 22:02, Colin McCabe wrote:

Hi Sanjay,

Can you be a little clearer what you are trying to achieve?  If you want
to build an RPC system where one entity makes a remote procedure call to
another, you might consider using something like CORBA, Apache Thrift,
gRPC, etc.

best,
Colin


On Fri, May 12, 2017, at 07:55, Banerjee, Sanjay wrote:

Can someone please share some thoughts whether we can do synchronous call
  (request response) using kafka similar to JMS

Thanks
Sanjay
913-221-9164







[jira] [Commented] (KAFKA-5246) Remove backdoor that allows any client to produce to internal topics

2017-06-09 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16044179#comment-16044179
 ] 

Michal Borowiecki commented on KAFKA-5246:
--

Would it instead perhaps make sense to document this special client id, 
together with its use-cases listed in the PR comments?

>  Remove backdoor that allows any client to produce to internal topics
> -
>
> Key: KAFKA-5246
> URL: https://issues.apache.org/jira/browse/KAFKA-5246
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1, 0.10.2.0, 
> 0.10.2.1
>Reporter: Andy Coates
>Assignee: Andy Coates
>Priority: Minor
>
> kafka.admin.AdminUtils defines an ‘AdminClientId' val, which looks to be 
> unused in the code, with the exception of a single use in KafkaApis.scala in 
> handleProducerRequest, where it looks to allow any client, using the special 
> ‘__admin_client' client id, to append to internal topics.
> This looks like a security risk to me, as it would allow any client to 
> produce either rogue offsets or even a record containing something other than 
> group/offset info.
> Can we remove this please?





Re: [DISCUSS] KIP-147: Add missing type parameters to StateStoreSupplier factories and KGroupedStream/Table methods

2017-06-03 Thread Michal Borowiecki

I agree maintaining backwards-compatibility here adds a lot of overhead.

I haven't so far found a way to reconcile these elegantly.

Whichever way we go it's better to take the pain sooner rather than 
later. Kafka 0.11.0.0 (through KAFKA-5045 
<https://issues.apache.org/jira/browse/KAFKA-5045>/KIP-114) increased 
the surface affected by the lack of fully type-parametrised suppliers 
noticeably.


Cheers,

Michał


On 03/06/17 09:43, Damian Guy wrote:
Hmm, i guess this won't work due to adding the additional <K,V> to the 
StateStoreSupplier params on reduce, count, aggregate etc.


On Sat, 3 Jun 2017 at 09:06 Damian Guy <damian@gmail.com 
<mailto:damian@gmail.com>> wrote:


Hi Michal,

Thanks for the KIP - is there a way we can do this without having
to introduce the new Typed.. Interfaces, overloaded methods etc?
Is it possible that we just need to provide a couple of new
methods on PersistentKeyValueFactory for windowed and
sessionWindowed to return interfaces like you've introduced in
TypedStores?
I admit i haven't looked in much detail if that would work.

My concern is that this is duplicating a bunch of code and
increasing the surface area for what is minimal benefit. It is one
of those cases where i'd love to not have to maintain backward
compatibility.

Thanks,
Damian

On Fri, 2 Jun 2017 at 08:20 Michal Borowiecki
<michal.borowie...@openbet.com
<mailto:michal.borowie...@openbet.com>> wrote:

Thanks Matthias,

I appreciate people are busy now preparing the 0.11 release.

One thing I would also appreciate input on is perhaps a better
name for the new TypedStores class, I just picked it quickly
but don't really like it.

Perhaps StateStores would make for a better name?

Cheers,
Michal


On 02/06/17 07:18, Matthias J. Sax wrote:

Thanks for the update Michal.

I did skip over the PR. Looks good to me, as far as I can tell. Maybe
Damian, Xavier, or Ismael can comment on this. Would be good to get
confirmation that the change is backward compatible.


-Matthias


On 5/27/17 11:11 AM, Michal Borowiecki wrote:

Hi all,

I've updated the KIP to reflect the proposed backwards-compatible 
approach:


https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=69408481


Given the vast area of APIs affected, I think the PR is easier to read
than the code excerpts in the KIP itself:
https://github.com/apache/kafka/pull/2992/files

Thanks,
Michał

On 07/05/17 10:16, Eno Thereska wrote:

I like this KIP in general and I agree it’s needed. Perhaps Damian can 
comment on the session store issue?

Thanks
Eno

    On May 6, 2017, at 10:32 PM, Michal 
Borowiecki<michal.borowie...@openbet.com>
<mailto:michal.borowie...@openbet.com>  wrote:

Hi Matthias,

Agreed. I tried your proposal and indeed it would work.

However, I think to maintain full backward compatibility we would also 
need to deprecate Stores.create() and leave it unchanged, while providing a new 
method that returns the more strongly typed Factories.

( This is because PersistentWindowFactory and PersistentSessionFactory cannot extend the existing 
PersistentKeyValueFactory interface, since their build() methods will be returning 
TypedStateStoreSupplier<WindowStore<K, V>> and TypedStateStoreSupplier<SessionStore<K, V>> 
respectively, which are NOT subclasses of TypedStateStoreSupplier<KeyValueStore<K, V>>. I do not see 
another way around it. Admittedly, my type covariance skills are rudimentary. Does anyone see a better way around 
this? )
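
( To illustrate the covariance problem just described, here is a minimal, simplified sketch - the type names are deliberately stripped down and are not the real Streams interfaces:

// Simplified, illustrative types only - not the real Streams interfaces.
interface StateStore { }
interface KeyValueStore<K, V> extends StateStore { }
interface WindowStore<K, V> extends StateStore { }

interface TypedStateStoreSupplier<T extends StateStore> {
    T get();
}

interface PersistentKeyValueFactory<K, V> {
    TypedStateStoreSupplier<KeyValueStore<K, V>> build();
}

// The following does NOT compile: the overriding build() would have to return
// TypedStateStoreSupplier<WindowStore<K, V>>, which is not a subtype of
// TypedStateStoreSupplier<KeyValueStore<K, V>> because generic types are invariant.
//
// interface PersistentWindowFactory<K, V> extends PersistentKeyValueFactory<K, V> {
//     @Override
//     TypedStateStoreSupplier<WindowStore<K, V>> build();
// }
)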

Since create() takes only the store name as argument, and I don't see 
what we could overload it with, the new method would need to have a different 
name.

Alternatively, since create(String) is the only method in Stores, we 
could deprecate the entire class and provide a new one. That would be my 
preference. Any ideas what to call it?



All comments and suggestions appreciated.



Cheers,

Michał


On 04/05/17 21:48, Matthias J. Sax wrote:

I had a quick look into this.

With regard to backward compatibility, I think it would be required to
introduce a new type `TypedStateStoreSupplier` (that extends
`StateStoreSupplier`) and to overload all methods that take a
`StateStoreSupplier` so that they accept the new type instead of the current
one.

This would allow `.build` to return a `TypedStateStoreSupplier` and
thus, would not break any code. At least if I did not miss anything with
regard to some magic of type inference using generics (I am not an
expert in this field).


   

Re: Sink Processor definition

2017-06-03 Thread Michal Borowiecki
Yes, I think the key distinction, from the point of view of that 
documentation section, is that it doesn't have downstream processors.
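
To make that concrete, here is a rough sketch against the current Processor API; the topic names, the store wiring and the processor itself are made up purely for illustration:

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class TerminalNodesExample {

    // A terminal processor that only writes to a state store: it has no
    // downstream processors and never forwards to a topic.
    static class StoreOnlyProcessor implements Processor<String, Long> {
        private KeyValueStore<String, Long> store;

        @SuppressWarnings("unchecked")
        @Override
        public void init(ProcessorContext context) {
            store = (KeyValueStore<String, Long>) context.getStateStore("counts");
        }

        @Override
        public void process(String key, Long value) {
            store.put(key, value); // terminal: nothing is forwarded downstream
        }

        @Override
        public void punctuate(long timestamp) { }

        @Override
        public void close() { }
    }

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("source", "input-topic");

        // "Sink processor" in the docs' narrow sense: a terminal node producing to a topic.
        builder.addSink("topic-sink", "output-topic", "source");

        // A terminal node that only updates a state store - the case in question.
        builder.addProcessor("store-writer", StoreOnlyProcessor::new, "source");
        builder.addStateStore(
                Stores.create("counts").withStringKeys().withLongValues().persistent().build(),
                "store-writer");
    }
}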



On 03/06/17 09:48, Damian Guy wrote:

Hi Michal,

In this case Sink Processor is really referring to a SinkNode that can 
only produce to a kafka topic. Maybe the terminology is incorrect as 
strictly speaking a processor that writes data to anything could be 
considered a Sink Processor.


On Sat, 3 Jun 2017 at 09:23 Michal Borowiecki 
<michal.borowie...@openbet.com <mailto:michal.borowie...@openbet.com>> 
wrote:


Hi all,

Streams docs say:


  * *Sink Processor*: A sink processor is a special type of
stream processor that does not have down-stream processors.
It sends any received records from its up-stream processors
to a specified Kafka topic.


Would a processor that doesn't produce to a kafka topic (directly)
but only updates a state store also be considered a sink
processor? I think yes.

I'll submit a PR to that effect unless I hear otherwise.

Cheers,

Michał





Sink Processor definition

2017-06-03 Thread Michal Borowiecki

Hi all,

Streams docs say:


  * *Sink Processor*: A sink processor is a special type of stream
processor that does not have down-stream processors. It sends any
received records from its up-stream processors to a specified
Kafka topic.

Would a processor that doesn't produce to a kafka topic (directly) but 
only updates a state store also be considered a sink processor? I think yes.


I'll submit a PR to that effect unless I hear otherwise.

Cheers,

Michał





Re: [DISCUSS] KIP-138: Change punctuate semantics

2017-06-02 Thread Michal Borowiecki

Hi Matthias,

Apologies, somehow I totally missed this email earlier.

Wrt ValueTransformer, I added it to the list of deprecated methods 
(PR is up to date).


Wrt Cancellable vs Cancelable:

I'm not fluent enough to have spotted this nuance, but having googled 
for it, you are right.


On the other hand however, the precedent seems to have been set by 
java.util.concurrent.Cancellable and akka for instance followed that 
with akka.actor.Cancellable.


Given established heritage in computing context, I'd err on the side of 
consistency with prior practice.


Unless anyone has strong opinions on this matter?


Thanks,

Michal


On 04/05/17 20:43, Matthias J. Sax wrote:

Hi,

thanks for updating the KIP. Looks good to me overall.

I think adding `Cancellable` (or should it be `Cancelable` to follow
American English?) is a clean solution, in contrast to the proposed
alternative.

One minor comment: can you add `ValueTransformer#punctuate()` to the
list of deprecated methods?


-Matthias



On 5/4/17 1:41 AM, Michal Borowiecki wrote:

Further in this direction I've updated the main proposal to incorporate
the Cancellable return type for ProcessorContext.schedule and the
guidance on how to implement "hybrid" punctuation with the proposed 2
PunctuationTypes.

I look forward to more comments on whether the Cancellable return type is
an agreeable solution and on its precise definition.

I shall move all alternatives other than the main proposal into the
Rejected Alternatives section and if I hear any objections, I'll move
those back up and we'll discuss further.


Looking forward to all comments and suggestions.


Thanks,

Michal


On 01/05/17 18:23, Michal Borowiecki wrote:

Hi all,

As promised, here is my take at how one could implement the previously
discussed hybrid semantics using the 2 PunctuationType callbacks (one
for STREAM_TIME and one for SYSTEM_TIME).

However, there's a twist.

Since currently calling context.schedule() adds a new
PunctuationSchedule and does not overwrite the previous one, a slight
change would be required:

a) either that PunctuationSchedules are cancellable

b) or that calling schedule() overwrites (cancels) the previous one
with the given PunctuationType (but that's not how it works currently)


Below is an example assuming approach a) is implemented by having
schedule return Cancellable instead of void.

ProcessorContext context;
long streamTimeInterval = ...;
long systemTimeUpperBound = ...; //e.g. systemTimeUpperBound = streamTimeInterval + some tolerance
Cancellable streamTimeSchedule;
Cancellable systemTimeSchedule;
long lastStreamTimePunctation = -1;

public void init(ProcessorContext context){
    this.context = context;
    streamTimeSchedule = context.schedule(PunctuationType.STREAM_TIME, streamTimeInterval,   this::streamTimePunctuate);
    systemTimeSchedule = context.schedule(PunctuationType.SYSTEM_TIME, systemTimeUpperBound, this::systemTimePunctuate);
}

public void streamTimePunctuate(long streamTime){
    periodicBusiness(streamTime);

    systemTimeSchedule.cancel();
    systemTimeSchedule = context.schedule(PunctuationType.SYSTEM_TIME, systemTimeUpperBound, this::systemTimePunctuate);
}

public void systemTimePunctuate(long systemTime){
    periodicBusiness(context.timestamp());

    streamTimeSchedule.cancel();
    streamTimeSchedule = context.schedule(PunctuationType.STREAM_TIME, streamTimeInterval, this::streamTimePunctuate);
}

public void periodicBusiness(long streamTime){
    // guard against streamTime == -1, easy enough.
    // if you need system time instead, just use System.currentTimeMillis()

    // do something businessy here
}

Where Cancellable is either an interface containing just a single void
cancel() method or also boolean isCancelled() like here
<http://doc.akka.io/japi/akka/2.5.0/akka/actor/Cancellable.html>.
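
For concreteness, a minimal sketch of what such an interface could look like (the variant with isCancelled(); the names are illustrative only, not a committed API):

public interface Cancellable {
    /** Cancels this punctuation schedule so its callback is no longer invoked. */
    void cancel();

    /** @return true if cancel() has already been called on this schedule. */
    boolean isCancelled();
}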


Please make your opinions known on whether we should proceed in this
direction or leave "hybrid" considerations out of scope.

Looking forward to hearing your thoughts.

Thanks,
Michal

On 30/04/17 20:07, Michal Borowiecki wrote:

Hi Matthias,

I'd like to start moving the discarded ideas into Rejected
Alternatives section. Before I do, I want to tidy them up, ensure
they've each been given proper treatment.

To that end let me go back to one of your earlier comments about the
original suggestion (A) to put that to bed.


On 04/04/17 06:44, Matthias J. Sax wrote:

(A) You argue, that users can still "punctuate" on event-time via
process(), but I am not sure if this is possible. Note, that users only
get record timestamps via context.timestamp(). Thus, users would need to
track the time progress per partition (based on the partitions they
observe via context.partition()). (This alone puts a huge burden on the
user by itself.) However, us

Re: 0.11.0.0 Release Update

2017-06-02 Thread Michal Borowiecki

Hi all,

So will Exactly Once Semantics be reason enough to bump version to 1.0? 
Or is the leading zero here to stay indefinitely? :-)


Cheers,

Michal


On 05/05/17 04:28, Ismael Juma wrote:

Hi all,

We're quickly approaching our next time-based release. If you missed any of
the updates on the new time-based releases we'll be following, see
https://cwiki.apache.org/confluence/display/KAFKA/Time+Based+Release+Plan
for an explanation.

The release plan can be found in the usual location:
https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.11.0.0

Here are the important dates (also documented in the wiki):

- KIP Freeze: May 10, 2017 (a KIP must be accepted by this date in order
to be considered for this release)
- Feature Freeze: May 17, 2017 (major features merged & working on
stabilization, minor features have PR, release branch cut; anything not in
this state will be automatically moved to the next release in JIRA)
- Code Freeze: May 31, 2017 (first RC created now)
- Release: June 14, 2017

There are a couple of changes based on Ewen's feedback as release manager
for 0.10.2.0:

1. We now have a KIP freeze one week before the feature freeze to avoid
the risky and confusing situation where some KIPs are being discussed,
voted on and merged all in the same week.
2. All the dates were moved from Friday to Wednesday so that release
management doesn't spill over to the weekend.

KIPs: we have 24 adopted with 10 already committed and 10 with patches in
flight. The feature freeze is 12 days away so we have a lot of reviewing to
do, but significant changes have been merged already.

Open JIRAs: As usual, we have a lot!

*https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened%2C%20%22Patch%20Available%22)%20AND%20fixVersion%20%3D%200.11.0.0%20ORDER%20BY%20priority%20DESC
<https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened%2C%20%22Patch%20Available%22)%20AND%20fixVersion%20%3D%200.11.0.0%20ORDER%20BY%20priority%20DESC>*

146 at the moment. As we get nearer to the feature freeze, I will start
moving JIRAs out of this release.

* Closed JIRAs: So far ~191 closed tickets for 0.11.0.0:
https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20status%20in%20(Resolved%2C%20Closed)%20AND%20fixVersion%20%3D%200.11.0.0%20ORDER%20BY%20priority%20DESC

* Release features:
https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.11.0.0 has
a "Release Features" section that will be included with the release
notes/email for the release. I added some items to get it going. Please add
to
this list anything you think is worth noting.

I'll plan to give another update next week just before the KIP freeze.

Ismael







Re: [VOTE] KIP-164 Add unavailablePartitionCount and per-partition Unavailable metrics

2017-06-02 Thread Michal Borowiecki

+1 (non binding)

Thanks,
Michał

On 02/06/17 10:18, Mickael Maison wrote:

+1 (non binding)
Thanks for the KIP

On Thu, Jun 1, 2017 at 5:44 PM, Dong Lin <lindon...@gmail.com> wrote:

Hi all,

Can you please vote for KIP-164? The KIP can be found at
https://cwiki.apache.org/confluence/display/KAFKA/KIP-164-+Add+UnderMinIsrPartitionCount+and+per-partition+UnderMinIsr+metrics
.

Thanks,
Dong






Re: [DISCUSS] KIP-147: Add missing type parameters to StateStoreSupplier factories and KGroupedStream/Table methods

2017-06-02 Thread Michal Borowiecki

Thanks Matthias,

I appreciate people are busy now preparing the 0.11 release.

One thing I would also appreciate input on is perhaps a better name for 
the new TypedStores class; I just picked it quickly but don't really 
like it.


Perhaps StateStores would make for a better name?

Cheers,
Michal

On 02/06/17 07:18, Matthias J. Sax wrote:

Thanks for the update Michal.

I did skip over the PR. Looks good to me, as far as I can tell. Maybe
Damian, Xavier, or Ismael can comment on this. Would be good to get
confirmation that the change is backward compatible.


-Matthias


On 5/27/17 11:11 AM, Michal Borowiecki wrote:

Hi all,

I've updated the KIP to reflect the proposed backwards-compatible approach:

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=69408481


Given the vast area of APIs affected, I think the PR is easier to read
than the code excerpts in the KIP itself:
https://github.com/apache/kafka/pull/2992/files

Thanks,
Michał

On 07/05/17 10:16, Eno Thereska wrote:

I like this KIP in general and I agree it’s needed. Perhaps Damian can comment 
on the session store issue?

Thanks
Eno

On May 6, 2017, at 10:32 PM, Michal Borowiecki <michal.borowie...@openbet.com> 
wrote:

Hi Matthias,

Agreed. I tried your proposal and indeed it would work.

However, I think to maintain full backward compatibility we would also need to 
deprecate Stores.create() and leave it unchanged, while providing a new method 
that returns the more strongly typed Factories.

( This is because PersistentWindowFactory and PersistentSessionFactory cannot extend the existing 
PersistentKeyValueFactory interface, since their build() methods will be returning 
TypedStateStoreSupplier<WindowStore<K, V>> and TypedStateStoreSupplier<SessionStore<K, V>> 
respectively, which are NOT subclasses of TypedStateStoreSupplier<KeyValueStore<K, V>>. I do not see 
another way around it. Admittedly, my type covariance skills are rudimentary. Does anyone see a better way around 
this? )

Since create() takes only the store name as argument, and I don't see what we 
could overload it with, the new method would need to have a different name.

Alternatively, since create(String) is the only method in Stores, we could 
deprecate the entire class and provide a new one. That would be my preference. 
Any ideas what to call it?



All comments and suggestions appreciated.



Cheers,

Michał


On 04/05/17 21:48, Matthias J. Sax wrote:

I had a quick look into this.

With regard to backward compatibility, I think it would be required to
introduce a new type `TypedStateStoreSupplier` (that extends
`StateStoreSupplier`) and to overload all methods that take a
`StateStoreSupplier` so that they accept the new type instead of the current one.

This would allow `.build` to return a `TypedStateStoreSupplier` and
thus, would not break any code. At least if I did not miss anything with
regard to some magic of type inference using generics (I am not an
expert in this field).


-Matthias

On 5/4/17 11:32 AM, Matthias J. Sax wrote:

Did not have time to have a look. But backward compatibility is a must
from my point of view.

-Matthias


On 5/4/17 12:56 AM, Michal Borowiecki wrote:

Hello,

I've updated the KIP with missing information.

I would especially appreciate some comments on the compatibility aspects
of this as the proposed change is not fully backwards-compatible.

In the absence of comments I shall call for a vote in the next few days.

Thanks,

Michal


On 30/04/17 23:11, Michal Borowiecki wrote:

Hi community!

I have just drafted KIP-147: Add missing type parameters to
StateStoreSupplier factories and KGroupedStream/Table methods
<https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=69408481> 
<https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=69408481>

Please let me know if this is a step in the right direction.

All comments welcome.

Thanks,
Michal

Re: Jira-Spam on Dev-Mailinglist

2017-05-31 Thread Michal Borowiecki

+1 agree with Jeff,

Michał


On 31/05/17 06:25, Jeff Widman wrote:

I'm hugely in favor of this change as well...

Although I actually find the Github pull request emails less useful than
the jirabot ones since Jira typically has more info when I'm trying to
figure out if the issue is relevant to me or not...

On Tue, May 30, 2017 at 2:28 PM, Guozhang Wang <wangg...@gmail.com> wrote:


I actually do not know.. Maybe Jun knows better than me?


Guozhang

On Mon, May 29, 2017 at 12:58 PM, Gwen Shapira <g...@confluent.io> wrote:


I agree.

Guozhang, do you know how to implement the suggestion? JIRA to Apache
Infra? Or is this something we can do ourselves somehow?

On Mon, May 29, 2017 at 9:33 PM Guozhang Wang <wangg...@gmail.com> wrote:

I share your pains. Right now I use filters on my email accounts and it has
been down to about 25 per day.

I think setting up a separate mailing list for jirabot and jenkins
auto-generated emails is a good idea.


Guozhang


On Mon, May 29, 2017 at 12:58 AM, <marc.schle...@sdv-it.de> wrote:


Hello everyone

I find it hard to follow this mailing list due to all the mails generated
by Jira. Just over this weekend there are 240 new mails.
Would it be possible to set up something like j...@kafka.apache.org where
everyone can subscribe interested in those Jira mails?

Right now I am going to set up a filter which just deletes the jira-tagged
mails, but I think the current setup also makes it hard to read through
the archives.

regards
Marc




--
-- Guozhang




--
-- Guozhang







[jira] [Commented] (KAFKA-5155) Messages can be deleted prematurely when some producers use timestamps and some not

2017-05-27 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027521#comment-16027521
 ] 

Michal Borowiecki commented on KAFKA-5155:
--

Hi [~plavjanik], do you care to submit a pull request with the test and the fix?

> Messages can be deleted prematurely when some producers use timestamps and 
> some not
> ---
>
> Key: KAFKA-5155
> URL: https://issues.apache.org/jira/browse/KAFKA-5155
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.2.0
>Reporter: Petr Plavjaník
>
> Some messages can be deleted prematurely and never read in the following 
> scenario. A producer uses timestamps and produces messages that are appended 
> to the beginning of a log segment. Another producer produces messages without a 
> timestamp. In that case the largest timestamp is determined by the old messages 
> that carry a timestamp; the new messages without a timestamp do not influence it, so 
> the log segment with old and new messages can be deleted immediately after the 
> last new message with no timestamp is appended. When all appended messages 
> have no timestamp, then they are not deleted because the {{lastModified}} 
> attribute of the {{LogSegment}} is used.
> New test case to {{kafka.log.LogTest}} that fails:
> {code}
>   @Test
>   def 
> shouldNotDeleteTimeBasedSegmentsWhenTimestampIsNotProvidedForSomeMessages() {
> val retentionMs = 1000
> val old = TestUtils.singletonRecords("test".getBytes, timestamp = 0)
> val set = TestUtils.singletonRecords("test".getBytes, timestamp = -1, 
> magicValue = 0)
> val log = createLog(set.sizeInBytes, retentionMs = retentionMs)
> // append some messages to create some segments
> log.append(old)
> for (_ <- 0 until 12)
>   log.append(set)
> assertEquals("No segment should be deleted", 0, log.deleteOldSegments())
>   }
> {code}
> It can be prevented by using {{def largestTimestamp = 
> Math.max(maxTimestampSoFar, lastModified)}} in LogSegment, or by using 
> current timestamp when messages with timestamp {{-1}} are appended.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-147: Add missing type parameters to StateStoreSupplier factories and KGroupedStream/Table methods

2017-05-27 Thread Michal Borowiecki

Hi all,

I've updated the KIP to reflect the proposed backwards-compatible approach:

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=69408481


Given the vast area of APIs affected, I think the PR is easier to read 
than the code excerpts in the KIP itself:

https://github.com/apache/kafka/pull/2992/files

Thanks,
Michał

On 07/05/17 10:16, Eno Thereska wrote:

I like this KIP in general and I agree it’s needed. Perhaps Damian can comment 
on the session store issue?

Thanks
Eno

On May 6, 2017, at 10:32 PM, Michal Borowiecki <michal.borowie...@openbet.com> 
wrote:

Hi Matthias,

Agreed. I tried your proposal and indeed it would work.

However, I think to maintain full backward compatibility we would also need to 
deprecate Stores.create() and leave it unchanged, while providing a new method 
that returns the more strongly typed Factories.

( This is because PersistentWindowFactory and PersistentSessionFactory cannot extend the existing 
PersistentKeyValueFactory interface, since their build() methods will be returning 
TypedStateStoreSupplier<WindowStore<K, V>> and TypedStateStoreSupplier<SessionStore<K, V>> 
respectively, which are NOT subclasses of TypedStateStoreSupplier<KeyValueStore<K, V>>. I do not see 
another way around it. Admittedly, my type covariance skills are rudimentary. Does anyone see a better way around 
this? )

Since create() takes only the store name as argument, and I don't see what we 
could overload it with, the new method would need to have a different name.

Alternatively, since create(String) is the only method in Stores, we could 
deprecate the entire class and provide a new one. That would be my preference. 
Any ideas what to call it?



All comments and suggestions appreciated.



Cheers,

Michał


On 04/05/17 21:48, Matthias J. Sax wrote:

I had a quick look into this.

With regard to backward compatibility, I think it would be required to
introduce a new type `TypedStateStoreSupplier` (that extends
`StateStoreSupplier`) and to overload all methods that take a
`StateStoreSupplier` so that they accept the new type instead of the current one.

This would allow `.build` to return a `TypedStateStoreSupplier` and
thus, would not break any code. At least if I did not miss anything with
regard to some magic of type inference using generics (I am not an
expert in this field).


-Matthias

On 5/4/17 11:32 AM, Matthias J. Sax wrote:

Did not have time to have a look. But backward compatibility is a must
from my point of view.

-Matthias


On 5/4/17 12:56 AM, Michal Borowiecki wrote:

Hello,

I've updated the KIP with missing information.

I would especially appreciate some comments on the compatibility aspects
of this as the proposed change is not fully backwards-compatible.

In the absence of comments I shall call for a vote in the next few days.

Thanks,

Michal


On 30/04/17 23:11, Michal Borowiecki wrote:

Hi community!

I have just drafted KIP-147: Add missing type parameters to
StateStoreSupplier factories and KGroupedStream/Table methods
<https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=69408481> 
<https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=69408481>

Please let me know if this is a step in the right direction.

All comments welcome.

Thanks,
Michal

[jira] [Commented] (KAFKA-5319) Add a tool to make cluster replica and leader balance

2017-05-24 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16022695#comment-16022695
 ] 

Michal Borowiecki commented on KAFKA-5319:
--

[~markTC], shouldn't this be in "Patch Available" instead of "Resolved" status 
until the PR is merged?

> Add a tool to make cluster replica and leader balance
> -
>
> Key: KAFKA-5319
> URL: https://issues.apache.org/jira/browse/KAFKA-5319
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Affects Versions: 0.10.2.1
>Reporter: Ma Tianchi
> Attachments: ClusterBalanceCommand.scala
>
>
> When a new broker is added to the cluster, there are no topics on the new 
> broker. When we use the console command to create a topic without 
> 'replicaAssignmentOpt', Kafka uses AdminUtils.assignReplicasToBrokers to get the 
> replicaAssignment. Even though this is balanced at creation time if the 
> cluster never changes, as more and more brokers are added to the cluster the replica 
> balance degrades. We can also use 'kafka-reassign-partitions.sh' 
> to rebalance, but the large number of topics makes that not easy. And at topic 
> creation time, Kafka chooses a PreferredReplicaLeader which is put at the 
> first position of the AR to keep leaders balanced. But that balance will be 
> destroyed when the cluster changes. Using 'kafka-reassign-partitions.sh' to make a 
> partition reassignment may also destroy the leader balance, because the user 
> can change the AR of the partition. It may then not be balanced, but Kafka 
> believes cluster leader balance is fine as long as every leader is the first one in the 
> AR.
> So we created a tool to make the number of replicas and the number of leaders on 
> every broker balanced. It uses an algorithm to compute a balanced replica and 
> leader reassignment, then uses ReassignPartitionsCommand to actually rebalance the 
> cluster.
> It can be used to rebalance when brokers have been added or the cluster is not 
> balanced. It does not deal with moving replicas of a dead broker to an alive 
> broker; it only makes replicas and leaders on alive brokers balanced.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams

2017-05-23 Thread Michal Borowiecki

Hi Jeyhun,

I understand your argument about "Rich" in RichFunctions. Perhaps I'm 
just being too puritan here, but let me ask this anyway:


What is it that makes something a function? To me a function is 
something that takes zero or more arguments and possibly returns a value 
and while it may have side-effects (as opposed to "pure functions" which 
can't), it doesn't have any life-cycle of its own. This is what, in my 
mind, distinguishes the concept of a "function" from that of more 
vaguely defined concepts.


So if we add a life-cycle to a function, in that understanding, it 
doesn't become a rich function but instead stops being a function 
altogether.


You could say it's "just semantics" but to me precise use of language in 
the given context is an important foundation for good engineering. And 
in the context of programming "function" has a precise meaning. Of 
course we can say that in the context of Kafka Streams "function" has a 
different, looser meaning but I'd argue that won't do anyone any good.


On the other hand other frameworks such as Flink use this terminology, 
so it could be that consistency is the reason. I'm guessing that's why 
the name was proposed in the first place. My point is simply that it's a 
poor choice of wording and Kafka Streams don't have to follow that to 
the letter.


Cheers,

Michal


On 23/05/17 13:26, Jeyhun Karimov wrote:

Hi Michal,

Thanks for your comments.


To me at least it feels strange that something is called a
function yet doesn't follow the functional interface definition of
having just one abstract method. I suppose init and close could be
made default methods with empty bodies once Java 7 support is
dropped to mitigate that concern. Still, I feel some resistance to
consider something that requires initialisation and closing (which
implies holding state) as being a function. Sounds more like the
Processor/Transformer kind of thing semantically, rather than a
function. 



 -  If we called the interface just Function, your assumptions 
would hold. However, the keyword Rich by definition implies that we 
have a function (as you described, with one abstract method etc.) 
but it is rich. So, there are multiple methods in it.

Ideally it should be:

public interface RichFunction extends Function {  // this is the 
Function that you described

  void close();
  void init(Some params);
   ...
}


The KIP says there are multiple use-cases for this but doesn't
enumerate any - I think some examples would be useful, otherwise
that section sounds a little bit vague. 



I thought it is obvious by definition but I will update it. Thanks.


IMHO, the access to the RecordContext is where the added
value lies but maybe I'm just lacking in imagination, so I'm
asking all this to better understand the rationale for init() and
close().


Maybe I should add some examples. Thanks.
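
For instance, something along these lines - purely illustrative, with made-up names, just to show the kind of use-case that init()/close() enable:

import java.util.HashMap;
import java.util.Map;

// Everything below is purely illustrative of the "rich function" idea
// (one function-like method plus lifecycle hooks); it is not an agreed API.
interface RichValueMapper<V, VR> {
    void init();        // called once before any records are processed
    VR apply(V value);  // the single "function-like" method
    void close();       // called once when the task shuts down
}

// Example use-case: build an expensive lookup table once in init(),
// reuse it for every record, release it in close().
class CountryCodeMapper implements RichValueMapper<String, String> {

    private Map<String, String> countryByPrefix;

    @Override
    public void init() {
        countryByPrefix = new HashMap<>();
        countryByPrefix.put("44", "GB");
        countryByPrefix.put("48", "PL");
    }

    @Override
    public String apply(String phoneNumber) {
        return countryByPrefix.getOrDefault(phoneNumber.substring(0, 2), "unknown");
    }

    @Override
    public void close() {
        countryByPrefix = null; // nothing real to release in this toy example
    }
}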


Cheers,
Jeyhun

On Mon, May 22, 2017 at 11:02 AM, Michal Borowiecki 
<michal.borowie...@openbet.com <mailto:michal.borowie...@openbet.com>> 
wrote:


Hi Jeyhun,

I'd like to understand better the premise of RichFunctions and why
init(Some params), close() are said to be needed.

To me at least it feels strange that something is called a
function yet doesn't follow the functional interface definition of
having just one abstract method. I suppose init and close could be
made default methods with empty bodies once Java 7 support is
dropped to mitigate that concern. Still, I feel some resistance to
consider something that requires initialisation and closing (which
implies holding state) as being a function. Sounds more like the
Processor/Transformer kind of thing semantically, rather than a
function.

The KIP says there are multiple use-cases for this but doesn't
enumerate any - I think some examples would be useful, otherwise
that section sounds a little bit vague.

IMHO, the access to the RecordContext is where the added
value lies but maybe I'm just lacking in imagination, so I'm
asking all this to better understand the rationale for init() and
close().

Thanks,
Michał

On 20/05/17 17:05, Jeyhun Karimov wrote:

Dear community,

As we discussed in KIP-149 [DISCUSS] thread [1], I would like to initiate
KIP for rich functions (interfaces) [2].
I would like to get your comments.


[1]

http://search-hadoop.com/m/Kafka/uyzND1PMjdk2CslH12?subj=Re+DISCUSS+KIP+149+Enabling+key+access+in+ValueTransformer+ValueMapper+and+ValueJoiner

<http://search-hadoop.com/m/Kafka/uyzND1PMjdk2CslH12?subj=Re+DISCUSS+KIP+149+Enabling+key+access+in+ValueTransformer+ValueMapper+and+ValueJoiner>
[2]

https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams

<https://cwiki.apache.org/conflu

[jira] [Commented] (KAFKA-5243) Request to add row limit in ReadOnlyKeyValueStore range function

2017-05-23 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16020911#comment-16020911
 ] 

Michal Borowiecki commented on KAFKA-5243:
--

Just a note, replacing the second argument is not an option IMO, as it would 
clash with the current range(K from, K to) method. K is a type parameter that 
itself could be an int, making the two indistinguishable, I think.

Secondly, the existing range() and all() methods expressly do not guarantee 
ordering of the returned iterator. I think the new range(from, to, limit) 
method would only make sense if order in the returned iterator is consistent 
across invocations. This is probably not a problem for the built-in stores, but 
given these stores are meant to be pluggable, perhaps it would be better to not 
force other stores implementations to take on those guarantees? Instead a new 
interface with stronger guarantees could be added e.g. 
ReadOnlyOrderedKeyValueStore extending ReadOnlyKeyValueStore and adding this 
extra method. It could also add the consistent ordering promise on the 
inherited range(from, to) and all() methods. Just a thought.
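
For illustration only, such an interface could look roughly like this (names and placement are not a concrete proposal):

{code:java}
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// Illustrative sketch only, not a concrete proposal: a store interface that
// additionally guarantees a consistent key ordering across invocations.
public interface ReadOnlyOrderedKeyValueStore<K, V> extends ReadOnlyKeyValueStore<K, V> {

    // Like range(from, to), but returns at most 'limit' entries,
    // in an ordering that is consistent across invocations.
    KeyValueIterator<K, V> range(K from, K to, int limit);
}
{code}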

Probably best to raise a KIP and discuss on the mailing list. Since this is a 
public API change a KIP is required anyway:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals 

> Request to add row limit in ReadOnlyKeyValueStore range function
> 
>
> Key: KAFKA-5243
> URL: https://issues.apache.org/jira/browse/KAFKA-5243
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.1.1
>Reporter: Joe Wood
>
> When using distributed queries across a cluster of stream stores it's quite 
> common to use query pagination to limit the number of rows returned. The 
> {{range}} function on {{ReadOnlyKeyValueStore}} only accepts the {{to}} and 
> {{from}} keys. This means that the query created either unnecessarily 
> retrieves the entire range and manually limits the rows, or estimates the 
> range based on the key values. Neither option is ideal for processing 
> distributed queries.
> This suggestion is to add an overload to the {{range}} function by adding a 
> third (or replacement second) argument as a suggested row limit count. This 
> means that the range of keys returned will not exceed the supplied count.
> {code:java}
> // Get an iterator over a given range of keys, limiting to limit elements.
> KeyValueIterator<K,V> range(K from, K to, int limit)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5233) Changes to punctuate semantics (KIP-138)

2017-05-23 Thread Michal Borowiecki (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michal Borowiecki updated KAFKA-5233:
-
Fix Version/s: (was: 0.11.0.0)
   0.11.1.0

> Changes to punctuate semantics (KIP-138)
> 
>
> Key: KAFKA-5233
> URL: https://issues.apache.org/jira/browse/KAFKA-5233
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>    Reporter: Michal Borowiecki
>    Assignee: Michal Borowiecki
>  Labels: kip
> Fix For: 0.11.1.0
>
>
> This ticket is to track implementation of 
> [KIP-138: Change punctuate 
> semantics|https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics]



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams

2017-05-22 Thread Michal Borowiecki

Hi Jeyhun,

I'd like to understand better the premise of RichFunctions and why 
init(Some params), close() are said to be needed.


To me at least it feels strange that something is called a function yet 
doesn't follow the functional interface definition of having just one 
abstract method. I suppose init and close could be made default methods 
with empty bodies once Java 7 support is dropped to mitigate that 
concern. Still, I feel some resistance to consider something that 
requires initialisation and closing (which implies holding state) as 
being a function. Sounds more like the Processor/Transformer kind of 
thing semantically, rather than a function.


The KIP says there are multiple use-cases for this but doesn't enumerate 
any - I think some examples would be useful, otherwise that section 
sounds a little bit vague.


IMHO, the access to the RecordContext is where the added value lies 
but maybe I'm just lacking in imagination, so I'm asking all this to 
better understand the rationale for init() and close().


Thanks,
Michał

On 20/05/17 17:05, Jeyhun Karimov wrote:

Dear community,

As we discussed in KIP-149 [DISCUSS] thread [1], I would like to initiate
KIP for rich functions (interfaces) [2].
I would like to get your comments.


[1]
http://search-hadoop.com/m/Kafka/uyzND1PMjdk2CslH12?subj=Re+DISCUSS+KIP+149+Enabling+key+access+in+ValueTransformer+ValueMapper+and+ValueJoiner
[2]
https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams


Cheers,
Jeyhun






[jira] [Updated] (KAFKA-5233) Changes to punctuate semantics (KIP-138)

2017-05-15 Thread Michal Borowiecki (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michal Borowiecki updated KAFKA-5233:
-
Status: Patch Available  (was: In Progress)

Pull request here:
https://github.com/apache/kafka/pull/3055
Looking forward to feedback.

> Changes to punctuate semantics (KIP-138)
> 
>
> Key: KAFKA-5233
> URL: https://issues.apache.org/jira/browse/KAFKA-5233
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>    Reporter: Michal Borowiecki
>    Assignee: Michal Borowiecki
>  Labels: kip
> Fix For: 0.11.0.0
>
>
> This ticket is to track implementation of 
> [KIP-138: Change punctuate 
> semantics|https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics]



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Work started] (KAFKA-5233) Changes to punctuate semantics (KIP-138)

2017-05-15 Thread Michal Borowiecki (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-5233 started by Michal Borowiecki.

> Changes to punctuate semantics (KIP-138)
> 
>
> Key: KAFKA-5233
> URL: https://issues.apache.org/jira/browse/KAFKA-5233
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>    Reporter: Michal Borowiecki
>    Assignee: Michal Borowiecki
>  Labels: kip
> Fix For: 0.11.0.0
>
>
> This ticket is to track implementation of 
> [KIP-138: Change punctuate 
> semantics|https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics]



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-3455) Connect custom processors with the streams DSL

2017-05-15 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010530#comment-16010530
 ] 

Michal Borowiecki commented on KAFKA-3455:
--

Hi [~bobbycalderwood],

Can you please describe your use-case, where it would be useful to re-use 
Processor/Transformer implementations?

As to Transformer.punctuate return value having to be null, the javadoc was in 
error but has been fixed on trunk (to be released).

Changing the method signature of Transformer.punctuate would be a 
backward-incompatible change, however, 
[KIP-138|https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics]
 will deprecate both methods in favour of a functional interface passed to 
ProcessorContext.schedule(), so it's a small step in the direction you're 
suggesting.

I think AutoCloseable is a false friend in this case. The intention behind 
AutoCloseable is for objects created in a try-with-resources statement to be 
closed when execution exists that statement. However, the Processor is being 
created when you are *defining* the topology and must not be closed from that 
same block of code, since it's used as long as the topology is actually 
*running*, which is happening in different threads.

As to init() and close() I think it would make sense to have them pulled out, 
however, again due to backwards-compatibility it's not as simple as it sounds.
Fortunately, once Java 7 compatibility is dropped, it will be possible to 
change their definition to a default method with an empty body. I think that 
would be backwards-compatible. That would leave only one abstract method for 
Processor and Transformer, process() and transform(), respectively. Since these 
are actually *different* from each other, I'd say that then there'd be no 
repetition.
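
For illustration, the shape could end up roughly like this (a sketch of a possible future form, not the current interface; the to-be-deprecated punctuate() is omitted for brevity):

{code:java}
import org.apache.kafka.streams.processor.ProcessorContext;

// Hypothetical future shape once Java 7 support is dropped: the lifecycle
// methods become no-op defaults, leaving process() as the only abstract method.
public interface Processor<K, V> {

    default void init(ProcessorContext context) { }

    void process(K key, V value);

    default void close() { }
}
{code}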

Would that help your use-cases?

> Connect custom processors with the streams DSL
> --
>
> Key: KAFKA-3455
> URL: https://issues.apache.org/jira/browse/KAFKA-3455
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Jonathan Bender
>  Labels: user-experience
> Fix For: 0.11.0.0
>
>
> From the kafka users email thread, we discussed the idea of connecting custom 
> processors with topologies defined from the Streams DSL (and being able to 
> sink data from the processor).  Possibly this could involve exposing the 
> underlying processor's name in the streams DSL so it can be connected with 
> the standard processor API.
> {quote}
> Thanks for the feedback. This is definitely something we wanted to support
> in the Streams DSL.
> One tricky thing, though, is that some operations do not translate to a
> single processor, but a sub-graph of processors (think of a stream-stream
> join, which is translated to actually 5 processors for windowing / state
> queries / merging, each with a different internal name). So how to define
> the API to return the processor name needs some more thinking.
> {quote}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5233) Changes to punctuate semantics (KIP-138)

2017-05-13 Thread Michal Borowiecki (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michal Borowiecki updated KAFKA-5233:
-
Labels: kip  (was: )

> Changes to punctuate semantics (KIP-138)
> 
>
> Key: KAFKA-5233
> URL: https://issues.apache.org/jira/browse/KAFKA-5233
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>    Reporter: Michal Borowiecki
>    Assignee: Michal Borowiecki
>  Labels: kip
> Fix For: 0.11.0.0
>
>
> This ticket is to track implementation of 
> [KIP-138: Change punctuate 
> semantics|https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics]



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [VOTE] KIP-138: Change punctuate semantics

2017-05-13 Thread Michal Borowiecki

Thank you all!

This KIP passed the vote with 3 binding and 5 non-binding +1s:

+1 (binding) from Guozhang Wang, Ismael Juma and Ewen Cheslack-Postava

+1 (non-binding) from Matthias J. Sax, Bill Bejeck, Eno Thereska, Arun 
Mathew and Thomas Becker



Created KAFKA-5233 <https://issues.apache.org/jira/browse/KAFKA-5233> 
to track implementation.


It's been a fantastic experience for me working with this great 
community to produce a KIP for the first time.

Big thank you to everyone who contributed!

Cheers,
Michał

On 12/05/17 02:01, Ismael Juma wrote:

Michal, you have enough votes, would you like to close the vote?

Ismael

On Thu, May 11, 2017 at 4:49 PM, Ewen Cheslack-Postava <e...@confluent.io>
wrote:


+1 (binding)

-Ewen

On Thu, May 11, 2017 at 7:12 AM, Ismael Juma <ism...@juma.me.uk> wrote:


Thanks for the KIP, Michal. +1(binding) from me.

Ismael

On Sat, May 6, 2017 at 6:18 PM, Michal Borowiecki <
michal.borowie...@openbet.com> wrote:


Hi all,

Given I'm not seeing any contentious issues remaining on the discussion
thread, I'd like to initiate the vote for:

KIP-138: Change punctuate semantics

https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%
3A+Change+punctuate+semantics


Thanks,
Michał









[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-05-13 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009365#comment-16009365
 ] 

Michal Borowiecki commented on KAFKA-3514:
--

I've created KAFKA-5233 to track work related to KIP-138. As noted above, the 
considerations on this ticket span beyond the scope of KIP-138, which is 
agnostic to how the stream time gets advanced.

> Stream timestamp computation needs some further thoughts
> 
>
> Key: KAFKA-3514
> URL: https://issues.apache.org/jira/browse/KAFKA-3514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Eno Thereska
>  Labels: architecture
> Fix For: 0.11.0.0
>
>
> Our current stream task's timestamp is used for punctuate function as well as 
> selecting which stream to process next (i.e. best effort stream 
> synchronization). And it is defined as the smallest timestamp over all 
> partitions in the task's partition group. This results in two unintuitive 
> corner cases:
> 1) observing a late-arrived record would keep that stream's timestamp low for 
> a period of time, and hence that stream keeps being processed until that late record. For 
> example take two partitions within the same task annotated by their 
> timestamps:
> {code}
> Stream A: 5, 6, 7, 8, 9, 1, 10
> {code}
> {code}
> Stream B: 2, 3, 4, 5
> {code}
> The late arrived record with timestamp "1" will cause stream A to be selected 
> continuously in the thread loop, i.e. messages with timestamp 5, 6, 7, 8, 9 
> until the record itself is dequeued and processed, then stream B will be 
> selected starting with timestamp 2.
> 2) an empty buffered partition will cause its timestamp to be not advanced, 
> and hence the task timestamp as well since it is the smallest among all 
> partitions. This may not be a severe problem compared with 1) above though.
> *Update*
> There is one more thing to consider (full discussion found here: 
> http://search-hadoop.com/m/Kafka/uyzND1iKZJN1yz0E5?subj=Order+of+punctuate+and+process+in+a+stream+processor)
> {quote}
> Let's assume the following case.
> - a stream processor that uses the Processor API
> - context.schedule(1000) is called in the init()
> - the processor reads only one topic that has one partition
> - using custom timestamp extractor, but that timestamp is just a wall 
> clock time
> Image the following events:
> 1., for 10 seconds I send in 5 messages / second
> 2., does not send any messages for 3 seconds
> 3., starts the 5 messages / second again
> I see that punctuate() is not called during the 3 seconds when I do not 
> send any messages. This is ok according to the documentation, because 
> there is not any new messages to trigger the punctuate() call. When the 
> first few messages arrives after a restart the sending (point 3. above) I 
> see the following sequence of method calls:
> 1., process() on the 1st message
> 2., punctuate() is called 3 times
> 3., process() on the 2nd message
> 4., process() on each following message
> What I would expect instead is that punctuate() is called first and then 
> process() is called on the messages, because the first message's timestamp 
> is already 3 seconds older then the last punctuate() was called, so the 
> first message belongs after the 3 punctuate() calls.
> {quote}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-5233) Changes to punctuate semantics (KIP-138)

2017-05-13 Thread Michal Borowiecki (JIRA)
Michal Borowiecki created KAFKA-5233:


 Summary: Changes to punctuate semantics (KIP-138)
 Key: KAFKA-5233
 URL: https://issues.apache.org/jira/browse/KAFKA-5233
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Michal Borowiecki
Assignee: Michal Borowiecki
 Fix For: 0.11.0.0


This ticket is to track implementation of 
[KIP-138: Change punctuate 
semantics|https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics]



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [VOTE] KIP-155 Add range scan for windowed state stores

2017-05-11 Thread Michal Borowiecki

Also, wrt


In the case of the window store, the "key" of the single-key iterator is
the actual timestamp of the underlying entry, not just range of the 
window,

so if we were to wrap the result key a window we wouldn't be getting back
the equivalent of the single key iterator. 
I believe the timestamp in the entry *is* the window start time (the end 
time is implicitly known by adding the window size to the window start time)


https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java#L109 



https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java#L111

Both use window.start() as the timestamp when storing into the WindowStore.

Or am I confusing something horribly here? Hope not ;-)


If the above is correct, then using KeyValueIterator<Windowed<K>, V> as 
the return type of the new fetch method would indeed not lose anything 
that the single-key iterator offers.


The window end time could simply be calculated as window start time + 
window size (the window size would have to be passed from the store supplier 
to the store implementation, which I don't think it is at present, but that's an 
implementation detail).


If you object to exposing the window end time because the single-key 
iterator doesn't do that, then an alternative could also be to have 
the return type of the new fetch be something like 
KeyValueIterator<Tuple2<K, Long>, V>, since the key is composed of the 
actual key and the timestamp together. peekNextKey() would then allow 
you to glimpse both the actual key and the associated window start time. 
This feels like a better workaround than putting the KeyValue pair in 
the V of the WindowStoreIterator.


All in all, I'd still prefer KeyValueIterator<Windowed<K>, V> as it more 
clearly names what's what.
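
To make this concrete, here is a minimal sketch (the interface name below is 
made up purely for illustration; only Windowed and KeyValueIterator are the 
real Kafka Streams types) of the return type being discussed:

import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.KeyValueIterator;

// Hypothetical interface, not the actual KIP-155 API; it only illustrates the
// KeyValueIterator<Windowed<K>, V> return type discussed above.
interface WindowedRangeFetch<K, V> {

    // Each result key carries the window, so peekNextKey() exposes both the
    // record key and the window boundaries without advancing the iterator.
    // If the store only persists the window start, the end is start + window size.
    KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
}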


What do you think?

Thanks,

Michal

On 11/05/17 07:51, Michal Borowiecki wrote:
Well, another concern, apart from potential confusion, is that you 
won't be able to peek the actual next key, just the timestamp. So the 
tradeoff is between having consistency in return types versus 
consistency in having the ability to know the next key without moving 
the iterator. To me the latter just feels more important.


Thanks,
Michal
On 11 May 2017 12:46 a.m., Xavier Léauté <xav...@confluent.io> wrote:

Thank you for the feedback Michal.

While I agree the return may be a little bit more confusing to reason
about, the reason for doing so was to keep the range query interfaces
consistent with their single-key counterparts.

In the case of the window store, the "key" of the single-key
iterator is
the actual timestamp of the underlying entry, not just range of
the window,
so if we were to wrap the result key a window we wouldn't be
getting back
the equivalent of the single key iterator.

In both cases peekNextKey is just returning the timestamp of the
next entry
in the window store that matches the query.

In the case of the session store, we already return Windowed
for the
single-key method, so it made sense there to also return
Windowed for
the range method.

Hope this make sense? Let me know if you still have concerns about
this.

Thank you,
Xavier

On Wed, May 10, 2017 at 12:25 PM Michal Borowiecki <
michal.borowie...@openbet.com> wrote:

> Apologies, I missed the discussion (or lack thereof) about the
return
> type of:
>
> WindowStoreIterator<KeyValue<K, V>> fetch(K from, K to, long
timeFrom,
> long timeTo)
>
>
> WindowStoreIterator (as the KIP mentions) is a subclass of
> KeyValueIterator<Long, V>
>
> KeyValueIterator<K,V> has the following method:
>
> /** * Peek at the next key without advancing the iterator *
@return the
> key of the next value that would be returned from the next call
to next
> */ K peekNextKey();
>
> Given the type in this case will be Long, I assume what it would
return
> is the window timestamp of the next found record?
>
>
> In the case of WindowStoreIterator fetch(K key, long
timeFrom, long
> timeTo);
> all records found by fetch have the same key, so it's harmless
to return
> the timestamp of the next found window but here we have varying
keys and
> varying windows, so won't it be too confusing?
>
> KeyValueIterator<Windowed, V> (as in the proposed
> ReadOnlySessionStore.fetch) just feels much more intuitive.
>
> Apologies again for jumping onto this only once the voting has
already
> begun.
> Thanks,
> Michał
>
> On 10/05/17 20:08, Sriram Subramanian wrote:

Re: [VOTE] KIP-155 Add range scan for windowed state stores

2017-05-11 Thread Michal Borowiecki
Well, another concern, apart from potential confusion, is that you won't be 
able to peek the actual next key, just the timestamp. So the tradeoff is 
between having consistency in return types versus consistency in having the 
ability to know the next key without moving the iterator. To me the latter just 
feels more important.
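
To sketch that tradeoff (the helper class below is purely illustrative; the 
iterator types and peekNextKey() are the existing Kafka Streams ones):

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStoreIterator;

// Purely illustrative helpers contrasting the two designs being discussed.
final class PeekExamples {

    // With the KIP's proposed WindowStoreIterator<KeyValue<K, V>>, peeking
    // only ever yields the timestamp (the Long key of the iterator).
    static <K, V> Long peekTimestampOnly(WindowStoreIterator<KeyValue<K, V>> it) {
        return it.peekNextKey();
    }

    // With KeyValueIterator<Windowed<K>, V>, peeking yields the record key and
    // the window, still without advancing the iterator.
    static <K, V> K peekActualKey(KeyValueIterator<Windowed<K>, V> it) {
        return it.peekNextKey().key();
    }
}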


Thanks, 

Michal 

On 11 May 2017 12:46 a.m., Xavier Léauté <xav...@confluent.io> wrote:

Thank you for the feedback Michal.

While I agree the return may be a little bit more confusing to reason
about, the reason for doing so was to keep the range query interfaces
consistent with their single-key counterparts.

In the case of the window store, the "key" of the single-key iterator is
the actual timestamp of the underlying entry, not just range of the window,
so if we were to wrap the result key a window we wouldn't be getting back
the equivalent of the single key iterator.

In both cases peekNextKey is just returning the timestamp of the next entry
in the window store that matches the query.

In the case of the session store, we already return Windowed for the
single-key method, so it made sense there to also return Windowed for
the range method.

Hope this make sense? Let me know if you still have concerns about this.

Thank you,
Xavier

On Wed, May 10, 2017 at 12:25 PM Michal Borowiecki <
michal.borowie...@openbet.com> wrote:

> Apologies, I missed the discussion (or lack thereof) about the return
> type of:
>
> WindowStoreIterator<KeyValue<K, V>> fetch(K from, K to, long timeFrom,
> long timeTo)
>
>
> WindowStoreIterator (as the KIP mentions) is a subclass of
> KeyValueIterator<Long, V>
>
> KeyValueIterator<K,V> has the following method:
>
> /** * Peek at the next key without advancing the iterator * @return the
> key of the next value that would be returned from the next call to next
> */ K peekNextKey();
>
> Given the type in this case will be Long, I assume what it would return
> is the window timestamp of the next found record?
>
>
> In the case of WindowStoreIterator fetch(K key, long timeFrom, long
> timeTo);
> all records found by fetch have the same key, so it's harmless to return
> the timestamp of the next found window but here we have varying keys and
> varying windows, so won't it be too confusing?
>
> KeyValueIterator<Windowed, V> (as in the proposed
> ReadOnlySessionStore.fetch) just feels much more intuitive.
>
> Apologies again for jumping onto this only once the voting has already
> begun.
> Thanks,
> Michał
>
> On 10/05/17 20:08, Sriram Subramanian wrote:
> > +1
> >
> > On Wed, May 10, 2017 at 11:42 AM, Bill Bejeck <bbej...@gmail.com> wrote:
> >
> >> +1
> >>
> >> Thanks,
> >> Bill
> >>
> >> On Wed, May 10, 2017 at 2:38 PM, Guozhang Wang <wangg...@gmail.com>
> wrote:
> >>
> >>> +1. Thank you!
> >>>
> >>> On Wed, May 10, 2017 at 11:30 AM, Xavier Léauté <xav...@confluent.io>
> >>> wrote:
> >>>
> >>>> Hi everyone,
> >>>>
> >>>> Since there aren't any objections to this addition, I would like to
> >> start
> >>>> the voting on KIP-155 so we can hopefully get this into 0.11.
> >>>>
> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP+
> >>>> 155+-+Add+range+scan+for+windowed+state+stores
> >>>>
> >>>> Voting will stay active for at least 72 hours.
> >>>>
> >>>> Thank you,
> >>>> Xavier
> >>>>
> >>>
> >>>
> >>> --
> >>> -- Guozhang
> >>>
>
>



Re: [VOTE] KIP-155 Add range scan for windowed state stores

2017-05-10 Thread Michal Borowiecki
Apologies, I missed the discussion (or lack thereof) about the return 
type of:


WindowStoreIterator<KeyValue<K, V>> fetch(K from, K to, long timeFrom, 
long timeTo)



WindowStoreIterator (as the KIP mentions) is a subclass of 
KeyValueIterator<Long, V>


KeyValueIterator<K,V> has the following method:

/**
 * Peek at the next key without advancing the iterator
 * @return the key of the next value that would be returned from the next call to next
 */
K peekNextKey();


Given the type in this case will be Long, I assume what it would return 
is the window timestamp of the next found record?



In the case of WindowStoreIterator fetch(K key, long timeFrom, long 
timeTo);
all records found by fetch have the same key, so it's harmless to return 
the timestamp of the next found window but here we have varying keys and 
varying windows, so won't it be too confusing?


KeyValueIterator<Windowed<K>, V> (as in the proposed 
ReadOnlySessionStore.fetch) just feels much more intuitive.


Apologies again for jumping onto this only once the voting has already 
begun.

Thanks,
Michał

On 10/05/17 20:08, Sriram Subramanian wrote:

+1

On Wed, May 10, 2017 at 11:42 AM, Bill Bejeck  wrote:


+1

Thanks,
Bill

On Wed, May 10, 2017 at 2:38 PM, Guozhang Wang  wrote:


+1. Thank you!

On Wed, May 10, 2017 at 11:30 AM, Xavier Léauté 
wrote:


Hi everyone,

Since there aren't any objections to this addition, I would like to

start

the voting on KIP-155 so we can hopefully get this into 0.11.

https://cwiki.apache.org/confluence/display/KAFKA/KIP+
155+-+Add+range+scan+for+windowed+state+stores

Voting will stay active for at least 72 hours.

Thank you,
Xavier




--
-- Guozhang





Re: [VOTE] KIP-138: Change punctuate semantics

2017-05-10 Thread Michal Borowiecki

Hi all,

This vote thread has gone quiet.

In view of the looming cut-off for 0.11.0.0 I'd like to encourage anyone 
who cares about this to have a look and vote and/or comment on this 
proposal.


Thanks,

Michał


On 07/05/17 10:16, Eno Thereska wrote:

+1 (non binding)

Thanks
Eno

On May 6, 2017, at 11:01 PM, Bill Bejeck <bbej...@gmail.com> wrote:

+1

Thanks,
Bill

On Sat, May 6, 2017 at 5:58 PM, Matthias J. Sax <matth...@confluent.io>
wrote:


+1

Thanks a lot for this KIP!

-Matthias

On 5/6/17 10:18 AM, Michal Borowiecki wrote:

Hi all,

Given I'm not seeing any contentious issues remaining on the discussion
thread, I'd like to initiate the vote for:

KIP-138: Change punctuate semantics

https://cwiki.apache.org/confluence/display/KAFKA/KIP-

138%3A+Change+punctuate+semantics


Thanks,
Michał









Re: [DISCUSS]: KIP-149: Enabling key access in ValueTransformer, ValueMapper, and ValueJoiner

2017-05-09 Thread Michal Borowiecki

+1 :)


On 08/05/17 23:52, Matthias J. Sax wrote:

Hi,

I was reading the updated KIP and I am wondering, if we should do the
design a little different.

Instead of distinguishing between a RichFunction and non-RichFunction at
runtime level, we would use RichFunctions all the time. Thus, on the DSL
entry level, if a user provides a non-RichFunction, we wrap it by a
RichFunction that is fully implemented by Streams. This RichFunction
would just forward the call omitting the key:

Just to sketch the idea (incomplete code snippet):


public class StreamsRichValueMapper<K, V1, V2, VR> implements RichValueMapper<K, V1, V2, VR> {
   private ValueMapper userProvidedMapper; // set by constructor

   public VR apply(final K key, final V1 value1, final V2 value2) {
       return userProvidedMapper.apply(value1, value2);
   }
}


 From a performance point of view, I am not sure if the
"if(isRichFunction)" including casts etc would have more overhead than
this approach (we would do more nested method call for non-RichFunction
which should be more common than RichFunctions).

This approach should not affect lambdas (or do I miss something?) and
might be cleaner, as we could have one more top level interface
`RichFunction` with methods `init()` and `close()` and also interfaces
for `RichValueMapper` etc. (thus, no abstract classes required).


Any thoughts?


-Matthias


On 5/6/17 5:29 PM, Jeyhun Karimov wrote:

Hi,

Thanks for comments. I extended PR and KIP to include rich functions. I
will still have to evaluate the cost of deep copying of keys.

Cheers,
Jeyhun

On Fri, May 5, 2017 at 8:02 PM Mathieu Fenniak 
wrote:


Hey Matthias,

My opinion would be that documenting the immutability of the key is the
best approach available.  Better than requiring the key to be serializable
(as with Jeyhun's last pass at the PR), no performance risk.

It'd be different if Java had immutable type constraints of some kind. :-)

Mathieu


On Fri, May 5, 2017 at 11:31 AM, Matthias J. Sax 
wrote:


Agreed about RichFunction. If we follow this path, it should cover
all(?) DSL interfaces.

About guarding the key -- I am still not sure what to do about it...
Maybe it might be enough to document it (and name the key parameter like
`readOnlyKey` to make is very clear). Ultimately, I would prefer to
guard against any modification, but I have no good idea how to do this.
Not sure what others think about the risk of corrupted partitioning
(what would be a user error and we could say, well, bad luck, you got a
bug in your code, that's not our fault), vs deep copy with a potential
performance hit (that we can't quantity atm without any performance

test).

We do have a performance system test. Maybe it's worth for you to apply
the deep copy strategy and run the test. It's very basic performance
test only, but might give some insight. If you want to do this, look
into folder "tests" for general test setup, and into
"tests/kafaktests/benchmarks/streams" to find find the perf test.


-Matthias

On 5/5/17 8:55 AM, Jeyhun Karimov wrote:

Hi Matthias,

I think extending KIP to include RichFunctions totally  makes sense.

So, we don't want to guard the keys because it is costly.
If we introduce RichFunctions I think it should not be limited only to the 3
interfaces proposed in the KIP but to a wide range of interfaces.
Please correct me if I am wrong.

Cheers,
Jeyhun

On Fri, May 5, 2017 at 12:04 AM Matthias J. Sax 

wrote:


Hi Jeyhun,

This approach would change ValueMapper (...etc) to be classes, rather than
interfaces, which is also a backwards incompatible change.  An alternative
approach that would be backwards 

[jira] [Commented] (KAFKA-5201) Compacted topic could be misused to fill up a disk but deletion policy can't retain legitimate keys

2017-05-09 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16002311#comment-16002311
 ] 

Michal Borowiecki commented on KAFKA-5201:
--

Hi Edoardo,
>From the broker's perspective there is no concept of legitimate vs 
>illegitimate keys AFAIK. What keys are legitimate depends on use-case, so 
>should the broker really care about that?
If this is to prevent disk being filled up by producers you don't trust to 
produce legitimate keys, perhaps quota + compact,delete cleanup can guard 
against that? What do you think about that?

> Compacted topic could be misused to fill up a disk but deletion policy can't 
> retain legitimate keys 
> 
>
> Key: KAFKA-5201
> URL: https://issues.apache.org/jira/browse/KAFKA-5201
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Edoardo Comar
>
> Misuse of a topic with cleanup policy = compact
> could lead to a disk being filled if a misbehaving producer keeps producing 
> messages with unique keys.
> The mixed cleanup policy compact,delete could be adopted, but would not 
> guarantee that the latest "legitimate" keys will be kept.
> It would be desirable to have a cleanup policy that attempts to preserve 
> messages with 'legitimate' keys
> This issue needs a KIP but I have no proposed solution yet at the time of 
> writing.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-155 Add range scan for windowed state stores

2017-05-08 Thread Michal Borowiecki

Hi Xavier,

I like your KIP. Do you think the same should apply to session stores?

IMHO, all three should be on par wrt the ability to query key ranges 
(although I understand the implementation concerns for windowed stores 
are more involved than for normal key value stores).


Thanks,

Michal


On 08/05/17 08:18, Xavier Léauté wrote:

Hi everyone,

I am proposing to add the ability to range scan windowed state stores in
Kafka Streams. The required interface changes are relatively minimal and
follow our existing conventions for state stores. Let me know if that
sounds reasonable.

https://cwiki.apache.org/confluence/display/KAFKA/KIP+155+-+Add+range+scan+for+windowed+state+stores

https://issues.apache.org/jira/browse/KAFKA-5192

Thank you,
Xavier







Re: [DISCUSS] KIP-147: Add missing type parameters to StateStoreSupplier factories and KGroupedStream/Table methods

2017-05-07 Thread Michal Borowiecki

To visualise this better, I've created a WIP PR:

https://github.com/apache/kafka/pull/2992

tentatively having named the new Stores class TypedStores.

Thanks,
Michał

On 07/05/17 10:16, Eno Thereska wrote:

I like this KIP in general and I agree it’s needed. Perhaps Damian can comment 
on the session store issue?

Thanks
Eno

On May 6, 2017, at 10:32 PM, Michal Borowiecki <michal.borowie...@openbet.com> 
wrote:

Hi Matthias,

Agreed. I tried your proposal and indeed it would work.

However, I think to maintain full backward compatibility we would also need to 
deprecate Stores.create() and leave it unchanged, while providing a new method 
that returns the more strongly typed Factories.

( This is because PersistentWindowFactory and PersistentSessionFactory cannot extend the existing 
PersistentKeyValueFactory interface, since their build() methods will be returning 
TypedStateStoreSupplier<WindowStore<K, V>> and TypedStateStoreSupplier<SessionStore<K, V>> 
respectively, which are NOT subclasses of TypedStateStoreSupplier<KeyValueStore<K, V>>. I do not see 
another way around it. Admittedly, my type covariance skills are rudimentary. Does anyone see a better way around 
this? )

Since create() takes only the store name as argument, and I don't see what we 
could overload it with, the new method would need to have a different name.

Alternatively, since create(String) is the only method in Stores, we could 
deprecate the entire class and provide a new one. That would be my preference. 
Any ideas what to call it?



All comments and suggestions appreciated.



Cheers,

Michał


On 04/05/17 21:48, Matthias J. Sax wrote:

I had a quick look into this.

With regard to backward compatibility, I think it would be required do
introduce a new type `TypesStateStoreSupplier` (that extends
`StateStoreSupplier`) and to overload all methods that take a
`StateStoreSupplier` that accept the new type instead of the current one.

This would allow `.build` to return a `TypedStateStoreSupplier` and
thus, would not break any code. As least if I did not miss anything with
regard to some magic of type inference using generics (I am not an
expert in this field).


-Matthias

On 5/4/17 11:32 AM, Matthias J. Sax wrote:

Did not have time to have a look. But backward compatibility is a must
from my point of view.

-Matthias


On 5/4/17 12:56 AM, Michal Borowiecki wrote:

Hello,

I've updated the KIP with missing information.

I would especially appreciate some comments on the compatibility aspects
of this as the proposed change is not fully backwards-compatible.

In the absence of comments I shall call for a vote in the next few days.

Thanks,

Michal


On 30/04/17 23:11, Michal Borowiecki wrote:

Hi community!

I have just drafted KIP-147: Add missing type parameters to
StateStoreSupplier factories and KGroupedStream/Table methods
<https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=69408481> 
<https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=69408481>

Please let me know if this a step in the right direction.

All comments welcome.

Thanks,
Michal

Re: [DISCUSS]: KIP-149: Enabling key access in ValueTransformer, ValueMapper, and ValueJoiner

2017-05-07 Thread Michal Borowiecki

Hi Jeyhun,

Thanks for your quick reply.

Indeed, I understand the existing ValueMapper/Joiner etc. have to remain 
as-is for backwards compatibility.


I was just expressing my surprise that their proposed richer equivalents 
weren't functional interfaces too.
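
For example, something along these lines would have kept lambdas usable (a 
sketch only, with made-up names, not what the KIP proposes; the context type 
is a stand-in as well):

// Hypothetical sketch: a single abstract method keeps lambda syntax available,
// while init()/close() get no-op defaults instead of requiring an abstract class.
@FunctionalInterface
interface RichValueMapperSketch<K, V, VR> {
    VR apply(K readOnlyKey, V value, RecordContextSketch ctx);

    default void init(RecordContextSketch ctx) { }
    default void close() { }
}

// Minimal stand-in for the record context used above (purely illustrative).
interface RecordContextSketch {
    int partition();
    long offset();
}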


Thanks,

Michał


On 07/05/17 12:32, Jeyhun Karimov wrote:

Hi Michal,

Thanks for your comments. We proposed the similar solution to yours in 
KIP (please look at rejected alternatives). However, after the 
discussion in mailing list I extended it to rich functions. Maybe we 
should keep them both: simple and rich versions.


Cheers,
Jeyhun

On Sun, May 7, 2017 at 11:46 AM Michal Borowiecki 
<michal.borowie...@openbet.com <mailto:michal.borowie...@openbet.com>> 
wrote:


Do I understand correctly, that with the proposed pattern one
could not pass a lambda expression and access the context from
within it?

TBH, I was hoping that for simple things this would be possible:

myStream.map( (key, value, ctx) -> new KeyValue<>(ctx.partition(),
value) )

or (more to the original point of this KIP):

myStream.mapValues( (key, value, ctx) -> new MyValueWrapper(value,
ctx.partition(), ctx.offset()) )

but it looks like that would require subclassing RichFunction?
That's a bit of an inconvenience IMO.

Cheers,

Michal


On 07/05/17 01:29, Jeyhun Karimov wrote:

Hi,

Thanks for comments. I extended PR and KIP to include rich functions. I
will still have to evaluate the cost of deep copying of keys.

Cheers,
Jeyhun

On Fri, May 5, 2017 at 8:02 PM Mathieu Fenniak<mathieu.fenn...@replicon.com> 
<mailto:mathieu.fenn...@replicon.com>
wrote:


Hey Matthias,

My opinion would be that documenting the immutability of the key is the
best approach available.  Better than requiring the key to be serializable
(as with Jeyhun's last pass at the PR), no performance risk.

It'd be different if Java had immutable type constraints of some kind. :-)

Mathieu


On Fri, May 5, 2017 at 11:31 AM, Matthias J. Sax<matth...@confluent.io> 
<mailto:matth...@confluent.io>
wrote:


Agreed about RichFunction. If we follow this path, it should cover
all(?) DSL interfaces.

About guarding the key -- I am still not sure what to do about it...
Maybe it might be enough to document it (and name the key parameter like
`readOnlyKey` to make is very clear). Ultimately, I would prefer to
guard against any modification, but I have no good idea how to do this.
Not sure what others think about the risk of corrupted partitioning
(what would be a user error and we could say, well, bad luck, you got a
bug in your code, that's not our fault), vs deep copy with a potential
performance hit (that we can't quantity atm without any performance

test).

We do have a performance system test. Maybe it's worth for you to apply
the deep copy strategy and run the test. It's very basic performance
test only, but might give some insight. If you want to do this, look
into folder "tests" for general test setup, and into
"tests/kafaktests/benchmarks/streams" to find find the perf test.


-Matthias

On 5/5/17 8:55 AM, Jeyhun Karimov wrote:

Hi Matthias,

I think extending KIP to include RichFunctions totally  makes sense.

So, we don't want to guard the keys because it is costly.
If we introduce RichFunctions I think it should not be limited only to the 3
interfaces proposed in the KIP but to a wide range of interfaces.
Please correct me if I am wrong.

Cheers,
Jeyhun

On Fri, May 5, 2017 at 12:04 AM Matthias J. Sax <matth...@confluent.io 
<mailto:matth...@confluent.io>
wrote:


One follow up. There was this email on the user list:

http://search-hadoop.com/m/Kafka/uyzND17KhCaBzPSZ1?subj=Shouldn+t+the+initializer+of+a+stream+aggregate+accept+the+key+

It might make sense to include the Initializer, Adder, and Subtractor
interfaces, too.

And we should double check if there are other interface we might miss

atm.

-Matthias


On 5/4/17 1:31 PM, Matthias J. Sax wrote:

Thanks for updating the KIP.

Deep copying the key will work for sure, but I am actually a little bit
worried about performance impact... We might want to do some test to
quantify this impact.


Btw: this remind me about the idea of `RichFunction` interface that
would allow users to access record metadata (like timestamp, offset,
partition etc) within DSL. This would be a similar concept. Thus, I am
wondering, if it would make sense to enlarge the scope of this KIP by
that? WDYT?



-Matthias


On 5/3/17 2:08 AM, Jeyhun Karimov wrote:

Hi Mathieu,

Thanks for feedback. I followed similar approach and updated PR and KIP
accordingly. I tried 

Re: [DISCUSS]: KIP-149: Enabling key access in ValueTransformer, ValueMapper, and ValueJoiner

2017-05-07 Thread Michal Borowiecki
is case 2 solutions come to my mind. In both cases, user accesses the
key in Object type, as passing extra type parameter will break
backwards-compatibility.  So user has to cast to actual key type.

1. Firstly, We can overload apply method with 2 argument (key and value)
and force key to be *final*. By doing this,  I think we can address both
backward-compatibility and guarding against key change.

2. Secondly, we can create class KeyAccess like:

public class KeyAccess {
    Object key;

    public void beforeApply(final Object key) {
        this.key = key;
    }

    public Object getKey() {
        return key;
    }
}

We can extend *ValueMapper, ValueJoiner* and *ValueTransformer* from
*KeyAccess*. Inside processor (for example *KTableMapValuesProcessor*)
before calling *mapper.apply(value)* we can set the *key* by
*mapper.beforeApply(key)*. As a result, user can use *getKey()* to access
the key inside *apply(value)* method.


Cheers,
Jeyhun




On Mon, May 1, 2017 at 7:24 PM Matthias J. Sax <matth...@confluent.io>
wrote:


Jeyhun,

thanks a lot for the KIP!

I think there are two issues we need to address:

(1) The KIP does not consider backward compatibility. Users did complain
about this in past releases already, and as the user base grows, we
should not break backward compatibility in future releases anymore.
Thus, we should think of a better way to allow key access.

Mathieu's comment goes into the same direction

    On the other hand, the number of compile failures that would need to
    be fixed from this change is unfortunate. :-)

(2) Another concern is, that there is no guard to prevent user code to
modify the key. This might corrupt partitioning if users do alter the
key (accidentally -- or users are just not aware that they are not
allowed to modify the provided key object) and thus break the
application. (This was the original motivation to not provide the key in
the first place -- it's guards against modification.)


-Matthias



On 5/1/17 6:31 AM, Mathieu Fenniak wrote:

Hi Jeyhun,

I just want to add my voice that, I too, have wished for access to the
record key during a mapValues or similar operation.

On the other hand, the number of compile failures that would need to be
fixed from this change is unfortunate. :-)  But at least it would all be a
pretty clear and easy change.

Mathieu


On Mon, May 1, 2017 at 6:55 AM, Jeyhun Karimov <je.kari...@gmail.com>
wrote:

Dear community,

I want to share KIP-149 [1] based on issues KAFKA-4218 [2], KAFKA-4726 [3],
KAFKA-3745 [4]. The related PR can be found at [5].
I would like to get your comments.

[1]
https://cwiki.apache.org/confluence/display/KAFKA/KIP-
149%3A+Enabling+key+access+in+ValueTransformer%2C+
ValueMapper%2C+and+ValueJoiner
[2] https://issues.apache.org/jira/browse/KAFKA-4218
[3] https://issues.apache.org/jira/browse/KAFKA-4726
[4] https://issues.apache.org/jira/browse/KAFKA-3745
[5] https://github.com/apache/kafka/pull/2946


Cheers,
Jeyhun



--
-Cheers

Jeyhun


--

-Cheers

Jeyhun


--

-Cheers

Jeyhun









Re: [DISCUSS] KIP-147: Add missing type parameters to StateStoreSupplier factories and KGroupedStream/Table methods

2017-05-06 Thread Michal Borowiecki

Hi Matthias,

Agreed. I tried your proposal and indeed it would work.

However, I think to maintain full backward compatibility we would also 
need to deprecate Stores.create() and leave it unchanged, while 
providing a new method that returns the more strongly typed Factories.


( This is because PersistentWindowFactory and PersistentSessionFactory 
cannot extend the existing PersistentKeyValueFactory interface, since 
their build() methods will be returning 
TypedStateStoreSupplier<WindowStore<K, V>> and 
TypedStateStoreSupplier<SessionStore<K, V>> respectively, which are NOT 
subclasses of TypedStateStoreSupplier<KeyValueStore<K, V>>. I do not see 
another way around it. Admittedly, my type covariance skills are 
rudimentary. Does anyone see a better way around this? )
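
To illustrate the point, here is a small sketch (simplified stand-ins only, not 
the KIP's actual code): Java generics are invariant, so even though WindowStore 
is a StateStore, TypedStateStoreSupplier<WindowStore<K, V>> is unrelated to 
TypedStateStoreSupplier<KeyValueStore<K, V>>.

import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.WindowStore;

// Simplified stand-in for the proposed typed supplier (in the KIP it would
// extend the existing StateStoreSupplier); only the type parameter matters here.
interface TypedStateStoreSupplier<T extends StateStore> {
    T get();
}

interface PersistentKeyValueFactorySketch<K, V> {
    TypedStateStoreSupplier<KeyValueStore<K, V>> build();
}

// Cannot extend PersistentKeyValueFactorySketch<K, V>: this build() would not
// be a valid override, because its return type is not a subtype of
// TypedStateStoreSupplier<KeyValueStore<K, V>>.
interface PersistentWindowFactorySketch<K, V> {
    TypedStateStoreSupplier<WindowStore<K, V>> build();
}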


Since create() takes only the store name as argument, and I don't see 
what we could overload it with, the new method would need to have a 
different name.


Alternatively, since create(String) is the only method in Stores, we 
could deprecate the entire class and provide a new one. That would be my 
preference. Any ideas what to call it?



All comments and suggestions appreciated.


Cheers,

Michał


On 04/05/17 21:48, Matthias J. Sax wrote:

I had a quick look into this.

With regard to backward compatibility, I think it would be required do
introduce a new type `TypesStateStoreSupplier` (that extends
`StateStoreSupplier`) and to overload all methods that take a
`StateStoreSupplier` that accept the new type instead of the current one.

This would allow `.build` to return a `TypedStateStoreSupplier` and
thus, would not break any code. As least if I did not miss anything with
regard to some magic of type inference using generics (I am not an
expert in this field).


-Matthias

On 5/4/17 11:32 AM, Matthias J. Sax wrote:

Did not have time to have a look. But backward compatibility is a must
from my point of view.

-Matthias


On 5/4/17 12:56 AM, Michal Borowiecki wrote:

Hello,

I've updated the KIP with missing information.

I would especially appreciate some comments on the compatibility aspects
of this as the proposed change is not fully backwards-compatible.

In the absence of comments I shall call for a vote in the next few days.

Thanks,

Michal


On 30/04/17 23:11, Michal Borowiecki wrote:

Hi community!

I have just drafted KIP-147: Add missing type parameters to
StateStoreSupplier factories and KGroupedStream/Table methods
<https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=69408481>

Please let me know if this a step in the right direction.

All comments welcome.

Thanks,
Michal

[VOTE] KIP-138: Change punctuate semantics

2017-05-06 Thread Michal Borowiecki

Hi all,

Given I'm not seeing any contentious issues remaining on the discussion 
thread, I'd like to initiate the vote for:


KIP-138: Change punctuate semantics

https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics


Thanks,
Michał




Re: [DISCUSS] KIP-147: Add missing type parameters to StateStoreSupplier factories and KGroupedStream/Table methods

2017-05-04 Thread Michal Borowiecki

Hello,

I've updated the KIP with missing information.

I would especially appreciate some comments on the compatibility aspects 
of this as the proposed change is not fully backwards-compatible.


In the absence of comments I shall call for a vote in the next few days.

Thanks,

Michal


On 30/04/17 23:11, Michal Borowiecki wrote:


Hi community!

I have just drafted KIP-147: Add missing type parameters to 
StateStoreSupplier factories and KGroupedStream/Table methods 
<https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=69408481>


Please let me know if this a step in the right direction.

All comments welcome.

Thanks,
Michal




[jira] [Comment Edited] (KAFKA-5155) Messages can be deleted prematurely when some producers use timestamps and some not

2017-05-03 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15994575#comment-15994575
 ] 

Michal Borowiecki edited comment on KAFKA-5155 at 5/3/17 9:44 AM:
--

Hi [~huxi_2b],
Personally, I feel the similarity is superficial. KAFKA-4398 is about consuming 
messages in timestamp order, which challenges the current design and basically 
calls out for a new feature.

This ticket on the other hand is reporting a defect, with potential data loss, 
which violates the at-least-once semantics.
However, it does not challenge the design, simply points out that one line of 
code needs changing to cater for the case when msgs with and without timestamps 
are appended to the same segment, which IMHO is a non-contentious bugfix.



was (Author: mihbor):
Hi @huxi,
Personally, I feel the similarity is superficial. KAFKA-4398 is about consuming 
messages in timestamp order, which challenges the current design and basically 
calls out for a new feature.

This ticket on the other hand is reporting a defect, with potential data loss, 
which violates the at-least-once semantics.
However, it does not challenge the design, simply points out that one line of 
code needs changing to cater for the case when msgs with and without timestamps 
are appended to the same segment, which IMHO is a non-contentious bugfix.


> Messages can be deleted prematurely when some producers use timestamps and 
> some not
> ---
>
> Key: KAFKA-5155
> URL: https://issues.apache.org/jira/browse/KAFKA-5155
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.2.0
>Reporter: Petr Plavjaník
>
> Some messages can be deleted prematurely and never read in following 
> scenario. A producer uses timestamps and produces messages that are appended 
> to the beginning of a log segment. Other producer produces messages without a 
> timestamp. In that case the largest timestamp is determined by the old messages 
> with a timestamp, new messages without a timestamp do not influence it, and 
> the log segment with old and new messages can be deleted immediately after the 
> last new message with no timestamp is appended. When all appended messages 
> have no timestamp, then they are not deleted because {{lastModified}} 
> attribute of a {{LogSegment}} is used.
> New test case to {{kafka.log.LogTest}} that fails:
> {code}
>   @Test
>   def 
> shouldNotDeleteTimeBasedSegmentsWhenTimestampIsNotProvidedForSomeMessages() {
> val retentionMs = 1000
> val old = TestUtils.singletonRecords("test".getBytes, timestamp = 0)
> val set = TestUtils.singletonRecords("test".getBytes, timestamp = -1, 
> magicValue = 0)
> val log = createLog(set.sizeInBytes, retentionMs = retentionMs)
> // append some messages to create some segments
> log.append(old)
> for (_ <- 0 until 12)
>   log.append(set)
> assertEquals("No segment should be deleted", 0, log.deleteOldSegments())
>   }
> {code}
> It can be prevented by using {{def largestTimestamp = 
> Math.max(maxTimestampSoFar, lastModified)}} in LogSegment, or by using 
> current timestamp when messages with timestamp {{-1}} are appended.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: Support for Kafka Consumer over SASL_SSL

2017-05-01 Thread Michal Borowiecki

Hi Nixon,

kafka.consumer.Consumer is the old consumer.
When the documentation is referring to the new producer and consumer, it 
means those in the org.apache.kafka.clients.producer and 
org.apache.kafka.clients.consumer packages respectively.

They are packaged in the kafka-clients-0.10.x.y.jar
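
For example, a minimal sketch of the new consumer over SASL_SSL (broker 
address, group id, topic and SASL mechanism below are placeholders; the JAAS 
login configuration still has to match your cluster):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SaslSslConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9093");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put("security.protocol", "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "GSSAPI"); // or PLAIN, depending on the broker setup
        // JAAS config is typically supplied via -Djava.security.auth.login.config=...

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("TEST"));
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records)
                System.out.println(record.value());
        }
    }
}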

Hope that helps,
Michał

On 01/05/17 14:04, Nixon Rodrigues wrote:

   Hi Kafka dev team,


I am using the kafka_2.11-0.10.0.0.jar API for the Kafka consumer.

I am facing an issue while consuming from topics over SASL_SSL and
getting the exception below

Consumer.createJavaConsumerConnector(new
kafka.consumer.ConsumerConfig(consumerProperties))


As per documentation found on
http://docs.confluent.io/2.0.0/kafka/sasl.html  , it is said that

"SASL authentication is only supported for the new Kafka producer
and consumer, the older API is not supported."

Can anybody confirm from which Kafka version onward SASL_SSL is
supported? Any input on this is appreciated.


2017-04-26 16:06:36,655 WARN  -
[node11.openstacklocal-1493222790718-b862352a-leader-finder-thread:] ~
Fetching topic metadata with correlation id 18 for topics [Set(TEST)]
from broker [BrokerEndPoint(1001,node11.openstacklocal,6667)] failed
(Logging$class:89)
java.io.EOFException
at 
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:99)
at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:140)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:131)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:84)
at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:81)
at kafka.producer.SyncProducer.send(SyncProducer.scala:126)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:96)
at 
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:67)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
2017-04-26 16:06:36,655 WARN  -
[node11.openstacklocal-1493222790718-b862352a-leader-finder-thread:] ~
[node11.openstacklocal-1493222790718-b862352a-leader-finder-thread],
Failed to find leader for Set([TEST,0]) (Logging$class:89)
kafka.common.KafkaException: fetching topic metadata for topics
[Set(TEST)] from broker
[ArrayBuffer(BrokerEndPoint(1001,node11.openstacklocal,6667))] failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:96)
at 
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:67)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.io.EOFException
at 
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:99)
at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:140)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:131)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:84)
at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:81)
at kafka.producer.SyncProducer.send(SyncProducer.scala:126)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)


Nixon







[jira] [Commented] (KAFKA-5144) MinTimestampTracker does not correctly add timestamps lower than the current max

2017-05-01 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15990683#comment-15990683
 ] 

Michal Borowiecki commented on KAFKA-5144:
--

I understand now, thanks a lot for the thorough explanation!
Closed #2947 with the invalid tests.
Leaving #2948 open as I think it is still of value.

> MinTimestampTracker does not correctly add timestamps lower than the current 
> max
> 
>
> Key: KAFKA-5144
> URL: https://issues.apache.org/jira/browse/KAFKA-5144
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1
>    Reporter: Michal Borowiecki
>    Assignee: Michal Borowiecki
>
> When adding elements MinTimestampTracker removes all existing elements 
> greater than the added element.
> Perhaps I've missed something and this is intended behaviour but I can't find 
> any evidence for that in comments or tests.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[DISCUSS] KIP-147: Add missing type parameters to StateStoreSupplier factories and KGroupedStream/Table methods

2017-04-30 Thread Michal Borowiecki

Hi community!

I have just drafted KIP-147: Add missing type parameters to 
StateStoreSupplier factories and KGroupedStream/Table methods 
<https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=69408481>


Please let me know if this is a step in the right direction.

All comments welcome.

Thanks,
Michal




Re: [DISCUSS] KIP-138: Change punctuate semantics

2017-04-30 Thread Michal Borowiecki

Hi Matthias,

I'd like to start moving the discarded ideas into Rejected Alternatives 
section. Before I do, I want to tidy them up, ensure they've each been 
given proper treatment.


To that end let me go back to one of your earlier comments about the 
original suggestion (A) to put that to bed.



On 04/04/17 06:44, Matthias J. Sax wrote:

(A) You argue, that users can still "punctuate" on event-time via
process(), but I am not sure if this is possible. Note, that users only
get record timestamps via context.timestamp(). Thus, users would need to
track the time progress per partition (based on the partitions they
obverse via context.partition(). (This alone puts a huge burden on the
user by itself.) However, users are not notified at startup what
partitions are assigned, and user are not notified when partitions get
revoked. Because this information is not available, it's not possible to
"manually advance" stream-time, and thus event-time punctuation within
process() seems not to be possible -- or do you see a way to get it
done? And even if, it might still be too clumsy to use.
I might have missed something but I'm guessing your worry about users 
having to track time progress /per partition/ comes from what the 
stream-time does currently.
But I'm not sure that those semantics of stream-time are ideal for users 
of punctuate.
That is, if stream-time punctuate didn't exist and users had to use 
process(), would they actually want to use the current semantics of 
stream time?


As a reminder stream time, in all its glory, is (not exactly actually, 
but when trying to be absolutely precise here I spotted KAFKA-5144 
<https://issues.apache.org/jira/browse/KAFKA-5144> so I think this 
approximation suffices to illustrate the point):


a minimum across all input partitions of (
   if(msgs never received by partition) -1;
   else {
  a non-descending-minimum of ( the per-batch minimum msg timestamp)
   }
)

Would that really be clear enough to the users of punctuate? Do they 
care for such a convoluted notion of time? I see how this can be useful 
for StreamTask to pick the next partition to take a record from but for 
punctuate?
If users had to implement punctuation with process(), is that what they 
would have chosen as their notion of time?

I'd argue not.

None of the processors implementing the rich windowing/join operations 
in the DSL use punctuate.
Let's take the KStreamKStreamJoinProcessor as an example, in it's 
process() method it simply uses context().timestamp(), which, since it's 
called from process, returns simply, per javadoc:


If it is triggered while processing a record streamed from the source 
processor, timestamp is defined as the timestamp of the current input 
record;


So they don't use that convoluted formula for stream-time. Instead, they 
only care about the timestamp of the current record. I think that having 
users track just that wouldn't be that much of a burden. I don't think 
they need to care about which partitions got assigned or not. And 
StreamTask would still be picking records first from the partition 
having the lowest timestamp to try to "synchronize" the streams as it 
does now.


What users would have to do in their Processor implementations is 
something along the lines of:


long lastPunctuationTime = 0;
long interval = ...; // millis

@Override
public void process(K key, V value) {
    while (ctx.timestamp() >= lastPunctuationTime + interval) {
        punctuate(ctx.timestamp());
        // I'm not sure of the merit of this vs lastPunctuationTime = ctx.timestamp();
        // but that's what PunctuationQueue does currently
        lastPunctuationTime += interval;
    }
    // do some other business logic here
}

Looking forward to your thoughts.

Cheers,
Michal





[jira] [Commented] (KAFKA-5144) MinTimestampTracker does not correctly add timestamps lower than the current max

2017-04-30 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15990335#comment-15990335
 ] 

Michal Borowiecki commented on KAFKA-5144:
--

Added a second [PR|https://github.com/apache/kafka/pull/2948] which is just a 
refactoring (does not fix the issue!) to make reasoning about the code easier.
It renames variables to express their true meaning and adds comments where it 
matters.
This is a separate PR since it's independent of the tests. If I got it wrong 
somehow and the test cases are indeed invalid, this refactoring is still useful 
as it better documents what the code actually does.

> MinTimestampTracker does not correctly add timestamps lower than the current 
> max
> 
>
> Key: KAFKA-5144
> URL: https://issues.apache.org/jira/browse/KAFKA-5144
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1
>    Reporter: Michal Borowiecki
>    Assignee: Michal Borowiecki
>
> When adding elements MinTimestampTracker removes all existing elements 
> greater than the added element.
> Perhaps I've missed something and this is intended behaviour but I can't find 
> any evidence for that in comments or tests.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5144) MinTimestampTracker does not correctly add timestamps lower than the current max

2017-04-30 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15990329#comment-15990329
 ] 

Michal Borowiecki commented on KAFKA-5144:
--

[PR|https://github.com/apache/kafka/pull/2947/commits/a8b223b92c0aec31498e0d6043c8deffb1ea21ef]
 contains the cases that are failing, which I think are valid.
If someone can please confirm I'm not talking nonsense here, I'll proceed with 
a fix.

> MinTimestampTracker does not correctly add timestamps lower than the current 
> max
> 
>
> Key: KAFKA-5144
> URL: https://issues.apache.org/jira/browse/KAFKA-5144
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1
>    Reporter: Michal Borowiecki
>    Assignee: Michal Borowiecki
>
> When adding elements MinTimestampTracker removes all existing elements 
> greater than the added element.
> Perhaps I've missed something and this is intended behaviour but I can't find 
> any evidence for that in comments or tests.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-5144) MinTimestampTracker does not correctly add timestamps lower than the current max

2017-04-30 Thread Michal Borowiecki (JIRA)
Michal Borowiecki created KAFKA-5144:


 Summary: MinTimestampTracker does not correctly add timestamps 
lower than the current max
 Key: KAFKA-5144
 URL: https://issues.apache.org/jira/browse/KAFKA-5144
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.2.1
Reporter: Michal Borowiecki
Assignee: Michal Borowiecki


When adding elements MinTimestampTracker removes all existing elements greater 
than the added element.
Perhaps I've missed something and this is intended behaviour but I can't find 
any evidence for that in comments or tests.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-04-30 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15990288#comment-15990288
 ] 

Michal Borowiecki commented on KAFKA-3514:
--

Agreed. That's what I meant. Beyond the scope of KIP-138 and let's keep it that 
way.

> Stream timestamp computation needs some further thoughts
> 
>
> Key: KAFKA-3514
> URL: https://issues.apache.org/jira/browse/KAFKA-3514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Eno Thereska
>  Labels: architecture
> Fix For: 0.11.0.0
>
>
> Our current stream task's timestamp is used for punctuate function as well as 
> selecting which stream to process next (i.e. best effort stream 
> synchronization). And it is defined as the smallest timestamp over all 
> partitions in the task's partition group. This results in two unintuitive 
> corner cases:
> 1) observing a late arrived record would keep that stream's timestamp low for 
> a period of time, and hence keep being process until that late record. For 
> example take two partitions within the same task annotated by their 
> timestamps:
> {code}
> Stream A: 5, 6, 7, 8, 9, 1, 10
> {code}
> {code}
> Stream B: 2, 3, 4, 5
> {code}
> The late arrived record with timestamp "1" will cause stream A to be selected 
> continuously in the thread loop, i.e. messages with timestamp 5, 6, 7, 8, 9 
> until the record itself is dequeued and processed, then stream B will be 
> selected starting with timestamp 2.
> 2) an empty buffered partition will cause its timestamp to be not advanced, 
> and hence the task timestamp as well since it is the smallest among all 
> partitions. This may not be a severe problem compared with 1) above though.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-04-30 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15990176#comment-15990176
 ] 

Michal Borowiecki commented on KAFKA-3514:
--

I think the description of this ticket is missing an important detail.
If my understanding is correct, it will behave as described if all the records 
arrive in a single batch.
However, if the records preceding the record with timestamp "1" come in a 
separate batch (I'll use brackets to depict batch boundaries):
{code}
Stream A: [5, 6, 7, 8, 9], [1, 10]

Stream B: [2, 3, 4, 5]
{code}
then initially the timestamp for stream A is going to be set to "5" (minimum of 
the first batch) and since it's not allowed to move back, the second batch 
containing the late arriving record "1" is not going to change that. Stream B 
is going to be drained first until "5".
However, if the batch boundaries are different by just one record and the late 
arriving "1" is in the first batch:
{code}
Stream A: [5, 6, 7, 8, 9, 1], [10]

Stream B: [2, 3, 4, 5]
{code}
 then it's going to behave as currently described.

Please correct me if I got this wrong.
But if that is the case, it feels all too non-deterministic and I think the 
timestamp computation deserves further thought beyond the scope of 
[KIP-138|https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics],
 which is limited to punctuate semantics, but not stream time semantics in 
general.
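
To make the non-determinism concrete, here is a toy sketch (my own 
simplification, not the actual MinTimestampTracker/PartitionGroup code) of 
"per-batch minimum, never moving backwards":
{code}
// Toy model only: each batch contributes its minimum timestamp,
// and the partition time is never allowed to go back.
class ToyPartitionTime {
    private long partitionTime = -1;

    long onBatch(long... batchTimestamps) {
        long batchMin = Long.MAX_VALUE;
        for (long t : batchTimestamps) {
            batchMin = Math.min(batchMin, t);
        }
        partitionTime = Math.max(partitionTime, batchMin); // never goes back
        return partitionTime;
    }
}

// Stream A as [5,6,7,8,9] then [1,10]  -> 5, then still 5 (the late "1" can't lower it)
// Stream A as [5,6,7,8,9,1] then [10]  -> 1, then 10      (the late "1" drags it down first)
{code}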

> Stream timestamp computation needs some further thoughts
> 
>
> Key: KAFKA-3514
> URL: https://issues.apache.org/jira/browse/KAFKA-3514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Eno Thereska
>  Labels: architecture
> Fix For: 0.11.0.0
>
>
> Our current stream task's timestamp is used for punctuate function as well as 
> selecting which stream to process next (i.e. best effort stream 
> synchronization). And it is defined as the smallest timestamp over all 
> partitions in the task's partition group. This results in two unintuitive 
> corner cases:
> 1) observing a late arrived record would keep that stream's timestamp low for 
> a period of time, and hence keep being process until that late record. For 
> example take two partitions within the same task annotated by their 
> timestamps:
> {code}
> Stream A: 5, 6, 7, 8, 9, 1, 10
> {code}
> {code}
> Stream B: 2, 3, 4, 5
> {code}
> The late arrived record with timestamp "1" will cause stream A to be selected 
> continuously in the thread loop, i.e. messages with timestamp 5, 6, 7, 8, 9 
> until the record itself is dequeued and processed, then stream B will be 
> selected starting with timestamp 2.
> 2) an empty buffered partition will cause its timestamp to be not advanced, 
> and hence the task timestamp as well since it is the smallest among all 
> partitions. This may not be a severe problem compared with 1) above though.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4593) Task migration during rebalance callback process could lead the obsoleted task's IllegalStateException

2017-04-29 Thread Michal Borowiecki (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michal Borowiecki updated KAFKA-4593:
-
Description: 
1. Assume 2 running threads A and B, and one task t1 just for simplicity. 
Thread A and B are on different machines so their local state dir are not 
shared.
2. First rebalance is triggered, task t1 is assigned to A (B has no assigned 
task).
3. During the first rebalance callback, task t1's state store need to be 
restored on thread A, and this is called in "restoreActiveState" of 
"createStreamTask".
4. Now suppose thread A has a long GC causing it to stall, a second rebalance 
then will be triggered and kicked A out of the group; B gets the task t1 and 
did the same restoration process, after the process thread B continues to 
process data and update the state store, while at the same time writes more 
messages to the changelog (so its log end offset has incremented).

5. After a while A resumes from the long GC, not knowing it has actually be 
kicked out of the group and task t1 is no longer owned to itself, it continues 
the restoration process but then realize that the log end offset has advanced. 
When this happens, we will see the following exception on thread A:

{code}
java.lang.IllegalStateException: task XXX Log end offset of
YYY-table_stream-changelog-ZZ should not change while
restoring: old end offset .., current offset ..

at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:248)

at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)

at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:122)

at
org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:200)

at
org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:65)

at
org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:65)

at
org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)

at
org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:120)

at
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:794)

at
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1222)

at
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1195)

at
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:897)

at
org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:71)

at
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:240)

at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:230)

at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:314)

at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:278)

at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:261)

at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1039)

at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1004)

at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:570)

at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:359)
{code}

  was:
1. Assume 2 running threads A and B, and one task t1 jut for simplicity. Thread 
A and B are not different machines so their local state dir are not shared.
2. First rebalance is triggered, task t1 is assigned to A (B has no assigned 
task).
3. During the first rebalance callback, task t1's state store need to be 
restored on thread A, and this is called in "restoreActiveState" of 
"createStreamTask".
4. Now suppose thread A has a long GC causing it to stall, a second rebalance 
then will be triggered and kicked A out of the group; B gets the task t1 and 
did the same restoration process, after the process thread B continues to 
process data and update the state store, while at the same time writes more 
messages to the changelog (so its log end offset has incremented).

5. After a while A resumes from the long GC, not knowing it has actually be 
kicked out of the group and task t1 is no longer owned to itself, it continues 
the restoration process but then realize that the log end offset has advanced. 
When

Re: [DISCUSS] KIP 141 - ProducerRecordBuilder Interface

2017-04-23 Thread Michal Borowiecki
However, if you're used to working with immutables, it can feel natural 
that withPartition would return a new object, so it could be more prone 
to mistakes.
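
To illustrate the kind of mix-up I mean (hypothetical, simplified types below, 
not the actual ProducerRecord nor the code in the KIP-141 PR):

// Hypothetical, simplified types - not the real ProducerRecord.
public class WitherSketch {

    static final class ImmutableRecord {
        final String topic;
        final Integer partition;
        ImmutableRecord(String topic, Integer partition) { this.topic = topic; this.partition = partition; }
        // immutable style: returns a copy, the receiver is left unchanged
        ImmutableRecord withPartition(Integer p) { return new ImmutableRecord(topic, p); }
    }

    static final class MutableRecord {
        String topic;
        Integer partition;
        MutableRecord(String topic) { this.topic = topic; }
        // mutable style: changes the receiver and returns it for chaining
        MutableRecord withPartition(Integer p) { this.partition = p; return this; }
    }

    public static void main(String[] args) {
        ImmutableRecord a = new ImmutableRecord("events", null);
        a.withPartition(3);                    // returned copy silently dropped: a.partition is still null
        MutableRecord b = new MutableRecord("events");
        MutableRecord c = b.withPartition(3);  // c == b: the "original" b was changed too
        System.out.println(a.partition + " " + (b == c));
    }
}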


Cheers,
Michal

On 23/04/17 10:41, Michal Borowiecki wrote:


IMHO, the ProducerRecord is anyway not immutable until send, since key 
and value don't have to be immutable until serialized on send.


So if immutability is a concern, I think it would have to be enforced 
in send as Mike suggested, don't see much point in enforcing 
immutability prior to send.


Therefore the with pattern should not be discarded just for that reason.

Thanks,

Michal


On 23/04/17 06:34, Stephane Maarek wrote:

Good call.
That’s when I heavily miss Scala Case classes and options. You get clarity on 
optional vs mandatory fields, one constructor, and immutability. If losing 
Immutability is an issue, then the with pattern is a no-go and then I’ll just 
add a missing constructor the way Ismael described it. That’ll make the PR way 
simpler, with limited refactoring.

Regarding the ConsumerRecord, I’m happy to have a look, but it’s the first time 
I see it concretely. When would you manually construct such a record? Isn’t the 
client handling all that for you behind the scene?
  


On 23/4/17, 3:21 pm, "Michael Pearce"<michael.pea...@ig.com>  wrote:

 If moving to a wither pattern instead of a builder. How will this enforce 
immutability? Eg current PR it is now changing to allow possible change values 
once set.
 
 Or are you proposing to change it to a mutable record? And move to a closable record similar to the closing of the headers on send.
 
 How about also the consumer record, is this also being looked at so we don't have two very different styles.
 
 Cheers

 Mike
 
 
 
 Sent using OWA for iPhone

 
 From:isma...@gmail.com  <isma...@gmail.com>  on behalf of Ismael 
Juma<ism...@juma.me.uk>
 Sent: Saturday, April 22, 2017 11:53:45 PM
 To:dev@kafka.apache.org
 Subject: Re: [DISCUSS] KIP 141 - ProducerRecordBuilder Interface
 
 On Sat, Apr 22, 2017 at 6:16 PM, Matthias J. Sax<matth...@confluent.io>

 wrote:
 
 > I think Ismael's suggestion is a valid alternative.

 >
 > However, `timestamp` is an optional field and thus we should have at
 > least two constructors for this:
 >
 >  - ProducerRecord(String topic, K key, V value)
 
  - ProducerRecord(String topic, K key, V value, Long timestamp)

 >
 
 Yes, the other one already exists.
 
 Ismael







Re: [DISCUSS] KIP 141 - ProducerRecordBuilder Interface

2017-04-23 Thread Michal Borowiecki
IMHO, the ProducerRecord is anyway not immutable until send, since key 
and value don't have to be immutable until serialized on send.


So if immutability is a concern, I think it would have to be enforced in 
send as Mike suggested, don't see much point in enforcing immutability 
prior to send.


Therefore the with pattern should not be discarded just for that reason.

Thanks,

Michal


On 23/04/17 06:34, Stephane Maarek wrote:

Good call.
That’s when I heavily miss Scala Case classes and options. You get clarity on 
optional vs mandatory fields, one constructor, and immutability. If losing 
Immutability is an issue, then the with pattern is a no-go and then I’ll just 
add a missing constructor the way Ismael described it. That’ll make the PR way 
simpler, with limited refactoring.

Regarding the ConsumerRecord, I’m happy to have a look, but it’s the first time 
I see it concretely. When would you manually construct such a record? Isn’t the 
client handling all that for you behind the scene?
  


On 23/4/17, 3:21 pm, "Michael Pearce" <michael.pea...@ig.com> wrote:

 If moving to a wither pattern instead of a builder. How will this enforce 
immutability? Eg current PR it is now changing to allow possible change values 
once set.
 
 Or are you proposing to change it to a mutable record? And move to a closable record similar to the closing of the headers on send.
 
 How about also the consumer record, is this also being looked at so we don't have two very different styles.
 
 Cheers

 Mike
 
 
 
 Sent using OWA for iPhone

 
 From: isma...@gmail.com <isma...@gmail.com> on behalf of Ismael Juma 
<ism...@juma.me.uk>
 Sent: Saturday, April 22, 2017 11:53:45 PM
 To: dev@kafka.apache.org
 Subject: Re: [DISCUSS] KIP 141 - ProducerRecordBuilder Interface
 
 On Sat, Apr 22, 2017 at 6:16 PM, Matthias J. Sax <matth...@confluent.io>

 wrote:
 
 > I think Ismael's suggestion is a valid alternative.

 >
 > However, `timestamp` is an optional field and thus we should have at
 > least two constructors for this:
 >
 >  - ProducerRecord(String topic, K key, V value)
 
  - ProducerRecord(String topic, K key, V value, Long timestamp)

 >
 
 Yes, the other one already exists.
 
 Ismael










Re: [VOTE] 0.10.2.1 RC3

2017-04-22 Thread Michal Borowiecki

It's listed below:


* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/



On 22/04/17 19:23, Shimi Kiviti wrote:

Is there a maven repo with these jars so I can test it against our kafka
streams services?

On Sat, Apr 22, 2017 at 9:05 PM, Eno Thereska <eno.there...@gmail.com>
wrote:


+1 tested the usual streams tests as before.

Thanks
Eno

On 21 Apr 2017, at 17:56, Gwen Shapira <g...@confluent.io> wrote:

Hello Kafka users, developers, friends, romans, countrypersons,

This is the fourth (!) candidate for release of Apache Kafka 0.10.2.1.

It is a bug fix release, so we have lots of bug fixes, some super
important.

Release notes for the 0.10.2.1 release:
http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc3/RELEASE_NOTES.html

*** Please download, test and vote by Wednesday, April 26, 2017 ***

Kafka's KEYS file containing PGP keys we use to sign the release:
http://kafka.apache.org/KEYS

* Release artifacts to be voted upon (source and binary):
http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc3/

* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/

* Javadoc:
http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc3/javadoc/

* Tag to be voted upon (off 0.10.2 branch) is the 0.10.2.1 tag:
https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=

8e4f09caeaa877f06dc75c7da1af7a727e5e599f


* Documentation:
http://kafka.apache.org/0102/documentation.html

* Protocol:
http://kafka.apache.org/0102/protocol.html

/**

Your help in validating this bugfix release is super valuable, so
please take the time to test and vote!

Suggested tests:
* Grab the source archive and make sure it compiles
* Grab one of the binary distros and run the quickstarts against them
* Extract and verify one of the site docs jars
* Build a sample against jars in the staging repo
* Validate GPG signatures on at least one file
* Validate the javadocs look ok
* The 0.10.2 documentation was updated for this bugfix release
(especially upgrade, streams and connect portions) - please make sure
it looks ok: http://kafka.apache.org/documentation.html

But above all, try to avoid finding new bugs - we want to get this

release

out the door already :P


Thanks,
Gwen



--
*Gwen Shapira*
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter <https://twitter.com/ConfluentInc> | blog
<http://www.confluent.io/blog>








[jira] [Updated] (KAFKA-5090) Kafka Streams SessionStore.findSessions javadoc broken

2017-04-19 Thread Michal Borowiecki (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michal Borowiecki updated KAFKA-5090:
-
Affects Version/s: 0.10.2.1

> Kafka Streams SessionStore.findSessions javadoc broken
> --
>
> Key: KAFKA-5090
> URL: https://issues.apache.org/jira/browse/KAFKA-5090
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0, 0.10.2.1
>    Reporter: Michal Borowiecki
>Priority: Trivial
>
> {code}
> /**
>  * Fetch any sessions with the matching key and the sessions end is &le 
> earliestEndTime and the sessions
>  * start is &ge latestStartTime
>  */
> KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, long 
> earliestSessionEndTime, final long latestSessionStartTime);
> {code}
> The conditions in the javadoc comment are inverted (le should be ge and ge 
> shoudl be le), since this is what the code does. They were correct in the 
> original KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-94+Session+Windows
> {code}
> /**
>  * Find any aggregated session values with the matching key and where the
>  * session’s end time is >= earliestSessionEndTime, i.e, the oldest 
> session to
>  * merge with, and the session’s start time is <= latestSessionStartTime, 
> i.e,
>  * the newest session to merge with.
>  */
>KeyValueIterator<Windowed<K>, AGG> findSessionsToMerge(final K key, final 
> long earliestSessionEndTime, final long latestSessionStartTime);
> {code}
> Also, the escaped html character references are missing the trailing 
> semicolon making them render as-is.
> Happy to have this assigned to me to fix as it seems trivial.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Issue Comment Deleted] (KAFKA-5090) Kafka Streams SessionStore.findSessions javadoc broken

2017-04-19 Thread Michal Borowiecki (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michal Borowiecki updated KAFKA-5090:
-
Comment: was deleted

(was: https://github.com/apache/kafka/pull/2874)

> Kafka Streams SessionStore.findSessions javadoc broken
> --
>
> Key: KAFKA-5090
> URL: https://issues.apache.org/jira/browse/KAFKA-5090
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>    Reporter: Michal Borowiecki
>Priority: Trivial
>
> {code}
> /**
>  * Fetch any sessions with the matching key and the sessions end is &le 
> earliestEndTime and the sessions
>  * start is &ge latestStartTime
>  */
> KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, long 
> earliestSessionEndTime, final long latestSessionStartTime);
> {code}
> The conditions in the javadoc comment are inverted (le should be ge and ge 
> shoudl be le), since this is what the code does. They were correct in the 
> original KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-94+Session+Windows
> {code}
> /**
>  * Find any aggregated session values with the matching key and where the
>  * session’s end time is >= earliestSessionEndTime, i.e, the oldest 
> session to
>  * merge with, and the session’s start time is <= latestSessionStartTime, 
> i.e,
>  * the newest session to merge with.
>  */
>KeyValueIterator<Windowed<K>, AGG> findSessionsToMerge(final K key, final 
> long earliestSessionEndTime, final long latestSessionStartTime);
> {code}
> Also, the escaped html character references are missing the trailing 
> semicolon making them render as-is.
> Happy to have this assigned to me to fix as it seems trivial.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5090) Kafka Streams SessionStore.findSessions javadoc broken

2017-04-19 Thread Michal Borowiecki (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michal Borowiecki updated KAFKA-5090:
-
Status: Patch Available  (was: Open)

https://github.com/apache/kafka/pull/2874

> Kafka Streams SessionStore.findSessions javadoc broken
> --
>
> Key: KAFKA-5090
> URL: https://issues.apache.org/jira/browse/KAFKA-5090
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>    Reporter: Michal Borowiecki
>Priority: Trivial
>
> {code}
> /**
>  * Fetch any sessions with the matching key and the sessions end is &le 
> earliestEndTime and the sessions
>  * start is &ge latestStartTime
>  */
> KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, long 
> earliestSessionEndTime, final long latestSessionStartTime);
> {code}
> The conditions in the javadoc comment are inverted (le should be ge and ge 
> shoudl be le), since this is what the code does. They were correct in the 
> original KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-94+Session+Windows
> {code}
> /**
>  * Find any aggregated session values with the matching key and where the
>  * session’s end time is >= earliestSessionEndTime, i.e, the oldest 
> session to
>  * merge with, and the session’s start time is <= latestSessionStartTime, 
> i.e,
>  * the newest session to merge with.
>  */
>KeyValueIterator<Windowed<K>, AGG> findSessionsToMerge(final K key, final 
> long earliestSessionEndTime, final long latestSessionStartTime);
> {code}
> Also, the escaped html character references are missing the trailing 
> semicolon making them render as-is.
> Happy to have this assigned to me to fix as it seems trivial.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-5090) Kafka Streams SessionStore.findSessions javadoc broken

2017-04-19 Thread Michal Borowiecki (JIRA)
Michal Borowiecki created KAFKA-5090:


 Summary: Kafka Streams SessionStore.findSessions javadoc broken
 Key: KAFKA-5090
 URL: https://issues.apache.org/jira/browse/KAFKA-5090
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.2.0
Reporter: Michal Borowiecki
Priority: Trivial


{code}
/**
 * Fetch any sessions with the matching key and the sessions end is &le 
earliestEndTime and the sessions
 * start is &ge latestStartTime
 */
KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, long 
earliestSessionEndTime, final long latestSessionStartTime);
{code}

The conditions in the javadoc comment are inverted (le should be ge and ge 
should be le), since this is what the code does. They were correct in the 
original KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-94+Session+Windows
{code}

/**
 * Find any aggregated session values with the matching key and where the
 * session’s end time is >= earliestSessionEndTime, i.e, the oldest session 
to
 * merge with, and the session’s start time is <= latestSessionStartTime, 
i.e,
 * the newest session to merge with.
 */
   KeyValueIterator<Windowed<K>, AGG> findSessionsToMerge(final K key, final 
long earliestSessionEndTime, final long latestSessionStartTime);
{code}

Also, the escaped html character references are missing the trailing semicolon 
making them render as-is.

Happy to have this assigned to me to fix as it seems trivial.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-138: Change punctuate semantics

2017-04-16 Thread Michal Borowiecki

Hi Arun,

Thanks for putting the use cases on the wiki. I copied over your 
Terminology section to the main KIP page as I think it's super important 
to be clear on the terms.


I've made some changes while doing that which I highlight below, as I'd 
like to encourage comments on these.


1) I removed the mention of logical time, since the API strictly 
mandates "milliseconds since midnight, January 1, 1970 UTC" as opposed 
to any arbitrary logical time (even if it's not enforceable).


2) I broke up the definition of Stream Time into 2 separate terms: 
Stream Partition Time and Stream Time proper. This is for 2 reasons:


a) Follows the definition of Stream Time as it is stated on the 
ProcessorContext: 
https://github.com/apache/kafka/blob/0.10.2.0/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java#L159


b) The timestamp extractors are stealing all the thunder ;-)
There's been a lot of discussion about timestamp extractors and merits 
of event/processing time, however I haven't encountered much in terms of 
justification why the stream time is fixed to be the /_smallest_/ among 
all its input stream partition timestamps. I found a comment in the 
PartitionGroup: 
https://github.com/apache/kafka/blob/0.10.2.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java#L138

public long timestamp() {
// we should always return the smallest timestamp of all partitions
// to avoid group partition time goes backward

but I can't believe this to be the only reason behind this choice, as the 
minimum is not the only function to guarantee the group partition time 
never going back. Using the largest or the average among the partitions' 
timestamps would also guarantee the group timestamp not going back, as the 
timestamp never goes back for any individual partition.
So why was minimum chosen? Is it depended upon by window semantics 
somewhere, or anything else?
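
For instance (toy numbers of my own): with two partitions whose times advance 
3 -> 5 and 4 -> 9, the minimum goes 3 -> 5 while the maximum goes 4 -> 9, and 
both are non-decreasing, so monotonicity alone doesn't single out the minimum.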


3) I used the term Punctuate's timestamp argument instead of Punctuation 
Timestamp, since I found the latter sounds too similar to Punctuate Time


4) Rephrased Output Record Time. This is something I haven't given any 
thought before whatsoever. Is it still true to what you meant?



Comments appreciated, especially need input on 2b above.

Cheers,
Michal


On 10/04/17 12:58, Arun Mathew wrote:

Thanks Ewen.

@Michal, @all, I have created a child page to start the Use Cases discussion 
[https://cwiki.apache.org/confluence/display/KAFKA/Punctuate+Use+Cases]. Please 
go through it and give your comments.

@Tianji, Sorry for the delay. I am trying to make the patch public.

--
Arun Mathew

On 4/8/17, 02:00, "Ewen Cheslack-Postava" <e...@confluent.io> wrote:

 Arun,
 
 I've given you permission to edit the wiki. Let me know if you run into any

 issues.
 
 -Ewen
 
 On Fri, Apr 7, 2017 at 1:21 AM, Arun Mathew <amat...@yahoo-corp.jp> wrote:
 
 > Thanks Michal. I don’t have the access yet [arunmathew88]. Should I be

 > sending a separate mail for this?
 >
 > I thought one of the person following this thread would be able to give 
me
 > access.
 >
 >
 >
 > *From: *Michal Borowiecki <michal.borowie...@openbet.com>
 > *Reply-To: *"dev@kafka.apache.org" <dev@kafka.apache.org>
 > *Date: *Friday, April 7, 2017 at 17:16
 > *To: *"dev@kafka.apache.org" <dev@kafka.apache.org>
 > *Subject: *Re: [DISCUSS] KIP-138: Change punctuate semantics
 >
 >
 >
 > Hi Arun,
 >
 > I was thinking along the same lines as you, listing the use cases on the
 > wiki, but didn't find time to get around doing that yet.
 > Don't mind if you do it if you have access now.
 > I was thinking it would be nice if, once we have the use cases listed,
 > people could use likes to up-vote the use cases similar to what they're
 > working on.
 >
 > I should have a bit more time to action this in the next few days, but
 > happy for you to do it if you can beat me to it ;-)
 >
 > Cheers,
 > Michal
 >
 > On 07/04/17 04:39, Arun Mathew wrote:
 >
 > Sure, Thanks Matthias. My id is [arunmathew88].
 >
 >
 >
 > Of course. I was thinking of a subpage where people can collaborate.
 >
 >
 >
 > Will do as per Michael’s suggestion.
 >
 >
 >
 > Regards,
 >
 > Arun Mathew
 >
 >
 >
 > On 4/7/17, 12:30, "Matthias J. Sax" <matth...@confluent.io> 
<matth...@confluent.io> wrote:
 >
 >
 >
 > Please share your Wiki-ID and a committer can give you write access.
 >
 >
 >
 > Btw: as you did not initiate the KIP, you should not change the 

Re: 答复: Invoking KafkaConsumer#seek for the same partition

2017-04-13 Thread Michal Borowiecki

But I totally agree, the comment is ambiguous.

The way it's phrased now "latest offset" can easily be taken for "the 
highest(=latest) of the offsets" rather than "the offset last-used".


Cheers,

Michal


On 13/04/17 10:07, Hu Xi wrote:

Oh My! yes, you are right. I would have been thinking it that way  Thank 
you.

________
From: Michal Borowiecki <michal.borowie...@openbet.com>
Sent: 13 April 2017 17:02
To: dev@kafka.apache.org
Subject: Re: Invoking KafkaConsumer#seek for the same partition

Sounds to me the comment is imprecisely phrased but was meant to
indicate the behaviour you are describing.

Perhaps instead of "the latest offset", it should say, "the offset used
in the latest seek" to make it super-clear.

Cheers,

Michal


On 13/04/17 08:28, Hu Xi wrote:

Hi guys,


The comments for KafkaConsumer#seek says “If this API is invoked for the same 
partition more than once, the latest offset will be used on the next poll()”. 
However, I tried a couple of times, and it turned out that the next poll could 
always read records from the offset which was specified in the last call of 
KafkaConsumer#seek instead of the latest offset. Seems the comment is not 
correct.  What do you say? Any comments are welcomed.







Re: Invoking KafkaConsumer#seek for the same partition

2017-04-13 Thread Michal Borowiecki
Sounds to me the comment is imprecisely phrased but was meant to 
indicate the behaviour you are describing.


Perhaps instead of "the latest offset", it should say, "the offset used 
in the latest seek" to make it super-clear.
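
For example (a minimal sketch against the 0.10.x consumer API; topic name, 
bootstrap server and offsets are made up):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
TopicPartition tp = new TopicPartition("my-topic", 0);
consumer.assign(Collections.singletonList(tp));

consumer.seek(tp, 100L);
consumer.seek(tp, 50L);    // seek the same partition again
ConsumerRecords<String, String> records = consumer.poll(100L);
// poll() resumes from offset 50 - the offset used in the latest seek -
// not from the numerically highest offset ever passed to seek.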


Cheers,

Michal


On 13/04/17 08:28, Hu Xi wrote:

Hi guys,


The comments for KafkaConsumer#seek says “If this API is invoked for the same 
partition more than once, the latest offset will be used on the next poll()”. 
However, I tried a couple of times, and it turned out that the next poll could 
always read records from the offset which was specified in the last call of 
KafkaConsumer#seek instead of the latest offset. Seems the comment is not 
correct.  What do you say? Any comments are welcomed.





Re: [VOTE] 0.10.2.1 RC0

2017-04-12 Thread Michal Borowiecki
FWIW, I upgraded without issue and noticed the speedup from 
KAFKA-4851/KAFKA-4876.


+1 from me (non-binding)


On 12/04/17 02:06, Gwen Shapira wrote:

Wrong link :)
http://kafka.apache.org/documentation/#upgrade
and
http://kafka.apache.org/documentation/streams#streams_api_changes_0102

On Tue, Apr 11, 2017 at 5:57 PM, Gwen Shapira <g...@confluent.io> wrote:

FYI: I just updated the upgrade notes with Streams changes:
http://kafka.apache.org/documentation/#gettingStarted

On Fri, Apr 7, 2017 at 5:12 PM, Gwen Shapira <g...@confluent.io> wrote:

Hello Kafka users, developers and client-developers,

This is the first candidate for the release of Apache Kafka 0.10.2.1. This
is a bug fix release and it includes fixes and improvements from 24 JIRAs
(including a few critical bugs). See the release notes for more details:

http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc0/RELEASE_NOTES.html

*** Please download, test and vote by Thursday, 13 April, 8am PT ***

Your help in validating this bugfix release is super valuable, so
please take the time to test and vote!

Few notes:
1. There are missing "Notable Changes" in the docs:
https://github.com/apache/kafka/pull/2824
I will review, merge and update the docs by Monday.
2. The last commit (KAFKA-4943 chery-pick) did not pass system tests
yet. We may need another RC if system tests fail tonight.

Suggested tests:
  * Grab the source archive and make sure it compiles
  * Grab one of the binary distros and run the quickstarts against them
  * Extract and verify one of the site docs jars
  * Build a sample against jars in the staging repo
  * Validate GPG signatures on at least one file
  * Validate the javadocs look ok

*

Kafka's KEYS file containing PGP keys we use to sign the release:
http://kafka.apache.org/KEYS

* Release artifacts to be voted upon (source and binary):
http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc0/

* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging

* Javadoc:
http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc0/javadoc/

* Tag to be voted upon (off 0.10.0 branch) is the 0.10.0.1-rc0 tag:
https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=d08115f05da0e39c7f75b45e05d6d14ad5baf71d

* Documentation:
http://kafka.apache.org/0102/documentation.html

* Protocol:
http://kafka.apache.org/0102/protocol.html

Thanks,
Gwen Shapira



--
Gwen Shapira
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog









Re: [DISCUSS] KIP-138: Change punctuate semantics

2017-04-07 Thread Michal Borowiecki
 to mix approaches.
 >>>>>>>
 >>>>>>> Then the Processor API becomes:
 >>>>>>> punctuate(Time time) // time here denotes which schedule resulted
 >>>>>>> in
 >>>>>>> this call.
 >>>>>>>
 >>>>>>> Thoughts?
 >>>>>>>
 >>>>>>>
 >>>>>>> On Mon, 2017-04-03 at 22:44 -0700, Matthias J. Sax wrote:
 >>>>>>>>
 >>>>>>>> Thanks a lot for the KIP Michal,
 >>>>>>>>
 >>>>>>>> I was thinking about the four options you proposed in more
 >>>>>>>> details
 >>>>>>>> and
 >>>>>>>> this are my thoughts:
 >>>>>>>>
 >>>>>>>> (A) You argue, that users can still "punctuate" on event-time
 >> via
 >>>>>>>> process(), but I am not sure if this is possible. Note, that
 >>>>>>>> users
 >>>>>>>> only
 >>>>>>>> get record timestamps via context.timestamp(). Thus, users
 >> would
 >>>>>>>> need
 >>>>>>>> to
 >>>>>>>> track the time progress per partition (based on the partitions
 >>>>>>>> they
 >>>>>>>> obverse via context.partition(). (This alone puts a huge burden
 >>>>>>>> on
 >>>>>>>> the
 >>>>>>>> user by itself.) However, users are not notified at startup
 >> what
 >>>>>>>> partitions are assigned, and user are not notified when
 >>>>>>>> partitions
 >>>>>>>> get
 >>>>>>>> revoked. Because this information is not available, it's not
 >>>>>>>> possible
 >>>>>>>> to
 >>>>>>>> "manually advance" stream-time, and thus event-time punctuation
 >>>>>>>> within
 >>>>>>>> process() seems not to be possible -- or do you see a way to
 >> get
 >>>>>>>> it
 >>>>>>>> done? And even if, it might still be too clumsy to use.
 >>>>>>>>
 >>>>>>>> (B) This does not allow to mix both approaches, thus limiting
 >>>>>>>> what
 >>>>>>>> users
 >>>>>>>> can do.
 >>>>>>>>
 >>>>>>>> (C) This should give all flexibility we need. However, just
 >>>>>>>> adding
 >>>>>>>> one
 >>>>>>>> more method seems to be a solution that is too simple (cf my
 >>>>>>>> comments
 >>>>>>>> below).
 >>>>>>>>
 >>>>>>>> (D) This might be hard to use. Also, I am not sure how a user
 >>>>>>>> could
 >>>>>>>> enable system-time and event-time punctuation in parallel.
 >>>>>>>>
 >>>>>>>>
 >>>>>>>>
 >>>>>>>> Overall options (C) seems to be the most promising approach to
 >>>>>>>> me.
 >>>>>>>> Because I also favor a clean API, we might keep current
 >>>>>>>> punctuate()
 >>>>>>>> as-is, but deprecate it -- so we can remove it at some later
 >>>>>>>> point
 >>>>>>>> when
 >>>>>>>> people use the "new punctuate API".
 >>>>>>>>
 >>>>>>>>
 >>>>>>>> Couple of follow up questions:
 >>>>>>>>
 >>>>>>>> - I am wondering, if we should have two callback methods or
 >> just
 >>>>>>>> one
 >>>>>>>> (ie, a unified for system and event time punctuation or one for
 >>>>>>>> each?).
 >>>>>>>>
 >>>>>>>> - If we have one, how can the user figure out, which condition
 >>>

[jira] [Commented] (KAFKA-4971) Why is there no difference between kafka benchmark tests on SSD and HDD?

2017-04-04 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15955260#comment-15955260
 ] 

Michal Borowiecki commented on KAFKA-4971:
--

I'd venture a guess that you are limited by something other than your hdd/ssd 
performance.
Is 1g your total memory in the VM? How much of it is allocated to the kafka jvm 
process?
Some things I can think of:
Is there a lot of activity in the gc.log?
Is the OS not swapping ferociously due to over-allocation of memory by any 
chance?

Hope that helps.

> Why is there no difference between kafka benchmark tests on SSD and HDD? 
> -
>
> Key: KAFKA-4971
> URL: https://issues.apache.org/jira/browse/KAFKA-4971
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 0.10.0.0
> Environment: Oracle VM VirtualBox
> OS : CentOs 7
> Memory : 1G
> Disk : 8GB
>Reporter: Dasol Kim
>
> I installed OS and kafka in the two SSD and two HDDs  to perform the kafka 
> benchmark test based on the disc difference. As expected, the SSD should show 
> faster results, but according to my experimental results, there is no big 
> difference between SSD and HDD. why? Ohter settings have been set to default.
> *test settings
> zookeeper node  : 1, producer node : 2, broker node : 2(SSD 1, HDD 1)
> test scenario : Two producers send messages to the broker and compare the 
> throughtput per second of kafka installed on SSD and kafka on HDD
> command : ./bin/kafka-producer-perf-test.sh --num-records 100 
> --record-size 2000 --topic test --throughput 10 --producer-props 
> bootstrap.servers=SN02:9092
>  



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-138: Change punctuate semantics

2017-04-03 Thread Michal Borowiecki

Thanks Thomas,

I'm also wary of changing the existing semantics of punctuate, for 
backward compatibility reasons, although I like the conceptual 
simplicity of that option.


Adding a new method to me feels safer but, in a way, uglier. I added 
this to the KIP now as option (C).


The TimestampExtractor mechanism is actually more flexible, as it allows 
you to return any value; you're not limited to event time or system time 
(although I don't see an actual use case where you might need anything 
other than those two). Hence I also proposed the option to allow users 
to, effectively, decide what "stream time" is for them given the 
presence or absence of messages, much like they can decide what msg time 
means for them using the TimestampExtractor. What do you think about 
that? This is probably most flexible but also most complicated.
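
For reference, that existing extension point looks roughly like this in the 
0.10.2-era API (the fallback logic in the body is purely illustrative):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class WallclockFallbackExtractor implements TimestampExtractor {
    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
        final long ts = record.timestamp();
        // use the record's own timestamp when present, otherwise fall back to wall-clock time
        return ts >= 0 ? ts : System.currentTimeMillis();
    }
}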


All comments appreciated.

Cheers,

Michal


On 03/04/17 19:23, Thomas Becker wrote:

Although I fully agree we need a way to trigger periodic processing
that is independent from whether and when messages arrive, I'm not sure
I like the idea of changing the existing semantics across the board.
What if we added an additional callback to Processor that can be
scheduled similarly to punctuate() but was always called at fixed, wall
clock based intervals? This way you wouldn't have to give up the notion
of stream time to be able to do periodic processing.

On Mon, 2017-04-03 at 10:34 +0100, Michal Borowiecki wrote:

Hi all,

I have created a draft for KIP-138: Change punctuate semantics
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+
punctuate+semantics>
.

Appreciating there can be different views on system-time vs event-
time
semantics for punctuation depending on use-case and the importance of
backwards compatibility of any such change, I've left it quite open
and
hope to fill in more info as the discussion progresses.

Thanks,
Michal

--


 Tommy Becker

 Senior Software Engineer

 O +1 919.460.4747

 tivo.com









[jira] [Comment Edited] (KAFKA-4971) Why is there no difference between kafka benchmark tests on SSD and HDD?

2017-04-03 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953608#comment-15953608
 ] 

Michal Borowiecki edited comment on KAFKA-4971 at 4/3/17 2:54 PM:
--

I think your question would be easier to respond to if you quantified it by 
providing your test results and the drive specs.
Kafka IO access patterns are designed to be sequential for good reason. 
Spinning disks and OS level buffering are optimised for such IO patterns, but I 
don't know if that alone can account for the miss-match between your 
expectations and the results you are getting on your hardware.


was (Author: mihbor):
I think your question would be easier to respond to if you quantified it by 
providing your test results and the drive specs.
Kafka IO access patterns are designed to be sequential for good reason. 
Spinning disks and OS level buffering are optimised for such IO patterns, but I 
don't know if that alone can account for the miss-match between your 
expectations and the results your getting on your hardware.

> Why is there no difference between kafka benchmark tests on SSD and HDD? 
> -
>
> Key: KAFKA-4971
> URL: https://issues.apache.org/jira/browse/KAFKA-4971
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 0.10.0.0
> Environment: Oracle VM VirtualBox
> OS : CentOs 7
> Memory : 1G
> Disk : 8GB
>Reporter: Dasol Kim
>
> I installed OS and kafka in the two SSD and two HDDs  to perform the kafka 
> benchmark test based on the disc difference. As expected, the SSD should show 
> faster results, but according to my experimental results, there is no big 
> difference between SSD and HDD. why? Ohter settings have been set to default.
> *test settings
> zookeeper node  : 1, producer node : 2, broker node : 2(SSD 1, HDD 1)
> test scenario : Two producers send messages to the broker and compare the 
> throughtput per second of kafka installed on SSD and kafka on HDD
> command : ./bin/kafka-producer-perf-test.sh --num-records 100 
> --record-size 2000 --topic test --throughput 10 --producer-props 
> bootstrap.servers=SN02:9092
>  



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4971) Why is there no difference between kafka benchmark tests on SSD and HDD?

2017-04-03 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953608#comment-15953608
 ] 

Michal Borowiecki commented on KAFKA-4971:
--

I think your question would be easier to respond to if you quantified it by 
providing your test results and the drive specs.
Kafka IO access patterns are designed to be sequential for good reason. 
Spinning disks and OS level buffering are optimised for such IO patterns, but I 
don't know if that alone can account for the miss-match between your 
expectations and the results your getting on your hardware.

> Why is there no difference between kafka benchmark tests on SSD and HDD? 
> -
>
> Key: KAFKA-4971
> URL: https://issues.apache.org/jira/browse/KAFKA-4971
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 0.10.0.0
> Environment: Oracle VM VirtualBox
> OS : CentOs 7
> Memory : 1G
> Disk : 8GB
>Reporter: Dasol Kim
>
> I installed OS and kafka in the two SSD and two HDDs  to perform the kafka 
> benchmark test based on the disc difference. As expected, the SSD should show 
> faster results, but according to my experimental results, there is no big 
> difference between SSD and HDD. why? Ohter settings have been set to default.
> *test settings
> zookeeper node  : 1, producer node : 2, broker node : 2(SSD 1, HDD 1)
> test scenario : Two producers send messages to the broker and compare the 
> throughtput per second of kafka installed on SSD and kafka on HDD
> command : ./bin/kafka-producer-perf-test.sh --num-records 100 
> --record-size 2000 --topic test --throughput 10 --producer-props 
> bootstrap.servers=SN02:9092
>  



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[DISCUSS] KIP-138: Change punctuate semantics

2017-04-03 Thread Michal Borowiecki
Hi all,

I have created a draft for KIP-138: Change punctuate semantics
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics>.

Appreciating there can be different views on system-time vs event-time
semantics for punctuation depending on use-case and the importance of
backwards compatibility of any such change, I've left it quite open and
hope to fill in more info as the discussion progresses.

Thanks,
Michal


[jira] [Commented] (KAFKA-4835) Allow users control over repartitioning

2017-03-13 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15907523#comment-15907523
 ] 

Michal Borowiecki commented on KAFKA-4835:
--

My point above was that if the customerRef (which is what we partition by) was 
part of the key (not the whole key) then we'd still need to modify the key for 
the purpose of the join operation.
We'd need to do that for both streams, even though they would both be 
partitioned by the same part of the key, hence the re-partitioning (forced 
automatically by kafka streams) would be totally unnecessary.

In more generic terms, I think this can be a common use case. Let's consider it 
using DDD concepts. We have an aggregate composed of multiple entities. We 
send messages for each entity (not the whole aggregate), but to ensure 
sequential processing of entities belonging to the same aggregate, the 
messages are partitioned by the aggregate id. The entity id is still important; 
especially for compacted topics it is needed for deletion markers, as the 
key is all there is in that case. Hence it comes naturally to compose the 
message key as <entity-type>:<aggregate-id>:<entity-id>.
Then, if you want to join two such streams by aggregate id, you should be able 
to do it without repartitioning (since both are partitioned by the aggregate-id 
part of the msg key). However, since joins are only supported on the whole msg 
key, you're forced to re-map the key to just the aggregate id prior to the join, 
which in turn currently forces repartitioning.
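To make that concrete, here is a minimal sketch of what we're forced to do today 
(topic names, value types and the key format are made up for illustration, and it 
follows the document's bare-snippet style with default serdes assumed): both streams 
get re-keyed to the aggregate id before the join, and those map() calls are what 
flip repartitionRequired.

{code:java}
KStreamBuilder builder = new KStreamBuilder();

// Keys look like "<entity-type>:<aggregate-id>:<entity-id>" and both topics
// are already partitioned by the aggregate-id part of the key.
KStream<String, String> orders   = builder.stream("orders");
KStream<String, String> payments = builder.stream("payments");

// Re-key to just the aggregate id so the join can match records.
// Each map() sets repartitionRequired, so the join below creates two
// repartition topics even though the data is already co-partitioned.
KStream<String, String> ordersByAggregate = orders
        .map((key, value) -> new KeyValue<>(key.split(":")[1], value));
KStream<String, String> paymentsByAggregate = payments
        .map((key, value) -> new KeyValue<>(key.split(":")[1], value));

ordersByAggregate
        .join(paymentsByAggregate,
              (order, payment) -> order + "|" + payment,
              JoinWindows.of(60000L))
        .to("joined");
{code}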

> Allow users control over repartitioning
> ---
>
> Key: KAFKA-4835
> URL: https://issues.apache.org/jira/browse/KAFKA-4835
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>    Reporter: Michal Borowiecki
>  Labels: needs-kip
>
> From 
> https://issues.apache.org/jira/browse/KAFKA-4601?focusedCommentId=15881030=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15881030
> ...it would be good to provide users more control over the repartitioning. 
> My use case is as follows (unrelated bits omitted for brevity):
> {code}
>   KTable<String, Activity> loggedInCustomers = builder
>   .stream("customerLogins")
>   .groupBy((key, activity) -> 
>   activity.getCustomerRef())
>   .reduce((first,second) -> second, loginStore());
>   
>   builder
>   .stream("balanceUpdates")
>   .map((key, activity) -> new KeyValue<>(
>   activity.getCustomerRef(),
>   activity))
>   .join(loggedInCustomers, (activity, session) -> ...
>   .to("sessions");
> {code}
> Both "groupBy" and "map" in the underlying implementation set the 
> repartitionRequired flag (since the key changes), and the aggregation/join 
> that follows will create the repartitioned topic.
> However, in our case I know that both input streams are already partitioned 
> by the customerRef value, which I'm mapping into the key (because it's 
> required by the join operation).
> So there are 2 unnecessary intermediate topics created with their associated 
> overhead, while the ultimate goal is simply to do a join on a value that we 
> already use to partition the original streams anyway.
> (Note, we don't have the option to re-implement the original input streams to 
> make customerRef the message key.)
> I think it would be better to allow the user to decide (from their knowledge 
> of the incoming streams) whether a repartition is mandatory on aggregation 
> and join operations (overloaded version of the methods with the 
> repartitionRequired flag exposed maybe?)
> An alternative would be to allow users to perform a join on a value other 
> than the key (a keyValueMapper parameter to join, like the one used for joins 
> with global tables), but I expect that to be more involved and error-prone to 
> use for people who don't understand the partitioning requirements well 
> (whereas it's safe for global tables).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4835) Allow users control over repartitioning

2017-03-10 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15904785#comment-15904785
 ] 

Michal Borowiecki commented on KAFKA-4835:
--

Yes, that's the case. Message key is a concatenation of activity type + 
activity id but the partitioning is done on the customer.

NB. I don't think it was wise of us not to put the key we partition on in the 
msg key; however, that ship has sailed, I'm afraid.
Still, my understanding is that even if the partitioning key were part of the 
msg key (e.g. activity type + customerRef + activity id), we'd still be using a 
custom partitioner and we'd still have this issue.
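For context, a sketch of the kind of custom partitioner I mean, assuming (purely 
for illustration) a key of the form activityType:customerRef:activityId and 
partitioning on the customerRef part only:

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class CustomerRefPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // Partition on the customerRef part of the key, not the whole key.
        String customerRef = ((String) key).split(":")[1];
        int numPartitions = cluster.partitionCountForTopic(topic);
        byte[] bytes = customerRef.getBytes(StandardCharsets.UTF_8);
        return Utils.toPositive(Utils.murmur2(bytes)) % numPartitions;
    }

    @Override
    public void close() { }
}
{code}

Because the partition is derived from only part of the key, Kafka Streams has no 
way to know the topics are co-partitioned by customerRef, so it still insists on 
repartitioning after any key change.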

> Allow users control over repartitioning
> ---
>
> Key: KAFKA-4835
> URL: https://issues.apache.org/jira/browse/KAFKA-4835
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>    Reporter: Michal Borowiecki
>  Labels: needs-kip
>
> From 
> https://issues.apache.org/jira/browse/KAFKA-4601?focusedCommentId=15881030=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15881030
> ...it would be good to provide users more control over the repartitioning. 
> My use case is as follows (unrelated bits omitted for brevity):
> {code}
>   KTable<String, Activity> loggedInCustomers = builder
>   .stream("customerLogins")
>   .groupBy((key, activity) -> 
>   activity.getCustomerRef())
>   .reduce((first,second) -> second, loginStore());
>   
>   builder
>   .stream("balanceUpdates")
>   .map((key, activity) -> new KeyValue<>(
>   activity.getCustomerRef(),
>   activity))
>   .join(loggedInCustomers, (activity, session) -> ...
>   .to("sessions");
> {code}
> Both "groupBy" and "map" in the underlying implementation set the 
> repartitionRequired flag (since the key changes), and the aggregation/join 
> that follows will create the repartitioned topic.
> However, in our case I know that both input streams are already partitioned 
> by the customerRef value, which I'm mapping into the key (because it's 
> required by the join operation).
> So there are 2 unnecessary intermediate topics created with their associated 
> overhead, while the ultimate goal is simply to do a join on a value that we 
> already use to partition the original streams anyway.
> (Note, we don't have the option to re-implement the original input streams to 
> make customerRef the message key.)
> I think it would be better to allow the user to decide (from their knowledge 
> of the incoming streams) whether a repartition is mandatory on aggregation 
> and join operations (overloaded version of the methods with the 
> repartitionRequired flag exposed maybe?)
> An alternative would be to allow users to perform a join on a value other 
> than the key (a keyValueMapper parameter to join, like the one used for joins 
> with global tables), but I expect that to be more involved and error-prone to 
> use for people who don't understand the partitioning requirements well 
> (whereas it's safe for global tables).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-03-06 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897771#comment-15897771
 ] 

Michal Borowiecki commented on KAFKA-3514:
--

Oh, I wouldn't mind that at all. Just thought that you wanted to stick to event 
time semantics for this, but if you're not precious about that then I'm all for 
it :)

> Stream timestamp computation needs some further thoughts
> 
>
> Key: KAFKA-3514
> URL: https://issues.apache.org/jira/browse/KAFKA-3514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Eno Thereska
>  Labels: architecture
> Fix For: 0.11.0.0
>
>
> Our current stream task's timestamp is used for punctuate function as well as 
> selecting which stream to process next (i.e. best effort stream 
> synchronization). And it is defined as the smallest timestamp over all 
> partitions in the task's partition group. This results in two unintuitive 
> corner cases:
> 1) observing a late arrived record would keep that stream's timestamp low for 
> a period of time, and hence keep being process until that late record. For 
> example take two partitions within the same task annotated by their 
> timestamps:
> {code}
> Stream A: 5, 6, 7, 8, 9, 1, 10
> {code}
> {code}
> Stream B: 2, 3, 4, 5
> {code}
> The late arrived record with timestamp "1" will cause stream A to be selected 
> continuously in the thread loop, i.e. messages with timestamp 5, 6, 7, 8, 9 
> until the record itself is dequeued and processed, then stream B will be 
> selected starting with timestamp 2.
> 2) an empty buffered partition will cause its timestamp to be not advanced, 
> and hence the task timestamp as well since it is the smallest among all 
> partitions. This may not be a severe problem compared with 1) above though.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-03-06 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897413#comment-15897413
 ] 

Michal Borowiecki commented on KAFKA-3514:
--

Thank you for responding.
Just now I had a thought about the semantics of event time.
It is already possible to provide a TimestampExtractor that determines what the 
event time is, given a message.
It's not far-fetched to assume users would also want a way to specify what the 
event time is in the absence of messages (on one or more input partitions), 
possibly by providing an implementation other than what 
PartitionGroup.timestamp() currently does based on the timestamps of its partitions.
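For reference, the per-record hook that exists today (0.10.x signature; later 
versions add a second parameter), just to contrast it with the missing 
"no messages" hook suggested above. The class name and timestamp source are 
illustrative:

{code:java}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class RecordTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(ConsumerRecord<Object, Object> record) {
        // Event time per record, e.g. the timestamp embedded in the record.
        // There is no equivalent callback for advancing event time when a
        // partition has no records, which is what I'm suggesting above.
        return record.timestamp();
    }
}
{code}

It is plugged in via the timestamp.extractor config 
(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG).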

> Stream timestamp computation needs some further thoughts
> 
>
> Key: KAFKA-3514
> URL: https://issues.apache.org/jira/browse/KAFKA-3514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Eno Thereska
>  Labels: architecture
> Fix For: 0.11.0.0
>
>
> Our current stream task's timestamp is used for punctuate function as well as 
> selecting which stream to process next (i.e. best effort stream 
> synchronization). And it is defined as the smallest timestamp over all 
> partitions in the task's partition group. This results in two unintuitive 
> corner cases:
> 1) observing a late arrived record would keep that stream's timestamp low for 
> a period of time, and hence keep being process until that late record. For 
> example take two partitions within the same task annotated by their 
> timestamps:
> {code}
> Stream A: 5, 6, 7, 8, 9, 1, 10
> {code}
> {code}
> Stream B: 2, 3, 4, 5
> {code}
> The late arrived record with timestamp "1" will cause stream A to be selected 
> continuously in the thread loop, i.e. messages with timestamp 5, 6, 7, 8, 9 
> until the record itself is dequeued and processed, then stream B will be 
> selected starting with timestamp 2.
> 2) an empty buffered partition will cause its timestamp to be not advanced, 
> and hence the task timestamp as well since it is the smallest among all 
> partitions. This may not be a severe problem compared with 1) above though.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-03-06 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897371#comment-15897371
 ] 

Michal Borowiecki commented on KAFKA-3514:
--

Hi [~enothereska],
I have to disagree. It is perfectly clear to me (from the documentation) that 
punctuate is based on event time, not system time. However, the problem is 
that event time is not advanced reliably, since a single input stream that 
doesn't receive messages will cause the event time not to advance. In an extreme 
case of a poorly partitioned topic, I can imagine some partition may never get 
a message. That would cause a topology that has that partition as input to not 
advance event time ever, hence not fire punctuate ever, regardless of the 
presence of messages on its other input topics. In my opinion, if the purpose 
of punctuate is to perform periodic operations, then this flaw makes it unfit 
for that purpose. 

> Stream timestamp computation needs some further thoughts
> 
>
> Key: KAFKA-3514
> URL: https://issues.apache.org/jira/browse/KAFKA-3514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Eno Thereska
>  Labels: architecture
> Fix For: 0.11.0.0
>
>
> Our current stream task's timestamp is used for punctuate function as well as 
> selecting which stream to process next (i.e. best effort stream 
> synchronization). And it is defined as the smallest timestamp over all 
> partitions in the task's partition group. This results in two unintuitive 
> corner cases:
> 1) observing a late arrived record would keep that stream's timestamp low for 
> a period of time, and hence keep being process until that late record. For 
> example take two partitions within the same task annotated by their 
> timestamps:
> {code}
> Stream A: 5, 6, 7, 8, 9, 1, 10
> {code}
> {code}
> Stream B: 2, 3, 4, 5
> {code}
> The late arrived record with timestamp "1" will cause stream A to be selected 
> continuously in the thread loop, i.e. messages with timestamp 5, 6, 7, 8, 9 
> until the record itself is dequeued and processed, then stream B will be 
> selected starting with timestamp 2.
> 2) an empty buffered partition will cause its timestamp to be not advanced, 
> and hence the task timestamp as well since it is the smallest among all 
> partitions. This may not be a severe problem compared with 1) above though.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4601) Avoid duplicated repartitioning in KStream DSL

2017-03-03 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15894498#comment-15894498
 ] 

Michal Borowiecki commented on KAFKA-4601:
--

Created KAFKA-4835.

> Avoid duplicated repartitioning in KStream DSL
> --
>
> Key: KAFKA-4601
> URL: https://issues.apache.org/jira/browse/KAFKA-4601
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: performance
>
> Consider the following DSL:
> {code}
> Stream<String, String> source = builder.stream(Serdes.String(), 
> Serdes.String(), "topic1").map(..);
> KTable<String, Long> counts = source
> .groupByKey()
> .count("Counts");
> KStream<String, String> sink = source.leftJoin(counts, ..);
> {code}
> The resulted topology looks like this:
> {code}
> ProcessorTopology:
>   KSTREAM-SOURCE-00:
>   topics: [topic1]
>   children:   [KSTREAM-MAP-01]
>   KSTREAM-MAP-01:
>   children:   
> [KSTREAM-FILTER-04, KSTREAM-FILTER-07]
>   KSTREAM-FILTER-04:
>   children:   
> [KSTREAM-SINK-03]
>   KSTREAM-SINK-03:
>   topic:  X-Counts-repartition
>   KSTREAM-FILTER-07:
>   children:   
> [KSTREAM-SINK-06]
>   KSTREAM-SINK-06:
>   topic:  
> X-KSTREAM-MAP-01-repartition
> ProcessorTopology:
>   KSTREAM-SOURCE-08:
>   topics: 
> [X-KSTREAM-MAP-01-repartition]
>   children:   
> [KSTREAM-LEFTJOIN-09]
>   KSTREAM-LEFTJOIN-09:
>   states: [Counts]
>   KSTREAM-SOURCE-05:
>   topics: [X-Counts-repartition]
>   children:   
> [KSTREAM-AGGREGATE-02]
>   KSTREAM-AGGREGATE-02:
>   states: [Counts]
> {code}
> I.e. there are two repartition topics, one for the aggregate and one for the 
> join, which not only introduce unnecessary overheads but also mess up the 
> processing ordering (users are expecting each record to go through 
> aggregation first then the join operator). And in order to get the following 
> simpler topology users today need to add a {{through}} operator after {{map}} 
> manually to enforce repartitioning.
> {code}
> ProcessorTopology:
>   KSTREAM-SOURCE-00:
>   topics: [topic1]
>   children:   [KSTREAM-MAP-01]
>   KSTREAM-MAP-01:
>   children:   
> [KSTREAM-SINK-02]
>   KSTREAM-SINK-02:
>   topic:  topic 2
> ProcessorTopology:
>   KSTREAM-SOURCE-03:
>   topics: [topic 2]
>   children:   
> [KSTREAM-AGGREGATE-04, KSTREAM-LEFTJOIN-05]
>   KSTREAM-AGGREGATE-04:
>   states: [Counts]
>   KSTREAM-LEFTJOIN-05:
>   states: [Counts]
> {code} 
> This kind of optimization should be automatic in Streams, which we can 
> consider doing when extending from one-operator-at-a-time translation.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4835) Allow users control over repartitioning

2017-03-03 Thread Michal Borowiecki (JIRA)
Michal Borowiecki created KAFKA-4835:


 Summary: Allow users control over repartitioning
 Key: KAFKA-4835
 URL: https://issues.apache.org/jira/browse/KAFKA-4835
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 0.10.2.0
Reporter: Michal Borowiecki


From 
https://issues.apache.org/jira/browse/KAFKA-4601?focusedCommentId=15881030=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15881030

...it would be good to provide users more control over the repartitioning. 
My use case is as follows (unrelated bits omitted for brevity):
{code}
KTable<String, Activity> loggedInCustomers = builder
.stream("customerLogins")
.groupBy((key, activity) -> 
activity.getCustomerRef())
.reduce((first,second) -> second, loginStore());

builder
.stream("balanceUpdates")
.map((key, activity) -> new KeyValue<>(
activity.getCustomerRef(),
activity))
.join(loggedInCustomers, (activity, session) -> ...
.to("sessions");
{code}
Both "groupBy" and "map" in the underlying implementation set the 
repartitionRequired flag (since the key changes), and the aggregation/join that 
follows will create the repartitioned topic.
However, in our case I know that both input streams are already partitioned by 
the customerRef value, which I'm mapping into the key (because it's required by 
the join operation).
So there are 2 unnecessary intermediate topics created with their associated 
overhead, while the ultimate goal is simply to do a join on a value that we 
already use to partition the original streams anyway.
(Note, we don't have the option to re-implement the original input streams to 
make customerRef the message key.)

I think it would be better to allow the user to decide (from their knowledge of 
the incoming streams) whether a repartition is mandatory on aggregation and 
join operations (overloaded version of the methods with the repartitionRequired 
flag exposed maybe?)
An alternative would be to allow users to perform a join on a value other than 
the key (a keyValueMapper parameter to join, like the one used for joins with 
global tables), but I expect that to be more involved and error-prone to use 
for people who don't understand the partitioning requirements well (whereas 
it's safe for global tables).
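To illustrate the second alternative, this is roughly what the global-table API 
already allows in 0.10.2: the KeyValueMapper selects the join attribute, so the 
stream's key never has to change and no repartition topic is created. Topic and 
type names below are made up, and it assumes the lookup side can be maintained as 
a GlobalKTable, which is not the case for our KTable built by aggregation:

{code:java}
GlobalKTable<String, Session> sessionsByCustomer =
        builder.globalTable("customerSessions", "customerSessionsStore");

builder.<String, Activity>stream("balanceUpdates")
        .join(sessionsByCustomer,
              // join on customerRef extracted from the value, not on the msg key
              (key, activity) -> activity.getCustomerRef(),
              (activity, session) -> activity /* combine as needed */)
        .to("sessions");
{code}

Something with the same shape for stream/table and stream/stream joins would 
cover our use case.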




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4601) Avoid duplicated repartitioning in KStream DSL

2017-02-23 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881030#comment-15881030
 ] 

Michal Borowiecki commented on KAFKA-4601:
--

Don't know if this belongs in this ticket or warrants a separate one, but I'd 
suggest, instead of trying to rely on kstreams doing more automatic 
optimization, it would be good to provide users more control over the 
repartitioning. 
My use case is as follows (unrelated bits omitted for brevity):
{code}
KTable<String, Activity> loggedInCustomers = builder
.stream("customerLogins")
.groupBy((key, activity) -> 
activity.getCustomerRef())
.reduce((first,second) -> second, loginStore());

builder
.stream("balanceUpdates")
.map((key, activity) -> new KeyValue<>(
activity.getCustomerRef(),
activity))
.join(loggedInCustomers, (activity, session) -> ...
.to("sessions");
{code}
Both "groupBy" and "map" in the underlying implementation set the 
repartitionRequired flag (since the key changes), and the aggregation/join that 
follows will create the repartitioned topic.
However, in our case I know that both input streams are already partitioned by 
the customerRef value, which I'm mapping into the key (because it's required by 
the join operation).
So there are 2 unnecessary intermediate topics created with their associated 
overhead, while the ultimate goal is simply to do a join on a value that we 
already use to partition the original streams anyway.
(Note, we don't have the option to re-implement the original input streams to 
make customerRef the message key.)

I think it would be better to allow the user to decide (from their knowledge of 
the incoming streams) whether a repartition is mandatory on aggregation and 
join operations (overloaded version of the methods with the repartitionRequired 
flag exposed maybe?)
An alternative would be to allow users to perform a join on a value other than 
the key (a keyValueMapper parameter to join, like the one used for joins with 
global tables), but I expect that to be more involved and error-prone to use 
for people who don't understand the partitioning requirements well (whereas 
it's safe for global tables).


> Avoid duplicated repartitioning in KStream DSL
> --
>
> Key: KAFKA-4601
> URL: https://issues.apache.org/jira/browse/KAFKA-4601
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: performance
>
> Consider the following DSL:
> {code}
> Stream<String, String> source = builder.stream(Serdes.String(), 
> Serdes.String(), "topic1").map(..);
> KTable<String, Long> counts = source
> .groupByKey()
> .count("Counts");
> KStream<String, String> sink = source.leftJoin(counts, ..);
> {code}
> The resulted topology looks like this:
> {code}
> ProcessorTopology:
>   KSTREAM-SOURCE-00:
>   topics: [topic1]
>   children:   [KSTREAM-MAP-01]
>   KSTREAM-MAP-01:
>   children:   
> [KSTREAM-FILTER-04, KSTREAM-FILTER-07]
>   KSTREAM-FILTER-04:
>   children:   
> [KSTREAM-SINK-03]
>   KSTREAM-SINK-03:
>   topic:  X-Counts-repartition
>   KSTREAM-FILTER-07:
>   children:   
> [KSTREAM-SINK-06]
>   KSTREAM-SINK-06:
>   topic:  
> X-KSTREAM-MAP-01-repartition
> ProcessorTopology:
>   KSTREAM-SOURCE-08:
>   topics: 
> [X-KSTREAM-MAP-01-repartition]
>   children:   
> [KSTREAM-LEFTJOIN-09]
>   KSTREAM-LEFTJOIN-09:
>   states: [Counts]
>   KSTREAM-SOURCE-05:
>  

[jira] [Created] (KAFKA-4750) KeyValueIterator returns null values

2017-02-09 Thread Michal Borowiecki (JIRA)
Michal Borowiecki created KAFKA-4750:


 Summary: KeyValueIterator returns null values
 Key: KAFKA-4750
 URL: https://issues.apache.org/jira/browse/KAFKA-4750
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.1.1
Reporter: Michal Borowiecki


The API for ReadOnlyKeyValueStore.range method promises the returned iterator 
will not return null values. However, after upgrading from 0.10.0.0 to 0.10.1.1 
we found null values are returned causing NPEs on our side.

I found this happens after removing entries from the store, and it resembles 
the SAMZA-94 defect. The problem seems to be the same as it was there: when 
deleting entries with a serializer that does not return null when null is 
passed in, the state store doesn't actually delete that key/value pair, but the 
iterator will return a null value for that key.

When I modified our serializer to return null when null is passed in, the 
problem went away. However, I believe this should be fixed in Kafka Streams, 
perhaps with a similar approach as SAMZA-94.
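For reference, this is the shape of the serializer change that made the problem 
go away for us; the value type and class name are just an example:

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;

public class NullSafeStringSerializer implements Serializer<String> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public byte[] serialize(String topic, String data) {
        // Returning null for a null value lets the state store treat the put
        // as a delete, so range() no longer yields entries with null values.
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void close() { }
}
{code}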



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2016-11-21 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15683056#comment-15683056
 ] 

Michal Borowiecki commented on KAFKA-3514:
--

IMO, 2) *is* a severe problem. Punctuate methods (as described by their API) 
are meant to perform periodic operations. As it currently stands, if any of the 
input topics doesn't receive messages regularly, the punctuate method won't be 
called regularly either (due to the minimum timestamp across all partitions not 
advancing), which violates what the API promises. 
We've worked around it in our app by creating an independent stream and a 
scheduler sending ticks regularly to an input topic feeding a Transformer, so that 
its punctuate method is called predictably, but this is far from ideal.
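A rough sketch of that workaround (0.10.x API; topic, class and interval are made 
up): an external scheduler publishes a tick record to a dedicated input topic at a 
fixed interval, which keeps stream time advancing so punctuate() keeps firing even 
when the business topics are quiet.

{code:java}
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

public class PeriodicWorkTransformer
        implements Transformer<String, String, KeyValue<String, String>> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        context.schedule(60000L); // request punctuate roughly every minute of stream time
    }

    @Override
    public KeyValue<String, String> transform(String key, String value) {
        // Tick records only exist to advance stream time; drop them here.
        return "tick".equals(key) ? null : KeyValue.pair(key, value);
    }

    @Override
    public KeyValue<String, String> punctuate(long timestamp) {
        // The periodic work goes here; it only fires reliably because the
        // scheduler's ticks keep the task's stream time moving.
        return null;
    }

    @Override
    public void close() { }
}
{code}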

> Stream timestamp computation needs some further thoughts
> 
>
> Key: KAFKA-3514
> URL: https://issues.apache.org/jira/browse/KAFKA-3514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: architecture
> Fix For: 0.10.2.0
>
>
> Our current stream task's timestamp is used for punctuate function as well as 
> selecting which stream to process next (i.e. best effort stream 
> synchronization). And it is defined as the smallest timestamp over all 
> partitions in the task's partition group. This results in two unintuitive 
> corner cases:
> 1) observing a late arrived record would keep that stream's timestamp low for 
> a period of time, and hence keep being process until that late record. For 
> example take two partitions within the same task annotated by their 
> timestamps:
> {code}
> Stream A: 5, 6, 7, 8, 9, 1, 10
> {code}
> {code}
> Stream B: 2, 3, 4, 5
> {code}
> The late arrived record with timestamp "1" will cause stream A to be selected 
> continuously in the thread loop, i.e. messages with timestamp 5, 6, 7, 8, 9 
> until the record itself is dequeued and processed, then stream B will be 
> selected starting with timestamp 2.
> 2) an empty buffered partition will cause its timestamp to be not advanced, 
> and hence the task timestamp as well since it is the smallest among all 
> partitions. This may not be a severe problem compared with 1) above though.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4355) StreamThread intermittently dies with "Topic not found during partition assignment" when broker restarted

2016-11-02 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15628442#comment-15628442
 ] 

Michal Borowiecki commented on KAFKA-4355:
--

KAFKA-4366 created for the KafkaStreams.close() hanging issue.

> StreamThread intermittently dies with "Topic not found during partition 
> assignment" when broker restarted
> -
>
> Key: KAFKA-4355
> URL: https://issues.apache.org/jira/browse/KAFKA-4355
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0, 0.10.0.0
> Environment: kafka 0.10.0.0
> kafka 0.10.1.0
> uname -a
> Linux lp02485 4.4.0-34-generic #53~14.04.1-Ubuntu SMP Wed Jul 27 16:56:40 UTC 
> 2016 x86_64 x86_64 x86_64 GNU/Linux
> java -version
> java version "1.8.0_92"
> Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)
>Reporter: Michal Borowiecki
>Assignee: Guozhang Wang
>
> When (a) starting kafka streams app before the broker or
> (b) restarting the broker while the streams app is running:
> the stream thread intermittently dies with "Topic not found during partition 
> assignment" StreamsException.
> This happens about between one in 5 or one in 10 times.
> Stack trace:
> {noformat}
> Exception in thread "StreamThread-2" 
> org.apache.kafka.streams.errors.StreamsException: Topic not found during 
> partition assignment: scheduler
>   at 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper.maxNumPartitions(DefaultPartitionGrouper.java:81)
>   at 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper.partitionGroups(DefaultPartitionGrouper.java:55)
>   at 
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:370)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:313)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:467)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:88)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:419)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:395)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:308)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> {noformat}
> Our app has 2 streams in it, consuming from 2 different topics.
> Sometimes the exception happens on both stream t

[jira] [Created] (KAFKA-4366) KafkaStreams.close() blocks indefinitely

2016-11-02 Thread Michal Borowiecki (JIRA)
Michal Borowiecki created KAFKA-4366:


 Summary: KafkaStreams.close() blocks indefinitely
 Key: KAFKA-4366
 URL: https://issues.apache.org/jira/browse/KAFKA-4366
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.0.1, 0.10.1.0
Reporter: Michal Borowiecki
Assignee: Guozhang Wang


KafkaStreams.close() method calls join on all its threads without a timeout, 
meaning indefinitely, which makes it prone to deadlocks and unfit to be used in 
shutdown hooks.

(KafkaStreams::close is used in numerous examples by confluent: 
https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/kafka-streams/src/main/java/io/confluent/examples/streams
 and 
https://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple/
 so we assumed it to be recommended practice)

A deadlock happens, for instance, if System.exit() is called from within the 
uncaughtExceptionHandler. (We need to call System.exit() from the 
uncaughtExceptionHandler because the KAFKA-4355 issue shuts down the StreamThread, 
and to recover we want the process to exit, as our infrastructure will then 
start it up again.)

The System.exit call (from the uncaughtExceptionHandler, which runs in the 
StreamThread) will execute the shutdown hook in a new thread and wait for that 
thread to join. If the shutdown hook calls KafkaStreams.close, it will in turn 
block waiting for the StreamThread to join, hence the deadlock.

Runtime.addShutdownHook javadocs state:
{quote}
Shutdown hooks run at a delicate time in the life cycle of a virtual machine 
and should therefore be coded defensively. They should, in particular, be 
written to be thread-safe and to avoid deadlocks insofar as possible
{quote}
and
{quote}
Shutdown hooks should also finish their work quickly.
{quote}
Therefore the current implementation of KafkaStreams.close() which waits 
forever for threads to join is completely unsuitable for use in a shutdown 
hook. 
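For completeness, a minimal sketch of the wiring that deadlocks; builder and props 
are assumed to be set up elsewhere:

{code:java}
final KafkaStreams streams = new KafkaStreams(builder, props);

// Shutdown hook calls close(), which joins all StreamThreads with no timeout.
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

streams.setUncaughtExceptionHandler((thread, throwable) -> {
    // Runs on the StreamThread that died. System.exit() triggers the shutdown
    // hook and then waits for it, while the hook's close() waits for this very
    // StreamThread to join, so neither can proceed and the JVM hangs.
    System.exit(1);
});

streams.start();
{code}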





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-4355) StreamThread intermittently dies with "Topic not found during partition assignment" when broker restarted

2016-10-29 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15618009#comment-15618009
 ] 

Michal Borowiecki edited comment on KAFKA-4355 at 10/29/16 12:18 PM:
-

Trying to work around this issue by calling System.exit from the 
UncaughtExceptionHandler (once the app dies, it will be re-started by our 
infrastructure).
We are adding a shutdown hook as per example here: 
http://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple/

{code:java}
Runtime.getRuntime().addShutdownHook(new 
Thread(schedulerStreams::close));
{code}

However, even though both stream threads report completion of shutdown:
{noformat}
[2016-10-29 12:32:10,616] INFO [StreamThread-2] stream-thread [StreamThread-2] 
Stream thread shutdown complete 
(org.apache.kafka.streams.processor.internals.StreamThread)
[2016-10-29 12:32:20,490] INFO [StreamThread-1] stream-thread [StreamThread-1] 
Stream thread shutdown complete 
(org.apache.kafka.streams.processor.internals.StreamThread)
{noformat}
and before that report the closing of their producers and consumers, the app is 
not stopped.
At least the following 2 threads remain active and keep logging:
{noformat}
[2016-10-29 12:37:05,625] DEBUG [main-SendThread(localhost:19374)] Got ping 
response for sessionid: 0x158101fc9590021 after 0ms 
(org.apache.zookeeper.ClientCnxn)
[2016-10-29 12:37:09,815] DEBUG [kafka-producer-network-thread | producer-1] 
Sending metadata request {topics=[scheduler]} to node 0 
(org.apache.kafka.clients.NetworkClient)
[2016-10-29 12:37:09,818] DEBUG [kafka-producer-network-thread | producer-1] 
Updated cluster metadata version 15 to Cluster(id = enenZ_SbQKaRlOyJKQMn_g, 
nodes = [lp02485.openbet:19373 (id: 0 rack: null)], partitions = 
[Partition(topic = scheduler, partition = 0, leader = 0, replicas = [0,], isr = 
[0,])]) (org.apache.kafka.clients.Metadata)
[2016-10-29 12:37:12,945] DEBUG [main-SendThread(localhost:19374)] Got ping 
response for sessionid: 0x158101fc9590022 after 0ms 
(org.apache.zookeeper.ClientCnxn)
{noformat}

"Stopped Kafka Stream process" is never logged, so the close method remains 
blocked on the join here, I suspect:
https://github.com/apache/kafka/blob/e876df8b37fc6ea54b0a0571306c4a833c919cda/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L227

PS. When we don't add the shutdown hook to call close(), the app exits 
correctly on System.exit(). I think it is pretty bad behaviour for the close() 
method to block indefinitely, so I'll raise a separate ticket, unless I find one 
already exists for that. Hopefully it should be easier to reproduce.


was (Author: mihbor):
Trying to work around this issue by calling System.exit from the 
UncaughtExceptionHandler (once the app dies, it will be re-started by our 
infrastructure).
We are adding a shutdown hook as per example here: 
http://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple/

{code:java}
Runtime.getRuntime().addShutdownHook(new 
Thread(schedulerStreams::close));
{code}

However, even though both stream threads report completion of shutdown:
{noformat}
[2016-10-29 12:32:10,616] INFO [StreamThread-2] stream-thread [StreamThread-2] 
Stream thread shutdown complete 
(org.apache.kafka.streams.processor.internals.StreamThread)
[2016-10-29 12:32:20,490] INFO [StreamThread-1] stream-thread [StreamThread-1] 
Stream thread shutdown complete 
(org.apache.kafka.streams.processor.internals.StreamThread)
{noformat}
and before that report the closing of their producers and consumers, the app is 
not stopped.
At least the following 2 threads remain active and keep logging:
{noformat}
[2016-10-29 12:37:05,625] DEBUG [main-SendThread(localhost:19374)] Got ping 
response for sessionid: 0x158101fc9590021 after 0ms 
(org.apache.zookeeper.ClientCnxn)
[2016-10-29 12:37:09,815] DEBUG [kafka-producer-network-thread | producer-1] 
Sending metadata request {topics=[scheduler]} to node 0 
(org.apache.kafka.clients.NetworkClient)
[2016-10-29 12:37:09,818] DEBUG [kafka-producer-network-thread | producer-1] 
Updated cluster metadata version 15 to Cluster(id = enenZ_SbQKaRlOyJKQMn_g, 
nodes = [lp02485.openbet:19373 (id: 0 rack: null)], partitions = 
[Partition(topic = scheduler, partition = 0, leader = 0, replicas = [0,], isr = 
[0,])]) (org.apache.kafka.clients.Metadata)
[2016-10-29 12:37:12,945] DEBUG [main-SendThread(localhost:19374)] Got ping 
response for sessionid: 0x158101fc9590022 after 0ms 
(org.apache.zookeeper.ClientCnxn)
{noformat}

"Stopped Kafka Stream process" is never logged, so the close method remains 
blocked on the join here, I suspect:
https://github.com/apache/kafka/blob/e876df8b37fc6ea54b0a0571306c4a833c919cda/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L227

> StreamThread intermittently dies with "Topi

[jira] [Commented] (KAFKA-4355) StreamThread intermittently dies with "Topic not found during partition assignment" when broker restarted

2016-10-29 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15618009#comment-15618009
 ] 

Michal Borowiecki commented on KAFKA-4355:
--

Trying to work around this issue by calling System.exit from the 
UncaughtExceptionHandler (once the app dies, it will be re-started by our 
infrastructure).
We are adding a shutdown hook as per example here: 
http://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple/

{code:java}
Runtime.getRuntime().addShutdownHook(new 
Thread(schedulerStreams::close));
{code}

However, even though both stream threads report completion of shutdown:
{noformat}
[2016-10-29 12:32:10,616] INFO [StreamThread-2] stream-thread [StreamThread-2] 
Stream thread shutdown complete 
(org.apache.kafka.streams.processor.internals.StreamThread)
[2016-10-29 12:32:20,490] INFO [StreamThread-1] stream-thread [StreamThread-1] 
Stream thread shutdown complete 
(org.apache.kafka.streams.processor.internals.StreamThread)
{noformat}
and before that report the closing of their producers and consumers, the app is 
not stopped.
At least the following 2 threads remain active and keep logging:
{noformat}
[2016-10-29 12:37:05,625] DEBUG [main-SendThread(localhost:19374)] Got ping 
response for sessionid: 0x158101fc9590021 after 0ms 
(org.apache.zookeeper.ClientCnxn)
[2016-10-29 12:37:09,815] DEBUG [kafka-producer-network-thread | producer-1] 
Sending metadata request {topics=[scheduler]} to node 0 
(org.apache.kafka.clients.NetworkClient)
[2016-10-29 12:37:09,818] DEBUG [kafka-producer-network-thread | producer-1] 
Updated cluster metadata version 15 to Cluster(id = enenZ_SbQKaRlOyJKQMn_g, 
nodes = [lp02485.openbet:19373 (id: 0 rack: null)], partitions = 
[Partition(topic = scheduler, partition = 0, leader = 0, replicas = [0,], isr = 
[0,])]) (org.apache.kafka.clients.Metadata)
[2016-10-29 12:37:12,945] DEBUG [main-SendThread(localhost:19374)] Got ping 
response for sessionid: 0x158101fc9590022 after 0ms 
(org.apache.zookeeper.ClientCnxn)
{noformat}

"Stopped Kafka Stream process" is never logged, so the close method remains 
blocked on the join here, I suspect:
https://github.com/apache/kafka/blob/e876df8b37fc6ea54b0a0571306c4a833c919cda/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L227

> StreamThread intermittently dies with "Topic not found during partition 
> assignment" when broker restarted
> -
>
> Key: KAFKA-4355
> URL: https://issues.apache.org/jira/browse/KAFKA-4355
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0, 0.10.0.0
> Environment: kafka 0.10.0.0
> kafka 0.10.1.0
> uname -a
> Linux lp02485 4.4.0-34-generic #53~14.04.1-Ubuntu SMP Wed Jul 27 16:56:40 UTC 
> 2016 x86_64 x86_64 x86_64 GNU/Linux
> java -version
> java version "1.8.0_92"
> Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)
>Reporter: Michal Borowiecki
>Assignee: Guozhang Wang
>
> When (a) starting kafka streams app before the broker or
> (b) restarting the broker while the streams app is running:
> the stream thread intermittently dies with "Topic not found during partition 
> assignment" StreamsException.
> This happens about between one in 5 or one in 10 times.
> Stack trace:
> {noformat}
> Exception in thread "StreamThread-2" 
> org.apache.kafka.streams.errors.StreamsException: Topic not found during 
> partition assignment: scheduler
>   at 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper.maxNumPartitions(DefaultPartitionGrouper.java:81)
>   at 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper.partitionGroups(DefaultPartitionGrouper.java:55)
>   at 
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:370)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:313)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:467)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:88)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:419)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:395)
>   at 
> org.apac

[jira] [Commented] (KAFKA-4355) StreamThread intermittently dies with "Topic not found during partition assignment" when broker restarted

2016-10-28 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15616118#comment-15616118
 ] 

Michal Borowiecki commented on KAFKA-4355:
--

Perhaps the DefaultPartitionGrouper here:
https://github.com/apache/kafka/blob/e7663a306f40e9fcbc3096d17fb0f99fa3d11d1d/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java#L81
should throw a RetriableException instead of a StreamsException?
AbstractCoordinator would then keep looping instead of re-throwing it:
https://github.com/apache/kafka/blob/d092673838173d9dedbf5acf3f4e2cd8c736294f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L320

> StreamThread intermittently dies with "Topic not found during partition 
> assignment" when broker restarted
> -
>
> Key: KAFKA-4355
> URL: https://issues.apache.org/jira/browse/KAFKA-4355
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0, 0.10.0.0
> Environment: kafka 0.10.0.0
> kafka 0.10.1.0
> uname -a
> Linux lp02485 4.4.0-34-generic #53~14.04.1-Ubuntu SMP Wed Jul 27 16:56:40 UTC 
> 2016 x86_64 x86_64 x86_64 GNU/Linux
> java -version
> java version "1.8.0_92"
> Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)
>Reporter: Michal Borowiecki
>Assignee: Guozhang Wang
>
> When (a) starting kafka streams app before the broker or
> (b) restarting the broker while the streams app is running:
> the stream thread intermittently dies with "Topic not found during partition 
> assignment" StreamsException.
> This happens about between one in 5 or one in 10 times.
> Stack trace:
> {noformat}
> Exception in thread "StreamThread-2" 
> org.apache.kafka.streams.errors.StreamsException: Topic not found during 
> partition assignment: scheduler
>   at 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper.maxNumPartitions(DefaultPartitionGrouper.java:81)
>   at 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper.partitionGroups(DefaultPartitionGrouper.java:55)
>   at 
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:370)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:313)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:467)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:88)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:419)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:395)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:308)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
>   at 
> org.apache.k

[jira] [Commented] (KAFKA-4355) StreamThread intermittently dies with "Topic not found during partition assignment" when broker restarted

2016-10-28 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15615967#comment-15615967
 ] 

Michal Borowiecki commented on KAFKA-4355:
--

My first suspect so far is the ConsumerCoordinator.
In this line: 
https://github.com/apache/kafka/blob/d092673838173d9dedbf5acf3f4e2cd8c736294f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L301
it sets topics on the metadata from subscriptions, which the debugger shows to 
contain the correct topic name.
4 lines later: 
https://github.com/apache/kafka/blob/d092673838173d9dedbf5acf3f4e2cd8c736294f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L305
it calls client.ensureFreshMetadata(), which can override the topics list.

Debugger shows that in the problematic case in 
https://github.com/apache/kafka/blob/d092673838173d9dedbf5acf3f4e2cd8c736294f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L313
 the passed metadata object already has an empty set of topics, while the 
subscriptions object contains the topic name.

So I think the topic was removed from the metadata in line 305. 

> StreamThread intermittently dies with "Topic not found during partition 
> assignment" when broker restarted
> -
>
> Key: KAFKA-4355
> URL: https://issues.apache.org/jira/browse/KAFKA-4355
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0, 0.10.0.0
> Environment: kafka 0.10.0.0
> kafka 0.10.1.0
> uname -a
> Linux lp02485 4.4.0-34-generic #53~14.04.1-Ubuntu SMP Wed Jul 27 16:56:40 UTC 
> 2016 x86_64 x86_64 x86_64 GNU/Linux
> java -version
> java version "1.8.0_92"
> Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)
>Reporter: Michal Borowiecki
>Assignee: Guozhang Wang
>
> When (a) starting kafka streams app before the broker or
> (b) restarting the broker while the streams app is running:
> the stream thread intermittently dies with "Topic not found during partition 
> assignment" StreamsException.
> This happens about between one in 5 or one in 10 times.
> Stack trace:
> {noformat}
> Exception in thread "StreamThread-2" 
> org.apache.kafka.streams.errors.StreamsException: Topic not found during 
> partition assignment: scheduler
>   at 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper.maxNumPartitions(DefaultPartitionGrouper.java:81)
>   at 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper.partitionGroups(DefaultPartitionGrouper.java:55)
>   at 
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:370)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:313)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:467)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:88)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:419)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:395)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256)
> 

[jira] [Created] (KAFKA-4355) StreamThread intermittently dies with "Topic not found during partition assignment" when broker restarted

2016-10-28 Thread Michal Borowiecki (JIRA)
Michal Borowiecki created KAFKA-4355:


 Summary: StreamThread intermittently dies with "Topic not found 
during partition assignment" when broker restarted
 Key: KAFKA-4355
 URL: https://issues.apache.org/jira/browse/KAFKA-4355
 Project: Kafka
  Issue Type: Bug
  Components: streams
 Environment: kafka 0.10.0.0
kafka 0.10.1.0
Reporter: Michal Borowiecki
Assignee: Guozhang Wang


When (a) starting kafka streams app before the broker or
(b) restarting the broker while the streams app is running:
the stream thread intermittently dies with "Topic not found during partition 
assignment" StreamsException.
This happens roughly between one in 5 and one in 10 times.
Stack trace:
{noformat}
Exception in thread "StreamThread-2" 
org.apache.kafka.streams.errors.StreamsException: Topic not found during 
partition assignment: scheduler
at 
org.apache.kafka.streams.processor.DefaultPartitionGrouper.maxNumPartitions(DefaultPartitionGrouper.java:81)
at 
org.apache.kafka.streams.processor.DefaultPartitionGrouper.partitionGroups(DefaultPartitionGrouper.java:55)
at 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:370)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:313)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:467)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:88)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:419)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:395)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:308)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
{noformat}

Our app has 2 streams in it, consuming from 2 different topics.
Sometimes the exception happens on both stream threads. Sometimes only on one 
of the stream threads.

The exception is preceded by:
{noformat}
[2016-10-28 16:17:55,239] INFO [StreamThread-2] (Re-)joining group 
pool-scheduler (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2016-10-28 16:17:55,240] INFO [StreamThread-2] Marking the coordinator 
lp02485.openbet:19373 (id: 2147483647 rack: null) dead for group pool-scheduler 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2016-10-28 16:17:55,342] INFO [StreamThread-2] Discovered coordinator 
lp02485.openbet:19373 (id: 2147483647 rack: null) for group pool-scheduler. 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2016-10-28 16:17:55,342] INFO [StreamThread-2] (Re-)joining group 
pool-scheduler (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2016-10-28 16:17:55,357] INFO [StreamThread-2] stream-thread [StreamThread-2] 
Completed validating interna

[jira] [Updated] (KAFKA-4355) StreamThread intermittently dies with "Topic not found during partition assignment" when broker restarted

2016-10-28 Thread Michal Borowiecki (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michal Borowiecki updated KAFKA-4355:
-
Environment: 
kafka 0.10.0.0
kafka 0.10.1.0

uname -a
Linux lp02485 4.4.0-34-generic #53~14.04.1-Ubuntu SMP Wed Jul 27 16:56:40 UTC 
2016 x86_64 x86_64 x86_64 GNU/Linux

java -version
java version "1.8.0_92"
Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)


  was:
kafka 0.10.0.0
kafka 0.10.1.0


> StreamThread intermittently dies with "Topic not found during partition 
> assignment" when broker restarted
> -
>
> Key: KAFKA-4355
> URL: https://issues.apache.org/jira/browse/KAFKA-4355
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0, 0.10.0.0
> Environment: kafka 0.10.0.0
> kafka 0.10.1.0
> uname -a
> Linux lp02485 4.4.0-34-generic #53~14.04.1-Ubuntu SMP Wed Jul 27 16:56:40 UTC 
> 2016 x86_64 x86_64 x86_64 GNU/Linux
> java -version
> java version "1.8.0_92"
> Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)
>Reporter: Michal Borowiecki
>Assignee: Guozhang Wang
>
> When (a) starting kafka streams app before the broker or
> (b) restarting the broker while the streams app is running:
> the stream thread intermittently dies with "Topic not found during partition 
> assignment" StreamsException.
> This happens about between one in 5 or one in 10 times.
> Stack trace:
> {noformat}
> Exception in thread "StreamThread-2" 
> org.apache.kafka.streams.errors.StreamsException: Topic not found during 
> partition assignment: scheduler
>   at 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper.maxNumPartitions(DefaultPartitionGrouper.java:81)
>   at 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper.partitionGroups(DefaultPartitionGrouper.java:55)
>   at 
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:370)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:313)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:467)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:88)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:419)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:395)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:308)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)

[jira] [Updated] (KAFKA-4355) StreamThread intermittently dies with "Topic not found during partition assignment" when broker restarted

2016-10-28 Thread Michal Borowiecki (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michal Borowiecki updated KAFKA-4355:
-
Description: 
When (a) starting the Kafka Streams app before the broker, or
(b) restarting the broker while the streams app is running,
the stream thread intermittently dies with a "Topic not found during partition 
assignment" StreamsException.
This happens roughly one in 5 to one in 10 times.
Stack trace:
{noformat}
Exception in thread "StreamThread-2" 
org.apache.kafka.streams.errors.StreamsException: Topic not found during 
partition assignment: scheduler
at 
org.apache.kafka.streams.processor.DefaultPartitionGrouper.maxNumPartitions(DefaultPartitionGrouper.java:81)
at 
org.apache.kafka.streams.processor.DefaultPartitionGrouper.partitionGroups(DefaultPartitionGrouper.java:55)
at 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:370)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:313)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:467)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:88)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:419)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:395)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:308)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
{noformat}
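
The exception at the top of this trace is raised in 
DefaultPartitionGrouper.maxNumPartitions (line 81 in the trace): when the 
cluster metadata handed to the partition assignor has no partition info for 
one of the source topics, it throws and the StreamThread performing the 
assignment dies. Paraphrased, not the verbatim Kafka source, the check looks 
roughly like this:
{noformat}
import java.util.List;
import java.util.Set;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.streams.errors.StreamsException;

// Paraphrased sketch of DefaultPartitionGrouper.maxNumPartitions, not the
// verbatim Kafka source: missing topic metadata results in a StreamsException.
class PartitionGrouperSketch {
    int maxNumPartitions(Cluster metadata, Set<String> topics) {
        int max = 0;
        for (String topic : topics) {
            List<PartitionInfo> partitions = metadata.partitionsForTopic(topic);
            if (partitions == null || partitions.isEmpty()) {
                throw new StreamsException("Topic not found during partition assignment: " + topic);
            }
            max = Math.max(max, partitions.size());
        }
        return max;
    }
}
{noformat}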

Our app has 2 streams in it, consuming from 2 different topics.
Sometimes the exception happens on both stream threads, sometimes only on one 
of them.

The exception is preceded by:
{noformat}
[2016-10-28 16:17:55,239] INFO [StreamThread-2] (Re-)joining group 
pool-scheduler (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2016-10-28 16:17:55,240] INFO [StreamThread-2] Marking the coordinator 
lp02485.openbet:19373 (id: 2147483647 rack: null) dead for group pool-scheduler 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2016-10-28 16:17:55,342] INFO [StreamThread-2] Discovered coordinator 
lp02485.openbet:19373 (id: 2147483647 rack: null) for group pool-scheduler. 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2016-10-28 16:17:55,342] INFO [StreamThread-2] (Re-)joining group 
pool-scheduler (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2016-10-28 16:17:55,357] INFO [StreamThread-2] stream-thread [StreamThread-2] 
Completed validating internal topics in partition assignor 
(org.apache.kafka.streams.processor.internals.StreamPartitionAssignor)
[2016-10-28 16:17:55,357] INFO [StreamThread-2] stream-thread [StreamThread-2] 
Shutting down (org.apache.kafka.streams.processor.internals.StreamThread)
[2016-10-28 16:17:55,357] INFO [StreamThread-2] stream-thread [StreamThre
