merlimat closed pull request #1608: "Fixed typo in ConsumerBuilder acknowledgmentGroupTime option". URL: https://github.com/apache/incubator-pulsar/pull/1608
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 0afe36d3a..675bd1ccc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -1421,7 +1421,7 @@ public void persistentTopicsCursorReset(String topicName) throws Exception { // create consumer and subscription Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub") - .subscriptionType(SubscriptionType.Exclusive).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .subscriptionType(SubscriptionType.Exclusive).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); assertEquals(admin.persistentTopics().getSubscriptions(topicName), Lists.newArrayList("my-sub")); @@ -1472,7 +1472,7 @@ public void persistentTopicsCursorResetAfterReset(String topicName) throws Excep // create consumer and subscription Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub") - .subscriptionType(SubscriptionType.Exclusive).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .subscriptionType(SubscriptionType.Exclusive).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); assertEquals(admin.persistentTopics().getSubscriptions(topicName), Lists.newArrayList("my-sub")); @@ -1543,7 +1543,7 @@ public void partitionedTopicsCursorReset(String topicName) throws Exception { // create consumer and subscription Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub") - 
.subscriptionType(SubscriptionType.Exclusive).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .subscriptionType(SubscriptionType.Exclusive).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); List<String> topics = admin.persistentTopics().getList("prop-xyz/ns1"); assertEquals(topics.size(), 4); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java index c7a26ffa3..2ca1e0869 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java @@ -1441,7 +1441,7 @@ public void persistentTopicsCursorReset(String topicName) throws Exception { // create consumer and subscription Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub") - .subscriptionType(SubscriptionType.Exclusive).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .subscriptionType(SubscriptionType.Exclusive).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); assertEquals(admin.persistentTopics().getSubscriptions(topicName), Lists.newArrayList("my-sub")); @@ -1492,7 +1492,7 @@ public void persistentTopicsCursorResetAfterReset(String topicName) throws Excep // create consumer and subscription Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub") - .subscriptionType(SubscriptionType.Exclusive).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .subscriptionType(SubscriptionType.Exclusive).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); assertEquals(admin.persistentTopics().getSubscriptions(topicName), Lists.newArrayList("my-sub")); @@ -1563,7 +1563,7 @@ public void partitionedTopicsCursorReset(String topicName) throws Exception { // create consumer and subscription Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub") - .subscriptionType(SubscriptionType.Exclusive).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .subscriptionType(SubscriptionType.Exclusive).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); List<String> topics = admin.persistentTopics().getList("prop-xyz/use/ns1"); assertEquals(topics.size(), 4); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index ee836c306..0918bc7c9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -142,7 +142,7 @@ public void testBrokerServicePersistentTopicStats() throws Exception { SubscriptionStats subStats; Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName) - .acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); @@ -219,7 +219,7 @@ public void testBrokerServicePersistentRedeliverTopicStats() throws Exception { SubscriptionStats subStats; Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName) - .subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java index bc5c210b2..e702f6e03 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java @@ -136,12 +136,12 @@ public void testSimpleConsumerEventsWithoutPartition() throws Exception { TestConsumerStateEventListener listener1 = new TestConsumerStateEventListener(); TestConsumerStateEventListener listener2 = new TestConsumerStateEventListener(); ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName) - .acknowledmentGroupTime(0, TimeUnit.SECONDS).subscriptionType(SubscriptionType.Failover); + .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscriptionType(SubscriptionType.Failover); // 1. two consumers on the same subscription ConsumerBuilder<byte[]> consumerBulder1 = consumerBuilder.clone().consumerName("1") - .consumerEventListener(listener1).acknowledmentGroupTime(0, TimeUnit.SECONDS); + .consumerEventListener(listener1).acknowledgmentGroupTime(0, TimeUnit.SECONDS); Consumer<byte[]> consumer1 = consumerBulder1.subscribe(); Consumer<byte[]> consumer2 = consumerBuilder.clone().consumerName("2").consumerEventListener(listener2) .subscribe(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java index 4b659f45d..fce508bd8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java @@ -508,7 +508,7 @@ public void testUnackedCountWithRedeliveries() throws Exception { ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName) 
.receiverQueueSize(10).subscriptionType(SubscriptionType.Shared) - .acknowledmentGroupTime(0, TimeUnit.SECONDS); + .acknowledgmentGroupTime(0, TimeUnit.SECONDS); ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) consumerBuilder.subscribe(); for (int i = 0; i < numMsgs; i++) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java index 0c28dffcd..0acafadd9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java @@ -256,7 +256,7 @@ public void testFailoverSingleAckedNormalTopic() throws Exception { // 2. Create consumer ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName) .subscriptionName(subscriptionName).receiverQueueSize(10).subscriptionType(SubscriptionType.Failover) - .acknowledmentGroupTime(0, TimeUnit.SECONDS); + .acknowledgmentGroupTime(0, TimeUnit.SECONDS); Consumer<byte[]> consumer1 = consumerBuilder.clone().consumerName("consumer-1").subscribe(); Consumer<byte[]> consumer2 = consumerBuilder.clone().consumerName("consumer-2").subscribe(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java index aba0e1b4d..7dc315dc9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java @@ -597,7 +597,7 @@ public void testBrokerSubscriptionRecovery(boolean unloadBundleGracefully) throw final int totalProducedMsgs = 500; Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriberName) - .subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, 
TimeUnit.SECONDS).subscribe(); + .subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/unacked-topic") .enableBatching(false) @@ -703,15 +703,15 @@ public void testBlockBrokerDispatching() throws Exception { ConsumerImpl<byte[]> consumer1Sub1 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName) .subscriptionName(subscriberName1).receiverQueueSize(receiverQueueSize) - .subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); // create subscription-2 and 3 ConsumerImpl<byte[]> consumer1Sub2 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName) .subscriptionName(subscriberName2).receiverQueueSize(receiverQueueSize) - .subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); consumer1Sub2.close(); ConsumerImpl<byte[]> consumer1Sub3 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName) .subscriptionName(subscriberName3).receiverQueueSize(receiverQueueSize) - .subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); consumer1Sub3.close(); Producer<byte[]> producer = pulsarClient.newProducer() @@ -751,7 +751,7 @@ public void testBlockBrokerDispatching() throws Exception { // (1.b) consumer2 with same sub should not receive any more messages as subscription is blocked ConsumerImpl<byte[]> consumer2Sub1 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName) .subscriptionName(subscriberName1).receiverQueueSize(receiverQueueSize) - 
.subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); int consumer2Msgs = 0; for (int j = 0; j < totalProducedMsgs; j++) { msg = consumer2Sub1.receive(100, TimeUnit.MILLISECONDS); @@ -776,7 +776,7 @@ public void testBlockBrokerDispatching() throws Exception { **/ ConsumerImpl<byte[]> consumerSub2 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName) .subscriptionName(subscriberName2).receiverQueueSize(receiverQueueSize) - .subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); Set<MessageId> messages2 = Sets.newHashSet(); for (int j = 0; j < totalProducedMsgs; j++) { msg = consumerSub2.receive(100, TimeUnit.MILLISECONDS); @@ -793,7 +793,7 @@ public void testBlockBrokerDispatching() throws Exception { /** (3) if Subscription3 is acking then it shouldn't be blocked **/ consumer1Sub3 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName) .subscriptionName(subscriberName3).receiverQueueSize(receiverQueueSize) - .subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); int consumedMsgsSub3 = 0; for (int j = 0; j < totalProducedMsgs; j++) { msg = consumer1Sub3.receive(100, TimeUnit.MILLISECONDS); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index ad406ce2c..7ec9cf4d8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ 
-958,7 +958,7 @@ public void testSharedConsumerAckDifferentConsumer() throws Exception { ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer() .topic("persistent://my-property/my-ns/my-topic1").subscriptionName("my-subscriber-name") .receiverQueueSize(1).subscriptionType(SubscriptionType.Shared) - .acknowledmentGroupTime(0, TimeUnit.SECONDS); + .acknowledgmentGroupTime(0, TimeUnit.SECONDS); Consumer<byte[]> consumer1 = consumerBuilder.subscribe(); Consumer<byte[]> consumer2 = consumerBuilder.subscribe(); @@ -1143,7 +1143,7 @@ public void testConsumerBlockingWithUnAckedMessagesMultipleIteration() throws Ex Consumer<byte[]> consumer = pulsarClient.newConsumer() .topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1") .receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared) - .acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); Producer<byte[]> producer = pulsarClient.newProducer() .topic("persistent://my-property/my-ns/unacked-topic").create(); @@ -1307,7 +1307,7 @@ public void testShouldNotBlockConsumerIfRedeliverBeforeReceive() throws Exceptio ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer() .topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1") .receiverQueueSize(receiverQueueSize).ackTimeout(1, TimeUnit.SECONDS) - .subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); Producer<byte[]> producer = pulsarClient.newProducer() .topic("persistent://my-property/my-ns/unacked-topic") @@ -1885,7 +1885,7 @@ public void testSharedSamePriorityConsumer() throws Exception { ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer() .topic("persistent://my-property/my-ns/my-topic2").subscriptionName("my-subscriber-name") 
.subscriptionType(SubscriptionType.Shared).receiverQueueSize(queueSize) - .acknowledmentGroupTime(0, TimeUnit.SECONDS); + .acknowledgmentGroupTime(0, TimeUnit.SECONDS); Consumer<byte[]> c1 = consumerBuilder.subscribe(); Consumer<byte[]> c2 = consumerBuilder.subscribe(); Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic2") @@ -1988,7 +1988,7 @@ public void testRedeliveryFailOverConsumer() throws Exception { ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer() .topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1") .receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Failover) - .acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/unacked-topic") .create(); @@ -2284,7 +2284,7 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe Set<String> messageSet = Sets.newHashSet(); Consumer<byte[]> consumer = pulsarClient.newConsumer() .topic("persistent://my-property/use/myenc-ns/myenc-topic1").subscriptionName("my-subscriber-name") - .acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); // 1. 
Invalid key name try { @@ -2318,7 +2318,7 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe consumer.close(); consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/myenc-ns/myenc-topic1") .subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME) - .acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); int msgNum = 0; try { @@ -2339,7 +2339,7 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe // Set keyreader consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/myenc-ns/myenc-topic1") .subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.FAIL) - .cryptoKeyReader(new EncKeyReader()).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .cryptoKeyReader(new EncKeyReader()).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); for (int i = msgNum; i < totalMsg - 1; i++) { msg = consumer.receive(5, TimeUnit.SECONDS); @@ -2356,7 +2356,7 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe consumer.close(); consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/myenc-ns/myenc-topic1") .subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.DISCARD) - .acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); // Receive should proceed and discard encrypted messages msg = consumer.receive(5, TimeUnit.SECONDS); @@ -2382,12 +2382,12 @@ public void testConsumerSubscriptionInitialize() throws Exception { // 2, create consumer Consumer<byte[]> defaultConsumer = pulsarClient.newConsumer().topic(topicName) - .acknowledmentGroupTime(0, TimeUnit.SECONDS).subscriptionName("test-subscription-default").subscribe(); + .acknowledgmentGroupTime(0, 
TimeUnit.SECONDS).subscriptionName("test-subscription-default").subscribe(); Consumer<byte[]> latestConsumer = pulsarClient.newConsumer().topic(topicName) - .acknowledmentGroupTime(0, TimeUnit.SECONDS).subscriptionName("test-subscription-latest") + .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscriptionName("test-subscription-latest") .subscriptionInitialPosition(SubscriptionInitialPosition.Latest).subscribe(); Consumer<byte[]> earliestConsumer = pulsarClient.newConsumer().topic(topicName) - .acknowledmentGroupTime(0, TimeUnit.SECONDS).subscriptionName("test-subscription-earliest") + .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscriptionName("test-subscription-earliest") .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe(); // 3, produce 5 messages more diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java index ce371912e..d5dc26b5d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java @@ -105,7 +105,7 @@ public void testTopicInternalStats() throws Exception { final String topicName = "persistent://my-property/my-ns/my-topic1"; final String subscriptionName = "my-subscriber-name"; Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) - .acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); final int numberOfMsgs = 1000; for (int i = 0; i < numberOfMsgs; i++) { diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java index bb19b87b6..13eb0029e 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java @@ -49,7 +49,7 @@ } if (properties.containsKey(ACKNOWLEDGEMENTS_GROUP_TIME_MILLIS)) { - consumerBuilder.acknowledmentGroupTime( + consumerBuilder.acknowledgmentGroupTime( Long.parseLong(properties.getProperty(ACKNOWLEDGEMENTS_GROUP_TIME_MILLIS)), TimeUnit.MILLISECONDS); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index 87ae5f929..655a1a0fd 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -211,7 +211,7 @@ * @param unit * the time unit for the delay */ - ConsumerBuilder<T> acknowledmentGroupTime(long delay, TimeUnit unit); + ConsumerBuilder<T> acknowledgmentGroupTime(long delay, TimeUnit unit); /** * Set the max total receiver queue size across partitons. 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index ced47a57a..f024a4f7a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -183,7 +183,7 @@ private ConsumerBuilderImpl(PulsarClientImpl client, ConsumerConfigurationData<T } @Override - public ConsumerBuilder<T> acknowledmentGroupTime(long delay, TimeUnit unit) { + public ConsumerBuilder<T> acknowledgmentGroupTime(long delay, TimeUnit unit) { checkArgument(delay >= 0); conf.setAcknowledgementsGroupTimeMicros(unit.toMicros(delay)); return this; diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java index d4a7dc1c0..ab9686c36 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java @@ -252,7 +252,7 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer() // .messageListener(listener) // .receiverQueueSize(arguments.receiverQueueSize) // - .acknowledmentGroupTime(arguments.acknowledgmentsGroupingDelayMillis, TimeUnit.MILLISECONDS) // + .acknowledgmentGroupTime(arguments.acknowledgmentsGroupingDelayMillis, TimeUnit.MILLISECONDS) // .subscriptionType(arguments.subscriptionType); if (arguments.encKeyName != null) { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org. With regards, Apache Git Services