Hi Lance,

Your use-case is not natively supported in Oozie. Some users with a
similar architecture have formed a solution of a "dropbox" directory,
where all or some of the incoming data is dumped, and the Oozie
coordinator then checks this dropbox as an input-event.

--
Mona
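A minimal sketch of that dropbox pattern (the dataset name, frequency,
and paths here are illustrative assumptions, not from a specific
deployment):

  <datasets>
    <dataset name="dropbox" frequency="${coord:minutes(5)}"
             initial-instance="2013-01-01T00:00Z" timezone="UTC">
      <!-- one directory per 5-minute slot; producers dump whatever arrives here -->
      <uri-template>${nameNode}/incoming/dropbox/${YEAR}${MONTH}${DAY}${HOUR}${MINUTE}</uri-template>
      <!-- empty done-flag: the directory's existence alone satisfies the dependency -->
      <done-flag></done-flag>
    </dataset>
  </datasets>
  <input-events>
    <data-in name="dropboxReady" dataset="dropbox">
      <instance>${coord:current(0)}</instance>
    </data-in>
  </input-events>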
On 5/9/13 9:29 AM, "lance" <[email protected]> wrote:

>Thanks for the ideas Paul. I'll try some more experiments with just
>using the coordinator trigger. In the meantime I'm also going to see
>what else can be done with the non-existent paths. I think writing a
>Java or script action to take care of this is very doable, but also
>frustrating given the use case, i.e. it seems like Oozie should
>natively embrace this use case - I keep thinking I'm missing something
>here.
>
>On May 8, 2013, at 5:58 PM, Paul Chavez <[email protected]>
>wrote:
>
>> I think you can implement what you want already by not using
>> input-events and just having the coordinator trigger on its
>> configured frequency.
>>
>> I have a coordinator that uses an output dataset and output event to
>> resolve a specific instance. The use case here is a nightly sqoop
>> pull from our OLTP DB, but it applies here as well: you need a path
>> and it may not exist. Define an output dataset with the same
>> frequency as your coordinator, and pass the path in via an output
>> event. Your workflow will have to gracefully handle the non-existence
>> of the path in this case, but at least your coordinator will always
>> run on its interval.
>>
>> <datasets>
>>   <dataset name="SqoopOut" frequency="${coord:days(1)}"
>>            initial-instance="2012-12-01T07:55Z"
>>            timezone="America/Los_Angeles">
>>     <uri-template>${nameNode}/SqoopOutput/Data${YEAR}${MONTH}${DAY}</uri-template>
>>   </dataset>
>> </datasets>
>> <output-events>
>>   <data-out name="SqoopOutPath" dataset="SqoopOut">
>>     <instance>${coord:current(-1)}</instance>
>>   </data-out>
>> </output-events>
>>
>> Later, in the workflow node:
>>   <property>
>>     <name>SqoopPath</name>
>>     <value>${coord:dataOut('SqoopOutPath')}</value>
>>   </property>
>>
>> I also use flume and am working on a post-processing coordinator for
>> various log flows. However, in my case we do not expect any gaps in
>> the data and want to catch cases where the data has backed up due to
>> an upstream agent failure. In this case I put a dependency on the
>> current(0) instance for the input-event so that the coordinator waits
>> until the previous interval has finished writing. The actual workflow
>> only takes a formatted date string to use in a Hive query and
>> resolves it from coord:nominalTime(-1), giving me the previous
>> instance date. However, this relies on flume delivering events in the
>> order they were received to guarantee no data is missed, an
>> assumption that may not hold depending on your architecture.
>>
>> Hope that helps,
>> Paul Chavez
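A minimal sketch of the "gracefully handle the non-existence of the
path" step Paul describes, reusing the SqoopPath property from his
example (the node names here are assumptions; fs:exists is a standard
Oozie workflow EL function):

  <decision name="check-sqoop-path">
    <switch>
      <!-- route around the processing step when the path was never produced -->
      <case to="process-data">${fs:exists(SqoopPath)}</case>
      <default to="end"/>
    </switch>
  </decision>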
>> -----Original Message-----
>> From: lance [mailto:[email protected]]
>> Sent: Wednesday, May 08, 2013 5:19 PM
>> To: [email protected]; Mohammad Islam
>> Subject: Re: Gaps in dataset prevents dependency
>>
>> I think the generalization is to separate the trigger and the
>> dataset.
>>
>> I.e. the trigger can be just how it is today - data files, existence
>> of directories - but the dataset could be defined separately as all
>> data within a range (or however it wants to be defined). In this case
>> there is no strict fulfillment, just whatever data currently exists
>> within the range.
>>
>> As far as the missing data producer creating a 0-length file - this
>> is flume - so I would think that this would be a pretty standard use
>> case. Even the CDH example that pulls Twitter data into an oozie
>> workflow uses flume - yet it says they had to do a hack in order to
>> trigger because there was no good way in flume. Would like to see a
>> better way of doing this.
>>
>> I think that I have a typical use case where logs are getting
>> collected, and batch loader jobs want to be executed fairly
>> frequently (say every 5 minutes). However, a hiccup in the network,
>> or slow times at night, might leave gaps - or even at startup time
>> there may be logs that start somewhere between the cutoffs.
>>
>> On May 8, 2013, at 5:09 PM, Mohammad Islam <[email protected]> wrote:
>>
>>> Hi,
>>> Oozie currently supports data-triggers where ALL dependent datasets
>>> are available. There is no way of specifying that only M out of N
>>> need to be available.
>>>
>>> This could be a new feature. In that case, can you define a
>>> generalized way of specifying this?
>>>
>>> As a possible workaround, the missing data producer could create a
>>> file with length 0.
>>>
>>> Regards,
>>> Mohammad
>>>
>>> ________________________________
>>> From: lance <[email protected]>
>>> To: [email protected]
>>> Sent: Wednesday, May 8, 2013 4:20 PM
>>> Subject: Gaps in dataset prevents dependency
>>>
>>> Would like a more verbose way to define the input-event - and the
>>> data-in.
>>>
>>> I'm trying to find ways to handle the case where a start/end
>>> instance isn't satisfied, even though there is data to be processed.
>>> An example of this is when I'm parsing a set of 24 hours of logs,
>>> and there may be an hour at night that doesn't have anything
>>> produced. This use case is exacerbated when we are talking minutes
>>> and doing hourly rollups - but it's the same scenario.
>>>
>>> Here is the example config:
>>>
>>> The coordinator runs every 5 minutes:
>>> <coordinator-app name="cef-workflow-coordinator"
>>>                  frequency="${coord:minutes(5)}"
>>>                  start="${jobStart}" end="${jobEnd}">
>>>
>>> In this case the input dataset is produced in minutes:
>>> <dataset name="logs" frequency="${coord:minutes(1)}"
>>>          initial-instance="${initialDataset}"
>>>          timezone="America/Los_Angeles">
>>>   <uri-template>${rootPath}/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}</uri-template>
>>>   <done-flag></done-flag>
>>> </dataset>
>>>
>>> Using the example from the twitter/flume CDH example, the actual
>>> indicator that the job should be executed is when a new directory is
>>> created that is in the next set of data:
>>> <input-events>
>>>   <data-in name="input" dataset="logs">
>>>     <start-instance>${coord:current((coord:tzOffset())-5)}</start-instance>
>>>     <end-instance>${coord:current(coord:tzOffset())}</end-instance>
>>>     <!-- <instance>${coord:current(coord:tzOffset())}</instance> -->
>>>   </data-in>
>>>   <data-in name="readyIndicator" dataset="logs">
>>>     <instance>${coord:current(1 + (coord:tzOffset()))}</instance>
>>>   </data-in>
>>> </input-events>
>>>
>>> What I'd like is for a directory created in the future of this
>>> dataset to act as the trigger, and then to take whatever is
>>> available in the last 5 dataset instances (minutes).
>>>
>>> Instead, if there is a gap in the previous 5 minutes (say no logs
>>> came in during minute t-3), then the dependency is never fulfilled.
>>>
>>> Thanks for the help
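A sketch of Mohammad's zero-length-file workaround as an Oozie fs
action, for the case where the gap-filling itself runs under Oozie (the
node name and path are assumptions; touchz requires a reasonably recent
fs-action schema). Note that with an empty <done-flag>, as in the
config above, a bare <mkdir> of the missing minute directory would
satisfy the dependency just as well:

  <action name="fill-gap">
    <fs>
      <!-- zero-length marker so the otherwise-empty instance resolves -->
      <touchz path="${rootPath}/2013/05/08/03/17/_EMPTY"/>
    </fs>
    <ok to="end"/>
    <error to="fail"/>
  </action>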

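And a sketch of the formatted-date-string approach Paul mentions, where
the coordinator passes a date rather than a path into the workflow (the
property name is an assumption; coord:formatTime and coord:dateOffset
are standard coordinator EL functions):

  <property>
    <name>processDate</name>
    <!-- the previous day's nominal time, formatted for use in a Hive query -->
    <value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, 'DAY'), 'yyyy-MM-dd')}</value>
  </property>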