I am trying to use the State Processor API. Does that require HDFS, or will any filesystem work?
I wish there was a complete example that ties together the DataSet and
DataStream APIs and the State Processor API. So far I have not been able
to get it to work. Does anybody know where I can find examples of these
types of techniques? (Sketches of each approach suggested in this thread
are included below.)

On Wed, Aug 5, 2020 at 3:52 AM Theo Diefenthal <
theo.diefent...@scoop-software.de> wrote:

> Hi Marco,
>
> In general, I see three solutions you could take here:
>
> 1. Use the State Processor API: You can run a program with the State
> Processor API that loads the data from JDBC and stores it into a Flink
> savepoint. Afterwards, you start your streaming job from that savepoint;
> it will load its state and find all the JDBC data already stored within.
> 2. Load from the master, distribute with the job: When you build up your
> job graph, you could execute the JDBC queries and put the result into
> some Serializable class, which in turn you plug into an operator in your
> stream (e.g. a map stage). The class, along with all the queried data,
> will be serialized and deserialized on the TaskManagers. (Usually I use
> this for configuration parameters, but it might be OK in this case as
> well.)
> 3. Load from the TaskManager: In your map function, when the very first
> event is received, you can block processing and synchronously load the
> data from JDBC (so each TaskManager performs the JDBC query itself). You
> then keep the data for use in all subsequent map steps.
>
> I think option 3 is the easiest to implement, while option 1 might be
> the most elegant, in my opinion.
>
> Best regards
> Theo
>
> ------------------------------
> *From: *"Marco Villalobos" <mvillalo...@kineteque.com>
> *To: *"Leonard Xu" <xbjt...@gmail.com>
> *CC: *"user" <user@flink.apache.org>
> *Sent: *Wednesday, August 5, 2020 04:33:23
> *Subject: *Re: Two Queries and a Kafka Topic
>
> Hi Leonard,
>
> First, thank you.
>
> I am currently trying to restrict my solution to Apache Flink 1.10
> because it's the current version supported by Amazon EMR. I am not ready
> to change our operational environment to solve this.
>
> Second, I am using the DataStream API. The Kafka topic is not in a
> table; it is in a DataStream.
>
> The SQL queries run against a PostgreSQL database, and they only need to
> run exactly once in the lifetime of the job.
>
> I am struggling to determine where in the job this should happen.
>
> JDBCInputFormat seems to query the SQL table repeatedly, and connecting
> streams and aggregating them into one object is very complicated.
>
> Thus, I am wondering what the right approach is.
>
> Let me restate the parameters.
>
> SQL Query One = data in PostgreSQL (200K records) that is used for
> business logic.
> SQL Query Two = data in PostgreSQL (1000 records) that is used for
> business logic.
> Kafka Topic One = an unbounded stream that uses the DataStream API and
> the query results above to write into multiple sinks.
>
> ASCII diagram:
>
> [SQL Query One] ----> [Aggregate to Map]
>
> Kafka ----> [Kafka Topic One] --- [Keyed Process Function (Query One
> Map, Query Two Map)] ---> [Multiple Sinks]
>
> [SQL Query Two] ----> [Aggregate to Map]
>
> Maybe my graph above helps. You see, I need Query One and Query Two to
> execute only once. After that, the information they provide is used to
> correctly process the Kafka topic.
>
> I'll dig deeper to try to understand what you said, thank you, but
> JDBCInputFormat seems to query the database repeatedly. Maybe I need to
> write a RichFunction or an AsyncIO function and cache the results in
> state after that.
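A minimal sketch of that last idea, which is also Theo's option 3: a
RichMapFunction that loads the lookup data once in open(), which runs on
each parallel instance before the first event arrives, so there is no need
to block on the first event. The connection string, credentials, and the
thresholds table are illustrative placeholders:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Each parallel instance loads the lookup data once, before processing
// any events, and keeps it on the heap for all subsequent map calls.
public class EnrichFromJdbc extends RichMapFunction<String, String> {

    private transient Map<String, Double> thresholds;

    @Override
    public void open(Configuration parameters) throws Exception {
        thresholds = new HashMap<>();
        // Hypothetical connection string and table; replace with your own.
        try (Connection conn = DriverManager.getConnection(
                 "jdbc:postgresql://db-host:5432/mydb", "user", "secret");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                 "SELECT tag, threshold FROM thresholds")) {
            while (rs.next()) {
                thresholds.put(rs.getString("tag"), rs.getDouble("threshold"));
            }
        }
    }

    @Override
    public String map(String tag) {
        return tag + " -> " + thresholds.get(tag);
    }
}

Each parallel instance runs the query once at startup, so a parallelism of
N means N concurrent queries against PostgreSQL; for result sets of this
size that is usually acceptable.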
> > On Aug 4, 2020, at 6:25 PM, Leonard Xu <xbjt...@gmail.com> wrote:
> >
> > Hi, Marco
> >
> > > If I need SQL Query One and SQL Query Two to happen just one time,
> >
> > Looks like you want to reuse this Kafka table in one job. Executing
> > multiple queries in one SQL job is supported in Flink 1.11. You can
> > use `StatementSet`[1] to add SQL Query One and SQL Query Two to a
> > single SQL job.
> >
> > Best
> > Leonard
> > [1]
> > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#run-an-insert-statement
> >
> > On Aug 5, 2020, at 04:34, Marco Villalobos <mvillalo...@kineteque.com>
> > wrote:
> >
> > Let's say that I have:
> >
> > SQL Query One from data in PostgreSQL (200K records),
> > SQL Query Two from data in PostgreSQL (1000 records),
> > and Kafka Topic One.
> >
> > Let's also say that the main data for this Flink job arrives in Kafka
> > Topic One.
> >
> > If I need SQL Query One and SQL Query Two to happen just one time,
> > when the job starts up, and afterwards maybe store the results in
> > Keyed State or Broadcast State, since they are not really part of the
> > stream, then what is the best practice for supporting that in Flink?
> >
> > The Flink job needs to stream data from Kafka Topic One, aggregate it,
> > and perform computations that require all of the data from SQL Query
> > One and SQL Query Two for its business logic.
> >
> > I am using Flink 1.10.
> >
> > Am I supposed to query the database before the job is submitted, and
> > then pass the results as parameters to a function? Or am I supposed to
> > use JDBCInputFormat for both queries, create two streams, and somehow
> > connect or broadcast both of them to the main stream that uses Kafka
> > Topic One?
> >
> > I would appreciate guidance. Please. Thank you.
> >
> > Sincerely,
> >
> > Marco A. Villalobos
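On the State Processor API question at the top of the thread: the
savepoint path goes through Flink's FileSystem abstraction, so HDFS is not
required; any supported scheme (such as file://, hdfs://, or s3://) works,
as long as the path is reachable by the processes reading and writing it.
Below is a minimal sketch of Theo's option 1 against the Flink 1.10 State
Processor API. The operator uid ("thresholds"), the state name, and the
hard-coded rows (which would really come from JDBCInputFormat) are
illustrative assumptions:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;

public class BootstrapLookupState {

    // Writes each (tag, threshold) row into keyed ValueState.
    public static class ThresholdBootstrapper
            extends KeyedStateBootstrapFunction<String, Tuple2<String, Double>> {

        private transient ValueState<Double> threshold;

        @Override
        public void open(Configuration parameters) {
            threshold = getRuntimeContext().getState(
                new ValueStateDescriptor<>("threshold", Double.class));
        }

        @Override
        public void processElement(Tuple2<String, Double> row, Context ctx)
                throws Exception {
            threshold.update(row.f1);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Hard-coded here to keep the sketch self-contained; in the real
        // job this DataSet would come from JDBCInputFormat.
        DataSet<Tuple2<String, Double>> rows = env.fromElements(
            Tuple2.of("tag-a", 1.5), Tuple2.of("tag-b", 2.5));

        BootstrapTransformation<Tuple2<String, Double>> transformation =
            OperatorTransformation
                .bootstrapWith(rows)
                .keyBy(new KeySelector<Tuple2<String, Double>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Double> row) {
                        return row.f0;
                    }
                })
                .transform(new ThresholdBootstrapper());

        Savepoint
            .create(new MemoryStateBackend(), 128)  // 128 = max parallelism
            // Must match the uid() of the keyed operator in the streaming
            // job that will later be started from this savepoint.
            .withOperator("thresholds", transformation)
            // Any Flink-supported filesystem URI works, not just hdfs://.
            .write("file:///tmp/bootstrap-savepoint");

        env.execute("bootstrap lookup state");
    }
}

The streaming job would then give its keyed operator the same uid
("thresholds") and a matching ValueStateDescriptor, and be started from
the written savepoint (e.g. flink run -s file:///tmp/bootstrap-savepoint
job.jar), after which the bootstrapped values are readable as regular
keyed state.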
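A sketch of Theo's option 2: execute the queries on the client while the
job graph is built, and ship the result to the workers inside a
serializable function. Here loadThresholdsViaJdbc() is a hypothetical
stand-in for the two PostgreSQL queries, and a socket source stands in for
the Kafka consumer:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ShipWithJobGraph {

    public static class Enrich implements MapFunction<String, String> {
        // Serialized with the function instance and shipped to every
        // TaskManager (HashMap is Serializable).
        private final HashMap<String, Double> thresholds;

        public Enrich(Map<String, Double> thresholds) {
            this.thresholds = new HashMap<>(thresholds);
        }

        @Override
        public String map(String tag) {
            return tag + " -> " + thresholds.get(tag);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // Runs on the client while the job graph is built, before
        // submission, so the query executes exactly once.
        Map<String, Double> thresholds = loadThresholdsViaJdbc();

        // Socket source used purely as a stand-in for the Kafka consumer.
        DataStream<String> tags = env.socketTextStream("localhost", 9999);
        tags.map(new Enrich(thresholds)).print();

        env.execute("ship lookup data with the job graph");
    }

    private static Map<String, Double> loadThresholdsViaJdbc() {
        Map<String, Double> m = new HashMap<>();
        m.put("tag-a", 1.5); // placeholder; real code would query PostgreSQL
        return m;
    }
}

The caveat is that the map is serialized into the submitted job, so very
large query results inflate the job graph and can hit RPC message-size
limits; as Theo notes, this pattern is most comfortable for
configuration-sized data.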
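Finally, a sketch of Leonard's `StatementSet` suggestion, which needs
Flink 1.11 rather than 1.10. The table names and the datagen/print
connectors are placeholders for the real Kafka source and sinks:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class MultiInsertJob {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inStreamingMode().build());

        // Placeholder tables; the real job would declare a Kafka source
        // and the actual sinks here instead.
        tEnv.executeSql("CREATE TABLE kafka_topic_one (tag STRING, val DOUBLE) "
            + "WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE TABLE sink_one (tag STRING, val DOUBLE) "
            + "WITH ('connector' = 'print')");
        tEnv.executeSql("CREATE TABLE sink_two (tag STRING) "
            + "WITH ('connector' = 'print')");

        // Both INSERTs are optimized and submitted as one job, so the
        // source table is consumed once and feeds both queries.
        StatementSet set = tEnv.createStatementSet();
        set.addInsertSql(
            "INSERT INTO sink_one SELECT tag, val FROM kafka_topic_one");
        set.addInsertSql(
            "INSERT INTO sink_two SELECT tag FROM kafka_topic_one WHERE val > 0");
        set.execute();
    }
}

Because both INSERT statements go into one StatementSet, Kafka Topic One
would be read once and shared by both pipelines, which is the reuse
Leonard describes.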