Hi Sambaran,

I'm not sure if this is the best approach, though I don't know your full
use case/ implementation.

What kind of error do you get when trying to map into a PreparedStatement?
I assume you tried something like this?

SingleOutputStreamOperator<Row> stream =
env.fromElements(Row.of("YourProcedureA"), Row.of("YourProcedureB"));

stream.addSink(JdbcSink.sink(
   "EXEC ?",
   (preparedStatement, row) -> {
      // extend `preparedStatement` with row info
      preparedStatement.setString(0, (String) row.getField(0));
   },
   new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
      .withUrl("jdbc:derby:memory:ebookshop")
      .withDriverName("org.apache.derby.jdbc.EmbeddedDriver")
      .build()));

Best,
Austin

On Tue, Apr 20, 2021 at 12:42 PM Sambaran <sambaran2...@gmail.com> wrote:

> Hi Austin,
>
> We are using this for jdbc interfacing to populate postgres tables based
> on the data coming from an event source.
>
> We tried with the approach mentioned in the doc
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/jdbc.html
> but did not find a suitable way to map SingleOutputStreamOperator<Row> .
> Can you please let me know if this is the right approach and if yes, how do
> we map the SingleOutputStreamOperator<Row> to the preparedstatement in
> JdbcStatementBuilder?
>
> Thanks for your help!
>
> Regards
> Sambaran
>
> On Tue, Apr 20, 2021 at 6:30 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Great, thanks for the clarification! I'm checking with others now. Are
>> you using other parts of the Table/SQL APIs, or just this for JDBC
>> interfacing?
>>
>> Best,
>> Austin
>>
>> On Tue, Apr 20, 2021 at 12:20 PM Sambaran <sambaran2...@gmail.com> wrote:
>>
>>> Hi Austin,
>>>
>>> Thanks for replying. This is exactly as you mentioned here. Do we have a
>>> way to execute the procedure with 1.11 or upper version as
>>> JDBCAppendTableSink is no longer available with these?
>>>
>>> Regards
>>> Sambaran
>>>
>>> On Tue, Apr 20, 2021 at 6:11 PM Austin Cawley-Edwards <
>>> austin.caw...@gmail.com> wrote:
>>>
>>>> Hey Sambaran,
>>>>
>>>> I'm not too familiar with the 1.7 JDBCAppendTableSink, but to make sure
>>>> I understand what you're current solution looks like, it's something like
>>>> the following, where you're triggering a procedure on each element of a
>>>> stream?
>>>>
>>>>       JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
>>>>             .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
>>>>             .setDBUrl("jdbc:derby:memory:ebookshop")
>>>>             .setQuery("EXEC YourProcedure")
>>>>             .build();
>>>>
>>>>         SingleOutputStreamOperator<Row> stream =
>>>> env.fromElements(Row.of("a"), Row.of("b"));
>>>>         sink.emitDataStream(stream);
>>>>
>>>> Or something else?
>>>>
>>>> Best,
>>>> Austin
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Apr 20, 2021 at 11:10 AM Sambaran <sambaran2...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am currently using JDBCAppendTableSink to execute database stored
>>>>> procedures from flink to populate data to external tables using
>>>>>  SingleOutputStreamOperator (version 1.7). Now we are trying to update to
>>>>> Flink 1.11/ later and found JDBCAppendTableSink has been removed, 
>>>>> currently
>>>>> when looking for an alternative I could not find any suitable approach
>>>>> which would call database stored procedure. Is there any alternative
>>>>> approach to resolve this?
>>>>>
>>>>> Regards
>>>>> Sambaran
>>>>>
>>>>

Reply via email to