I am not 100% sure but maybe (_, _) => {} captures a reference to object
TestSink which is not serializable. Maybe try to simply define a no
op JdbcStatementBuilder and pass such an instance to JdbcSink.sink().Cheers, Till On Wed, Feb 17, 2021 at 2:06 PM Clay Teeter <[email protected]> wrote: > Ok, this is about as simple as I can get. > > package com.maalka.flink.sinks > > import com.maalka.flink.models.{MaalkaDataRecord, SignableUpdate} > import com.typesafe.scalalogging.LazyLogging > import org.apache.flink.api.common.functions.RuntimeContext > import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, > JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder} > import org.apache.flink.connector.jdbc.internal.{GenericJdbcSinkFunction, > JdbcBatchingOutputFormat} > import > org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider > import > org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor > import org.apache.flink.connector.jdbc.internal.options.JdbcOptions > import org.apache.flink.streaming.api.functions.sink.SinkFunction > import org.apache.flink.streaming.api.scala._ > import com.maalka.flink.typeInformation.Implicits._ > > import java.util.function.Function > > object TestSink extends LazyLogging { > // START HERE > def process(messageStream: DataStream[MaalkaDataRecord], > signableUpdateStream: Option[DataStream[SignableUpdate]], > streamExecutionEnvironment: StreamExecutionEnvironment): Unit = > { > > insertAnalyticData("raw", > "insert into analytic_table ... ", > messageStream.map(_ => "A")) > } > > private def insertAnalyticData( > interval: String, > insertStatement: String, > messageStream: DataStream[String]): Unit = { > val connectionString = s"jdbc:postgresql://localhost/db" > val sink: SinkFunction[String] = JdbcSink.sink( > insertStatement, > (_, _) => {}, // I have a feeling that this is the lambda that can't > serialize > JdbcExecutionOptions.builder() > .withBatchIntervalMs(1000) > .withBatchSize(1000) > .withMaxRetries(10) > .build, > JdbcOptions.builder() > .setDBUrl(connectionString) > .setTableName("analytic_table") > .build > ) > > messageStream > .addSink(sink) > } > } > > > On Wed, Feb 17, 2021 at 9:34 AM Till Rohrmann <[email protected]> > wrote: > >> Hi Clay, >> >> could you maybe share the source code of >> com.maalka.flink.sinks.MaalkaPostgresSink with us? It seems that this >> sink uses a lambda which is not serializable. Maybe it holds a reference to >> some non Serializable class as part of its closure. >> >> Cheers, >> Till >> >> On Tue, Feb 16, 2021 at 8:58 PM Clay Teeter <[email protected]> >> wrote: >> >>> Thanks Till, the tickets and links were immensely useful. With that i >>> was able to make progress and even get things to compile. However, when i >>> run things a serializable exception is thrown. (see below) >>> >>> .addSink(JdbcSink.sink[SignableTableSchema]( >>>> addIntervalToInsertStatement(insertStatement, interval), >>>> (ps: PreparedStatement, rd: SignableTableSchema) => { >>>> ps.setString(1, rd.data_processing_id) >>>> ps.setTimestamp(2, rd.crc) >>>> ps.setString(3, rd.command) >>>> ps.setString(4, rd.result) >>>> ps.setOptionalString(5, rd.message) >>>> ps.setString(6, rd.arguments) >>>> ps.setOptionalString(7, rd.validatorUUID) >>>> }, >>>> getJdbcExecutionOptions, >>>> getJdbcOptions(interval, insertStatement) // <-- This is line 376 >>>> )) >>>> >>>> Where i set the executionOptions to behave in a bachfull way. >>> >>> def getJdbcExecutionOptions: JdbcExecutionOptions = { >>> JdbcExecutionOptions.builder() >>> .withBatchIntervalMs(1000) >>> .withBatchSize(1000) >>> .withMaxRetries(10) >>> .build >>> } >>> >>> >>> Any suggestions? >>> >>> [info] org.apache.flink.api.common.InvalidProgramException: The >>>> implementation of the AbstractJdbcOutputFormat is not serializable. The >>>> object probably contains or references non serializable fields. >>>> [info] at >>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164) >>>> [info] at >>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132) >>>> [info] at >>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69) >>>> [info] at >>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2000) >>>> [info] at >>>> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:203) >>>> [info] at >>>> org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1243) >>>> [info] at >>>> org.apache.flink.streaming.api.scala.DataStream.addSink(DataStream.scala:1110) >>>> [info] at >>>> com.maalka.flink.sinks.MaalkaPostgresSink$.insertAnalyticData(MaalkaPostgresSink.scala:376) >>>> [info] at >>>> com.maalka.flink.sinks.MaalkaPostgresSink.process(MaalkaPostgresSink.scala:262) >>>> [info] at >>>> com.maalka.flink.sinks.MaalkaPostgresSink.process$(MaalkaPostgresSink.scala:250) >>>> [info] ... >>>> [info] Cause: java.io.NotSerializableException: Non-serializable >>>> lambda >>>> [info] at >>>> com.maalka.flink.sinks.MaalkaPostgresSink$$$Lambda$22459/0x0000000809678c40.writeObject(Unknown >>>> Source) >>>> [info] at >>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native >>>> Method) >>>> [info] at >>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >>>> [info] at >>>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>>> [info] at java.base/java.lang.reflect.Method.invoke(Method.java:566) >>>> [info] at >>>> java.base/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1145) >>>> [info] at >>>> java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1497) >>>> [info] at >>>> java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433) >>>> [info] at >>>> java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179) >>>> [info] at >>>> java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1379) >>>> >>> >>> >>> On Tue, Feb 16, 2021 at 6:11 PM Till Rohrmann <[email protected]> >>> wrote: >>> >>>> Hi Clay, >>>> >>>> I am not a Table API expert but let me try to answer your question: >>>> >>>> With FLINK-17748 [1] the community removed the registerTableSink in >>>> favour of the connect API. The connect API has been deprecated [2] because >>>> it was not well maintained. Now the recommended way for specifying sinks is >>>> to use Flink's DDL [3]. Unfortunately, I couldn't find an easy example on >>>> how to use the DDL. Maybe Timo or Jark can point you towards a good guide >>>> on how to register your jdbc table sink. >>>> >>>> [1] https://issues.apache.org/jira/browse/FLINK-17748 >>>> [2] https://issues.apache.org/jira/browse/FLINK-18416 >>>> [3] >>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sourceSinks.html >>>> >>>> Cheers, >>>> Till >>>> >>>> On Tue, Feb 16, 2021 at 4:42 PM Clay Teeter <[email protected]> >>>> wrote: >>>> >>>>> Hey all. Hopefully this is an easy question. I'm porting my JDBC >>>>> postgres sink from 1.10 to 1.12 >>>>> >>>>> I'm using: >>>>> * StreamTableEnvironment >>>>> * JdbcUpsertTableSink >>>>> >>>>> What I'm having difficulty with is how to register the sink with the >>>>> streaming table environment. >>>>> >>>>> In 1.10: >>>>> >>>>> tableEnv.registerTableSink( >>>>>> s"${interval}_maalka_jdbc_output_table", >>>>>> jdbcTableSink) >>>>> >>>>> >>>>> This method doesn't exist in 1.12, what is the equivalent? >>>>> >>>>> Thanks! >>>>> Clay >>>>> >>>>>
