http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfQueryIterator.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfQueryIterator.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfQueryIterator.java
new file mode 100644
index 0000000..d13f50e
--- /dev/null
+++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfQueryIterator.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//package mvm.rya.accumulo;
+
+//
+//import com.google.common.collect.Iterators;
+//import com.google.common.io.ByteArrayDataInput;
+//import com.google.common.io.ByteStreams;
+//import info.aduna.iteration.CloseableIteration;
+//import mvm.rya.api.RdfCloudTripleStoreConstants;
+//import mvm.rya.api.RdfCloudTripleStoreUtils;
+//import mvm.rya.api.persist.RdfDAOException;
+//import mvm.rya.api.utils.NullableStatementImpl;
+//import org.apache.accumulo.core.client.*;
+//import org.apache.accumulo.core.data.Key;
+//import org.apache.accumulo.core.data.Range;
+//import org.apache.accumulo.core.iterators.user.AgeOffFilter;
+//import org.apache.accumulo.core.iterators.user.TimestampFilter;
+//import org.apache.accumulo.core.security.Authorizations;
+//import org.apache.hadoop.io.Text;
+//import org.openrdf.model.Resource;
+//import org.openrdf.model.Statement;
+//import org.openrdf.model.URI;
+//import org.openrdf.model.Value;
+//import org.openrdf.query.BindingSet;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//
+//import java.io.IOException;
+//import java.util.Collection;
+//import java.util.Collections;
+//import java.util.HashSet;
+//import java.util.Iterator;
+//import java.util.Map.Entry;
+//
+//import static mvm.rya.accumulo.AccumuloRdfConstants.ALL_AUTHORIZATIONS;
+//import static mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+//import static mvm.rya.api.RdfCloudTripleStoreUtils.writeValue;
+//
+//public class AccumuloRdfQueryIterator implements
+//        CloseableIteration<Entry<Statement, BindingSet>, RdfDAOException> {
+//
+//    protected final Logger logger = LoggerFactory.getLogger(getClass());
+//
+//    private boolean open = false;
+//    private Iterator result;
+//    private Resource[] contexts;
+//    private Collection<Entry<Statement, BindingSet>> statements;
+//    private int numOfThreads = 20;
+//
+//    private RangeBindingSetEntries rangeMap = new RangeBindingSetEntries();
+//    private ScannerBase scanner;
+//    private boolean isBatchScanner = true;
+//    private Statement statement;
+//    Iterator<BindingSet> iter_bss = null;
+//
+//    private boolean hasNext = true;
+//    private AccumuloRdfConfiguration conf;
+//    private TABLE_LAYOUT tableLayout;
+//    private Text context_txt;
+//
+//    private DefineTripleQueryRangeFactory queryRangeFactory = new DefineTripleQueryRangeFactory();
+//
+//    public AccumuloRdfQueryIterator(Collection<Entry<Statement, BindingSet>> statements, Connector connector, Resource... contexts)
+//            throws RdfDAOException {
+//        this(statements, connector, null, contexts);
+//    }
+//
+//    public AccumuloRdfQueryIterator(Collection<Entry<Statement, BindingSet>> statements, Connector connector,
+//                                    AccumuloRdfConfiguration conf, Resource... contexts)
+//            throws RdfDAOException {
+//        this.statements = statements;
+//        this.contexts = contexts;
+//        this.conf = conf;
+//        initialize(connector);
+//        open = true;
+//    }
+//
+//    public AccumuloRdfQueryIterator(Resource subject, URI predicate, Value object, Connector connector,
+//                                    AccumuloRdfConfiguration conf, Resource[] contexts) throws RdfDAOException {
+//        this(Collections.<Entry<Statement, BindingSet>>singleton(new RdfCloudTripleStoreUtils.CustomEntry<Statement, BindingSet>(
+//                new NullableStatementImpl(subject, predicate, object, contexts),
+//                null)), connector, conf, contexts);
+//    }
+//
+//    protected void initialize(Connector connector)
+//            throws RdfDAOException {
+//        try {
+//            //TODO: We cannot span multiple tables here
+//            Collection<Range> ranges = new HashSet<Range>();
+//
+//            result = Iterators.emptyIterator();
+//            Long startTime = conf.getStartTime();
+//            Long ttl = conf.getTtl();
+//
+//            Resource context = null;
+//            for (Entry<Statement, BindingSet> stmtbs : statements) {
+//                Statement stmt = stmtbs.getKey();
+//                Resource subject = stmt.getSubject();
+//                URI predicate = stmt.getPredicate();
+//                Value object = stmt.getObject();
+//                context = stmt.getContext(); //TODO: assumes the same context for all statements
+//                logger.debug("Batch Scan, lookup subject[" + subject + "] predicate[" + predicate + "] object[" + object + "] combination");
+//
+//                Entry<TABLE_LAYOUT, Range> entry = queryRangeFactory.defineRange(subject, predicate, object, conf);
+//                tableLayout = entry.getKey();
+////                isTimeRange = isTimeRange || queryRangeFactory.isTimeRange();
+//                Range range = entry.getValue();
+//                ranges.add(range);
+//                rangeMap.ranges.add(new RdfCloudTripleStoreUtils.CustomEntry<Range, BindingSet>(range, stmtbs.getValue()));
+//            }
+//
+//            Authorizations authorizations = AccumuloRdfConstants.ALL_AUTHORIZATIONS;
+//            String auth = conf.getAuth();
+//            if (auth != null) {
+//                authorizations = new Authorizations(auth.split(","));
+//            }
+//            String table = RdfCloudTripleStoreUtils.layoutToTable(tableLayout, conf);
+//            result = createScanner(connector, authorizations, table, context, startTime, ttl, ranges);
+////            if (isBatchScanner) {
+////                ((BatchScanner) scanner).setRanges(ranges);
+////            } else {
+////                for (Range range : ranges) {
+////                    ((Scanner) scanner).setRange(range); //TODO: Not good way of doing this
+////                }
+////            }
+////
+////            if (isBatchScanner) {
+////                result = ((BatchScanner) scanner).iterator();
+////            } else {
+////                result = ((Scanner) scanner).iterator();
+////            }
+//        } catch (Exception e) {
+//            throw new RdfDAOException(e);
+//        }
+//    }
+//
+//    protected Iterator<Entry<Key, org.apache.accumulo.core.data.Value>> createScanner(Connector connector, Authorizations authorizations, String table, Resource context, Long startTime, Long ttl, Collection<Range> ranges) throws TableNotFoundException, IOException {
+////        ShardedConnector shardedConnector = new ShardedConnector(connector, 4, ta)
+//        if (rangeMap.ranges.size() > (numOfThreads / 2)) { //TODO: Arbitrary number, make configurable
+//            BatchScanner scannerBase = connector.createBatchScanner(table, authorizations, numOfThreads);
+//            scannerBase.setRanges(ranges);
+//            populateScanner(context, startTime, ttl, scannerBase);
+//            return scannerBase.iterator();
+//        } else {
+//            isBatchScanner = false;
+//            Iterator<Entry<Key, org.apache.accumulo.core.data.Value>>[] iters = new Iterator[ranges.size()];
+//            int i = 0;
+//            for (Range range : ranges) {
+//                Scanner scannerBase = connector.createScanner(table, authorizations);
+//                populateScanner(context, startTime, ttl, scannerBase);
+//                scannerBase.setRange(range);
+//                iters[i] = scannerBase.iterator();
+//                i++;
+//                scanner = scannerBase; //TODO: Always overridden, but doesn't matter since Scanner doesn't need to be closed
+//            }
+//            return Iterators.concat(iters);
+//        }
+//
+//    }
+//
+//    protected void populateScanner(Resource context, Long startTime, Long ttl, ScannerBase scannerBase) throws IOException {
+//        if (context != null) { //default graph
+//            context_txt = new Text(writeValue(context));
+//            scannerBase.fetchColumnFamily(context_txt);
+//        }
+//
+////        if (!isQueryTimeBased(conf)) {
+//        if (startTime != null && ttl != null) {
+////            scannerBase.setScanIterators(1, FilteringIterator.class.getName(), "filteringIterator");
+////            scannerBase.setScanIteratorOption("filteringIterator", "0", TimeRangeFilter.class.getName());
+////            scannerBase.setScanIteratorOption("filteringIterator", "0." + TimeRangeFilter.TIME_RANGE_PROP, ttl);
+////            scannerBase.setScanIteratorOption("filteringIterator", "0." + TimeRangeFilter.START_TIME_PROP, startTime);
+//            IteratorSetting setting = new IteratorSetting(1, "fi", TimestampFilter.class.getName());
+//            TimestampFilter.setStart(setting, startTime, true);
+//            TimestampFilter.setEnd(setting, startTime + ttl, true);
+//            scannerBase.addScanIterator(setting);
+//        } else if (ttl != null) {
+////                scannerBase.setScanIterators(1, FilteringIterator.class.getName(), "filteringIterator");
+////                scannerBase.setScanIteratorOption("filteringIterator", "0", AgeOffFilter.class.getName());
+////                scannerBase.setScanIteratorOption("filteringIterator", "0.ttl", ttl);
+//            IteratorSetting setting = new IteratorSetting(1, "fi", AgeOffFilter.class.getName());
+//            AgeOffFilter.setTTL(setting, ttl);
+//            scannerBase.addScanIterator(setting);
+//        }
+////        }
+//    }
+//
+//    @Override
+//    public void close() throws RdfDAOException {
+//        if (!open)
+//            return;
+//        verifyIsOpen();
+//        open = false;
+//        if (scanner != null && isBatchScanner) {
+//            ((BatchScanner) scanner).close();
+//        }
+//    }
+//
+//    public void verifyIsOpen() throws RdfDAOException {
+//        if (!open) {
+//            throw new RdfDAOException("Iterator not open");
+//        }
+//    }
+//
+//    @Override
+//    public boolean hasNext() throws RdfDAOException {
+//        try {
+//            if (!open)
+//                return false;
+//            verifyIsOpen();
+//            /**
+//             * For some reason, the result.hasNext returns false
+//             * once at the end of the iterator, and then true
+//             * for every subsequent call.
+//             */
+//            hasNext = (hasNext && result.hasNext());
+//            return hasNext || ((iter_bss != null) && iter_bss.hasNext());
+//        } catch (Exception e) {
+//            throw new RdfDAOException(e);
+//        }
+//    }
+//
+//    @Override
+//    public Entry<Statement, BindingSet> next() throws RdfDAOException {
+//        try {
+//            if (!this.hasNext())
+//                return null;
+//
+//            return getStatement(result, contexts);
+//        } catch (Exception e) {
+//            throw new RdfDAOException(e);
+//        }
+//    }
+//
+//    public Entry<Statement, BindingSet> getStatement(
+//            Iterator<Entry<Key, org.apache.accumulo.core.data.Value>> rowResults,
+//            Resource... filterContexts) throws IOException {
+//        try {
+//            while (true) {
+//                if (iter_bss != null && iter_bss.hasNext()) {
+//                    return new RdfCloudTripleStoreUtils.CustomEntry<Statement, BindingSet>(statement, iter_bss.next());
+//                }
+//
+//                if (rowResults.hasNext()) {
+//                    Entry<Key, org.apache.accumulo.core.data.Value> entry = rowResults.next();
+//                    Key key = entry.getKey();
+//                    ByteArrayDataInput input = ByteStreams.newDataInput(key.getRow().getBytes());
+//                    statement = RdfCloudTripleStoreUtils.translateStatementFromRow(input, key.getColumnFamily(), tableLayout, RdfCloudTripleStoreConstants.VALUE_FACTORY);
+//                    iter_bss = rangeMap.containsKey(key).iterator();
+//                } else
+//                    break;
+//            }
+//        } catch (Exception e) {
+//            throw new IOException(e);
+//        }
+//        return null;
+//    }
+//
+//    @Override
+//    public void remove() throws RdfDAOException {
+//        next();
+//    }
+//
+//    public int getNumOfThreads() {
+//        return numOfThreads;
+//    }
+//
+//    public void setNumOfThreads(int numOfThreads) {
+//        this.numOfThreads = numOfThreads;
+//    }
+//
+//    public DefineTripleQueryRangeFactory getQueryRangeFactory() {
+//        return queryRangeFactory;
+//    }
+//
+//    public void setQueryRangeFactory(DefineTripleQueryRangeFactory queryRangeFactory) {
+//        this.queryRangeFactory = queryRangeFactory;
+//    }
+//}
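
Though retired, the commented-out iterator above still documents one heuristic worth keeping: batch when there are enough ranges to occupy the scan threads, otherwise concatenate plain Scanners. A condensed, illustrative sketch of that choice (not the commit's code; connector, table, auths, ranges, and numOfThreads are assumed to be in scope):

    // Batch when ranges outnumber half the available threads; otherwise
    // open one Scanner per range and concatenate the iterators.
    if (ranges.size() > numOfThreads / 2) {
        BatchScanner bs = connector.createBatchScanner(table, auths, numOfThreads);
        bs.setRanges(ranges);
        return bs.iterator();
    } else {
        List<Iterator<Map.Entry<Key, Value>>> iters = new ArrayList<Iterator<Map.Entry<Key, Value>>>();
        for (Range r : ranges) {
            Scanner s = connector.createScanner(table, auths);
            s.setRange(r);
            iters.add(s.iterator());
        }
        return Iterators.concat(iters.iterator());
    }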

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfUtils.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfUtils.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfUtils.java
new file mode 100644
index 0000000..157fc5a
--- /dev/null
+++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfUtils.java
@@ -0,0 +1,72 @@
+package mvm.rya.accumulo;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.resolver.triple.TripleRow;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_BYTES;
+
+/**
+ * Class AccumuloRdfUtils
+ * Date: Mar 1, 2012
+ * Time: 7:15:54 PM
+ */
+public class AccumuloRdfUtils {
+    private static final Log logger = LogFactory.getLog(AccumuloRdfUtils.class);
+
+    public static void createTableIfNotExist(TableOperations tableOperations, String tableName) throws AccumuloException, AccumuloSecurityException, TableExistsException {
+        boolean tableExists = tableOperations.exists(tableName);
+        if (!tableExists) {
+            logger.debug("Creating accumulo table: " + tableName);
+            tableOperations.create(tableName);
+        }
+    }
+
+    public static Key from(TripleRow tripleRow) {
+        return new Key(defaultTo(tripleRow.getRow(), EMPTY_BYTES),
+                defaultTo(tripleRow.getColumnFamily(), EMPTY_BYTES),
+                defaultTo(tripleRow.getColumnQualifier(), EMPTY_BYTES),
+                defaultTo(tripleRow.getColumnVisibility(), EMPTY_BYTES),
+                defaultTo(tripleRow.getTimestamp(), Long.MAX_VALUE));
+    }
+
+    public static Value extractValue(TripleRow tripleRow) {
+        return new Value(defaultTo(tripleRow.getValue(), EMPTY_BYTES));
+    }
+
+    private static byte[] defaultTo(byte[] bytes, byte[] def) {
+        return bytes != null ? bytes : def;
+    }
+
+    private static Long defaultTo(Long l, Long def) {
+        return l != null ? l : def;
+    }
+}
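
A minimal usage sketch for the helpers above, assuming an Accumulo Connector named connector and a serialized TripleRow named tripleRow are already in hand (the table name is illustrative):

    // Ensure the target table exists before writing.
    TableOperations ops = connector.tableOperations();
    AccumuloRdfUtils.createTableIfNotExist(ops, "rya_spo");

    // Convert a resolver-produced TripleRow into the Accumulo Key/Value
    // pair that would be written to (or read back from) that table; null
    // fields fall back to EMPTY_BYTES and Long.MAX_VALUE as coded above.
    Key key = AccumuloRdfUtils.from(tripleRow);
    Value value = AccumuloRdfUtils.extractValue(tripleRow);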

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java
new file mode 100644
index 0000000..195030e
--- /dev/null
+++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java
@@ -0,0 +1,551 @@
+package mvm.rya.accumulo;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static mvm.rya.accumulo.AccumuloRdfConstants.ALL_AUTHORIZATIONS;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.INFO_NAMESPACE_TXT;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.MAX_MEMORY;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.MAX_TIME;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.NUM_THREADS;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.RTS_SUBJECT_RYA;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.RTS_VERSION_PREDICATE_RYA;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.VERSION_RYA;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchDeleter;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.openrdf.model.Namespace;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+
+import info.aduna.iteration.CloseableIteration;
+import mvm.rya.accumulo.experimental.AccumuloIndexer;
+import mvm.rya.accumulo.query.AccumuloRyaQueryEngine;
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.layout.TableLayoutStrategy;
+import mvm.rya.api.persist.RyaDAO;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.api.persist.RyaNamespaceManager;
+import mvm.rya.api.resolver.RyaTripleContext;
+
+public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaNamespaceManager<AccumuloRdfConfiguration> {
+    private static final Log logger = LogFactory.getLog(AccumuloRyaDAO.class);
+
+    private boolean initialized = false;
+    private boolean flushEachUpdate = true;
+    private Connector connector;
+    private BatchWriterConfig batchWriterConfig;
+
+    private MultiTableBatchWriter mt_bw;
+
+    // Do not flush these individually
+    private BatchWriter bw_spo;
+    private BatchWriter bw_po;
+    private BatchWriter bw_osp;
+
+    private BatchWriter bw_ns;
+
+    private List<AccumuloIndexer> secondaryIndexers;
+
+    private AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+    private RyaTableMutationsFactory ryaTableMutationsFactory;
+    private TableLayoutStrategy tableLayoutStrategy;
+    private AccumuloRyaQueryEngine queryEngine;
+    private RyaTripleContext ryaContext;
+
+    @Override
+    public boolean isInitialized() throws RyaDAOException {
+        return initialized;
+    }
+
+    @Override
+    public void init() throws RyaDAOException {
+        if (initialized) {
+            return;
+        }
+        try {
+            checkNotNull(conf);
+            checkNotNull(connector);
+
+            if(batchWriterConfig == null){
+                batchWriterConfig = new BatchWriterConfig();
+                batchWriterConfig.setMaxMemory(MAX_MEMORY);
+                batchWriterConfig.setTimeout(MAX_TIME, TimeUnit.MILLISECONDS);
+                batchWriterConfig.setMaxWriteThreads(NUM_THREADS);
+            }
+
+            tableLayoutStrategy = conf.getTableLayoutStrategy();
+            ryaContext = RyaTripleContext.getInstance(conf);
+            ryaTableMutationsFactory = new RyaTableMutationsFactory(ryaContext);
+
+            secondaryIndexers = conf.getAdditionalIndexers();
+
+            flushEachUpdate = conf.flushEachUpdate();
+
+            TableOperations tableOperations = connector.tableOperations();
+            AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getSpo());
+            AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getPo());
+            AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getOsp());
+            AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getNs());
+
+            for (AccumuloIndexer index : secondaryIndexers) {
+                index.setConf(conf);
+            }
+
+            mt_bw = connector.createMultiTableBatchWriter(batchWriterConfig);
+
+            //get the batch writers for tables
+            bw_spo = mt_bw.getBatchWriter(tableLayoutStrategy.getSpo());
+            bw_po = mt_bw.getBatchWriter(tableLayoutStrategy.getPo());
+            bw_osp = mt_bw.getBatchWriter(tableLayoutStrategy.getOsp());
+
+            bw_ns = mt_bw.getBatchWriter(tableLayoutStrategy.getNs());
+
+            for (AccumuloIndexer index : secondaryIndexers) {
+               index.setConnector(connector);
+               index.setMultiTableBatchWriter(mt_bw);
+               index.init();
+            }
+
+            queryEngine = new AccumuloRyaQueryEngine(connector, conf);
+
+            checkVersion();
+
+            initialized = true;
+        } catch (Exception e) {
+            throw new RyaDAOException(e);
+        }
+    }
+
+    @Override
+    public String getVersion() throws RyaDAOException {
+        String version = null;
+        CloseableIteration<RyaStatement, RyaDAOException> versIter = queryEngine.query(new RyaStatement(RTS_SUBJECT_RYA, RTS_VERSION_PREDICATE_RYA, null), conf);
+        if (versIter.hasNext()) {
+            version = versIter.next().getObject().getData();
+        }
+        versIter.close();
+
+        return version;
+    }
+
+    @Override
+    public void add(RyaStatement statement) throws RyaDAOException {
+        commit(Iterators.singletonIterator(statement));
+    }
+
+    @Override
+    public void add(Iterator<RyaStatement> iter) throws RyaDAOException {
+        commit(iter);
+    }
+
+    @Override
+    public void delete(RyaStatement stmt, AccumuloRdfConfiguration aconf) throws RyaDAOException {
+        this.delete(Iterators.singletonIterator(stmt), aconf);
+    }
+
+    @Override
+    public void delete(Iterator<RyaStatement> statements, AccumuloRdfConfiguration conf) throws RyaDAOException {
+        try {
+            while (statements.hasNext()) {
+                RyaStatement stmt = statements.next();
+                //query first
+                CloseableIteration<RyaStatement, RyaDAOException> query = this.queryEngine.query(stmt, conf);
+                while (query.hasNext()) {
+                    deleteSingleRyaStatement(query.next());
+                }
+
+                for (AccumuloIndexer index : secondaryIndexers) {
+                    index.deleteStatement(stmt);
+                }
+            }
+            if (flushEachUpdate) { mt_bw.flush(); }
+        } catch (Exception e) {
+            throw new RyaDAOException(e);
+        }
+    }
+
+    @Override
+    public void dropGraph(AccumuloRdfConfiguration conf, RyaURI... graphs) throws RyaDAOException {
+        BatchDeleter bd_spo = null;
+        BatchDeleter bd_po = null;
+        BatchDeleter bd_osp = null;
+
+        try {
+            bd_spo = createBatchDeleter(tableLayoutStrategy.getSpo(), conf.getAuthorizations());
+            bd_po = createBatchDeleter(tableLayoutStrategy.getPo(), conf.getAuthorizations());
+            bd_osp = createBatchDeleter(tableLayoutStrategy.getOsp(), conf.getAuthorizations());
+
+            bd_spo.setRanges(Collections.singleton(new Range()));
+            bd_po.setRanges(Collections.singleton(new Range()));
+            bd_osp.setRanges(Collections.singleton(new Range()));
+
+            for (RyaURI graph : graphs){
+                bd_spo.fetchColumnFamily(new Text(graph.getData()));
+                bd_po.fetchColumnFamily(new Text(graph.getData()));
+                bd_osp.fetchColumnFamily(new Text(graph.getData()));
+            }
+
+            bd_spo.delete();
+            bd_po.delete();
+            bd_osp.delete();
+
+            //TODO: indexers do not support delete; an UnsupportedOperationException will be thrown
+//            for (AccumuloIndex index : secondaryIndexers) {
+//                index.dropGraph(graphs);
+//            }
+
+        } catch (Exception e) {
+            throw new RyaDAOException(e);
+        } finally {
+            if (bd_spo != null) {
+                bd_spo.close();
+            }
+            if (bd_po != null) {
+                bd_po.close();
+            }
+            if (bd_osp != null) {
+                bd_osp.close();
+            }
+        }
+
+    }
+
+    protected void deleteSingleRyaStatement(RyaStatement stmt) throws IOException, MutationsRejectedException {
+        Map<TABLE_LAYOUT, Collection<Mutation>> map = ryaTableMutationsFactory.serializeDelete(stmt);
+        bw_spo.addMutations(map.get(TABLE_LAYOUT.SPO));
+        bw_po.addMutations(map.get(TABLE_LAYOUT.PO));
+        bw_osp.addMutations(map.get(TABLE_LAYOUT.OSP));
+    }
+
+    protected void commit(Iterator<RyaStatement> commitStatements) throws RyaDAOException {
+        try {
+            //TODO: Should have a lock here in case we are adding and committing at the same time
+            while (commitStatements.hasNext()) {
+                RyaStatement stmt = commitStatements.next();
+
+                Map<TABLE_LAYOUT, Collection<Mutation>> mutationMap = ryaTableMutationsFactory.serialize(stmt);
+                Collection<Mutation> spo = mutationMap.get(TABLE_LAYOUT.SPO);
+                Collection<Mutation> po = mutationMap.get(TABLE_LAYOUT.PO);
+                Collection<Mutation> osp = mutationMap.get(TABLE_LAYOUT.OSP);
+                bw_spo.addMutations(spo);
+                bw_po.addMutations(po);
+                bw_osp.addMutations(osp);
+
+                for (AccumuloIndexer index : secondaryIndexers) {
+                    index.storeStatement(stmt);
+                }
+            }
+
+            if (flushEachUpdate) { mt_bw.flush(); }
+        } catch (Exception e) {
+            throw new RyaDAOException(e);
+        }
+    }
+
+    @Override
+    public void destroy() throws RyaDAOException {
+        if (!initialized) {
+            return;
+        }
+        //TODO: write lock
+        try {
+            initialized = false;
+            mt_bw.flush();
+
+            mt_bw.close();
+        } catch (Exception e) {
+            throw new RyaDAOException(e);
+        }
+        for(AccumuloIndexer indexer : this.secondaryIndexers) {
+            try {
+                indexer.destroy();
+            } catch(Exception e) {
+                logger.warn("Failed to destroy indexer", e);
+            }
+        }
+    }
+
+    @Override
+    public void addNamespace(String pfx, String namespace) throws RyaDAOException {
+        try {
+            Mutation m = new Mutation(new Text(pfx));
+            m.put(INFO_NAMESPACE_TXT, EMPTY_TEXT, new Value(namespace.getBytes()));
+            bw_ns.addMutation(m);
+            if (flushEachUpdate) { mt_bw.flush(); }
+        } catch (Exception e) {
+            throw new RyaDAOException(e);
+        }
+    }
+
+    @Override
+    public String getNamespace(String pfx) throws RyaDAOException {
+        try {
+            Scanner scanner = connector.createScanner(tableLayoutStrategy.getNs(),
+                    ALL_AUTHORIZATIONS);
+            scanner.fetchColumn(INFO_NAMESPACE_TXT, EMPTY_TEXT);
+            scanner.setRange(new Range(new Text(pfx)));
+            Iterator<Map.Entry<Key, Value>> iterator = scanner
+                    .iterator();
+
+            if (iterator.hasNext()) {
+                return new String(iterator.next().getValue().get());
+            }
+        } catch (Exception e) {
+            throw new RyaDAOException(e);
+        }
+        return null;
+    }
+
+    @Override
+    public void removeNamespace(String pfx) throws RyaDAOException {
+        try {
+            Mutation del = new Mutation(new Text(pfx));
+            del.putDelete(INFO_NAMESPACE_TXT, EMPTY_TEXT);
+            bw_ns.addMutation(del);
+            if (flushEachUpdate) { mt_bw.flush(); }
+        } catch (Exception e) {
+            throw new RyaDAOException(e);
+        }
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @Override
+    public CloseableIteration<Namespace, RyaDAOException> iterateNamespace() throws RyaDAOException {
+        try {
+            Scanner scanner = connector.createScanner(tableLayoutStrategy.getNs(),
+                    ALL_AUTHORIZATIONS);
+            scanner.fetchColumnFamily(INFO_NAMESPACE_TXT);
+            Iterator<Map.Entry<Key, Value>> result = scanner.iterator();
+            return new AccumuloNamespaceTableIterator(result);
+        } catch (Exception e) {
+            throw new RyaDAOException(e);
+        }
+    }
+
+    @Override
+    public RyaNamespaceManager<AccumuloRdfConfiguration> getNamespaceManager() {
+        return this;
+    }
+
+    @Override
+    public void purge(RdfCloudTripleStoreConfiguration configuration) {
+        for (String tableName : getTables()) {
+            try {
+                purge(tableName, configuration.getAuths());
+                compact(tableName);
+            } catch (TableNotFoundException e) {
+                logger.error(e.getMessage());
+            } catch (MutationsRejectedException e) {
+                logger.error(e.getMessage());
+            }
+        }
+        for(AccumuloIndexer indexer : this.secondaryIndexers) {
+            try {
+                indexer.purge(configuration);
+            } catch(Exception e) {
+                logger.error("Failed to purge indexer", e);
+            }
+        }
+    }
+
+    @Override
+    public void dropAndDestroy() throws RyaDAOException {
+        for (String tableName : getTables()) {
+            try {
+                drop(tableName);
+            } catch (AccumuloSecurityException e) {
+                logger.error(e.getMessage());
+                throw new RyaDAOException(e);
+            } catch (AccumuloException e) {
+                logger.error(e.getMessage());
+                throw new RyaDAOException(e);
+            } catch (TableNotFoundException e) {
+                logger.warn(e.getMessage());
+            }
+        }
+        destroy();
+        for(AccumuloIndexer indexer : this.secondaryIndexers) {
+            try {
+                indexer.dropAndDestroy();
+            } catch(Exception e) {
+                logger.error("Failed to drop and destroy indexer", e);
+            }
+        }
+    }
+
+    public Connector getConnector() {
+        return connector;
+    }
+
+    public void setConnector(Connector connector) {
+        this.connector = connector;
+    }
+
+    public BatchWriterConfig getBatchWriterConfig(){
+        return batchWriterConfig;
+    }
+
+    public void setBatchWriterConfig(BatchWriterConfig batchWriterConfig) {
+        this.batchWriterConfig = batchWriterConfig;
+    }
+
+    protected MultiTableBatchWriter getMultiTableBatchWriter(){
+        return mt_bw;
+    }
+
+    @Override
+    public AccumuloRdfConfiguration getConf() {
+        return conf;
+    }
+
+    @Override
+    public void setConf(AccumuloRdfConfiguration conf) {
+        this.conf = conf;
+    }
+
+    public RyaTableMutationsFactory getRyaTableMutationsFactory() {
+        return ryaTableMutationsFactory;
+    }
+
+    public void setRyaTableMutationsFactory(RyaTableMutationsFactory ryaTableMutationsFactory) {
+        this.ryaTableMutationsFactory = ryaTableMutationsFactory;
+    }
+
+    @Override
+    public AccumuloRyaQueryEngine getQueryEngine() {
+        return queryEngine;
+    }
+
+    public void setQueryEngine(AccumuloRyaQueryEngine queryEngine) {
+        this.queryEngine = queryEngine;
+    }
+
+    public void flush() throws RyaDAOException {
+        try {
+            mt_bw.flush();
+        } catch (MutationsRejectedException e) {
+            throw new RyaDAOException(e);
+        }
+    }
+
+    protected String[] getTables() {
+        // core tables
+        List<String> tableNames = Lists.newArrayList(
+                tableLayoutStrategy.getSpo(),
+                tableLayoutStrategy.getPo(),
+                tableLayoutStrategy.getOsp(),
+                tableLayoutStrategy.getNs(),
+                tableLayoutStrategy.getEval());
+
+        // Additional Tables
+        for (AccumuloIndexer index : secondaryIndexers) {
+            tableNames.add(index.getTableName());
+        }
+
+        return tableNames.toArray(new String[]{});
+    }
+
+    private void purge(String tableName, String[] auths) throws TableNotFoundException, MutationsRejectedException {
+        if (tableExists(tableName)) {
+            logger.info("Purging accumulo table: " + tableName);
+            BatchDeleter batchDeleter = createBatchDeleter(tableName, new Authorizations(auths));
+            try {
+                batchDeleter.setRanges(Collections.singleton(new Range()));
+                batchDeleter.delete();
+            } finally {
+                batchDeleter.close();
+            }
+        }
+    }
+
+    private void compact(String tableName) {
+        logger.info("Requesting major compaction for table " + tableName);
+        try {
+            connector.tableOperations().compact(tableName, null, null, true, false);
+        } catch (Exception e) {
+            logger.error(e.getMessage());
+        }
+    }
+
+    private boolean tableExists(String tableName) {
+        return getConnector().tableOperations().exists(tableName);
+    }
+
+    private BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations) throws TableNotFoundException {
+        return connector.createBatchDeleter(tableName, authorizations, NUM_THREADS, MAX_MEMORY, MAX_TIME, NUM_THREADS);
+    }
+
+    private void checkVersion() throws RyaDAOException, IOException, MutationsRejectedException {
+        String version = getVersion();
+        if (version == null) {
+            //adding to core Rya tables but not Indexes
+            Map<TABLE_LAYOUT, Collection<Mutation>> mutationMap = ryaTableMutationsFactory.serialize(getVersionRyaStatement());
+            Collection<Mutation> spo = mutationMap.get(TABLE_LAYOUT.SPO);
+            Collection<Mutation> po = mutationMap.get(TABLE_LAYOUT.PO);
+            Collection<Mutation> osp = mutationMap.get(TABLE_LAYOUT.OSP);
+            bw_spo.addMutations(spo);
+            bw_po.addMutations(po);
+            bw_osp.addMutations(osp);
+        }
+        //TODO: Do a version check here
+    }
+
+    protected RyaStatement getVersionRyaStatement() {
+        return new RyaStatement(RTS_SUBJECT_RYA, RTS_VERSION_PREDICATE_RYA, VERSION_RYA);
+    }
+
+    private void drop(String tableName) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+        logger.info("Dropping accumulo table: " + tableName);
+        connector.tableOperations().delete(tableName);
+    }
+}
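
For orientation, a minimal lifecycle sketch of the DAO above, assuming an Accumulo Connector is already obtained; the three-argument RyaStatement constructor is the one getVersionRyaStatement() uses, and the URIs are illustrative:

    AccumuloRyaDAO dao = new AccumuloRyaDAO();
    dao.setConnector(connector);
    dao.setConf(new AccumuloRdfConfiguration());
    dao.init();   // creates the spo/po/osp/ns tables and batch writers, checks the version row

    // add() serializes into all three core layouts and, since
    // flushEachUpdate defaults to true, flushes immediately.
    dao.add(new RyaStatement(new RyaURI("urn:ex#s"), new RyaURI("urn:ex#p"), new RyaURI("urn:ex#o")));

    dao.destroy();   // flushes and closes the MultiTableBatchWriter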

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/DefineTripleQueryRangeFactory.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/DefineTripleQueryRangeFactory.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/DefineTripleQueryRangeFactory.java
new file mode 100644
index 0000000..b5a4e84
--- /dev/null
+++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/DefineTripleQueryRangeFactory.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//package mvm.rya.accumulo;
+
+//
+//import com.google.common.io.ByteArrayDataOutput;
+//import com.google.common.io.ByteStreams;
+//import mvm.rya.api.RdfCloudTripleStoreUtils;
+//import mvm.rya.api.domain.RangeValue;
+//import org.apache.accumulo.core.data.Range;
+//import org.apache.hadoop.io.Text;
+//import org.openrdf.model.Value;
+//import org.openrdf.model.ValueFactory;
+//import org.openrdf.model.impl.ValueFactoryImpl;
+//
+//import java.io.IOException;
+//import java.util.Map;
+//
+//import static mvm.rya.api.RdfCloudTripleStoreConstants.*;
+//import static mvm.rya.api.RdfCloudTripleStoreUtils.CustomEntry;
+//
+///**
+// * Class DefineTripleQueryRangeFactory
+// * Date: Jun 2, 2011
+// * Time: 10:35:43 AM
+// */
+//public class DefineTripleQueryRangeFactory {
+//
+//    ValueFactory vf = ValueFactoryImpl.getInstance();
+//
+//    protected void fillRange(ByteArrayDataOutput startRowOut, ByteArrayDataOutput endRowOut, Value val, boolean empty)
+//            throws IOException {
+//        if(!empty) {
+//            startRowOut.write(DELIM_BYTES);
+//            endRowOut.write(DELIM_BYTES);
+//        }
+//        //null check?
+//        if(val instanceof RangeValue) {
+//            RangeValue rangeValue = (RangeValue) val;
+//            Value start = rangeValue.getStart();
+//            Value end = rangeValue.getEnd();
+//            byte[] start_val_bytes = RdfCloudTripleStoreUtils.writeValue(start);
+//            byte[] end_val_bytes = RdfCloudTripleStoreUtils.writeValue(end);
+//            startRowOut.write(start_val_bytes);
+//            endRowOut.write(end_val_bytes);
+//        } else {
+//            byte[] val_bytes = RdfCloudTripleStoreUtils.writeValue(val);
+//            startRowOut.write(val_bytes);
+//            endRowOut.write(val_bytes);
+//        }
+//    }
+//
+//    public Map.Entry<TABLE_LAYOUT, Range> defineRange(Value subject, Value predicate, Value object, AccumuloRdfConfiguration conf)
+//            throws IOException {
+//
+//        byte[] startrow, stoprow;
+//        ByteArrayDataOutput startRowOut = ByteStreams.newDataOutput();
+//        ByteArrayDataOutput stopRowOut = ByteStreams.newDataOutput();
+//        Range range;
+//        TABLE_LAYOUT tableLayout;
+//
+//        if (subject != null) {
+//            /**
+//             * Case: s
+//             * Table: spo
+//             * Want this to be the first if statement since it will most likely be the most asked-for table
+//             */
+//            tableLayout = TABLE_LAYOUT.SPO;
+//            fillRange(startRowOut, stopRowOut, subject, true);
+//            if (predicate != null) {
+//                /**
+//                 * Case: sp
+//                 * Table: spo
+//                 */
+//                fillRange(startRowOut, stopRowOut, predicate, false);
+//                if (object != null) {
+//                    /**
+//                     * Case: spo
+//                     * Table: spo
+//                     */
+//                    fillRange(startRowOut, stopRowOut, object, false);
+//                }
+//            } else if (object != null) {
+//                /**
+//                 * Case: so
+//                 * Table: osp
+//                 * Very rare case. Could have put this in the OSP if clause, but I wanted to reorder the if statement
+//                 * for best performance. The SPO table probably gets the most scans, so I want it to be the first if
+//                 * statement in the branch.
+//                 */
+//                tableLayout = TABLE_LAYOUT.OSP;
+//                startRowOut = ByteStreams.newDataOutput();
+//                stopRowOut = ByteStreams.newDataOutput();
+//                fillRange(startRowOut, stopRowOut, object, true);
+//                fillRange(startRowOut, stopRowOut, subject, false);
+//            }
+//        } else if (predicate != null) {
+//            /**
+//             * Case: p
+//             * Table: po
+//             * Wanted this to be the second if statement, since it will be the second most asked-for table
+//             */
+//            tableLayout = TABLE_LAYOUT.PO;
+//            fillRange(startRowOut, stopRowOut, predicate, true);
+//            if (object != null) {
+//                /**
+//                 * Case: po
+//                 * Table: po
+//                 */
+//                fillRange(startRowOut, stopRowOut, object, false);
+//            }
+//        } else if (object != null) {
+//            /**
+//             * Case: o
+//             * Table: osp
+//             * Probably a pretty rare scenario
+//             */
+//            tableLayout = TABLE_LAYOUT.OSP;
+//            fillRange(startRowOut, stopRowOut, object, true);
+//        } else {
+//            tableLayout = TABLE_LAYOUT.SPO;
+//            stopRowOut.write(Byte.MAX_VALUE);
+//        }
+//
+//        startrow = startRowOut.toByteArray();
+//        stopRowOut.write(DELIM_STOP_BYTES);
+//        stoprow = stopRowOut.toByteArray();
+//        Text startRowTxt = new Text(startrow);
+//        Text stopRowTxt = new Text(stoprow);
+//        range = new Range(startRowTxt, stopRowTxt);
+//
+//        return new CustomEntry<TABLE_LAYOUT, Range>(tableLayout, range);
+//    }
+//
+//}
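
Although this factory ships commented out, the pattern-to-index mapping it encodes is the heart of the three-table layout. Condensed into one illustrative method (the method name is assumed, not part of the commit):

    // Which index table serves a (subject, predicate, object) pattern;
    // null arguments are wildcards. Mirrors the branch order above.
    static TABLE_LAYOUT chooseLayout(Value subject, Value predicate, Value object) {
        if (subject != null) {
            // s, sp, spo -> SPO; the rare "so" pattern -> OSP
            return (predicate == null && object != null) ? TABLE_LAYOUT.OSP : TABLE_LAYOUT.SPO;
        } else if (predicate != null) {
            return TABLE_LAYOUT.PO;    // p, po
        } else if (object != null) {
            return TABLE_LAYOUT.OSP;   // o
        }
        return TABLE_LAYOUT.SPO;       // no constants bound: full scan
    }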

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/RyaTableKeyValues.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/RyaTableKeyValues.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/RyaTableKeyValues.java
new file mode 100644
index 0000000..574029e
--- /dev/null
+++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/RyaTableKeyValues.java
@@ -0,0 +1,115 @@
+package mvm.rya.accumulo;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+
+import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_VALUE;
+
+import java.io.IOException;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.resolver.RyaTripleContext;
+import mvm.rya.api.resolver.triple.TripleRow;
+import mvm.rya.api.resolver.triple.TripleRowResolverException;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.hadoop.io.Text;
+
+public class RyaTableKeyValues {
+    public static final ColumnVisibility EMPTY_CV = new ColumnVisibility();
+    public static final Text EMPTY_CV_TEXT = new Text(EMPTY_CV.getExpression());
+
+    RyaTripleContext instance;
+
+    private RyaStatement stmt;
+    private Collection<Map.Entry<Key, Value>> spo = new ArrayList<Map.Entry<Key, Value>>();
+    private Collection<Map.Entry<Key, Value>> po = new ArrayList<Map.Entry<Key, Value>>();
+    private Collection<Map.Entry<Key, Value>> osp = new ArrayList<Map.Entry<Key, Value>>();
+
+    public RyaTableKeyValues(RyaStatement stmt, RdfCloudTripleStoreConfiguration conf) {
+        this.stmt = stmt;
+        this.instance = RyaTripleContext.getInstance(conf);
+    }
+
+    public Collection<Map.Entry<Key, Value>> getSpo() {
+        return spo;
+    }
+
+    public Collection<Map.Entry<Key, Value>> getPo() {
+        return po;
+    }
+
+    public Collection<Map.Entry<Key, Value>> getOsp() {
+        return osp;
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    public RyaTableKeyValues invoke() throws IOException {
+        /**
+         * TODO: If there are contexts, do we still replicate the information into the default graph as well
+         * as the named graphs?
+         */
+        try {
+            Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, mvm.rya.api.resolver.triple.TripleRow> rowMap = instance.serializeTriple(stmt);
+            TripleRow tripleRow = rowMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO);
+            byte[] columnVisibility = tripleRow.getColumnVisibility();
+            Text cv = columnVisibility == null ? EMPTY_CV_TEXT : new Text(columnVisibility);
+            Long timestamp = tripleRow.getTimestamp();
+            timestamp = timestamp == null ? 0L : timestamp;
+            byte[] value = tripleRow.getValue();
+            Value v = value == null ? EMPTY_VALUE : new Value(value);
+            spo.add(new SimpleEntry(new Key(new Text(tripleRow.getRow()),
+                    new Text(tripleRow.getColumnFamily()),
+                    new Text(tripleRow.getColumnQualifier()),
+                    cv, timestamp), v));
+            tripleRow = rowMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO);
+            po.add(new SimpleEntry(new Key(new Text(tripleRow.getRow()),
+                    new Text(tripleRow.getColumnFamily()),
+                    new Text(tripleRow.getColumnQualifier()),
+                    cv, timestamp), v));
+            tripleRow = rowMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP);
+            osp.add(new SimpleEntry(new Key(new Text(tripleRow.getRow()),
+                    new Text(tripleRow.getColumnFamily()),
+                    new Text(tripleRow.getColumnQualifier()),
+                    cv, timestamp), v));
+        } catch (TripleRowResolverException e) {
+            throw new IOException(e);
+        }
+        return this;
+    }
+
+    @Override
+    public String toString() {
+        return "RyaTableKeyValues{" +
+                "statement=" + stmt +
+                ", spo=" + spo +
+                ", po=" + po +
+                ", osp=" + osp +
+                '}';
+    }
+}
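
A short usage sketch for the class above, with stmt and conf assumed:

    // invoke() serializes the statement once, then fans it out into
    // Key/Value entries for each of the three core tables.
    RyaTableKeyValues kvs = new RyaTableKeyValues(stmt, conf).invoke();
    for (Map.Entry<Key, Value> entry : kvs.getSpo()) {
        System.out.println(entry.getKey() + " -> " + entry.getValue());
    }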

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/RyaTableMutationsFactory.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/RyaTableMutationsFactory.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/RyaTableMutationsFactory.java
new file mode 100644
index 0000000..2a4871d
--- /dev/null
+++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/RyaTableMutationsFactory.java
@@ -0,0 +1,148 @@
+package mvm.rya.accumulo;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+
+import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_CV;
+import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_VALUE;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.resolver.RyaTripleContext;
+import mvm.rya.api.resolver.triple.TripleRow;
+import mvm.rya.api.resolver.triple.TripleRowResolverException;
+
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.hadoop.io.Text;
+
+public class RyaTableMutationsFactory {
+
+    RyaTripleContext ryaContext;
+
+    public RyaTableMutationsFactory(RyaTripleContext ryaContext) {
+        this.ryaContext = ryaContext;
+    }
+
+    //TODO: Does this still need to be collections
+    public Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> serialize(
+            RyaStatement stmt) throws IOException {
+
+        Collection<Mutation> spo_muts = new ArrayList<Mutation>();
+        Collection<Mutation> po_muts = new ArrayList<Mutation>();
+        Collection<Mutation> osp_muts = new ArrayList<Mutation>();
+        /**
+         * TODO: If there are contexts, do we still replicate the information into the default graph as well
+         * as the named graphs?
+         */
+        try {
+            Map<TABLE_LAYOUT, TripleRow> rowMap = ryaContext.serializeTriple(stmt);
+            TripleRow tripleRow = rowMap.get(TABLE_LAYOUT.SPO);
+            spo_muts.add(createMutation(tripleRow));
+            tripleRow = rowMap.get(TABLE_LAYOUT.PO);
+            po_muts.add(createMutation(tripleRow));
+            tripleRow = rowMap.get(TABLE_LAYOUT.OSP);
+            osp_muts.add(createMutation(tripleRow));
+        } catch (TripleRowResolverException fe) {
+            throw new IOException(fe);
+        }
+
+        Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> mutations =
+                new HashMap<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>>();
+        mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO, spo_muts);
+        mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO, po_muts);
+        mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP, osp_muts);
+
+        return mutations;
+    }
+
+    public Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> serializeDelete(
+            RyaStatement stmt) throws IOException {
+
+        Collection<Mutation> spo_muts = new ArrayList<Mutation>();
+        Collection<Mutation> po_muts = new ArrayList<Mutation>();
+        Collection<Mutation> osp_muts = new ArrayList<Mutation>();
+        /**
+         * TODO: If there are contexts, do we still replicate the information into the default graph as well
+         * as the named graphs?
+         */
+        try {
+            Map<TABLE_LAYOUT, TripleRow> rowMap = ryaContext.serializeTriple(stmt);
+            TripleRow tripleRow = rowMap.get(TABLE_LAYOUT.SPO);
+            spo_muts.add(deleteMutation(tripleRow));
+            tripleRow = rowMap.get(TABLE_LAYOUT.PO);
+            po_muts.add(deleteMutation(tripleRow));
+            tripleRow = rowMap.get(TABLE_LAYOUT.OSP);
+            osp_muts.add(deleteMutation(tripleRow));
+        } catch (TripleRowResolverException fe) {
+            throw new IOException(fe);
+        }
+
+        Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> mutations =
+                new HashMap<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>>();
+        mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO, spo_muts);
+        mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO, po_muts);
+        mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP, osp_muts);
+
+        return mutations;
+
+    }
+
+    protected Mutation deleteMutation(TripleRow tripleRow) {
+        Mutation m = new Mutation(new Text(tripleRow.getRow()));
+
+        byte[] columnFamily = tripleRow.getColumnFamily();
+        Text cfText = columnFamily == null ? EMPTY_TEXT : new Text(columnFamily);
+
+        byte[] columnQualifier = tripleRow.getColumnQualifier();
+        Text cqText = columnQualifier == null ? EMPTY_TEXT : new Text(columnQualifier);
+
+        m.putDelete(cfText, cqText, new ColumnVisibility(tripleRow.getColumnVisibility()),
+                tripleRow.getTimestamp());
+        return m;
+    }
+
+    protected Mutation createMutation(TripleRow tripleRow) {
+        Mutation mutation = new Mutation(new Text(tripleRow.getRow()));
+        byte[] columnVisibility = tripleRow.getColumnVisibility();
+        ColumnVisibility cv = columnVisibility == null ? EMPTY_CV : new ColumnVisibility(columnVisibility);
+        Long timestamp = tripleRow.getTimestamp();
+        byte[] value = tripleRow.getValue();
+        Value v = value == null ? EMPTY_VALUE : new Value(value);
+        byte[] columnQualifier = tripleRow.getColumnQualifier();
+        Text cqText = columnQualifier == null ? EMPTY_TEXT : new Text(columnQualifier);
+        byte[] columnFamily = tripleRow.getColumnFamily();
+        Text cfText = columnFamily == null ? EMPTY_TEXT : new Text(columnFamily);
+
+        mutation.put(cfText, cqText, cv, timestamp, v);
+        return mutation;
+    }
+}
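
For context, a minimal usage sketch (not part of this patch) of how the per-layout mutation map returned by serialize(...) might be drained through an Accumulo MultiTableBatchWriter. The tableFor(...) helper and its naming scheme are hypothetical illustrations, not the project's actual table-naming strategy.

    import java.util.Collection;
    import java.util.Map;

    import org.apache.accumulo.core.client.BatchWriter;
    import org.apache.accumulo.core.client.MultiTableBatchWriter;
    import org.apache.accumulo.core.data.Mutation;

    import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;

    public class MutationWriteSketch {
        // Hypothetical helper: maps a layout to its Accumulo table name.
        static String tableFor(TABLE_LAYOUT layout) {
            return "rya_" + layout.name().toLowerCase();
        }

        static void write(MultiTableBatchWriter mtbw,
                          Map<TABLE_LAYOUT, Collection<Mutation>> mutations) throws Exception {
            for (Map.Entry<TABLE_LAYOUT, Collection<Mutation>> entry : mutations.entrySet()) {
                // Each layout's mutations go to that layout's table.
                BatchWriter writer = mtbw.getBatchWriter(tableFor(entry.getKey()));
                writer.addMutations(entry.getValue());
            }
            mtbw.flush(); // one flush covers all three layout tables
        }
    }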

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/experimental/AbstractAccumuloIndexer.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/experimental/AbstractAccumuloIndexer.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/experimental/AbstractAccumuloIndexer.java
new file mode 100644
index 0000000..5df5da9
--- /dev/null
+++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/experimental/AbstractAccumuloIndexer.java
@@ -0,0 +1,59 @@
+package mvm.rya.accumulo.experimental;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.io.IOException;
+import java.util.Collection;
+
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaURI;
+
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
+
+public abstract class AbstractAccumuloIndexer implements AccumuloIndexer {
+
+    @Override
+    public void setMultiTableBatchWriter(MultiTableBatchWriter writer) throws IOException {
+        // No-op by default; subclasses that batch their writes may override.
+    }
+
+    @Override
+    public void storeStatements(Collection<RyaStatement> statements) throws IOException {
+        for (RyaStatement s : statements) {
+            storeStatement(s);
+        }
+    }
+
+    @Override
+    public void deleteStatement(RyaStatement stmt) throws IOException {
+        // No-op by default.
+    }
+
+    @Override
+    public void dropGraph(RyaURI... graphs) {
+        // No-op by default.
+    }
+
+    @Override
+    public void flush() throws IOException {
+        // No-op by default.
+    }
+
+    @Override
+    public void close() throws IOException {
+        // No-op by default.
+    }
+}
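
Since AbstractAccumuloIndexer supplies no-op defaults and a storeStatements loop that delegates to storeStatement, a concrete indexer only overrides the hooks it cares about. Below is a hedged sketch of such a subclass; it is left abstract because the remaining AccumuloIndexer lifecycle methods are out of scope here, and the counting behaviour is purely illustrative.

    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicLong;

    import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer;
    import mvm.rya.api.domain.RyaStatement;

    public abstract class CountingIndexer extends AbstractAccumuloIndexer {
        private final AtomicLong stored = new AtomicLong();

        @Override
        public void storeStatement(RyaStatement statement) throws IOException {
            // A real indexer would write index entries here; this just counts.
            stored.incrementAndGet();
        }

        public long storedCount() {
            return stored.get();
        }
    }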

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/experimental/AccumuloIndexer.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/experimental/AccumuloIndexer.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/experimental/AccumuloIndexer.java
new file mode 100644
index 0000000..5581e08
--- /dev/null
+++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/experimental/AccumuloIndexer.java
@@ -0,0 +1,38 @@
+package mvm.rya.accumulo.experimental;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
+
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.api.persist.index.RyaSecondaryIndexer;
+
+public interface AccumuloIndexer extends RyaSecondaryIndexer {
+    public void init();
+    public void setMultiTableBatchWriter(MultiTableBatchWriter writer) throws IOException;
+    public void setConnector(Connector connector);
+    public void destroy();
+    public void purge(RdfCloudTripleStoreConfiguration configuration);
+    public void dropAndDestroy();
+}
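
The interface itself does not document a call order; the wiring below is an assumption inferred from its shape (attach the connection, share the DAO's batch writer, then init to create or open the index tables), not documented behaviour.

    import java.io.IOException;

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.MultiTableBatchWriter;

    import mvm.rya.accumulo.experimental.AccumuloIndexer;

    public class IndexerWiringSketch {
        static void wire(AccumuloIndexer indexer, Connector connector,
                         MultiTableBatchWriter writer) throws IOException {
            indexer.setConnector(connector);          // attach the Accumulo connection
            indexer.setMultiTableBatchWriter(writer); // share the DAO's batch writer
            indexer.init();                           // create/open index tables
        }
    }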

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/AccumuloRyaInstanceDetailsRepository.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/AccumuloRyaInstanceDetailsRepository.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/AccumuloRyaInstanceDetailsRepository.java
new file mode 100644
index 0000000..6e818b3
--- /dev/null
+++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/AccumuloRyaInstanceDetailsRepository.java
@@ -0,0 +1,229 @@
+package mvm.rya.accumulo.instance;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map.Entry;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.ConditionalWriter.Result;
+import org.apache.accumulo.core.client.ConditionalWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.data.Condition;
+import org.apache.accumulo.core.data.ConditionalMutation;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.io.Text;
+
+import mvm.rya.api.instance.RyaDetails;
+import mvm.rya.api.instance.RyaDetailsRepository;
+
+/**
+ * An implementation of {@link RyaDetailsRepository} that stores a Rya
+ * instance's {@link RyaDetails} in an Accumulo table.
+ * <p>
+ * This implementation writes the details object as a serialized byte array to
+ * a row in Accumulo. Storing the entire structure within a single value is
+ * attractive because Accumulo's conditional writer will let us do checkAndSet
+ * style operations to synchronize writes to the object. On the downside, only
+ * Java clients will work.
+ */
+@ParametersAreNonnullByDefault
+public class AccumuloRyaInstanceDetailsRepository implements RyaDetailsRepository {
+
+    public static final String INSTANCE_DETAILS_TABLE_NAME = "instance_details";
+
+    private static final Text ROW_ID = new Text("instance metadata");
+    private static final Text COL_FAMILY = new Text("instance");
+    private static final Text COL_QUALIFIER = new Text("details");
+
+    private final RyaDetailsSerializer serializer = new RyaDetailsSerializer();
+
+    private final Connector connector;
+    private final String instanceName;
+    private final String detailsTableName;
+
+
+    /**
+     * Constructs an instance of {@link AccumuloRyaInstanceDetailsRepository}.
+     *
+     * @param connector - Connects to the instance of Accumulo that hosts the Rya instance. (not null)
+     * @param instanceName - The name of the Rya instance this repository represents. (not null)
+     */
+    public AccumuloRyaInstanceDetailsRepository(final Connector connector, final String instanceName) {
+        this.connector = requireNonNull( connector );
+        this.instanceName = requireNonNull( instanceName );
+        this.detailsTableName = instanceName + INSTANCE_DETAILS_TABLE_NAME;
+    }
+
+    @Override
+    public boolean isInitialized() throws RyaDetailsRepositoryException {
+        try {
+            final Scanner scanner = connector.createScanner(detailsTableName, new Authorizations());
+            scanner.fetchColumn(COL_FAMILY, COL_QUALIFIER);
+            return scanner.iterator().hasNext();
+        } catch (final TableNotFoundException e) {
+            return false;
+        }
+    }
+
+    @Override
+    public void initialize(final RyaDetails details) throws AlreadyInitializedException, RyaDetailsRepositoryException {
+        // Preconditions.
+        requireNonNull( details );
+
+        if(!details.getRyaInstanceName().equals( instanceName )) {
+            throw new RyaDetailsRepositoryException("The instance name that was in the provided 'details' does not match " +
+                    "the instance name that this repository is connected to. Make sure you're connected to the " +
+                    "correct Rya instance.");
+        }
+
+        if(isInitialized()) {
+            throw new AlreadyInitializedException("The repository has already been initialized for the Rya instance named '" +
+                    instanceName + "'.");
+        }
+
+        // Create the table that hosts the details if it has not been created yet.
+        final TableOperations tableOps = connector.tableOperations();
+        if(!tableOps.exists(detailsTableName)) {
+            try {
+                tableOps.create(detailsTableName);
+            } catch (AccumuloException | AccumuloSecurityException | TableExistsException e) {
+                throw new RyaDetailsRepositoryException("Could not initialize the Rya instance details for the instance named '" +
+                        instanceName + "' because the table that holds that information could not be created.", e);
+            }
+        }
+
+        // Write the details to the table.
+        BatchWriter writer = null;
+        try {
+            writer = connector.createBatchWriter(detailsTableName, new BatchWriterConfig());
+
+            final byte[] bytes = serializer.serialize(details);
+            final Mutation mutation = new Mutation(ROW_ID);
+            mutation.put(COL_FAMILY, COL_QUALIFIER, new Value(bytes));
+            writer.addMutation( mutation );
+
+        } catch (final TableNotFoundException | MutationsRejectedException e) {
+            throw new RyaDetailsRepositoryException("Could not initialize the Rya instance details for the instance named '" + instanceName + "'.", e);
+        } finally {
+            if(writer != null) {
+                try {
+                    writer.close();
+                } catch (final MutationsRejectedException e) {
+                    throw new RyaDetailsRepositoryException("Could not initialize the Rya instance details for the instance named '" + instanceName + "'.", e);
+                }
+            }
+        }
+    }
+
+    @Override
+    public RyaDetails getRyaInstanceDetails() throws NotInitializedException, RyaDetailsRepositoryException {
+        // Preconditions.
+        if(!isInitialized()) {
+            throw new NotInitializedException("Could not fetch the details for the Rya instance named '" +
+                    instanceName + "' because it has not been initialized yet.");
+        }
+
+        // Read it from the table.
+        try {
+            // Fetch the value from the table.
+            final Scanner scanner = connector.createScanner(detailsTableName, new Authorizations());
+            scanner.fetchColumn(COL_FAMILY, COL_QUALIFIER);
+            final Entry<Key, Value> entry = scanner.iterator().next();
+
+            // Deserialize it.
+            final byte[] bytes = entry.getValue().get();
+            return serializer.deserialize( bytes );
+
+        } catch (final TableNotFoundException e) {
+            throw new RyaDetailsRepositoryException("Could not get the details from the table.", e);
+        }
+    }
+
+    @Override
+    public void update(final RyaDetails oldDetails, final RyaDetails newDetails)
+            throws NotInitializedException, ConcurrentUpdateException, RyaDetailsRepositoryException {
+        // Preconditions.
+        requireNonNull(oldDetails);
+        requireNonNull(newDetails);
+
+        if(!newDetails.getRyaInstanceName().equals( instanceName )) {
+            throw new RyaDetailsRepositoryException("The instance name that was in the provided 'newDetails' does not match " +
+                    "the instance name that this repository is connected to. Make sure you're connected to the " +
+                    "correct Rya instance.");
+        }
+
+        if(!isInitialized()) {
+            throw new NotInitializedException("Could not update the details for the Rya instance named '" +
+                    instanceName + "' because it has not been initialized yet.");
+        }
+
+        // Use a conditional writer so that we can detect when the old details
+        // are no longer the currently stored ones.
+        ConditionalWriter writer = null;
+        try {
+            // Setup the condition that ensures the details have not changed since the edits were made.
+            final byte[] oldDetailsBytes = serializer.serialize(oldDetails);
+            final Condition condition = new Condition(COL_FAMILY, COL_QUALIFIER);
+            condition.setValue( oldDetailsBytes );
+
+            // Create the mutation that only performs the update if the details haven't changed.
+            final ConditionalMutation mutation = new ConditionalMutation(ROW_ID);
+            mutation.addCondition( condition );
+            final byte[] newDetailsBytes = serializer.serialize(newDetails);
+            mutation.put(COL_FAMILY, COL_QUALIFIER, new Value(newDetailsBytes));
+
+            // Do the write.
+            writer = connector.createConditionalWriter(detailsTableName, new ConditionalWriterConfig());
+            final Result result = writer.write(mutation);
+            switch(result.getStatus()) {
+                case REJECTED:
+                case VIOLATED:
+                    throw new ConcurrentUpdateException("Could not update the details for the Rya instance named '" +
+                            instanceName + "' because the old value is out of date.");
+                case UNKNOWN:
+                case INVISIBLE_VISIBILITY:
+                    throw new RyaDetailsRepositoryException("Could not update the details for the Rya instance named '" + instanceName + "'.");
+            }
+        } catch (final TableNotFoundException | AccumuloException | AccumuloSecurityException e) {
+            throw new RyaDetailsRepositoryException("Could not update the details for the Rya instance named '" + instanceName + "'.", e);
+        } finally {
+            if(writer != null) {
+                writer.close();
+            }
+        }
+    }
+}
\ No newline at end of file
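
A hedged caller-side sketch of the optimistic update loop that these checkAndSet semantics enable: re-read the current details and retry when a ConcurrentUpdateException signals a lost race. The mutateDetails function and the retry bound are illustrative assumptions, as is the placement of the exception types as nested classes of RyaDetailsRepository (and NotInitializedException extending RyaDetailsRepositoryException).

    import java.util.function.UnaryOperator;

    import mvm.rya.api.instance.RyaDetails;
    import mvm.rya.api.instance.RyaDetailsRepository;
    import mvm.rya.api.instance.RyaDetailsRepository.ConcurrentUpdateException;
    import mvm.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;

    public class DetailsUpdateSketch {
        static void updateWithRetry(RyaDetailsRepository repo,
                                    UnaryOperator<RyaDetails> mutateDetails)
                throws RyaDetailsRepositoryException {
            for (int attempt = 0; attempt < 3; attempt++) { // illustrative retry bound
                final RyaDetails old = repo.getRyaInstanceDetails();
                try {
                    repo.update(old, mutateDetails.apply(old));
                    return;
                } catch (final ConcurrentUpdateException e) {
                    // Another writer won the race; re-read and try again.
                }
            }
            throw new RyaDetailsRepositoryException("Gave up after repeated concurrent updates.");
        }
    }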

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/RyaDetailsSerializer.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/RyaDetailsSerializer.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/RyaDetailsSerializer.java
new file mode 100644
index 0000000..8c863ea
--- /dev/null
+++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/RyaDetailsSerializer.java
@@ -0,0 +1,96 @@
+package mvm.rya.accumulo.instance;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import mvm.rya.api.instance.RyaDetails;
+import mvm.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
+
+/**
+ * Serializes {@link RyaDetails} instances.
+ */
+@ParametersAreNonnullByDefault
+public class RyaDetailsSerializer {
+
+    /**
+     * Serializes an instance of {@link RyaDetails}.
+     *
+     * @param details - The details that will be serialized. (not null)
+     * @return The serialized details.
+     */
+    public byte[] serialize(final RyaDetails details) throws SerializationException {
+        requireNonNull(details);
+
+        try {
+            final ByteArrayOutputStream stream = new ByteArrayOutputStream();
+            final ObjectOutputStream objOut = new ObjectOutputStream(stream);
+            objOut.writeObject( details );
+            // Flush so buffered block data reaches the byte array before it is copied.
+            objOut.flush();
+            return stream.toByteArray();
+        } catch (final IOException e) {
+            throw new SerializationException("Could not serialize an instance of RyaDetails.", e);
+        }
+    }
+
+    /**
+     * Deserializes an instance of {@link RyaDetails}.
+     *
+     * @param bytes - The serialized form of a {@link RyaDetails}. (not null)
+     * @return The deserialized object.
+     */
+    public RyaDetails deserialize(final byte[] bytes) throws SerializationException {
+        requireNonNull(bytes);
+
+        try {
+            final ByteArrayInputStream stream = new ByteArrayInputStream( bytes );
+            final Object o = new ObjectInputStream( stream ).readObject();
+
+            if(! (o instanceof RyaDetails) ) {
+                throw new SerializationException("Wrong type of object was deserialized. Class: " + o.getClass().getName() );
+            }
+
+            return (RyaDetails) o;
+        } catch (final ClassNotFoundException | IOException e) {
+            throw new SerializationException("Could not deserialize an instance of RyaDetails.", e);
+        }
+    }
+
+    /**
+     * Could not serialize an instance of {@link RyaDetails}.
+     */
+    public static class SerializationException extends RyaDetailsRepositoryException {
+        private static final long serialVersionUID = 1L;
+
+        public SerializationException(final String message) {
+            super(message);
+        }
+
+        public SerializationException(final String message, final Throwable cause) {
+            super(message, cause);
+        }
+        }
+    }
+}
\ No newline at end of file
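
Because this is plain Java serialization, deserialize(serialize(x)) should round-trip a details object. A small sketch of that check, assuming RyaDetails implements a value-based equals(); that equality contract is an assumption, not something the patch states.

    import mvm.rya.accumulo.instance.RyaDetailsSerializer;
    import mvm.rya.api.instance.RyaDetails;

    public class SerializerRoundTripSketch {
        static boolean roundTrips(RyaDetails details)
                throws RyaDetailsSerializer.SerializationException {
            final RyaDetailsSerializer serializer = new RyaDetailsSerializer();
            final byte[] bytes = serializer.serialize(details);
            // True when the deserialized copy equals the original.
            return serializer.deserialize(bytes).equals(details);
        }
    }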
