GitHub user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14718#discussion_r76803728
  
    --- Diff: common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java ---
    @@ -170,13 +207,56 @@ protected void serviceInit(Configuration conf) throws Exception {
         }
       }
     
    +  private void reloadCreds(DB db) throws IOException {
    +    if (db != null) {
    +      if (isAuthenticationEnabled()) {
    +        logger.info("Going to reload spark shuffle data");
    +        DBIterator itr = db.iterator();
    +        itr.seek(APP_CREDS_KEY_PREFIX.getBytes(Charsets.UTF_8));
    +        while (itr.hasNext()) {
    +          Map.Entry<byte[], byte[]> e = itr.next();
    +          String key = new String(e.getKey(), Charsets.UTF_8);
    +          if (!key.startsWith(APP_CREDS_KEY_PREFIX)) {
    +            break;
    +          }
    +          String id = parseDbAppKey(key);
    +          ByteBuffer secret = mapper.readValue(e.getValue(), ByteBuffer.class);
    +          logger.info("Reloading tokens for app: " + id);
    +          secretManager.registerApp(id, secret);
    +        }
    +      }
    +    }
    +  }
    +
    +  private static String parseDbAppKey(String s) throws IOException {
    +    if (!s.startsWith(APP_CREDS_KEY_PREFIX)) {
    +      throw new IllegalArgumentException("expected a string starting with " + APP_CREDS_KEY_PREFIX);
    +    }
    +    String json = s.substring(APP_CREDS_KEY_PREFIX.length() + 1);
    +    AppId parsed = mapper.readValue(json, AppId.class);
    +    return parsed.appId;
    +  }
    +
    +  private static byte[] dbAppKey(AppId appExecId) throws IOException {
    +    // we stick a common prefix on all the keys so we can find them in the DB
    +    String appExecJson = mapper.writeValueAsString(appExecId);
    +    String key = (APP_CREDS_KEY_PREFIX+ ";" + appExecJson);
    +    return key.getBytes(Charsets.UTF_8);
    +  }
    +
       @Override
       public void initializeApplication(ApplicationInitializationContext context) {
         String appId = context.getApplicationId().toString();
         try {
           ByteBuffer shuffleSecret = context.getApplicationDataForService();
           logger.info("Initializing application {}", appId);
           if (isAuthenticationEnabled()) {
    +        AppId fullId = new AppId(appId);
    +        if (db != null) {
    +          byte[] key = dbAppKey(fullId);
    +          byte[] value = mapper.writeValueAsString(shuffleSecret).getBytes(Charsets.UTF_8);
    --- End diff --
    
    I'm not sure I follow; we are writing the applicationId -> shuffle secret
mapping here. If we don't persist the secret, then when the NodeManager comes
back up it won't know the secret, won't be able to compare it against what the
executor is sending, and connections will fail. You can't just trust the
executor id, as someone could spoof that.
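
To make the failure mode concrete, here is a minimal sketch of the point above. It is not the code from this PR: a `TreeMap` stands in for the on-disk DB, the app ID is used directly in the key instead of being JSON-encoded, and the prefix value, the class name `SecretRecoverySketch`, and the `simulateRestart` and `authenticate` helpers are all hypothetical names chosen for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

final class SecretRecoverySketch {
  // Hypothetical value; the diff does not show what the real constant holds.
  static final String APP_CREDS_KEY_PREFIX = "AppCreds";

  // Stand-in for the on-disk DB; its contents survive a simulated restart.
  static final TreeMap<String, byte[]> db = new TreeMap<>();

  // In-memory secrets, as a secret manager would hold them; lost on restart.
  static Map<String, byte[]> secrets = new HashMap<>();

  // Build the DB key: common prefix, a ';' separator, then the app ID.
  static String dbAppKey(String appId) {
    return APP_CREDS_KEY_PREFIX + ";" + appId;
  }

  // Recover the app ID from a DB key by skipping the prefix and separator.
  static String parseDbAppKey(String key) {
    if (!key.startsWith(APP_CREDS_KEY_PREFIX)) {
      throw new IllegalArgumentException(
          "expected a string starting with " + APP_CREDS_KEY_PREFIX);
    }
    return key.substring(APP_CREDS_KEY_PREFIX.length() + 1);
  }

  // Register the secret in memory and persist it so it outlives a restart.
  static void initializeApplication(String appId, byte[] secret) {
    db.put(dbAppKey(appId), secret.clone());
    secrets.put(appId, secret.clone());
  }

  // A restart wipes in-memory state but leaves the DB untouched.
  static void simulateRestart() {
    secrets = new HashMap<>();
  }

  // Seek to the prefix and re-register every stored secret until keys stop matching.
  static void reloadCreds() {
    for (Map.Entry<String, byte[]> e : db.tailMap(APP_CREDS_KEY_PREFIX, true).entrySet()) {
      if (!e.getKey().startsWith(APP_CREDS_KEY_PREFIX)) {
        break;
      }
      secrets.put(parseDbAppKey(e.getKey()), e.getValue().clone());
    }
  }

  // The app ID alone proves nothing; the presented secret must match the stored one.
  static boolean authenticate(String appId, byte[] presented) {
    byte[] expected = secrets.get(appId);
    return expected != null && MessageDigest.isEqual(expected, presented);
  }

  public static void main(String[] args) {
    byte[] secret = "s3cret".getBytes(StandardCharsets.UTF_8);
    initializeApplication("application_1_0001", secret);
    simulateRestart();
    System.out.println(authenticate("application_1_0001", secret)); // false
    reloadCreds();
    System.out.println(authenticate("application_1_0001", secret)); // true
  }
}
```

In this sketch, an executor that presents the right secret is still rejected after the restart until `reloadCreds()` runs, and a caller that knows only the app ID is rejected in every case.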

