This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch 2.7.2_ds_tmp
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ab3caca169c35b2ce189d84c6b3834023bfb2e90
Author: Enrico Olivelli <eolive...@gmail.com>
AuthorDate: Fri Apr 9 19:29:00 2021 +0200

    GenericObject - support KeyValue in Message#getValue() (#10107)
---
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 48 ++++++++++++++++++++++
 .../org/apache/pulsar/client/impl/MessageImpl.java | 13 +++++-
 .../client/impl/schema/AutoConsumeSchema.java      |  4 ++
 3 files changed, 63 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index fba8108..56ca04c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -49,6 +49,8 @@ import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.testng.annotations.AfterMethod;
@@ -305,6 +307,52 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
     }
 
     @Test
+    public void testKeyValueSchema() throws Exception {
+        final String tenant = PUBLIC_TENANT;
+        final String namespace = "test-namespace-" + randomName(16);
+        final String topicName = "test-string-schema";
+
+        final String topic = TopicName.get(
+                TopicDomain.persistent.value(),
+                tenant,
+                namespace,
+                topicName).toString();
+
+        admin.namespaces().createNamespace(
+                tenant + "/" + namespace,
+                Sets.newHashSet(CLUSTER_NAME));
+
+        admin.topics().createPartitionedTopic(topic, 2);
+
+        Producer<KeyValue<String, Integer>> producer = pulsarClient
+                .newProducer(Schema.KeyValue(Schema.STRING, Schema.INT32, KeyValueEncodingType.INLINE))
+                .topic(topic)
+                .create();
+
+        producer.send(new KeyValue<>("foo", 123));
+
+        Consumer<KeyValue<String, Integer>> consumer = pulsarClient.newConsumer(Schema.KeyValue(Schema.STRING, Schema.INT32, KeyValueEncodingType.INLINE))
+                .subscriptionName("test-sub")
+                .topic(topic)
+                .subscribe();
+
+        Consumer<GenericRecord> consumer2 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                .subscriptionName("test-sub2")
+                .topic(topic)
+                .subscribe();
+
+        producer.send(new KeyValue<>("foo", 123));
+
+        Message<KeyValue<String, Integer>> message = consumer.receive();
+        Message<GenericRecord> message2 = consumer2.receive();
+        assertEquals(message.getValue(), message2.getValue().getNativeObject());
+
+        producer.close();
+        consumer.close();
+        consumer2.close();
+    }
+
+    @Test
     public void testIsUsingAvroSchemaParser() {
         for (SchemaType value : SchemaType.values()) {
             if (value == SchemaType.AVRO || value == SchemaType.JSON || value == SchemaType.PROTOBUF) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index ac387a4..f7d8cf9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -41,6 +41,7 @@ import java.util.stream.Collectors;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.client.impl.schema.KeyValueSchema;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.api.EncryptionContext;
@@ -306,8 +307,16 @@ public class MessageImpl<T> implements Message<T> {
         }
     }
 
+    private KeyValueSchema getKeyValueSchema() {
+        if (schema instanceof AutoConsumeSchema) {
+            return (KeyValueSchema) ((AutoConsumeSchema) schema).getInternalSchema();
+        } else {
+            return (KeyValueSchema) schema;
+        }
+    }
+
     private T getKeyValueBySchemaVersion() {
-        KeyValueSchema kvSchema = (KeyValueSchema) schema;
+        KeyValueSchema kvSchema = getKeyValueSchema();
         byte[] schemaVersion = getSchemaVersion();
         if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
             return (T) kvSchema.decode(
@@ -319,7 +328,7 @@ public class MessageImpl<T> implements Message<T> {
     }
 
     private T getKeyValue() {
-        KeyValueSchema kvSchema = (KeyValueSchema) schema;
+        KeyValueSchema kvSchema = getKeyValueSchema();
         if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
             return (T) kvSchema.decode(
                     msgMetadataBuilder.hasNullPartitionKey() ? null : getKeyBytes(),
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
index 6ab92ad..647a87b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
@@ -250,4 +250,8 @@ public class AutoConsumeSchema implements Schema<GenericRecord> {
         return GenericObjectWrapper.of(value,
                 this.schema.getSchemaInfo().getType(), schemaVersion);
     }
+
+    public Schema<?> getInternalSchema() {
+        return schema;
+    }
 }

Reply via email to