http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java deleted file mode 100644 index a2c55a2..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java +++ /dev/null @@ -1,1708 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.jobtracker; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicInteger; -import javax.cache.event.CacheEntryEvent; -import javax.cache.event.CacheEntryUpdatedListener; -import javax.cache.expiry.Duration; -import javax.cache.expiry.ExpiryPolicy; -import javax.cache.expiry.ModifiedExpiryPolicy; -import javax.cache.processor.EntryProcessor; -import javax.cache.processor.MutableEntry; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.events.DiscoveryEvent; -import org.apache.ignite.events.Event; -import org.apache.ignite.events.EventType; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; -import org.apache.ignite.internal.processors.cache.GridCacheAdapter; -import org.apache.ignite.internal.processors.cache.IgniteInternalCache; -import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader; -import org.apache.ignite.internal.processors.hadoop.HadoopComponent; -import org.apache.ignite.internal.processors.hadoop.HadoopContext; -import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; -import org.apache.ignite.internal.processors.hadoop.HadoopJob; -import org.apache.ignite.internal.processors.hadoop.HadoopJobId; -import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; -import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase; -import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus; -import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan; -import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlanner; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; -import org.apache.ignite.internal.processors.hadoop.HadoopUtils; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounterWriter; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopCountersImpl; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskStatus; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessDescriptor; -import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Job; -import org.apache.ignite.internal.util.GridMutex; -import org.apache.ignite.internal.util.GridSpinReadWriteLock; -import org.apache.ignite.internal.util.future.GridFinishedFuture; -import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.typedef.CI1; -import org.apache.ignite.internal.util.typedef.CIX1; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.internal.util.typedef.internal.SB; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteInClosure; -import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentHashMap8; - -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.PHASE_CANCELLING; -import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.PHASE_COMPLETE; -import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.PHASE_MAP; -import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.PHASE_REDUCE; -import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.PHASE_SETUP; -import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.ABORT; -import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.COMMIT; -import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.MAP; -import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.REDUCE; -import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.SETUP; -import static org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState.COMPLETED; -import static org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState.CRASHED; -import static org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState.FAILED; -import static org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState.RUNNING; - -/** - * Hadoop job tracker. - */ -public class HadoopJobTracker extends HadoopComponent { - /** */ - private final GridMutex mux = new GridMutex(); - - /** */ - private volatile IgniteInternalCache<HadoopJobId, HadoopJobMetadata> jobMetaPrj; - - /** Projection with expiry policy for finished job updates. */ - private volatile IgniteInternalCache<HadoopJobId, HadoopJobMetadata> finishedJobMetaPrj; - - /** Map-reduce execution planner. */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private HadoopMapReducePlanner mrPlanner; - - /** All the known jobs. */ - private final ConcurrentMap<HadoopJobId, GridFutureAdapter<HadoopJob>> jobs = new ConcurrentHashMap8<>(); - - /** Locally active jobs. */ - private final ConcurrentMap<HadoopJobId, JobLocalState> activeJobs = new ConcurrentHashMap8<>(); - - /** Locally requested finish futures. */ - private final ConcurrentMap<HadoopJobId, GridFutureAdapter<HadoopJobId>> activeFinishFuts = - new ConcurrentHashMap8<>(); - - /** Event processing service. */ - private ExecutorService evtProcSvc; - - /** Component busy lock. */ - private GridSpinReadWriteLock busyLock; - - /** Class to create HadoopJob instances from. */ - private Class<? extends HadoopJob> jobCls; - - /** Closure to check result of async transform of system cache. */ - private final IgniteInClosure<IgniteInternalFuture<?>> failsLog = new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> gridFut) { - try { - gridFut.get(); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to transform system cache.", e); - } - } - }; - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void start(final HadoopContext ctx) throws IgniteCheckedException { - super.start(ctx); - - busyLock = new GridSpinReadWriteLock(); - - evtProcSvc = Executors.newFixedThreadPool(1); - - UUID nodeId = ctx.localNodeId(); - - assert jobCls == null; - - String[] libNames = null; - - if (ctx.configuration() != null) - libNames = ctx.configuration().getNativeLibraryNames(); - - HadoopClassLoader ldr = new HadoopClassLoader(null, HadoopClassLoader.nameForJob(nodeId), libNames, - ctx.kernalContext().hadoopHelper()); - - try { - jobCls = (Class<HadoopV2Job>)ldr.loadClass(HadoopV2Job.class.getName()); - } - catch (Exception ioe) { - throw new IgniteCheckedException("Failed to load job class [class=" - + HadoopV2Job.class.getName() + ']', ioe); - } - } - - /** - * @return Job meta projection. - */ - @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext") - private IgniteInternalCache<HadoopJobId, HadoopJobMetadata> jobMetaCache() { - IgniteInternalCache<HadoopJobId, HadoopJobMetadata> prj = jobMetaPrj; - - if (prj == null) { - synchronized (mux) { - if ((prj = jobMetaPrj) == null) { - GridCacheAdapter<HadoopJobId, HadoopJobMetadata> sysCache = ctx.kernalContext().cache() - .internalCache(CU.SYS_CACHE_HADOOP_MR); - - assert sysCache != null; - - mrPlanner = ctx.planner(); - - try { - ctx.kernalContext().resource().injectGeneric(mrPlanner); - } - catch (IgniteCheckedException e) { // Must not happen. - U.error(log, "Failed to inject resources.", e); - - throw new IllegalStateException(e); - } - - jobMetaPrj = prj = sysCache; - - if (ctx.configuration().getFinishedJobInfoTtl() > 0) { - ExpiryPolicy finishedJobPlc = new ModifiedExpiryPolicy( - new Duration(MILLISECONDS, ctx.configuration().getFinishedJobInfoTtl())); - - finishedJobMetaPrj = prj.withExpiryPolicy(finishedJobPlc); - } - else - finishedJobMetaPrj = jobMetaPrj; - } - } - } - - return prj; - } - - /** - * @return Projection with expiry policy for finished job updates. - */ - private IgniteInternalCache<HadoopJobId, HadoopJobMetadata> finishedJobMetaCache() { - IgniteInternalCache<HadoopJobId, HadoopJobMetadata> prj = finishedJobMetaPrj; - - if (prj == null) { - jobMetaCache(); - - prj = finishedJobMetaPrj; - - assert prj != null; - } - - return prj; - } - - /** {@inheritDoc} */ - @SuppressWarnings("deprecation") - @Override public void onKernalStart() throws IgniteCheckedException { - super.onKernalStart(); - - jobMetaCache().context().continuousQueries().executeInternalQuery( - new CacheEntryUpdatedListener<HadoopJobId, HadoopJobMetadata>() { - @Override public void onUpdated(final Iterable<CacheEntryEvent<? extends HadoopJobId, - ? extends HadoopJobMetadata>> evts) { - if (!busyLock.tryReadLock()) - return; - - try { - // Must process query callback in a separate thread to avoid deadlocks. - evtProcSvc.submit(new EventHandler() { - @Override protected void body() throws IgniteCheckedException { - processJobMetadataUpdates(evts); - } - }); - } - finally { - busyLock.readUnlock(); - } - } - }, - null, - true, - true, - false - ); - - ctx.kernalContext().event().addLocalEventListener(new GridLocalEventListener() { - @Override public void onEvent(final Event evt) { - if (!busyLock.tryReadLock()) - return; - - try { - // Must process discovery callback in a separate thread to avoid deadlock. - evtProcSvc.submit(new EventHandler() { - @Override protected void body() { - processNodeLeft((DiscoveryEvent)evt); - } - }); - } - finally { - busyLock.readUnlock(); - } - } - }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT); - } - - /** {@inheritDoc} */ - @Override public void onKernalStop(boolean cancel) { - super.onKernalStop(cancel); - - busyLock.writeLock(); - - evtProcSvc.shutdown(); - - // Fail all pending futures. - for (GridFutureAdapter<HadoopJobId> fut : activeFinishFuts.values()) - fut.onDone(new IgniteCheckedException("Failed to execute Hadoop map-reduce job (grid is stopping).")); - } - - /** - * Submits execution of Hadoop job to grid. - * - * @param jobId Job ID. - * @param info Job info. - * @return Job completion future. - */ - @SuppressWarnings("unchecked") - public IgniteInternalFuture<HadoopJobId> submit(HadoopJobId jobId, HadoopJobInfo info) { - if (!busyLock.tryReadLock()) { - return new GridFinishedFuture<>(new IgniteCheckedException("Failed to execute map-reduce job " + - "(grid is stopping): " + info)); - } - - try { - long jobPrepare = U.currentTimeMillis(); - - if (jobs.containsKey(jobId) || jobMetaCache().containsKey(jobId)) - throw new IgniteCheckedException("Failed to submit job. Job with the same ID already exists: " + jobId); - - HadoopJob job = job(jobId, info); - - HadoopMapReducePlan mrPlan = mrPlanner.preparePlan(job, ctx.nodes(), null); - - HadoopJobMetadata meta = new HadoopJobMetadata(ctx.localNodeId(), jobId, info); - - meta.mapReducePlan(mrPlan); - - meta.pendingSplits(allSplits(mrPlan)); - meta.pendingReducers(allReducers(mrPlan)); - - GridFutureAdapter<HadoopJobId> completeFut = new GridFutureAdapter<>(); - - GridFutureAdapter<HadoopJobId> old = activeFinishFuts.put(jobId, completeFut); - - assert old == null : "Duplicate completion future [jobId=" + jobId + ", old=" + old + ']'; - - if (log.isDebugEnabled()) - log.debug("Submitting job metadata [jobId=" + jobId + ", meta=" + meta + ']'); - - long jobStart = U.currentTimeMillis(); - - HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(meta.counters(), - ctx.localNodeId()); - - perfCntr.clientSubmissionEvents(info); - perfCntr.onJobPrepare(jobPrepare); - perfCntr.onJobStart(jobStart); - - if (jobMetaCache().getAndPutIfAbsent(jobId, meta) != null) - throw new IgniteCheckedException("Failed to submit job. Job with the same ID already exists: " + jobId); - - return completeFut; - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to submit job: " + jobId, e); - - return new GridFinishedFuture<>(e); - } - finally { - busyLock.readUnlock(); - } - } - - /** - * Convert Hadoop job metadata to job status. - * - * @param meta Metadata. - * @return Status. - */ - @SuppressWarnings("ThrowableResultOfMethodCallIgnored") - public static HadoopJobStatus status(HadoopJobMetadata meta) { - HadoopJobInfo jobInfo = meta.jobInfo(); - - return new HadoopJobStatus( - meta.jobId(), - jobInfo.jobName(), - jobInfo.user(), - meta.pendingSplits() != null ? meta.pendingSplits().size() : 0, - meta.pendingReducers() != null ? meta.pendingReducers().size() : 0, - meta.mapReducePlan().mappers(), - meta.mapReducePlan().reducers(), - meta.phase(), - meta.failCause() != null, - meta.version() - ); - } - - /** - * Gets hadoop job status for given job ID. - * - * @param jobId Job ID to get status for. - * @return Job status for given job ID or {@code null} if job was not found. - */ - @Nullable public HadoopJobStatus status(HadoopJobId jobId) throws IgniteCheckedException { - if (!busyLock.tryReadLock()) - return null; // Grid is stopping. - - try { - HadoopJobMetadata meta = jobMetaCache().get(jobId); - - return meta != null ? status(meta) : null; - } - finally { - busyLock.readUnlock(); - } - } - - /** - * Gets job finish future. - * - * @param jobId Job ID. - * @return Finish future or {@code null}. - * @throws IgniteCheckedException If failed. - */ - @Nullable public IgniteInternalFuture<?> finishFuture(HadoopJobId jobId) throws IgniteCheckedException { - if (!busyLock.tryReadLock()) - return null; // Grid is stopping. - - try { - HadoopJobMetadata meta = jobMetaCache().get(jobId); - - if (meta == null) - return null; - - if (log.isTraceEnabled()) - log.trace("Got job metadata for status check [locNodeId=" + ctx.localNodeId() + ", meta=" + meta + ']'); - - if (meta.phase() == PHASE_COMPLETE) { - if (log.isTraceEnabled()) - log.trace("Job is complete, returning finished future: " + jobId); - - return new GridFinishedFuture<>(jobId); - } - - GridFutureAdapter<HadoopJobId> fut = F.addIfAbsent(activeFinishFuts, jobId, - new GridFutureAdapter<HadoopJobId>()); - - // Get meta from cache one more time to close the window. - meta = jobMetaCache().get(jobId); - - if (log.isTraceEnabled()) - log.trace("Re-checking job metadata [locNodeId=" + ctx.localNodeId() + ", meta=" + meta + ']'); - - if (meta == null) { - fut.onDone(); - - activeFinishFuts.remove(jobId , fut); - } - else if (meta.phase() == PHASE_COMPLETE) { - fut.onDone(jobId, meta.failCause()); - - activeFinishFuts.remove(jobId , fut); - } - - return fut; - } - finally { - busyLock.readUnlock(); - } - } - - /** - * Gets job plan by job ID. - * - * @param jobId Job ID. - * @return Job plan. - * @throws IgniteCheckedException If failed. - */ - public HadoopMapReducePlan plan(HadoopJobId jobId) throws IgniteCheckedException { - if (!busyLock.tryReadLock()) - return null; - - try { - HadoopJobMetadata meta = jobMetaCache().get(jobId); - - if (meta != null) - return meta.mapReducePlan(); - - return null; - } - finally { - busyLock.readUnlock(); - } - } - - /** - * Callback from task executor invoked when a task has been finished. - * - * @param info Task info. - * @param status Task status. - */ - @SuppressWarnings({"ConstantConditions", "ThrowableResultOfMethodCallIgnored"}) - public void onTaskFinished(HadoopTaskInfo info, HadoopTaskStatus status) { - if (!busyLock.tryReadLock()) - return; - - try { - assert status.state() != RUNNING; - - if (log.isDebugEnabled()) - log.debug("Received task finished callback [info=" + info + ", status=" + status + ']'); - - JobLocalState state = activeJobs.get(info.jobId()); - - // Task CRASHes with null fail cause. - assert (status.state() != FAILED) || status.failCause() != null : - "Invalid task status [info=" + info + ", status=" + status + ']'; - - assert state != null || (ctx.jobUpdateLeader() && (info.type() == COMMIT || info.type() == ABORT)): - "Missing local state for finished task [info=" + info + ", status=" + status + ']'; - - StackedProcessor incrCntrs = null; - - if (status.state() == COMPLETED) - incrCntrs = new IncrementCountersProcessor(null, status.counters()); - - switch (info.type()) { - case SETUP: { - state.onSetupFinished(info, status, incrCntrs); - - break; - } - - case MAP: { - state.onMapFinished(info, status, incrCntrs); - - break; - } - - case REDUCE: { - state.onReduceFinished(info, status, incrCntrs); - - break; - } - - case COMBINE: { - state.onCombineFinished(info, status, incrCntrs); - - break; - } - - case COMMIT: - case ABORT: { - IgniteInternalCache<HadoopJobId, HadoopJobMetadata> cache = finishedJobMetaCache(); - - cache.invokeAsync(info.jobId(), new UpdatePhaseProcessor(incrCntrs, PHASE_COMPLETE)). - listen(failsLog); - - break; - } - } - } - finally { - busyLock.readUnlock(); - } - } - - /** - * @param jobId Job id. - * @param c Closure of operation. - */ - private void transform(HadoopJobId jobId, EntryProcessor<HadoopJobId, HadoopJobMetadata, Void> c) { - jobMetaCache().invokeAsync(jobId, c).listen(failsLog); - } - - /** - * Callback from task executor called when process is ready to received shuffle messages. - * - * @param jobId Job ID. - * @param reducers Reducers. - * @param desc Process descriptor. - */ - public void onExternalMappersInitialized(HadoopJobId jobId, Collection<Integer> reducers, - HadoopProcessDescriptor desc) { - transform(jobId, new InitializeReducersProcessor(null, reducers, desc)); - } - - /** - * Gets all input splits for given hadoop map-reduce plan. - * - * @param plan Map-reduce plan. - * @return Collection of all input splits that should be processed. - */ - @SuppressWarnings("ConstantConditions") - private Map<HadoopInputSplit, Integer> allSplits(HadoopMapReducePlan plan) { - Map<HadoopInputSplit, Integer> res = new HashMap<>(); - - int taskNum = 0; - - for (UUID nodeId : plan.mapperNodeIds()) { - for (HadoopInputSplit split : plan.mappers(nodeId)) { - if (res.put(split, taskNum++) != null) - throw new IllegalStateException("Split duplicate."); - } - } - - return res; - } - - /** - * Gets all reducers for this job. - * - * @param plan Map-reduce plan. - * @return Collection of reducers. - */ - private Collection<Integer> allReducers(HadoopMapReducePlan plan) { - Collection<Integer> res = new HashSet<>(); - - for (int i = 0; i < plan.reducers(); i++) - res.add(i); - - return res; - } - - /** - * Processes node leave (or fail) event. - * - * @param evt Discovery event. - */ - @SuppressWarnings("ConstantConditions") - private void processNodeLeft(DiscoveryEvent evt) { - if (log.isDebugEnabled()) - log.debug("Processing discovery event [locNodeId=" + ctx.localNodeId() + ", evt=" + evt + ']'); - - // Check only if this node is responsible for job status updates. - if (ctx.jobUpdateLeader()) { - boolean checkSetup = evt.eventNode().order() < ctx.localNodeOrder(); - - // Iteration over all local entries is correct since system cache is REPLICATED. - for (Object metaObj : jobMetaCache().values()) { - HadoopJobMetadata meta = (HadoopJobMetadata)metaObj; - - HadoopJobId jobId = meta.jobId(); - - HadoopMapReducePlan plan = meta.mapReducePlan(); - - HadoopJobPhase phase = meta.phase(); - - try { - if (checkSetup && phase == PHASE_SETUP && !activeJobs.containsKey(jobId)) { - // Failover setup task. - HadoopJob job = job(jobId, meta.jobInfo()); - - Collection<HadoopTaskInfo> setupTask = setupTask(jobId); - - assert setupTask != null; - - ctx.taskExecutor().run(job, setupTask); - } - else if (phase == PHASE_MAP || phase == PHASE_REDUCE) { - // Must check all nodes, even that are not event node ID due to - // multiple node failure possibility. - Collection<HadoopInputSplit> cancelSplits = null; - - for (UUID nodeId : plan.mapperNodeIds()) { - if (ctx.kernalContext().discovery().node(nodeId) == null) { - // Node has left the grid. - Collection<HadoopInputSplit> mappers = plan.mappers(nodeId); - - if (cancelSplits == null) - cancelSplits = new HashSet<>(); - - cancelSplits.addAll(mappers); - } - } - - Collection<Integer> cancelReducers = null; - - for (UUID nodeId : plan.reducerNodeIds()) { - if (ctx.kernalContext().discovery().node(nodeId) == null) { - // Node has left the grid. - int[] reducers = plan.reducers(nodeId); - - if (cancelReducers == null) - cancelReducers = new HashSet<>(); - - for (int rdc : reducers) - cancelReducers.add(rdc); - } - } - - if (cancelSplits != null || cancelReducers != null) - jobMetaCache().invoke(meta.jobId(), new CancelJobProcessor(null, new IgniteCheckedException( - "One or more nodes participating in map-reduce job execution failed."), cancelSplits, - cancelReducers)); - } - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to cancel job: " + meta, e); - } - } - } - } - - /** - * @param updated Updated cache entries. - * @throws IgniteCheckedException If failed. - */ - private void processJobMetadataUpdates( - Iterable<CacheEntryEvent<? extends HadoopJobId, ? extends HadoopJobMetadata>> updated) - throws IgniteCheckedException { - UUID locNodeId = ctx.localNodeId(); - - for (CacheEntryEvent<? extends HadoopJobId, ? extends HadoopJobMetadata> entry : updated) { - HadoopJobId jobId = entry.getKey(); - HadoopJobMetadata meta = entry.getValue(); - - if (meta == null || !ctx.isParticipating(meta)) - continue; - - if (log.isDebugEnabled()) - log.debug("Processing job metadata update callback [locNodeId=" + locNodeId + - ", meta=" + meta + ']'); - - try { - ctx.taskExecutor().onJobStateChanged(meta); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to process job state changed callback (will fail the job) " + - "[locNodeId=" + locNodeId + ", jobId=" + jobId + ", meta=" + meta + ']', e); - - transform(jobId, new CancelJobProcessor(null, e)); - - continue; - } - - processJobMetaUpdate(jobId, meta, locNodeId); - } - } - - /** - * @param jobId Job ID. - * @param plan Map-reduce plan. - */ - @SuppressWarnings({"unused", "ConstantConditions" }) - private void printPlan(HadoopJobId jobId, HadoopMapReducePlan plan) { - log.info("Plan for " + jobId); - - SB b = new SB(); - - b.a(" Map: "); - - for (UUID nodeId : plan.mapperNodeIds()) - b.a(nodeId).a("=").a(plan.mappers(nodeId).size()).a(' '); - - log.info(b.toString()); - - b = new SB(); - - b.a(" Reduce: "); - - for (UUID nodeId : plan.reducerNodeIds()) - b.a(nodeId).a("=").a(Arrays.toString(plan.reducers(nodeId))).a(' '); - - log.info(b.toString()); - } - - /** - * @param jobId Job ID. - * @param meta Job metadata. - * @param locNodeId Local node ID. - * @throws IgniteCheckedException If failed. - */ - private void processJobMetaUpdate(HadoopJobId jobId, HadoopJobMetadata meta, UUID locNodeId) - throws IgniteCheckedException { - JobLocalState state = activeJobs.get(jobId); - - HadoopJob job = job(jobId, meta.jobInfo()); - - HadoopMapReducePlan plan = meta.mapReducePlan(); - - switch (meta.phase()) { - case PHASE_SETUP: { - if (ctx.jobUpdateLeader()) { - Collection<HadoopTaskInfo> setupTask = setupTask(jobId); - - if (setupTask != null) - ctx.taskExecutor().run(job, setupTask); - } - - break; - } - - case PHASE_MAP: { - // Check if we should initiate new task on local node. - Collection<HadoopTaskInfo> tasks = mapperTasks(plan.mappers(locNodeId), meta); - - if (tasks != null) - ctx.taskExecutor().run(job, tasks); - - break; - } - - case PHASE_REDUCE: { - if (meta.pendingReducers().isEmpty() && ctx.jobUpdateLeader()) { - HadoopTaskInfo info = new HadoopTaskInfo(COMMIT, jobId, 0, 0, null); - - if (log.isDebugEnabled()) - log.debug("Submitting COMMIT task for execution [locNodeId=" + locNodeId + - ", jobId=" + jobId + ']'); - - ctx.taskExecutor().run(job, Collections.singletonList(info)); - - break; - } - - Collection<HadoopTaskInfo> tasks = reducerTasks(plan.reducers(locNodeId), job); - - if (tasks != null) - ctx.taskExecutor().run(job, tasks); - - break; - } - - case PHASE_CANCELLING: { - // Prevent multiple task executor notification. - if (state != null && state.onCancel()) { - if (log.isDebugEnabled()) - log.debug("Cancelling local task execution for job: " + meta); - - ctx.taskExecutor().cancelTasks(jobId); - } - - if (meta.pendingSplits().isEmpty() && meta.pendingReducers().isEmpty()) { - if (ctx.jobUpdateLeader()) { - if (state == null) - state = initState(jobId); - - // Prevent running multiple abort tasks. - if (state.onAborted()) { - HadoopTaskInfo info = new HadoopTaskInfo(ABORT, jobId, 0, 0, null); - - if (log.isDebugEnabled()) - log.debug("Submitting ABORT task for execution [locNodeId=" + locNodeId + - ", jobId=" + jobId + ']'); - - ctx.taskExecutor().run(job, Collections.singletonList(info)); - } - } - - break; - } - else { - // Check if there are unscheduled mappers or reducers. - Collection<HadoopInputSplit> cancelMappers = new ArrayList<>(); - Collection<Integer> cancelReducers = new ArrayList<>(); - - Collection<HadoopInputSplit> mappers = plan.mappers(ctx.localNodeId()); - - if (mappers != null) { - for (HadoopInputSplit b : mappers) { - if (state == null || !state.mapperScheduled(b)) - cancelMappers.add(b); - } - } - - int[] rdc = plan.reducers(ctx.localNodeId()); - - if (rdc != null) { - for (int r : rdc) { - if (state == null || !state.reducerScheduled(r)) - cancelReducers.add(r); - } - } - - if (!cancelMappers.isEmpty() || !cancelReducers.isEmpty()) - transform(jobId, new CancelJobProcessor(null, cancelMappers, cancelReducers)); - } - - break; - } - - case PHASE_COMPLETE: { - if (log.isDebugEnabled()) - log.debug("Job execution is complete, will remove local state from active jobs " + - "[jobId=" + jobId + ", meta=" + meta + ']'); - - if (state != null) { - state = activeJobs.remove(jobId); - - assert state != null; - - ctx.shuffle().jobFinished(jobId); - } - - GridFutureAdapter<HadoopJobId> finishFut = activeFinishFuts.remove(jobId); - - if (finishFut != null) { - if (log.isDebugEnabled()) - log.debug("Completing job future [locNodeId=" + locNodeId + ", meta=" + meta + ']'); - - finishFut.onDone(jobId, meta.failCause()); - } - - assert job != null; - - if (ctx.jobUpdateLeader()) - job.cleanupStagingDirectory(); - - jobs.remove(jobId); - - if (ctx.jobUpdateLeader()) { - ClassLoader ldr = job.getClass().getClassLoader(); - - try { - String statWriterClsName = job.info().property(HadoopUtils.JOB_COUNTER_WRITER_PROPERTY); - - if (statWriterClsName != null) { - Class<?> cls = ldr.loadClass(statWriterClsName); - - HadoopCounterWriter writer = (HadoopCounterWriter)cls.newInstance(); - - HadoopCounters cntrs = meta.counters(); - - writer.write(job, cntrs); - } - } - catch (Exception e) { - log.error("Can't write statistic due to: ", e); - } - } - - job.dispose(false); - - break; - } - - default: - throw new IllegalStateException("Unknown phase: " + meta.phase()); - } - } - - /** - * Creates setup task based on job information. - * - * @param jobId Job ID. - * @return Setup task wrapped in collection. - */ - @Nullable private Collection<HadoopTaskInfo> setupTask(HadoopJobId jobId) { - if (activeJobs.containsKey(jobId)) - return null; - else { - initState(jobId); - - return Collections.singleton(new HadoopTaskInfo(SETUP, jobId, 0, 0, null)); - } - } - - /** - * Creates mapper tasks based on job information. - * - * @param mappers Mapper blocks. - * @param meta Job metadata. - * @return Collection of created task infos or {@code null} if no mapper tasks scheduled for local node. - */ - private Collection<HadoopTaskInfo> mapperTasks(Iterable<HadoopInputSplit> mappers, HadoopJobMetadata meta) { - UUID locNodeId = ctx.localNodeId(); - HadoopJobId jobId = meta.jobId(); - - JobLocalState state = activeJobs.get(jobId); - - Collection<HadoopTaskInfo> tasks = null; - - if (mappers != null) { - if (state == null) - state = initState(jobId); - - for (HadoopInputSplit split : mappers) { - if (state.addMapper(split)) { - if (log.isDebugEnabled()) - log.debug("Submitting MAP task for execution [locNodeId=" + locNodeId + - ", split=" + split + ']'); - - HadoopTaskInfo taskInfo = new HadoopTaskInfo(MAP, jobId, meta.taskNumber(split), 0, split); - - if (tasks == null) - tasks = new ArrayList<>(); - - tasks.add(taskInfo); - } - } - } - - return tasks; - } - - /** - * Creates reducer tasks based on job information. - * - * @param reducers Reducers (may be {@code null}). - * @param job Job instance. - * @return Collection of task infos. - */ - private Collection<HadoopTaskInfo> reducerTasks(int[] reducers, HadoopJob job) { - UUID locNodeId = ctx.localNodeId(); - HadoopJobId jobId = job.id(); - - JobLocalState state = activeJobs.get(jobId); - - Collection<HadoopTaskInfo> tasks = null; - - if (reducers != null) { - if (state == null) - state = initState(job.id()); - - for (int rdc : reducers) { - if (state.addReducer(rdc)) { - if (log.isDebugEnabled()) - log.debug("Submitting REDUCE task for execution [locNodeId=" + locNodeId + - ", rdc=" + rdc + ']'); - - HadoopTaskInfo taskInfo = new HadoopTaskInfo(REDUCE, jobId, rdc, 0, null); - - if (tasks == null) - tasks = new ArrayList<>(); - - tasks.add(taskInfo); - } - } - } - - return tasks; - } - - /** - * Initializes local state for given job metadata. - * - * @param jobId Job ID. - * @return Local state. - */ - private JobLocalState initState(HadoopJobId jobId) { - return F.addIfAbsent(activeJobs, jobId, new JobLocalState()); - } - - /** - * Gets or creates job instance. - * - * @param jobId Job ID. - * @param jobInfo Job info. - * @return Job. - * @throws IgniteCheckedException If failed. - */ - @Nullable public HadoopJob job(HadoopJobId jobId, @Nullable HadoopJobInfo jobInfo) throws IgniteCheckedException { - GridFutureAdapter<HadoopJob> fut = jobs.get(jobId); - - if (fut != null || (fut = jobs.putIfAbsent(jobId, new GridFutureAdapter<HadoopJob>())) != null) - return fut.get(); - - fut = jobs.get(jobId); - - HadoopJob job = null; - - try { - if (jobInfo == null) { - HadoopJobMetadata meta = jobMetaCache().get(jobId); - - if (meta == null) - throw new IgniteCheckedException("Failed to find job metadata for ID: " + jobId); - - jobInfo = meta.jobInfo(); - } - - job = jobInfo.createJob(jobCls, jobId, log, ctx.configuration().getNativeLibraryNames(), - ctx.kernalContext().hadoopHelper()); - - job.initialize(false, ctx.localNodeId()); - - fut.onDone(job); - - return job; - } - catch (IgniteCheckedException e) { - fut.onDone(e); - - jobs.remove(jobId, fut); - - if (job != null) { - try { - job.dispose(false); - } - catch (IgniteCheckedException e0) { - U.error(log, "Failed to dispose job: " + jobId, e0); - } - } - - throw e; - } - } - - /** - * Kills job. - * - * @param jobId Job ID. - * @return {@code True} if job was killed. - * @throws IgniteCheckedException If failed. - */ - public boolean killJob(HadoopJobId jobId) throws IgniteCheckedException { - if (!busyLock.tryReadLock()) - return false; // Grid is stopping. - - try { - HadoopJobMetadata meta = jobMetaCache().get(jobId); - - if (meta != null && meta.phase() != PHASE_COMPLETE && meta.phase() != PHASE_CANCELLING) { - HadoopTaskCancelledException err = new HadoopTaskCancelledException("Job cancelled."); - - jobMetaCache().invoke(jobId, new CancelJobProcessor(null, err)); - } - } - finally { - busyLock.readUnlock(); - } - - IgniteInternalFuture<?> fut = finishFuture(jobId); - - if (fut != null) { - try { - fut.get(); - } - catch (Exception e) { - if (e.getCause() instanceof HadoopTaskCancelledException) - return true; - } - } - - return false; - } - - /** - * Returns job counters. - * - * @param jobId Job identifier. - * @return Job counters or {@code null} if job cannot be found. - * @throws IgniteCheckedException If failed. - */ - @Nullable public HadoopCounters jobCounters(HadoopJobId jobId) throws IgniteCheckedException { - if (!busyLock.tryReadLock()) - return null; - - try { - final HadoopJobMetadata meta = jobMetaCache().get(jobId); - - return meta != null ? meta.counters() : null; - } - finally { - busyLock.readUnlock(); - } - } - - /** - * Event handler protected by busy lock. - */ - private abstract class EventHandler implements Runnable { - /** {@inheritDoc} */ - @Override public void run() { - if (!busyLock.tryReadLock()) - return; - - try { - body(); - } - catch (Throwable e) { - U.error(log, "Unhandled exception while processing event.", e); - - if (e instanceof Error) - throw (Error)e; - } - finally { - busyLock.readUnlock(); - } - } - - /** - * Handler body. - */ - protected abstract void body() throws Exception; - } - - /** - * - */ - private class JobLocalState { - /** Mappers. */ - private final Collection<HadoopInputSplit> currMappers = new HashSet<>(); - - /** Reducers. */ - private final Collection<Integer> currReducers = new HashSet<>(); - - /** Number of completed mappers. */ - private final AtomicInteger completedMappersCnt = new AtomicInteger(); - - /** Cancelled flag. */ - private boolean cancelled; - - /** Aborted flag. */ - private boolean aborted; - - /** - * @param mapSplit Map split to add. - * @return {@code True} if mapper was added. - */ - private boolean addMapper(HadoopInputSplit mapSplit) { - return currMappers.add(mapSplit); - } - - /** - * @param rdc Reducer number to add. - * @return {@code True} if reducer was added. - */ - private boolean addReducer(int rdc) { - return currReducers.add(rdc); - } - - /** - * Checks whether this split was scheduled for given attempt. - * - * @param mapSplit Map split to check. - * @return {@code True} if mapper was scheduled. - */ - public boolean mapperScheduled(HadoopInputSplit mapSplit) { - return currMappers.contains(mapSplit); - } - - /** - * Checks whether this split was scheduled for given attempt. - * - * @param rdc Reducer number to check. - * @return {@code True} if reducer was scheduled. - */ - public boolean reducerScheduled(int rdc) { - return currReducers.contains(rdc); - } - - /** - * @param taskInfo Task info. - * @param status Task status. - * @param prev Previous closure. - */ - private void onSetupFinished(final HadoopTaskInfo taskInfo, HadoopTaskStatus status, StackedProcessor prev) { - final HadoopJobId jobId = taskInfo.jobId(); - - if (status.state() == FAILED || status.state() == CRASHED) - transform(jobId, new CancelJobProcessor(prev, status.failCause())); - else - transform(jobId, new UpdatePhaseProcessor(prev, PHASE_MAP)); - } - - /** - * @param taskInfo Task info. - * @param status Task status. - * @param prev Previous closure. - */ - private void onMapFinished(final HadoopTaskInfo taskInfo, HadoopTaskStatus status, - final StackedProcessor prev) { - final HadoopJobId jobId = taskInfo.jobId(); - - boolean lastMapperFinished = completedMappersCnt.incrementAndGet() == currMappers.size(); - - if (status.state() == FAILED || status.state() == CRASHED) { - // Fail the whole job. - transform(jobId, new RemoveMappersProcessor(prev, taskInfo.inputSplit(), status.failCause())); - - return; - } - - IgniteInClosure<IgniteInternalFuture<?>> cacheUpdater = new CIX1<IgniteInternalFuture<?>>() { - @Override public void applyx(IgniteInternalFuture<?> f) { - Throwable err = null; - - if (f != null) { - try { - f.get(); - } - catch (IgniteCheckedException e) { - err = e; - } - } - - transform(jobId, new RemoveMappersProcessor(prev, taskInfo.inputSplit(), err)); - } - }; - - if (lastMapperFinished) - ctx.shuffle().flush(jobId).listen(cacheUpdater); - else - cacheUpdater.apply(null); - } - - /** - * @param taskInfo Task info. - * @param status Task status. - * @param prev Previous closure. - */ - private void onReduceFinished(HadoopTaskInfo taskInfo, HadoopTaskStatus status, StackedProcessor prev) { - HadoopJobId jobId = taskInfo.jobId(); - if (status.state() == FAILED || status.state() == CRASHED) - // Fail the whole job. - transform(jobId, new RemoveReducerProcessor(prev, taskInfo.taskNumber(), status.failCause())); - else - transform(jobId, new RemoveReducerProcessor(prev, taskInfo.taskNumber())); - } - - /** - * @param taskInfo Task info. - * @param status Task status. - * @param prev Previous closure. - */ - private void onCombineFinished(HadoopTaskInfo taskInfo, HadoopTaskStatus status, - final StackedProcessor prev) { - final HadoopJobId jobId = taskInfo.jobId(); - - if (status.state() == FAILED || status.state() == CRASHED) - // Fail the whole job. - transform(jobId, new RemoveMappersProcessor(prev, currMappers, status.failCause())); - else { - ctx.shuffle().flush(jobId).listen(new CIX1<IgniteInternalFuture<?>>() { - @Override public void applyx(IgniteInternalFuture<?> f) { - Throwable err = null; - - if (f != null) { - try { - f.get(); - } - catch (IgniteCheckedException e) { - err = e; - } - } - - transform(jobId, new RemoveMappersProcessor(prev, currMappers, err)); - } - }); - } - } - - /** - * @return {@code True} if job was cancelled by this (first) call. - */ - public boolean onCancel() { - if (!cancelled && !aborted) { - cancelled = true; - - return true; - } - - return false; - } - - /** - * @return {@code True} if job was aborted this (first) call. - */ - public boolean onAborted() { - if (!aborted) { - aborted = true; - - return true; - } - - return false; - } - } - - /** - * Update job phase transform closure. - */ - private static class UpdatePhaseProcessor extends StackedProcessor { - /** */ - private static final long serialVersionUID = 0L; - - /** Phase to update. */ - private final HadoopJobPhase phase; - - /** - * @param prev Previous closure. - * @param phase Phase to update. - */ - private UpdatePhaseProcessor(@Nullable StackedProcessor prev, HadoopJobPhase phase) { - super(prev); - - this.phase = phase; - } - - /** {@inheritDoc} */ - @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) { - cp.phase(phase); - } - } - - /** - * Remove mapper transform closure. - */ - private static class RemoveMappersProcessor extends StackedProcessor { - /** */ - private static final long serialVersionUID = 0L; - - /** Mapper split to remove. */ - private final Collection<HadoopInputSplit> splits; - - /** Error. */ - private final Throwable err; - - /** - * @param prev Previous closure. - * @param split Mapper split to remove. - * @param err Error. - */ - private RemoveMappersProcessor(@Nullable StackedProcessor prev, HadoopInputSplit split, Throwable err) { - this(prev, Collections.singletonList(split), err); - } - - /** - * @param prev Previous closure. - * @param splits Mapper splits to remove. - * @param err Error. - */ - private RemoveMappersProcessor(@Nullable StackedProcessor prev, Collection<HadoopInputSplit> splits, - Throwable err) { - super(prev); - - this.splits = splits; - this.err = err; - } - - /** {@inheritDoc} */ - @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) { - Map<HadoopInputSplit, Integer> splitsCp = new HashMap<>(cp.pendingSplits()); - - for (HadoopInputSplit s : splits) - splitsCp.remove(s); - - cp.pendingSplits(splitsCp); - - if (cp.phase() != PHASE_CANCELLING && err != null) - cp.failCause(err); - - if (err != null) - cp.phase(PHASE_CANCELLING); - - if (splitsCp.isEmpty()) { - if (cp.phase() != PHASE_CANCELLING) - cp.phase(PHASE_REDUCE); - } - } - } - - /** - * Remove reducer transform closure. - */ - private static class RemoveReducerProcessor extends StackedProcessor { - /** */ - private static final long serialVersionUID = 0L; - - /** Mapper split to remove. */ - private final int rdc; - - /** Error. */ - private Throwable err; - - /** - * @param prev Previous closure. - * @param rdc Reducer to remove. - */ - private RemoveReducerProcessor(@Nullable StackedProcessor prev, int rdc) { - super(prev); - - this.rdc = rdc; - } - - /** - * @param prev Previous closure. - * @param rdc Reducer to remove. - * @param err Error. - */ - private RemoveReducerProcessor(@Nullable StackedProcessor prev, int rdc, Throwable err) { - super(prev); - - this.rdc = rdc; - this.err = err; - } - - /** {@inheritDoc} */ - @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) { - Collection<Integer> rdcCp = new HashSet<>(cp.pendingReducers()); - - rdcCp.remove(rdc); - - cp.pendingReducers(rdcCp); - - if (err != null) { - cp.phase(PHASE_CANCELLING); - cp.failCause(err); - } - } - } - - /** - * Initialize reducers. - */ - private static class InitializeReducersProcessor extends StackedProcessor { - /** */ - private static final long serialVersionUID = 0L; - - /** Reducers. */ - private final Collection<Integer> rdc; - - /** Process descriptor for reducers. */ - private final HadoopProcessDescriptor desc; - - /** - * @param prev Previous closure. - * @param rdc Reducers to initialize. - * @param desc External process descriptor. - */ - private InitializeReducersProcessor(@Nullable StackedProcessor prev, - Collection<Integer> rdc, - HadoopProcessDescriptor desc) { - super(prev); - - assert !F.isEmpty(rdc); - assert desc != null; - - this.rdc = rdc; - this.desc = desc; - } - - /** {@inheritDoc} */ - @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) { - Map<Integer, HadoopProcessDescriptor> oldMap = meta.reducersAddresses(); - - Map<Integer, HadoopProcessDescriptor> rdcMap = oldMap == null ? - new HashMap<Integer, HadoopProcessDescriptor>() : new HashMap<>(oldMap); - - for (Integer r : rdc) - rdcMap.put(r, desc); - - cp.reducersAddresses(rdcMap); - } - } - - /** - * Remove reducer transform closure. - */ - private static class CancelJobProcessor extends StackedProcessor { - /** */ - private static final long serialVersionUID = 0L; - - /** Mapper split to remove. */ - private final Collection<HadoopInputSplit> splits; - - /** Reducers to remove. */ - private final Collection<Integer> rdc; - - /** Error. */ - private final Throwable err; - - /** - * @param prev Previous closure. - * @param err Fail cause. - */ - private CancelJobProcessor(@Nullable StackedProcessor prev, Throwable err) { - this(prev, err, null, null); - } - - /** - * @param prev Previous closure. - * @param splits Splits to remove. - * @param rdc Reducers to remove. - */ - private CancelJobProcessor(@Nullable StackedProcessor prev, - Collection<HadoopInputSplit> splits, - Collection<Integer> rdc) { - this(prev, null, splits, rdc); - } - - /** - * @param prev Previous closure. - * @param err Error. - * @param splits Splits to remove. - * @param rdc Reducers to remove. - */ - private CancelJobProcessor(@Nullable StackedProcessor prev, - Throwable err, - Collection<HadoopInputSplit> splits, - Collection<Integer> rdc) { - super(prev); - - this.splits = splits; - this.rdc = rdc; - this.err = err; - } - - /** {@inheritDoc} */ - @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) { - final HadoopJobPhase currPhase = meta.phase(); - - assert currPhase == PHASE_CANCELLING || currPhase == PHASE_COMPLETE - || err != null: "Invalid phase for cancel: " + currPhase; - - Collection<Integer> rdcCp = new HashSet<>(cp.pendingReducers()); - - if (rdc != null) - rdcCp.removeAll(rdc); - - cp.pendingReducers(rdcCp); - - Map<HadoopInputSplit, Integer> splitsCp = new HashMap<>(cp.pendingSplits()); - - if (splits != null) { - for (HadoopInputSplit s : splits) - splitsCp.remove(s); - } - - cp.pendingSplits(splitsCp); - - if (currPhase != PHASE_COMPLETE && currPhase != PHASE_CANCELLING) - cp.phase(PHASE_CANCELLING); - - if (err != null) - cp.failCause(err); - } - } - - /** - * Increment counter values closure. - */ - private static class IncrementCountersProcessor extends StackedProcessor { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private final HadoopCounters counters; - - /** - * @param prev Previous closure. - * @param counters Task counters to add into job counters. - */ - private IncrementCountersProcessor(@Nullable StackedProcessor prev, HadoopCounters counters) { - super(prev); - - assert counters != null; - - this.counters = counters; - } - - /** {@inheritDoc} */ - @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) { - HadoopCounters cntrs = new HadoopCountersImpl(cp.counters()); - - cntrs.merge(counters); - - cp.counters(cntrs); - } - } - - /** - * Abstract stacked closure. - */ - private abstract static class StackedProcessor implements - EntryProcessor<HadoopJobId, HadoopJobMetadata, Void>, Serializable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private final StackedProcessor prev; - - /** - * @param prev Previous closure. - */ - private StackedProcessor(@Nullable StackedProcessor prev) { - this.prev = prev; - } - - /** {@inheritDoc} */ - @Override public Void process(MutableEntry<HadoopJobId, HadoopJobMetadata> e, Object... args) { - HadoopJobMetadata val = apply(e.getValue()); - - if (val != null) - e.setValue(val); - else - e.remove(); - - return null; - } - - /** - * @param meta Old value. - * @return New value. - */ - private HadoopJobMetadata apply(HadoopJobMetadata meta) { - if (meta == null) - return null; - - HadoopJobMetadata cp = prev != null ? prev.apply(meta) : new HadoopJobMetadata(meta); - - update(meta, cp); - - return cp; - } - - /** - * Update given job metadata object. - * - * @param meta Initial job metadata. - * @param cp Copy. - */ - protected abstract void update(HadoopJobMetadata meta, HadoopJobMetadata cp); - } -}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java deleted file mode 100644 index 0d7bd3a..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.message; - -import java.io.Externalizable; - -/** - * Marker interface for all hadoop messages. - */ -public interface HadoopMessage extends Externalizable { - // No-op. -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java index 15c62c8..7aaf3fa 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java @@ -17,13 +17,14 @@ package org.apache.ignite.internal.processors.hadoop.planner; -import java.util.Collection; -import java.util.Map; -import java.util.UUID; import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan; import org.jetbrains.annotations.Nullable; +import java.util.Collection; +import java.util.Map; +import java.util.UUID; + /** * Map-reduce plan. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java deleted file mode 100644 index 5f96e08..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java +++ /dev/null @@ -1,349 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.proto; - -import java.io.IOException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.ipc.ProtocolSignature; -import org.apache.hadoop.mapreduce.Cluster; -import org.apache.hadoop.mapreduce.ClusterMetrics; -import org.apache.hadoop.mapreduce.Counters; -import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.mapreduce.JobStatus; -import org.apache.hadoop.mapreduce.QueueAclsInfo; -import org.apache.hadoop.mapreduce.QueueInfo; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.TaskCompletionEvent; -import org.apache.hadoop.mapreduce.TaskReport; -import org.apache.hadoop.mapreduce.TaskTrackerInfo; -import org.apache.hadoop.mapreduce.TaskType; -import org.apache.hadoop.mapreduce.protocol.ClientProtocol; -import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; -import org.apache.hadoop.mapreduce.v2.LogParams; -import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; -import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.authorize.AccessControlList; -import org.apache.hadoop.security.token.Token; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.client.GridClient; -import org.apache.ignite.internal.client.GridClientException; -import org.apache.ignite.internal.processors.hadoop.HadoopJobId; -import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty; -import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus; -import org.apache.ignite.internal.processors.hadoop.HadoopMapReduceCounters; -import org.apache.ignite.internal.processors.hadoop.HadoopUtils; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; -import org.apache.ignite.internal.util.typedef.internal.U; - -import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.JOB_SUBMISSION_START_TS_PROPERTY; -import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.REQ_NEW_JOBID_TS_PROPERTY; -import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.RESPONSE_NEW_JOBID_TS_PROPERTY; -import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.createJobInfo; - -/** - * Hadoop client protocol. - */ -public class HadoopClientProtocol implements ClientProtocol { - /** Protocol version. */ - private static final long PROTO_VER = 1L; - - /** Default Ignite system directory. */ - private static final String SYS_DIR = ".ignite/system"; - - /** Configuration. */ - private final Configuration conf; - - /** Ignite client. */ - private volatile GridClient cli; - - /** Last received version. */ - private long lastVer = -1; - - /** Last received status. */ - private HadoopJobStatus lastStatus; - - /** - * Constructor. - * - * @param conf Configuration. - * @param cli Ignite client. - */ - public HadoopClientProtocol(Configuration conf, GridClient cli) { - assert cli != null; - - this.conf = conf; - this.cli = cli; - } - - /** {@inheritDoc} */ - @Override public JobID getNewJobID() throws IOException, InterruptedException { - try { - conf.setLong(REQ_NEW_JOBID_TS_PROPERTY, U.currentTimeMillis()); - - HadoopJobId jobID = cli.compute().execute(HadoopProtocolNextTaskIdTask.class.getName(), null); - - conf.setLong(RESPONSE_NEW_JOBID_TS_PROPERTY, U.currentTimeMillis()); - - return new JobID(jobID.globalId().toString(), jobID.localId()); - } - catch (GridClientException e) { - throw new IOException("Failed to get new job ID.", e); - } - } - - /** {@inheritDoc} */ - @Override public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException, - InterruptedException { - try { - conf.setLong(JOB_SUBMISSION_START_TS_PROPERTY, U.currentTimeMillis()); - - HadoopJobStatus status = cli.compute().execute(HadoopProtocolSubmitJobTask.class.getName(), - new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), createJobInfo(conf))); - - if (status == null) - throw new IOException("Failed to submit job (null status obtained): " + jobId); - - return processStatus(status); - } - catch (GridClientException | IgniteCheckedException e) { - throw new IOException("Failed to submit job.", e); - } - } - - /** {@inheritDoc} */ - @Override public ClusterMetrics getClusterMetrics() throws IOException, InterruptedException { - return new ClusterMetrics(0, 0, 0, 0, 0, 0, 1000, 1000, 1, 100, 0, 0); - } - - /** {@inheritDoc} */ - @Override public Cluster.JobTrackerStatus getJobTrackerStatus() throws IOException, InterruptedException { - return Cluster.JobTrackerStatus.RUNNING; - } - - /** {@inheritDoc} */ - @Override public long getTaskTrackerExpiryInterval() throws IOException, InterruptedException { - return 0; - } - - /** {@inheritDoc} */ - @Override public AccessControlList getQueueAdmins(String queueName) throws IOException { - return new AccessControlList("*"); - } - - /** {@inheritDoc} */ - @Override public void killJob(JobID jobId) throws IOException, InterruptedException { - try { - cli.compute().execute(HadoopProtocolKillJobTask.class.getName(), - new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId())); - } - catch (GridClientException e) { - throw new IOException("Failed to kill job: " + jobId, e); - } - } - - /** {@inheritDoc} */ - @Override public void setJobPriority(JobID jobid, String priority) throws IOException, InterruptedException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException, - InterruptedException { - return false; - } - - /** {@inheritDoc} */ - @Override public JobStatus getJobStatus(JobID jobId) throws IOException, InterruptedException { - try { - Long delay = conf.getLong(HadoopJobProperty.JOB_STATUS_POLL_DELAY.propertyName(), -1); - - HadoopProtocolTaskArguments args = delay >= 0 ? - new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), delay) : - new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId()); - - HadoopJobStatus status = cli.compute().execute(HadoopProtocolJobStatusTask.class.getName(), args); - - if (status == null) - throw new IOException("Job tracker doesn't have any information about the job: " + jobId); - - return processStatus(status); - } - catch (GridClientException e) { - throw new IOException("Failed to get job status: " + jobId, e); - } - } - - /** {@inheritDoc} */ - @Override public Counters getJobCounters(JobID jobId) throws IOException, InterruptedException { - try { - final HadoopCounters counters = cli.compute().execute(HadoopProtocolJobCountersTask.class.getName(), - new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId())); - - if (counters == null) - throw new IOException("Job tracker doesn't have any information about the job: " + jobId); - - return new HadoopMapReduceCounters(counters); - } - catch (GridClientException e) { - throw new IOException("Failed to get job counters: " + jobId, e); - } - } - - /** {@inheritDoc} */ - @Override public TaskReport[] getTaskReports(JobID jobid, TaskType type) throws IOException, InterruptedException { - return new TaskReport[0]; - } - - /** {@inheritDoc} */ - @Override public String getFilesystemName() throws IOException, InterruptedException { - return FileSystem.get(conf).getUri().toString(); - } - - /** {@inheritDoc} */ - @Override public JobStatus[] getAllJobs() throws IOException, InterruptedException { - return new JobStatus[0]; - } - - /** {@inheritDoc} */ - @Override public TaskCompletionEvent[] getTaskCompletionEvents(JobID jobid, int fromEventId, int maxEvents) - throws IOException, InterruptedException { - return new TaskCompletionEvent[0]; - } - - /** {@inheritDoc} */ - @Override public String[] getTaskDiagnostics(TaskAttemptID taskId) throws IOException, InterruptedException { - return new String[0]; - } - - /** {@inheritDoc} */ - @Override public TaskTrackerInfo[] getActiveTrackers() throws IOException, InterruptedException { - return new TaskTrackerInfo[0]; - } - - /** {@inheritDoc} */ - @Override public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException, InterruptedException { - return new TaskTrackerInfo[0]; - } - - /** {@inheritDoc} */ - @Override public String getSystemDir() throws IOException, InterruptedException { - Path sysDir = new Path(SYS_DIR); - - return sysDir.toString(); - } - - /** {@inheritDoc} */ - @Override public String getStagingAreaDir() throws IOException, InterruptedException { - String usr = UserGroupInformation.getCurrentUser().getShortUserName(); - - return HadoopUtils.stagingAreaDir(conf, usr).toString(); - } - - /** {@inheritDoc} */ - @Override public String getJobHistoryDir() throws IOException, InterruptedException { - return JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf); - } - - /** {@inheritDoc} */ - @Override public QueueInfo[] getQueues() throws IOException, InterruptedException { - return new QueueInfo[0]; - } - - /** {@inheritDoc} */ - @Override public QueueInfo getQueue(String queueName) throws IOException, InterruptedException { - return null; - } - - /** {@inheritDoc} */ - @Override public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException, InterruptedException { - return new QueueAclsInfo[0]; - } - - /** {@inheritDoc} */ - @Override public QueueInfo[] getRootQueues() throws IOException, InterruptedException { - return new QueueInfo[0]; - } - - /** {@inheritDoc} */ - @Override public QueueInfo[] getChildQueues(String queueName) throws IOException, InterruptedException { - return new QueueInfo[0]; - } - - /** {@inheritDoc} */ - @Override public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) throws IOException, - InterruptedException { - return null; - } - - /** {@inheritDoc} */ - @Override public long renewDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException, - InterruptedException { - return 0; - } - - /** {@inheritDoc} */ - @Override public void cancelDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException, - InterruptedException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID) throws IOException, - InterruptedException { - return null; - } - - /** {@inheritDoc} */ - @Override public long getProtocolVersion(String protocol, long clientVersion) throws IOException { - return PROTO_VER; - } - - /** {@inheritDoc} */ - @Override public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash) - throws IOException { - return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion, clientMethodsHash); - } - - /** - * Process received status update. - * - * @param status Ignite status. - * @return Hadoop status. - */ - private JobStatus processStatus(HadoopJobStatus status) { - // IMPORTANT! This method will only work in single-threaded environment. It is valid at the moment because - // IgniteHadoopClientProtocolProvider creates new instance of this class for every new job and Job class - // serializes invocations of submitJob() and getJobStatus() methods. However, if any of these conditions will - // change in future and either protocol will serve statuses for several jobs or status update will not be - // serialized anymore, then we have to fallback to concurrent approach (e.g. using ConcurrentHashMap). - // (vozerov) - if (lastVer < status.version()) { - lastVer = status.version(); - - lastStatus = status; - } - else - assert lastStatus != null; - - return HadoopUtils.status(lastStatus, conf); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java deleted file mode 100644 index 8f0271c..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.proto; - -import java.util.UUID; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.compute.ComputeJobContext; -import org.apache.ignite.internal.processors.hadoop.Hadoop; -import org.apache.ignite.internal.processors.hadoop.HadoopJobId; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; - -/** - * Task to get job counters. - */ -public class HadoopProtocolJobCountersTask extends HadoopProtocolTaskAdapter<HadoopCounters> { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public HadoopCounters run(ComputeJobContext jobCtx, Hadoop hadoop, - HadoopProtocolTaskArguments args) throws IgniteCheckedException { - - UUID nodeId = UUID.fromString(args.<String>get(0)); - Integer id = args.get(1); - - assert nodeId != null; - assert id != null; - - return hadoop.counters(new HadoopJobId(nodeId, id)); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java deleted file mode 100644 index c08fe77..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.proto; - -import java.util.UUID; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.compute.ComputeJobContext; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.processors.hadoop.Hadoop; -import org.apache.ignite.internal.processors.hadoop.HadoopJobId; -import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.lang.IgniteInClosure; - -/** - * Job status task. - */ -public class HadoopProtocolJobStatusTask extends HadoopProtocolTaskAdapter<HadoopJobStatus> { - /** */ - private static final long serialVersionUID = 0L; - - /** Default poll delay */ - private static final long DFLT_POLL_DELAY = 100L; - - /** Attribute for held status. */ - private static final String ATTR_HELD = "held"; - - /** {@inheritDoc} */ - @Override public HadoopJobStatus run(final ComputeJobContext jobCtx, Hadoop hadoop, - HadoopProtocolTaskArguments args) throws IgniteCheckedException { - UUID nodeId = UUID.fromString(args.<String>get(0)); - Integer id = args.get(1); - Long pollDelay = args.get(2); - - assert nodeId != null; - assert id != null; - - HadoopJobId jobId = new HadoopJobId(nodeId, id); - - if (pollDelay == null) - pollDelay = DFLT_POLL_DELAY; - - if (pollDelay > 0) { - IgniteInternalFuture<?> fut = hadoop.finishFuture(jobId); - - if (fut != null) { - if (fut.isDone() || F.eq(jobCtx.getAttribute(ATTR_HELD), true)) - return hadoop.status(jobId); - else { - fut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> fut0) { - jobCtx.callcc(); - } - }); - - jobCtx.setAttribute(ATTR_HELD, true); - - return jobCtx.holdcc(pollDelay); - } - } - else - return null; - } - else - return hadoop.status(jobId); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java deleted file mode 100644 index 0f65664..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.proto; - -import java.util.UUID; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.compute.ComputeJobContext; -import org.apache.ignite.internal.processors.hadoop.Hadoop; -import org.apache.ignite.internal.processors.hadoop.HadoopJobId; - -/** - * Kill job task. - */ -public class HadoopProtocolKillJobTask extends HadoopProtocolTaskAdapter<Boolean> { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public Boolean run(ComputeJobContext jobCtx, Hadoop hadoop, - HadoopProtocolTaskArguments args) throws IgniteCheckedException { - UUID nodeId = UUID.fromString(args.<String>get(0)); - Integer id = args.get(1); - - assert nodeId != null; - assert id != null; - - HadoopJobId jobId = new HadoopJobId(nodeId, id); - - return hadoop.kill(jobId); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java deleted file mode 100644 index bde7821..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.proto; - -import org.apache.ignite.compute.ComputeJobContext; -import org.apache.ignite.internal.processors.hadoop.Hadoop; -import org.apache.ignite.internal.processors.hadoop.HadoopJobId; - -/** - * Task to get the next job ID. - */ -public class HadoopProtocolNextTaskIdTask extends HadoopProtocolTaskAdapter<HadoopJobId> { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public HadoopJobId run(ComputeJobContext jobCtx, Hadoop hadoop, - HadoopProtocolTaskArguments args) { - return hadoop.nextJobId(); - } -} \ No newline at end of file