[kafka] branch trunk updated: KAFKA-8614; Consistent naming for IncrementalAlterConfig and AlterConfig responses (#7022)

2019-07-12 Thread jgus
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new ea814d7  KAFKA-8614; Consistent naming for IncrementalAlterConfig and 
AlterConfig responses (#7022)
ea814d7 is described below

commit ea814d78690940827e90950282f40a74ca45eb0f
Author: Bob Barrett 
AuthorDate: Fri Jul 12 17:18:41 2019 -0700

KAFKA-8614; Consistent naming for IncrementalAlterConfig and AlterConfig 
responses (#7022)

    This patch changes the name of the `Resources` field of AlterConfigsResponse to `Responses`. This makes it consistent with IncrementalAlterConfigsResponse, which has a differently-named but structurally-identical field. Tested with unit tests.

Reviewers: Jason Gustafson 
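
    For illustration only (not part of the commit): a minimal sketch of building a response with the renamed nested class, using the fluent setters visible in the hunks below. The wrapper class name and the example resource are made up.

    ```java
    import org.apache.kafka.common.config.ConfigResource;
    import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
    import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
    import org.apache.kafka.common.protocol.Errors;

    public class AlterConfigsResponseNamingSketch {
        public static void main(String[] args) {
            // Per-resource results now use AlterConfigsResourceResponse
            // (formerly AlterConfigsResourceResult), matching the AlterConfigs naming.
            IncrementalAlterConfigsResponseData data = new IncrementalAlterConfigsResponseData();
            data.responses().add(new AlterConfigsResourceResponse()
                    .setResourceName("my-topic")
                    .setResourceType(ConfigResource.Type.TOPIC.id())
                    .setErrorCode(Errors.NONE.code()));
            System.out.println(data);
        }
    }
    ```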
---
 .../common/requests/IncrementalAlterConfigsRequest.java|  4 ++--
 .../common/requests/IncrementalAlterConfigsResponse.java   | 14 +++---
 .../resources/common/message/AlterConfigsResponse.json |  2 +-
 .../common/message/IncrementalAlterConfigsResponse.json|  2 +-
 .../apache/kafka/clients/admin/KafkaAdminClientTest.java   |  8 
 .../apache/kafka/common/requests/RequestResponseTest.java  |  4 ++--
 6 files changed, 17 insertions(+), 17 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java
index 3a87cdb..0c6c0b2 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java
@@ -20,7 +20,7 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
 import 
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
 import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
-import 
org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResult;
+import 
org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.types.Struct;
 
@@ -80,7 +80,7 @@ public class IncrementalAlterConfigsRequest extends 
AbstractRequest {
 IncrementalAlterConfigsResponseData response = new 
IncrementalAlterConfigsResponseData();
 ApiError apiError = ApiError.fromThrowable(e);
 for (AlterConfigsResource resource : data.resources()) {
-response.responses().add(new AlterConfigsResourceResult()
+response.responses().add(new AlterConfigsResourceResponse()
 .setResourceName(resource.resourceName())
 .setResourceType(resource.resourceType())
 .setErrorCode(apiError.error().code())
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsResponse.java
index 1e5aea1..46b1d53 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsResponse.java
@@ -19,7 +19,7 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
-import 
org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResult;
+import 
org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -35,7 +35,7 @@ public class IncrementalAlterConfigsResponse extends 
AbstractResponse {
 IncrementalAlterConfigsResponseData responseData = new 
IncrementalAlterConfigsResponseData();
 responseData.setThrottleTimeMs(requestThrottleMs);
 for (Map.Entry<ConfigResource, ApiError> entry : results.entrySet()) {
-responseData.responses().add(new AlterConfigsResourceResult().
+responseData.responses().add(new AlterConfigsResourceResponse().
 setResourceName(entry.getKey().name()).
 setResourceType(entry.getKey().type().id()).
 setErrorCode(entry.getValue().error().code()).
@@ -46,9 +46,9 @@ public class IncrementalAlterConfigsResponse extends 
AbstractResponse {
 
 public static Map<ConfigResource, ApiError> fromResponseData(final IncrementalAlterConfigsResponseData data) {
 Map<ConfigResource, ApiError> map = new HashMap<>();
-for (AlterConfigsResourceResult result : data.responses()) {
-  

[kafka] branch trunk updated: MINOR: Use dynamic port in `RestServerTest` (#7079)

2019-07-12 Thread jgus
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 1d873a9  MINOR: Use dynamic port in `RestServerTest` (#7079)
1d873a9 is described below

commit 1d873a9de9ad98d9f3120fa2ff59c3372650775b
Author: Jason Gustafson 
AuthorDate: Fri Jul 12 15:29:10 2019 -0700

MINOR: Use dynamic port in `RestServerTest` (#7079)

We have seen some failures recently in `RestServerTest`. It's the usual 
problem with reliance on static ports.
```
Caused by: java.io.IOException: Failed to bind to 0.0.0.0/0.0.0.0:8083
at 
org.eclipse.jetty.server.ServerConnector.openAcceptChannel(ServerConnector.java:346)
at 
org.eclipse.jetty.server.ServerConnector.open(ServerConnector.java:308)
at 
org.eclipse.jetty.server.AbstractNetworkConnector.doStart(AbstractNetworkConnector.java:80)
at 
org.eclipse.jetty.server.ServerConnector.doStart(ServerConnector.java:236)
at 
org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
at org.eclipse.jetty.server.Server.doStart(Server.java:396)
at 
org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
at 
org.apache.kafka.connect.runtime.rest.RestServer.initializeServer(RestServer.java:178)
... 56 more
Caused by: java.net.BindException: Address already in use
```
This patch makes the chosen port dynamic.

Reviewers: Ismael Juma 
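
    The diff below points the worker listener at `HTTP://localhost:0`, i.e. the standard "port 0" convention where the OS assigns a free ephemeral port. A self-contained illustration of that convention with plain java.net (not the Connect RestServer code):

    ```java
    import java.io.IOException;
    import java.net.ServerSocket;

    public class DynamicPortSketch {
        public static void main(String[] args) throws IOException {
            // Binding to port 0 asks the OS for any free ephemeral port, so
            // concurrent test runs never race for a fixed port such as 8083.
            try (ServerSocket socket = new ServerSocket(0)) {
                System.out.println("Bound to dynamically assigned port " + socket.getLocalPort());
            }
        }
    }
    ```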
---
 .../java/org/apache/kafka/connect/runtime/rest/RestServerTest.java   | 5 -
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
index 3609fb3..a640b83 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
@@ -78,6 +78,7 @@ public class RestServerTest {
 workerProps.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, 
"org.apache.kafka.connect.json.JsonConverter");
 workerProps.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, 
"org.apache.kafka.connect.json.JsonConverter");
 workerProps.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, 
"connect-offsets");
+workerProps.put(WorkerConfig.LISTENERS_CONFIG, "HTTP://localhost:0");
 
 return workerProps;
 }
@@ -105,6 +106,7 @@ public class RestServerTest {
 
 // Build listener from hostname and port
 configMap = new HashMap<>(baseWorkerProps());
+configMap.remove(WorkerConfig.LISTENERS_CONFIG);
 configMap.put(WorkerConfig.REST_HOST_NAME_CONFIG, "my-hostname");
 configMap.put(WorkerConfig.REST_PORT_CONFIG, "8080");
 config = new DistributedConfig(configMap);
@@ -115,7 +117,7 @@ public class RestServerTest {
 @SuppressWarnings("deprecation")
 @Test
 public void testAdvertisedUri() {
-// Advertised URI from listeenrs without protocol
+// Advertised URI from listeners without protocol
 Map<String, String> configMap = new HashMap<>(baseWorkerProps());
 configMap.put(WorkerConfig.LISTENERS_CONFIG, "http://localhost:8080,https://localhost:8443");
 DistributedConfig config = new DistributedConfig(configMap);
@@ -153,6 +155,7 @@ public class RestServerTest {
 
 // listener from hostname and port
 configMap = new HashMap<>(baseWorkerProps());
+configMap.remove(WorkerConfig.LISTENERS_CONFIG);
 configMap.put(WorkerConfig.REST_HOST_NAME_CONFIG, "my-hostname");
 configMap.put(WorkerConfig.REST_PORT_CONFIG, "8080");
 config = new DistributedConfig(configMap);



[kafka] branch 2.2 updated: HOT FIX: close RocksDB objects in correct order (#7076)

2019-07-12 Thread mjsax
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
 new 74b672b  HOT FIX: close RocksDB objects in correct order (#7076)
74b672b is described below

commit 74b672b0fa11bd5989f1e1f2559686ed433999ed
Author: A. Sophie Blee-Goldman 
AuthorDate: Fri Jul 12 12:43:46 2019 -0700

HOT FIX: close RocksDB objects in correct order (#7076)

Reviewers: Bill Bejeck , Matthias J. Sax 
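
    The close ordering enforced below (ColumnFamilyHandle > RocksDB > DBOptions > ColumnFamilyOptions), shown as a standalone sketch against the rocksdbjni API; the path and options are illustrative only, not the Streams store code.

    ```java
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    import org.rocksdb.ColumnFamilyDescriptor;
    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.ColumnFamilyOptions;
    import org.rocksdb.DBOptions;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    public class RocksDbCloseOrderSketch {
        public static void main(String[] args) throws RocksDBException {
            RocksDB.loadLibrary();

            ColumnFamilyOptions cfOptions = new ColumnFamilyOptions();
            DBOptions dbOptions = new DBOptions().setCreateIfMissing(true);
            List<ColumnFamilyDescriptor> descriptors = Collections.singletonList(
                    new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOptions));
            List<ColumnFamilyHandle> handles = new ArrayList<>();

            RocksDB db = RocksDB.open(dbOptions, "/tmp/close-order-sketch", descriptors, handles);

            // Close in the order the fix enforces:
            // ColumnFamilyHandle > RocksDB > DBOptions > ColumnFamilyOptions.
            for (ColumnFamilyHandle handle : handles) {
                handle.close();
            }
            db.close();
            dbOptions.close();
            cfOptions.close();
        }
    }
    ```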

---
 .../RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java  | 3 ++-
 .../java/org/apache/kafka/streams/state/internals/RocksDBStore.java  | 5 -
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
index c07e43b..ba83c02 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
@@ -1356,7 +1356,8 @@ class 
RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter extends Options
 
 @Override
 public void close() {
-columnFamilyOptions.close();
+// ColumnFamilyOptions should be closed last
 dbOptions.close();
+columnFamilyOptions.close();
 }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 109b8c3..ee62ec7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -374,11 +374,14 @@ public class RocksDBStore implements KeyValueStore {
 
 open = false;
 closeOpenIterators();
+
+// Important: do not rearrange the order in which the below objects 
are closed!
+// Order of closing must follow: ColumnFamilyHandle > RocksDB > 
DBOptions > ColumnFamilyOptions
 dbAccessor.close();
+db.close();
 userSpecifiedOptions.close();
 wOptions.close();
 fOptions.close();
-db.close();
 filter.close();
 
 dbAccessor = null;



[kafka] branch 2.3 updated: HOT FIX: close RocksDB objects in correct order (#7076)

2019-07-12 Thread mjsax
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
 new 5942700  HOT FIX: close RocksDB objects in correct order (#7076)
5942700 is described below

commit 5942700a45363170c914f5a450bba0d9b3ca05d8
Author: A. Sophie Blee-Goldman 
AuthorDate: Fri Jul 12 12:43:46 2019 -0700

HOT FIX: close RocksDB objects in correct order (#7076)

Reviewers: Bill Bejeck , Matthias J. Sax 

---
 .../RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java   | 3 ++-
 .../java/org/apache/kafka/streams/state/internals/RocksDBStore.java   | 4 +++-
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
index d28682a..5b892a2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
@@ -1384,7 +1384,8 @@ class 
RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter extends Options
 
 @Override
 public void close() {
-columnFamilyOptions.close();
+// ColumnFamilyOptions should be closed last
 dbOptions.close();
+columnFamilyOptions.close();
 }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 0643e64..bcc4372 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -402,11 +402,13 @@ public class RocksDBStore implements KeyValueStore, BulkLoadingSt
 configSetter = null;
 }
 
+// Important: do not rearrange the order in which the below objects 
are closed!
+// Order of closing must follow: ColumnFamilyHandle > RocksDB > 
DBOptions > ColumnFamilyOptions
 dbAccessor.close();
+db.close();
 userSpecifiedOptions.close();
 wOptions.close();
 fOptions.close();
-db.close();
 filter.close();
 cache.close();
 



[kafka] branch trunk updated: HOT FIX: close RocksDB objects in correct order (#7076)

2019-07-12 Thread mjsax
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new d2328a1  HOT FIX: close RocksDB objects in correct order (#7076)
d2328a1 is described below

commit d2328a1a0e2fa5f3592cd67b320fb8f223978659
Author: A. Sophie Blee-Goldman 
AuthorDate: Fri Jul 12 12:43:46 2019 -0700

HOT FIX: close RocksDB objects in correct order (#7076)

Reviewers: Bill Bejeck , Matthias J. Sax 

---
 .../RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java   | 3 ++-
 .../java/org/apache/kafka/streams/state/internals/RocksDBStore.java   | 4 +++-
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
index d28682a..5b892a2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
@@ -1384,7 +1384,8 @@ class 
RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter extends Options
 
 @Override
 public void close() {
-columnFamilyOptions.close();
+// ColumnFamilyOptions should be closed last
 dbOptions.close();
+columnFamilyOptions.close();
 }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 0643e64..bcc4372 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -402,11 +402,13 @@ public class RocksDBStore implements KeyValueStore, BulkLoadingSt
 configSetter = null;
 }
 
+// Important: do not rearrange the order in which the below objects 
are closed!
+// Order of closing must follow: ColumnFamilyHandle > RocksDB > 
DBOptions > ColumnFamilyOptions
 dbAccessor.close();
+db.close();
 userSpecifiedOptions.close();
 wOptions.close();
 fOptions.close();
-db.close();
 filter.close();
 cache.close();
 



[kafka] branch trunk updated: MINOR: Create a new topic for each test for flaky RegexSourceIntegrationTest (#6853)

2019-07-12 Thread bbejeck
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new ae4a975  MINOR: Create a new topic for each test for flaky 
RegexSourceIntegrationTest (#6853)
ae4a975 is described below

commit ae4a97543e990407096d9a1e63c77bab8c18649c
Author: Bill Bejeck 
AuthorDate: Fri Jul 12 15:18:17 2019 -0400

MINOR: Create a new topic for each test for flaky 
RegexSourceIntegrationTest (#6853)

The RegexSourceIntegrationTest has some flakiness as it deletes and 
re-creates the same output topic before each test. This PR reduces the chance 
for errors by creating a unique output topic for each test.

Reviewers:  Matthias J. Sax ,  Boyang Chen 
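
    A standalone sketch of the per-test topic naming the patch introduces; in the real test the topic is still created through the embedded cluster, which is omitted here.

    ```java
    import java.util.concurrent.atomic.AtomicInteger;

    public class UniqueOutputTopicSketch {
        private static final AtomicInteger TOPIC_SUFFIX = new AtomicInteger(0);

        // Each test gets its own output topic instead of deleting and re-creating a
        // shared "outputTopic", removing the race that made the test flaky.
        static String nextOutputTopic() {
            return "outputTopic_" + TOPIC_SUFFIX.incrementAndGet();
        }

        public static void main(String[] args) {
            System.out.println(nextOutputTopic()); // outputTopic_1
            System.out.println(nextOutputTopic()); // outputTopic_2
        }
    }
    ```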

---
 .../integration/RegexSourceIntegrationTest.java| 42 +-
 1 file changed, 25 insertions(+), 17 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index f74487b..10e0650 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -62,6 +62,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -90,11 +91,12 @@ public class RegexSourceIntegrationTest {
 private static final String PARTITIONED_TOPIC_1 = "partitioned-1";
 private static final String PARTITIONED_TOPIC_2 = "partitioned-2";
 
-private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
 private static final String STRING_SERDE_CLASSNAME = 
Serdes.String().getClass().getName();
 private Properties streamsConfiguration;
 private static final String STREAM_TASKS_NOT_UPDATED = "Stream tasks not 
updated";
 private KafkaStreams streams;
+private static volatile AtomicInteger topicSuffixGenerator = new 
AtomicInteger(0);
+private String outputTopic;
 
 
 @BeforeClass
@@ -107,16 +109,14 @@ public class RegexSourceIntegrationTest {
 TOPIC_Y,
 TOPIC_Z,
 FA_TOPIC,
-FOO_TOPIC,
-DEFAULT_OUTPUT_TOPIC);
+FOO_TOPIC);
 CLUSTER.createTopic(PARTITIONED_TOPIC_1, 2, 1);
 CLUSTER.createTopic(PARTITIONED_TOPIC_2, 2, 1);
 }
 
 @Before
-public void setUp() throws Exception {
-CLUSTER.deleteAndRecreateTopics(DEFAULT_OUTPUT_TOPIC);
-
+public void setUp() throws InterruptedException {
+outputTopic = createTopic(topicSuffixGenerator.incrementAndGet());
 final Properties properties = new Properties();
 properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
 properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
@@ -141,6 +141,7 @@ public class RegexSourceIntegrationTest {
 
 @Test
 public void testRegexMatchesTopicsAWhenCreated() throws Exception {
+
 final Serde<String> stringSerde = Serdes.String();
 final List<String> expectedFirstAssignment = Collections.singletonList("TEST-TOPIC-1");
 final List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
@@ -151,7 +152,7 @@ public class RegexSourceIntegrationTest {
 
 final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
 
-pattern1Stream.to(DEFAULT_OUTPUT_TOPIC, Produced.with(stringSerde, 
stringSerde));
+pattern1Stream.to(outputTopic, Produced.with(stringSerde, 
stringSerde));
 final List<String> assignedTopics = new CopyOnWriteArrayList<>();
 streams = new KafkaStreams(builder.build(), streamsConfiguration, new 
DefaultKafkaClientSupplier() {
 @Override
@@ -175,6 +176,12 @@ public class RegexSourceIntegrationTest {
 
 }
 
+private String createTopic(final int suffix) throws InterruptedException {
+final String outputTopic = "outputTopic_" + suffix;
+CLUSTER.createTopic(outputTopic);
+return outputTopic;
+}
+
 @Test
 public void testRegexMatchesTopicsAWhenDeleted() throws Exception {
 
@@ -188,7 +195,7 @@ public class RegexSourceIntegrationTest {
 
 final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-[A-Z]"));
 
-pattern1Stream.to(DEFAULT_OUTPUT_TOPIC, Produced.with(stringSerde, 
stringSerde));
+pattern1Stream.to(outputTopic, Produced.with(stringSerde, 
stringSerde));
 
 final List<String> assignedTopics = new CopyOnWriteArrayList<>();
 streams = new KafkaStreams(builder.build(), streamsConfiguration, new 

[kafka] branch 1.0 updated: KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070)

2019-07-12 Thread rhauch
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
 new de8663a  KAFKA-7157: Fix handling of nulls in TimestampConverter 
(#7070)
de8663a is described below

commit de8663a0835d6d60a5d601626a9c301530acfed3
Author: Robert Yokota 
AuthorDate: Fri Jul 12 10:12:20 2019 -0700

KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070)

Fix handling of nulls in TimestampConverter.

Authors: Valeria Vasylieva , Robert Yokota 

Reviewers: Arjun Satish , Randall Hauch 
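
    A standalone sketch of the pattern the fix introduces with `typeSchema(boolean isOptional)`: when the incoming field or value may be null, the translator must return an optional variant of its schema so the null survives schema validation. Only the timestamp case is shown, using the public Connect data API.

    ```java
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.Timestamp;

    public class OptionalTimestampSchemaSketch {
        // Mirrors the new typeSchema(boolean isOptional) signature in the diff below.
        static Schema timestampSchema(boolean isOptional) {
            return isOptional ? Timestamp.builder().optional().schema() : Timestamp.SCHEMA;
        }

        public static void main(String[] args) {
            System.out.println(timestampSchema(true).isOptional());  // true
            System.out.println(timestampSchema(false).isOptional()); // false
        }
    }
    ```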

---
 .../connect/transforms/TimestampConverter.java |  52 +++--
 .../connect/transforms/TimestampConverterTest.java | 233 +++--
 2 files changed, 243 insertions(+), 42 deletions(-)

diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
index 8557441..f32253e 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
@@ -47,7 +47,7 @@ import java.util.Set;
 import java.util.TimeZone;
 
 import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
-import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
+import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull;
 
 public abstract class TimestampConverter<R extends ConnectRecord<R>> implements Transformation<R> {
 
@@ -85,6 +85,10 @@ public abstract class TimestampConverter> implements
 
 private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
 
+public static final Schema OPTIONAL_DATE_SCHEMA = 
org.apache.kafka.connect.data.Date.builder().optional().schema();
+public static final Schema OPTIONAL_TIMESTAMP_SCHEMA = 
Timestamp.builder().optional().schema();
+public static final Schema OPTIONAL_TIME_SCHEMA = 
Time.builder().optional().schema();
+
 private interface TimestampTranslator {
 /**
  * Convert from the type-specific format to the universal 
java.util.Date format
@@ -94,7 +98,7 @@ public abstract class TimestampConverter> implements
 /**
  * Get the schema for this format.
  */
-Schema typeSchema();
+Schema typeSchema(boolean isOptional);
 
 /**
  * Convert from the universal java.util.Date format to the 
type-specific format
@@ -118,8 +122,8 @@ public abstract class TimestampConverter> implements
 }
 
 @Override
-public Schema typeSchema() {
-return Schema.STRING_SCHEMA;
+public Schema typeSchema(boolean isOptional) {
+return isOptional ? Schema.OPTIONAL_STRING_SCHEMA : 
Schema.STRING_SCHEMA;
 }
 
 @Override
@@ -139,8 +143,8 @@ public abstract class TimestampConverter> implements
 }
 
 @Override
-public Schema typeSchema() {
-return Schema.INT64_SCHEMA;
+public Schema typeSchema(boolean isOptional) {
+return isOptional ? Schema.OPTIONAL_INT64_SCHEMA : 
Schema.INT64_SCHEMA;
 }
 
 @Override
@@ -159,8 +163,8 @@ public abstract class TimestampConverter> implements
 }
 
 @Override
-public Schema typeSchema() {
-return org.apache.kafka.connect.data.Date.SCHEMA;
+public Schema typeSchema(boolean isOptional) {
+return isOptional ? OPTIONAL_DATE_SCHEMA : 
org.apache.kafka.connect.data.Date.SCHEMA;
 }
 
 @Override
@@ -185,8 +189,8 @@ public abstract class TimestampConverter> implements
 }
 
 @Override
-public Schema typeSchema() {
-return Time.SCHEMA;
+public Schema typeSchema(boolean isOptional) {
+return isOptional ? OPTIONAL_TIME_SCHEMA : Time.SCHEMA;
 }
 
 @Override
@@ -212,8 +216,8 @@ public abstract class TimestampConverter> implements
 }
 
 @Override
-public Schema typeSchema() {
-return Timestamp.SCHEMA;
+public Schema typeSchema(boolean isOptional) {
+return isOptional ? OPTIONAL_TIMESTAMP_SCHEMA : 
Timestamp.SCHEMA;
 }
 
 @Override
@@ -330,16 +334,16 @@ public abstract class TimestampConverter> implements
 if (config.field.isEmpty()) {
 Object value = operatingValue(record);
 // New schema is determined by the requested target timestamp type
-Schema updatedSchema = TRANSLATORS.get(config.type).typeSchema();
+Schema updatedSchema = 

[kafka] branch 1.1 updated: KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070)

2019-07-12 Thread rhauch
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
 new c05ed1e  KAFKA-7157: Fix handling of nulls in TimestampConverter 
(#7070)
c05ed1e is described below

commit c05ed1eae466ef8afd8d67022d206d7d9bb24838
Author: Robert Yokota 
AuthorDate: Fri Jul 12 10:12:20 2019 -0700

KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070)

Fix handling of nulls in TimestampConverter.

Authors: Valeria Vasylieva , Robert Yokota 

Reviewers: Arjun Satish , Randall Hauch 

---
 .../connect/transforms/TimestampConverter.java |  52 +++--
 .../connect/transforms/TimestampConverterTest.java | 233 +++--
 2 files changed, 243 insertions(+), 42 deletions(-)

diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
index 8557441..f32253e 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
@@ -47,7 +47,7 @@ import java.util.Set;
 import java.util.TimeZone;
 
 import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
-import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
+import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull;
 
 public abstract class TimestampConverter<R extends ConnectRecord<R>> implements Transformation<R> {
 
@@ -85,6 +85,10 @@ public abstract class TimestampConverter> implements
 
 private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
 
+public static final Schema OPTIONAL_DATE_SCHEMA = 
org.apache.kafka.connect.data.Date.builder().optional().schema();
+public static final Schema OPTIONAL_TIMESTAMP_SCHEMA = 
Timestamp.builder().optional().schema();
+public static final Schema OPTIONAL_TIME_SCHEMA = 
Time.builder().optional().schema();
+
 private interface TimestampTranslator {
 /**
  * Convert from the type-specific format to the universal 
java.util.Date format
@@ -94,7 +98,7 @@ public abstract class TimestampConverter> implements
 /**
  * Get the schema for this format.
  */
-Schema typeSchema();
+Schema typeSchema(boolean isOptional);
 
 /**
  * Convert from the universal java.util.Date format to the 
type-specific format
@@ -118,8 +122,8 @@ public abstract class TimestampConverter> implements
 }
 
 @Override
-public Schema typeSchema() {
-return Schema.STRING_SCHEMA;
+public Schema typeSchema(boolean isOptional) {
+return isOptional ? Schema.OPTIONAL_STRING_SCHEMA : 
Schema.STRING_SCHEMA;
 }
 
 @Override
@@ -139,8 +143,8 @@ public abstract class TimestampConverter> implements
 }
 
 @Override
-public Schema typeSchema() {
-return Schema.INT64_SCHEMA;
+public Schema typeSchema(boolean isOptional) {
+return isOptional ? Schema.OPTIONAL_INT64_SCHEMA : 
Schema.INT64_SCHEMA;
 }
 
 @Override
@@ -159,8 +163,8 @@ public abstract class TimestampConverter> implements
 }
 
 @Override
-public Schema typeSchema() {
-return org.apache.kafka.connect.data.Date.SCHEMA;
+public Schema typeSchema(boolean isOptional) {
+return isOptional ? OPTIONAL_DATE_SCHEMA : 
org.apache.kafka.connect.data.Date.SCHEMA;
 }
 
 @Override
@@ -185,8 +189,8 @@ public abstract class TimestampConverter> implements
 }
 
 @Override
-public Schema typeSchema() {
-return Time.SCHEMA;
+public Schema typeSchema(boolean isOptional) {
+return isOptional ? OPTIONAL_TIME_SCHEMA : Time.SCHEMA;
 }
 
 @Override
@@ -212,8 +216,8 @@ public abstract class TimestampConverter> implements
 }
 
 @Override
-public Schema typeSchema() {
-return Timestamp.SCHEMA;
+public Schema typeSchema(boolean isOptional) {
+return isOptional ? OPTIONAL_TIMESTAMP_SCHEMA : 
Timestamp.SCHEMA;
 }
 
 @Override
@@ -330,16 +334,16 @@ public abstract class TimestampConverter> implements
 if (config.field.isEmpty()) {
 Object value = operatingValue(record);
 // New schema is determined by the requested target timestamp type
-Schema updatedSchema = TRANSLATORS.get(config.type).typeSchema();
+Schema updatedSchema = 

[kafka] branch 2.0 updated: KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070)

2019-07-12 Thread rhauch
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
 new 2e9d140  KAFKA-7157: Fix handling of nulls in TimestampConverter 
(#7070)
2e9d140 is described below

commit 2e9d140a655ee9d8b9951b4ce0850b486ef69029
Author: Robert Yokota 
AuthorDate: Fri Jul 12 10:12:20 2019 -0700

KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070)

Fix handling of nulls in TimestampConverter.

Authors: Valeria Vasylieva , Robert Yokota 

Reviewers: Arjun Satish , Randall Hauch 

---
 .../connect/transforms/TimestampConverter.java |  52 +++--
 .../connect/transforms/TimestampConverterTest.java | 233 +++--
 2 files changed, 243 insertions(+), 42 deletions(-)

diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
index 8557441..f32253e 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
@@ -47,7 +47,7 @@ import java.util.Set;
 import java.util.TimeZone;
 
 import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
-import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
+import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull;
 
 public abstract class TimestampConverter<R extends ConnectRecord<R>> implements Transformation<R> {
 
@@ -85,6 +85,10 @@ public abstract class TimestampConverter> implements
 
 private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
 
+public static final Schema OPTIONAL_DATE_SCHEMA = 
org.apache.kafka.connect.data.Date.builder().optional().schema();
+public static final Schema OPTIONAL_TIMESTAMP_SCHEMA = 
Timestamp.builder().optional().schema();
+public static final Schema OPTIONAL_TIME_SCHEMA = 
Time.builder().optional().schema();
+
 private interface TimestampTranslator {
 /**
  * Convert from the type-specific format to the universal 
java.util.Date format
@@ -94,7 +98,7 @@ public abstract class TimestampConverter> implements
 /**
  * Get the schema for this format.
  */
-Schema typeSchema();
+Schema typeSchema(boolean isOptional);
 
 /**
  * Convert from the universal java.util.Date format to the 
type-specific format
@@ -118,8 +122,8 @@ public abstract class TimestampConverter> implements
 }
 
 @Override
-public Schema typeSchema() {
-return Schema.STRING_SCHEMA;
+public Schema typeSchema(boolean isOptional) {
+return isOptional ? Schema.OPTIONAL_STRING_SCHEMA : 
Schema.STRING_SCHEMA;
 }
 
 @Override
@@ -139,8 +143,8 @@ public abstract class TimestampConverter> implements
 }
 
 @Override
-public Schema typeSchema() {
-return Schema.INT64_SCHEMA;
+public Schema typeSchema(boolean isOptional) {
+return isOptional ? Schema.OPTIONAL_INT64_SCHEMA : 
Schema.INT64_SCHEMA;
 }
 
 @Override
@@ -159,8 +163,8 @@ public abstract class TimestampConverter> implements
 }
 
 @Override
-public Schema typeSchema() {
-return org.apache.kafka.connect.data.Date.SCHEMA;
+public Schema typeSchema(boolean isOptional) {
+return isOptional ? OPTIONAL_DATE_SCHEMA : 
org.apache.kafka.connect.data.Date.SCHEMA;
 }
 
 @Override
@@ -185,8 +189,8 @@ public abstract class TimestampConverter> implements
 }
 
 @Override
-public Schema typeSchema() {
-return Time.SCHEMA;
+public Schema typeSchema(boolean isOptional) {
+return isOptional ? OPTIONAL_TIME_SCHEMA : Time.SCHEMA;
 }
 
 @Override
@@ -212,8 +216,8 @@ public abstract class TimestampConverter> implements
 }
 
 @Override
-public Schema typeSchema() {
-return Timestamp.SCHEMA;
+public Schema typeSchema(boolean isOptional) {
+return isOptional ? OPTIONAL_TIMESTAMP_SCHEMA : 
Timestamp.SCHEMA;
 }
 
 @Override
@@ -330,16 +334,16 @@ public abstract class TimestampConverter> implements
 if (config.field.isEmpty()) {
 Object value = operatingValue(record);
 // New schema is determined by the requested target timestamp type
-Schema updatedSchema = TRANSLATORS.get(config.type).typeSchema();
+Schema updatedSchema = 

[kafka] branch 2.1 updated: KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070)

2019-07-12 Thread rhauch
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
 new ccc751f  KAFKA-7157: Fix handling of nulls in TimestampConverter 
(#7070)
ccc751f is described below

commit ccc751f3b9a798db401a0740be36004df30f9ab2
Author: Robert Yokota 
AuthorDate: Fri Jul 12 10:12:20 2019 -0700

KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070)

Fix handling of nulls in TimestampConverter.

Authors: Valeria Vasylieva , Robert Yokota 

Reviewers: Arjun Satish , Randall Hauch 

---
 .../connect/transforms/TimestampConverter.java |  52 +++--
 .../connect/transforms/TimestampConverterTest.java | 233 +++--
 2 files changed, 243 insertions(+), 42 deletions(-)

diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
index 8557441..f32253e 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
@@ -47,7 +47,7 @@ import java.util.Set;
 import java.util.TimeZone;
 
 import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
-import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
+import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull;
 
 public abstract class TimestampConverter<R extends ConnectRecord<R>> implements Transformation<R> {
 
@@ -85,6 +85,10 @@ public abstract class TimestampConverter> implements
 
 private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
 
+public static final Schema OPTIONAL_DATE_SCHEMA = 
org.apache.kafka.connect.data.Date.builder().optional().schema();
+public static final Schema OPTIONAL_TIMESTAMP_SCHEMA = 
Timestamp.builder().optional().schema();
+public static final Schema OPTIONAL_TIME_SCHEMA = 
Time.builder().optional().schema();
+
 private interface TimestampTranslator {
 /**
  * Convert from the type-specific format to the universal 
java.util.Date format
@@ -94,7 +98,7 @@ public abstract class TimestampConverter> implements
 /**
  * Get the schema for this format.
  */
-Schema typeSchema();
+Schema typeSchema(boolean isOptional);
 
 /**
  * Convert from the universal java.util.Date format to the 
type-specific format
@@ -118,8 +122,8 @@ public abstract class TimestampConverter> implements
 }
 
 @Override
-public Schema typeSchema() {
-return Schema.STRING_SCHEMA;
+public Schema typeSchema(boolean isOptional) {
+return isOptional ? Schema.OPTIONAL_STRING_SCHEMA : 
Schema.STRING_SCHEMA;
 }
 
 @Override
@@ -139,8 +143,8 @@ public abstract class TimestampConverter> implements
 }
 
 @Override
-public Schema typeSchema() {
-return Schema.INT64_SCHEMA;
+public Schema typeSchema(boolean isOptional) {
+return isOptional ? Schema.OPTIONAL_INT64_SCHEMA : 
Schema.INT64_SCHEMA;
 }
 
 @Override
@@ -159,8 +163,8 @@ public abstract class TimestampConverter> implements
 }
 
 @Override
-public Schema typeSchema() {
-return org.apache.kafka.connect.data.Date.SCHEMA;
+public Schema typeSchema(boolean isOptional) {
+return isOptional ? OPTIONAL_DATE_SCHEMA : 
org.apache.kafka.connect.data.Date.SCHEMA;
 }
 
 @Override
@@ -185,8 +189,8 @@ public abstract class TimestampConverter> implements
 }
 
 @Override
-public Schema typeSchema() {
-return Time.SCHEMA;
+public Schema typeSchema(boolean isOptional) {
+return isOptional ? OPTIONAL_TIME_SCHEMA : Time.SCHEMA;
 }
 
 @Override
@@ -212,8 +216,8 @@ public abstract class TimestampConverter> implements
 }
 
 @Override
-public Schema typeSchema() {
-return Timestamp.SCHEMA;
+public Schema typeSchema(boolean isOptional) {
+return isOptional ? OPTIONAL_TIMESTAMP_SCHEMA : 
Timestamp.SCHEMA;
 }
 
 @Override
@@ -330,16 +334,16 @@ public abstract class TimestampConverter> implements
 if (config.field.isEmpty()) {
 Object value = operatingValue(record);
 // New schema is determined by the requested target timestamp type
-Schema updatedSchema = TRANSLATORS.get(config.type).typeSchema();
+Schema updatedSchema = 

[kafka] branch 2.2 updated: KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070)

2019-07-12 Thread rhauch
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
 new 10a72da  KAFKA-7157: Fix handling of nulls in TimestampConverter 
(#7070)
10a72da is described below

commit 10a72da77a264cec3ba726d33994e8001064bb0f
Author: Robert Yokota 
AuthorDate: Fri Jul 12 10:12:20 2019 -0700

KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070)

Fix handling of nulls in TimestampConverter.

Authors: Valeria Vasylieva , Robert Yokota 

Reviewers: Arjun Satish , Randall Hauch 

---
 .../connect/transforms/TimestampConverter.java |  52 +++--
 .../connect/transforms/TimestampConverterTest.java | 233 +++--
 2 files changed, 243 insertions(+), 42 deletions(-)

diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
index 8557441..f32253e 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
@@ -47,7 +47,7 @@ import java.util.Set;
 import java.util.TimeZone;
 
 import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
-import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
+import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull;
 
 public abstract class TimestampConverter<R extends ConnectRecord<R>> implements Transformation<R> {
 
@@ -85,6 +85,10 @@ public abstract class TimestampConverter> implements
 
 private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
 
+public static final Schema OPTIONAL_DATE_SCHEMA = 
org.apache.kafka.connect.data.Date.builder().optional().schema();
+public static final Schema OPTIONAL_TIMESTAMP_SCHEMA = 
Timestamp.builder().optional().schema();
+public static final Schema OPTIONAL_TIME_SCHEMA = 
Time.builder().optional().schema();
+
 private interface TimestampTranslator {
 /**
  * Convert from the type-specific format to the universal 
java.util.Date format
@@ -94,7 +98,7 @@ public abstract class TimestampConverter> implements
 /**
  * Get the schema for this format.
  */
-Schema typeSchema();
+Schema typeSchema(boolean isOptional);
 
 /**
  * Convert from the universal java.util.Date format to the 
type-specific format
@@ -118,8 +122,8 @@ public abstract class TimestampConverter> implements
 }
 
 @Override
-public Schema typeSchema() {
-return Schema.STRING_SCHEMA;
+public Schema typeSchema(boolean isOptional) {
+return isOptional ? Schema.OPTIONAL_STRING_SCHEMA : 
Schema.STRING_SCHEMA;
 }
 
 @Override
@@ -139,8 +143,8 @@ public abstract class TimestampConverter> implements
 }
 
 @Override
-public Schema typeSchema() {
-return Schema.INT64_SCHEMA;
+public Schema typeSchema(boolean isOptional) {
+return isOptional ? Schema.OPTIONAL_INT64_SCHEMA : 
Schema.INT64_SCHEMA;
 }
 
 @Override
@@ -159,8 +163,8 @@ public abstract class TimestampConverter> implements
 }
 
 @Override
-public Schema typeSchema() {
-return org.apache.kafka.connect.data.Date.SCHEMA;
+public Schema typeSchema(boolean isOptional) {
+return isOptional ? OPTIONAL_DATE_SCHEMA : 
org.apache.kafka.connect.data.Date.SCHEMA;
 }
 
 @Override
@@ -185,8 +189,8 @@ public abstract class TimestampConverter> implements
 }
 
 @Override
-public Schema typeSchema() {
-return Time.SCHEMA;
+public Schema typeSchema(boolean isOptional) {
+return isOptional ? OPTIONAL_TIME_SCHEMA : Time.SCHEMA;
 }
 
 @Override
@@ -212,8 +216,8 @@ public abstract class TimestampConverter> implements
 }
 
 @Override
-public Schema typeSchema() {
-return Timestamp.SCHEMA;
+public Schema typeSchema(boolean isOptional) {
+return isOptional ? OPTIONAL_TIMESTAMP_SCHEMA : 
Timestamp.SCHEMA;
 }
 
 @Override
@@ -330,16 +334,16 @@ public abstract class TimestampConverter> implements
 if (config.field.isEmpty()) {
 Object value = operatingValue(record);
 // New schema is determined by the requested target timestamp type
-Schema updatedSchema = TRANSLATORS.get(config.type).typeSchema();
+Schema updatedSchema = 

[kafka] branch 1.1 updated: KAFKA-8570; Grow buffer to hold down converted records if it was insufficiently sized (#7071)

2019-07-12 Thread jgus
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
 new 8c93b7e  KAFKA-8570; Grow buffer to hold down converted records if it 
was insufficiently sized (#7071)
8c93b7e is described below

commit 8c93b7ecd651c17e61d89fdb533b8ec4d1b0bfd7
Author: Dhruvil Shah 
AuthorDate: Fri Jul 12 10:28:25 2019 -0700

KAFKA-8570; Grow buffer to hold down converted records if it was 
insufficiently sized (#7071)

Backport https://github.com/apache/kafka/pull/6974 to 1.1

When the log contains out of order message formats (for example v2 message 
followed by v1 message) and consists of compressed batches typically greater 
than 1kB in size, it is possible for down-conversion to fail. With compressed 
batches, we estimate the size of down-converted batches using:

```
    private static int estimateCompressedSizeInBytes(int size, CompressionType compressionType) {
        return compressionType == CompressionType.NONE ? size : Math.min(Math.max(size / 2, 1024), 1 << 16);
    }
```

This almost always underestimates size of down-converted records if the 
batch is between 1kB-64kB in size. In general, this means we may under estimate 
the total size required for compressed batches.

Because of an implicit assumption in the code that messages with a lower 
message format appear before any with a higher message format, we do not grow 
the buffer we copy the down converted records into when we see a message <= the 
target message format. This assumption becomes incorrect when the log contains 
out of order message formats, for example because of leaders flapping while 
upgrading the message format.

Reviewers: Jason Gustafson 
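
    The patch fixes this by growing the destination buffer before copying a batch verbatim (via `Utils.ensureCapacity` in the hunk below). A simplified, standalone version of that grow-and-copy step; the real helper lives in `org.apache.kafka.common.utils.Utils`.

    ```java
    import java.nio.ByteBuffer;

    public class EnsureCapacitySketch {
        // Allocate a larger buffer and carry over what was already written,
        // instead of overflowing when the size estimate was too small.
        static ByteBuffer ensureCapacity(ByteBuffer buffer, int minCapacity) {
            if (buffer.capacity() >= minCapacity) {
                return buffer;
            }
            ByteBuffer grown = ByteBuffer.allocate(minCapacity);
            buffer.flip();
            grown.put(buffer);
            return grown;
        }

        public static void main(String[] args) {
            ByteBuffer buffer = ByteBuffer.allocate(1024);           // under-estimated size
            buffer.put(new byte[800]);                                // bytes already written
            int nextBatchSize = 3000;                                 // batch to be copied as-is
            buffer = ensureCapacity(buffer, buffer.position() + nextBatchSize);
            System.out.println("capacity now " + buffer.capacity()); // 3800
        }
    }
    ```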
---
 .../kafka/common/record/AbstractRecords.java   |  1 +
 .../kafka/common/record/FileRecordsTest.java   | 36 ++
 2 files changed, 37 insertions(+)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java 
b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
index 89a5413..0552e6b 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -104,6 +104,7 @@ public abstract class AbstractRecords implements Records {
 for (RecordBatchAndRecords recordBatchAndRecords : 
recordBatchAndRecordsList) {
 temporaryMemoryBytes += recordBatchAndRecords.batch.sizeInBytes();
 if (recordBatchAndRecords.batch.magic() <= toMagic) {
+buffer = Utils.ensureCapacity(buffer, buffer.position() + 
recordBatchAndRecords.batch.sizeInBytes());
 recordBatchAndRecords.batch.writeTo(buffer);
 } else {
 MemoryRecordsBuilder builder = convertRecordBatch(toMagic, 
buffer, recordBatchAndRecords);
diff --git 
a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java 
b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index fdd3ede..d1bf3d3 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -35,6 +35,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Random;
 
 import static java.util.Arrays.asList;
 import static org.apache.kafka.test.TestUtils.tempFile;
@@ -359,6 +360,41 @@ public class FileRecordsTest {
 doTestConversion(CompressionType.GZIP, RecordBatch.MAGIC_VALUE_V2);
 }
 
+@Test
+public void testDownconversionAfterMessageFormatDowngrade() throws 
IOException {
+// random bytes
+Random random = new Random();
+byte[] bytes = new byte[3000];
+random.nextBytes(bytes);
+
+// records
+CompressionType compressionType = CompressionType.GZIP;
+List<Long> offsets = asList(0L, 1L);
+List<Byte> magic = asList(RecordBatch.MAGIC_VALUE_V2, RecordBatch.MAGIC_VALUE_V1);  // downgrade message format from v2 to v1
+List<SimpleRecord> records = asList(
+new SimpleRecord(1L, "k1".getBytes(), bytes),
+new SimpleRecord(2L, "k2".getBytes(), bytes));
+byte toMagic = 1;
+
+// create MemoryRecords
+ByteBuffer buffer = ByteBuffer.allocate(8000);
+for (int i = 0; i < records.size(); i++) {
+MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
magic.get(i), compressionType, TimestampType.CREATE_TIME, 0L);
+builder.appendWithOffset(offsets.get(i), records.get(i));
+builder.close();
+}
+buffer.flip();
+
+// create FileRecords, down-convert and verify
+try (FileRecords fileRecords = 

[kafka] branch 2.1 updated: KAFKA-8637: WriteBatch objects leak off-heap memory

2019-07-12 Thread mjsax
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
 new b5ea03e  KAFKA-8637: WriteBatch objects leak off-heap memory
b5ea03e is described below

commit b5ea03e0dee01d610c2d92b1436fd80d9d3dab7b
Author: A. Sophie Blee-Goldman 
AuthorDate: Fri Jul 12 10:19:50 2019 -0700

KAFKA-8637: WriteBatch objects leak off-heap memory

Reviewers: Bill Bejeck , Matthias J. Sax 
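
    A standalone sketch of the underlying issue: `WriteBatch` wraps off-heap (native) memory, so it must be closed explicitly; try-with-resources is the simplest way to guarantee that. This is only an illustration of the rocksdbjni API, not the Streams restore path, which calls `batch.close()` after `segment.write(batch)` as shown below.

    ```java
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.WriteBatch;

    public class WriteBatchCloseSketch {
        public static void main(String[] args) throws RocksDBException {
            RocksDB.loadLibrary();
            // The batch's native memory is released when close() runs, even if
            // writing the batch to the store fails.
            try (WriteBatch batch = new WriteBatch()) {
                batch.put("key".getBytes(), "value".getBytes());
                // db.write(writeOptions, batch) would go here in real code
            }
        }
    }
    ```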

---
 .../apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java | 1 +
 1 file changed, 1 insertion(+)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
index c4fce72..0669446 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
@@ -212,6 +212,7 @@ class RocksDBSegmentedBytesStore implements 
SegmentedBytesStore {
 final Segment segment = entry.getKey();
 final WriteBatch batch = entry.getValue();
 segment.write(batch);
+batch.close();
 }
 } catch (final RocksDBException e) {
 throw new ProcessorStateException("Error restoring batch to store 
" + this.name, e);



[kafka] branch 2.2 updated: KAFKA-8637: WriteBatch objects leak off-heap memory

2019-07-12 Thread mjsax
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
 new ca66c1e  KAFKA-8637: WriteBatch objects leak off-heap memory
ca66c1e is described below

commit ca66c1eef7fff184155d30945cc802d158211e85
Author: A. Sophie Blee-Goldman 
AuthorDate: Fri Jul 12 10:19:50 2019 -0700

KAFKA-8637: WriteBatch objects leak off-heap memory

Reviewers: Bill Bejeck , Matthias J. Sax 

---
 .../apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java | 1 +
 1 file changed, 1 insertion(+)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
index 0ed4e9d..cef8a34 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
@@ -211,6 +211,7 @@ public class RocksDBSegmentedBytesStore implements 
SegmentedBytesStore {
 final KeyValueSegment segment = entry.getKey();
 final WriteBatch batch = entry.getValue();
 segment.write(batch);
+batch.close();
 }
 } catch (final RocksDBException e) {
 throw new ProcessorStateException("Error restoring batch to store 
" + this.name, e);



[kafka] branch 2.3 updated: KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070)

2019-07-12 Thread rhauch
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
 new 813bf6c  KAFKA-7157: Fix handling of nulls in TimestampConverter 
(#7070)
813bf6c is described below

commit 813bf6c31ee84f66f517cfb6f44ca8abce37dcb9
Author: Robert Yokota 
AuthorDate: Fri Jul 12 10:12:20 2019 -0700

KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070)

Fix handling of nulls in TimestampConverter.

Authors: Valeria Vasylieva , Robert Yokota 

Reviewers: Arjun Satish , Randall Hauch 

---
 .../connect/transforms/TimestampConverter.java |  52 +++--
 .../connect/transforms/TimestampConverterTest.java | 233 +++--
 2 files changed, 243 insertions(+), 42 deletions(-)

diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
index 8557441..f32253e 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
@@ -47,7 +47,7 @@ import java.util.Set;
 import java.util.TimeZone;
 
 import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
-import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
+import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull;
 
 public abstract class TimestampConverter<R extends ConnectRecord<R>> implements Transformation<R> {
 
@@ -85,6 +85,10 @@ public abstract class TimestampConverter> implements
 
 private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
 
+public static final Schema OPTIONAL_DATE_SCHEMA = 
org.apache.kafka.connect.data.Date.builder().optional().schema();
+public static final Schema OPTIONAL_TIMESTAMP_SCHEMA = 
Timestamp.builder().optional().schema();
+public static final Schema OPTIONAL_TIME_SCHEMA = 
Time.builder().optional().schema();
+
 private interface TimestampTranslator {
 /**
  * Convert from the type-specific format to the universal 
java.util.Date format
@@ -94,7 +98,7 @@ public abstract class TimestampConverter> implements
 /**
  * Get the schema for this format.
  */
-Schema typeSchema();
+Schema typeSchema(boolean isOptional);
 
 /**
  * Convert from the universal java.util.Date format to the 
type-specific format
@@ -118,8 +122,8 @@ public abstract class TimestampConverter> implements
 }
 
 @Override
-public Schema typeSchema() {
-return Schema.STRING_SCHEMA;
+public Schema typeSchema(boolean isOptional) {
+return isOptional ? Schema.OPTIONAL_STRING_SCHEMA : 
Schema.STRING_SCHEMA;
 }
 
 @Override
@@ -139,8 +143,8 @@ public abstract class TimestampConverter> implements
 }
 
 @Override
-public Schema typeSchema() {
-return Schema.INT64_SCHEMA;
+public Schema typeSchema(boolean isOptional) {
+return isOptional ? Schema.OPTIONAL_INT64_SCHEMA : 
Schema.INT64_SCHEMA;
 }
 
 @Override
@@ -159,8 +163,8 @@ public abstract class TimestampConverter> implements
 }
 
 @Override
-public Schema typeSchema() {
-return org.apache.kafka.connect.data.Date.SCHEMA;
+public Schema typeSchema(boolean isOptional) {
+return isOptional ? OPTIONAL_DATE_SCHEMA : 
org.apache.kafka.connect.data.Date.SCHEMA;
 }
 
 @Override
@@ -185,8 +189,8 @@ public abstract class TimestampConverter> implements
 }
 
 @Override
-public Schema typeSchema() {
-return Time.SCHEMA;
+public Schema typeSchema(boolean isOptional) {
+return isOptional ? OPTIONAL_TIME_SCHEMA : Time.SCHEMA;
 }
 
 @Override
@@ -212,8 +216,8 @@ public abstract class TimestampConverter> implements
 }
 
 @Override
-public Schema typeSchema() {
-return Timestamp.SCHEMA;
+public Schema typeSchema(boolean isOptional) {
+return isOptional ? OPTIONAL_TIMESTAMP_SCHEMA : 
Timestamp.SCHEMA;
 }
 
 @Override
@@ -330,16 +334,16 @@ public abstract class TimestampConverter> implements
 if (config.field.isEmpty()) {
 Object value = operatingValue(record);
 // New schema is determined by the requested target timestamp type
-Schema updatedSchema = TRANSLATORS.get(config.type).typeSchema();
+Schema updatedSchema = 

[kafka] branch trunk updated: KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070)

2019-07-12 Thread rhauch
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new fa042bc  KAFKA-7157: Fix handling of nulls in TimestampConverter 
(#7070)
fa042bc is described below

commit fa042bc491cf8b1e8ae0e5b9f7996564ba886d3d
Author: Robert Yokota 
AuthorDate: Fri Jul 12 10:12:20 2019 -0700

KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070)

Fix handling of nulls in TimestampConverter.

Authors: Valeria Vasylieva , Robert Yokota 

Reviewers: Arjun Satish , Randall Hauch 

---
 .../connect/transforms/TimestampConverter.java |  52 +++--
 .../connect/transforms/TimestampConverterTest.java | 233 +++--
 2 files changed, 243 insertions(+), 42 deletions(-)

diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
index 8557441..f32253e 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
@@ -47,7 +47,7 @@ import java.util.Set;
 import java.util.TimeZone;
 
 import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
-import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
+import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull;
 
 public abstract class TimestampConverter<R extends ConnectRecord<R>> implements Transformation<R> {
 
@@ -85,6 +85,10 @@ public abstract class TimestampConverter> implements
 
 private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
 
+public static final Schema OPTIONAL_DATE_SCHEMA = 
org.apache.kafka.connect.data.Date.builder().optional().schema();
+public static final Schema OPTIONAL_TIMESTAMP_SCHEMA = 
Timestamp.builder().optional().schema();
+public static final Schema OPTIONAL_TIME_SCHEMA = 
Time.builder().optional().schema();
+
 private interface TimestampTranslator {
 /**
  * Convert from the type-specific format to the universal 
java.util.Date format
@@ -94,7 +98,7 @@ public abstract class TimestampConverter> implements
 /**
  * Get the schema for this format.
  */
-Schema typeSchema();
+Schema typeSchema(boolean isOptional);
 
 /**
  * Convert from the universal java.util.Date format to the 
type-specific format
@@ -118,8 +122,8 @@ public abstract class TimestampConverter> implements
 }
 
 @Override
-public Schema typeSchema() {
-return Schema.STRING_SCHEMA;
+public Schema typeSchema(boolean isOptional) {
+return isOptional ? Schema.OPTIONAL_STRING_SCHEMA : 
Schema.STRING_SCHEMA;
 }
 
 @Override
@@ -139,8 +143,8 @@ public abstract class TimestampConverter> implements
 }
 
 @Override
-public Schema typeSchema() {
-return Schema.INT64_SCHEMA;
+public Schema typeSchema(boolean isOptional) {
+return isOptional ? Schema.OPTIONAL_INT64_SCHEMA : 
Schema.INT64_SCHEMA;
 }
 
 @Override
@@ -159,8 +163,8 @@ public abstract class TimestampConverter> implements
 }
 
 @Override
-public Schema typeSchema() {
-return org.apache.kafka.connect.data.Date.SCHEMA;
+public Schema typeSchema(boolean isOptional) {
+return isOptional ? OPTIONAL_DATE_SCHEMA : 
org.apache.kafka.connect.data.Date.SCHEMA;
 }
 
 @Override
@@ -185,8 +189,8 @@ public abstract class TimestampConverter> implements
 }
 
 @Override
-public Schema typeSchema() {
-return Time.SCHEMA;
+public Schema typeSchema(boolean isOptional) {
+return isOptional ? OPTIONAL_TIME_SCHEMA : Time.SCHEMA;
 }
 
 @Override
@@ -212,8 +216,8 @@ public abstract class TimestampConverter> implements
 }
 
 @Override
-public Schema typeSchema() {
-return Timestamp.SCHEMA;
+public Schema typeSchema(boolean isOptional) {
+return isOptional ? OPTIONAL_TIMESTAMP_SCHEMA : 
Timestamp.SCHEMA;
 }
 
 @Override
@@ -330,16 +334,16 @@ public abstract class TimestampConverter> implements
 if (config.field.isEmpty()) {
 Object value = operatingValue(record);
 // New schema is determined by the requested target timestamp type
-Schema updatedSchema = TRANSLATORS.get(config.type).typeSchema();
+Schema updatedSchema = 
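
For context, a minimal, self-contained sketch (not part of this commit) of what the fix means downstream: a record whose value schema is an optional Timestamp and whose value is null should now pass through the TimestampConverter SMT instead of failing, and the target schema comes out marked optional. The snippet assumes the standard SMT configuration keys "target.type" and "format".

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.TimestampConverter;

public class TimestampConverterNullSketch {
    public static void main(String[] args) {
        TimestampConverter.Value<SourceRecord> xform = new TimestampConverter.Value<>();
        Map<String, String> config = new HashMap<>();
        config.put("target.type", "string");
        config.put("format", "yyyy-MM-dd");
        xform.configure(config);

        // Whole-value mode: an optional Timestamp schema carrying a null value.
        Schema optionalTimestamp = Timestamp.builder().optional().build();
        SourceRecord record = new SourceRecord(null, null, "topic", 0, optionalTimestamp, null);

        SourceRecord transformed = xform.apply(record);
        System.out.println(transformed.valueSchema().isOptional()); // expected: true
        System.out.println(transformed.value());                    // expected: null

        xform.close();
    }
}

Before this change the same apply() call would fail on the null value; with it, nulls propagate and the returned schema is the optional variant chosen by typeSchema(boolean).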

[kafka] branch 1.0 updated: KAFKA-6605: Fix NPE in Flatten when optional Struct is null (#5705)

2019-07-12 Thread rhauch
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
 new c6ffe50  KAFKA-6605: Fix NPE in Flatten when optional Struct is null 
(#5705)
c6ffe50 is described below

commit c6ffe50c6b857c9685c39808421fe729bdde4d92
Author: Michał Borowiecki 
AuthorDate: Fri Jul 12 16:27:33 2019 +0100

KAFKA-6605: Fix NPE in Flatten when optional Struct is null (#5705)

Correct the Flatten SMT to properly handle null key or value `Struct` 
instances.

Author: Michal Borowiecki 
Reviewers: Arjun Satish , Robert Yokota 
, Randall Hauch 
---
 .../apache/kafka/connect/transforms/Flatten.java   | 29 ++--
 .../kafka/connect/transforms/FlattenTest.java  | 40 ++
 2 files changed, 58 insertions(+), 11 deletions(-)

diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
index c5e4000..d7d2144 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
@@ -35,7 +35,7 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 
 import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
-import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
+import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull;
 
 public abstract class Flatten<R extends ConnectRecord<R>> implements Transformation<R> {
 
@@ -136,20 +136,24 @@ public abstract class Flatten> 
implements Transformat
 }
 
 private R applyWithSchema(R record) {
-final Struct value = requireStruct(operatingValue(record), PURPOSE);
+final Struct value = requireStructOrNull(operatingValue(record), 
PURPOSE);
 
-Schema updatedSchema = schemaUpdateCache.get(value.schema());
+Schema schema = operatingSchema(record);
+Schema updatedSchema = schemaUpdateCache.get(schema);
 if (updatedSchema == null) {
-final SchemaBuilder builder = 
SchemaUtil.copySchemaBasics(value.schema(), SchemaBuilder.struct());
-Struct defaultValue = (Struct) value.schema().defaultValue();
-buildUpdatedSchema(value.schema(), "", builder, 
value.schema().isOptional(), defaultValue);
+final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, 
SchemaBuilder.struct());
+Struct defaultValue = (Struct) schema.defaultValue();
+buildUpdatedSchema(schema, "", builder, schema.isOptional(), 
defaultValue);
 updatedSchema = builder.build();
-schemaUpdateCache.put(value.schema(), updatedSchema);
+schemaUpdateCache.put(schema, updatedSchema);
+}
+if (value == null) {
+return newRecord(record, updatedSchema, null);
+} else {
+final Struct updatedValue = new Struct(updatedSchema);
+buildWithSchema(value, "", updatedValue);
+return newRecord(record, updatedSchema, updatedValue);
 }
-
-final Struct updatedValue = new Struct(updatedSchema);
-buildWithSchema(value, "", updatedValue);
-return newRecord(record, updatedSchema, updatedValue);
 }
 
 /**
@@ -216,6 +220,9 @@ public abstract class Flatten> 
implements Transformat
 }
 
 private void buildWithSchema(Struct record, String fieldNamePrefix, Struct 
newRecord) {
+if (record == null) {
+return;
+}
 for (Field field : record.schema().fields()) {
 final String fieldName = fieldName(fieldNamePrefix, field.name());
 switch (field.schema().type()) {
diff --git 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
index d709054..430bba6 100644
--- 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
+++ 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
@@ -182,6 +182,46 @@ public class FlattenTest {
 }
 
 @Test
+public void testOptionalStruct() {
+xformValue.configure(Collections.emptyMap());
+
+SchemaBuilder builder = SchemaBuilder.struct().optional();
+builder.field("opt_int32", Schema.OPTIONAL_INT32_SCHEMA);
+Schema schema = builder.build();
+
+SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null,
+"topic", 0,
+schema, null));
+
+assertEquals(Schema.Type.STRUCT, transformed.valueSchema().type());
+assertNull(transformed.value());
+}
+
+@Test
+public void testOptionalNestedStruct() {
+
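
As a quick illustration (not part of the commit), the behaviour the new testOptionalStruct test checks can be reproduced with a standalone snippet: an optional struct schema with a null value now flows through Flatten and comes out as a null value under the flattened schema.

import java.util.Collections;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.Flatten;

public class FlattenNullStructSketch {
    public static void main(String[] args) {
        Flatten.Value<SourceRecord> flatten = new Flatten.Value<>();
        flatten.configure(Collections.emptyMap());

        // Optional struct schema whose value is null; before this fix the transform
        // dereferenced the value and threw a NullPointerException.
        Schema schema = SchemaBuilder.struct().optional()
                .field("opt_int32", Schema.OPTIONAL_INT32_SCHEMA)
                .build();
        SourceRecord record = new SourceRecord(null, null, "topic", 0, schema, null);

        SourceRecord transformed = flatten.apply(record);
        System.out.println(transformed.valueSchema().type()); // expected: STRUCT
        System.out.println(transformed.value());              // expected: null

        flatten.close();
    }
}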

[kafka] branch 2.0 updated: KAFKA-6605: Fix NPE in Flatten when optional Struct is null (#5705)

2019-07-12 Thread rhauch
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
 new 842dacd  KAFKA-6605: Fix NPE in Flatten when optional Struct is null 
(#5705)
842dacd is described below

commit 842dacda8c12b89546e2666fed5a6e6a513093c4
Author: Michał Borowiecki 
AuthorDate: Fri Jul 12 16:27:33 2019 +0100

KAFKA-6605: Fix NPE in Flatten when optional Struct is null (#5705)

Correct the Flatten SMT to properly handle null key or value `Struct` 
instances.

Author: Michal Borowiecki 
Reviewers: Arjun Satish , Robert Yokota 
, Randall Hauch 
---
 .../apache/kafka/connect/transforms/Flatten.java   | 29 ++--
 .../kafka/connect/transforms/FlattenTest.java  | 40 ++
 2 files changed, 58 insertions(+), 11 deletions(-)

diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
index c5e4000..d7d2144 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
@@ -35,7 +35,7 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 
 import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
-import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
+import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull;
 
 public abstract class Flatten<R extends ConnectRecord<R>> implements Transformation<R> {
 
@@ -136,20 +136,24 @@ public abstract class Flatten> 
implements Transformat
 }
 
 private R applyWithSchema(R record) {
-final Struct value = requireStruct(operatingValue(record), PURPOSE);
+final Struct value = requireStructOrNull(operatingValue(record), 
PURPOSE);
 
-Schema updatedSchema = schemaUpdateCache.get(value.schema());
+Schema schema = operatingSchema(record);
+Schema updatedSchema = schemaUpdateCache.get(schema);
 if (updatedSchema == null) {
-final SchemaBuilder builder = 
SchemaUtil.copySchemaBasics(value.schema(), SchemaBuilder.struct());
-Struct defaultValue = (Struct) value.schema().defaultValue();
-buildUpdatedSchema(value.schema(), "", builder, 
value.schema().isOptional(), defaultValue);
+final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, 
SchemaBuilder.struct());
+Struct defaultValue = (Struct) schema.defaultValue();
+buildUpdatedSchema(schema, "", builder, schema.isOptional(), 
defaultValue);
 updatedSchema = builder.build();
-schemaUpdateCache.put(value.schema(), updatedSchema);
+schemaUpdateCache.put(schema, updatedSchema);
+}
+if (value == null) {
+return newRecord(record, updatedSchema, null);
+} else {
+final Struct updatedValue = new Struct(updatedSchema);
+buildWithSchema(value, "", updatedValue);
+return newRecord(record, updatedSchema, updatedValue);
 }
-
-final Struct updatedValue = new Struct(updatedSchema);
-buildWithSchema(value, "", updatedValue);
-return newRecord(record, updatedSchema, updatedValue);
 }
 
 /**
@@ -216,6 +220,9 @@ public abstract class Flatten> 
implements Transformat
 }
 
 private void buildWithSchema(Struct record, String fieldNamePrefix, Struct 
newRecord) {
+if (record == null) {
+return;
+}
 for (Field field : record.schema().fields()) {
 final String fieldName = fieldName(fieldNamePrefix, field.name());
 switch (field.schema().type()) {
diff --git 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
index d709054..430bba6 100644
--- 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
+++ 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
@@ -182,6 +182,46 @@ public class FlattenTest {
 }
 
 @Test
+public void testOptionalStruct() {
+xformValue.configure(Collections.emptyMap());
+
+SchemaBuilder builder = SchemaBuilder.struct().optional();
+builder.field("opt_int32", Schema.OPTIONAL_INT32_SCHEMA);
+Schema schema = builder.build();
+
+SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null,
+"topic", 0,
+schema, null));
+
+assertEquals(Schema.Type.STRUCT, transformed.valueSchema().type());
+assertNull(transformed.value());
+}
+
+@Test
+public void testOptionalNestedStruct() {
+

[kafka] branch 2.1 updated: KAFKA-6605: Fix NPE in Flatten when optional Struct is null (#5705)

2019-07-12 Thread rhauch
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
 new c651486  KAFKA-6605: Fix NPE in Flatten when optional Struct is null 
(#5705)
c651486 is described below

commit c6514869b263fd0d8e8db0b74cbc21859bcbe7a6
Author: Michał Borowiecki 
AuthorDate: Fri Jul 12 16:27:33 2019 +0100

KAFKA-6605: Fix NPE in Flatten when optional Struct is null (#5705)

Correct the Flatten SMT to properly handle null key or value `Struct` 
instances.

Author: Michal Borowiecki 
Reviewers: Arjun Satish , Robert Yokota 
, Randall Hauch 
---
 .../apache/kafka/connect/transforms/Flatten.java   | 29 ++--
 .../kafka/connect/transforms/FlattenTest.java  | 40 ++
 2 files changed, 58 insertions(+), 11 deletions(-)

diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
index c5e4000..d7d2144 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
@@ -35,7 +35,7 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 
 import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
-import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
+import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull;
 
 public abstract class Flatten<R extends ConnectRecord<R>> implements Transformation<R> {
 
@@ -136,20 +136,24 @@ public abstract class Flatten> 
implements Transformat
 }
 
 private R applyWithSchema(R record) {
-final Struct value = requireStruct(operatingValue(record), PURPOSE);
+final Struct value = requireStructOrNull(operatingValue(record), 
PURPOSE);
 
-Schema updatedSchema = schemaUpdateCache.get(value.schema());
+Schema schema = operatingSchema(record);
+Schema updatedSchema = schemaUpdateCache.get(schema);
 if (updatedSchema == null) {
-final SchemaBuilder builder = 
SchemaUtil.copySchemaBasics(value.schema(), SchemaBuilder.struct());
-Struct defaultValue = (Struct) value.schema().defaultValue();
-buildUpdatedSchema(value.schema(), "", builder, 
value.schema().isOptional(), defaultValue);
+final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, 
SchemaBuilder.struct());
+Struct defaultValue = (Struct) schema.defaultValue();
+buildUpdatedSchema(schema, "", builder, schema.isOptional(), 
defaultValue);
 updatedSchema = builder.build();
-schemaUpdateCache.put(value.schema(), updatedSchema);
+schemaUpdateCache.put(schema, updatedSchema);
+}
+if (value == null) {
+return newRecord(record, updatedSchema, null);
+} else {
+final Struct updatedValue = new Struct(updatedSchema);
+buildWithSchema(value, "", updatedValue);
+return newRecord(record, updatedSchema, updatedValue);
 }
-
-final Struct updatedValue = new Struct(updatedSchema);
-buildWithSchema(value, "", updatedValue);
-return newRecord(record, updatedSchema, updatedValue);
 }
 
 /**
@@ -216,6 +220,9 @@ public abstract class Flatten> 
implements Transformat
 }
 
 private void buildWithSchema(Struct record, String fieldNamePrefix, Struct 
newRecord) {
+if (record == null) {
+return;
+}
 for (Field field : record.schema().fields()) {
 final String fieldName = fieldName(fieldNamePrefix, field.name());
 switch (field.schema().type()) {
diff --git 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
index d709054..430bba6 100644
--- 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
+++ 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
@@ -182,6 +182,46 @@ public class FlattenTest {
 }
 
 @Test
+public void testOptionalStruct() {
+xformValue.configure(Collections.emptyMap());
+
+SchemaBuilder builder = SchemaBuilder.struct().optional();
+builder.field("opt_int32", Schema.OPTIONAL_INT32_SCHEMA);
+Schema schema = builder.build();
+
+SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null,
+"topic", 0,
+schema, null));
+
+assertEquals(Schema.Type.STRUCT, transformed.valueSchema().type());
+assertNull(transformed.value());
+}
+
+@Test
+public void testOptionalNestedStruct() {
+

[kafka] branch 2.2 updated: KAFKA-6605: Fix NPE in Flatten when optional Struct is null (#5705)

2019-07-12 Thread rhauch
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
 new 1fa3e97  KAFKA-6605: Fix NPE in Flatten when optional Struct is null 
(#5705)
1fa3e97 is described below

commit 1fa3e979237f5cede646a07651e07beaa2ad9376
Author: Michał Borowiecki 
AuthorDate: Fri Jul 12 16:27:33 2019 +0100

KAFKA-6605: Fix NPE in Flatten when optional Struct is null (#5705)

Correct the Flatten SMT to properly handle null key or value `Struct` 
instances.

Author: Michal Borowiecki 
Reviewers: Arjun Satish , Robert Yokota 
, Randall Hauch 
---
 .../apache/kafka/connect/transforms/Flatten.java   | 29 ++--
 .../kafka/connect/transforms/FlattenTest.java  | 40 ++
 2 files changed, 58 insertions(+), 11 deletions(-)

diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
index c5e4000..d7d2144 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
@@ -35,7 +35,7 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 
 import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
-import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
+import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull;
 
 public abstract class Flatten<R extends ConnectRecord<R>> implements Transformation<R> {
 
@@ -136,20 +136,24 @@ public abstract class Flatten> 
implements Transformat
 }
 
 private R applyWithSchema(R record) {
-final Struct value = requireStruct(operatingValue(record), PURPOSE);
+final Struct value = requireStructOrNull(operatingValue(record), 
PURPOSE);
 
-Schema updatedSchema = schemaUpdateCache.get(value.schema());
+Schema schema = operatingSchema(record);
+Schema updatedSchema = schemaUpdateCache.get(schema);
 if (updatedSchema == null) {
-final SchemaBuilder builder = 
SchemaUtil.copySchemaBasics(value.schema(), SchemaBuilder.struct());
-Struct defaultValue = (Struct) value.schema().defaultValue();
-buildUpdatedSchema(value.schema(), "", builder, 
value.schema().isOptional(), defaultValue);
+final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, 
SchemaBuilder.struct());
+Struct defaultValue = (Struct) schema.defaultValue();
+buildUpdatedSchema(schema, "", builder, schema.isOptional(), 
defaultValue);
 updatedSchema = builder.build();
-schemaUpdateCache.put(value.schema(), updatedSchema);
+schemaUpdateCache.put(schema, updatedSchema);
+}
+if (value == null) {
+return newRecord(record, updatedSchema, null);
+} else {
+final Struct updatedValue = new Struct(updatedSchema);
+buildWithSchema(value, "", updatedValue);
+return newRecord(record, updatedSchema, updatedValue);
 }
-
-final Struct updatedValue = new Struct(updatedSchema);
-buildWithSchema(value, "", updatedValue);
-return newRecord(record, updatedSchema, updatedValue);
 }
 
 /**
@@ -216,6 +220,9 @@ public abstract class Flatten> 
implements Transformat
 }
 
 private void buildWithSchema(Struct record, String fieldNamePrefix, Struct 
newRecord) {
+if (record == null) {
+return;
+}
 for (Field field : record.schema().fields()) {
 final String fieldName = fieldName(fieldNamePrefix, field.name());
 switch (field.schema().type()) {
diff --git 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
index b0549fb..2e7be95 100644
--- 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
+++ 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
@@ -183,6 +183,46 @@ public class FlattenTest {
 }
 
 @Test
+public void testOptionalStruct() {
+xformValue.configure(Collections.emptyMap());
+
+SchemaBuilder builder = SchemaBuilder.struct().optional();
+builder.field("opt_int32", Schema.OPTIONAL_INT32_SCHEMA);
+Schema schema = builder.build();
+
+SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null,
+"topic", 0,
+schema, null));
+
+assertEquals(Schema.Type.STRUCT, transformed.valueSchema().type());
+assertNull(transformed.value());
+}
+
+@Test
+public void testOptionalNestedStruct() {
+

[kafka] branch 2.3 updated: KAFKA-6605: Fix NPE in Flatten when optional Struct is null (#5705)

2019-07-12 Thread rhauch
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
 new 2400f72  KAFKA-6605: Fix NPE in Flatten when optional Struct is null 
(#5705)
2400f72 is described below

commit 2400f729a1069393503b8657cb69ed524c466ff4
Author: Michał Borowiecki 
AuthorDate: Fri Jul 12 16:27:33 2019 +0100

KAFKA-6605: Fix NPE in Flatten when optional Struct is null (#5705)

Correct the Flatten SMT to properly handle null key or value `Struct` 
instances.

Author: Michal Borowiecki 
Reviewers: Arjun Satish , Robert Yokota 
, Randall Hauch 
---
 .../apache/kafka/connect/transforms/Flatten.java   | 29 ++--
 .../kafka/connect/transforms/FlattenTest.java  | 40 ++
 2 files changed, 58 insertions(+), 11 deletions(-)

diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
index c5e4000..d7d2144 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
@@ -35,7 +35,7 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 
 import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
-import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
+import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull;
 
 public abstract class Flatten<R extends ConnectRecord<R>> implements Transformation<R> {
 
@@ -136,20 +136,24 @@ public abstract class Flatten> 
implements Transformat
 }
 
 private R applyWithSchema(R record) {
-final Struct value = requireStruct(operatingValue(record), PURPOSE);
+final Struct value = requireStructOrNull(operatingValue(record), 
PURPOSE);
 
-Schema updatedSchema = schemaUpdateCache.get(value.schema());
+Schema schema = operatingSchema(record);
+Schema updatedSchema = schemaUpdateCache.get(schema);
 if (updatedSchema == null) {
-final SchemaBuilder builder = 
SchemaUtil.copySchemaBasics(value.schema(), SchemaBuilder.struct());
-Struct defaultValue = (Struct) value.schema().defaultValue();
-buildUpdatedSchema(value.schema(), "", builder, 
value.schema().isOptional(), defaultValue);
+final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, 
SchemaBuilder.struct());
+Struct defaultValue = (Struct) schema.defaultValue();
+buildUpdatedSchema(schema, "", builder, schema.isOptional(), 
defaultValue);
 updatedSchema = builder.build();
-schemaUpdateCache.put(value.schema(), updatedSchema);
+schemaUpdateCache.put(schema, updatedSchema);
+}
+if (value == null) {
+return newRecord(record, updatedSchema, null);
+} else {
+final Struct updatedValue = new Struct(updatedSchema);
+buildWithSchema(value, "", updatedValue);
+return newRecord(record, updatedSchema, updatedValue);
 }
-
-final Struct updatedValue = new Struct(updatedSchema);
-buildWithSchema(value, "", updatedValue);
-return newRecord(record, updatedSchema, updatedValue);
 }
 
 /**
@@ -216,6 +220,9 @@ public abstract class Flatten> 
implements Transformat
 }
 
 private void buildWithSchema(Struct record, String fieldNamePrefix, Struct 
newRecord) {
+if (record == null) {
+return;
+}
 for (Field field : record.schema().fields()) {
 final String fieldName = fieldName(fieldNamePrefix, field.name());
 switch (field.schema().type()) {
diff --git 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
index b0549fb..2e7be95 100644
--- 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
+++ 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
@@ -183,6 +183,46 @@ public class FlattenTest {
 }
 
 @Test
+public void testOptionalStruct() {
+xformValue.configure(Collections.emptyMap());
+
+SchemaBuilder builder = SchemaBuilder.struct().optional();
+builder.field("opt_int32", Schema.OPTIONAL_INT32_SCHEMA);
+Schema schema = builder.build();
+
+SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null,
+"topic", 0,
+schema, null));
+
+assertEquals(Schema.Type.STRUCT, transformed.valueSchema().type());
+assertNull(transformed.value());
+}
+
+@Test
+public void testOptionalNestedStruct() {
+

[kafka] branch trunk updated: KAFKA-6605: Fix NPE in Flatten when optional Struct is null (#5705)

2019-07-12 Thread rhauch
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new fc4fea6  KAFKA-6605: Fix NPE in Flatten when optional Struct is null 
(#5705)
fc4fea6 is described below

commit fc4fea6761986749f0ac640868d9b4d2a552eb62
Author: Michał Borowiecki 
AuthorDate: Fri Jul 12 16:27:33 2019 +0100

KAFKA-6605: Fix NPE in Flatten when optional Struct is null (#5705)

Correct the Flatten SMT to properly handle null key or value `Struct` 
instances.

Author: Michal Borowiecki 
Reviewers: Arjun Satish , Robert Yokota 
, Randall Hauch 
---
 .../apache/kafka/connect/transforms/Flatten.java   | 29 ++--
 .../kafka/connect/transforms/FlattenTest.java  | 40 ++
 2 files changed, 58 insertions(+), 11 deletions(-)

diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
index c5e4000..d7d2144 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
@@ -35,7 +35,7 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 
 import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
-import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
+import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull;
 
 public abstract class Flatten<R extends ConnectRecord<R>> implements Transformation<R> {
 
@@ -136,20 +136,24 @@ public abstract class Flatten> 
implements Transformat
 }
 
 private R applyWithSchema(R record) {
-final Struct value = requireStruct(operatingValue(record), PURPOSE);
+final Struct value = requireStructOrNull(operatingValue(record), 
PURPOSE);
 
-Schema updatedSchema = schemaUpdateCache.get(value.schema());
+Schema schema = operatingSchema(record);
+Schema updatedSchema = schemaUpdateCache.get(schema);
 if (updatedSchema == null) {
-final SchemaBuilder builder = 
SchemaUtil.copySchemaBasics(value.schema(), SchemaBuilder.struct());
-Struct defaultValue = (Struct) value.schema().defaultValue();
-buildUpdatedSchema(value.schema(), "", builder, 
value.schema().isOptional(), defaultValue);
+final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, 
SchemaBuilder.struct());
+Struct defaultValue = (Struct) schema.defaultValue();
+buildUpdatedSchema(schema, "", builder, schema.isOptional(), 
defaultValue);
 updatedSchema = builder.build();
-schemaUpdateCache.put(value.schema(), updatedSchema);
+schemaUpdateCache.put(schema, updatedSchema);
+}
+if (value == null) {
+return newRecord(record, updatedSchema, null);
+} else {
+final Struct updatedValue = new Struct(updatedSchema);
+buildWithSchema(value, "", updatedValue);
+return newRecord(record, updatedSchema, updatedValue);
 }
-
-final Struct updatedValue = new Struct(updatedSchema);
-buildWithSchema(value, "", updatedValue);
-return newRecord(record, updatedSchema, updatedValue);
 }
 
 /**
@@ -216,6 +220,9 @@ public abstract class Flatten> 
implements Transformat
 }
 
 private void buildWithSchema(Struct record, String fieldNamePrefix, Struct 
newRecord) {
+if (record == null) {
+return;
+}
 for (Field field : record.schema().fields()) {
 final String fieldName = fieldName(fieldNamePrefix, field.name());
 switch (field.schema().type()) {
diff --git 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
index b0549fb..2e7be95 100644
--- 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
+++ 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
@@ -183,6 +183,46 @@ public class FlattenTest {
 }
 
 @Test
+public void testOptionalStruct() {
+xformValue.configure(Collections.emptyMap());
+
+SchemaBuilder builder = SchemaBuilder.struct().optional();
+builder.field("opt_int32", Schema.OPTIONAL_INT32_SCHEMA);
+Schema schema = builder.build();
+
+SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null,
+"topic", 0,
+schema, null));
+
+assertEquals(Schema.Type.STRUCT, transformed.valueSchema().type());
+assertNull(transformed.value());
+}
+
+@Test
+public void testOptionalNestedStruct() {
+ 

[kafka] branch 1.1 updated: Fixes #8198 KStreams testing docs use non-existent method pipe (#6678)

2019-07-12 Thread bbejeck
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
 new cf3f9ac  Fixes #8198 KStreams testing docs use non-existent method 
pipe (#6678)
cf3f9ac is described below

commit cf3f9ac4ea1bc40f014ae93a6dc197f60834ff66
Author: slim 
AuthorDate: Fri Jul 12 18:06:05 2019 +0300

Fixes #8198 KStreams testing docs use non-existent method pipe (#6678)

Minor fix of #8198 apache/kafka-site#210

Reviewers: John Roesler , Bill Bejeck 
---
 docs/streams/developer-guide/testing.html | 14 --
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/docs/streams/developer-guide/testing.html 
b/docs/streams/developer-guide/testing.html
index e6886a1..c3dc0a8 100644
--- a/docs/streams/developer-guide/testing.html
+++ b/docs/streams/developer-guide/testing.html
@@ -76,12 +76,14 @@ TopologyTestDriver testDriver = new 
TopologyTestDriver(topology, config);
 
 
 ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new IntegerSerializer());
-testDriver.pipe(factory.create("key", 42L));
-
-
-To verify the output, the test driver produces 
ProducerRecords with key and value type byte[].
-For result verification, you can specify corresponding deserializers when 
reading the output record from the driver.
-
+testDriver.pipeInput(factory.create("key", 42L));
+
+
+To verify the output, the test driver produces 
ProducerRecords with key and value type
+byte[].
+For result verification, you can specify corresponding 
deserializers when reading the output record from
+the driver.
+
 ProducerRecord<String, Integer> outputRecord = testDriver.readOutput("output-topic", new StringDeserializer(), new LongDeserializer());
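
For readers following the corrected snippet, a complete, compilable version of the pattern the docs describe looks roughly like the sketch below. The pass-through topology, application id, and bootstrap address are illustrative assumptions, not part of the docs change.

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

public class PipeInputSketch {
    public static void main(String[] args) {
        // Assumed topology: copy everything from input-topic to output-topic.
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "pipe-input-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());

        TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), props);
        try {
            ConsumerRecordFactory<String, Integer> factory =
                new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new IntegerSerializer());
            // pipeInput, not pipe, is the method that feeds records into the driver.
            testDriver.pipeInput(factory.create("key", 42));

            ProducerRecord<String, Integer> output =
                testDriver.readOutput("output-topic", new StringDeserializer(), new IntegerDeserializer());
            System.out.println(output.key() + " -> " + output.value()); // key -> 42
        } finally {
            testDriver.close();
        }
    }
}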
 
 



[kafka] branch 2.0 updated: Fixes #8198 KStreams testing docs use non-existent method pipe (#6678)

2019-07-12 Thread bbejeck
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
 new 8cc43fa  Fixes #8198 KStreams testing docs use non-existent method 
pipe (#6678)
8cc43fa is described below

commit 8cc43fa313a97d929eca61e45a887cf802e4e0ac
Author: slim 
AuthorDate: Fri Jul 12 18:06:05 2019 +0300

Fixes #8198 KStreams testing docs use non-existent method pipe (#6678)

Minor fix of #8198 apache/kafka-site#210

Reviewers: John Roesler , Bill Bejeck 
---
 docs/streams/developer-guide/testing.html | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/streams/developer-guide/testing.html 
b/docs/streams/developer-guide/testing.html
index bdecc43..9ede7ea 100644
--- a/docs/streams/developer-guide/testing.html
+++ b/docs/streams/developer-guide/testing.html
@@ -99,7 +99,7 @@ TopologyTestDriver testDriver = new 
TopologyTestDriver(topology, props);
 
 
 ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new IntegerSerializer());
-testDriver.pipe(factory.create("key", 42L));
+testDriver.pipeInput(factory.create("key", 42L));
 
 
 To verify the output, the test driver produces 
ProducerRecords with key and value type



[kafka] branch 2.1 updated: Fixes #8198 KStreams testing docs use non-existent method pipe (#6678)

2019-07-12 Thread bbejeck
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
 new ea8baf5  Fixes #8198 KStreams testing docs use non-existent method 
pipe (#6678)
ea8baf5 is described below

commit ea8baf54fd2eb693397d0670ea475a5e07396eff
Author: slim 
AuthorDate: Fri Jul 12 18:06:05 2019 +0300

Fixes #8198 KStreams testing docs use non-existent method pipe (#6678)

Minor fix of #8198 apache/kafka-site#210

Reviewers: John Roesler , Bill Bejeck 
---
 docs/streams/developer-guide/testing.html | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/streams/developer-guide/testing.html 
b/docs/streams/developer-guide/testing.html
index 026b02b..55a89b0 100644
--- a/docs/streams/developer-guide/testing.html
+++ b/docs/streams/developer-guide/testing.html
@@ -99,7 +99,7 @@ TopologyTestDriver testDriver = new 
TopologyTestDriver(topology, props);
 
 
 ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new IntegerSerializer());
-testDriver.pipe(factory.create("key", 42L));
+testDriver.pipeInput(factory.create("key", 42L));
 
 
 To verify the output, the test driver produces 
ProducerRecords with key and value type



[kafka] branch 2.2 updated: Fixes #8198 KStreams testing docs use non-existent method pipe (#6678)

2019-07-12 Thread bbejeck
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
 new f2ae6e3  Fixes #8198 KStreams testing docs use non-existent method 
pipe (#6678)
f2ae6e3 is described below

commit f2ae6e3ca900d963cad46ee4adfbdc928ee68d68
Author: slim 
AuthorDate: Fri Jul 12 18:06:05 2019 +0300

Fixes #8198 KStreams testing docs use non-existent method pipe (#6678)

Minor fix of #8198 apache/kafka-site#210

Reviewers: John Roesler , Bill Bejeck 
---
 docs/streams/developer-guide/testing.html | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/streams/developer-guide/testing.html 
b/docs/streams/developer-guide/testing.html
index 026b02b..55a89b0 100644
--- a/docs/streams/developer-guide/testing.html
+++ b/docs/streams/developer-guide/testing.html
@@ -99,7 +99,7 @@ TopologyTestDriver testDriver = new 
TopologyTestDriver(topology, props);
 
 
 ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new IntegerSerializer());
-testDriver.pipe(factory.create("key", 42L));
+testDriver.pipeInput(factory.create("key", 42L));
 
 
 To verify the output, the test driver produces 
ProducerRecords with key and value type



[kafka] branch 2.3 updated: Fixes #8198 KStreams testing docs use non-existent method pipe (#6678)

2019-07-12 Thread bbejeck
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
 new 62617b9  Fixes #8198 KStreams testing docs use non-existent method 
pipe (#6678)
62617b9 is described below

commit 62617b93d13215716600aa498f6806978be07c30
Author: slim 
AuthorDate: Fri Jul 12 18:06:05 2019 +0300

Fixes #8198 KStreams testing docs use non-existent method pipe (#6678)

Minor fix of #8198 apache/kafka-site#210

Reviewers: John Roesler , Bill Bejeck 
---
 docs/streams/developer-guide/testing.html | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/streams/developer-guide/testing.html 
b/docs/streams/developer-guide/testing.html
index 026b02b..55a89b0 100644
--- a/docs/streams/developer-guide/testing.html
+++ b/docs/streams/developer-guide/testing.html
@@ -99,7 +99,7 @@ TopologyTestDriver testDriver = new 
TopologyTestDriver(topology, props);
 
 
 ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new IntegerSerializer());
-testDriver.pipe(factory.create("key", 42L));
+testDriver.pipeInput(factory.create("key", 42L));
 
 
 To verify the output, the test driver produces 
ProducerRecords with key and value type



[kafka] branch trunk updated: Fixes #8198 KStreams testing docs use non-existent method pipe (#6678)

2019-07-12 Thread bbejeck
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 8cabd44  Fixes #8198 KStreams testing docs use non-existent method 
pipe (#6678)
8cabd44 is described below

commit 8cabd44e1d35d3cf3f865d8a4eee8c04228e0d75
Author: slim 
AuthorDate: Fri Jul 12 18:06:05 2019 +0300

Fixes #8198 KStreams testing docs use non-existent method pipe (#6678)

Minor fix of #8198 apache/kafka-site#210

Reviewers: John Roesler , Bill Bejeck 
---
 docs/streams/developer-guide/testing.html | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/streams/developer-guide/testing.html 
b/docs/streams/developer-guide/testing.html
index 026b02b..55a89b0 100644
--- a/docs/streams/developer-guide/testing.html
+++ b/docs/streams/developer-guide/testing.html
@@ -99,7 +99,7 @@ TopologyTestDriver testDriver = new 
TopologyTestDriver(topology, props);
 
 
 ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new IntegerSerializer());
-testDriver.pipe(factory.create("key", 42L));
+testDriver.pipeInput(factory.create("key", 42L));
 
 
 To verify the output, the test driver produces 
ProducerRecords with key and value type



[kafka] branch 2.2 updated: KAFKA-5998: fix checkpointableOffsets handling (#7030)

2019-07-12 Thread bbejeck
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
 new db98c6f  KAFKA-5998: fix checkpointableOffsets handling (#7030)
db98c6f is described below

commit db98c6f8cdeb27e7dd858cd37499c66458064e5a
Author: John Roesler 
AuthorDate: Fri Jul 12 08:42:11 2019 -0500

KAFKA-5998: fix checkpointableOffsets handling (#7030)

fix checkpoint file warning by filtering checkpointable offsets per task
clean up state manager hierarchy to prevent similar bugs

Reviewers: Bruno Cadonna , Bill Bejeck 

---
 .../internals/GlobalStateManagerImpl.java  |  68 ---
 .../processor/internals/ProcessorStateManager.java | 203 ++---
 ...ractStateManager.java => StateManagerUtil.java} |  41 ++---
 .../streams/processor/internals/StreamTask.java|  11 +-
 .../streams/processor/internals/TaskManager.java   |   2 +-
 .../streams/state/internals/OffsetCheckpoint.java  |   5 +
 .../internals/GlobalStateManagerImplTest.java  |   8 +-
 .../processor/internals/MockChangelogReader.java   |   7 +-
 .../internals/ProcessorStateManagerTest.java   | 134 +-
 .../processor/internals/StandbyTaskTest.java   |   4 +-
 .../processor/internals/StreamTaskTest.java|   6 +-
 .../processor/internals/StreamThreadTest.java  |   2 +-
 .../processor/internals/TaskManagerTest.java   |   6 +-
 13 files changed, 356 insertions(+), 141 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index f224e1e..6a3959f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -23,6 +23,7 @@ import 
org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.FixedOrderMap;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
@@ -32,6 +33,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.streams.state.internals.RecordConverter;
 import org.slf4j.Logger;
 
@@ -48,22 +50,30 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
+import static 
org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
+import static 
org.apache.kafka.streams.processor.internals.StateManagerUtil.converterForStore;
+
 /**
  * This class is responsible for the initialization, restoration, closing, 
flushing etc
  * of Global State Stores. There is only ever 1 instance of this class per 
Application Instance.
  */
-public class GlobalStateManagerImpl extends AbstractStateManager implements 
GlobalStateManager {
+public class GlobalStateManagerImpl implements GlobalStateManager {
 private final Logger log;
+private final boolean eosEnabled;
 private final ProcessorTopology topology;
 private final Consumer<byte[], byte[]> globalConsumer;
+private final File baseDir;
 private final StateDirectory stateDirectory;
 private final Set globalStoreNames = new HashSet<>();
+private final FixedOrderMap<String, Optional<StateStore>> globalStores = new FixedOrderMap<>();
 private final StateRestoreListener stateRestoreListener;
-private InternalProcessorContext processorContext;
+private InternalProcessorContext globalProcessorContext;
 private final int retries;
 private final long retryBackoffMs;
 private final Duration pollTime;
 private final Set globalNonPersistentStoresTopics = new 
HashSet<>();
+private final OffsetCheckpoint checkpointFile;
+private final Map<TopicPartition, Long> checkpointFileCache;
 
 public GlobalStateManagerImpl(final LogContext logContext,
   final ProcessorTopology topology,
@@ -71,7 +81,10 @@ public class GlobalStateManagerImpl extends 
AbstractStateManager implements Glob
   final StateDirectory stateDirectory,
   final StateRestoreListener 
stateRestoreListener,
   final StreamsConfig config) {
-super(stateDirectory.globalStateDir(), 
StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)));
+   
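
The "filtering checkpointable offsets per task" part of this change is easiest to picture with a toy sketch. This is illustrative only and uses none of the internal Streams classes touched by the commit: given all checkpointable offsets known to a thread, a task should only write the offsets for its own partitions to its checkpoint file, so the warning about unexpected partitions in the checkpoint goes away.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;

public class PerTaskCheckpointFilterSketch {
    // Keep only the offsets for partitions that belong to the given task,
    // so one task's checkpoint never records another task's changelogs.
    static Map<TopicPartition, Long> filterForTask(Map<TopicPartition, Long> allOffsets,
                                                   Set<TopicPartition> taskPartitions) {
        Map<TopicPartition, Long> filtered = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> entry : allOffsets.entrySet()) {
            if (taskPartitions.contains(entry.getKey())) {
                filtered.put(entry.getKey(), entry.getValue());
            }
        }
        return filtered;
    }

    public static void main(String[] args) {
        Map<TopicPartition, Long> all = new HashMap<>();
        all.put(new TopicPartition("app-store-changelog", 0), 42L);
        all.put(new TopicPartition("app-store-changelog", 1), 17L);

        Set<TopicPartition> task0Partitions = new HashSet<>();
        task0Partitions.add(new TopicPartition("app-store-changelog", 0));

        // Only partition 0's offset remains for task 0.
        System.out.println(filterForTask(all, task0Partitions));
    }
}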

[kafka] branch 2.3 updated: KAFKA-5998: fix checkpointableOffsets handling (#7030)

2019-07-12 Thread bbejeck
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
 new 1052d87  KAFKA-5998: fix checkpointableOffsets handling (#7030)
1052d87 is described below

commit 1052d87d37bdc0bab085de2bcf8f49a344dfcdd5
Author: John Roesler 
AuthorDate: Fri Jul 12 08:42:11 2019 -0500

KAFKA-5998: fix checkpointableOffsets handling (#7030)

fix checkpoint file warning by filtering checkpointable offsets per task
clean up state manager hierarchy to prevent similar bugs

Reviewers: Bruno Cadonna , Bill Bejeck 

---
 .../internals/GlobalStateManagerImpl.java  |  68 ---
 .../processor/internals/ProcessorStateManager.java | 203 ++---
 ...ractStateManager.java => StateManagerUtil.java} |  41 ++---
 .../streams/processor/internals/StreamTask.java|  11 +-
 .../streams/processor/internals/TaskManager.java   |   2 +-
 .../streams/state/internals/OffsetCheckpoint.java  |   5 +
 .../internals/GlobalStateManagerImplTest.java  |   8 +-
 .../processor/internals/MockChangelogReader.java   |   7 +-
 .../internals/ProcessorStateManagerTest.java   | 134 +-
 .../processor/internals/StandbyTaskTest.java   |   4 +-
 .../processor/internals/StreamTaskTest.java|   6 +-
 .../processor/internals/StreamThreadTest.java  |   2 +-
 .../processor/internals/TaskManagerTest.java   |   6 +-
 13 files changed, 356 insertions(+), 141 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index f224e1e..6a3959f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -23,6 +23,7 @@ import 
org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.FixedOrderMap;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
@@ -32,6 +33,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.streams.state.internals.RecordConverter;
 import org.slf4j.Logger;
 
@@ -48,22 +50,30 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
+import static 
org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
+import static 
org.apache.kafka.streams.processor.internals.StateManagerUtil.converterForStore;
+
 /**
  * This class is responsible for the initialization, restoration, closing, 
flushing etc
  * of Global State Stores. There is only ever 1 instance of this class per 
Application Instance.
  */
-public class GlobalStateManagerImpl extends AbstractStateManager implements 
GlobalStateManager {
+public class GlobalStateManagerImpl implements GlobalStateManager {
 private final Logger log;
+private final boolean eosEnabled;
 private final ProcessorTopology topology;
 private final Consumer<byte[], byte[]> globalConsumer;
+private final File baseDir;
 private final StateDirectory stateDirectory;
 private final Set globalStoreNames = new HashSet<>();
+private final FixedOrderMap<String, Optional<StateStore>> globalStores = new FixedOrderMap<>();
 private final StateRestoreListener stateRestoreListener;
-private InternalProcessorContext processorContext;
+private InternalProcessorContext globalProcessorContext;
 private final int retries;
 private final long retryBackoffMs;
 private final Duration pollTime;
 private final Set globalNonPersistentStoresTopics = new 
HashSet<>();
+private final OffsetCheckpoint checkpointFile;
+private final Map<TopicPartition, Long> checkpointFileCache;
 
 public GlobalStateManagerImpl(final LogContext logContext,
   final ProcessorTopology topology,
@@ -71,7 +81,10 @@ public class GlobalStateManagerImpl extends 
AbstractStateManager implements Glob
   final StateDirectory stateDirectory,
   final StateRestoreListener 
stateRestoreListener,
   final StreamsConfig config) {
-super(stateDirectory.globalStateDir(), 
StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)));
+   

[kafka] branch trunk updated: KAFKA-5998: fix checkpointableOffsets handling (#7030)

2019-07-12 Thread bbejeck
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 53b4ce5  KAFKA-5998: fix checkpointableOffsets handling (#7030)
53b4ce5 is described below

commit 53b4ce5c00d61be87962f603682873665155cec4
Author: John Roesler 
AuthorDate: Fri Jul 12 08:42:11 2019 -0500

KAFKA-5998: fix checkpointableOffsets handling (#7030)

fix checkpoint file warning by filtering checkpointable offsets per task
clean up state manager hierarchy to prevent similar bugs

Reviewers: Bruno Cadonna , Bill Bejeck 

---
 .../internals/GlobalStateManagerImpl.java  |  68 ---
 .../processor/internals/ProcessorStateManager.java | 203 ++---
 ...ractStateManager.java => StateManagerUtil.java} |  41 ++---
 .../streams/processor/internals/StreamTask.java|  11 +-
 .../streams/processor/internals/TaskManager.java   |   2 +-
 .../streams/state/internals/OffsetCheckpoint.java  |   5 +
 .../internals/GlobalStateManagerImplTest.java  |   8 +-
 .../processor/internals/MockChangelogReader.java   |   7 +-
 .../internals/ProcessorStateManagerTest.java   | 134 +-
 .../processor/internals/StandbyTaskTest.java   |   4 +-
 .../processor/internals/StreamTaskTest.java|   6 +-
 .../processor/internals/StreamThreadTest.java  |   2 +-
 .../processor/internals/TaskManagerTest.java   |   6 +-
 13 files changed, 356 insertions(+), 141 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index f224e1e..6a3959f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -23,6 +23,7 @@ import 
org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.FixedOrderMap;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
@@ -32,6 +33,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.streams.state.internals.RecordConverter;
 import org.slf4j.Logger;
 
@@ -48,22 +50,30 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
+import static 
org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
+import static 
org.apache.kafka.streams.processor.internals.StateManagerUtil.converterForStore;
+
 /**
  * This class is responsible for the initialization, restoration, closing, 
flushing etc
  * of Global State Stores. There is only ever 1 instance of this class per 
Application Instance.
  */
-public class GlobalStateManagerImpl extends AbstractStateManager implements 
GlobalStateManager {
+public class GlobalStateManagerImpl implements GlobalStateManager {
 private final Logger log;
+private final boolean eosEnabled;
 private final ProcessorTopology topology;
 private final Consumer<byte[], byte[]> globalConsumer;
+private final File baseDir;
 private final StateDirectory stateDirectory;
 private final Set globalStoreNames = new HashSet<>();
+private final FixedOrderMap<String, Optional<StateStore>> globalStores = new FixedOrderMap<>();
 private final StateRestoreListener stateRestoreListener;
-private InternalProcessorContext processorContext;
+private InternalProcessorContext globalProcessorContext;
 private final int retries;
 private final long retryBackoffMs;
 private final Duration pollTime;
 private final Set globalNonPersistentStoresTopics = new 
HashSet<>();
+private final OffsetCheckpoint checkpointFile;
+private final Map<TopicPartition, Long> checkpointFileCache;
 
 public GlobalStateManagerImpl(final LogContext logContext,
   final ProcessorTopology topology,
@@ -71,7 +81,10 @@ public class GlobalStateManagerImpl extends 
AbstractStateManager implements Glob
   final StateDirectory stateDirectory,
   final StateRestoreListener 
stateRestoreListener,
   final StreamsConfig config) {
-super(stateDirectory.globalStateDir(), 
StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)));
+