Hey Community,

I'd like to update that the solution was eventually simple.

We achieved the desired results; each stream has been saved info into 
corresponded schema in specific table. the connection was handled by JDBC pool.


  1.  We have created X Schemas(With known format, i.e. employee_<id> ; under 
each schema the exact same table has been created
  2.  Postgres Sink - which extends GenericJdbcSinkFuncion
  3.  Postgres Ingestor - Created the Postgres Sink:

     *   PreparedStatement - Which was just insert query with values to the 
table
     *   JdbcStatementBuilder - Statement builder
     *   JdbcConnectionOptions - Postgres connection details.
     *   JdbcExecutionOptions - Execution details ( batch size was set to 1, 
interval 3seconds, max retries was 3)

The trick was to switch the schema within the JdbcStatementBuilder.

Example

@NoArgsConstructor
private static class EmployeesStatementBuilder implements 
JdbcStatementBuilder<EmployeeDetails>, Serializable {

    @Override
    public void accept(final PreparedStatement preparedSQLStatement, final 
EmployeeDetails employeeDetails)
            throws Exception {


            final String schemaName = "employee_" + employeeDetails.getId();

            try {

                 preparedSQLStatement.getConnection().setSchema(schemaName); < 
----- Here we set the schema per connection

                 // setting the prepared statement's values (JDBC uses 1-based 
index numbering, rather than zero-based)

        preparedSQLStatement.setString(EMPLOYEE_ID_ID_COLUMN, 
employeeDetails.getId());

                 preparedSQLStatement.setString(TIME_UNIT_ID_COLUMN, 
employeeDetails.getTimeUnit());

                 preparedSQLStatement.setString(ANNUAL_SALARY_ID_COLUMN, 
employeeDetails.getAnnualSalary());

             } catch (Exception exception) {
                log.error(exception);

                throw exception
            }
        }
    }

Cheers,

Tamir.


________________________________
From: Maciej Obuchowski <obuchowski.mac...@gmail.com>
Sent: Monday, June 7, 2021 6:23 PM
To: Nicolaus Weidner <nicolaus.weid...@data-artisans.com>
Cc: Tamir Sagi <tamir.s...@niceactimize.com>; user@flink.apache.org 
<user@flink.apache.org>
Subject: Re: Stream processing into single sink to multiple DB Schemas


EXTERNAL EMAIL


Hey,
We had similar problem, but with 1000s of tables. I've created issue [1] and PR 
with internally used solution [2], but unfortunately, there seems to be no 
interest in upstreaming this feature.

Thanks,
Maciej

[1] https://issues.apache.org/jira/browse/FLINK-21643
[2] https://github.com/apache/flink/pull/15102

pon., 7 cze 2021 o 17:15 Nicolaus Weidner 
<nicolaus.weid...@data-artisans.com<mailto:nicolaus.weid...@data-artisans.com>> 
napisaƂ(a):
Hi Tamir,

I assume you want to use the Jdbc connector?
You can use three filters on your input stream to separate it into three 
separate streams, then add a sink to each of those (see e.g. [1]). Then you can 
have a different SQL statement for each of the three sinks. If you specify the 
driver name in JdbcConnectionOptions, that driver will be used to obtain a DB 
connection (see [2]). So if you use a pooling driver (e.g. [3]), connections 
should automatically be taken from a shared pool.

Does that help?

Best wishes,
Nico

[1] 
https://stackoverflow.com/questions/53588554/apache-flink-using-filter-or-split-to-split-a-stream
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/jdbc/#jdbc-connection-parameters
[3] https://commons.apache.org/proper/commons-dbcp/

On Mon, Jun 7, 2021 at 8:23 AM Tamir Sagi 
<tamir.s...@niceactimize.com<mailto:tamir.s...@niceactimize.com>> wrote:
Hey Community

Assuming there are 3 groups,
A, B, C

Each group represents a set of data about employees and salaries.
Group A ( 0-20K $)
Group B (20K$ - 50K$)
Group C ( > 50K$)

Is it possible to process stream data from single source containing information 
about employees and salaries and split the data into different DB schemas on 
the same DB? (Single Sink - Single Connection)

I Encountered Side output and dynamic tables
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/side_output/
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/concepts/dynamic_tables/

I'm not sure it's the right way.

If there is a better way , enlighten me

Thank you,

Tamir.




Confidentiality: This communication and any attachments are intended for the 
above-named persons only and may be confidential and/or legally privileged. Any 
opinions expressed in this communication are not necessarily those of NICE 
Actimize. If this communication has come to you in error you must take no 
action based on it, nor must you copy or show it to anyone; please 
delete/destroy and inform the sender by e-mail immediately.
Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
Viruses: Although we have taken steps toward ensuring that this e-mail and 
attachments are free from any virus, we advise that in keeping with good 
computing practice the recipient should ensure they are actually virus free.

[https://my-email-signature.link/signature.gif?u=1088647&e=164477752&v=d637d246db4762719ed5ea9e59e4c858d41ed49bd9bcb3315f58255c9b97f19e]

Confidentiality: This communication and any attachments are intended for the 
above-named persons only and may be confidential and/or legally privileged. Any 
opinions expressed in this communication are not necessarily those of NICE 
Actimize. If this communication has come to you in error you must take no 
action based on it, nor must you copy or show it to anyone; please 
delete/destroy and inform the sender by e-mail immediately.
Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
Viruses: Although we have taken steps toward ensuring that this e-mail and 
attachments are free from any virus, we advise that in keeping with good 
computing practice the recipient should ensure they are actually virus free.

Reply via email to