Thanks Till, the tickets and links were immensely useful. With them I was
able to make progress and even get things to compile. However, when I run
the job, a not-serializable exception is thrown (see below).

.addSink(JdbcSink.sink[SignableTableSchema](
  addIntervalToInsertStatement(insertStatement, interval),
  (ps: PreparedStatement, rd: SignableTableSchema) => {
    ps.setString(1, rd.data_processing_id)
    ps.setTimestamp(2, rd.crc)
    ps.setString(3, rd.command)
    ps.setString(4, rd.result)
    ps.setOptionalString(5, rd.message)
    ps.setString(6, rd.arguments)
    ps.setOptionalString(7, rd.validatorUUID)
  },
  getJdbcExecutionOptions,
  getJdbcOptions(interval, insertStatement) // <-- This is line 376
))

The execution options are set so that writes are batched:

def getJdbcExecutionOptions: JdbcExecutionOptions = {
  JdbcExecutionOptions.builder()
    .withBatchIntervalMs(1000)
    .withBatchSize(1000)
    .withMaxRetries(10)
    .build
}


Any suggestions?

[info]   org.apache.flink.api.common.InvalidProgramException: The implementation of the AbstractJdbcOutputFormat is not serializable. The object probably contains or references non serializable fields.
[info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
[info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
[info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
[info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2000)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:203)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1243)
[info]   at org.apache.flink.streaming.api.scala.DataStream.addSink(DataStream.scala:1110)
[info]   at com.maalka.flink.sinks.MaalkaPostgresSink$.insertAnalyticData(MaalkaPostgresSink.scala:376)
[info]   at com.maalka.flink.sinks.MaalkaPostgresSink.process(MaalkaPostgresSink.scala:262)
[info]   at com.maalka.flink.sinks.MaalkaPostgresSink.process$(MaalkaPostgresSink.scala:250)
[info]   ...
[info]   Cause: java.io.NotSerializableException: Non-serializable lambda
[info]   at com.maalka.flink.sinks.MaalkaPostgresSink$$$Lambda$22459/0x0000000809678c40.writeObject(Unknown Source)
[info]   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[info]   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[info]   at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[info]   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
[info]   at java.base/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1145)
[info]   at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1497)
[info]   at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
[info]   at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
[info]   at java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1379)

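The trace shows Java serialization rejecting a lambda, but not which argument holds it. One way to narrow that down might be to push each argument through plain Java serialization before calling addSink. The sketch below uses only the JDK; SerializationCheck and serializationError are hypothetical names, not part of Flink, and plain ObjectOutputStream serialization is only assumed to be a reasonable stand-in for the check ClosureCleaner performs.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical helper, not part of Flink: tries to Java-serialize a value
// and reports why it failed, if it did.
object SerializationCheck {

  /** Returns None if `obj` serializes cleanly, otherwise the exception message. */
  def serializationError(obj: AnyRef): Option[String] = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      None
    } catch {
      // Thrown for the first non-serializable object reached in the graph.
      case e: NotSerializableException => Some(String.valueOf(e.getMessage))
    } finally {
      out.close()
    }
  }
}
```

Calling SerializationCheck.serializationError on the statement-builder lambda, on getJdbcExecutionOptions and on the result of getJdbcOptions(interval, insertStatement) in turn should show which one fails, assuming each can be bound to a val first.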


On Tue, Feb 16, 2021 at 6:11 PM Till Rohrmann <[email protected]> wrote:

> Hi Clay,
>
> I am not a Table API expert but let me try to answer your question:
>
> With FLINK-17748 [1] the community removed the registerTableSink in favour
> of the connect API. The connect API has been deprecated [2] because it was
> not well maintained. Now the recommended way for specifying sinks is to use
> Flink's DDL [3]. Unfortunately, I couldn't find an easy example on how to
> use the DDL. Maybe Timo or Jark can point you towards a good guide on how
> to register your jdbc table sink.
>
> [1] https://issues.apache.org/jira/browse/FLINK-17748
> [2] https://issues.apache.org/jira/browse/FLINK-18416
> [3]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sourceSinks.html
>
> Cheers,
> Till
>
> On Tue, Feb 16, 2021 at 4:42 PM Clay Teeter <[email protected]>
> wrote:
>
>> Hey all.  Hopefully this is an easy question.  I'm porting my JDBC
>> postgres sink from 1.10 to 1.12
>>
>> I'm using:
>> * StreamTableEnvironment
>> * JdbcUpsertTableSink
>>
>> What I'm having difficulty with is how to register the sink with the
>> streaming table environment.
>>
>> In 1.10:
>>
>>     tableEnv.registerTableSink(
>>>       s"${interval}_maalka_jdbc_output_table",
>>>       jdbcTableSink)
>>
>>
>> This method doesn't exist in 1.12, what is the equivalent?
>>
>> Thanks!
>> Clay
>>
>>
