[GitHub] [pulsar] poorbarcode commented on a diff in pull request #20303: [fix] [meta]Switch to the metadata store thread after zk operation
poorbarcode commented on code in PR #20303:
URL: https://github.com/apache/pulsar/pull/20303#discussion_r1197342688

## pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java: ##
@@ -207,25 +207,26 @@ protected void batchOperation(List ops) {
     for (int i = 0; i < ops.size(); i++) {
         OpResult opr = results.get(i);
         MetadataOp op = ops.get(i);
-
-        switch (op.getType()) {
-            case PUT:
-                handlePutResult(op.asPut(), opr);
-                break;
-            case DELETE:
-                handleDeleteResult(op.asDelete(), opr);
-                break;
-            case GET:
-                handleGetResult(op.asGet(), opr);
-                break;
-            case GET_CHILDREN:
-                handleGetChildrenResult(op.asGetChildren(), opr);
-                break;
-
-            default:
-                op.getFuture().completeExceptionally(new MetadataStoreException(
-                        "Operation type not supported in multi: " + op.getType()));
-        }
+        execute(() -> {

Review Comment:
Fixed.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[GitHub] [pulsar] poorbarcode commented on a diff in pull request #20303: [fix] [meta]Switch to the metadata store thread after zk operation
poorbarcode commented on code in PR #20303:
URL: https://github.com/apache/pulsar/pull/20303#discussion_r1197342482

## pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java: ##
@@ -528,7 +528,7 @@ public void execute(Runnable task, CompletableFuture future) {
     try {
         executor.execute(task);
     } catch (Throwable t) {
-        future.completeExceptionally(t);
+        executor.execute(() -> future.completeExceptionally(t));

Review Comment:
>> Now, if we call `put /a`, it will use the `event thread` of the ZK client after the operation; but if we call `put /a/b/c` (where the node `/a` does not exist), it will use the `metadata store thread` after the operation.

> What is the consequence of using different threads? Did you see out of order events?

> EDIT: looks like the consequence is this error `ZooKeeper session reconnection timeout. Notifying session is lost`.

To make the context easier to trace, I am answering this question here.

No, I noticed this while checking for deadlocks in https://github.com/apache/pulsar/pull/20189. I saw that the ZK thread was executing the task of creating the topic, and then I realized that @merlimat may have forgotten to change this.
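The thread-switching behavior discussed in this comment can be illustrated with a small, self-contained sketch. It does not use Pulsar or ZooKeeper code: `ThreadSwitchSketch`, `switchThread`, `callbackThreadName`, and the thread names `client-event-thread` and `metadata-store-thread` are hypothetical stand-ins chosen only for illustration. The sketch assumes a client that completes its future on its own event thread, and shows that a callback registered before completion runs on the completing thread unless the result is re-completed on a dedicated executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadSwitchSketch {

    // Hypothetical helper: re-completes the result on the store executor so that
    // dependents registered on the returned future do not run on the client thread.
    public static <T> CompletableFuture<T> switchThread(CompletableFuture<T> raw, ExecutorService storeThread) {
        CompletableFuture<T> result = new CompletableFuture<>();
        raw.whenComplete((value, error) -> storeThread.execute(() -> {
            if (error != null) {
                result.completeExceptionally(error);
            } else {
                result.complete(value);
            }
        }));
        return result;
    }

    // Returns the name of the thread that ran a callback chained on the operation.
    public static String callbackThreadName(boolean switchToStoreThread) {
        ExecutorService eventThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "client-event-thread"));
        ExecutorService storeThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "metadata-store-thread"));
        // The latch delays completion until the callback is registered, which keeps
        // the demo deterministic (a callback added to an already completed future
        // would run on the registering thread instead).
        CountDownLatch callbackRegistered = new CountDownLatch(1);
        try {
            CompletableFuture<String> raw = new CompletableFuture<>();
            eventThread.execute(() -> {
                try {
                    callbackRegistered.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                raw.complete("ok"); // simulated client callback on its event thread
            });
            CompletableFuture<String> source = switchToStoreThread ? switchThread(raw, storeThread) : raw;
            CompletableFuture<String> observed = source.thenApply(v -> Thread.currentThread().getName());
            callbackRegistered.countDown();
            return observed.join();
        } finally {
            eventThread.shutdown();
            storeThread.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("without switch: " + callbackThreadName(false));
        System.out.println("with switch:    " + callbackThreadName(true));
    }
}
```

In this sketch the switch costs one extra task submission per operation; in exchange, user callbacks never occupy the client's event thread.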
[GitHub] [pulsar] poorbarcode commented on a diff in pull request #20303: [fix] [meta]Switch to the metadata store thread after zk operation
poorbarcode commented on code in PR #20303:
URL: https://github.com/apache/pulsar/pull/20303#discussion_r1197326974

## pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java: ##
@@ -528,7 +528,7 @@ public void execute(Runnable task, CompletableFuture future) {
     try {
         executor.execute(task);
     } catch (Throwable t) {
-        future.completeExceptionally(t);
+        executor.execute(() -> future.completeExceptionally(t));

Review Comment:
This change has been removed.
[GitHub] [pulsar] poorbarcode commented on a diff in pull request #20303: [fix] [meta]Switch to the metadata store thread after zk operation
poorbarcode commented on code in PR #20303:
URL: https://github.com/apache/pulsar/pull/20303#discussion_r1194546785

## pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java: ##
@@ -425,6 +428,75 @@ public void testDeleteUnusedDirectories(String provider, Supplier urlSup
         assertFalse(store.exists(prefix).join());
     }

+    @DataProvider(name = "conditionOfSwitchThread")
+    public Object[][] conditionOfSwitchThread(){
+        return new Object[][]{
+            {false, false},
+            {false, true},
+            {true, false},
+            {true, true}
+        };
+    }
+
+    @Test(dataProvider = "conditionOfSwitchThread")
+    public void testThreadSwitchOfZkMetadataStore(boolean hasSynchronizer, boolean enabledBatch) throws Exception {
+        final String prefix = newKey();
+        final String metadataStoreName = UUID.randomUUID().toString().replaceAll("-", "");
+        MetadataStoreConfig.MetadataStoreConfigBuilder builder =
+                MetadataStoreConfig.builder().metadataStoreName(metadataStoreName);
+        builder.fsyncEnable(false);
+        builder.batchingEnabled(enabledBatch);
+        if (!hasSynchronizer) {
+            builder.synchronizer(null);
+        }
+        MetadataStoreConfig config = builder.build();
+        @Cleanup
+        ZKMetadataStore store = (ZKMetadataStore) MetadataStoreFactory.create(zks.getConnectionString(), config);
+
+        final Runnable verify = () -> {
+            String currentThreadName = Thread.currentThread().getName();
+            String errorMessage = String.format("Expect to switch to thread %s, but currently it is thread %s",
+                    metadataStoreName, currentThreadName);
+            if (!Thread.currentThread().getName().startsWith(metadataStoreName)){

Review Comment:
> Did you forget to push for this change?

Yes, you are right. Fixed, thanks.
[GitHub] [pulsar] poorbarcode commented on a diff in pull request #20303: [fix] [meta]Switch to the metadata store thread after zk operation
poorbarcode commented on code in PR #20303:
URL: https://github.com/apache/pulsar/pull/20303#discussion_r1193168968

## pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java: ##
@@ -425,6 +428,75 @@ public void testDeleteUnusedDirectories(String provider, Supplier urlSup
         assertFalse(store.exists(prefix).join());
     }

+    @DataProvider(name = "conditionOfSwitchThread")
+    public Object[][] conditionOfSwitchThread(){
+        return new Object[][]{
+            {false, false},
+            {false, true},
+            {true, false},
+            {true, true}
+        };
+    }
+
+    @Test(dataProvider = "conditionOfSwitchThread")
+    public void testThreadSwitchOfZkMetadataStore(boolean hasSynchronizer, boolean enabledBatch) throws Exception {
+        final String prefix = newKey();
+        final String metadataStoreName = UUID.randomUUID().toString().replaceAll("-", "");
+        MetadataStoreConfig.MetadataStoreConfigBuilder builder =
+                MetadataStoreConfig.builder().metadataStoreName(metadataStoreName);
+        builder.fsyncEnable(false);
+        builder.batchingEnabled(enabledBatch);
+        if (!hasSynchronizer) {
+            builder.synchronizer(null);
+        }
+        MetadataStoreConfig config = builder.build();
+        @Cleanup
+        ZKMetadataStore store = (ZKMetadataStore) MetadataStoreFactory.create(zks.getConnectionString(), config);
+
+        final Runnable verify = () -> {
+            String currentThreadName = Thread.currentThread().getName();
+            String errorMessage = String.format("Expect to switch to thread %s, but currently it is thread %s",
+                    metadataStoreName, currentThreadName);
+            if (!Thread.currentThread().getName().startsWith(metadataStoreName)){
+                throw new RuntimeException(errorMessage);
+            }
+        };
+
+        // put with node which has parent(but the parent node is not exists).
+        store.put(prefix + "/a1/b1/c1", "value".getBytes(), Optional.of(-1L)).thenApply((ignore) -> {
+            verify.run();
+            return null;
+        }).join();
+        // put.
+        store.put(prefix + "/b1", "value".getBytes(), Optional.of(-1L)).thenApply((ignore) -> {
+            verify.run();
+            return null;
+        }).join();
+        // get.
+        store.get(prefix + "/b1").thenApply((ignore) -> {
+            verify.run();
+            return null;
+        }).join();
+        // get the node which is not exists.
+        store.get(prefix + "/non").thenApply((ignore) -> {
+            verify.run();
+            return null;
+        }).join();
+        // delete.
+        store.delete(prefix + "/b1", Optional.empty()).thenApply((ignore) -> {
+            verify.run();
+            return null;
+        }).join();
+        // delete the node which is not exists.
+        store.delete(prefix + "/non", Optional.empty()).thenApply((ignore) -> {
+            verify.run();
+            return null;
+        }).exceptionally(ex -> {
+            verify.run();
+            return null;
+        }).join();
+    }

Review Comment:
> I suggest that DO NOT use CompletableFuture#get and CompletableFuture#join anywhere.

I agree with you.
[GitHub] [pulsar] poorbarcode commented on a diff in pull request #20303: [fix] [meta]Switch to the metadata store thread after zk operation
poorbarcode commented on code in PR #20303:
URL: https://github.com/apache/pulsar/pull/20303#discussion_r1192442666

## pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java: ##
@@ -425,6 +428,75 @@ public void testDeleteUnusedDirectories(String provider, Supplier urlSup
         assertFalse(store.exists(prefix).join());
     }

+    @DataProvider(name = "conditionOfSwitchThread")
+    public Object[][] conditionOfSwitchThread(){
+        return new Object[][]{
+            {false, false},
+            {false, true},
+            {true, false},
+            {true, true}
+        };
+    }
+
+    @Test(dataProvider = "conditionOfSwitchThread")
+    public void testThreadSwitchOfZkMetadataStore(boolean hasSynchronizer, boolean enabledBatch) throws Exception {
+        final String prefix = newKey();
+        final String metadataStoreName = UUID.randomUUID().toString().replaceAll("-", "");
+        MetadataStoreConfig.MetadataStoreConfigBuilder builder =
+                MetadataStoreConfig.builder().metadataStoreName(metadataStoreName);
+        builder.fsyncEnable(false);
+        builder.batchingEnabled(enabledBatch);
+        if (!hasSynchronizer) {
+            builder.synchronizer(null);
+        }
+        MetadataStoreConfig config = builder.build();
+        @Cleanup
+        ZKMetadataStore store = (ZKMetadataStore) MetadataStoreFactory.create(zks.getConnectionString(), config);
+
+        final Runnable verify = () -> {
+            String currentThreadName = Thread.currentThread().getName();
+            String errorMessage = String.format("Expect to switch to thread %s, but currently it is thread %s",
+                    metadataStoreName, currentThreadName);
+            if (!Thread.currentThread().getName().startsWith(metadataStoreName)){
+                throw new RuntimeException(errorMessage);
+            }
+        };
+
+        // put with node which has parent(but the parent node is not exists).
+        store.put(prefix + "/a1/b1/c1", "value".getBytes(), Optional.of(-1L)).thenApply((ignore) -> {
+            verify.run();
+            return null;
+        }).join();
+        // put.
+        store.put(prefix + "/b1", "value".getBytes(), Optional.of(-1L)).thenApply((ignore) -> {
+            verify.run();
+            return null;
+        }).join();
+        // get.
+        store.get(prefix + "/b1").thenApply((ignore) -> {
+            verify.run();
+            return null;
+        }).join();
+        // get the node which is not exists.
+        store.get(prefix + "/non").thenApply((ignore) -> {
+            verify.run();
+            return null;
+        }).join();
+        // delete.
+        store.delete(prefix + "/b1", Optional.empty()).thenApply((ignore) -> {
+            verify.run();
+            return null;
+        }).join();
+        // delete the node which is not exists.
+        store.delete(prefix + "/non", Optional.empty()).thenApply((ignore) -> {
+            verify.run();
+            return null;
+        }).exceptionally(ex -> {
+            verify.run();
+            return null;
+        }).join();
+    }

Review Comment:
@nodece Could you take a look again?
[GitHub] [pulsar] poorbarcode commented on a diff in pull request #20303: [fix] [meta]Switch to the metadata store thread after zk operation
poorbarcode commented on code in PR #20303:
URL: https://github.com/apache/pulsar/pull/20303#discussion_r1192424212

## pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java: ##
@@ -425,6 +428,75 @@ public void testDeleteUnusedDirectories(String provider, Supplier urlSup
         assertFalse(store.exists(prefix).join());
     }

+    @DataProvider(name = "conditionOfSwitchThread")
+    public Object[][] conditionOfSwitchThread(){
+        return new Object[][]{
+            {false, false},
+            {false, true},
+            {true, false},
+            {true, true}
+        };
+    }
+
+    @Test(dataProvider = "conditionOfSwitchThread")
+    public void testThreadSwitchOfZkMetadataStore(boolean hasSynchronizer, boolean enabledBatch) throws Exception {
+        final String prefix = newKey();
+        final String metadataStoreName = UUID.randomUUID().toString().replaceAll("-", "");
+        MetadataStoreConfig.MetadataStoreConfigBuilder builder =
+                MetadataStoreConfig.builder().metadataStoreName(metadataStoreName);
+        builder.fsyncEnable(false);
+        builder.batchingEnabled(enabledBatch);
+        if (!hasSynchronizer) {
+            builder.synchronizer(null);
+        }
+        MetadataStoreConfig config = builder.build();
+        @Cleanup
+        ZKMetadataStore store = (ZKMetadataStore) MetadataStoreFactory.create(zks.getConnectionString(), config);
+
+        final Runnable verify = () -> {
+            String currentThreadName = Thread.currentThread().getName();
+            String errorMessage = String.format("Expect to switch to thread %s, but currently it is thread %s",
+                    metadataStoreName, currentThreadName);
+            if (!Thread.currentThread().getName().startsWith(metadataStoreName)){
+                throw new RuntimeException(errorMessage);
+            }
+        };
+
+        // put with node which has parent(but the parent node is not exists).
+        store.put(prefix + "/a1/b1/c1", "value".getBytes(), Optional.of(-1L)).thenApply((ignore) -> {
+            verify.run();
+            return null;
+        }).join();
+        // put.
+        store.put(prefix + "/b1", "value".getBytes(), Optional.of(-1L)).thenApply((ignore) -> {
+            verify.run();
+            return null;
+        }).join();
+        // get.
+        store.get(prefix + "/b1").thenApply((ignore) -> {
+            verify.run();
+            return null;
+        }).join();
+        // get the node which is not exists.
+        store.get(prefix + "/non").thenApply((ignore) -> {
+            verify.run();
+            return null;
+        }).join();
+        // delete.
+        store.delete(prefix + "/b1", Optional.empty()).thenApply((ignore) -> {
+            verify.run();
+            return null;
+        }).join();
+        // delete the node which is not exists.
+        store.delete(prefix + "/non", Optional.empty()).thenApply((ignore) -> {
+            verify.run();
+            return null;
+        }).exceptionally(ex -> {
+            verify.run();
+            return null;
+        }).join();
+    }

Review Comment:
> Could you add a nested request case? So like:

No, this will cause a deadlock. The metadata store thread pool is a single-thread pool, so a nested request that blocks inside a callback would be waiting on the very thread it occupies.
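The deadlock described in this comment can be reproduced outside Pulsar with a minimal sketch, assuming (as the comment states) that results are delivered on a single-threaded executor. `NestedBlockingSketch`, `asyncGet`, `blockingGet`, and `completesInTime` are hypothetical names and not Pulsar APIs. The blocking variant waits for a nested request from inside a callback and stalls; the composed variant chains the nested request with `thenComposeAsync` and completes. The sketch uses `get()` rather than `join()` for the inner wait only so that `shutdownNow()` can interrupt it during cleanup.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class NestedBlockingSketch {

    // Hypothetical async read: the work is queued on the single store thread.
    public static CompletableFuture<String> asyncGet(ExecutorService storeThread, String path) {
        return CompletableFuture.supplyAsync(() -> "value-of-" + path, storeThread);
    }

    // Blocks the calling thread until the future completes or the thread is interrupted.
    public static String blockingGet(CompletableFuture<String> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new CompletionException(e);
        } catch (ExecutionException e) {
            throw new CompletionException(e);
        }
    }

    // Returns true if the nested request finishes within the timeout.
    public static boolean completesInTime(boolean blockInsideCallback, long timeoutMillis) {
        ExecutorService storeThread = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> first = asyncGet(storeThread, "/a");
            CompletableFuture<String> nested;
            if (blockInsideCallback) {
                // The callback occupies the only store thread while waiting for a task
                // that is queued behind it on that same thread.
                nested = first.thenApplyAsync(v -> blockingGet(asyncGet(storeThread, "/a/b")), storeThread);
            } else {
                // The callback returns immediately, freeing the thread for the nested task.
                nested = first.thenComposeAsync(v -> asyncGet(storeThread, "/a/b"), storeThread);
            }
            nested.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            storeThread.shutdownNow(); // interrupts the stalled callback, if any
        }
    }

    public static void main(String[] args) {
        System.out.println("blocking variant completes: " + completesInTime(true, 300));
        System.out.println("composed variant completes: " + completesInTime(false, 5000));
    }
}
```

The timeouts are arbitrary values for the demo; the point is only that the blocking variant cannot make progress no matter how long it waits.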
[GitHub] [pulsar] poorbarcode commented on a diff in pull request #20303: [fix] [meta]Switch to the metadata store thread after zk operation
poorbarcode commented on code in PR #20303:
URL: https://github.com/apache/pulsar/pull/20303#discussion_r1192420389

## pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java: ##
@@ -425,6 +428,75 @@ public void testDeleteUnusedDirectories(String provider, Supplier urlSup
         assertFalse(store.exists(prefix).join());
     }

+    @DataProvider(name = "conditionOfSwitchThread")
+    public Object[][] conditionOfSwitchThread(){
+        return new Object[][]{
+            {false, false},
+            {false, true},
+            {true, false},
+            {true, true}
+        };
+    }
+
+    @Test(dataProvider = "conditionOfSwitchThread")
+    public void testThreadSwitchOfZkMetadataStore(boolean hasSynchronizer, boolean enabledBatch) throws Exception {
+        final String prefix = newKey();
+        final String metadataStoreName = UUID.randomUUID().toString().replaceAll("-", "");
+        MetadataStoreConfig.MetadataStoreConfigBuilder builder =
+                MetadataStoreConfig.builder().metadataStoreName(metadataStoreName);
+        builder.fsyncEnable(false);
+        builder.batchingEnabled(enabledBatch);
+        if (!hasSynchronizer) {
+            builder.synchronizer(null);
+        }
+        MetadataStoreConfig config = builder.build();
+        @Cleanup
+        ZKMetadataStore store = (ZKMetadataStore) MetadataStoreFactory.create(zks.getConnectionString(), config);
+
+        final Runnable verify = () -> {
+            String currentThreadName = Thread.currentThread().getName();
+            String errorMessage = String.format("Expect to switch to thread %s, but currently it is thread %s",
+                    metadataStoreName, currentThreadName);
+            if (!Thread.currentThread().getName().startsWith(metadataStoreName)){

Review Comment:
Good suggestion, fixed.