This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch 2.7.2_ds_tmp in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4acd720eb2aea434a9839c6e738943a313e87b20 Author: Enrico Olivelli <eolive...@gmail.com> AuthorDate: Wed May 12 12:33:42 2021 +0200 PIP-85 Add Schema Information to Message in Java Client API (#10476) This is the implementation for PIP-85 https://docs.google.com/document/d/1VWi5LHP44V31nP4bCui9d5RXwH6xc_phrUes6tvNguk/ - introduce `Optional<Schema<?>> Message.getReaderSchema()` - new Public API - introduce `Optional<Object> SchemaReader.getNativeSchema()` - new Public API - introduce `Schema<?> atSchemaVersion(schemaVersion)` to `AbstractSchema` (private API) - make `KeyValueSchema` extend AbstractSchema (private API) - rename MessageImpl and TopicMessageImpl `getSchema` to `getSchemaInternal` The schema returned by Message.getReaderSchema has these properties: - getSchemaInfo returns the actual schema used to decode the Message - getNativeSchema returns the original native (e.g. Avro) Schema for the correct version - it can be used for "decoding" payloads - it cannot be used for "encoding" payloads (cherry picked from commit 42241107dcf1f74a27a02297bc47901b3684b3cf) --- .../apache/pulsar/client/api/InterceptorsTest.java | 4 +- .../apache/pulsar/client/api/SimpleSchemaTest.java | 79 +++++++++- .../java/org/apache/pulsar/schema/SchemaTest.java | 174 +++++++++++++++++++-- .../java/org/apache/pulsar/client/api/Message.java | 13 ++ .../pulsar/client/api/schema/SchemaReader.java | 10 ++ .../org/apache/pulsar/client/impl/MessageImpl.java | 35 ++++- .../apache/pulsar/client/impl/ProducerImpl.java | 12 +- .../pulsar/client/impl/TopicMessageImpl.java | 10 +- .../pulsar/client/impl/schema/AbstractSchema.java | 18 +++ .../client/impl/schema/AbstractStructSchema.java | 84 +++++++++- .../client/impl/schema/AutoConsumeSchema.java | 10 ++ .../pulsar/client/impl/schema/KeyValueSchema.java | 36 ++++- .../impl/schema/generic/GenericAvroReader.java | 6 + .../generic/GenericProtobufNativeReader.java | 6 + .../impl/schema/generic/GenericSchemaImpl.java | 1 + 
.../schema/reader/AbstractMultiVersionReader.java | 8 +- .../client/impl/schema/reader/AvroReader.java | 10 ++ .../pulsar/functions/source/PulsarSource.java | 4 +- .../pulsar/functions/source/PulsarSourceTest.java | 5 +- 19 files changed, 485 insertions(+), 40 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java index d5fe225..5c42251 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java @@ -106,14 +106,14 @@ public class InterceptorsTest extends ProducerConsumerBase { @Override public boolean eligible(Message message) { return SchemaType.STRING.equals( - ((MessageImpl)message).getSchema().getSchemaInfo().getType()); + ((MessageImpl)message).getSchemaInternal().getSchemaInfo().getType()); } }; BaseInterceptor interceptor3 = new BaseInterceptor("int3") { @Override public boolean eligible(Message message) { return SchemaType.INT32.equals( - ((MessageImpl)message).getSchema().getSchemaInfo().getType()); + ((MessageImpl)message).getSchemaInternal().getSchemaInfo().getType()); } }; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java index 802d059..0e1d756 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java @@ -25,6 +25,7 @@ import lombok.NoArgsConstructor; import static java.nio.charset.StandardCharsets.UTF_8; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -33,7 +34,7 @@ import 
lombok.extern.slf4j.Slf4j; import org.apache.avro.reflect.ReflectData; import org.apache.avro.Schema.Parser; import org.apache.pulsar.client.impl.MessageImpl; -import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion; +import org.apache.pulsar.client.impl.schema.KeyValueSchema; import org.apache.pulsar.common.schema.LongSchemaVersion; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.PulsarClientException.IncompatibleSchemaException; @@ -79,7 +80,10 @@ public class SimpleSchemaTest extends ProducerConsumerBase { @DataProvider(name = "schemaValidationModes") public static Object[][] schemaValidationModes() { - return new Object[][] { { true }, { false } }; + return new Object[][] { + { true }, + { false } + }; } @DataProvider(name = "topicDomain") @@ -530,8 +534,11 @@ public class SimpleSchemaTest extends ProducerConsumerBase { assertEquals(data.getValue().getField("i"), i); MessageImpl impl = (MessageImpl) data; - org.apache.avro.Schema avroSchema = (org.apache.avro.Schema) impl.getSchema().getNativeSchema().get(); + org.apache.avro.Schema avroSchema = (org.apache.avro.Schema) impl.getSchemaInternal().getNativeSchema().get(); assertNotNull(avroSchema); + + org.apache.avro.Schema avroSchema2 = (org.apache.avro.Schema) data.getReaderSchema().get().getNativeSchema().get(); + assertNotNull(avroSchema2); } } @@ -616,6 +623,9 @@ public class SimpleSchemaTest extends ProducerConsumerBase { assertEquals(data.getValue().getKey().getField("i"), i * 100); assertEquals(data.getValue().getValue().getField("i"), i * 1000); c1.acknowledge(data); + KeyValueSchema keyValueSchema = (KeyValueSchema) data.getReaderSchema().get(); + assertNotNull(keyValueSchema.getKeySchema()); + assertNotNull(keyValueSchema.getValueSchema()); } // verify c2 @@ -625,6 +635,9 @@ public class SimpleSchemaTest extends ProducerConsumerBase { assertEquals(data.getValue().getKey().i, i * 100); assertEquals(data.getValue().getValue().i, i * 1000); 
c2.acknowledge(data); + KeyValueSchema keyValueSchema = (KeyValueSchema) data.getReaderSchema().get(); + assertNotNull(keyValueSchema.getKeySchema()); + assertNotNull(keyValueSchema.getValueSchema()); } // verify c3 @@ -634,6 +647,9 @@ public class SimpleSchemaTest extends ProducerConsumerBase { assertEquals(data.getValue().getKey().getField("i"), i * 100); assertEquals(data.getValue().getValue().i, i * 1000); c3.acknowledge(data); + KeyValueSchema keyValueSchema = (KeyValueSchema) data.getReaderSchema().get(); + assertNotNull(keyValueSchema.getKeySchema()); + assertNotNull(keyValueSchema.getValueSchema()); } // verify c4 @@ -784,12 +800,18 @@ public class SimpleSchemaTest extends ProducerConsumerBase { // verify c0 for (int i = 0; i < numMessages; i++) { Message<GenericRecord> wrapper = c0.receive(); - log.info("schema version {}", BytesSchemaVersion.of(wrapper.getSchemaVersion())); KeyValue<GenericRecord, GenericRecord> data = (KeyValue<GenericRecord, GenericRecord>) wrapper.getValue().getNativeObject(); assertNotNull(wrapper.getSchemaVersion()); assertEquals(data.getKey().getField("i"), i * 100); assertEquals(data.getValue().getField("i"), i * 1000); c0.acknowledge(wrapper); + KeyValueSchema keyValueSchema = (KeyValueSchema) wrapper.getReaderSchema().get(); + assertNotNull(keyValueSchema.getKeySchema()); + assertNotNull(keyValueSchema.getValueSchema()); + assertTrue(keyValueSchema.getKeySchema().getSchemaInfo().getSchemaDefinition().contains("V1Data")); + assertTrue(keyValueSchema.getValueSchema().getSchemaInfo().getSchemaDefinition().contains("V1Data")); + assertTrue(keyValueSchema.getKeySchema().getNativeSchema().isPresent()); + assertTrue(keyValueSchema.getValueSchema().getNativeSchema().isPresent()); } @@ -813,15 +835,20 @@ public class SimpleSchemaTest extends ProducerConsumerBase { // verify c0 for (int i = 0; i < numMessages; i++) { Message<GenericRecord> wrapper = c0.receive(); - log.info("schema version {}", 
BytesSchemaVersion.of(wrapper.getSchemaVersion())); KeyValue<GenericRecord, GenericRecord> data = (KeyValue<GenericRecord, GenericRecord>) wrapper.getValue().getNativeObject(); assertNotNull(wrapper.getSchemaVersion()); assertEquals(data.getKey().getField("i"), i * 100); assertEquals(data.getValue().getField("i"), i * 1000); assertEquals(data.getKey().getField("j"), i); assertEquals(data.getValue().getField("j"), i * 20); + KeyValueSchema keyValueSchema = (KeyValueSchema) wrapper.getReaderSchema().get(); + assertNotNull(keyValueSchema.getKeySchema()); + assertNotNull(keyValueSchema.getValueSchema()); + assertTrue(keyValueSchema.getKeySchema().getSchemaInfo().getSchemaDefinition().contains("V2Data")); + assertTrue(keyValueSchema.getValueSchema().getSchemaInfo().getSchemaDefinition().contains("V2Data")); + assertTrue(keyValueSchema.getKeySchema().getNativeSchema().isPresent()); + assertTrue(keyValueSchema.getValueSchema().getNativeSchema().isPresent()); } - } } } @@ -852,6 +879,46 @@ public class SimpleSchemaTest extends ProducerConsumerBase { Assert.assertTrue(binaryLookupService.getSchema(TopicName.get(topic), ByteBuffer.allocate(8).putLong(1).array()).get().isPresent()); } + @Test + public void testGetNativeSchemaWithAutoConsumeWithMultiVersion() throws Exception { + final String topic = "persistent://my-property/my-ns/testGetSchemaWithMultiVersion"; + + @Cleanup + Consumer<?> consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME()) + .subscriptionName("test") + .topic(topic) + .subscribe(); + + @Cleanup + Producer<V1Data> v1DataProducer = pulsarClient.newProducer(Schema.AVRO(V1Data.class)) + .topic(topic) + .create(); + + @Cleanup + Producer<V2Data> v2DataProducer = pulsarClient.newProducer(Schema.AVRO(V2Data.class)) + .topic(topic) + .create(); + + Assert.assertEquals(admin.schemas().getAllSchemas(topic).size(), 2); + + v1DataProducer.send(new V1Data()); + v2DataProducer.send(new V2Data()); + + Message<?> messageV1 = consumer.receive(); + Schema<?> schemaV1 = 
messageV1.getReaderSchema().get(); + Message<?> messageV2 = consumer.receive(); + Schema<?> schemaV2 = messageV2.getReaderSchema().get(); + log.info("schemaV1 {} {}", schemaV1.getSchemaInfo(), schemaV1.getNativeSchema()); + log.info("schemaV2 {} {}", schemaV2.getSchemaInfo(), schemaV2.getNativeSchema()); + assertTrue(schemaV1.getSchemaInfo().getSchemaDefinition().contains("V1Data")); + assertTrue(schemaV2.getSchemaInfo().getSchemaDefinition().contains("V2Data")); + org.apache.avro.Schema avroSchemaV1 = (org.apache.avro.Schema) schemaV1.getNativeSchema().get(); + org.apache.avro.Schema avroSchemaV2 = (org.apache.avro.Schema) schemaV2.getNativeSchema().get(); + assertNotEquals(avroSchemaV1.toString(false), avroSchemaV2.toString(false)); + assertTrue(avroSchemaV1.toString(false).contains("V1Data")); + assertTrue(avroSchemaV2.toString(false).contains("V2Data")); + } + @Test(dataProvider = "topicDomain") public void testAutoCreatedSchema(String domain) throws Exception { final String topic1 = domain + "my-property/my-ns/testAutoCreatedSchema-1"; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index 7672cce..9542a36 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -47,6 +47,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.schema.SchemaDefinition; +import org.apache.pulsar.client.impl.schema.KeyValueSchema; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; @@ -121,10 +122,11 @@ public class SchemaTest extends MockedPulsarServiceBaseTest { (false).withSupportSchemaVersioning(true). 
withPojo(Schemas.PersonTwo.class).build()).getSchemaInfo()); - admin.schemas().createSchema(fqtnTwo, Schema.AVRO( + Schema<Schemas.PersonTwo> personTwoSchema = Schema.AVRO( SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull (false).withSupportSchemaVersioning(true). - withPojo(Schemas.PersonTwo.class).build()).getSchemaInfo()); + withPojo(Schemas.PersonTwo.class).build()); + admin.schemas().createSchema(fqtnTwo, personTwoSchema.getSchemaInfo()); Producer<Schemas.PersonTwo> producer = pulsarClient.newProducer(Schema.AVRO( SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull @@ -146,18 +148,144 @@ public class SchemaTest extends MockedPulsarServiceBaseTest { .topic(fqtnOne, fqtnTwo) .subscribe(); + Consumer<GenericRecord> consumer2 = pulsarClient.newConsumer(Schema.AUTO_CONSUME()) + .subscriptionName("test2") + .topic(fqtnOne, fqtnTwo) + .subscribe(); + producer.send(personTwo); Schemas.PersonTwo personConsume = consumer.receive().getValue(); assertEquals("Tom", personConsume.getName()); assertEquals(1, personConsume.getId()); + Message<Schemas.PersonTwo> message = consumer.receive(); + Schemas.PersonTwo personConsume = message.getValue(); + assertEquals(personConsume.getName(), "Tom"); + assertEquals(personConsume.getId(), 1); + Schema<?> schema = message.getReaderSchema().get(); + log.info("the-schema {}", schema); + assertEquals(personTwoSchema.getSchemaInfo(), schema.getSchemaInfo()); + org.apache.avro.Schema nativeSchema = (org.apache.avro.Schema) schema.getNativeSchema().get(); + log.info("nativeSchema-schema {}", nativeSchema); + assertNotNull(nativeSchema); + + // verify that with AUTO_CONSUME we can access the original schema + // and the Native AVRO schema + Message<?> message2 = consumer2.receive(); + Schema<?> schema2 = message2.getReaderSchema().get(); + log.info("the-schema {}", schema2); + assertEquals(personTwoSchema.getSchemaInfo(), schema2.getSchemaInfo()); + org.apache.avro.Schema nativeSchema2 = (org.apache.avro.Schema) 
schema.getNativeSchema().get(); + log.info("nativeSchema-schema {}", nativeSchema2); + assertNotNull(nativeSchema2); + + producer.close(); + consumer.close(); + } + + @Test + public void testMultiTopicSetSchemaProviderWithKeyValue() throws Exception { + final String tenant = PUBLIC_TENANT; + final String namespace = "test-namespace-" + randomName(16); + final String topicOne = "test-multi-version-schema-one"; + final String topicTwo = "test-multi-version-schema-two"; + final String fqtnOne = TopicName.get( + TopicDomain.persistent.value(), + tenant, + namespace, + topicOne + ).toString(); + + final String fqtnTwo = TopicName.get( + TopicDomain.persistent.value(), + tenant, + namespace, + topicTwo + ).toString(); + + + admin.namespaces().createNamespace( + tenant + "/" + namespace, + Sets.newHashSet(CLUSTER_NAME) + ); + + admin.topics().createPartitionedTopic(fqtnOne, 3); + admin.topics().createPartitionedTopic(fqtnTwo, 3); + + Schema<Schemas.PersonOne> schemaOne = Schema.AVRO( + SchemaDefinition.<Schemas.PersonOne>builder().withAlwaysAllowNull + (false).withSupportSchemaVersioning(true). + withPojo(Schemas.PersonOne.class).build()); + admin.schemas().createSchema(fqtnOne, Schema.KeyValue(Schema.STRING, schemaOne).getSchemaInfo()); + + Schema<Schemas.PersonTwo> schemaTwo = Schema.AVRO( + SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull + (false).withSupportSchemaVersioning(true). + withPojo(Schemas.PersonTwo.class).build()); + admin.schemas().createSchema(fqtnOne, Schema.KeyValue(Schema.STRING, schemaTwo).getSchemaInfo()); + + Schema<Schemas.PersonTwo> personTwoSchema = Schema.AVRO( + SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull + (false).withSupportSchemaVersioning(true). 
+ withPojo(Schemas.PersonTwo.class).build()); + admin.schemas().createSchema(fqtnTwo, Schema.KeyValue(Schema.STRING, schemaTwo).getSchemaInfo()); + + Producer<KeyValue<String, Schemas.PersonTwo>> producer = pulsarClient.newProducer(Schema.KeyValue(Schema.STRING, Schema.AVRO( + SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull + (false).withSupportSchemaVersioning(true). + withPojo(Schemas.PersonTwo.class).build()))) + .topic(fqtnOne) + .create(); + + Schemas.PersonTwo personTwo = new Schemas.PersonTwo(); + personTwo.setId(1); + personTwo.setName("Tom"); + + + Consumer<KeyValue<String, Schemas.PersonTwo>> consumer = pulsarClient.newConsumer(Schema.KeyValue(Schema.STRING, Schema.AVRO( + SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull + (false).withSupportSchemaVersioning(true). + withPojo(Schemas.PersonTwo.class).build()))) + .subscriptionName("test") + .topic(fqtnOne, fqtnTwo) + .subscribe(); + + Consumer<GenericRecord> consumer2 = pulsarClient.newConsumer(Schema.AUTO_CONSUME()) + .subscriptionName("test2") + .topic(fqtnOne, fqtnTwo) + .subscribe(); + + producer.send(new KeyValue<>("foo", personTwo)); + + Message<KeyValue<String, Schemas.PersonTwo>> message = consumer.receive(); + assertEquals("foo", message.getValue().getKey()); + Schemas.PersonTwo personConsume = message.getValue().getValue(); + assertEquals(personConsume.getName(), "Tom"); + assertEquals(personConsume.getId(), 1); + KeyValueSchema schema = (KeyValueSchema) message.getReaderSchema().get(); + log.info("the-schema {}", schema); + assertEquals(personTwoSchema.getSchemaInfo(), schema.getValueSchema().getSchemaInfo()); + org.apache.avro.Schema nativeSchema = (org.apache.avro.Schema) schema.getValueSchema().getNativeSchema().get(); + log.info("nativeSchema-schema {}", nativeSchema); + assertNotNull(nativeSchema); + + // verify that with AUTO_CONSUME we can access the original schema + // and the Native AVRO schema + Message<?> message2 = consumer2.receive(); + 
KeyValueSchema schema2 = (KeyValueSchema) message2.getReaderSchema().get(); + log.info("the-schema {}", schema2); + assertEquals(personTwoSchema.getSchemaInfo(), schema2.getValueSchema().getSchemaInfo()); + org.apache.avro.Schema nativeSchema2 = (org.apache.avro.Schema) schema.getValueSchema().getNativeSchema().get(); + log.info("nativeSchema-schema {}", nativeSchema2); + assertNotNull(nativeSchema2); + producer.close(); consumer.close(); } @Test - public void testBytesSchemaDeserialize() throws Exception { + public void testJSONSchemaDeserialize() throws Exception { final String tenant = PUBLIC_TENANT; final String namespace = "test-namespace-" + randomName(16); final String topicName = "test-bytes-schema"; @@ -203,6 +331,12 @@ public class SchemaTest extends MockedPulsarServiceBaseTest { assertEquals(message.getValue().getField("address").getClass(), message1.getValue().getAddress().getClass()); + Schema<?> schema = message.getReaderSchema().get(); + Schema<?> schema1 = message1.getReaderSchema().get(); + log.info("schema {}", schema); + log.info("schema1 {}", schema1); + assertEquals(schema.getSchemaInfo(), schema1.getSchemaInfo()); + producer.close(); consumer.close(); consumer1.close(); @@ -246,12 +380,14 @@ public class SchemaTest extends MockedPulsarServiceBaseTest { producer.send("foo"); Message<String> message = consumer.receive(); - Message<GenericRecord> message3 = consumer2.receive(); + Message<GenericRecord> message2 = consumer2.receive(); + assertEquals(SchemaType.STRING, message.getReaderSchema().get().getSchemaInfo().getType()); + assertEquals(SchemaType.STRING, message2.getReaderSchema().get().getSchemaInfo().getType()); assertEquals("foo", message.getValue()); - assertEquals(message3.getValue().getClass().getName(), "org.apache.pulsar.client.impl.schema.GenericObjectWrapper"); - assertEquals(SchemaType.STRING, message3.getValue().getSchemaType()); - assertEquals("foo", message3.getValue().getNativeObject()); + 
assertEquals(message2.getValue().getClass().getName(), "org.apache.pulsar.client.impl.schema.GenericObjectWrapper"); + assertEquals(SchemaType.STRING, message2.getValue().getSchemaType()); + assertEquals("foo", message2.getValue().getNativeObject()); producer.close(); consumer.close(); @@ -296,12 +432,22 @@ public class SchemaTest extends MockedPulsarServiceBaseTest { producer.send("foo".getBytes(StandardCharsets.UTF_8)); Message<byte[]> message = consumer.receive(); - Message<GenericRecord> message3 = consumer2.receive(); + Message<GenericRecord> message2 = consumer2.receive(); + if (schema == SchemaType.BYTES) { + assertEquals(schema, message.getReaderSchema().get().getSchemaInfo().getType()); + assertEquals(schema, message2.getReaderSchema().get().getSchemaInfo().getType()); + } else if (schema == SchemaType.NONE) { + // schema NONE is always reported as BYTES + assertEquals(SchemaType.BYTES, message.getReaderSchema().get().getSchemaInfo().getType()); + assertEquals(SchemaType.BYTES, message2.getReaderSchema().get().getSchemaInfo().getType()); + } else { + fail(); + } assertEquals("foo".getBytes(StandardCharsets.UTF_8), message.getValue()); - assertEquals(message3.getValue().getClass().getName(), "org.apache.pulsar.client.impl.schema.GenericObjectWrapper"); - assertEquals(SchemaType.BYTES, message3.getValue().getSchemaType()); - assertEquals("foo".getBytes(StandardCharsets.UTF_8), message3.getValue().getNativeObject()); + assertEquals(message2.getValue().getClass().getName(), "org.apache.pulsar.client.impl.schema.GenericObjectWrapper"); + assertEquals(SchemaType.BYTES, message2.getValue().getSchemaType()); + assertEquals("foo".getBytes(StandardCharsets.UTF_8), message2.getValue().getNativeObject()); producer.close(); consumer.close(); @@ -420,6 +566,12 @@ public class SchemaTest extends MockedPulsarServiceBaseTest { Message<GenericRecord> message2 = consumer2.receive(); assertEquals(message.getValue(), message2.getValue().getNativeObject()); + Schema<?> schema = 
message.getReaderSchema().get(); + Schema<?> schemaFromGenericRecord = message.getReaderSchema().get(); + KeyValueSchema keyValueSchema = (KeyValueSchema) schema; + KeyValueSchema keyValueSchemaFromGenericRecord = (KeyValueSchema) schemaFromGenericRecord; + assertEquals(keyValueSchema.getSchemaInfo(), keyValueSchemaFromGenericRecord.getSchemaInfo()); + if (keyValueEncodingType == KeyValueEncodingType.SEPARATED) { // with "SEPARATED encoding the routing key is the key of the KeyValue assertNotNull(message.getKeyBytes()); diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java index 324c2ba..d41054e 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java @@ -202,6 +202,19 @@ public interface Message<T> { byte[] getSchemaVersion(); /** + * Get the schema associated to the message. + * Please note that this schema is usually equal to the Schema you passed + * during the construction of the Consumer or the Reader. + * But if you are consuming the topic using the GenericObject interface + * this method will return the schema associated with the message. + * @return The schema used to decode the payload of message. + * @see Schema#AUTO_CONSUME() + */ + default Optional<Schema<?>> getReaderSchema() { + return Optional.empty(); + } + + /** * Check whether the message is replicated from other cluster. 
* * @since 2.4.0 diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaReader.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaReader.java index dc18c4e..c5561bf 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaReader.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaReader.java @@ -19,6 +19,8 @@ package org.apache.pulsar.client.api.schema; import java.io.InputStream; +import java.util.Optional; + import org.apache.pulsar.common.classification.InterfaceAudience; import org.apache.pulsar.common.classification.InterfaceStability; @@ -86,4 +88,12 @@ public interface SchemaReader<T> { */ default void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) { } + + /** + * Returns the underling Schema if possible + * @return the schema, or an empty Optional if it is not possible to access it + */ + default Optional<Object> getNativeSchema() { + return Optional.empty(); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java index a148677..1eb7d3f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java @@ -41,6 +41,7 @@ import java.util.stream.Collectors; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.schema.AbstractSchema; import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; import org.apache.pulsar.client.impl.schema.KeyValueSchema; import org.apache.pulsar.common.protocol.Commands; @@ -267,11 +268,38 @@ public class MessageImpl<T> implements Message<T> { } } - public Schema getSchema() { + @Override + public int size() { + if (msgMetadata.isNullValue()) { + return 0; + } 
+ return payload.readableBytes(); + } + + public Schema<T> getSchemaInternal() { return this.schema; } @Override + public Optional<Schema<?>> getReaderSchema() { + ensureSchemaIsLoaded(); + if (schema == null) { + return Optional.empty(); + } + if (schema instanceof AutoConsumeSchema) { + byte[] schemaVersion = getSchemaVersion(); + return Optional.of(((AutoConsumeSchema) schema) + .atSchemaVersion(schemaVersion)); + } else if (schema instanceof AbstractSchema) { + byte[] schemaVersion = getSchemaVersion(); + return Optional.of(((AbstractSchema) schema) + .atSchemaVersion(schemaVersion)); + } else { + return Optional.of(schema); + } + } + + @Override public byte[] getSchemaVersion() { if (msgMetadataBuilder != null && msgMetadataBuilder.hasSchemaVersion()) { return msgMetadataBuilder.getSchemaVersion().toByteArray(); @@ -280,10 +308,13 @@ public class MessageImpl<T> implements Message<T> { } } - private SchemaInfo getSchemaInfo() { + private void ensureSchemaIsLoaded() { if (schema instanceof AutoConsumeSchema) { ((AutoConsumeSchema) schema).fetchSchemaIfNeeded(); } + } + private SchemaInfo getSchemaInfo() { + ensureSchemaIsLoaded(); return schema.getSchemaInfo(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 786af84..96e4042 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -569,7 +569,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne private boolean populateMessageSchema(MessageImpl msg, SendCallback callback) { MessageMetadata.Builder msgMetadataBuilder = msg.getMessageBuilder(); - if (msg.getSchema() == schema) { + if (msg.getSchemaInternal() == schema) { schemaVersion.ifPresent(v -> msgMetadataBuilder.setSchemaVersion(ByteString.copyFrom(v))); 
msg.setSchemaState(MessageImpl.SchemaState.Ready); return true; @@ -581,7 +581,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne completeCallbackAndReleaseSemaphore(callback, e); return false; } - SchemaHash schemaHash = SchemaHash.of(msg.getSchema()); + SchemaHash schemaHash = SchemaHash.of(msg.getSchemaInternal()); byte[] schemaVersion = schemaCache.get(schemaHash); if (schemaVersion != null) { msgMetadataBuilder.setSchemaVersion(ByteString.copyFrom(schemaVersion)); @@ -591,7 +591,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne } private boolean rePopulateMessageSchema(MessageImpl msg) { - SchemaHash schemaHash = SchemaHash.of(msg.getSchema()); + SchemaHash schemaHash = SchemaHash.of(msg.getSchemaInternal()); byte[] schemaVersion = schemaCache.get(schemaHash); if (schemaVersion == null) { return false; @@ -605,7 +605,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne if (!changeToRegisteringSchemaState()) { return; } - SchemaInfo schemaInfo = Optional.ofNullable(msg.getSchema()) + SchemaInfo schemaInfo = Optional.ofNullable(msg.getSchemaInternal()) .map(Schema::getSchemaInfo) .filter(si -> si.getType().getValue() > 0) .orElse(Schema.BYTES.getSchemaInfo()); @@ -619,7 +619,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne } } else { log.warn("[{}] [{}] GetOrCreateSchema succeed", topic, producerName); - SchemaHash schemaHash = SchemaHash.of(msg.getSchema()); + SchemaHash schemaHash = SchemaHash.of(msg.getSchemaInternal()); schemaCache.putIfAbsent(schemaHash, v); msg.getMessageBuilder().setSchemaVersion(ByteString.copyFrom(v)); msg.setSchemaState(MessageImpl.SchemaState.Ready); @@ -1831,4 +1831,4 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne } private static final Logger log = LoggerFactory.getLogger(ProducerImpl.class); -} \ No newline at end of file +} diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java index 00869f1..784c56b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java @@ -173,11 +173,17 @@ public class TopicMessageImpl<T> implements Message<T> { return msg; } - public Schema<T> getSchema() { + public Schema<T> getSchemaInternal() { if (this.msg instanceof MessageImpl) { MessageImpl message = (MessageImpl) this.msg; - return message.getSchema(); + return message.getSchemaInternal(); } return null; } + + @Override + public Optional<Schema<?>> getReaderSchema() { + return msg.getReaderSchema(); + } + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java index 255a491..4328e7b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java @@ -22,6 +22,8 @@ import io.netty.buffer.ByteBuf; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SchemaSerializationException; +import java.util.Objects; + public abstract class AbstractSchema<T> implements Schema<T> { /** @@ -66,4 +68,20 @@ public abstract class AbstractSchema<T> implements Schema<T> { public Schema<T> clone() { return this; } + + /** + * Return an instance of this schema at the given version. 
+ * @param schemaVersion the version + * @return the schema at that specific version + * @throws SchemaSerializationException in case of unknown schema version + * @throws NullPointerException in case of null schemaVersion + */ + public Schema<?> atSchemaVersion(byte[] schemaVersion) throws SchemaSerializationException { + Objects.requireNonNull(schemaVersion); + if (!supportSchemaVersioning()) { + return this; + } else { + throw new SchemaSerializationException("Not implemented for " + this.getClass()); + } + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java index 015ec4d..c4444e7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java @@ -20,14 +20,21 @@ package org.apache.pulsar.client.impl.schema; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; - +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.client.api.schema.SchemaInfoProvider; import org.apache.pulsar.client.api.schema.SchemaReader; import org.apache.pulsar.client.api.schema.SchemaWriter; +import org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader; +import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion; import org.apache.pulsar.common.schema.SchemaInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ExecutionException; + /** * minimal abstract StructSchema */ @@ -80,6 +87,81 @@ public abstract class AbstractStructSchema<T> extends AbstractSchema<T> { if (reader != null) { this.reader.setSchemaInfoProvider(schemaInfoProvider); } + this.schemaInfoProvider = schemaInfoProvider; + } 
+ + @Override + public Schema<T> atSchemaVersion(byte[] schemaVersion) throws SchemaSerializationException { + Objects.requireNonNull(schemaVersion); + if (schemaInfoProvider == null) { + // this schema is not downloaded from the registry + return this; + } + try { + SchemaInfo schemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion).get(); + if (schemaInfo == null) { + throw new SchemaSerializationException("Unknown version "+ BytesSchemaVersion.of(schemaVersion)); + } + return getAbstractStructSchemaAtVersion(schemaVersion, schemaInfo); + } catch (ExecutionException err) { + throw new SchemaSerializationException(err.getCause()); + } catch (InterruptedException err) { + Thread.currentThread().interrupt(); + throw new SchemaSerializationException(err); + } + } + + private static class WrappedVersionedSchema<T> extends AbstractStructSchema<T> { + private final byte[] schemaVersion; + private final AbstractStructSchema<T> parent; + public WrappedVersionedSchema(SchemaInfo schemaInfo, final byte[] schemaVersion, + AbstractStructSchema<T> parent) { + super(schemaInfo); + this.schemaVersion = schemaVersion; + this.writer = null; + this.reader = parent.reader; + this.schemaInfoProvider = parent.schemaInfoProvider; + this.parent = parent; + } + + @Override + public boolean requireFetchingSchemaInfo() { + return true; + } + + @Override + public T decode(byte[] bytes) { + return decode(bytes, schemaVersion); + } + + @Override + public T decode(ByteBuf byteBuf) { + return decode(byteBuf, schemaVersion); + } + + @Override + public byte[] encode(T message) { + throw new UnsupportedOperationException("This schema is not meant to be used for encoding"); + } + + @Override + public Optional<Object> getNativeSchema() { + if (reader instanceof AbstractMultiVersionReader) { + AbstractMultiVersionReader abstractMultiVersionReader = (AbstractMultiVersionReader) reader; + try { + SchemaReader schemaReader = abstractMultiVersionReader.getSchemaReader(schemaVersion); + return 
schemaReader.getNativeSchema(); + } catch (ExecutionException err) { + throw new RuntimeException(err.getCause()); + } + } else { + return Optional.empty(); + } + } + } + + private AbstractStructSchema<T> getAbstractStructSchemaAtVersion(byte[] schemaVersion, SchemaInfo schemaInfo) { + return new WrappedVersionedSchema<>(schemaInfo, schemaVersion, this); } protected void setWriter(SchemaWriter<T> writer) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java index 3ec21a2..160a3a7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java @@ -78,6 +78,16 @@ public class AutoConsumeSchema implements Schema<GenericRecord> { return schema == null || schema.supportSchemaVersioning(); } + public Schema<?> atSchemaVersion(byte[] schemaVersion) { + fetchSchemaIfNeeded(); + ensureSchemaInitialized(); + if (schema.supportSchemaVersioning() && schema instanceof AbstractSchema) { + return ((AbstractSchema) schema).atSchemaVersion(schemaVersion); + } else { + return schema; + } + } + @Override public GenericRecord decode(byte[] bytes, byte[] schemaVersion) { fetchSchemaIfNeeded(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java index 4f7f921..f572bbf 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java @@ -21,6 +21,9 @@ package org.apache.pulsar.client.impl.schema; import static com.google.common.base.Preconditions.checkArgument; import java.util.concurrent.CompletableFuture; + +import io.netty.buffer.ByteBuf; +import 
io.netty.buffer.ByteBufUtil; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Schema; @@ -35,7 +38,7 @@ import org.apache.pulsar.common.schema.SchemaType; * [Key, Value] pair schema definition */ @Slf4j -public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> { +public class KeyValueSchema<K, V> extends AbstractSchema<KeyValue<K, V>> { @Getter private final Schema<K> keySchema; @@ -139,18 +142,29 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> { } } + @Override public KeyValue<K, V> decode(byte[] bytes) { return decode(bytes, null); } + @Override public KeyValue<K, V> decode(byte[] bytes, byte[] schemaVersion) { if (this.keyValueEncodingType == KeyValueEncodingType.SEPARATED) { throw new SchemaSerializationException("This method cannot be used under this SEPARATED encoding type"); } - return KeyValue.decode(bytes, (keyBytes, valueBytes) -> decode(keyBytes, valueBytes, schemaVersion)); } + @Override + public KeyValue<K, V> decode(ByteBuf byteBuf) { + return decode(ByteBufUtil.getBytes(byteBuf)); + } + + @Override + public KeyValue<K, V> decode(ByteBuf byteBuf, byte[] schemaVersion) { + return decode(ByteBufUtil.getBytes(byteBuf), schemaVersion); + } + public KeyValue<K, V> decode(byte[] keyBytes, byte[] valueBytes, byte[] schemaVersion) { K k; if (keyBytes == null) { @@ -214,7 +228,6 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> { this.schemaInfo = KeyValueSchemaInfo.encodeKeyValueSchemaInfo( keySchema, valueSchema, keyValueEncodingType ); - this.keySchema.setSchemaInfoProvider(new SchemaInfoProvider() { @Override public CompletableFuture<SchemaInfo> getSchemaByVersion(byte[] schemaVersion) { @@ -253,4 +266,21 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> { } }); } + + @Override + public String toString() { + return "KeyValueSchema(" + keyValueEncodingType + "," + keySchema + "," + valueSchema + ")"; + } + + @Override + public Schema<?> 
atSchemaVersion(byte[] schemaVersion) throws SchemaSerializationException { + if (!supportSchemaVersioning()) { + return this; + } else { + Schema<?> keySchema = this.keySchema instanceof AbstractSchema ? ((AbstractSchema) this.keySchema).atSchemaVersion(schemaVersion) : this.keySchema; + Schema<?> valueSchema = this.valueSchema instanceof AbstractSchema ? ((AbstractSchema) this.valueSchema).atSchemaVersion(schemaVersion) : this.valueSchema; + return KeyValueSchema.of(keySchema, valueSchema, keyValueEncodingType); + } + } + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReader.java index 22c63a94..d9a7384 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReader.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReader.java @@ -36,6 +36,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; @@ -117,5 +118,10 @@ public class GenericAvroReader implements SchemaReader<GenericRecord> { return offset; } + @Override + public Optional<Object> getNativeSchema() { + return Optional.of(schema); + } + private static final Logger log = LoggerFactory.getLogger(GenericAvroReader.class); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeReader.java index eccc482..c41767c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeReader.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeReader.java @@ -32,6 +32,7 @@ import java.io.IOException; import 
java.io.InputStream; import java.util.Arrays; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; public class GenericProtobufNativeReader implements SchemaReader<GenericRecord> { @@ -79,5 +80,10 @@ public class GenericProtobufNativeReader implements SchemaReader<GenericRecord> } } + @Override + public Optional<Object> getNativeSchema() { + return Optional.of(descriptor); + } + private static final Logger log = LoggerFactory.getLogger(GenericProtobufNativeReader.class); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java index 59e748c..e7d963c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java @@ -25,6 +25,7 @@ import org.apache.pulsar.client.impl.schema.AvroBaseStructSchema; import org.apache.pulsar.common.schema.SchemaInfo; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; /** diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AbstractMultiVersionReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AbstractMultiVersionReader.java index ea0df25..f1a775d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AbstractMultiVersionReader.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AbstractMultiVersionReader.java @@ -71,7 +71,7 @@ public abstract class AbstractMultiVersionReader<T> implements SchemaReader<T> { public T read(InputStream inputStream, byte[] schemaVersion) { try { return schemaVersion == null ? 
read(inputStream) : - readerCache.get(BytesSchemaVersion.of(schemaVersion)).read(inputStream); + getSchemaReader(schemaVersion).read(inputStream); } catch (ExecutionException e) { LOG.error("Can't get generic schema for topic {} schema version {}", schemaInfoProvider.getTopicName(), Hex.encodeHexString(schemaVersion), e); @@ -79,11 +79,15 @@ public abstract class AbstractMultiVersionReader<T> implements SchemaReader<T> { } } + public SchemaReader<T> getSchemaReader(byte[] schemaVersion) throws ExecutionException { + return readerCache.get(BytesSchemaVersion.of(schemaVersion)); + } + @Override public T read(byte[] bytes, byte[] schemaVersion) { try { return schemaVersion == null ? read(bytes) : - readerCache.get(BytesSchemaVersion.of(schemaVersion)).read(bytes); + getSchemaReader(schemaVersion).read(bytes); } catch (ExecutionException | AvroTypeException e) { if (e instanceof AvroTypeException) { throw new SchemaSerializationException(e); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java index e1b58a4..63885ba 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java @@ -32,18 +32,22 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; +import java.util.Optional; public class AvroReader<T> implements SchemaReader<T> { private ReflectDatumReader<T> reader; private static final ThreadLocal<BinaryDecoder> decoders = new ThreadLocal<>(); + private final Schema schema; public AvroReader(Schema schema) { this.reader = new ReflectDatumReader<>(schema); + this.schema = schema; } public AvroReader(Schema schema, ClassLoader classLoader, boolean jsr310ConversionEnabled) { + this.schema = schema; if (classLoader != null) { ReflectData reflectData = new 
ReflectData(classLoader); AvroSchema.addLogicalTypeConversions(reflectData, jsr310ConversionEnabled); @@ -55,6 +59,7 @@ public class AvroReader<T> implements SchemaReader<T> { public AvroReader(Schema writerSchema, Schema readerSchema, ClassLoader classLoader, boolean jsr310ConversionEnabled) { + this.schema = readerSchema; if (classLoader != null) { ReflectData reflectData = new ReflectData(classLoader); AvroSchema.addLogicalTypeConversions(reflectData, jsr310ConversionEnabled); @@ -98,6 +103,11 @@ public class AvroReader<T> implements SchemaReader<T> { } } + @Override + public Optional<Object> getNativeSchema() { + return Optional.of(schema); + } + private static final Logger log = LoggerFactory.getLogger(AvroReader.class); } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java index e1ec9f6..6f8d326 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java @@ -132,10 +132,10 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T> Schema<T> schema = null; if (message instanceof MessageImpl) { MessageImpl impl = (MessageImpl) message; - schema = impl.getSchema(); + schema = impl.getSchemaInternal(); } else if (message instanceof TopicMessageImpl) { TopicMessageImpl impl = (TopicMessageImpl) message; - schema = impl.getSchema(); + schema = impl.getSchemaInternal(); } Record<T> record = PulsarRecord.<T>builder() .message(message) diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java index be39508..65474ff 100644 --- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java @@ -40,7 +40,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; -import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; @@ -286,9 +285,9 @@ public class PulsarSourceTest { Consumer consumer = mock(Consumer.class); MessageImpl messageImpl = mock(MessageImpl.class); Schema schema = mock(Schema.class); - when(messageImpl.getSchema()).thenReturn(schema); + when(messageImpl.getSchemaInternal()).thenReturn(schema); pulsarSource.received(consumer, (Message) messageImpl); - verify(messageImpl.getSchema(), times(1)); + verify(messageImpl.getSchemaInternal(), times(1)); Record<GenericRecord> pushed = pulsarSource.read(); assertSame(pushed.getSchema(), schema); }