This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 9947f74 Fixed typo in ConsumerBuilder acknowledgmentGroupTime option (#1608) 9947f74 is described below commit 9947f748e01f6cd8596e9fb49e16c090c6fda769 Author: Matteo Merli <mme...@apache.org> AuthorDate: Mon Apr 23 13:35:20 2018 -0700 Fixed typo in ConsumerBuilder acknowledgmentGroupTime option (#1608) --- .../apache/pulsar/broker/admin/AdminApiTest.java | 6 +++--- .../pulsar/broker/admin/v1/V1_AdminApiTest.java | 6 +++--- .../pulsar/broker/service/BrokerServiceTest.java | 4 ++-- .../broker/service/PersistentFailoverE2ETest.java | 4 ++-- .../broker/service/PersistentQueueE2ETest.java | 2 +- .../pulsar/broker/service/ResendRequestTest.java | 2 +- .../client/api/DispatcherBlockConsumerTest.java | 14 ++++++------- .../client/api/SimpleProducerConsumerTest.java | 24 +++++++++++----------- .../stats/client/PulsarBrokerStatsClientTest.java | 2 +- .../kafka/compat/PulsarConsumerKafkaConfig.java | 2 +- .../apache/pulsar/client/api/ConsumerBuilder.java | 2 +- .../pulsar/client/impl/ConsumerBuilderImpl.java | 2 +- .../pulsar/testclient/PerformanceConsumer.java | 2 +- 13 files changed, 36 insertions(+), 36 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 0afe36d..675bd1c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -1421,7 +1421,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { // create consumer and subscription Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub") - .subscriptionType(SubscriptionType.Exclusive).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .subscriptionType(SubscriptionType.Exclusive).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); assertEquals(admin.persistentTopics().getSubscriptions(topicName), Lists.newArrayList("my-sub")); @@ -1472,7 +1472,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { // create consumer and subscription Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub") - .subscriptionType(SubscriptionType.Exclusive).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .subscriptionType(SubscriptionType.Exclusive).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); assertEquals(admin.persistentTopics().getSubscriptions(topicName), Lists.newArrayList("my-sub")); @@ -1543,7 +1543,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { // create consumer and subscription Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub") - .subscriptionType(SubscriptionType.Exclusive).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .subscriptionType(SubscriptionType.Exclusive).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); List<String> topics = admin.persistentTopics().getList("prop-xyz/ns1"); assertEquals(topics.size(), 4); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java index c7a26ff..2ca1e08 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java @@ -1441,7 +1441,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest { // create consumer and subscription Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub") - .subscriptionType(SubscriptionType.Exclusive).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .subscriptionType(SubscriptionType.Exclusive).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); assertEquals(admin.persistentTopics().getSubscriptions(topicName), Lists.newArrayList("my-sub")); @@ -1492,7 +1492,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest { // create consumer and subscription Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub") - .subscriptionType(SubscriptionType.Exclusive).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .subscriptionType(SubscriptionType.Exclusive).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); assertEquals(admin.persistentTopics().getSubscriptions(topicName), Lists.newArrayList("my-sub")); @@ -1563,7 +1563,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest { // create consumer and subscription Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub") - .subscriptionType(SubscriptionType.Exclusive).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .subscriptionType(SubscriptionType.Exclusive).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); List<String> topics = admin.persistentTopics().getList("prop-xyz/use/ns1"); assertEquals(topics.size(), 4); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index ee836c3..0918bc7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -142,7 +142,7 @@ public class BrokerServiceTest extends BrokerTestBase { SubscriptionStats subStats; Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName) - .acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); @@ -219,7 +219,7 @@ public class BrokerServiceTest extends BrokerTestBase { SubscriptionStats subStats; Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName) - .subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java index bc5c210..e702f6e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java @@ -136,12 +136,12 @@ public class PersistentFailoverE2ETest extends BrokerTestBase { TestConsumerStateEventListener listener1 = new TestConsumerStateEventListener(); TestConsumerStateEventListener listener2 = new TestConsumerStateEventListener(); ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName) - .acknowledmentGroupTime(0, TimeUnit.SECONDS).subscriptionType(SubscriptionType.Failover); + .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscriptionType(SubscriptionType.Failover); // 1. two consumers on the same subscription ConsumerBuilder<byte[]> consumerBulder1 = consumerBuilder.clone().consumerName("1") - .consumerEventListener(listener1).acknowledmentGroupTime(0, TimeUnit.SECONDS); + .consumerEventListener(listener1).acknowledgmentGroupTime(0, TimeUnit.SECONDS); Consumer<byte[]> consumer1 = consumerBulder1.subscribe(); Consumer<byte[]> consumer2 = consumerBuilder.clone().consumerName("2").consumerEventListener(listener2) .subscribe(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java index 4b659f4..fce508b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java @@ -508,7 +508,7 @@ public class PersistentQueueE2ETest extends BrokerTestBase { ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName) .receiverQueueSize(10).subscriptionType(SubscriptionType.Shared) - .acknowledmentGroupTime(0, TimeUnit.SECONDS); + .acknowledgmentGroupTime(0, TimeUnit.SECONDS); ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) consumerBuilder.subscribe(); for (int i = 0; i < numMsgs; i++) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java index 0c28dff..0acafad 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java @@ -256,7 +256,7 @@ public class ResendRequestTest extends BrokerTestBase { // 2. Create consumer ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName) .subscriptionName(subscriptionName).receiverQueueSize(10).subscriptionType(SubscriptionType.Failover) - .acknowledmentGroupTime(0, TimeUnit.SECONDS); + .acknowledgmentGroupTime(0, TimeUnit.SECONDS); Consumer<byte[]> consumer1 = consumerBuilder.clone().consumerName("consumer-1").subscribe(); Consumer<byte[]> consumer2 = consumerBuilder.clone().consumerName("consumer-2").subscribe(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java index aba0e1b..7dc315d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java @@ -597,7 +597,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase { final int totalProducedMsgs = 500; Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriberName) - .subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/unacked-topic") .enableBatching(false) @@ -703,15 +703,15 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase { ConsumerImpl<byte[]> consumer1Sub1 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName) .subscriptionName(subscriberName1).receiverQueueSize(receiverQueueSize) - .subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); // create subscription-2 and 3 ConsumerImpl<byte[]> consumer1Sub2 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName) .subscriptionName(subscriberName2).receiverQueueSize(receiverQueueSize) - .subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); consumer1Sub2.close(); ConsumerImpl<byte[]> consumer1Sub3 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName) .subscriptionName(subscriberName3).receiverQueueSize(receiverQueueSize) - .subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); consumer1Sub3.close(); Producer<byte[]> producer = pulsarClient.newProducer() @@ -751,7 +751,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase { // (1.b) consumer2 with same sub should not receive any more messages as subscription is blocked ConsumerImpl<byte[]> consumer2Sub1 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName) .subscriptionName(subscriberName1).receiverQueueSize(receiverQueueSize) - .subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); int consumer2Msgs = 0; for (int j = 0; j < totalProducedMsgs; j++) { msg = consumer2Sub1.receive(100, TimeUnit.MILLISECONDS); @@ -776,7 +776,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase { **/ ConsumerImpl<byte[]> consumerSub2 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName) .subscriptionName(subscriberName2).receiverQueueSize(receiverQueueSize) - .subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); Set<MessageId> messages2 = Sets.newHashSet(); for (int j = 0; j < totalProducedMsgs; j++) { msg = consumerSub2.receive(100, TimeUnit.MILLISECONDS); @@ -793,7 +793,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase { /** (3) if Subscription3 is acking then it shouldn't be blocked **/ consumer1Sub3 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName) .subscriptionName(subscriberName3).receiverQueueSize(receiverQueueSize) - .subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); int consumedMsgsSub3 = 0; for (int j = 0; j < totalProducedMsgs; j++) { msg = consumer1Sub3.receive(100, TimeUnit.MILLISECONDS); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index ad406ce..7ec9cf4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -958,7 +958,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer() .topic("persistent://my-property/my-ns/my-topic1").subscriptionName("my-subscriber-name") .receiverQueueSize(1).subscriptionType(SubscriptionType.Shared) - .acknowledmentGroupTime(0, TimeUnit.SECONDS); + .acknowledgmentGroupTime(0, TimeUnit.SECONDS); Consumer<byte[]> consumer1 = consumerBuilder.subscribe(); Consumer<byte[]> consumer2 = consumerBuilder.subscribe(); @@ -1143,7 +1143,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { Consumer<byte[]> consumer = pulsarClient.newConsumer() .topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1") .receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared) - .acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); Producer<byte[]> producer = pulsarClient.newProducer() .topic("persistent://my-property/my-ns/unacked-topic").create(); @@ -1307,7 +1307,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer() .topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1") .receiverQueueSize(receiverQueueSize).ackTimeout(1, TimeUnit.SECONDS) - .subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); Producer<byte[]> producer = pulsarClient.newProducer() .topic("persistent://my-property/my-ns/unacked-topic") @@ -1885,7 +1885,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer() .topic("persistent://my-property/my-ns/my-topic2").subscriptionName("my-subscriber-name") .subscriptionType(SubscriptionType.Shared).receiverQueueSize(queueSize) - .acknowledmentGroupTime(0, TimeUnit.SECONDS); + .acknowledgmentGroupTime(0, TimeUnit.SECONDS); Consumer<byte[]> c1 = consumerBuilder.subscribe(); Consumer<byte[]> c2 = consumerBuilder.subscribe(); Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic2") @@ -1988,7 +1988,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer() .topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1") .receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Failover) - .acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/unacked-topic") .create(); @@ -2284,7 +2284,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { Set<String> messageSet = Sets.newHashSet(); Consumer<byte[]> consumer = pulsarClient.newConsumer() .topic("persistent://my-property/use/myenc-ns/myenc-topic1").subscriptionName("my-subscriber-name") - .acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); // 1. Invalid key name try { @@ -2318,7 +2318,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { consumer.close(); consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/myenc-ns/myenc-topic1") .subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME) - .acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); int msgNum = 0; try { @@ -2339,7 +2339,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { // Set keyreader consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/myenc-ns/myenc-topic1") .subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.FAIL) - .cryptoKeyReader(new EncKeyReader()).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .cryptoKeyReader(new EncKeyReader()).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); for (int i = msgNum; i < totalMsg - 1; i++) { msg = consumer.receive(5, TimeUnit.SECONDS); @@ -2356,7 +2356,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { consumer.close(); consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/myenc-ns/myenc-topic1") .subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.DISCARD) - .acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); // Receive should proceed and discard encrypted messages msg = consumer.receive(5, TimeUnit.SECONDS); @@ -2382,12 +2382,12 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { // 2, create consumer Consumer<byte[]> defaultConsumer = pulsarClient.newConsumer().topic(topicName) - .acknowledmentGroupTime(0, TimeUnit.SECONDS).subscriptionName("test-subscription-default").subscribe(); + .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscriptionName("test-subscription-default").subscribe(); Consumer<byte[]> latestConsumer = pulsarClient.newConsumer().topic(topicName) - .acknowledmentGroupTime(0, TimeUnit.SECONDS).subscriptionName("test-subscription-latest") + .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscriptionName("test-subscription-latest") .subscriptionInitialPosition(SubscriptionInitialPosition.Latest).subscribe(); Consumer<byte[]> earliestConsumer = pulsarClient.newConsumer().topic(topicName) - .acknowledmentGroupTime(0, TimeUnit.SECONDS).subscriptionName("test-subscription-earliest") + .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscriptionName("test-subscription-earliest") .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe(); // 3, produce 5 messages more diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java index ce37191..d5dc26b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java @@ -105,7 +105,7 @@ public class PulsarBrokerStatsClientTest extends ProducerConsumerBase { final String topicName = "persistent://my-property/my-ns/my-topic1"; final String subscriptionName = "my-subscriber-name"; Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) - .acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); final int numberOfMsgs = 1000; for (int i = 0; i < numberOfMsgs; i++) { diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java index bb19b87..13eb002 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java @@ -49,7 +49,7 @@ public class PulsarConsumerKafkaConfig { } if (properties.containsKey(ACKNOWLEDGEMENTS_GROUP_TIME_MILLIS)) { - consumerBuilder.acknowledmentGroupTime( + consumerBuilder.acknowledgmentGroupTime( Long.parseLong(properties.getProperty(ACKNOWLEDGEMENTS_GROUP_TIME_MILLIS)), TimeUnit.MILLISECONDS); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index 87ae5f9..655a1a0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -211,7 +211,7 @@ public interface ConsumerBuilder<T> extends Serializable, Cloneable { * @param unit * the time unit for the delay */ - ConsumerBuilder<T> acknowledmentGroupTime(long delay, TimeUnit unit); + ConsumerBuilder<T> acknowledgmentGroupTime(long delay, TimeUnit unit); /** * Set the max total receiver queue size across partitons. diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index ced47a5..f024a4f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -183,7 +183,7 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> { } @Override - public ConsumerBuilder<T> acknowledmentGroupTime(long delay, TimeUnit unit) { + public ConsumerBuilder<T> acknowledgmentGroupTime(long delay, TimeUnit unit) { checkArgument(delay >= 0); conf.setAcknowledgementsGroupTimeMicros(unit.toMicros(delay)); return this; diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java index d4a7dc1..ab9686c 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java @@ -252,7 +252,7 @@ public class PerformanceConsumer { ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer() // .messageListener(listener) // .receiverQueueSize(arguments.receiverQueueSize) // - .acknowledmentGroupTime(arguments.acknowledgmentsGroupingDelayMillis, TimeUnit.MILLISECONDS) // + .acknowledgmentGroupTime(arguments.acknowledgmentsGroupingDelayMillis, TimeUnit.MILLISECONDS) // .subscriptionType(arguments.subscriptionType); if (arguments.encKeyName != null) { -- To stop receiving notification emails like this one, please contact mme...@apache.org.