[kafka] branch 0.9.0 updated (34ae29a -> 342c817)
This is an automated email from the ASF dual-hosted git repository. ewencp pushed a change to branch 0.9.0 in repository https://gitbox.apache.org/repos/asf/kafka.git. from 34ae29a KAFKA-7058: Comparing schema default values using Objects#deepEquals() add 342c817 KAFKA-7476: Fix Date-based types in SchemaProjector No new revisions were added by this update. Summary of changes: .../java/org/apache/kafka/connect/data/SchemaProjector.java | 2 +- .../org/apache/kafka/connect/data/SchemaProjectorTest.java| 11 +++ 2 files changed, 12 insertions(+), 1 deletion(-)
[kafka] branch 0.10.0 updated: KAFKA-7476: Fix Date-based types in SchemaProjector
This is an automated email from the ASF dual-hosted git repository. ewencp pushed a commit to branch 0.10.0 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/0.10.0 by this push: new 0ab0369 KAFKA-7476: Fix Date-based types in SchemaProjector 0ab0369 is described below commit 0ab0369c9275cb2555e53967ed74c0b35d7691bf Author: Robert Yokota AuthorDate: Thu Oct 4 20:34:50 2018 -0700 KAFKA-7476: Fix Date-based types in SchemaProjector Various converters (AvroConverter and JsonConverter) produce a SchemaAndValue consisting of a logical schema type and a java.util.Date. This is a fix for SchemaProjector to properly handle the Date. Author: Robert Yokota Reviewers: Konstantine Karantasis , Ewen Cheslack-Postava Closes #5736 from rayokota/KAFKA-7476 (cherry picked from commit 3edd8e7333ec0bb32ab5ae4ec4814fe30bb8f91d) Signed-off-by: Ewen Cheslack-Postava --- .../java/org/apache/kafka/connect/data/SchemaProjector.java | 2 +- .../org/apache/kafka/connect/data/SchemaProjectorTest.java| 11 +++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java index 6277e44..08ee37a 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java @@ -159,7 +159,7 @@ public class SchemaProjector { assert source.type().isPrimitive(); assert target.type().isPrimitive(); Object result; -if (isPromotable(source.type(), target.type())) { +if (isPromotable(source.type(), target.type()) && record instanceof Number) { Number numberRecord = (Number) record; switch (target.type()) { case INT8: diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java index 101be04..ef6d029 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java @@ -351,6 +351,17 @@ public class SchemaProjectorTest { projected = SchemaProjector.project(Timestamp.SCHEMA, 34567L, Timestamp.SCHEMA); assertEquals(34567L, projected); +java.util.Date date = new java.util.Date(); + +projected = SchemaProjector.project(Date.SCHEMA, date, Date.SCHEMA); +assertEquals(date, projected); + +projected = SchemaProjector.project(Time.SCHEMA, date, Time.SCHEMA); +assertEquals(date, projected); + +projected = SchemaProjector.project(Timestamp.SCHEMA, date, Timestamp.SCHEMA); +assertEquals(date, projected); + Schema namedSchema = SchemaBuilder.int32().name("invalidLogicalTypeName").build(); for (Schema logicalTypeSchema: logicalTypeSchemas) { try {
[kafka] branch 0.10.1 updated: KAFKA-7476: Fix Date-based types in SchemaProjector
This is an automated email from the ASF dual-hosted git repository. ewencp pushed a commit to branch 0.10.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/0.10.1 by this push: new daf38aa KAFKA-7476: Fix Date-based types in SchemaProjector daf38aa is described below commit daf38aa825b37b2a86e3c0238ee8e493d54e158f Author: Robert Yokota AuthorDate: Thu Oct 4 20:34:50 2018 -0700 KAFKA-7476: Fix Date-based types in SchemaProjector Various converters (AvroConverter and JsonConverter) produce a SchemaAndValue consisting of a logical schema type and a java.util.Date. This is a fix for SchemaProjector to properly handle the Date. Author: Robert Yokota Reviewers: Konstantine Karantasis , Ewen Cheslack-Postava Closes #5736 from rayokota/KAFKA-7476 (cherry picked from commit 3edd8e7333ec0bb32ab5ae4ec4814fe30bb8f91d) Signed-off-by: Ewen Cheslack-Postava --- .../java/org/apache/kafka/connect/data/SchemaProjector.java | 2 +- .../org/apache/kafka/connect/data/SchemaProjectorTest.java| 11 +++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java index 6277e44..08ee37a 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java @@ -159,7 +159,7 @@ public class SchemaProjector { assert source.type().isPrimitive(); assert target.type().isPrimitive(); Object result; -if (isPromotable(source.type(), target.type())) { +if (isPromotable(source.type(), target.type()) && record instanceof Number) { Number numberRecord = (Number) record; switch (target.type()) { case INT8: diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java index 101be04..ef6d029 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java @@ -351,6 +351,17 @@ public class SchemaProjectorTest { projected = SchemaProjector.project(Timestamp.SCHEMA, 34567L, Timestamp.SCHEMA); assertEquals(34567L, projected); +java.util.Date date = new java.util.Date(); + +projected = SchemaProjector.project(Date.SCHEMA, date, Date.SCHEMA); +assertEquals(date, projected); + +projected = SchemaProjector.project(Time.SCHEMA, date, Time.SCHEMA); +assertEquals(date, projected); + +projected = SchemaProjector.project(Timestamp.SCHEMA, date, Timestamp.SCHEMA); +assertEquals(date, projected); + Schema namedSchema = SchemaBuilder.int32().name("invalidLogicalTypeName").build(); for (Schema logicalTypeSchema: logicalTypeSchemas) { try {
[kafka] branch 0.10.2 updated: KAFKA-7476: Fix Date-based types in SchemaProjector
This is an automated email from the ASF dual-hosted git repository. ewencp pushed a commit to branch 0.10.2 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/0.10.2 by this push: new b90c609 KAFKA-7476: Fix Date-based types in SchemaProjector b90c609 is described below commit b90c609ee08bf239f3539d81dda5e8990cb34600 Author: Robert Yokota AuthorDate: Thu Oct 4 20:34:50 2018 -0700 KAFKA-7476: Fix Date-based types in SchemaProjector Various converters (AvroConverter and JsonConverter) produce a SchemaAndValue consisting of a logical schema type and a java.util.Date. This is a fix for SchemaProjector to properly handle the Date. Author: Robert Yokota Reviewers: Konstantine Karantasis , Ewen Cheslack-Postava Closes #5736 from rayokota/KAFKA-7476 (cherry picked from commit 3edd8e7333ec0bb32ab5ae4ec4814fe30bb8f91d) Signed-off-by: Ewen Cheslack-Postava --- .../java/org/apache/kafka/connect/data/SchemaProjector.java | 2 +- .../org/apache/kafka/connect/data/SchemaProjectorTest.java| 11 +++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java index 6277e44..08ee37a 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java @@ -159,7 +159,7 @@ public class SchemaProjector { assert source.type().isPrimitive(); assert target.type().isPrimitive(); Object result; -if (isPromotable(source.type(), target.type())) { +if (isPromotable(source.type(), target.type()) && record instanceof Number) { Number numberRecord = (Number) record; switch (target.type()) { case INT8: diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java index 101be04..ef6d029 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java @@ -351,6 +351,17 @@ public class SchemaProjectorTest { projected = SchemaProjector.project(Timestamp.SCHEMA, 34567L, Timestamp.SCHEMA); assertEquals(34567L, projected); +java.util.Date date = new java.util.Date(); + +projected = SchemaProjector.project(Date.SCHEMA, date, Date.SCHEMA); +assertEquals(date, projected); + +projected = SchemaProjector.project(Time.SCHEMA, date, Time.SCHEMA); +assertEquals(date, projected); + +projected = SchemaProjector.project(Timestamp.SCHEMA, date, Timestamp.SCHEMA); +assertEquals(date, projected); + Schema namedSchema = SchemaBuilder.int32().name("invalidLogicalTypeName").build(); for (Schema logicalTypeSchema: logicalTypeSchemas) { try {
[kafka] branch 0.11.0 updated: KAFKA-7476: Fix Date-based types in SchemaProjector
This is an automated email from the ASF dual-hosted git repository. ewencp pushed a commit to branch 0.11.0 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/0.11.0 by this push: new a9ca107 KAFKA-7476: Fix Date-based types in SchemaProjector a9ca107 is described below commit a9ca1079bd28c716ba05f703b7cc814620cb1586 Author: Robert Yokota AuthorDate: Thu Oct 4 20:34:50 2018 -0700 KAFKA-7476: Fix Date-based types in SchemaProjector Various converters (AvroConverter and JsonConverter) produce a SchemaAndValue consisting of a logical schema type and a java.util.Date. This is a fix for SchemaProjector to properly handle the Date. Author: Robert Yokota Reviewers: Konstantine Karantasis , Ewen Cheslack-Postava Closes #5736 from rayokota/KAFKA-7476 (cherry picked from commit 3edd8e7333ec0bb32ab5ae4ec4814fe30bb8f91d) Signed-off-by: Ewen Cheslack-Postava --- .../java/org/apache/kafka/connect/data/SchemaProjector.java | 2 +- .../org/apache/kafka/connect/data/SchemaProjectorTest.java| 11 +++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java index ea31752..5400705 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java @@ -160,7 +160,7 @@ public class SchemaProjector { assert source.type().isPrimitive(); assert target.type().isPrimitive(); Object result; -if (isPromotable(source.type(), target.type())) { +if (isPromotable(source.type(), target.type()) && record instanceof Number) { Number numberRecord = (Number) record; switch (target.type()) { case INT8: diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java index 151114e..0db4eec 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java @@ -352,6 +352,17 @@ public class SchemaProjectorTest { projected = SchemaProjector.project(Timestamp.SCHEMA, 34567L, Timestamp.SCHEMA); assertEquals(34567L, projected); +java.util.Date date = new java.util.Date(); + +projected = SchemaProjector.project(Date.SCHEMA, date, Date.SCHEMA); +assertEquals(date, projected); + +projected = SchemaProjector.project(Time.SCHEMA, date, Time.SCHEMA); +assertEquals(date, projected); + +projected = SchemaProjector.project(Timestamp.SCHEMA, date, Timestamp.SCHEMA); +assertEquals(date, projected); + Schema namedSchema = SchemaBuilder.int32().name("invalidLogicalTypeName").build(); for (Schema logicalTypeSchema: logicalTypeSchemas) { try {
[kafka] branch 1.0 updated: KAFKA-7476: Fix Date-based types in SchemaProjector
This is an automated email from the ASF dual-hosted git repository. ewencp pushed a commit to branch 1.0 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/1.0 by this push: new 2b17877 KAFKA-7476: Fix Date-based types in SchemaProjector 2b17877 is described below commit 2b1787742a2d535bf2b14620c50169cd7cea2328 Author: Robert Yokota AuthorDate: Thu Oct 4 20:34:50 2018 -0700 KAFKA-7476: Fix Date-based types in SchemaProjector Various converters (AvroConverter and JsonConverter) produce a SchemaAndValue consisting of a logical schema type and a java.util.Date. This is a fix for SchemaProjector to properly handle the Date. Author: Robert Yokota Reviewers: Konstantine Karantasis , Ewen Cheslack-Postava Closes #5736 from rayokota/KAFKA-7476 (cherry picked from commit 3edd8e7333ec0bb32ab5ae4ec4814fe30bb8f91d) Signed-off-by: Ewen Cheslack-Postava --- .../java/org/apache/kafka/connect/data/SchemaProjector.java | 2 +- .../org/apache/kafka/connect/data/SchemaProjectorTest.java| 11 +++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java index ea31752..5400705 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java @@ -160,7 +160,7 @@ public class SchemaProjector { assert source.type().isPrimitive(); assert target.type().isPrimitive(); Object result; -if (isPromotable(source.type(), target.type())) { +if (isPromotable(source.type(), target.type()) && record instanceof Number) { Number numberRecord = (Number) record; switch (target.type()) { case INT8: diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java index 151114e..0db4eec 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java @@ -352,6 +352,17 @@ public class SchemaProjectorTest { projected = SchemaProjector.project(Timestamp.SCHEMA, 34567L, Timestamp.SCHEMA); assertEquals(34567L, projected); +java.util.Date date = new java.util.Date(); + +projected = SchemaProjector.project(Date.SCHEMA, date, Date.SCHEMA); +assertEquals(date, projected); + +projected = SchemaProjector.project(Time.SCHEMA, date, Time.SCHEMA); +assertEquals(date, projected); + +projected = SchemaProjector.project(Timestamp.SCHEMA, date, Timestamp.SCHEMA); +assertEquals(date, projected); + Schema namedSchema = SchemaBuilder.int32().name("invalidLogicalTypeName").build(); for (Schema logicalTypeSchema: logicalTypeSchemas) { try {
[kafka] branch 2.0 updated: KAFKA-7476: Fix Date-based types in SchemaProjector
This is an automated email from the ASF dual-hosted git repository. ewencp pushed a commit to branch 2.0 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.0 by this push: new 5cef640 KAFKA-7476: Fix Date-based types in SchemaProjector 5cef640 is described below commit 5cef640876f731ee68b359b7ca3afe939e54cabc Author: Robert Yokota AuthorDate: Thu Oct 4 20:34:50 2018 -0700 KAFKA-7476: Fix Date-based types in SchemaProjector Various converters (AvroConverter and JsonConverter) produce a SchemaAndValue consisting of a logical schema type and a java.util.Date. This is a fix for SchemaProjector to properly handle the Date. Author: Robert Yokota Reviewers: Konstantine Karantasis , Ewen Cheslack-Postava Closes #5736 from rayokota/KAFKA-7476 (cherry picked from commit 3edd8e7333ec0bb32ab5ae4ec4814fe30bb8f91d) Signed-off-by: Ewen Cheslack-Postava --- .../java/org/apache/kafka/connect/data/SchemaProjector.java | 2 +- .../org/apache/kafka/connect/data/SchemaProjectorTest.java| 11 +++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java index ea31752..5400705 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java @@ -160,7 +160,7 @@ public class SchemaProjector { assert source.type().isPrimitive(); assert target.type().isPrimitive(); Object result; -if (isPromotable(source.type(), target.type())) { +if (isPromotable(source.type(), target.type()) && record instanceof Number) { Number numberRecord = (Number) record; switch (target.type()) { case INT8: diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java index 151114e..0db4eec 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java @@ -352,6 +352,17 @@ public class SchemaProjectorTest { projected = SchemaProjector.project(Timestamp.SCHEMA, 34567L, Timestamp.SCHEMA); assertEquals(34567L, projected); +java.util.Date date = new java.util.Date(); + +projected = SchemaProjector.project(Date.SCHEMA, date, Date.SCHEMA); +assertEquals(date, projected); + +projected = SchemaProjector.project(Time.SCHEMA, date, Time.SCHEMA); +assertEquals(date, projected); + +projected = SchemaProjector.project(Timestamp.SCHEMA, date, Timestamp.SCHEMA); +assertEquals(date, projected); + Schema namedSchema = SchemaBuilder.int32().name("invalidLogicalTypeName").build(); for (Schema logicalTypeSchema: logicalTypeSchemas) { try {
[kafka] branch 1.1 updated: KAFKA-7476: Fix Date-based types in SchemaProjector
This is an automated email from the ASF dual-hosted git repository. ewencp pushed a commit to branch 1.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/1.1 by this push: new ef6c69d KAFKA-7476: Fix Date-based types in SchemaProjector ef6c69d is described below commit ef6c69d6285be12d5d4efe1dab9505d50636e00d Author: Robert Yokota AuthorDate: Thu Oct 4 20:34:50 2018 -0700 KAFKA-7476: Fix Date-based types in SchemaProjector Various converters (AvroConverter and JsonConverter) produce a SchemaAndValue consisting of a logical schema type and a java.util.Date. This is a fix for SchemaProjector to properly handle the Date. Author: Robert Yokota Reviewers: Konstantine Karantasis , Ewen Cheslack-Postava Closes #5736 from rayokota/KAFKA-7476 (cherry picked from commit 3edd8e7333ec0bb32ab5ae4ec4814fe30bb8f91d) Signed-off-by: Ewen Cheslack-Postava --- .../java/org/apache/kafka/connect/data/SchemaProjector.java | 2 +- .../org/apache/kafka/connect/data/SchemaProjectorTest.java| 11 +++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java index ea31752..5400705 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java @@ -160,7 +160,7 @@ public class SchemaProjector { assert source.type().isPrimitive(); assert target.type().isPrimitive(); Object result; -if (isPromotable(source.type(), target.type())) { +if (isPromotable(source.type(), target.type()) && record instanceof Number) { Number numberRecord = (Number) record; switch (target.type()) { case INT8: diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java index 151114e..0db4eec 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java @@ -352,6 +352,17 @@ public class SchemaProjectorTest { projected = SchemaProjector.project(Timestamp.SCHEMA, 34567L, Timestamp.SCHEMA); assertEquals(34567L, projected); +java.util.Date date = new java.util.Date(); + +projected = SchemaProjector.project(Date.SCHEMA, date, Date.SCHEMA); +assertEquals(date, projected); + +projected = SchemaProjector.project(Time.SCHEMA, date, Time.SCHEMA); +assertEquals(date, projected); + +projected = SchemaProjector.project(Timestamp.SCHEMA, date, Timestamp.SCHEMA); +assertEquals(date, projected); + Schema namedSchema = SchemaBuilder.int32().name("invalidLogicalTypeName").build(); for (Schema logicalTypeSchema: logicalTypeSchemas) { try {
[kafka] branch trunk updated: KAFKA-7476: Fix Date-based types in SchemaProjector
This is an automated email from the ASF dual-hosted git repository. ewencp pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 3edd8e7 KAFKA-7476: Fix Date-based types in SchemaProjector 3edd8e7 is described below commit 3edd8e7333ec0bb32ab5ae4ec4814fe30bb8f91d Author: Robert Yokota AuthorDate: Thu Oct 4 20:34:50 2018 -0700 KAFKA-7476: Fix Date-based types in SchemaProjector Various converters (AvroConverter and JsonConverter) produce a SchemaAndValue consisting of a logical schema type and a java.util.Date. This is a fix for SchemaProjector to properly handle the Date. Author: Robert Yokota Reviewers: Konstantine Karantasis , Ewen Cheslack-Postava Closes #5736 from rayokota/KAFKA-7476 --- .../java/org/apache/kafka/connect/data/SchemaProjector.java | 2 +- .../org/apache/kafka/connect/data/SchemaProjectorTest.java| 11 +++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java index ea31752..5400705 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java @@ -160,7 +160,7 @@ public class SchemaProjector { assert source.type().isPrimitive(); assert target.type().isPrimitive(); Object result; -if (isPromotable(source.type(), target.type())) { +if (isPromotable(source.type(), target.type()) && record instanceof Number) { Number numberRecord = (Number) record; switch (target.type()) { case INT8: diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java index 151114e..0db4eec 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java @@ -352,6 +352,17 @@ public class SchemaProjectorTest { projected = SchemaProjector.project(Timestamp.SCHEMA, 34567L, Timestamp.SCHEMA); assertEquals(34567L, projected); +java.util.Date date = new java.util.Date(); + +projected = SchemaProjector.project(Date.SCHEMA, date, Date.SCHEMA); +assertEquals(date, projected); + +projected = SchemaProjector.project(Time.SCHEMA, date, Time.SCHEMA); +assertEquals(date, projected); + +projected = SchemaProjector.project(Timestamp.SCHEMA, date, Timestamp.SCHEMA); +assertEquals(date, projected); + Schema namedSchema = SchemaBuilder.int32().name("invalidLogicalTypeName").build(); for (Schema logicalTypeSchema: logicalTypeSchemas) { try {
[kafka] branch 2.1 updated: KAFKA-7476: Fix Date-based types in SchemaProjector
This is an automated email from the ASF dual-hosted git repository. ewencp pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.1 by this push: new af930d3 KAFKA-7476: Fix Date-based types in SchemaProjector af930d3 is described below commit af930d3c76db24961cb6a3cafdeb414726ee3952 Author: Robert Yokota AuthorDate: Thu Oct 4 20:34:50 2018 -0700 KAFKA-7476: Fix Date-based types in SchemaProjector Various converters (AvroConverter and JsonConverter) produce a SchemaAndValue consisting of a logical schema type and a java.util.Date. This is a fix for SchemaProjector to properly handle the Date. Author: Robert Yokota Reviewers: Konstantine Karantasis , Ewen Cheslack-Postava Closes #5736 from rayokota/KAFKA-7476 (cherry picked from commit 3edd8e7333ec0bb32ab5ae4ec4814fe30bb8f91d) Signed-off-by: Ewen Cheslack-Postava --- .../java/org/apache/kafka/connect/data/SchemaProjector.java | 2 +- .../org/apache/kafka/connect/data/SchemaProjectorTest.java| 11 +++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java index ea31752..5400705 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java @@ -160,7 +160,7 @@ public class SchemaProjector { assert source.type().isPrimitive(); assert target.type().isPrimitive(); Object result; -if (isPromotable(source.type(), target.type())) { +if (isPromotable(source.type(), target.type()) && record instanceof Number) { Number numberRecord = (Number) record; switch (target.type()) { case INT8: diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java index 151114e..0db4eec 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java @@ -352,6 +352,17 @@ public class SchemaProjectorTest { projected = SchemaProjector.project(Timestamp.SCHEMA, 34567L, Timestamp.SCHEMA); assertEquals(34567L, projected); +java.util.Date date = new java.util.Date(); + +projected = SchemaProjector.project(Date.SCHEMA, date, Date.SCHEMA); +assertEquals(date, projected); + +projected = SchemaProjector.project(Time.SCHEMA, date, Time.SCHEMA); +assertEquals(date, projected); + +projected = SchemaProjector.project(Timestamp.SCHEMA, date, Timestamp.SCHEMA); +assertEquals(date, projected); + Schema namedSchema = SchemaBuilder.int32().name("invalidLogicalTypeName").build(); for (Schema logicalTypeSchema: logicalTypeSchemas) { try {
[kafka] branch 2.1 updated: MINOR: Fix generic type of ProcessorParameters (#5741)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.1 by this push: new fa3fe3e MINOR: Fix generic type of ProcessorParameters (#5741) fa3fe3e is described below commit fa3fe3e68059068ac894552078155b16c21404aa Author: John Roesler AuthorDate: Thu Oct 4 19:37:53 2018 -0500 MINOR: Fix generic type of ProcessorParameters (#5741) In unrelated recent work, I noticed some warnings about the missing type parameters on ProcessorParameters. While investigating it, it seems like there was a bug in the creation of repartition topics. Reviewers: Bill Bejeck , Guozhang Wang --- .../internals/GroupedStreamAggregateBuilder.java | 7 +- .../kstream/internals/KGroupedTableImpl.java | 22 +-- .../streams/kstream/internals/KStreamImpl.java | 184 +++-- .../internals/graph/ProcessorParameters.java | 12 +- .../internals/graph/StatefulProcessorNode.java | 6 +- .../internals/graph/GraphGraceSearchUtilTest.java | 8 +- 6 files changed, 125 insertions(+), 114 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java index 3439cf5..8e6f990 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java @@ -84,16 +84,17 @@ class GroupedStreamAggregateBuilder { builder.addGraphNode(parentNode, repartitionNode); parentNode = repartitionNode; } -final StatefulProcessorNode.StatefulProcessorNodeBuilder statefulProcessorNodeBuilder = StatefulProcessorNode.statefulProcessorNodeBuilder(); +final StatefulProcessorNode.StatefulProcessorNodeBuilder statefulProcessorNodeBuilder = StatefulProcessorNode.statefulProcessorNodeBuilder(); + +final ProcessorParameters processorParameters = new ProcessorParameters<>(aggregateSupplier, aggFunctionName); -final ProcessorParameters processorParameters = new ProcessorParameters<>(aggregateSupplier, aggFunctionName); statefulProcessorNodeBuilder .withProcessorParameters(processorParameters) .withNodeName(aggFunctionName) .withRepartitionRequired(repartitionRequired) .withStoreBuilder(storeBuilder); -final StatefulProcessorNode statefulProcessorNode = statefulProcessorNodeBuilder.build(); +final StatefulProcessorNode statefulProcessorNode = statefulProcessorNodeBuilder.build(); builder.addGraphNode(parentNode, statefulProcessorNode); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java index c97576b..013028d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java @@ -47,7 +47,7 @@ public class KGroupedTableImpl extends AbstractStream implements KGr private static final String REDUCE_NAME = "KTABLE-REDUCE-"; -protected final String userSpecifiedName; +private final String userSpecifiedName; private final Initializer countInitializer = () -> 0L; @@ -72,7 +72,7 @@ public class KGroupedTableImpl extends AbstractStream implements KGr final String sourceName = builder.newProcessorName(KStreamImpl.SOURCE_NAME); final String funcName = builder.newProcessorName(functionName); final String repartitionTopic = (userSpecifiedName != null ? userSpecifiedName : materialized.storeName()) -+ KStreamImpl.REPARTITION_TOPIC_SUFFIX; ++ KStreamImpl.REPARTITION_TOPIC_SUFFIX; final StreamsGraphNode repartitionNode = createRepartitionNode(sinkName, sourceName, repartitionTopic); @@ -98,16 +98,18 @@ public class KGroupedTableImpl extends AbstractStream implements KGr builder); } -private StatefulProcessorNode getStatefulProcessorNode(final MaterializedInternal> materialized, - final String functionName, - final ProcessorSupplier> aggregateSupplier) { +private StatefulProcessorNode> getStatefulProcessorNode(final MaterializedInternal> materialized, + final String functionName, + final
[kafka] branch trunk updated: MINOR: Fix generic type of ProcessorParameters (#5741)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new d76805f MINOR: Fix generic type of ProcessorParameters (#5741) d76805f is described below commit d76805f0fd2d65e9fb5582e4e0d30c1bd3fccd89 Author: John Roesler AuthorDate: Thu Oct 4 19:37:53 2018 -0500 MINOR: Fix generic type of ProcessorParameters (#5741) In unrelated recent work, I noticed some warnings about the missing type parameters on ProcessorParameters. While investigating it, it seems like there was a bug in the creation of repartition topics. Reviewers: Bill Bejeck , Guozhang Wang --- .../internals/GroupedStreamAggregateBuilder.java | 7 +- .../kstream/internals/KGroupedTableImpl.java | 22 +-- .../streams/kstream/internals/KStreamImpl.java | 184 +++-- .../internals/graph/ProcessorParameters.java | 12 +- .../internals/graph/StatefulProcessorNode.java | 6 +- .../internals/graph/GraphGraceSearchUtilTest.java | 8 +- 6 files changed, 125 insertions(+), 114 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java index 3439cf5..8e6f990 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java @@ -84,16 +84,17 @@ class GroupedStreamAggregateBuilder { builder.addGraphNode(parentNode, repartitionNode); parentNode = repartitionNode; } -final StatefulProcessorNode.StatefulProcessorNodeBuilder statefulProcessorNodeBuilder = StatefulProcessorNode.statefulProcessorNodeBuilder(); +final StatefulProcessorNode.StatefulProcessorNodeBuilder statefulProcessorNodeBuilder = StatefulProcessorNode.statefulProcessorNodeBuilder(); + +final ProcessorParameters processorParameters = new ProcessorParameters<>(aggregateSupplier, aggFunctionName); -final ProcessorParameters processorParameters = new ProcessorParameters<>(aggregateSupplier, aggFunctionName); statefulProcessorNodeBuilder .withProcessorParameters(processorParameters) .withNodeName(aggFunctionName) .withRepartitionRequired(repartitionRequired) .withStoreBuilder(storeBuilder); -final StatefulProcessorNode statefulProcessorNode = statefulProcessorNodeBuilder.build(); +final StatefulProcessorNode statefulProcessorNode = statefulProcessorNodeBuilder.build(); builder.addGraphNode(parentNode, statefulProcessorNode); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java index c97576b..013028d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java @@ -47,7 +47,7 @@ public class KGroupedTableImpl extends AbstractStream implements KGr private static final String REDUCE_NAME = "KTABLE-REDUCE-"; -protected final String userSpecifiedName; +private final String userSpecifiedName; private final Initializer countInitializer = () -> 0L; @@ -72,7 +72,7 @@ public class KGroupedTableImpl extends AbstractStream implements KGr final String sourceName = builder.newProcessorName(KStreamImpl.SOURCE_NAME); final String funcName = builder.newProcessorName(functionName); final String repartitionTopic = (userSpecifiedName != null ? userSpecifiedName : materialized.storeName()) -+ KStreamImpl.REPARTITION_TOPIC_SUFFIX; ++ KStreamImpl.REPARTITION_TOPIC_SUFFIX; final StreamsGraphNode repartitionNode = createRepartitionNode(sinkName, sourceName, repartitionTopic); @@ -98,16 +98,18 @@ public class KGroupedTableImpl extends AbstractStream implements KGr builder); } -private StatefulProcessorNode getStatefulProcessorNode(final MaterializedInternal> materialized, - final String functionName, - final ProcessorSupplier> aggregateSupplier) { +private StatefulProcessorNode> getStatefulProcessorNode(final MaterializedInternal> materialized, + final String functionName, + fi
[kafka] branch trunk updated: MINOR: Bump version to 2.2.0-SNAPSHOT
This is an automated email from the ASF dual-hosted git repository. lindong pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 2b37a04 MINOR: Bump version to 2.2.0-SNAPSHOT 2b37a04 is described below commit 2b37a041d3c7048a4b95b65197c278df7f51a521 Author: Dong Lin AuthorDate: Thu Oct 4 17:30:03 2018 -0700 MINOR: Bump version to 2.2.0-SNAPSHOT Author: Dong Lin Reviewers: Ismael Juma Closes #5744 from lindong28/bump-up-version-2.2.0 --- gradle.properties | 2 +- kafka-merge-pr.py | 2 +- streams/quickstart/java/pom.xml| 2 +- streams/quickstart/java/src/main/resources/archetype-resources/pom.xml | 2 +- streams/quickstart/pom.xml | 2 +- tests/kafkatest/__init__.py| 2 +- tests/kafkatest/version.py | 2 +- 7 files changed, 7 insertions(+), 7 deletions(-) diff --git a/gradle.properties b/gradle.properties index 1853cf3..3e3dbfa 100644 --- a/gradle.properties +++ b/gradle.properties @@ -19,7 +19,7 @@ group=org.apache.kafka # - tests/kafkatest/__init__.py, # - tests/kafkatest/version.py (variable DEV_VERSION) # - kafka-merge-pr.py. -version=2.1.0-SNAPSHOT +version=2.2.0-SNAPSHOT scalaVersion=2.11.12 task=build org.gradle.jvmargs=-Xmx1024m -Xss2m diff --git a/kafka-merge-pr.py b/kafka-merge-pr.py index e6b9a55..7bfeb2d 100755 --- a/kafka-merge-pr.py +++ b/kafka-merge-pr.py @@ -70,7 +70,7 @@ TEMP_BRANCH_PREFIX = "PR_TOOL" DEV_BRANCH_NAME = "trunk" -DEFAULT_FIX_VERSION = os.environ.get("DEFAULT_FIX_VERSION", "2.1.0") +DEFAULT_FIX_VERSION = os.environ.get("DEFAULT_FIX_VERSION", "2.2.0") def get_json(url): try: diff --git a/streams/quickstart/java/pom.xml b/streams/quickstart/java/pom.xml index ec2dbf4..29fce17 100644 --- a/streams/quickstart/java/pom.xml +++ b/streams/quickstart/java/pom.xml @@ -26,7 +26,7 @@ org.apache.kafka streams-quickstart -2.1.0-SNAPSHOT +2.2.0-SNAPSHOT .. diff --git a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml index 684e270..04643c1 100644 --- a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml +++ b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml @@ -29,7 +29,7 @@ UTF-8 -2.1.0-SNAPSHOT +2.2.0-SNAPSHOT 1.7.7 1.2.17 diff --git a/streams/quickstart/pom.xml b/streams/quickstart/pom.xml index e51a7e9..d1d456f 100644 --- a/streams/quickstart/pom.xml +++ b/streams/quickstart/pom.xml @@ -22,7 +22,7 @@ org.apache.kafka streams-quickstart pom -2.1.0-SNAPSHOT +2.2.0-SNAPSHOT Kafka Streams :: Quickstart diff --git a/tests/kafkatest/__init__.py b/tests/kafkatest/__init__.py index ba7c48c..a53e943 100644 --- a/tests/kafkatest/__init__.py +++ b/tests/kafkatest/__init__.py @@ -22,4 +22,4 @@ # Instead, in development branches, the version should have a suffix of the form ".devN" # # For example, when Kafka is at version 1.0.0-SNAPSHOT, this should be something like "1.0.0.dev0" -__version__ = '2.1.0.dev0' +__version__ = '2.2.0.dev0' diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py index 676ab96..6eed81f 100644 --- a/tests/kafkatest/version.py +++ b/tests/kafkatest/version.py @@ -60,7 +60,7 @@ def get_version(node=None): return DEV_BRANCH DEV_BRANCH = KafkaVersion("dev") -DEV_VERSION = KafkaVersion("2.1.0-SNAPSHOT") +DEV_VERSION = KafkaVersion("2.2.0-SNAPSHOT") # 0.8.2.x versions V_0_8_2_1 = KafkaVersion("0.8.2.1")
[kafka] branch trunk updated (f2dd6aa -> 7ea0655)
This is an automated email from the ASF dual-hosted git repository. lindong pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from f2dd6aa KAFKA-7415; Persist leader epoch and start offset on becoming a leader (#5678) add 7ea0655 KAFKA-7441; Allow LogCleanerManager.resumeCleaning() to be used concurrently No new revisions were added by this update. Summary of changes: .../main/scala/kafka/log/LogCleanerManager.scala | 72 +++--- core/src/main/scala/kafka/log/LogManager.scala | 15 +++-- .../unit/kafka/log/LogCleanerManagerTest.scala | 62 +++ 3 files changed, 110 insertions(+), 39 deletions(-)
[kafka] branch 1.0 updated: MINOR: Increase timeout for starting JMX tool (#5735)
This is an automated email from the ASF dual-hosted git repository. ewencp pushed a commit to branch 1.0 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/1.0 by this push: new 53d76c7 MINOR: Increase timeout for starting JMX tool (#5735) 53d76c7 is described below commit 53d76c73587f751923dfca83178847c9d3deb338 Author: Randall Hauch AuthorDate: Wed Oct 3 10:56:44 2018 -0500 MINOR: Increase timeout for starting JMX tool (#5735) In some tests, the check monitoring the JMX tool log output doesn’t quite wait long enough before failing. Increasing the timeout from 10 to 20 seconds. --- tests/kafkatest/services/monitor/jmx.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/kafkatest/services/monitor/jmx.py b/tests/kafkatest/services/monitor/jmx.py index 6f6e221..a64842c 100644 --- a/tests/kafkatest/services/monitor/jmx.py +++ b/tests/kafkatest/services/monitor/jmx.py @@ -83,7 +83,7 @@ class JmxMixin(object): self.logger.debug("%s: Start JmxTool %d command: %s" % (node.account, idx, cmd)) node.account.ssh(cmd, allow_fail=False) -wait_until(lambda: self._jmx_has_output(node), timeout_sec=10, backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account) +wait_until(lambda: self._jmx_has_output(node), timeout_sec=20, backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account) self.started[idx-1] = True def _jmx_has_output(self, node):
[kafka] branch 2.1 created (now 7ea0655)
This is an automated email from the ASF dual-hosted git repository. lindong pushed a change to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git. at 7ea0655 KAFKA-7441; Allow LogCleanerManager.resumeCleaning() to be used concurrently No new revisions were added by this update.
[kafka] branch 0.10.2 updated: MINOR: Increase timeout for starting JMX tool (#5735)
This is an automated email from the ASF dual-hosted git repository. ewencp pushed a commit to branch 0.10.2 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/0.10.2 by this push: new 6396b77 MINOR: Increase timeout for starting JMX tool (#5735) 6396b77 is described below commit 6396b776a47a2f58d3a5a3d76bc8de495d8bc43c Author: Randall Hauch AuthorDate: Wed Oct 3 10:56:44 2018 -0500 MINOR: Increase timeout for starting JMX tool (#5735) In some tests, the check monitoring the JMX tool log output doesn’t quite wait long enough before failing. Increasing the timeout from 10 to 20 seconds. --- tests/kafkatest/services/monitor/jmx.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/kafkatest/services/monitor/jmx.py b/tests/kafkatest/services/monitor/jmx.py index 0859bb4..f2ac33c 100644 --- a/tests/kafkatest/services/monitor/jmx.py +++ b/tests/kafkatest/services/monitor/jmx.py @@ -70,7 +70,7 @@ class JmxMixin(object): self.logger.debug("%s: Start JmxTool %d command: %s" % (node.account, idx, cmd)) node.account.ssh(cmd, allow_fail=False) -wait_until(lambda: self._jmx_has_output(node), timeout_sec=10, backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account) +wait_until(lambda: self._jmx_has_output(node), timeout_sec=20, backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account) self.started[idx-1] = True def _jmx_has_output(self, node):
[kafka] branch 0.11.0 updated: MINOR: Increase timeout for starting JMX tool (#5735)
This is an automated email from the ASF dual-hosted git repository. ewencp pushed a commit to branch 0.11.0 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/0.11.0 by this push: new 59991f6 MINOR: Increase timeout for starting JMX tool (#5735) 59991f6 is described below commit 59991f69cde98f691c2382754ed22bcd0fd884cf Author: Randall Hauch AuthorDate: Wed Oct 3 10:56:44 2018 -0500 MINOR: Increase timeout for starting JMX tool (#5735) In some tests, the check monitoring the JMX tool log output doesn’t quite wait long enough before failing. Increasing the timeout from 10 to 20 seconds. --- tests/kafkatest/services/monitor/jmx.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/kafkatest/services/monitor/jmx.py b/tests/kafkatest/services/monitor/jmx.py index 7331cb9f..5af2b18 100644 --- a/tests/kafkatest/services/monitor/jmx.py +++ b/tests/kafkatest/services/monitor/jmx.py @@ -77,7 +77,7 @@ class JmxMixin(object): self.logger.debug("%s: Start JmxTool %d command: %s" % (node.account, idx, cmd)) node.account.ssh(cmd, allow_fail=False) -wait_until(lambda: self._jmx_has_output(node), timeout_sec=10, backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account) +wait_until(lambda: self._jmx_has_output(node), timeout_sec=20, backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account) self.started[idx-1] = True def _jmx_has_output(self, node):
[kafka] branch 1.1 updated: MINOR: Increase timeout for starting JMX tool (#5735)
This is an automated email from the ASF dual-hosted git repository. ewencp pushed a commit to branch 1.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/1.1 by this push: new 9b61a0e MINOR: Increase timeout for starting JMX tool (#5735) 9b61a0e is described below commit 9b61a0ee0901867ed62a47edee21baa0e4ce79fa Author: Randall Hauch AuthorDate: Wed Oct 3 10:56:44 2018 -0500 MINOR: Increase timeout for starting JMX tool (#5735) In some tests, the check monitoring the JMX tool log output doesn’t quite wait long enough before failing. Increasing the timeout from 10 to 20 seconds. --- tests/kafkatest/services/monitor/jmx.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/kafkatest/services/monitor/jmx.py b/tests/kafkatest/services/monitor/jmx.py index 6f6e221..a64842c 100644 --- a/tests/kafkatest/services/monitor/jmx.py +++ b/tests/kafkatest/services/monitor/jmx.py @@ -83,7 +83,7 @@ class JmxMixin(object): self.logger.debug("%s: Start JmxTool %d command: %s" % (node.account, idx, cmd)) node.account.ssh(cmd, allow_fail=False) -wait_until(lambda: self._jmx_has_output(node), timeout_sec=10, backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account) +wait_until(lambda: self._jmx_has_output(node), timeout_sec=20, backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account) self.started[idx-1] = True def _jmx_has_output(self, node):
[kafka] branch 2.0 updated: MINOR: Increase timeout for starting JMX tool (#5735)
This is an automated email from the ASF dual-hosted git repository. ewencp pushed a commit to branch 2.0 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.0 by this push: new e7093d6 MINOR: Increase timeout for starting JMX tool (#5735) e7093d6 is described below commit e7093d6c1a946abd23103ba5b3802c4d3dd38892 Author: Randall Hauch AuthorDate: Wed Oct 3 10:56:44 2018 -0500 MINOR: Increase timeout for starting JMX tool (#5735) In some tests, the check monitoring the JMX tool log output doesn’t quite wait long enough before failing. Increasing the timeout from 10 to 20 seconds. --- tests/kafkatest/services/monitor/jmx.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/kafkatest/services/monitor/jmx.py b/tests/kafkatest/services/monitor/jmx.py index 542d3a5..cf8cbc3 100644 --- a/tests/kafkatest/services/monitor/jmx.py +++ b/tests/kafkatest/services/monitor/jmx.py @@ -83,7 +83,7 @@ class JmxMixin(object): self.logger.debug("%s: Start JmxTool %d command: %s" % (node.account, idx, cmd)) node.account.ssh(cmd, allow_fail=False) -wait_until(lambda: self._jmx_has_output(node), timeout_sec=10, backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account) +wait_until(lambda: self._jmx_has_output(node), timeout_sec=20, backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account) self.started[idx-1] = True def _jmx_has_output(self, node):
[kafka] branch 2.0 updated: KAFKA-7415; Persist leader epoch and start offset on becoming a leader (#5678)
This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 2.0 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.0 by this push: new 18b2577 KAFKA-7415; Persist leader epoch and start offset on becoming a leader (#5678) 18b2577 is described below commit 18b25773b11a4d7240457ac0f88670c7a55301e1 Author: Jason Gustafson AuthorDate: Thu Oct 4 14:02:23 2018 -0700 KAFKA-7415; Persist leader epoch and start offset on becoming a leader (#5678) This patch ensures that the leader epoch cache is updated when a broker becomes leader with the latest epoch and the log end offset as its starting offset. This guarantees that the leader will be able to provide the right truncation point even if the follower has data from leader epochs which the leader itself does not have. This situation can occur when there are back to back leader elections. Additionally, we have made the following changes: 1. The leader epoch cache enforces monotonically increase epochs and starting offsets among its entry. Whenever a new entry is appended which violates requirement, we remove the conflicting entries from the cache. 2. Previously we returned an unknown epoch and offset if an epoch is queried which comes before the first entry in the cache. Now we return the smallest . For example, if the earliest entry in the cache is (epoch=5, startOffset=10), then a query for epoch 4 will return (epoch=4, endOffset=10). This ensures that followers (and consumers in KIP-320) can always determine where the correct starting point is for the active log range on the leader. Reviewers: Jun Rao --- core/src/main/scala/kafka/cluster/Partition.scala | 11 +- core/src/main/scala/kafka/cluster/Replica.scala| 3 +- core/src/main/scala/kafka/log/Log.scala| 24 +- core/src/main/scala/kafka/log/LogSegment.scala | 6 +- .../kafka/server/ReplicaAlterLogDirsThread.scala | 6 +- .../scala/kafka/server/ReplicaFetcherThread.scala | 6 +- .../kafka/server/epoch/LeaderEpochFileCache.scala | 197 + .../scala/unit/kafka/cluster/PartitionTest.scala | 33 ++ core/src/test/scala/unit/kafka/log/LogTest.scala | 41 +- .../unit/kafka/server/ISRExpirationTest.scala | 4 +- .../server/ReplicaAlterLogDirsThreadTest.scala | 28 +- .../kafka/server/ReplicaFetcherThreadTest.scala| 115 - .../unit/kafka/server/ReplicaManagerTest.scala | 8 +- .../LeaderEpochCheckpointFileTest.scala| 1 - ...chDrivenReplicationProtocolAcceptanceTest.scala | 24 +- .../server/epoch/LeaderEpochFileCacheTest.scala| 467 - .../server/epoch/LeaderEpochIntegrationTest.scala | 60 +-- .../server/epoch/OffsetsForLeaderEpochTest.scala | 2 +- .../test/scala/unit/kafka/utils/TestUtils.scala| 5 +- 19 files changed, 551 insertions(+), 490 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 24883e2..8c68c78 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -302,8 +302,17 @@ class Partition(val topic: String, leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset) zkVersion = partitionStateInfo.basePartitionState.zkVersion - val isNewLeader = leaderReplicaIdOpt.map(_ != localBrokerId).getOrElse(true) + // In the case of successive leader elections in a short time period, a follower may have + // entries in its log from a later epoch than any entry in the new leader's log. In order + // to ensure that these followers can truncate to the right offset, we must cache the new + // leader epoch and the start offset since it should be larger than any epoch that a follower + // would try to query. + leaderReplica.epochs.foreach { epochCache => +epochCache.assign(leaderEpoch, leaderEpochStartOffset) + } + + val isNewLeader = !leaderReplicaIdOpt.contains(localBrokerId) val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset val curTimeMs = time.milliseconds // initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset. diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala index 962aaff..c37889e 100644 --- a/core/src/main/scala/kafka/cluster/Replica.scala +++ b/core/src/main/scala/kafka/cluster/Replica.scala @@ -18,6 +18,7 @@ package kafka.cluster import kafka.log.Log +import kafka.server.epoch.LeaderEpochFileCache import kafka.utils.Logging import kafka.server.{LogOffsetMetadata, LogReadResult} import org.apache.kafka.common.{KafkaException, TopicPartition} @@ -54,7 +55,7 @@ class Replica(val brok
[kafka] branch trunk updated: KAFKA-7415; Persist leader epoch and start offset on becoming a leader (#5678)
This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new f2dd6aa KAFKA-7415; Persist leader epoch and start offset on becoming a leader (#5678) f2dd6aa is described below commit f2dd6aa2698345fd0b0348f7bc74ce3215adf682 Author: Jason Gustafson AuthorDate: Thu Oct 4 14:02:23 2018 -0700 KAFKA-7415; Persist leader epoch and start offset on becoming a leader (#5678) This patch ensures that the leader epoch cache is updated when a broker becomes leader with the latest epoch and the log end offset as its starting offset. This guarantees that the leader will be able to provide the right truncation point even if the follower has data from leader epochs which the leader itself does not have. This situation can occur when there are back to back leader elections. Additionally, we have made the following changes: 1. The leader epoch cache enforces monotonically increase epochs and starting offsets among its entry. Whenever a new entry is appended which violates requirement, we remove the conflicting entries from the cache. 2. Previously we returned an unknown epoch and offset if an epoch is queried which comes before the first entry in the cache. Now we return the smallest . For example, if the earliest entry in the cache is (epoch=5, startOffset=10), then a query for epoch 4 will return (epoch=4, endOffset=10). This ensures that followers (and consumers in KIP-320) can always determine where the correct starting point is for the active log range on the leader. Reviewers: Jun Rao --- core/src/main/scala/kafka/cluster/Partition.scala | 11 +- core/src/main/scala/kafka/cluster/Replica.scala| 4 +- core/src/main/scala/kafka/log/Log.scala| 24 +- core/src/main/scala/kafka/log/LogSegment.scala | 4 +- .../kafka/server/epoch/LeaderEpochFileCache.scala | 197 + .../scala/unit/kafka/cluster/PartitionTest.scala | 33 ++ core/src/test/scala/unit/kafka/log/LogTest.scala | 41 +- .../unit/kafka/server/ISRExpirationTest.scala | 4 +- .../server/ReplicaAlterLogDirsThreadTest.scala | 28 +- .../kafka/server/ReplicaFetcherThreadTest.scala| 22 +- .../unit/kafka/server/ReplicaManagerTest.scala | 6 +- .../LeaderEpochCheckpointFileTest.scala| 1 - ...chDrivenReplicationProtocolAcceptanceTest.scala | 24 +- .../server/epoch/LeaderEpochFileCacheTest.scala| 467 - .../server/epoch/LeaderEpochIntegrationTest.scala | 49 ++- .../server/epoch/OffsetsForLeaderEpochTest.scala | 2 +- .../test/scala/unit/kafka/utils/TestUtils.scala| 5 +- 17 files changed, 444 insertions(+), 478 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 2036bb0..307fb81 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -301,8 +301,17 @@ class Partition(val topic: String, leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset) zkVersion = partitionStateInfo.basePartitionState.zkVersion - val isNewLeader = leaderReplicaIdOpt.map(_ != localBrokerId).getOrElse(true) + // In the case of successive leader elections in a short time period, a follower may have + // entries in its log from a later epoch than any entry in the new leader's log. In order + // to ensure that these followers can truncate to the right offset, we must cache the new + // leader epoch and the start offset since it should be larger than any epoch that a follower + // would try to query. + leaderReplica.epochs.foreach { epochCache => +epochCache.assign(leaderEpoch, leaderEpochStartOffset) + } + + val isNewLeader = !leaderReplicaIdOpt.contains(localBrokerId) val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset val curTimeMs = time.milliseconds // initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset. diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala index d729dad..22860c7 100644 --- a/core/src/main/scala/kafka/cluster/Replica.scala +++ b/core/src/main/scala/kafka/cluster/Replica.scala @@ -18,7 +18,7 @@ package kafka.cluster import kafka.log.Log -import kafka.server.epoch.LeaderEpochCache +import kafka.server.epoch.LeaderEpochFileCache import kafka.utils.Logging import kafka.server.{LogOffsetMetadata, LogReadResult} import org.apache.kafka.common.{KafkaException, TopicPartition} @@ -55,7 +55,7 @@ class Replica(val brokerId: Int, def lastCaughtUpTimeMs: Long = _lastCaughtUpTimeMs - val e
[kafka] branch 1.0 updated: KAFKA-7196; Remove heartbeat delayed operation for those removed consumers at the end of each rebalance
This is an automated email from the ASF dual-hosted git repository. lindong pushed a commit to branch 1.0 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/1.0 by this push: new cd22770 KAFKA-7196; Remove heartbeat delayed operation for those removed consumers at the end of each rebalance cd22770 is described below commit cd2277069d355d2a9839c97265448b5a883feb89 Author: Lincong Li AuthorDate: Thu Oct 4 09:14:44 2018 -0700 KAFKA-7196; Remove heartbeat delayed operation for those removed consumers at the end of each rebalance During the consumer group rebalance, when the joining group phase finishes, the heartbeat delayed operation of the consumer that fails to rejoin the group should be removed from the purgatory. Otherwise, even though the member ID of the consumer has been removed from the group, its heartbeat delayed operation is still registered in the purgatory and the heartbeat delayed operation is going to timeout and then another unnecessary rebalance is triggered because of it. Author: Lincong Li Reviewers: Dong Lin Closes #5556 from Lincong/remove_heartbeat_delayedOperation (cherry picked from commit 260b07a6da070e6312443fb7cc6b937bef2865ea) Signed-off-by: Dong Lin --- .../kafka/coordinator/group/GroupCoordinator.scala | 1 + .../coordinator/group/GroupCoordinatorTest.scala | 22 -- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala index 25b5780..c70d0e9 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala @@ -727,6 +727,7 @@ class GroupCoordinator(val brokerId: Int, group.inLock { // remove any members who haven't joined the group yet group.notYetRejoinedMembers.foreach { failedMember => +removeHeartbeatForLeavingMember(group, failedMember) group.remove(failedMember.memberId) // TODO: cut the socket connection to the client } diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index 3592d6a..a68a3d8 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -469,11 +469,29 @@ class GroupCoordinatorTest extends JUnitSuite { heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId) assertEquals(Errors.REBALANCE_IN_PROGRESS, heartbeatResult) -// now timeout the rebalance, which should kick the unjoined member out of the group -// and let the rebalance finish with only the new member +// now timeout the rebalance timer.advanceClock(500) val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100) +val otherMemberId = otherJoinResult.memberId +val otherGenerationId = otherJoinResult.generationId +EasyMock.reset(replicaManager) +val syncResult = syncGroupLeader(groupId, otherGenerationId, otherMemberId, Map(otherMemberId -> Array[Byte]())) +assertEquals(Errors.NONE, syncResult._2) + +// the unjoined member should be kicked out from the group assertEquals(Errors.NONE, otherJoinResult.error) +EasyMock.reset(replicaManager) +heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId) +assertEquals(Errors.UNKNOWN_MEMBER_ID, heartbeatResult) + +// the joined member should get heart beat response with no error. Let the new member keep heartbeating for a while +// to verify that no new rebalance is triggered unexpectedly +for ( _ <- 1 to 20) { + timer.advanceClock(500) + EasyMock.reset(replicaManager) + heartbeatResult = heartbeat(groupId, otherMemberId, otherGenerationId) + assertEquals(Errors.NONE, heartbeatResult) +} } @Test
[kafka] branch 1.1 updated: KAFKA-7196; Remove heartbeat delayed operation for those removed consumers at the end of each rebalance
This is an automated email from the ASF dual-hosted git repository. lindong pushed a commit to branch 1.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/1.1 by this push: new 391d065 KAFKA-7196; Remove heartbeat delayed operation for those removed consumers at the end of each rebalance 391d065 is described below commit 391d065a43ab3518014c3fcc661f2b57953435db Author: Lincong Li AuthorDate: Thu Oct 4 09:14:44 2018 -0700 KAFKA-7196; Remove heartbeat delayed operation for those removed consumers at the end of each rebalance During the consumer group rebalance, when the joining group phase finishes, the heartbeat delayed operation of the consumer that fails to rejoin the group should be removed from the purgatory. Otherwise, even though the member ID of the consumer has been removed from the group, its heartbeat delayed operation is still registered in the purgatory and the heartbeat delayed operation is going to timeout and then another unnecessary rebalance is triggered because of it. Author: Lincong Li Reviewers: Dong Lin Closes #5556 from Lincong/remove_heartbeat_delayedOperation (cherry picked from commit 260b07a6da070e6312443fb7cc6b937bef2865ea) Signed-off-by: Dong Lin --- .../kafka/coordinator/group/GroupCoordinator.scala | 1 + .../coordinator/group/GroupCoordinatorTest.scala | 22 -- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala index 6cc0a41..5ffe6ba 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala @@ -774,6 +774,7 @@ class GroupCoordinator(val brokerId: Int, group.inLock { // remove any members who haven't joined the group yet group.notYetRejoinedMembers.foreach { failedMember => +removeHeartbeatForLeavingMember(group, failedMember) group.remove(failedMember.memberId) // TODO: cut the socket connection to the client } diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index 8529bf9..515dbe1 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -471,11 +471,29 @@ class GroupCoordinatorTest extends JUnitSuite { heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId) assertEquals(Errors.REBALANCE_IN_PROGRESS, heartbeatResult) -// now timeout the rebalance, which should kick the unjoined member out of the group -// and let the rebalance finish with only the new member +// now timeout the rebalance timer.advanceClock(500) val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100) +val otherMemberId = otherJoinResult.memberId +val otherGenerationId = otherJoinResult.generationId +EasyMock.reset(replicaManager) +val syncResult = syncGroupLeader(groupId, otherGenerationId, otherMemberId, Map(otherMemberId -> Array[Byte]())) +assertEquals(Errors.NONE, syncResult._2) + +// the unjoined member should be kicked out from the group assertEquals(Errors.NONE, otherJoinResult.error) +EasyMock.reset(replicaManager) +heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId) +assertEquals(Errors.UNKNOWN_MEMBER_ID, heartbeatResult) + +// the joined member should get heart beat response with no error. Let the new member keep heartbeating for a while +// to verify that no new rebalance is triggered unexpectedly +for ( _ <- 1 to 20) { + timer.advanceClock(500) + EasyMock.reset(replicaManager) + heartbeatResult = heartbeat(groupId, otherMemberId, otherGenerationId) + assertEquals(Errors.NONE, heartbeatResult) +} } @Test
[kafka] branch 2.0 updated: KAFKA-7196; Remove heartbeat delayed operation for those removed consumers at the end of each rebalance
This is an automated email from the ASF dual-hosted git repository. lindong pushed a commit to branch 2.0 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.0 by this push: new cc5e6f1 KAFKA-7196; Remove heartbeat delayed operation for those removed consumers at the end of each rebalance cc5e6f1 is described below commit cc5e6f18a0153f8b9cf433ebe1b55e341fdbcd27 Author: Lincong Li AuthorDate: Thu Oct 4 09:14:44 2018 -0700 KAFKA-7196; Remove heartbeat delayed operation for those removed consumers at the end of each rebalance During the consumer group rebalance, when the joining group phase finishes, the heartbeat delayed operation of the consumer that fails to rejoin the group should be removed from the purgatory. Otherwise, even though the member ID of the consumer has been removed from the group, its heartbeat delayed operation is still registered in the purgatory and the heartbeat delayed operation is going to timeout and then another unnecessary rebalance is triggered because of it. Author: Lincong Li Reviewers: Dong Lin Closes #5556 from Lincong/remove_heartbeat_delayedOperation (cherry picked from commit 260b07a6da070e6312443fb7cc6b937bef2865ea) Signed-off-by: Dong Lin --- .../kafka/coordinator/group/GroupCoordinator.scala | 1 + .../coordinator/group/GroupCoordinatorTest.scala | 22 -- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala index 9748e17..2c062e2 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala @@ -779,6 +779,7 @@ class GroupCoordinator(val brokerId: Int, group.inLock { // remove any members who haven't joined the group yet group.notYetRejoinedMembers.foreach { failedMember => +removeHeartbeatForLeavingMember(group, failedMember) group.remove(failedMember.memberId) // TODO: cut the socket connection to the client } diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index 608d7cc..efa44fa 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -527,11 +527,29 @@ class GroupCoordinatorTest extends JUnitSuite { heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId) assertEquals(Errors.REBALANCE_IN_PROGRESS, heartbeatResult) -// now timeout the rebalance, which should kick the unjoined member out of the group -// and let the rebalance finish with only the new member +// now timeout the rebalance timer.advanceClock(500) val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100) +val otherMemberId = otherJoinResult.memberId +val otherGenerationId = otherJoinResult.generationId +EasyMock.reset(replicaManager) +val syncResult = syncGroupLeader(groupId, otherGenerationId, otherMemberId, Map(otherMemberId -> Array[Byte]())) +assertEquals(Errors.NONE, syncResult._2) + +// the unjoined member should be kicked out from the group assertEquals(Errors.NONE, otherJoinResult.error) +EasyMock.reset(replicaManager) +heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId) +assertEquals(Errors.UNKNOWN_MEMBER_ID, heartbeatResult) + +// the joined member should get heart beat response with no error. Let the new member keep heartbeating for a while +// to verify that no new rebalance is triggered unexpectedly +for ( _ <- 1 to 20) { + timer.advanceClock(500) + EasyMock.reset(replicaManager) + heartbeatResult = heartbeat(groupId, otherMemberId, otherGenerationId) + assertEquals(Errors.NONE, heartbeatResult) +} } @Test
[kafka] branch trunk updated: KAFKA-7196; Remove heartbeat delayed operation for those removed consumers at the end of each rebalance
This is an automated email from the ASF dual-hosted git repository. lindong pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 260b07a KAFKA-7196; Remove heartbeat delayed operation for those removed consumers at the end of each rebalance 260b07a is described below commit 260b07a6da070e6312443fb7cc6b937bef2865ea Author: Lincong Li AuthorDate: Thu Oct 4 09:14:44 2018 -0700 KAFKA-7196; Remove heartbeat delayed operation for those removed consumers at the end of each rebalance During the consumer group rebalance, when the joining group phase finishes, the heartbeat delayed operation of the consumer that fails to rejoin the group should be removed from the purgatory. Otherwise, even though the member ID of the consumer has been removed from the group, its heartbeat delayed operation is still registered in the purgatory and the heartbeat delayed operation is going to timeout and then another unnecessary rebalance is triggered because of it. Author: Lincong Li Reviewers: Dong Lin Closes #5556 from Lincong/remove_heartbeat_delayedOperation --- .../kafka/coordinator/group/GroupCoordinator.scala | 1 + .../coordinator/group/GroupCoordinatorTest.scala | 22 -- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala index c4e6dc9..db89f14 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala @@ -775,6 +775,7 @@ class GroupCoordinator(val brokerId: Int, group.inLock { // remove any members who haven't joined the group yet group.notYetRejoinedMembers.foreach { failedMember => +removeHeartbeatForLeavingMember(group, failedMember) group.remove(failedMember.memberId) // TODO: cut the socket connection to the client } diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index 9df16ad..c2c0841 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -529,11 +529,29 @@ class GroupCoordinatorTest extends JUnitSuite { heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId) assertEquals(Errors.REBALANCE_IN_PROGRESS, heartbeatResult) -// now timeout the rebalance, which should kick the unjoined member out of the group -// and let the rebalance finish with only the new member +// now timeout the rebalance timer.advanceClock(500) val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100) +val otherMemberId = otherJoinResult.memberId +val otherGenerationId = otherJoinResult.generationId +EasyMock.reset(replicaManager) +val syncResult = syncGroupLeader(groupId, otherGenerationId, otherMemberId, Map(otherMemberId -> Array[Byte]())) +assertEquals(Errors.NONE, syncResult._2) + +// the unjoined member should be kicked out from the group assertEquals(Errors.NONE, otherJoinResult.error) +EasyMock.reset(replicaManager) +heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId) +assertEquals(Errors.UNKNOWN_MEMBER_ID, heartbeatResult) + +// the joined member should get heart beat response with no error. Let the new member keep heartbeating for a while +// to verify that no new rebalance is triggered unexpectedly +for ( _ <- 1 to 20) { + timer.advanceClock(500) + EasyMock.reset(replicaManager) + heartbeatResult = heartbeat(groupId, otherMemberId, otherGenerationId) + assertEquals(Errors.NONE, heartbeatResult) +} } @Test