[kafka] branch 0.9.0 updated (34ae29a -> 342c817)

2018-10-04 Thread ewencp
This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a change to branch 0.9.0
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 34ae29a  KAFKA-7058: Comparing schema default values using Objects#deepEquals()
 add 342c817  KAFKA-7476: Fix Date-based types in SchemaProjector

No new revisions were added by this update.

Summary of changes:
 .../java/org/apache/kafka/connect/data/SchemaProjector.java   |  2 +-
 .../org/apache/kafka/connect/data/SchemaProjectorTest.java    | 11 +++++++++++
 2 files changed, 12 insertions(+), 1 deletion(-)



[kafka] branch 0.10.0 updated: KAFKA-7476: Fix Date-based types in SchemaProjector

2018-10-04 Thread ewencp
This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch 0.10.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/0.10.0 by this push:
 new 0ab0369  KAFKA-7476: Fix Date-based types in SchemaProjector
0ab0369 is described below

commit 0ab0369c9275cb2555e53967ed74c0b35d7691bf
Author: Robert Yokota 
AuthorDate: Thu Oct 4 20:34:50 2018 -0700

KAFKA-7476: Fix Date-based types in SchemaProjector

Various converters (AvroConverter and JsonConverter) produce a
SchemaAndValue consisting of a logical schema type and a java.util.Date.
This is a fix for SchemaProjector to properly handle the Date.

Author: Robert Yokota 

Reviewers: Konstantine Karantasis, Ewen Cheslack-Postava

Closes #5736 from rayokota/KAFKA-7476

(cherry picked from commit 3edd8e7333ec0bb32ab5ae4ec4814fe30bb8f91d)
Signed-off-by: Ewen Cheslack-Postava 
---
 .../java/org/apache/kafka/connect/data/SchemaProjector.java   |  2 +-
 .../org/apache/kafka/connect/data/SchemaProjectorTest.java    | 11 +++++++++++
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java
index 6277e44..08ee37a 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java
@@ -159,7 +159,7 @@ public class SchemaProjector {
         assert source.type().isPrimitive();
         assert target.type().isPrimitive();
         Object result;
-        if (isPromotable(source.type(), target.type())) {
+        if (isPromotable(source.type(), target.type()) && record instanceof Number) {
             Number numberRecord = (Number) record;
             switch (target.type()) {
                 case INT8:
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
index 101be04..ef6d029 100644
--- a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
@@ -351,6 +351,17 @@ public class SchemaProjectorTest {
         projected = SchemaProjector.project(Timestamp.SCHEMA, 34567L, Timestamp.SCHEMA);
         assertEquals(34567L, projected);
 
+        java.util.Date date = new java.util.Date();
+
+        projected = SchemaProjector.project(Date.SCHEMA, date, Date.SCHEMA);
+        assertEquals(date, projected);
+
+        projected = SchemaProjector.project(Time.SCHEMA, date, Time.SCHEMA);
+        assertEquals(date, projected);
+
+        projected = SchemaProjector.project(Timestamp.SCHEMA, date, Timestamp.SCHEMA);
+        assertEquals(date, projected);
+
         Schema namedSchema = SchemaBuilder.int32().name("invalidLogicalTypeName").build();
         for (Schema logicalTypeSchema: logicalTypeSchemas) {
             try {
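
The guard added above is easiest to see with the logical types in mind: Connect's Date and Time schemas are backed by INT32 and Timestamp by INT64, all of which count as promotable primitives, so before this patch a java.util.Date payload fell into the numeric-promotion branch and the cast to Number failed at runtime. A minimal sketch of the before/after behavior (illustrative only, not part of the commit; the class name is made up):

// A minimal sketch (not from the patch) of the behavior the guard fixes.
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.SchemaProjector;
import org.apache.kafka.connect.data.Timestamp;

public class DateProjectionSketch {
    public static void main(String[] args) {
        java.util.Date value = new java.util.Date();

        // Date.SCHEMA is a logical type backed by INT32, and INT32 -> INT32 is
        // "promotable", so the old code cast the java.util.Date to Number and
        // threw ClassCastException. With the guard, the value passes through.
        Object projected = SchemaProjector.project(Date.SCHEMA, value, Date.SCHEMA);
        System.out.println(value.equals(projected)); // true after the fix

        // The same applies to Timestamp (backed by INT64).
        projected = SchemaProjector.project(Timestamp.SCHEMA, value, Timestamp.SCHEMA);
        System.out.println(value.equals(projected)); // true after the fix
    }
}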



[kafka] branch 0.10.1 updated: KAFKA-7476: Fix Date-based types in SchemaProjector

2018-10-04 Thread ewencp
This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch 0.10.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/0.10.1 by this push:
 new daf38aa  KAFKA-7476: Fix Date-based types in SchemaProjector
daf38aa is described below

commit daf38aa825b37b2a86e3c0238ee8e493d54e158f
Author: Robert Yokota 
AuthorDate: Thu Oct 4 20:34:50 2018 -0700

KAFKA-7476: Fix Date-based types in SchemaProjector

Various converters (AvroConverter and JsonConverter) produce a
SchemaAndValue consisting of a logical schema type and a java.util.Date.
This is a fix for SchemaProjector to properly handle the Date.

Author: Robert Yokota 

Reviewers: Konstantine Karantasis, Ewen Cheslack-Postava

Closes #5736 from rayokota/KAFKA-7476

(cherry picked from commit 3edd8e7333ec0bb32ab5ae4ec4814fe30bb8f91d)
Signed-off-by: Ewen Cheslack-Postava 
---
 .../java/org/apache/kafka/connect/data/SchemaProjector.java   |  2 +-
 .../org/apache/kafka/connect/data/SchemaProjectorTest.java    | 11 +++++++++++
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java
index 6277e44..08ee37a 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java
@@ -159,7 +159,7 @@ public class SchemaProjector {
         assert source.type().isPrimitive();
         assert target.type().isPrimitive();
         Object result;
-        if (isPromotable(source.type(), target.type())) {
+        if (isPromotable(source.type(), target.type()) && record instanceof Number) {
             Number numberRecord = (Number) record;
             switch (target.type()) {
                 case INT8:
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
index 101be04..ef6d029 100644
--- a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
@@ -351,6 +351,17 @@ public class SchemaProjectorTest {
         projected = SchemaProjector.project(Timestamp.SCHEMA, 34567L, Timestamp.SCHEMA);
         assertEquals(34567L, projected);
 
+        java.util.Date date = new java.util.Date();
+
+        projected = SchemaProjector.project(Date.SCHEMA, date, Date.SCHEMA);
+        assertEquals(date, projected);
+
+        projected = SchemaProjector.project(Time.SCHEMA, date, Time.SCHEMA);
+        assertEquals(date, projected);
+
+        projected = SchemaProjector.project(Timestamp.SCHEMA, date, Timestamp.SCHEMA);
+        assertEquals(date, projected);
+
         Schema namedSchema = SchemaBuilder.int32().name("invalidLogicalTypeName").build();
         for (Schema logicalTypeSchema: logicalTypeSchemas) {
             try {



[kafka] branch 0.10.2 updated: KAFKA-7476: Fix Date-based types in SchemaProjector

2018-10-04 Thread ewencp
This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch 0.10.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/0.10.2 by this push:
 new b90c609  KAFKA-7476: Fix Date-based types in SchemaProjector
b90c609 is described below

commit b90c609ee08bf239f3539d81dda5e8990cb34600
Author: Robert Yokota 
AuthorDate: Thu Oct 4 20:34:50 2018 -0700

KAFKA-7476: Fix Date-based types in SchemaProjector

Various converters (AvroConverter and JsonConverter) produce a
SchemaAndValue consisting of a logical schema type and a java.util.Date.
This is a fix for SchemaProjector to properly handle the Date.

Author: Robert Yokota 

Reviewers: Konstantine Karantasis, Ewen Cheslack-Postava

Closes #5736 from rayokota/KAFKA-7476

(cherry picked from commit 3edd8e7333ec0bb32ab5ae4ec4814fe30bb8f91d)
Signed-off-by: Ewen Cheslack-Postava 
---
 .../java/org/apache/kafka/connect/data/SchemaProjector.java   |  2 +-
 .../org/apache/kafka/connect/data/SchemaProjectorTest.java    | 11 +++++++++++
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java
index 6277e44..08ee37a 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java
@@ -159,7 +159,7 @@ public class SchemaProjector {
         assert source.type().isPrimitive();
         assert target.type().isPrimitive();
         Object result;
-        if (isPromotable(source.type(), target.type())) {
+        if (isPromotable(source.type(), target.type()) && record instanceof Number) {
             Number numberRecord = (Number) record;
             switch (target.type()) {
                 case INT8:
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
index 101be04..ef6d029 100644
--- a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
@@ -351,6 +351,17 @@ public class SchemaProjectorTest {
         projected = SchemaProjector.project(Timestamp.SCHEMA, 34567L, Timestamp.SCHEMA);
         assertEquals(34567L, projected);
 
+        java.util.Date date = new java.util.Date();
+
+        projected = SchemaProjector.project(Date.SCHEMA, date, Date.SCHEMA);
+        assertEquals(date, projected);
+
+        projected = SchemaProjector.project(Time.SCHEMA, date, Time.SCHEMA);
+        assertEquals(date, projected);
+
+        projected = SchemaProjector.project(Timestamp.SCHEMA, date, Timestamp.SCHEMA);
+        assertEquals(date, projected);
+
         Schema namedSchema = SchemaBuilder.int32().name("invalidLogicalTypeName").build();
         for (Schema logicalTypeSchema: logicalTypeSchemas) {
             try {



[kafka] branch 0.11.0 updated: KAFKA-7476: Fix Date-based types in SchemaProjector

2018-10-04 Thread ewencp
This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch 0.11.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/0.11.0 by this push:
 new a9ca107  KAFKA-7476: Fix Date-based types in SchemaProjector
a9ca107 is described below

commit a9ca1079bd28c716ba05f703b7cc814620cb1586
Author: Robert Yokota 
AuthorDate: Thu Oct 4 20:34:50 2018 -0700

KAFKA-7476: Fix Date-based types in SchemaProjector

Various converters (AvroConverter and JsonConverter) produce a
SchemaAndValue consisting of a logical schema type and a java.util.Date.
This is a fix for SchemaProjector to properly handle the Date.

Author: Robert Yokota 

Reviewers: Konstantine Karantasis, Ewen Cheslack-Postava

Closes #5736 from rayokota/KAFKA-7476

(cherry picked from commit 3edd8e7333ec0bb32ab5ae4ec4814fe30bb8f91d)
Signed-off-by: Ewen Cheslack-Postava 
---
 .../java/org/apache/kafka/connect/data/SchemaProjector.java   |  2 +-
 .../org/apache/kafka/connect/data/SchemaProjectorTest.java    | 11 +++++++++++
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java
index ea31752..5400705 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java
@@ -160,7 +160,7 @@ public class SchemaProjector {
         assert source.type().isPrimitive();
         assert target.type().isPrimitive();
         Object result;
-        if (isPromotable(source.type(), target.type())) {
+        if (isPromotable(source.type(), target.type()) && record instanceof Number) {
             Number numberRecord = (Number) record;
             switch (target.type()) {
                 case INT8:
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
index 151114e..0db4eec 100644
--- a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
@@ -352,6 +352,17 @@ public class SchemaProjectorTest {
         projected = SchemaProjector.project(Timestamp.SCHEMA, 34567L, Timestamp.SCHEMA);
         assertEquals(34567L, projected);
 
+        java.util.Date date = new java.util.Date();
+
+        projected = SchemaProjector.project(Date.SCHEMA, date, Date.SCHEMA);
+        assertEquals(date, projected);
+
+        projected = SchemaProjector.project(Time.SCHEMA, date, Time.SCHEMA);
+        assertEquals(date, projected);
+
+        projected = SchemaProjector.project(Timestamp.SCHEMA, date, Timestamp.SCHEMA);
+        assertEquals(date, projected);
+
         Schema namedSchema = SchemaBuilder.int32().name("invalidLogicalTypeName").build();
         for (Schema logicalTypeSchema: logicalTypeSchemas) {
             try {



[kafka] branch 1.0 updated: KAFKA-7476: Fix Date-based types in SchemaProjector

2018-10-04 Thread ewencp
This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
 new 2b17877  KAFKA-7476: Fix Date-based types in SchemaProjector
2b17877 is described below

commit 2b1787742a2d535bf2b14620c50169cd7cea2328
Author: Robert Yokota 
AuthorDate: Thu Oct 4 20:34:50 2018 -0700

KAFKA-7476: Fix Date-based types in SchemaProjector

Various converters (AvroConverter and JsonConverter) produce a
SchemaAndValue consisting of a logical schema type and a java.util.Date.
This is a fix for SchemaProjector to properly handle the Date.

Author: Robert Yokota 

Reviewers: Konstantine Karantasis, Ewen Cheslack-Postava

Closes #5736 from rayokota/KAFKA-7476

(cherry picked from commit 3edd8e7333ec0bb32ab5ae4ec4814fe30bb8f91d)
Signed-off-by: Ewen Cheslack-Postava 
---
 .../java/org/apache/kafka/connect/data/SchemaProjector.java   |  2 +-
 .../org/apache/kafka/connect/data/SchemaProjectorTest.java    | 11 +++++++++++
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java
index ea31752..5400705 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java
@@ -160,7 +160,7 @@ public class SchemaProjector {
         assert source.type().isPrimitive();
         assert target.type().isPrimitive();
         Object result;
-        if (isPromotable(source.type(), target.type())) {
+        if (isPromotable(source.type(), target.type()) && record instanceof Number) {
             Number numberRecord = (Number) record;
             switch (target.type()) {
                 case INT8:
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
index 151114e..0db4eec 100644
--- a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
@@ -352,6 +352,17 @@ public class SchemaProjectorTest {
         projected = SchemaProjector.project(Timestamp.SCHEMA, 34567L, Timestamp.SCHEMA);
         assertEquals(34567L, projected);
 
+        java.util.Date date = new java.util.Date();
+
+        projected = SchemaProjector.project(Date.SCHEMA, date, Date.SCHEMA);
+        assertEquals(date, projected);
+
+        projected = SchemaProjector.project(Time.SCHEMA, date, Time.SCHEMA);
+        assertEquals(date, projected);
+
+        projected = SchemaProjector.project(Timestamp.SCHEMA, date, Timestamp.SCHEMA);
+        assertEquals(date, projected);
+
         Schema namedSchema = SchemaBuilder.int32().name("invalidLogicalTypeName").build();
         for (Schema logicalTypeSchema: logicalTypeSchemas) {
             try {



[kafka] branch 2.0 updated: KAFKA-7476: Fix Date-based types in SchemaProjector

2018-10-04 Thread ewencp
This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
 new 5cef640  KAFKA-7476: Fix Date-based types in SchemaProjector
5cef640 is described below

commit 5cef640876f731ee68b359b7ca3afe939e54cabc
Author: Robert Yokota 
AuthorDate: Thu Oct 4 20:34:50 2018 -0700

KAFKA-7476: Fix Date-based types in SchemaProjector

Various converters (AvroConverter and JsonConverter) produce a
SchemaAndValue consisting of a logical schema type and a java.util.Date.
This is a fix for SchemaProjector to properly handle the Date.

Author: Robert Yokota 

Reviewers: Konstantine Karantasis, Ewen Cheslack-Postava

Closes #5736 from rayokota/KAFKA-7476

(cherry picked from commit 3edd8e7333ec0bb32ab5ae4ec4814fe30bb8f91d)
Signed-off-by: Ewen Cheslack-Postava 
---
 .../java/org/apache/kafka/connect/data/SchemaProjector.java   |  2 +-
 .../org/apache/kafka/connect/data/SchemaProjectorTest.java    | 11 +++++++++++
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java
index ea31752..5400705 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java
@@ -160,7 +160,7 @@ public class SchemaProjector {
         assert source.type().isPrimitive();
         assert target.type().isPrimitive();
         Object result;
-        if (isPromotable(source.type(), target.type())) {
+        if (isPromotable(source.type(), target.type()) && record instanceof Number) {
             Number numberRecord = (Number) record;
             switch (target.type()) {
                 case INT8:
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
index 151114e..0db4eec 100644
--- a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
@@ -352,6 +352,17 @@ public class SchemaProjectorTest {
         projected = SchemaProjector.project(Timestamp.SCHEMA, 34567L, Timestamp.SCHEMA);
         assertEquals(34567L, projected);
 
+        java.util.Date date = new java.util.Date();
+
+        projected = SchemaProjector.project(Date.SCHEMA, date, Date.SCHEMA);
+        assertEquals(date, projected);
+
+        projected = SchemaProjector.project(Time.SCHEMA, date, Time.SCHEMA);
+        assertEquals(date, projected);
+
+        projected = SchemaProjector.project(Timestamp.SCHEMA, date, Timestamp.SCHEMA);
+        assertEquals(date, projected);
+
         Schema namedSchema = SchemaBuilder.int32().name("invalidLogicalTypeName").build();
         for (Schema logicalTypeSchema: logicalTypeSchemas) {
             try {



[kafka] branch 1.1 updated: KAFKA-7476: Fix Date-based types in SchemaProjector

2018-10-04 Thread ewencp
This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
 new ef6c69d  KAFKA-7476: Fix Date-based types in SchemaProjector
ef6c69d is described below

commit ef6c69d6285be12d5d4efe1dab9505d50636e00d
Author: Robert Yokota 
AuthorDate: Thu Oct 4 20:34:50 2018 -0700

KAFKA-7476: Fix Date-based types in SchemaProjector

Various converters (AvroConverter and JsonConverter) produce a
SchemaAndValue consisting of a logical schema type and a java.util.Date.
This is a fix for SchemaProjector to properly handle the Date.

Author: Robert Yokota 

Reviewers: Konstantine Karantasis, Ewen Cheslack-Postava

Closes #5736 from rayokota/KAFKA-7476

(cherry picked from commit 3edd8e7333ec0bb32ab5ae4ec4814fe30bb8f91d)
Signed-off-by: Ewen Cheslack-Postava 
---
 .../java/org/apache/kafka/connect/data/SchemaProjector.java   |  2 +-
 .../org/apache/kafka/connect/data/SchemaProjectorTest.java    | 11 +++++++++++
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java
index ea31752..5400705 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java
@@ -160,7 +160,7 @@ public class SchemaProjector {
         assert source.type().isPrimitive();
         assert target.type().isPrimitive();
         Object result;
-        if (isPromotable(source.type(), target.type())) {
+        if (isPromotable(source.type(), target.type()) && record instanceof Number) {
             Number numberRecord = (Number) record;
             switch (target.type()) {
                 case INT8:
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
index 151114e..0db4eec 100644
--- a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
@@ -352,6 +352,17 @@ public class SchemaProjectorTest {
         projected = SchemaProjector.project(Timestamp.SCHEMA, 34567L, Timestamp.SCHEMA);
         assertEquals(34567L, projected);
 
+        java.util.Date date = new java.util.Date();
+
+        projected = SchemaProjector.project(Date.SCHEMA, date, Date.SCHEMA);
+        assertEquals(date, projected);
+
+        projected = SchemaProjector.project(Time.SCHEMA, date, Time.SCHEMA);
+        assertEquals(date, projected);
+
+        projected = SchemaProjector.project(Timestamp.SCHEMA, date, Timestamp.SCHEMA);
+        assertEquals(date, projected);
+
         Schema namedSchema = SchemaBuilder.int32().name("invalidLogicalTypeName").build();
         for (Schema logicalTypeSchema: logicalTypeSchemas) {
             try {



[kafka] branch trunk updated: KAFKA-7476: Fix Date-based types in SchemaProjector

2018-10-04 Thread ewencp
This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 3edd8e7  KAFKA-7476: Fix Date-based types in SchemaProjector
3edd8e7 is described below

commit 3edd8e7333ec0bb32ab5ae4ec4814fe30bb8f91d
Author: Robert Yokota 
AuthorDate: Thu Oct 4 20:34:50 2018 -0700

KAFKA-7476: Fix Date-based types in SchemaProjector

Various converters (AvroConverter and JsonConverter) produce a
SchemaAndValue consisting of a logical schema type and a java.util.Date.
This is a fix for SchemaProjector to properly handle the Date.

Author: Robert Yokota 

Reviewers: Konstantine Karantasis, Ewen Cheslack-Postava

Closes #5736 from rayokota/KAFKA-7476
---
 .../java/org/apache/kafka/connect/data/SchemaProjector.java   |  2 +-
 .../org/apache/kafka/connect/data/SchemaProjectorTest.java    | 11 +++++++++++
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java
index ea31752..5400705 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java
@@ -160,7 +160,7 @@ public class SchemaProjector {
         assert source.type().isPrimitive();
         assert target.type().isPrimitive();
         Object result;
-        if (isPromotable(source.type(), target.type())) {
+        if (isPromotable(source.type(), target.type()) && record instanceof Number) {
             Number numberRecord = (Number) record;
             switch (target.type()) {
                 case INT8:
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
index 151114e..0db4eec 100644
--- a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
@@ -352,6 +352,17 @@ public class SchemaProjectorTest {
         projected = SchemaProjector.project(Timestamp.SCHEMA, 34567L, Timestamp.SCHEMA);
         assertEquals(34567L, projected);
 
+        java.util.Date date = new java.util.Date();
+
+        projected = SchemaProjector.project(Date.SCHEMA, date, Date.SCHEMA);
+        assertEquals(date, projected);
+
+        projected = SchemaProjector.project(Time.SCHEMA, date, Time.SCHEMA);
+        assertEquals(date, projected);
+
+        projected = SchemaProjector.project(Timestamp.SCHEMA, date, Timestamp.SCHEMA);
+        assertEquals(date, projected);
+
         Schema namedSchema = SchemaBuilder.int32().name("invalidLogicalTypeName").build();
         for (Schema logicalTypeSchema: logicalTypeSchemas) {
             try {



[kafka] branch 2.1 updated: KAFKA-7476: Fix Date-based types in SchemaProjector

2018-10-04 Thread ewencp
This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
 new af930d3  KAFKA-7476: Fix Date-based types in SchemaProjector
af930d3 is described below

commit af930d3c76db24961cb6a3cafdeb414726ee3952
Author: Robert Yokota 
AuthorDate: Thu Oct 4 20:34:50 2018 -0700

KAFKA-7476: Fix Date-based types in SchemaProjector

Various converters (AvroConverter and JsonConverter) produce a
SchemaAndValue consisting of a logical schema type and a java.util.Date.
This is a fix for SchemaProjector to properly handle the Date.

Author: Robert Yokota 

Reviewers: Konstantine Karantasis, Ewen Cheslack-Postava

Closes #5736 from rayokota/KAFKA-7476

(cherry picked from commit 3edd8e7333ec0bb32ab5ae4ec4814fe30bb8f91d)
Signed-off-by: Ewen Cheslack-Postava 
---
 .../java/org/apache/kafka/connect/data/SchemaProjector.java   |  2 +-
 .../org/apache/kafka/connect/data/SchemaProjectorTest.java    | 11 +++++++++++
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java
index ea31752..5400705 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java
@@ -160,7 +160,7 @@ public class SchemaProjector {
         assert source.type().isPrimitive();
         assert target.type().isPrimitive();
         Object result;
-        if (isPromotable(source.type(), target.type())) {
+        if (isPromotable(source.type(), target.type()) && record instanceof Number) {
             Number numberRecord = (Number) record;
             switch (target.type()) {
                 case INT8:
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
index 151114e..0db4eec 100644
--- a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
@@ -352,6 +352,17 @@ public class SchemaProjectorTest {
         projected = SchemaProjector.project(Timestamp.SCHEMA, 34567L, Timestamp.SCHEMA);
         assertEquals(34567L, projected);
 
+        java.util.Date date = new java.util.Date();
+
+        projected = SchemaProjector.project(Date.SCHEMA, date, Date.SCHEMA);
+        assertEquals(date, projected);
+
+        projected = SchemaProjector.project(Time.SCHEMA, date, Time.SCHEMA);
+        assertEquals(date, projected);
+
+        projected = SchemaProjector.project(Timestamp.SCHEMA, date, Timestamp.SCHEMA);
+        assertEquals(date, projected);
+
         Schema namedSchema = SchemaBuilder.int32().name("invalidLogicalTypeName").build();
         for (Schema logicalTypeSchema: logicalTypeSchemas) {
             try {



[kafka] branch 2.1 updated: MINOR: Fix generic type of ProcessorParameters (#5741)

2018-10-04 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
 new fa3fe3e  MINOR: Fix generic type of ProcessorParameters (#5741)
fa3fe3e is described below

commit fa3fe3e68059068ac894552078155b16c21404aa
Author: John Roesler 
AuthorDate: Thu Oct 4 19:37:53 2018 -0500

MINOR: Fix generic type of ProcessorParameters (#5741)

In unrelated recent work, I noticed some warnings about the missing type parameters on ProcessorParameters.

While investigating it, it seems like there was a bug in the creation of repartition topics.

Reviewers: Bill Bejeck, Guozhang Wang

---
 .../internals/GroupedStreamAggregateBuilder.java   |   7 +-
 .../kstream/internals/KGroupedTableImpl.java   |  22 +--
 .../streams/kstream/internals/KStreamImpl.java | 184 +++--
 .../internals/graph/ProcessorParameters.java   |  12 +-
 .../internals/graph/StatefulProcessorNode.java |   6 +-
 .../internals/graph/GraphGraceSearchUtilTest.java  |   8 +-
 6 files changed, 125 insertions(+), 114 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
index 3439cf5..8e6f990 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
@@ -84,16 +84,17 @@ class GroupedStreamAggregateBuilder {
             builder.addGraphNode(parentNode, repartitionNode);
             parentNode = repartitionNode;
         }
-        final StatefulProcessorNode.StatefulProcessorNodeBuilder statefulProcessorNodeBuilder = StatefulProcessorNode.statefulProcessorNodeBuilder();
+        final StatefulProcessorNode.StatefulProcessorNodeBuilder statefulProcessorNodeBuilder = StatefulProcessorNode.statefulProcessorNodeBuilder();
+
+        final ProcessorParameters processorParameters = new ProcessorParameters<>(aggregateSupplier, aggFunctionName);
 
-        final ProcessorParameters processorParameters = new ProcessorParameters<>(aggregateSupplier, aggFunctionName);
         statefulProcessorNodeBuilder
             .withProcessorParameters(processorParameters)
             .withNodeName(aggFunctionName)
             .withRepartitionRequired(repartitionRequired)
             .withStoreBuilder(storeBuilder);
 
-        final StatefulProcessorNode statefulProcessorNode = statefulProcessorNodeBuilder.build();
+        final StatefulProcessorNode statefulProcessorNode = statefulProcessorNodeBuilder.build();
 
         builder.addGraphNode(parentNode, statefulProcessorNode);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index c97576b..013028d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -47,7 +47,7 @@ public class KGroupedTableImpl extends AbstractStream implements KGr
 
     private static final String REDUCE_NAME = "KTABLE-REDUCE-";
 
-    protected final String userSpecifiedName;
+    private final String userSpecifiedName;
 
     private final Initializer countInitializer = () -> 0L;
 
@@ -72,7 +72,7 @@ public class KGroupedTableImpl extends AbstractStream implements KGr
         final String sourceName = builder.newProcessorName(KStreamImpl.SOURCE_NAME);
         final String funcName = builder.newProcessorName(functionName);
         final String repartitionTopic = (userSpecifiedName != null ? userSpecifiedName : materialized.storeName())
-            + KStreamImpl.REPARTITION_TOPIC_SUFFIX;
+                + KStreamImpl.REPARTITION_TOPIC_SUFFIX;
 
         final StreamsGraphNode repartitionNode = createRepartitionNode(sinkName, sourceName, repartitionTopic);
 
@@ -98,16 +98,18 @@ public class KGroupedTableImpl extends AbstractStream implements KGr
             builder);
     }
 
-    private  StatefulProcessorNode getStatefulProcessorNode(final MaterializedInternal> materialized,
-                                                            final String functionName,
-                                                            final ProcessorSupplier> aggregateSupplier) {
+    private  StatefulProcessorNode> getStatefulProcessorNode(final MaterializedInternal> materialized,
+                                                             final String functionName,
+                                                             final 
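
Note the archive has stripped the angle-bracketed type arguments from the hunks above, so the raw and parameterized declarations render identically. As a rough illustration of the warning class the commit title refers to, here is a hypothetical, stripped-down analogue (none of these names come from the Kafka source):

// Hypothetical analogue of a generic parameters holder; illustrates raw vs.
// parameterized use only, not the actual ProcessorParameters API.
class Params<K, V> {
    private final String name;
    Params(final String name) {
        this.name = name;
    }
    String name() {
        return name;
    }
}

public class RawTypeSketch {
    public static void main(String[] args) {
        // Raw type: compiles, but javac -Xlint:rawtypes warns, and the erased
        // key/value types let mismatched wiring through without a compile error.
        Params raw = new Params("agg-1");

        // Parameterized type: the compiler checks that every consumer of these
        // parameters agrees on <K, V>.
        Params<String, Long> typed = new Params<>("agg-2");

        System.out.println(raw.name() + " / " + typed.name());
    }
}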

[kafka] branch trunk updated: MINOR: Fix generic type of ProcessorParameters (#5741)

2018-10-04 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new d76805f  MINOR: Fix generic type of ProcessorParameters (#5741)
d76805f is described below

commit d76805f0fd2d65e9fb5582e4e0d30c1bd3fccd89
Author: John Roesler 
AuthorDate: Thu Oct 4 19:37:53 2018 -0500

MINOR: Fix generic type of ProcessorParameters (#5741)

In unrelated recent work, I noticed some warnings about the missing type parameters on ProcessorParameters.

While investigating it, it seems like there was a bug in the creation of repartition topics.

Reviewers: Bill Bejeck, Guozhang Wang

---
 .../internals/GroupedStreamAggregateBuilder.java   |   7 +-
 .../kstream/internals/KGroupedTableImpl.java   |  22 +--
 .../streams/kstream/internals/KStreamImpl.java | 184 +++--
 .../internals/graph/ProcessorParameters.java   |  12 +-
 .../internals/graph/StatefulProcessorNode.java |   6 +-
 .../internals/graph/GraphGraceSearchUtilTest.java  |   8 +-
 6 files changed, 125 insertions(+), 114 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
index 3439cf5..8e6f990 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
@@ -84,16 +84,17 @@ class GroupedStreamAggregateBuilder {
             builder.addGraphNode(parentNode, repartitionNode);
             parentNode = repartitionNode;
         }
-        final StatefulProcessorNode.StatefulProcessorNodeBuilder statefulProcessorNodeBuilder = StatefulProcessorNode.statefulProcessorNodeBuilder();
+        final StatefulProcessorNode.StatefulProcessorNodeBuilder statefulProcessorNodeBuilder = StatefulProcessorNode.statefulProcessorNodeBuilder();
+
+        final ProcessorParameters processorParameters = new ProcessorParameters<>(aggregateSupplier, aggFunctionName);
 
-        final ProcessorParameters processorParameters = new ProcessorParameters<>(aggregateSupplier, aggFunctionName);
         statefulProcessorNodeBuilder
             .withProcessorParameters(processorParameters)
             .withNodeName(aggFunctionName)
             .withRepartitionRequired(repartitionRequired)
             .withStoreBuilder(storeBuilder);
 
-        final StatefulProcessorNode statefulProcessorNode = statefulProcessorNodeBuilder.build();
+        final StatefulProcessorNode statefulProcessorNode = statefulProcessorNodeBuilder.build();
 
         builder.addGraphNode(parentNode, statefulProcessorNode);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index c97576b..013028d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -47,7 +47,7 @@ public class KGroupedTableImpl extends AbstractStream implements KGr
 
     private static final String REDUCE_NAME = "KTABLE-REDUCE-";
 
-    protected final String userSpecifiedName;
+    private final String userSpecifiedName;
 
     private final Initializer countInitializer = () -> 0L;
 
@@ -72,7 +72,7 @@ public class KGroupedTableImpl extends AbstractStream implements KGr
         final String sourceName = builder.newProcessorName(KStreamImpl.SOURCE_NAME);
         final String funcName = builder.newProcessorName(functionName);
         final String repartitionTopic = (userSpecifiedName != null ? userSpecifiedName : materialized.storeName())
-            + KStreamImpl.REPARTITION_TOPIC_SUFFIX;
+                + KStreamImpl.REPARTITION_TOPIC_SUFFIX;
 
         final StreamsGraphNode repartitionNode = createRepartitionNode(sinkName, sourceName, repartitionTopic);
 
@@ -98,16 +98,18 @@ public class KGroupedTableImpl extends AbstractStream implements KGr
             builder);
     }
 
-    private  StatefulProcessorNode getStatefulProcessorNode(final MaterializedInternal> materialized,
-                                                            final String functionName,
-                                                            final ProcessorSupplier> aggregateSupplier) {
+    private  StatefulProcessorNode> getStatefulProcessorNode(final MaterializedInternal> materialized,
+                                                             final String functionName,
+                                                             fi

[kafka] branch trunk updated: MINOR: Bump version to 2.2.0-SNAPSHOT

2018-10-04 Thread lindong
This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 2b37a04  MINOR: Bump version to 2.2.0-SNAPSHOT
2b37a04 is described below

commit 2b37a041d3c7048a4b95b65197c278df7f51a521
Author: Dong Lin 
AuthorDate: Thu Oct 4 17:30:03 2018 -0700

MINOR: Bump version to 2.2.0-SNAPSHOT

Author: Dong Lin 

Reviewers: Ismael Juma 

Closes #5744 from lindong28/bump-up-version-2.2.0
---
 gradle.properties                                                       | 2 +-
 kafka-merge-pr.py                                                       | 2 +-
 streams/quickstart/java/pom.xml                                         | 2 +-
 streams/quickstart/java/src/main/resources/archetype-resources/pom.xml  | 2 +-
 streams/quickstart/pom.xml                                              | 2 +-
 tests/kafkatest/__init__.py                                             | 2 +-
 tests/kafkatest/version.py                                              | 2 +-
 7 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/gradle.properties b/gradle.properties
index 1853cf3..3e3dbfa 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -19,7 +19,7 @@ group=org.apache.kafka
 #  - tests/kafkatest/__init__.py,
 #  - tests/kafkatest/version.py (variable DEV_VERSION)
 #  - kafka-merge-pr.py.
-version=2.1.0-SNAPSHOT
+version=2.2.0-SNAPSHOT
 scalaVersion=2.11.12
 task=build
 org.gradle.jvmargs=-Xmx1024m -Xss2m
diff --git a/kafka-merge-pr.py b/kafka-merge-pr.py
index e6b9a55..7bfeb2d 100755
--- a/kafka-merge-pr.py
+++ b/kafka-merge-pr.py
@@ -70,7 +70,7 @@ TEMP_BRANCH_PREFIX = "PR_TOOL"
 
 DEV_BRANCH_NAME = "trunk"
 
-DEFAULT_FIX_VERSION = os.environ.get("DEFAULT_FIX_VERSION", "2.1.0")
+DEFAULT_FIX_VERSION = os.environ.get("DEFAULT_FIX_VERSION", "2.2.0")
 
 def get_json(url):
 try:
diff --git a/streams/quickstart/java/pom.xml b/streams/quickstart/java/pom.xml
index ec2dbf4..29fce17 100644
--- a/streams/quickstart/java/pom.xml
+++ b/streams/quickstart/java/pom.xml
@@ -26,7 +26,7 @@
     <parent>
         <groupId>org.apache.kafka</groupId>
         <artifactId>streams-quickstart</artifactId>
-        <version>2.1.0-SNAPSHOT</version>
+        <version>2.2.0-SNAPSHOT</version>
         <relativePath>..</relativePath>
     </parent>
 
diff --git a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
index 684e270..04643c1 100644
--- a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
+++ b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
@@ -29,7 +29,7 @@
 
     <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-       <kafka.version>2.1.0-SNAPSHOT</kafka.version>
+       <kafka.version>2.2.0-SNAPSHOT</kafka.version>
        <slf4j.version>1.7.7</slf4j.version>
        <log4j.version>1.2.17</log4j.version>
 
diff --git a/streams/quickstart/pom.xml b/streams/quickstart/pom.xml
index e51a7e9..d1d456f 100644
--- a/streams/quickstart/pom.xml
+++ b/streams/quickstart/pom.xml
@@ -22,7 +22,7 @@
     <groupId>org.apache.kafka</groupId>
     <artifactId>streams-quickstart</artifactId>
     <packaging>pom</packaging>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
 
     <name>Kafka Streams :: Quickstart</name>
 
 
diff --git a/tests/kafkatest/__init__.py b/tests/kafkatest/__init__.py
index ba7c48c..a53e943 100644
--- a/tests/kafkatest/__init__.py
+++ b/tests/kafkatest/__init__.py
@@ -22,4 +22,4 @@
 # Instead, in development branches, the version should have a suffix of the form ".devN"
 #
 # For example, when Kafka is at version 1.0.0-SNAPSHOT, this should be something like "1.0.0.dev0"
-__version__ = '2.1.0.dev0'
+__version__ = '2.2.0.dev0'
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index 676ab96..6eed81f 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -60,7 +60,7 @@ def get_version(node=None):
 return DEV_BRANCH
 
 DEV_BRANCH = KafkaVersion("dev")
-DEV_VERSION = KafkaVersion("2.1.0-SNAPSHOT")
+DEV_VERSION = KafkaVersion("2.2.0-SNAPSHOT")
 
 # 0.8.2.x versions
 V_0_8_2_1 = KafkaVersion("0.8.2.1")



[kafka] branch trunk updated (f2dd6aa -> 7ea0655)

2018-10-04 Thread lindong
This is an automated email from the ASF dual-hosted git repository.

lindong pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from f2dd6aa  KAFKA-7415; Persist leader epoch and start offset on becoming a leader (#5678)
 add 7ea0655  KAFKA-7441; Allow LogCleanerManager.resumeCleaning() to be used concurrently

No new revisions were added by this update.

Summary of changes:
 .../main/scala/kafka/log/LogCleanerManager.scala   | 72 +++---
 core/src/main/scala/kafka/log/LogManager.scala | 15 +++--
 .../unit/kafka/log/LogCleanerManagerTest.scala | 62 +++
 3 files changed, 110 insertions(+), 39 deletions(-)
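
This digest carries no diff for the change, but the title implies LogCleanerManager.resumeCleaning() had to tolerate overlapping pause/resume pairs from concurrent callers. Below is a loose Java sketch of one way to get that property, reference counting per partition; this is an assumption about the approach, not the actual Scala code in LogCleanerManager:

import java.util.HashMap;
import java.util.Map;

// Sketch only: a pause *count* per partition instead of a boolean flag, so
// that two overlapping pauseCleaning/resumeCleaning pairs from different
// callers do not prematurely resume cleaning.
public class PauseCounterSketch {
    private final Map<String, Integer> pausedCounts = new HashMap<>();

    public synchronized void pauseCleaning(final String partition) {
        pausedCounts.merge(partition, 1, Integer::sum);
    }

    public synchronized void resumeCleaning(final String partition) {
        final Integer count = pausedCounts.get(partition);
        if (count == null) {
            throw new IllegalStateException("Cleaning was never paused for " + partition);
        }
        if (count == 1) {
            pausedCounts.remove(partition); // last pauser actually resumes
        } else {
            pausedCounts.put(partition, count - 1);
        }
    }

    public synchronized boolean isCleaningPaused(final String partition) {
        return pausedCounts.containsKey(partition);
    }
}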



[kafka] branch 1.0 updated: MINOR: Increase timeout for starting JMX tool (#5735)

2018-10-04 Thread ewencp
This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
 new 53d76c7  MINOR: Increase timeout for starting JMX tool (#5735)
53d76c7 is described below

commit 53d76c73587f751923dfca83178847c9d3deb338
Author: Randall Hauch 
AuthorDate: Wed Oct 3 10:56:44 2018 -0500

MINOR: Increase timeout for starting JMX tool (#5735)

In some tests, the check monitoring the JMX tool log output doesn’t quite wait long enough before failing. Increasing the timeout from 10 to 20 seconds.
---
 tests/kafkatest/services/monitor/jmx.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/kafkatest/services/monitor/jmx.py b/tests/kafkatest/services/monitor/jmx.py
index 6f6e221..a64842c 100644
--- a/tests/kafkatest/services/monitor/jmx.py
+++ b/tests/kafkatest/services/monitor/jmx.py
@@ -83,7 +83,7 @@ class JmxMixin(object):
 
         self.logger.debug("%s: Start JmxTool %d command: %s" % (node.account, idx, cmd))
         node.account.ssh(cmd, allow_fail=False)
-        wait_until(lambda: self._jmx_has_output(node), timeout_sec=10, backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account)
+        wait_until(lambda: self._jmx_has_output(node), timeout_sec=20, backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account)
         self.started[idx-1] = True
 
     def _jmx_has_output(self, node):



[kafka] branch 2.1 created (now 7ea0655)

2018-10-04 Thread lindong
This is an automated email from the ASF dual-hosted git repository.

lindong pushed a change to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git.


  at 7ea0655  KAFKA-7441; Allow LogCleanerManager.resumeCleaning() to be used concurrently

No new revisions were added by this update.



[kafka] branch 0.10.2 updated: MINOR: Increase timeout for starting JMX tool (#5735)

2018-10-04 Thread ewencp
This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch 0.10.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/0.10.2 by this push:
 new 6396b77  MINOR: Increase timeout for starting JMX tool (#5735)
6396b77 is described below

commit 6396b776a47a2f58d3a5a3d76bc8de495d8bc43c
Author: Randall Hauch 
AuthorDate: Wed Oct 3 10:56:44 2018 -0500

MINOR: Increase timeout for starting JMX tool (#5735)

In some tests, the check monitoring the JMX tool log output doesn’t quite wait long enough before failing. Increasing the timeout from 10 to 20 seconds.
---
 tests/kafkatest/services/monitor/jmx.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/kafkatest/services/monitor/jmx.py b/tests/kafkatest/services/monitor/jmx.py
index 0859bb4..f2ac33c 100644
--- a/tests/kafkatest/services/monitor/jmx.py
+++ b/tests/kafkatest/services/monitor/jmx.py
@@ -70,7 +70,7 @@ class JmxMixin(object):
 
         self.logger.debug("%s: Start JmxTool %d command: %s" % (node.account, idx, cmd))
         node.account.ssh(cmd, allow_fail=False)
-        wait_until(lambda: self._jmx_has_output(node), timeout_sec=10, backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account)
+        wait_until(lambda: self._jmx_has_output(node), timeout_sec=20, backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account)
         self.started[idx-1] = True
 
     def _jmx_has_output(self, node):



[kafka] branch 0.11.0 updated: MINOR: Increase timeout for starting JMX tool (#5735)

2018-10-04 Thread ewencp
This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch 0.11.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/0.11.0 by this push:
 new 59991f6  MINOR: Increase timeout for starting JMX tool (#5735)
59991f6 is described below

commit 59991f69cde98f691c2382754ed22bcd0fd884cf
Author: Randall Hauch 
AuthorDate: Wed Oct 3 10:56:44 2018 -0500

MINOR: Increase timeout for starting JMX tool (#5735)

In some tests, the check monitoring the JMX tool log output doesn’t quite wait long enough before failing. Increasing the timeout from 10 to 20 seconds.
---
 tests/kafkatest/services/monitor/jmx.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/kafkatest/services/monitor/jmx.py b/tests/kafkatest/services/monitor/jmx.py
index 7331cb9f..5af2b18 100644
--- a/tests/kafkatest/services/monitor/jmx.py
+++ b/tests/kafkatest/services/monitor/jmx.py
@@ -77,7 +77,7 @@ class JmxMixin(object):
 
         self.logger.debug("%s: Start JmxTool %d command: %s" % (node.account, idx, cmd))
         node.account.ssh(cmd, allow_fail=False)
-        wait_until(lambda: self._jmx_has_output(node), timeout_sec=10, backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account)
+        wait_until(lambda: self._jmx_has_output(node), timeout_sec=20, backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account)
         self.started[idx-1] = True
 
     def _jmx_has_output(self, node):



[kafka] branch 1.1 updated: MINOR: Increase timeout for starting JMX tool (#5735)

2018-10-04 Thread ewencp
This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
 new 9b61a0e  MINOR: Increase timeout for starting JMX tool (#5735)
9b61a0e is described below

commit 9b61a0ee0901867ed62a47edee21baa0e4ce79fa
Author: Randall Hauch 
AuthorDate: Wed Oct 3 10:56:44 2018 -0500

MINOR: Increase timeout for starting JMX tool (#5735)

In some tests, the check monitoring the JMX tool log output doesn’t quite wait long enough before failing. Increasing the timeout from 10 to 20 seconds.
---
 tests/kafkatest/services/monitor/jmx.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/kafkatest/services/monitor/jmx.py b/tests/kafkatest/services/monitor/jmx.py
index 6f6e221..a64842c 100644
--- a/tests/kafkatest/services/monitor/jmx.py
+++ b/tests/kafkatest/services/monitor/jmx.py
@@ -83,7 +83,7 @@ class JmxMixin(object):
 
         self.logger.debug("%s: Start JmxTool %d command: %s" % (node.account, idx, cmd))
         node.account.ssh(cmd, allow_fail=False)
-        wait_until(lambda: self._jmx_has_output(node), timeout_sec=10, backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account)
+        wait_until(lambda: self._jmx_has_output(node), timeout_sec=20, backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account)
         self.started[idx-1] = True
 
     def _jmx_has_output(self, node):



[kafka] branch 2.0 updated: MINOR: Increase timeout for starting JMX tool (#5735)

2018-10-04 Thread ewencp
This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
 new e7093d6  MINOR: Increase timeout for starting JMX tool (#5735)
e7093d6 is described below

commit e7093d6c1a946abd23103ba5b3802c4d3dd38892
Author: Randall Hauch 
AuthorDate: Wed Oct 3 10:56:44 2018 -0500

MINOR: Increase timeout for starting JMX tool (#5735)

In some tests, the check monitoring the JMX tool log output doesn’t quite wait long enough before failing. Increasing the timeout from 10 to 20 seconds.
---
 tests/kafkatest/services/monitor/jmx.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/kafkatest/services/monitor/jmx.py b/tests/kafkatest/services/monitor/jmx.py
index 542d3a5..cf8cbc3 100644
--- a/tests/kafkatest/services/monitor/jmx.py
+++ b/tests/kafkatest/services/monitor/jmx.py
@@ -83,7 +83,7 @@ class JmxMixin(object):
 
         self.logger.debug("%s: Start JmxTool %d command: %s" % (node.account, idx, cmd))
         node.account.ssh(cmd, allow_fail=False)
-        wait_until(lambda: self._jmx_has_output(node), timeout_sec=10, backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account)
+        wait_until(lambda: self._jmx_has_output(node), timeout_sec=20, backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account)
         self.started[idx-1] = True
 
     def _jmx_has_output(self, node):



[kafka] branch 2.0 updated: KAFKA-7415; Persist leader epoch and start offset on becoming a leader (#5678)

2018-10-04 Thread jgus
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
 new 18b2577  KAFKA-7415; Persist leader epoch and start offset on becoming a leader (#5678)
18b2577 is described below

commit 18b25773b11a4d7240457ac0f88670c7a55301e1
Author: Jason Gustafson 
AuthorDate: Thu Oct 4 14:02:23 2018 -0700

KAFKA-7415; Persist leader epoch and start offset on becoming a leader (#5678)

This patch ensures that the leader epoch cache is updated when a broker becomes leader with the latest epoch and the log end offset as its starting offset. This guarantees that the leader will be able to provide the right truncation point even if the follower has data from leader epochs which the leader itself does not have. This situation can occur when there are back-to-back leader elections.

Additionally, we have made the following changes:

1. The leader epoch cache enforces monotonically increasing epochs and starting offsets among its entries. Whenever a new entry is appended which violates this requirement, we remove the conflicting entries from the cache.
2. Previously we returned an unknown epoch and offset if an epoch was queried which comes before the first entry in the cache. Now we return the requested epoch with the start offset of the earliest cached entry as its end offset. For example, if the earliest entry in the cache is (epoch=5, startOffset=10), then a query for epoch 4 will return (epoch=4, endOffset=10). This ensures that followers (and consumers in KIP-320) can always determine where the correct starting point is for the active log range on the leader.

Reviewers: Jun Rao
---
 core/src/main/scala/kafka/cluster/Partition.scala  |  11 +-
 core/src/main/scala/kafka/cluster/Replica.scala|   3 +-
 core/src/main/scala/kafka/log/Log.scala|  24 +-
 core/src/main/scala/kafka/log/LogSegment.scala |   6 +-
 .../kafka/server/ReplicaAlterLogDirsThread.scala   |   6 +-
 .../scala/kafka/server/ReplicaFetcherThread.scala  |   6 +-
 .../kafka/server/epoch/LeaderEpochFileCache.scala  | 197 +
 .../scala/unit/kafka/cluster/PartitionTest.scala   |  33 ++
 core/src/test/scala/unit/kafka/log/LogTest.scala   |  41 +-
 .../unit/kafka/server/ISRExpirationTest.scala  |   4 +-
 .../server/ReplicaAlterLogDirsThreadTest.scala |  28 +-
 .../kafka/server/ReplicaFetcherThreadTest.scala| 115 -
 .../unit/kafka/server/ReplicaManagerTest.scala |   8 +-
 .../LeaderEpochCheckpointFileTest.scala|   1 -
 ...chDrivenReplicationProtocolAcceptanceTest.scala |  24 +-
 .../server/epoch/LeaderEpochFileCacheTest.scala| 467 -
 .../server/epoch/LeaderEpochIntegrationTest.scala  |  60 +--
 .../server/epoch/OffsetsForLeaderEpochTest.scala   |   2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala|   5 +-
 19 files changed, 551 insertions(+), 490 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 24883e2..8c68c78 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -302,8 +302,17 @@ class Partition(val topic: String,
       leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
       leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
       zkVersion = partitionStateInfo.basePartitionState.zkVersion
-      val isNewLeader = leaderReplicaIdOpt.map(_ != localBrokerId).getOrElse(true)
 
+      // In the case of successive leader elections in a short time period, a follower may have
+      // entries in its log from a later epoch than any entry in the new leader's log. In order
+      // to ensure that these followers can truncate to the right offset, we must cache the new
+      // leader epoch and the start offset since it should be larger than any epoch that a follower
+      // would try to query.
+      leaderReplica.epochs.foreach { epochCache =>
+        epochCache.assign(leaderEpoch, leaderEpochStartOffset)
+      }
+
+      val isNewLeader = !leaderReplicaIdOpt.contains(localBrokerId)
       val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset
       val curTimeMs = time.milliseconds
       // initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset.
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 962aaff..c37889e 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -18,6 +18,7 @@
 package kafka.cluster
 
 import kafka.log.Log
+import kafka.server.epoch.LeaderEpochFileCache
 import kafka.utils.Logging
 import kafka.server.{LogOffsetMetadata, LogReadResult}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
@@ -54,7 +55,7 @@ class Replica(val brok
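
The lookup rule from point 2 of the commit message can be modeled compactly. Below is a toy Java model; the real implementation is the Scala LeaderEpochFileCache touched in this diff, and the names here only loosely mirror it:

import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.TreeMap;

// Toy model of the lookup rule: entries map epoch -> start offset, and a
// query for an epoch older than the earliest cached entry answers with the
// requested epoch and the earliest known start offset as its end offset.
public class EpochCacheModel {
    private final TreeMap<Integer, Long> epochToStartOffset = new TreeMap<>();

    public void assign(final int epoch, final long startOffset) {
        // Enforce monotonicity: drop cached entries whose epoch or start
        // offset would conflict with the newly assigned entry.
        epochToStartOffset.tailMap(epoch, true).clear();
        epochToStartOffset.values().removeIf(offset -> offset >= startOffset);
        epochToStartOffset.put(epoch, startOffset);
    }

    public Map.Entry<Integer, Long> endOffsetFor(final int requestedEpoch) {
        final Map.Entry<Integer, Long> earliest = epochToStartOffset.firstEntry();
        if (earliest != null && requestedEpoch < earliest.getKey()) {
            // e.g. earliest entry (epoch=5, startOffset=10), query epoch 4
            // -> (epoch=4, endOffset=10), matching the example above.
            return new SimpleEntry<>(requestedEpoch, earliest.getValue());
        }
        // Otherwise the end offset of an epoch is the start offset of the
        // next higher cached epoch (details of the real cache omitted).
        final Map.Entry<Integer, Long> next = epochToStartOffset.higherEntry(requestedEpoch);
        return next == null ? null : new SimpleEntry<>(requestedEpoch, next.getValue());
    }
}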

[kafka] branch trunk updated: KAFKA-7415; Persist leader epoch and start offset on becoming a leader (#5678)

2018-10-04 Thread jgus
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new f2dd6aa  KAFKA-7415; Persist leader epoch and start offset on becoming a leader (#5678)
f2dd6aa is described below

commit f2dd6aa2698345fd0b0348f7bc74ce3215adf682
Author: Jason Gustafson 
AuthorDate: Thu Oct 4 14:02:23 2018 -0700

KAFKA-7415; Persist leader epoch and start offset on becoming a leader (#5678)

This patch ensures that the leader epoch cache is updated when a broker becomes leader with the latest epoch and the log end offset as its starting offset. This guarantees that the leader will be able to provide the right truncation point even if the follower has data from leader epochs which the leader itself does not have. This situation can occur when there are back-to-back leader elections.

Additionally, we have made the following changes:

1. The leader epoch cache enforces monotonically increasing epochs and starting offsets among its entries. Whenever a new entry is appended which violates this requirement, we remove the conflicting entries from the cache.
2. Previously we returned an unknown epoch and offset if an epoch was queried which comes before the first entry in the cache. Now we return the requested epoch with the start offset of the earliest cached entry as its end offset. For example, if the earliest entry in the cache is (epoch=5, startOffset=10), then a query for epoch 4 will return (epoch=4, endOffset=10). This ensures that followers (and consumers in KIP-320) can always determine where the correct starting point is for the active log range on the leader.

Reviewers: Jun Rao
---
 core/src/main/scala/kafka/cluster/Partition.scala  |  11 +-
 core/src/main/scala/kafka/cluster/Replica.scala|   4 +-
 core/src/main/scala/kafka/log/Log.scala|  24 +-
 core/src/main/scala/kafka/log/LogSegment.scala |   4 +-
 .../kafka/server/epoch/LeaderEpochFileCache.scala  | 197 +
 .../scala/unit/kafka/cluster/PartitionTest.scala   |  33 ++
 core/src/test/scala/unit/kafka/log/LogTest.scala   |  41 +-
 .../unit/kafka/server/ISRExpirationTest.scala  |   4 +-
 .../server/ReplicaAlterLogDirsThreadTest.scala |  28 +-
 .../kafka/server/ReplicaFetcherThreadTest.scala|  22 +-
 .../unit/kafka/server/ReplicaManagerTest.scala |   6 +-
 .../LeaderEpochCheckpointFileTest.scala|   1 -
 ...chDrivenReplicationProtocolAcceptanceTest.scala |  24 +-
 .../server/epoch/LeaderEpochFileCacheTest.scala| 467 -
 .../server/epoch/LeaderEpochIntegrationTest.scala  |  49 ++-
 .../server/epoch/OffsetsForLeaderEpochTest.scala   |   2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala|   5 +-
 17 files changed, 444 insertions(+), 478 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 2036bb0..307fb81 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -301,8 +301,17 @@ class Partition(val topic: String,
   leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
   leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
   zkVersion = partitionStateInfo.basePartitionState.zkVersion
-  val isNewLeader = leaderReplicaIdOpt.map(_ != localBrokerId).getOrElse(true)
 
+  // In the case of successive leader elections in a short time period, a follower may have
+  // entries in its log from a later epoch than any entry in the new leader's log. In order
+  // to ensure that these followers can truncate to the right offset, we must cache the new
+  // leader epoch and the start offset since it should be larger than any epoch that a follower
+  // would try to query.
+  leaderReplica.epochs.foreach { epochCache =>
+    epochCache.assign(leaderEpoch, leaderEpochStartOffset)
+  }
+
+  val isNewLeader = !leaderReplicaIdOpt.contains(localBrokerId)
   val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset
   val curTimeMs = time.milliseconds
   // initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset.
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index d729dad..22860c7 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -18,7 +18,7 @@
 package kafka.cluster
 
 import kafka.log.Log
-import kafka.server.epoch.LeaderEpochCache
+import kafka.server.epoch.LeaderEpochFileCache
 import kafka.utils.Logging
 import kafka.server.{LogOffsetMetadata, LogReadResult}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
@@ -55,7 +55,7 @@ class Replica(val brokerId: Int,
 
   def lastCaughtUpTimeMs: Long = _lastCaughtUpTimeMs
 
-  val e

[kafka] branch 1.0 updated: KAFKA-7196; Remove heartbeat delayed operation for those removed consumers at the end of each rebalance

2018-10-04 Thread lindong
This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
 new cd22770  KAFKA-7196; Remove heartbeat delayed operation for those removed consumers at the end of each rebalance
cd22770 is described below

commit cd2277069d355d2a9839c97265448b5a883feb89
Author: Lincong Li 
AuthorDate: Thu Oct 4 09:14:44 2018 -0700

KAFKA-7196; Remove heartbeat delayed operation for those removed consumers at the end of each rebalance

During the consumer group rebalance, when the join-group phase finishes, the heartbeat delayed operation of any consumer that fails to rejoin the group should be removed from the purgatory. Otherwise, even though the consumer's member ID has been removed from the group, its heartbeat delayed operation remains registered in the purgatory; when that operation eventually times out, it triggers another unnecessary rebalance.

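The effect of the one-line fix can be illustrated with a minimal, self-contained Scala sketch. This is not the real GroupCoordinator or DelayedOperationPurgatory; Purgatory, watch, cancel, and expire are hypothetical names that model only the behavior described above.

object HeartbeatPurgatorySketch {
  import scala.collection.mutable

  case class Member(memberId: String)

  class Purgatory {
    private val pending = mutable.Set.empty[String]
    def watch(memberId: String): Unit = pending += memberId
    // Cancelling removes the delayed heartbeat so it can never expire.
    def cancel(memberId: String): Unit = pending -= memberId
    // An expiration of a still-pending heartbeat would trigger a rebalance.
    def expire(memberId: String): Boolean = pending.remove(memberId)
  }

  def completeJoinPhase(notYetRejoined: Seq[Member],
                        group: mutable.Set[String],
                        purgatory: Purgatory): Unit = {
    notYetRejoined.foreach { failedMember =>
      purgatory.cancel(failedMember.memberId) // the call added by this patch
      group -= failedMember.memberId
    }
  }

  def main(args: Array[String]): Unit = {
    val group = mutable.Set("a", "b")
    val purgatory = new Purgatory
    group.foreach(purgatory.watch)
    completeJoinPhase(Seq(Member("b")), group, purgatory)
    // "b" was removed and its heartbeat cancelled, so a later expiration is a no-op:
    println(purgatory.expire("b")) // prints false: no spurious rebalance
  }
}

Because the pending heartbeat is cancelled at the same time the failed member is removed, its later expiration is a no-op and no spurious rebalance is scheduled.
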
Author: Lincong Li 

Reviewers: Dong Lin 

Closes #5556 from Lincong/remove_heartbeat_delayedOperation

(cherry picked from commit 260b07a6da070e6312443fb7cc6b937bef2865ea)
Signed-off-by: Dong Lin 
---
 .../kafka/coordinator/group/GroupCoordinator.scala |  1 +
 .../coordinator/group/GroupCoordinatorTest.scala   | 22 --
 2 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 25b5780..c70d0e9 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -727,6 +727,7 @@ class GroupCoordinator(val brokerId: Int,
 group.inLock {
   // remove any members who haven't joined the group yet
   group.notYetRejoinedMembers.foreach { failedMember =>
+removeHeartbeatForLeavingMember(group, failedMember)
 group.remove(failedMember.memberId)
 // TODO: cut the socket connection to the client
   }
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 3592d6a..a68a3d8 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -469,11 +469,29 @@ class GroupCoordinatorTest extends JUnitSuite {
 heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId)
 assertEquals(Errors.REBALANCE_IN_PROGRESS, heartbeatResult)
 
-// now timeout the rebalance, which should kick the unjoined member out of the group
-// and let the rebalance finish with only the new member
+// now timeout the rebalance
 timer.advanceClock(500)
 val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
+val otherMemberId = otherJoinResult.memberId
+val otherGenerationId = otherJoinResult.generationId
+EasyMock.reset(replicaManager)
+val syncResult = syncGroupLeader(groupId, otherGenerationId, otherMemberId, Map(otherMemberId -> Array[Byte]()))
+assertEquals(Errors.NONE, syncResult._2)
+
+// the unjoined member should be kicked out from the group
 assertEquals(Errors.NONE, otherJoinResult.error)
+EasyMock.reset(replicaManager)
+heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId)
+assertEquals(Errors.UNKNOWN_MEMBER_ID, heartbeatResult)
+
+// the joined member should get a heartbeat response with no error. Let the new member keep heartbeating for a while
+// to verify that no new rebalance is triggered unexpectedly
+for (_ <- 1 to 20) {
+  timer.advanceClock(500)
+  EasyMock.reset(replicaManager)
+  heartbeatResult = heartbeat(groupId, otherMemberId, otherGenerationId)
+  assertEquals(Errors.NONE, heartbeatResult)
+}
   }
 
   @Test



[kafka] branch 1.1 updated: KAFKA-7196; Remove heartbeat delayed operation for those removed consumers at the end of each rebalance

2018-10-04 Thread lindong
This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
 new 391d065  KAFKA-7196; Remove heartbeat delayed operation for those removed consumers at the end of each rebalance
391d065 is described below

commit 391d065a43ab3518014c3fcc661f2b57953435db
Author: Lincong Li 
AuthorDate: Thu Oct 4 09:14:44 2018 -0700

KAFKA-7196; Remove heartbeat delayed operation for those removed consumers at the end of each rebalance

During the consumer group rebalance, when the join-group phase finishes, the heartbeat delayed operation of any consumer that fails to rejoin the group should be removed from the purgatory. Otherwise, even though the consumer's member ID has been removed from the group, its heartbeat delayed operation remains registered in the purgatory; when that operation eventually times out, it triggers another unnecessary rebalance.

Author: Lincong Li 

Reviewers: Dong Lin 

Closes #5556 from Lincong/remove_heartbeat_delayedOperation

(cherry picked from commit 260b07a6da070e6312443fb7cc6b937bef2865ea)
Signed-off-by: Dong Lin 
---
 .../kafka/coordinator/group/GroupCoordinator.scala |  1 +
 .../coordinator/group/GroupCoordinatorTest.scala   | 22 --
 2 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 6cc0a41..5ffe6ba 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -774,6 +774,7 @@ class GroupCoordinator(val brokerId: Int,
 group.inLock {
   // remove any members who haven't joined the group yet
   group.notYetRejoinedMembers.foreach { failedMember =>
+removeHeartbeatForLeavingMember(group, failedMember)
 group.remove(failedMember.memberId)
 // TODO: cut the socket connection to the client
   }
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 8529bf9..515dbe1 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -471,11 +471,29 @@ class GroupCoordinatorTest extends JUnitSuite {
 heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId)
 assertEquals(Errors.REBALANCE_IN_PROGRESS, heartbeatResult)
 
-// now timeout the rebalance, which should kick the unjoined member out of the group
-// and let the rebalance finish with only the new member
+// now timeout the rebalance
 timer.advanceClock(500)
 val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
+val otherMemberId = otherJoinResult.memberId
+val otherGenerationId = otherJoinResult.generationId
+EasyMock.reset(replicaManager)
+val syncResult = syncGroupLeader(groupId, otherGenerationId, otherMemberId, Map(otherMemberId -> Array[Byte]()))
+assertEquals(Errors.NONE, syncResult._2)
+
+// the unjoined member should be kicked out from the group
 assertEquals(Errors.NONE, otherJoinResult.error)
+EasyMock.reset(replicaManager)
+heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId)
+assertEquals(Errors.UNKNOWN_MEMBER_ID, heartbeatResult)
+
+// the joined member should get a heartbeat response with no error. Let the new member keep heartbeating for a while
+// to verify that no new rebalance is triggered unexpectedly
+for (_ <- 1 to 20) {
+  timer.advanceClock(500)
+  EasyMock.reset(replicaManager)
+  heartbeatResult = heartbeat(groupId, otherMemberId, otherGenerationId)
+  assertEquals(Errors.NONE, heartbeatResult)
+}
   }
 
   @Test



[kafka] branch 2.0 updated: KAFKA-7196; Remove heartbeat delayed operation for those removed consumers at the end of each rebalance

2018-10-04 Thread lindong
This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
 new cc5e6f1  KAFKA-7196; Remove heartbeat delayed operation for those removed consumers at the end of each rebalance
cc5e6f1 is described below

commit cc5e6f18a0153f8b9cf433ebe1b55e341fdbcd27
Author: Lincong Li 
AuthorDate: Thu Oct 4 09:14:44 2018 -0700

KAFKA-7196; Remove heartbeat delayed operation for those removed consumers at the end of each rebalance

During the consumer group rebalance, when the join-group phase finishes, the heartbeat delayed operation of any consumer that fails to rejoin the group should be removed from the purgatory. Otherwise, even though the consumer's member ID has been removed from the group, its heartbeat delayed operation remains registered in the purgatory; when that operation eventually times out, it triggers another unnecessary rebalance.

Author: Lincong Li 

Reviewers: Dong Lin 

Closes #5556 from Lincong/remove_heartbeat_delayedOperation

(cherry picked from commit 260b07a6da070e6312443fb7cc6b937bef2865ea)
Signed-off-by: Dong Lin 
---
 .../kafka/coordinator/group/GroupCoordinator.scala |  1 +
 .../coordinator/group/GroupCoordinatorTest.scala   | 22 --
 2 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 9748e17..2c062e2 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -779,6 +779,7 @@ class GroupCoordinator(val brokerId: Int,
 group.inLock {
   // remove any members who haven't joined the group yet
   group.notYetRejoinedMembers.foreach { failedMember =>
+removeHeartbeatForLeavingMember(group, failedMember)
 group.remove(failedMember.memberId)
 // TODO: cut the socket connection to the client
   }
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 608d7cc..efa44fa 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -527,11 +527,29 @@ class GroupCoordinatorTest extends JUnitSuite {
 heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId)
 assertEquals(Errors.REBALANCE_IN_PROGRESS, heartbeatResult)
 
-// now timeout the rebalance, which should kick the unjoined member out of the group
-// and let the rebalance finish with only the new member
+// now timeout the rebalance
 timer.advanceClock(500)
 val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
+val otherMemberId = otherJoinResult.memberId
+val otherGenerationId = otherJoinResult.generationId
+EasyMock.reset(replicaManager)
+val syncResult = syncGroupLeader(groupId, otherGenerationId, otherMemberId, Map(otherMemberId -> Array[Byte]()))
+assertEquals(Errors.NONE, syncResult._2)
+
+// the unjoined member should be kicked out from the group
 assertEquals(Errors.NONE, otherJoinResult.error)
+EasyMock.reset(replicaManager)
+heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId)
+assertEquals(Errors.UNKNOWN_MEMBER_ID, heartbeatResult)
+
+// the joined member should get a heartbeat response with no error. Let the new member keep heartbeating for a while
+// to verify that no new rebalance is triggered unexpectedly
+for (_ <- 1 to 20) {
+  timer.advanceClock(500)
+  EasyMock.reset(replicaManager)
+  heartbeatResult = heartbeat(groupId, otherMemberId, otherGenerationId)
+  assertEquals(Errors.NONE, heartbeatResult)
+}
   }
 
   @Test



[kafka] branch trunk updated: KAFKA-7196; Remove heartbeat delayed operation for those removed consumers at the end of each rebalance

2018-10-04 Thread lindong
This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 260b07a  KAFKA-7196; Remove heartbeat delayed operation for those removed consumers at the end of each rebalance
260b07a is described below

commit 260b07a6da070e6312443fb7cc6b937bef2865ea
Author: Lincong Li 
AuthorDate: Thu Oct 4 09:14:44 2018 -0700

KAFKA-7196; Remove heartbeat delayed operation for those removed consumers at the end of each rebalance

During the consumer group rebalance, when the join-group phase finishes, the heartbeat delayed operation of any consumer that fails to rejoin the group should be removed from the purgatory. Otherwise, even though the consumer's member ID has been removed from the group, its heartbeat delayed operation remains registered in the purgatory; when that operation eventually times out, it triggers another unnecessary rebalance.

Author: Lincong Li 

Reviewers: Dong Lin 

Closes #5556 from Lincong/remove_heartbeat_delayedOperation
---
 .../kafka/coordinator/group/GroupCoordinator.scala |  1 +
 .../coordinator/group/GroupCoordinatorTest.scala   | 22 --
 2 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index c4e6dc9..db89f14 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -775,6 +775,7 @@ class GroupCoordinator(val brokerId: Int,
 group.inLock {
   // remove any members who haven't joined the group yet
   group.notYetRejoinedMembers.foreach { failedMember =>
+removeHeartbeatForLeavingMember(group, failedMember)
 group.remove(failedMember.memberId)
 // TODO: cut the socket connection to the client
   }
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 9df16ad..c2c0841 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -529,11 +529,29 @@ class GroupCoordinatorTest extends JUnitSuite {
 heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId)
 assertEquals(Errors.REBALANCE_IN_PROGRESS, heartbeatResult)
 
-// now timeout the rebalance, which should kick the unjoined member out of the group
-// and let the rebalance finish with only the new member
+// now timeout the rebalance
 timer.advanceClock(500)
 val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
+val otherMemberId = otherJoinResult.memberId
+val otherGenerationId = otherJoinResult.generationId
+EasyMock.reset(replicaManager)
+val syncResult = syncGroupLeader(groupId, otherGenerationId, otherMemberId, Map(otherMemberId -> Array[Byte]()))
+assertEquals(Errors.NONE, syncResult._2)
+
+// the unjoined member should be kicked out from the group
 assertEquals(Errors.NONE, otherJoinResult.error)
+EasyMock.reset(replicaManager)
+heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId)
+assertEquals(Errors.UNKNOWN_MEMBER_ID, heartbeatResult)
+
+// the joined member should get a heartbeat response with no error. Let the new member keep heartbeating for a while
+// to verify that no new rebalance is triggered unexpectedly
+for (_ <- 1 to 20) {
+  timer.advanceClock(500)
+  EasyMock.reset(replicaManager)
+  heartbeatResult = heartbeat(groupId, otherMemberId, otherGenerationId)
+  assertEquals(Errors.NONE, heartbeatResult)
+}
   }
 
   @Test