Re: How to call the open method of JdbcSink?

2023-11-29 Thread Sai Vishnu
Hi Feng,

Thank you for your response and the suggestion. I was able to cast the
SinkFunction to GenericJdbcSinkFunction which enabled me to override the
open and close methods.
On proceeding further, I have observed that if the batch encounters
BatchUpdateException due to a packet inside the batch, then the driver does
not proceed further to update other entries to the db.

Is there any way to programmatically recover from this state and let the
execution move on to the next packet in the batch or the next batch?

Some homework that I did which could be helpful -
I see that flink-connector-jdbc uses com.oracle.database.jdbc.ojdbc8
[source:
https://github.com/apache/flink-connector-jdbc/blob/main/flink-connector-jdbc/pom.xml#L69-L73]
and the Oracle documentation only says this: "After a command in a batch
update fails to execute properly and a BatchUpdateException is thrown, the
driver may or may not continue to process the remaining commands in the
batch".[source:
https://docs.oracle.com/javase/8/docs/api/java/sql/BatchUpdateException.html].
Is there a way to mandate this driver to continue the processing even after
encountering a failure?


I am looking for a way to recover programmatically from the
BatchUpdateException and let the driver continue updating to the dB. Any
packet failing should be added to a DLQ, or at least be logged with the
help of try-catch blocks. Any insights would be much appreciated.

One more observation: The BatchUpdateException is nested inside multiple
RuntimeExceptions and IOExceptions. Is there any consistency/pattern to
this?

Thanks,
Sai Vishnu Soudri


On Tue, 28 Nov 2023 at 20:29, Feng Jin  wrote:

> Hi Sai
>
> I think you can directly cast SinkFunction to GenericJdbcSinkFunction.
>
>
>
> https://github.com/apache/flink-connector-jdbc/blob/b477d452ba3aac38d53d1f5d4c4820bdad3ad9cd/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java#L63C41-L63C41
> ```
> public static  SinkFunction sink(
> String sql,
> JdbcStatementBuilder statementBuilder,
> JdbcExecutionOptions executionOptions,
> JdbcConnectionOptions connectionOptions) {
> return new GenericJdbcSinkFunction<>(
> new JdbcOutputFormat<>(
> new
> SimpleJdbcConnectionProvider(connectionOptions),
> executionOptions,
> () -> JdbcBatchStatementExecutor.simple(sql,
> statementBuilder)));
> }
> ```
>
>
> Best,
> Feng
>
>
> On Tue, Nov 28, 2023 at 5:49 PM Sai Vishnu 
> wrote:
>
>> Hi team,
>>
>>
>> I am using the JdbcSink from flink-connector-jdbc artifact, version
>> 3.1.0-1.17. I am trying to write a Sink wrapper that will internally call
>> the invoke method and open method of jdbc sink. While implementing, I see
>> that JdbcSink.*sink() *returns a SinkFunction which only exposes the
>> invoke method and not the open method.
>>
>>
>> Would appreciate any suggestions on how I can implement this. To add to
>> the requirement, the use case is to try and enclose the invoke operation in
>> a try catch block so that any exception during the db write process can be
>> caught and handled properly.
>>
>>
>> Thanks,
>>
>> Sai Vishnu Soudri
>>
>


Re: How to call the open method of JdbcSink?

2023-11-28 Thread Feng Jin
Hi Sai

I think you can directly cast SinkFunction to GenericJdbcSinkFunction.


https://github.com/apache/flink-connector-jdbc/blob/b477d452ba3aac38d53d1f5d4c4820bdad3ad9cd/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java#L63C41-L63C41
```
public static  SinkFunction sink(
String sql,
JdbcStatementBuilder statementBuilder,
JdbcExecutionOptions executionOptions,
JdbcConnectionOptions connectionOptions) {
return new GenericJdbcSinkFunction<>(
new JdbcOutputFormat<>(
new SimpleJdbcConnectionProvider(connectionOptions),
executionOptions,
() -> JdbcBatchStatementExecutor.simple(sql,
statementBuilder)));
}
```


Best,
Feng


On Tue, Nov 28, 2023 at 5:49 PM Sai Vishnu 
wrote:

> Hi team,
>
>
> I am using the JdbcSink from flink-connector-jdbc artifact, version
> 3.1.0-1.17. I am trying to write a Sink wrapper that will internally call
> the invoke method and open method of jdbc sink. While implementing, I see
> that JdbcSink.*sink() *returns a SinkFunction which only exposes the
> invoke method and not the open method.
>
>
> Would appreciate any suggestions on how I can implement this. To add to
> the requirement, the use case is to try and enclose the invoke operation in
> a try catch block so that any exception during the db write process can be
> caught and handled properly.
>
>
> Thanks,
>
> Sai Vishnu Soudri
>


How to call the open method of JdbcSink?

2023-11-27 Thread Sai Vishnu
Hi team,


I am using the JdbcSink from flink-connector-jdbc artifact, version
3.1.0-1.17. I am trying to write a Sink wrapper that will internally call
the invoke method and open method of jdbc sink. While implementing, I see
that JdbcSink.*sink() *returns a SinkFunction which only exposes the invoke
method and not the open method.


Would appreciate any suggestions on how I can implement this. To add to the
requirement, the use case is to try and enclose the invoke operation in a
try catch block so that any exception during the db write process can be
caught and handled properly.


Thanks,

Sai Vishnu Soudri