This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch 2.7.2_ds_tmp
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4acd720eb2aea434a9839c6e738943a313e87b20
Author: Enrico Olivelli <eolive...@gmail.com>
AuthorDate: Wed May 12 12:33:42 2021 +0200

    PIP-85 Add Schema Information to Message in Java Client API (#10476)
    
    This is the implementation for PIP-85
    
https://docs.google.com/document/d/1VWi5LHP44V31nP4bCui9d5RXwH6xc_phrUes6tvNguk/
    
    - introduce `Optional<Schema<?>> Message.getReaderSchema()` - new Public API
    - introduce `Optional<Object> SchemaReader.getNativeSchema()` - new Public API
    - introduce `Schema<?> atSchemaVersion(schemaVersion)` to `AbstractSchema` (private API)
    - make `KeyValueSchema` extend `AbstractSchema` (private API)
    - rename MessageImpl and TopicMessageImpl `getSchema` to `getSchemaInternal`
    
    The schema returned by Message.getReaderSchema has these properties:
    - getSchemaInfo returns the actual schema used to decode the Message
    - getNativeSchema returns the original native (i.e. Avro) Schema for the correct version
    - it can be used for "decoding" payloads
    - it cannot be used for "encoding" payloads
    
    (cherry picked from commit 42241107dcf1f74a27a02297bc47901b3684b3cf)
---
 .../apache/pulsar/client/api/InterceptorsTest.java |   4 +-
 .../apache/pulsar/client/api/SimpleSchemaTest.java |  79 +++++++++-
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 174 +++++++++++++++++++--
 .../java/org/apache/pulsar/client/api/Message.java |  13 ++
 .../pulsar/client/api/schema/SchemaReader.java     |  10 ++
 .../org/apache/pulsar/client/impl/MessageImpl.java |  35 ++++-
 .../apache/pulsar/client/impl/ProducerImpl.java    |  12 +-
 .../pulsar/client/impl/TopicMessageImpl.java       |  10 +-
 .../pulsar/client/impl/schema/AbstractSchema.java  |  18 +++
 .../client/impl/schema/AbstractStructSchema.java   |  84 +++++++++-
 .../client/impl/schema/AutoConsumeSchema.java      |  10 ++
 .../pulsar/client/impl/schema/KeyValueSchema.java  |  36 ++++-
 .../impl/schema/generic/GenericAvroReader.java     |   6 +
 .../generic/GenericProtobufNativeReader.java       |   6 +
 .../impl/schema/generic/GenericSchemaImpl.java     |   1 +
 .../schema/reader/AbstractMultiVersionReader.java  |   8 +-
 .../client/impl/schema/reader/AvroReader.java      |  10 ++
 .../pulsar/functions/source/PulsarSource.java      |   4 +-
 .../pulsar/functions/source/PulsarSourceTest.java  |   5 +-
 19 files changed, 485 insertions(+), 40 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
index d5fe225..5c42251 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
@@ -106,14 +106,14 @@ public class InterceptorsTest extends 
ProducerConsumerBase {
             @Override
             public boolean eligible(Message message) {
                 return SchemaType.STRING.equals(
-                        
((MessageImpl)message).getSchema().getSchemaInfo().getType());
+                        
((MessageImpl)message).getSchemaInternal().getSchemaInfo().getType());
             }
         };
         BaseInterceptor interceptor3 = new BaseInterceptor("int3") {
             @Override
             public boolean eligible(Message message) {
                 return SchemaType.INT32.equals(
-                        
((MessageImpl)message).getSchema().getSchemaInfo().getType());
+                        
((MessageImpl)message).getSchemaInternal().getSchemaInfo().getType());
             }
         };
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
index 802d059..0e1d756 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
@@ -25,6 +25,7 @@ import lombok.NoArgsConstructor;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
@@ -33,7 +34,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.Schema.Parser;
 import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
 import org.apache.pulsar.common.schema.LongSchemaVersion;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import 
org.apache.pulsar.client.api.PulsarClientException.IncompatibleSchemaException;
@@ -79,7 +80,10 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
 
     @DataProvider(name = "schemaValidationModes")
     public static Object[][] schemaValidationModes() {
-        return new Object[][] { { true }, { false } };
+        return new Object[][] {
+                { true },
+                { false }
+        };
     }
 
     @DataProvider(name = "topicDomain")
@@ -530,8 +534,11 @@ public class SimpleSchemaTest extends ProducerConsumerBase 
{
                 assertEquals(data.getValue().getField("i"), i);
                 MessageImpl impl = (MessageImpl) data;
 
-                org.apache.avro.Schema avroSchema = (org.apache.avro.Schema) 
impl.getSchema().getNativeSchema().get();
+                org.apache.avro.Schema avroSchema = (org.apache.avro.Schema) 
impl.getSchemaInternal().getNativeSchema().get();
                 assertNotNull(avroSchema);
+
+                org.apache.avro.Schema avroSchema2 = (org.apache.avro.Schema) 
data.getReaderSchema().get().getNativeSchema().get();
+                assertNotNull(avroSchema2);
             }
 
         }
@@ -616,6 +623,9 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
                 assertEquals(data.getValue().getKey().getField("i"), i * 100);
                 assertEquals(data.getValue().getValue().getField("i"), i * 
1000);
                 c1.acknowledge(data);
+                KeyValueSchema keyValueSchema = (KeyValueSchema) 
data.getReaderSchema().get();
+                assertNotNull(keyValueSchema.getKeySchema());
+                assertNotNull(keyValueSchema.getValueSchema());
             }
 
             // verify c2
@@ -625,6 +635,9 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
                 assertEquals(data.getValue().getKey().i, i * 100);
                 assertEquals(data.getValue().getValue().i, i * 1000);
                 c2.acknowledge(data);
