// http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java
// ----------------------------------------------------------------------
// diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java
// new file mode 100644
// index 0000000..0d61e0d
// --- /dev/null
// +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java
// @@ -0,0 +1,93 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.hadoop.counter;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;

/**
 * A {@code long}-valued counter that Hadoop jobs use through the original Hadoop counters API.
 */
public class HadoopLongCounter extends HadoopCounterAdapter {
    /** */
    private static final long serialVersionUID = 0L;

    /** Current value of the counter. */
    private long val;

    /**
     * No-argument constructor; {@link Externalizable} deserialization needs it.
     */
    public HadoopLongCounter() {
        // No-op.
    }

    /**
     * Creates a counter with the given group and counter names.
     *
     * @param grp Group name.
     * @param name Counter name.
     */
    public HadoopLongCounter(String grp, String name) {
        super(grp, name);
    }

    /** {@inheritDoc} */
    @Override protected void writeValue(ObjectOutput out) throws IOException {
        out.writeLong(val);
    }

    /** {@inheritDoc} */
    @Override protected void readValue(ObjectInput in) throws IOException {
        val = in.readLong();
    }

    /**
     * {@inheritDoc}
     * <p>
     * Adds the value of {@code cntr} to this counter. The argument is cast to {@code HadoopLongCounter},
     * so any other counter type results in a {@link ClassCastException}.
     */
    @Override public void merge(HadoopCounter cntr) {
        HadoopLongCounter other = (HadoopLongCounter)cntr;

        val += other.val;
    }

    /**
     * Returns the current value.
     *
     * @return Counter value.
     */
    public long value() {
        return val;
    }

    /**
     * Overwrites the current value.
     *
     * @param val New counter value.
     */
    public void value(long val) {
        this.val = val;
    }

