Repository: incubator-tinkerpop Updated Branches: refs/heads/master 406956db5 -> 2c7b1f92b
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ef528697/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphSerializer.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphSerializer.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphSerializer.java new file mode 100644 index 0000000..d5ba90d --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphSerializer.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.tinkerpop.gremlin.structure.util.star; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.tinkerpop.gremlin.process.computer.GraphFilter; +import org.apache.tinkerpop.gremlin.structure.Direction; +import org.apache.tinkerpop.gremlin.structure.Edge; +import org.apache.tinkerpop.gremlin.structure.T; +import org.apache.tinkerpop.gremlin.structure.VertexProperty; +import org.apache.tinkerpop.gremlin.structure.io.kryoshim.InputShim; +import org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShim; +import org.apache.tinkerpop.gremlin.structure.io.kryoshim.OutputShim; +import org.apache.tinkerpop.gremlin.structure.io.kryoshim.SerializerShim; + +/** {@link SerializerShim} implementation for {@link StarGraph}. {@code write} emits, in order: a version byte, the edge-property and meta-property maps, the star vertex id and label, the IN edges, the OUT edges, and the vertex properties; {@code read} consumes the same sequence. */ public class StarGraphSerializer implements SerializerShim<StarGraph> { + + private final Direction edgeDirectionToSerialize; + private GraphFilter graphFilter; + + /* first byte of every serialized star graph; read() currently reads and ignores it */ private final static byte VERSION_1 = Byte.MIN_VALUE; + + /** @param edgeDirectionToSerialize which edges {@code write} emits: edges of a direction are written only when this equals that direction or {@code Direction.BOTH}; {@code null} means no edges are written. @param graphFilter filter used by {@code read} and {@code readEdges}; it is dereferenced there without a null check. */ public StarGraphSerializer(final Direction edgeDirectionToSerialize, final GraphFilter graphFilter) { + this.edgeDirectionToSerialize = edgeDirectionToSerialize; + this.graphFilter = graphFilter; + } + + /** Serializes {@code starGraph} to {@code output}. Vertex properties are preceded by a boolean presence flag and written as: number of keys, then for each key the key, the number of properties under it, and each property's id and value. */ @Override + public <O extends OutputShim> void write(final KryoShim<?, O> kryo, final O output, final StarGraph starGraph) { + output.writeByte(VERSION_1); + kryo.writeObjectOrNull(output, starGraph.edgeProperties, HashMap.class); + kryo.writeObjectOrNull(output, starGraph.metaProperties, HashMap.class); + kryo.writeClassAndObject(output, starGraph.starVertex.id); + kryo.writeObject(output, starGraph.starVertex.label); + writeEdges(kryo, output, starGraph, Direction.IN); + writeEdges(kryo, output, starGraph, Direction.OUT); + kryo.writeObject(output, null != starGraph.starVertex.vertexProperties); + if (null != starGraph.starVertex.vertexProperties) { + kryo.writeObject(output, starGraph.starVertex.vertexProperties.size()); + for (final Map.Entry<String, List<VertexProperty>> vertexProperties : 
starGraph.starVertex.vertexProperties.entrySet()) { + kryo.writeObject(output, vertexProperties.getKey()); + kryo.writeObject(output, vertexProperties.getValue().size()); + for (final VertexProperty vertexProperty : vertexProperties.getValue()) { + kryo.writeClassAndObject(output, vertexProperty.id()); + kryo.writeClassAndObject(output, vertexProperty.value()); + } + } + } + } + + /** + * Reads the sequence produced by {@code write} into a newly opened {@link StarGraph} and, when the {@link GraphFilter} reports that it has a filter, applies that filter to the result. + * If the returned {@link StarGraph} is null, that means that the {@link GraphFilter} filtered the vertex. + */ + @Override + public <I extends InputShim> StarGraph read(KryoShim<I, ?> kryo, I input, Class<StarGraph> clazz) { + final StarGraph starGraph = StarGraph.open(); + input.readByte(); // version field ignored for now - for future use with backward compatibility + starGraph.edgeProperties = kryo.readObjectOrNull(input, HashMap.class); + starGraph.metaProperties = kryo.readObjectOrNull(input, HashMap.class); + starGraph.addVertex(T.id, kryo.readClassAndObject(input), T.label, kryo.readObject(input, String.class)); + readEdges(kryo, input, starGraph, Direction.IN); + readEdges(kryo, input, starGraph, Direction.OUT); + if (kryo.readObject(input, Boolean.class)) { + final int numberOfUniqueKeys = kryo.readObject(input, Integer.class); + for (int i = 0; i < numberOfUniqueKeys; i++) { + final String vertexPropertyKey = kryo.readObject(input, String.class); + final int numberOfVertexPropertiesWithKey = kryo.readObject(input, Integer.class); + for (int j = 0; j < numberOfVertexPropertiesWithKey; j++) { + final Object id = kryo.readClassAndObject(input); + final Object value = kryo.readClassAndObject(input); + starGraph.starVertex.property(VertexProperty.Cardinality.list, vertexPropertyKey, value, T.id, id); + } + } + } + return this.graphFilter.hasFilter() ? 
starGraph.applyGraphFilter(this.graphFilter).orElse(null) : starGraph; + } + + /** Writes a boolean presence flag for the edges of {@code direction} and, when the flag is true, the edges grouped by label: number of labels, then for each label the label, the number of edges, and each edge's id followed by the id of its adjacent vertex. */ private <O extends OutputShim> void writeEdges(final KryoShim<?, O> kryo, final O output, final StarGraph starGraph, final Direction direction) { + // only write edges if there are some AND if the user requested them to be serialized AND if they match + // the direction being serialized by the format + final Map<String, List<Edge>> starEdges = direction.equals(Direction.OUT) ? starGraph.starVertex.outEdges : starGraph.starVertex.inEdges; + final boolean writeEdges = null != starEdges && edgeDirectionToSerialize != null + && (edgeDirectionToSerialize == direction || edgeDirectionToSerialize == Direction.BOTH); + kryo.writeObject(output, writeEdges); + if (writeEdges) { + kryo.writeObject(output, starEdges.size()); + for (final Map.Entry<String, List<Edge>> edges : starEdges.entrySet()) { + kryo.writeObject(output, edges.getKey()); + kryo.writeObject(output, edges.getValue().size()); + for (final Edge edge : edges.getValue()) { + kryo.writeClassAndObject(output, edge.id()); + /* id of the vertex at the other end: the in-vertex for OUT edges, the out-vertex for IN edges */ kryo.writeClassAndObject(output, direction.equals(Direction.OUT) ? 
edge.inVertex().id() : edge.outVertex().id()); + } + } + } + } + + /** Reads what {@code writeEdges} wrote for {@code direction}. Every serialized edge is consumed from {@code input}; an edge is attached to the star vertex only when {@code graphFilter.checkEdgeLegality(direction, label)} is positive. */ private <I extends InputShim> void readEdges(final KryoShim<I, ?> kryo, final I input, final StarGraph starGraph, final Direction direction) { + if (kryo.readObject(input, Boolean.class)) { + final int numberOfUniqueLabels = kryo.readObject(input, Integer.class); + for (int i = 0; i < numberOfUniqueLabels; i++) { + final String edgeLabel = kryo.readObject(input, String.class); + final int numberOfEdgesWithLabel = kryo.readObject(input, Integer.class); + for (int j = 0; j < numberOfEdgesWithLabel; j++) { + final Object edgeId = kryo.readClassAndObject(input); + final Object adjacentVertexId = kryo.readClassAndObject(input); + if (this.graphFilter.checkEdgeLegality(direction, edgeLabel).positive()) { + if (direction.equals(Direction.OUT)) + starGraph.starVertex.addOutEdge(edgeLabel, starGraph.addVertex(T.id, adjacentVertexId), T.id, edgeId); + else + starGraph.starVertex.addInEdge(edgeLabel, starGraph.addVertex(T.id, adjacentVertexId), T.id, edgeId); + } else if (null != starGraph.edgeProperties) { + /* the edge was rejected by the filter, so its entry in the already-read edge-property map is dropped as well */ starGraph.edgeProperties.remove(edgeId); + } + } + } + } + } +} + + http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ef528697/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java new file mode 100644 index 0000000..2053280 --- /dev/null +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.hadoop.structure.io; + +import org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShimService; +import org.apache.tinkerpop.shaded.kryo.Kryo; +import org.apache.tinkerpop.shaded.kryo.io.Input; +import org.apache.tinkerpop.shaded.kryo.io.Output; + +import java.io.InputStream; +import java.io.OutputStream; + +/** {@link KryoShimService} that takes a shaded {@link Kryo} instance from {@code HadoopPools.getGryoPool()} for each call and offers it back to the pool in a {@code finally} block. */ public class HadoopPoolShimService implements KryoShimService { + + /** Reads a class-tagged object from {@code source} with a pooled Kryo instance. Neither the {@code Input} wrapper nor {@code source} is closed here. NOTE(review): unlike getPriority(), this method carries no @Override; presumably it implements a KryoShimService method, so confirm and annotate. */ public Object readClassAndObject(InputStream source) { + + Kryo k = null; + + try { + k = HadoopPools.getGryoPool().takeKryo(); + + return k.readClassAndObject(new Input(source)); + } finally { + if (null != k) { + HadoopPools.getGryoPool().offerKryo(k); + } + } + } + + /** Writes {@code o} together with its class tag to {@code sink} with a pooled Kryo instance, then flushes the Kryo {@code Output}; {@code sink} itself is not closed here. NOTE(review): no @Override here either; confirm against KryoShimService. */ public void writeClassAndObject(Object o, OutputStream sink) { + + Kryo k = null; + + try { + k = HadoopPools.getGryoPool().takeKryo(); + + Output output = new Output(sink); + + k.writeClassAndObject(output, o); + + output.flush(); + } finally { + if (null != k) { + HadoopPools.getGryoPool().offerKryo(k); + } + } + } + + /** Returns the constant priority 0 for this service. */ @Override + public int getPriority() { + return 0; + } +} http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ef528697/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java 
---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java index f09f703..5074ad5 100644 --- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java @@ -43,6 +43,7 @@ public final class HadoopPools { GRYO_POOL = GryoPool.build(). poolSize(configuration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, 256)). ioRegistries(configuration.getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())). + initializeMapper(m -> m.registrationRequired(false)). create(); } } http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ef528697/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java index 9a07f75..e7a38a5 100644 --- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java @@ -21,6 +21,8 @@ package org.apache.tinkerpop.gremlin.hadoop.structure.io; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableUtils; import org.apache.tinkerpop.gremlin.process.computer.MapReduce; +import org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShimServiceLoader; +import org.apache.tinkerpop.shaded.kryo.io.Output; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -63,29 +65,14 @@ public final class 
ObjectWritable<T> implements WritableComparable<ObjectWritabl @Override public void readFields(final DataInput input) throws IOException { - this.t = HadoopPools.getGryoPool().doWithReader(gryoReader -> { - try { - // class argument is Object because gryo doesn't really care that we don't know the specific type. - // the type is embedded in the stream so it can just read it from there and return it as needed. - // presumably that will cast nicely to T - return (T) gryoReader.readObject(new ByteArrayInputStream(WritableUtils.readCompressedByteArray(input)), Object.class); - } catch (final IOException e) { - throw new IllegalStateException(e.getMessage(), e); - } - }); + ByteArrayInputStream bais = new ByteArrayInputStream(WritableUtils.readCompressedByteArray(input)); + this.t = KryoShimServiceLoader.readClassAndObject(bais); } @Override public void write(final DataOutput output) throws IOException { - HadoopPools.getGryoPool().doWithWriter(gryoWriter -> { - try { - final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - gryoWriter.writeObject(outputStream, this.t); - WritableUtils.writeCompressedByteArray(output, outputStream.toByteArray()); - } catch (final IOException e) { - throw new IllegalStateException(e.getMessage(), e); - } - }); + byte serialized[] = KryoShimServiceLoader.writeClassAndObjectToBytes(this.t); + WritableUtils.writeCompressedByteArray(output, serialized); } private void writeObject(final ObjectOutputStream outputStream) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ef528697/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java index ac360e9..7ac8e8c 100644 --- 
a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java @@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.hadoop.structure.io; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShimServiceLoader; import org.apache.tinkerpop.gremlin.structure.util.ElementHelper; import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph; @@ -60,42 +61,15 @@ public final class VertexWritable implements Writable, Serializable { @Override public void readFields(final DataInput input) throws IOException { - try { - this.vertex = null; - this.vertex = HadoopPools.getGryoPool().doWithReader(gryoReader -> { - try { - final ByteArrayInputStream inputStream = new ByteArrayInputStream(WritableUtils.readCompressedByteArray(input)); - return gryoReader.readObject(inputStream, StarGraph.class).getStarVertex(); // read the star graph - } catch (final IOException e) { - throw new IllegalStateException(e.getMessage(), e); - } - }); - } catch (final IllegalStateException e) { - if (e.getCause() instanceof IOException) - throw (IOException) e.getCause(); - else - throw e; - } + this.vertex = null; + ByteArrayInputStream bais = new ByteArrayInputStream(WritableUtils.readCompressedByteArray(input)); + this.vertex = ((StarGraph)KryoShimServiceLoader.readClassAndObject(bais)).getStarVertex(); // read the star graph; } @Override public void write(final DataOutput output) throws IOException { - try { - HadoopPools.getGryoPool().doWithWriter(gryoWriter -> { - try { - final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - gryoWriter.writeObject(outputStream, this.vertex.graph()); // write the star graph - WritableUtils.writeCompressedByteArray(output, outputStream.toByteArray()); - } 
catch (final IOException e) { - throw new IllegalStateException(e.getMessage(), e); - } - }); - } catch (final IllegalStateException e) { - if (e.getCause() instanceof IOException) - throw (IOException) e.getCause(); - else - throw e; - } + byte serialized[] = KryoShimServiceLoader.writeClassAndObjectToBytes(this.vertex.graph()); + WritableUtils.writeCompressedByteArray(output, serialized); } private void writeObject(final ObjectOutputStream outputStream) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ef528697/hadoop-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShimService ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShimService b/hadoop-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShimService new file mode 100644 index 0000000..0b27e72 --- /dev/null +++ b/hadoop-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShimService @@ -0,0 +1 @@ +org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPoolShimService # HadoopPools provides/caches instances of TinkerPop's shaded Kryo