FWIW, here is an example of how this could be handled in a 
MorphlineInterceptor:

morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**"]
    
    commands : [
      {
        tryRules {
          catchExceptions: true
          rules : [
            # first rule
            {
              commands : [
                # save initial state
                { setValues { _tmp : "@{_attachment_body}" } }
                
                # if JSON parsing succeeds, replace _attachment_body with a Jackson JSON object
                { readJson {} } 
                
                # if we reach here the JSON parsing has succeeded
                # restore state prior to readJson command
                { setValues { _attachment_body : "@{_tmp}" } }
                { setValues { _tmp : [] } }
                { setValues { _attachment_mimetype : [] } }
              ]
            }
            
            # second rule is executed if the previous rule failed or threw an exception
            {
              commands : [
                { logDebug { format : "Marking event as malformed for downstream sink: {}", args: ["@{}"] } }
                { addValues { malformed : true } }
              ]
            }
            
          ]
        }
      }                                
    ]
  }
]

Also see 
http://kitesdk.org/docs/current/kite-morphlines/morphlinesReferenceGuide.html#tryRules
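
For completeness, the surrounding Flume agent configuration could wire the
interceptor up and route on the resulting header roughly as follows (the
agent, source, and channel names are made up, and the snippet is untested):

agent1.sources.http1.interceptors = morphline
agent1.sources.http1.interceptors.morphline.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
agent1.sources.http1.interceptors.morphline.morphlineFile = /etc/flume/conf/morphline.conf
agent1.sources.http1.interceptors.morphline.morphlineId = morphline1

# route events on the "malformed" header set by the second rule above
agent1.sources.http1.selector.type = multiplexing
agent1.sources.http1.selector.header = malformed
agent1.sources.http1.selector.mapping.true = badChannel
agent1.sources.http1.selector.default = goodChannel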

Wolfgang.

On Jan 3, 2014, at 2:03 AM, ed wrote:

> Thank you Brock, Devin and Jimmy for the great information.  Dumping null 
> values in the EventSerializer write method looks really easy to do, but I 
> think using a custom interceptor to validate and then tag the event for 
> good/bad routing sounds like a great idea and fits the Flume way of doing 
> things better.
> 
> Thank you again!
> 
> ~Ed
> 
> 
> On Fri, Jan 3, 2014 at 2:40 AM, Devin Suiter RDX <[email protected]> wrote:
> Yes, the regex interceptors and selectors can be very powerful - 
> experimenting with them was really exciting.
> 
> Brock, thanks for validating the ML idea - as with most things, the simplest 
> solution is probably the way to go, and in this use case, the morphlines 
> might be overkill.
> 
> Devin Suiter
> Jr. Data Solutions Software Engineer
> 
> 100 Sandusky Street | 2nd Floor | Pittsburgh, PA 15212
> Google Voice: 412-256-8556 | www.rdx.com
> 
> 
> On Thu, Jan 2, 2014 at 12:27 PM, Brock Noland <[email protected]> wrote:
> Jimmy, great to hear that method is working for you!
> 
> Devin, regarding the morphlines question. Since ML can have arbitrary Java 
> plugins it *can* do just about anything. I generally think of ML as the T in 
> ETL. Doing the validation in ML might make sense. In general, though, I 
> think adding the custom header field is probably the best option for dealing 
> with bad data.
> 
> Once again, thank you everyone for using our software!
> 
> 
> On Thu, Jan 2, 2014 at 10:10 AM, Jimmy <[email protected]> wrote:
> We are doing a similar thing to what Brock mentioned - a simple interceptor 
> that validates the JSON and updates a custom field in the header; the Flume 
> HDFS sink then pushes the data to a good/bad target directory based on this 
> custom field... then a separate process watches the bad directory.
> 
> You could add notification to the Flume flow; we wanted to keep it very 
> simple.
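
A rough sketch of such a validation interceptor (untested; the class and
header names are invented for illustration, not the actual code described
above):

import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public class JsonValidatingInterceptor implements Interceptor {
  private static final ObjectMapper MAPPER = new ObjectMapper();

  @Override public void initialize() {}

  @Override
  public Event intercept(Event event) {
    Map<String, String> headers = event.getHeaders();
    try {
      MAPPER.readTree(event.getBody());   // parse only to validate
      headers.put("valid", "true");
    } catch (Exception e) {
      headers.put("valid", "false");      // selector/sink can route on this
    }
    return event;
  }

  @Override
  public List<Event> intercept(List<Event> events) {
    for (Event e : events) {
      intercept(e);
    }
    return events;
  }

  @Override public void close() {}

  public static class Builder implements Interceptor.Builder {
    @Override public Interceptor build() { return new JsonValidatingInterceptor(); }
    @Override public void configure(Context context) {}
  }
}

The HDFS sink path can then interpolate the header, e.g.
hdfs.path = /data/logs/%{valid}, to land events in good/bad directories as
described above.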
> 
> ---------- Forwarded message ----------
> From: Devin Suiter RDX <[email protected]>
> Date: Thu, Jan 2, 2014 at 7:40 AM
> Subject: Re: Handling malformed data when using custom AvroEventSerializer 
> and HDFS Sink
> To: [email protected]
> 
> 
> Just throwing this out there, since I haven't had time to dig into the API 
> with a big fork, but can morphlines offer any assistance here? 
> 
> Some kind of interceptor that would parse for malformed data, package the 
> offending data and send it somewhere (email it, log it), and then project a 
> valid "there was something wrong here" piece of data into the field and let 
> your channel carry on? Or skip the projection piece and just move along? I 
> was just thinking that projecting known data into a field that previously 
> held malformed data would let you easily locate those records later, while 
> keeping your data shape consistent.
> 
> Kind of looking to Brock as a sounding board as to the appropriateness of 
> this as a potential solution since morphlines takes some time to really 
> understand well...
> 
> Devin Suiter
> Jr. Data Solutions Software Engineer
> 
> 100 Sandusky Street | 2nd Floor | Pittsburgh, PA 15212
> Google Voice: 412-256-8556 | www.rdx.com
> 
> 
> On Thu, Jan 2, 2014 at 10:25 AM, Brock Noland <[email protected]> wrote:
> 
> On Tue, Dec 31, 2013 at 8:34 PM, ed <[email protected]> wrote:
> Hello,
> 
> We are using Flume v1.4 to load JSON-formatted log data into HDFS as Avro.  
> Our Flume setup looks like this:
> 
> NXLog ==> (FlumeHTTPSource -> HDFSSink w/ custom EventSerializer)
> 
> Right now our custom EventSerializer (which extends 
> AbstractAvroEventSerializer) takes the JSON input from the HTTPSource and 
> converts it into an Avro record of the appropriate type for the incoming log 
> file.  This is working great, and we use the serializer to add some 
> additional "synthetic" fields to the Avro record that don't exist in the 
> original JSON log data.
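
To make the discussion below concrete, a convert() of the shape described
above might look roughly like this (the schema fields and names are invented
for illustration; this is not the poster's actual code, and it is untested):

// MAPPER is assumed to be a shared com.fasterxml.jackson.databind.ObjectMapper
@Override
protected GenericRecord convert(Event event) {
  try {
    JsonNode json = MAPPER.readTree(event.getBody());
    GenericRecord record = new GenericData.Record(getSchema());
    record.put("host", json.path("host").asText());
    JsonNode port = json.path("port");
    if (!port.canConvertToInt()) {
      // e.g. the ASCII-text-in-a-port-field case mentioned below
      throw new IOException("port is not an integer: " + port);
    }
    record.put("port", port.asInt());
    // a "synthetic" field that doesn't exist in the original JSON
    record.put("receivedAt", System.currentTimeMillis());
    return record;
  } catch (IOException e) {
    // malformed input lands here; what to do with it is the question below
    return null;
  }
}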
> 
> My question concerns how to handle malformed JSON data (or really any error 
> inside the custom EventSerializer).  It's very likely that as we parse the 
> JSON there will be records where something is malformed (either the JSON 
> itself, or a field of the wrong type, etc.).
> 
> For example, a "port" field which should always be an Integer might for some 
> reason have some ASCII text in it.  I'd like to catch these errors in the 
> EventSerializer and then write out the bad JSON to a log file somewhere that 
> we can monitor.
> 
> Yeah it would be nice to have a better story about this in Flume.
>  
> 
> What is the best way to do this?  
> 
> Typically people will either log it to a file or send it through another 
> "flow" to a different HDFS sink.
> 
>  
> Right now, all the logic for catching bad JSON would be inside the 
> "convert" function of the EventSerializer.  Should the convert function 
> itself throw an exception that will be gracefully handled upstream
> 
> The exception will be logged, but that is it.
>  
> or do I just return a "null" value if there was an error?  Would it be 
> appropriate to log errors directly to a database from inside the 
> EventSerializer convert method or would this be too slow?  
> 
> That might be too slow to do directly. If I did that, I'd have a separate 
> thread doing the logging, with an in-memory queue between the serializer and 
> that thread.
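
A minimal sketch of that queue-plus-worker-thread pattern (untested; the
class name is invented and nothing here is Flume API):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BadRecordLogger {
  // bounded so that a flood of bad records cannot exhaust memory
  private final BlockingQueue<String> queue =
      new LinkedBlockingQueue<String>(10000);

  public BadRecordLogger() {
    Thread worker = new Thread(new Runnable() {
      public void run() {
        try {
          while (true) {
            String badJson = queue.take();
            // write badJson to the database or log file here,
            // off the serializer's hot path
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    }, "bad-record-logger");
    worker.setDaemon(true);
    worker.start();
  }

  // called from convert(); offer() drops on overflow rather than
  // blocking the Flume sink
  public void log(String badJson) {
    queue.offer(badJson);
  }
}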
>  
> What are the best practices for this type of error handling?
> 
> It looks to me like we'd need to change AbstractAvroEventSerializer to 
> filter out nulls:
> 
> https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/serialization/AbstractAvroEventSerializer.java#L106
> 
> which we could easily do.  Since you don't want to wait for that, you could 
> override the write method to do this yourself.
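
Concretely, inside the custom serializer that extends
AbstractAvroEventSerializer<T>, the override could look something like this
(untested sketch):

@Override
public void write(Event event) throws IOException {
  if (convert(event) == null) {
    return;            // drop the malformed event instead of appending null
  }
  super.write(event);  // note: this calls convert() a second time, so
                       // cache the result if conversion is expensive
}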
>  
> 
> Thank you for any assistance!
> 
> Best Regards,
> 
> Ed
> 
> 
> 
> -- 
> Apache MRUnit - Unit testing MapReduce - http://mrunit.apache.org
> 
> 
> 
> 
> -- 
> Apache MRUnit - Unit testing MapReduce - http://mrunit.apache.org
> 
> 
