[jira] [Created] (KAFKA-9888) REST extensions can mutate connector configs in worker config state snapshot

2020-04-17 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-9888:


 Summary: REST extensions can mutate connector configs in worker 
config state snapshot
 Key: KAFKA-9888
 URL: https://issues.apache.org/jira/browse/KAFKA-9888
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.4.1, 2.5.0, 2.3.1, 2.4.0, 2.3.0
Reporter: Chris Egerton
Assignee: Chris Egerton


The changes made in 
[KIP-454|https://cwiki.apache.org/confluence/display/KAFKA/KIP-454%3A+Expansion+of+the+ConnectClusterState+interface]
 involved adding a {{connectorConfig}} method to the 
[ConnectClusterState|https://github.com/apache/kafka/blob/ecde596180975f8546c0e8e10f77f7eee5f1c4d8/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectClusterState.java]
 interface that REST extensions could use to query the worker for the 
configuration of a given connector. The [implementation for this 
method|https://github.com/apache/kafka/blob/ecde596180975f8546c0e8e10f77f7eee5f1c4d8/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java#L86-L89]
 returns the Java {{Map}} that's stored in the worker's view of the config 
topic (when running in distributed mode). No copying is performed, which causes 
mutations of that {{Map}} object to persist across invocations of 
{{connectorConfig}} and, even worse, propagate to the worker when, e.g., 
starting a connector.

We should not give REST extensions the original map, but a copy of it instead.
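
A minimal sketch of the kind of fix this implies -- the helper name below
(lookUpConnectorConfig) is a placeholder for however the implementation obtains
the worker's map, not the actual patch:

import java.util.HashMap;
import java.util.Map;

public Map<String, String> connectorConfig(String connName) {
    Map<String, String> config = lookUpConnectorConfig(connName); // placeholder lookup
    // Hand back a defensive copy so REST extensions cannot mutate the worker's
    // config state snapshot through the returned Map.
    return config == null ? null : new HashMap<>(config);
}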



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Build failed in Jenkins: kafka-trunk-jdk8 #4445

2020-04-17 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-6145: KIP-441: Add test scenarios to ensure rebalance convergence

[github] MINOR: reduce impact of trace logging in replica hot path (#8468)

[github] KAFKA-9818: Fix flaky test in RecordCollectorTest (#8507)


--
[...truncated 3.02 MB...]

org.apache.kafka.streams.MockTimeTest > shouldGetNanosAsMillis STARTED

org.apache.kafka.streams.MockTimeTest > shouldGetNanosAsMillis PASSED

org.apache.kafka.streams.MockTimeTest > shouldSetStartTime STARTED

org.apache.kafka.streams.MockTimeTest > shouldSetStartTime PASSED

org.apache.kafka.streams.MockTimeTest > shouldNotAllowNegativeSleep STARTED

org.apache.kafka.streams.MockTimeTest > shouldNotAllowNegativeSleep PASSED

org.apache.kafka.streams.MockTimeTest > shouldAdvanceTimeOnSleep STARTED

org.apache.kafka.streams.MockTimeTest > shouldAdvanceTimeOnSleep PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldReturnIsOpen 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldReturnIsOpen 
PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldReturnName 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldReturnName 
PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldPutWithUnknownTimestamp STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldPutWithUnknownTimestamp PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldPutWindowStartTimestampWithUnknownTimestamp STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldPutWindowStartTimestampWithUnknownTimestamp PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldReturnIsPersistent STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldReturnIsPersistent PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardClose 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardClose 
PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardFlush 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardFlush 
PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardInit 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardInit 
PASSED

org.apache.kafka.streams.test.TestRecordTest > testConsumerRecord STARTED

org.apache.kafka.streams.test.TestRecordTest > testConsumerRecord PASSED

org.apache.kafka.streams.test.TestRecordTest > testToString STARTED

org.apache.kafka.streams.test.TestRecordTest > testToString PASSED

org.apache.kafka.streams.test.TestRecordTest > testInvalidRecords STARTED

org.apache.kafka.streams.test.TestRecordTest > testInvalidRecords PASSED

org.apache.kafka.streams.test.TestRecordTest > testPartialConstructorEquals 
STARTED

org.apache.kafka.streams.test.TestRecordTest > testPartialConstructorEquals 
PASSED

org.apache.kafka.streams.test.TestRecordTest > testMultiFieldMatcher STARTED

org.apache.kafka.streams.test.TestRecordTest > testMultiFieldMatcher PASSED

org.apache.kafka.streams.test.TestRecordTest > testFields STARTED

org.apache.kafka.streams.test.TestRecordTest > testFields PASSED

org.apache.kafka.streams.test.TestRecordTest > testProducerRecord STARTED

org.apache.kafka.streams.test.TestRecordTest > testProducerRecord PASSED

org.apache.kafka.streams.test.TestRecordTest > testEqualsAndHashCode STARTED

org.apache.kafka.streams.test.TestRecordTest > testEqualsAndHashCode PASSED

> Task :streams:upgrade-system-tests-0100:compileJava NO-SOURCE
> Task :streams:upgrade-system-tests-0100:processResources NO-SOURCE
> Task :streams:upgrade-system-tests-0100:classes UP-TO-DATE
> Task :streams:upgrade-system-tests-0100:checkstyleMain NO-SOURCE
> Task :streams:upgrade-system-tests-0100:compileTestJava
> Task :streams:upgrade-system-tests-0100:processTestResources NO-SOURCE
> Task :streams:upgrade-system-tests-0100:testClasses
> Task :streams:upgrade-system-tests-0100:checkstyleTest
> Task :streams:upgrade-system-tests-0100:spotbugsMain NO-SOURCE
> Task :streams:upgrade-system-tests-0100:test
> Task :streams:upgrade-system-tests-0101:compileJava NO-SOURCE
> Task :streams:upgrade-system-tests-0101:processResources NO-SOURCE
> Task :streams:upgrade-system-tests-0101:classes UP-TO-DATE
> Task :streams:upgrade-system-tests-0101:checkstyleMain NO-SOURCE
> Task :streams:upgrade-system-tests-0101:compileTestJava
> Task :streams:upgrade-system-tests-0101:processTestResources NO-SOURCE
> Task :streams:upgrade-system-tests-0101:testClasses
> Task :streams:upgrade-system-tests-0101:checkstyleTest
> Task :streams:upgrade-system-tests-0101:spotbugsMain NO-SOURCE
> Task :streams:upgrade-system-tests-0101:test
> Task :streams:upgrade-system-tests-0102:compileJava NO-SOURCE
> Task 

Build failed in Jenkins: kafka-trunk-jdk11 #1364

2020-04-17 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-9881: Convert integration test to verify measurements from 
RocksDB

[github] KAFKA-6145: KIP-441: Add test scenarios to ensure rebalance convergence

[github] MINOR: reduce impact of trace logging in replica hot path (#8468)

[github] KAFKA-9818: Fix flaky test in RecordCollectorTest (#8507)


--
[...truncated 3.04 MB...]

org.apache.kafka.streams.TestTopicsTest > shouldNotAllowToCreateWithNullDriver 
PASSED

org.apache.kafka.streams.TestTopicsTest > testDuration STARTED

org.apache.kafka.streams.TestTopicsTest > testDuration PASSED

org.apache.kafka.streams.TestTopicsTest > testOutputToString STARTED

org.apache.kafka.streams.TestTopicsTest > testOutputToString PASSED

org.apache.kafka.streams.TestTopicsTest > testValue STARTED

org.apache.kafka.streams.TestTopicsTest > testValue PASSED

org.apache.kafka.streams.TestTopicsTest > testTimestampAutoAdvance STARTED

org.apache.kafka.streams.TestTopicsTest > testTimestampAutoAdvance PASSED

org.apache.kafka.streams.TestTopicsTest > testOutputWrongSerde STARTED

org.apache.kafka.streams.TestTopicsTest > testOutputWrongSerde PASSED

org.apache.kafka.streams.TestTopicsTest > 
shouldNotAllowToCreateOutputTopicWithNullTopicName STARTED

org.apache.kafka.streams.TestTopicsTest > 
shouldNotAllowToCreateOutputTopicWithNullTopicName PASSED

org.apache.kafka.streams.TestTopicsTest > testWrongSerde STARTED

org.apache.kafka.streams.TestTopicsTest > testWrongSerde PASSED

org.apache.kafka.streams.TestTopicsTest > testKeyValuesToMapWithNull STARTED

org.apache.kafka.streams.TestTopicsTest > testKeyValuesToMapWithNull PASSED

org.apache.kafka.streams.TestTopicsTest > testNonExistingOutputTopic STARTED

org.apache.kafka.streams.TestTopicsTest > testNonExistingOutputTopic PASSED

org.apache.kafka.streams.TestTopicsTest > testMultipleTopics STARTED

org.apache.kafka.streams.TestTopicsTest > testMultipleTopics PASSED

org.apache.kafka.streams.TestTopicsTest > testKeyValueList STARTED

org.apache.kafka.streams.TestTopicsTest > testKeyValueList PASSED

org.apache.kafka.streams.TestTopicsTest > 
shouldNotAllowToCreateOutputWithNullDriver STARTED

org.apache.kafka.streams.TestTopicsTest > 
shouldNotAllowToCreateOutputWithNullDriver PASSED

org.apache.kafka.streams.TestTopicsTest > testValueList STARTED

org.apache.kafka.streams.TestTopicsTest > testValueList PASSED

org.apache.kafka.streams.TestTopicsTest > testRecordList STARTED

org.apache.kafka.streams.TestTopicsTest > testRecordList PASSED

org.apache.kafka.streams.TestTopicsTest > testNonExistingInputTopic STARTED

org.apache.kafka.streams.TestTopicsTest > testNonExistingInputTopic PASSED

org.apache.kafka.streams.TestTopicsTest > testKeyValuesToMap STARTED

org.apache.kafka.streams.TestTopicsTest > testKeyValuesToMap PASSED

org.apache.kafka.streams.TestTopicsTest > testRecordsToList STARTED

org.apache.kafka.streams.TestTopicsTest > testRecordsToList PASSED

org.apache.kafka.streams.TestTopicsTest > testKeyValueListDuration STARTED

org.apache.kafka.streams.TestTopicsTest > testKeyValueListDuration PASSED

org.apache.kafka.streams.TestTopicsTest > testInputToString STARTED

org.apache.kafka.streams.TestTopicsTest > testInputToString PASSED

org.apache.kafka.streams.TestTopicsTest > testTimestamp STARTED

org.apache.kafka.streams.TestTopicsTest > testTimestamp PASSED

org.apache.kafka.streams.TestTopicsTest > testWithHeaders STARTED

org.apache.kafka.streams.TestTopicsTest > testWithHeaders PASSED

org.apache.kafka.streams.TestTopicsTest > testKeyValue STARTED

org.apache.kafka.streams.TestTopicsTest > testKeyValue PASSED

org.apache.kafka.streams.TestTopicsTest > 
shouldNotAllowToCreateTopicWithNullTopicName STARTED

org.apache.kafka.streams.TestTopicsTest > 
shouldNotAllowToCreateTopicWithNullTopicName PASSED

> Task :streams:upgrade-system-tests-0100:compileJava NO-SOURCE
> Task :streams:upgrade-system-tests-0100:processResources NO-SOURCE
> Task :streams:upgrade-system-tests-0100:classes UP-TO-DATE
> Task :streams:upgrade-system-tests-0100:checkstyleMain NO-SOURCE
> Task :streams:upgrade-system-tests-0100:compileTestJava
> Task :streams:upgrade-system-tests-0100:processTestResources NO-SOURCE
> Task :streams:upgrade-system-tests-0100:testClasses
> Task :streams:upgrade-system-tests-0100:checkstyleTest
> Task :streams:upgrade-system-tests-0100:spotbugsMain NO-SOURCE
> Task :streams:upgrade-system-tests-0100:test
> Task :streams:upgrade-system-tests-0101:compileJava NO-SOURCE
> Task :streams:upgrade-system-tests-0101:processResources NO-SOURCE
> Task :streams:upgrade-system-tests-0101:classes UP-TO-DATE
> Task :streams:upgrade-system-tests-0101:checkstyleMain NO-SOURCE
> Task :streams:upgrade-system-tests-0101:compileTestJava
> Task :streams:upgrade-system-tests-0101:processTestResources NO-SOURCE
> Task :streams:upgrade-system-tests-0101:testClasses
> Task 

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2020-04-17 Thread Ivan Ponomarev

Hi everyone!

Let me revive the discussion of this KIP.

I'm very sorry for stopping my participation in the discussion in June
2019. My project work was very intensive then and left me no spare time.
But I think I must finish this, because we invested substantial effort
into this discussion and I don't feel entitled to propose other things
before this one is finalized.


During these months I kept writing and reviewing Kafka Streams-related
code. Every time I needed branching, Spring-Kafka's KafkaStreamBrancher
class (my invention, and the original idea for this KIP) worked for me --
that's another reason why I gave up pushing the KIP forward. Whenever I
ran into the problem with the scope of branches, I worked around it this
way:


AtomicReference> result = new AtomicReference<>();
new KafkaStreamBrancher<>()
.branch()
.defaultBranch(result::set)
.onTopOf(someStream);
result.get()...
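
(The archive stripped the generics above; with assumed String/String types
and an assumed predicate and topics, the same workaround reads roughly like
this:)

AtomicReference<KStream<String, String>> result = new AtomicReference<>();
new KafkaStreamBrancher<String, String>()
    // branch consumed in place
    .branch((key, value) -> value.contains("error"), ks -> ks.to("errors"))
    // capture the default branch into the enclosing scope
    .defaultBranch(result::set)
    .onTopOf(someStream);
result.get().to("non-errors");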


And yes, of course I don't feel very happy with this approach.

I think that Matthias came up with a bright solution in his post from 
May 24th, 2019. Let me quote it:


KStream#split() -> KBranchedStream
// branch is not easily accessible in current scope
KBranchedStream#branch(Predicate, Consumer)
  -> KBranchedStream
// assign a name to the branch and
// return the sub-stream to the current scope later
//
// can be simple as `#branch(p, s->s, "name")`
// or also complex as `#branch(p, s->s.filter(...), "name")`
KBranchedStream#branch(Predicate, Function, String)
  -> KBranchedStream
// default branch is not easily accessible
// return map of all named sub-stream into current scope
KBranchedStream#default(Consumer)
  -> Map
// assign custom name to default-branch
// return map of all named sub-stream into current scope
KBranchedStream#default(Function, String)
  -> Map
// assign a default name for default
// return map of all named sub-stream into current scope
KBranchedStream#defaultBranch(Function)
  -> Map
// return map of all named sub-streams into current scope
KBranchedStream#noDefaultBranch()
  -> Map

I believe this would satisfy everyone. Optional names seem to be a good
idea: when you don't need to have the branches in the same scope, you
just don't use names and you don't risk making your code brittle. Or you
might want to add names just for debugging purposes. Or, finally, you
might use the returned Map to have the named branches in the original
scope.
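
To make the quoted sketch concrete, a hypothetical usage might look like the
following (generics, predicates, and topic names are assumptions; I also write
`defaultBranch` rather than `default`, since `default` is a reserved word in
Java):

Predicate<String, String> isError = (key, value) -> value.startsWith("ERROR");
Predicate<String, String> isWarning = (key, value) -> value.startsWith("WARN");

Map<String, KStream<String, String>> branches = stream
    .split()
    // embedded chaining: consumed in place, not returned in the Map
    .branch(isError, ks -> ks.to("errors"))
    // named branch: returned in the Map under "warnings"
    .branch(isWarning, ks -> ks.filter((k, v) -> v.length() < 100), "warnings")
    // named default branch, also returned in the Map
    .defaultBranch(ks -> ks, "rest");

branches.get("warnings").to("warnings-topic");
branches.get("rest").to("everything-else");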


There was also input from John Roesler on June 4th, 2019, who suggested
using the Named class. I can't comment on this; the idea seems
reasonable, but in this matter I'd rather trust people who are more
familiar with the Streams API design principles than I am.


Regards,

Ivan



On 08.10.2019 1:38, Matthias J. Sax wrote:

I am moving this KIP into "inactive status". Feel free to resume the KIP
at any point.

If anybody else is interested in picking up this KIP, feel free to do so.



-Matthias

On 7/11/19 4:00 PM, Matthias J. Sax wrote:

Ivan,

did you see my last reply? What do you think about my proposal to mix
both approaches and try to get best-of-both worlds?


-Matthias

On 6/11/19 3:56 PM, Matthias J. Sax wrote:

Thanks for the input John!


under your suggestion, it seems that the name is required


If you want to get the `KStream` as part of the `Map` back using a
`Function`, yes. If you follow the "embedded chaining" pattern using a
`Consumer`, no.

Allowing for a default name via `split()` can of course be done.
Similarly, using `Named` instead of `String` is possible.

I only wanted to sketch out a high-level proposal to merge both patterns.
Your suggestions to align the new API with the existing API make total
sense.



One follow up question: Would `Named` be optional or required in
`split()` and `branch()`? It's unclear from your example.

If both are mandatory, what do we gain by it? The returned `Map` only
contains the corresponding branches, so why should we prefix all of
them? If `Named` is only mandatory in `branch()` but optional in
`split()`, the same question arises.

Requiring `Named` in `split()` only seems to make sense if `Named` is
optional in `branch()` and we generate a `-X` suffix using a counter for
the different branch names. However, this might lead to the problem of
changing names if branches are added/removed. Also, how would the names
be generated if a `Consumer` is mixed in (i.e., not all branches are
returned in the `Map`)?

If `Named` is optional for both, it could happen that a user forgets to
specify a name for a branch, which would lead to runtime issues.


Hence, I am actually in favor of not allowing a default name, keeping
`split()` parameterless, and making `Named` in `branch()` required if a
`Function` is used. This makes it explicit to the user that specifying a
name is required if a `Function` is used.



About


KBranchedStream#branch(BranchConfig)


I don't think that the branching predicate is a configuration and hence
would not include it in a configuration object.



Re: [Vote] KIP-588: Allow producers to recover gracefully from transaction timeouts

2020-04-17 Thread Guozhang Wang
Sounds good to me. Thanks Boyang.

On Fri, Apr 17, 2020 at 3:32 PM Boyang Chen 
wrote:

> Thanks Guozhang,
>
> I think most of the complexity comes from our intention to benefit older
> clients. On second thought, I think the add-on complexity counteracts
> the gain here, as only the 2.5 client gets a slice of the resilience
> improvement, and not the many older versions.
>
> So I decided to drop the UNKNOWN_PRODUCER_ID path, by just claiming that
> this change would only benefit 2.6 Producer clients. So the only path that
> needs version detection is the new transaction coordinator handling
> transactional requests. If the Producer is 2.6+, we pick
> PRODUCER_FENCED (new error code) or TRANSACTION_TIMED_OUT as the response;
> otherwise we return INVALID_PRODUCER_EPOCH to be consistent with older
> clients.
>
> Does this sound like a better plan? I already updated the KIP with
> simplifications.
>
>
> On Fri, Apr 17, 2020 at 12:02 PM Guozhang Wang  wrote:
>
> > Hi Boyang,
> >
> > Your reply to 3) seems conflicting with your other answers which is a bit
> > confusing to me. Following your other answers, it seems you suggest
> > returning UNKNOWN_PRODUCER_ID so that 2.5 clients can trigger retry logic
> > as well?
> >
> > To complete my reasoning here as a complete picture:
> >
> > a) post KIP-360 (2.5+) the partition leader broker does not return
> > UNKNOWN_PRODUCER_ID any more.
> > b) upon seeing an old epoch, partition leader cannot tell if it is due to
> > fencing or timeout; so it could only return INVALID_PRODUCER_EPOCH.
> >
> > So the basic idea is to let the clients ask the transaction coordinator
> for
> > the source of truth:
> >
> > 1) 2.5+ client would handle UNKNOWN_PRODUCER_ID (which could only be
> > returned from old brokers) by trying to re-initialize with the
> transaction
> > coordinator; the coordinator would then tell it whether it is
> > PRODUCER_FENCED or TXN_TIMEOUT. And for old brokers, it would always
> return
> > PRODUCER_FENCED anyways.
> > 2) 2.6+ client would also handle INVALID_PRODUCER_EPOCH with the retry
> > initializing logic; and similarly the transaction coordinator would
> > return PRODUCER_FENCED or TXN_TIMEOUT if it is new or always
> > return PRODUCER_FENCED if it is old.
> >
> > The question open is, whether
> >
> > * 3) the new broker should return UNKNOWN_PRODUCER_ID now when it is
> > *supposed* to return INVALID_PRODUCER_EPOCH and it found the request is
> > from 2.5 client (note as mentioned in a) right now we do not
> > return UNKNOWN_PRODUCER_ID from brokers anymore).
> >
> > If it does, then 2.5 client could still do the retry logic to the
> > transaction coordinator, i.e. benefit from KIP-360; but the cost is
> complex
> > logic on the broker side as well as producer API version bump up.
> > If it does not, then when INVALID_PRODUCER_EPOCH is returned to the old
> > client it would treat it as fatal and not ask the txn coordinator; but it
> > simplifies the broker logic and also do not require producer API version
> > bump.
> >
> > Personally I'd suggest we do the latter, knowing that it would not
> benefit
> > 2.5 client when the partition leader gets an old epoch and does not know
> > whether it is Fenced or Timed Out.
> >
> >
> > Guozhang
> >
> > On Thu, Apr 16, 2020 at 7:59 PM Boyang Chen 
> > wrote:
> >
> > > Thanks Jason and Guozhang for the thoughts.
> > >
> > > On Thu, Apr 16, 2020 at 6:09 PM Guozhang Wang 
> > wrote:
> > >
> > > > For 2/3 above, originally I was not thinking that we will have a
> > > different
> > > > exception for INVALID_PRODUCER_EPOCH and hence was thinking that in
> > order
> > > > to leverage KIP-360 for it, we'd have to let the broker to
> > > > return UNKNOWN_PRODUCER_ID. I.e. we'd change the logic of partition
> > > leader
> > > > as well to return UNKNOWN_PRODUCER_ID to let the producer try
> > > > re-initializing the PID on the coordinator, and if it is indeed due
> to
> > > > fencing, then coordinator can let the client know of the fatal error
> > and
> > > > then fail. In that case, then we do need to bump up the producer API
> > > > version so that the partition leader knows if it is from older or
> newer
> > > > clients: if it is older client who do not have KIP-360, we'd return
> > > > INVALID_PRODUCER_EPOCH still; for newer client, we can
> > > > return UNKNOWN_PRODUCER_ID to let it seek what's the truth on
> > > coordinator.
> > > >
> > > I know this is a bit of reversed order. Feel free to check out my reply
> > to
> > > Jason first and go back here :)
> > > I think the partition leader will have no change in the returned error
> > > code, now that we have agreed that only new clients should be able to retry.
> > >
> > >
> > > > Now since we are additionally mapping INVALID_PRODUCER_EPOCH to a
> > > > different error code and letting clients handle it similarly
> > > > to UNKNOWN_PRODUCER_ID, this can be saved indeed. However, it also
> > > > means that 2.5.0 clients would not benefit from this KIP, 

Build failed in Jenkins: kafka-trunk-jdk8 #4444

2020-04-17 Thread Apache Jenkins Server
See 


Changes:

[github] MINOR: Switch order of sections on tumbling and hopping windows in

[github] MINOR: improve test coverage for dynamic LogConfig(s) (#7616)

[github] KAFKA-9881: Convert integration test to verify measurements from 
RocksDB


--
[...truncated 6.05 MB...]

org.apache.kafka.streams.MockTimeTest > shouldGetNanosAsMillis STARTED

org.apache.kafka.streams.MockTimeTest > shouldGetNanosAsMillis PASSED

org.apache.kafka.streams.MockTimeTest > shouldSetStartTime STARTED

org.apache.kafka.streams.MockTimeTest > shouldSetStartTime PASSED

org.apache.kafka.streams.MockTimeTest > shouldNotAllowNegativeSleep STARTED

org.apache.kafka.streams.MockTimeTest > shouldNotAllowNegativeSleep PASSED

org.apache.kafka.streams.MockTimeTest > shouldAdvanceTimeOnSleep STARTED

org.apache.kafka.streams.MockTimeTest > shouldAdvanceTimeOnSleep PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldReturnIsOpen 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldReturnIsOpen 
PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldReturnName 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldReturnName 
PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldPutWithUnknownTimestamp STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldPutWithUnknownTimestamp PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldPutWindowStartTimestampWithUnknownTimestamp STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldPutWindowStartTimestampWithUnknownTimestamp PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldReturnIsPersistent STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldReturnIsPersistent PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardClose 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardClose 
PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardFlush 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardFlush 
PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardInit 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardInit 
PASSED

org.apache.kafka.streams.test.TestRecordTest > testConsumerRecord STARTED

org.apache.kafka.streams.test.TestRecordTest > testConsumerRecord PASSED

org.apache.kafka.streams.test.TestRecordTest > testToString STARTED

org.apache.kafka.streams.test.TestRecordTest > testToString PASSED

org.apache.kafka.streams.test.TestRecordTest > testInvalidRecords STARTED

org.apache.kafka.streams.test.TestRecordTest > testInvalidRecords PASSED

org.apache.kafka.streams.test.TestRecordTest > testPartialConstructorEquals 
STARTED

org.apache.kafka.streams.test.TestRecordTest > testPartialConstructorEquals 
PASSED

org.apache.kafka.streams.test.TestRecordTest > testMultiFieldMatcher STARTED

org.apache.kafka.streams.test.TestRecordTest > testMultiFieldMatcher PASSED

org.apache.kafka.streams.test.TestRecordTest > testFields STARTED

org.apache.kafka.streams.test.TestRecordTest > testFields PASSED

org.apache.kafka.streams.test.TestRecordTest > testProducerRecord STARTED

org.apache.kafka.streams.test.TestRecordTest > testProducerRecord PASSED

org.apache.kafka.streams.test.TestRecordTest > testEqualsAndHashCode STARTED

org.apache.kafka.streams.test.TestRecordTest > testEqualsAndHashCode PASSED

> Task :streams:upgrade-system-tests-0100:compileJava NO-SOURCE
> Task :streams:upgrade-system-tests-0100:processResources NO-SOURCE
> Task :streams:upgrade-system-tests-0100:classes UP-TO-DATE
> Task :streams:upgrade-system-tests-0100:checkstyleMain NO-SOURCE
> Task :streams:upgrade-system-tests-0100:compileTestJava
> Task :streams:upgrade-system-tests-0100:processTestResources NO-SOURCE
> Task :streams:upgrade-system-tests-0100:testClasses
> Task :streams:upgrade-system-tests-0100:checkstyleTest
> Task :streams:upgrade-system-tests-0100:spotbugsMain NO-SOURCE
> Task :streams:upgrade-system-tests-0100:test
> Task :streams:upgrade-system-tests-0101:compileJava NO-SOURCE
> Task :streams:upgrade-system-tests-0101:processResources NO-SOURCE
> Task :streams:upgrade-system-tests-0101:classes UP-TO-DATE
> Task :streams:upgrade-system-tests-0101:checkstyleMain NO-SOURCE
> Task :streams:upgrade-system-tests-0101:compileTestJava
> Task :streams:upgrade-system-tests-0101:processTestResources NO-SOURCE
> Task :streams:upgrade-system-tests-0101:testClasses
> Task :streams:upgrade-system-tests-0101:checkstyleTest
> Task :streams:upgrade-system-tests-0101:spotbugsMain NO-SOURCE
> Task :streams:upgrade-system-tests-0101:test
> Task :streams:upgrade-system-tests-0102:compileJava NO-SOURCE
> Task 

Re: [Vote] KIP-588: Allow producers to recover gracefully from transaction timeouts

2020-04-17 Thread Boyang Chen
Thanks Guozhang,

I think most of the complexity comes from our intention to benefit older
clients. On second thought, I think the add-on complexity counteracts
the gain here, as only the 2.5 client gets a slice of the resilience
improvement, and not the many older versions.

So I decided to drop the UNKNOWN_PRODUCER_ID path, by just claiming that
this change would only benefit 2.6 Producer clients. So the only path that
needs version detection is the new transaction coordinator handling
transactional requests. If the Producer is 2.6+, we pick
PRODUCER_FENCED (new error code) or TRANSACTION_TIMED_OUT as the response;
otherwise we return INVALID_PRODUCER_EPOCH to be consistent with older
clients.

Does this sound like a better plan? I already updated the KIP with
simplifications.
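
A rough sketch of the coordinator-side decision described above (error names
as in the KIP; the version constant and method are hypothetical, just to
illustrate the branching):

enum TxnError { PRODUCER_FENCED, TRANSACTION_TIMED_OUT, INVALID_PRODUCER_EPOCH }

// minNewErrorVersion: the (hypothetical) request version from which a 2.6+
// Producer understands the new error codes.
static TxnError errorForBumpedEpoch(short requestVersion, short minNewErrorVersion,
                                    boolean epochBumpedByTimeout) {
    if (requestVersion >= minNewErrorVersion) {          // 2.6+ producer
        return epochBumpedByTimeout ? TxnError.TRANSACTION_TIMED_OUT
                                    : TxnError.PRODUCER_FENCED;
    }
    return TxnError.INVALID_PRODUCER_EPOCH;              // older producers, unchanged
}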


On Fri, Apr 17, 2020 at 12:02 PM Guozhang Wang  wrote:

> Hi Boyang,
>
> Your reply to 3) seems conflicting with your other answers which is a bit
> confusing to me. Following your other answers, it seems you suggest
> returning UNKNOWN_PRODUCER_ID so that 2.5 clients can trigger retry logic
> as well?
>
> To complete my reasoning here as a complete picture:
>
> a) post KIP-360 (2.5+) the partition leader broker does not return
> UNKNOWN_PRODUCER_ID any more.
> b) upon seeing an old epoch, partition leader cannot tell if it is due to
> fencing or timeout; so it could only return INVALID_PRODUCER_EPOCH.
>
> So the basic idea is to let the clients ask the transaction coordinator for
> the source of truth:
>
> 1) 2.5+ client would handle UNKNOWN_PRODUCER_ID (which could only be
> returned from old brokers) by trying to re-initialize with the transaction
> coordinator; the coordinator would then tell it whether it is
> PRODUCER_FENCED or TXN_TIMEOUT. And for old brokers, it would always return
> PRODUCER_FENCED anyways.
> 2) 2.6+ client would also handle INVALID_PRODUCER_EPOCH with the retry
> initializing logic; and similarly the transaction coordinator would
> return PRODUCER_FENCED or TXN_TIMEOUT if it is new or always
> return PRODUCER_FENCED if it is old.
>
> The question open is, whether
>
> * 3) the new broker should return UNKNOWN_PRODUCER_ID now when it is
> *supposed* to return INVALID_PRODUCER_EPOCH and it found the request is
> from 2.5 client (note as mentioned in a) right now we do not
> return UNKNOWN_PRODUCER_ID from brokers anymore).
>
> If it does, then 2.5 client could still do the retry logic to the
> transaction coordinator, i.e. benefit from KIP-360; but the cost is complex
> logic on the broker side as well as producer API version bump up.
> If it does not, then when INVALID_PRODUCER_EPOCH is returned to the old
> client it would treat it as fatal and not ask the txn coordinator; but it
> simplifies the broker logic and also do not require producer API version
> bump.
>
> Personally I'd suggest we do the latter, knowing that it would not benefit
> 2.5 client when the partition leader gets an old epoch and does not know
> whether it is Fenced or Timed Out.
>
>
> Guozhang
>
> On Thu, Apr 16, 2020 at 7:59 PM Boyang Chen 
> wrote:
>
> > Thanks Jason and Guozhang for the thoughts.
> >
> > On Thu, Apr 16, 2020 at 6:09 PM Guozhang Wang 
> wrote:
> >
> > > For 2/3 above, originally I was not thinking that we will have a
> > different
> > > exception for INVALID_PRODUCER_EPOCH and hence was thinking that in
> order
> > > to leverage KIP-360 for it, we'd have to let the broker to
> > > return UNKNOWN_PRODUCER_ID. I.e. we'd change the logic of partition
> > leader
> > > as well to return UNKNOWN_PRODUCER_ID to let the producer try
> > > re-initializing the PID on the coordinator, and if it is indeed due to
> > > fencing, then coordinator can let the client know of the fatal error
> and
> > > then fail. In that case, then we do need to bump up the producer API
> > > version so that the partition leader knows if it is from older or newer
> > > clients: if it is older client who do not have KIP-360, we'd return
> > > INVALID_PRODUCER_EPOCH still; for newer client, we can
> > > return UNKNOWN_PRODUCER_ID to let it seek what's the truth on
> > coordinator.
> > >
> > I know this is a bit of reversed order. Feel free to check out my reply
> to
> > Jason first and go back here :)
> > I think the partition leader will have no change in the returned error
> > code, now that we have agreed that only new clients should be able to retry.
> >
> >
> > > Now since we are additionally mapping INVALID_PRODUCER_EPOCH to a
> > > different error code and letting clients handle it similarly
> > > to UNKNOWN_PRODUCER_ID, this can be saved indeed. However, it also
> > > means that 2.5.0 clients would not benefit from this KIP, since they
> > > still treat INVALID_PRODUCER_EPOCH as fatal. If people think that's okay
> > > then we can simplify the partition leader behavior as well as not bump
> > > up producer APIs. I think I'm a bit inclined towards simplicity over
> > > benefiting older clients.
> > >
> >
> > 2.5 client should get 

Jenkins build is back to normal : kafka-trunk-jdk11 #1363

2020-04-17 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-587 Suppress detailed responses for handled exceptions in security-sensitive environments

2020-04-17 Thread Christopher Egerton
Hi Connor,

That's great, but I think you may have mistaken Colin for me :)

One more thing that should be addressed--the "public interfaces" section
isn't just for Java interfaces, it's for any changes to any public part of
Kafka that users and external developers interact with. As far as Connect
is concerned, this includes (but is not limited to) the REST API and worker
configuration properties, so it might be worth briefly summarizing the
scope of your proposed changes in that section (something like "We plan on
adding a new worker config named ... that will affect the REST API under
...").

Cheers,

Chris

On Wed, Apr 15, 2020 at 1:00 PM Connor Penhale 
wrote:

> Hi Chris,
>
> I can ask the customer if they can disclose any additional information. I
> provided the information around "PCI-DSS" to give the community a flavor of
> the type of environment the customer was operating in. The current mode is
> /not/ insecure, I would agree with this. I would be willing to agree that
> my customer has particular security audit requirements that go above and
> beyond what most environments would consider reasonable. Are you
> comfortable with that language?
>
> " enable.rest.response.stack.traces" works great for me!
>
> I created a new class in the example PR because I wanted the highest
> chance of not gunking up the works by stepping on toes in an important
> class. I figured I'd be reducing risk by creating an alternative
> implementing class. In retrospect, and now that I'm getting a first-hand
> look at Kafka's community process, that is probably unnecessary.
> Additionally, I would agree with your statement that we should modify the
> existing ExceptionMapper to avoid behavior divergence in subsequent
> releases and ensure this feature's particular scope is easy to maintain.
>
> Thanks!
> Connor
>
> On 4/15/20, 1:17 PM, "Colin McCabe"  wrote:
>
> Hi Connor,
>
> I still would like to hear more about whether this feature is required
> for PCI-DSS or any other security certification.  Nobody I talked to seemed
> to think that it was-- if there are certifications that would require this,
> it would be nice to know.  However, I don't object to implementing this as
> long as we don't imply that the current mode is insecure.
>
> What do you think about using "enable.rest.response.stack.traces" as
> the config name?  It seems like that  makes it clearer that it's a boolean
> value.
>
> It's not really necessary to describe the internal implementation in
> the KIP, but since you mentioned it, it's probably worth considering using
> the current ExceptionMapper class with a different configuration rather
> than creating a new one.
>
> best,
> Colin
>
>
> On Mon, Apr 13, 2020, at 09:04, Connor Penhale wrote:
> > Hi Chris!
> >
> > RE: SSL, indeed, the issue is not that the information is not
> > encrypted, but that there is no authorization layer.
> >
> > I'll be sure to edit the KIP as we continue discussion!
> >
> > RE: the 200 response you highlighted, great catch! I'll work with my
> > customer and get back to you on their audit team's intention! I'm
> > fairly certain I know the answer, but I need to be sure before I
> speak
> > for them.
> >
> > Thanks!
> > Connor
> >
> > On 4/8/20, 11:27 PM, "Christopher Egerton" 
> wrote:
> >
> > Hi Connor,
> >
> > Just a few more remarks!
> >
> > I noticed that you said "Kafka Connect was passing these
> exceptions without
> > authentication." For what it's worth, the Connect REST API can
> be secured
> > with TLS out-of-the-box by configuring the worker with the
> various ssl.*
> > properties, but that doesn't provide any kind of authorization
> layer to
> > provide levels of security depending who the user is. Just
> pointing out in
> > case this helps with your use case.
> >
> > As far as editing the KIP based on discussion goes--it's not only
> > acceptable, it's expected :) Ideally, the KIP should be kept
> up-to-date to
> > the point where, were it to be accepted at any moment, it would
> accurately
> > reflect the changes that would then be made to Kafka. This can
> be relaxed
> > if there's rapid iteration or items that are still up for
> discussion, but
> > as soon as things settle down it should be updated.
> >
> > As far as item 4 goes, my question was about exceptions that
> aren't handled
> > by the ExceptionMapper, but which are returned as part of the
> response body
> > when querying the status of a connector or task that has failed
> by querying
> > the /connectors/{name}/status or
> /connectors/{name}/tasks/{taskId}/status
> > endpoints. Even if the request is successful and results in an
> HTTP 200
> > response, the body might contain a stack trace if the connector
> or any of
> > its 

[jira] [Resolved] (KAFKA-9818) Flaky Test RecordCollectorTest.shouldNotThrowStreamsExceptionOnSubsequentCallIfASendFailsWithContinueExceptionHandler

2020-04-17 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-9818.

Fix Version/s: 2.6.0
   Resolution: Fixed

> Flaky Test 
> RecordCollectorTest.shouldNotThrowStreamsExceptionOnSubsequentCallIfASendFailsWithContinueExceptionHandler
> -
>
> Key: KAFKA-9818
> URL: https://issues.apache.org/jira/browse/KAFKA-9818
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Assignee: Matthias J. Sax
>Priority: Major
>  Labels: flaky-test
> Fix For: 2.6.0
>
>
> h3. Error Message
> java.lang.AssertionError
> h3. Stacktrace
> java.lang.AssertionError at org.junit.Assert.fail(Assert.java:87) at 
> org.junit.Assert.assertTrue(Assert.java:42) at 
> org.junit.Assert.assertTrue(Assert.java:53) at 
> org.apache.kafka.streams.processor.internals.RecordCollectorTest.shouldNotThrowStreamsExceptionOnSubsequentCallIfASendFailsWithContinueExceptionHandler(RecordCollectorTest.java:521)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method) at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base/java.lang.reflect.Method.invoke(Method.java:566) at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
> at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) 
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>  at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>  at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:413) at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>  at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>  at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>  at jdk.internal.reflect.GeneratedMethodAccessor20.invoke(Unknown Source) at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base/java.lang.reflect.Method.invoke(Method.java:566) at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>  at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>  at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
>  at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
>  at com.sun.proxy.$Proxy5.processTestClass(Unknown Source) at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
>  at jdk.internal.reflect.GeneratedMethodAccessor19.invoke(Unknown Source) at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base/java.lang.reflect.Method.invoke(Method.java:566) at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>  at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>  at 
> 

Re: [DISCUSS] KIP-590: Redirect Zookeeper Mutation Protocols to The Controller

2020-04-17 Thread Boyang Chen
Thanks Colin for the suggestions! I have added metrics to track the total
number of messages being forwarded, as a more generic form of monitoring.
Right now the list of metrics is:


   - num-client-forwarding-to-controller-rate
   - num-client-forwarding-to-controller-count
   - num-messages-redirected-rate
   - num-messages-redirected-count
   - request-forwarding-authorization-fail-count (Optional)
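
For reference, a minimal sketch of how such metrics could be registered with
Kafka's Metrics API (the sensor and group names here are placeholders, not
final definitions):

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeCount;
import org.apache.kafka.common.metrics.stats.Rate;

Metrics metrics = new Metrics();
Sensor redirected = metrics.sensor("num-messages-redirected");
redirected.add(metrics.metricName("num-messages-redirected-rate", "forwarding-metrics"),
               new Rate());
redirected.add(metrics.metricName("num-messages-redirected-count", "forwarding-metrics"),
               new CumulativeCount());

// Record once per request forwarded to the controller:
redirected.record();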

Let me know if we need to add more monitoring tags.

Boyang

On Fri, Apr 17, 2020 at 1:16 PM Colin McCabe  wrote:

> On Thu, Apr 16, 2020, at 12:33, Jose Garcia Sancio wrote:
> > Hi Boyang,
> >
> > Thanks for the KIP. The KIP looks good. I have a few questions and
> comments.
> >
> > > As part of the KIP-500
> > <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-500%3A+Replace+ZooKeeper+with+a+Self-Managed+Metadata+Quorum
> >
> > initiative, we need to build a bridge release version of Kafka that
> > could
> > isolate the direct Zookeeper write access only to the controller.
> >
> > This may be outside the scope of this KIP but I am trying to understand
> how
> > this is going to be used to implement KIP-500. My understanding is that
> the
> > Admin client discovers the controller by performing a
> > Metadata{Request,Response} round trip. The response of this request
> > includes the id of the controller. Based on my understanding of the
> > KIP-500, this architecture will need to change.
>
> Hi Jose,
>
> Thanks for the questions.
>
> We're not proposing to change the MetadataRequest / MetadataResponse.  It
> will still work the same way that it always has from the perspective of
> clients.  We have to do this to maintain compatibility, I think.
>
> This does mean that controllers and brokers will occupy the same ID
> namespace, even when running in standalone controller mode.  I think that's
> OK, though.
>
> > For example the controller
> > will not necessarily be a broker in which case the id may not correlate
> to
> > a broker id. Is the expectation that the Kafka Controller Quorum (as
> > defined in KIP-500) will push this new connection information to all of
> the
> > brokers? Will the Kafka Controller Quorum expose and implement all of the
> > RPCs being redirected in this KIP and the ones that are currently routed
> to
> > the controller? These include:
> >
> > ListPartitionReassignment
> > AlterPartitionReassignment
> > ElectLeaders
> > CreatePartitions
> > DeleteTopics
> > CreateTopics
>
> Yes, the KIP-500 controller (quorum) will implement all of those RPCs.
>
> >
> > > AUTHORIZATION_FAILED if the inter-broker verification failed.
> >
> > The rest of the document mentions CLUSTER_AUTHORIZATION_FAILED.
> >
> > > For CLUSTER_AUTHORIZATION_FAILED, this indicates an internal error for
> > broker security setup which has nothing to do with the client, so we have
> > no other way but returning an UNKNOWN_SERVER_ERROR to the admin client.
> >
> > I don't understand this. I think I don't understand this because it is
> not
> > clear to me who, how and when authorization is going to work when using
> > Envelope{Request,Response}. Can you please add a section that explains
> how
> > authorization works when envelopes are involved?
>
> The only way a broker would fail to authorize itself to another broker is
> if the ACLs were incorrectly set up for the cluster.  This would cause
> other problems beyond just making forwarding not work, though.
>
> This isn't really related to envelopes -- any time a broker makes an RPC
> to another broker, in theory the ACLs could be totally screwed up and we
> could get denied.  It clearly means that administrator made a mistake, but
> we still have to handle the case somehow.
>
> best,
> Colin
>


Re: [DISCUSS] KIP-590: Redirect Zookeeper Mutation Protocols to The Controller

2020-04-17 Thread Colin McCabe
On Thu, Apr 16, 2020, at 12:33, Jose Garcia Sancio wrote:
> Hi Boyang,
> 
> Thanks for the KIP. The KIP looks good. I have a few questions and comments.
> 
> > As part of the KIP-500
> 
> initiative, we need to build a bridge release version of Kafka that 
> could
> isolate the direct Zookeeper write access only to the controller.
> 
> This may be outside the scope of this KIP but I am trying to understand how
> this is going to be used to implement KIP-500. My understanding is that the
> Admin client discovers the controller by performing a
> Metadata{Request,Response} round trip. The response of this request
> includes the id of the controller. Based on my understanding of the
> KIP-500, this architecture will need to change.

Hi Jose,

Thanks for the questions.

We're not proposing to change the MetadataRequest / MetadataResponse.  It will 
still work the same way that it always has from the perspective of clients.  We 
have to do this to maintain compatibility, I think.

This does mean that controllers and brokers will occupy the same ID namespace, 
even when running in standalone controller mode.  I think that's OK, though.

> For example the controller
> will not necessarily be a broker in which case the id may not correlate to
> a broker id. Is the expectation that the Kafka Controller Quorum (as
> defined in KIP-500) will push this new connection information to all of the
> brokers? Will the Kafka Controller Quorum expose and implement all of the
> RPCs being redirected in this KIP and the ones that are currently routed to
> the controller? These include:
> 
> ListPartitionReassignment
> AlterPartitionReassignment
> ElectLeaders
> CreatePartitions
> DeleteTopics
> CreateTopics

Yes, the KIP-500 controller (quorum) will implement all of those RPCs.

> 
> > AUTHORIZATION_FAILED if the inter-broker verification failed.
> 
> The rest of the document mentions CLUSTER_AUTHORIZATION_FAILED.
> 
> > For CLUSTER_AUTHORIZATION_FAILED, this indicates an internal error for
> broker security setup which has nothing to do with the client, so we have
> no other way but returning an UNKNOWN_SERVER_ERROR to the admin client.
> 
> I don't understand this. I think I don't understand this because it is not
> clear to me who, how and when authorization is going to work when using
> Envelope{Request,Response}. Can you please add a section that explains how
> authorization works when envelopes are involved?

The only way a broker would fail to authorize itself to another broker is if 
the ACLs were incorrectly set up for the cluster.  This would cause other 
problems beyond just making forwarding not work, though.

This isn't really related to envelopes -- any time a broker makes an RPC to 
another broker, in theory the ACLs could be totally screwed up and we could get 
denied.  It clearly means that administrator made a mistake, but we still have 
to handle the case somehow.

best,
Colin


Re: [DISCUSS] KIP-590: Redirect Zookeeper Mutation Protocols to The Controller

2020-04-17 Thread Ismael Juma
Hi Colin,

The read/modify/write is protected by the zk version, right?

Ismael

On Fri, Apr 17, 2020 at 12:53 PM Colin McCabe  wrote:

> On Thu, Apr 16, 2020, at 08:51, Ismael Juma wrote:
> > I don't think these requests are necessarily infrequent under multi
> tenant
> > environments though. I've seen Controller availability being an issue for
> > describe topics for example (before it was changed to go to any broker).
>
> Hi Ismael,
>
> I don't think DescribeTopics is a good comparison.  That RPC is available
> to regular users and is used many orders of magnitude more frequently than
> administrative operations like changing ACLs or setting quotas.
>
> The operations we're talking about redirecting here all require the
> highest possible permissions and will not be frequent in any real-world
> cluster... unless someone is running a stress-test or a benchmark.  We
> didn't even notice some of the serious bugs in setting dynamic configs
> until recently because the alterConfigs / incrementalAlterConfigs RPCs are
> so infrequently called.
>
> Additionally, this KIP fixes some existing bugs.  The current approach of
> having random writers do a read-modify-write cycle on a configuration znode
> is buggy since it could be interleaved with another node's read-modify-write
> cycle.  It has a "lost updates" problem.
>
> For example, node 1 reads a config znode.  Node 2 reads the same config
> znode.  Node 1 writes back a modified version of the znode.  Node 2 writes
> back its (differently) modified version, overwriting the changes from node
> 1.
>
> I don't think anyone ever noticed this problem since, again, these
> operations are very infrequent, making the chance of such a collision low.
> But it is a serious bug that is fixed by having a single writer.  (We
> should add this to the KIP...)
>
> >
> > Would it be better to redirect once the controller quorum is there?
>
> This KIP is needed for the bridge release.  The bridge release upgrade
> process relies on the old nodes sending their administrative operations to
> the controller quorum, not directly to zookeeper.
>
> best,
> Colin
>
>
> >
> > Note that this is different from things like AlterIsr since these calls
> are
> > coming from clients versus other brokers.
> >
> > Ismael
> >
> > On Wed, Apr 15, 2020, 5:10 PM Colin McCabe  wrote:
> >
> > > Hi Ismael,
> > >
> > > I agree that sending these requests through the controller will not
> work
> > > during the periods when there is no controller.  However, those periods
> > > should be short-- otherwise we have bigger problems in the cluster.
> > >
> > > These requests are very infrequent because they are administrative
> > > operations.  Basically the affected operations are changing ACLs,
> changing
> > > dynamic configurations, and changing quotas.
> > >
> > > best,
> > > Colin
> > >
> > >
> > > On Wed, Apr 15, 2020, at 15:25, Ismael Juma wrote:
> > > > Hi Boyang,
> > > >
> > > > Thanks for the KIP. Have we considered that this reduces
> availability for
> > > > these operations since we have a single Controller instead of the ZK
> > > quorum?
> > > >
> > > > Ismael
> > > >
> > > > On Fri, Apr 3, 2020 at 4:45 PM Boyang Chen <
> reluctanthero...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hey all,
> > > > >
> > > > > I would like to start off the discussion for KIP-590, a follow-up
> > > > > initiative after KIP-500:
> > > > >
> > > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-590%3A+Redirect+Zookeeper+Mutation+Protocols+to+The+Controller
> > > > >
> > > > > This KIP proposes to migrate existing Zookeeper mutation paths,
> > > including
> > > > > configuration, security and quota changes, to controller-only by
> always
> > > > > routing these alterations to the controller.
> > > > >
> > > > > Let me know your thoughts!
> > > > >
> > > > > Best,
> > > > > Boyang
> > > > >
> > > >
> > >
> >
>


Re: [DISCUSS] KIP-590: Redirect Zookeeper Mutation Protocols to The Controller

2020-04-17 Thread Colin McCabe
On Thu, Apr 16, 2020, at 08:51, Ismael Juma wrote:
> I don't think these requests are necessarily infrequent under multi tenant
> environments though. I've seen Controller availability being an issue for
> describe topics for example (before it was changed to go to any broker).

Hi Ismael,

I don't think DescribeTopics is a good comparison.  That RPC is available to 
regular users and is used many orders of magnitude more frequently than 
administrative operations like changing ACLs or setting quotas.

The operations we're talking about redirecting here all require the highest 
possible permissions and will not be frequent in any real-world cluster... 
unless someone is running a stress-test or a benchmark.  We didn't even notice 
some of the serious bugs in setting dynamic configs until recently because the 
alterConfigs / incrementalAlterConfigs RPCs are so infrequently called.

Additionally, this KIP fixes some existing bugs.  The current approach of 
having random writers do a read-modify-write cycle on a configuration znode is 
buggy since it could be interleaved with another node's read-modify-write 
cycle.  It has a "lost updates" problem.

For example, node 1 reads a config znode.  Node 2 reads the same config znode.  
Node 1 writes back a modified version of the znode.  Node 2 writes back its 
(differently) modified version, overwriting the changes from node 1.

I don't think anyone ever noticed this problem since, again, these operations 
are very infrequent, making the chance of such a collision low.  But it is a 
serious bug that is fixed by having a single writer.  (We should add this to 
the KIP...)
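
For readers following the thread, this is the kind of version-guarded
ZooKeeper update the lost-update discussion is about (a generic sketch with a
placeholder znode path and modify step; whether and where the existing write
path applies this guard is exactly what is being debated here):

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

Stat stat = new Stat();
byte[] current = zk.getData("/config/topics/foo", false, stat);  // read + remember version
byte[] updated = modify(current);                                // placeholder for the change

try {
    // Conditional write: fails if another writer updated the znode in between,
    // instead of silently overwriting that writer's change.
    zk.setData("/config/topics/foo", updated, stat.getVersion());
} catch (KeeperException.BadVersionException e) {
    // re-read and retry rather than losing the concurrent update
}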

> 
> Would it be better to redirect once the controller quorum is there?

This KIP is needed for the bridge release.  The bridge release upgrade process 
relies on the old nodes sending their administrative operations to the 
controller quorum, not directly to zookeeper.

best,
Colin


> 
> Note that this is different from things like AlterIsr since these calls are
> coming from clients versus other brokers.
> 
> Ismael
> 
> On Wed, Apr 15, 2020, 5:10 PM Colin McCabe  wrote:
> 
> > Hi Ismael,
> >
> > I agree that sending these requests through the controller will not work
> > during the periods when there is no controller.  However, those periods
> > should be short-- otherwise we have bigger problems in the cluster.
> >
> > These requests are very infrequent because they are administrative
> > operations.  Basically the affected operations are changing ACLs, changing
> > dynamic configurations, and changing quotas.
> >
> > best,
> > Colin
> >
> >
> > On Wed, Apr 15, 2020, at 15:25, Ismael Juma wrote:
> > > Hi Boyang,
> > >
> > > Thanks for the KIP. Have we considered that this reduces availability for
> > > these operations since we have a single Controller instead of the ZK
> > quorum?
> > >
> > > Ismael
> > >
> > > On Fri, Apr 3, 2020 at 4:45 PM Boyang Chen 
> > > wrote:
> > >
> > > > Hey all,
> > > >
> > > > I would like to start off the discussion for KIP-590, a follow-up
> > > > initiative after KIP-500:
> > > >
> > > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-590%3A+Redirect+Zookeeper+Mutation+Protocols+to+The+Controller
> > > >
> > > > This KIP proposes to migrate existing Zookeeper mutation paths,
> > including
> > > > configuration, security and quota changes, to controller-only by always
> > > > routing these alterations to the controller.
> > > >
> > > > Let me know your thoughts!
> > > >
> > > > Best,
> > > > Boyang
> > > >
> > >
> >
>


Re: [DISCUSS] KIP-590: Redirect Zookeeper Mutation Protocols to The Controller

2020-04-17 Thread Colin McCabe
Hi David & Boyang,

I thought the intention of the "old-client-connections-count" metric was to 
give some information about how many redirections were going on in the cluster. 
 This is different from the "unknown" software version metric.  After all, many 
versions that support KIP-511 will still need redirection.

I think the problem with the metric is that it assumes an entire connection is 
used only for old client redirection.  This won't be the case-- we'll use those 
connections for other RPCs to the controller as well.  So I'd suggest a metric 
more like num-messages-redirected or something like that which operates at the 
message level, and can be monitored on each broker.
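
As a rough illustration only (the metric and group names below are placeholders I 
made up, not something the KIP defines), a per-broker message-level count could 
look roughly like this with the common Metrics API:

{code}
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeCount;
import org.apache.kafka.common.metrics.stats.Rate;

// Sketch: count forwarded requests per broker, independent of which connection carried them.
public class RedirectionMetricsSketch {
    private final Sensor redirections;

    public RedirectionMetricsSketch(Metrics metrics) {
        redirections = metrics.sensor("messages-redirected");
        redirections.add(metrics.metricName("num-messages-redirected", "redirection-metrics",
                "Total number of requests this broker forwarded to the controller"),
                new CumulativeCount());
        redirections.add(metrics.metricName("messages-redirected-rate", "redirection-metrics",
                "Rate of requests this broker forwarded to the controller"),
                new Rate());
    }

    // Called once per forwarded request.
    public void recordRedirection() {
        redirections.record();
    }
}
{code}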

best,
Colin

On Thu, Apr 16, 2020, at 07:40, David Jacot wrote:
> Hi Boyang,
> 
> Thanks for the KIP. Overall, it looks good to me. I really like the
> envelope RPC!
> 
> One minor comment regarding the `old-client-connections-count` metric. Is
> it really necessary? The number of connected clients whose version is not
> known (prior to KIP-511) is already reported but with an "unknown" software
> name and an "unknown" software version, which is, I suppose, similar to what
> you intend to expose with this new metric, isn't it?
> 
> Regards,
> David
> 
> On Thu, Apr 16, 2020 at 7:24 AM Boyang Chen 
> wrote:
> 
> > Thanks Raymond and Colin for the detailed discussions! I totally agree
> > with the rationale here. The new `Envelope` RPC has been added to the KIP
> > and the forwarding section logic has been revised; feel free to take
> > another look.
> >
> > On Wed, Apr 15, 2020 at 5:19 PM Colin McCabe  wrote:
> >
> > > Hi Boyang,
> > >
> > > I agree that we need a version bump on the request types we are going to
> > > forward.  The new versions will be able to return the NOT_CONTROLLER
> > error,
> > > and let the client do the retrying, which is what we typically prefer.
> > > The  existing versions can't ever return NOT_CONTROLLER.
> > >
> > > Since we have to have a new version for all these requests, we could
> > > technically do everything with just optional fields, like we originally
> > > discussed.  However, there is probably some value in having a real
> > > EnvelopeRequest (or whatever) that makes it clearer what is going on.
> > > Optional fields don't come with "guard rails" to prevent us from
> > > accidentally ignoring them on older versions of the broker.  A new ApiKey
> > > certainly does.
> > >
> > > Another issue is that it's nice to avoid changing the version of the
> > > request when forwarding it.  Sometimes different versions have slightly
> > > different semantics, and it simplifies things to avoid worrying about
> > that.
> > >
> > > We should restrict the use of forwarding to just principals that have
> > > CLUSTERACTION on CLUSTER for now, so that only the brokers and superusers
> > > can do it.
> > >
> > > best,
> > > Colin
> > >
> > >
> > > On Tue, Apr 14, 2020, at 13:15, Boyang Chen wrote:
> > > > Thanks Raymond for the proposal! Yea, adding a unified forwarding
> > > envelope
> > > > request is a good idea, but it doesn't really solve our problem in this
> > > KIP.
> > > >
> > > > On Mon, Apr 13, 2020 at 11:14 PM Raymond Ng  wrote:
> > > >
> > > > > Hi Boyang,
> > > > >
> > > > > Thanks for the KIP. Overall looks great.
> > > > >
> > > > > One suggestion: instead of bumping the version and adding an optional
> > > field
> > > > > (PrincipalName) for a number of requests, can we consider adding a
> > > general
> > > > > ProxyRequest that acts as an "envelope" for the forwarded requests?
> > > > >
> > > > > A few advantages to this approach come to mind:
> > > > >
> > > > >1. Add one (new) Request API instead of modifying a number of them
> > > > >2. Make the forwarded nature of the request explicit instead of
> > > > >implicitly relying on an optional field and a specific version
> > that
> > > > > varies
> > > > >by type.
> > > > >3. This approach is flexible enough to be potentially useful
> > beyond
> > > the
> > > > >current use case (e.g. federated, inter-cluster scenarios)
> > > > >
> > > > > As a bonus, the combination of 1. and 2. should also simplify
> > > > > implementation & validation.
> > > > >
> > > > >
> > > > Firstly, the broker has to differentiate old and new admin clients, as it
> > > > should only support forwarding for old ones. Without a version bump, the
> > > > broker couldn't tell them apart. Besides, bumping the existing
> > > > protocol is not a big overhead compared with adding a new RPC, so I don't
> > > > worry too much about the complexity here.
> > > >
> > > >
> > > > > On the other hand, it's not clear if the underlying RPC request
> > > > > encoding/decoding machinery supports embedded requests. Hopefully, even
> > > > > if it doesn't, it would not be too difficult to extend.
> > > > >
> > > >
> > > > Making the forwarding behavior more general is great, but could also
> > come
> > > > with problems we couldn't anticipate 

Kafka Meetup hosted by Confluent Online, Tuesday 4:00pm, April 21st, 2020

2020-04-17 Thread Guozhang Wang
Hello folks,

The Bay Area Kafka meetup will continue to be hosted online this month,
next Tuesday (April 21st) at 4:00pm. We will be presenting the ongoing
work on a Kafkaesque Raft Protocol:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum

*RSVP*:
https://www.meetup.com/KafkaBayArea/events/270053856/

*Date*
4:00pm, Tuesday, April 21st, 2020

*Zoom link can be found inside the meetup page as "online event" url.*

*Agenda*
4:00pm - 4:10pm: Online Networking (feel free to BYOB!!)
4:10pm - 4:50pm: A Kafkaesque Raft Protocol and other KIP-500 updates
(Jason Gustafson and Boyang Chen, Confluent)


Hope to e-meet you on Tuesday!


-- Guozhang


Re: [VOTE] KIP-577: Allow HTTP Response Headers Configured for Kafka Connect

2020-04-17 Thread Aneel Nazareth
Thanks Jeff, this seems like it addresses a user need.

+1 (non-binding)

On Fri, Apr 17, 2020 at 1:28 PM Zhiguo Huang  wrote:
>
> Thanks to everyone for their input. I've incorporated the changes, and I
> think this is ready for voting.
>
> To summarize, the KIP simply proposes to add a feature which allows HTTP
> response headers to be configured for Kafka Connect. The KIP can be found here:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP+577%3A+Allow+HTTP+Response+Headers+to+be+Configured+for+Kafka+Connect
>
> Thanks!
>
> Jeff.


Re: [Vote] KIP-588: Allow producers to recover gracefully from transaction timeouts

2020-04-17 Thread Guozhang Wang
Hi Boyang,

Your reply to 3) seems to conflict with your other answers, which is a bit
confusing to me. Following your other answers, it seems you suggest
returning UNKNOWN_PRODUCER_ID so that 2.5 clients can trigger the retry logic
as well?

To complete my reasoning here as a complete picture:

a) post KIP-360 (2.5+) the partition leader broker does not return
UNKNOWN_PRODUCER_ID any more.
b) upon seeing an old epoch, partition leader cannot tell if it is due to
fencing or timeout; so it could only return INVALID_PRODUCER_EPOCH.

So the basic idea is to let the clients ask the transaction coordinator for
the source of truth:

1) A 2.5+ client would handle UNKNOWN_PRODUCER_ID (which can only be
returned by old brokers) by trying to re-initialize with the transaction
coordinator; the coordinator would then tell it whether it is
PRODUCER_FENCED or TXN_TIMEOUT. A coordinator on an old broker would always
return PRODUCER_FENCED anyway.
2) A 2.6+ client would also handle INVALID_PRODUCER_EPOCH with the same
re-initialization logic; similarly, the transaction coordinator would
return PRODUCER_FENCED or TXN_TIMEOUT if it is new, or always
return PRODUCER_FENCED if it is old.

The open question is whether:

* 3) the new broker should return UNKNOWN_PRODUCER_ID when it is
*supposed* to return INVALID_PRODUCER_EPOCH and it finds that the request is
from a 2.5 client (note, as mentioned in a), right now we do not
return UNKNOWN_PRODUCER_ID from brokers anymore).

If it does, then a 2.5 client could still run the retry logic against the
transaction coordinator, i.e. benefit from KIP-360; but the cost is more complex
logic on the broker side as well as a producer API version bump.
If it does not, then when INVALID_PRODUCER_EPOCH is returned to the old
client it would treat it as fatal and not ask the txn coordinator; but it
simplifies the broker logic and also does not require a producer API version
bump.

Personally I'd suggest we do the latter, knowing that it would not benefit
2.5 clients when the partition leader gets an old epoch and does not know
whether the producer was fenced or timed out.
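
To make that client-side handling concrete, here is a purely illustrative Java 
sketch -- not the actual producer internals, and the coordinator helper below is 
hypothetical -- of how a 2.6+ client could route these two errors:

{code}
import org.apache.kafka.common.protocol.Errors;

// Sketch only: route the two error codes discussed above to a coordinator lookup.
public class TxnErrorHandlingSketch {
    void onPartitionLeaderError(Errors error) {
        switch (error) {
            case UNKNOWN_PRODUCER_ID:     // only ever returned by pre-KIP-360 brokers
            case INVALID_PRODUCER_EPOCH:  // the partition leader cannot tell fencing from timeout
                // Ask the transaction coordinator for the truth: it either fences us
                // (fatal) or bumps the epoch so the transaction can be retried.
                reinitializeWithTxnCoordinator();
                break;
            default:
                if (error != Errors.NONE)
                    throw error.exception();  // everything else keeps its existing handling
        }
    }

    private void reinitializeWithTxnCoordinator() {
        // Hypothetical: send InitProducerId with the current producer id/epoch, per KIP-360.
    }
}
{code}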


Guozhang

On Thu, Apr 16, 2020 at 7:59 PM Boyang Chen 
wrote:

> Thanks Jason and Guozhang for the thoughts.
>
> On Thu, Apr 16, 2020 at 6:09 PM Guozhang Wang  wrote:
>
> > For 2/3 above, originally I was not thinking that we will have a
> different
> > exception for INVALID_PRODUCER_EPOCH and hence was thinking that in order
> > to leverage KIP-360 for it, we'd have to let the broker to
> > return UNKNOWN_PRODUCER_ID. I.e. we'd change the logic of partition
> leader
> > as well to return UNKNOWN_PRODUCER_ID to let the producer try
> > re-initializing the PID on the coordinator, and if it is indeed due to
> > fencing, then coordinator can let the client know of the fatal error and
> > then fail. In that case, then we do need to bump up the producer API
> > version so that the partition leader knows if it is from older or newer
> > clients: if it is older client who do not have KIP-360, we'd return
> > INVALID_PRODUCER_EPOCH still; for newer client, we can
> > return UNKNOWN_PRODUCER_ID to let it seek what's the truth on
> coordinator.
> >
> I know this is a bit of reversed order. Feel free to check out my reply to
> Jason first and then come back here :)
> I think the partition leader will see no change in the returned error code, given
> that we discussed that only new clients should be able to retry.
>
>
> > Now since we are additionally mapping INVALID_PRODUCER_EPOCH to a
> different
> > error code now and letting clients to handle that similar
> > to UNKNOWN_PRODUCER_ID, this can be saved indeed. However, it also means
> > that 2.5.0 clients would not get benefited from this KIP, since they
> still
> > treat INVALID_PRODUCER_EPOCH as fatal. If people this that's okay then we
> > can simplify the partition leader behavior as well as not bumping up
> > producer APIs. I think I'm a bit inclined towards simplicity over
> > benefiting older clients.
> >
>
> 2.5 client should get benefits if we return UNKNOWN_PRODUCER_ID as I have
> described below.
>
>
> > Guozhang
> >
> > On Thu, Apr 16, 2020 at 5:37 PM Jason Gustafson 
> > wrote:
> >
> > > Hi Boyang,
> > >
> > > A few minor questions below:
> > >
> > > 1. You mention UNKNOWN_PRODUCER_ID in 2.a under Resilience
> Improvements.
> > I
> > > assume that should be INVALID_PRODUCER_EPOCH? I am not sure this case
> > makes
> > > sense for 2.5 clients which would view this error as fatal regardless
> of
> > > whatever the broker does. Not sure there's anything we can do about
> that.
> > > Similarly, if a newer client is talking to a 2.5 broker, it wouldn't be
> > > able to bump the epoch after a timeout because the broker would not
> know
> > to
> > > keep the last epoch. Unfortunately, I think the only improvements that
> > are
> > > possible here are newer clients talking to newer brokers, but I might
> > have
> > > missed something.
> > >
> >
> The reasoning for returning UNKNOWN_PRODUCER_ID is to trigger the retry
> logic on 2.5 client 

Re: [VOTE] KIP-584: Versioning scheme for features

2020-04-17 Thread Guozhang Wang
Thanks for the great KIP Kowshik, +1 (binding).

On Fri, Apr 17, 2020 at 11:22 AM Jun Rao  wrote:

> Hi, Kowshik,
>
> Thanks for the KIP. +1
>
> Jun
>
> On Thu, Apr 16, 2020 at 11:14 AM Kowshik Prakasam 
> wrote:
>
> > Hi all,
> >
> > I'd like to start a vote for KIP-584. The link to the KIP can be found
> > here:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features
> > .
> >
> > Thanks!
> >
> >
> > Cheers,
> > Kowshik
> >
>


-- 
-- Guozhang


[VOTE] KIP-577: Allow HTTP Response Headers Configured for Kafka Connect

2020-04-17 Thread Zhiguo Huang
Thanks to everyone for their input. I've incorporated the changes, and I
think this is ready for voting.

To summarize, the KIP simply proposes to add a feature which allows HTTP
response headers to be configured for Kafka Connect. The KIP can be found here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP+577%3A+Allow+HTTP+Response+Headers+to+be+Configured+for+Kafka+Connect

Thanks!

Jeff.


Re: [VOTE] KIP-584: Versioning scheme for features

2020-04-17 Thread Jun Rao
Hi, Kowshik,

Thanks for the KIP. +1

Jun

On Thu, Apr 16, 2020 at 11:14 AM Kowshik Prakasam 
wrote:

> Hi all,
>
> I'd like to start a vote for KIP-584. The link to the KIP can be found
> here:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features
> .
>
> Thanks!
>
>
> Cheers,
> Kowshik
>


[jira] [Created] (KAFKA-9887) failed-task-count JMX metric not updated if task fails during startup

2020-04-17 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-9887:


 Summary: failed-task-count JMX metric not updated if task fails 
during startup
 Key: KAFKA-9887
 URL: https://issues.apache.org/jira/browse/KAFKA-9887
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.4.1, 2.5.0, 2.4.0
Reporter: Chris Egerton


If a task fails on startup (specifically, during [this code 
section|https://github.com/apache/kafka/blob/00a59b392d92b0d6d3a321ef9a53dae4b3a9d030/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L427-L468]),
 the {{failed-task-count}} JMX metric is not updated to reflect the task 
failure, even though the status endpoints in the REST API do report the task as 
failed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Build failed in Jenkins: kafka-trunk-jdk11 #1362

2020-04-17 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-9819: Fix flaky test in StoreChangelogReaderTest (#8488)

[github] MINOR: Switch order of sections on tumbling and hopping windows in


--
[...truncated 3.04 MB...]

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecord PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithOtherTopicName STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithOtherTopicName PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > shouldAdvanceTime 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > shouldAdvanceTime 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithKeyValuePairs STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithKeyValuePairs PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithKeyValuePairsAndCustomTimestamps
 STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithKeyValuePairsAndCustomTimestamps
 PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithKeyValuePairs 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithKeyValuePairs PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithOtherTopicNameAndTimestamp STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithOtherTopicNameAndTimestamp PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithKeyValuePairsAndCustomTimestamps
 STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithKeyValuePairsAndCustomTimestamps
 PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicName STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicName PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordsFromKeyValuePairsWithCustomTimestampAndIncrementsAndNotAdvanceTime
 STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordsFromKeyValuePairsWithCustomTimestampAndIncrementsAndNotAdvanceTime
 PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecordWithTimestampWithTimestamp STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecordWithTimestampWithTimestamp PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithNullKeyAndDefaultTimestamp
 STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithNullKeyAndDefaultTimestamp
 PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordsFromKeyValuePairsWithTimestampAndIncrements STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordsFromKeyValuePairsWithTimestampAndIncrements PASSED

org.apache.kafka.streams.MockTimeTest > shouldGetNanosAsMillis STARTED

org.apache.kafka.streams.MockTimeTest > shouldGetNanosAsMillis PASSED

org.apache.kafka.streams.MockTimeTest > shouldSetStartTime STARTED

org.apache.kafka.streams.MockTimeTest > shouldSetStartTime PASSED

org.apache.kafka.streams.MockTimeTest > shouldNotAllowNegativeSleep STARTED

org.apache.kafka.streams.MockTimeTest > shouldNotAllowNegativeSleep PASSED

org.apache.kafka.streams.MockTimeTest > shouldAdvanceTimeOnSleep STARTED

org.apache.kafka.streams.MockTimeTest > shouldAdvanceTimeOnSleep PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldReturnIsOpen 
STARTED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldReturnIsOpen 
PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 
shouldDeleteAndReturnPlainValue STARTED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 
shouldDeleteAndReturnPlainValue PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldReturnName 
STARTED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldReturnName 
PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 
shouldPutWithUnknownTimestamp STARTED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 
shouldPutWithUnknownTimestamp PASSED


[jira] [Created] (KAFKA-9886) Validate segment range before reading in `Log.read`

2020-04-17 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-9886:
--

 Summary: Validate segment range before reading in `Log.read`
 Key: KAFKA-9886
 URL: https://issues.apache.org/jira/browse/KAFKA-9886
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson
Assignee: Jason Gustafson


Log.read uses the following logic to set the upper limit on a segment read.

{code}
val maxPosition = {
  // Use the max offset position if it is on this segment; otherwise, the segment size is the limit.
  if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) {
    maxOffsetMetadata.relativePositionInSegment
  } else {
    segment.size
  }
}
{code}

In the else branch, the expectation is that 
`maxOffsetMetadata.segmentBaseOffset > segment.baseOffset`. In KAFKA-9838, we 
found a bug where this assumption failed, which led to reads above the high 
watermark. We should validate the expectation explicitly so that we don't leave 
the door open for similar bugs in the future.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9819) Flaky Test StoreChangelogReaderTest#shouldNotThrowOnUnknownRevokedPartition[0]

2020-04-17 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-9819.

Fix Version/s: 2.6.0
   Resolution: Fixed

> Flaky Test StoreChangelogReaderTest#shouldNotThrowOnUnknownRevokedPartition[0]
> --
>
> Key: KAFKA-9819
> URL: https://issues.apache.org/jira/browse/KAFKA-9819
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Assignee: Matthias J. Sax
>Priority: Major
>  Labels: flaky-test
> Fix For: 2.6.0
>
>
> h3. Error Message
> java.lang.AssertionError: expected:<[test-reader Changelog partition 
> unknown-0 could not be found, it could be already cleaned up during the 
> handlingof task corruption and never restore again]> but was:<[[AdminClient 
> clientId=adminclient-91] Connection to node -1 (localhost/127.0.0.1:8080) 
> could not be established. Broker may not be available., test-reader Changelog 
> partition unknown-0 could not be found, it could be already cleaned up during 
> the handlingof task corruption and never restore again]>
> h3. Stacktrace
> java.lang.AssertionError: expected:<[test-reader Changelog partition 
> unknown-0 could not be found, it could be already cleaned up during the 
> handlingof task corruption and never restore again]> but was:<[[AdminClient 
> clientId=adminclient-91] Connection to node -1 (localhost/127.0.0.1:8080) 
> could not be established. Broker may not be available., test-reader Changelog 
> partition unknown-0 could not be found, it could be already cleaned up during 
> the handlingof task corruption and never restore again]>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9885) Evict last members of a group when the maximum allowed is reached

2020-04-17 Thread David Jacot (Jira)
David Jacot created KAFKA-9885:
--

 Summary: Evict last members of a group when the maximum allowed is 
reached
 Key: KAFKA-9885
 URL: https://issues.apache.org/jira/browse/KAFKA-9885
 Project: Kafka
  Issue Type: Bug
Reporter: David Jacot
Assignee: David Jacot


While analysing https://issues.apache.org/jira/browse/KAFKA-7965, we found that 
multiple members of a group can be evicted from the group if the leader of the 
consumer offsets partition changes before the group is persisted. This happens 
because the current eviction logic always evicts the first members that rejoin 
the group.

We would like to change the eviction logic so that the last members to rejoin 
the group are kicked out instead.

Here is an example of what happens when the leader changes:
{noformat}
// Group is loaded in GroupCoordinator 0
// A rebalance is triggered because the group is over capacity
[2020-04-02 11:14:33,393] INFO [GroupMetadataManager brokerId=0] Scheduling 
loading of offsets and group metadata from __consumer_offsets-0 
(kafka.coordinator.group.GroupMetadataManager:66)
[2020-04-02 11:14:33,406] INFO [Consumer clientId=ConsumerTestConsumer, 
groupId=group-max-size-test] Discovered group coordinator localhost:40071 (id: 
2147483647 rack: null) 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:794)
[2020-04-02 11:14:33,409] INFO Static member 
MemberMetadata(memberId=ConsumerTestConsumer-ceaab707-69af-4a65-8275-cb7db7fb66b3,
 groupInstanceId=Some(null), clientId=ConsumerTestConsumer, 
clientHost=/127.0.0.1, sessionTimeoutMs=1, rebalanceTimeoutMs=6, 
supportedProtocols=List(range), ).groupInstanceId of group group-max-size-test 
loaded with member id ConsumerTestConsumer-ceaab707-69af-4a65-8275-cb7db7fb66b3 
at generation 1. (kafka.coordinator.group.GroupMetadata$:126)
[2020-04-02 11:14:33,410] INFO Static member 
MemberMetadata(memberId=ConsumerTestConsumer-07077ca2-30e9-45cd-b363-30672281bacb,
 groupInstanceId=Some(null), clientId=ConsumerTestConsumer, 
clientHost=/127.0.0.1, sessionTimeoutMs=1, rebalanceTimeoutMs=6, 
supportedProtocols=List(range), ).groupInstanceId of group group-max-size-test 
loaded with member id ConsumerTestConsumer-07077ca2-30e9-45cd-b363-30672281bacb 
at generation 1. (kafka.coordinator.group.GroupMetadata$:126)
[2020-04-02 11:14:33,412] INFO Static member 
MemberMetadata(memberId=ConsumerTestConsumer-5d359e65-1f11-43ce-874e-fddf55c0b49d,
 groupInstanceId=Some(null), clientId=ConsumerTestConsumer, 
clientHost=/127.0.0.1, sessionTimeoutMs=1, rebalanceTimeoutMs=6, 
supportedProtocols=List(range), ).groupInstanceId of group group-max-size-test 
loaded with member id ConsumerTestConsumer-5d359e65-1f11-43ce-874e-fddf55c0b49d 
at generation 1. (kafka.coordinator.group.GroupMetadata$:126)
[2020-04-02 11:14:33,413] INFO [GroupCoordinator 0]: Loading group metadata for 
group-max-size-test with generation 1 
(kafka.coordinator.group.GroupCoordinator:66)
[2020-04-02 11:14:33,413] INFO [GroupCoordinator 0]: Preparing to rebalance 
group group-max-size-test in state PreparingRebalance with old generation 1 
(__consumer_offsets-0) (reason: Freshly-loaded group is over capacity 
(GroupConfig(10,180,2,0).groupMaxSize). Rebalacing in order to give a 
chance for consumers to commit offsets) 
(kafka.coordinator.group.GroupCoordinator:66)
[2020-04-02 11:14:33,431] INFO [GroupMetadataManager brokerId=0] Finished 
loading offsets and group metadata from __consumer_offsets-0 in 28 
milliseconds, of which 0 milliseconds was spent in the scheduler. 
(kafka.coordinator.group.GroupMetadataManager:66)

// A first consumer is kicked out of the group while trying to re-join
[2020-04-02 11:14:33,449] ERROR [Consumer clientId=ConsumerTestConsumer, 
groupId=group-max-size-test] Attempt to join group failed due to fatal error: 
The consumer group has reached its max size. 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:627)
[2020-04-02 11:14:33,451] ERROR [daemon-consumer-assignment-2]: Error due to 
(kafka.api.AbstractConsumerTest$ConsumerAssignmentPoller:76)
org.apache.kafka.common.errors.GroupMaxSizeReachedException: Consumer group 
group-max-size-test already has the configured maximum number of members.
[2020-04-02 11:14:33,451] INFO [daemon-consumer-assignment-2]: Stopped 
(kafka.api.AbstractConsumerTest$ConsumerAssignmentPoller:66)

// Before the rebalance is completed, a preferred replica leader election kicks 
in and moves the leader from 0 to 1
[2020-04-02 11:14:34,155] INFO [Controller id=0] Processing automatic preferred 
replica leader election (kafka.controller.KafkaController:66)
[2020-04-02 11:14:34,169] INFO [Controller id=0] Starting replica leader 
election (PREFERRED) for partitions 
group-max-size-test-0,group-max-size-test-3,__consumer_offsets-0 triggered by 
AutoTriggered (kafka.controller.KafkaController:66)

// The group 

Re: [VOTE] KIP-551: Expose disk read and write metrics

2020-04-17 Thread Alexandre Dupriez
Hello all,

Apologies for the excavation. +1 (non-binding).

One of the advantages I see in having the metrics exposed by Kafka, as
opposed to a system monitoring tool, is that they can be refined to
provide a segregated view of the consumption of disk resources, for
instance broken down by topic or principal.
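
For what it's worth, one place such per-process counters are available on Linux
is /proc/self/io; here is a minimal Java sketch that only illustrates that data
source (I am not claiming this is how KIP-551 is implemented):

{code}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

// Sketch: print the per-process disk read/write byte counters on Linux.
public class ProcIoSketch {
    public static void main(String[] args) throws IOException {
        for (String line : Files.readAllLines(Paths.get("/proc/self/io"))) {
            // Lines look like "read_bytes: 12345" and "write_bytes: 67890".
            if (line.startsWith("read_bytes:") || line.startsWith("write_bytes:")) {
                System.out.println(line);
            }
        }
    }
}
{code}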

Another metric I think could be useful would be the latency of file
reads/writes on Kafka's ingest or fetch data paths.
The motivation would be to help correlate a degradation of
application throughput with that of file system reads or writes,
which happens for instance when the page cache becomes saturated and
synchronous page frame reclamation starts slowing down write(2)
syscalls.

Do you think that could be a useful addition to the metrics generated
by Apache Kafka?

Many thanks,
Alexandre

Le jeu. 16 janv. 2020 à 22:43, Colin McCabe  a écrit :
>
> With binding +1 votes from Gwen Shapira, Manikumar Reddy, Mickael Maison, and 
>   David Arthur, and non-binding +1 votes from Jose Garcia Sancio, M. Manna, 
> Lucas Bradstreet, Mitchell, and Sönke Liebau, the vote passes.
>
> Thanks, all!
> Colin
>
>
> On Wed, Jan 15, 2020, at 09:25, Colin McCabe wrote:
> > Thanks, all.  I will close the vote later today.
> >
> > best,
> > Colin
> >
> >
> > On Wed, Jan 15, 2020, at 01:48, Mickael Maison wrote:
> > > +1 (binding)
> > > Thanks for the KIP
> > >
> > > On Tue, Jan 14, 2020 at 6:50 PM David Arthur  wrote:
> > > >
> > > > +1 binding
> > > >
> > > > This will be very nice to have. Thanks for the KIP, Colin.
> > > >
> > > > -David
> > > >
> > > > On Tue, Jan 14, 2020 at 11:39 AM Sönke Liebau
> > > >  wrote:
> > > >
> > > > > +1 (non-binding)
> > > > >
> > > > > Thanks for creating this!
> > > > >
> > > > > On Tue, 14 Jan 2020 at 17:36, Mitchell  wrote:
> > > > >
> > > > > > +1 (non-binding)!
> > > > > > Very useful kip.
> > > > > > -mitch
> > > > > >
> > > > > > On Tue, Jan 14, 2020 at 10:26 AM Manikumar 
> > > > > > 
> > > > > > wrote:
> > > > > > >
> > > > > > > +1 (binding).
> > > > > > >
> > > > > > > Thanks for the KIP.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Sun, Jan 12, 2020 at 1:23 AM Lucas Bradstreet 
> > > > > > > 
> > > > > > wrote:
> > > > > > >
> > > > > > > > +1 (non binding)
> > > > > > > >
> > > > > > > > On Sat, 11 Jan 2020 at 02:32, M. Manna  
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hey Colin,
> > > > > > > > >
> > > > > > > > > +1 - Really useful for folks managing a cluster by themselves.
> > > > > > > > >
> > > > > > > > > M. MAnna
> > > > > > > > >
> > > > > > > > > On Fri, 10 Jan 2020 at 22:35, Jose Garcia Sancio <
> > > > > > jsan...@confluent.io>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > +1, LGTM.
> > > > > > > > > >
> > > > > > > > > > On Fri, Jan 10, 2020 at 2:19 PM Gwen Shapira 
> > > > > > > > > > 
> > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > +1, thanks for driving this
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Jan 10, 2020 at 2:17 PM Colin McCabe <
> > > > > cmcc...@apache.org
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > Hi all,
> > > > > > > > > > > >
> > > > > > > > > > > > I'd like to start the vote on KIP-551: Expose disk read 
> > > > > > > > > > > > and
> > > > > > write
> > > > > > > > > > > metrics.
> > > > > > > > > > > >
> > > > > > > > > > > > KIP:  https://cwiki.apache.org/confluence/x/sotSC
> > > > > > > > > > > >
> > > > > > > > > > > > Discussion thread:
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > >
> > > > > https://lists.apache.org/thread.html/cfaac4426455406abe890464a7f4ae23a5c69a39afde66fe6eb3d696%40%3Cdev.kafka.apache.org%3E
> > > > > > > > > > > >
> > > > > > > > > > > > cheers,
> > > > > > > > > > > > Colin
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > --
> > > > > > > > > > -Jose
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Sönke Liebau
> > > > > Partner
> > > > > Tel. +49 179 7940878
> > > > > OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany
> > > > >
> > > >
> > > >
> > > > --
> > > > David Arthur
> > >
> >


[jira] [Created] (KAFKA-9884) Unable to override some client properties in Mirror maker 2.0

2020-04-17 Thread Mithun Kumar (Jira)
Mithun Kumar created KAFKA-9884:
---

 Summary: Unable to override some client properties in Mirror maker 
2.0
 Key: KAFKA-9884
 URL: https://issues.apache.org/jira/browse/KAFKA-9884
 Project: Kafka
  Issue Type: Bug
  Components: mirrormaker
Affects Versions: 2.4.1, 2.5.0, 2.4.0
Reporter: Mithun Kumar
 Attachments: mm2.log

I have two 3-node Kafka clusters. MirrorMaker 2.0 is being run as a cluster 
with bin/connect-mirror-maker.sh mm2.properties

I am trying to avoid message duplication during replication by enabling 
idempotence. I understand that EOS is marked as future work in 
[KIP-382|https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0];
 however, it should be possible by setting enable.idempotence = true and retries 
> 0.

The .enable.idempotence = true setting takes effect; however, overriding 
the retries fails. I tried all 3 versions that provide MM2: 2.4.0, 2.4.1 and 
2.5.0.

My mm2.properties config :
{noformat}
name = pri_to_bkp
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
topics = test-mm-topic-3
groups = .*
clusters = pri, bkp
source.cluster.alias = pri
target.cluster.alias = bkp

sasl.mechanism = GSSAPI
sasl.kerberos.service.name = kafka
security.protocol = SASL_PLAINTEXT
sasl.jaas.config = com.sun.security.auth.module.Krb5LoginModule required \
useKeyTab=true \
keyTab="/etc/security/keytabs/user.keytab" \
principal="u...@xx.xx.com";

pri.enable.idempotence = true
bkp.enable.idempotence = true
pri.retries = 2147483647
bkp.retries = 2147483647

pri.bootstrap.servers = SASL_PLAINTEXT://kafka1:9092, 
SASL_PLAINTEXT://kafka2:9092, SASL_PLAINTEXT://kafka3:9092
bkp.bootstrap.servers = SASL_PLAINTEXT://bkp-kafka1:9092, 
SASL_PLAINTEXT://bkp-kafka2:9092, SASL_PLAINTEXT://bkp-kafka3:9092
pri->bkp.enabled = true
pri->bkp.topics = "test-mm-topic-3"
{noformat}
 

The error leading to failure is:
{noformat}
[2020-04-17 15:46:26,525] ERROR [Worker clientId=connect-1, groupId=pri-mm2] 
Uncaught exception in herder work thread, exiting:  
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:297)
org.apache.kafka.common.config.ConfigException: Must set retries to non-zero 
when using the idempotent producer.
at 
org.apache.kafka.clients.producer.ProducerConfig.maybeOverrideAcksAndRetries(ProducerConfig.java:432)
at 
org.apache.kafka.clients.producer.ProducerConfig.postProcessParsedConfig(ProducerConfig.java:400)
at 
org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:110)
at 
org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129)
at 
org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:481)
at 
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:326)
at 
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:270)
at 
org.apache.kafka.connect.util.KafkaBasedLog.createProducer(KafkaBasedLog.java:248)
at 
org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:129)
at 
org.apache.kafka.connect.storage.KafkaStatusBackingStore.start(KafkaStatusBackingStore.java:199)
at 
org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:124)
at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:284)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2020-04-17 15:46:29,089] INFO [Worker clientId=connect-1, groupId=pri-mm2] 
Herder stopped 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:636)
[2020-04-17 15:46:29,089] INFO [Worker clientId=connect-2, groupId=bkp-mm2] 
Herder stopping 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:616)

[2020-04-17 15:46:34,090] INFO [Worker clientId=connect-2, groupId=bkp-mm2] 
Herder stopped 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:636)
[2020-04-17 15:46:34,090] INFO Kafka MirrorMaker stopped. 
(org.apache.kafka.connect.mirror.MirrorMaker:191)
{noformat}
 The complete log file is attached.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)