http://git-wip-us.apache.org/repos/asf/accumulo/blob/ec537137/core/src/main/thrift/tabletserver.thrift ---------------------------------------------------------------------- diff --git a/core/src/main/thrift/tabletserver.thrift b/core/src/main/thrift/tabletserver.thrift index 5dfcb4b..c6d442c 100644 --- a/core/src/main/thrift/tabletserver.thrift +++ b/core/src/main/thrift/tabletserver.thrift @@ -166,9 +166,14 @@ service TabletClientService extends client.ClientService { throws (1:client.ThriftSecurityException sec, 2:NotServingTabletException nste, 3:ConstraintViolationException cve), - - list<data.TCMResult> conditionalUpdate(1:trace.TInfo tinfo, 2:security.TCredentials credentials, 3:list<binary> authorizations, 4:data.CMBatch mutations, 5:list<string> symbols) + + data.UpdateID startConditionalUpdate(1:trace.TInfo tinfo, 2:security.TCredentials credentials, 3:list<binary> authorizations, 4:string tableID) throws (1:client.ThriftSecurityException sec); + + list<data.TCMResult> conditionalUpdate(1:trace.TInfo tinfo, 2:data.UpdateID sessID, 3:data.CMBatch mutations, 4:list<string> symbols) + throws (1:NoSuchScanIDException nssi); + + void invalidateConditionalUpdate(1:trace.TInfo tinfo, 2:data.UpdateID sessID); // on success, returns an empty list list<data.TKeyExtent> bulkImport(3:trace.TInfo tinfo, 1:security.TCredentials credentials, 4:i64 tid, 2:data.TabletFiles files, 5:bool setTime) throws (1:client.ThriftSecurityException sec),
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ec537137/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java b/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java index 465eb8e..94a8cd6 100644 --- a/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java +++ b/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java @@ -313,23 +313,10 @@ public class SecurityOperation { return hasTablePermission(credentials.getPrincipal(), table, TablePermission.WRITE, true); } - public boolean canConditionallyUpdate(TCredentials credentials, Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String> symbols, - List<ByteBuffer> authorizations) throws ThriftSecurityException { - Set<TKeyExtent> ks = mutations.keySet(); - - byte[] table = null; - - for (TKeyExtent tke : ks) { - if (table == null) - table = tke.getTable(); - else if (!Arrays.equals(table, tke.getTable())) - return false; - } + public boolean canConditionallyUpdate(TCredentials credentials, String tableID, List<ByteBuffer> authorizations) throws ThriftSecurityException { authenticate(credentials); - String tableID = new String(table); - return hasTablePermission(credentials.getPrincipal(), tableID, TablePermission.WRITE, true) && hasTablePermission(credentials.getPrincipal(), tableID, TablePermission.READ, true); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/ec537137/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java index 4b905ce..ee1d1b6 100644 --- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java +++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java @@ -397,9 +397,30 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu } + synchronized Session reserveSession(long sessionId, boolean wait) { + Session session = sessions.get(sessionId); + if (session != null) { + while(wait && session.reserved){ + try { + wait(1000); + } catch (InterruptedException e) { + throw new RuntimeException(); + } + } + + if (session.reserved) + throw new IllegalStateException(); + session.reserved = true; + } + + return session; + + } + synchronized void unreserveSession(Session session) { if (!session.reserved) throw new IllegalStateException(); + notifyAll(); session.reserved = false; session.lastAccessTime = System.currentTimeMillis(); } @@ -409,7 +430,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu if (session != null) unreserveSession(session); } - + synchronized Session getSession(long sessionId) { Session session = sessions.get(sessionId); if (session != null) @@ -418,9 +439,15 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu } Session removeSession(long sessionId) { + return removeSession(sessionId, false); + } + + Session removeSession(long sessionId, boolean unreserve) { Session session = null; synchronized (this) { session = sessions.remove(sessionId); + if(unreserve && session != null) + unreserveSession(session); } // do clean up out side of lock.. @@ -719,6 +746,12 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu } + private static class ConditionalSession extends Session { + public TCredentials credentials; + public Authorizations auths; + public String tableId; + } + private static class UpdateSession extends Session { public Tablet currentTablet; public MapCounter<Tablet> successfulCommits = new MapCounter<Tablet>(); @@ -1856,7 +1889,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu } - private Map<KeyExtent,List<ServerConditionalMutation>> conditionalUpdate(TCredentials credentials, List<ByteBuffer> authorizations, + private Map<KeyExtent,List<ServerConditionalMutation>> conditionalUpdate(TCredentials credentials, Authorizations authorizations, Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results, List<String> symbols) { // sort each list of mutations, this is done to avoid deadlock and doing seeks in order is more efficient and detect duplicate rows. ConditionalMutationSet.sortConditionalMutations(updates); @@ -1869,7 +1902,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu // get as many locks as possible w/o blocking... defer any rows that are locked List<RowLock> locks = rowLocks.acquireRowlocks(updates, deferred); try { - checkConditions(updates, results, new Authorizations(authorizations), symbols); + checkConditions(updates, results, authorizations, symbols); writeConditionalMutations(updates, results, credentials); } finally { rowLocks.releaseRowLocks(locks); @@ -1878,11 +1911,10 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu } @Override - public List<TCMResult> conditionalUpdate(TInfo tinfo, TCredentials credentials, List<ByteBuffer> authorizations, - Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String> symbols) throws ThriftSecurityException { + public long startConditionalUpdate(TInfo tinfo, TCredentials credentials, List<ByteBuffer> authorizations, String tableID) throws ThriftSecurityException, TException { Authorizations userauths = null; - if (!security.canConditionallyUpdate(credentials, mutations, symbols, authorizations)) + if (!security.canConditionallyUpdate(credentials, tableID, authorizations)) throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); userauths = security.getUserAuthorizations(credentials); @@ -1890,23 +1922,58 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu if (!userauths.contains(ByteBufferUtil.toBytes(auth))) throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_AUTHORIZATIONS); + ConditionalSession cs = new ConditionalSession(); + cs.auths = new Authorizations(authorizations); + cs.credentials = credentials; + cs.tableId = tableID; + + return sessionManager.createSession(cs, false); + } + + @Override + public List<TCMResult> conditionalUpdate(TInfo tinfo, long sessID, Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String> symbols) + throws NoSuchScanIDException, TException { // TODO sessions, should show up in list scans // TODO timeout like scans do - Map<KeyExtent,List<ServerConditionalMutation>> updates = Translator.translate(mutations, Translator.TKET, - new Translator.ListTranslator<TConditionalMutation,ServerConditionalMutation>(ServerConditionalMutation.TCMT)); + ConditionalSession cs = (ConditionalSession) sessionManager.reserveSession(sessID); - ArrayList<TCMResult> results = new ArrayList<TCMResult>(); + if(cs == null) + throw new NoSuchScanIDException(); - Map<KeyExtent,List<ServerConditionalMutation>> deferred = conditionalUpdate(credentials, authorizations, updates, results, symbols); - - while (deferred.size() > 0) { - deferred = conditionalUpdate(credentials, authorizations, deferred, results, symbols); + + + try{ + Map<KeyExtent,List<ServerConditionalMutation>> updates = Translator.translate(mutations, Translator.TKET, + new Translator.ListTranslator<TConditionalMutation,ServerConditionalMutation>(ServerConditionalMutation.TCMT)); + + Text tid = new Text(cs.tableId); + for(KeyExtent ke : updates.keySet()) + if(!ke.getTableId().equals(tid)) + throw new IllegalArgumentException("Unexpected table id "+tid+" != "+ke.getTableId()); + + ArrayList<TCMResult> results = new ArrayList<TCMResult>(); + + Map<KeyExtent,List<ServerConditionalMutation>> deferred = conditionalUpdate(cs.credentials, cs.auths, updates, results, symbols); + + while (deferred.size() > 0) { + deferred = conditionalUpdate(cs.credentials, cs.auths, deferred, results, symbols); + } + + return results; + }finally{ + sessionManager.removeSession(sessID, true); } - - return results; } + @Override + public void invalidateConditionalUpdate(TInfo tinfo, long sessID) throws TException { + //this method should wait for any running conditional update to complete + //after this method returns a conditional update should not be able to start + ConditionalSession cs = (ConditionalSession) sessionManager.reserveSession(sessID, true); + if(cs != null) + sessionManager.removeSession(sessID, true); + } @Override public void splitTablet(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent, ByteBuffer splitPoint) throws NotServingTabletException, http://git-wip-us.apache.org/repos/asf/accumulo/blob/ec537137/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java index f8b32de..67e7249 100644 --- a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java @@ -52,6 +52,7 @@ import org.apache.accumulo.core.master.thrift.TabletServerStatus; import org.apache.accumulo.core.security.thrift.TCredentials; import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction; import org.apache.accumulo.core.tabletserver.thrift.ActiveScan; +import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor; @@ -204,10 +205,21 @@ public class NullTserver { } @Override - public List<TCMResult> conditionalUpdate(TInfo tinfo, TCredentials credentials, List<ByteBuffer> authorizations, - Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String> symbols) throws TException { + public long startConditionalUpdate(TInfo tinfo, TCredentials credentials, List<ByteBuffer> authorizations, String tableID) throws ThriftSecurityException, + TException { + return 0; + } + + @Override + public List<TCMResult> conditionalUpdate(TInfo tinfo, long sessID, Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String> symbols) + throws NoSuchScanIDException, TException { return null; } + + @Override + public void invalidateConditionalUpdate(TInfo tinfo, long sessID) throws TException { + + } } static class Opts extends Help { http://git-wip-us.apache.org/repos/asf/accumulo/blob/ec537137/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java index 33dc458..65a5636 100644 --- a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java +++ b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java @@ -1127,6 +1127,7 @@ public class ConditionalWriterTest { TabletLocator locator = TabletLocator.getLocator(zki, new Text(Tables.getNameToIdMap(zki).get(table))); while (locator.locateTablet(new Text("a"), false, false, CredentialHelper.create("root", new PasswordToken(secret), zki.getInstanceID())) != null) { UtilWaitThread.sleep(50); + locator.invalidateCache(); } }