Hi Jaswin,

I would like to clarify something first: what do you key your streams by
when joining them?
It seems that you want to match each CartMessage with the corresponding
Payment that has the same orderId+mid. If that is the case and the streams
are keyed by orderId+mid, you probably do not need the MapState in the first
place, because keyed state is already scoped to the current key.
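
To illustrate, here is a minimal plain-Java model (not Flink API) of what
happens when both streams are keyed by orderId+mid: each key needs only a
single slot per side rather than a map. The names KeyedJoinModel, onCart and
onPayment are hypothetical, and the outer HashMap merely stands in for the
per-key scoping that the runtime provides; in a real job each slot would
presumably be a ValueState.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyedJoinModel {

    /** State for ONE key: at most one buffered element per side. */
    private static final class Slots {
        String cart;     // stands in for a ValueState holding the cart message
        String payment;  // stands in for a ValueState holding the payment message
    }

    // Stands in for the runtime's per-key scoping of keyed state (assumption of this model).
    private final Map<String, Slots> stateByKey = new HashMap<>();
    private final List<String> matches = new ArrayList<>();

    /** Analogue of processElement1: a cart message arrives for the given key. */
    public void onCart(String key, String cart) {
        Slots slots = stateByKey.computeIfAbsent(key, k -> new Slots());
        if (slots.payment != null) {
            matches.add(key + ":" + cart + "+" + slots.payment);
            stateByKey.remove(key); // matched: clear the state for this key
        } else {
            slots.cart = cart;      // no partner yet: buffer in the single slot
        }
    }

    /** Analogue of processElement2: a payment message arrives for the given key. */
    public void onPayment(String key, String payment) {
        Slots slots = stateByKey.computeIfAbsent(key, k -> new Slots());
        if (slots.cart != null) {
            matches.add(key + ":" + slots.cart + "+" + payment);
            stateByKey.remove(key); // matched: clear the state for this key
        } else {
            slots.payment = payment; // no partner yet: buffer in the single slot
        }
    }

    public List<String> matches() {
        return matches;
    }

    public int pendingKeys() {
        return stateByKey.size();
    }
}
```

The lookup by searchKey inside the function disappears, because the key of
the stream already selects the right slot.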

Best,

--

Alexander Fedulov | Solutions Architect


On Fri, May 22, 2020 at 8:57 AM Jaswin Shah <jaswin.s...@outlook.com> wrote:

> public class CartPGCoprocessFunction extends
>         KeyedCoProcessFunction<String, CartMessage, PaymentNotifyRequestWrapper, ResultMessage> {
>
>     private static final Logger logger = LoggerFactory.getLogger(CartPGCoprocessFunction.class);
>
>     /**
>      * Map state for cart messages; orderId+mid is the key and cartMessage is the value.
>      * Kept as a non-static instance field so that each parallel instance has its own handle.
>      */
>     private transient MapState<String, CartMessage> cartState;
>
>     /**
>      * Map state for pg messages; orderId+mid is the key and pgMessage is the value.
>      */
>     private transient MapState<String, PaymentNotifyRequestWrapper> pgState;
>
>     /**
>      * Initializes the cart and pg map states.
>      *
>      * @param config
>      */
>     @Override
>     public void open(Configuration config) {
>         MapStateDescriptor<String, CartMessage> cartStateDescriptor = new MapStateDescriptor<>(
>             Constants.CART_DATA,
>             TypeInformation.of(String.class),
>             TypeInformation.of(CartMessage.class)
>         );
>         cartState = getRuntimeContext().getMapState(cartStateDescriptor);
>
>         MapStateDescriptor<String, PaymentNotifyRequestWrapper> pgStateDescriptor = new MapStateDescriptor<>(
>             Constants.PG_DATA,
>             TypeInformation.of(String.class),
>             TypeInformation.of(PaymentNotifyRequestWrapper.class)
>         );
>         pgState = getRuntimeContext().getMapState(pgStateDescriptor);
>     }
>
>     /**
>      * 1. Get orderId+mid from cartMessage and check in PGMapState if an entry is present.
>      * 2. If present, match, checkDescripancy, process and delete entry from pgMapState.
>      * 3. If not present, add orderId+mid as key and cart object as value in cartMapState.
>      * @param cartMessage
>      * @param context
>      * @param collector
>      * @throws Exception
>      */
>     @Override
>     public void processElement1(CartMessage cartMessage, Context context,
>             Collector<ResultMessage> collector) throws Exception {
>         String searchKey = cartMessage.createJoinStringCondition();
>         PaymentNotifyRequestWrapper paymentNotifyObject = pgState.get(searchKey);
>         if(Objects.nonNull(paymentNotifyObject)) {
>             generateResultMessage(cartMessage,paymentNotifyObject,collector);
>             pgState.remove(searchKey);
>         } else {
>             cartState.put(searchKey,cartMessage);
>         }
>     }
>
>     /**
>      * 1. Get orderId+mid from pgMessage and check in cartMapState if an entry is present.
>      * 2. If present, match, checkDescripancy, process and delete entry from cartMapState.
>      * 3. If not present, add orderId+mid as key and pg object as value in pgMapState.
>      * @param pgMessage
>      * @param context
>      * @param collector
>      * @throws Exception
>      */
>     @Override
>     public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context context,
>             Collector<ResultMessage> collector) throws Exception {
>         String searchKey = pgMessage.createJoinStringCondition();
>         CartMessage cartMessage = cartState.get(searchKey);
>         if(Objects.nonNull(cartMessage)) {
>             generateResultMessage(cartMessage,pgMessage,collector);
>             cartState.remove(searchKey);
>         } else {
>             pgState.put(searchKey,pgMessage);
>         }
>     }
>
>     /**
>      * Create a ResultMessage from the cart and pg messages and pass it on for discrepancy checks.
>      *
>      * @param cartMessage
>      * @param pgMessage
>      * @param collector
>      */
>     private void generateResultMessage(CartMessage cartMessage,
>             PaymentNotifyRequestWrapper pgMessage, Collector<ResultMessage> collector) {
>         ResultMessage resultMessage = new ResultMessage();
>         Payment payment = null;
>
>         //Logic should be in cart: check
>         for (Payment pay : cartMessage.getPayments()) {
>             if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG())
>                     && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
>                 payment = pay;
>                 break;
>             }
>         }
>         if(Objects.isNull(payment)) {
>             return;
>         }
>
>         resultMessage.setOrderId(cartMessage.getId());
>         resultMessage.setMid(payment.getMid());
>
>         resultMessage.setCartOrderStatus(cartMessage.mapToOrderStatus().getCode());
>         resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());
>
>         resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
>         resultMessage.setPgOrderCompletionTime(pgMessage.getCreatedTime());
>
>         resultMessage.setPgOrderAmount(pgMessage.getOrderAmount().getValue());
>         resultMessage.setCartOrderAmount(String.valueOf(cartMessage.getGrandtotal().longValue()));
>
>         resultMessage.setCartPaymethod(payment.getPayment_method());
>         resultMessage.setPgPaymethod(pgMessage.getPaymentView().getPayOptionInfos()[0].getPayMethod());
>
>         checkDescripancyAndCollectResult(resultMessage,collector);
>     }
>
>     /**
>      * Evaluate whether any fields differ between the messages from the two systems.
>      * Write all the discrepancy logic here.
>      *
>      * @param resultMessage
>      * @param collector
>      */
>     private void checkDescripancyAndCollectResult(ResultMessage resultMessage,
>             Collector<ResultMessage> collector) {
>
>         if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(),
>                 resultMessage.getPgOrderStatus())) {
>             resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
>             collector.collect(resultMessage.clone());
>         }
>
>         if (!StringUtils.equals(resultMessage.getCartOrderAmount(),
>                 resultMessage.getPgOrderAmount())) {
>             resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
>             collector.collect(resultMessage.clone());
>         }
>
>         if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(),
>                 resultMessage.getPgPaymethod())) {
>             resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
>             collector.collect(resultMessage.clone());
>         }
>     }
> }
>
> Hi,
> I have implemented a Flink job with MapStates. The functionality is as
> follows:
>
>    1. I have two datastreams, which I connect with the connect operator and
>    then call a CoProcessFunction on the connected streams.
>    2. For an element of the first datastream, the processElement1 method is
>    called, and for an element of the second datastream, the processElement2
>    method is called.
>    3. I have two MapStates in the CoProcessFunction, one for each stream.
>    4. When processElement1 is called, it checks in MapState2 whether a
>    corresponding element with the given id is present. If present, I match
>    and delete it. If not present, I add the object to MapState1.
>    5. When processElement2 is called, it checks in MapState1 whether a
>    corresponding element with the given id is present. If present, I match
>    and delete it. If not present, I add the object to MapState2.
>    6. Now, I want all the state data to be stored in RocksDB.
>    7. After a few days, I want to run a batch job on RocksDB to check
>    whether there are any objects for which no match was found, and to
>    create a report of those.
>
>
> I need help with how to configure a TTL for messages, how to collect them
> in the onTimer method when the timeout for a missing element expires, and
> how to emit this data to side outputs and run a batch process over the side
> output. A few code examples would be appreciated.
>
> Thanks,
> Jaswin
>
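
Regarding collecting unmatched elements on timeout: as far as I know, Flink's
state TTL drops expired entries silently, without a callback, so a timer
registered for each buffered element is the usual way to get hold of them
before the state is cleared. Below is a minimal plain-Java model of that
pattern (not Flink API). TimeoutJoinModel, onElement and advanceTo are
hypothetical names; in a real job advanceTo would roughly correspond to
onTimer, and the unmatched list to a side output.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class TimeoutJoinModel {

    /** One buffered, still unmatched element and the time at which it expires. */
    private static final class Pending {
        final String side;     // "cart" or "payment"
        final String payload;
        final long deadline;   // analogue of the registered timer timestamp

        Pending(String side, String payload, long deadline) {
            this.side = side;
            this.payload = payload;
            this.deadline = deadline;
        }
    }

    private final long timeoutMillis;
    private final Map<String, Pending> pendingByKey = new HashMap<>();
    private final List<String> matched = new ArrayList<>();    // main output
    private final List<String> unmatched = new ArrayList<>();  // side-output analogue

    public TimeoutJoinModel(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** An element of the given side arrives for the key at time 'now'. */
    public void onElement(String side, String key, String payload, long now) {
        Pending other = pendingByKey.get(key);
        if (other != null && !other.side.equals(side)) {
            pendingByKey.remove(key);  // partner found: clear state, emit match
            matched.add(key);
        } else {
            // No partner yet (or a repeat from the same side, which overwrites
            // the buffered element and restarts its timeout).
            pendingByKey.put(key, new Pending(side, payload, now + timeoutMillis));
        }
    }

    /** Analogue of onTimer: expire every element whose deadline is at or before 'now'. */
    public void advanceTo(long now) {
        Iterator<Map.Entry<String, Pending>> it = pendingByKey.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Pending> entry = it.next();
            if (entry.getValue().deadline <= now) {
                unmatched.add(entry.getValue().side + ":" + entry.getKey());
                it.remove();
            }
        }
    }

    public List<String> matched() {
        return matched;
    }

    public List<String> unmatched() {
        return unmatched;
    }
}
```

A report of unmatched orders can then be built from the side output alone,
without reading the state backend directly.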
