[FLINK-8877] [core] Set Kryo trace if Flink log level is TRACE

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b0418b41
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b0418b41
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b0418b41

Branch: refs/heads/release-1.5
Commit: b0418b41f8fa02d3217b760c5bdfcdd7efdc1eac
Parents: 806f25a
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Mar 4 12:11:29 2018 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Mar 7 21:28:14 2018 +0100

----------------------------------------------------------------------
 .../typeutils/runtime/kryo/KryoSerializer.java  | 14 +++++
 .../typeutils/runtime/kryo/MinlogForwarder.java | 61 ++++++++++++++++++++
 2 files changed, 75 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b0418b41/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
index f60ce46..06ba906 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
@@ -74,6 +74,10 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(KryoSerializer.class);
 
+       static {
+               configureKryoLogging();
+       }
+
        // 
------------------------------------------------------------------------
 
        private final LinkedHashMap<Class<?>, 
ExecutionConfig.SerializableSerializer<?>> defaultSerializers;
@@ -483,6 +487,16 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
                return kryoRegistrations;
        }
 
+       static void configureKryoLogging() {
+               // Kryo uses only DEBUG and TRACE levels
+               // we only forward TRACE level, because even DEBUG levels 
results in
+               // a logging for each object, which is infeasible in Flink.
+               if (LOG.isTraceEnabled()) {
+                       com.esotericsoftware.minlog.Log.setLogger(new 
MinlogForwarder(LOG));
+                       com.esotericsoftware.minlog.Log.TRACE();
+               }
+       }
+
        // 
--------------------------------------------------------------------------------------------
 
        private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {

http://git-wip-us.apache.org/repos/asf/flink/blob/b0418b41/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/MinlogForwarder.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/MinlogForwarder.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/MinlogForwarder.java
new file mode 100644
index 0000000..3467923
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/MinlogForwarder.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.kryo;
+
+import org.apache.flink.annotation.Internal;
+
+import com.esotericsoftware.minlog.Log;
+import com.esotericsoftware.minlog.Log.Logger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the Minlog Logger that forwards to slf4j.
+ */
+@Internal
+class MinlogForwarder extends Logger {
+
+       private final org.slf4j.Logger log;
+
+       MinlogForwarder(org.slf4j.Logger log) {
+               this.log = checkNotNull(log);
+       }
+
+       @Override
+       public void log (int level, String category, String message, Throwable 
ex) {
+               final String logString = "[KRYO " + category + "] " + message;
+               switch (level) {
+                       case Log.LEVEL_ERROR:
+                               log.error(logString, ex);
+                               break;
+                       case Log.LEVEL_WARN:
+                               log.warn(logString, ex);
+                               break;
+                       case Log.LEVEL_INFO:
+                               log.info(logString, ex);
+                               break;
+                       case Log.LEVEL_DEBUG:
+                               log.debug(logString, ex);
+                               break;
+                       case Log.LEVEL_TRACE:
+                               log.trace(logString, ex);
+                               break;
+               }
+       }
+}

Reply via email to