http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceCounters.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceCounters.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceCounters.java new file mode 100644 index 0000000..924312e --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceCounters.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.impl; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.CounterGroup; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.FileSystemCounter; +import org.apache.hadoop.mapreduce.counters.AbstractCounters; +import org.apache.hadoop.mapreduce.counters.Limits; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounter; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopLongCounter; +import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopV2Counter; +import org.apache.ignite.internal.util.typedef.T2; + +/** + * Hadoop counters adapter. + */ +public class HadoopMapReduceCounters extends Counters { + /** */ + private final Map<T2<String,String>,HadoopLongCounter> cntrs = new HashMap<>(); + + /** + * Creates new instance based on given counters. + * + * @param cntrs Counters to adapt. 
+ */ + public HadoopMapReduceCounters(org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters cntrs) { + for (HadoopCounter cntr : cntrs.all()) + if (cntr instanceof HadoopLongCounter) + this.cntrs.put(new T2<>(cntr.group(), cntr.name()), (HadoopLongCounter) cntr); + } + + /** {@inheritDoc} */ + @Override public synchronized CounterGroup addGroup(CounterGroup grp) { + return addGroup(grp.getName(), grp.getDisplayName()); + } + + /** {@inheritDoc} */ + @Override public CounterGroup addGroup(String name, String displayName) { + return new HadoopMapReduceCounterGroup(this, name); + } + + /** {@inheritDoc} */ + @Override public Counter findCounter(String grpName, String cntrName) { + return findCounter(grpName, cntrName, true); + } + + /** {@inheritDoc} */ + @Override public synchronized Counter findCounter(Enum<?> key) { + return findCounter(key.getDeclaringClass().getName(), key.name(), true); + } + + /** {@inheritDoc} */ + @Override public synchronized Counter findCounter(String scheme, FileSystemCounter key) { + return findCounter(String.format("FileSystem Counter (%s)", scheme), key.name()); + } + + /** {@inheritDoc} */ + @Override public synchronized Iterable<String> getGroupNames() { + Collection<String> res = new HashSet<>(); + + for (HadoopCounter counter : cntrs.values()) + res.add(counter.group()); + + return res; + } + + /** {@inheritDoc} */ + @Override public Iterator<CounterGroup> iterator() { + final Iterator<String> iter = getGroupNames().iterator(); + + return new Iterator<CounterGroup>() { + @Override public boolean hasNext() { + return iter.hasNext(); + } + + @Override public CounterGroup next() { + if (!hasNext()) + throw new NoSuchElementException(); + + return new HadoopMapReduceCounterGroup(HadoopMapReduceCounters.this, iter.next()); + } + + @Override public void remove() { + throw new UnsupportedOperationException("not implemented"); + } + }; + } + + /** {@inheritDoc} */ + @Override public synchronized CounterGroup getGroup(String 
grpName) { + return new HadoopMapReduceCounterGroup(this, grpName); + } + + /** {@inheritDoc} */ + @Override public synchronized int countCounters() { + return cntrs.size(); + } + + /** {@inheritDoc} */ + @Override public synchronized void write(DataOutput out) throws IOException { + throw new UnsupportedOperationException("not implemented"); + } + + /** {@inheritDoc} */ + @Override public synchronized void readFields(DataInput in) throws IOException { + throw new UnsupportedOperationException("not implemented"); + } + + /** {@inheritDoc} */ + @Override public synchronized void incrAllCounters(AbstractCounters<Counter, CounterGroup> other) { + for (CounterGroup group : other) { + for (Counter counter : group) { + findCounter(group.getName(), counter.getName()).increment(counter.getValue()); + } + } + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object genericRight) { + if (!(genericRight instanceof HadoopMapReduceCounters)) + return false; + + return cntrs.equals(((HadoopMapReduceCounters) genericRight).cntrs); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return cntrs.hashCode(); + } + + /** {@inheritDoc} */ + @Override public void setWriteAllCounters(boolean snd) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean getWriteAllCounters() { + return true; + } + + /** {@inheritDoc} */ + @Override public Limits limits() { + return null; + } + + /** + * Returns size of a group. + * + * @param grpName Name of the group. + * @return amount of counters in the given group. + */ + public int groupSize(String grpName) { + int res = 0; + + for (HadoopCounter counter : cntrs.values()) { + if (grpName.equals(counter.group())) + res++; + } + + return res; + } + + /** + * Returns counters iterator for specified group. + * + * @param grpName Name of the group to iterate. + * @return Counters iterator. 
+ */ + public Iterator<Counter> iterateGroup(String grpName) { + Collection<Counter> grpCounters = new ArrayList<>(); + + for (HadoopLongCounter counter : cntrs.values()) { + if (grpName.equals(counter.group())) + grpCounters.add(new HadoopV2Counter(counter)); + } + + return grpCounters.iterator(); + } + + /** + * Find a counter in the group. + * + * @param grpName The name of the counter group. + * @param cntrName The name of the counter. + * @param create Create the counter if not found if true. + * @return The counter that was found or added or {@code null} if create is false. + */ + public Counter findCounter(String grpName, String cntrName, boolean create) { + T2<String, String> key = new T2<>(grpName, cntrName); + + HadoopLongCounter internalCntr = cntrs.get(key); + + if (internalCntr == null & create) { + internalCntr = new HadoopLongCounter(grpName,cntrName); + + cntrs.put(key, new HadoopLongCounter(grpName,cntrName)); + } + + return internalCntr == null ? null : new HadoopV2Counter(internalCntr); + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java new file mode 100644 index 0000000..bb7c66a --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.impl; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutput; +import java.io.ObjectOutputStream; +import java.io.PrintStream; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.JobPriority; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils; +import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo; +import org.apache.ignite.internal.processors.hadoop.HadoopJobId; +import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; +import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopSplitWrapper; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * Hadoop utility methods. + */ +public class HadoopUtils { + /** Staging constant. */ + private static final String STAGING_CONSTANT = ".staging"; + + /** Old mapper class attribute. */ + private static final String OLD_MAP_CLASS_ATTR = "mapred.mapper.class"; + + /** Old reducer class attribute. */ + private static final String OLD_REDUCE_CLASS_ATTR = "mapred.reducer.class"; + + /** + * Constructor. + */ + private HadoopUtils() { + // No-op. + } + + /** + * Wraps native split. + * + * @param id Split ID. + * @param split Split. + * @param hosts Hosts. + * @throws IOException If failed. 
+ */ + public static HadoopSplitWrapper wrapSplit(int id, Object split, String[] hosts) throws IOException { + ByteArrayOutputStream arr = new ByteArrayOutputStream(); + ObjectOutput out = new ObjectOutputStream(arr); + + assert split instanceof Writable; + + ((Writable)split).write(out); + + out.flush(); + + return new HadoopSplitWrapper(id, split.getClass().getName(), arr.toByteArray(), hosts); + } + + /** + * Unwraps native split. + * + * @param o Wrapper. + * @return Split. + */ + public static Object unwrapSplit(HadoopSplitWrapper o) { + try { + Writable w = (Writable)HadoopUtils.class.getClassLoader().loadClass(o.className()).newInstance(); + + w.readFields(new ObjectInputStream(new ByteArrayInputStream(o.bytes()))); + + return w; + } + catch (Exception e) { + throw new IllegalStateException(e); + } + } + + /** + * Convert Ignite job status to Hadoop job status. + * + * @param status Ignite job status. + * @return Hadoop job status. + */ + public static JobStatus status(HadoopJobStatus status, Configuration conf) { + JobID jobId = new JobID(status.jobId().globalId().toString(), status.jobId().localId()); + + float setupProgress = 0; + float mapProgress = 0; + float reduceProgress = 0; + float cleanupProgress = 0; + + JobStatus.State state = JobStatus.State.RUNNING; + + switch (status.jobPhase()) { + case PHASE_SETUP: + setupProgress = 0.42f; + + break; + + case PHASE_MAP: + setupProgress = 1; + mapProgress = 1f - status.pendingMapperCnt() / (float)status.totalMapperCnt(); + + break; + + case PHASE_REDUCE: + setupProgress = 1; + mapProgress = 1; + + if (status.totalReducerCnt() > 0) + reduceProgress = 1f - status.pendingReducerCnt() / (float)status.totalReducerCnt(); + else + reduceProgress = 1f; + + break; + + case PHASE_CANCELLING: + case PHASE_COMPLETE: + if (!status.isFailed()) { + setupProgress = 1; + mapProgress = 1; + reduceProgress = 1; + cleanupProgress = 1; + + state = JobStatus.State.SUCCEEDED; + } + else + state = JobStatus.State.FAILED; + + break; 
+ + default: + assert false; + } + + return new JobStatus(jobId, setupProgress, mapProgress, reduceProgress, cleanupProgress, state, + JobPriority.NORMAL, status.user(), status.jobName(), jobFile(conf, status.user(), jobId).toString(), "N/A"); + } + + /** + * Gets staging area directory. + * + * @param conf Configuration. + * @param usr User. + * @return Staging area directory. + */ + public static Path stagingAreaDir(Configuration conf, String usr) { + return new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR, MRJobConfig.DEFAULT_MR_AM_STAGING_DIR) + + Path.SEPARATOR + usr + Path.SEPARATOR + STAGING_CONSTANT); + } + + /** + * Gets job file. + * + * @param conf Configuration. + * @param usr User. + * @param jobId Job ID. + * @return Job file. + */ + public static Path jobFile(Configuration conf, String usr, JobID jobId) { + return new Path(stagingAreaDir(conf, usr), jobId.toString() + Path.SEPARATOR + MRJobConfig.JOB_CONF_FILE); + } + + /** + * Checks the attribute in configuration is not set. + * + * @param attr Attribute name. + * @param msg Message for creation of exception. + * @throws IgniteCheckedException If attribute is set. + */ + public static void ensureNotSet(Configuration cfg, String attr, String msg) throws IgniteCheckedException { + if (cfg.get(attr) != null) + throw new IgniteCheckedException(attr + " is incompatible with " + msg + " mode."); + } + + /** + * Creates JobInfo from hadoop configuration. + * + * @param cfg Hadoop configuration. + * @return Job info. + * @throws IgniteCheckedException If failed. 
+ */ + public static HadoopDefaultJobInfo createJobInfo(Configuration cfg) throws IgniteCheckedException { + JobConf jobConf = new JobConf(cfg); + + boolean hasCombiner = jobConf.get("mapred.combiner.class") != null + || jobConf.get(MRJobConfig.COMBINE_CLASS_ATTR) != null; + + int numReduces = jobConf.getNumReduceTasks(); + + jobConf.setBooleanIfUnset("mapred.mapper.new-api", jobConf.get(OLD_MAP_CLASS_ATTR) == null); + + if (jobConf.getUseNewMapper()) { + String mode = "new map API"; + + ensureNotSet(jobConf, "mapred.input.format.class", mode); + ensureNotSet(jobConf, OLD_MAP_CLASS_ATTR, mode); + + if (numReduces != 0) + ensureNotSet(jobConf, "mapred.partitioner.class", mode); + else + ensureNotSet(jobConf, "mapred.output.format.class", mode); + } + else { + String mode = "map compatibility"; + + ensureNotSet(jobConf, MRJobConfig.INPUT_FORMAT_CLASS_ATTR, mode); + ensureNotSet(jobConf, MRJobConfig.MAP_CLASS_ATTR, mode); + + if (numReduces != 0) + ensureNotSet(jobConf, MRJobConfig.PARTITIONER_CLASS_ATTR, mode); + else + ensureNotSet(jobConf, MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, mode); + } + + if (numReduces != 0) { + jobConf.setBooleanIfUnset("mapred.reducer.new-api", jobConf.get(OLD_REDUCE_CLASS_ATTR) == null); + + if (jobConf.getUseNewReducer()) { + String mode = "new reduce API"; + + ensureNotSet(jobConf, "mapred.output.format.class", mode); + ensureNotSet(jobConf, OLD_REDUCE_CLASS_ATTR, mode); + } + else { + String mode = "reduce compatibility"; + + ensureNotSet(jobConf, MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, mode); + ensureNotSet(jobConf, MRJobConfig.REDUCE_CLASS_ATTR, mode); + } + } + + Map<String, String> props = new HashMap<>(); + + for (Map.Entry<String, String> entry : jobConf) + props.put(entry.getKey(), entry.getValue()); + + return new HadoopDefaultJobInfo(jobConf.getJobName(), jobConf.getUser(), hasCombiner, numReduces, props); + } + + /** + * Throws new {@link IgniteCheckedException} with original exception is serialized into string. 
+ * This is needed to transfer error outside the current class loader. + * + * @param e Original exception. + * @return IgniteCheckedException New exception. + */ + public static IgniteCheckedException transformException(Throwable e) { + ByteArrayOutputStream os = new ByteArrayOutputStream(); + + e.printStackTrace(new PrintStream(os, true)); + + return new IgniteCheckedException(os.toString()); + } + + /** + * Returns work directory for job execution. + * + * @param locNodeId Local node ID. + * @param jobId Job ID. + * @return Working directory for job. + * @throws IgniteCheckedException If Failed. + */ + public static File jobLocalDir(UUID locNodeId, HadoopJobId jobId) throws IgniteCheckedException { + return new File(new File(U.resolveWorkDirectory("hadoop", false), "node-" + locNodeId), "job_" + jobId); + } + + /** + * Returns subdirectory of job working directory for task execution. + * + * @param locNodeId Local node ID. + * @param info Task info. + * @return Working directory for task. + * @throws IgniteCheckedException If Failed. + */ + public static File taskLocalDir(UUID locNodeId, HadoopTaskInfo info) throws IgniteCheckedException { + File jobLocDir = jobLocalDir(locNodeId, info.jobId()); + + return new File(jobLocDir, info.type() + "_" + info.taskNumber() + "_" + info.attempt()); + } + + /** + * Creates {@link Configuration} in a correct class loader context to avoid caching + * of inappropriate class loader in the Configuration object. + * @return New instance of {@link Configuration}. 
+ */ + public static Configuration safeCreateConfiguration() { + final ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(Configuration.class.getClassLoader()); + + try { + return new Configuration(); + } + finally { + HadoopCommonUtils.restoreContextClassLoader(oldLdr); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopBasicFileSystemFactoryDelegate.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopBasicFileSystemFactoryDelegate.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopBasicFileSystemFactoryDelegate.java new file mode 100644 index 0000000..a190b14 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopBasicFileSystemFactoryDelegate.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.impl.delegate; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.ignite.IgniteException; +import org.apache.ignite.hadoop.fs.BasicHadoopFileSystemFactory; +import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory; +import org.apache.ignite.hadoop.util.UserNameMapper; +import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils; +import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemFactoryDelegate; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils; +import org.apache.ignite.internal.processors.igfs.IgfsUtils; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lifecycle.LifecycleAware; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.Arrays; + +/** + * Basic Hadoop file system factory delegate. + */ +public class HadoopBasicFileSystemFactoryDelegate implements HadoopFileSystemFactoryDelegate { + /** Proxy. */ + protected final HadoopFileSystemFactory proxy; + + /** Configuration of the secondary filesystem, never null. */ + protected Configuration cfg; + + /** Resulting URI. */ + protected URI fullUri; + + /** User name mapper. */ + private UserNameMapper usrNameMapper; + + /** + * Constructor. + * + * @param proxy Proxy. + */ + public HadoopBasicFileSystemFactoryDelegate(BasicHadoopFileSystemFactory proxy) { + this.proxy = proxy; + } + + /** {@inheritDoc} */ + @Override public FileSystem get(String name) throws IOException { + String name0 = IgfsUtils.fixUserName(name); + + if (usrNameMapper != null) + name0 = IgfsUtils.fixUserName(usrNameMapper.map(name0)); + + return getWithMappedName(name0); + } + + /** + * Internal file system create routine. + * + * @param usrName User name. + * @return File system. + * @throws IOException If failed. 
+ */ + protected FileSystem getWithMappedName(String usrName) throws IOException { + assert cfg != null; + + try { + // FileSystem.get() might delegate to ServiceLoader to get the list of file system implementation. + // And ServiceLoader is known to be sensitive to context classloader. Therefore, we change context + // classloader to classloader of current class to avoid strange class-cast-exceptions. + ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(getClass().getClassLoader()); + + try { + return create(usrName); + } + finally { + HadoopCommonUtils.restoreContextClassLoader(oldLdr); + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IOException("Failed to create file system due to interrupt.", e); + } + } + + /** + * Internal file system creation routine, invoked in correct class loader context. + * + * @param usrName User name. + * @return File system. + * @throws IOException If failed. + * @throws InterruptedException if the current thread is interrupted. 
+ */ + protected FileSystem create(String usrName) throws IOException, InterruptedException { + return FileSystem.get(fullUri, cfg, usrName); + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteException { + BasicHadoopFileSystemFactory proxy0 = (BasicHadoopFileSystemFactory)proxy; + + cfg = HadoopUtils.safeCreateConfiguration(); + + if (proxy0.getConfigPaths() != null) { + for (String cfgPath : proxy0.getConfigPaths()) { + if (cfgPath == null) + throw new NullPointerException("Configuration path cannot be null: " + + Arrays.toString(proxy0.getConfigPaths())); + else { + URL url = U.resolveIgniteUrl(cfgPath); + + if (url == null) { + // If secConfPath is given, it should be resolvable: + throw new IgniteException("Failed to resolve secondary file system configuration path " + + "(ensure that it exists locally and you have read access to it): " + cfgPath); + } + + cfg.addResource(url); + } + } + } + + // If secondary fs URI is not given explicitly, try to get it from the configuration: + if (proxy0.getUri() == null) + fullUri = FileSystem.getDefaultUri(cfg); + else { + try { + fullUri = new URI(proxy0.getUri()); + } + catch (URISyntaxException use) { + throw new IgniteException("Failed to resolve secondary file system URI: " + proxy0.getUri()); + } + } + + usrNameMapper = proxy0.getUserNameMapper(); + + if (usrNameMapper != null && usrNameMapper instanceof LifecycleAware) + ((LifecycleAware)usrNameMapper).start(); + } + + /** {@inheritDoc} */ + @Override public void stop() throws IgniteException { + if (usrNameMapper != null && usrNameMapper instanceof LifecycleAware) + ((LifecycleAware)usrNameMapper).stop(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopCachingFileSystemFactoryDelegate.java ---------------------------------------------------------------------- diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopCachingFileSystemFactoryDelegate.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopCachingFileSystemFactoryDelegate.java new file mode 100644 index 0000000..0cec8ca --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopCachingFileSystemFactoryDelegate.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.delegate; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory; +import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSystemsUtils; +import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopLazyConcurrentMap; + +import java.io.IOException; + +/** + * Caching Hadoop file system factory delegate. + */ +public class HadoopCachingFileSystemFactoryDelegate extends HadoopBasicFileSystemFactoryDelegate { + /** Per-user file system cache. 
*/ + private final HadoopLazyConcurrentMap<String, FileSystem> cache = new HadoopLazyConcurrentMap<>( + new HadoopLazyConcurrentMap.ValueFactory<String, FileSystem>() { + @Override public FileSystem createValue(String key) throws IOException { + return HadoopCachingFileSystemFactoryDelegate.super.getWithMappedName(key); + } + } + ); + + /** + * Constructor. + * + * @param proxy Proxy. + */ + public HadoopCachingFileSystemFactoryDelegate(CachingHadoopFileSystemFactory proxy) { + super(proxy); + } + + /** {@inheritDoc} */ + @Override public FileSystem getWithMappedName(String name) throws IOException { + return cache.getOrCreate(name); + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteException { + super.start(); + + // Disable caching. + cfg.setBoolean(HadoopFileSystemsUtils.disableFsCachePropertyName(fullUri.getScheme()), true); + } + + /** {@inheritDoc} */ + @Override public void stop() throws IgniteException { + super.stop(); + + try { + cache.close(); + } + catch (IgniteCheckedException ice) { + throw new IgniteException(ice); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopDefaultFileSystemFactoryDelegate.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopDefaultFileSystemFactoryDelegate.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopDefaultFileSystemFactoryDelegate.java new file mode 100644 index 0000000..20ac88e --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopDefaultFileSystemFactoryDelegate.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.delegate; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.ignite.IgniteException; +import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory; +import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemFactoryDelegate; +import org.apache.ignite.lifecycle.LifecycleAware; + +import java.io.IOException; + +/** + * Hadoop file system factory delegate for non-standard factories. + */ +public class HadoopDefaultFileSystemFactoryDelegate implements HadoopFileSystemFactoryDelegate { + /** Factory. */ + private final HadoopFileSystemFactory factory; + + /** + * Constructor. + * + * @param factory Factory. 
+ */ + public HadoopDefaultFileSystemFactoryDelegate(HadoopFileSystemFactory factory) { + assert factory != null; + + this.factory = factory; + } + + /** {@inheritDoc} */ + @Override public FileSystem get(String usrName) throws IOException { + return (FileSystem)factory.get(usrName); + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteException { + if (factory instanceof LifecycleAware) + ((LifecycleAware)factory).start(); + } + + /** {@inheritDoc} */ + @Override public void stop() throws IgniteException { + if (factory instanceof LifecycleAware) + ((LifecycleAware)factory).stop(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopFileSystemCounterWriterDelegateImpl.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopFileSystemCounterWriterDelegateImpl.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopFileSystemCounterWriterDelegateImpl.java new file mode 100644 index 0000000..9cc2be4 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopFileSystemCounterWriterDelegateImpl.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.delegate; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.hadoop.fs.IgniteHadoopFileSystemCounterWriter; +import org.apache.ignite.internal.processors.hadoop.HadoopJob; +import org.apache.ignite.internal.processors.hadoop.HadoopJobId; +import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; +import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemCounterWriterDelegate; +import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter; +import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopV2Job; +import org.apache.ignite.internal.processors.igfs.IgfsUtils; +import org.apache.ignite.internal.util.typedef.T2; + +import java.io.IOException; +import java.io.PrintStream; +import java.util.Map; + +/** + * Counter writer delegate implementation. 
+ */
+@SuppressWarnings("unused")
+public class HadoopFileSystemCounterWriterDelegateImpl implements HadoopFileSystemCounterWriterDelegate {
+    /** Macro in the target directory name that is replaced with the job user name. */
+    private static final String USER_MACRO = "${USER}";
+
+    /** Target directory used when the job does not configure one. */
+    private static final String DEFAULT_COUNTER_WRITER_DIR = "/user/" + USER_MACRO;
+
+    /**
+     * Constructor.
+     *
+     * @param proxy Proxy (not used).
+     */
+    public HadoopFileSystemCounterWriterDelegateImpl(IgniteHadoopFileSystemCounterWriter proxy) {
+        // No-op.
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The events of the job's performance counter are written, one per line, as the event's string
+     * and long components separated by {@code ':'}. The target file is
+     * {@link IgniteHadoopFileSystemCounterWriter#PERFORMANCE_COUNTER_FILE_NAME} inside a directory named
+     * after the job ID. That directory is created under the directory given by the
+     * {@link IgniteHadoopFileSystemCounterWriter#COUNTER_WRITER_DIR_PROPERTY} job property
+     * ({@code /user/${USER}} when the property is absent), with {@code ${USER}} replaced by the job user.
+     *
+     * @param job Job whose counters are written.
+     * @param cntrs Counters to take the performance counter from.
+     * @throws IgniteCheckedException If the file system reports an I/O error or the counter file
+     *      could not be written completely.
+     */
+    public void write(HadoopJob job, HadoopCounters cntrs) throws IgniteCheckedException {
+        Configuration hadoopCfg = HadoopUtils.safeCreateConfiguration();
+
+        final HadoopJobInfo jobInfo = job.info();
+
+        final HadoopJobId jobId = job.id();
+
+        // Propagate all job properties into the Hadoop configuration used to obtain the file system.
+        for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)jobInfo).properties().entrySet())
+            hadoopCfg.set(e.getKey(), e.getValue());
+
+        String user = jobInfo.user();
+
+        user = IgfsUtils.fixUserName(user);
+
+        String dir = jobInfo.property(IgniteHadoopFileSystemCounterWriter.COUNTER_WRITER_DIR_PROPERTY);
+
+        if (dir == null)
+            dir = DEFAULT_COUNTER_WRITER_DIR;
+
+        Path jobStatPath = new Path(new Path(dir.replace(USER_MACRO, user)), jobId.toString());
+
+        HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(cntrs, null);
+
+        try {
+            hadoopCfg.set(MRJobConfig.USER_NAME, user);
+
+            FileSystem fs = ((HadoopV2Job)job).fileSystem(jobStatPath.toUri(), hadoopCfg);
+
+            fs.mkdirs(jobStatPath);
+
+            try (PrintStream out = new PrintStream(fs.create(
+                new Path(jobStatPath, IgniteHadoopFileSystemCounterWriter.PERFORMANCE_COUNTER_FILE_NAME)))) {
+                for (T2<String, Long> evt : perfCntr.evts()) {
+                    out.print(evt.get1());
+                    out.print(':');
+                    out.println(evt.get2().toString());
+                }
+
+                // PrintStream never throws on a failed write, it only records the failure.
+                // checkError() flushes the stream and reports it, so that a truncated counter
+                // file is surfaced through the same IOException path as every other failure.
+                if (out.checkError())
+                    throw new IOException("Failed to write performance counters [path=" + jobStatPath + ']');
+            }
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java new file mode 100644 index 0000000..fcad674 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java @@ -0,0 +1,472 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.impl.delegate; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.ParentNotDirectoryException; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathExistsException; +import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.ignite.IgniteException; +import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory; +import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory; +import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem; +import org.apache.ignite.igfs.IgfsDirectoryNotEmptyException; +import org.apache.ignite.igfs.IgfsException; +import org.apache.ignite.igfs.IgfsFile; +import org.apache.ignite.igfs.IgfsParentNotDirectoryException; +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.igfs.IgfsPathAlreadyExistsException; +import org.apache.ignite.igfs.IgfsPathNotFoundException; +import org.apache.ignite.igfs.IgfsUserContext; +import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable; +import org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils; +import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemFactoryDelegate; +import org.apache.ignite.internal.processors.hadoop.delegate.HadoopIgfsSecondaryFileSystemDelegate; +import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsProperties; +import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsSecondaryFileSystemPositionedReadable; +import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo; +import org.apache.ignite.internal.processors.igfs.IgfsFileImpl; +import org.apache.ignite.internal.processors.igfs.IgfsUtils; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteUuid; 
+import org.jetbrains.annotations.Nullable;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Secondary file system implementation.
+ * <p>
+ * Every operation is executed against the Hadoop {@link FileSystem} obtained from the factory delegate
+ * for the current IGFS user context (or the default user when no context is set). Hadoop
+ * {@link IOException}s are converted to IGFS exceptions by {@link #cast(String, IOException)}.
+ */
+@SuppressWarnings("unused")
+public class HadoopIgfsSecondaryFileSystemDelegateImpl implements HadoopIgfsSecondaryFileSystemDelegate {
+    /** The default user name. It is used if no user context is set. */
+    private final String dfltUsrName;
+
+    /** Factory. */
+    private final HadoopFileSystemFactoryDelegate factory;
+
+    /**
+     * Constructor.
+     *
+     * @param proxy Proxy providing the default user name and the file system factory.
+     */
+    public HadoopIgfsSecondaryFileSystemDelegateImpl(IgniteHadoopIgfsSecondaryFileSystem proxy) {
+        assert proxy.getFileSystemFactory() != null;
+
+        dfltUsrName = IgfsUtils.fixUserName(proxy.getDefaultUserName());
+
+        HadoopFileSystemFactory factory0 = proxy.getFileSystemFactory();
+
+        // Fallback for runs with assertions disabled.
+        if (factory0 == null)
+            factory0 = new CachingHadoopFileSystemFactory();
+
+        factory = HadoopDelegateUtils.fileSystemFactoryDelegate(factory0);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean exists(IgfsPath path) {
+        try {
+            return fileSystemForUser().exists(convert(path));
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to check file existence [path=" + path + "]");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) {
+        HadoopIgfsProperties props0 = new HadoopIgfsProperties(props);
+
+        final FileSystem fileSys = fileSystemForUser();
+
+        try {
+            if (props0.userName() != null || props0.groupName() != null)
+                fileSys.setOwner(convert(path), props0.userName(), props0.groupName());
+
+            if (props0.permission() != null)
+                fileSys.setPermission(convert(path), props0.permission());
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to update file properties [path=" + path + "]");
+        }
+
+        //Result is not used in case of secondary FS.
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void rename(IgfsPath src, IgfsPath dest) {
+        // Delegate to the secondary file system.
+        try {
+            if (!fileSystemForUser().rename(convert(src), convert(dest)))
+                throw new IgfsException("Failed to rename (secondary file system returned false) " +
+                    "[src=" + src + ", dest=" + dest + ']');
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to rename file [src=" + src + ", dest=" + dest + ']');
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean delete(IgfsPath path, boolean recursive) {
+        try {
+            return fileSystemForUser().delete(convert(path), recursive);
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to delete file [path=" + path + ", recursive=" + recursive + "]");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void mkdirs(IgfsPath path) {
+        try {
+            if (!fileSystemForUser().mkdirs(convert(path)))
+                throw new IgniteException("Failed to make directories [path=" + path + "]");
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + "]");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) {
+        // Properties are optional here. Substitute an empty map, exactly as create(...) does, so that
+        // a null argument is never handed to HadoopIgfsProperties.
+        Map<String, String> props0 = props != null ? props : Collections.<String, String>emptyMap();
+
+        try {
+            if (!fileSystemForUser().mkdirs(convert(path), new HadoopIgfsProperties(props0).permission()))
+                throw new IgniteException("Failed to make directories [path=" + path + ", props=" + props + "]");
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + ", props=" + props + "]");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsPath> listPaths(IgfsPath path) {
+        try {
+            FileStatus[] statuses = fileSystemForUser().listStatus(convert(path));
+
+            if (statuses == null)
+                throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
+
+            Collection<IgfsPath> res = new ArrayList<>(statuses.length);
+
+            // Children are reported relative to the requested IGFS path, using only the entry name.
+            for (FileStatus status : statuses)
+                res.add(new IgfsPath(path, status.getPath().getName()));
+
+            return res;
+        }
+        catch (FileNotFoundException ignored) {
+            throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsFile> listFiles(IgfsPath path) {
+        try {
+            FileStatus[] statuses = fileSystemForUser().listStatus(convert(path));
+
+            if (statuses == null)
+                throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
+
+            Collection<IgfsFile> res = new ArrayList<>(statuses.length);
+
+            for (FileStatus s : statuses) {
+                // Build a detached entry descriptor (fresh random ID) from the Hadoop status.
+                IgfsEntryInfo fsInfo = s.isDirectory() ?
+                    IgfsUtils.createDirectory(
+                        IgniteUuid.randomUuid(),
+                        null,
+                        properties(s),
+                        s.getAccessTime(),
+                        s.getModificationTime()
+                    ) :
+                    IgfsUtils.createFile(
+                        IgniteUuid.randomUuid(),
+                        (int)s.getBlockSize(),
+                        s.getLen(),
+                        null,
+                        null,
+                        false,
+                        properties(s),
+                        s.getAccessTime(),
+                        s.getModificationTime()
+                    );
+
+                res.add(new IgfsFileImpl(new IgfsPath(path, s.getPath().getName()), fsInfo, 1));
+            }
+
+            return res;
+        }
+        catch (FileNotFoundException ignored) {
+            throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsSecondaryFileSystemPositionedReadable open(IgfsPath path, int bufSize) {
+        return new HadoopIgfsSecondaryFileSystemPositionedReadable(fileSystemForUser(), convert(path), bufSize);
+    }
+
+    /** {@inheritDoc} */
+    @Override public OutputStream create(IgfsPath path, boolean overwrite) {
+        try {
+            return fileSystemForUser().create(convert(path), overwrite);
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite=" + overwrite + "]");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public OutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication,
+        long blockSize, @Nullable Map<String, String> props) {
+        HadoopIgfsProperties props0 =
+            new HadoopIgfsProperties(props != null ? props : Collections.<String, String>emptyMap());
+
+        try {
+            return fileSystemForUser().create(convert(path), props0.permission(), overwrite, bufSize,
+                (short) replication, blockSize, null);
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", props=" + props +
+                ", overwrite=" + overwrite + ", bufSize=" + bufSize + ", replication=" + replication +
+                ", blockSize=" + blockSize + "]");
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Only {@code path} and {@code bufSize} are passed to the Hadoop file system; {@code create} and
+     * {@code props} are not used by this implementation.
+     */
+    @Override public OutputStream append(IgfsPath path, int bufSize, boolean create,
+        @Nullable Map<String, String> props) {
+        try {
+            return fileSystemForUser().append(convert(path), bufSize);
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]");
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return File view backed by the Hadoop file status, or {@code null} if the path does not exist.
+     */
+    @Override public IgfsFile info(final IgfsPath path) {
+        try {
+            final FileStatus status = fileSystemForUser().getFileStatus(convert(path));
+
+            if (status == null)
+                return null;
+
+            final Map<String, String> props = properties(status);
+
+            return new IgfsFile() {
+                @Override public IgfsPath path() {
+                    return path;
+                }
+
+                @Override public boolean isFile() {
+                    return status.isFile();
+                }
+
+                @Override public boolean isDirectory() {
+                    return status.isDirectory();
+                }
+
+                @Override public int blockSize() {
+                    // By convention directory has blockSize == 0, while file has blockSize > 0:
+                    return isDirectory() ? 0 : (int)status.getBlockSize();
+                }
+
+                @Override public long groupBlockSize() {
+                    return status.getBlockSize();
+                }
+
+                @Override public long accessTime() {
+                    return status.getAccessTime();
+                }
+
+                @Override public long modificationTime() {
+                    return status.getModificationTime();
+                }
+
+                @Override public String property(String name) throws IllegalArgumentException {
+                    String val = props.get(name);
+
+                    if (val == null)
+                        throw new IllegalArgumentException("File property not found [path=" + path + ", name=" + name + ']');
+
+                    return val;
+                }
+
+                @Nullable @Override public String property(String name, @Nullable String dfltVal) {
+                    String val = props.get(name);
+
+                    return val == null ? dfltVal : val;
+                }
+
+                @Override public long length() {
+                    return status.getLen();
+                }
+
+                /** {@inheritDoc} */
+                @Override public Map<String, String> properties() {
+                    return props;
+                }
+            };
+        }
+        catch (FileNotFoundException ignore) {
+            // A missing path is reported to the caller as "no info", not as an error.
+            return null;
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to get file status [path=" + path + "]");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public long usedSpaceSize() {
+        try {
+            // We don't use FileSystem#getUsed() since it counts only the files
+            // in the filesystem root, not all the files recursively.
+            return fileSystemForUser().getContentSummary(new Path("/")).getSpaceConsumed();
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to get used space size of file system.");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteException {
+        try {
+            // Note the argument order: Hadoop expects the modification time first, then the access time.
+            fileSystemForUser().setTimes(convert(path), modificationTime, accessTime);
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed set times for path: " + path);
+        }
+    }
+
+    /** {@inheritDoc} */
+    public void start() {
+        factory.start();
+    }
+
+    /** {@inheritDoc} */
+    public void stop() {
+        factory.stop();
+    }
+
+    /**
+     * Convert IGFS path into Hadoop path.
+     * <p>
+     * The scheme and authority are taken from the URI of the current user's file system.
+     *
+     * @param path IGFS path.
+     * @return Hadoop path.
+     */
+    private Path convert(IgfsPath path) {
+        URI uri = fileSystemForUser().getUri();
+
+        return new Path(uri.getScheme(), uri.getAuthority(), path.toString());
+    }
+
+    /**
+     * Converts an I/O failure of the secondary file system into the matching IGFS exception.
+     * Simply delegates to {@link #cast(String, IOException)}.
+     *
+     * @param e Exception to convert.
+     * @param detailMsg Detailed error message.
+     * @return Appropriate exception.
+     */
+    private IgfsException handleSecondaryFsError(IOException e, String detailMsg) {
+        return cast(detailMsg, e);
+    }
+
+    /**
+     * Cast IO exception to IGFS exception.
+     * <p>
+     * Note that for {@link FileNotFoundException} and {@link PathIsNotEmptyDirectoryException} the
+     * resulting exception is created from the cause only; {@code msg} is not used in those cases.
+     *
+     * @param msg Error message.
+     * @param e IO exception.
+     * @return IGFS exception.
+     */
+    public static IgfsException cast(String msg, IOException e) {
+        if (e instanceof FileNotFoundException)
+            return new IgfsPathNotFoundException(e);
+        else if (e instanceof ParentNotDirectoryException)
+            return new IgfsParentNotDirectoryException(msg, e);
+        else if (e instanceof PathIsNotEmptyDirectoryException)
+            return new IgfsDirectoryNotEmptyException(e);
+        else if (e instanceof PathExistsException)
+            return new IgfsPathAlreadyExistsException(msg, e);
+        else
+            return new IgfsException(msg, e);
+    }
+
+    /**
+     * Convert Hadoop FileStatus properties to map.
+     * <p>
+     * The map holds the permission (as a 4-digit octal string), the owner and the group.
+     *
+     * @param status File status.
+     * @return IGFS attributes.
+     */
+    private static Map<String, String> properties(FileStatus status) {
+        FsPermission perm = status.getPermission();
+
+        if (perm == null)
+            perm = FsPermission.getDefault();
+
+        HashMap<String, String> res = new HashMap<>(3);
+
+        res.put(IgfsUtils.PROP_PERMISSION, String.format("%04o", perm.toShort()));
+        res.put(IgfsUtils.PROP_USER_NAME, status.getOwner());
+        res.put(IgfsUtils.PROP_GROUP_NAME, status.getGroup());
+
+        return res;
+    }
+
+    /**
+     * Gets the FileSystem for the current context user.
+     * Falls back to the default user name when no user context is set.
+     *
+     * @return the FileSystem instance, never null.
+     * @throws IgniteException If the factory fails to provide the file system.
+     */
+    private FileSystem fileSystemForUser() {
+        String user = IgfsUserContext.currentUser();
+
+        if (F.isEmpty(user))
+            user = IgfsUtils.fixUserName(dfltUsrName);
+
+        assert !F.isEmpty(user);
+
+        try {
+            return (FileSystem)factory.get(user);
+        }
+        catch (IOException ioe) {
+            throw new IgniteException(ioe);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopIgfsSecondaryFileSystemDelegateImpl.class, this);
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopKerberosFileSystemFactoryDelegate.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopKerberosFileSystemFactoryDelegate.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopKerberosFileSystemFactoryDelegate.java
new file mode 100644
index 0000000..c71dedb
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopKerberosFileSystemFactoryDelegate.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.delegate;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.hadoop.fs.KerberosHadoopFileSystemFactory;
+import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+/**
+ * Kerberos Hadoop file system factory delegate.
+ * <p>
+ * Logs in from a keytab on {@link #start()} and creates file systems on behalf of a proxy user of the
+ * logged-in user.
+ */
+public class HadoopKerberosFileSystemFactoryDelegate extends HadoopBasicFileSystemFactoryDelegate {
+    /** The re-login interval, in milliseconds (it is compared against {@link System#currentTimeMillis()}). */
+    private long reloginInterval;
+
+    /** Time of last re-login attempt, in system milliseconds. */
+    private volatile long lastReloginTime;
+
+    /**
+     * Constructor.
+     *
+     * @param proxy Proxy.
+     */
+    public HadoopKerberosFileSystemFactoryDelegate(KerberosHadoopFileSystemFactory proxy) {
+        super(proxy);
+    }
+
+    /** {@inheritDoc} */
+    @Override public FileSystem getWithMappedName(String name) throws IOException {
+        // Give the login a chance to be refreshed before the file system is handed out.
+        reloginIfNeeded();
+
+        return super.getWithMappedName(name);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected FileSystem create(String usrName) throws IOException, InterruptedException {
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUser(usrName,
+            UserGroupInformation.getLoginUser());
+
+        // The file system is created inside doAs(...) so that it is obtained as the proxy user.
+        return proxyUgi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+            @Override public FileSystem run() throws Exception {
+                return FileSystem.get(fullUri, cfg);
+            }
+        });
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Validates the Kerberos settings of the proxy factory and performs the initial login from the keytab.
+     *
+     * @throws IgniteException If the login from the keytab fails.
+     */
+    @Override public void start() throws IgniteException {
+        super.start();
+
+        KerberosHadoopFileSystemFactory proxy0 = (KerberosHadoopFileSystemFactory)proxy;
+
+        A.ensure(!F.isEmpty(proxy0.getKeyTab()), "keyTab cannot be empty.");
+        A.ensure(!F.isEmpty(proxy0.getKeyTabPrincipal()), "keyTabPrincipal cannot be empty.");
+        A.ensure(proxy0.getReloginInterval() >= 0, "reloginInterval cannot be negative.");
+
+        reloginInterval = proxy0.getReloginInterval();
+
+        try {
+            UserGroupInformation.setConfiguration(cfg);
+            UserGroupInformation.loginUserFromKeytab(proxy0.getKeyTabPrincipal(), proxy0.getKeyTab());
+        }
+        catch (IOException ioe) {
+            throw new IgniteException("Failed login from keytab [keyTab=" + proxy0.getKeyTab() +
+                ", keyTabPrincipal=" + proxy0.getKeyTabPrincipal() + ']', ioe);
+        }
+    }
+
+    /**
+     * Re-logins the user if needed.
+     * First, the re-login interval defined in the factory is checked. Re-login attempts are not more
+     * frequent than one attempt per {@code reloginInterval}.
+     * Second, the {@code UserGroupInformation.checkTGTAndReloginFromKeytab()} method is invoked, which gets
+     * the existing TGT and checks its validity. If the TGT is expired or is close to expiry, it performs re-login.
+     *
+     * <p>This operation is expected to be called upon each operation with the file system created with the factory.
+     * As long as {@link #get(String)} is invoked upon each {@link IgniteHadoopFileSystem} file operation, there
+     * is no need to invoke it separately.
+     *
+     * @throws IOException If login fails.
+     */
+    private void reloginIfNeeded() throws IOException {
+        long now = System.currentTimeMillis();
+
+        if (now >= lastReloginTime + reloginInterval) {
+            UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
+
+            lastReloginTime = now;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java
new file mode 100644
index 0000000..3644511
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.fs;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Locale;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
+import org.apache.ignite.internal.util.GridStringBuilder;
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * File system cache utility methods used by Map-Reduce tasks and jobs.
+ */
+public class HadoopFileSystemCacheUtils {
+    /**
+     * A common static factory method. Creates new HadoopLazyConcurrentMap.
+     * <p>
+     * Values of the map are file systems created for the URI, user and configuration of the key.
+     *
+     * @return a new HadoopLazyConcurrentMap.
+     */
+    public static HadoopLazyConcurrentMap<FsCacheKey, FileSystem> createHadoopLazyConcurrentMap() {
+        return new HadoopLazyConcurrentMap<>(
+            new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() {
+                @Override public FileSystem createValue(FsCacheKey key) throws IOException {
+                    try {
+                        assert key != null;
+
+                        // Explicitly disable FileSystem caching:
+                        URI uri = key.uri();
+
+                        String scheme = uri.getScheme();
+
+                        // Copy the configuration to avoid altering the external object.
+                        Configuration cfg = new Configuration(key.configuration());
+
+                        String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(scheme);
+
+                        cfg.setBoolean(prop, true);
+
+                        return FileSystem.get(uri, cfg, key.user());
+                    }
+                    catch (InterruptedException e) {
+                        // Restore the interrupt flag before translating to the declared exception type.
+                        Thread.currentThread().interrupt();
+
+                        throw new IOException("Failed to create file system due to interrupt.", e);
+                    }
+                }
+            }
+        );
+    }
+
+    /**
+     * Gets non-null user name as per the Hadoop viewpoint.
+     * The {@link MRJobConfig#USER_NAME} property is used when set; otherwise
+     * {@link IgniteHadoopFileSystem#getFsHadoopUser()} is consulted.
+     *
+     * @param cfg the Hadoop job configuration; must not be {@code null} (it is dereferenced unconditionally).
+     * @return the user name, never null.
+     * @throws IOException If the fallback user lookup fails.
+     */
+    private static String getMrHadoopUser(Configuration cfg) throws IOException {
+        String user = cfg.get(MRJobConfig.USER_NAME);
+
+        if (user == null)
+            user = IgniteHadoopFileSystem.getFsHadoopUser();
+
+        return user;
+    }
+
+    /**
+     * Common method to get the V1 file system in MapRed engine.
+     * It gets the filesystem for the user specified in the
+     * configuration with {@link MRJobConfig#USER_NAME} property.
+     * The file systems are created and cached in the given map upon first request.
+     *
+     * @param uri The file system uri; when {@code null}, the default URI of the configuration is used.
+     * @param cfg The configuration.
+     * @param map The caching map.
+     * @return The file system.
+     * @throws IOException On error.
+     */
+    public static FileSystem fileSystemForMrUserWithCaching(@Nullable URI uri, Configuration cfg,
+        HadoopLazyConcurrentMap<FsCacheKey, FileSystem> map)
+        throws IOException {
+        assert map != null;
+        assert cfg != null;
+
+        final String usr = getMrHadoopUser(cfg);
+
+        assert usr != null;
+
+        if (uri == null)
+            uri = FileSystem.getDefaultUri(cfg);
+
+        final FileSystem fs;
+
+        try {
+            final FsCacheKey key = new FsCacheKey(uri, usr, cfg);
+
+            fs = map.getOrCreate(key);
+        }
+        catch (IgniteException ie) {
+            // Callers of this method deal with IOException only.
+            throw new IOException(ie);
+        }
+
+        assert fs != null;
+        assert !(fs instanceof IgniteHadoopFileSystem) || F.eq(usr, ((IgniteHadoopFileSystem)fs).user());
+
+        return fs;
+    }
+
+    /**
+     * Takes Fs URI using logic similar to that used in FileSystem#get(1,2,3).
+     * <p>
+     * The default URI of the configuration is returned when {@code uri0} is {@code null}. It is also
+     * returned when {@code uri0} has no authority and either has no scheme or has the scheme of a default
+     * URI that does have an authority.
+     *
+     * @param uri0 The uri.
+     * @param cfg The cfg.
+     * @return Correct URI.
+     */
+    private static URI fixUri(URI uri0, Configuration cfg) {
+        if (uri0 == null)
+            return FileSystem.getDefaultUri(cfg);
+
+        String scheme = uri0.getScheme();
+        String authority = uri0.getAuthority();
+
+        if (authority == null) {
+            URI dfltUri = FileSystem.getDefaultUri(cfg);
+
+            if (scheme == null || (scheme.equals(dfltUri.getScheme()) && dfltUri.getAuthority() != null))
+                return dfltUri;
+        }
+
+        return uri0;
+    }
+
+    /**
+     * Note that configuration is not a part of the key.
+     * It is used solely to initialize the first instance
+     * that is created for the key.
+     */
+    public static final class FsCacheKey {
+        /** File system URI (after {@link #fixUri(URI, Configuration)}). */
+        private final URI uri;
+
+        /** User name. */
+        private final String usr;
+
+        /** String built from user, scheme and authority; the only input to equals/hashCode. */
+        private final String equalityKey;
+
+        /** Configuration used to create the file system; not part of the key identity. */
+        private final Configuration cfg;
+
+        /**
+         * Constructor
+         *
+         * @param uri File system URI.
+         * @param usr User name.
+         * @param cfg Configuration.
+         */
+        public FsCacheKey(URI uri, String usr, Configuration cfg) {
+            assert uri != null;
+            assert usr != null;
+            assert cfg != null;
+
+            this.uri = fixUri(uri, cfg);
+            this.usr = usr;
+            this.cfg = cfg;
+
+            this.equalityKey = createEqualityKey();
+        }
+
+        /**
+         * Creates String key used for equality and hashing.
+         * <p>
+         * Scheme and authority are lower-cased with {@link Locale#ROOT} so that the key does not depend
+         * on the default locale of the JVM.
+         *
+         * @return Key of the form {@code (user)@scheme://authority}.
+         */
+        private String createEqualityKey() {
+            GridStringBuilder sb = new GridStringBuilder("(").a(usr).a(")@");
+
+            if (uri.getScheme() != null)
+                sb.a(uri.getScheme().toLowerCase(Locale.ROOT));
+
+            sb.a("://");
+
+            if (uri.getAuthority() != null)
+                sb.a(uri.getAuthority().toLowerCase(Locale.ROOT));
+
+            return sb.toString();
+        }
+
+        /**
+         * The URI.
+         */
+        public URI uri() {
+            return uri;
+        }
+
+        /**
+         * The User.
+         */
+        public String user() {
+            return usr;
+        }
+
+        /**
+         * The Configuration.
+         */
+        public Configuration configuration() {
+            return cfg;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("SimplifiableIfStatement")
+        @Override public boolean equals(Object obj) {
+            if (obj == this)
+                return true;
+
+            if (obj == null || getClass() != obj.getClass())
+                return false;
+
+            return equalityKey.equals(((FsCacheKey)obj).equalityKey);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return equalityKey.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return equalityKey;
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemsUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemsUtils.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemsUtils.java
new file mode 100644
index 0000000..5115cb4
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemsUtils.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.fs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FsConstants; +import org.jetbrains.annotations.Nullable; + +/** + * Utilities for configuring file systems to support the separate working directory per each thread. + */ +public class HadoopFileSystemsUtils { + /** Name of the property for setting working directory on create new local FS instance. */ + public static final String LOC_FS_WORK_DIR_PROP = "fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".workDir"; + + /** + * Setup wrappers of filesystems to support the separate working directory. + * + * @param cfg Config for setup. + */ + public static void setupFileSystems(Configuration cfg) { + cfg.set("fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", HadoopLocalFileSystemV1.class.getName()); + cfg.set("fs.AbstractFileSystem." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", + HadoopLocalFileSystemV2.class.getName()); + } + + /** + * Gets the property name to disable file system cache. + * @param scheme The file system URI scheme. + * @return The property name. If scheme is null, + * returns "fs.null.impl.disable.cache". 
+ */ + public static String disableFsCachePropertyName(@Nullable String scheme) { + return String.format("fs.%s.impl.disable.cache", scheme); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopLazyConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopLazyConcurrentMap.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopLazyConcurrentMap.java new file mode 100644 index 0000000..feea932 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopLazyConcurrentMap.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.impl.fs; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.jsr166.ConcurrentHashMap8; + +/** + * Maps values by keys. + * Values are created lazily using {@link ValueFactory}. + * + * Despite of the name, does not depend on any Hadoop classes. + */ +public class HadoopLazyConcurrentMap<K, V extends Closeable> { + /** The map storing the actual values. */ + private final ConcurrentMap<K, ValueWrapper> map = new ConcurrentHashMap8<>(); + + /** The factory passed in by the client. Will be used for lazy value creation. */ + private final ValueFactory<K, V> factory; + + /** Lock used to close the objects. */ + private final ReadWriteLock closeLock = new ReentrantReadWriteLock(); + + /** Flag indicating that this map is closed and cleared. */ + private boolean closed; + + /** + * Constructor. + * @param factory the factory to create new values lazily. + */ + public HadoopLazyConcurrentMap(ValueFactory<K, V> factory) { + this.factory = factory; + } + + /** + * Gets cached or creates a new value of V. + * Never returns null. + * @param k the key to associate the value with. + * @return the cached or newly created value, never null. 
+ * @throws IgniteException on error + */ + public V getOrCreate(K k) { + ValueWrapper w = map.get(k); + + if (w == null) { + closeLock.readLock().lock(); + + try { + if (closed) + throw new IllegalStateException("Failed to create value for key [" + k + + "]: the map is already closed."); + + final ValueWrapper wNew = new ValueWrapper(k); + + w = map.putIfAbsent(k, wNew); + + if (w == null) { + wNew.init(); + + w = wNew; + } + } + finally { + closeLock.readLock().unlock(); + } + } + + try { + V v = w.getValue(); + + assert v != null; + + return v; + } + catch (IgniteCheckedException ie) { + throw new IgniteException(ie); + } + } + + /** + * Clears the map and closes all the values. + */ + public void close() throws IgniteCheckedException { + closeLock.writeLock().lock(); + + try { + if (closed) + return; + + closed = true; + + Exception err = null; + + Set<K> keySet = map.keySet(); + + for (K key : keySet) { + V v = null; + + try { + v = map.get(key).getValue(); + } + catch (IgniteCheckedException ignore) { + // No-op. + } + + if (v != null) { + try { + v.close(); + } + catch (Exception err0) { + if (err == null) + err = err0; + } + } + } + + map.clear(); + + if (err != null) + throw new IgniteCheckedException(err); + } + finally { + closeLock.writeLock().unlock(); + } + } + + /** + * Helper class that drives the lazy value creation. + */ + private class ValueWrapper { + /** Future. */ + private final GridFutureAdapter<V> fut = new GridFutureAdapter<>(); + + /** the key */ + private final K key; + + /** + * Creates new wrapper. + */ + private ValueWrapper(K key) { + this.key = key; + } + + /** + * Initializes the value using the factory. + */ + private void init() { + try { + final V v0 = factory.createValue(key); + + if (v0 == null) + throw new IgniteException("Failed to create non-null value. [key=" + key + ']'); + + fut.onDone(v0); + } + catch (Throwable e) { + fut.onDone(e); + } + } + + /** + * Gets the available value or blocks until the value is initialized. 
+ * @return the value, never null. + * @throws IgniteCheckedException on error. + */ + V getValue() throws IgniteCheckedException { + return fut.get(); + } + } + + /** + * Interface representing the factory that creates map values. + * @param <K> the type of the key. + * @param <V> the type of the value. + */ + public interface ValueFactory <K, V> { + /** + * Creates the new value. Should never return null. + * + * @param key the key to create value for + * @return the value. + * @throws IOException On failure. + */ + public V createValue(K key) throws IOException; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopLocalFileSystemV1.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopLocalFileSystemV1.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopLocalFileSystemV1.java new file mode 100644 index 0000000..16828a6 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopLocalFileSystemV1.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.fs; + +import java.io.File; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; + +/** + * Local file system replacement for Hadoop jobs. + */ +public class HadoopLocalFileSystemV1 extends LocalFileSystem { + /** + * Creates new local file system. + */ + public HadoopLocalFileSystemV1() { + super(new HadoopRawLocalFileSystem()); + } + + /** {@inheritDoc} */ + @Override public File pathToFile(Path path) { + return ((HadoopRawLocalFileSystem)getRaw()).convert(path); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopLocalFileSystemV2.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopLocalFileSystemV2.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopLocalFileSystemV2.java new file mode 100644 index 0000000..11db6f2 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopLocalFileSystemV2.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.fs; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ChecksumFs; +import org.apache.hadoop.fs.DelegateToFileSystem; +import org.apache.hadoop.fs.FsServerDefaults; +import org.apache.hadoop.fs.local.LocalConfigKeys; + +import static org.apache.hadoop.fs.FsConstants.LOCAL_FS_URI; + +/** + * Local file system replacement for Hadoop jobs. + */ +public class HadoopLocalFileSystemV2 extends ChecksumFs { + /** + * Creates new local file system. + * + * @param cfg Configuration. + * @throws IOException If failed. + * @throws URISyntaxException If failed. + */ + public HadoopLocalFileSystemV2(Configuration cfg) throws IOException, URISyntaxException { + super(new DelegateFS(cfg)); + } + + /** + * Creates new local file system. + * + * @param uri URI. + * @param cfg Configuration. + * @throws IOException If failed. + * @throws URISyntaxException If failed. + */ + public HadoopLocalFileSystemV2(URI uri, Configuration cfg) throws IOException, URISyntaxException { + this(cfg); + } + + /** + * Delegate file system. + */ + private static class DelegateFS extends DelegateToFileSystem { + /** + * Creates new local file system. + * + * @param cfg Configuration. + * @throws IOException If failed. + * @throws URISyntaxException If failed. 
+ */ + public DelegateFS(Configuration cfg) throws IOException, URISyntaxException { + super(LOCAL_FS_URI, new HadoopRawLocalFileSystem(), cfg, LOCAL_FS_URI.getScheme(), false); + } + + /** {@inheritDoc} */ + @Override public int getUriDefaultPort() { + return -1; + } + + /** {@inheritDoc} */ + @Override public FsServerDefaults getServerDefaults() throws IOException { + return LocalConfigKeys.getServerDefaults(); + } + + /** {@inheritDoc} */ + @Override public boolean isValidName(String src) { + return true; + } + } +} \ No newline at end of file