[kafka] branch trunk updated: KAFKA-8614; Consistent naming for IncrementalAlterConfig and AlterConfig responses (#7022)
This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new ea814d7 KAFKA-8614; Consistent naming for IncrementalAlterConfig and AlterConfig responses (#7022) ea814d7 is described below commit ea814d78690940827e90950282f40a74ca45eb0f Author: Bob Barrett AuthorDate: Fri Jul 12 17:18:41 2019 -0700 KAFKA-8614; Consistent naming for IncrementalAlterConfig and AlterConfig responses (#7022) This patch changes the name of the `Resources` field of AlterConfigsResponse to `Responses`. This makes it consistent with IncrementalAlterConfigsResponse, which has a differently-named but structurally-identical field. Tested with unit tests. Reviewers: Jason Gustafson --- .../common/requests/IncrementalAlterConfigsRequest.java| 4 ++-- .../common/requests/IncrementalAlterConfigsResponse.java | 14 +++--- .../resources/common/message/AlterConfigsResponse.json | 2 +- .../common/message/IncrementalAlterConfigsResponse.json| 2 +- .../apache/kafka/clients/admin/KafkaAdminClientTest.java | 8 .../apache/kafka/common/requests/RequestResponseTest.java | 4 ++-- 6 files changed, 17 insertions(+), 17 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java index 3a87cdb..0c6c0b2 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java @@ -20,7 +20,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData; import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource; import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData; 
-import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResult; +import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.types.Struct; @@ -80,7 +80,7 @@ public class IncrementalAlterConfigsRequest extends AbstractRequest { IncrementalAlterConfigsResponseData response = new IncrementalAlterConfigsResponseData(); ApiError apiError = ApiError.fromThrowable(e); for (AlterConfigsResource resource : data.resources()) { -response.responses().add(new AlterConfigsResourceResult() +response.responses().add(new AlterConfigsResourceResponse() .setResourceName(resource.resourceName()) .setResourceType(resource.resourceType()) .setErrorCode(apiError.error().code()) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsResponse.java index 1e5aea1..46b1d53 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsResponse.java @@ -19,7 +19,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData; -import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResult; +import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; @@ -35,7 +35,7 @@ public class IncrementalAlterConfigsResponse extends AbstractResponse { IncrementalAlterConfigsResponseData responseData = new IncrementalAlterConfigsResponseData(); 
responseData.setThrottleTimeMs(requestThrottleMs); for (Map.Entry entry : results.entrySet()) { -responseData.responses().add(new AlterConfigsResourceResult(). +responseData.responses().add(new AlterConfigsResourceResponse(). setResourceName(entry.getKey().name()). setResourceType(entry.getKey().type().id()). setErrorCode(entry.getValue().error().code()). @@ -46,9 +46,9 @@ public class IncrementalAlterConfigsResponse extends AbstractResponse { public static Map fromResponseData(final IncrementalAlterConfigsResponseData data) { Map map = new HashMap<>(); -for (AlterConfigsResourceResult result : data.responses()) { -
[kafka] branch trunk updated: MINOR: Use dynamic port in `RestServerTest` (#7079)
This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 1d873a9 MINOR: Use dynamic port in `RestServerTest` (#7079) 1d873a9 is described below commit 1d873a9de9ad98d9f3120fa2ff59c3372650775b Author: Jason Gustafson AuthorDate: Fri Jul 12 15:29:10 2019 -0700 MINOR: Use dynamic port in `RestServerTest` (#7079) We have seen some failures recently in `RestServerTest`. It's the usual problem with reliance on static ports. ``` Caused by: java.io.IOException: Failed to bind to 0.0.0.0/0.0.0.0:8083 at org.eclipse.jetty.server.ServerConnector.openAcceptChannel(ServerConnector.java:346) at org.eclipse.jetty.server.ServerConnector.open(ServerConnector.java:308) at org.eclipse.jetty.server.AbstractNetworkConnector.doStart(AbstractNetworkConnector.java:80) at org.eclipse.jetty.server.ServerConnector.doStart(ServerConnector.java:236) at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68) at org.eclipse.jetty.server.Server.doStart(Server.java:396) at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68) at org.apache.kafka.connect.runtime.rest.RestServer.initializeServer(RestServer.java:178) ... 56 more Caused by: java.net.BindException: Address already in use ``` This patch makes the chosen port dynamic. 
Reviewers: Ismael Juma --- .../java/org/apache/kafka/connect/runtime/rest/RestServerTest.java | 5 - 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java index 3609fb3..a640b83 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java @@ -78,6 +78,7 @@ public class RestServerTest { workerProps.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); workerProps.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); workerProps.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets"); +workerProps.put(WorkerConfig.LISTENERS_CONFIG, "HTTP://localhost:0"); return workerProps; } @@ -105,6 +106,7 @@ public class RestServerTest { // Build listener from hostname and port configMap = new HashMap<>(baseWorkerProps()); +configMap.remove(WorkerConfig.LISTENERS_CONFIG); configMap.put(WorkerConfig.REST_HOST_NAME_CONFIG, "my-hostname"); configMap.put(WorkerConfig.REST_PORT_CONFIG, "8080"); config = new DistributedConfig(configMap); @@ -115,7 +117,7 @@ public class RestServerTest { @SuppressWarnings("deprecation") @Test public void testAdvertisedUri() { -// Advertised URI from listeenrs without protocol +// Advertised URI from listeners without protocol Map configMap = new HashMap<>(baseWorkerProps()); configMap.put(WorkerConfig.LISTENERS_CONFIG, "http://localhost:8080,https://localhost:8443"); DistributedConfig config = new DistributedConfig(configMap); @@ -153,6 +155,7 @@ public class RestServerTest { // listener from hostname and port configMap = new HashMap<>(baseWorkerProps()); +configMap.remove(WorkerConfig.LISTENERS_CONFIG); 
configMap.put(WorkerConfig.REST_HOST_NAME_CONFIG, "my-hostname"); configMap.put(WorkerConfig.REST_PORT_CONFIG, "8080"); config = new DistributedConfig(configMap);
[kafka] branch 2.2 updated: HOT FIX: close RocksDB objects in correct order (#7076)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 2.2 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.2 by this push: new 74b672b HOT FIX: close RocksDB objects in correct order (#7076) 74b672b is described below commit 74b672b0fa11bd5989f1e1f2559686ed433999ed Author: A. Sophie Blee-Goldman AuthorDate: Fri Jul 12 12:43:46 2019 -0700 HOT FIX: close RocksDB objects in correct order (#7076) Reviewers: Bill Bejeck , Matthias J. Sax --- .../RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java | 3 ++- .../java/org/apache/kafka/streams/state/internals/RocksDBStore.java | 5 - 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java index c07e43b..ba83c02 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java @@ -1356,7 +1356,8 @@ class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter extends Options @Override public void close() { -columnFamilyOptions.close(); +// ColumnFamilyOptions should be closed last dbOptions.close(); +columnFamilyOptions.close(); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 109b8c3..ee62ec7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -374,11 +374,14 @@ public class 
RocksDBStore implements KeyValueStore { open = false; closeOpenIterators(); + +// Important: do not rearrange the order in which the below objects are closed! +// Order of closing must follow: ColumnFamilyHandle > RocksDB > DBOptions > ColumnFamilyOptions dbAccessor.close(); +db.close(); userSpecifiedOptions.close(); wOptions.close(); fOptions.close(); -db.close(); filter.close(); dbAccessor = null;
[kafka] branch 2.3 updated: HOT FIX: close RocksDB objects in correct order (#7076)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 2.3 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.3 by this push: new 5942700 HOT FIX: close RocksDB objects in correct order (#7076) 5942700 is described below commit 5942700a45363170c914f5a450bba0d9b3ca05d8 Author: A. Sophie Blee-Goldman AuthorDate: Fri Jul 12 12:43:46 2019 -0700 HOT FIX: close RocksDB objects in correct order (#7076) Reviewers: Bill Bejeck , Matthias J. Sax --- .../RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java | 3 ++- .../java/org/apache/kafka/streams/state/internals/RocksDBStore.java | 4 +++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java index d28682a..5b892a2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java @@ -1384,7 +1384,8 @@ class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter extends Options @Override public void close() { -columnFamilyOptions.close(); +// ColumnFamilyOptions should be closed last dbOptions.close(); +columnFamilyOptions.close(); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 0643e64..bcc4372 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -402,11 +402,13 @@ public class 
RocksDBStore implements KeyValueStore, BulkLoadingSt configSetter = null; } +// Important: do not rearrange the order in which the below objects are closed! +// Order of closing must follow: ColumnFamilyHandle > RocksDB > DBOptions > ColumnFamilyOptions dbAccessor.close(); +db.close(); userSpecifiedOptions.close(); wOptions.close(); fOptions.close(); -db.close(); filter.close(); cache.close();
[kafka] branch trunk updated: HOT FIX: close RocksDB objects in correct order (#7076)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new d2328a1 HOT FIX: close RocksDB objects in correct order (#7076) d2328a1 is described below commit d2328a1a0e2fa5f3592cd67b320fb8f223978659 Author: A. Sophie Blee-Goldman AuthorDate: Fri Jul 12 12:43:46 2019 -0700 HOT FIX: close RocksDB objects in correct order (#7076) Reviewers: Bill Bejeck , Matthias J. Sax --- .../RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java | 3 ++- .../java/org/apache/kafka/streams/state/internals/RocksDBStore.java | 4 +++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java index d28682a..5b892a2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java @@ -1384,7 +1384,8 @@ class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter extends Options @Override public void close() { -columnFamilyOptions.close(); +// ColumnFamilyOptions should be closed last dbOptions.close(); +columnFamilyOptions.close(); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 0643e64..bcc4372 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -402,11 +402,13 @@ public class 
RocksDBStore implements KeyValueStore, BulkLoadingSt configSetter = null; } +// Important: do not rearrange the order in which the below objects are closed! +// Order of closing must follow: ColumnFamilyHandle > RocksDB > DBOptions > ColumnFamilyOptions dbAccessor.close(); +db.close(); userSpecifiedOptions.close(); wOptions.close(); fOptions.close(); -db.close(); filter.close(); cache.close();
[kafka] branch trunk updated: MINOR: Create a new topic for each test for flaky RegexSourceIntegrationTest (#6853)
This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new ae4a975 MINOR: Create a new topic for each test for flaky RegexSourceIntegrationTest (#6853) ae4a975 is described below commit ae4a97543e990407096d9a1e63c77bab8c18649c Author: Bill Bejeck AuthorDate: Fri Jul 12 15:18:17 2019 -0400 MINOR: Create a new topic for each test for flaky RegexSourceIntegrationTest (#6853) The RegexSourceIntegrationTest has some flakiness as it deletes and re-creates the same output topic before each test. This PR reduces the chance for errors by creating a unique output topic for each test. Reviewers: Matthias J. Sax , Boyang Chen --- .../integration/RegexSourceIntegrationTest.java| 42 +- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java index f74487b..10e0650 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java @@ -62,6 +62,7 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; import static org.hamcrest.CoreMatchers.equalTo; @@ -90,11 +91,12 @@ public class RegexSourceIntegrationTest { private static final String PARTITIONED_TOPIC_1 = "partitioned-1"; private static final String PARTITIONED_TOPIC_2 = "partitioned-2"; -private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic"; private static final String STRING_SERDE_CLASSNAME = Serdes.String().getClass().getName(); private 
Properties streamsConfiguration; private static final String STREAM_TASKS_NOT_UPDATED = "Stream tasks not updated"; private KafkaStreams streams; +private static volatile AtomicInteger topicSuffixGenerator = new AtomicInteger(0); +private String outputTopic; @BeforeClass @@ -107,16 +109,14 @@ public class RegexSourceIntegrationTest { TOPIC_Y, TOPIC_Z, FA_TOPIC, -FOO_TOPIC, -DEFAULT_OUTPUT_TOPIC); +FOO_TOPIC); CLUSTER.createTopic(PARTITIONED_TOPIC_1, 2, 1); CLUSTER.createTopic(PARTITIONED_TOPIC_2, 2, 1); } @Before -public void setUp() throws Exception { -CLUSTER.deleteAndRecreateTopics(DEFAULT_OUTPUT_TOPIC); - +public void setUp() throws InterruptedException { +outputTopic = createTopic(topicSuffixGenerator.incrementAndGet()); final Properties properties = new Properties(); properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); @@ -141,6 +141,7 @@ public class RegexSourceIntegrationTest { @Test public void testRegexMatchesTopicsAWhenCreated() throws Exception { + final Serde stringSerde = Serdes.String(); final List expectedFirstAssignment = Collections.singletonList("TEST-TOPIC-1"); final List expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2"); @@ -151,7 +152,7 @@ public class RegexSourceIntegrationTest { final KStream pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d")); -pattern1Stream.to(DEFAULT_OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde)); +pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde)); final List assignedTopics = new CopyOnWriteArrayList<>(); streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() { @Override @@ -175,6 +176,12 @@ public class RegexSourceIntegrationTest { } +private String createTopic(final int suffix) throws InterruptedException { +final String outputTopic = "outputTopic_" + suffix; +CLUSTER.createTopic(outputTopic); +return outputTopic; +} + @Test public 
void testRegexMatchesTopicsAWhenDeleted() throws Exception { @@ -188,7 +195,7 @@ public class RegexSourceIntegrationTest { final KStream pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-[A-Z]")); -pattern1Stream.to(DEFAULT_OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde)); +pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde)); final List assignedTopics = new CopyOnWriteArrayList<>(); streams = new KafkaStreams(builder.build(), streamsConfiguration, new
[kafka] branch 1.0 updated: KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 1.0 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/1.0 by this push: new de8663a KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070) de8663a is described below commit de8663a0835d6d60a5d601626a9c301530acfed3 Author: Robert Yokota AuthorDate: Fri Jul 12 10:12:20 2019 -0700 KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070) Fix handling of nulls in TimestampConverter. Authors: Valeria Vasylieva , Robert Yokota Reviewers: Arjun Satish , Randall Hauch --- .../connect/transforms/TimestampConverter.java | 52 +++-- .../connect/transforms/TimestampConverterTest.java | 233 +++-- 2 files changed, 243 insertions(+), 42 deletions(-) diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java index 8557441..f32253e 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java @@ -47,7 +47,7 @@ import java.util.Set; import java.util.TimeZone; import static org.apache.kafka.connect.transforms.util.Requirements.requireMap; -import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; +import static org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull; public abstract class TimestampConverter> implements Transformation { @@ -85,6 +85,10 @@ public abstract class TimestampConverter> implements private static final TimeZone UTC = TimeZone.getTimeZone("UTC"); +public static final Schema OPTIONAL_DATE_SCHEMA = org.apache.kafka.connect.data.Date.builder().optional().schema(); +public static final Schema OPTIONAL_TIMESTAMP_SCHEMA = Timestamp.builder().optional().schema(); +public 
static final Schema OPTIONAL_TIME_SCHEMA = Time.builder().optional().schema(); + private interface TimestampTranslator { /** * Convert from the type-specific format to the universal java.util.Date format @@ -94,7 +98,7 @@ public abstract class TimestampConverter> implements /** * Get the schema for this format. */ -Schema typeSchema(); +Schema typeSchema(boolean isOptional); /** * Convert from the universal java.util.Date format to the type-specific format @@ -118,8 +122,8 @@ public abstract class TimestampConverter> implements } @Override -public Schema typeSchema() { -return Schema.STRING_SCHEMA; +public Schema typeSchema(boolean isOptional) { +return isOptional ? Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA; } @Override @@ -139,8 +143,8 @@ public abstract class TimestampConverter> implements } @Override -public Schema typeSchema() { -return Schema.INT64_SCHEMA; +public Schema typeSchema(boolean isOptional) { +return isOptional ? Schema.OPTIONAL_INT64_SCHEMA : Schema.INT64_SCHEMA; } @Override @@ -159,8 +163,8 @@ public abstract class TimestampConverter> implements } @Override -public Schema typeSchema() { -return org.apache.kafka.connect.data.Date.SCHEMA; +public Schema typeSchema(boolean isOptional) { +return isOptional ? OPTIONAL_DATE_SCHEMA : org.apache.kafka.connect.data.Date.SCHEMA; } @Override @@ -185,8 +189,8 @@ public abstract class TimestampConverter> implements } @Override -public Schema typeSchema() { -return Time.SCHEMA; +public Schema typeSchema(boolean isOptional) { +return isOptional ? OPTIONAL_TIME_SCHEMA : Time.SCHEMA; } @Override @@ -212,8 +216,8 @@ public abstract class TimestampConverter> implements } @Override -public Schema typeSchema() { -return Timestamp.SCHEMA; +public Schema typeSchema(boolean isOptional) { +return isOptional ? 
OPTIONAL_TIMESTAMP_SCHEMA : Timestamp.SCHEMA; } @Override @@ -330,16 +334,16 @@ public abstract class TimestampConverter> implements if (config.field.isEmpty()) { Object value = operatingValue(record); // New schema is determined by the requested target timestamp type -Schema updatedSchema = TRANSLATORS.get(config.type).typeSchema(); +Schema updatedSchema =
[kafka] branch 1.1 updated: KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 1.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/1.1 by this push: new c05ed1e KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070) c05ed1e is described below commit c05ed1eae466ef8afd8d67022d206d7d9bb24838 Author: Robert Yokota AuthorDate: Fri Jul 12 10:12:20 2019 -0700 KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070) Fix handling of nulls in TimestampConverter. Authors: Valeria Vasylieva , Robert Yokota Reviewers: Arjun Satish , Randall Hauch --- .../connect/transforms/TimestampConverter.java | 52 +++-- .../connect/transforms/TimestampConverterTest.java | 233 +++-- 2 files changed, 243 insertions(+), 42 deletions(-) diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java index 8557441..f32253e 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java @@ -47,7 +47,7 @@ import java.util.Set; import java.util.TimeZone; import static org.apache.kafka.connect.transforms.util.Requirements.requireMap; -import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; +import static org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull; public abstract class TimestampConverter> implements Transformation { @@ -85,6 +85,10 @@ public abstract class TimestampConverter> implements private static final TimeZone UTC = TimeZone.getTimeZone("UTC"); +public static final Schema OPTIONAL_DATE_SCHEMA = org.apache.kafka.connect.data.Date.builder().optional().schema(); +public static final Schema OPTIONAL_TIMESTAMP_SCHEMA = Timestamp.builder().optional().schema(); +public 
static final Schema OPTIONAL_TIME_SCHEMA = Time.builder().optional().schema(); + private interface TimestampTranslator { /** * Convert from the type-specific format to the universal java.util.Date format @@ -94,7 +98,7 @@ public abstract class TimestampConverter> implements /** * Get the schema for this format. */ -Schema typeSchema(); +Schema typeSchema(boolean isOptional); /** * Convert from the universal java.util.Date format to the type-specific format @@ -118,8 +122,8 @@ public abstract class TimestampConverter> implements } @Override -public Schema typeSchema() { -return Schema.STRING_SCHEMA; +public Schema typeSchema(boolean isOptional) { +return isOptional ? Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA; } @Override @@ -139,8 +143,8 @@ public abstract class TimestampConverter> implements } @Override -public Schema typeSchema() { -return Schema.INT64_SCHEMA; +public Schema typeSchema(boolean isOptional) { +return isOptional ? Schema.OPTIONAL_INT64_SCHEMA : Schema.INT64_SCHEMA; } @Override @@ -159,8 +163,8 @@ public abstract class TimestampConverter> implements } @Override -public Schema typeSchema() { -return org.apache.kafka.connect.data.Date.SCHEMA; +public Schema typeSchema(boolean isOptional) { +return isOptional ? OPTIONAL_DATE_SCHEMA : org.apache.kafka.connect.data.Date.SCHEMA; } @Override @@ -185,8 +189,8 @@ public abstract class TimestampConverter> implements } @Override -public Schema typeSchema() { -return Time.SCHEMA; +public Schema typeSchema(boolean isOptional) { +return isOptional ? OPTIONAL_TIME_SCHEMA : Time.SCHEMA; } @Override @@ -212,8 +216,8 @@ public abstract class TimestampConverter> implements } @Override -public Schema typeSchema() { -return Timestamp.SCHEMA; +public Schema typeSchema(boolean isOptional) { +return isOptional ? 
OPTIONAL_TIMESTAMP_SCHEMA : Timestamp.SCHEMA; } @Override @@ -330,16 +334,16 @@ public abstract class TimestampConverter> implements if (config.field.isEmpty()) { Object value = operatingValue(record); // New schema is determined by the requested target timestamp type -Schema updatedSchema = TRANSLATORS.get(config.type).typeSchema(); +Schema updatedSchema =
[kafka] branch 2.0 updated: KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.0 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.0 by this push: new 2e9d140 KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070) 2e9d140 is described below commit 2e9d140a655ee9d8b9951b4ce0850b486ef69029 Author: Robert Yokota AuthorDate: Fri Jul 12 10:12:20 2019 -0700 KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070) Fix handling of nulls in TimestampConverter. Authors: Valeria Vasylieva , Robert Yokota Reviewers: Arjun Satish , Randall Hauch --- .../connect/transforms/TimestampConverter.java | 52 +++-- .../connect/transforms/TimestampConverterTest.java | 233 +++-- 2 files changed, 243 insertions(+), 42 deletions(-) diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java index 8557441..f32253e 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java @@ -47,7 +47,7 @@ import java.util.Set; import java.util.TimeZone; import static org.apache.kafka.connect.transforms.util.Requirements.requireMap; -import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; +import static org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull; public abstract class TimestampConverter> implements Transformation { @@ -85,6 +85,10 @@ public abstract class TimestampConverter> implements private static final TimeZone UTC = TimeZone.getTimeZone("UTC"); +public static final Schema OPTIONAL_DATE_SCHEMA = org.apache.kafka.connect.data.Date.builder().optional().schema(); +public static final Schema OPTIONAL_TIMESTAMP_SCHEMA = Timestamp.builder().optional().schema(); +public 
static final Schema OPTIONAL_TIME_SCHEMA = Time.builder().optional().schema(); + private interface TimestampTranslator { /** * Convert from the type-specific format to the universal java.util.Date format @@ -94,7 +98,7 @@ public abstract class TimestampConverter> implements /** * Get the schema for this format. */ -Schema typeSchema(); +Schema typeSchema(boolean isOptional); /** * Convert from the universal java.util.Date format to the type-specific format @@ -118,8 +122,8 @@ public abstract class TimestampConverter> implements } @Override -public Schema typeSchema() { -return Schema.STRING_SCHEMA; +public Schema typeSchema(boolean isOptional) { +return isOptional ? Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA; } @Override @@ -139,8 +143,8 @@ public abstract class TimestampConverter> implements } @Override -public Schema typeSchema() { -return Schema.INT64_SCHEMA; +public Schema typeSchema(boolean isOptional) { +return isOptional ? Schema.OPTIONAL_INT64_SCHEMA : Schema.INT64_SCHEMA; } @Override @@ -159,8 +163,8 @@ public abstract class TimestampConverter> implements } @Override -public Schema typeSchema() { -return org.apache.kafka.connect.data.Date.SCHEMA; +public Schema typeSchema(boolean isOptional) { +return isOptional ? OPTIONAL_DATE_SCHEMA : org.apache.kafka.connect.data.Date.SCHEMA; } @Override @@ -185,8 +189,8 @@ public abstract class TimestampConverter> implements } @Override -public Schema typeSchema() { -return Time.SCHEMA; +public Schema typeSchema(boolean isOptional) { +return isOptional ? OPTIONAL_TIME_SCHEMA : Time.SCHEMA; } @Override @@ -212,8 +216,8 @@ public abstract class TimestampConverter> implements } @Override -public Schema typeSchema() { -return Timestamp.SCHEMA; +public Schema typeSchema(boolean isOptional) { +return isOptional ? 
OPTIONAL_TIMESTAMP_SCHEMA : Timestamp.SCHEMA; } @Override @@ -330,16 +334,16 @@ public abstract class TimestampConverter> implements if (config.field.isEmpty()) { Object value = operatingValue(record); // New schema is determined by the requested target timestamp type -Schema updatedSchema = TRANSLATORS.get(config.type).typeSchema(); +Schema updatedSchema =
[kafka] branch 2.1 updated: KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.1 by this push: new ccc751f KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070) ccc751f is described below commit ccc751f3b9a798db401a0740be36004df30f9ab2 Author: Robert Yokota AuthorDate: Fri Jul 12 10:12:20 2019 -0700 KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070) Fix handling of nulls in TimestampConverter. Authors: Valeria Vasylieva , Robert Yokota Reviewers: Arjun Satish , Randall Hauch --- .../connect/transforms/TimestampConverter.java | 52 +++-- .../connect/transforms/TimestampConverterTest.java | 233 +++-- 2 files changed, 243 insertions(+), 42 deletions(-) diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java index 8557441..f32253e 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java @@ -47,7 +47,7 @@ import java.util.Set; import java.util.TimeZone; import static org.apache.kafka.connect.transforms.util.Requirements.requireMap; -import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; +import static org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull; public abstract class TimestampConverter> implements Transformation { @@ -85,6 +85,10 @@ public abstract class TimestampConverter> implements private static final TimeZone UTC = TimeZone.getTimeZone("UTC"); +public static final Schema OPTIONAL_DATE_SCHEMA = org.apache.kafka.connect.data.Date.builder().optional().schema(); +public static final Schema OPTIONAL_TIMESTAMP_SCHEMA = Timestamp.builder().optional().schema(); +public 
static final Schema OPTIONAL_TIME_SCHEMA = Time.builder().optional().schema(); + private interface TimestampTranslator { /** * Convert from the type-specific format to the universal java.util.Date format @@ -94,7 +98,7 @@ public abstract class TimestampConverter> implements /** * Get the schema for this format. */ -Schema typeSchema(); +Schema typeSchema(boolean isOptional); /** * Convert from the universal java.util.Date format to the type-specific format @@ -118,8 +122,8 @@ public abstract class TimestampConverter> implements } @Override -public Schema typeSchema() { -return Schema.STRING_SCHEMA; +public Schema typeSchema(boolean isOptional) { +return isOptional ? Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA; } @Override @@ -139,8 +143,8 @@ public abstract class TimestampConverter> implements } @Override -public Schema typeSchema() { -return Schema.INT64_SCHEMA; +public Schema typeSchema(boolean isOptional) { +return isOptional ? Schema.OPTIONAL_INT64_SCHEMA : Schema.INT64_SCHEMA; } @Override @@ -159,8 +163,8 @@ public abstract class TimestampConverter> implements } @Override -public Schema typeSchema() { -return org.apache.kafka.connect.data.Date.SCHEMA; +public Schema typeSchema(boolean isOptional) { +return isOptional ? OPTIONAL_DATE_SCHEMA : org.apache.kafka.connect.data.Date.SCHEMA; } @Override @@ -185,8 +189,8 @@ public abstract class TimestampConverter> implements } @Override -public Schema typeSchema() { -return Time.SCHEMA; +public Schema typeSchema(boolean isOptional) { +return isOptional ? OPTIONAL_TIME_SCHEMA : Time.SCHEMA; } @Override @@ -212,8 +216,8 @@ public abstract class TimestampConverter> implements } @Override -public Schema typeSchema() { -return Timestamp.SCHEMA; +public Schema typeSchema(boolean isOptional) { +return isOptional ? 
OPTIONAL_TIMESTAMP_SCHEMA : Timestamp.SCHEMA; } @Override @@ -330,16 +334,16 @@ public abstract class TimestampConverter> implements if (config.field.isEmpty()) { Object value = operatingValue(record); // New schema is determined by the requested target timestamp type -Schema updatedSchema = TRANSLATORS.get(config.type).typeSchema(); +Schema updatedSchema =
[kafka] branch 2.2 updated: KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.2 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.2 by this push: new 10a72da KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070) 10a72da is described below commit 10a72da77a264cec3ba726d33994e8001064bb0f Author: Robert Yokota AuthorDate: Fri Jul 12 10:12:20 2019 -0700 KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070) Fix handling of nulls in TimestampConverter. Authors: Valeria Vasylieva , Robert Yokota Reviewers: Arjun Satish , Randall Hauch --- .../connect/transforms/TimestampConverter.java | 52 +++-- .../connect/transforms/TimestampConverterTest.java | 233 +++-- 2 files changed, 243 insertions(+), 42 deletions(-) diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java index 8557441..f32253e 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java @@ -47,7 +47,7 @@ import java.util.Set; import java.util.TimeZone; import static org.apache.kafka.connect.transforms.util.Requirements.requireMap; -import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; +import static org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull; public abstract class TimestampConverter> implements Transformation { @@ -85,6 +85,10 @@ public abstract class TimestampConverter> implements private static final TimeZone UTC = TimeZone.getTimeZone("UTC"); +public static final Schema OPTIONAL_DATE_SCHEMA = org.apache.kafka.connect.data.Date.builder().optional().schema(); +public static final Schema OPTIONAL_TIMESTAMP_SCHEMA = Timestamp.builder().optional().schema(); +public 
static final Schema OPTIONAL_TIME_SCHEMA = Time.builder().optional().schema(); + private interface TimestampTranslator { /** * Convert from the type-specific format to the universal java.util.Date format @@ -94,7 +98,7 @@ public abstract class TimestampConverter> implements /** * Get the schema for this format. */ -Schema typeSchema(); +Schema typeSchema(boolean isOptional); /** * Convert from the universal java.util.Date format to the type-specific format @@ -118,8 +122,8 @@ public abstract class TimestampConverter> implements } @Override -public Schema typeSchema() { -return Schema.STRING_SCHEMA; +public Schema typeSchema(boolean isOptional) { +return isOptional ? Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA; } @Override @@ -139,8 +143,8 @@ public abstract class TimestampConverter> implements } @Override -public Schema typeSchema() { -return Schema.INT64_SCHEMA; +public Schema typeSchema(boolean isOptional) { +return isOptional ? Schema.OPTIONAL_INT64_SCHEMA : Schema.INT64_SCHEMA; } @Override @@ -159,8 +163,8 @@ public abstract class TimestampConverter> implements } @Override -public Schema typeSchema() { -return org.apache.kafka.connect.data.Date.SCHEMA; +public Schema typeSchema(boolean isOptional) { +return isOptional ? OPTIONAL_DATE_SCHEMA : org.apache.kafka.connect.data.Date.SCHEMA; } @Override @@ -185,8 +189,8 @@ public abstract class TimestampConverter> implements } @Override -public Schema typeSchema() { -return Time.SCHEMA; +public Schema typeSchema(boolean isOptional) { +return isOptional ? OPTIONAL_TIME_SCHEMA : Time.SCHEMA; } @Override @@ -212,8 +216,8 @@ public abstract class TimestampConverter> implements } @Override -public Schema typeSchema() { -return Timestamp.SCHEMA; +public Schema typeSchema(boolean isOptional) { +return isOptional ? 
OPTIONAL_TIMESTAMP_SCHEMA : Timestamp.SCHEMA; } @Override @@ -330,16 +334,16 @@ public abstract class TimestampConverter> implements if (config.field.isEmpty()) { Object value = operatingValue(record); // New schema is determined by the requested target timestamp type -Schema updatedSchema = TRANSLATORS.get(config.type).typeSchema(); +Schema updatedSchema =
[kafka] branch 1.1 updated: KAFKA-8570; Grow buffer to hold down converted records if it was insufficiently sized (#7071)
This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 1.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/1.1 by this push: new 8c93b7e KAFKA-8570; Grow buffer to hold down converted records if it was insufficiently sized (#7071) 8c93b7e is described below commit 8c93b7ecd651c17e61d89fdb533b8ec4d1b0bfd7 Author: Dhruvil Shah AuthorDate: Fri Jul 12 10:28:25 2019 -0700 KAFKA-8570; Grow buffer to hold down converted records if it was insufficiently sized (#7071) Backport https://github.com/apache/kafka/pull/6974 to 1.1 When the log contains out-of-order message formats (for example, a v2 message followed by a v1 message) and consists of compressed batches typically greater than 1kB in size, it is possible for down-conversion to fail. With compressed batches, we estimate the size of down-converted batches using: ``` private static int estimateCompressedSizeInBytes(int size, CompressionType compressionType) { return compressionType == CompressionType.NONE ? size : Math.min(Math.max(size / 2, 1024), 1 << 16); } ``` This almost always underestimates the size of down-converted records if the batch is between 1kB and 64kB in size. In general, this means we may underestimate the total size required for compressed batches. Because of an implicit assumption in the code that messages with a lower message format appear before any with a higher message format, we do not grow the buffer we copy the down-converted records into when we see a message <= the target message format. This assumption becomes incorrect when the log contains out-of-order message formats, for example because of leaders flapping while upgrading the message format.
Reviewers: Jason Gustafson --- .../kafka/common/record/AbstractRecords.java | 1 + .../kafka/common/record/FileRecordsTest.java | 36 ++ 2 files changed, 37 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java index 89a5413..0552e6b 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java @@ -104,6 +104,7 @@ public abstract class AbstractRecords implements Records { for (RecordBatchAndRecords recordBatchAndRecords : recordBatchAndRecordsList) { temporaryMemoryBytes += recordBatchAndRecords.batch.sizeInBytes(); if (recordBatchAndRecords.batch.magic() <= toMagic) { +buffer = Utils.ensureCapacity(buffer, buffer.position() + recordBatchAndRecords.batch.sizeInBytes()); recordBatchAndRecords.batch.writeTo(buffer); } else { MemoryRecordsBuilder builder = convertRecordBatch(toMagic, buffer, recordBatchAndRecords); diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java index fdd3ede..d1bf3d3 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java @@ -35,6 +35,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Random; import static java.util.Arrays.asList; import static org.apache.kafka.test.TestUtils.tempFile; @@ -359,6 +360,41 @@ public class FileRecordsTest { doTestConversion(CompressionType.GZIP, RecordBatch.MAGIC_VALUE_V2); } +@Test +public void testDownconversionAfterMessageFormatDowngrade() throws IOException { +// random bytes +Random random = new Random(); +byte[] bytes = new byte[3000]; +random.nextBytes(bytes); + +// records +CompressionType 
compressionType = CompressionType.GZIP; +List offsets = asList(0L, 1L); +List magic = asList(RecordBatch.MAGIC_VALUE_V2, RecordBatch.MAGIC_VALUE_V1); // downgrade message format from v2 to v1 +List records = asList( +new SimpleRecord(1L, "k1".getBytes(), bytes), +new SimpleRecord(2L, "k2".getBytes(), bytes)); +byte toMagic = 1; + +// create MemoryRecords +ByteBuffer buffer = ByteBuffer.allocate(8000); +for (int i = 0; i < records.size(); i++) { +MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic.get(i), compressionType, TimestampType.CREATE_TIME, 0L); +builder.appendWithOffset(offsets.get(i), records.get(i)); +builder.close(); +} +buffer.flip(); + +// create FileRecords, down-convert and verify +try (FileRecords fileRecords =
[kafka] branch 2.1 updated: KAFKA-8637: WriteBatch objects leak off-heap memory
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.1 by this push: new b5ea03e KAFKA-8637: WriteBatch objects leak off-heap memory b5ea03e is described below commit b5ea03e0dee01d610c2d92b1436fd80d9d3dab7b Author: A. Sophie Blee-Goldman AuthorDate: Fri Jul 12 10:19:50 2019 -0700 KAFKA-8637: WriteBatch objects leak off-heap memory Reviewers: Bill Bejeck , Matthias J. Sax --- .../apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java | 1 + 1 file changed, 1 insertion(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java index c4fce72..0669446 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java @@ -212,6 +212,7 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore { final Segment segment = entry.getKey(); final WriteBatch batch = entry.getValue(); segment.write(batch); +batch.close(); } } catch (final RocksDBException e) { throw new ProcessorStateException("Error restoring batch to store " + this.name, e);
[kafka] branch 2.2 updated: KAFKA-8637: WriteBatch objects leak off-heap memory
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 2.2 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.2 by this push: new ca66c1e KAFKA-8637: WriteBatch objects leak off-heap memory ca66c1e is described below commit ca66c1eef7fff184155d30945cc802d158211e85 Author: A. Sophie Blee-Goldman AuthorDate: Fri Jul 12 10:19:50 2019 -0700 KAFKA-8637: WriteBatch objects leak off-heap memory Reviewers: Bill Bejeck , Matthias J. Sax --- .../apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java | 1 + 1 file changed, 1 insertion(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java index 0ed4e9d..cef8a34 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java @@ -211,6 +211,7 @@ public class RocksDBSegmentedBytesStore implements SegmentedBytesStore { final KeyValueSegment segment = entry.getKey(); final WriteBatch batch = entry.getValue(); segment.write(batch); +batch.close(); } } catch (final RocksDBException e) { throw new ProcessorStateException("Error restoring batch to store " + this.name, e);
[kafka] branch 2.3 updated: KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.3 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.3 by this push: new 813bf6c KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070) 813bf6c is described below commit 813bf6c31ee84f66f517cfb6f44ca8abce37dcb9 Author: Robert Yokota AuthorDate: Fri Jul 12 10:12:20 2019 -0700 KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070) Fix handling of nulls in TimestampConverter. Authors: Valeria Vasylieva , Robert Yokota Reviewers: Arjun Satish , Randall Hauch --- .../connect/transforms/TimestampConverter.java | 52 +++-- .../connect/transforms/TimestampConverterTest.java | 233 +++-- 2 files changed, 243 insertions(+), 42 deletions(-) diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java index 8557441..f32253e 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java @@ -47,7 +47,7 @@ import java.util.Set; import java.util.TimeZone; import static org.apache.kafka.connect.transforms.util.Requirements.requireMap; -import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; +import static org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull; public abstract class TimestampConverter> implements Transformation { @@ -85,6 +85,10 @@ public abstract class TimestampConverter> implements private static final TimeZone UTC = TimeZone.getTimeZone("UTC"); +public static final Schema OPTIONAL_DATE_SCHEMA = org.apache.kafka.connect.data.Date.builder().optional().schema(); +public static final Schema OPTIONAL_TIMESTAMP_SCHEMA = Timestamp.builder().optional().schema(); +public 
static final Schema OPTIONAL_TIME_SCHEMA = Time.builder().optional().schema(); + private interface TimestampTranslator { /** * Convert from the type-specific format to the universal java.util.Date format @@ -94,7 +98,7 @@ public abstract class TimestampConverter> implements /** * Get the schema for this format. */ -Schema typeSchema(); +Schema typeSchema(boolean isOptional); /** * Convert from the universal java.util.Date format to the type-specific format @@ -118,8 +122,8 @@ public abstract class TimestampConverter> implements } @Override -public Schema typeSchema() { -return Schema.STRING_SCHEMA; +public Schema typeSchema(boolean isOptional) { +return isOptional ? Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA; } @Override @@ -139,8 +143,8 @@ public abstract class TimestampConverter> implements } @Override -public Schema typeSchema() { -return Schema.INT64_SCHEMA; +public Schema typeSchema(boolean isOptional) { +return isOptional ? Schema.OPTIONAL_INT64_SCHEMA : Schema.INT64_SCHEMA; } @Override @@ -159,8 +163,8 @@ public abstract class TimestampConverter> implements } @Override -public Schema typeSchema() { -return org.apache.kafka.connect.data.Date.SCHEMA; +public Schema typeSchema(boolean isOptional) { +return isOptional ? OPTIONAL_DATE_SCHEMA : org.apache.kafka.connect.data.Date.SCHEMA; } @Override @@ -185,8 +189,8 @@ public abstract class TimestampConverter> implements } @Override -public Schema typeSchema() { -return Time.SCHEMA; +public Schema typeSchema(boolean isOptional) { +return isOptional ? OPTIONAL_TIME_SCHEMA : Time.SCHEMA; } @Override @@ -212,8 +216,8 @@ public abstract class TimestampConverter> implements } @Override -public Schema typeSchema() { -return Timestamp.SCHEMA; +public Schema typeSchema(boolean isOptional) { +return isOptional ? 
OPTIONAL_TIMESTAMP_SCHEMA : Timestamp.SCHEMA; } @Override @@ -330,16 +334,16 @@ public abstract class TimestampConverter> implements if (config.field.isEmpty()) { Object value = operatingValue(record); // New schema is determined by the requested target timestamp type -Schema updatedSchema = TRANSLATORS.get(config.type).typeSchema(); +Schema updatedSchema =
[kafka] branch trunk updated: KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new fa042bc KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070) fa042bc is described below commit fa042bc491cf8b1e8ae0e5b9f7996564ba886d3d Author: Robert Yokota AuthorDate: Fri Jul 12 10:12:20 2019 -0700 KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070) Fix handling of nulls in TimestampConverter. Authors: Valeria Vasylieva , Robert Yokota Reviewers: Arjun Satish , Randall Hauch --- .../connect/transforms/TimestampConverter.java | 52 +++-- .../connect/transforms/TimestampConverterTest.java | 233 +++-- 2 files changed, 243 insertions(+), 42 deletions(-) diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java index 8557441..f32253e 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java @@ -47,7 +47,7 @@ import java.util.Set; import java.util.TimeZone; import static org.apache.kafka.connect.transforms.util.Requirements.requireMap; -import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; +import static org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull; public abstract class TimestampConverter> implements Transformation { @@ -85,6 +85,10 @@ public abstract class TimestampConverter> implements private static final TimeZone UTC = TimeZone.getTimeZone("UTC"); +public static final Schema OPTIONAL_DATE_SCHEMA = org.apache.kafka.connect.data.Date.builder().optional().schema(); +public static final Schema OPTIONAL_TIMESTAMP_SCHEMA = Timestamp.builder().optional().schema(); +public 
static final Schema OPTIONAL_TIME_SCHEMA = Time.builder().optional().schema(); + private interface TimestampTranslator { /** * Convert from the type-specific format to the universal java.util.Date format @@ -94,7 +98,7 @@ public abstract class TimestampConverter> implements /** * Get the schema for this format. */ -Schema typeSchema(); +Schema typeSchema(boolean isOptional); /** * Convert from the universal java.util.Date format to the type-specific format @@ -118,8 +122,8 @@ public abstract class TimestampConverter> implements } @Override -public Schema typeSchema() { -return Schema.STRING_SCHEMA; +public Schema typeSchema(boolean isOptional) { +return isOptional ? Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA; } @Override @@ -139,8 +143,8 @@ public abstract class TimestampConverter> implements } @Override -public Schema typeSchema() { -return Schema.INT64_SCHEMA; +public Schema typeSchema(boolean isOptional) { +return isOptional ? Schema.OPTIONAL_INT64_SCHEMA : Schema.INT64_SCHEMA; } @Override @@ -159,8 +163,8 @@ public abstract class TimestampConverter> implements } @Override -public Schema typeSchema() { -return org.apache.kafka.connect.data.Date.SCHEMA; +public Schema typeSchema(boolean isOptional) { +return isOptional ? OPTIONAL_DATE_SCHEMA : org.apache.kafka.connect.data.Date.SCHEMA; } @Override @@ -185,8 +189,8 @@ public abstract class TimestampConverter> implements } @Override -public Schema typeSchema() { -return Time.SCHEMA; +public Schema typeSchema(boolean isOptional) { +return isOptional ? OPTIONAL_TIME_SCHEMA : Time.SCHEMA; } @Override @@ -212,8 +216,8 @@ public abstract class TimestampConverter> implements } @Override -public Schema typeSchema() { -return Timestamp.SCHEMA; +public Schema typeSchema(boolean isOptional) { +return isOptional ? 
OPTIONAL_TIMESTAMP_SCHEMA : Timestamp.SCHEMA; } @Override @@ -330,16 +334,16 @@ public abstract class TimestampConverter> implements if (config.field.isEmpty()) { Object value = operatingValue(record); // New schema is determined by the requested target timestamp type -Schema updatedSchema = TRANSLATORS.get(config.type).typeSchema(); +Schema updatedSchema =
[kafka] branch 1.0 updated: KAFKA-6605: Fix NPE in Flatten when optional Struct is null (#5705)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 1.0 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/1.0 by this push: new c6ffe50 KAFKA-6605: Fix NPE in Flatten when optional Struct is null (#5705) c6ffe50 is described below commit c6ffe50c6b857c9685c39808421fe729bdde4d92 Author: Michał Borowiecki AuthorDate: Fri Jul 12 16:27:33 2019 +0100 KAFKA-6605: Fix NPE in Flatten when optional Struct is null (#5705) Correct the Flatten SMT to properly handle null key or value `Struct` instances. Author: Michal Borowiecki Reviewers: Arjun Satish , Robert Yokota , Randall Hauch --- .../apache/kafka/connect/transforms/Flatten.java | 29 ++-- .../kafka/connect/transforms/FlattenTest.java | 40 ++ 2 files changed, 58 insertions(+), 11 deletions(-) diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java index c5e4000..d7d2144 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java @@ -35,7 +35,7 @@ import java.util.LinkedHashMap; import java.util.Map; import static org.apache.kafka.connect.transforms.util.Requirements.requireMap; -import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; +import static org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull; public abstract class Flatten> implements Transformation { @@ -136,20 +136,24 @@ public abstract class Flatten> implements Transformat } private R applyWithSchema(R record) { -final Struct value = requireStruct(operatingValue(record), PURPOSE); +final Struct value = requireStructOrNull(operatingValue(record), PURPOSE); -Schema updatedSchema = schemaUpdateCache.get(value.schema()); +Schema schema = operatingSchema(record); +Schema 
updatedSchema = schemaUpdateCache.get(schema); if (updatedSchema == null) { -final SchemaBuilder builder = SchemaUtil.copySchemaBasics(value.schema(), SchemaBuilder.struct()); -Struct defaultValue = (Struct) value.schema().defaultValue(); -buildUpdatedSchema(value.schema(), "", builder, value.schema().isOptional(), defaultValue); +final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct()); +Struct defaultValue = (Struct) schema.defaultValue(); +buildUpdatedSchema(schema, "", builder, schema.isOptional(), defaultValue); updatedSchema = builder.build(); -schemaUpdateCache.put(value.schema(), updatedSchema); +schemaUpdateCache.put(schema, updatedSchema); +} +if (value == null) { +return newRecord(record, updatedSchema, null); +} else { +final Struct updatedValue = new Struct(updatedSchema); +buildWithSchema(value, "", updatedValue); +return newRecord(record, updatedSchema, updatedValue); } - -final Struct updatedValue = new Struct(updatedSchema); -buildWithSchema(value, "", updatedValue); -return newRecord(record, updatedSchema, updatedValue); } /** @@ -216,6 +220,9 @@ public abstract class Flatten> implements Transformat } private void buildWithSchema(Struct record, String fieldNamePrefix, Struct newRecord) { +if (record == null) { +return; +} for (Field field : record.schema().fields()) { final String fieldName = fieldName(fieldNamePrefix, field.name()); switch (field.schema().type()) { diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java index d709054..430bba6 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java @@ -182,6 +182,46 @@ public class FlattenTest { } @Test +public void testOptionalStruct() { +xformValue.configure(Collections.emptyMap()); + +SchemaBuilder 
builder = SchemaBuilder.struct().optional(); +builder.field("opt_int32", Schema.OPTIONAL_INT32_SCHEMA); +Schema schema = builder.build(); + +SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, +"topic", 0, +schema, null)); + +assertEquals(Schema.Type.STRUCT, transformed.valueSchema().type()); +assertNull(transformed.value()); +} + +@Test +public void testOptionalNestedStruct() { +
[kafka] branch 2.0 updated: KAFKA-6605: Fix NPE in Flatten when optional Struct is null (#5705)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.0 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.0 by this push: new 842dacd KAFKA-6605: Fix NPE in Flatten when optional Struct is null (#5705) 842dacd is described below commit 842dacda8c12b89546e2666fed5a6e6a513093c4 Author: Michał Borowiecki AuthorDate: Fri Jul 12 16:27:33 2019 +0100 KAFKA-6605: Fix NPE in Flatten when optional Struct is null (#5705) Correct the Flatten SMT to properly handle null key or value `Struct` instances. Author: Michal Borowiecki Reviewers: Arjun Satish , Robert Yokota , Randall Hauch --- .../apache/kafka/connect/transforms/Flatten.java | 29 ++-- .../kafka/connect/transforms/FlattenTest.java | 40 ++ 2 files changed, 58 insertions(+), 11 deletions(-) diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java index c5e4000..d7d2144 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java @@ -35,7 +35,7 @@ import java.util.LinkedHashMap; import java.util.Map; import static org.apache.kafka.connect.transforms.util.Requirements.requireMap; -import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; +import static org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull; public abstract class Flatten> implements Transformation { @@ -136,20 +136,24 @@ public abstract class Flatten> implements Transformat } private R applyWithSchema(R record) { -final Struct value = requireStruct(operatingValue(record), PURPOSE); +final Struct value = requireStructOrNull(operatingValue(record), PURPOSE); -Schema updatedSchema = schemaUpdateCache.get(value.schema()); +Schema schema = operatingSchema(record); +Schema 
updatedSchema = schemaUpdateCache.get(schema); if (updatedSchema == null) { -final SchemaBuilder builder = SchemaUtil.copySchemaBasics(value.schema(), SchemaBuilder.struct()); -Struct defaultValue = (Struct) value.schema().defaultValue(); -buildUpdatedSchema(value.schema(), "", builder, value.schema().isOptional(), defaultValue); +final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct()); +Struct defaultValue = (Struct) schema.defaultValue(); +buildUpdatedSchema(schema, "", builder, schema.isOptional(), defaultValue); updatedSchema = builder.build(); -schemaUpdateCache.put(value.schema(), updatedSchema); +schemaUpdateCache.put(schema, updatedSchema); +} +if (value == null) { +return newRecord(record, updatedSchema, null); +} else { +final Struct updatedValue = new Struct(updatedSchema); +buildWithSchema(value, "", updatedValue); +return newRecord(record, updatedSchema, updatedValue); } - -final Struct updatedValue = new Struct(updatedSchema); -buildWithSchema(value, "", updatedValue); -return newRecord(record, updatedSchema, updatedValue); } /** @@ -216,6 +220,9 @@ public abstract class Flatten> implements Transformat } private void buildWithSchema(Struct record, String fieldNamePrefix, Struct newRecord) { +if (record == null) { +return; +} for (Field field : record.schema().fields()) { final String fieldName = fieldName(fieldNamePrefix, field.name()); switch (field.schema().type()) { diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java index d709054..430bba6 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java @@ -182,6 +182,46 @@ public class FlattenTest { } @Test +public void testOptionalStruct() { +xformValue.configure(Collections.emptyMap()); + +SchemaBuilder 
builder = SchemaBuilder.struct().optional(); +builder.field("opt_int32", Schema.OPTIONAL_INT32_SCHEMA); +Schema schema = builder.build(); + +SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, +"topic", 0, +schema, null)); + +assertEquals(Schema.Type.STRUCT, transformed.valueSchema().type()); +assertNull(transformed.value()); +} + +@Test +public void testOptionalNestedStruct() { +
[kafka] branch 2.1 updated: KAFKA-6605: Fix NPE in Flatten when optional Struct is null (#5705)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.1 by this push: new c651486 KAFKA-6605: Fix NPE in Flatten when optional Struct is null (#5705) c651486 is described below commit c6514869b263fd0d8e8db0b74cbc21859bcbe7a6 Author: Michał Borowiecki AuthorDate: Fri Jul 12 16:27:33 2019 +0100 KAFKA-6605: Fix NPE in Flatten when optional Struct is null (#5705) Correct the Flatten SMT to properly handle null key or value `Struct` instances. Author: Michal Borowiecki Reviewers: Arjun Satish , Robert Yokota , Randall Hauch --- .../apache/kafka/connect/transforms/Flatten.java | 29 ++-- .../kafka/connect/transforms/FlattenTest.java | 40 ++ 2 files changed, 58 insertions(+), 11 deletions(-) diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java index c5e4000..d7d2144 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java @@ -35,7 +35,7 @@ import java.util.LinkedHashMap; import java.util.Map; import static org.apache.kafka.connect.transforms.util.Requirements.requireMap; -import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; +import static org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull; public abstract class Flatten> implements Transformation { @@ -136,20 +136,24 @@ public abstract class Flatten> implements Transformat } private R applyWithSchema(R record) { -final Struct value = requireStruct(operatingValue(record), PURPOSE); +final Struct value = requireStructOrNull(operatingValue(record), PURPOSE); -Schema updatedSchema = schemaUpdateCache.get(value.schema()); +Schema schema = operatingSchema(record); +Schema 
updatedSchema = schemaUpdateCache.get(schema); if (updatedSchema == null) { -final SchemaBuilder builder = SchemaUtil.copySchemaBasics(value.schema(), SchemaBuilder.struct()); -Struct defaultValue = (Struct) value.schema().defaultValue(); -buildUpdatedSchema(value.schema(), "", builder, value.schema().isOptional(), defaultValue); +final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct()); +Struct defaultValue = (Struct) schema.defaultValue(); +buildUpdatedSchema(schema, "", builder, schema.isOptional(), defaultValue); updatedSchema = builder.build(); -schemaUpdateCache.put(value.schema(), updatedSchema); +schemaUpdateCache.put(schema, updatedSchema); +} +if (value == null) { +return newRecord(record, updatedSchema, null); +} else { +final Struct updatedValue = new Struct(updatedSchema); +buildWithSchema(value, "", updatedValue); +return newRecord(record, updatedSchema, updatedValue); } - -final Struct updatedValue = new Struct(updatedSchema); -buildWithSchema(value, "", updatedValue); -return newRecord(record, updatedSchema, updatedValue); } /** @@ -216,6 +220,9 @@ public abstract class Flatten> implements Transformat } private void buildWithSchema(Struct record, String fieldNamePrefix, Struct newRecord) { +if (record == null) { +return; +} for (Field field : record.schema().fields()) { final String fieldName = fieldName(fieldNamePrefix, field.name()); switch (field.schema().type()) { diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java index d709054..430bba6 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java @@ -182,6 +182,46 @@ public class FlattenTest { } @Test +public void testOptionalStruct() { +xformValue.configure(Collections.emptyMap()); + +SchemaBuilder 
builder = SchemaBuilder.struct().optional(); +builder.field("opt_int32", Schema.OPTIONAL_INT32_SCHEMA); +Schema schema = builder.build(); + +SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, +"topic", 0, +schema, null)); + +assertEquals(Schema.Type.STRUCT, transformed.valueSchema().type()); +assertNull(transformed.value()); +} + +@Test +public void testOptionalNestedStruct() { +
[kafka] branch 2.2 updated: KAFKA-6605: Fix NPE in Flatten when optional Struct is null (#5705)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.2 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.2 by this push: new 1fa3e97 KAFKA-6605: Fix NPE in Flatten when optional Struct is null (#5705) 1fa3e97 is described below commit 1fa3e979237f5cede646a07651e07beaa2ad9376 Author: Michał Borowiecki AuthorDate: Fri Jul 12 16:27:33 2019 +0100 KAFKA-6605: Fix NPE in Flatten when optional Struct is null (#5705) Correct the Flatten SMT to properly handle null key or value `Struct` instances. Author: Michal Borowiecki Reviewers: Arjun Satish , Robert Yokota , Randall Hauch --- .../apache/kafka/connect/transforms/Flatten.java | 29 ++-- .../kafka/connect/transforms/FlattenTest.java | 40 ++ 2 files changed, 58 insertions(+), 11 deletions(-) diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java index c5e4000..d7d2144 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java @@ -35,7 +35,7 @@ import java.util.LinkedHashMap; import java.util.Map; import static org.apache.kafka.connect.transforms.util.Requirements.requireMap; -import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; +import static org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull; public abstract class Flatten> implements Transformation { @@ -136,20 +136,24 @@ public abstract class Flatten> implements Transformat } private R applyWithSchema(R record) { -final Struct value = requireStruct(operatingValue(record), PURPOSE); +final Struct value = requireStructOrNull(operatingValue(record), PURPOSE); -Schema updatedSchema = schemaUpdateCache.get(value.schema()); +Schema schema = operatingSchema(record); +Schema 
updatedSchema = schemaUpdateCache.get(schema); if (updatedSchema == null) { -final SchemaBuilder builder = SchemaUtil.copySchemaBasics(value.schema(), SchemaBuilder.struct()); -Struct defaultValue = (Struct) value.schema().defaultValue(); -buildUpdatedSchema(value.schema(), "", builder, value.schema().isOptional(), defaultValue); +final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct()); +Struct defaultValue = (Struct) schema.defaultValue(); +buildUpdatedSchema(schema, "", builder, schema.isOptional(), defaultValue); updatedSchema = builder.build(); -schemaUpdateCache.put(value.schema(), updatedSchema); +schemaUpdateCache.put(schema, updatedSchema); +} +if (value == null) { +return newRecord(record, updatedSchema, null); +} else { +final Struct updatedValue = new Struct(updatedSchema); +buildWithSchema(value, "", updatedValue); +return newRecord(record, updatedSchema, updatedValue); } - -final Struct updatedValue = new Struct(updatedSchema); -buildWithSchema(value, "", updatedValue); -return newRecord(record, updatedSchema, updatedValue); } /** @@ -216,6 +220,9 @@ public abstract class Flatten> implements Transformat } private void buildWithSchema(Struct record, String fieldNamePrefix, Struct newRecord) { +if (record == null) { +return; +} for (Field field : record.schema().fields()) { final String fieldName = fieldName(fieldNamePrefix, field.name()); switch (field.schema().type()) { diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java index b0549fb..2e7be95 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java @@ -183,6 +183,46 @@ public class FlattenTest { } @Test +public void testOptionalStruct() { +xformValue.configure(Collections.emptyMap()); + +SchemaBuilder 
builder = SchemaBuilder.struct().optional(); +builder.field("opt_int32", Schema.OPTIONAL_INT32_SCHEMA); +Schema schema = builder.build(); + +SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, +"topic", 0, +schema, null)); + +assertEquals(Schema.Type.STRUCT, transformed.valueSchema().type()); +assertNull(transformed.value()); +} + +@Test +public void testOptionalNestedStruct() { +
[kafka] branch 2.3 updated: KAFKA-6605: Fix NPE in Flatten when optional Struct is null (#5705)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.3 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.3 by this push: new 2400f72 KAFKA-6605: Fix NPE in Flatten when optional Struct is null (#5705) 2400f72 is described below commit 2400f729a1069393503b8657cb69ed524c466ff4 Author: Michał Borowiecki AuthorDate: Fri Jul 12 16:27:33 2019 +0100 KAFKA-6605: Fix NPE in Flatten when optional Struct is null (#5705) Correct the Flatten SMT to properly handle null key or value `Struct` instances. Author: Michal Borowiecki Reviewers: Arjun Satish , Robert Yokota , Randall Hauch --- .../apache/kafka/connect/transforms/Flatten.java | 29 ++-- .../kafka/connect/transforms/FlattenTest.java | 40 ++ 2 files changed, 58 insertions(+), 11 deletions(-) diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java index c5e4000..d7d2144 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java @@ -35,7 +35,7 @@ import java.util.LinkedHashMap; import java.util.Map; import static org.apache.kafka.connect.transforms.util.Requirements.requireMap; -import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; +import static org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull; public abstract class Flatten> implements Transformation { @@ -136,20 +136,24 @@ public abstract class Flatten> implements Transformat } private R applyWithSchema(R record) { -final Struct value = requireStruct(operatingValue(record), PURPOSE); +final Struct value = requireStructOrNull(operatingValue(record), PURPOSE); -Schema updatedSchema = schemaUpdateCache.get(value.schema()); +Schema schema = operatingSchema(record); +Schema 
updatedSchema = schemaUpdateCache.get(schema); if (updatedSchema == null) { -final SchemaBuilder builder = SchemaUtil.copySchemaBasics(value.schema(), SchemaBuilder.struct()); -Struct defaultValue = (Struct) value.schema().defaultValue(); -buildUpdatedSchema(value.schema(), "", builder, value.schema().isOptional(), defaultValue); +final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct()); +Struct defaultValue = (Struct) schema.defaultValue(); +buildUpdatedSchema(schema, "", builder, schema.isOptional(), defaultValue); updatedSchema = builder.build(); -schemaUpdateCache.put(value.schema(), updatedSchema); +schemaUpdateCache.put(schema, updatedSchema); +} +if (value == null) { +return newRecord(record, updatedSchema, null); +} else { +final Struct updatedValue = new Struct(updatedSchema); +buildWithSchema(value, "", updatedValue); +return newRecord(record, updatedSchema, updatedValue); } - -final Struct updatedValue = new Struct(updatedSchema); -buildWithSchema(value, "", updatedValue); -return newRecord(record, updatedSchema, updatedValue); } /** @@ -216,6 +220,9 @@ public abstract class Flatten> implements Transformat } private void buildWithSchema(Struct record, String fieldNamePrefix, Struct newRecord) { +if (record == null) { +return; +} for (Field field : record.schema().fields()) { final String fieldName = fieldName(fieldNamePrefix, field.name()); switch (field.schema().type()) { diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java index b0549fb..2e7be95 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java @@ -183,6 +183,46 @@ public class FlattenTest { } @Test +public void testOptionalStruct() { +xformValue.configure(Collections.emptyMap()); + +SchemaBuilder 
builder = SchemaBuilder.struct().optional(); +builder.field("opt_int32", Schema.OPTIONAL_INT32_SCHEMA); +Schema schema = builder.build(); + +SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, +"topic", 0, +schema, null)); + +assertEquals(Schema.Type.STRUCT, transformed.valueSchema().type()); +assertNull(transformed.value()); +} + +@Test +public void testOptionalNestedStruct() { +
[kafka] branch trunk updated: KAFKA-6605: Fix NPE in Flatten when optional Struct is null (#5705)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new fc4fea6 KAFKA-6605: Fix NPE in Flatten when optional Struct is null (#5705) fc4fea6 is described below commit fc4fea6761986749f0ac640868d9b4d2a552eb62 Author: Michał Borowiecki AuthorDate: Fri Jul 12 16:27:33 2019 +0100 KAFKA-6605: Fix NPE in Flatten when optional Struct is null (#5705) Correct the Flatten SMT to properly handle null key or value `Struct` instances. Author: Michal Borowiecki Reviewers: Arjun Satish , Robert Yokota , Randall Hauch --- .../apache/kafka/connect/transforms/Flatten.java | 29 ++-- .../kafka/connect/transforms/FlattenTest.java | 40 ++ 2 files changed, 58 insertions(+), 11 deletions(-) diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java index c5e4000..d7d2144 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java @@ -35,7 +35,7 @@ import java.util.LinkedHashMap; import java.util.Map; import static org.apache.kafka.connect.transforms.util.Requirements.requireMap; -import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; +import static org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull; public abstract class Flatten> implements Transformation { @@ -136,20 +136,24 @@ public abstract class Flatten> implements Transformat } private R applyWithSchema(R record) { -final Struct value = requireStruct(operatingValue(record), PURPOSE); +final Struct value = requireStructOrNull(operatingValue(record), PURPOSE); -Schema updatedSchema = schemaUpdateCache.get(value.schema()); +Schema schema = operatingSchema(record); 
+Schema updatedSchema = schemaUpdateCache.get(schema); if (updatedSchema == null) { -final SchemaBuilder builder = SchemaUtil.copySchemaBasics(value.schema(), SchemaBuilder.struct()); -Struct defaultValue = (Struct) value.schema().defaultValue(); -buildUpdatedSchema(value.schema(), "", builder, value.schema().isOptional(), defaultValue); +final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct()); +Struct defaultValue = (Struct) schema.defaultValue(); +buildUpdatedSchema(schema, "", builder, schema.isOptional(), defaultValue); updatedSchema = builder.build(); -schemaUpdateCache.put(value.schema(), updatedSchema); +schemaUpdateCache.put(schema, updatedSchema); +} +if (value == null) { +return newRecord(record, updatedSchema, null); +} else { +final Struct updatedValue = new Struct(updatedSchema); +buildWithSchema(value, "", updatedValue); +return newRecord(record, updatedSchema, updatedValue); } - -final Struct updatedValue = new Struct(updatedSchema); -buildWithSchema(value, "", updatedValue); -return newRecord(record, updatedSchema, updatedValue); } /** @@ -216,6 +220,9 @@ public abstract class Flatten> implements Transformat } private void buildWithSchema(Struct record, String fieldNamePrefix, Struct newRecord) { +if (record == null) { +return; +} for (Field field : record.schema().fields()) { final String fieldName = fieldName(fieldNamePrefix, field.name()); switch (field.schema().type()) { diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java index b0549fb..2e7be95 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java @@ -183,6 +183,46 @@ public class FlattenTest { } @Test +public void testOptionalStruct() { +xformValue.configure(Collections.emptyMap()); + 
+SchemaBuilder builder = SchemaBuilder.struct().optional(); +builder.field("opt_int32", Schema.OPTIONAL_INT32_SCHEMA); +Schema schema = builder.build(); + +SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, +"topic", 0, +schema, null)); + +assertEquals(Schema.Type.STRUCT, transformed.valueSchema().type()); +assertNull(transformed.value()); +} + +@Test +public void testOptionalNestedStruct() { +
[kafka] branch 1.1 updated: Fixes #8198 KStreams testing docs use non-existent method pipe (#6678)
This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch 1.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/1.1 by this push: new cf3f9ac Fixes #8198 KStreams testing docs use non-existent method pipe (#6678) cf3f9ac is described below commit cf3f9ac4ea1bc40f014ae93a6dc197f60834ff66 Author: slim AuthorDate: Fri Jul 12 18:06:05 2019 +0300 Fixes #8198 KStreams testing docs use non-existent method pipe (#6678) Minor fix of #8198 apache/kafka-site#210 Reviewers: John Roesler, Bill Bejeck --- docs/streams/developer-guide/testing.html | 14 -- 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/docs/streams/developer-guide/testing.html b/docs/streams/developer-guide/testing.html index e6886a1..c3dc0a8 100644 --- a/docs/streams/developer-guide/testing.html +++ b/docs/streams/developer-guide/testing.html @@ -76,12 +76,14 @@ TopologyTestDriver testDriver = new TopologyTestDriver(topology, config); ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new IntegerSerializer()); -testDriver.pipe(factory.create("key", 42L)); - - -To verify the output, the test driver produces ProducerRecords with key and value type byte[]. -For result verification, you can specify corresponding deserializers when reading the output record from the driver. - +testDriver.pipeInput(factory.create("key", 42L)); + + +To verify the output, the test driver produces ProducerRecords with key and value type +byte[]. +For result verification, you can specify corresponding deserializers when reading the output record from +the driver. + ProducerRecord<String, Integer> outputRecord = testDriver.readOutput("output-topic", new StringDeserializer(), new LongDeserializer());
[kafka] branch 2.0 updated: Fixes #8198 KStreams testing docs use non-existent method pipe (#6678)
This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch 2.0 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.0 by this push: new 8cc43fa Fixes #8198 KStreams testing docs use non-existent method pipe (#6678) 8cc43fa is described below commit 8cc43fa313a97d929eca61e45a887cf802e4e0ac Author: slim AuthorDate: Fri Jul 12 18:06:05 2019 +0300 Fixes #8198 KStreams testing docs use non-existent method pipe (#6678) Minor fix of #8198 apache/kafka-site#210 Reviewers: John Roesler, Bill Bejeck --- docs/streams/developer-guide/testing.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/streams/developer-guide/testing.html b/docs/streams/developer-guide/testing.html index bdecc43..9ede7ea 100644 --- a/docs/streams/developer-guide/testing.html +++ b/docs/streams/developer-guide/testing.html @@ -99,7 +99,7 @@ TopologyTestDriver testDriver = new TopologyTestDriver(topology, props); ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new IntegerSerializer()); -testDriver.pipe(factory.create("key", 42L)); +testDriver.pipeInput(factory.create("key", 42L)); To verify the output, the test driver produces ProducerRecords with key and value type
[kafka] branch 2.1 updated: Fixes #8198 KStreams testing docs use non-existent method pipe (#6678)
This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.1 by this push: new ea8baf5 Fixes #8198 KStreams testing docs use non-existent method pipe (#6678) ea8baf5 is described below commit ea8baf54fd2eb693397d0670ea475a5e07396eff Author: slim AuthorDate: Fri Jul 12 18:06:05 2019 +0300 Fixes #8198 KStreams testing docs use non-existent method pipe (#6678) Minor fix of #8198 apache/kafka-site#210 Reviewers: John Roesler, Bill Bejeck --- docs/streams/developer-guide/testing.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/streams/developer-guide/testing.html b/docs/streams/developer-guide/testing.html index 026b02b..55a89b0 100644 --- a/docs/streams/developer-guide/testing.html +++ b/docs/streams/developer-guide/testing.html @@ -99,7 +99,7 @@ TopologyTestDriver testDriver = new TopologyTestDriver(topology, props); ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new IntegerSerializer()); -testDriver.pipe(factory.create("key", 42L)); +testDriver.pipeInput(factory.create("key", 42L)); To verify the output, the test driver produces ProducerRecords with key and value type
[kafka] branch 2.2 updated: Fixes #8198 KStreams testing docs use non-existent method pipe (#6678)
This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch 2.2 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.2 by this push: new f2ae6e3 Fixes #8198 KStreams testing docs use non-existent method pipe (#6678) f2ae6e3 is described below commit f2ae6e3ca900d963cad46ee4adfbdc928ee68d68 Author: slim AuthorDate: Fri Jul 12 18:06:05 2019 +0300 Fixes #8198 KStreams testing docs use non-existent method pipe (#6678) Minor fix of #8198 apache/kafka-site#210 Reviewers: John Roesler, Bill Bejeck --- docs/streams/developer-guide/testing.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/streams/developer-guide/testing.html b/docs/streams/developer-guide/testing.html index 026b02b..55a89b0 100644 --- a/docs/streams/developer-guide/testing.html +++ b/docs/streams/developer-guide/testing.html @@ -99,7 +99,7 @@ TopologyTestDriver testDriver = new TopologyTestDriver(topology, props); ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new IntegerSerializer()); -testDriver.pipe(factory.create("key", 42L)); +testDriver.pipeInput(factory.create("key", 42L)); To verify the output, the test driver produces ProducerRecords with key and value type
[kafka] branch 2.3 updated: Fixes #8198 KStreams testing docs use non-existent method pipe (#6678)
This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch 2.3 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.3 by this push: new 62617b9 Fixes #8198 KStreams testing docs use non-existent method pipe (#6678) 62617b9 is described below commit 62617b93d13215716600aa498f6806978be07c30 Author: slim AuthorDate: Fri Jul 12 18:06:05 2019 +0300 Fixes #8198 KStreams testing docs use non-existent method pipe (#6678) Minor fix of #8198 apache/kafka-site#210 Reviewers: John Roesler, Bill Bejeck --- docs/streams/developer-guide/testing.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/streams/developer-guide/testing.html b/docs/streams/developer-guide/testing.html index 026b02b..55a89b0 100644 --- a/docs/streams/developer-guide/testing.html +++ b/docs/streams/developer-guide/testing.html @@ -99,7 +99,7 @@ TopologyTestDriver testDriver = new TopologyTestDriver(topology, props); ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new IntegerSerializer()); -testDriver.pipe(factory.create("key", 42L)); +testDriver.pipeInput(factory.create("key", 42L)); To verify the output, the test driver produces ProducerRecords with key and value type
[kafka] branch trunk updated: Fixes #8198 KStreams testing docs use non-existent method pipe (#6678)
This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 8cabd44 Fixes #8198 KStreams testing docs use non-existent method pipe (#6678) 8cabd44 is described below commit 8cabd44e1d35d3cf3f865d8a4eee8c04228e0d75 Author: slim AuthorDate: Fri Jul 12 18:06:05 2019 +0300 Fixes #8198 KStreams testing docs use non-existent method pipe (#6678) Minor fix of #8198 apache/kafka-site#210 Reviewers: John Roesler, Bill Bejeck --- docs/streams/developer-guide/testing.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/streams/developer-guide/testing.html b/docs/streams/developer-guide/testing.html index 026b02b..55a89b0 100644 --- a/docs/streams/developer-guide/testing.html +++ b/docs/streams/developer-guide/testing.html @@ -99,7 +99,7 @@ TopologyTestDriver testDriver = new TopologyTestDriver(topology, props); ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new IntegerSerializer()); -testDriver.pipe(factory.create("key", 42L)); +testDriver.pipeInput(factory.create("key", 42L)); To verify the output, the test driver produces ProducerRecords with key and value type
[kafka] branch 2.2 updated: KAFKA-5998: fix checkpointableOffsets handling (#7030)
This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch 2.2 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.2 by this push: new db98c6f KAFKA-5998: fix checkpointableOffsets handling (#7030) db98c6f is described below commit db98c6f8cdeb27e7dd858cd37499c66458064e5a Author: John Roesler AuthorDate: Fri Jul 12 08:42:11 2019 -0500 KAFKA-5998: fix checkpointableOffsets handling (#7030) fix checkpoint file warning by filtering checkpointable offsets per task clean up state manager hierarchy to prevent similar bugs Reviewers: Bruno Cadonna , Bill Bejeck --- .../internals/GlobalStateManagerImpl.java | 68 --- .../processor/internals/ProcessorStateManager.java | 203 ++--- ...ractStateManager.java => StateManagerUtil.java} | 41 ++--- .../streams/processor/internals/StreamTask.java| 11 +- .../streams/processor/internals/TaskManager.java | 2 +- .../streams/state/internals/OffsetCheckpoint.java | 5 + .../internals/GlobalStateManagerImplTest.java | 8 +- .../processor/internals/MockChangelogReader.java | 7 +- .../internals/ProcessorStateManagerTest.java | 134 +- .../processor/internals/StandbyTaskTest.java | 4 +- .../processor/internals/StreamTaskTest.java| 6 +- .../processor/internals/StreamThreadTest.java | 2 +- .../processor/internals/TaskManagerTest.java | 6 +- 13 files changed, 356 insertions(+), 141 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index f224e1e..6a3959f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.InvalidOffsetException; import org.apache.kafka.common.PartitionInfo; 
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.utils.FixedOrderMap; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; @@ -32,6 +33,7 @@ import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.internals.OffsetCheckpoint; import org.apache.kafka.streams.state.internals.RecordConverter; import org.slf4j.Logger; @@ -48,22 +50,30 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME; +import static org.apache.kafka.streams.processor.internals.StateManagerUtil.converterForStore; + /** * This class is responsible for the initialization, restoration, closing, flushing etc * of Global State Stores. There is only ever 1 instance of this class per Application Instance. 
*/ -public class GlobalStateManagerImpl extends AbstractStateManager implements GlobalStateManager { +public class GlobalStateManagerImpl implements GlobalStateManager { private final Logger log; +private final boolean eosEnabled; private final ProcessorTopology topology; private final Consumer globalConsumer; +private final File baseDir; private final StateDirectory stateDirectory; private final Set globalStoreNames = new HashSet<>(); +private final FixedOrderMap> globalStores = new FixedOrderMap<>(); private final StateRestoreListener stateRestoreListener; -private InternalProcessorContext processorContext; +private InternalProcessorContext globalProcessorContext; private final int retries; private final long retryBackoffMs; private final Duration pollTime; private final Set globalNonPersistentStoresTopics = new HashSet<>(); +private final OffsetCheckpoint checkpointFile; +private final Map checkpointFileCache; public GlobalStateManagerImpl(final LogContext logContext, final ProcessorTopology topology, @@ -71,7 +81,10 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob final StateDirectory stateDirectory, final StateRestoreListener stateRestoreListener, final StreamsConfig config) { -super(stateDirectory.globalStateDir(), StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG))); +
[kafka] branch 2.3 updated: KAFKA-5998: fix checkpointableOffsets handling (#7030)
This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch 2.3 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.3 by this push: new 1052d87 KAFKA-5998: fix checkpointableOffsets handling (#7030) 1052d87 is described below commit 1052d87d37bdc0bab085de2bcf8f49a344dfcdd5 Author: John Roesler AuthorDate: Fri Jul 12 08:42:11 2019 -0500 KAFKA-5998: fix checkpointableOffsets handling (#7030) fix checkpoint file warning by filtering checkpointable offsets per task; clean up state manager hierarchy to prevent similar bugs. Reviewers: Bruno Cadonna, Bill Bejeck --- .../internals/GlobalStateManagerImpl.java | 68 --- .../processor/internals/ProcessorStateManager.java | 203 ++--- ...ractStateManager.java => StateManagerUtil.java} | 41 ++--- .../streams/processor/internals/StreamTask.java| 11 +- .../streams/processor/internals/TaskManager.java | 2 +- .../streams/state/internals/OffsetCheckpoint.java | 5 + .../internals/GlobalStateManagerImplTest.java | 8 +- .../processor/internals/MockChangelogReader.java | 7 +- .../internals/ProcessorStateManagerTest.java | 134 +- .../processor/internals/StandbyTaskTest.java | 4 +- .../processor/internals/StreamTaskTest.java| 6 +- .../processor/internals/StreamThreadTest.java | 2 +- .../processor/internals/TaskManagerTest.java | 6 +- 13 files changed, 356 insertions(+), 141 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index f224e1e..6a3959f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.InvalidOffsetException; import org.apache.kafka.common.PartitionInfo; 
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.utils.FixedOrderMap; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; @@ -32,6 +33,7 @@ import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.internals.OffsetCheckpoint; import org.apache.kafka.streams.state.internals.RecordConverter; import org.slf4j.Logger; @@ -48,22 +50,30 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME; +import static org.apache.kafka.streams.processor.internals.StateManagerUtil.converterForStore; + /** * This class is responsible for the initialization, restoration, closing, flushing etc * of Global State Stores. There is only ever 1 instance of this class per Application Instance. 
*/ -public class GlobalStateManagerImpl extends AbstractStateManager implements GlobalStateManager { +public class GlobalStateManagerImpl implements GlobalStateManager { private final Logger log; +private final boolean eosEnabled; private final ProcessorTopology topology; private final Consumer globalConsumer; +private final File baseDir; private final StateDirectory stateDirectory; private final Set globalStoreNames = new HashSet<>(); +private final FixedOrderMap> globalStores = new FixedOrderMap<>(); private final StateRestoreListener stateRestoreListener; -private InternalProcessorContext processorContext; +private InternalProcessorContext globalProcessorContext; private final int retries; private final long retryBackoffMs; private final Duration pollTime; private final Set globalNonPersistentStoresTopics = new HashSet<>(); +private final OffsetCheckpoint checkpointFile; +private final Map checkpointFileCache; public GlobalStateManagerImpl(final LogContext logContext, final ProcessorTopology topology, @@ -71,7 +81,10 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob final StateDirectory stateDirectory, final StateRestoreListener stateRestoreListener, final StreamsConfig config) { -super(stateDirectory.globalStateDir(), StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG))); +
[kafka] branch trunk updated: KAFKA-5998: fix checkpointableOffsets handling (#7030)
This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 53b4ce5 KAFKA-5998: fix checkpointableOffsets handling (#7030) 53b4ce5 is described below commit 53b4ce5c00d61be87962f603682873665155cec4 Author: John Roesler AuthorDate: Fri Jul 12 08:42:11 2019 -0500 KAFKA-5998: fix checkpointableOffsets handling (#7030) fix checkpoint file warning by filtering checkpointable offsets per task; clean up state manager hierarchy to prevent similar bugs. Reviewers: Bruno Cadonna, Bill Bejeck --- .../internals/GlobalStateManagerImpl.java | 68 --- .../processor/internals/ProcessorStateManager.java | 203 ++--- ...ractStateManager.java => StateManagerUtil.java} | 41 ++--- .../streams/processor/internals/StreamTask.java| 11 +- .../streams/processor/internals/TaskManager.java | 2 +- .../streams/state/internals/OffsetCheckpoint.java | 5 + .../internals/GlobalStateManagerImplTest.java | 8 +- .../processor/internals/MockChangelogReader.java | 7 +- .../internals/ProcessorStateManagerTest.java | 134 +- .../processor/internals/StandbyTaskTest.java | 4 +- .../processor/internals/StreamTaskTest.java| 6 +- .../processor/internals/StreamThreadTest.java | 2 +- .../processor/internals/TaskManagerTest.java | 6 +- 13 files changed, 356 insertions(+), 141 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index f224e1e..6a3959f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.InvalidOffsetException; import 
org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.utils.FixedOrderMap; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; @@ -32,6 +33,7 @@ import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.internals.OffsetCheckpoint; import org.apache.kafka.streams.state.internals.RecordConverter; import org.slf4j.Logger; @@ -48,22 +50,30 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME; +import static org.apache.kafka.streams.processor.internals.StateManagerUtil.converterForStore; + /** * This class is responsible for the initialization, restoration, closing, flushing etc * of Global State Stores. There is only ever 1 instance of this class per Application Instance. 
*/ -public class GlobalStateManagerImpl extends AbstractStateManager implements GlobalStateManager { +public class GlobalStateManagerImpl implements GlobalStateManager { private final Logger log; +private final boolean eosEnabled; private final ProcessorTopology topology; private final Consumer globalConsumer; +private final File baseDir; private final StateDirectory stateDirectory; private final Set globalStoreNames = new HashSet<>(); +private final FixedOrderMap> globalStores = new FixedOrderMap<>(); private final StateRestoreListener stateRestoreListener; -private InternalProcessorContext processorContext; +private InternalProcessorContext globalProcessorContext; private final int retries; private final long retryBackoffMs; private final Duration pollTime; private final Set globalNonPersistentStoresTopics = new HashSet<>(); +private final OffsetCheckpoint checkpointFile; +private final Map checkpointFileCache; public GlobalStateManagerImpl(final LogContext logContext, final ProcessorTopology topology, @@ -71,7 +81,10 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob final StateDirectory stateDirectory, final StateRestoreListener stateRestoreListener, final StreamsConfig config) { -super(stateDirectory.globalStateDir(), StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG))); +