This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new db89cd4  Json schema codec (#1551)
db89cd4 is described below

commit db89cd460c25492bb6812f31a6e28265c33b192d
Author: Dave Rusek <dave.ru...@gmail.com>
AuthorDate: Fri Apr 27 12:45:30 2018 -0600

    Json schema codec (#1551)
    
    * Add JSON schema support
    
    * Use different json schema library
    
    * Add license headers to new files
    
    * Revert "Use different json schema library"
    
    This reverts commit 405b918ed9ccebe947916b7c87192b0cef3e910a.
    
    * Changes related to review input
    
    * Upgrade jackson schema module
    
    * Check schema compatibility on subscribe and publish
    
    * Add missing license headers:
    
    * Ensure schema is stored
    
    * Add negative tests on publish/consume with wrong schema
    
    * Add missing license header
    
    * Release ALL protobuf references
    
    * Remove SecureRandom, it's unneeded and appears to timeout
    
    * GRRRRR license headers
---
 pom.xml                                            |   6 +
 .../apache/pulsar/broker/ServiceConfiguration.java |  11 +
 .../apache/pulsar/broker/service/ServerCnx.java    |  32 ++-
 .../org/apache/pulsar/broker/service/Topic.java    |   2 +
 .../service/nonpersistent/NonPersistentTopic.java  |   9 +
 .../broker/service/persistent/PersistentTopic.java |   9 +
 .../schema/DefaultSchemaRegistryService.java       |   5 +
 .../schema/JsonSchemaCompatibilityCheck.java       |  46 ++++
 .../service/schema/SchemaCompatibilityCheck.java   |  28 +--
 .../broker/service/schema/SchemaRegistry.java      |   2 +
 .../service/schema/SchemaRegistryService.java      |  22 +-
 .../service/schema/SchemaRegistryServiceImpl.java  |  52 +++--
 .../broker/service/schema/SchemaServiceTest.java   |   3 +-
 .../pulsar/client/api/ProducerConsumerBase.java    |   4 +-
 .../api/SimpleTypedProducerConsumerTest.java       | 239 +++++++++++++++++++++
 pulsar-client/pom.xml                              |   5 +
 .../apache/pulsar/client/api/MessageBuilder.java   |   2 +-
 .../java/org/apache/pulsar/client/api/Schema.java  |   2 +-
 ...hema.java => SchemaSerializationException.java} |  28 +--
 .../pulsar/client/impl/MessageBuilderImpl.java     |   3 +-
 .../org/apache/pulsar/client/impl/MessageImpl.java |   1 +
 .../apache/pulsar/client/impl/ProducerBase.java    |   8 +-
 .../apache/pulsar/client/impl/ProducerImpl.java    |   3 +-
 .../pulsar/client/impl/TopicMessageImpl.java       |   1 +
 .../pulsar/client/impl/schema/JSONSchema.java      |  84 ++++++++
 .../apache/pulsar/client/tutorial/JsonPojo.java}   |  35 ++-
 .../tutorial/SampleAsyncProducerWithSchema.java    |  69 ++++++
 .../client/tutorial/SampleConsumerWithSchema.java  |  49 +++++
 .../org/apache/pulsar/common/api/Commands.java     |  17 +-
 29 files changed, 682 insertions(+), 95 deletions(-)

diff --git a/pom.xml b/pom.xml
index 4ac9294..af48238 100644
--- a/pom.xml
+++ b/pom.xml
@@ -534,6 +534,12 @@ flexible messaging model and an intuitive client API.</description>
       </dependency>
 
       <dependency>
+        <groupId>com.fasterxml.jackson.module</groupId>
+        <artifactId>jackson-module-jsonSchema</artifactId>
+        <version>2.9.0</version>
+      </dependency>
+
+      <dependency>
         <artifactId>log4j</artifactId>
         <groupId>log4j</groupId>
         <version>1.2.17</version>
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 1002714..b35b440 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -438,6 +438,9 @@ public class ServiceConfiguration implements PulsarConfiguration {
     private boolean preferLaterVersions = false;
 
     private String schemaRegistryStorageClassName = 
"org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory";
+    private Set<String> schemaRegistryCompatibilityCheckers = Sets.newHashSet(
+        "org.apache.pulsar.broker.service.schema.JsonSchemaCompatibilityCheck"
+    );
 
     /**** --- WebSocket --- ****/
     // Number of IO threads in Pulsar Client used in WebSocket proxy
@@ -1506,6 +1509,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
         schemaRegistryStorageClassName = className;
     }
 
