Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14718#discussion_r76711678
  
    --- Diff: 
common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
 ---
    @@ -170,13 +207,56 @@ protected void serviceInit(Configuration conf) throws 
Exception {
         }
       }
     
    +  private void reloadCreds(DB db) throws IOException {
    +    if (db != null) {
    +      if (isAuthenticationEnabled()) {
    +        logger.info("Going to reload spark shuffle data");
    +        DBIterator itr = db.iterator();
    +        itr.seek(APP_CREDS_KEY_PREFIX.getBytes(Charsets.UTF_8));
    +        while (itr.hasNext()) {
    +          Map.Entry<byte[], byte[]> e = itr.next();
    +          String key = new String(e.getKey(), Charsets.UTF_8);
    +          if (!key.startsWith(APP_CREDS_KEY_PREFIX)) {
    +            break;
    +          }
    +          String id = parseDbAppKey(key);
    +          ByteBuffer secret = mapper.readValue(e.getValue(), 
ByteBuffer.class);
    +          logger.info("Reloading tokens for app: " + id);
    +          secretManager.registerApp(id, secret);
    +        }
    +      }
    +    }
    +  }
    +
    +  private static String parseDbAppKey(String s) throws IOException {
    +    if (!s.startsWith(APP_CREDS_KEY_PREFIX)) {
    +      throw new IllegalArgumentException("expected a string starting with 
" + APP_CREDS_KEY_PREFIX);
    +      }
    +    String json = s.substring(APP_CREDS_KEY_PREFIX.length() + 1);
    +    AppId parsed = mapper.readValue(json, AppId.class);
    +    return parsed.appId;
    +  }
    +
    +  private static byte[] dbAppKey(AppId appExecId) throws IOException {
    +    // we stick a common prefix on all the keys so we can find them in the 
DB
    +    String appExecJson = mapper.writeValueAsString(appExecId);
    +    String key = (APP_CREDS_KEY_PREFIX+ ";" + appExecJson);
    +    return key.getBytes(Charsets.UTF_8);
    +  }
    +
       @Override
       public void initializeApplication(ApplicationInitializationContext 
context) {
         String appId = context.getApplicationId().toString();
         try {
           ByteBuffer shuffleSecret = context.getApplicationDataForService();
           logger.info("Initializing application {}", appId);
           if (isAuthenticationEnabled()) {
    +        AppId fullId = new AppId(appId);
    +        if (db != null) {
    +          byte[] key = dbAppKey(fullId);
    +          byte[] value = 
mapper.writeValueAsString(shuffleSecret).getBytes(Charsets.UTF_8);
    --- End diff --
    
    Do you need to write the shuffle secret to the DB? It seems all you need 
really is a list of known executors, since they all share the same secret which 
is available in `context.getApplicationDataForService()`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to