http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/counter/HadoopLongCounter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/counter/HadoopLongCounter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/counter/HadoopLongCounter.java new file mode 100644 index 0000000..0d61e0d --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/counter/HadoopLongCounter.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.counter; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; + +/** + * Standard hadoop counter to use via original Hadoop API in Hadoop jobs. + */ +public class HadoopLongCounter extends HadoopCounterAdapter { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** The counter value. 
 */ + private long val; + + /** + * Default constructor required by {@link Externalizable}. + */ + public HadoopLongCounter() { + // No-op. + } + + /** + * Constructor. + * + * @param grp Group name. + * @param name Counter name. + */ + public HadoopLongCounter(String grp, String name) { + super(grp, name); + } + + /** {@inheritDoc} */ + @Override protected void writeValue(ObjectOutput out) throws IOException { + out.writeLong(val); + } + + /** {@inheritDoc} */ + @Override protected void readValue(ObjectInput in) throws IOException { + val = in.readLong(); + } + + /** {@inheritDoc} */ + @Override public void merge(HadoopCounter cntr) { + val += ((HadoopLongCounter)cntr).val; + } + + /** + * Gets current value of this counter. + * + * @return Current value. + */ + public long value() { + return val; + } + + /** + * Sets the current value to the given value. + * + * @param val Value to set. + */ + public void value(long val) { + this.val = val; + } + + /** + * Increments this counter by the given value. + * + * @param i Value to increase this counter by. + */ + public void increment(long i) { + val += i; + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/counter/HadoopPerformanceCounter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/counter/HadoopPerformanceCounter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/counter/HadoopPerformanceCounter.java new file mode 100644 index 0000000..b35b7fb --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/counter/HadoopPerformanceCounter.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.counter; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.ArrayList; +import java.util.Collection; +import java.util.UUID; +import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskType; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.JOB_SUBMISSION_START_TS_PROPERTY; +import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.REQ_NEW_JOBID_TS_PROPERTY; +import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.RESPONSE_NEW_JOBID_TS_PROPERTY; + +/** + * Counter for the job statistics accumulation. + */ +public class HadoopPerformanceCounter extends HadoopCounterAdapter { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** The group name for this counter. */ + private static final String GROUP_NAME = "SYSTEM"; + + /** The counter name for this counter. */ + private static final String COUNTER_NAME = "PERFORMANCE"; + + /** Events collections. */ + private Collection<T2<String,Long>> evts = new ArrayList<>(); + + /** Node id to insert into the event info. */ + private UUID nodeId; + + /** Reducer number passed to the most recent onShuffleMessage() call; used in the SHUFFLE event names. */ + private int reducerNum; + + /** Timestamp of the first registered shuffle message, or null if none has been registered. */ + private volatile Long firstShuffleMsg; + + /** Timestamp of the latest registered shuffle message; set back to null by onTaskFinish() once reported. */ + private volatile Long lastShuffleMsg; + + /** + * Default constructor required by {@link Externalizable}. + */ + public HadoopPerformanceCounter() { + // No-op. + } + + /** + * Constructor. + * + * @param grp Group name. + * @param name Counter name. 
+ */ + public HadoopPerformanceCounter(String grp, String name) { + super(grp, name); + } + + /** + * Constructor to create an instance that is used as a helper. + * + * @param nodeId Id of the work node. + */ + public HadoopPerformanceCounter(UUID nodeId) { + this.nodeId = nodeId; + } + + /** {@inheritDoc} */ + @Override protected void writeValue(ObjectOutput out) throws IOException { + U.writeCollection(out, evts); + } + + /** {@inheritDoc} */ + @Override protected void readValue(ObjectInput in) throws IOException { + try { + evts = U.readCollection(in); + } + catch (ClassNotFoundException e) { + throw new IOException(e); + } + } + + /** {@inheritDoc} */ + @Override public void merge(HadoopCounter cntr) { + evts.addAll(((HadoopPerformanceCounter)cntr).evts); + } + + /** + * Gets the events collection. + * + * @return Collection of events (the internal collection itself, not a copy). + */ + public Collection<T2<String, Long>> evts() { + return evts; + } + + /** + * Generates a name that consists of some event information. + * + * @param info Task info. + * @param evtType The type of the event. + * @return String containing the necessary event information. + */ + private String eventName(HadoopTaskInfo info, String evtType) { + return eventName(info.type().toString(), info.taskNumber(), evtType); + } + + /** + * Generates a name that consists of some event information. + * + * @param taskType Task type. + * @param taskNum Number of the task. + * @param evtType The type of the event. + * @return String containing the necessary event information. + */ + private String eventName(String taskType, int taskNum, String evtType) { + assert nodeId != null; + + return taskType + " " + taskNum + " " + evtType + " " + nodeId; + } + + /** + * Adds event of the task submission (task instance creation). + * + * @param info Task info. + * @param ts Timestamp of the event. + */ + public void onTaskSubmit(HadoopTaskInfo info, long ts) { + evts.add(new T2<>(eventName(info, "submit"), ts)); + } + + /** + * Adds event of the task preparation. 
+ * + * @param info Task info. + * @param ts Timestamp of the event. + */ + public void onTaskPrepare(HadoopTaskInfo info, long ts) { + evts.add(new T2<>(eventName(info, "prepare"), ts)); + } + + /** + * Adds event of the task finish. For a REDUCE task with a recorded last shuffle message, the SHUFFLE start and finish events are added first. NOTE(review): only lastShuffleMsg is reset here, firstShuffleMsg keeps its value; confirm this is intended. + * + * @param info Task info. + * @param ts Timestamp of the event. + */ + public void onTaskFinish(HadoopTaskInfo info, long ts) { + if (info.type() == HadoopTaskType.REDUCE && lastShuffleMsg != null) { + evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "start"), firstShuffleMsg)); + evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "finish"), lastShuffleMsg)); + + lastShuffleMsg = null; + } + + evts.add(new T2<>(eventName(info, "finish"), ts)); + } + + /** + * Adds event of the task run. + * + * @param info Task info. + * @param ts Timestamp of the event. + */ + public void onTaskStart(HadoopTaskInfo info, long ts) { + evts.add(new T2<>(eventName(info, "start"), ts)); + } + + /** + * Adds event of the job preparation. + * + * @param ts Timestamp of the event. + */ + public void onJobPrepare(long ts) { + assert nodeId != null; + + evts.add(new T2<>("JOB prepare " + nodeId, ts)); + } + + /** + * Adds event of the job start. + * + * @param ts Timestamp of the event. + */ + public void onJobStart(long ts) { + assert nodeId != null; + + evts.add(new T2<>("JOB start " + nodeId, ts)); + } + + /** + * Adds client submission events from job info. + * + * @param info Job info. + */ + public void clientSubmissionEvents(HadoopJobInfo info) { + assert nodeId != null; + + addEventFromProperty("JOB requestId", info, REQ_NEW_JOBID_TS_PROPERTY); + addEventFromProperty("JOB responseId", info, RESPONSE_NEW_JOBID_TS_PROPERTY); + addEventFromProperty("JOB submit", info, JOB_SUBMISSION_START_TS_PROPERTY); + } + + /** + * Adds event with timestamp from some property in job info. + * + * @param evt Event type and phase. + * @param info Job info. + * @param propName Property name to get timestamp. 
+ */ + private void addEventFromProperty(String evt, HadoopJobInfo info, String propName) { + String val = info.property(propName); + + if (!F.isEmpty(val)) { + try { + evts.add(new T2<>(evt + " " + nodeId, Long.parseLong(val))); + } + catch (NumberFormatException e) { + throw new IllegalStateException("Invalid value '" + val + "' of property '" + propName + "'", e); + } + } + } + + /** + * Registers shuffle message event. + * + * @param reducerNum Number of reducer that receives the data. + * @param ts Timestamp of the event. + */ + public void onShuffleMessage(int reducerNum, long ts) { + this.reducerNum = reducerNum; + + if (firstShuffleMsg == null) + firstShuffleMsg = ts; + + lastShuffleMsg = ts; + } + + /** + * Gets system predefined performance counter from the HadoopCounters object and, when a node id is given, sets it on that counter. + * + * @param cntrs HadoopCounters object. + * @param nodeId Node id for the methods that add events. May be null if those methods are not used. + * @return Predefined performance counter (the same instance the node id was applied to). + */ + public static HadoopPerformanceCounter getCounter(HadoopCounters cntrs, @Nullable UUID nodeId) { + HadoopPerformanceCounter cntr = cntrs.counter(GROUP_NAME, COUNTER_NAME, HadoopPerformanceCounter.class); + + if (nodeId != null) + cntr.nodeId(nodeId); + + return cntr; + } + + /** + * Sets the nodeId field. + * + * @param nodeId Node id. 
+ */ + private void nodeId(UUID nodeId) { + this.nodeId = nodeId; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopBasicFileSystemFactoryDelegate.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopBasicFileSystemFactoryDelegate.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopBasicFileSystemFactoryDelegate.java new file mode 100644 index 0000000..e348820 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopBasicFileSystemFactoryDelegate.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.delegate; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.ignite.IgniteException; +import org.apache.ignite.hadoop.fs.BasicHadoopFileSystemFactory; +import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory; +import org.apache.ignite.hadoop.util.UserNameMapper; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils; +import org.apache.ignite.internal.processors.igfs.IgfsUtils; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lifecycle.LifecycleAware; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.Arrays; + +/** + * Basic Hadoop file system factory delegate. + */ +public class HadoopBasicFileSystemFactoryDelegate implements HadoopFileSystemFactoryDelegate { + /** Proxy. */ + protected final HadoopFileSystemFactory proxy; + + /** Configuration of the secondary filesystem, never null. */ + protected Configuration cfg; + + /** Resulting URI. */ + protected URI fullUri; + + /** User name mapper. */ + private UserNameMapper usrNameMapper; + + /** + * Constructor. + * + * @param proxy Proxy. + */ + public HadoopBasicFileSystemFactoryDelegate(BasicHadoopFileSystemFactory proxy) { + this.proxy = proxy; + } + + /** {@inheritDoc} */ + @Override public FileSystem get(String name) throws IOException { + String name0 = IgfsUtils.fixUserName(name); + + if (usrNameMapper != null) + name0 = IgfsUtils.fixUserName(usrNameMapper.map(name0)); + + return getWithMappedName(name0); + } + + /** + * Internal file system create routine. + * + * @param usrName User name. + * @return File system. + * @throws IOException If failed. 
+ */ + protected FileSystem getWithMappedName(String usrName) throws IOException { + assert cfg != null; + + try { + // FileSystem.get() might delegate to ServiceLoader to get the list of file system implementation. + // And ServiceLoader is known to be sensitive to context classloader. Therefore, we change context + // classloader to classloader of current class to avoid strange class-cast-exceptions. + ClassLoader oldLdr = HadoopUtils.setContextClassLoader(getClass().getClassLoader()); + + try { + return create(usrName); + } + finally { + HadoopUtils.restoreContextClassLoader(oldLdr); + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IOException("Failed to create file system due to interrupt.", e); + } + } + + /** + * Internal file system creation routine, invoked in correct class loader context. + * + * @param usrName User name. + * @return File system. + * @throws IOException If failed. + * @throws InterruptedException if the current thread is interrupted. 
+ */ + protected FileSystem create(String usrName) throws IOException, InterruptedException { + return FileSystem.get(fullUri, cfg, usrName); + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteException { + BasicHadoopFileSystemFactory proxy0 = (BasicHadoopFileSystemFactory)proxy; + + cfg = HadoopUtils.safeCreateConfiguration(); + + if (proxy0.getConfigPaths() != null) { + for (String cfgPath : proxy0.getConfigPaths()) { + if (cfgPath == null) + throw new NullPointerException("Configuration path cannot be null: " + + Arrays.toString(proxy0.getConfigPaths())); + else { + URL url = U.resolveIgniteUrl(cfgPath); + + if (url == null) { + // If secConfPath is given, it should be resolvable: + throw new IgniteException("Failed to resolve secondary file system configuration path " + + "(ensure that it exists locally and you have read access to it): " + cfgPath); + } + + cfg.addResource(url); + } + } + } + + // If secondary fs URI is not given explicitly, try to get it from the configuration: + if (proxy0.getUri() == null) + fullUri = FileSystem.getDefaultUri(cfg); + else { + try { + fullUri = new URI(proxy0.getUri()); + } + catch (URISyntaxException use) { + throw new IgniteException("Failed to resolve secondary file system URI: " + proxy0.getUri(), use); + } + } + + usrNameMapper = proxy0.getUserNameMapper(); + + if (usrNameMapper != null && usrNameMapper instanceof LifecycleAware) + ((LifecycleAware)usrNameMapper).start(); + } + + /** {@inheritDoc} */ + @Override public void stop() throws IgniteException { + if (usrNameMapper != null && usrNameMapper instanceof LifecycleAware) + ((LifecycleAware)usrNameMapper).stop(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopCachingFileSystemFactoryDelegate.java ---------------------------------------------------------------------- diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopCachingFileSystemFactoryDelegate.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopCachingFileSystemFactoryDelegate.java new file mode 100644 index 0000000..04bbeb8 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopCachingFileSystemFactoryDelegate.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.delegate; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory; +import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils; +import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap; + +import java.io.IOException; + +/** + * Caching Hadoop file system factory delegate. + */ +public class HadoopCachingFileSystemFactoryDelegate extends HadoopBasicFileSystemFactoryDelegate { + /** Per-user file system cache. 
*/ + private final HadoopLazyConcurrentMap<String, FileSystem> cache = new HadoopLazyConcurrentMap<>( + new HadoopLazyConcurrentMap.ValueFactory<String, FileSystem>() { + @Override public FileSystem createValue(String key) throws IOException { + return HadoopCachingFileSystemFactoryDelegate.super.getWithMappedName(key); + } + } + ); + + /** + * Constructor. + * + * @param proxy Proxy. + */ + public HadoopCachingFileSystemFactoryDelegate(CachingHadoopFileSystemFactory proxy) { + super(proxy); + } + + /** {@inheritDoc} */ + @Override public FileSystem getWithMappedName(String name) throws IOException { + return cache.getOrCreate(name); + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteException { + super.start(); + + // Disable caching. + cfg.setBoolean(HadoopFileSystemsUtils.disableFsCachePropertyName(fullUri.getScheme()), true); + } + + /** {@inheritDoc} */ + @Override public void stop() throws IgniteException { + super.stop(); + + try { + cache.close(); + } + catch (IgniteCheckedException ice) { + throw new IgniteException(ice); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopDefaultFileSystemFactoryDelegate.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopDefaultFileSystemFactoryDelegate.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopDefaultFileSystemFactoryDelegate.java new file mode 100644 index 0000000..3eb6239 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopDefaultFileSystemFactoryDelegate.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.delegate; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.ignite.IgniteException; +import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory; +import org.apache.ignite.lifecycle.LifecycleAware; + +import java.io.IOException; + +/** + * Hadoop file system factory delegate for non-standard factories. + */ +public class HadoopDefaultFileSystemFactoryDelegate implements HadoopFileSystemFactoryDelegate { + /** Factory. */ + private final HadoopFileSystemFactory factory; + + /** + * Constructor. + * + * @param factory Factory. 
+ */ + public HadoopDefaultFileSystemFactoryDelegate(HadoopFileSystemFactory factory) { + assert factory != null; + + this.factory = factory; + } + + /** {@inheritDoc} */ + @Override public FileSystem get(String usrName) throws IOException { + return (FileSystem)factory.get(usrName); + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteException { + if (factory instanceof LifecycleAware) + ((LifecycleAware)factory).start(); + } + + /** {@inheritDoc} */ + @Override public void stop() throws IgniteException { + if (factory instanceof LifecycleAware) + ((LifecycleAware)factory).stop(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopFileSystemCounterWriterDelegateImpl.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopFileSystemCounterWriterDelegateImpl.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopFileSystemCounterWriterDelegateImpl.java new file mode 100644 index 0000000..ae1b244 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopFileSystemCounterWriterDelegateImpl.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
 You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.delegate; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.hadoop.fs.IgniteHadoopFileSystemCounterWriter; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopDefaultJobInfo; +import org.apache.ignite.internal.processors.hadoop.HadoopJob; +import org.apache.ignite.internal.processors.hadoop.HadoopJobId; +import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter; +import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Job; +import org.apache.ignite.internal.processors.igfs.IgfsUtils; +import org.apache.ignite.internal.util.typedef.T2; + +import java.io.IOException; +import java.io.PrintStream; +import java.util.Map; + +/** + * Counter writer delegate implementation. + */ +@SuppressWarnings("unused") +public class HadoopFileSystemCounterWriterDelegateImpl implements HadoopFileSystemCounterWriterDelegate { + /** Macro in the counter writer directory that write() replaces with the job user name. */ + private static final String USER_MACRO = "${USER}"; + + /** Counter writer directory used by write() when the job does not define the directory property. */ + private static final String DEFAULT_COUNTER_WRITER_DIR = "/user/" + USER_MACRO; + + /** + * Constructor. + * + * @param proxy Proxy (not used). 
+ */ + public HadoopFileSystemCounterWriterDelegateImpl(IgniteHadoopFileSystemCounterWriter proxy) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void write(HadoopJob job, HadoopCounters cntrs) throws IgniteCheckedException { + Configuration hadoopCfg = HadoopUtils.safeCreateConfiguration(); + + final HadoopJobInfo jobInfo = job.info(); + + final HadoopJobId jobId = job.id(); + + for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)jobInfo).properties().entrySet()) + hadoopCfg.set(e.getKey(), e.getValue()); + + String user = jobInfo.user(); + + user = IgfsUtils.fixUserName(user); + + String dir = jobInfo.property(IgniteHadoopFileSystemCounterWriter.COUNTER_WRITER_DIR_PROPERTY); + + if (dir == null) + dir = DEFAULT_COUNTER_WRITER_DIR; + + Path jobStatPath = new Path(new Path(dir.replace(USER_MACRO, user)), jobId.toString()); + + HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(cntrs, null); + + try { + hadoopCfg.set(MRJobConfig.USER_NAME, user); + + FileSystem fs = ((HadoopV2Job)job).fileSystem(jobStatPath.toUri(), hadoopCfg); + + fs.mkdirs(jobStatPath); + + try (PrintStream out = new PrintStream(fs.create( + new Path(jobStatPath, IgniteHadoopFileSystemCounterWriter.PERFORMANCE_COUNTER_FILE_NAME)))) { + for (T2<String, Long> evt : perfCntr.evts()) { + out.print(evt.get1()); + out.print(':'); + out.println(evt.get2().toString()); + } + + out.flush(); + } + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java new file mode 100644 index 0000000..7f7100d --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java @@ -0,0 +1,469 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.delegate; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.ParentNotDirectoryException; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathExistsException; +import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.ignite.IgniteException; +import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory; +import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory; +import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem; +import org.apache.ignite.igfs.IgfsDirectoryNotEmptyException; +import org.apache.ignite.igfs.IgfsException; +import org.apache.ignite.igfs.IgfsFile; +import org.apache.ignite.igfs.IgfsParentNotDirectoryException; +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.igfs.IgfsPathAlreadyExistsException; +import org.apache.ignite.igfs.IgfsPathNotFoundException; +import org.apache.ignite.igfs.IgfsUserContext; +import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable; +import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProperties; +import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsSecondaryFileSystemPositionedReadable; +import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo; +import org.apache.ignite.internal.processors.igfs.IgfsFileImpl; +import org.apache.ignite.internal.processors.igfs.IgfsUtils; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.OutputStream; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import 
java.util.Map; + +/** + * Secondary file system implementation that forwards IGFS secondary file system calls to a Hadoop {@link FileSystem} resolved for the current user through the factory delegate. + */ +@SuppressWarnings("unused") +public class HadoopIgfsSecondaryFileSystemDelegateImpl implements HadoopIgfsSecondaryFileSystemDelegate { + /** The default user name. It is used if no user context is set. */ + private final String dfltUsrName; + + /** Factory delegate used to obtain the Hadoop file system for a given user name. */ + private final HadoopFileSystemFactoryDelegate factory; + + /** + * Constructor. Takes the default user name and the file system factory from the given proxy. NOTE(review): the assertion requires a non-null factory while the code below still falls back to a caching factory on null; confirm which is intended. + * + * @param proxy Secondary file system object providing the default user name and the file system factory. + */ + public HadoopIgfsSecondaryFileSystemDelegateImpl(IgniteHadoopIgfsSecondaryFileSystem proxy) { + assert proxy.getFileSystemFactory() != null; + + dfltUsrName = IgfsUtils.fixUserName(proxy.getDefaultUserName()); + + HadoopFileSystemFactory factory0 = proxy.getFileSystemFactory(); + + if (factory0 == null) + factory0 = new CachingHadoopFileSystemFactory(); + + factory = HadoopDelegateUtils.fileSystemFactoryDelegate(factory0); + } + + /** {@inheritDoc} */ + @Override public boolean exists(IgfsPath path) { + try { + return fileSystemForUser().exists(convert(path)); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to check file existence [path=" + path + "]"); + } + } + + /** {@inheritDoc} Sets owner and/or permission only for the properties that are present; always returns {@code null}. */ + @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) { + HadoopIgfsProperties props0 = new HadoopIgfsProperties(props); + + final FileSystem fileSys = fileSystemForUser(); + + try { + if (props0.userName() != null || props0.groupName() != null) + fileSys.setOwner(convert(path), props0.userName(), props0.groupName()); + + if (props0.permission() != null) + fileSys.setPermission(convert(path), props0.permission()); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to update file properties [path=" + path + "]"); + } + + //Result is not used in case of secondary FS. + return null; + } + + /** {@inheritDoc} */ + @Override public void rename(IgfsPath src, IgfsPath dest) { + // Delegate to the secondary file system. 
+ try { + if (!fileSystemForUser().rename(convert(src), convert(dest))) + throw new IgfsException("Failed to rename (secondary file system returned false) " + + "[src=" + src + ", dest=" + dest + ']'); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to rename file [src=" + src + ", dest=" + dest + ']'); + } + } + + /** {@inheritDoc} Returns whatever the Hadoop {@code delete} call returns; I/O failures are rethrown as IGFS exceptions. */ + @Override public boolean delete(IgfsPath path, boolean recursive) { + try { + return fileSystemForUser().delete(convert(path), recursive); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to delete file [path=" + path + ", recursive=" + recursive + "]"); + } + } + + /** {@inheritDoc} A {@code false} result from the Hadoop {@code mkdirs} call is reported as {@link IgniteException}. */ + @Override public void mkdirs(IgfsPath path) { + try { + if (!fileSystemForUser().mkdirs(convert(path))) + throw new IgniteException("Failed to make directories [path=" + path + "]"); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + "]"); + } + } + + /** {@inheritDoc} Only the permission derived from {@code props} is passed to the Hadoop {@code mkdirs} call. */ + @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) { + try { + if (!fileSystemForUser().mkdirs(convert(path), new HadoopIgfsProperties(props).permission())) + throw new IgniteException("Failed to make directories [path=" + path + ", props=" + props + "]"); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + ", props=" + props + "]"); + } + } + + /** {@inheritDoc} Each returned path is built from {@code path} and the last name component of the listed entry; a {@code null} listing or a {@link FileNotFoundException} is reported as {@link IgfsPathNotFoundException}. */ + @Override public Collection<IgfsPath> listPaths(IgfsPath path) { + try { + FileStatus[] statuses = fileSystemForUser().listStatus(convert(path)); + + if (statuses == null) + throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path); + + Collection<IgfsPath> res = new ArrayList<>(statuses.length); + + for (FileStatus status : statuses) + res.add(new IgfsPath(path, status.getPath().getName())); + + return res; + } + catch (FileNotFoundException ignored) { + throw new IgfsPathNotFoundException("Failed 
to list files (path not found): " + path); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path); + } + } + + /** {@inheritDoc} Each listed status is converted into an {@link IgfsFileImpl} with a freshly generated {@link IgniteUuid}; a {@code null} listing or a {@link FileNotFoundException} is reported as {@link IgfsPathNotFoundException}. */ + @Override public Collection<IgfsFile> listFiles(IgfsPath path) { + try { + FileStatus[] statuses = fileSystemForUser().listStatus(convert(path)); + + if (statuses == null) + throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path); + + Collection<IgfsFile> res = new ArrayList<>(statuses.length); + + for (FileStatus s : statuses) { + IgfsEntryInfo fsInfo = s.isDirectory() ? + IgfsUtils.createDirectory( + IgniteUuid.randomUuid(), + null, + properties(s), + s.getAccessTime(), + s.getModificationTime() + ) : + IgfsUtils.createFile( + IgniteUuid.randomUuid(), + (int)s.getBlockSize(), + s.getLen(), + null, + null, + false, + properties(s), + s.getAccessTime(), + s.getModificationTime() + ); + + res.add(new IgfsFileImpl(new IgfsPath(path, s.getPath().getName()), fsInfo, 1)); + } + + return res; + } + catch (FileNotFoundException ignored) { + throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path); + } + } + + /** {@inheritDoc} Wraps the current user's file system and the converted path into a positioned-readable object; nothing is read here. */ + @Override public IgfsSecondaryFileSystemPositionedReadable open(IgfsPath path, int bufSize) { + return new HadoopIgfsSecondaryFileSystemPositionedReadable(fileSystemForUser(), convert(path), bufSize); + } + + /** {@inheritDoc} */ + @Override public OutputStream create(IgfsPath path, boolean overwrite) { + try { + return fileSystemForUser().create(convert(path), overwrite); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite=" + overwrite + "]"); + } + } + + /** {@inheritDoc} */ + @Override public OutputStream create(IgfsPath path, int bufSize, boolean overwrite, int 
replication, + long blockSize, @Nullable Map<String, String> props) { + HadoopIgfsProperties props0 = + new HadoopIgfsProperties(props != null ? props : Collections.<String, String>emptyMap()); + + try { + return fileSystemForUser().create(convert(path), props0.permission(), overwrite, bufSize, + (short) replication, blockSize, null); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", props=" + props + + ", overwrite=" + overwrite + ", bufSize=" + bufSize + ", replication=" + replication + + ", blockSize=" + blockSize + "]"); + } + } + + /** {@inheritDoc} Only {@code path} and {@code bufSize} are forwarded to the Hadoop {@code append} call; the {@code create} flag and {@code props} are not used here. */ + @Override public OutputStream append(IgfsPath path, int bufSize, boolean create, + @Nullable Map<String, String> props) { + try { + return fileSystemForUser().append(convert(path), bufSize); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]"); + } + } + + /** {@inheritDoc} Returns {@code null} when the Hadoop status is {@code null} or the path is not found; otherwise an {@link IgfsFile} view backed by the Hadoop {@link FileStatus}. */ + @Override public IgfsFile info(final IgfsPath path) { + try { + final FileStatus status = fileSystemForUser().getFileStatus(convert(path)); + + if (status == null) + return null; + + final Map<String, String> props = properties(status); + + return new IgfsFile() { + @Override public IgfsPath path() { + return path; + } + + @Override public boolean isFile() { + return status.isFile(); + } + + @Override public boolean isDirectory() { + return status.isDirectory(); + } + + @Override public int blockSize() { + // By convention directory has blockSize == 0, while file has blockSize > 0: + return isDirectory() ? 
0 : (int)status.getBlockSize(); + } + + @Override public long groupBlockSize() { + return status.getBlockSize(); + } + + @Override public long accessTime() { + return status.getAccessTime(); + } + + @Override public long modificationTime() { + return status.getModificationTime(); + } + + @Override public String property(String name) throws IllegalArgumentException { + String val = props.get(name); + + if (val == null) + throw new IllegalArgumentException("File property not found [path=" + path + ", name=" + name + ']'); + + return val; + } + + @Nullable @Override public String property(String name, @Nullable String dfltVal) { + String val = props.get(name); + + return val == null ? dfltVal : val; + } + + @Override public long length() { + return status.getLen(); + } + + /** {@inheritDoc} */ + @Override public Map<String, String> properties() { + return props; + } + }; + } + catch (FileNotFoundException ignore) { + return null; + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to get file status [path=" + path + "]"); + } + } + + /** {@inheritDoc} */ + @Override public long usedSpaceSize() { + try { + // We don't use FileSystem#getUsed() since it counts only the files + // in the filesystem root, not all the files recursively. + return fileSystemForUser().getContentSummary(new Path("/")).getSpaceConsumed(); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to get used space size of file system."); + } + } + + /** {@inheritDoc} */ + @Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteException { + try { + // Hadoop's FileSystem#setTimes takes the modification time first and the access time + // second, hence the argument order below differs from this method's own parameter order. 
+ fileSystemForUser().setTimes(convert(path), modificationTime, accessTime); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed set times for path: " + path); + } + } + + /** {@inheritDoc} Starts the file system factory delegate. */ + public void start() { + factory.start(); + } + + /** {@inheritDoc} Stops the file system factory delegate. */ + public void stop() { + factory.stop(); + } + + /** + * Convert IGFS path into Hadoop path using the scheme and authority of the current user's file system URI. + * + * @param path IGFS path. + * @return Hadoop path. + */ + private Path convert(IgfsPath path) { + URI uri = fileSystemForUser().getUri(); + + return new Path(uri.getScheme(), uri.getAuthority(), path.toString()); + } + + /** + * Converts an I/O error raised by the secondary file system into an IGFS exception by delegating to {@link #cast(String, IOException)}. + * + * @param e Exception to convert. + * @param detailMsg Detailed error message. + * @return Appropriate exception; it is returned, not thrown, so callers throw it themselves. + */ + private IgfsException handleSecondaryFsError(IOException e, String detailMsg) { + return cast(detailMsg, e); + } + + /** + * Cast IO exception to IGFS exception chosen by the exception type. The {@code msg} argument is the message for the resulting exception; it is not passed on for {@link FileNotFoundException} and {@link PathIsNotEmptyDirectoryException}, which are wrapped with the cause only. + * + * @param e IO exception. + * @return IGFS exception; plain {@link IgfsException} when no more specific type matches. + */ + public static IgfsException cast(String msg, IOException e) { + if (e instanceof FileNotFoundException) + return new IgfsPathNotFoundException(e); + else if (e instanceof ParentNotDirectoryException) + return new IgfsParentNotDirectoryException(msg, e); + else if (e instanceof PathIsNotEmptyDirectoryException) + return new IgfsDirectoryNotEmptyException(e); + else if (e instanceof PathExistsException) + return new IgfsPathAlreadyExistsException(msg, e); + else + return new IgfsException(msg, e); + } + + /** + * Convert Hadoop FileStatus properties to map. + * + * @param status File status. + * @return IGFS attributes. 
+ */ + private static Map<String, String> properties(FileStatus status) { + FsPermission perm = status.getPermission(); + + if (perm == null) + perm = FsPermission.getDefault(); + + HashMap<String, String> res = new HashMap<>(3); + + res.put(IgfsUtils.PROP_PERMISSION, String.format("%04o", perm.toShort())); + res.put(IgfsUtils.PROP_USER_NAME, status.getOwner()); + res.put(IgfsUtils.PROP_GROUP_NAME, status.getGroup()); + + return res; + } + + /** + * Gets the FileSystem for the current IGFS context user, falling back to the default user name when no user context is set. + * @return the FileSystem instance returned by the factory delegate for that user; an {@link IOException} from the factory is rethrown as {@link IgniteException}. + */ + private FileSystem fileSystemForUser() { + String user = IgfsUserContext.currentUser(); + + if (F.isEmpty(user)) + user = IgfsUtils.fixUserName(dfltUsrName); + + assert !F.isEmpty(user); + + try { + return (FileSystem)factory.get(user); + } + catch (IOException ioe) { + throw new IgniteException(ioe); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopIgfsSecondaryFileSystemDelegateImpl.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopKerberosFileSystemFactoryDelegate.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopKerberosFileSystemFactoryDelegate.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopKerberosFileSystemFactoryDelegate.java new file mode 100644 index 0000000..19c470e --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopKerberosFileSystemFactoryDelegate.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.delegate; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.ignite.IgniteException; +import org.apache.ignite.hadoop.fs.KerberosHadoopFileSystemFactory; +import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.A; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; + +/** + * Kerberos Hadoop file system factory delegate: logs in from a keytab on start and creates file systems on behalf of proxy users. + */ +public class HadoopKerberosFileSystemFactoryDelegate extends HadoopBasicFileSystemFactoryDelegate { + /** The re-login interval, in milliseconds (it is added to a {@code System.currentTimeMillis()} timestamp). */ + private long reloginInterval; + + /** Time of last re-login attempt, in system milliseconds. */ + private volatile long lastReloginTime; + + /** + * Constructor. + * + * @param proxy Proxy. 
+ */ + public HadoopKerberosFileSystemFactoryDelegate(KerberosHadoopFileSystemFactory proxy) { + super(proxy); + } + + /** {@inheritDoc} Attempts a re-login first when the re-login interval has elapsed. */ + @Override public FileSystem getWithMappedName(String name) throws IOException { + reloginIfNeeded(); + + return super.getWithMappedName(name); + } + + /** {@inheritDoc} The file system is obtained inside {@code doAs} of a proxy user created for {@code usrName} on top of the current login user. */ + @Override protected FileSystem create(String usrName) throws IOException, InterruptedException { + UserGroupInformation proxyUgi = UserGroupInformation.createProxyUser(usrName, + UserGroupInformation.getLoginUser()); + + return proxyUgi.doAs(new PrivilegedExceptionAction<FileSystem>() { + @Override public FileSystem run() throws Exception { + return FileSystem.get(fullUri, cfg); + } + }); + } + + @Override public void start() throws IgniteException { + super.start(); + + KerberosHadoopFileSystemFactory proxy0 = (KerberosHadoopFileSystemFactory)proxy; + + A.ensure(!F.isEmpty(proxy0.getKeyTab()), "keyTab cannot be empty."); + A.ensure(!F.isEmpty(proxy0.getKeyTabPrincipal()), "keyTabPrincipal cannot be empty."); + A.ensure(proxy0.getReloginInterval() >= 0, "reloginInterval cannot be negative."); + + reloginInterval = proxy0.getReloginInterval(); + + try { + UserGroupInformation.setConfiguration(cfg); + UserGroupInformation.loginUserFromKeytab(proxy0.getKeyTabPrincipal(), proxy0.getKeyTab()); + } + catch (IOException ioe) { + throw new IgniteException("Failed login from keytab [keyTab=" + proxy0.getKeyTab() + + ", keyTabPrincipal=" + proxy0.getKeyTabPrincipal() + ']', ioe); + } + } + + /** + * Re-logins the user if needed. + * First, the re-login interval defined in factory is checked. The re-login attempts will be no more + * frequent than one attempt per {@code reloginInterval}. + * Second, the {@code UserGroupInformation.checkTGTAndReloginFromKeytab()} method is invoked, which gets the existing + * TGT and checks its validity. If the TGT is expired or is close to expiry, it performs re-login. 
+ * + * <p>This operation is expected to be called upon each operation with the file system created with the factory. + * As long as {@link #get(String)} operation is invoked upon each file {@link IgniteHadoopFileSystem}, there + * is no need to invoke it separately. + * + * @throws IOException If login fails. + */ + private void reloginIfNeeded() throws IOException { + long now = System.currentTimeMillis(); + + if (now >= lastReloginTime + reloginInterval) { + UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab(); + + lastReloginTime = now; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java new file mode 100644 index 0000000..1ecbee5 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.fs; + +import java.io.IOException; +import java.net.URI; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.ignite.IgniteException; +import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem; +import org.apache.ignite.internal.util.GridStringBuilder; +import org.apache.ignite.internal.util.typedef.F; +import org.jetbrains.annotations.Nullable; + +/** + * File system cache utility methods used by Map-Reduce tasks and jobs. + */ +public class HadoopFileSystemCacheUtils { + /** + * A common static factory method. Creates new HadoopLazyConcurrentMap whose values are obtained with {@code FileSystem.get(uri, cfg, user)} on a copy of the key's configuration in which the cache-disabling property for the URI scheme is set to {@code true}. + * @return a new HadoopLazyConcurrentMap. + */ + public static HadoopLazyConcurrentMap<FsCacheKey, FileSystem> createHadoopLazyConcurrentMap() { + return new HadoopLazyConcurrentMap<>( + new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() { + @Override public FileSystem createValue(FsCacheKey key) throws IOException { + try { + assert key != null; + + // Explicitly disable FileSystem caching: + URI uri = key.uri(); + + String scheme = uri.getScheme(); + + // Copy the configuration to avoid altering the external object. + Configuration cfg = new Configuration(key.configuration()); + + String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(scheme); + + cfg.setBoolean(prop, true); + + return FileSystem.get(uri, cfg, key.user()); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IOException("Failed to create file system due to interrupt.", e); + } + } + } + ); + } + + /** + * Gets the user name as per the Hadoop viewpoint: the job configuration value if set, otherwise the file system default. + * @param cfg the Hadoop job configuration; must not be null because it is dereferenced unconditionally. + * @return the user name, never null. 
+ */ + private static String getMrHadoopUser(Configuration cfg) throws IOException { + String user = cfg.get(MRJobConfig.USER_NAME); + + if (user == null) + user = IgniteHadoopFileSystem.getFsHadoopUser(); + + return user; + } + + /** + * Common method to get the V1 file system in MapRed engine. + * It gets the filesystem for the user specified in the + * configuration with {@link MRJobConfig#USER_NAME} property. + * The file systems are created and cached in the given map upon first request. + * + * @param uri The file system uri; if {@code null}, the default URI from {@code cfg} is used. + * @param cfg The configuration. + * @param map The caching map. + * @return The file system. + * @throws IOException On error, including an {@link IgniteException} raised while getting or creating the cached value. + */ + public static FileSystem fileSystemForMrUserWithCaching(@Nullable URI uri, Configuration cfg, + HadoopLazyConcurrentMap<FsCacheKey, FileSystem> map) + throws IOException { + assert map != null; + assert cfg != null; + + final String usr = getMrHadoopUser(cfg); + + assert usr != null; + + if (uri == null) + uri = FileSystem.getDefaultUri(cfg); + + final FileSystem fs; + + try { + final FsCacheKey key = new FsCacheKey(uri, usr, cfg); + + fs = map.getOrCreate(key); + } + catch (IgniteException ie) { + throw new IOException(ie); + } + + assert fs != null; + assert !(fs instanceof IgniteHadoopFileSystem) || F.eq(usr, ((IgniteHadoopFileSystem)fs).user()); + + return fs; + } + + /** + * Resolves the Fs URI using logic similar to that used in the three-argument FileSystem#get: a null URI, or a URI without authority whose scheme is absent or equals the scheme of a default URI that has an authority, is replaced with the default URI. + * @param uri0 The uri. + * @param cfg The configuration the default URI is read from. + * @return Correct URI. + */ + private static URI fixUri(URI uri0, Configuration cfg) { + if (uri0 == null) + return FileSystem.getDefaultUri(cfg); + + String scheme = uri0.getScheme(); + String authority = uri0.getAuthority(); + + if (authority == null) { + URI dfltUri = FileSystem.getDefaultUri(cfg); + + if (scheme == null || (scheme.equals(dfltUri.getScheme()) && dfltUri.getAuthority() != null)) + return dfltUri; + } + + return uri0; + } + + /** + * Note that configuration is not a part of the key. 
+ * It is used solely to initialize the first instance + * that is created for the key. + */ + public static final class FsCacheKey { + /** File system URI after {@code fixUri} resolution. */ + private final URI uri; + + /** User name. */ + private final String usr; + + /** Precomputed string built from the user, scheme and authority; used for equality and hashing. */ + private final String equalityKey; + + /** Configuration; it does not take part in the equality key. */ + private final Configuration cfg; + + /** + * Constructor. All arguments must be non-null; the URI is resolved via {@code fixUri} before the equality key is computed. + */ + public FsCacheKey(URI uri, String usr, Configuration cfg) { + assert uri != null; + assert usr != null; + assert cfg != null; + + this.uri = fixUri(uri, cfg); + this.usr = usr; + this.cfg = cfg; + + this.equalityKey = createEqualityKey(); + } + + /** + * Creates String key used for equality and hashing, in the form {@code (user)@scheme://authority}. Scheme and authority are lower-cased with the root locale so the key does not depend on the JVM default locale. + */ + private String createEqualityKey() { + GridStringBuilder sb = new GridStringBuilder("(").a(usr).a(")@"); + + if (uri.getScheme() != null) + sb.a(uri.getScheme().toLowerCase(java.util.Locale.ROOT)); + + sb.a("://"); + + if (uri.getAuthority() != null) + sb.a(uri.getAuthority().toLowerCase(java.util.Locale.ROOT)); + + return sb.toString(); + } + + /** + * The URI. + */ + public URI uri() { + return uri; + } + + /** + * The User. + */ + public String user() { + return usr; + } + + /** + * The Configuration. 
+ */ + public Configuration configuration() { + return cfg; + } + + /** {@inheritDoc} Two keys are equal when they are of the same class and their equality strings match; the configuration is not compared. */ + @SuppressWarnings("SimplifiableIfStatement") + @Override public boolean equals(Object obj) { + if (obj == this) + return true; + + if (obj == null || getClass() != obj.getClass()) + return false; + + return equalityKey.equals(((FsCacheKey)obj).equalityKey); + } + + /** {@inheritDoc} Derived from the same equality string that {@code equals} compares. */ + @Override public int hashCode() { + return equalityKey.hashCode(); + } + + /** {@inheritDoc} Returns the equality string. */ + @Override public String toString() { + return equalityKey; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemsUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemsUtils.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemsUtils.java new file mode 100644 index 0000000..68c0dc4 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemsUtils.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.fs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FsConstants; +import org.jetbrains.annotations.Nullable; + +/** + * Utilities for configuring file systems to support the separate working directory per each thread. + */ +public class HadoopFileSystemsUtils { + /** Name of the property for setting the working directory when a new local FS instance is created. */ + public static final String LOC_FS_WORK_DIR_PROP = "fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".workDir"; + + /** + * Sets up wrappers of file systems to support the separate working directory: registers {@code HadoopLocalFileSystemV1} and {@code HadoopLocalFileSystemV2} as the implementations for the local file system scheme. + * + * @param cfg Configuration to modify. + */ + public static void setupFileSystems(Configuration cfg) { + cfg.set("fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", HadoopLocalFileSystemV1.class.getName()); + cfg.set("fs.AbstractFileSystem." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", + HadoopLocalFileSystemV2.class.getName()); + } + + /** + * Gets the property name to disable file system cache. + * @param scheme The file system URI scheme. + * @return The property name. If scheme is null, + * returns "fs.null.impl.disable.cache". 
+ */ + public static String disableFsCachePropertyName(@Nullable String scheme) { + return String.format("fs.%s.impl.disable.cache", scheme); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopLazyConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopLazyConcurrentMap.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopLazyConcurrentMap.java new file mode 100644 index 0000000..681cddb --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopLazyConcurrentMap.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.fs; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.jsr166.ConcurrentHashMap8; + +/** + * Maps values by keys. + * Values are created lazily using {@link ValueFactory}. + * + * Despite the name, does not depend on any Hadoop classes. + */ +public class HadoopLazyConcurrentMap<K, V extends Closeable> { + /** The map storing the wrappers that hold the actual values. */ + private final ConcurrentMap<K, ValueWrapper> map = new ConcurrentHashMap8<>(); + + /** The factory passed in by the client. Will be used for lazy value creation. */ + private final ValueFactory<K, V> factory; + + /** Lock used to close the objects: value creation holds the read lock, closing holds the write lock. */ + private final ReadWriteLock closeLock = new ReentrantReadWriteLock(); + + /** Flag indicating that this map is closed and cleared; accessed only while holding {@code closeLock}. */ + private boolean closed; + + /** + * Constructor. + * @param factory the factory to create new values lazily. + */ + public HadoopLazyConcurrentMap(ValueFactory<K, V> factory) { + this.factory = factory; + + assert getClass().getClassLoader() == Ignite.class.getClassLoader(); + } + + /** + * Gets cached or creates a new value of V. Fails with {@link IllegalStateException} if a new value has to be created after the map was closed. + * Never returns null. + * @param k the key to associate the value with. + * @return the cached or newly created value, never null. 
+ * @throws IgniteException on error + */ + public V getOrCreate(K k) { + ValueWrapper w = map.get(k); + + if (w == null) { + closeLock.readLock().lock(); + + try { + if (closed) + throw new IllegalStateException("Failed to create value for key [" + k + + "]: the map is already closed."); + + final ValueWrapper wNew = new ValueWrapper(k); + + w = map.putIfAbsent(k, wNew); + + if (w == null) { + wNew.init(); + + w = wNew; + } + } + finally { + closeLock.readLock().unlock(); + } + } + + try { + V v = w.getValue(); + + assert v != null; + + return v; + } + catch (IgniteCheckedException ie) { + throw new IgniteException(ie); + } + } + + /** + * Clears the map and closes all the values. Values whose creation failed are skipped; if closing fails, the first failure is rethrown as {@link IgniteCheckedException} after all values were attempted. Repeated calls are no-ops. + */ + public void close() throws IgniteCheckedException { + closeLock.writeLock().lock(); + + try { + if (closed) + return; + + closed = true; + + Exception err = null; + + Set<K> keySet = map.keySet(); + + for (K key : keySet) { + V v = null; + + try { + v = map.get(key).getValue(); + } + catch (IgniteCheckedException ignore) { + // No-op. + } + + if (v != null) { + try { + v.close(); + } + catch (Exception err0) { + if (err == null) + err = err0; + } + } + } + + map.clear(); + + if (err != null) + throw new IgniteCheckedException(err); + } + finally { + closeLock.writeLock().unlock(); + } + } + + /** + * Helper class that drives the lazy value creation. + */ + private class ValueWrapper { + /** Future completed by {@code init()} with the created value or with the creation failure. */ + private final GridFutureAdapter<V> fut = new GridFutureAdapter<>(); + + /** The key the value is created for. */ + private final K key; + + /** + * Creates new wrapper. + */ + private ValueWrapper(K key) { + this.key = key; + } + + /** + * Initializes the value using the factory and completes the future with it, or with the thrown error (including the one raised for a {@code null} factory result). + */ + private void init() { + try { + final V v0 = factory.createValue(key); + + if (v0 == null) + throw new IgniteException("Failed to create non-null value. [key=" + key + ']'); + + fut.onDone(v0); + } + catch (Throwable e) { + fut.onDone(e); + } + } + + /** + * Gets the available value or blocks until the value is initialized. 
+ * @return the value, never null. + * @throws IgniteCheckedException on error. + */ + V getValue() throws IgniteCheckedException { + return fut.get(); + } + } + + /** + * Interface representing the factory that creates map values. + * @param <K> the type of the key. + * @param <V> the type of the value. + */ + public interface ValueFactory <K, V> { + /** + * Creates the new value. Should never return null. + * + * @param key the key to create value for + * @return the value. + * @throws IOException On failure. + */ + public V createValue(K key) throws IOException; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopLocalFileSystemV1.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopLocalFileSystemV1.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopLocalFileSystemV1.java new file mode 100644 index 0000000..cbb007f --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopLocalFileSystemV1.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.fs; + +import java.io.File; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; + +/** + * Local file system replacement for Hadoop jobs. Uses a {@link HadoopRawLocalFileSystem} + * as the underlying raw file system. + */ +public class HadoopLocalFileSystemV1 extends LocalFileSystem { + /** + * Creates new local file system backed by a new {@link HadoopRawLocalFileSystem} instance. + */ + public HadoopLocalFileSystemV1() { + super(new HadoopRawLocalFileSystem()); + } + + /** + * {@inheritDoc} + * <p> + * The conversion is delegated to {@code convert(path)} of the underlying {@link HadoopRawLocalFileSystem}. + */ + @Override public File pathToFile(Path path) { + return ((HadoopRawLocalFileSystem)getRaw()).convert(path); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopLocalFileSystemV2.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopLocalFileSystemV2.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopLocalFileSystemV2.java new file mode 100644 index 0000000..2484492 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopLocalFileSystemV2.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.fs; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ChecksumFs; +import org.apache.hadoop.fs.DelegateToFileSystem; +import org.apache.hadoop.fs.FsServerDefaults; +import org.apache.hadoop.fs.local.LocalConfigKeys; + +import static org.apache.hadoop.fs.FsConstants.LOCAL_FS_URI; + +/** + * Local file system replacement for Hadoop jobs. A {@link ChecksumFs} layered over a private delegate + * that is backed by {@link HadoopRawLocalFileSystem}. + */ +public class HadoopLocalFileSystemV2 extends ChecksumFs { + /** + * Creates new local file system. + * + * @param cfg Configuration passed to the underlying delegate. + * @throws IOException If failed. + * @throws URISyntaxException If failed. + */ + public HadoopLocalFileSystemV2(Configuration cfg) throws IOException, URISyntaxException { + super(new DelegateFS(cfg)); + } + + /** + * Creates new local file system. Only delegates to {@link #HadoopLocalFileSystemV2(Configuration)}. + * <p> + * NOTE(review): presumably this signature exists for Hadoop's reflective instantiation - confirm. + * + * @param uri URI. Ignored by this constructor. + * @param cfg Configuration. + * @throws IOException If failed. + * @throws URISyntaxException If failed. + */ + public HadoopLocalFileSystemV2(URI uri, Configuration cfg) throws IOException, URISyntaxException { + this(cfg); + } + + /** + * Delegate file system that wraps a {@link HadoopRawLocalFileSystem} registered under {@code LOCAL_FS_URI}. + */ + private static class DelegateFS extends DelegateToFileSystem { + /** + * Creates new local file system. + * + * @param cfg Configuration. + * @throws IOException If failed. + * @throws URISyntaxException If failed. 
+ */ + public DelegateFS(Configuration cfg) throws IOException, URISyntaxException { + super(LOCAL_FS_URI, new HadoopRawLocalFileSystem(), cfg, LOCAL_FS_URI.getScheme(), false); + } + + /** + * {@inheritDoc} + * + * @return Always {@code -1}. + */ + @Override public int getUriDefaultPort() { + return -1; + } + + /** + * {@inheritDoc} + * + * @return Server defaults obtained from {@link LocalConfigKeys#getServerDefaults()}. + */ + @Override public FsServerDefaults getServerDefaults() throws IOException { + return LocalConfigKeys.getServerDefaults(); + } + + /** + * {@inheritDoc} + * + * @return Always {@code true}: no name validation is performed here. + */ + @Override public boolean isValidName(String src) { + return true; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopParameters.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopParameters.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopParameters.java new file mode 100644 index 0000000..0aac4a3 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopParameters.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.fs; + +/** + * This class lists parameters that can be specified in Hadoop configuration. + * Hadoop configuration can be specified in {@code core-site.xml} file + * or passed to map-reduce task directly when using Hadoop driver for IGFS file system: + * <ul> + * <li> + * {@code fs.igfs.[name].open.sequential_reads_before_prefetch} - this parameter overrides + * the one specified in {@link org.apache.ignite.configuration.FileSystemConfiguration#getSequentialReadsBeforePrefetch()} + * IGFS data node configuration property. + * </li> + * <li> + * {@code fs.igfs.[name].log.enabled} - specifies whether IGFS sampling logger is enabled. If + * {@code true}, then all file system operations will be logged to a file. + * </li> + * <li>{@code fs.igfs.[name].log.dir} - specifies log directory where sampling log files should be placed.</li> + * <li> + * {@code fs.igfs.[name].log.batch_size} - specifies how many log entries are accumulated in a batch before + * it gets flushed to log file. Higher values will imply greater performance, but will increase delay + * before record appears in the log file. + * </li> + * <li> + * {@code fs.igfs.[name].colocated.writes} - specifies whether written files should be colocated on data + * node to which client is connected. If {@code true}, file will not be distributed and will be written + * to a single data node. Default value is {@code true}. + * </li> + * <li> + * {@code fs.igfs.prefer.local.writes} - specifies whether file preferably should be written to + * local data node if it has enough free space. After some time it can be redistributed across nodes though. + * Unlike the parameters above, this key has no {@code [name]} part. + * </li> + * </ul> + * Where {@code [name]} is the file system endpoint which you specify in the file system URI authority part. E.g. 
in + * case your file system URI is {@code igfs://127.0.0.1:10500} then {@code name} will be {@code 127.0.0.1:10500}. + * <p> + * Sample configuration that can be placed in {@code core-site.xml} file: + * <pre name="code" class="xml"> + * &lt;property&gt; + * &lt;name&gt;fs.igfs.127.0.0.1:10500.log.enabled&lt;/name&gt; + * &lt;value&gt;true&lt;/value&gt; + * &lt;/property&gt; + * &lt;property&gt; + * &lt;name&gt;fs.igfs.127.0.0.1:10500.log.dir&lt;/name&gt; + * &lt;value&gt;/home/apache/ignite/log/sampling&lt;/value&gt; + * &lt;/property&gt; + * &lt;property&gt; + * &lt;name&gt;fs.igfs.127.0.0.1:10500.log.batch_size&lt;/name&gt; + * &lt;value&gt;16&lt;/value&gt; + * &lt;/property&gt; + * </pre> + * Parameters could also be specified per mapreduce job, e.g. + * <pre name="code" class="bash"> + * hadoop jar myjarfile.jar MyMapReduceJob -Dfs.igfs.open.sequential_reads_before_prefetch=4 + * </pre> + * NOTE(review): the key in the example above has no {@code [name]} part, unlike + * {@link #PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH} - confirm which form is intended. + * <p> + * If you want to use these parameters in code, then you have to substitute your file system name into them. + * The easiest way to do that is {@code String.format(PARAM_IGFS_COLOCATED_WRITES, [name])}. + */ +public class HadoopParameters { + /** Parameter name for control over file colocation write mode. {@code %s} stands for the file system name. */ + public static final String PARAM_IGFS_COLOCATED_WRITES = "fs.igfs.%s.colocated.writes"; + + /** Parameter name for custom sequential reads before prefetch value. {@code %s} stands for the file system name. */ + public static final String PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH = + "fs.igfs.%s.open.sequential_reads_before_prefetch"; + + /** Parameter name for client logger directory. {@code %s} stands for the file system name. */ + public static final String PARAM_IGFS_LOG_DIR = "fs.igfs.%s.log.dir"; + + /** Parameter name for log batch size. {@code %s} stands for the file system name. */ + public static final String PARAM_IGFS_LOG_BATCH_SIZE = "fs.igfs.%s.log.batch_size"; + + /** Parameter name for log enabled flag. {@code %s} stands for the file system name. */ + public static final String PARAM_IGFS_LOG_ENABLED = "fs.igfs.%s.log.enabled"; + + /** Parameter name for prefer local writes flag. Used as is: it contains no file system name placeholder. */ + public static final String PARAM_IGFS_PREFER_LOCAL_WRITES = "fs.igfs.prefer.local.writes"; +} \ No newline at end of file