This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 42b598a  [Schema] Introduce Schema.AUTO to detect schema automatically 
for consumers and readers (#2494)
42b598a is described below

commit 42b598a17b64d76428d48438cfb169cf3a1fcd5d
Author: Sijie Guo <guosi...@gmail.com>
AuthorDate: Sat Sep 1 00:07:12 2018 -0700

    [Schema] Introduce Schema.AUTO to detect schema automatically for consumers 
and readers (#2494)
    
    ### Motivation
    
    Sometimes consumers and readers don't have the POJO for the message
    schema, so we can use `Schema.AUTO` to detect the schema and reflect
    the messages into generic records.
    
    ### Changes
    
    - Introduce `AutoSchema` and `SchemaType.AUTO`
    - Fetch schema information first when `AutoSchema` is used
---
 .../api/SimpleTypedProducerConsumerTest.java       | 95 ++++++++++++++++++++++
 .../java/org/apache/pulsar/client/api/Schema.java  |  6 ++
 .../pulsar/client/impl/schema/AutoSchema.java      | 63 ++++++++++++++
 .../pulsar/client/schema/AvroSchemaTest.java       | 21 ++++-
 .../pulsar/client/impl/PulsarClientImpl.java       | 52 ++++++++++++
 .../apache/pulsar/common/schema/SchemaType.java    |  7 +-
 6 files changed, 242 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
index b880953..24573d5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
@@ -26,6 +26,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.broker.service.schema.SchemaRegistry;
+import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.client.impl.schema.ProtobufSchema;
@@ -440,4 +441,98 @@ public class SimpleTypedProducerConsumerTest extends 
ProducerConsumerBase {
         }
     }
 
+    @Test
+    public void testAvroProducerAndAutoSchemaConsumer() throws Exception {
+       log.info("-- Starting {} test --", methodName);
+
+       AvroSchema<AvroEncodedPojo> avroSchema =
+           AvroSchema.of(AvroEncodedPojo.class);
+
+       Producer<AvroEncodedPojo> producer = pulsarClient
+           .newProducer(avroSchema)
+           .topic("persistent://my-property/use/my-ns/my-topic1")
+           .create();
+
+       for (int i = 0; i < 10; i++) {
+           String message = "my-message-" + i;
+           producer.send(new AvroEncodedPojo(message));
+       }
+
+       Consumer<GenericRecord> consumer = pulsarClient
+           .newConsumer(Schema.AUTO())
+           .topic("persistent://my-property/use/my-ns/my-topic1")
+           .subscriptionName("my-subscriber-name")
+           .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+           .subscribe();
+
+       Message<GenericRecord> msg = null;
+       Set<String> messageSet = Sets.newHashSet();
+       for (int i = 0; i < 10; i++) {
+           msg = consumer.receive(5, TimeUnit.SECONDS);
+           GenericRecord receivedMessage = msg.getValue();
+           log.debug("Received message: [{}]", receivedMessage);
+           String expectedMessage = "my-message-" + i;
+           String actualMessage = (String) receivedMessage.getField("message");
+           testMessageOrderAndDuplicates(messageSet, actualMessage, 
expectedMessage);
+       }
+       // Acknowledge the consumption of all messages at once
+       consumer.acknowledgeCumulative(msg);
+       consumer.close();
+
+       SchemaRegistry.SchemaAndMetadata storedSchema = 
pulsar.getSchemaRegistryService()
+           .getSchema("my-property/my-ns/my-topic1")
+           .get();
+
+       Assert.assertEquals(storedSchema.schema.getData(), 
avroSchema.getSchemaInfo().getSchema());
+
+       log.info("-- Exiting {} test --", methodName);
+
+   }
+
+   @Test
+    public void testAvroProducerAndAutoSchemaReader() throws Exception {
+       log.info("-- Starting {} test --", methodName);
+
+       AvroSchema<AvroEncodedPojo> avroSchema =
+           AvroSchema.of(AvroEncodedPojo.class);
+
+       Producer<AvroEncodedPojo> producer = pulsarClient
+           .newProducer(avroSchema)
+           .topic("persistent://my-property/use/my-ns/my-topic1")
+           .create();
+
+       for (int i = 0; i < 10; i++) {
+           String message = "my-message-" + i;
+           producer.send(new AvroEncodedPojo(message));
+       }
+
+       Reader<GenericRecord> reader = pulsarClient
+           .newReader(Schema.AUTO())
+           .topic("persistent://my-property/use/my-ns/my-topic1")
+           .startMessageId(MessageId.earliest)
+           .create();
+
+       Message<GenericRecord> msg = null;
+       Set<String> messageSet = Sets.newHashSet();
+       for (int i = 0; i < 10; i++) {
+           msg = reader.readNext();
+           GenericRecord receivedMessage = msg.getValue();
+           log.debug("Received message: [{}]", receivedMessage);
+           String expectedMessage = "my-message-" + i;
+           String actualMessage = (String) receivedMessage.getField("message");
+           testMessageOrderAndDuplicates(messageSet, actualMessage, 
expectedMessage);
+       }
+       // All messages have been read; close the reader (no acknowledgement is issued here)
+       reader.close();
+
+       SchemaRegistry.SchemaAndMetadata storedSchema = 
pulsar.getSchemaRegistryService()
+           .getSchema("my-property/my-ns/my-topic1")
+           .get();
+
+       Assert.assertEquals(storedSchema.schema.getData(), 
avroSchema.getSchemaInfo().getSchema());
+
+       log.info("-- Exiting {} test --", methodName);
+
+   }
+
 }
