http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfQueryIterator.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfQueryIterator.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfQueryIterator.java new file mode 100644 index 0000000..d13f50e --- /dev/null +++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfQueryIterator.java @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +//package mvm.rya.accumulo; + +// +//import com.google.common.collect.Iterators; +//import com.google.common.io.ByteArrayDataInput; +//import com.google.common.io.ByteStreams; +//import info.aduna.iteration.CloseableIteration; +//import mvm.rya.api.RdfCloudTripleStoreConstants; +//import mvm.rya.api.RdfCloudTripleStoreUtils; +//import mvm.rya.api.persist.RdfDAOException; +//import mvm.rya.api.utils.NullableStatementImpl; +//import org.apache.accumulo.core.client.*; +//import org.apache.accumulo.core.data.Key; +//import org.apache.accumulo.core.data.Range; +//import org.apache.accumulo.core.iterators.user.AgeOffFilter; +//import org.apache.accumulo.core.iterators.user.TimestampFilter; +//import org.apache.accumulo.core.security.Authorizations; +//import org.apache.hadoop.io.Text; +//import org.openrdf.model.Resource; +//import org.openrdf.model.Statement; +//import org.openrdf.model.URI; +//import org.openrdf.model.Value; +//import org.openrdf.query.BindingSet; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +// +//import java.io.IOException; +//import java.util.Collection; +//import java.util.Collections; +//import java.util.HashSet; +//import java.util.Iterator; +//import java.util.Map.Entry; +// +//import static mvm.rya.accumulo.AccumuloRdfConstants.ALL_AUTHORIZATIONS; +//import static mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; +//import static mvm.rya.api.RdfCloudTripleStoreUtils.writeValue; +// +//public class AccumuloRdfQueryIterator implements +// CloseableIteration<Entry<Statement, BindingSet>, RdfDAOException> { +// +// protected final Logger logger = LoggerFactory.getLogger(getClass()); +// +// private boolean open = false; +// private Iterator result; +// private Resource[] contexts; +// private Collection<Entry<Statement, BindingSet>> statements; +// private int numOfThreads = 20; +// +// private RangeBindingSetEntries rangeMap = new RangeBindingSetEntries(); +// private ScannerBase scanner; +// private boolean 
isBatchScanner = true; +// private Statement statement; +// Iterator<BindingSet> iter_bss = null; +// +// private boolean hasNext = true; +// private AccumuloRdfConfiguration conf; +// private TABLE_LAYOUT tableLayout; +// private Text context_txt; +// +// private DefineTripleQueryRangeFactory queryRangeFactory = new DefineTripleQueryRangeFactory(); +// +// public AccumuloRdfQueryIterator(Collection<Entry<Statement, BindingSet>> statements, Connector connector, Resource... contexts) +// throws RdfDAOException { +// this(statements, connector, null, contexts); +// } +// +// public AccumuloRdfQueryIterator(Collection<Entry<Statement, BindingSet>> statements, Connector connector, +// AccumuloRdfConfiguration conf, Resource... contexts) +// throws RdfDAOException { +// this.statements = statements; +// this.contexts = contexts; +// this.conf = conf; +// initialize(connector); +// open = true; +// } +// +// public AccumuloRdfQueryIterator(Resource subject, URI predicate, Value object, Connector connector, +// AccumuloRdfConfiguration conf, Resource[] contexts) throws RdfDAOException { +// this(Collections.<Entry<Statement, BindingSet>>singleton(new RdfCloudTripleStoreUtils.CustomEntry<Statement, BindingSet>( +// new NullableStatementImpl(subject, predicate, object, contexts), +// null)), connector, conf, contexts); +// } +// +// protected void initialize(Connector connector) +// throws RdfDAOException { +// try { +// //TODO: We cannot span multiple tables here +// Collection<Range> ranges = new HashSet<Range>(); +// +// result = Iterators.emptyIterator(); +// Long startTime = conf.getStartTime(); +// Long ttl = conf.getTtl(); +// +// Resource context = null; +// for (Entry<Statement, BindingSet> stmtbs : statements) { +// Statement stmt = stmtbs.getKey(); +// Resource subject = stmt.getSubject(); +// URI predicate = stmt.getPredicate(); +// Value object = stmt.getObject(); +// context = stmt.getContext(); //TODO: assumes the same context for all statements +// 
logger.debug("Batch Scan, lookup subject[" + subject + "] predicate[" + predicate + "] object[" + object + "] combination"); +// +// Entry<TABLE_LAYOUT, Range> entry = queryRangeFactory.defineRange(subject, predicate, object, conf); +// tableLayout = entry.getKey(); +//// isTimeRange = isTimeRange || queryRangeFactory.isTimeRange(); +// Range range = entry.getValue(); +// ranges.add(range); +// rangeMap.ranges.add(new RdfCloudTripleStoreUtils.CustomEntry<Range, BindingSet>(range, stmtbs.getValue())); +// } +// +// Authorizations authorizations = AccumuloRdfConstants.ALL_AUTHORIZATIONS; +// String auth = conf.getAuth(); +// if (auth != null) { +// authorizations = new Authorizations(auth.split(",")); +// } +// String table = RdfCloudTripleStoreUtils.layoutToTable(tableLayout, conf); +// result = createScanner(connector, authorizations, table, context, startTime, ttl, ranges); +//// if (isBatchScanner) { +//// ((BatchScanner) scanner).setRanges(ranges); +//// } else { +//// for (Range range : ranges) { +//// ((Scanner) scanner).setRange(range); //TODO: Not good way of doing this +//// } +//// } +//// +//// if (isBatchScanner) { +//// result = ((BatchScanner) scanner).iterator(); +//// } else { +//// result = ((Scanner) scanner).iterator(); +//// } +// } catch (Exception e) { +// throw new RdfDAOException(e); +// } +// } +// +// protected Iterator<Entry<Key, org.apache.accumulo.core.data.Value>> createScanner(Connector connector, Authorizations authorizations, String table, Resource context, Long startTime, Long ttl, Collection<Range> ranges) throws TableNotFoundException, IOException { +//// ShardedConnector shardedConnector = new ShardedConnector(connector, 4, ta) +// if (rangeMap.ranges.size() > (numOfThreads / 2)) { //TODO: Arbitrary number, make configurable +// BatchScanner scannerBase = connector.createBatchScanner(table, authorizations, numOfThreads); +// scannerBase.setRanges(ranges); +// populateScanner(context, startTime, ttl, scannerBase); +// return 
scannerBase.iterator(); +// } else { +// isBatchScanner = false; +// Iterator<Entry<Key, org.apache.accumulo.core.data.Value>>[] iters = new Iterator[ranges.size()]; +// int i = 0; +// for (Range range : ranges) { +// Scanner scannerBase = connector.createScanner(table, authorizations); +// populateScanner(context, startTime, ttl, scannerBase); +// scannerBase.setRange(range); +// iters[i] = scannerBase.iterator(); +// i++; +// scanner = scannerBase; //TODO: Always overridden, but doesn't matter since Scanner doesn't need to be closed +// } +// return Iterators.concat(iters); +// } +// +// } +// +// protected void populateScanner(Resource context, Long startTime, Long ttl, ScannerBase scannerBase) throws IOException { +// if (context != null) { //default graph +// context_txt = new Text(writeValue(context)); +// scannerBase.fetchColumnFamily(context_txt); +// } +// +//// if (!isQueryTimeBased(conf)) { +// if (startTime != null && ttl != null) { +//// scannerBase.setScanIterators(1, FilteringIterator.class.getName(), "filteringIterator"); +//// scannerBase.setScanIteratorOption("filteringIterator", "0", TimeRangeFilter.class.getName()); +//// scannerBase.setScanIteratorOption("filteringIterator", "0." + TimeRangeFilter.TIME_RANGE_PROP, ttl); +//// scannerBase.setScanIteratorOption("filteringIterator", "0." 
+ TimeRangeFilter.START_TIME_PROP, startTime); +// IteratorSetting setting = new IteratorSetting(1, "fi", TimestampFilter.class.getName()); +// TimestampFilter.setStart(setting, startTime, true); +// TimestampFilter.setEnd(setting, startTime + ttl, true); +// scannerBase.addScanIterator(setting); +// } else if (ttl != null) { +//// scannerBase.setScanIterators(1, FilteringIterator.class.getName(), "filteringIterator"); +//// scannerBase.setScanIteratorOption("filteringIterator", "0", AgeOffFilter.class.getName()); +//// scannerBase.setScanIteratorOption("filteringIterator", "0.ttl", ttl); +// IteratorSetting setting = new IteratorSetting(1, "fi", AgeOffFilter.class.getName()); +// AgeOffFilter.setTTL(setting, ttl); +// scannerBase.addScanIterator(setting); +// } +//// } +// } +// +// @Override +// public void close() throws RdfDAOException { +// if (!open) +// return; +// verifyIsOpen(); +// open = false; +// if (scanner != null && isBatchScanner) { +// ((BatchScanner) scanner).close(); +// } +// } +// +// public void verifyIsOpen() throws RdfDAOException { +// if (!open) { +// throw new RdfDAOException("Iterator not open"); +// } +// } +// +// @Override +// public boolean hasNext() throws RdfDAOException { +// try { +// if (!open) +// return false; +// verifyIsOpen(); +// /** +// * For some reason, the result.hasNext returns false +// * once at the end of the iterator, and then true +// * for every subsequent call. 
+// */ +// hasNext = (hasNext && result.hasNext()); +// return hasNext || ((iter_bss != null) && iter_bss.hasNext()); +// } catch (Exception e) { +// throw new RdfDAOException(e); +// } +// } +// +// @Override +// public Entry<Statement, BindingSet> next() throws RdfDAOException { +// try { +// if (!this.hasNext()) +// return null; +// +// return getStatement(result, contexts); +// } catch (Exception e) { +// throw new RdfDAOException(e); +// } +// } +// +// public Entry<Statement, BindingSet> getStatement( +// Iterator<Entry<Key, org.apache.accumulo.core.data.Value>> rowResults, +// Resource... filterContexts) throws IOException { +// try { +// while (true) { +// if (iter_bss != null && iter_bss.hasNext()) { +// return new RdfCloudTripleStoreUtils.CustomEntry<Statement, BindingSet>(statement, iter_bss.next()); +// } +// +// if (rowResults.hasNext()) { +// Entry<Key, org.apache.accumulo.core.data.Value> entry = rowResults.next(); +// Key key = entry.getKey(); +// ByteArrayDataInput input = ByteStreams.newDataInput(key.getRow().getBytes()); +// statement = RdfCloudTripleStoreUtils.translateStatementFromRow(input, key.getColumnFamily(), tableLayout, RdfCloudTripleStoreConstants.VALUE_FACTORY); +// iter_bss = rangeMap.containsKey(key).iterator(); +// } else +// break; +// } +// } catch (Exception e) { +// throw new IOException(e); +// } +// return null; +// } +// +// @Override +// public void remove() throws RdfDAOException { +// next(); +// } +// +// public int getNumOfThreads() { +// return numOfThreads; +// } +// +// public void setNumOfThreads(int numOfThreads) { +// this.numOfThreads = numOfThreads; +// } +// +// public DefineTripleQueryRangeFactory getQueryRangeFactory() { +// return queryRangeFactory; +// } +// +// public void setQueryRangeFactory(DefineTripleQueryRangeFactory queryRangeFactory) { +// this.queryRangeFactory = queryRangeFactory; +// } +//}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfUtils.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfUtils.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfUtils.java new file mode 100644 index 0000000..157fc5a --- /dev/null +++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfUtils.java @@ -0,0 +1,72 @@ +package mvm.rya.accumulo; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + + +import mvm.rya.api.RdfCloudTripleStoreConstants; +import mvm.rya.api.resolver.triple.TripleRow; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_BYTES; + +/** + * Class AccumuloRdfUtils + * Date: Mar 1, 2012 + * Time: 7:15:54 PM + */ +public class AccumuloRdfUtils { + private static final Log logger = LogFactory.getLog(AccumuloRdfUtils.class); + + public static void createTableIfNotExist(TableOperations tableOperations, String tableName) throws AccumuloException, AccumuloSecurityException, TableExistsException { + boolean tableExists = tableOperations.exists(tableName); + if (!tableExists) { + logger.debug("Creating accumulo table: " + tableName); + tableOperations.create(tableName); + } + } + + public static Key from(TripleRow tripleRow) { + return new Key(defaultTo(tripleRow.getRow(), EMPTY_BYTES), + defaultTo(tripleRow.getColumnFamily(), EMPTY_BYTES), + defaultTo(tripleRow.getColumnQualifier(), EMPTY_BYTES), + defaultTo(tripleRow.getColumnVisibility(), EMPTY_BYTES), + defaultTo(tripleRow.getTimestamp(), Long.MAX_VALUE)); + } + + public static Value extractValue(TripleRow tripleRow) { + return new Value(defaultTo(tripleRow.getValue(), EMPTY_BYTES)); + } + + private static byte[] defaultTo(byte[] bytes, byte[] def) { + return bytes != null ? bytes : def; + } + + private static Long defaultTo(Long l, Long def) { + return l != null ? 
l : def; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java new file mode 100644 index 0000000..195030e --- /dev/null +++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java @@ -0,0 +1,551 @@ +package mvm.rya.accumulo; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + + +import static com.google.common.base.Preconditions.checkNotNull; +import static mvm.rya.accumulo.AccumuloRdfConstants.ALL_AUTHORIZATIONS; +import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT; +import static mvm.rya.api.RdfCloudTripleStoreConstants.INFO_NAMESPACE_TXT; +import static mvm.rya.api.RdfCloudTripleStoreConstants.MAX_MEMORY; +import static mvm.rya.api.RdfCloudTripleStoreConstants.MAX_TIME; +import static mvm.rya.api.RdfCloudTripleStoreConstants.NUM_THREADS; +import static mvm.rya.api.RdfCloudTripleStoreConstants.RTS_SUBJECT_RYA; +import static mvm.rya.api.RdfCloudTripleStoreConstants.RTS_VERSION_PREDICATE_RYA; +import static mvm.rya.api.RdfCloudTripleStoreConstants.VERSION_RYA; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchDeleter; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MultiTableBatchWriter; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.Text; +import org.openrdf.model.Namespace; + +import 
com.google.common.collect.Iterators; +import com.google.common.collect.Lists; + +import info.aduna.iteration.CloseableIteration; +import mvm.rya.accumulo.experimental.AccumuloIndexer; +import mvm.rya.accumulo.query.AccumuloRyaQueryEngine; +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.api.layout.TableLayoutStrategy; +import mvm.rya.api.persist.RyaDAO; +import mvm.rya.api.persist.RyaDAOException; +import mvm.rya.api.persist.RyaNamespaceManager; +import mvm.rya.api.resolver.RyaTripleContext; + +public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaNamespaceManager<AccumuloRdfConfiguration> { + private static final Log logger = LogFactory.getLog(AccumuloRyaDAO.class); + + private boolean initialized = false; + private boolean flushEachUpdate = true; + private Connector connector; + private BatchWriterConfig batchWriterConfig; + + private MultiTableBatchWriter mt_bw; + + // Do not flush these individually + private BatchWriter bw_spo; + private BatchWriter bw_po; + private BatchWriter bw_osp; + + private BatchWriter bw_ns; + + private List<AccumuloIndexer> secondaryIndexers; + + private AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); + private RyaTableMutationsFactory ryaTableMutationsFactory; + private TableLayoutStrategy tableLayoutStrategy; + private AccumuloRyaQueryEngine queryEngine; + private RyaTripleContext ryaContext; + + @Override + public boolean isInitialized() throws RyaDAOException { + return initialized; + } + + @Override + public void init() throws RyaDAOException { + if (initialized) { + return; + } + try { + checkNotNull(conf); + checkNotNull(connector); + + if(batchWriterConfig == null){ + batchWriterConfig = new BatchWriterConfig(); + batchWriterConfig.setMaxMemory(MAX_MEMORY); + batchWriterConfig.setTimeout(MAX_TIME, TimeUnit.MILLISECONDS); + 
batchWriterConfig.setMaxWriteThreads(NUM_THREADS); + } + + tableLayoutStrategy = conf.getTableLayoutStrategy(); + ryaContext = RyaTripleContext.getInstance(conf); + ryaTableMutationsFactory = new RyaTableMutationsFactory(ryaContext); + + secondaryIndexers = conf.getAdditionalIndexers(); + + flushEachUpdate = conf.flushEachUpdate(); + + TableOperations tableOperations = connector.tableOperations(); + AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getSpo()); + AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getPo()); + AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getOsp()); + AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getNs()); + + for (AccumuloIndexer index : secondaryIndexers) { + index.setConf(conf); + } + + mt_bw = connector.createMultiTableBatchWriter(batchWriterConfig); + + //get the batch writers for tables + bw_spo = mt_bw.getBatchWriter(tableLayoutStrategy.getSpo()); + bw_po = mt_bw.getBatchWriter(tableLayoutStrategy.getPo()); + bw_osp = mt_bw.getBatchWriter(tableLayoutStrategy.getOsp()); + + bw_ns = mt_bw.getBatchWriter(tableLayoutStrategy.getNs()); + + for (AccumuloIndexer index : secondaryIndexers) { + index.setConnector(connector); + index.setMultiTableBatchWriter(mt_bw); + index.init(); + } + + queryEngine = new AccumuloRyaQueryEngine(connector, conf); + + checkVersion(); + + initialized = true; + } catch (Exception e) { + throw new RyaDAOException(e); + } + } + + @Override + public String getVersion() throws RyaDAOException { + String version = null; + CloseableIteration<RyaStatement, RyaDAOException> versIter = queryEngine.query(new RyaStatement(RTS_SUBJECT_RYA, RTS_VERSION_PREDICATE_RYA, null), conf); + if (versIter.hasNext()) { + version = versIter.next().getObject().getData(); + } + versIter.close(); + + return version; + } + + @Override + public void add(RyaStatement statement) throws RyaDAOException { + 
commit(Iterators.singletonIterator(statement)); + } + + @Override + public void add(Iterator<RyaStatement> iter) throws RyaDAOException { + commit(iter); + } + + @Override + public void delete(RyaStatement stmt, AccumuloRdfConfiguration aconf) throws RyaDAOException { + this.delete(Iterators.singletonIterator(stmt), aconf); + } + + @Override + public void delete(Iterator<RyaStatement> statements, AccumuloRdfConfiguration conf) throws RyaDAOException { + try { + while (statements.hasNext()) { + RyaStatement stmt = statements.next(); + //query first + CloseableIteration<RyaStatement, RyaDAOException> query = this.queryEngine.query(stmt, conf); + while (query.hasNext()) { + deleteSingleRyaStatement(query.next()); + } + + for (AccumuloIndexer index : secondaryIndexers) { + index.deleteStatement(stmt); + } + } + if (flushEachUpdate) { mt_bw.flush(); } + } catch (Exception e) { + throw new RyaDAOException(e); + } + } + + @Override + public void dropGraph(AccumuloRdfConfiguration conf, RyaURI... 
graphs) throws RyaDAOException { + BatchDeleter bd_spo = null; + BatchDeleter bd_po = null; + BatchDeleter bd_osp = null; + + try { + bd_spo = createBatchDeleter(tableLayoutStrategy.getSpo(), conf.getAuthorizations()); + bd_po = createBatchDeleter(tableLayoutStrategy.getPo(), conf.getAuthorizations()); + bd_osp = createBatchDeleter(tableLayoutStrategy.getOsp(), conf.getAuthorizations()); + + bd_spo.setRanges(Collections.singleton(new Range())); + bd_po.setRanges(Collections.singleton(new Range())); + bd_osp.setRanges(Collections.singleton(new Range())); + + for (RyaURI graph : graphs){ + bd_spo.fetchColumnFamily(new Text(graph.getData())); + bd_po.fetchColumnFamily(new Text(graph.getData())); + bd_osp.fetchColumnFamily(new Text(graph.getData())); + } + + bd_spo.delete(); + bd_po.delete(); + bd_osp.delete(); + + //TODO indexers do not support delete-UnsupportedOperation Exception will be thrown +// for (AccumuloIndex index : secondaryIndexers) { +// index.dropGraph(graphs); +// } + + } catch (Exception e) { + throw new RyaDAOException(e); + } finally { + if (bd_spo != null) { + bd_spo.close(); + } + if (bd_po != null) { + bd_po.close(); + } + if (bd_osp != null) { + bd_osp.close(); + } + } + + } + + protected void deleteSingleRyaStatement(RyaStatement stmt) throws IOException, MutationsRejectedException { + Map<TABLE_LAYOUT, Collection<Mutation>> map = ryaTableMutationsFactory.serializeDelete(stmt); + bw_spo.addMutations(map.get(TABLE_LAYOUT.SPO)); + bw_po.addMutations(map.get(TABLE_LAYOUT.PO)); + bw_osp.addMutations(map.get(TABLE_LAYOUT.OSP)); + } + + protected void commit(Iterator<RyaStatement> commitStatements) throws RyaDAOException { + try { + //TODO: Should have a lock here in case we are adding and committing at the same time + while (commitStatements.hasNext()) { + RyaStatement stmt = commitStatements.next(); + + Map<TABLE_LAYOUT, Collection<Mutation>> mutationMap = ryaTableMutationsFactory.serialize(stmt); + Collection<Mutation> spo = 
mutationMap.get(TABLE_LAYOUT.SPO); + Collection<Mutation> po = mutationMap.get(TABLE_LAYOUT.PO); + Collection<Mutation> osp = mutationMap.get(TABLE_LAYOUT.OSP); + bw_spo.addMutations(spo); + bw_po.addMutations(po); + bw_osp.addMutations(osp); + + for (AccumuloIndexer index : secondaryIndexers) { + index.storeStatement(stmt); + } + } + + if (flushEachUpdate) { mt_bw.flush(); } + } catch (Exception e) { + throw new RyaDAOException(e); + } + } + + @Override + public void destroy() throws RyaDAOException { + if (!initialized) { + return; + } + //TODO: write lock + try { + initialized = false; + mt_bw.flush(); + + mt_bw.close(); + } catch (Exception e) { + throw new RyaDAOException(e); + } + for(AccumuloIndexer indexer : this.secondaryIndexers) { + try { + indexer.destroy(); + } catch(Exception e) { + logger.warn("Failed to destroy indexer", e); + } + } + } + + @Override + public void addNamespace(String pfx, String namespace) throws RyaDAOException { + try { + Mutation m = new Mutation(new Text(pfx)); + m.put(INFO_NAMESPACE_TXT, EMPTY_TEXT, new Value(namespace.getBytes())); + bw_ns.addMutation(m); + if (flushEachUpdate) { mt_bw.flush(); } + } catch (Exception e) { + throw new RyaDAOException(e); + } + } + + @Override + public String getNamespace(String pfx) throws RyaDAOException { + try { + Scanner scanner = connector.createScanner(tableLayoutStrategy.getNs(), + ALL_AUTHORIZATIONS); + scanner.fetchColumn(INFO_NAMESPACE_TXT, EMPTY_TEXT); + scanner.setRange(new Range(new Text(pfx))); + Iterator<Map.Entry<Key, Value>> iterator = scanner + .iterator(); + + if (iterator.hasNext()) { + return new String(iterator.next().getValue().get()); + } + } catch (Exception e) { + throw new RyaDAOException(e); + } + return null; + } + + @Override + public void removeNamespace(String pfx) throws RyaDAOException { + try { + Mutation del = new Mutation(new Text(pfx)); + del.putDelete(INFO_NAMESPACE_TXT, EMPTY_TEXT); + bw_ns.addMutation(del); + if (flushEachUpdate) { mt_bw.flush(); } + } 
catch (Exception e) { + throw new RyaDAOException(e); + } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public CloseableIteration<Namespace, RyaDAOException> iterateNamespace() throws RyaDAOException { + try { + Scanner scanner = connector.createScanner(tableLayoutStrategy.getNs(), + ALL_AUTHORIZATIONS); + scanner.fetchColumnFamily(INFO_NAMESPACE_TXT); + Iterator<Map.Entry<Key, Value>> result = scanner.iterator(); + return new AccumuloNamespaceTableIterator(result); + } catch (Exception e) { + throw new RyaDAOException(e); + } + } + + @Override + public RyaNamespaceManager<AccumuloRdfConfiguration> getNamespaceManager() { + return this; + } + + @Override + public void purge(RdfCloudTripleStoreConfiguration configuration) { + for (String tableName : getTables()) { + try { + purge(tableName, configuration.getAuths()); + compact(tableName); + } catch (TableNotFoundException e) { + logger.error(e.getMessage()); + } catch (MutationsRejectedException e) { + logger.error(e.getMessage()); + } + } + for(AccumuloIndexer indexer : this.secondaryIndexers) { + try { + indexer.purge(configuration); + } catch(Exception e) { + logger.error("Failed to purge indexer", e); + } + } + } + + @Override + public void dropAndDestroy() throws RyaDAOException { + for (String tableName : getTables()) { + try { + drop(tableName); + } catch (AccumuloSecurityException e) { + logger.error(e.getMessage()); + throw new RyaDAOException(e); + } catch (AccumuloException e) { + logger.error(e.getMessage()); + throw new RyaDAOException(e); + } catch (TableNotFoundException e) { + logger.warn(e.getMessage()); + } + } + destroy(); + for(AccumuloIndexer indexer : this.secondaryIndexers) { + try { + indexer.dropAndDestroy(); + } catch(Exception e) { + logger.error("Failed to drop and destroy indexer", e); + } + } + } + + public Connector getConnector() { + return connector; + } + + public void setConnector(Connector connector) { + this.connector = connector; + } + + public 
BatchWriterConfig getBatchWriterConfig(){ + return batchWriterConfig; + } + + public void setBatchWriterConfig(BatchWriterConfig batchWriterConfig) { + this.batchWriterConfig = batchWriterConfig; + } + + protected MultiTableBatchWriter getMultiTableBatchWriter(){ + return mt_bw; + } + + @Override + public AccumuloRdfConfiguration getConf() { + return conf; + } + + @Override + public void setConf(AccumuloRdfConfiguration conf) { + this.conf = conf; + } + + public RyaTableMutationsFactory getRyaTableMutationsFactory() { + return ryaTableMutationsFactory; + } + + public void setRyaTableMutationsFactory(RyaTableMutationsFactory ryaTableMutationsFactory) { + this.ryaTableMutationsFactory = ryaTableMutationsFactory; + } + + @Override + public AccumuloRyaQueryEngine getQueryEngine() { + return queryEngine; + } + + public void setQueryEngine(AccumuloRyaQueryEngine queryEngine) { + this.queryEngine = queryEngine; + } + + public void flush() throws RyaDAOException { + try { + mt_bw.flush(); + } catch (MutationsRejectedException e) { + throw new RyaDAOException(e); + } + } + + protected String[] getTables() { + // core tables + List<String> tableNames = Lists.newArrayList( + tableLayoutStrategy.getSpo(), + tableLayoutStrategy.getPo(), + tableLayoutStrategy.getOsp(), + tableLayoutStrategy.getNs(), + tableLayoutStrategy.getEval()); + + // Additional Tables + for (AccumuloIndexer index : secondaryIndexers) { + tableNames.add(index.getTableName()); + } + + return tableNames.toArray(new String[]{}); + } + + private void purge(String tableName, String[] auths) throws TableNotFoundException, MutationsRejectedException { + if (tableExists(tableName)) { + logger.info("Purging accumulo table: " + tableName); + BatchDeleter batchDeleter = createBatchDeleter(tableName, new Authorizations(auths)); + try { + batchDeleter.setRanges(Collections.singleton(new Range())); + batchDeleter.delete(); + } finally { + batchDeleter.close(); + } + } + } + + private void compact(String tableName) { + 
logger.info("Requesting major compaction for table " + tableName); + try { + connector.tableOperations().compact(tableName, null, null, true, false); + } catch (Exception e) { + logger.error(e.getMessage()); + } + } + + private boolean tableExists(String tableName) { + return getConnector().tableOperations().exists(tableName); + } + + private BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations) throws TableNotFoundException { + return connector.createBatchDeleter(tableName, authorizations, NUM_THREADS, MAX_MEMORY, MAX_TIME, NUM_THREADS); + } + + private void checkVersion() throws RyaDAOException, IOException, MutationsRejectedException { + String version = getVersion(); + if (version == null) { + //adding to core Rya tables but not Indexes + Map<TABLE_LAYOUT, Collection<Mutation>> mutationMap = ryaTableMutationsFactory.serialize(getVersionRyaStatement()); + Collection<Mutation> spo = mutationMap.get(TABLE_LAYOUT.SPO); + Collection<Mutation> po = mutationMap.get(TABLE_LAYOUT.PO); + Collection<Mutation> osp = mutationMap.get(TABLE_LAYOUT.OSP); + bw_spo.addMutations(spo); + bw_po.addMutations(po); + bw_osp.addMutations(osp); + } + //TODO: Do a version check here + } + + protected RyaStatement getVersionRyaStatement() { + return new RyaStatement(RTS_SUBJECT_RYA, RTS_VERSION_PREDICATE_RYA, VERSION_RYA); + } + + private void drop(String tableName) throws TableNotFoundException, AccumuloException, AccumuloSecurityException { + logger.info("Dropping cloudbase table: " + tableName); + connector.tableOperations().delete(tableName); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/DefineTripleQueryRangeFactory.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/DefineTripleQueryRangeFactory.java 
b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/DefineTripleQueryRangeFactory.java new file mode 100644 index 0000000..b5a4e84 --- /dev/null +++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/DefineTripleQueryRangeFactory.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +//package mvm.rya.accumulo; + +// +//import com.google.common.io.ByteArrayDataOutput; +//import com.google.common.io.ByteStreams; +//import mvm.rya.api.RdfCloudTripleStoreUtils; +//import mvm.rya.api.domain.RangeValue; +//import org.apache.accumulo.core.data.Range; +//import org.apache.hadoop.io.Text; +//import org.openrdf.model.Value; +//import org.openrdf.model.ValueFactory; +//import org.openrdf.model.impl.ValueFactoryImpl; +// +//import java.io.IOException; +//import java.util.Map; +// +//import static mvm.rya.api.RdfCloudTripleStoreConstants.*; +//import static mvm.rya.api.RdfCloudTripleStoreUtils.CustomEntry; +// +///** +// * Class DefineTripleQueryRangeFactory +// * Date: Jun 2, 2011 +// * Time: 10:35:43 AM +// */ +//public class DefineTripleQueryRangeFactory { +// +// ValueFactory vf = ValueFactoryImpl.getInstance(); +// +// protected void fillRange(ByteArrayDataOutput startRowOut, ByteArrayDataOutput endRowOut, Value val, boolean empty) +// throws IOException { +// if(!empty) { +// startRowOut.write(DELIM_BYTES); +// endRowOut.write(DELIM_BYTES); +// } +// //null check? 
+// if(val instanceof RangeValue) { +// RangeValue rangeValue = (RangeValue) val; +// Value start = rangeValue.getStart(); +// Value end = rangeValue.getEnd(); +// byte[] start_val_bytes = RdfCloudTripleStoreUtils.writeValue(start); +// byte[] end_val_bytes = RdfCloudTripleStoreUtils.writeValue(end); +// startRowOut.write(start_val_bytes); +// endRowOut.write(end_val_bytes); +// } else { +// byte[] val_bytes = RdfCloudTripleStoreUtils.writeValue(val); +// startRowOut.write(val_bytes); +// endRowOut.write(val_bytes); +// } +// } +// +// public Map.Entry<TABLE_LAYOUT, Range> defineRange(Value subject, Value predicate, Value object, AccumuloRdfConfiguration conf) +// throws IOException { +// +// byte[] startrow, stoprow; +// ByteArrayDataOutput startRowOut = ByteStreams.newDataOutput(); +// ByteArrayDataOutput stopRowOut = ByteStreams.newDataOutput(); +// Range range; +// TABLE_LAYOUT tableLayout; +// +// if (subject != null) { +// /** +// * Case: s +// * Table: spo +// * Want this to be the first if statement since it will be most likely the most asked for table +// */ +// tableLayout = TABLE_LAYOUT.SPO; +// fillRange(startRowOut, stopRowOut, subject, true); +// if (predicate != null) { +// /** +// * Case: sp +// * Table: spo +// */ +// fillRange(startRowOut, stopRowOut, predicate, false); +// if (object != null) { +// /** +// * Case: spo +// * Table: spo +// */ +// fillRange(startRowOut, stopRowOut, object, false); +// } +// } else if (object != null) { +// /** +// * Case: so +// * Table: osp +// * Very rare case. Could have put this in the OSP if clause, but I wanted to reorder the if statement +// * for best performance. The SPO table probably gets the most scans, so I want it to be the first if +// * statement in the branch. 
+// */ +// tableLayout = TABLE_LAYOUT.OSP; +// startRowOut = ByteStreams.newDataOutput(); +// stopRowOut = ByteStreams.newDataOutput(); +// fillRange(startRowOut, stopRowOut, object, true); +// fillRange(startRowOut, stopRowOut, subject, false); +// } +// } else if (predicate != null) { +// /** +// * Case: p +// * Table: po +// * Wanted this to be the second if statement, since it will be the second most asked for table +// */ +// tableLayout = TABLE_LAYOUT.PO; +// fillRange(startRowOut, stopRowOut, predicate, true); +// if (object != null) { +// /** +// * Case: po +// * Table: po +// */ +// fillRange(startRowOut, stopRowOut, object, false); +// } +// } else if (object != null) { +// /** +// * Case: o +// * Table: osp +// * Probably a pretty rare scenario +// */ +// tableLayout = TABLE_LAYOUT.OSP; +// fillRange(startRowOut, stopRowOut, object, true); +// } else { +// tableLayout = TABLE_LAYOUT.SPO; +// stopRowOut.write(Byte.MAX_VALUE); +// } +// +// startrow = startRowOut.toByteArray(); +// stopRowOut.write(DELIM_STOP_BYTES); +// stoprow = stopRowOut.toByteArray(); +// Text startRowTxt = new Text(startrow); +// Text stopRowTxt = new Text(stoprow); +// range = new Range(startRowTxt, stopRowTxt); +// +// return new CustomEntry<TABLE_LAYOUT, Range>(tableLayout, range); +// } +// +//} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/RyaTableKeyValues.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/RyaTableKeyValues.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/RyaTableKeyValues.java new file mode 100644 index 0000000..574029e --- /dev/null +++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/RyaTableKeyValues.java @@ -0,0 +1,115 @@ +package mvm.rya.accumulo; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + + +import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_VALUE; + +import java.io.IOException; +import java.util.AbstractMap.SimpleEntry; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; + +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.api.RdfCloudTripleStoreConstants; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.resolver.RyaTripleContext; +import mvm.rya.api.resolver.triple.TripleRow; +import mvm.rya.api.resolver.triple.TripleRowResolverException; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.hadoop.io.Text; + +public class RyaTableKeyValues { + public static final ColumnVisibility EMPTY_CV = new ColumnVisibility(); + public static final Text EMPTY_CV_TEXT = new Text(EMPTY_CV.getExpression()); + + RyaTripleContext instance; + + private RyaStatement stmt; + private Collection<Map.Entry<Key, Value>> spo = new ArrayList<Map.Entry<Key, Value>>(); + private Collection<Map.Entry<Key, Value>> po = new ArrayList<Map.Entry<Key, Value>>(); + private Collection<Map.Entry<Key, Value>> osp = new ArrayList<Map.Entry<Key, Value>>(); + + public RyaTableKeyValues(RyaStatement stmt, 
RdfCloudTripleStoreConfiguration conf) { + this.stmt = stmt; + this.instance = RyaTripleContext.getInstance(conf); + } + + public Collection<Map.Entry<Key, Value>> getSpo() { + return spo; + } + + public Collection<Map.Entry<Key, Value>> getPo() { + return po; + } + + public Collection<Map.Entry<Key, Value>> getOsp() { + return osp; + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + public RyaTableKeyValues invoke() throws IOException { + /** + * TODO: If there are contexts, do we still replicate the information into the default graph as well + * as the named graphs? + */try { + Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, mvm.rya.api.resolver.triple.TripleRow> rowMap = instance.serializeTriple(stmt); + TripleRow tripleRow = rowMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO); + byte[] columnVisibility = tripleRow.getColumnVisibility(); + Text cv = columnVisibility == null ? EMPTY_CV_TEXT : new Text(columnVisibility); + Long timestamp = tripleRow.getTimestamp(); + timestamp = timestamp == null ? 0l : timestamp; + byte[] value = tripleRow.getValue(); + Value v = value == null ? 
EMPTY_VALUE : new Value(value); + spo.add(new SimpleEntry(new Key(new Text(tripleRow.getRow()), + new Text(tripleRow.getColumnFamily()), + new Text(tripleRow.getColumnQualifier()), + cv, timestamp), v)); + tripleRow = rowMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO); + po.add(new SimpleEntry(new Key(new Text(tripleRow.getRow()), + new Text(tripleRow.getColumnFamily()), + new Text(tripleRow.getColumnQualifier()), + cv, timestamp), v)); + tripleRow = rowMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP); + osp.add(new SimpleEntry(new Key(new Text(tripleRow.getRow()), + new Text(tripleRow.getColumnFamily()), + new Text(tripleRow.getColumnQualifier()), + cv, timestamp), v)); + } catch (TripleRowResolverException e) { + throw new IOException(e); + } + return this; + } + + @Override + public String toString() { + return "RyaTableKeyValues{" + + "statement=" + stmt + + ", spo=" + spo + + ", po=" + po + + ", o=" + osp + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/RyaTableMutationsFactory.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/RyaTableMutationsFactory.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/RyaTableMutationsFactory.java new file mode 100644 index 0000000..2a4871d --- /dev/null +++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/RyaTableMutationsFactory.java @@ -0,0 +1,148 @@ +package mvm.rya.accumulo; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + + +import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_CV; +import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_VALUE; +import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.api.RdfCloudTripleStoreConstants; +import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.resolver.RyaTripleContext; +import mvm.rya.api.resolver.triple.TripleRow; +import mvm.rya.api.resolver.triple.TripleRowResolverException; + +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.hadoop.io.Text; + +public class RyaTableMutationsFactory { + + RyaTripleContext ryaContext; + + public RyaTableMutationsFactory(RyaTripleContext ryaContext) { + this.ryaContext = ryaContext; + } + + //TODO: Does this still need to be collections + public Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> serialize( + RyaStatement stmt) throws IOException { + + Collection<Mutation> spo_muts = new ArrayList<Mutation>(); + Collection<Mutation> po_muts = new ArrayList<Mutation>(); + Collection<Mutation> osp_muts = new ArrayList<Mutation>(); + /** + * TODO: If there are contexts, do we still replicate the information into the default graph as well + * as the 
named graphs? + */ + try { + Map<TABLE_LAYOUT, TripleRow> rowMap = ryaContext.serializeTriple(stmt); + TripleRow tripleRow = rowMap.get(TABLE_LAYOUT.SPO); + spo_muts.add(createMutation(tripleRow)); + tripleRow = rowMap.get(TABLE_LAYOUT.PO); + po_muts.add(createMutation(tripleRow)); + tripleRow = rowMap.get(TABLE_LAYOUT.OSP); + osp_muts.add(createMutation(tripleRow)); + } catch (TripleRowResolverException fe) { + throw new IOException(fe); + } + + Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> mutations = + new HashMap<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>>(); + mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO, spo_muts); + mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO, po_muts); + mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP, osp_muts); + + return mutations; + } + + public Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> serializeDelete( + RyaStatement stmt) throws IOException { + + Collection<Mutation> spo_muts = new ArrayList<Mutation>(); + Collection<Mutation> po_muts = new ArrayList<Mutation>(); + Collection<Mutation> osp_muts = new ArrayList<Mutation>(); + /** + * TODO: If there are contexts, do we still replicate the information into the default graph as well + * as the named graphs? 
+ */ + try { + Map<TABLE_LAYOUT, TripleRow> rowMap = ryaContext.serializeTriple(stmt); + TripleRow tripleRow = rowMap.get(TABLE_LAYOUT.SPO); + spo_muts.add(deleteMutation(tripleRow)); + tripleRow = rowMap.get(TABLE_LAYOUT.PO); + po_muts.add(deleteMutation(tripleRow)); + tripleRow = rowMap.get(TABLE_LAYOUT.OSP); + osp_muts.add(deleteMutation(tripleRow)); + } catch (TripleRowResolverException fe) { + throw new IOException(fe); + } + + Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> mutations = + new HashMap<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>>(); + mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO, spo_muts); + mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO, po_muts); + mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP, osp_muts); + + return mutations; + + } + + protected Mutation deleteMutation(TripleRow tripleRow) { + Mutation m = new Mutation(new Text(tripleRow.getRow())); + + byte[] columnFamily = tripleRow.getColumnFamily(); + Text cfText = columnFamily == null ? EMPTY_TEXT : new Text(columnFamily); + + byte[] columnQualifier = tripleRow.getColumnQualifier(); + Text cqText = columnQualifier == null ? EMPTY_TEXT : new Text(columnQualifier); + + m.putDelete(cfText, cqText, new ColumnVisibility(tripleRow.getColumnVisibility()), + tripleRow.getTimestamp()); + return m; + } + + protected Mutation createMutation(TripleRow tripleRow) { + Mutation mutation = new Mutation(new Text(tripleRow.getRow())); + byte[] columnVisibility = tripleRow.getColumnVisibility(); + ColumnVisibility cv = columnVisibility == null ? EMPTY_CV : new ColumnVisibility(columnVisibility); + Long timestamp = tripleRow.getTimestamp(); + byte[] value = tripleRow.getValue(); + Value v = value == null ? EMPTY_VALUE : new Value(value); + byte[] columnQualifier = tripleRow.getColumnQualifier(); + Text cqText = columnQualifier == null ? 
EMPTY_TEXT : new Text(columnQualifier); + byte[] columnFamily = tripleRow.getColumnFamily(); + Text cfText = columnFamily == null ? EMPTY_TEXT : new Text(columnFamily); + + mutation.put(cfText, cqText, cv, timestamp, v); + return mutation; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/experimental/AbstractAccumuloIndexer.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/experimental/AbstractAccumuloIndexer.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/experimental/AbstractAccumuloIndexer.java new file mode 100644 index 0000000..5df5da9 --- /dev/null +++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/experimental/AbstractAccumuloIndexer.java @@ -0,0 +1,59 @@ +package mvm.rya.accumulo.experimental; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + +import java.io.IOException; +import java.util.Collection; + +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaURI; + +import org.apache.accumulo.core.client.MultiTableBatchWriter; + +public abstract class AbstractAccumuloIndexer implements AccumuloIndexer { + + @Override + public void setMultiTableBatchWriter(MultiTableBatchWriter writer) throws IOException { + } + + @Override + public void storeStatements(Collection<RyaStatement> statements) throws IOException { + for (RyaStatement s : statements) { + storeStatement(s); + } + } + + @Override + public void deleteStatement(RyaStatement stmt) throws IOException { + } + + @Override + public void dropGraph(RyaURI... graphs) { + } + + @Override + public void flush() throws IOException { + } + + @Override + public void close() throws IOException { + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/experimental/AccumuloIndexer.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/experimental/AccumuloIndexer.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/experimental/AccumuloIndexer.java new file mode 100644 index 0000000..5581e08 --- /dev/null +++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/experimental/AccumuloIndexer.java @@ -0,0 +1,38 @@ +package mvm.rya.accumulo.experimental; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import java.io.IOException; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MultiTableBatchWriter; + +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.api.persist.index.RyaSecondaryIndexer; + +public interface AccumuloIndexer extends RyaSecondaryIndexer { + public void init(); + public void setMultiTableBatchWriter(MultiTableBatchWriter writer) throws IOException; + public void setConnector(Connector connector); + public void destroy(); + public void purge(RdfCloudTripleStoreConfiguration configuration); + public void dropAndDestroy(); +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/AccumuloRyaInstanceDetailsRepository.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/AccumuloRyaInstanceDetailsRepository.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/AccumuloRyaInstanceDetailsRepository.java new file mode 100644 index 0000000..6e818b3 --- /dev/null +++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/AccumuloRyaInstanceDetailsRepository.java @@ -0,0 +1,229 @@ +package mvm.rya.accumulo.instance; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import static java.util.Objects.requireNonNull; + +import java.util.Map.Entry; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.ConditionalWriter; +import org.apache.accumulo.core.client.ConditionalWriter.Result; +import org.apache.accumulo.core.client.ConditionalWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.data.Condition; +import org.apache.accumulo.core.data.ConditionalMutation; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.io.Text; + +import mvm.rya.api.instance.RyaDetails; +import mvm.rya.api.instance.RyaDetailsRepository; + +/** + * An implementation of {@link 
RyaDetailsRepository} that stores a Rya + * instance's {@link RyaDetails} in an Accumulo table. + * </p> + * XXX + * This implementation writes the details object as a serialized byte array to + * a row in Accumulo. Storing the entire structure within a single value is + * attractive because Accumulo's conditional writer will let us do checkAndSet + * style operations to synchronize writes to the object. On the downside, only + * Java clients will work. + */ +@ParametersAreNonnullByDefault +public class AccumuloRyaInstanceDetailsRepository implements RyaDetailsRepository { + + public static final String INSTANCE_DETAILS_TABLE_NAME = "instance_details"; + + private static final Text ROW_ID = new Text("instance metadata"); + private static final Text COL_FAMILY = new Text("instance"); + private static final Text COL_QUALIFIER = new Text("details"); + + private final RyaDetailsSerializer serializer = new RyaDetailsSerializer(); + + private final Connector connector; + private final String instanceName; + private final String detailsTableName; + + + /** + * Constructs an instance of {@link AccumuloRyaInstanceDetailsRepository}. + * + * @param connector - Connects to the instance of Accumulo that hosts the Rya instance. (not null) + * @param instanceName - The name of the Rya instance this repository represents. 
(not null) + */ + public AccumuloRyaInstanceDetailsRepository(final Connector connector, final String instanceName) { + this.connector = requireNonNull( connector ); + this.instanceName = requireNonNull( instanceName ); + this.detailsTableName = instanceName + INSTANCE_DETAILS_TABLE_NAME; + } + + @Override + public boolean isInitialized() throws RyaDetailsRepositoryException { + try { + final Scanner scanner = connector.createScanner(detailsTableName, new Authorizations()); + scanner.fetchColumn(COL_FAMILY, COL_QUALIFIER); + return scanner.iterator().hasNext(); + } catch (final TableNotFoundException e) { + return false; + } + } + + @Override + public void initialize(final RyaDetails details) throws AlreadyInitializedException, RyaDetailsRepositoryException { + // Preconditions. + requireNonNull( details ); + + if(!details.getRyaInstanceName().equals( instanceName )) { + throw new RyaDetailsRepositoryException("The instance name that was in the provided 'details' does not match " + + "the instance name that this repository is connected to. Make sure you're connected to the" + + "correct Rya instance."); + } + + if(isInitialized()) { + throw new AlreadyInitializedException("The repository has already been initialized for the Rya instance named '" + + instanceName + "'."); + } + + // Create the table that hosts the details if it has not been created yet. + final TableOperations tableOps = connector.tableOperations(); + if(!tableOps.exists(detailsTableName)) { + try { + tableOps.create(detailsTableName); + } catch (AccumuloException | AccumuloSecurityException | TableExistsException e) { + throw new RyaDetailsRepositoryException("Could not initialize the Rya instance details for the instance named '" + + instanceName + "' because the the table that holds that information could not be created."); + } + } + + // Write the details to the table. 
+ BatchWriter writer = null; + try { + writer = connector.createBatchWriter(detailsTableName, new BatchWriterConfig()); + + final byte[] bytes = serializer.serialize(details); + final Mutation mutation = new Mutation(ROW_ID); + mutation.put(COL_FAMILY, COL_QUALIFIER, new Value(bytes)); + writer.addMutation( mutation ); + + } catch (final TableNotFoundException | MutationsRejectedException e) { + throw new RyaDetailsRepositoryException("Could not initialize the Rya instance details for the instance named '" + instanceName + "'.", e); + } finally { + if(writer != null) { + try { + writer.close(); + } catch (final MutationsRejectedException e) { + throw new RyaDetailsRepositoryException("Could not initialize the Rya instance details for the instance named '" + instanceName + "'.", e); + } + } + } + } + + @Override + public RyaDetails getRyaInstanceDetails() throws NotInitializedException, RyaDetailsRepositoryException { + // Preconditions. + if(!isInitialized()) { + throw new NotInitializedException("Could not fetch the details for the Rya instanced named '" + + instanceName + "' because it has not been initialized yet."); + } + + // Read it from the table. + try { + // Fetch the value from the table. + final Scanner scanner = connector.createScanner(detailsTableName, new Authorizations()); + scanner.fetchColumn(COL_FAMILY, COL_QUALIFIER); + final Entry<Key, Value> entry = scanner.iterator().next(); + + // Deserialize it. + final byte[] bytes = entry.getValue().get(); + return serializer.deserialize( bytes ); + + } catch (final TableNotFoundException e) { + throw new RyaDetailsRepositoryException("Could not get the details from the table.", e); + } + } + + @Override + public void update(final RyaDetails oldDetails, final RyaDetails newDetails) + throws NotInitializedException, ConcurrentUpdateException, RyaDetailsRepositoryException { + // Preconditions. 
+ requireNonNull(oldDetails); + requireNonNull(newDetails); + + if(!newDetails.getRyaInstanceName().equals( instanceName )) { + throw new RyaDetailsRepositoryException("The instance name that was in the provided 'newDetails' does not match " + + "the instance name that this repository is connected to. Make sure you're connected to the" + + "correct Rya instance."); + } + + if(!isInitialized()) { + throw new NotInitializedException("Could not update the details for the Rya instanced named '" + + instanceName + "' because it has not been initialized yet."); + } + + // Use a conditional writer so that we can detect when the old details + // are no longer the currently stored ones. + ConditionalWriter writer = null; + try { + // Setup the condition that ensures the details have not changed since the edits were made. + final byte[] oldDetailsBytes = serializer.serialize(oldDetails); + final Condition condition = new Condition(COL_FAMILY, COL_QUALIFIER); + condition.setValue( oldDetailsBytes ); + + // Create the mutation that only performs the update if the details haven't changed. + final ConditionalMutation mutation = new ConditionalMutation(ROW_ID); + mutation.addCondition( condition ); + final byte[] newDetailsBytes = serializer.serialize(newDetails); + mutation.put(COL_FAMILY, COL_QUALIFIER, new Value(newDetailsBytes)); + + // Do the write. 
+ writer = connector.createConditionalWriter(detailsTableName, new ConditionalWriterConfig()); + final Result result = writer.write(mutation); + switch(result.getStatus()) { + case REJECTED: + case VIOLATED: + throw new ConcurrentUpdateException("Could not update the details for the Rya instance named '" + + instanceName + "' because the old value is out of date."); + case UNKNOWN: + case INVISIBLE_VISIBILITY: + throw new RyaDetailsRepositoryException("Could not update the details for the Rya instance named '" + instanceName + "'."); + } + } catch (final TableNotFoundException | AccumuloException | AccumuloSecurityException e) { + throw new RyaDetailsRepositoryException("Could not update the details for the Rya instance named '" + instanceName + "'."); + } finally { + if(writer != null) { + writer.close(); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/RyaDetailsSerializer.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/RyaDetailsSerializer.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/RyaDetailsSerializer.java new file mode 100644 index 0000000..8c863ea --- /dev/null +++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/RyaDetailsSerializer.java @@ -0,0 +1,96 @@ +package mvm.rya.accumulo.instance; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import static java.util.Objects.requireNonNull;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import javax.annotation.ParametersAreNonnullByDefault;

import mvm.rya.api.instance.RyaDetails;
import mvm.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;

/**
 * Serializes {@link RyaDetails} instances to byte arrays, and deserializes
 * them back, using Java's built-in object serialization.
 */
@ParametersAreNonnullByDefault
public class RyaDetailsSerializer {

    /**
     * Serializes an instance of {@link RyaDetails}.
     *
     * @param details - The details that will be serialized. (not null)
     * @return The serialized details.
     * @throws SerializationException The details could not be written as a
     *   Java serialized object.
     */
    public byte[] serialize(final RyaDetails details) throws SerializationException {
        requireNonNull(details);

        final ByteArrayOutputStream stream = new ByteArrayOutputStream();

        // try-with-resources closes (and therefore flushes) the object stream
        // before the backing byte array is read. Closing writes no extra bytes,
        // so the produced byte sequence is the same as a bare writeObject call.
        try (final ObjectOutputStream out = new ObjectOutputStream(stream)) {
            out.writeObject(details);
        } catch (final IOException e) {
            throw new SerializationException("Could not serialize an instance of RyaDetails.", e);
        }

        return stream.toByteArray();
    }

    /**
     * Deserializes an instance of {@link RyaDetails}.
     *
     * @param bytes - The serialized form of a {@link RyaDetails}. (not null)
     * @return The deserialized object.
     * @throws SerializationException The bytes could not be read as a Java
     *   serialized object, or the object that was read is {@code null} or is
     *   not a {@link RyaDetails}.
     */
    public RyaDetails deserialize(final byte[] bytes) throws SerializationException {
        requireNonNull(bytes);

        // NOTE(review): Java-native deserialization runs logic defined by the
        // classes named in the stream. This presumably only ever sees bytes this
        // class wrote itself; confirm before feeding it data from anywhere else.
        try (final ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            final Object o = in.readObject();

            if (!(o instanceof RyaDetails)) {
                // A serialized null fails the instanceof check too, so guard the
                // getClass() call to report it instead of throwing an NPE.
                final String className = (o == null) ? "null" : o.getClass().getName();
                throw new SerializationException("Wrong type of object was deserialized. Class: " + className);
            }

            return (RyaDetails) o;
        } catch (final ClassNotFoundException | IOException e) {
            throw new SerializationException("Could not deserialize an instance of RyaDetails.", e);
        }
    }

    /**
     * Could not serialize or deserialize an instance of {@link RyaDetails}.
     */
    public static class SerializationException extends RyaDetailsRepositoryException {
        private static final long serialVersionUID = 1L;

        /**
         * Constructs an instance of {@link SerializationException}.
         *
         * @param message - Describes what went wrong.
         */
        public SerializationException(final String message) {
            super(message);
        }

        /**
         * Constructs an instance of {@link SerializationException}.
         *
         * @param message - Describes what went wrong.
         * @param cause - The exception that caused this one to be thrown.
         */
        public SerializationException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }
}