[GitHub] oleewere commented on a change in pull request #19: AMBARI-24833. Re-implement S3/HDFS outputs as global cloud outputs

2018-11-12 Thread GitBox
oleewere commented on a change in pull request #19: AMBARI-24833. Re-implement 
S3/HDFS outputs as global cloud outputs
URL: https://github.com/apache/ambari-logsearch/pull/19#discussion_r232661059
 
 

 ##
 File path: ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutput.java
 ##
 @@ -19,12 +19,139 @@
 package org.apache.ambari.logfeeder.output.cloud;
 
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.conf.output.RolloverConfig;
+import org.apache.ambari.logfeeder.output.cloud.upload.UploadClient;
+import org.apache.ambari.logfeeder.output.cloud.upload.UploadClientFactory;
+import org.apache.ambari.logfeeder.plugin.input.Input;
 import org.apache.ambari.logfeeder.plugin.input.InputMarker;
 import org.apache.ambari.logfeeder.plugin.output.Output;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.appender.RollingFileAppender;
+import org.apache.logging.log4j.core.config.Configuration;
+
+import java.io.File;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * Class to handle common operations for cloud storage outputs
- * TODO !!!
+ * Output class for cloud outputs.
+ * Holds loggers that ship logs into specific folders; those files can be rolled over to an archive folder,
+ * from where an upload client ships the log archives to cloud storage.
  */
-public abstract class CloudStorageOutput extends Output {
+public class CloudStorageOutput extends Output {
+
+  private static final Logger logger = LogManager.getLogger(CloudStorageOutput.class);
+
+  private final Map<String, Logger> cloudOutputLoggers = new ConcurrentHashMap<>();
+  private final UploadClient uploadClient;
+  private final LogFeederProps logFeederProps;
+  private final LoggerContext loggerContext;
+  private final CloudStorageUploader uploader;
+  private final RolloverConfig rolloverConfig;
+
+  public CloudStorageOutput(LogFeederProps logFeederProps) {
+    this.uploadClient = UploadClientFactory.createUploadClient(logFeederProps);
+    this.logFeederProps = logFeederProps;
+    this.rolloverConfig = logFeederProps.getRolloverConfig();
+    loggerContext = (LoggerContext) LogManager.getContext(false);
+    uploader = new CloudStorageUploader(String.format("%s-uploader", logFeederProps.getCloudStorageDestination().getText()), uploadClient, logFeederProps);
+    uploader.setDaemon(true);
+  }
+
+  @Override
+  public void init(LogFeederProps logFeederProperties) throws Exception {
+    logger.info("Initialize cloud output.");
+    uploadClient.init(logFeederProperties);
+    uploader.start();
+  }
+
+  @Override
+  public String getOutputType() {
+    return "cloud";
+  }
+
+  @Override
+  public void copyFile(File inputFile, InputMarker inputMarker) throws Exception {
+    throw new UnsupportedOperationException("Copy file is not supported yet");
+  }
+
+  @Override
+  public void write(String jsonStr, InputMarker inputMarker) throws Exception {
+    String uniqueThreadName = inputMarker.getInput().getThread().getName();
+    Logger cloudLogger = null;
+    if (cloudOutputLoggers.containsKey(uniqueThreadName)) {
+      cloudLogger = cloudOutputLoggers.get(uniqueThreadName);
+    } else {
+      logger.info("New cloud input source found. Register: {}", uniqueThreadName);
+      cloudLogger = CloudStorageLoggerFactory.createLogger(inputMarker.getInput(), loggerContext, logFeederProps);
+      cloudOutputLoggers.put(uniqueThreadName, cloudLogger);
+    }
+    cloudLogger.info(jsonStr);
+    inputMarker.getInput().checkIn(inputMarker);
+  }
+
+  @Override
+  public Long getPendingCount() {
+    return 0L;
+  }
+
+  @Override
+  public String getWriteBytesMetricName() {
+    return "write:cloud";
+  }
+
+  @Override
+  public String getShortDescription() {
+    return "cloud";
+  }
+
+  @Override
+  public String getStatMetricName() {
+    return null;
 Review comment:
   null is handled
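
For context: the write() method in the diff above registers one logger per input thread through a containsKey/get/put sequence on a ConcurrentHashMap, which is a check-then-act pattern and could in principle create a logger twice if two threads raced on the same key. A minimal sketch (not the PR's code) of the same lookup made atomic with computeIfAbsent, using only names taken from the diff above:

  @Override
  public void write(String jsonStr, InputMarker inputMarker) throws Exception {
    String uniqueThreadName = inputMarker.getInput().getThread().getName();
    // Atomically create and register the per-input logger on first use
    Logger cloudLogger = cloudOutputLoggers.computeIfAbsent(uniqueThreadName, name -> {
      logger.info("New cloud input source found. Register: {}", name);
      return CloudStorageLoggerFactory.createLogger(inputMarker.getInput(), loggerContext, logFeederProps);
    });
    cloudLogger.info(jsonStr);
    inputMarker.getInput().checkIn(inputMarker);
  }

In practice the race is harmless if each input has a single dedicated thread (the map key is that thread's name), which the surrounding design suggests.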




[GitHub] oleewere commented on a change in pull request #19: AMBARI-24833. Re-implement S3/HDFS outputs as global cloud outputs

2018-11-09 Thread GitBox
oleewere commented on a change in pull request #19: AMBARI-24833. Re-implement 
S3/HDFS outputs as global cloud outputs
URL: https://github.com/apache/ambari-logsearch/pull/19#discussion_r232367451
 
 

 ##
 File path: ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java
 ##
 @@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.output.cloud;
+
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.output.cloud.upload.UploadClient;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.commons.io.FileUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.File;
+import java.util.Collection;
+
+/**
+ * Periodically checks a folder (containing archived logs); if it finds any .log or .gz files,
+ * it tries to upload them to cloud storage through an upload client (cloud specific)
+ */
+public class CloudStorageUploader extends Thread {
+
+  private static final Logger logger = LogManager.getLogger(CloudStorageUploader.class);
+
+  private final UploadClient uploadClient;
+  private final LogFeederProps logFeederProps;
+  private final String clusterName;
+  private final String hostName;
+  private final String uploaderType;
+
+  public CloudStorageUploader(String name, UploadClient uploadClient, LogFeederProps logFeederProps) {
+    super(name);
+    this.uploadClient = uploadClient;
+    this.logFeederProps = logFeederProps;
+    this.uploaderType = logFeederProps.getCloudStorageDestination().getText();
+    this.clusterName = logFeederProps.getClusterName();
+    this.hostName = LogFeederUtil.hostName;
+  }
+
+  @Override
+  public void run() {
+    logger.info("Start '{}' uploader", uploaderType);
+    boolean stop = false;
+    do {
+      try {
+        doUpload();
+        Thread.sleep(1000 * logFeederProps.getCloudStorageUploaderIntervalSeconds());
+      } catch (InterruptedException iex) {
+        logger.info("Uploader ({}) thread interrupted", uploaderType);
+        stop = true;
+      }
+    } while (!stop && !Thread.currentThread().isInterrupted());
+  }
+
+  /**
+   * Finds .log and .gz files and uploads them to cloud storage through an uploader client
+   */
+  void doUpload() {
+    try {
+      final String archiveLogDir = String.join(File.separator, logFeederProps.getTmpDir(), uploaderType, "archived");
+      if (new File(archiveLogDir).exists()) {
+        String[] extensions = {"log", "gz"};
+        Collection<File> filesToUpload = FileUtils.listFiles(new File(archiveLogDir), extensions, true);
+        if (filesToUpload.isEmpty()) {
+          logger.debug("No files found to upload.");
+        } else {
+          for (File file : filesToUpload) {
+            String basePath = uploadClient.getOutputConfig().getOutputBasePath();
+            String outputPath = String.format("%s/%s/%s/%s", clusterName, hostName, file.getParentFile().getName(), file.getName())
+              .replaceAll("//", "/");
+            logger.info("Upload will start: input: {}, output: {}", file.getAbsolutePath(), outputPath);
+            uploadClient.upload(file.getAbsolutePath(), outputPath, basePath);
 Review comment:
  That is handled by the uploadClient implementations; it is not extracted into a method here, unlike the HDFS client, where it can be a parameter of the copy command.
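
For context: the UploadClient contract itself is not shown in this thread. A hypothetical minimal shape, inferred only from the calls made in the diffs above (init, upload, getOutputConfig().getOutputBasePath()), might look like the sketch below; the actual interface in the PR may declare different members, and CloudOutputConfig is a stand-in name used here for illustration:

// Hypothetical sketch of the UploadClient contract, inferred from call sites
// in CloudStorageOutput and CloudStorageUploader; not the PR's actual code.
public interface UploadClient {

  // Prepare the cloud-specific client (e.g. S3 or HDFS connection) from global properties
  void init(LogFeederProps logFeederProps) throws Exception;

  // Upload a local archive file to <basePath>/<target> on the cloud storage
  void upload(String source, String target, String basePath) throws Exception;

  // Cloud-specific output settings; assumed here to expose at least the output base path
  CloudOutputConfig getOutputConfig();
}

Under such a contract, implementation-specific behavior like the one discussed in the review comment stays inside each UploadClient implementation instead of leaking into the shared interface.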