    /**
     * Adds the given delta to the current value.
     *
     * @param i Amount to add (may be negative).
     */
    public void increment(long i) {
        val += i;
    }
}
// \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java new file mode 100644 index 0000000..dedc6b3 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.counter; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.ArrayList; +import java.util.Collection; +import java.util.UUID; +import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskType; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.JOB_SUBMISSION_START_TS_PROPERTY; +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.REQ_NEW_JOBID_TS_PROPERTY; +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.RESPONSE_NEW_JOBID_TS_PROPERTY; + +/** + * Counter for the job statistics accumulation. + */ +public class HadoopPerformanceCounter extends HadoopCounterAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** The group name for this counter. */ + private static final String GROUP_NAME = "SYSTEM"; + + /** The counter name for this counter. */ + private static final String COUNTER_NAME = "PERFORMANCE"; + + /** Events collections. */ + private Collection<T2<String,Long>> evts = new ArrayList<>(); + + /** Node id to insert into the event info. */ + private UUID nodeId; + + /** */ + private int reducerNum; + + /** */ + private volatile Long firstShuffleMsg; + + /** */ + private volatile Long lastShuffleMsg; + + /** + * Default constructor required by {@link Externalizable}. + */ + public HadoopPerformanceCounter() { + // No-op. + } + + /** + * Constructor. + * + * @param grp Group name. + * @param name Counter name. 
+ */ + public HadoopPerformanceCounter(String grp, String name) { + super(grp, name); + } + + /** + * Constructor to create instance to use this as helper. + * + * @param nodeId Id of the work node. + */ + public HadoopPerformanceCounter(UUID nodeId) { + this.nodeId = nodeId; + } + + /** {@inheritDoc} */ + @Override protected void writeValue(ObjectOutput out) throws IOException { + U.writeCollection(out, evts); + } + + /** {@inheritDoc} */ + @Override protected void readValue(ObjectInput in) throws IOException { + try { + evts = U.readCollection(in); + } + catch (ClassNotFoundException e) { + throw new IOException(e); + } + } + + /** {@inheritDoc} */ + @Override public void merge(HadoopCounter cntr) { + evts.addAll(((HadoopPerformanceCounter)cntr).evts); + } + + /** + * Gets the events collection. + * + * @return Collection of event. + */ + public Collection<T2<String, Long>> evts() { + return evts; + } + + /** + * Generate name that consists of some event information. + * + * @param info Task info. + * @param evtType The type of the event. + * @return String contains necessary event information. + */ + private String eventName(HadoopTaskInfo info, String evtType) { + return eventName(info.type().toString(), info.taskNumber(), evtType); + } + + /** + * Generate name that consists of some event information. + * + * @param taskType Task type. + * @param taskNum Number of the task. + * @param evtType The type of the event. + * @return String contains necessary event information. + */ + private String eventName(String taskType, int taskNum, String evtType) { + assert nodeId != null; + + return taskType + " " + taskNum + " " + evtType + " " + nodeId; + } + + /** + * Adds event of the task submission (task instance creation). + * + * @param info Task info. + * @param ts Timestamp of the event. + */ + public void onTaskSubmit(HadoopTaskInfo info, long ts) { + evts.add(new T2<>(eventName(info, "submit"), ts)); + } + + /** + * Adds event of the task preparation. 
+ * + * @param info Task info. + * @param ts Timestamp of the event. + */ + public void onTaskPrepare(HadoopTaskInfo info, long ts) { + evts.add(new T2<>(eventName(info, "prepare"), ts)); + } + + /** + * Adds event of the task finish. + * + * @param info Task info. + * @param ts Timestamp of the event. + */ + public void onTaskFinish(HadoopTaskInfo info, long ts) { + if (info.type() == HadoopTaskType.REDUCE && lastShuffleMsg != null) { + evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "start"), firstShuffleMsg)); + evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "finish"), lastShuffleMsg)); + + lastShuffleMsg = null; + } + + evts.add(new T2<>(eventName(info, "finish"), ts)); + } + + /** + * Adds event of the task run. + * + * @param info Task info. + * @param ts Timestamp of the event. + */ + public void onTaskStart(HadoopTaskInfo info, long ts) { + evts.add(new T2<>(eventName(info, "start"), ts)); + } + + /** + * Adds event of the job preparation. + * + * @param ts Timestamp of the event. + */ + public void onJobPrepare(long ts) { + assert nodeId != null; + + evts.add(new T2<>("JOB prepare " + nodeId, ts)); + } + + /** + * Adds event of the job start. + * + * @param ts Timestamp of the event. + */ + public void onJobStart(long ts) { + assert nodeId != null; + + evts.add(new T2<>("JOB start " + nodeId, ts)); + } + + /** + * Adds client submission events from job info. + * + * @param info Job info. + */ + public void clientSubmissionEvents(HadoopJobInfo info) { + assert nodeId != null; + + addEventFromProperty("JOB requestId", info, REQ_NEW_JOBID_TS_PROPERTY); + addEventFromProperty("JOB responseId", info, RESPONSE_NEW_JOBID_TS_PROPERTY); + addEventFromProperty("JOB submit", info, JOB_SUBMISSION_START_TS_PROPERTY); + } + + /** + * Adds event with timestamp from some property in job info. + * + * @param evt Event type and phase. + * @param info Job info. + * @param propName Property name to get timestamp. 
+ */ + private void addEventFromProperty(String evt, HadoopJobInfo info, String propName) { + String val = info.property(propName); + + if (!F.isEmpty(val)) { + try { + evts.add(new T2<>(evt + " " + nodeId, Long.parseLong(val))); + } + catch (NumberFormatException e) { + throw new IllegalStateException("Invalid value '" + val + "' of property '" + propName + "'", e); + } + } + } + + /** + * Registers shuffle message event. + * + * @param reducerNum Number of reducer that receives the data. + * @param ts Timestamp of the event. + */ + public void onShuffleMessage(int reducerNum, long ts) { + this.reducerNum = reducerNum; + + if (firstShuffleMsg == null) + firstShuffleMsg = ts; + + lastShuffleMsg = ts; + } + + /** + * Gets system predefined performance counter from the HadoopCounters object. + * + * @param cntrs HadoopCounters object. + * @param nodeId Node id for methods that adds events. It may be null if you don't use ones. + * @return Predefined performance counter. + */ + public static HadoopPerformanceCounter getCounter(HadoopCounters cntrs, @Nullable UUID nodeId) { + HadoopPerformanceCounter cntr = cntrs.counter(GROUP_NAME, COUNTER_NAME, HadoopPerformanceCounter.class); + + if (nodeId != null) + cntr.nodeId(nodeId); + + return cntrs.counter(GROUP_NAME, COUNTER_NAME, HadoopPerformanceCounter.class); + } + + /** + * Sets the nodeId field. + * + * @param nodeId Node id. 
+ */ + private void nodeId(UUID nodeId) { + this.nodeId = nodeId; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java new file mode 100644 index 0000000..1ecbee5 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.fs; + +import java.io.IOException; +import java.net.URI; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.ignite.IgniteException; +import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem; +import org.apache.ignite.internal.util.GridStringBuilder; +import org.apache.ignite.internal.util.typedef.F; +import org.jetbrains.annotations.Nullable; + +/** + * File system cache utility methods used by Map-Reduce tasks and jobs. + */ +public class HadoopFileSystemCacheUtils { + /** + * A common static factory method. Creates new HadoopLazyConcurrentMap. + * @return a new HadoopLazyConcurrentMap. + */ + public static HadoopLazyConcurrentMap<FsCacheKey, FileSystem> createHadoopLazyConcurrentMap() { + return new HadoopLazyConcurrentMap<>( + new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() { + @Override public FileSystem createValue(FsCacheKey key) throws IOException { + try { + assert key != null; + + // Explicitly disable FileSystem caching: + URI uri = key.uri(); + + String scheme = uri.getScheme(); + + // Copy the configuration to avoid altering the external object. + Configuration cfg = new Configuration(key.configuration()); + + String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(scheme); + + cfg.setBoolean(prop, true); + + return FileSystem.get(uri, cfg, key.user()); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IOException("Failed to create file system due to interrupt.", e); + } + } + } + ); + } + + /** + * Gets non-null user name as per the Hadoop viewpoint. + * @param cfg the Hadoop job configuration, may be null. + * @return the user name, never null. 
+ */ + private static String getMrHadoopUser(Configuration cfg) throws IOException { + String user = cfg.get(MRJobConfig.USER_NAME); + + if (user == null) + user = IgniteHadoopFileSystem.getFsHadoopUser(); + + return user; + } + + /** + * Common method to get the V1 file system in MapRed engine. + * It gets the filesystem for the user specified in the + * configuration with {@link MRJobConfig#USER_NAME} property. + * The file systems are created and cached in the given map upon first request. + * + * @param uri The file system uri. + * @param cfg The configuration. + * @param map The caching map. + * @return The file system. + * @throws IOException On error. + */ + public static FileSystem fileSystemForMrUserWithCaching(@Nullable URI uri, Configuration cfg, + HadoopLazyConcurrentMap<FsCacheKey, FileSystem> map) + throws IOException { + assert map != null; + assert cfg != null; + + final String usr = getMrHadoopUser(cfg); + + assert usr != null; + + if (uri == null) + uri = FileSystem.getDefaultUri(cfg); + + final FileSystem fs; + + try { + final FsCacheKey key = new FsCacheKey(uri, usr, cfg); + + fs = map.getOrCreate(key); + } + catch (IgniteException ie) { + throw new IOException(ie); + } + + assert fs != null; + assert !(fs instanceof IgniteHadoopFileSystem) || F.eq(usr, ((IgniteHadoopFileSystem)fs).user()); + + return fs; + } + + /** + * Takes Fs URI using logic similar to that used in FileSystem#get(1,2,3). + * @param uri0 The uri. + * @param cfg The cfg. + * @return Correct URI. + */ + private static URI fixUri(URI uri0, Configuration cfg) { + if (uri0 == null) + return FileSystem.getDefaultUri(cfg); + + String scheme = uri0.getScheme(); + String authority = uri0.getAuthority(); + + if (authority == null) { + URI dfltUri = FileSystem.getDefaultUri(cfg); + + if (scheme == null || (scheme.equals(dfltUri.getScheme()) && dfltUri.getAuthority() != null)) + return dfltUri; + } + + return uri0; + } + + /** + * Note that configuration is not a part of the key. 
+ * It is used solely to initialize the first instance + * that is created for the key. + */ + public static final class FsCacheKey { + /** */ + private final URI uri; + + /** */ + private final String usr; + + /** */ + private final String equalityKey; + + /** */ + private final Configuration cfg; + + /** + * Constructor + */ + public FsCacheKey(URI uri, String usr, Configuration cfg) { + assert uri != null; + assert usr != null; + assert cfg != null; + + this.uri = fixUri(uri, cfg); + this.usr = usr; + this.cfg = cfg; + + this.equalityKey = createEqualityKey(); + } + + /** + * Creates String key used for equality and hashing. + */ + private String createEqualityKey() { + GridStringBuilder sb = new GridStringBuilder("(").a(usr).a(")@"); + + if (uri.getScheme() != null) + sb.a(uri.getScheme().toLowerCase()); + + sb.a("://"); + + if (uri.getAuthority() != null) + sb.a(uri.getAuthority().toLowerCase()); + + return sb.toString(); + } + + /** + * The URI. + */ + public URI uri() { + return uri; + } + + /** + * The User. + */ + public String user() { + return usr; + } + + /** + * The Configuration. 
+ */ + public Configuration configuration() { + return cfg; + } + + /** {@inheritDoc} */ + @SuppressWarnings("SimplifiableIfStatement") + @Override public boolean equals(Object obj) { + if (obj == this) + return true; + + if (obj == null || getClass() != obj.getClass()) + return false; + + return equalityKey.equals(((FsCacheKey)obj).equalityKey); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return equalityKey.hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return equalityKey; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java new file mode 100644 index 0000000..68c0dc4 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.fs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FsConstants; +import org.jetbrains.annotations.Nullable; + +/** + * Utilities for configuring file systems to support the separate working directory per each thread. + */ +public class HadoopFileSystemsUtils { + /** Name of the property for setting working directory on create new local FS instance. */ + public static final String LOC_FS_WORK_DIR_PROP = "fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".workDir"; + + /** + * Setup wrappers of filesystems to support the separate working directory. + * + * @param cfg Config for setup. + */ + public static void setupFileSystems(Configuration cfg) { + cfg.set("fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", HadoopLocalFileSystemV1.class.getName()); + cfg.set("fs.AbstractFileSystem." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", + HadoopLocalFileSystemV2.class.getName()); + } + + /** + * Gets the property name to disable file system cache. + * @param scheme The file system URI scheme. + * @return The property name. If scheme is null, + * returns "fs.null.impl.disable.cache". 
+ */ + public static String disableFsCachePropertyName(@Nullable String scheme) { + return String.format("fs.%s.impl.disable.cache", scheme); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java new file mode 100644 index 0000000..681cddb --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.fs; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.jsr166.ConcurrentHashMap8; + +/** + * Maps values by keys. + * Values are created lazily using {@link ValueFactory}. + * + * Despite of the name, does not depend on any Hadoop classes. + */ +public class HadoopLazyConcurrentMap<K, V extends Closeable> { + /** The map storing the actual values. */ + private final ConcurrentMap<K, ValueWrapper> map = new ConcurrentHashMap8<>(); + + /** The factory passed in by the client. Will be used for lazy value creation. */ + private final ValueFactory<K, V> factory; + + /** Lock used to close the objects. */ + private final ReadWriteLock closeLock = new ReentrantReadWriteLock(); + + /** Flag indicating that this map is closed and cleared. */ + private boolean closed; + + /** + * Constructor. + * @param factory the factory to create new values lazily. + */ + public HadoopLazyConcurrentMap(ValueFactory<K, V> factory) { + this.factory = factory; + + assert getClass().getClassLoader() == Ignite.class.getClassLoader(); + } + + /** + * Gets cached or creates a new value of V. + * Never returns null. + * @param k the key to associate the value with. + * @return the cached or newly created value, never null. 
+ * @throws IgniteException on error + */ + public V getOrCreate(K k) { + ValueWrapper w = map.get(k); + + if (w == null) { + closeLock.readLock().lock(); + + try { + if (closed) + throw new IllegalStateException("Failed to create value for key [" + k + + "]: the map is already closed."); + + final ValueWrapper wNew = new ValueWrapper(k); + + w = map.putIfAbsent(k, wNew); + + if (w == null) { + wNew.init(); + + w = wNew; + } + } + finally { + closeLock.readLock().unlock(); + } + } + + try { + V v = w.getValue(); + + assert v != null; + + return v; + } + catch (IgniteCheckedException ie) { + throw new IgniteException(ie); + } + } + + /** + * Clears the map and closes all the values. + */ + public void close() throws IgniteCheckedException { + closeLock.writeLock().lock(); + + try { + if (closed) + return; + + closed = true; + + Exception err = null; + + Set<K> keySet = map.keySet(); + + for (K key : keySet) { + V v = null; + + try { + v = map.get(key).getValue(); + } + catch (IgniteCheckedException ignore) { + // No-op. + } + + if (v != null) { + try { + v.close(); + } + catch (Exception err0) { + if (err == null) + err = err0; + } + } + } + + map.clear(); + + if (err != null) + throw new IgniteCheckedException(err); + } + finally { + closeLock.writeLock().unlock(); + } + } + + /** + * Helper class that drives the lazy value creation. + */ + private class ValueWrapper { + /** Future. */ + private final GridFutureAdapter<V> fut = new GridFutureAdapter<>(); + + /** the key */ + private final K key; + + /** + * Creates new wrapper. + */ + private ValueWrapper(K key) { + this.key = key; + } + + /** + * Initializes the value using the factory. + */ + private void init() { + try { + final V v0 = factory.createValue(key); + + if (v0 == null) + throw new IgniteException("Failed to create non-null value. [key=" + key + ']'); + + fut.onDone(v0); + } + catch (Throwable e) { + fut.onDone(e); + } + } + + /** + * Gets the available value or blocks until the value is initialized. 
+ * @return the value, never null. + * @throws IgniteCheckedException on error. + */ + V getValue() throws IgniteCheckedException { + return fut.get(); + } + } + + /** + * Interface representing the factory that creates map values. + * @param <K> the type of the key. + * @param <V> the type of the value. + */ + public interface ValueFactory <K, V> { + /** + * Creates the new value. Should never return null. + * + * @param key the key to create value for + * @return the value. + * @throws IOException On failure. + */ + public V createValue(K key) throws IOException; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java new file mode 100644 index 0000000..cbb007f --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.fs; + +import java.io.File; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; + +/** + * Local file system replacement for Hadoop jobs. + */ +public class HadoopLocalFileSystemV1 extends LocalFileSystem { + /** + * Creates new local file system. + */ + public HadoopLocalFileSystemV1() { + super(new HadoopRawLocalFileSystem()); + } + + /** {@inheritDoc} */ + @Override public File pathToFile(Path path) { + return ((HadoopRawLocalFileSystem)getRaw()).convert(path); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java new file mode 100644 index 0000000..2484492 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.fs; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ChecksumFs; +import org.apache.hadoop.fs.DelegateToFileSystem; +import org.apache.hadoop.fs.FsServerDefaults; +import org.apache.hadoop.fs.local.LocalConfigKeys; + +import static org.apache.hadoop.fs.FsConstants.LOCAL_FS_URI; + +/** + * Local file system replacement for Hadoop jobs. + */ +public class HadoopLocalFileSystemV2 extends ChecksumFs { + /** + * Creates new local file system. + * + * @param cfg Configuration. + * @throws IOException If failed. + * @throws URISyntaxException If failed. + */ + public HadoopLocalFileSystemV2(Configuration cfg) throws IOException, URISyntaxException { + super(new DelegateFS(cfg)); + } + + /** + * Creates new local file system. + * + * @param uri URI. + * @param cfg Configuration. + * @throws IOException If failed. + * @throws URISyntaxException If failed. + */ + public HadoopLocalFileSystemV2(URI uri, Configuration cfg) throws IOException, URISyntaxException { + this(cfg); + } + + /** + * Delegate file system. + */ + private static class DelegateFS extends DelegateToFileSystem { + /** + * Creates new local file system. + * + * @param cfg Configuration. + * @throws IOException If failed. + * @throws URISyntaxException If failed. 
+ */ + public DelegateFS(Configuration cfg) throws IOException, URISyntaxException { + super(LOCAL_FS_URI, new HadoopRawLocalFileSystem(), cfg, LOCAL_FS_URI.getScheme(), false); + } + + /** {@inheritDoc} */ + @Override public int getUriDefaultPort() { + return -1; + } + + /** {@inheritDoc} */ + @Override public FsServerDefaults getServerDefaults() throws IOException { + return LocalConfigKeys.getServerDefaults(); + } + + /** {@inheritDoc} */ + @Override public boolean isValidName(String src) { + return true; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java new file mode 100644 index 0000000..0aac4a3 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.fs; + +/** + * This class lists parameters that can be specified in Hadoop configuration. + * Hadoop configuration can be specified in {@code core-site.xml} file + * or passed to map-reduce task directly when using Hadoop driver for IGFS file system: + * <ul> + * <li> + * {@code fs.igfs.[name].open.sequential_reads_before_prefetch} - this parameter overrides + * the one specified in {@link org.apache.ignite.configuration.FileSystemConfiguration#getSequentialReadsBeforePrefetch()} + * IGFS data node configuration property. + * </li> + * <li> + * {@code fs.igfs.[name].log.enabled} - specifies whether IGFS sampling logger is enabled. If + * {@code true}, then all file system operations will be logged to a file. + * </li> + * <li>{@code fs.igfs.[name].log.dir} - specifies log directory where sampling log files should be placed.</li> + * <li> + * {@code fs.igfs.[name].log.batch_size} - specifies how many log entries are accumulated in a batch before + * it gets flushed to log file. Higher values will imply greater performance, but will increase delay + * before record appears in the log file. + * </li> + * <li> + * {@code fs.igfs.[name].colocated.writes} - specifies whether written files should be colocated on data + * node to which client is connected. If {@code true}, file will not be distributed and will be written + * to a single data node. Default value is {@code true}. + * </li> + * <li> + * {@code fs.igfs.prefer.local.writes} - specifies whether file preferably should be written to + * local data node if it has enough free space. After some time it can be redistributed across nodes though. + * </li> + * </ul> + * Where {@code [name]} is file system endpoint which you specify in file system URI authority part. E.g. 
in + * case your file system URI is {@code igfs://127.0.0.1:10500} then {@code name} will be {@code 127.0.0.1:10500}. + * <p> + * Sample configuration that can be placed to {@code core-site.xml} file: + * <pre name="code" class="xml"> + * <property> + * <name>fs.igfs.127.0.0.1:10500.log.enabled</name> + * <value>true</value> + * </property> + * <property> + * <name>fs.igfs.127.0.0.1:10500.log.dir</name> + * <value>/home/apache/ignite/log/sampling</value> + * </property> + * <property> + * <name>fs.igfs.127.0.0.1:10500.log.batch_size</name> + * <value>16</value> + * </property> + * </pre> + * Parameters could also be specified per mapreduce job, e.g. + * <pre name="code" class="bash"> + * hadoop jar myjarfile.jar MyMapReduceJob -Dfs.igfs.open.sequential_reads_before_prefetch=4 + * </pre> + * If you want to use these parameters in code, then you have to substitute you file system name in it. The easiest + * way to do that is {@code String.format(PARAM_IGFS_COLOCATED_WRITES, [name])}. + */ +public class HadoopParameters { + /** Parameter name for control over file colocation write mode. */ + public static final String PARAM_IGFS_COLOCATED_WRITES = "fs.igfs.%s.colocated.writes"; + + /** Parameter name for custom sequential reads before prefetch value. */ + public static final String PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH = + "fs.igfs.%s.open.sequential_reads_before_prefetch"; + + /** Parameter name for client logger directory. */ + public static final String PARAM_IGFS_LOG_DIR = "fs.igfs.%s.log.dir"; + + /** Parameter name for log batch size. */ + public static final String PARAM_IGFS_LOG_BATCH_SIZE = "fs.igfs.%s.log.batch_size"; + + /** Parameter name for log enabled flag. */ + public static final String PARAM_IGFS_LOG_ENABLED = "fs.igfs.%s.log.enabled"; + + /** Parameter name for prefer local writes flag. 
*/ + public static final String PARAM_IGFS_PREFER_LOCAL_WRITES = "fs.igfs.prefer.local.writes"; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java new file mode 100644 index 0000000..b8fc8e7 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.fs; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.RandomAccessFile; +import java.net.URI; +import java.nio.file.Files; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FsConstants; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PositionedReadable; +import org.apache.hadoop.fs.Seekable; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.Progressable; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * Local file system implementation for Hadoop. + */ +public class HadoopRawLocalFileSystem extends FileSystem { + /** Working directory for each thread. */ + private final ThreadLocal<Path> workDir = new ThreadLocal<Path>() { + @Override protected Path initialValue() { + return getInitialWorkingDirectory(); + } + }; + + /** + * Converts Hadoop path to local path. + * + * @param path Hadoop path. + * @return Local path. 
+ */ + File convert(Path path) { + checkPath(path); + + if (path.isAbsolute()) + return new File(path.toUri().getPath()); + + return new File(getWorkingDirectory().toUri().getPath(), path.toUri().getPath()); + } + + /** {@inheritDoc} */ + @Override public Path getHomeDirectory() { + return makeQualified(new Path(System.getProperty("user.home"))); + } + + /** {@inheritDoc} */ + @Override public Path getInitialWorkingDirectory() { + File f = new File(System.getProperty("user.dir")); + + return new Path(f.getAbsoluteFile().toURI()).makeQualified(getUri(), null); + } + + /** {@inheritDoc} */ + @Override public void initialize(URI uri, Configuration conf) throws IOException { + super.initialize(uri, conf); + + setConf(conf); + + String initWorkDir = conf.get(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP); + + if (initWorkDir != null) + setWorkingDirectory(new Path(initWorkDir)); + } + + /** {@inheritDoc} */ + @Override public URI getUri() { + return FsConstants.LOCAL_FS_URI; + } + + /** {@inheritDoc} */ + @Override public FSDataInputStream open(Path f, int bufferSize) throws IOException { + return new FSDataInputStream(new InStream(checkExists(convert(f)))); + } + + /** {@inheritDoc} */ + @Override public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufSize, + short replication, long blockSize, Progressable progress) throws IOException { + File file = convert(f); + + if (!overwrite && !file.createNewFile()) + throw new IOException("Failed to create new file: " + f.toUri()); + + return out(file, false, bufSize); + } + + /** + * @param file File. + * @param append Append flag. + * @return Output stream. + * @throws IOException If failed. + */ + private FSDataOutputStream out(File file, boolean append, int bufSize) throws IOException { + return new FSDataOutputStream(new BufferedOutputStream(new FileOutputStream(file, append), + bufSize < 32 * 1024 ? 
32 * 1024 : bufSize), new Statistics(getUri().getScheme())); + } + + /** {@inheritDoc} */ + @Override public FSDataOutputStream append(Path f, int bufSize, Progressable progress) throws IOException { + return out(convert(f), true, bufSize); + } + + /** {@inheritDoc} */ + @Override public boolean rename(Path src, Path dst) throws IOException { + return convert(src).renameTo(convert(dst)); + } + + /** {@inheritDoc} */ + @Override public boolean delete(Path f, boolean recursive) throws IOException { + File file = convert(f); + + if (file.isDirectory() && !recursive) + throw new IOException("Failed to remove directory in non recursive mode: " + f.toUri()); + + return U.delete(file); + } + + /** {@inheritDoc} */ + @Override public void setWorkingDirectory(Path dir) { + workDir.set(fixRelativePart(dir)); + + checkPath(dir); + } + + /** {@inheritDoc} */ + @Override public Path getWorkingDirectory() { + return workDir.get(); + } + + /** {@inheritDoc} */ + @Override public boolean mkdirs(Path f, FsPermission permission) throws IOException { + if(f == null) + throw new IllegalArgumentException("mkdirs path arg is null"); + + Path parent = f.getParent(); + + File p2f = convert(f); + + if(parent != null) { + File parent2f = convert(parent); + + if(parent2f != null && parent2f.exists() && !parent2f.isDirectory()) + throw new FileAlreadyExistsException("Parent path is not a directory: " + parent); + + } + + return (parent == null || mkdirs(parent)) && (p2f.mkdir() || p2f.isDirectory()); + } + + /** {@inheritDoc} */ + @Override public FileStatus getFileStatus(Path f) throws IOException { + return fileStatus(checkExists(convert(f))); + } + + /** + * @return File status. + */ + private FileStatus fileStatus(File file) throws IOException { + boolean dir = file.isDirectory(); + + java.nio.file.Path path = dir ? null : file.toPath(); + + return new FileStatus(dir ? 
0 : file.length(), dir, 1, 4 * 1024, file.lastModified(), file.lastModified(), + /*permission*/null, /*owner*/null, /*group*/null, dir ? null : Files.isSymbolicLink(path) ? + new Path(Files.readSymbolicLink(path).toUri()) : null, new Path(file.toURI())); + } + + /** + * @param file File. + * @return Same file. + * @throws FileNotFoundException If does not exist. + */ + private static File checkExists(File file) throws FileNotFoundException { + if (!file.exists()) + throw new FileNotFoundException("File " + file.getAbsolutePath() + " does not exist."); + + return file; + } + + /** {@inheritDoc} */ + @Override public FileStatus[] listStatus(Path f) throws IOException { + File file = convert(f); + + if (checkExists(file).isFile()) + return new FileStatus[] {fileStatus(file)}; + + File[] files = file.listFiles(); + + FileStatus[] res = new FileStatus[files.length]; + + for (int i = 0; i < res.length; i++) + res[i] = fileStatus(files[i]); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean supportsSymlinks() { + return true; + } + + /** {@inheritDoc} */ + @Override public void createSymlink(Path target, Path link, boolean createParent) throws IOException { + Files.createSymbolicLink(convert(link).toPath(), convert(target).toPath()); + } + + /** {@inheritDoc} */ + @Override public FileStatus getFileLinkStatus(Path f) throws IOException { + return getFileStatus(getLinkTarget(f)); + } + + /** {@inheritDoc} */ + @Override public Path getLinkTarget(Path f) throws IOException { + File file = Files.readSymbolicLink(convert(f).toPath()).toFile(); + + return new Path(file.toURI()); + } + + /** + * Input stream. + */ + private static class InStream extends InputStream implements Seekable, PositionedReadable { + /** */ + private final RandomAccessFile file; + + /** + * @param f File. + * @throws IOException If failed. 
+ */ + public InStream(File f) throws IOException { + file = new RandomAccessFile(f, "r"); + } + + /** {@inheritDoc} */ + @Override public synchronized int read() throws IOException { + return file.read(); + } + + /** {@inheritDoc} */ + @Override public synchronized int read(byte[] b, int off, int len) throws IOException { + return file.read(b, off, len); + } + + /** {@inheritDoc} */ + @Override public synchronized void close() throws IOException { + file.close(); + } + + /** {@inheritDoc} */ + @Override public synchronized int read(long pos, byte[] buf, int off, int len) throws IOException { + long pos0 = file.getFilePointer(); + + file.seek(pos); + int res = file.read(buf, off, len); + + file.seek(pos0); + + return res; + } + + /** {@inheritDoc} */ + @Override public void readFully(long pos, byte[] buf, int off, int len) throws IOException { + if (read(pos, buf, off, len) != len) + throw new IOException(); + } + + /** {@inheritDoc} */ + @Override public void readFully(long pos, byte[] buf) throws IOException { + readFully(pos, buf, 0, buf.length); + } + + /** {@inheritDoc} */ + @Override public synchronized void seek(long pos) throws IOException { + file.seek(pos); + } + + /** {@inheritDoc} */ + @Override public synchronized long getPos() throws IOException { + return file.getFilePointer(); + } + + /** {@inheritDoc} */ + @Override public boolean seekToNewSource(long targetPos) throws IOException { + return false; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfs.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfs.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfs.java new file mode 100644 index 0000000..fe43596 --- /dev/null +++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfs.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.igfs.IgfsBlockLocation; +import org.apache.ignite.igfs.IgfsFile; +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.igfs.IgfsPathSummary; +import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse; +import org.apache.ignite.internal.processors.igfs.IgfsStatus; +import org.jetbrains.annotations.Nullable; + +/** + * Facade for communication with grid. + */ +public interface HadoopIgfs { + /** + * Perform handshake. + * + * @param logDir Log directory. + * @return Future with handshake result. + * @throws IgniteCheckedException If failed. + */ + public IgfsHandshakeResponse handshake(String logDir) throws IgniteCheckedException, IOException; + + /** + * Close connection. + * + * @param force Force flag. + */ + public void close(boolean force); + + /** + * Command to retrieve file info for some IGFS path. 
+ * + * @param path Path to get file info for. + * @return Future for info operation. + * @throws IgniteCheckedException If failed. + */ + public IgfsFile info(IgfsPath path) throws IgniteCheckedException, IOException; + + /** + * Command to update file properties. + * + * @param path IGFS path to update properties. + * @param props Properties to update. + * @return Future for update operation. + * @throws IgniteCheckedException If failed. + */ + public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException, IOException; + + /** + * Sets last access time and last modification time for a file. + * + * @param path Path to update times. + * @param accessTime Last access time to set. + * @param modificationTime Last modification time to set. + * @throws IgniteCheckedException If failed. + */ + public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException, + IOException; + + /** + * Command to rename given path. + * + * @param src Source path. + * @param dest Destination path. + * @return Future for rename operation. + * @throws IgniteCheckedException If failed. + */ + public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException, IOException; + + /** + * Command to delete given path. + * + * @param path Path to delete. + * @param recursive {@code True} if deletion is recursive. + * @return Future for delete operation. + * @throws IgniteCheckedException If failed. + */ + public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException, IOException; + + /** + * Command to get affinity for given path, offset and length. + * + * @param path Path to get affinity for. + * @param start Start position (offset). + * @param len Data length. + * @return Future for affinity command. + * @throws IgniteCheckedException If failed. 
+ */ + public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) throws IgniteCheckedException, + IOException; + + /** + * Gets path summary. + * + * @param path Path to get summary for. + * @return Future that will be completed when summary is received. + * @throws IgniteCheckedException If failed. + */ + public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException, IOException; + + /** + * Command to create directories. + * + * @param path Path to create. + * @return Future for mkdirs operation. + * @throws IgniteCheckedException If failed. + */ + public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException, IOException; + + /** + * Command to get list of files in directory. + * + * @param path Path to list. + * @return Future for listFiles operation. + * @throws IgniteCheckedException If failed. + */ + public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException, IOException; + + /** + * Command to get directory listing. + * + * @param path Path to list. + * @return Future for listPaths operation. + * @throws IgniteCheckedException If failed. + */ + public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException, IOException; + + /** + * Performs status request. + * + * @return Status response. + * @throws IgniteCheckedException If failed. + */ + public IgfsStatus fsStatus() throws IgniteCheckedException, IOException; + + /** + * Command to open file for reading. + * + * @param path File path to open. + * @return Future for open operation. + * @throws IgniteCheckedException If failed. + */ + public HadoopIgfsStreamDelegate open(IgfsPath path) throws IgniteCheckedException, IOException; + + /** + * Command to open file for reading. + * + * @param path File path to open. + * @return Future for open operation. + * @throws IgniteCheckedException If failed. 
+ */ + public HadoopIgfsStreamDelegate open(IgfsPath path, int seqReadsBeforePrefetch) throws IgniteCheckedException, + IOException; + + /** + * Command to create file and open it for output. + * + * @param path Path to file. + * @param overwrite If {@code true} then old file contents will be lost. + * @param colocate If {@code true} and called on data node, file will be written on that node. + * @param replication Replication factor. + * @param props File properties for creation. + * @return Stream descriptor. + * @throws IgniteCheckedException If failed. + */ + public HadoopIgfsStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate, + int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException, IOException; + + /** + * Open file for output appending data to the end of a file. + * + * @param path Path to file. + * @param create If {@code true}, file will be created if does not exist. + * @param props File properties. + * @return Stream descriptor. + * @throws IgniteCheckedException If failed. 
+ */ + public HadoopIgfsStreamDelegate append(IgfsPath path, boolean create, + @Nullable Map<String, String> props) throws IgniteCheckedException, IOException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsCommunicationException.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsCommunicationException.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsCommunicationException.java new file mode 100644 index 0000000..d610091 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsCommunicationException.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import org.apache.ignite.IgniteCheckedException; + +/** + * Communication exception indicating a problem between file system and IGFS instance. 
+ */ +public class HadoopIgfsCommunicationException extends IgniteCheckedException { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Creates new exception with given throwable as a nested cause and + * source of error message. + * + * @param cause Non-null throwable cause. + */ + public HadoopIgfsCommunicationException(Exception cause) { + super(cause); + } + + /** + * Creates a new exception with given error message and optional nested cause exception. + * + * @param msg Error message. + */ + public HadoopIgfsCommunicationException(String msg) { + super(msg); + } + + /** + * Creates a new exception with given error message and optional nested cause exception. + * + * @param msg Error message. + * @param cause Cause. + */ + public HadoopIgfsCommunicationException(String msg, Exception cause) { + super(msg, cause); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEndpoint.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEndpoint.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEndpoint.java new file mode 100644 index 0000000..a44e1ae --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEndpoint.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteFileSystem; +import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteBiTuple; +import org.jetbrains.annotations.Nullable; + +/** + * IGFS endpoint abstraction. + */ +public class HadoopIgfsEndpoint { + /** Localhost. */ + public static final String LOCALHOST = "127.0.0.1"; + + /** IGFS name. */ + private final String igfsName; + + /** Grid name. */ + private final String gridName; + + /** Host. */ + private final String host; + + /** Port. */ + private final int port; + + /** + * Normalize IGFS URI. + * + * @param uri URI. + * @return Normalized URI. + * @throws IOException If failed. + */ + public static URI normalize(URI uri) throws IOException { + try { + if (!F.eq(IgniteFileSystem.IGFS_SCHEME, uri.getScheme())) + throw new IOException("Failed to normalize UIR because it has non IGFS scheme: " + uri); + + HadoopIgfsEndpoint endpoint = new HadoopIgfsEndpoint(uri.getAuthority()); + + StringBuilder sb = new StringBuilder(); + + if (endpoint.igfs() != null) + sb.append(endpoint.igfs()); + + if (endpoint.grid() != null) + sb.append(":").append(endpoint.grid()); + + return new URI(uri.getScheme(), sb.length() != 0 ? 
sb.toString() : null, endpoint.host(), endpoint.port(), + uri.getPath(), uri.getQuery(), uri.getFragment()); + } + catch (URISyntaxException | IgniteCheckedException e) { + throw new IOException("Failed to normalize URI: " + uri, e); + } + } + + /** + * Constructor. + * + * @param connStr Connection string. + * @throws IgniteCheckedException If failed to parse connection string. + */ + public HadoopIgfsEndpoint(@Nullable String connStr) throws IgniteCheckedException { + if (connStr == null) + connStr = ""; + + String[] tokens = connStr.split("@", -1); + + IgniteBiTuple<String, Integer> hostPort; + + if (tokens.length == 1) { + igfsName = null; + gridName = null; + + hostPort = hostPort(connStr, connStr); + } + else if (tokens.length == 2) { + String authStr = tokens[0]; + + if (authStr.isEmpty()) { + gridName = null; + igfsName = null; + } + else { + String[] authTokens = authStr.split(":", -1); + + igfsName = F.isEmpty(authTokens[0]) ? null : authTokens[0]; + + if (authTokens.length == 1) + gridName = null; + else if (authTokens.length == 2) + gridName = F.isEmpty(authTokens[1]) ? null : authTokens[1]; + else + throw new IgniteCheckedException("Invalid connection string format: " + connStr); + } + + hostPort = hostPort(connStr, tokens[1]); + } + else + throw new IgniteCheckedException("Invalid connection string format: " + connStr); + + host = hostPort.get1(); + + assert hostPort.get2() != null; + + port = hostPort.get2(); + } + + /** + * Parse host and port. + * + * @param connStr Full connection string. + * @param hostPortStr Host/port connection string part. + * @return Tuple with host and port. + * @throws IgniteCheckedException If failed to parse connection string. 
+ */ + private IgniteBiTuple<String, Integer> hostPort(String connStr, String hostPortStr) throws IgniteCheckedException { + String[] tokens = hostPortStr.split(":", -1); + + String host = tokens[0]; + + if (F.isEmpty(host)) + host = LOCALHOST; + + int port; + + if (tokens.length == 1) + port = IgfsIpcEndpointConfiguration.DFLT_PORT; + else if (tokens.length == 2) { + String portStr = tokens[1]; + + try { + port = Integer.valueOf(portStr); + + if (port < 0 || port > 65535) + throw new IgniteCheckedException("Invalid port number: " + connStr); + } + catch (NumberFormatException e) { + throw new IgniteCheckedException("Invalid port number: " + connStr); + } + } + else + throw new IgniteCheckedException("Invalid connection string format: " + connStr); + + return F.t(host, port); + } + + /** + * @return IGFS name. + */ + @Nullable public String igfs() { + return igfsName; + } + + /** + * @return Grid name. + */ + @Nullable public String grid() { + return gridName; + } + + /** + * @return Host. + */ + public String host() { + return host; + } + + /** + * @return Host. + */ + public boolean isLocal() { + return F.eq(LOCALHOST, host); + } + + /** + * @return Port. 
+ */ + public int port() { + return port; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopIgfsEndpoint.class, this); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java new file mode 100644 index 0000000..014e2a1 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import java.io.IOException; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.jetbrains.annotations.Nullable; + +/** + * Extended IGFS server interface. 
+ */ +public interface HadoopIgfsEx extends HadoopIgfs { + /** + * Adds event listener that will be invoked when connection with server is lost or remote error has occurred. + * If connection is closed already, callback will be invoked synchronously inside this method. + * + * @param delegate Stream delegate. + * @param lsnr Event listener. + */ + public void addEventListener(HadoopIgfsStreamDelegate delegate, HadoopIgfsStreamEventListener lsnr); + + /** + * Removes event listener that will be invoked when connection with server is lost or remote error has occurred. + * + * @param delegate Stream delegate. + */ + public void removeEventListener(HadoopIgfsStreamDelegate delegate); + + /** + * Asynchronously reads specified amount of bytes from opened input stream. + * + * @param delegate Stream delegate. + * @param pos Position to read from. + * @param len Data length to read. + * @param outBuf Optional output buffer. If buffer length is less then {@code len}, all remaining + * bytes will be read into new allocated buffer of length {len - outBuf.length} and this buffer will + * be the result of read future. + * @param outOff Output offset. + * @param outLen Output length. + * @return Read data. + */ + public IgniteInternalFuture<byte[]> readData(HadoopIgfsStreamDelegate delegate, long pos, int len, + @Nullable final byte[] outBuf, final int outOff, final int outLen); + + /** + * Writes data to the stream with given streamId. This method does not return any future since + * no response to write request is sent. + * + * @param delegate Stream delegate. + * @param data Data to write. + * @param off Offset. + * @param len Length. + * @throws IOException If failed. + */ + public void writeData(HadoopIgfsStreamDelegate delegate, byte[] data, int off, int len) throws IOException; + + /** + * Close server stream. + * + * @param delegate Stream delegate. + * @throws IOException If failed. 
+ */ + public void closeStream(HadoopIgfsStreamDelegate delegate) throws IOException; + + /** + * Flush output stream. + * + * @param delegate Stream delegate. + * @throws IOException If failed. + */ + public void flush(HadoopIgfsStreamDelegate delegate) throws IOException; + + /** + * The user this Igfs instance works on behalf of. + * @return the user name. + */ + public String user(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsFuture.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsFuture.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsFuture.java new file mode 100644 index 0000000..5ff1b2e --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsFuture.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.jetbrains.annotations.Nullable; + +/** + * IGFS client future that holds response parse closure. + */ +public class HadoopIgfsFuture<T> extends GridFutureAdapter<T> { + /** */ + private static final long serialVersionUID = 0L; + + /** Output buffer. */ + private byte[] outBuf; + + /** Output offset. */ + private int outOff; + + /** Output length. */ + private int outLen; + + /** Read future flag. */ + private boolean read; + + /** + * @return Output buffer. + */ + public byte[] outputBuffer() { + return outBuf; + } + + /** + * @param outBuf Output buffer. + */ + public void outputBuffer(@Nullable byte[] outBuf) { + this.outBuf = outBuf; + } + + /** + * @return Offset in output buffer to write from. + */ + public int outputOffset() { + return outOff; + } + + /** + * @param outOff Offset in output buffer to write from. + */ + public void outputOffset(int outOff) { + this.outOff = outOff; + } + + /** + * @return Length to write to output buffer. + */ + public int outputLength() { + return outLen; + } + + /** + * @param outLen Length to write to output buffer. + */ + public void outputLength(int outLen) { + this.outLen = outLen; + } + + /** + * @param read {@code True} if this is a read future. + */ + public void read(boolean read) { + this.read = read; + } + + /** + * @return {@code True} if this is a read future. + */ + public boolean read() { + return read; + } +} \ No newline at end of file