I view them as separate pieces of functionality. The splitting of a grouping PType (Pair, Tuple) seems reusable in a number of contexts. When we support Unions (or Either) we could provide similar functionality to split PCollection<Union<T, U>> -> Pair<PCollection<T>, PCollection<U>>.
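To make the Union/Either case a bit more concrete, here is a minimal sketch in plain Java. It rests on assumptions: Either is a hypothetical tagged-union type (Crunch does not provide one in this thread), java.util.List stands in for PCollection, and the Split holder stands in for Pair<PCollection<T>, PCollection<U>>. It only illustrates the routing logic, not a Crunch implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java stand-in: List plays the role of PCollection and Either is a
// hypothetical tagged union. None of these types are Crunch APIs.
public class EitherSplitSketch {

  /** Hypothetical tagged union holding exactly one of a left or a right value. */
  public static final class Either<L, R> {
    private final L left;
    private final R right;
    private final boolean isLeft;

    private Either(L left, R right, boolean isLeft) {
      this.left = left;
      this.right = right;
      this.isLeft = isLeft;
    }

    public static <L, R> Either<L, R> left(L value) {
      return new Either<>(value, null, true);
    }

    public static <L, R> Either<L, R> right(R value) {
      return new Either<>(null, value, false);
    }
  }

  /** Result holder standing in for Pair<PCollection<L>, PCollection<R>>. */
  public static final class Split<L, R> {
    public final List<L> lefts = new ArrayList<>();
    public final List<R> rights = new ArrayList<>();
  }

  /** Routes each element to the output that matches its tag, preserving order. */
  public static <L, R> Split<L, R> split(List<Either<L, R>> input) {
    Split<L, R> out = new Split<>();
    for (Either<L, R> e : input) {
      if (e.isLeft) {
        out.lefts.add(e.left);
      } else {
        out.rights.add(e.right);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    // "Standard out" records are Strings, "standard error" records are Integers.
    List<Either<String, Integer>> records = Arrays.asList(
        Either.<String, Integer>left("ok-1"),
        Either.<String, Integer>right(404),
        Either.<String, Integer>left("ok-2"));

    Split<String, Integer> result = split(records);
    System.out.println(result.lefts);
    System.out.println(result.rights);
  }
}
```

Unlike the Pair version, the tag decides the destination, so no null check is needed and a record can never land in both outputs.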
On Thu, Aug 22, 2013 at 5:33 PM, Josh Wills <[email protected]> wrote:
> I'm +1 for the PCollection<Pair<T, U>> -> Pair<PCollection<T>,
> PCollection<U>> approach outlined by Brandon and Chao. I think the only
> question is whether or not we want to add in the Union<T, U> (or Either<T,
> U>?) feature as part of doing that change.
>
> J
>
>
> On Wed, Aug 21, 2013 at 9:19 AM, Inman,Brandon
> <[email protected]> wrote:
>
>> This is close to how I had imagined the implementation to look. Very
>> roughly-
>>
>> import org.apache.crunch.DoFn;
>> import org.apache.crunch.Emitter;
>> import org.apache.crunch.PCollection;
>> import org.apache.crunch.Pair;
>> import org.apache.crunch.types.PType;
>>
>> // Emits only the non-null first element of each pair.
>> public static class FirstEmittingDoFn<T, U> extends DoFn<Pair<T, U>, T> {
>>
>>   @Override
>>   public void process(Pair<T, U> input, Emitter<T> emitter) {
>>     final T first = input.first();
>>     if (first != null) {
>>       emitter.emit(first);
>>     }
>>   }
>> }
>>
>> There would be a very similar DoFn for second() that I'll omit for
>> brevity. I originally envisioned the utility method calling the DoFn that
>> generated the pair, but I like the idea of a smaller utility. The utility
>> method should be as simple as...
>>
>> // Splits one collection of pairs into its two component collections.
>> public static <T, U> Pair<PCollection<T>, PCollection<U>>
>> filterChannels(final PCollection<Pair<T, U>> pCollection,
>>     final PType<T> firstPType, final PType<U> secondPType) {
>>
>>   final PCollection<T> stdout = pCollection.parallelDo(
>>       new FirstEmittingDoFn<T, U>(), firstPType);
>>   final PCollection<U> stderr = pCollection.parallelDo(
>>       new SecondEmittingDoFn<T, U>(), secondPType);
>>
>>   return Pair.of(stdout, stderr);
>> }
>>
>>
>> Disclaimer: I didn't try to compile (all) this code, so treat as
>> pseudocode.
>>
>> From: Josh Wills <[email protected]>
>> Reply-To: "[email protected]" <[email protected]>
>> Date: Tuesday, August 20, 2013 9:40 PM
>> To: "[email protected]" <[email protected]>
>> Subject: Re: Multiple output channels from Crunch DoFn
>>
>> That does sound pretty clean...
>>
>> On Tue, Aug 20, 2013 at 7:34 PM, Chao Shi
>> <[email protected]> wrote:
>>
>> Is it possible to provide a utility that transforms PCollection<Pair<A,
>> B>> to Pair<PCollection<A>, PCollection<B>>? So one can simply emit Pairs
>> and then write them to two Targets. This could be generalized to Tuples.
>>
>> 2013/8/21 Josh Wills <[email protected]>
>>
>> On Tue, Aug 20, 2013 at 12:23 PM, Inman,Brandon <[email protected]>
>> wrote:
>>
>> I like the flexibility of this approach, although would the idea of having
>> some official constants defined for a small set of standard channels be
>> reasonable (the concepts of "out" and "error" are pretty common, others
>> may be warranted as well)?
>>
>> So I think the way I would handle this would be having a main output
>> directory and an error output directory that was underneath it. Cascading
>> does this trick within their existing flows where you can throw exceptions
>> to "traps," which is essentially the same idea, though I'm not wild about
>> control flow that relies on throwing exceptions.
>>
>> Is this something that you would see being added to core Crunch APIs (for
>> example, directly to Pipeline), or implemented on top of Crunch with a
>> filtering approach similar to my original post? If it's implemented on
>> top, shouldn't materialization work as-is?
>>
>> Yes, your model would be simpler. I think that mine would require a
>> special kind of Target implementation, a custom implementation of the
>> Emitter interface that would be used for routing the outputs of the DoFn,
>> and possibly some post-processing code to move the data to a sensible
>> place. I don't know if that work is strictly necessary, and your impl is
>> certainly much more straightforward than mine. :)
>>
>> If the type was PTable<String, T>, could Union<S,U> be a choice for T as
>> appropriate?
>> In our case, we would likely be looking at a PTable<String, T
>> extends SpecificRecordBase> and not necessarily need Union with this
>> approach.
>>
>> Yeah, I think it would be fine, but we'd have to be cognizant of it when
>> we were implementing the union type, and it would be up to the client to
>> ensure that the right data type ended up in the right file, which is maybe
>> less good?
>>
>> From: Josh Wills <[email protected]>
>> Reply-To: "[email protected]" <[email protected]>
>> Date: Tuesday, August 20, 2013 1:00 PM
>> To: "[email protected]" <[email protected]>
>> Subject: Re: Multiple output channels from Crunch DoFn
>>
>> A related idea that has come up a few times has been the idea of having a
>> way of writing values to different files based on a key: some kind of
>> generalization of Target that would itself write multiple outputs under
>> the covers, with the name of the output file indicated by some function of
>> the key of the PTable.
>>
>> For this situation, we would have a PTable that was like PTable<String,
>> Union<S, T>>, or just PTable<String, T> if the output types were all the
>> same, and the String would specify the name of an output directory (that I
>> suppose would live underneath some base output directory for the Target)
>> that the record would be written to.
>>
>> There are a couple of limitations to this approach, I think: we couldn't
>> consider this kind of PTable "materialized" w/o doing an overhaul of the
>> materialization logic-- it would act sort of like an HTableTarget in that
>> it would be write-only in flows.
>> There are probably some others I can't think of off the top of my head.
>> What do you guys think?
>>
>> J
>>
>> On Tue, Aug 20, 2013 at 9:49 AM, Brush,Ryan
>> <[email protected]> wrote:
>>
>> I happen to have some context around this, so I wanted to expand on
>> Brandon's question a bit.
>> The use case here is we're dealing with a large
>> volume of third-party input and expect a certain percentage of bogus or
>> malformed data. Rather than simply logging instances of bad records, we
>> want to treat it as a signal we can learn from, both for improving our
>> processing logic and for creating structured reports we can use to
>> troubleshoot data sources.
>>
>> This leads to the "standard out" and "standard error" metaphors Brandon
>> mentions: in most cases, our Crunch DoFns would emit a processed structure
>> useful downstream. But we'd also like to be able to emit a structured
>> error -- probably as an Avro object in our case -- and persist that as a
>> byproduct of our main processing pipeline.
>>
>> Would it make sense for such DoFns to emit some form of "Option" object?
>> We could then attach two consuming functions to it: one that handles the
>> "success" case, sending the resulting Avro object downstream. Another DoFn
>> attached to the "Option" object would be a no-op unless the Option
>> contained an "error" structure, at which point we persist it to some
>> well-known location for later analysis.
>>
>> I think this is entirely achievable using existing mechanisms...but it
>> seems like a common enough use case (at least for us) to establish some
>> idioms for dealing with it.
>>
>> On Aug 20, 2013, at 11:13 AM, Inman,Brandon wrote:
>>
>> >
>> > We've been looking at ways to do multiple outputs in Crunch jobs,
>> > specifically writing out some kind of Status or Error Avro object, based
>> > on failures that occur processing individual records in various jobs. It
>> > had been suggested that, rather than logging these errors to traditional
>> > loggers, to consider them an output of the Crunch job. After some
>> > internal discussion, it was suggested to run the ideas past the Crunch
>> > community.
>> >
>> > A major goal we have is to end with all the error output in a location
>> > that makes it easy to run Hive queries or perform other MapReduce-style
>> > analysis to quickly view all errors across the larger system without the
>> > need to go to multiple facilities. This means standardizing on the Avro
>> > object, but it also necessitates decoupling the storage of the object
>> > from the "standard output" of the job.
>> >
>> > As Crunch DoFns support a single Emitter per invocation of process(), the
>> > solution that gathered the most support would be to emit an object similar
>> > to Pair<>, where first would be the "standard out" and second would be the
>> > "standard error". A DoFn would generally only populate one (nothing
>> > preventing it from populating both if appropriate, but not really intended
>> > as a part of general use), and separate DoFns would filter out the two
>> > components of the pair and write the values to the appropriate targets.
>> >
>> > As far as the emitted pairing object; the concept of a tagged union was
>> > suggested although there currently isn't support in Java or Avro for the
>> > concept; it was noted that
>> > https://issues.apache.org/jira/browse/CRUNCH-239 might be a close
>> > candidate.
>> > Pair<> would meet the requirements, although it was suggested
>> > that a simple object dedicated to the task could make a cleaner approach.
>> >
>> > Any general thoughts on this approach? Are there any other patterns that
>> > might serve us better, or anything on the Crunch roadmap that might be
>> > more appropriate?
>> >
>> >
>> > Brandon Inman
>> > Software Architect
>> > www.cerner.com <http://www.cerner.com>
>> >
>> >
>> > CONFIDENTIALITY NOTICE This message and any included attachments are
>> > from Cerner Corporation and are intended only for the addressee. The
>> > information contained in this message is confidential and may constitute
>> > inside or non-public information under international, federal, or state
>> > securities laws. Unauthorized forwarding, printing, copying,
>> > distribution, or use of such information is strictly prohibited and may
>> > be unlawful. If you are not the addressee, please promptly delete this
>> > message and notify the sender of the delivery error by e-mail or you may
>> > call Cerner's corporate offices in Kansas City, Missouri, U.S.A at
>> > (+1) (816)221-1024.
>>
>> --
>> Director of Data Science
>> Cloudera <http://www.cloudera.com>
>> Twitter: @josh_wills <http://twitter.com/josh_wills>