+                KeyValueSchema keyValueSchema = (KeyValueSchema) 
data.getReaderSchema().get();
+                assertNotNull(keyValueSchema.getKeySchema());
+                assertNotNull(keyValueSchema.getValueSchema());
             }
 
             // verify c3
@@ -634,6 +647,9 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
                 assertEquals(data.getValue().getKey().getField("i"), i * 100);
                 assertEquals(data.getValue().getValue().i, i * 1000);
                 c3.acknowledge(data);
+                KeyValueSchema keyValueSchema = (KeyValueSchema) 
data.getReaderSchema().get();
+                assertNotNull(keyValueSchema.getKeySchema());
+                assertNotNull(keyValueSchema.getValueSchema());
             }
 
             // verify c4
@@ -784,12 +800,18 @@ public class SimpleSchemaTest extends 
ProducerConsumerBase {
             // verify c0
             for (int i = 0; i < numMessages; i++) {
                 Message<GenericRecord> wrapper = c0.receive();
-                log.info("schema version {}", 
BytesSchemaVersion.of(wrapper.getSchemaVersion()));
                 KeyValue<GenericRecord, GenericRecord> data = 
(KeyValue<GenericRecord, GenericRecord>) wrapper.getValue().getNativeObject();
                 assertNotNull(wrapper.getSchemaVersion());
                 assertEquals(data.getKey().getField("i"), i * 100);
                 assertEquals(data.getValue().getField("i"), i * 1000);
                 c0.acknowledge(wrapper);
+                KeyValueSchema keyValueSchema = (KeyValueSchema) 
wrapper.getReaderSchema().get();
+                assertNotNull(keyValueSchema.getKeySchema());
+                assertNotNull(keyValueSchema.getValueSchema());
+                
assertTrue(keyValueSchema.getKeySchema().getSchemaInfo().getSchemaDefinition().contains("V1Data"));
+                
assertTrue(keyValueSchema.getValueSchema().getSchemaInfo().getSchemaDefinition().contains("V1Data"));
+                
assertTrue(keyValueSchema.getKeySchema().getNativeSchema().isPresent());
+                
assertTrue(keyValueSchema.getValueSchema().getNativeSchema().isPresent());
             }
 
 
@@ -813,15 +835,20 @@ public class SimpleSchemaTest extends 
ProducerConsumerBase {
                 // verify c0
                 for (int i = 0; i < numMessages; i++) {
                     Message<GenericRecord> wrapper = c0.receive();
-                    log.info("schema version {}", 
BytesSchemaVersion.of(wrapper.getSchemaVersion()));
                     KeyValue<GenericRecord, GenericRecord> data = 
(KeyValue<GenericRecord, GenericRecord>) wrapper.getValue().getNativeObject();
                     assertNotNull(wrapper.getSchemaVersion());
                     assertEquals(data.getKey().getField("i"), i * 100);
                     assertEquals(data.getValue().getField("i"), i * 1000);
                     assertEquals(data.getKey().getField("j"), i);
                     assertEquals(data.getValue().getField("j"), i * 20);
+                    KeyValueSchema keyValueSchema = (KeyValueSchema) 
wrapper.getReaderSchema().get();
+                    assertNotNull(keyValueSchema.getKeySchema());
+                    assertNotNull(keyValueSchema.getValueSchema());
+                    
assertTrue(keyValueSchema.getKeySchema().getSchemaInfo().getSchemaDefinition().contains("V2Data"));
+                    
assertTrue(keyValueSchema.getValueSchema().getSchemaInfo().getSchemaDefinition().contains("V2Data"));
+                    
assertTrue(keyValueSchema.getKeySchema().getNativeSchema().isPresent());
+                    
assertTrue(keyValueSchema.getValueSchema().getNativeSchema().isPresent());
                 }
-
             }
         }
     }
@@ -852,6 +879,46 @@ public class SimpleSchemaTest extends ProducerConsumerBase 
{
         Assert.assertTrue(binaryLookupService.getSchema(TopicName.get(topic), 
ByteBuffer.allocate(8).putLong(1).array()).get().isPresent());
     }
 
+    @Test
+    public void testGetNativeSchemaWithAutoConsumeWithMultiVersion() throws 
Exception {
+        final String topic = 
"persistent://my-property/my-ns/testGetSchemaWithMultiVersion";
+
+        @Cleanup
+        Consumer<?> consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                .subscriptionName("test")
+                .topic(topic)
+                .subscribe();
+
+        @Cleanup
+        Producer<V1Data> v1DataProducer = 
pulsarClient.newProducer(Schema.AVRO(V1Data.class))
+                .topic(topic)
+                .create();
+
+        @Cleanup
+        Producer<V2Data> v2DataProducer = 
pulsarClient.newProducer(Schema.AVRO(V2Data.class))
+                .topic(topic)
+                .create();
+
+        Assert.assertEquals(admin.schemas().getAllSchemas(topic).size(), 2);
+
+        v1DataProducer.send(new V1Data());
+        v2DataProducer.send(new V2Data());
+
+        Message<?> messageV1 = consumer.receive();
+        Schema<?> schemaV1 = messageV1.getReaderSchema().get();
+        Message<?> messageV2 = consumer.receive();
+        Schema<?> schemaV2 = messageV2.getReaderSchema().get();
+        log.info("schemaV1 {} {}", schemaV1.getSchemaInfo(), 
schemaV1.getNativeSchema());
+        log.info("schemaV2 {} {}", schemaV2.getSchemaInfo(), 
schemaV2.getNativeSchema());
+        
assertTrue(schemaV1.getSchemaInfo().getSchemaDefinition().contains("V1Data"));
+        
assertTrue(schemaV2.getSchemaInfo().getSchemaDefinition().contains("V2Data"));
+        org.apache.avro.Schema avroSchemaV1 = (org.apache.avro.Schema) 
schemaV1.getNativeSchema().get();
+        org.apache.avro.Schema avroSchemaV2 = (org.apache.avro.Schema) 
schemaV2.getNativeSchema().get();
+        assertNotEquals(avroSchemaV1.toString(false), 
avroSchemaV2.toString(false));
+        assertTrue(avroSchemaV1.toString(false).contains("V1Data"));
+        assertTrue(avroSchemaV2.toString(false).contains("V2Data"));
+    }
+
     @Test(dataProvider = "topicDomain")
     public void testAutoCreatedSchema(String domain) throws Exception {
         final String topic1 = domain + 
"my-property/my-ns/testAutoCreatedSchema-1";
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 7672cce..9542a36 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -47,6 +47,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -121,10 +122,11 @@ public class SchemaTest extends 
MockedPulsarServiceBaseTest {
                         (false).withSupportSchemaVersioning(true).
                         
withPojo(Schemas.PersonTwo.class).build()).getSchemaInfo());
 
