Decoupled GryoRecordReader and GryoRecordWriter from KryoShimService. The shim service 
should ONLY be used for inter-process communication -- not for input/output formats. cc/ @dalaro


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/8367ab36
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/8367ab36
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/8367ab36

Branch: refs/heads/TINKERPOP-1389
Commit: 8367ab36e00a463a27268aafd708ffacec1e606d
Parents: 19e0f2f
Author: Marko A. Rodriguez <okramma...@gmail.com>
Authored: Tue Oct 25 19:44:45 2016 -0600
Committer: Marko A. Rodriguez <okramma...@gmail.com>
Committed: Wed Oct 26 08:16:26 2016 -0600

----------------------------------------------------------------------
 .../process/computer/GiraphGraphComputer.java   |  3 --
 .../io/gryo/kryoshim/KryoShimService.java       |  4 --
 .../io/gryo/kryoshim/KryoShimServiceLoader.java | 17 +++----
 .../structure/io/HadoopPoolShimService.java     | 47 ++++++--------------
 .../hadoop/structure/io/HadoopPools.java        | 15 ++++++-
 .../structure/io/HadoopPoolsConfigurable.java   |  2 +-
 .../structure/io/gryo/GryoRecordReader.java     |  3 +-
 .../structure/io/gryo/GryoRecordWriter.java     |  4 +-
 .../process/computer/SparkGraphComputer.java    | 28 +++++-------
 .../SparkHadoopGraphGryoSerializerProvider.java |  4 +-
 .../computer/SparkHadoopGraphProvider.java      |  6 +--
 11 files changed, 53 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8367ab36/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git 
a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
 
b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index e21cd3b..1be548a 100644
--- 
a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ 
b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -96,9 +96,6 @@ public final class GiraphGraphComputer extends 
AbstractHadoopGraphComputer imple
         
this.giraphConfiguration.setBoolean(GiraphConstants.STATIC_GRAPH.getKey(), 
true);
         
this.giraphConfiguration.setVertexInputFormatClass(GiraphVertexInputFormat.class);
         
this.giraphConfiguration.setVertexOutputFormatClass(GiraphVertexOutputFormat.class);
-        if (null == 
this.giraphConfiguration.get(KryoShimServiceLoader.KRYO_SHIM_SERVICE, null))
-            
this.giraphConfiguration.set(KryoShimServiceLoader.KRYO_SHIM_SERVICE, 
HadoopPoolShimService.class.getCanonicalName());
-        System.setProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, 
this.giraphConfiguration.get(KryoShimServiceLoader.KRYO_SHIM_SERVICE));
         this.useWorkerThreadsInConfiguration = 
