Hi All,

Any help with this would be much appreciated.

Thanks

Sherwin
> On Jun 26, 2016, at 10:57 AM, Sherwin Pinto <[email protected]> wrote:
> 
> Hi,
> 
> 
> 
> I would like to use Trident to process files and need some
> validation/advice as to whether I am implementing this correctly.
> 
> 
> 
> Project background: I need to process tracking information from carriers
> such as UPS, USPS, etc. The data arrives in different file formats such as
> CSV, EDI, etc.
> 
> 1. The files are downloaded from FTP servers by Apache Camel and put onto
> S3. Camel also puts a message on Kafka with the file details and location
> (see the sketch after this list).
> 
> 2. Next I have a Trident topology with an OpaqueKafkaSpout that reads the
> file messages from Kafka, followed by a function that processes each file.
> 
> 3. Next I need to collect records with the same tracking #. For now I am
> using Redis, with the tracking # as the key and a JSON structure
> representing multiple scan events as the value. The scan data for a given
> tracking # can be contained in a single file or spread across multiple
> files over multiple days.
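> 
> For concreteness, the shapes I have in mind look roughly like this (the
> bucket name, keys, and field names below are illustrative, not a final
> schema):
> 
> Kafka file message written by Camel:
> 
>     {"bucket": "carrier-files",
>      "key": "ups/2016-06-25/scans.csv",
>      "carrier": "UPS", "format": "CSV"}
> 
> Redis value for a tracking # key:
> 
>     {"trackingNum": "1Z999AA10123456784",
>      "scans": [ {"ts": "2016-06-25T08:00:00Z",
>                  "status": "IN_TRANSIT",
>                  "location": "Louisville, KY"} ]}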
> 
> 
> 
> My topology looks something like this:
> 
> 
> 
> Stream stream = topology.newStream("tracking-file-metadata",
>         FileNameSpout.fileNameSpout(zkHosts, topicName))
>     // parallelism should be the number of partitions of the topic
>     .parallelismHint(1)
>     .each(new Fields("str"), new S3Reader(),
>         new Fields("tracking_num", "json", "record_type"));
> 
> stream
>     .stateQuery(trackingNoToShippingInfo,
>         new Fields("tracking_num", "json", "record_type"),
>         new RedisQueryFunction(),
>         new Fields("redis_key1", "transformed_json1"))
>     .partitionPersist(redisStateFactory,
>         new Fields("redis_key1", "transformed_json1"),
>         new RedisStateUpdater(redisStoreMapper).withExpire(86400000),
>         new Fields("outputKey", "outputValue"))
>     .parallelismHint(3);
> 
> 
> 
> A couple of questions:
> 
> 
> 
> 1. The function S3Reader() reads an input stream and parses the file one
> record at a time; the memory footprint stays low since the entire file is
> never read into memory. My question: when S3Reader emits, will it emit all
> the records in the file as a single batch? This matters because in the
> next step I first query Redis to check whether the tracking number already
> exists; if it does, I need to append the new scan event to the existing
> JSON blob, and if it does not, I need to create a new JSON blob. The Redis
> query function takes a group of keys. (A stripped-down sketch of my
> S3Reader is below.)
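> 
> In case it is useful, here is a minimal sketch of my S3Reader, assuming
> Storm 1.x package names and the AWS SDK v1 client; the "bucket,key"
> message format and the naive CSV split are placeholders for my real
> message schema and per-format parsers:
> 
> import java.io.BufferedReader;
> import java.io.IOException;
> import java.io.InputStreamReader;
> import java.util.Map;
> 
> import org.apache.storm.trident.operation.BaseFunction;
> import org.apache.storm.trident.operation.TridentCollector;
> import org.apache.storm.trident.operation.TridentOperationContext;
> import org.apache.storm.trident.tuple.TridentTuple;
> import org.apache.storm.tuple.Values;
> 
> import com.amazonaws.services.s3.AmazonS3;
> import com.amazonaws.services.s3.AmazonS3Client;
> import com.amazonaws.services.s3.model.S3Object;
> 
> public class S3Reader extends BaseFunction {
> 
>     private transient AmazonS3 s3;
> 
>     @Override
>     public void prepare(Map conf, TridentOperationContext context) {
>         // Credentials come from the default provider chain.
>         s3 = new AmazonS3Client();
>     }
> 
>     @Override
>     public void execute(TridentTuple tuple, TridentCollector collector) {
>         // One input tuple == one file message from Kafka; here I assume
>         // the "str" field is simply "bucket,key" (placeholder).
>         String[] loc = tuple.getString(0).split(",", 2);
>         try (S3Object obj = s3.getObject(loc[0], loc[1]);
>              BufferedReader reader = new BufferedReader(
>                      new InputStreamReader(obj.getObjectContent()))) {
>             String line;
>             // Stream the object: parse and emit one record at a time so
>             // the whole file is never held in memory.
>             while ((line = reader.readLine()) != null) {
>                 // Naive CSV split just for the sketch; real parsing is
>                 // per-format (CSV, EDI, ...).
>                 String[] cols = line.split(",");
>                 String trackingNum = cols[0];
>                 String recordType = cols[1];
>                 String json = "{\"trackingNum\":\"" + trackingNum + "\"}";
>                 collector.emit(new Values(trackingNum, json, recordType));
>             }
>         } catch (IOException e) {
>             throw new RuntimeException("Failed to read " + tuple.getString(0), e);
>         }
>     }
> }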
> 
> 
> 
> 2. Am I using partitionPersist correctly? That is, when I have multiple
> bolts running in parallel, will there be a race condition between
> different bolts querying Redis for the same tracking #? Or will the
> Trident API ensure that components running in parallel query Redis in
> order, so that there are no inconsistent reads/writes? (The sketch below
> shows the kind of re-partitioning I am wondering whether I need.)
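> 
> For example, this is the kind of re-partitioning I am considering (a
> minimal sketch, assuming partitionBy hash-partitions on the given fields
> so all scans for one tracking # go to the same task):
> 
> // Hash-partition on tracking_num before querying/persisting, so that all
> // scan events for a given tracking # are handled by the same task and
> // the read-modify-write against Redis is serialized per key.
> stream
>     .partitionBy(new Fields("tracking_num"))
>     .stateQuery(trackingNoToShippingInfo,
>         new Fields("tracking_num", "json", "record_type"),
>         new RedisQueryFunction(),
>         new Fields("redis_key1", "transformed_json1"))
>     .partitionPersist(redisStateFactory,
>         new Fields("redis_key1", "transformed_json1"),
>         new RedisStateUpdater(redisStoreMapper).withExpire(86400000),
>         new Fields("outputKey", "outputValue"));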
> 
> 
> 
> Thanks 
> 
> Sherwin
> 
