imay closed pull request #265: Change the lock type of Catalog lock
URL: https://github.com/apache/incubator-doris/pull/265
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/be/src/olap/olap_table.cpp b/be/src/olap/olap_table.cpp
index b47d678c..2f6a48fe 100644
--- a/be/src/olap/olap_table.cpp
+++ b/be/src/olap/olap_table.cpp
@@ -760,30 +760,32 @@ void OLAPTable::load_pending_data() {
                 break;
             }
 
-            if (_num_key_fields != pending_rowset.column_pruning_size()) {
-                LOG(WARNING) << "column pruning size is error when load pending data."
-                             << "column_pruning_size=" << pending_rowset.column_pruning_size() << ", "
-                             << "num_key_fields=" << _num_key_fields;
-                error_pending_data.insert(rowset->transaction_id());
-                break;
-            }
-            std::vector<std::pair<std::string, std::string>> column_statistics_string(_num_key_fields);
-            std::vector<bool> null_vec(_num_key_fields);
-            for (size_t j = 0; j < _num_key_fields; ++j) {
-                ColumnPruning column_pruning = pending_rowset.column_pruning(j);
-                column_statistics_string[j].first = column_pruning.min();
-                column_statistics_string[j].second = column_pruning.max();
-                if (column_pruning.has_null_flag()) {
-                    null_vec[j] = column_pruning.null_flag();
-                } else {
-                    null_vec[j] = false;
+            if (pending_rowset.column_pruning_size() != 0) {
+                if (_num_key_fields != pending_rowset.column_pruning_size()) {
+                    LOG(WARNING) << "column pruning size is error when load pending data."
+                        << "column_pruning_size=" << pending_rowset.column_pruning_size() << ", "
+                        << "num_key_fields=" << _num_key_fields;
+                    error_pending_data.insert(rowset->transaction_id());
+                    break;
+                }
+                std::vector<std::pair<std::string, std::string>> column_statistics_string(_num_key_fields);
+                std::vector<bool> null_vec(_num_key_fields);
+                for (size_t j = 0; j < _num_key_fields; ++j) {
+                    ColumnPruning column_pruning = pending_rowset.column_pruning(j);
+                    column_statistics_string[j].first = column_pruning.min();
+                    column_statistics_string[j].second = column_pruning.max();
+                    if (column_pruning.has_null_flag()) {
+                        null_vec[j] = column_pruning.null_flag();
+                    } else {
+                        null_vec[j] = false;
+                    }
                 }
-            }
 
-            if (rowset->add_column_statistics(column_statistics_string, null_vec) != OLAP_SUCCESS) {
-                LOG(WARNING) << "fail to set column statistics when load pending data";
-                error_pending_data.insert(pending_delta.transaction_id());
-                break;
+                if (rowset->add_column_statistics(column_statistics_string, null_vec) != OLAP_SUCCESS) {
+                    LOG(WARNING) << "fail to set column statistics when load pending data";
+                    error_pending_data.insert(pending_delta.transaction_id());
+                    break;
+                }
             }
 
             if (rowset->load() != OLAP_SUCCESS) {
@@ -1182,32 +1184,34 @@ Rowset* OLAPTable::_construct_index_from_version(const PDelta* delta, int32_t ro
         return nullptr;
     }
 
-    if (_num_key_fields != prowset->column_pruning_size()) {
-        LOG(WARNING) << "column pruning size error, " << "table=" << full_name() << ", "
+    if (prowset->column_pruning_size() != 0) {
+        if (_num_key_fields != prowset->column_pruning_size()) {
+            LOG(WARNING) << "column pruning size error, " << "table=" << full_name() << ", "
                 << "version=" << version.first << "-" << version.second << ", "
                 << "version_hash=" << delta->version_hash() << ", "
                 << "column_pruning_size=" << prowset->column_pruning_size() << ", "
                 << "num_key_fields=" << _num_key_fields;
-        SAFE_DELETE(rowset);
-        return nullptr;
-    }
-    vector<pair<string, string>> column_statistic_strings(_num_key_fields);
-    std::vector<bool> null_vec(_num_key_fields);
-    for (size_t j = 0; j < _num_key_fields; ++j) {
-        ColumnPruning column_pruning = prowset->column_pruning(j);
-        column_statistic_strings[j].first = column_pruning.min();
-        column_statistic_strings[j].second = column_pruning.max();
-        if (column_pruning.has_null_flag()) {
-            null_vec[j] = column_pruning.null_flag();
-        } else {
-            null_vec[j] = false;
+            SAFE_DELETE(rowset);
+            return nullptr;
+        }
+        vector<pair<string, string>> column_statistic_strings(_num_key_fields);
+        std::vector<bool> null_vec(_num_key_fields);
+        for (size_t j = 0; j < _num_key_fields; ++j) {
+            ColumnPruning column_pruning = prowset->column_pruning(j);
+            column_statistic_strings[j].first = column_pruning.min();
+            column_statistic_strings[j].second = column_pruning.max();
+            if (column_pruning.has_null_flag()) {
+                null_vec[j] = column_pruning.null_flag();
+            } else {
+                null_vec[j] = false;
+            }
         }
-    }
 
-    res = rowset->add_column_statistics(column_statistic_strings, null_vec);
-    if (res != OLAP_SUCCESS) {
-        SAFE_DELETE(rowset);
-        return nullptr;
+        res = rowset->add_column_statistics(column_statistic_strings, null_vec);
+        if (res != OLAP_SUCCESS) {
+            SAFE_DELETE(rowset);
+            return nullptr;
+        }
     }
 
     res = rowset->load();
diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
index 07d9718f..4d445cc6 100644
--- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -17,6 +17,20 @@
 
 package org.apache.doris.catalog;
 
+import com.google.common.base.Joiner;
+import com.google.common.base.Joiner.MapJoiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
+import com.sleepycat.je.rep.InsufficientLogException;
+import com.sleepycat.je.rep.NetworkRestore;
+import com.sleepycat.je.rep.NetworkRestoreConfig;
+
 import org.apache.doris.alter.Alter;
 import org.apache.doris.alter.AlterJob;
 import org.apache.doris.alter.AlterJob.JobType;
@@ -96,6 +110,7 @@
 import org.apache.doris.common.util.KuduUtil;
 import org.apache.doris.common.util.PrintableMap;
 import org.apache.doris.common.util.PropertyAnalyzer;
+import org.apache.doris.common.util.QueryableReentrantLock;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.consistency.ConsistencyChecker;
 import org.apache.doris.deploy.DeployManager;
@@ -160,21 +175,6 @@
 import org.apache.doris.thrift.TTaskType;
 import org.apache.doris.transaction.GlobalTransactionMgr;
 import org.apache.doris.transaction.PublishVersionDaemon;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Joiner.MapJoiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Range;
-import com.google.common.collect.Sets;
-import com.sleepycat.je.rep.InsufficientLogException;
-import com.sleepycat.je.rep.NetworkRestore;
-import com.sleepycat.je.rep.NetworkRestoreConfig;
-
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
 import org.apache.kudu.client.CreateTableOptions;
@@ -208,7 +208,6 @@
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
 
 public class Catalog {
     private static final Logger LOG = LogManager.getLogger(Catalog.class);
@@ -231,7 +230,8 @@
     // Operations like Get or Put do not need lock.
     // We use fair ReentrantLock to avoid starvation. Do not use this lock in critical code pass
     // because fair lock has poor performance.
-    private ReentrantLock lock;
+    // Using QueryableReentrantLock to print owner thread in debug mode.
+    private QueryableReentrantLock lock;
 
     private ConcurrentHashMap<Long, Database> idToDb;
     private ConcurrentHashMap<String, Database> fullNameToDb;
@@ -374,7 +374,7 @@ private Catalog() {
         this.clone = new Clone();
         this.alter = new Alter();
         this.consistencyChecker = new ConsistencyChecker();
-        this.lock = new ReentrantLock(true);
+        this.lock = new QueryableReentrantLock(true);
         this.backupHandler = new BackupHandler(this);
         this.metaDir = Config.meta_dir;
         this.userPropertyMgr = new UserPropertyMgr();
@@ -498,6 +498,14 @@ private boolean tryLock(boolean mustLock) {
         while (true) {
             try {
                 if (!lock.tryLock(Config.catalog_try_lock_timeout_ms, TimeUnit.MILLISECONDS)) {
+                    if (LOG.isDebugEnabled()) {
+                        // to see which thread held this lock for long time.
+                        Thread owner = lock.getOwner();
+                        if (owner != null) {
+                            LOG.debug("catalog lock is held by: {}", Util.dumpThread(owner, 10));
+                        }
+                    }
+                    
                     if (mustLock) {
                         continue;
                     } else {
diff --git a/fe/src/main/java/org/apache/doris/common/util/QueryableReentrantLock.java b/fe/src/main/java/org/apache/doris/common/util/QueryableReentrantLock.java
new file mode 100644
index 00000000..1f028343
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/common/util/QueryableReentrantLock.java
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.util;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+/*
+ * This Lock is for exposing the getOwner() method,
+ * which is a protected method of ReentrantLock
+ */
+public class QueryableReentrantLock extends ReentrantLock {
+    private static final long serialVersionUID = 1L;
+
+    public QueryableReentrantLock() {
+        super();
+    }
+
+    public QueryableReentrantLock(boolean fair) {
+        super(fair);
+    }
+
+    @Override
+    public Thread getOwner() {
+        return super.getOwner();
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/common/util/Util.java b/fe/src/main/java/org/apache/doris/common/util/Util.java
index a96bc0a0..20807318 100644
--- a/fe/src/main/java/org/apache/doris/common/util/Util.java
+++ b/fe/src/main/java/org/apache/doris/common/util/Util.java
@@ -17,12 +17,10 @@
 
 package org.apache.doris.common.util;
 
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.PrimitiveType;
-import org.apache.doris.common.Config;
-
 import com.google.common.collect.Lists;
 
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PrimitiveType;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -308,5 +306,20 @@ public static boolean deleteDirectory(File directory) {
         }
         return directory.delete();
     }
+
+    public static String dumpThread(Thread t, int lineNum) {
+        StringBuilder sb = new StringBuilder();
+        StackTraceElement[] elements = t.getStackTrace();
+        sb.append("dump thread: ").append(t.getName()).append(", id: ").append(t.getId()).append("\n");
+        int count = lineNum;
+        for (StackTraceElement element : elements) {
+            if (count == 0) {
+                break;
+            }
+            sb.append("    ").append(element.toString()).append("\n");
+            --count;
+        }
+        return sb.toString();
+    }
 }
 
diff --git a/fe/src/test/java/org/apache/doris/common/util/QueryableReentrantLockTest.java b/fe/src/test/java/org/apache/doris/common/util/QueryableReentrantLockTest.java
new file mode 100644
index 00000000..f8f7b217
--- /dev/null
+++ b/fe/src/test/java/org/apache/doris/common/util/QueryableReentrantLockTest.java
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+public class QueryableReentrantLockTest {
+
+    private QueryableReentrantLock lock = new QueryableReentrantLock(true);
+
+    @Test
+    public void test() throws InterruptedException {
+
+        Thread t1 = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                lock.lock();
+                try {
+                    try {
+                        Thread.sleep(5000);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                } finally {
+                    lock.unlock();
+                }
+            }
+        }, "thread1");
+
+        Thread t2 = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+
+                try {
+                    if (!lock.tryLock(1000, TimeUnit.MILLISECONDS)) {
+                        Thread owner = lock.getOwner();
+                        Assert.assertEquals("thread1", owner.getName());
+
+                        System.out.println(Util.dumpThread(owner, 10));
+
+                    }
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+
+            }
+        }, "thread2");
+
+        t1.start();
+        t2.start();
+
+        t1.join();
+        t2.join();
+    }
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@doris.apache.org
For additional commands, e-mail: dev-h...@doris.apache.org

Reply via email to