Hi all, any help with this would be much appreciated.
Thanks
Sherwin

> On Jun 26, 2016, at 10:57 AM, Sherwin Pinto <[email protected]> wrote:
>
> Hi,
>
> I would like to use Trident to process files and needed some
> validation/advice as to whether I am implementing this correctly.
>
> Project background: I need to process tracking information from carriers
> like UPS, USPS, etc. The data is contained in different file formats
> (CSV, EDI, etc.).
>
> 1. The files are downloaded from FTP servers by Apache Camel and put on
>    S3; Camel also puts a message on Kafka (file details and location).
>
> 2. Next I have a Trident topology with an OpaqueKafkaSpout that reads
>    the file messages from Kafka, followed by a function that processes
>    the file.
>
> 3. Next I need to collect records with the same tracking #; for now I am
>    using Redis with the tracking # as the key and a JSON structure
>    representing multiple scan events. The scan data for a given
>    tracking # can be contained in the same file or spread over multiple
>    files over multiple days.
>
> My topology looks something like this:
>
>     Stream stream = topology.newStream("tracking-file-metadata",
>             FileNameSpout.fileNameSpout(zkHosts, topicName))
>         // parallelism should be the number of partitions of the topic
>         .parallelismHint(1)
>         .each(new Fields("str"), new S3Reader(),
>             new Fields("tracking_num", "json", "record_type"))
>         .stateQuery(trackingNoToShippingInfo,
>             new Fields("tracking_num", "json", "record_type"),
>             new RedisQueryFunction(),
>             new Fields("redis_key1", "transformed_json1"))
>         .partitionPersist(redisStateFactory,
>             new Fields("redis_key1", "transformed_json1"),
>             new RedisStateUpdater(redisStoreMapper).withExpire(86400000),
>             new Fields("outputKey", "outputValue"))
>         .parallelismHint(3);
>
> A couple of questions:
>
> 1. The function S3Reader() reads an input stream and parses the file one
>    record at a time; the memory footprint is kept low since the entire
>    file is not read into memory.
>    My question: when S3Reader emits, will it emit all the records in
>    the file as a single batch? In the next step I need to first query
>    Redis to check whether the tracking number exists; if it does, I
>    need to append to the existing JSON blob, and if not, I need to
>    create a new JSON blob. The Redis query function takes a group of
>    keys.
>
> 2. Am I using partitionPersist correctly? That is, when I have multiple
>    bolts running in parallel, will there be a race condition between
>    different bolts querying Redis for the same tracking #? Or will the
>    Trident API ensure that components running in parallel query Redis
>    in order, so that there are no inconsistent reads/writes?
>
> Thanks
> Sherwin
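To make question 1 concrete, the append-or-create logic I have in mind looks roughly like this. This is a minimal sketch only: `ScanEventMerger` and `merge` are made-up names, a `HashMap` stands in for Redis, and the scan events are kept as a plain list rather than a serialized JSON blob.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ScanEventMerger {

    // Stand-in for Redis: tracking number -> accumulated scan events.
    // In the real topology this lookup/update would happen inside
    // RedisQueryFunction / RedisStateUpdater.
    private final Map<String, List<String>> store = new HashMap<>();

    // Append-or-create: if the tracking number already has a blob,
    // append the new scan event to it; otherwise start a new one.
    public List<String> merge(String trackingNum, String scanEvent) {
        List<String> events =
                store.computeIfAbsent(trackingNum, k -> new ArrayList<>());
        events.add(scanEvent);
        return events;
    }

    public static void main(String[] args) {
        ScanEventMerger m = new ScanEventMerger();
        m.merge("1Z999", "ORIGIN_SCAN");       // creates the blob
        m.merge("1Z999", "OUT_FOR_DELIVERY");  // appends to it
        System.out.println(m.merge("1Z999", "DELIVERED").size()); // prints 3
    }
}
```

The worry in question 2 is exactly this read-modify-write cycle: with a single-threaded sketch it is trivially safe, but with several executors doing it against a shared Redis instance the read and the write are no longer atomic.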
