I attempt to use integrate morphline with flume. And I am able to use logger
sink to print data to the console. However, when switching to use file_roller
sink. The output folder only contains new line (empty line). How can I get
transformed data written to the file_roller sink?
Thanks.
Flume agent conf file:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /path/to/source
a1.sources.r1.interceptors = m
a1.sources.r1.interceptors.m.type =
org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
a1.sources.r1.interceptors.m.morphlineFile = /path/to/conf/morphline.conf
a1.sources.r1.interceptors.m.morphlineId = morphline1
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /path/to/output
a1.sinks.k1.sink.rollInterval = 3
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Morphline conf file:
morphlines : [{
id : morphline1
importCommands : ["org.kitesdk.**" ]
commands : [
{
readCSV {
separator : "\t"
columns :
[userID,movieID,rating,date_day,date_month,date_year,date_hour,date_minute,date_second]
ignoreFirstLine : false
trim : true
charset : UTF-8
}
}
{
java {
imports : "import java.util.*;"
code: """
record.removeAll("date_hour");
record.removeAll("date_minute");
record.removeAll("date_second");
return child.process(record);
"""
}
}
{ logInfo { format : "(after columns are removed) record: {}", args : ["@{}"] }
}
{
setValues {
_attachment_body : "@{bodybak}"
bodybak : []
_attachment_mimetype : []
}
}
]
}]