This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 28ce0ad Validate topic name on broker side (#1178) 28ce0ad is described below commit 28ce0adaef76c52341231104daac4df3e2c55f53 Author: Matteo Merli <mme...@apache.org> AuthorDate: Tue Feb 6 16:15:40 2018 -0800 Validate topic name on broker side (#1178) * Validate topic name on broker side * Refactored to put them in a single validation method * Addressed comment --- .../apache/pulsar/broker/service/ServerCnx.java | 143 ++++++++++++++------- .../pulsar/broker/service/ServerCnxTest.java | 93 +++++++++++--- .../broker/service/utils/ClientChannelHelper.java | 6 + .../apache/pulsar/common/api/proto/PulsarApi.java | 3 + pulsar-common/src/main/proto/PulsarApi.proto | 1 + 5 files changed, 180 insertions(+), 66 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 3a6d337..0f0cd6d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -80,6 +80,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.Sets; +import com.google.protobuf.GeneratedMessageLite; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandler; @@ -106,7 +107,7 @@ public class ServerCnx extends PulsarHandler { private final int MaxNonPersistentPendingMessages; private String originalPrincipal = null; private Set<String> proxyRoles = Sets.newHashSet(); - + enum State { Start, Connected, Failed } @@ -198,7 +199,7 @@ public class ServerCnx extends PulsarHandler { return true; } - + // //// // // Incoming commands handling // //// @@ -206,9 +207,14 @@ public class ServerCnx extends PulsarHandler { @Override protected void handleLookup(CommandLookupTopic lookup) { final long requestId = lookup.getRequestId(); - final String topicName = lookup.getTopic(); + if (log.isDebugEnabled()) { - log.debug("[{}] Received Lookup from {} for {}", topicName, remoteAddress, requestId); + log.debug("[{}] Received Lookup from {} for {}", lookup.getTopic(), remoteAddress, requestId); + } + + DestinationName topicName = validateTopicName(lookup.getTopic(), requestId, lookup); + if (topicName == null) { + return; } final Semaphore lookupSemaphore = service.getLookupRequestSemaphore(); @@ -218,21 +224,21 @@ public class ServerCnx extends PulsarHandler { if (!validateOriginalPrincipal(originalPrincipal, newLookupErrorResponse(ServerError.AuthorizationError, "Valid Proxy Client role should be provided for lookup ", requestId), - topicName, "Valid Proxy Client role should be provided for lookup ")) { + topicName.toString(), "Valid Proxy Client role should be provided for lookup ")) { lookupSemaphore.release(); return; } CompletableFuture<Boolean> isProxyAuthorizedFuture; if (service.isAuthorizationEnabled() && originalPrincipal != null) { isProxyAuthorizedFuture = service.getAuthorizationManager() - .canLookupAsync(DestinationName.get(topicName), authRole); + .canLookupAsync(topicName, authRole); } else { isProxyAuthorizedFuture = CompletableFuture.completedFuture(true); } - + isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> { if (isProxyAuthorized) { - lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(topicName), + lookupDestinationAsync(getBrokerService().pulsar(), topicName, lookup.getAuthoritative(), originalPrincipal != null ? originalPrincipal : authRole, lookup.getRequestId()).handle((lookupResponse, ex) -> { if (ex == null) { @@ -273,10 +279,16 @@ public class ServerCnx extends PulsarHandler { @Override protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadata) { final long requestId = partitionMetadata.getRequestId(); - final String topicName = partitionMetadata.getTopic(); if (log.isDebugEnabled()) { - log.debug("[{}] Received PartitionMetadataLookup from {} for {}", topicName, remoteAddress, requestId); + log.debug("[{}] Received PartitionMetadataLookup from {} for {}", partitionMetadata.getTopic(), + remoteAddress, requestId); } + + DestinationName topicName = validateTopicName(partitionMetadata.getTopic(), requestId, partitionMetadata); + if (topicName == null) { + return; + } + final Semaphore lookupSemaphore = service.getLookupRequestSemaphore(); if (lookupSemaphore.tryAcquire()) { final String originalPrincipal = partitionMetadata.hasOriginalPrincipal() @@ -284,22 +296,23 @@ public class ServerCnx extends PulsarHandler { if (!validateOriginalPrincipal(originalPrincipal, Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, "Valid Proxy Client role should be provided for getPartitionMetadataRequest ", requestId), - topicName, "Valid Proxy Client role should be provided for getPartitionMetadataRequest ")) { + topicName.toString(), + "Valid Proxy Client role should be provided for getPartitionMetadataRequest ")) { lookupSemaphore.release(); return; } CompletableFuture<Boolean> isProxyAuthorizedFuture; if (service.isAuthorizationEnabled() && originalPrincipal != null) { isProxyAuthorizedFuture = service.getAuthorizationManager() - .canLookupAsync(DestinationName.get(topicName), authRole); + .canLookupAsync(topicName, authRole); } else { isProxyAuthorizedFuture = CompletableFuture.completedFuture(true); } isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> { if (isProxyAuthorized) { - getPartitionedTopicMetadata(getBrokerService().pulsar(), - originalPrincipal != null ? originalPrincipal : authRole, - DestinationName.get(topicName)).handle((metadata, ex) -> { + getPartitionedTopicMetadata(getBrokerService().pulsar(), + originalPrincipal != null ? originalPrincipal : authRole, topicName) + .handle((metadata, ex) -> { if (ex == null) { int partitions = metadata.partitions; ctx.writeAndFlush(Commands.newPartitionMetadataResponse(partitions, requestId)); @@ -446,19 +459,38 @@ public class ServerCnx extends PulsarHandler { @Override protected void handleSubscribe(final CommandSubscribe subscribe) { checkArgument(state == State.Connected); - final String topicName = subscribe.getTopic(); final long requestId = subscribe.getRequestId(); final long consumerId = subscribe.getConsumerId(); + + DestinationName topicName = validateTopicName(subscribe.getTopic(), requestId, subscribe); + if (topicName == null) { + return; + } + if (!validateOriginalPrincipal(originalPrincipal, Commands.newError(requestId, ServerError.AuthorizationError, "Valid Proxy Client role should be provided while subscribing "), - topicName, "Valid Proxy Client role should be provided while subscribing ")) { + topicName.toString(), "Valid Proxy Client role should be provided while subscribing ")) { return; } + + final String subscriptionName = subscribe.getSubscription(); + final SubType subType = subscribe.getSubType(); + final String consumerName = subscribe.getConsumerName(); + final boolean isDurable = subscribe.getDurable(); + final MessageIdImpl startMessageId = subscribe.hasStartMessageId() ? new BatchMessageIdImpl( + subscribe.getStartMessageId().getLedgerId(), subscribe.getStartMessageId().getEntryId(), + subscribe.getStartMessageId().getPartition(), subscribe.getStartMessageId().getBatchIndex()) + : null; + + final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0; + final boolean readCompacted = subscribe.getReadCompacted(); + final Map<String, String> metadata = CommandUtils.metadataFromCommand(subscribe); + CompletableFuture<Boolean> isProxyAuthorizedFuture; if (service.isAuthorizationEnabled() && originalPrincipal != null) { - isProxyAuthorizedFuture = service.getAuthorizationManager().canConsumeAsync(DestinationName.get(topicName), - authRole, subscribe.getSubscription()); + isProxyAuthorizedFuture = service.getAuthorizationManager().canConsumeAsync(topicName, authRole, + subscribe.getSubscription()); } else { isProxyAuthorizedFuture = CompletableFuture.completedFuture(true); } @@ -466,26 +498,12 @@ public class ServerCnx extends PulsarHandler { if (isProxyAuthorized) { CompletableFuture<Boolean> authorizationFuture; if (service.isAuthorizationEnabled()) { - authorizationFuture = service.getAuthorizationManager().canConsumeAsync( - DestinationName.get(subscribe.getTopic()), - originalPrincipal != null ? originalPrincipal : authRole, subscribe.getSubscription()); + authorizationFuture = service.getAuthorizationManager().canConsumeAsync(topicName, + originalPrincipal != null ? originalPrincipal : authRole, subscriptionName); } else { authorizationFuture = CompletableFuture.completedFuture(true); } - final String subscriptionName = subscribe.getSubscription(); - final SubType subType = subscribe.getSubType(); - final String consumerName = subscribe.getConsumerName(); - final boolean isDurable = subscribe.getDurable(); - final MessageIdImpl startMessageId = subscribe.hasStartMessageId() ? new BatchMessageIdImpl( - subscribe.getStartMessageId().getLedgerId(), subscribe.getStartMessageId().getEntryId(), - subscribe.getStartMessageId().getPartition(), subscribe.getStartMessageId().getBatchIndex()) - : null; - - final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0; - final boolean readCompacted = subscribe.getReadCompacted(); - final Map<String, String> metadata = CommandUtils.metadataFromCommand(subscribe); - authorizationFuture.thenApply(isAuthorized -> { if (isAuthorized) { if (log.isDebugEnabled()) { @@ -527,7 +545,7 @@ public class ServerCnx extends PulsarHandler { } } - service.getTopic(topicName) + service.getTopic(topicName.toString()) .thenCompose(topic -> topic.subscribe(ServerCnx.this, subscriptionName, consumerId, subType, priorityLevel, consumerName, isDurable, startMessageId, metadata, readCompacted)) @@ -605,21 +623,29 @@ public class ServerCnx extends PulsarHandler { @Override protected void handleProducer(final CommandProducer cmdProducer) { checkArgument(state == State.Connected); - final String topicName = cmdProducer.getTopic(); final long producerId = cmdProducer.getProducerId(); final long requestId = cmdProducer.getRequestId(); + // Use producer name provided by client if present + final String producerName = cmdProducer.hasProducerName() ? cmdProducer.getProducerName() + : service.generateUniqueProducerName(); + final boolean isEncrypted = cmdProducer.getEncrypted(); + final Map<String, String> metadata = CommandUtils.metadataFromCommand(cmdProducer); + + DestinationName topicName = validateTopicName(cmdProducer.getTopic(), requestId, cmdProducer); + if (topicName == null) { + return; + } if (!validateOriginalPrincipal(originalPrincipal, Commands.newError(requestId, ServerError.AuthorizationError, "Valid Proxy Client role should be provided while creating producer "), - topicName, "Valid Proxy Client role should be provided while creating producer ")) { + topicName.toString(), "Valid Proxy Client role should be provided while creating producer ")) { return; } - + CompletableFuture<Boolean> isProxyAuthorizedFuture; if (service.isAuthorizationEnabled() && originalPrincipal != null) { - isProxyAuthorizedFuture = service.getAuthorizationManager().canProduceAsync(DestinationName.get(topicName), - authRole); + isProxyAuthorizedFuture = service.getAuthorizationManager().canProduceAsync(topicName, authRole); } else { isProxyAuthorizedFuture = CompletableFuture.completedFuture(true); } @@ -627,17 +653,11 @@ public class ServerCnx extends PulsarHandler { if (isProxyAuthorized) { CompletableFuture<Boolean> authorizationFuture; if (service.isAuthorizationEnabled()) { - authorizationFuture = service.getAuthorizationManager().canProduceAsync( - DestinationName.get(cmdProducer.getTopic().toString()), + authorizationFuture = service.getAuthorizationManager().canProduceAsync(topicName, originalPrincipal != null ? originalPrincipal : authRole); } else { authorizationFuture = CompletableFuture.completedFuture(true); } - // Use producer name provided by client if present - final String producerName = cmdProducer.hasProducerName() ? cmdProducer.getProducerName() - : service.generateUniqueProducerName(); - final boolean isEncrypted = cmdProducer.getEncrypted(); - final Map<String, String> metadata = CommandUtils.metadataFromCommand(cmdProducer); authorizationFuture.thenApply(isAuthorized -> { if (isAuthorized) { @@ -675,7 +695,7 @@ public class ServerCnx extends PulsarHandler { log.info("[{}][{}] Creating producer. producerId={}", remoteAddress, topicName, producerId); - service.getTopic(topicName).thenAccept((Topic topic) -> { + service.getTopic(topicName.toString()).thenAccept((Topic topic) -> { // Before creating producer, check if backlog quota exceeded // on topic if (topic.isBacklogQuotaExceeded(producerName)) { @@ -704,7 +724,7 @@ public class ServerCnx extends PulsarHandler { return; } - disableTcpNoDelayIfNeeded(topicName, producerName); + disableTcpNoDelayIfNeeded(topicName.toString(), producerName); Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName, authRole, isEncrypted, metadata); @@ -1130,6 +1150,29 @@ public class ServerCnx extends PulsarHandler { } } + private DestinationName validateTopicName(String topic, long requestId, GeneratedMessageLite requestCommand) { + try { + return DestinationName.get(topic); + } catch (Throwable t) { + if (log.isDebugEnabled()) { + log.debug("[{}] Failed to parse topic name '{}'", remoteAddress, topic, t); + } + + if (requestCommand instanceof CommandLookupTopic) { + ctx.writeAndFlush(Commands.newLookupErrorResponse(ServerError.InvalidTopicName, + "Invalid topic name: " + t.getMessage(), requestId)); + } else if (requestCommand instanceof CommandPartitionedTopicMetadata) { + ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.InvalidTopicName, + "Invalid topic name: " + t.getMessage(), requestId)); + } else { + ctx.writeAndFlush(Commands.newError(requestId, ServerError.InvalidTopicName, + "Invalid topic name: " + t.getMessage())); + } + + return null; + } + } + private static final Logger log = LoggerFactory.getLogger(ServerCnx.class); /** diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index b604cfd..4b895d4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -45,8 +45,6 @@ import java.util.concurrent.TimeUnit; import javax.naming.AuthenticationException; -import com.google.common.collect.Maps; -import com.google.protobuf.ByteString; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback; @@ -67,29 +65,28 @@ import org.apache.pulsar.broker.authorization.AuthorizationManager; import org.apache.pulsar.broker.cache.ConfigurationCacheService; import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService; import org.apache.pulsar.broker.namespace.NamespaceService; -import org.apache.pulsar.broker.service.BrokerService; -import org.apache.pulsar.broker.service.ServerCnx; -import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.ServerCnx.State; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.utils.ClientChannelHelper; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.common.api.ByteBufPair; import org.apache.pulsar.common.api.Commands; -import org.apache.pulsar.common.api.PulsarHandler; import org.apache.pulsar.common.api.Commands.ChecksumType; +import org.apache.pulsar.common.api.PulsarHandler; import org.apache.pulsar.common.api.proto.PulsarApi.AuthMethod; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected; import org.apache.pulsar.common.api.proto.PulsarApi.CommandError; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse; import org.apache.pulsar.common.api.proto.PulsarApi.CommandProducerSuccess; -import org.apache.pulsar.common.api.proto.PulsarApi.CommandSendReceipt; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSendError; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandSendReceipt; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSuccess; import org.apache.pulsar.common.api.proto.PulsarApi.EncryptionKeys; import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion; import org.apache.pulsar.common.api.proto.PulsarApi.ServerError; -import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; -import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.common.naming.DestinationName; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.policies.data.AuthAction; @@ -105,6 +102,9 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import com.google.common.collect.Maps; +import com.google.protobuf.ByteString; + import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.embedded.EmbeddedChannel; @@ -114,19 +114,19 @@ import io.netty.handler.codec.LengthFieldBasedFrameDecoder; */ @Test public class ServerCnxTest { - private EmbeddedChannel channel; + protected EmbeddedChannel channel; private ServiceConfiguration svcConfig; private ServerCnx serverCnx; - private BrokerService brokerService; + protected BrokerService brokerService; private ManagedLedgerFactory mlFactoryMock; private ClientChannelHelper clientChannelHelper; private PulsarService pulsar; private ConfigurationCacheService configCacheService; - private NamespaceService namespaceService; + protected NamespaceService namespaceService; private final int currentProtocolVersion = ProtocolVersion.values()[ProtocolVersion.values().length - 1] .getNumber(); - private final String successTopicName = "persistent://prop/use/ns-abc/successTopic"; + protected final String successTopicName = "persistent://prop/use/ns-abc/successTopic"; private final String failTopicName = "persistent://prop/use/ns-abc/failTopic"; private final String nonOwnedTopicName = "persistent://prop/use/ns-abc/success-not-owned-topic"; private final String encryptionRequiredTopicName = "persistent://prop/use/ns-abc/successEncryptionRequiredTopic"; @@ -1334,7 +1334,7 @@ public class ServerCnxTest { channel.finish(); } - private void resetChannel() throws Exception { + protected void resetChannel() throws Exception { int MaxMessageSize = 5 * 1024 * 1024; if (channel != null && channel.isActive()) { serverCnx.close(); @@ -1345,7 +1345,7 @@ public class ServerCnxTest { channel = new EmbeddedChannel(new LengthFieldBasedFrameDecoder(MaxMessageSize, 0, 4, 0, 4), serverCnx); } - private void setChannelConnected() throws Exception { + protected void setChannelConnected() throws Exception { Field channelState = ServerCnx.class.getDeclaredField("state"); channelState.setAccessible(true); channelState.set(serverCnx, State.Connected); @@ -1358,7 +1358,7 @@ public class ServerCnxTest { versionField.set(cnx, version); } - private Object getResponse() throws Exception { + protected Object getResponse() throws Exception { // Wait at most for 10s to get a response final long sleepTimeMs = 10; final long iterations = TimeUnit.SECONDS.toMillis(10) / sleepTimeMs; @@ -1460,5 +1460,66 @@ public class ServerCnxTest { doReturn(successSubName).when(cursorMock).getName(); } + @Test(timeOut = 30000) + public void testInvalidTopicOnLookup() throws Exception { + resetChannel(); + setChannelConnected(); + + String invalidTopicName = "xx/ass/aa/aaa"; + + resetChannel(); + setChannelConnected(); + + + channel.writeInbound(Commands.newLookup(invalidTopicName, true, 1)); + Object obj = getResponse(); + assertEquals(obj.getClass(), CommandLookupTopicResponse.class); + CommandLookupTopicResponse res = (CommandLookupTopicResponse) obj; + assertEquals(res.getError(), ServerError.InvalidTopicName); + + channel.finish(); + } + + @Test(timeOut = 30000) + public void testInvalidTopicOnProducer() throws Exception { + resetChannel(); + setChannelConnected(); + + String invalidTopicName = "xx/ass/aa/aaa"; + + resetChannel(); + setChannelConnected(); + + ByteBuf clientCommand = Commands.newProducer(invalidTopicName, 1 /* producer id */, 1 /* request id */, + "prod-name", Collections.emptyMap()); + channel.writeInbound(clientCommand); + Object obj = getResponse(); + assertEquals(obj.getClass(), CommandError.class); + CommandError res = (CommandError) obj; + assertEquals(res.getError(), ServerError.InvalidTopicName); + + channel.finish(); + } + + @Test(timeOut = 30000) + public void testInvalidTopicOnSubscribe() throws Exception { + resetChannel(); + setChannelConnected(); + + String invalidTopicName = "xx/ass/aa/aaa"; + + resetChannel(); + setChannelConnected(); + + channel.writeInbound(Commands.newSubscribe(invalidTopicName, "test-subscription", 1, 1, SubType.Exclusive, 0, + "consumerName")); + Object obj = getResponse(); + assertEquals(obj.getClass(), CommandError.class); + CommandError res = (CommandError) obj; + assertEquals(res.getError(), ServerError.InvalidTopicName); + + channel.finish(); + } + private static final Logger log = LoggerFactory.getLogger(ServerCnxTest.class); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java index 717cd4a..3699f78 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java @@ -28,6 +28,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect; import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected; import org.apache.pulsar.common.api.proto.PulsarApi.CommandError; import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse; import org.apache.pulsar.common.api.proto.PulsarApi.CommandMessage; import org.apache.pulsar.common.api.proto.PulsarApi.CommandProducer; import org.apache.pulsar.common.api.proto.PulsarApi.CommandProducerSuccess; @@ -146,6 +147,11 @@ public class ClientChannelHelper { protected void handleProducerSuccess(CommandProducerSuccess success) { queue.offer(CommandProducerSuccess.newBuilder(success).build()); } + + @Override + protected void handleLookupResponse(CommandLookupTopicResponse connection) { + queue.offer(CommandLookupTopicResponse.newBuilder(connection).build()); + } }; } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java index 98c851b..16a3cf0 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java @@ -71,6 +71,7 @@ public final class PulsarApi { TooManyRequests(14, 14), TopicTerminatedError(15, 15), ProducerBusy(16, 16), + InvalidTopicName(17, 17), ; public static final int UnknownError_VALUE = 0; @@ -90,6 +91,7 @@ public final class PulsarApi { public static final int TooManyRequests_VALUE = 14; public static final int TopicTerminatedError_VALUE = 15; public static final int ProducerBusy_VALUE = 16; + public static final int InvalidTopicName_VALUE = 17; public final int getNumber() { return value; } @@ -113,6 +115,7 @@ public final class PulsarApi { case 14: return TooManyRequests; case 15: return TopicTerminatedError; case 16: return ProducerBusy; + case 17: return InvalidTopicName; default: return null; } } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 2fac3de..3b87273 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -111,6 +111,7 @@ enum ServerError { TopicTerminatedError = 15; // The topic has been terminated ProducerBusy = 16; // Producer with same name is already connected + InvalidTopicName = 17; // The topic name is not valid } enum AuthMethod { -- To stop receiving notification emails like this one, please contact mme...@apache.org.