Hi Sherwin,For 1) All the events emitted should be part of same tx id.. You can 
print the tx_ids to verify that.
2) You can ensure that by using partitionBy operation where all the tuples for 
same tracking number will go to the same bolt and then you can ensure that 
property.
I will suggest that you run with debug flags on and follow the tuples.
-Nikhil

    On Sunday, July 17, 2016 5:18 PM, Sherwin Pinto <[email protected]> wrote:
 

 Hi All,
Any help with this would be much appreciated
Thanks
Sherwin

On Jun 26, 2016, at 10:57 AM, Sherwin Pinto <[email protected]> wrote:
Hi ,
I would like to use Trident to process files and needed some validation/advice 
as to whether I am implementing this correct.
Project Background: I need to process tracking information from carriers like 
UPS, USPS etc. The data is contained in different file formats like CSV, EDI 
etc. 1. The files are downloaded from FTP servers by apache Camel and put on to 
S3, Camel also puts a message on Kafka (file details and location)2. Next I 
have a Trident Topology with an OpaqueKafkaSpout that reads the file messages 
from Kafka, followed by a function that processes the file.3. Next I need to 
collect records with the same tracking # , for now using Redis with tracking # 
as the key and a JSON structure representing multiple scan events. The scan 
data for a given tracking # can be contained in the same file or spread over 
multiple files over multiple days
My topology looks something like this
Stream stream=topology.newStream("tracking-file-metadata", 
FileNameSpout.fileNameSpout(zkHosts,topicName))         
.parallelismHint(1)//parallelism should be number of partitions of topic        
.each(new Fields("str"),new S3Reader(), new Fields("tracking_num", "json", 
"record_type”)).stateQuery(trackingNoToShippingInfo, new Fields("tracking_num", 
"json", "record_type”), new RedisQueryFunction(), new 
Fields("redis_key1","transformed_json1")).partitionPersist(redisStateFactory, 
new Fields("redis_key1", "transformed_json1"), new 
RedisStateUpdater(redisStoreMapper).withExpire(86400000), 
newFields("outputKey", "outputValue"));                .parallelismHint(3)
a couple of Questions
1. The function S3Reader(), reads an input stream and parses the files,  one 
record at a time, memory foot print is kept low since the entire file is not 
read into memory. My question is that when the S3Function emits, will it emit 
all the records in the file as a single batch ?. Since in the next step I need 
to first query Redis to check if the tracking number exists, if it does need to 
not append to the son blob, if not exists need to create a new JSON blob. The 
redis query function takes a group of keys
2. Am i using partitionPersist correctly ? i.e. When I have multiple bolts 
running in parallel will there be a race condition between different bolts 
querying redis (for the same tracking #) ? or will the trident api ensure that 
various components running in parallel will query redis, in order, so that 
there are no inconsistent reads/writes 
Thanks Sherwin



  

Reply via email to