yuqi created IOTDB-1087:
---------------------------

             Summary: Flush thread hangs in the merge method of 
LevelCompactionTsFileManagement
                 Key: IOTDB-1087
                 URL: https://issues.apache.org/jira/browse/IOTDB-1087
             Project: Apache IoTDB
          Issue Type: Bug
            Reporter: yuqi


The following code can reproduce the problem:


{code:java}
try (Connection connection = DriverManager
        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", 
"root");
         Statement statement = connection.createStatement()) {

      statement.execute("CREATE TIMESERIES root.ln.wf01.wt01.name WITH 
DATATYPE=TEXT");
      statement.execute("CREATE TIMESERIES root.ln.wf01.wt01.age WITH 
DATATYPE=INT32, ENCODING=RLE, COMPRESSOR = SNAPPY");

      statement.execute("insert into root.ln.wf01.wt01(timestamp,name,age) 
values(1000,'zhang',10)");
      statement.execute("flush");

      ResultSet r1 = statement.executeQuery("select * from root.ln.wf01.wt01");
      r1.next();

      Assert.assertEquals("zhang", r1.getString(2));
      Assert.assertEquals(10, r1.getInt(3));

      statement.execute("insert into root.ln.wf01.wt01(timestamp,name,age) 
values(2000,'wang',20)");
      statement.execute("flush");
      statement.execute("insert into root.ln.wf01.wt01(timestamp,name,age) 
values(3000,'li',30)");

      ResultSet r2 = statement.executeQuery("select * from root.ln.wf01.wt01 
where name = 'wang'");
      r2.next();
      Assert.assertEquals(20, r2.getInt(3));

      ResultSet r3 = statement.executeQuery("select * from root.ln.wf01.wt01 
where name = 'li'");
      r3.next();
      Assert.assertEquals(30, r3.getInt(3));

      ResultSet r4 = statement.executeQuery("select sum(age) from 
root.ln.wf01.wt01");
      r4.next();
      double d = r4.getDouble(1);

      Assert.assertTrue(60.0 == d);

      //now we try to insert more values
      for (int i = 20; i <= 10000; i++) {
        String time = String.valueOf(i * 100);
        String values = String.valueOf(i * 10);
        statement.execute(String.format("insert into 
root.ln.wf01.wt01(timestamp,name,age) values(%s,'wang', %s)", time, values));
      }

      //execute flush and the process stuck
      statement.execute("flush");
      ResultSet r5 = statement.executeQuery("select * from root.ln.wf01.wt01 
where timestamp = 100000");
      r5.next();
      Assert.assertEquals(10000, r5.getInt(3));
    }
{code}

The test hangs at the last `flush` command.
The following is the key stack information:


{code:java}
"MergeThread-0" #38 prio=5 os_prio=31 tid=0x00007f8d7d203000 nid=0x6c03 in 
Object.wait() [0x00007000123fb000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at 
org.apache.iotdb.db.engine.storagegroup.TsFileLock.writeLock(TsFileLock.java:69)
        - locked <0x0000000782cbebd8> (a 
org.apache.iotdb.db.engine.storagegroup.TsFileLock)
        at 
org.apache.iotdb.db.engine.storagegroup.TsFileResource.writeLock(TsFileResource.java:499)
        at 
org.apache.iotdb.db.engine.merge.task.MergeFileTask.moveUnmergedToNew(MergeFileTask.java:180)
        at 
org.apache.iotdb.db.engine.merge.task.MergeFileTask.mergeFiles(MergeFileTask.java:95)
        at 
org.apache.iotdb.db.engine.merge.task.MergeTask.doMerge(MergeTask.java:158)
        at 
org.apache.iotdb.db.engine.merge.task.MergeTask.call(MergeTask.java:100)
        at 
org.apache.iotdb.db.engine.merge.task.MergeTask.call(MergeTask.java:53)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)


"pool-7-IoTDB-Compaction-ServerServiceImpl-thread-2" #31 prio=5 os_prio=31 
tid=0x00007f8d7c4a3000 nid=0x6803 waiting on condition [0x0000700011ce6000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at 
org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement.merge(LevelCompactionTsFileManagement.java:486)
        at 
org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement.merge(LevelCompactionTsFileManagement.java:467)
        at 
org.apache.iotdb.db.engine.compaction.TsFileManagement$CompactionMergeTask.run(TsFileManagement.java:176)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

"pool-6-IoTDB-RPC-Client-thread-1" #19 prio=5 os_prio=31 tid=0x00007f8d7d50d000 
nid=0xa203 in Object.wait() [0x00007000110c1000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at 
org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.syncCloseAllWorkingTsFileProcessors(StorageGroupProcessor.java:1288)
        - locked <0x0000000782d0a430> (a java.lang.Object)
        at 
org.apache.iotdb.db.engine.StorageEngine.syncCloseAllProcessor(StorageEngine.java:443)
        at 
org.apache.iotdb.db.qp.executor.PlanExecutor.operateFlush(PlanExecutor.java:335)
        at 
org.apache.iotdb.db.qp.executor.PlanExecutor.processNonQuery(PlanExecutor.java:260)
        at 
org.apache.iotdb.db.service.TSServiceImpl.executeNonQuery(TSServiceImpl.java:1123)
        at 
org.apache.iotdb.db.service.TSServiceImpl.executeNonQueryPlan(TSServiceImpl.java:1774)
        at 
org.apache.iotdb.db.service.TSServiceImpl.executeUpdateStatement(TSServiceImpl.java:1110)
        at 
org.apache.iotdb.db.service.TSServiceImpl.executeStatement(TSServiceImpl.java:534)
        at 
org.apache.iotdb.service.rpc.thrift.TSIService$Processor$executeStatement.getResult(TSIService.java:2133)
        at 
org.apache.iotdb.service.rpc.thrift.TSIService$Processor$executeStatement.getResult(TSIService.java:2113)
        at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:38)
        at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:38)
        at 
org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:313)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
{code}


Reason:
`FileReaderManager` holds a read lock (see TsFileLock) while a query is executing and 
releases the lock when the session is closed. The compaction thread needs to acquire the write 
lock (see MergeFileTask#moveUnmergedToNew) and therefore hangs, as shown below:

{code:java}
 
 //hang here
 seqFile.writeLock();
    try {
      resource.removeFileReader(seqFile);
      ChunkMetadataCache.getInstance().remove(seqFile);
      
FileReaderManager.getInstance().closeFileAndRemoveReader(seqFile.getTsFilePath());

      File newMergeFile = seqFile.getTsFile();
      newMergeFile.delete();
      fsFactory.moveFile(fileWriter.getFile(), newMergeFile);
      seqFile.setFile(newMergeFile);
    } catch (Exception e) {
      logger.error(e.getMessage(), e);
    } finally {
      // clean cache
      if (IoTDBDescriptor.getInstance().getConfig().isMetaDataCacheEnable()) {
        ChunkCache.getInstance().clear();
        ChunkMetadataCache.getInstance().clear();
        TimeSeriesMetadataCache.getInstance().clear();
        
FileReaderManager.getInstance().closeFileAndRemoveReader(seqFile.getTsFilePath());
      }
      seqFile.writeUnlock();
    }
{code}








--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to