// Patch metadata (verbatim): http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/AccumuloRyaQueryEngine.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/AccumuloRyaQueryEngine.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/AccumuloRyaQueryEngine.java new file mode 100644 index 0000000..ba3ffd2 --- /dev/null +++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/AccumuloRyaQueryEngine.java @@ -0,0 +1,410 @@
package mvm.rya.accumulo.query;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */



import static mvm.rya.api.RdfCloudTripleStoreUtils.layoutToTable;
import info.aduna.iteration.CloseableIteration;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import mvm.rya.accumulo.AccumuloRdfConfiguration;
import mvm.rya.api.RdfCloudTripleStoreConfiguration;
import mvm.rya.api.RdfCloudTripleStoreConstants;
import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
import mvm.rya.api.RdfCloudTripleStoreUtils;
import mvm.rya.api.domain.RyaRange;
import mvm.rya.api.domain.RyaStatement;
import mvm.rya.api.domain.RyaType;
import mvm.rya.api.domain.RyaURI;
import mvm.rya.api.layout.TableLayoutStrategy;
import mvm.rya.api.persist.RyaDAOException;
import mvm.rya.api.persist.query.BatchRyaQuery;
import mvm.rya.api.persist.query.RyaQuery;
import mvm.rya.api.persist.query.RyaQueryEngine;
import mvm.rya.api.query.strategy.ByteRange;
import mvm.rya.api.query.strategy.TriplePatternStrategy;
import mvm.rya.api.resolver.RyaContext;
import mvm.rya.api.resolver.RyaTripleContext;
import mvm.rya.api.resolver.triple.TripleRowRegex;
import mvm.rya.api.utils.CloseableIterableIteration;

import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.RegExFilter;
import org.apache.accumulo.core.iterators.user.TimestampFilter;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.io.Text;
import org.calrissian.mango.collect.CloseableIterable;
import org.calrissian.mango.collect.CloseableIterables;
import org.calrissian.mango.collect.FluentCloseableIterable;
import org.openrdf.query.BindingSet;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Iterators;

/**
 * {@link RyaQueryEngine} implementation that answers triple-pattern queries by
 * scanning Accumulo tables through a {@link Connector}.
 * <p>
 * Each query is turned into one or more row {@link Range}s by the
 * {@link TriplePatternStrategy} selected for the statement; the scanner is then
 * decorated with context/qualifier, TTL and regex filters by
 * {@link #fillScanner}.
 * <p>
 * Date: 7/17/12
 * Time: 9:28 AM
 */
public class AccumuloRyaQueryEngine implements RyaQueryEngine<AccumuloRdfConfiguration> {

    private AccumuloRdfConfiguration configuration;
    private Connector connector;
    private RyaTripleContext ryaContext;
    /** One row-to-statement converter per table layout, built once in the constructor. */
    private final Map<TABLE_LAYOUT, KeyValueToRyaStatementFunction> keyValueToRyaStatementFunctionMap = new HashMap<TABLE_LAYOUT, KeyValueToRyaStatementFunction>();

    /**
     * Creates an engine that uses a freshly constructed default configuration.
     *
     * @param connector Accumulo connector used to create scanners
     */
    public AccumuloRyaQueryEngine(Connector connector) {
        this(connector, new AccumuloRdfConfiguration());
    }

    /**
     * Creates an engine bound to the given connector and default configuration.
     *
     * @param connector Accumulo connector used to create scanners
     * @param conf      configuration used whenever a query does not supply its own
     */
    public AccumuloRyaQueryEngine(Connector connector, AccumuloRdfConfiguration conf) {
        this.connector = connector;
        this.configuration = conf;
        ryaContext = RyaTripleContext.getInstance(conf);
        keyValueToRyaStatementFunctionMap.put(TABLE_LAYOUT.SPO, new KeyValueToRyaStatementFunction(TABLE_LAYOUT.SPO, ryaContext));
        keyValueToRyaStatementFunctionMap.put(TABLE_LAYOUT.PO, new KeyValueToRyaStatementFunction(TABLE_LAYOUT.PO, ryaContext));
        keyValueToRyaStatementFunctionMap.put(TABLE_LAYOUT.OSP, new KeyValueToRyaStatementFunction(TABLE_LAYOUT.OSP, ryaContext));
    }

    /**
     * Runs a single-statement query by building a {@link RyaQuery} from the
     * statement and configuration and delegating to {@link #query(RyaQuery)}.
     *
     * @param stmt the triple pattern to match
     * @param conf per-query configuration; the engine default is used when {@code null}
     * @return an iteration over the matching statements
     * @throws RyaDAOException if the underlying query fails
     */
    @Override
    public CloseableIteration<RyaStatement, RyaDAOException> query(RyaStatement stmt, AccumuloRdfConfiguration conf) throws RyaDAOException {
        if (conf == null) {
            conf = configuration;
        }

        RyaQuery ryaQuery = RyaQuery.builder(stmt).load(conf).build();
        CloseableIterable<RyaStatement> results = query(ryaQuery);

        return new CloseableIterableIteration<RyaStatement, RyaDAOException>(results);
    }

    /**
     * Null-safe accessor for the data of a {@link RyaType}.
     *
     * @param ryaType value to read, may be {@code null}
     * @return {@code ryaType.getData()}, or {@code null} when {@code ryaType} is {@code null}
     */
    protected String getData(RyaType ryaType) {
        return (ryaType != null) ? (ryaType.getData()) : (null);
    }

    /**
     * Queries several statements at once, pairing every matching row with the
     * binding sets whose range contains that row.
     * <p>
     * All statements must resolve to the same table layout; the layout, strategy
     * and context of the last statement are the ones used for the scan.
     *
     * @param stmts statement / binding-set pairs to query
     * @param conf  per-query configuration; the engine default is used when {@code null}
     * @return an iteration over statement / binding-set pairs; empty (never
     *         {@code null}) when {@code stmts} yields no ranges
     * @throws RyaDAOException wrapping any failure, including unsupported triple patterns
     */
    @Override
    public CloseableIteration<? extends Map.Entry<RyaStatement, BindingSet>, RyaDAOException> queryWithBindingSet(Collection<Map.Entry<RyaStatement, BindingSet>> stmts, AccumuloRdfConfiguration conf) throws RyaDAOException {
        if (conf == null) {
            conf = configuration;
        }
        //query configuration
        Authorizations authorizations = conf.getAuthorizations();
        Long ttl = conf.getTtl();
        Long maxResults = conf.getLimit();
        Integer maxRanges = conf.getMaxRangesForScanner();
        Integer numThreads = conf.getNumThreads();

        //TODO: cannot span multiple tables here
        try {
            Collection<Range> ranges = new HashSet<Range>();
            RangeBindingSetEntries rangeMap = new RangeBindingSetEntries();
            TABLE_LAYOUT layout = null;
            RyaURI context = null;
            TriplePatternStrategy strategy = null;
            for (Map.Entry<RyaStatement, BindingSet> stmtbs : stmts) {
                RyaStatement stmt = stmtbs.getKey();
                context = stmt.getContext(); //TODO: This will be overwritten
                BindingSet bs = stmtbs.getValue();
                strategy = ryaContext.retrieveStrategy(stmt);
                if (strategy == null) {
                    throw new IllegalArgumentException("TriplePattern[" + stmt + "] not supported");
                }

                Map.Entry<RdfCloudTripleStoreConstants.TABLE_LAYOUT, ByteRange> entry =
                        strategy.defineRange(stmt.getSubject(), stmt.getPredicate(), stmt.getObject(), stmt.getContext(), conf);

                //use range to set scanner
                //populate scanner based on authorizations, ttl
                layout = entry.getKey();
                ByteRange byteRange = entry.getValue();
                Range range = new Range(new Text(byteRange.getStart()), new Text(byteRange.getEnd()));
                ranges.add(range);
                rangeMap.ranges.add(new RdfCloudTripleStoreUtils.CustomEntry<Range, BindingSet>(range, bs));
            }
            //no ranges: hand back an empty iteration instead of null so callers
            //can iterate/close without a null check
            if (layout == null) {
                return new RyaStatementBindingSetKeyValueIterator(layout,
                        Collections.<Map.Entry<Key, Value>>emptyList().iterator(), rangeMap, ryaContext);
            }
            String regexSubject = conf.getRegexSubject();
            String regexPredicate = conf.getRegexPredicate();
            String regexObject = conf.getRegexObject();
            TripleRowRegex tripleRowRegex = strategy.buildRegex(regexSubject, regexPredicate, regexObject, null, null);

            String table = layoutToTable(layout, conf);
            // an unset threshold must not blow up on unboxing; fall back to plain scanners
            boolean useBatchScanner = maxRanges != null && ranges.size() > maxRanges;
            RyaStatementBindingSetKeyValueIterator iterator;
            if (useBatchScanner) {
                BatchScanner scanner = connector.createBatchScanner(table, authorizations, numThreads);
                boolean configured = false;
                try {
                    scanner.setRanges(ranges);
                    fillScanner(scanner, context, null, ttl, null, tripleRowRegex, conf);
                    iterator = new RyaStatementBindingSetKeyValueIterator(layout, ryaContext, scanner, rangeMap);
                    configured = true;
                } finally {
                    // release the scanner if it never reached the caller
                    if (!configured) {
                        scanner.close();
                    }
                }
            } else {
                List<Iterator<Map.Entry<Key, Value>>> iters = new ArrayList<Iterator<Map.Entry<Key, Value>>>(ranges.size());
                for (Range range : ranges) {
                    Scanner rangeScanner = connector.createScanner(table, authorizations);
                    rangeScanner.setRange(range);
                    fillScanner(rangeScanner, context, null, ttl, null, tripleRowRegex, conf);
                    iters.add(rangeScanner.iterator());
                }
                iterator = new RyaStatementBindingSetKeyValueIterator(layout, Iterators.concat(iters.iterator()), rangeMap, ryaContext);
            }
            if (maxResults != null) {
                iterator.setMaxResults(maxResults);
            }
            return iterator;
        } catch (Exception e) {
            throw new RyaDAOException(e);
        }

    }

