Repository: tinkerpop Updated Branches: refs/heads/master 108b236ba -> 4e5c30e20
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/7eec250a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/AbstractIoRegistryCheck.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/AbstractIoRegistryCheck.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/AbstractIoRegistryCheck.java new file mode 100644 index 0000000..a5f657d --- /dev/null +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/AbstractIoRegistryCheck.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.hadoop.structure.io; + +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.apache.tinkerpop.gremlin.AbstractGremlinTest; +import org.apache.tinkerpop.gremlin.TestHelper; +import org.apache.tinkerpop.gremlin.hadoop.Constants; +import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph; +import org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONInputFormat; +import org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONOutputFormat; +import org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONRecordWriter; +import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat; +import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat; +import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoRecordWriter; +import org.apache.tinkerpop.gremlin.structure.io.gryo.ToyIoRegistry; +import org.apache.tinkerpop.gremlin.structure.io.gryo.ToyPoint; +import org.apache.tinkerpop.gremlin.structure.io.gryo.ToyTriangle; +import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil; +import org.apache.tinkerpop.gremlin.process.computer.GraphComputer; +import org.apache.tinkerpop.gremlin.structure.T; +import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.apache.tinkerpop.gremlin.structure.io.IoRegistry; +import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool; +import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoVersion; +import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph; + +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public abstract class AbstractIoRegistryCheck extends AbstractGremlinTest { + + private static final int NUMBER_OF_VERTICES = 1000; + + public void checkGryoV1d0IoRegistryCompliance(final HadoopGraph graph, final Class<? extends GraphComputer> graphComputerClass) throws Exception { + final File input = TestHelper.generateTempFile(this.getClass(), "gryo-io-registry", ".kryo"); + graph.configuration().setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName()); + graph.configuration().setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, GryoOutputFormat.class.getCanonicalName()); + graph.configuration().setProperty(GryoPool.CONFIG_IO_GRYO_VERSION, GryoVersion.V1_0.name()); + graph.configuration().setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, input.getAbsolutePath()); + graph.configuration().setProperty(IoRegistry.IO_REGISTRY, ToyIoRegistry.class.getCanonicalName()); + final GryoRecordWriter writer = new GryoRecordWriter(new DataOutputStream(new FileOutputStream(input)), ConfUtil.makeHadoopConfiguration(graph.configuration())); + validateIoRegistryGraph(graph, graphComputerClass, writer); + assertTrue(input.delete()); + } + + public void checkGryoV3d0IoRegistryCompliance(final HadoopGraph graph, final Class<? extends GraphComputer> graphComputerClass) throws Exception { + final File input = TestHelper.generateTempFile(this.getClass(), "gryo-io-registry", ".kryo"); + graph.configuration().setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName()); + graph.configuration().setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, GryoOutputFormat.class.getCanonicalName()); + graph.configuration().setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, input.getAbsolutePath()); + graph.configuration().setProperty(GryoPool.CONFIG_IO_GRYO_VERSION, GryoVersion.V3_0.name()); + graph.configuration().setProperty(IoRegistry.IO_REGISTRY, ToyIoRegistry.class.getCanonicalName()); + final GryoRecordWriter writer = new GryoRecordWriter(new DataOutputStream(new FileOutputStream(input)), ConfUtil.makeHadoopConfiguration(graph.configuration())); + validateIoRegistryGraph(graph, graphComputerClass, writer); + assertTrue(input.delete()); + } + + public void checkGraphSONIoRegistryCompliance(final HadoopGraph graph, final Class<? extends GraphComputer> graphComputerClass) throws Exception { + final File input = TestHelper.generateTempFile(this.getClass(), "graphson-io-registry", ".json"); + graph.configuration().setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GraphSONInputFormat.class.getCanonicalName()); + graph.configuration().setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, GraphSONOutputFormat.class.getCanonicalName()); + graph.configuration().setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, input.getAbsolutePath()); + graph.configuration().setProperty(IoRegistry.IO_REGISTRY, ToyIoRegistry.class.getCanonicalName()); + final GraphSONRecordWriter writer = new GraphSONRecordWriter(new DataOutputStream(new FileOutputStream(input)), ConfUtil.makeHadoopConfiguration(graph.configuration())); + validateIoRegistryGraph(graph, graphComputerClass, writer); + assertTrue(input.delete()); + } + + private void validateIoRegistryGraph(final HadoopGraph graph, + final Class<? extends GraphComputer> graphComputerClass, + final RecordWriter<NullWritable, VertexWritable> writer) throws Exception { + + + for (int i = 0; i < NUMBER_OF_VERTICES; i++) { + final StarGraph starGraph = StarGraph.open(); + Vertex vertex = starGraph.addVertex(T.label, "place", T.id, i, "point", new ToyPoint(i, i * 10), "message", "I'm " + i, "triangle", new ToyTriangle(i, i * 10, i * 100)); + vertex.addEdge("connection", starGraph.addVertex(T.id, i > 0 ? i - 1 : NUMBER_OF_VERTICES - 1)); + writer.write(NullWritable.get(), new VertexWritable(starGraph.getStarVertex())); + } + writer.close(new TaskAttemptContextImpl(ConfUtil.makeHadoopConfiguration(graph.configuration()), new TaskAttemptID())); + + // OLAP TESTING // + validatePointTriangles(graph.traversal().withComputer(graphComputerClass).V().project("point", "triangle").by("point").by("triangle").toList()); + validatePointTriangles(graph.traversal().withComputer(graphComputerClass).V().out().project("point", "triangle").by("point").by("triangle").toList()); + validatePointTriangles(graph.traversal().withComputer(graphComputerClass).V().out().out().project("point", "triangle").by("point").by("triangle").toList()); + // OLTP TESTING // + validatePointTriangles(graph.traversal().V().project("point", "triangle").by("point").by("triangle").toList()); + // HDFS TESTING // + /*validatePointTriangles(IteratorUtils.<Map<String, Object>>asList(IteratorUtils.<Vertex, Map<String, Object>>map(FileSystemStorage.open(ConfUtil.makeHadoopConfiguration(graph.configuration())).head(graph.configuration().getInputLocation(), graph.configuration().getGraphReader()), + vertex -> { + return new HashMap<String, Object>() {{ + put("point", vertex.value("point")); + put("triangle", vertex.value("triangle")); + }}; + })));*/ + } + + private void validatePointTriangles(final List<Map<String, Object>> values) { + assertEquals(NUMBER_OF_VERTICES, values.size()); + for (int i = 0; i < NUMBER_OF_VERTICES; i++) { + assertTrue(values.stream().map(m -> m.<ToyPoint>get("point")).collect(Collectors.toList()).contains(new ToyPoint(i, i * 10))); + assertTrue(values.stream().map(m -> m.<ToyTriangle>get("triangle")).collect(Collectors.toList()).contains(new ToyTriangle(i, i * 10, i * 100))); + } + } +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/7eec250a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/AbstractStorageCheck.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/AbstractStorageCheck.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/AbstractStorageCheck.java new file mode 100644 index 0000000..7b679e2 --- /dev/null +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/AbstractStorageCheck.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.hadoop.structure.io; + +import org.apache.tinkerpop.gremlin.AbstractGremlinTest; +import org.apache.tinkerpop.gremlin.hadoop.Constants; +import org.apache.tinkerpop.gremlin.process.computer.ComputerResult; +import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.ClusterCountMapReduce; +import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.PeerPressureVertexProgram; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal; +import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.apache.tinkerpop.gremlin.structure.io.Storage; +import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; + +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public abstract class AbstractStorageCheck extends AbstractGremlinTest { + + public void checkHeadMethods(final Storage storage, final String inputLocation, final String outputLocation, final Class outputGraphParserClass, final Class outputMemoryParserClass) throws Exception { + // TEST INPUT GRAPH + assertFalse(storage.exists(outputLocation)); + if (inputLocation.endsWith(".json") && storage.exists(inputLocation)) { // gryo is not text readable + assertEquals(6, IteratorUtils.count(storage.head(inputLocation))); + for (int i = 0; i < 7; i++) { + assertEquals(i, IteratorUtils.count(storage.head(inputLocation, i))); + } + assertEquals(6, IteratorUtils.count(storage.head(inputLocation, 10))); + } + + //////////////////// + + final ComputerResult result = graphProvider.getGraphComputer(graph).program(PeerPressureVertexProgram.build().create(graph)).mapReduce(ClusterCountMapReduce.build().memoryKey("clusterCount").create()).submit().get(); + // TEST OUTPUT GRAPH + assertTrue(storage.exists(outputLocation)); + assertTrue(storage.exists(Constants.getGraphLocation(outputLocation))); + assertEquals(6, result.graph().traversal().V().count().next().longValue()); + assertEquals(0, result.graph().traversal().E().count().next().longValue()); + assertEquals(6, result.graph().traversal().V().values("name").count().next().longValue()); + assertEquals(6, result.graph().traversal().V().values(PeerPressureVertexProgram.CLUSTER).count().next().longValue()); + assertEquals(2, result.graph().traversal().V().values(PeerPressureVertexProgram.CLUSTER).dedup().count().next().longValue()); + assertEquals(6, IteratorUtils.count(storage.head(Constants.getGraphLocation(outputLocation), outputGraphParserClass))); + for (int i = 0; i < 7; i++) { + assertEquals(i, IteratorUtils.count(storage.head(Constants.getGraphLocation(outputLocation), outputGraphParserClass, i))); + } + assertEquals(6, IteratorUtils.count(storage.head(Constants.getGraphLocation(outputLocation), outputGraphParserClass, 346))); + ///// + // TEST MEMORY PERSISTENCE + assertEquals(2, (int) result.memory().get("clusterCount")); + assertTrue(storage.exists(Constants.getMemoryLocation(outputLocation, "clusterCount"))); + assertEquals(1, IteratorUtils.count(storage.head(outputLocation, "clusterCount", outputMemoryParserClass))); + assertEquals(2, storage.head(outputLocation, "clusterCount", outputMemoryParserClass).next().getValue()); + } + + public void checkRemoveAndListMethods(final Storage storage, final String outputLocation) throws Exception { + graphProvider.getGraphComputer(graph).program(PeerPressureVertexProgram.build().create(graph)).mapReduce(ClusterCountMapReduce.build().memoryKey("clusterCount").create()).submit().get(); + assertTrue(storage.exists(outputLocation)); + assertTrue(storage.exists(Constants.getGraphLocation(outputLocation))); + assertTrue(storage.exists(Constants.getMemoryLocation(outputLocation, "clusterCount"))); + assertEquals(2, storage.ls(outputLocation).size()); + assertTrue(storage.rm(Constants.getGraphLocation(outputLocation))); + assertFalse(storage.rm(Constants.getGraphLocation(outputLocation))); + assertEquals(1, storage.ls(outputLocation).size()); + assertTrue(storage.rm(Constants.getMemoryLocation(outputLocation, "clusterCount"))); + assertEquals(0, storage.ls(outputLocation).size()); + assertFalse(storage.exists(Constants.getGraphLocation(outputLocation))); + assertFalse(storage.exists(Constants.getMemoryLocation(outputLocation, "clusterCount"))); + if (storage.exists(outputLocation)) + assertTrue(storage.rm(outputLocation)); + assertFalse(storage.exists(outputLocation)); + + //////////////// + + graphProvider.getGraphComputer(graph).program(PeerPressureVertexProgram.build().create(graph)).mapReduce(ClusterCountMapReduce.build().memoryKey("clusterCount").create()).submit().get(); + assertTrue(storage.exists(outputLocation)); + assertTrue(storage.exists(Constants.getGraphLocation(outputLocation))); + assertTrue(storage.exists(Constants.getMemoryLocation(outputLocation, "clusterCount"))); + assertEquals(2, storage.ls(outputLocation).size()); + assertTrue(storage.rm(outputLocation)); + assertFalse(storage.exists(outputLocation)); + assertEquals(0, storage.ls(outputLocation).size()); + } + + public void checkCopyMethods(final Storage storage, final String outputLocation, final String newOutputLocation, final Class outputGraphParserClass, final Class outputMemoryParserClass) throws Exception { + graphProvider.getGraphComputer(graph).program(PeerPressureVertexProgram.build().create(graph)).mapReduce(ClusterCountMapReduce.build().memoryKey("clusterCount").create()).submit().get(); + assertTrue(storage.exists(outputLocation)); + assertTrue(storage.exists(Constants.getGraphLocation(outputLocation))); + assertTrue(storage.exists(Constants.getMemoryLocation(outputLocation, "clusterCount"))); + assertFalse(storage.exists(newOutputLocation)); + + assertTrue(storage.cp(outputLocation, newOutputLocation)); + assertTrue(storage.exists(outputLocation)); + assertTrue(storage.exists(Constants.getGraphLocation(outputLocation))); + assertTrue(storage.exists(Constants.getMemoryLocation(outputLocation, "clusterCount"))); + assertTrue(storage.exists(newOutputLocation)); + assertTrue(storage.exists(Constants.getGraphLocation(newOutputLocation))); + assertTrue(storage.exists(Constants.getMemoryLocation(newOutputLocation, "clusterCount"))); + + assertEquals(2, storage.ls(newOutputLocation).size()); + assertEquals(6, IteratorUtils.count(storage.head(outputLocation, outputGraphParserClass))); + assertEquals(6, IteratorUtils.count(storage.head(newOutputLocation, outputGraphParserClass))); + assertEquals(1, IteratorUtils.count(storage.head(outputLocation, "clusterCount", outputMemoryParserClass))); + assertEquals(1, IteratorUtils.count(storage.head(newOutputLocation, "clusterCount", outputMemoryParserClass))); + } + + public void checkResidualDataInStorage(final Storage storage, final String outputLocation) throws Exception { + final GraphTraversal<Vertex, Long> traversal = g.V().both("knows").groupCount("m").by("age").count(); + assertEquals(4l, traversal.next().longValue()); + assertFalse(storage.exists(outputLocation)); + assertFalse(storage.exists(Constants.getGraphLocation(outputLocation))); + /// + assertEquals(3, traversal.asAdmin().getSideEffects().<Map<Integer, Long>>get("m").size()); + assertEquals(1, traversal.asAdmin().getSideEffects().<Map<Integer, Long>>get("m").get(27).longValue()); + assertEquals(2, traversal.asAdmin().getSideEffects().<Map<Integer, Long>>get("m").get(29).longValue()); + assertEquals(1, traversal.asAdmin().getSideEffects().<Map<Integer, Long>>get("m").get(32).longValue()); + } + + public void checkFileDirectoryDistinction(final Storage storage, final String directory1, final String directory2) throws Exception { + assertTrue(storage.exists(directory1)); + assertTrue(storage.exists(directory2)); + assertTrue(storage.exists(directory1 + "/f*")); + assertTrue(storage.exists(directory2 + "/f*")); + assertEquals(10, storage.ls(directory1).size()); + assertEquals(10, storage.ls(directory1 + "/*").size()); + assertEquals(5, storage.ls(directory2).size()); + assertEquals(5, storage.ls(directory2 + "/*").size()); + for (int i = 0; i < 10; i++) { + assertTrue(storage.exists(directory1 + "/file1-" + i + ".txt.bz")); + assertTrue(storage.exists(directory1 + "/file1-" + i + "*")); + assertTrue(storage.exists(directory1 + "/file1-" + i + ".txt*")); + assertTrue(storage.exists(directory1 + "/file1-" + i + ".*.bz")); + assertTrue(storage.exists(directory1 + "/file1-" + i + ".*.b*")); + } + assertFalse(storage.exists(directory1 + "/file1-10.txt.bz")); + for (int i = 0; i < 5; i++) { + assertTrue(storage.exists(directory2 + "/file2-" + i + ".txt.bz")); + assertTrue(storage.exists(directory2 + "/file2-" + i + "*")); + assertTrue(storage.exists(directory2 + "/file2-" + i + ".txt*")); + assertTrue(storage.exists(directory2 + "/file2-" + i + ".*.bz")); + assertTrue(storage.exists(directory2 + "/file2-" + i + ".*.b*")); + } + assertFalse(storage.exists(directory2 + "/file1-5.txt.bz")); + assertTrue(storage.rm(directory1 + "/file1-0.txt.bz")); + assertFalse(storage.rm(directory1 + "/file1-0.txt.bz")); + assertEquals(9, storage.ls(directory1).size()); + assertEquals(9, storage.ls(directory1 + "/*").size()); + assertEquals(9, storage.ls(directory1 + "/file*").size()); + assertEquals(9, storage.ls(directory1 + "/file1*").size()); + assertEquals(0, storage.ls(directory1 + "/file2*").size()); + assertEquals(5, storage.ls(directory2).size()); + assertEquals(5, storage.ls(directory2 + "/*").size()); + assertEquals(5, storage.ls(directory2 + "/file*").size()); + assertEquals(5, storage.ls(directory2 + "/file2*").size()); + assertEquals(0, storage.ls(directory2 + "/file1*").size()); + assertTrue(storage.rm(directory1 + "/file1-*")); + assertFalse(storage.rm(directory1 + "/file1-*")); + assertEquals(0, storage.ls(directory1).size()); + assertEquals(0, storage.ls(directory1 + "/*").size()); + assertEquals(5, storage.ls(directory2).size()); + assertEquals(5, storage.ls(directory2 + "/*").size()); + assertTrue(storage.rm(directory2 + "/f*")); + assertFalse(storage.rm(directory2 + "/file*")); + assertEquals(0, storage.ls(directory2).size()); + assertEquals(0, storage.ls(directory2 + "*").size()); + } +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/7eec250a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorageCheck.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorageCheck.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorageCheck.java new file mode 100644 index 0000000..c2eeac3 --- /dev/null +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorageCheck.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.hadoop.structure.io; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.tinkerpop.gremlin.LoadGraphWith; +import org.apache.tinkerpop.gremlin.TestHelper; +import org.apache.tinkerpop.gremlin.hadoop.Constants; +import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil; +import org.apache.tinkerpop.gremlin.structure.io.Storage; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public class FileSystemStorageCheck extends AbstractStorageCheck { + + @Test + @LoadGraphWith(LoadGraphWith.GraphData.MODERN) + public void shouldSupportHeadMethods() throws Exception { + final Storage storage = FileSystemStorage.open(ConfUtil.makeHadoopConfiguration(graph.configuration())); + final String inputLocation = Constants.getSearchGraphLocation(graph.configuration().getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION), storage).get(); + final String outputLocation = graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION); + // TestHelper creates the directory and we need it not to exist + deleteDirectory(outputLocation); + super.checkHeadMethods(storage, inputLocation, outputLocation, InputOutputHelper.getInputFormat((Class) Class.forName(graph.configuration().getString(Constants.GREMLIN_HADOOP_GRAPH_WRITER))), SequenceFileInputFormat.class); + } + + @Test + @LoadGraphWith(LoadGraphWith.GraphData.MODERN) + public void shouldSupportRemoveAndListMethods() throws Exception { + final Storage storage = FileSystemStorage.open(ConfUtil.makeHadoopConfiguration(graph.configuration())); + final String outputLocation = graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION); + super.checkRemoveAndListMethods(storage, outputLocation); + } + + @Test + @LoadGraphWith(LoadGraphWith.GraphData.MODERN) + public void shouldSupportCopyMethods() throws Exception { + final Storage storage = FileSystemStorage.open(ConfUtil.makeHadoopConfiguration(graph.configuration())); + final String outputLocation = graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION); + final String newOutputLocation = TestHelper.makeTestDataDirectory(FileSystemStorageCheck.class, "new-location-for-copy"); + // TestHelper creates the directory and we need it not to exist + deleteDirectory(newOutputLocation); + super.checkCopyMethods(storage, outputLocation, newOutputLocation, InputOutputHelper.getInputFormat((Class) Class.forName(graph.configuration().getString(Constants.GREMLIN_HADOOP_GRAPH_WRITER))), SequenceFileInputFormat.class); + + } + + @Test + @LoadGraphWith(LoadGraphWith.GraphData.MODERN) + public void shouldNotHaveResidualDataInStorage() throws Exception { + final Storage storage = FileSystemStorage.open(ConfUtil.makeHadoopConfiguration(graph.configuration())); + final String outputLocation = graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION); + super.checkResidualDataInStorage(storage, outputLocation); + } + + @Test + public void shouldSupportDirectoryFileDistinction() throws Exception { + final Storage storage = FileSystemStorage.open(ConfUtil.makeHadoopConfiguration(graph.configuration())); + final String directory1 = TestHelper.makeTestDataDirectory(FileSystemStorageCheck.class, "directory1"); + final String directory2 = TestHelper.makeTestDataDirectory(FileSystemStorageCheck.class, "directory2"); + for (int i = 0; i < 10; i++) { + new File(directory1 + "/" + "file1-" + i + ".txt.bz").createNewFile(); + } + for (int i = 0; i < 5; i++) { + new File(directory2 + "/" + "file2-" + i + ".txt.bz").createNewFile(); + } + super.checkFileDirectoryDistinction(storage, directory1, directory2); + deleteDirectory(directory1); + deleteDirectory(directory2); + } + + private static void deleteDirectory(final String location) throws IOException { + // TestHelper creates the directory and we need it not to exist + assertTrue(new File(location).isDirectory()); + assertTrue(new File(location).exists()); + FileUtils.deleteDirectory(new File(location)); + assertFalse(new File(location).exists()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/7eec250a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java index 051d5f0..c778c6d 100644 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java @@ -20,14 +20,24 @@ package org.apache.tinkerpop.gremlin.spark.process.computer; import org.apache.spark.launcher.SparkLauncher; import org.apache.spark.serializer.KryoSerializer; +import org.apache.tinkerpop.gremlin.AbstractFileGraphProvider; import org.apache.tinkerpop.gremlin.GraphProvider; import org.apache.tinkerpop.gremlin.LoadGraphWith; import org.apache.tinkerpop.gremlin.hadoop.Constants; -import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider; +import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopEdge; +import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopElement; +import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph; +import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopProperty; +import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopVertex; +import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopVertexProperty; import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorageCheck; import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools; +import org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONInputFormat; +import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat; +import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat; import org.apache.tinkerpop.gremlin.process.computer.Computer; import org.apache.tinkerpop.gremlin.process.computer.GraphComputer; +import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.process.traversal.step.map.PageRankTest; import org.apache.tinkerpop.gremlin.process.traversal.step.map.PeerPressureTest; @@ -40,18 +50,50 @@ import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoRegistrator; import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; /** * @author Marko A. Rodriguez (http://markorodriguez.com) + * @author Stephen Mallette (http://stephen.genoprime.com) */ @GraphProvider.Descriptor(computer = SparkGraphComputer.class) -public class SparkHadoopGraphProvider extends HadoopGraphProvider { +public class SparkHadoopGraphProvider extends AbstractFileGraphProvider { - protected static final String PREVIOUS_SPARK_PROVIDER = "previous.spark.provider"; + static final String PREVIOUS_SPARK_PROVIDER = "previous.spark.provider"; + + public static final Set<Class> IMPLEMENTATION = Collections.unmodifiableSet(new HashSet<Class>() {{ + add(HadoopEdge.class); + add(HadoopElement.class); + add(HadoopGraph.class); + add(HadoopProperty.class); + add(HadoopVertex.class); + add(HadoopVertexProperty.class); + add(ComputerGraph.class); + add(ComputerGraph.ComputerElement.class); + add(ComputerGraph.ComputerVertex.class); + add(ComputerGraph.ComputerEdge.class); + add(ComputerGraph.ComputerVertexProperty.class); + add(ComputerGraph.ComputerAdjacentVertex.class); + add(ComputerGraph.ComputerProperty.class); + }}); + + @Override + public void loadGraphData(final Graph graph, final LoadGraphWith loadGraphWith, final Class testClass, final String testName) { + if (loadGraphWith != null) ((HadoopGraph) graph).configuration().setInputLocation(getInputLocation(graph, loadGraphWith.value())); + } + + @Override + public Set<Class> getImplementations() { + return IMPLEMENTATION; + } @Override public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?> test, final String testMethodName, final LoadGraphWith.GraphData loadGraphWith) { + this.graphSONInput = RANDOM.nextBoolean(); if (this.getClass().equals(SparkHadoopGraphProvider.class) && !SparkHadoopGraphProvider.class.getCanonicalName().equals(System.getProperty(PREVIOUS_SPARK_PROVIDER, null))) { Spark.close(); HadoopPools.close(); @@ -59,8 +101,15 @@ public class SparkHadoopGraphProvider extends HadoopGraphProvider { System.setProperty(PREVIOUS_SPARK_PROVIDER, SparkHadoopGraphProvider.class.getCanonicalName()); } - final Map<String, Object> config = super.getBaseConfiguration(graphName, test, testMethodName, loadGraphWith); - config.put(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true); // this makes the test suite go really fast + final Map<String,Object> config = new HashMap<String, Object>() {{ + put(Graph.GRAPH, HadoopGraph.class.getName()); + put(Constants.GREMLIN_HADOOP_GRAPH_READER, graphSONInput ? GraphSONInputFormat.class.getCanonicalName() : GryoInputFormat.class.getCanonicalName()); + put(Constants.GREMLIN_HADOOP_GRAPH_WRITER, GryoOutputFormat.class.getCanonicalName()); + put(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, getWorkingDirectory()); + put(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false); + + put(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true); // this makes the test suite go really fast + }}; // toy graph inputRDD does not have corresponding outputRDD so where jobs chain, it fails (failing makes sense) if (null != loadGraphWith && http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/7eec250a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/util/SugarTestHelper.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/util/SugarTestHelper.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/util/SugarTestHelper.java index 334c67f..304f05f 100644 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/util/SugarTestHelper.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/util/SugarTestHelper.java @@ -19,11 +19,11 @@ package org.apache.tinkerpop.gremlin.spark.util; import org.apache.tinkerpop.gremlin.groovy.util.MetaRegistryUtil; -import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.DefaultGraphTraversal; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__; import org.apache.tinkerpop.gremlin.process.traversal.traverser.*; +import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider; import java.util.HashSet; import java.util.Set; @@ -49,7 +49,7 @@ public final class SugarTestHelper { */ public static void clearRegistry() { final Set<Class> implementationsToClear = new HashSet<>(CORE_IMPLEMENTATIONS); - implementationsToClear.addAll(HadoopGraphProvider.IMPLEMENTATION); + implementationsToClear.addAll(SparkHadoopGraphProvider.IMPLEMENTATION); MetaRegistryUtil.clearRegistry(implementationsToClear); }