+    public Set<String> getSchemaRegistryCompatibilityCheckers() {
+        return schemaRegistryCompatibilityCheckers;
+    }
+
+    public void setSchemaRegistryCompatibilityCheckers(Set<String> 
schemaRegistryCompatibilityCheckers) {
+        this.schemaRegistryCompatibilityCheckers = 
schemaRegistryCompatibilityCheckers;
+    }
+
     public boolean authenticateOriginalAuthData() {
         return authenticateOriginalAuthData;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index d3488e8..d5a420f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -25,6 +25,7 @@ import static org.apache.pulsar.broker.lookup.v1.TopicLookup.lookupTopicAsync;
 import static org.apache.pulsar.common.api.Commands.newLookupErrorResponse;
 import static org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion.v5;
 
+import com.google.common.base.Strings;
 import com.google.protobuf.GeneratedMessageLite;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandler;
@@ -91,6 +92,7 @@ import org.apache.pulsar.common.policies.data.ConsumerStats;
 import org.apache.pulsar.common.schema.SchemaData;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.schema.SchemaVersion;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -561,6 +563,7 @@ public class ServerCnx extends PulsarHandler {
         final boolean readCompacted = subscribe.getReadCompacted();
         final Map<String, String> metadata = 
CommandUtils.metadataFromCommand(subscribe);
         final InitialPosition initialPosition = subscribe.getInitialPosition();
+        final SchemaData schema = subscribe.hasSchema() ? 
getSchema(subscribe.getSchema()) : null;
 
         CompletableFuture<Boolean> isProxyAuthorizedFuture;
         if (service.isAuthorizationEnabled() && originalPrincipal != null) {
@@ -622,9 +625,25 @@ public class ServerCnx extends PulsarHandler {
                         }
 
                         service.getOrCreateTopic(topicName.toString())
-                                .thenCompose(topic -> 
topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
-                                                                      subType, 
priorityLevel, consumerName, isDurable,
-                                                                      
startMessageId, metadata, readCompacted, initialPosition))
+                                .thenCompose(topic -> {
+                                    if (schema != null) {
+                                        return 
topic.isSchemaCompatible(schema).thenCompose(isCompatible -> {
+                                            if (isCompatible) {
+                                                return 
topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
+                                                    subType, priorityLevel, 
consumerName, isDurable,
+                                                    startMessageId, metadata, 
readCompacted, initialPosition);
+                                            } else {
+                                                return 
FutureUtil.failedFuture(new BrokerServiceException(
+                                                    "Trying to subscribe with 
incompatible schema"
+                                                ));
+                                            }
+                                        });
+                                    } else {
+                                        return topic.subscribe(ServerCnx.this, 
subscriptionName, consumerId,
+                                            subType, priorityLevel, 
consumerName, isDurable,
+                                            startMessageId, metadata, 
readCompacted, initialPosition);
+                                    }
+                                })
                                 .thenAccept(consumer -> {
                                     if (consumerFuture.complete(consumer)) {
                                         log.info("[{}] Created subscription on 
topic {} / {}", remoteAddress, topicName,
@@ -721,7 +740,7 @@ public class ServerCnx extends PulsarHandler {
             .data(protocolSchema.getSchemaData().toByteArray())
             .isDeleted(false)
             .timestamp(System.currentTimeMillis())
-            .user(originalPrincipal)
+            .user(Strings.nullToEmpty(originalPrincipal))
             .type(getType(protocolSchema.getType()))
             .props(protocolSchema.getPropertiesList().stream().collect(
                 Collectors.toMap(
@@ -741,6 +760,7 @@ public class ServerCnx extends PulsarHandler {
                 : service.generateUniqueProducerName();
         final boolean isEncrypted = cmdProducer.getEncrypted();
         final Map<String, String> metadata = 
CommandUtils.metadataFromCommand(cmdProducer);
+        final SchemaData schema = cmdProducer.hasSchema() ? 
getSchema(cmdProducer.getSchema()) : null;
 
         TopicName topicName = validateTopicName(cmdProducer.getTopic(), 
requestId, cmdProducer);
         if (topicName == null) {
@@ -841,8 +861,8 @@ public class ServerCnx extends PulsarHandler {
                             disableTcpNoDelayIfNeeded(topicName.toString(), 
producerName);
 
                             CompletableFuture<SchemaVersion> 
schemaVersionFuture;
-                            if (cmdProducer.hasSchema()) {
-                                schemaVersionFuture = 
topic.addSchema(getSchema(cmdProducer.getSchema()));
+                            if (schema != null) {
+                                schemaVersionFuture = topic.addSchema(schema);
                             } else {
                                 schemaVersionFuture = 
CompletableFuture.completedFuture(SchemaVersion.Empty);
                             }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 1f149f6..fceb858 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -130,4 +130,6 @@ public interface Topic {
     Position getLastMessageId();
 
     CompletableFuture<SchemaVersion> addSchema(SchemaData schema);
+
+    CompletableFuture<Boolean> isSchemaCompatible(SchemaData schema);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 1cc9403..7901975 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -994,4 +994,13 @@ public class NonPersistentTopic implements Topic {
             .getSchemaRegistryService()
             .putSchemaIfAbsent(id, schema);
     }
+
+    @Override
+    public CompletableFuture<Boolean> isSchemaCompatible(SchemaData schema) {
+        String base = TopicName.get(getName()).getPartitionedTopicName();
+        String id = TopicName.get(base).getSchemaName();
+        return brokerService.pulsar()
+            .getSchemaRegistryService()
+            .isCompatibleWithLatestVersion(id, schema);
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 66ff7dd..0240ffd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1713,4 +1713,13 @@ public class PersistentTopic implements Topic, AddEntryCallback {
             .getSchemaRegistryService()
             .putSchemaIfAbsent(id, schema);
     }
+
+    @Override
+    public CompletableFuture<Boolean> isSchemaCompatible(SchemaData schema) {
+        String base = TopicName.get(getName()).getPartitionedTopicName();
+        String id = TopicName.get(base).getSchemaName();
+        return brokerService.pulsar()
+            .getSchemaRegistryService()
+            .isCompatibleWithLatestVersion(id, schema);
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java
index db3b9f7..fef288c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java
@@ -51,6 +51,11 @@ public class DefaultSchemaRegistryService implements SchemaRegistryService {
     }
 
     @Override
+    public CompletableFuture<Boolean> isCompatibleWithLatestVersion(String 
schemaId, SchemaData schema) {
+        return CompletableFuture.completedFuture(true);
+    }
+
+    @Override
     public void close() throws Exception {
 
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheck.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheck.java
new file mode 100644
index 0000000..fa83c8a
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheck.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
+import java.io.IOException;
+import org.apache.pulsar.common.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaType;
+
+@SuppressWarnings("unused")
+public class JsonSchemaCompatibilityCheck implements SchemaCompatibilityCheck {
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    @Override
+    public SchemaType getSchemaType() {
+        return SchemaType.JSON;
+    }
+
+    @Override
+    public boolean isCompatible(SchemaData from, SchemaData to) {
+        try {
+            JsonSchema fromSchema = objectMapper.readValue(from.getData(), 
JsonSchema.class);
+            JsonSchema toSchema = objectMapper.readValue(to.getData(), 
JsonSchema.class);
+            return fromSchema.getId().equals(toSchema.getId());
+        } catch (IOException e) {
+            return false;
+        }
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityCheck.java
similarity index 62%
copy from pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java
copy to pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityCheck.java
index 3a91fde..86d115a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityCheck.java
@@ -16,30 +16,24 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.api;
+package org.apache.pulsar.broker.service.schema;
 
-import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaType;
 
-public interface Schema<T> {
-    byte[] encode(T message);
-    T decode(byte[] bytes);
-
-    SchemaInfo getSchemaInfo();
-
-    Schema<byte[]> IDENTITY = new Schema<byte[]>() {
-        @Override
-        public byte[] encode(byte[] message) {
-            return message;
-        }
+public interface SchemaCompatibilityCheck {
+    SchemaType getSchemaType();
+    boolean isCompatible(SchemaData from, SchemaData to);
 
+    SchemaCompatibilityCheck DEFAULT = new SchemaCompatibilityCheck() {
         @Override
-        public byte[] decode(byte[] bytes) {
-            return bytes;
+        public SchemaType getSchemaType() {
+            return SchemaType.NONE;
         }
 
         @Override
-        public SchemaInfo getSchemaInfo() {
-            return null;
+        public boolean isCompatible(SchemaData from, SchemaData to) {
+            return true;
         }
     };
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java
index 4dfbd6d..8a2e6ab 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java
@@ -34,6 +34,8 @@ public interface SchemaRegistry extends AutoCloseable {
 
     CompletableFuture<SchemaVersion> deleteSchema(String schemaId, String 
user);
 
+    CompletableFuture<Boolean> isCompatibleWithLatestVersion(String schemaId, 
SchemaData schema);
+
     SchemaVersion versionFromBytes(byte[] version);
 
     class SchemaAndMetadata {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
index b9fa998..a74066b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
@@ -18,9 +18,13 @@
  */
 package org.apache.pulsar.broker.service.schema;
 
+import com.google.common.collect.Maps;
 import java.lang.reflect.Method;
+import java.util.Map;
+import java.util.Set;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,15 +32,31 @@ public interface SchemaRegistryService extends SchemaRegistry {
     String CreateMethodName = "create";
     Logger log = LoggerFactory.getLogger(SchemaRegistryService.class);
 
+    static Map<SchemaType, SchemaCompatibilityCheck> getCheckers(Set<String> 
checkerClasses) throws Exception {
+        Map<SchemaType, SchemaCompatibilityCheck> checkers = Maps.newHashMap();
+        for (String className : checkerClasses) {
+            final Class<?> checkerClass = Class.forName(className);
+            SchemaCompatibilityCheck instance = (SchemaCompatibilityCheck) 
checkerClass.newInstance();
+            checkers.put(instance.getSchemaType(), instance);
+        }
+        return checkers;
+    }
+
     static SchemaRegistryService create(PulsarService pulsar) {
         try {
             ServiceConfiguration config = pulsar.getConfiguration();
             final Class<?> storageClass = 
Class.forName(config.getSchemaRegistryStorageClassName());
             Object factoryInstance = storageClass.newInstance();
             Method createMethod = storageClass.getMethod(CreateMethodName, 
PulsarService.class);
+
             SchemaStorage schemaStorage = (SchemaStorage) 
createMethod.invoke(factoryInstance, pulsar);
+
+            Map<SchemaType, SchemaCompatibilityCheck> checkers =
+                getCheckers(config.getSchemaRegistryCompatibilityCheckers());
+
             schemaStorage.start();
-            return new SchemaRegistryServiceImpl(schemaStorage);
+
+            return new SchemaRegistryServiceImpl(schemaStorage, checkers);
         } catch (Exception e) {
             log.warn("Unable to create schema registry storage, defaulting to 
empty storage: {}", e);
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index d014f92..3303332 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -39,21 +39,24 @@ import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat;
 import org.apache.pulsar.common.schema.SchemaData;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.schema.SchemaVersion;
+import org.apache.pulsar.common.util.FutureUtil;
 
 public class SchemaRegistryServiceImpl implements SchemaRegistryService {
     private static HashFunction hashFunction = Hashing.sha256();
+    private final Map<SchemaType, SchemaCompatibilityCheck> 
compatibilityChecks;
     private final SchemaStorage schemaStorage;
     private final Clock clock;
 
     @VisibleForTesting
-    SchemaRegistryServiceImpl(SchemaStorage schemaStorage, Clock clock) {
+    SchemaRegistryServiceImpl(SchemaStorage schemaStorage, Map<SchemaType, 
SchemaCompatibilityCheck> compatibilityChecks, Clock clock) {
         this.schemaStorage = schemaStorage;
+        this.compatibilityChecks = compatibilityChecks;
         this.clock = clock;
     }
 
     @VisibleForTesting
-    SchemaRegistryServiceImpl(SchemaStorage schemaStorage) {
-        this(schemaStorage, Clock.systemUTC());
+    SchemaRegistryServiceImpl(SchemaStorage schemaStorage, Map<SchemaType, 
SchemaCompatibilityCheck> compatibilityChecks) {
+        this(schemaStorage, compatibilityChecks, Clock.systemUTC());
     }
 
     @Override
@@ -80,17 +83,23 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
     @Override
     @NotNull
     public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, 
SchemaData schema) {
-        byte[] context = hashFunction.hashBytes(schema.getData()).asBytes();
-        SchemaRegistryFormat.SchemaInfo info = 
SchemaRegistryFormat.SchemaInfo.newBuilder()
-            .setType(Functions.convertFromDomainType(schema.getType()))
-            .setSchema(ByteString.copyFrom(schema.getData()))
-            .setSchemaId(schemaId)
-            .setUser(schema.getUser())
-            .setDeleted(false)
-            .setTimestamp(clock.millis())
-            .addAllProps(toPairs(schema.getProps()))
-            .build();
-        return schemaStorage.put(schemaId, info.toByteArray(), context);
+        return checkCompatibilityWithLatest(schemaId, 
schema).thenCompose(isCompatible -> {
+            if (isCompatible) {
+                byte[] context = 
hashFunction.hashBytes(schema.getData()).asBytes();
+                SchemaRegistryFormat.SchemaInfo info = 
SchemaRegistryFormat.SchemaInfo.newBuilder()
+                    .setType(Functions.convertFromDomainType(schema.getType()))
+                    .setSchema(ByteString.copyFrom(schema.getData()))
+                    .setSchemaId(schemaId)
+                    .setUser(schema.getUser())
+                    .setDeleted(false)
+                    .setTimestamp(clock.millis())
+                    .addAllProps(toPairs(schema.getProps()))
+                    .build();
+                return schemaStorage.put(schemaId, info.toByteArray(), 
context);
+            } else {
+                return FutureUtil.failedFuture(new Exception());
+            }
+        });
     }
 
     @Override
@@ -101,6 +110,11 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
     }
 
     @Override
+    public CompletableFuture<Boolean> isCompatibleWithLatestVersion(String 
schemaId, SchemaData schema) {
+        return checkCompatibilityWithLatest(schemaId, schema);
+    }
+
+    @Override
     public SchemaVersion versionFromBytes(byte[] version) {
         return schemaStorage.versionFromBytes(version);
     }
@@ -121,6 +135,16 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
             .build();
     }
 
+    private CompletableFuture<Boolean> checkCompatibilityWithLatest(String 
schemaId, SchemaData schema) {
+        return getSchema(schemaId).thenApply(storedSchema ->
+            (storedSchema == null) ||
+                compatibilityChecks.getOrDefault(
+                    schema.getType(),
+                    SchemaCompatibilityCheck.DEFAULT
+                ).isCompatible(storedSchema.schema, schema)
+        );
+    }
+
     interface Functions {
         static SchemaType 
convertToDomainType(SchemaRegistryFormat.SchemaInfo.SchemaType type) {
             switch (type) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
index 9ae34fe..2264177 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.schema;
 import static org.testng.AssertJUnit.assertEquals;
 import static org.testng.AssertJUnit.assertTrue;
 
+import com.google.common.collect.Maps;
 import java.time.Clock;
 import java.time.Instant;
 import java.time.ZoneId;
@@ -74,7 +75,7 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
         BookkeeperSchemaStorage storage = new BookkeeperSchemaStorage(pulsar);
         storage.init();
         storage.start();
-        schemaRegistryService = new SchemaRegistryServiceImpl(storage, 
MockClock);
+        schemaRegistryService = new SchemaRegistryServiceImpl(storage, 
Maps.newHashMap(), MockClock);
     }
 
     @AfterMethod
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
index fb04520..bc535d7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
@@ -45,8 +45,8 @@ public abstract class ProducerConsumerBase extends MockedPulsarServiceBaseTest {
         
admin.namespaces().setNamespaceReplicationClusters("my-property/my-ns", 
Sets.newHashSet("test"));
     }
 
-    protected void testMessageOrderAndDuplicates(Set<String> messagesReceived, 
String receivedMessage,
-            String expectedMessage) {
+    protected <T> void testMessageOrderAndDuplicates(Set<T> messagesReceived, 
T receivedMessage,
+            T expectedMessage) {
         // Make sure that messages are received in order
         Assert.assertEquals(receivedMessage, expectedMessage,
                 "Received message " + receivedMessage + " did not match the 
expected message " + expectedMessage);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
new file mode 100644
index 0000000..fd76393
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.Sets;
+import java.time.Clock;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.service.schema.SchemaRegistry;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.common.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
+    private static final Logger log = 
LoggerFactory.getLogger(SimpleTypedProducerConsumerTest.class);
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testJsonProducerAndConsumer() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        JSONSchema<JsonEncodedPojo> jsonSchema =
+            JSONSchema.of(JsonEncodedPojo.class);
+
+        Consumer<JsonEncodedPojo> consumer = pulsarClient
+            .newConsumer(jsonSchema)
+            .topic("persistent://my-property/use/my-ns/my-topic1")
+            .subscriptionName("my-subscriber-name")
+            .subscribe();
+
+        Producer<JsonEncodedPojo> producer = pulsarClient
+            .newProducer(jsonSchema)
+            .topic("persistent://my-property/use/my-ns/my-topic1")
+            .create();
+
+        for (int i = 0; i < 10; i++) {
+            String message = "my-message-" + i;
+            producer.send(new JsonEncodedPojo(message));
+        }
+
+        Message<JsonEncodedPojo> msg = null;
+        Set<JsonEncodedPojo> messageSet = Sets.newHashSet();
+        for (int i = 0; i < 10; i++) {
+            msg = consumer.receive(5, TimeUnit.SECONDS);
+            JsonEncodedPojo receivedMessage = msg.getValue();
+            log.debug("Received message: [{}]", receivedMessage);
+            JsonEncodedPojo expectedMessage = new 
JsonEncodedPojo("my-message-" + i);
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, 
expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+
+        SchemaRegistry.SchemaAndMetadata storedSchema = 
pulsar.getSchemaRegistryService()
+            .getSchema("my-property/my-ns/my-topic1")
+            .get();
+
+        Assert.assertEquals(storedSchema.schema.getData(), 
jsonSchema.getSchemaInfo().getSchema());
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+    @Test
+    public void testJsonProducerAndConsumerWithPrestoredSchema() throws 
Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        JSONSchema<JsonEncodedPojo> jsonSchema =
+            JSONSchema.of(JsonEncodedPojo.class);
+
+        pulsar.getSchemaRegistryService()
+            .putSchemaIfAbsent("my-property/my-ns/my-topic1",
+                SchemaData.builder()
+                    .type(SchemaType.JSON)
+                    .isDeleted(false)
+                    .timestamp(Clock.systemUTC().millis())
+                    .user("me")
+                    .data(jsonSchema.getSchemaInfo().getSchema())
+                    .props(Collections.emptyMap())
+                    .build()
+            ).get();
+
+        Consumer<JsonEncodedPojo> consumer = pulsarClient
+            .newConsumer(jsonSchema)
+            .topic("persistent://my-property/use/my-ns/my-topic1")
+            .subscriptionName("my-subscriber-name")
+            .subscribe();
+
+        Producer<JsonEncodedPojo> producer = pulsarClient
+            .newProducer(jsonSchema)
+            .topic("persistent://my-property/use/my-ns/my-topic1")
+            .create();
+
+        consumer.close();
+        producer.close();
+
+        SchemaRegistry.SchemaAndMetadata storedSchema = 
pulsar.getSchemaRegistryService()
+            .getSchema("my-property/my-ns/my-topic1")
+            .get();
+
+        Assert.assertEquals(storedSchema.schema.getData(), 
jsonSchema.getSchemaInfo().getSchema());
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+    @Test(expectedExceptions = {PulsarClientException.class})
+    public void testJsonConsumerWithWrongPrestoredSchema() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        byte[] randomSchemaBytes = "hello".getBytes();
+
+        pulsar.getSchemaRegistryService()
+            .putSchemaIfAbsent("my-property/my-ns/my-topic1",
+                SchemaData.builder()
+                    .type(SchemaType.JSON)
+                    .isDeleted(false)
+                    .timestamp(Clock.systemUTC().millis())
+                    .user("me")
+                    .data(randomSchemaBytes)
+                    .props(Collections.emptyMap())
+                    .build()
+            ).get();
+
+        Consumer<JsonEncodedPojo> consumer = pulsarClient
+            .newConsumer(JSONSchema.of(JsonEncodedPojo.class))
+            .topic("persistent://my-property/use/my-ns/my-topic1")
+            .subscriptionName("my-subscriber-name")
+            .subscribe();
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+    @Test(expectedExceptions = {PulsarClientException.class})
+    public void testJsonProducerWithWrongPrestoredSchema() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        byte[] randomSchemaBytes = "hello".getBytes();
+
+        pulsar.getSchemaRegistryService()
+            .putSchemaIfAbsent("my-property/my-ns/my-topic1",
+                SchemaData.builder()
+                    .type(SchemaType.JSON)
+                    .isDeleted(false)
+                    .timestamp(Clock.systemUTC().millis())
+                    .user("me")
+                    .data(randomSchemaBytes)
+                    .props(Collections.emptyMap())
+                    .build()
+            ).get();
+
+        Producer<JsonEncodedPojo> producer = pulsarClient
+            .newProducer(JSONSchema.of(JsonEncodedPojo.class))
+            .topic("persistent://my-property/use/my-ns/my-topic1")
+            .create();
+
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+    public static class JsonEncodedPojo {
+        private String message;
+
+        public JsonEncodedPojo() {
+        }
+
+        public JsonEncodedPojo(String message) {
+            this.message = message;
+        }
+
+        public String getMessage() {
+            return message;
+        }
+
+        public void setMessage(String message) {
+            this.message = message;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            JsonEncodedPojo that = (JsonEncodedPojo) o;
+            return Objects.equals(message, that.message);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(message);
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(this)
+                .add("message", message)
+                .toString();
+        }
+    }
+
+}
diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml
index f6b5b40..8bc7899 100644
--- a/pulsar-client/pom.xml
+++ b/pulsar-client/pom.xml
@@ -103,6 +103,11 @@
       <version>4.4.9</version>
     </dependency>
 
+    <dependency>
+      <groupId>com.fasterxml.jackson.module</groupId>
+      <artifactId>jackson-module-jsonSchema</artifactId>
+    </dependency>
+
   </dependencies>
 
   <build>
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java
index d6a013b..0728275 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java
@@ -57,7 +57,7 @@ public interface MessageBuilder<T> {
      * @param value
      *            the domain object
      */
-    MessageBuilder<T> setValue(T value);
+    MessageBuilder<T> setValue(T value) throws SchemaSerializationException;
 
     /**
      * Set the content of the message
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java
index 3a91fde..638477c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.client.api;
 import org.apache.pulsar.common.schema.SchemaInfo;
 
 public interface Schema<T> {
-    byte[] encode(T message);
+    byte[] encode(T message) throws SchemaSerializationException;
     T decode(byte[] bytes);
 
     SchemaInfo getSchemaInfo();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/SchemaSerializationException.java
similarity index 61%
copy from pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java
copy to 
pulsar-client/src/main/java/org/apache/pulsar/client/api/SchemaSerializationException.java
index 3a91fde..e31c4cf 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/SchemaSerializationException.java
@@ -18,28 +18,8 @@
  */
 package org.apache.pulsar.client.api;
 
-import org.apache.pulsar.common.schema.SchemaInfo;
-
-public interface Schema<T> {
-    byte[] encode(T message);
-    T decode(byte[] bytes);
-
-    SchemaInfo getSchemaInfo();
-
-    Schema<byte[]> IDENTITY = new Schema<byte[]>() {
-        @Override
-        public byte[] encode(byte[] message) {
-            return message;
-        }
-
-        @Override
-        public byte[] decode(byte[] bytes) {
-            return bytes;
-        }
-
-        @Override
-        public SchemaInfo getSchemaInfo() {
-            return null;
-        }
-    };
+public class SchemaSerializationException extends PulsarClientException {
+    public SchemaSerializationException(Throwable cause) {
+        super(cause);
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java
index 056064a..7012690 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 
@@ -48,7 +49,7 @@ public class MessageBuilderImpl<T> implements 
MessageBuilder<T> {
     }
 
     @Override
-    public MessageBuilder<T> setValue(T value) {
+    public MessageBuilder<T> setValue(T value) throws 
SchemaSerializationException {
         return setContent(schema.encode(value));
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 2989d30..e02f9f6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -31,6 +31,7 @@ import java.util.stream.Collectors;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
index 0cf8bab..48c1f19 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
@@ -28,7 +28,9 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.common.util.FutureUtil;
 
 public abstract class ProducerBase<T> extends HandlerState implements 
Producer<T> {
 
@@ -51,7 +53,11 @@ public abstract class ProducerBase<T> extends HandlerState 
implements Producer<T
 
     @Override
     public CompletableFuture<MessageId> sendAsync(T message) {
-        return 
sendAsync(MessageBuilder.create(schema).setValue(message).build());
+        try {
+            return 
sendAsync(MessageBuilder.create(schema).setValue(message).build());
+        } catch (SchemaSerializationException e) {
+            return FutureUtil.failedFuture(new 
SchemaSerializationException(e));
+        }
     }
 
     @Override
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index d866b28..3c746db 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -847,7 +847,8 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         long requestId = client.newRequestId();
 
         cnx.sendRequestWithId(
-                Commands.newProducer(topic, producerId, requestId, 
producerName, conf.isEncryptionEnabled(), metadata),
+                Commands.newProducer(topic, producerId, requestId, 
producerName, conf.isEncryptionEnabled(), metadata,
+                    schema == null ? null : schema.getSchemaInfo()),
                 requestId).thenAccept(response -> {
                     String producerName = response.getProducerName();
                     long lastSequenceId = response.getLastSequenceId();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
index 18e2088..236c2b9 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -22,6 +22,7 @@ package org.apache.pulsar.client.impl;
 import java.util.Map;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.SchemaSerializationException;
 
 public class TopicMessageImpl<T> extends MessageRecordImpl<T, 
TopicMessageIdImpl> {
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
new file mode 100644
index 0000000..7eca43d
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
+import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+public class JSONSchema<T> implements Schema<T> {
+
+    private final SchemaInfo info;
+    private final ObjectMapper objectMapper;
+    private final Class<T> pojo;
+
+    private JSONSchema(SchemaInfo info, Class<T> pojo, ObjectMapper 
objectMapper) {
+        this.info = info;
+        this.pojo = pojo;
+        this.objectMapper = objectMapper;
+    }
+
+    @Override
+    public byte[] encode(T message) throws SchemaSerializationException {
+        try {
+            return objectMapper.writeValueAsBytes(message);
+        } catch (JsonProcessingException e) {
+            throw new SchemaSerializationException(e);
+        }
+    }
+
+    @Override
+    public T decode(byte[] bytes) {
+        try {
+            return objectMapper.readValue(new String(bytes), pojo);
+        } catch (IOException e) {
+            throw new RuntimeException(new SchemaSerializationException(e));
+        }
+    }
+
+    @Override
+    public SchemaInfo getSchemaInfo() {
+        return info;
+    }
+
+    public static <T> JSONSchema<T> of(Class<T> pojo) throws 
JsonProcessingException {
+        return of(pojo, Collections.emptyMap());
+    }
+
+    public static <T> JSONSchema<T> of(Class<T> pojo, Map<String, String> 
properties) throws JsonProcessingException {
+        ObjectMapper mapper = new ObjectMapper();
+        JsonSchemaGenerator schemaGen = new JsonSchemaGenerator(mapper);
+        JsonSchema schema = schemaGen.generateSchema(pojo);
+
+        SchemaInfo info = new SchemaInfo();
+        info.setName("");
+        info.setProperties(properties);
+        info.setType(SchemaType.JSON);
+        info.setSchema(mapper.writeValueAsBytes(schema));
+        return new JSONSchema<>(info, pojo, mapper);
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java 
b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/JsonPojo.java
similarity index 58%
copy from pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java
copy to 
pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/JsonPojo.java
index 3a91fde..6d77875 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/JsonPojo.java
@@ -16,30 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.api;
+package org.apache.pulsar.client.tutorial;
 
-import org.apache.pulsar.common.schema.SchemaInfo;
+public class JsonPojo {
+    public String content;
 
-public interface Schema<T> {
-    byte[] encode(T message);
-    T decode(byte[] bytes);
+    public String getContent() {
+        return content;
+    }
 
-    SchemaInfo getSchemaInfo();
+    public void setContent(String content) {
+        this.content = content;
+    }
 
-    Schema<byte[]> IDENTITY = new Schema<byte[]>() {
-        @Override
-        public byte[] encode(byte[] message) {
-            return message;
-        }
+    public JsonPojo() {
+    }
 
-        @Override
-        public byte[] decode(byte[] bytes) {
-            return bytes;
-        }
-
-        @Override
-        public SchemaInfo getSchemaInfo() {
-            return null;
-        }
-    };
+    public JsonPojo(String content) {
+        this.content = content;
+    }
 }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleAsyncProducerWithSchema.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleAsyncProducerWithSchema.java
new file mode 100644
index 0000000..90758b6
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleAsyncProducerWithSchema.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.tutorial;
+
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
+
+@Slf4j
+public class SampleAsyncProducerWithSchema {
+
+    public static void main(String[] args) throws IOException {
+        PulsarClient pulsarClient = 
PulsarClient.builder().serviceUrl("http://localhost:8080").build();
+
+        Producer<JsonPojo> producer = 
pulsarClient.newProducer(JSONSchema.of(JsonPojo.class)).topic("persistent://my-property/use/my-ns/my-topic")
+                .sendTimeout(3, TimeUnit.SECONDS).create();
+
+        List<CompletableFuture<MessageId>> futures = Lists.newArrayList();
+
+        for (int i = 0; i < 10; i++) {
+            final String content = "my-message-" + i;
+            CompletableFuture<MessageId> future = producer.sendAsync(new 
JsonPojo(content));
+
+            future.handle((v, ex) -> {
+                if (ex == null) {
+                    log.info("Message persisted: {}", content);
+                } else {
+                    log.error("Error persisting message: {}", content, ex);
+                }
+                return null;
+            });
+
+            futures.add(future);
+        }
+
+        log.info("Waiting for async ops to complete");
+        for (CompletableFuture<MessageId> future : futures) {
+            future.join();
+        }
+
+        log.info("All operations completed");
+
+        pulsarClient.close();
+    }
+
+}
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleConsumerWithSchema.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleConsumerWithSchema.java
new file mode 100644
index 0000000..3780332
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleConsumerWithSchema.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.tutorial;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
+
+public class SampleConsumerWithSchema {
+    public static void main(String[] args) throws PulsarClientException, 
JsonProcessingException {
+
+        PulsarClient pulsarClient = 
PulsarClient.builder().serviceUrl("http://localhost:8080").build();
+
+        Consumer<JsonPojo> consumer = 
pulsarClient.newConsumer(JSONSchema.of(JsonPojo.class)) //
+                .topic("persistent://my-property/use/my-ns/my-topic") //
+                .subscriptionName("my-subscription-name").subscribe();
+
+        Message<JsonPojo> msg = null;
+
+        for (int i = 0; i < 100; i++) {
+            msg = consumer.receive();
+            // do something
+            System.out.println("Received: " + msg.getValue().content);
+        }
+
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        pulsarClient.close();
+    }
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index 904eddd..f61a2b3 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -336,14 +336,19 @@ public class Commands {
         }
         subscribeBuilder.addAllMetadata(CommandUtils.toKeyValueList(metadata));
 
-        if (null != schemaInfo) {
-            subscribeBuilder.setSchema(getSchema(schemaInfo));
+        PulsarApi.Schema schema = null;
+        if (schemaInfo != null) {
+            schema = getSchema(schemaInfo);
+            subscribeBuilder.setSchema(schema);
         }
 
         CommandSubscribe subscribe = subscribeBuilder.build();
         ByteBuf res = 
serializeWithSize(BaseCommand.newBuilder().setType(Type.SUBSCRIBE).setSubscribe(subscribe));
         subscribeBuilder.recycle();
         subscribe.recycle();
+        if (null != schema) {
+            schema.recycle();
+        }
         return res;
     }
 
@@ -426,6 +431,7 @@ public class Commands {
         return res;
     }
 
+    @VisibleForTesting
     public static ByteBuf newProducer(String topic, long producerId, long 
requestId, String producerName,
                 Map<String, String> metadata) {
         return newProducer(topic, producerId, requestId, producerName, false, 
metadata);
@@ -452,7 +458,7 @@ public class Commands {
     }
 
     private static PulsarApi.Schema getSchema(SchemaInfo schemaInfo) {
-        return PulsarApi.Schema.newBuilder()
+        PulsarApi.Schema.Builder builder = PulsarApi.Schema.newBuilder()
             .setName(schemaInfo.getName())
             .setSchemaData(copyFrom(schemaInfo.getSchema()))
             .setType(getSchemaType(schemaInfo.getType()))
@@ -463,7 +469,10 @@ public class Commands {
                         .setValue(entry.getValue())
                         .build()
                 ).collect(Collectors.toList())
-            ).build();
+            );
+        PulsarApi.Schema schema = builder.build();
+        builder.recycle();
+        return schema;
     }
 
     public static ByteBuf newProducer(String topic, long producerId, long 
requestId, String producerName,

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to