Author: olga Date: Thu Mar 27 12:01:32 2008 New Revision: 641943 URL: http://svn.apache.org/viewvc?rev=641943&view=rev Log: Files that I forgot to svn add as part of commit for PIG-94 patch
Added: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultInputHandler.java incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileInputHandler.java incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileOutputHandler.java incubator/pig/trunk/src/org/apache/pig/impl/streaming/HandlerFactory.java incubator/pig/trunk/src/org/apache/pig/impl/streaming/InputHandler.java incubator/pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java Added: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java?rev=641943&view=auto ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java (added) +++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java Thu Mar 27 12:01:32 2008 @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.streaming; + +import java.io.File; +import java.io.IOException; +import java.util.Date; +import java.util.List; +import java.util.Properties; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigMapReduce; +import org.apache.pig.impl.eval.collector.DataCollector; +import org.apache.pig.impl.streaming.ExecutableManager; +import org.apache.pig.impl.streaming.StreamingCommand; +import org.apache.pig.impl.streaming.StreamingCommand.Handle; +import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec; + +/** + * {@link HadoopExecutableManager} is a specialization of + * {@link ExecutableManager} and provides HDFS-specific support for secondary + * outputs, task-logs etc. + * + * <code>HadoopExecutableManager</code> provides support for secondary outputs + * of the managed process and also persists the logs of the tasks on HDFS. + * + * The <code>stderr</code> of a task is persisted on HDFS only if the command + * requests it and the task's tip-id is below the command's log-files limit.
+ */ +public class HadoopExecutableManager extends ExecutableManager { + + JobConf job; + + String scriptOutputDir; + String scriptLogDir; + String taskId; + + FSDataOutputStream errorStream; // stderr log of the task on HDFS, or null + + // NOTE(review): not read anywhere in this class + boolean writeHeaderFooter = false; + + public HadoopExecutableManager() {} + + /** + * Configure the manager and make a relative executable world-executable + * (<code>chmod a+x</code>); absolute executables are left untouched. + */ + public void configure(Properties properties, StreamingCommand command, + DataCollector endOfPipe) + throws IOException, ExecException { + super.configure(properties, command, endOfPipe); + + // Chmod +x the executable + File executable = new File(command.getExecutable()); + if (executable.isAbsolute()) { + // we don't own it. Hope it is executable ... + } else { + try { + FileUtil.chmod(executable.toString(), "a+x"); + } catch (InterruptedException ie) { + // Restore the interrupt status before reporting the failure + Thread.currentThread().interrupt(); + throw new ExecException(ie); + } + } + + // Save the output directory for the Pig Script + scriptOutputDir = properties.getProperty("pig.streaming.task.output.dir"); + scriptLogDir = properties.getProperty("pig.streaming.log.dir"); + + // Save the taskid + taskId = properties.getProperty("pig.streaming.task.id"); + + // Keep a reference to the JobConf of the task + job = PigMapReduce.getPigContext().getJobConf(); + } + + /** + * Open the HDFS stderr log of the task (if it is to be persisted), write + * the debug header and then start the managed process. + */ + protected void exec() throws IOException { + // Create the HDFS file for the stderr of the task, if necessary + if (writeErrorToHDFS(command.getLogFilesLimit(), taskId)) { + try { + Path errorFile = + new Path(new Path(scriptLogDir, command.getLogDir()), taskId); + errorStream = + errorFile.getFileSystem(job).create(errorFile); + } catch (IOException ie) { + // Don't fail the task if we couldn't save its stderr on HDFS + System.err.println("Failed to create stderr file of task: " + + taskId + " in HDFS at " + scriptLogDir + + " with " + ie); + errorStream = null; + } + } + + // Header for stderr file of the task + writeDebugHeader(); + + // Exec the command ...
+ super.exec(); + } + + /** + * Tear down the managed process, write the debug footer, copy the + * secondary outputs of the task to HDFS and close the HDFS stderr log. + * The footer is written and the stderr log is closed even if the process + * or the copy failed. + */ + public void close() throws IOException, ExecException { + try { + try { + super.close(); + } finally { + // Footer for stderr file of the task; written even if the + // process failed so that its exit code is recorded + writeDebugFooter(); + } + + // Copy the secondary outputs of the task to HDFS + List<HandleSpec> outputSpecs = command.getHandleSpecs(Handle.OUTPUT); + if (outputSpecs != null && outputSpecs.size() > 1) { + Path scriptOutputDir = new Path(this.scriptOutputDir); + FileSystem fs = scriptOutputDir.getFileSystem(job); + for (int i=1; i < outputSpecs.size(); ++i) { + String fileName = outputSpecs.get(i).getName(); + try { + fs.copyFromLocalFile(false, true, new Path(fileName), + new Path(scriptOutputDir, + taskId+"-"+fileName) + ); + } catch (IOException ioe) { + System.err.println("Failed to save secondary output '" + + fileName + "' of task: " + taskId + + " with " + ioe); + throw new ExecException(ioe); + } + } + } + } finally { + // Close the stderr file on HDFS, even on failure, so it isn't leaked + if (errorStream != null) { + errorStream.close(); + errorStream = null; + } + } + } + + /** + * Should the stderr data of this task be persisted on HDFS? + * + * @param limit maximum number of tasks whose stderr log-files are persisted + * @param taskId id of the task + * @return <code>true</code> if stderr data of task should be persisted on + * HDFS, <code>false</code> otherwise (including when the tip-id + * cannot be extracted from <code>taskId</code>) + */ + private boolean writeErrorToHDFS(int limit, String taskId) { + if (!command.getPersistStderr()) { + return false; + } + + // These are hard-coded begin/end offsets of the tip-id in a Hadoop + // *taskid*; assumed layout: task_<jt-id>_<job#>_<m|r>_<tip#>_<attempt#> + int beginIndex = 25, endIndex = 31; + if (taskId == null || taskId.length() < endIndex) { + return false; + } + + try { + int tipId = Integer.parseInt(taskId.substring(beginIndex, endIndex)); + return tipId < limit; + } catch (NumberFormatException nfe) { + // Unexpected taskid format: don't fail the task over its logs + return false; + } + } + + /** + * Forward the error to the default handling of + * {@link ExecutableManager} and also append it to the HDFS stderr log of + * the task, if one is open. + */ + protected void processError(String error) { + super.processError(error); + + try { + if (errorStream != null) { + errorStream.writeBytes(error); + } + } catch (IOException ioe) { + super.processError("Failed to save error logs to HDFS with: " + + ioe); + } + } + + // Write the task-information header (command, start time, input split) + private void writeDebugHeader() { + processError("===== Task Information Header =====" ); + + processError("\nCommand: " + command.getExecutable()); + processError("\nStart time: 
" + new Date(System.currentTimeMillis())); + processError("\nInput-split file: " + job.get("map.input.file")); + processError("\nInput-split start-offset: " + + job.getLong("map.input.start", -1)); + processError("\nInput-split length: " + + job.getLong("map.input.length", -1)); + + processError("\n===== * * * =====\n"); + } + + // Write the task-information footer (end time, exit code, byte counts) + private void writeDebugFooter() { + processError("===== Task Information Footer ====="); + + processError("\nEnd time: " + new Date(System.currentTimeMillis())); + processError("\nExit code: " + exitCode); + + List<HandleSpec> inputSpecs = command.getHandleSpecs(Handle.INPUT); + HandleSpec inputSpec = + (inputSpecs != null && !inputSpecs.isEmpty()) ? inputSpecs.get(0) : null; + processError("\nInput bytes: " + inputBytes + " bytes " + + ((inputSpec != null) ? + "(" + inputSpec.getName() + " using " + + inputSpec.getSpec() + ")" + : "")); + + List<HandleSpec> outputSpecs = command.getHandleSpecs(Handle.OUTPUT); + HandleSpec outputSpec = + (outputSpecs != null && !outputSpecs.isEmpty()) ? outputSpecs.get(0) : null; + processError("\nOutput bytes: " + outputBytes + " bytes " + + ((outputSpec != null) ? 
+ "(" + outputSpec.getName() + " using " + + outputSpec.getSpec() + ")" + : "")); + if (outputSpecs != null) { + for (int i=1; i < outputSpecs.size(); ++i) { + HandleSpec spec = outputSpecs.get(i); + processError("\n " + new File(spec.getName()).length() + + " bytes using " + spec.getSpec()); + } + } + + processError("\n===== * * * =====\n"); + } +} + + \ No newline at end of file Added: incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultInputHandler.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultInputHandler.java?rev=641943&view=auto ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultInputHandler.java (added) +++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultInputHandler.java Thu Mar 27 12:01:32 2008 @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.pig.impl.streaming; + +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.pig.StoreFunc; +import org.apache.pig.builtin.PigStorage; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec; + +/** + * {@link DefaultInputHandler} handles the input for the Pig-Streaming + * executable in a {@link InputType#SYNCHRONOUS} manner by feeding it input + * via its <code>stdin</code>. + */ +public class DefaultInputHandler extends InputHandler { + + // stdin of the managed process, set by bindTo() + OutputStream stdin; + + /** + * Create a handler that serializes input with {@link PigStorage}. + */ + public DefaultInputHandler() { + serializer = new PigStorage(); + } + + /** + * Create a handler that serializes input with the <code>StoreFunc</code> + * named in <code>spec</code>. + * + * @param spec input specification of the streaming command + */ + public DefaultInputHandler(HandleSpec spec) { + serializer = (StoreFunc)PigContext.instantiateFuncFromSpec(spec.spec); + } + + public InputType getInputType() { + return InputType.SYNCHRONOUS; + } + + public void bindTo(OutputStream os) throws IOException { + stdin = os; + super.bindTo(stdin); + } + + /** + * Finish serialization and close the process' stdin, which signals the + * end of its input. stdin is closed even if finishing the serializer + * fails; if the handler was never bound there is no stdin to close. + */ + public void close() throws IOException { + try { + super.close(); + } finally { + if (stdin != null) { + stdin.flush(); + stdin.close(); + stdin = null; + } + } + } +} Added: incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java?rev=641943&view=auto ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java (added) +++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java Thu Mar 27 12:01:32 2008 @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.impl.streaming; + +import org.apache.pig.LoadFunc; +import org.apache.pig.builtin.PigStorage; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec; + +/** + * {@link DefaultOutputHandler} handles the output from the Pig-Streaming + * executable in an {@link OutputType#SYNCHRONOUS} manner by reading its output + * via its <code>stdout</code>. + */ +public class DefaultOutputHandler extends OutputHandler { + + /** + * Create a handler that deserializes output with {@link PigStorage}. + */ + public DefaultOutputHandler() { + deserializer = new PigStorage(); + } + + /** + * Create a handler that deserializes output with the <code>LoadFunc</code> + * named in <code>spec</code>. + * + * @param spec output specification of the streaming command + */ + public DefaultOutputHandler(HandleSpec spec) { + deserializer = (LoadFunc)PigContext.instantiateFuncFromSpec(spec.spec); + } + + public OutputType getOutputType() { + return OutputType.SYNCHRONOUS; + } +} Added: incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java?rev=641943&view=auto ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java (added) +++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java Thu Mar 27 12:01:32 2008 @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.impl.streaming; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.BufferedReader; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.data.Datum; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.eval.collector.DataCollector; +import org.apache.pig.impl.streaming.InputHandler.InputType; +import org.apache.pig.impl.streaming.OutputHandler.OutputType; + +/** + * {@link ExecutableManager} manages an external executable which processes data + * in a Pig query. + * + * The <code>ExecutableManager</code> is responsible for startup/teardown of the + * external process and also for managing it. + * It feeds input records to the executable via its <code>stdin</code>, + * collects the output records from the <code>stdout</code> and also diagnostic + * information from the <code>stderr</code>.
+ */ +public class ExecutableManager { + private static final Log LOG = + LogFactory.getLog(ExecutableManager.class.getName()); + private static final int SUCCESS = 0; + + protected StreamingCommand command; // Streaming command to be run + String[] argv; // Parsed/split commands + + Process process; // Handle to the process + protected int exitCode = -127; // Exit code of the process + + protected DataOutputStream stdin; // stdin of the process + + ProcessOutputThread stdoutThread; // thread to get process output + InputStream stdout; // stdout of the process + + ProcessErrorThread stderrThread; // thread to get process stderr + InputStream stderr; // stderr of the process + + DataCollector endOfPipe; // successor receiving the process' output + + // Input/Output handlers + InputHandler inputHandler; + OutputHandler outputHandler; + + Properties properties; + + // Sums of Tuple.getMemorySize() of the tuples sent to/received from the process + protected long inputBytes = 0; + protected long outputBytes = 0; + + public ExecutableManager() {} + + /** + * Configure the manager for the given command: record the command and its + * arguments and create the input/output handlers for it. + * + * @param properties configuration properties + * @param command streaming command to be managed + * @param endOfPipe successor to which the process' output is pushed + */ + public void configure(Properties properties, StreamingCommand command, + DataCollector endOfPipe) + throws IOException, ExecException { + this.properties = properties; + + this.command = command; + this.argv = this.command.getCommandArgs(); + + // Create the input/output handlers + this.inputHandler = HandlerFactory.createInputHandler(command); + this.outputHandler = + HandlerFactory.createOutputHandler(command); + + // Successor + this.endOfPipe = endOfPipe; + } + + /** + * Close the input, wait for the managed process (starting it first if its + * input is ASYNCHRONOUS) and then process its ASYNCHRONOUS output, if any. + * + * @throws ExecException if the process exits with a non-zero status + */ + public void close() throws IOException, ExecException { + // Close the InputHandler, which in some cases lets the process + // terminate + inputHandler.close(); + + // Check if we need to start the process now ...
+ if (inputHandler.getInputType() == InputType.ASYNCHRONOUS) { + exec(); + } + + // Wait for the process to exit and the stdout/stderr threads to complete + try { + exitCode = process.waitFor(); + + if (stdoutThread != null) { + stdoutThread.join(0); + } + if (stderrThread != null) { + stderrThread.join(0); + } + + } catch (InterruptedException ie) { + // NOTE(review): the interrupt is ignored and its status not restored + } + + // Clean up the process + process.destroy(); + + LOG.debug("Process exited with: " + exitCode); + if (exitCode != SUCCESS) { + throw new ExecException(command + " failed with exit status: " + + exitCode); + } + + if (outputHandler.getOutputType() == OutputType.ASYNCHRONOUS) { + // Trigger the outputHandler + outputHandler.bindTo(null); + + // Start the thread to process the output and wait for + // it to terminate + stdoutThread = new ProcessOutputThread(outputHandler); + stdoutThread.start(); + + try { + stdoutThread.join(0); + } catch (InterruptedException ie) {} + } + + } + + /** + * Start the managed process along with the thread that drains its stderr + * and, for SYNCHRONOUS output, the thread that reads its stdout. + */ + protected void exec() throws IOException { + // Unquote command-line arguments ...
+ for (int i=0; i < argv.length; ++i) { + String arg = argv[i]; + // Only unquote arguments long enough to hold both quotes; this also + // avoids charAt() on an empty argument and substring(1, 0) on "'" + if (arg.length() >= 2 && + arg.charAt(0) == '\'' && arg.charAt(arg.length()-1) == '\'') { + argv[i] = arg.substring(1, arg.length()-1); + } + } + + // Start the external process + ProcessBuilder processBuilder = new ProcessBuilder(argv); + process = processBuilder.start(); + LOG.debug("Started the process for command: " + command); + + // Pick up the process' stderr stream and start the thread to + // process the stderr stream + stderr = + new DataInputStream(new BufferedInputStream(process.getErrorStream())); + stderrThread = new ProcessErrorThread(); + stderrThread.start(); + + // Check if we need to handle the process' stdout directly + if (outputHandler.getOutputType() == OutputType.SYNCHRONOUS) { + // Get hold of the stdout of the process + stdout = + new DataInputStream(new BufferedInputStream(process.getInputStream())); + + // Bind the stdout to the OutputHandler + outputHandler.bindTo(stdout); + + // Start the thread to process the executable's stdout + stdoutThread = new ProcessOutputThread(outputHandler); + stdoutThread.start(); + } + } + + /** + * Start the managed process and bind its stdin to the InputHandler, unless + * its input is ASYNCHRONOUS. + */ + public void run() throws IOException { + // For ASYNCHRONOUS input the process is only started in close(), + // after the InputHandler has been closed + if (inputHandler.getInputType() == InputType.ASYNCHRONOUS) { + return; + } + + // Start the executable ... + exec(); + stdin = + new DataOutputStream(new BufferedOutputStream(process.getOutputStream())); + inputHandler.bindTo(stdin); + } + + /** + * Send the given <code>Datum</code>, which must be a <code>Tuple</code>, + * to the managed process via the InputHandler. + */ + public void add(Datum d) throws IOException { + // Pass the serialized tuple to the executable via the InputHandler + Tuple t = (Tuple)d; + inputHandler.putNext(t); + inputBytes += t.getMemorySize(); + } + + /** + * Workhorse to process the output of the managed process. + * + * The <code>ExecutableManager</code>, by default, just pushes the received + * <code>Datum</code> into eval-pipeline to be processed by the successor.
+ * + * @param d <code>Datum</code> to process + */ + protected void processOutput(Datum d) { + endOfPipe.add(d); + } + + // Reads tuples via the OutputHandler and hands them to processOutput() + class ProcessOutputThread extends Thread { + + OutputHandler outputHandler; + + ProcessOutputThread(OutputHandler outputHandler) { + setDaemon(true); + this.outputHandler = outputHandler; + } + + public void run() { + try { + // Read tuples from the executable and push them down the pipe + Tuple tuple = null; + while ((tuple = outputHandler.getNext()) != null) { + processOutput(tuple); + outputBytes += tuple.getMemorySize(); + } + + outputHandler.close(); + } catch (Throwable t) { + LOG.warn(t); + try { + outputHandler.close(); + } catch (IOException ioe) { + LOG.info(ioe); + } + throw new RuntimeException(t); + } + } + } + + /** + * Workhorse to process the stderr stream of the managed process. + * + * By default <code>ExecutableManager</code> just sends out the received + * error message to the <code>stderr</code> of itself. + * + * @param error error message from the managed process.
+ */ + protected void processError(String error) { + // Just send it out to our stderr + System.err.print(error); + } + + // Reads the process' stderr line by line and hands it to processError() + class ProcessErrorThread extends Thread { + + public ProcessErrorThread() { + setDaemon(true); + } + + public void run() { + try { + String error; + BufferedReader reader = + new BufferedReader(new InputStreamReader(stderr)); + while ((error = reader.readLine()) != null) { + processError(error+"\n"); + } + + if (stderr != null) { + stderr.close(); + LOG.debug("ProcessErrorThread done"); + } + } catch (Throwable th) { + LOG.warn(th); + try { + if (stderr != null) { + stderr.close(); + } + } catch (IOException ioe) { + LOG.info(ioe); + } + // Propagate the original failure, as ProcessOutputThread does + throw new RuntimeException(th); + } + } + } +} Added: incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileInputHandler.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileInputHandler.java?rev=641943&view=auto ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileInputHandler.java (added) +++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileInputHandler.java Thu Mar 27 12:01:32 2008 @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.impl.streaming; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.pig.StoreFunc; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec; + +/** + * {@link FileInputHandler} handles the input for the Pig-Streaming + * executable in an {@link InputType#ASYNCHRONOUS} manner by feeding it input + * via an external file specified by the user. + */ +public class FileInputHandler extends InputHandler { + + String fileName; // file into which the input is serialized + OutputStream fileOutStream; // open stream to fileName, null once closed + + /** + * Create a handler that serializes input, using the <code>StoreFunc</code> + * named in <code>handleSpec</code>, into the file named there. + * + * @param handleSpec input specification of the streaming command + * @throws ExecException if the file cannot be opened or bound + */ + public FileInputHandler(HandleSpec handleSpec) throws ExecException { + fileName = handleSpec.name; + serializer = + (StoreFunc) PigContext.instantiateFuncFromSpec(handleSpec.spec); + + try { + fileOutStream = new FileOutputStream(new File(fileName)); + super.bindTo(fileOutStream); + } catch (IOException ioe) { + // Don't leak the file handle if binding the serializer failed + if (fileOutStream != null) { + try { + fileOutStream.close(); + } catch (IOException ignored) { + // best-effort cleanup; the original failure is reported below + } + fileOutStream = null; + } + throw new ExecException(ioe); + } + } + + public InputType getInputType() { + return InputType.ASYNCHRONOUS; + } + + public void bindTo(OutputStream os) throws IOException { + throw new UnsupportedOperationException("Cannot call bindTo on " + + "FileInputHandler"); + } + + /** + * Finish serialization and close the input file. The file is closed + * even if finishing the serializer fails. + */ + public void close() throws IOException { + try { + super.close(); + } finally { + if (fileOutStream != null) { + fileOutStream.flush(); + fileOutStream.close(); + fileOutStream = null; + } + } + } + +} Added: incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileOutputHandler.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileOutputHandler.java?rev=641943&view=auto ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileOutputHandler.java (added) +++
incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileOutputHandler.java Thu Mar 27 12:01:32 2008 @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.impl.streaming; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; + +import org.apache.pig.LoadFunc; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec; + +/** + * {@link FileOutputHandler} handles the output from the Pig-Streaming + * executable in an {@link OutputType#ASYNCHRONOUS} manner by reading it from + * an external file specified by the user.
+ */ +public class FileOutputHandler extends OutputHandler { + + String fileName; // file from which the output is read + InputStream fileInStream; // open stream to fileName, set by bindTo() + + public FileOutputHandler(HandleSpec handleSpec) throws ExecException { + fileName = handleSpec.name; + deserializer = + (LoadFunc) PigContext.instantiateFuncFromSpec(handleSpec.spec); + } + + public OutputType getOutputType() { + return OutputType.ASYNCHRONOUS; + } + + /** + * Open the output file and bind the deserializer to it. The given stream + * is not used: the call only triggers reading from the file. + * + * @param is ignored + */ + public void bindTo(InputStream is) throws IOException { + // This is a trigger to start processing the output from the file ... + fileInStream = new FileInputStream(new File(fileName)); + try { + super.bindTo(fileInStream); + } catch (IOException ioe) { + // Don't leak the file handle if binding the deserializer failed + fileInStream.close(); + fileInStream = null; + throw ioe; + } + } + + /** + * Close the handler and the output file, if it was opened. The file is + * closed even if closing the handler itself fails. + */ + public void close() throws IOException { + try { + super.close(); + } finally { + if (fileInStream != null) { + fileInStream.close(); + fileInStream = null; + } + } + } + +} Added: incubator/pig/trunk/src/org/apache/pig/impl/streaming/HandlerFactory.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/HandlerFactory.java?rev=641943&view=auto ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/streaming/HandlerFactory.java (added) +++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/HandlerFactory.java Thu Mar 27 12:01:32 2008 @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.impl.streaming; + +import java.util.List; + +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.impl.streaming.StreamingCommand.Handle; +import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec; + +/** + * Factory to create an {@link InputHandler} or {@link OutputHandler} + * depending on the specification of the {@link StreamingCommand}. + */ +public class HandlerFactory { + + /** + * Create an <code>InputHandler</code> for the given input specification + * of the <code>StreamingCommand</code>. + * + * Without an input specification a default handler feeding the process + * via its <code>stdin</code> is returned; a specification named + * <code>stdin</code> also feeds <code>stdin</code>, any other name is + * treated as the file to feed the process from. + * + * @param command <code>StreamingCommand</code> + * @return <code>InputHandler</code> for the given input specification + * @throws ExecException if the handler cannot be created + */ + public static InputHandler createInputHandler(StreamingCommand command) + throws ExecException { + List<HandleSpec> inputSpecs = command.getHandleSpecs(Handle.INPUT); + + HandleSpec in = null; + if (inputSpecs == null || inputSpecs.isEmpty() || + (in = inputSpecs.get(0)) == null) { + return new DefaultInputHandler(); + } + + return (in.name.equals("stdin")) ? new DefaultInputHandler(in) : + new FileInputHandler(in); + } + + /** + * Create an <code>OutputHandler</code> for the given output specification + * of the <code>StreamingCommand</code>. + * + * Without an output specification a default handler reading the process' + * <code>stdout</code> is returned; a specification named + * <code>stdout</code> also reads <code>stdout</code>, any other name is + * treated as the file to read the process' output from. + * + * @param command <code>StreamingCommand</code> + * @return <code>OutputHandler</code> for the given output specification + * @throws ExecException if the handler cannot be created + */ + public static OutputHandler createOutputHandler(StreamingCommand command) + throws ExecException { + List<HandleSpec> outputSpecs = command.getHandleSpecs(Handle.OUTPUT); + + HandleSpec out = null; + if (outputSpecs == null || outputSpecs.isEmpty() || + (out = outputSpecs.get(0)) == null) { + return new DefaultOutputHandler(); + } + + return (out.name.equals("stdout")) ?
new DefaultOutputHandler(out) : + new FileOutputHandler(out); + } +} Added: incubator/pig/trunk/src/org/apache/pig/impl/streaming/InputHandler.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/InputHandler.java?rev=641943&view=auto ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/streaming/InputHandler.java (added) +++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/InputHandler.java Thu Mar 27 12:01:32 2008 @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.impl.streaming; + +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.pig.StoreFunc; +import org.apache.pig.data.Tuple; + +/** + * {@link InputHandler} is responsible for handling the input to the + * Pig-Streaming external command. + * + * The managed executable could be fed input in a {@link InputType#SYNCHRONOUS} + * manner via its <code>stdin</code> or in an {@link InputType#ASYNCHRONOUS} + * manner via an external file which is subsequently read by the executable.
+ */ +public abstract class InputHandler { + /** + * How the managed executable is fed its input: SYNCHRONOUS via its + * <code>stdin</code>, ASYNCHRONOUS via an external file. + */ + public enum InputType {SYNCHRONOUS, ASYNCHRONOUS} + + /* + * The serializer to be used to send data to the managed process. + * + * It is the responsibility of the concrete sub-classes to setup and + * manage the serializer. + */ + protected StoreFunc serializer; + + /** + * Get the handled <code>InputType</code> + * @return the handled <code>InputType</code> + */ + public abstract InputType getInputType(); + + /** + * Send the given input <code>Tuple</code> to the managed executable. + * + * @param t input <code>Tuple</code> + * @throws IOException + */ + public void putNext(Tuple t) throws IOException { + serializer.putNext(t); + } + + /** + * Close the <code>InputHandler</code> since there is no more input + * to be sent to the managed process. + * + * @throws IOException + */ + public void close() throws IOException { + serializer.finish(); + } + + /** + * Bind the <code>InputHandler</code> to the <code>OutputStream</code> + * to which it writes the serialized input for the managed process. + * + * @param os <code>OutputStream</code> to which to write input data for the + * managed process + * @throws IOException + */ + public void bindTo(OutputStream os) throws IOException { + serializer.bindTo(os); + } +} Added: incubator/pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java?rev=641943&view=auto ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java (added) +++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java Thu Mar 27 12:01:32 2008 @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.impl.streaming; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.pig.LoadFunc; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.io.BufferedPositionedInputStream; + +/** + * [EMAIL PROTECTED] OutputHandler} is responsible for handling the output of the + * Pig-Streaming external command. + * + * The output of the managed executable could be fetched in a + * [EMAIL PROTECTED] OutputType#SYNCHRONOUS} manner via its <code>stdout</code> or in an + * [EMAIL PROTECTED] OutputType#ASYNCHRONOUS} manner via an external file to which the + * process wrote its output. + */ +public abstract class OutputHandler { + public enum OutputType {SYNCHRONOUS, ASYNCHRONOUS} + + /* + * The deserializer to be used to send data to the managed process. + * + * It is the responsibility of the concrete sub-classes to setup and + * manage the deserializer. + */ + protected LoadFunc deserializer; + + /** + * Get the handled <code>OutputType</code>. + * @return the handled <code>OutputType</code> + */ + public abstract OutputType getOutputType(); + + /** + * Bind the <code>OutputHandler</code> to the <code>InputStream</code> + * from which to read the output data of the managed process. 
+ * + * @param is <code>InputStream</code> from which to read the output data + * of the managed process + * @throws IOException + */ + public void bindTo(InputStream is) throws IOException { + deserializer.bindTo("", new BufferedPositionedInputStream(is), 0, + Long.MAX_VALUE); + } + + /** + * Get the next output <code>Tuple</code> of the managed process. + * + * @return the next output <code>Tuple</code> of the managed process + * @throws IOException + */ + public Tuple getNext() throws IOException { + return deserializer.getNext(); + } + + /** + * Close the <code>OutputHandler</code>. + * @throws IOException + */ + public void close() throws IOException {} +} Added: incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java?rev=641943&view=auto ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java (added) +++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java Thu Mar 27 12:01:32 2008 @@ -0,0 +1,347 @@ +package org.apache.pig.impl.streaming; + +import java.io.Serializable; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.pig.builtin.PigStorage; + + +/** + * [EMAIL PROTECTED] StreamingCommand} represents the specification of an external + * command to be executed in a Pig Query. + * + * <code>StreamingCommand</code> encapsulates all relevant details of the + * command specified by the user either directly via the <code>STREAM</code> + * operator or indirectly via a <code>DEFINE</code> operator. It includes + * details such as input/output/error specifications and also files to be + * shipped to the cluster and files to be cached. 
+ */ +public class StreamingCommand implements Serializable { + private static final long serialVersionUID = 1L; + + // External command to be executed and it's parsed components + String executable; + String[] argv; + + // Files to be shipped to the cluster in-order to be executed + List<String> shipSpec = new LinkedList<String>(); + + // Files to be shipped to the cluster in-order to be executed + List<String> cacheSpec = new LinkedList<String>(); + + /** + * Handle to communicate with the external process. + */ + public enum Handle {INPUT, OUTPUT} + + /** + * Map from the the stdin/stdout/stderr handles to their specifications + */ + Map<Handle, List<HandleSpec>> handleSpecs = + new TreeMap<Handle, List<HandleSpec>>(); + + // Should the stderr of the process be persisted? + boolean persistStderr = false; + + // Directory where the process's stderr logs should be persisted. + String logDir; + + // Limit on the number of persisted log-files + int logFilesLimit = 100; + public static final int MAX_TASKS = 100; + + boolean shipFiles = true; + + /** + * Create a new <code>StreamingCommand</code> with the given command. + * + * @param command streaming command to be executed + * @param argv parsed arguments of the <code>command</code> + */ + public StreamingCommand(String[] argv) { + this.argv = argv; + + // Assume that argv[0] is the executable + this.executable = this.argv[0]; + } + + /** + * Get the command to be executed. + * + * @return the command to be executed + */ + public String getExecutable() { + return executable; + } + + /** + * Set the executable for the <code>StreamingCommand</code>. + * + * @param executable the executable for the <code>StreamingCommand</code> + */ + public void setExecutable(String executable) { + this.executable = executable; + } + + /** + * Set the command line arguments for the <code>StreamingCommand</code>. 
+ * + * @param argv the command line arguments for the + * <code>StreamingCommand</code> + */ + public void setCommandArgs(String[] argv) { + this.argv = argv; + } + + /** + * Get the parsed command arguments. + * + * @return the parsed command arguments as <code>String[]</code> + */ + public String[] getCommandArgs() { + return argv; + } + + /** + * Get the list of files which need to be shipped to the cluster. + * + * @return the list of files which need to be shipped to the cluster + */ + public List<String> getShipSpecs() { + return shipSpec; + } + + /** + * Get the list of files which need to be cached on the execute nodes. + * + * @return the list of files which need to be cached on the execute nodes + */ + public List<String> getCacheSpecs() { + return cacheSpec; + } + + /** + * Add a file to be shipped to the cluster. + * + * Users can use this to distribute executables and other necessary files + * to the clusters. + * + * @param path path of the file to be shipped to the cluster + */ + public void addPathToShip(String path) { + shipSpec.add(path); + } + + /** + * Add a file to be cached on execute nodes on the cluster. The file is + * assumed to be available at the shared filesystem. + * + * @param path path of the file to be cached on the execute nodes + */ + public void addPathToCache(String path) { + cacheSpec.add(path); + } + + /** + * Attach a [EMAIL PROTECTED] HandleSpec} to a given [EMAIL PROTECTED] Handle} + * @param handle <code>Handle</code> to which the specification is to + * be attached. + * @param handleSpec <code>HandleSpec</code> for the given handle. + */ + public void addHandleSpec(Handle handle, HandleSpec handleSpec) { + List<HandleSpec> handleSpecList = handleSpecs.get(handle); + + if (handleSpecList == null) { + handleSpecList = new LinkedList<HandleSpec>(); + handleSpecs.put(handle, handleSpecList); + } + + handleSpecList.add(handleSpec); + } + + /** + * Get specifications for the given <code>Handle</code>. 
+ * + * @param handle <code>Handle</code> of the stream + * @return specification for the given <code>Handle</code> + */ + public List<HandleSpec> getHandleSpecs(Handle handle) { + return handleSpecs.get(handle); + } + + /** + * Should the stderr of the managed process be persisted? + * + * @return <code>true</code> if the stderr of the managed process should be + * persisted, <code>false</code> otherwise. + */ + public boolean getPersistStderr() { + return persistStderr; + } + + /** + * Specify if the stderr of the managed process should be persisted. + * + * @param persistStderr <code>true</code> if the stderr of the managed + * process should be persisted, else <code>false</code> + */ + public void setPersistStderr(boolean persistStderr) { + this.persistStderr = persistStderr; + } + + /** + * Get the directory where the log-files of the command are persisted. + * + * @return the directory where the log-files of the command are persisted + */ + public String getLogDir() { + return logDir; + } + + /** + * Set the directory where the log-files of the command are persisted. + * + * @param logDir the directory where the log-files of the command are persisted + */ + public void setLogDir(String logDir) { + this.logDir = logDir; + if (this.logDir.startsWith("/")) { + this.logDir = this.logDir.substring(1); + } + setPersistStderr(true); + } + + /** + * Get the maximum number of tasks whose stderr logs files are persisted. + * + * @return the maximum number of tasks whose stderr logs files are persisted + */ + public int getLogFilesLimit() { + return logFilesLimit; + } + + /** + * Set the maximum number of tasks whose stderr logs files are persisted. + * @param logFilesLimit the maximum number of tasks whose stderr logs files + * are persisted + */ + public void setLogFilesLimit(int logFilesLimit) { + this.logFilesLimit = Math.min(MAX_TASKS, logFilesLimit); + } + + /** + * Set whether files should be shipped or not. 
+ * + * @param shipFiles <code>true</code> if files of this command should be + * shipped, <code>false</code> otherwise + */ + public void setShipFiles(boolean shipFiles) { + this.shipFiles = shipFiles; + } + + /** + * Get whether files for this command should be shipped or not. + * + * @return <code>true</code> if files of this command should be shipped, + * <code>false</code> otherwise + */ + public boolean getShipFiles() { + return shipFiles; + } + + public String toString() { + return executable; + } + + /** + * Specification about the usage of the [EMAIL PROTECTED] Handle} to communicate + * with the external process. + * + * It specifies the stream-handle which can be one of <code>stdin</code>/ + * <code>stdout</code>/<code>stderr</code> or a named file and also the + * serializer/deserializer specification to be used to read/write data + * to/from the stream. + */ + public static class HandleSpec + implements Comparable<HandleSpec>, Serializable { + private static final long serialVersionUID = 1L; + + String name; + String spec; + + /** + * Create a new [EMAIL PROTECTED] HandleSpec} with a given name using the default + * [EMAIL PROTECTED] PigStorage} serializer/deserializer. + * + * @param handleName name of the handle (one of <code>stdin</code>, + * <code>stdout</code> or a file-path) + */ + public HandleSpec(String handleName) { + this(handleName, PigStorage.class.getName()); + } + + /** + * Create a new [EMAIL PROTECTED] HandleSpec} with a given name using the default + * [EMAIL PROTECTED] PigStorage} serializer/deserializer. 
+ * + * @param handleName name of the handle (one of <code>stdin</code>, + * <code>stdout</code> or a file-path) + * @param spec serializer/deserializer spec + */ + public HandleSpec(String handleName, String spec) { + this.name = handleName; + this.spec = spec; + } + + public int compareTo(HandleSpec o) { + return this.name.compareTo(o.name); + } + + public String toString() { + return name + " using " + spec; + } + + /** + * Get the <b>name</b> of the <code>HandleSpec</code>. + * + * @return the <b>name</b> of the <code>HandleSpec</code> (one of + * <code>stdin</code>, <code>stdout</code> or a file-path) + */ + public String getName() { + return name; + } + + /** + * Set the <b>name</b> of the <code>HandleSpec</code>. + * + * @param name <b>name</b> of the <code>HandleSpec</code> (one of + * <code>stdin</code>, <code>stdout</code> or a file-path) + */ + public void setName(String name) { + this.name = name; + } + + /** + * Get the serializer/deserializer spec of the <code>HandleSpec</code>. + * + * @return the serializer/deserializer spec of the + * <code>HandleSpec</code> + */ + public String getSpec() { + return spec; + } + + /** + * Set the serializer/deserializer spec of the <code>HandleSpec</code>. + * + * @param spec the serializer/deserializer spec of the + * <code>HandleSpec</code> + */ + public void setSpec(String spec) { + this.spec = spec; + } + } +} \ No newline at end of file