diff --git 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java
index 3088a4d..113f26c 100644
--- 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.api;
 
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.schema.AutoSchema;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.BytesSchema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
@@ -77,4 +79,8 @@ public interface Schema<T> {
     static <T> Schema<T> JSON(Class<T> clazz) {
         return JSONSchema.of(clazz);
     }
+
+    static Schema<GenericRecord> AUTO() {
+        return new AutoSchema();
+    }
 }
diff --git 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoSchema.java
 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoSchema.java
new file mode 100644
index 0000000..5bf92b7
--- /dev/null
+++ 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoSchema.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+/**
+ * Schema for {@link GenericRecord} that delegates to the schema supplied via {@code setSchema}; each operation fails a state check if none has been set.
+ */
+public class AutoSchema implements Schema<GenericRecord> {
+
+    private Schema<GenericRecord> schema;
+
+    public void setSchema(Schema<GenericRecord> schema) {
+        this.schema = schema;
+    }
+
+    private void ensureSchemaInitialized() {
+        checkState(null != schema, "Schema is not initialized before used");
+    }
+
+    @Override
+    public byte[] encode(GenericRecord message) {
+        ensureSchemaInitialized();
+
+        return schema.encode(message);
+    }
+
+    @Override
+    public GenericRecord decode(byte[] bytes) {
+        ensureSchemaInitialized();
+
+        return schema.decode(bytes);
+    }
+
+    @Override
+    public SchemaInfo getSchemaInfo() {
+        ensureSchemaInitialized();
+
+        return schema.getSchemaInfo();
+    }
+}
diff --git 
a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java
 