    /**
     * Runs a multi-statement query by building a {@link BatchRyaQuery} and
     * delegating to {@link #query(BatchRyaQuery)}.
     *
     * @param stmts the triple patterns to match
     * @param conf  per-query configuration; the engine default is used when {@code null}
     * @return an iteration over the matching statements
     * @throws RyaDAOException if the underlying query fails
     */
    @Override
    public CloseableIteration<RyaStatement, RyaDAOException> batchQuery(Collection<RyaStatement> stmts, AccumuloRdfConfiguration conf)
            throws RyaDAOException {
        if (conf == null) {
            conf = configuration;
        }

        BatchRyaQuery batchRyaQuery = BatchRyaQuery.builder(stmts).load(conf).build();
        CloseableIterable<RyaStatement> results = query(batchRyaQuery);

        return new CloseableIterableIteration<RyaStatement, RyaDAOException>(results);
    }

    /**
     * Executes a single-statement query with one {@link Scanner}.
     * <p>
     * When no strategy matches the statement, the whole SPO table is scanned.
     *
     * @param ryaQuery query description; must not be {@code null} and must carry a statement
     * @return a closeable iterable over the matching statements, limited to
     *         {@code ryaQuery.getMaxResults()} when that is set
     * @throws RyaDAOException wrapping any failure while building or configuring the scan
     */
    @Override
    public CloseableIterable<RyaStatement> query(RyaQuery ryaQuery) throws RyaDAOException {
        Preconditions.checkNotNull(ryaQuery);
        RyaStatement stmt = ryaQuery.getQuery();
        Preconditions.checkNotNull(stmt);

        //query configuration
        String[] auths = ryaQuery.getAuths();
        Authorizations authorizations = auths != null ? new Authorizations(auths) : configuration.getAuthorizations();
        Long ttl = ryaQuery.getTtl();
        Long currentTime = ryaQuery.getCurrentTime();
        Long maxResults = ryaQuery.getMaxResults();
        Integer batchSize = ryaQuery.getBatchSize();
        String regexSubject = ryaQuery.getRegexSubject();
        String regexPredicate = ryaQuery.getRegexPredicate();
        String regexObject = ryaQuery.getRegexObject();
        TableLayoutStrategy tableLayoutStrategy = configuration.getTableLayoutStrategy();

        try {
            //find triple pattern range
            TriplePatternStrategy strategy = ryaContext.retrieveStrategy(stmt);
            TABLE_LAYOUT layout;
            Range range;
            RyaURI subject = stmt.getSubject();
            RyaURI predicate = stmt.getPredicate();
            RyaType object = stmt.getObject();
            RyaURI context = stmt.getContext();
            String qualifier = stmt.getQualifer();
            TripleRowRegex tripleRowRegex = null;
            if (strategy != null) {
                //otherwise, full table scan is supported
                Map.Entry<RdfCloudTripleStoreConstants.TABLE_LAYOUT, ByteRange> entry =
                        strategy.defineRange(subject, predicate, object, context, null);
                layout = entry.getKey();
                ByteRange byteRange = entry.getValue();
                range = new Range(new Text(byteRange.getStart()), new Text(byteRange.getEnd()));

            } else {
                range = new Range();
                layout = TABLE_LAYOUT.SPO;
                strategy = ryaContext.retrieveStrategy(layout);
            }

            byte[] objectTypeInfo = null;
            if (object != null) {
                //TODO: Not good to serialize this twice
                if (object instanceof RyaRange) {
                    objectTypeInfo = RyaContext.getInstance().serializeType(((RyaRange) object).getStart())[1];
                } else {
                    objectTypeInfo = RyaContext.getInstance().serializeType(object)[1];
                }
            }

            tripleRowRegex = strategy.buildRegex(regexSubject, regexPredicate, regexObject, null, objectTypeInfo);

            //use range to set scanner
            //populate scanner based on authorizations, ttl
            String table = layoutToTable(layout, tableLayoutStrategy);
            Scanner scanner = connector.createScanner(table, authorizations);
            boolean configured = false;
            try {
                scanner.setRange(range);
                if (batchSize != null) {
                    scanner.setBatchSize(batchSize);
                }
                fillScanner(scanner, context, qualifier, ttl, currentTime, tripleRowRegex, ryaQuery.getConf());
                configured = true;
            } finally {
                // release the scanner if configuration failed before it was handed out
                if (!configured) {
                    scanner.close();
                }
            }

            FluentCloseableIterable<RyaStatement> results = FluentCloseableIterable.from(new ScannerBaseCloseableIterable(scanner))
                    .transform(keyValueToRyaStatementFunctionMap.get(layout));
            if (maxResults != null) {
                results = results.limit(clampToInt(maxResults));
            }

            return results;
        } catch (Exception e) {
            throw new RyaDAOException(e);
        }
    }

