IGNITE-9338: ML TF integration: tf cluster can't connect after killing first node with default port 10800
this closes #4601 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b369ff06 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b369ff06 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b369ff06 Branch: refs/heads/ignite-9273 Commit: b369ff06922ecee43f9baf675b843c399e48324f Parents: fb26835 Author: Anton Dmitriev <dmitrievanth...@gmail.com> Authored: Thu Aug 23 14:33:11 2018 +0300 Committer: Yury Babak <yba...@gridgain.com> Committed: Thu Aug 23 14:33:11 2018 +0300 ---------------------------------------------------------------------- .../tfrunning/TensorFlowServerManager.java | 25 +++--- .../TensorFlowServerScriptFormatter.java | 3 + .../cluster/util/TensorFlowChiefRunner.java | 2 +- .../util/TensorFlowProcessBuilderSupplier.java | 72 +++++++++++++++ .../util/TensorFlowUserScriptRunner.java | 3 +- .../core/pythonrunning/PythonProcess.java | 68 -------------- .../PythonProcessBuilderSupplier.java | 93 -------------------- .../pythonrunning/PythonProcessManager.java | 56 ------------ .../core/pythonrunning/package-info.java | 25 ------ .../core/util/PythonProcessBuilderSupplier.java | 93 ++++++++++++++++++++ 10 files changed, 185 insertions(+), 255 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b369ff06/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerManager.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerManager.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerManager.java index fa6194a..ed6c801 100644 --- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerManager.java +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerManager.java @@ -24,15 +24,16 @@ import org.apache.ignite.Ignite; import org.apache.ignite.Ignition; import org.apache.ignite.tensorflow.cluster.spec.TensorFlowClusterSpec; import org.apache.ignite.tensorflow.cluster.spec.TensorFlowServerAddressSpec; +import org.apache.ignite.tensorflow.cluster.util.TensorFlowProcessBuilderSupplier; import org.apache.ignite.tensorflow.core.ProcessManager; import org.apache.ignite.tensorflow.core.ProcessManagerWrapper; -import org.apache.ignite.tensorflow.core.pythonrunning.PythonProcess; -import org.apache.ignite.tensorflow.core.pythonrunning.PythonProcessManager; +import org.apache.ignite.tensorflow.core.nativerunning.NativeProcess; +import org.apache.ignite.tensorflow.core.nativerunning.NativeProcessManager; /** * TensorFlow server manager that allows to start, stop and make other actions with TensorFlow servers. */ -public class TensorFlowServerManager extends ProcessManagerWrapper<PythonProcess, TensorFlowServer> { +public class TensorFlowServerManager extends ProcessManagerWrapper<NativeProcess, TensorFlowServer> { /** TensorFlow server script formatter. */ private static final TensorFlowServerScriptFormatter scriptFormatter = new TensorFlowServerScriptFormatter(); @@ -42,7 +43,7 @@ public class TensorFlowServerManager extends ProcessManagerWrapper<PythonProcess * @param ignite Ignite instance. */ public TensorFlowServerManager(Ignite ignite) { - this(new PythonProcessManager(ignite)); + this(new NativeProcessManager(ignite)); } /** @@ -50,17 +51,21 @@ public class TensorFlowServerManager extends ProcessManagerWrapper<PythonProcess * * @param delegate Delegate. */ - public TensorFlowServerManager(ProcessManager<PythonProcess> delegate) { + public TensorFlowServerManager(ProcessManager<NativeProcess> delegate) { super(delegate); } /** {@inheritDoc} */ - @Override protected PythonProcess transformSpecification(TensorFlowServer spec) { - return new PythonProcess( + @Override protected NativeProcess transformSpecification(TensorFlowServer spec) { + return new NativeProcess( + new TensorFlowProcessBuilderSupplier( + true, + spec.getTaskIdx(), + "job:" + spec.getJobName(), + "task:" + spec.getTaskIdx() + ), scriptFormatter.format(spec, true, Ignition.ignite()), - getNode(spec), - "job:" + spec.getJobName(), - "task:" + spec.getTaskIdx() + getNode(spec) ); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b369ff06/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerScriptFormatter.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerScriptFormatter.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerScriptFormatter.java index a93d910..7cfa1c6 100644 --- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerScriptFormatter.java +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerScriptFormatter.java @@ -70,6 +70,9 @@ public class TensorFlowServerScriptFormatter { .append(srv.getTaskIdx()) .append("))") .append("\n"); + builder.append("print('IGNITE_DATASET_HOST = ', os.environ.get('IGNITE_DATASET_HOST'))").append("\n"); + builder.append("print('IGNITE_DATASET_PORT = ', os.environ.get('IGNITE_DATASET_PORT'))").append("\n"); + builder.append("print('IGNITE_DATASET_PART = ', os.environ.get('IGNITE_DATASET_PART'))").append("\n"); builder.append("server = tf.train.Server(cluster"); http://git-wip-us.apache.org/repos/asf/ignite/blob/b369ff06/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowChiefRunner.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowChiefRunner.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowChiefRunner.java index 96535eb..d8640fa 100644 --- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowChiefRunner.java +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowChiefRunner.java @@ -23,9 +23,9 @@ import org.apache.ignite.Ignite; import org.apache.ignite.tensorflow.cluster.spec.TensorFlowClusterSpec; import org.apache.ignite.tensorflow.cluster.tfrunning.TensorFlowServer; import org.apache.ignite.tensorflow.cluster.tfrunning.TensorFlowServerScriptFormatter; -import org.apache.ignite.tensorflow.core.pythonrunning.PythonProcessBuilderSupplier; import org.apache.ignite.tensorflow.core.util.AsyncNativeProcessRunner; import org.apache.ignite.tensorflow.core.util.NativeProcessRunner; +import org.apache.ignite.tensorflow.core.util.PythonProcessBuilderSupplier; /** * Utils class that helps to start and stop chief process. http://git-wip-us.apache.org/repos/asf/ignite/blob/b369ff06/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowProcessBuilderSupplier.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowProcessBuilderSupplier.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowProcessBuilderSupplier.java new file mode 100644 index 0000000..8b95526 --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowProcessBuilderSupplier.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tensorflow.cluster.util; + +import java.util.Map; +import org.apache.ignite.Ignite; +import org.apache.ignite.Ignition; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor; +import org.apache.ignite.tensorflow.core.util.PythonProcessBuilderSupplier; + +/** + * Python process builder supplier that is used to create TensorFlow worker process builder. + */ +public class TensorFlowProcessBuilderSupplier extends PythonProcessBuilderSupplier { + /** */ + private static final long serialVersionUID = 6866243505446122897L; + + /** Prefix for worker environment variables. */ + private static final String ENV_PREFIX = "IGNITE_DATASET_"; + + /** Partition of the upstream cache. */ + private final Integer part; + + /** + * Constructs a new instance of Python process builder supplier. + * + * @param interactive Interactive flag (allows to used standard input to pass Python script). + * @param part Partition index. + * @param meta Meta information that adds to script as arguments. + */ + public TensorFlowProcessBuilderSupplier(boolean interactive, Integer part, String... meta) { + super(interactive, meta); + this.part = part; + } + + /** {@inheritDoc} */ + @Override public ProcessBuilder get() { + ProcessBuilder pythonProcBuilder = super.get(); + + Ignite ignite = Ignition.ignite(); + ClusterNode locNode = ignite.cluster().localNode(); + + Integer port = locNode.attribute(ClientListenerProcessor.CLIENT_LISTENER_PORT); + + Map<String, String> env = pythonProcBuilder.environment(); + env.put(ENV_PREFIX + "HOST", "localhost"); + + if (port != null) + env.put(ENV_PREFIX + "PORT", String.valueOf(port)); + + if (part != null) + env.put(ENV_PREFIX + "PART", String.valueOf(part)); + + return pythonProcBuilder; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b369ff06/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowUserScriptRunner.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowUserScriptRunner.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowUserScriptRunner.java index 6bb3b0a..17e63bb 100644 --- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowUserScriptRunner.java +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowUserScriptRunner.java @@ -37,7 +37,6 @@ import org.apache.ignite.Ignition; import org.apache.ignite.tensorflow.cluster.TensorFlowJobArchive; import org.apache.ignite.tensorflow.cluster.spec.TensorFlowClusterSpec; import org.apache.ignite.tensorflow.cluster.spec.TensorFlowServerAddressSpec; -import org.apache.ignite.tensorflow.core.pythonrunning.PythonProcessBuilderSupplier; import org.apache.ignite.tensorflow.core.util.AsyncNativeProcessRunner; import org.apache.ignite.tensorflow.core.util.NativeProcessRunner; @@ -118,7 +117,7 @@ public class TensorFlowUserScriptRunner extends AsyncNativeProcessRunner { if (workingDir == null) throw new IllegalStateException("Working directory is not created"); - ProcessBuilder procBuilder = new PythonProcessBuilderSupplier(false).get(); + ProcessBuilder procBuilder = new TensorFlowProcessBuilderSupplier(false, null).get(); procBuilder.directory(workingDir); procBuilder.command(jobArchive.getCommands()); http://git-wip-us.apache.org/repos/asf/ignite/blob/b369ff06/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcess.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcess.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcess.java deleted file mode 100644 index 34c5a12..0000000 --- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcess.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.tensorflow.core.pythonrunning; - -import java.io.Serializable; -import java.util.UUID; - -/** - * Python process specification. - */ -public class PythonProcess implements Serializable { - /** */ - private static final long serialVersionUID = -1623536488451695210L; - - /** Stdin of the process. */ - private final String stdin; - - /** Node identifier. */ - private final UUID nodeId; - - /** Meta information that adds to script as arguments. */ - private final String[] meta; - - /** - * Constructs a new instance of python process. - * - * @param stdin Stdin of the process. - * @param nodeId Node identifier. - * @param meta Meta information that adds to script as arguments. - */ - public PythonProcess(String stdin, UUID nodeId, String... meta) { - assert nodeId != null : "Node identifier should not be null"; - - this.stdin = stdin; - this.nodeId = nodeId; - this.meta = meta; - } - - /** */ - public String getStdin() { - return stdin; - } - - /** */ - public UUID getNodeId() { - return nodeId; - } - - /** */ - public String[] getMeta() { - return meta; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/b369ff06/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessBuilderSupplier.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessBuilderSupplier.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessBuilderSupplier.java deleted file mode 100644 index c7f7fde..0000000 --- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessBuilderSupplier.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.tensorflow.core.pythonrunning; - -import java.lang.management.ManagementFactory; -import java.util.Map; -import org.apache.ignite.tensorflow.util.SerializableSupplier; - -/** - * Python process builder supplier that is used to create Python process builder. - */ -public class PythonProcessBuilderSupplier implements SerializableSupplier<ProcessBuilder> { - /** */ - private static final long serialVersionUID = 7181937306294456125L; - - /** Python environment variable name. */ - private static final String PYTHON_ENV_NAME = "PYTHON"; - - /** Interactive flag (allows to used standard input to pass Python script). */ - private final boolean interactive; - - /** Meta information that adds to script as arguments. */ - private final String[] meta; - - /** - * Constructs a new instance of Python process builder supplier. - * - * @param interactive Interactive flag (allows to used standard input to pass Python script). - * @param meta Meta information that adds to script as arguments. - */ - public PythonProcessBuilderSupplier(boolean interactive, String... meta) { - this.interactive = interactive; - this.meta = meta; - } - - /** - * Returns process builder to be used to start Python process. - * - * @return Process builder to be used to start Python process. - */ - public ProcessBuilder get() { - String python = System.getenv(PYTHON_ENV_NAME); - - if (python == null) - python = "python3"; - - ProcessBuilder procBldr; - if (interactive) { - String[] cmd = new String[meta.length + 3]; - - cmd[0] = python; - cmd[1] = "-i"; - cmd[2] = "-"; - - System.arraycopy(meta, 0, cmd, 3, meta.length); - - procBldr = new ProcessBuilder(cmd); - } - else - procBldr = new ProcessBuilder(python); - - Map<String, String> env = procBldr.environment(); - env.put("PPID", String.valueOf(getProcessId())); - - return procBldr; - } - - /** - * Returns current process identifier. - * - * @return Process identifier. - */ - private long getProcessId() { - String pid = ManagementFactory.getRuntimeMXBean().getName().split("@")[0]; - - return Long.parseLong(pid); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/b369ff06/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessManager.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessManager.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessManager.java deleted file mode 100644 index d050c0e..0000000 --- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessManager.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.tensorflow.core.pythonrunning; - -import org.apache.ignite.Ignite; -import org.apache.ignite.tensorflow.core.ProcessManager; -import org.apache.ignite.tensorflow.core.ProcessManagerWrapper; -import org.apache.ignite.tensorflow.core.nativerunning.NativeProcess; -import org.apache.ignite.tensorflow.core.nativerunning.NativeProcessManager; - -/** - * Python process manager that allows to start, stop and make other actions with python processes. - */ -public class PythonProcessManager extends ProcessManagerWrapper<NativeProcess, PythonProcess> { - /** - * Constructs a new instance of python process manager. - * - * @param ignite Ignite instance. - */ - public PythonProcessManager(Ignite ignite) { - this(new NativeProcessManager(ignite)); - } - - /** - * Constructs a new instance of python process manager. - * - * @param delegate Delegate. - */ - public PythonProcessManager(ProcessManager<NativeProcess> delegate) { - super(delegate); - } - - /** {@inheritDoc} */ - @Override protected NativeProcess transformSpecification(PythonProcess spec) { - return new NativeProcess( - new PythonProcessBuilderSupplier(true, spec.getMeta()), - spec.getStdin(), - spec.getNodeId() - ); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/b369ff06/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/package-info.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/package-info.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/package-info.java deleted file mode 100644 index 541c047..0000000 --- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/package-info.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * <!-- Package description. --> - * The part of TensorFlow integration infrastructure that allows to start and maintain Python native processes. As - * described in {@link org.apache.ignite.tensorflow.core.pythonrunning.PythonProcess} user only needs to specify Python - * code and identifier of the node the process should be started and Python Process Manager will make the rest so that - * the given code will be executed and maintained on the specified node. - */ -package org.apache.ignite.tensorflow.core.pythonrunning; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b369ff06/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/util/PythonProcessBuilderSupplier.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/util/PythonProcessBuilderSupplier.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/util/PythonProcessBuilderSupplier.java new file mode 100644 index 0000000..ffb5e82 --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/util/PythonProcessBuilderSupplier.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tensorflow.core.util; + +import java.lang.management.ManagementFactory; +import java.util.Map; +import org.apache.ignite.tensorflow.util.SerializableSupplier; + +/** + * Python process builder supplier that is used to create Python process builder. + */ +public class PythonProcessBuilderSupplier implements SerializableSupplier<ProcessBuilder> { + /** */ + private static final long serialVersionUID = 7181937306294456125L; + + /** Python environment variable name. */ + private static final String PYTHON_ENV_NAME = "PYTHON"; + + /** Interactive flag (allows to used standard input to pass Python script). */ + private final boolean interactive; + + /** Meta information that adds to script as arguments. */ + private final String[] meta; + + /** + * Constructs a new instance of Python process builder supplier. + * + * @param interactive Interactive flag (allows to used standard input to pass Python script). + * @param meta Meta information that adds to script as arguments. + */ + public PythonProcessBuilderSupplier(boolean interactive, String... meta) { + this.interactive = interactive; + this.meta = meta; + } + + /** + * Returns process builder to be used to start Python process. + * + * @return Process builder to be used to start Python process. + */ + public ProcessBuilder get() { + String python = System.getenv(PYTHON_ENV_NAME); + + if (python == null) + python = "python3"; + + ProcessBuilder procBldr; + if (interactive) { + String[] cmd = new String[meta.length + 3]; + + cmd[0] = python; + cmd[1] = "-i"; + cmd[2] = "-"; + + System.arraycopy(meta, 0, cmd, 3, meta.length); + + procBldr = new ProcessBuilder(cmd); + } + else + procBldr = new ProcessBuilder(python); + + Map<String, String> env = procBldr.environment(); + env.put("PPID", String.valueOf(getProcessId())); + + return procBldr; + } + + /** + * Returns current process identifier. + * + * @return Process identifier. + */ + private long getProcessId() { + String pid = ManagementFactory.getRuntimeMXBean().getName().split("@")[0]; + + return Long.parseLong(pid); + } +}