[ 
https://issues.apache.org/jira/browse/YARN-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14729910#comment-14729910
 ] 

Joep Rottinghuis commented on YARN-3901:
----------------------------------------

Not sure if [~gtCarrera9] is asking what happens if we have consolidated a 
"finished" app into the flow aggregate and then we get an older batched put 
from the same app (from an older AM that wasn't properly killed, for example). 
That is currently an open problem. We can't quite store all app IDs that have 
been collapsed into the flow sum, because that list could potentially get 
really long, but we could keep a list of the x most recently completed apps in 
a tag on the total sum, to radically reduce the likelihood that stale batched 
writes end up messing up the aggregation if there were to be a race condition 
(however rare that might be). I think we can add that guarding behavior in a 
separate jira and not further complicate the first cut (which might suffer 
from this rare race condition).
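To make the guard concrete, here is a minimal sketch of keeping only the last x completed app ids so a stale put can be detected (class and method names are hypothetical, not from the patch; the real thing would serialize this into a tag on the flow aggregate):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of the guard discussed above: track only the last N
// completed app ids, so a stale batched put from an already-collapsed app
// can be recognized and dropped without storing every app id ever seen.
class RecentlyCompletedApps {
  private final int maxTracked;
  private final Deque<String> recent = new ArrayDeque<>();

  RecentlyCompletedApps(int maxTracked) {
    this.maxTracked = maxTracked;
  }

  /** Record an app that has been collapsed into the flow sum. */
  void markCompleted(String appId) {
    if (recent.size() == maxTracked) {
      recent.removeFirst(); // drop the oldest so the list stays bounded
    }
    recent.addLast(appId);
  }

  /** True if a write from this app should be rejected as stale. */
  boolean isAlreadyAggregated(String appId) {
    return recent.contains(appId);
  }
}
```

The bound is what keeps the tag small; the trade-off is that a put from an app older than the last x tracked ones would still slip through.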

As [~sjlee0] pointed out, and as we were discussing offline this morning, 
scan.setMaxResultSize(limit) doesn't limit the # of rows that are returned; it 
limits the result size in bytes. Not sure if we want to address that here, or 
if we'll let him adjust that in his patch.
We should limit the number of rows we retrieve from the scan and, if needed (as 
[~sjlee0] pointed out), add a PageFilter with a limit in addition to the 
client-side limit, to further restrict what is prefetched into a buffer (this 
gets more complicated when the scan spans region servers).
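To illustrate the last point in plain Java (this is not the HBase client API, just the shape of the logic): even with a server-side PageFilter, the client has to enforce the row count itself, because a scan spanning multiple region servers can return up to the page size per region:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Illustrative sketch: a PageFilter only caps what each region returns, so
// the client-side loop must still stop by row count. The Iterator stands in
// for a ResultScanner here.
class RowLimiter {
  static List<String> takeAtMost(Iterator<String> scanner, int limit) {
    List<String> rows = new ArrayList<>();
    while (scanner.hasNext() && rows.size() < limit) {
      rows.add(scanner.next()); // stop by row count, not by byte size
    }
    return rows;
  }
}
```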

It is a little confusing to read the patch and see where ColumnPrefix just 
gains an additional Attribute... attributes argument in the store method, and 
where the method with a String qualifier is changed to one with a byte[]. There 
seems to be a discrepancy in how EntityTablePrefix and ApplicationTablePrefix 
are handled. I'm not sure if it is needed to have a getColumnQualifier with a 
long in ColumnHelper, but we may have to review this together interactively 
behind two laptops.

In HBaseTimelineWriterImpl.onApplicationFinished you have an old comment:
281        // clear out the hashmap in case the downstream store/coprocessor
282        // added any more attributes during the put for END_TIME
that no longer makes sense. Also, I'd simply do:
{code}
storeFlowMetrics(rowKey, metrics, attribute1,
    AggregationOperations.SUM_FINAL.getAttribute());
{code}

I'd update the javadoc in AggregationOperations before the enum definitions. 
The descriptions are the old style and no longer make sense in the more 
generic case: SUM indicates that the values need to be summed up, MIN means 
that only the minimum needs to be kept, etc.

Initially I found the method name getIncomingAttributes and the corresponding 
member names somewhat confusing (from the method's perspective these aren't 
the incoming values, they're the outgoing values). Perhaps combinedAttribute 
and combineAttributes(...) make more sense, but the provided logic seems 
correct.

FlowActivityColumnPrefix.IN_PROGRESS_TIME needs a better javadoc description to 
describe its use and meaning.

The coprocessor methods need a little more javadoc to explain what is going on. 
To the casual reader this is total voodoo.
The preGetOp creates a new scanner (ok), then does a single next on it (why?) 
and then bypasses the environment (huh?).
Similarly, if we already call scan.setMaxVersions() in preScannerOpen, why is 
the same still needed in preGetOp, while in postScannerOpen we don't do it 
anymore (presumably because it was already done in the pre-hook)?

I like the more generic FlowRunCoprocessor (although it could have a name that 
is not associated with a table, because the behavior is generic, and arg names 
such as frpa are probably artifacts from a previous version).
In getTagFromAttribute, is it possible to recognize an operation versus an 
AggregationCompactionDimension without relying on throwing and catching an 
exception? For example, can you do AggregationOperation.isA(Attribute a) or 
something like that?
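Something like the following sketch would allow membership tests without a try/catch (names are illustrative; the patch's enum may differ, and the real lookup would take the attribute rather than a String):

```java
// Sketch of an exception-free lookup: return null for non-members instead of
// throwing IllegalArgumentException like Enum.valueOf(...) does, so callers
// can test "is this an aggregation operation?" without a catch block.
enum AggregationOperation {
  SUM, SUM_FINAL, MIN, MAX;

  /** Returns the matching operation, or null if the name is not one. */
  static AggregationOperation getInstance(String attributeName) {
    for (AggregationOperation op : values()) {
      if (op.name().equals(attributeName)) {
        return op;
      }
    }
    return null;
  }

  /** True if the name denotes an aggregation operation. */
  static boolean isA(String attributeName) {
    return getInstance(attributeName) != null;
  }
}
```

With this, getTagFromAttribute could check isA(...) first and fall through to the AggregationCompactionDimension path otherwise.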

The other thing I realize about the coprocessor is this. It nicely maps 
attributes to tags, but we unnecessarily bloat every single put with the 
operation.
We could get creative and use a different column prefix for min and max 
columns. Then the coprocessor can pick that up during read/flush/compaction. 
That makes queries (and filters) much harder. So for now we're probably stuck 
with tagging each value. Perhaps not so bad for min and max given that after 
flush and compact we store only one value.

For SUM we will always have an Aggregation dimension, so adding a SUM tag then 
isn't needed. We assume an aggregation dimension w/o agg operation would 
default to SUM. We do certainly need to tag values with SUM_FINAL.

Aside from that, in FlowRunCoprocessor.prePut, do we have to keep doing 
Tag.fromList(tags) for each cell, or can we create the Tag list/array once and 
re-use it?

When reading through the coprocessor it still has Application_ID-specific 
code. There is no need to enumerate all the AGG dimensions; there will only be 
one per cell. So the agg dimension itself is an attribute, and it has a value. 
In some cases that happens to be APP_ID, but in other cases it is something 
else. That doesn't matter to the coprocessor.
In FlowScanner.getAggregationCompactionDimension, it doesn't have to be cast 
to String, does it? Couldn't it simply be a byte[] with whatever content it 
has?

In nextInternal there is no need to do getAggregationCompactionDimension 
unless it is a SUM (or SUM_FINAL). I see MIN, MAX, and SUM handled, but 
SUM_FINAL is ignored; here SUM_FINAL should be treated as SUM.
addedCnt is increased only for regular cells. I think it should also be 
increased once per collapsed cell, in the case where we haven't seen this one 
yet.
The behavior should probably differ between a read and a flush/compact.

I can't quite see the coprocessor handle this difference correctly yet. I think 
it should be part of the constructor. For reads we should return exactly one 
value: the final sum. For flushes we need to return the previous sum, plus one 
individual value for each aggregation dimension (modulo ones that can be 
compacted).
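As a sketch of what I mean (names hypothetical, stripped of all HBase plumbing), the scanner could take its purpose at construction time and branch on it:

```java
// Sketch only: pass the scanner's purpose into the constructor so a read
// returns exactly one value (the final sum) while a flush/compact keeps the
// previous sum plus one cell per aggregation dimension.
class FlowScannerSketch {
  enum Mode { READ, FLUSH_OR_COMPACT }

  private final Mode mode;

  FlowScannerSketch(Mode mode) {
    this.mode = mode;
  }

  /** Flushes must emit per-dimension cells; reads collapse to one value. */
  boolean emitPerDimensionCells() {
    return mode == Mode.FLUSH_OR_COMPACT;
  }
}
```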

In processMetricCell you can see the problem: numbers should be stored through 
Bytes.toBytes(long l) instead of through GenericObjectMapper. That is where 
the tricky part of mixing longs and doubles comes in.
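A small illustration of why the fixed encoding matters to the coprocessor (plain Java mimicking org.apache.hadoop.hbase.util.Bytes, not the actual HBase class):

```java
import java.nio.ByteBuffer;

// The coprocessor can only sum cell values that share one binary layout.
// Bytes.toBytes(long) gives a fixed 8-byte big-endian value; a
// GenericObjectMapper-written value is type-prefixed and variable-length,
// so decoding it as a raw long would produce garbage.
class MetricEncoding {
  static byte[] toBytes(long v) {
    return ByteBuffer.allocate(8).putLong(v).array(); // 8-byte big-endian
  }

  static long toLong(byte[] b) {
    return ByteBuffer.wrap(b).getLong();
  }
}
```

Doubles would need the same treatment via Double.doubleToLongBits, which is exactly where mixing the two types in one column gets tricky.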

Also, a general comment: check your spacing. There are places where tabs are 
used instead of spaces.

Overall it looks really good, but I need to walk through the store and read 
cases with you and see how the flush/compact scanning works compared to the 
read. I think we're close, but let's review that together.




> Populate flow run data in the flow_run & flow activity tables
> -------------------------------------------------------------
>
>                 Key: YARN-3901
>                 URL: https://issues.apache.org/jira/browse/YARN-3901
>             Project: Hadoop YARN
>          Issue Type: Sub-task
>          Components: timelineserver
>            Reporter: Vrushali C
>            Assignee: Vrushali C
>         Attachments: YARN-3901-YARN-2928.1.patch, 
> YARN-3901-YARN-2928.2.patch, YARN-3901-YARN-2928.3.patch, 
> YARN-3901-YARN-2928.WIP.2.patch, YARN-3901-YARN-2928.WIP.patch
>
>
> As per the schema proposed in YARN-3815 in 
> https://issues.apache.org/jira/secure/attachment/12743391/hbase-schema-proposal-for-aggregation.pdf
> filing jira to track creation and population of data in the flow run table. 
> Some points that are being  considered:
> - Stores per flow run information aggregated across applications, flow version
> RM’s collector writes to on app creation and app completion
> - Per App collector writes to it for metric updates at a slower frequency 
> than the metric updates to application table
> primary key: cluster ! user ! flow ! flow run id
> - Only the latest version of flow-level aggregated metrics will be kept, even 
> if the entity and application level keep a timeseries.
> - The running_apps column will be incremented on app creation, and 
> decremented on app completion.
> - For min_start_time the RM writer will simply write a value with the tag for 
> the applicationId. A coprocessor will return the min value of all written 
> values.
> - Upon flush and compactions, the min value between all the cells of this 
> column will be written to the cell without any tag (empty tag) and all the 
> other cells will be discarded.
> - Ditto for the max_end_time, but then the max will be kept.
> - Tags are represented as #type:value. The type can be not set (0), or can 
> indicate running (1) or complete (2). In those cases (for metrics) only 
> complete app metrics are collapsed on compaction.
> - The m! values are aggregated (summed) upon read. Only when applications are 
> completed (indicated by tag type 2) can the values be collapsed.
> - The application ids that have completed and been aggregated into the flow 
> numbers are retained in a separate column for historical tracking: we don’t 
> want to re-aggregate for those upon replay
> 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
