Repository: ignite
Updated Branches:
  refs/heads/ignite-1.6.8-hadoop 0f7ccb3d0 -> b49fe400b


Decoupled counter writer.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b49fe400
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b49fe400
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b49fe400

Branch: refs/heads/ignite-1.6.8-hadoop
Commit: b49fe400b5f299d1db09a6467637e00ce1ae29ac
Parents: 0f7ccb3
Author: vozerov-gridgain <voze...@gridgain.com>
Authored: Wed Sep 21 13:31:05 2016 +0300
Committer: vozerov-gridgain <voze...@gridgain.com>
Committed: Wed Sep 21 13:31:05 2016 +0300

----------------------------------------------------------------------
 .../fs/IgniteHadoopFileSystemCounterWriter.java | 103 ------------------
 ...doopFileSystemCounterWriterDelegateImpl.java | 107 +++++++++++++++++++
 ...doopIgfsSecondaryFileSystemDelegateImpl.java |   2 -
 .../fs/IgniteHadoopFileSystemCounterWriter.java |  71 ++++++++++++
 .../hadoop/delegate/HadoopDelegateUtils.java    |  16 +++
 .../HadoopFileSystemCounterWriterDelegate.java  |  36 +++++++
 6 files changed, 230 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b49fe400/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
deleted file mode 100644
index 8085826..0000000
--- 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.hadoop.fs;
-
-import java.io.IOException;
-import java.io.PrintStream;
-import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
-import 
org.apache.ignite.internal.processors.hadoop.counter.HadoopCounterWriter;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
-import 
org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter;
-import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Job;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
-import org.apache.ignite.internal.util.typedef.T2;
-
-/**
- * Statistic writer implementation that writes info into any Hadoop file 
system.
- */
-public class IgniteHadoopFileSystemCounterWriter implements 
HadoopCounterWriter {
-    /** */
-    public static final String PERFORMANCE_COUNTER_FILE_NAME = "performance";
-
-    /** */
-    public static final String COUNTER_WRITER_DIR_PROPERTY = 
"ignite.counters.fswriter.directory";
-
-    /** */
-    private static final String USER_MACRO = "${USER}";
-
-    /** */
-    private static final String DEFAULT_COUNTER_WRITER_DIR = "/user/" + 
USER_MACRO;
-
-    /** {@inheritDoc} */
-    @Override public void write(HadoopJob job, HadoopCounters cntrs)
-        throws IgniteCheckedException {
-
-        Configuration hadoopCfg = HadoopUtils.safeCreateConfiguration();
-
-        final HadoopJobInfo jobInfo = job.info();
-
-        final HadoopJobId jobId = job.id();
-
-        for (Map.Entry<String, String> e : 
((HadoopDefaultJobInfo)jobInfo).properties().entrySet())
-            hadoopCfg.set(e.getKey(), e.getValue());
-
-        String user = jobInfo.user();
-
-        user = IgfsUtils.fixUserName(user);
-
-        String dir = jobInfo.property(COUNTER_WRITER_DIR_PROPERTY);
-
-        if (dir == null)
-            dir = DEFAULT_COUNTER_WRITER_DIR;
-
-        Path jobStatPath = new Path(new Path(dir.replace(USER_MACRO, user)), 
jobId.toString());
-
-        HadoopPerformanceCounter perfCntr = 
HadoopPerformanceCounter.getCounter(cntrs, null);
-
-        try {
-            hadoopCfg.set(MRJobConfig.USER_NAME, user);
-
-            FileSystem fs = ((HadoopV2Job)job).fileSystem(jobStatPath.toUri(), 
hadoopCfg);
-
-            fs.mkdirs(jobStatPath);
-
-            try (PrintStream out = new PrintStream(fs.create(new 
Path(jobStatPath, PERFORMANCE_COUNTER_FILE_NAME)))) {
-                for (T2<String, Long> evt : perfCntr.evts()) {
-                    out.print(evt.get1());
-                    out.print(':');
-                    out.println(evt.get2().toString());
-                }
-
-                out.flush();
-            }
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b49fe400/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegateImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegateImpl.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegateImpl.java
new file mode 100644
index 0000000..0b306bc
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegateImpl.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.delegate;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.hadoop.fs.IgniteHadoopFileSystemCounterWriter;
+import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import 
org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter;
+import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Job;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
+import org.apache.ignite.internal.util.typedef.T2;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Map;
+
+/**
+ * Counter writer delegate implementation that writes job performance counter events to a Hadoop file system.
+ */
+@SuppressWarnings("unused")
+public class HadoopFileSystemCounterWriterDelegateImpl implements 
HadoopFileSystemCounterWriterDelegate {
+    /** Macro in the counter directory path that is replaced with the (fixed) job user name. */
+    private static final String USER_MACRO = "${USER}";
+
+    /** Counter directory used when the job does not define the counter writer directory property. */
+    private static final String DEFAULT_COUNTER_WRITER_DIR = "/user/" + 
USER_MACRO;
+
+    /**
+     * Constructor.
+     *
+     * @param proxy Proxy (not used).
+     */
+    public 
HadoopFileSystemCounterWriterDelegateImpl(IgniteHadoopFileSystemCounterWriter 
proxy) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(HadoopJob job, HadoopCounters cntrs) throws 
IgniteCheckedException {
+        Configuration hadoopCfg = HadoopUtils.safeCreateConfiguration();
+
+        final HadoopJobInfo jobInfo = job.info();
+
+        final HadoopJobId jobId = job.id();
+
+        for (Map.Entry<String, String> e : 
((HadoopDefaultJobInfo)jobInfo).properties().entrySet())
+            hadoopCfg.set(e.getKey(), e.getValue());
+
+        String user = jobInfo.user();
+
+        user = IgfsUtils.fixUserName(user);
+
+        String dir = 
jobInfo.property(IgniteHadoopFileSystemCounterWriter.COUNTER_WRITER_DIR_PROPERTY);
+
+        if (dir == null)
+            dir = DEFAULT_COUNTER_WRITER_DIR;
+
+        Path jobStatPath = new Path(new Path(dir.replace(USER_MACRO, user)), 
jobId.toString());
+
+        HadoopPerformanceCounter perfCntr = 
HadoopPerformanceCounter.getCounter(cntrs, null);
+
+        try {
+            hadoopCfg.set(MRJobConfig.USER_NAME, user);
+
+            FileSystem fs = ((HadoopV2Job)job).fileSystem(jobStatPath.toUri(), 
hadoopCfg);
+
+            fs.mkdirs(jobStatPath);
+
+            try (PrintStream out = new PrintStream(fs.create(
+                new Path(jobStatPath, 
IgniteHadoopFileSystemCounterWriter.PERFORMANCE_COUNTER_FILE_NAME)))) {
+                for (T2<String, Long> evt : perfCntr.evts()) {
+                    out.print(evt.get1());
+                    out.print(':');
+                    out.println(evt.get2().toString());
+                }
+
+                out.flush();
+            }
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b49fe400/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java
index bf93f37..7f7100d 100644
--- 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java
@@ -45,7 +45,6 @@ import org.apache.ignite.internal.processors.igfs.IgfsUtils;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.lifecycle.LifecycleAware;
 import org.jetbrains.annotations.Nullable;
 
 import java.io.FileNotFoundException;
@@ -74,7 +73,6 @@ public class HadoopIgfsSecondaryFileSystemDelegateImpl 
implements HadoopIgfsSeco
      *
      * @param proxy Proxy.
     */
-    @SuppressWarnings("unused")
     public 
HadoopIgfsSecondaryFileSystemDelegateImpl(IgniteHadoopIgfsSecondaryFileSystem 
proxy) {
         assert proxy.getFileSystemFactory() != null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b49fe400/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
new file mode 100644
index 0000000..d11944e
--- /dev/null
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.fs;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import 
org.apache.ignite.internal.processors.hadoop.counter.HadoopCounterWriter;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import 
org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils;
+import 
org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemCounterWriterDelegate;
+
+/**
+ * Statistic writer implementation that writes info into any Hadoop file 
system.
+ */
+public class IgniteHadoopFileSystemCounterWriter implements 
HadoopCounterWriter {
+    /** Name of the file that performance counter events are written to inside the job statistics directory. */
+    public static final String PERFORMANCE_COUNTER_FILE_NAME = "performance";
+
+    /** Name of the job property that specifies the directory for counter output. */
+    public static final String COUNTER_WRITER_DIR_PROPERTY = 
"ignite.counters.fswriter.directory";
+
+    /** Mutex guarding lazy creation of the delegate. */
+    private final Object mux = new Object();
+
+    /** Delegate that performs the actual write; created on first use. */
+    private volatile HadoopFileSystemCounterWriterDelegate delegate;
+
+    /** {@inheritDoc} */
+    @Override public void write(HadoopJob job, HadoopCounters cntrs)
+        throws IgniteCheckedException {
+        delegate().write(job, cntrs);
+    }
+
+    /**
+     * Get delegate, creating it through {@code HadoopDelegateUtils} under the mutex if it does not exist yet.
+     *
+     * @return Delegate.
+     */
+    private HadoopFileSystemCounterWriterDelegate delegate() {
+        HadoopFileSystemCounterWriterDelegate delegate0 = delegate;
+
+        if (delegate0 == null) {
+            synchronized (mux) {
+                delegate0 = delegate;
+
+                if (delegate0 == null) {
+                    delegate0 = 
HadoopDelegateUtils.counterWriterDelegate(this);
+
+                    delegate = delegate0;
+                }
+            }
+        }
+
+        return delegate0;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b49fe400/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopDelegateUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopDelegateUtils.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopDelegateUtils.java
index 57c8138..6c39946 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopDelegateUtils.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopDelegateUtils.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.hadoop.delegate;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.hadoop.fs.BasicHadoopFileSystemFactory;
 import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory;
+import org.apache.ignite.hadoop.fs.IgniteHadoopFileSystemCounterWriter;
 import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
 import org.apache.ignite.hadoop.fs.KerberosHadoopFileSystemFactory;
 
@@ -42,6 +43,10 @@ public class HadoopDelegateUtils {
     /** Factory proxy to delegate class name mapping. */
     private static final Map<String, String> FACTORY_CLS_MAP;
 
+    /** Counter writer delegate implementation class name. */
+    private static final String COUNTER_WRITER_DELEGATE_CLS =
+        
"org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemCounterWriterDelegateImpl";
+
     static {
         FACTORY_CLS_MAP = new HashMap<>();
 
@@ -83,6 +88,17 @@ public class HadoopDelegateUtils {
     }
 
     /**
+     * Create delegate for the given Hadoop file system counter writer.
+     *
+     * @param proxy Counter writer proxy, passed to {@code newInstance} together with the delegate class name.
+     * @return Delegate.
+     */
+    public static HadoopFileSystemCounterWriterDelegate counterWriterDelegate(
+        IgniteHadoopFileSystemCounterWriter proxy) {
+        return newInstance(COUNTER_WRITER_DELEGATE_CLS, proxy);
+    }
+
+    /**
      * Get new delegate instance.
      *
      * @param clsName Class name.

http://git-wip-us.apache.org/repos/asf/ignite/blob/b49fe400/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegate.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegate.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegate.java
new file mode 100644
index 0000000..541cf80
--- /dev/null
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegate.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.delegate;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+
+/**
+ * Counter writer delegate interface: performs the write on behalf of a file system counter writer.
+ */
+public interface HadoopFileSystemCounterWriterDelegate {
+    /**
+     * Writes counters of given job to some statistics storage.
+     *
+     * @param job The job.
+     * @param cntrs Counters.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void write(HadoopJob job, HadoopCounters cntrs) throws 
IgniteCheckedException;
+}

Reply via email to