[ https://issues.apache.org/jira/browse/HIVE-14980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15578923#comment-15578923 ]
Mahipal Jupalli edited comment on HIVE-14980 at 10/15/16 11:28 PM: ------------------------------------------------------------------- Hi, My idea is to replicate the same checks from the org.apache.hadoop.hive.ql.txn.compactor.Initiator.java to the org.apache.hadoop.hive.ql.txn.compactor.Worker.java logic. {code:title=org.apache.hadoop.hive.ql.txn.compactor.Initiator.java|borderStyle=solid} // Figure out if there are any currently running compactions on the same table or partition. private boolean lookForCurrentCompactions(ShowCompactResponse compactions, CompactionInfo ci) { if (compactions.getCompacts() != null) { for (ShowCompactResponseElement e : compactions.getCompacts()) { if ((e.getState().equals(TxnStore.WORKING_RESPONSE) || e.getState().equals(TxnStore.INITIATED_RESPONSE)) && e.getDbname().equals(ci.dbname) && e.getTablename().equals(ci.tableName) && (e.getPartitionname() == null && ci.partName == null || e.getPartitionname().equals(ci.partName))) { return true; } } } return false; } public void run(){ //... if (lookForCurrentCompactions(currentCompactions, ci)) { LOG.debug("Found currently initiated or working compaction for " + ci.getFullPartitionName() + " so we will not initiate another compaction"); continue; } //... } {code} {code:title=org.apache.hadoop.hive.ql.txn.compactor.Worker.java|borderStyle=solid} public void run() { //... // This chicanery is to get around the fact that the table needs to be final in order to // go into the doAs below. final Table t = t1; ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest()); if (lookForCurrentCompactions(currentCompactions, ci)) { LOG.debug("Found currently initiated or working compaction for " + ci.getFullPartitionName() + " so we will not initiate another compaction"); continue; } // Find the partition we will be working with, if there is one. Partition p = null; //... //Figure out if there are any currently running compactions on the same table or partition. private boolean lookForCurrentCompactions(ShowCompactResponse compactions, CompactionInfo ci) { if (compactions.getCompacts() != null) { for (ShowCompactResponseElement e : compactions.getCompacts()) { if ((e.getState().equals(TxnStore.WORKING_RESPONSE) || e.getState().equals(TxnStore.INITIATED_RESPONSE)) && e.getDbname().equals(ci.dbname) && e.getTablename().equals(ci.tableName) && (e.getPartitionname() == null && ci.partName == null || e.getPartitionname().equals(ci.partName))) { return true; } } } return false; } } //... {code} Please let me know if this is the correct approach. was (Author: mahipal.jupalli): Hi, My idea is to replicate the same checks from the Initiator to the Worker logic. {code:title=org.apache.hadoop.hive.ql.txn.compactor.Initiator.java|borderStyle=solid} // Figure out if there are any currently running compactions on the same table or partition. private boolean lookForCurrentCompactions(ShowCompactResponse compactions, CompactionInfo ci) { if (compactions.getCompacts() != null) { for (ShowCompactResponseElement e : compactions.getCompacts()) { if ((e.getState().equals(TxnStore.WORKING_RESPONSE) || e.getState().equals(TxnStore.INITIATED_RESPONSE)) && e.getDbname().equals(ci.dbname) && e.getTablename().equals(ci.tableName) && (e.getPartitionname() == null && ci.partName == null || e.getPartitionname().equals(ci.partName))) { return true; } } } return false; } public void run(){ //... if (lookForCurrentCompactions(currentCompactions, ci)) { LOG.debug("Found currently initiated or working compaction for " + ci.getFullPartitionName() + " so we will not initiate another compaction"); continue; } //... } {code} {code:title=org.apache.hadoop.hive.ql.txn.compactor.Worker.java|borderStyle=solid} public void run() { //... // This chicanery is to get around the fact that the table needs to be final in order to // go into the doAs below. final Table t = t1; ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest()); if (lookForCurrentCompactions(currentCompactions, ci)) { LOG.debug("Found currently initiated or working compaction for " + ci.getFullPartitionName() + " so we will not initiate another compaction"); continue; } // Find the partition we will be working with, if there is one. Partition p = null; //... //Figure out if there are any currently running compactions on the same table or partition. private boolean lookForCurrentCompactions(ShowCompactResponse compactions, CompactionInfo ci) { if (compactions.getCompacts() != null) { for (ShowCompactResponseElement e : compactions.getCompacts()) { if ((e.getState().equals(TxnStore.WORKING_RESPONSE) || e.getState().equals(TxnStore.INITIATED_RESPONSE)) && e.getDbname().equals(ci.dbname) && e.getTablename().equals(ci.tableName) && (e.getPartitionname() == null && ci.partName == null || e.getPartitionname().equals(ci.partName))) { return true; } } } return false; } } //... {code} Please let me know if this is the correct approach. > Minor compaction when triggered simultaniously on the same table/partition > deletes data > --------------------------------------------------------------------------------------- > > Key: HIVE-14980 > URL: https://issues.apache.org/jira/browse/HIVE-14980 > Project: Hive > Issue Type: Bug > Components: Metastore > Affects Versions: 2.1.0 > Reporter: Mahipal Jupalli > Assignee: Mahipal Jupalli > Priority: Critical > Original Estimate: 96h > Remaining Estimate: 96h > > I have two tables (TABLEA, TABLEB). If I manually trigger compaction after > each INSERT into TABLEB from TABLEA, compactions are triggered on random > metastore asynchronously and are stepping on each other which is causing the > data to be deleted. > Example here: > TABLEA - has 10k rows. > insert into mj.tableb select * from mj.tablea; > alter table mj.tableb compact 'MINOR'; > insert into mj.tableb select * from mj.tablea; > alter table mj.tableb compact 'MINOR'; > Once all the compactions are complete, I should ideally see 20k rows in > TABLEB. But I see only 10k rows (Only the rows INSERTED before the last > compaction persist, the old rows are deleted. I believe the old delta files > are deleted). > To further confirm the bug, if I do only one compaction after two inserts, I > see 20k rows in TABLEB. > Proposed Fix: > I have identified the bug in the code, it requires an additional check in the > org.apache.hadoop.hive.ql.txn.compactor.Worker class to check for any active > compactions on the table/partition. I will 'share the details of the fix once > I test it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)