This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new baf155f [ClientAPI]Fix hasMessageAvailable() (#6362) baf155f is described below commit baf155f4a41de959145841e4cd922e89363e9292 Author: Yijie Shen <henry.yijies...@gmail.com> AuthorDate: Tue Mar 3 15:04:49 2020 +0800 [ClientAPI]Fix hasMessageAvailable() (#6362) Fixes #6333 Previously, `hasMoreMessages` was tested against: ``` return lastMessageIdInBroker.compareTo(lastDequeuedMessage) == 0 && incomingMessages.size() > 0; ``` However, `incomingMessages` could be empty when the consumer/reader has just started and hasn't received any messages yet. In this PR, the last entry is retrieved and decoded to get its message metadata, which is used to populate the batchIndex field. --- .../apache/pulsar/broker/service/ServerCnx.java | 86 ++++++++++++++++--- .../pulsar/compaction/TwoPhaseCompactor.java | 6 +- .../apache/pulsar/client/api/TopicReaderTest.java | 75 ++++++++++++++++- .../org/apache/pulsar/client/impl/ReaderTest.java | 4 +- .../apache/pulsar/client/impl/ConsumerImpl.java | 98 ++++++++++++++++------ .../pulsar/client/impl/ZeroQueueConsumerImpl.java | 2 +- .../pulsar/common/api/raw/RawMessageImpl.java | 3 + 7 files changed, 233 insertions(+), 41 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index bef63fc..4497c97 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -46,7 +46,11 @@ import java.util.stream.Collectors; import javax.naming.AuthenticationException; import javax.net.ssl.SSLSession; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; 
import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.util.SafeRun; import org.apache.commons.lang3.StringUtils; @@ -59,6 +63,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyExcep import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicNotFoundException; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; import org.apache.pulsar.broker.service.schema.SchemaRegistryService; import org.apache.pulsar.broker.web.RestException; @@ -1396,22 +1401,83 @@ public class ServerCnx extends PulsarHandler { Topic topic = consumer.getSubscription().getTopic(); Position position = topic.getLastMessageId(); int partitionIndex = TopicName.getPartitionIndex(topic.getName()); - if (log.isDebugEnabled()) { - log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress, - topic.getName(), consumer.getSubscription().getName(), position, partitionIndex); - } - MessageIdData messageId = MessageIdData.newBuilder() - .setLedgerId(((PositionImpl)position).getLedgerId()) - .setEntryId(((PositionImpl)position).getEntryId()) - .setPartition(partitionIndex) - .build(); - ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId)); + getLargestBatchIndexWhenPossible( + topic, + (PositionImpl) position, + partitionIndex, + requestId, + consumer.getSubscription().getName()); + } else { ctx.writeAndFlush(Commands.newError(getLastMessageId.getRequestId(), ServerError.MetadataError, "Consumer not found")); } } + private void getLargestBatchIndexWhenPossible( + Topic topic, + PositionImpl position, + int partitionIndex, + long requestId, + String subscriptionName) { + + PersistentTopic persistentTopic = 
(PersistentTopic) topic; + ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); + + // If it's not pointing to a valid entry, respond messageId of the current position. + if (position.getEntryId() == -1) { + MessageIdData messageId = MessageIdData.newBuilder() + .setLedgerId(position.getLedgerId()) + .setEntryId(position.getEntryId()) + .setPartition(partitionIndex).build(); + + ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId)); + } + + // For a valid position, we read the entry out and parse the batch size from its metadata. + CompletableFuture<Entry> entryFuture = new CompletableFuture<>(); + ml.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() { + @Override + public void readEntryComplete(Entry entry, Object ctx) { + entryFuture.complete(entry); + } + + @Override + public void readEntryFailed(ManagedLedgerException exception, Object ctx) { + entryFuture.completeExceptionally(exception); + } + }, null); + + CompletableFuture<Integer> batchSizeFuture = entryFuture.thenApply(entry -> { + MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); + int batchSize = metadata.getNumMessagesInBatch(); + entry.release(); + return batchSize; + }); + + batchSizeFuture.whenComplete((batchSize, e) -> { + if (e != null) { + ctx.writeAndFlush(Commands.newError( + requestId, ServerError.MetadataError, "Failed to get batch size for entry " + e.getMessage())); + } else { + int largestBatchIndex = batchSize > 1 ? 
batchSize - 1 : -1; + + if (log.isDebugEnabled()) { + log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress, + topic.getName(), subscriptionName, position, partitionIndex); + } + + MessageIdData messageId = MessageIdData.newBuilder() + .setLedgerId(position.getLedgerId()) + .setEntryId(position.getEntryId()) + .setPartition(partitionIndex) + .setBatchIndex(largestBatchIndex).build(); + + ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId)); + } + }); + } + @Override protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) { final long requestId = commandGetTopicsOfNamespace.getRequestId(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java index 06afe93..95f6f1a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java @@ -43,6 +43,7 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.RawMessage; import org.apache.pulsar.client.api.RawReader; +import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.RawBatchConverter; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; @@ -95,7 +96,10 @@ public class TwoPhaseCompactor extends Compactor { } else { log.info("Commencing phase one of compaction for {}, reading to {}", reader.getTopic(), lastMessageId); - phaseOneLoop(reader, Optional.empty(), Optional.empty(), lastMessageId, latestForKey, + // Each entry is processed as a whole, discard the batchIndex part deliberately. 
+ MessageIdImpl lastImpl = (MessageIdImpl) lastMessageId; + MessageIdImpl lastEntryMessageId = new MessageIdImpl(lastImpl.getLedgerId(), lastImpl.getEntryId(), lastImpl.getPartitionIndex()); + phaseOneLoop(reader, Optional.empty(), Optional.empty(), lastEntryMessageId, latestForKey, loopPromise); } }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java index 7eda446..552f69d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java @@ -91,6 +91,17 @@ public class TopicReaderTest extends ProducerConsumerBase { }; } + @DataProvider + public static Object[][] variationsForHasMessageAvailable() { + return new Object[][] { + // batching / start-inclusive + {true, true}, + {true, false}, + {false, true}, + {false, false}, + }; + } + @Test public void testSimpleReader() throws Exception { Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/my-ns/testSimpleReader") @@ -531,6 +542,68 @@ public class TopicReaderTest extends ProducerConsumerBase { } + @Test(dataProvider = "variationsForHasMessageAvailable") + public void testHasMessageAvailable(boolean enableBatch, boolean startInclusive) throws Exception { + final String topicName = "persistent://my-property/my-ns/HasMessageAvailable"; + final int numOfMessage = 100; + + ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer() + .topic(topicName); + + if (enableBatch) { + producerBuilder + .enableBatching(true) + .batchingMaxMessages(10); + } else { + producerBuilder + .enableBatching(false); + } + + Producer<byte[]> producer = producerBuilder.create(); + + CountDownLatch latch = new CountDownLatch(numOfMessage); + + List<MessageId> allIds = Collections.synchronizedList(new ArrayList<>()); + + for (int i = 0; i < numOfMessage; i++) { + 
producer.sendAsync(String.format("msg num %d", i).getBytes()).whenComplete((mid, e) -> { + if (e != null) { + Assert.fail(); + } else { + allIds.add(mid); + } + latch.countDown(); + }); + } + + latch.await(); + + allIds.sort(null); // make sure the largest mid appears at last. + + for (MessageId id : allIds) { + Reader<byte[]> reader; + + if (startInclusive) { + reader = pulsarClient.newReader().topic(topicName) + .startMessageId(id).startMessageIdInclusive().create(); + } else { + reader = pulsarClient.newReader().topic(topicName) + .startMessageId(id).create(); + } + + if (startInclusive) { + assertTrue(reader.hasMessageAvailable()); + } else if (id != allIds.get(allIds.size() - 1)) { + assertTrue(reader.hasMessageAvailable()); + } else { + assertFalse(reader.hasMessageAvailable()); + } + reader.close(); + } + + producer.close(); + } + @Test public void testReaderNonDurableIsAbleToSeekRelativeTime() throws Exception { final int numOfMessage = 10; @@ -794,7 +867,7 @@ public class TopicReaderTest extends ProducerConsumerBase { .batchingMaxMessages(10) .create(); - CountDownLatch latch = new CountDownLatch(100); + CountDownLatch latch = new CountDownLatch(numOfMessage); List<MessageId> allIds = Collections.synchronizedList(new ArrayList<>()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java index 5f97a58..b75cfce 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java @@ -145,7 +145,9 @@ public class ReaderTest extends MockedPulsarServiceBaseTest { while (reader.hasMessageAvailable()) { Assert.assertTrue(keys.remove(reader.readNext().getKey())); } - Assert.assertTrue(keys.isEmpty()); + // start from latest with start message inclusive should only read the last message in batch + Assert.assertTrue(keys.size() == 9); + 
Assert.assertFalse(keys.contains("key9")); Assert.assertFalse(reader.hasMessageAvailable()); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 9431496..ce4ee1e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -104,7 +104,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle @SuppressWarnings("unused") private volatile int availablePermits = 0; - protected volatile MessageId lastDequeuedMessage = MessageId.earliest; + protected volatile MessageId lastDequeuedMessageId = MessageId.earliest; private volatile MessageId lastMessageIdInBroker = MessageId.earliest; private long subscribeTimeout; @@ -188,7 +188,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle this.consumerId = client.newConsumerId(); this.subscriptionMode = conf.getSubscriptionMode(); this.startMessageId = startMessageId != null ? new BatchMessageIdImpl((MessageIdImpl) startMessageId) : null; - this.lastDequeuedMessage = startMessageId == null ? 
MessageId.earliest : startMessageId; this.initialStartMessageId = this.startMessageId; this.startMessageRollbackDurationInSec = startMessageRollbackDurationInSec; AVAILABLE_PERMITS_UPDATER.set(this, 0); @@ -677,7 +676,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle if (duringSeek.compareAndSet(true, false)) { return seekMessageId; } else if (subscriptionMode == SubscriptionMode.Durable) { - return null; + return startMessageId; } if (!currentMessageQueue.isEmpty()) { @@ -695,10 +694,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle } return previousMessage; - } else if (!lastDequeuedMessage.equals(MessageId.earliest)) { + } else if (!lastDequeuedMessageId.equals(MessageId.earliest)) { // If the queue was empty we need to restart from the message just after the last one that has been dequeued // in the past - return new BatchMessageIdImpl((MessageIdImpl) lastDequeuedMessage); + return new BatchMessageIdImpl((MessageIdImpl) lastDequeuedMessageId); } else { // No message was received or dequeued by this consumer. Next message would still be the startMessageId return startMessageId; @@ -1118,7 +1117,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle protected synchronized void messageProcessed(Message<?> msg) { ClientCnx currentCnx = cnx(); ClientCnx msgCnx = ((MessageImpl<?>) msg).getCnx(); - lastDequeuedMessage = msg.getMessageId(); + lastDequeuedMessageId = msg.getMessageId(); if (msgCnx != currentCnx) { // The processed message did belong to the old queue that was cleared after reconnection. 
@@ -1493,6 +1492,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle seekMessageId = new BatchMessageIdImpl((MessageIdImpl) MessageId.earliest); duringSeek.set(true); + lastDequeuedMessageId = MessageId.earliest; incomingMessages.clear(); INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0); @@ -1539,6 +1539,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle seekMessageId = new BatchMessageIdImpl((MessageIdImpl) messageId); duringSeek.set(true); + lastDequeuedMessageId = MessageId.earliest; incomingMessages.clear(); INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0); @@ -1555,17 +1556,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle } public boolean hasMessageAvailable() throws PulsarClientException { - // we need to seek to the last position then the last message can be received when the resetIncludeHead - // specified. - if (lastDequeuedMessage == MessageId.latest && resetIncludeHead) { - lastDequeuedMessage = getLastMessageId(); - seek(lastDequeuedMessage); - } try { - if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessage)) { - return true; - } - return hasMessageAvailableAsync().get(); } catch (Exception e) { throw PulsarClientException.unwrap(e); @@ -1575,12 +1566,56 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle public CompletableFuture<Boolean> hasMessageAvailableAsync() { final CompletableFuture<Boolean> booleanFuture = new CompletableFuture<>(); - if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessage)) { - booleanFuture.complete(true); + // we haven't read yet. use startMessageId for comparison + if (lastDequeuedMessageId == MessageId.earliest) { + // if we are starting from latest, we should seek to the actual last message first. + // allow the last one to be read when read head inclusively. 
+ if (startMessageId.getLedgerId() == Long.MAX_VALUE && + startMessageId.getEntryId() == Long.MAX_VALUE && + startMessageId.partitionIndex == -1) { + + getLastMessageIdAsync() + .thenCompose(this::seekAsync) + .whenComplete((ignore, e) -> { + if (e != null) { + log.error("[{}][{}] Failed getLastMessageId command", topic, subscription); + booleanFuture.completeExceptionally(e.getCause()); + } else { + booleanFuture.complete(resetIncludeHead); + } + }); + + return booleanFuture; + } + + if (hasMoreMessages(lastMessageIdInBroker, startMessageId, resetIncludeHead)) { + booleanFuture.complete(true); + return booleanFuture; + } + + getLastMessageIdAsync().thenAccept(messageId -> { + lastMessageIdInBroker = messageId; + if (hasMoreMessages(lastMessageIdInBroker, startMessageId, resetIncludeHead)) { + booleanFuture.complete(true); + } else { + booleanFuture.complete(false); + } + }).exceptionally(e -> { + log.error("[{}][{}] Failed getLastMessageId command", topic, subscription); + booleanFuture.completeExceptionally(e.getCause()); + return null; + }); + } else { + // read before, use lastDequeueMessage for comparison + if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessageId, false)) { + booleanFuture.complete(true); + return booleanFuture; + } + getLastMessageIdAsync().thenAccept(messageId -> { lastMessageIdInBroker = messageId; - if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessage)) { + if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessageId, false)) { booleanFuture.complete(true); } else { booleanFuture.complete(false); @@ -1591,18 +1626,22 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle return null; }); } + return booleanFuture; } - private boolean hasMoreMessages(MessageId lastMessageIdInBroker, MessageId lastDequeuedMessage) { - if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 && + private boolean hasMoreMessages(MessageId lastMessageIdInBroker, MessageId messageId, boolean inclusive) { + if 
(inclusive && lastMessageIdInBroker.compareTo(messageId) >= 0 && ((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { return true; - } else { - // Make sure batching message can be read completely. - return lastMessageIdInBroker.compareTo(lastDequeuedMessage) == 0 - && incomingMessages.size() > 0; } + + if (!inclusive && lastMessageIdInBroker.compareTo(messageId) > 0 && + ((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { + return true; + } + + return false; } @Override @@ -1647,8 +1686,13 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle cnx.sendGetLastMessageId(getLastIdCmd, requestId).thenAccept((result) -> { log.info("[{}][{}] Successfully getLastMessageId {}:{}", topic, subscription, result.getLedgerId(), result.getEntryId()); - future.complete(new MessageIdImpl(result.getLedgerId(), - result.getEntryId(), result.getPartition())); + if (result.getBatchIndex() < 0) { + future.complete(new MessageIdImpl(result.getLedgerId(), + result.getEntryId(), result.getPartition())); + } else { + future.complete(new BatchMessageIdImpl(result.getLedgerId(), + result.getEntryId(), result.getPartition(), result.getBatchIndex())); + } }).exceptionally(e -> { log.error("[{}][{}] Failed getLastMessageId command", topic, subscription); future.completeExceptionally( diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java index 94c8dd3..a0de070 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java @@ -98,7 +98,7 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> { } do { message = incomingMessages.take(); - lastDequeuedMessage = message.getMessageId(); + lastDequeuedMessageId = message.getMessageId(); ClientCnx msgCnx = ((MessageImpl<?>) message).getCnx(); // 
synchronized need to prevent race between connectionOpened and the check "msgCnx == cnx()" synchronized (this) { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java index 335bf96..371f47f 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java @@ -162,4 +162,7 @@ public class RawMessageImpl implements RawMessage { return msgMetadata.get().getPartitionKeyB64Encoded(); } + public int getBatchSize() { + return msgMetadata.get().getNumMessagesInBatch(); + } }