Repository: ignite Updated Branches: refs/heads/ignite-1.6.8-hadoop 0f7ccb3d0 -> b49fe400b
Decoupled counter writer. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b49fe400 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b49fe400 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b49fe400 Branch: refs/heads/ignite-1.6.8-hadoop Commit: b49fe400b5f299d1db09a6467637e00ce1ae29ac Parents: 0f7ccb3 Author: vozerov-gridgain <voze...@gridgain.com> Authored: Wed Sep 21 13:31:05 2016 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Wed Sep 21 13:31:05 2016 +0300 ---------------------------------------------------------------------- .../fs/IgniteHadoopFileSystemCounterWriter.java | 103 ------------------ ...doopFileSystemCounterWriterDelegateImpl.java | 107 +++++++++++++++++++ ...doopIgfsSecondaryFileSystemDelegateImpl.java | 2 - .../fs/IgniteHadoopFileSystemCounterWriter.java | 71 ++++++++++++ .../hadoop/delegate/HadoopDelegateUtils.java | 16 +++ .../HadoopFileSystemCounterWriterDelegate.java | 36 +++++++ 6 files changed, 230 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b49fe400/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java deleted file mode 100644 index 8085826..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.hadoop.fs; - -import java.io.IOException; -import java.io.PrintStream; -import java.util.Map; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo; -import org.apache.ignite.internal.processors.hadoop.HadoopJob; -import org.apache.ignite.internal.processors.hadoop.HadoopJobId; -import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; -import org.apache.ignite.internal.processors.hadoop.HadoopUtils; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounterWriter; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter; -import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Job; -import org.apache.ignite.internal.processors.igfs.IgfsUtils; -import org.apache.ignite.internal.util.typedef.T2; - -/** - * Statistic writer implementation that writes info into any Hadoop file system. - */ -public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter { - /** */ - public static final String PERFORMANCE_COUNTER_FILE_NAME = "performance"; - - /** */ - public static final String COUNTER_WRITER_DIR_PROPERTY = "ignite.counters.fswriter.directory"; - - /** */ - private static final String USER_MACRO = "${USER}"; - - /** */ - private static final String DEFAULT_COUNTER_WRITER_DIR = "/user/" + USER_MACRO; - - /** {@inheritDoc} */ - @Override public void write(HadoopJob job, HadoopCounters cntrs) - throws IgniteCheckedException { - - Configuration hadoopCfg = HadoopUtils.safeCreateConfiguration(); - - final HadoopJobInfo jobInfo = job.info(); - - final HadoopJobId jobId = job.id(); - - for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)jobInfo).properties().entrySet()) - hadoopCfg.set(e.getKey(), e.getValue()); - - String user = jobInfo.user(); - - user = IgfsUtils.fixUserName(user); - - String dir = jobInfo.property(COUNTER_WRITER_DIR_PROPERTY); - - if (dir == null) - dir = DEFAULT_COUNTER_WRITER_DIR; - - Path jobStatPath = new Path(new Path(dir.replace(USER_MACRO, user)), jobId.toString()); - - HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(cntrs, null); - - try { - hadoopCfg.set(MRJobConfig.USER_NAME, user); - - FileSystem fs = ((HadoopV2Job)job).fileSystem(jobStatPath.toUri(), hadoopCfg); - - fs.mkdirs(jobStatPath); - - try (PrintStream out = new PrintStream(fs.create(new Path(jobStatPath, PERFORMANCE_COUNTER_FILE_NAME)))) { - for (T2<String, Long> evt : perfCntr.evts()) { - out.print(evt.get1()); - out.print(':'); - out.println(evt.get2().toString()); - } - - out.flush(); - } - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b49fe400/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegateImpl.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegateImpl.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegateImpl.java new file mode 100644 index 0000000..0b306bc --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegateImpl.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.delegate; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.hadoop.fs.IgniteHadoopFileSystemCounterWriter; +import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo; +import org.apache.ignite.internal.processors.hadoop.HadoopJob; +import org.apache.ignite.internal.processors.hadoop.HadoopJobId; +import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; +import org.apache.ignite.internal.processors.hadoop.HadoopUtils; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter; +import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Job; +import org.apache.ignite.internal.processors.igfs.IgfsUtils; +import org.apache.ignite.internal.util.typedef.T2; + +import java.io.IOException; +import java.io.PrintStream; +import java.util.Map; + +/** + * Counter writer delegate implementation. + */ +@SuppressWarnings("unused") +public class HadoopFileSystemCounterWriterDelegateImpl implements HadoopFileSystemCounterWriterDelegate { + /** */ + private static final String USER_MACRO = "${USER}"; + + /** */ + private static final String DEFAULT_COUNTER_WRITER_DIR = "/user/" + USER_MACRO; + + /** + * Constructor. + * + * @param proxy Proxy (not used). + */ + public HadoopFileSystemCounterWriterDelegateImpl(IgniteHadoopFileSystemCounterWriter proxy) { + // No-op. + } + + /** {@inheritDoc} */ + public void write(HadoopJob job, HadoopCounters cntrs) throws IgniteCheckedException { + Configuration hadoopCfg = HadoopUtils.safeCreateConfiguration(); + + final HadoopJobInfo jobInfo = job.info(); + + final HadoopJobId jobId = job.id(); + + for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)jobInfo).properties().entrySet()) + hadoopCfg.set(e.getKey(), e.getValue()); + + String user = jobInfo.user(); + + user = IgfsUtils.fixUserName(user); + + String dir = jobInfo.property(IgniteHadoopFileSystemCounterWriter.COUNTER_WRITER_DIR_PROPERTY); + + if (dir == null) + dir = DEFAULT_COUNTER_WRITER_DIR; + + Path jobStatPath = new Path(new Path(dir.replace(USER_MACRO, user)), jobId.toString()); + + HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(cntrs, null); + + try { + hadoopCfg.set(MRJobConfig.USER_NAME, user); + + FileSystem fs = ((HadoopV2Job)job).fileSystem(jobStatPath.toUri(), hadoopCfg); + + fs.mkdirs(jobStatPath); + + try (PrintStream out = new PrintStream(fs.create( + new Path(jobStatPath, IgniteHadoopFileSystemCounterWriter.PERFORMANCE_COUNTER_FILE_NAME)))) { + for (T2<String, Long> evt : perfCntr.evts()) { + out.print(evt.get1()); + out.print(':'); + out.println(evt.get2().toString()); + } + + out.flush(); + } + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b49fe400/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java index bf93f37..7f7100d 100644 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java @@ -45,7 +45,6 @@ import org.apache.ignite.internal.processors.igfs.IgfsUtils; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; -import org.apache.ignite.lifecycle.LifecycleAware; import org.jetbrains.annotations.Nullable; import java.io.FileNotFoundException; @@ -74,7 +73,6 @@ public class HadoopIgfsSecondaryFileSystemDelegateImpl implements HadoopIgfsSeco * * @param proxy Proxy. */ - @SuppressWarnings("unused") public HadoopIgfsSecondaryFileSystemDelegateImpl(IgniteHadoopIgfsSecondaryFileSystem proxy) { assert proxy.getFileSystemFactory() != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/b49fe400/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java new file mode 100644 index 0000000..d11944e --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.hadoop.fs; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.hadoop.HadoopJob; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounterWriter; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; +import org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils; +import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemCounterWriterDelegate; + +/** + * Statistic writer implementation that writes info into any Hadoop file system. + */ +public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter { + /** */ + public static final String PERFORMANCE_COUNTER_FILE_NAME = "performance"; + + /** */ + public static final String COUNTER_WRITER_DIR_PROPERTY = "ignite.counters.fswriter.directory"; + + /** Mutex. */ + private final Object mux = new Object(); + + /** Delegate. */ + private volatile HadoopFileSystemCounterWriterDelegate delegate; + + /** {@inheritDoc} */ + @Override public void write(HadoopJob job, HadoopCounters cntrs) + throws IgniteCheckedException { + delegate().write(job, cntrs); + } + + /** + * Get delegate creating it if needed. + * + * @return Delegate. + */ + private HadoopFileSystemCounterWriterDelegate delegate() { + HadoopFileSystemCounterWriterDelegate delegate0 = delegate; + + if (delegate0 == null) { + synchronized (mux) { + delegate0 = delegate; + + if (delegate0 == null) { + delegate0 = HadoopDelegateUtils.counterWriterDelegate(this); + + delegate = delegate0; + } + } + } + + return delegate0; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b49fe400/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopDelegateUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopDelegateUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopDelegateUtils.java index 57c8138..6c39946 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopDelegateUtils.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopDelegateUtils.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.hadoop.delegate; import org.apache.ignite.IgniteException; import org.apache.ignite.hadoop.fs.BasicHadoopFileSystemFactory; import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory; +import org.apache.ignite.hadoop.fs.IgniteHadoopFileSystemCounterWriter; import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem; import org.apache.ignite.hadoop.fs.KerberosHadoopFileSystemFactory; @@ -42,6 +43,10 @@ public class HadoopDelegateUtils { /** Factory proxy to delegate class name mapping. */ private static final Map<String, String> FACTORY_CLS_MAP; + /** Counter writer delegate implementation. */ + private static final String COUNTER_WRITER_DELEGATE_CLS = + "org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemCounterWriterDelegateImpl"; + static { FACTORY_CLS_MAP = new HashMap<>(); @@ -83,6 +88,17 @@ public class HadoopDelegateUtils { } /** + * Create delegate for Hadoop counter writer. + * + * @param proxy Proxy. + * @return Delegate. + */ + public static HadoopFileSystemCounterWriterDelegate counterWriterDelegate( + IgniteHadoopFileSystemCounterWriter proxy) { + return newInstance(COUNTER_WRITER_DELEGATE_CLS, proxy); + } + + /** * Get new delegate instance. * * @param clsName Class name. http://git-wip-us.apache.org/repos/asf/ignite/blob/b49fe400/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegate.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegate.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegate.java new file mode 100644 index 0000000..541cf80 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegate.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.delegate; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.hadoop.HadoopJob; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; + +/** + * Counter writer delegate interface. + */ +public interface HadoopFileSystemCounterWriterDelegate { + /** + * Writes counters of given job to some statistics storage. + * + * @param job The job. + * @param cntrs Counters. + * @throws IgniteCheckedException If failed. + */ + public void write(HadoopJob job, HadoopCounters cntrs) throws IgniteCheckedException; +}