Ok, this is about as simple as I can get.

package com.maalka.flink.sinks

import com.maalka.flink.models.{MaalkaDataRecord, SignableUpdate}
import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.connector.jdbc.internal.{GenericJdbcSinkFunction, JdbcBatchingOutputFormat}
import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider
import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._
import com.maalka.flink.typeInformation.Implicits._
import java.util.function.Function

object TestSink extends LazyLogging {

  // START HERE
  def process(messageStream: DataStream[MaalkaDataRecord],
              signableUpdateStream: Option[DataStream[SignableUpdate]],
              streamExecutionEnvironment: StreamExecutionEnvironment): Unit = {
    insertAnalyticData("raw",
      "insert into analytic_table ... ",
      messageStream.map(_ => "A"))
  }

  private def insertAnalyticData(
      interval: String,
      insertStatement: String,
      messageStream: DataStream[String]): Unit = {
    val connectionString = s"jdbc:postgresql://localhost/db"
    val sink: SinkFunction[String] = JdbcSink.sink(
      insertStatement,
      (_, _) => {}, // I have a feeling that this is the lambda that can't serialize
      JdbcExecutionOptions.builder()
        .withBatchIntervalMs(1000)
        .withBatchSize(1000)
        .withMaxRetries(10)
        .build,
      JdbcOptions.builder()
        .setDBUrl(connectionString)
        .setTableName("analytic_table")
        .build
    )
    messageStream
      .addSink(sink)
  }
}
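
If the suspicion in the code comment is right, one thing that may be worth trying is to stop relying on Scala's lambda-to-SAM conversion and implement JdbcStatementBuilder as an explicit anonymous class. JdbcStatementBuilder already extends java.io.Serializable, so an anonymous class defined inside an object should serialize as long as it captures nothing non-serializable. This is only a sketch under the same Flink 1.12 JDBC connector setup as above. The object name AnonymousBuilderSketch, the buildSink helper, and the ps.setString(1, value) binding (which assumes the insert statement has a single `?` parameter) are illustrative assumptions, not taken from the real sink.

```scala
import java.sql.PreparedStatement

import org.apache.flink.connector.jdbc.{JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions
import org.apache.flink.streaming.api.functions.sink.SinkFunction

// Hypothetical object name, used only for this sketch.
object AnonymousBuilderSketch {

  // Explicit anonymous class instead of a Scala lambda. Whether this avoids
  // the "Non-serializable lambda" error in this job is an assumption to verify.
  val statementBuilder: JdbcStatementBuilder[String] =
    new JdbcStatementBuilder[String] {
      override def accept(ps: PreparedStatement, value: String): Unit =
        ps.setString(1, value) // assumes a single `?` placeholder in the statement
    }

  // Same options as in TestSink above, with the lambda swapped out.
  def buildSink(insertStatement: String): SinkFunction[String] =
    JdbcSink.sink[String](
      insertStatement,
      statementBuilder,
      JdbcExecutionOptions.builder()
        .withBatchIntervalMs(1000)
        .withBatchSize(1000)
        .withMaxRetries(10)
        .build,
      JdbcOptions.builder()
        .setDBUrl("jdbc:postgresql://localhost/db")
        .setTableName("analytic_table")
        .build
    )
}
```

In TestSink, insertAnalyticData would then call messageStream.addSink(AnonymousBuilderSketch.buildSink(insertStatement)). If the exception goes away, the lambda was the culprit. If it persists, the non-serializable reference is somewhere else in the closure.
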
On Wed, Feb 17, 2021 at 9:34 AM Till Rohrmann wrote:
> Hi Clay,
>
> could you maybe share the source code of
> com.maalka.flink.sinks.MaalkaPostgresSink with us? It seems that this
> sink uses a lambda which is not serializable. Maybe it holds a reference to
> some non Serializable class as part of its closure.
>
> Cheers,
> Till
>
> On Tue, Feb 16, 2021 at 8:58 PM Clay Teeter wrote:
>
>> Thanks Till, the tickets and links were immensely useful. With them I
>> was able to make progress and even get things to compile. However, when I
>> run the job, a serialization exception is thrown (see below).
>>
>>   .addSink(JdbcSink.sink[SignableTableSchema](
>>     addIntervalToInsertStatement(insertStatement, interval),
>>     (ps: PreparedStatement, rd: SignableTableSchema) => {
>>       ps.setString(1, rd.data_processing_id)
>>       ps.setTimestamp(2, rd.crc)
>>       ps.setString(3, rd.command)
>>       ps.setString(4, rd.result)
>>       ps.setOptionalString(5, rd.message)
>>       ps.setString(6, rd.arguments)
>>       ps.setOptionalString(7, rd.validatorUUID)
>>     },
>>     getJdbcExecutionOptions,
>>     getJdbcOptions(interval, insertStatement) // <-- This is line 376
>>   ))
>>
>> where I set the execution options to batch the writes:
>>
>> def getJdbcExecutionOptions: JdbcExecutionOptions = {
>>   JdbcExecutionOptions.builder()
>>     .withBatchIntervalMs(1000)
>>     .withBatchSize(1000)
>>     .withMaxRetries(10)
>>     .build
>> }
>>
>>
>> Any suggestions?
>>
>> [info] org.apache.flink.api.common.InvalidProgramException: The implementation of the AbstractJdbcOutputFormat is not serializable. The object probably contains or references non serializable fields.
>> [info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
>> [info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
>> [info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
>> [info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2000)
>> [info]   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:203)
>> [info]   at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1243)
>> [info]   at org.apache.flink.streaming.api.scala.DataStream.addSink(DataStream.scala:1110)
>> [info]   at com.maalka.flink.sinks.MaalkaPostgresSink$.insertAnalyticData(MaalkaPostgresSink.scala:376)
>> [info]   at com.maalka.flink.sinks.MaalkaPostgresSink.process(MaalkaPostgresSink.scala:262)
>> [info]   at com.maalka.flink.sinks.MaalkaPostgresSink.process$(MaalkaPostgresSink.scala:250)
>> [info]   ...
>> [info] Cause: java.io.NotSerializableException: Non-serializable lambda
>> [info]   at com.maalka.flink.sinks.MaalkaPostgresSink$$$Lambda$22459/0x0000000809678c40.writeObject(Unknown Source)
>> [info]   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> [info]   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> [info]   at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> [info]   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>> [info]   at java.base/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1145)
>> [info]   at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1497)
>> [info]   at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
>> [info]   at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
>> [info]   at java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1379)
>>
>>
>>
>> On Tue, Feb 16, 2021 at 6:11 PM Till Rohrmann wrote:
>>
>>> Hi Clay,
>>>
>>> I am not a Table API expert but let me try to answer your question:
>>>
>>> With FLINK-17748 [1] the community removed the registerTableSink in
>>> favour of the connect API. The connect API has been deprecated [2] because
>>> it was not well maintained. Now the recommended way for specifying sinks is
>>> to use Flink's DDL [3]. Unfortunately, I couldn't find an easy example on
>>> how to use the DDL. Maybe Timo or Jark can point you towards a good guide
>>> on how to register your jdbc table sink.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-17748
>>> [2] https://issues.apache.org/jira/browse/FLINK-18416
>>> [3] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sourceSinks.html
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Feb 16, 2021 at 4:42 PM Clay Teeter wrote:
>>>
>>>> Hey all. Hopefully this is an easy question. I'm porting my JDBC
>>>> Postgres sink from Flink 1.10 to 1.12.
>>>>
>>>> I'm using:
>>>> * StreamTableEnvironment
>>>> * JdbcUpsertTableSink
>>>>
>>>> What I'm having difficulty with is how to register the sink with the
>>>> streaming table environment.
>>>>
>>>> In 1.10:
>>>>
>>>>   tableEnv.registerTableSink(
>>>>     s"${interval}_maalka_jdbc_output_table",
>>>>     jdbcTableSink)
>>>>
>>>>
>>>> This method doesn't exist in 1.12. What is the equivalent?
>>>>
>>>> Thanks!
>>>> Clay
>>>>
>>>>