Hi guys,

We are sending JSON events from our pipeline into a Flume HTTP source. We have written a custom multiplexing channel selector and a custom sink serializer. The events are being routed into the correct channels and consumed OK by the sinks. The custom serializer takes a JSON event and outputs CSV. Files are being written to S3 (using s3n as the HDFS filesystem), but rather than appending to one CSV file, each event seems to generate its own CSV. The output is what I would expect with rollCount = 1, although we do occasionally get several events (maybe 4) written to a single CSV. Please see the config below.

Ideally we want to use a rollInterval of 24 hours, to generate a new .csv file every 24 hours, but still have events flushed to the CSV file fairly quickly after being sent. So: one CSV per day, continually appended with whatever events we throw at it. However, we found that with a rollInterval of 24 hours the events weren't being flushed often enough...
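For comparison, a minimal sketch of roll settings aimed at a single daily file per sink, shown for cappucino_s3_aggregate_profile_sink1 only. The property names are standard Flume HDFS sink options; the values are assumptions to adjust. Setting hdfs.rollSize and hdfs.rollCount to 0 disables those triggers, which otherwise default to 1024 bytes and 10 events even when rollInterval is long. One hedged caveat: s3n appears to buffer writes in a local file and upload only when the file is closed, so events may not show up in S3 until the roll, whatever the flush settings.

```properties
# Sketch only (assumed values): roll purely on time, once every 24 hours (86400 s).
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.rollInterval = 86400
# Disable size- and count-based rolling (HDFS sink defaults: 1024 bytes, 10 events).
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.rollSize = 0
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.rollCount = 0
# Day-level prefix only, so all events of one UTC day map to the same file name.
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.filePrefix = DriverProfile.%y-%m-%d
# Number of events written to the open file before it is flushed (default 100).
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.batchSize = 100
```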

Any help would be hugely appreciated!

Thanks.


Josh


## Sources ###################################################
agent.sources = http
agent.sources.http.type = http
agent.sources.http.bind = 0.0.0.0
agent.sources.http.port = 4444
agent.sources.http.channels = cappucino_s3_aggregate_profile_channel default_s3_channel cappucino_s3_trip_summary_channel

## Interceptors #################################################
agent.sources.http.interceptors = itime ihost
agent.sources.http.interceptors.itime.type = timestamp
agent.sources.http.interceptors.ihost.type = host
agent.sources.http.interceptors.ihost.useIP = false
agent.sources.http.interceptors.ihost.preserveExisting = false
agent.sources.http.interceptors.ihost.hostHeader = hostname

## Multiplex Channels Mapping ######################################
agent.sources.http.selector.type = com.mydrivesolutions.flume.serialization.PipelineEventsChannelSelector
agent.sources.http.selector.default = default_s3_channel

## Channels ########################################################
agent.channels = cappucino_s3_aggregate_profile_channel cappucino_s3_trip_summary_channel default_s3_channel

agent.channels.cappucino_s3_aggregate_profile_channel.type = file
agent.channels.cappucino_s3_aggregate_profile_channel.capacity = 10000000
agent.channels.cappucino_s3_aggregate_profile_channel.checkpointDir = /mnt/flume/cappucino_s3_aggregate_profile/checkpoint
agent.channels.cappucino_s3_aggregate_profile_channel.dataDirs = /mnt/flume/cappucino_s3_aggregate_profile/data

agent.channels.cappucino_s3_trip_summary_channel.type = file
agent.channels.cappucino_s3_trip_summary_channel.capacity = 10000000
agent.channels.cappucino_s3_trip_summary_channel.checkpointDir = /mnt/flume/cappucino_s3_trip_summary/checkpoint
agent.channels.cappucino_s3_trip_summary_channel.dataDirs = /mnt/flume/cappucino_s3_trip_summary/data

## Sinks ###########################################################
agent.sinks = cappucino_s3_aggregate_profile_sink1 cappucino_s3_aggregate_profile_sink2 cappucino_s3_trip_summary_sink1 cappucino_s3_trip_summary_sink2

## Serialize json events from the pipeline and write csv to HDFS (We are using s3 native FS as HDFS)
###############################################################################
## Cappucino_s3_aggregate_profile Sinks #################################################
agent.sinks.cappucino_s3_aggregate_profile_sink1.type = hdfs
agent.sinks.cappucino_s3_aggregate_profile_sink1.channel = cappucino_s3_aggregate_profile_channel
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.path = s3n://mydrive-cappucino-reports/driver-profiles/%Y-%m-%d
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.fileType = DataStream
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.writeFormat = Text
agent.sinks.cappucino_s3_aggregate_profile_sink1.serializer = com.mydrivesolutions.flume.serialization.HeaderAndBodyTextEventSerializer$Builder
agent.sinks.cappucino_s3_aggregate_profile_sink1.serializer.columns = log_type reporting_bucket subscription_id
agent.sinks.cappucino_s3_aggregate_profile_sink1.serializer.format = DriverProfile
agent.sinks.cappucino_s3_aggregate_profile_sink1.serializer.delimiter = ,
agent.sinks.cappucino_s3_aggregate_profile_sink1.serializer.appendNewline = false
agent.sinks.cappucino_s3_aggregate_profile_sink1.serializer.distanceMeasure = MILES
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.maxOpenFiles = 5000
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.rollInterval = 20400
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.callTimeout = 60000
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.fileSuffix = .csv
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.inUseSuffix = .tmp
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.filePrefix = DriverProfile.%y-%m-%d.%H.%M
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.timeZone = UTC

agent.sinks.cappucino_s3_aggregate_profile_sink2.type = hdfs
agent.sinks.cappucino_s3_aggregate_profile_sink2.channel = cappucino_s3_aggregate_profile_channel
agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.path = s3n://mydrive-cappucino-reports/driver-profiles/%Y-%m-%d
agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.fileType = DataStream
agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.writeFormat = Text
agent.sinks.cappucino_s3_aggregate_profile_sink2.serializer = com.mydrivesolutions.flume.serialization.HeaderAndBodyTextEventSerializer$Builder
agent.sinks.cappucino_s3_aggregate_profile_sink2.serializer.columns = log_type reporting_bucket subscription_id
agent.sinks.cappucino_s3_aggregate_profile_sink2.serializer.format = DriverProfile
agent.sinks.cappucino_s3_aggregate_profile_sink2.serializer.delimiter = ,
agent.sinks.cappucino_s3_aggregate_profile_sink2.serializer.appendNewline = false
agent.sinks.cappucino_s3_aggregate_profile_sink2.serializer.distanceMeasure = MILES
agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.maxOpenFiles = 5000
agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.rollInterval = 20400
agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.callTimeout = 60000
agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.fileSuffix = .csv
agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.inUseSuffix = .tmp
agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.filePrefix = DriverProfile.%y-%m-%d.%H.%M
agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.timeZone = UTC


## Cappucino_s3_trip_summary Sinks #################################################
agent.sinks.cappucino_s3_trip_summary_sink1.type = hdfs
agent.sinks.cappucino_s3_trip_summary_sink1.channel = cappucino_s3_trip_summary_channel
agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.path = s3n://mydrive-cappucino-reports/trip-summaries/%Y-%m-%d
agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.fileType = DataStream
agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.writeFormat = Text
agent.sinks.cappucino_s3_trip_summary_sink1.serializer = com.mydrivesolutions.flume.serialization.HeaderAndBodyTextEventSerializer$Builder
agent.sinks.cappucino_s3_trip_summary_sink1.serializer.columns = log_type reporting_bucket subscription_id
agent.sinks.cappucino_s3_trip_summary_sink1.serializer.format = BodyCSV
agent.sinks.cappucino_s3_trip_summary_sink1.serializer.delimiter = ,
agent.sinks.cappucino_s3_trip_summary_sink1.serializer.appendNewline = false
agent.sinks.cappucino_s3_trip_summary_sink1.serializer.distanceMeasure = MILES
agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.maxOpenFiles = 5000
agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.rollInterval = 20400
agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.callTimeout = 60000
agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.fileSuffix = .csv
agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.inUseSuffix = .tmp
agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.filePrefix = TripSummary.%y-%m-%d.%H.%M
agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.timeZone = UTC

agent.sinks.cappucino_s3_trip_summary_sink2.type = hdfs
agent.sinks.cappucino_s3_trip_summary_sink2.channel = cappucino_s3_trip_summary_channel
agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.path = s3n://mydrive-cappucino-reports/trip-summaries/%Y-%m-%d
agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.fileType = DataStream
agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.writeFormat = Text
agent.sinks.cappucino_s3_trip_summary_sink2.serializer = com.mydrivesolutions.flume.serialization.HeaderAndBodyTextEventSerializer$Builder
agent.sinks.cappucino_s3_trip_summary_sink2.serializer.columns = log_type reporting_bucket subscription_id
agent.sinks.cappucino_s3_trip_summary_sink2.serializer.format = BodyCSV
agent.sinks.cappucino_s3_trip_summary_sink2.serializer.delimiter = ,
agent.sinks.cappucino_s3_trip_summary_sink2.serializer.appendNewline = false
agent.sinks.cappucino_s3_trip_summary_sink2.serializer.distanceMeasure = MILES
agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.maxOpenFiles = 5000
agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.rollInterval = 20400
agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.callTimeout = 60000
agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.fileSuffix = .csv
agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.inUseSuffix = .tmp
agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.filePrefix = TripSummary.%y-%m-%d.%H.%M
agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.timeZone = UTC

## SinkGroups ###########################################################
agent.sinkgroups = cappucino_s3_aggregate_profile_sinkgroup cappucino_s3_trip_summary_sinkgroup

## Cappucino_s3_aggregate_profile Failover SinkGroup ##########################################
agent.sinkgroups.cappucino_s3_aggregate_profile_sinkgroup.sinks = cappucino_s3_aggregate_profile_sink1 cappucino_s3_aggregate_profile_sink2
agent.sinkgroups.cappucino_s3_aggregate_profile_sinkgroup.processor.type = failover
agent.sinkgroups.cappucino_s3_aggregate_profile_sinkgroup.processor.priority.cappucino_s3_aggregate_profile_sink1 = 10
agent.sinkgroups.cappucino_s3_aggregate_profile_sinkgroup.processor.priority.cappucino_s3_aggregate_profile_sink2 = 5
agent.sinkgroups.cappucino_s3_aggregate_profile_sinkgroup.processor.maxpenalty = 30000

## Cappucino_s3_trip_summary Failover SinkGroup ##########################################
agent.sinkgroups.cappucino_s3_trip_summary_sinkgroup.sinks = cappucino_s3_trip_summary_sink1 cappucino_s3_trip_summary_sink2
agent.sinkgroups.cappucino_s3_trip_summary_sinkgroup.processor.type = failover
agent.sinkgroups.cappucino_s3_trip_summary_sinkgroup.processor.priority.cappucino_s3_trip_summary_sink1 = 10
agent.sinkgroups.cappucino_s3_trip_summary_sinkgroup.processor.priority.cappucino_s3_trip_summary_sink2 = 5
agent.sinkgroups.cappucino_s3_trip_summary_sinkgroup.processor.maxpenalty = 30000
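A note on the hdfs.filePrefix lines in the config: every sink's prefix ends in %H.%M, and the HDFS sink expands escape sequences in the prefix from each event's timestamp header (set here by the timestamp interceptor). If each distinct expanded prefix gets its own open file, events arriving in different minutes land in different CSVs, which would match the one-to-four events per file described earlier. The Java sketch below illustrates only that grouping effect. `expand` and `countPerFile` are hypothetical helpers that mimic just the %y, %m, %d, %H and %M escapes; they are not Flume's own escaping code, and the event timestamps are made up.

```java
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PrefixBuckets {

    // Hypothetical, simplified stand-in for Flume's escape expansion:
    // only %y, %m, %d, %H and %M are handled; any other %x is copied through.
    static String expand(String template, ZonedDateTime ts) {
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < template.length(); i++) {
            char c = template.charAt(i);
            if (c != '%' || i + 1 >= template.length()) {
                out.append(c);
                continue;
            }
            char code = template.charAt(++i);
            switch (code) {
                case 'y': out.append(String.format("%02d", ts.getYear() % 100)); break;
                case 'm': out.append(String.format("%02d", ts.getMonthValue())); break;
                case 'd': out.append(String.format("%02d", ts.getDayOfMonth())); break;
                case 'H': out.append(String.format("%02d", ts.getHour())); break;
                case 'M': out.append(String.format("%02d", ts.getMinute())); break;
                default: out.append('%').append(code);
            }
        }
        return out.toString();
    }

    // Counts how many events share each expanded prefix (i.e. would share a file).
    static Map<String, Integer> countPerFile(String template, List<ZonedDateTime> events) {
        Map<String, Integer> perFile = new LinkedHashMap<>();
        for (ZonedDateTime ts : events) {
            perFile.merge(expand(template, ts), 1, Integer::sum);
        }
        return perFile;
    }

    public static void main(String[] args) {
        // Five made-up events, 45 seconds apart, in UTC (matching hdfs.timeZone = UTC).
        ZonedDateTime base = ZonedDateTime.of(2014, 3, 5, 9, 0, 0, 0, ZoneOffset.UTC);
        List<ZonedDateTime> events = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            events.add(base.plusSeconds(45L * i));
        }
        System.out.println(countPerFile("DriverProfile.%y-%m-%d.%H.%M", events));
        System.out.println(countPerFile("DriverProfile.%y-%m-%d", events));
    }
}
```

Running it prints `{DriverProfile.14-03-05.09.00=2, DriverProfile.14-03-05.09.01=1, DriverProfile.14-03-05.09.02=1, DriverProfile.14-03-05.09.03=1}` for the minute-level prefix and `{DriverProfile.14-03-05=5}` for the day-level one: the same five events spread over four names versus one.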

--
www.mydrivesolutions.com

This email and any attachments is private and confidential. If you have received this message in error please remove it from your systems and notify the author. MyDrive Solutions Limited is registered in England and Wales, No 07330334. Registered office: Surrey Technology Centre, 40 Occam Road, Guildford GU2 7YG, UK
