The user defines how to convert key and value into byte[], so you can use any serialization mechanism you like (custom, Kryo, JSON, etc.).
Here is an example of setting up the serializer:
https://github.com/DataTorrent/examples/blob/master/tutorials/hdht/src/test/java/com/example/HDHTTestOperator.java

You would replace KeyValPair<byte[], byte[]> with your complex type.

Thanks,
Thomas

On Thu, Sep 1, 2016 at 11:14 AM, Jim <[email protected]> wrote:
> Tushar,
>
> Great! My final question: I have been searching and haven't found any good
> examples of setting up a complex data type and storing and retrieving it
> from HDHT within an operator. The data I would want to store looks like:
>
>   Field Name     Field Type
>   OrderNumber    integer (this is the key)
>   Status         integer
>   855Data        string
>   856Data        string
>   910Data        string
>
> Are there any examples you can point me to that show the best way to set
> up access to HDHT?
>
> Thanks,
>
> Jim
>
> -----Original Message-----
> From: Tushar Gosavi [mailto:[email protected]]
> Sent: Thursday, September 1, 2016 1:06 PM
> To: [email protected]
> Subject: Re: HDHT question - looking for the datatorrent gurus!
>
> Hi Jim,
>
> Yes, this is what I had in mind. The managed state operator needs to have
> a separate input for each of the 5 operators. The platform does not
> support connecting multiple output ports to a single input port, but you
> can achieve a similar effect using the stream merge operator:
> https://github.com/apache/apex-malhar/blob/3ce83708f795b081d564be357a8333928154398e/library/src/main/java/com/datatorrent/lib/stream/StreamMerger.java
>
> - Tushar.
>
> On Thu, Sep 1, 2016 at 10:37 PM, Jim <[email protected]> wrote:
> > Tushar,
> >
> > Funny that you described it that way, as that is exactly what I was
> > thinking about this morning.
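[Editor's note: to make the serializer setup Thomas points to concrete for the record layout Jim lists above, here is a minimal plain-Java sketch. The POJO and class/method names are invented for illustration ("855Data" becomes data855 because Java identifiers cannot start with a digit); in a real application these byte[] conversions would be plugged into the codec shown in the linked HDHTTestOperator example.]

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical POJO mirroring the fields listed above.
class OrderRecord {
    int orderNumber;  // the key
    int status;
    String data855;
    String data856;
    String data910;
}

// One way to turn the record into the byte[] key and byte[] value HDHT expects.
class OrderRecordSerde {

    // Key: the order number as 4 big-endian bytes, so keys sort numerically.
    static byte[] keyBytes(int orderNumber) {
        return new byte[] {
            (byte) (orderNumber >>> 24), (byte) (orderNumber >>> 16),
            (byte) (orderNumber >>> 8),  (byte) orderNumber
        };
    }

    // Value: the remaining fields, written as length-prefixed UTF strings.
    static byte[] valueBytes(OrderRecord r) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            out.writeInt(r.status);
            out.writeUTF(r.data855 == null ? "" : r.data855);
            out.writeUTF(r.data856 == null ? "" : r.data856);
            out.writeUTF(r.data910 == null ? "" : r.data910);
            out.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e); // cannot happen with an in-memory stream
        }
    }

    // Reverse of valueBytes: rebuild the record from the stored key and value.
    static OrderRecord fromBytes(int orderNumber, byte[] value) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(value));
            OrderRecord r = new OrderRecord();
            r.orderNumber = orderNumber;
            r.status = in.readInt();
            r.data855 = in.readUTF();
            r.data856 = in.readUTF();
            r.data910 = in.readUTF();
            return r;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```

Any other mechanism (Kryo, JSON, etc.) works equally well, as noted above; the only contract is a deterministic mapping to and from byte[].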
> >
> > So the flow would be:
> >
> >                     Router Operator
> >                           |
> >                  Managed State Operator
> >                           |
> >     +----------------+----+-----------+------------------+
> >     |                |                |                  |
> > General Ack    Detailed Ack    Ship Notification      Invoice
> >     |                |                |                  |
> >     +----------------+----+-----------+------------------+
> >                           |
> >     (each of the 4 operators, at the end of processing,
> >      emits a record back to the Managed State Operator)
> >
> > In this scenario, would the managed state operator just have one input
> > that all the other operators emit to, or would it need separate inputs
> > for each of the 5 operators that would be emitting to it?
> >
> > This is what you were describing too, correct?
> >
> > Thanks,
> >
> > Jim
> >
> > -----Original Message-----
> > From: Tushar Gosavi [mailto:[email protected]]
> > Sent: Thursday, September 1, 2016 11:49 AM
> > To: [email protected]
> > Subject: Re: HDHT question - looking for the datatorrent gurus!
> >
> > Hi Jim,
> >
> > Currently, HDHT is accessible only to a single operator in a DAG. A
> > single HDHT store cannot be managed by two different operators at a
> > time, as that could cause metadata corruption. Theoretically an HDHT
> > bucket could be read from multiple operators, but only one writer is
> > allowed.
> >
> > In your case a stage in a transaction is processed completely by a
> > different operator, and only then can the next stage start.
> > It could still be achieved by using a single operator which manages the
> > HDHT state, and having a loop in the DAG to send completed transaction
> > ids back to the sequencer:
> >
> > - The sequence operator will emit transactions to the transaction
> >   processing operators.
> > - If it receives an out-of-order transaction, it will note it down in
> >   HDHT.
> > - Each processing operator will send the completed transaction id on a
> >   port which is connected back to the sequence operator.
> > - On receiving data on this loopback port, the sequence operator will
> >   update HDHT and search for the next transaction in order, which could
> >   be stored in HDHT, and will emit it to the next processing operator.
> >
> > - Tushar.
> >
> > On Sat, Aug 27, 2016 at 1:31 AM, Jim <[email protected]> wrote:
> >> Good afternoon,
> >>
> >> I have an Apex application where I may receive EDI transactions, but
> >> sometimes they arrive out of order, and I want to hold any
> >> out-of-sequence transactions until the correct time in the flow to
> >> process them.
> >>
> >> For example, for a standard order, we will receive from the remote
> >> vendor:
> >>
> >> 1.) General Acknowledgement
> >> 2.) Detailed Acknowledgement
> >> 3.) Ship Notification
> >> 4.) Invoice
> >>
> >> They are supposed to be sent and received in that order.
> >>
> >> However, sometimes vendors' systems have problems, etc., so they send
> >> all of these at the same time, and then we can receive them out of
> >> sequence. Data packets for these are very small, say from 1 to 512
> >> bytes, and the only time they will be out of sequence, we will receive
> >> them very close together.
> >>
> >> I am trying to think of the best way to do this with my DataTorrent /
> >> Hadoop / YARN facilities, instead of creating a data table in
> >> PostgreSQL and using that.
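[Editor's note: the loopback design Tushar describes above can be modeled in plain Java as follows. The HashMaps stand in for HDHT buckets and all class and method names are invented for illustration; a real sequencer operator would persist these maps in HDHT and receive onCompleted via the loopback input port.]

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory model of the sequencer with a completion loopback.
// Stage numbers 0..3 correspond to General Ack, Detailed Ack,
// Ship Notification and Invoice.
class OrderSequencer {
    private final Map<Integer, Integer> expectedStage = new HashMap<>(); // order -> next stage due
    private final Map<Integer, Boolean> inFlight = new HashMap<>();      // order -> emitted, not yet confirmed
    private final Map<Long, String> buffered = new HashMap<>();          // (order, stage) -> parked payload
    final List<String> emitted = new ArrayList<>();                      // what went downstream, in order

    private long key(int order, int stage) { return ((long) order << 32) | stage; }

    // Called for every arriving transaction (the sequencer's input port).
    void onTransaction(int order, int stage, String payload) {
        int expected = expectedStage.getOrDefault(order, 0);
        if (stage == expected && !inFlight.getOrDefault(order, false)) {
            emit(order, stage, payload);
        } else {
            // Out of order (or previous stage still in flight): park it.
            // In the real DAG this write would go to HDHT.
            buffered.put(key(order, stage), payload);
        }
    }

    // Called on the loopback port when a processing operator finishes a stage.
    void onCompleted(int order, int stage) {
        inFlight.put(order, false);
        int next = stage + 1;
        expectedStage.put(order, next);
        String parked = buffered.remove(key(order, next));
        if (parked != null) {
            emit(order, next, parked); // release the buffered next stage
        }
    }

    private void emit(int order, int stage, String payload) {
        inFlight.put(order, true);
        emitted.add(order + ":" + stage + ":" + payload);
    }
}
```

Feeding this model a ship notification before the detailed acknowledgement buffers it, and the completion callback for the detailed acknowledgement then releases it, matching the ordering guarantee described above.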
> >> Can I create a flow that works like this (I am not sure if this makes
> >> sense, or is the best way to solve my problem, while keeping state,
> >> etc. maintained for all the operators):
> >>
> >> 1.) In the inbound transaction router, check the HDHT store for the
> >>     order number. If it doesn't exist, this means it is a new order. If
> >>     the transaction being processed is the general acknowledgement,
> >>     emit the data to the general acknowledgement operator; if it is
> >>     not, store the transaction data into the correct bucket,
> >>     identifying the transaction it is for, and record the next expected
> >>     step as the general acknowledgement in HDHT by order number.
> >> 2.) Say the next transaction is the ship notification. In the router,
> >>     we would check the HDHT store, see that this is not the next
> >>     expected transaction (say it is supposed to be the detailed
> >>     acknowledgement), so we would just post the data for the ship
> >>     notification into the HDHT store and be done.
> >> 3.) Say we now receive the detailed acknowledgement for an order whose
> >>     next step IS the detailed acknowledgement. We would see this is the
> >>     correct next transaction, emit it to the detailed acknowledgement
> >>     operator, and update the HDHT store to show that the next expected
> >>     transaction is the ship notification. NOTE: we can't emit the ship
> >>     notification yet, until we have confirmed that the detailed
> >>     acknowledgement has been completed.
> >> 4.) In each of the 4 transaction operators, at the end of processing,
> >>     we would update the HDHT store to show the next expected step; if
> >>     we have already received data for the next expected step, pull it
> >>     from the HDHT store and write the transaction into our SQS queue,
> >>     which is the input into the inbound transaction router at the
> >>     beginning of the application, so it processes through the system.
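[Editor's note: the "correct bucket" bookkeeping in steps 1 and 2 above hinges on the key layout. Since HDHT keys are raw byte arrays kept in sorted order within a bucket, a big-endian composite key of (order number, stage) keeps all of an order's parked transactions adjacent and in stage order. A sketch of such a layout; this is an illustrative scheme, not an HDHT API.]

```java
// Composite key layout for parking out-of-order transactions:
// 4 big-endian bytes of order number followed by 4 big-endian bytes
// of stage, so unsigned lexicographic byte order matches the logical
// (orderNumber, stage) order.
class OrderKey {
    static byte[] of(int orderNumber, int stage) {
        byte[] k = new byte[8];
        for (int i = 0; i < 4; i++) {
            k[i]     = (byte) (orderNumber >>> (24 - 8 * i));
            k[4 + i] = (byte) (stage       >>> (24 - 8 * i));
        }
        return k;
    }

    // Unsigned lexicographic comparison, the natural order for raw byte keys.
    static int compare(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int d = (a[i] & 0xff) - (b[i] & 0xff);
            if (d != 0) {
                return d;
            }
        }
        return a.length - b.length;
    }
}
```

With this layout, looking up "is anything parked for the next expected step of order N" in step 4 is a single point lookup on OrderKey.of(N, nextStage).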
> >> I believe HDHT can be used to pass data throughout an entire
> >> application, and is not limited to just a per-operator basis, correct?
> >>
> >> Any comments / feedback?
> >>
> >> Thanks,
> >>
> >> Jim
