I think you can implement what you want already by not using input-events and
just having the coordinator trigger on its configured frequency.
I have a coordinator that uses an output dataset and output event to resolve a
specific instance. My use case is a nightly Sqoop pull from our OLTP DB, but it
applies here as well: you need a path, and it may not exist. Define an output
dataset with the same frequency as your coordinator, and pass the path in via
an output event. Your workflow will have to gracefully handle the non-existence
of the path in this case, but at least your coordinator will always run on its
interval.
<datasets>
    <dataset name="SqoopOut" frequency="${coord:days(1)}"
             initial-instance="2012-12-01T07:55Z"
             timezone="America/Los_Angeles">
        <uri-template>${nameNode}/SqoopOutput/Data${YEAR}${MONTH}${DAY}</uri-template>
    </dataset>
</datasets>
<output-events>
    <data-out name="SqoopOutPath" dataset="SqoopOut">
        <instance>${coord:current(-1)}</instance>
    </data-out>
</output-events>
Later in the workflow node:
<property>
<name>SqoopPath</name>
<value>${coord:dataOut('SqoopOutPath')}</value>
</property>
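If it helps, one way to do the graceful handling is a decision node in the
workflow that checks the resolved path before the real action runs. This is
just a sketch (the node names are made up; fs:exists() and wf:conf() are
standard workflow EL functions):

<decision name="check-sqoop-path">
    <switch>
        <!-- only run the processing action if the resolved path exists -->
        <case to="process-data">${fs:exists(wf:conf('SqoopPath'))}</case>
        <default to="end"/>
    </switch>
</decision>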
I also use Flume and am working on a post-processing coordinator for various
log flows. However, in my case we do not expect any gaps in the data and want
to catch cases where the data has backed up due to an upstream agent failure.
Here I put a dependency on the current(0) instance for the input-event, so that
the coordinator waits until the previous interval has finished writing. The
actual workflow only takes a formatted date string to use in a Hive query,
which I resolve from the nominal time offset by one instance
(coord:nominalTime(-1)), giving me the previous instance date. However, this
relies on Flume delivering events in the order they were received to guarantee
no data is missed, an assumption that may not hold depending on your
architecture.
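For illustration, a minimal sketch of that part (the dataset name, property
name and daily offset unit are made up for the example; coord:dateOffset() and
coord:formatTime() are the standard coordinator EL functions for that kind of
date math):

<input-events>
    <data-in name="FlumeIn" dataset="FlumeLogs">
        <!-- block until the current instance exists, i.e. the previous
             interval has finished writing -->
        <instance>${coord:current(0)}</instance>
    </data-in>
</input-events>

and in the workflow action:

<property>
    <name>runDate</name>
    <!-- previous instance's date, formatted for the Hive query -->
    <value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, 'DAY'), 'yyyy-MM-dd')}</value>
</property>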
Hope that helps,
Paul Chavez
-----Original Message-----
From: lance [mailto:[email protected]]
Sent: Wednesday, May 08, 2013 5:19 PM
To: [email protected]; Mohammad Islam
Subject: Re: Gaps in dataset prevents dependency
I think the generalization is to separate the trigger from the dataset.
I.e. the trigger can be just how it is today - data files, existence of
directories - but the dataset could be defined separately as all data within a
range (or however it wants to be defined). In that case there is no strict
fulfillment, just whatever data currently exists in the range, for instance. A
rough sketch of what I mean is below.
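Purely to illustrate the idea - this is not existing Oozie syntax, just a
hypothetical shape for such a feature:

<input-events>
    <data-in name="input" dataset="logs">
        <start-instance>${coord:current(-5)}</start-instance>
        <end-instance>${coord:current(0)}</end-instance>
        <!-- hypothetical element: resolve whichever instances in the range
             actually exist instead of waiting for all of them -->
        <min-available>0</min-available>
    </data-in>
</input-events>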
As far as having the missing data producer create a zero-length file - the
producer here is Flume, so I would think this would be a pretty standard use
case. Even the CDH example using Twitter data uses Flume to pull tweets into an
Oozie workflow, yet it says they had to do a hack in order to trigger because
there was no good way to do it in Flume. I would like to see a better way of
doing this.
I think I have a typical use case: logs are being collected, and batch-loader
jobs need to run fairly frequently (say every 5 minutes). However, a hiccup in
the network, or slow times at night, might leave gaps - or even at startup
there may be logs that start somewhere between the cutoffs.
On May 8, 2013, at 5:09 PM, Mohammad Islam <[email protected]> wrote:
> Hi,
> Oozie currently supports data triggers only when ALL dependent datasets are
> available. There is no way of specifying that the job should run if M out of N
> are available.
>
> This could be a new feature. In that case, can you propose a generalized way
> of defining it?
>
> As a possible workaround, the missing data producer could create a
> zero-length file.
>
> Regards,
> Mohammad
>
> ________________________________
> From: lance <[email protected]>
> To: [email protected]
> Sent: Wednesday, May 8, 2013 4:20 PM
> Subject: Gaps in dataset prevents dependency
>
>
> I would like a more expressive way to define the input-event and the data-in.
>
> I'm trying to find ways to handle the case where a start/end instance range
> isn't fully satisfied, even though there is data to be processed.
> An example of this is when I'm parsing a set of 24 hours of logs, and there
> may be an hour at night during which nothing was produced. The problem is
> exacerbated when we are talking minutes and doing hourly rollups - but it is
> the same scenario.
>
> Here is the example config:
>
> The coordinator runs every 5 minutes:
> <coordinator-app name="cef-workflow-coordinator"
> frequency="${coord:minutes(5)}" start="${jobStart}" end="${jobEnd}"
>
> In this case the input dataset is produced in minutes:
> <dataset name="logs" frequency="${coord:minutes(1)}"
> initial-instance="${initialDataset}"
> timezone="America/Los_Angeles">
>
> <uri-template>${rootPath}/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}</uri-template>
> <done-flag></done-flag>
> </dataset>
>
> Following the twitter/flume CDH example, the actual trigger indicating that
> the job should be executed is the creation of a new directory that belongs to
> the next set of data:
> <input-events>
> <data-in name="input" dataset="logs">
>
> <start-instance>${coord:current((coord:tzOffset())-5)}</start-instance>
> <end-instance>${coord:current(coord:tzOffset())}</end-instance>
> <!-- <instance>${coord:current(coord:tzOffset())}</instance> -->
> </data-in>
> <data-in name="readyIndicator" dataset="logs">
> <instance>${coord:current(1 + (coord:tzOffset()))}</instance>
> </data-in>
> </input-events>
>
> I would like the trigger to be that a directory in the future relative to this
> dataset got created, and then to take whatever is available in the last 5
> dataset instances (minutes).
>
> Instead, if there is a gap in the previous 5 minutes (say no logs came in
> during minute t-3), the dependency is never fulfilled.
>
>
> Thanks for the help