This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch 2.7.2_ds_tmp in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ab3caca169c35b2ce189d84c6b3834023bfb2e90 Author: Enrico Olivelli <eolive...@gmail.com> AuthorDate: Fri Apr 9 19:29:00 2021 +0200 GenericObject - support KeyValue in Message#getValue() (#10107) --- .../java/org/apache/pulsar/schema/SchemaTest.java | 48 ++++++++++++++++++++++ .../org/apache/pulsar/client/impl/MessageImpl.java | 13 +++++- .../client/impl/schema/AutoConsumeSchema.java | 4 ++ 3 files changed, 63 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index fba8108..56ca04c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -49,6 +49,8 @@ import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.schema.KeyValue; +import org.apache.pulsar.common.schema.KeyValueEncodingType; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.FutureUtil; import org.testng.annotations.AfterMethod; @@ -305,6 +307,52 @@ public class SchemaTest extends MockedPulsarServiceBaseTest { } @Test + public void testKeyValueSchema() throws Exception { + final String tenant = PUBLIC_TENANT; + final String namespace = "test-namespace-" + randomName(16); + final String topicName = "test-string-schema"; + + final String topic = TopicName.get( + TopicDomain.persistent.value(), + tenant, + namespace, + topicName).toString(); + + admin.namespaces().createNamespace( + tenant + "/" + namespace, + Sets.newHashSet(CLUSTER_NAME)); + + admin.topics().createPartitionedTopic(topic, 2); + + Producer<KeyValue<String, Integer>> producer = pulsarClient + .newProducer(Schema.KeyValue(Schema.STRING, Schema.INT32, KeyValueEncodingType.INLINE)) + .topic(topic) + .create(); + + producer.send(new KeyValue<>("foo", 123)); + + Consumer<KeyValue<String, Integer>> consumer = pulsarClient.newConsumer(Schema.KeyValue(Schema.STRING, Schema.INT32, KeyValueEncodingType.INLINE)) + .subscriptionName("test-sub") + .topic(topic) + .subscribe(); + + Consumer<GenericRecord> consumer2 = pulsarClient.newConsumer(Schema.AUTO_CONSUME()) + .subscriptionName("test-sub2") + .topic(topic) + .subscribe(); + + producer.send(new KeyValue<>("foo", 123)); + + Message<KeyValue<String, Integer>> message = consumer.receive(); + Message<GenericRecord> message2 = consumer2.receive(); + assertEquals(message.getValue(), message2.getValue().getNativeObject()); + + producer.close(); + consumer.close(); + consumer2.close(); + } + + @Test public void testIsUsingAvroSchemaParser() { for (SchemaType value : SchemaType.values()) { if (value == SchemaType.AVRO || value == SchemaType.JSON || value == SchemaType.PROTOBUF) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java index ac387a4..f7d8cf9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java @@ -41,6 +41,7 @@ import java.util.stream.Collectors; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; import org.apache.pulsar.client.impl.schema.KeyValueSchema; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.api.EncryptionContext; @@ -306,8 +307,16 @@ public class MessageImpl<T> implements Message<T> { } } + private KeyValueSchema getKeyValueSchema() { + if (schema instanceof AutoConsumeSchema) { + return (KeyValueSchema) ((AutoConsumeSchema) schema).getInternalSchema(); + } else { + return (KeyValueSchema) schema; + } + } + private T getKeyValueBySchemaVersion() { - KeyValueSchema kvSchema = (KeyValueSchema) schema; + KeyValueSchema kvSchema = getKeyValueSchema(); byte[] schemaVersion = getSchemaVersion(); if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) { return (T) kvSchema.decode( @@ -319,7 +328,7 @@ public class MessageImpl<T> implements Message<T> { } private T getKeyValue() { - KeyValueSchema kvSchema = (KeyValueSchema) schema; + KeyValueSchema kvSchema = getKeyValueSchema(); if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) { return (T) kvSchema.decode( msgMetadataBuilder.hasNullPartitionKey() ? null : getKeyBytes(), diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java index 6ab92ad..647a87b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java @@ -250,4 +250,8 @@ public class AutoConsumeSchema implements Schema<GenericRecord> { return GenericObjectWrapper.of(value, this.schema.getSchemaInfo().getType(), schemaVersion); } + + public Schema<?> getInternalSchema() { + return schema; + } }