Hi Jaswin, I would like to clarify something first: what do you key your streams by when joining them? It seems that what you want to do is match each CartMessage with the corresponding Payment that has the same orderId+mid. If this is the case, you probably do not need the MapState in the first place.
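To illustrate the point, here is a minimal sketch of the matching logic once the join key itself is orderId+mid. It is plain Java with no Flink dependency: the class PendingPair and its String payloads are hypothetical stand-ins, not part of your job or of the Flink API. In a KeyedCoProcessFunction keyed by orderId+mid, each slot would presumably become a ValueState (keyed state is already scoped to the current key), and the timeout check would correspond to a timer firing in onTimer, with the leftover element sent to a side output.

```java
import java.util.Optional;

/**
 * Hypothetical model of the state held for ONE join key (orderId+mid).
 * Plain Java only: it mimics two single-value slots plus a timeout,
 * and is not Flink API.
 */
public class PendingPair {
    private String cart;          // stand-in for a buffered CartMessage
    private String payment;       // stand-in for a buffered PaymentNotifyRequestWrapper
    private long deadline = -1L;  // time after which the buffered element counts as unmatched

    private final long timeoutMillis;

    public PendingPair(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Cart side arrives: emit a match if a payment is buffered, otherwise buffer the cart. */
    public Optional<String> onCart(String cartMessage, long now) {
        if (payment != null) {
            String result = cartMessage + "|" + payment;
            clear();
            return Optional.of(result);
        }
        cart = cartMessage;
        deadline = now + timeoutMillis;
        return Optional.empty();
    }

    /** Payment side arrives: emit a match if a cart is buffered, otherwise buffer the payment. */
    public Optional<String> onPayment(String pgMessage, long now) {
        if (cart != null) {
            String result = cart + "|" + pgMessage;
            clear();
            return Optional.of(result);
        }
        payment = pgMessage;
        deadline = now + timeoutMillis;
        return Optional.empty();
    }

    /** Timer check: once the deadline has passed, return the element still waiting and drop it. */
    public Optional<String> onTimer(long now) {
        if (deadline < 0 || now < deadline) {
            return Optional.empty();
        }
        String unmatched = (cart != null) ? cart : payment;
        clear();
        return Optional.ofNullable(unmatched);
    }

    private void clear() {
        cart = null;
        payment = null;
        deadline = -1L;
    }
}
```

Because there is one such pair per key, no map lookup by orderId+mid is needed inside the function. Whatever onTimer returns is exactly the set of elements that never found a partner, which is what the report of unmatched objects would be built from.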
Best,

--
Alexander Fedulov | Solutions Architect
<https://www.ververica.com/>
Follow us @VervericaData
--
Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference
Stream Processing | Event Driven | Real Time

On Fri, May 22, 2020 at 8:57 AM Jaswin Shah <jaswin.s...@outlook.com> wrote:

> public class CartPGCoprocessFunction extends
>         KeyedCoProcessFunction<String, CartMessage, PaymentNotifyRequestWrapper, ResultMessage> {
>
>     private static final Logger logger =
>             LoggerFactory.getLogger(CartPGCoprocessFunction.class);
>
>     /**
>      * Map state for cart messages, orderId+mid is key and cartMessage is value.
>      * Not static: each parallel instance needs its own state handle.
>      */
>     private transient MapState<String, CartMessage> cartState;
>
>     /**
>      * Map state for pg messages, orderId+mid is key and pgMessage is value.
>      * Not static: each parallel instance needs its own state handle.
>      */
>     private transient MapState<String, PaymentNotifyRequestWrapper> pgState;
>
>     /**
>      * Initializations for cart and pg mapStates
>      *
>      * @param config
>      */
>     @Override
>     public void open(Configuration config) {
>         MapStateDescriptor<String, CartMessage> cartStateDescriptor = new MapStateDescriptor<>(
>                 Constants.CART_DATA,
>                 TypeInformation.of(String.class),
>                 TypeInformation.of(CartMessage.class)
>         );
>         cartState = getRuntimeContext().getMapState(cartStateDescriptor);
>
>         MapStateDescriptor<String, PaymentNotifyRequestWrapper> pgStateDescriptor = new MapStateDescriptor<>(
>                 Constants.PG_DATA,
>                 TypeInformation.of(String.class),
>                 TypeInformation.of(PaymentNotifyRequestWrapper.class)
>         );
>         pgState = getRuntimeContext().getMapState(pgStateDescriptor);
>     }
>
>     /**
>      * 1. Get orderId+mid from cartMessage and check in PGMapState if an entry is present.
>      * 2. If present, match, checkDescripancy, process and delete entry from pgMapState.
>      * 3. If not present, add orderId+mid as key and cart object as value in cartMapState.
>      *
>      * @param cartMessage
>      * @param context
>      * @param collector
>      * @throws Exception
>      */
>     @Override
>     public void processElement1(CartMessage cartMessage, Context context,
>             Collector<ResultMessage> collector) throws Exception {
>         String searchKey = cartMessage.createJoinStringCondition();
>         PaymentNotifyRequestWrapper paymentNotifyObject = pgState.get(searchKey);
>         if (Objects.nonNull(paymentNotifyObject)) {
>             generateResultMessage(cartMessage, paymentNotifyObject, collector);
>             pgState.remove(searchKey);
>         } else {
>             cartState.put(searchKey, cartMessage);
>         }
>     }
>
>     /**
>      * 1. Get orderId+mid from pgMessage and check in cartMapState if an entry is present.
>      * 2. If present, match, checkDescripancy, process and delete entry from cartMapState.
>      * 3. If not present, add orderId+mid as key and pg object as value in pgMapState.
>      *
>      * @param pgMessage
>      * @param context
>      * @param collector
>      * @throws Exception
>      */
>     @Override
>     public void processElement2(PaymentNotifyRequestWrapper pgMessage,
>             Context context, Collector<ResultMessage> collector) throws Exception {
>         String searchKey = pgMessage.createJoinStringCondition();
>         CartMessage cartMessage = cartState.get(searchKey);
>         if (Objects.nonNull(cartMessage)) {
>             generateResultMessage(cartMessage, pgMessage, collector);
>             cartState.remove(searchKey);
>         } else {
>             pgState.put(searchKey, pgMessage);
>         }
>     }
>
>     /**
>      * Create ResultMessage from cart and pg messages and pass it on to the discrepancy check.
>      *
>      * @param cartMessage
>      * @param pgMessage
>      * @param collector
>      */
>     private void generateResultMessage(CartMessage cartMessage,
>             PaymentNotifyRequestWrapper pgMessage, Collector<ResultMessage> collector) {
>         ResultMessage resultMessage = new ResultMessage();
>         Payment payment = null;
>
>         // Logic should be in cart: check
>         for (Payment pay : cartMessage.getPayments()) {
>             if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG())
>                     && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
>                 payment = pay;
>                 break;
>             }
>         }
>         // No matching forward payment found: nothing to compare.
>         if (Objects.isNull(payment)) {
>             return;
>         }
>
>         resultMessage.setOrderId(cartMessage.getId());
>         resultMessage.setMid(payment.getMid());
>
>         resultMessage.setCartOrderStatus(cartMessage.mapToOrderStatus().getCode());
>         resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());
>
>         resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
>         resultMessage.setPgOrderCompletionTime(pgMessage.getCreatedTime());
>
>         resultMessage.setPgOrderAmount(pgMessage.getOrderAmount().getValue());
>         resultMessage.setCartOrderAmount(String.valueOf(cartMessage.getGrandtotal().longValue()));
>
>         resultMessage.setCartPaymethod(payment.getPayment_method());
>         resultMessage.setPgPaymethod(pgMessage.getPaymentView().getPayOptionInfos()[0].getPayMethod());
>
>         checkDescripancyAndCollectResult(resultMessage, collector);
>     }
>
>     /**
>      * Evaluate whether any fields differ between the messages from the two systems.
>      * Write all the discrepancy logic here.
>      *
>      * @param resultMessage
>      * @param collector
>      */
>     private void checkDescripancyAndCollectResult(ResultMessage resultMessage,
>             Collector<ResultMessage> collector) {
>
>         if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(),
>                 resultMessage.getPgOrderStatus())) {
>             resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
>             collector.collect(resultMessage.clone());
>         }
>
>         if (!StringUtils.equals(resultMessage.getCartOrderAmount(),
>                 resultMessage.getPgOrderAmount())) {
>             resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
>             collector.collect(resultMessage.clone());
>         }
>
>         if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(),
>                 resultMessage.getPgPaymethod())) {
>             resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
>             collector.collect(resultMessage.clone());
>         }
>     }
> }
>
> Hi,
> I have implemented a Flink job with MapStates. The functionality is as
> follows:
>
> 1. I have two datastreams which I connect with the connect operator and
>    then call a CoProcessFunction on them.
> 2. For an element of the first datastream, the processElement1 method is
>    called, and for an element of the second datastream, the
>    processElement2 method is called.
> 3. I have two MapStates in the CoProcessFunction, one for each stream.
> 4. When processElement1 is called, it checks in MapState2 whether the
>    corresponding element with the given id is present. If present, I
>    match and delete. If not present, I add the object to MapState1.
> 5. When processElement2 is called, it checks in MapState1 whether the
>    corresponding element with the given id is present. If present, I
>    match and delete. If not present, I add the object to MapState2.
> 6. Now, I want all the state data to be stored in RocksDB.
> 7. After a few days, I want to run a batch job on RocksDB to check
>    whether there are any objects for which no match was found, and
>    create a report of those.
>
> I need help with how to configure a TTL for messages, how to collect them
> in the onTimer method when the timeout for a missing element expires, how
> to collect this data in side outputs, and how to run a batch process over
> the side output. A few code examples would be appreciated.
>
> Thanks,
> Jaswin