[GitHub] [pulsar] poorbarcode commented on a diff in pull request #20303: [fix] [meta]Switch to the metadata store thread after zk operation

2023-05-17 Thread via GitHub


poorbarcode commented on code in PR #20303:
URL: https://github.com/apache/pulsar/pull/20303#discussion_r1197342688


##
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java:
##
@@ -207,25 +207,26 @@ protected void batchOperation(List ops) {
 for (int i = 0; i < ops.size(); i++) {
 OpResult opr = results.get(i);
 MetadataOp op = ops.get(i);
-
-switch (op.getType()) {
-case PUT:
-handlePutResult(op.asPut(), opr);
-break;
-case DELETE:
-handleDeleteResult(op.asDelete(), opr);
-break;
-case GET:
-handleGetResult(op.asGet(), opr);
-break;
-case GET_CHILDREN:
-handleGetChildrenResult(op.asGetChildren(), opr);
-break;
-
-default:
-op.getFuture().completeExceptionally(new MetadataStoreException(
-"Operation type not supported in multi: " + op.getType()));
-}
+execute(() -> {

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar] poorbarcode commented on a diff in pull request #20303: [fix] [meta]Switch to the metadata store thread after zk operation

2023-05-17 Thread via GitHub


poorbarcode commented on code in PR #20303:
URL: https://github.com/apache/pulsar/pull/20303#discussion_r1197342482


##
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java:
##
@@ -528,7 +528,7 @@ public void execute(Runnable task, CompletableFuture future) {
 try {
 executor.execute(task);
 } catch (Throwable t) {
-future.completeExceptionally(t);
+executor.execute(() -> future.completeExceptionally(t));

Review Comment:
   >> Now, if we call `put /a`, it will use the `event thread` of the ZK client
   >> after the operation; but if we call `put /a/b/c` (where the node `/a` does
   >> not exist), it will use the `metadata store thread` after the operation.
   
   > What is the consequence of using different threads? Did you see
   > out-of-order events?
   > EDIT: it looks like the consequence is this error:
   > `ZooKeeper session reconnection timeout. Notifying session is lost`.
   
   To make the context easier to trace, I am answering this question here.
   
   No, I noticed this while checking for deadlocks in
   https://github.com/apache/pulsar/pull/20189. I saw that the ZK thread was
   executing the topic-creation task, and then I realized that @merlimat may
   have forgotten to change this.
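
The thread difference discussed in this comment can be illustrated with a minimal sketch. It is not code from the PR: the class name, the two thread names, and the `continuationThread` helper are hypothetical stand-ins, and the two plain JDK executors only model the ZK client's event thread and the metadata store thread. The sketch shows that a continuation registered on a `CompletableFuture` before it completes runs on whichever thread completes it, so hopping to the store executor before completing the future moves the caller's continuation off the client thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadHopSketch {
    // Hypothetical thread names; real Pulsar and ZooKeeper thread names differ.
    static final String STORE_THREAD = "metadata-store-sketch";
    static final String CLIENT_THREAD = "client-event-sketch";

    /** Returns the name of the thread on which the caller's continuation ran. */
    static String continuationThread(boolean hopToStoreThread) {
        ExecutorService storeExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, STORE_THREAD));
        ExecutorService clientEventThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, CLIENT_THREAD));
        try {
            CompletableFuture<String> result = new CompletableFuture<>();
            CompletableFuture<String> seen = new CompletableFuture<>();
            // Registered before completion, so it runs on the completing thread.
            result.thenAccept(v -> seen.complete(Thread.currentThread().getName()));

            // Models the client library invoking its callback on its own thread.
            clientEventThread.execute(() -> {
                if (hopToStoreThread) {
                    // Switch threads first, then complete the caller-visible future.
                    storeExecutor.execute(() -> result.complete("ok"));
                } else {
                    // Complete directly: continuations run on the client thread.
                    result.complete("ok");
                }
            });
            // Blocking is acceptable here only because this is the main thread of a sketch.
            return seen.join();
        } finally {
            storeExecutor.shutdown();
            clientEventThread.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("direct completion ran on: " + continuationThread(false));
        System.out.println("after hopping ran on:     " + continuationThread(true));
    }
}
```

With direct completion, any slow work in the caller's continuation occupies the client's callback thread, which is the kind of situation the comment describes.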






[GitHub] [pulsar] poorbarcode commented on a diff in pull request #20303: [fix] [meta]Switch to the metadata store thread after zk operation

2023-05-17 Thread via GitHub


poorbarcode commented on code in PR #20303:
URL: https://github.com/apache/pulsar/pull/20303#discussion_r1197326974


##
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java:
##
@@ -528,7 +528,7 @@ public void execute(Runnable task, CompletableFuture future) {
 try {
 executor.execute(task);
 } catch (Throwable t) {
-future.completeExceptionally(t);
+executor.execute(() -> future.completeExceptionally(t));

Review Comment:
   This change has been removed.






[GitHub] [pulsar] poorbarcode commented on a diff in pull request #20303: [fix] [meta]Switch to the metadata store thread after zk operation

2023-05-15 Thread via GitHub


poorbarcode commented on code in PR #20303:
URL: https://github.com/apache/pulsar/pull/20303#discussion_r1194546785


##
pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java:
##
@@ -425,6 +428,75 @@ public void testDeleteUnusedDirectories(String provider, Supplier urlSup
         assertFalse(store.exists(prefix).join());
     }
 
+    @DataProvider(name = "conditionOfSwitchThread")
+    public Object[][] conditionOfSwitchThread(){
+        return new Object[][]{
+            {false, false},
+            {false, true},
+            {true, false},
+            {true, true}
+        };
+    }
+
+    @Test(dataProvider = "conditionOfSwitchThread")
+    public void testThreadSwitchOfZkMetadataStore(boolean hasSynchronizer, boolean enabledBatch) throws Exception {
+        final String prefix = newKey();
+        final String metadataStoreName = UUID.randomUUID().toString().replaceAll("-", "");
+        MetadataStoreConfig.MetadataStoreConfigBuilder builder =
+                MetadataStoreConfig.builder().metadataStoreName(metadataStoreName);
+        builder.fsyncEnable(false);
+        builder.batchingEnabled(enabledBatch);
+        if (!hasSynchronizer) {
+            builder.synchronizer(null);
+        }
+        MetadataStoreConfig config = builder.build();
+        @Cleanup
+        ZKMetadataStore store = (ZKMetadataStore) MetadataStoreFactory.create(zks.getConnectionString(), config);
+
+        final Runnable verify = () -> {
+            String currentThreadName = Thread.currentThread().getName();
+            String errorMessage = String.format("Expect to switch to thread %s, but currently it is thread %s",
+                    metadataStoreName, currentThreadName);
+            if (!Thread.currentThread().getName().startsWith(metadataStoreName)){

Review Comment:
   > Did you forget to push this change?
   
   Yes, you are right. Fixed, thanks.






[GitHub] [pulsar] poorbarcode commented on a diff in pull request #20303: [fix] [meta]Switch to the metadata store thread after zk operation

2023-05-14 Thread via GitHub


poorbarcode commented on code in PR #20303:
URL: https://github.com/apache/pulsar/pull/20303#discussion_r1193168968


##
pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java:
##
@@ -425,6 +428,75 @@ public void testDeleteUnusedDirectories(String provider, Supplier urlSup
         assertFalse(store.exists(prefix).join());
     }
 
+    @DataProvider(name = "conditionOfSwitchThread")
+    public Object[][] conditionOfSwitchThread(){
+        return new Object[][]{
+            {false, false},
+            {false, true},
+            {true, false},
+            {true, true}
+        };
+    }
+
+    @Test(dataProvider = "conditionOfSwitchThread")
+    public void testThreadSwitchOfZkMetadataStore(boolean hasSynchronizer, boolean enabledBatch) throws Exception {
+        final String prefix = newKey();
+        final String metadataStoreName = UUID.randomUUID().toString().replaceAll("-", "");
+        MetadataStoreConfig.MetadataStoreConfigBuilder builder =
+                MetadataStoreConfig.builder().metadataStoreName(metadataStoreName);
+        builder.fsyncEnable(false);
+        builder.batchingEnabled(enabledBatch);
+        if (!hasSynchronizer) {
+            builder.synchronizer(null);
+        }
+        MetadataStoreConfig config = builder.build();
+        @Cleanup
+        ZKMetadataStore store = (ZKMetadataStore) MetadataStoreFactory.create(zks.getConnectionString(), config);
+
+        final Runnable verify = () -> {
+            String currentThreadName = Thread.currentThread().getName();
+            String errorMessage = String.format("Expect to switch to thread %s, but currently it is thread %s",
+                    metadataStoreName, currentThreadName);
+            if (!Thread.currentThread().getName().startsWith(metadataStoreName)){
+                throw new RuntimeException(errorMessage);
+            }
+        };
+
+        // put with node which has parent(but the parent node is not exists).
+        store.put(prefix + "/a1/b1/c1", "value".getBytes(), Optional.of(-1L)).thenApply((ignore) -> {
+            verify.run();
+            return null;
+        }).join();
+        // put.
+        store.put(prefix + "/b1", "value".getBytes(), Optional.of(-1L)).thenApply((ignore) -> {
+            verify.run();
+            return null;
+        }).join();
+        // get.
+        store.get(prefix + "/b1").thenApply((ignore) -> {
+            verify.run();
+            return null;
+        }).join();
+        // get the node which is not exists.
+        store.get(prefix + "/non").thenApply((ignore) -> {
+            verify.run();
+            return null;
+        }).join();
+        // delete.
+        store.delete(prefix + "/b1", Optional.empty()).thenApply((ignore) -> {
+            verify.run();
+            return null;
+        }).join();
+        // delete the node which is not exists.
+        store.delete(prefix + "/non", Optional.empty()).thenApply((ignore) -> {
+            verify.run();
+            return null;
+        }).exceptionally(ex -> {
+            verify.run();
+            return null;
+        }).join();
+    }

Review Comment:
   > I suggest NOT using `CompletableFuture#get` or `CompletableFuture#join`
   > anywhere.
   
   I agree with you.
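
One way to follow this advice is to chain dependent steps with `thenCompose` instead of blocking between them. The sketch below is only an illustration under stated assumptions: `putAsync`, `getAsync`, and the in-memory map are hypothetical stand-ins for an asynchronous store, not the Pulsar `MetadataStore` API.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class ComposeInsteadOfJoinSketch {
    // Hypothetical in-memory stand-in for an asynchronous key/value store.
    static final Map<String, String> DATA = new ConcurrentHashMap<>();

    static CompletableFuture<Void> putAsync(String key, String value) {
        return CompletableFuture.runAsync(() -> DATA.put(key, value));
    }

    static CompletableFuture<Optional<String>> getAsync(String key) {
        return CompletableFuture.supplyAsync(() -> Optional.ofNullable(DATA.get(key)));
    }

    /** Chains put -> get; no thread blocks while waiting for the put to finish. */
    static CompletableFuture<Optional<String>> putThenGet(String key, String value) {
        return putAsync(key, value).thenCompose(ignore -> getAsync(key));
    }

    public static void main(String[] args) {
        // The only blocking call sits at the outermost boundary of the program.
        System.out.println(putThenGet("/b1", "value").join());
    }
}
```

The caller receives a future and decides where, if anywhere, to wait; callback threads are never parked on an intermediate result.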






[GitHub] [pulsar] poorbarcode commented on a diff in pull request #20303: [fix] [meta]Switch to the metadata store thread after zk operation

2023-05-12 Thread via GitHub


poorbarcode commented on code in PR #20303:
URL: https://github.com/apache/pulsar/pull/20303#discussion_r1192442666


##
pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java:
##

Review Comment:
   @nodece  Could you take a look again?






[GitHub] [pulsar] poorbarcode commented on a diff in pull request #20303: [fix] [meta]Switch to the metadata store thread after zk operation

2023-05-12 Thread via GitHub


poorbarcode commented on code in PR #20303:
URL: https://github.com/apache/pulsar/pull/20303#discussion_r1192424212


##
pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java:
##

Review Comment:
   > Could you add a nested request case? So like:
   
   No, that would cause a deadlock. The metadata store thread pool is a
   single-threaded pool.
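
The deadlock risk can be reproduced in isolation with a plain JDK single-thread executor. This is a sketch, not Pulsar code, and it assumes the nested request would block on its result; the class and method names are hypothetical. A task running on the only worker thread submits a second task to the same executor and waits for it. The second task cannot start until the first returns, so the wait can never succeed; a timeout is used here so the demonstration terminates.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SingleThreadNestingSketch {
    /** Returns true if the nested blocking wait timed out, i.e. would have hung forever. */
    static boolean nestedBlockingWaitTimesOut(long timeoutMillis) {
        ExecutorService single = Executors.newSingleThreadExecutor();
        try {
            Future<Boolean> outer = single.submit(() -> {
                // Queued behind the currently running (outer) task on the same thread.
                Future<String> inner = single.submit(() -> "inner");
                try {
                    inner.get(timeoutMillis, TimeUnit.MILLISECONDS);
                    return false; // unreachable while the pool has a single thread
                } catch (TimeoutException e) {
                    return true;  // without the timeout this wait would never end
                }
            });
            return outer.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            single.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("nested wait timed out: " + nestedBlockingWaitTimesOut(200));
    }
}
```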
   






[GitHub] [pulsar] poorbarcode commented on a diff in pull request #20303: [fix] [meta]Switch to the metadata store thread after zk operation

2023-05-12 Thread via GitHub


poorbarcode commented on code in PR #20303:
URL: https://github.com/apache/pulsar/pull/20303#discussion_r1192420389


##
pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java:
##

Review Comment:
   Good suggestion, fixed


