[GitHub] [kafka] rite2nikhil commented on a change in pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response

2020-10-06 Thread GitBox


rite2nikhil commented on a change in pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#discussion_r500743939



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -432,14 +455,22 @@ abstract class AbstractFetcherThread(name: String,
   failedPartitions.removeAll(initialFetchStates.keySet)
 
   initialFetchStates.forKeyValue { (tp, initialFetchState) =>
-    // We can skip the truncation step iff the leader epoch matches the existing epoch
+    // For IBP 2.7 onwards, we can rely on truncation based on diverging data returned in fetch responses.
+    // For older versions, we can skip the truncation step iff the leader epoch matches the existing epoch
     val currentState = partitionStates.stateValue(tp)
-    val updatedState = if (currentState != null && currentState.currentLeaderEpoch == initialFetchState.leaderEpoch) {
+    val updatedState = if (initialFetchState.offset >= 0 && isTruncationOnFetchSupported && initialFetchState.lastFetchedEpoch.nonEmpty) {

Review comment:
   NIT: Should this be the first check in the if () statement?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9388: KAFKA-10562: Properly invoke new StateStoreContext init

2020-10-06 Thread GitBox


vvcephei commented on a change in pull request #9388:
URL: https://github.com/apache/kafka/pull/9388#discussion_r500715702



##
File path: 
streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
##
@@ -34,6 +34,7 @@
  * Demonstrate the use of {@link MockProcessorContext} for testing the {@link Processor} in the {@link WordCountProcessorDemo}.
  */
 public class WordCountProcessorTest {
+@SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437

Review comment:
   This ticket needs to go into 2.7.0 also, but I split it out for reviewability.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
##
@@ -45,12 +46,19 @@ public void flush() {
 throw new UnsupportedOperationException(ERROR_MESSAGE);
 }
 
+@Deprecated
 @Override
 public void init(final ProcessorContext context,
  final StateStore root) {
 throw new UnsupportedOperationException(ERROR_MESSAGE);
 }
 
+@Override
+public void init(final StateStoreContext context,
+ final StateStore root) {
+throw new UnsupportedOperationException(ERROR_MESSAGE);

Review comment:
   There are going to be a lot of duplicated init methods. It's not great, 
but hopefully we can drop the old API before too long.
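
   As a concrete illustration, this is roughly the shape each wrapper ends up with; a minimal sketch assuming a `wrapped()` accessor for the delegate (not shown in this diff), with the pass-through behavior the PR description calls for:

   ```java
   // Sketch only: both overloads forward to the wrapped store, so wrappers do not
   // translate StateStoreContext init into ProcessorContext init (or vice versa).
   @Deprecated
   @Override
   public void init(final ProcessorContext context, final StateStore root) {
       wrapped().init(context, root); // old entry point, kept for compatibility
   }

   @Override
   public void init(final StateStoreContext context, final StateStore root) {
       wrapped().init(context, root); // new entry point, passed straight through
   }
   ```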

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
##
@@ -47,9 +48,42 @@ public static StreamsMetricsImpl getMetricsImpl(final ProcessorContext context)
         return (StreamsMetricsImpl) context.metrics();
     }
 
+    /**
+     * Should be removed as part of KAFKA-10217
+     */
+    public static StreamsMetricsImpl getMetricsImpl(final StateStoreContext context) {
+        return (StreamsMetricsImpl) context.metrics();
+    }
+
     public static String changelogFor(final ProcessorContext context, final String storeName) {
         return context instanceof InternalProcessorContext
             ? ((InternalProcessorContext) context).changelogFor(storeName)
             : ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
     }
+
+    public static String changelogFor(final StateStoreContext context, final String storeName) {
+        return context instanceof InternalProcessorContext
+            ? ((InternalProcessorContext) context).changelogFor(storeName)
+            : ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
+    }
+
+    public static InternalProcessorContext asInternalProcessorContext(final ProcessorContext context) {
+        if (context instanceof InternalProcessorContext) {
+            return (InternalProcessorContext) context;
+        } else {
+            throw new IllegalArgumentException(
+                "This component requires internal features of Kafka Streams and must be disabled for unit tests."
+            );
+        }
+    }

Review comment:
   I replaced a lot of casts with this checked-cast method, which also lets 
us get rid of a lot of similar cast-checking blocks, which were inconsistently 
applied.
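
   A before/after sketch of what that replacement looks like at a typical call site (illustrative only):

   ```java
   // Before: bare cast, sometimes guarded and sometimes not, failing with a plain ClassCastException.
   final InternalProcessorContext internal = (InternalProcessorContext) context;

   // After: one shared checked cast that fails with a descriptive IllegalArgumentException.
   final InternalProcessorContext internal = ProcessorContextUtils.asInternalProcessorContext(context);
   ```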

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -83,14 +85,40 @@
         this.valueSerde = valueSerde;
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
-        this.context = context;
+        this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null;
         taskId = context.taskId().toString();
         initStoreSerde(context);
         streamsMetrics = (StreamsMetricsImpl) context.metrics();
 
+        registerMetrics();

Review comment:
   I wasn't able to extract out quite as much common code in the Metered 
implementations because they need to work regardless of whether the context is 
an InternalProcessorContext or whether it's a straight mock (for unit tests).
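
   The pattern that keeps the Metered stores mock-friendly, as a short sketch (same logic as the diff above):

   ```java
   // Keep the internal view only when the runtime provides one; a plain mock
   // ProcessorContext from a unit test leaves `this.context` null, so only the
   // features that genuinely need internal APIs are skipped.
   this.context = context instanceof InternalProcessorContext
       ? (InternalProcessorContext) context
       : null;
   streamsMetrics = (StreamsMetricsImpl) context.metrics(); // still works for mocks
   ```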

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
##
@@ -65,7 +65,11 @@
      *
      * @throws IllegalStateException If store gets registered after initialized is already finished
      * @throws StreamsException if the store's change log does not contain the partition
+     * @deprecated Since 2.7.0. Callers should invoke {@link this#init(StateStoreContext, StateStore)} instead.
+     *             Implementers may choose to implement this method for backward compatibility or to throw an
+     *             informative exception instead.
      */
+    @Deprecated

Review comment:
   Adding the deprecation tag right now lets us be sure we encountered all 
places this method appears in the codebase.
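
   Per that new javadoc, implementers of the deprecated overload can either keep it working for backward compatibility or reject it outright; a minimal sketch of the "informative exception" option (message text is illustrative):

   ```java
   @Deprecated
   @Override
   public void init(final ProcessorContext context, final StateStore root) {
       throw new UnsupportedOperationException(
           "init(ProcessorContext, StateStore) is deprecated; use init(StateStoreContext, StateStore) instead."
       );
   }
   ```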


[GitHub] [kafka] vvcephei opened a new pull request #9388: KAFKA-10562: Properly invoke new StateStoreContext init

2020-10-06 Thread GitBox


vvcephei opened a new pull request #9388:
URL: https://github.com/apache/kafka/pull/9388


   * all wrapping stores should pass StateStoreContext init through to the same
     method on the wrapped store and not translate it to ProcessorContext init
   * base-level stores should handle StateStoreContext init so that callers passing
     a non-InternalProcessorContext implementation will be able to initialize the store
   * extra tests are added to verify the desired behavior (a sketch of one follows below)
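
   A hedged sketch of the kind of test the last bullet refers to, assuming the `MockProcessorContext#getStateStoreContext()` accessor from the KIP-478 test-utils work (KAFKA-10437):

   ```java
   import org.apache.kafka.common.serialization.Serdes;
   import org.apache.kafka.streams.processor.MockProcessorContext;
   import org.apache.kafka.streams.state.KeyValueStore;
   import org.apache.kafka.streams.state.Stores;

   public class StateStoreContextInitSketch {
       public static void main(final String[] args) {
           // A base-level store must initialize from a non-InternalProcessorContext caller.
           final KeyValueStore<String, Long> store = Stores
               .keyValueStoreBuilder(Stores.inMemoryKeyValueStore("test"), Serdes.String(), Serdes.Long())
               .withLoggingDisabled()
               .withCachingDisabled()
               .build();
           final MockProcessorContext context = new MockProcessorContext();
           store.init(context.getStateStoreContext(), store); // should not require internal APIs
           store.put("a", 1L);
       }
   }
   ```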
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10437) KIP-478: Implement test-utils changes

2020-10-06 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10437:
-
Description: 
In addition to implementing the KIP, search for and resolve these todos:

{color:#008dde}TODO will be fixed in KAFKA-10437{color}

Also, add unit tests in test-utils making sure we can initialize _all_ the 
kinds of store with the MPC and MPC.getSSC.

 

  was:
In addition to implementing the KIP, search for and resolve these todos:

{color:#008dde}TODO will be fixed in KAFKA-10437{color}


> KIP-478: Implement test-utils changes
> -
>
> Key: KAFKA-10437
> URL: https://issues.apache.org/jira/browse/KAFKA-10437
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.7.0
>
>
> In addition to implementing the KIP, search for and resolve these todos:
> {color:#008dde}TODO will be fixed in KAFKA-10437{color}
> Also, add unit tests in test-utils making sure we can initialize _all_ the 
> kinds of store with the MPC and MPC.getSSC.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10437) KIP-478: Implement test-utils changes

2020-10-06 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10437:
-
Description: 
In addition to implementing the KIP, search for and resolve these todos:

{color:#008dde}TODO will be fixed in KAFKA-10437{color}

> KIP-478: Implement test-utils changes
> -
>
> Key: KAFKA-10437
> URL: https://issues.apache.org/jira/browse/KAFKA-10437
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.7.0
>
>
> In addition to implementing the KIP, search for and resolve these todos:
> {color:#008dde}TODO will be fixed in KAFKA-10437{color}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler

2020-10-06 Thread GitBox


ableegoldman commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r500682632



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -364,6 +370,73 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh
 }
 }
 
+    /**
+     * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread}
+     * throws an unexpected exception.
+     * These might be exceptions indicating rare bugs in Kafka Streams, or they
+     * might be exceptions thrown by your code, for example a NullPointerException thrown from your processor logic.
+     * <p>
+     * Note, this handler must be threadsafe, since it will be shared among all threads, and invoked from any
+     * thread that encounters such an exception.
+     *
+     * @param streamsUncaughtExceptionHandler the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads; {@code null} deletes the current handler
+     * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}.
+     */
+    public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+        final StreamsUncaughtExceptionHandler handler = throwable -> handleStreamsUncaughtException(throwable, streamsUncaughtExceptionHandler);
+        synchronized (stateLock) {
+            if (state == State.CREATED) {
+                for (final StreamThread thread : threads) {

Review comment:
   If we want to use the same StreamsUncaughtExceptionHandler so users 
don't have to implement two different handlers, which sounds right to me, then 
maybe we should add a parameter to the `handle` method to indicate whether it's 
a global or stream thread. Or split into a separate 
`handleStreamThreadException` and `handleGlobalThreadException` methods, or 
something like that (and possibly rename `SHUTDOWN_STREAM_THREAD` to 
`SHUTDOWN_THREAD`)
   WDYT?
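
   For illustration, one way the suggestion could look; this is a sketch of the reviewer's idea, not the actual KIP-671 interface:

   ```java
   public interface StreamsUncaughtExceptionHandler {

       enum ThreadType { STREAM_THREAD, GLOBAL_THREAD }

       // Renamed per the suggestion: SHUTDOWN_STREAM_THREAD -> SHUTDOWN_THREAD,
       // since the same response could now apply to the global thread too.
       enum StreamsUncaughtExceptionHandlerResponse {
           SHUTDOWN_THREAD,
           SHUTDOWN_KAFKA_STREAMS_CLIENT,
           SHUTDOWN_KAFKA_STREAMS_APPLICATION
       }

       // Variant A: one handler with a parameter indicating which kind of thread failed.
       // Variant B would instead split this into handleStreamThreadException/handleGlobalThreadException.
       StreamsUncaughtExceptionHandlerResponse handle(ThreadType threadType, Throwable exception);
   }
   ```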





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler

2020-10-06 Thread GitBox


ableegoldman commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r500682039



##
File path: 
streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+public interface StreamsUncaughtExceptionHandler {
+/**
+ * Inspect a record and the exception received.
+ * @param exception the actual exception
+ */
+    StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse handle(final Throwable exception);

Review comment:
   Should we consider also passing in the thread name?
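
   For example, the signature could become something like this (purely illustrative):

   ```java
   StreamsUncaughtExceptionHandlerResponse handle(final String threadName, final Throwable exception);
   ```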





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler

2020-10-06 Thread GitBox


ableegoldman commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r500681912



##
File path: 
streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+public interface StreamsUncaughtExceptionHandler {
+/**
+ * Inspect a record and the exception received.
+ * @param exception the actual exception
+ */
+    StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse handle(final Throwable exception);
+
+/**
+ * Enumeration that describes the response from the exception handler.
+ */
+    enum StreamsUncaughtExceptionHandlerResponse {
+
+        SHUTDOWN_STREAM_THREAD(0, "SHUTDOWN_STREAM_THREAD"),
+        //REPLACE_STREAM_THREAD(1, "REPLACE_STREAM_THREAD"),
+        SHUTDOWN_KAFKA_STREAMS_CLIENT(2, "SHUTDOWN_KAFKA_STREAMS_CLIENT"),
+        SHUTDOWN_KAFKA_STREAMS_APPLICATION(3, "SHUTDOWN_KAFKA_STREAMS_APPLICATION");
+
+        /** an english description of the api--this is for debugging and can change */
+        public final String name;
+
+        /** the permanent and immutable id of an API--this can't change ever */
+        public final int id;
+
+        StreamsUncaughtExceptionHandlerResponse(final int id, final String name) {

Review comment:
   Oh, sorry, I thought this was the `handle` method. Ignore me.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler

2020-10-06 Thread GitBox


ableegoldman commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r500681512



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -364,6 +370,73 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh
 }
 }
 
+    /**
+     * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread}
+     * throws an unexpected exception.
+     * These might be exceptions indicating rare bugs in Kafka Streams, or they
+     * might be exceptions thrown by your code, for example a NullPointerException thrown from your processor logic.
+     * <p>
+     * Note, this handler must be threadsafe, since it will be shared among all threads, and invoked from any
+     * thread that encounters such an exception.
+     *
+     * @param streamsUncaughtExceptionHandler the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads; {@code null} deletes the current handler
+     * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}.
+     */
+    public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+        final StreamsUncaughtExceptionHandler handler = throwable -> handleStreamsUncaughtException(throwable, streamsUncaughtExceptionHandler);
+        synchronized (stateLock) {
+            if (state == State.CREATED) {
+                for (final StreamThread thread : threads) {

Review comment:
   What's our plan for the global thread?  I didn't think of this during 
the KIP discussion, and sorry if it was brought up there and I just forgot 
about it. But it seems like we should still give users a non-deprecated way to 
set a handler for the global thread. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler

2020-10-06 Thread GitBox


ableegoldman commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r500681407



##
File path: 
streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+public interface StreamsUncaughtExceptionHandler {
+/**
+ * Inspect a record and the exception received.
+ * @param exception the actual exception
+ */
+    StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse handle(final Throwable exception);
+
+/**
+ * Enumeration that describes the response from the exception handler.
+ */
+    enum StreamsUncaughtExceptionHandlerResponse {
+
+        SHUTDOWN_STREAM_THREAD(0, "SHUTDOWN_STREAM_THREAD"),
+        //REPLACE_STREAM_THREAD(1, "REPLACE_STREAM_THREAD"),
+        SHUTDOWN_KAFKA_STREAMS_CLIENT(2, "SHUTDOWN_KAFKA_STREAMS_CLIENT"),
+        SHUTDOWN_KAFKA_STREAMS_APPLICATION(3, "SHUTDOWN_KAFKA_STREAMS_APPLICATION");
+
+        /** an english description of the api--this is for debugging and can change */
+        public final String name;
+
+        /** the permanent and immutable id of an API--this can't change ever */
+        public final int id;
+
+        StreamsUncaughtExceptionHandlerResponse(final int id, final String name) {

Review comment:
   What is the `id`? And what is the `name`, is that the thread name? If 
so, can we clarify this?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-10578) Convert KTable to a KStream using the previous value

2020-10-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-10578:
---

Assignee: Javier Freire Riobó

> Convert KTable to a KStream using the previous value
> 
>
> Key: KAFKA-10578
> URL: https://issues.apache.org/jira/browse/KAFKA-10578
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Javier Freire Riobó
>Assignee: Javier Freire Riobó
>Priority: Minor
>  Labels: kip
>
> Imagine that we have an entity for which we want to emit the difference 
> between the current and the previous state. The simplest case would be that 
> the entity was an integer number and you want to emit the subtraction between 
> the current and previous values.
> For example, for the input stream 4, 6, 3 the output 4 (4 - 0), 2 (6 - 4), -3 
> (3 - 6) is expected.
> The way to achieve this with kafka streams would be through an aggregate.
> The main problem, apart from needing more code, is that if the same event is 
> received twice at the same time and the commit time is not 0, the difference 
> is deleted and nothing is emitted.
>  KIP-675: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-675%3A+Convert+KTable+to+a+KStream+using+the+previous+value]
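
A hedged sketch of the aggregate-based workaround the description mentions, illustrating the "needs more code" point (topic name, serdes, and the string-encoded state are all illustrative; this would live inside a topology-building method, and with caching enabled, rapid consecutive updates per key may still be collapsed before they are emitted):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;

final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, Integer> input = builder.stream("numbers",
    Consumed.with(Serdes.String(), Serdes.Integer()));

// State is encoded as "previousValue,difference" to avoid writing a custom serde.
final KTable<String, String> state = input
    .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
    .aggregate(
        () -> "0,0",
        (key, value, agg) -> {
            final int previous = Integer.parseInt(agg.split(",")[0]);
            return value + "," + (value - previous); // remember current, carry the diff
        },
        Materialized.with(Serdes.String(), Serdes.String()));

// For the input 4, 6, 3 this emits 4, 2, -3, as in the example above.
final KStream<String, Integer> diffs = state
    .toStream()
    .mapValues(agg -> Integer.parseInt(agg.split(",")[1]));
```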



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10578) Convert KTable to a KStream using the previous value

2020-10-06 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17209253#comment-17209253
 ] 

Matthias J. Sax commented on KAFKA-10578:
-

[~javier.freire] – I added you to the list of contributors and assigned the ticket to you. You can now also self-assign tickets.

> Convert KTable to a KStream using the previous value
> 
>
> Key: KAFKA-10578
> URL: https://issues.apache.org/jira/browse/KAFKA-10578
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Javier Freire Riobó
>Priority: Minor
>  Labels: kip
>
> Imagine that we have an entity for which we want to emit the difference 
> between the current and the previous state. The simplest case would be that 
> the entity was an integer number and you want to emit the subtraction between 
> the current and previous values.
> For example, for the input stream 4, 6, 3 the output 4 (4 - 0), 2 (6 - 4), -3 
> (3 - 6) is expected.
> The way to achieve this with kafka streams would be through an aggregate.
> The main problem, apart from needing more code, is that if the same event is 
> received twice at the same time and the commit time is not 0, the difference 
> is deleted and nothing is emitted.
>  KIP-675: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-675%3A+Convert+KTable+to+a+KStream+using+the+previous+value]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10578) Convert KTable to a KStream using the previous value

2020-10-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10578:

Summary: Convert KTable to a KStream using the previous value  (was: 
KIP-675: Convert KTable to a KStream using the previous value)

> Convert KTable to a KStream using the previous value
> 
>
> Key: KAFKA-10578
> URL: https://issues.apache.org/jira/browse/KAFKA-10578
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Javier Freire Riobó
>Priority: Minor
>  Labels: kip
>
> Imagine that we have an entity for which we want to emit the difference 
> between the current and the previous state. The simplest case would be that 
> the entity was an integer number and you want to emit the subtraction between 
> the current and previous values.
> For example, for the input stream 4, 6, 3 the output 4 (4 - 0), 2 (6 - 4), -3 
> (3 - 6) is expected.
> The way to achieve this with kafka streams would be through an aggregate.
> The main problem, apart from needing more code, is that if the same event is 
> received twice at the same time and the commit time is not 0, the difference 
> is deleted and nothing is emitted.
>  KIP-675: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-675%3A+Convert+KTable+to+a+KStream+using+the+previous+value]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10578) KIP-675: Convert KTable to a KStream using the previous value

2020-10-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10578:

Description: 
Imagine that we have an entity for which we want to emit the difference between 
the current and the previous state. The simplest case would be that the entity 
was an integer number and you want to emit the subtraction between the current 
and previous values.

For example, for the input stream 4, 6, 3 the output 4 (4 - 0), 2 (6 - 4), -3 
(3 - 6) is expected.

The way to achieve this with kafka streams would be through an aggregate.

The main problem, apart from needing more code, is that if the same event is 
received twice at the same time and the commit time is not 0, the difference is 
deleted and nothing is emitted.

 KIP-675: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-675%3A+Convert+KTable+to+a+KStream+using+the+previous+value]

  was:
Imagine that we have an entity for which we want to emit the difference between 
the current and the previous state. The simplest case would be that the entity 
was an integer number and you want to emit the subtraction between the current 
and previous values.

For example, for the input stream 4, 6, 3 the output 4 (4 - 0), 2 (6 - 4), -3 
(3 - 6) is expected.

The way to achieve this with kafka streams would be through an aggregate.

The main problem, apart from needing more code, is that if the same event is 
received twice at the same time and the commit time is not 0, the difference is 
deleted and nothing is emitted.

 


> KIP-675: Convert KTable to a KStream using the previous value
> -
>
> Key: KAFKA-10578
> URL: https://issues.apache.org/jira/browse/KAFKA-10578
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Javier Freire Riobó
>Priority: Minor
>  Labels: kip
>
> Imagine that we have an entity for which we want to emit the difference 
> between the current and the previous state. The simplest case would be that 
> the entity was an integer number and you want to emit the subtraction between 
> the current and previous values.
> For example, for the input stream 4, 6, 3 the output 4 (4 - 0), 2 (6 - 4), -3 
> (3 - 6) is expected.
> The way to achieve this with kafka streams would be through an aggregate.
> The main problem, apart from needing more code, is that if the same event is 
> received twice at the same time and the commit time is not 0, the difference 
> is deleted and nothing is emitted.
>  KIP-675: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-675%3A+Convert+KTable+to+a+KStream+using+the+previous+value]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10578) KIP-675: Convert KTable to a KStream using the previous value

2020-10-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10578:

Labels: kip  (was: )

> KIP-675: Convert KTable to a KStream using the previous value
> -
>
> Key: KAFKA-10578
> URL: https://issues.apache.org/jira/browse/KAFKA-10578
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Javier Freire Riobó
>Priority: Minor
>  Labels: kip
>
> Imagine that we have an entity for which we want to emit the difference 
> between the current and the previous state. The simplest case would be that 
> the entity was an integer number and you want to emit the subtraction between 
> the current and previous values.
> For example, for the input stream 4, 6, 3 the output 4 (4 - 0), 2 (6 - 4), -3 
> (3 - 6) is expected.
> The way to achieve this with kafka streams would be through an aggregate.
> The main problem, apart from needing more code, is that if the same event is 
> received twice at the same time and the commit time is not 0, the difference 
> is deleted and nothing is emitted.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #9139: KAFKA-9929: Support backward iterator on SessionStore

2020-10-06 Thread GitBox


ableegoldman commented on a change in pull request #9139:
URL: https://github.com/apache/kafka/pull/9139#discussion_r500664927



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
##
@@ -278,6 +326,23 @@ public void shouldFetchCorrectlyAcrossSegments() {
         assertFalse(results.hasNext());
     }
 
+    @Test
+    public void shouldBackwardFetchCorrectlyAcrossSegments() {
+        final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0));
+        final Windowed<Bytes> a2 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 1, SEGMENT_INTERVAL * 1));
+        final Windowed<Bytes> a3 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2));
+        cachingStore.put(a1, "1".getBytes());
+        cachingStore.put(a2, "2".getBytes());
+        cachingStore.flush();
+        cachingStore.put(a3, "3".getBytes());

Review comment:
   Can we add a few more records that span multiple segments that don't get 
flushed as well?

##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
##
@@ -301,6 +366,29 @@ public void shouldFetchRangeCorrectlyAcrossSegments() {
         assertEquals(mkSet(a1, a2, a3, aa1, aa3), keys);
     }
 
+    @Test
+    public void shouldBackwardFetchRangeCorrectlyAcrossSegments() {
+        final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0));
+        final Windowed<Bytes> aa1 = new Windowed<>(keyAA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0));
+        final Windowed<Bytes> a2 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 1, SEGMENT_INTERVAL * 1));
+        final Windowed<Bytes> a3 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2));
+        final Windowed<Bytes> aa3 = new Windowed<>(keyAA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2));
+        cachingStore.put(a1, "1".getBytes());
+        cachingStore.put(aa1, "1".getBytes());
+        cachingStore.put(a2, "2".getBytes());
+        cachingStore.put(a3, "3".getBytes());
+        cachingStore.put(aa3, "3".getBytes());
+
+        final KeyValueIterator<Windowed<Bytes>, byte[]> rangeResults =
+            cachingStore.backwardFindSessions(keyA, keyAA, 0, SEGMENT_INTERVAL * 2);
+        final Set<Windowed<Bytes>> keys = new HashSet<>();
+        while (rangeResults.hasNext()) {
+            keys.add(rangeResults.next().key);
+        }
+        rangeResults.close();
+        assertEquals(mkSet(a1, a2, a3, aa1, aa3), keys);

Review comment:
   We're losing the ordering check by comparing this as a set, let's use a 
list (or whatever) to verify the actual order
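
   Something like this, as a sketch (the exact expected order for `backwardFindSessions` is an assumption, latest-first here):

   ```java
   final List<Windowed<Bytes>> orderedKeys = new ArrayList<>();
   while (rangeResults.hasNext()) {
       orderedKeys.add(rangeResults.next().key);
   }
   rangeResults.close();
   assertEquals(asList(aa3, a3, a2, aa1, a1), orderedKeys);
   ```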

##
File path: 
streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java
##
@@ -82,11 +118,15 @@ public boolean hasNext() {
                 public KeyValue<Windowed<K>, V> next() {
                     return it.next();
                 }
-
             }
         );
     }
 
+    @Override
+    public KeyValueIterator<Windowed<K>, V> backwardFetch(K from, K to) {
+        return null;

Review comment:
   I guess it probably doesn't matter since we presumably aren't using 
these backward methods of the ReadOnlySessionStoreStub, but it seems like it 
might result in some tricky NPEs to debug if ever someone does try to use it in 
a test. If you don't feel like implementing it I think it's fine to just throw 
UnsupportedOperationException and say that you'll have to implement this to use 
it.
   
   Or just copy the code from the forward direction and flip it 🤷‍♀️ Same goes for all the methods in here that return null
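
   A sketch of that fallback for one of the stub methods (message text is illustrative):

   ```java
   @Override
   public KeyValueIterator<Windowed<K>, V> backwardFetch(final K from, final K to) {
       throw new UnsupportedOperationException(
           "backwardFetch is not implemented in ReadOnlySessionStoreStub; implement it before using it in a test."
       );
   }
   ```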

##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
##
@@ -456,68 +562,88 @@ public void shouldClearNamespaceCacheOnClose() {
         assertEquals(0, cache.size());
     }
 
-    @Test(expected = InvalidStateStoreException.class)
+    @Test
     public void shouldThrowIfTryingToFetchFromClosedCachingStore() {
         cachingStore.close();
-        cachingStore.fetch(keyA);
+        assertThrows(InvalidStateStoreException.class, () -> cachingStore.fetch(keyA));
     }
 
-    @Test(expected = InvalidStateStoreException.class)
+    @Test
     public void shouldThrowIfTryingToFindMergeSessionFromClosedCachingStore() {
         cachingStore.close();
-        cachingStore.findSessions(keyA, 0, Long.MAX_VALUE);
+        assertThrows(InvalidStateStoreException.class, () -> cachingStore.findSessions(keyA, 0, Long.MAX_VALUE));
     }
 
-    @Test(expected = InvalidStateStoreException.class)
+    @Test
     public void shouldThrowIfTryingToRemoveFromClosedCachingStore() {
         cachingStore.close();
-        cachingStore.remove(new Windowed<>(keyA, new SessionWindow(0, 0)));
+        assertThrows(InvalidStateStoreException.class, () -> cachingStore.remove(new Windowed<>(keyA, new
[GitHub] [kafka] guozhangwang commented on pull request #8988: KAFKA-10199: Separate restore threads

2020-10-06 Thread GitBox


guozhangwang commented on pull request #8988:
URL: https://github.com/apache/kafka/pull/8988#issuecomment-704615880


   test this



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #8988: KAFKA-10199: Separate restore threads

2020-10-06 Thread GitBox


guozhangwang commented on pull request #8988:
URL: https://github.com/apache/kafka/pull/8988#issuecomment-704615342


   System test: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4195/



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9139: KAFKA-9929: Support backward iterator on SessionStore

2020-10-06 Thread GitBox


ableegoldman commented on a change in pull request #9139:
URL: https://github.com/apache/kafka/pull/9139#discussion_r500658560



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
##
@@ -270,25 +335,32 @@ public void close() {
 
         private CacheIteratorWrapper(final Bytes key,
                                      final long earliestSessionEndTime,
-                                     final long latestSessionStartTime) {
-            this(key, key, earliestSessionEndTime, latestSessionStartTime);
+                                     final long latestSessionStartTime,
+                                     final boolean forward) {
+            this(key, key, earliestSessionEndTime, latestSessionStartTime, forward);
         }
 
         private CacheIteratorWrapper(final Bytes keyFrom,
                                      final Bytes keyTo,
                                      final long earliestSessionEndTime,
-                                     final long latestSessionStartTime) {
+                                     final long latestSessionStartTime,
+                                     final boolean forward) {
             this.keyFrom = keyFrom;
             this.keyTo = keyTo;
             this.latestSessionStartTime = latestSessionStartTime;
             this.lastSegmentId = cacheFunction.segmentId(maxObservedTimestamp);
             this.segmentInterval = cacheFunction.getSegmentInterval();
+            this.forward = forward;
 
             this.currentSegmentId = cacheFunction.segmentId(earliestSessionEndTime);

Review comment:
   Ok I _think_ that for the reverse case, this should be initialized to 
`cacheFunction.segmentId(maxObservedTimestamp)` and `lastSegmentId` should be 
initialized to this (`segmentId(earliestSessionEndTime)`). 
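
   In code, the proposed initialization might look like this (a sketch against the fields in the diff; not confirmed as the final fix):

   ```java
   if (forward) {
       this.currentSegmentId = cacheFunction.segmentId(earliestSessionEndTime);
       this.lastSegmentId = cacheFunction.segmentId(maxObservedTimestamp);
   } else {
       // Reverse iteration walks segments from newest to oldest.
       this.currentSegmentId = cacheFunction.segmentId(maxObservedTimestamp);
       this.lastSegmentId = cacheFunction.segmentId(earliestSessionEndTime);
   }
   ```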





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-06 Thread GitBox


kowshik commented on pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#issuecomment-704596581


   @junrao The test failure in 
`MirrorConnectorsIntegrationTest.testReplication` does not seem related. I have 
rebased the PR now against latest AK trunk, I'd like to see if the failure 
happens again.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-10024) Add dynamic configuration and enforce quota for per-IP connection rate limits

2020-10-06 Thread Anna Povzner (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anna Povzner reassigned KAFKA-10024:


Assignee: David Mao  (was: Anna Povzner)

> Add dynamic configuration and enforce quota for per-IP connection rate limits
> -
>
> Key: KAFKA-10024
> URL: https://issues.apache.org/jira/browse/KAFKA-10024
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Anna Povzner
>Assignee: David Mao
>Priority: Major
>  Labels: features
>
> This JIRA is for the second part of KIP-612 – Add per-IP connection creation 
> rate limits.
> As described here: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-612%3A+Ability+to+Limit+Connection+Creation+Rate+on+Brokers]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

2020-10-06 Thread GitBox


ableegoldman commented on pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#issuecomment-704587824


   I don't think they're fixed on trunk lol (besides, I believe Jenkins merges the PR with trunk before running the tests, so it's running the most recent code anyways).
   
   None of the failures seem to be related to this PR so I wouldn't worry about 
it. I think this PR is ready to be merged



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rgo edited a comment on pull request #2541: KAFKA-4759: Add support for subnet masks in SimpleACLAuthorizer

2020-10-06 Thread GitBox


rgo edited a comment on pull request #2541:
URL: https://github.com/apache/kafka/pull/2541#issuecomment-704547984


   I've been working on it. I'm going to open a new PR and I'll link it to this 
one.
   
   PR: #9387 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rgo opened a new pull request #9387: KAFKA-4759: Acl authorizer subnet support

2020-10-06 Thread GitBox


rgo opened a new pull request #9387:
URL: https://github.com/apache/kafka/pull/9387


   Add subnet support to ACL authorizer. For IPv4 and IPv6, it supports:
   
   - IP address range
   - Subnet CIDR notation
   
   Test strategy has been simple:
   - Define ranges and set ACLs with IPs that fall inside the range and others that fall outside it (see the sketch below).
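
   A self-contained sketch of the matching logic such tests exercise; `inCidr` is a hypothetical helper for illustration, not the PR's actual authorizer code:

   ```java
   import java.net.InetAddress;

   public final class CidrSketch {
       // True if addr falls inside network/prefixLen; works for IPv4 and IPv6
       // addresses of the same length.
       static boolean inCidr(final InetAddress addr, final InetAddress network, final int prefixLen) {
           final byte[] a = addr.getAddress();
           final byte[] n = network.getAddress();
           if (a.length != n.length) {
               return false;
           }
           int bits = prefixLen;
           for (int i = 0; i < a.length && bits > 0; i++, bits -= 8) {
               final int mask = bits >= 8 ? 0xFF : (0xFF << (8 - bits)) & 0xFF;
               if ((a[i] & mask) != (n[i] & mask)) {
                   return false;
               }
           }
           return true;
       }

       public static void main(final String[] args) throws Exception {
           final InetAddress net = InetAddress.getByName("192.168.0.0");
           System.out.println(inCidr(InetAddress.getByName("192.168.0.42"), net, 24)); // true: inside the /24
           System.out.println(inCidr(InetAddress.getByName("192.168.1.42"), net, 24)); // false: outside the /24
       }
   }
   ```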
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rgo commented on pull request #2541: KAFKA-4759: Add support for subnet masks in SimpleACLAuthorizer

2020-10-06 Thread GitBox


rgo commented on pull request #2541:
URL: https://github.com/apache/kafka/pull/2541#issuecomment-704547984


   I've been working on it. I'm going to open a new PR and I'll link it to this 
one.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10550) Update AdminClient and kafka-topics.sh to support topic IDs

2020-10-06 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-10550:
---
Description: 
Change some AdminClient methods to expose and support topic IDs (describe, 
delete, return the id on create)

 

 Make changes to kafka-topics.sh --describe so a user can specify a topic name 
to describe with the --topic parameter, or alternatively the user can supply a 
topic ID with the --topic_id parameter

  was: Make changes to kafka-topics.sh --describe so a user can specify a topic 
name to describe with the --topic parameter, or alternatively the user can 
supply a topic ID with the --topic_id parameter


> Update AdminClient and kafka-topics.sh to support topic IDs
> ---
>
> Key: KAFKA-10550
> URL: https://issues.apache.org/jira/browse/KAFKA-10550
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Priority: Major
>
> Change some AdminClient methods to expose and support topic IDs (describe, 
> delete, return the id on create)
>  
>  Make changes to kafka-topics.sh --describe so a user can specify a topic 
> name to describe with the --topic parameter, or alternatively the user can 
> supply a topic ID with the --topic_id parameter



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10550) Update AdminClient and kafka-topics.sh to support topic IDs

2020-10-06 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-10550:
---
Summary: Update AdminClient and kafka-topics.sh to support topic IDs  (was: 
Update kafka-topics.sh to support topic IDs)

> Update AdminClient and kafka-topics.sh to support topic IDs
> ---
>
> Key: KAFKA-10550
> URL: https://issues.apache.org/jira/browse/KAFKA-10550
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Priority: Major
>
>  Make changes to kafka-topics.sh --describe so a user can specify a topic 
> name to describe with the --topic parameter, or alternatively the user can 
> supply a topic ID with the --topic_id parameter



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10580) Add topic ID support to Fetch request

2020-10-06 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-10580:
--

 Summary: Add topic ID support to Fetch request
 Key: KAFKA-10580
 URL: https://issues.apache.org/jira/browse/KAFKA-10580
 Project: Kafka
  Issue Type: Sub-task
Reporter: Justine Olshan


Prevent fetching a stale topic with topic IDs



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] nym3r0s commented on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

2020-10-06 Thread GitBox


nym3r0s commented on pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#issuecomment-704543777


   @ableegoldman @hachikuji - Should I rebase these changes off of trunk (assuming these tests are fixed on trunk)?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10547) Add topic IDs to MetadataResponse, UpdateMetadata

2020-10-06 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-10547:
---
Summary: Add topic IDs to MetadataResponse, UpdateMetadata  (was: Add topic 
IDs to MetadataResponse, UpdateMetadata, and Fetch)

> Add topic IDs to MetadataResponse, UpdateMetadata
> -
>
> Key: KAFKA-10547
> URL: https://issues.apache.org/jira/browse/KAFKA-10547
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Priority: Major
>
> Prevent reads from deleted topics
> Will be able to use TopicDescription to identify the topic ID



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] splett2 opened a new pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits

2020-10-06 Thread GitBox


splett2 opened a new pull request #9386:
URL: https://github.com/apache/kafka/pull/9386


   This PR implements the part of KIP-612 for adding IP throttling enforcement, 
and a ZK entity for configuring dynamic IP throttles.
   
   I will add `kafka-configs` support as well as `KafkaApi` reconfiguration 
support in a follow-up PR.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax opened a new pull request #9385: KAFKA-9274: fix incorrect default value for `task.timeout.ms` config

2020-10-06 Thread GitBox


mjsax opened a new pull request #9385:
URL: https://github.com/apache/kafka/pull/9385


   Part of KIP-572.
   
   Also add handler method to trigger/reset the timeout on a task.
   
   Call for review @vvcephei 
   
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9384: MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor

2020-10-06 Thread GitBox


mjsax commented on a change in pull request #9384:
URL: https://github.com/apache/kafka/pull/9384#discussion_r500574894



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java
##
@@ -203,8 +204,8 @@ public void shouldReturnAllActiveTasksToPreviousOwnerRegardlessOfBalanceAndTrigg
         builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor1");
         final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
 
-        createMockTaskManager(allTasks);
         adminClient = EasyMock.createMock(AdminClient.class);
+        createMockTaskManager(allTasks);

Review comment:
   We need to setup the admin mock before the TM mock now (similar 
elsewhere)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9384: MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor

2020-10-06 Thread GitBox


mjsax commented on a change in pull request #9384:
URL: https://github.com/apache/kafka/pull/9384#discussion_r500574645



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java
##
@@ -131,12 +130,13 @@
 
     // Make sure to complete setting up any mocks (such as TaskManager or AdminClient) before configuring the assignor
     private void configurePartitionAssignorWith(final Map<String, Object> props) {
+        EasyMock.replay(taskManager, adminClient);
+
         final Map<String, Object> configMap = configProps();
         configMap.putAll(props);
 
         streamsConfig = new StreamsConfig(configMap);
         partitionAssignor.configure(configMap);
-        EasyMock.replay(taskManager, adminClient);

Review comment:
   We need to setup the mocks before calling 
`partitionAssignor.configure()` now, as we call `taskManager#adminClient()` in 
this method (similar elsewhere)
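
   In other words (a sketch; the `expect()` wiring is assumed from elsewhere in the test setup):

   ```java
   EasyMock.expect(taskManager.adminClient()).andReturn(adminClient).anyTimes();
   EasyMock.replay(taskManager, adminClient); // must happen before configure()
   partitionAssignor.configure(configMap);    // internally calls taskManager.adminClient()
   ```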





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9384: MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor

2020-10-06 Thread GitBox


mjsax commented on a change in pull request #9384:
URL: https://github.com/apache/kafka/pull/9384#discussion_r500574127



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
##
@@ -291,11 +271,7 @@ public String userEndPoint() {
         }
     }
 
-    public Admin adminClient() {
-        return adminClient;
-    }
-
-    public InternalTopicManager internalTopicManager() {
+    public InternalTopicManager internalTopicManager(final Admin adminClient) {

Review comment:
   Instead of passing the `Admin` as parameter, we could call 
`taskManager#adminClient()` in the next line, however, this requires to make 
the method `public` (it's package-private atm) thus I opted to for 
parameter-passing instead.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax opened a new pull request #9384: MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor

2020-10-06 Thread GitBox


mjsax opened a new pull request #9384:
URL: https://github.com/apache/kafka/pull/9384


   Currently, we pass `AdminClient` and `TaskManager` into 
`StreamsPartitionAssignor` and use `TaskManager#mainConsumer()` to get access 
to the main consumer. However, TM also had a reference to `AdminClient` and 
thus we can simplify the setup by only passing the `TaskManager` reference.
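
   A minimal sketch of the resulting wiring (accessor names per the description above; surrounding code elided):

   ```java
   // The assignor keeps a single TaskManager reference and derives the rest from it.
   final Admin adminClient = taskManager.adminClient();                      // no longer injected separately
   final Consumer<byte[], byte[]> mainConsumer = taskManager.mainConsumer(); // as before
   ```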
   
   Call for review @vvcephei 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] scanterog commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

2020-10-06 Thread GitBox


scanterog commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r500569567



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##
@@ -199,8 +199,8 @@
 
     protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX;
     protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX;
-    protected static final String PRODUCER_CLIENT_PREFIX = "producer.";
-    protected static final String CONSUMER_CLIENT_PREFIX = "consumer.";
+    protected static final String PRODUCER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "producer.";
+    protected static final String CONSUMER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "consumer.";

Review comment:
   🤦 Got it. I misunderstood it. Let me give it a try.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

2020-10-06 Thread GitBox


mimaison commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r500568357



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##
@@ -199,8 +199,8 @@
 
     protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX;
     protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX;
-    protected static final String PRODUCER_CLIENT_PREFIX = "producer.";
-    protected static final String CONSUMER_CLIENT_PREFIX = "consumer.";
+    protected static final String PRODUCER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "producer.";
+    protected static final String CONSUMER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "consumer.";

Review comment:
   I meant add support for that format. We obviously want to keep 
supporting the existing formats





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] scanterog commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

2020-10-06 Thread GitBox


scanterog commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r500566839



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##
@@ -199,8 +199,8 @@
 
     protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX;
     protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX;
-    protected static final String PRODUCER_CLIENT_PREFIX = "producer.";
-    protected static final String CONSUMER_CLIENT_PREFIX = "consumer.";
+    protected static final String PRODUCER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "producer.";
+    protected static final String CONSUMER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "consumer.";

Review comment:
   For users running on a Connect cluster, that format is not supported 
right now:
   ```
   source.producer.some-producer-setting: 123
   ```
   
   The only supported is (unless I'm missing something):
   ```
   "producer.some-producer-setting": 123
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

2020-10-06 Thread GitBox


mimaison commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r500562684



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##
@@ -199,8 +199,8 @@
 
     protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX;
     protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX;
-    protected static final String PRODUCER_CLIENT_PREFIX = "producer.";
-    protected static final String CONSUMER_CLIENT_PREFIX = "consumer.";
+    protected static final String PRODUCER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "producer.";
+    protected static final String CONSUMER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "consumer.";

Review comment:
   I don't think the format mentioned in 
https://github.com/apache/kafka/pull/9313#discussion_r498298987 would break 
compatibility. 
   
   









[GitHub] [kafka] ijuma merged pull request #9336: MINOR: Don't publish javadocs for raft module

2020-10-06 Thread GitBox


ijuma merged pull request #9336:
URL: https://github.com/apache/kafka/pull/9336


   







[GitHub] [kafka] ijuma commented on pull request #9336: MINOR: Don't publish javadocs for raft module

2020-10-06 Thread GitBox


ijuma commented on pull request #9336:
URL: https://github.com/apache/kafka/pull/9336#issuecomment-704521581


   Sounds good.







[GitHub] [kafka] hachikuji commented on pull request #9336: MINOR: Don't publish javadocs for raft module

2020-10-06 Thread GitBox


hachikuji commented on pull request #9336:
URL: https://github.com/apache/kafka/pull/9336#issuecomment-704519950


   @ijuma Thanks, LGTM. I think we can consider exposing an API eventually, but 
for now I'd like to have full flexibility to change it.







[GitHub] [kafka] dajac commented on a change in pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…

2020-10-06 Thread GitBox


dajac commented on a change in pull request #9318:
URL: https://github.com/apache/kafka/pull/9318#discussion_r500544582



##
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##
@@ -1321,25 +1106,15 @@ object GroupMetadataManager {
*/
   def readMessageKey(buffer: ByteBuffer): BaseKey = {
 val version = buffer.getShort
-val keySchema = schemaForKey(version)
-val key = keySchema.read(buffer)
-
-if (version <= CURRENT_OFFSET_KEY_SCHEMA_VERSION) {
+if (version >= OffsetCommitKey.LOWEST_SUPPORTED_VERSION && version <= 
OffsetCommitKey.HIGHEST_SUPPORTED_VERSION) {
   // version 0 and 1 refer to offset
-  val group = key.get(OFFSET_KEY_GROUP_FIELD).asInstanceOf[String]
-  val topic = key.get(OFFSET_KEY_TOPIC_FIELD).asInstanceOf[String]
-  val partition = key.get(OFFSET_KEY_PARTITION_FIELD).asInstanceOf[Int]
-
-  OffsetKey(version, GroupTopicPartition(group, new TopicPartition(topic, 
partition)))
-
-} else if (version == CURRENT_GROUP_KEY_SCHEMA_VERSION) {
+  val key = new OffsetCommitKey(new ByteBufferAccessor(buffer), version)
+  OffsetKey(version, GroupTopicPartition(key.group, new 
TopicPartition(key.topic, key.partition)))
+} else if (version >= GroupMetadataKeyData.LOWEST_SUPPORTED_VERSION && 
version <= GroupMetadataKeyData.HIGHEST_SUPPORTED_VERSION) {
   // version 2 refers to offset

Review comment:
   nit: Not related to your changes, but could you fix this comment? `refers to 
offset` => `refers to group metadata`?

##
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##
@@ -1257,60 +1070,32 @@ object GroupMetadataManager {
  assignment: Map[String, Array[Byte]],
  apiVersion: ApiVersion): Array[Byte] = {
 
-val (version, value) = {
-  if (apiVersion < KAFKA_0_10_1_IV0)
-(0.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V0))
-  else if (apiVersion < KAFKA_2_1_IV0)
-(1.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V1))
-  else if (apiVersion < KAFKA_2_3_IV0)
-(2.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V2))
-  else
-(3.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V3))
-}
-
-value.set(PROTOCOL_TYPE_KEY, groupMetadata.protocolType.getOrElse(""))
-value.set(GENERATION_KEY, groupMetadata.generationId)
-value.set(PROTOCOL_KEY, groupMetadata.protocolName.orNull)
-value.set(LEADER_KEY, groupMetadata.leaderOrNull)
-
-if (version >= 2)
-  value.set(CURRENT_STATE_TIMESTAMP_KEY, 
groupMetadata.currentStateTimestampOrDefault)
-
-val memberArray = groupMetadata.allMemberMetadata.map { memberMetadata =>
-  val memberStruct = value.instance(MEMBERS_KEY)
-  memberStruct.set(MEMBER_ID_KEY, memberMetadata.memberId)
-  memberStruct.set(CLIENT_ID_KEY, memberMetadata.clientId)
-  memberStruct.set(CLIENT_HOST_KEY, memberMetadata.clientHost)
-  memberStruct.set(SESSION_TIMEOUT_KEY, memberMetadata.sessionTimeoutMs)
-
-  if (version > 0)
-memberStruct.set(REBALANCE_TIMEOUT_KEY, 
memberMetadata.rebalanceTimeoutMs)
-
-  if (version >= 3)
-memberStruct.set(GROUP_INSTANCE_ID_KEY, 
memberMetadata.groupInstanceId.orNull)
-
-  // The group is non-empty, so the current protocol must be defined
-  val protocol = groupMetadata.protocolName.orNull
-  if (protocol == null)
-throw new IllegalStateException("Attempted to write non-empty group 
metadata with no defined protocol")
-
-  val metadata = memberMetadata.metadata(protocol)
-  memberStruct.set(SUBSCRIPTION_KEY, ByteBuffer.wrap(metadata))
-
-  val memberAssignment = assignment(memberMetadata.memberId)
-  assert(memberAssignment != null)
-
-  memberStruct.set(ASSIGNMENT_KEY, ByteBuffer.wrap(memberAssignment))
-
-  memberStruct
-}
-
-value.set(MEMBERS_KEY, memberArray.toArray)
-
-val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
-byteBuffer.putShort(version)
-value.writeTo(byteBuffer)
-byteBuffer.array()
+val version =
+  if (apiVersion < KAFKA_0_10_1_IV0) 0.toShort
+  else if (apiVersion < KAFKA_2_1_IV0) 1.toShort
+  else if (apiVersion < KAFKA_2_3_IV0) 2.toShort
+  else 3.toShort
+
+serializeMessage(version, new GroupMetadataValue()
+  .setProtocolType(groupMetadata.protocolType.getOrElse(""))
+  .setGeneration(groupMetadata.generationId)
+  .setProtocol(groupMetadata.protocolName.orNull)
+  .setLeader(groupMetadata.leaderOrNull)
+  .setCurrentStateTimestamp(groupMetadata.currentStateTimestampOrDefault)
+  .setMembers(groupMetadata.allMemberMetadata.map { memberMetadata =>
+new GroupMetadataValue.MemberMetadata()
+  .setMemberId(memberMetadata.memberId)
+  .setClientId(memberMetadata.clientId)
+  .setClientHost(memberMetadata.clientHost)
+  

[GitHub] [kafka] ableegoldman commented on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

2020-10-06 Thread GitBox


ableegoldman commented on pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#issuecomment-704502487


   LGTM!
   
   Looks like the builds just failed with some unrelated flaky tests:
   ```
   Build / JDK 8 / 
org.apache.kafka.connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy
   Build / JDK 8 / kafka.api.TransactionsTest.testBumpTransactionalEpoch
   ```







[jira] [Updated] (KAFKA-10579) Flaky test connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy

2020-10-06 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-10579:

Labels: flaky-test  (was: )

> Flaky test 
> connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy
> 
>
> Key: KAFKA-10579
> URL: https://issues.apache.org/jira/browse/KAFKA-10579
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: flaky-test
>
>  
> java.lang.NullPointerException
>   at 
> java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
>   at org.reflections.Store.getAllIncluding(Store.java:82)
>   at org.reflections.Store.getAll(Store.java:93)
>   at org.reflections.Reflections.getSubTypesOf(Reflections.java:404)
>   at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:355)
>   at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:340)
>   at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268)
>   at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:216)
>   at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:209)
>   at 
> org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:61)
>   at 
> org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91)
>   at 
> org.apache.kafka.connect.util.clusters.WorkerHandle.start(WorkerHandle.java:50)
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.addWorker(EmbeddedConnectCluster.java:167)
>   at 
> org.apache.kafka.connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy(InternalTopicsIntegrationTest.java:260)
> https://github.com/apache/kafka/pull/9280/checks?check_run_id=1214776222





[jira] [Created] (KAFKA-10579) Flaky test connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy

2020-10-06 Thread Sophie Blee-Goldman (Jira)
Sophie Blee-Goldman created KAFKA-10579:
---

 Summary: Flaky test 
connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy
 Key: KAFKA-10579
 URL: https://issues.apache.org/jira/browse/KAFKA-10579
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Sophie Blee-Goldman


 

java.lang.NullPointerException
at 
java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
at org.reflections.Store.getAllIncluding(Store.java:82)
at org.reflections.Store.getAll(Store.java:93)
at org.reflections.Reflections.getSubTypesOf(Reflections.java:404)
at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:355)
at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:340)
at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268)
at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:216)
at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:209)
at 
org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:61)
at 
org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91)
at 
org.apache.kafka.connect.util.clusters.WorkerHandle.start(WorkerHandle.java:50)
at 
org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.addWorker(EmbeddedConnectCluster.java:167)
at 
org.apache.kafka.connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy(InternalTopicsIntegrationTest.java:260)

https://github.com/apache/kafka/pull/9280/checks?check_run_id=1214776222





[GitHub] [kafka] ableegoldman commented on pull request #9373: KAFKA-10564: only process non-empty task directories when internally cleaning obsolete state stores

2020-10-06 Thread GitBox


ableegoldman commented on pull request #9373:
URL: https://github.com/apache/kafka/pull/9373#issuecomment-704493219


   FYI this should be cherry-picked back to 2.6 once merged







[GitHub] [kafka] brbrown25 commented on pull request #9057: KAFKA-10299: Implementing Kafka Connect Hash SMT to allow for hashing…

2020-10-06 Thread GitBox


brbrown25 commented on pull request #9057:
URL: https://github.com/apache/kafka/pull/9057#issuecomment-704492883


   @mimaison I've updated the KIP and put out a vote request, but I'm not 
seeing any action. I've also gone ahead and updated this PR against the latest 
trunk.







[jira] [Commented] (KAFKA-9831) Failing test: EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState[exactly_once_beta]

2020-10-06 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17209038#comment-17209038
 ] 

Sophie Blee-Goldman commented on KAFKA-9831:


[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-9373/3/testReport/junit/org.apache.kafka.streams.integration/EosIntegrationTest/Build___JDK_11___shouldNotViolateEosIfOneTaskFailsWithState_exactly_once_beta_/]
h3. Stacktrace

java.lang.AssertionError: Expected: <[KeyValue(1, 0), KeyValue(1, 1), 
KeyValue(1, 3), KeyValue(1, 6), KeyValue(1, 10), KeyValue(1, 15), KeyValue(1, 
21), KeyValue(1, 28), KeyValue(1, 36), KeyValue(1, 45)]> but: was <[KeyValue(1, 
0), KeyValue(1, 1), KeyValue(1, 3), KeyValue(1, 6), KeyValue(1, 10), 
KeyValue(1, 15), KeyValue(1, 21), KeyValue(1, 28), KeyValue(1, 36), KeyValue(1, 
45), KeyValue(1, 55)]> at 
org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at 
org.apache.kafka.streams.integration.EosIntegrationTest.checkResultPerKey(EosIntegrationTest.java:281)
 at 
org.apache.kafka.streams.integration.EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState(EosIntegrationTest.java:480)

> Failing test: 
> EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState[exactly_once_beta]
> --
>
> Key: KAFKA-9831
> URL: https://issues.apache.org/jira/browse/KAFKA-9831
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: John Roesler
>Assignee: Matthias J. Sax
>Priority: Major
> Attachments: one.stdout.txt, two.stdout.txt
>
>
> I've seen this fail twice in a row on the same build, but with different 
> errors. Stacktraces follow; stdout is attached.
> One:
> {noformat}
> java.lang.AssertionError: Did not receive all 40 records from topic 
> singlePartitionOutputTopic within 6 ms
> Expected: is a value equal to or greater than <40>
>  but: <39> was less than <40>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:517)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:415)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:383)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:513)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:491)
>   at 
> org.apache.kafka.streams.integration.EosIntegrationTest.readResult(EosIntegrationTest.java:766)
>   at 
> org.apache.kafka.streams.integration.EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState(EosIntegrationTest.java:473)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at org.junit.runners.Suite.runChild(Suite.java:128)
>   at org.junit.runners.Suite.runChild(Suite.java:27)
>   at 

[GitHub] [kafka] guozhangwang commented on a change in pull request #8988: KAFKA-10199: Separate restore threads

2020-10-06 Thread GitBox


guozhangwang commented on a change in pull request #8988:
URL: https://github.com/apache/kafka/pull/8988#discussion_r500521516



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##
@@ -65,10 +65,6 @@ public ProcessorContextImpl(final TaskId id,
 
 @Override
 public void transitionToActive(final StreamTask streamTask, final 
RecordCollector recordCollector, final ThreadCache newCache) {
-if (stateManager.taskType() != TaskType.ACTIVE) {

Review comment:
   Actually I realized that we call this function sometimes (e.g. upon 
initialization) even when the task is already active, so we cannot just reject 
and fail if it was not in standby.









[GitHub] [kafka] wcarlson5 commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler

2020-10-06 Thread GitBox


wcarlson5 commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r500521571



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -384,9 +390,8 @@ public static StreamThread create(final 
InternalTopologyBuilder builder,
 builder,
 threadId,
 logContext,
-assignmentErrorCode,

Review comment:
   this diff is a bit off. The assignmentErrorCode was added in this PR. I 
didn't change the order of the params for no reason...









[GitHub] [kafka] guozhangwang commented on pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

2020-10-06 Thread GitBox


guozhangwang commented on pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#issuecomment-704474872


   test this







[GitHub] [kafka] guozhangwang closed pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

2020-10-06 Thread GitBox


guozhangwang closed pull request #9020:
URL: https://github.com/apache/kafka/pull/9020


   







[GitHub] [kafka] guozhangwang commented on pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

2020-10-06 Thread GitBox


guozhangwang commented on pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#issuecomment-704462953


   test this







[GitHub] [kafka] guozhangwang merged pull request #9321: KAFKA-9929: fix: add missing default implementations

2020-10-06 Thread GitBox


guozhangwang merged pull request #9321:
URL: https://github.com/apache/kafka/pull/9321


   







[GitHub] [kafka] guozhangwang commented on pull request #9321: KAFKA-9929: fix: add missing default implementations

2020-10-06 Thread GitBox


guozhangwang commented on pull request #9321:
URL: https://github.com/apache/kafka/pull/9321#issuecomment-704461565


   LGTM!







[GitHub] [kafka] guozhangwang commented on pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

2020-10-06 Thread GitBox


guozhangwang commented on pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#issuecomment-704460681


   test this please







[GitHub] [kafka] rhauch merged pull request #8910: KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop

2020-10-06 Thread GitBox


rhauch merged pull request #8910:
URL: https://github.com/apache/kafka/pull/8910


   







[GitHub] [kafka] guozhangwang commented on pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

2020-10-06 Thread GitBox


guozhangwang commented on pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#issuecomment-704460234


   test this please







[jira] [Resolved] (KAFKA-10338) Support PEM format for SSL certificates and private key

2020-10-06 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-10338.

Fix Version/s: 2.7.0
 Reviewer: Manikumar
   Resolution: Fixed

> Support PEM format for SSL certificates and private key
> ---
>
> Key: KAFKA-10338
> URL: https://issues.apache.org/jira/browse/KAFKA-10338
> Project: Kafka
>  Issue Type: New Feature
>  Components: security
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.7.0
>
>
> We currently support only file-based JKS/PKCS12 formats for SSL key stores and 
> trust stores. It would be good to add support for PEM as configuration values, 
> which fits better with config externalization.
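
For context, a minimal sketch of what PEM-based configuration could look like 
with this feature, assuming the `ssl.keystore.key`, 
`ssl.keystore.certificate.chain` and `ssl.truststore.certificates` settings 
described in KIP-651; the PEM contents are placeholders:

```
import java.util.Properties;

public class PemConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Inline PEM values instead of file-based JKS/PKCS12 stores, which
        // fits config externalization: no key store files to distribute.
        props.put("ssl.keystore.type", "PEM");
        props.put("ssl.keystore.key",
            "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----");
        props.put("ssl.keystore.certificate.chain",
            "-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----");
        props.put("ssl.truststore.type", "PEM");
        props.put("ssl.truststore.certificates",
            "-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----");
        // These properties would then be passed to a client or broker config.
        System.out.println(props.stringPropertyNames());
    }
}
```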





[GitHub] [kafka] rajinisivaram merged pull request #9345: KAFKA-10338; Support PEM format for SSL key and trust stores (KIP-651)

2020-10-06 Thread GitBox


rajinisivaram merged pull request #9345:
URL: https://github.com/apache/kafka/pull/9345


   







[GitHub] [kafka] rajinisivaram commented on pull request #9345: KAFKA-10338; Support PEM format for SSL key and trust stores (KIP-651)

2020-10-06 Thread GitBox


rajinisivaram commented on pull request #9345:
URL: https://github.com/apache/kafka/pull/9345#issuecomment-704456290


   @omkreddy Thanks for the review, merging to trunk.







[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2020-10-06 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17209006#comment-17209006
 ] 

Sophie Blee-Goldman commented on KAFKA-5998:


Yeah, I think both of those (running with a shared state directory and using 
/tmp) can each lead to this exception. For testing on your local machine you 
should just configure each of the instances to use a different state dir 
(preferably something other than /tmp, but that is up to you). See [the 
docs|https://kafka.apache.org/26/documentation/streams/developer-guide/config-streams.html#state-dir]
 for this config.
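
For example, a minimal sketch of pointing each local instance at its own state 
directory (the paths and application id here are placeholders):

```
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StateDirSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Give each instance on the same machine its own directory, instead
        // of the shared /tmp default that can trigger this exception.
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams/instance-1");
    }
}
```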

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Assignee: John Roesler
>Priority: Critical
> Fix For: 2.2.2, 2.4.0, 2.3.1
>
> Attachments: 5998.v1.txt, 5998.v2.txt, Kafka5998.zip, Topology.txt, 
> exc.txt, props.txt, streams.txt
>
>
> I have one kafka broker and one kafka stream running... I am running its 
> since two days under load of around 2500 msgs per second.. On third day am 
> getting below exception for some of the partitions, I have 16 partitions only 
> 0_0 and 0_1 gives this error
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> 

[jira] [Commented] (KAFKA-10520) InitProducerId may be blocked if least loaded node is not ready to send

2020-10-06 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17208995#comment-17208995
 ] 

Sophie Blee-Goldman commented on KAFKA-10520:
-

Hey [~rsivaram], is this something we can get fixed for 2.7? I'm just asking 
because the freeze deadlines are approaching, and this seems like it might be a 
simple fix for a pretty much fatal error (although workarounds do exist).

> InitProducerId may be blocked if least loaded node is not ready to send
> ---
>
> Key: KAFKA-10520
> URL: https://issues.apache.org/jira/browse/KAFKA-10520
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Rajini Sivaram
>Priority: Major
> Fix For: 2.7.0
>
>
> From the logs of a failing producer that shows InitProducerId timing out 
> after request timeout, it looks like we don't poll while waiting for 
> transactional producer to be initialized and FindCoordinator request cannot 
> be sent. The producer configuration used one bootstrap server and 
> `max.in.flight.requests.per.connection=1`. The failing sequence:
>  # Producer sends MetadataRequest to least loaded node (bootstrap server)
>  # Producer is ready to send InitProducerId, needs to find transaction 
> coordinator
>  # Producer creates FindCoordinator request, but the only node known is the 
> bootstrap server. Producer cannot send to this node since there is already 
> the Metadata request in flight and max.inflight is 1.
>  # Producer waits without polling, so Metadata response is not processed. 
> InitProducerId times out eventually.
>   
>  
>  We need to update the condition used to determine whether Sender should 
> poll() to fix this issue.
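
As a minimal sketch, the configuration described in the failing sequence would 
look roughly like this (the broker address and transactional id are 
placeholders):

```
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class InitProducerIdRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // one bootstrap server
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-tx-id");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // With max.in.flight=1, the FindCoordinator request can get stuck
            // behind the in-flight Metadata request and this call may time out.
            producer.initTransactions();
        }
    }
}
```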





[GitHub] [kafka] piotrrzysko commented on pull request #9371: KAFKA-10510: Validate replication factor consistency on reassignment

2020-10-06 Thread GitBox


piotrrzysko commented on pull request #9371:
URL: https://github.com/apache/kafka/pull/9371#issuecomment-704430920


   Thanks for the review, @stanislavkozlovski. If I understand your point 
correctly, you are wondering whether this validation will block changing the 
replication factor for all partitions (the replication factor of a topic). 
Right? It will not, as long as all requested reassignments in the batch have 
the same number of target replicas for a given topic. Please take a look at 
the test 
`ReassignPartitionsIntegrationTest::testReassignmentFailOnInconsistentReplicationFactorBetweenPartitions`
 - topic `bar`. It is a simple case because the topic has only one partition, 
but it shows that changing the replication factor from 3 to 4 is possible. 







[jira] [Resolved] (KAFKA-10527) Voters should always initialize as followers

2020-10-06 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-10527.
-
Resolution: Fixed

> Voters should always initialize as followers
> 
>
> Key: KAFKA-10527
> URL: https://issues.apache.org/jira/browse/KAFKA-10527
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> The current state initialization logic preserves whatever state the broker 
> was in when it was shutdown. In particular, if the node was previously a 
> leader, it will remain a leader. This can be dangerous if we want to consider 
> optimizations such as in KAFKA-10526 since the leader might lose unflushed 
> data following the restart. It would be safer to always initialize as a 
> follower so that a leader's tenure never crosses process restarts. This helps 
> to guarantee the uniqueness of the (offset, epoch) tuple which the 
> replication protocol depends on.





[GitHub] [kafka] rondagostino commented on a change in pull request #9378: MINOR: ACLs for secured cluster system tests

2020-10-06 Thread GitBox


rondagostino commented on a change in pull request #9378:
URL: https://github.com/apache/kafka/pull/9378#discussion_r500465547



##
File path: tests/kafkatest/services/kafka/kafka.py
##
@@ -451,7 +451,8 @@ def _kafka_topics_cmd(self, node, force_use_zk_connection):
 set. If Admin client is not going to be used, don't set the 
environment variable.
 """
 kafka_topic_script = self.path.script("kafka-topics.sh", node)
-skip_security_settings = force_use_zk_connection or not 
self.all_nodes_topic_command_supports_bootstrap_server()
+skip_security_settings = force_use_zk_connection or not 
self.all_nodes_topic_command_supports_bootstrap_server() \
+ or self.interbroker_security_protocol == 
SecurityConfig.PLAINTEXT

Review comment:
   Yes, good point, I think it would be helpful to consolidate the 
assumptions into one method (or a couple or a few, at most).  I'll see what I 
can do about refactoring those assumptions out and reusing the logic instead of 
having it sprinkled around so much as it is now.









[GitHub] [kafka] scanterog commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

2020-10-06 Thread GitBox


scanterog commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r500464804



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##
@@ -199,8 +199,8 @@
 
 protected static final String SOURCE_CLUSTER_PREFIX = 
MirrorMakerConfig.SOURCE_CLUSTER_PREFIX;
 protected static final String TARGET_CLUSTER_PREFIX = 
MirrorMakerConfig.TARGET_CLUSTER_PREFIX;
-protected static final String PRODUCER_CLIENT_PREFIX = "producer.";
-protected static final String CONSUMER_CLIENT_PREFIX = "consumer.";
+protected static final String PRODUCER_CLIENT_PREFIX = 
SOURCE_CLUSTER_PREFIX + "producer.";
+protected static final String CONSUMER_CLIENT_PREFIX = 
SOURCE_CLUSTER_PREFIX + "consumer.";

Review comment:
   @mimaison just a friendly ping.









[GitHub] [kafka] rondagostino commented on a change in pull request #9378: MINOR: ACLs for secured cluster system tests

2020-10-06 Thread GitBox


rondagostino commented on a change in pull request #9378:
URL: https://github.com/apache/kafka/pull/9378#discussion_r500464258



##
File path: tests/kafkatest/services/kafka/kafka.py
##
@@ -983,7 +988,10 @@ def _topic_command_connect_setting(self, node, 
force_use_zk_connection):
 bootstrap server, otherwise returns zookeeper connection string.
 """
 if not force_use_zk_connection and 
self.all_nodes_topic_command_supports_bootstrap_server():
-connection_setting = "--bootstrap-server %s" % 
(self.bootstrap_servers(self.security_protocol))
+if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT:
+connection_setting = "--bootstrap-server %s" % 
(self.bootstrap_servers(self.interbroker_security_protocol))
+else:
+connection_setting = "--bootstrap-server %s" % 
(self.bootstrap_servers(self.security_protocol))

Review comment:
   This code is identifying the port to contact when we are using 
`--bootstrap-server` instead of `--zookeeper`, and we use the port associated 
with `PLAINTEXT` rather than `SASL_{PLAINTEXT,SSL}` or `SSL` if `PLAINTEXT` is 
in use.  The assumption is that if `PLAINTEXT` is in use it will be the 
inter-broker security protocol rather than (or in addition to) the client 
security protocol, which is the case.









[GitHub] [kafka] hachikuji merged pull request #9348: KAFKA-10527; Voters should not reinitialize as leader in same epoch

2020-10-06 Thread GitBox


hachikuji merged pull request #9348:
URL: https://github.com/apache/kafka/pull/9348


   







[GitHub] [kafka] rondagostino commented on a change in pull request #9378: MINOR: ACLs for secured cluster system tests

2020-10-06 Thread GitBox


rondagostino commented on a change in pull request #9378:
URL: https://github.com/apache/kafka/pull/9378#discussion_r500460487



##
File path: tests/kafkatest/services/security/kafka_acls.py
##
@@ -93,11 +97,13 @@ def add_cluster_acl(self, kafka, principal, 
force_use_zk_connection=False):
 
 force_use_zk_connection = force_use_zk_connection or not 
kafka.all_nodes_acl_command_supports_bootstrap_server()
 
-cmd = "%(cmd_prefix)s --add --cluster --operation=ClusterAction 
--allow-principal=%(principal)s" % {
-'cmd_prefix': self._acl_cmd_prefix(kafka, node, 
force_use_zk_connection),
-'principal': principal
-}
-kafka.run_cli_tool(node, cmd)
+for operation in ['ClusterAction', 'Alter', 'Create']:

Review comment:
   Yes, Alter is needed to create user SCRAM credentials, and Create is 
needed to create topics.  When we start up a cluster we create the 
`__consumer_offsets` topic and a `test_topic` (typically).  If the test is 
using SCRAM we also create the SCRAM credentials at this point.  We now use 
`--bootstrap-server` instead of `--zookeeper` for these CLI operations, and 
without these ACLs a system test will not be able to perform these necessary 
actions if security is enabled.
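
   For reference, a rough Java sketch of granting equivalent cluster ACLs 
through the AdminClient API (the principal and bootstrap address are 
placeholders, not what the system tests actually use):

   ```
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Properties;
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AdminClientConfig;
   import org.apache.kafka.common.acl.AccessControlEntry;
   import org.apache.kafka.common.acl.AclBinding;
   import org.apache.kafka.common.acl.AclOperation;
   import org.apache.kafka.common.acl.AclPermissionType;
   import org.apache.kafka.common.resource.PatternType;
   import org.apache.kafka.common.resource.ResourcePattern;
   import org.apache.kafka.common.resource.ResourceType;

   public class ClusterAclSketch {
       public static void main(String[] args) throws Exception {
           Properties props = new Properties();
           props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
           try (Admin admin = Admin.create(props)) {
               List<AclBinding> bindings = new ArrayList<>();
               // ClusterAction for inter-broker operations, Alter for SCRAM
               // credentials, Create for topic creation during test setup.
               for (AclOperation op : new AclOperation[]{
                       AclOperation.CLUSTER_ACTION, AclOperation.ALTER, AclOperation.CREATE}) {
                   bindings.add(new AclBinding(
                       new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL),
                       new AccessControlEntry("User:kafka", "*", op, AclPermissionType.ALLOW)));
               }
               admin.createAcls(bindings).all().get();
           }
       }
   }
   ```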









[GitHub] [kafka] lct45 commented on pull request #9383: KAFKA-10455: Ensure that probing rebalances always occur

2020-10-06 Thread GitBox


lct45 commented on pull request #9383:
URL: https://github.com/apache/kafka/pull/9383#issuecomment-704410260


   @ableegoldman ready for initial review







[GitHub] [kafka] rajinisivaram commented on a change in pull request #9378: MINOR: ACLs for secured cluster system tests

2020-10-06 Thread GitBox


rajinisivaram commented on a change in pull request #9378:
URL: https://github.com/apache/kafka/pull/9378#discussion_r500429377



##
File path: tests/kafkatest/services/kafka/kafka.py
##
@@ -451,7 +451,8 @@ def _kafka_topics_cmd(self, node, force_use_zk_connection):
 set. If Admin client is not going to be used, don't set the 
environment variable.
 """
 kafka_topic_script = self.path.script("kafka-topics.sh", node)
-skip_security_settings = force_use_zk_connection or not 
self.all_nodes_topic_command_supports_bootstrap_server()
+skip_security_settings = force_use_zk_connection or not 
self.all_nodes_topic_command_supports_bootstrap_server() \
+ or self.interbroker_security_protocol == 
SecurityConfig.PLAINTEXT

Review comment:
   should we add a method to KafkaService or SecurityConfig that tells you 
which listener to use for tools? Using inter-broker here looks a bit odd, even 
though it becomes clear when you look at the other changes.

##
File path: tests/kafkatest/services/kafka/kafka.py
##
@@ -983,7 +988,10 @@ def _topic_command_connect_setting(self, node, 
force_use_zk_connection):
 bootstrap server, otherwise returns zookeeper connection string.
 """
 if not force_use_zk_connection and 
self.all_nodes_topic_command_supports_bootstrap_server():
-connection_setting = "--bootstrap-server %s" % 
(self.bootstrap_servers(self.security_protocol))
+if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT:
+connection_setting = "--bootstrap-server %s" % 
(self.bootstrap_servers(self.interbroker_security_protocol))
+else:
+connection_setting = "--bootstrap-server %s" % 
(self.bootstrap_servers(self.security_protocol))

Review comment:
   Is this assuming that the `else` section always means PLAINTEXT, since there 
are no security configs here?

##
File path: tests/kafkatest/services/security/kafka_acls.py
##
@@ -93,11 +97,13 @@ def add_cluster_acl(self, kafka, principal, 
force_use_zk_connection=False):
 
 force_use_zk_connection = force_use_zk_connection or not 
kafka.all_nodes_acl_command_supports_bootstrap_server()
 
-cmd = "%(cmd_prefix)s --add --cluster --operation=ClusterAction 
--allow-principal=%(principal)s" % {
-'cmd_prefix': self._acl_cmd_prefix(kafka, node, 
force_use_zk_connection),
-'principal': principal
-}
-kafka.run_cli_tool(node, cmd)
+for operation in ['ClusterAction', 'Alter', 'Create']:

Review comment:
   We are adding more ACLs than we had before. Are they necessary? It is 
better to limit ACLs to those necessary for the tests.









[GitHub] [kafka] junrao commented on pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-06 Thread GitBox


junrao commented on pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#issuecomment-704393633


   @abbccdda: Any more comments from you?







[GitHub] [kafka] guozhangwang commented on pull request #9358: MINOR: Refactor unit tests around RocksDBConfigSetter

2020-10-06 Thread GitBox


guozhangwang commented on pull request #9358:
URL: https://github.com/apache/kafka/pull/9358#issuecomment-704386882


   Thanks @abbccdda for the reviews!







[GitHub] [kafka] guozhangwang merged pull request #9358: MINOR: Refactor unit tests around RocksDBConfigSetter

2020-10-06 Thread GitBox


guozhangwang merged pull request #9358:
URL: https://github.com/apache/kafka/pull/9358


   







[jira] [Updated] (KAFKA-10578) KIP-675: Convert KTable to a KStream using the previous value

2020-10-06 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-10578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Javier Freire Riobó updated KAFKA-10578:

Issue Type: Improvement  (was: Wish)

> KIP-675: Convert KTable to a KStream using the previous value
> -
>
> Key: KAFKA-10578
> URL: https://issues.apache.org/jira/browse/KAFKA-10578
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Javier Freire Riobó
>Priority: Minor
>
> Imagine that we have an entity for which we want to emit the difference 
> between the current and the previous state. The simplest case would be that 
> the entity is an integer and we want to emit the difference between the 
> current and previous values.
> For example, for the input stream 4, 6, 3 the output 4 (4 - 0), 2 (6 - 4), -3 
> (3 - 6) is expected.
> The way to achieve this with Kafka Streams today would be through an 
> aggregate (see the sketch after this description).
> The main problem, apart from needing more code, is that if the same event is 
> received twice at the same time and the commit time is not 0, the difference 
> is deleted and nothing is emitted.
>  
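
A minimal sketch of a workaround available today, using a state store with 
`transformValues` instead of the aggregate mentioned in the description; this 
is not part of the proposed KIP, and the topic names are placeholders:

```
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class DeltaSketch {
    @SuppressWarnings("unchecked")
    public static StreamsBuilder topology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.addStateStore(Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("prev-values"),
            Serdes.String(), Serdes.Integer()));

        builder.stream("numbers", Consumed.with(Serdes.String(), Serdes.Integer()))
            .transformValues(() -> new ValueTransformerWithKey<String, Integer, Integer>() {
                private KeyValueStore<String, Integer> store;

                @Override
                public void init(final ProcessorContext context) {
                    store = (KeyValueStore<String, Integer>) context.getStateStore("prev-values");
                }

                @Override
                public Integer transform(final String key, final Integer value) {
                    final Integer previous = store.get(key);
                    store.put(key, value);
                    // for input 4, 6, 3 this emits 4, 2, -3 as described above
                    return value - (previous == null ? 0 : previous);
                }

                @Override
                public void close() { }
            }, "prev-values")
            .to("deltas", Produced.with(Serdes.String(), Serdes.Integer()));
        return builder;
    }
}
```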





[GitHub] [kafka] lct45 opened a new pull request #9383: KAFKA-10455: Ensure that probing rebalances always occur

2020-10-06 Thread GitBox


lct45 opened a new pull request #9383:
URL: https://github.com/apache/kafka/pull/9383


   Add data to subscriptionUserData to make sure that it's different each time 
a consumer rejoins.
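
   A minimal illustration of the idea (a hypothetical helper, not the actual 
patch): encode a value that changes on every rejoin into the subscription 
userData, so consecutive subscriptions never serialize identically:

   ```
   import java.nio.ByteBuffer;

   public class UniqueUserDataSketch {
       // Hypothetical helper: embed a strictly increasing timestamp so the
       // encoded subscription differs on each rejoin, ensuring the follow-up
       // probing rebalance is actually triggered.
       static ByteBuffer encodeUserData(long uniqueField) {
           ByteBuffer buf = ByteBuffer.allocate(Long.BYTES);
           buf.putLong(uniqueField);
           buf.flip();
           return buf;
       }

       public static void main(String[] args) {
           System.out.println(encodeUserData(System.currentTimeMillis()).remaining()); // 8
       }
   }
   ```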
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] ning2008wisc commented on pull request #9224: KAFKA-10304: refactor MM2 integration tests

2020-10-06 Thread GitBox


ning2008wisc commented on pull request #9224:
URL: https://github.com/apache/kafka/pull/9224#issuecomment-704353923


   @mimaison I think this PR is ready for an initial review. Please take your 
time when available. Thanks!







[jira] [Updated] (KAFKA-6733) Support of printing additional ConsumerRecord fields in DefaultMessageFormatter

2020-10-06 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-6733:
---
Fix Version/s: 2.7.0

> Support of printing additional ConsumerRecord fields in 
> DefaultMessageFormatter
> ---
>
> Key: KAFKA-6733
> URL: https://issues.apache.org/jira/browse/KAFKA-6733
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Affects Versions: 2.7.0
>Reporter: Mateusz Zakarczemny
>Assignee: Badai Aqrandista
>Priority: Minor
> Fix For: 2.7.0
>
>
> It would be useful to have the possibility of printing headers, partition, 
> and offset in ConsoleConsumer. In particular, support for headers seems to be 
> missing.





[jira] [Updated] (KAFKA-6733) Support of printing additional ConsumerRecord fields in DefaultMessageFormatter

2020-10-06 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-6733:
---
Affects Version/s: 2.7.0

> Support of printing additional ConsumerRecord fields in 
> DefaultMessageFormatter
> ---
>
> Key: KAFKA-6733
> URL: https://issues.apache.org/jira/browse/KAFKA-6733
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Affects Versions: 2.7.0
>Reporter: Mateusz Zakarczemny
>Assignee: Badai Aqrandista
>Priority: Minor
>
> It would be useful to have the possibility of printing headers, partition, 
> and offset in ConsoleConsumer. In particular, support for headers seems to be 
> missing.





[GitHub] [kafka] bbejeck commented on pull request #9237: KAFKA-10454 / Update copartitionSourceGroups when optimization algorithm is triggered

2020-10-06 Thread GitBox


bbejeck commented on pull request #9237:
URL: https://github.com/apache/kafka/pull/9237#issuecomment-704347684


   Java 8 failed with 
`kafka.network.ConnectionQuotasTest.testNoConnectionLimitsByDefault`
   Java 11 failed with 
`org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta`
   
   Java 15 passed







[GitHub] [kafka] coded9 commented on pull request #8575: KAFKA-8713 KIP-581 Add accept.optional.null to solve optional null

2020-10-06 Thread GitBox


coded9 commented on pull request #8575:
URL: https://github.com/apache/kafka/pull/8575#issuecomment-704328653


   Any update or possible workaround? @rhauch 







[GitHub] [kafka] chia7712 commented on a change in pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…

2020-10-06 Thread GitBox


chia7712 commented on a change in pull request #9318:
URL: https://github.com/apache/kafka/pull/9318#discussion_r500361487



##
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##
@@ -997,174 +996,7 @@ object GroupMetadataManager {
   val MetricsGroup: String = "group-coordinator-metrics"
   val LoadTimeSensor: String = "GroupPartitionLoadTime"
 
-  private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = 1.toShort
-  private val CURRENT_GROUP_KEY_SCHEMA_VERSION = 2.toShort
-
-  private val OFFSET_COMMIT_KEY_SCHEMA = new Schema(new Field("group", STRING),
-    new Field("topic", STRING),
-    new Field("partition", INT32))
-  private val OFFSET_KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("group")
-  private val OFFSET_KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("topic")
-  private val OFFSET_KEY_PARTITION_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("partition")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", INT64),
-    new Field("metadata", STRING, "Associated metadata.", ""),
-    new Field("timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata")
-  private val OFFSET_VALUE_TIMESTAMP_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", INT64),
-    new Field("metadata", STRING, "Associated metadata.", ""),
-    new Field("commit_timestamp", INT64),
-    new Field("expire_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp")
-  private val OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V2 = new Schema(new Field("offset", INT64),
-    new Field("metadata", STRING, "Associated metadata.", ""),
-    new Field("commit_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema(
-    new Field("offset", INT64),
-    new Field("leader_epoch", INT32),
-    new Field("metadata", STRING, "Associated metadata.", ""),
-    new Field("commit_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset")
-  private val OFFSET_VALUE_LEADER_EPOCH_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch")
-  private val OFFSET_VALUE_METADATA_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp")
-
-  private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", STRING))
-  private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group")
-
-  private val MEMBER_ID_KEY = "member_id"
-  private val GROUP_INSTANCE_ID_KEY = "group_instance_id"
-  private val CLIENT_ID_KEY = "client_id"
-  private val CLIENT_HOST_KEY = "client_host"
-  private val REBALANCE_TIMEOUT_KEY = "rebalance_timeout"
-  private val SESSION_TIMEOUT_KEY = "session_timeout"
-  private val SUBSCRIPTION_KEY = "subscription"
-  private val ASSIGNMENT_KEY = "assignment"
-
-  private val MEMBER_METADATA_V0 = new Schema(
-    new Field(MEMBER_ID_KEY, STRING),
-    new Field(CLIENT_ID_KEY, STRING),
-    new Field(CLIENT_HOST_KEY, STRING),
-    new Field(SESSION_TIMEOUT_KEY, INT32),
-    new Field(SUBSCRIPTION_KEY, BYTES),
-    new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val MEMBER_METADATA_V1 = new Schema(
-    new Field(MEMBER_ID_KEY, STRING),
-    new Field(CLIENT_ID_KEY, STRING),
-    new Field(CLIENT_HOST_KEY, STRING),
-    new Field(REBALANCE_TIMEOUT_KEY, INT32),
-    new Field(SESSION_TIMEOUT_KEY, INT32),
-    new Field(SUBSCRIPTION_KEY, BYTES),
-    new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val MEMBER_METADATA_V2 = MEMBER_METADATA_V1
-
-  private val MEMBER_METADATA_V3 = new Schema(
-    new Field(MEMBER_ID_KEY, STRING),
-    new Field(GROUP_INSTANCE_ID_KEY, NULLABLE_STRING),
-    new Field(CLIENT_ID_KEY, STRING),
-    new Field(CLIENT_HOST_KEY, STRING),
-    new Field(REBALANCE_TIMEOUT_KEY, INT32),
-    new Field(SESSION_TIMEOUT_KEY, INT32),
-    new Field(SUBSCRIPTION_KEY, BYTES),
-    new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val PROTOCOL_TYPE_KEY = "protocol_type"
-  private val GENERATION_KEY = "generation"
-  private val PROTOCOL_KEY = "protocol"
-  private val LEADER_KEY = "leader"
-

[GitHub] [kafka] rajinisivaram commented on pull request #9345: KAFKA-10338; Support PEM format for SSL key and trust stores (KIP-651)

2020-10-06 Thread GitBox


rajinisivaram commented on pull request #9345:
URL: https://github.com/apache/kafka/pull/9345#issuecomment-704327619


   @omkreddy Thanks for the review, have addressed the comments.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #9345: KAFKA-10338; Support PEM format for SSL key and trust stores (KIP-651)

2020-10-06 Thread GitBox


rajinisivaram commented on a change in pull request #9345:
URL: https://github.com/apache/kafka/pull/9345#discussion_r500360036



##
File path: clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
##
@@ -167,17 +181,12 @@ public void testValidEndpointIdentificationSanIp() throws Exception {
     @Test
     public void testValidEndpointIdentificationCN() throws Exception {
         String node = "0";

Review comment:
   fixed these and a few other warnings





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #9345: KAFKA-10338; Support PEM format for SSL key and trust stores (KIP-651)

2020-10-06 Thread GitBox


rajinisivaram commented on a change in pull request #9345:
URL: https://github.com/apache/kafka/pull/9345#discussion_r500359056



##
File path: clients/src/test/java/org/apache/kafka/common/network/CertStores.java
##
@@ -54,13 +60,30 @@ public CertStores(boolean server, String commonName, InetAddress hostAddress) th
 }
 
     private CertStores(boolean server, String commonName, TestSslUtils.CertificateBuilder certBuilder) throws Exception {
+        this(server, commonName, "RSA", certBuilder, false);
+    }
+
+    private CertStores(boolean server, String commonName, String keyAlgorithm, TestSslUtils.CertificateBuilder certBuilder, boolean usePem) throws Exception {
         String name = server ? "server" : "client";
         Mode mode = server ? Mode.SERVER : Mode.CLIENT;
-        File truststoreFile = File.createTempFile(name + "TS", ".jks");
-        sslConfig = TestSslUtils.createSslConfig(!server, true, mode, truststoreFile, name, commonName, certBuilder);
+        File truststoreFile = usePem ? null : File.createTempFile(name + "TS", ".jks");
+        sslConfig = new SslConfigsBuilder(mode)
+            .useClientCert(!server)
+            .certAlias(name)
+            .cn(commonName)
+            .createNewTrustStore(truststoreFile)
+            .certBuilder(certBuilder)
+            .algorithm(keyAlgorithm)
+            .usePem(usePem)
+            .build();
     }
 
+
     public Map<String, Object> getTrustingConfig(CertStores truststoreConfig) {
+        return getTrustingConfig(truststoreConfig, false);
+    }
+
+    public Map<String, Object> getTrustingConfig(CertStores truststoreConfig, boolean usePemCerts) {

Review comment:
   removed





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

2020-10-06 Thread GitBox


bbejeck commented on pull request #9099:
URL: https://github.com/apache/kafka/pull/9099#issuecomment-704322766


   Thanks for the contribution @badaiaqrandista!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

2020-10-06 Thread GitBox


bbejeck commented on pull request #9099:
URL: https://github.com/apache/kafka/pull/9099#issuecomment-704322552


   Merged #9099 into trunk



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck merged pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

2020-10-06 Thread GitBox


bbejeck merged pull request #9099:
URL: https://github.com/apache/kafka/pull/9099


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…

2020-10-06 Thread GitBox


chia7712 commented on a change in pull request #9318:
URL: https://github.com/apache/kafka/pull/9318#discussion_r500352497



##
File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##
@@ -1257,60 +1059,31 @@ object GroupMetadataManager {
                          assignment: Map[String, Array[Byte]],
                          apiVersion: ApiVersion): Array[Byte] = {
 
-    val (version, value) = {
-      if (apiVersion < KAFKA_0_10_1_IV0)
-        (0.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V0))
-      else if (apiVersion < KAFKA_2_1_IV0)
-        (1.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V1))
-      else if (apiVersion < KAFKA_2_3_IV0)
-        (2.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V2))
-      else
-        (3.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V3))
-    }
-
-    value.set(PROTOCOL_TYPE_KEY, groupMetadata.protocolType.getOrElse(""))
-    value.set(GENERATION_KEY, groupMetadata.generationId)
-    value.set(PROTOCOL_KEY, groupMetadata.protocolName.orNull)
-    value.set(LEADER_KEY, groupMetadata.leaderOrNull)
-
-    if (version >= 2)
-      value.set(CURRENT_STATE_TIMESTAMP_KEY, groupMetadata.currentStateTimestampOrDefault)
-
-    val memberArray = groupMetadata.allMemberMetadata.map { memberMetadata =>
-      val memberStruct = value.instance(MEMBERS_KEY)
-      memberStruct.set(MEMBER_ID_KEY, memberMetadata.memberId)
-      memberStruct.set(CLIENT_ID_KEY, memberMetadata.clientId)
-      memberStruct.set(CLIENT_HOST_KEY, memberMetadata.clientHost)
-      memberStruct.set(SESSION_TIMEOUT_KEY, memberMetadata.sessionTimeoutMs)
-
-      if (version > 0)
-        memberStruct.set(REBALANCE_TIMEOUT_KEY, memberMetadata.rebalanceTimeoutMs)
-
-      if (version >= 3)
-        memberStruct.set(GROUP_INSTANCE_ID_KEY, memberMetadata.groupInstanceId.orNull)
-
-      // The group is non-empty, so the current protocol must be defined
-      val protocol = groupMetadata.protocolName.orNull
-      if (protocol == null)
-        throw new IllegalStateException("Attempted to write non-empty group metadata with no defined protocol")
-
-      val metadata = memberMetadata.metadata(protocol)
-      memberStruct.set(SUBSCRIPTION_KEY, ByteBuffer.wrap(metadata))
-
-      val memberAssignment = assignment(memberMetadata.memberId)
-      assert(memberAssignment != null)
-
-      memberStruct.set(ASSIGNMENT_KEY, ByteBuffer.wrap(memberAssignment))
-
-      memberStruct
-    }
-
-    value.set(MEMBERS_KEY, memberArray.toArray)
-
-    val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
-    byteBuffer.putShort(version)
-    value.writeTo(byteBuffer)
-    byteBuffer.array()
+    val version =
+      if (apiVersion < KAFKA_0_10_1_IV0) 0.toShort
+      else if (apiVersion < KAFKA_2_1_IV0) 1.toShort
+      else if (apiVersion < KAFKA_2_3_IV0) 2.toShort
+      else 3.toShort
+
+    serializeMessage(version, new GroupMetadataValue()
+      .setProtocolType(groupMetadata.protocolType.getOrElse(""))
+      .setGeneration(groupMetadata.generationId)
+      .setProtocol(groupMetadata.protocolName.orNull)
+      .setLeader(groupMetadata.leaderOrNull)
+      .setCurrentStateTimestamp(groupMetadata.currentStateTimestampOrDefault)
+      .setMembers(groupMetadata.allMemberMetadata.map(memberMetadata =>
+        new GroupMetadataValue.MemberMetadata()
+          .setMemberId(memberMetadata.memberId)
+          .setClientId(memberMetadata.clientId)
+          .setClientHost(memberMetadata.clientHost)
+          .setSessionTimeout(memberMetadata.sessionTimeoutMs)
+          .setRebalanceTimeout(memberMetadata.rebalanceTimeoutMs)
+          .setGroupInstanceId(memberMetadata.groupInstanceId.orNull)
+          .setSubscription(groupMetadata.protocolName.map(memberMetadata.metadata)
+            .getOrElse(throw new IllegalStateException("The group is non-empty so the current protocol must be defined")))

Review comment:
   ok, I'll keep the previous error message
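
   To make the new serialization path concrete, here is a minimal sketch of a
   serializeMessage helper for generated messages, assuming Kafka's
   ObjectSerializationCache and ByteBufferAccessor protocol utilities; the
   helper name mirrors the diff above, but this body is illustrative rather
   than the PR's exact code:

       import java.nio.ByteBuffer;
       import org.apache.kafka.common.protocol.ApiMessage;
       import org.apache.kafka.common.protocol.ByteBufferAccessor;
       import org.apache.kafka.common.protocol.ObjectSerializationCache;

       final class GeneratedMessageSerde {
           // The layout matches the old Struct-based records: a 2-byte
           // big-endian schema version followed by the versioned message body.
           static byte[] serializeMessage(short version, ApiMessage message) {
               ObjectSerializationCache cache = new ObjectSerializationCache();
               int size = message.size(cache, version);
               ByteBuffer buffer = ByteBuffer.allocate(2 + size);
               buffer.putShort(version);
               message.write(new ByteBufferAccessor(buffer), cache, version);
               return buffer.array();
           }
       }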





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…

2020-10-06 Thread GitBox


dajac commented on a change in pull request #9318:
URL: https://github.com/apache/kafka/pull/9318#discussion_r500293553



##
File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##
@@ -1257,60 +1059,31 @@ object GroupMetadataManager {
  assignment: Map[String, Array[Byte]],
  apiVersion: ApiVersion): Array[Byte] = {
 
-val (version, value) = {
-  if (apiVersion < KAFKA_0_10_1_IV0)
-(0.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V0))
-  else if (apiVersion < KAFKA_2_1_IV0)
-(1.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V1))
-  else if (apiVersion < KAFKA_2_3_IV0)
-(2.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V2))
-  else
-(3.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V3))
-}
-
-value.set(PROTOCOL_TYPE_KEY, groupMetadata.protocolType.getOrElse(""))
-value.set(GENERATION_KEY, groupMetadata.generationId)
-value.set(PROTOCOL_KEY, groupMetadata.protocolName.orNull)
-value.set(LEADER_KEY, groupMetadata.leaderOrNull)
-
-if (version >= 2)
-      value.set(CURRENT_STATE_TIMESTAMP_KEY, groupMetadata.currentStateTimestampOrDefault)
-
-val memberArray = groupMetadata.allMemberMetadata.map { memberMetadata =>
-  val memberStruct = value.instance(MEMBERS_KEY)
-  memberStruct.set(MEMBER_ID_KEY, memberMetadata.memberId)
-  memberStruct.set(CLIENT_ID_KEY, memberMetadata.clientId)
-  memberStruct.set(CLIENT_HOST_KEY, memberMetadata.clientHost)
-  memberStruct.set(SESSION_TIMEOUT_KEY, memberMetadata.sessionTimeoutMs)
-
-  if (version > 0)
-        memberStruct.set(REBALANCE_TIMEOUT_KEY, memberMetadata.rebalanceTimeoutMs)
-
-  if (version >= 3)
-        memberStruct.set(GROUP_INSTANCE_ID_KEY, memberMetadata.groupInstanceId.orNull)
-
-  // The group is non-empty, so the current protocol must be defined
-  val protocol = groupMetadata.protocolName.orNull
-  if (protocol == null)
-        throw new IllegalStateException("Attempted to write non-empty group metadata with no defined protocol")
-
-  val metadata = memberMetadata.metadata(protocol)
-  memberStruct.set(SUBSCRIPTION_KEY, ByteBuffer.wrap(metadata))
-
-  val memberAssignment = assignment(memberMetadata.memberId)
-  assert(memberAssignment != null)
-
-  memberStruct.set(ASSIGNMENT_KEY, ByteBuffer.wrap(memberAssignment))
-
-  memberStruct
-}
-
-value.set(MEMBERS_KEY, memberArray.toArray)
-
-val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
-byteBuffer.putShort(version)
-value.writeTo(byteBuffer)
-byteBuffer.array()
+val version =
+  if (apiVersion < KAFKA_0_10_1_IV0) 0.toShort
+  else if (apiVersion < KAFKA_2_1_IV0) 1.toShort
+  else if (apiVersion < KAFKA_2_3_IV0) 2.toShort
+  else 3.toShort
+
+serializeMessage(version, new GroupMetadataValue()
+  .setProtocolType(groupMetadata.protocolType.getOrElse(""))
+  .setGeneration(groupMetadata.generationId)
+  .setProtocol(groupMetadata.protocolName.orNull)
+  .setLeader(groupMetadata.leaderOrNull)
+  .setCurrentStateTimestamp(groupMetadata.currentStateTimestampOrDefault)
+  .setMembers(groupMetadata.allMemberMetadata.map(memberMetadata =>

Review comment:
   nit: We tend to use curly braces when the lambda does not fit on the 
same line.

##
File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##
@@ -1257,60 +1059,31 @@ object GroupMetadataManager {
  assignment: Map[String, Array[Byte]],
  apiVersion: ApiVersion): Array[Byte] = {
 
-val (version, value) = {
-  if (apiVersion < KAFKA_0_10_1_IV0)
-(0.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V0))
-  else if (apiVersion < KAFKA_2_1_IV0)
-(1.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V1))
-  else if (apiVersion < KAFKA_2_3_IV0)
-(2.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V2))
-  else
-(3.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V3))
-}
-
-value.set(PROTOCOL_TYPE_KEY, groupMetadata.protocolType.getOrElse(""))
-value.set(GENERATION_KEY, groupMetadata.generationId)
-value.set(PROTOCOL_KEY, groupMetadata.protocolName.orNull)
-value.set(LEADER_KEY, groupMetadata.leaderOrNull)
-
-if (version >= 2)
-      value.set(CURRENT_STATE_TIMESTAMP_KEY, groupMetadata.currentStateTimestampOrDefault)
-
-val memberArray = groupMetadata.allMemberMetadata.map { memberMetadata =>
-  val memberStruct = value.instance(MEMBERS_KEY)
-  memberStruct.set(MEMBER_ID_KEY, memberMetadata.memberId)
-  memberStruct.set(CLIENT_ID_KEY, memberMetadata.clientId)
-  memberStruct.set(CLIENT_HOST_KEY, memberMetadata.clientHost)
-  memberStruct.set(SESSION_TIMEOUT_KEY, memberMetadata.sessionTimeoutMs)
-
-  if (version > 0)
-

[GitHub] [kafka] kkonstantine merged pull request #9379: MINOR: Annotate test BlockingConnectorTest as integration test

2020-10-06 Thread GitBox


kkonstantine merged pull request #9379:
URL: https://github.com/apache/kafka/pull/9379


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on pull request #9379: MINOR: Annotate test BlockingConnectorTest as integration test

2020-10-06 Thread GitBox


kkonstantine commented on pull request #9379:
URL: https://github.com/apache/kafka/pull/9379#issuecomment-704314583


   JDK 15 was green. The rest of the failures are not relevant.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] stanislavkozlovski commented on pull request #9371: KAFKA-10510: Validate replication factor consistency on reassignment

2020-10-06 Thread GitBox


stanislavkozlovski commented on pull request #9371:
URL: https://github.com/apache/kafka/pull/9371#issuecomment-704300915


   Thanks for the PR @piotrrzysko. The changes look good, although I'm not sure 
whether outright blocking such reassignments would regress the ability to 
change the replication factor of a topic.
   
   Perhaps we could have a workaround in the form of allowing the reassignment 
batch to run if it's changing all the partitions for the topic. Another 
solution I like more is adding a flag to the AlterPartitionReassignments API 
that allows it to change the replication factor and keeping this validation.
   
   In the long term, I think we may want to consider an API that allows you to 
alter the replication factor of a topic.
   
   cc @ijuma @gwenshap @cmccabe @hachikuji @soondenana do you happen to have 
any thoughts on the matter?
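
   For illustration only, the validation under discussion could look roughly 
like the sketch below; TopicPartition is Kafka's, while the class and method 
names and the exception choice are hypothetical:

       import java.util.List;
       import java.util.Map;
       import org.apache.kafka.common.TopicPartition;

       final class ReassignmentValidator {
           // Hypothetical check: reject a reassignment whose target replica
           // lists would leave partitions of one topic with differing
           // replication factors (unless the batch covers every partition).
           static void validateReplicationFactorConsistency(
                   Map<TopicPartition, List<Integer>> currentAssignment,
                   Map<TopicPartition, List<Integer>> targetAssignment) {
               for (Map.Entry<TopicPartition, List<Integer>> entry : targetAssignment.entrySet()) {
                   TopicPartition tp = entry.getKey();
                   int targetRf = entry.getValue().size();
                   // Compare against an untouched partition of the same topic.
                   currentAssignment.entrySet().stream()
                       .filter(e -> e.getKey().topic().equals(tp.topic())
                               && !targetAssignment.containsKey(e.getKey()))
                       .findFirst()
                       .ifPresent(e -> {
                           if (e.getValue().size() != targetRf)
                               throw new IllegalArgumentException(
                                   "Reassignment of " + tp + " would change the replication factor"
                                       + " from " + e.getValue().size() + " to " + targetRf);
                       });
               }
           }
       }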



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram opened a new pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response

2020-10-06 Thread GitBox


rajinisivaram opened a new pull request #9382:
URL: https://github.com/apache/kafka/pull/9382


   For IBP 2.7 onwards, fetch responses include the diverging epoch and offset 
when `lastFetchedEpoch` is provided in the fetch request. This PR uses that 
information for truncation and avoids the additional OffsetForLeaderEpoch 
requests in followers when `lastFetchedEpoch` is known.
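   
   As a rough illustration of the resulting decision (hypothetical names; the 
real change lives in AbstractFetcherThread), the follower truncates to the 
smaller of its own log end offset and the leader-reported diverging end offset:

       final class DivergenceTruncation {
           // Hypothetical sketch: the offset a follower truncates to when a
           // fetch response carries a diverging epoch/end-offset pair.
           static long truncationOffset(long followerLogEndOffset, long divergingEndOffset) {
               return Math.min(followerLogEndOffset, divergingEndOffset);
           }

           public static void main(String[] args) {
               // The follower is ahead of the divergence point, so it
               // truncates back to the leader-reported end offset.
               System.out.println(truncationOffset(120L, 100L)); // prints 100
           }
       }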
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] javierfreire opened a new pull request #9381: KIP-675: Convert KTable to a KStream using the previous value

2020-10-06 Thread GitBox


javierfreire opened a new pull request #9381:
URL: https://github.com/apache/kafka/pull/9381


   This adds the ability to convert a table into a stream using a mapper that 
receives the new and old values as arguments.
   
   
[KIP-675](https://cwiki.apache.org/confluence/display/KAFKA/KIP-675%3A+Convert+KTable+to+a+KStream+using+the+previous+value)
   
   KTableImpl tests updated to cover the new methods.
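   
   A minimal sketch of the intended usage, with a hypothetical two-argument 
mapper shape standing in for the API proposed in the KIP:

       import java.util.function.BiFunction;

       final class Kip675Sketch {
           // Hypothetical mapper shape: receives the new value and the
           // previous value for a key and produces the stream record value.
           interface ValueAndPrevMapper<V, VR> extends BiFunction<V, V, VR> { }

           public static void main(String[] args) {
               ValueAndPrevMapper<Long, String> delta =
                   (newValue, oldValue) -> "delta=" + (newValue - (oldValue == null ? 0L : oldValue));
               // Under the proposal this would be passed to KTable#toStream;
               // here it is applied directly for illustration.
               System.out.println(delta.apply(5L, 3L)); // prints delta=2
           }
       }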
    
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] omkreddy commented on a change in pull request #9345: KAFKA-10338; Support PEM format for SSL key and trust stores (KIP-651)

2020-10-06 Thread GitBox


omkreddy commented on a change in pull request #9345:
URL: https://github.com/apache/kafka/pull/9345#discussion_r500186433



##
File path: clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
##
@@ -199,6 +206,156 @@ public static void createKeyStore(String filename,
         return builder.build();
     }
 
+    public static void convertToPem(Map<String, Object> sslProps, boolean writeToFile, boolean encryptPrivateKey) throws Exception {
+        String tsPath = (String) sslProps.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG);
+        String tsType = (String) sslProps.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG);
+        Password tsPassword = (Password) sslProps.remove(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);
+        Password trustCerts = (Password) sslProps.remove(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG);
+        if (trustCerts == null && tsPath != null) {
+            trustCerts = exportCertificates(tsPath, tsPassword, tsType);
+        }
+        if (trustCerts != null) {
+            if (tsPath == null) {
+                tsPath = File.createTempFile("truststore", ".pem").getPath();
+                sslProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, tsPath);
+            }
+            sslProps.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, PEM_TYPE);
+            if (writeToFile)
+                writeToFile(tsPath, trustCerts);
+            else {
+                sslProps.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, trustCerts);
+                sslProps.remove(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG);
+            }
+        }
+
+        String ksPath = (String) sslProps.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
+        Password certChain = (Password) sslProps.remove(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG);
+        Password key = (Password) sslProps.remove(SslConfigs.SSL_KEYSTORE_KEY_CONFIG);
+        if (certChain == null && ksPath != null) {
+            String ksType = (String) sslProps.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG);
+            Password ksPassword = (Password) sslProps.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
+            Password keyPassword = (Password) sslProps.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG);
+            certChain = exportCertificates(ksPath, ksPassword, ksType);
+            Password pemKeyPassword = encryptPrivateKey ? keyPassword : null;
+            key = exportPrivateKey(ksPath, ksPassword, keyPassword, ksType, pemKeyPassword);
+            if (!encryptPrivateKey)
+                sslProps.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG);
+        } else if (!encryptPrivateKey) {

Review comment:
   Can we remove empty else block?

##
File path: clients/src/test/java/org/apache/kafka/common/network/CertStores.java
##
@@ -54,13 +60,30 @@ public CertStores(boolean server, String commonName, InetAddress hostAddress) th
 }
 
     private CertStores(boolean server, String commonName, TestSslUtils.CertificateBuilder certBuilder) throws Exception {
+        this(server, commonName, "RSA", certBuilder, false);
+    }
+
+    private CertStores(boolean server, String commonName, String keyAlgorithm, TestSslUtils.CertificateBuilder certBuilder, boolean usePem) throws Exception {
         String name = server ? "server" : "client";
         Mode mode = server ? Mode.SERVER : Mode.CLIENT;
-        File truststoreFile = File.createTempFile(name + "TS", ".jks");
-        sslConfig = TestSslUtils.createSslConfig(!server, true, mode, truststoreFile, name, commonName, certBuilder);
+        File truststoreFile = usePem ? null : File.createTempFile(name + "TS", ".jks");
+        sslConfig = new SslConfigsBuilder(mode)
+            .useClientCert(!server)
+            .certAlias(name)
+            .cn(commonName)
+            .createNewTrustStore(truststoreFile)
+            .certBuilder(certBuilder)
+            .algorithm(keyAlgorithm)
+            .usePem(usePem)
+            .build();
     }
 
+
     public Map<String, Object> getTrustingConfig(CertStores truststoreConfig) {
+        return getTrustingConfig(truststoreConfig, false);
+    }
+
+    public Map<String, Object> getTrustingConfig(CertStores truststoreConfig, boolean usePemCerts) {

Review comment:
   unused usePemCerts?

##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -923,9 +926,12 @@ object KafkaConfig {
   val SslKeystoreLocationDoc = SslConfigs.SSL_KEYSTORE_LOCATION_DOC
   val SslKeystorePasswordDoc = SslConfigs.SSL_KEYSTORE_PASSWORD_DOC
   val SslKeyPasswordDoc = SslConfigs.SSL_KEY_PASSWORD_DOC
+  val SslKeystoreKeyDoc = SslConfigs.SSL_KEYSTORE_KEY_DOC
+  val SslKeystoreCertificateChainDoc = SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG

Review comment:
   SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG => SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_DOC

##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -923,9 +926,12 @@ object 
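
To show what KIP-651 enables at the configuration level, here is a minimal 
sketch that builds PEM-based client SSL configs from the SslConfigs constants 
introduced above; the certificate and key strings are placeholders supplied by 
the caller:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.config.SslConfigs;

    final class PemConfigSketch {
        static Map<String, Object> pemSslConfigs(String trustCertsPem, String keyPem, String certChainPem) {
            Map<String, Object> configs = new HashMap<>();
            // Trust store supplied inline as PEM instead of a JKS/PKCS12 file.
            configs.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PEM");
            configs.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, trustCertsPem);
            // Key store likewise supplied inline: private key plus chain.
            configs.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PEM");
            configs.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, keyPem);
            configs.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, certChainPem);
            return configs;
        }
    }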

[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2020-10-06 Thread Sandeep (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17208688#comment-17208688 ]

Sandeep commented on KAFKA-5998:


[~ableegoldman], yes, I was trying it out on my local machine. The tmp/ 
directory is the default configuration.
For production, this will be deployed in separate Docker containers.



 

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Assignee: John Roesler
>Priority: Critical
> Fix For: 2.2.2, 2.4.0, 2.3.1
>
> Attachments: 5998.v1.txt, 5998.v2.txt, Kafka5998.zip, Topology.txt, 
> exc.txt, props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams instance running. They have 
> been running for two days under a load of around 2500 msgs per second. On 
> the third day I started getting the exception below for some of the 
> partitions. I have 16 partitions; only 0_0 and 0_1 give this error:
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> 
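
For context on where the .checkpoint.tmp path comes from: the checkpoint 
writer follows the common write-to-temp-then-atomic-rename pattern, so the 
FileNotFoundException above fires when the task directory itself has 
disappeared before the temp file is opened. A minimal sketch of that pattern, 
with hypothetical names rather than Kafka's exact code:

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;

    final class CheckpointWriteSketch {
        // Write to a sibling .tmp file first, then atomically move it into
        // place; creating the .tmp file fails if the parent directory is gone.
        static void writeCheckpoint(Path checkpoint, String contents) throws IOException {
            Path tmp = checkpoint.resolveSibling(checkpoint.getFileName() + ".tmp");
            Files.write(tmp, contents.getBytes(StandardCharsets.UTF_8));
            Files.move(tmp, checkpoint, StandardCopyOption.ATOMIC_MOVE);
        }
    }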
