[jira] [Commented] (KAFKA-8802) ConcurrentSkipListMap shows performance regression in cache and in-memory store

2022-09-23 Thread Avi Cherry (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17608923#comment-17608923
 ] 

Avi Cherry commented on KAFKA-8802:
---

Sorry to have to re-open this, but unfortunately copying the keyset view when 
creating the InMemoryKeyValueIterator doesn't stop the 
ConcurrentModificationException; it only shifts when it happens, to during the 
copy made while constructing the InMemoryKeyValueIterator instead of during the 
iteration itself. This may have made it slightly less likely to be triggered, 
but I just encountered it personally in the wild. Until this is fixed I will 
need to avoid using an InMemoryKeyValueStore anywhere it is accessed via 
{{KafkaStreams.store()}}.

From the sources I can find, the performance hit for ConcurrentSkipListMap vs 
TreeMap is roughly a factor of two. On the other hand, the existing version 
also copies the entire iterated key set into a new TreeSet, which costs time 
and, especially, unnecessary space. Another way to look at this is that 
InMemoryKeyValueStore would simply operate as fast as the fastest available 
open-source map implementation in Java that is both sorted and concurrent. If 
someone really, really needed that extra performance they could license 
[AirConcurrentMap|https://github.com/boilerbay/airconcurrentmap] and 
reimplement InMemoryKeyValueStore using that. Heck, we could make the 
ConcurrentNavigableMap implementation configurable in Kafka Streams if it were 
important enough. Another potential option is to offer a higher-performance 
in-memory store backed by TreeMap, as long as it is guaranteed to only ever be 
used from a single thread.
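
As a rough illustration only (this is not the actual store code, and the class 
and method names are made up for this sketch), the concurrent approach boils 
down to backing the store with a ConcurrentSkipListMap, whose range views have 
weakly consistent iterators that never throw ConcurrentModificationException:
{code:java}
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Sketch of a sorted in-memory store whose iterators tolerate concurrent writes.
public class ConcurrentSortedStore {
    private final ConcurrentNavigableMap<String, byte[]> map = new ConcurrentSkipListMap<>();

    public void put(String key, byte[] value) {
        map.put(key, value);
    }

    public Iterator<Map.Entry<String, byte[]>> range(String from, String to) {
        // subMap() is a live view; its iterator is weakly consistent and never
        // fails fast, unlike TreeMap's view iterators.
        return map.subMap(from, true, to, true).entrySet().iterator();
    }
}
{code}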

The new issue is at KAFKA-14260

> ConcurrentSkipListMap shows performance regression in cache and in-memory 
> store
> ---
>
> Key: KAFKA-8802
> URL: https://issues.apache.org/jira/browse/KAFKA-8802
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.4.0, 2.3.1
>
>
> The use of ConcurrentSkipListMap in the cache and in-memory stores caused a 
> performance regression in 2.3.0. We should revert back to using TreeMap 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] niket-goel commented on a diff in pull request #12679: Adding KRaft Monitoring Related Metrics to docs/ops.html

2022-09-23 Thread GitBox


niket-goel commented on code in PR #12679:
URL: https://github.com/apache/kafka/pull/12679#discussion_r979129298


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -246,7 +246,9 @@ public KafkaRaftClient(
 logContext,
 random);
 this.kafkaRaftMetrics = new KafkaRaftMetrics(metrics, "raft", quorum);
-
kafkaRaftMetrics.updateNumUnknownVoterConnections(quorum.remoteVoters().size());
+// All Raft voters are statically configured and known at startup
+// so there are no unknown voter connections. Report this metric as 0.
+kafkaRaftMetrics.updateNumUnknownVoterConnections(0);

Review Comment:
   Yeah! Was planning on filing a JIRA and KIP for its removal. Will do that 
next week. :) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang merged pull request #12670: KAFKA-14239: Merge StateRestorationIntegrationTest into RestoreIntegrationTest

2022-09-23 Thread GitBox


guozhangwang merged PR #12670:
URL: https://github.com/apache/kafka/pull/12670


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14260) InMemoryKeyValueStore iterator still throws ConcurrentModificationException

2022-09-23 Thread Avi Cherry (Jira)
Avi Cherry created KAFKA-14260:
--

 Summary: InMemoryKeyValueStore iterator still throws 
ConcurrentModificationException
 Key: KAFKA-14260
 URL: https://issues.apache.org/jira/browse/KAFKA-14260
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.2.3
Reporter: Avi Cherry


This is the same bug as KAFKA-7912 which was then re-introduced by KAFKA-8802.

Any iterator returned from {{InMemoryKeyValueStore}} may end up throwing a 
ConcurrentModificationException because the backing map is not concurrency-safe. 
I expect that this only happens when the store is retrieved via 
{{KafkaStreams.store()}} from outside of the topology, since any usage of the 
store from inside the topology should be naturally single-threaded.

To start off, a reminder that this behaviour explicitly violates the interface 
contract for {{ReadOnlyKeyValueStore}} which states
{quote}The returned iterator must be safe from 
java.util.ConcurrentModificationExceptions
{quote}
It is often complicated to write code that demonstrates concurrency bugs, but 
thankfully it is trivial to reason through the source code of 
{{InMemoryKeyValueStore.java}} to show why this happens:
 * All of the InMemoryKeyValueStore methods that return iterators do so by 
passing a keySet based on the backing TreeMap to the InMemoryKeyValueIterator 
constructor.
 * These keySets are all VIEWS of the backing map, not copies.
 * The InMemoryKeyValueIterator then makes a private copy of the keySet by 
passing the original keySet into the TreeSet constructor. This copying was 
added in KAFKA-8802 in the mistaken belief that it fixes the concurrency 
problem.
 * TreeSet then iterates over the keySet to make the copy. If the original 
backing TreeMap in InMemoryKeyValueStore is modified while this copy is being 
created, it fails fast with a ConcurrentModificationException (see the sketch 
below).
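
A minimal, standalone sketch of the failure mode (this is not the Streams code, 
just an illustration of why copying a TreeMap keySet view is not safe against 
concurrent writes; the race may not fire on every run):
{code:java}
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

public class KeySetCopyCme {
    public static void main(String[] args) throws InterruptedException {
        Map<Integer, String> map = new TreeMap<>();
        for (int i = 0; i < 100_000; i++) {
            map.put(i, "v" + i);
        }

        Thread writer = new Thread(() -> {
            for (int i = 100_000; i < 200_000; i++) {
                map.put(i, "v" + i);   // concurrent structural modification
            }
        });
        writer.start();

        try {
            // Same shape as new TreeSet<>(map.keySet()) in the iterator constructor:
            // the copy iterates the live keySet view, whose iterator is fail-fast.
            new TreeSet<>(map.keySet());
        } catch (java.util.ConcurrentModificationException e) {
            System.out.println("Copy failed fast: " + e);
        }
        writer.join();
    }
}
{code}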

This bug should be able to be trivially fixed by replacing the backing TreeMap 
with a ConcurrentSkipListMap but here's the rub:

This bug has already been found in KAFKA-7912 and the TreeMap was replaced with 
a ConcurrentSkipListMap. It was then reverted back to a TreeMap in KAFKA-8802 
because of the performance regression. I can [see from one of the 
PRs|https://github.com/apache/kafka/pull/7212/commits/384c12e40f3a59591f897d916f92253e126820ed]
 that it was believed the concurrency problem with the TreeMap implementation 
was fixed by copying the keyset when the iterator is created but the problem 
remains, plus the fix creates an extra copy of the iterated portion of the set 
in memory.

For what it's worth, the performance difference between TreeMap and 
ConcurrentSkipListMap does not extend to asymptotic complexity. TreeMap enjoys 
a roughly 2x speed advantage across all operations at any data size, but at the 
cost of what turned out to be an easy-to-encounter bug.

This is all unfortunate since the only time the state stores ever get accessed 
concurrently is through the `KafkaStreams.store()` mechanism, but I would 
imagine that "correct and slightly slower" is better than "incorrect and 
faster".

Too bad BoilerBay's AirConcurrentMap is closed-source and patented.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] hachikuji commented on a diff in pull request #12679: Adding KRaft Monitoring Related Metrics to docs/ops.html

2022-09-23 Thread GitBox


hachikuji commented on code in PR #12679:
URL: https://github.com/apache/kafka/pull/12679#discussion_r979119104


##
docs/ops.html:
##
@@ -1815,6 +1815,206 @@ KRaft Monitoring Metrics
+The set of metrics that allow monitoring of the KRaft quorum and the metadata 
log.
+Note: A Controller is defined as a Kafka Broker which has "controller" 
included among the process.roles config

Review Comment:
   How about this?
   
   > Note that some of the exposed metrics depend on the role of the node as 
defined by process.roles.



##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -246,7 +246,9 @@ public KafkaRaftClient(
 logContext,
 random);
 this.kafkaRaftMetrics = new KafkaRaftMetrics(metrics, "raft", quorum);
-
kafkaRaftMetrics.updateNumUnknownVoterConnections(quorum.remoteVoters().size());
+// All Raft voters are statically configured and known at startup
+// so there are no unknown voter connections. Report this metric as 0.
+kafkaRaftMetrics.updateNumUnknownVoterConnections(0);

Review Comment:
   Perhaps we should file a jira to follow up with removal of this metric if it 
is not reporting anything useful. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe merged pull request #12681: KAFKA-14259: BrokerRegistration#toString throws an exception, terminating metadata replay

2022-09-23 Thread GitBox


cmccabe merged PR #12681:
URL: https://github.com/apache/kafka/pull/12681


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #12679: Adding KRaft Monitoring Related Metrics to docs/ops.html

2022-09-23 Thread GitBox


hachikuji commented on code in PR #12679:
URL: https://github.com/apache/kafka/pull/12679#discussion_r979099689


##
docs/ops.html:
##
@@ -1815,6 +1815,194 @@ KRaft Monitoring Metrics
+The set of metrics that allow monitoring of the KRaft quorum and the metadata 
log
+KRaft Controller 
Monitoring Metrics
+
+  
+  
+Metric/Attribute name
+Description
+Mbean name
+  
+  
+Current State
+The current state of this member; possible values are leader, 
candidate, voted, follower, unattached.
+kafka.server:type=raft-metrics,name=current-state

Review Comment:
   I think the raft metrics are reported on both the controller and broker. We 
could either duplicate them or use a separate section as you suggested offline. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #12679: Adding KRaft Monitoring Related Metrics to docs/ops.html

2022-09-23 Thread GitBox


hachikuji commented on code in PR #12679:
URL: https://github.com/apache/kafka/pull/12679#discussion_r979099254


##
docs/ops.html:
##
@@ -1815,6 +1815,194 @@ KRaft Monitoring Metrics
+The set of metrics that allow monitoring of the KRaft quorum and the metadata 
log
+KRaft Controller 
Monitoring Metrics

Review Comment:
   Do you think it's worth pointing out somewhere that a controller means any 
node which has "controller" included among `process.roles`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji opened a new pull request, #12682: MINOR: Add section on listener configuration (including kraft) to security docs

2022-09-23 Thread GitBox


hachikuji opened a new pull request, #12682:
URL: https://github.com/apache/kafka/pull/12682

   This patch adds a section in security.html about basic listener 
configuration. This includes the basics of how to define the security mapping 
of each listener as well as the configurations to control inter-cluster traffic.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #12666: KAFKA-14244: Add guard against accidental calls to halt JVM during testing

2022-09-23 Thread GitBox


C0urante commented on PR #12666:
URL: https://github.com/apache/kafka/pull/12666#issuecomment-1256672973

   @showuon I've added some tests for the custom extension that should 
demonstrate its effectiveness.
   
   I should note that I also experimented with an approach that leveraged JUnit 
5's [test class ordering 
API](https://junit.org/junit5/docs/5.8.0/api/org.junit.jupiter.api/org/junit/jupiter/api/TestClassOrder.html)
 to create a test that runs after all others have completed and checks for 
leaked threads that have tried to terminate the JVM. Unfortunately, I couldn't 
find a way to make this work with parallel testing, since the test would only 
be run on a single executor, and would often finish before other tests on other 
executors.
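   
   For reference, a rough sketch of the kind of custom `ClassOrderer` that 
approach relies on (the orderer and test class names here are hypothetical, not 
what this PR actually tried): push a designated check class to the end of the 
ordering, then register the orderer via the 
`junit.jupiter.testclass.order.default` configuration parameter.
   
   ```java
   import java.util.Comparator;
   import org.junit.jupiter.api.ClassDescriptor;
   import org.junit.jupiter.api.ClassOrderer;
   import org.junit.jupiter.api.ClassOrdererContext;
   
   // Keeps the existing order, but moves the (hypothetical)
   // LeakedThreadCheckTest class to the end so it runs after the rest.
   public class LeakCheckLastOrderer implements ClassOrderer {
       @Override
       public void orderClasses(ClassOrdererContext context) {
           context.getClassDescriptors().sort(
               Comparator.comparingInt((ClassDescriptor d) ->
                   d.getTestClass().getSimpleName().equals("LeakedThreadCheckTest") ? 1 : 0));
       }
   }
   ```
   
   As noted above, this only helps within a single executor, which is why it 
was not workable for parallel test runs.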


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante merged pull request #12677: KAFKA-14132: Replace PowerMock/Easymock with Mockito for WorkerMetricsGroupTest

2022-09-23 Thread GitBox


C0urante merged PR #12677:
URL: https://github.com/apache/kafka/pull/12677


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe opened a new pull request, #12681: KAFKA-14259: BrokerRegistration#toString throws an exception, terminating metadata replay

2022-09-23 Thread GitBox


cmccabe opened a new pull request, #12681:
URL: https://github.com/apache/kafka/pull/12681

   Previously, BrokerRegistration#toString would throw an exception, 
terminating metadata replay,
   because the sorted() method was used on an entry set rather than a key set.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14259) BrokerRegistration#toString throws an exception, terminating metadata replay

2022-09-23 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-14259:


 Summary: BrokerRegistration#toString throws an exception, 
terminating metadata replay
 Key: KAFKA-14259
 URL: https://issues.apache.org/jira/browse/KAFKA-14259
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.3
Reporter: Colin McCabe
Assignee: Colin McCabe
 Fix For: 3.3


BrokerRegistration#toString throws an exception, terminating metadata replay, 
because the sorted() method is used on an entry set rather than a key set.
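
A standalone illustration of the failure mode (not the BrokerRegistration code 
itself; the map contents are made up): sorting a stream of Map.Entry with the 
natural-order comparator throws ClassCastException because HashMap$Node does 
not implement Comparable, while sorting the key set works fine.
{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class EntrySetSortDemo {
    public static void main(String[] args) {
        Map<String, Integer> features = new HashMap<>();
        features.put("metadata.version", 7);
        features.put("kraft.version", 1);

        try {
            // What the buggy toString() effectively did: sort entries with natural ordering.
            features.entrySet().stream().sorted().collect(Collectors.toList());
        } catch (ClassCastException e) {
            System.out.println("entrySet().stream().sorted() fails: " + e.getMessage());
        }

        // Sorting the keys and rendering key=value works:
        String rendered = features.keySet().stream()
                .sorted()
                .map(k -> k + "=" + features.get(k))
                .collect(Collectors.joining(", "));
        System.out.println(rendered);
    }
}
{code}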


{noformat}
Caused by: java.util.concurrent.ExecutionException: java.lang.ClassCastException: class java.util.HashMap$Node cannot be cast to class java.lang.Comparable (java.util.HashMap$Node and java.lang.Comparable are in module java.base of loader 'bootstrap')
        at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
        at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
        at kafka.server.BrokerServer.startup(BrokerServer.scala:846)
        ... 147 more
Caused by: java.lang.ClassCastException: class java.util.HashMap$Node cannot be cast to class java.lang.Comparable (java.util.HashMap$Node and java.lang.Comparable are in module java.base of loader 'bootstrap')
        at java.base/java.util.Comparators$NaturalOrderComparator.compare(Comparators.java:47)
        at java.base/java.util.TimSort.countRunAndMakeAscending(TimSort.java:355)
        at java.base/java.util.TimSort.sort(TimSort.java:220)
        at java.base/java.util.Arrays.sort(Arrays.java:1307)
        at java.base/java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:353)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:510)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
        at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
        at org.apache.kafka.metadata.BrokerRegistration.toString(BrokerRegistration.java:228)
        at java.base/java.util.Formatter$FormatSpecifier.printString(Formatter.java:3056)

[GitHub] [kafka] niket-goel commented on pull request #12679: Adding KRaft Monitoring Related Metrics to docs/ops.html

2022-09-23 Thread GitBox


niket-goel commented on PR #12679:
URL: https://github.com/apache/kafka/pull/12679#issuecomment-1256592811

   @showuon Thanks for the careful feedback. I have updated the PR with the 
correct Mbean names and fixed many typos. Please take a look when you have a 
chance. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12677: KAFKA-14132: Replace PowerMock/Easymock with Mockito for WorkerMetricsGroupTest

2022-09-23 Thread GitBox


divijvaidya commented on code in PR #12677:
URL: https://github.com/apache/kafka/pull/12677#discussion_r978873294


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerMetricsGroupTest.java:
##
@@ -55,24 +54,19 @@ public class WorkerMetricsGroupTest {
 private Sensor taskStartupSuccesses;
 private Sensor taskStartupFailures;
 
-private ConnectorStatus.Listener delegateConnectorListener;
-private TaskStatus.Listener delegateTaskListener;
+@Mock private ConnectorStatus.Listener delegateConnectorListener;
+@Mock private TaskStatus.Listener delegateTaskListener;
 
 @Before
 public void setup() {
-connectMetrics = PowerMock.createMock(ConnectMetrics.class);
-ConnectMetricsRegistry connectMetricsRegistry = 
PowerMock.createNiceMock(ConnectMetricsRegistry.class);
-ConnectMetrics.MetricGroup metricGroup = 
PowerMock.createNiceMock(ConnectMetrics.MetricGroup.class);
+ConnectMetricsRegistry connectMetricsRegistry = 
mock(ConnectMetricsRegistry.class);
+ConnectMetrics.MetricGroup metricGroup = 
mock(ConnectMetrics.MetricGroup.class);
+MetricName metricName = mock(MetricName.class);
 
-connectMetrics.registry();
-expectLastCall().andReturn(connectMetricsRegistry);
-
-connectMetrics.group(anyString());
-expectLastCall().andReturn(metricGroup);
-
-MetricName metricName = PowerMock.createMock(MetricName.class);
-metricGroup.metricName(anyObject(MetricNameTemplate.class));
-expectLastCall().andStubReturn(metricName);
+when(metricGroup.metricName((MetricNameTemplate) 
isNull())).thenReturn(metricName);

Review Comment:
   Fixed in the new commit. Thank you for suggesting the comment itself. I 
appreciate your detailed approach to code review. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12677: KAFKA-14132: Replace PowerMock/Easymock with Mockito for WorkerMetricsGroupTest

2022-09-23 Thread GitBox


divijvaidya commented on code in PR #12677:
URL: https://github.com/apache/kafka/pull/12677#discussion_r978870789


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerMetricsGroupTest.java:
##
@@ -83,167 +77,107 @@ public void setup() {
 taskStartupAttempts = mockSensor(metricGroup, "task-startup-attempts");
 taskStartupSuccesses = mockSensor(metricGroup, 
"task-startup-successes");
 taskStartupFailures = mockSensor(metricGroup, "task-startup-failures");
-
-delegateConnectorListener = 
PowerMock.createStrictMock(ConnectorStatus.Listener.class);
-delegateTaskListener = 
PowerMock.createStrictMock(TaskStatus.Listener.class);
 }
 
 private Sensor mockSensor(ConnectMetrics.MetricGroup metricGroup, String 
name) {
-Sensor sensor = PowerMock.createMock(Sensor.class);
-metricGroup.sensor(eq(name));
-expectLastCall().andReturn(sensor);
-
-sensor.add(anyObject(CompoundStat.class));
-expectLastCall().andStubReturn(true);
-
-sensor.add(anyObject(MetricName.class), 
anyObject(CumulativeSum.class));
-expectLastCall().andStubReturn(true);
-
+Sensor sensor = mock(Sensor.class);
+when(metricGroup.sensor(name)).thenReturn(sensor);
+when(sensor.add(any(CompoundStat.class))).thenReturn(true);
+when(sensor.add(any(MetricName.class), 
any(CumulativeSum.class))).thenReturn(true);
 return sensor;
 }
 
 @Test
 public void testConnectorStartupRecordedMetrics() {
-delegateConnectorListener.onStartup(eq(connector));
-expectLastCall();
-
-connectorStartupAttempts.record(eq(1.0));
-expectLastCall();
-connectorStartupSuccesses.record(eq(1.0));
-expectLastCall();
-connectorStartupResults.record(eq(1.0));
-expectLastCall();
-
-PowerMock.replayAll();
-
 WorkerMetricsGroup workerMetricsGroup = new WorkerMetricsGroup(new 
HashMap<>(), new HashMap<>(), connectMetrics);
 final ConnectorStatus.Listener connectorListener = 
workerMetricsGroup.wrapStatusListener(delegateConnectorListener);
 
 connectorListener.onStartup(connector);
 
-PowerMock.verifyAll();
+verifyRecordConnectorStartupSuccess();
+verify(delegateConnectorListener).onStartup(connector);
 }
 
 @Test
 public void testConnectorFailureAfterStartupRecordedMetrics() {
-delegateConnectorListener.onStartup(eq(connector));
-expectLastCall();
-
-connectorStartupAttempts.record(eq(1.0));
-expectLastCall();
-connectorStartupSuccesses.record(eq(1.0));
-expectLastCall();
-connectorStartupResults.record(eq(1.0));
-expectLastCall();
-
-delegateConnectorListener.onFailure(eq(connector), eq(exception));
-expectLastCall();
-
-// recordConnectorStartupFailure() should not be called if failure 
happens after a successful startup
-
-PowerMock.replayAll();
-
 WorkerMetricsGroup workerMetricsGroup = new WorkerMetricsGroup(new 
HashMap<>(), new HashMap<>(), connectMetrics);
 final ConnectorStatus.Listener connectorListener = 
workerMetricsGroup.wrapStatusListener(delegateConnectorListener);
 
 connectorListener.onStartup(connector);
 connectorListener.onFailure(connector, exception);
 
-PowerMock.verifyAll();
+verify(delegateConnectorListener).onStartup(connector);
+verifyRecordConnectorStartupSuccess();
+verify(delegateConnectorListener).onFailure(connector, exception);
 }
 
 @Test
 public void testConnectorFailureBeforeStartupRecordedMetrics() {
-delegateConnectorListener.onFailure(eq(connector), eq(exception));
-expectLastCall();
-
-connectorStartupAttempts.record(eq(1.0));
-expectLastCall();
-connectorStartupFailures.record(eq(1.0));
-expectLastCall();
-connectorStartupResults.record(eq(0.0));
-expectLastCall();
-
-PowerMock.replayAll();
-
 WorkerMetricsGroup workerMetricsGroup = new WorkerMetricsGroup(new 
HashMap<>(), new HashMap<>(), connectMetrics);
 final ConnectorStatus.Listener connectorListener = 
workerMetricsGroup.wrapStatusListener(delegateConnectorListener);
 
 connectorListener.onFailure(connector, exception);
 
-PowerMock.verifyAll();
+verify(delegateConnectorListener).onFailure(connector, exception);
+verifyRecordConnectorStartupFailure();
 }
 
 @Test
 public void testTaskStartupRecordedMetrics() {
-delegateTaskListener.onStartup(eq(task));
-expectLastCall();
-
-taskStartupAttempts.record(eq(1.0));
-expectLastCall();
-taskStartupSuccesses.record(eq(1.0));
-expectLastCall();
-taskStartupResults.record(eq(1.0));
-expectLastCall();
-
-PowerMock.replayAll();
-
 

[GitHub] [kafka] divijvaidya commented on a diff in pull request #12677: KAFKA-14132: Replace PowerMock/Easymock with Mockito for WorkerMetricsGroupTest

2022-09-23 Thread GitBox


divijvaidya commented on code in PR #12677:
URL: https://github.com/apache/kafka/pull/12677#discussion_r978867978


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerMetricsGroupTest.java:
##
@@ -83,167 +77,107 @@ public void setup() {
 taskStartupAttempts = mockSensor(metricGroup, "task-startup-attempts");
 taskStartupSuccesses = mockSensor(metricGroup, 
"task-startup-successes");
 taskStartupFailures = mockSensor(metricGroup, "task-startup-failures");
-
-delegateConnectorListener = 
PowerMock.createStrictMock(ConnectorStatus.Listener.class);
-delegateTaskListener = 
PowerMock.createStrictMock(TaskStatus.Listener.class);
 }
 
 private Sensor mockSensor(ConnectMetrics.MetricGroup metricGroup, String 
name) {
-Sensor sensor = PowerMock.createMock(Sensor.class);
-metricGroup.sensor(eq(name));
-expectLastCall().andReturn(sensor);
-
-sensor.add(anyObject(CompoundStat.class));
-expectLastCall().andStubReturn(true);
-
-sensor.add(anyObject(MetricName.class), 
anyObject(CumulativeSum.class));
-expectLastCall().andStubReturn(true);
-
+Sensor sensor = mock(Sensor.class);
+when(metricGroup.sensor(name)).thenReturn(sensor);
+when(sensor.add(any(CompoundStat.class))).thenReturn(true);
+when(sensor.add(any(MetricName.class), 
any(CumulativeSum.class))).thenReturn(true);
 return sensor;
 }
 
 @Test
 public void testConnectorStartupRecordedMetrics() {
-delegateConnectorListener.onStartup(eq(connector));
-expectLastCall();
-
-connectorStartupAttempts.record(eq(1.0));
-expectLastCall();
-connectorStartupSuccesses.record(eq(1.0));
-expectLastCall();
-connectorStartupResults.record(eq(1.0));
-expectLastCall();
-
-PowerMock.replayAll();
-
 WorkerMetricsGroup workerMetricsGroup = new WorkerMetricsGroup(new 
HashMap<>(), new HashMap<>(), connectMetrics);
 final ConnectorStatus.Listener connectorListener = 
workerMetricsGroup.wrapStatusListener(delegateConnectorListener);
 
 connectorListener.onStartup(connector);
 
-PowerMock.verifyAll();
+verifyRecordConnectorStartupSuccess();
+verify(delegateConnectorListener).onStartup(connector);
 }
 
 @Test
 public void testConnectorFailureAfterStartupRecordedMetrics() {
-delegateConnectorListener.onStartup(eq(connector));
-expectLastCall();
-
-connectorStartupAttempts.record(eq(1.0));
-expectLastCall();
-connectorStartupSuccesses.record(eq(1.0));
-expectLastCall();
-connectorStartupResults.record(eq(1.0));
-expectLastCall();
-
-delegateConnectorListener.onFailure(eq(connector), eq(exception));
-expectLastCall();
-
-// recordConnectorStartupFailure() should not be called if failure 
happens after a successful startup

Review Comment:
   Fixed in upcoming commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14258) Add ducktape or junit test verifying that brokers can reload snapshots after startup

2022-09-23 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-14258:
-
Labels: kip-500  (was: )

> Add ducktape or junit test verifying that brokers can reload snapshots after 
> startup
> 
>
> Key: KAFKA-14258
> URL: https://issues.apache.org/jira/browse/KAFKA-14258
> Project: Kafka
>  Issue Type: Test
>Reporter: Colin McCabe
>Priority: Major
>  Labels: kip-500
>
> We should add a ducktape or junit test that verifies that brokers can reload 
> snapshots after startup. This code path is not exercised frequently but it is 
> important.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14258) Add ducktape or junit test verifying that brokers can reload snapshots after startup

2022-09-23 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-14258:


 Summary: Add ducktape or junit test verifying that brokers can 
reload snapshots after startup
 Key: KAFKA-14258
 URL: https://issues.apache.org/jira/browse/KAFKA-14258
 Project: Kafka
  Issue Type: Test
Reporter: Colin McCabe


We should add a ducktape or junit test that verifies that brokers can reload 
snapshots after startup. This code path is not exercised frequently but it is 
important.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] divijvaidya commented on a diff in pull request #12677: KAFKA-14132: Replace PowerMock/Easymock with Mockito for WorkerMetricsGroupTest

2022-09-23 Thread GitBox


divijvaidya commented on code in PR #12677:
URL: https://github.com/apache/kafka/pull/12677#discussion_r978863704


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerMetricsGroupTest.java:
##
@@ -55,24 +54,19 @@ public class WorkerMetricsGroupTest {
 private Sensor taskStartupSuccesses;
 private Sensor taskStartupFailures;
 
-private ConnectorStatus.Listener delegateConnectorListener;
-private TaskStatus.Listener delegateTaskListener;
+@Mock private ConnectorStatus.Listener delegateConnectorListener;
+@Mock private TaskStatus.Listener delegateTaskListener;
 
 @Before
 public void setup() {
-connectMetrics = PowerMock.createMock(ConnectMetrics.class);
-ConnectMetricsRegistry connectMetricsRegistry = 
PowerMock.createNiceMock(ConnectMetricsRegistry.class);
-ConnectMetrics.MetricGroup metricGroup = 
PowerMock.createNiceMock(ConnectMetrics.MetricGroup.class);
+ConnectMetricsRegistry connectMetricsRegistry = 
mock(ConnectMetricsRegistry.class);
+ConnectMetrics.MetricGroup metricGroup = 
mock(ConnectMetrics.MetricGroup.class);
+MetricName metricName = mock(MetricName.class);

Review Comment:
   Primarily to keep the PR code reviewer friendly :) by minimising the 
refactoring and doing in-place changes as much as possible. 
   
   I will move these to annotations in upcoming commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #12674: KAFKA-14255: Fetching from follower should be disallowed if fetch from follower is disabled

2022-09-23 Thread GitBox


dajac commented on PR #12674:
URL: https://github.com/apache/kafka/pull/12674#issuecomment-1256427128

   > This change makes sense. I guess the other interesting case is if fetch 
from follower _is_ enabled on the broker, but the client is not sending the 
epoch (KIP-320). What should we do in that case? Is it safe to let it through? 
We may need another JIRA to discuss that case, which is not as clear, but still 
potentially problematic.
   
   I think that one fundamental issue is that a follower is accessible even if 
it has not yet caught up with the leader. The follower might be far behind, for 
instance, or even have wrong records if truncation has not happened yet. It 
seems to me that we should make it inaccessible until the follower has at least 
received the updated high watermark from the leader. Once we have the HWM, we 
could perhaps also use OFFSET_NOT_AVAILABLE if the requested offsets are not 
available yet. I will file a jira and put my thoughts in it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #12545: KIP-863: Reduce Fetcher#parseRecord() memory copy

2022-09-23 Thread GitBox


philipnee commented on code in PR #12545:
URL: https://github.com/apache/kafka/pull/12545#discussion_r978832019


##
clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java:
##
@@ -27,27 +30,45 @@
  *  value.deserializer.encoding or deserializer.encoding. The first two take 
precedence over the last.
  */
 public class StringDeserializer implements Deserializer {
+
 private String encoding = StandardCharsets.UTF_8.name();
 
 @Override
 public void configure(Map configs, boolean isKey) {
-String propertyName = isKey ? "key.deserializer.encoding" : 
"value.deserializer.encoding";
+final String propertyName = isKey ? "key.deserializer.encoding" : 
"value.deserializer.encoding";
 Object encodingValue = configs.get(propertyName);
-if (encodingValue == null)
+if (encodingValue == null) {
 encodingValue = configs.get("deserializer.encoding");
-if (encodingValue instanceof String)
+}
+
+if (encodingValue instanceof String) {
 encoding = (String) encodingValue;
+}
 }
 
 @Override
 public String deserialize(String topic, byte[] data) {
 try {
-if (data == null)
-return null;
-else
-return new String(data, encoding);
+return data == null ? null : new String(data, encoding);
 } catch (UnsupportedEncodingException e) {
 throw new SerializationException("Error when deserializing byte[] 
to string due to unsupported encoding " + encoding);
 }
 }
+
+@Override
+public String deserialize(String topic, Headers headers, ByteBuffer data) {
+if (data == null) {
+return null;
+}
+
+try {
+if (data.hasArray()) {
+return new String(data.array(), data.position() + 
data.arrayOffset(), data.remaining(), encoding);
+} else {

Review Comment:
   exactly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] LinShunKang commented on a diff in pull request #12545: KIP-863: Reduce Fetcher#parseRecord() memory copy

2022-09-23 Thread GitBox


LinShunKang commented on code in PR #12545:
URL: https://github.com/apache/kafka/pull/12545#discussion_r978814376


##
clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java:
##
@@ -27,27 +30,45 @@
  *  value.deserializer.encoding or deserializer.encoding. The first two take 
precedence over the last.
  */
 public class StringDeserializer implements Deserializer {
+
 private String encoding = StandardCharsets.UTF_8.name();
 
 @Override
 public void configure(Map configs, boolean isKey) {
-String propertyName = isKey ? "key.deserializer.encoding" : 
"value.deserializer.encoding";
+final String propertyName = isKey ? "key.deserializer.encoding" : 
"value.deserializer.encoding";
 Object encodingValue = configs.get(propertyName);
-if (encodingValue == null)
+if (encodingValue == null) {
 encodingValue = configs.get("deserializer.encoding");
-if (encodingValue instanceof String)
+}
+
+if (encodingValue instanceof String) {
 encoding = (String) encodingValue;
+}
 }
 
 @Override
 public String deserialize(String topic, byte[] data) {
 try {
-if (data == null)
-return null;
-else
-return new String(data, encoding);
+return data == null ? null : new String(data, encoding);
 } catch (UnsupportedEncodingException e) {
 throw new SerializationException("Error when deserializing byte[] 
to string due to unsupported encoding " + encoding);
 }
 }
+
+@Override
+public String deserialize(String topic, Headers headers, ByteBuffer data) {
+if (data == null) {
+return null;
+}
+
+try {
+if (data.hasArray()) {
+return new String(data.array(), data.position() + 
data.arrayOffset(), data.remaining(), encoding);
+} else {

Review Comment:
   Did you mean?
   ```
   try {
   if (data.hasArray()) {
   return new String(data.array(), data.position() + 
data.arrayOffset(), data.remaining(), encoding);
   } 
   return new String(Utils.toArray(data), encoding);
   } catch (UnsupportedEncodingException e) {
   throw new SerializationException("Error when deserializing ByteBuffer to 
string due to unsupported encoding " + encoding);
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] niket-goel commented on a diff in pull request #12679: Adding KRaft Monitoring Related Metrics to docs/ops.html

2022-09-23 Thread GitBox


niket-goel commented on code in PR #12679:
URL: https://github.com/apache/kafka/pull/12679#discussion_r978811373


##
docs/ops.html:
##
@@ -1815,6 +1815,162 @@ KRaft Metrics
+All of the following metrics allow monitoring of the KRaft quourm and 
+
+  
+  
+Metric/Attribute name
+Description
+Mbean name
+  
+  
+Current State
+The current state of this member; possible values are leader, 
candidate, voted, follower, unattached.
+kafka.raft:type=raft-metrics,name=Current-state
+  
+  
+Current Leader
+The current quorum leader's id; -1 indicates unknown.
+kafka.raft:type=raft-metrics,name=current-leader
+  
+  
+Current Voted
+The current voted leader's id; -1 indicates not voted for anyone.
+kafka.raft:type=raft-metrics,name=current-vote
+  
+  
+Current Epoch
+The current quorum epoch.
+kafka.raft:type=raft-metrics,name=current-epoch
+  
+  
+High Watermark
+The high watermark maintained on this member; -1 if it is unknown.
+kafka.raft:type=raft-metrics,name=high-watermark
+  
+  
+Log End Offset
+The current raft log end offset.
+kafka.raft:type=raft-metrics,name=log-end-offset
+  
+  
+Election Latency
+The time in milliseconds to elect a new leader.
+kafka.raft:type=raft-metrics,name=election-latency
+  
+  
+Fetch Records
+The average number of records fetched from the leader of the raft 
quorum.
+kafka.raft:type=raft-metrics,name=fetch-records
+  
+  
+Append Records
+The average number of records appended per sec as the leader of the 
raft quorum.
+kafka.raft:type=raft-metrics,name=append-records
+  
+  
+Poll Idle Ratio
+The average fraction of time the client's poll() is idle as opposed to 
waiting for the user code to process records.
+kafka.raft:type=raft-metrics,name=poll-idle-ratio

Review Comment:
   ack! Will fix all metrics coming from this file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] niket-goel commented on a diff in pull request #12679: Adding KRaft Monitoring Related Metrics to docs/ops.html

2022-09-23 Thread GitBox


niket-goel commented on code in PR #12679:
URL: https://github.com/apache/kafka/pull/12679#discussion_r978811138


##
docs/ops.html:
##
@@ -1815,6 +1815,162 @@ KRaft Metrics
+All of the following metrics allow monitoring of the KRaft quourm and 
+
+  
+  
+Metric/Attribute name
+Description
+Mbean name
+  
+  
+Current State
+The current state of this member; possible values are leader, 
candidate, voted, follower, unattached.
+kafka.raft:type=raft-metrics,name=Current-state
+  
+  
+Current Leader
+The current quorum leader's id; -1 indicates unknown.
+kafka.raft:type=raft-metrics,name=current-leader
+  
+  
+Current Voted
+The current voted leader's id; -1 indicates not voted for anyone.
+kafka.raft:type=raft-metrics,name=current-vote
+  
+  
+Current Epoch
+The current quorum epoch.
+kafka.raft:type=raft-metrics,name=current-epoch
+  
+  
+High Watermark
+The high watermark maintained on this member; -1 if it is unknown.
+kafka.raft:type=raft-metrics,name=high-watermark
+  
+  
+Log End Offset
+The current raft log end offset.
+kafka.raft:type=raft-metrics,name=log-end-offset
+  
+  
+Election Latency
+The time in milliseconds to elect a new leader.
+kafka.raft:type=raft-metrics,name=election-latency
+  
+  
+Fetch Records
+The average number of records fetched from the leader of the raft 
quorum.
+kafka.raft:type=raft-metrics,name=fetch-records

Review Comment:
   The description in code says that it is. Let me verify the value and update 
both of needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] niket-goel commented on a diff in pull request #12679: Adding KRaft Monitoring Related Metrics to docs/ops.html

2022-09-23 Thread GitBox


niket-goel commented on code in PR #12679:
URL: https://github.com/apache/kafka/pull/12679#discussion_r978810724


##
docs/ops.html:
##
@@ -1815,6 +1815,162 @@ KRaft Metrics
+All of the following metrics allow monitoring of the KRaft quourm and 
+
+  
+  
+Metric/Attribute name
+Description
+Mbean name
+  
+  
+Current State
+The current state of this member; possible values are leader, 
candidate, voted, follower, unattached.
+kafka.raft:type=raft-metrics,name=Current-state
+  
+  
+Current Leader
+The current quorum leader's id; -1 indicates unknown.
+kafka.raft:type=raft-metrics,name=current-leader
+  
+  
+Current Voted
+The current voted leader's id; -1 indicates not voted for anyone.
+kafka.raft:type=raft-metrics,name=current-vote
+  
+  
+Current Epoch
+The current quorum epoch.
+kafka.raft:type=raft-metrics,name=current-epoch
+  
+  
+High Watermark
+The high watermark maintained on this member; -1 if it is unknown.
+kafka.raft:type=raft-metrics,name=high-watermark
+  
+  
+Log End Offset
+The current raft log end offset.
+kafka.raft:type=raft-metrics,name=log-end-offset
+  
+  
+Election Latency
+The time in milliseconds to elect a new leader.
+kafka.raft:type=raft-metrics,name=election-latency

Review Comment:
   I was trying to remain true to the code. I agree that it is not the best 
description. Let me fix both.
   
   RE: the avg and max, I think I made a mistake and picked up the sensor name 
instead of the metric name. This is true for another few metrics here. Will fix 
all.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] niket-goel commented on a diff in pull request #12679: Adding KRaft Monitoring Related Metrics to docs/ops.html

2022-09-23 Thread GitBox


niket-goel commented on code in PR #12679:
URL: https://github.com/apache/kafka/pull/12679#discussion_r978808028


##
docs/ops.html:
##
@@ -1815,6 +1815,162 @@ KRaft Metrics
+All of the following metrics allow monitoring of the KRaft quourm and 

Review Comment:
   Hmm! How did that get deleted? Fixing it. Thanks for catching.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #11285: KAFKA-10548: Implement topic deletion logic with the LeaderAndIsr in KIP-516

2022-09-23 Thread GitBox


C0urante commented on PR #11285:
URL: https://github.com/apache/kafka/pull/11285#issuecomment-1256309584

   @gitlw were you still planning on rebasing this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] aheyne commented on a diff in pull request #12644: KAFKA-14209 : Rewrite self joins to use single state store 2/3

2022-09-23 Thread GitBox


aheyne commented on code in PR #12644:
URL: https://github.com/apache/kafka/pull/12644#discussion_r978742684


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java:
##
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KStreamKStreamSelfJoin implements ProcessorSupplier {
+private static final Logger LOG = 
LoggerFactory.getLogger(KStreamKStreamSelfJoin.class);
+
+private final String windowName;
+private final long joinThisBeforeMs;
+private final long joinThisAfterMs;
+private final long joinOtherBeforeMs;
+private final long joinOtherAfterMs;
+private final ValueJoinerWithKey joinerThis;
+
+private final TimeTracker sharedTimeTracker;
+
+KStreamKStreamSelfJoin(
+final String windowName,
+final JoinWindowsInternal windows,
+final ValueJoinerWithKey joinerThis,
+final TimeTracker sharedTimeTracker) {
+
+this.windowName = windowName;
+this.joinThisBeforeMs = windows.beforeMs;
+this.joinThisAfterMs = windows.afterMs;
+this.joinOtherBeforeMs = windows.afterMs;
+this.joinOtherAfterMs = windows.beforeMs;
+this.joinerThis = joinerThis;
+this.sharedTimeTracker = sharedTimeTracker;
+}
+
+@Override
+public Processor get() {
+return new KStreamKStreamSelfJoinProcessor();
+}
+
+private class KStreamKStreamSelfJoinProcessor extends 
ContextualProcessor {
+private WindowStore windowStore;
+private Sensor droppedRecordsSensor;
+
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+
+final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
+droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
+windowStore = context.getStateStore(windowName);
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public void process(final Record record) {
+if (StreamStreamJoinUtil.skipRecord(record, LOG, 
droppedRecordsSensor, context())) {
+return;
+}
+
+final long inputRecordTimestamp = record.timestamp();
+long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinThisBeforeMs);
+long timeTo = Math.max(0L, inputRecordTimestamp + joinThisAfterMs);
+boolean emittedJoinWithSelf = false;
+final Record selfRecord = record
+.withValue(joinerThis.apply(record.key(), record.value(), (V2) 
record.value()))
+.withTimestamp(inputRecordTimestamp);
+sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
+
+// Join current record with other
+try (final WindowStoreIterator iter = windowStore.fetch(
+record.key(), timeFrom, timeTo)) {

Review Comment:
   Yes, definitely. Technically I think we're doing a Self Cartesian (Cross) 
Join. These two points should let us do that: 
   
   > - for self joins there’s usually a filter along with the join (like in SQL 
CREATE stream2 as SELECT .. FROM stream1 A, stream1 B WHERE A.field 

[GitHub] [kafka] mumrah commented on a diff in pull request #12678: MINOR: update configuration.html with KRaft details

2022-09-23 Thread GitBox


mumrah commented on code in PR #12678:
URL: https://github.com/apache/kafka/pull/12678#discussion_r978741172


##
docs/configuration.html:
##
@@ -20,13 +20,22 @@
 
   3.1 Broker Configs
 
-  The essential configurations are the following:
+  For ZooKeeper clusters, brokers must have the following configuration:
   
-  broker.id
-  log.dirs
-  zookeeper.connect
+broker.id
+log.dirs
+zookeeper.connect
   
 
+  For KRaft clusters, brokers and controllers must have the following 
configurations:
+  
+node.id
+log.dirs
+process.roles
+  
+
+  On KRaft brokers, if broker.id is set, it must be equal to 
node.id.

Review Comment:
   The docs for the individual configs are included via `  `. We dynamically generate those from 
KafkaConfig.scala 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #11792: Replace EasyMock/PowerMock with Mockito in DistributedHerderTest

2022-09-23 Thread GitBox


C0urante commented on PR #11792:
URL: https://github.com/apache/kafka/pull/11792#issuecomment-1256282722

   Hi @dplavcic! Any update? The refactoring to the `DistributedHerderTest` is 
blocking a few other PRs; if you don't have the time to tackle it, let us know 
and I'm sure someone else would be happy to take over.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #12677: KAFKA-14132: Replace PowerMock/Easymock with Mockito for WorkerMetricsGroupTest

2022-09-23 Thread GitBox


C0urante commented on code in PR #12677:
URL: https://github.com/apache/kafka/pull/12677#discussion_r978651670


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerMetricsGroupTest.java:
##
@@ -55,24 +54,19 @@ public class WorkerMetricsGroupTest {
 private Sensor taskStartupSuccesses;
 private Sensor taskStartupFailures;
 
-private ConnectorStatus.Listener delegateConnectorListener;
-private TaskStatus.Listener delegateTaskListener;
+@Mock private ConnectorStatus.Listener delegateConnectorListener;
+@Mock private TaskStatus.Listener delegateTaskListener;
 
 @Before
 public void setup() {
-connectMetrics = PowerMock.createMock(ConnectMetrics.class);
-ConnectMetricsRegistry connectMetricsRegistry = 
PowerMock.createNiceMock(ConnectMetricsRegistry.class);
-ConnectMetrics.MetricGroup metricGroup = 
PowerMock.createNiceMock(ConnectMetrics.MetricGroup.class);
+ConnectMetricsRegistry connectMetricsRegistry = 
mock(ConnectMetricsRegistry.class);
+ConnectMetrics.MetricGroup metricGroup = 
mock(ConnectMetrics.MetricGroup.class);
+MetricName metricName = mock(MetricName.class);

Review Comment:
   Any reason not to use `@Mock` to initialize these?



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerMetricsGroupTest.java:
##
@@ -55,24 +54,19 @@ public class WorkerMetricsGroupTest {
 private Sensor taskStartupSuccesses;
 private Sensor taskStartupFailures;
 
-private ConnectorStatus.Listener delegateConnectorListener;
-private TaskStatus.Listener delegateTaskListener;
+@Mock private ConnectorStatus.Listener delegateConnectorListener;
+@Mock private TaskStatus.Listener delegateTaskListener;
 
 @Before
 public void setup() {
-connectMetrics = PowerMock.createMock(ConnectMetrics.class);
-ConnectMetricsRegistry connectMetricsRegistry = 
PowerMock.createNiceMock(ConnectMetricsRegistry.class);
-ConnectMetrics.MetricGroup metricGroup = 
PowerMock.createNiceMock(ConnectMetrics.MetricGroup.class);
+ConnectMetricsRegistry connectMetricsRegistry = 
mock(ConnectMetricsRegistry.class);
+ConnectMetrics.MetricGroup metricGroup = 
mock(ConnectMetrics.MetricGroup.class);
+MetricName metricName = mock(MetricName.class);
 
-connectMetrics.registry();
-expectLastCall().andReturn(connectMetricsRegistry);
-
-connectMetrics.group(anyString());
-expectLastCall().andReturn(metricGroup);
-
-MetricName metricName = PowerMock.createMock(MetricName.class);
-metricGroup.metricName(anyObject(MetricNameTemplate.class));
-expectLastCall().andStubReturn(metricName);
+when(metricGroup.metricName((MetricNameTemplate) 
isNull())).thenReturn(metricName);

Review Comment:
   The 
[Javadocs](https://github.com/apache/kafka/blob/8e43548175db086cbedf1b990e17c80dc438d55e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java#L270-L276)
 for this method specifically prohibit invoking it with a null argument:
   
   ```java
   /**
* Create the name of a metric that belongs to this group and has the 
group's tags.
*
* @param template the name template for the metric; may not be null
* @return the metric name; never null
* @throws IllegalArgumentException if the name is not valid
*/
   public MetricName metricName(MetricNameTemplate template) {
   ```
   
   I can see how the way that we use the registry in the `WorkerMetricsGroup` 
class at places like 
[this](https://github.com/apache/kafka/blob/8e43548175db086cbedf1b990e17c80dc438d55e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerMetricsGroup.java#L45)
 makes it difficult to mock out this part, and it's probably not worth a large 
refactoring to make mocking possible. Could we add a small comment explaining 
why we have this expectation, though? Maybe something like:
   
   ```java
   // We don't expect this to be invoked with null in practice,
   // but it's easier to test this way, and should have no impact
   // on the efficacy of these tests
   ```



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerMetricsGroupTest.java:
##
@@ -83,167 +77,107 @@ public void setup() {
 taskStartupAttempts = mockSensor(metricGroup, "task-startup-attempts");
 taskStartupSuccesses = mockSensor(metricGroup, 
"task-startup-successes");
 taskStartupFailures = mockSensor(metricGroup, "task-startup-failures");
-
-delegateConnectorListener = PowerMock.createStrictMock(ConnectorStatus.Listener.class);
-delegateTaskListener = PowerMock.createStrictMock(TaskStatus.Listener.class);
 }
 
 private Sensor mockSensor(ConnectMetrics.MetricGroup metricGroup, String name) {
-Sensor sensor = PowerMock.createMock(Sensor.class);
-metricGroup.sensor(eq(name));
-

[GitHub] [kafka] ijuma commented on pull request #12674: KAFKA-14255: Fetching from follower should be disallowed if fetch from follower is disabled

2022-09-23 Thread GitBox


ijuma commented on PR #12674:
URL: https://github.com/apache/kafka/pull/12674#issuecomment-1256201486

   This change makes sense. I guess the other interesting case is if fetch from 
follower _is_ enabled on the broker, but the client is not sending the epoch 
(KIP-320). What should we do in that case? Is it safe to let it through? We may 
need another JIRA to discuss that case, which is not as clear, but still 
potentially problematic.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

2022-09-23 Thread GitBox


vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r978581355


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java:
##
@@ -517,13 +517,13 @@ public void testTaskAssignmentWhenWorkerBounces() {
 leaderAssignment = deserializeAssignment(result, leaderId);
 assertAssignment(leaderId, offset,
 Collections.emptyList(), 0,
-Collections.emptyList(), 0,
+Collections.emptyList(), 1,

Review Comment:
   Thanks Chris/Luke. I have reverted the `canRevoke` flag commit. Also, Chris, I looked at your PR and it's closed but not merged. I see some changes, especially =>
   ```
   Perform load-balancing revocations any time the cluster appears imbalanced and there are still connectors and tasks that can be revoked from workers, instead of only when the number of workers in the cluster changes
   ```
   which sounds interesting (and I remember discussing this on the ticket). Also, let me know if there's anything I can contribute to the other PR (from your comment it appeared to me that you would be rebasing the commits from that closed PR, hence the question).
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

2022-09-23 Thread GitBox


vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r978581355


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java:
##
@@ -517,13 +517,13 @@ public void testTaskAssignmentWhenWorkerBounces() {
 leaderAssignment = deserializeAssignment(result, leaderId);
 assertAssignment(leaderId, offset,
 Collections.emptyList(), 0,
-Collections.emptyList(), 0,
+Collections.emptyList(), 1,

Review Comment:
   Thanks Chris/Luke. Sure, let me revert the `canRevoke` flag. Also, Chris, I looked at your PR and it's closed but not merged. I see some changes, especially =>
   ```
   Perform load-balancing revocations any time the cluster appears imbalanced and there are still connectors and tasks that can be revoked from workers, instead of only when the number of workers in the cluster changes
   ```
   which sounds interesting (and I remember discussing this on the ticket). Also, let me know if there's anything I can contribute to the other PR (from your comment it appeared to me that you would be rebasing the commits from that closed PR, hence the question).
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

2022-09-23 Thread GitBox


vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r978581355


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java:
##
@@ -517,13 +517,13 @@ public void testTaskAssignmentWhenWorkerBounces() {
 leaderAssignment = deserializeAssignment(result, leaderId);
 assertAssignment(leaderId, offset,
 Collections.emptyList(), 0,
-Collections.emptyList(), 0,
+Collections.emptyList(), 1,

Review Comment:
   Thanks Chris/Luke. Sure, let me revert the `canRevoke` flag. Also, Chris, I looked at your PR and it's closed but not merged. I see some changes, especially =>
   ```
   Perform load-balancing revocations any time the cluster appears imbalanced and there are still connectors and tasks that can be revoked from workers, instead of only when the number of workers in the cluster changes
   ```
   which sounds interesting (and I remember discussing this on the ticket). Also, let me know if there's anything I can contribute to the other PR (from your comment it appeared to me that you would be rebasing the commits from that closed PR, hence the question).
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

2022-09-23 Thread GitBox


vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r978581355


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java:
##
@@ -517,13 +517,13 @@ public void testTaskAssignmentWhenWorkerBounces() {
 leaderAssignment = deserializeAssignment(result, leaderId);
 assertAssignment(leaderId, offset,
 Collections.emptyList(), 0,
-Collections.emptyList(), 0,
+Collections.emptyList(), 1,

Review Comment:
   Thanks Chris/Luke. Sure, let me revert the `canRevoke` flag, but that would mean `testTaskAssignmentWhenWorkerBounces` would fail on this PR. Also, Chris, I looked at your PR and it's closed but not merged. I see some radical changes, especially =>
   ```
   Perform load-balancing revocations any time the cluster appears imbalanced and there are still connectors and tasks that can be revoked from workers, instead of only when the number of workers in the cluster changes
   ```
   which sounds interesting (and I remember discussing this on the ticket). Also, let me know if there's anything I can contribute to the other PR (from your comment it appeared to me that you would be rebasing the commits from that closed PR, hence the question).
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

2022-09-23 Thread GitBox


showuon commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r978567165


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java:
##
@@ -517,13 +517,13 @@ public void testTaskAssignmentWhenWorkerBounces() {
 leaderAssignment = deserializeAssignment(result, leaderId);
 assertAssignment(leaderId, offset,
 Collections.emptyList(), 0,
-Collections.emptyList(), 0,
+Collections.emptyList(), 1,

Review Comment:
   Wow, nice catch!
   
   > At this point, I think we may want to split this into two separate PRs 
that get merged together. We can revert the canRevoke flag from this one, and 
then add a downstream PR that fixes how we calculate task-balancing revocations 
in tricky situations like when lost or newly-created tasks have just been 
assigned. That should fully address cases like the one tested 
[here](https://github.com/apache/kafka/blob/cda5da9b65f78b51cdfe5371f712a0d392dbdb4d/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java#L427).
   
   Agree! Let's revert the canRevoke flag and then deal with tricky cases in a 
follow-up PR. Thank you.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12669: KAFKA-14248: Fix flaky test PlaintextAdminIntegrationTest.testCreateTopicsReturnsConfigs

2022-09-23 Thread GitBox


divijvaidya commented on code in PR #12669:
URL: https://github.com/apache/kafka/pull/12669#discussion_r978518711


##
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##
@@ -2523,19 +2522,29 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
   def testCreateTopicsReturnsConfigs(quorum: String): Unit = {
 client = Admin.create(super.createConfig)
 
-val alterMap = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]
-alterMap.put(new ConfigResource(ConfigResource.Type.BROKER, ""), util.Arrays.asList(
-  new AlterConfigOp(new ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp, "1080"), OpType.SET)))
-(brokers.map(_.config) ++ controllerServers.map(_.config)).foreach { case config =>
-  alterMap.put(new ConfigResource(ConfigResource.Type.BROKER, config.nodeId.toString()),
-util.Arrays.asList(new AlterConfigOp(new ConfigEntry(
-  KafkaConfig.LogCleanerDeleteRetentionMsProp, "34"), OpType.SET)))
+val newLogRetentionProperties = new Properties
+newLogRetentionProperties.put(KafkaConfig.LogRetentionTimeMillisProp, "1080")
+TestUtils.incrementalAlterConfigs(null, client, newLogRetentionProperties, perBrokerConfig = false)
+  .all().get(15, TimeUnit.SECONDS)
+
+val newLogCleanerDeleteRetention = new Properties
+newLogCleanerDeleteRetention.put(KafkaConfig.LogCleanerDeleteRetentionMsProp, "34")
+TestUtils.incrementalAlterConfigs(brokers, client, newLogCleanerDeleteRetention, perBrokerConfig = true)
+  .all().get(15, TimeUnit.SECONDS)
+
+if (isKRaftTest()) {
+  ensureConsistentKRaftMetadata()
+} else {
+  waitUntilTrue(() => brokers.forall(_.config.originals.getOrDefault(

Review Comment:
   I am resolving this comment. Please let me know if I missed changing 
something here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12669: KAFKA-14248: Fix flaky test PlaintextAdminIntegrationTest.testCreateTopicsReturnsConfigs

2022-09-23 Thread GitBox


divijvaidya commented on code in PR #12669:
URL: https://github.com/apache/kafka/pull/12669#discussion_r978518387


##
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##
@@ -169,8 +168,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
 waitForTopics(client, expectedPresent = topics, expectedMissing = List())
 
-val controller = brokers.find(_.config.brokerId == brokers.flatMap(_.metadataCache.getControllerId).head).get
-controller.shutdown()
-controller.awaitShutdown()
+killBroker(controller.config.brokerId)

Review Comment:
   > We have already verified metadata propagation in the waitForTopics above
   
   I think this assumption is not correct. `waitForTopics` sends a `metadata` API call to a broker (not necessarily the controller). In such a situation, it is not guaranteed that the broker will have the latest metadata in its metadata cache (e.g. maybe the broker listener has not yet updated its metadata cache from ZooKeeper). Am I right in saying this?
   
   >  I feel like we could also just delete the test since it is probably 
covered by every integration test which kills the controller.
   
   The test is meant to validate that the metadata is durable even when the controller is replaced. Other tests in this class that use `killBroker` do not necessarily kill the controller, so I believe there is value in keeping this test here.
   
   I agree that we can change the name of the test. I have renamed it to 
`testMetadataDurabilityOnControllerFailure`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12644: KAFKA-14209 : Rewrite self joins to use single state store 2/3

2022-09-23 Thread GitBox


vpapavas commented on code in PR #12644:
URL: https://github.com/apache/kafka/pull/12644#discussion_r978508669


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java:
##
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KStreamKStreamSelfJoin implements ProcessorSupplier {
+private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamSelfJoin.class);
+
+private final String windowName;
+private final long joinThisBeforeMs;
+private final long joinThisAfterMs;
+private final long joinOtherBeforeMs;
+private final long joinOtherAfterMs;
+private final ValueJoinerWithKey joinerThis;
+
+private final TimeTracker sharedTimeTracker;
+
+KStreamKStreamSelfJoin(
+final String windowName,
+final JoinWindowsInternal windows,
+final ValueJoinerWithKey joinerThis,
+final TimeTracker sharedTimeTracker) {
+
+this.windowName = windowName;
+this.joinThisBeforeMs = windows.beforeMs;
+this.joinThisAfterMs = windows.afterMs;
+this.joinOtherBeforeMs = windows.afterMs;
+this.joinOtherAfterMs = windows.beforeMs;
+this.joinerThis = joinerThis;
+this.sharedTimeTracker = sharedTimeTracker;
+}
+
+@Override
+public Processor get() {
+return new KStreamKStreamSelfJoinProcessor();
+}
+
+private class KStreamKStreamSelfJoinProcessor extends ContextualProcessor {
+private WindowStore windowStore;
+private Sensor droppedRecordsSensor;
+
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+
+final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
+droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
+windowStore = context.getStateStore(windowName);
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public void process(final Record record) {
+if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) {
+return;
+}
+
+final long inputRecordTimestamp = record.timestamp();
+long timeFrom = Math.max(0L, inputRecordTimestamp - joinThisBeforeMs);
+long timeTo = Math.max(0L, inputRecordTimestamp + joinThisAfterMs);
+boolean emittedJoinWithSelf = false;
+final Record selfRecord = record
+.withValue(joinerThis.apply(record.key(), record.value(), (V2) record.value()))
+.withTimestamp(inputRecordTimestamp);
+sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
+
+// Join current record with other
+try (final WindowStoreIterator iter = windowStore.fetch(
+record.key(), timeFrom, timeTo)) {

Review Comment:
   Hi @aheyne ! You mean sacrificing either of the sides would work for your 
use case? I have opened the follow-up ticket to optimize the performance of the 
self-join if the ordering of the records is not important. Would that work for 
you? 

[GitHub] [kafka] divijvaidya commented on pull request #12677: KAFKA-14132: Replace PowerMock/Easymock with Mockito for WorkerMetricsGroupTest

2022-09-23 Thread GitBox


divijvaidya commented on PR #12677:
URL: https://github.com/apache/kafka/pull/12677#issuecomment-1255925164

   @mimaison please review this when you get an opportunity. Thank you!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lucasbru opened a new pull request, #12680: [DO NOT MERGE] KAFKA-12689: Remove deprecated EOS configs

2022-09-23 Thread GitBox


lucasbru opened a new pull request, #12680:
URL: https://github.com/apache/kafka/pull/12680

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lucasbru closed pull request #12646: KAFKA-12689: Remove exactly_once / exactly_once_beta

2022-09-23 Thread GitBox


lucasbru closed pull request #12646: KAFKA-12689: Remove exactly_once / 
exactly_once_beta
URL: https://github.com/apache/kafka/pull/12646


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13866) Support more advanced time retention policies

2022-09-23 Thread Nikolay Izhikov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17608609#comment-17608609
 ] 

Nikolay Izhikov commented on KAFKA-13866:
-

Hello [~mjsax], can you please share your feedback on the KIP?

https://cwiki.apache.org/confluence/display/KAFKA/KIP-870%3A+Retention+policy+based+on+record+event+time

https://lists.apache.org/thread/9njcnjd231l3l7xv121os99m3f7gggb3

> Support more advanced time retention policies
> -
>
> Key: KAFKA-13866
> URL: https://issues.apache.org/jira/browse/KAFKA-13866
> Project: Kafka
>  Issue Type: Improvement
>  Components: config, core, log cleaner
>Reporter: Matthias J. Sax
>Assignee: Nikolay Izhikov
>Priority: Major
>  Labels: needs-kip
>
> Time-based retention policy compares the record timestamp to broker 
> wall-clock time. Those semantics are questionable and also lead to issues for 
> data reprocessing: if one wants to re-process data older than the retention 
> time, it's not possible, as the broker expires those records aggressively and 
> the user needs to increase the retention time accordingly.
> Especially for Kafka Streams, we have seen many cases where users got bitten 
> by the current behavior.
> It would be best if Kafka tracked _two_ timestamps per record: the record 
> event-time (as the brokers do currently), plus the log append-time (which is 
> currently only tracked if the topic is configured with "append-time" 
> tracking, but the issue is that this overwrites the producer-provided record 
> event-time).
> Tracking both timestamps would allow setting a pure wall-clock time retention 
> policy plus a pure event-time retention policy:
>  * Wall-clock time: keep the data (at least) X days after writing
>  * Event-time: keep (at most) X days' worth of event-time data
> Comparing wall-clock time to wall-clock time and event-time to event-time 
> provides much cleaner semantics. The idea is to combine both policies and 
> only expire data if both policies trigger.
> For the event-time policy, the broker would need to track "stream time" as 
> the max event-timestamp it has seen per partition (similar to how Kafka 
> Streams tracks "stream time" client side).
> Note the difference between "at least" and "at most" above: for the 
> data-reprocessing case, the max-based event-time policy avoids the broker 
> keeping a huge history for the reprocessing case.
> It would be part of the KIP discussion to work out the details of how 
> wall-clock/event-time and min/max policies could be combined. For example, it 
> might also be useful to have the following policy: keep at least X days' 
> worth of event-time history no matter how long the data has already been 
> stored (i.e., there would only be an event-time based expiration but no 
> wall-clock time one). It could also be combined with a wall-clock time 
> expiration: delete data only after it's at least X days old and has been 
> stored for at least Y days.
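
A minimal sketch of the combined expiration check described above (purely illustrative, not the actual broker implementation; all names here are hypothetical):

```java
// Hypothetical sketch: a log segment only becomes eligible for deletion when
// BOTH the wall-clock policy and the event-time policy agree. Assumes the
// broker tracks per-partition "stream time" (the max event timestamp seen so far).
final class CombinedRetentionPolicy {
    static boolean shouldExpire(long nowMs,
                                long segmentAppendTimeMs,
                                long partitionStreamTimeMs,
                                long segmentMaxEventTimeMs,
                                long wallClockRetentionMs,
                                long eventTimeRetentionMs) {
        // "keep the data (at least) X days after writing"
        boolean wallClockExpired = nowMs - segmentAppendTimeMs > wallClockRetentionMs;
        // "keep (at most) X days' worth of event-time data"
        boolean eventTimeExpired = partitionStreamTimeMs - segmentMaxEventTimeMs > eventTimeRetentionMs;
        return wallClockExpired && eventTimeExpired;
    }
}
```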



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-13725) KIP-768 OAuth code mixes public and internal classes in same package

2022-09-23 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13725?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar updated KAFKA-13725:
--
Fix Version/s: 3.4.0
Affects Version/s: 3.3.0

> KIP-768 OAuth code mixes public and internal classes in same package
> 
>
> Key: KAFKA-13725
> URL: https://issues.apache.org/jira/browse/KAFKA-13725
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, security
>Affects Versions: 3.1.0, 3.2.0, 3.1.1, 3.3.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
> Fix For: 3.4.0
>
>
> The {{org.apache.kafka.common.security.oauthbearer.secured}} package from 
> KIP-768 incorrectly mixed all of the classes (public and internal) in the 
> package together.
> This bug is to remove all but the public classes from that package and move 
> the rest to a new 
> {{org.apache.kafka.common.security.oauthbearer.internal.secured}} 
> package. This should be back-ported to all versions in which the KIP-768 
> OAuth work occurs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] omkreddy merged pull request #12039: KAFKA-13725: KIP-768 OAuth code mixes public and internal classes in same package

2022-09-23 Thread GitBox


omkreddy merged PR #12039:
URL: https://github.com/apache/kafka/pull/12039


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ahmedsobeh commented on a diff in pull request #12670: KAFKA-14239: Merge StateRestorationIntegrationTest into RestoreIntegrationTest

2022-09-23 Thread GitBox


ahmedsobeh commented on code in PR #12670:
URL: https://github.com/apache/kafka/pull/12670#discussion_r978335459


##
streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java:
##
@@ -97,11 +102,13 @@ public class RestoreIntegrationTest {
 
 public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS);
 
+

Review Comment:
   Used a formatter that seems to have added a couple of extra lines. Removed them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac merged pull request #12643: KAFKA-14097: make producer ID expiration a dynamic config

2022-09-23 Thread GitBox


dajac merged PR #12643:
URL: https://github.com/apache/kafka/pull/12643


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #12674: KAFKA-14255: Fetching from follower should be disallowed if fetch from follower is disabled

2022-09-23 Thread GitBox


showuon commented on code in PR #12674:
URL: https://github.com/apache/kafka/pull/12674#discussion_r978329491


##
core/src/test/scala/unit/kafka/server/FetchRequestTest.scala:
##
@@ -285,8 +315,8 @@ class FetchRequestTest extends BaseFetchRequestTest {
 
 // Check follower error codes
 val followerId = TestUtils.findFollowerId(topicPartition, servers)
-assertResponseErrorForEpoch(Errors.NONE, followerId, Optional.empty())
-assertResponseErrorForEpoch(Errors.NONE, followerId, Optional.of(secondLeaderEpoch))
+assertResponseErrorForEpoch(Errors.NOT_LEADER_OR_FOLLOWER, followerId, Optional.empty())
+assertResponseErrorForEpoch(Errors.NOT_LEADER_OR_FOLLOWER, followerId, Optional.of(secondLeaderEpoch))

Review Comment:
   Sounds good to me! Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-14043) The latest status of the task is overwritten by the old status

2022-09-23 Thread doupengwei (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17606402#comment-17606402
 ] 

doupengwei edited comment on KAFKA-14043 at 9/23/22 7:01 AM:
-

For example: the pg connector will try to release its slot and publication resources, but 
due to a network issue this will fail, and the worker will try to update the task status to failed.

So I think we can add generation information to the task status, so that after 
the network issue is resolved, the old status info can be filtered out by checking 
the generation.
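
A minimal sketch of the generation-based filtering this suggests (illustrative only; the class and field names are hypothetical, not Kafka Connect's actual API):

```java
// Hypothetical status store that drops updates written under an older
// rebalance generation, so a stale "failed" status cannot overwrite a
// newer status produced after the network issue is resolved.
final class GenerationAwareStatusStore {
    private long generation = -1;
    private String state = "UNASSIGNED";

    synchronized void maybeUpdate(long updateGeneration, String updateState) {
        if (updateGeneration < generation) {
            return; // stale update from before the latest rebalance; ignore it
        }
        generation = updateGeneration;
        state = updateState;
    }
}
```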


was (Author: doudou):
Exp: for pg connector will try to release slot and publication resources, but 
due to net work issue. it will failed and try to update task status to failed.

So i think we can add a generation information for task status, so that after 
net work issue resolved. the old status info can be filtered by check 
generation.

 

Hello Ismael Juma,  sorry to bother u,  can u  check my point. @[~ijuma] 

 

> The latest status of the task is overwritten by the old status
> --
>
> Key: KAFKA-14043
> URL: https://issues.apache.org/jira/browse/KAFKA-14043
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.0
> Environment: Centos 7.6
> kafka  2.4.0
>Reporter: doupengwei
>Priority: Major
>
> Kafka version : 2.4.0
> connect.protocol : compatible
> In a Kafka Connect cluster, if one node faces a network issue, the worker will 
> try to stop the connectors and tasks running on that node. Due to the network 
> issue, the stop will fail and throw an exception, and the worker process will 
> then try to update the task or connector status; the producer will retry 
> indefinitely until the status is successfully sent. Meanwhile, because of the 
> network issue, the new assignment has already been performed on the other 
> Connect nodes. After the network recovers, the old task status will overwrite 
> the latest status.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac commented on a diff in pull request #12674: KAFKA-14255: Fetching from follower should be disallowed if fetch from follower is disabled

2022-09-23 Thread GitBox


dajac commented on code in PR #12674:
URL: https://github.com/apache/kafka/pull/12674#discussion_r978308464


##
core/src/test/scala/unit/kafka/server/FetchRequestTest.scala:
##
@@ -285,8 +315,8 @@ class FetchRequestTest extends BaseFetchRequestTest {
 // Check follower error codes
 val followerId = TestUtils.findFollowerId(topicPartition, servers)
-assertResponseErrorForEpoch(Errors.NONE, followerId, Optional.empty())
-assertResponseErrorForEpoch(Errors.NONE, followerId, 
Optional.of(secondLeaderEpoch))
+assertResponseErrorForEpoch(Errors.NOT_LEADER_OR_FOLLOWER, followerId, 
Optional.empty())
+assertResponseErrorForEpoch(Errors.NOT_LEADER_OR_FOLLOWER, followerId, 
Optional.of(secondLeaderEpoch))

Review Comment:
   Yeah, I was thinking about this as well. The point is that this is the 
correct and expected behavior in this case so I am inclined to keep it as it is 
now. However, it would be great to have a test suite with a replica selector 
enabled. I can do this as a follow-up.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org