[ https://issues.apache.org/jira/browse/YARN-3039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14358048#comment-14358048 ]
Sangjin Lee commented on YARN-3039:
-----------------------------------

Thanks [~djp] for the patch! I took a first pass and have some comments. Sorry the comments are long, but the patch was pretty big. :)

I agree with most of the design, both in the design doc and in the implementation, but there are certain things that need your attention. Let me know what you think. I'll post more if I see anything else. I still need to look at aspects like thread safety a little more closely.

- I see some methods are marked as Stable (e.g. AggregatorNodemanagerProtocol), but I feel that's a bit premature. Can we still mark these as Unstable or Evolving? Note that, at least at this point, even the class names may change.
- While we're on the API annotations, I notice that the annotations are on the methods rather than on the classes themselves. I usually put them on the classes, with the understanding that, for example, the entire class is unstable. Which is the more common practice?
- YarnConfiguration.java
-- I feel that the default for NM_AGGREGATOR_SERVICE_THREAD_COUNT doesn't need to be as high as 20, since the traffic will be pretty low. How about 5?
- TimelineClient.java
-- Since createTimelineClient(ApplicationId) was introduced only on this branch, we should be able to just replace it instead of adding a new deprecated method, no?
- TimelineClientImpl.java
-- putObjects()
--- I'm not sure I understand the comment "timelineServiceAddress couldn't have been initialized"; can't putObjects() be called in a steady state? If so, shouldn't we check whether timelineServiceAddress is already non-null before entering the loop that waits for the value to come in? Otherwise, wouldn't we introduce a 1-second latency into every put call, even in a steady state?
--- maxRetries -> retries might be a better variable name?
--- It might be good to create a small helper method for polling for the timelineServiceAddress value.
--- I'm not sure we need a while loop for needRetry: it either succeeds (at which point needRetry becomes false and you exit normally) or it doesn't (in which case we go into the exception handling and try *only once* to get the value). Basically, I'm not sure this retry code does what you intended.
--- I think it may be enough to make timelineServiceAddress volatile instead of making its getter/setter synchronized.
-- doPostingObject() duplicates code in putObjects(); can we find a way to eliminate the duplication? I know it calls different methods deep inside the implementation, but there should be a way to reduce it.
- TestRPC.java
-- typo: ILLEAGAL_NUMBER_MESSAGE -> ILLEGAL_NUMBER_MESSAGE
- Context.java
-- We might want to update the comments (or method names) a little; initially I didn't quite get the difference between the registered aggregators and the known aggregators.
- NodeManager.java
-- removeRegisteredAggregators() -> removeRegisteredAggregator() (should be singular)
-- We need removeKnownAggregator() as well; otherwise we'd blow up the NM memory. :) Perhaps what we need is more like setKnownAggregators() instead of add? Essentially, the NM would need to replace its knowledge of the known aggregators every time the RM updates it via heartbeat, right?
- NodeStatusUpdaterImpl.java
-- I'm not sure all the add/remove***Aggregators() methods are called at the right time. In particular, I'm not sure we're handling the case of removing an aggregator (e.g. when an app finishes). When an app finishes and the per-app aggregator is shut down, the NM needs to be notified, remove it from its registered aggregators, and let the RM know that it's gone so that it can be removed from the RM state store as well. How is this being handled? It's not clear to me. At least I don't see any calls to removeRegisteredAggregators().
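A minimal sketch of the polling suggestions for TimelineClientImpl.putObjects() (skip the wait in the steady state, factor polling into a helper, use volatile instead of synchronized accessors, name the bound `retries`). Everything here is hypothetical and not taken from the patch: the class name TimelineAddressPoller, the method pollTimelineServiceAddress(), the `retries`/`retryIntervalMs` parameters, and modeling the address as a plain String are all assumptions for illustration.

```java
/**
 * Hypothetical sketch, not the actual TimelineClientImpl code: holds the
 * timeline service address and polls for it only while it is still unset.
 */
final class TimelineAddressPoller {
  // volatile is sufficient here because the address is a single reference
  // with no compound invariant: a write from the updating thread (e.g. when
  // a heartbeat delivers a new address) becomes visible to readers without
  // synchronizing the getter/setter.
  private volatile String timelineServiceAddress;

  private final int retries;          // assumed: max number of polling attempts
  private final long retryIntervalMs; // assumed: sleep between attempts

  TimelineAddressPoller(int retries, long retryIntervalMs) {
    this.retries = retries;
    this.retryIntervalMs = retryIntervalMs;
  }

  void setTimelineServiceAddress(String address) {
    this.timelineServiceAddress = address;
  }

  String getTimelineServiceAddress() {
    return timelineServiceAddress;
  }

  /**
   * Returns the address immediately if it is already known (the steady
   * state, so no added latency); otherwise sleeps and re-reads it up to
   * {@code retries} times. Returns null if it never arrives. If the thread
   * is interrupted, it restores the interrupt flag and returns whatever
   * value is present at that moment (possibly null).
   */
  String pollTimelineServiceAddress() {
    String address = timelineServiceAddress;
    for (int attempt = 0; address == null && attempt < retries; attempt++) {
      try {
        Thread.sleep(retryIntervalMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt status
        return timelineServiceAddress;
      }
      address = timelineServiceAddress;
    }
    return address;
  }
}
```

With a helper like this, putObjects() would call pollTimelineServiceAddress() once and fail on null, instead of looping and sleeping on every put call.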
- ResourceTrackerService.java
-- (See above.) Shouldn't we handle the situation where an aggregator is removed? updateAppAggregatorsMap() seems to deal only with adding/updating an aggregator.
-- Is there a race condition in updateAppAggregatorsMap() when one AM (app attempt) goes down and another AM (app attempt) comes up?
- RMAppImpl.java
-- Would this be backward compatible from the RM state store perspective?
- TimelineAggregatorsCollection.java
-- What value will be set as timelineRestServerBindAddress? It appears to be just the value of TIMELINE_SERVICE_BIND_HOST. That doesn't sound right to me. For each node, we need to get the fully qualified hostname of the node. It doesn't look like that's the case here, does it?
- Would it be a good idea to provide an RPC method to query for updated aggregator information? Waiting for the heartbeats is OK, but if that fails, it might be easier/better to query (e.g. for the NM to discover the aggregator address).

> [Aggregator wireup] Implement ATS app-appgregator service discovery
> -------------------------------------------------------------------
>
>                 Key: YARN-3039
>                 URL: https://issues.apache.org/jira/browse/YARN-3039
>             Project: Hadoop YARN
>          Issue Type: Sub-task
>          Components: timelineserver
>            Reporter: Sangjin Lee
>            Assignee: Junping Du
>         Attachments: Service Binding for applicationaggregator of ATS (draft).pdf, Service Discovery For Application Aggregator of ATS (v2).pdf, YARN-3039-no-test.patch, YARN-3039-v2-incomplete.patch, YARN-3039-v3-core-changes-only.patch, YARN-3039-v4.patch
>
>
> Per design in YARN-2928, implement ATS writer service discovery. This is
> essential for off-node clients to send writes to the right ATS writer. This
> should also handle the case of AM failures.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)