Hello Eric,

As Mohammad mentioned, we will soon have a release that incorporates a similar pub-sub mechanism through Apache HCatalog integration. HCatalog is a metadata server that publishes events when data partitions become available. The Oozie server keeps track of which partitions are available and which are not yet, so that it knows when to stop waiting and trigger the workflow.
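
To make that bookkeeping concrete, here is a minimal sketch of the idea. It is only an illustration under assumptions: `PartitionWaiter`, `on_partition_added` and the partition names are hypothetical and are not Oozie or HCatalog APIs.

```python
class PartitionWaiter:
    """Hypothetical helper: remembers which required partitions have been announced."""

    def __init__(self, required):
        self.missing = set(required)   # partitions not yet announced
        self.available = set()         # partitions announced so far

    def on_partition_added(self, partition):
        """Record one 'partition available' notification.

        Returns True once every required partition has arrived,
        i.e. the point at which the workflow could be triggered.
        """
        if partition in self.missing:
            self.missing.remove(partition)
            self.available.add(partition)
        return not self.missing


# Made-up partition names, for illustration only.
waiter = PartitionWaiter(["dt=2013-05-13/hr=00", "dt=2013-05-13/hr=01"])
first = waiter.on_partition_added("dt=2013-05-13/hr=00")   # still waiting
second = waiter.on_partition_added("dt=2013-05-13/hr=01")  # everything has arrived
```

The waiter only fires when nothing is missing, which matches the all-or-nothing behaviour described elsewhere in this thread.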
But I am interested in learning more about the 'complex' events and the stateless subscribers you mentioned. If I understand correctly, does a complex event mean a sort of nested event structure, where the workflow should start even if the data is only partially available, i.e. some intermediate state has been reached? The Oozie state machine will not do that unless there is an agreement with the producer of the data as to what constitutes a 'data available' notification. HCatalog publishes a notification only once the data partitions have been written completely and atomically. Also, can you elaborate a bit more on the stateless subscriber?

Thanks,
Mona

On 5/13/13 3:48 PM, "Mohammad Islam" <[email protected]> wrote:

>Hi Eric,
>Thanks for your feedback.
>
>Oozie has a similar (not the same) feature utilizing HCatalog and JMS
>messages. This feature is about to be released.
>
>https://issues.apache.org/jira/browse/OOZIE-561
>
>Regards,
>Mohammad
>
>
>________________________________
> From: Eric Sammer <[email protected]>
>To: lance <[email protected]>
>Cc: [email protected]; Mohammad Islam <[email protected]>;
>"[email protected]" <[email protected]>
>Sent: Monday, May 13, 2013 3:27 PM
>Subject: Re: Gaps in dataset prevents dependency
>
>Generally, I'm pretty convinced watching a single directory doesn't fly
>for complex workflow initiation. Instead, pubsub messaging has always
>worked as a better solution for me. In other words, decouple the bit that
>controls the workflow state machine from the physical bits doing the work
>by publishing / subscribing to events. The events themselves are
>low-frequency, high value: Flume rolled a file, an hdfs dfs -put
>completed, a directory was created, Sqoop completed. Each of these events
>is published and a system (i.e. Oozie) relies on these events to step
>through the state machine. Then, have a separate process that is a
>dedicated scheduler that simply emits pulses as events.
>Extending this system is easy because you simply have other processes
>generate (or consume) events. This is how most of the systems I've built
>in the past worked, and it works exceedingly well.
>
>Bonus round: One can now handle complex events by specifying a stateful
>listener. I built something like this at Experian when I worked there and
>we did stuff like:
>
>onEvent(Context c, Event e) {
>  int filesWaiting = c.get("state.file-arrived-count");
>  if (filesWaiting >= 3) {
>    // generate a complex event (the result of stateful stuff)
>  } else if ...
>}
>
>We had a DSL that greatly simplified this, but you get the idea. The
>entire system was just message passing. The "workflow engine" was simply
>the state machine tracking with subscribers that do the hard work.
>Subscribers can also be stateless and form the basis of scale out,
>distributed workers (that can run on systems other than Hadoop). One
>subscriber would be Oozie's MR job firing, for example. I've been hard
>pressed to find an architecture that works better for distributed
>coordination and workflow state.
>
>Anyway, my almost entirely unsolicited $0.02USD (subject to inflation and
>exchange rates).
>
>
>On Thu, May 9, 2013 at 11:55 AM, lance <[email protected]> wrote:
>
>>Makes sense - just seems like a common use case. I saw another thread
>>here:
>>http://mail-archives.apache.org/mod_mbox/oozie-user/201304.mbox/%3CCA%2B97RN0Q7NH0iO7-7ntFM1ZYUP4OpgdO6udiB2C7Nsp5OjnHEQ%40mail.gmail.com%3E
>>
>>Same issue -
>>
>>Seems like there should be native support for this use case?
>>
>>On May 9, 2013, at 11:39 AM, Mona Chitnis <[email protected]> wrote:
>>
>>>Hi Lance,
>>>
>>>Your use-case is not natively supported in Oozie. Some users with a
>>>similar architecture have settled on a "dropbox" directory where all
>>>or some of the incoming data is dumped; the Oozie coordinator then
>>>checks for this dropbox as its input-event.
>>>
>>>--
>>>Mona
>>>
>>>
>>>On 5/9/13 9:29 AM, "lance" <[email protected]> wrote:
>>>
>>>>Thanks for the ideas Paul. I'll try some more experiments with just
>>>>using the coordinator trigger. In the meantime I'm also going to see
>>>>what else can be done with the non-existent paths. I think writing a
>>>>java or script action to take care of this is very doable, but also
>>>>frustrating given the use case, i.e. it seems like oozie should
>>>>natively embrace this use case - I keep thinking I'm missing
>>>>something here.
>>>>
>>>>
>>>>On May 8, 2013, at 5:58 PM, Paul Chavez <[email protected]> wrote:
>>>>
>>>>>I think you can implement what you want already by not using
>>>>>output-events and just having a coordinator trigger on its configured
>>>>>frequency.
>>>>>
>>>>>I have a coordinator that uses an output dataset and output event to
>>>>>resolve a specific instance. The use case here is a nightly sqoop pull
>>>>>from our OLTP DB, but it applies here as well: you need a path and it
>>>>>may not exist. Define an output dataset with the same frequency as
>>>>>your coordinator, and pass the path in via an output event. Your
>>>>>workflow will have to gracefully handle the non-existence of the path
>>>>>in this case, but at least your coordinator will always run on its
>>>>>interval.
>>>>>
>>>>><datasets>
>>>>>  <dataset name="SqoopOut" frequency="${coord:days(1)}"
>>>>>           initial-instance="2012-12-01T07:55Z" timezone="America/Los_Angeles">
>>>>>    <uri-template>${nameNode}/SqoopOutput/Data${YEAR}${MONTH}${DAY}</uri-template>
>>>>>  </dataset>
>>>>></datasets>
>>>>><output-events>
>>>>>  <data-out name="SqoopOutPath" dataset="SqoopOut">
>>>>>    <instance>${coord:current(-1)}</instance>
>>>>>  </data-out>
>>>>></output-events>
>>>>>
>>>>>Later in the workflow node:
>>>>><property>
>>>>>  <name>SqoopPath</name>
>>>>>  <value>${coord:dataOut('SqoopOutPath')}</value>
>>>>></property>
>>>>>
>>>>>I also use flume and am working on a post-processing coordinator for
>>>>>various log flows. However, in my case we do not expect any gaps in the
>>>>>data and want to catch cases where the data has backed up due to an
>>>>>upstream agent failure. In this case I put a dependency on the
>>>>>current(0) instance for the input-event so that the coordinator waits
>>>>>until the previous interval has finished writing. The actual workflow
>>>>>only takes a formatted date string to use in a Hive query and resolves
>>>>>from coord:nominalTime(-1), giving me the previous instance date.
>>>>>However, this relies on flume delivering events in the order that they
>>>>>were received to guarantee no data is missed, an assumption that may
>>>>>not hold depending on your architecture.
>>>>>
>>>>>Hope that helps,
>>>>>Paul Chavez
>>>>>
>>>>>
>>>>>-----Original Message-----
>>>>>From: lance [mailto:[email protected]]
>>>>>Sent: Wednesday, May 08, 2013 5:19 PM
>>>>>To: [email protected]; Mohammad Islam
>>>>>Subject: Re: Gaps in dataset prevents dependency
>>>>>
>>>>>
>>>>>I think the generalization is to separate the trigger and the data
>>>>>set.
>>>>>
>>>>>I.e. the trigger can stay just as it is today (data files, existence
>>>>>of directories), but the dataset could be defined separately as all
>>>>>data within a range (or however it wants to be defined).
>>>>>In this case there is no strict fulfillment, just whatever data
>>>>>currently exists within a range, for instance.
>>>>>
>>>>>
>>>>>As for the missing-data producer creating a 0-length file - this is
>>>>>flume - so I would think that this would be a pretty standard use
>>>>>case. Even the example in CDH using twitter data is using flume to
>>>>>pull in tweets to an oozie workflow - yet says that they had to do a
>>>>>hack in order to trigger because there was no good way in flume.
>>>>>Would like to see a better way of doing this.
>>>>>
>>>>>I think that I have a typical use case where logs are getting
>>>>>collected, and batch loader jobs want to be executed fairly frequently
>>>>>(say every 5 minutes). However, a hiccup in the network, or slow times
>>>>>at night, might leave gaps - or even at startup time there may be logs
>>>>>that start somewhere between the cutoffs.
>>>>>
>>>>>
>>>>>On May 8, 2013, at 5:09 PM, Mohammad Islam <[email protected]> wrote:
>>>>>
>>>>>>Hi,
>>>>>>Oozie currently supports a data-trigger only when ALL dependent
>>>>>>data-sets are available. There is no way of specifying that only M
>>>>>>out of N need to be available.
>>>>>>
>>>>>>This could be a new feature. In that case, can you define a
>>>>>>generalized way of defining this?
>>>>>>
>>>>>>As a possible workaround, the missing-data producer could create a
>>>>>>file with length 0.
>>>>>>
>>>>>>Regards,
>>>>>>Mohammad
>>>>>>
>>>>>>
>>>>>>________________________________
>>>>>>From: lance <[email protected]>
>>>>>>To: [email protected]
>>>>>>Sent: Wednesday, May 8, 2013 4:20 PM
>>>>>>Subject: Gaps in dataset prevents dependency
>>>>>>
>>>>>>
>>>>>>Would like a more verbose way to define the input-event - and the
>>>>>>data-in.
>>>>>>
>>>>>>I'm trying to find ways to handle when a start/end instance isn't
>>>>>>satisfied, even though there is data to be processed.
>>>>>>An example of this is when I'm parsing a set of 24 hours of logs, and
>>>>>>there may be an hour at night that doesn't have anything produced.
>>>>>>This use case is exacerbated when we are talking minutes and doing
>>>>>>hourly rollups - but same scenario.
>>>>>>
>>>>>>Here is the example config:
>>>>>>
>>>>>>The coordinator runs every 5 minutes:
>>>>>><coordinator-app name="cef-workflow-coordinator"
>>>>>>    frequency="${coord:minutes(5)}" start="${jobStart}" end="${jobEnd}"
>>>>>>
>>>>>>In this case the input dataset is produced every minute:
>>>>>>  <dataset name="logs" frequency="${coord:minutes(1)}"
>>>>>>           initial-instance="${initialDataset}"
>>>>>>           timezone="America/Los_Angeles">
>>>>>>    <uri-template>${rootPath}/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}</uri-template>
>>>>>>    <done-flag></done-flag>
>>>>>>  </dataset>
>>>>>>
>>>>>>Following the twitter/flume CDH example, the actual indicator that
>>>>>>the job should be executed is the creation of a new directory that
>>>>>>belongs to the next set of data:
>>>>>>  <input-events>
>>>>>>    <data-in name="input" dataset="logs">
>>>>>>      <start-instance>${coord:current((coord:tzOffset())-5)}</start-instance>
>>>>>>      <end-instance>${coord:current(coord:tzOffset())}</end-instance>
>>>>>>      <!-- <instance>${coord:current(coord:tzOffset())}</instance> -->
>>>>>>    </data-in>
>>>>>>    <data-in name="readyIndicator" dataset="logs">
>>>>>>      <instance>${coord:current(1 + (coord:tzOffset()))}</instance>
>>>>>>    </data-in>
>>>>>>  </input-events>
>>>>>>
>>>>>>What I would like is for the trigger to be the creation of a
>>>>>>directory that is in the future relative to this dataset, and for the
>>>>>>job to then take whatever is available in the last 5 datasets
>>>>>>(minutes).
>>>>>>
>>>>>>Instead, if there is a gap in the previous 5 minutes (say no logs
>>>>>>came in during minute t-3) then the dependency is never fulfilled.
>>>>>>
>>>>>>
>>>>>>Thanks for the help
>>>>>>
>
>--
>Eric Sammer
>twitter: esammer
>data: www.cloudera.com
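
For readers following the thread, the stateful listener from Eric's onEvent pseudocode above can be sketched as a small runnable example. This is only an illustration under assumptions: the `Bus` and `FileCountListener` classes and the topic names `file-arrived` and `batch-ready` are invented here, the counter is kept on the listener object rather than in a context, and resetting it after firing is a guess (the pseudocode does not say). It is not the DSL Eric mentions and not Oozie code.

```python
from collections import defaultdict

FILES_NEEDED = 3  # threshold taken from the pseudocode in the thread


class Bus:
    """Tiny in-process pub-sub bus (illustrative only)."""

    def __init__(self):
        self.subscribers = defaultdict(list)

    def subscribe(self, topic, handler):
        self.subscribers[topic].append(handler)

    def publish(self, topic, payload=None):
        # Copy the handler list so handlers may subscribe while we iterate.
        for handler in list(self.subscribers[topic]):
            handler(payload)


class FileCountListener:
    """Stateful subscriber: counts file arrivals and emits a complex event."""

    def __init__(self, bus):
        self.bus = bus
        self.files_waiting = 0
        bus.subscribe("file-arrived", self.on_event)

    def on_event(self, path):
        self.files_waiting += 1
        if self.files_waiting >= FILES_NEEDED:
            # Assumption: start counting afresh once the complex event fires.
            self.files_waiting = 0
            self.bus.publish("batch-ready", FILES_NEEDED)


bus = Bus()
fired = []
bus.subscribe("batch-ready", fired.append)  # stateless subscriber, e.g. a job launcher
FileCountListener(bus)
for name in ["a.log", "b.log", "c.log", "d.log"]:
    bus.publish("file-arrived", name)
```

The stateless subscriber here just records the complex event; in the architecture described above it would be the piece that launches the actual work.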
