[ https://issues.apache.org/jira/browse/YARN-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14729910#comment-14729910 ]
Joep Rottinghuis commented on YARN-3901:
----------------------------------------

Not sure if [~gtCarrera9] is asking what happens if we have already consolidated a "finished" app into the flow aggregate and then receive an older batched put from the same app (from an older AM that wasn't properly killed, or something similar). That is currently an open problem. We could address it by storing the list of the x most recently completed apps in a tag on the total sum. We can't store every app ID that has been collapsed into the flow sum, because that list could grow very long, but keeping a list of the last x apps would radically reduce the likelihood that stale batched writes mess up the aggregation if there were to be a race condition (however rare that might be). I think we can add that guarding behavior in a separate jira rather than further complicate the first cut (which might suffer from this rare race condition).

As [~sjlee0] pointed out, and as we discussed offline this morning, scan.setMaxResultSize(limit) doesn't limit the number of rows that are returned; it limits the result size in bytes. Not sure we want to address that here, or whether we'll let him adjust that in his patch. We should limit the number of rows we retrieve from the scan and, if needed (as [~sjlee0] pointed out), add a PageFilter with a limit in addition to the limit, to further restrict what is prefetched into a buffer (this gets more complicated when the scan spans region servers).

It is a little confusing to read the patch and see where ColumnPrefix merely gains an additional Attribute... attributes argument in the store method, and where the method with a String qualifier is changed to one with a byte[]. There seems to be a discrepancy in how EntityTablePrefix and ApplicationTablePrefix are handled. I'm not sure it is needed to have a getColumnQualifier with a long in ColumnHelper, but we may have to review this together interactively behind two laptops.

In HBaseTimelineWriterImpl.onApplicationFinished you have an old comment that no longer makes sense:
{code}
// clear out the hashmap in case the downstream store/coprocessor
// added any more attributes during the put for END_TIME
{code}
Also, I'd simply do:
{code}
storeFlowMetrics(rowKey, metrics, attribute1,
    AggregationOperations.SUM_FINAL.getAttribute());
{code}

I'd update the javadoc in AggregationOperations before the enum definitions. They are in the old style and no longer make sense in the more generic case: SUM indicates that the values need to be summed up, MIN means that only the minimum needs to be kept, etc.

Initially I found the method name getIncomingAttributes and the corresponding member names somewhat confusing (from the method's perspective these aren't the incoming values, they are the outgoing values). Perhaps combinedAttribute and combineAttributes(...) would make more sense, but the provided logic seems correct.

FlowActivityColumnPrefix.IN_PROGRESS_TIME needs a better javadoc description of its use and meaning.

The coprocessor methods need a little more javadoc to explain what is going on; to the casual reader this is total voodoo. preGetOp creates a new scanner (ok), then does a single next on it (why?), and then bypasses the environment (huh?). Similarly, if preScannerOpen already calls scan.setMaxVersions(), why is the same still needed in preGetOp, while postScannerOpen no longer does it (presumably because it was already done in the pre-open)?

I like the more generic FlowRunCoprocessor (although it could have a name that is not associated with a table, because the behavior is generic, and arg names such as frpa are probably artifacts from a previous version). In getTagFromAttribute, is it possible to recognize an operation from an AggregationCompactionDimension without relying on throwing and catching an exception? For example, can you do AggregationOperation.isA(Attribute a) or something like that?

The other thing I realize with the coprocessor is this.
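(As an aside on that isA suggestion: a plain name-to-operation map avoids the throw/catch round trip of Enum.valueOf. This is only a sketch; the class and method names OpLookup and fromName are hypothetical, not from the patch.)

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of an exception-free operation lookup: instead of calling
// Enum.valueOf(...) and catching IllegalArgumentException for unknown
// names, keep a name->op map and return null on a miss.
public class OpLookup {
  public enum Op { SUM, SUM_FINAL, MIN, MAX }

  private static final Map<String, Op> BY_NAME = new HashMap<>();
  static {
    for (Op op : Op.values()) {
      BY_NAME.put(op.name(), op);
    }
  }

  /** Returns the matching operation, or null if the name is not one. */
  public static Op fromName(String name) {
    return BY_NAME.get(name); // no exception for non-operation tags
  }
}
```

A caller can then test `fromName(name) != null` instead of wrapping the conversion in a try/catch.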
It nicely maps attributes to tags, but we unnecessarily bloat every single put with the operation. We could get creative and use a different column prefix for min and max columns; then the coprocessor could pick that up during read/flush/compaction. But that makes queries (and filters) much harder, so for now we're probably stuck with tagging each value. Perhaps that is not so bad for min and max, given that after a flush and compaction we store only one value. For SUM we will always have an aggregation dimension, so adding a SUM tag isn't needed; we assume an aggregation dimension without an agg operation defaults to SUM. We certainly do need to tag values with SUM_FINAL.

Aside from that, in FlowRunCoprocessor.prePut do we have to keep doing Tag.fromList(tags) for each cell, or can we create a tag list/array once and re-use it?

When reading through the coprocessor, it still has Application_ID-specific code. There is no need to enumerate all the AGG dimensions; there will only be one per cell. The agg dimension itself is an attribute and has a value. In some cases it happens to be APP_ID, but in other cases it is something else; that doesn't matter to the coprocessor.

In FlowScanner.getAggregationCompactionDimension, does the value have to be cast to a String? Couldn't it simply be a byte[] with whatever content it has? In nextInternal there is no need to do getAggregationCompactionDimension unless the operation is a SUM (or SUM_FINAL). I see MIN, MAX, and SUM handled, but SUM_FINAL is ignored; here SUM_FINAL should be treated as SUM. addedCnt is increased only for regular cells; I think it should be increased once per collapsed cell as well, in the case when we haven't seen it yet.

The behavior should probably differ between a read and a flush/compact, and I can't quite see the coprocessor handle this difference correctly yet; I think it should be part of the constructor. For reads we should return exactly one value: the final sum.
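A minimal sketch of that read-path collapse, with plain longs standing in for cell values; the Op enum and collapse method are illustrative names under my assumptions, not the patch's API. Note that SUM_FINAL is summed exactly like SUM:

```java
import java.util.List;

// Sketch of the read-path collapse: all cell versions of a column are
// reduced to exactly one value. SUM and SUM_FINAL are both summed;
// MIN/MAX keep a single extreme. Names here are illustrative only.
public class ReadCollapse {
  public enum Op { SUM, SUM_FINAL, MIN, MAX }

  /** Collapse all versions of one column into a single value. */
  public static long collapse(Op op, List<Long> cellValues) {
    long result = cellValues.get(0);
    for (int i = 1; i < cellValues.size(); i++) {
      long v = cellValues.get(i);
      switch (op) {
        case MIN: result = Math.min(result, v); break;
        case MAX: result = Math.max(result, v); break;
        default:  result += v; break; // SUM and SUM_FINAL
      }
    }
    return result;
  }
}
```

In this sketch a flush/compaction path would behave differently: it would keep the running sum plus any cells that cannot yet be collapsed, rather than emitting a single value.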
For flushes we need to return the previous sum, plus one individual value for each aggregation dimension (modulo ones that can be compacted). In processMetricCell you see the problem that numbers should be stored through Bytes.toBytes(long l) instead of GenericObjectMapper; that is where the tricky part of mixing longs and doubles comes in.

Also, a general comment: check your spacing. There are places where tabs are used instead of spaces.

Overall it looks really good, but I need to walk through the store and read cases with you and see how the flush/compact scanning works compared to the read. I think we're close, but let's review that together.

> Populate flow run data in the flow_run & flow activity tables
> -------------------------------------------------------------
>
> Key: YARN-3901
> URL: https://issues.apache.org/jira/browse/YARN-3901
> Project: Hadoop YARN
> Issue Type: Sub-task
> Components: timelineserver
> Reporter: Vrushali C
> Assignee: Vrushali C
> Attachments: YARN-3901-YARN-2928.1.patch, YARN-3901-YARN-2928.2.patch, YARN-3901-YARN-2928.3.patch, YARN-3901-YARN-2928.WIP.2.patch, YARN-3901-YARN-2928.WIP.patch
>
> As per the schema proposed in YARN-3815 in
> https://issues.apache.org/jira/secure/attachment/12743391/hbase-schema-proposal-for-aggregation.pdf
> filing jira to track creation and population of data in the flow run table.
> Some points that are being considered:
> - Stores per flow run information aggregated across applications, flow version
> - RM’s collector writes to on app creation and app completion
> - Per App collector writes to it for metric updates at a slower frequency than the metric updates to application table
> - primary key: cluster ! user ! flow ! flow run id
> - Only the latest version of flow-level aggregated metrics will be kept, even if the entity and application level keep a timeseries.
> - The running_apps column will be incremented on app creation, and decremented on app completion.
> - For min_start_time the RM writer will simply write a value with the tag for the applicationId. A coprocessor will return the min value of all written values.
> - Upon flush and compactions, the min value between all the cells of this column will be written to the cell without any tag (empty tag) and all the other cells will be discarded.
> - Ditto for the max_end_time, but then the max will be kept.
> - Tags are represented as #type:value. The type can be not set (0), or can indicate running (1) or complete (2). In those cases (for metrics) only complete app metrics are collapsed on compaction.
> - The m! values are aggregated (summed) upon read. Only when applications are completed (indicated by tag type 2) can the values be collapsed.
> - The application ids that have completed and been aggregated into the flow numbers are retained in a separate column for historical tracking: we don’t want to re-aggregate for those upon replay

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)