Hi Nikhil 

Thanks for your response

I’m sorry I replied on wrong thread, I have an updated question. I made some 
progress since then, I currently have 2 topologies. the first one is pretty 
straight forward as follows. It basically reads a file from S3, transforms to a 
general schema and writes to Kafka

TridentState kafkaState=topology.newStream("tracking-file-processor1", 
FileNameSpout.opaqueKafkaSpout(zkHosts,topicName)) //parallelism should be 
number of partitions of topic
                                  .parallelismHint(1)
                                  .each(new Fields("str"),new 
S3FedexCSVReader(), new Fields(“record_id", "json", "record_type"))
                                  .shuffle()
                                  .partitionPersist(stateFactory, new 
Fields("record_id", "json", "record_type"), new TridentKafkaUpdater(), new 
Fields())

Kafka contains a message with the meta data of file on S3 mainly bucket info 
etc. KafkaSpout will read the messages and pass them on the S3Reader.

My main concern in this is 

1. The files are of varying sizes and could be close to 500Mb, the S3Reader 
function will stream the file from S3, read 1 record of the file at time, emit 
one record , trident will batch them before doing the partitionPersist, so 
basically the entire file would be in memory ? While processing multiple files 
(messageltiple kafka partitions) the memory requirement will increase ? Do i 
just parallelize and spread spout instances over multiple workers (i.e. if I 
have 2 kafka partitions, the spout will have 2 threads, can I split this over 2 
workers)  or is there a better way ?
2. This also means that the batch being written to kafka can vary in size and 
maybe quite large, is this acceptable ?
3. If i do need to write to a data source other than kafka, such as a regular 
db (most likely will be kafka but just want to gain some more knowledge) what 
would be the best way to do this ?

My second topology reads from Kafka, queries Redis which holds state (i.e. 
history of a particular tracking#), creates a summary stores in Redis and then 
writes back to kafka. It looks something like

TridentState stream=topology.newStream("tracking-file-metadata1", 
FileNameSpout.opaqueKafkaSpout(zkHosts,topicName)) //parallelism should be 
number of partitions of topic
                        .parallelismHint(1)
                        .shuffle()
                                  .each(new Fields("str"),new 
RecordTransformer(), new Fields("tracking_num", "json", 
"record_type","carrier_id"))
                                .stateQuery(trackingNoToShippingInfo, new 
Fields("tracking_num"), new RedisQueryFunction(), new 
Fields("redis_existing_cal”))
                                .partitionBy(redis_existing_cal)
                                .each(new Fields("tracking_num", 
"redis_existing_cal","json"), new RedisKeyValMerge(), new 
Fields("redis_key2","val","currentJson1"))
                                .partitionPersist(redisStateFactory, new 
Fields("redis_key2", "val", "currentJson1"), new 
RedisStateUpdater(redisStoreMapper).withExpire(86400000), new 
Fields("redis_key2","currentJson2"))
                                .newValuesStream()
                                .partitionPersist(stateFactory, new 
Fields("redis_key2", "currentJson2"), new TridentKafkaUpdater(), new Fields());

My main concern here is if I get 2 records of the same tracking # in the same 
batch. The state query will return the existing value for the given tracking#, 
RedisKeyValMerge function will then create a history for the tracking# which is 
stored in Redis, the issue is that if 2 records with the same tracking # appear 
in the batch then the update to redis will not be correct since one will 
override the other. 

Can I use partitionAggregate in some way ? When I aggregate over the same 
tracking #. For example have a map with key tracking#  and value as tracking 
history. In this way If 2 records with the same tracking # appear in the same 
batch then I can build a proper history object .

Is this possible, if yes then can you point me to some partitionAggregate 
example which uses a map to aggregate ?

Any help with this would be much appreciated

Thanks in advance 

Sherwin





> On Jul 17, 2016, at 7:56 PM, Nikhil Singh <[email protected]> wrote:
> 
> Hi Sherwin,
> For 1) All the events emitted should be part of same tx id.. You can print 
> the tx_ids to verify that.
> 
> 2) You can ensure that by using partitionBy operation where all the tuples 
> for same tracking number will go to the same bolt and then you can ensure 
> that property.
> 
> I will suggest that you run with debug flags on and follow the tuples.
> 
> -Nikhil
> 
> 
> On Sunday, July 17, 2016 5:18 PM, Sherwin Pinto <[email protected]> wrote:
> 
> 
> Hi All,
> 
> Any help with this would be much appreciated
> 
> Thanks
> 
> Sherwin
>> On Jun 26, 2016, at 10:57 AM, Sherwin Pinto <[email protected]> wrote:
>> 
>> Hi ,
>> 
>> I would like to use Trident to process files and needed some 
>> validation/advice as to whether I am implementing this correct.
>> 
>> Project Background: I need to process tracking information from carriers 
>> like UPS, USPS etc. The data is contained in different file formats like 
>> CSV, EDI etc. 
>> 1. The files are downloaded from FTP servers by apache Camel and put on to 
>> S3, Camel also puts a message on Kafka (file details and location)
>> 2. Next I have a Trident Topology with an OpaqueKafkaSpout that reads the 
>> file messages from Kafka, followed by a function that processes the file.
>> 3. Next I need to collect records with the same tracking # , for now using 
>> Redis with tracking # as the key and a JSON structure representing multiple 
>> scan events. The scan data for a given tracking # can be contained in the 
>> same file or spread over multiple files over multiple days
>> 
>> My topology looks something like this
>> 
>> Stream stream=topology.newStream("tracking-file-metadata", 
>> FileNameSpout.fileNameSpout(zkHosts,topicName)) 
>>         .parallelismHint(1)//parallelism should be number of partitions of 
>> topic
>>         .each(new Fields("str"),new S3Reader(), new Fields("tracking_num", 
>> "json", "record_type”))
>> .stateQuery(trackingNoToShippingInfo, new Fields("tracking_num", "json", 
>> "record_type”), new RedisQueryFunction(), new 
>> Fields("redis_key1","transformed_json1"))
>> .partitionPersist(redisStateFactory, new Fields("redis_key1", 
>> "transformed_json1"), new 
>> RedisStateUpdater(redisStoreMapper).withExpire(86400000), 
>> newFields("outputKey", "outputValue"));
>>                 .parallelismHint(3)
>> 
>> a couple of Questions
>> 
>> 1. The function S3Reader(), reads an input stream and parses the files,  one 
>> record at a time, memory foot print is kept low since the entire file is not 
>> read into memory. My question is that when the S3Function emits, will it 
>> emit all the records in the file as a single batch ?. Since in the next step 
>> I need to first query Redis to check if the tracking number exists, if it 
>> does need to not append to the son blob, if not exists need to create a new 
>> JSON blob. The redis query function takes a group of keys
>> 
>> 2. Am i using partitionPersist correctly ? i.e. When I have multiple bolts 
>> running in parallel will there be a race condition between different bolts 
>> querying redis (for the same tracking #) ? or will the trident api ensure 
>> that various components running in parallel will query redis, in order, so 
>> that there are no inconsistent reads/writes 
>> 
>> Thanks 
>> Sherwin
> 
> 
> 

Reply via email to