Hi Scott,

I don't think this is caused by the morphline. If the data shows up correctly with the logger sink, it should look the same with any other sink. I did some debugging on the file roll sink, and it apparently rotates empty files as well: after the configured rollInterval it marks the current file for rotation, and the next time it is asked to process events it rotates the file even if there is nothing to write into it.
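To make the timing concrete, here is a minimal, hypothetical Java model of that behaviour. It is not Flume code. A timer raises a rotate flag every rollInterval, and every call to the sink's process() closes the current file and opens a new one whenever the flag is set, whether or not an event is waiting. The 1-second back-off step and 5-second cap are assumed to mirror SinkRunner's defaults. The class, methods and the `lazyRotation` switch (RollSimulation, run, OutFile, emptyFiles) are made up for illustration. The lazy variant is just one possible way to avoid empty files, not necessarily how Flume will fix it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model of a file_roll-style sink polled by a
// SinkRunner-like loop on a virtual clock. This is NOT Flume's code.
final class RollSimulation {

    // One output file and the number of events written into it.
    static final class OutFile {
        final long openedAtMs;
        int events;

        OutFile(long openedAtMs) {
            this.openedAtMs = openedAtMs;
        }
    }

    // Assumed to mirror SinkRunner's default back-off: +1 s per consecutive BACKOFF, capped at 5 s.
    static final long BACKOFF_INCREMENT_MS = 1_000;
    static final long MAX_BACKOFF_MS = 5_000;

    // Polls the sink until horizonMs and returns every file it created.
    // lazyRotation == false: rotate whenever process() runs and the flag is set.
    // lazyRotation == true:  hypothetical fix, only touch files when an event is waiting.
    static List<OutFile> run(int initialEvents, long rollIntervalMs, long horizonMs,
                             int batchSize, boolean lazyRotation) {
        Deque<Integer> channel = new ArrayDeque<>();
        for (int i = 0; i < initialEvents; i++) {
            channel.add(i); // all events arrive at t = 0, then the channel stays empty
        }

        List<OutFile> files = new ArrayList<>();
        OutFile current = null;
        boolean shouldRotate = false;
        long nextTickMs = rollIntervalMs; // roll timer: first tick after one interval
        long consecutiveBackoffs = 0;
        long now = 0;

        while (now <= horizonMs) {
            // Roll timer: raise the flag for every tick that has elapsed.
            while (nextTickMs <= now) {
                shouldRotate = true;
                nextTickMs += rollIntervalMs;
            }

            // process(): the eager variant rotates before knowing whether there is anything to write.
            if (!lazyRotation || !channel.isEmpty()) {
                if (shouldRotate && current != null) {
                    current = null; // "close" the current file
                    shouldRotate = false;
                }
                if (current == null) {
                    current = new OutFile(now); // open a new file, possibly left empty
                    files.add(current);
                }
            }

            // Take up to batchSize events; running out means BACKOFF.
            boolean backoff = false;
            for (int i = 0; i < batchSize; i++) {
                Integer event = channel.poll();
                if (event == null) {
                    backoff = true;
                    break;
                }
                current.events++;
            }

            // SinkRunner-like loop: sleep on BACKOFF, otherwise poll again (1 ms step assumed).
            if (backoff) {
                consecutiveBackoffs++;
                now += Math.min(consecutiveBackoffs * BACKOFF_INCREMENT_MS, MAX_BACKOFF_MS);
            } else {
                consecutiveBackoffs = 0;
                now += 1;
            }
        }
        return files;
    }

    static long emptyFiles(List<OutFile> files) {
        return files.stream().filter(f -> f.events == 0).count();
    }

    public static void main(String[] args) {
        // 10 events, 3 s rollInterval, 60 s of simulated time, batch size 100.
        List<OutFile> eager = run(10, 3_000, 60_000, 100, false);
        List<OutFile> lazy = run(10, 3_000, 60_000, 100, true);
        System.out.println("eager: files=" + eager.size() + ", empty=" + emptyFiles(eager));
        System.out.println("lazy:  files=" + lazy.size() + ", empty=" + emptyFiles(lazy));
    }
}
```

With these inputs the eager variant creates 14 files, 13 of them empty: one at 0, 3, 6 and 10 seconds while the back-off ramps up, then one every 5 seconds once it is capped. The lazy variant keeps a single file holding all 10 events. The key point is that rotation is driven by the polling rhythm, not by incoming data.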
If you are interested in the details at code level:
- here is the file rotation logic: https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java#L147
- this kicks in even if there is no new event in the channel
- this method (process()) is called (through multiple levels) by the SinkRunner, which sleeps for at most 5 seconds if the sink returned BACKOFF (i.e. there was no event to process); the sleep grows with each consecutive BACKOFF until it hits that cap: https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java#L148-L148
- since your 3-second rollInterval is shorter than that 5-second cap, the rotate flag is already set on every call, so you end up with a new (and apparently empty) file roughly every 5 seconds

IMHO this is a bug in the file roll sink; I'm going to file a JIRA ticket to get it fixed.

Btw, a 3-second rollInterval seems a bit low to me (the default is 30 seconds). Is it intentional? If so, what do you want to achieve with it?

Kind regards,
Denes

On Tue, Mar 7, 2017 at 9:59 AM Scott Handerson <[email protected]> wrote:

> I am attempting to integrate morphline with Flume, and I am able to use the
> logger sink to print data to the console. However, when I switch to the
> file_roller sink, the output folder only contains new lines (empty lines).
> How can I get the transformed data written to the file_roller sink?
>
> Thanks.
>
> Flume agent conf file:
>
> a1.sources = r1
> a1.sinks = k1
> a1.channels = c1
>
> a1.sources.r1.type = spooldir
> a1.sources.r1.spoolDir = /path/to/source
> a1.sources.r1.interceptors = m
> a1.sources.r1.interceptors.m.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
> a1.sources.r1.interceptors.m.morphlineFile = /path/to/conf/morphline.conf
> a1.sources.r1.interceptors.m.morphlineId = morphline1
>
> a1.sinks.k1.type = file_roll
> a1.sinks.k1.sink.directory = /path/to/output
> a1.sinks.k1.sink.rollInterval = 3
>
> a1.channels.c1.type = memory
> a1.channels.c1.capacity = 100000
>
> a1.sources.r1.channels = c1
> a1.sinks.k1.channel = c1
>
> Morphline conf file:
>
> morphlines : [{
>   id : morphline1
>   importCommands : ["org.kitesdk.**" ]
>   commands : [
>     {
>       readCSV {
>         separator : "\t"
>         columns : [userID,movieID,rating,date_day,date_month,date_year,date_hour,date_minute,date_second]
>         ignoreFirstLine : false
>         trim : true
>         charset : UTF-8
>       }
>     }
>     {
>       java {
>         imports : "import java.util.*;"
>         code: """
>           record.removeAll("date_hour");
>           record.removeAll("date_minute");
>           record.removeAll("date_second");
>           return child.process(record);
>         """
>       }
>     }
>     { logInfo { format : "(after columns are removed) record: {}", args : ["@{}"] } }
>     {
>       setValues {
>         _attachment_body : "@{bodybak}"
>         bodybak : []
>         _attachment_mimetype : []
>       }
>     }
>   ]
> }]
>
