[FLINK-8877] [core] Set Kryo trace if Flink log level is TRACE
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/06110d27 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/06110d27 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/06110d27 Branch: refs/heads/master Commit: 06110d27d5fcbf939610e2adc780e7ad1c467f6f Parents: b21a092 Author: Stephan Ewen <se...@apache.org> Authored: Sun Mar 4 12:11:29 2018 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Mar 7 18:10:34 2018 +0100 ---------------------------------------------------------------------- .../typeutils/runtime/kryo/KryoSerializer.java | 14 +++++ .../typeutils/runtime/kryo/MinlogForwarder.java | 61 ++++++++++++++++++++ 2 files changed, 75 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/06110d27/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java index f60ce46..06ba906 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java @@ -74,6 +74,10 @@ public class KryoSerializer<T> extends TypeSerializer<T> { private static final Logger LOG = LoggerFactory.getLogger(KryoSerializer.class); + static { + configureKryoLogging(); + } + // ------------------------------------------------------------------------ private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> defaultSerializers; @@ -483,6 +487,16 @@ public class KryoSerializer<T> extends TypeSerializer<T> { return kryoRegistrations; } + static void configureKryoLogging() { + // Kryo uses only DEBUG and TRACE levels + // we only forward TRACE level, because even DEBUG levels results in + // a logging for each object, which is infeasible in Flink. + if (LOG.isTraceEnabled()) { + com.esotericsoftware.minlog.Log.setLogger(new MinlogForwarder(LOG)); + com.esotericsoftware.minlog.Log.TRACE(); + } + } + // -------------------------------------------------------------------------------------------- private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { http://git-wip-us.apache.org/repos/asf/flink/blob/06110d27/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/MinlogForwarder.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/MinlogForwarder.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/MinlogForwarder.java new file mode 100644 index 0000000..3467923 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/MinlogForwarder.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime.kryo; + +import org.apache.flink.annotation.Internal; + +import com.esotericsoftware.minlog.Log; +import com.esotericsoftware.minlog.Log.Logger; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * An implementation of the Minlog Logger that forwards to slf4j. + */ +@Internal +class MinlogForwarder extends Logger { + + private final org.slf4j.Logger log; + + MinlogForwarder(org.slf4j.Logger log) { + this.log = checkNotNull(log); + } + + @Override + public void log (int level, String category, String message, Throwable ex) { + final String logString = "[KRYO " + category + "] " + message; + switch (level) { + case Log.LEVEL_ERROR: + log.error(logString, ex); + break; + case Log.LEVEL_WARN: + log.warn(logString, ex); + break; + case Log.LEVEL_INFO: + log.info(logString, ex); + break; + case Log.LEVEL_DEBUG: + log.debug(logString, ex); + break; + case Log.LEVEL_TRACE: + log.trace(logString, ex); + break; + } + } +}