http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsWrapper.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsWrapper.java new file mode 100644 index 0000000..76bc629 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsWrapper.java @@ -0,0 +1,553 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.impl.igfs; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.logging.Log; +import org.apache.hadoop.conf.Configuration; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteFileSystem; +import org.apache.ignite.IgniteIllegalStateException; +import org.apache.ignite.Ignition; +import org.apache.ignite.igfs.IgfsBlockLocation; +import org.apache.ignite.igfs.IgfsFile; +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.igfs.IgfsPathSummary; +import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint; +import org.apache.ignite.internal.processors.igfs.IgfsEx; +import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse; +import org.apache.ignite.internal.processors.igfs.IgfsStatus; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.SB; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.IgniteState.STARTED; +import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint.LOCALHOST; +import static org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_EMBED; +import static org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM; +import static org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP; +import static org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsUtils.parameter; + +/** + * Wrapper for IGFS server. + */ +public class HadoopIgfsWrapper implements HadoopIgfs { + /** Delegate. 
*/ + private final AtomicReference<Delegate> delegateRef = new AtomicReference<>(); + + /** Authority. */ + private final String authority; + + /** Connection string. */ + private final HadoopIgfsEndpoint endpoint; + + /** Log directory. */ + private final String logDir; + + /** Configuration. */ + private final Configuration conf; + + /** Logger. */ + private final Log log; + + /** The user name this wrapper works on behalf of. */ + private final String userName; + + /** + * Constructor. + * + * @param authority Authority (connection string). + * @param logDir Log directory for server. + * @param conf Configuration. + * @param log Current logger. + */ + public HadoopIgfsWrapper(String authority, String logDir, Configuration conf, Log log, String user) + throws IOException { + try { + this.authority = authority; + this.endpoint = new HadoopIgfsEndpoint(authority); + this.logDir = logDir; + this.conf = conf; + this.log = log; + this.userName = user; + } + catch (IgniteCheckedException e) { + throw new IOException("Failed to parse endpoint: " + authority, e); + } + } + + /** {@inheritDoc} */ + @Override public IgfsHandshakeResponse handshake(String logDir) throws IOException { + return withReconnectHandling(new FileSystemClosure<IgfsHandshakeResponse>() { + @Override public IgfsHandshakeResponse apply(HadoopIgfsEx hadoop, + IgfsHandshakeResponse hndResp) { + return hndResp; + } + }); + } + + /** {@inheritDoc} */ + @Override public void close(boolean force) { + Delegate delegate = delegateRef.get(); + + if (delegate != null && delegateRef.compareAndSet(delegate, null)) + delegate.close(force); + } + + /** {@inheritDoc} */ + @Override public IgfsFile info(final IgfsPath path) throws IOException { + return withReconnectHandling(new FileSystemClosure<IgfsFile>() { + @Override public IgfsFile apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.info(path); + } + }, path); + } + + /** {@inheritDoc} */ + 
@Override public IgfsFile update(final IgfsPath path, final Map<String, String> props) throws IOException { + return withReconnectHandling(new FileSystemClosure<IgfsFile>() { + @Override public IgfsFile apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.update(path, props); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public Boolean setTimes(final IgfsPath path, final long accessTime, final long modificationTime) + throws IOException { + return withReconnectHandling(new FileSystemClosure<Boolean>() { + @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.setTimes(path, accessTime, modificationTime); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) throws IOException { + return withReconnectHandling(new FileSystemClosure<Boolean>() { + @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.rename(src, dest); + } + }, src); + } + + /** {@inheritDoc} */ + @Override public Boolean delete(final IgfsPath path, final boolean recursive) throws IOException { + return withReconnectHandling(new FileSystemClosure<Boolean>() { + @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.delete(path, recursive); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsBlockLocation> affinity(final IgfsPath path, final long start, + final long len) throws IOException { + return withReconnectHandling(new FileSystemClosure<Collection<IgfsBlockLocation>>() { + @Override public Collection<IgfsBlockLocation> apply(HadoopIgfsEx hadoop, + IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { + return hadoop.affinity(path, 
start, len); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public IgfsPathSummary contentSummary(final IgfsPath path) throws IOException { + return withReconnectHandling(new FileSystemClosure<IgfsPathSummary>() { + @Override public IgfsPathSummary apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.contentSummary(path); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public Boolean mkdirs(final IgfsPath path, final Map<String, String> props) throws IOException { + return withReconnectHandling(new FileSystemClosure<Boolean>() { + @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.mkdirs(path, props); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsFile> listFiles(final IgfsPath path) throws IOException { + return withReconnectHandling(new FileSystemClosure<Collection<IgfsFile>>() { + @Override public Collection<IgfsFile> apply(HadoopIgfsEx hadoop, + IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { + return hadoop.listFiles(path); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsPath> listPaths(final IgfsPath path) throws IOException { + return withReconnectHandling(new FileSystemClosure<Collection<IgfsPath>>() { + @Override public Collection<IgfsPath> apply(HadoopIgfsEx hadoop, + IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { + return hadoop.listPaths(path); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public IgfsStatus fsStatus() throws IOException { + return withReconnectHandling(new FileSystemClosure<IgfsStatus>() { + @Override public IgfsStatus apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.fsStatus(); + } + }); + } + + /** {@inheritDoc} */ + @Override public 
HadoopIgfsStreamDelegate open(final IgfsPath path) throws IOException { + return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() { + @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop, + IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { + return hadoop.open(path); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public HadoopIgfsStreamDelegate open(final IgfsPath path, final int seqReadsBeforePrefetch) + throws IOException { + return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() { + @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop, + IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { + return hadoop.open(path, seqReadsBeforePrefetch); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public HadoopIgfsStreamDelegate create(final IgfsPath path, final boolean overwrite, + final boolean colocate, final int replication, final long blockSize, @Nullable final Map<String, String> props) + throws IOException { + return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() { + @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop, + IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { + return hadoop.create(path, overwrite, colocate, replication, blockSize, props); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public HadoopIgfsStreamDelegate append(final IgfsPath path, final boolean create, + @Nullable final Map<String, String> props) throws IOException { + return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() { + @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop, + IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { + return hadoop.append(path, create, props); + } + }, path); + } + + /** + * Execute closure which is not path-specific. + * + * @param clo Closure. + * @return Result. 
+ * @throws IOException If failed. + */ + private <T> T withReconnectHandling(FileSystemClosure<T> clo) throws IOException { + return withReconnectHandling(clo, null); + } + + /** + * Execute closure. + * + * @param clo Closure. + * @param path Path for exceptions. + * @return Result. + * @throws IOException If failed. + */ + private <T> T withReconnectHandling(final FileSystemClosure<T> clo, @Nullable IgfsPath path) + throws IOException { + Exception err = null; + + for (int i = 0; i < 2; i++) { + Delegate curDelegate = null; + + boolean close = false; + boolean force = false; + + try { + curDelegate = delegate(); + + assert curDelegate != null; + + close = curDelegate.doomed; + + return clo.apply(curDelegate.hadoop, curDelegate.hndResp); + } + catch (HadoopIgfsCommunicationException e) { + if (curDelegate != null && !curDelegate.doomed) { + // Try getting rid fo faulty delegate ASAP. + delegateRef.compareAndSet(curDelegate, null); + + close = true; + force = true; + } + + if (log.isDebugEnabled()) + log.debug("Failed to send message to a server: " + e); + + err = e; + } + catch (IgniteCheckedException e) { + throw HadoopIgfsUtils.cast(e, path != null ? path.toString() : null); + } + finally { + if (close) { + assert curDelegate != null; + + curDelegate.close(force); + } + } + } + + List<Throwable> list = X.getThrowableList(err); + + Throwable cause = list.get(list.size() - 1); + + throw new IOException("Failed to communicate with IGFS: " + + (cause.getMessage() == null ? cause.toString() : cause.getMessage()), err); + } + + /** + * Get delegate creating it if needed. + * + * @return Delegate. + */ + private Delegate delegate() throws HadoopIgfsCommunicationException { + // These fields will contain possible exceptions from shmem and TCP endpoints. + Exception errShmem = null; + Exception errTcp = null; + + // 1. If delegate is set, return it immediately. + Delegate curDelegate = delegateRef.get(); + + if (curDelegate != null) + return curDelegate; + + // 2. 
Guess that we are in the same VM. + boolean skipInProc = parameter(conf, PARAM_IGFS_ENDPOINT_NO_EMBED, authority, false); + + if (!skipInProc) { + IgfsEx igfs = getIgfsEx(endpoint.grid(), endpoint.igfs()); + + if (igfs != null) { + HadoopIgfsEx hadoop = null; + + try { + hadoop = new HadoopIgfsInProc(igfs, log, userName); + + curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); + } + catch (IOException | IgniteCheckedException e) { + if (e instanceof HadoopIgfsCommunicationException) + if (hadoop != null) + hadoop.close(true); + + if (log.isDebugEnabled()) + log.debug("Failed to connect to in-process IGFS, fallback to IPC mode.", e); + } + } + } + + // 3. Try connecting using shmem. + boolean skipLocShmem = parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM, authority, false); + + if (curDelegate == null && !skipLocShmem && !U.isWindows()) { + HadoopIgfsEx hadoop = null; + + try { + hadoop = new HadoopIgfsOutProc(endpoint.port(), endpoint.grid(), endpoint.igfs(), log, userName); + + curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); + } + catch (IOException | IgniteCheckedException e) { + if (e instanceof HadoopIgfsCommunicationException) + hadoop.close(true); + + if (log.isDebugEnabled()) + log.debug("Failed to connect to IGFS using shared memory [port=" + endpoint.port() + ']', e); + + errShmem = e; + } + } + + // 4. Try local TCP connection. 
+ boolean skipLocTcp = parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP, authority, false); + + if (curDelegate == null && !skipLocTcp) { + HadoopIgfsEx hadoop = null; + + try { + hadoop = new HadoopIgfsOutProc(LOCALHOST, endpoint.port(), endpoint.grid(), endpoint.igfs(), + log, userName); + + curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); + } + catch (IOException | IgniteCheckedException e) { + if (e instanceof HadoopIgfsCommunicationException) + hadoop.close(true); + + if (log.isDebugEnabled()) + log.debug("Failed to connect to IGFS using TCP [host=" + endpoint.host() + + ", port=" + endpoint.port() + ']', e); + + errTcp = e; + } + } + + // 5. Try remote TCP connection. + if (curDelegate == null && (skipLocTcp || !F.eq(LOCALHOST, endpoint.host()))) { + HadoopIgfsEx hadoop = null; + + try { + hadoop = new HadoopIgfsOutProc(endpoint.host(), endpoint.port(), endpoint.grid(), endpoint.igfs(), + log, userName); + + curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); + } + catch (IOException | IgniteCheckedException e) { + if (e instanceof HadoopIgfsCommunicationException) + hadoop.close(true); + + if (log.isDebugEnabled()) + log.debug("Failed to connect to IGFS using TCP [host=" + endpoint.host() + + ", port=" + endpoint.port() + ']', e); + + errTcp = e; + } + } + + if (curDelegate != null) { + if (!delegateRef.compareAndSet(null, curDelegate)) + curDelegate.doomed = true; + + return curDelegate; + } + else { + SB errMsg = new SB("Failed to connect to IGFS [endpoint=igfs://" + authority + ", attempts=["); + + if (errShmem != null) + errMsg.a("[type=SHMEM, port=" + endpoint.port() + ", err=" + errShmem + "], "); + + errMsg.a("[type=TCP, host=" + endpoint.host() + ", port=" + endpoint.port() + ", err=" + errTcp + "]] "); + + errMsg.a("(ensure that IGFS is running and have IPC endpoint enabled; ensure that " + + "ignite-shmem-1.0.0.jar is in Hadoop classpath if you use shared memory endpoint)."); + + throw new 
HadoopIgfsCommunicationException(errMsg.toString()); + } + } + + /** + * File system operation closure. + */ + private static interface FileSystemClosure<T> { + /** + * Call closure body. + * + * @param hadoop RPC handler. + * @param hndResp Handshake response. + * @return Result. + * @throws IgniteCheckedException If failed. + * @throws IOException If failed. + */ + public T apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException; + } + + /** + * Delegate. + */ + private static class Delegate { + /** RPC handler. */ + private final HadoopIgfsEx hadoop; + + /** Handshake request. */ + private final IgfsHandshakeResponse hndResp; + + /** Close guard. */ + private final AtomicBoolean closeGuard = new AtomicBoolean(); + + /** Whether this delegate must be closed at the end of the next invocation. */ + private boolean doomed; + + /** + * Constructor. + * + * @param hadoop Hadoop. + * @param hndResp Handshake response. + */ + private Delegate(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) { + this.hadoop = hadoop; + this.hndResp = hndResp; + } + + /** + * Close underlying RPC handler. + * + * @param force Force flag. + */ + private void close(boolean force) { + if (closeGuard.compareAndSet(false, true)) + hadoop.close(force); + } + } + + /** + * Helper method to find Igfs of the given name in the given Ignite instance. + * + * @param gridName The name of the grid to check. + * @param igfsName The name of Igfs. + * @return The file system instance, or null if not found. + */ + private static IgfsEx getIgfsEx(@Nullable String gridName, @Nullable String igfsName) { + if (Ignition.state(gridName) == STARTED) { + try { + for (IgniteFileSystem fs : Ignition.ignite(gridName).fileSystems()) { + if (F.eq(fs.name(), igfsName)) + return (IgfsEx)fs; + } + } + catch (IgniteIllegalStateException ignore) { + // May happen if the grid state has changed: + } + } + + return null; + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java new file mode 100644 index 0000000..ed8beb0 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.impl.proto; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.mapreduce.Cluster; +import org.apache.hadoop.mapreduce.ClusterMetrics; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.QueueAclsInfo; +import org.apache.hadoop.mapreduce.QueueInfo; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskCompletionEvent; +import org.apache.hadoop.mapreduce.TaskReport; +import org.apache.hadoop.mapreduce.TaskTrackerInfo; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.protocol.ClientProtocol; +import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.mapreduce.v2.LogParams; +import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.security.token.Token; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.client.GridClient; +import org.apache.ignite.internal.client.GridClientException; +import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils; +import org.apache.ignite.internal.processors.hadoop.HadoopJobId; +import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty; +import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopMapReduceCounters; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils; +import 
org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; +import org.apache.ignite.internal.processors.hadoop.proto.HadoopProtocolJobCountersTask; +import org.apache.ignite.internal.processors.hadoop.proto.HadoopProtocolJobStatusTask; +import org.apache.ignite.internal.processors.hadoop.proto.HadoopProtocolKillJobTask; +import org.apache.ignite.internal.processors.hadoop.proto.HadoopProtocolNextTaskIdTask; +import org.apache.ignite.internal.processors.hadoop.proto.HadoopProtocolSubmitJobTask; +import org.apache.ignite.internal.processors.hadoop.proto.HadoopProtocolTaskArguments; +import org.apache.ignite.internal.util.typedef.internal.U; + +import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.createJobInfo; + +/** + * Hadoop client protocol. + */ +public class HadoopClientProtocol implements ClientProtocol { + /** Protocol version. */ + private static final long PROTO_VER = 1L; + + /** Default Ignite system directory. */ + private static final String SYS_DIR = ".ignite/system"; + + /** Configuration. */ + private final Configuration conf; + + /** Ignite client. */ + private volatile GridClient cli; + + /** Last received version. */ + private long lastVer = -1; + + /** Last received status. */ + private HadoopJobStatus lastStatus; + + /** + * Constructor. + * + * @param conf Configuration. + * @param cli Ignite client. 
+ */ + public HadoopClientProtocol(Configuration conf, GridClient cli) { + assert cli != null; + + this.conf = conf; + this.cli = cli; + } + + /** {@inheritDoc} */ + @Override public JobID getNewJobID() throws IOException, InterruptedException { + try { + conf.setLong(HadoopCommonUtils.REQ_NEW_JOBID_TS_PROPERTY, U.currentTimeMillis()); + + HadoopJobId jobID = cli.compute().execute(HadoopProtocolNextTaskIdTask.class.getName(), null); + + conf.setLong(HadoopCommonUtils.RESPONSE_NEW_JOBID_TS_PROPERTY, U.currentTimeMillis()); + + return new JobID(jobID.globalId().toString(), jobID.localId()); + } + catch (GridClientException e) { + throw new IOException("Failed to get new job ID.", e); + } + } + + /** {@inheritDoc} */ + @Override public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException, + InterruptedException { + try { + conf.setLong(HadoopCommonUtils.JOB_SUBMISSION_START_TS_PROPERTY, U.currentTimeMillis()); + + HadoopJobStatus status = cli.compute().execute(HadoopProtocolSubmitJobTask.class.getName(), + new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), createJobInfo(conf))); + + if (status == null) + throw new IOException("Failed to submit job (null status obtained): " + jobId); + + return processStatus(status); + } + catch (GridClientException | IgniteCheckedException e) { + throw new IOException("Failed to submit job.", e); + } + } + + /** {@inheritDoc} */ + @Override public ClusterMetrics getClusterMetrics() throws IOException, InterruptedException { + return new ClusterMetrics(0, 0, 0, 0, 0, 0, 1000, 1000, 1, 100, 0, 0); + } + + /** {@inheritDoc} */ + @Override public Cluster.JobTrackerStatus getJobTrackerStatus() throws IOException, InterruptedException { + return Cluster.JobTrackerStatus.RUNNING; + } + + /** {@inheritDoc} */ + @Override public long getTaskTrackerExpiryInterval() throws IOException, InterruptedException { + return 0; + } + + /** {@inheritDoc} */ + @Override public AccessControlList 
getQueueAdmins(String queueName) throws IOException { + return new AccessControlList("*"); + } + + /** {@inheritDoc} */ + @Override public void killJob(JobID jobId) throws IOException, InterruptedException { + try { + cli.compute().execute(HadoopProtocolKillJobTask.class.getName(), + new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId())); + } + catch (GridClientException e) { + throw new IOException("Failed to kill job: " + jobId, e); + } + } + + /** {@inheritDoc} */ + @Override public void setJobPriority(JobID jobid, String priority) throws IOException, InterruptedException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException, + InterruptedException { + return false; + } + + /** {@inheritDoc} */ + @Override public JobStatus getJobStatus(JobID jobId) throws IOException, InterruptedException { + try { + Long delay = conf.getLong(HadoopJobProperty.JOB_STATUS_POLL_DELAY.propertyName(), -1); + + HadoopProtocolTaskArguments args = delay >= 0 ? 
+ new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), delay) : + new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId()); + + HadoopJobStatus status = cli.compute().execute(HadoopProtocolJobStatusTask.class.getName(), args); + + if (status == null) + throw new IOException("Job tracker doesn't have any information about the job: " + jobId); + + return processStatus(status); + } + catch (GridClientException e) { + throw new IOException("Failed to get job status: " + jobId, e); + } + } + + /** {@inheritDoc} */ + @Override public Counters getJobCounters(JobID jobId) throws IOException, InterruptedException { + try { + final HadoopCounters counters = cli.compute().execute(HadoopProtocolJobCountersTask.class.getName(), + new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId())); + + if (counters == null) + throw new IOException("Job tracker doesn't have any information about the job: " + jobId); + + return new HadoopMapReduceCounters(counters); + } + catch (GridClientException e) { + throw new IOException("Failed to get job counters: " + jobId, e); + } + } + + /** {@inheritDoc} */ + @Override public TaskReport[] getTaskReports(JobID jobid, TaskType type) throws IOException, InterruptedException { + return new TaskReport[0]; + } + + /** {@inheritDoc} */ + @Override public String getFilesystemName() throws IOException, InterruptedException { + return FileSystem.get(conf).getUri().toString(); + } + + /** {@inheritDoc} */ + @Override public JobStatus[] getAllJobs() throws IOException, InterruptedException { + return new JobStatus[0]; + } + + /** {@inheritDoc} */ + @Override public TaskCompletionEvent[] getTaskCompletionEvents(JobID jobid, int fromEventId, int maxEvents) + throws IOException, InterruptedException { + return new TaskCompletionEvent[0]; + } + + /** {@inheritDoc} */ + @Override public String[] getTaskDiagnostics(TaskAttemptID taskId) throws IOException, InterruptedException { + return new String[0]; + } + + /** 
{@inheritDoc} */ + @Override public TaskTrackerInfo[] getActiveTrackers() throws IOException, InterruptedException { + return new TaskTrackerInfo[0]; + } + + /** {@inheritDoc} */ + @Override public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException, InterruptedException { + return new TaskTrackerInfo[0]; + } + + /** {@inheritDoc} */ + @Override public String getSystemDir() throws IOException, InterruptedException { + Path sysDir = new Path(SYS_DIR); + + return sysDir.toString(); + } + + /** {@inheritDoc} */ + @Override public String getStagingAreaDir() throws IOException, InterruptedException { + String usr = UserGroupInformation.getCurrentUser().getShortUserName(); + + return HadoopUtils.stagingAreaDir(conf, usr).toString(); + } + + /** {@inheritDoc} */ + @Override public String getJobHistoryDir() throws IOException, InterruptedException { + return JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf); + } + + /** {@inheritDoc} */ + @Override public QueueInfo[] getQueues() throws IOException, InterruptedException { + return new QueueInfo[0]; + } + + /** {@inheritDoc} */ + @Override public QueueInfo getQueue(String queueName) throws IOException, InterruptedException { + return null; + } + + /** {@inheritDoc} */ + @Override public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException, InterruptedException { + return new QueueAclsInfo[0]; + } + + /** {@inheritDoc} */ + @Override public QueueInfo[] getRootQueues() throws IOException, InterruptedException { + return new QueueInfo[0]; + } + + /** {@inheritDoc} */ + @Override public QueueInfo[] getChildQueues(String queueName) throws IOException, InterruptedException { + return new QueueInfo[0]; + } + + /** {@inheritDoc} */ + @Override public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) throws IOException, + InterruptedException { + return null; + } + + /** {@inheritDoc} */ + @Override public long renewDelegationToken(Token<DelegationTokenIdentifier> token) throws 
IOException, + InterruptedException { + return 0; + } + + /** {@inheritDoc} */ + @Override public void cancelDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException, + InterruptedException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID) throws IOException, + InterruptedException { + return null; + } + + /** {@inheritDoc} */ + @Override public long getProtocolVersion(String protocol, long clientVersion) throws IOException { + return PROTO_VER; + } + + /** {@inheritDoc} */ + @Override public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash) + throws IOException { + return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion, clientMethodsHash); + } + + /** + * Process received status update. + * + * @param status Ignite status. + * @return Hadoop status. + */ + private JobStatus processStatus(HadoopJobStatus status) { + // IMPORTANT! This method will only work in single-threaded environment. It is valid at the moment because + // IgniteHadoopClientProtocolProvider creates new instance of this class for every new job and Job class + // serializes invocations of submitJob() and getJobStatus() methods. However, if any of these conditions will + // change in future and either protocol will serve statuses for several jobs or status update will not be + // serialized anymore, then we have to fallback to concurrent approach (e.g. using ConcurrentHashMap). 
+ // (vozerov) + if (lastVer < status.version()) { + lastVer = status.version(); + + lastStatus = status; + } + else + assert lastStatus != null; + + return HadoopUtils.status(lastStatus, conf); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1CleanupTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1CleanupTask.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1CleanupTask.java new file mode 100644 index 0000000..d017d03 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1CleanupTask.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.impl.v1; + +import java.io.IOException; +import org.apache.hadoop.mapred.JobContext; +import org.apache.hadoop.mapred.JobStatus; +import org.apache.hadoop.mapred.OutputCommitter; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; +import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopV2TaskContext; + +/** + * Hadoop cleanup task implementation for v1 API. + */ +public class HadoopV1CleanupTask extends HadoopV1Task { + /** Abort flag. */ + private final boolean abort; + + /** + * @param taskInfo Task info. + * @param abort Abort flag. + */ + public HadoopV1CleanupTask(HadoopTaskInfo taskInfo, boolean abort) { + super(taskInfo); + + this.abort = abort; + } + + /** {@inheritDoc} */ + @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException { + HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx; + + JobContext jobCtx = ctx.jobContext(); + + try { + OutputCommitter committer = jobCtx.getJobConf().getOutputCommitter(); + + if (abort) + committer.abortJob(jobCtx, JobStatus.State.FAILED); + else + committer.commitJob(jobCtx); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Counter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Counter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Counter.java new file mode 100644 index 0000000..a2128c2 --- /dev/null +++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Counter.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.v1; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import org.apache.hadoop.mapred.Counters; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopLongCounter; +import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopV2Counter; + +import static org.apache.hadoop.mapreduce.util.CountersStrings.toEscapedCompactString; + +/** + * Hadoop counter implementation for v1 API. + */ +public class HadoopV1Counter extends Counters.Counter { + /** Delegate. */ + private final HadoopLongCounter cntr; + + /** + * Creates new instance. + * + * @param cntr Delegate counter. + */ + public HadoopV1Counter(HadoopLongCounter cntr) { + this.cntr = cntr; + } + + /** {@inheritDoc} */ + @Override public void setDisplayName(String displayName) { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public String getName() { + return cntr.name(); + } + + /** {@inheritDoc} */ + @Override public String getDisplayName() { + return getName(); + } + + /** {@inheritDoc} */ + @Override public long getValue() { + return cntr.value(); + } + + /** {@inheritDoc} */ + @Override public void setValue(long val) { + cntr.value(val); + } + + /** {@inheritDoc} */ + @Override public void increment(long incr) { + cntr.increment(incr); + } + + /** {@inheritDoc} */ + @Override public void write(DataOutput out) throws IOException { + throw new UnsupportedOperationException("not implemented"); + } + + /** {@inheritDoc} */ + @Override public void readFields(DataInput in) throws IOException { + throw new UnsupportedOperationException("not implemented"); + } + + /** {@inheritDoc} */ + @Override public String makeEscapedCompactString() { + return toEscapedCompactString(new HadoopV2Counter(cntr)); + } + + /** {@inheritDoc} */ + @SuppressWarnings("deprecation") + @Override public boolean contentEquals(Counters.Counter cntr) { + return getUnderlyingCounter().equals(cntr.getUnderlyingCounter()); + } + + /** {@inheritDoc} */ + @Override public long getCounter() { + return cntr.value(); + } + + /** {@inheritDoc} */ + @Override public Counter getUnderlyingCounter() { + return this; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java new file mode 100644 index 0000000..65ff280 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java @@ -0,0 
+1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.v1; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock; +import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; +import org.apache.ignite.internal.processors.hadoop.HadoopJob; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; +import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopV2TaskContext; + +/** + * Hadoop map task implementation for v1 API. 
+ */ +public class HadoopV1MapTask extends HadoopV1Task { + /** */ + private static final String[] EMPTY_HOSTS = new String[0]; + + /** + * Constructor. + * + * @param taskInfo + */ + public HadoopV1MapTask(HadoopTaskInfo taskInfo) { + super(taskInfo); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException { + HadoopJob job = taskCtx.job(); + + HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx; + + JobConf jobConf = ctx.jobConf(); + + InputFormat inFormat = jobConf.getInputFormat(); + + HadoopInputSplit split = info().inputSplit(); + + InputSplit nativeSplit; + + if (split instanceof HadoopFileBlock) { + HadoopFileBlock block = (HadoopFileBlock)split; + + nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), EMPTY_HOSTS); + } + else + nativeSplit = (InputSplit)ctx.getNativeSplit(split); + + assert nativeSplit != null; + + Reporter reporter = new HadoopV1Reporter(taskCtx); + + HadoopV1OutputCollector collector = null; + + try { + collector = collector(jobConf, ctx, !job.info().hasCombiner() && !job.info().hasReducer(), + fileName(), ctx.attemptId()); + + RecordReader reader = inFormat.getRecordReader(nativeSplit, jobConf, reporter); + + Mapper mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf); + + Object key = reader.createKey(); + Object val = reader.createValue(); + + assert mapper != null; + + try { + try { + while (reader.next(key, val)) { + if (isCancelled()) + throw new HadoopTaskCancelledException("Map task cancelled."); + + mapper.map(key, val, collector, reporter); + } + } + finally { + mapper.close(); + } + } + finally { + collector.closeWriter(); + } + + collector.commit(); + } + catch (Exception e) { + if (collector != null) + collector.abort(); + + throw new IgniteCheckedException(e); + } + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1OutputCollector.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1OutputCollector.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1OutputCollector.java new file mode 100644 index 0000000..7cbe85b --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1OutputCollector.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.impl.v1; + +import java.io.IOException; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.OutputCommitter; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.TaskAttemptContext; +import org.apache.hadoop.mapred.TaskAttemptContextImpl; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; +import org.jetbrains.annotations.Nullable; + +/** + * Hadoop output collector. + */ +public class HadoopV1OutputCollector implements OutputCollector { + /** Job configuration. */ + private final JobConf jobConf; + + /** Task context. */ + private final HadoopTaskContext taskCtx; + + /** Optional direct writer. */ + private final RecordWriter writer; + + /** Task attempt. */ + private final TaskAttemptID attempt; + + /** + * @param jobConf Job configuration. + * @param taskCtx Task context. + * @param directWrite Direct write flag. + * @param fileName File name. + * @throws IOException In case of IO exception. 
+ */ + HadoopV1OutputCollector(JobConf jobConf, HadoopTaskContext taskCtx, boolean directWrite, + @Nullable String fileName, TaskAttemptID attempt) throws IOException { + this.jobConf = jobConf; + this.taskCtx = taskCtx; + this.attempt = attempt; + + if (directWrite) { + jobConf.set("mapreduce.task.attempt.id", attempt.toString()); + + OutputFormat outFormat = jobConf.getOutputFormat(); + + writer = outFormat.getRecordWriter(null, jobConf, fileName, Reporter.NULL); + } + else + writer = null; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void collect(Object key, Object val) throws IOException { + if (writer != null) + writer.write(key, val); + else { + try { + taskCtx.output().write(key, val); + } + catch (IgniteCheckedException e) { + throw new IOException(e); + } + } + } + + /** + * Close writer. + * + * @throws IOException In case of IO exception. + */ + public void closeWriter() throws IOException { + if (writer != null) + writer.close(Reporter.NULL); + } + + /** + * Setup task. + * + * @throws IOException If failed. + */ + public void setup() throws IOException { + if (writer != null) + jobConf.getOutputCommitter().setupTask(new TaskAttemptContextImpl(jobConf, attempt)); + } + + /** + * Commit task. + * + * @throws IOException In failed. + */ + public void commit() throws IOException { + if (writer != null) { + OutputCommitter outputCommitter = jobConf.getOutputCommitter(); + + TaskAttemptContext taskCtx = new TaskAttemptContextImpl(jobConf, attempt); + + if (outputCommitter.needsTaskCommit(taskCtx)) + outputCommitter.commitTask(taskCtx); + } + } + + /** + * Abort task. + */ + public void abort() { + try { + if (writer != null) + jobConf.getOutputCommitter().abortTask(new TaskAttemptContextImpl(jobConf, attempt)); + } + catch (IOException ignore) { + // No-op. 
+ } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Partitioner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Partitioner.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Partitioner.java new file mode 100644 index 0000000..97634d9 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Partitioner.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.v1; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.Partitioner; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner; + +/** + * Hadoop partitioner adapter for v1 API. + */ +public class HadoopV1Partitioner implements HadoopPartitioner { + /** Partitioner instance. 
*/ + private Partitioner<Object, Object> part; + + /** + * @param cls Hadoop partitioner class. + * @param conf Job configuration. + */ + public HadoopV1Partitioner(Class<? extends Partitioner> cls, Configuration conf) { + part = (Partitioner<Object, Object>) ReflectionUtils.newInstance(cls, conf); + } + + /** {@inheritDoc} */ + @Override public int partition(Object key, Object val, int parts) { + return part.getPartition(key, val, parts); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java new file mode 100644 index 0000000..92c024e --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.impl.v1; + +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.hadoop.HadoopJob; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput; +import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopV2TaskContext; + +/** + * Hadoop reduce task implementation for v1 API. + */ +public class HadoopV1ReduceTask extends HadoopV1Task { + /** {@code True} if reduce, {@code false} if combine. */ + private final boolean reduce; + + /** + * Constructor. + * + * @param taskInfo Task info. + * @param reduce {@code True} if reduce, {@code false} if combine. 
+ */ + public HadoopV1ReduceTask(HadoopTaskInfo taskInfo, boolean reduce) { + super(taskInfo); + + this.reduce = reduce; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException { + HadoopJob job = taskCtx.job(); + + HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx; + + JobConf jobConf = ctx.jobConf(); + + HadoopTaskInput input = taskCtx.input(); + + HadoopV1OutputCollector collector = null; + + try { + collector = collector(jobConf, ctx, reduce || !job.info().hasReducer(), fileName(), ctx.attemptId()); + + Reducer reducer; + if (reduce) reducer = ReflectionUtils.newInstance(jobConf.getReducerClass(), + jobConf); + else reducer = ReflectionUtils.newInstance(jobConf.getCombinerClass(), + jobConf); + + assert reducer != null; + + try { + try { + while (input.next()) { + if (isCancelled()) + throw new HadoopTaskCancelledException("Reduce task cancelled."); + + reducer.reduce(input.key(), input.values(), collector, Reporter.NULL); + } + } + finally { + reducer.close(); + } + } + finally { + collector.closeWriter(); + } + + collector.commit(); + } + catch (Exception e) { + if (collector != null) + collector.abort(); + + throw new IgniteCheckedException(e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Reporter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Reporter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Reporter.java new file mode 100644 index 0000000..f3229e2 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Reporter.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.v1; + +import org.apache.hadoop.mapred.Counters; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.Reporter; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopLongCounter; + +/** + * Hadoop reporter implementation for v1 API. + */ +public class HadoopV1Reporter implements Reporter { + /** Context. */ + private final HadoopTaskContext ctx; + + /** + * Creates new instance. + * + * @param ctx Context. 
+ */ + public HadoopV1Reporter(HadoopTaskContext ctx) { + this.ctx = ctx; + } + + /** {@inheritDoc} */ + @Override public void setStatus(String status) { + // TODO + } + + /** {@inheritDoc} */ + @Override public Counters.Counter getCounter(Enum<?> name) { + return getCounter(name.getDeclaringClass().getName(), name.name()); + } + + /** {@inheritDoc} */ + @Override public Counters.Counter getCounter(String grp, String name) { + return new HadoopV1Counter(ctx.counter(grp, name, HadoopLongCounter.class)); + } + + /** {@inheritDoc} */ + @Override public void incrCounter(Enum<?> key, long amount) { + getCounter(key).increment(amount); + } + + /** {@inheritDoc} */ + @Override public void incrCounter(String grp, String cntr, long amount) { + getCounter(grp, cntr).increment(amount); + } + + /** {@inheritDoc} */ + @Override public InputSplit getInputSplit() throws UnsupportedOperationException { + throw new UnsupportedOperationException("reporter has no input"); // TODO + } + + /** {@inheritDoc} */ + @Override public float getProgress() { + return 0.5f; // TODO + } + + /** {@inheritDoc} */ + @Override public void progress() { + // TODO + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1SetupTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1SetupTask.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1SetupTask.java new file mode 100644 index 0000000..5d6c052 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1SetupTask.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.v1; + +import java.io.IOException; +import org.apache.hadoop.mapred.OutputCommitter; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; +import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopV2TaskContext; + +/** + * Hadoop setup task implementation for v1 API. + */ +public class HadoopV1SetupTask extends HadoopV1Task { + /** + * Constructor. + * + * @param taskInfo Task info. 
+ */ + public HadoopV1SetupTask(HadoopTaskInfo taskInfo) { + super(taskInfo); + } + + /** {@inheritDoc} */ + @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException { + HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx; + + try { + ctx.jobConf().getOutputFormat().checkOutputSpecs(null, ctx.jobConf()); + + OutputCommitter committer = ctx.jobConf().getOutputCommitter(); + + if (committer != null) + committer.setupJob(ctx.jobContext()); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Splitter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Splitter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Splitter.java new file mode 100644 index 0000000..7b9080e --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Splitter.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.v1; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock; +import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +/** + * Hadoop API v1 splitter. + */ +public class HadoopV1Splitter { + /** */ + private static final String[] EMPTY_HOSTS = {}; + + /** + * @param jobConf Job configuration. + * @return Collection of mapped splits. + * @throws IgniteCheckedException If mapping failed. + */ + public static Collection<HadoopInputSplit> splitJob(JobConf jobConf) throws IgniteCheckedException { + try { + InputFormat<?, ?> format = jobConf.getInputFormat(); + + assert format != null; + + InputSplit[] splits = format.getSplits(jobConf, 0); + + Collection<HadoopInputSplit> res = new ArrayList<>(splits.length); + + for (int i = 0; i < splits.length; i++) { + InputSplit nativeSplit = splits[i]; + + if (nativeSplit instanceof FileSplit) { + FileSplit s = (FileSplit)nativeSplit; + + res.add(new HadoopFileBlock(s.getLocations(), s.getPath().toUri(), s.getStart(), s.getLength())); + } + else + res.add(HadoopUtils.wrapSplit(i, nativeSplit, nativeSplit.getLocations())); + } + + return res; + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } + + /** + * @param clsName Input split class name. + * @param in Input stream. 
+ * @param hosts Optional hosts. + * @return File block or {@code null} if it is not a {@link FileSplit} instance. + * @throws IgniteCheckedException If failed. + */ + @Nullable public static HadoopFileBlock readFileBlock(String clsName, FSDataInputStream in, + @Nullable String[] hosts) throws IgniteCheckedException { + if (!FileSplit.class.getName().equals(clsName)) + return null; + + FileSplit split = U.newInstance(FileSplit.class); + + try { + split.readFields(in); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + + if (hosts == null) + hosts = EMPTY_HOSTS; + + return new HadoopFileBlock(hosts, split.getPath().toUri(), split.getStart(), split.getLength()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Task.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Task.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Task.java new file mode 100644 index 0000000..648b4f9 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Task.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.v1; + +import java.io.IOException; +import java.text.NumberFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.ignite.internal.processors.hadoop.HadoopTask; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; +import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopV2TaskContext; +import org.jetbrains.annotations.Nullable; + +/** + * Extended Hadoop v1 task. + */ +public abstract class HadoopV1Task extends HadoopTask { + /** Indicates that this task is to be cancelled. */ + private volatile boolean cancelled; + + /** + * Constructor. + * + * @param taskInfo Task info. + */ + protected HadoopV1Task(HadoopTaskInfo taskInfo) { + super(taskInfo); + } + + /** + * Gets file name for that task result. + * + * @return File name. + */ + public String fileName() { + NumberFormat numFormat = NumberFormat.getInstance(); + + numFormat.setMinimumIntegerDigits(5); + numFormat.setGroupingUsed(false); + + return "part-" + numFormat.format(info().taskNumber()); + } + + /** + * + * @param jobConf Job configuration. + * @param taskCtx Task context. + * @param directWrite Direct write flag. + * @param fileName File name. + * @param attempt Attempt of task. + * @return Collector. + * @throws IOException In case of IO exception. 
+ */ + protected HadoopV1OutputCollector collector(JobConf jobConf, HadoopV2TaskContext taskCtx, + boolean directWrite, @Nullable String fileName, TaskAttemptID attempt) throws IOException { + HadoopV1OutputCollector collector = new HadoopV1OutputCollector(jobConf, taskCtx, directWrite, + fileName, attempt) { + /** {@inheritDoc} */ + @Override public void collect(Object key, Object val) throws IOException { + if (cancelled) + throw new HadoopTaskCancelledException("Task cancelled."); + + super.collect(key, val); + } + }; + + collector.setup(); + + return collector; + } + + /** {@inheritDoc} */ + @Override public void cancel() { + cancelled = true; + } + + /** Returns true if task is cancelled. */ + public boolean isCancelled() { + return cancelled; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopDaemon.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopDaemon.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopDaemon.java new file mode 100644 index 0000000..ea7128c --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopDaemon.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/**
 * Replacement for Hadoop {@code org.apache.hadoop.util.Daemon} class.
 *
 * Keeps a static registry of HDFS {@code PeerCache} threads so they can all be
 * interrupted when the hosting task ends.
 */
@SuppressWarnings("UnusedDeclaration")
public class HadoopDaemon extends Thread {
    /** Guards access to {@link #daemons}. */
    private static final Object lock = new Object();

    /** Registry of threads to stop; becomes {@code null} once {@link #dequeueAndStopAll()} ran. */
    private static Collection<HadoopDaemon> daemons = new LinkedList<>();

    {
        setDaemon(true); // always a daemon
    }

    /** Runnable of this thread, may be this. */
    final Runnable runnable;

    /**
     * Construct a daemon thread that runs its own {@link #run()} method.
     */
    public HadoopDaemon() {
        super();

        runnable = this;

        enqueueIfNeeded();
    }

    /**
     * Construct a daemon thread wrapping the given runnable.
     *
     * @param runnable Runnable to execute; its string form becomes the thread name.
     */
    public HadoopDaemon(Runnable runnable) {
        super(runnable);

        this.runnable = runnable;

        setName(runnable.toString());

        enqueueIfNeeded();
    }

    /**
     * Construct a daemon thread belonging to the specified thread group.
     *
     * @param grp Thread group.
     * @param runnable Runnable to execute; its string form becomes the thread name.
     */
    public HadoopDaemon(ThreadGroup grp, Runnable runnable) {
        super(grp, runnable);

        this.runnable = runnable;

        setName(runnable.toString());

        enqueueIfNeeded();
    }

    /**
     * Getter for the runnable. May return this.
     *
     * @return the runnable
     */
    public Runnable getRunnable() {
        return runnable;
    }

    /**
     * Checks whether the runnable is a Hadoop {@code org.apache.hadoop.hdfs.PeerCache} runnable.
     *
     * @param r the runnable.
     * @return true if it is.
     */
    private static boolean isPeerCacheRunnable(Runnable r) {
        return r.getClass().getName().startsWith("org.apache.hadoop.hdfs.PeerCache");
    }

    /**
     * Enqueue this thread if it should be stopped upon the task end.
     */
    private void enqueueIfNeeded() {
        synchronized (lock) {
            // Once the registry is cleared no further daemons may be created in this class loader.
            if (daemons == null)
                throw new RuntimeException("Failed to create HadoopDaemon (its registry is already cleared): " +
                    "[classLoader=" + getClass().getClassLoader() + ']');

            // Track only PeerCache runnables loaded by our own class loader.
            if (runnable.getClass().getClassLoader() == getClass().getClassLoader() && isPeerCacheRunnable(runnable))
                daemons.add(this);
        }
    }

    /**
     * Stops all the registered threads and clears the registry.
     */
    public static void dequeueAndStopAll() {
        synchronized (lock) {
            if (daemons == null)
                return;

            for (HadoopDaemon daemon : daemons)
                daemon.interrupt();

            daemons = null;
        }
    }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.v2; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; + +/** + * Split serialized in external file. + */ +public class HadoopExternalSplit extends HadoopInputSplit { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private long off; + + /** + * For {@link Externalizable}. + */ + public HadoopExternalSplit() { + // No-op. + } + + /** + * @param hosts Hosts. + * @param off Offset of this split in external file. + */ + public HadoopExternalSplit(String[] hosts, long off) { + assert off >= 0 : off; + assert hosts != null; + + this.hosts = hosts; + this.off = off; + } + + /** + * @return Offset of this input split in external file. 
+ */ + public long offset() { + return off; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeLong(off); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + off = in.readLong(); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + HadoopExternalSplit that = (HadoopExternalSplit) o; + + return off == that.off; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return (int)(off ^ (off >>> 32)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopSerializationWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopSerializationWrapper.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopSerializationWrapper.java new file mode 100644 index 0000000..8f94831 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopSerializationWrapper.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.v2; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.hadoop.io.serializer.Deserializer; +import org.apache.hadoop.io.serializer.Serialization; +import org.apache.hadoop.io.serializer.Serializer; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.hadoop.HadoopSerialization; +import org.jetbrains.annotations.Nullable; + +/** + * The wrapper around external serializer. + */ +public class HadoopSerializationWrapper<T> implements HadoopSerialization { + /** External serializer - writer. */ + private final Serializer<T> serializer; + + /** External serializer - reader. */ + private final Deserializer<T> deserializer; + + /** Data output for current write operation. */ + private OutputStream currOut; + + /** Data input for current read operation. */ + private InputStream currIn; + + /** Wrapper around current output to provide OutputStream interface. */ + private final OutputStream outStream = new OutputStream() { + /** {@inheritDoc} */ + @Override public void write(int b) throws IOException { + currOut.write(b); + } + + /** {@inheritDoc} */ + @Override public void write(byte[] b, int off, int len) throws IOException { + currOut.write(b, off, len); + } + }; + + /** Wrapper around current input to provide InputStream interface. 
*/ + private final InputStream inStream = new InputStream() { + /** {@inheritDoc} */ + @Override public int read() throws IOException { + return currIn.read(); + } + + /** {@inheritDoc} */ + @Override public int read(byte[] b, int off, int len) throws IOException { + return currIn.read(b, off, len); + } + }; + + /** + * @param serialization External serializer to wrap. + * @param cls The class to serialize. + */ + public HadoopSerializationWrapper(Serialization<T> serialization, Class<T> cls) throws IgniteCheckedException { + assert cls != null; + + serializer = serialization.getSerializer(cls); + deserializer = serialization.getDeserializer(cls); + + try { + serializer.open(outStream); + deserializer.open(inStream); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } + + /** {@inheritDoc} */ + @Override public void write(DataOutput out, Object obj) throws IgniteCheckedException { + assert out != null; + assert obj != null; + + try { + currOut = (OutputStream)out; + + serializer.serialize((T)obj); + + currOut = null; + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } + + /** {@inheritDoc} */ + @Override public Object read(DataInput in, @Nullable Object obj) throws IgniteCheckedException { + assert in != null; + + try { + currIn = (InputStream)in; + + T res = deserializer.deserialize((T) obj); + + currIn = null; + + return res; + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } + + /** {@inheritDoc} */ + @Override public void close() throws IgniteCheckedException { + try { + serializer.close(); + deserializer.close(); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } +} \ No newline at end of file