Hi,
I am using a Trident topology to process files, transforming them from CSV, EDI,
and XML into a common JSON format. I have a working prototype, but I wanted to make
sure I am implementing this correctly. Here is the flow:
1. Read a message from Kafka; this message is metadata describing the file's location on S3.
2. Next, a function/bolt streams and transforms the file from S3, emitting
records one at a time (a simplified sketch of this function is below the topology).
3. The final step is a partitionPersist to Kafka.
Here’s the topology:

TridentState kafkaState = topology.newStream("tracking-file-processor",
        FileNameSpout.opaqueKafkaSpout(zkHosts, topicName))
    // parallelism should be the number of partitions of the topic
    .parallelismHint(1)
    .each(new Fields("str"), new S3Reader(),
          new Fields("tracking_num", "json", "record_type"))
    .shuffle()
    .partitionPersist(stateFactory,
          new Fields("tracking_num", "json", "record_type"),
          new TridentKafkaUpdater(), new Fields())
    // .parallelismHint(10)
    ;
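
In case it helps, S3Reader is roughly shaped like the sketch below. It is heavily simplified: trackingNumOf/toJson/recordTypeOf stand in for the real CSV/EDI/XML-to-JSON transformation, the bucket/key parsing is only illustrative, and the imports assume the pre-1.0 backtype.storm / storm.trident package names.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class S3Reader extends BaseFunction {
    // transient so the client is not serialized with the topology; created lazily per worker
    private transient AmazonS3 s3;

    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        if (s3 == null) {
            s3 = new AmazonS3Client(); // credentials picked up from the environment
        }
        // the Kafka message ("str") carries the S3 location; parsing is simplified here
        String[] location = tuple.getString(0).split("/", 2);
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                s3.getObject(location[0], location[1]).getObjectContent()))) {
            String record;
            while ((record = reader.readLine()) != null) {
                // emit one record at a time; the helpers below are placeholders
                collector.emit(new Values(trackingNumOf(record), toJson(record), recordTypeOf(record)));
            }
        } catch (Exception e) {
            throw new RuntimeException(e); // fail the batch so the opaque spout replays it
        }
    }

    private String trackingNumOf(String record) { return record; } // placeholder
    private String toJson(String record)        { return record; } // placeholder
    private String recordTypeOf(String record)  { return record; } // placeholder
}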
Questions:
1. Is this the correct approach?
2. The files vary in size and can be close to 500 MB. The S3Reader
function emits one record of the file at a time, and Trident batches them
before doing the partitionPersist, so essentially the entire file ends up in
memory? While processing multiple files, the memory requirement would grow accordingly?
Do I just parallelize and spread the partitions over multiple workers, or is there
a better way? (One option I am considering is sketched after these questions.)
3. This also means that the batches written to Kafka can vary in size and
may be quite large. Is this acceptable?
4. If I do need to write to a data source other than Kafka, such as a regular
database (it will most likely be Kafka, but I just want to gain some more knowledge),
what would be the best way to do this? (A rough sketch of what I have in mind is also included below.)
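
For question 2, the option I am considering is to cap how many file-pointer messages end up in a single Trident batch, bound how many batches are in flight at once, and fan the S3 reading out over more executors. This is only a sketch reusing the existing zkHosts/topicName/topology/stateFactory variables and the pre-1.0 storm-kafka classes; the fetchSizeBytes, maxSpoutPending, and parallelism values are just example numbers.

import backtype.storm.Config;
import backtype.storm.tuple.Fields;
import storm.kafka.trident.OpaqueTridentKafkaSpout;
import storm.kafka.trident.TridentKafkaConfig;
import storm.kafka.trident.TridentKafkaUpdater;

// Bound how much each Trident batch pulls from a Kafka partition;
// since the messages are only file pointers, this caps files per batch.
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zkHosts, topicName);
spoutConf.fetchSizeBytes = 64 * 1024;     // example value, tune to the metadata size

// Keep only a bounded number of batches in flight at once.
Config conf = new Config();
conf.setMaxSpoutPending(2);               // example value

topology.newStream("tracking-file-processor", new OpaqueTridentKafkaSpout(spoutConf))
    .parallelismHint(1)                   // one spout executor per topic partition
    .shuffle()                            // spread the file pointers before reading S3
    .each(new Fields("str"), new S3Reader(),
          new Fields("tracking_num", "json", "record_type"))
    .parallelismHint(10)                  // example: spread file reads over more executors
    .partitionPersist(stateFactory,
          new Fields("tracking_num", "json", "record_type"),
          new TridentKafkaUpdater(), new Fields());

My thinking is that with the shuffle before the each, files arriving on the same Kafka partition still get read by different executors, and maxSpoutPending bounds how many partially transformed files are in memory at once.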
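
For question 4, my current understanding is that I would swap the Kafka state for my own State / StateFactory / StateUpdater and keep the partitionPersist call the same. Here is a rough, untested sketch; JdbcState, writeBatch, and the factory are made-up names for illustration, and the package names again assume pre-1.0 Storm.

import java.util.List;
import java.util.Map;
import backtype.storm.task.IMetricsContext;
import storm.trident.operation.TridentCollector;
import storm.trident.state.BaseStateUpdater;
import storm.trident.state.State;
import storm.trident.state.StateFactory;
import storm.trident.tuple.TridentTuple;

// State wrapping the external store; beginCommit/commit bracket each Trident batch.
public class JdbcState implements State {
    @Override
    public void beginCommit(Long txid) { /* e.g. open a transaction */ }

    @Override
    public void commit(Long txid) { /* e.g. commit the transaction */ }

    public void writeBatch(List<TridentTuple> tuples) {
        // batch-insert the tuples into the database here
    }

    public static class Factory implements StateFactory {
        @Override
        public State makeState(Map conf, IMetricsContext metrics,
                               int partitionIndex, int numPartitions) {
            return new JdbcState(); // one state instance per partition
        }
    }
}

// Updater that partitionPersist calls with each batch's tuples for this partition.
class JdbcUpdater extends BaseStateUpdater<JdbcState> {
    @Override
    public void updateState(JdbcState state, List<TridentTuple> tuples,
                            TridentCollector collector) {
        state.writeBatch(tuples);
    }
}

The last step of the topology would then become partitionPersist(new JdbcState.Factory(), new Fields("tracking_num", "json", "record_type"), new JdbcUpdater(), new Fields()) in place of the Kafka state.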
Thanks in advance
Sherwin