Author: kturner
Date: Wed Apr 10 17:37:58 2013
New Revision: 1466582

URL: http://svn.apache.org/r1466582
Log:
ACCUMULO-1044 fixed some issues w/ metadata constraint bulk flag check, made 
the check more strict, and added a lot test for it

Modified:
    
accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
    
accumulo/branches/1.5/server/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java

Modified: 
accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
URL: 
http://svn.apache.org/viewvc/accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java?rev=1466582&r1=1466581&r2=1466582&view=diff
==============================================================================
--- 
accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
 (original)
+++ 
accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
 Wed Apr 10 17:37:58 2013
@@ -31,6 +31,7 @@ import org.apache.accumulo.core.data.Val
 import org.apache.accumulo.core.util.ColumnFQ;
 import org.apache.accumulo.core.util.MetadataTable.DataFileValue;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.TransactionWatcher.Arbitrator;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
 import org.apache.accumulo.server.zookeeper.ZooCache;
@@ -139,6 +140,8 @@ public class MetadataConstraints impleme
       violations = addViolation(violations, 5);
     }
     
+    boolean checkedBulk = false;
+
     for (ColumnUpdate columnUpdate : colUpdates) {
       Text columnFamily = new Text(columnUpdate.getColumnFamily());
       
@@ -168,7 +171,7 @@ public class MetadataConstraints impleme
       } else if 
(columnFamily.equals(Constants.METADATA_SCANFILE_COLUMN_FAMILY)) {
         
       } else if 
