This is an automated email from the ASF dual-hosted git repository.
damianguy pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e24916a MINOR:Fix table outer join test (#5099)
e24916a is described below
commit e24916a68f8259046be677e7f8c1f365960e0dc3
Author: emmanuel Harel
AuthorDate: Fri Jun 1 13:09:24 2018 +0200
MINOR:Fix table outer join test (#5099)
---
.../java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 399e519..0b9c1ab 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -472,7 +472,7 @@ public class KTableImplTest {
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnOuterJoinWhenMaterializedIsNull() {
-table.leftJoin(table, MockValueJoiner.TOSTRING_JOINER, (Materialized)
null);
+table.outerJoin(table, MockValueJoiner.TOSTRING_JOINER, (Materialized)
null);
}
@Test(expected = NullPointerException.class)
--
To stop receiving notification emails like this one, please contact
damian...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
damianguy pushed a change to annotated tag 1.1.0-rc3
in repository https://gitbox.apache.org/repos/asf/kafka.git.
*** WARNING: tag 1.1.0-rc3 was modified! ***
from 9368c84 (commit)
to ecb57c1 (tag)
tagging 9368c84565224fff1c74199af995c86f806be37a (commit)
replaces 0.8.0-beta1
by Damian Guy
on Thu Mar 15 13:27:58 2018 +0000
- Log -
1.1.0-rc3
---
No new revisions were added by this update.
Summary of changes:
--
To stop receiving notification emails like this one, please contact
damian...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
damianguy pushed a change to annotated tag 1.1.0-rc0
in repository https://gitbox.apache.org/repos/asf/kafka.git.
at 7d74914 (tag)
tagging e99dd247490bab052023315aa6789ebe03dd0927 (commit)
replaces 0.8.0-beta1
by Damian Guy
on Sat Feb 24 15:13:54 2018 +0000
- Log -
1.1.0-rc0
---
This annotated tag includes the following new commits:
new e99dd24 Bump version to 1.1.0
The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
--
To stop receiving notification emails like this one, please contact
damian...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
damianguy pushed a commit to annotated tag 1.1.0-rc0
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit e99dd247490bab052023315aa6789ebe03dd0927
Author: Damian Guy <damian@gmail.com>
AuthorDate: Sat Feb 24 15:13:54 2018 +0000
Bump version to 1.1.0
---
gradle.properties | 2 +-
streams/quickstart/java/pom.xml | 4 ++--
.../quickstart/java/src/main/resources/archetype-resources/pom.xml| 4 ++--
streams/quickstart/pom.xml| 4 ++--
tests/kafkatest/__init__.py | 2 +-
5 files changed, 8 insertions(+), 8 deletions(-)
diff --git a/gradle.properties b/gradle.properties
index 7062ba0..190f223 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -16,7 +16,7 @@
group=org.apache.kafka
# NOTE: When you change this version number, you should also make sure to
update
# the version numbers in tests/kafkatest/__init__.py and kafka-merge-pr.py.
-version=1.1.0-SNAPSHOT
+version=1.1.0
scalaVersion=2.11.12
task=build
org.gradle.jvmargs=-XX:MaxPermSize=512m -Xmx1024m -Xss2m
diff --git a/streams/quickstart/java/pom.xml b/streams/quickstart/java/pom.xml
index 70c1416..c3f50d0 100644
--- a/streams/quickstart/java/pom.xml
+++ b/streams/quickstart/java/pom.xml
@@ -26,11 +26,11 @@
org.apache.kafka
streams-quickstart
-1.1.0-SNAPSHOT
+1.1.0
..
streams-quickstart-java
maven-archetype
-
\ No newline at end of file
+
diff --git
a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
index f2b8a8f..07ca444 100644
--- a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
+++ b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
@@ -29,7 +29,7 @@
UTF-8
-1.1.0-SNAPSHOT
+1.1.0
1.7.7
1.2.17
@@ -133,4 +133,4 @@
${kafka.version}
-
\ No newline at end of file
+
diff --git a/streams/quickstart/pom.xml b/streams/quickstart/pom.xml
index 010b3fb..0a165d9 100644
--- a/streams/quickstart/pom.xml
+++ b/streams/quickstart/pom.xml
@@ -22,7 +22,7 @@
org.apache.kafka
streams-quickstart
pom
-1.1.0-SNAPSHOT
+1.1.0
Kafka Streams :: Quickstart
@@ -118,4 +118,4 @@
-
\ No newline at end of file
+
diff --git a/tests/kafkatest/__init__.py b/tests/kafkatest/__init__.py
index 80824f9..e7778a4 100644
--- a/tests/kafkatest/__init__.py
+++ b/tests/kafkatest/__init__.py
@@ -22,4 +22,4 @@
# Instead, in development branches, the version should have a suffix of the
form ".devN"
#
# For example, when Kafka is at version 1.0.0-SNAPSHOT, this should be
something like "1.0.0.dev0"
-__version__ = '1.1.0.dev0'
+__version__ = '1.1.0'
--
To stop receiving notification emails like this one, please contact
damian...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
damianguy pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
new ba0389f KAFKA-6577: Fix Connect system tests and add debug messages
ba0389f is described below
commit ba0389f57cf9d1321e3762f21da04b62d04559d5
Author: Randall Hauch <rha...@gmail.com>
AuthorDate: Thu Feb 22 09:39:59 2018 +0000
KAFKA-6577: Fix Connect system tests and add debug messages
**NOTE: This should be backported to the `1.1` branch, and is currently a
blocker for 1.1.**
The `connect_test.py::ConnectStandaloneFileTest.test_file_source_and_sink`
system test is failing with the SASL configuration without a sufficient
explanation. During the test, the Connect worker fails to start, but the
Connect log contains no useful information. There are actual several things
compounding to cause the failure and make it difficult to understand the
problem.
First, the
`tests/kafkatest/tests/connect/templates/connect_standalone.properties` is only
adding in the broker's security configuration with the `producer.` and
`consumer.` prefixes, but is not adding them with no prefix. The worker uses
the AdminClient to connect to the broker to get the Kafka cluster ID and to
manage the three internal topics, and the AdminClient is configured via
top-level properties. Because the SASL test requires the clients all connect
using SASL, the lack of b [...]
Second, the default `request.timeout.ms` for the AdminClient (and the other
clients) is 120 seconds, so the AdminClient was retrying for 120 seconds before
it would give up and throw an error. However, the test was only waiting for 60
seconds before determining that the service failed to start. This can be
corrected by setting `request.timeout.ms=1` in the Connect distributed and
standalone worker configurations.
Third, the Connect workers were recently changed to lookup the Kafka
cluster ID before it started the herder. This is unlike the older uses of the
AdminClient to find and manage the internal topics, where failure to connect
was not necessarily logged correctly but nevertheless still skipped over,
relying upon broker auto-topic creation to create the internal topics. (This
may be why the test did not fail prior to the recent change to always require a
successful AdminClient connection. [...]
The `ConnectStandaloneFileTest.test_file_source_and_sink` system tests were
run locally prior to this fix, and they failed as with the nightlies. Once
these fixes were made, the locally run system tests passed.
Author: Randall Hauch <rha...@gmail.com>
Reviewers: Konstantine Karantasis <konstant...@confluent.io>, Ewen
Cheslack-Postava <m...@ewencp.org>
Closes #4610 from rhauch/kafka-6577-trunk
(cherry picked from commit fc19c3e6f243a8d1b3e27cdc912dc092bbd342e0)
Signed-off-by: Damian Guy <damian@gmail.com>
---
.../main/java/org/apache/kafka/connect/cli/ConnectDistributed.java | 1 +
.../main/java/org/apache/kafka/connect/cli/ConnectStandalone.java| 1 +
.../org/apache/kafka/connect/storage/KafkaConfigBackingStore.java| 1 +
.../org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java| 1 +
.../org/apache/kafka/connect/storage/KafkaStatusBackingStore.java| 1 +
.../src/main/java/org/apache/kafka/connect/util/ConnectUtils.java| 5 -
tests/kafkatest/tests/connect/connect_test.py| 2 +-
.../kafkatest/tests/connect/templates/connect-distributed.properties | 3 +++
.../kafkatest/tests/connect/templates/connect-standalone.properties | 4
9 files changed, 17 insertions(+), 2 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 98a77ed..8930602 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -73,6 +73,7 @@ public class ConnectDistributed {
DistributedConfig config = new DistributedConfig(workerProps);
String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
+log.debug("Kafka cluster ID: {}", kafkaClusterId);
RestServer rest = new RestServer(config);
URI advertisedUrl = rest.advertisedUrl();
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index 1769905..413cb46 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStanda
This is an automated email from the ASF dual-hosted git repository.
damianguy pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 660c0c0 KAFKA-6238; Fix inter-broker protocol message format
compatibility check
660c0c0 is described below
commit 660c0c0aa33ced5307ee70bfdb78ebde4b978d73
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Wed Feb 21 09:38:39 2018 +0000
KAFKA-6238; Fix inter-broker protocol message format compatibility check
This patch fixes a bug in the validation of the inter-broker protocol and
the message format version. We should allow the configured message format api
version to be greater than the inter-broker protocol api version as long as the
actual message format versions are equal. For example, if the message format
version is set to 1.0, it is fine for the inter-broker protocol version to be
0.11.0 because they both use message format v2.
I have added a unit test which checks compatibility for all combinations of
the message format version and the inter-broker protocol version.
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ismael Juma <ism...@juma.me.uk>
Closes #4583 from hachikuji/KAFKA-6328-REOPENED
---
.../apache/kafka/common/record/RecordFormat.java | 41 +++
core/src/main/scala/kafka/api/ApiVersion.scala | 46 ++
core/src/main/scala/kafka/log/Log.scala| 4 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 5 ++-
core/src/main/scala/kafka/server/KafkaConfig.scala | 9 -
.../main/scala/kafka/server/ReplicaManager.scala | 2 +-
.../test/scala/unit/kafka/api/ApiVersionTest.scala | 13 ++
.../scala/unit/kafka/server/KafkaConfigTest.scala | 24 +++
docs/upgrade.html | 19 +
9 files changed, 131 insertions(+), 32 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/record/RecordFormat.java
b/clients/src/main/java/org/apache/kafka/common/record/RecordFormat.java
new file mode 100644
index 000..e71ec59
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordFormat.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+public enum RecordFormat {
+V0(0), V1(1), V2(2);
+
+public final byte value;
+
+RecordFormat(int value) {
+this.value = (byte) value;
+}
+
+public static RecordFormat lookup(byte version) {
+switch (version) {
+case 0: return V0;
+case 1: return V1;
+case 2: return V2;
+default: throw new IllegalArgumentException("Unknown format
version: " + version);
+}
+}
+
+public static RecordFormat current() {
+return V2;
+}
+
+}
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala
b/core/src/main/scala/kafka/api/ApiVersion.scala
index b8329c1..9270a7a 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -17,7 +17,7 @@
package kafka.api
-import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.record.RecordFormat
/**
* This class contains the different Kafka versions.
@@ -90,11 +90,23 @@ object ApiVersion {
def latestVersion = versionNameMap.values.max
+ def allVersions: Set[ApiVersion] = {
+versionNameMap.values.toSet
+ }
+
+ def minVersionForMessageFormat(messageFormatVersion: RecordFormat): String =
{
+messageFormatVersion match {
+ case RecordFormat.V0 => "0.8.0"
+ case RecordFormat.V1 => "0.10.0"
+ case RecordFormat.V2 => "0.11.0"
+ case _ => throw new IllegalArgumentException(s"Invalid message format
version $messageFormatVersion")
+}
+ }
}
sealed trait ApiVersion extends Ordered[ApiVersion] {
val version: String
- val messageFormatVersion: Byte
+ val messageFormatVersion: RecordFormat
val id: Int
override def compare(that: Api
This is an automated email from the ASF dual-hosted git repository.
damianguy pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 57059d4 MINOR: Fix streams broker compatibility test.
57059d4 is described below
commit 57059d40223d6284d1b2c9f3034b30f6bd61c44f
Author: Damian Guy <damian@gmail.com>
AuthorDate: Tue Feb 20 17:46:05 2018 +0000
MINOR: Fix streams broker compatibility test.
Change the string in the test condition to the one that is logged
Author: Damian Guy <damian@gmail.com>
Reviewers: Bill Bejeck <b...@confluent.io>, Guozhang Wang
<wangg...@gmail.com>
Closes #4599 from dguy/broker-compatibility
---
tests/kafkatest/tests/streams/streams_broker_compatibility_test.py | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
index 1eb46ef..b00b9bb 100644
--- a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
@@ -67,9 +67,9 @@ class StreamsBrokerCompatibility(Test):
processor.node.account.ssh(processor.start_cmd(processor.node))
with processor.node.account.monitor_log(processor.STDERR_FILE) as
monitor:
-monitor.wait_until('FATAL: An unexpected exception
org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not
support LIST_OFFSETS ',
+monitor.wait_until("Exception in thread \"main\"
org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not
support LIST_OFFSETS ",
timeout_sec=60,
- err_msg="Never saw 'FATAL: An unexpected
exception org.apache.kafka.common.errors.UnsupportedVersionException: The
broker does not support LIST_OFFSETS ' error message " +
str(processor.node.account))
+ err_msg="Exception in thread \"main\"
org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not
support LIST_OFFSETS " + str(processor.node.account))
self.kafka.stop()
--
To stop receiving notification emails like this one, please contact
damian...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
damianguy pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
new 6ca5997 MINOR: ignore streams eos tests (#4597)
6ca5997 is described below
commit 6ca59977f378647254d50b2c62f192b93ba72551
Author: Damian Guy <damian@gmail.com>
AuthorDate: Tue Feb 20 17:26:31 2018 +0000
MINOR: ignore streams eos tests (#4597)
---
tests/kafkatest/tests/streams/streams_eos_test.py | 1 +
1 file changed, 1 insertion(+)
diff --git a/tests/kafkatest/tests/streams/streams_eos_test.py
b/tests/kafkatest/tests/streams/streams_eos_test.py
index d6ac600..986702c 100644
--- a/tests/kafkatest/tests/streams/streams_eos_test.py
+++ b/tests/kafkatest/tests/streams/streams_eos_test.py
@@ -38,6 +38,7 @@ class StreamsEosTest(KafkaTest):
self.driver = StreamsEosTestDriverService(test_context, self.kafka)
self.test_context = test_context
+@ignored
@cluster(num_nodes=9)
def test_rebalance_simple(self):
self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context,
self.kafka),
--
To stop receiving notification emails like this one, please contact
damian...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
damianguy pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
new d373fe2 MINOR: Redirect response code in Connect's RestClient to logs
instead of stdout
d373fe2 is described below
commit d373fe2e378e3797d1b64f255f281f0b1f41cede
Author: Konstantine Karantasis <konstant...@confluent.io>
AuthorDate: Tue Feb 20 17:15:31 2018 +0000
MINOR: Redirect response code in Connect's RestClient to logs instead of
stdout
Sending the response code of an http request issued via `RestClient` in
Connect to stdout seems like an unconventional choice.
This PR redirects the response code with a message in the logs at DEBUG
level (usually the same level as the one that the caller of
`RestClient.httpRequest` uses).
This fix will also fix system tests that broke by outputting this response
code to stdout.
Author: Konstantine Karantasis <konstant...@confluent.io>
Reviewers: Randall Hauch <rha...@gmail.com>, Damian Guy
<damian@gmail.com>
Closes #4591 from
kkonstantine/MINOR-Redirect-response-code-in-Connect-RestClient-to-logs-instead-of-stdout
(cherry picked from commit b79e11bb511e259c8187d865761c3b448391603f)
Signed-off-by: Damian Guy <damian@gmail.com>
---
.../main/java/org/apache/kafka/connect/runtime/rest/RestClient.java | 6 --
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
index d500ad2..15e8418 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
@@ -82,12 +82,14 @@ public class RestClient {
req.method(method);
req.accept("application/json");
req.agent("kafka-connect");
-req.content(new StringContentProvider(serializedBody,
StandardCharsets.UTF_8), "application/json");
+if (serializedBody != null) {
+req.content(new StringContentProvider(serializedBody,
StandardCharsets.UTF_8), "application/json");
+}
ContentResponse res = req.send();
int responseCode = res.getStatus();
-System.out.println(responseCode);
+log.debug("Request's response code: {}", responseCode);
if (responseCode == HttpStatus.NO_CONTENT_204) {
return new HttpResponse<>(responseCode,
convertHttpFieldsToMap(res.getHeaders()), null);
} else if (responseCode >= 400) {
--
To stop receiving notification emails like this one, please contact
damian...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
damianguy pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b79e11b MINOR: Redirect response code in Connect's RestClient to logs
instead of stdout
b79e11b is described below
commit b79e11bb511e259c8187d865761c3b448391603f
Author: Konstantine Karantasis <konstant...@confluent.io>
AuthorDate: Tue Feb 20 17:15:31 2018 +0000
MINOR: Redirect response code in Connect's RestClient to logs instead of
stdout
Sending the response code of an http request issued via `RestClient` in
Connect to stdout seems like an unconventional choice.
This PR redirects the response code with a message in the logs at DEBUG
level (usually the same level as the one that the caller of
`RestClient.httpRequest` uses).
This fix will also fix system tests that broke by outputting this response
code to stdout.
Author: Konstantine Karantasis <konstant...@confluent.io>
Reviewers: Randall Hauch <rha...@gmail.com>, Damian Guy
<damian@gmail.com>
Closes #4591 from
kkonstantine/MINOR-Redirect-response-code-in-Connect-RestClient-to-logs-instead-of-stdout
---
.../main/java/org/apache/kafka/connect/runtime/rest/RestClient.java | 6 --
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
index d500ad2..15e8418 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
@@ -82,12 +82,14 @@ public class RestClient {
req.method(method);
req.accept("application/json");
req.agent("kafka-connect");
-req.content(new StringContentProvider(serializedBody,
StandardCharsets.UTF_8), "application/json");
+if (serializedBody != null) {
+req.content(new StringContentProvider(serializedBody,
StandardCharsets.UTF_8), "application/json");
+}
ContentResponse res = req.send();
int responseCode = res.getStatus();
-System.out.println(responseCode);
+log.debug("Request's response code: {}", responseCode);
if (responseCode == HttpStatus.NO_CONTENT_204) {
return new HttpResponse<>(responseCode,
convertHttpFieldsToMap(res.getHeaders()), null);
} else if (responseCode >= 400) {
--
To stop receiving notification emails like this one, please contact
damian...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
damianguy pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ca01711 Bump trunk versions to 1.2-SNAPSHOT (#4505)
ca01711 is described below
commit ca01711c0ec0b616840cc696419e5bbf500f6651
Author: Damian Guy <damian@gmail.com>
AuthorDate: Thu Feb 1 11:35:43 2018 +0000
Bump trunk versions to 1.2-SNAPSHOT (#4505)
---
gradle.properties | 2 +-
kafka-merge-pr.py | 2 +-
streams/quickstart/java/pom.xml| 2 +-
streams/quickstart/java/src/main/resources/archetype-resources/pom.xml | 2 +-
streams/quickstart/pom.xml | 2 +-
tests/kafkatest/__init__.py| 2 +-
6 files changed, 6 insertions(+), 6 deletions(-)
diff --git a/gradle.properties b/gradle.properties
index 7062ba0..325a1d0 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -16,7 +16,7 @@
group=org.apache.kafka
# NOTE: When you change this version number, you should also make sure to
update
# the version numbers in tests/kafkatest/__init__.py and kafka-merge-pr.py.
-version=1.1.0-SNAPSHOT
+version=1.2.0-SNAPSHOT
scalaVersion=2.11.12
task=build
org.gradle.jvmargs=-XX:MaxPermSize=512m -Xmx1024m -Xss2m
diff --git a/kafka-merge-pr.py b/kafka-merge-pr.py
index 90fcf22..02cf6e0 100755
--- a/kafka-merge-pr.py
+++ b/kafka-merge-pr.py
@@ -70,7 +70,7 @@ TEMP_BRANCH_PREFIX = "PR_TOOL"
DEV_BRANCH_NAME = "trunk"
-DEFAULT_FIX_VERSION = os.environ.get("DEFAULT_FIX_VERSION", "1.1.0")
+DEFAULT_FIX_VERSION = os.environ.get("DEFAULT_FIX_VERSION", "1.2.0")
def get_json(url):
try:
diff --git a/streams/quickstart/java/pom.xml b/streams/quickstart/java/pom.xml
index 70c1416..fed2bbc 100644
--- a/streams/quickstart/java/pom.xml
+++ b/streams/quickstart/java/pom.xml
@@ -26,7 +26,7 @@
org.apache.kafka
streams-quickstart
-1.1.0-SNAPSHOT
+1.2.0-SNAPSHOT
..
diff --git
a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
index f2b8a8f..6da81a7 100644
--- a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
+++ b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
@@ -29,7 +29,7 @@
UTF-8
-1.1.0-SNAPSHOT
+1.2.0-SNAPSHOT
1.7.7
1.2.17
diff --git a/streams/quickstart/pom.xml b/streams/quickstart/pom.xml
index d348e64..b14a9ab 100644
--- a/streams/quickstart/pom.xml
+++ b/streams/quickstart/pom.xml
@@ -22,7 +22,7 @@
org.apache.kafka
streams-quickstart
pom
-1.1.0-SNAPSHOT
+1.2.0-SNAPSHOT
Kafka Streams :: Quickstart
diff --git a/tests/kafkatest/__init__.py b/tests/kafkatest/__init__.py
index 80824f9..935f20d 100644
--- a/tests/kafkatest/__init__.py
+++ b/tests/kafkatest/__init__.py
@@ -22,4 +22,4 @@
# Instead, in development branches, the version should have a suffix of the
form ".devN"
#
# For example, when Kafka is at version 1.0.0-SNAPSHOT, this should be
something like "1.0.0.dev0"
-__version__ = '1.1.0.dev0'
+__version__ = '1.2.0.dev0'
--
To stop receiving notification emails like this one, please contact
damian...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
damianguy pushed a change to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git.
at c38a345 MINOR: Fix brokerId passed to metrics reporters (#4497)
No new revisions were added by this update.
--
To stop receiving notification emails like this one, please contact
damian...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
damianguy pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.0 by this push:
new ab3e4a2 KAFKA-6378 KStream-GlobalKTable null KeyValueMapper handling
ab3e4a2 is described below
commit ab3e4a27671df2499f8dea34a84fc0740102269c
Author: Andy Bryant <andybry...@gmail.com>
AuthorDate: Wed Jan 31 10:20:12 2018 +0000
KAFKA-6378 KStream-GlobalKTable null KeyValueMapper handling
For KStream-GlobalKTable joins let `null` `KeyValueMapper` results indicate
no match
For KStream-GlobalKTable joins, a `KeyValueMapper` is used to derive a key
from the stream records into the `GlobalKTable`. For some stream values there
may be no valid reference to the table stream. This patch allows developers to
use `null` return values to indicate there is no possible match. This is
possible in this case since `null` is never a valid key value for a
`GlobalKTable`.
Without this patch, providing a `null` value caused the stream to crash on
Kafka 1.0.
I added unit tests for KStream-GlobalKTable left and inner joins, since
they were missing. I also covered this additional scenario where
`KeyValueMapper` returns `null` to ensure it is handled correctly.
Author: Andy Bryant <andybry...@gmail.com>
Reviewers: Matthias J. Sax <matth...@confluent.io>, Damian Guy
<damian@gmail.com>
Closes #4424 from
andybryant/KAFKA-6378-null-handling-stream-globaltable-join
---
.../org/apache/kafka/streams/kstream/KStream.java | 12 +-
.../internals/KStreamKTableJoinProcessor.java | 5 +-
.../internals/KStreamGlobalKTableJoinTest.java | 211 +
.../internals/KStreamGlobalKTableLeftJoinTest.java | 211 +
.../kstream/internals/KStreamKTableJoinTest.java | 132 +
.../internals/KStreamKTableLeftJoinTest.java | 139 +-
6 files changed, 617 insertions(+), 93 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 0d1d201..6973719 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -2474,8 +2474,10 @@ public interface KStream<K, V> {
* For each {@code KStream} record that finds a corresponding record in
{@link GlobalKTable} the provided
* {@link ValueJoiner} will be called to compute a value (with arbitrary
type) for the result record.
* The key of the result record is the same as the key of this {@code
KStream}.
- * If an {@code KStream} input record key or value is {@code null} the
record will not be included in the join
+ * If a {@code KStream} input record key or value is {@code null} the
record will not be included in the join
* operation and thus no output record will be added to the resulting
{@code KStream}.
+ * If {@code keyValueMapper} returns {@code null} implying no match
exists, no output record will be added to the
+ * resulting {@code KStream}.
*
* @param globalKTable the {@link GlobalKTable} to be joined with this
stream
* @param keyValueMapper instance of {@link KeyValueMapper} used to map
from the (key, value) of this stream
@@ -2506,11 +2508,13 @@ public interface KStream<K, V> {
*
* For each {@code KStream} record whether or not it finds a corresponding
record in {@link GlobalKTable} the
* provided {@link ValueJoiner} will be called to compute a value (with
arbitrary type) for the result record.
- * If no {@link GlobalKTable} record was found during lookup, a {@code
null} value will be provided to
- * {@link ValueJoiner}.
* The key of the result record is the same as this {@code KStream}.
- * If an {@code KStream} input record key or value is {@code null} the
record will not be included in the join
+ * If a {@code KStream} input record key or value is {@code null} the
record will not be included in the join
* operation and thus no output record will be added to the resulting
{@code KStream}.
+ * If {@code keyValueMapper} returns {@code null} implying no match
exists, a {@code null} value will be
+ * provided to {@link ValueJoiner}.
+ * If no {@link GlobalKTable} record was found during lookup, a {@code
null} value will be provided to
+ * {@link ValueJoiner}.
*
* @param globalKTable the {@link GlobalKTable} to be joined with this
stream
* @param keyValueMapper instance of {@link KeyValueMapper} used to map
from the (key, value) of this stream
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
b/streams/src/main/java/org/ap
This is an automated email from the ASF dual-hosted git repository.
damianguy pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e1c5d0c MINOR: Add documentation for KAFKA-6086
(ProductionExceptionHandler) (#4395)
e1c5d0c is described below
commit e1c5d0c119b38a9ddb2b09b6309a3817d86d8e14
Author: Matt Farmer <m...@frmr.me>
AuthorDate: Mon Jan 8 06:33:23 2018 -0500
MINOR: Add documentation for KAFKA-6086 (ProductionExceptionHandler) (#4395)
* Update streams documentation to describe production exception handler
* Add a mention of the ProductionExceptionHandler in the upgrade guide
---
docs/streams/developer-guide/config-streams.html | 76 +++-
docs/streams/upgrade-guide.html | 9 ++-
2 files changed, 68 insertions(+), 17 deletions(-)
diff --git a/docs/streams/developer-guide/config-streams.html
b/docs/streams/developer-guide/config-streams.html
index dbac7fb..256cc18 100644
--- a/docs/streams/developer-guide/config-streams.html
+++ b/docs/streams/developer-guide/config-streams.html
@@ -69,6 +69,7 @@
Optional configuration
parameters
default.deserialization.exception.handler
+default.production.exception.handler
default.key.serde
default.value.serde
num.standby.replicas
@@ -216,77 +217,82 @@
Exception handling class that implements the DeserializationExceptionHandler interface.
3 milliseconds
- key.serde
+ default.production.exception.handler
+Medium
+Exception handling class that implements the ProductionExceptionHandler interface.
+DefaultProductionExceptionHandler
+
+ key.serde
Medium
Default serializer/deserializer class for record
keys, implements the Serde interface (see also value.serde).
Serdes.ByteArray().getClass().getName()
- metric.reporters
+ metric.reporters
Low
A list of classes to use as metrics reporters.
the empty list
- metrics.num.samples
+ metrics.num.samples
Low
The number of samples maintained to compute
metrics.
2
- metrics.recording.level
+ metrics.recording.level
Low
The highest recording level for metrics.
INFO
- metrics.sample.window.ms
+ metrics.sample.window.ms
Low
The window of time a metrics sample is computed
over.
3 milliseconds
- num.standby.replicas
+ num.standby.replicas
Medium
The number of standby replicas for each task.
0
- num.stream.threads
+ num.stream.threads
Medium
The number of threads to execute stream
processing.
1
- partition.grouper
+ partition.grouper
Low
Partition grouper class that implements the PartitionGrouper
interface.
See Partition Grouper
- poll.ms
+ poll.ms
Low
The amount of time in milliseconds to block
waiting for input.
100 milliseconds
- replication.factor
+ replication.factor
High
The replication factor for changelog topics and
repartition topics created by the application.
1
- state.cleanup.delay.ms
+ state.cleanup.delay.ms
Low
The amount of time in milliseconds to wait before
deleting state when a partition has migrated.
600 milliseconds
- state.dir
+ state.dir
High
Directory location for state stores.
/var/lib/kafka-streams
- timestamp.extractor
+ timestamp.extractor
Medium
Timestamp extractor class that implements the
TimestampExtractor interface.
See Timestamp Extractor
- value.serde
+ value.serde
Medium
Default serializer/deserializer class for record
values, implements the Serde interface (see also key.serde).
Serdes.ByteArray().getClass().getName()
- windowstore.changelog.additional.retention.ms
+ windowstore.changelog.additional.retention.ms
Low
Added to a windows maintainMs to ensure data is
not deleted from the log prematurely. Allows for clock drift.
86400000 milliseconds = 1 day
@@ -309,6 +
Repository: kafka
Updated Branches:
refs/heads/trunk 86cd558b3 -> c216adb4b
MINOR: add hint for setting an uncaught exception handler to JavaDocs
Author: Matthias J. Sax
Reviewers: Bill Bejeck , Damian Guy
Closes #4104 from mjsax/minor-uncaught-exception-handler
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c216adb4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c216adb4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c216adb4
Branch: refs/heads/trunk
Commit: c216adb4bbf8306977380a1ec371380e30137765
Parents: 86cd558
Author: Matthias J. Sax
Authored: Mon Oct 23 10:33:51 2017 +0100
Committer: Damian Guy
Committed: Mon Oct 23 10:33:51 2017 +0100
--
.../src/main/java/org/apache/kafka/streams/KafkaStreams.java | 6 ++
1 file changed, 6 insertions(+)
--
http://git-wip-us.apache.org/repos/asf/kafka/blob/c216adb4/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
--
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index ae4ef34..6e48f19 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -723,6 +723,12 @@ public class KafkaStreams {
* Start the {@code KafkaStreams} instance by starting all its threads.
* This function is expected to be called only once during the life cycle
of the client.
*
+ * Because threads are started in the background, this method does not
block.
+ * As a consequence, any fatal exception that happens during processing is
by default only logged.
+ * If you want to be notified about dying threads, you can
+ * {@link #setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler)
register an uncaught exception handler}
+ * before starting the {@code KafkaStreams} instance.
+ *
* Note, for brokers with version {@code 0.9.x} or lower, the broker
version cannot be checked.
* There will be no error and the client will hang and retry to verify the
broker version until it
* {@link StreamsConfig#REQUEST_TIMEOUT_MS_CONFIG times out}.
Repository: kafka
Updated Branches:
refs/heads/1.0 5ee157126 -> 2a3219413
MINOR: add hint for setting an uncaught exception handler to JavaDocs
Author: Matthias J. Sax
Reviewers: Bill Bejeck , Damian Guy
Closes #4104 from mjsax/minor-uncaught-exception-handler
(cherry picked from commit c216adb4bbf8306977380a1ec371380e30137765)
Signed-off-by: Damian Guy
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2a321941
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2a321941
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2a321941
Branch: refs/heads/1.0
Commit: 2a321941387c7739f2fbbbe592d017b703223ada
Parents: 5ee1571
Author: Matthias J. Sax
Authored: Mon Oct 23 10:33:51 2017 +0100
Committer: Damian Guy
Committed: Mon Oct 23 10:34:04 2017 +0100
--
.../src/main/java/org/apache/kafka/streams/KafkaStreams.java | 6 ++
1 file changed, 6 insertions(+)
--
http://git-wip-us.apache.org/repos/asf/kafka/blob/2a321941/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
--
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index ae4ef34..6e48f19 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -723,6 +723,12 @@ public class KafkaStreams {
* Start the {@code KafkaStreams} instance by starting all its threads.
* This function is expected to be called only once during the life cycle
of the client.
*
+ * Because threads are started in the background, this method does not
block.
+ * As a consequence, any fatal exception that happens during processing is
by default only logged.
+ * If you want to be notified about dying threads, you can
+ * {@link #setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler)
register an uncaught exception handler}
+ * before starting the {@code KafkaStreams} instance.
+ *
* Note, for brokers with version {@code 0.9.x} or lower, the broker
version cannot be checked.
* There will be no error and the client will hang and retry to verify the
broker version until it
* {@link StreamsConfig#REQUEST_TIMEOUT_MS_CONFIG times out}.
Repository: kafka
Updated Branches:
refs/heads/trunk 62682d078 -> 53c23bb5e
MINOR: improve Store parameter checks
Author: Matthias J. Sax
Reviewers: Bill Bejeck , Damian Guy
Closes #4063 from mjsax/minor-improve-store-parameter-checks
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/53c23bb5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/53c23bb5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/53c23bb5
Branch: refs/heads/trunk
Commit: 53c23bb5e65c147d7b2cae0a7fd9b3ba46c8fce5
Parents: 62682d0
Author: Matthias J. Sax
Authored: Thu Oct 12 15:55:43 2017 +0100
Committer: Damian Guy
Committed: Thu Oct 12 15:55:43 2017 +0100
--
.../org/apache/kafka/streams/state/Stores.java | 50 +++
.../apache/kafka/streams/state/StoresTest.java | 65
2 files changed, 102 insertions(+), 13 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/kafka/blob/53c23bb5/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
--
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index c9c44af..0ce6d9e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
/**
* Factory for creating state stores in Kafka Streams.
@@ -85,21 +86,23 @@ public class Stores {
/**
* Create a persistent {@link KeyValueBytesStoreSupplier}.
- * @param name name of the store
+ * @param name name of the store (cannot be {@code null})
* @return an instance of a {@link KeyValueBytesStoreSupplier} that can
be used
* to build a persistent store
*/
public static KeyValueBytesStoreSupplier persistentKeyValueStore(final
String name) {
+Objects.requireNonNull(name, "name cannot be null");
return new RocksDbKeyValueBytesStoreSupplier(name);
}
/**
* Create an in-memory {@link KeyValueBytesStoreSupplier}.
- * @param name name of the store
+ * @param name name of the store (cannot be {@code null})
* @return an instance of a {@link KeyValueBytesStoreSupplier} than can
be used to
* build an in-memory store
*/
public static KeyValueBytesStoreSupplier inMemoryKeyValueStore(final
String name) {
+Objects.requireNonNull(name, "name cannot be null");
return new KeyValueBytesStoreSupplier() {
@Override
public String name() {
@@ -120,12 +123,16 @@ public class Stores {
/**
* Create a LRU Map {@link KeyValueBytesStoreSupplier}.
- * @param name name of the store
- * @param maxCacheSize maximum number of items in the LRU
+ * @param name name of the store (cannot be {@code null})
+ * @param maxCacheSize maximum number of items in the LRU (cannot be
negative)
* @return an instance of a {@link KeyValueBytesStoreSupplier} that can be
used to build
* an LRU Map based store
*/
public static KeyValueBytesStoreSupplier lruMap(final String name, final
int maxCacheSize) {
+Objects.requireNonNull(name, "name cannot be null");
+if (maxCacheSize < 0) {
+throw new IllegalArgumentException("maxCacheSize cannot be
negative");
+}
return new KeyValueBytesStoreSupplier() {
@Override
public String name() {
@@ -146,10 +153,10 @@ public class Stores {
/**
* Create a persistent {@link WindowBytesStoreSupplier}.
- * @param name name of the store
- * @param retentionPeriod length of time to retain data in the store
- * @param numSegments number of db segments
- * @param windowSizesize of the windows
+ * @param name name of the store (cannot be {@code null})
+ * @param retentionPeriod length of time to retain data in the store
(cannot be negative)
+ * @param numSegments number of db segments (cannot be zero or
negative)
+ * @param windowSizesize of the windows (cannot be negative)
* @param retainDuplicates whether or not to retain duplicates.
* @return an instance of {@link WindowBytesStoreSupplier}
*/
@@ -158,24 +165,38 @@ public class Stores {
final int
numSegments,
Repository: kafka
Updated Branches:
refs/heads/trunk 716330a5b -> 3dcbbf703
http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
--
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 66ec0d7..1abc5e7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -166,7 +166,7 @@ public interface KTable {
* (i.e., that would be equivalent to calling
{@link KTable#filter(Predicate)}.
* @return a {@code KTable} that contains only those records that satisfy
the given predicate
* @see #filterNot(Predicate, Materialized)
- * @deprecated use {@link #filter(Predicate, Materialized)}
+ * @deprecated use {@link #filter(Predicate, Materialized)
filter(predicate, Materialized.as(queryableStoreName))}
*/
@Deprecated
KTable filter(final Predicate predicate, final
String queryableStoreName);
@@ -203,7 +203,7 @@ public interface KTable {
* @param storeSupplier user defined state store supplier. Cannot be
{@code null}.
* @return a {@code KTable} that contains only those records that satisfy
the given predicate
* @see #filterNot(Predicate, Materialized)
- * @deprecated use {@link #filter(Predicate, Materialized)}
+ * @deprecated use {@link #filter(Predicate, Materialized)
filter(predicate, Materialized.as(KeyValueByteStoreSupplier))}
*/
@Deprecated
KTable filter(final Predicate predicate, final
StateStoreSupplier storeSupplier);
@@ -297,7 +297,7 @@ public interface KTable {
* @param storeSupplier user defined state store supplier. Cannot be
{@code null}.
* @return a {@code KTable} that contains only those records that do
not satisfy the given predicate
* @see #filter(Predicate, Materialized)
- * @deprecated use {@link #filterNot(Predicate, Materialized)}
+ * @deprecated use {@link #filterNot(Predicate, Materialized)
filterNot(predicate, Materialized.as(KeyValueByteStoreSupplier))}
*/
@Deprecated
KTable filterNot(final Predicate predicate,
final StateStoreSupplier storeSupplier);
@@ -336,7 +336,7 @@ public interface KTable {
* (i.e., that would be equivalent to calling {@link
KTable#filterNot(Predicate)}.
* @return a {@code KTable} that contains only those records that do
not satisfy the given predicate
* @see #filter(Predicate, Materialized)
- * @deprecated use {@link #filter(Predicate, Materialized)}
+ * @deprecated use {@link #filter(Predicate, Materialized)
filterNot(predicate, Materialized.as(queryableStoreName))}
*/
@Deprecated
KTable filterNot(final Predicate predicate,
final String queryableStoreName);
@@ -463,7 +463,7 @@ public interface KTable {
* @paramthe value type of the result {@code KTable}
*
* @return a {@code KTable} that contains records with unmodified keys and
new values (possibly of different type)
- * @deprecated use {@link #mapValues(ValueMapper, Materialized)}
+ * @deprecated use {@link #mapValues(ValueMapper, Materialized)
mapValues(mapper,
Materialized.as(queryableStoreName).withValueSerde(valueSerde))}
*/
@Deprecated
KTable mapValues(final ValueMapper
mapper, final Serde valueSerde, final String queryableStoreName);
@@ -507,7 +507,7 @@ public interface KTable {
* @param storeSupplier user defined state store supplier. Cannot be
{@code null}.
* @paramthe value type of the result {@code KTable}
* @return a {@code KTable} that contains records with unmodified keys and
new values (possibly of different type)
- * @deprecated use {@link #mapValues(ValueMapper, Materialized)}
+ * @deprecated use {@link #mapValues(ValueMapper, Materialized)
mapValues(mapper,
Materialized.as(KeyValueByteStoreSupplier).withValueSerde(valueSerde))}
*/
@Deprecated
KTable mapValues(final ValueMapper
mapper,
@@ -530,7 +530,8 @@ public interface KTable {
* update record.
* @deprecated Use the Interactive Queries APIs (e.g., {@link
KafkaStreams#store(String, QueryableStoreType) }
* followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the
keys of a KTable. Alternatively
- * convert to a KStream using {@code toStream()} and then use {@link
KStream#print()} on the result.
+ * convert to a {@link KStream} using {@link #toStream()} and then use
+ * {@link KStream#print(Printed) print(Printed.toSysOut())} on the result.
*/
@Deprecated
void print();
@@ -551,7 +552,8 @@ public interface KTable {
* @param label the name used to label the
Repository: kafka
Updated Branches:
refs/heads/trunk 4f4f99532 -> 39d5cdccc
KAFKA-5985; update javadoc regarding closing iterators
Author: Bill Bejeck
Reviewers: Matthias J. Sax , Michael G. Noll
, Damian Guy
Closes #3994 from bbejeck/KAFKA-5985_document_need_to_close_iterators
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/39d5cdcc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/39d5cdcc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/39d5cdcc
Branch: refs/heads/trunk
Commit: 39d5cdcccfc0f7d7893188bb22580da0c842a993
Parents: 4f4f995
Author: Bill Bejeck
Authored: Mon Oct 2 11:49:22 2017 -0700
Committer: Damian Guy
Committed: Mon Oct 2 11:49:22 2017 -0700
--
docs/streams/developer-guide.html| 8
.../org/apache/kafka/streams/state/KeyValueIterator.java | 2 +-
.../apache/kafka/streams/state/ReadOnlyKeyValueStore.java| 4 ++--
.../org/apache/kafka/streams/state/ReadOnlySessionStore.java | 6 --
.../org/apache/kafka/streams/state/ReadOnlyWindowStore.java | 4
.../java/org/apache/kafka/streams/state/SessionStore.java| 4
.../org/apache/kafka/streams/state/WindowStoreIterator.java | 2 +-
7 files changed, 24 insertions(+), 6 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/kafka/blob/39d5cdcc/docs/streams/developer-guide.html
--
diff --git a/docs/streams/developer-guide.html
b/docs/streams/developer-guide.html
index 3368757..a064a5d 100644
--- a/docs/streams/developer-guide.html
+++ b/docs/streams/developer-guide.html
@@ -185,6 +185,7 @@
In the init method, schedule the punctuation every 1
second and retrieve the local state store by its name "Counts".
In the process method, upon each received record,
split the value string into words, and update their counts into the state store
(we will talk about this feature later in the section).
In the scheduled punctuate method, iterate the local
state store and send the aggregated counts to the downstream processor, and
commit the current stream state.
When done with the KeyValueIterator<String,
Long> you must close the iterator, as shown above, or use the
try-with-resources statement.
@@ -253,6 +254,13 @@ With deletion enabled, old windows that have expired will
be cleaned up by Kafka
The default retention setting is Windows#maintainMs() + 1 day.
This setting can be overridden by specifying
StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG
in the StreamsConfig.
+
+One additional note regarding the use of state stores. Any time you open an
Iterator from a state store you must call
close() on the iterator
+when you are done working with it to reclaim resources. Or you can use the
iterator from within a try-with-resources statement.
+By not closing an iterator, you will likely encounter an OOM error.
+
+
+
Monitoring the
Restoration Progress of Fault-tolerant State Stores
http://git-wip-us.apache.org/repos/asf/kafka/blob/39d5cdcc/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
--
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
index 3f44635..70a142b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
@@ -24,7 +24,7 @@ import java.util.Iterator;
/**
* Iterator interface of {@link KeyValue}.
*
- * Users need to call its {@code close} method explicitly upon completeness to
release resources,
+ * Users must call its {@code close} method explicitly upon completeness to
release resources,
* or use try-with-resources statement (available since JDK7) for this {@link
Closeable} class.
*
* @param Type of keys
http://git-wip-us.apache.org/repos/asf/kafka/blob/39d5cdcc/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
--
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
index 76bb47b..0632980 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
@@ -39,7 +39,7 @@ public interface ReadOnlyKeyValueStore
Repository: kafka
Updated Branches:
refs/heads/trunk b79b17971 -> 082def05c
MINOR: always set Serde.Long on count operations
Author: Damian Guy
Reviewers: Guozhang Wang , Ismael Juma ,
Bill Bejeck , Matthias J. Sax
Closes #3943 from dguy/count-materialized
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/082def05
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/082def05
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/082def05
Branch: refs/heads/trunk
Commit: 082def05ca5af4f30e05aa28ba83fa299f30337b
Parents: b79b179
Author: Damian Guy
Authored: Fri Sep 29 11:06:34 2017 +0100
Committer: Damian Guy
Committed: Fri Sep 29 11:06:34 2017 +0100
--
.../java/org/apache/kafka/streams/kstream/KGroupedStream.java | 2 ++
.../apache/kafka/streams/kstream/SessionWindowedKStream.java | 4 +++-
.../org/apache/kafka/streams/kstream/TimeWindowedKStream.java | 4 +++-
.../kafka/streams/kstream/internals/KGroupedStreamImpl.java| 6 ++
.../streams/kstream/internals/SessionWindowedKStreamImpl.java | 5 +
.../streams/kstream/internals/TimeWindowedKStreamImpl.java | 5 +
.../streams/kstream/internals/KGroupedStreamImplTest.java | 3 +--
.../kstream/internals/SessionWindowedKStreamImplTest.java | 3 +--
8 files changed, 26 insertions(+), 6 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/kafka/blob/082def05/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
--
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index 1ff1759..1c72ebf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
@@ -177,6 +178,7 @@ public interface KGroupedStream {
* query the value of the key on a parallel running instance of your Kafka
Streams application.
*
* @param materialized an instance of {@link Materialized} used to
materialize a state store. Cannot be {@code null}.
+ * Note: the valueSerde will be automatically set to
{@link Serdes#Long()} if there is no valueSerde provided
* @return a {@link KTable} that contains "update" records with unmodified
keys and {@link Long} values that
* represent the latest (rolling) count (i.e., number of records) for each
key
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/082def05/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
--
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
index d8044ac..3c3ef7e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
@@ -90,7 +91,8 @@ public interface SessionWindowedKStream {
* For non-local keys, a custom RPC mechanism must be implemented using
{@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka
Streams application.
*
- * @param materialized an instance of {@link Materialized} used to
materialize a state store. Cannot be {@code null}
+ * @param materialized an instance of {@link Materialized} used to
materialize a state store. Cannot be {@code null}.
+ * Note: the valueSerde will be automatically set to
{@link Serdes#Long()} if there is no valueSerde provided
* @return a windowed {@link KTable} that contains "update" records with
unmodified keys and {@link Long} values
* that represent the latest (rolling) count (i.e., number of records) for
each key
Repository: kafka
Updated Branches:
refs/heads/trunk 177dd7f21 -> eaabb6cd0
KAFKA-4593; Don't throw IllegalStateException and die on task migration
Author: Matthias J. Sax
Reviewers: Damian Guy , Guozhang Wang
Closes #3948 from mjsax/kafka-4593-illegal-state-exception-in-restore
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/eaabb6cd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/eaabb6cd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/eaabb6cd
Branch: refs/heads/trunk
Commit: eaabb6cd0173c4f6854eb5da39194a7e3fc0162c
Parents: 177dd7f
Author: Matthias J. Sax
Authored: Fri Sep 29 10:00:13 2017 +0100
Committer: Damian Guy
Committed: Fri Sep 29 10:00:13 2017 +0100
--
.../streams/errors/TaskMigratedException.java | 52 +++
.../processor/internals/AssignedTasks.java | 89 +---
.../processor/internals/ChangelogReader.java| 2 +-
.../processor/internals/PunctuationQueue.java | 6 +-
.../processor/internals/RestoringTasks.java | 23 +++
.../internals/StoreChangelogReader.java | 43 +++---
.../streams/processor/internals/StreamTask.java | 111 ++-
.../processor/internals/StreamThread.java | 36 -
.../processor/internals/TaskManager.java| 33 -
.../processor/internals/AssignedTasksTest.java | 140 ---
.../internals/MockChangelogReader.java | 53 +++
.../internals/ProcessorStateManagerTest.java| 1 -
.../internals/StoreChangelogReaderTest.java | 87 +---
.../processor/internals/StreamThreadTest.java | 16 ++-
.../processor/internals/TaskManagerTest.java| 4 +-
.../apache/kafka/test/MockChangelogReader.java | 55
16 files changed, 530 insertions(+), 221 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/kafka/blob/eaabb6cd/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java
--
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java
b/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java
new file mode 100644
index 000..f2fa594
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.processor.internals.Task;
+
+/**
+ * Indicates that a task got migrated to another thread.
+ * Thus, the task raising this exception can be cleaned up and closed as
"zombie".
+ */
+public class TaskMigratedException extends StreamsException {
+
+private final static long serialVersionUID = 1L;
+
+public TaskMigratedException(final Task task) {
+this(task, null);
+}
+
+public TaskMigratedException(final Task task,
+ final TopicPartition topicPartition,
+ final long endOffset,
+ final long pos) {
+super(String.format("Log end offset of %s should not change while
restoring: old end offset %d, current offset %d%n%s",
+topicPartition,
+endOffset,
+pos,
+task.toString("> ")),
+null);
+}
+
+public TaskMigratedException(final Task task,
+ final Throwable throwable) {
+super(task.toString(), throwable);
+}
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/eaabb6cd/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
--
diff --git
Repository: kafka
Updated Branches:
refs/heads/trunk 73cc41666 -> 5b943ca8a
MINOR:Updated Rabobank description
dguy Please review
Author: Manjula K
Author: manjuapu
Reviewers: Damian Guy
Closes #3963 from manjuapu/customer-logo-stream
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5b943ca8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5b943ca8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5b943ca8
Branch: refs/heads/trunk
Commit: 5b943ca8a9bec9f2c990d9d03fc0f4b7c3e9cca5
Parents: 73cc416
Author: Manjula K
Authored: Wed Sep 27 09:26:57 2017 +0100
Committer: Damian Guy
Committed: Wed Sep 27 09:26:57 2017 +0100
--
docs/streams/index.html | 7 ---
1 file changed, 4 insertions(+), 3 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/kafka/blob/5b943ca8/docs/streams/index.html
--
diff --git a/docs/streams/index.html b/docs/streams/index.html
index 3704a3d..112e304 100644
--- a/docs/streams/index.html
+++ b/docs/streams/index.html
@@ -213,9 +213,9 @@
-
+
- Rabobank is one of the 3
largest banks in the Netherlands. Its digital nervous system, the Business
Event Bus, is powered by Apache Kafka and Kafka Streams.
+ Rabobank is one of the 3
largest banks in the Netherlands. Its digital nervous system, the Business
Event Bus, is powered by Apache Kafka. It is used by an increasing amount of
financial processes and services, one which is Rabo Alerts. This service alerts
customers in real-time upon financial events and is built using Kafka Streams.
https://www.confluent.io/blog/real-time-financial-alerts-rabobank-apache-kafkas-streams-api/;>Learn
More
@@ -223,7 +223,7 @@
-
+
As the leading online
fashion retailer in Europe, Zalando uses Apache Kafka as an ESB (Enterprise
Service Bus), which helps us in transitioning from a monolithic to a micro
services architecture. Using Kafka for processing event streams enables our
technical team to do near-real time business intelligence.
https://kafka-summit.org/sessions/using-kstreams-ktables-calculate-real-time-domain-rankings/;>Learn
More
@@ -237,6 +237,7 @@
+
Previous
Next
Repository: kafka
Updated Branches:
refs/heads/trunk 4e43a7231 -> b8be86b80
KAFKA-5765; Move merge() from StreamsBuilder to KStream
This is the polished version.
1. The old merge() method in StreamsBuilder has been removed,
2. The merge() method in KStreamBuilder was changed so that it would use the
single variable argument
rather than several variable arguments in the KStreamImpl implementation
3. The merge() method in KStream has been declared as final and tests have been
added to test correctness.
Author: Richard Yu
Reviewers: Matthias J. Sax , Bill Bejeck
, Guozhang Wang , Damian Guy
Closes #3916 from ConcurrencyPractitioner/trunk
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b8be86b8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b8be86b8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b8be86b8
Branch: refs/heads/trunk
Commit: b8be86b80543e41fd3181a8de8f1a3ac0a72e4c5
Parents: 4e43a72
Author: Richard Yu
Authored: Tue Sep 26 09:42:53 2017 +0100
Committer: Damian Guy
Committed: Tue Sep 26 09:42:53 2017 +0100
--
docs/streams/upgrade-guide.html | 10 +++-
.../apache/kafka/streams/StreamsBuilder.java| 14 +
.../apache/kafka/streams/kstream/KStream.java | 13 +
.../kafka/streams/kstream/KStreamBuilder.java | 10 +++-
.../internals/InternalStreamsBuilder.java | 5 --
.../streams/kstream/internals/KStreamImpl.java | 30 +--
.../kafka/streams/StreamsBuilderTest.java | 6 +--
.../streams/kstream/KStreamBuilderTest.java | 2 +-
.../internals/InternalStreamsBuilderTest.java | 2 +-
.../kstream/internals/KStreamImplTest.java | 55
.../internals/StreamsMetadataStateTest.java | 2 +-
11 files changed, 107 insertions(+), 42 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/kafka/blob/b8be86b8/docs/streams/upgrade-guide.html
--
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index c90024f..c2835a3 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -84,7 +84,13 @@
and can be obtained by calling Topology#describe().
An example using this new API is shown in the quickstart section.
-
+
+
+With the introduction of https://cwiki.apache.org/confluence/display/KAFKA/KIP-202+Move+merge%28%29+from+StreamsBuilder+to+KStream;>KIP-202
+a new method merge() has been created in
KStream as the StreamsBuilder class's
StreamsBuilder#merge() has been removed.
+The method signature was also changed: instead of providing
multiple KStreams to the method at once, only a single
KStream is accepted.
+
+
New methods in KafkaStreams:
@@ -214,7 +220,9 @@
If exactly-once processing is enabled via the
processing.guarantees parameter, internally Streams switches from
a producer per thread to a producer per task runtime model.
In order to distinguish the different producers, the producer's
client.id additionally encodes the task-ID for this case.
Because the producer's client.id is used to report JMX
metrics, it might be required to update tools that receive those metrics.
+
+
Producer's client.id naming schema:
at-least-once (default):
[client.Id]-StreamThread-[sequence-number]
http://git-wip-us.apache.org/repos/asf/kafka/blob/b8be86b8/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
--
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 7e746e6..94d19ae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -59,7 +59,7 @@ public class StreamsBuilder {
final InternalTopologyBuilder internalTopologyBuilder =
topology.internalTopologyBuilder;
private final InternalStreamsBuilder internalStreamsBuilder = new
InternalStreamsBuilder(internalTopologyBuilder);
-
+
/**
* Create a {@link KStream} from the specified topics.
* The default {@code "auto.offset.reset"} strategy, default {@link
TimestampExtractor}, and default key and value
@@ -493,18 +493,6 @@ public class StreamsBuilder {
}
/**
- * Create a new instance of {@link KStream} by merging the given {@link
KStream}s.
- *
- * There is no
Repository: kafka-site
Updated Branches:
refs/heads/asf-site 4bb2fd8c6 -> 8c1a77237
MINOR:Updating Rabobank description and Zalando image in powered-by & streams
page
@guozhangwang @dguy: Updated text was provided by Rabobank, so in this PR I am
updating it.
Please review. Thanks!!
Author: Manjula K
Reviewers: Damian Guy
Closes #86 from manjuapu/asf-site
Project: http://git-wip-us.apache.org/repos/asf/kafka-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka-site/commit/8c1a7723
Tree: http://git-wip-us.apache.org/repos/asf/kafka-site/tree/8c1a7723
Diff: http://git-wip-us.apache.org/repos/asf/kafka-site/diff/8c1a7723
Branch: refs/heads/asf-site
Commit: 8c1a77237c0afc54ae158d5670da2f5f887c77c6
Parents: 4bb2fd8
Author: Manjula K
Authored: Tue Sep 26 09:22:25 2017 +0100
Committer: Damian Guy
Committed: Tue Sep 26 09:22:25 2017 +0100
--
0110/streams/index.html | 6 +++---
images/powered-by/zalando.jpg | Bin 0 -> 6356 bytes
images/powered-by/zalando.png | Bin 2716 -> 0 bytes
powered-by.html | 4 ++--
4 files changed, 5 insertions(+), 5 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/kafka-site/blob/8c1a7723/0110/streams/index.html
--
diff --git a/0110/streams/index.html b/0110/streams/index.html
index f323456..919c60b 100644
--- a/0110/streams/index.html
+++ b/0110/streams/index.html
@@ -213,9 +213,9 @@
-
+
- Rabobank is one of the 3
largest banks in the Netherlands. Its digital nervous system, the Business
Event Bus, is powered by Apache Kafka and Kafka Streams.
+ Rabobank is one of the 3
largest banks in the Netherlands. Its digital nervous system, the Business
Event Bus, is powered by Apache Kafka. It is used by an increasing number of
financial processes and services, one of which is Rabo Alerts. This service alerts
customers in real-time upon financial events and is built using Kafka Streams.
https://www.confluent.io/blog/real-time-financial-alerts-rabobank-apache-kafkas-streams-api/;>Learn
More
@@ -223,7 +223,7 @@
-
+
As the leading online
fashion retailer in Europe, Zalando uses Apache Kafka as an ESB (Enterprise
Service Bus), which helps us in transitioning from a monolithic to a micro
services architecture. Using Kafka for processing event streams enables our
technical team to do near-real time business intelligence.
https://kafka-summit.org/sessions/using-kstreams-ktables-calculate-real-time-domain-rankings/;>Learn
More
http://git-wip-us.apache.org/repos/asf/kafka-site/blob/8c1a7723/images/powered-by/zalando.jpg
--
diff --git a/images/powered-by/zalando.jpg b/images/powered-by/zalando.jpg
new file mode 100644
index 000..0d8e9d7
Binary files /dev/null and b/images/powered-by/zalando.jpg differ
http://git-wip-us.apache.org/repos/asf/kafka-site/blob/8c1a7723/images/powered-by/zalando.png
--
diff --git a/images/powered-by/zalando.png b/images/powered-by/zalando.png
deleted file mode 100755
index 719a7dc..000
Binary files a/images/powered-by/zalando.png and /dev/null differ
http://git-wip-us.apache.org/repos/asf/kafka-site/blob/8c1a7723/powered-by.html
--
diff --git a/powered-by.html b/powered-by.html
index bb51217..fee1d5d 100644
--- a/powered-by.html
+++ b/powered-by.html
@@ -416,7 +416,7 @@
"link": "https://www.rabobank.com;,
"logo": "rabobank.jpg",
"logoBgColor": "#ff",
-"description": "Rabobank is one of the 3 largest banks in the
Netherlands. Its digital nervous system, the Business Event Bus, is powered by
Apache Kafka and Kafka Streams."
+"description": "Rabobank is one of the 3 largest banks in the
Netherlands. Its digital nervous system, the Business Event Bus, is powered by
Apache Kafka. It is used by an increasing number of financial processes and
services, one of which is Rabo Alerts. This service alerts customers in real-time
upon financial events and is built using Kafka Streams."
},{
"link": "http://www.portoseguro.com.br/;,
"logo": "porto-seguro.png",
@@ -434,7 +434,7 @@
"description": "Apache Kafka is used at CJ Affiliate to process
many of the key events driving our core product. Nearly every aspect of CJ's
products and services
Repository: kafka
Updated Branches:
refs/heads/trunk c58790595 -> 37ec15e96
KAFKA-5931; deprecate KTable#through and KTable#to
Author: Damian Guy
Reviewers: Matthias J. Sax , Bill Bejeck
, Guozhang Wang
Closes #3903 from dguy/deprectate-to-through
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/37ec15e9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/37ec15e9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/37ec15e9
Branch: refs/heads/trunk
Commit: 37ec15e9627e2fe68d78eb6d95e9a117e3bca320
Parents: c587905
Author: Damian Guy
Authored: Wed Sep 20 12:04:13 2017 +0100
Committer: Damian Guy
Committed: Wed Sep 20 12:04:13 2017 +0100
--
.../apache/kafka/streams/kstream/KTable.java| 84 +++-
1 file changed, 64 insertions(+), 20 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/kafka/blob/37ec15e9/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
--
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 6d1d85d..66ec0d7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -38,7 +38,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
* {@code KTable} is an abstraction of a changelog stream from a
primary-keyed table.
* Each record in this changelog stream is an update on the primary-keyed
table with the record key as the primary key.
*
- * A {@code KTable} is either {@link StreamsBuilder#table(String, String)
defined from a single Kafka topic} that is
+ * A {@code KTable} is either {@link StreamsBuilder#table(String) defined from
a single Kafka topic} that is
* consumed message by message or the result of a {@code KTable}
transformation.
* An aggregation of a {@link KStream} also yields a {@code KTable}.
*
@@ -66,7 +66,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
* @see KStream
* @see KGroupedTable
* @see GlobalKTable
- * @see StreamsBuilder#table(String, String)
+ * @see StreamsBuilder#table(String)
*/
@InterfaceStability.Evolving
public interface KTable {
@@ -763,17 +763,20 @@ public interface KTable {
* started).
*
* This is equivalent to calling {@link #to(String) #to(someTopicName)} and
- * {@link StreamsBuilder#table(String, String)
StreamsBuilder#table(someTopicName, queryableStoreName)}.
+ * {@link StreamsBuilder#table(String, Materialized)
StreamsBuilder#table(someTopicName, queryableStoreName)}.
*
* The resulting {@code KTable} will be materialized in a local state
store with the given store name (cf.
- * {@link StreamsBuilder#table(String, String)})
+ * {@link StreamsBuilder#table(String, Materialized)})
* The store name must be a valid Kafka topic name and cannot contain
characters other than ASCII alphanumerics, '.', '_' and '-'.
*
* @param topic the topic name
* @param queryableStoreName the state store name used for the result
{@code KTable}; valid characters are ASCII
* alphanumerics, '.', '_' and '-'. If {@code null} this
is the equivalent of {@link KTable#through(String)()}
* @return a {@code KTable} that contains the exact same (and potentially
repartitioned) records as this {@code KTable}
+ * @deprecated use {@link #toStream()} followed by {@link
KStream#to(String)}
+ * and {@link StreamsBuilder#table(String)} to read back as a {@code
KTable}
*/
+@Deprecated
KTable through(final String topic,
final String queryableStoreName);
@@ -784,16 +787,19 @@ public interface KTable {
* started).
*
* This is equivalent to calling {@link #to(String) #to(someTopicName)} and
- * {@link StreamsBuilder#table(String, String)
StreamsBuilder#table(someTopicName, queryableStoreName)}.
+ * {@link StreamsBuilder#table(String, Materialized)
StreamsBuilder#table(someTopicName, queryableStoreName)}.
*
* The resulting {@code KTable} will be materialized in a local state
store with the given store name (cf.
- * {@link StreamsBuilder#table(String, String)})
+ * {@link StreamsBuilder#table(String, Materialized)})
* The store name must be a valid Kafka topic name and cannot contain
characters other than ASCII alphanumerics, '.', '_' and '-'.
*
* @param topic the topic name
* @param storeSupplier user defined state store supplier.
Repository: kafka
Updated Branches:
refs/heads/trunk 83bdcdbae -> c8f147199
KAFKA-5921; add Materialized overloads to windowed kstream
Add `Materialized` overloads to `WindowedKStream`. Deprecate existing methods
on `KGroupedStream`
Author: Damian Guy
Reviewers: Guozhang Wang
Closes #3889 from dguy/kafka-5921
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c8f14719
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c8f14719
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c8f14719
Branch: refs/heads/trunk
Commit: c8f1471992c98e0104e3a7b2e093adc21b2d2a6f
Parents: 83bdcdb
Author: Damian Guy
Authored: Tue Sep 19 10:56:42 2017 +0100
Committer: Damian Guy
Committed: Tue Sep 19 10:56:42 2017 +0100
--
.../kafka/streams/kstream/KGroupedStream.java | 8 +
.../kafka/streams/kstream/WindowedKStream.java | 150 +--
.../GroupedStreamAggregateBuilder.java | 15 ++
.../kstream/internals/KGroupedStreamImpl.java | 32 ++--
.../kstream/internals/WindowedKStreamImpl.java | 95 ++--
.../KStreamAggregationIntegrationTest.java | 6 +-
.../internals/WindowedKStreamImplTest.java | 109 +-
7 files changed, 361 insertions(+), 54 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/kafka/blob/c8f14719/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
--
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index 08916ef..5621ab4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -667,7 +667,9 @@ public interface KGroupedStream {
* alphanumerics, '.', '_' and '-'. If {@code null} then this will be
equivalent to {@link KGroupedStream#reduce(Reducer, Windows)} ()}.
* @return a windowed {@link KTable} that contains "update" records with
unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
+ * @deprecated use {@link #windowedBy(Windows)}
*/
+@Deprecated
KTable reduce(final Reducer reducer,
final Windows windows,
final String
queryableStoreName);
@@ -772,7 +774,9 @@ public interface KGroupedStream {
* @param storeSupplier user defined state store supplier. Cannot be
{@code null}.
* @return a windowed {@link KTable} that contains "update" records with
unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
+ * @deprecated use {@link #windowedBy(Windows)}
*/
+@Deprecated
KTable reduce(final Reducer reducer,
final Windows windows,
final
StateStoreSupplier storeSupplier);
@@ -1259,7 +1263,9 @@ public interface KGroupedStream {
* alphanumerics, '.', '_' and '-'. If {@code null} then this will be
equivalent to {@link KGroupedStream#aggregate(Initializer, Aggregator, Windows,
Serde)} ()} ()}.
* @return a windowed {@link KTable} that contains "update" records with
unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
+ * @deprecated use {@link #windowedBy(Windows)}
*/
+@Deprecated
KTable aggregate(final
Initializer initializer,
final
Aggregator aggregator,
final Windows
windows,
@@ -1369,7 +1375,9 @@ public interface KGroupedStream {
* @param storeSupplier user defined state store supplier. Cannot be
{@code null}.
* @return a windowed {@link KTable} that contains "update" records with
unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
+ * @deprecated use {@link #windowedBy(Windows)}
*/
+@Deprecated
KTable aggregate(final
Initializer initializer,
final
Aggregator aggregator,
final Windows
windows,
http://git-wip-us.apache.org/repos/asf/kafka/blob/c8f14719/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedKStream.java
Repository: kafka
Updated Branches:
refs/heads/trunk b363901cb -> bd83ae6ba
MINOR: Fix typo in mapper parameter of flatMapValues
The parameter is already called `mapper` in the KStreamImpl class. I think it
was probably named `processor` here because it was copy/pasted from some other
signature. This seems trivial enough to not require a jira as per the
contribution guidelines.
Author: Andy Chambers
Reviewers: Damian Guy
Closes #3888 from cddr/fix-kstream-flatMapValues-signature
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bd83ae6b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bd83ae6b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bd83ae6b
Branch: refs/heads/trunk
Commit: bd83ae6ba1f887ab112c4ccb2002633dfd387d69
Parents: b363901
Author: Andy Chambers
Authored: Mon Sep 18 15:30:25 2017 +0100
Committer: Damian Guy
Committed: Mon Sep 18 15:30:25 2017 +0100
--
.../src/main/java/org/apache/kafka/streams/kstream/KStream.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd83ae6b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
--
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 3a51fad..f8f99f2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -252,7 +252,7 @@ public interface KStream {
* Thus, no internal data redistribution is required if a key
based operator (like an aggregation or join)
* is applied to the result {@code KStream}. (cf. {@link
#flatMap(KeyValueMapper)})
*
- * @param processor a {@link ValueMapper} the computes the new output
values
+ * @param mapper a {@link ValueMapper} the computes the new output values
* @param the value type of the result stream
* @return a {@code KStream} that contains more or less records with
unmodified keys and new values of different type
* @see #selectKey(KeyValueMapper)
@@ -262,7 +262,7 @@ public interface KStream {
* @see #transform(TransformerSupplier, String...)
* @see #transformValues(ValueTransformerSupplier, String...)
*/
- KStream flatMapValues(final ValueMapper> processor);
+ KStream flatMapValues(final ValueMapper> mapper);
/**
* Print the records of this stream to {@code System.out}.
Repository: kafka
Updated Branches:
refs/heads/trunk d83252eba -> be6252d8e
MINOR: Code cleanup, subject: log statements.
I'm doing this in my spare time, so don't let reviewing this PR take away
actual work time. This is just me going over the code with the Intellij
analyzer and implementing the most easily implementable fixes.
This PR is focused only on seemingly erroneous log statements.
1: A log statement that has 4 arguments supplied but only 3 `{}` statements
2: A log statement that checks if debug is enabled, but then logs on `info`
level.
Author: coscale_kdegroot
Reviewers: Damian Guy
Closes #3886 from KoenDG/loggingErrors
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/be6252d8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/be6252d8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/be6252d8
Branch: refs/heads/trunk
Commit: be6252d8ebdf9cf2d151028a7ba20eb1591b5961
Parents: d83252e
Author: coscale_kdegroot
Authored: Mon Sep 18 12:04:56 2017 +0100
Committer: Damian Guy
Committed: Mon Sep 18 12:04:56 2017 +0100
--
.../kafka/streams/processor/internals/StreamPartitionAssignor.java | 2 +-
.../org/apache/kafka/streams/processor/internals/StreamThread.java | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/kafka/blob/be6252d8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
--
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 34e9e8a..621eb15 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -502,7 +502,7 @@ public class StreamPartitionAssignor implements
PartitionAssignor, Configurable,
states.put(entry.getKey(), entry.getValue().state);
}
-log.debug("Assigning tasks {} to clients {} with number of replicas
{}",
+log.debug("{} Assigning tasks {} to clients {} with number of replicas
{}",
logPrefix, partitionsForTask.keySet(), states,
numStandbyReplicas);
final StickyTaskAssignor taskAssignor = new
StickyTaskAssignor<>(states, partitionsForTask.keySet());
http://git-wip-us.apache.org/repos/asf/kafka/blob/be6252d8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
--
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index b753cf9..867359b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -963,7 +963,7 @@ public class StreamThread extends Thread implements
ThreadDataProvider {
streamsMetrics.commitTimeSensor.record(computeLatency() /
(double) committed, timerStartedMs);
}
if (log.isDebugEnabled()) {
-log.info("Committed all active tasks {} and standby tasks {}
in {}ms",
+log.debug("Committed all active tasks {} and standby tasks {}
in {}ms",
taskManager.activeTaskIds(),
taskManager.standbyTaskIds(), timerStartedMs - now);
}
Repository: kafka
Updated Branches:
refs/heads/trunk 346d0ca53 -> d83252eba
KAFKA-5654; add materialized count, reduce, aggregate to KGroupedStream
Add overloads of `count`, `reduce`, and `aggregate` that are `Materialized` to
`KGroupedStream`.
Refactor common parts between `KGroupedStream` and `WindowedKStream`
Author: Damian Guy
Reviewers: Matthias J. Sax , Guozhang Wang
Closes #3827 from dguy/kafka-5654
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d83252eb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d83252eb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d83252eb
Branch: refs/heads/trunk
Commit: d83252ebaeeca5bf19584908d95b424beb31b12e
Parents: 346d0ca
Author: Damian Guy
Authored: Mon Sep 18 11:54:14 2017 +0100
Committer: Damian Guy
Committed: Mon Sep 18 11:54:14 2017 +0100
--
.../kafka/streams/kstream/KGroupedStream.java | 210 ++-
.../GroupedStreamAggregateBuilder.java | 76 +++
.../kstream/internals/KGroupedStreamImpl.java | 127 +++
.../streams/kstream/internals/KStreamImpl.java | 25 +--
.../kstream/internals/MaterializedInternal.java | 13 +-
.../kstream/internals/WindowedKStreamImpl.java | 57 ++---
.../internals/KGroupedStreamImplTest.java | 106 ++
7 files changed, 515 insertions(+), 99 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/kafka/blob/d83252eb/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
--
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index f12c2b2..08916ef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
@@ -146,6 +147,38 @@ public interface KGroupedStream {
KTable count(final StateStoreSupplier
storeSupplier);
/**
+ * Count the number of records in this stream by the grouped key.
+ * Records with {@code null} key or value are ignored.
+ * The result is written into a local {@link KeyValueStore} (which is
basically an ever-updating materialized view)
+ * provided by the given {@code storeSupplier}.
+ * Furthermore, updates to the store are sent downstream into a {@link
KTable} changelog stream.
+ *
+ * Not all updates might get sent downstream, as an internal cache is used
to deduplicate consecutive updates to
+ * the same key.
+ * The rate of propagated updates depends on your input data rate, the
number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the {@link StreamsConfig
configuration} parameters for
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
+ *
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType)
KafkaStreams#store(...)}.
+ * {@code
+ * KafkaStreams streams = ... // counting words
+ * String queryableStoreName = "count-store"; // the queryableStoreName
should be the name of the store as defined by the Materialized instance
+ * ReadOnlyKeyValueStore localStore =
streams.store(queryableStoreName, QueryableStoreTypes.keyValueStore());
+ * String key = "some-word";
+ * Long countForWord = localStore.get(key); // key must be local
(application state is shared over all running Kafka Streams instances)
+ * }
+ * For non-local keys, a custom RPC mechanism must be implemented using
{@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka
Streams application.
+ *
+ * @param materialized an instance of {@link Materialized} used to
materialize a state store. Cannot be {@code null}.
+ * @return a {@link KTable} that contains "update" records with unmodified
keys and {@link Long} values that
+ * represent the latest (rolling) count (i.e., number of records) for each
key
+ */
+KTable count(final Materialized
Repository: kafka-site
Updated Branches:
refs/heads/asf-site b415c59b0 -> e834dd428
MINOR: add note to streams quickstart about snapshot dependency removal being
temporary
Project: http://git-wip-us.apache.org/repos/asf/kafka-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka-site/commit/e834dd42
Tree: http://git-wip-us.apache.org/repos/asf/kafka-site/tree/e834dd42
Diff: http://git-wip-us.apache.org/repos/asf/kafka-site/diff/e834dd42
Branch: refs/heads/asf-site
Commit: e834dd4286a16c3edca83dfa9ffcd3aac5d8df62
Parents: b415c59
Author: Damian Guy
Authored: Thu Sep 14 14:11:24 2017 +0100
Committer: Damian Guy
Committed: Thu Sep 14 14:11:24 2017 +0100
--
0110/streams/tutorial.html | 1 +
1 file changed, 1 insertion(+)
--
http://git-wip-us.apache.org/repos/asf/kafka-site/blob/e834dd42/0110/streams/tutorial.html
--
diff --git a/0110/streams/tutorial.html b/0110/streams/tutorial.html
index 7791880..f95eddc 100644
--- a/0110/streams/tutorial.html
+++ b/0110/streams/tutorial.html
@@ -63,6 +63,7 @@
Important: You must manually update the setting of
kafka.version in the generated pom.xml file
from 0.11.0.1-SNAPSHOT to 0.11.0.1.
+Note: in the next release the above step will not be
required.
There are already several example programs written with Streams
library under src/main/java.
Since we are going to start writing such programs from scratch, we can
now delete these examples:
Repository: kafka
Updated Branches:
refs/heads/0.11.0 b95a6bf61 -> 1c9581e2e
MINOR: update docs to add note about removing SNAPSHOT from streams dependency
Author: Damian Guy
Reviewers: Michael G. Noll , Ismael Juma
Closes #3858 from dguy/docs
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1c9581e2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1c9581e2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1c9581e2
Branch: refs/heads/0.11.0
Commit: 1c9581e2e9bb4a05dc2e25b4262272cfa1a4b470
Parents: b95a6bf
Author: Damian Guy
Authored: Thu Sep 14 14:10:10 2017 +0100
Committer: Damian Guy
Committed: Thu Sep 14 14:10:10 2017 +0100
--
docs/streams/tutorial.html | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c9581e2/docs/streams/tutorial.html
--
diff --git a/docs/streams/tutorial.html b/docs/streams/tutorial.html
index a1520de..f95eddc 100644
--- a/docs/streams/tutorial.html
+++ b/docs/streams/tutorial.html
@@ -62,8 +62,9 @@
-The pom.xml file included in the project already has the
Streams dependency defined,
-and there are already several example programs written with Streams
library under src/main/java.
+Important: You must manually update the setting of
kafka.version in the generated pom.xml file
from 0.11.0.1-SNAPSHOT to 0.11.0.1.
+Note: in the next release the above step will not be
required.
+There are already several example programs written with Streams
library under src/main/java.
Since we are going to start writing such programs from scratch, we can
now delete these examples:
Repository: kafka-site
Updated Branches:
refs/heads/asf-site 7e98fdb95 -> b415c59b0
MINOR: update streams quickstart to have note about updating snapshot dependency
Project: http://git-wip-us.apache.org/repos/asf/kafka-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka-site/commit/b415c59b
Tree: http://git-wip-us.apache.org/repos/asf/kafka-site/tree/b415c59b
Diff: http://git-wip-us.apache.org/repos/asf/kafka-site/diff/b415c59b
Branch: refs/heads/asf-site
Commit: b415c59b05155244e27105773287919111735bc9
Parents: 7e98fdb
Author: Damian Guy
Authored: Thu Sep 14 12:06:45 2017 +0100
Committer: Damian Guy
Committed: Thu Sep 14 12:06:45 2017 +0100
--
0110/streams/tutorial.html | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/kafka-site/blob/b415c59b/0110/streams/tutorial.html
--
diff --git a/0110/streams/tutorial.html b/0110/streams/tutorial.html
index a1520de..7791880 100644
--- a/0110/streams/tutorial.html
+++ b/0110/streams/tutorial.html
@@ -62,8 +62,8 @@
-The pom.xml file included in the project already has the
Streams dependency defined,
-and there are already several example programs written with Streams
library under src/main/java.
+Important: You must manually update the setting of
kafka.version in the generated pom.xml file
from 0.11.0.1-SNAPSHOT to 0.11.0.1.
+There are already several example programs written with Streams
library under src/main/java.
Since we are going to start writing such programs from scratch, we can
now delete these examples:
Repository: kafka
Updated Branches:
refs/heads/trunk 49b992dd8 -> c42bfc0d5
MINOR: Fix JavaDoc for StreamsConfig.PROCESSING_GUARANTEE_CONFIG
The contribution is my original work and I license the work to the project
under the project's open source licence.
Author: lperry
Reviewers: Matthias J. Sax , Damian Guy
Closes #3843 from leigh-perry/trunk
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c42bfc0d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c42bfc0d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c42bfc0d
Branch: refs/heads/trunk
Commit: c42bfc0d51e6691b4b5672ef7c8a1bedcd452d7f
Parents: 49b992d
Author: lperry
Authored: Wed Sep 13 17:52:45 2017 +0100
Committer: Damian Guy
Committed: Wed Sep 13 17:52:45 2017 +0100
--
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
--
http://git-wip-us.apache.org/repos/asf/kafka/blob/c42bfc0d/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
--
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 6b0e245..446f941 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -230,7 +230,7 @@ public class StreamsConfig extends AbstractConfig {
public static final String POLL_MS_CONFIG = "poll.ms";
private static final String POLL_MS_DOC = "The amount of time in
milliseconds to block waiting for input.";
-/** {@code cache.max.bytes.buffering} */
+/** {@code processing.guarantee} */
public static final String PROCESSING_GUARANTEE_CONFIG =
"processing.guarantee";
private static final String PROCESSING_GUARANTEE_DOC = "The processing
guarantee that should be used. Possible values are " + AT_LEAST_ONCE +
" (default) and " + EXACTLY_ONCE + ".";
http://git-wip-us.apache.org/repos/asf/kafka-site/blob/a1278d06/0110/javadoc/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.html
--
diff --git
a/0110/javadoc/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.html
b/0110/javadoc/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.html
index 89cbd03..50e67ba 100644
---
a/0110/javadoc/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.html
+++
b/0110/javadoc/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.html
@@ -2,15 +2,15 @@
-
-UnknownTopicOrPartitionException (kafka 0.11.0.0 API)
-
+
+UnknownTopicOrPartitionException (kafka 0.11.0.1 API)
+
http://git-wip-us.apache.org/repos/asf/kafka-site/blob/a1278d06/0110/javadoc/org/apache/kafka/common/errors/UnsupportedForMessageFormatException.html
--
diff --git
a/0110/javadoc/org/apache/kafka/common/errors/UnsupportedForMessageFormatException.html
b/0110/javadoc/org/apache/kafka/common/errors/UnsupportedForMessageFormatException.html
index 5b2d1e6..bf7cb27 100644
---
a/0110/javadoc/org/apache/kafka/common/errors/UnsupportedForMessageFormatException.html
+++
b/0110/javadoc/org/apache/kafka/common/errors/UnsupportedForMessageFormatException.html
@@ -2,15 +2,15 @@
-
-UnsupportedForMessageFormatException (kafka 0.11.0.0 API)
-
+
+UnsupportedForMessageFormatException (kafka 0.11.0.1 API)
+
@@ -126,7 +126,8 @@
public class UnsupportedForMessageFormatException
extends ApiException
-The message format version does not support the requested
function.
+The message format version does not support the requested
function. For example, if idempotence is
+ requested and the topic is using a message format older than 0.11.0.0, then
this error will be returned.
See Also:Serialized
Form
http://git-wip-us.apache.org/repos/asf/kafka-site/blob/a1278d06/0110/javadoc/org/apache/kafka/common/errors/UnsupportedSaslMechanismException.html
--
diff --git
a/0110/javadoc/org/apache/kafka/common/errors/UnsupportedSaslMechanismException.html
b/0110/javadoc/org/apache/kafka/common/errors/UnsupportedSaslMechanismException.html
index aa6dab0..1446b1f 100644
---
a/0110/javadoc/org/apache/kafka/common/errors/UnsupportedSaslMechanismException.html
+++
b/0110/javadoc/org/apache/kafka/common/errors/UnsupportedSaslMechanismException.html
@@ -2,15 +2,15 @@
-
-UnsupportedSaslMechanismException (kafka 0.11.0.0 API)
-
+
+UnsupportedSaslMechanismException (kafka 0.11.0.1 API)
+
http://git-wip-us.apache.org/repos/asf/kafka-site/blob/a1278d06/0110/javadoc/org/apache/kafka/common/errors/UnsupportedVersionException.html
--
diff --git
a/0110/javadoc/org/apache/kafka/common/errors/UnsupportedVersionException.html
b/0110/javadoc/org/apache/kafka/common/errors/UnsupportedVersionException.html
index f75335f..9c4e50b 100644
---
a/0110/javadoc/org/apache/kafka/common/errors/UnsupportedVersionException.html
+++
b/0110/javadoc/org/apache/kafka/common/errors/UnsupportedVersionException.html
@@ -2,15 +2,15 @@
-
-UnsupportedVersionException (kafka 0.11.0.0 API)
-
+
+UnsupportedVersionException (kafka 0.11.0.1 API)
+
@@ -126,6 +126,14 @@
public class UnsupportedVersionException
extends ApiException
+Indicates that a request API or version needed by the
client is not supported by the broker. This is
+ typically a fatal error as Kafka clients will downgrade request versions as
needed except in cases where
+ a needed feature is not available in old versions. Fatal errors can generally
only be handled by closing
+ the client instance, although in some cases it may be possible to continue
without relying on the
+ underlying feature.
There is a quickstart
example that provides how to run a stream processing program coded in the Kafka
Streams library.
@@ -505,7 +505,7 @@
A Kafka Streams application is typically running on many instances.
The state that is locally available on any given instance is only a
subset of the application's entire state.
Querying the local stores on an instance will, by definition, only
return data locally available on that particular instance.
-We explain how to access data in state stores that are not locally
available in section Querying remote
state stores (for the entire application).
+We explain how to access data in state stores that are not locally
available in section Querying
remote state stores (for the entire application).
@@ -536,7 +536,7 @@
This read-only constraint is important to guarantee that the
underlying state stores
Repository: kafka
Updated Branches:
refs/heads/trunk 08063f50a -> 8bd2a68b5
KAFKA-5655; materialized count, aggregate, reduce to KGroupedTable
Add overloads of `count`, `aggregate`, `reduce` using `Materialized` to
`KGroupedTable`
deprecate other overloads
Author: Damian Guy
Reviewers: Matthias J. Sax , Bill Bejeck
, Guozhang Wang
Closes #3829 from dguy/kafka-5655
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8bd2a68b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8bd2a68b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8bd2a68b
Branch: refs/heads/trunk
Commit: 8bd2a68b5020f0bf8f79cbe59676d649eebf170f
Parents: 08063f5
Author: Damian Guy
Authored: Tue Sep 12 17:20:43 2017 +0100
Committer: Damian Guy
Committed: Tue Sep 12 17:20:43 2017 +0100
--
.../kafka/streams/kstream/KGroupedTable.java| 204 +++
.../kafka/streams/kstream/Materialized.java | 12 +-
.../kstream/internals/KGroupedTableImpl.java| 134 +---
.../kafka/streams/kstream/MaterializedTest.java | 54 +
.../internals/KGroupedTableImplTest.java| 137 -
.../kstream/internals/KTableAggregateTest.java | 1 +
6 files changed, 509 insertions(+), 33 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/kafka/blob/8bd2a68b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
--
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
index bf0df55..f854320 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.StateStoreSupplier;
@@ -80,7 +81,9 @@ public interface KGroupedTable {
* alphanumerics, '.', '_' and '-'. If {@code null} this is the equivalent
of {@link KGroupedTable#count()}.
* @return a {@link KTable} that contains "update" records with unmodified
keys and {@link Long} values that
* represent the latest (rolling) count (i.e., number of records) for each
key
+ * @deprecated use {@link #count(Materialized)}
*/
+@Deprecated
KTable count(final String queryableStoreName);
/**
@@ -98,6 +101,47 @@ public interface KGroupedTable {
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
*
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType)
KafkaStreams#store(...)}:
+ * {@code
+ * KafkaStreams streams = ... // counting words
+ * ReadOnlyKeyValueStore localStore =
streams.store(queryableStoreName, QueryableStoreTypes.keyValueStore());
+ * String key = "some-word";
+ * Long countForWord = localStore.get(key); // key must be local
(application state is shared over all running Kafka Streams instances)
+ * }
+ * For non-local keys, a custom RPC mechanism must be implemented using
{@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka
Streams application.
+ *
+ * For failure and recovery the store will be backed by an internal
changelog topic that will be created in Kafka.
+ * The changelog topic will be named
"${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
+ * user-specified in {@link StreamsConfig} via parameter
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
"queryableStoreName" is the
+ * provided {@code queryableStoreName}, and "-changelog" is a fixed suffix.
+ * The store name must be a valid Kafka topic name and cannot contain
characters other than ASCII alphanumerics,
+ * '.', '_' and '-'.
+ * You can retrieve all generated internal topic names via {@link
KafkaStreams#toString()}.
+ *
+ * @param materialized the instance of {@link Materialized} used to
materialize the state store. Cannot be {@code null}
+ * @return a {@link KTable} that contains "update" records with unmodified
keys and
Repository: kafka
Updated Branches:
refs/heads/trunk e1491d4a0 -> 08063f50a
KAFKA-5653; add join overloads to KTable
Add `join`, `leftJoin`, `outerJoin` overloads that use `Materialized` to
`KTable`
Author: Damian Guy
Reviewers: Bill Bejeck , Matthias J. Sax
, Guozhang Wang
Closes #3826 from dguy/kafka-5653
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/08063f50
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/08063f50
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/08063f50
Branch: refs/heads/trunk
Commit: 08063f50a04fda3e40c6060a432a97f49bb68c8c
Parents: e1491d4
Author: Damian Guy
Authored: Tue Sep 12 16:01:19 2017 +0100
Committer: Damian Guy
Committed: Tue Sep 12 16:01:19 2017 +0100
--
.../apache/kafka/streams/kstream/KTable.java| 290 +--
.../streams/kstream/internals/KTableImpl.java | 88 +-
.../KTableKTableJoinIntegrationTest.java| 33 ++-
.../kstream/internals/KTableImplTest.java | 24 +-
4 files changed, 400 insertions(+), 35 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/kafka/blob/08063f50/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
--
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 2571ac1..6d1d85d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -84,7 +84,7 @@ public interface KTable {
* have delete semantics.
* Thus, for tombstones the provided filter predicate is not evaluated but
the tombstone record is forwarded
* directly if required (i.e., if there is anything to be deleted).
- * Furthermore, for each record that gets dropped (i.e., dot not satisfied
the given predicate) a tombstone record
+ * Furthermore, for each record that gets dropped (i.e., does not satisfy
the given predicate) a tombstone record
* is forwarded.
*
* @param predicate a filter {@link Predicate} that is applied to each
record
@@ -106,7 +106,7 @@ public interface KTable {
* have delete semantics.
* Thus, for tombstones the provided filter predicate is not evaluated but
the tombstone record is forwarded
* directly if required (i.e., if there is anything to be deleted).
- * Furthermore, for each record that gets dropped (i.e., dot not satisfied
the given predicate) a tombstone record
+ * Furthermore, for each record that gets dropped (i.e., does not satisfy
the given predicate) a tombstone record
* is forwarded.
*
* To query the local {@link KeyValueStore} it must be obtained via
@@ -124,7 +124,7 @@ public interface KTable {
*
* @param predicate a filter {@link Predicate} that is applied to each
record
* @param materialized a {@link Materialized} that describes how the
{@link StateStore} for the resulting {@code KTable}
- * should be materialized
+ * should be materialized. Cannot be {@code null}
* @return a {@code KTable} that contains only those records that satisfy
the given predicate
* @see #filterNot(Predicate, Materialized)
*/
@@ -144,7 +144,7 @@ public interface KTable {
* have delete semantics.
* Thus, for tombstones the provided filter predicate is not evaluated but
the tombstone record is forwarded
* directly if required (i.e., if there is anything to be deleted).
- * Furthermore, for each record that gets dropped (i.e., dot not satisfied
the given predicate) a tombstone record
+ * Furthermore, for each record that gets dropped (i.e., does not satisfy
the given predicate) a tombstone record
* is forwarded.
*
* To query the local {@link KeyValueStore} it must be obtained via
@@ -184,7 +184,7 @@ public interface KTable {
* have delete semantics.
* Thus, for tombstones the provided filter predicate is not evaluated but
the tombstone record is forwarded
* directly if required (i.e., if there is anything to be deleted).
- * Furthermore, for each record that gets dropped (i.e., dot not satisfied
the given predicate) a tombstone record
+ * Furthermore, for each record that gets dropped (i.e., does not satisfy
the given predicate) a tombstone record
* is forwarded.
*
* To query the local {@link KeyValueStore} it must be obtained via
@@ -260,7 +260,7 @@ public interface KTable {
*
Author: damianguy
Date: Tue Sep 12 13:38:14 2017
New Revision: 21571
Log:
Release 0.11.0.1
Added:
dev/kafka/0.11.0.1/
dev/kafka/0.11.0.1/RELEASE_NOTES.html
dev/kafka/0.11.0.1/RELEASE_NOTES.html.asc
dev/kafka/0.11.0.1/RELEASE_NOTES.html.md5
dev/kafka/0.11.0.1/RELEASE_NOTES.html.sha1
dev/kafka/0.11.0.1/RELEASE_NOTES.html.sha2
dev/kafka/0.11.0.1/kafka-0.11.0.1-src.tgz (with props)
dev/kafka/0.11.0.1/kafka-0.11.0.1-src.tgz.asc
dev/kafka/0.11.0.1/kafka-0.11.0.1-src.tgz.md5
dev/kafka/0.11.0.1/kafka-0.11.0.1-src.tgz.sha1
dev/kafka/0.11.0.1/kafka-0.11.0.1-src.tgz.sha2
dev/kafka/0.11.0.1/kafka_2.11-0.11.0.1-site-docs.tgz (with props)
dev/kafka/0.11.0.1/kafka_2.11-0.11.0.1-site-docs.tgz.asc
dev/kafka/0.11.0.1/kafka_2.11-0.11.0.1-site-docs.tgz.md5
dev/kafka/0.11.0.1/kafka_2.11-0.11.0.1-site-docs.tgz.sha1
dev/kafka/0.11.0.1/kafka_2.11-0.11.0.1-site-docs.tgz.sha2
dev/kafka/0.11.0.1/kafka_2.11-0.11.0.1.tgz (with props)
dev/kafka/0.11.0.1/kafka_2.11-0.11.0.1.tgz.asc
dev/kafka/0.11.0.1/kafka_2.11-0.11.0.1.tgz.md5
dev/kafka/0.11.0.1/kafka_2.11-0.11.0.1.tgz.sha1
dev/kafka/0.11.0.1/kafka_2.11-0.11.0.1.tgz.sha2
dev/kafka/0.11.0.1/kafka_2.12-0.11.0.1-site-docs.tgz (with props)
dev/kafka/0.11.0.1/kafka_2.12-0.11.0.1-site-docs.tgz.asc
dev/kafka/0.11.0.1/kafka_2.12-0.11.0.1-site-docs.tgz.md5
dev/kafka/0.11.0.1/kafka_2.12-0.11.0.1-site-docs.tgz.sha1
dev/kafka/0.11.0.1/kafka_2.12-0.11.0.1-site-docs.tgz.sha2
dev/kafka/0.11.0.1/kafka_2.12-0.11.0.1.tgz (with props)
dev/kafka/0.11.0.1/kafka_2.12-0.11.0.1.tgz.asc
dev/kafka/0.11.0.1/kafka_2.12-0.11.0.1.tgz.md5
dev/kafka/0.11.0.1/kafka_2.12-0.11.0.1.tgz.sha1
dev/kafka/0.11.0.1/kafka_2.12-0.11.0.1.tgz.sha2
Modified:
dev/kafka/KEYS
Added: dev/kafka/0.11.0.1/RELEASE_NOTES.html
==
--- dev/kafka/0.11.0.1/RELEASE_NOTES.html (added)
+++ dev/kafka/0.11.0.1/RELEASE_NOTES.html Tue Sep 12 13:38:14 2017
@@ -0,0 +1,73 @@
+Release Notes - Kafka - Version 0.11.0.1
+Below is a summary of the JIRA issues addressed in the 0.11.0.1 release of
Kafka. For full documentation of the
+release, a guide to get started, and information about the project, see
the <a href="http://kafka.apache.org/">Kafka
+project site</a>.
+
+Note about upgrades: Please carefully review the
+<a href="http://kafka.apache.org/0110/documentation.html#upgrade">upgrade
documentation</a> for this release thoroughly
+before upgrading your cluster. The upgrade notes discuss any critical
information about incompatibilities and breaking
+changes, performance changes, and any other changes that might impact your
production deployment of Kafka.
+
+The documentation for the most recent release can be found at
<a href="http://kafka.apache.org/documentation.html">http://kafka.apache.org/documentation.html</a>.
+Improvement
+
+[https://issues.apache.org/jira/browse/KAFKA-5242;>KAFKA-5242] - add
max_number _of_retries to exponential backoff strategy
+[https://issues.apache.org/jira/browse/KAFKA-5410;>KAFKA-5410] - Fix
taskClass() method name in Connector and flush() signature in SinkTask
+[https://issues.apache.org/jira/browse/KAFKA-5485;>KAFKA-5485] -
Streams should not suspend tasks twice
+
+Bug
+
+[https://issues.apache.org/jira/browse/KAFKA-2105;>KAFKA-2105] -
NullPointerException in client on MetadataRequest
+[https://issues.apache.org/jira/browse/KAFKA-4669;>KAFKA-4669] -
KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws
exception
+[https://issues.apache.org/jira/browse/KAFKA-4856;>KAFKA-4856] -
Calling KafkaProducer.close() from multiple threads may cause spurious
error
+[https://issues.apache.org/jira/browse/KAFKA-5152;>KAFKA-5152] - Kafka
Streams keeps restoring state after shutdown is initiated during startup
+[https://issues.apache.org/jira/browse/KAFKA-5167;>KAFKA-5167] -
streams task gets stuck after re-balance due to LockException
+[https://issues.apache.org/jira/browse/KAFKA-5417;>KAFKA-5417] -
Clients get inconsistent connection states when SASL/SSL connection is marked
CONECTED and DISCONNECTED at the same time
+[https://issues.apache.org/jira/browse/KAFKA-5431;>KAFKA-5431] -
LogCleaner stopped due to
org.apache.kafka.common.errors.CorruptRecordException
+[https://issues.apache.org/jira/browse/KAFKA-5464;>KAFKA-5464] -
StreamsKafkaClient should not use StreamsConfig.POLL_MS_CONFIG
+[https://issues.apache.org/jira/browse/KAFKA-5484;>KAFKA-5484] -
Refactor kafkatest docker support
+[https://issues.apache.org/jira/browse/KAFKA-5506;>KAFKA-5506] -
bin/kafka-consumer-groups.sh failing to query offsets
+[https://issues.apache.org/jira/browse/KAFKA-5508;>KAFKA-5508] -
Documentation for altering topics
+[https://issues.apache.org/jira/browse/KAFKA-5512;>KAFKA-5512] -
KafkaConsumer: High memory allocation rate when idle
+[https:
Bump version to 0.11.0.1
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ba8483d2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ba8483d2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ba8483d2
Branch: refs/heads/0.11.0
Commit: ba8483d27798b784f5bf1936dfbd5f3363ef1619
Parents: b53b7fc
Author: Damian Guy
Authored: Tue Sep 5 19:18:40 2017 +0100
Committer: Damian Guy
Committed: Tue Sep 12 14:07:42 2017 +0100
--
gradle.properties | 2 +-
tests/kafkatest/__init__.py | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/kafka/blob/ba8483d2/gradle.properties
--
diff --git a/gradle.properties b/gradle.properties
index bea7e2e..e78025c 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -16,7 +16,7 @@
group=org.apache.kafka
# NOTE: When you change this version number, you should also make sure to
update
# the version numbers in tests/kafkatest/__init__.py and kafka-merge-pr.py.
-version=0.11.0.1-SNAPSHOT
+version=0.11.0.1
scalaVersion=2.11.11
task=build
org.gradle.jvmargs=-XX:MaxPermSize=512m -Xmx1024m -Xss2m
http://git-wip-us.apache.org/repos/asf/kafka/blob/ba8483d2/tests/kafkatest/__init__.py
--
diff --git a/tests/kafkatest/__init__.py b/tests/kafkatest/__init__.py
index 9bee572..d4c6c8c 100644
--- a/tests/kafkatest/__init__.py
+++ b/tests/kafkatest/__init__.py
@@ -22,4 +22,4 @@
# Instead, in development branches, the version should have a suffix of the
form ".devN"
#
# For example, when Kafka is at version 0.9.0.0-SNAPSHOT, this should be
something like "0.9.0.0.dev0"
-__version__ = '0.11.0.1.dev0'
+__version__ = '0.11.0.1'
Repository: kafka
Updated Branches:
refs/heads/trunk 439050816 -> a67140317
MINOR: update processor topology test driver
Author: Bill Bejeck
Reviewers: Matthias J. Sax , Guozhang Wang
, Damian Guy
Closes #3828 from bbejeck/MINOR_update_processor_topology_test_driver
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a6714031
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a6714031
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a6714031
Branch: refs/heads/trunk
Commit: a67140317a644034e91ee596ab22bfb55adde1e0
Parents: 4390508
Author: Bill Bejeck
Authored: Tue Sep 12 09:23:28 2017 +0100
Committer: Damian Guy
Committed: Tue Sep 12 09:23:28 2017 +0100
--
.../kafka/streams/InternalTopologyAccessor.java | 32
.../kafka/test/ProcessorTopologyTestDriver.java | 13
2 files changed, 45 insertions(+)
--
http://git-wip-us.apache.org/repos/asf/kafka/blob/a6714031/streams/src/test/java/org/apache/kafka/streams/InternalTopologyAccessor.java
--
diff --git
a/streams/src/test/java/org/apache/kafka/streams/InternalTopologyAccessor.java
b/streams/src/test/java/org/apache/kafka/streams/InternalTopologyAccessor.java
new file mode 100644
index 000..a6144f2
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/InternalTopologyAccessor.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams;
+
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+
+
+/**
+ * This class is meant for testing purposes only and allows the testing of
+ * topologies by using the {@link
org.apache.kafka.test.ProcessorTopologyTestDriver}
+ */
+public class InternalTopologyAccessor {
+
+public static InternalTopologyBuilder getInternalTopologyBuilder(Topology
topology) {
+return topology.internalTopologyBuilder;
+}
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a6714031/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
--
diff --git
a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index b2dbeb5..148511a 100644
---
a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++
b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.InternalTopologyAccessor;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.Topology;
@@ -157,6 +158,18 @@ public class ProcessorTopologyTestDriver {
private StreamTask task;
private GlobalStateUpdateTask globalStateTask;
+
+/**
+ * Create a new test driver instance
+ * @param config the stream configuration for the topology
+ * @param topology the {@link Topology} whose {@link
InternalTopologyBuilder} will
+ *be used to create the topology instance.
+ */
+public ProcessorTopologyTestDriver(final StreamsConfig config,
+ final Topology topology) {
+this(config,
InternalTopologyAccessor.getInternalTopologyBuilder(topology));
+}
+
/**
* Create a new test driver instance.
* @param config the stream configuration for the topology
Repository: kafka
Updated Branches:
refs/heads/trunk e16b9143d -> 4769e3d92
KAFKA-5815; add Printed class and KStream#print(printed)
Part of KIP-182
- Add `Printed` class and `KStream#print(Printed)`
- deprecate all other `print` and `writeAsText` methods
Author: Damian Guy
Reviewers: Bill Bejeck , Matthias J. Sax
, Guozhang Wang
Closes #3768 from dguy/kafka-5652-printed
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4769e3d9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4769e3d9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4769e3d9
Branch: refs/heads/trunk
Commit: 4769e3d92acdc6036f1f834c70004f0c867ae582
Parents: e16b914
Author: Damian Guy
Authored: Fri Sep 8 18:22:04 2017 +0100
Committer: Damian Guy
Committed: Fri Sep 8 18:22:04 2017 +0100
--
docs/streams/developer-guide.html | 12 +-
docs/streams/upgrade-guide.html | 6 +
.../apache/kafka/streams/kstream/KStream.java | 37 ++
.../streams/kstream/PrintForeachAction.java | 61 -
.../apache/kafka/streams/kstream/Printed.java | 126 +++
.../streams/kstream/internals/KStreamImpl.java | 29 ++---
.../streams/kstream/internals/KStreamPrint.java | 43 +--
.../streams/kstream/internals/KTableImpl.java | 5 +-
.../kstream/internals/PrintForeachAction.java | 64 ++
.../kstream/internals/PrintedInternal.java | 36 ++
.../kafka/streams/kstream/PrintedTest.java | 126 +++
.../kstream/internals/KStreamImplTest.java | 7 +-
.../kstream/internals/KStreamPrintTest.java | 19 +--
13 files changed, 426 insertions(+), 145 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/kafka/blob/4769e3d9/docs/streams/developer-guide.html
--
diff --git a/docs/streams/developer-guide.html
b/docs/streams/developer-guide.html
index 05acb55..42a9b20 100644
--- a/docs/streams/developer-guide.html
+++ b/docs/streams/developer-guide.html
@@ -1016,10 +1016,14 @@ Note that in the WordCountProcessor
implementation, users need to r
KStreambyte[], String stream = ...;
stream.print();
-
- // Several variants of `print` exist to e.g. override
the default serdes for record keys
- // and record values, set a prefix label for the output
string, etc
- stream.print(Serdes.ByteArray(), Serdes.String());
+
+ // You can also override how and where the data is
printed, i.e., to a file:
+ stream.print(Printed.toFile("stream.out"));
+
+ // with a custom KeyValueMapper and label
+ stream.print(Printed.toSysOut()
+.withLabel("my-stream")
+.withKeyValueMapper((key, value) -> key + " ->
" + value));
http://git-wip-us.apache.org/repos/asf/kafka/blob/4769e3d9/docs/streams/upgrade-guide.html
--
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index ffb365e..96c5941 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -86,6 +86,12 @@
+With the introduction of https://cwiki.apache.org/confluence/display/KAFKA/KIP-182%3A+Reduce+Streams+DSL+overloads+and+allow+easier+use+of+custom+storage+engines;>KIP-182
+you should no longer pass in Serde to
KStream#print operations.
+If you can't rely on using toString to print your keys and
values, you should instead provide a custom KeyValueMapper via
the Printed#withKeyValueMapper call.
+
+
+
Windowed aggregations have moved from KGroupedStream to
WindowedKStream.
You can now perform a windowed aggregation by, for example, using
KGroupedStream#windowedBy(Windows)#reduce(Reducer).
Note: the previous aggregate functions on KGroupedStream
still work, but have been deprecated.
http://git-wip-us.apache.org/repos/asf/kafka/blob/4769e3d9/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
--
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index c1e5b87..3a51fad 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++
Repository: kafka
Updated Branches:
refs/heads/trunk beeed8660 -> e16b9143d
KAFKA-5853; implement WindowedKStream
Add the `WindowedKStream` interface and implementation of methods that don't
require `Materialized`
Author: Damian Guy
Reviewers: Bill Bejeck , Matthias J. Sax
, Guozhang Wang
Closes #3809 from dguy/kgrouped-stream-windowed-by
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e16b9143
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e16b9143
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e16b9143
Branch: refs/heads/trunk
Commit: e16b9143dfcecbd58e3bebecbdb7d8e933b88cc4
Parents: beeed86
Author: Damian Guy
Authored: Fri Sep 8 16:49:18 2017 +0100
Committer: Damian Guy
Committed: Fri Sep 8 16:49:18 2017 +0100
--
docs/streams/developer-guide.html | 55 +--
docs/streams/upgrade-guide.html | 6 +
.../kafka/streams/kstream/KGroupedStream.java | 14 ++
.../kafka/streams/kstream/WindowedKStream.java | 150 +++
.../kstream/internals/KGroupedStreamImpl.java | 22 ++-
.../kstream/internals/WindowedKStreamImpl.java | 143 ++
.../KStreamAggregationIntegrationTest.java | 55 +++
.../internals/KGroupedStreamImplTest.java | 1 +
.../internals/WindowedKStreamImplTest.java | 144 ++
9 files changed, 544 insertions(+), 46 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/kafka/blob/e16b9143/docs/streams/developer-guide.html
--
diff --git a/docs/streams/developer-guide.html
b/docs/streams/developer-guide.html
index b8d3ae4..05acb55 100644
--- a/docs/streams/developer-guide.html
+++ b/docs/streams/developer-guide.html
@@ -1175,9 +1175,10 @@ Note that in the WordCountProcessor
implementation, users need to r
Once records are grouped by key via groupByKey or
groupBy -- and
thus represented as either a KGroupedStream or a
KGroupedTable -- they can be aggregated via an operation
such as
-reduce. Aggregations are key-based operations, i.e.
-they always operate over records (notably record values) of the
same key. You may
-choose to perform aggregations on
+reduce.
+For windowed aggregations use
windowedBy(Windows).reduce(Reducer).
+Aggregations are key-based operations, i.e. they always operate
over records (notably record values) of the same key.
+You may choose to perform aggregations on
windowed or non-windowed data.
@@ -1205,20 +1206,20 @@ Note that in the WordCountProcessor
implementation, users need to r
Several variants of aggregate exist, see
Javadocs for details.
-KGroupedStreambyte[], String groupedStream = ...;
-KGroupedTablebyte[], String groupedTable = ...;
+KGroupedStreamBytes, String groupedStream = ...;
+KGroupedTableBytes, String groupedTable = ...;
// Java 8+ examples, using lambda expressions
// Aggregating a KGroupedStream (note how the value type
changes from String to Long)
-KTablebyte[], Long aggregatedStream =
groupedStream.aggregate(
+KTableBytes, Long aggregatedStream =
groupedStream.aggregate(
() -> 0L, /* initializer */
(aggKey, newValue, aggValue) -> aggValue +
newValue.length(), /* adder */
Serdes.Long(), /* serde for aggregate value */
"aggregated-stream-store" /* state store name */);
// Aggregating a KGroupedTable (note how the value type
changes from String to Long)
-KTablebyte[], Long aggregatedTable =
groupedTable.aggregate(
+KTableBytes, Long aggregatedTable =
groupedTable.aggregate(
() -> 0L, /* initializer */
(aggKey, newValue, aggValue) -> aggValue +
newValue.length(), /* adder */
(aggKey, oldValue, aggValue) -> aggValue -
oldValue.length(), /* subtractor */
@@ -1226,19 +1227,26 @@ Note that in the WordCountProcessor
implementation, users need to r
"aggregated-table-store" /* state store name */);
+// windowed aggregation
+KTableWindowed, Long windowedAggregate
= groupedStream.windowedBy(TimeWindows.of(TimeUnit.MINUTES(5).toMillis())
+.aggregate(() -> 0L, /* initializer
Repository: kafka
Updated Branches:
refs/heads/trunk 9cbb9f093 -> 329d5fa64
KAFKA-5844; add groupBy(selector, serialized) to Ktable
add `KTable#groupBy(KeyValueMapper, Serialized)` and deprecate the overload
with `Serde` params
Author: Damian Guy
Reviewers: Matthias J. Sax , Guozhang Wang
, Bill Bejeck
Closes #3802 from dguy/kip-182-ktable-groupby
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/329d5fa6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/329d5fa6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/329d5fa6
Branch: refs/heads/trunk
Commit: 329d5fa64a2a3ac1d39ac37fdacbf6e43d500d11
Parents: 9cbb9f0
Author: Damian Guy
Authored: Thu Sep 7 12:35:31 2017 +0100
Committer: Damian Guy
Committed: Thu Sep 7 12:35:31 2017 +0100
--
.../apache/kafka/streams/kstream/KTable.java| 33 +++-
.../kafka/streams/kstream/KeyValueMapper.java | 4 +--
.../streams/kstream/internals/KTableImpl.java | 22 -
.../kstream/internals/KTableAggregateTest.java | 21 ++---
.../internals/KTableKTableLeftJoinTest.java | 3 +-
.../kafka/streams/tests/SmokeTestClient.java| 3 +-
6 files changed, 62 insertions(+), 24 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/kafka/blob/329d5fa6/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
--
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 06a0eee..4bc9572 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -1001,7 +1001,7 @@ public interface KTable {
* records to and rereading all update records from it, such that the
resulting {@link KGroupedTable} is partitioned
* on the new key.
*
- * If the key or value type is changed, it is recommended to use {@link
#groupBy(KeyValueMapper, Serde, Serde)}
+ * If the key or value type is changed, it is recommended to use {@link
#groupBy(KeyValueMapper, Serialized)}
* instead.
*
* @param selector a {@link KeyValueMapper} that computes a new grouping
key and value to be aggregated
@@ -1012,6 +1012,35 @@ public interface KTable {
<KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector);
/**
+ * Re-groups the records of this {@code KTable} using the provided {@link
KeyValueMapper}
+ * and {@link Serde}s as specified by {@link Serialized}.
+ * Each {@link KeyValue} pair of this {@code KTable} is mapped to a new
{@link KeyValue} pair by applying the
+ * provided {@link KeyValueMapper}.
+ * Re-grouping a {@code KTable} is required before an aggregation operator
can be applied to the data
+ * (cf. {@link KGroupedTable}).
+ * The {@link KeyValueMapper} selects a new key and value (which should
both have unmodified type).
+ * If the new record key is {@code null} the record will not be included
in the resulting {@link KGroupedTable}
+ *
+ * Because a new key is selected, an internal repartitioning topic will be
created in Kafka.
+ * This topic will be named "${applicationId}-XXX-repartition", where
"applicationId" is user-specified in
+ * {@link StreamsConfig} via parameter {@link
StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is
+ * an internally generated name, and "-repartition" is a fixed suffix.
+ * You can retrieve all generated internal topic names via {@link
KafkaStreams#toString()}.
+ *
+ * All data of this {@code KTable} will be redistributed through the
repartitioning topic by writing all update
+ * records to and rereading all update records from it, such that the
resulting {@link KGroupedTable} is partitioned
+ * on the new key.
+ *
+ * @param selector a {@link KeyValueMapper} that computes a new
grouping key and value to be aggregated
+ * @param serializedthe {@link Serialized} instance used to specify
{@link org.apache.kafka.common.serialization.Serdes}
+ * @param the key type of the result {@link KGroupedTable}
+ * @param the value type of the result {@link KGroupedTable}
+ * @return a {@link KGroupedTable} that contains the re-grouped records of
the original {@code KTable}
+ */
+ <KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector,
+ final Serialized<KR, VR>
serialized);
+
+/**
* Re-groups the records of this
Repository: kafka
Updated Branches:
refs/heads/trunk 667cd60dc -> 9cbb9f093
http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
--
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
new file mode 100644
index 000..a0500b6
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+
+import static
org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier.MIN_SEGMENTS;
+
+public class RocksDbWindowBytesStoreSupplier implements
WindowBytesStoreSupplier {
+private final String name;
+private final long retentionPeriod;
+private final int segments;
+private final long windowSize;
+private final boolean retainDuplicates;
+
+public RocksDbWindowBytesStoreSupplier(final String name,
+ final long retentionPeriod,
+ final int segments,
+ final long windowSize,
+ final boolean retainDuplicates) {
+if (segments < MIN_SEGMENTS) {
+throw new IllegalArgumentException("numSegments must be >= " +
MIN_SEGMENTS);
+}
+this.name = name;
+this.retentionPeriod = retentionPeriod;
+this.segments = segments;
+this.windowSize = windowSize;
+this.retainDuplicates = retainDuplicates;
+}
+
+@Override
+public String name() {
+return name;
+}
+
+@Override
+public WindowStore<Bytes, byte[]> get() {
+final RocksDBSegmentedBytesStore segmentedBytesStore = new
RocksDBSegmentedBytesStore(
+name,
+retentionPeriod,
+segments,
+new WindowKeySchema()
+);
+return RocksDBWindowStore.bytesStore(segmentedBytesStore,
+ retainDuplicates,
+ windowSize);
+
+}
+
+@Override
+public String metricsScope() {
+return "rocksdb-window";
+}
+
+@Override
+public int segments() {
+return segments;
+}
+
+@Override
+public long windowSize() {
+return windowSize;
+}
+
+@Override
+public boolean retainDuplicates() {
+return retainDuplicates;
+}
+
+@Override
+public long retentionPeriod() {
+return retentionPeriod;
+}
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java
--
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java
new file mode 100644
index 000..61919c3
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the
Repository: kafka
Updated Branches:
refs/heads/trunk b687c0680 -> 45394d52c
KAFKA-5819; Add Joined class and relevant KStream join overloads
Add the `Joined` class and the overloads to `KStream` that use it.
Deprecate existing methods that have `Serde` params
Author: Damian Guy
Reviewers: Matthias J. Sax , Guozhang Wang
Closes #3776 from dguy/kip-182-stream-join
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/45394d52
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/45394d52
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/45394d52
Branch: refs/heads/trunk
Commit: 45394d52c1ba566178c57897297a3ea31379f957
Parents: b687c06
Author: Damian Guy
Authored: Wed Sep 6 10:55:43 2017 +0100
Committer: Damian Guy
Committed: Wed Sep 6 10:55:43 2017 +0100
--
.../kafka/streams/kstream/JoinWindows.java | 4 +-
.../apache/kafka/streams/kstream/Joined.java| 146 +++
.../apache/kafka/streams/kstream/KStream.java | 426 ++-
.../kafka/streams/kstream/ValueJoiner.java | 10 +-
.../streams/kstream/internals/KStreamImpl.java | 110 +++--
.../integration/KStreamRepartitionJoinTest.java | 25 +-
.../kstream/internals/KStreamImplTest.java | 54 ++-
.../internals/KStreamKStreamJoinTest.java | 28 +-
.../internals/KStreamKStreamLeftJoinTest.java | 13 +-
9 files changed, 732 insertions(+), 84 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/kafka/blob/45394d52/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
--
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index 9d69738..ef9ed01 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -55,9 +55,9 @@ import java.util.Map;
* @see UnlimitedWindows
* @see SessionWindows
* @see KStream#join(KStream, ValueJoiner, JoinWindows)
- * @see KStream#join(KStream, ValueJoiner, JoinWindows,
org.apache.kafka.common.serialization.Serde,
org.apache.kafka.common.serialization.Serde,
org.apache.kafka.common.serialization.Serde)
+ * @see KStream#join(KStream, ValueJoiner, JoinWindows, Joined)
* @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows)
- * @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows,
org.apache.kafka.common.serialization.Serde,
org.apache.kafka.common.serialization.Serde,
org.apache.kafka.common.serialization.Serde)
+ * @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows, Joined)
* @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows)
* @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows, Joined)
* @see TimestampExtractor
http://git-wip-us.apache.org/repos/asf/kafka/blob/45394d52/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java
--
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java
new file mode 100644
index 000..8601e1c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.serialization.Serde;
+
+/**
+ * The {@code Joined} class represents optional params that can be passed to
+ * {@link KStream#join}, {@link KStream#leftJoin}, and {@link
KStream#outerJoin} operations.
+ */
+public class Joined<K, V, VO> {
+
+private Serde<K> keySerde;
+private Serde<V> valueSerde;
+private Serde<VO> otherValueSerde;
+
+private Joined(final Serde<K> keySerde,
+ final Serde<V> valueSerde,
+ final Serde<VO>
Repository: kafka
Updated Branches:
refs/heads/trunk 2fb5664bf -> b687c0680
KAFKA-5817; Add Serialized class and overloads to KStream#groupBy and
KStream#groupByKey
Part of KIP-182
- Add the `Serialized` class
- implement overloads of `KStream#groupByKey` and KStream#groupBy`
- deprecate existing methods that have more than default arguments
Author: Damian Guy
Reviewers: Bill Bejeck , Matthias J. Sax
, Guozhang Wang
Closes #3772 from dguy/kafka-5817
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b687c068
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b687c068
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b687c068
Branch: refs/heads/trunk
Commit: b687c068008a81fad390c80da289249cc04b3efb
Parents: 2fb5664
Author: Damian Guy
Authored: Wed Sep 6 10:43:14 2017 +0100
Committer: Damian Guy
Committed: Wed Sep 6 10:43:14 2017 +0100
--
docs/streams/developer-guide.html | 35
.../examples/pageview/PageViewTypedDemo.java| 3 +-
.../examples/pageview/PageViewUntypedDemo.java | 3 +-
.../apache/kafka/streams/kstream/KStream.java | 59 -
.../kafka/streams/kstream/Serialized.java | 88
.../streams/kstream/internals/KStreamImpl.java | 38 ++---
.../KStreamAggregationDedupIntegrationTest.java | 6 +-
.../KStreamAggregationIntegrationTest.java | 7 +-
.../KStreamKTableJoinIntegrationTest.java | 3 +-
.../internals/KGroupedStreamImplTest.java | 3 +-
.../internals/KStreamWindowAggregateTest.java | 8 +-
.../kafka/streams/perf/YahooBenchmark.java | 3 +-
.../kafka/streams/tests/SmokeTestClient.java| 3 +-
13 files changed, 215 insertions(+), 44 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/kafka/blob/b687c068/docs/streams/developer-guide.html
--
diff --git a/docs/streams/developer-guide.html
b/docs/streams/developer-guide.html
index b530e5e..8433bf3 100644
--- a/docs/streams/developer-guide.html
+++ b/docs/streams/developer-guide.html
@@ -842,8 +842,9 @@ Note that in the WordCountProcessor
implementation, users need to r
// When the key and/or value types do not match the
configured
// default serdes, we must explicitly specify serdes.
KGroupedStream<byte[], String> groupedStream =
stream.groupByKey(
- Serdes.ByteArray(), /* key */
- Serdes.String() /* value */
+ Serialized.with(
+Serdes.ByteArray(), /* key */
+Serdes.String()) /* value */
);
@@ -883,15 +884,17 @@ Note that in the WordCountProcessor
implementation, users need to r
// Group the stream by a new key and key type
KGroupedStream<String, String> groupedStream =
stream.groupBy(
(key, value) -> value,
- Serdes.String(), /* key (note: type was modified) */
- Serdes.String() /* value */
+ Serialized.with(
+Serdes.String(), /* key (note: type was
modified) */
+Serdes.String()) /* value */
);
// Group the table by a new key and key type, and also
modify the value and value type.
KGroupedTable<String, Integer> groupedTable =
table.groupBy(
(key, value) -> KeyValue.pair(value,
value.length()),
- Serdes.String(), /* key (note: type was modified) */
- Serdes.Integer() /* value (note: type was modified)
*/
+ Serialized.with(
+ Serdes.String(), /* key (note: type was
modified) */
+ Serdes.Integer()) /* value (note: type was
modified) */
);
@@ -905,8 +908,9 @@ Note that in the WordCountProcessor
implementation, users need to r
return value;
}
},
- Serdes.String(), /* key (note: type was modified) */
- Serdes.String() /* value */
+ Serialized.with(
+Serdes.String(), /* key (note: type was
modified) */
+Serdes.String()) /* value */