Build failed in Jenkins: kafka-trunk-jdk8 #3491

2019-03-23 Thread Apache Jenkins Server
See 


Changes:

[cshapi] MINOR: add MacOS requirement to Streams docs

--
[...truncated 4.68 MB...]
org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueTimestampWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueTimestampWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfTimestampIsDifferentForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfTimestampIsDifferentForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualWithNullForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualWithNullForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestampWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 

Re: [VOTE] KIP-349 Priorities for Source Topics

2019-03-23 Thread nathankski



On 2019/01/28 02:26:31, n...@afshartous.com wrote: 
> Hi Sönke,
> 
> Thanks for taking the time to review.  I’ve put KIP-349 into hibernation.  
> 
> Thanks also to everyone who participated in the discussion.
> 
> Best regards,
> --
>   Nick
> 
> > On Jan 25, 2019, at 5:51 AM, Sönke Liebau 
> >  wrote:
> > 
> > a bit late to the party, sorry. I recently spent some time looking
> > into this / a similar issue [1].
> > After some investigation and playing around with settings I think that
> > the benefit that could be gained from this is somewhat limited and
> > probably outweighed by the implementation effort.
> > 
> > The consumer internals are already geared towards treating partitions
> > fairly so that no partition has to wait an undue amount of time and
> > this can be further tuned for latency over throughput. Additionally,
> > if this is a large issue for someone, there is always the option of
> > having a dedicated consumer reading only from the control topic, which
> > would mean that messages from that topic are received "immediately".
> > For a Kafka Streams job it would probably make sense to create two
> > input streams and then merge those as a first step.
> > 
> > I think with these knobs a fairly large amount of flexibility can be
> > achieved so that there is no urgent need to implement priorities.
> > 
> > So my personal preference would be to set this KIP to dormant for now.
> 
> 
> 
> 
> 
> 
Hello Nick,

I'm extremely new to Kafka, but I was attempting to set up a per-topic priority 
application, and ended up finding this thread. I'm having difficulty seeing how 
one can implement it with pause/resume. Would you elaborate?

Since those operations are per-partition, and when you stop a partition it 
attempts to re-balance, I would need to stop all partitions. Even then, it 
would try to finish the current transactions instead of immediately putting them 
on hold and processing the other topics. 

It also looks like, in order to determine whether I had received messages from the 
pri-1 topic, I would need to loop through all records and ignore those that 
weren't pri-1 until a poll failed to retrieve any, which seems like it would 
screw up the other topics.
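
To make the question concrete, here is roughly what I was attempting (a minimal, 
untested sketch; the topic names, configuration, and the naive "pause pri-2 while 
pri-1 has data" condition are only placeholders, and I may well be misusing 
pause()/resume()):

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class TopicPriorityPollLoop {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "priority-demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("pri-1", "pri-2"));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                // Partitions of the low-priority topic that are currently assigned to us.
                Set<TopicPartition> lowPriority = consumer.assignment().stream()
                        .filter(tp -> tp.topic().equals("pri-2"))
                        .collect(Collectors.toSet());

                // If this poll returned anything from pri-1, pause the pri-2 partitions so
                // subsequent polls only fetch from pri-1; otherwise resume them.
                if (records.records("pri-1").iterator().hasNext()) {
                    consumer.pause(lowPriority);
                } else {
                    consumer.resume(lowPriority);
                }

                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("%s-%d: %s%n",
                            record.topic(), record.partition(), record.value());
                }
            }
        }
    }
}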

Thank you,

Nathan


Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-03-23 Thread Paul Whalen
Ivan,

I think it's a great idea to improve this API, but I find the onTopOf()
mechanism a little confusing, since it contrasts with the fluency of other
KStream method calls.  Ideally I'd like to just call a method on the stream
so it still reads top to bottom if the branch cases are defined fluently.
I think the addBranch(predicate, handleCase) is very nice and the right way
to do things, but what if we flipped around how we specify the source
stream?

Like:

stream.branch()
.addBranch(predicate1, this::handle1)
.addBranch(predicate2, this::handle2)
.defaultBranch(this::handleDefault);

Where branch() returns a KBranchedStreams or KStreamBrancher or something,
which is added to by addBranch() and terminated by defaultBranch() (which
returns void).  This is obviously incompatible with the current API, so the
new stream.branch() would have to have a different name, but that seems
like a fairly small problem - we could call it something like branched() or
branchedStreams() and deprecate the old API.
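
For illustration, the object returned by branched() might look something like
the following (the name, generics, and signatures here are just guesses to make
the idea concrete, not part of an actual proposal):

import java.util.function.Consumer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Predicate;

// Hypothetical shape of the fluent branching object described above.
public interface KBranchedStreams<K, V> {

    // Routes records matching the predicate to the handler and returns this
    // object so further cases can be chained.
    KBranchedStreams<K, V> addBranch(Predicate<? super K, ? super V> predicate,
                                     Consumer<? super KStream<K, V>> handler);

    // Terminal operation: records matching no earlier predicate go to this handler.
    void defaultBranch(Consumer<? super KStream<K, V>> handler);
}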

Does this satisfy the motivations of your KIP?  It seems like it does to
me, allowing for clear in-line branching while also allowing you to
dynamically build up branches off of KBranchedStreams if desired.

Thanks,
Paul



On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev 
wrote:

> Hi Bill,
>
> Thank you for your reply!
>
> This is how I usually do it:
>
> void handleFirstCase(KStream ks){
> ks.filter().mapValues(...)
> }
>
>
> void handleSecondCase(KStream ks){
> ks.selectKey(...).groupByKey()...
> }
>
> ..
> new KafkaStreamsBrancher()
>.addBranch(predicate1, this::handleFirstCase)
>.addBranch(predicate2, this::handleSecondCase)
>.onTopOf()
>
> Regards,
>
> Ivan
>
> 22.03.2019 1:34, Bill Bejeck пишет:
> > Hi Ivan,
> >
> > Thanks for the KIP.
> >
> > I have one question: the KafkaStreamsBrancher takes a Consumer as a
> > second argument, which returns nothing, and the example in the KIP shows
> > each stream from the branch using a terminal node (KafkaStreams#to() in
> > this case).
> >
> > Maybe I've missed something, but how would we handle the case where the
> > user has created a branch but wants to continue processing and not
> > necessarily use a terminal node on the branched stream immediately?
> >
> > For example, using today's logic as is if we had something like this:
> >
> > KStream[] branches = originalStream.branch(predicate1,
> > predicate2);
> > branches[0].filter().mapValues(...)..
> > branches[1].selectKey(...).groupByKey().
> >
> >
> > Thanks!
> > Bill
> >
> >
> >
> > On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck  wrote:
> >
> >> All,
> >>
> >> I'd like to jump-start the discussion for KIP- 418.
> >>
> >> Here's the original message:
> >>
> >> Hello,
> >>
> >> I'd like to start a discussion about KIP-418. Please take a look at the
> >> KIP if you can, I would appreciate any feedback :)
> >>
> >> KIP-418:
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
> >>
> >> JIRA KAFKA-5488: https://issues.apache.org/jira/browse/KAFKA-5488
> >>
> >> PR#6164: https://github.com/apache/kafka/pull/6164
> >>
> >> Regards,
> >>
> >> Ivan Ponomarev
> >>
> >>
> >
>
>


Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-03-23 Thread Paul Whalen
I'd like to resurrect this discussion with a cursory, proof-of-concept
implementation of the KIP which combines many of our ideas:
https://github.com/apache/kafka/pull/6496.  I tried to keep the diff as
small as possible for now, just using it to convey the main ideas.  But
I'll separately address some of our earlier discussion:

   - Will there be a new, separate interface for users to implement for the
   new functionality? No, to hopefully keep things simple, all of the
   Processor/TransformerSupplier interfaces will just extend
   StateStoresSupplier, allowing users to opt in to this functionality by
   overriding the default implementation that gives an empty list.
   - Will the interface allow users to specify the store name, or the
   entire StoreBuilder? The entire StoreBuilder, so the
   Processor/TransformerSupplier can completely encapsulate name and
   implementation of a state store if desired.
   - Will the old way of specifying store names alongside the supplier when
   calling stream.process/transform() be deprecated? No, this is still a
   legitimate way to wire up Processors/Transformers and their stores. But I
   would recommend not allowing stream.process/transform() calls that use both
   store declaration mechanisms (this restriction is not in the proof of
   concept).
   - How will we handle adding the same state store to the topology
   multiple times because different Processor/TransformerSuppliers declare it?
   topology.addStateStore() will be slightly relaxed for convenience, and will
   allow adding the same StoreBuilder multiple times as long as the exact same
   StoreBuilder instance is being added for the same store name.  This seems
   to prevent in practice the issue of accidentally making two state stores
   one by adding with the same name.  For additional safety, if we wanted to
   (not in the proof of concept), we could allow for this relaxation only for
   internal callers of topology.addStateStore().

So, in summary, the use cases look like:

   - 1 transformer/processor that owns its store: Using the new
   StateStoresSupplier interface method to supply its StoreBuilders that will
   be added to the topology automatically.
   - Multiple transformer/processors that share the same store: Either


   1. The old way: the StoreBuilder is defined "far away" from the
   Transformer/Processor implementations, and is added to the topology
   manually by the user
   2. The new way: the StoreBuilder is defined closer to the
   Transformer/Processor implementations, and the same instance is returned by
   all Transformer/ProcessorSuppliers that need it


This makes the KIP wiki a bit stale; I'll update if we want to bring this
design to a vote.

Thanks!
Paul
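
For concreteness, the core of the idea might look roughly like the sketch below.
The interface name, the stateStores() method, and the example Transformer are
assumptions to illustrate the discussion, not the actual PR; in the PR the
Processor/TransformerSupplier interfaces themselves would extend the new
interface, while here a class implements both just to keep the sketch
self-contained.

import java.util.Collections;
import java.util.List;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// Hypothetical opt-in interface: suppliers declare the stores they need, with an
// empty list as the default so existing code is unaffected.
interface StateStoresSupplier {
    default List<StoreBuilder<?>> stateStores() {
        return Collections.emptyList();
    }
}

// A supplier that both provides the Transformer and declares the store it needs.
class CountingTransformerSupplier
        implements TransformerSupplier<String, String, KeyValue<String, Long>>, StateStoresSupplier {

    private static final String STORE_NAME = "counts";

    @Override
    public Transformer<String, String, KeyValue<String, Long>> get() {
        return new Transformer<String, String, KeyValue<String, Long>>() {
            private KeyValueStore<String, Long> store;

            @Override
            @SuppressWarnings("unchecked")
            public void init(ProcessorContext context) {
                store = (KeyValueStore<String, Long>) context.getStateStore(STORE_NAME);
            }

            @Override
            public KeyValue<String, Long> transform(String key, String value) {
                Long count = store.get(key);
                long next = (count == null ? 0L : count) + 1;
                store.put(key, next);
                return KeyValue.pair(key, next);
            }

            @Override
            public void close() { }
        };
    }

    @Override
    public List<StoreBuilder<?>> stateStores() {
        // The store definition travels with the supplier instead of a separate addStateStore() call.
        return Collections.singletonList(
                Stores.keyValueStoreBuilder(
                        Stores.persistentKeyValueStore(STORE_NAME),
                        Serdes.String(), Serdes.Long()));
    }
}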

On Sun, Dec 16, 2018 at 6:04 PM Guozhang Wang  wrote:

> Matthias / Paul,
>
> The concern I had about introducing `StoreBuilderSupplier` is simply
> because it is another XXSupplier to the public API, so I'd like to ask if
> we really have to add it :)
>
> The difference between encapsulating the store name and encapsulating the
> full state store builder is that, in the former:
>
> ---
>
> String storeName = "store1";
> builder.addStore(new MyStoreBuilder(storeName));
> stream1.transform(new MyTransformerSupplier(storeName));   // following my
> proposal, that the store name can be passed in and used for both
> `listStores` and in the `Transformer#init`; so the Transformer function
> does not need to get the constant string name again.
>
>  // one caveat to admit, is that
> MyTransformerSupplier logic may be just unique to `store1` so it cannot be
> reused with a different store name anyways.
> ---
>
> While in the latter:
>
> ---
>
> stream1.transform(new MyTransformerSupplierForStore1);   // the name is
> just indicating that we may have one such supplier for each store.
>
> ---
>
> I understand the latter introduce more convenience from the API, but the
> cost is that since we still cannot completely `builder.addStore`, but only
> reduce its semantic scope to shared state stores only,; hence users need to
> learn two ways of creating state stores for those two patterns.
>
> My argument is that more public APIs requires longer learning curve for
> users, and introduces more usage patterns that may confuse users (the
> proposal I had tries to replace one with another completely).
>
>
> Guozhang
>
> On Sun, Dec 16, 2018 at 2:58 PM Paul Whalen  wrote:
>
> > Thanks for the great thoughts Matthias and Guozhang!
> >
> > If I'm not mistaken, Guozhang's suggestion is what my second alternative on
> > the KIP is ("Have the added method on the Supplier interfaces only return
> > store names, not builders").  I do think it would be a worthwhile usability
> > improvement on its own, but to Matthias's point, it doesn't achieve the
> > full goal of completely encapsulating a state store and its processor - it
> > encapsulates the name, but not the StateStoreBuilder.
> >
> 

[jira] [Created] (KAFKA-8152) Offline partition state not propagated by controller

2019-03-23 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-8152:
--

 Summary: Offline partition state not propagated by controller
 Key: KAFKA-8152
 URL: https://issues.apache.org/jira/browse/KAFKA-8152
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jose Armando Garcia Sancio


Currently when the controller starts up, only the state of online partitions 
will be sent to other brokers. Any broker which is started or restarted after 
the controller will see only a subset of the partitions of any topic which has 
offline partitions. If all the partitions for a topic are offline, then the 
broker will not know of the topic at all. As far as I can tell, the bug is the 
fact that `ReplicaStateMachine.startup` only does an initial state change for 
replicas which are online.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Build failed in Jenkins: kafka-trunk-jdk8 #3490

2019-03-23 Thread Apache Jenkins Server
See 


Changes:

[github] MINOR: fix message protocol help text for ElectPreferredLeadersResult

--
[...truncated 2.34 MB...]
kafka.cluster.BrokerEndPointTest > testHashAndEquals PASSED

kafka.cluster.BrokerEndPointTest > testFromJsonV4WithNoRack STARTED

kafka.cluster.BrokerEndPointTest > testFromJsonV4WithNoRack PASSED

kafka.cluster.BrokerEndPointTest > testFromJsonFutureVersion STARTED

kafka.cluster.BrokerEndPointTest > testFromJsonFutureVersion PASSED

kafka.cluster.BrokerEndPointTest > testFromJsonV4WithNullRack STARTED

kafka.cluster.BrokerEndPointTest > testFromJsonV4WithNullRack PASSED

kafka.cluster.BrokerEndPointTest > testBrokerEndpointFromUri STARTED

kafka.cluster.BrokerEndPointTest > testBrokerEndpointFromUri PASSED

kafka.cluster.BrokerEndPointTest > testFromJsonV1 STARTED

kafka.cluster.BrokerEndPointTest > testFromJsonV1 PASSED

kafka.cluster.BrokerEndPointTest > testFromJsonV2 STARTED

kafka.cluster.BrokerEndPointTest > testFromJsonV2 PASSED

kafka.cluster.BrokerEndPointTest > testFromJsonV3 STARTED

kafka.cluster.BrokerEndPointTest > testFromJsonV3 PASSED

kafka.cluster.PartitionTest > 
testMakeLeaderDoesNotUpdateEpochCacheForOldFormats STARTED

kafka.cluster.PartitionTest > 
testMakeLeaderDoesNotUpdateEpochCacheForOldFormats PASSED

kafka.cluster.PartitionTest > testReadRecordEpochValidationForLeader STARTED

kafka.cluster.PartitionTest > testReadRecordEpochValidationForLeader PASSED

kafka.cluster.PartitionTest > 
testFetchOffsetForTimestampEpochValidationForFollower STARTED

kafka.cluster.PartitionTest > 
testFetchOffsetForTimestampEpochValidationForFollower PASSED

kafka.cluster.PartitionTest > testListOffsetIsolationLevels STARTED

kafka.cluster.PartitionTest > testListOffsetIsolationLevels PASSED

kafka.cluster.PartitionTest > testAppendRecordsAsFollowerBelowLogStartOffset 
STARTED

kafka.cluster.PartitionTest > testAppendRecordsAsFollowerBelowLogStartOffset 
PASSED

kafka.cluster.PartitionTest > testFetchLatestOffsetIncludesLeaderEpoch STARTED

kafka.cluster.PartitionTest > testFetchLatestOffsetIncludesLeaderEpoch PASSED

kafka.cluster.PartitionTest > testFetchOffsetSnapshotEpochValidationForFollower 
STARTED

kafka.cluster.PartitionTest > testFetchOffsetSnapshotEpochValidationForFollower 
PASSED

kafka.cluster.PartitionTest > testMonotonicOffsetsAfterLeaderChange STARTED

kafka.cluster.PartitionTest > testMonotonicOffsetsAfterLeaderChange PASSED

kafka.cluster.PartitionTest > testMakeFollowerWithNoLeaderIdChange STARTED

kafka.cluster.PartitionTest > testMakeFollowerWithNoLeaderIdChange PASSED

kafka.cluster.PartitionTest > 
testAppendRecordsToFollowerWithNoReplicaThrowsException STARTED

kafka.cluster.PartitionTest > 
testAppendRecordsToFollowerWithNoReplicaThrowsException PASSED

kafka.cluster.PartitionTest > 
testFollowerDoesNotJoinISRUntilCaughtUpToOffsetWithinCurrentLeaderEpoch STARTED

kafka.cluster.PartitionTest > 
testFollowerDoesNotJoinISRUntilCaughtUpToOffsetWithinCurrentLeaderEpoch PASSED

kafka.cluster.PartitionTest > testFetchOffsetSnapshotEpochValidationForLeader 
STARTED

kafka.cluster.PartitionTest > testFetchOffsetSnapshotEpochValidationForLeader 
PASSED

kafka.cluster.PartitionTest > testOffsetForLeaderEpochValidationForLeader 
STARTED

kafka.cluster.PartitionTest > testOffsetForLeaderEpochValidationForLeader PASSED

kafka.cluster.PartitionTest > testOffsetForLeaderEpochValidationForFollower 
STARTED

kafka.cluster.PartitionTest > testOffsetForLeaderEpochValidationForFollower 
PASSED

kafka.cluster.PartitionTest > testDelayedFetchAfterAppendRecords STARTED

kafka.cluster.PartitionTest > testDelayedFetchAfterAppendRecords PASSED

kafka.cluster.PartitionTest > testMakeLeaderUpdatesEpochCache STARTED

kafka.cluster.PartitionTest > testMakeLeaderUpdatesEpochCache PASSED

kafka.cluster.PartitionTest > 
testMaybeReplaceCurrentWithFutureReplicaDifferentBaseOffsets STARTED

kafka.cluster.PartitionTest > 
testMaybeReplaceCurrentWithFutureReplicaDifferentBaseOffsets PASSED

kafka.cluster.PartitionTest > 
testFetchOffsetForTimestampEpochValidationForLeader STARTED

kafka.cluster.PartitionTest > 
testFetchOffsetForTimestampEpochValidationForLeader PASSED

kafka.cluster.PartitionTest > testGetReplica STARTED

kafka.cluster.PartitionTest > testGetReplica PASSED

kafka.cluster.PartitionTest > testReadRecordEpochValidationForFollower STARTED

kafka.cluster.PartitionTest > testReadRecordEpochValidationForFollower PASSED

kafka.cluster.PartitionTest > testMaybeReplaceCurrentWithFutureReplica STARTED

kafka.cluster.PartitionTest > testMaybeReplaceCurrentWithFutureReplica PASSED

kafka.cluster.ReplicaTest > testCannotIncrementLogStartOffsetPastHighWatermark 
STARTED

kafka.cluster.ReplicaTest > testCannotIncrementLogStartOffsetPastHighWatermark 
PASSED

kafka.cluster.ReplicaTest > testSegmentDeletionWithHighWatermarkInitialization 
STARTED


Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-03-23 Thread Ivan Ponomarev

Hi Bill,

Thank you for your reply!

This is how I usually do it:

void handleFirstCase(KStream ks){
ks.filter().mapValues(...)
}


void handleSecondCase(KStream ks){
ks.selectKey(...).groupByKey()...
}

..
new KafkaStreamsBrancher()
  .addBranch(predicate1, this::handleFirstCase)
  .addBranch(predicate2, this::handleSecondCase)
   .onTopOf(...)
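
(The generics were lost in the mail archive, so with the type parameters restored
and a naive implementation on top of the existing branch() call, the helper could
look roughly like this - just a sketch, not the actual KIP-418 code:)

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Predicate;

// Sketch of the KafkaStreamsBrancher pattern; names and signatures are assumptions.
public class KafkaStreamsBrancher<K, V> {

    private final List<Predicate<? super K, ? super V>> predicates = new ArrayList<>();
    private final List<Consumer<? super KStream<K, V>>> handlers = new ArrayList<>();

    public KafkaStreamsBrancher<K, V> addBranch(Predicate<? super K, ? super V> predicate,
                                                Consumer<? super KStream<K, V>> handler) {
        predicates.add(predicate);
        handlers.add(handler);
        return this;
    }

    // Wires everything up against the source stream, i.e. the onTopOf() call above.
    @SuppressWarnings("unchecked")
    public void onTopOf(KStream<K, V> source) {
        KStream<K, V>[] branches = source.branch(predicates.toArray(new Predicate[0]));
        for (int i = 0; i < branches.length; i++) {
            handlers.get(i).accept(branches[i]);
        }
    }
}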

Regards,

Ivan

22.03.2019 1:34, Bill Bejeck пишет:

Hi Ivan,

Thanks for the KIP.

I have one question: the KafkaStreamsBrancher takes a Consumer as a second
argument, which returns nothing, and the example in the KIP shows each
stream from the branch using a terminal node (KafkaStreams#to() in this
case).

Maybe I've missed something, but how would we handle the case where the
user has created a branch but wants to continue processing and not
necessarily use a terminal node on the branched stream immediately?

For example, using today's logic as is if we had something like this:

KStream[] branches = originalStream.branch(predicate1,
predicate2);
branches[0].filter().mapValues(...)..
branches[1].selectKey(...).groupByKey().


Thanks!
Bill



On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck  wrote:


All,

I'd like to jump-start the discussion for KIP- 418.

Here's the original message:

Hello,

I'd like to start a discussion about KIP-418. Please take a look at the
KIP if you can, I would appreciate any feedback :)

KIP-418: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream

JIRA KAFKA-5488: https://issues.apache.org/jira/browse/KAFKA-5488

PR#6164: https://github.com/apache/kafka/pull/6164

Regards,

Ivan Ponomarev








Re: [DISCUSS] KIP-236 Interruptible Partition Reassignment

2019-03-23 Thread Ismael Juma
Thanks for the KIP; making reassignment more flexible is definitely
welcome. As others have mentioned, I think we need to do it via the Kafka
protocol and not via ZK. The latter introduces an implicit API that other
tools will depend on causing migration challenges. This has already
happened with the existing ZK based interface and we should avoid
introducing more tech debt here.

Ismael

On Sat, Mar 23, 2019, 12:09 PM Colin McCabe  wrote:

> On Thu, Mar 21, 2019, at 20:51, George Li wrote:
> >  Hi Colin,
> >
> > I agree with your proposal of having administrative APIs through RPC
> > instead of ZooKeeper. But seems like it will incur significant changes
> > to both submitting reassignments and this KIP's cancelling pending
> > reassignments.
> >
> > To make this KIP simple and moving along, I will be happy to do another
> > follow-up KIP to change all reassignment related operations via RPC.
>
> Thanks, George.  I think doing it as a two-step process is fine, but I
> suspect it would be much easier and quicker to do the RPC conversion first,
> and the interruptible part later.  The reason is because a lot of the
> things that people have brought up as concerns with this KIP are really
> issues with the API (how will people interact with ZK, how does access
> control work, what does the format look like in ZK) that will just go away
> once we have an RPC.
>
> > Just curious,  KIP-4 includes Topics/ACL related operations. In
> > addition to Reassignments,  any other operations should be done via
> > RPC?
>
> I think all of the administrative shell scripts have been converted except
> kafka-configs.sh.  I believe there is a KIP for that conversion.
> Reassigning partitions is probably the biggest KIP-4 gap we have right now.
>
> best,
> Colin
>
> >
> > Thanks,
> > George
> >
> >
> > On Wednesday, March 20, 2019, 5:28:59 PM PDT, Colin McCabe
> >  wrote:
> >
> >  Hi George,
> >
> > One big problem here is that administrative APIs should be done through
> > RPCs, not through ZooKeeper.  KIP-4 (Command line and centralized
> > administrative operations) describes the rationale for this.  We want
> > public and stable APIs that don't depend on the internal representation
> > of stuff in ZK, which will change over time.  Tools shouldn't have to
> > integrate with ZK or understand the internal data structures of Kafka
> > to make administrative changes.  ZK doesn't have a good security,
> > access control, or compatibility story.
> >
> > We should create an official reassignment RPC for Kafka.  This will
> > solve many of the problems discussed in this thread, I think.  For
> > example, with an RPC, users won't be able to make changes unless they
> > have ALTER on KafkaCluster.  That avoids the problem of random users
> > making changes without the administrator knowing.  Also, if multiple
> > users are making changes, there is no risk that they will overwrite
> > each other's changes, since they won't be modifying the internal ZK
> > structures directly.
> >
> > I think a good reassignment API would be something like:
> >
> > > ReassignPartitionsResults reassignPartitions(Map PartitionAssignment> reassignments);
> > >
> > > class PartitionAssignment {
> > >  List nodes;
> > > }
> > >
> > > class ReassignPartitionsResults {
> > >  Map> pending;
> > >  Map> completed;
> > >  Map> rejected;
> > > }
> > >
> > > PendingReassignmentResults pendingReassignments();
> > >
> > > class PendingReassignmentResults {
> > >  KafkaFuture> pending;
> > >  KafkaFuture> previous;
> > > }
> >
> > best,
> > Colin
> >
> >
> > On Tue, Mar 19, 2019, at 15:04, George Li wrote:
> > >  Hi Viktor,
> > >
> > > Thanks for the review.
> > >
> > > If there is reassignment in-progress while the cluster is upgraded with
> > > this KIP (upgrade the binary and then do a cluster rolling restart of
> > > the brokers), the reassignment JSON in Zookeeper
> > > /admin/reassign_partitions will only have  {topic, partition,
> > > replicas(new)} info when the batch of reassignment was kicked off
> > > before the upgrade,  not with the "original_replicas" info per
> > > topic/partition.  So when the user is trying to cancel/rollback the
> > > reassignments, it's going to fail and the cancellation will be skipped
> > > (The code in this KIP will check the if the "original_replicas" is in
> > > the /admin/reassign_partition).
> > >
> > > The user either has to wait till current reassignments to finish or
> > > does quite some manual work to cancel them (delete ZK node, bounce
> > > controller, re-submit reassignments with original replicas to rollback,
> > > if the original replicas are kept before the last batch of
> > > reassignments were submitted).
> > >
> > > I think this scenario of reassignments being kicked off by end-user,
> > > not by the team(s) that managed Kafka infrastructure might be rare
> > > (maybe in some very small companies?),  since only one batch of
> > > reassignments can be running at a given time in
> > > 

Re: [Vote] KIP-421: Support resolving externalized secrets in AbstractConfig

2019-03-23 Thread Colin McCabe
I'm not sure if this is ready for a vote yet.  In particular, I don't 
understand how it will work in the broker.  Having external secrets in the 
broker is something that a lot of people have been asking for -- it seems like 
a big omission to not talk about it at all in this KIP.

I also don't understand why we are not considering the case where configuration 
changes over time.  This was certainly one of the big motivating use-cases of 
KIP-297.  A lot of our software runs for a long period of time (like a streams 
job that might run indefinitely, or the broker, etc.).  We want to be able 
to handle configuration changes transparently.
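
For example, the KIP-297 ConfigProvider interface already returns a ConfigData
that can carry a TTL, which a long-running process could use to re-resolve
secrets. A rough sketch of what that might look like (the file path, key names,
and fallback interval are placeholders, not part of any proposal):

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.common.config.ConfigData;
import org.apache.kafka.common.config.provider.FileConfigProvider;

public class SecretReloader {

    public static void main(String[] args) throws Exception {
        try (FileConfigProvider provider = new FileConfigProvider()) {
            provider.configure(Collections.emptyMap());

            while (true) {
                // Resolve all keys in the secrets file, e.g. "keystore.password=...".
                ConfigData data = provider.get("/etc/myapp/secrets.properties");
                Map<String, String> secrets = data.data();
                System.out.println("resolved secrets: " + secrets.keySet());

                // If the provider reports a TTL, re-resolve after it expires; otherwise poll slowly.
                long sleepMs = data.ttl() != null ? data.ttl() : 60_000L;
                Thread.sleep(sleepMs);
            }
        }
    }
}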

I'm concerned that we might be adding technical debt by creating an interface 
that doesn't support dynamic configuration changes.  It would be nice to think 
a little bit more about how this should work and support the use-cases people 
want.

best,
Colin


On Thu, Mar 21, 2019, at 13:49, TEJAL ADSUL wrote:
> Hi All,
> 
> I would like to start the vote thread for KIP-421: Support resolving 
> externalized secrets in AbstractConfig.
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-421%3A+Support+resolving+externalized+secrets+in+AbstractConfig
> 
> Thanks!
> 
> Regards,
> Tejal
>


Re: [DISCUSS] KIP-236 Interruptible Partition Reassignment

2019-03-23 Thread Colin McCabe
On Thu, Mar 21, 2019, at 20:51, George Li wrote:
>  Hi Colin,
> 
> I agree with your proposal of having administrative APIs through RPC 
> instead of ZooKeeper. But seems like it will incur significant changes 
> to both submitting reassignments and this KIP's cancelling pending 
> reassignments. 
> 
> To make this KIP simple and moving along, I will be happy to do another 
> follow-up KIP to change all reassignment related operations via RPC.

Thanks, George.  I think doing it as a two-step process is fine, but I suspect 
it would be much easier and quicker to do the RPC conversion first, and the 
interruptible part later.  The reason is because a lot of the things that 
people have brought up as concerns with this KIP are really issues with the API 
(how will people interact with ZK, how does access control work, what does the 
format look like in ZK) that will just go away once we have an RPC.

> Just curious,  KIP-4 includes Topics/ACL related operations. In 
> addition to Reassignments,  any other operations should be done via 
> RPC? 

I think all of the administrative shell scripts have been converted except 
kafka-configs.sh.  I believe there is a KIP for that conversion.  Reassigning 
partitions is probably the biggest KIP-4 gap we have right now.

best,
Colin

> 
> Thanks,
> George
> 
> 
> On Wednesday, March 20, 2019, 5:28:59 PM PDT, Colin McCabe 
>  wrote:  
>  
>  Hi George,
> 
> One big problem here is that administrative APIs should be done through 
> RPCs, not through ZooKeeper.  KIP-4 (Command line and centralized 
> administrative operations) describes the rationale for this.  We want 
> public and stable APIs that don't depend on the internal representation 
> of stuff in ZK, which will change over time.  Tools shouldn't have to 
> integrate with ZK or understand the internal data structures of Kafka 
> to make administrative changes.  ZK doesn't have a good security, 
> access control, or compatibility story.
> 
> We should create an official reassignment RPC for Kafka.  This will 
> solve many of the problems discussed in this thread, I think.  For 
> example, with an RPC, users won't be able to make changes unless they 
> have ALTER on KafkaCluster.  That avoids the problem of random users 
> making changes without the administrator knowing.  Also, if multiple 
> users are making changes, there is no risk that they will overwrite 
> each other's changes, since they won't be modifying the internal ZK 
> structures directly.
> 
> I think a good reassignment API would be something like:
> 
> > ReassignPartitionsResults reassignPartitions(Map > PartitionAssignment> reassignments);
> > 
> > class PartitionAssignment {
> >  List nodes;
> > }
> >
> > class ReassignPartitionsResults {
> >  Map> pending;
> >  Map> completed;
> >  Map> rejected;
> > }
> >
> > PendingReassignmentResults pendingReassignments();
> >
> > class PendingReassignmentResults {
> >  KafkaFuture> pending;
> >  KafkaFuture> previous;
> > }
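
Spelled out with explicit generics (the angle-bracket contents were lost in the
archive, so the concrete types below - Integer broker ids, KafkaFuture<Void> per
partition - are assumptions rather than the original text), the quoted API sketch
would read roughly:

import java.util.List;
import java.util.Map;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;

interface ReassignmentAdmin {

    ReassignPartitionsResults reassignPartitions(Map<TopicPartition, PartitionAssignment> reassignments);

    PendingReassignmentResults pendingReassignments();

    class PartitionAssignment {
        List<Integer> nodes;   // target replica broker ids, preferred leader first
    }

    class ReassignPartitionsResults {
        Map<TopicPartition, KafkaFuture<Void>> pending;
        Map<TopicPartition, KafkaFuture<Void>> completed;
        Map<TopicPartition, KafkaFuture<Void>> rejected;
    }

    class PendingReassignmentResults {
        KafkaFuture<Map<TopicPartition, PartitionAssignment>> pending;
        KafkaFuture<Map<TopicPartition, PartitionAssignment>> previous;
    }
}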
> 
> best,
> Colin
> 
> 
> On Tue, Mar 19, 2019, at 15:04, George Li wrote:
> >  Hi Viktor,
> > 
> > Thanks for the review. 
> > 
> > If there is reassignment in-progress while the cluster is upgraded with 
> > this KIP (upgrade the binary and then do a cluster rolling restart of 
> > the brokers), the reassignment JSON in Zookeeper  
> > /admin/reassign_partitions will only have  {topic, partition, 
> > replicas(new)} info when the batch of reassignment was kicked off 
> > before the upgrade,  not with the "original_replicas" info per 
> > topic/partition.  So when the user is trying to cancel/rollback the 
> > reassignments, it's going to fail and the cancellation will be skipped 
> > (The code in this KIP will check the if the "original_replicas" is in 
> > the /admin/reassign_partition).  
> > 
> > The user either has to wait till current reassignments to finish or 
> > does quite some manual work to cancel them (delete ZK node, bounce 
> > controller, re-submit reassignments with original replicas to rollback, 
> > if the original replicas are kept before the last batch of 
> > reassignments were submitted). 
> > 
> > I think this scenario of reassignments being kicked off by end-user, 
> > not by the team(s) that managed Kafka infrastructure might be rare 
> > (maybe in some very small companies?),  since only one batch of 
> > reassignments can be running at a given time in 
> > /admin/reassign_partitions.  The end-users need some co-ordination for 
> > submitting reassignments. 
> > 
> > Thanks,
> > George
> > 
> > 
> >    On Tuesday, March 19, 2019, 3:34:20 AM PDT, Viktor Somogyi-Vass 
> >  wrote:  
> >  
> >  Hey George,
> > Thanks for the answers. I'll try to block out time this week to review 
> > your PR.
> > I have one more point to clarify:I've seen some customers who are 
> > managing Kafka as an internal company-wide service and they may or may 
> > not know that how certain topics are used within the company. That 
> > might mean that some clients can start 

[jira] [Created] (KAFKA-8151) Broker hangs and lockups after Zookeeper outages

2019-03-23 Thread Joe Ammann (JIRA)
Joe Ammann created KAFKA-8151:
-

 Summary: Broker hangs and lockups after Zookeeper outages
 Key: KAFKA-8151
 URL: https://issues.apache.org/jira/browse/KAFKA-8151
 Project: Kafka
  Issue Type: Bug
  Components: controller, core, zkclient
Affects Versions: 2.1.1
Reporter: Joe Ammann


We're running several clusters (mostly with 3 brokers) with 2.1.1, where we see 
at least 3 different symptoms, all resulting in broker/controller lockups.

We are pretty sure that the triggering cause for all these symptoms is temporary 
outages (normally 3-5 minutes) of the Zookeeper cluster. The Linux VMs that the ZK 
nodes run on regularly get stalled for a couple of minutes. The ZK nodes always 
very quickly reunite and build a quorum after the situation clears, but the Kafka 
brokers (which run on the same Linux VMs) quite often show problems after this 
procedure.

I've seen 3 different kinds of problems (this is why I put "reproduce" in 
quotes, I can never predict what will happen)

# the brokers get their ZK sessions expired (obviously) and sometimes only 2 of 
3 re-register under /brokers/ids. The 3rd broker doesn't re-register for some 
reason (that's the problem I originally described)
# the brokers all re-register and re-elect a new controller. But that new 
controller does not fully work. For example, it doesn't process partition 
reassignment requests and/or does not transfer partition leadership after I 
kill a broker
# the previous controller gets "dead-locked" (it has 3-4 of the important 
controller threads in a lock) and hence does not perform any of its controller 
duties. But it still regards itself as the valid controller and is accepted by 
the other brokers

I'll try to describe each one of the problems in more detail below, and hope to 
be able to clearly separate them. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)