Decoupled GryoRecordReader/Writer from KryoShimService. The shim service should ONLY be used for inter-process communication -- not for input/output formats. cc/ @dalaro
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/8367ab36 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/8367ab36 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/8367ab36 Branch: refs/heads/TINKERPOP-1389 Commit: 8367ab36e00a463a27268aafd708ffacec1e606d Parents: 19e0f2f Author: Marko A. Rodriguez <okramma...@gmail.com> Authored: Tue Oct 25 19:44:45 2016 -0600 Committer: Marko A. Rodriguez <okramma...@gmail.com> Committed: Wed Oct 26 08:16:26 2016 -0600 ---------------------------------------------------------------------- .../process/computer/GiraphGraphComputer.java | 3 -- .../io/gryo/kryoshim/KryoShimService.java | 4 -- .../io/gryo/kryoshim/KryoShimServiceLoader.java | 17 +++---- .../structure/io/HadoopPoolShimService.java | 47 ++++++-------------- .../hadoop/structure/io/HadoopPools.java | 15 ++++++- .../structure/io/HadoopPoolsConfigurable.java | 2 +- .../structure/io/gryo/GryoRecordReader.java | 3 +- .../structure/io/gryo/GryoRecordWriter.java | 4 +- .../process/computer/SparkGraphComputer.java | 28 +++++------- .../SparkHadoopGraphGryoSerializerProvider.java | 4 +- .../computer/SparkHadoopGraphProvider.java | 6 +-- 11 files changed, 53 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8367ab36/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java ---------------------------------------------------------------------- diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java index e21cd3b..1be548a 100644 --- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java +++ 
b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java @@ -96,9 +96,6 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple this.giraphConfiguration.setBoolean(GiraphConstants.STATIC_GRAPH.getKey(), true); this.giraphConfiguration.setVertexInputFormatClass(GiraphVertexInputFormat.class); this.giraphConfiguration.setVertexOutputFormatClass(GiraphVertexOutputFormat.class); - if (null == this.giraphConfiguration.get(KryoShimServiceLoader.KRYO_SHIM_SERVICE, null)) - this.giraphConfiguration.set(KryoShimServiceLoader.KRYO_SHIM_SERVICE, HadoopPoolShimService.class.getCanonicalName()); - System.setProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, this.giraphConfiguration.get(KryoShimServiceLoader.KRYO_SHIM_SERVICE)); this.useWorkerThreadsInConfiguration = this.giraphConfiguration.getInt(GiraphConstants.MAX_WORKERS, -666) != -666 || this.giraphConfiguration.getInt(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), -666) != -666; } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8367ab36/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java index b8880a4..4d3ece5 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java @@ -85,10 +85,6 @@ public interface KryoShimService { /** * Attempt to incorporate the supplied configuration in future read/write calls. 
- * <p> - * This method is a wart that exists essentially just to support the old - * {@link HadoopPools#initialize(Configuration)} use-case. - * <p> * This method is not guaranteed to have any effect on an instance of this interface * after {@link #writeClassAndObject(Object, OutputStream)} or {@link #readClassAndObject(InputStream)} * has been invoked on that particular instance. http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8367ab36/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java index c026130..f9e4c2e 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java @@ -35,8 +35,7 @@ import java.util.ServiceLoader; public class KryoShimServiceLoader { private static volatile KryoShimService cachedShimService; - - private static volatile Configuration conf; + private static volatile Configuration configuration; private static final Logger log = LoggerFactory.getLogger(KryoShimServiceLoader.class); @@ -47,8 +46,8 @@ public class KryoShimServiceLoader { */ public static final String KRYO_SHIM_SERVICE = "gremlin.io.kryoShimService"; - public static void applyConfiguration(final Configuration conf) { - KryoShimServiceLoader.conf = conf; + public static void applyConfiguration(final Configuration configuration) { + KryoShimServiceLoader.configuration = configuration; load(true); } @@ -84,7 +83,9 @@ public class KryoShimServiceLoader { } } - String shimClass = System.getProperty(KRYO_SHIM_SERVICE); + String shimClass = null != 
configuration && configuration.containsKey(KRYO_SHIM_SERVICE) ? + configuration.getString(KRYO_SHIM_SERVICE) : + System.getProperty(KRYO_SHIM_SERVICE); if (null != shimClass) { for (KryoShimService kss : services) { @@ -115,7 +116,7 @@ public class KryoShimServiceLoader { throw new IllegalStateException("Unable to load KryoShimService"); } - final Configuration userConf = conf; + final Configuration userConf = configuration; if (null != userConf) { log.info("Configuring {} provider {} with user-provided configuration", @@ -136,7 +137,7 @@ public class KryoShimServiceLoader { } /** - * A loose abstraction of {@link org.apache.tinkerpop.shaded.kryo.Kryo#writeClassAndObject(Output, Object)}, + * A loose abstraction of {@link org.apache.tinkerpop.shaded.kryo.Kryo#writeClassAndObject}, * where the {@code output} parameter is an internally-created {@link ByteArrayOutputStream}. Returns * the byte array underlying that stream. * @@ -154,7 +155,7 @@ public class KryoShimServiceLoader { } /** - * A loose abstraction of {@link org.apache.tinkerpop.shaded.kryo.Kryo#readClassAndObject(Input)}, + * A loose abstraction of {@link org.apache.tinkerpop.shaded.kryo.Kryo#readClassAndObject}, * where the {@code input} parameter is {@code source}. Returns the deserialized object. 
* * @param source an input stream containing data for a serialized object class and instance http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8367ab36/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java index 3fad4fd..a52eac4 100644 --- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java @@ -24,52 +24,33 @@ import org.apache.tinkerpop.shaded.kryo.Kryo; import org.apache.tinkerpop.shaded.kryo.io.Input; import org.apache.tinkerpop.shaded.kryo.io.Output; +import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; public class HadoopPoolShimService implements KryoShimService { - public Object readClassAndObject(final InputStream source) { - - Kryo k = null; - - try { - k = HadoopPools.getGryoPool().takeKryo(); - - return k.readClassAndObject(new Input(source)); - } finally { - if (null != k) { - HadoopPools.getGryoPool().offerKryo(k); - } - } + @Override + public Object readClassAndObject(final InputStream inputStream) { + return HadoopPools.getGryoPool().readWithKryo(kryo -> kryo.readClassAndObject(new Input(inputStream))); } - public void writeClassAndObject(final Object o, final OutputStream sink) { - - Kryo k = null; - - try { - k = HadoopPools.getGryoPool().takeKryo(); - - final Output output = new Output(sink); - - k.writeClassAndObject(output, o); - + @Override + public void writeClassAndObject(final Object object, final OutputStream outputStream) { + HadoopPools.getGryoPool().writeWithKryo(kryo -> { + final Output output = 
new Output(outputStream); + kryo.writeClassAndObject(output, object); output.flush(); - } finally { - if (null != k) { - HadoopPools.getGryoPool().offerKryo(k); - } - } + }); } @Override - public int getPriority() { - return 0; + public void applyConfiguration(final Configuration configuration) { + HadoopPools.initialize(configuration); } @Override - public void applyConfiguration(final Configuration conf) { - HadoopPools.initialize(conf); + public int getPriority() { + return 0; } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8367ab36/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java index 392e97d..25bc8b4 100644 --- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java @@ -31,6 +31,8 @@ import java.util.Collections; */ public final class HadoopPools { + private static final Configuration EMPTY_CONFIGURATION = new BaseConfiguration(); + private HadoopPools() { } @@ -59,8 +61,17 @@ public final class HadoopPools { public static GryoPool getGryoPool() { if (!INITIALIZED) { - HadoopGraph.LOGGER.warn("The " + HadoopPools.class.getSimpleName() + " has not been initialized, using the default pool"); // TODO: this is necessary because we can't get the pool intialized in the Merger code of the Hadoop process. 
- initialize(new BaseConfiguration()); + /*if (null != System.getProperty("configuration", null)) { + try { + HadoopGraph.LOGGER.warn("The " + HadoopPools.class.getSimpleName() + " has not been initialized, using the System properties configuration"); + initialize((Configuration) Serializer.deserializeObject(System.getProperty("configuration").getBytes())); + } catch (final Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } + } else {*/ + HadoopGraph.LOGGER.warn("The " + HadoopPools.class.getSimpleName() + " has not been initialized, using the default pool"); + initialize(EMPTY_CONFIGURATION); + //} } return GRYO_POOL; } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8367ab36/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolsConfigurable.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolsConfigurable.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolsConfigurable.java index 0e5f135..f05aae9 100644 --- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolsConfigurable.java +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolsConfigurable.java @@ -30,7 +30,7 @@ public interface HadoopPoolsConfigurable extends Configurable { @Override public default void setConf(final Configuration configuration) { - KryoShimServiceLoader.applyConfiguration(ConfUtil.makeApacheConfiguration(configuration)); + HadoopPools.initialize(configuration); } @Override http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8367ab36/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java ---------------------------------------------------------------------- diff --git 
a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java index a1daddf..d7ed46b 100644 --- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java @@ -37,7 +37,6 @@ import org.apache.tinkerpop.gremlin.structure.Vertex; import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper; import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoReader; import org.apache.tinkerpop.gremlin.structure.io.gryo.VertexTerminator; -import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -72,7 +71,7 @@ public final class GryoRecordReader extends RecordReader<NullWritable, VertexWri final Configuration configuration = context.getConfiguration(); if (configuration.get(Constants.GREMLIN_HADOOP_GRAPH_FILTER, null) != null) this.graphFilter = VertexProgramHelper.deserialize(ConfUtil.makeApacheConfiguration(configuration), Constants.GREMLIN_HADOOP_GRAPH_FILTER); - KryoShimServiceLoader.applyConfiguration(ConfUtil.makeApacheConfiguration(configuration)); + HadoopPools.initialize(configuration); this.gryoReader = HadoopPools.getGryoPool().takeReader(); long start = split.getStart(); final Path file = split.getPath(); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8367ab36/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordWriter.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordWriter.java 
b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordWriter.java index 2ea3394..67a8339 100644 --- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordWriter.java +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordWriter.java @@ -25,10 +25,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.tinkerpop.gremlin.hadoop.Constants; import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools; import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable; -import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil; import org.apache.tinkerpop.gremlin.structure.Direction; import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoWriter; -import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader; import java.io.DataOutputStream; import java.io.IOException; @@ -45,7 +43,7 @@ public final class GryoRecordWriter extends RecordWriter<NullWritable, VertexWri public GryoRecordWriter(final DataOutputStream outputStream, final Configuration configuration) { this.outputStream = outputStream; this.hasEdges = configuration.getBoolean(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES, true); - KryoShimServiceLoader.applyConfiguration(ConfUtil.makeApacheConfiguration(configuration)); + HadoopPools.initialize(configuration); this.gryoWriter = HadoopPools.getGryoPool().takeWriter(); } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8367ab36/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java index c7d0cfb..d345100 100644 --- 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java @@ -110,20 +110,16 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer { super(hadoopGraph); this.sparkConfiguration = new HadoopConfiguration(); ConfigurationUtils.copy(this.hadoopGraph.configuration(), this.sparkConfiguration); - if (KryoSerializer.class.getCanonicalName().equals(this.sparkConfiguration.getString(Constants.SPARK_SERIALIZER, null)) && - GryoRegistrator.class.getCanonicalName().equals(this.sparkConfiguration.getString(Constants.SPARK_KRYO_REGISTRATOR, null))) { - System.setProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, UnshadedKryoShimService.class.getCanonicalName()); - } else if (GryoSerializer.class.getCanonicalName().equals(this.sparkConfiguration.getString(Constants.SPARK_SERIALIZER, null)) && - !this.sparkConfiguration.containsKey(Constants.SPARK_KRYO_REGISTRATOR)) { - System.setProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, HadoopPoolShimService.class.getCanonicalName()); - } - if (null != System.getProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, null)) { - final String shimService = System.getProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE); - this.sparkConfiguration.setProperty(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS, - (this.sparkConfiguration.getString(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS, "") + " -D" + KryoShimServiceLoader.KRYO_SHIM_SERVICE + "=" + shimService).trim()); - this.sparkConfiguration.setProperty(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, - (this.sparkConfiguration.getString(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, "") + " -D" + KryoShimServiceLoader.KRYO_SHIM_SERVICE + "=" + shimService).trim()); - } + final String shimService = KryoSerializer.class.getCanonicalName().equals(this.sparkConfiguration.getString(Constants.SPARK_SERIALIZER, null)) && + 
GryoRegistrator.class.getCanonicalName().equals(this.sparkConfiguration.getString(Constants.SPARK_KRYO_REGISTRATOR, null)) ? + UnshadedKryoShimService.class.getCanonicalName() : + HadoopPoolShimService.class.getCanonicalName(); + this.sparkConfiguration.setProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, shimService); + this.sparkConfiguration.setProperty(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS, + (this.sparkConfiguration.getString(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS, "") + " -D" + KryoShimServiceLoader.KRYO_SHIM_SERVICE + "=" + shimService).trim()); + this.sparkConfiguration.setProperty(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, + (this.sparkConfiguration.getString(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, "") + " -D" + KryoShimServiceLoader.KRYO_SHIM_SERVICE + "=" + shimService).trim()); + KryoShimServiceLoader.applyConfiguration(this.sparkConfiguration); } @Override @@ -154,8 +150,8 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer { final long startTime = System.currentTimeMillis(); // apache and hadoop configurations that are used throughout the graph computer computation final org.apache.commons.configuration.Configuration graphComputerConfiguration = new HadoopConfiguration(this.sparkConfiguration); - if (!graphComputerConfiguration.containsKey(Constants.SPARK_SERIALIZER)) - graphComputerConfiguration.setProperty(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName()); + // TODO !! 
if (!graphComputerConfiguration.containsKey(Constants.SPARK_SERIALIZER)) + // graphComputerConfiguration.setProperty(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName()); graphComputerConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES)); final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(graphComputerConfiguration); final Storage fileSystemStorage = FileSystemStorage.open(hadoopConfiguration); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8367ab36/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java index 9820b7b..19b9121 100644 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java @@ -34,9 +34,7 @@ import java.util.Map; public final class SparkHadoopGraphGryoSerializerProvider extends SparkHadoopGraphProvider { public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?> test, final String testMethodName, final LoadGraphWith.GraphData loadGraphWith) { - if (this.getClass().equals(SparkHadoopGraphGryoSerializerProvider.class) && - !HadoopPoolShimService.class.getCanonicalName().equals(System.getProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, null))) - Spark.close(); + Spark.close(); final Map<String, Object> config = super.getBaseConfiguration(graphName, test, testMethodName, loadGraphWith); 
config.put(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName()); config.remove(Constants.SPARK_KRYO_REGISTRATOR); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8367ab36/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java index c5b5083..878fd1e 100644 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java @@ -42,9 +42,7 @@ import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD; import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorageCheck; import org.apache.tinkerpop.gremlin.spark.structure.io.ToyGraphInputRDD; import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoRegistrator; -import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoShimService; import org.apache.tinkerpop.gremlin.structure.Graph; -import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader; import java.util.Map; @@ -56,9 +54,7 @@ public class SparkHadoopGraphProvider extends HadoopGraphProvider { @Override public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?> test, final String testMethodName, final LoadGraphWith.GraphData loadGraphWith) { - if (this.getClass().equals(SparkHadoopGraphProvider.class) && - !UnshadedKryoShimService.class.getCanonicalName().equals(System.getProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, null))) - Spark.close(); + Spark.close(); final Map<String, Object> config = 
super.getBaseConfiguration(graphName, test, testMethodName, loadGraphWith); config.put(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true); // this makes the test suite go really fast