/**
 * Alipay.com Inc.
 * Copyright (c) 2004-2020 All Rights Reserved.
 */
package com.paytm.reconsys.functions.processfunctions;

import com.paytm.reconsys.Constants;
import com.paytm.reconsys.configs.ConfigurationsManager;
import com.paytm.reconsys.enums.DescripancyTypeEnum;
import com.paytm.reconsys.exceptions.MissingConfigurationsException;
import com.paytm.reconsys.messages.ResultMessage;
import com.paytm.reconsys.messages.cart.CartMessage;
import com.paytm.reconsys.messages.cart.Payment;
import com.paytm.reconsys.messages.pg.PGMessage;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import java.util.Properties;

/**
 * CoProcessFuntion to process cart and pg messages connected using connect 
operator.
 * @author jaswin.shah
 * @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM 
jaswin.shah Exp $$
 */
public class CartPGCoprocessFunction extends 
KeyedCoProcessFunction<String,CartMessage, PGMessage, ResultMessage> {

    /**
     * Map state for cart messages, orderId+mid is key and cartMessage is value.
     */
    private MapState<String, CartMessage> cartState = null;

    /**
     * Map state for pg messages, orderId+mid is key and pgMessage is value.
     */
    private MapState<String, PGMessage> pgState = null;

    /**
     * Intializations for cart and pg mapStates
     *
     * @param config
     */
    @Override
    public void open(Configuration config) {
        MapStateDescriptor<String, CartMessage> cartStateDescriptor = new 
MapStateDescriptor<> (
            "cartData",
            TypeInformation.of(String.class),
            TypeInformation.of(CartMessage.class)
        );
        cartState = getRuntimeContext().getMapState(cartStateDescriptor);

        MapStateDescriptor<String, PGMessage> pgStateDescriptor = new 
MapStateDescriptor<>(
            "pgData",
            TypeInformation.of(String.class),
            TypeInformation.of(PGMessage.class)
        );
        pgState = getRuntimeContext().getMapState(pgStateDescriptor);
    }

    /**
     * 1. Get orderId+mid from cartMessage and check in PGMapState if an entry 
is present.
     * 2. If present, match, checkDescripancy, process and delete entry from 
pgMapState.
     * 3. If not present, add orderId+mid as key and cart object as value in 
cartMapState.
     * @param cartMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement1(CartMessage cartMessage, Context context, 
Collector<ResultMessage> collector) throws Exception {
        String searchKey = cartMessage.createJoinStringCondition();
       if(pgState.contains(searchKey)) {
           generateResultMessage(cartMessage,pgState.get(searchKey));
           pgState.remove(searchKey);
       } else {
           cartState.put(searchKey,cartMessage);
       }
    }

    /**
     * 1. Get orderId+mid from pgMessage and check in cartMapState if an entry 
is present.
     * 2. If present, match, checkDescripancy, process and delete entry from 
cartMapState.
     * 3. If not present, add orderId+mid as key and cart object as value in 
pgMapState.
     * @param pgMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement2(PGMessage pgMessage, Context context, 
Collector<ResultMessage> collector) throws Exception {
        String searchKey = pgMessage.createJoinStringCondition();
        if(cartState.contains(searchKey)) {
            generateResultMessage(cartState.get(searchKey),pgMessage);
            cartState.remove(searchKey);
        } else {
            pgState.put(searchKey,pgMessage);
        }
    }


    /**
     * Create ResultMessage from cart and pg messages.
     *
     * @param cartMessage
     * @param pgMessage
     * @return
     */
    private ResultMessage generateResultMessage(CartMessage cartMessage, 
PGMessage pgMessage) {
        ResultMessage resultMessage = new ResultMessage();
        Payment payment = null;

        //Logic should be in cart: check
        for (Payment pay : cartMessage.getPayments()) {
            if (StringUtils.equals(Constants.FORWARD_PAYMENT, 
pay.mapToPaymentTypeInPG()) && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, 
pay.getProvider())) {
                payment = pay;
                break;
            }
        }
        resultMessage.setOrderId(cartMessage.getId());
        resultMessage.setMid(payment.getMid());

        resultMessage.setCartOrderStatus(payment.mapToOrderStatus().getCode());
        resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());

        resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
        
