This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1eb8068  Schema registry 4/N (#1381)
1eb8068 is described below

commit 1eb80681700a8dcf268f06b5f227033cd1165bb4
Author: Dave Rusek <dave.ru...@gmail.com>
AuthorDate: Fri Apr 6 16:50:19 2018 -0600

    Schema registry 4/N (#1381)
    
    * Schema Registry proto changes
    
    * Infrastructure to store schemas
    
    * A default schema registry implementation
    
    * Add admin api for the schema registry
    
    * Renumber schema fields
    
    * Update Pulsar API with schema changes
    
    * Revert field number change
    
    * Fix merge conflict
    
    * Fix broken merge
    
    * DestinationName has been renamed to TopicName
    
    * Address issues in review
    
    * Add schema type back to proto definition
    
    * Address comments regarding lombok usage
    
    * Remove reserved future enum fields
    
    * regenerate code from protobuf
    
    * Remove unused code
    
    * Add schema version to producer success message
    
    * plumb schema through to producer
    
    * Revert "Add schema version to producer success message"
    
    This reverts commit e7e72f468cf46f1605524a7399520c22763583c9.
    
    * Revert "Revert "Add schema version to producer success message""
    
    This reverts commit 7b902f6bdb1cb054e26577747ff4dd8c326a6248.
    
    * Persist schema on producer connect
    
    * Add principal to schema on publish
    
    * Reformat function for readability
    
    * Remove unused protoc profile
    
    * Rename put on schema registry to putIfAbsent
    
    * Reformat function for readability
    
    * Remove unused protoc profile
    
    * Rename put on schema registry to putIfAbsent
    
    * fix compile errors from parent branch changes
    
    * fix lombok tomfoolery on builder
    
    * plumb hash through and allow lookup by data
    
    * wip
    
    * run tests
    
    * wip: address review comments
    
    * switch underscore to slash in schema name
    
    * blah
    
    * Get duplicate schema detection to work
    
    * Fix protobuf version incompatibility
    
    * fix merge issues
    
    * Fix license headers
    
    * Fix license headers
    
    * Address review
    
    * Fix webservice
    
    * plumb schema from producer to server and back
    
    * Plumb schema through subscriber
    
    * Create and return schema via rest endpoint
    
    * Make DELETE great again
    
    * Clean up imports
    
    * Move resource objects to common package
    
    * Fix licenses
    
    * Update error message for schema registry service
    
    * Remove cruft
    
    * Address review comments
    
    - rename props to properties in GetSchemaResponse
    - Use config for ledger parameters
    
    * Address review comments
    
    * Fix license headers
    
    * deal with lombok stuff causing issues
    
    * Resolve conflict
---
 .../pulsar/broker/admin/v2/SchemasResource.java    | 242 +++++++++++++++++++++
 .../service/schema/BookkeeperSchemaStorage.java    |  29 ++-
 .../service/schema/SchemaRegistryService.java      |   3 +-
 .../broker/service/schema/SchemaStorage.java       |   2 +
 .../pulsar/broker/service/schema/StoredSchema.java |  11 +-
 .../org/apache/pulsar/broker/admin/AdminTest.java  |  28 ++-
 .../java/org/apache/pulsar/client/api/Schema.java  |   9 +
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  49 ++---
 .../apache/pulsar/client/impl/ConsumerImpl.java    |   2 +-
 .../apache/pulsar/client/impl/ProducerImpl.java    |  15 +-
 .../Schema.java => impl/ProducerResponse.java}     |  24 +-
 .../org/apache/pulsar/common/api/Commands.java     |  51 ++++-
 .../apache/pulsar/common/api/proto/PulsarApi.java  | 112 ++--------
 .../pulsar/common/schema/DeleteSchemaResponse.java |  22 +-
 .../pulsar/common/schema/GetSchemaResponse.java    |  27 +--
 .../pulsar/common/schema/PostSchemaPayload.java    |  23 +-
 .../pulsar/common/schema/PostSchemaResponse.java   |  22 +-
 .../apache/pulsar/common/schema/SchemaInfo.java    |  24 +-
 pulsar-common/src/main/proto/PulsarApi.proto       |   2 +-
 19 files changed, 457 insertions(+), 240 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
new file mode 100644
index 0000000..2173617
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.v2;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.Objects.isNull;
+import static org.apache.commons.lang.StringUtils.defaultIfEmpty;
+import static org.apache.pulsar.common.util.Codec.decode;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
+import io.swagger.annotations.ApiOperation;
+import java.time.Clock;
+import java.util.Optional;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.Encoded;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.schema.DeleteSchemaResponse;
+import org.apache.pulsar.common.schema.GetSchemaResponse;
+import org.apache.pulsar.common.schema.PostSchemaPayload;
+import org.apache.pulsar.common.schema.PostSchemaResponse;
+import org.apache.pulsar.common.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.common.schema.SchemaVersion;
+
+@Path("/schemas")
+public class SchemasResource extends AdminResource {
+
+    private final Clock clock;
+
+    public SchemasResource() {
+        this(Clock.systemUTC());
+    }
+
+    @VisibleForTesting
+    public SchemasResource(Clock clock) {
+        super();
+        this.clock = clock;
+    }
+
+    @GET
+    @Path("/{property}/{namespace}/{topic}/schema")
+    @Produces(MediaType.APPLICATION_JSON)
+    @ApiOperation(value = "Get topic schema", response = 
GetSchemaResponse.class)
+    public void getSchema(
+        @PathParam("property") String property,
+        @PathParam("namespace") String namespace,
+        @PathParam("topic") String topic,
+        @Suspended final AsyncResponse response
+    ) {
+        validateDestinationAndAdminOperation(property, namespace, topic);
+
+        String schemaId = buildSchemaId(property, namespace, topic);
+        pulsar().getSchemaRegistryService().getSchema(schemaId)
+            .handle((schema, error) -> {
+                if (isNull(error)) {
+                    response.resume(
+                        Response.ok()
+                            .encoding(MediaType.APPLICATION_JSON)
+                            .entity(GetSchemaResponse.builder()
+                                .version(schema.version)
+                                .type(schema.schema.getType())
+                                .timestamp(schema.schema.getTimestamp())
+                                .data(new String(schema.schema.getData()))
+                                .properties(schema.schema.getProps())
+                                .build()
+                            )
+                            .build()
+                    );
+                } else {
+                    response.resume(error);
+                }
+                return null;
+            });
+    }
+
+    @GET
+    @Path("/{property}/{namespace}/{topic}/schema/{version}")
+    @Produces(MediaType.APPLICATION_JSON)
+    @ApiOperation(value = "Get topic schema")
+    public void getSchema(
+        @PathParam("property") String property,
+        @PathParam("namespace") String namespace,
+        @PathParam("topic") String topic,
+        @PathParam("version") @Encoded String version,
+        @Suspended final AsyncResponse response
+    ) {
+        validateDestinationAndAdminOperation(property, namespace, topic);
+
+        String schemaId = buildSchemaId(property, namespace, topic);
+        SchemaVersion v = 
pulsar().getSchemaRegistryService().versionFromBytes(version.getBytes());
+        pulsar().getSchemaRegistryService().getSchema(schemaId, v)
+            .handle((schema, error) -> {
+                if (isNull(error)) {
+                    if (schema.schema.isDeleted()) {
+                        response.resume(Response.noContent());
+                    } else {
+                        response.resume(
+                            Response.ok()
+                                .encoding(MediaType.APPLICATION_JSON)
+                                .entity(GetSchemaResponse.builder()
+                                    .version(schema.version)
+                                    .type(schema.schema.getType())
+                                    .timestamp(schema.schema.getTimestamp())
+                                    .data(new String(schema.schema.getData()))
+                                    .properties(schema.schema.getProps())
+                                    .build()
+                                ).build()
+                        );
+                    }
+                } else {
+                    response.resume(error);
+                }
+                return null;
+            });
+    }
+
+    @DELETE
+    @Path("/{property}/{namespace}/{topic}/schema")
+    @Produces(MediaType.APPLICATION_JSON)
+    @ApiOperation(value = "Delete topic schema")
+    public void deleteSchema(
+        @PathParam("property") String property,
+        @PathParam("namespace") String namespace,
+        @PathParam("topic") String topic,
+        @Suspended final AsyncResponse response
+    ) {
+        validateDestinationAndAdminOperation(property, namespace, topic);
+
+        String schemaId = buildSchemaId(property, namespace, topic);
+        pulsar().getSchemaRegistryService().deleteSchema(schemaId, 
defaultIfEmpty(clientAppId(), ""))
+            .handle((version, error) -> {
+                if (isNull(error)) {
+                    response.resume(
+                        Response.ok().entity(
+                            DeleteSchemaResponse.builder()
+                                .version(version)
+                                .build()
+                        ).build()
+                    );
+                } else {
+                    response.resume(error);
+                }
+                return null;
+            });
+    }
+
+    @POST
+    @Path("/{property}/{namespace}/{topic}/schema")
+    @Produces(MediaType.APPLICATION_JSON)
+    @Consumes(MediaType.APPLICATION_JSON)
+    @ApiOperation(value = "Post topic schema")
+    public void postSchema(
+        @PathParam("property") String property,
+        @PathParam("namespace") String namespace,
+        @PathParam("topic") String topic,
+        PostSchemaPayload payload,
+        @Suspended final AsyncResponse response
+    ) {
+        validateDestinationAndAdminOperation(property, namespace, topic);
+
+        pulsar().getSchemaRegistryService().putSchemaIfAbsent(
+            buildSchemaId(property, namespace, topic),
+            SchemaData.builder()
+                .data(payload.getSchema().getBytes(Charsets.UTF_8))
+                .isDeleted(false)
+                .timestamp(clock.millis())
+                .type(SchemaType.valueOf(payload.getType()))
+                .user(defaultIfEmpty(clientAppId(), ""))
+                .build()
+        ).thenAccept(version ->
+            response.resume(
+                Response.accepted().entity(
+                    PostSchemaResponse.builder()
+                        .version(version)
+                        .build()
+                ).build()
+            )
+        );
+    }
+
+    private String buildSchemaId(String property, String namespace, String 
topic) {
+        return TopicName.get("persistent", property, namespace, 
topic).getSchemaName();
+    }
+
+    private void validateDestinationAndAdminOperation(String property, String 
namespace, String topic) {
+        TopicName destinationName = TopicName.get(
+            "persistent", property, namespace, decode(topic)
+        );
+
+        try {
+            validateAdminAccessOnProperty(destinationName.getProperty());
+            validateTopicOwnership(destinationName, false);
+        } catch (RestException e) {
+            if (e.getResponse().getStatus() == 
Response.Status.UNAUTHORIZED.getStatusCode()) {
+                throw new RestException(Response.Status.NOT_FOUND, "Not 
Found");
+            } else {
+                throw e;
+            }
+        }
+    }
+
+    private void validateDestinationExists(TopicName dn) {
+        try {
+            Optional<Topic> topic = 
pulsar().getBrokerService().getTopicReference(dn.toString());
+            checkArgument(topic.isPresent());
+        } catch (Exception e) {
+            throw new RestException(Response.Status.NOT_FOUND, "Topic not 
found");
+        }
+    }
+
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index 8ba8750..3730121 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -21,28 +21,29 @@ package org.apache.pulsar.broker.service.schema;
 import static com.google.common.collect.Iterables.concat;
 import static com.google.common.collect.Lists.newArrayList;
 import static com.google.protobuf.ByteString.copyFrom;
-import static java.util.Collections.emptyMap;
 import static java.util.Objects.isNull;
 import static java.util.Objects.nonNull;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static 
org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.Functions.newSchemaEntry;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import javax.validation.constraints.NotNull;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.common.schema.SchemaVersion;
 import org.apache.pulsar.zookeeper.ZooKeeperCache;
 import org.apache.zookeeper.CreateMode;
@@ -55,10 +56,12 @@ import org.apache.zookeeper.data.ACL;
 public class BookkeeperSchemaStorage implements SchemaStorage {
     private static final String SchemaPath = "/schemas";
     private static final List<ACL> Acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+    private static final byte[] LedgerPassword = "".getBytes();
 
     private final PulsarService pulsar;
     private final ZooKeeper zooKeeper;
     private final ZooKeeperCache localZkCache;
+    private final ServiceConfiguration config;
     private BookKeeper bookKeeper;
 
     @VisibleForTesting
@@ -66,6 +69,7 @@ public class BookkeeperSchemaStorage implements SchemaStorage 
{
         this.pulsar = pulsar;
         this.localZkCache = pulsar.getLocalZkCache();
         this.zooKeeper = localZkCache.getZooKeeper();
+        this.config = pulsar.getConfiguration();
     }
 
     @VisibleForTesting
@@ -79,6 +83,7 @@ public class BookkeeperSchemaStorage implements SchemaStorage 
{
         }
     }
 
+    @Override
     public void start() throws IOException {
         this.bookKeeper = pulsar.getBookKeeperClientFactory().create(
             pulsar.getConfiguration(),
@@ -119,8 +124,7 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
                 .thenApply(entry ->
                     new StoredSchema(
                         entry.getSchemaData().toByteArray(),
-                        new 
LongSchemaVersion(schemaLocator.getInfo().getVersion()),
-                        emptyMap()
+                        new 
LongSchemaVersion(schemaLocator.getInfo().getVersion())
                     )
                 );
         });
@@ -156,8 +160,7 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
                 .thenApply(entry ->
                     new StoredSchema(
                         entry.getSchemaData().toByteArray(),
-                        new LongSchemaVersion(version),
-                        emptyMap()
+                        new LongSchemaVersion(version)
                     )
                 );
         });
@@ -377,14 +380,19 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
     @NotNull
     private CompletableFuture<LedgerHandle> createLedger() {
         final CompletableFuture<LedgerHandle> future = new 
CompletableFuture<>();
-        bookKeeper.asyncCreateLedger(0, 0, DigestType.MAC, new byte[]{},
+        bookKeeper.asyncCreateLedger(
+            config.getManagedLedgerDefaultEnsembleSize(),
+            config.getManagedLedgerDefaultWriteQuorum(),
+            config.getManagedLedgerDefaultAckQuorum(),
+            config.getManagedLedgerDigestType(),
+            LedgerPassword,
             (rc, handle, ctx) -> {
                 if (rc != BKException.Code.OK) {
                     future.completeExceptionally(BKException.create(rc));
                 } else {
                     future.complete(handle);
                 }
-            }, null
+            }, null, Collections.emptyMap()
         );
         return future;
     }
@@ -392,7 +400,10 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
     @NotNull
     private CompletableFuture<LedgerHandle> openLedger(Long ledgerId) {
         final CompletableFuture<LedgerHandle> future = new 
CompletableFuture<>();
-        bookKeeper.asyncOpenLedger(ledgerId, DigestType.MAC, new byte[]{},
+        bookKeeper.asyncOpenLedger(
+            ledgerId,
+            config.getManagedLedgerDigestType(),
+            LedgerPassword,
             (rc, handle, ctx) -> {
                 if (rc != BKException.Code.OK) {
                     future.completeExceptionally(BKException.create(rc));
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
index 69e7364..b9fa998 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
@@ -35,9 +35,10 @@ public interface SchemaRegistryService extends 
SchemaRegistry {
             Object factoryInstance = storageClass.newInstance();
             Method createMethod = storageClass.getMethod(CreateMethodName, 
PulsarService.class);
             SchemaStorage schemaStorage = (SchemaStorage) 
createMethod.invoke(factoryInstance, pulsar);
+            schemaStorage.start();
             return new SchemaRegistryServiceImpl(schemaStorage);
         } catch (Exception e) {
-            log.warn("Error when trying to create scehema registry storage: 
{}", e);
+            log.warn("Unable to create schema registry storage, defaulting to 
empty storage: {}", e);
         }
         return new DefaultSchemaRegistryService();
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
index b0c8075..c5c7f83 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
@@ -31,6 +31,8 @@ public interface SchemaStorage {
 
     SchemaVersion versionFromBytes(byte[] version);
 
+    void start() throws Exception;
+
     void close() throws Exception;
 
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/StoredSchema.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/StoredSchema.java
index f28a707..fd2602b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/StoredSchema.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/StoredSchema.java
@@ -20,19 +20,16 @@ package org.apache.pulsar.broker.service.schema;
 
 import com.google.common.base.MoreObjects;
 import java.util.Arrays;
-import java.util.Map;
 import java.util.Objects;
 import org.apache.pulsar.common.schema.SchemaVersion;
 
 public class StoredSchema {
     public final byte[] data;
     public final SchemaVersion version;
-    public final Map<String, String> metadata;
 
-    public StoredSchema(byte[] data, SchemaVersion version, Map<String, 
String> metadata) {
+    StoredSchema(byte[] data, SchemaVersion version) {
         this.data = data;
         this.version = version;
-        this.metadata = metadata;
     }
 
     @Override
@@ -45,14 +42,13 @@ public class StoredSchema {
         }
         StoredSchema that = (StoredSchema) o;
         return Arrays.equals(data, that.data) &&
-            Objects.equals(version, that.version) &&
-            Objects.equals(metadata, that.metadata);
+            Objects.equals(version, that.version);
     }
 
     @Override
     public int hashCode() {
 
-        int result = Objects.hash(version, metadata);
+        int result = Objects.hash(version);
         result = 31 * result + Arrays.hashCode(data);
         return result;
     }
@@ -62,7 +58,6 @@ public class StoredSchema {
         return MoreObjects.toStringHelper(this)
             .add("data", data)
             .add("version", version)
-            .add("metadata", metadata)
             .toString();
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 89f49f0..b800bfa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -32,8 +32,13 @@ import static org.testng.Assert.assertNotSame;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import java.lang.reflect.Field;
 import java.net.URI;
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -41,11 +46,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
-
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.StreamingOutput;
 import javax.ws.rs.core.UriInfo;
-
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.mledger.proto.PendingBookieOpsStats;
 import org.apache.bookkeeper.util.ZkUtils;
@@ -56,6 +59,7 @@ import org.apache.pulsar.broker.admin.v1.Namespaces;
 import org.apache.pulsar.broker.admin.v1.PersistentTopics;
 import org.apache.pulsar.broker.admin.v1.Properties;
 import org.apache.pulsar.broker.admin.v1.ResourceQuotas;
+import org.apache.pulsar.broker.admin.v2.SchemasResource;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.broker.web.PulsarWebResource;
@@ -83,13 +87,10 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
 @Test
 public class AdminTest extends MockedPulsarServiceBaseTest {
+    private final String configClusterName = "use";
     private ConfigurationCacheService configurationCache;
-
     private Clusters clusters;
     private Properties properties;
     private Namespaces namespaces;
@@ -97,9 +98,12 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
     private Brokers brokers;
     private ResourceQuotas resourceQuotas;
     private BrokerStats brokerStats;
-
+    private SchemasResource schemasResource;
     private Field uriField;
-    private final String configClusterName = "use";
+    private Clock mockClock = Clock.fixed(
+        Instant.ofEpochSecond(365248800),
+        ZoneId.of("-05:00")
+    );
 
     public AdminTest() {
         super();
@@ -184,6 +188,14 @@ public class AdminTest extends MockedPulsarServiceBaseTest 
{
         doReturn(mockZookKeeper).when(brokerStats).localZk();
         
doReturn(configurationCache.propertiesCache()).when(brokerStats).propertiesCache();
         
doReturn(configurationCache.policiesCache()).when(brokerStats).policiesCache();
+
+        schemasResource = spy(new SchemasResource(mockClock));
+        schemasResource.setServletContext(new MockServletContext());
+        schemasResource.setPulsar(pulsar);
+        doReturn(mockZookKeeper).when(schemasResource).globalZk();
+        doReturn(mockZookKeeper).when(schemasResource).localZk();
+        
doReturn(configurationCache.propertiesCache()).when(schemasResource).propertiesCache();
+        
doReturn(configurationCache.policiesCache()).when(schemasResource).policiesCache();
     }
 
     @Override
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java
index 346c525..3a91fde 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -18,10 +18,14 @@
  */
 package org.apache.pulsar.client.api;
 
+import org.apache.pulsar.common.schema.SchemaInfo;
+
 public interface Schema<T> {
     byte[] encode(T message);
     T decode(byte[] bytes);
 
+    SchemaInfo getSchemaInfo();
+
     Schema<byte[]> IDENTITY = new Schema<byte[]>() {
         @Override
         public byte[] encode(byte[] message) {
@@ -32,5 +36,10 @@ public interface Schema<T> {
         public byte[] decode(byte[] bytes) {
             return bytes;
         }
+
+        @Override
+        public SchemaInfo getSchemaInfo() {
+            return null;
+        }
     };
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 37904fd..d89d8fb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -21,6 +21,14 @@ package org.apache.pulsar.client.impl;
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.pulsar.client.impl.HttpClient.getPulsarClientVersion;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.unix.Errors.NativeIoException;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.Promise;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.channels.ClosedChannelException;
@@ -29,11 +37,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-
 import javax.net.ssl.SSLSession;
-
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.http.conn.ssl.DefaultHostnameVerifier;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -63,28 +67,19 @@ import 
org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.unix.Errors.NativeIoException;
-import io.netty.handler.ssl.SslHandler;
-import io.netty.util.concurrent.Promise;
-
 public class ClientCnx extends PulsarHandler {
 
     private final Authentication authentication;
     private State state;
 
-    private final ConcurrentLongHashMap<CompletableFuture<Pair<String, Long>>> 
pendingRequests = new ConcurrentLongHashMap<>(
-            16, 1);
-    private final ConcurrentLongHashMap<CompletableFuture<LookupDataResult>> 
pendingLookupRequests = new ConcurrentLongHashMap<>(
-            16, 1);
-    private final ConcurrentLongHashMap<CompletableFuture<MessageIdData>> 
pendingGetLastMessageIdRequests = new ConcurrentLongHashMap<>(
-        16, 1);
-    private final ConcurrentLongHashMap<CompletableFuture<List<String>>> 
pendingGetTopicsRequests = new ConcurrentLongHashMap<>(
-        16, 1);
+    private final ConcurrentLongHashMap<CompletableFuture<ProducerResponse>> 
pendingRequests =
+        new ConcurrentLongHashMap<>(16, 1);
+    private final ConcurrentLongHashMap<CompletableFuture<LookupDataResult>> 
pendingLookupRequests =
+        new ConcurrentLongHashMap<>(16, 1);
+    private final ConcurrentLongHashMap<CompletableFuture<MessageIdData>> 
pendingGetLastMessageIdRequests =
+        new ConcurrentLongHashMap<>(16, 1);
+    private final ConcurrentLongHashMap<CompletableFuture<List<String>>> 
pendingGetTopicsRequests =
+        new ConcurrentLongHashMap<>(16, 1);
 
     private final ConcurrentLongHashMap<ProducerImpl<?>> producers = new 
ConcurrentLongHashMap<>(16, 1);
     private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers = new 
ConcurrentLongHashMap<>(16, 1);
@@ -280,7 +275,7 @@ public class ClientCnx extends PulsarHandler {
             log.debug("{} Received success response from server: {}", 
ctx.channel(), success.getRequestId());
         }
         long requestId = success.getRequestId();
-        CompletableFuture<Pair<String, Long>> requestFuture = 
pendingRequests.remove(requestId);
+        CompletableFuture<ProducerResponse> requestFuture = 
pendingRequests.remove(requestId);
         if (requestFuture != null) {
             requestFuture.complete(null);
         } else {
@@ -313,9 +308,9 @@ public class ClientCnx extends PulsarHandler {
                     success.getRequestId(), success.getProducerName());
         }
         long requestId = success.getRequestId();
-        CompletableFuture<Pair<String, Long>> requestFuture = 
pendingRequests.remove(requestId);
+        CompletableFuture<ProducerResponse> requestFuture = 
pendingRequests.remove(requestId);
         if (requestFuture != null) {
-            requestFuture.complete(new 
ImmutablePair<>(success.getProducerName(), success.getLastSequenceId()));
+            requestFuture.complete(new 
ProducerResponse(success.getProducerName(), success.getLastSequenceId(), 
success.getSchemaVersion().toByteArray()));
         } else {
             log.warn("{} Received unknown request id from server: {}", 
ctx.channel(), success.getRequestId());
         }
@@ -460,7 +455,7 @@ public class ClientCnx extends PulsarHandler {
             log.warn("{} Producer creation has been blocked because backlog 
quota exceeded for producer topic",
                     ctx.channel());
         }
-        CompletableFuture<Pair<String, Long>> requestFuture = 
pendingRequests.remove(requestId);
+        CompletableFuture<ProducerResponse> requestFuture = 
pendingRequests.remove(requestId);
         if (requestFuture != null) {
             
requestFuture.completeExceptionally(getPulsarClientException(error.getError(), 
error.getMessage()));
         } else {
@@ -575,8 +570,8 @@ public class ClientCnx extends PulsarHandler {
         return connectionFuture;
     }
 
-    CompletableFuture<Pair<String, Long>> sendRequestWithId(ByteBuf cmd, long 
requestId) {
-        CompletableFuture<Pair<String, Long>> future = new 
CompletableFuture<>();
+    CompletableFuture<ProducerResponse> sendRequestWithId(ByteBuf cmd, long 
requestId) {
+        CompletableFuture<ProducerResponse> future = new CompletableFuture<>();
         pendingRequests.put(requestId, future);
         ctx.writeAndFlush(cmd).addListener(writeFuture -> {
             if (!writeFuture.isSuccess()) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 2a88bf0..a446dd4 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -469,7 +469,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         }
 
         ByteBuf request = Commands.newSubscribe(topic, subscription, 
consumerId, requestId, getSubType(), priorityLevel,
-                consumerName, isDurable, startMessageIdData, metadata, 
readCompacted, InitialPosition.valueOf(subscriptionInitialPosition.getValue()));
+                consumerName, isDurable, startMessageIdData, metadata, 
readCompacted, InitialPosition.valueOf(subscriptionInitialPosition.getValue()), 
schema.getSchemaInfo());
         if (startMessageIdData != null) {
             startMessageIdData.recycle();
         }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 27757d1..92a512d 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -25,11 +25,13 @@ import static java.lang.String.format;
 import static org.apache.pulsar.common.api.Commands.hasChecksum;
 import static org.apache.pulsar.common.api.Commands.readChecksum;
 
+import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -106,6 +108,8 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
 
     private final Map<String, String> metadata;
 
+    private Optional<byte[]> schemaVersion = Optional.empty();
+
     private final ConnectionHandler connectionHandler;
 
     @SuppressWarnings("rawtypes")
@@ -288,6 +292,10 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
             return;
         }
 
+        if (schemaVersion.isPresent()) {
+            
msgMetadata.setSchemaVersion(ByteString.copyFrom(schemaVersion.get()));
+        }
+
         try {
             synchronized (this) {
                 long sequenceId;
@@ -837,9 +845,10 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
 
         cnx.sendRequestWithId(
                 Commands.newProducer(topic, producerId, requestId, 
producerName, conf.isEncryptionEnabled(), metadata),
-                requestId).thenAccept(pair -> {
-                    String producerName = pair.getLeft();
-                    long lastSequenceId = pair.getRight();
+                requestId).thenAccept(response -> {
+                    String producerName = response.getProducerName();
+                    long lastSequenceId = response.getLastSequenceId();
+                    schemaVersion = 
Optional.ofNullable(response.getSchemaVersion());
 
                     // We are now reconnected to broker and clear to send 
messages. Re-send all pending messages and
                     // set the cnx pointer so that new messages will be sent 
immediately
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerResponse.java
similarity index 67%
copy from pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java
copy to 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerResponse.java
index 346c525..edb98f2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerResponse.java
@@ -16,21 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.api;
+package org.apache.pulsar.client.impl;
 
-public interface Schema<T> {
-    byte[] encode(T message);
-    T decode(byte[] bytes);
+import lombok.AllArgsConstructor;
+import lombok.Data;
 
-    Schema<byte[]> IDENTITY = new Schema<byte[]>() {
-        @Override
-        public byte[] encode(byte[] message) {
-            return message;
-        }
-
-        @Override
-        public byte[] decode(byte[] bytes) {
-            return bytes;
-        }
-    };
+@Data
+@AllArgsConstructor
+public class ProducerResponse {
+    private String producerName;
+    private long lastSequenceId;
+    private byte[] schemaVersion;
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index 90dbf67..904eddd 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.common.api;
 
+import static com.google.protobuf.ByteString.copyFrom;
 import static com.google.protobuf.ByteString.copyFromUtf8;
 import static com.scurrilous.circe.checksum.Crc32cIntChecksum.computeChecksum;
 import static com.scurrilous.circe.checksum.Crc32cIntChecksum.resumeChecksum;
@@ -31,6 +32,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.AuthMethod;
@@ -75,6 +77,8 @@ import 
org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
 import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.schema.SchemaVersion;
 import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
 import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
@@ -310,12 +314,12 @@ public class Commands {
     public static ByteBuf newSubscribe(String topic, String subscription, long 
consumerId, long requestId,
             SubType subType, int priorityLevel, String consumerName) {
         return newSubscribe(topic, subscription, consumerId, requestId, 
subType, priorityLevel, consumerName,
-                true /* isDurable */, null /* startMessageId */, 
Collections.emptyMap(), false, InitialPosition.Earliest);
+                true /* isDurable */, null /* startMessageId */, 
Collections.emptyMap(), false, InitialPosition.Earliest, null);
     }
 
     public static ByteBuf newSubscribe(String topic, String subscription, long 
consumerId, long requestId,
             SubType subType, int priorityLevel, String consumerName, boolean 
isDurable, MessageIdData startMessageId,
-            Map<String, String> metadata, boolean readCompacted, 
InitialPosition subscriptionInitialPosition) {
+            Map<String, String> metadata, boolean readCompacted, 
InitialPosition subscriptionInitialPosition, SchemaInfo schemaInfo) {
         CommandSubscribe.Builder subscribeBuilder = 
CommandSubscribe.newBuilder();
         subscribeBuilder.setTopic(topic);
         subscribeBuilder.setSubscription(subscription);
@@ -332,6 +336,10 @@ public class Commands {
         }
         subscribeBuilder.addAllMetadata(CommandUtils.toKeyValueList(metadata));
 
+        if (null != schemaInfo) {
+            subscribeBuilder.setSchema(getSchema(schemaInfo));
+        }
+
         CommandSubscribe subscribe = subscribeBuilder.build();
         ByteBuf res = 
serializeWithSize(BaseCommand.newBuilder().setType(Type.SUBSCRIBE).setSubscribe(subscribe));
         subscribeBuilder.recycle();
@@ -425,6 +433,41 @@ public class Commands {
 
     public static ByteBuf newProducer(String topic, long producerId, long 
requestId, String producerName,
                 boolean encrypted, Map<String, String> metadata) {
+        return newProducer(topic, producerId, requestId, producerName, 
encrypted, metadata, null);
+    }
+
+    private static PulsarApi.Schema.Type getSchemaType(SchemaType type) {
+        switch (type) {
+            case PROTOBUF:
+                return PulsarApi.Schema.Type.Protobuf;
+            case THRIFT:
+                return PulsarApi.Schema.Type.Thrift;
+            case AVRO:
+                return PulsarApi.Schema.Type.Avro;
+            case JSON:
+                return PulsarApi.Schema.Type.Json;
+            default:
+                return null;
+        }
+    }
+
+    private static PulsarApi.Schema getSchema(SchemaInfo schemaInfo) {
+        return PulsarApi.Schema.newBuilder()
+            .setName(schemaInfo.getName())
+            .setSchemaData(copyFrom(schemaInfo.getSchema()))
+            .setType(getSchemaType(schemaInfo.getType()))
+            .addAllProperties(
+                schemaInfo.getProperties().entrySet().stream().map(entry ->
+                    PulsarApi.KeyValue.newBuilder()
+                        .setKey(entry.getKey())
+                        .setValue(entry.getValue())
+                        .build()
+                ).collect(Collectors.toList())
+            ).build();
+    }
+
+    public static ByteBuf newProducer(String topic, long producerId, long 
requestId, String producerName,
+                boolean encrypted, Map<String, String> metadata, SchemaInfo 
schemaInfo) {
         CommandProducer.Builder producerBuilder = CommandProducer.newBuilder();
         producerBuilder.setTopic(topic);
         producerBuilder.setProducerId(producerId);
@@ -436,6 +479,10 @@ public class Commands {
 
         producerBuilder.addAllMetadata(CommandUtils.toKeyValueList(metadata));
 
+        if (null != schemaInfo) {
+            producerBuilder.setSchema(getSchema(schemaInfo));
+        }
+
         CommandProducer producer = producerBuilder.build();
         ByteBuf res = 
serializeWithSize(BaseCommand.newBuilder().setType(Type.PRODUCER).setProducer(producer));
         producerBuilder.recycle();
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 86892fa..56f09fe 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -266,10 +266,6 @@ public final class PulsarApi {
     boolean hasName();
     String getName();
     
-    // required bytes version = 2;
-    boolean hasVersion();
-    com.google.protobuf.ByteString getVersion();
-    
     // required bytes schema_data = 3;
     boolean hasSchemaData();
     com.google.protobuf.ByteString getSchemaData();
@@ -400,21 +396,11 @@ public final class PulsarApi {
       }
     }
     
-    // required bytes version = 2;
-    public static final int VERSION_FIELD_NUMBER = 2;
-    private com.google.protobuf.ByteString version_;
-    public boolean hasVersion() {
-      return ((bitField0_ & 0x00000002) == 0x00000002);
-    }
-    public com.google.protobuf.ByteString getVersion() {
-      return version_;
-    }
-    
     // required bytes schema_data = 3;
     public static final int SCHEMA_DATA_FIELD_NUMBER = 3;
     private com.google.protobuf.ByteString schemaData_;
     public boolean hasSchemaData() {
-      return ((bitField0_ & 0x00000004) == 0x00000004);
+      return ((bitField0_ & 0x00000002) == 0x00000002);
     }
     public com.google.protobuf.ByteString getSchemaData() {
       return schemaData_;
@@ -424,7 +410,7 @@ public final class PulsarApi {
     public static final int TYPE_FIELD_NUMBER = 4;
     private org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type type_;
     public boolean hasType() {
-      return ((bitField0_ & 0x00000008) == 0x00000008);
+      return ((bitField0_ & 0x00000004) == 0x00000004);
     }
     public org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type getType() {
       return type_;
@@ -453,7 +439,6 @@ public final class PulsarApi {
     
     private void initFields() {
       name_ = "";
-      version_ = com.google.protobuf.ByteString.EMPTY;
       schemaData_ = com.google.protobuf.ByteString.EMPTY;
       type_ = org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type.Json;
       properties_ = java.util.Collections.emptyList();
@@ -467,10 +452,6 @@ public final class PulsarApi {
         memoizedIsInitialized = 0;
         return false;
       }
-      if (!hasVersion()) {
-        memoizedIsInitialized = 0;
-        return false;
-      }
       if (!hasSchemaData()) {
         memoizedIsInitialized = 0;
         return false;
@@ -501,12 +482,9 @@ public final class PulsarApi {
         output.writeBytes(1, getNameBytes());
       }
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
-        output.writeBytes(2, version_);
-      }
-      if (((bitField0_ & 0x00000004) == 0x00000004)) {
         output.writeBytes(3, schemaData_);
       }
-      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
         output.writeEnum(4, type_.getNumber());
       }
       for (int i = 0; i < properties_.size(); i++) {
@@ -526,13 +504,9 @@ public final class PulsarApi {
       }
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
         size += com.google.protobuf.CodedOutputStream
-          .computeBytesSize(2, version_);
-      }
-      if (((bitField0_ & 0x00000004) == 0x00000004)) {
-        size += com.google.protobuf.CodedOutputStream
           .computeBytesSize(3, schemaData_);
       }
-      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
         size += com.google.protobuf.CodedOutputStream
           .computeEnumSize(4, type_.getNumber());
       }
@@ -655,14 +629,12 @@ public final class PulsarApi {
         super.clear();
         name_ = "";
         bitField0_ = (bitField0_ & ~0x00000001);
-        version_ = com.google.protobuf.ByteString.EMPTY;
-        bitField0_ = (bitField0_ & ~0x00000002);
         schemaData_ = com.google.protobuf.ByteString.EMPTY;
-        bitField0_ = (bitField0_ & ~0x00000004);
+        bitField0_ = (bitField0_ & ~0x00000002);
         type_ = org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type.Json;
-        bitField0_ = (bitField0_ & ~0x00000008);
+        bitField0_ = (bitField0_ & ~0x00000004);
         properties_ = java.util.Collections.emptyList();
-        bitField0_ = (bitField0_ & ~0x00000010);
+        bitField0_ = (bitField0_ & ~0x00000008);
         return this;
       }
       
@@ -703,18 +675,14 @@ public final class PulsarApi {
         if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
           to_bitField0_ |= 0x00000002;
         }
-        result.version_ = version_;
+        result.schemaData_ = schemaData_;
         if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
           to_bitField0_ |= 0x00000004;
         }
-        result.schemaData_ = schemaData_;
-        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
-          to_bitField0_ |= 0x00000008;
-        }
         result.type_ = type_;
-        if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        if (((bitField0_ & 0x00000008) == 0x00000008)) {
           properties_ = java.util.Collections.unmodifiableList(properties_);
-          bitField0_ = (bitField0_ & ~0x00000010);
+          bitField0_ = (bitField0_ & ~0x00000008);
         }
         result.properties_ = properties_;
         result.bitField0_ = to_bitField0_;
@@ -726,9 +694,6 @@ public final class PulsarApi {
         if (other.hasName()) {
           setName(other.getName());
         }
-        if (other.hasVersion()) {
-          setVersion(other.getVersion());
-        }
         if (other.hasSchemaData()) {
           setSchemaData(other.getSchemaData());
         }
@@ -738,7 +703,7 @@ public final class PulsarApi {
         if (!other.properties_.isEmpty()) {
           if (properties_.isEmpty()) {
             properties_ = other.properties_;
-            bitField0_ = (bitField0_ & ~0x00000010);
+            bitField0_ = (bitField0_ & ~0x00000008);
           } else {
             ensurePropertiesIsMutable();
             properties_.addAll(other.properties_);
@@ -753,10 +718,6 @@ public final class PulsarApi {
           
           return false;
         }
-        if (!hasVersion()) {
-          
-          return false;
-        }
         if (!hasSchemaData()) {
           
           return false;
@@ -801,13 +762,8 @@ public final class PulsarApi {
               name_ = input.readBytes();
               break;
             }
-            case 18: {
-              bitField0_ |= 0x00000002;
-              version_ = input.readBytes();
-              break;
-            }
             case 26: {
-              bitField0_ |= 0x00000004;
+              bitField0_ |= 0x00000002;
               schemaData_ = input.readBytes();
               break;
             }
@@ -815,7 +771,7 @@ public final class PulsarApi {
               int rawValue = input.readEnum();
               org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type value = 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type.valueOf(rawValue);
               if (value != null) {
-                bitField0_ |= 0x00000008;
+                bitField0_ |= 0x00000004;
                 type_ = value;
               }
               break;
@@ -868,34 +824,10 @@ public final class PulsarApi {
         
       }
       
-      // required bytes version = 2;
-      private com.google.protobuf.ByteString version_ = 
com.google.protobuf.ByteString.EMPTY;
-      public boolean hasVersion() {
-        return ((bitField0_ & 0x00000002) == 0x00000002);
-      }
-      public com.google.protobuf.ByteString getVersion() {
-        return version_;
-      }
-      public Builder setVersion(com.google.protobuf.ByteString value) {
-        if (value == null) {
-    throw new NullPointerException();
-  }
-  bitField0_ |= 0x00000002;
-        version_ = value;
-        
-        return this;
-      }
-      public Builder clearVersion() {
-        bitField0_ = (bitField0_ & ~0x00000002);
-        version_ = getDefaultInstance().getVersion();
-        
-        return this;
-      }
-      
       // required bytes schema_data = 3;
       private com.google.protobuf.ByteString schemaData_ = 
com.google.protobuf.ByteString.EMPTY;
       public boolean hasSchemaData() {
-        return ((bitField0_ & 0x00000004) == 0x00000004);
+        return ((bitField0_ & 0x00000002) == 0x00000002);
       }
       public com.google.protobuf.ByteString getSchemaData() {
         return schemaData_;
@@ -904,13 +836,13 @@ public final class PulsarApi {
         if (value == null) {
     throw new NullPointerException();
   }
-  bitField0_ |= 0x00000004;
+  bitField0_ |= 0x00000002;
         schemaData_ = value;
         
         return this;
       }
       public Builder clearSchemaData() {
-        bitField0_ = (bitField0_ & ~0x00000004);
+        bitField0_ = (bitField0_ & ~0x00000002);
         schemaData_ = getDefaultInstance().getSchemaData();
         
         return this;
@@ -919,7 +851,7 @@ public final class PulsarApi {
       // required .pulsar.proto.Schema.Type type = 4;
       private org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type type_ = 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type.Json;
       public boolean hasType() {
-        return ((bitField0_ & 0x00000008) == 0x00000008);
+        return ((bitField0_ & 0x00000004) == 0x00000004);
       }
       public org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type 
getType() {
         return type_;
@@ -928,13 +860,13 @@ public final class PulsarApi {
         if (value == null) {
           throw new NullPointerException();
         }
-        bitField0_ |= 0x00000008;
+        bitField0_ |= 0x00000004;
         type_ = value;
         
         return this;
       }
       public Builder clearType() {
-        bitField0_ = (bitField0_ & ~0x00000008);
+        bitField0_ = (bitField0_ & ~0x00000004);
         type_ = org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type.Json;
         
         return this;
@@ -944,9 +876,9 @@ public final class PulsarApi {
       private 
java.util.List<org.apache.pulsar.common.api.proto.PulsarApi.KeyValue> 
properties_ =
         java.util.Collections.emptyList();
       private void ensurePropertiesIsMutable() {
-        if (!((bitField0_ & 0x00000010) == 0x00000010)) {
+        if (!((bitField0_ & 0x00000008) == 0x00000008)) {
           properties_ = new 
java.util.ArrayList<org.apache.pulsar.common.api.proto.PulsarApi.KeyValue>(properties_);
-          bitField0_ |= 0x00000010;
+          bitField0_ |= 0x00000008;
          }
       }
       
@@ -1018,7 +950,7 @@ public final class PulsarApi {
       }
       public Builder clearProperties() {
         properties_ = java.util.Collections.emptyList();
-        bitField0_ = (bitField0_ & ~0x00000010);
+        bitField0_ = (bitField0_ & ~0x00000008);
         
         return this;
       }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/DeleteSchemaResponse.java
similarity index 67%
copy from pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java
copy to 
pulsar-common/src/main/java/org/apache/pulsar/common/schema/DeleteSchemaResponse.java
index 346c525..b9c47b2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/DeleteSchemaResponse.java
@@ -16,21 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.api;
+package org.apache.pulsar.common.schema;
 
-public interface Schema<T> {
-    byte[] encode(T message);
-    T decode(byte[] bytes);
+import lombok.Builder;
+import lombok.Data;
 
-    Schema<byte[]> IDENTITY = new Schema<byte[]>() {
-        @Override
-        public byte[] encode(byte[] message) {
-            return message;
-        }
-
-        @Override
-        public byte[] decode(byte[] bytes) {
-            return bytes;
-        }
-    };
+@Data
+@Builder
+public class DeleteSchemaResponse {
+    private SchemaVersion version;
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/GetSchemaResponse.java
similarity index 67%
copy from pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java
copy to 
pulsar-common/src/main/java/org/apache/pulsar/common/schema/GetSchemaResponse.java
index 346c525..bc98b89 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/GetSchemaResponse.java
@@ -16,21 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.api;
+package org.apache.pulsar.common.schema;
 
-public interface Schema<T> {
-    byte[] encode(T message);
-    T decode(byte[] bytes);
+import java.util.Map;
+import lombok.Builder;
+import lombok.Data;
 
-    Schema<byte[]> IDENTITY = new Schema<byte[]>() {
-        @Override
-        public byte[] encode(byte[] message) {
-            return message;
-        }
-
-        @Override
-        public byte[] decode(byte[] bytes) {
-            return bytes;
-        }
-    };
+@Data
+@Builder
+public class GetSchemaResponse {
+    private SchemaVersion version;
+    private SchemaType type;
+    private long timestamp;
+    private String data;
+    private Map<String, String> properties;
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaPayload.java
similarity index 67%
copy from pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java
copy to 
pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaPayload.java
index 346c525..af04418 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaPayload.java
@@ -16,21 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.api;
+package org.apache.pulsar.common.schema;
 
-public interface Schema<T> {
-    byte[] encode(T message);
-    T decode(byte[] bytes);
+import java.util.Map;
+import lombok.Data;
 
-    Schema<byte[]> IDENTITY = new Schema<byte[]>() {
-        @Override
-        public byte[] encode(byte[] message) {
-            return message;
-        }
-
-        @Override
-        public byte[] decode(byte[] bytes) {
-            return bytes;
-        }
-    };
+@Data
+public class PostSchemaPayload {
+    private String type;
+    private String schema;
+    private Map<String, String> properties;
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaResponse.java
similarity index 67%
copy from pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java
copy to 
pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaResponse.java
index 346c525..b12db5d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaResponse.java
@@ -16,21 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.api;
+package org.apache.pulsar.common.schema;
 
-public interface Schema<T> {
-    byte[] encode(T message);
-    T decode(byte[] bytes);
+import lombok.Builder;
+import lombok.Data;
 
-    Schema<byte[]> IDENTITY = new Schema<byte[]>() {
-        @Override
-        public byte[] encode(byte[] message) {
-            return message;
-        }
-
-        @Override
-        public byte[] decode(byte[] bytes) {
-            return bytes;
-        }
-    };
+@Data
+@Builder
+public class PostSchemaResponse {
+    private SchemaVersion version;
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
similarity index 67%
copy from pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java
copy to 
pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
index 346c525..b32eba4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
@@ -16,21 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.api;
+package org.apache.pulsar.common.schema;
 
-public interface Schema<T> {
-    byte[] encode(T message);
-    T decode(byte[] bytes);
+import java.util.Map;
+import lombok.Data;
 
-    Schema<byte[]> IDENTITY = new Schema<byte[]>() {
-        @Override
-        public byte[] encode(byte[] message) {
-            return message;
-        }
-
-        @Override
-        public byte[] decode(byte[] bytes) {
-            return bytes;
-        }
-    };
+@Data
+public class SchemaInfo {
+    private String name;
+    private byte[] schema;
+    private SchemaType type;
+    private Map<String, String> properties;
 }
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto 
b/pulsar-common/src/main/proto/PulsarApi.proto
index 288a262..3514554 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -31,10 +31,10 @@ message Schema {
        }
 
     required string name = 1;
-    required bytes version = 2;
     required bytes schema_data = 3;
        required Type type = 4;
     repeated KeyValue properties = 5;
+       
 }
 
 message MessageIdData {

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to