-        admin.schemas().createSchema(fqtnTwo, Schema.AVRO(
+        Schema<Schemas.PersonTwo> personTwoSchema = Schema.AVRO(
                 
SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
                         (false).withSupportSchemaVersioning(true).
-                        
withPojo(Schemas.PersonTwo.class).build()).getSchemaInfo());
+                        withPojo(Schemas.PersonTwo.class).build());
+        admin.schemas().createSchema(fqtnTwo, personTwoSchema.getSchemaInfo());
 
         Producer<Schemas.PersonTwo> producer = 
pulsarClient.newProducer(Schema.AVRO(
                 
SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
@@ -146,18 +148,144 @@ public class SchemaTest extends 
MockedPulsarServiceBaseTest {
                 .topic(fqtnOne, fqtnTwo)
                 .subscribe();
 
+        Consumer<GenericRecord> consumer2 = 
pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                .subscriptionName("test2")
+                .topic(fqtnOne, fqtnTwo)
+                .subscribe();
+
         producer.send(personTwo);
 
         Schemas.PersonTwo personConsume = consumer.receive().getValue();
         assertEquals("Tom", personConsume.getName());
         assertEquals(1, personConsume.getId());
 
+        Message<Schemas.PersonTwo> message = consumer.receive();
+        Schemas.PersonTwo personConsume = message.getValue();
+        assertEquals(personConsume.getName(), "Tom");
+        assertEquals(personConsume.getId(), 1);
+        Schema<?> schema = message.getReaderSchema().get();
+        log.info("the-schema {}", schema);
+        assertEquals(personTwoSchema.getSchemaInfo(), schema.getSchemaInfo());
+        org.apache.avro.Schema nativeSchema = (org.apache.avro.Schema) 
schema.getNativeSchema().get();
+        log.info("nativeSchema-schema {}", nativeSchema);
+        assertNotNull(nativeSchema);
+
+        // verify that with AUTO_CONSUME we can access the original schema
+        // and the Native AVRO schema
+        Message<?> message2 = consumer2.receive();
+        Schema<?> schema2 = message2.getReaderSchema().get();
+        log.info("the-schema {}", schema2);
+        assertEquals(personTwoSchema.getSchemaInfo(), schema2.getSchemaInfo());
+        org.apache.avro.Schema nativeSchema2 = (org.apache.avro.Schema) 
schema.getNativeSchema().get();
+        log.info("nativeSchema-schema {}", nativeSchema2);
+        assertNotNull(nativeSchema2);
+
+        producer.close();
+        consumer.close();
+    }
+
+    @Test
+    public void testMultiTopicSetSchemaProviderWithKeyValue() throws Exception 
{
+        final String tenant = PUBLIC_TENANT;
+        final String namespace = "test-namespace-" + randomName(16);
+        final String topicOne = "test-multi-version-schema-one";
+        final String topicTwo = "test-multi-version-schema-two";
+        final String fqtnOne = TopicName.get(
+                TopicDomain.persistent.value(),
+                tenant,
+                namespace,
+                topicOne
+        ).toString();
+
+        final String fqtnTwo = TopicName.get(
+                TopicDomain.persistent.value(),
+                tenant,
+                namespace,
+                topicTwo
+        ).toString();
+
+
+        admin.namespaces().createNamespace(
+                tenant + "/" + namespace,
+                Sets.newHashSet(CLUSTER_NAME)
+        );
+
+        admin.topics().createPartitionedTopic(fqtnOne, 3);
+        admin.topics().createPartitionedTopic(fqtnTwo, 3);
+
+        Schema<Schemas.PersonOne> schemaOne =  Schema.AVRO(
+                
SchemaDefinition.<Schemas.PersonOne>builder().withAlwaysAllowNull
+                        (false).withSupportSchemaVersioning(true).
+                        withPojo(Schemas.PersonOne.class).build());
+        admin.schemas().createSchema(fqtnOne, Schema.KeyValue(Schema.STRING, 
schemaOne).getSchemaInfo());
+
+        Schema<Schemas.PersonTwo> schemaTwo = Schema.AVRO(
+                
SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
+                        (false).withSupportSchemaVersioning(true).
+                        withPojo(Schemas.PersonTwo.class).build());
+        admin.schemas().createSchema(fqtnOne, Schema.KeyValue(Schema.STRING, 
schemaTwo).getSchemaInfo());
+
+        Schema<Schemas.PersonTwo> personTwoSchema = Schema.AVRO(
+                
SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
+                        (false).withSupportSchemaVersioning(true).
+                        withPojo(Schemas.PersonTwo.class).build());
+        admin.schemas().createSchema(fqtnTwo, Schema.KeyValue(Schema.STRING, 
schemaTwo).getSchemaInfo());
+
+        Producer<KeyValue<String, Schemas.PersonTwo>> producer = 
pulsarClient.newProducer(Schema.KeyValue(Schema.STRING, Schema.AVRO(
+                
SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
+                        (false).withSupportSchemaVersioning(true).
+                        withPojo(Schemas.PersonTwo.class).build())))
+                .topic(fqtnOne)
+                .create();
+
+        Schemas.PersonTwo personTwo = new Schemas.PersonTwo();
+        personTwo.setId(1);
+        personTwo.setName("Tom");
+
+
+        Consumer<KeyValue<String, Schemas.PersonTwo>> consumer = 
pulsarClient.newConsumer(Schema.KeyValue(Schema.STRING, Schema.AVRO(
+                
SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
+                        (false).withSupportSchemaVersioning(true).
+                        withPojo(Schemas.PersonTwo.class).build())))
+                .subscriptionName("test")
+                .topic(fqtnOne, fqtnTwo)
+                .subscribe();
+
+        Consumer<GenericRecord> consumer2 = 
pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                .subscriptionName("test2")
+                .topic(fqtnOne, fqtnTwo)
+                .subscribe();
+
+        producer.send(new KeyValue<>("foo", personTwo));
+
+        Message<KeyValue<String, Schemas.PersonTwo>> message = 
consumer.receive();
+        assertEquals("foo", message.getValue().getKey());
+        Schemas.PersonTwo personConsume = message.getValue().getValue();
+        assertEquals(personConsume.getName(), "Tom");
+        assertEquals(personConsume.getId(), 1);
+        KeyValueSchema schema = (KeyValueSchema) 
message.getReaderSchema().get();
+        log.info("the-schema {}", schema);
+        assertEquals(personTwoSchema.getSchemaInfo(), 
schema.getValueSchema().getSchemaInfo());
+        org.apache.avro.Schema nativeSchema = (org.apache.avro.Schema) 
schema.getValueSchema().getNativeSchema().get();
+        log.info("nativeSchema-schema {}", nativeSchema);
+        assertNotNull(nativeSchema);
+
+        // verify that with AUTO_CONSUME we can access the original schema
+        // and the Native AVRO schema
+        Message<?> message2 = consumer2.receive();
+        KeyValueSchema schema2 = (KeyValueSchema) 
message2.getReaderSchema().get();
+        log.info("the-schema {}", schema2);
+        assertEquals(personTwoSchema.getSchemaInfo(), 
schema2.getValueSchema().getSchemaInfo());
+        org.apache.avro.Schema nativeSchema2 = (org.apache.avro.Schema) 
schema.getValueSchema().getNativeSchema().get();
+        log.info("nativeSchema-schema {}", nativeSchema2);
+        assertNotNull(nativeSchema2);
+
         producer.close();
         consumer.close();
     }
 
     @Test
-    public void testBytesSchemaDeserialize() throws Exception {
+    public void testJSONSchemaDeserialize() throws Exception {
         final String tenant = PUBLIC_TENANT;
         final String namespace = "test-namespace-" + randomName(16);
         final String topicName = "test-bytes-schema";
@@ -203,6 +331,12 @@ public class SchemaTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(message.getValue().getField("address").getClass(),
                 message1.getValue().getAddress().getClass());
 
+        Schema<?> schema = message.getReaderSchema().get();
+        Schema<?> schema1 = message1.getReaderSchema().get();
+        log.info("schema {}", schema);
+        log.info("schema1 {}", schema1);
+        assertEquals(schema.getSchemaInfo(), schema1.getSchemaInfo());
+
         producer.close();
         consumer.close();
         consumer1.close();
@@ -246,12 +380,14 @@ public class SchemaTest extends 
MockedPulsarServiceBaseTest {
         producer.send("foo");
 
         Message<String> message = consumer.receive();
-        Message<GenericRecord> message3 = consumer2.receive();
+        Message<GenericRecord> message2 = consumer2.receive();
+        assertEquals(SchemaType.STRING, 
message.getReaderSchema().get().getSchemaInfo().getType());
+        assertEquals(SchemaType.STRING, 
message2.getReaderSchema().get().getSchemaInfo().getType());
 
         assertEquals("foo", message.getValue());
-        assertEquals(message3.getValue().getClass().getName(), 
"org.apache.pulsar.client.impl.schema.GenericObjectWrapper");
-        assertEquals(SchemaType.STRING, message3.getValue().getSchemaType());
-        assertEquals("foo", message3.getValue().getNativeObject());
+        assertEquals(message2.getValue().getClass().getName(), 
"org.apache.pulsar.client.impl.schema.GenericObjectWrapper");
+        assertEquals(SchemaType.STRING, message2.getValue().getSchemaType());
+        assertEquals("foo", message2.getValue().getNativeObject());
 
         producer.close();
         consumer.close();
@@ -296,12 +432,22 @@ public class SchemaTest extends 
MockedPulsarServiceBaseTest {
         producer.send("foo".getBytes(StandardCharsets.UTF_8));
 
         Message<byte[]> message = consumer.receive();
-        Message<GenericRecord> message3 = consumer2.receive();
+        Message<GenericRecord> message2 = consumer2.receive();
+        if (schema == SchemaType.BYTES) {
+            assertEquals(schema, 
message.getReaderSchema().get().getSchemaInfo().getType());
+            assertEquals(schema, 
message2.getReaderSchema().get().getSchemaInfo().getType());
+        } else if (schema == SchemaType.NONE) {
+            // schema NONE is always reported as BYTES
+            assertEquals(SchemaType.BYTES, 
message.getReaderSchema().get().getSchemaInfo().getType());
+            assertEquals(SchemaType.BYTES, 
message2.getReaderSchema().get().getSchemaInfo().getType());
+        } else {
+            fail();
+        }
 
         assertEquals("foo".getBytes(StandardCharsets.UTF_8), 
message.getValue());
-        assertEquals(message3.getValue().getClass().getName(), 
"org.apache.pulsar.client.impl.schema.GenericObjectWrapper");
-        assertEquals(SchemaType.BYTES, message3.getValue().getSchemaType());
-        assertEquals("foo".getBytes(StandardCharsets.UTF_8), 
message3.getValue().getNativeObject());
+        assertEquals(message2.getValue().getClass().getName(), 
"org.apache.pulsar.client.impl.schema.GenericObjectWrapper");
+        assertEquals(SchemaType.BYTES, message2.getValue().getSchemaType());
+        assertEquals("foo".getBytes(StandardCharsets.UTF_8), 
message2.getValue().getNativeObject());
 
         producer.close();
         consumer.close();
@@ -420,6 +566,12 @@ public class SchemaTest extends 
MockedPulsarServiceBaseTest {
         Message<GenericRecord> message2 = consumer2.receive();
         assertEquals(message.getValue(), 
message2.getValue().getNativeObject());
 
+        Schema<?> schema = message.getReaderSchema().get();
+        Schema<?> schemaFromGenericRecord = message.getReaderSchema().get();
+        KeyValueSchema keyValueSchema = (KeyValueSchema) schema;
+        KeyValueSchema keyValueSchemaFromGenericRecord = (KeyValueSchema) 
schemaFromGenericRecord;
+        assertEquals(keyValueSchema.getSchemaInfo(), 
keyValueSchemaFromGenericRecord.getSchemaInfo());
+
         if (keyValueEncodingType == KeyValueEncodingType.SEPARATED) {
             // with "SEPARATED encoding the routing key is the key of the 
KeyValue
             assertNotNull(message.getKeyBytes());
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java
index 324c2ba..d41054e 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java
@@ -202,6 +202,19 @@ public interface Message<T> {
     byte[] getSchemaVersion();
 
     /**
+     * Get the schema associated with the message.
+     * Please note that this schema is usually equal to the Schema you passed
+     * during the construction of the Consumer or the Reader.
+     * But if you are consuming the topic using the GenericObject interface
+     * this method will return the schema associated with the message.
+     * @return The schema used to decode the payload of the message.
+     * @see Schema#AUTO_CONSUME()
+     */
+    default Optional<Schema<?>> getReaderSchema() {
+        return Optional.empty();
+    }
+
+    /**
      * Check whether the message is replicated from other cluster.
      *
      * @since 2.4.0
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaReader.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaReader.java
index dc18c4e..c5561bf 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaReader.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaReader.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.client.api.schema;
 
 import java.io.InputStream;
+import java.util.Optional;
+
 import org.apache.pulsar.common.classification.InterfaceAudience;
 import org.apache.pulsar.common.classification.InterfaceStability;
 
@@ -86,4 +88,12 @@ public interface SchemaReader<T> {
      */
     default void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
     }
+
+    /**
+     * Returns the underlying Schema if possible.
+     * @return the schema, or an empty Optional if it is not possible to 
access it
+     */
+    default Optional<Object> getNativeSchema() {
+        return Optional.empty();
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index a148677..1eb7d3f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -41,6 +41,7 @@ import java.util.stream.Collectors;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.AbstractSchema;
 import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.client.impl.schema.KeyValueSchema;
 import org.apache.pulsar.common.protocol.Commands;
@@ -267,11 +268,38 @@ public class MessageImpl<T> implements Message<T> {
         }
     }
 
-    public Schema getSchema() {
+    @Override
+    public int size() {
+        if (msgMetadata.isNullValue()) {
+            return 0;
+        }
+        return payload.readableBytes();
+    }
+
+    public Schema<T> getSchemaInternal() {
         return this.schema;
     }
 
     @Override
+    public Optional<Schema<?>> getReaderSchema() {
+        ensureSchemaIsLoaded();
+        if (schema == null) {
+            return Optional.empty();
+        }
+        if (schema instanceof AutoConsumeSchema) {
+            byte[] schemaVersion = getSchemaVersion();
+            return Optional.of(((AutoConsumeSchema) schema)
+                    .atSchemaVersion(schemaVersion));
+        } else if (schema instanceof AbstractSchema) {
+            byte[] schemaVersion = getSchemaVersion();
+            return Optional.of(((AbstractSchema) schema)
+                    .atSchemaVersion(schemaVersion));
+        } else {
+            return Optional.of(schema);
+        }
+    }
+
+    @Override
     public byte[] getSchemaVersion() {
         if (msgMetadataBuilder != null && 
msgMetadataBuilder.hasSchemaVersion()) {
             return msgMetadataBuilder.getSchemaVersion().toByteArray();
@@ -280,10 +308,13 @@ public class MessageImpl<T> implements Message<T> {
         }
     }
 
-    private SchemaInfo getSchemaInfo() {
+    private void ensureSchemaIsLoaded() {
         if (schema instanceof AutoConsumeSchema) {
             ((AutoConsumeSchema) schema).fetchSchemaIfNeeded();
         }
+    }
+    private SchemaInfo getSchemaInfo() {
+        ensureSchemaIsLoaded();
         return schema.getSchemaInfo();
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 786af84..96e4042 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -569,7 +569,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
 
     private boolean populateMessageSchema(MessageImpl msg, SendCallback 
callback) {
         MessageMetadata.Builder msgMetadataBuilder = msg.getMessageBuilder();
-        if (msg.getSchema() == schema) {
+        if (msg.getSchemaInternal() == schema) {
             schemaVersion.ifPresent(v -> 
msgMetadataBuilder.setSchemaVersion(ByteString.copyFrom(v)));
             msg.setSchemaState(MessageImpl.SchemaState.Ready);
             return true;
@@ -581,7 +581,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             completeCallbackAndReleaseSemaphore(callback, e);
             return false;
         }
-        SchemaHash schemaHash = SchemaHash.of(msg.getSchema());
+        SchemaHash schemaHash = SchemaHash.of(msg.getSchemaInternal());
         byte[] schemaVersion = schemaCache.get(schemaHash);
         if (schemaVersion != null) {
             msgMetadataBuilder.setSchemaVersion(ByteString.copyFrom(schemaVersion));
@@ -591,7 +591,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
     }
 
     private boolean rePopulateMessageSchema(MessageImpl msg) {
-        SchemaHash schemaHash = SchemaHash.of(msg.getSchema());
+        SchemaHash schemaHash = SchemaHash.of(msg.getSchemaInternal());
         byte[] schemaVersion = schemaCache.get(schemaHash);
         if (schemaVersion == null) {
             return false;
@@ -605,7 +605,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         if (!changeToRegisteringSchemaState()) {
             return;
         }
-        SchemaInfo schemaInfo = Optional.ofNullable(msg.getSchema())
+        SchemaInfo schemaInfo = Optional.ofNullable(msg.getSchemaInternal())
                                         .map(Schema::getSchemaInfo)
                                         .filter(si -> si.getType().getValue() > 0)
                                         .orElse(Schema.BYTES.getSchemaInfo());
@@ -619,7 +619,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                 }
             } else {
                 log.warn("[{}] [{}] GetOrCreateSchema succeed", topic, producerName);
-                SchemaHash schemaHash = SchemaHash.of(msg.getSchema());
+                SchemaHash schemaHash = SchemaHash.of(msg.getSchemaInternal());
                 schemaCache.putIfAbsent(schemaHash, v);
                 msg.getMessageBuilder().setSchemaVersion(ByteString.copyFrom(v));
                 msg.setSchemaState(MessageImpl.SchemaState.Ready);
@@ -1831,4 +1831,4 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
     }
 
     private static final Logger log = LoggerFactory.getLogger(ProducerImpl.class);
-}
\ No newline at end of file
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
index 00869f1..784c56b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -173,11 +173,17 @@ public class TopicMessageImpl<T> implements Message<T> {
         return msg;
     }
 
-    public Schema<T> getSchema() {
+    public Schema<T> getSchemaInternal() {
         if (this.msg instanceof MessageImpl) {
             MessageImpl message = (MessageImpl) this.msg;
-            return message.getSchema();
+            return message.getSchemaInternal();
         }
         return null;
     }
+
+    @Override
+    public Optional<Schema<?>> getReaderSchema() {
+        return msg.getReaderSchema();
+    }
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java
index 255a491..4328e7b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java
@@ -22,6 +22,8 @@ import io.netty.buffer.ByteBuf;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 
+import java.util.Objects;
+
 public abstract class AbstractSchema<T> implements Schema<T> {
 
     /**
@@ -66,4 +68,20 @@ public abstract class AbstractSchema<T> implements Schema<T> {
     public Schema<T> clone() {
         return this;
     }
+
+    /**
+     * Return an instance of this schema at the given version.
+     * @param schemaVersion the version
+     * @return the schema at that specific version
+     * @throws SchemaSerializationException in case of unknown schema version
+     * @throws NullPointerException in case of null schemaVersion
+     */
+    public Schema<?> atSchemaVersion(byte[] schemaVersion) throws SchemaSerializationException {
+        Objects.requireNonNull(schemaVersion);
+        if (!supportSchemaVersioning()) {
+            return this;
+        } else {
+            throw new SchemaSerializationException("Not implemented for " + this.getClass());
+        }
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java
index 015ec4d..c4444e7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java
@@ -20,14 +20,21 @@ package org.apache.pulsar.client.impl.schema;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufInputStream;
-
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
 import org.apache.pulsar.client.api.schema.SchemaReader;
 import org.apache.pulsar.client.api.schema.SchemaWriter;
+import org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
 /**
  * minimal abstract StructSchema
  */
@@ -80,6 +87,81 @@ public abstract class AbstractStructSchema<T> extends AbstractSchema<T> {
         if (reader != null) {
             this.reader.setSchemaInfoProvider(schemaInfoProvider);
         }
+        this.schemaInfoProvider = schemaInfoProvider;
+    }
+
+    @Override
+    public Schema<T> atSchemaVersion(byte[] schemaVersion) throws SchemaSerializationException {
+        Objects.requireNonNull(schemaVersion);
+        if (schemaInfoProvider == null) {
+            // this schema is not downloaded from the registry
+            return this;
+        }
+        try {
+            SchemaInfo schemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion).get();
+            if (schemaInfo == null) {
+                throw new SchemaSerializationException("Unknown version "+ BytesSchemaVersion.of(schemaVersion));
+            }
+            return getAbstractStructSchemaAtVersion(schemaVersion, schemaInfo);
+        } catch (ExecutionException err) {
+            throw new SchemaSerializationException(err.getCause());
+        } catch (InterruptedException err) {
+            Thread.currentThread().interrupt();
+            throw new SchemaSerializationException(err);
+        }
+    }
+
+    private static class WrappedVersionedSchema<T> extends AbstractStructSchema<T> {
+        private final byte[] schemaVersion;
+        private final AbstractStructSchema<T> parent;
+        public WrappedVersionedSchema(SchemaInfo schemaInfo, final byte[] schemaVersion,
+                                      AbstractStructSchema<T> parent) {
+            super(schemaInfo);
+            this.schemaVersion = schemaVersion;
+            this.writer = null;
+            this.reader = parent.reader;
+            this.schemaInfoProvider = parent.schemaInfoProvider;
+            this.parent = parent;
+        }
+
+        @Override
+        public boolean requireFetchingSchemaInfo() {
+            return true;
+        }
+
+        @Override
+        public T decode(byte[] bytes) {
+            return decode(bytes, schemaVersion);
+        }
+
+        @Override
+        public T decode(ByteBuf byteBuf) {
+            return decode(byteBuf, schemaVersion);
+        }
+
+        @Override
+        public byte[] encode(T message) {
+            throw new UnsupportedOperationException("This schema is not meant to be used for encoding");
+        }
+
+        @Override
+        public Optional<Object> getNativeSchema() {
+            if (reader instanceof AbstractMultiVersionReader) {
+                AbstractMultiVersionReader abstractMultiVersionReader = (AbstractMultiVersionReader) reader;
+                try {
+                    SchemaReader schemaReader = abstractMultiVersionReader.getSchemaReader(schemaVersion);
+                    return schemaReader.getNativeSchema();
+                } catch (ExecutionException err) {
+                    throw new RuntimeException(err.getCause());
+                }
+            } else {
+                return Optional.empty();
+            }
+        }
+    }
+
+    private AbstractStructSchema<T> getAbstractStructSchemaAtVersion(byte[] schemaVersion, SchemaInfo schemaInfo) {
+        return new WrappedVersionedSchema<>(schemaInfo, schemaVersion, this);
     }
 
     protected void setWriter(SchemaWriter<T> writer) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
index 3ec21a2..160a3a7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
@@ -78,6 +78,16 @@ public class AutoConsumeSchema implements Schema<GenericRecord> {
         return schema == null || schema.supportSchemaVersioning();
     }
 
+    public Schema<?> atSchemaVersion(byte[] schemaVersion) {
+        fetchSchemaIfNeeded();
+        ensureSchemaInitialized();
+        if (schema.supportSchemaVersioning() && schema instanceof AbstractSchema) {
+            return ((AbstractSchema) schema).atSchemaVersion(schemaVersion);
+        } else {
+            return schema;
+        }
+    }
+
     @Override
     public GenericRecord decode(byte[] bytes, byte[] schemaVersion) {
         fetchSchemaIfNeeded();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
index 4f7f921..f572bbf 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
@@ -21,6 +21,9 @@ package org.apache.pulsar.client.impl.schema;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import java.util.concurrent.CompletableFuture;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Schema;
@@ -35,7 +38,7 @@ import org.apache.pulsar.common.schema.SchemaType;
  * [Key, Value] pair schema definition
  */
 @Slf4j
-public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
+public class KeyValueSchema<K, V> extends AbstractSchema<KeyValue<K, V>> {
 
     @Getter
     private final Schema<K> keySchema;
@@ -139,18 +142,29 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
         }
     }
 
+    @Override
     public KeyValue<K, V> decode(byte[] bytes) {
         return decode(bytes, null);
     }
 
+    @Override
     public KeyValue<K, V> decode(byte[] bytes, byte[] schemaVersion) {
         if (this.keyValueEncodingType == KeyValueEncodingType.SEPARATED) {
             throw new SchemaSerializationException("This method cannot be used under this SEPARATED encoding type");
         }
-
         return KeyValue.decode(bytes, (keyBytes, valueBytes) -> decode(keyBytes, valueBytes, schemaVersion));
     }
 
+    @Override
+    public KeyValue<K, V> decode(ByteBuf byteBuf) {
+        return decode(ByteBufUtil.getBytes(byteBuf));
+    }
+
+    @Override
+    public KeyValue<K, V> decode(ByteBuf byteBuf, byte[] schemaVersion) {
+        return decode(ByteBufUtil.getBytes(byteBuf), schemaVersion);
+    }
+
     public KeyValue<K, V> decode(byte[] keyBytes, byte[] valueBytes, byte[] schemaVersion) {
         K k;
         if (keyBytes == null) {
@@ -214,7 +228,6 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
         this.schemaInfo = KeyValueSchemaInfo.encodeKeyValueSchemaInfo(
             keySchema, valueSchema, keyValueEncodingType
         );
-
         this.keySchema.setSchemaInfoProvider(new SchemaInfoProvider() {
             @Override
             public CompletableFuture<SchemaInfo> getSchemaByVersion(byte[] schemaVersion) {
@@ -253,4 +266,21 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
             }
         });
     }
+
+    @Override
+    public String toString() {
+        return "KeyValueSchema(" + keyValueEncodingType + "," + keySchema + "," + valueSchema + ")";
+    }
+
+    @Override
+    public Schema<?> atSchemaVersion(byte[] schemaVersion) throws SchemaSerializationException {
+        if (!supportSchemaVersioning()) {
+            return this;
+        } else {
+            Schema<?> keySchema = this.keySchema instanceof AbstractSchema ? ((AbstractSchema) this.keySchema).atSchemaVersion(schemaVersion) : this.keySchema;
+            Schema<?> valueSchema = this.valueSchema instanceof AbstractSchema ? ((AbstractSchema) this.valueSchema).atSchemaVersion(schemaVersion) : this.valueSchema;
+            return KeyValueSchema.of(keySchema, valueSchema, keyValueEncodingType);
+        }
+    }
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReader.java
index 22c63a94..d9a7384 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReader.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReader.java
@@ -36,6 +36,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 
@@ -117,5 +118,10 @@ public class GenericAvroReader implements SchemaReader<GenericRecord> {
         return offset;
     }
 
+    @Override
+    public Optional<Object> getNativeSchema() {
+        return Optional.of(schema);
+    }
+
     private static final Logger log = LoggerFactory.getLogger(GenericAvroReader.class);
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeReader.java
index eccc482..c41767c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeReader.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeReader.java
@@ -32,6 +32,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 public class GenericProtobufNativeReader implements SchemaReader<GenericRecord> {
@@ -79,5 +80,10 @@ public class GenericProtobufNativeReader implements SchemaReader<GenericRecord>
         }
     }
 
+    @Override
+    public Optional<Object> getNativeSchema() {
+        return Optional.of(descriptor);
+    }
+
     private static final Logger log = LoggerFactory.getLogger(GenericProtobufNativeReader.class);
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java
index 59e748c..e7d963c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java
@@ -25,6 +25,7 @@ import org.apache.pulsar.client.impl.schema.AvroBaseStructSchema;
 import org.apache.pulsar.common.schema.SchemaInfo;
 
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 /**
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AbstractMultiVersionReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AbstractMultiVersionReader.java
index ea0df25..f1a775d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AbstractMultiVersionReader.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AbstractMultiVersionReader.java
@@ -71,7 +71,7 @@ public abstract class AbstractMultiVersionReader<T> implements SchemaReader<T> {
     public T read(InputStream inputStream, byte[] schemaVersion) {
         try {
             return schemaVersion == null ? read(inputStream) :
-                    readerCache.get(BytesSchemaVersion.of(schemaVersion)).read(inputStream);
+                    getSchemaReader(schemaVersion).read(inputStream);
         } catch (ExecutionException e) {
             LOG.error("Can't get generic schema for topic {} schema version {}",
                     schemaInfoProvider.getTopicName(), Hex.encodeHexString(schemaVersion), e);
@@ -79,11 +79,15 @@ public abstract class AbstractMultiVersionReader<T> implements SchemaReader<T> {
         }
     }
 
+    public SchemaReader<T> getSchemaReader(byte[] schemaVersion) throws ExecutionException {
+        return readerCache.get(BytesSchemaVersion.of(schemaVersion));
+    }
+
     @Override
     public T read(byte[] bytes, byte[] schemaVersion) {
         try {
             return schemaVersion == null ? read(bytes) :
-                    readerCache.get(BytesSchemaVersion.of(schemaVersion)).read(bytes);
+                    getSchemaReader(schemaVersion).read(bytes);
         } catch (ExecutionException | AvroTypeException e) {
             if (e instanceof AvroTypeException) {
                 throw new SchemaSerializationException(e);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java
index e1b58a4..63885ba 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java
@@ -32,18 +32,22 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Optional;
 
 public class AvroReader<T> implements SchemaReader<T> {
 
     private ReflectDatumReader<T> reader;
     private static final ThreadLocal<BinaryDecoder> decoders =
             new ThreadLocal<>();
+    private final Schema schema;
 
     public AvroReader(Schema schema) {
         this.reader = new ReflectDatumReader<>(schema);
+        this.schema = schema;
     }
 
     public AvroReader(Schema schema, ClassLoader classLoader, boolean jsr310ConversionEnabled) {
+        this.schema = schema;
         if (classLoader != null) {
             ReflectData reflectData = new ReflectData(classLoader);
             AvroSchema.addLogicalTypeConversions(reflectData, jsr310ConversionEnabled);
@@ -55,6 +59,7 @@ public class AvroReader<T> implements SchemaReader<T> {
 
     public AvroReader(Schema writerSchema, Schema readerSchema, ClassLoader classLoader,
         boolean jsr310ConversionEnabled) {
+        this.schema = readerSchema;
         if (classLoader != null) {
             ReflectData reflectData = new ReflectData(classLoader);
             AvroSchema.addLogicalTypeConversions(reflectData, jsr310ConversionEnabled);
@@ -98,6 +103,11 @@ public class AvroReader<T> implements SchemaReader<T> {
         }
     }
 
+    @Override
+    public Optional<Object> getNativeSchema() {
+        return Optional.of(schema);
+    }
+
     private static final Logger log = LoggerFactory.getLogger(AvroReader.class);
 
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index e1ec9f6..6f8d326 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -132,10 +132,10 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
         Schema<T> schema = null;
         if (message instanceof MessageImpl) {
             MessageImpl impl = (MessageImpl) message;
-            schema = impl.getSchema();
+            schema = impl.getSchemaInternal();
         } else if (message instanceof TopicMessageImpl) {
             TopicMessageImpl impl = (TopicMessageImpl) message;
-            schema = impl.getSchema();
+            schema = impl.getSchemaInternal();
         }
         Record<T> record = PulsarRecord.<T>builder()
                 .message(message)
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
index be39508..65474ff 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
@@ -40,7 +40,6 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
-import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
@@ -286,9 +285,9 @@ public class PulsarSourceTest {
         Consumer consumer = mock(Consumer.class);
         MessageImpl messageImpl = mock(MessageImpl.class);
         Schema schema = mock(Schema.class);
-        when(messageImpl.getSchema()).thenReturn(schema);
+        when(messageImpl.getSchemaInternal()).thenReturn(schema);
         pulsarSource.received(consumer, (Message) messageImpl);
-        verify(messageImpl.getSchema(), times(1));
+        verify(messageImpl.getSchemaInternal(), times(1));
         Record<GenericRecord> pushed = pulsarSource.read();
         assertSame(pushed.getSchema(), schema);
     }

Reply via email to