http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/HadoopShuffle.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/HadoopShuffle.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/HadoopShuffle.java deleted file mode 100644 index 8561dab..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/HadoopShuffle.java +++ /dev/null @@ -1,263 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop.impl.shuffle; - -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.GridTopic; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.processors.hadoop.impl.HadoopComponent; -import org.apache.ignite.internal.processors.hadoop.impl.HadoopContext; -import org.apache.ignite.internal.processors.hadoop.HadoopJobId; -import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput; -import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage; -import org.apache.ignite.internal.util.future.GridFinishedFuture; -import org.apache.ignite.internal.util.lang.IgniteInClosure2X; -import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteBiPredicate; - -/** - * Shuffle. 
- */ -public class HadoopShuffle extends HadoopComponent { - /** */ - private final ConcurrentMap<HadoopJobId, HadoopShuffleJob<UUID>> jobs = new ConcurrentHashMap<>(); - - /** */ - protected final GridUnsafeMemory mem = new GridUnsafeMemory(0); - - /** {@inheritDoc} */ - @Override public void start(HadoopContext ctx) throws IgniteCheckedException { - super.start(ctx); - - ctx.kernalContext().io().addUserMessageListener(GridTopic.TOPIC_HADOOP, - new IgniteBiPredicate<UUID, Object>() { - @Override public boolean apply(UUID nodeId, Object msg) { - return onMessageReceived(nodeId, (HadoopMessage)msg); - } - }); - } - - /** - * Stops shuffle. - * - * @param cancel If should cancel all ongoing activities. - */ - @Override public void stop(boolean cancel) { - for (HadoopShuffleJob job : jobs.values()) { - try { - job.close(); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to close job.", e); - } - } - - jobs.clear(); - } - - /** - * Creates new shuffle job. - * - * @param jobId Job ID. - * @return Created shuffle job. - * @throws IgniteCheckedException If job creation failed. - */ - private HadoopShuffleJob<UUID> newJob(HadoopJobId jobId) throws IgniteCheckedException { - HadoopMapReducePlan plan = ctx.jobTracker().plan(jobId); - - HadoopShuffleJob<UUID> job = new HadoopShuffleJob<>(ctx.localNodeId(), log, - ctx.jobTracker().job(jobId, null), mem, plan.reducers(), plan.reducers(ctx.localNodeId())); - - UUID[] rdcAddrs = new UUID[plan.reducers()]; - - for (int i = 0; i < rdcAddrs.length; i++) { - UUID nodeId = plan.nodeForReducer(i); - - assert nodeId != null : "Plan is missing node for reducer [plan=" + plan + ", rdc=" + i + ']'; - - rdcAddrs[i] = nodeId; - } - - boolean init = job.initializeReduceAddresses(rdcAddrs); - - assert init; - - return job; - } - - /** - * @param nodeId Node ID to send message to. - * @param msg Message to send. - * @throws IgniteCheckedException If send failed. 
- */ - private void send0(UUID nodeId, Object msg) throws IgniteCheckedException { - ClusterNode node = ctx.kernalContext().discovery().node(nodeId); - - ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0); - } - - /** - * @param jobId Task info. - * @return Shuffle job. - */ - private HadoopShuffleJob<UUID> job(HadoopJobId jobId) throws IgniteCheckedException { - HadoopShuffleJob<UUID> res = jobs.get(jobId); - - if (res == null) { - res = newJob(jobId); - - HadoopShuffleJob<UUID> old = jobs.putIfAbsent(jobId, res); - - if (old != null) { - res.close(); - - res = old; - } - else if (res.reducersInitialized()) - startSending(res); - } - - return res; - } - - /** - * Starts message sending thread. - * - * @param shuffleJob Job to start sending for. - */ - private void startSending(HadoopShuffleJob<UUID> shuffleJob) { - shuffleJob.startSending(ctx.kernalContext().gridName(), - new IgniteInClosure2X<UUID, HadoopShuffleMessage>() { - @Override public void applyx(UUID dest, HadoopShuffleMessage msg) throws IgniteCheckedException { - send0(dest, msg); - } - } - ); - } - - /** - * Message received callback. - * - * @param src Sender node ID. - * @param msg Received message. - * @return {@code True}. - */ - public boolean onMessageReceived(UUID src, HadoopMessage msg) { - if (msg instanceof HadoopShuffleMessage) { - HadoopShuffleMessage m = (HadoopShuffleMessage)msg; - - try { - job(m.jobId()).onShuffleMessage(m); - } - catch (IgniteCheckedException e) { - U.error(log, "Message handling failed.", e); - } - - try { - // Reply with ack. 
- send0(src, new HadoopShuffleAck(m.id(), m.jobId())); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to reply back to shuffle message sender [snd=" + src + ", msg=" + msg + ']', e); - } - } - else if (msg instanceof HadoopShuffleAck) { - HadoopShuffleAck m = (HadoopShuffleAck)msg; - - try { - job(m.jobId()).onShuffleAck(m); - } - catch (IgniteCheckedException e) { - U.error(log, "Message handling failed.", e); - } - } - else - throw new IllegalStateException("Unknown message type received to Hadoop shuffle [src=" + src + - ", msg=" + msg + ']'); - - return true; - } - - /** - * @param taskCtx Task info. - * @return Output. - */ - public HadoopTaskOutput output(HadoopTaskContext taskCtx) throws IgniteCheckedException { - return job(taskCtx.taskInfo().jobId()).output(taskCtx); - } - - /** - * @param taskCtx Task info. - * @return Input. - */ - public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException { - return job(taskCtx.taskInfo().jobId()).input(taskCtx); - } - - /** - * @param jobId Job id. - */ - public void jobFinished(HadoopJobId jobId) { - HadoopShuffleJob job = jobs.remove(jobId); - - if (job != null) { - try { - job.close(); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to close job: " + jobId, e); - } - } - } - - /** - * Flushes all the outputs for the given job to remote nodes. - * - * @param jobId Job ID. - * @return Future. - */ - public IgniteInternalFuture<?> flush(HadoopJobId jobId) { - HadoopShuffleJob job = jobs.get(jobId); - - if (job == null) - return new GridFinishedFuture<>(); - - try { - return job.flush(); - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(e); - } - } - - /** - * @return Memory. - */ - public GridUnsafeMemory memory() { - return mem; - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopEmbeddedTaskExecutor.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopEmbeddedTaskExecutor.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopEmbeddedTaskExecutor.java deleted file mode 100644 index c2ac017..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopEmbeddedTaskExecutor.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor; - -import java.util.Collection; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.hadoop.HadoopJob; -import org.apache.ignite.internal.processors.hadoop.HadoopJobId; -import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput; -import org.apache.ignite.internal.processors.hadoop.impl.jobtracker.HadoopJobMetadata; -import org.apache.ignite.internal.processors.hadoop.impl.jobtracker.HadoopJobTracker; -import org.apache.ignite.internal.util.GridConcurrentHashSet; -import org.apache.ignite.internal.util.typedef.internal.U; - - -/** - * Task executor. - */ -public class HadoopEmbeddedTaskExecutor extends HadoopTaskExecutorAdapter { - /** Job tracker. */ - private HadoopJobTracker jobTracker; - - /** */ - private final ConcurrentMap<HadoopJobId, Collection<HadoopRunnableTask>> jobs = new ConcurrentHashMap<>(); - - /** Executor service to run tasks. 
*/ - private HadoopExecutorService exec; - - /** {@inheritDoc} */ - @Override public void onKernalStart() throws IgniteCheckedException { - super.onKernalStart(); - - jobTracker = ctx.jobTracker(); - - exec = new HadoopExecutorService(log, ctx.kernalContext().gridName(), - ctx.configuration().getMaxParallelTasks(), ctx.configuration().getMaxTaskQueueSize()); - } - - /** {@inheritDoc} */ - @Override public void onKernalStop(boolean cancel) { - if (exec != null) { - exec.shutdown(3000); - - if (cancel) { - for (HadoopJobId jobId : jobs.keySet()) - cancelTasks(jobId); - } - } - } - - /** {@inheritDoc} */ - @Override public void stop(boolean cancel) { - if (exec != null && !exec.shutdown(30000)) - U.warn(log, "Failed to finish running tasks in 30 sec."); - } - - /** {@inheritDoc} */ - @Override public void run(final HadoopJob job, Collection<HadoopTaskInfo> tasks) throws IgniteCheckedException { - if (log.isDebugEnabled()) - log.debug("Submitting tasks for local execution [locNodeId=" + ctx.localNodeId() + - ", tasksCnt=" + tasks.size() + ']'); - - Collection<HadoopRunnableTask> executedTasks = jobs.get(job.id()); - - if (executedTasks == null) { - executedTasks = new GridConcurrentHashSet<>(); - - Collection<HadoopRunnableTask> extractedCol = jobs.put(job.id(), executedTasks); - - assert extractedCol == null; - } - - final Collection<HadoopRunnableTask> finalExecutedTasks = executedTasks; - - for (final HadoopTaskInfo info : tasks) { - assert info != null; - - HadoopRunnableTask task = new HadoopRunnableTask(log, job, ctx.shuffle().memory(), info, - ctx.localNodeId()) { - @Override protected void onTaskFinished(HadoopTaskStatus status) { - if (log.isDebugEnabled()) - log.debug("Finished task execution [jobId=" + job.id() + ", taskInfo=" + info + ", " + - "waitTime=" + waitTime() + ", execTime=" + executionTime() + ']'); - - finalExecutedTasks.remove(this); - - jobTracker.onTaskFinished(info, status); - } - - @Override protected HadoopTaskInput 
createInput(HadoopTaskContext taskCtx) throws IgniteCheckedException { - return ctx.shuffle().input(taskCtx); - } - - @Override protected HadoopTaskOutput createOutput(HadoopTaskContext taskCtx) throws IgniteCheckedException { - return ctx.shuffle().output(taskCtx); - } - }; - - executedTasks.add(task); - - exec.submit(task); - } - } - - /** - * Cancels all currently running tasks for given job ID and cancels scheduled execution of tasks - * for this job ID. - * <p> - * It is guaranteed that this method will not be called concurrently with - * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method. No more job submissions will be performed via - * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method for given job ID after this method is called. - * - * @param jobId Job ID to cancel. - */ - @Override public void cancelTasks(HadoopJobId jobId) { - Collection<HadoopRunnableTask> executedTasks = jobs.get(jobId); - - if (executedTasks != null) { - for (HadoopRunnableTask task : executedTasks) - task.cancel(); - } - } - - /** {@inheritDoc} */ - @Override public void onJobStateChanged(HadoopJobMetadata meta) throws IgniteCheckedException { - if (meta.phase() == HadoopJobPhase.PHASE_COMPLETE) { - Collection<HadoopRunnableTask> executedTasks = jobs.remove(meta.jobId()); - - assert executedTasks == null || executedTasks.isEmpty(); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopExecutorService.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopExecutorService.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopExecutorService.java deleted file mode 100644 index 
8704c7b..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopExecutorService.java +++ /dev/null @@ -1,234 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor; - - -import java.util.Collection; -import java.util.concurrent.Callable; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; -import org.apache.ignite.internal.util.worker.GridWorker; -import org.apache.ignite.internal.util.worker.GridWorkerListener; -import org.apache.ignite.internal.util.worker.GridWorkerListenerAdapter; -import org.apache.ignite.thread.IgniteThread; -import org.jsr166.ConcurrentHashMap8; - -import static java.util.Collections.newSetFromMap; - -/** - * Executor service without thread pooling. 
- */ -public class HadoopExecutorService { - /** */ - private final LinkedBlockingQueue<Callable<?>> queue; - - /** */ - private final Collection<GridWorker> workers = newSetFromMap(new ConcurrentHashMap8<GridWorker, Boolean>()); - - /** */ - private final AtomicInteger active = new AtomicInteger(); - - /** */ - private final int maxTasks; - - /** */ - private final String gridName; - - /** */ - private final IgniteLogger log; - - /** */ - private volatile boolean shutdown; - - /** */ - private final GridWorkerListener lsnr = new GridWorkerListenerAdapter() { - @Override public void onStopped(GridWorker w) { - workers.remove(w); - - if (shutdown) { - active.decrementAndGet(); - - return; - } - - Callable<?> task = queue.poll(); - - if (task != null) - startThread(task); - else { - active.decrementAndGet(); - - if (!queue.isEmpty()) - startFromQueue(); - } - } - }; - - /** - * @param log Logger. - * @param gridName Grid name. - * @param maxTasks Max number of tasks. - * @param maxQueue Max queue length. - */ - public HadoopExecutorService(IgniteLogger log, String gridName, int maxTasks, int maxQueue) { - assert maxTasks > 0 : maxTasks; - assert maxQueue > 0 : maxQueue; - - this.maxTasks = maxTasks; - this.queue = new LinkedBlockingQueue<>(maxQueue); - this.gridName = gridName; - this.log = log.getLogger(HadoopExecutorService.class); - } - - /** - * @return Number of active workers. - */ - public int active() { - return workers.size(); - } - - /** - * Submit task. - * - * @param task Task. - */ - public void submit(Callable<?> task) { - while (queue.isEmpty()) { - int active0 = active.get(); - - if (active0 == maxTasks) - break; - - if (active.compareAndSet(active0, active0 + 1)) { - startThread(task); - - return; // Started in new thread bypassing queue. - } - } - - try { - while (!queue.offer(task, 100, TimeUnit.MILLISECONDS)) { - if (shutdown) - return; // Rejected due to shutdown. 
- } - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - return; - } - - startFromQueue(); - } - - /** - * Attempts to start task from queue. - */ - private void startFromQueue() { - do { - int active0 = active.get(); - - if (active0 == maxTasks) - break; - - if (active.compareAndSet(active0, active0 + 1)) { - Callable<?> task = queue.poll(); - - if (task == null) { - int res = active.decrementAndGet(); - - assert res >= 0 : res; - - break; - } - - startThread(task); - } - } - while (!queue.isEmpty()); - } - - /** - * @param task Task. - */ - private void startThread(final Callable<?> task) { - String workerName; - - if (task instanceof HadoopRunnableTask) { - final HadoopTaskInfo i = ((HadoopRunnableTask)task).taskInfo(); - - workerName = "Hadoop-task-" + i.jobId() + "-" + i.type() + "-" + i.taskNumber() + "-" + i.attempt(); - } - else - workerName = task.toString(); - - GridWorker w = new GridWorker(gridName, workerName, log, lsnr) { - @Override protected void body() { - try { - task.call(); - } - catch (Exception e) { - log.error("Failed to execute task: " + task, e); - } - } - }; - - workers.add(w); - - if (shutdown) - w.cancel(); - - new IgniteThread(w).start(); - } - - /** - * Shuts down this executor service. - * - * @param awaitTimeMillis Time in milliseconds to wait for tasks completion. - * @return {@code true} If all tasks completed. - */ - public boolean shutdown(long awaitTimeMillis) { - shutdown = true; - - for (GridWorker w : workers) - w.cancel(); - - while (awaitTimeMillis > 0 && !workers.isEmpty()) { - try { - Thread.sleep(100); - - awaitTimeMillis -= 100; - } - catch (InterruptedException e) { - break; - } - } - - return workers.isEmpty(); - } - - /** - * @return {@code true} If method {@linkplain #shutdown(long)} was already called. 
- */ - public boolean isShutdown() { - return shutdown; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopRunnableTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopRunnableTask.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopRunnableTask.java deleted file mode 100644 index 8415d6f..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopRunnableTask.java +++ /dev/null @@ -1,293 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor; - -import java.util.UUID; -import java.util.concurrent.Callable; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.internal.processors.hadoop.HadoopJob; -import org.apache.ignite.internal.processors.hadoop.impl.HadoopTaskCancelledException; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter; -import org.apache.ignite.internal.processors.hadoop.impl.shuffle.collections.HadoopHashMultimap; -import org.apache.ignite.internal.processors.hadoop.impl.shuffle.collections.HadoopMultimap; -import org.apache.ignite.internal.processors.hadoop.impl.shuffle.collections.HadoopSkipList; -import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; -import org.apache.ignite.internal.util.typedef.internal.U; - -import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.COMBINER_HASHMAP_SIZE; -import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_COMBINER_NO_SORTING; -import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get; -import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.COMBINE; -import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.MAP; - -/** - * Runnable task. - */ -public abstract class HadoopRunnableTask implements Callable<Void> { - /** */ - private final GridUnsafeMemory mem; - - /** */ - private final IgniteLogger log; - - /** */ - private final HadoopJob job; - - /** Task to run. */ - private final HadoopTaskInfo info; - - /** Submit time. 
*/ - private final long submitTs = U.currentTimeMillis(); - - /** Execution start timestamp. */ - private long execStartTs; - - /** Execution end timestamp. */ - private long execEndTs; - - /** */ - private HadoopMultimap combinerInput; - - /** */ - private volatile HadoopTaskContext ctx; - - /** Set if task is to cancelling. */ - private volatile boolean cancelled; - - /** Node id. */ - private UUID nodeId; - - /** - * @param log Log. - * @param job Job. - * @param mem Memory. - * @param info Task info. - * @param nodeId Node id. - */ - protected HadoopRunnableTask(IgniteLogger log, HadoopJob job, GridUnsafeMemory mem, HadoopTaskInfo info, - UUID nodeId) { - this.nodeId = nodeId; - this.log = log.getLogger(HadoopRunnableTask.class); - this.job = job; - this.mem = mem; - this.info = info; - } - - /** - * @return Wait time. - */ - public long waitTime() { - return execStartTs - submitTs; - } - - /** - * @return Execution time. - */ - public long executionTime() { - return execEndTs - execStartTs; - } - - /** {@inheritDoc} */ - @Override public Void call() throws IgniteCheckedException { - ctx = job.getTaskContext(info); - - return ctx.runAsJobOwner(new Callable<Void>() { - @Override public Void call() throws Exception { - call0(); - - return null; - } - }); - } - - /** - * Implements actual task running. 
- * @throws IgniteCheckedException - */ - void call0() throws IgniteCheckedException { - execStartTs = U.currentTimeMillis(); - - Throwable err = null; - - HadoopTaskState state = HadoopTaskState.COMPLETED; - - HadoopPerformanceCounter perfCntr = null; - - try { - perfCntr = HadoopPerformanceCounter.getCounter(ctx.counters(), nodeId); - - perfCntr.onTaskSubmit(info, submitTs); - perfCntr.onTaskPrepare(info, execStartTs); - - ctx.prepareTaskEnvironment(); - - runTask(perfCntr); - - if (info.type() == MAP && job.info().hasCombiner()) { - ctx.taskInfo(new HadoopTaskInfo(COMBINE, info.jobId(), info.taskNumber(), info.attempt(), null)); - - try { - runTask(perfCntr); - } - finally { - ctx.taskInfo(info); - } - } - } - catch (HadoopTaskCancelledException ignored) { - state = HadoopTaskState.CANCELED; - } - catch (Throwable e) { - state = HadoopTaskState.FAILED; - err = e; - - U.error(log, "Task execution failed.", e); - - if (e instanceof Error) - throw e; - } - finally { - execEndTs = U.currentTimeMillis(); - - if (perfCntr != null) - perfCntr.onTaskFinish(info, execEndTs); - - onTaskFinished(new HadoopTaskStatus(state, err, ctx==null ? null : ctx.counters())); - - if (combinerInput != null) - combinerInput.close(); - - if (ctx != null) - ctx.cleanupTaskEnvironment(); - } - } - - /** - * @param perfCntr Performance counter. - * @throws IgniteCheckedException If failed. - */ - private void runTask(HadoopPerformanceCounter perfCntr) throws IgniteCheckedException { - if (cancelled) - throw new HadoopTaskCancelledException("Task cancelled."); - - try (HadoopTaskOutput out = createOutputInternal(ctx); - HadoopTaskInput in = createInputInternal(ctx)) { - - ctx.input(in); - ctx.output(out); - - perfCntr.onTaskStart(ctx.taskInfo(), U.currentTimeMillis()); - - ctx.run(); - } - } - - /** - * Cancel the executed task. - */ - public void cancel() { - cancelled = true; - - if (ctx != null) - ctx.cancel(); - } - - /** - * @param status Task status. 
- */ - protected abstract void onTaskFinished(HadoopTaskStatus status); - - /** - * @param ctx Task context. - * @return Task input. - * @throws IgniteCheckedException If failed. - */ - @SuppressWarnings("unchecked") - private HadoopTaskInput createInputInternal(HadoopTaskContext ctx) throws IgniteCheckedException { - switch (ctx.taskInfo().type()) { - case SETUP: - case MAP: - case COMMIT: - case ABORT: - return null; - - case COMBINE: - assert combinerInput != null; - - return combinerInput.input(ctx); - - default: - return createInput(ctx); - } - } - - /** - * @param ctx Task context. - * @return Input. - * @throws IgniteCheckedException If failed. - */ - protected abstract HadoopTaskInput createInput(HadoopTaskContext ctx) throws IgniteCheckedException; - - /** - * @param ctx Task info. - * @return Output. - * @throws IgniteCheckedException If failed. - */ - protected abstract HadoopTaskOutput createOutput(HadoopTaskContext ctx) throws IgniteCheckedException; - - /** - * @param ctx Task info. - * @return Task output. - * @throws IgniteCheckedException If failed. - */ - private HadoopTaskOutput createOutputInternal(HadoopTaskContext ctx) throws IgniteCheckedException { - switch (ctx.taskInfo().type()) { - case SETUP: - case REDUCE: - case COMMIT: - case ABORT: - return null; - - case MAP: - if (job.info().hasCombiner()) { - assert combinerInput == null; - - combinerInput = get(job.info(), SHUFFLE_COMBINER_NO_SORTING, false) ? - new HadoopHashMultimap(job.info(), mem, get(job.info(), COMBINER_HASHMAP_SIZE, 8 * 1024)): - new HadoopSkipList(job.info(), mem); // TODO replace with red-black tree - - return combinerInput.startAdding(ctx); - } - - default: - return createOutput(ctx); - } - } - - /** - * @return Task info. 
- */ - public HadoopTaskInfo taskInfo() { - return info; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopTaskExecutorAdapter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopTaskExecutorAdapter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopTaskExecutorAdapter.java deleted file mode 100644 index 7819367..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopTaskExecutorAdapter.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor; - -import java.util.Collection; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.hadoop.impl.HadoopComponent; -import org.apache.ignite.internal.processors.hadoop.HadoopJob; -import org.apache.ignite.internal.processors.hadoop.HadoopJobId; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; -import org.apache.ignite.internal.processors.hadoop.impl.jobtracker.HadoopJobMetadata; - -/** - * Common superclass for task executor. - */ -public abstract class HadoopTaskExecutorAdapter extends HadoopComponent { - /** - * Runs tasks. - * - * @param job Job. - * @param tasks Tasks. - * @throws IgniteCheckedException If failed. - */ - public abstract void run(final HadoopJob job, Collection<HadoopTaskInfo> tasks) throws IgniteCheckedException; - - /** - * Cancels all currently running tasks for given job ID and cancels scheduled execution of tasks - * for this job ID. - * <p> - * It is guaranteed that this method will not be called concurrently with - * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method. No more job submissions will be performed via - * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method for given job ID after this method is called. - * - * @param jobId Job ID to cancel. - */ - public abstract void cancelTasks(HadoopJobId jobId) throws IgniteCheckedException; - - /** - * On job state change callback; - * - * @param meta Job metadata. 
- */ - public abstract void onJobStateChanged(HadoopJobMetadata meta) throws IgniteCheckedException; -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopTaskState.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopTaskState.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopTaskState.java deleted file mode 100644 index 62ba932..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopTaskState.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor; - -/** -* State of a task, as carried by {@code HadoopTaskStatus}. -*/ -public enum HadoopTaskState { - /** Running task. */ - RUNNING, - - /** Completed task. */ - COMPLETED, - - /** Failed task. */ - FAILED, - - /** Canceled task. */ - CANCELED, - - /** Process executing the task crashed.
*/ - CRASHED -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopTaskStatus.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopTaskStatus.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopTaskStatus.java deleted file mode 100644 index c45616e..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopTaskStatus.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.jetbrains.annotations.Nullable; - -/** - * Task status.
- */ -public class HadoopTaskStatus implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Task state. */ - private HadoopTaskState state; - - /** Failure cause, {@code null} if none was given. */ - private Throwable failCause; - - /** Task counters, {@code null} if none were given. */ - private HadoopCounters cntrs; - - /** - * Default constructor required by {@link Externalizable}. - */ - public HadoopTaskStatus() { - // No-op. - } - - /** - * Creates new instance without counters. - * - * @param state Task state. - * @param failCause Failure cause (if any). - */ - public HadoopTaskStatus(HadoopTaskState state, @Nullable Throwable failCause) { - this(state, failCause, null); - } - - /** - * Creates new instance. - * - * @param state Task state, must not be {@code null} (checked by assertion only). - * @param failCause Failure cause (if any). - * @param cntrs Task counters (if any). - */ - public HadoopTaskStatus(HadoopTaskState state, @Nullable Throwable failCause, - @Nullable HadoopCounters cntrs) { - assert state != null; - - this.state = state; - this.failCause = failCause; - this.cntrs = cntrs; - } - - /** - * @return State. - */ - public HadoopTaskState state() { - return state; - } - - /** - * @return Fail cause, or {@code null} if there is none. - */ - @Nullable public Throwable failCause() { - return failCause; - } - - /** - * @return Counters, or {@code null} if there are none.
- */ - @Nullable public HadoopCounters counters() { - return cntrs; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HadoopTaskStatus.class, this); - } - - /** - * {@inheritDoc} - * <p> - * Writes state, failure cause and counters, in that order. - */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(state); - out.writeObject(failCause); - out.writeObject(cntrs); - } - - /** - * {@inheritDoc} - * <p> - * Reads state, failure cause and counters, in the same order {@code writeExternal} wrote them. - */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - state = (HadoopTaskState)in.readObject(); - failCause = (Throwable)in.readObject(); - cntrs = (HadoopCounters)in.readObject(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutor.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutor.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutor.java deleted file mode 100644 index a330650..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutor.java +++ /dev/null @@ -1,976 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external; - -import java.io.BufferedReader; -import java.io.File; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.locks.ReentrantLock; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.processors.hadoop.impl.HadoopContext; -import org.apache.ignite.internal.processors.hadoop.HadoopJob; -import org.apache.ignite.internal.processors.hadoop.HadoopJobId; -import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase; -import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskType; -import org.apache.ignite.internal.processors.hadoop.impl.jobtracker.HadoopJobMetadata; -import org.apache.ignite.internal.processors.hadoop.impl.jobtracker.HadoopJobTracker; -import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage; -import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.HadoopTaskExecutorAdapter; -import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.HadoopTaskState; -import 
org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.HadoopTaskStatus; -import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.child.HadoopExternalProcessStarter; -import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.communication.HadoopExternalCommunication; -import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.communication.HadoopMessageListener; -import org.apache.ignite.internal.util.GridSpinReadWriteLock; -import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.typedef.CI1; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.spi.IgnitePortProtocol; -import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentHashMap8; -import org.jsr166.ConcurrentLinkedDeque8; - -import static org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.HadoopTaskState.CRASHED; -import static org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.HadoopTaskState.FAILED; - -/** - * External process registry. Handles external process lifecycle: starts one child JVM per job, - * forwards tasks and job state updates to it and reports task results to the job tracker. - */ -public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { - /** Hadoop context. */ - private HadoopContext ctx; - - /** Path to the java executable used to start child processes. */ - private String javaCmd; - - /** Logger. */ - private IgniteLogger log; - - /** Node process descriptor. */ - private HadoopProcessDescriptor nodeDesc; - - /** Output base. */ - private File outputBase; - - /** Separator placed between entries when the child process classpath is built. */ - private String pathSep; - - /** Hadoop external communication. */ - private HadoopExternalCommunication comm; - - /** Child processes by child process ID. */ - private final ConcurrentMap<UUID, HadoopProcess> runningProcsByProcId = new ConcurrentHashMap8<>(); - - /** Child processes by job ID.
*/ - private final ConcurrentMap<HadoopJobId, HadoopProcess> runningProcsByJobId = new ConcurrentHashMap8<>(); - - /** Busy lock. */ - private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock(); - - /** Job tracker. */ - private HadoopJobTracker jobTracker; - - /** {@inheritDoc} */ - @Override public void start(HadoopContext ctx) throws IgniteCheckedException { - this.ctx = ctx; - - log = ctx.kernalContext().log(HadoopExternalTaskExecutor.class); - - outputBase = U.resolveWorkDirectory("hadoop", false); - - pathSep = System.getProperty("path.separator", U.isWindows() ? ";" : ":"); - - initJavaCommand(); - - comm = new HadoopExternalCommunication( - ctx.localNodeId(), - UUID.randomUUID(), - ctx.kernalContext().config().getMarshaller(), - log, - ctx.kernalContext().getSystemExecutorService(), - ctx.kernalContext().gridName()); - - comm.setListener(new MessageListener()); - - comm.start(); - - nodeDesc = comm.localProcessDescriptor(); - - ctx.kernalContext().ports().registerPort(nodeDesc.tcpPort(), IgnitePortProtocol.TCP, - HadoopExternalTaskExecutor.class); - - if (nodeDesc.sharedMemoryPort() != -1) - ctx.kernalContext().ports().registerPort(nodeDesc.sharedMemoryPort(), IgnitePortProtocol.TCP, - HadoopExternalTaskExecutor.class); - - jobTracker = ctx.jobTracker(); - } - - /** {@inheritDoc} */ - @Override public void stop(boolean cancel) { - busyLock.writeLock(); - - try { - comm.stop(); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to gracefully stop external hadoop communication server (will shutdown anyway)", e); - } - } - - /** {@inheritDoc} */ - @Override public void onJobStateChanged(final HadoopJobMetadata meta) { - final HadoopProcess proc = runningProcsByJobId.get(meta.jobId()); - - // If we have a local process for this job. 
- if (proc != null) { - if (log.isDebugEnabled()) - log.debug("Updating job information for remote task process [proc=" + proc + ", meta=" + meta + ']'); - - if (meta.phase() == HadoopJobPhase.PHASE_COMPLETE) { - if (log.isDebugEnabled()) - log.debug("Completed job execution, will terminate child process [jobId=" + meta.jobId() + - ", proc=" + proc + ']'); - - runningProcsByJobId.remove(meta.jobId()); - runningProcsByProcId.remove(proc.descriptor().processId()); - - proc.terminate(); - - return; - } - - if (proc.initFut.isDone()) { - if (!proc.initFut.isFailed()) - sendJobInfoUpdate(proc, meta); - else if (log.isDebugEnabled()) - log.debug("Failed to initialize child process (will skip job state notification) " + - "[jobId=" + meta.jobId() + ", meta=" + meta + ']'); - } - else { - proc.initFut.listen(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() { - @Override - public void apply(IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) { - try { - f.get(); - - sendJobInfoUpdate(proc, meta); - } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to initialize child process (will skip job state notification) " + - "[jobId=" + meta.jobId() + ", meta=" + meta + ", err=" + e + ']'); - } - - } - }); - } - } - else if (ctx.isParticipating(meta)) { - HadoopJob job; - - try { - job = jobTracker.job(meta.jobId(), meta.jobInfo()); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to get job: " + meta.jobId(), e); - - return; - } - - startProcess(job, meta.mapReducePlan()); - } - } - - /** {@inheritDoc} */ - @SuppressWarnings("ConstantConditions") - @Override public void run(final HadoopJob job, final Collection<HadoopTaskInfo> tasks) throws IgniteCheckedException { - if (!busyLock.tryReadLock()) { - if (log.isDebugEnabled()) - log.debug("Failed to start hadoop tasks (grid is stopping, will ignore)."); - - return; - } - - try { - HadoopProcess proc = 
runningProcsByJobId.get(job.id()); - - HadoopTaskType taskType = F.first(tasks).type(); - - if (taskType == HadoopTaskType.SETUP || taskType == HadoopTaskType.ABORT || - taskType == HadoopTaskType.COMMIT) { - if (proc == null || proc.terminated()) { - runningProcsByJobId.remove(job.id(), proc); - - // Start new process for ABORT task since previous processes were killed. - proc = startProcess(job, jobTracker.plan(job.id())); - - if (log.isDebugEnabled()) - log.debug("Starting new process for maintenance task [jobId=" + job.id() + - ", proc=" + proc + ", taskType=" + taskType + ']'); - } - } - else - assert proc != null : "Missing started process for task execution request: " + job.id() + - ", tasks=" + tasks; - - final HadoopProcess proc0 = proc; - - proc.initFut.listen(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() { - @Override public void apply( - IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) { - if (!busyLock.tryReadLock()) - return; - - try { - f.get(); - - proc0.addTasks(tasks); - - if (log.isDebugEnabled()) - log.debug("Sending task execution request to child process [jobId=" + job.id() + - ", proc=" + proc0 + ", tasks=" + tasks + ']'); - - sendExecutionRequest(proc0, job, tasks); - } - catch (IgniteCheckedException e) { - notifyTasksFailed(tasks, FAILED, e); - } - finally { - busyLock.readUnlock(); - } - } - }); - } - finally { - busyLock.readUnlock(); - } - } - - /** {@inheritDoc} */ - @Override public void cancelTasks(HadoopJobId jobId) { - HadoopProcess proc = runningProcsByJobId.get(jobId); - - if (proc != null) - proc.terminate(); - } - - /** - * Sends execution request to remote node. - * - * @param proc Process to send request to. - * @param job Job instance. - * @param tasks Collection of tasks to execute in started process. 
- * @throws IgniteCheckedException If the request could not be sent. - */ - private void sendExecutionRequest(HadoopProcess proc, HadoopJob job, Collection<HadoopTaskInfo> tasks) - throws IgniteCheckedException { - // Must synchronize since concurrent process crash may happen and will receive onConnectionLost(). - proc.lock(); - - try { - if (proc.terminated()) { - notifyTasksFailed(tasks, CRASHED, null); - - return; - } - - HadoopTaskExecutionRequest req = new HadoopTaskExecutionRequest(); - - req.jobId(job.id()); - req.jobInfo(job.info()); - req.tasks(tasks); - - comm.sendMessage(proc.descriptor(), req); - } - finally { - proc.unlock(); - } - } - - /** - * Builds start metadata for a child process: the classpath is taken from the {@code java.class.path} - * system property of this JVM and the JVM options are a fixed list plus {@code IGNITE_HOME}. - * - * @return External task metadata. - */ - private HadoopExternalTaskMetadata buildTaskMeta() { - HadoopExternalTaskMetadata meta = new HadoopExternalTaskMetadata(); - - meta.classpath(Arrays.asList(System.getProperty("java.class.path").split(File.pathSeparator))); - meta.jvmOptions(Arrays.asList("-Xmx1g", "-ea", "-XX:+UseConcMarkSweepGC", "-XX:+CMSClassUnloadingEnabled", - "-DIGNITE_HOME=" + U.getIgniteHome())); - - return meta; - } - - /** - * Reports every given task to the job tracker as finished with the given fail state. - * - * @param tasks Tasks to notify about. - * @param state Fail state. - * @param e Optional error. - */ - private void notifyTasksFailed(Iterable<HadoopTaskInfo> tasks, HadoopTaskState state, Throwable e) { - HadoopTaskStatus fail = new HadoopTaskStatus(state, e); - - for (HadoopTaskInfo task : tasks) - jobTracker.onTaskFinished(task, fail); - } - - /** - * Starts process template that will be ready to execute Hadoop tasks. The process is registered - * immediately; the actual start runs asynchronously and is tracked by the process init future. - * - * @param job Job instance. - * @param plan Map reduce plan.
- */ - private HadoopProcess startProcess(final HadoopJob job, final HadoopMapReducePlan plan) { - final UUID childProcId = UUID.randomUUID(); - - HadoopJobId jobId = job.id(); - - final HadoopProcessFuture fut = new HadoopProcessFuture(childProcId, jobId); - - final HadoopProcess proc = new HadoopProcess(jobId, fut, plan.reducers(ctx.localNodeId())); - - HadoopProcess old = runningProcsByJobId.put(jobId, proc); - - assert old == null; - - old = runningProcsByProcId.put(childProcId, proc); - - assert old == null; - - ctx.kernalContext().closure().runLocalSafe(new Runnable() { - @Override public void run() { - if (!busyLock.tryReadLock()) { - fut.onDone(new IgniteCheckedException("Failed to start external process (grid is stopping).")); - - return; - } - - try { - HadoopExternalTaskMetadata startMeta = buildTaskMeta(); - - if (log.isDebugEnabled()) - log.debug("Created hadoop child process metadata for job [job=" + job + - ", childProcId=" + childProcId + ", taskMeta=" + startMeta + ']'); - - Process proc = startJavaProcess(childProcId, startMeta, job); - - BufferedReader rdr = new BufferedReader(new InputStreamReader(proc.getInputStream())); - - String line; - - // Read up all the process output. - while ((line = rdr.readLine()) != null) { - if (log.isDebugEnabled()) - log.debug("Tracing process output: " + line); - - if ("Started".equals(line)) { - // Process started successfully, it should not write anything more to the output stream. - if (log.isDebugEnabled()) - log.debug("Successfully started child process [childProcId=" + childProcId + - ", meta=" + job + ']'); - - fut.onProcessStarted(proc); - - break; - } - else if ("Failed".equals(line)) { - StringBuilder sb = new StringBuilder("Failed to start child process: " + job + "\n"); - - while ((line = rdr.readLine()) != null) - sb.append(" ").append(line).append("\n"); - - // Cut last character. 
- sb.setLength(sb.length() - 1); - - log.warning(sb.toString()); - - fut.onDone(new IgniteCheckedException(sb.toString())); - - break; - } - } - } - catch (Throwable e) { - fut.onDone(new IgniteCheckedException("Failed to initialize child process: " + job, e)); - - if (e instanceof Error) - throw (Error)e; - } - finally { - busyLock.readUnlock(); - } - } - }, true); - - fut.listen(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() { - @Override public void apply(IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) { - try { - // Make sure there were no exceptions. - f.get(); - - prepareForJob(proc, job, plan); - } - catch (IgniteCheckedException ignore) { - // Exception is printed in future's onDone() method. - } - } - }); - - return proc; - } - - /** - * Checks that java local command is available. - * - * @throws IgniteCheckedException If initialization failed. - */ - private void initJavaCommand() throws IgniteCheckedException { - String javaHome = System.getProperty("java.home"); - - if (javaHome == null) - javaHome = System.getenv("JAVA_HOME"); - - if (javaHome == null) - throw new IgniteCheckedException("Failed to locate JAVA_HOME."); - - javaCmd = javaHome + File.separator + "bin" + File.separator + (U.isWindows() ? 
"java.exe" : "java"); - - try { - Process proc = new ProcessBuilder(javaCmd, "-version").redirectErrorStream(true).start(); - - Collection<String> out = readProcessOutput(proc); - - int res = proc.waitFor(); - - if (res != 0) - throw new IgniteCheckedException("Failed to execute 'java -version' command (process finished with nonzero " + - "code) [exitCode=" + res + ", javaCmd='" + javaCmd + "', msg=" + F.first(out) + ']'); - - if (log.isInfoEnabled()) { - log.info("Will use java for external task execution: "); - - for (String s : out) - log.info(" " + s); - } - } - catch (IOException e) { - throw new IgniteCheckedException("Failed to check java for external task execution.", e); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new IgniteCheckedException("Failed to wait for process completion (thread got interrupted).", e); - } - } - - /** - * Reads process output line-by-line. - * - * @param proc Process to read output. - * @return Read lines. - * @throws IOException If read failed. - */ - private Collection<String> readProcessOutput(Process proc) throws IOException { - BufferedReader rdr = new BufferedReader(new InputStreamReader(proc.getInputStream())); - - Collection<String> res = new ArrayList<>(); - - String s; - - while ((s = rdr.readLine()) != null) - res.add(s); - - return res; - } - - /** - * Builds process from metadata. - * - * @param childProcId Child process ID. - * @param startMeta Metadata. - * @param job Job. - * @return Started process. 
- */ - private Process startJavaProcess(UUID childProcId, HadoopExternalTaskMetadata startMeta, - HadoopJob job) throws Exception { - String outFldr = jobWorkFolder(job.id()) + File.separator + childProcId; - - if (log.isDebugEnabled()) - log.debug("Will write process log output to: " + outFldr); - - List<String> cmd = new ArrayList<>(); - - File workDir = U.resolveWorkDirectory("", false); - - cmd.add(javaCmd); - cmd.addAll(startMeta.jvmOptions()); - cmd.add("-cp"); - cmd.add(buildClasspath(startMeta.classpath())); - cmd.add(HadoopExternalProcessStarter.class.getName()); - cmd.add("-cpid"); - cmd.add(String.valueOf(childProcId)); - cmd.add("-ppid"); - cmd.add(String.valueOf(nodeDesc.processId())); - cmd.add("-nid"); - cmd.add(String.valueOf(nodeDesc.parentNodeId())); - cmd.add("-addr"); - cmd.add(nodeDesc.address()); - cmd.add("-tport"); - cmd.add(String.valueOf(nodeDesc.tcpPort())); - cmd.add("-sport"); - cmd.add(String.valueOf(nodeDesc.sharedMemoryPort())); - cmd.add("-out"); - cmd.add(outFldr); - cmd.add("-wd"); - cmd.add(workDir.getAbsolutePath()); - - return new ProcessBuilder(cmd) - .redirectErrorStream(true) - .directory(workDir) - .start(); - } - - /** - * Gets job work folder. - * - * @param jobId Job ID. - * @return Job work folder. - */ - private String jobWorkFolder(HadoopJobId jobId) { - return outputBase + File.separator + "Job_" + jobId; - } - - /** - * @param cp Classpath collection. - * @return Classpath string. - */ - private String buildClasspath(Collection<String> cp) { - assert !cp.isEmpty(); - - StringBuilder sb = new StringBuilder(); - - for (String s : cp) - sb.append(s).append(pathSep); - - sb.setLength(sb.length() - 1); - - return sb.toString(); - } - - /** - * Sends job info update request to remote process. - * - * @param proc Process to send request to. - * @param meta Job metadata. 
- */ - private void sendJobInfoUpdate(HadoopProcess proc, HadoopJobMetadata meta) { - Map<Integer, HadoopProcessDescriptor> rdcAddrs = meta.reducersAddresses(); - - int rdcNum = meta.mapReducePlan().reducers(); - - HadoopProcessDescriptor[] addrs = null; - - /* Addresses are passed on only when the map holds as many entries as there are reducers; otherwise null is sent. */ - if (rdcAddrs != null && rdcAddrs.size() == rdcNum) { - addrs = new HadoopProcessDescriptor[rdcNum]; - - for (int i = 0; i < rdcNum; i++) { - HadoopProcessDescriptor desc = rdcAddrs.get(i); - - assert desc != null : "Missing reducing address [meta=" + meta + ", rdc=" + i + ']'; - - addrs[i] = desc; - } - } - - try { - comm.sendMessage(proc.descriptor(), new HadoopJobInfoUpdateRequest(proc.jobId, meta.phase(), addrs)); - } - catch (IgniteCheckedException e) { - /* A send failure to an already terminated process is ignored silently. */ - if (!proc.terminated()) { - log.error("Failed to send job state update message to remote child process (will kill the process) " + - "[jobId=" + proc.jobId + ", meta=" + meta + ']', e); - - proc.terminate(); - } - } - } - - /** - * Sends prepare request to remote process. If the request cannot be sent, the error is logged - * and the process is terminated. - * - * @param proc Process to send request to. - * @param job Job. - * @param plan Map reduce plan. - */ - private void prepareForJob(HadoopProcess proc, HadoopJob job, HadoopMapReducePlan plan) { - try { - comm.sendMessage(proc.descriptor(), new HadoopPrepareForJobRequest(job.id(), job.info(), - plan.reducers(), plan.reducers(ctx.localNodeId()))); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send job prepare request to remote process [proc=" + proc + ", job=" + job + - ", plan=" + plan + ']', e); - - proc.terminate(); - } - } - - /** - * Processes task finished message: removes the task from the sending process, if that process - * is still registered, and notifies the job tracker. - * - * @param desc Remote process descriptor. - * @param taskMsg Task finished message.
- */ - private void processTaskFinishedMessage(HadoopProcessDescriptor desc, HadoopTaskFinishedMessage taskMsg) { - HadoopProcess proc = runningProcsByProcId.get(desc.processId()); - - if (proc != null) - proc.removeTask(taskMsg.taskInfo()); - - jobTracker.onTaskFinished(taskMsg.taskInfo(), taskMsg.status()); - } - - /** - * Listener for messages and connection loss events coming from child processes. Every callback - * is skipped when the busy read lock cannot be acquired. - */ - private class MessageListener implements HadoopMessageListener { - /** - * {@inheritDoc} - * <p> - * Handles the process start acknowledgement and task finished messages; any other message is logged as unexpected. - */ - @Override public void onMessageReceived(HadoopProcessDescriptor desc, HadoopMessage msg) { - if (!busyLock.tryReadLock()) - return; - - try { - if (msg instanceof HadoopProcessStartedAck) { - HadoopProcess proc = runningProcsByProcId.get(desc.processId()); - - assert proc != null : "Missing child process for processId: " + desc; - - HadoopProcessFuture fut = proc.initFut; - - if (fut != null) - fut.onReplyReceived(desc); - // Safety. - else - log.warning("Failed to find process start future (will ignore): " + desc); - } - else if (msg instanceof HadoopTaskFinishedMessage) { - HadoopTaskFinishedMessage taskMsg = (HadoopTaskFinishedMessage)msg; - - processTaskFinishedMessage(desc, taskMsg); - } - else - log.warning("Unexpected message received by node [desc=" + desc + ", msg=" + msg + ']'); - } - finally { - busyLock.readUnlock(); - } - } - - /** - * {@inheritDoc} - * <p> - * Reports the tasks still assigned to the lost process as {@code CRASHED} and terminates the process. - * A {@code null} descriptor is only logged as a failed handshake. - */ - @Override public void onConnectionLost(HadoopProcessDescriptor desc) { - if (!busyLock.tryReadLock()) - return; - - try { - if (desc == null) { - U.warn(log, "Handshake failed."); - - return; - } - - // Notify job tracker about failed tasks.
- HadoopProcess proc = runningProcsByProcId.get(desc.processId()); - - if (proc != null) { - Collection<HadoopTaskInfo> tasks = proc.tasks(); - - if (!F.isEmpty(tasks)) { - log.warning("Lost connection with alive process (will terminate): " + desc); - - HadoopTaskStatus status = new HadoopTaskStatus(CRASHED, - new IgniteCheckedException("Failed to run tasks (external process finished unexpectedly): " + desc)); - - for (HadoopTaskInfo info : tasks) - jobTracker.onTaskFinished(info, status); - - runningProcsByJobId.remove(proc.jobId(), proc); - } - - // Safety. - proc.terminate(); - } - } - finally { - busyLock.readUnlock(); - } - } - } - - /** - * Hadoop child process handle. Extends {@link ReentrantLock} so that the instance itself is the lock - * that guards message sending against concurrent termination. - */ - private static class HadoopProcess extends ReentrantLock { - /** */ - private static final long serialVersionUID = 0L; - - /** Job ID. */ - private final HadoopJobId jobId; - - /** Process. Assigned in the initialized callback. */ - private Process proc; - - /** Init future. Completes when process is ready to receive messages. */ - private final HadoopProcessFuture initFut; - - /** Process descriptor. Assigned in the initialized callback. */ - private HadoopProcessDescriptor procDesc; - - /** Reducers planned for this process. Left {@code null} when no reducers were given. */ - private Collection<Integer> reducers; - - /** Tasks. */ - private final Collection<HadoopTaskInfo> tasks = new ConcurrentLinkedDeque8<>(); - - /** Terminated flag. */ - private volatile boolean terminated; - - /** - * @param jobId Job ID. - * @param initFut Init future. - * @param reducers Reducers planned for this process (may be empty). - */ - private HadoopProcess(HadoopJobId jobId, HadoopProcessFuture initFut, - int[] reducers) { - this.jobId = jobId; - this.initFut = initFut; - - if (!F.isEmpty(reducers)) { - this.reducers = new ArrayList<>(reducers.length); - - for (int r : reducers) - this.reducers.add(r); - } - } - - /** - * @return Communication process descriptor. - */ - private HadoopProcessDescriptor descriptor() { - return procDesc; - } - - /** - * @return Job ID. - */ - public HadoopJobId jobId() { - return jobId; - } - - /** - * Initialized callback. - * - * @param proc Java process representation.
- * @param procDesc Process descriptor. - */ - private void onInitialized(Process proc, HadoopProcessDescriptor procDesc) { - this.proc = proc; - this.procDesc = procDesc; - } - - /** - * Terminates process (kills it). If initialization is still in progress, the kill is deferred until - * the init future completes. Safe to call when initialization failed and no process handle was ever set. - */ - private void terminate() { - // Guard against concurrent message sending. - lock(); - - try { - terminated = true; - - if (!initFut.isDone()) - initFut.listen(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() { - @Override public void apply( - IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) { - destroyProcess(); - } - }); - else - destroyProcess(); - } - finally { - unlock(); - } - } - - /** - * Destroys the underlying process if a handle is available. The handle is assigned only in - * {@link #onInitialized(Process, HadoopProcessDescriptor)}, so it is {@code null} when init did not succeed. - */ - private void destroyProcess() { - Process p = proc; - - if (p != null) - p.destroy(); - } - - /** - * @return Terminated flag. - */ - private boolean terminated() { - return terminated; - } - - /** - * Adds tasks to the tasks assigned to this process. - * - * @param tasks Tasks to add. - */ - private void addTasks(Collection<HadoopTaskInfo> tasks) { - this.tasks.addAll(tasks); - } - - /** - * Removes task when it was completed. - * - * @param task Task to remove. - */ - private void removeTask(HadoopTaskInfo task) { - tasks.remove(task); - } - - /** - * @return Collection of tasks. - */ - private Collection<HadoopTaskInfo> tasks() { - return tasks; - } - - /** - * @return Planned reducers. - */ - private Collection<Integer> reducers() { - return reducers; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HadoopProcess.class, this); - } - } - - /** - * Child process init future. Completes successfully once both the process start and the start - * reply from the child have been reported. - */ - private class HadoopProcessFuture extends GridFutureAdapter<IgniteBiTuple<Process, HadoopProcessDescriptor>> { - /** */ - private static final long serialVersionUID = 0L; - - /** Child process ID. */ - private UUID childProcId; - - /** Job ID. */ - private HadoopJobId jobId; - - /** Process descriptor. */ - private HadoopProcessDescriptor desc; - - /** Running process. */ - private Process proc; - - /** Process started flag. */ - private volatile boolean procStarted; - - /** Reply received flag.
*/ - private volatile boolean replyReceived; - - /** Logger. */ - private final IgniteLogger log = HadoopExternalTaskExecutor.this.log; - - /** - * @param childProcId Child process ID. - * @param jobId Job ID. - */ - private HadoopProcessFuture(UUID childProcId, HadoopJobId jobId) { - this.childProcId = childProcId; - this.jobId = jobId; - } - - /** - * Process started callback. Completes the future if the reply has already been received. - * - * @param proc Started process. - */ - public void onProcessStarted(Process proc) { - this.proc = proc; - - procStarted = true; - - if (procStarted && replyReceived) - onDone(F.t(proc, desc)); - } - - /** - * Reply received callback. Completes the future if the process start has already been reported. - * - * @param desc Descriptor of the replying child process. - */ - public void onReplyReceived(HadoopProcessDescriptor desc) { - assert childProcId.equals(desc.processId()); - - this.desc = desc; - - replyReceived = true; - - if (procStarted && replyReceived) - onDone(F.t(proc, desc)); - } - - /** - * {@inheritDoc} - * <p> - * On success the registered process is initialized with the result and, if it has planned reducers, - * the job tracker is notified; on failure the process is removed from both registries. This happens - * before the superclass completes the future. - * <p> - * NOTE(review): the two callbacks above look like they can both observe both flags set and call this - * method concurrently, in which case the success side effects would run twice - confirm. - */ - @Override public boolean onDone(@Nullable IgniteBiTuple<Process, HadoopProcessDescriptor> res, - @Nullable Throwable err) { - if (err == null) { - HadoopProcess proc = runningProcsByProcId.get(childProcId); - - assert proc != null; - - assert proc.initFut == this; - - proc.onInitialized(res.get1(), res.get2()); - - if (!F.isEmpty(proc.reducers())) - jobTracker.onExternalMappersInitialized(jobId, proc.reducers(), desc); - } - else { - // Clean up since init failed.
- runningProcsByJobId.remove(jobId); - runningProcsByProcId.remove(childProcId); - } - - if (super.onDone(res, err)) { - if (err == null) { - if (log.isDebugEnabled()) - log.debug("Initialized child process for external task execution [jobId=" + jobId + - ", desc=" + desc + ", initTime=" + duration() + ']'); - } - else - U.error(log, "Failed to initialize child process for external task execution [jobId=" + jobId + - ", desc=" + desc + ']', err); - - return true; - } - - return false; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskMetadata.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskMetadata.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskMetadata.java deleted file mode 100644 index 6e42a7f..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskMetadata.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external; - -import java.util.Collection; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.internal.S; - -/** - * External task metadata (classpath, JVM options) needed to start external process execution. - */ -public class HadoopExternalTaskMetadata { - /** Process classpath. */ - private Collection<String> classpath; - - /** JVM options. */ - @GridToStringInclude - private Collection<String> jvmOpts; - - /** - * @return JVM Options. - */ - public Collection<String> jvmOptions() { - return jvmOpts; - } - - /** - * @param jvmOpts JVM options. - */ - public void jvmOptions(Collection<String> jvmOpts) { - this.jvmOpts = jvmOpts; - } - - /** - * @return Classpath. - */ - public Collection<String> classpath() { - return classpath; - } - - /** - * @param classpath Classpath. 
- */ - public void classpath(Collection<String> classpath) { - this.classpath = classpath; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HadoopExternalTaskMetadata.class, this); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopJobInfoUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopJobInfoUpdateRequest.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopJobInfoUpdateRequest.java deleted file mode 100644 index 8f457a0..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopJobInfoUpdateRequest.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import org.apache.ignite.internal.processors.hadoop.HadoopJobId; -import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase; -import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; - -/** - * Job info update request. - */ -public class HadoopJobInfoUpdateRequest implements HadoopMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** Job ID. */ - @GridToStringInclude - private HadoopJobId jobId; - - /** Job phase. */ - @GridToStringInclude - private HadoopJobPhase jobPhase; - - /** Reducers addresses. */ - @GridToStringInclude - private HadoopProcessDescriptor[] reducersAddrs; - - /** - * Constructor required by {@link Externalizable}. - */ - public HadoopJobInfoUpdateRequest() { - // No-op. - } - - /** - * @param jobId Job ID. - * @param jobPhase Job phase. - * @param reducersAddrs Reducers addresses. - */ - public HadoopJobInfoUpdateRequest(HadoopJobId jobId, HadoopJobPhase jobPhase, - HadoopProcessDescriptor[] reducersAddrs) { - assert jobId != null; - - this.jobId = jobId; - this.jobPhase = jobPhase; - this.reducersAddrs = reducersAddrs; - } - - /** - * @return Job ID. - */ - public HadoopJobId jobId() { - return jobId; - } - - /** - * @return Job phase. - */ - public HadoopJobPhase jobPhase() { - return jobPhase; - } - - /** - * @return Reducers addresses. 
- */ - public HadoopProcessDescriptor[] reducersAddresses() { - return reducersAddrs; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - jobId.writeExternal(out); - - out.writeObject(jobPhase); - U.writeArray(out, reducersAddrs); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - jobId = new HadoopJobId(); - jobId.readExternal(in); - - jobPhase = (HadoopJobPhase)in.readObject(); - reducersAddrs = (HadoopProcessDescriptor[])U.readArray(in); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HadoopJobInfoUpdateRequest.class, this); - } -} \ No newline at end of file