b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java
index cfe9cb7..e5fcc3c 100644
--- 
a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java
+++ 
b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java
@@ -27,6 +27,7 @@ import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Schema;
 import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.schema.AutoSchema;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.GenericAvroSchema;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -115,11 +116,29 @@ public class AvroSchemaTest {
     @Test
     public void testEncodeAndDecodeGenericRecord() {
         AvroSchema<Foo> avroSchema = AvroSchema.of(Foo.class, null);
+        GenericAvroSchema genericAvroSchema = new 
GenericAvroSchema(avroSchema.getSchemaInfo());
+
+        log.info("Avro Schema : {}", genericAvroSchema.getAvroSchema());
+
+        testGenericSchema(avroSchema, genericAvroSchema);
+    }
+
+    @Test
+    public void testAutoSchema() {
+        AvroSchema<Foo> avroSchema = AvroSchema.of(Foo.class, null);
 
         GenericAvroSchema genericAvroSchema = new 
GenericAvroSchema(avroSchema.getSchemaInfo());
 
         log.info("Avro Schema : {}", genericAvroSchema.getAvroSchema());
 
+        AutoSchema schema = new AutoSchema();
+        schema.setSchema(genericAvroSchema);
+
+        testGenericSchema(avroSchema, schema);
+    }
+
+    private void testGenericSchema(AvroSchema<Foo> avroSchema,
+                                   
org.apache.pulsar.client.api.Schema<GenericRecord> genericRecordSchema) {
         int numRecords = 10;
         for (int i = 0; i < numRecords; i++) {
             Foo foo = new Foo();
@@ -132,7 +151,7 @@ public class AvroSchemaTest {
 
             byte[] data = avroSchema.encode(foo);
 
-            GenericRecord record = genericAvroSchema.decode(data);
+            GenericRecord record = genericRecordSchema.decode(data);
             Object field1 = record.getField("field1");
             assertEquals("field-1-" + i, field1, "Field 1 is " + 
field1.getClass());
             Object field2 = record.getField("field2");
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index e156e10..e82cf6c 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -61,12 +62,15 @@ import 
org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.apache.pulsar.client.impl.schema.AutoSchema;
+import org.apache.pulsar.client.impl.schema.GenericAvroSchema;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.slf4j.Logger;
@@ -243,6 +247,11 @@ public class PulsarClientImpl implements PulsarClient {
                     new 
PulsarClientException.InvalidConfigurationException("Producer configuration 
undefined"));
         }
 
+        if (schema instanceof AutoSchema) {
+            return FutureUtil.failedFuture(
+                    new 
PulsarClientException.InvalidConfigurationException("AutoSchema is only used by 
consumers to detect schemas automatically"));
+        }
+
         if (state.get() != State.Open) {
             return FutureUtil.failedFuture(new 
PulsarClientException.AlreadyClosedException("Client already closed : state = " 
+ state.get()));
         }
@@ -377,6 +386,29 @@ public class PulsarClientImpl implements PulsarClient {
     }
 
     private <T> CompletableFuture<Consumer<T>> 
singleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema) {
+        if (schema instanceof AutoSchema) {
+            AutoSchema autoSchema = (AutoSchema) schema;
+            return lookup.getSchema(TopicName.get(conf.getSingleTopic()))
+                    .thenCompose(schemaInfoOptional -> {
+                        if (schemaInfoOptional.isPresent() && 
schemaInfoOptional.get().getType() == SchemaType.AVRO) {
+                            GenericAvroSchema genericAvroSchema = new 
GenericAvroSchema(schemaInfoOptional.get());
+                            log.info("Auto detected schema for topic {} : {}",
+                                conf.getSingleTopic(), new 
String(schemaInfoOptional.get().getSchema(), UTF_8));
+                            autoSchema.setSchema(genericAvroSchema);
+                            return doSingleTopicSubscribeAsync(conf, schema);
+                        } else {
+                            return FutureUtil.failedFuture(
+                                new 
PulsarClientException.LookupException("Currently schema detection only works 
for topics with avro schemas"));
+                        }
+                    });
+        } else {
+            return doSingleTopicSubscribeAsync(conf, schema);
+        }
+    }
+
+
+
+    private <T> CompletableFuture<Consumer<T>> 
doSingleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> 
schema) {
         CompletableFuture<Consumer<T>> consumerSubscribedFuture = new 
CompletableFuture<>();
 
         String topic = conf.getSingleTopic();
@@ -505,6 +537,26 @@ public class PulsarClientImpl implements PulsarClient {
     }
 
     public <T> CompletableFuture<Reader<T>> 
createReaderAsync(ReaderConfigurationData<T> conf, Schema<T> schema) {
+        if (schema instanceof AutoSchema) {
+            AutoSchema autoSchema = (AutoSchema) schema;
+            return lookup.getSchema(TopicName.get(conf.getTopicName()))
+                    .thenCompose(schemaInfoOptional -> {
+                        if (schemaInfoOptional.isPresent() && 
schemaInfoOptional.get().getType() == SchemaType.AVRO) {
+                            GenericAvroSchema genericAvroSchema = new 
GenericAvroSchema(schemaInfoOptional.get());
+                            log.info("Auto detected schema for topic {} : {}",
+                                conf.getTopicName(), new 
String(schemaInfoOptional.get().getSchema(), UTF_8));
+                            autoSchema.setSchema(genericAvroSchema);
+                            return doCreateReaderAsync(conf, schema);
+                        } else {
+                            return FutureUtil.failedFuture(
+                                new 
PulsarClientException.LookupException("Currently schema detection only works 
for topics with avro schemas"));
+                        }
+                    });
+        } else {
+            return doCreateReaderAsync(conf, schema);
+        }
+    }
+    <T> CompletableFuture<Reader<T>> 
doCreateReaderAsync(ReaderConfigurationData<T> conf, Schema<T> schema) {
         if (state.get() != State.Open) {
             return FutureUtil.failedFuture(new 
PulsarClientException.AlreadyClosedException("Client already closed"));
         }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
index cbf7c91..88adb53 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
@@ -45,5 +45,10 @@ public enum SchemaType {
     /**
      * Serialize and deserialize via avro
      */
-    AVRO
+    AVRO,
+
+    /**
+     * Auto Detect Schema Type.
+     */
+    AUTO
 }

Reply via email to