[ https://issues.apache.org/jira/browse/APEXMALHAR-2130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15447840#comment-15447840 ]

Chandni Singh edited comment on APEXMALHAR-2130 at 8/30/16 3:04 AM:
--------------------------------------------------------------------

Note: The main change to ManagedState required here is that time buckets (the 
Window time in your example) are now computed outside ManagedState. Previously, 
TimeBucketAssigner computed the time buckets inside ManagedState; now they will 
be passed in to it.
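To make the distinction concrete, here is a minimal sketch, assuming times and bucket spans in milliseconds. `indexBucket` and `windowStartBucket` are hypothetical helpers, not Malhar methods: the first derives a sequential index relative to a start time, while the second keys the bucket by the event window's start timestamp, which is the kind of value the caller would hand to ManagedState once the bucket is computed outside it.

```java
/**
 * Hypothetical illustration (not the Malhar API) of two ways to derive a
 * time-bucket key from an event timestamp.
 */
class BucketKeyDemo
{
  /** Sequential index relative to a start time: 0, 1, 2, ... */
  static long indexBucket(long eventTimeMillis, long startMillis, long spanMillis)
  {
    // floorDiv keeps events before startMillis in negative buckets
    return Math.floorDiv(eventTimeMillis - startMillis, spanMillis);
  }

  /** Key equal to the start timestamp of the event window containing the event. */
  static long windowStartBucket(long eventTimeMillis, long windowSizeMillis)
  {
    return eventTimeMillis - Math.floorMod(eventTimeMillis, windowSizeMillis);
  }
}
```

Either key depends only on the event's own timestamp, so events arriving out of order still land in the right bucket.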

>>>
Since event time, unlike processing time, is arbitrary, the actual key 
representing the timebucket cannot be assumed to follow a natural sequence. 
However, TimeBucketAssigner.getTimeBucketAndAdjustBoundaries seems to return a 
long that is sequential starting from 0. We want the actual timebucket key to be 
based on the actual event window timestamp. Chandni Singh, will this break 
anything?

Answer: No, it will not break anything. The time here is event time, and this 
method does NOT assume that events are received in order; it creates the 
timebucket based on event time. In your use case, the time bucket is computed 
outside ManagedState, so there are 2 ways to approach it:
 - create a special TimeBucketAssigner that simply returns the input Window 
for the event, without computing a further timebucket.
 - make TimeBucketAssigner an optional property in AbstractManagedStateImpl. If 
it is null, the time argument is used as the timebucket saved in the Bucket.
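The two approaches can be sketched as follows. `BucketAssigner`, `PassThroughBucketAssigner` and `SimpleManagedState` are hypothetical, simplified stand-ins for TimeBucketAssigner and AbstractManagedStateImpl; the real classes have different signatures and also handle boundaries, expiry and purging.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified stand-in for TimeBucketAssigner. */
interface BucketAssigner
{
  long getTimeBucket(long time);
}

/** Approach 1: an assigner that returns the input Window time unchanged. */
class PassThroughBucketAssigner implements BucketAssigner
{
  @Override
  public long getTimeBucket(long time)
  {
    // The caller already passes the event window time; no further bucketing.
    return time;
  }
}

/** Approach 2: the assigner is optional; null means "use the time as the bucket". */
class SimpleManagedState
{
  private final BucketAssigner assigner; // may be null
  private final Map<Long, Map<String, String>> buckets = new HashMap<>();

  SimpleManagedState(BucketAssigner assigner)
  {
    this.assigner = assigner;
  }

  /** Stores the value and returns the time bucket it was saved under. */
  long put(long time, String key, String value)
  {
    long timeBucket = (assigner == null) ? time : assigner.getTimeBucket(time);
    buckets.computeIfAbsent(timeBucket, b -> new HashMap<>()).put(key, value);
    return timeBucket;
  }

  String get(long timeBucket, String key)
  {
    Map<String, String> bucket = buckets.get(timeBucket);
    return bucket == null ? null : bucket.get(key);
  }
}
```

In both variants the bucket key is exactly the time passed in, so buckets are keyed by event window timestamp rather than by a sequential index.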

>>>
Expiring and purging are done very differently and should be based on 3. 
Managed State should determine whether to purge a timebucket based on whether 
an Apex window is committed and whether all event windows that belong to that 
timebucket are marked "deleted" for that Apex window.

Answer: This is again handled by TimeBucketAssigner, and I don't think much 
change is needed here. TimeBucketAssigner computes a timeBucket (in your case, 
this corresponds to the Window time) and checks whether the oldest buckets need 
to be purged (lines 132-133). It determines the lowest purgeable timebucket. In 
endWindow, it informs IncrementalCheckpointManager that it can delete all 
timebuckets <= lowestPurgeableTimeBucket. However, IncrementalCheckpointManager 
deletes the data up to that timebucket only when the window in which the purge 
was requested gets committed. So this will remain the same for you as 
well.
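The deferred deletion described above can be sketched as below, assuming one lowest purgeable time bucket per Apex window. `DeferredPurgeStore`, `endWindow` and `committed` are hypothetical names, not the IncrementalCheckpointManager API, and bucket contents are reduced to an entry count.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical sketch of deferred purging: a purge requested in an Apex
 * window is applied only after that window is committed.
 */
class DeferredPurgeStore
{
  // time bucket -> data held in that bucket (here just an entry count)
  final TreeMap<Long, Integer> bucketData = new TreeMap<>();
  // Apex window id -> lowest purgeable time bucket requested in that window
  private final TreeMap<Long, Long> pendingPurges = new TreeMap<>();

  /** Called at the end of an Apex window: record the request, delete nothing yet. */
  void endWindow(long windowId, long lowestPurgeableTimeBucket)
  {
    pendingPurges.put(windowId, lowestPurgeableTimeBucket);
  }

  /** Called when an Apex window is committed. */
  void committed(long committedWindowId)
  {
    // Requests made in any window <= committedWindowId are now safe to apply.
    Map<Long, Long> ready = pendingPurges.headMap(committedWindowId, true);
    for (long purgeUpTo : ready.values()) {
      // Delete all time buckets <= purgeUpTo (clearing the view updates bucketData).
      bucketData.headMap(purgeUpTo, true).clear();
    }
    ready.clear(); // drop the applied requests from pendingPurges
  }
}
```

Keeping deletion behind the commit means a recovery to any uncommitted window can still find the buckets it needs.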

I think this can also be achieved by creating a special TimeBucketAssigner and 
overriding a few methods.



was (Author: csingh):

> implement scalable windowed storage
> -----------------------------------
>
>                 Key: APEXMALHAR-2130
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2130
>             Project: Apache Apex Malhar
>          Issue Type: Task
>            Reporter: bright chen
>            Assignee: David Yan
>
> This feature is used for supporting windowing.
> The storage needs to have the following features:
> 1. Spillable key value storage (integrate with APEXMALHAR-2026)
> 2. Upon checkpoint, it saves a snapshot of the entire data set with the 
> checkpointing window id. This should be done incrementally (ManagedState) to 
> avoid wasting space on unchanged data
> 3. When recovering, it takes the recovery window id and restores to that 
> snapshot
> 4. When a window is committed, all windows with a lower ID should be purged 
> from the store.
> 5. It should implement the WindowedStorage and WindowedKeyedStorage 
> interfaces, and because of 2 and 3, we may want to add methods to the 
> WindowedStorage interface so that the implementation of WindowedOperator can 
> notify the storage of checkpointing, recovering and committing of a window.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
