This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 42b598a [Schema] Introduce Schema.AUTO to detect schema automatically for consumers and readers (#2494) 42b598a is described below commit 42b598a17b64d76428d48438cfb169cf3a1fcd5d Author: Sijie Guo <guosi...@gmail.com> AuthorDate: Sat Sep 1 00:07:12 2018 -0700 [Schema] Introduce Schema.AUTO to detect schema automatically for consumers and readers (#2494) ### Motivation Sometimes consumers and readers don't have the POJO for the message schemas, so we can use `Schema.AUTO` to detect the schema and reflect the messages into generic records. ### Changes - Introduce AutoSchema and `SchemaType.AUTO` - Fetch schema information first when `AutoSchema` is used --- .../api/SimpleTypedProducerConsumerTest.java | 95 ++++++++++++++++++++++ .../java/org/apache/pulsar/client/api/Schema.java | 6 ++ .../pulsar/client/impl/schema/AutoSchema.java | 63 ++++++++++++++ .../pulsar/client/schema/AvroSchemaTest.java | 21 ++++- .../pulsar/client/impl/PulsarClientImpl.java | 52 ++++++++++++ .../apache/pulsar/common/schema/SchemaType.java | 7 +- 6 files changed, 242 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java index b880953..24573d5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java @@ -26,6 +26,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.service.schema.SchemaRegistry; +import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.impl.schema.AvroSchema; import org.apache.pulsar.client.impl.schema.JSONSchema; import org.apache.pulsar.client.impl.schema.ProtobufSchema; @@ -440,4 
+441,98 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase { } } + @Test + public void testAvroProducerAndAutoSchemaConsumer() throws Exception { + log.info("-- Starting {} test --", methodName); + + AvroSchema<AvroEncodedPojo> avroSchema = + AvroSchema.of(AvroEncodedPojo.class); + + Producer<AvroEncodedPojo> producer = pulsarClient + .newProducer(avroSchema) + .topic("persistent://my-property/use/my-ns/my-topic1") + .create(); + + for (int i = 0; i < 10; i++) { + String message = "my-message-" + i; + producer.send(new AvroEncodedPojo(message)); + } + + Consumer<GenericRecord> consumer = pulsarClient + .newConsumer(Schema.AUTO()) + .topic("persistent://my-property/use/my-ns/my-topic1") + .subscriptionName("my-subscriber-name") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Message<GenericRecord> msg = null; + Set<String> messageSet = Sets.newHashSet(); + for (int i = 0; i < 10; i++) { + msg = consumer.receive(5, TimeUnit.SECONDS); + GenericRecord receivedMessage = msg.getValue(); + log.debug("Received message: [{}]", receivedMessage); + String expectedMessage = "my-message-" + i; + String actualMessage = (String) receivedMessage.getField("message"); + testMessageOrderAndDuplicates(messageSet, actualMessage, expectedMessage); + } + // Acknowledge the consumption of all messages at once + consumer.acknowledgeCumulative(msg); + consumer.close(); + + SchemaRegistry.SchemaAndMetadata storedSchema = pulsar.getSchemaRegistryService() + .getSchema("my-property/my-ns/my-topic1") + .get(); + + Assert.assertEquals(storedSchema.schema.getData(), avroSchema.getSchemaInfo().getSchema()); + + log.info("-- Exiting {} test --", methodName); + + } + + @Test + public void testAvroProducerAndAutoSchemaReader() throws Exception { + log.info("-- Starting {} test --", methodName); + + AvroSchema<AvroEncodedPojo> avroSchema = + AvroSchema.of(AvroEncodedPojo.class); + + Producer<AvroEncodedPojo> producer = pulsarClient + 
.newProducer(avroSchema) + .topic("persistent://my-property/use/my-ns/my-topic1") + .create(); + + for (int i = 0; i < 10; i++) { + String message = "my-message-" + i; + producer.send(new AvroEncodedPojo(message)); + } + + Reader<GenericRecord> reader = pulsarClient + .newReader(Schema.AUTO()) + .topic("persistent://my-property/use/my-ns/my-topic1") + .startMessageId(MessageId.earliest) + .create(); + + Message<GenericRecord> msg = null; + Set<String> messageSet = Sets.newHashSet(); + for (int i = 0; i < 10; i++) { + msg = reader.readNext(); + GenericRecord receivedMessage = msg.getValue(); + log.debug("Received message: [{}]", receivedMessage); + String expectedMessage = "my-message-" + i; + String actualMessage = (String) receivedMessage.getField("message"); + testMessageOrderAndDuplicates(messageSet, actualMessage, expectedMessage); + } + // Acknowledge the consumption of all messages at once + reader.close(); + + SchemaRegistry.SchemaAndMetadata storedSchema = pulsar.getSchemaRegistryService() + .getSchema("my-property/my-ns/my-topic1") + .get(); + + Assert.assertEquals(storedSchema.schema.getData(), avroSchema.getSchemaInfo().getSchema()); + + log.info("-- Exiting {} test --", methodName); + + } + } diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java index 3088a4d..113f26c 100644 --- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java +++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.client.api; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.impl.schema.AutoSchema; import org.apache.pulsar.client.impl.schema.AvroSchema; import org.apache.pulsar.client.impl.schema.BytesSchema; import org.apache.pulsar.client.impl.schema.JSONSchema; @@ -77,4 +79,8 @@ public interface Schema<T> { static <T> Schema<T> 
JSON(Class<T> clazz) { return JSONSchema.of(clazz); } + + static Schema<GenericRecord> AUTO() { + return new AutoSchema(); + } } diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoSchema.java new file mode 100644 index 0000000..5bf92b7 --- /dev/null +++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoSchema.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema; + + +import static com.google.common.base.Preconditions.checkState; + +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.common.schema.SchemaInfo; + +/** + * Auto detect schema. 
+ */ +public class AutoSchema implements Schema<GenericRecord> { + + private Schema<GenericRecord> schema; + + public void setSchema(Schema<GenericRecord> schema) { + this.schema = schema; + } + + private void ensureSchemaInitialized() { + checkState(null != schema, "Schema is not initialized before used"); + } + + @Override + public byte[] encode(GenericRecord message) { + ensureSchemaInitialized(); + + return schema.encode(message); + } + + @Override + public GenericRecord decode(byte[] bytes) { + ensureSchemaInitialized(); + + return schema.decode(bytes); + } + + @Override + public SchemaInfo getSchemaInfo() { + ensureSchemaInitialized(); + + return schema.getSchemaInfo(); + } +} diff --git a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java index cfe9cb7..e5fcc3c 100644 --- a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java +++ b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java @@ -27,6 +27,7 @@ import lombok.ToString; import lombok.extern.slf4j.Slf4j; import org.apache.avro.Schema; import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.impl.schema.AutoSchema; import org.apache.pulsar.client.impl.schema.AvroSchema; import org.apache.pulsar.client.impl.schema.GenericAvroSchema; import org.apache.pulsar.common.schema.SchemaType; @@ -115,11 +116,29 @@ public class AvroSchemaTest { @Test public void testEncodeAndDecodeGenericRecord() { AvroSchema<Foo> avroSchema = AvroSchema.of(Foo.class, null); + GenericAvroSchema genericAvroSchema = new GenericAvroSchema(avroSchema.getSchemaInfo()); + + log.info("Avro Schema : {}", genericAvroSchema.getAvroSchema()); + + testGenericSchema(avroSchema, genericAvroSchema); + } + + @Test + public void testAutoSchema() { + AvroSchema<Foo> avroSchema = AvroSchema.of(Foo.class, null); GenericAvroSchema 
genericAvroSchema = new GenericAvroSchema(avroSchema.getSchemaInfo()); log.info("Avro Schema : {}", genericAvroSchema.getAvroSchema()); + AutoSchema schema = new AutoSchema(); + schema.setSchema(genericAvroSchema); + + testGenericSchema(avroSchema, schema); + } + + private void testGenericSchema(AvroSchema<Foo> avroSchema, + org.apache.pulsar.client.api.Schema<GenericRecord> genericRecordSchema) { int numRecords = 10; for (int i = 0; i < numRecords; i++) { Foo foo = new Foo(); @@ -132,7 +151,7 @@ public class AvroSchemaTest { byte[] data = avroSchema.encode(foo); - GenericRecord record = genericAvroSchema.decode(data); + GenericRecord record = genericRecordSchema.decode(data); Object field1 = record.getField("field1"); assertEquals("field-1-" + i, field1, "Field 1 is " + field1.getClass()); Object field2 = record.getField("field2"); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index e156e10..e82cf6c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.impl; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.commons.lang3.StringUtils.isBlank; import com.google.common.annotations.VisibleForTesting; @@ -61,12 +62,15 @@ import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.apache.pulsar.client.impl.conf.ReaderConfigurationData; +import org.apache.pulsar.client.impl.schema.AutoSchema; +import org.apache.pulsar.client.impl.schema.GenericAvroSchema; import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.common.naming.NamespaceName; import 
org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.slf4j.Logger; @@ -243,6 +247,11 @@ public class PulsarClientImpl implements PulsarClient { new PulsarClientException.InvalidConfigurationException("Producer configuration undefined")); } + if (schema instanceof AutoSchema) { + return FutureUtil.failedFuture( + new PulsarClientException.InvalidConfigurationException("AutoSchema is only used by consumers to detect schemas automatically")); + } + if (state.get() != State.Open) { return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed : state = " + state.get())); } @@ -377,6 +386,29 @@ public class PulsarClientImpl implements PulsarClient { } private <T> CompletableFuture<Consumer<T>> singleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema) { + if (schema instanceof AutoSchema) { + AutoSchema autoSchema = (AutoSchema) schema; + return lookup.getSchema(TopicName.get(conf.getSingleTopic())) + .thenCompose(schemaInfoOptional -> { + if (schemaInfoOptional.isPresent() && schemaInfoOptional.get().getType() == SchemaType.AVRO) { + GenericAvroSchema genericAvroSchema = new GenericAvroSchema(schemaInfoOptional.get()); + log.info("Auto detected schema for topic {} : {}", + conf.getSingleTopic(), new String(schemaInfoOptional.get().getSchema(), UTF_8)); + autoSchema.setSchema(genericAvroSchema); + return doSingleTopicSubscribeAsync(conf, schema); + } else { + return FutureUtil.failedFuture( + new PulsarClientException.LookupException("Currently schema detection only works for topics with avro schemas")); + } + }); + } else { + return doSingleTopicSubscribeAsync(conf, schema); + } + } + 
+ + + private <T> CompletableFuture<Consumer<T>> doSingleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema) { CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>(); String topic = conf.getSingleTopic(); @@ -505,6 +537,26 @@ public class PulsarClientImpl implements PulsarClient { } public <T> CompletableFuture<Reader<T>> createReaderAsync(ReaderConfigurationData<T> conf, Schema<T> schema) { + if (schema instanceof AutoSchema) { + AutoSchema autoSchema = (AutoSchema) schema; + return lookup.getSchema(TopicName.get(conf.getTopicName())) + .thenCompose(schemaInfoOptional -> { + if (schemaInfoOptional.isPresent() && schemaInfoOptional.get().getType() == SchemaType.AVRO) { + GenericAvroSchema genericAvroSchema = new GenericAvroSchema(schemaInfoOptional.get()); + log.info("Auto detected schema for topic {} : {}", + conf.getTopicName(), new String(schemaInfoOptional.get().getSchema(), UTF_8)); + autoSchema.setSchema(genericAvroSchema); + return doCreateReaderAsync(conf, schema); + } else { + return FutureUtil.failedFuture( + new PulsarClientException.LookupException("Currently schema detection only works for topics with avro schemas")); + } + }); + } else { + return doCreateReaderAsync(conf, schema); + } + } + <T> CompletableFuture<Reader<T>> doCreateReaderAsync(ReaderConfigurationData<T> conf, Schema<T> schema) { if (state.get() != State.Open) { return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed")); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java index cbf7c91..88adb53 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java @@ -45,5 +45,10 @@ public enum SchemaType { /** * Serialize and deserialize via avro */ - AVRO + AVRO, + + /** + * 
Auto Detect Schema Type. + */ + AUTO }