(columnFamily.equals(Constants.METADATA_BULKFILE_COLUMN_FAMILY)) {
-        if (!columnUpdate.isDeleted()) {
+        if (!columnUpdate.isDeleted() && !checkedBulk) {
           // splits, which also write the time reference, are allowed to write 
this reference even when
           // the transaction is not running because the other half of the 
tablet is holding a reference
           // to the file.
@@ -177,26 +180,42 @@ public class MetadataConstraints impleme
           // but it writes everything.  We allow it to re-write the bulk 
information if it is setting the location. 
           // See ACCUMULO-1230. 
           boolean isLocationMutation = false;
+          
+          HashSet<Text> dataFiles = new HashSet<Text>();
+          HashSet<Text> loadedFiles = new HashSet<Text>();
+
+          String tidString = new String(columnUpdate.getValue());
+          int otherTidCount = 0;
+
           for (ColumnUpdate update : mutation.getUpdates()) {
             if (new 
ColumnFQ(update).equals(Constants.METADATA_DIRECTORY_COLUMN)) {
               isSplitMutation = true;
-            }
-            if 
(update.getColumnFamily().equals(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY))
 {
+            } else if (new 
Text(update.getColumnFamily()).equals(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY))
 {
               isLocationMutation = true;
+            } else if (new 
Text(update.getColumnFamily()).equals(Constants.METADATA_DATAFILE_COLUMN_FAMILY))
 {
+              dataFiles.add(new Text(update.getColumnQualifier()));
+            } else if (new 
Text(update.getColumnFamily()).equals(Constants.METADATA_BULKFILE_COLUMN_FAMILY))
 {
+              loadedFiles.add(new Text(update.getColumnQualifier()));
+              
+              if (!new String(update.getValue()).equals(tidString)) {
+                otherTidCount++;
+              }
             }
           }
           
           if (!isSplitMutation && !isLocationMutation) {
-            String tidString = new String(columnUpdate.getValue());
             long tid = Long.parseLong(tidString);
+            
             try {
-              if (!new 
ZooArbitrator().transactionAlive(Constants.BULK_ARBITRATOR_TYPE, tid)) {
+              if (otherTidCount > 0 || !dataFiles.equals(loadedFiles) || 
!getArbitrator().transactionAlive(Constants.BULK_ARBITRATOR_TYPE, tid)) {
                 violations = addViolation(violations, 8);
               }
             } catch (Exception ex) {
               violations = addViolation(violations, 8);
             }
           }
+          
+          checkedBulk = true;
         }
       } else {
         if (!isValidColumn(columnUpdate)) {
@@ -248,6 +267,10 @@ public class MetadataConstraints impleme
     return violations;
   }
   
+  protected Arbitrator getArbitrator() {
+    return new ZooArbitrator();
+  }
+
   public String getViolationDescription(short violationCode) {
     switch (violationCode) {
       case 1:

Modified: 
accumulo/branches/1.5/server/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java
URL: 
http://svn.apache.org/viewvc/accumulo/branches/1.5/server/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java?rev=1466582&r1=1466581&r2=1466582&view=diff
==============================================================================
--- 
accumulo/branches/1.5/server/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java
 (original)
+++ 
accumulo/branches/1.5/server/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java
 Wed Apr 10 17:37:58 2013
@@ -18,6 +18,7 @@ package org.apache.accumulo.server.const
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 
 import java.util.List;
 
@@ -25,6 +26,7 @@ import org.apache.accumulo.core.Constant
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.fate.zookeeper.TransactionWatcher.Arbitrator;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -32,6 +34,26 @@ import org.junit.Test;
 
 public class MetadataConstraintsTest {
   
+  static class TestMetadataConstraints extends MetadataConstraints {
+    @Override
+    protected Arbitrator getArbitrator() {
+      return new Arbitrator() {
+        
+        @Override
+        public boolean transactionAlive(String type, long tid) throws 
Exception {
+          if (tid == 9)
+            throw new RuntimeException("txid 9 reserved for future use");
+          return tid == 5 || tid == 7;
+        }
+        
+        @Override
+        public boolean transactionComplete(String type, long tid) throws 
Exception {
+          return tid != 5 && tid != 7;
+        }
+      };
+    }
+  }
+
   @Test
   public void testCheck() {
     Logger.getLogger(AccumuloConfiguration.class).setLevel(Level.ERROR);
@@ -106,13 +128,112 @@ public class MetadataConstraintsTest {
     assertEquals(1, violations.size());
     assertEquals(Short.valueOf((short) 4), violations.get(0));
 
+  }
+  
+  @Test
+  public void testBulkFileCheck() {
+    MetadataConstraints mc = new TestMetadataConstraints();
+    Mutation m;
+    List<Short> violations;
+
+    // inactive txid
     m = new Mutation(new Text("0;foo"));
     m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), 
new Value("12345".getBytes()));
+    m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text("/someFile"), 
new Value("1,1".getBytes()));
     violations = mc.check(null, m);
-    
     assertNotNull(violations);
     assertEquals(1, violations.size());
     assertEquals(Short.valueOf((short)8), violations.get(0));
+    
+    // txid that throws exception
+    m = new Mutation(new Text("0;foo"));
+    m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), 
new Value("9".getBytes()));
+    m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text("/someFile"), 
new Value("1,1".getBytes()));
+    violations = mc.check(null, m);
+    assertNotNull(violations);
+    assertEquals(1, violations.size());
+    assertEquals(Short.valueOf((short) 8), violations.get(0));
+    
+    // active txid w/ file
+    m = new Mutation(new Text("0;foo"));
+    m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), 
new Value("5".getBytes()));
+    m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text("/someFile"), 
new Value("1,1".getBytes()));
+    violations = mc.check(null, m);
+    assertNull(violations);
+    
+    // active txid w/o file
+    m = new Mutation(new Text("0;foo"));
+    m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), 
new Value("5".getBytes()));
+    violations = mc.check(null, m);
+    assertNotNull(violations);
+    assertEquals(1, violations.size());
+    assertEquals(Short.valueOf((short) 8), violations.get(0));
+    
+    // two active txids w/ files
+    m = new Mutation(new Text("0;foo"));
+    m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), 
new Value("5".getBytes()));
+    m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text("/someFile"), 
new Value("1,1".getBytes()));
+    m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile2"), 
new Value("7".getBytes()));
+    m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text("/someFile2"), 
new Value("1,1".getBytes()));
+    violations = mc.check(null, m);
+    assertNotNull(violations);
+    assertEquals(1, violations.size());
+    assertEquals(Short.valueOf((short) 8), violations.get(0));
+
+    // two files w/ one active txid
+    m = new Mutation(new Text("0;foo"));
+    m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), 
new Value("5".getBytes()));
+    m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text("/someFile"), 
new Value("1,1".getBytes()));
+    m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile2"), 
new Value("5".getBytes()));
+    m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text("/someFile2"), 
new Value("1,1".getBytes()));
+    violations = mc.check(null, m);
+    assertNull(violations);
+
+    // two loaded w/ one active txid and one file
+    m = new Mutation(new Text("0;foo"));
+    m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), 
new Value("5".getBytes()));
+    m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text("/someFile"), 
new Value("1,1".getBytes()));
+    m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile2"), 
new Value("5".getBytes()));
+    violations = mc.check(null, m);
+    assertNotNull(violations);
+    assertEquals(1, violations.size());
+    assertEquals(Short.valueOf((short) 8), violations.get(0));
+
+    // active txid, mutation that looks like split
+    m = new Mutation(new Text("0;foo"));
+    m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), 
new Value("5".getBytes()));
+    Constants.METADATA_DIRECTORY_COLUMN.put(m, new Value("/t1".getBytes()));
+    violations = mc.check(null, m);
+    assertNull(violations);
+    
+    // inactive txid, mutation that looks like split
+    m = new Mutation(new Text("0;foo"));
+    m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), 
new Value("12345".getBytes()));
+    Constants.METADATA_DIRECTORY_COLUMN.put(m, new Value("/t1".getBytes()));
+    violations = mc.check(null, m);
+    assertNull(violations);
+    
+    // active txid, mutation that looks like a load
+    m = new Mutation(new Text("0;foo"));
+    m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), 
new Value("5".getBytes()));
+    m.put(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY, new Text("789"), 
new Value("127.0.0.1:9997".getBytes()));
+    violations = mc.check(null, m);
+    assertNull(violations);
+    
+    // inactive txid, mutation that looks like a load
+    m = new Mutation(new Text("0;foo"));
+    m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), 
new Value("12345".getBytes()));
+    m.put(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY, new Text("789"), 
new Value("127.0.0.1:9997".getBytes()));
+    violations = mc.check(null, m);
+    assertNull(violations);
+    
+    // deleting a load flag
+    m = new Mutation(new Text("0;foo"));
+    m.putDelete(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new 
Text("/someFile"));
+    violations = mc.check(null, m);
+    assertNull(violations);
+
+
   }
   
 }


Reply via email to