NealSun96 commented on code in PR #2189:
URL: https://github.com/apache/helix/pull/2189#discussion_r1028437826
##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java:
##########
@@ -92,60 +93,79 @@ public synchronized Map<String, ResourceAssignment>
getBestPossibleAssignment()
}
/**
- * @return true if a new baseline was persisted.
+ * @param newAssignment
+ * @param path the path of the assignment record
+ * @param key the key of the assignment in the record
* @throws HelixException if the method failed to persist the baseline.
*/
- public synchronized boolean persistBaseline(Map<String, ResourceAssignment>
globalBaseline) {
- return persistAssignment(globalBaseline, getBaseline(), _baselinePath,
BASELINE_KEY);
+ private void persistAssignmentToMetadataStore(Map<String,
ResourceAssignment> newAssignment, String path, String key)
+ throws HelixException {
+ // TODO: Make the write async?
+ // Persist to ZK
+ HelixProperty combinedAssignments = combineAssignments(key, newAssignment);
+ try {
+ _dataAccessor.compressedBucketWrite(path, combinedAssignments);
+ } catch (IOException e) {
+ // TODO: Improve failure handling
+ throw new HelixException(String.format("Failed to persist %s assignment
to path %s", key, path), e);
+ }
}
/**
- * @return true if a new best possible assignment was persisted.
- * @throws HelixException if the method failed to persist the baseline.
+ * Persist a new baseline assignment to metadata store first, then to memory
+ * @param globalBaseline
*/
- public synchronized boolean persistBestPossibleAssignment(
- Map<String, ResourceAssignment> bestPossibleAssignment) {
- return persistAssignment(bestPossibleAssignment,
getBestPossibleAssignment(), _bestPossiblePath,
- BEST_POSSIBLE_KEY);
+ public synchronized void persistBaseline(Map<String, ResourceAssignment>
globalBaseline) {
+ // write to metadata store
+ persistAssignmentToMetadataStore(globalBaseline, _baselinePath,
BASELINE_KEY);
+ // write to memory
+ getBaseline().clear();
+ getBaseline().putAll(globalBaseline);
}
- public synchronized void clearAssignmentMetadata() {
- persistAssignment(Collections.emptyMap(), getBaseline(), _baselinePath,
BASELINE_KEY);
- persistAssignment(Collections.emptyMap(), getBestPossibleAssignment(),
_bestPossiblePath,
- BEST_POSSIBLE_KEY);
+ /**
+ * Persist a new best possible assignment to metadata store first, then to
memory.
+ * Increment best possible version by 1 - this is a high priority in-memory
write.
+ * @param bestPossibleAssignment
+ */
+ public synchronized void persistBestPossibleAssignment(Map<String,
ResourceAssignment> bestPossibleAssignment) {
+ // write to metadata store
+ persistAssignmentToMetadataStore(bestPossibleAssignment,
_bestPossiblePath, BEST_POSSIBLE_KEY);
+ // write to memory
+ getBestPossibleAssignment().clear();
+ getBestPossibleAssignment().putAll(bestPossibleAssignment);
+ _bestPossibleVersion++;
}
/**
- * @param newAssignment
- * @param cachedAssignment
- * @param path the path of the assignment record
- * @param key the key of the assignment in the record
- * @return true if a new assignment was persisted.
+ * Attempts to persist Best Possible Assignment in memory from an
asynchronous thread.
+ * Persist only happens when the provided version is not stale - this is a
low priority in-memory write.
+ * @param bestPossibleAssignment - new assignment to be persisted
+ * @param newVersion - attempted new version to write. This version is
obtained earlier from getBestPossibleVersion()
+ * @return true if the attempt succeeded, false otherwise.
*/
- // TODO: Enhance the return value so it is more intuitive to understand when
the persist fails and
- // TODO: when it is skipped.
- private boolean persistAssignment(Map<String, ResourceAssignment>
newAssignment,
- Map<String, ResourceAssignment> cachedAssignment, String path,
- String key) {
- // TODO: Make the write async?
- // If the assignment hasn't changed, skip writing to metadata store
- if (compareAssignments(cachedAssignment, newAssignment)) {
- return false;
- }
- // Persist to ZK
- HelixProperty combinedAssignments = combineAssignments(key, newAssignment);
- try {
- _dataAccessor.compressedBucketWrite(path, combinedAssignments);
- } catch (IOException e) {
- // TODO: Improve failure handling
- throw new HelixException(
- String.format("Failed to persist %s assignment to path %s", key,
path), e);
+ public synchronized boolean asyncPersistBestPossibleAssignmentInMemory(
Review Comment:
I think so, but I don't see the need for it. Keeping the persist synchronized
is less error-prone and costs little in performance (the cache refresh is very
fast, so it wouldn't block the ZooKeeper persist for too long).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]