This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new db89cd4 Json schema codec (#1551) db89cd4 is described below commit db89cd460c25492bb6812f31a6e28265c33b192d Author: Dave Rusek <dave.ru...@gmail.com> AuthorDate: Fri Apr 27 12:45:30 2018 -0600 Json schema codec (#1551) * Add JSON schema support * Use different json schema library * Add license headers to new files * Revert "Use different json schema library" This reverts commit 405b918ed9ccebe947916b7c87192b0cef3e910a. * Changes related to review input * Upgrade jackson schema module * Check schema compatibility on subscribe and publish * Add missing license headers: * Ensure schema is stored * Add negative tests on publish/consume with wrong schema * Add missing license header * Release ALL protobuf references * Remove SecureRandom, it's unneeded and appears to timeout * GRRRRR license headers --- pom.xml | 6 + .../apache/pulsar/broker/ServiceConfiguration.java | 11 + .../apache/pulsar/broker/service/ServerCnx.java | 32 ++- .../org/apache/pulsar/broker/service/Topic.java | 2 + .../service/nonpersistent/NonPersistentTopic.java | 9 + .../broker/service/persistent/PersistentTopic.java | 9 + .../schema/DefaultSchemaRegistryService.java | 5 + .../schema/JsonSchemaCompatibilityCheck.java | 46 ++++ .../service/schema/SchemaCompatibilityCheck.java | 28 +-- .../broker/service/schema/SchemaRegistry.java | 2 + .../service/schema/SchemaRegistryService.java | 22 +- .../service/schema/SchemaRegistryServiceImpl.java | 52 +++-- .../broker/service/schema/SchemaServiceTest.java | 3 +- .../pulsar/client/api/ProducerConsumerBase.java | 4 +- .../api/SimpleTypedProducerConsumerTest.java | 239 +++++++++++++++++++++ pulsar-client/pom.xml | 5 + .../apache/pulsar/client/api/MessageBuilder.java | 2 +- .../java/org/apache/pulsar/client/api/Schema.java | 2 +- ...hema.java => SchemaSerializationException.java} | 28 +-- .../pulsar/client/impl/MessageBuilderImpl.java | 3 +- .../org/apache/pulsar/client/impl/MessageImpl.java | 1 + .../apache/pulsar/client/impl/ProducerBase.java | 8 +- .../apache/pulsar/client/impl/ProducerImpl.java | 3 +- .../pulsar/client/impl/TopicMessageImpl.java | 1 + .../pulsar/client/impl/schema/JSONSchema.java | 84 ++++++++ .../apache/pulsar/client/tutorial/JsonPojo.java} | 35 ++- .../tutorial/SampleAsyncProducerWithSchema.java | 69 ++++++ .../client/tutorial/SampleConsumerWithSchema.java | 49 +++++ .../org/apache/pulsar/common/api/Commands.java | 17 +- 29 files changed, 682 insertions(+), 95 deletions(-) diff --git a/pom.xml b/pom.xml index 4ac9294..af48238 100644 --- a/pom.xml +++ b/pom.xml @@ -534,6 +534,12 @@ flexible messaging model and an intuitive client API.</description> </dependency> <dependency> + <groupId>com.fasterxml.jackson.module</groupId> + <artifactId>jackson-module-jsonSchema</artifactId> + <version>2.9.0</version> + </dependency> + + <dependency> <artifactId>log4j</artifactId> <groupId>log4j</groupId> <version>1.2.17</version> diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 1002714..b35b440 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -438,6 +438,9 @@ public class ServiceConfiguration implements PulsarConfiguration { private boolean preferLaterVersions = false; private String schemaRegistryStorageClassName = "org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory"; + private Set<String> schemaRegistryCompatibilityCheckers = Sets.newHashSet( + "org.apache.pulsar.broker.service.schema.JsonSchemaCompatibilityCheck" + ); /**** --- WebSocket --- ****/ // Number of IO threads in Pulsar Client used in WebSocket proxy @@ -1506,6 +1509,14 @@ public class ServiceConfiguration implements PulsarConfiguration { schemaRegistryStorageClassName = className; } + public Set<String> getSchemaRegistryCompatibilityCheckers() { + return schemaRegistryCompatibilityCheckers; + } + + public void setSchemaRegistryCompatibilityCheckers(Set<String> schemaRegistryCompatibilityCheckers) { + this.schemaRegistryCompatibilityCheckers = schemaRegistryCompatibilityCheckers; + } + public boolean authenticateOriginalAuthData() { return authenticateOriginalAuthData; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index d3488e8..d5a420f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -25,6 +25,7 @@ import static org.apache.pulsar.broker.lookup.v1.TopicLookup.lookupTopicAsync; import static org.apache.pulsar.common.api.Commands.newLookupErrorResponse; import static org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion.v5; +import com.google.common.base.Strings; import com.google.protobuf.GeneratedMessageLite; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandler; @@ -91,6 +92,7 @@ import org.apache.pulsar.common.policies.data.ConsumerStats; import org.apache.pulsar.common.schema.SchemaData; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.schema.SchemaVersion; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -561,6 +563,7 @@ public class ServerCnx extends PulsarHandler { final boolean readCompacted = subscribe.getReadCompacted(); final Map<String, String> metadata = CommandUtils.metadataFromCommand(subscribe); final InitialPosition initialPosition = subscribe.getInitialPosition(); + final SchemaData schema = subscribe.hasSchema() ? getSchema(subscribe.getSchema()) : null; CompletableFuture<Boolean> isProxyAuthorizedFuture; if (service.isAuthorizationEnabled() && originalPrincipal != null) { @@ -622,9 +625,25 @@ public class ServerCnx extends PulsarHandler { } service.getOrCreateTopic(topicName.toString()) - .thenCompose(topic -> topic.subscribe(ServerCnx.this, subscriptionName, consumerId, - subType, priorityLevel, consumerName, isDurable, - startMessageId, metadata, readCompacted, initialPosition)) + .thenCompose(topic -> { + if (schema != null) { + return topic.isSchemaCompatible(schema).thenCompose(isCompatible -> { + if (isCompatible) { + return topic.subscribe(ServerCnx.this, subscriptionName, consumerId, + subType, priorityLevel, consumerName, isDurable, + startMessageId, metadata, readCompacted, initialPosition); + } else { + return FutureUtil.failedFuture(new BrokerServiceException( + "Trying to subscribe with incompatible schema" + )); + } + }); + } else { + return topic.subscribe(ServerCnx.this, subscriptionName, consumerId, + subType, priorityLevel, consumerName, isDurable, + startMessageId, metadata, readCompacted, initialPosition); + } + }) .thenAccept(consumer -> { if (consumerFuture.complete(consumer)) { log.info("[{}] Created subscription on topic {} / {}", remoteAddress, topicName, @@ -721,7 +740,7 @@ public class ServerCnx extends PulsarHandler { .data(protocolSchema.getSchemaData().toByteArray()) .isDeleted(false) .timestamp(System.currentTimeMillis()) - .user(originalPrincipal) + .user(Strings.nullToEmpty(originalPrincipal)) .type(getType(protocolSchema.getType())) .props(protocolSchema.getPropertiesList().stream().collect( Collectors.toMap( @@ -741,6 +760,7 @@ public class ServerCnx extends PulsarHandler { : service.generateUniqueProducerName(); final boolean isEncrypted = cmdProducer.getEncrypted(); final Map<String, String> metadata = CommandUtils.metadataFromCommand(cmdProducer); + final SchemaData schema = cmdProducer.hasSchema() ? getSchema(cmdProducer.getSchema()) : null; TopicName topicName = validateTopicName(cmdProducer.getTopic(), requestId, cmdProducer); if (topicName == null) { @@ -841,8 +861,8 @@ public class ServerCnx extends PulsarHandler { disableTcpNoDelayIfNeeded(topicName.toString(), producerName); CompletableFuture<SchemaVersion> schemaVersionFuture; - if (cmdProducer.hasSchema()) { - schemaVersionFuture = topic.addSchema(getSchema(cmdProducer.getSchema())); + if (schema != null) { + schemaVersionFuture = topic.addSchema(schema); } else { schemaVersionFuture = CompletableFuture.completedFuture(SchemaVersion.Empty); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 1f149f6..fceb858 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -130,4 +130,6 @@ public interface Topic { Position getLastMessageId(); CompletableFuture<SchemaVersion> addSchema(SchemaData schema); + + CompletableFuture<Boolean> isSchemaCompatible(SchemaData schema); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 1cc9403..7901975 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -994,4 +994,13 @@ public class NonPersistentTopic implements Topic { .getSchemaRegistryService() .putSchemaIfAbsent(id, schema); } + + @Override + public CompletableFuture<Boolean> isSchemaCompatible(SchemaData schema) { + String base = TopicName.get(getName()).getPartitionedTopicName(); + String id = TopicName.get(base).getSchemaName(); + return brokerService.pulsar() + .getSchemaRegistryService() + .isCompatibleWithLatestVersion(id, schema); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 66ff7dd..0240ffd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1713,4 +1713,13 @@ public class PersistentTopic implements Topic, AddEntryCallback { .getSchemaRegistryService() .putSchemaIfAbsent(id, schema); } + + @Override + public CompletableFuture<Boolean> isSchemaCompatible(SchemaData schema) { + String base = TopicName.get(getName()).getPartitionedTopicName(); + String id = TopicName.get(base).getSchemaName(); + return brokerService.pulsar() + .getSchemaRegistryService() + .isCompatibleWithLatestVersion(id, schema); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java index db3b9f7..fef288c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java @@ -51,6 +51,11 @@ public class DefaultSchemaRegistryService implements SchemaRegistryService { } @Override + public CompletableFuture<Boolean> isCompatibleWithLatestVersion(String schemaId, SchemaData schema) { + return CompletableFuture.completedFuture(true); + } + + @Override public void close() throws Exception { } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheck.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheck.java new file mode 100644 index 0000000..fa83c8a --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheck.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.schema; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.module.jsonSchema.JsonSchema; +import java.io.IOException; +import org.apache.pulsar.common.schema.SchemaData; +import org.apache.pulsar.common.schema.SchemaType; + +@SuppressWarnings("unused") +public class JsonSchemaCompatibilityCheck implements SchemaCompatibilityCheck { + private final ObjectMapper objectMapper = new ObjectMapper(); + + @Override + public SchemaType getSchemaType() { + return SchemaType.JSON; + } + + @Override + public boolean isCompatible(SchemaData from, SchemaData to) { + try { + JsonSchema fromSchema = objectMapper.readValue(from.getData(), JsonSchema.class); + JsonSchema toSchema = objectMapper.readValue(to.getData(), JsonSchema.class); + return fromSchema.getId().equals(toSchema.getId()); + } catch (IOException e) { + return false; + } + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityCheck.java similarity index 62% copy from pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java copy to pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityCheck.java index 3a91fde..86d115a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityCheck.java @@ -16,30 +16,24 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.client.api; +package org.apache.pulsar.broker.service.schema; -import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaData; +import org.apache.pulsar.common.schema.SchemaType; -public interface Schema<T> { - byte[] encode(T message); - T decode(byte[] bytes); - - SchemaInfo getSchemaInfo(); - - Schema<byte[]> IDENTITY = new Schema<byte[]>() { - @Override - public byte[] encode(byte[] message) { - return message; - } +public interface SchemaCompatibilityCheck { + SchemaType getSchemaType(); + boolean isCompatible(SchemaData from, SchemaData to); + SchemaCompatibilityCheck DEFAULT = new SchemaCompatibilityCheck() { @Override - public byte[] decode(byte[] bytes) { - return bytes; + public SchemaType getSchemaType() { + return SchemaType.NONE; } @Override - public SchemaInfo getSchemaInfo() { - return null; + public boolean isCompatible(SchemaData from, SchemaData to) { + return true; } }; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java index 4dfbd6d..8a2e6ab 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java @@ -34,6 +34,8 @@ public interface SchemaRegistry extends AutoCloseable { CompletableFuture<SchemaVersion> deleteSchema(String schemaId, String user); + CompletableFuture<Boolean> isCompatibleWithLatestVersion(String schemaId, SchemaData schema); + SchemaVersion versionFromBytes(byte[] version); class SchemaAndMetadata { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java index b9fa998..a74066b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java @@ -18,9 +18,13 @@ */ package org.apache.pulsar.broker.service.schema; +import com.google.common.collect.Maps; import java.lang.reflect.Method; +import java.util.Map; +import java.util.Set; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.common.schema.SchemaType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,15 +32,31 @@ public interface SchemaRegistryService extends SchemaRegistry { String CreateMethodName = "create"; Logger log = LoggerFactory.getLogger(SchemaRegistryService.class); + static Map<SchemaType, SchemaCompatibilityCheck> getCheckers(Set<String> checkerClasses) throws Exception { + Map<SchemaType, SchemaCompatibilityCheck> checkers = Maps.newHashMap(); + for (String className : checkerClasses) { + final Class<?> checkerClass = Class.forName(className); + SchemaCompatibilityCheck instance = (SchemaCompatibilityCheck) checkerClass.newInstance(); + checkers.put(instance.getSchemaType(), instance); + } + return checkers; + } + static SchemaRegistryService create(PulsarService pulsar) { try { ServiceConfiguration config = pulsar.getConfiguration(); final Class<?> storageClass = Class.forName(config.getSchemaRegistryStorageClassName()); Object factoryInstance = storageClass.newInstance(); Method createMethod = storageClass.getMethod(CreateMethodName, PulsarService.class); + SchemaStorage schemaStorage = (SchemaStorage) createMethod.invoke(factoryInstance, pulsar); + + Map<SchemaType, SchemaCompatibilityCheck> checkers = + getCheckers(config.getSchemaRegistryCompatibilityCheckers()); + schemaStorage.start(); - return new SchemaRegistryServiceImpl(schemaStorage); + + return new SchemaRegistryServiceImpl(schemaStorage, checkers); } catch (Exception e) { log.warn("Unable to create schema registry storage, defaulting to empty storage: {}", e); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java index d014f92..3303332 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java @@ -39,21 +39,24 @@ import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat; import org.apache.pulsar.common.schema.SchemaData; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.schema.SchemaVersion; +import org.apache.pulsar.common.util.FutureUtil; public class SchemaRegistryServiceImpl implements SchemaRegistryService { private static HashFunction hashFunction = Hashing.sha256(); + private final Map<SchemaType, SchemaCompatibilityCheck> compatibilityChecks; private final SchemaStorage schemaStorage; private final Clock clock; @VisibleForTesting - SchemaRegistryServiceImpl(SchemaStorage schemaStorage, Clock clock) { + SchemaRegistryServiceImpl(SchemaStorage schemaStorage, Map<SchemaType, SchemaCompatibilityCheck> compatibilityChecks, Clock clock) { this.schemaStorage = schemaStorage; + this.compatibilityChecks = compatibilityChecks; this.clock = clock; } @VisibleForTesting - SchemaRegistryServiceImpl(SchemaStorage schemaStorage) { - this(schemaStorage, Clock.systemUTC()); + SchemaRegistryServiceImpl(SchemaStorage schemaStorage, Map<SchemaType, SchemaCompatibilityCheck> compatibilityChecks) { + this(schemaStorage, compatibilityChecks, Clock.systemUTC()); } @Override @@ -80,17 +83,23 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService { @Override @NotNull public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, SchemaData schema) { - byte[] context = hashFunction.hashBytes(schema.getData()).asBytes(); - SchemaRegistryFormat.SchemaInfo info = SchemaRegistryFormat.SchemaInfo.newBuilder() - .setType(Functions.convertFromDomainType(schema.getType())) - .setSchema(ByteString.copyFrom(schema.getData())) - .setSchemaId(schemaId) - .setUser(schema.getUser()) - .setDeleted(false) - .setTimestamp(clock.millis()) - .addAllProps(toPairs(schema.getProps())) - .build(); - return schemaStorage.put(schemaId, info.toByteArray(), context); + return checkCompatibilityWithLatest(schemaId, schema).thenCompose(isCompatible -> { + if (isCompatible) { + byte[] context = hashFunction.hashBytes(schema.getData()).asBytes(); + SchemaRegistryFormat.SchemaInfo info = SchemaRegistryFormat.SchemaInfo.newBuilder() + .setType(Functions.convertFromDomainType(schema.getType())) + .setSchema(ByteString.copyFrom(schema.getData())) + .setSchemaId(schemaId) + .setUser(schema.getUser()) + .setDeleted(false) + .setTimestamp(clock.millis()) + .addAllProps(toPairs(schema.getProps())) + .build(); + return schemaStorage.put(schemaId, info.toByteArray(), context); + } else { + return FutureUtil.failedFuture(new Exception()); + } + }); } @Override @@ -101,6 +110,11 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService { } @Override + public CompletableFuture<Boolean> isCompatibleWithLatestVersion(String schemaId, SchemaData schema) { + return checkCompatibilityWithLatest(schemaId, schema); + } + + @Override public SchemaVersion versionFromBytes(byte[] version) { return schemaStorage.versionFromBytes(version); } @@ -121,6 +135,16 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService { .build(); } + private CompletableFuture<Boolean> checkCompatibilityWithLatest(String schemaId, SchemaData schema) { + return getSchema(schemaId).thenApply(storedSchema -> + (storedSchema == null) || + compatibilityChecks.getOrDefault( + schema.getType(), + SchemaCompatibilityCheck.DEFAULT + ).isCompatible(storedSchema.schema, schema) + ); + } + interface Functions { static SchemaType convertToDomainType(SchemaRegistryFormat.SchemaInfo.SchemaType type) { switch (type) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java index 9ae34fe..2264177 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java @@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.schema; import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertTrue; +import com.google.common.collect.Maps; import java.time.Clock; import java.time.Instant; import java.time.ZoneId; @@ -74,7 +75,7 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest { BookkeeperSchemaStorage storage = new BookkeeperSchemaStorage(pulsar); storage.init(); storage.start(); - schemaRegistryService = new SchemaRegistryServiceImpl(storage, MockClock); + schemaRegistryService = new SchemaRegistryServiceImpl(storage, Maps.newHashMap(), MockClock); } @AfterMethod diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java index fb04520..bc535d7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java @@ -45,8 +45,8 @@ public abstract class ProducerConsumerBase extends MockedPulsarServiceBaseTest { admin.namespaces().setNamespaceReplicationClusters("my-property/my-ns", Sets.newHashSet("test")); } - protected void testMessageOrderAndDuplicates(Set<String> messagesReceived, String receivedMessage, - String expectedMessage) { + protected <T> void testMessageOrderAndDuplicates(Set<T> messagesReceived, T receivedMessage, + T expectedMessage) { // Make sure that messages are received in order Assert.assertEquals(receivedMessage, expectedMessage, "Received message " + receivedMessage + " did not match the expected message " + expectedMessage); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java new file mode 100644 index 0000000..fd76393 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java @@ -0,0 +1,239 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import com.google.common.base.MoreObjects; +import com.google.common.collect.Sets; +import java.time.Clock; +import java.util.Collections; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.pulsar.broker.service.schema.SchemaRegistry; +import org.apache.pulsar.client.impl.schema.JSONSchema; +import org.apache.pulsar.common.schema.SchemaData; +import org.apache.pulsar.common.schema.SchemaType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase { + private static final Logger log = LoggerFactory.getLogger(SimpleTypedProducerConsumerTest.class); + + @BeforeMethod + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterMethod + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testJsonProducerAndConsumer() throws Exception { + log.info("-- Starting {} test --", methodName); + + JSONSchema<JsonEncodedPojo> jsonSchema = + JSONSchema.of(JsonEncodedPojo.class); + + Consumer<JsonEncodedPojo> consumer = pulsarClient + .newConsumer(jsonSchema) + .topic("persistent://my-property/use/my-ns/my-topic1") + .subscriptionName("my-subscriber-name") + .subscribe(); + + Producer<JsonEncodedPojo> producer = pulsarClient + .newProducer(jsonSchema) + .topic("persistent://my-property/use/my-ns/my-topic1") + .create(); + + for (int i = 0; i < 10; i++) { + String message = "my-message-" + i; + producer.send(new JsonEncodedPojo(message)); + } + + Message<JsonEncodedPojo> msg = null; + Set<JsonEncodedPojo> messageSet = Sets.newHashSet(); + for (int i = 0; i < 10; i++) { + msg = consumer.receive(5, TimeUnit.SECONDS); + JsonEncodedPojo receivedMessage = msg.getValue(); + log.debug("Received message: [{}]", receivedMessage); + JsonEncodedPojo expectedMessage = new JsonEncodedPojo("my-message-" + i); + testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); + } + // Acknowledge the consumption of all messages at once + consumer.acknowledgeCumulative(msg); + consumer.close(); + + SchemaRegistry.SchemaAndMetadata storedSchema = pulsar.getSchemaRegistryService() + .getSchema("my-property/my-ns/my-topic1") + .get(); + + Assert.assertEquals(storedSchema.schema.getData(), jsonSchema.getSchemaInfo().getSchema()); + + log.info("-- Exiting {} test --", methodName); + } + + @Test + public void testJsonProducerAndConsumerWithPrestoredSchema() throws Exception { + log.info("-- Starting {} test --", methodName); + + JSONSchema<JsonEncodedPojo> jsonSchema = + JSONSchema.of(JsonEncodedPojo.class); + + pulsar.getSchemaRegistryService() + .putSchemaIfAbsent("my-property/my-ns/my-topic1", + SchemaData.builder() + .type(SchemaType.JSON) + .isDeleted(false) + .timestamp(Clock.systemUTC().millis()) + .user("me") + .data(jsonSchema.getSchemaInfo().getSchema()) + .props(Collections.emptyMap()) + .build() + ).get(); + + Consumer<JsonEncodedPojo> consumer = pulsarClient + .newConsumer(jsonSchema) + .topic("persistent://my-property/use/my-ns/my-topic1") + .subscriptionName("my-subscriber-name") + .subscribe(); + + Producer<JsonEncodedPojo> producer = pulsarClient + .newProducer(jsonSchema) + .topic("persistent://my-property/use/my-ns/my-topic1") + .create(); + + consumer.close(); + producer.close(); + + SchemaRegistry.SchemaAndMetadata storedSchema = pulsar.getSchemaRegistryService() + .getSchema("my-property/my-ns/my-topic1") + .get(); + + Assert.assertEquals(storedSchema.schema.getData(), jsonSchema.getSchemaInfo().getSchema()); + + log.info("-- Exiting {} test --", methodName); + } + + @Test(expectedExceptions = {PulsarClientException.class}) + public void testJsonConsumerWithWrongPrestoredSchema() throws Exception { + log.info("-- Starting {} test --", methodName); + + byte[] randomSchemaBytes = "hello".getBytes(); + + pulsar.getSchemaRegistryService() + .putSchemaIfAbsent("my-property/my-ns/my-topic1", + SchemaData.builder() + .type(SchemaType.JSON) + .isDeleted(false) + .timestamp(Clock.systemUTC().millis()) + .user("me") + .data(randomSchemaBytes) + .props(Collections.emptyMap()) + .build() + ).get(); + + Consumer<JsonEncodedPojo> consumer = pulsarClient + .newConsumer(JSONSchema.of(JsonEncodedPojo.class)) + .topic("persistent://my-property/use/my-ns/my-topic1") + .subscriptionName("my-subscriber-name") + .subscribe(); + + log.info("-- Exiting {} test --", methodName); + } + + @Test(expectedExceptions = {PulsarClientException.class}) + public void testJsonProducerWithWrongPrestoredSchema() throws Exception { + log.info("-- Starting {} test --", methodName); + + byte[] randomSchemaBytes = "hello".getBytes(); + + pulsar.getSchemaRegistryService() + .putSchemaIfAbsent("my-property/my-ns/my-topic1", + SchemaData.builder() + .type(SchemaType.JSON) + .isDeleted(false) + .timestamp(Clock.systemUTC().millis()) + .user("me") + .data(randomSchemaBytes) + .props(Collections.emptyMap()) + .build() + ).get(); + + Producer<JsonEncodedPojo> producer = pulsarClient + .newProducer(JSONSchema.of(JsonEncodedPojo.class)) + .topic("persistent://my-property/use/my-ns/my-topic1") + .create(); + + + log.info("-- Exiting {} test --", methodName); + } + + public static class JsonEncodedPojo { + private String message; + + public JsonEncodedPojo() { + } + + public JsonEncodedPojo(String message) { + this.message = message; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + JsonEncodedPojo that = (JsonEncodedPojo) o; + return Objects.equals(message, that.message); + } + + @Override + public int hashCode() { + return Objects.hash(message); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("message", message) + .toString(); + } + } + +} diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml index f6b5b40..8bc7899 100644 --- a/pulsar-client/pom.xml +++ b/pulsar-client/pom.xml @@ -103,6 +103,11 @@ <version>4.4.9</version> </dependency> + <dependency> + <groupId>com.fasterxml.jackson.module</groupId> + <artifactId>jackson-module-jsonSchema</artifactId> + </dependency> + </dependencies> <build> diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java index d6a013b..0728275 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java @@ -57,7 +57,7 @@ public interface MessageBuilder<T> { * @param value * the domain object */ - MessageBuilder<T> setValue(T value); + MessageBuilder<T> setValue(T value) throws SchemaSerializationException; /** * Set the content of the message diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java index 3a91fde..638477c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java @@ -21,7 +21,7 @@ package org.apache.pulsar.client.api; import org.apache.pulsar.common.schema.SchemaInfo; public interface Schema<T> { - byte[] encode(T message); + byte[] encode(T message) throws SchemaSerializationException; T decode(byte[] bytes); SchemaInfo getSchemaInfo(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/SchemaSerializationException.java similarity index 61% copy from pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java copy to pulsar-client/src/main/java/org/apache/pulsar/client/api/SchemaSerializationException.java index 3a91fde..e31c4cf 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/SchemaSerializationException.java @@ -18,28 +18,8 @@ */ package org.apache.pulsar.client.api; -import org.apache.pulsar.common.schema.SchemaInfo; - -public interface Schema<T> { - byte[] encode(T message); - T decode(byte[] bytes); - - SchemaInfo getSchemaInfo(); - - Schema<byte[]> IDENTITY = new Schema<byte[]>() { - @Override - public byte[] encode(byte[] message) { - return message; - } - - @Override - public byte[] decode(byte[] bytes) { - return bytes; - } - - @Override - public SchemaInfo getSchemaInfo() { - return null; - } - }; +public class SchemaSerializationException extends PulsarClientException { + public SchemaSerializationException(Throwable cause) { + super(cause); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java index 056064a..7012690 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java @@ -27,6 +27,7 @@ import java.util.Map; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageBuilder; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue; import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; @@ -48,7 +49,7 @@ public class MessageBuilderImpl<T> implements MessageBuilder<T> { } @Override - public MessageBuilder<T> setValue(T value) { + public MessageBuilder<T> setValue(T value) throws SchemaSerializationException { return setContent(schema.encode(value)); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java index 2989d30..e02f9f6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java @@ -31,6 +31,7 @@ import java.util.stream.Collectors; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.common.api.Commands; import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java index 0cf8bab..48c1f19 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java @@ -28,7 +28,9 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; +import org.apache.pulsar.common.util.FutureUtil; public abstract class ProducerBase<T> extends HandlerState implements Producer<T> { @@ -51,7 +53,11 @@ public abstract class ProducerBase<T> extends HandlerState implements Producer<T @Override public CompletableFuture<MessageId> sendAsync(T message) { - return sendAsync(MessageBuilder.create(schema).setValue(message).build()); + try { + return sendAsync(MessageBuilder.create(schema).setValue(message).build()); + } catch (SchemaSerializationException e) { + return FutureUtil.failedFuture(new SchemaSerializationException(e)); + } } @Override diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index d866b28..3c746db 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -847,7 +847,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne long requestId = client.newRequestId(); cnx.sendRequestWithId( - Commands.newProducer(topic, producerId, requestId, producerName, conf.isEncryptionEnabled(), metadata), + Commands.newProducer(topic, producerId, requestId, producerName, conf.isEncryptionEnabled(), metadata, + schema == null ? null : schema.getSchemaInfo()), requestId).thenAccept(response -> { String producerName = response.getProducerName(); long lastSequenceId = response.getLastSequenceId(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java index 18e2088..236c2b9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java @@ -22,6 +22,7 @@ package org.apache.pulsar.client.impl; import java.util.Map; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.SchemaSerializationException; public class TopicMessageImpl<T> extends MessageRecordImpl<T, TopicMessageIdImpl> { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java new file mode 100644 index 0000000..7eca43d --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.module.jsonSchema.JsonSchema; +import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator; +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SchemaSerializationException; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; + +public class JSONSchema<T> implements Schema<T> { + + private final SchemaInfo info; + private final ObjectMapper objectMapper; + private final Class<T> pojo; + + private JSONSchema(SchemaInfo info, Class<T> pojo, ObjectMapper objectMapper) { + this.info = info; + this.pojo = pojo; + this.objectMapper = objectMapper; + } + + @Override + public byte[] encode(T message) throws SchemaSerializationException { + try { + return objectMapper.writeValueAsBytes(message); + } catch (JsonProcessingException e) { + throw new SchemaSerializationException(e); + } + } + + @Override + public T decode(byte[] bytes) { + try { + return objectMapper.readValue(new String(bytes), pojo); + } catch (IOException e) { + throw new RuntimeException(new SchemaSerializationException(e)); + } + } + + @Override + public SchemaInfo getSchemaInfo() { + return info; + } + + public static <T> JSONSchema<T> of(Class<T> pojo) throws JsonProcessingException { + return of(pojo, Collections.emptyMap()); + } + + public static <T> JSONSchema<T> of(Class<T> pojo, Map<String, String> properties) throws JsonProcessingException { + ObjectMapper mapper = new ObjectMapper(); + JsonSchemaGenerator schemaGen = new JsonSchemaGenerator(mapper); + JsonSchema schema = schemaGen.generateSchema(pojo); + + SchemaInfo info = new SchemaInfo(); + info.setName(""); + info.setProperties(properties); + info.setType(SchemaType.JSON); + info.setSchema(mapper.writeValueAsBytes(schema)); + return new JSONSchema<>(info, pojo, mapper); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/JsonPojo.java similarity index 58% copy from pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java copy to pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/JsonPojo.java index 3a91fde..6d77875 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/JsonPojo.java @@ -16,30 +16,23 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.client.api; +package org.apache.pulsar.client.tutorial; -import org.apache.pulsar.common.schema.SchemaInfo; +public class JsonPojo { + public String content; -public interface Schema<T> { - byte[] encode(T message); - T decode(byte[] bytes); + public String getContent() { + return content; + } - SchemaInfo getSchemaInfo(); + public void setContent(String content) { + this.content = content; + } - Schema<byte[]> IDENTITY = new Schema<byte[]>() { - @Override - public byte[] encode(byte[] message) { - return message; - } + public JsonPojo() { + } - @Override - public byte[] decode(byte[] bytes) { - return bytes; - } - - @Override - public SchemaInfo getSchemaInfo() { - return null; - } - }; + public JsonPojo(String content) { + this.content = content; + } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleAsyncProducerWithSchema.java b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleAsyncProducerWithSchema.java new file mode 100644 index 0000000..90758b6 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleAsyncProducerWithSchema.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.tutorial; + +import com.google.common.collect.Lists; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.impl.schema.JSONSchema; + +@Slf4j +public class SampleAsyncProducerWithSchema { + + public static void main(String[] args) throws IOException { + PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("http://localhost:8080").build(); + + Producer<JsonPojo> producer = pulsarClient.newProducer(JSONSchema.of(JsonPojo.class)).topic("persistent://my-property/use/my-ns/my-topic") + .sendTimeout(3, TimeUnit.SECONDS).create(); + + List<CompletableFuture<MessageId>> futures = Lists.newArrayList(); + + for (int i = 0; i < 10; i++) { + final String content = "my-message-" + i; + CompletableFuture<MessageId> future = producer.sendAsync(new JsonPojo(content)); + + future.handle((v, ex) -> { + if (ex == null) { + log.info("Message persisted: {}", content); + } else { + log.error("Error persisting message: {}", content, ex); + } + return null; + }); + + futures.add(future); + } + + log.info("Waiting for async ops to complete"); + for (CompletableFuture<MessageId> future : futures) { + future.join(); + } + + log.info("All operations completed"); + + pulsarClient.close(); + } + +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleConsumerWithSchema.java b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleConsumerWithSchema.java new file mode 100644 index 0000000..3780332 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleConsumerWithSchema.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.tutorial; + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.schema.JSONSchema; + +public class SampleConsumerWithSchema { + public static void main(String[] args) throws PulsarClientException, JsonProcessingException { + + PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("http://localhost:8080").build(); + + Consumer<JsonPojo> consumer = pulsarClient.newConsumer(JSONSchema.of(JsonPojo.class)) // + .topic("persistent://my-property/use/my-ns/my-topic") // + .subscriptionName("my-subscription-name").subscribe(); + + Message<JsonPojo> msg = null; + + for (int i = 0; i < 100; i++) { + msg = consumer.receive(); + // do something + System.out.println("Received: " + msg.getValue().content); + } + + // Acknowledge the consumption of all messages at once + consumer.acknowledgeCumulative(msg); + pulsarClient.close(); + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java index 904eddd..f61a2b3 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java @@ -336,14 +336,19 @@ public class Commands { } subscribeBuilder.addAllMetadata(CommandUtils.toKeyValueList(metadata)); - if (null != schemaInfo) { - subscribeBuilder.setSchema(getSchema(schemaInfo)); + PulsarApi.Schema schema = null; + if (schemaInfo != null) { + schema = getSchema(schemaInfo); + subscribeBuilder.setSchema(schema); } CommandSubscribe subscribe = subscribeBuilder.build(); ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.SUBSCRIBE).setSubscribe(subscribe)); subscribeBuilder.recycle(); subscribe.recycle(); + if (null != schema) { + schema.recycle(); + } return res; } @@ -426,6 +431,7 @@ public class Commands { return res; } + @VisibleForTesting public static ByteBuf newProducer(String topic, long producerId, long requestId, String producerName, Map<String, String> metadata) { return newProducer(topic, producerId, requestId, producerName, false, metadata); @@ -452,7 +458,7 @@ public class Commands { } private static PulsarApi.Schema getSchema(SchemaInfo schemaInfo) { - return PulsarApi.Schema.newBuilder() + PulsarApi.Schema.Builder builder = PulsarApi.Schema.newBuilder() .setName(schemaInfo.getName()) .setSchemaData(copyFrom(schemaInfo.getSchema())) .setType(getSchemaType(schemaInfo.getType())) @@ -463,7 +469,10 @@ public class Commands { .setValue(entry.getValue()) .build() ).collect(Collectors.toList()) - ).build(); + ); + PulsarApi.Schema schema = builder.build(); + builder.recycle(); + return schema; } public static ByteBuf newProducer(String topic, long producerId, long requestId, String producerName, -- To stop receiving notification emails like this one, please contact mme...@apache.org.