Hi,
I would like to use Trident to process files and need some
validation/advice on whether I am implementing this correctly.
Project background: I need to process tracking information from carriers
such as UPS, USPS, etc. The data arrives in different file formats (CSV,
EDI, etc.).
1. The files are downloaded from FTP servers by Apache Camel and uploaded
to S3; Camel also puts a message on Kafka with the file details and
location (a rough sketch of the route follows this list).
2. Next I have a Trident topology with an OpaqueTridentKafkaSpout that
reads the file messages from Kafka, followed by a function that processes
the file.
3. Next I need to collect records with the same tracking number. For now I
am using Redis, with the tracking number as the key and a JSON structure
representing multiple scan events as the value. The scan data for a given
tracking number can be contained in the same file or spread over multiple
files across multiple days.
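
For context, the Camel leg looks roughly like the sketch below. This is a
simplification, not my exact route: the endpoint URIs, credentials,
bucket/topic names, and the JSON message body are all placeholders, and
some option names vary between Camel versions.

import org.apache.camel.builder.RouteBuilder;

public class TrackingFileRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        // Poll the carrier FTP server for new tracking files.
        from("ftp://user@carrier-ftp.example.com/outbound?password=secret&delete=true")
            // Use the FTP file name as the S3 object key.
            .setHeader("CamelAwsS3Key", simple("${file:name}"))
            .to("aws-s3://tracking-files?accessKey=xxx&secretKey=yyy")
            // Publish the file details and location for the Trident topology.
            .setBody(simple("{\"bucket\":\"tracking-files\",\"key\":\"${file:name}\"}"))
            .to("kafka:tracking-file-metadata?brokers=kafka-host:9092");
    }
}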
My topology looks something like this:
TridentState state = topology
    .newStream("tracking-file-metadata", FileNameSpout.fileNameSpout(zkHosts, topicName))
    // parallelism should match the number of partitions of the topic
    .parallelismHint(1)
    // stream the file from S3 and emit one tuple per parsed record
    .each(new Fields("str"), new S3Reader(),
          new Fields("tracking_num", "json", "record_type"))
    // look up any existing JSON blob for each tracking number
    .stateQuery(trackingNoToShippingInfo,
          new Fields("tracking_num", "json", "record_type"),
          new RedisQueryFunction(),
          new Fields("redis_key1", "transformed_json1"))
    // write the merged blob back to Redis with a TTL (check whether
    // withExpire expects seconds or milliseconds in your storm-redis version)
    .partitionPersist(redisStateFactory,
          new Fields("redis_key1", "transformed_json1"),
          new RedisStateUpdater(redisStoreMapper).withExpire(86400000),
          new Fields("outputKey", "outputValue"))
    .parallelismHint(3);
A couple of questions:
1. The S3Reader() function reads an input stream and parses the file one
record at a time, so the memory footprint stays low because the entire
file is never read into memory. My question is: when S3Reader emits, will
it emit all the records in the file as a single batch? In the next step I
first need to query Redis to check whether the tracking number already
exists; if it does, I need to append to the existing JSON blob, and if it
does not, I need to create a new JSON blob. The Redis query function takes
a group of keys (a sketch of what I have in mind follows).
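
For reference, this is roughly what I have in mind for RedisQueryFunction.
It is a sketch only: package names assume Storm 1.x, TrackingRedisState is
a hypothetical State wrapper around Jedis (not part of storm-redis), and
the JSON merge is a placeholder.

import java.util.ArrayList;
import java.util.List;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.state.BaseQueryFunction;
import org.apache.storm.trident.state.State;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;
import redis.clients.jedis.Jedis;

// Hypothetical State wrapper exposing a bulk get over Jedis.
class TrackingRedisState implements State {
    private final Jedis jedis;
    TrackingRedisState(Jedis jedis) { this.jedis = jedis; }
    List<String> mget(String... keys) { return jedis.mget(keys); }
    @Override public void beginCommit(Long txid) {}
    @Override public void commit(Long txid) {}
}

public class RedisQueryFunction extends BaseQueryFunction<TrackingRedisState, String> {
    @Override
    public List<String> batchRetrieve(TrackingRedisState state, List<TridentTuple> args) {
        // One bulk lookup for the whole batch of tracking numbers.
        List<String> keys = new ArrayList<String>();
        for (TridentTuple t : args) {
            keys.add(t.getStringByField("tracking_num"));
        }
        return state.mget(keys.toArray(new String[0]));
    }

    @Override
    public void execute(TridentTuple tuple, String existingJson, TridentCollector collector) {
        String key = tuple.getStringByField("tracking_num");
        String scanJson = tuple.getStringByField("json");
        // Append to the existing blob if the key exists, otherwise start a new one.
        String merged = (existingJson == null) ? scanJson : merge(existingJson, scanJson);
        collector.emit(new Values(key, merged));
    }

    private String merge(String existing, String scan) {
        // Placeholder; real code would parse both JSON blobs and merge them.
        return existing + scan;
    }
}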
2. Am I using partitionPersist correctly? That is, when I have multiple
bolts running in parallel, will there be a race condition between
different bolts querying Redis for the same tracking number? Or will the
Trident API ensure that components running in parallel query Redis in
order, so that there are no inconsistent reads/writes? (The
re-partitioning sketch below is what I would add myself if Trident does
not guarantee this.)
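
In case it matters, this is the kind of explicit re-partitioning I could
add between each() and stateQuery() so that all tuples for a given
tracking number land on the same partition. A sketch only: parsedRecords
is just my name for the stream coming out of each(), and whether this is
actually necessary is exactly what I am asking.

import org.apache.storm.trident.Stream;
import org.apache.storm.tuple.Fields;

// Route every scan for a given tracking number to the same partition,
// so only one task ever reads and writes that Redis key.
Stream byTrackingNum = parsedRecords.partitionBy(new Fields("tracking_num"));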
Thanks