[ https://issues.apache.org/jira/browse/YARN-3336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhihai xu updated YARN-3336:
----------------------------
    Description: 
FileSystem memory leak in DelegationTokenRenewer.
Every time DelegationTokenRenewer#obtainSystemTokensForUser is called, a new 
FileSystem entry is added to FileSystem#CACHE, and that entry is never garbage 
collected.
This is the implementation of obtainSystemTokensForUser:
{code}
  protected Token<?>[] obtainSystemTokensForUser(String user,
      final Credentials credentials) throws IOException, InterruptedException {
    // Get new hdfs tokens on behalf of this user
    UserGroupInformation proxyUser =
        UserGroupInformation.createProxyUser(user,
          UserGroupInformation.getLoginUser());
    Token<?>[] newTokens =
        proxyUser.doAs(new PrivilegedExceptionAction<Token<?>[]>() {
          @Override
          public Token<?>[] run() throws Exception {
            return FileSystem.get(getConfig()).addDelegationTokens(
              UserGroupInformation.getLoginUser().getUserName(), credentials);
          }
        });
    return newTokens;
  }
{code}

The memory leak happens when FileSystem.get(getConfig()) is called with a new 
proxy user, because createProxyUser always creates a new Subject.
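The calling sequence is:
FileSystem.get(getConfig())
  => FileSystem.get(getDefaultUri(conf), conf)
  => FileSystem.CACHE.get(uri, conf)
  => FileSystem.CACHE.getInternal(uri, conf, key)
  => FileSystem.CACHE.map.get(key)
  => createFileSystem(uri, conf)
The map lookup always misses, because the cache key contains the freshly created 
proxy UGI. As a result, createFileSystem is reached on every call.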
{code}
public static UserGroupInformation createProxyUser(String user,
      UserGroupInformation realUser) {
    if (user == null || user.isEmpty()) {
      throw new IllegalArgumentException("Null user");
    }
    if (realUser == null) {
      throw new IllegalArgumentException("Null real user");
    }
    Subject subject = new Subject();
    Set<Principal> principals = subject.getPrincipals();
    principals.add(new User(user));
    principals.add(new RealUser(realUser));
    UserGroupInformation result =new UserGroupInformation(subject);
    result.setAuthenticationMethod(AuthenticationMethod.PROXY);
    return result;
  }
{code}

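FileSystem#Cache#Key.equals compares the UGI. The Key constructor captures that 
UGI from UserGroupInformation.getCurrentUser(), which is the proxy user inside doAs: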
{code}
      Key(URI uri, Configuration conf, long unique) throws IOException {
        scheme = uri.getScheme()==null?"":uri.getScheme().toLowerCase();
        authority = 
uri.getAuthority()==null?"":uri.getAuthority().toLowerCase();
        this.unique = unique;
        this.ugi = UserGroupInformation.getCurrentUser();
      }
      public boolean equals(Object obj) {
        if (obj == this) {
          return true;
        }
        if (obj != null && obj instanceof Key) {
          Key that = (Key)obj;
          return isEqual(this.scheme, that.scheme)
                 && isEqual(this.authority, that.authority)
                 && isEqual(this.ugi, that.ugi)
                 && (this.unique == that.unique);
        }
        return false;        
      }
{code}

UserGroupInformation.equals compares Subjects by reference:
{code}
  public boolean equals(Object o) {
    if (o == this) {
      return true;
    } else if (o == null || getClass() != o.getClass()) {
      return false;
    } else {
      return subject == ((UserGroupInformation) o).subject;
    }
  }
{code}

So in this case, every time createProxyUser and FileSystem.get(getConfig()) are 
called, a new FileSystem is created and a new entry is added to 
FileSystem.CACHE.

  was:
FileSystem memory leak in DelegationTokenRenewer.
Every time DelegationTokenRenewer#obtainSystemTokensForUser is called, a new 
FileSystem entry will be added to  FileSystem#CACHE which will never be garbage 
collected.
This is the implementation of obtainSystemTokensForUser:
{code}
  protected Token<?>[] obtainSystemTokensForUser(String user,
      final Credentials credentials) throws IOException, InterruptedException {
    // Get new hdfs tokens on behalf of this user
    UserGroupInformation proxyUser =
        UserGroupInformation.createProxyUser(user,
          UserGroupInformation.getLoginUser());
    Token<?>[] newTokens =
        proxyUser.doAs(new PrivilegedExceptionAction<Token<?>[]>() {
          @Override
          public Token<?>[] run() throws Exception {
            return FileSystem.get(getConfig()).addDelegationTokens(
              UserGroupInformation.getLoginUser().getUserName(), credentials);
          }
        });
    return newTokens;
  }
{code}

The memory leak happened when FileSystem.get(getConfig()) is called with a new 
proxy user.
Because createProxyUser will always create a new Subject.
{code}
public static UserGroupInformation createProxyUser(String user,
      UserGroupInformation realUser) {
    if (user == null || user.isEmpty()) {
      throw new IllegalArgumentException("Null user");
    }
    if (realUser == null) {
      throw new IllegalArgumentException("Null real user");
    }
    Subject subject = new Subject();
    Set<Principal> principals = subject.getPrincipals();
    principals.add(new User(user));
    principals.add(new RealUser(realUser));
    UserGroupInformation result =new UserGroupInformation(subject);
    result.setAuthenticationMethod(AuthenticationMethod.PROXY);
    return result;
  }
{code}

FileSystem#Cache#Key.equals will compare the ugi
{code}
      Key(URI uri, Configuration conf, long unique) throws IOException {
        scheme = uri.getScheme()==null?"":uri.getScheme().toLowerCase();
        authority = 
uri.getAuthority()==null?"":uri.getAuthority().toLowerCase();
        this.unique = unique;
        this.ugi = UserGroupInformation.getCurrentUser();
      }
      public boolean equals(Object obj) {
        if (obj == this) {
          return true;
        }
        if (obj != null && obj instanceof Key) {
          Key that = (Key)obj;
          return isEqual(this.scheme, that.scheme)
                 && isEqual(this.authority, that.authority)
                 && isEqual(this.ugi, that.ugi)
                 && (this.unique == that.unique);
        }
        return false;        
      }
{code}

UserGroupInformation.equals will compare subject by reference.
{code}
  public boolean equals(Object o) {
    if (o == this) {
      return true;
    } else if (o == null || getClass() != o.getClass()) {
      return false;
    } else {
      return subject == ((UserGroupInformation) o).subject;
    }
  }
{code}

So in this case, every time createProxyUser and FileSystem.get(getConfig()) are 
called, a new FileSystem will be created and a new entry will be added to 
FileSystem.CACHE.


> FileSystem memory leak in DelegationTokenRenewer
> ------------------------------------------------
>
>                 Key: YARN-3336
>                 URL: https://issues.apache.org/jira/browse/YARN-3336
>             Project: Hadoop YARN
>          Issue Type: Bug
>          Components: resourcemanager
>            Reporter: zhihai xu
>            Assignee: zhihai xu
>            Priority: Critical
>         Attachments: YARN-3336.000.patch
>
>
> FileSystem memory leak in DelegationTokenRenewer.
> Every time DelegationTokenRenewer#obtainSystemTokensForUser is called, a new 
> FileSystem entry will be added to  FileSystem#CACHE which will never be 
> garbage collected.
> This is the implementation of obtainSystemTokensForUser:
> {code}
>   protected Token<?>[] obtainSystemTokensForUser(String user,
>       final Credentials credentials) throws IOException, InterruptedException 
> {
>     // Get new hdfs tokens on behalf of this user
>     UserGroupInformation proxyUser =
>         UserGroupInformation.createProxyUser(user,
>           UserGroupInformation.getLoginUser());
>     Token<?>[] newTokens =
>         proxyUser.doAs(new PrivilegedExceptionAction<Token<?>[]>() {
>           @Override
>           public Token<?>[] run() throws Exception {
>             return FileSystem.get(getConfig()).addDelegationTokens(
>               UserGroupInformation.getLoginUser().getUserName(), credentials);
>           }
>         });
>     return newTokens;
>   }
> {code}
> The memory leak happened when FileSystem.get(getConfig()) is called with a 
> new proxy user.
> Because createProxyUser will always create a new Subject.
> The calling sequence is 
> FileSystem.get(getConfig())=>FileSystem.get(getDefaultUri(conf), 
> conf)=>FileSystem.CACHE.get(uri, conf)=>FileSystem.CACHE.getInternal(uri, 
> conf, key)=>FileSystem.CACHE.map.get(key)=>createFileSystem(uri, conf)
> {code}
> public static UserGroupInformation createProxyUser(String user,
>       UserGroupInformation realUser) {
>     if (user == null || user.isEmpty()) {
>       throw new IllegalArgumentException("Null user");
>     }
>     if (realUser == null) {
>       throw new IllegalArgumentException("Null real user");
>     }
>     Subject subject = new Subject();
>     Set<Principal> principals = subject.getPrincipals();
>     principals.add(new User(user));
>     principals.add(new RealUser(realUser));
>     UserGroupInformation result =new UserGroupInformation(subject);
>     result.setAuthenticationMethod(AuthenticationMethod.PROXY);
>     return result;
>   }
> {code}
> FileSystem#Cache#Key.equals will compare the ugi
> {code}
>       Key(URI uri, Configuration conf, long unique) throws IOException {
>         scheme = uri.getScheme()==null?"":uri.getScheme().toLowerCase();
>         authority = 
> uri.getAuthority()==null?"":uri.getAuthority().toLowerCase();
>         this.unique = unique;
>         this.ugi = UserGroupInformation.getCurrentUser();
>       }
>       public boolean equals(Object obj) {
>         if (obj == this) {
>           return true;
>         }
>         if (obj != null && obj instanceof Key) {
>           Key that = (Key)obj;
>           return isEqual(this.scheme, that.scheme)
>                  && isEqual(this.authority, that.authority)
>                  && isEqual(this.ugi, that.ugi)
>                  && (this.unique == that.unique);
>         }
>         return false;        
>       }
> {code}
> UserGroupInformation.equals will compare subject by reference.
> {code}
>   public boolean equals(Object o) {
>     if (o == this) {
>       return true;
>     } else if (o == null || getClass() != o.getClass()) {
>       return false;
>     } else {
>       return subject == ((UserGroupInformation) o).subject;
>     }
>   }
> {code}
> So in this case, every time createProxyUser and FileSystem.get(getConfig()) 
> are called, a new FileSystem will be created and a new entry will be added to 
> FileSystem.CACHE.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to