[kafka] branch trunk updated: MINOR: Use mock time in DefaultStateUpdaterTest (#12344)

2022-06-29 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 3faa6cf6d0 MINOR: Use mock time in DefaultStateUpdaterTest (#12344)
3faa6cf6d0 is described below

commit 3faa6cf6d060887288fcf68adb8c3f1e2090b8ed
Author: Guozhang Wang 
AuthorDate: Wed Jun 29 12:33:00 2022 -0700

MINOR: Use mock time in DefaultStateUpdaterTest (#12344)

For most tests we need an auto-ticking mock timer so that the
draining-with-timeout functions make progress.
For tests that verify a checkpoint never happens, we need a timer without
auto-ticking so we can control exactly how much time elapses.
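
(Not part of the commit: a minimal sketch of the two timer modes, with
invented names.)

import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;

class MockTimeSketch {
    void twoModes() {
        // Auto-ticking: every read of the clock advances it by 1 ms, so loops
        // that poll elapsed time (e.g. drain-with-timeout) eventually time out.
        Time autoTicking = new MockTime(1L);
        autoTicking.milliseconds(); // each call ticks the clock forward

        // No auto-tick: the clock moves only when the test advances it, so a
        // test can assert that no checkpoint happens before the commit interval.
        MockTime manual = new MockTime();
        manual.sleep(100L); // advance exactly 100 ms, deterministically
    }
}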

Reviewers: Bruno Cadonna 
---
 .../processor/internals/DefaultStateUpdater.java   |  5 ++--
 .../internals/DefaultStateUpdaterTest.java | 34 --
 2 files changed, 22 insertions(+), 17 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index 0e84574c5c..886a37b314 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -356,8 +356,6 @@ public class DefaultStateUpdater implements StateUpdater {
 this.offsetResetter = offsetResetter;
 this.time = time;
 this.commitIntervalMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
-// initialize the last commit as of now to prevent first commit happens immediately
-this.lastCommitMs = time.milliseconds();
 }
 
 public void start() {
@@ -365,6 +363,9 @@ public class DefaultStateUpdater implements StateUpdater {
 stateUpdaterThread = new StateUpdaterThread("state-updater", changelogReader, offsetResetter);
 stateUpdaterThread.start();
 shutdownGate = new CountDownLatch(1);
+
+// initialize the last commit as of now to prevent first commit happens immediately
+this.lastCommitMs = time.milliseconds();
 }
 }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
index 8bd81828f6..5e2d90de71 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -44,7 +45,6 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
 import static org.apache.kafka.common.utils.Utils.mkSet;
-import static org.apache.kafka.common.utils.Utils.sleep;
 import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.easymock.EasyMock.anyBoolean;
@@ -79,24 +79,25 @@ class DefaultStateUpdaterTest {
 private final static TaskId TASK_1_0 = new TaskId(1, 0);
 private final static TaskId TASK_1_1 = new TaskId(1, 1);
 
-private final StreamsConfig config = new StreamsConfig(configProps(COMMIT_INTERVAL));
+// need an auto-tick timer to work for draining with timeout
+private final Time time = new MockTime(1L);
+private final StreamsConfig config = new StreamsConfig(configProps());
 private final ChangelogReader changelogReader = mock(ChangelogReader.class);
 private final java.util.function.Consumer<Set<TopicPartition>> offsetResetter = topicPartitions -> { };
-
-private DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, offsetResetter, Time.SYSTEM);
+private final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, offsetResetter, time);
 
 @AfterEach
 public void tearDown() {
 stateUpdater.shutdown(Duration.ofMinutes(1));
 }
 
-private Properties configProps(final int commitInterval) {
+private Properties configProps() {
 return mkObjectProperties(mkMap(
 mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
 mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"),
 

[kafka] branch trunk updated: [9/N][Emit final] Emit final for session window aggregations (#12204)

2022-06-29 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new ababc4261b [9/N][Emit final] Emit final for session window aggregations (#12204)
ababc4261b is described below

commit ababc4261bfa03ee9d29ae7254ddd0ba988f826d
Author: Guozhang Wang 
AuthorDate: Wed Jun 29 09:22:37 2022 -0700

[9/N][Emit final] Emit final for session window aggregations (#12204)

* Add a new API for session windows to range-query session windows by end time (KIP related).
* Augment the session window aggregator with an emit strategy (see the usage sketch after this list).
* Minor: consolidate some duplicated classes.
* Test: add unit tests for the session window aggregator.
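
For context, a rough usage sketch of emit-final on a session-windowed
aggregation (topic name and durations are invented, not taken from this
commit):

import java.time.Duration;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.SessionWindows;

class EmitFinalSketch {
    void build() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("input-topic"); // illustrative topic

        // Emit a single final result per session once its grace period has
        // passed, instead of an update for every incoming record.
        input.groupByKey()
             .windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1)))
             .emitStrategy(EmitStrategy.onWindowClose())
             .count();
    }
}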

Reviewers: Guozhang Wang 
---
 .../streams/kstream/SessionWindowedKStream.java|   5 +-
 .../kafka/streams/kstream/TimeWindowedKStream.java |   1 +
 ...bstractKStreamTimeWindowAggregateProcessor.java |  11 +-
 .../internals/CogroupedStreamAggregateBuilder.java |   2 +
 .../internals/KStreamSessionWindowAggregate.java   | 272 -
 .../kstream/internals/KStreamWindowAggregate.java  |   7 -
 .../kstream/internals/SessionTupleForwarder.java   |  56 -
 .../internals/SessionWindowedKStreamImpl.java  |  29 ++-
 .../kstream/internals/TimeWindowedKStreamImpl.java |  55 +++--
 .../internals/TimestampedTupleForwarder.java   |   3 +-
 .../internals/AbstractReadWriteDecorator.java  |   6 +
 .../apache/kafka/streams/state/SessionStore.java   |  13 +
 ...tractRocksDBTimeOrderedSegmentedBytesStore.java |   6 +-
 .../internals/ChangeLoggingSessionBytesStore.java  |  12 +-
 .../state/internals/InMemorySessionStore.java  |  21 +-
 .../state/internals/MeteredSessionStore.java   |  12 +
 .../state/internals/PrefixedSessionKeySchemas.java |  13 +-
 ...cksDBTimeOrderedSessionSegmentedBytesStore.java |  33 ++-
 .../internals/RocksDBTimeOrderedSessionStore.java  |   7 +
 .../streams/state/internals/SegmentIterator.java   |   2 +-
 .../state/internals/SegmentedBytesStore.java   |   4 +-
 .../streams/state/internals/SessionKeySchema.java  |   2 +-
 ...KStreamSessionWindowAggregateProcessorTest.java | 219 -
 .../internals/KStreamWindowAggregateTest.java  |   2 +-
 .../internals/SessionTupleForwarderTest.java   | 108 
 .../internals/SessionWindowedKStreamImplTest.java  | 171 +
 .../internals/TimeWindowedKStreamImplTest.java |   2 +-
 .../internals/graph/GraphGraceSearchUtilTest.java  |   8 +-
 28 files changed, 676 insertions(+), 406 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
index c561b62abf..fe897515a9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
@@ -39,7 +39,7 @@ import java.time.Duration;
 * materialized view) that can be queried using the name provided in the {@link Materialized} instance.
 * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
 * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
- * New events are added to sessions until their grace period ends (see {@link SessionWindows#grace(Duration)}).
+ * New events are added to sessions until their grace period ends (see {@link SessionWindows#ofInactivityGapAndGrace(Duration, Duration)}).
  * 
 * A {@code SessionWindowedKStream} must be obtained from a {@link KGroupedStream} via
 * {@link KGroupedStream#windowedBy(SessionWindows)}.
@@ -643,4 +643,7 @@ public interface SessionWindowedKStream<K, V> {
 KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                               final Named named,
                               final Materialized<K, V, SessionStore<Bytes, byte[]>> materialized);
+
+// TODO: add javadoc
+SessionWindowedKStream<K, V> emitStrategy(final EmitStrategy emitStrategy);
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
index 46ebd267f9..3f36838f20 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
@@ -649,5 +649,6 @@ public interface TimeWindowedKStream<K, V> {
                               final Named named,
                               final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);
 
+// TODO: add javadoc
 TimeWindowedKStream<K, V> emitStrategy(final EmitStrategy emitStrategy);
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java

[kafka-site] branch asf-site updated: Update powered-by.html (#320)

2022-06-29 Thread bbejeck
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/kafka-site.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new db7bd434 Update powered-by.html (#320)
db7bd434 is described below

commit db7bd43492bc83cedfc48a3d288b8b49172acad1
Author: javioverflow 
AuthorDate: Wed Jun 29 16:47:19 2022 +0200

Update powered-by.html (#320)

The landing page is showing 80%, while the powered-by page is showing 60%.
I want to link this as a source for a blog post, but I can't just link the
homepage, and the powered-by page is showing outdated stats.
---
 powered-by.html | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/powered-by.html b/powered-by.html
index 469da08d..8e03f89b 100644
--- a/powered-by.html
+++ b/powered-by.html
@@ -734,7 +734,7 @@
 Apache Kafka is the most popular open-source stream-processing software for collecting, processing, storing, and analyzing data at scale. Most known for its excellent performance, low latency, fault tolerance, and high throughput, it's capable of handling thousands of messages per second. With over 1,000 Kafka use cases and counting, some common benefits are building data pipelines, leveraging real-time data streams, enabling operational metrics, and data integration across c [...]
 
 
-Today, Kafka is used by thousands of companies including over 60% of the Fortune 100. Among these are Box, Goldman Sachs, Target, Cisco, Intuit, and more.  As the trusted tool for empowering and innovating companies, Kafka allows organizations to modernize their data strategies with event streaming architecture. Learn how Kafka is used by organizations in every industry - from computer software, financial services, and health care, to government and transportation.
+Today, Kafka is used by thousands of companies including over 80% of the Fortune 100. Among these are Box, Goldman Sachs, Target, Cisco, Intuit, and more.  As the trusted tool for empowering and innovating companies, Kafka allows organizations to modernize their data strategies with event streaming architecture. Learn how Kafka is used by organizations in every industry - from computer software, financial services, and health care, to government and transportation.
 
 




[kafka-site] branch asf-site updated: Add Covage to powered-by (#413)

2022-06-29 Thread bbejeck
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/kafka-site.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new 04e5d4e3 Add Covage to powered-by (#413)
04e5d4e3 is described below

commit 04e5d4e3345279f28827e0d25810824516423143
Author: Zacharie LAMAZIERE <106680797+zac...@users.noreply.github.com>
AuthorDate: Wed Jun 29 16:34:16 2022 +0200

Add Covage to powered-by (#413)
---
 images/powered-by/covage.png | Bin 0 -> 26522 bytes
 powered-by.html  |   5 +
 2 files changed, 5 insertions(+)

diff --git a/images/powered-by/covage.png b/images/powered-by/covage.png
new file mode 100644
index 00000000..db8a811a
Binary files /dev/null and b/images/powered-by/covage.png differ
diff --git a/powered-by.html b/powered-by.html
index f36ac342..469da08d 100644
--- a/powered-by.html
+++ b/powered-by.html
@@ -715,6 +715,11 @@
 "logo": "atguigu.png",
 "logoBgColor": "#ff",
 "description": "In our real-time data warehouse, apache kafka is used 
as a reliable distributed message queue, which allows us to build a highly 
available analysis system."
+}, {
+"link": "https://www.covage.com/;,
+"logo": "covage.png",
+"logoBgColor": "#ff",
+"description": "Covage is an infrastructure operator designing, 
deploying and operating high speed open access networks. At the very heart of 
our IT platform, Kafka is ensuring propagating our business workflows' events 
among all applications."
 }];
 
 



[kafka] branch trunk updated: KAFKA-14010: AlterPartition request won't retry when receiving retriable error (#12329)

2022-06-29 Thread showuon
This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 08e509459b KAFKA-14010: AlterPartition request won't retry when receiving retriable error (#12329)
08e509459b is described below

commit 08e509459bd61f6aafb9f7d3902dccbbc7e13296
Author: Luke Chen 
AuthorDate: Wed Jun 29 20:53:36 2022 +0800

KAFKA-14010: AlterPartition request won't retry when receiving retriable error (#12329)

When submitting an AlterIsr request, we register a future listener to handle
the response. On a retriable error, we expect the AlterIsr request to be
retried, i.e. re-submitted. However, we did not clear `unsentIsrUpdates`
before the future listener was invoked, so the re-submission failed to
enqueue the request because it looked like a request was still in flight.
The "try/finally" block did guarantee that `unsentIsrUpdates` was eventually
cleared, but only "after" the retry had already been attempted. A sketch of
the corrected ordering follows.
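
The gist of the fixed ordering, as a simplified sketch in Java (the real
change is in Scala; the class and names below are invented for illustration):

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class InFlightSketch<K, V> {
    private final Map<K, CompletableFuture<V>> unsentUpdates = new ConcurrentHashMap<>();

    // Old ordering: complete the future inside try, remove the map entry in
    // finally -- a callback that resubmits still sees an "in-flight" entry and
    // gives up. New ordering: remove the entry first, then complete the
    // future, so a retry triggered from the callback can enqueue a new request.
    void handleResponse(K key, V result, Throwable error) {
        CompletableFuture<V> future = unsentUpdates.remove(key); // clear BEFORE completing
        if (future == null) {
            return; // nothing in flight for this key
        }
        if (error != null) {
            future.completeExceptionally(error); // listener may now resubmit
        } else {
            future.complete(result);
        }
    }
}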

Reviewers: David Jacot , dengziming
---
 core/src/main/scala/kafka/cluster/Partition.scala  |   2 +-
 .../scala/kafka/server/AlterPartitionManager.scala |  14 ++-
 .../scala/unit/kafka/cluster/PartitionTest.scala   | 107 -
 .../kafka/server/AlterPartitionManagerTest.scala   | 102 
 4 files changed, 194 insertions(+), 31 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 046e21c1f9..319025226c 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -952,7 +952,7 @@ class Partition(val topicPartition: TopicPartition,
*
   * With the addition of AlterPartition, we also consider newly added replicas as part of the ISR when advancing
   * the HW. These replicas have not yet been committed to the ISR by the controller, so we could revert to the previously
-   * committed ISR. However, adding additional replicas to the ISR makes it more restrictive and therefor safe. We call
+   * committed ISR. However, adding additional replicas to the ISR makes it more restrictive and therefore safe. We call
   * this set the "maximal" ISR. See KIP-497 for more details
   *
   * Note There is no need to acquire the leaderIsrUpdate lock here since all callers of this private API acquire that lock
diff --git a/core/src/main/scala/kafka/server/AlterPartitionManager.scala b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
index 9f89f47e82..3dd6cfb0b1 100644
--- a/core/src/main/scala/kafka/server/AlterPartitionManager.scala
+++ b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
@@ -359,14 +359,12 @@ class DefaultAlterPartitionManager(
 inflightAlterPartitionItems.foreach { inflightAlterPartition =>
   partitionResponses.get(inflightAlterPartition.topicIdPartition.topicPartition) match {
     case Some(leaderAndIsrOrError) =>
-      try {
-        leaderAndIsrOrError match {
-          case Left(error) => inflightAlterPartition.future.completeExceptionally(error.exception)
-          case Right(leaderAndIsr) => inflightAlterPartition.future.complete(leaderAndIsr)
-        }
-      } finally {
-        // Regardless of callback outcome, we need to clear from the unsent updates map to unblock further updates
-        unsentIsrUpdates.remove(inflightAlterPartition.topicIdPartition.topicPartition)
+      // Regardless of callback outcome, we need to clear from the unsent updates map to unblock further
+      // updates. We clear it now to allow the callback to submit a new update if needed.
+      unsentIsrUpdates.remove(inflightAlterPartition.topicIdPartition.topicPartition)
+      leaderAndIsrOrError match {
+        case Left(error) => inflightAlterPartition.future.completeExceptionally(error.exception)
+        case Right(leaderAndIsr) => inflightAlterPartition.future.complete(leaderAndIsr)
       }
     case None =>
       // Don't remove this partition from the update map so it will get re-sent
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 7dcead062b..65a6cdadf4 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -26,12 +26,12 @@ import kafka.server.epoch.EpochEntry
 import kafka.utils._
 import kafka.zk.KafkaZkClient
import org.apache.kafka.common.errors.{ApiException, FencedLeaderEpochException, InconsistentTopicIdException, NotLeaderOrFollowerException, OffsetNotAvailableException, OffsetOutOfRangeException, UnknownLeaderEpochException}