This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch branch-2.6 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 8c6121542e6ff28cb217782e49917e1817595576 Author: Hao Zhang <zhangh...@cmss.chinamobile.com> AuthorDate: Mon Jun 29 02:19:37 2020 +0800 Fix bug related to managedLedger properties (#7357) * Remove re-read from zk, and use the same mutex when updating metadata. * Add setProperty(), deleteProperty() API and test ledger changed when adding metadata. * Add asyncSetProperty(), asyncDeleteProperty() API and add related unit tests. * Fix unit test. * Fix exception propagation. (cherry picked from commit a3a63a35c9ff406be05bcfd388f520d40580954e) --- .../apache/bookkeeper/mledger/AsyncCallbacks.java | 6 +- .../apache/bookkeeper/mledger/ManagedLedger.java | 46 ++++++- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 135 +++++++++++++-------- .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 110 +++++++++++++---- 4 files changed, 221 insertions(+), 76 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java index 0add10f..1ced748 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java @@ -139,9 +139,9 @@ public interface AsyncCallbacks { void offloadFailed(ManagedLedgerException exception, Object ctx); } - interface SetPropertiesCallback { - void setPropertiesComplete(Map<String, String> properties, Object ctx); + interface UpdatePropertiesCallback { + void updatePropertiesComplete(Map<String, String> properties, Object ctx); - void setPropertiesFailed(ManagedLedgerException exception, Object ctx); + void updatePropertiesFailed(ManagedLedgerException exception, Object ctx); } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java index 4f4b8ca..a8a4509 100644 --- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java @@ -451,11 +451,51 @@ public interface ManagedLedger { Map<String, String> getProperties(); /** + * Add key-value to propertiesMap. + * + * @param key key of property to add + * @param value value of property to add + * @throws InterruptedException + * @throws ManagedLedgerException + */ + void setProperty(String key, String value) throws InterruptedException, ManagedLedgerException; + + /** + * Async add key-value to propertiesMap. + * + * @param key key of property to add + * @param value value of property to add + * @param callback a callback which will be supplied with the newest properties in managedLedger. + * @param ctx a context object which will be passed to the callback on completion. + **/ + void asyncSetProperty(String key, String value, final AsyncCallbacks.UpdatePropertiesCallback callback, Object ctx); + + /** + * Delete the property by key. + * + * @param key key of property to delete + * @throws InterruptedException + * @throws ManagedLedgerException + */ + void deleteProperty(String key) throws InterruptedException, ManagedLedgerException; + + /** + * Async delete the property by key. + * + * @param key key of property to delete + * @param callback a callback which will be supplied with the newest properties in managedLedger. + * @param ctx a context object which will be passed to the callback on completion. + */ + void asyncDeleteProperty(String key, final AsyncCallbacks.UpdatePropertiesCallback callback, Object ctx); + + /** * Update managed-ledger's properties. 
* * @param properties key-values of properties + * @throws InterruptedException + * @throws ManagedLedgerException */ - void setProperties(Map<String, String> properties) throws InterruptedException; + void setProperties(Map<String, String> properties) throws InterruptedException, ManagedLedgerException; /** * Async update managed-ledger's properties. @@ -464,9 +504,9 @@ public interface ManagedLedger { * @param callback a callback which will be supplied with the newest properties in managedLedger. * @param ctx a context object which will be passed to the callback on completion. */ - void asyncSetProperties(Map<String, String> properties, final AsyncCallbacks.SetPropertiesCallback callback, + void asyncSetProperties(Map<String, String> properties, final AsyncCallbacks.UpdatePropertiesCallback callback, Object ctx); - + /** * Trim consumed ledgers in background * @param promise diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 27d8848..f45b341 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -23,7 +23,6 @@ import static com.google.common.base.Preconditions.checkState; import static java.lang.Math.min; import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException; import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; - import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.BoundType; import com.google.common.collect.ImmutableMap; @@ -31,12 +30,10 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Queues; import com.google.common.collect.Range; - import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.util.Recycler; import 
io.netty.util.Recycler.Handle; - import java.time.Clock; import java.util.Collections; import java.util.HashMap; @@ -67,7 +64,6 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; - import org.apache.bookkeeper.client.AsyncCallback; import org.apache.bookkeeper.client.AsyncCallback.CreateCallback; import org.apache.bookkeeper.client.AsyncCallback.OpenCallback; @@ -90,8 +86,8 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; -import org.apache.bookkeeper.mledger.AsyncCallbacks.SetPropertiesCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback; +import org.apache.bookkeeper.mledger.AsyncCallbacks.UpdatePropertiesCallback; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; @@ -176,10 +172,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { private ScheduledFuture<?> timeoutTask; /** - * This lock is held while the ledgers list is updated asynchronously on the metadata store. Since we use the store + * This lock is held while the ledgers list or propertiesMap is updated asynchronously on the metadata store. Since we use the store * version, we cannot have multiple concurrent updates. 
*/ - private final CallbackMutex ledgersListMutex = new CallbackMutex(); + private final CallbackMutex metadataMutex = new CallbackMutex(); private final CallbackMutex trimmerMutex = new CallbackMutex(); private final CallbackMutex offloadMutex = new CallbackMutex(); @@ -1230,7 +1226,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { log.debug("[{}] Updating of ledgers list after create complete. version={}", name, stat); } ledgersStat = stat; - ledgersListMutex.unlock(); + metadataMutex.unlock(); updateLedgersIdsComplete(stat); synchronized (ManagedLedgerImpl.this) { mbean.addLedgerSwitchLatencySample(System.nanoTime() - lastLedgerCreationInitiationTimestamp, @@ -1267,7 +1263,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } }, null); - ledgersListMutex.unlock(); + metadataMutex.unlock(); synchronized (ManagedLedgerImpl.this) { lastLedgerCreationFailureTimestamp = clock.millis(); @@ -1282,7 +1278,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } private void updateLedgersListAfterRollover(MetaStoreCallback<Void> callback) { - if (!ledgersListMutex.tryLock()) { + if (!metadataMutex.tryLock()) { // Defer update for later scheduledExecutor.schedule(() -> updateLedgersListAfterRollover(callback), 100, TimeUnit.MILLISECONDS); return; @@ -2062,7 +2058,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } if (STATE_UPDATER.get(this) == State.CreatingLedger // Give up now and schedule a new trimming - || !ledgersListMutex.tryLock()) { // Avoid deadlocks with other operations updating the ledgers list + || !metadataMutex.tryLock()) { // Avoid deadlocks with other operations updating the ledgers list scheduleDeferredTrimming(promise); trimmerMutex.unlock(); return; @@ -2101,7 +2097,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { log.info("[{}] End TrimConsumedLedgers. 
ledgers={} totalSize={}", name, ledgers.size(), TOTAL_SIZE_UPDATER.get(ManagedLedgerImpl.this)); ledgersStat = stat; - ledgersListMutex.unlock(); + metadataMutex.unlock(); trimmerMutex.unlock(); for (LedgerInfo ls : ledgersToDelete) { @@ -2119,7 +2115,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { @Override public void operationFailed(MetaStoreException e) { log.warn("[{}] Failed to update the list of ledgers after trimming", name, e); - ledgersListMutex.unlock(); + metadataMutex.unlock(); trimmerMutex.unlock(); promise.completeExceptionally(e); @@ -2531,7 +2527,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { private void tryTransformLedgerInfo(long ledgerId, LedgerInfoTransformation transformation, CompletableFuture<Void> finalPromise) { synchronized (this) { - if (!ledgersListMutex.tryLock()) { + if (!metadataMutex.tryLock()) { // retry in 100 milliseconds scheduledExecutor.schedule( safeRun(() -> tryTransformLedgerInfo(ledgerId, transformation, finalPromise)), 100, @@ -2539,7 +2535,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } else { // lock acquired CompletableFuture<Void> unlockingPromise = new CompletableFuture<>(); unlockingPromise.whenComplete((res, ex) -> { - ledgersListMutex.unlock(); + metadataMutex.unlock(); if (ex != null) { finalPromise.completeExceptionally(ex); } else { @@ -3020,6 +3016,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { mlInfo.setTerminatedPosition(NestedPositionInfo.newBuilder().setLedgerId(lastConfirmedEntry.getLedgerId()) .setEntryId(lastConfirmedEntry.getEntryId())); } + for (Map.Entry<String, String> property : propertiesMap.entrySet()) { + mlInfo.addProperties(MLDataFormats.KeyValue.newBuilder() + .setKey(property.getKey()).setValue(property.getValue())); + } return mlInfo.build(); } @@ -3271,57 +3271,96 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } @Override - public 
void setProperties(Map<String, String> properties) throws InterruptedException { + public void setProperty(String key, String value) throws InterruptedException, ManagedLedgerException { + Map<String, String> map = new HashMap<>(); + map.put(key, value); + updateProperties(map, false, null); + } + + @Override + public void asyncSetProperty(String key, String value, final UpdatePropertiesCallback callback, Object ctx) { + Map<String, String> map = new HashMap<>(); + map.put(key, value); + asyncUpdateProperties(map, false, null, callback, ctx); + } + + @Override + public void deleteProperty(String key) throws InterruptedException, ManagedLedgerException { + updateProperties(null, true, key); + } + + @Override + public void asyncDeleteProperty(String key, final UpdatePropertiesCallback callback, Object ctx) { + asyncUpdateProperties(null, true, key, callback, ctx); + } + + @Override + public void setProperties(Map<String, String> properties) throws InterruptedException, ManagedLedgerException { + updateProperties(properties, false, null); + } + + @Override + public void asyncSetProperties(Map<String, String> properties, final UpdatePropertiesCallback callback, + Object ctx) { + asyncUpdateProperties(properties, false, null, callback, ctx); + } + + private void updateProperties(Map<String, String> properties, boolean isDelete, + String deleteKey) throws InterruptedException, ManagedLedgerException { final CountDownLatch latch = new CountDownLatch(1); - this.asyncSetProperties(properties, new SetPropertiesCallback() { + class Result { + ManagedLedgerException exception = null; + } + final Result result = new Result(); + this.asyncUpdateProperties(properties, isDelete, deleteKey, new UpdatePropertiesCallback() { @Override - public void setPropertiesComplete(Map<String, String> properties, Object ctx) { + public void updatePropertiesComplete(Map<String, String> properties, Object ctx) { latch.countDown(); } @Override - public void 
setPropertiesFailed(ManagedLedgerException exception, Object ctx) { - log.error("[{}] Update manageLedger's info failed:{}", name, exception.getMessage()); + public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) { + result.exception = exception; latch.countDown(); } }, null); - latch.await(); + if (!latch.await(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS)) { + throw new ManagedLedgerException("Timeout during update managedLedger's properties"); + } + + if (result.exception != null) { + log.error("[{}] Update managedLedger's properties failed", name, result.exception); + throw result.exception; + } } - @Override - public void asyncSetProperties(Map<String, String> properties, final SetPropertiesCallback callback, Object ctx) { - store.getManagedLedgerInfo(name, false, new MetaStoreCallback<ManagedLedgerInfo>() { + private void asyncUpdateProperties(Map<String, String> properties, boolean isDelete, + String deleteKey, final UpdatePropertiesCallback callback, Object ctx) { + if (!metadataMutex.tryLock()) { + // Defer update for later + scheduledExecutor.schedule(() -> asyncUpdateProperties(properties, isDelete, deleteKey, + callback, ctx), 100, TimeUnit.MILLISECONDS); + return; + } + if (isDelete) { + propertiesMap.remove(deleteKey); + } else { + propertiesMap.putAll(properties); + } + store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, new MetaStoreCallback<Void>() { @Override - public void operationComplete(ManagedLedgerInfo result, Stat version) { + public void operationComplete(Void result, Stat version) { ledgersStat = version; - // Update manageLedger's properties. 
- ManagedLedgerInfo.Builder info = ManagedLedgerInfo.newBuilder(result); - info.clearProperties(); - for (Map.Entry<String, String> property : properties.entrySet()) { - info.addProperties(MLDataFormats.KeyValue.newBuilder().setKey(property.getKey()).setValue(property.getValue())); - } - store.asyncUpdateLedgerIds(name, info.build(), version, new MetaStoreCallback<Void>() { - @Override - public void operationComplete(Void result, Stat version) { - ledgersStat = version; - propertiesMap.clear(); - propertiesMap.putAll(properties); - callback.setPropertiesComplete(properties, ctx); - } - - @Override - public void operationFailed(MetaStoreException e) { - log.error("[{}] Update manageLedger's info failed:{}", name, e.getMessage()); - callback.setPropertiesFailed(e, ctx); - } - }); + callback.updatePropertiesComplete(propertiesMap, ctx); + metadataMutex.unlock(); } @Override public void operationFailed(MetaStoreException e) { - log.error("[{}] Get manageLedger's info failed:{}", name, e.getMessage()); - callback.setPropertiesFailed(e, ctx); + log.error("[{}] Update managedLedger's properties failed", name, e); + callback.updatePropertiesFailed(e, ctx); + metadataMutex.unlock(); } }); } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index e507c99..2806a10 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -53,6 +53,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; @@ -63,7 +64,6 @@ import 
java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import org.apache.bookkeeper.client.AsyncCallback.AddCallback; -import org.apache.bookkeeper.client.AsyncCallback.CreateCallback; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.BookKeeper.DigestType; @@ -95,7 +95,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback; -import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State; import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo; @@ -1168,7 +1167,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { } @Test - public void testSetProperties() throws Exception { + public void testUpdateProperties() throws Exception { ManagedLedger ledger = factory.open("my_test_ledger"); Map<String, String> properties = new HashMap<>(); properties.put("key1", "value1"); @@ -1177,40 +1176,107 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { ledger.setProperties(properties); assertEquals(ledger.getProperties(), properties); + properties.put("key4", "value4"); + ledger.setProperty("key4", "value4"); + assertEquals(ledger.getProperties(), properties); + + ledger.deleteProperty("key4"); + properties.remove("key4"); + assertEquals(ledger.getProperties(), properties); + Map<String, String> newProperties = new HashMap<>(); - newProperties.put("key4", "value4"); newProperties.put("key5", "value5"); - newProperties.put("key6", "value6"); + newProperties.put("key1", "value6"); + newProperties.putAll(properties); ledger.setProperties(newProperties); assertEquals(ledger.getProperties(), 
newProperties); } @Test - public void testAsyncSetProperties() throws Exception { - final CountDownLatch latch = new CountDownLatch(1); + public void testAsyncUpdateProperties() throws Exception { + final CountDownLatch latch = new CountDownLatch(3); ManagedLedger ledger = factory.open("my_test_ledger"); - Map<String, String> properties = new HashMap<>(); - properties.put("key1", "value1"); - properties.put("key2", "value2"); - properties.put("key3", "value3"); - ledger.setProperties(properties); - Map<String, String> newProperties = new HashMap<>(); - newProperties.put("key4", "value4"); - newProperties.put("key5", "value5"); - newProperties.put("key6", "value6"); - ledger.asyncSetProperties(newProperties, new AsyncCallbacks.SetPropertiesCallback() { + Map<String, String> prop = new HashMap<>(); + prop.put("key1", "value1"); + prop.put("key2", "value2"); + prop.put("key3", "value3"); + ledger.asyncSetProperties(prop, new AsyncCallbacks.UpdatePropertiesCallback() { @Override - public void setPropertiesComplete(Map<String, String> properties, Object ctx) { + public void updatePropertiesComplete(Map<String, String> properties, Object ctx) { + assertEquals(prop, properties); latch.countDown(); } @Override - public void setPropertiesFailed(ManagedLedgerException exception, Object ctx) { - fail("should have succeeded"); + public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) { } }, null); - latch.await(); - assertEquals(ledger.getProperties(), newProperties); + + ledger.asyncSetProperty("key4", "value4", new AsyncCallbacks.UpdatePropertiesCallback() { + @Override + public void updatePropertiesComplete(Map<String, String> properties, Object ctx) { + assertNotNull(properties.get("key4")); + assertEquals("value4", properties.get("key4")); + latch.countDown(); + } + + @Override + public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) { + } + }, null); + + prop.remove("key1"); + ledger.asyncDeleteProperty("key1", new 
AsyncCallbacks.UpdatePropertiesCallback() { + @Override + public void updatePropertiesComplete(Map<String, String> properties, Object ctx) { + assertNull(properties.get("key1")); + latch.countDown(); + } + + @Override + public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) { + } + }, null); + assertTrue(latch.await(60, TimeUnit.SECONDS)); + } + + @Test + public void testConcurrentAsyncSetProperties() throws Exception { + final CountDownLatch latch = new CountDownLatch(1000); + ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1)); + ExecutorService executor = Executors.newCachedThreadPool(); + for (int i = 0; i < 1000; i++) { + final int finalI = i; + executor.execute(() -> { + Map<String, String> newProperties = new HashMap<>(); + newProperties.put("key0", String.valueOf(finalI)); + newProperties.put("key1", "value1"); + newProperties.put("key2", "value2"); + newProperties.put("key3", "value3"); + ledger.asyncSetProperties(newProperties, new AsyncCallbacks.UpdatePropertiesCallback() { + @Override + public void updatePropertiesComplete(Map<String, String> properties, Object ctx) { + assertEquals(properties, newProperties); + latch.countDown(); + } + + @Override + public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) { + } + }, null); + }); + } + try { + for (int i = 0; i < 100; i++) { + ledger.addEntry("data".getBytes(Encoding)); + Thread.sleep(300); + } + } catch (Exception e) { + fail(e.getMessage()); + } + assertTrue(latch.await(300, TimeUnit.SECONDS)); + executor.shutdown(); + factory.shutdown(); } @Test