>From Ian Maxon <[email protected]>:
Ian Maxon has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17760 )
Change subject: DON'T MERGE: Debugging UDF tests
......................................................................
DON'T MERGE: Debugging UDF tests
Change-Id: Ie477ba08405d2e58cb4b27d5d160c7773ff1246a
---
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryTCPSocketEvaluator.java
1 file changed, 40 insertions(+), 0 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/60/17760/1
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryTCPSocketEvaluator.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryTCPSocketEvaluator.java
index 385d738..acfb3fd 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryTCPSocketEvaluator.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryTCPSocketEvaluator.java
@@ -18,13 +18,17 @@
*/
package org.apache.asterix.external.library;
+import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.library.ILibraryManager;
@@ -37,9 +41,13 @@
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.ipc.impl.IPCSystem;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
public class PythonLibraryTCPSocketEvaluator extends
AbstractLibrarySocketEvaluator {
+ private static final Logger LOGGER =
LogManager.getLogger(PythonLibraryTCPSocketEvaluator.class);
+
public static final String ENTRYPOINT = "entrypoint.py";
public static final String SITE_PACKAGES = "site-packages";
@@ -52,6 +60,9 @@
private List<String> pythonArgs;
private Map<String, String> pythonEnv;
+ //DEBUG
+ private StreamGobbler out;
+
public PythonLibraryTCPSocketEvaluator(JobId jobId,
PythonLibraryEvaluatorId evaluatorId, ILibraryManager libMgr,
File pythonHome, String sitePkgs, List<String> pythonArgs,
Map<String, String> pythonEnv,
ExternalFunctionResultRouter router, IPCSystem ipcSys,
TaskAttemptId task,
@@ -66,6 +77,21 @@
this.ipcSys = ipcSys;
}
+ //DEBUG: with apologies to https://stackoverflow.com/a/33386692
+ class StreamGobbler implements Runnable {
+ private InputStream inputStream;
+ private Consumer<String> consumeInputLine;
+
+ public StreamGobbler(InputStream inputStream, Consumer<String>
consumeInputLine) {
+ this.inputStream = inputStream;
+ this.consumeInputLine = consumeInputLine;
+ }
+
+ public void run() {
+ new BufferedReader(new
InputStreamReader(inputStream)).lines().forEach(consumeInputLine);
+ }
+ }
+
@Override
public void start() throws IOException, AsterixException {
PythonLibraryEvaluatorId fnId = (PythonLibraryEvaluatorId) id;
@@ -83,7 +109,12 @@
ProcessBuilder pb = new ProcessBuilder(args.toArray(new String[0]));
pb.environment().putAll(pythonEnv);
pb.directory(new File(wd));
+ pb.redirectErrorStream(true);
p = pb.start();
+ //DEBUG
+ out = new StreamGobbler(p.getInputStream(), LOGGER::info);
+ new Thread(out).start();
+ //DEBUG
proto = new PythonTCPSocketProto(p.getOutputStream(), router, p);
proto.start();
proto.helo();
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17760
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Ie477ba08405d2e58cb4b27d5d160c7773ff1246a
Gerrit-Change-Number: 17760
Gerrit-PatchSet: 1
Gerrit-Owner: Ian Maxon <[email protected]>
Gerrit-MessageType: newchange