merlimat closed pull request #1434: Move config subscriptionInitialPosition from parameter into ConfigurationData. URL: https://github.com/apache/incubator-pulsar/pull/1434
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java index 3dc4e5612..61a1fc267 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java @@ -31,7 +31,6 @@ import org.apache.pulsar.client.api.RawMessage; import org.apache.pulsar.client.api.RawReader; import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; @@ -103,7 +102,7 @@ public String toString() { RawConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData conf, CompletableFuture<Consumer<byte[]>> consumerFuture) { super(client, conf.getSingleTopic(), conf, client.externalExecutorProvider().getExecutor(), -1, - consumerFuture, SubscriptionMode.Durable, MessageId.earliest, Schema.IDENTITY, SubscriptionInitialPosition.Earliest); + consumerFuture, SubscriptionMode.Durable, MessageId.earliest, Schema.IDENTITY); incomingRawMessages = new GrowableArrayBlockingQueue<>(); pendingRawReceives = new ConcurrentLinkedQueue<>(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 2cc4cf93c..f378d1647 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -2333,4 +2333,45 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe log.info("-- Exiting {} test --", methodName); } + @Test + public void testConsumerSubscriptionInitialize() throws Exception { + log.info("-- Starting {} test --", methodName); + String topicName = "persistent://my-property/use/my-ns/test-subscription-initialize-topic"; + + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .create(); + + // 1, produce 5 messages + for (int i = 0; i < 5; i++) { + final String message = "my-message-" + i; + producer.send(message.getBytes()); + } + + // 2, create consumer + Consumer<byte[]> defaultConsumer = pulsarClient.newConsumer().topic(topicName) + .subscriptionName("test-subscription-default").subscribe(); + Consumer<byte[]> latestConsumer = pulsarClient.newConsumer().topic(topicName) + .subscriptionName("test-subscription-latest").subscriptionInitialPosition(SubscriptionInitialPosition.Latest).subscribe(); + Consumer<byte[]> earliestConsumer = pulsarClient.newConsumer().topic(topicName) + .subscriptionName("test-subscription-earliest").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe(); + + // 3, produce 5 messages more + for (int i = 5; i < 10; i++) { + final String message = "my-message-" + i; + producer.send(message.getBytes()); + } + + // 4, verify consumer get right message. 
+ assertEquals(defaultConsumer.receive().getData(), "my-message-5".getBytes()); + assertEquals(latestConsumer.receive().getData(), "my-message-5".getBytes()); + assertEquals(earliestConsumer.receive().getData(), "my-message-0".getBytes()); + + defaultConsumer.close(); + latestConsumer.close(); + earliestConsumer.close(); + + log.info("-- Exiting {} test --", methodName); + } + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/SubscriptionInitialPosition.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/SubscriptionInitialPosition.java index 05deee3b0..e28953f65 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/SubscriptionInitialPosition.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/SubscriptionInitialPosition.java @@ -27,10 +27,21 @@ /** * the latest position which means the start consuming position will be the last message */ - Latest, + Latest(0), /** * the earliest position which means the start consuming position will be the first message */ - Earliest, + Earliest(1), + ; + + + private final int value; + + SubscriptionInitialPosition(int value) { + this.value = value; + } + + public final int getValue() { return value; } + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 067a67770..46709d20a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -63,6 +63,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.ValidationError; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType; import 
org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData; import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; @@ -138,12 +139,12 @@ ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf, ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema) { - this(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, SubscriptionMode.Durable, null, schema, SubscriptionInitialPosition.Latest); + this(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, SubscriptionMode.Durable, null, schema); } ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf, ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture, - SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, SubscriptionInitialPosition subscriptionInitialPosition) { + SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema) { super(client, topic, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture, schema); this.consumerId = client.newConsumerId(); this.subscriptionMode = subscriptionMode; @@ -581,7 +582,7 @@ public void connectionOpened(final ClientCnx cnx) { } ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(), priorityLevel, - consumerName, isDurable, startMessageIdData, metadata, readCompacted); + consumerName, isDurable, startMessageIdData, metadata, readCompacted, InitialPosition.valueOf(subscriptionInitialPosition.getValue())); if (startMessageIdData != null) { startMessageIdData.recycle(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java index 881a69c26..ed374f663 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java +++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java @@ -32,7 +32,6 @@ import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.ReaderListener; import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; @@ -83,7 +82,7 @@ public void reachedEndOfTopic(Consumer<T> consumer) { } consumer = new ConsumerImpl<>(client, readerConfiguration.getTopicName(), consumerConfiguration, listenerExecutor, - -1, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema, SubscriptionInitialPosition.Latest); + -1, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema); } @Override diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java index 59d678077..dcea2b2c8 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java @@ -66,6 +66,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandSendError; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSendReceipt; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSuccess; import org.apache.pulsar.common.api.proto.PulsarApi.CommandUnsubscribe; @@ -301,12 +302,12 @@ public static ByteBufPair newSend(long producerId, long sequenceId, int numMessa public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, 
long requestId, SubType subType, int priorityLevel, String consumerName) { return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName, - true /* isDurable */, null /* startMessageId */, Collections.emptyMap(), false); + true /* isDurable */, null /* startMessageId */, Collections.emptyMap(), false, InitialPosition.Earliest); } public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId, SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId, - Map<String, String> metadata, boolean readCompacted) { + Map<String, String> metadata, boolean readCompacted, InitialPosition subscriptionInitialPosition) { CommandSubscribe.Builder subscribeBuilder = CommandSubscribe.newBuilder(); subscribeBuilder.setTopic(topic); subscribeBuilder.setSubscription(subscription); @@ -317,6 +318,7 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu subscribeBuilder.setPriorityLevel(priorityLevel); subscribeBuilder.setDurable(isDurable); subscribeBuilder.setReadCompacted(readCompacted); + subscribeBuilder.setInitialPosition(subscriptionInitialPosition); if (startMessageId != null) { subscribeBuilder.setStartMessageId(startMessageId); } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org. With regards, Apache Git Services