Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/7943#discussion_r37228848
--- Diff: network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java ---
@@ -52,25 +57,85 @@ public class ExternalShuffleBlockResolver {
   private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockResolver.class);

+  private static final ObjectMapper mapper = new ObjectMapper();
+  /**
+   * This a common prefix to the key for each app registration we stick in leveldb, so they
+   * are easy to find, since leveldb lets you search based on prefix.
+   */
+  private static final String APP_KEY_PREFIX = "AppExecShuffleInfo";
+  private static final StoreVersion CURRENT_VERSION = new StoreVersion(1, 0);
+
   // Map containing all registered executors' metadata.
-  private final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors;
+  @VisibleForTesting
+  final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors;

   // Single-threaded Java executor used to perform expensive recursive directory deletion.
   private final Executor directoryCleaner;

   private final TransportConf conf;

-  public ExternalShuffleBlockResolver(TransportConf conf) {
-    this(conf, Executors.newSingleThreadExecutor(
+  @VisibleForTesting
+  final File registeredExecutorFile;
+  @VisibleForTesting
+  final DB db;
+
+  public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile)
+      throws IOException {
+    this(conf, registeredExecutorFile, Executors.newSingleThreadExecutor(
         // Add `spark` prefix because it will run in NM in Yarn mode.
         NettyUtils.createThreadFactory("spark-shuffle-directory-cleaner")));
   }

   // Allows tests to have more control over when directories are cleaned up.
   @VisibleForTesting
-  ExternalShuffleBlockResolver(TransportConf conf, Executor directoryCleaner) {
+  ExternalShuffleBlockResolver(
+      TransportConf conf,
+      File registeredExecutorFile,
+      Executor directoryCleaner) throws IOException {
     this.conf = conf;
-    this.executors = Maps.newConcurrentMap();
+    this.registeredExecutorFile = registeredExecutorFile;
+    if (registeredExecutorFile != null) {
+      Options options = new Options();
+      options.createIfMissing(false);
+      options.logger(new LevelDBLogger());
+      DB tmpDb;
+      try {
+        tmpDb = JniDBFactory.factory.open(registeredExecutorFile, options);
+      } catch (NativeDB.DBException e) {
+        if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
+          logger.info("Creating state database at " + registeredExecutorFile);
+          options.createIfMissing(true);
+          try {
+            tmpDb = JniDBFactory.factory.open(registeredExecutorFile, options);
+          } catch (NativeDB.DBException dbExc) {
+            throw new IOException("Unable to create state store", dbExc);
+          }
+        } else {
+          // the leveldb file seems to be corrupt somehow. Lets just blow it away and create a new
+          // one, so we can keep processing new apps
+          logger.error("error opening leveldb file {}. Creating new file, will not be able to " +
+            "recover state for existing applications", registeredExecutorFile, e);
+          for (File f: registeredExecutorFile.listFiles()) {
--- End diff ---
Minor, but if registeredExecutorFile is somehow a regular file rather than a directory, listFiles() will return null here and this loop will throw an NPE.
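
As a quick illustration of the failure mode and one possible guard (a minimal standalone sketch, not the PR's code; the class name ListFilesNpeDemo and the temp-file setup are made up for the example):

import java.io.File;
import java.io.IOException;

public class ListFilesNpeDemo {
  public static void main(String[] args) throws IOException {
    // Suppose the registered-executor path exists but is a regular file, not a directory.
    File registeredExecutorFile = File.createTempFile("registeredExecutors", ".ldb");
    registeredExecutorFile.deleteOnExit();

    // File.listFiles() returns null when the path is not a directory (or cannot be listed),
    // so iterating over the result directly throws a NullPointerException.
    File[] children = registeredExecutorFile.listFiles();
    System.out.println("listFiles() on a regular file returned: " + children); // prints null

    // Defensive version of the cleanup loop: only iterate when there are children to delete.
    if (children != null) {
      for (File f : children) {
        f.delete();
      }
    }
    // Remove the path itself in either case so the store can be recreated cleanly.
    registeredExecutorFile.delete();
  }
}

Guarding on the null return (or checking isDirectory() first) keeps the corruption-recovery path from replacing the original leveldb error with an unrelated NullPointerException.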
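
On a related note, the APP_KEY_PREFIX javadoc earlier in the diff relies on leveldb iterating keys in sorted order: seeking to the prefix and stopping at the first key that no longer starts with it visits every app registration. A rough sketch of what such a prefix scan might look like with the leveldbjni API used in the diff (the scanRegistrations name and the logging are assumptions for illustration, not necessarily what the PR does):

import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;

class RegistrationScanSketch {
  private static final String APP_KEY_PREFIX = "AppExecShuffleInfo";

  // Walk all keys that share the registration prefix; leveldb iterates in key order,
  // so the first key without the prefix marks the end of the registration range.
  static void scanRegistrations(DB db) throws java.io.IOException {
    DBIterator itr = db.iterator();
    try {
      itr.seek(APP_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
      while (itr.hasNext()) {
        Map.Entry<byte[], byte[]> entry = itr.next();
        String key = new String(entry.getKey(), StandardCharsets.UTF_8);
        if (!key.startsWith(APP_KEY_PREFIX)) {
          break;
        }
        // entry.getValue() would hold the serialized ExecutorShuffleInfo (JSON via the
        // ObjectMapper in the diff); deserialization is omitted in this sketch.
        System.out.println("found registration key: " + key);
      }
    } finally {
      itr.close();
    }
  }
}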