Hi,
I am running a Flink job with the following functionality:

  1.  I consume stream1 and stream2 from two Kafka topics and assign 
watermarks to the events of both streams by extracting the timestamps from the 
events.
  2.  I then connect the two streams and apply a KeyedCoProcessFunction to the 
connected stream.
  3.  The processElement1 and processElement2 methods receive the events of 
streams 1 and 2 respectively and perform the join logic shown in the code 
below.
  4.  Both streams share a single MapState.
  5.  When an event arrives in a processElement method, I register a timer for 
that message. If the matching message from the other stream has not arrived by 
the time the callback (onTimer) fires, I send the message to a side output.

Something is going wrong with the timer registrations: for many messages of 
stream2, the callback fires earlier than the registered timeout.
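
To make the symptom concrete, here is a minimal sketch of the arithmetic that may be involved. It assumes the timer is a processing-time timer (registerProcessingTimeTimer takes an absolute wall-clock time in epoch milliseconds) while the value passed in is derived from the event timestamp. This is plain Java, not Flink code; the names TimerDelayCheck and delayUntilFire, the 60-second wait, and the sample clock value are hypothetical.

```java
final class TimerDelayCheck {

    /**
     * How long (in ms) a timer set for the absolute wall-clock time fireAtMillis
     * still has to wait when the wall clock currently shows wallClockNowMillis.
     * A target that already lies in the past yields 0.
     */
    static long delayUntilFire(long fireAtMillis, long wallClockNowMillis) {
        return Math.max(0L, fireAtMillis - wallClockNowMillis);
    }

    public static void main(String[] args) {
        long maxWaitMillis = 60_000L;            // hypothetical configured wait
        long now = 1_589_565_000_000L;           // hypothetical wall-clock time

        // Event that occurred 45 s before it was processed.
        long eventTs = now - 45_000L;
        System.out.println(delayUntilFire(eventTs + maxWaitMillis, now)); // prints 15000

        // Same wait measured from the wall clock instead of the event time.
        System.out.println(delayUntilFire(now + maxWaitMillis, now));     // prints 60000

        // Event that is already older than the wait: no delay left at all.
        long oldEventTs = now - 120_000L;
        System.out.println(delayUntilFire(oldEventTs + maxWaitMillis, now)); // prints 0
    }
}
```

If this assumption holds, the effective wait shrinks by however far the event timestamp lags behind the wall clock, which would look like the callback arriving early.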
Also, the events from stream2 appear to carry times in GMT+5:30, judging by 
the time value in the event message, while for stream1 it is the normal time 
zone. I am not confident in analysing the time formats, though, so this 
analysis could be wrong.
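
As a rough check of how much a time-zone mix-up could matter, the sketch below parses one zone-less timestamp string (same pattern as the pg format in the code further down) under two different zones. The sample value and the names ZoneOffsetCheck and toEpochMillis are hypothetical; which zone the pg system really uses is an assumption that would need to be confirmed.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

final class ZoneOffsetCheck {

    // Pattern without any zone information, as used for the pg timestamps.
    private static final DateTimeFormatter PG_FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss");

    /** Interprets the zone-less text as a local time in the given zone. */
    static long toEpochMillis(String text, ZoneId zone) {
        return LocalDateTime.parse(text, PG_FORMAT).atZone(zone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        String paidTime = "2020-05-15T23:52:00"; // hypothetical sample value

        long asUtc = toEpochMillis(paidTime, ZoneOffset.UTC);
        long asIst = toEpochMillis(paidTime, ZoneId.of("Asia/Kolkata"));

        // The same text read as UTC is 5 h 30 min later than when read as GMT+5:30.
        System.out.println(asUtc - asIst); // prints 19800000
    }
}
```

A shift of this size in one stream only would move its timestamps (and anything derived from them) by five and a half hours relative to the other stream.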

Below are the code snippets I have implemented for the KeyedCoProcessFunction, 
the timestamp extraction, and the watermark assignment.

/**
 * CoProcessFunction to process cart and pg messages connected using the
 * connect operator.
 * @author jaswin.shah
 * @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM jaswin.shah Exp $$
 */
public class CartPGCoprocessFunction
        extends KeyedCoProcessFunction<String, CartMessage, PaymentNotifyRequestWrapper, ResultMessage> {

    private static final Logger logger = LoggerFactory.getLogger(CartPGCoprocessFunction.class);

    /**
     * Map state for cart messages, orderId+mid is key and cartMessage is value.
     */
    private static MapState<String, CartPG> cartPgState = null;

    /**
     * Initializes the shared cart and pg map state.
     *
     * @param config
     */
    @Override
    public void open(Configuration config) {

        MapStateDescriptor<String, CartPG> cartPgMapStateDescriptor = new MapStateDescriptor<>(
            Constants.CART_DATA,
            TypeInformation.of(String.class),
            TypeInformation.of(CartPG.class)
        );
        cartPgState = getRuntimeContext().getMapState(cartPgMapStateDescriptor);
    }

    /**
     *
     * @return
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultMessage> out) throws Exception {
        logger.info("On timer called key is {}",ctx.getCurrentKey());
        String searchKey = ctx.getCurrentKey();
        CartPG  cartPg = cartPgState.get(searchKey);
        if(Objects.nonNull(cartPg)) {
            ctx.output(CartPGSideOutput.getOutputTag(), cartPgState.get(ctx.getCurrentKey()));
            cartPgState.remove(searchKey);
        }
    }

    /**
     * 1. Get orderId+mid from cartMessage and check in PGMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from pgMapState.
     * 3. If not present, add orderId+mid as key and cart object as value in cartMapState.
     * @param cartMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement1(CartMessage cartMessage, Context context,
                                Collector<ResultMessage> collector) throws Exception {
        Long cartEventTimeStamp = context.timestamp();
        logger.info("cart time : {} ",cartEventTimeStamp);
        context.timerService().registerProcessingTimeTimer(
            cartEventTimeStamp + ConfigurationsManager.getMaxWaitTimeForPGMessage());

        String searchKey = cartMessage.createJoinStringCondition();

        CartPG cartPG = cartPgState.get(searchKey);

        if(Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getPgMessage())) {
            generateResultMessage(cartMessage,cartPG.getPgMessage(),collector);
            cartPgState.remove(searchKey);
        } else {
            cartPG = new CartPG();
            cartPG.setCartMessage(cartMessage);
            cartPgState.put(searchKey,cartPG);
        }
    }

    /**
     * 1. Get orderId+mid from pgMessage and check in cartMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from cartMapState.
     * 3. If not present, add orderId+mid as key and pg object as value in pgMapState.
     * @param pgMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context context,
                                Collector<ResultMessage> collector) throws Exception {

        Long pgEventTimeStamp = context.timestamp();
        logger.info("pg time : {} ",pgEventTimeStamp);
        context.timerService().registerProcessingTimeTimer(
            pgEventTimeStamp + ConfigurationsManager.getMaxWaitTimeForCartMessage());

        String searchKey = pgMessage.createJoinStringCondition();
        CartPG cartPG = cartPgState.get(searchKey);

        if(Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getCartMessage())) {
            generateResultMessage(cartPG.getCartMessage(),pgMessage,collector);
            cartPgState.remove(searchKey);
        } else {
            cartPG = new CartPG();
            cartPG.setPgMessage(pgMessage);
            cartPgState.put(searchKey,cartPG);
        }
    }

    /**
     * Create ResultMessage from cart and pg messages.
     *
     * @param cartMessage
     * @param pgMessage
     * @return
     */
    private void generateResultMessage(CartMessage cartMessage, PaymentNotifyRequestWrapper pgMessage,
                                       Collector<ResultMessage> collector) {
        ResultMessage resultMessage = new ResultMessage();
        Payment payment = null;

        //Logic should be in cart: check
        for (Payment pay : cartMessage.getPayments()) {
            if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG())
                    && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
                payment = pay;
                break;
            }
        }
        if(Objects.isNull(payment)) {
            return;
        }

        resultMessage.setOrderId(cartMessage.getId());
        resultMessage.setMid(payment.getMid());

        resultMessage.setCartOrderStatus(cartMessage.mapToOrderStatus().getCode());
        resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());

        resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
        resultMessage.setPgOrderCompletionTime(pgMessage.getCreatedTime());

        resultMessage.setPgOrderAmount(pgMessage.getOrderAmount().getValue());
        resultMessage.setCartOrderAmount(String.valueOf(Math.round(cartMessage.getGrandtotal())));

        resultMessage.setCartPaymethod(payment.getPayment_method());
        resultMessage.setPgPaymethod(pgMessage.getPaymentView().getPayOptionInfos()[0].getPayMethod());

        checkDescripancyAndCollectResult(resultMessage,collector);
    }

    /**
     * Evaluate if there is a discrepancy in any field between the messages from
     * the two different systems.
     * Write all the discrepancy logic here.
     *
     * @param resultMessage
     */
    private void checkDescripancyAndCollectResult(ResultMessage resultMessage,
                                                  Collector<ResultMessage> collector) {

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(), resultMessage.getPgOrderStatus())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!resultMessage.getCartOrderAmount().equals(resultMessage.getPgOrderAmount())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(), resultMessage.getPgPaymethod())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }
    }
}


/**
 * Connect to cart and pg streams and process
 *
 * @param cartStream
 * @param pgStream
 * @return
 */
private SingleOutputStreamOperator<ResultMessage> connectCartPGStreamsAndProcess(
        SingleOutputStreamOperator<CartMessage> cartStream,
        SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream) {
    return cartStream.connect(pgStream)
        .keyBy(new CartJoinColumnsSelector(), new PGJoinColumnsSelector())
        .process(new CartPGCoprocessFunction());
}



private final static SimpleDateFormat cartInputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
private final static SimpleDateFormat pgInputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");

public static Long extractCartTimeStamp(CartMessage cartMessage){
    try {
        Date orderTimeStamp = cartInputFormat.parse(cartMessage.fetchOrderCompletionTime());
        return orderTimeStamp.getTime();
    } catch (ParseException e) {
        logger.error("Exception in converting cart message timeStamp..",e);
    }
    return Instant.now().toEpochMilli();
}

public static Long extractPGTimeStamp(PaymentNotifyRequestWrapper pgMessage){
    try {
        Date orderTimeStamp = pgInputFormat.parse(pgMessage.getPaymentView().getPaidTime());
        return orderTimeStamp.getTime();
    } catch (ParseException e) {
        logger.error("Exception in converting pg message timeStamp..",e);
    }
    return Instant.now().toEpochMilli();
}
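
One detail of the cart pattern above that may be worth checking: the 'Z' is quoted, so SimpleDateFormat matches it as a literal character and does not treat it as a UTC marker. The fields are then interpreted in the formatter's time zone, which is the JVM default unless one is set. The sketch below illustrates this with a hypothetical sample value; QuotedZCheck and parse are made-up names, and it creates a fresh formatter per call because SimpleDateFormat is not thread-safe.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.TimeZone;

final class QuotedZCheck {

    /** Parses text with the quoted-'Z' pattern, using the given zone for the fields. */
    static long parse(String text, String zoneId) {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
        format.setTimeZone(TimeZone.getTimeZone(zoneId));
        try {
            return format.parse(text).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("Unparseable timestamp: " + text, e);
        }
    }

    public static void main(String[] args) {
        String completionTime = "2020-05-15T18:22:00.000Z"; // hypothetical sample value

        long asUtc = parse(completionTime, "UTC");
        long asIst = parse(completionTime, "Asia/Kolkata");

        // The trailing Z is ignored: the result follows the formatter's zone.
        System.out.println(asUtc - asIst); // prints 19800000

        // Only the UTC-configured formatter agrees with ISO-8601 parsing.
        System.out.println(asUtc == Instant.parse(completionTime).toEpochMilli()); // prints true
    }
}
```

So if the job runs on a JVM whose default zone is not UTC, the cart timestamps could be shifted by the zone offset even though the strings end in Z.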



private SingleOutputStreamOperator<CartMessage> processCartStream(
        ParameterTool parameter, StreamExecutionEnvironment executionEnvironment) {
    //1. Consume cartStream
    SingleOutputStreamOperator<CartMessage> cartStream =
        executionEnvironment.addSource(createCartConsumer());
    cartStream.name(Constants.CART_SYSTEM);

    //2. Filter cart messages
    SingleOutputStreamOperator<CartMessage> filteredCartStream =
        cartStream.filter(new CartFilterFunction());

    //3. Map carts data
    filteredCartStream = CartMappingService.mapCartsData(filteredCartStream);

    //4. Assign timestamps and watermarks
    filteredCartStream.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<CartMessage>(
            Time.milliseconds(Long.parseLong(parameter.get(Constants.CART_MAX_OUT_OF_ORDERNESS)))) {
        @Override
        public long extractTimestamp(CartMessage cartMessage) {
            return DateTimeUtils.extractCartTimeStamp(cartMessage);
        }
    });
    return filteredCartStream;
}


private SingleOutputStreamOperator<PaymentNotifyRequestWrapper> processPgStream(
        ParameterTool parameter, StreamExecutionEnvironment executionEnvironment) {

    //1. Consume pg streams
    SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream =
        executionEnvironment.addSource(createPGConsumer());

    pgStream.name(Constants.PG_SYSTEM);

    //2. Assign timestamps and watermarks to pg messages
    pgStream.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<PaymentNotifyRequestWrapper>(
            Time.milliseconds(Long.parseLong(parameter.get(Constants.PG_MAX_OUT_OF_ORDERNESS)))) {
        @Override
        public long extractTimestamp(PaymentNotifyRequestWrapper pgMessage) {
            return DateTimeUtils.extractPGTimeStamp(pgMessage);
        }
    });
    return pgStream;
}

Can anyone please help me find what the issue could be here, and whether any 
time values are handled incorrectly in this code?

Help will be highly appreciated.