resultMessage.setPgOrderCompletionTime(pgMessage.getOrderCompletionTime());

        resultMessage.setPgOrderAmount(pgMessage.getOrderAmount().getValue());
        resultMessage.setCartOrderAmount(cartMessage.getGrandtotal());

        resultMessage.setCartPaymethod(payment.getPayment_method());
        resultMessage.setPgPaymethod(pgMessage.getPayMethod());

        checkDescripancyAndTriggerAlert(resultMessage);

        return resultMessage;
    }

    /**
     * Evaluate if there is descripancy of any fields between the messages from 
two different systems.
     * Write all the descripancy logic here.
     *
     * @param resultMessage
     */
    private void checkDescripancyAndTriggerAlert(ResultMessage resultMessage) {
        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(), 
resultMessage.getPgOrderStatus())) {
            
resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
            //Send message to kafka queue for order status discrepancy.
            sendMessageToKafkaTopic(resultMessage.toString());
        }

        if (!StringUtils.equals(resultMessage.getCartOrderAmount(), 
resultMessage.getPgOrderAmount())) {
            
resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
            //Send message to kafka queue for pay method discrepancy.
            sendMessageToKafkaTopic(resultMessage.toString());
        }

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(), 
resultMessage.getPgPaymethod())) {
            
resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
            //Send message to kafka queue for pay amount discrepancy.
            sendMessageToKafkaTopic(resultMessage.toString());
        }
    }

    /**
     * Send a message to kafka topic
     *
     * @param message
     */
    private void sendMessageToKafkaTopic(String message) {
        Properties kafkaProperties = 
ConfigurationsManager.getResultSystemKafkaProperties();
        //kafkaProperties.put("transactional.id","trans123");
        Producer<String, String> producer = new 
KafkaProducer<>(kafkaProperties, new StringSerializer(), new 
StringSerializer());
        //producer.initTransactions();
        try {
            //producer.beginTransaction();
            producer.send(new 
ProducerRecord<>(ConfigurationsManager.getResultTopicName(), message));
            //producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | 
AuthorizationException e) {
            // We can't recover from these exceptions, so our only option is to 
close the producer and exit.
            producer.close();
        } catch (KafkaException e) {
            producer.abortTransaction();
        } catch (MissingConfigurationsException e) {
            e.printStackTrace();
        }
        producer.close();
    }
}

This is a snapshot of the implementation I have done so far.
________________________________
From: Jaswin Shah <jaswin.s...@outlook.com>
Sent: 18 May 2020 13:55
To: user@flink.apache.org <user@flink.apache.org>
Subject: Rocksdb implementation

Hi,
I have implemented a Flink job with MapStates. The functionality is as follows:

  1.  I have two datastreams which I connect with connect operator and then 
call coprocessfunction with every pair of objects.
  2.  For element of first datastream, processElement1 method is called and for 
an element of second datastream, processElement2 method is called.
  3.  I have two MapStates in CoProcessFunction for both streams separately.
  4.  When processElement1 is called, it checks in MapState2 if corresponding 
element with given id is present, if present, I match, and delete. If not 
present, I add the object in MapState1.
  5.  When processElement2 is called, it checks in MapState1 if corresponding 
element with given id is present, if present, I match and delete. If not 
present, I add the object in MapState2.
  6.  Now, I want all the state data to be stored in Rocksdb.
  7.  After a few days, I want to run a batch job on the RocksDB data to check 
whether there are any objects for which no match was found, and create a report of those.

I need help with how to store this state data in RocksDB, and with the setup, 
configuration and code needed for that, which I do not understand. Also, is it 
possible to run a batch job on the RocksDB data?

Help will be highly appreciated.

Thanks,
Jaswin

Reply via email to