This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch 2.7.2_ds_tmp in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c73cdae681ff1326ae7717bb6d27c01106ac188f Author: Enrico Olivelli <eolive...@gmail.com> AuthorDate: Tue Apr 20 12:35:37 2021 +0200 AutoConsumeSchema: handle schema NONE as BYTES (#10277) --- .../java/org/apache/pulsar/schema/SchemaTest.java | 23 ++++++++++++++++++++-- .../client/impl/schema/AutoConsumeSchema.java | 1 + 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index 0897e67..f167e0d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -55,6 +55,7 @@ import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.KeyValueEncodingType; +import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.FutureUtil; import org.testng.annotations.AfterMethod; @@ -392,7 +393,16 @@ public class SchemaTest extends MockedPulsarServiceBaseTest { } @Test - public void testUseAutoConsumeWithSchemalessTopic() throws Exception { + public void testUseAutoConsumeWithBytesSchemaTopic() throws Exception { + testUseAutoConsumeWithSchemalessTopic(SchemaType.BYTES); + } + + @Test + public void testUseAutoConsumeWithNoneSchemaTopic() throws Exception { + testUseAutoConsumeWithSchemalessTopic(SchemaType.NONE); + } + + private void testUseAutoConsumeWithSchemalessTopic(SchemaType schema) throws Exception { final String tenant = PUBLIC_TENANT; final String namespace = "test-namespace-" + randomName(16); final String topicName = "test-schemaless"; @@ -409,6 +419,15 @@ public class SchemaTest extends MockedPulsarServiceBaseTest { admin.topics().createPartitionedTopic(topic, 2); + // set schema + SchemaInfo schemaInfo = SchemaInfo + .builder() + .schema(new byte[0]) + .name("dummySchema") + .type(schema) + .build(); + admin.schemas().createSchema(topic, schemaInfo); + Producer<byte[]> producer = pulsarClient .newProducer() .topic(topic) @@ -420,7 +439,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest { .subscribe(); // use GenericRecord even for primitive types - // it will be a PrimitiveRecord + // it will be a GenericObjectWrapper Consumer<GenericRecord> consumer2 = pulsarClient.newConsumer(Schema.AUTO_CONSUME()) .subscriptionName("test-sub3") .topic(topic) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java index 9f66875..af0ec7b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java @@ -184,6 +184,7 @@ public class AutoConsumeSchema implements Schema<GenericRecord> { case BOOLEAN: return BooleanSchema.of(); case BYTES: + case NONE: return BytesSchema.of(); case DATE: return DateSchema.of();