I think you can implement what you want already by not using input-events and 
just having the coordinator trigger on its configured frequency.

I have a coordinator that uses an output dataset and output event to resolve a 
specific instance. The use case there is a nightly sqoop pull from our OLTP DB, 
but it applies here as well: you need a path and it may not exist. Define an 
output dataset with the same frequency as your coordinator, and pass the path 
in via an output event. Your workflow will have to gracefully handle the 
non-existence of the path in this case, but at least your coordinator will 
always run on its interval.


        <datasets>
                <dataset name="SqoopOut" frequency="${coord:days(1)}"
                         initial-instance="2012-12-01T07:55Z" timezone="America/Los_Angeles">
                        <uri-template>${nameNode}/SqoopOutput/Data${YEAR}${MONTH}${DAY}</uri-template>
                </dataset>
        </datasets>
        <output-events>
                <data-out name="SqoopOutPath" dataset="SqoopOut">
                        <instance>${coord:current(-1)}</instance>
                </data-out>
        </output-events>

Later, in the workflow configuration of the coordinator action:
        <property>
                <name>SqoopPath</name>
                <value>${coord:dataOut('SqoopOutPath')}</value>
        </property>
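
To gracefully handle the path not existing, one option is a decision node at 
the top of the workflow that checks the resolved path with the fs:exists() EL 
function and skips straight to the end when nothing is there. A rough sketch - 
the node names are just placeholders, not from my actual workflow:

        <start to="check-input"/>
        <decision name="check-input">
                <switch>
                        <!-- SqoopPath is the property passed in from the coordinator above -->
                        <case to="process-data">${fs:exists(wf:conf('SqoopPath'))}</case>
                        <default to="end"/>
                </switch>
        </decision>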

I also use flume and am working on a post-processing coordinator for various 
log flows. However in my case we do not expect any gaps in the data and want to 
catch cases where the data has backed up due to an upstream agent failure. In 
this case I put a dependency on the current(0) instance for the input-event so 
that the coordinator waits until the previous interval has finished writing. 
The actual workflow only takes a formatted date string to use in a Hive query 
and resolves from coord:nominalTime(-1), giving me the previous instance date. 
However this relies on flume delivering events in the order that they were 
received to guarantee no data is missed, an assumption that may not hold 
depending on your architecture.
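
Roughly, the relevant pieces look like the following (simplified, with 
illustrative dataset and property names, and assuming a daily interval just for 
the example; I'm spelling the previous-instance date out here with 
coord:dateOffset and coord:formatTime):

        <input-events>
                <data-in name="LogsReady" dataset="LogFlow">
                        <!-- depend on the current interval so the previous one is fully written -->
                        <instance>${coord:current(0)}</instance>
                </data-in>
        </input-events>

and in the workflow configuration of the coordinator action:

        <property>
                <name>hiveDate</name>
                <!-- previous interval's date, formatted for the Hive query -->
                <value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, 'DAY'), 'yyyy-MM-dd')}</value>
        </property>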

Hope that helps,
Paul Chavez



-----Original Message-----
From: lance [mailto:[email protected]] 
Sent: Wednesday, May 08, 2013 5:19 PM
To: [email protected]; Mohammad Islam
Subject: Re: Gaps in dataset prevents dependency


I think the generalization is to separate the trigger and the dataset.  
I.e. the trigger can be just how it is today - data files, existence of 
directories - but the dataset could be defined separately as all data within a 
range (or however it wants to be defined). In that case there is no strict 
fulfillment, just whatever data currently exists within the range.
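
For instance, something along these lines (purely hypothetical syntax, nothing 
Oozie supports today, just to illustrate the idea):

        <data-in name="input" dataset="logs" require="any">
                <!-- hypothetical: take whatever instances exist in the range -->
                <start-instance>${coord:current(-5)}</start-instance>
                <end-instance>${coord:current(0)}</end-instance>
        </data-in>

where "require" would mean the action materializes with whatever instances 
exist in the range instead of waiting for all of them.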


As far as the missing data producer creating a 0-length file - this is flume - 
so I would think that this would be a pretty standard use case. Even the CDH 
example that uses flume to pull twitter data into an oozie workflow says they 
had to do a hack in order to trigger, because there was no good way to do it in 
flume. Would like to see a better way of doing this.

I think that I have a typical use case where logs are getting collected, and 
batch loader jobs want to be executed fairly frequently (say every 5 minutes). 
However, a hiccup in the network, or slow times at night, might leave gaps - or 
even at startup time there may be logs that start somewhere between the cutoffs.




On May 8, 2013, at 5:09 PM, Mohammad Islam <[email protected]> wrote:

> Hi,
> Oozie currently supports data-triggers only where ALL dependent datasets are 
> available. There is no way of specifying that the action should run if just M 
> out of N are available.
> 
> This could be a new feature. In that case, can you define a generalized way 
> of defining this?
> 
> As a possible work around, missing data producer could create a file with 
> length 0.
> 
> Regards,
> Mohammad
> 
> ________________________________
> From: lance <[email protected]>
> To: [email protected]
> Sent: Wednesday, May 8, 2013 4:20 PM
> Subject: Gaps in dataset prevents dependency
> 
> 
> Would like a more verbose way to define the input-event - and the data-in.
> 
> I'm trying to find ways to handle the case where a start/end instance isn't 
> satisfied, even though there is data to be processed.
> An example of this is when I'm parsing a set of 24 hours of logs, and there 
> may be an hour at night during which nothing is produced. This use case is 
> exacerbated when we are talking minutes and doing hourly rollups - but it's 
> the same scenario. 
> 
> Here is the example config:
> 
> The coordinator runs every 5 minutes:
> <coordinator-app name="cef-workflow-coordinator" 
> frequency="${coord:minutes(5)}" start="${jobStart}" end="${jobEnd}"
> 
> In this case the input dataset is produced in minutes:
>         <dataset name="logs" frequency="${coord:minutes(1)}"
>             initial-instance="${initialDataset}" timezone="America/Los_Angeles">
>             <uri-template>${rootPath}/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}</uri-template>
>             <done-flag></done-flag>
>         </dataset>
> 
> Following the twitter/flume CDH example, the actual indicator that the job 
> should be executed is a new directory being created in the next set of data:  
>     <input-events>
>         <data-in name="input" dataset="logs">
>             <start-instance>${coord:current((coord:tzOffset())-5)}</start-instance>
>             <end-instance>${coord:current(coord:tzOffset())}</end-instance>
>         <!--    <instance>${coord:current(coord:tzOffset())}</instance> -->
>         </data-in>
>         <data-in name="readyIndicator" dataset="logs">
>             <instance>${coord:current(1 + (coord:tzOffset()))}</instance>
>         </data-in>
>     </input-events>
> 
> What I'd like is for the trigger to be that a directory gets created that is 
> in the future relative to this dataset, and then to take whatever data is 
> available in the last 5 dataset instances (minutes).  
> 
> Instead, if there is a gap in the previous 5 minutes (say no logs came in 
> during minute t-3) then the dependency is never fulfilled.
> 
> 
> Thanks for the help
