I am not sure how to describe it better. The
MapState/ValueState is *always implicitly scoped to the current key*. It
is scoped this way in every function of the operator: processElement1,
processElement2, and onTimer. It always holds whatever you stored there
for the current key, and never anything you stored for a different key.
A ValueState holds the value that you stored for
OnTimerContext.getCurrentKey() (in onTimer) or
Context.getCurrentKey() (in processElementX).
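
A toy model in plain Java may make the scoping easier to picture. It is not Flink code: the class KeyedStateModel and all of its methods are made up for illustration, and they only mimic the idea that the runtime sets the current key before it calls processElementX or onTimer.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of a keyed ValueState. Hypothetical names, not the Flink API. */
public class KeyedStateModel {
    // One slot per key. In Flink this bookkeeping is hidden inside the state backend.
    private final Map<String, String> valuePerKey = new HashMap<>();
    // Set by the "runtime" before every callback.
    private String currentKey;

    // Mimics ValueState.update(): writes only to the slot of the current key.
    private void update(String value) {
        valuePerKey.put(currentKey, value);
    }

    // Mimics ValueState.value(): reads only the slot of the current key.
    private String value() {
        return valuePerKey.get(currentKey);
    }

    // Mimics an element arriving: the key is set first, then user code stores the payload.
    public void processElement(String key, String payload) {
        currentKey = key;
        update(payload);
    }

    // Mimics a timer firing: the key the timer was registered under becomes the current key.
    public String onTimer(String key) {
        currentKey = key;
        return value();
    }

    public static void main(String[] args) {
        KeyedStateModel op = new KeyedStateModel();
        op.processElement("order-1", "cart message A");
        op.processElement("order-2", "cart message B");
        System.out.println(op.onTimer("order-1")); // cart message A
        System.out.println(op.onTimer("order-2")); // cart message B
        System.out.println(op.onTimer("order-3")); // null, nothing was stored for this key
    }
}
```

The second element does not overwrite the first one, because each key has its own slot even though the user code only ever sees "a single value".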


I really encourage you to analyze the example I posted, as it is quite
similar to what you are doing.

Best,

Dawid


 On 25/05/2020 11:51, Jaswin Shah wrote:
> correcting: 
> By ctx.getCurrentKey()=> I meant to get the key registered at a
> timestamp when callback timeout for a key was registered.
> This was a reason to use getCurrentKey().
> So, with that I am fetching the events registered at that point of
> time for which till the current moment I didn't receive the events
> from other stream.
>
> I hope here my understanding is correct.
> Please correct me if I am wrong here.
>
> ------------------------------------------------------------------------
> *From:* Jaswin Shah <jaswin.s...@outlook.com>
> *Sent:* 25 May 2020 15:19
> *To:* Dawid Wysakowicz <dwysakow...@apache.org>; user@flink.apache.org;
> ankit.sing...@paytm.com; isha.sing...@paytm.com
> *Subject:* Re: Timeout Callbacks issue -Flink
>  
> By ctx.getCurrentKey()=> I meant to get the key registered at a
> timestamp when callback timeout for a key was registered.
> This was a reason to use getCurrentKey().
> So, with that I am fetching the events registered at that point of
> time for which till the current moment I didn't receive the callbacks.
>
> I hope here my understanding is correct.
> Please correct me if I am wrong here.
>
>
> ------------------------------------------------------------------------
> *From:* Dawid Wysakowicz
> *Sent:* Monday, May 25, 2020 15:14
> *To:* Jaswin Shah; user@flink.apache.org; ankit.sing...@paytm.com;
> isha.sing...@paytm.com
> *Subject:* Re: Timeout Callbacks issue -Flink
>
> You are right that a ValueState can keep a single value at any point
> of time. It is scoped to the current key of the operator though. So it
> keeps a single value for a key.
>
>
> If your
> cartMessage.createJoinStringCondition()/pgMessage.createJoinStringCondition()/new
> CartJoinColumnsSelector()/new PGJoinColumnsSelector() are basically
> the same thing, a ValueState should be enough. It will always be scoped
> to the result of new CartJoinColumnsSelector()/new
> PGJoinColumnsSelector(). I assumed they are the same because you are
> always using ctx.getCurrentKey() in the onTimer method.
>
>
> See this example [1]. There, even though a ValueState is used, we
> calculate counts per key.
>
>
>
> Best,
>
> Dawid
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#example
>
>
>
> On 25/05/2020 11:17, Jaswin Shah wrote:
>> If I understand correctly, you are saying that I should have a
>> ValueState of Map?
>> ------------------------------------------------------------------------
>> *From:* Jaswin Shah
>> *Sent:* 25 May 2020 14:43
>> *To:* Dawid Wysakowicz <dwysakow...@apache.org>; user@flink.apache.org;
>> ankit.sing...@paytm.com; isha.sing...@paytm.com
>> *Subject:* Re: Timeout Callbacks issue -Flink
>>  
>> Thanks for responding, Dawid.
>> I would like to know more about the MapState solution you talked about.
>> As per my understanding, a ValueState maintains a single value at any
>> point of time. What I want to maintain here is the first stream's
>> information until a matching event is found in the second stream. So,
>> in that case, how could a ValueState benefit me? Can you please explain
>> that to me? I might have misunderstood what you are
>> trying to convey here.
>>
>> Thanks,
>> Jaswin
>>
>>
>> ------------------------------------------------------------------------
>> *From:* Dawid Wysakowicz
>> *Sent:* Monday, May 25, 2020 14:23
>> *To:* Jaswin Shah; user@flink.apache.org; ankit.sing...@paytm.com;
>> isha.sing...@paytm.com
>> *Subject:* Re: Timeout Callbacks issue -Flink
>>
>> Hi Jaswin,
>>
>> I can't see any obvious problems in your code. It looks rather
>> correct. What exactly do you mean that "callback is coming earlier
>> than registered callback timeout"? Could you explain that with some
>> examples?
>>
>>
>> As for the different timezones: Flink does not make any assumptions
>> about the timestamps. It treats them simply as longs. I'd suggest
>> revisiting your timestamp extraction logic to make sure it performs the
>> extraction correctly. I don't know how your data encodes the
>> timestamps, but I think you have a bug or two there ;)
>>
>>
>> The "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" pattern gives me the impression that
>> this field has timestamps in UTC, but you are parsing it in your JVM's
>> local time zone, because you treat the 'Z' as a literal
>> (https://stackoverflow.com/questions/19112357/java-simpledateformatyyyy-mm-ddthhmmssz-gives-timezone-as-ist).
>> I don't know what the other field represents, but you are also parsing
>> it in the local time zone. If the field represents a local date, it is
>> probably correct.
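
The effect of the quoted 'Z' can be reproduced with the JDK alone. The sketch below is only an illustration: the class name QuotedZPitfall, the sample timestamp, and the choice of Asia/Kolkata (picked because the original post mentions GMT+5:30) are assumptions and do not come from the job's code.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.TimeZone;

/** Shows that a quoted 'Z' is matched as plain text and carries no zone information. */
public class QuotedZPitfall {
    static final String PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";

    // Parses the text with the quoted-'Z' pattern, reading the fields in the given zone.
    // A fresh SimpleDateFormat is created per call because the class is not thread-safe.
    public static long parseIn(String zoneId, String text) {
        SimpleDateFormat format = new SimpleDateFormat(PATTERN);
        format.setTimeZone(TimeZone.getTimeZone(zoneId));
        try {
            return format.parse(text).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("Unparseable timestamp: " + text, e);
        }
    }

    public static void main(String[] args) {
        String text = "2020-05-23T10:00:00.000Z";
        long asInstant = Instant.parse(text).toEpochMilli();   // honours the trailing Z (UTC)
        long asKolkata = parseIn("Asia/Kolkata", text);        // ignores it, uses the formatter zone

        // Difference in hours between the two interpretations of the same string.
        System.out.println((asInstant - asKolkata) / 3_600_000.0); // 5.5
    }
}
```

With the formatter zone set to UTC the two results agree, which is why the problem only shows up on machines whose default zone is not UTC.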
>>
>>
>> To mitigate those problems I'd strongly recommend using the java.time
>> API. For extractCartTimeStamp you could use
>> Instant.parse("...").toEpochMilli(). It expects the format you are
>> receiving. For extractPGTimeStamp you could use
>> LocalDateTime.parse("..."), which by default uses the format you are
>> receiving. Then you should convert the local date time to an instant:
>> LocalDateTime.parse("...").atZone(/* the zone which this date
>> represents */).toInstant().toEpochMilli(). This has nothing to do
>> with Flink though ;)
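
Put together, the two java.time suggestions could look roughly like this. It is a sketch under assumptions: the class TimestampParsing and its method names are hypothetical, the methods take the raw timestamp strings instead of CartMessage/PaymentNotifyRequestWrapper, and which zone the PG timestamps really represent is something only the data owner can confirm (Asia/Kolkata is used in main purely as an example).

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

/** Hypothetical java.time replacements for the two SimpleDateFormat based extractors. */
public class TimestampParsing {

    // Cart timestamps look like 2020-05-23T10:00:00.000Z, an ISO-8601 instant in UTC.
    public static long cartMillis(String text) {
        return Instant.parse(text).toEpochMilli();
    }

    // PG timestamps look like 2020-05-23T15:30:00 and carry no zone,
    // so the caller has to state which zone the wall-clock time belongs to.
    public static long pgMillis(String text, ZoneId zone) {
        return LocalDateTime.parse(text).atZone(zone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        long cart = cartMillis("2020-05-23T10:00:00.000Z");
        long pg = pgMillis("2020-05-23T15:30:00", ZoneId.of("Asia/Kolkata"));
        // 15:30 at UTC+5:30 is the same moment as 10:00 UTC.
        System.out.println(cart == pg); // true
    }
}
```

Both methods throw DateTimeParseException on malformed input instead of silently falling back to the current time, so bad records become visible rather than receiving an arbitrary timestamp.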
>>
>>
>> BTW, one Flink issue I can see is that I think you don't need to use a
>> MapState there. Any kind of state in a KeyedCoProcessFunction is
>> always scoped to the current key. Therefore, if you only ever put
>> items under the currentKey, you will have at most a single element in
>> your map. Think of the MapState as a map of maps: MapState<UserKey,
>> Value> = Map<CurrentKey, Map<UserKey, Value>>. In your case a
>> ValueState should be enough imo.
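
The map-of-maps picture can be written down literally in plain Java. This is only a mental model with made-up names (MapOfMapsModel, put, entriesFor); Flink's state backends are implemented differently, but the visible behaviour for user code is comparable.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Literal version of Map<CurrentKey, Map<UserKey, Value>>. Illustration only. */
public class MapOfMapsModel {
    private final Map<String, Map<String, String>> state = new HashMap<>();

    // Mimics MapState.put(userKey, value) while the operator is positioned on currentKey.
    public void put(String currentKey, String userKey, String value) {
        state.computeIfAbsent(currentKey, k -> new HashMap<>()).put(userKey, value);
    }

    // Number of entries visible to user code while positioned on currentKey.
    public int entriesFor(String currentKey) {
        return state.getOrDefault(currentKey, Collections.emptyMap()).size();
    }

    public static void main(String[] args) {
        MapOfMapsModel mapState = new MapOfMapsModel();
        // The user key is always the same string as the current key, as in the posted job.
        mapState.put("order-1", "order-1", "cart message A");
        mapState.put("order-2", "order-2", "cart message B");
        mapState.put("order-1", "order-1", "cart message A, updated");

        System.out.println(mapState.entriesFor("order-1")); // 1
        System.out.println(mapState.entriesFor("order-2")); // 1
    }
}
```

Every inner map ends up with exactly one entry, so the inner level adds nothing and a single value per current key carries the same information.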
>>
>>
>> Best,
>>
>> Dawid
>>
>>
>> On 23/05/2020 14:39, Jaswin Shah wrote:
>>> ++
>>> Here, I am registering the callback time for an event with processing
>>> time and calculating the time value as the event's time + expiryTimeout
>>> value.
>>>
>>> Can this be the issue here due to hybrid timings usage?
>>> Also, do we need any special handling if we use event time semantics
>>> for callback timeouts registrations?
>>>
>>> Thanks,
>>> Jaswin
>>> ------------------------------------------------------------------------
>>> *From:* Jaswin Shah <jaswin.s...@outlook.com>
>>> *Sent:* 23 May 2020 17:18
>>> *To:* user@flink.apache.org; Arvid Heise <ar...@ververica.com>;
>>> Yun Tang <myas...@live.com>
>>> *Subject:* Timeout Callbacks issue -Flink
>>>  
>>> Hi,
>>> I am running flink job with following functionality:
>>>
>>>  1. I consume stream1 and stream2 from two kafka topics and assign
>>>     the watermarks to the events of two streams by extracting the
>>>     timestamps from the events in streams.
>>>  2. Then, I am connecting two streams and calling
>>>     KeyedCoProcessFunction on connectedStream.
>>>  3. I have processElement1 method and processElement2 methods which
>>>     receive the events of two streams 1 and 2 and do the join logic
>>>     as shown in below code snippet.
>>>  4. I have shared mapstate for two streams.
>>>  5. When an event comes to processElement method, I register the
>>>     callback time for that message to ensure if corresponding
>>>     matching message is not arrived from other stream, I will send
>>>     the message to sideOutput on invocation of callback method i.e.
>>>     onTimer.
>>>
>>> Something is going wrong in the callback time registrations for
>>> events: for many messages of stream2 the callback is
>>> coming earlier than the registered callback timeout.
>>> Also, the events from stream 2 are based on GMT+5:30 times, as I can
>>> see in the time value in the event message; for stream1 it's the normal TZ
>>> only. Though I am weak in analysing the timeout formats, so I could be
>>> wrong in the analysis on this side.
>>>
>>> Below is code snippets I have implemented for
>>> KeyedCoProcessFunctions and timestamp calculations and watermarks
>>> registrations.
>>> /**
>>>  * CoProcessFuntion to process cart and pg messages connected using connect operator.
>>>  * @author jaswin.shah
>>>  * @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM jaswin.shah Exp $$
>>>  */
>>> public class CartPGCoprocessFunction extends
>>>         KeyedCoProcessFunction<String, CartMessage, PaymentNotifyRequestWrapper, ResultMessage> {
>>>
>>>     private static final Logger logger = LoggerFactory.getLogger(CartPGCoprocessFunction.class);
>>>
>>>     /**
>>>      * Map state for cart messages, orderId+mid is key and cartMessage is value.
>>>      */
>>>     private static MapState<String, CartPG> cartPgState = null;
>>>
>>>     /**
>>>      * Intializations for cart and pg mapStates
>>>      *
>>>      * @param config
>>>      */
>>>     @Override
>>>     public void open(Configuration config) {
>>>
>>>         MapStateDescriptor<String, CartPG> cartPgMapStateDescriptor = new MapStateDescriptor<>(
>>>             Constants.CART_DATA, TypeInformation.of(String.class), TypeInformation.of(CartPG.class)
>>>         );
>>>         cartPgState = getRuntimeContext().getMapState(cartPgMapStateDescriptor);
>>>     }
>>>
>>>     /**
>>>      *
>>>      * @return
>>>      */
>>>     @Override
>>>     public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultMessage> out) throws Exception {
>>>         logger.info("On timer called key is {}", ctx.getCurrentKey());
>>>         String searchKey = ctx.getCurrentKey();
>>>         CartPG cartPg = cartPgState.get(searchKey);
>>>         if (Objects.nonNull(cartPg)) {
>>>             ctx.output(CartPGSideOutput.getOutputTag(), cartPgState.get(ctx.getCurrentKey()));
>>>             cartPgState.remove(searchKey);
>>>         }
>>>     }
>>>
>>>     /**
>>>      * 1. Get orderId+mid from cartMessage and check in PGMapState if an entry is present.
>>>      * 2. If present, match, checkDescripancy, process and delete entry from pgMapState.
>>>      * 3. If not present, add orderId+mid as key and cart object as value in cartMapState.
>>>      * @param cartMessage
>>>      * @param context
>>>      * @param collector
>>>      * @throws Exception
>>>      */
>>>     @Override
>>>     public void processElement1(CartMessage cartMessage, Context context,
>>>             Collector<ResultMessage> collector) throws Exception {
>>>         Long cartEventTimeStamp = context.timestamp();
>>>         logger.info("cart time : {} ", cartEventTimeStamp);
>>>         context.timerService().registerProcessingTimeTimer(
>>>             cartEventTimeStamp + ConfigurationsManager.getMaxWaitTimeForPGMessage());
>>>         String searchKey = cartMessage.createJoinStringCondition();
>>>         CartPG cartPG = cartPgState.get(searchKey);
>>>         if (Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getPgMessage())) {
>>>             generateResultMessage(cartMessage, cartPG.getPgMessage(), collector);
>>>             cartPgState.remove(searchKey);
>>>         } else {
>>>             cartPG = new CartPG();
>>>             cartPG.setCartMessage(cartMessage);
>>>             cartPgState.put(searchKey, cartPG);
>>>         }
>>>     }
>>>
>>>     /**
>>>      * 1. Get orderId+mid from pgMessage and check in cartMapState if an entry is present.
>>>      * 2. If present, match, checkDescripancy, process and delete entry from cartMapState.
>>>      * 3. If not present, add orderId+mid as key and cart object as value in pgMapState.
>>>      * @param pgMessage
>>>      * @param context
>>>      * @param collector
>>>      * @throws Exception
>>>      */
>>>     @Override
>>>     public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context context,
>>>             Collector<ResultMessage> collector) throws Exception {
>>>
>>>         Long pgEventTimeStamp = context.timestamp();
>>>         logger.info("pg time : {} ", pgEventTimeStamp);
>>>         context.timerService().registerProcessingTimeTimer(
>>>             pgEventTimeStamp + ConfigurationsManager.getMaxWaitTimeForCartMessage());
>>>         String searchKey = pgMessage.createJoinStringCondition();
>>>         CartPG cartPG = cartPgState.get(searchKey);
>>>         if (Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getCartMessage())) {
>>>             generateResultMessage(cartPG.getCartMessage(), pgMessage, collector);
>>>             cartPgState.remove(searchKey);
>>>         } else {
>>>             cartPG = new CartPG();
>>>             cartPG.setPgMessage(pgMessage);
>>>             cartPgState.put(searchKey, cartPG);
>>>         }
>>>     }
>>>
>>>     /**
>>>      * Create ResultMessage from cart and pg messages.
>>>      *
>>>      * @param cartMessage
>>>      * @param pgMessage
>>>      * @return
>>>      */
>>>     private void generateResultMessage(CartMessage cartMessage, PaymentNotifyRequestWrapper pgMessage,
>>>             Collector<ResultMessage> collector) {
>>>         ResultMessage resultMessage = new ResultMessage();
>>>         Payment payment = null;
>>>         //Logic should be in cart: check
>>>         for (Payment pay : cartMessage.getPayments()) {
>>>             if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG())
>>>                     && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
>>>                 payment = pay;
>>>                 break;
>>>             }
>>>         }
>>>         if (Objects.isNull(payment)) {
>>>             return;
>>>         }
>>>
>>>         resultMessage.setOrderId(cartMessage.getId());
>>>         resultMessage.setMid(payment.getMid());
>>>         resultMessage.setCartOrderStatus(cartMessage.mapToOrderStatus().getCode());
>>>         resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());
>>>         resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
>>>         resultMessage.setPgOrderCompletionTime(pgMessage.getCreatedTime());
>>>         resultMessage.setPgOrderAmount(pgMessage.getOrderAmount().getValue());
>>>         resultMessage.setCartOrderAmount(String.valueOf(Math.round(cartMessage.getGrandtotal())));
>>>         resultMessage.setCartPaymethod(payment.getPayment_method());
>>>         resultMessage.setPgPaymethod(pgMessage.getPaymentView().getPayOptionInfos()[0].getPayMethod());
>>>         checkDescripancyAndCollectResult(resultMessage, collector);
>>>     }
>>>
>>>     /**
>>>      * Evaluate if there is descripancy of any fields between the messages from two different systems.
>>>      * Write all the descripancy logic here.
>>>      *
>>>      * @param resultMessage
>>>      */
>>>     private void checkDescripancyAndCollectResult(ResultMessage resultMessage,
>>>             Collector<ResultMessage> collector) {
>>>
>>>         if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(),
>>>                 resultMessage.getPgOrderStatus())) {
>>>             resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
>>>             collector.collect(resultMessage.clone());
>>>         }
>>>
>>>         if (!resultMessage.getCartOrderAmount().equals(resultMessage.getPgOrderAmount())) {
>>>             resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
>>>             collector.collect(resultMessage.clone());
>>>         }
>>>
>>>         if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(),
>>>                 resultMessage.getPgPaymethod())) {
>>>             resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
>>>             collector.collect(resultMessage.clone());
>>>         }
>>>     }
>>> }
>>>
>>> /**
>>>  * Connect to cart and pg streams and process
>>>  *
>>>  * @param cartStream
>>>  * @param pgStream
>>>  * @return
>>>  */
>>> private SingleOutputStreamOperator<ResultMessage> connectCartPGStreamsAndProcess(
>>>         SingleOutputStreamOperator<CartMessage> cartStream,
>>>         SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream) {
>>>     return cartStream.connect(pgStream)
>>>         .keyBy(new CartJoinColumnsSelector(), new PGJoinColumnsSelector())
>>>         .process(new CartPGCoprocessFunction());
>>> }
>>>
>>>
>>> private final static SimpleDateFormat cartInputFormat =
>>>     new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
>>> private final static SimpleDateFormat pgInputFormat =
>>>     new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
>>>
>>> public static Long extractCartTimeStamp(CartMessage cartMessage){
>>>     try {
>>>         Date orderTimeStamp = cartInputFormat.parse(cartMessage.fetchOrderCompletionTime());
>>>         return orderTimeStamp.getTime();
>>>     } catch (ParseException e) {
>>>         logger.error("Exception in converting cart message timeStamp..",e);
>>>     }
>>>     return Instant.now().toEpochMilli();
>>> }
>>>
>>> public static Long extractPGTimeStamp(PaymentNotifyRequestWrapper pgMessage){
>>>     try {
>>>         Date orderTimeStamp = pgInputFormat.parse(pgMessage.getPaymentView().getPaidTime());
>>>         return orderTimeStamp.getTime();
>>>     } catch (ParseException e) {
>>>         logger.error("Exception in converting pg message timeStamp..",e);
>>>     }
>>>     return Instant.now().toEpochMilli();
>>> }
>>>
>>>
>>> private SingleOutputStreamOperator<CartMessage> processCartStream(ParameterTool parameter,
>>>         StreamExecutionEnvironment executionEnvironment) {
>>>     //1. Consume cartStream
>>>     SingleOutputStreamOperator<CartMessage> cartStream =
>>>         executionEnvironment.addSource(createCartConsumer());
>>>     cartStream.name(Constants.CART_SYSTEM);
>>>     //2. Filter cart messages
>>>     SingleOutputStreamOperator<CartMessage> filteredCartStream =
>>>         cartStream.filter(new CartFilterFunction());
>>>     //3. Map carts data
>>>     filteredCartStream = CartMappingService.mapCartsData(filteredCartStream);
>>>     //4. Assign timestamps and watermarks
>>>     filteredCartStream.assignTimestampsAndWatermarks(
>>>         new BoundedOutOfOrdernessTimestampExtractor<CartMessage>(
>>>             Time.milliseconds(Long.parseLong(parameter.get(Constants.CART_MAX_OUT_OF_ORDERNESS)))) {
>>>         @Override
>>>         public long extractTimestamp(CartMessage cartMessage) {
>>>             return DateTimeUtils.extractCartTimeStamp(cartMessage);
>>>         }
>>>     });
>>>     return filteredCartStream;
>>> }
>>>
>>> private SingleOutputStreamOperator<PaymentNotifyRequestWrapper> processPgStream(ParameterTool parameter,
>>>         StreamExecutionEnvironment executionEnvironment) {
>>>
>>>     //1. Consume pg streams
>>>     SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream =
>>>         executionEnvironment.addSource(createPGConsumer());
>>>     pgStream.name(Constants.PG_SYSTEM);
>>>     //2. Assign timestamps and watermarks to pg messages
>>>     pgStream.assignTimestampsAndWatermarks(
>>>         new BoundedOutOfOrdernessTimestampExtractor<PaymentNotifyRequestWrapper>(
>>>             Time.milliseconds(Long.parseLong(parameter.get(Constants.PG_MAX_OUT_OF_ORDERNESS)))) {
>>>         @Override
>>>         public long extractTimestamp(PaymentNotifyRequestWrapper pgMessage) {
>>>             return DateTimeUtils.extractPGTimeStamp(pgMessage);
>>>         }
>>>     });
>>>     return pgStream;
>>> }
>>>
>>> Can anyone please help with what the issue can be here, and whether
>>> some time values are handled wrongly in the code?
>>>
>>> Help will be highly appreciated.
