This is an automated email from the ASF dual-hosted git repository. okram pushed a commit to branch tp4 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/tp4 by this push: new 195d0da Beam now support TraverserServer which allows the output steps to write (in parallel and over the network) to a multi-threaded server backed by a TraverserSet and thus, is an Iterator<Traverser>. 195d0da is described below commit 195d0da36507c72e4faa0913d6977be0dd6103ec Author: Marko A. Rodriguez <okramma...@gmail.com> AuthorDate: Tue Mar 26 08:27:20 2019 -0600 Beam now support TraverserServer which allows the output steps to write (in parallel and over the network) to a multi-threaded server backed by a TraverserSet and thus, is an Iterator<Traverser>. --- .../machine/{ => species}/BasicMachine.java | 3 +- .../machine/{ => species}/LocalMachine.java | 3 +- .../RemoteMachine.java} | 24 ++-- .../machine/species/io/TraverserServer.java | 122 +++++++++++++++++++++ .../machine/{ => species}/LocalMachineTest.java | 2 +- .../tinkerpop/machine/processor/beam/Beam.java | 14 +-- .../tinkerpop/machine/processor/beam/OutputFn.java | 61 ++++++++++- .../tinkerpop/machine/processor/beam/BeamTest.java | 2 +- .../machine/processor/pipes/PipesTest.java | 2 +- 9 files changed, 203 insertions(+), 30 deletions(-) diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/BasicMachine.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/BasicMachine.java similarity index 94% copy from java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/BasicMachine.java copy to java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/BasicMachine.java index 27cfc58..43b4946 100644 --- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/BasicMachine.java +++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/BasicMachine.java @@ -16,8 +16,9 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.tinkerpop.machine; +package org.apache.tinkerpop.machine.species; +import org.apache.tinkerpop.machine.Machine; import org.apache.tinkerpop.machine.bytecode.Bytecode; import org.apache.tinkerpop.machine.bytecode.compiler.Compilation; import org.apache.tinkerpop.machine.traverser.Traverser; diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/LocalMachine.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/LocalMachine.java similarity index 97% rename from java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/LocalMachine.java rename to java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/LocalMachine.java index a5d4e27..042124c 100644 --- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/LocalMachine.java +++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/LocalMachine.java @@ -16,8 +16,9 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.tinkerpop.machine; +package org.apache.tinkerpop.machine.species; +import org.apache.tinkerpop.machine.Machine; import org.apache.tinkerpop.machine.bytecode.Bytecode; import org.apache.tinkerpop.machine.bytecode.BytecodeUtil; import org.apache.tinkerpop.machine.bytecode.SourceInstruction; diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/BasicMachine.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/RemoteMachine.java similarity index 76% rename from java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/BasicMachine.java rename to java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/RemoteMachine.java index 27cfc58..00da1c6 100644 --- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/BasicMachine.java +++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/RemoteMachine.java @@ -16,10 +16,10 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.tinkerpop.machine; +package org.apache.tinkerpop.machine.species; +import org.apache.tinkerpop.machine.Machine; import org.apache.tinkerpop.machine.bytecode.Bytecode; -import org.apache.tinkerpop.machine.bytecode.compiler.Compilation; import org.apache.tinkerpop.machine.traverser.Traverser; import java.util.Iterator; @@ -27,28 +27,26 @@ import java.util.Iterator; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public final class BasicMachine implements Machine { +public class RemoteMachine implements Machine { - private BasicMachine() { - // use open(); + private final int port; + + public RemoteMachine(final int port) { + this.port = port; } @Override public <C> Bytecode<C> register(final Bytecode<C> sourceCode) { - return sourceCode; + return null; } @Override - public <C> void close(final Bytecode<C> sourceCode) { - + public <C, E> Iterator<Traverser<C, E>> submit(final Bytecode<C> bytecode) { + return null; } @Override - public <C, E> Iterator<Traverser<C, E>> submit(final Bytecode<C> bytecode) { - return Compilation.<C, Object, E>compile(bytecode).getProcessor(); - } + public <C> void close(final Bytecode<C> sourceCode) { - public static Machine open() { - return new BasicMachine(); } } diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/io/TraverserServer.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/io/TraverserServer.java new file mode 100644 index 0000000..a6b565b --- /dev/null +++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/io/TraverserServer.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.machine.species.io; + +import org.apache.tinkerpop.machine.traverser.Traverser; +import org.apache.tinkerpop.machine.traverser.TraverserSet; + +import java.io.EOFException; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class TraverserServer<C, S> implements Runnable, Iterator<Traverser<C, S>> { + + private final TraverserSet<C, S> traverserSet = new TraverserSet<>(); + private final int serverPort; + private ServerSocket serverSocket; + private AtomicBoolean serverAlive = new AtomicBoolean(Boolean.FALSE); + + public TraverserServer(final int serverPort) { + this.serverPort = serverPort; + } + + public void run() { + try { + this.serverSocket = new ServerSocket(this.serverPort); + this.serverAlive.set(Boolean.TRUE); + // System.out.println("Server started: " + this.serverSocket.toString()); + while (this.isAlive()) { + final Socket clientSocket = this.serverSocket.accept(); + new Thread(new Worker(clientSocket)).start(); + } + // System.out.println("Server Stopped."); + } catch (final Exception e) { + if (this.serverAlive.get()) + throw new RuntimeException(e.getMessage(), e); + } + } + + + private boolean isAlive() { + return this.serverAlive.get(); + } + + public synchronized void stop() { + try { + this.serverAlive.set(Boolean.FALSE); + this.serverSocket.close(); + } catch (final IOException e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + @Override + public boolean hasNext() { + if (!this.traverserSet.isEmpty()) + return true; + else { + while (this.isAlive()) { + if (!this.traverserSet.isEmpty()) + return true; + } + return !this.traverserSet.isEmpty(); + } + } + + @Override + public Traverser<C, S> next() { + return this.traverserSet.remove(); + } + + public class Worker implements Runnable { + + private final Socket clientSocket; + + Worker(final Socket clientSocket) { + this.clientSocket = clientSocket; + } + + public void run() { + //int counter = 0; + try { + //System.out.println("Client connected: " + this.clientSocket.toString()); + final ObjectInputStream input = new ObjectInputStream(this.clientSocket.getInputStream()); + while (true) { + final Traverser<C, S> traverser = (Traverser<C, S>) input.readObject(); + //System.out.println("Received traverser [" + this.clientSocket.getPort() + "]: " + traverser); + traverserSet.add(traverser); + } + //System.out.println(this.toString() + ": is complete..." + counter); + } catch (final EOFException e) { + // okay -- this is how the worker closes + } catch (final IOException | ClassNotFoundException e) { + throw new RuntimeException(e.getMessage(), e); + } + } + } + + +} diff --git a/java/machine/machine-core/src/test/java/org/apache/tinkerpop/machine/LocalMachineTest.java b/java/machine/machine-core/src/test/java/org/apache/tinkerpop/machine/species/LocalMachineTest.java similarity index 98% rename from java/machine/machine-core/src/test/java/org/apache/tinkerpop/machine/LocalMachineTest.java rename to java/machine/machine-core/src/test/java/org/apache/tinkerpop/machine/species/LocalMachineTest.java index 53584a7..ae08b75 100644 --- a/java/machine/machine-core/src/test/java/org/apache/tinkerpop/machine/LocalMachineTest.java +++ b/java/machine/machine-core/src/test/java/org/apache/tinkerpop/machine/species/LocalMachineTest.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.tinkerpop.machine; +package org.apache.tinkerpop.machine.species; import org.apache.tinkerpop.machine.bytecode.Bytecode; import org.apache.tinkerpop.machine.bytecode.compiler.CommonCompiler; diff --git a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java index 39cd859..b984670 100644 --- a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java +++ b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java @@ -28,22 +28,21 @@ import org.apache.tinkerpop.machine.processor.Processor; import org.apache.tinkerpop.machine.processor.beam.serialization.TraverserCoder; import org.apache.tinkerpop.machine.processor.beam.util.ExecutionPlanner; import org.apache.tinkerpop.machine.processor.beam.util.TopologyUtil; +import org.apache.tinkerpop.machine.species.io.TraverserServer; import org.apache.tinkerpop.machine.traverser.Traverser; import org.apache.tinkerpop.machine.traverser.species.EmptyTraverser; -import java.util.ArrayList; import java.util.Iterator; -import java.util.List; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ public class Beam<C, S, E> implements Processor<C, S, E> { + public static final int RESULT_SERVER_PORT = 6532; // TODO: this needs to be a dynamic configuration public static final int MAX_REPETIONS = 15; // TODO: this needs to be a dynamic configuration private final Pipeline pipeline; - static List<Traverser> OUTPUT = new ArrayList<>(); // FIX THIS! private Iterator<Traverser<C, E>> iterator = null; public Beam(final Compilation<C, S, E> compilation) { @@ -52,7 +51,7 @@ public class Beam<C, S, E> implements Processor<C, S, E> { final PCollection<Traverser<C, S>> source = this.pipeline.apply(Create.of(EmptyTraverser.instance())); source.setCoder(new TraverserCoder<>()); final PCollection<Traverser<C, E>> sink = TopologyUtil.compile(source, compilation); - sink.apply(ParDo.of(new OutputFn<>())); // TODO: we need an in-memory router of outgoing data + sink.apply(ParDo.of(new OutputFn<>("localhost", RESULT_SERVER_PORT))); } @@ -75,7 +74,6 @@ public class Beam<C, S, E> implements Processor<C, S, E> { @Override public void reset() { - OUTPUT.clear(); this.iterator = null; } @@ -88,9 +86,11 @@ public class Beam<C, S, E> implements Processor<C, S, E> { private final void setupPipeline() { if (null == this.iterator) { + final TraverserServer<C, E> server = new TraverserServer<>(Beam.RESULT_SERVER_PORT); + new Thread(server).start(); this.pipeline.run().waitUntilFinish(); - this.iterator = (Iterator) new ArrayList<>(OUTPUT).iterator(); - OUTPUT.clear(); + server.stop(); + this.iterator = server; } } diff --git a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/OutputFn.java b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/OutputFn.java index cfebb0e..c270456 100644 --- a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/OutputFn.java +++ b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/OutputFn.java @@ -21,19 +21,70 @@ package org.apache.tinkerpop.machine.processor.beam; import org.apache.beam.sdk.transforms.DoFn; import org.apache.tinkerpop.machine.traverser.Traverser; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.net.Socket; + /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class OutputFn<C, S> extends DoFn<Traverser<C, S>, String> { - +public class OutputFn<C, S> extends DoFn<Traverser<C, S>, Void> { - public OutputFn() { + private final String resultServerLocation; + private final int resultServerPort; + private Socket resultServer; + private ObjectOutputStream outputStream; + OutputFn(final String resultServerLocation, final int resultServerPort) { + this.resultServerLocation = resultServerLocation; + this.resultServerPort = resultServerPort; } @ProcessElement - public void processElement(final @Element Traverser<C, S> traverser, final OutputReceiver<String> output) { - Beam.OUTPUT.add(traverser); + public void processElement(final @Element Traverser<C, S> traverser) { + try { + this.outputStream.writeObject(traverser); + } catch (final IOException e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + @StartBundle + public void startBundle() { + // only create a connection if results are generated + if (null == this.resultServer) { + try { + //System.out.println("setting up client: " + this.toString()); + this.resultServer = new Socket(this.resultServerLocation, this.resultServerPort); + this.outputStream = new ObjectOutputStream(this.resultServer.getOutputStream()); + //System.out.println("Connected to server: " + this.resultServer.toString()); + } catch (final Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + //System.out.println(this.toString() + " client setup"); + } + } + + @FinishBundle + public void finishBundle() { + try { + this.outputStream.flush(); + } catch (final IOException e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + @Teardown + public void stop() { + if (null != this.resultServer) { + try { + // System.out.println(this.toString() + " client stopping"); + this.outputStream.flush(); + this.resultServer.close(); + } catch (final Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + } } @Override diff --git a/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/BeamTest.java b/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/BeamTest.java index 454917f..ac28f62 100644 --- a/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/BeamTest.java +++ b/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/BeamTest.java @@ -24,7 +24,7 @@ import org.apache.tinkerpop.language.gremlin.Traversal; import org.apache.tinkerpop.language.gremlin.TraversalSource; import org.apache.tinkerpop.language.gremlin.TraversalUtil; import org.apache.tinkerpop.language.gremlin.core.__; -import org.apache.tinkerpop.machine.LocalMachine; +import org.apache.tinkerpop.machine.species.LocalMachine; import org.apache.tinkerpop.machine.Machine; import org.apache.tinkerpop.machine.coefficient.LongCoefficient; import org.apache.tinkerpop.machine.strategy.optimization.IdentityStrategy; diff --git a/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/PipesTest.java b/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/PipesTest.java index a41aeed..a9ef4b7 100644 --- a/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/PipesTest.java +++ b/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/PipesTest.java @@ -24,7 +24,7 @@ import org.apache.tinkerpop.language.gremlin.Traversal; import org.apache.tinkerpop.language.gremlin.TraversalSource; import org.apache.tinkerpop.language.gremlin.TraversalUtil; import org.apache.tinkerpop.language.gremlin.common.__; -import org.apache.tinkerpop.machine.LocalMachine; +import org.apache.tinkerpop.machine.species.LocalMachine; import org.apache.tinkerpop.machine.Machine; import org.apache.tinkerpop.machine.coefficient.LongCoefficient; import org.apache.tinkerpop.machine.strategy.optimization.IdentityStrategy;