I am trying to use the State Processor API.  Does it require HDFS or some
other distributed filesystem?

I wish there were a complete example that ties together the DataSet API,
the DataStream API, and the State Processor API.

So far I have not been able to get it to work.

Does anybody know where I can find examples of these types of techniques?



On Wed, Aug 5, 2020 at 3:52 AM Theo Diefenthal <
theo.diefent...@scoop-software.de> wrote:

> Hi Marco,
>
> In general, I see three solutions you could approach here:
>
> 1. Use the State Processor API: You can run a program with the State
> Processor API that loads the data from JDBC and stores it into a Flink
> savepoint. Afterwards, you start your streaming job from that savepoint,
> and it will find all the data from JDBC already stored in its state (see
> the second sketch below).
> 2. Load from master, distribute with the job: When you build up your
> job graph, you could execute the JDBC queries and put the result into some
> Serializable class, which in turn you plug into an operator in your stream
> (e.g. a map stage). The class, along with all the queried data, will be
> serialized and deserialized on the TaskManagers. (Usually I use this for
> configuration parameters, but it might be OK in this case as well.)
> 3. Load from TaskManager: In your map function, when the very first event
> is received, you can block processing and synchronously load the data from
> JDBC (so each TaskManager performs the JDBC query itself). You then keep
> the data to be used for all subsequent map steps (a sketch follows right
> after this list).
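>
> A minimal sketch of option 3, assuming Flink 1.10, String keys, and a
> hypothetical PostgreSQL lookup_table (the connection details are
> placeholders; the load could also go into open(), but here it is deferred
> to the first event as described above):
>
> import java.sql.Connection;
> import java.sql.DriverManager;
> import java.sql.ResultSet;
> import java.sql.Statement;
> import java.util.HashMap;
> import java.util.Map;
>
> import org.apache.flink.api.common.functions.RichMapFunction;
>
> public class JdbcLookupMap extends RichMapFunction<String, String> {
>
>     private transient Map<String, String> lookup; // null until the first event
>
>     @Override
>     public String map(String key) throws Exception {
>         if (lookup == null) {
>             // Blocks exactly once per parallel task, then serves all later events.
>             lookup = loadFromJdbc();
>         }
>         return key + "," + lookup.getOrDefault(key, "unknown");
>     }
>
>     private static Map<String, String> loadFromJdbc() throws Exception {
>         Map<String, String> result = new HashMap<>();
>         try (Connection conn = DriverManager.getConnection(
>                      "jdbc:postgresql://host:5432/db", "user", "password");
>              Statement stmt = conn.createStatement();
>              ResultSet rs = stmt.executeQuery("SELECT k, v FROM lookup_table")) {
>             while (rs.next()) {
>                 result.put(rs.getString("k"), rs.getString("v"));
>             }
>         }
>         return result;
>     }
> }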
>
> I think option 3 is the easiest to implement, while option 1 might be the
> most elegant way in my opinion.
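>
> For option 1, a hedged sketch of a bootstrap program using the State
> Processor API (the flink-state-processor-api dependency); the operator uid
> "enrichment", the max parallelism of 128, and the target path are
> assumptions. The savepoint path only has to be on a filesystem every node
> can reach: a local file:// URI works on a single machine, while a cluster
> would typically use HDFS or S3.
>
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.runtime.state.memory.MemoryStateBackend;
> import org.apache.flink.state.api.BootstrapTransformation;
> import org.apache.flink.state.api.OperatorTransformation;
> import org.apache.flink.state.api.Savepoint;
> import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
>
> public class BootstrapSavepoint {
>
>     public static void main(String[] args) throws Exception {
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
>         // Stand-in for the JDBC result; in practice, run the queries here
>         // (plain JDBC or JDBCInputFormat) and turn the rows into a DataSet.
>         DataSet<Tuple2<String, String>> rows = env.fromElements(
>                 Tuple2.of("key-1", "value-1"),
>                 Tuple2.of("key-2", "value-2"));
>
>         BootstrapTransformation<Tuple2<String, String>> transformation =
>                 OperatorTransformation
>                         .bootstrapWith(rows)
>                         .keyBy(new KeySelector<Tuple2<String, String>, String>() {
>                             @Override
>                             public String getKey(Tuple2<String, String> row) {
>                                 return row.f0;
>                             }
>                         })
>                         .transform(new LookupBootstrapper());
>
>         // The state backend should match what the streaming job uses.
>         Savepoint.create(new MemoryStateBackend(), 128)
>                 .withOperator("enrichment", transformation)
>                 .write("file:///tmp/bootstrap-savepoint");
>
>         env.execute("bootstrap savepoint");
>     }
>
>     public static class LookupBootstrapper
>             extends KeyedStateBootstrapFunction<String, Tuple2<String, String>> {
>
>         private transient ValueState<String> value;
>
>         @Override
>         public void open(Configuration parameters) {
>             value = getRuntimeContext().getState(
>                     new ValueStateDescriptor<>("lookup", String.class));
>         }
>
>         @Override
>         public void processElement(Tuple2<String, String> row, Context ctx)
>                 throws Exception {
>             value.update(row.f1);
>         }
>     }
> }
>
> The streaming job would then declare the same "lookup" ValueState inside
> an operator with uid("enrichment") and be started with --fromSavepoint
> pointing at the written path.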
>
> Best regards
> Theo
>
> ------------------------------
> *From: *"Marco Villalobos" <mvillalo...@kineteque.com>
> *To: *"Leonard Xu" <xbjt...@gmail.com>
> *CC: *"user" <user@flink.apache.org>
> *Sent: *Wednesday, August 5, 2020 04:33:23
> *Subject: *Re: Two Queries and a Kafka Topic
>
> Hi Leonard,
>
> First, thank you.
>
> I am currently trying to restrict my solution to Apache Flink 1.10 because
> it's the current version supported by Amazon EMR.
> I am not ready to change our operational environment to solve this.
>
> Second, I am using the DataStream API.  The Kafka topic is not in a table;
> it is in a DataStream.
>
> The SQL queries are literally from a PostgreSQL database, and only need
> to be run exactly once in the lifetime of the job.
>
> I am struggling to determine where this happens.
>
> JDBCInputFormat seems to query the SQL table repeatedly, and connecting
> streams and aggregating them into one object is very complicated.
>
> Thus, I am wondering what is the right approach.
>
> Let me restate the parameters.
>
> SQL Query One = data in PostgreSQL (200K records) that is used for
> business logic.
> SQL Query Two = data in PostgreSQL (1000 records) that is used for
> business logic.
> Kafka Topic One = unbounded data stream that uses the DataStream API and
> the queries above to write into multiple sinks
>
> ASCII diagram:
>
> [SQL Query One] ----> [Aggregate to Map]
>
> Kafka ----> [Kafka Topic One] ----> [Keyed Process Function (Query One Map,
> Query Two Map)] ----> [Multiple Sinks]
>
> [SQL Query Two] ---->[Aggregate to Map]
>
>
> Maybe my diagram above helps.  You see, I need Query One and Query Two to
> only ever execute once.  After that, the information they provide is used
> to correctly process the Kafka topic.
>
> I'll dig a little further to try and understand what you said, thank you,
> but JDBCInputFormat seems to query the database repeatedly. Maybe I need
> to write a RichFunction or AsyncIO function and cache the results in state
> after that.
>
>
>
> On Aug 4, 2020, at 6:25 PM, Leonard Xu <xbjt...@gmail.com> wrote:
>
> Hi, Marco
>
> If I need SQL Query One and SQL Query Two to happen just one time,
>
>
> Looks like you want to reuse this Kafka table in one job. Executing
> multiple queries in one SQL job is supported in Flink 1.11.
> You can use `StatementSet` [1] to add SQL Query One and SQL Query Two to a
> single SQL job.
>
>
> Best
> Leonard
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#run-an-insert-statement
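>
> A minimal sketch of the StatementSet approach from [1], assuming Flink
> 1.11 and placeholder table names (the CREATE TABLE DDL for the Kafka
> source and the two sinks is omitted):
>
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.StatementSet;
> import org.apache.flink.table.api.TableEnvironment;
>
> public class TwoInsertsOneJob {
>
>     public static void main(String[] args) {
>         TableEnvironment tEnv = TableEnvironment.create(
>                 EnvironmentSettings.newInstance()
>                         .useBlinkPlanner()
>                         .inStreamingMode()
>                         .build());
>
>         // kafka_topic_one, sink_one and sink_two are placeholder table names
>         // that would be registered with CREATE TABLE statements beforehand.
>         StatementSet set = tEnv.createStatementSet();
>         set.addInsertSql("INSERT INTO sink_one SELECT id, v FROM kafka_topic_one");
>         set.addInsertSql("INSERT INTO sink_two SELECT id, v FROM kafka_topic_one");
>
>         // Both INSERTs are optimized together and submitted as a single job.
>         set.execute();
>     }
> }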
>
>
> On Aug 5, 2020, at 04:34, Marco Villalobos <mvillalo...@kineteque.com> wrote:
>
> Let's say that I have:
>
> SQL Query One from data in PostgreSQL (200K records).
> SQL Query Two from data in PostgreSQL (1000 records).
> and Kafka Topic One.
>
> Let's also say that the main data for this Flink job arrives in Kafka
> Topic One.
>
> If I need SQL Query One and SQL Query Two to happen just one time, when
> the job starts up, and afterwards maybe store their results in keyed state
> or broadcast state (they're not really part of the stream), then what is
> the best practice for supporting that in Flink?
>
> The Flink job needs to stream data from Kafka Topic One, aggregate it,
> and perform computations whose business logic requires all of the data
> from SQL Query One and SQL Query Two.
>
> I am using Flink 1.10.
>
> Am I supposed to query the database before the job is submitted, and then
> pass the results on as parameters to a function?
> Or am I supposed to use JDBCInputFormat for both queries and create two
> streams, and somehow connect or broadcast both of them to the main stream
> that uses Kafka Topic One?
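>
> For reference, a hedged sketch of that second variant using broadcast
> state, with stand-in sources in place of the JDBC and Kafka streams and
> hypothetical Tuple2<String, String> records:
>
> import org.apache.flink.api.common.state.MapStateDescriptor;
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.datastream.BroadcastStream;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
> import org.apache.flink.util.Collector;
>
> public class BroadcastEnrichmentJob {
>
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>
>         // Stand-ins for the JDBC-derived stream and the Kafka stream.
>         DataStream<Tuple2<String, String>> queryOne =
>                 env.fromElements(Tuple2.of("k1", "lookup-value"));
>         DataStream<Tuple2<String, String>> kafka =
>                 env.fromElements(Tuple2.of("k1", "payload"));
>
>         MapStateDescriptor<String, String> descriptor = new MapStateDescriptor<>(
>                 "query-one",
>                 BasicTypeInfo.STRING_TYPE_INFO,
>                 BasicTypeInfo.STRING_TYPE_INFO);
>
>         BroadcastStream<Tuple2<String, String>> broadcast =
>                 queryOne.broadcast(descriptor);
>
>         kafka.keyBy(t -> t.f0)
>                 .connect(broadcast)
>                 .process(new KeyedBroadcastProcessFunction<
>                         String, Tuple2<String, String>, Tuple2<String, String>, String>() {
>
>                     @Override
>                     public void processElement(Tuple2<String, String> event,
>                             ReadOnlyContext ctx, Collector<String> out) throws Exception {
>                         // Broadcast rows arrive asynchronously, so a real job
>                         // would buffer events seen before the lookup data.
>                         String v = ctx.getBroadcastState(descriptor).get(event.f0);
>                         out.collect(event.f1 + " enriched with " + v);
>                     }
>
>                     @Override
>                     public void processBroadcastElement(Tuple2<String, String> row,
>                             Context ctx, Collector<String> out) throws Exception {
>                         ctx.getBroadcastState(descriptor).put(row.f0, row.f1);
>                     }
>                 })
>                 .print();
>
>         env.execute("broadcast enrichment sketch");
>     }
> }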
>
> I would appreciate guidance. Please.  Thank you.
>
> Sincerely,
>
> Marco A. Villalobos
>
>
