This is an automated email from the ASF dual-hosted git repository. krisztiankasa pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new e7602a0  AMBARI-25293 - Logsearch: logfeeder throws NPE when updating checkpoint (#2998)
e7602a0 is described below

commit e7602a087ef4ca759f6aed7b8f37e5ef3a20dcd5
Author: kasakrisz <33458261+kasakr...@users.noreply.github.com>
AuthorDate: Tue Jun 4 19:43:58 2019 +0200

    AMBARI-25293 - Logsearch: logfeeder throws NPE when updating checkpoint (#2998)
---
 .../logfeeder/input/file/FileCheckInHelper.java    | 65 ++++++++++++++++++----
 .../input/file/ResumeLineNumberHelper.java         | 22 +++-----
 2 files changed, 61 insertions(+), 26 deletions(-)

diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/FileCheckInHelper.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/FileCheckInHelper.java
index 7b8f0cd..f96c0d2 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/FileCheckInHelper.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/FileCheckInHelper.java
@@ -18,18 +18,18 @@
  */
 package org.apache.ambari.logfeeder.input.file;
 
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.ambari.logfeeder.input.InputFile;
 import org.apache.ambari.logfeeder.input.InputFileMarker;
 import org.apache.ambari.logfeeder.util.FileUtil;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
-import java.io.File;
-import java.io.RandomAccessFile;
-import java.util.Date;
-import java.util.Map;
-
 public class FileCheckInHelper {
 
   private static final Logger LOG = Logger.getLogger(FileCheckInHelper.class);
@@ -40,7 +40,14 @@ public class FileCheckInHelper {
   public static void checkIn(InputFile inputFile, InputFileMarker inputMarker) {
     try {
       Map<String, Object> jsonCheckPoint = inputFile.getJsonCheckPoints().get(inputMarker.getBase64FileKey());
+      if (jsonCheckPoint == null) {
+        jsonCheckPoint = createNewCheckpointObject(inputFile);
+        attachCheckpointToInput(inputFile, jsonCheckPoint);
+      }
       File checkPointFile = inputFile.getCheckPointFiles().get(inputMarker.getBase64FileKey());
+      if (checkPointFile == null || !checkPointFile.exists()) {
+        checkPointFile = FileCheckInHelper.attachCheckpointFileToInput(inputFile);
+      }
 
       int lineNumber = LogFeederUtil.objectToInt(jsonCheckPoint.get("line_number"), 0, "line_number");
       if (lineNumber > inputMarker.getLineNumber()) {
@@ -80,15 +87,49 @@ public class FileCheckInHelper {
       FileUtil.move(tmpCheckPointFile, checkPointFile);
 
       if (inputFile.isClosed()) {
-        String logMessageKey = inputFile.getClass().getSimpleName() + "_FINAL_CHECKIN";
-        LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Wrote final checkPoint, input=" + inputFile.getShortDescription() +
-          ", checkPointFile=" + checkPointFile.getAbsolutePath() + ", checkPoint=" + jsonStr, null, LOG, Level.INFO);
+        LOG.info(String.format("Wrote final checkPoint, input=%s, checkPointFile=%s, checkPoint=%s", inputFile.getShortDescription(), checkPointFile.getAbsolutePath(), jsonStr));
       }
     } catch (Throwable t) {
-      String logMessageKey = inputFile.getClass().getSimpleName() + "_CHECKIN_EXCEPTION";
-      LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Caught exception checkIn. , input=" + inputFile.getShortDescription(), t,
-        LOG, Level.ERROR);
+      LOG.error("Caught exception checkIn. , input=" + inputFile.getShortDescription(), t);
+    }
+  }
+
+  /**
+   * Create new checkpoint object
+   * @param inputFile file object which is used to fill the checkpoint defaults
+   * @return Created checkpoint object
+   */
+  static Map<String, Object> createNewCheckpointObject(final InputFile inputFile) {
+    Map<String, Object> jsonCheckPoint = new HashMap<>();
+    jsonCheckPoint.put("file_path", inputFile.getFilePath());
+    try {
+      jsonCheckPoint.put("file_key", inputFile.getBase64FileKey());
+    } catch (Exception e) {
+      LOG.error(String.format("Error during checkpoint object (path: %s) creationg: %s", inputFile.getFilePath(), e.getMessage()));
     }
+    return jsonCheckPoint;
+  }
+
+  /**
+   * Attach a json checkpoint object to an input file
+   * @param inputFile input file object that will have the new checkpoint
+   * @param jsonCheckPoint holds checkpoint related data
+   */
+  static void attachCheckpointToInput(final InputFile inputFile, final Map<String, Object> jsonCheckPoint) throws Exception {
+    inputFile.getJsonCheckPoints().put(inputFile.getBase64FileKey(), jsonCheckPoint);
+  }
+
+  /**
+   * Create a new file object for input checkpoint
+   * @param inputFile input file object that will have the new checkpoint file
+   * @return Newly created checkpoint file
+   */
+  static File attachCheckpointFileToInput(final InputFile inputFile) throws Exception {
+    String checkPointFileName = inputFile.getBase64FileKey() + inputFile.getCheckPointExtension();
+    File checkPointFolder = inputFile.getInputManager().getCheckPointFolderFile();
+    File checkPointFile = new File(checkPointFolder, checkPointFileName);
+    inputFile.getCheckPointFiles().put(inputFile.getBase64FileKey(), checkPointFile);
+    return checkPointFile;
   }
 
 
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/ResumeLineNumberHelper.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/ResumeLineNumberHelper.java
index 9350200..614c3bc 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/ResumeLineNumberHelper.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/ResumeLineNumberHelper.java
@@ -18,17 +18,16 @@
  */
 package org.apache.ambari.logfeeder.input.file;
 
-import org.apache.ambari.logfeeder.input.InputFile;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.EOFException;
 import java.io.File;
 import java.io.RandomAccessFile;
-import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.ambari.logfeeder.input.InputFile;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class ResumeLineNumberHelper {
 
   private static final Logger LOG = LoggerFactory.getLogger(ResumeLineNumberHelper.class);
@@ -43,10 +42,7 @@ public class ResumeLineNumberHelper {
     try {
       LOG.info("Checking existing checkpoint file. " + inputFile.getShortDescription());
 
-      String checkPointFileName = inputFile.getBase64FileKey() + inputFile.getCheckPointExtension();
-      File checkPointFolder = inputFile.getInputManager().getCheckPointFolderFile();
-      checkPointFile = new File(checkPointFolder, checkPointFileName);
-      inputFile.getCheckPointFiles().put(inputFile.getBase64FileKey(), checkPointFile);
+      checkPointFile = FileCheckInHelper.attachCheckpointFileToInput(inputFile);
       Map<String, Object> jsonCheckPoint = null;
       if (!checkPointFile.exists()) {
         LOG.info("Checkpoint file for log file " + inputFile.getFilePath() + " doesn't exist, starting to read it from the beginning");
@@ -74,12 +70,10 @@ public class ResumeLineNumberHelper {
       }
       if (jsonCheckPoint == null) {
         // This seems to be first time, so creating the initial checkPoint object
-        jsonCheckPoint = new HashMap<String, Object>();
-        jsonCheckPoint.put("file_path", inputFile.getFilePath());
-        jsonCheckPoint.put("file_key", inputFile.getBase64FileKey());
+        FileCheckInHelper.createNewCheckpointObject(inputFile);
       }
 
-      inputFile.getJsonCheckPoints().put(inputFile.getBase64FileKey(), jsonCheckPoint);
+      FileCheckInHelper.attachCheckpointToInput(inputFile, jsonCheckPoint);
 
     } catch (Throwable t) {
       LOG.error("Error while configuring checkpoint file. Will reset file. checkPointFile=" + checkPointFile, t);