Hi,

I am running a Flink job with the following functionality:

1. I consume stream1 and stream2 from two Kafka topics and assign watermarks to the events of both streams by extracting the timestamps from the events.
2. I then connect the two streams and call a KeyedCoProcessFunction on the connected stream.
3. The processElement1 and processElement2 methods receive the events of streams 1 and 2 and perform the join logic shown in the code snippet below.
4. The two streams share one MapState.
5. When an event arrives in a processElement method, I register a callback time for that message. If the matching message does not arrive from the other stream, the message is sent to a side output when the callback method, onTimer, is invoked.
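Step 5 boils down to a deadline computed from the event's own timestamp. As a rough illustration only (plain Java, not Flink code; the class name, the 10-minute wait, and the timestamps are hypothetical), the sketch below shows how much waiting time is left when that deadline is compared with the event's own time versus a wall clock that is already ahead of the event:

```java
public class TimerDeadlineCheck {

    // Deadline as described in step 5: event timestamp plus the maximum wait.
    static long deadline(long eventTimestampMillis, long maxWaitMillis) {
        return eventTimestampMillis + maxWaitMillis;
    }

    // Remaining wait when the deadline is measured against a clock reading; never negative.
    static long remainingWait(long deadlineMillis, long clockMillis) {
        return Math.max(0L, deadlineMillis - clockMillis);
    }

    public static void main(String[] args) {
        long maxWait = 10 * 60 * 1000L;              // hypothetical 10-minute wait
        long eventTs = 1_589_566_920_000L;           // hypothetical event timestamp (epoch millis)
        long wallClock = eventTs + 15 * 60 * 1000L;  // assumption: event is read 15 minutes late

        long d = deadline(eventTs, maxWait);
        System.out.println(remainingWait(d, eventTs));   // measured in event time: 600000
        System.out.println(remainingWait(d, wallClock)); // measured against the wall clock: 0
    }
}
```

If the clock that drives the timer is the wall clock and the event is older than the wait time, the remaining wait is zero, which would look like a callback arriving earlier than expected.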
Something is going wrong in the callback time registration: for many messages from stream2, the callback fires earlier than the registered timeout. Also, the time values in the stream2 events appear to be in GMT+5:30, whereas stream1 uses the normal time zone. I am not strong at analysing time formats, though, so I could be wrong about this.

Below are the code snippets for the KeyedCoProcessFunction, the timestamp extraction, and the watermark assignment.

/**
 * CoProcessFunction to process cart and pg messages connected using connect operator.
 * @author jaswin.shah
 * @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM jaswin.shah Exp $$
 */
public class CartPGCoprocessFunction extends KeyedCoProcessFunction<String, CartMessage, PaymentNotifyRequestWrapper, ResultMessage> {

    private static final Logger logger = LoggerFactory.getLogger(CartPGCoprocessFunction.class);

    /**
     * Map state for cart messages, orderId+mid is key and cartMessage is value.
     */
    private static MapState<String, CartPG> cartPgState = null;

    /**
     * Initializations for cart and pg mapStates
     *
     * @param config
     */
    @Override
    public void open(Configuration config) {
        MapStateDescriptor<String, CartPG> cartPgMapStateDescriptor = new MapStateDescriptor<>(
            Constants.CART_DATA,
            TypeInformation.of(String.class),
            TypeInformation.of(CartPG.class)
        );
        cartPgState = getRuntimeContext().getMapState(cartPgMapStateDescriptor);
    }

    /**
     * Called when a registered timer fires: sends the unmatched entry to the side output.
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultMessage> out) throws Exception {
        logger.info("On timer called key is {}", ctx.getCurrentKey());
        String searchKey = ctx.getCurrentKey();
        CartPG cartPg = cartPgState.get(searchKey);
        if (Objects.nonNull(cartPg)) {
            ctx.output(CartPGSideOutput.getOutputTag(), cartPgState.get(ctx.getCurrentKey()));
            cartPgState.remove(searchKey);
        }
    }

    /**
     * 1. Get orderId+mid from cartMessage and check in PGMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from pgMapState.
     * 3. If not present, add orderId+mid as key and cart object as value in cartMapState.
     * @param cartMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement1(CartMessage cartMessage, Context context, Collector<ResultMessage> collector) throws Exception {
        Long cartEventTimeStamp = context.timestamp();
        logger.info("cart time : {} ", cartEventTimeStamp);
        context.timerService().registerProcessingTimeTimer(cartEventTimeStamp + ConfigurationsManager.getMaxWaitTimeForPGMessage());
        String searchKey = cartMessage.createJoinStringCondition();
        CartPG cartPG = cartPgState.get(searchKey);
        if (Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getPgMessage())) {
            generateResultMessage(cartMessage, cartPG.getPgMessage(), collector);
            cartPgState.remove(searchKey);
        } else {
            cartPG = new CartPG();
            cartPG.setCartMessage(cartMessage);
            cartPgState.put(searchKey, cartPG);
        }
    }

    /**
     * 1. Get orderId+mid from pgMessage and check in cartMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from cartMapState.
     * 3. If not present, add orderId+mid as key and cart object as value in pgMapState.
     * @param pgMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context context, Collector<ResultMessage> collector) throws Exception {
        Long pgEventTimeStamp = context.timestamp();
        logger.info("pg time : {} ", pgEventTimeStamp);
        context.timerService().registerProcessingTimeTimer(pgEventTimeStamp + ConfigurationsManager.getMaxWaitTimeForCartMessage());
        String searchKey = pgMessage.createJoinStringCondition();
        CartPG cartPG = cartPgState.get(searchKey);
        if (Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getCartMessage())) {
            generateResultMessage(cartPG.getCartMessage(), pgMessage, collector);
            cartPgState.remove(searchKey);
        } else {
            cartPG = new CartPG();
            cartPG.setPgMessage(pgMessage);
            cartPgState.put(searchKey, cartPG);
        }
    }

    /**
     * Create ResultMessage from cart and pg messages.
     *
     * @param cartMessage
     * @param pgMessage
     */
    private void generateResultMessage(CartMessage cartMessage, PaymentNotifyRequestWrapper pgMessage, Collector<ResultMessage> collector) {
        ResultMessage resultMessage = new ResultMessage();
        Payment payment = null;

        //Logic should be in cart: check
        for (Payment pay : cartMessage.getPayments()) {
            if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG())
                    && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
                payment = pay;
                break;
            }
        }
        if (Objects.isNull(payment)) {
            return;
        }

        resultMessage.setOrderId(cartMessage.getId());
        resultMessage.setMid(payment.getMid());
        resultMessage.setCartOrderStatus(cartMessage.mapToOrderStatus().getCode());
        resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());
        resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
        resultMessage.setPgOrderCompletionTime(pgMessage.getCreatedTime());
        resultMessage.setPgOrderAmount(pgMessage.getOrderAmount().getValue());
        resultMessage.setCartOrderAmount(String.valueOf(Math.round(cartMessage.getGrandtotal())));
        resultMessage.setCartPaymethod(payment.getPayment_method());
        resultMessage.setPgPaymethod(pgMessage.getPaymentView().getPayOptionInfos()[0].getPayMethod());

        checkDescripancyAndCollectResult(resultMessage, collector);
    }

    /**
     * Evaluate if there is a discrepancy in any field between the messages from the two different systems.
     * Write all the discrepancy logic here.
     *
     * @param resultMessage
     */
    private void checkDescripancyAndCollectResult(ResultMessage resultMessage, Collector<ResultMessage> collector) {
        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(), resultMessage.getPgOrderStatus())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }
        if (!resultMessage.getCartOrderAmount().equals(resultMessage.getPgOrderAmount())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }
        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(), resultMessage.getPgPaymethod())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }
    }
}

/**
 * Connect to cart and pg streams and process
 *
 * @param cartStream
 * @param pgStream
 * @return
 */
private SingleOutputStreamOperator<ResultMessage> connectCartPGStreamsAndProcess(
        SingleOutputStreamOperator<CartMessage> cartStream,
        SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream) {
    return cartStream.connect(pgStream)
        .keyBy(new CartJoinColumnsSelector(), new PGJoinColumnsSelector())
        .process(new CartPGCoprocessFunction());
}

private final static SimpleDateFormat cartInputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
private final static SimpleDateFormat pgInputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");

public static Long extractCartTimeStamp(CartMessage cartMessage) {
    try {
        Date orderTimeStamp = cartInputFormat.parse(cartMessage.fetchOrderCompletionTime());
        return orderTimeStamp.getTime();
    } catch (ParseException e) {
        logger.error("Exception in converting cart message timeStamp..", e);
    }
    return Instant.now().toEpochMilli();
}

public static Long extractPGTimeStamp(PaymentNotifyRequestWrapper pgMessage) {
    try {
        Date orderTimeStamp = pgInputFormat.parse(pgMessage.getPaymentView().getPaidTime());
        return orderTimeStamp.getTime();
    } catch (ParseException e) {
        logger.error("Exception in converting pg message timeStamp..", e);
    }
    return Instant.now().toEpochMilli();
}

private SingleOutputStreamOperator<CartMessage> processCartStream(ParameterTool parameter, StreamExecutionEnvironment executionEnvironment) {
    //1. Consume cartStream
    SingleOutputStreamOperator<CartMessage> cartStream = executionEnvironment.addSource(createCartConsumer());
    cartStream.name(Constants.CART_SYSTEM);

    //2. Filter cart messages
    SingleOutputStreamOperator<CartMessage> filteredCartStream = cartStream.filter(new CartFilterFunction());

    //3. Map carts data
    filteredCartStream = CartMappingService.mapCartsData(filteredCartStream);

    //4. Assign timestamps and watermarks
    filteredCartStream.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<CartMessage>(
                Time.milliseconds(Long.parseLong(parameter.get(Constants.CART_MAX_OUT_OF_ORDERNESS)))) {
            @Override
            public long extractTimestamp(CartMessage cartMessage) {
                return DateTimeUtils.extractCartTimeStamp(cartMessage);
            }
        });

    return filteredCartStream;
}

private SingleOutputStreamOperator<PaymentNotifyRequestWrapper> processPgStream(ParameterTool parameter, StreamExecutionEnvironment executionEnvironment) {
    //1. Consume pg streams
    SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream = executionEnvironment.addSource(createPGConsumer());
    pgStream.name(Constants.PG_SYSTEM);

    //2. Assign timestamps and watermarks to pg messages
    pgStream.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<PaymentNotifyRequestWrapper>(
                Time.milliseconds(Long.parseLong(parameter.get(Constants.PG_MAX_OUT_OF_ORDERNESS)))) {
            @Override
            public long extractTimestamp(PaymentNotifyRequestWrapper pgMessage) {
                return DateTimeUtils.extractPGTimeStamp(pgMessage);
            }
        });

    return pgStream;
}

Can anyone please help me find the issue here, and tell me whether any time values are handled incorrectly in this code? Help will be highly appreciated.
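Regarding the time zone suspicion above, a small standalone check may help narrow it down. This is only a sketch, not part of the job: the class name and the sample time strings are hypothetical, and the two patterns are copied from cartInputFormat and pgInputFormat. It parses the same wall-clock value under two zones to show how far apart the resulting epoch milliseconds are. Note that the quoted 'Z' in the cart pattern is matched as a literal character, so it does not by itself make the parser treat the value as UTC.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

public class TimeZoneParseCheck {

    // Parses a wall-clock string with the given pattern, interpreting it in the given zone.
    static long parseMillis(String pattern, String value, String zoneId) {
        SimpleDateFormat format = new SimpleDateFormat(pattern);
        format.setTimeZone(TimeZone.getTimeZone(zoneId));
        try {
            return format.parse(value).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("Cannot parse: " + value, e);
        }
    }

    public static void main(String[] args) {
        String cartPattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
        String pgPattern = "yyyy-MM-dd'T'HH:mm:ss";

        // Hypothetical sample values with the same wall-clock time.
        long pgAsUtc = parseMillis(pgPattern, "2020-05-15T18:22:00", "UTC");
        long pgAsIst = parseMillis(pgPattern, "2020-05-15T18:22:00", "Asia/Kolkata");
        long cartAsIst = parseMillis(cartPattern, "2020-05-15T18:22:00.000Z", "Asia/Kolkata");

        // The same string is 5 hours 30 minutes apart depending on the zone used to parse it.
        System.out.println("UTC minus IST (ms): " + (pgAsUtc - pgAsIst));
        // The trailing literal Z does not change the zone used for parsing.
        System.out.println("cart parsed in IST equals pg parsed in IST: " + (cartAsIst == pgAsIst));
    }
}
```

Running the same comparison with real values from both topics, and with the JVM default zone of the task managers, would show whether the two streams end up on the same timeline.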