this.giraphConfiguration.getInt(GiraphConstants.MAX_WORKERS, -666) != -666 || 
this.giraphConfiguration.getInt(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), 
-666) != -666;
     }
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8367ab36/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java
index b8880a4..4d3ece5 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java
@@ -85,10 +85,6 @@ public interface KryoShimService {
 
     /**
      * Attempt to incorporate the supplied configuration in future read/write 
calls.
-     * <p>
-     * This method is a wart that exists essentially just to support the old
-     * {@link HadoopPools#initialize(Configuration)} use-case.
-     * <p>
      * This method is not guaranteed to have any effect on an instance of this 
interface
      * after {@link #writeClassAndObject(Object, OutputStream)} or {@link 
#readClassAndObject(InputStream)}
      * has been invoked on that particular instance.

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8367ab36/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
index c026130..f9e4c2e 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
@@ -35,8 +35,7 @@ import java.util.ServiceLoader;
 public class KryoShimServiceLoader {
 
     private static volatile KryoShimService cachedShimService;
-
-    private static volatile Configuration conf;
+    private static volatile Configuration configuration;
 
     private static final Logger log = 
LoggerFactory.getLogger(KryoShimServiceLoader.class);
 
@@ -47,8 +46,8 @@ public class KryoShimServiceLoader {
      */
     public static final String KRYO_SHIM_SERVICE = 
"gremlin.io.kryoShimService";
 
-    public static void applyConfiguration(final Configuration conf) {
-        KryoShimServiceLoader.conf = conf;
+    public static void applyConfiguration(final Configuration configuration) {
+        KryoShimServiceLoader.configuration = configuration;
         load(true);
     }
 
@@ -84,7 +83,9 @@ public class KryoShimServiceLoader {
             }
         }
 
-        String shimClass = System.getProperty(KRYO_SHIM_SERVICE);
+        String shimClass = null != configuration && 
configuration.containsKey(KRYO_SHIM_SERVICE) ?
+                configuration.getString(KRYO_SHIM_SERVICE) :
+                System.getProperty(KRYO_SHIM_SERVICE);
 
         if (null != shimClass) {
             for (KryoShimService kss : services) {
@@ -115,7 +116,7 @@ public class KryoShimServiceLoader {
             throw new IllegalStateException("Unable to load KryoShimService");
         }
 
-        final Configuration userConf = conf;
+        final Configuration userConf = configuration;
 
         if (null != userConf) {
             log.info("Configuring {} provider {} with user-provided 
configuration",
@@ -136,7 +137,7 @@ public class KryoShimServiceLoader {
     }
 
     /**
-     * A loose abstraction of {@link 
org.apache.tinkerpop.shaded.kryo.Kryo#writeClassAndObject(Output, Object)},
+     * A loose abstraction of {@link 
org.apache.tinkerpop.shaded.kryo.Kryo#writeClassAndObject},
      * where the {@code output} parameter is an internally-created {@link 
ByteArrayOutputStream}.  Returns
      * the byte array underlying that stream.
      *
@@ -154,7 +155,7 @@ public class KryoShimServiceLoader {
     }
 
     /**
-     * A loose abstraction of {@link 
org.apache.tinkerpop.shaded.kryo.Kryo#readClassAndObject(Input)},
+     * A loose abstraction of {@link 
org.apache.tinkerpop.shaded.kryo.Kryo#readClassAndObject},
      * where the {@code input} parameter is {@code source}.  Returns the 
deserialized object.
      *
      * @param source an input stream containing data for a serialized object 
class and instance

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8367ab36/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
 
b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
index 3fad4fd..a52eac4 100644
--- 
a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
+++ 
b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
@@ -24,52 +24,33 @@ import org.apache.tinkerpop.shaded.kryo.Kryo;
 import org.apache.tinkerpop.shaded.kryo.io.Input;
 import org.apache.tinkerpop.shaded.kryo.io.Output;
 
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 
 public class HadoopPoolShimService implements KryoShimService {
 
-    public Object readClassAndObject(final InputStream source) {
-
-        Kryo k = null;
-
-        try {
-            k = HadoopPools.getGryoPool().takeKryo();
-
-            return k.readClassAndObject(new Input(source));
-        } finally {
-            if (null != k) {
-                HadoopPools.getGryoPool().offerKryo(k);
-            }
-        }
+    @Override
+    public Object readClassAndObject(final InputStream inputStream) {
+        return HadoopPools.getGryoPool().readWithKryo(kryo -> 
kryo.readClassAndObject(new Input(inputStream)));
     }
 
-    public void writeClassAndObject(final Object o, final OutputStream sink) {
-
-        Kryo k = null;
-
-        try {
-            k = HadoopPools.getGryoPool().takeKryo();
-
-            final Output output = new Output(sink);
-
-            k.writeClassAndObject(output, o);
-
+    @Override
+    public void writeClassAndObject(final Object object, final OutputStream 
outputStream) {
+        HadoopPools.getGryoPool().writeWithKryo(kryo -> {
+            final Output output = new Output(outputStream);
+            kryo.writeClassAndObject(output, object);
             output.flush();
-        } finally {
-            if (null != k) {
-                HadoopPools.getGryoPool().offerKryo(k);
-            }
-        }
+        });
     }
 
     @Override
-    public int getPriority() {
-        return 0;
+    public void applyConfiguration(final Configuration configuration) {
+        HadoopPools.initialize(configuration);
     }
 
     @Override
-    public void applyConfiguration(final Configuration conf) {
-        HadoopPools.initialize(conf);
+    public int getPriority() {
+        return 0;
     }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8367ab36/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
----------------------------------------------------------------------
diff --git 
a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
 
b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
index 392e97d..25bc8b4 100644
--- 
a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
+++ 
b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
@@ -31,6 +31,8 @@ import java.util.Collections;
  */
 public final class HadoopPools {
 
+    private static final Configuration EMPTY_CONFIGURATION = new 
BaseConfiguration();
+
     private HadoopPools() {
     }
 
@@ -59,8 +61,17 @@ public final class HadoopPools {
 
     public static GryoPool getGryoPool() {
         if (!INITIALIZED) {
-            HadoopGraph.LOGGER.warn("The " + HadoopPools.class.getSimpleName() 
+ " has not been initialized, using the default pool");     // TODO: this is 
necessary because we can't get the pool intialized in the Merger code of the 
Hadoop process.
-            initialize(new BaseConfiguration());
+            /*if (null != System.getProperty("configuration", null)) {
+                try {
+                    HadoopGraph.LOGGER.warn("The " + 
HadoopPools.class.getSimpleName() + " has not been initialized, using the 
System properties configuration");
+                    initialize((Configuration) 
Serializer.deserializeObject(System.getProperty("configuration").getBytes()));
+                } catch (final Exception e) {
+                    throw new IllegalStateException(e.getMessage(), e);
+                }
+            } else {*/
+            HadoopGraph.LOGGER.warn("The " + HadoopPools.class.getSimpleName() 
+ " has not been initialized, using the default pool");
+            initialize(EMPTY_CONFIGURATION);
+            //}
         }
         return GRYO_POOL;
     }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8367ab36/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolsConfigurable.java
----------------------------------------------------------------------
diff --git 
a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolsConfigurable.java
 
b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolsConfigurable.java
index 0e5f135..f05aae9 100644
--- 
a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolsConfigurable.java
+++ 
b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolsConfigurable.java
@@ -30,7 +30,7 @@ public interface HadoopPoolsConfigurable extends Configurable 
{
 
     @Override
     public default void setConf(final Configuration configuration) {
-        
KryoShimServiceLoader.applyConfiguration(ConfUtil.makeApacheConfiguration(configuration));
+        HadoopPools.initialize(configuration);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8367ab36/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java
----------------------------------------------------------------------
diff --git 
a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java
 
b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java
index a1daddf..d7ed46b 100644
--- 
a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java
+++ 
b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java
@@ -37,7 +37,6 @@ import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoReader;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.VertexTerminator;
-import 
org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -72,7 +71,7 @@ public final class GryoRecordReader extends 
RecordReader<NullWritable, VertexWri
         final Configuration configuration = context.getConfiguration();
         if (configuration.get(Constants.GREMLIN_HADOOP_GRAPH_FILTER, null) != 
null)
             this.graphFilter = 
VertexProgramHelper.deserialize(ConfUtil.makeApacheConfiguration(configuration),
 Constants.GREMLIN_HADOOP_GRAPH_FILTER);
-        
KryoShimServiceLoader.applyConfiguration(ConfUtil.makeApacheConfiguration(configuration));
+        HadoopPools.initialize(configuration);
         this.gryoReader = HadoopPools.getGryoPool().takeReader();
         long start = split.getStart();
         final Path file = split.getPath();

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8367ab36/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordWriter.java
 
b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordWriter.java
index 2ea3394..67a8339 100644
--- 
a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordWriter.java
+++ 
b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordWriter.java
@@ -25,10 +25,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
 import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoWriter;
-import 
org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -45,7 +43,7 @@ public final class GryoRecordWriter extends 
RecordWriter<NullWritable, VertexWri
     public GryoRecordWriter(final DataOutputStream outputStream, final 
Configuration configuration) {
         this.outputStream = outputStream;
         this.hasEdges = 
configuration.getBoolean(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES, true);
-        
KryoShimServiceLoader.applyConfiguration(ConfUtil.makeApacheConfiguration(configuration));
+        HadoopPools.initialize(configuration);
         this.gryoWriter = HadoopPools.getGryoPool().takeWriter();
     }
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8367ab36/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index c7d0cfb..d345100 100644
--- 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -110,20 +110,16 @@ public final class SparkGraphComputer extends 
AbstractHadoopGraphComputer {
         super(hadoopGraph);
         this.sparkConfiguration = new HadoopConfiguration();
         ConfigurationUtils.copy(this.hadoopGraph.configuration(), 
this.sparkConfiguration);
-        if 
(KryoSerializer.class.getCanonicalName().equals(this.sparkConfiguration.getString(Constants.SPARK_SERIALIZER,
 null)) &&
-                
GryoRegistrator.class.getCanonicalName().equals(this.sparkConfiguration.getString(Constants.SPARK_KRYO_REGISTRATOR,
 null))) {
-            System.setProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, 
UnshadedKryoShimService.class.getCanonicalName());
-        } else if 
(GryoSerializer.class.getCanonicalName().equals(this.sparkConfiguration.getString(Constants.SPARK_SERIALIZER,
 null)) &&
-                
!this.sparkConfiguration.containsKey(Constants.SPARK_KRYO_REGISTRATOR)) {
-            System.setProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, 
HadoopPoolShimService.class.getCanonicalName());
-        }
-        if (null != 
System.getProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, null)) {
-            final String shimService = 
System.getProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE);
-            
this.sparkConfiguration.setProperty(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS,
-                    
(this.sparkConfiguration.getString(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS, 
"") + " -D" + KryoShimServiceLoader.KRYO_SHIM_SERVICE + "=" + 
shimService).trim());
-            
this.sparkConfiguration.setProperty(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
-                    
(this.sparkConfiguration.getString(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, "") 
+ " -D" + KryoShimServiceLoader.KRYO_SHIM_SERVICE + "=" + shimService).trim());
-        }
+        final String shimService = 
KryoSerializer.class.getCanonicalName().equals(this.sparkConfiguration.getString(Constants.SPARK_SERIALIZER,
 null)) &&
+                
GryoRegistrator.class.getCanonicalName().equals(this.sparkConfiguration.getString(Constants.SPARK_KRYO_REGISTRATOR,
 null)) ?
+                UnshadedKryoShimService.class.getCanonicalName() :
+                HadoopPoolShimService.class.getCanonicalName();
+        
this.sparkConfiguration.setProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, 
shimService);
+        
this.sparkConfiguration.setProperty(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS,
+                
(this.sparkConfiguration.getString(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS, 
"") + " -D" + KryoShimServiceLoader.KRYO_SHIM_SERVICE + "=" + 
shimService).trim());
+        
this.sparkConfiguration.setProperty(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
+                
(this.sparkConfiguration.getString(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, "") 
+ " -D" + KryoShimServiceLoader.KRYO_SHIM_SERVICE + "=" + shimService).trim());
+        KryoShimServiceLoader.applyConfiguration(this.sparkConfiguration);
     }
 
     @Override
@@ -154,8 +150,8 @@ public final class SparkGraphComputer extends 
AbstractHadoopGraphComputer {
             final long startTime = System.currentTimeMillis();
             // apache and hadoop configurations that are used throughout the 
graph computer computation
             final org.apache.commons.configuration.Configuration 
graphComputerConfiguration = new HadoopConfiguration(this.sparkConfiguration);
-            if 
(!graphComputerConfiguration.containsKey(Constants.SPARK_SERIALIZER))
-                
graphComputerConfiguration.setProperty(Constants.SPARK_SERIALIZER, 
GryoSerializer.class.getCanonicalName());
+            // TODO !! if 
(!graphComputerConfiguration.containsKey(Constants.SPARK_SERIALIZER))
+            //    
graphComputerConfiguration.setProperty(Constants.SPARK_SERIALIZER, 
GryoSerializer.class.getCanonicalName());
             
graphComputerConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES,
 this.persist.equals(GraphComputer.Persist.EDGES));
             final Configuration hadoopConfiguration = 
ConfUtil.makeHadoopConfiguration(graphComputerConfiguration);
             final Storage fileSystemStorage = 
FileSystemStorage.open(hadoopConfiguration);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8367ab36/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java
----------------------------------------------------------------------
diff --git 
a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java
 
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java
index 9820b7b..19b9121 100644
--- 
a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java
+++ 
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java
@@ -34,9 +34,7 @@ import java.util.Map;
 public final class SparkHadoopGraphGryoSerializerProvider extends 
SparkHadoopGraphProvider {
 
     public Map<String, Object> getBaseConfiguration(final String graphName, 
final Class<?> test, final String testMethodName, final LoadGraphWith.GraphData 
loadGraphWith) {
-        if 
(this.getClass().equals(SparkHadoopGraphGryoSerializerProvider.class) &&
-                
!HadoopPoolShimService.class.getCanonicalName().equals(System.getProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE,
 null)))
-            Spark.close();
+        Spark.close();
         final Map<String, Object> config = 
super.getBaseConfiguration(graphName, test, testMethodName, loadGraphWith);
         config.put(Constants.SPARK_SERIALIZER, 
GryoSerializer.class.getCanonicalName());
         config.remove(Constants.SPARK_KRYO_REGISTRATOR);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8367ab36/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
----------------------------------------------------------------------
diff --git 
a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
 
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
index c5b5083..878fd1e 100644
--- 
a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
+++ 
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
@@ -42,9 +42,7 @@ import 
org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
 import 
org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorageCheck;
 import org.apache.tinkerpop.gremlin.spark.structure.io.ToyGraphInputRDD;
 import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoRegistrator;
-import 
org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoShimService;
 import org.apache.tinkerpop.gremlin.structure.Graph;
-import 
org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
 
 import java.util.Map;
 
@@ -56,9 +54,7 @@ public class SparkHadoopGraphProvider extends 
HadoopGraphProvider {
 
     @Override
     public Map<String, Object> getBaseConfiguration(final String graphName, 
final Class<?> test, final String testMethodName, final LoadGraphWith.GraphData 
loadGraphWith) {
-        if (this.getClass().equals(SparkHadoopGraphProvider.class) &&
-                
!UnshadedKryoShimService.class.getCanonicalName().equals(System.getProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE,
 null)))
-            Spark.close();
+        Spark.close();
 
         final Map<String, Object> config = 
super.getBaseConfiguration(graphName, test, testMethodName, loadGraphWith);
         config.put(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);  // this 
makes the test suite go really fast

Reply via email to