    /**
     * Executes a multi-statement query.
     * <p>
     * A {@link BatchScanner} is used when the number of distinct ranges exceeds
     * {@code ryaQuery.getMaxRanges()}; otherwise one {@link Scanner} per range is
     * created lazily as the result is iterated. All statements must resolve to
     * the same table layout.
     *
     * @param ryaQuery query description; must not be {@code null} and must carry statements
     * @return a closeable iterable over the matching statements, limited to
     *         {@code ryaQuery.getMaxResults()} when that is set
     * @throws RyaDAOException wrapping any failure, including an empty statement
     *                         list or an unsupported triple pattern
     */
    @Override
    public CloseableIterable<RyaStatement> query(BatchRyaQuery ryaQuery) throws RyaDAOException {
        Preconditions.checkNotNull(ryaQuery);
        Iterable<RyaStatement> stmts = ryaQuery.getQueries();
        Preconditions.checkNotNull(stmts);

        //query configuration
        String[] auths = ryaQuery.getAuths();
        final Authorizations authorizations = auths != null ? new Authorizations(auths) : configuration.getAuthorizations();
        final Long ttl = ryaQuery.getTtl();
        Long currentTime = ryaQuery.getCurrentTime();
        Long maxResults = ryaQuery.getMaxResults();
        Integer batchSize = ryaQuery.getBatchSize();
        Integer numQueryThreads = ryaQuery.getNumQueryThreads();
        String regexSubject = ryaQuery.getRegexSubject();
        String regexPredicate = ryaQuery.getRegexPredicate();
        String regexObject = ryaQuery.getRegexObject();
        TableLayoutStrategy tableLayoutStrategy = configuration.getTableLayoutStrategy();
        int maxRanges = ryaQuery.getMaxRanges();

        //TODO: cannot span multiple tables here
        try {
            Collection<Range> ranges = new HashSet<Range>();
            TABLE_LAYOUT layout = null;
            RyaURI context = null;
            TriplePatternStrategy strategy = null;
            for (RyaStatement stmt : stmts) {
                context = stmt.getContext(); //TODO: This will be overwritten
                strategy = ryaContext.retrieveStrategy(stmt);
                if (strategy == null) {
                    throw new IllegalArgumentException("TriplePattern[" + stmt + "] not supported");
                }

                Map.Entry<RdfCloudTripleStoreConstants.TABLE_LAYOUT, ByteRange> entry =
                        strategy.defineRange(stmt.getSubject(), stmt.getPredicate(), stmt.getObject(), stmt.getContext(), null);

                //use range to set scanner
                //populate scanner based on authorizations, ttl
                layout = entry.getKey();
                ByteRange byteRange = entry.getValue();
                Range range = new Range(new Text(byteRange.getStart()), new Text(byteRange.getEnd()));
                ranges.add(range);
            }
            //no ranges
            if (layout == null) throw new IllegalArgumentException("No table layout specified");

            final TripleRowRegex tripleRowRegex = strategy.buildRegex(regexSubject, regexPredicate, regexObject, null, null);

            final String table = layoutToTable(layout, tableLayoutStrategy);
            boolean useBatchScanner = ranges.size() > maxRanges;
            FluentCloseableIterable<RyaStatement> results = null;
            if (useBatchScanner) {
                BatchScanner scanner = connector.createBatchScanner(table, authorizations, numQueryThreads);
                boolean configured = false;
                try {
                    scanner.setRanges(ranges);
                    fillScanner(scanner, context, null, ttl, null, tripleRowRegex, ryaQuery.getConf());
                    configured = true;
                } finally {
                    // release the scanner if configuration failed before it was handed out
                    if (!configured) {
                        scanner.close();
                    }
                }
                results = FluentCloseableIterable.from(new ScannerBaseCloseableIterable(scanner)).transform(keyValueToRyaStatementFunctionMap.get(layout));
            } else {
                final RyaURI fcontext = context;
                final RdfCloudTripleStoreConfiguration fconf = ryaQuery.getConf();
                FluentIterable<RyaStatement> fluent = FluentIterable.from(ranges).transformAndConcat(new Function<Range, Iterable<Map.Entry<Key, Value>>>() {
                    @Override
                    public Iterable<Map.Entry<Key, Value>> apply(Range range) {
                        try {
                            Scanner scanner = connector.createScanner(table, authorizations);
                            scanner.setRange(range);
                            fillScanner(scanner, fcontext, null, ttl, null, tripleRowRegex, fconf);
                            return scanner;
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }
                }).transform(keyValueToRyaStatementFunctionMap.get(layout));

                results = FluentCloseableIterable.from(CloseableIterables.wrap(fluent));
            }
            if (maxResults != null) {
                results = results.limit(clampToInt(maxResults));
            }
            return results;
        } catch (Exception e) {
            throw new RyaDAOException(e);
        }
    }

    /**
     * Converts a {@code Long} result limit to the {@code int} expected by
     * {@code limit(int)} without wrapping around for values above
     * {@link Integer#MAX_VALUE}.
     */
    private static int clampToInt(Long value) {
        return (int) Math.min(value.longValue(), (long) Integer.MAX_VALUE);
    }

    /**
     * Applies column selection and scan-time filters to a scanner.
     * <ul>
     * <li>context and qualifier: fetch that column; context only: fetch that
     * column family; qualifier only: a {@link RegExFilter} on the column
     * qualifier (priority 8, name "riq");</li>
     * <li>ttl: a {@link TimestampFilter} (priority 9, name "fi") starting at
     * {@code currentTime - ttl} and ending at {@code currentTime} when
     * {@code currentTime} is given, otherwise starting at {@code now - ttl}
     * with no end;</li>
     * <li>tripleRowRegex: a {@link RegExFilter} on the row (priority 11, name "ri");</li>
     * <li>any additional iterators carried by an {@link AccumuloRdfConfiguration}.</li>
     * </ul>
     *
     * @param scanner        scanner to configure
     * @param context        named graph to restrict to, or {@code null}
     * @param qualifier      column qualifier to restrict to, or {@code null}
     * @param ttl            time-to-live in milliseconds, or {@code null} for no time filter
     * @param currentTime    reference time for the ttl window, or {@code null} to use the system clock
     * @param tripleRowRegex row regex to apply, or {@code null}
     * @param conf           configuration that may supply additional iterators
     * @throws IOException declared for subclasses/overrides; not thrown directly here
     */
    protected void fillScanner(ScannerBase scanner, RyaURI context, String qualifier, Long ttl, Long currentTime, TripleRowRegex tripleRowRegex, RdfCloudTripleStoreConfiguration conf) throws IOException {
        if (context != null && qualifier != null) {
            scanner.fetchColumn(new Text(context.getData()), new Text(qualifier));
        } else if (context != null) {
            scanner.fetchColumnFamily(new Text(context.getData()));
        } else if (qualifier != null) {
            IteratorSetting setting = new IteratorSetting(8, "riq", RegExFilter.class.getName());
            RegExFilter.setRegexs(setting, null, null, qualifier, null, false);
            scanner.addScanIterator(setting);
        }
        if (ttl != null) {
            IteratorSetting setting = new IteratorSetting(9, "fi", TimestampFilter.class.getName());
            if (currentTime != null) {
                TimestampFilter.setStart(setting, currentTime - ttl, true);
                TimestampFilter.setEnd(setting, currentTime, true);
            } else {
                TimestampFilter.setStart(setting, System.currentTimeMillis() - ttl, true);
            }
            scanner.addScanIterator(setting);
        }
        if (tripleRowRegex != null) {
            IteratorSetting setting = new IteratorSetting(11, "ri", RegExFilter.class.getName());
            String regex = tripleRowRegex.getRow();
            RegExFilter.setRegexs(setting, regex, null, null, null, false);
            scanner.addScanIterator(setting);
        }
        if (conf instanceof AccumuloRdfConfiguration) {
            //TODO should we take the iterator settings as is or should we adjust the priority based on the above?
            for (IteratorSetting itr : ((AccumuloRdfConfiguration)conf).getAdditionalIterators()) {
                scanner.addScanIterator(itr);
            }
        }
    }

    /** Replaces the default configuration used when a query supplies none. */
    @Override
    public void setConf(AccumuloRdfConfiguration conf) {
        this.configuration = conf;
    }

    /** @return the default configuration currently in use */
    @Override
    public AccumuloRdfConfiguration getConf() {
        return configuration;
    }
}
// Patch metadata (verbatim): http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/KeyValueToRyaStatementFunction.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/KeyValueToRyaStatementFunction.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/KeyValueToRyaStatementFunction.java new file mode 100644 index 0000000..2813438 --- /dev/null +++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/KeyValueToRyaStatementFunction.java @@ -0,0 +1,72 @@
package mvm.rya.accumulo.query;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */



import java.util.Map;

import mvm.rya.api.RdfCloudTripleStoreConfiguration;
import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
import mvm.rya.api.domain.RyaStatement;
import mvm.rya.api.resolver.RyaTripleContext;
import mvm.rya.api.resolver.triple.TripleRow;
import mvm.rya.api.resolver.triple.TripleRowResolverException;

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;

import com.google.common.base.Function;

/**
 * Guava {@link Function} that turns one Accumulo key/value entry into a
 * {@link RyaStatement}, interpreting the row according to a fixed table layout.
 * <p>
 * Date: 1/30/13
 * Time: 2:09 PM
 */
public class KeyValueToRyaStatementFunction implements Function<Map.Entry<Key, Value>, RyaStatement> {

    private TABLE_LAYOUT tableLayout;
    private RyaTripleContext context;

    /**
     * @param tableLayout layout of the table the entries are read from
     * @param context     triple context used to deserialize each row
     */
    public KeyValueToRyaStatementFunction(TABLE_LAYOUT tableLayout, RyaTripleContext context) {
        this.tableLayout = tableLayout;
        this.context = context;
    }

