[ 
https://issues.apache.org/jira/browse/HIVE-14980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15578923#comment-15578923
 ] 

Mahipal Jupalli edited comment on HIVE-14980 at 10/15/16 11:28 PM:
-------------------------------------------------------------------

Hi,

My idea is to replicate the same checks from the 
org.apache.hadoop.hive.ql.txn.compactor.Initiator.java to the 
org.apache.hadoop.hive.ql.txn.compactor.Worker.java logic.

{code:title=org.apache.hadoop.hive.ql.txn.compactor.Initiator.java|borderStyle=solid}
// Figure out if there are any currently running compactions on the same table 
or partition.
  private boolean lookForCurrentCompactions(ShowCompactResponse compactions,
                                            CompactionInfo ci) {
    if (compactions.getCompacts() != null) {
      for (ShowCompactResponseElement e : compactions.getCompacts()) {
         if ((e.getState().equals(TxnStore.WORKING_RESPONSE) || 
e.getState().equals(TxnStore.INITIATED_RESPONSE)) &&
            e.getDbname().equals(ci.dbname) &&
            e.getTablename().equals(ci.tableName) &&
            (e.getPartitionname() == null && ci.partName == null ||
                  e.getPartitionname().equals(ci.partName))) {
          return true;
        }
      }
    }
    return false;
  }

public void run(){
    //...
    if (lookForCurrentCompactions(currentCompactions, ci)) {
        LOG.debug("Found currently initiated or working compaction for " + 
ci.getFullPartitionName() + " so we will not initiate another compaction");
        continue;
    }
    //...
}
{code}

{code:title=org.apache.hadoop.hive.ql.txn.compactor.Worker.java|borderStyle=solid}
public void run() {
      //...
          // This chicanery is to get around the fact that the table needs to 
be final in order to
        // go into the doAs below.
        final Table t = t1;

        ShowCompactResponse currentCompactions = txnHandler.showCompact(new 
ShowCompactRequest());
        if (lookForCurrentCompactions(currentCompactions, ci)) {
          LOG.debug("Found currently initiated or working compaction for " +
              ci.getFullPartitionName() + " so we will not initiate another 
compaction");
          continue;
        }
        
        // Find the partition we will be working with, if there is one.
        Partition p = null;
      //...
      //Figure out if there are any currently running compactions on the same 
table or partition.
 private boolean lookForCurrentCompactions(ShowCompactResponse compactions,
                                           CompactionInfo ci) {
   if (compactions.getCompacts() != null) {
     for (ShowCompactResponseElement e : compactions.getCompacts()) {
        if ((e.getState().equals(TxnStore.WORKING_RESPONSE) || 
e.getState().equals(TxnStore.INITIATED_RESPONSE)) &&
           e.getDbname().equals(ci.dbname) &&
           e.getTablename().equals(ci.tableName) &&
           (e.getPartitionname() == null && ci.partName == null ||
                 e.getPartitionname().equals(ci.partName))) {
         return true;
       }
     }
   }
   return false;
 }
}
  //...
{code}

Please let me know if this is the correct approach.


was (Author: mahipal.jupalli):
Hi,

My idea is to replicate the same checks from the Initiator to the Worker logic.

{code:title=org.apache.hadoop.hive.ql.txn.compactor.Initiator.java|borderStyle=solid}
// Figure out if there are any currently running compactions on the same table 
or partition.
  private boolean lookForCurrentCompactions(ShowCompactResponse compactions,
                                            CompactionInfo ci) {
    if (compactions.getCompacts() != null) {
      for (ShowCompactResponseElement e : compactions.getCompacts()) {
         if ((e.getState().equals(TxnStore.WORKING_RESPONSE) || 
e.getState().equals(TxnStore.INITIATED_RESPONSE)) &&
            e.getDbname().equals(ci.dbname) &&
            e.getTablename().equals(ci.tableName) &&
            (e.getPartitionname() == null && ci.partName == null ||
                  e.getPartitionname().equals(ci.partName))) {
          return true;
        }
      }
    }
    return false;
  }

public void run(){
    //...
    if (lookForCurrentCompactions(currentCompactions, ci)) {
        LOG.debug("Found currently initiated or working compaction for " + 
ci.getFullPartitionName() + " so we will not initiate another compaction");
        continue;
    }
    //...
}
{code}

{code:title=org.apache.hadoop.hive.ql.txn.compactor.Worker.java|borderStyle=solid}
public void run() {
      //...
          // This chicanery is to get around the fact that the table needs to 
be final in order to
        // go into the doAs below.
        final Table t = t1;

        ShowCompactResponse currentCompactions = txnHandler.showCompact(new 
ShowCompactRequest());
        if (lookForCurrentCompactions(currentCompactions, ci)) {
          LOG.debug("Found currently initiated or working compaction for " +
              ci.getFullPartitionName() + " so we will not initiate another 
compaction");
          continue;
        }
        
        // Find the partition we will be working with, if there is one.
        Partition p = null;
      //...
      //Figure out if there are any currently running compactions on the same 
table or partition.
 private boolean lookForCurrentCompactions(ShowCompactResponse compactions,
                                           CompactionInfo ci) {
   if (compactions.getCompacts() != null) {
     for (ShowCompactResponseElement e : compactions.getCompacts()) {
        if ((e.getState().equals(TxnStore.WORKING_RESPONSE) || 
e.getState().equals(TxnStore.INITIATED_RESPONSE)) &&
           e.getDbname().equals(ci.dbname) &&
           e.getTablename().equals(ci.tableName) &&
           (e.getPartitionname() == null && ci.partName == null ||
                 e.getPartitionname().equals(ci.partName))) {
         return true;
       }
     }
   }
   return false;
 }
}
  //...
{code}

Please let me know if this is the correct approach.

> Minor compaction when triggered simultaniously on the same table/partition 
> deletes data
> ---------------------------------------------------------------------------------------
>
>                 Key: HIVE-14980
>                 URL: https://issues.apache.org/jira/browse/HIVE-14980
>             Project: Hive
>          Issue Type: Bug
>          Components: Metastore
>    Affects Versions: 2.1.0
>            Reporter: Mahipal Jupalli
>            Assignee: Mahipal Jupalli
>            Priority: Critical
>   Original Estimate: 96h
>  Remaining Estimate: 96h
>
> I have two tables (TABLEA, TABLEB). If I manually trigger compaction after 
> each INSERT into TABLEB from TABLEA, compactions are triggered on random 
> metastore asynchronously and are stepping on each other which is causing the 
> data to be deleted.
> Example here: 
> TABLEA - has 10k rows. 
> insert into mj.tableb select * from mj.tablea;
> alter table mj.tableb compact 'MINOR';
> insert into mj.tableb select * from mj.tablea;
> alter table mj.tableb compact 'MINOR';
> Once all the compactions are complete, I should ideally see 20k rows in 
> TABLEB. But I see only 10k rows (Only the rows INSERTED before the last 
> compaction persist, the old rows are deleted. I believe the old delta files 
> are deleted). 
> To further confirm the bug, if I do only one compaction after two inserts, I 
> see 20k rows in TABLEB.
> Proposed Fix:
> I have identified the bug in the code, it requires an additional check in the 
> org.apache.hadoop.hive.ql.txn.compactor.Worker class to check for any active 
> compactions on the table/partition. I will 'share the details of the fix once 
> I test it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to