Broadcast state not getting filled with all the data in processElement

2024-03-14 Thread Nabil Hadji
I'm using the Flink 1.81.1 API on Java 11 and I'm trying to use a BroadcastProcessFunction to filter a Products DataStream with an authorized-brands DataStream as broadcast. My first products DataStream contains different products that have a brand field, and my second brands DataStream contains only bran

Re: Slow restart from savepoint with large broadcast state when increasing parallelism

2022-12-16 Thread Jun Qin
Hi Ken, > Broadcast state is weird in that it’s duplicated, apparently to avoid “hot > spots” when restoring from state. So I’m wondering how Flink handles the case > of restoring broadcast state when the parallelism increases. The Flink doc is here: https://nightlies.apache.org/flink/f
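
For readers following this thread: the Flink documentation on broadcast state describes scale-up roughly as each task reading its own state, with the additional tasks reading the checkpoints of previous tasks in a round-robin manner. The sketch below is a plain-Java model of that description (no Flink dependency). The class and method names are hypothetical, and the modulo mapping is an assumption drawn from the documentation wording, not from Flink's source code.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model (not Flink code) of round-robin broadcast state assignment on rescale. */
final class BroadcastRescaleSketch {

    /** Returns, for each new subtask index, the old subtask whose state copy it would read. */
    static List<Integer> assignSources(int oldParallelism, int newParallelism) {
        if (oldParallelism <= 0 || newParallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        List<Integer> sources = new ArrayList<>(newParallelism);
        for (int newIndex = 0; newIndex < newParallelism; newIndex++) {
            // Assumption: new subtask i reads the copy written by old subtask (i mod oldParallelism).
            sources.add(newIndex % oldParallelism);
        }
        return sources;
    }

    public static void main(String[] args) {
        // The parallelism values from the thread: savepoint taken at 300, restored at 400.
        List<Integer> sources = assignSources(300, 400);
        System.out.println("new subtask 299 reads old copy " + sources.get(299));
        System.out.println("new subtask 300 reads old copy " + sources.get(300));
        System.out.println("new subtask 399 reads old copy " + sources.get(399));
    }
}
```

Under this model, the 100 added subtasks re-read copies that were already read by subtasks 0 to 99. Whether that explains the slowdown reported in the thread is not established here.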

Re: Slow restart from savepoint with large broadcast state when increasing parallelism

2022-12-16 Thread Ken Krugler
another test. Broadcast state is weird in that it’s duplicated, apparently to avoid “hot spots” when restoring from state. So I’m wondering how Flink handles the case of restoring broadcast state when the parallelism increases. Regards, Ken > On Dec 15, 2022, at 4:33 PM, Jun Qin wrote: >

Re: Slow restart from savepoint with large broadcast state when increasing parallelism

2022-12-15 Thread Jun Qin
the case. In addition, in those TMs where the restart was slow, do you see anything suspicious in the logs, e.g., reconnecting? Thanks, Jun Sent from my phone Original message From: Ken Krugler Date: Wed, Dec 14, 2022, 19:32 To: User Subject: Slow restart from savepoint with large broadcast state when increasing

Slow restart from savepoint with large broadcast state when increasing parallelism

2022-12-14 Thread Ken Krugler
Hi all, I have a job with a large amount of broadcast state (62MB). I took a savepoint when my workflow was running with parallelism 300. I then restarted the workflow with parallelism 400. The first 297 sub-tasks restored their broadcast state fairly quickly, but after that it slowed to a

Re: Broadcast state and job restarts

2022-10-27 Thread Zakelly Lan
Hi Alexis, Broadcast state is one type of Operator State, which is included in savepoints and checkpoints and won't be lost. Please refer to https://stackoverflow.com/questions/62509773/flink-broadcast-state-rocksdb-state-backend/62510423#62510423 Best, Zakelly On Fri, Oct 28, 2022 at

Broadcast state and job restarts

2022-10-27 Thread Alexis Sarda-Espinosa
Hello, The documentation for broadcast state specifies that it is always kept in memory. My assumptions based on this statement are: 1. If a job restarts in the same Flink cluster (i.e. using a restart strategy), the tasks' attempt number increases and the broadcast state is restored since

Re: Broadcast state and OutOfMemoryError: Direct buffer memory

2022-10-21 Thread yanfei lei
Hi Dan, Usually broadcast state needs more network buffers. The network buffers used to exchange data records between tasks request a portion of direct memory [1], so I think it is possible to get the “Direct buffer memory” OOM errors in this scenario. Maybe you can try to increase

Broadcast state and OutOfMemoryError: Direct buffer memory

2022-10-21 Thread Dan Hill
Hi. My team recently added broadcast state to our Flink jobs. We've started hitting this OOM "Direct buffer memory" error. Is this a common problem with broadcast state? Or is it likely a different problem? Thanks! - Dan

Broadcast state restoration for BroadcastProcessFunction

2022-10-14 Thread Alexis Sarda-Espinosa
Hello, I wrote a test for a broadcast function to check how it handles broadcast state during retries [1] (the gist only shows a subset of the test in Kotlin, but it's hopefully understandable). The test will not pass unless my function also implements CheckpointedFunction, although

RE: Re:Question about Flink Broadcast State event ordering

2022-10-10 Thread Qing Lim
Thanks both for your advice, I will give them a try! From: Schwalbe Matthias Sent: 10 October 2022 08:35 To: 仙路尽头谁为峰 ; Qing Lim Cc: User Subject: RE: Re:Question about Flink Broadcast State event ordering Hi Qing again, Another point to consider: broadcast streams are subject to watermarking

RE: Re:Question about Flink Broadcast State event ordering

2022-10-10 Thread Schwalbe Matthias
the time 😊 ) Best regards Thias From: 仙路尽头谁为峰 Sent: Wednesday, October 5, 2022 10:13 AM To: Qing Lim Cc: User Subject: [SPAM] Re: Re:Question about Flink Broadcast State event ordering Hi Qing: The key point is that the broadcast

RE: Re:Question about Flink Broadcast State event ordering

2022-10-05 Thread Qing Lim
Oh, thank you for your explanation! From: 仙路尽头谁为峰 Sent: 05 October 2022 09:13 To: Qing Lim Cc: User Subject: Re: Re:Question about Flink Broadcast State event ordering Hi Qing: The key point is that the broadcast side may have different partitions that interleave. If you can make sure

Re: Re:Question about Flink Broadcast State event ordering

2022-10-05 Thread 仙路尽头谁为峰
To: xljtswf2022 Cc: User Subject: RE: Re:Question about Flink Broadcast State event ordering Hi, thanks for answering my question. Is there any way to make the order reflect the upstream? I wish to broadcast messages that have deletion semantics, so ordering matters here. I guess worst case I can

RE: Re:Question about Flink Broadcast State event ordering

2022-10-05 Thread Qing Lim
: 05 October 2022 03:02 To: Qing Lim Cc: User Subject: Re:Question about Flink Broadcast State event ordering Hi Qing: > I think this is referring to the order between broadcasted element and non > broadcasted element, right? No, as broadcast and nonbroadcast stream are different streams

Re:Question about Flink Broadcast State event ordering

2022-10-04 Thread xljtswf2022
//nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/#important-considerations, it says the following: > Order of events in Broadcast State may differ across tasks: Although > broadcasting the elements of a stream guarantees that all element

Question about Flink Broadcast State event ordering

2022-10-04 Thread Qing Lim
Hi Flink user group, I have a question around broadcast. Reading the docs https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/#important-considerations, it says the following: > Order of events in Broadcast State may differ across ta

RE: Loading broadcast state on BroadcastProcessFunction instantiation or open method

2022-09-28 Thread alfredo.vasquez.spglobal.com via user
Thank you, I have tried both approaches, Overriding open method did not work but by implementing CheckpointedFunction and overriding initializeState I was able to access and operate over broadcast state @Override public void initializeState(FunctionInitializationContext context) throws

RE: Loading broadcast state on BroadcastProcessFunction instantiation or open method

2022-09-27 Thread Schwalbe Matthias
, 2022 12:26 PM To: alfredo.vasq...@spglobal.com Cc: user@flink.apache.org Subject: Re: Loading broadcast state on BroadcastProcessFunction instantiation or open method Logically it would make sense to be able to initialize BroadcastState in

Re: Loading broadcast state on BroadcastProcessFunction instantiation or open method

2022-09-27 Thread David Anderson
state into the broadcast state. David On Mon, Sep 26, 2022 at 6:07 PM alfredo.vasquez.spglobal.com via user < user@flink.apache.org> wrote: > Hello community. > > > > Currently we have a BroadcastProcessFunction implementation that is > storing the broadcast state using

Loading broadcast state on BroadcastProcessFunction instantiation or open method

2022-09-26 Thread alfredo.vasquez.spglobal.com via user
Hello community. Currently we have a BroadcastProcessFunction implementation that is storing the broadcast state using a MapStateDescriptor. I have a use case that needs to load the BroadcastState to perform some operation before receiving any processElement or processBroadcastElement message

Re: Broadcast State + Stateful Operator + Async IO

2022-04-29 Thread Vishal Surana
Thanks a lot for your quick response! Your suggestion however would never work for our use case. Ours is a streaming system that must process 100 thousand messages per second and produce immediate results and it's simply impossible to rerun the job. Our job is a streaming job broken down into vari

Re: Broadcast State + Stateful Operator + Async IO

2022-04-29 Thread Guowei Ma
Hi Vishal, if your scenario is to update the data in full every time, one idea is to rerun the job every time. For example, you have an `EnrichWithDatabaseAndWebSerivce` job, which is responsible for reading all data from a data source every time, and then joins the data with DB and Web services. E

Re: Broadcast State + Stateful Operator + Async IO

2022-04-28 Thread Vishal Surana
Yes. You have explained my requirements exactly as they are. My operator will talk to multiple databases and a couple of web services to enrich incoming input streams. I cannot think of a way to use the async IO operator. So I thought maybe convert these 7-10 calls into async calls and chain the Fu

Re: Broadcast State + Stateful Operator + Async IO

2022-04-28 Thread Guowei Ma
Hi Vishal I want to understand your needs first. Your requirements are: After a stateful operator receives a notification, it needs to traverse all the data stored in the operator state, communicate with an external system during the traversal process (maybe similar to join?). In order to improve

Broadcast State + Stateful Operator + Async IO

2022-04-28 Thread Vishal Surana
Hello, My application has a stateful operator which leverages RocksDB to store a large amount of state. It, along with other operators receive configuration as a broadcast stream (KeyedBroadcastProcessFunction). The operator depends upon another input stream that triggers some communication with ex

Re: Broadcast state corrupted ?

2022-04-13 Thread Alexey Trenikhun
, 2022 6:44:05 PM To: Chesnay Schepler ; Flink User Mail List Subject: Re: Broadcast state corrupted ? * Failing environment is using MinIO. * We use s3p:// filesystem * I don’t see errors in the Job Manager log: {"timestamp":"2022-04-14T00:14:13.358Z","

Re: Broadcast state corrupted ?

2022-04-13 Thread Alexey Trenikhun
","thread_name":"jobmanager-future-thread-1","level":"INFO","level_value":2} * I tried cancel with savepoint and cancel from the UI. It doesn't seem to depend on shutdown; the log below is for a savepoint without shutdown. And I can't re

Re: Broadcast state corrupted ?

2022-04-13 Thread Chesnay Schepler
Thanks, Alexey *From:* Alexey Trenikhun *Sent:* Tuesday, April 12, 2022 7:10:17 AM *To:* Chesnay Schepler ; Flink User Mail List *Subject:* Re: Broadcast state corrupted ? I’ve tried to restore job in environment A (where we observe problem) from savepoint taken in environment B - restored

Re: Broadcast state corrupted ?

2022-04-13 Thread Alexey Trenikhun
Any suggestions how to troubleshoot the issue? I still can reproduce the problem in environment A Thanks, Alexey From: Alexey Trenikhun Sent: Tuesday, April 12, 2022 7:10:17 AM To: Chesnay Schepler ; Flink User Mail List Subject: Re: Broadcast state corrupted

Re: Broadcast state corrupted ?

2022-04-12 Thread Alexey Trenikhun
Schepler ; Flink User Mail List Subject: Re: Broadcast state corrupted ? I didn’t try the same savepoint across environments. The operator with broadcast state was added recently. I rolled back all environments, created savepoints with the old version, and upgraded to the version with broadcast state; all 4 were

Re: Broadcast state corrupted ?

2022-04-11 Thread Alexey Trenikhun
I didn’t try the same savepoint across environments. The operator with broadcast state was added recently. I rolled back all environments, created savepoints with the old version, and upgraded to the version with broadcast state; all 4 were upgraded fine. I then took savepoints in each environment and tried to restore

Re: Broadcast state corrupted ?

2022-04-11 Thread Chesnay Schepler
KeyedBroadcastProcessFunction with broadcast state MapStateDescriptor, where PbCfgTenantDictionary is a Protobuf type, for which we have a custom TypeInformation/TypeSerializer. In one of the environments, we can't restore the job from a savepoint because the state data seems to be corrupted. I've added logging to the TypeSerializer

Broadcast state corrupted ?

2022-04-10 Thread Alexey Trenikhun
Hello, We have a KeyedBroadcastProcessFunction with broadcast state MapStateDescriptor, where PbCfgTenantDictionary is a Protobuf type, for which we have a custom TypeInformation/TypeSerializer. In one of the environments, we can't restore the job from a savepoint because the state data seems to be corrupted. I

Re: UDF and Broadcast State Pattern

2021-12-15 Thread Krzysztof Chmielewski
ted are more low level process function type operations > (custom state registration, user access to timers, broadcast state as > you've discovered). There have been some discussions about how to add this > sort of functionality in a SQL compliant manner but nothing concrete. > >

Re: UDF and Broadcast State Pattern

2021-12-15 Thread Seth Wiesman
contain aggregates which do participate in checkpointing and are strongly consistent. What is not supported are more low level process function type operations (custom state registration, user access to timers, broadcast state as you've discovered). There have been some discussions about how t

Re: UDF and Broadcast State Pattern

2021-12-15 Thread Krzysztof Chmielewski
Thank you, yes, I was thinking about simply running my own thread in the UDF and consuming some queue, something like that. Having some background with the DataStream API, I was hoping that I could reuse the same mechanisms (like the Broadcast State Pattern or CoProcessFunction) in Flink SQL. However it seems there is a

Re: UDF and Broadcast State Pattern

2021-12-14 Thread Caizhi Weng
Hi! Currently you can't use broadcast state in a Flink SQL UDF because UDFs are all stateless. However, you mentioned that in your use case you want to control the logic in the UDF with some information. If that is the case, you can just run a thread in your UDF to read that information and chang

UDF and Broadcast State Pattern

2021-12-14 Thread Krzysztof Chmielewski
Hi, Is there a way to build a UDF [1] for Flink SQL that can be used with the Broadcast State Pattern [2]? I have a use case where I would like to be able to use a broadcast control stream to change logic in a UDF. Regards, Krzysztof Chmielewski [1] https://nightlies.apache.org/flink/flink-docs

Re: Order of events in Broadcast State

2021-12-06 Thread Alexey Trenikhun
Thank you David From: David Anderson Sent: Monday, December 6, 2021 1:36:20 AM To: Alexey Trenikhun Cc: Flink User Mail List Subject: Re: Order of events in Broadcast State Event ordering in Flink is only maintained between pairs of events that take exactly

Re: Order of events in Broadcast State

2021-12-06 Thread David Anderson
n Sat, Dec 4, 2021 at 2:45 AM Alexey Trenikhun wrote: > [1] - > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/ > The Broadcast State Pattern | Apache Flink > <https://nightlies.apache.org/flink/flink-docs-master/docs/

Re: Order of events in Broadcast State

2021-12-03 Thread Alexey Trenikhun
[1] - https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/ The Broadcast State Pattern | Apache Flink<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/> The Broadcast State Patter

Order of events in Broadcast State

2021-12-03 Thread Alexey Trenikhun
Hello, Trying to understand what the statement "Order of events in Broadcast State may differ across tasks" in [1] means. Let's say I have a keyed function "A" which broadcasts a stream of rules, and a KeyedBroadcastProcessFunction "B" receives the rules and updates broadcast

Re: Dynamic configuration via broadcast state

2021-04-09 Thread Arvid Heise
Hi Vishal, what you are trying to achieve is quite common and has its own documentation [1]. Currently, there is no way to hold back elements of the non-broadcast side (your question 2 in OP), so you have to save them until configuration arrives. If you have several configurable operators, you co

Re: Dynamic configuration via broadcast state

2021-04-06 Thread vishalovercome
I researched a bit more and another suggested solution is to build a custom source function that somehow waits for each operator to load its configuration, which is in fact set in the open method of the source itself. I'm not sure if that's a good idea as that just exposes entire job configuration t

Dynamic configuration via broadcast state

2021-04-06 Thread vishalovercome
I have to make my flink job dynamically configurable and I'm thinking about using broadcast state. My current static job configuration file consists of configuration of entire set of operators which I load into a case class and then I explicitly pass the relevant configuration of each operat

Re: Initializing broadcast state

2021-01-28 Thread Guowei Ma
to update >> the keyedstate. But there is no key in `processBroadcastElement` . >> Best, >> Guowei >> >> >> On Tue, Jan 26, 2021 at 6:31 AM Nick Bendtner wrote: >> >>> Hi Guowei, >>> I am not using a keyed broadcast function, I use [

Re: Initializing broadcast state

2021-01-27 Thread Wei Jiang
(dimensionInitStream).broadcast(descriptor) ... ``` then the main stream can connect to the broadcast state... I don't know why it works; what do you think about that? Guowei Ma wrote > Hi, Nick > You might need to handle it yourself If you have to process an element > only after you get the

Re: Initializing broadcast state

2021-01-26 Thread Jaffe, Julian
:31 PM To: Guowei Ma Cc: user Subject: Re: Initializing broadcast state Thanks a lot Guowei, that makes sense. I will go with the caching approach. Can you point me to any example which shows what is the most efficient way to cache elements. Thanks a ton for your help. Best, Nick On Mon, Jan

Re: Initializing broadcast state

2021-01-26 Thread Nick Bendtner
Hi Guowei, >> I am not using a keyed broadcast function, I use [1]. My question is, >> can a non broadcast state, for instance value state/map state be updated >> whenever I get a broadcast event in *processBroadcastElement*. This way >> the state updates are consistent since

Re: Initializing broadcast state

2021-01-25 Thread Guowei Ma
am not using a keyed broadcast function, I use [1]. My question is, can > a non broadcast state, for instance value state/map state be updated > whenever I get a broadcast event in *processBroadcastElement*. This way > the state updates are consistent since each instance of the task

Re: Initializing broadcast state

2021-01-25 Thread Nick Bendtner
Hi Guowei, I am not using a keyed broadcast function, I use [1]. My question is, can a non broadcast state, for instance value state/map state be updated whenever I get a broadcast event in *processBroadcastElement*. This way the state updates are consistent since each instance of the task gets

Re: Initializing broadcast state

2021-01-24 Thread Guowei Ma
ote: > Thanks Guowei. Another question I have is, what is the use of a broadcast > state when I can update a map state or value state inside of the process > broadcast element method and use that state to do a lookup in the process > element method like this example > > htt

Re: Initializing broadcast state

2021-01-24 Thread Nick Bendtner
Thanks Guowei. Another question I have is, what is the use of a broadcast state when I can update a map state or value state inside of the process broadcast element method and use that state to do a lookup in the process element method like this example https://stackoverflow.com/questions/58307154

Re: Initializing broadcast state

2021-01-24 Thread Guowei Ma
Hi, Nick You might need to handle it yourself if you have to process an element only after you get the broadcast state. For example, you could “cache” the element in state and handle it when the elements from the broadcast side have arrived. Especially if you are using the
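
The caching idea in this reply can be modeled in a few lines. The sketch below is plain Java with no Flink dependency: the `HashMap` stands in for broadcast state and the `ArrayList` for whatever buffer state a real job would use, and every class and method name is hypothetical. It ignores the restrictions a real Flink function has on which state each method may access, so treat it as an illustration of the control flow only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model (not Flink code) of buffering main-stream elements until broadcast data arrives. */
final class BufferUntilBroadcastSketch {
    private final Map<String, String> broadcastState = new HashMap<>(); // stand-in for broadcast state
    private final List<String> pending = new ArrayList<>();             // stand-in for a buffer state
    private final List<String> output = new ArrayList<>();

    /** Main-stream side: emit if lookup data is present, otherwise cache the element. */
    void processElement(String key) {
        if (broadcastState.isEmpty()) {
            pending.add(key);
        } else {
            emit(key);
        }
    }

    /** Broadcast side: store the entry, then drain everything that was cached, in arrival order. */
    void processBroadcastElement(String key, String value) {
        broadcastState.put(key, value);
        for (String buffered : pending) {
            emit(buffered);
        }
        pending.clear();
    }

    private void emit(String key) {
        // Keys without a broadcast entry are marked "unknown" in this sketch.
        output.add(key + "=" + broadcastState.getOrDefault(key, "unknown"));
    }

    List<String> output() {
        return output;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        BufferUntilBroadcastSketch function = new BufferUntilBroadcastSketch();
        function.processElement("a");                // buffered: no broadcast data yet
        function.processBroadcastElement("a", "1");  // drains the buffer
        function.processElement("a");                // emitted immediately
        System.out.println(function.output());
    }
}
```

This model drains the buffer on the first broadcast element; a real job would need its own rule for deciding when the broadcast side is complete enough to start emitting.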

Initializing broadcast state

2021-01-24 Thread Nick Bendtner
Hi guys, What is the way to initialize broadcast state (say, with default values) before the first element shows up in the broadcasting stream? I do a lookup on the broadcast state to process transactions which come from another stream. The problem is the broadcast state is empty until the first

Re: Latency tracking together with broadcast state can cause job failure

2020-06-16 Thread Arvid Heise
owner would take a look > at this problem. > > Really thank you for reporting this bug. > > [1] https://issues.apache.org/jira/browse/FLINK-17322 > > Best > Yun Tang > -- > *From:* Yun Tang > *Sent:* Wednesday, April 22, 2020 1:43 > *T

Re: Broadcast state vs data enrichment

2020-05-13 Thread Manas Kale
I see, thank you Roman! On Tue, May 12, 2020 at 4:59 PM Khachatryan Roman < khachatryan.ro...@gmail.com> wrote: > Thanks for the clarification. > > Apparently, the second option (with enricher) creates more load by adding > configuration to every event. Unless events are much bigger than the > co

Re: Broadcast state vs data enrichment

2020-05-12 Thread Khachatryan Roman
Thanks for the clarification. Apparently, the second option (with enricher) creates more load by adding configuration to every event. Unless events are much bigger than the configuration, this will significantly increase network, memory, and CPU usage. Btw, I think you don't need a broadcast in th

Re: Broadcast state vs data enrichment

2020-05-11 Thread Manas Kale
Sure. Apologies for not making this clear enough. > each operator only stores what it needs. Lets imagine this setup : BROADCAST STREAM config-stream | |

Re: Broadcast state vs data enrichment

2020-05-11 Thread Khachatryan Roman
Hi Manas, The approaches you described look the same: > each operator only stores what it needs. > each downstream operator will "strip off" the config parameter that it needs. Can you please explain the difference? Regards, Roman On Mon, May 11, 2020 at 8:07 AM Manas Kale wrote: > Hi, > I

Broadcast state vs data enrichment

2020-05-10 Thread Manas Kale
Hi, I have a single broadcast message that contains configuration data consumed by different operators. For eg: config = { "config1" : 1, "config2" : 2, "config3" : 3 } Operator 1 will consume config1 only, operator 2 will consume config2 only etc. - Right now in my implementation the config
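
The "each operator consumes only its own key" setup from this message can be modeled as follows. This is a plain-Java sketch with no Flink dependency; the `Operator` class and its methods are hypothetical, and only the config keys and values come from the message.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model (not Flink code) of operators that each keep one entry of a broadcast config. */
final class ConfigSliceSketch {

    /** Hypothetical operator that stores only the config entry it needs. */
    static final class Operator {
        private final String wantedKey;
        private Integer value; // null until a broadcast containing wantedKey arrives

        Operator(String wantedKey) {
            this.wantedKey = wantedKey;
        }

        /** Called for every broadcast config message; all other keys are ignored. */
        void onBroadcast(Map<String, Integer> config) {
            Integer candidate = config.get(wantedKey);
            if (candidate != null) {
                value = candidate;
            }
        }

        Integer value() {
            return value;
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> config = new HashMap<>();
        config.put("config1", 1);
        config.put("config2", 2);
        config.put("config3", 3);

        Operator operator1 = new Operator("config1");
        Operator operator2 = new Operator("config2");
        operator1.onBroadcast(config);
        operator2.onBroadcast(config);
        System.out.println(operator1.value() + ", " + operator2.value());
    }
}
```

Each operator still receives the whole message in this model; only what it retains is narrowed.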

Re: Latency tracking together with broadcast state can cause job failure

2020-04-22 Thread Lasse Nedergaard
k at this problem. > > Really thank you for reporting this bug. > > [1] https://issues.apache.org/jira/browse/FLINK-17322 > > Best > Yun Tang > From: Yun Tang > Sent: Wednesday, April 22, 2020 1:43 > To: Lasse Nedergaard > Cc: user > Subject: Re: Latency tracki

Re: Latency tracking together with broadcast state can cause job failure

2020-04-22 Thread Yun Tang
Nedergaard Cc: user Subject: Re: Latency tracking together with broadcast state can cause job failure Hi Lasse Really sorry for missing your reply. I'll run your project and find the root cause in my day time. And thanks for @Robert Metzger<mailto:rmetz...@apache.org> 's kind r

Re: Latency tracking together with broadcast state can cause job failure

2020-04-21 Thread Yun Tang
1, 2020 20:01 To: Lasse Nedergaard Cc: Yun Tang ; user Subject: Re: Latency tracking together with broadcast state can cause job failure Hey Lasse, has the problem been resolved? (I'm also responding to this to make sure the thread gets attention again :) ) Best, Robert On Wed, Apr 1, 2

Re: Latency tracking together with broadcast state can cause job failure

2020-04-21 Thread Robert Metzger
aard > *Sent:* Tuesday, March 31, 2020 19:10 > *To:* user > *Subject:* Latency tracking together with broadcast state can cause job > failure > > Hi > > We have in both Flink 1.9.2 and 1.10 struggled with random deserialize and > Index out of range exception in one of our j

Re: Latency tracking together with broadcast state can cause job failure

2020-04-01 Thread Lasse Nedergaard
Nedergaard Sent: Tuesday, March 31, 2020 19:10 To: user Subject: Latency tracking together with broadcast state can cause job failure Hi We have in both Flink 1.9.2 and 1.10 struggled with random deserialize and Index out of range exceptions in one of our jobs. We also get out of memory exceptions

Re: Latency tracking together with broadcast state can cause job failure

2020-04-01 Thread Yun Tang
Subject: Latency tracking together with broadcast state can cause job failure Hi We have in both Flink 1.9.2 and 1.10 struggled with random deserialize and Index out of range exceptions in one of our jobs. We also get out of memory exceptions. We have now identified latency tracking together with

Latency tracking together with broadcast state can cause job failure

2020-03-31 Thread Lasse Nedergaard
Hi We have in both Flink 1.9.2 and 1.10 struggled with random deserialize and Index out of range exceptions in one of our jobs. We also get out of memory exceptions. We have now identified latency tracking together with broadcast state as causing the problem. When we do integration testing

Re: [State Processor API] how to convert savepoint back to broadcast state

2020-01-27 Thread Jin Yi
id("*your-uid*"); > > Make sure the uid and the state-name are the same with those in your > savepoint, the CoBroadcastWithKeyedOperator would initialize the broadcast > state when opening. [3] > > > [1] > https://flink.apache.org/feature/2019/09/13/state-processor-api

Re: [State Processor API] how to convert savepoint back to broadcast state

2020-01-27 Thread Yun Tang
ule itself. MapStateDescriptor stateDescriptor = new MapStateDescriptor<>( "RulesBroadcastState", BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(new TypeHint() {})); // broadcast the rules and create the broad

Re: [State Processor API] how to convert savepoint back to broadcast state

2020-01-26 Thread Jin Yi
Hi Yun, After searching around in the documentation, I tried a class that extends BroadcastProcessFunction and implements CheckpointedFunction. I initialized the broadcast state in the public void initializeState(FunctionInitializationContext context) method, and it seems to work fine. Here is the doc I followed

Re: [State Processor API] how to convert savepoint back to broadcast state

2020-01-26 Thread Jin Yi
Hi Yun, Thanks for the response. I have checked the official documentation, and I referred to this example to write the broadcast state to a savepoint. My question is: I can use the State Processor API to read the savepoint back into a DataSet, but how can I use the DataSet as the initial value for the

Re: [State Processor API] how to convert savepoint back to broadcast state

2020-01-26 Thread Yun Tang
Hi Yi, Does the official doc on writing broadcast state [1] satisfy your request? [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html#broadcast-state-1 Best Yun Tang From: Jin Yi Sent: Thursday, January 23, 2020 8:12

[State Processor API] how to convert savepoint back to broadcast state

2020-01-22 Thread Jin Yi
Hi there, I would like to read the savepoints (for broadcast state) back into the broadcast state, how should I do it? // load the existingSavepoint; ExistingSavepoint existingSavepoint = Savepoint.load(environment, "file:///tmp/new_savepoints", new MemoryStateBackend()); // read

Re: Restore metrics on broadcast state after restart

2019-12-19 Thread Gaël Renoux
ate() could visit your > restored broadcast state. > > [1] > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#checkpointedfunction > > Best > Yun Tang > > -- > *From:* Gaël Renoux > *Sent:* Tuesday, December 1

Re: Restore metrics on broadcast state after restart

2019-12-18 Thread Yun Tang
Hi Gaël You can try initializeState [1] to initialize your metrics values from states when restoring from a checkpoint. context.getOperatorStateStore().getBroadcastState() could visit your restored broadcast state. [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state

Re: Thread access and broadcast state initialization in BroadcastProcessFunction

2019-12-17 Thread KristoffSC
Thank you for your reply, Timo. Regarding point 2, I'm sorry for the delay. I reran my test and everything seems to be in order. The open method was called first. I guess it was a false alarm. Sorry for that. Regards, Krzysztof

Restore metrics on broadcast state after restart

2019-12-17 Thread Gaël Renoux
Hi everyone, I have a KeyedBroadcastProcessFunction with a broadcast state (a bunch of rules), and I have set up a few gauge metrics on that state (things such as the number of known rules and the timestamp of the last rule received). However, I have an issue when the server restarts from a checkpoint

Re: Thread access and broadcast state initialization in BroadcastProcessFunction

2019-12-11 Thread Timo Walther
"store/process" this broadcast element in our broadcast state. @Override public void open(Configuration parameters) throws Exception { super.open(parameters); processingRulesDesc = new MapStateDescriptor<>( "RulesBroadcastState",

Re: Processing Events by custom rules kept in Broadcast State

2019-12-11 Thread Timo Walther
es such as Jackson's ObjectNode in there. Putting a JSON string or an object like your RuleParams into state depends on the performance. If the JSON format changes frequently, it might be better to just store a string there. But reparsing might be expensive too so keeping the transient variable

Re: Thread access and broadcast state initialization in BroadcastProcessFunction

2019-12-11 Thread KristoffSC
descriptor which would be initialized in open method. So we actually cannot "store/process" this broadcast element in our broadcast state. @Override public void open(Configuration parameters) throws Exception { super.open(parameters); processingRulesDesc = new MapState

Re: Processing Events by custom rules kept in Broadcast State

2019-12-10 Thread vino yang
Hi KristoffSC, It seems the main differences are when to parse your rules and what could be put into the broadcast state. IMO, multiple solutions all can take effect. I prefer option 3. I'd like to parse the rules ASAP and let them be real rule event stream (not ruleset stream) in the s

Re: Thread access and broadcast state initialization in BroadcastProcessFunction

2019-12-10 Thread vino yang
://mail-archives.apache.org/mod_mbox/flink-user/201911.mbox/%3CCAArWwf4jmbaFeizO_YBZVBAMyiuvV95DetoVCkj4rJi4PYorpQ%40mail.gmail.com%3E [2]: https://stackoverflow.com/questions/54748158/how-could-flink-broadcast-state-be-initialized On Wed, Dec 11, 2019 at 5:56 AM, KristoffSC wrote: > Hi, > I was playing ar

Processing Events by custom rules kept in Broadcast State

2019-12-10 Thread KristoffSC
Hi, I think this would be the very basic use case for the Broadcast State Pattern, but I would like to know what the best approaches to solve this problem are. I have an operator that extends BroadcastProcessFunction. The broadcast element is sent as a JSON-format message by Kafka. It describes

Thread access and broadcast state initialization in BroadcastProcessFunction

2019-12-10 Thread KristoffSC
.process(new DriveEngineRuleOperator()) .name("Drive Rule Evaluation"); Where DriveEngineRuleOperator extends BroadcastProcessFunction and implements open, processElement and processBroadcastElement methods. I was following Flink's tutorials about broadcast stat

Re: Initialization of broadcast state before processing main stream

2019-11-15 Thread Vasily Melnik
is helps, > Maxim. > > On Thu, Nov 14, 2019 at 7:42 AM vino yang wrote: > >> Hi Vasily, >> >> Currently, Flink did not do the coordination between a general stream and >> broadcast stream, they are both streams. Your scene of using the broadcast >> state is a spe

Re: Initialization of broadcast state before processing main stream

2019-11-14 Thread Maxim Parkachov
ared lock in zookeeper. Hope this helps, Maxim. On Thu, Nov 14, 2019 at 7:42 AM vino yang wrote: > Hi Vasily, > > Currently, Flink did not do the coordination between a general stream and > broadcast stream, they are both streams. Your scene of using the broadcast > state is a spec

Re: Initialization of broadcast state before processing main stream

2019-11-13 Thread vino yang
Hi Vasily, Currently, Flink did not do the coordination between a general stream and broadcast stream, they are both streams. Your scene of using the broadcast state is a special one. In a more general scene, the states need to be broadcasted is an unbounded stream, the state events may be

Initialization of broadcast state before processing main stream

2019-11-13 Thread Vasily Melnik
Hi all. In our task we have two Kafka topics: - one with fact stream (web traffic) - one with dimension We would like to put dimension data into broadcast state and do lookups on it with facts. But we see that not all dimension records are put into state before the first fact record is processed, so

Ordered events in broadcast state

2019-11-04 Thread Filip Niksic
Hi all, The documentation for the broadcast state explicitly says that the order of broadcast events may differ across tasks, so the state updates should not depend on a particular order. [1] But what to do in the use cases where the order matters? Is there some way to enforce the order even at

Re: Broadcast state

2019-10-19 Thread Congxian Qiu
cate >> things. >> >> Best, >> Congxian >> >> >> On Wed, Oct 2, 2019 at 5:30 PM Fabian Hueske wrote: >> >>> Hi, >>> >>> State is always associated with a single task in Flink. >>> The state of a task cannot be accessed by other

Re: Broadcast state

2019-10-17 Thread Navneeth Krishnan
f the same >> operator or tasks of other operators. >> This is true for every type of state, including broadcast state. >> >> Best, Fabian >> >> >> On Tue, Oct 1, 2019 at 08:22, Navneeth Krishnan < >> reachnavnee...@gmail.com> wrote: >>

Re: Broadcast state

2019-10-09 Thread Congxian Qiu
te of a task cannot be accessed by other tasks of the same operator > or tasks of other operators. > This is true for every type of state, including broadcast state. > > Best, Fabian > > > On Tue, Oct 1, 2019 at 08:22, Navneeth Krishnan < > reachnavnee...@gma

Re: Broadcast state

2019-10-02 Thread Fabian Hueske
Hi, State is always associated with a single task in Flink. The state of a task cannot be accessed by other tasks of the same operator or tasks of other operators. This is true for every type of state, including broadcast state. Best, Fabian On Tue, Oct 1, 2019 at 08:22, Navneeth

Re: Broadcast state

2019-09-30 Thread Navneeth Krishnan
Hi, I can use Redis but I’m still having a hard time figuring out how I can eliminate duplicate data. Today, without broadcast state in 1.4, I’m using a cache to lazy-load the data. I thought the broadcast state would be similar to that of Kafka Streams, where I have read access to the state across the

Re: Broadcast state

2019-09-30 Thread Congxian Qiu
asting memory. In my case there will around > million entries which needs to be used by at least two operators for now. > > Thanks > > On Mon, Sep 30, 2019 at 5:42 PM Oytun Tez wrote: > >> This is how we currently use broadcast state. Our states are re-usable >> (code

Re: Broadcast state

2019-09-30 Thread Navneeth Krishnan
we currently use broadcast state. Our states are re-usable > (code-wise), every operator that wants to consume basically keeps the same > descriptor state locally by processBroadcastElement'ing into a local state. > > I am open to suggestions. I see this as a hard drawback of dataf

Re: Broadcast state

2019-09-30 Thread Oytun Tez
This is how we currently use broadcast state. Our states are re-usable (code-wise), every operator that wants to consume basically keeps the same descriptor state locally by processBroadcastElement'ing into a local state. I am open to suggestions. I see this as a hard drawback of dat

Re: Broadcast state

2019-09-30 Thread Oytun Tez
rld's Fastest Human Translation Platform. oy...@motaword.com — www.motaword.com On Mon, Sep 30, 2019 at 8:29 PM Navneeth Krishnan wrote: > Hi All, > > Is it possible to access a broadcast state across the pipeline? For > example, say I have a KeyedBroadcastProcessFunction which ad