    /**
     * Deserializes a scanned entry into a statement.
     *
     * @param input entry as returned by a scanner; its value may be {@code null}
     * @return the statement encoded by the entry
     * @throws RuntimeException wrapping a {@link TripleRowResolverException}
     *                          when the row cannot be deserialized
     */
    @Override
    public RyaStatement apply(Map.Entry<Key, Value> input) {
        final Key rowKey = input.getKey();
        final Value cell = input.getValue();
        try {
            // a missing value is forwarded as null rather than an empty array
            final TripleRow tripleRow = new TripleRow(
                    rowKey.getRowData().toArray(),
                    rowKey.getColumnFamilyData().toArray(),
                    rowKey.getColumnQualifierData().toArray(),
                    rowKey.getTimestamp(),
                    rowKey.getColumnVisibilityData().toArray(),
                    (cell == null) ? null : cell.get());
            return context.deserializeTriple(tableLayout, tripleRow);
        } catch (TripleRowResolverException e) {
            throw new RuntimeException(e);
        }
    }
}
// Start of the next file in this patch (verbatim, continues on the following line): http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/RangeBindingSetEntries.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/RangeBindingSetEntries.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/RangeBindingSetEntries.java new file mode 100644 index 0000000..c59cb87 --- /dev/null +++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/RangeBindingSetEntries.java @@ -0,0 +1,58 @@ +package mvm.rya.accumulo.query; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + + + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.openrdf.query.BindingSet; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; + +/** + * Class RangeBindingSetCollection + * Date: Feb 23, 2011 + * Time: 10:15:48 AM + */ +public class RangeBindingSetEntries { + public Collection<Map.Entry<Range, BindingSet>> ranges; + + public RangeBindingSetEntries() { + this(new ArrayList<Map.Entry<Range, BindingSet>>()); + } + + public RangeBindingSetEntries(Collection<Map.Entry<Range, BindingSet>> ranges) { + this.ranges = ranges; + } + + public Collection<BindingSet> containsKey(Key key) { + //TODO: need to find a better way to sort these and pull + //TODO: maybe fork/join here + Collection<BindingSet> bss = new ArrayList<BindingSet>(); + for (Map.Entry<Range, BindingSet> entry : ranges) { + if (entry.getKey().contains(key)) + bss.add(entry.getValue()); + } + return bss; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/RyaStatementBindingSetKeyValueIterator.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/RyaStatementBindingSetKeyValueIterator.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/RyaStatementBindingSetKeyValueIterator.java new file mode 100644 index 0000000..b4333bd --- /dev/null +++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/RyaStatementBindingSetKeyValueIterator.java @@ -0,0 +1,154 @@ +package mvm.rya.accumulo.query; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + + +import info.aduna.iteration.CloseableIteration; + +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; + +import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; +import mvm.rya.api.RdfCloudTripleStoreUtils; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.persist.RyaDAOException; +import mvm.rya.api.resolver.RyaContext; +import mvm.rya.api.resolver.RyaTripleContext; +import mvm.rya.api.resolver.triple.TripleRow; +import mvm.rya.api.resolver.triple.TripleRowResolverException; + +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ScannerBase; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.openrdf.query.BindingSet; + +/** + * Date: 7/17/12 + * Time: 11:48 AM + */ +public class RyaStatementBindingSetKeyValueIterator implements CloseableIteration<Map.Entry<RyaStatement, BindingSet>, RyaDAOException> { + private Iterator<Map.Entry<Key, Value>> dataIterator; + private TABLE_LAYOUT tableLayout; + private Long maxResults = -1L; + private ScannerBase scanner; + private boolean isBatchScanner; + private RangeBindingSetEntries rangeMap; + private Iterator<BindingSet> bsIter; + private RyaStatement statement; + private RyaTripleContext ryaContext; + + public 
RyaStatementBindingSetKeyValueIterator(TABLE_LAYOUT tableLayout, RyaTripleContext context, ScannerBase scannerBase, RangeBindingSetEntries rangeMap) { + this(tableLayout, ((scannerBase instanceof BatchScanner) ? ((BatchScanner) scannerBase).iterator() : ((Scanner) scannerBase).iterator()), rangeMap, context); + this.scanner = scannerBase; + isBatchScanner = scanner instanceof BatchScanner; + } + + public RyaStatementBindingSetKeyValueIterator(TABLE_LAYOUT tableLayout, Iterator<Map.Entry<Key, Value>> dataIterator, RangeBindingSetEntries rangeMap, RyaTripleContext ryaContext) { + this.tableLayout = tableLayout; + this.rangeMap = rangeMap; + this.dataIterator = dataIterator; + this.ryaContext = ryaContext; + } + + @Override + public void close() throws RyaDAOException { + dataIterator = null; + if (scanner != null && isBatchScanner) { + ((BatchScanner) scanner).close(); + } + } + + public boolean isClosed() throws RyaDAOException { + return dataIterator == null; + } + + @Override + public boolean hasNext() throws RyaDAOException { + if (isClosed()) { + return false; + } + if (maxResults != 0) { + if (bsIter != null && bsIter.hasNext()) { + return true; + } + if (dataIterator.hasNext()) { + return true; + } else { + maxResults = 0l; + return false; + } + } + return false; + } + + @Override + public Map.Entry<RyaStatement, BindingSet> next() throws RyaDAOException { + if (!hasNext() || isClosed()) { + throw new NoSuchElementException(); + } + + try { + while (true) { + if (bsIter != null && bsIter.hasNext()) { + maxResults--; + return new RdfCloudTripleStoreUtils.CustomEntry<RyaStatement, BindingSet>(statement, bsIter.next()); + } + + if (dataIterator.hasNext()) { + Map.Entry<Key, Value> next = dataIterator.next(); + Key key = next.getKey(); + statement = ryaContext.deserializeTriple(tableLayout, + new TripleRow(key.getRowData().toArray(), key.getColumnFamilyData().toArray(), key.getColumnQualifierData().toArray(), + key.getTimestamp(), 
key.getColumnVisibilityData().toArray(), next.getValue().get())); + if (next.getValue() != null) { + statement.setValue(next.getValue().get()); + } + Collection<BindingSet> bindingSets = rangeMap.containsKey(key); + if (!bindingSets.isEmpty()) { + bsIter = bindingSets.iterator(); + } + } else { + break; + } + } + return null; + } catch (TripleRowResolverException e) { + throw new RyaDAOException(e); + } + } + + @Override + public void remove() throws RyaDAOException { + next(); + } + + public Long getMaxResults() { + return maxResults; + } + + public void setMaxResults(Long maxResults) { + this.maxResults = maxResults; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/RyaStatementKeyValueIterator.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/RyaStatementKeyValueIterator.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/RyaStatementKeyValueIterator.java new file mode 100644 index 0000000..f4c3081 --- /dev/null +++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/RyaStatementKeyValueIterator.java @@ -0,0 +1,107 @@ +package mvm.rya.accumulo.query; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + + +import info.aduna.iteration.CloseableIteration; + +import java.util.Iterator; +import java.util.Map; + +import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.persist.RyaDAOException; +import mvm.rya.api.resolver.RyaContext; +import mvm.rya.api.resolver.RyaTripleContext; +import mvm.rya.api.resolver.triple.TripleRow; +import mvm.rya.api.resolver.triple.TripleRowResolverException; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; + +/** + * Date: 7/17/12 + * Time: 11:48 AM + */ +public class RyaStatementKeyValueIterator implements CloseableIteration<RyaStatement, RyaDAOException> { + private Iterator<Map.Entry<Key, Value>> dataIterator; + private TABLE_LAYOUT tableLayout; + private Long maxResults = -1L; + private RyaTripleContext context; + + public RyaStatementKeyValueIterator(TABLE_LAYOUT tableLayout, RyaTripleContext context, Iterator<Map.Entry<Key, Value>> dataIterator) { + this.tableLayout = tableLayout; + this.dataIterator = dataIterator; + this.context = context; + } + + @Override + public void close() throws RyaDAOException { + dataIterator = null; + } + + public boolean isClosed() throws RyaDAOException { + return dataIterator == null; + } + + @Override + public boolean hasNext() throws RyaDAOException { + if (isClosed()) { + throw new RyaDAOException("Closed Iterator"); + } + return maxResults != 0 && dataIterator.hasNext(); + } + + @Override + public RyaStatement next() throws RyaDAOException { + if (!hasNext()) { + return null; + } + + try { + Map.Entry<Key, Value> next = dataIterator.next(); + Key key = next.getKey(); + RyaStatement statement = context.deserializeTriple(tableLayout, + new TripleRow(key.getRowData().toArray(), key.getColumnFamilyData().toArray(), key.getColumnQualifierData().toArray(), + key.getTimestamp(), 
key.getColumnVisibilityData().toArray(), next.getValue().get())); + if (next.getValue() != null) { + statement.setValue(next.getValue().get()); + } + maxResults--; + return statement; + } catch (TripleRowResolverException e) { + throw new RyaDAOException(e); + } + } + + @Override + public void remove() throws RyaDAOException { + next(); + } + + public Long getMaxResults() { + return maxResults; + } + + public void setMaxResults(Long maxResults) { + this.maxResults = maxResults; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/ScannerBaseCloseableIterable.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/ScannerBaseCloseableIterable.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/ScannerBaseCloseableIterable.java new file mode 100644 index 0000000..d2dcef9 --- /dev/null +++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/query/ScannerBaseCloseableIterable.java @@ -0,0 +1,56 @@ +package mvm.rya.accumulo.query; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */

import com.google.common.base.Preconditions;
import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.calrissian.mango.collect.AbstractCloseableIterable;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;

/**
 * Closeable iterable over the key/value entries of an Accumulo {@link ScannerBase}.
 * The iterator comes straight from the scanner, and closing delegates to the scanner.
 * <p>
 * Date: 1/30/13
 * Time: 2:15 PM
 */
public class ScannerBaseCloseableIterable extends AbstractCloseableIterable<Map.Entry<Key, Value>> {

    /** Scanner that supplies the entries and is closed by {@link #doClose()}. */
    protected ScannerBase scanner;

    /**
     * @param scanner the scanner to wrap; rejected by {@code Preconditions.checkNotNull} when null
     */
    public ScannerBaseCloseableIterable(ScannerBase scanner) {
        // checkNotNull hands back its argument, so validation and assignment fold into one step.
        this.scanner = Preconditions.checkNotNull(scanner);
    }

    /**
     * @return the wrapped scanner's own iterator
     */
    @Override
    protected Iterator<Map.Entry<Key, Value>> retrieveIterator() {
        return scanner.iterator();
    }

    /**
     * Closes the wrapped scanner.
     */
    @Override
    protected void doClose() throws IOException {
        scanner.close();
    }
}
 http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/utils/TimeRangeFilter.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/utils/TimeRangeFilter.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/utils/TimeRangeFilter.java new file mode 100644 index 0000000..97d2f54 --- /dev/null +++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/utils/TimeRangeFilter.java @@ -0,0 +1,87 @@ +package mvm.rya.accumulo.utils; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.Filter;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.OptionDescriber;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;

import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;

/**
 * Set the startTime and timeRange. The filter will only accept keyValues that
 * are within the range [startTime - timeRange, startTime].
 * <p>
 * {@code timeRange} is mandatory; {@code startTime} defaults to the current
 * system time (milliseconds) at {@link #init} when it is not supplied.
 */
public class TimeRangeFilter extends Filter {
    private long timeRange;
    private long startTime;
    public static final String TIME_RANGE_PROP = "timeRange";
    public static final String START_TIME_PROP = "startTime";

    /**
     * Accepts an entry when its timestamp is no later than {@code startTime} and
     * no more than {@code timeRange} before it.
     */
    @Override
    public boolean accept(Key k, Value v) {
        long diff = startTime - k.getTimestamp();
        return diff >= 0 && diff <= timeRange;
    }

    /**
     * Reads {@link #TIME_RANGE_PROP} (required) and {@link #START_TIME_PROP} (optional).
     *
     * @throws IllegalArgumentException if {@code options} is null or has no timeRange
     * @throws NumberFormatException    if either option is not a parsable long
     */
    @Override
    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
        // Checked before delegating: the original tested for null only after
        // super.init(...) had already been handed the same null map.
        if (options == null) {
            throw new IllegalArgumentException("options must be set for TimeRangeFilter");
        }
        super.init(source, options, env);

        String timeRange_s = options.get(TIME_RANGE_PROP);
        if (timeRange_s == null) {
            throw new IllegalArgumentException("timeRange must be set for TimeRangeFilter");
        }
        timeRange = Long.parseLong(timeRange_s);

        String time = options.get(START_TIME_PROP);
        if (time != null) {
            startTime = Long.parseLong(time);
        } else {
            startTime = System.currentTimeMillis();
        }
    }

    /**
     * Copies this filter including its configured time window.
     * <p>
     * Without this override the copy produced by the superclass would not carry
     * {@code timeRange} and {@code startTime}, because those fields are only ever
     * assigned in {@link #init}.
     */
    @Override
    public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
        // Filter.deepCopy instantiates this.getClass(), so the cast is to our own type.
        TimeRangeFilter copy = (TimeRangeFilter) super.deepCopy(env);
        copy.timeRange = timeRange;
        copy.startTime = startTime;
        return copy;
    }

    @Override
    public OptionDescriber.IteratorOptions describeOptions() {
        Map<String, String> options = new TreeMap<String, String>();
        options.put(TIME_RANGE_PROP, "time range from the startTime (milliseconds)");
        options.put(START_TIME_PROP, "if set, use the given value as the absolute time in milliseconds as the start time in the time range.");
        return new OptionDescriber.IteratorOptions("timeRangeFilter", "TimeRangeFilter removes entries with timestamps outside of the given time range: " +
                "[startTime - timeRange, startTime]",
                options, null);
    }

    /**
     * Validates the same options {@link #init} will parse, so a bad value is
     * rejected at configuration time rather than when the iterator starts.
     *
     * @return {@code true} when the options are usable
     * @throws NumberFormatException if timeRange is missing or not a long, or if
     *                               startTime is present but not a long
     */
    @Override
    public boolean validateOptions(Map<String, String> options) {
        if (!super.validateOptions(options)) {
            return false;
        }
        Long.parseLong(options.get(TIME_RANGE_PROP));
        String time = options.get(START_TIME_PROP);
        if (time != null) {
            Long.parseLong(time);
        }
        return true;
    }
}
 http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/utils/VisibilitySimplifier.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/utils/VisibilitySimplifier.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/utils/VisibilitySimplifier.java new file mode 100644 index 0000000..cc4edca --- /dev/null +++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/utils/VisibilitySimplifier.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package mvm.rya.accumulo.utils; + +import static java.util.Objects.requireNonNull; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.accumulo.core.security.ColumnVisibility; + +import com.google.common.base.Charsets; + +/** + * Simplifies Accumulo visibility expressions. + */ +@ParametersAreNonnullByDefault +public class VisibilitySimplifier { + + /** + * Simplifies an Accumulo visibility expression. + * + * @param visibility - The expression to simplify. (not null) + * @return A simplified form of {@code visibility}. + */ + public String simplify(final String visibility) { + requireNonNull(visibility); + + String last = visibility; + String simplified = new String(new ColumnVisibility(visibility).flatten(), Charsets.UTF_8); + + while(!simplified.equals(last)) { + last = simplified; + simplified = new String(new ColumnVisibility(simplified).flatten(), Charsets.UTF_8); + } + + return simplified; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloITBase.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloITBase.java b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloITBase.java deleted file mode 100644 index 7dd23e6..0000000 --- a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloITBase.java +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more 
contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package mvm.rya.accumulo; - -import java.io.IOException; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.minicluster.MiniAccumuloCluster; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import org.apache.zookeeper.ClientCnxn; -import org.junit.After; -import org.junit.Before; -import org.junit.BeforeClass; - -/** - * Boilerplate code for a unit test that uses a {@link MiniAccumuloCluster}. - * <p> - * It uses the same instance of {@link MiniAccumuloCluster} and just clears out - * any tables that were added between tests. 
- */ -public class AccumuloITBase { - - // Managed the MiniAccumuloCluster - private MiniAccumuloClusterInstance cluster = null; - - @BeforeClass - public static void killLoudLogs() { - Logger.getLogger(ClientCnxn.class).setLevel(Level.ERROR); - } - - @Before - public void initCluster() throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException { - cluster = new MiniAccumuloClusterInstance(); - cluster.startMiniAccumulo(); - } - - - @After - public void tearDownCluster() throws IOException, InterruptedException { - cluster.stopMiniAccumulo(); - } - - /** - * @return The {@link MiniAccumuloClusterInstance} used by the tests. - */ - public MiniAccumuloClusterInstance getClusterInstance() { - return cluster; - } - - /** - * @return The root username. - */ - public String getUsername() { - return cluster.getUsername(); - } - - /** - * @return The root password. - */ - public String getPassword() { - return cluster.getPassword(); - } - - /** - * @return The MiniAccumulo's zookeeper instance name. - */ - public String getInstanceName() { - return cluster.getInstanceName(); - } - - /** - * @return The MiniAccumulo's zookeepers. - */ - public String getZookeepers() { - return cluster.getZookeepers(); - } - - /** - * @return A {@link Connector} that creates connections to the mini accumulo cluster. - * @throws AccumuloException Could not connect to the cluster. - * @throws AccumuloSecurityException Could not connect to the cluster because of a security violation. 
- */ - public Connector getConnector() throws AccumuloException, AccumuloSecurityException { - return cluster.getConnector(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRdfConfigurationTest.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRdfConfigurationTest.java b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRdfConfigurationTest.java deleted file mode 100644 index ffd316e..0000000 --- a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRdfConfigurationTest.java +++ /dev/null @@ -1,75 +0,0 @@ -package mvm.rya.accumulo; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.security.Authorizations;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Checks that {@link AccumuloRdfConfiguration} round-trips authorizations and
 * additional iterator settings.
 */
public class AccumuloRdfConfigurationTest {
    private static final Logger logger = LoggerFactory.getLogger(AccumuloRdfConfigurationTest.class);

    /**
     * Auths set as an array and auths set as a comma-separated string must be
     * readable back in all three forms (array, string, {@link Authorizations}).
     */
    @Test
    public void testAuths() {
        final String[] authArray = {"U", "FOUO"};
        final String authString = "U,FOUO";
        final Authorizations expected = new Authorizations(authArray);

        final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();

        conf.setAuths(authArray);
        assertAuths(conf, authArray, authString, expected);

        conf.setAuth(authString);
        assertAuths(conf, authArray, authString, expected);
    }

    /**
     * A single additional iterator setting must come back unchanged.
     */
    @Test
    public void testIterators() {
        final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();

        final Map<String, String> iteratorOptions = new HashMap<String, String>();
        iteratorOptions.put("key1", "value1");
        iteratorOptions.put("key2", "value2");
        final IteratorSetting setting = new IteratorSetting(1, "test", "test2", iteratorOptions);

        conf.setAdditionalIterators(setting);
        final IteratorSetting[] stored = conf.getAdditionalIterators();
        assertTrue(stored.length == 1);

        assertEquals(setting, stored[0]);

    }

    /**
     * Asserts the three views of the configured auths, in the same order the
     * test originally checked them inline.
     */
    private static void assertAuths(final AccumuloRdfConfiguration conf, final String[] expectedArray,
            final String expectedString, final Authorizations expectedAuthorizations) {
        assertTrue(Arrays.equals(expectedArray, conf.getAuths()));
        assertEquals(expectedString, conf.getAuth());
        assertEquals(expectedAuthorizations, conf.getAuthorizations());
    }
}
 http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRyaDAOTest.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRyaDAOTest.java b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRyaDAOTest.java deleted file mode 100644 index
8b55070..0000000 --- a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRyaDAOTest.java +++ /dev/null @@ -1,713 +0,0 @@ -package mvm.rya.accumulo; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import info.aduna.iteration.CloseableIteration; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.UUID; - -import mvm.rya.accumulo.query.AccumuloRyaQueryEngine; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.domain.RyaType; -import mvm.rya.api.domain.RyaURI; -import mvm.rya.api.persist.RyaDAOException; -import mvm.rya.api.persist.query.RyaQuery; -import mvm.rya.api.resolver.RdfToRyaConversions; -import mvm.rya.api.resolver.RyaContext; - -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.mock.MockInstance; -import 
org.apache.accumulo.core.iterators.FirstEntryInRowIterator; -import org.calrissian.mango.collect.FluentCloseableIterable; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.openrdf.model.ValueFactory; -import org.openrdf.model.impl.ValueFactoryImpl; -import org.openrdf.model.vocabulary.XMLSchema; - -/** - * Class AccumuloRdfDAOTest - * Date: Mar 7, 2012 - * Time: 9:42:28 AM - */ -public class AccumuloRyaDAOTest { - - private AccumuloRyaDAO dao; - private ValueFactory vf = new ValueFactoryImpl(); - static String litdupsNS = "urn:test:litdups#"; - private AccumuloRdfConfiguration conf; - private Connector connector; - - @Before - public void setUp() throws Exception { - dao = new AccumuloRyaDAO(); - connector = new MockInstance().getConnector("", ""); - dao.setConnector(connector); - conf = new AccumuloRdfConfiguration(); - dao.setConf(conf); - dao.init(); - } - - @After - public void tearDown() throws Exception { - dao.purge(conf); - dao.destroy(); - } - - @Test - public void testAdd() throws Exception { - RyaURI cpu = RdfToRyaConversions.convertURI(vf.createURI(litdupsNS, "cpu")); - RyaURI loadPerc = RdfToRyaConversions.convertURI(vf.createURI(litdupsNS, "loadPerc")); - RyaURI uri1 = RdfToRyaConversions.convertURI(vf.createURI(litdupsNS, "uri1")); - dao.add(new RyaStatement(cpu, loadPerc, uri1)); - - CloseableIteration<RyaStatement, RyaDAOException> iter = dao.getQueryEngine().query(new RyaStatement(cpu, loadPerc, null), conf); - int count = 0; - while (iter.hasNext()) { - assertTrue(uri1.equals(iter.next().getObject())); - count++; - } - iter.close(); - assertEquals(1, count); - - dao.delete(new RyaStatement(cpu, loadPerc, null), conf); - - iter = dao.getQueryEngine().query(new RyaStatement(cpu, loadPerc, null), conf); - count = 0; - while (iter.hasNext()) { - count++; - iter.next(); - } - iter.close(); - assertEquals(0, count); - } - - @Test - public void testDeleteDiffVisibility() throws Exception { - RyaURI cpu = 
RdfToRyaConversions.convertURI(vf.createURI(litdupsNS, "cpu")); - RyaURI loadPerc = RdfToRyaConversions.convertURI(vf.createURI(litdupsNS, "loadPerc")); - RyaURI uri1 = RdfToRyaConversions.convertURI(vf.createURI(litdupsNS, "uri1")); - RyaStatement stmt1 = new RyaStatement(cpu, loadPerc, uri1, null, "1", "vis1".getBytes()); - dao.add(stmt1); - RyaStatement stmt2 = new RyaStatement(cpu, loadPerc, uri1, null, "2", "vis2".getBytes()); - dao.add(stmt2); - - AccumuloRdfConfiguration cloneConf = conf.clone(); - cloneConf.setAuth("vis1,vis2"); - - CloseableIteration<RyaStatement, RyaDAOException> iter = dao.getQueryEngine().query(new RyaStatement(cpu, loadPerc, null), cloneConf); - int count = 0; - while (iter.hasNext()) { - iter.next(); - count++; - } - iter.close(); - assertEquals(2, count); - - dao.delete(stmt1, cloneConf); - - iter = dao.getQueryEngine().query(new RyaStatement(cpu, loadPerc, null), cloneConf); - count = 0; - while (iter.hasNext()) { - iter.next(); - count++; - } - iter.close(); - assertEquals(1, count); - } - - @Test - public void testDeleteDiffTimestamp() throws Exception { - RyaURI cpu = RdfToRyaConversions.convertURI(vf.createURI(litdupsNS, "cpu")); - RyaURI loadPerc = RdfToRyaConversions.convertURI(vf.createURI(litdupsNS, "loadPerc")); - RyaURI uri1 = RdfToRyaConversions.convertURI(vf.createURI(litdupsNS, "uri1")); - RyaStatement stmt1 = new RyaStatement(cpu, loadPerc, uri1, null, "1", null, null, 100l); - dao.add(stmt1); - RyaStatement stmt2 = new RyaStatement(cpu, loadPerc, uri1, null, "2", null, null, 100l); - dao.add(stmt2); - - int resultSize = FluentCloseableIterable.from(dao.getQueryEngine().query( - RyaQuery.builder(new RyaStatement(cpu, loadPerc, null)).build())).autoClose().size(); - assertEquals(2, resultSize); - - final RyaStatement addStmt = new RyaStatement(cpu, loadPerc, uri1, null, "1", - null, null, 101l); - dao.delete(stmt1, conf); - dao.add(addStmt); - - resultSize = FluentCloseableIterable.from(dao.getQueryEngine().query( - 
RyaQuery.builder(new RyaStatement(cpu, loadPerc, null)).build())).autoClose().size(); - assertEquals(2, resultSize); //the delete marker should not delete the new stmt - } - - @Test - public void testDelete() throws Exception { - RyaURI predicate = RdfToRyaConversions.convertURI(vf.createURI(litdupsNS, "pred")); - RyaURI subj = RdfToRyaConversions.convertURI(vf.createURI(litdupsNS, "subj")); - - // create a "bulk load" of 10,000 statements - int statement_count = 10000; - for (int i = 0 ; i < statement_count ; i++){ - //make the statement very large so we will get a lot of random flushes - RyaURI obj = RdfToRyaConversions.convertURI(vf.createURI(litdupsNS, String.format("object%050d",i))); - RyaStatement stmt = new RyaStatement(subj, predicate, obj); - dao.add(stmt); - } - - CloseableIteration<RyaStatement, RyaDAOException> iter; - - //check to see if all of the statements made it to the subj table - //delete based on the data in the subj table - RyaStatement subjQuery = new RyaStatement(subj, null, null); - iter = dao.getQueryEngine().query(subjQuery, conf); - List<RyaStatement> stmts = new ArrayList<RyaStatement>(); - while (iter.hasNext()) { - stmts.add(iter.next()); - } - assertEquals(statement_count, stmts.size()); - dao.delete(stmts.iterator(), conf); - - // check statements in the predicate table - RyaStatement predQuery = new RyaStatement(null, predicate, null); - iter = dao.getQueryEngine().query(predQuery, conf); - int count = 0; - while (iter.hasNext()) { - count++; - } - iter.close(); - assertEquals(0, count); - } - - @Test - public void testAddEmptyString() throws Exception { - RyaURI cpu = RdfToRyaConversions.convertURI(vf.createURI(litdupsNS, "cpu")); - RyaURI loadPerc = RdfToRyaConversions.convertURI(vf.createURI(litdupsNS, "loadPerc")); - RyaType empty = new RyaType(""); - dao.add(new RyaStatement(cpu, loadPerc, empty)); - - CloseableIteration<RyaStatement, RyaDAOException> iter = dao.getQueryEngine().query(new RyaStatement(cpu, loadPerc, null), 
conf); - while (iter.hasNext()) { - assertEquals("", iter.next().getObject().getData()); - } - iter.close(); - } - - @Test - public void testMaxResults() throws Exception { - RyaURI cpu = new RyaURI(litdupsNS + "cpu"); - RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc"); - dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + "uri1"))); - dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + "uri2"))); - dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + "uri3"))); - dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + "uri4"))); - dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + "uri5"))); - - AccumuloRyaQueryEngine queryEngine = dao.getQueryEngine(); - AccumuloRdfConfiguration queryConf = new AccumuloRdfConfiguration(conf); - long limit = 3l; - queryConf.setLimit(limit); - - CloseableIteration<RyaStatement, RyaDAOException> iter = queryEngine.query(new RyaStatement(cpu, loadPerc, null), queryConf); - int count = 0; - while (iter.hasNext()) { - iter.next().getObject(); - count++; - } - iter.close(); - assertEquals(limit, count); - } - - @Test - public void testAddValue() throws Exception { - RyaURI cpu = new RyaURI(litdupsNS + "cpu"); - RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc"); - RyaURI uri1 = new RyaURI(litdupsNS + "uri1"); - String myval = "myval"; - dao.add(new RyaStatement(cpu, loadPerc, uri1, null, null, null, myval.getBytes())); - - AccumuloRyaQueryEngine queryEngine = dao.getQueryEngine(); - CloseableIteration<RyaStatement, RyaDAOException> iter = queryEngine.query(new RyaStatement(cpu, loadPerc, null), conf); - assertTrue(iter.hasNext()); - assertEquals(myval, new String(iter.next().getValue())); - iter.close(); - } - - @Test - public void testAddCv() throws Exception { - RyaURI cpu = new RyaURI(litdupsNS + "cpu"); - RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc"); - RyaURI uri1 = new RyaURI(litdupsNS + "uri1"); - RyaURI uri2 = new RyaURI(litdupsNS + "uri2"); - RyaURI uri3 = new 
RyaURI(litdupsNS + "uri3"); - byte[] colVisABC = "A|B|C".getBytes(); - byte[] colVisAB = "A|B".getBytes(); - byte[] colVisA = "A".getBytes(); - dao.add(new RyaStatement(cpu, loadPerc, uri1, null, null, colVisABC)); - dao.add(new RyaStatement(cpu, loadPerc, uri2, null, null, colVisAB)); - dao.add(new RyaStatement(cpu, loadPerc, uri3, null, null, colVisA)); - - AccumuloRyaQueryEngine queryEngine = dao.getQueryEngine(); - - //query with no auth - CloseableIteration<RyaStatement, RyaDAOException> iter = queryEngine.query(new RyaStatement(cpu, loadPerc, null), conf); - int count = 0; - while (iter.hasNext()) { - count++; - iter.next(); - } - assertEquals(0, count); - iter.close(); - - AccumuloRdfConfiguration queryConf = new AccumuloRdfConfiguration(); - queryConf.setAuth("B"); - iter = queryEngine.query(new RyaStatement(cpu, loadPerc, null), queryConf); - count = 0; - while (iter.hasNext()) { - iter.next(); - count++; - } - iter.close(); - assertEquals(2, count); - - queryConf.setAuth("A"); - iter = queryEngine.query(new RyaStatement(cpu, loadPerc, null), queryConf); - count = 0; - while (iter.hasNext()) { - iter.next(); - count++; - } - iter.close(); - assertEquals(3, count); - } - - @Test - public void testTTL() throws Exception { - RyaURI cpu = new RyaURI(litdupsNS + "cpu"); - RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc"); - long current = System.currentTimeMillis(); - dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + "uri1"), null, null, null, null, current)); - dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + "uri2"), null, null, null, null, current - 1010l)); - dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + "uri3"), null, null, null, null, current - 2010l)); - dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + "uri4"), null, null, null, null, current - 3010l)); - dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + "uri5"), null, null, null, null, current - 4010l)); - - AccumuloRyaQueryEngine 
queryEngine = dao.getQueryEngine(); - AccumuloRdfConfiguration queryConf = conf.clone(); - queryConf.setTtl(3000l); - - CloseableIteration<RyaStatement, RyaDAOException> iter = queryEngine.query(new RyaStatement(cpu, loadPerc, null), queryConf); - int count = 0; - while (iter.hasNext()) { - iter.next().getObject(); - count++; - } - iter.close(); - assertEquals(3, count); - - queryConf.setStartTime(current - 3000l); - iter = queryEngine.query(new RyaStatement(cpu, loadPerc, null), queryConf); - count = 0; - while (iter.hasNext()) { - iter.next().getObject(); - count++; - } - iter.close(); - assertEquals(2, count); - } - - @Test - public void testGetNamespace() throws Exception { - dao.addNamespace("ns", litdupsNS); - assertEquals(litdupsNS, dao.getNamespace("ns")); - dao.removeNamespace("ns"); - assertNull(dao.getNamespace("ns")); - } - - //TOOD: Add test for set of queries - @Test - public void testQuery() throws Exception { - RyaURI cpu = new RyaURI(litdupsNS + "cpu"); - RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc"); - RyaURI uri1 = new RyaURI(litdupsNS + "uri1"); - RyaURI uri2 = new RyaURI(litdupsNS + "uri2"); - RyaURI uri3 = new RyaURI(litdupsNS + "uri3"); - RyaURI uri4 = new RyaURI(litdupsNS + "uri4"); - RyaURI uri5 = new RyaURI(litdupsNS + "uri5"); - RyaURI uri6 = new RyaURI(litdupsNS + "uri6"); - dao.add(new RyaStatement(cpu, loadPerc, uri1)); - dao.add(new RyaStatement(cpu, loadPerc, uri2)); - dao.add(new RyaStatement(cpu, loadPerc, uri3)); - dao.add(new RyaStatement(cpu, loadPerc, uri4)); - dao.add(new RyaStatement(cpu, loadPerc, uri5)); - dao.add(new RyaStatement(cpu, loadPerc, uri6)); - - AccumuloRyaQueryEngine queryEngine = dao.getQueryEngine(); - - Collection<RyaStatement> coll = new ArrayList(); - coll.add(new RyaStatement(null, loadPerc, uri1)); - coll.add(new RyaStatement(null, loadPerc, uri2)); - CloseableIteration<RyaStatement, RyaDAOException> iter = queryEngine.batchQuery(coll, conf); - int count = 0; - while (iter.hasNext()) { - count++; 
- iter.next(); - } - iter.close(); - assertEquals(2, count); - - //now use batchscanner - AccumuloRdfConfiguration queryConf = new AccumuloRdfConfiguration(conf); - queryConf.setMaxRangesForScanner(2); - - coll = new ArrayList(); - coll.add(new RyaStatement(null, loadPerc, uri1)); - coll.add(new RyaStatement(null, loadPerc, uri2)); - coll.add(new RyaStatement(null, loadPerc, uri3)); - coll.add(new RyaStatement(null, loadPerc, uri4)); - iter = queryEngine.batchQuery(coll, queryConf); - assertTrue(iter.hasNext()); //old code had a weird behaviour that could not perform hasNext consecutively - assertTrue(iter.hasNext()); - assertTrue(iter.hasNext()); - count = 0; - while (iter.hasNext()) { - count++; - assertTrue(iter.hasNext()); - iter.next(); - } - iter.close(); - assertEquals(4, count); - } - - @Test - public void testQueryDates() throws Exception { - RyaURI cpu = new RyaURI(litdupsNS + "cpu"); - RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc"); - RyaType uri0 = new RyaType(XMLSchema.DATETIME, "1960-01-01"); // How handles local time - RyaType uri1 = new RyaType(XMLSchema.DATETIME, "1992-01-01T+10:00"); // See Magadan Time - RyaType uri2 = new RyaType(XMLSchema.DATETIME, "2000-01-01TZ"); // How it handles UTC. 
- RyaType uri3 = new RyaType(XMLSchema.DATETIME, "2000-01-01T00:00:01.111Z"); - RyaType uri4 = new RyaType(XMLSchema.DATETIME, "2000-01-01T00:00:01.111Z"); // duplicate - RyaType uri5 = new RyaType(XMLSchema.DATETIME, "2000-01-01T00:00:01-00:00"); - RyaType uri6 = new RyaType(XMLSchema.DATETIME, "2000-01-01T00:00:01Z"); // duplicate - RyaType uri7 = new RyaType(XMLSchema.DATETIME, "-2000-01-01T00:00:01Z"); - RyaType uri8 = new RyaType(XMLSchema.DATETIME, "111-01-01T00:00:01Z"); - RyaType uri9 = new RyaType(XMLSchema.DATETIME, "12345-01-01T00:00:01Z"); - - dao.add(new RyaStatement(cpu, loadPerc, uri0)); - dao.add(new RyaStatement(cpu, loadPerc, uri1)); - dao.add(new RyaStatement(cpu, loadPerc, uri2)); - dao.add(new RyaStatement(cpu, loadPerc, uri3)); - dao.add(new RyaStatement(cpu, loadPerc, uri4)); - dao.add(new RyaStatement(cpu, loadPerc, uri5)); - dao.add(new RyaStatement(cpu, loadPerc, uri6)); - dao.add(new RyaStatement(cpu, loadPerc, uri7)); - dao.add(new RyaStatement(cpu, loadPerc, uri8)); - dao.add(new RyaStatement(cpu, loadPerc, uri9)); - - AccumuloRyaQueryEngine queryEngine = dao.getQueryEngine(); - - Collection<RyaStatement> coll = new ArrayList(); - coll.add(new RyaStatement(null, loadPerc, uri0)); - coll.add(new RyaStatement(null, loadPerc, uri1)); - coll.add(new RyaStatement(null, loadPerc, uri2)); - CloseableIteration<RyaStatement, RyaDAOException> iter = queryEngine.batchQuery(coll, conf); - int count = 0; - while (iter.hasNext()) { - count++; - iter.next(); - } - iter.close(); - assertEquals("Three time zones should be normalized when stored, then normalized same when queried.",3, count); - - //now use batchscanner - AccumuloRdfConfiguration queryConf = new AccumuloRdfConfiguration(conf); - queryConf.setMaxRangesForScanner(2); - - coll = new ArrayList(); - coll.add(new RyaStatement(null, loadPerc, uri0)); - coll.add(new RyaStatement(null, loadPerc, uri1)); - coll.add(new RyaStatement(null, loadPerc, uri2)); - coll.add(new RyaStatement(null, loadPerc, 
uri3)); - coll.add(new RyaStatement(null, loadPerc, uri4)); - coll.add(new RyaStatement(null, loadPerc, uri5)); - coll.add(new RyaStatement(null, loadPerc, uri6)); - coll.add(new RyaStatement(null, loadPerc, uri7)); - coll.add(new RyaStatement(null, loadPerc, uri8)); - coll.add(new RyaStatement(null, loadPerc, uri9)); - iter = queryEngine.batchQuery(coll, queryConf); - count = 0; - while (iter.hasNext()) { - count++; - iter.next(); - } - iter.close(); - assertEquals("Variety of time specs, including BC, pre-1970, duplicate pair ovewrite,future, 3 digit year.",8, count); - } - - @Test - public void testQueryCollectionRegex() throws Exception { - RyaURI cpu = new RyaURI(litdupsNS + "cpu"); - RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc"); - RyaURI uri1 = new RyaURI(litdupsNS + "uri1"); - RyaURI uri2 = new RyaURI(litdupsNS + "uri2"); - RyaURI uri3 = new RyaURI(litdupsNS + "uri3"); - RyaURI uri4 = new RyaURI(litdupsNS + "uri4"); - RyaURI uri5 = new RyaURI(litdupsNS + "uri5"); - RyaURI uri6 = new RyaURI(litdupsNS + "uri6"); - dao.add(new RyaStatement(cpu, loadPerc, uri1)); - dao.add(new RyaStatement(cpu, loadPerc, uri2)); - dao.add(new RyaStatement(cpu, loadPerc, uri3)); - dao.add(new RyaStatement(cpu, loadPerc, uri4)); - dao.add(new RyaStatement(cpu, loadPerc, uri5)); - dao.add(new RyaStatement(cpu, loadPerc, uri6)); - - AccumuloRyaQueryEngine queryEngine = dao.getQueryEngine(); - - Collection<RyaStatement> coll = new ArrayList(); - coll.add(new RyaStatement(null, loadPerc, uri1)); - coll.add(new RyaStatement(null, loadPerc, uri2)); - conf.setRegexPredicate(loadPerc.getData()); - CloseableIteration<RyaStatement, RyaDAOException> iter = queryEngine.batchQuery(coll, conf); - int count = 0; - while (iter.hasNext()) { - count++; - iter.next(); - } - iter.close(); - assertEquals(2, count); - - conf.setRegexPredicate("notLoadPerc"); - iter = queryEngine.batchQuery(coll, conf); - count = 0; - while (iter.hasNext()) { - count++; - iter.next(); - } - iter.close(); - 
assertEquals(0, count); - } - - @Test - public void testQueryCollectionRegexWBatchScanner() throws Exception { - RyaURI cpu = new RyaURI(litdupsNS + "cpu"); - RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc"); - RyaURI uri1 = new RyaURI(litdupsNS + "uri1"); - RyaURI uri2 = new RyaURI(litdupsNS + "uri2"); - RyaURI uri3 = new RyaURI(litdupsNS + "uri3"); - RyaURI uri4 = new RyaURI(litdupsNS + "uri4"); - RyaURI uri5 = new RyaURI(litdupsNS + "uri5"); - RyaURI uri6 = new RyaURI(litdupsNS + "uri6"); - dao.add(new RyaStatement(cpu, loadPerc, uri1)); - dao.add(new RyaStatement(cpu, loadPerc, uri2)); - dao.add(new RyaStatement(cpu, loadPerc, uri3)); - dao.add(new RyaStatement(cpu, loadPerc, uri4)); - dao.add(new RyaStatement(cpu, loadPerc, uri5)); - dao.add(new RyaStatement(cpu, loadPerc, uri6)); - - AccumuloRyaQueryEngine queryEngine = dao.getQueryEngine(); - AccumuloRdfConfiguration queryConf = new AccumuloRdfConfiguration(conf); - queryConf.setMaxRangesForScanner(1); - - Collection<RyaStatement> coll = new ArrayList(); - coll.add(new RyaStatement(null, loadPerc, uri1)); - coll.add(new RyaStatement(null, loadPerc, uri2)); - conf.setRegexPredicate(loadPerc.getData()); - CloseableIteration<RyaStatement, RyaDAOException> iter = queryEngine.batchQuery(coll, queryConf); - int count = 0; - while (iter.hasNext()) { - count++; - iter.next(); - } - iter.close(); - assertEquals(2, count); - - queryConf.setRegexPredicate("notLoadPerc"); - iter = queryEngine.batchQuery(coll, queryConf); - count = 0; - while (iter.hasNext()) { - count++; - iter.next(); - } - iter.close(); - assertEquals(0, count); - } - - @Test - public void testLiteralTypes() throws Exception { - RyaURI cpu = new RyaURI(litdupsNS + "cpu"); - RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc"); - RyaType longLit = new RyaType(XMLSchema.LONG, "3"); - - dao.add(new RyaStatement(cpu, loadPerc, longLit)); - - AccumuloRyaQueryEngine queryEngine = dao.getQueryEngine(); - - CloseableIteration<RyaStatement, 
RyaDAOException> query = queryEngine.query(new RyaStatement(cpu, null, null), conf); - assertTrue(query.hasNext()); - RyaStatement next = query.next(); - assertEquals(new Long(longLit.getData()), new Long(next.getObject().getData())); - query.close(); - - RyaType doubleLit = new RyaType(XMLSchema.DOUBLE, "2.0"); - - dao.add(new RyaStatement(cpu, loadPerc, doubleLit)); - - query = queryEngine.query(new RyaStatement(cpu, loadPerc, doubleLit), conf); - assertTrue(query.hasNext()); - next = query.next(); - assertEquals(Double.parseDouble(doubleLit.getData()), Double.parseDouble(next.getObject().getData()), 0.001); - query.close(); - } - - @Test - public void testSameLiteralStringTypes() throws Exception { - RyaURI cpu = new RyaURI(litdupsNS + "cpu"); - RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc"); - RyaType longLit = new RyaType(XMLSchema.LONG, "10"); - RyaType strLit = new RyaType(XMLSchema.STRING, new String(RyaContext.getInstance().serializeType(longLit)[0])); - - RyaStatement expected = new RyaStatement(cpu, loadPerc, longLit); - dao.add(expected); - dao.add(new RyaStatement(cpu, loadPerc, strLit)); - - AccumuloRyaQueryEngine queryEngine = dao.getQueryEngine(); - - CloseableIteration<RyaStatement, RyaDAOException> query = queryEngine.query(new RyaStatement(cpu, loadPerc, longLit), conf); - assertTrue(query.hasNext()); - RyaStatement next = query.next(); - assertEquals(new Long(longLit.getData()), new Long(next.getObject().getData())); - assertEquals(longLit.getDataType(), next.getObject().getDataType()); - assertFalse(query.hasNext()); - query.close(); - } - - @Test - public void testPurge() throws RyaDAOException, TableNotFoundException { - dao.add(newRyaStatement()); - assertFalse("table should not be empty", areTablesEmpty()); - - dao.purge(conf); - assertTrue("table should be empty", areTablesEmpty()); - //assertNotNull(dao.getVersion()); - } - - @Test - public void testPurgeDoesNotBreakBatchWriters() throws TableNotFoundException, RyaDAOException { - 
dao.purge(conf); - assertTrue("table should be empty", areTablesEmpty()); - - dao.add(newRyaStatement()); - assertFalse("table should not be empty", areTablesEmpty()); - } - - @Test - public void testDropAndDestroy() throws RyaDAOException { - assertTrue(dao.isInitialized()); - dao.dropAndDestroy(); - for (String tableName : dao.getTables()) { - assertFalse(tableExists(tableName)); - } - assertFalse(dao.isInitialized()); - } - - @Test - public void testQueryWithIterators() throws Exception { - RyaURI cpu = new RyaURI(litdupsNS + "cpu"); - RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc"); - RyaURI uri1 = new RyaURI(litdupsNS + "uri1"); - dao.add(new RyaStatement(cpu, loadPerc, uri1, null, "qual1")); - dao.add(new RyaStatement(cpu, loadPerc, uri1, null, "qual2")); - - AccumuloRyaQueryEngine queryEngine = dao.getQueryEngine(); - - AccumuloRdfConfiguration queryConf = new AccumuloRdfConfiguration(conf); - IteratorSetting firstEntryInRow = new IteratorSetting(3 /* correct value?? */, FirstEntryInRowIterator.class); - queryConf.setAdditionalIterators(firstEntryInRow); - - Collection<RyaStatement> coll = new ArrayList<>(); - coll.add(new RyaStatement(null, loadPerc, uri1)); - CloseableIteration<RyaStatement, RyaDAOException> iter = queryEngine.batchQuery(coll, queryConf); - int count = 0; - while (iter.hasNext()) { - count++; - iter.next(); - } - iter.close(); - assertEquals(1, count); - - //Assert that without the iterator we get 2 - coll = new ArrayList<>(); - coll.add(new RyaStatement(null, loadPerc, uri1)); - iter = queryEngine.batchQuery(coll, conf); - count = 0; - while (iter.hasNext()) { - count++; - iter.next(); - } - iter.close(); - assertEquals(2, count); - - } - - private boolean areTablesEmpty() throws TableNotFoundException { - for (String table : dao.getTables()) { - if (tableExists(table)) { - // TODO: filter out version - if (createScanner(table).iterator().hasNext()) { - return false; - } - } - } - return true; - } - - private boolean 
tableExists(String tableName) { - return dao.getConnector().tableOperations().exists(tableName); - } - - private Scanner createScanner(String tableName) throws TableNotFoundException { - return dao.getConnector().createScanner(tableName, conf.getAuthorizations()); - } - - private RyaStatement newRyaStatement() { - RyaURI subject = new RyaURI(litdupsNS + randomString()); - RyaURI predicate = new RyaURI(litdupsNS + randomString()); - RyaType object = new RyaType(randomString()); - - return new RyaStatement(subject, predicate, object); - } - - private String randomString() { - return UUID.randomUUID().toString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRyaITBase.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRyaITBase.java b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRyaITBase.java deleted file mode 100644 index dea227b..0000000 --- a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRyaITBase.java +++ /dev/null @@ -1,109 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package mvm.rya.accumulo; - -import java.io.IOException; -import java.util.Date; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; - -import com.google.common.base.Optional; - -import mvm.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository; -import mvm.rya.api.instance.RyaDetails; -import mvm.rya.api.instance.RyaDetails.EntityCentricIndexDetails; -import mvm.rya.api.instance.RyaDetails.FreeTextIndexDetails; -import mvm.rya.api.instance.RyaDetails.GeoIndexDetails; -import mvm.rya.api.instance.RyaDetails.JoinSelectivityDetails; -import mvm.rya.api.instance.RyaDetails.PCJIndexDetails; -import mvm.rya.api.instance.RyaDetails.ProspectorDetails; -import mvm.rya.api.instance.RyaDetails.TemporalIndexDetails; -import mvm.rya.api.instance.RyaDetailsRepository; -import mvm.rya.api.instance.RyaDetailsRepository.AlreadyInitializedException; -import mvm.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; - -/** - * Contains boilerplate code for spinning up a Mini Accumulo Cluster and initializing - * some of the Rya stuff. We can not actually initialize an instance of Rya here - * because Sail is not available to us. 
- */ -public class AccumuloRyaITBase { - - // Managed the MiniAccumuloCluster - private static final MiniAccumuloClusterInstance cluster = new MiniAccumuloClusterInstance(); - - // Manage the Rya instances that are hosted on the cluster - protected static final AtomicInteger ryaInstanceNameCounter = new AtomicInteger(1); - private String ryaInstanceName; - - @BeforeClass - public static void initCluster() throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException { - cluster.startMiniAccumulo(); - } - - @Before - public void prepareForNextTest() throws AccumuloException, AccumuloSecurityException, TableNotFoundException, AlreadyInitializedException, RyaDetailsRepositoryException { - // Get the next Rya instance name. - ryaInstanceName = "testInstance" + ryaInstanceNameCounter.getAndIncrement() + "_"; - - // Create Rya Details for the instance name. - final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(cluster.getConnector(), ryaInstanceName); - - final RyaDetails details = RyaDetails.builder() - .setRyaInstanceName(ryaInstanceName) - .setRyaVersion("0.0.0.0") - .setFreeTextDetails( new FreeTextIndexDetails(true) ) - .setEntityCentricIndexDetails( new EntityCentricIndexDetails(true) ) - .setGeoIndexDetails( new GeoIndexDetails(true) ) - .setTemporalIndexDetails( new TemporalIndexDetails(true) ) - .setPCJIndexDetails( - PCJIndexDetails.builder() - .setEnabled(true) ) - .setJoinSelectivityDetails( new JoinSelectivityDetails( Optional.<Date>absent() ) ) - .setProspectorDetails( new ProspectorDetails( Optional.<Date>absent() )) - .build(); - - detailsRepo.initialize(details); - } - - @AfterClass - public static void tearDownCluster() throws IOException, InterruptedException { - cluster.stopMiniAccumulo(); - } - - /** - * @return The {@link MiniAccumuloClusterInstance} used by the tests. 
- */ - public MiniAccumuloClusterInstance getClusterInstance() { - return cluster; - } - - /** - * @return The name of the Rya instance that is being used for the current test. - */ - public String getRyaInstanceName() { - return ryaInstanceName; - } -} \ No newline at end of file