Hi Nikhil
Thanks for your response.
I’m sorry, I replied on the wrong thread; I have an updated question. I have
made some progress since then and currently have two topologies. The first one
is pretty straightforward: it reads a file from S3, transforms it to a general
schema, and writes to Kafka.
TridentState kafkaState = topology.newStream("tracking-file-processor1",
        FileNameSpout.opaqueKafkaSpout(zkHosts, topicName))
    .parallelismHint(1) // parallelism should be the number of partitions of the topic
    .each(new Fields("str"), new S3FedexCSVReader(),
        new Fields("record_id", "json", "record_type"))
    .shuffle()
    .partitionPersist(stateFactory,
        new Fields("record_id", "json", "record_type"),
        new TridentKafkaUpdater(), new Fields());
Kafka contains a message with the metadata of the file on S3 (mainly bucket
info, etc.). The KafkaSpout reads these messages and passes them on to the
S3 reader function.
My main concerns with this are:
1. The files are of varying sizes and can be close to 500 MB. The S3 reader
function streams the file from S3, reads one record of the file at a time, and
emits one record at a time; Trident will batch them before doing the
partitionPersist, so basically the entire file would be in memory? While
processing multiple files (multiple Kafka partitions) the memory requirement
will increase? Do I just parallelize and spread spout instances over multiple
workers (i.e. if I have 2 Kafka partitions the spout will have 2 threads, can I
split these over 2 workers; see the first sketch after this list), or is there
a better way?
2. This also means that the batch being written to Kafka can vary in size and
may be quite large; is this acceptable?
3. If I do need to write to a data source other than Kafka, such as a regular
database (it will most likely be Kafka, but I just want to gain some more
knowledge), what would be the best way to do this? (See the second sketch
after this list.)
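For question 1, this is roughly how I was thinking of spreading the spout over
multiple workers and bounding the number of in-flight batches (just a sketch,
assuming Storm 1.x package names; the submitter class and
buildTrackingTopology() are placeholders for my own topology-building code):

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;

public class SubmitTrackingTopology {
    public static void main(String[] args) throws Exception {
        // placeholder for the first topology shown above, with
        // .parallelismHint(2) on the spout to match the 2 Kafka partitions
        StormTopology stormTopology = buildTrackingTopology();

        Config conf = new Config();
        conf.setNumWorkers(2);       // one worker per Kafka partition
        conf.setMaxSpoutPending(1);  // at most one un-acked batch per spout at a time
        StormSubmitter.submitTopology("tracking-file-processor1", conf, stormTopology);
    }

    private static StormTopology buildTrackingTopology() {
        // build the TridentTopology from the code above and return tridentTopology.build()
        throw new UnsupportedOperationException("not implemented in this sketch");
    }
}

My understanding is that setNumWorkers spreads the spout executors over
separate JVMs, so each worker would only hold the batches for its own
partition(s) in memory; please correct me if that is wrong.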
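For question 3, this is the rough shape I had in mind for a non-Kafka sink: a
custom State/StateFactory/StateUpdater that partitionPersist could use instead
of the TridentKafkaUpdater. It is only a sketch; the table, columns and JDBC
URL are made up, and I realize storm-jdbc's JdbcState may already cover this
case:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.List;
import java.util.Map;

import org.apache.storm.task.IMetricsContext;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.state.BaseStateUpdater;
import org.apache.storm.trident.state.State;
import org.apache.storm.trident.state.StateFactory;
import org.apache.storm.trident.tuple.TridentTuple;

public class DbState implements State {
    private Connection conn;

    DbState(String jdbcUrl) {
        try {
            conn = DriverManager.getConnection(jdbcUrl);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // beginCommit/commit bracket each Trident batch; a transactional sink would
    // open and commit a DB transaction here to preserve Trident's batch semantics.
    @Override public void beginCommit(Long txid) { }
    @Override public void commit(Long txid) { }

    void write(List<TridentTuple> tuples) {
        try (PreparedStatement ps = conn.prepareStatement(
                "INSERT INTO tracking_events (record_id, json) VALUES (?, ?)")) {
            for (TridentTuple t : tuples) {
                ps.setString(1, t.getStringByField("record_id"));
                ps.setString(2, t.getStringByField("json"));
                ps.addBatch();
            }
            ps.executeBatch();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static class Factory implements StateFactory {
        @Override
        public State makeState(Map conf, IMetricsContext ctx, int partitionIndex, int numPartitions) {
            return new DbState("jdbc:postgresql://localhost/tracking"); // placeholder URL
        }
    }

    public static class Updater extends BaseStateUpdater<DbState> {
        @Override
        public void updateState(DbState state, List<TridentTuple> tuples, TridentCollector collector) {
            state.write(tuples);
        }
    }
}

I would then swap it in for the Kafka sink, e.g.:

    .partitionPersist(new DbState.Factory(),
        new Fields("record_id", "json"),
        new DbState.Updater(), new Fields())

Is that the recommended pattern, or is there a better way for DB sinks?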
My second topology reads from Kafka, queries Redis which holds state (i.e. the
history of a particular tracking #), creates a summary, stores it in Redis, and
then writes back to Kafka. It looks something like:
TridentState stream = topology.newStream("tracking-file-metadata1",
        FileNameSpout.opaqueKafkaSpout(zkHosts, topicName))
    .parallelismHint(1) // parallelism should be the number of partitions of the topic
    .shuffle()
    .each(new Fields("str"), new RecordTransformer(),
        new Fields("tracking_num", "json", "record_type", "carrier_id"))
    .stateQuery(trackingNoToShippingInfo, new Fields("tracking_num"),
        new RedisQueryFunction(), new Fields("redis_existing_cal"))
    .partitionBy(new Fields("redis_existing_cal"))
    .each(new Fields("tracking_num", "redis_existing_cal", "json"),
        new RedisKeyValMerge(), new Fields("redis_key2", "val", "currentJson1"))
    .partitionPersist(redisStateFactory,
        new Fields("redis_key2", "val", "currentJson1"),
        new RedisStateUpdater(redisStoreMapper).withExpire(86400000),
        new Fields("redis_key2", "currentJson2"))
    .newValuesStream()
    .partitionPersist(stateFactory, new Fields("redis_key2", "currentJson2"),
        new TridentKafkaUpdater(), new Fields());
My main concern here is what happens if I get two records with the same
tracking # in the same batch. The state query will return the existing value
for the given tracking #, and the RedisKeyValMerge function will then create a
history for the tracking # which is stored in Redis. The issue is that if two
records with the same tracking # appear in the same batch, the update to Redis
will not be correct, since one will override the other.
Can I use partitionAggregate in some way, so that I aggregate over the same
tracking #? For example, keep a map with the tracking # as the key and the
tracking history as the value; that way, if two records with the same
tracking # appear in the same batch, I can still build a proper history object.
Is this possible, and if yes, can you point me to a partitionAggregate example
which uses a map to aggregate?
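To make the question concrete, this is the kind of aggregator I had in mind
(just a sketch: the field names match my topology, but mergeHistory() is a
placeholder for my own JSON merge logic, and I am assuming Storm 1.x package
names). It would sit after a partitionBy on the tracking number, so all
records for the same tracking # land in the same partition, and it would emit
one merged tuple per tracking # per batch:

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

public class TrackingHistoryAggregator extends BaseAggregator<Map<String, String>> {

    @Override
    public Map<String, String> init(Object batchId, TridentCollector collector) {
        // one map per batch partition: tracking_num -> merged history JSON
        return new HashMap<>();
    }

    @Override
    public void aggregate(Map<String, String> state, TridentTuple tuple,
                          TridentCollector collector) {
        String trackingNum = tuple.getStringByField("tracking_num");
        String json = tuple.getStringByField("json");
        state.merge(trackingNum, json, TrackingHistoryAggregator::mergeHistory);
    }

    @Override
    public void complete(Map<String, String> state, TridentCollector collector) {
        // emit one tuple per tracking number seen in this partition of the batch
        for (Map.Entry<String, String> e : state.entrySet()) {
            collector.emit(new Values(e.getKey(), e.getValue()));
        }
    }

    private static String mergeHistory(String existing, String incoming) {
        // placeholder: combine two scan-event JSON blobs into one history blob
        return existing + "," + incoming;
    }
}

In the topology I would then partition by the tracking number and run the
aggregator before the Redis steps, something like:

    .partitionBy(new Fields("tracking_num"))
    .partitionAggregate(new Fields("tracking_num", "json"),
        new TrackingHistoryAggregator(),
        new Fields("tracking_num", "merged_json"))

so that the later stateQuery/partitionPersist only ever sees one tuple per
tracking # per batch. Does that look like the right use of partitionAggregate?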
Any help with this would be much appreciated
Thanks in advance
Sherwin
> On Jul 17, 2016, at 7:56 PM, Nikhil Singh <[email protected]> wrote:
>
> Hi Sherwin,
> For 1) All the events emitted should be part of the same tx id. You can print
> the tx_ids to verify that.
>
> 2) You can ensure that by using partitionBy operation where all the tuples
> for same tracking number will go to the same bolt and then you can ensure
> that property.
>
> I will suggest that you run with debug flags on and follow the tuples.
>
> -Nikhil
>
>
> On Sunday, July 17, 2016 5:18 PM, Sherwin Pinto <[email protected]> wrote:
>
>
> Hi All,
>
> Any help with this would be much appreciated
>
> Thanks
>
> Sherwin
>> On Jun 26, 2016, at 10:57 AM, Sherwin Pinto <[email protected]> wrote:
>>
>> Hi ,
>>
>> I would like to use Trident to process files and needed some
>> validation/advice as to whether I am implementing this correctly.
>>
>> Project Background: I need to process tracking information from carriers
>> like UPS, USPS etc. The data is contained in different file formats like
>> CSV, EDI etc.
>> 1. The files are downloaded from FTP servers by Apache Camel and put onto
>> S3; Camel also puts a message on Kafka (file details and location)
>> 2. Next I have a Trident Topology with an OpaqueKafkaSpout that reads the
>> file messages from Kafka, followed by a function that processes the file.
>> 3. Next I need to collect records with the same tracking #; for now I am
>> using Redis with the tracking # as the key and a JSON structure representing
>> multiple scan events as the value. The scan data for a given tracking # can
>> be contained in the same file or spread over multiple files over multiple
>> days.
>>
>> My topology looks something like this
>>
>> Stream stream = topology.newStream("tracking-file-metadata",
>>         FileNameSpout.fileNameSpout(zkHosts, topicName))
>>     .parallelismHint(1) // parallelism should be number of partitions of topic
>>     .each(new Fields("str"), new S3Reader(),
>>         new Fields("tracking_num", "json", "record_type"))
>>     .stateQuery(trackingNoToShippingInfo,
>>         new Fields("tracking_num", "json", "record_type"),
>>         new RedisQueryFunction(), new Fields("redis_key1", "transformed_json1"))
>>     .partitionPersist(redisStateFactory,
>>         new Fields("redis_key1", "transformed_json1"),
>>         new RedisStateUpdater(redisStoreMapper).withExpire(86400000),
>>         new Fields("outputKey", "outputValue"))
>>     .parallelismHint(3);
>>
>> A couple of questions:
>>
>> 1. The function S3Reader() reads an input stream and parses the files one
>> record at a time; the memory footprint is kept low since the entire file is
>> not read into memory. My question is: when the S3 function emits, will it
>> emit all the records in the file as a single batch? In the next step I need
>> to first query Redis to check if the tracking number exists; if it does, I
>> need to append to the JSON blob, and if it does not, I need to create a new
>> JSON blob. The Redis query function takes a group of keys.
>>
>> 2. Am I using partitionPersist correctly? i.e. when I have multiple bolts
>> running in parallel, will there be a race condition between different bolts
>> querying Redis (for the same tracking #)? Or will the Trident API ensure
>> that the various components running in parallel query Redis in order, so
>> that there are no inconsistent reads/writes?
>>
>> Thanks
>> Sherwin
>
>
>