Build failed in Jenkins: kafka-trunk-jdk10 #314

2018-07-24 Thread Apache Jenkins Server
See 

--
Started by an SCM change
[EnvInject] - Loading node environment variables.
Building remotely on H23 (ubuntu xenial) in workspace 

 > git rev-parse --is-inside-work-tree # timeout=10
Fetching changes from the remote Git repository
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
Fetching upstream changes from https://github.com/apache/kafka.git
 > git --version # timeout=10
 > git fetch --tags --progress https://github.com/apache/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
ERROR: Error fetching remote repo 'origin'
hudson.plugins.git.GitException: Failed to fetch from 
https://github.com/apache/kafka.git
at hudson.plugins.git.GitSCM.fetchFrom(GitSCM.java:888)
at hudson.plugins.git.GitSCM.retrieveChanges(GitSCM.java:1155)
at hudson.plugins.git.GitSCM.checkout(GitSCM.java:1186)
at hudson.scm.SCM.checkout(SCM.java:504)
at hudson.model.AbstractProject.checkout(AbstractProject.java:1208)
at 
hudson.model.AbstractBuild$AbstractBuildExecution.defaultCheckout(AbstractBuild.java:574)
at jenkins.scm.SCMCheckoutStrategy.checkout(SCMCheckoutStrategy.java:86)
at 
hudson.model.AbstractBuild$AbstractBuildExecution.run(AbstractBuild.java:499)
at hudson.model.Run.execute(Run.java:1794)
at hudson.model.FreeStyleBuild.run(FreeStyleBuild.java:43)
at hudson.model.ResourceController.execute(ResourceController.java:97)
at hudson.model.Executor.run(Executor.java:429)
Caused by: hudson.plugins.git.GitException: Command "git fetch --tags 
--progress https://github.com/apache/kafka.git 
+refs/heads/*:refs/remotes/origin/*" returned status code 128:
stdout: 
stderr: error: Could not read 0f3affc0f40751dc8fd064b36b6e859728f63e37
error: Could not read 95fbb2e03f4fe79737c71632e0ef2dfdcfb85a69
error: Could not read 08c465028d057ac23cdfe6d57641fe40240359dd
remote: Counting objects: 5896, done.
remote: Compressing objects:   1% (1/56) ... 98% (55/56)

[jira] [Created] (KAFKA-7203) Improve Streams StickyTaskAssignor

2018-07-24 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-7203:


 Summary: Improve Streams StickyTaskAssignor
 Key: KAFKA-7203
 URL: https://issues.apache.org/jira/browse/KAFKA-7203
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang


This discussion was inspired by the work on fixing KAFKA-7144.

Currently we are not striking a good trade-off between stickiness and 
workload balance: we honor the former more than the latter. One idea to 
improve on this is the following:

{code}
I'd like to propose a slightly different approach to fix KAFKA-7144 while making 
no worse trade-offs between stickiness and sub-topology balance. The key idea is 
to adjust the assignment so that the per-client distribution gets as close as 
possible to the sub-topologies' num.tasks distribution.

Here is a detailed workflow:

1. at the beginning, we first calculate for each client C how many tasks it 
should ideally be assigned, as num.total_tasks / num.total_capacity * 
C_capacity rounded down; call it C_a. Note that since we round this number 
down, the sum of C_a across all C would be <= num.total_tasks, but this does 
not matter.

2. then for each client C, based on its number of previously assigned tasks 
C_p, we calculate how many tasks it should take over or give up, as C_a - C_p 
(if it is positive, it should take over some; otherwise it should give up some).

Note that because of the rounding down, when we calculate C_a - C_p for each 
client, we need to make sure that the total number of give-ups and the total 
number of take-overs are equal; some ad-hoc heuristics can be used.

3. then we calculate the task distribution across the sub-topologies as a 
whole. For example, if we have three sub-topologies st0, st1 and st2, and st0 
has 4 total tasks, st1 has 4 total tasks, and st2 has 8 total tasks, then the 
distribution between st0, st1 and st2 should be 1:1:2. Let's call it the global 
distribution, and note that currently, since num.tasks per sub-topology never 
changes, this distribution should NEVER change.

4. then for each client that should give up some tasks, we decide which tasks 
it should give up so that the remaining task distribution is proportional to the 
above global distribution.

For example, if a client previously owned 4 tasks of st0, no tasks of st1, and 2 
tasks of st2, and now it needs to give up 3 tasks, it should then give up 2 of 
st0 and 1 of st2, so that the remaining distribution is closer to 1:1:2.

5. now that we've collected a list of given-up tasks, plus the ones that do not 
have any previous active assignment (normally this should not happen in 
operations, since all tasks should have been created since day one), we migrate 
them to the clients that need to take over some, similarly in proportion to the 
global distribution.

For example, if a client previously owned 1 task of st0 and nothing of st1 and 
st2, and now it needs to take over 3 tasks, we would try to give it 1 task of 
st1 and 2 tasks of st2, so that the resulting distribution becomes 1:1:2. And we 
ONLY consider prev-standby tasks when we decide which tasks of st1 / st2 to 
give to that client.

Now, consider the following scenarios:

a) this is a clean start and there is no prev-assignment at all: step 4 would 
be a no-op, and the result should still be fine.

b) a client leaves the group: no client needs to give up and all clients may 
need to take over some, so step 4 is a no-op, and the list accumulated in step 5 
only contains the tasks of the departed client.

c) a new client joins the group: all clients need to give up some, and only the 
new client needs to take over all the given-up ones. Hence step 5 is 
straightforward.
{code}
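
For illustration only, here is a rough Java sketch of steps 1 and 2 above 
(computing each client's ideal task count C_a and the take-over / give-up delta 
C_a - C_p). The names and structure are hypothetical and are not the actual 
StickyTaskAssignor code.

{code:java}
import java.util.HashMap;
import java.util.Map;

public class CapacityTargetSketch {

    /**
     * Step 1: for each client, compute the ideal task count C_a as
     * floor(totalTasks / totalCapacity * clientCapacity).
     * Step 2: the delta C_a - C_p says how many tasks the client should
     * take over (positive) or give up (negative).
     */
    static Map<String, Integer> taskDeltas(Map<String, Integer> capacities,
                                           Map<String, Integer> prevAssigned,
                                           int totalTasks) {
        int totalCapacity = capacities.values().stream().mapToInt(Integer::intValue).sum();
        Map<String, Integer> deltas = new HashMap<>();
        for (Map.Entry<String, Integer> client : capacities.entrySet()) {
            int ideal = (int) Math.floor((double) totalTasks / totalCapacity * client.getValue()); // C_a
            int prev = prevAssigned.getOrDefault(client.getKey(), 0);                              // C_p
            deltas.put(client.getKey(), ideal - prev);
        }
        // Because of the rounding down, the positive and negative deltas may not sum
        // to zero; the ad-hoc heuristics mentioned above reconcile them.
        return deltas;
    }
}
{code}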



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7144) Kafka Streams doesn't properly balance partition assignment

2018-07-24 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-7144.
--
   Resolution: Fixed
Fix Version/s: 2.1.0

> Kafka Streams doesn't properly balance partition assignment
> ---
>
> Key: KAFKA-7144
> URL: https://issues.apache.org/jira/browse/KAFKA-7144
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: James Cheng
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 2.1.0
>
> Attachments: OneThenTwelve.java
>
>
> Kafka Streams doesn't always spread the tasks across all available 
> instances/threads
> I have a topology which consumes a single-partition topic and goes .through() 
> a 12-partition topic. That makes 13 partitions.
>  
> I then started 2 instances of the application. I would have expected the 13 
> partitions to be split across the 2 instances roughly evenly (7 partitions on 
> one, 6 partitions on the other).
> Instead, one instance gets 12 partitions, and the other instance gets 1 
> partition.
>  
> Repro case attached. I ran it a couple times, and it was fairly repeatable.
> Setup for the repro:
> {code:bash}
> $ ./bin/kafka-topics.sh --zookeeper localhost --create --topic one 
> --partitions 1 --replication-factor 1 
> $ ./bin/kafka-topics.sh --zookeeper localhost --create --topic twelve 
> --partitions 12 --replication-factor 1
> $ echo foo | kafkacat -P -b 127.0.0.1 -t one
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [VOTE] KIP-338 Support to exclude the internal topics in kafka-topics.sh command

2018-07-24 Thread Chia-Ping Tsai
Current voting result is shown below.

+2 (binding)
1) Jason Gustafson
2) Gwen Shapira

+4 (non-binding) 
1) Colin McCabe
2) Manikumar
3) Dong Lin
4) Vahid S Hashemian

We need one more binding vote, please.

Cheers,
Chia-Ping

On 2018/07/16 08:10:51, Chia-Ping Tsai  wrote: 
> hi folks,
> 
> The discussion[1] of KIP-338[2] did not get any objection for the last 6 days, so 
> it is time to start the voting thread.
> 
> Thanks for your time!
> 
> [1] 
> https://lists.apache.org/thread.html/9bd4e61b73c901b51132ada49743b9b703d40b85fc4eeaa5c9099900@%3Cdev.kafka.apache.org%3E
> 
> [2] 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-338+Support+to+exclude+the+internal+topics+in+kafka-topics.sh+command
> 
> Cheers,
> chia-ping
> 


Re: [Discuss] KIP-321: Add method to get TopicNameExtractor in TopologyDescription

2018-07-24 Thread Nishanth Pradeep
I have updated the KIP.

Changes to the KIP:

   - Removed topics() from the Public Interface and Proposed Changes
   sections.
   - Added topics() to the Deprecation plan.
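
For readers skimming the thread, here is a rough sketch of the interface shape 
these changes imply (hypothetical; the final KIP-321 API may differ):

{code:java}
import java.util.Set;
import org.apache.kafka.streams.processor.TopicNameExtractor;

// Hypothetical sketch only, not the actual TopologyDescription interfaces.
interface TopologyDescriptionSketch {

    interface Source {
        @Deprecated
        String topics();         // existing method, deprecated per the plan above

        Set<String> topicSet();  // new method returning the topic names as a Set
    }

    interface Sink {
        TopicNameExtractor<?, ?> topicNameExtractor();  // the extractor this KIP exposes
    }
}
{code}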

Thanks again for the feedback.

Best,
Nishanth Pradeep

On Tue, Jul 24, 2018 at 11:21 AM Guozhang Wang  wrote:

> We should not remove it immediately in the upcoming 2.1 release. Usually
> we first mark an API as deprecated, and consider removing it only after it
> has been deprecated for at least one major release period.
>
>
> Guozhang
>
> On Mon, Jul 23, 2018 at 7:40 PM, Nishanth Pradeep 
> wrote:
>
> > Sounds good to me too.
> >
> > As far as deprecating goes -- should the topics() method removed
> completely
> > or should it have a @deprecated annotation for removal in some future
> > version?
> >
> > Best,
> > Nishanth Pradeep
> >
> > On Sun, Jul 22, 2018 at 1:32 PM Matthias J. Sax 
> > wrote:
> >
> > > Works for me.
> > >
> > > On 7/22/18 9:48 AM, Guozhang Wang wrote:
> > > > I think I can be convinced with deprecating topics() to keep API
> > minimal.
> > > >
> > > > About renaming the others with `XXNames()`: well, to me it feels
> still
> > > not
> > > > very worthy since although it is not a big burden, it seems also not
> a
> > > big
> > > > "return" if we name the newly added function `topicSet()`.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Fri, Jul 20, 2018 at 7:38 PM, Nishanth Pradeep <
> > nishanth...@gmail.com
> > > >
> > > > wrote:
> > > >
> > > >> I definitely agree with you on deprecating topics().
> > > >>
> > > >> I also think changing the method names for consistency is
> reasonable,
> > > since
> > > >> there is no functionality change. Although, I can be convinced
> either
> > > way
> > > >> on this one.
> > > >>
> > > >> Best,
> > > >> Nishanth Pradeep
> > > >> On Fri, Jul 20, 2018 at 12:15 PM Matthias J. Sax <
> > matth...@confluent.io
> > > >
> > > >> wrote:
> > > >>
> > > >>> I would still deprecate existing `topics()` method. If users need a
> > > >>> String, they can call `topicSet().toString()`.
> > > >>>
> > > >>> It's just a personal preference, because I believe it's good to
> keep
> > > the
> > > >>> API "minimal".
> > > >>>
> > > >>> About renaming the other methods: I thinks it's a very small burden
> > to
> > > >>> deprecate the existing methods and add them with new names. Also
> just
> > > my
> > > >>> 2 cents.
> > > >>>
> > > >>> Would be good to see what others think.
> > > >>>
> > > >>>
> > > >>> -Matthias
> > > >>>
> > > >>> On 7/19/18 6:20 PM, Nishanth Pradeep wrote:
> > >  Understood, Guozhang.
> > > 
> > >  Thanks for the help, everyone! I have updated the KIP. Let me know
> > if
> > > >> you
> > >  any other thoughts or suggestions.
> > > 
> > >  Best,
> > >  Nishanth Pradeep
> > > 
> > >  On Thu, Jul 19, 2018 at 7:33 PM Guozhang Wang  >
> > > >>> wrote:
> > > 
> > > > I see.
> > > >
> > > > Well, I think if we add a new function like topicSet() it is less
> > > >>> needed to
> > > > deprecate topics() as it returns "{topic1, topic2, ..}" which is
> > sort
> > > >> of
> > > > non-overlapping in usage with the new API.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Thu, Jul 19, 2018 at 5:31 PM, Nishanth Pradeep <
> > > >>> nishanth...@gmail.com>
> > > > wrote:
> > > >
> > > >> That is what I meant. I will add topicSet() instead of changing
> > the
> > > >> signature of topics() for compatibility reasons. But should we
> not
> > > >> add
> > > >>> a
> > > >> @deprecated flag for topics() or do you want to keep it around
> for
> > > >> the
> > > > long
> > > >> run?
> > > >>
> > > >> On Thu, Jul 19, 2018 at 7:27 PM Guozhang Wang <
> wangg...@gmail.com
> > >
> > > > wrote:
> > > >>
> > > >>> We cannot change the signature of the function named "topics"
> > from
> > > >> "String"
> > > >>> to "Set", as Matthias mentioned it is a compatibility
> > > >> breaking
> > > >>> change.
> > > >>>
> > > >>> That's why I was proposing add a new function like "Set
> > > >>> topicSet()", while keeping "String topics()" as is.
> > > >>>
> > > >>> Guozhang
> > > >>>
> > > >>> On Thu, Jul 19, 2018 at 5:22 PM, Nishanth Pradeep <
> > > > nishanth...@gmail.com
> > > >>>
> > > >>> wrote:
> > > >>>
> > >  Right, adding topicNames() instead of changing the return type
> > of
> > > >>> topics()
> > >  in order preserve backwards compatibility is a good idea. But
> is
> > > it
> > > > not
> > >  better to depreciate topics() because it would be redundant?
> In
> > > our
> > > >> case,
> > >  it would only be calling topicNames/topicSet#toString().
> > > 
> > >  I still agree that perhaps changing the other API's might be
> 

Re: [DISCUSS] KIP-346 - Limit blast radius of log compaction failure

2018-07-24 Thread Stanislav Kozlovski
Hey James, Ted,

@James - Thanks for showing me some of the changes, that was informative.

* *Log Cleaner Thread Revival* - I also acknowledge that could be useful.
My concern is that if the thread has died, there is most likely something
wrong with either the disk or the software, and since both are deterministic
(correct me if I'm wrong), we will most likely hit the same failure very soon
again. I am not sure that scenario would do any good, but I am also not sure
if it would hurt. Could it waste a significant amount of CPU by repeatedly
dying and restarting?

* *Partition Re-clean* - Hmm, maybe some sort of retry mechanism could be
worth exploring. I'd like to hear other people's opinion on this and
whether or not they've seen such scenarios before diving into possible
implementation.

* *Metric* - Could you point me to some resources showing how the JMX
metrics should be structured? I could not find any and am sadly not too
knowledgeable on the topic.

* *uncleanable-partitions* *metric* - Yes, that might be problematic. Maybe
the format Ted suggested would be best - "topic1-0,1,2" (a rough rendering
sketch follows after these points). Then again, I fear we might still run out
of characters. I am not sure how best to approach this yet.

* *Disk Problems* - I am aware that the 4 JIRAs are not related to disk
problems. I think this KIP brings the most value to exactly such scenarios
- ones where the disk is OK. But then again, I thought I'd suggest failing
the disk after a certain number of errors on it since it makes sense to me.
I do not have a strong opinion about this, though. Now that you mentioned
that this actually increases the blast radius - I tend to agree. Maybe we
should scrap this behavior.
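
As a purely illustrative aside, the grouped format discussed above could be
rendered along these lines (hypothetical helper, not part of the KIP):

{code:java}
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class UncleanablePartitionsFormatSketch {

    // Renders {"topic1" -> [0, 1, 2], "topic2" -> [3]} as "topic1-{0,1,2},topic2-{3}",
    // so each topic name appears only once.
    static String render(Map<String, List<Integer>> uncleanable) {
        return uncleanable.entrySet().stream()
                .map(e -> e.getKey() + "-{"
                        + e.getValue().stream().map(String::valueOf).collect(Collectors.joining(","))
                        + "}")
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> example = new TreeMap<>();
        example.put("topic1", Arrays.asList(0, 1, 2));
        example.put("topic2", Arrays.asList(3));
        System.out.println(render(example));  // prints: topic1-{0,1,2},topic2-{3}
    }
}
{code}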

Best,
Stanislav

On Tue, Jul 24, 2018 at 6:13 AM Ted Yu  wrote:

> As James pointed out in his reply, topic-partition name can be long.
> It is not necessary to repeat the topic name for each of its partitions.
> How about the following format:
>
> topic-name1-{partition1, partition2, etc}
>
> That is, topic name only appears once.
>
> Cheers
>
> On Mon, Jul 23, 2018 at 9:08 PM Stanislav Kozlovski <
> stanis...@confluent.io>
> wrote:
>
> > Hi Ted,
> >
> > Yes, absolutely. Thanks for pointing that out!
> >
> > On Mon, Jul 23, 2018 at 6:12 PM Ted Yu  wrote:
> >
> > > For `uncleanable-partitions`, should the example include topic name(s)
> ?
> > >
> > > Cheers
> > >
> > > On Mon, Jul 23, 2018 at 5:46 PM Stanislav Kozlovski <
> > > stanis...@confluent.io>
> > > wrote:
> > >
> > > > I renamed the KIP and that changed the link. Sorry about that. Here
> is
> > > the
> > > > new link:
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-346+-+Improve+LogCleaner+behavior+on+error
> > > >
> > > > On Mon, Jul 23, 2018 at 5:11 PM Stanislav Kozlovski <
> > > > stanis...@confluent.io>
> > > > wrote:
> > > >
> > > > > Hey group,
> > > > >
> > > > > I created a new KIP about making log compaction more
> fault-tolerant.
> > > > > Please give it a look here and please share what you think,
> > especially
> > > in
> > > > > regards to the points in the "Needs Discussion" paragraph.
> > > > >
> > > > > KIP: KIP-346
> > > > > <
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-346+-+Limit+blast+radius+of+log+compaction+failure
> > > > >
> > > > > --
> > > > > Best,
> > > > > Stanislav
> > > > >
> > > >
> > > >
> > > > --
> > > > Best,
> > > > Stanislav
> > > >
> > >
> >
> >
> > --
> > Best,
> > Stanislav
> >
>


-- 
Best,
Stanislav


Re: Processor API StateStore and Recovery with State Machines question.

2018-07-24 Thread Guozhang Wang
Hello Adam,

I figured that rather than answering your questions one by one, I'd give
you a more general explanation of how consumer offset commits, the changelog,
and the state store relate to each other.

If you have a state store update processor, the state store maintenance
workflow is this:


1) updating the state store:

1.a write to state store.
1.b write to changelog topic


Note that 1.a) could be async: the state store may have caching enabled, and
the state store itself may have its own write buffer (e.g. RocksDB);
1.b) is also async with batching enabled, and the actual send
request is done by another thread.

So at the end of 1.b) any outcome is possible: data has been written
persistently to the local files of the state store but has not been sent to
the changelog, or data has not been written persistently to local files yet
but has been sent to the changelog, or both have happened, or neither has
happened.


2) committing the state store:

2.a) flush state store (make sure all previous writes have been persisted)
2.b) flush on producer (make sure all previous writes to changelog topics
have been acknowledged).
2.c) commit offset.

That is, if committing succeeded, by the end of 2.c) all should be done,
and everything is consistent.

Now if there is a crash after 1.b) and before 2), then as said above, any of
these scenarios may happen, but note that the consumer's offsets will
definitely NOT be committed yet (that only happens in 2.c) ), so upon
restarting the data will be re-processed, and hence either the state store's
image or the changelog may contain duplicated results, aka "at-least-once".

3) Finally, when exactly-once is enabled, if there are any crashes, the
changelog topic / state store will be "rewinded" (I omit the implementation
details here, but just assume that logically, we can rewind them) to the
previously successful commit, so `exactly-once` is guaranteed.
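
To make the ordering concrete, here is a simplified, hypothetical sketch of the
sequence described above; the interfaces are placeholders, not the real Streams
or clients APIs.

{code:java}
// A simplified sketch of the update/commit ordering; all names are illustrative.
class StateStoreCommitSketch {

    interface Store   { void put(String key, String value); void flush(); }
    interface Log     { void append(String key, String value); void flush(); }  // changelog producer
    interface Offsets { void commit(); }                                         // consumer offsets

    // 1) updating the state store
    void update(String key, String value, Store store, Log changelog) {
        store.put(key, value);         // 1.a) may sit in a cache or RocksDB write buffer
        changelog.append(key, value);  // 1.b) batched and sent asynchronously
    }

    // 2) committing: only after 2.a and 2.b succeed are offsets committed, so a
    // crash before 2.c leads to reprocessing (at-least-once without EOS).
    void commit(Store store, Log changelog, Offsets offsets) {
        store.flush();      // 2.a) all previous local writes are persisted
        changelog.flush();  // 2.b) all previous changelog writes are acknowledged
        offsets.commit();   // 2.c) input offsets are committed last
    }
}
{code}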


Guozhang

On Sun, Jul 22, 2018 at 5:29 PM, Adam Bellemare 
wrote:

> Hi Folks
>
> I have a quick question about a scenario that I would appreciate some
> insight on. This is related to a KIP I am working on, but I wanted to break
> this out into its own scenario to reach a wider audience. In this scenario,
> I am using builder.internalTopologyBuilder to create the following within
> the internals of Kafka Streaming:
>
> 1) Internal Topic Source (builder.internalTopologyBuilder.addSource(...) )
>
> 2) ProcessorSupplier with StateStore, Changelogging enabled. For the
> purpose of this question, this processor is a very simple state machine.
> All it does is alternately block each other event, of a given key, from
> processing. For instance:
> (A,1)
> (A,2)
> (A,3)
> It would block the propagation of (A,2). The state of the system after
> processing each event is:
> blockNext = true
> blockNext = false
> blockNext = true
>
> The expectation is that this component would always block the same event, in
> any failure mode and subsequent recovery (ie: ALWAYS blocks (A,2), but not
> (A,1) or (A,3) ). In other words, it would maintain perfect state in
> accordance with the offsets of the upstream and downstream elements.
>
> 3) The third component is a KTable with a Materialized StateStore where I
> want to sink the remaining events. It is also backed by a change log. The
> events arriving would be:
> (A,1)
> (A,3)
>
> The components are ordered as:
> 1 -> 2 -> 3
>
>
> Note that I am keeping the state machine in a separate state store. My main
> questions are:
>
> 1) Will this workflow be consistent in all manners of failure? For example,
> are the state stores' changelogs fully written to internal topics before
> the offset is updated for the consumer in #1?
>
> 2) Is it possible that one State Store with changelogging will be logged to
> Kafka safely (say component #3) but the other (#2) will not be, prior to a
> sudden, hard termination of the node?
>
> 3) Is the alternate possible, where #2 is backed up to its Kafka Topic but
> #3 is not? Does the ordering of the topology matter in this case?
>
> 4) Is it possible that the state store #2 is updated and logged, but the
> source topic (#1) offset is not updated?
>
> In all of these cases, my main concern is keeping the state and the
> expected output consistent. For any failure mode, will I be able to recover
> to a fully consistent state given the requirements of the state machine in
> #2?
>
> Though this is a trivial example, I am not certain about the dynamics
> between maintaining state, recovering from internal changelog topics, and
> the order in which all of these things apply. Any words of wisdom or
> explanations would be helpful here. I have been looking through the code
> but I wanted to get second opinions on this.
>
>
>
> Thanks,
>
> Adam
>



-- 
-- Guozhang


Re: [VOTE] 2.0.0 RC3

2018-07-24 Thread Vahid S Hashemian
+1 (non-binding)

Built from source and ran quickstart successfully with both Java 8 and 
Java 9 on Ubuntu.
Thanks Rajini!

--Vahid




From:   Rajini Sivaram 
To: dev , Users , 
kafka-clients 
Date:   07/24/2018 08:33 AM
Subject:[VOTE] 2.0.0 RC3



Hello Kafka users, developers and client-developers,


This is the fourth candidate for release of Apache Kafka 2.0.0.


This is a major version release of Apache Kafka. It includes 40 new  KIPs
and

several critical bug fixes. Please see the 2.0.0 release plan for more
details:

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=80448820



A few notable highlights:

   - Prefixed wildcard ACLs (KIP-290), Fine grained ACLs for CreateTopics
   (KIP-277)
   - SASL/OAUTHBEARER implementation (KIP-255)
   - Improved quota communication and customization of quotas (KIP-219,
   KIP-257)
   - Efficient memory usage for down conversion (KIP-283)
   - Fix log divergence between leader and follower during fast leader
   failover (KIP-279)
   - Drop support for Java 7 and remove deprecated code including old 
scala
   clients
   - Connect REST extension plugin, support for externalizing secrets and
   improved error handling (KIP-285, KIP-297, KIP-298 etc.)
   - Scala API for Kafka Streams and other Streams API improvements
   (KIP-270, KIP-150, KIP-245, KIP-251 etc.)


Release notes for the 2.0.0 release:

http://home.apache.org/~rsivaram/kafka-2.0.0-rc3/RELEASE_NOTES.html



*** Please download, test and vote by Friday July 27, 4pm PT.


Kafka's KEYS file containing PGP keys we use to sign the release:

http://kafka.apache.org/KEYS



* Release artifacts to be voted upon (source and binary):

http://home.apache.org/~rsivaram/kafka-2.0.0-rc3/



* Maven artifacts to be voted upon:

https://repository.apache.org/content/groups/staging/



* Javadoc:

http://home.apache.org/~rsivaram/kafka-2.0.0-rc3/javadoc/



* Tag to be voted upon (off 2.0 branch) is the 2.0.0 tag:

https://github.com/apache/kafka/releases/tag/2.0.0-rc3


* Documentation:

http://kafka.apache.org/20/documentation.html



* Protocol:

http://kafka.apache.org/20/protocol.html



* Successful Jenkins builds for the 2.0 branch:

Unit/integration tests: 
https://builds.apache.org/job/kafka-2.0-jdk8/90/


System tests: 
https://jenkins.confluent.io/job/system-test-kafka/job/2.0/41/



/**


Thanks,



Rajini






Re: [DISCUSS] KIP-342 Add Customizable SASL extensions to OAuthBearer authentication

2018-07-24 Thread Stanislav Kozlovski
Hi Ron,

Thanks for the suggestions. I have applied them to the KIP.
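
For anyone following the immutability discussion further down in this thread,
here is a rough sketch of an immutable SaslExtensions-style value class
(hypothetical; the class in the KIP may differ):

{code:java}
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of an immutable value class; not the final KIP-342 API.
public final class SaslExtensionsSketch {

    private final Map<String, String> extensions;

    public SaslExtensionsSketch(Map<String, String> extensions) {
        // Copy the caller's map so later mutation by the caller cannot change this instance.
        this.extensions = Collections.unmodifiableMap(new HashMap<>(extensions));
    }

    // Returns an unmodifiable view, preserving the value-class semantics.
    public Map<String, String> map() {
        return extensions;
    }

    public Set<String> extensionNames() {
        return extensions.keySet();
    }

    public String extensionValue(String name) {
        return extensions.get(name);
    }
}
{code}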

On Tue, Jul 24, 2018 at 1:39 PM Ron Dagostino  wrote:

> Hi Stanislav.  The statement "New config option for OAuthBearerLoginModule"
> is technically incorrect; it should be "New config option for default,
> unsecured bearer tokens" since that is what provides the functionality (as
> opposed to the login module, which does not).  Also, please state that
> "auth" is not supported as a custom extension name with any
> SASL/OAUTHBEARER mechanism, including the unsecured one, since it is
> reserved by the spec for what is normally sent in the HTTP Authorization
> header; an attempt to use it will result in a configuration exception.
>
> Finally, please also state that while the OAuthBearerLoginModule and the
> OAuthBearerSaslClient will be changed to request the extensions from its
> callback handler, for backwards compatibility it is not necessary for the
> callback handler to support SaslExtensionsCallback -- any
> UnsupportedCallbackException that is thrown will be ignored and no
> extensions will be added.
>
> Ron
>
> On Tue, Jul 24, 2018 at 11:20 AM Stanislav Kozlovski <
> stanis...@confluent.io>
> wrote:
>
> > Hey everybody,
> >
> > I have updated the KIP to reflect the latest changes as best as I could.
> If
> > there aren't more suggestions, I intend to start the [VOTE] thread
> > tomorrow.
> >
> > Best,
> > Stanislav
> >
> > On Tue, Jul 24, 2018 at 6:34 AM Ron Dagostino  wrote:
> >
> > > Hi Stanislav.  Could you update the KIP to reflect the latest
> definition
> > of
> > > SaslExtensions and confirm or correct the impact it has to the
> > > SCRAM-related classes?  I'm not sure if the currently-described impact
> is
> > > still accurate.  Also, could you mention the changes to
> > > OAuthBearerUnsecuredLoginCallbackHandler in the text in addition to
> > giving
> > > the examples?  The examples show the new
> > > unsecuredLoginExtension_ feature, but that feature is
> not
> > > described anywhere prior to it appearing there.
> > >
> > > Ron
> > >
> > > On Mon, Jul 23, 2018 at 1:42 PM Ron Dagostino 
> wrote:
> > >
> > > > Hi Rajini.  I think a class is fine as long as we make sure the
> > semantics
> > > > of immutability are clear -- it would have to be a value class, and
> any
> > > > constructor that accepts a Map as input would have to copy that Map
> > > rather
> > > > than store it in a member variable.  Similarly, any Map that it might
> > > > return would have to be unmodifiable.
> > > >
> > > > Ron
> > > >
> > > > On Mon, Jul 23, 2018 at 12:24 PM Rajini Sivaram <
> > rajinisiva...@gmail.com
> > > >
> > > > wrote:
> > > >
> > > >> Hi Ron, Stanislav,
> > > >>
> > > >> I agree with Stanislav that it would be better to leave
> > `SaslExtensions`
> > > >> as
> > > >> a class rather than make it an interface. We don't really expect
> > users
> > > to
> > > >> extends this class, so it is convenient to have an implementation
> > since
> > > >> users need to create an instance. The class provided by the public
> API
> > > >> should be sufficient in the vast majority of the cases. Ron, do you
> > > agree?
> > > >>
> > > >> On Mon, Jul 23, 2018 at 11:35 AM, Ron Dagostino 
> > > >> wrote:
> > > >>
> > > >> > Hi Stanislav.  See
> https://tools.ietf.org/html/rfc7628#section-3.1,
> > > and
> > > >> > that section refers to the core ABNF productions defined in
> > > >> > https://tools.ietf.org/html/rfc5234#appendix-B.
> > > >> >
> > > >> > Ron
> > > >> >
> > > >> > > On Jul 23, 2018, at 1:30 AM, Stanislav Kozlovski <
> > > >> stanis...@confluent.io>
> > > >> > wrote:
> > > >> > >
> > > >> > > Hey Ron and Rajini,
> > > >> > >
> > > >> > > Here are my thoughts:
> > > >> > > Regarding separators in SaslExtensions - Agreed, that was a bad
> > > move.
> > > >> > > Should definitely not be a concern of CallbackHandler and
> > > LoginModule
> > > >> > > implementors.
> > > >> > > SaslExtensions interface - Wouldn't implementing it as an
> > interface
> > > >> mean
> > > >> > > that users will have to make sure they're passing in an
> > unmodifiable
> > > >> map
> > > >> > > themselves. I believe it would be better if we enforced that
> > through
> > > >> > class
> > > >> > > constructors instead.
> > > >> > > SaslExtensions#map() - I'd also prefer this. The reason I went
> > with
> > > >> > > `extensionValue` and `extensionNames` was because I figured it
> > made
> > > >> sense
> > > >> > > to have `ScramExtensions` extend `SaslExtensions` and therefore
> > have
> > > >> > their
> > > >> > > API be similar. In the end, do you think that it is worth it to
> > have
> > > >> > > `ScramExtensions` extend `SaslExtensions`?
> > > >> > > @Ron, could you point me to the SASL OAuth mechanism specific
> > > regular
> > > >> > > expressions for keys/values you mentioned are in RFC 7628 (
> > > >> > > https://tools.ietf.org/html/rfc7628) ? I could not find any
> while
> > > >> > > originally implementing this.
> > > >> > >
> > > >> > > Best,
> > > >> > > 

[jira] [Created] (KAFKA-7202) Support multiple auto-generated docs formats

2018-07-24 Thread Joel Hamill (JIRA)
Joel Hamill created KAFKA-7202:
--

 Summary: Support multiple auto-generated docs formats
 Key: KAFKA-7202
 URL: https://issues.apache.org/jira/browse/KAFKA-7202
 Project: Kafka
  Issue Type: New Feature
Reporter: Joel Hamill


Currently the configuration parameters for Confluent/Kafka are autogenerated as 
HTML (and hosted at [https://kafka.apache.org/documentation/#configuration]). 
This request is to expand this to support other formats (e.g. RST) so that they 
can be easily leveraged by other authoring language formats.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-342 Add Customizable SASL extensions to OAuthBearer authentication

2018-07-24 Thread Ron Dagostino
Hi Stanislav.  The statement "New config option for OAuthBearerLoginModule"
is technically incorrect; it should be "New config option for default,
unsecured bearer tokens" since that is what provides the functionality (as
opposed to the login module, which does not).  Also, please state that
"auth" is not supported as a custom extension name with any
SASL/OAUTHBEARER mechanism, including the unsecured one, since it is
reserved by the spec for what is normally sent in the HTTP Authorization
header; an attempt to use it will result in a configuration exception.

Finally, please also state that while the OAuthBearerLoginModule and the
OAuthBearerSaslClient will be changed to request the extensions from its
callback handler, for backwards compatibility it is not necessary for the
callback handler to support SaslExtensionsCallback -- any
UnsupportedCallbackException that is thrown will be ignored and no
extensions will be added.

Ron

On Tue, Jul 24, 2018 at 11:20 AM Stanislav Kozlovski 
wrote:

> Hey everybody,
>
> I have updated the KIP to reflect the latest changes as best as I could. If
> there aren't more suggestions, I intend to start the [VOTE] thread
> tomorrow.
>
> Best,
> Stanislav
>
> On Tue, Jul 24, 2018 at 6:34 AM Ron Dagostino  wrote:
>
> > Hi Stanislav.  Could you update the KIP to reflect the latest definition
> of
> > SaslExtensions and confirm or correct the impact it has to the
> > SCRAM-related classes?  I'm not sure if the currently-described impact is
> > still accurate.  Also, could you mention the changes to
> > OAuthBearerUnsecuredLoginCallbackHandler in the text in addition to
> giving
> > the examples?  The examples show the new
> > unsecuredLoginExtension_ feature, but that feature is not
> > described anywhere prior to it appearing there.
> >
> > Ron
> >
> > On Mon, Jul 23, 2018 at 1:42 PM Ron Dagostino  wrote:
> >
> > > Hi Rajini.  I think a class is fine as long as we make sure the
> semantics
> > > of immutability are clear -- it would have to be a value class, and any
> > > constructor that accepts a Map as input would have to copy that Map
> > rather
> > > than store it in a member variable.  Similarly, any Map that it might
> > > return would have to be unmodifiable.
> > >
> > > Ron
> > >
> > > On Mon, Jul 23, 2018 at 12:24 PM Rajini Sivaram <
> rajinisiva...@gmail.com
> > >
> > > wrote:
> > >
> > >> Hi Ron, Stanislav,
> > >>
> > >> I agree with Stanislav that it would be better to leave
> `SaslExtensions`
> > >> as
> > >> a class rather than make it an interface. We don't really expect
> users
> > to
> > >> extends this class, so it is convenient to have an implementation
> since
> > >> users need to create an instance. The class provided by the public API
> > >> should be sufficient in the vast majority of the cases. Ron, do you
> > agree?
> > >>
> > >> On Mon, Jul 23, 2018 at 11:35 AM, Ron Dagostino 
> > >> wrote:
> > >>
> > >> > Hi Stanislav.  See https://tools.ietf.org/html/rfc7628#section-3.1,
> > and
> > >> > that section refers to the core ABNF productions defined in
> > >> > https://tools.ietf.org/html/rfc5234#appendix-B.
> > >> >
> > >> > Ron
> > >> >
> > >> > > On Jul 23, 2018, at 1:30 AM, Stanislav Kozlovski <
> > >> stanis...@confluent.io>
> > >> > wrote:
> > >> > >
> > >> > > Hey Ron and Rajini,
> > >> > >
> > >> > > Here are my thoughts:
> > >> > > Regarding separators in SaslExtensions - Agreed, that was a bad
> > move.
> > >> > > Should definitely not be a concern of CallbackHandler and
> > LoginModule
> > >> > > implementors.
> > >> > > SaslExtensions interface - Wouldn't implementing it as an
> interface
> > >> mean
> > >> > > that users will have to make sure they're passing in an
> unmodifiable
> > >> map
> > >> > > themselves. I believe it would be better if we enforced that
> through
> > >> > class
> > >> > > constructors instead.
> > >> > > SaslExtensions#map() - I'd also prefer this. The reason I went
> with
> > >> > > `extensionValue` and `extensionNames` was because I figured it
> made
> > >> sense
> > >> > > to have `ScramExtensions` extend `SaslExtensions` and therefore
> have
> > >> > their
> > >> > > API be similar. In the end, do you think that it is worth it to
> have
> > >> > > `ScramExtensions` extend `SaslExtensions`?
> > >> > > @Ron, could you point me to the SASL OAuth mechanism specific
> > regular
> > >> > > expressions for keys/values you mentioned are in RFC 7628 (
> > >> > > https://tools.ietf.org/html/rfc7628) ? I could not find any while
> > >> > > originally implementing this.
> > >> > >
> > >> > > Best,
> > >> > > Stanislav
> > >> > >
> > >> > >> On Sun, Jul 22, 2018 at 6:46 PM Ron Dagostino  >
> > >> > wrote:
> > >> > >>
> > >> > >> Hi again, Rajini and Stanislav.  I wonder if making
> SaslExtensions
> > an
> > >> > >> interface rather than a class might be a good solution.  For
> > example:
> > >> > >>
> > >> > >> public interface SaslExtensions {
> > >> > >>   /**
> > >> > >>* @return an immutable map view of the SASL 

NotCoordinatorForGroupException in a long loop

2018-07-24 Thread Jamie Park
Hi,

org.apache.kafka.common.errors.NotCoordinatorForGroupException: This is not
the correct coordinator for this group.


When there is an under-replicated partition, it causes our
streams application to stall with NotCoordinatorForGroupException.

This retriable exception goes on in a loop for a long time. Eventually the
right coordinator is found, but I am not sure why the coordinator change
does NOT get detected during heartbeats. I also noticed the Coordinator
object becomes null in the AbstractCoordinator class.


Any help will be appreciated.


Thanks.


dev and prod cluster consumption rate are different

2018-07-24 Thread Jamie Park
Hi,

I have a Kafka Streams application that processes 250K events per minute in
our dev Kafka cluster, but in production, using the same binary, the
performance degrades dramatically to a few thousand per minute. What
could possibly be wrong?

1. The broker server.properties files are exactly the same in dev and prod.
2. The production cluster has a much greater data volume, number of topics,
and number of brokers.

Please help.


Re: [DISCUSS] KIP-289: Improve the default group id behavior in KafkaConsumer

2018-07-24 Thread Vahid S Hashemian
Hi Jason,

Thanks for clarifying.

So if we are going to continue supporting the empty group id as before 
(with only an addition of a deprecation warning), and disable 
enable.auto.commit for the new default (null) group id on the client side, 
do we really need to bump up the OffsetCommit version?

You mentioned "If an explicit empty string is configured for the group id, 
then maybe we keep the current behavior for compatibility" which makes 
sense to me, but I find it in conflict with your earlier suggestion "we 
just need to bump the OffsetCommit request API and only accept the offset 
commit for older versions.". Maybe I'm missing something?
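
Purely as an illustration of the proposal from a user's point of view (a sketch 
under the assumptions discussed above, not the final behavior), a consumer 
configured without a group id would look roughly like this and would rely on 
auto.offset.reset rather than committed offsets:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class NoGroupIdConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // No group.id is set: under the proposal the default becomes null, offset
        // commits (and enable.auto.commit) are disabled on the client side, and the
        // starting position comes from auto.offset.reset.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Setting group.id = "" explicitly would keep the old behavior for
        // compatibility, but log a deprecation warning (per the discussion above).
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // assign()/poll() as usual; no offsets would be committed for this consumer
        }
    }
}
{code}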

Thanks!
--Vahid




From:   Jason Gustafson 
To: dev 
Date:   07/23/2018 10:52 PM
Subject:Re: [DISCUSS] KIP-289: Improve the default group id 
behavior in KafkaConsumer



Hey Vahid,

Thanks for the updates. Just to clarify, I was suggesting that we disable
enable.auto.commit only if no explicit group.id is configured. If an
explicit empty string is configured for the group id, then maybe we keep
the current behavior for compatibility. We can log a warning mentioning 
the
deprecation and we can use the old version of the OffsetCommit API that
allows the empty group id. In a later release, we can drop this support in
the client. Does that seem reasonable?

By the way, instead of using the new ILLEGAL_OFFSET_COMMIT error code,
couldn't we use INVALID_GROUP_ID?

Thanks,
Jason



On Mon, Jul 23, 2018 at 5:14 PM, Stanislav Kozlovski 
 wrote:

> Hey Vahid,
>
> No I don't see an issue with it. I believe it to be the best approach.
>
> Best,
> Stanisav
>
> On Mon, Jul 23, 2018 at 12:41 PM Vahid S Hashemian <
> vahidhashem...@us.ibm.com> wrote:
>
> > Hi Stanislav,
> >
> > Thanks for the feedback.
> > Do you see an issue with using `null` as the default group id (as
> > addressed by Jason in his response)?
> > This default group id would not support offset commits and consumers
> would
> > use `auto.offset.reset` config when there is no current offset.
> >
> > Thanks.
> > --Vahid
> >
> >
> >
> >
> > From:   Stanislav Kozlovski 
> > To: dev@kafka.apache.org
> > Date:   07/20/2018 11:09 AM
> > Subject:Re: [DISCUSS] KIP-289: Improve the default group id
> > behavior in KafkaConsumer
> >
> >
> >
> > I agree with Jason's notion that
> > >  implicit use of the empty group.id to commit offsets is more likely
> to
> > be causing users unexpected problems than actually providing a useful
> > capability.
> > I was initially confused that this is the behavior when investigating 
a
> > new-ish JIRA issue <
> > 
https://issues.apache.org/jira/browse/KAFKA-6758

> > > about
> > the same topic.
> > So, +1 to deprecating "" as a group.id
> >
> > The question after that becomes what the *default* value should be -
> > should
> > we:
> > a) treat an unconfigured group.id consumer as a sort of intermittent
> > consumer where you don't store offsets at all (thereby making the user
> > explicitly sign up for them)
> > b) have a default value which makes use of them? I sort of like the
> > former.
> >
> > @Dhruvil, thinking about it at a high-level - yes. I can't think of a
> > situation where it makes sense to name something an empty string as 
far
> as
> > I'm aware - to me it seems like potential for confusion
> >
> >
> > On Fri, Jul 20, 2018 at 10:22 AM Rajini Sivaram 
 >
> > wrote:
> >
> > > +1 to deprecate use of "" as group.id since it is odd to have a
> resource
> > > name that you cannot set ACLs for. Agree, we have to support older
> > clients
> > > though.
> > >
> > > Thanks,
> > >
> > > Rajini
> > >
> > > On Fri, Jul 20, 2018 at 5:25 PM, Jason Gustafson 

> > > wrote:
> > >
> > > > Hi Vahid,
> > > >
> > > > Sorry for getting to this so late. I think there are two things 
here:
> > > >
> > > > 1. The use of "" as a groupId has always been a dubious practice 
at
> > best.
> > > > We definitely ought to deprecate its use in the client. Perhaps in
> the
> > > next
> > > > major release, we can remove support completely. However, since 
older
> > > > clients depend on it, we may have to continue letting the broker
> > support
> > > it
> > > > to some extent. Perhaps we just need to bump the OffsetCommit 
request
> > API
> > > > and only accept the offset commit for older versions. You probably
> > have
> > > to
> > > > do this anyway if you want to introduce the new error code since 
old
> > > > clients will not expect it.
> > > >
> > > > 2. There should be a way for the consumer to indicate that it has 
no
> > > group
> > > > id and will not commit offsets. This is an explicit instruction 
that
> > the
> > > > consumer should not bother with coordinator lookup and such. We
> > currently
> > > > have some brittle logic in place to let users avoid the 
coordinator
> > > lookup,
> > > > but it is a bit error-prone. I was hoping that we could change the
> > > default
> > > > value of group.id to be null so that the user had to take an
> explicit
> > > > action to 

[jira] [Resolved] (KAFKA-7195) StreamStreamJoinIntegrationTest fails in 2.0 Jenkins

2018-07-24 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-7195.
--
   Resolution: Fixed
Fix Version/s: 2.0.0

> StreamStreamJoinIntegrationTest fails in 2.0 Jenkins
> 
>
> Key: KAFKA-7195
> URL: https://issues.apache.org/jira/browse/KAFKA-7195
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 2.0.0
>Reporter: Ted Yu
>Assignee: Manikumar
>Priority: Major
> Fix For: 2.0.0
>
>
> From 
> https://builds.apache.org/job/kafka-2.0-jdk8/87/testReport/junit/org.apache.kafka.streams.integration/StreamStreamJoinIntegrationTest/testOuter_caching_enabled___false_/
>  :
> {code}
> java.lang.AssertionError: 
> Expected: is <[A-null]>
>  but: was <[A-a, A-b, A-c, A-d]>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
>   at 
> org.apache.kafka.streams.integration.AbstractJoinIntegrationTest.checkResult(AbstractJoinIntegrationTest.java:171)
>   at 
> org.apache.kafka.streams.integration.AbstractJoinIntegrationTest.runTest(AbstractJoinIntegrationTest.java:212)
>   at 
> org.apache.kafka.streams.integration.AbstractJoinIntegrationTest.runTest(AbstractJoinIntegrationTest.java:184)
>   at 
> org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest.testOuter(StreamStreamJoinIntegrationTest.java:198)
> {code}
> However, some test output was missing:
> {code}
> [2018-07-23 20:51:36,363] INFO Socket c
> ...[truncated 1627692 chars]...
> 671)
> {code}
> I ran the test locally which passed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-2318) replica manager repeatedly tries to fetch from partitions already moved during controlled shutdown

2018-07-24 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-2318.
--
Resolution: Auto Closed

Closing inactive issue. Please reopen if the issue still exists in newer 
versions.

> replica manager repeatedly tries to fetch from partitions already moved 
> during controlled shutdown
> --
>
> Key: KAFKA-2318
> URL: https://issues.apache.org/jira/browse/KAFKA-2318
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Reporter: Jason Rosenberg
>Priority: Major
>
> Using version 0.8.2.1.
> During a controlled shutdown, it seems like the left hand is often not 
> talking to the right :)
> In this case, we see the ReplicaManager remove a fetcher for a partition, 
> truncate its log, and then apparently try to fetch data from that partition 
> repeatedly, spamming the log with "failed due to Leader not local for 
> partition" warnings.
> Below is a snippet (in this case it happened for partition 
> '__consumer_offsets,7' and '__consumer_offsets,47').  It went on for quite a 
> bit longer than included here.  The current broker is '99' here.
> {code}
> 2015-07-07 18:54:26,415  INFO [kafka-request-handler-0] 
> server.ReplicaFetcherManager - [ReplicaFetcherManager on broker 99] Removed 
> fetcher for partitions [__consumer_offsets,7]
> 2015-07-07 18:54:26,415  INFO [kafka-request-handler-0] log.Log - Truncating 
> log __consumer_offsets-7 to offset 0.
> 2015-07-07 18:54:26,421  WARN [kafka-request-handler-3] server.ReplicaManager 
> - [Replica Manager on Broker 99]: Fetch request with correlation id 6832556 
> from client ReplicaFetcherThread-0-99 on partition [__consumer_offsets,7] 
> failed due to Leader not local for partition [__consumer_offsets,7] on broker 
> 99
> 2015-07-07 18:54:26,429  WARN [kafka-request-handler-4] server.ReplicaManager 
> - [Replica Manager on Broker 99]: Fetch request with correlation id 4345717 
> from client ReplicaFetcherThread-0-99 on partition [__consumer_offsets,7] 
> failed due to Leader not local for partition [__consumer_offsets,7] on broker 
> 99
> 2015-07-07 18:54:26,430  WARN [kafka-request-handler-2] server.ReplicaManager 
> - [Replica Manager on Broker 99]: Fetch request with correlation id 4345718 
> from client ReplicaFetcherThread-0-99 on partition [__consumer_offsets,7] 
> failed due to Leader not local for partition [__consumer_offsets,7] on broker 
> 99
> 2015-07-07 18:54:26,431  WARN [kafka-request-handler-4] server.ReplicaManager 
> - [Replica Manager on Broker 99]: Fetch request with correlation id 4345719 
> from client ReplicaFetcherThread-0-99 on partition [__consumer_offsets,7] 
> failed due to Leader not local for partition [__consumer_offsets,7] on broker 
> 99
> 2015-07-07 18:54:26,432  WARN [kafka-request-handler-5] server.ReplicaManager 
> - [Replica Manager on Broker 99]: Fetch request with correlation id 4345720 
> from client ReplicaFetcherThread-0-99 on partition [__consumer_offsets,7] 
> failed due to Leader not local for partition [__consumer_offsets,7] on broker 
> 99
> 2015-07-07 18:54:26,433  WARN [kafka-request-handler-2] server.ReplicaManager 
> - [Replica Manager on Broker 99]: Fetch request with correlation id 4345721 
> from client ReplicaFetcherThread-0-99 on partition [__consumer_offsets,7] 
> failed due to Leader not local for partition [__consumer_offsets,7] on broker 
> 99
> 2015-07-07 18:54:26,434  WARN [kafka-request-handler-3] server.ReplicaManager 
> - [Replica Manager on Broker 99]: Fetch request with correlation id 4345722 
> from client ReplicaFetcherThread-0-99 on partition [__consumer_offsets,7] 
> failed due to Leader not local for partition [__consumer_offsets,7] on broker 
> 99
> 2015-07-07 18:54:26,436  WARN [kafka-request-handler-1] server.ReplicaManager 
> - [Replica Manager on Broker 99]: Fetch request with correlation id 4345723 
> from client ReplicaFetcherThread-0-99 on partition [__consumer_offsets,7] 
> failed due to Leader not local for partition [__consumer_offsets,7] on broker 
> 99
> 2015-07-07 18:54:26,437  WARN [kafka-request-handler-2] server.ReplicaManager 
> - [Replica Manager on Broker 99]: Fetch request with correlation id 4345724 
> from client ReplicaFetcherThread-0-99 on partition [__consumer_offsets,7] 
> failed due to Leader not local for partition [__consumer_offsets,7] on broker 
> 99
> 2015-07-07 18:54:26,438  WARN [kafka-request-handler-7] server.ReplicaManager 
> - [Replica Manager on Broker 99]: Fetch request with correlation id 4345725 
> from client ReplicaFetcherThread-0-99 on partition [__consumer_offsets,7] 
> failed due to Leader not local for partition [__consumer_offsets,7] on broker 
> 99
> 2015-07-07 18:54:26,438  INFO [kafka-request-handler-6] 
> 

[jira] [Resolved] (KAFKA-3040) Broker didn't report new data after change in leader

2018-07-24 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-3040.
--
Resolution: Auto Closed

Closing inactive issue. Please reopen if the issue still exists in newer 
versions.

> Broker didn't report new data after change in leader
> 
>
> Key: KAFKA-3040
> URL: https://issues.apache.org/jira/browse/KAFKA-3040
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
> Environment: Debian 3.2.54-2 x86_64 GNU/Linux
>Reporter: Imran Patel
>Priority: Critical
>  Labels: reliability
>
> Recently we had an event that caused large Kafka backlogs to develop 
> suddenly. This happened across multiple partitions. We noticed that after a 
> brief connection loss to Zookeeper, Kafka brokers were reporting no new 
> data to our (SimpleConsumer) consumer although the producers were enqueueing 
> fine. This went on until another zk blip led to a reconfiguration which 
> suddenly caused the consumers to "see" the data. Our consumers and our 
> monitoring tools did not see the offsets move during the outage window. Here 
> is the sequence of events for a single partition (with logs attached below). 
> The brokers are running 0.9, the producer is using library version 
> kafka_2.10:0.8.2.1 and consumer is using kafka_2.10:0.8.0 (both are Java 
> programs). Our monitoring tool uses kafka-python-9.0
> Can you tell us if this could be due to a consumer bug (the libraries being 
> too "old" to operate with a 0.9 broker, for example)? Or does it look like a Kafka core 
> issue? Please note that we recently upgraded the brokers to 0.9 and hadn't 
> seen a similar issue prior to that.
> - after a brief connection loss to zookeeper, the partition leader (broker 9 
> for partition 29 in logs below) came back and shrunk the ISR to itself. 
> - producers kept on successfully sending data to Kafka and the remaining 
> replicas (brokers 3 and 4) recorded this data. AFAICT, 3 was the new leader. 
> Broker 9 did NOT replicate this data. It did repeatedly print the ISR 
> shrinking message over and over again.
> - consumer on the other hand reported no new data presumably because it was 
> talking to 9 and that broker was doing nothing.
> - 6 hours later, another zookeeper blip causes the brokers to reconfigure and 
> now consumers started seeing new data. 
> Broker 9:
> [2015-12-16 19:46:01,523] INFO Partition [messages,29] on broker 9: Expanding 
> ISR for partition [messages,29] from 9,4 to 9,4,3 (kafka.cluster.Partition
> [2015-12-18 00:59:25,511] INFO New leader is 9 
> (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
> [2015-12-18 01:00:18,451] INFO Partition [messages,29] on broker 9: Shrinking 
> ISR for partition [messages,29] from 9,4,3 to 9 (kafka.cluster.Partition)
> [2015-12-18 01:00:18,458] INFO Partition [messages,29] on broker 9: Cached 
> zkVersion [472] not equal to that in zookeeper, skip updating ISR 
> (kafka.cluster.Partition)
> [2015-12-18 07:04:44,552] INFO Truncating log messages-29 to offset 
> 14169556269. (kafka.log.Log)
> [2015-12-18 07:04:44,649] INFO [ReplicaFetcherManager on broker 9] Added 
> fetcher for partitions List([[messages,61], initOffset 14178575900 to broker 
> BrokerEndPoint(6,kafka006-prod.c.foo.internal,9092)] , [[messages,13], 
> initOffset 14156091271 to broker 
> BrokerEndPoint(2,kafka002-prod.c.foo.internal,9092)] , [[messages,45], 
> initOffset 14135826155 to broker 
> BrokerEndPoint(4,kafka004-prod.c.foo.internal,9092)] , [[messages,41], 
> initOffset 14157926400 to broker 
> BrokerEndPoint(1,kafka001-prod.c.foo.internal,9092)] , [[messages,29], 
> initOffset 14169556269 to broker 
> BrokerEndPoint(3,kafka003-prod.c.foo.internal,9092)] , [[messages,57], 
> initOffset 14175218230 to broker 
> BrokerEndPoint(1,kafka001-prod.c.foo.internal,9092)] ) 
> (kafka.server.ReplicaFetcherManager)
> Broker 3:
> [2015-12-18 01:00:01,763] INFO [ReplicaFetcherManager on broker 3] Removed 
> fetcher for partitions [messages,29] (kafka.server.ReplicaFetcherManager)
> [2015-12-18 07:09:04,631] INFO Partition [messages,29] on broker 3: Expanding 
> ISR for partition [messages,29] from 4,3 to 4,3,9 (kafka.cluster.Partition)
> [2015-12-18 07:09:49,693] INFO [ReplicaFetcherManager on broker 3] Removed 
> fetcher for partitions [messages,29] (kafka.server.ReplicaFetcherManager)
> Broker 4:
> [2015-12-18 01:00:01,783] INFO [ReplicaFetcherManager on broker 4] Removed 
> fetcher for partitions [messages,29] (kafka.server.ReplicaFetcherManager)
> [2015-12-18 01:00:01,866] INFO [ReplicaFetcherManager on broker 4] Added 
> fetcher for partitions List([[messages,29], initOffset 14169556262 to broker 
> BrokerEndPoint(3,kafka003-prod.c.foo.internal,9092)] ) 
> (kafka.server.ReplicaFetcherManager)
> 

[jira] [Resolved] (KAFKA-2890) Strange behaviour during partitions reassignment.

2018-07-24 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-2890.
--
Resolution: Auto Closed

Closing inactive issue. Please reopen if the issue still exists in newer 
versions.

> Strange behaviour during partitions reassignment.
> -
>
> Key: KAFKA-2890
> URL: https://issues.apache.org/jira/browse/KAFKA-2890
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
>Reporter: Alexander Kukushkin
>Assignee: Ismael Juma
>Priority: Major
> Attachments: reassign.json, reassign.out
>
>
> Hi.
> I am playing with the new version of kafka (0.9.0.0).
> Initially I created a cluster of 3 nodes and created some topics there. 
> Later I've added one more node and triggered partitions reassignment. It's 
> kind of working, but on the new node in the log file there are strange 
> warnings:
> [2015-11-25 14:06:52,998] WARN [ReplicaFetcherThread-1-152], Error in fetch 
> kafka.server.ReplicaFetcherThread$FetchRequest@3f442c7b. Possible cause: 
> org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
> 'responses': Error reading field 'topic': java.nio.BufferUnderflowException 
> (kafka.server.ReplicaFetcherThread)
> I've found similar log messages in the following ticket: 
> https://issues.apache.org/jira/browse/KAFKA-2756
> But there such messages were related to the replication between different 
> versions (0.8 and 0.9).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7201) Optimize repartition operations

2018-07-24 Thread Bill Bejeck (JIRA)
Bill Bejeck created KAFKA-7201:
--

 Summary: Optimize repartition operations
 Key: KAFKA-7201
 URL: https://issues.apache.org/jira/browse/KAFKA-7201
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Bill Bejeck
Assignee: Bill Bejeck
 Fix For: 2.1.0


When the topology has a key-changing operation, any downstream processor using 
the key will automatically create a repartition topic. In most cases these 
multiple repartition topics can be collapsed into a single repartition operation 
occurring immediately after the key-changing operation, thus reducing the overall 
footprint of the Streams application.
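
For illustration, a minimal Streams topology of the kind described above might look 
like the sketch below (topic names and logic are invented for this example, not taken 
from this issue); today each of the two aggregations gets its own repartition topic, 
even though both use the key produced by the same selectKey():

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

public class RepartitionCollapseSketch {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("input-topic");

        // Key-changing operation: all stateful processing downstream of this
        // point needs the data re-keyed, hence repartitioned.
        KStream<String, String> rekeyed =
            input.selectKey((k, v) -> v.substring(0, 1));

        // Two independent aggregations on the new key. Without the proposed
        // optimization each groupByKey() materializes its own repartition topic;
        // with it, one repartition topic right after selectKey() can feed both.
        rekeyed.groupByKey().count();
        rekeyed.groupByKey().reduce((v1, v2) -> v1 + v2);

        return builder.build();
    }
}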



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-2968) Refactor ReplicaManager to be optimal for o.a.k.c requests

2018-07-24 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-2968.
--
Resolution: Fixed

Closing this as the Scala request/response classes have been removed and 
ReplicaManager has been updated to use the new o.a.k.c requests.

> Refactor ReplicaManager to be optimal for o.a.k.c requests
> --
>
> Key: KAFKA-2968
> URL: https://issues.apache.org/jira/browse/KAFKA-2968
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Grant Henke
>Assignee: Grant Henke
>Priority: Major
>
> Once LeaderAndIsrRequest, UpdateMetadataRequest, and StopReplicaRequest are 
> minimally migrated to the new o.a.k.c requests, refactor ReplicaManager to 
> use the new classes more cleanly and use o.a.k.c classes internally.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7200) Preserve serdes used

2018-07-24 Thread Bill Bejeck (JIRA)
Bill Bejeck created KAFKA-7200:
--

 Summary: Preserve serdes used 
 Key: KAFKA-7200
 URL: https://issues.apache.org/jira/browse/KAFKA-7200
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Bill Bejeck
Assignee: Bill Bejeck
 Fix For: 2.1.0


We need to preserve the serdes used and pass them to either the parent nodes or 
the child nodes in the topology graph built for optimization, so that they are 
available when removing repartition topics and adding a new one upstream.
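
As an illustration (using the 2.0-era Serialized API; the topic name and serdes are 
invented for this sketch), the serdes a user supplies on a grouping like the one below 
are exactly what would have to travel with the node, so that a repartition topic the 
optimizer creates further upstream is written and read with the same serdes rather 
than the defaults:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Serialized;

public class SerdePreservationSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Long> rekeyed = builder
            .<String, Long>stream("input-topic")
            .selectKey((k, v) -> k.toUpperCase());

        // The String/Long serdes supplied here must be preserved on the grouping
        // node; a collapsed repartition topic created upstream needs them too.
        rekeyed.groupByKey(Serialized.with(Serdes.String(), Serdes.Long()))
               .count();
    }
}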



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-2969) Refactor MetadataCache to be optimal for o.a.k.c requests

2018-07-24 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-2969.
--
Resolution: Fixed

Closing this as the Scala request/response classes have been removed and 
MetadataCache has been updated to use the new o.a.k.c requests.

> Refactor MetadataCache to be optimal for o.a.k.c requests
> -
>
> Key: KAFKA-2969
> URL: https://issues.apache.org/jira/browse/KAFKA-2969
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Grant Henke
>Assignee: Grant Henke
>Priority: Major
>
> Once UpdateMetadataRequest and TopicMetadata are minimally migrated to the 
> new o.a.k.c requests, refactor MetadataCache to use the new classes more 
> cleanly and use o.a.k.c classes internally.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-263: Allow broker to skip sanity check of inactive segments on broker startup

2018-07-24 Thread Patrick Huang
Thanks, Dong!


In terms of the background retention thread reading the last entry of timeindex 
files, there are two scenarios we need to consider:

  1.  Most segments in each log have passed their TTL. In this case, the 
background thread has to read the last entry of the timeindex file for all of 
these segments. If we skip sanity checks and skip pre-loading the indexes of 
the segments below the recovery point, this causes a disk spike, which in turn 
increases the processing time of client requests.
  2.  Only a few segments in each log have passed their TTL. In this case, the 
background thread only needs to read the last entry of the timeindex files for 
very few segments, because it stops iterating through the segments once it 
sees a segment with "now - segment.largestTimestamp <= config.retentionMs" 
(see the sketch after this list). This has little impact on the disk and will 
not affect client requests even if we skip sanity checks, because the cost of 
reading the last entry of timeindex files is amortized over time.
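
A small sketch of that walk (hypothetical types, not the actual broker code) shows 
why the early break keeps scenario 2 cheap:

import java.util.List;

// Segment is a stand-in for Kafka's internal LogSegment; largestTimestamp() is
// assumed to be backed by the last entry of the segment's time index.
interface Segment {
    long largestTimestamp();
}

final class RetentionScanSketch {
    static int countExpiredSegments(List<Segment> segmentsOldestFirst,
                                    long retentionMs, long now) {
        int expired = 0;
        for (Segment segment : segmentsOldestFirst) {
            // Stop at the first segment that has not passed its TTL; in the
            // common case (scenario 2) only a few time-index reads happen.
            if (now - segment.largestTimestamp() <= retentionMs) {
                break;
            }
            expired++;
        }
        return expired;
    }
}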


Test results showed that:

  *   Rolling bounce time is reduced by 50% in both scenarios if we skip sanity 
checks and skip pre-loading the indexes of the segments below the recovery 
point.
  *   In scenario 1, we saw disk await time increase and not go down until 
the retention thread finished iterating through the segments reading the last 
entry of the timeindex files. During this time period, there was a 5x increase 
in the 999th percentile processing time of produce requests.
  *   In scenario 2, we saw a little increase in disk await time only for a 
very short period of time and client requests were not affected.


Scenario 2 is the common case because most of the time the partition bytes-in 
rate is steady, and after the broker starts up, the retention thread only needs 
to do the time-based retention check on the first few segments of each 
partition. Scenario 1 happens only when we have huge bytes-in traffic and a 
small segment size for a partition, or when the broker has been stopped for a 
long time, which is rare. Given that there will be little impact in the common 
scenario, I think it is safe to skip the sanity checks for segments below the 
recovery point to speed up broker restart.


Best,
Zhanxiang (Patrick) Huang



From: Dong Lin 
Sent: Saturday, July 21, 2018 3:45
To: dev
Subject: Re: [DISCUSS] KIP-263: Allow broker to skip sanity check of inactive 
segments on broker startup

Here is some information related to the KIP.

Previously we thought we could skip the sanity check by default to speed up
broker startup, with no need for extra configuration. However, based on the
code and some earlier experiment results, it is not clear whether that will
work. When LogManager.cleanupLogs() is called by the background thread, the
thread will go over all segments and read the last entry of the timeindex file
of each segment to see whether the segment should be deleted based on time.
This may cause a spike in disk usage and offset the improvement in broker
startup time.

More testing is needed to validate whether this is an issue. If it is, we have
validated that it can be solved by using a few extra configs. More information
will be posted later when we get the results.



On Wed, Jun 27, 2018 at 10:59 AM, Dong Lin  wrote:

> Thanks for the reply Jason and Dhruvil.
>
> Yeah we don't need config for the sanity check and thus we don't need a
> KIP. I think we are on the same page of just skipping the sanity check of
> segments before the recovery offset. I will close the KIP and submit a
> patch for this.
>
> On Wed, Jun 27, 2018 at 10:09 AM, Dhruvil Shah 
> wrote:
>
>> +1 to what Jason said. We need a better long-term strategy for dealing
>> with
>> corrupted log and index data, but the sanity checks we have do not
>> guarantee much in this regard.
>>
>> For now, we could do away with these index sanity checks in my opinion. We
>> could handle the missing index case at startup. I think we could have
>> missing index files only when users are upgrading from a version that did
>> not have a particular type of index to a version that does, or if the
>> operator physically deleted these files. Because these are rare scenarios,
>> having to recreate a missing index should typically not affect normal
>> startup time.
>>
>> - Dhruvil
>>
>> On Wed, Jun 27, 2018 at 8:47 AM Jason Gustafson 
>> wrote:
>>
>> > Hey Dong,
>> >
>> >
>> > So the main concern with the above approach is that, if for any reason
>> the
>> > > index files of inactive segment is deleted or corrupted, the broker
>> will
>> > > halt if there is only one log directory. This is different from the
>> > > existing behavior where the broker will rebuild the index for this
>> > inactive
>> > > segment before it can accept any request from consumer. Though we
>> don't
>> > > have provide guarantee for segments already flushed to disk, this
>> still
>> > > seems like a change in behavior for user. Maybe we don't have to worry
>> > > about this if we 

[jira] [Resolved] (KAFKA-2075) Validate that all kafka.api requests has been removed and clean up compatibility code

2018-07-24 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-2075.
--
Resolution: Fixed

The old Scala request/response classes were removed as part of KAFKA-2983.

> Validate that all kafka.api requests has been removed and clean up 
> compatibility code
> -
>
> Key: KAFKA-2075
> URL: https://issues.apache.org/jira/browse/KAFKA-2075
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Gwen Shapira
>Priority: Major
>
> Once we finish all other subtasks, the old kafka.api requests/responses 
> shouldn't be used anywhere.
> We need to validate that the classes are indeed gone, remove the unit tests 
> for serializing/deserializing them, and clean up the compatibility code added 
> in KAFKA-2044.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-07-24 Thread Mayuresh Gharat
Hi Lucas,
I agree, if we want to go forward with a separate controller plane and data
plane and completely isolate them, having a separate port for controller
with a separate Acceptor and a Processor sounds ideal to me.

Thanks,

Mayuresh


On Mon, Jul 23, 2018 at 11:04 PM Becket Qin  wrote:

> Hi Lucas,
>
> Yes, I agree that a dedicated end to end control flow would be ideal.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Tue, Jul 24, 2018 at 1:05 PM, Lucas Wang  wrote:
>
> > Thanks for the comment, Becket.
> > So far, we've been trying to avoid making any request handler thread
> > special.
> > But if we were to follow that path in order to make the two planes more
> > isolated,
> > what do you think about also having a dedicated processor thread,
> > and dedicated port for the controller?
> >
> > Today one processor thread can handle multiple connections, let's say 100
> > connections
> >
> > represented by connection0, ... connection99, among which connection0-98
> > are from clients, while connection99 is from
> >
> > the controller. Further let's say after one selector polling, there are
> > incoming requests on all connections.
> >
> > When the request queue is full, (either the data request being full in
> the
> > two queue design, or
> >
> > the one single queue being full in the deque design), the processor
> thread
> > will be blocked first
> >
> > when trying to enqueue the data request from connection0, then possibly
> > blocked for the data request
> >
> > from connection1, ... etc even though the controller request is ready to
> be
> > enqueued.
> >
> > To solve this problem, it seems we would need to have a separate port
> > dedicated to
> >
> > the controller, a dedicated processor thread, a dedicated controller
> > request queue,
> >
> > and pinning of one request handler thread for controller requests.
> >
> > Thanks,
> > Lucas
> >
> >
> > On Mon, Jul 23, 2018 at 6:00 PM, Becket Qin 
> wrote:
> >
> > > Personally I am not fond of the dequeue approach simply because it is
> > > against the basic idea of isolating the controller plane and data
> plane.
> > > With a single dequeue, theoretically speaking the controller requests
> can
> > > starve the clients requests. I would prefer the approach with a
> separate
> > > controller request queue and a dedicated controller request handler
> > thread.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Tue, Jul 24, 2018 at 8:16 AM, Lucas Wang 
> > wrote:
> > >
> > > > Sure, I can summarize the usage of correlation id. But before I do
> > that,
> > > it
> > > > seems
> > > > the same out-of-order processing can also happen to Produce requests
> > sent
> > > > by producers,
> > > > following the same example you described earlier.
> > > > If that's the case, I think this probably deserves a separate doc and
> > > > design independent of this KIP.
> > > >
> > > > Lucas
> > > >
> > > >
> > > >
> > > > On Mon, Jul 23, 2018 at 12:39 PM, Dong Lin 
> > wrote:
> > > >
> > > > > Hey Lucas,
> > > > >
> > > > > Could you update the KIP if you are confident with the approach
> which
> > > > uses
> > > > > correlation id? The idea around correlation id is kind of scattered
> > > > across
> > > > > multiple emails. It will be useful if other reviews can read the
> KIP
> > to
> > > > > understand the latest proposal.
> > > > >
> > > > > Thanks,
> > > > > Dong
> > > > >
> > > > > On Mon, Jul 23, 2018 at 12:32 PM, Mayuresh Gharat <
> > > > > gharatmayures...@gmail.com> wrote:
> > > > >
> > > > > > I like the idea of the dequeue implementation by Lucas. This will
> > > help
> > > > us
> > > > > > avoid additional queue for controller and additional configs in
> > > Kafka.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Mayuresh
> > > > > >
> > > > > > On Sun, Jul 22, 2018 at 2:58 AM Becket Qin  >
> > > > wrote:
> > > > > >
> > > > > > > Hi Jun,
> > > > > > >
> > > > > > > The usage of correlation ID might still be useful to address
> the
> > > > cases
> > > > > > > that the controller epoch and leader epoch check are not
> > sufficient
> > > > to
> > > > > > > guarantee correct behavior. For example, if the controller
> sends
> > a
> > > > > > > LeaderAndIsrRequest followed by a StopReplicaRequest, and the
> > > broker
> > > > > > > processes it in the reverse order, the replica may still be
> > wrongly
> > > > > > > recreated, right?
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jiangjie (Becket) Qin
> > > > > > >
> > > > > > > > On Jul 22, 2018, at 11:47 AM, Jun Rao 
> > wrote:
> > > > > > > >
> > > > > > > > Hmm, since we already use controller epoch and leader epoch
> for
> > > > > > properly
> > > > > > > > caching the latest partition state, do we really need
> > correlation
> > > > id
> > > > > > for
> > > > > > > > ordering the controller requests?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jun
> > > > > > > >
> > > > > > > > On Fri, Jul 20, 2018 at 2:18 PM, Becket Qin <
> > > 

Re: [Discuss] KIP-321: Add method to get TopicNameExtractor in TopologyDescription

2018-07-24 Thread Guozhang Wang
We should not remove it immediately in the upcoming 2.1 release. Usually
we first mark an API as deprecated, and consider removing it only after it
has been deprecated for at least one major release period.


Guozhang

On Mon, Jul 23, 2018 at 7:40 PM, Nishanth Pradeep 
wrote:

> Sounds good to me too.
>
> As far as deprecating goes -- should the topics() method removed completely
> or should it have a @deprecated annotation for removal in some future
> version?
>
> Best,
> Nishanth Pradeep
>
> On Sun, Jul 22, 2018 at 1:32 PM Matthias J. Sax 
> wrote:
>
> > Works for me.
> >
> > On 7/22/18 9:48 AM, Guozhang Wang wrote:
> > > I think I can be convinced with deprecating topics() to keep API
> minimal.
> > >
> > > About renaming the others with `XXNames()`: well, to me it feels still
> > not
> > > very worthy since although it is not a big burden, it seems also not a
> > big
> > > "return" if we name the newly added function `topicSet()`.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Fri, Jul 20, 2018 at 7:38 PM, Nishanth Pradeep <
> nishanth...@gmail.com
> > >
> > > wrote:
> > >
> > >> I definitely agree with you on deprecating topics().
> > >>
> > >> I also think changing the method names for consistency is reasonable,
> > since
> > >> there is no functionality change. Although, I can be convinced either
> > way
> > >> on this one.
> > >>
> > >> Best,
> > >> Nishanth Pradeep
> > >> On Fri, Jul 20, 2018 at 12:15 PM Matthias J. Sax <
> matth...@confluent.io
> > >
> > >> wrote:
> > >>
> > >>> I would still deprecate existing `topics()` method. If users need a
> > >>> String, they can call `topicSet().toString()`.
> > >>>
> > >>> It's just a personal preference, because I believe it's good to keep
> > the
> > >>> API "minimal".
> > >>>
> > >>> About renaming the other methods: I thinks it's a very small burden
> to
> > >>> deprecate the existing methods and add them with new names. Also just
> > my
> > >>> 2 cents.
> > >>>
> > >>> Would be good to see what others think.
> > >>>
> > >>>
> > >>> -Matthias
> > >>>
> > >>> On 7/19/18 6:20 PM, Nishanth Pradeep wrote:
> >  Understood, Guozhang.
> > 
> >  Thanks for the help, everyone! I have updated the KIP. Let me know
> if
> > >> you
> >  any other thoughts or suggestions.
> > 
> >  Best,
> >  Nishanth Pradeep
> > 
> >  On Thu, Jul 19, 2018 at 7:33 PM Guozhang Wang 
> > >>> wrote:
> > 
> > > I see.
> > >
> > > Well, I think if we add a new function like topicSet() it is less
> > >>> needed to
> > > deprecate topics() as it returns "{topic1, topic2, ..}" which is
> sort
> > >> of
> > > non-overlapping in usage with the new API.
> > >
> > >
> > > Guozhang
> > >
> > > On Thu, Jul 19, 2018 at 5:31 PM, Nishanth Pradeep <
> > >>> nishanth...@gmail.com>
> > > wrote:
> > >
> > >> That is what I meant. I will add topicSet() instead of changing
> the
> > >> signature of topics() for compatibility reasons. But should we not
> > >> add
> > >>> a
> > >> @deprecated flag for topics() or do you want to keep it around for
> > >> the
> > > long
> > >> run?
> > >>
> > >> On Thu, Jul 19, 2018 at 7:27 PM Guozhang Wang  >
> > > wrote:
> > >>
> > >>> We cannot change the signature of the function named "topics"
> from
> > >> "String"
> > >>> to "Set", as Matthias mentioned it is a compatibility
> > >> breaking
> > >>> change.
> > >>>
> > >>> That's why I was proposing to add a new function like "Set<String>
> > >>> topicSet()", while keeping "String topics()" as is.
> > >>>
> > >>> Guozhang
> > >>>
> > >>> On Thu, Jul 19, 2018 at 5:22 PM, Nishanth Pradeep <
> > > nishanth...@gmail.com
> > >>>
> > >>> wrote:
> > >>>
> >  Right, adding topicNames() instead of changing the return type
> of
> > >>> topics()
> >  in order preserve backwards compatibility is a good idea. But is
> > it
> > > not
> >  better to depreciate topics() because it would be redundant? In
> > our
> > >> case,
> >  it would only be calling topicNames/topicSet#toString().
> > 
> >  I still agree that perhaps changing the other API's might be
> > >> unnecessary
> >  since it's only a name change.
> > 
> >  I have made the change to the KIP to only add, not change,
> > > preexisting
> >  APIs. But where do we stand on deprecating topics()?
> > 
> >  Best,
> >  Nishanth Pradeep
> > 
> >  On Thu, Jul 19, 2018 at 1:44 PM Guozhang Wang <
> wangg...@gmail.com
> > >
> > >>> wrote:
> > 
> > > Personally I'd prefer to keep the deprecation-related changes
> as
> > >> small
> > >>> as
> > > possible unless they are really necessary, and hence I'd prefer
> > to
> > >> just
> >  add
> > >
> > > List<String> topicList()  /* or Set<String> topicSet() */
> > >
> > 

[jira] [Created] (KAFKA-7199) Support BigInteger data type

2018-07-24 Thread Jiri Pechanec (JIRA)
Jiri Pechanec created KAFKA-7199:


 Summary: Support BigInteger data type
 Key: KAFKA-7199
 URL: https://issues.apache.org/jira/browse/KAFKA-7199
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Jiri Pechanec


When data are sourced from a database into Connect, it is possible that 
`unsigned long` or larger integer values need to be represented. Currently 
{{BigDecimal}} and {{String}} are the only types that could be used. Unfortunately, 
in this case the semantic information that the value is an integer is lost. We 
thus propose to introduce a {{BigInteger}} logical data type as a counterpart to 
{{BigDecimal}}.

Would there be an interest in contribution from our side?
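
For concreteness, a hypothetical logical type could follow the same pattern as the 
existing org.apache.kafka.connect.data.Decimal. The class below is purely 
illustrative; its name and methods are not an existing Connect API:

import java.math.BigInteger;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public final class BigIntegerLogicalType {
    public static final String LOGICAL_NAME = "org.apache.kafka.connect.data.BigInteger";

    // Underlying physical type is BYTES, with the logical name carrying the
    // "this value is an integer" semantics that would otherwise be lost.
    public static SchemaBuilder builder() {
        return SchemaBuilder.bytes().name(LOGICAL_NAME).version(1);
    }

    public static byte[] fromLogical(Schema schema, BigInteger value) {
        return value.toByteArray();
    }

    public static BigInteger toLogical(Schema schema, byte[] value) {
        return new BigInteger(value);
    }
}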



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[VOTE] 2.0.0 RC3

2018-07-24 Thread Rajini Sivaram
Hello Kafka users, developers and client-developers,


This is the fourth candidate for release of Apache Kafka 2.0.0.


This is a major version release of Apache Kafka. It includes 40 new KIPs and
several critical bug fixes. Please see the 2.0.0 release plan for more
details:

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=80448820


A few notable highlights:

   - Prefixed wildcard ACLs (KIP-290), Fine grained ACLs for CreateTopics
   (KIP-277)
   - SASL/OAUTHBEARER implementation (KIP-255)
   - Improved quota communication and customization of quotas (KIP-219,
   KIP-257)
   - Efficient memory usage for down conversion (KIP-283)
   - Fix log divergence between leader and follower during fast leader
   failover (KIP-279)
   - Drop support for Java 7 and remove deprecated code including old scala
   clients
   - Connect REST extension plugin, support for externalizing secrets and
   improved error handling (KIP-285, KIP-297, KIP-298 etc.)
   - Scala API for Kafka Streams and other Streams API improvements
   (KIP-270, KIP-150, KIP-245, KIP-251 etc.)


Release notes for the 2.0.0 release:

http://home.apache.org/~rsivaram/kafka-2.0.0-rc3/RELEASE_NOTES.html


*** Please download, test and vote by Friday July 27, 4pm PT.


Kafka's KEYS file containing PGP keys we use to sign the release:

http://kafka.apache.org/KEYS


* Release artifacts to be voted upon (source and binary):

http://home.apache.org/~rsivaram/kafka-2.0.0-rc3/


* Maven artifacts to be voted upon:

https://repository.apache.org/content/groups/staging/


* Javadoc:

http://home.apache.org/~rsivaram/kafka-2.0.0-rc3/javadoc/


* Tag to be voted upon (off 2.0 branch) is the 2.0.0 tag:

https://github.com/apache/kafka/releases/tag/2.0.0-rc3

* Documentation:

http://kafka.apache.org/20/documentation.html


* Protocol:

http://kafka.apache.org/20/protocol.html


* Successful Jenkins builds for the 2.0 branch:

Unit/integration tests: https://builds.apache.org/job/kafka-2.0-jdk8/90/

System tests: https://jenkins.confluent.io/job/system-test-kafka/job/2.0/41/


/**


Thanks,



Rajini


Re: [DISCUSS] KIP-342 Add Customizable SASL extensions to OAuthBearer authentication

2018-07-24 Thread Stanislav Kozlovski
Hey everybody,

I have updated the KIP to reflect the latest changes as best as I could. If
there aren't more suggestions, I intent to start the [VOTE] thread tomorrow.

Best,
Stanislav

On Tue, Jul 24, 2018 at 6:34 AM Ron Dagostino  wrote:

> Hi Stanislav.  Could you update the KIP to reflect the latest definition of
> SaslExtensions and confirm or correct the impact it has to the
> SCRAM-related classes?  I'm not sure if the currently-described impact is
> still accurate.  Also, could you mention the changes to
> OAuthBearerUnsecuredLoginCallbackHandler in the text in addition to giving
> the examples?  The examples show the new
> unsecuredLoginExtension_ feature, but that feature is not
> described anywhere prior to it appearing there.
>
> Ron
>
> On Mon, Jul 23, 2018 at 1:42 PM Ron Dagostino  wrote:
>
> > Hi Rajini.  I think a class is fine as long as we make sure the semantics
> > of immutability are clear -- it would have to be a value class, and any
> > constructor that accepts a Map as input would have to copy that Map
> rather
> > than store it in a member variable.  Similarly, any Map that it might
> > return would have to be unmodifiable.
> >
> > Ron
> >
> > On Mon, Jul 23, 2018 at 12:24 PM Rajini Sivaram  >
> > wrote:
> >
> >> Hi Ron, Stanislav,
> >>
> >> I agree with Stanislav that it would be better to leave `SaslExtensions`
> >> as
> >> a class rather than make it an interface. We don''t really expect users
> to
> >> extends this class, so it is convenient to have an implementation since
> >> users need to create an instance. The class provided by the public API
> >> should be sufficient in the vast majority of the cases. Ron, do you
> agree?
> >>
> >> On Mon, Jul 23, 2018 at 11:35 AM, Ron Dagostino 
> >> wrote:
> >>
> >> > Hi Stanislav.  See https://tools.ietf.org/html/rfc7628#section-3.1,
> and
> >> > that section refers to the core ABNF productions defined in
> >> > https://tools.ietf.org/html/rfc5234#appendix-B.
> >> >
> >> > Ron
> >> >
> >> > > On Jul 23, 2018, at 1:30 AM, Stanislav Kozlovski <
> >> stanis...@confluent.io>
> >> > wrote:
> >> > >
> >> > > Hey Ron and Rajini,
> >> > >
> >> > > Here are my thoughts:
> >> > > Regarding separators in SaslExtensions - Agreed, that was a bad
> move.
> >> > > Should definitely not be a concern of CallbackHandler and
> LoginModule
> >> > > implementors.
> >> > > SaslExtensions interface - Wouldn't implementing it as an interface
> >> mean
> >> > > that users will have to make sure they're passing in an unmodifiable
> >> map
> >> > > themselves. I believe it would be better if we enforced that through
> >> > class
> >> > > constructors instead.
> >> > > SaslExtensions#map() - I'd also prefer this. The reason I went with
> >> > > `extensionValue` and `extensionNames` was because I figured it made
> >> sense
> >> > > to have `ScramExtensions` extend `SaslExtensions` and therefore have
> >> > their
> >> > > API be similar. In the end, do you think that it is worth it to have
> >> > > `ScramExtensions` extend `SaslExtensions`?
> >> > > @Ron, could you point me to the SASL OAuth mechanism specific
> regular
> >> > > expressions for keys/values you mentioned are in RFC 7628 (
> >> > > https://tools.ietf.org/html/rfc7628) ? I could not find any while
> >> > > originally implementing this.
> >> > >
> >> > > Best,
> >> > > Stanislav
> >> > >
> >> > >> On Sun, Jul 22, 2018 at 6:46 PM Ron Dagostino 
> >> > wrote:
> >> > >>
> >> > >> Hi again, Rajini and Stanislav.  I wonder if making SaslExtensions
> an
> >> > >> interface rather than a class might be a good solution.  For
> example:
> >> > >>
> >> > >> public interface SaslExtensions {
> >> > >>   /**
> >> > >>* @return an immutable map view of the SASL extensions
> >> > >>*/
> >> > >>    Map<String, String> map();
> >> > >> }
> >> > >>
> >> > >> This solves the issue of lack of clarity on immutability, and it
> also
> >> > >> eliminates copying, like this:
> >> > >>
> >> > >> SaslExtensions myMethod() {
> >> > >>    Map<String, String> myRetval = getUnmodifiableSaslExtensionsMap();
> >> > >>return new SaslExtensions() {
> >> > >>    public Map<String, String> map() {
> >> > >>return myRetval;
> >> > >>}
> >> > >>}
> >> > >> }
> >> > >>
> >> > >> Alternatively, we could do it like this:
> >> > >>
> >> > >> /**
> >> > >> * Supplier that returns immutable map view of SASL Extensions
> >> > >> */
> >> > >> public interface SaslExtensions extends Supplier<Map<String, String>> {
> >> > >>// empty
> >> > >> }
> >> > >>
> >> > >> Then we could simply return the instance like this, again without
> >> > copying:
> >> > >>
> >> > >> SaslExtensions myMethod() {
> >> > >>    Map<String, String> myRetval = getUnmodifiableSaslExtensionsMap();
> >> > >>return () -> myRetval;
> >> > >> }
> >> > >>
> >> > >> I think the main reason for making SaslExtensions part of the
> public
> >> > >> interface is to avoid adding a Map to the Subject's public
> >> credentials.
> >> > >> Making SaslExtensions an interface meets that requirement and then
> >> 

[jira] [Created] (KAFKA-7198) Enhance KafkaStreams start method javadoc

2018-07-24 Thread Bill Bejeck (JIRA)
Bill Bejeck created KAFKA-7198:
--

 Summary: Enhance KafkaStreams start method javadoc
 Key: KAFKA-7198
 URL: https://issues.apache.org/jira/browse/KAFKA-7198
 Project: Kafka
  Issue Type: Improvement
Reporter: Bill Bejeck
 Fix For: 2.1.0


The {{KafkaStreams.start}} method javadoc states that once called, the streams 
threads are started in the background and hence the method does not block. However, 
if you have GlobalKTables in your topology, the threads aren't started until the 
GlobalKTables have bootstrapped fully, so the javadoc for the {{start}} method should 
be updated to reflect this.
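
For example, in a minimal application like the sketch below (the topic name and 
configs are placeholders), start() does not return until the global store backing the 
GlobalKTable has been fully restored, which is the behavior the javadoc should mention:

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class GlobalTableStartSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.globalTable("reference-data");   // adds a GlobalKTable to the topology

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "start-javadoc-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // Per this report, the call below blocks until the GlobalKTable has
        // bootstrapped, despite the javadoc implying it returns immediately.
        streams.start();
    }
}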



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Didn't receive video stream data into spark through kafka

2018-07-24 Thread Biswajit Ghosh
Yes, I have double check that.

On 24 July 2018 at 19:20, Aman Rastogi  wrote:

> Is your topic same in both the case?
>
> On Tue, 24 Jul 2018, 19:15 Biswajit Ghosh, 
> wrote:
>
> > Hi team,
> >
> > I got an issue while integrating with the spark streaming using pyspark,
> I
> > did receive the video stream data in a different consumer subscribe to
> the
> > same topic.
> >
> > Works fine with this command : *./kafka-console-consumer.sh
> > --bootstrap-server :9092 --topic spark-streaming-consumer
> > --from-beginning*
> >
> > But not with this :
> >
> > ​
> > >
> > def processRecord(record):
> >
> > print(record)
> >
> >
> > > ​sc = SparkContext(master="local[2]",appName="HNStreaming")
> >
> > ​sc.setLogLevel('DEBUG')
> >
> > ssc = StreamingContext(sc, 2)
> >
> > topic = "spark-stream-message"
> >
> > kvs = KafkaUtils.createDirectStream(ssc, [topic],
> {'metadata.broker.list':
> > > brokers})
> >
> > kvs.foreachRDD(self.processRecord)
> >
> > ssc.start()
> >
> > ssc.awaitTermination()
> >
> > ​
> > >
> > >
> >
> > Expecting help from your side asap.
> >
> > Thank you!
> >
> >
> > --
> >
> > Regards,
> > biswajitGhosh
> >
>



-- 

Regards,
biswajitGhosh


Re: Didn't receive video stream data into spark through kafka

2018-07-24 Thread Aman Rastogi
Is your topic same in both the case?

On Tue, 24 Jul 2018, 19:15 Biswajit Ghosh, 
wrote:

> Hi team,
>
> I got an issue while integrating with the spark streaming using pyspark, I
> did receive the video stream data in a different consumer subscribe to the
> same topic.
>
> Works fine with this command : *./kafka-console-consumer.sh
> --bootstrap-server :9092 --topic spark-streaming-consumer
> --from-beginning*
>
> But not with this :
>
> ​
> >
> def processRecord(record):
>
> print(record)
>
>
> > ​sc = SparkContext(master="local[2]",appName="HNStreaming")
>
> ​sc.setLogLevel('DEBUG')
>
> ssc = StreamingContext(sc, 2)
>
> topic = "spark-stream-message"
>
> kvs = KafkaUtils.createDirectStream(ssc, [topic], {'metadata.broker.list':
> > brokers})
>
> kvs.foreachRDD(self.processRecord)
>
> ssc.start()
>
> ssc.awaitTermination()
>
> ​
> >
> >
>
> Expecting help from your side asap.
>
> Thank you!
>
>
> --
>
> Regards,
> biswajitGhosh
>


Didn't receive video stream data into spark through kafka

2018-07-24 Thread Biswajit Ghosh
Hi team,

I ran into an issue while integrating with Spark Streaming using pyspark: I
do receive the video stream data in a different consumer subscribed to the
same topic.

Works fine with this command : *./kafka-console-consumer.sh
--bootstrap-server :9092 --topic spark-streaming-consumer
--from-beginning*

But not with this :

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

def processRecord(record):
    print(record)

sc = SparkContext(master="local[2]", appName="HNStreaming")
sc.setLogLevel('DEBUG')
ssc = StreamingContext(sc, 2)

topic = "spark-stream-message"
# brokers is the broker list string, defined elsewhere in my script
kvs = KafkaUtils.createDirectStream(ssc, [topic], {'metadata.broker.list': brokers})
kvs.foreachRDD(processRecord)

ssc.start()
ssc.awaitTermination()


Expecting help from your side asap.

Thank you!


-- 

Regards,
biswajitGhosh


Re: [DISCUSS] KIP-342 Add Customizable SASL extensions to OAuthBearer authentication

2018-07-24 Thread Ron Dagostino
Hi Stanislav.  Could you update the KIP to reflect the latest definition of
SaslExtensions and confirm or correct the impact it has to the
SCRAM-related classes?  I'm not sure if the currently-described impact is
still accurate.  Also, could you mention the changes to
OAuthBearerUnsecuredLoginCallbackHandler in the text in addition to giving
the examples?  The examples show the new
unsecuredLoginExtension_ feature, but that feature is not
described anywhere prior to it appearing there.

Ron

On Mon, Jul 23, 2018 at 1:42 PM Ron Dagostino  wrote:

> Hi Rajini.  I think a class is fine as long as we make sure the semantics
> of immutability are clear -- it would have to be a value class, and any
> constructor that accepts a Map as input would have to copy that Map rather
> than store it in a member variable.  Similarly, any Map that it might
> return would have to be unmodifiable.
>
> Ron
>
> On Mon, Jul 23, 2018 at 12:24 PM Rajini Sivaram 
> wrote:
>
>> Hi Ron, Stanislav,
>>
>> I agree with Stanislav that it would be better to leave `SaslExtensions`
>> as
>> a class rather than make it an interface. We don't really expect users to
>> extend this class, so it is convenient to have an implementation since
>> users need to create an instance. The class provided by the public API
>> should be sufficient in the vast majority of the cases. Ron, do you agree?
>>
>> On Mon, Jul 23, 2018 at 11:35 AM, Ron Dagostino 
>> wrote:
>>
>> > Hi Stanislav.  See https://tools.ietf.org/html/rfc7628#section-3.1, and
>> > that section refers to the core ABNF productions defined in
>> > https://tools.ietf.org/html/rfc5234#appendix-B.
>> >
>> > Ron
>> >
>> > > On Jul 23, 2018, at 1:30 AM, Stanislav Kozlovski <
>> stanis...@confluent.io>
>> > wrote:
>> > >
>> > > Hey Ron and Rajini,
>> > >
>> > > Here are my thoughts:
>> > > Regarding separators in SaslExtensions - Agreed, that was a bad move.
>> > > Should definitely not be a concern of CallbackHandler and LoginModule
>> > > implementors.
>> > > SaslExtensions interface - Wouldn't implementing it as an interface
>> mean
>> > > that users will have to make sure they're passing in an unmodifiable
>> map
>> > > themselves. I believe it would be better if we enforced that through
>> > class
>> > > constructors instead.
>> > > SaslExtensions#map() - I'd also prefer this. The reason I went with
>> > > `extensionValue` and `extensionNames` was because I figured it made
>> sense
>> > > to have `ScramExtensions` extend `SaslExtensions` and therefore have
>> > their
>> > > API be similar. In the end, do you think that it is worth it to have
>> > > `ScramExtensions` extend `SaslExtensions`?
>> > > @Ron, could you point me to the SASL OAuth mechanism specific regular
>> > > expressions for keys/values you mentioned are in RFC 7628 (
>> > > https://tools.ietf.org/html/rfc7628) ? I could not find any while
>> > > originally implementing this.
>> > >
>> > > Best,
>> > > Stanislav
>> > >
>> > >> On Sun, Jul 22, 2018 at 6:46 PM Ron Dagostino 
>> > wrote:
>> > >>
>> > >> Hi again, Rajini and Stanislav.  I wonder if making SaslExtensions an
>> > >> interface rather than a class might be a good solution.  For example:
>> > >>
>> > >> public interface SaslExtensions {
>> > >>   /**
>> > >>* @return an immutable map view of the SASL extensions
>> > >>*/
>> > >>    Map<String, String> map();
>> > >> }
>> > >>
>> > >> This solves the issue of lack of clarity on immutability, and it also
>> > >> eliminates copying, like this:
>> > >>
>> > >> SaslExtensions myMethod() {
>> > >>    Map<String, String> myRetval = getUnmodifiableSaslExtensionsMap();
>> > >>return new SaslExtensions() {
>> > >>    public Map<String, String> map() {
>> > >>return myRetval;
>> > >>}
>> > >>}
>> > >> }
>> > >>
>> > >> Alternatively, we could do it like this:
>> > >>
>> > >> /**
>> > >> * Supplier that returns immutable map view of SASL Extensions
>> > >> */
>> > >> public interface SaslExtensions extends Supplier<Map<String, String>> {
>> > >>// empty
>> > >> }
>> > >>
>> > >> Then we could simply return the instance like this, again without
>> > copying:
>> > >>
>> > >> SaslExtensions myMethod() {
>> > >>    Map<String, String> myRetval = getUnmodifiableSaslExtensionsMap();
>> > >>return () -> myRetval;
>> > >> }
>> > >>
>> > >> I think the main reason for making SaslExtensions part of the public
>> > >> interface is to avoid adding a Map to the Subject's public
>> credentials.
>> > >> Making SaslExtensions an interface meets that requirement and then
>> > allows
>> > >> us to be free to implement whatever we want internally.
>> > >>
>> > >> Thoughts?
>> > >>
>> > >> Ron
>> > >>
>> > >>> On Sun, Jul 22, 2018 at 12:45 PM Ron Dagostino 
>> > wrote:
>> > >>>
>> > >>> Hi Rajini.  The SaslServer is going to have to validate the
>> extensions,
>> > >>> too, but I’m okay with keeping the validation logic elsewhere as
>> long
>> > as
>> > >> it
>> > >>> can be reused in both the client and the secret.
>> > >>>
>> > >>> I strongly prefer exposing a map() method as 

[jira] [Created] (KAFKA-7197) Release a milestone build for Scala 2.13.0 M3

2018-07-24 Thread JIRA
Martynas Mickevičius created KAFKA-7197:
---

 Summary: Release a milestone build for Scala 2.13.0 M3
 Key: KAFKA-7197
 URL: https://issues.apache.org/jira/browse/KAFKA-7197
 Project: Kafka
  Issue Type: Improvement
Reporter: Martynas Mickevičius


Releasing a milestone version for Scala 2.13.0-M3 (and maybe even for 
2.13.0-M4, which has new collections) would be helpful to kickstart Kafka 
ecosystem adoption for 2.13.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-346 - Limit blast radius of log compaction failure

2018-07-24 Thread Ted Yu
As James pointed out in his reply, topic-partition names can be long.
It is not necessary to repeat the topic name for each of its partitions.
How about the following format:

topic-name1-{partition1, partition2, etc}

That is, the topic name only appears once.
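
A tiny sketch of how that grouped rendering could be produced (the input and output 
shapes here are invented for illustration):

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public final class UncleanablePartitionsFormat {
    // Renders {"topic-name1" -> [1, 2]} as "topic-name1-{1, 2}" so each topic
    // name appears only once in the metric value.
    static String render(Map<String, List<Integer>> uncleanableByTopic) {
        return uncleanableByTopic.entrySet().stream()
            .map(e -> e.getKey() + "-{"
                    + e.getValue().stream().map(String::valueOf)
                          .collect(Collectors.joining(", "))
                    + "}")
            .collect(Collectors.joining("; "));
    }
}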

Cheers

On Mon, Jul 23, 2018 at 9:08 PM Stanislav Kozlovski 
wrote:

> Hi Ted,
>
> Yes, absolutely. Thanks for pointing that out!
>
> On Mon, Jul 23, 2018 at 6:12 PM Ted Yu  wrote:
>
> > For `uncleanable-partitions`, should the example include topic name(s) ?
> >
> > Cheers
> >
> > On Mon, Jul 23, 2018 at 5:46 PM Stanislav Kozlovski <
> > stanis...@confluent.io>
> > wrote:
> >
> > > I renamed the KIP and that changed the link. Sorry about that. Here is
> > the
> > > new link:
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-346+-+Improve+LogCleaner+behavior+on+error
> > >
> > > On Mon, Jul 23, 2018 at 5:11 PM Stanislav Kozlovski <
> > > stanis...@confluent.io>
> > > wrote:
> > >
> > > > Hey group,
> > > >
> > > > I created a new KIP about making log compaction more fault-tolerant.
> > > > Please give it a look here and please share what you think,
> especially
> > in
> > > > regards to the points in the "Needs Discussion" paragraph.
> > > >
> > > > KIP: KIP-346
> > > > <
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-346+-+Limit+blast+radius+of+log+compaction+failure
> > > >
> > > > --
> > > > Best,
> > > > Stanislav
> > > >
> > >
> > >
> > > --
> > > Best,
> > > Stanislav
> > >
> >
>
>
> --
> Best,
> Stanislav
>


Re: KIP-213 - Scalable/Usable Foreign-Key KTable joins - Rebooted.

2018-07-24 Thread Adam Bellemare
Let me kick this off with a few starting points that I would like to
generate some discussion on.

1) It seems to me that I will need to repartition the data twice - once on
the foreign key, and once back to the primary key. Is there anything I am
missing here?

2) I believe I will also need to materialize 3 state stores: the prefixScan
SS, the highwater mark SS (for out-of-order resolution) and the final state
store, due to the workflow I have laid out. I have not thought of a better
way yet, but would appreciate any input on this matter. I have gone back
through the mailing list for the previous discussions on this KIP, and I
did not see anything relating to resolving out-of-order compute. I cannot
see a way around the current three-SS structure that I have.

3) Caching is disabled on the prefixScan SS, as I do not know how to
resolve the iterator obtained from rocksDB with that of the cache. In
addition, I must ensure everything is flushed before scanning. Since the
materialized prefixScan SS is under "control" of the function, I do not
anticipate this to be a problem. Performance throughput will need to be
tested, but as Jan observed in his initial overview of this issue, it is
generally a surge of output events which affects performance more than the
flush or prefixScan itself (see the sketch below).
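
For reference, a rough sketch of the prefix-scan idea; the key layout and store types 
here are assumptions for illustration, not the KIP's final design:

import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

final class PrefixScanSketch {
    // Keys in the repartitioned store are assumed to be "<foreignKey>|<primaryKey>"
    // strings, so all records for one foreign key are adjacent and can be read
    // with a single range scan.
    static KeyValueIterator<String, String> scanForeignKey(
            KeyValueStore<String, String> store, String foreignKey) {
        // range() is inclusive on both ends; the high sentinel bounds the scan
        // to keys that start with "<foreignKey>|".
        return store.range(foreignKey + "|", foreignKey + "|\uFFFF");
    }
}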

Thoughts on any of these are greatly appreciated, since these elements are
really the cornerstone of the whole design. I can put up the code I have
written against 1.0.2 if we so desire, but first I was hoping to just
tackle some of the fundamental design proposals.

Thanks,
Adam



On Mon, Jul 23, 2018 at 10:05 AM, Adam Bellemare 
wrote:

> Here is the new discussion thread for KIP-213. I picked back up on the KIP
> as this is something that we too at Flipp are now running in production.
> Jan started this last year, and I know that Trivago is also using something
> similar in production, at least in terms of APIs and functionality.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 213+Support+non-key+joining+in+KTable
>
> I do have an implementation of the code for Kafka 1.0.2 (our local
> production version) but I won't post it yet as I would like to focus on the
> workflow and design first. That being said, I also need to add some clearer
> integration tests (I did a lot of testing using a non-Kafka Streams
> framework) and clean up the code a bit more before putting it in a PR
> against trunk (I can do so later this week likely).
>
> Please take a look,
>
> Thanks
>
> Adam Bellemare
>
>


Build failed in Jenkins: kafka-trunk-jdk8 #2837

2018-07-24 Thread Apache Jenkins Server
See 


Changes:

[ismael] MINOR: Remove unused TopicAndPartition usage in tests (#5419)

[rajinisivaram] KAFKA-7194; Fix buffer underflow if onJoinComplete is retried 
after

--
[...truncated 903.19 KB...]
kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldReturnNextAvailableEpochIfThereIsNoExactEpochForTheOneRequested PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldAddEpochAndMessageOffsetToCache STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldAddEpochAndMessageOffsetToCache PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldDropEntriesOnEpochBoundaryWhenRemovingLatestEntries STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldDropEntriesOnEpochBoundaryWhenRemovingLatestEntries PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldUpdateSavedOffsetWhenOffsetToClearToIsBetweenEpochs STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldUpdateSavedOffsetWhenOffsetToClearToIsBetweenEpochs PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldNotResetEpochHistoryTailIfUndefinedPassed STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldNotResetEpochHistoryTailIfUndefinedPassed PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldReturnUnsupportedIfNoEpochRecorded STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldReturnUnsupportedIfNoEpochRecorded PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldReturnUnsupportedIfNoEpochRecordedAndUndefinedEpochRequested STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldReturnUnsupportedIfNoEpochRecordedAndUndefinedEpochRequested PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldRetainLatestEpochOnClearAllEarliest STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldRetainLatestEpochOnClearAllEarliest PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldPersistEpochsBetweenInstances STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldPersistEpochsBetweenInstances PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldNotClearAnythingIfOffsetToFirstOffset STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldNotClearAnythingIfOffsetToFirstOffset PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldNotLetOffsetsGoBackwardsEvenIfEpochsProgress STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldNotLetOffsetsGoBackwardsEvenIfEpochsProgress PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldGetFirstOffsetOfSubsequentEpochWhenOffsetRequestedForPreviousEpoch STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldGetFirstOffsetOfSubsequentEpochWhenOffsetRequestedForPreviousEpoch PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldUpdateOffsetBetweenEpochBoundariesOnClearEarliest2 STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldUpdateOffsetBetweenEpochBoundariesOnClearEarliest2 PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldClearEarliestOnEmptyCache 
STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldClearEarliestOnEmptyCache 
PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldPreserveResetOffsetOnClearEarliestIfOneExists STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldPreserveResetOffsetOnClearEarliestIfOneExists PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldUpdateOffsetBetweenEpochBoundariesOnClearEarliest STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldUpdateOffsetBetweenEpochBoundariesOnClearEarliest PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldReturnInvalidOffsetIfEpochIsRequestedWhichIsNotCurrentlyTracked STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldReturnInvalidOffsetIfEpochIsRequestedWhichIsNotCurrentlyTracked PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldFetchEndOffsetOfEmptyCache 
STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldFetchEndOffsetOfEmptyCache 
PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldRetainLatestEpochOnClearAllEarliestAndUpdateItsOffset STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldRetainLatestEpochOnClearAllEarliestAndUpdateItsOffset PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldClearAllEntries STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldClearAllEntries PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldClearLatestOnEmptyCache 
STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldClearLatestOnEmptyCache 
PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldNotResetEpochHistoryHeadIfUndefinedPassed STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldNotResetEpochHistoryHeadIfUndefinedPassed PASSED

kafka.server.epoch.LeaderEpochIntegrationTest > 
shouldIncreaseLeaderEpochBetweenLeaderRestarts STARTED

kafka.server.epoch.LeaderEpochIntegrationTest > 

Jenkins build is back to normal : kafka-2.0-jdk8 #90

2018-07-24 Thread Apache Jenkins Server
See 




Jenkins build is back to normal : kafka-trunk-jdk10 #311

2018-07-24 Thread Apache Jenkins Server
See 




[jira] [Resolved] (KAFKA-7194) Error deserializing assignment after rebalance

2018-07-24 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-7194.
---
   Resolution: Fixed
 Reviewer: Konstantine Karantasis
Fix Version/s: 2.0.0

> Error deserializing assignment after rebalance
> --
>
> Key: KAFKA-7194
> URL: https://issues.apache.org/jira/browse/KAFKA-7194
> Project: Kafka
>  Issue Type: Bug
>Reporter: Konstantine Karantasis
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.0.0
>
>
> A simple sink connector task is failing in a test with the following 
> exception: 
> {noformat}
> [2018-07-02 12:31:13,200] ERROR WorkerSinkTask{id=verifiable-sink-0} Task 
> threw an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask)
> org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
> 'version': java.nio.BufferUnderflowException
>         at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:77)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:105)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:243)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:421)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:353)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:338)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:333)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
>         at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:444)
>         at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:317)
>         at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
>         at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
>         at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
>         at 
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748){noformat}
>  
> After dumping the consumer offsets on the partition that this consumer group 
> is writing with: 
> {noformat}
> bin/kafka-dump-log.sh --offsets-decoder --files ./.log 
> {noformat}
> we get: 
> {noformat}
> Dumping ./.log
> Starting offset: 0
> offset: 0 position: 0 CreateTime: 1530534673177 isvalid: true keysize: 27 
> valuesize: 217 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 
> sequence: -1 isTransactional: false headerKeys: [] key: 
> {"metadata":"connect-verifiable-sink"} payload: 
> {"protocolType":"consumer","protocol":"range","generationId":1,"assignment":"{consumer-4-bad84955-e702-44fe-a018-677bd3b3a9d4=[test-0]}"}
> offset: 1 position: 314 CreateTime: 1530534673206 isvalid: true keysize: 27 
> valuesize: 32 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 
> sequence: -1 isTransactional: false headerKeys: [] key: 
> {"metadata":"connect-verifiable-sink"} payload: 
> {"protocolType":"consumer","protocol":null,"generationId":2,"assignment":"{}"}{noformat}
>  
> Since the broker seems to send a non-empty response to the consumer, there's 
> a chance that the response buffer is consumed more than once at some point 
> when parsing the response in the client. 
> Here's what the kafka-request.log shows it sends to the client with the 
> `SYNC_GROUP` response that throws the error: 
> {noformat}
> [2018-07-02 12:31:13,185] DEBUG Completed 
> request:RequestHeader(apiKey=SYNC_GROUP, apiVersion=2, clientId=consumer-4, 
> correlationId=5) -- 
> {group_id=connect-verifiable-sink,generation_id=1,member_id=consumer-4-bad84955-e702-44fe-a018-677bd3b3a9d4,group_assignment=[{member_id=consumer-4-bad84955-e702-44fe-a018-677bd3b3a9d4,member_assignment=java.nio.HeapByteBuffer[pos=0
> 

[jira] [Resolved] (KAFKA-2963) Replace server internal usage of TopicAndPartition with TopicPartition

2018-07-24 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-2963.
--
Resolution: Fixed

This has been fixed in various cleanups

> Replace server internal usage of TopicAndPartition with TopicPartition
> --
>
> Key: KAFKA-2963
> URL: https://issues.apache.org/jira/browse/KAFKA-2963
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Grant Henke
>Assignee: Jakub Nowak
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Build failed in Jenkins: kafka-trunk-jdk8 #2836

2018-07-24 Thread Apache Jenkins Server
See 

--
Started by an SCM change
[EnvInject] - Loading node environment variables.
Building remotely on H32 (ubuntu xenial) in workspace 

 > git rev-parse --is-inside-work-tree # timeout=10
Fetching changes from the remote Git repository
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
Fetching upstream changes from https://github.com/apache/kafka.git
 > git --version # timeout=10
 > git fetch --tags --progress https://github.com/apache/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
ERROR: Error fetching remote repo 'origin'
hudson.plugins.git.GitException: Failed to fetch from 
https://github.com/apache/kafka.git
at hudson.plugins.git.GitSCM.fetchFrom(GitSCM.java:888)
at hudson.plugins.git.GitSCM.retrieveChanges(GitSCM.java:1155)
at hudson.plugins.git.GitSCM.checkout(GitSCM.java:1186)
at hudson.scm.SCM.checkout(SCM.java:504)
at hudson.model.AbstractProject.checkout(AbstractProject.java:1208)
at 
hudson.model.AbstractBuild$AbstractBuildExecution.defaultCheckout(AbstractBuild.java:574)
at jenkins.scm.SCMCheckoutStrategy.checkout(SCMCheckoutStrategy.java:86)
at 
hudson.model.AbstractBuild$AbstractBuildExecution.run(AbstractBuild.java:499)
at hudson.model.Run.execute(Run.java:1794)
at hudson.model.FreeStyleBuild.run(FreeStyleBuild.java:43)
at hudson.model.ResourceController.execute(ResourceController.java:97)
at hudson.model.Executor.run(Executor.java:429)
Caused by: hudson.plugins.git.GitException: Command "git fetch --tags 
--progress https://github.com/apache/kafka.git 
+refs/heads/*:refs/remotes/origin/*" returned status code 128:
stdout: 
stderr: error: Could not read 0f3affc0f40751dc8fd064b36b6e859728f63e37
error: Could not read 95fbb2e03f4fe79737c71632e0ef2dfdcfb85a69
error: Could not read 08c465028d057ac23cdfe6d57641fe40240359dd
remote: Counting objects: 5785, done.
remote: Compressing objects: 100% (13/13), done.
Receiving objects:   0% (1/5785) ... Receiving objects:  56% 

Re: [DISCUSS] KIP-346 - Limit blast radius of log compaction failure

2018-07-24 Thread James Cheng
Hi Stanislav! Thanks for this KIP!

I agree that it would be good if the LogCleaner were more tolerant of errors. 
Currently, as you said, once it dies, it stays dead. 

Things are better now than they used to be. We have the metric
kafka.log:type=LogCleanerManager,name=time-since-last-run-ms
which we can use to tell us if the threads are dead. And as of 1.1.0, we have
KIP-226, which allows you to restart the log cleaner thread without requiring
a broker restart:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration
I've only read about this; I haven't personally tried it.
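
To make the monitoring angle concrete, here is a rough, untested sketch of
polling that metric over JMX to spot a dead cleaner thread. The JMX endpoint,
the 15-minute threshold, and the class name are made up, and I'm assuming the
gauge is exposed through a "Value" attribute:

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class LogCleanerLivenessCheck {
    public static void main(String[] args) throws Exception {
        // Hypothetical JMX endpoint; substitute your broker's host and JMX port.
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://broker-host:9999/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            ObjectName cleaner = new ObjectName(
                    "kafka.log:type=LogCleanerManager,name=time-since-last-run-ms");
            // Assuming the gauge reading is exposed as the "Value" attribute.
            long msSinceLastRun =
                    ((Number) conn.getAttribute(cleaner, "Value")).longValue();
            // 15 minutes is an arbitrary alert threshold; tune to your cleaner cadence.
            if (msSinceLastRun > 15 * 60 * 1000L) {
                System.out.println("Log cleaner may be dead: "
                        + msSinceLastRun + " ms since last run");
            }
        } finally {
            connector.close();
        }
    }
}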

Some comments:
* I like the idea of having the log cleaner continue to clean as many 
partitions as it can, skipping over the problematic ones if possible.

* If the log cleaner thread dies, I think it should automatically be revived.
Your KIP attempts to do that by catching exceptions during execution, but I
think we should go all the way and make sure that a new one gets created if
the thread ever dies.

* It might be worth trying to re-clean the uncleanable partitions. I've seen
cases where an uncleanable partition later became cleanable. I unfortunately
don't remember how that happened, but I remember being surprised when I
discovered it. It might have been something like: a follower was uncleanable,
but after a leader election the log was truncated and it became cleanable
again. I'm not sure.

* For your metrics, can you spell out the full metric in JMX-style format, such 
as:
kafka.log:type=LogCleanerManager,name=uncleanable-partitions-count
value=4

* For "uncleanable-partitions": topic-partition names can be very long. I think 
the current max size is 210 characters (or maybe 240-ish?). Having the 
"uncleanable-partitions" being a list could be very large metric. Also, having 
the metric come out as a csv might be difficult to work with for monitoring 
systems. If we *did* want the topic names to be accessible, what do you think 
of having the 
kafka.log:type=LogCleanerManager,topic=topic1,partition=2
I'm not sure if LogCleanerManager is the right type, but my example was that 
the topic and partition can be tags in the metric. That will allow monitoring 
systems to more easily slice and dice the metric. I'm not sure what the 
attribute for that metric would be. Maybe something like  "uncleaned bytes" for 
that topic-partition? Or time-since-last-clean? Or maybe even just "Value=1".

* About `max.uncleanable.partitions`, you said that this likely indicates that 
the disk is having problems. I'm not sure that is the case. For the 4 JIRAs 
that you mentioned about log cleaner problems, all of them are partition-level 
scenarios that happened during normal operation. None of them were indicative 
of disk problems.

* About marking disks as offline when exceeding a certain threshold, that 
actually increases the blast radius of log compaction failures. Currently, the 
uncleaned partitions are still readable and writable. Taking the disks offline 
would impact availability of the uncleanable partitions, as well as impact all 
other partitions that are on the disk.
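
To make the tags idea from the "uncleanable-partitions" comment above
concrete, here is a rough, untested sketch of what a tagged per-partition name
could look like and how a monitoring system could query it. The name
"uncleanable-partition" is just a placeholder, not an actual Kafka metric:

import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class TaggedMetricNameDemo {
    public static void main(String[] args) throws Exception {
        // Hypothetical per-partition metric name: topic and partition are JMX key
        // properties (i.e. tags), rather than values packed into one CSV string.
        ObjectName perPartition = new ObjectName(
                "kafka.log:type=LogCleanerManager,name=uncleanable-partition,topic=topic1,partition=2");
        System.out.println("Per-partition name: " + perPartition);

        // A monitoring system can enumerate every uncleanable partition with a
        // wildcard query and then group or filter by the topic/partition keys.
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName query = new ObjectName(
                "kafka.log:type=LogCleanerManager,name=uncleanable-partition,*");
        Set<ObjectName> registered = server.queryNames(query, null);
        System.out.println("Registered matches in this JVM: " + registered);
    }
}

The point is only that topic and partition become queryable key properties
instead of one ever-growing CSV value.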

-James


> On Jul 23, 2018, at 5:46 PM, Stanislav Kozlovski  
> wrote:
> 
> I renamed the KIP and that changed the link. Sorry about that. Here is the
> new link:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-346+-+Improve+LogCleaner+behavior+on+error
> 
> On Mon, Jul 23, 2018 at 5:11 PM Stanislav Kozlovski 
> wrote:
> 
>> Hey group,
>> 
>> I created a new KIP about making log compaction more fault-tolerant.
>> Please give it a look here and please share what you think, especially in
>> regards to the points in the "Needs Discussion" paragraph.
>> 
>> KIP: KIP-346
>> 
>> --
>> Best,
>> Stanislav
>> 
> 
> 
> -- 
> Best,
> Stanislav



Re: [DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-07-24 Thread Becket Qin
Hi Lucas,

Yes, I agree that a dedicated end-to-end control flow would be ideal.
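
For illustration only, here is a rough sketch of the separate controller
request queue plus dedicated handler thread idea being discussed below. The
class, queue sizes, and request type are made up and are not the actual broker
code:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class RequestPlanesSketch {
    // Separate bounded queues for the data plane and the controller plane.
    private final BlockingQueue<Object> dataQueue = new ArrayBlockingQueue<>(500);
    private final BlockingQueue<Object> controllerQueue = new ArrayBlockingQueue<>(20);

    public void start(int dataHandlerThreads) {
        // One handler thread is pinned to controller requests so they never
        // wait behind a backlog of data requests.
        new Thread(() -> handleLoop(controllerQueue), "controller-request-handler").start();
        for (int i = 0; i < dataHandlerThreads; i++) {
            new Thread(() -> handleLoop(dataQueue), "data-request-handler-" + i).start();
        }
    }

    // Requests are routed by origin. Note Lucas's point below: if a single
    // network processor serves both planes, put() on a full dataQueue still
    // blocks that processor before it reaches a pending controller request,
    // which is why a dedicated processor and port are also being discussed.
    public void enqueue(Object request, boolean fromController) throws InterruptedException {
        (fromController ? controllerQueue : dataQueue).put(request);
    }

    private void handleLoop(BlockingQueue<Object> queue) {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                Object request = queue.take();
                // Placeholder: process the request here.
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}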

Thanks,

Jiangjie (Becket) Qin

On Tue, Jul 24, 2018 at 1:05 PM, Lucas Wang  wrote:

> Thanks for the comment, Becket.
> So far, we've been trying to avoid making any request handler thread
> special. But if we were to follow that path in order to make the two planes
> more isolated, what do you think about also having a dedicated processor
> thread and a dedicated port for the controller?
>
> Today one processor thread can handle multiple connections, let's say 100
> connections represented by connection0, ... connection99, among which
> connection0-98 are from clients, while connection99 is from the controller.
> Further let's say after one selector polling, there are incoming requests on
> all connections.
>
> When the request queue is full (either the data request queue being full in
> the two-queue design, or the single queue being full in the deque design),
> the processor thread will be blocked first when trying to enqueue the data
> request from connection0, then possibly blocked for the data request from
> connection1, ... etc, even though the controller request is ready to be
> enqueued.
>
> To solve this problem, it seems we would need to have a separate port
> dedicated to the controller, a dedicated processor thread, a dedicated
> controller request queue, and pinning of one request handler thread for
> controller requests.
>
> Thanks,
> Lucas
>
>
> On Mon, Jul 23, 2018 at 6:00 PM, Becket Qin  wrote:
>
> > Personally I am not fond of the deque approach simply because it is
> > against the basic idea of isolating the controller plane and data plane.
> > With a single deque, theoretically speaking the controller requests can
> > starve the client requests. I would prefer the approach with a separate
> > controller request queue and a dedicated controller request handler
> > thread.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Tue, Jul 24, 2018 at 8:16 AM, Lucas Wang 
> wrote:
> >
> > > Sure, I can summarize the usage of correlation id. But before I do that,
> > > it seems the same out-of-order processing can also happen to Produce
> > > requests sent by producers, following the same example you described
> > > earlier. If that's the case, I think this probably deserves a separate
> > > doc and design independent of this KIP.
> > >
> > > Lucas
> > >
> > >
> > >
> > > On Mon, Jul 23, 2018 at 12:39 PM, Dong Lin 
> wrote:
> > >
> > > > Hey Lucas,
> > > >
> > > > Could you update the KIP if you are confident with the approach which
> > > > uses correlation id? The idea around correlation id is kind of
> > > > scattered across multiple emails. It will be useful if other reviewers
> > > > can read the KIP to understand the latest proposal.
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > > On Mon, Jul 23, 2018 at 12:32 PM, Mayuresh Gharat <
> > > > gharatmayures...@gmail.com> wrote:
> > > >
> > > > > I like the idea of the deque implementation by Lucas. This will
> > > > > help us avoid an additional queue for the controller and additional
> > > > > configs in Kafka.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Mayuresh
> > > > >
> > > > > On Sun, Jul 22, 2018 at 2:58 AM Becket Qin 
> > > wrote:
> > > > >
> > > > > > Hi Jun,
> > > > > >
> > > > > > The usage of correlation ID might still be useful to address the
> > > > > > cases where the controller epoch and leader epoch check are not
> > > > > > sufficient to guarantee correct behavior. For example, if the
> > > > > > controller sends a LeaderAndIsrRequest followed by a
> > > > > > StopReplicaRequest, and the broker processes them in the reverse
> > > > > > order, the replica may still be wrongly recreated, right?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jiangjie (Becket) Qin
> > > > > >
> > > > > > > On Jul 22, 2018, at 11:47 AM, Jun Rao 
> wrote:
> > > > > > >
> > > > > > > Hmm, since we already use controller epoch and leader epoch for
> > > > > > > properly caching the latest partition state, do we really need
> > > > > > > correlation id for ordering the controller requests?
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > > On Fri, Jul 20, 2018 at 2:18 PM, Becket Qin <
> > becket@gmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > >> Lucas and Mayuresh,
> > > > > > >>
> > > > > > >> Good idea. The correlation id should work.
> > > > > > >>
> > > > > > >> In the ControllerChannelManager, a request will be resent until
> > > > > > >> a response is received. So if the controller to broker connection
> > > > > > >> disconnects after controller sends R1_a, but before the response
> > > > > > >> of R1_a is received, a disconnection may cause the controller to
> > > > > > >> resend R1_b. i.e. until R1 is acked, R2