YARN-2569. Added the log handling APIs for the long running services. Contributed by Xuan Gong.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5338ac41 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5338ac41 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5338ac41 Branch: refs/heads/HDFS-6581 Commit: 5338ac416ab8ab3e7e0a7bfb4a53151fc457f673 Parents: a1fd804 Author: Zhijie Shen <zjs...@apache.org> Authored: Tue Sep 23 10:36:57 2014 -0700 Committer: Zhijie Shen <zjs...@apache.org> Committed: Tue Sep 23 10:36:57 2014 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../records/ApplicationSubmissionContext.java | 36 +++++ .../yarn/api/records/LogAggregationContext.java | 121 +++++++++++++++++ .../src/main/proto/yarn_protos.proto | 7 + .../pb/ApplicationSubmissionContextPBImpl.java | 40 ++++++ .../impl/pb/LogAggregationContextPBImpl.java | 134 +++++++++++++++++++ .../hadoop/yarn/api/TestPBImplRecords.java | 1 + 7 files changed, 342 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/5338ac41/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index a2d0536..0e4909e 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -91,6 +91,9 @@ Release 2.6.0 - UNRELEASED YARN-1250. Generic history service should support application-acls. (Zhijie Shen via junping_du) + YARN-2569. Added the log handling APIs for the long running services. (Xuan + Gong via zjshen) + IMPROVEMENTS YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc http://git-wip-us.apache.org/repos/asf/hadoop/blob/5338ac41/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java index 723a2e0..2202380 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java @@ -54,6 +54,7 @@ import java.util.Set; * validityInterval into failure count. If failure count reaches to * maxAppAttempts, the application will be failed. * </li> + * <li>Optional, application-specific {@link LogAggregationContext}</li> * </ul> * </p> * @@ -128,6 +129,21 @@ public abstract class ApplicationSubmissionContext { return context; } + @Public + @Stable + public static ApplicationSubmissionContext newInstance( + ApplicationId applicationId, String applicationName, String queue, + Priority priority, ContainerLaunchContext amContainer, + boolean isUnmanagedAM, boolean cancelTokensWhenComplete, + int maxAppAttempts, Resource resource, String applicationType, + boolean keepContainers, LogAggregationContext logAggregationContext) { + ApplicationSubmissionContext context = + newInstance(applicationId, applicationName, queue, priority, + amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts, + resource, applicationType, keepContainers); + context.setLogAggregationContext(logAggregationContext); + return context; + } /** * Get the <code>ApplicationId</code> of the submitted application. * @return <code>ApplicationId</code> of the submitted application @@ -381,4 +397,24 @@ public abstract class ApplicationSubmissionContext { @Stable public abstract void setAttemptFailuresValidityInterval( long attemptFailuresValidityInterval); + + /** + * Get <code>LogAggregationContext</code> of the application + * + * @return <code>LogAggregationContext</code> of the application + */ + @Public + @Stable + public abstract LogAggregationContext getLogAggregationContext(); + + /** + * Set <code>LogAggregationContext</code> for the application + * + * @param logAggregationContext + * for the application + */ + @Public + @Stable + public abstract void setLogAggregationContext( + LogAggregationContext logAggregationContext); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/5338ac41/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java new file mode 100644 index 0000000..9a0a157 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.util.Records; + +/** + * <p><code>LogAggregationContext</code> represents all of the + * information needed by the <code>NodeManager</code> to handle + * the logs for an application.</p> + * + * <p>It includes details such as: + * <ul> + * <li>includePattern. It uses Java Regex to filter the log files + * which match the defined include pattern and those log files + * will be uploaded. </li> + * <li>excludePattern. It uses Java Regex to filter the log files + * which match the defined exclude pattern and those log files + * will not be uploaded. If the log file name matches both the + * include and the exclude pattern, this file will be excluded eventually</li> + * <li>rollingIntervalSeconds. The default value is -1. By default, + * the logAggregationService only uploads container logs when + * the application is finished. This configure defines + * how often the logAggregationSerivce uploads container logs in seconds. + * By setting this configure, the logAggregationSerivce can upload container + * logs periodically when the application is running. + * </li> + * </ul> + * </p> + * + * @see ApplicationSubmissionContext + */ + +@Evolving +@Public +public abstract class LogAggregationContext { + + @Public + @Unstable + public static LogAggregationContext newInstance(String includePattern, + String excludePattern, long rollingIntervalSeconds) { + LogAggregationContext context = Records.newRecord(LogAggregationContext.class); + context.setIncludePattern(includePattern); + context.setExcludePattern(excludePattern); + context.setRollingIntervalSeconds(rollingIntervalSeconds); + return context; + } + + /** + * Get include pattern + * + * @return include pattern + */ + @Public + @Unstable + public abstract String getIncludePattern(); + + /** + * Set include pattern + * + * @param includePattern + */ + @Public + @Unstable + public abstract void setIncludePattern(String includePattern); + + /** + * Get exclude pattern + * + * @return exclude pattern + */ + @Public + @Unstable + public abstract String getExcludePattern(); + + /** + * Set exclude pattern + * + * @param excludePattern + */ + @Public + @Unstable + public abstract void setExcludePattern(String excludePattern); + + /** + * Get rollingIntervalSeconds + * + * @return the rollingIntervalSeconds + */ + @Public + @Unstable + public abstract long getRollingIntervalSeconds(); + + /** + * Set rollingIntervalSeconds + * + * @param rollingIntervalSeconds + */ + @Public + @Unstable + public abstract void setRollingIntervalSeconds(long rollingIntervalSeconds); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5338ac41/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index d8c42cc..b368746 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -292,6 +292,13 @@ message ApplicationSubmissionContextProto { optional bool keep_containers_across_application_attempts = 11 [default = false]; repeated string applicationTags = 12; optional int64 attempt_failures_validity_interval = 13 [default = -1]; + optional LogAggregationContextProto log_aggregation_context = 14; +} + +message LogAggregationContextProto { + optional string include_pattern = 1 [default = ".*"]; + optional string exclude_pattern = 2 [default = ""]; + optional int64 rolling_interval_seconds = 3 [default = -1]; } enum ApplicationAccessTypeProto { http://git-wip-us.apache.org/repos/asf/hadoop/blob/5338ac41/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java index 7b49a16..e4f183b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java @@ -19,11 +19,13 @@ package org.apache.hadoop.yarn.api.records.impl.pb; import com.google.common.base.CharMatcher; + import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -31,6 +33,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto; +import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; @@ -53,6 +56,7 @@ extends ApplicationSubmissionContext { private ContainerLaunchContext amContainer = null; private Resource resource = null; private Set<String> applicationTags = null; + private LogAggregationContext logAggregationContext = null; public ApplicationSubmissionContextPBImpl() { builder = ApplicationSubmissionContextProto.newBuilder(); @@ -110,6 +114,10 @@ extends ApplicationSubmissionContext { builder.clearApplicationTags(); builder.addAllApplicationTags(this.applicationTags); } + if (this.logAggregationContext != null) { + builder.setLogAggregationContext( + convertToProtoFormat(this.logAggregationContext)); + } } private void mergeLocalToProto() { @@ -415,4 +423,36 @@ extends ApplicationSubmissionContext { maybeInitBuilder(); builder.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval); } + + private LogAggregationContextPBImpl convertFromProtoFormat( + LogAggregationContextProto p) { + return new LogAggregationContextPBImpl(p); + } + + private LogAggregationContextProto convertToProtoFormat( + LogAggregationContext t) { + return ((LogAggregationContextPBImpl) t).getProto(); + } + + @Override + public LogAggregationContext getLogAggregationContext() { + ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; + if (this.logAggregationContext != null) { + return this.logAggregationContext; + } // Else via proto + if (!p.hasLogAggregationContext()) { + return null; + } + logAggregationContext = convertFromProtoFormat(p.getLogAggregationContext()); + return logAggregationContext; + } + + @Override + public void setLogAggregationContext( + LogAggregationContext logAggregationContext) { + maybeInitBuilder(); + if (logAggregationContext == null) + builder.clearLogAggregationContext(); + this.logAggregationContext = logAggregationContext; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5338ac41/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java new file mode 100644 index 0000000..4406ef9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.records.impl.pb; + +import org.apache.hadoop.yarn.api.records.LogAggregationContext; +import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto; +import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProtoOrBuilder; +import com.google.protobuf.TextFormat; + +public class LogAggregationContextPBImpl extends LogAggregationContext{ + + LogAggregationContextProto proto = LogAggregationContextProto.getDefaultInstance(); + LogAggregationContextProto.Builder builder = null; + boolean viaProto = false; + + public LogAggregationContextPBImpl() { + builder = LogAggregationContextProto.newBuilder(); + } + + public LogAggregationContextPBImpl(LogAggregationContextProto proto) { + this.proto = proto; + viaProto = true; + } + + public LogAggregationContextProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = LogAggregationContextProto.newBuilder(proto); + } + viaProto = false; + } + + + @Override + public String getIncludePattern() { + LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder; + if (! p.hasIncludePattern()) { + return null; + } + return p.getIncludePattern(); + } + + @Override + public void setIncludePattern(String includePattern) { + maybeInitBuilder(); + if (includePattern == null) { + builder.clearIncludePattern(); + return; + } + builder.setIncludePattern(includePattern); + } + + @Override + public String getExcludePattern() { + LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder; + if (! p.hasExcludePattern()) { + return null; + } + return p.getExcludePattern(); + } + + @Override + public void setExcludePattern(String excludePattern) { + maybeInitBuilder(); + if (excludePattern == null) { + builder.clearExcludePattern(); + return; + } + builder.setExcludePattern(excludePattern); + } + + @Override + public long getRollingIntervalSeconds() { + LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder; + if (! p.hasRollingIntervalSeconds()) { + return -1; + } + return p.getRollingIntervalSeconds(); + } + + @Override + public void setRollingIntervalSeconds(long rollingIntervalSeconds) { + maybeInitBuilder(); + builder.setRollingIntervalSeconds(rollingIntervalSeconds); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5338ac41/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java index c6572e9..c463452 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java @@ -178,6 +178,7 @@ public class TestPBImplRecords { "http", "localhost", 8080, "file0")); typeValueCache.put(SerializedException.class, SerializedException.newInstance(new IOException("exception for test"))); + generateByNewInstance(LogAggregationContext.class); generateByNewInstance(ApplicationId.class); generateByNewInstance(ApplicationAttemptId.class); generateByNewInstance(ContainerId.class);