This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 6e8b5bf Consolidate Pair implementations (#1515) 6e8b5bf is described below commit 6e8b5bf05ce0eb31795357879a1ee62297467480 Author: Dave Rusek <dave.ru...@gmail.com> AuthorDate: Fri Apr 6 12:09:14 2018 -0600 Consolidate Pair implementations (#1515) --- .../apache/bookkeeper/mledger/impl/EntryCache.java | 2 +- .../impl/EntryCacheDefaultEvictionPolicy.java | 6 +-- .../bookkeeper/mledger/impl/EntryCacheImpl.java | 14 ++--- .../bookkeeper/mledger/impl/EntryCacheManager.java | 4 +- .../mledger/impl/ManagedCursorContainer.java | 4 +- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 8 +-- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 16 +++--- .../mledger/impl/NonDurableCursorImpl.java | 8 +-- .../org/apache/bookkeeper/mledger/util/Pair.java | 59 ---------------------- .../apache/bookkeeper/mledger/util/RangeCache.java | 5 +- .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 16 +++--- .../apache/bookkeeper/mledger/util/PairTest.java | 50 ------------------ .../bookkeeper/mledger/util/RangeCacheTest.java | 21 ++++---- .../java/org/apache/zookeeper/MockZooKeeper.java | 40 +++++++-------- .../broker/loadbalance/impl/OverloadShedder.java | 12 ++--- .../impl/AcknowledgmentsGroupingTracker.java | 3 +- pulsar-common/pom.xml | 5 ++ .../org/apache/pulsar/common/api/Commands.java | 6 +-- .../pulsar/common/util/collections/Pair.java | 37 -------------- 19 files changed, 88 insertions(+), 228 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCache.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCache.java index 33a16cb..bb8f2a6 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCache.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCache.java @@ -21,7 +21,7 @@ package org.apache.bookkeeper.mledger.impl; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; -import org.apache.bookkeeper.mledger.util.Pair; +import org.apache.commons.lang3.tuple.Pair; /** * Cache of entries used by a single ManagedLedger. An EntryCache is compared to other EntryCache instances using their diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheDefaultEvictionPolicy.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheDefaultEvictionPolicy.java index fc9e790..682acbb 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheDefaultEvictionPolicy.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheDefaultEvictionPolicy.java @@ -23,7 +23,7 @@ import static java.util.Collections.reverseOrder; import com.google.common.collect.Lists; import java.util.List; -import org.apache.bookkeeper.mledger.util.Pair; +import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,8 +85,8 @@ public class EntryCacheDefaultEvictionPolicy implements EntryCacheEvictionPolicy } Pair<Integer, Long> evicted = entryCache.evictEntries(singleCacheSizeToFree); - evictedEntries += evicted.first; - evictedSize += evicted.second; + evictedEntries += evicted.getLeft(); + evictedSize += evicted.getRight(); } log.info("Completed cache eviction. Removed {} entries from {} caches. ({} Mb)", evictedEntries, diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java index 60ba634..37ddc54 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java @@ -35,9 +35,9 @@ import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; import org.apache.bookkeeper.mledger.ManagedLedgerException; -import org.apache.bookkeeper.mledger.util.Pair; import org.apache.bookkeeper.mledger.util.RangeCache; import org.apache.bookkeeper.mledger.util.RangeCache.Weighter; +import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -132,8 +132,8 @@ public class EntryCacheImpl implements EntryCache { final PositionImpl firstPosition = PositionImpl.get(-1, 0); Pair<Integer, Long> removed = entries.removeRange(firstPosition, lastPosition, true); - int entriesRemoved = removed.first; - long sizeRemoved = removed.second; + int entriesRemoved = removed.getLeft(); + long sizeRemoved = removed.getRight(); if (log.isDebugEnabled()) { log.debug("[{}] Invalidated entries up to {} - Entries removed: {} - Size removed: {}", ml.getName(), lastPosition, entriesRemoved, sizeRemoved); @@ -148,8 +148,8 @@ public class EntryCacheImpl implements EntryCache { final PositionImpl lastPosition = PositionImpl.get(ledgerId + 1, 0); Pair<Integer, Long> removed = entries.removeRange(firstPosition, lastPosition, false); - int entriesRemoved = removed.first; - long sizeRemoved = removed.second; + int entriesRemoved = removed.getLeft(); + long sizeRemoved = removed.getRight(); if (log.isDebugEnabled()) { log.debug("[{}] Invalidated all entries on ledger {} - Entries removed: {} - Size removed: {}", ml.getName(), ledgerId, entriesRemoved, sizeRemoved); @@ -299,8 +299,8 @@ public class EntryCacheImpl implements EntryCache { public Pair<Integer, Long> evictEntries(long sizeToFree) { checkArgument(sizeToFree > 0); Pair<Integer, Long> evicted = entries.evictLeastAccessedEntries(sizeToFree); - int evictedEntries = evicted.first; - long evictedSize = evicted.second; + int evictedEntries = evicted.getLeft(); + long evictedSize = evicted.getRight(); if (log.isDebugEnabled()) { log.debug( "[{}] Doing cache eviction of at least {} Mb -- Deleted {} entries - Total size deleted: {} Mb " diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java index 262cbeb..9f6f837 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java @@ -38,7 +38,7 @@ import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.Entry; -import org.apache.bookkeeper.mledger.util.Pair; +import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -186,7 +186,7 @@ public class EntryCacheManager { @Override public Pair<Integer, Long> evictEntries(long sizeToFree) { - return Pair.create(0, (long) 0); + return Pair.of(0, (long) 0); } @Override diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java index f7bdde4..1f8dade 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java @@ -29,7 +29,7 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.locks.StampedLock; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.Position; -import org.apache.bookkeeper.mledger.util.Pair; +import org.apache.commons.lang3.tuple.Pair; /** * Contains all the cursors for a ManagedLedger. @@ -135,7 +135,7 @@ class ManagedCursorContainer implements Iterable<ManagedCursor> { } PositionImpl newSlowestConsumer = heap.get(0).position; - return Pair.create(previousSlowestConsumer, newSlowestConsumer); + return Pair.of(previousSlowestConsumer, newSlowestConsumer); } finally { rwLock.unlockWrite(stamp); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index a2be71d..462428e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -81,7 +81,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo; -import org.apache.bookkeeper.mledger.util.Pair; +import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1228,12 +1228,12 @@ public class ManagedCursorImpl implements ManagedCursor { } void initializeCursorPosition(Pair<PositionImpl, Long> lastPositionCounter) { - readPosition = ledger.getNextValidPosition(lastPositionCounter.first); - markDeletePosition = lastPositionCounter.first; + readPosition = ledger.getNextValidPosition(lastPositionCounter.getLeft()); + markDeletePosition = lastPositionCounter.getLeft(); // Initialize the counter such that the difference between the messages written on the ML and the // messagesConsumed is 0, to ensure the initial backlog count is 0. - messagesConsumedCounter = lastPositionCounter.second; + messagesConsumedCounter = lastPositionCounter.getRight(); } /** diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index de77a44..c96b35e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -86,7 +86,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.Ledge import org.apache.bookkeeper.mledger.proto.MLDataFormats.NestedPositionInfo; import org.apache.bookkeeper.mledger.util.CallbackMutex; import org.apache.bookkeeper.mledger.util.Futures; -import org.apache.bookkeeper.mledger.util.Pair; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.api.Commands; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; @@ -1445,7 +1445,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { void discardEntriesFromCache(ManagedCursorImpl cursor, PositionImpl newPosition) { Pair<PositionImpl, PositionImpl> pair = activeCursors.cursorUpdated(cursor, newPosition); if (pair != null) { - entryCache.invalidateEntries(pair.second); + entryCache.invalidateEntries(pair.getRight()); } } @@ -1457,8 +1457,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { return; } - PositionImpl previousSlowestReader = pair.first; - PositionImpl currentSlowestReader = pair.second; + PositionImpl previousSlowestReader = pair.getLeft(); + PositionImpl currentSlowestReader = pair.getRight(); if (previousSlowestReader.compareTo(currentSlowestReader) == 0) { // The slowest consumer has not changed position. Nothing to do right now @@ -2041,7 +2041,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { // Ensure no entry was written while reading the two values } while (pos.compareTo(lastConfirmedEntry) != 0); - return Pair.create(pos, count); + return Pair.of(pos, count); } /** @@ -2055,9 +2055,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { do { pos = getFirstPosition(); lastPositionAndCounter = getLastPositionAndCounter(); - count = lastPositionAndCounter.second - getNumberOfEntries(Range.openClosed(pos, lastPositionAndCounter.first)); - } while (pos.compareTo(getFirstPosition()) != 0 || lastPositionAndCounter.first.compareTo(getLastPosition()) != 0); - return Pair.create(pos, count); + count = lastPositionAndCounter.getRight() - getNumberOfEntries(Range.openClosed(pos, lastPositionAndCounter.getLeft())); + } while (pos.compareTo(getFirstPosition()) != 0 || lastPositionAndCounter.getLeft().compareTo(getLastPosition()) != 0); + return Pair.of(pos, count); } public void activateCursor(ManagedCursor cursor) { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java index 6b6c369..43bab0f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java @@ -26,7 +26,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; -import org.apache.bookkeeper.mledger.util.Pair; +import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,9 +62,9 @@ public class NonDurableCursorImpl extends ManagedCursorImpl { // Initialize the counter such that the difference between the messages written on the ML and the // messagesConsumed is equal to the current backlog (negated). - long initialBacklog = readPosition.compareTo(lastEntryAndCounter.first) < 0 - ? ledger.getNumberOfEntries(Range.closed(readPosition, lastEntryAndCounter.first)) : 0; - messagesConsumedCounter = lastEntryAndCounter.second - initialBacklog; + long initialBacklog = readPosition.compareTo(lastEntryAndCounter.getLeft()) < 0 + ? ledger.getNumberOfEntries(Range.closed(readPosition, lastEntryAndCounter.getLeft())) : 0; + messagesConsumedCounter = lastEntryAndCounter.getRight() - initialBacklog; } @Override diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Pair.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Pair.java deleted file mode 100644 index 10787b0..0000000 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Pair.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.bookkeeper.mledger.util; - -import com.google.common.base.Objects; - -/** - * A generic container for two values. - * - * @param <FirstT> - * the first value type - * @param <SecondT> - * the second value type - */ -public class Pair<FirstT, SecondT> { - public final FirstT first; - public final SecondT second; - - public static <FirstT, SecondT> Pair<FirstT, SecondT> create(FirstT x, SecondT y) { - return new Pair<>(x, y); - } - - public Pair(FirstT first, SecondT second) { - this.first = first; - this.second = second; - } - - @Override - public String toString() { - return String.format("(%s,%s)", first, second); - } - - @Override - @SuppressWarnings("rawtypes") - public boolean equals(Object obj) { - if (obj instanceof Pair) { - Pair other = (Pair) obj; - return Objects.equal(first, other.first) && Objects.equal(second, other.second); - } - - return false; - } -} diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java index 16c2d1d..b9b3ace 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.lang3.tuple.Pair; /** * Special type of cache where get() and delete() operations can be done over a range of keys. @@ -144,7 +145,7 @@ public class RangeCache<Key extends Comparable<Key>, Value extends ReferenceCoun size.addAndGet(-removedSize); - return Pair.create(removedEntries, removedSize); + return Pair.of(removedEntries, removedSize); } /** @@ -171,7 +172,7 @@ public class RangeCache<Key extends Comparable<Key>, Value extends ReferenceCoun } size.addAndGet(-removedSize); - return Pair.create(removedEntries, removedSize); + return Pair.of(removedEntries, removedSize); } /** diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index a91c159..60d66a1 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -72,8 +72,8 @@ import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; import org.apache.bookkeeper.mledger.impl.MetaStore.Stat; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; -import org.apache.bookkeeper.mledger.util.Pair; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.api.ByteBufPair; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; @@ -262,8 +262,8 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { public void addComplete(Position position, Object ctx) { @SuppressWarnings("unchecked") Pair<ManagedLedger, ManagedCursor> pair = (Pair<ManagedLedger, ManagedCursor>) ctx; - ManagedLedger ledger = pair.first; - ManagedCursor cursor = pair.second; + ManagedLedger ledger = pair.getLeft(); + ManagedCursor cursor = pair.getRight(); assertEquals(ledger.getNumberOfEntries(), 1); assertEquals(ledger.getTotalSize(), "test".getBytes(Encoding).length); @@ -308,7 +308,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { public void addFailed(ManagedLedgerException exception, Object ctx) { fail(exception.getMessage()); } - }, new Pair<ManagedLedger, ManagedCursor>(ledger, cursor)); + }, Pair.of(ledger, cursor)); } @Override @@ -2184,11 +2184,11 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { Pair<PositionImpl, Long> latestPositionAndCounter = ledger.getLastPositionAndCounter(); Pair<PositionImpl, Long> earliestPositionAndCounter = ledger.getFirstPositionAndCounter(); - assertEquals(latestPositionAndCounter.first.getNext(), p1); - assertEquals(earliestPositionAndCounter.first.getNext(), p2); + assertEquals(latestPositionAndCounter.getLeft().getNext(), p1); + assertEquals(earliestPositionAndCounter.getLeft().getNext(), p2); - assertEquals(latestPositionAndCounter.second.longValue(), totalInsertedEntries); - assertEquals(earliestPositionAndCounter.second.longValue(), totalInsertedEntries - earliestCursor.getNumberOfEntriesInBacklog()); + assertEquals(latestPositionAndCounter.getRight().longValue(), totalInsertedEntries); + assertEquals(earliestPositionAndCounter.getRight().longValue(), totalInsertedEntries - earliestCursor.getNumberOfEntriesInBacklog()); ledger.close(); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/PairTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/PairTest.java deleted file mode 100644 index 59cf425..0000000 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/PairTest.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.bookkeeper.mledger.util; - -import org.testng.Assert; -import org.testng.annotations.Test; - -public class PairTest { - - /* - * Has been tested elsewhere - * - * @Test public void Pair() { } - */ - - @Test - public void create() { - Pair<String, String> p = Pair.create("firstOne", "secondOne"); - Assert.assertEquals("firstOne", p.first); - Assert.assertEquals("secondOne", p.second); - Integer int3 = new Integer(3); - Pair<String, Integer> q = Pair.create("firstOne", int3); - Assert.assertEquals("firstOne", q.first); - Assert.assertEquals(int3, q.second); - } - - @Test - public void toStringTest() { - Pair<String, String> p = Pair.create("firstOne", "secondOne"); - Assert.assertEquals("firstOne", p.first); - Assert.assertEquals("secondOne", p.second); - Assert.assertEquals("(firstOne,secondOne)", p.toString()); - } -} diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java index 61f2e9f..d1d2e5d 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java @@ -24,6 +24,7 @@ import static org.testng.Assert.fail; import com.google.common.collect.Lists; import io.netty.util.AbstractReferenceCounted; import io.netty.util.ReferenceCounted; +import org.apache.commons.lang3.tuple.Pair; import org.testng.annotations.Test; @Test @@ -179,7 +180,7 @@ public class RangeCacheTest { cache.put(3, new RefString("three")); // This should remove the LRU entries: 0, 1 whose combined size is 7 - assertEquals(cache.evictLeastAccessedEntries(5), Pair.create(2, (long) 7)); + assertEquals(cache.evictLeastAccessedEntries(5), Pair.of(2, (long) 7)); assertEquals(cache.getNumberOfEntries(), 2); assertEquals(cache.getSize(), 8); @@ -188,7 +189,7 @@ public class RangeCacheTest { assertEquals(cache.get(2).s, "two"); assertEquals(cache.get(3).s, "three"); - assertEquals(cache.evictLeastAccessedEntries(100), Pair.create(2, (long) 8)); + assertEquals(cache.evictLeastAccessedEntries(100), Pair.of(2, (long) 8)); assertEquals(cache.getNumberOfEntries(), 0); assertEquals(cache.getSize(), 0); assertEquals(cache.get(0), null); @@ -221,18 +222,18 @@ public class RangeCacheTest { assertEquals(cache.getSize(), 100); Pair<Integer, Long> res = cache.evictLeastAccessedEntries(1); - assertEquals((int) res.first, 1); - assertEquals((long) res.second, 1); + assertEquals((int) res.getLeft(), 1); + assertEquals((long) res.getRight(), 1); assertEquals(cache.getSize(), 99); res = cache.evictLeastAccessedEntries(10); - assertEquals((int) res.first, 10); - assertEquals((long) res.second, 10); + assertEquals((int) res.getLeft(), 10); + assertEquals((long) res.getRight(), 10); assertEquals(cache.getSize(), 89); res = cache.evictLeastAccessedEntries(100); - assertEquals((int) res.first, 89); - assertEquals((long) res.second, 89); + assertEquals((int) res.getLeft(), 89); + assertEquals((long) res.getRight(), 89); assertEquals(cache.getSize(), 0); for (int i = 0; i < 100; i++) { @@ -242,8 +243,8 @@ public class RangeCacheTest { assertEquals(cache.getSize(), 100); res = cache.removeRange(10, 20, false); - assertEquals((int) res.first, 10); - assertEquals((long) res.second, 10); + assertEquals((int) res.getLeft(), 10); + assertEquals((long) res.getRight(), 10); assertEquals(cache.getSize(), 90); } } diff --git a/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java b/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java index e868091..8c5202e 100644 --- a/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java +++ b/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java @@ -33,7 +33,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; -import org.apache.bookkeeper.mledger.util.Pair; +import org.apache.commons.lang3.tuple.Pair; import org.apache.zookeeper.AsyncCallback.Children2Callback; import org.apache.zookeeper.AsyncCallback.ChildrenCallback; import org.apache.zookeeper.AsyncCallback.DataCallback; @@ -146,15 +146,15 @@ public class MockZooKeeper extends ZooKeeper { } if (createMode == CreateMode.EPHEMERAL_SEQUENTIAL || createMode == CreateMode.PERSISTENT_SEQUENTIAL) { - byte[] parentData = tree.get(parent).first; - int parentVersion = tree.get(parent).second; + byte[] parentData = tree.get(parent).getLeft(); + int parentVersion = tree.get(parent).getRight(); path = path + parentVersion; // Update parent version - tree.put(parent, Pair.create(parentData, parentVersion + 1)); + tree.put(parent, Pair.of(parentData, parentVersion + 1)); } - tree.put(path, Pair.create(data, 0)); + tree.put(path, Pair.of(data, 0)); final Set<Watcher> toNotifyCreate = Sets.newHashSet(); toNotifyCreate.addAll(watchers.get(path)); @@ -218,7 +218,7 @@ public class MockZooKeeper extends ZooKeeper { mutex.unlock(); cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null); } else { - tree.put(path, Pair.create(data, 0)); + tree.put(path, Pair.of(data, 0)); mutex.unlock(); cb.processResult(0, path, ctx, null); @@ -250,9 +250,9 @@ public class MockZooKeeper extends ZooKeeper { watchers.put(path, watcher); } if (stat != null) { - stat.setVersion(value.second); + stat.setVersion(value.getRight()); } - return value.first; + return value.getLeft(); } } finally { mutex.unlock(); @@ -283,8 +283,8 @@ public class MockZooKeeper extends ZooKeeper { cb.processResult(KeeperException.Code.NoNode, path, ctx, null, null); } else { Stat stat = new Stat(); - stat.setVersion(value.second); - cb.processResult(0, path, ctx, value.first, stat); + stat.setVersion(value.getRight()); + cb.processResult(0, path, ctx, value.getLeft(), stat); } }); } @@ -314,9 +314,9 @@ public class MockZooKeeper extends ZooKeeper { } Stat stat = new Stat(); - stat.setVersion(value.second); + stat.setVersion(value.getRight()); mutex.unlock(); - cb.processResult(0, path, ctx, value.first, stat); + cb.processResult(0, path, ctx, value.getLeft(), stat); } }); } @@ -482,7 +482,7 @@ public class MockZooKeeper extends ZooKeeper { if (tree.containsKey(path)) { Stat stat = new Stat(); - stat.setVersion(tree.get(path).second); + stat.setVersion(tree.get(path).getRight()); return stat; } else { return null; @@ -507,7 +507,7 @@ public class MockZooKeeper extends ZooKeeper { if (tree.containsKey(path)) { Stat stat = new Stat(); - stat.setVersion(tree.get(path).second); + stat.setVersion(tree.get(path).getRight()); return stat; } else { return null; @@ -603,7 +603,7 @@ public class MockZooKeeper extends ZooKeeper { throw new KeeperException.NoNodeException(); } - int currentVersion = tree.get(path).second; + int currentVersion = tree.get(path).getRight(); // Check version if (version != -1 && version != currentVersion) { @@ -612,7 +612,7 @@ public class MockZooKeeper extends ZooKeeper { newVersion = currentVersion + 1; log.debug("[{}] Updating -- current version: {}", path, currentVersion); - tree.put(path, Pair.create(data, newVersion)); + tree.put(path, Pair.of(data, newVersion)); toNotify.addAll(watchers.get(path)); watchers.removeAll(path); @@ -658,7 +658,7 @@ public class MockZooKeeper extends ZooKeeper { return; } - int currentVersion = tree.get(path).second; + int currentVersion = tree.get(path).getRight(); // Check version if (version != -1 && version != currentVersion) { @@ -670,7 +670,7 @@ public class MockZooKeeper extends ZooKeeper { int newVersion = currentVersion + 1; log.debug("[{}] Updating -- current version: {}", path, currentVersion); - tree.put(path, Pair.create(data, newVersion)); + tree.put(path, Pair.of(data, newVersion)); Stat stat = new Stat(); stat.setVersion(newVersion); @@ -705,7 +705,7 @@ public class MockZooKeeper extends ZooKeeper { } if (version != -1) { - int currentVersion = tree.get(path).second; + int currentVersion = tree.get(path).getRight(); if (version != currentVersion) { throw new KeeperException.BadVersionException(path); } @@ -776,7 +776,7 @@ public class MockZooKeeper extends ZooKeeper { cb.processResult(KeeperException.Code.NOTEMPTY.intValue(), path, ctx); } else { if (version != -1) { - int currentVersion = tree.get(path).second; + int currentVersion = tree.get(path).getRight(); if (version != currentVersion) { cb.processResult(KeeperException.Code.BADVERSION.intValue(), path, ctx); return; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java index 972bfc9..ad408f0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java @@ -23,9 +23,9 @@ import com.google.common.collect.Multimap; import java.util.Map; -import org.apache.bookkeeper.mledger.util.Pair; import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.commons.lang3.mutable.MutableDouble; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.BundleData; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.TimeAverageMessageData; @@ -101,18 +101,18 @@ public class OverloadShedder implements LoadSheddingStrategy { BundleData bundleData = e.getValue(); TimeAverageMessageData shortTermData = bundleData.getShortTermData(); double throughput = shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut(); - return Pair.create(bundle, throughput); + return Pair.of(bundle, throughput); }).filter(e -> { // Only consider bundles that were not already unloaded recently - return !recentlyUnloadedBundles.containsKey(e.first); + return !recentlyUnloadedBundles.containsKey(e.getLeft()); }).sorted((e1, e2) -> { // Sort by throughput in reverse order - return Double.compare(e2.second, e1.second); + return Double.compare(e2.getRight(), e1.getRight()); }).forEach(e -> { if (trafficMarkedToOffload.doubleValue() < minimumThroughputToOffload || atLeastOneBundleSelected.isFalse()) { - selectedBundlesCache.put(broker, e.first); - trafficMarkedToOffload.add(e.second); + selectedBundlesCache.put(broker, e.getLeft()); + trafficMarkedToOffload.add(e.getRight()); atLeastOneBundleSelected.setTrue(); } }); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java index 389a06e..68ab1fb 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java @@ -22,7 +22,6 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.EventLoopGroup; import java.io.Closeable; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -34,11 +33,11 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.common.api.Commands; import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; -import org.apache.pulsar.common.util.collections.Pair; /** * Group the acknowledgments for a certain time and then sends them out in a single protobuf command. diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml index 2ef8e64..0bd5960 100644 --- a/pulsar-common/pom.xml +++ b/pulsar-common/pom.xml @@ -91,6 +91,11 @@ <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-server</artifactId> </dependency> + + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </dependency> </dependencies> </project> diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java index c3b8f4c..90dbf67 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Map; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.api.proto.PulsarApi.AuthMethod; import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand; @@ -75,7 +76,6 @@ import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion; import org.apache.pulsar.common.api.proto.PulsarApi.ServerError; import org.apache.pulsar.common.schema.SchemaVersion; -import org.apache.pulsar.common.util.collections.Pair; import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream; import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream; @@ -527,8 +527,8 @@ public class Commands { int entriesCount = entries.size(); for (int i = 0; i < entriesCount; i++) { - long ledgerId = entries.get(i).getFirst(); - long entryId = entries.get(i).getSecond(); + long ledgerId = entries.get(i).getLeft(); + long entryId = entries.get(i).getRight(); MessageIdData.Builder messageIdDataBuilder = MessageIdData.newBuilder(); messageIdDataBuilder.setLedgerId(ledgerId); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Pair.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Pair.java deleted file mode 100644 index 128c258..0000000 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Pair.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util.collections; - -import lombok.Value; - -/** - * Basic holder of a pair of values. - * - * Use as: <br/> - * <pre><code> - * Pair<String, String> p = Pair.of("a", "b"); - * p.getFirst(); - * p.getSecond(); - * </code></pre> - */ -@Value(staticConstructor = "of") -public class Pair<X, Y> { - private final X first; - private final Y second; -} -- To stop receiving notification emails like this one, please contact mme...@apache.org.