Hi Sherwin,
For 1) All the events emitted should be part of the same batch, i.e. share the same tx id. You can
print the tx_ids to verify that.
2) You can ensure that by using a partitionBy operation, so that all tuples for the
same tracking number go to the same partition/bolt; that gives you the ordering
property you need.
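For example, a rough sketch against the topology you posted below, keeping your field and class names (not tested):

    topology.newStream("tracking-file-metadata",
            FileNameSpout.fileNameSpout(zkHosts, topicName))
        .each(new Fields("str"), new S3Reader(),
              new Fields("tracking_num", "json", "record_type"))
        // route every tuple with the same tracking_num to the same partition
        .partitionBy(new Fields("tracking_num"))
        .stateQuery(trackingNoToShippingInfo,
              new Fields("tracking_num", "json", "record_type"),
              new RedisQueryFunction(),
              new Fields("redis_key1", "transformed_json1"))
        .partitionPersist(redisStateFactory,
              new Fields("redis_key1", "transformed_json1"),
              new RedisStateUpdater(redisStoreMapper).withExpire(86400000),
              new Fields("outputKey", "outputValue"));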
I would also suggest running with debug flags on and following the tuples.
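Something like this (a minimal sketch; the topology name here is just a placeholder, and exact package names depend on your Storm version):

    Config conf = new Config();
    conf.setDebug(true); // log every emitted/acked tuple in the worker logs
    StormSubmitter.submitTopology("tracking-topology", conf, topology.build());

    // You can also tap the stream with the built-in Debug filter to print tuples:
    // stream.each(new Fields("tracking_num", "json", "record_type"), new Debug());
    // The txid of each batch is also passed to your State's beginCommit(Long txid)
    // and commit(Long txid), which is a convenient place to log it.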
-Nikhil
On Sunday, July 17, 2016 5:18 PM, Sherwin Pinto <[email protected]> wrote:
Hi All,
Any help with this would be much appreciated
Thanks
Sherwin
On Jun 26, 2016, at 10:57 AM, Sherwin Pinto <[email protected]> wrote:
Hi,
I would like to use Trident to process files and need some validation/advice
as to whether I am implementing this correctly.
Project Background: I need to process tracking information from carriers like
UPS, USPS, etc. The data comes in different file formats like CSV, EDI, etc.
1. The files are downloaded from FTP servers by Apache Camel and put onto S3; Camel
also puts a message on Kafka (file details and location).
2. Next I have a Trident topology with an OpaqueKafkaSpout that reads the file messages
from Kafka, followed by a function that processes the file (a rough sketch of the spout
setup follows this list).
3. Next I need to collect records with the same tracking #; for now I am using Redis with
the tracking # as the key and a JSON structure representing multiple scan events as the
value. The scan data for a given tracking # can be contained in the same file or spread
over multiple files over multiple days.
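For context, the spout behind FileNameSpout.fileNameSpout is wired up roughly like this (sketch only; the ZooKeeper address is a placeholder and the class names are from the pre-1.0 storm-kafka module):

    BrokerHosts zkHosts = new ZkHosts("zk1.example.com:2181");      // placeholder address
    TridentKafkaConfig spoutConf = new TridentKafkaConfig(zkHosts, topicName);
    spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme()); // emits the single field "str"
    OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);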
My topology looks something like this:

    TridentState state = topology.newStream("tracking-file-metadata",
            FileNameSpout.fileNameSpout(zkHosts, topicName))
        .parallelismHint(1) // parallelism should be the number of partitions of the topic
        .each(new Fields("str"), new S3Reader(),
              new Fields("tracking_num", "json", "record_type"))
        .stateQuery(trackingNoToShippingInfo,
              new Fields("tracking_num", "json", "record_type"),
              new RedisQueryFunction(),
              new Fields("redis_key1", "transformed_json1"))
        .partitionPersist(redisStateFactory,
              new Fields("redis_key1", "transformed_json1"),
              new RedisStateUpdater(redisStoreMapper).withExpire(86400000),
              new Fields("outputKey", "outputValue"))
        .parallelismHint(3);
A couple of questions:
1. The function S3Reader() reads an input stream and parses the file one record at a
time; the memory footprint is kept low since the entire file is never read into memory.
My question is: when S3Reader emits, will it emit all the records in the file as a
single batch? In the next step I need to first query Redis to check whether the tracking
number already exists; if it does, I need to append to the existing JSON blob, and if it
does not, I need to create a new JSON blob. The Redis query function takes a group of
keys (a rough sketch of what I mean is below question 2).
2. Am I using partitionPersist correctly? i.e., when I have multiple bolts running in
parallel, will there be a race condition between different bolts querying Redis for the
same tracking #? Or will the Trident API ensure that the components running in parallel
query Redis in order, so that there are no inconsistent reads/writes?
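For reference, here is a minimal sketch of the kind of query function I mean in question 1 (not my actual code; RedisScanState, mget, newBlob and appendToBlob are hypothetical stand-ins for my Redis state and JSON helpers, and imports from trident/storm-redis are omitted):

    public class RedisQueryFunction extends BaseQueryFunction<RedisScanState, String> {

        // Called once per batch: look up all tracking numbers in one round trip.
        @Override
        public List<String> batchRetrieve(RedisScanState state, List<TridentTuple> tuples) {
            List<String> keys = new ArrayList<>();
            for (TridentTuple t : tuples) {
                keys.add(t.getStringByField("tracking_num"));
            }
            return state.mget(keys); // hypothetical bulk GET; null where the key is absent
        }

        // Called once per tuple with the corresponding result from batchRetrieve.
        @Override
        public void execute(TridentTuple tuple, String existingJson, TridentCollector collector) {
            String incoming = tuple.getStringByField("json");
            String merged = (existingJson == null)
                    ? newBlob(incoming)                     // tracking # not seen before
                    : appendToBlob(existingJson, incoming); // append scan to existing blob
            collector.emit(new Values(tuple.getStringByField("tracking_num"), merged));
        }
    }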
Thanks,
Sherwin