http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java deleted file mode 100644 index dc5874d..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java +++ /dev/null @@ -1,976 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor.external; - -import java.io.BufferedReader; -import java.io.File; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.locks.ReentrantLock; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.processors.hadoop.HadoopContext; -import org.apache.ignite.internal.processors.hadoop.HadoopJob; -import org.apache.ignite.internal.processors.hadoop.HadoopJobId; -import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase; -import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskType; -import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobMetadata; -import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker; -import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskExecutorAdapter; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskStatus; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.child.HadoopExternalProcessStarter; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.HadoopExternalCommunication; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.HadoopMessageListener; -import org.apache.ignite.internal.util.GridSpinReadWriteLock; -import 
org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.typedef.CI1; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.spi.IgnitePortProtocol; -import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentHashMap8; -import org.jsr166.ConcurrentLinkedDeque8; - -import static org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState.CRASHED; -import static org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState.FAILED; - -/** - * External process registry. Handles external process lifecycle. - */ -public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { - /** Hadoop context. */ - private HadoopContext ctx; - - /** */ - private String javaCmd; - - /** Logger. */ - private IgniteLogger log; - - /** Node process descriptor. */ - private HadoopProcessDescriptor nodeDesc; - - /** Output base. */ - private File outputBase; - - /** Path separator. */ - private String pathSep; - - /** Hadoop external communication. */ - private HadoopExternalCommunication comm; - - /** Starting processes. */ - private final ConcurrentMap<UUID, HadoopProcess> runningProcsByProcId = new ConcurrentHashMap8<>(); - - /** Starting processes. */ - private final ConcurrentMap<HadoopJobId, HadoopProcess> runningProcsByJobId = new ConcurrentHashMap8<>(); - - /** Busy lock. */ - private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock(); - - /** Job tracker. 
*/ - private HadoopJobTracker jobTracker; - - /** {@inheritDoc} */ - @Override public void start(HadoopContext ctx) throws IgniteCheckedException { - this.ctx = ctx; - - log = ctx.kernalContext().log(HadoopExternalTaskExecutor.class); - - outputBase = U.resolveWorkDirectory("hadoop", false); - - pathSep = System.getProperty("path.separator", U.isWindows() ? ";" : ":"); - - initJavaCommand(); - - comm = new HadoopExternalCommunication( - ctx.localNodeId(), - UUID.randomUUID(), - ctx.kernalContext().config().getMarshaller(), - log, - ctx.kernalContext().getSystemExecutorService(), - ctx.kernalContext().gridName()); - - comm.setListener(new MessageListener()); - - comm.start(); - - nodeDesc = comm.localProcessDescriptor(); - - ctx.kernalContext().ports().registerPort(nodeDesc.tcpPort(), IgnitePortProtocol.TCP, - HadoopExternalTaskExecutor.class); - - if (nodeDesc.sharedMemoryPort() != -1) - ctx.kernalContext().ports().registerPort(nodeDesc.sharedMemoryPort(), IgnitePortProtocol.TCP, - HadoopExternalTaskExecutor.class); - - jobTracker = ctx.jobTracker(); - } - - /** {@inheritDoc} */ - @Override public void stop(boolean cancel) { - busyLock.writeLock(); - - try { - comm.stop(); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to gracefully stop external hadoop communication server (will shutdown anyway)", e); - } - } - - /** {@inheritDoc} */ - @Override public void onJobStateChanged(final HadoopJobMetadata meta) { - final HadoopProcess proc = runningProcsByJobId.get(meta.jobId()); - - // If we have a local process for this job. 
- if (proc != null) { - if (log.isDebugEnabled()) - log.debug("Updating job information for remote task process [proc=" + proc + ", meta=" + meta + ']'); - - if (meta.phase() == HadoopJobPhase.PHASE_COMPLETE) { - if (log.isDebugEnabled()) - log.debug("Completed job execution, will terminate child process [jobId=" + meta.jobId() + - ", proc=" + proc + ']'); - - runningProcsByJobId.remove(meta.jobId()); - runningProcsByProcId.remove(proc.descriptor().processId()); - - proc.terminate(); - - return; - } - - if (proc.initFut.isDone()) { - if (!proc.initFut.isFailed()) - sendJobInfoUpdate(proc, meta); - else if (log.isDebugEnabled()) - log.debug("Failed to initialize child process (will skip job state notification) " + - "[jobId=" + meta.jobId() + ", meta=" + meta + ']'); - } - else { - proc.initFut.listen(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() { - @Override - public void apply(IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) { - try { - f.get(); - - sendJobInfoUpdate(proc, meta); - } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to initialize child process (will skip job state notification) " + - "[jobId=" + meta.jobId() + ", meta=" + meta + ", err=" + e + ']'); - } - - } - }); - } - } - else if (ctx.isParticipating(meta)) { - HadoopJob job; - - try { - job = jobTracker.job(meta.jobId(), meta.jobInfo()); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to get job: " + meta.jobId(), e); - - return; - } - - startProcess(job, meta.mapReducePlan()); - } - } - - /** {@inheritDoc} */ - @SuppressWarnings("ConstantConditions") - @Override public void run(final HadoopJob job, final Collection<HadoopTaskInfo> tasks) throws IgniteCheckedException { - if (!busyLock.tryReadLock()) { - if (log.isDebugEnabled()) - log.debug("Failed to start hadoop tasks (grid is stopping, will ignore)."); - - return; - } - - try { - HadoopProcess proc = 
runningProcsByJobId.get(job.id()); - - HadoopTaskType taskType = F.first(tasks).type(); - - if (taskType == HadoopTaskType.SETUP || taskType == HadoopTaskType.ABORT || - taskType == HadoopTaskType.COMMIT) { - if (proc == null || proc.terminated()) { - runningProcsByJobId.remove(job.id(), proc); - - // Start new process for ABORT task since previous processes were killed. - proc = startProcess(job, jobTracker.plan(job.id())); - - if (log.isDebugEnabled()) - log.debug("Starting new process for maintenance task [jobId=" + job.id() + - ", proc=" + proc + ", taskType=" + taskType + ']'); - } - } - else - assert proc != null : "Missing started process for task execution request: " + job.id() + - ", tasks=" + tasks; - - final HadoopProcess proc0 = proc; - - proc.initFut.listen(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() { - @Override public void apply( - IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) { - if (!busyLock.tryReadLock()) - return; - - try { - f.get(); - - proc0.addTasks(tasks); - - if (log.isDebugEnabled()) - log.debug("Sending task execution request to child process [jobId=" + job.id() + - ", proc=" + proc0 + ", tasks=" + tasks + ']'); - - sendExecutionRequest(proc0, job, tasks); - } - catch (IgniteCheckedException e) { - notifyTasksFailed(tasks, FAILED, e); - } - finally { - busyLock.readUnlock(); - } - } - }); - } - finally { - busyLock.readUnlock(); - } - } - - /** {@inheritDoc} */ - @Override public void cancelTasks(HadoopJobId jobId) { - HadoopProcess proc = runningProcsByJobId.get(jobId); - - if (proc != null) - proc.terminate(); - } - - /** - * Sends execution request to remote node. - * - * @param proc Process to send request to. - * @param job Job instance. - * @param tasks Collection of tasks to execute in started process. 
- */ - private void sendExecutionRequest(HadoopProcess proc, HadoopJob job, Collection<HadoopTaskInfo> tasks) - throws IgniteCheckedException { - // Must synchronize since concurrent process crash may happen and will receive onConnectionLost(). - proc.lock(); - - try { - if (proc.terminated()) { - notifyTasksFailed(tasks, CRASHED, null); - - return; - } - - HadoopTaskExecutionRequest req = new HadoopTaskExecutionRequest(); - - req.jobId(job.id()); - req.jobInfo(job.info()); - req.tasks(tasks); - - comm.sendMessage(proc.descriptor(), req); - } - finally { - proc.unlock(); - } - } - - /** - * @return External task metadata. - */ - private HadoopExternalTaskMetadata buildTaskMeta() { - HadoopExternalTaskMetadata meta = new HadoopExternalTaskMetadata(); - - meta.classpath(Arrays.asList(System.getProperty("java.class.path").split(File.pathSeparator))); - meta.jvmOptions(Arrays.asList("-Xmx1g", "-ea", "-XX:+UseConcMarkSweepGC", "-XX:+CMSClassUnloadingEnabled", - "-DIGNITE_HOME=" + U.getIgniteHome())); - - return meta; - } - - /** - * @param tasks Tasks to notify about. - * @param state Fail state. - * @param e Optional error. - */ - private void notifyTasksFailed(Iterable<HadoopTaskInfo> tasks, HadoopTaskState state, Throwable e) { - HadoopTaskStatus fail = new HadoopTaskStatus(state, e); - - for (HadoopTaskInfo task : tasks) - jobTracker.onTaskFinished(task, fail); - } - - /** - * Starts process template that will be ready to execute Hadoop tasks. - * - * @param job Job instance. - * @param plan Map reduce plan. 
- */ - private HadoopProcess startProcess(final HadoopJob job, final HadoopMapReducePlan plan) { - final UUID childProcId = UUID.randomUUID(); - - HadoopJobId jobId = job.id(); - - final HadoopProcessFuture fut = new HadoopProcessFuture(childProcId, jobId); - - final HadoopProcess proc = new HadoopProcess(jobId, fut, plan.reducers(ctx.localNodeId())); - - HadoopProcess old = runningProcsByJobId.put(jobId, proc); - - assert old == null; - - old = runningProcsByProcId.put(childProcId, proc); - - assert old == null; - - ctx.kernalContext().closure().runLocalSafe(new Runnable() { - @Override public void run() { - if (!busyLock.tryReadLock()) { - fut.onDone(new IgniteCheckedException("Failed to start external process (grid is stopping).")); - - return; - } - - try { - HadoopExternalTaskMetadata startMeta = buildTaskMeta(); - - if (log.isDebugEnabled()) - log.debug("Created hadoop child process metadata for job [job=" + job + - ", childProcId=" + childProcId + ", taskMeta=" + startMeta + ']'); - - Process proc = startJavaProcess(childProcId, startMeta, job); - - BufferedReader rdr = new BufferedReader(new InputStreamReader(proc.getInputStream())); - - String line; - - // Read up all the process output. - while ((line = rdr.readLine()) != null) { - if (log.isDebugEnabled()) - log.debug("Tracing process output: " + line); - - if ("Started".equals(line)) { - // Process started successfully, it should not write anything more to the output stream. - if (log.isDebugEnabled()) - log.debug("Successfully started child process [childProcId=" + childProcId + - ", meta=" + job + ']'); - - fut.onProcessStarted(proc); - - break; - } - else if ("Failed".equals(line)) { - StringBuilder sb = new StringBuilder("Failed to start child process: " + job + "\n"); - - while ((line = rdr.readLine()) != null) - sb.append(" ").append(line).append("\n"); - - // Cut last character. 
- sb.setLength(sb.length() - 1); - - log.warning(sb.toString()); - - fut.onDone(new IgniteCheckedException(sb.toString())); - - break; - } - } - } - catch (Throwable e) { - fut.onDone(new IgniteCheckedException("Failed to initialize child process: " + job, e)); - - if (e instanceof Error) - throw (Error)e; - } - finally { - busyLock.readUnlock(); - } - } - }, true); - - fut.listen(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() { - @Override public void apply(IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) { - try { - // Make sure there were no exceptions. - f.get(); - - prepareForJob(proc, job, plan); - } - catch (IgniteCheckedException ignore) { - // Exception is printed in future's onDone() method. - } - } - }); - - return proc; - } - - /** - * Checks that java local command is available. - * - * @throws IgniteCheckedException If initialization failed. - */ - private void initJavaCommand() throws IgniteCheckedException { - String javaHome = System.getProperty("java.home"); - - if (javaHome == null) - javaHome = System.getenv("JAVA_HOME"); - - if (javaHome == null) - throw new IgniteCheckedException("Failed to locate JAVA_HOME."); - - javaCmd = javaHome + File.separator + "bin" + File.separator + (U.isWindows() ? 
"java.exe" : "java"); - - try { - Process proc = new ProcessBuilder(javaCmd, "-version").redirectErrorStream(true).start(); - - Collection<String> out = readProcessOutput(proc); - - int res = proc.waitFor(); - - if (res != 0) - throw new IgniteCheckedException("Failed to execute 'java -version' command (process finished with nonzero " + - "code) [exitCode=" + res + ", javaCmd='" + javaCmd + "', msg=" + F.first(out) + ']'); - - if (log.isInfoEnabled()) { - log.info("Will use java for external task execution: "); - - for (String s : out) - log.info(" " + s); - } - } - catch (IOException e) { - throw new IgniteCheckedException("Failed to check java for external task execution.", e); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new IgniteCheckedException("Failed to wait for process completion (thread got interrupted).", e); - } - } - - /** - * Reads process output line-by-line. - * - * @param proc Process to read output. - * @return Read lines. - * @throws IOException If read failed. - */ - private Collection<String> readProcessOutput(Process proc) throws IOException { - BufferedReader rdr = new BufferedReader(new InputStreamReader(proc.getInputStream())); - - Collection<String> res = new ArrayList<>(); - - String s; - - while ((s = rdr.readLine()) != null) - res.add(s); - - return res; - } - - /** - * Builds process from metadata. - * - * @param childProcId Child process ID. - * @param startMeta Metadata. - * @param job Job. - * @return Started process. 
- */ - private Process startJavaProcess(UUID childProcId, HadoopExternalTaskMetadata startMeta, - HadoopJob job) throws Exception { - String outFldr = jobWorkFolder(job.id()) + File.separator + childProcId; - - if (log.isDebugEnabled()) - log.debug("Will write process log output to: " + outFldr); - - List<String> cmd = new ArrayList<>(); - - File workDir = U.resolveWorkDirectory("", false); - - cmd.add(javaCmd); - cmd.addAll(startMeta.jvmOptions()); - cmd.add("-cp"); - cmd.add(buildClasspath(startMeta.classpath())); - cmd.add(HadoopExternalProcessStarter.class.getName()); - cmd.add("-cpid"); - cmd.add(String.valueOf(childProcId)); - cmd.add("-ppid"); - cmd.add(String.valueOf(nodeDesc.processId())); - cmd.add("-nid"); - cmd.add(String.valueOf(nodeDesc.parentNodeId())); - cmd.add("-addr"); - cmd.add(nodeDesc.address()); - cmd.add("-tport"); - cmd.add(String.valueOf(nodeDesc.tcpPort())); - cmd.add("-sport"); - cmd.add(String.valueOf(nodeDesc.sharedMemoryPort())); - cmd.add("-out"); - cmd.add(outFldr); - cmd.add("-wd"); - cmd.add(workDir.getAbsolutePath()); - - return new ProcessBuilder(cmd) - .redirectErrorStream(true) - .directory(workDir) - .start(); - } - - /** - * Gets job work folder. - * - * @param jobId Job ID. - * @return Job work folder. - */ - private String jobWorkFolder(HadoopJobId jobId) { - return outputBase + File.separator + "Job_" + jobId; - } - - /** - * @param cp Classpath collection. - * @return Classpath string. - */ - private String buildClasspath(Collection<String> cp) { - assert !cp.isEmpty(); - - StringBuilder sb = new StringBuilder(); - - for (String s : cp) - sb.append(s).append(pathSep); - - sb.setLength(sb.length() - 1); - - return sb.toString(); - } - - /** - * Sends job info update request to remote process. - * - * @param proc Process to send request to. - * @param meta Job metadata. 
- */ - private void sendJobInfoUpdate(HadoopProcess proc, HadoopJobMetadata meta) { - Map<Integer, HadoopProcessDescriptor> rdcAddrs = meta.reducersAddresses(); - - int rdcNum = meta.mapReducePlan().reducers(); - - HadoopProcessDescriptor[] addrs = null; - - if (rdcAddrs != null && rdcAddrs.size() == rdcNum) { - addrs = new HadoopProcessDescriptor[rdcNum]; - - for (int i = 0; i < rdcNum; i++) { - HadoopProcessDescriptor desc = rdcAddrs.get(i); - - assert desc != null : "Missing reducing address [meta=" + meta + ", rdc=" + i + ']'; - - addrs[i] = desc; - } - } - - try { - comm.sendMessage(proc.descriptor(), new HadoopJobInfoUpdateRequest(proc.jobId, meta.phase(), addrs)); - } - catch (IgniteCheckedException e) { - if (!proc.terminated()) { - log.error("Failed to send job state update message to remote child process (will kill the process) " + - "[jobId=" + proc.jobId + ", meta=" + meta + ']', e); - - proc.terminate(); - } - } - } - - /** - * Sends prepare request to remote process. - * - * @param proc Process to send request to. - * @param job Job. - * @param plan Map reduce plan. - */ - private void prepareForJob(HadoopProcess proc, HadoopJob job, HadoopMapReducePlan plan) { - try { - comm.sendMessage(proc.descriptor(), new HadoopPrepareForJobRequest(job.id(), job.info(), - plan.reducers(), plan.reducers(ctx.localNodeId()))); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send job prepare request to remote process [proc=" + proc + ", job=" + job + - ", plan=" + plan + ']', e); - - proc.terminate(); - } - } - - /** - * Processes task finished message. - * - * @param desc Remote process descriptor. - * @param taskMsg Task finished message. 
- */ - private void processTaskFinishedMessage(HadoopProcessDescriptor desc, HadoopTaskFinishedMessage taskMsg) { - HadoopProcess proc = runningProcsByProcId.get(desc.processId()); - - if (proc != null) - proc.removeTask(taskMsg.taskInfo()); - - jobTracker.onTaskFinished(taskMsg.taskInfo(), taskMsg.status()); - } - - /** - * - */ - private class MessageListener implements HadoopMessageListener { - /** {@inheritDoc} */ - @Override public void onMessageReceived(HadoopProcessDescriptor desc, HadoopMessage msg) { - if (!busyLock.tryReadLock()) - return; - - try { - if (msg instanceof HadoopProcessStartedAck) { - HadoopProcess proc = runningProcsByProcId.get(desc.processId()); - - assert proc != null : "Missing child process for processId: " + desc; - - HadoopProcessFuture fut = proc.initFut; - - if (fut != null) - fut.onReplyReceived(desc); - // Safety. - else - log.warning("Failed to find process start future (will ignore): " + desc); - } - else if (msg instanceof HadoopTaskFinishedMessage) { - HadoopTaskFinishedMessage taskMsg = (HadoopTaskFinishedMessage)msg; - - processTaskFinishedMessage(desc, taskMsg); - } - else - log.warning("Unexpected message received by node [desc=" + desc + ", msg=" + msg + ']'); - } - finally { - busyLock.readUnlock(); - } - } - - /** {@inheritDoc} */ - @Override public void onConnectionLost(HadoopProcessDescriptor desc) { - if (!busyLock.tryReadLock()) - return; - - try { - if (desc == null) { - U.warn(log, "Handshake failed."); - - return; - } - - // Notify job tracker about failed tasks. 
- HadoopProcess proc = runningProcsByProcId.get(desc.processId()); - - if (proc != null) { - Collection<HadoopTaskInfo> tasks = proc.tasks(); - - if (!F.isEmpty(tasks)) { - log.warning("Lost connection with alive process (will terminate): " + desc); - - HadoopTaskStatus status = new HadoopTaskStatus(CRASHED, - new IgniteCheckedException("Failed to run tasks (external process finished unexpectedly): " + desc)); - - for (HadoopTaskInfo info : tasks) - jobTracker.onTaskFinished(info, status); - - runningProcsByJobId.remove(proc.jobId(), proc); - } - - // Safety. - proc.terminate(); - } - } - finally { - busyLock.readUnlock(); - } - } - } - - /** - * Hadoop process. - */ - private static class HadoopProcess extends ReentrantLock { - /** */ - private static final long serialVersionUID = 0L; - - /** Job ID. */ - private final HadoopJobId jobId; - - /** Process. */ - private Process proc; - - /** Init future. Completes when process is ready to receive messages. */ - private final HadoopProcessFuture initFut; - - /** Process descriptor. */ - private HadoopProcessDescriptor procDesc; - - /** Reducers planned for this process. */ - private Collection<Integer> reducers; - - /** Tasks. */ - private final Collection<HadoopTaskInfo> tasks = new ConcurrentLinkedDeque8<>(); - - /** Terminated flag. */ - private volatile boolean terminated; - - /** - * @param jobId Job ID. - * @param initFut Init future. - */ - private HadoopProcess(HadoopJobId jobId, HadoopProcessFuture initFut, - int[] reducers) { - this.jobId = jobId; - this.initFut = initFut; - - if (!F.isEmpty(reducers)) { - this.reducers = new ArrayList<>(reducers.length); - - for (int r : reducers) - this.reducers.add(r); - } - } - - /** - * @return Communication process descriptor. - */ - private HadoopProcessDescriptor descriptor() { - return procDesc; - } - - /** - * @return Job ID. - */ - public HadoopJobId jobId() { - return jobId; - } - - /** - * Initialized callback. - * - * @param proc Java process representation. 
- * @param procDesc Process descriptor. - */ - private void onInitialized(Process proc, HadoopProcessDescriptor procDesc) { - this.proc = proc; - this.procDesc = procDesc; - } - - /** - * Terminates process (kills it). - */ - private void terminate() { - // Guard against concurrent message sending. - lock(); - - try { - terminated = true; - - if (!initFut.isDone()) - initFut.listen(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() { - @Override public void apply( - IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) { - proc.destroy(); - } - }); - else - proc.destroy(); - } - finally { - unlock(); - } - } - - /** - * @return Terminated flag. - */ - private boolean terminated() { - return terminated; - } - - /** - * Sets process tasks. - * - * @param tasks Tasks to set. - */ - private void addTasks(Collection<HadoopTaskInfo> tasks) { - this.tasks.addAll(tasks); - } - - /** - * Removes task when it was completed. - * - * @param task Task to remove. - */ - private void removeTask(HadoopTaskInfo task) { - if (tasks != null) - tasks.remove(task); - } - - /** - * @return Collection of tasks. - */ - private Collection<HadoopTaskInfo> tasks() { - return tasks; - } - - /** - * @return Planned reducers. - */ - private Collection<Integer> reducers() { - return reducers; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HadoopProcess.class, this); - } - } - - /** - * - */ - private class HadoopProcessFuture extends GridFutureAdapter<IgniteBiTuple<Process, HadoopProcessDescriptor>> { - /** */ - private static final long serialVersionUID = 0L; - - /** Child process ID. */ - private UUID childProcId; - - /** Job ID. */ - private HadoopJobId jobId; - - /** Process descriptor. */ - private HadoopProcessDescriptor desc; - - /** Running process. */ - private Process proc; - - /** Process started flag. */ - private volatile boolean procStarted; - - /** Reply received flag. 
*/ - private volatile boolean replyReceived; - - /** Logger. */ - private final IgniteLogger log = HadoopExternalTaskExecutor.this.log; - - /** - */ - private HadoopProcessFuture(UUID childProcId, HadoopJobId jobId) { - this.childProcId = childProcId; - this.jobId = jobId; - } - - /** - * Process started callback. - */ - public void onProcessStarted(Process proc) { - this.proc = proc; - - procStarted = true; - - if (procStarted && replyReceived) - onDone(F.t(proc, desc)); - } - - /** - * Reply received callback. - */ - public void onReplyReceived(HadoopProcessDescriptor desc) { - assert childProcId.equals(desc.processId()); - - this.desc = desc; - - replyReceived = true; - - if (procStarted && replyReceived) - onDone(F.t(proc, desc)); - } - - /** {@inheritDoc} */ - @Override public boolean onDone(@Nullable IgniteBiTuple<Process, HadoopProcessDescriptor> res, - @Nullable Throwable err) { - if (err == null) { - HadoopProcess proc = runningProcsByProcId.get(childProcId); - - assert proc != null; - - assert proc.initFut == this; - - proc.onInitialized(res.get1(), res.get2()); - - if (!F.isEmpty(proc.reducers())) - jobTracker.onExternalMappersInitialized(jobId, proc.reducers(), desc); - } - else { - // Clean up since init failed. - runningProcsByJobId.remove(jobId); - runningProcsByProcId.remove(childProcId); - } - - if (super.onDone(res, err)) { - if (err == null) { - if (log.isDebugEnabled()) - log.debug("Initialized child process for external task execution [jobId=" + jobId + - ", desc=" + desc + ", initTime=" + duration() + ']'); - } - else - U.error(log, "Failed to initialize child process for external task execution [jobId=" + jobId + - ", desc=" + desc + ']', err); - - return true; - } - - return false; - } - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskMetadata.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskMetadata.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskMetadata.java deleted file mode 100644 index 27b0329..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskMetadata.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor.external; - -import java.util.Collection; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.internal.S; - -/** - * External task metadata (classpath, JVM options) needed to start external process execution. - */ -public class HadoopExternalTaskMetadata { - /** Process classpath. 
*/ - private Collection<String> classpath; - - /** JVM options. */ - @GridToStringInclude - private Collection<String> jvmOpts; - - /** - * @return JVM Options. - */ - public Collection<String> jvmOptions() { - return jvmOpts; - } - - /** - * @param jvmOpts JVM options. - */ - public void jvmOptions(Collection<String> jvmOpts) { - this.jvmOpts = jvmOpts; - } - - /** - * @return Classpath. - */ - public Collection<String> classpath() { - return classpath; - } - - /** - * @param classpath Classpath. - */ - public void classpath(Collection<String> classpath) { - this.classpath = classpath; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HadoopExternalTaskMetadata.class, this); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopJobInfoUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopJobInfoUpdateRequest.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopJobInfoUpdateRequest.java deleted file mode 100644 index 96b3675..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopJobInfoUpdateRequest.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor.external; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import org.apache.ignite.internal.processors.hadoop.HadoopJobId; -import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase; -import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; - -/** - * Job info update request. - */ -public class HadoopJobInfoUpdateRequest implements HadoopMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** Job ID. */ - @GridToStringInclude - private HadoopJobId jobId; - - /** Job phase. */ - @GridToStringInclude - private HadoopJobPhase jobPhase; - - /** Reducers addresses. */ - @GridToStringInclude - private HadoopProcessDescriptor[] reducersAddrs; - - /** - * Constructor required by {@link Externalizable}. - */ - public HadoopJobInfoUpdateRequest() { - // No-op. - } - - /** - * @param jobId Job ID. - * @param jobPhase Job phase. - * @param reducersAddrs Reducers addresses. - */ - public HadoopJobInfoUpdateRequest(HadoopJobId jobId, HadoopJobPhase jobPhase, - HadoopProcessDescriptor[] reducersAddrs) { - assert jobId != null; - - this.jobId = jobId; - this.jobPhase = jobPhase; - this.reducersAddrs = reducersAddrs; - } - - /** - * @return Job ID. 
- */ - public HadoopJobId jobId() { - return jobId; - } - - /** - * @return Job phase. - */ - public HadoopJobPhase jobPhase() { - return jobPhase; - } - - /** - * @return Reducers addresses. - */ - public HadoopProcessDescriptor[] reducersAddresses() { - return reducersAddrs; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - jobId.writeExternal(out); - - out.writeObject(jobPhase); - U.writeArray(out, reducersAddrs); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - jobId = new HadoopJobId(); - jobId.readExternal(in); - - jobPhase = (HadoopJobPhase)in.readObject(); - reducersAddrs = (HadoopProcessDescriptor[])U.readArray(in); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HadoopJobInfoUpdateRequest.class, this); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopPrepareForJobRequest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopPrepareForJobRequest.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopPrepareForJobRequest.java deleted file mode 100644 index 43bdc36..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopPrepareForJobRequest.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor.external; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import org.apache.ignite.internal.processors.hadoop.HadoopJobId; -import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; -import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; - -/** - * Child process initialization request. - */ -public class HadoopPrepareForJobRequest implements HadoopMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** Job ID. */ - @GridToStringInclude - private HadoopJobId jobId; - - /** Job info. */ - @GridToStringInclude - private HadoopJobInfo jobInfo; - - /** Total amount of reducers in the job. */ - @GridToStringInclude - private int totalReducersCnt; - - /** Reducers to be executed on current node. */ - @GridToStringInclude - private int[] locReducers; - - /** - * Constructor required by {@link Externalizable}. - */ - public HadoopPrepareForJobRequest() { - // No-op. - } - - /** - * @param jobId Job ID. - * @param jobInfo Job info. - * @param totalReducersCnt Number of reducers in the job. 
- * @param locReducers Reducers to be executed on current node. - */ - public HadoopPrepareForJobRequest(HadoopJobId jobId, HadoopJobInfo jobInfo, int totalReducersCnt, - int[] locReducers) { - assert jobId != null; - - this.jobId = jobId; - this.jobInfo = jobInfo; - this.totalReducersCnt = totalReducersCnt; - this.locReducers = locReducers; - } - - /** - * @return Job info. - */ - public HadoopJobInfo jobInfo() { - return jobInfo; - } - - /** - * @return Job ID. - */ - public HadoopJobId jobId() { - return jobId; - } - - /** - * @return Reducers to be executed on current node. - */ - public int[] localReducers() { - return locReducers; - } - - /** - * @return Number of reducers in job. - */ - public int totalReducerCount() { - return totalReducersCnt; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - jobId.writeExternal(out); - - out.writeObject(jobInfo); - out.writeInt(totalReducersCnt); - - U.writeIntArray(out, locReducers); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - jobId = new HadoopJobId(); - jobId.readExternal(in); - - jobInfo = (HadoopJobInfo)in.readObject(); - totalReducersCnt = in.readInt(); - - locReducers = U.readIntArray(in); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HadoopPrepareForJobRequest.class, this); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessDescriptor.java 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessDescriptor.java deleted file mode 100644 index 2dc233b..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessDescriptor.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor.external; - -import java.io.Serializable; -import java.util.UUID; -import org.apache.ignite.internal.util.typedef.internal.S; - -/** - * Process descriptor used to identify process for which task is running. - */ -public class HadoopProcessDescriptor implements Serializable { - /** */ - private static final long serialVersionUID = 0L; - - /** Parent node ID. */ - private UUID parentNodeId; - - /** Process ID. */ - private UUID procId; - - /** Address. */ - private String addr; - - /** TCP port. */ - private int tcpPort; - - /** Shared memory port. */ - private int shmemPort; - - /** - * @param parentNodeId Parent node ID. - * @param procId Process ID. 
- */ - public HadoopProcessDescriptor(UUID parentNodeId, UUID procId) { - this.parentNodeId = parentNodeId; - this.procId = procId; - } - - /** - * Gets process ID. - * - * @return Process ID. - */ - public UUID processId() { - return procId; - } - - /** - * Gets parent node ID. - * - * @return Parent node ID. - */ - public UUID parentNodeId() { - return parentNodeId; - } - - /** - * Gets host address. - * - * @return Host address. - */ - public String address() { - return addr; - } - - /** - * Sets host address. - * - * @param addr Host address. - */ - public void address(String addr) { - this.addr = addr; - } - - /** - * @return Shared memory port. - */ - public int sharedMemoryPort() { - return shmemPort; - } - - /** - * Sets shared memory port. - * - * @param shmemPort Shared memory port. - */ - public void sharedMemoryPort(int shmemPort) { - this.shmemPort = shmemPort; - } - - /** - * @return TCP port. - */ - public int tcpPort() { - return tcpPort; - } - - /** - * Sets TCP port. - * - * @param tcpPort TCP port. 
- */ - public void tcpPort(int tcpPort) { - this.tcpPort = tcpPort; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (!(o instanceof HadoopProcessDescriptor)) - return false; - - HadoopProcessDescriptor that = (HadoopProcessDescriptor)o; - - return parentNodeId.equals(that.parentNodeId) && procId.equals(that.procId); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int result = parentNodeId.hashCode(); - - result = 31 * result + procId.hashCode(); - - return result; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HadoopProcessDescriptor.class, this); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessStartedAck.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessStartedAck.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessStartedAck.java deleted file mode 100644 index b35f3ec..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessStartedAck.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor.external; - -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage; -import org.apache.ignite.internal.util.typedef.internal.S; - -/** - * Process started message. - */ -public class HadoopProcessStartedAck implements HadoopMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - // No-op. 
- } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HadoopProcessStartedAck.class, this); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskExecutionRequest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskExecutionRequest.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskExecutionRequest.java deleted file mode 100644 index 3875304..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskExecutionRequest.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor.external; - -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.Collection; -import org.apache.ignite.internal.processors.hadoop.HadoopJobId; -import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; -import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; - -/** - * Message sent from node to child process to start task(s) execution. - */ -public class HadoopTaskExecutionRequest implements HadoopMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** Job ID. */ - @GridToStringInclude - private HadoopJobId jobId; - - /** Job info. */ - @GridToStringInclude - private HadoopJobInfo jobInfo; - - /** Mappers. */ - @GridToStringInclude - private Collection<HadoopTaskInfo> tasks; - - /** - * @return Job ID. - */ - public HadoopJobId jobId() { - return jobId; - } - - /** - * @param jobId Job ID. - */ - public void jobId(HadoopJobId jobId) { - this.jobId = jobId; - } - - /** - * @return Jon info. - */ - public HadoopJobInfo jobInfo() { - return jobInfo; - } - - /** - * @param jobInfo Job info. - */ - public void jobInfo(HadoopJobInfo jobInfo) { - this.jobInfo = jobInfo; - } - - /** - * @return Tasks. - */ - public Collection<HadoopTaskInfo> tasks() { - return tasks; - } - - /** - * @param tasks Tasks. 
- */ - public void tasks(Collection<HadoopTaskInfo> tasks) { - this.tasks = tasks; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HadoopTaskExecutionRequest.class, this); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - jobId.writeExternal(out); - - out.writeObject(jobInfo); - U.writeCollection(out, tasks); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - jobId = new HadoopJobId(); - jobId.readExternal(in); - - jobInfo = (HadoopJobInfo)in.readObject(); - tasks = U.readCollection(in); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskFinishedMessage.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskFinishedMessage.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskFinishedMessage.java deleted file mode 100644 index 9e1fdb3..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskFinishedMessage.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor.external; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; -import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskStatus; -import org.apache.ignite.internal.util.typedef.internal.S; - -/** - * Task finished message. Sent when local task finishes execution. - */ -public class HadoopTaskFinishedMessage implements HadoopMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** Finished task info. */ - private HadoopTaskInfo taskInfo; - - /** Task finish status. */ - private HadoopTaskStatus status; - - /** - * Constructor required by {@link Externalizable}. - */ - public HadoopTaskFinishedMessage() { - // No-op. - } - - /** - * @param taskInfo Finished task info. - * @param status Task finish status. - */ - public HadoopTaskFinishedMessage(HadoopTaskInfo taskInfo, HadoopTaskStatus status) { - assert taskInfo != null; - assert status != null; - - this.taskInfo = taskInfo; - this.status = status; - } - - /** - * @return Finished task info. - */ - public HadoopTaskInfo taskInfo() { - return taskInfo; - } - - /** - * @return Task finish status. 
- */ - public HadoopTaskStatus status() { - return status; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HadoopTaskFinishedMessage.class, this); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - taskInfo.writeExternal(out); - status.writeExternal(out); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - taskInfo = new HadoopTaskInfo(); - taskInfo.readExternal(in); - - status = new HadoopTaskStatus(); - status.readExternal(in); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java deleted file mode 100644 index 35747bb..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java +++ /dev/null @@ -1,461 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.child; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.processors.hadoop.HadoopHelper; -import org.apache.ignite.internal.processors.hadoop.HadoopHelperImpl; -import org.apache.ignite.internal.processors.hadoop.HadoopJob; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput; -import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage; -import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleAck; -import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleJob; -import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleMessage; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopExecutorService; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopRunnableTask; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskStatus; -import 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopJobInfoUpdateRequest; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopPrepareForJobRequest; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessDescriptor; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessStartedAck; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopTaskExecutionRequest; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopTaskFinishedMessage; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.HadoopExternalCommunication; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.HadoopMessageListener; -import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Job; -import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.lang.IgniteInClosure2X; -import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; -import org.apache.ignite.internal.util.typedef.CI1; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.U; - -import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.MAP; -import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.REDUCE; - -/** - * Hadoop process base. - */ -@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") -public class HadoopChildProcessRunner { - /** Node process descriptor. */ - private HadoopProcessDescriptor nodeDesc; - - /** Message processing executor service. */ - private ExecutorService msgExecSvc; - - /** Task executor service. */ - private HadoopExecutorService execSvc; - - /** */ - protected GridUnsafeMemory mem = new GridUnsafeMemory(0); - - /** External communication. */ - private HadoopExternalCommunication comm; - - /** Logger. 
*/ - private IgniteLogger log; - - /** Init guard. */ - private final AtomicBoolean initGuard = new AtomicBoolean(); - - /** Start time. */ - private long startTime; - - /** Init future. */ - private final GridFutureAdapter<?> initFut = new GridFutureAdapter<>(); - - /** Job instance. */ - private HadoopJob job; - - /** Number of uncompleted tasks. */ - private final AtomicInteger pendingTasks = new AtomicInteger(); - - /** Shuffle job. */ - private HadoopShuffleJob<HadoopProcessDescriptor> shuffleJob; - - /** Concurrent mappers. */ - private int concMappers; - - /** Concurrent reducers. */ - private int concReducers; - - /** - * Starts child process runner. - */ - public void start(HadoopExternalCommunication comm, HadoopProcessDescriptor nodeDesc, - ExecutorService msgExecSvc, IgniteLogger parentLog) - throws IgniteCheckedException { - this.comm = comm; - this.nodeDesc = nodeDesc; - this.msgExecSvc = msgExecSvc; - - comm.setListener(new MessageListener()); - log = parentLog.getLogger(HadoopChildProcessRunner.class); - - startTime = U.currentTimeMillis(); - - // At this point node knows that this process has started. - comm.sendMessage(this.nodeDesc, new HadoopProcessStartedAck()); - } - - /** - * Initializes process for task execution. - * - * @param req Initialization request. 
- */ - private void prepareProcess(HadoopPrepareForJobRequest req) { - if (initGuard.compareAndSet(false, true)) { - try { - if (log.isDebugEnabled()) - log.debug("Initializing external hadoop task: " + req); - - assert job == null; - - job = req.jobInfo().createJob(HadoopV2Job.class, req.jobId(), log, null, new HadoopHelperImpl()); - - job.initialize(true, nodeDesc.processId()); - - shuffleJob = new HadoopShuffleJob<>(comm.localProcessDescriptor(), log, job, mem, - req.totalReducerCount(), req.localReducers()); - - initializeExecutors(req); - - if (log.isDebugEnabled()) - log.debug("External process initialized [initWaitTime=" + - (U.currentTimeMillis() - startTime) + ']'); - - initFut.onDone(); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to initialize process: " + req, e); - - initFut.onDone(e); - } - } - else - log.warning("Duplicate initialize process request received (will ignore): " + req); - } - - /** - * @param req Task execution request. - */ - private void runTasks(final HadoopTaskExecutionRequest req) { - if (!initFut.isDone() && log.isDebugEnabled()) - log.debug("Will wait for process initialization future completion: " + req); - - initFut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> f) { - try { - // Make sure init was successful. - f.get(); - - boolean set = pendingTasks.compareAndSet(0, req.tasks().size()); - - assert set; - - HadoopTaskInfo info = F.first(req.tasks()); - - assert info != null; - - int size = info.type() == MAP ? 
concMappers : concReducers; - -// execSvc.setCorePoolSize(size); -// execSvc.setMaximumPoolSize(size); - - if (log.isDebugEnabled()) - log.debug("Set executor service size for task type [type=" + info.type() + - ", size=" + size + ']'); - - for (HadoopTaskInfo taskInfo : req.tasks()) { - if (log.isDebugEnabled()) - log.debug("Submitted task for external execution: " + taskInfo); - - execSvc.submit(new HadoopRunnableTask(log, job, mem, taskInfo, nodeDesc.parentNodeId()) { - @Override protected void onTaskFinished(HadoopTaskStatus status) { - onTaskFinished0(this, status); - } - - @Override protected HadoopTaskInput createInput(HadoopTaskContext ctx) - throws IgniteCheckedException { - return shuffleJob.input(ctx); - } - - @Override protected HadoopTaskOutput createOutput(HadoopTaskContext ctx) - throws IgniteCheckedException { - return shuffleJob.output(ctx); - } - }); - } - } - catch (IgniteCheckedException e) { - for (HadoopTaskInfo info : req.tasks()) - notifyTaskFinished(info, new HadoopTaskStatus(HadoopTaskState.FAILED, e), false); - } - } - }); - } - - /** - * Creates executor services. - * - * @param req Init child process request. - */ - private void initializeExecutors(HadoopPrepareForJobRequest req) { - int cpus = Runtime.getRuntime().availableProcessors(); -// -// concMappers = get(req.jobInfo(), EXTERNAL_CONCURRENT_MAPPERS, cpus); -// concReducers = get(req.jobInfo(), EXTERNAL_CONCURRENT_REDUCERS, cpus); - - execSvc = new HadoopExecutorService(log, "", cpus * 2, 1024); - } - - /** - * Updates external process map so that shuffle can proceed with sending messages to reducers. - * - * @param req Update request. 
- */ - private void updateTasks(final HadoopJobInfoUpdateRequest req) { - initFut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> gridFut) { - assert initGuard.get(); - - assert req.jobId().equals(job.id()); - - if (req.reducersAddresses() != null) { - if (shuffleJob.initializeReduceAddresses(req.reducersAddresses())) { - shuffleJob.startSending("external", - new IgniteInClosure2X<HadoopProcessDescriptor, HadoopShuffleMessage>() { - @Override public void applyx(HadoopProcessDescriptor dest, - HadoopShuffleMessage msg) throws IgniteCheckedException { - comm.sendMessage(dest, msg); - } - }); - } - } - } - }); - } - - /** - * Stops all executors and running tasks. - */ - private void shutdown() { - if (execSvc != null) - execSvc.shutdown(5000); - - if (msgExecSvc != null) - msgExecSvc.shutdownNow(); - - try { - job.dispose(true); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to dispose job.", e); - } - } - - /** - * Notifies node about task finish. - * - * @param run Finished task runnable. - * @param status Task status. - */ - private void onTaskFinished0(HadoopRunnableTask run, HadoopTaskStatus status) { - HadoopTaskInfo info = run.taskInfo(); - - int pendingTasks0 = pendingTasks.decrementAndGet(); - - if (log.isDebugEnabled()) - log.debug("Hadoop task execution finished [info=" + info - + ", state=" + status.state() + ", waitTime=" + run.waitTime() + ", execTime=" + run.executionTime() + - ", pendingTasks=" + pendingTasks0 + - ", err=" + status.failCause() + ']'); - - assert info.type() == MAP || info.type() == REDUCE : "Only MAP or REDUCE tasks are supported."; - - boolean flush = pendingTasks0 == 0 && info.type() == MAP; - - notifyTaskFinished(info, status, flush); - } - - /** - * @param taskInfo Finished task info. - * @param status Task status. 
- */ - private void notifyTaskFinished(final HadoopTaskInfo taskInfo, final HadoopTaskStatus status, - boolean flush) { - - final HadoopTaskState state = status.state(); - final Throwable err = status.failCause(); - - if (!flush) { - try { - if (log.isDebugEnabled()) - log.debug("Sending notification to parent node [taskInfo=" + taskInfo + ", state=" + state + - ", err=" + err + ']'); - - comm.sendMessage(nodeDesc, new HadoopTaskFinishedMessage(taskInfo, status)); - } - catch (IgniteCheckedException e) { - log.error("Failed to send message to parent node (will terminate child process).", e); - - shutdown(); - - terminate(); - } - } - else { - if (log.isDebugEnabled()) - log.debug("Flushing shuffle messages before sending last task completion notification [taskInfo=" + - taskInfo + ", state=" + state + ", err=" + err + ']'); - - final long start = U.currentTimeMillis(); - - try { - shuffleJob.flush().listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> f) { - long end = U.currentTimeMillis(); - - if (log.isDebugEnabled()) - log.debug("Finished flushing shuffle messages [taskInfo=" + taskInfo + - ", flushTime=" + (end - start) + ']'); - - try { - // Check for errors on shuffle. - f.get(); - - notifyTaskFinished(taskInfo, status, false); - } - catch (IgniteCheckedException e) { - log.error("Failed to flush shuffle messages (will fail the task) [taskInfo=" + taskInfo + - ", state=" + state + ", err=" + err + ']', e); - - notifyTaskFinished(taskInfo, - new HadoopTaskStatus(HadoopTaskState.FAILED, e), false); - } - } - }); - } - catch (IgniteCheckedException e) { - log.error("Failed to flush shuffle messages (will fail the task) [taskInfo=" + taskInfo + - ", state=" + state + ", err=" + err + ']', e); - - notifyTaskFinished(taskInfo, new HadoopTaskStatus(HadoopTaskState.FAILED, e), false); - } - } - } - - /** - * Checks if message was received from parent node and prints warning if not. - * - * @param desc Sender process ID. 
* @param msg Received message.
     * @return {@code True} if received from parent node.
     */
    private boolean validateNodeMessage(HadoopProcessDescriptor desc, HadoopMessage msg) {
        if (nodeDesc.processId().equals(desc.processId()))
            return true;

        log.warning("Received process control request from unknown process (will ignore) [desc=" + desc +
            ", msg=" + msg + ']');

        return false;
    }

    /**
     * Stops execution of this process by exiting the JVM with status {@code 1}.
     */
    private void terminate() {
        System.exit(1);
    }

    /**
     * Message listener: dispatches incoming messages by type and reacts to lost connections.
     */
    private class MessageListener implements HadoopMessageListener {
        /** {@inheritDoc} */
        @Override public void onMessageReceived(final HadoopProcessDescriptor desc, final HadoopMessage msg) {
            boolean ctrlReq = msg instanceof HadoopTaskExecutionRequest ||
                msg instanceof HadoopJobInfoUpdateRequest ||
                msg instanceof HadoopPrepareForJobRequest;

            if (ctrlReq) {
                // Control requests are processed only when the sender passes validateNodeMessage().
                if (!validateNodeMessage(desc, msg))
                    return;

                if (msg instanceof HadoopTaskExecutionRequest)
                    runTasks((HadoopTaskExecutionRequest)msg);
                else if (msg instanceof HadoopJobInfoUpdateRequest)
                    updateTasks((HadoopJobInfoUpdateRequest)msg);
                else
                    prepareProcess((HadoopPrepareForJobRequest)msg);

                return;
            }

            if (msg instanceof HadoopShuffleMessage) {
                if (log.isTraceEnabled())
                    log.trace("Received shuffle message [desc=" + desc + ", msg=" + msg + ']');

                final HadoopShuffleMessage shuffleMsg = (HadoopShuffleMessage)msg;

                // The message is handled from a listener on the initialization future.
                initFut.listen(new CI1<IgniteInternalFuture<?>>() {
                    @Override public void apply(IgniteInternalFuture<?> f) {
                        try {
                            shuffleJob.onShuffleMessage(shuffleMsg);

                            // Acknowledge to the sender once the message has been handed to the shuffle job.
                            comm.sendMessage(desc, new HadoopShuffleAck(shuffleMsg.id(), shuffleMsg.jobId()));
                        }
                        catch (IgniteCheckedException e) {
                            U.error(log, "Failed to process hadoop shuffle message [desc=" + desc + ", msg=" + msg + ']', e);
                        }
                    }
                });

                return;
            }

            if (msg instanceof HadoopShuffleAck) {
                if (log.isTraceEnabled())
                    log.trace("Received shuffle ack [desc=" + desc + ", msg=" + msg + ']');

                shuffleJob.onShuffleAck((HadoopShuffleAck)msg);

                return;
            }

            log.warning("Unknown message received (will ignore) [desc=" + desc + ", msg=" + msg + ']');
        }

        /** {@inheritDoc} */
        @Override public void onConnectionLost(HadoopProcessDescriptor desc) {
            if (log.isDebugEnabled())
                log.debug("Lost connection with remote process: " + desc);

            // No descriptor is reported as a failed handshake.
            if (desc == null) {
                U.warn(log, "Handshake failed.");

                return;
            }

            // Only a lost connection to the parent node is handled here.
            if (!desc.processId().equals(nodeDesc.processId()))
                return;

            log.warning("Child process lost connection with parent node (will terminate child process).");

            shutdown();

            terminate();
        }
    }
}