Great to hear that it is now working and thanks for letting the community
know :-)
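
For anyone finding this thread through the original question (how to
register a JDBC sink with the table environment in 1.12): a minimal,
untested sketch of the DDL route mentioned further down the thread could
look like the following. The column names and types are invented for
illustration, the table name and JDBC URL are borrowed from the snippets
in this thread, and it assumes flink-connector-jdbc, the PostgreSQL driver
and the Blink planner are on the classpath.

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object JdbcDdlSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // Registers a JDBC-backed table in the catalog. Declaring a primary key
    // lets the connector write in upsert mode. The columns are hypothetical.
    tableEnv.executeSql(
      """
        |CREATE TABLE raw_maalka_jdbc_output_table (
        |  id STRING,
        |  payload STRING,
        |  PRIMARY KEY (id) NOT ENFORCED
        |) WITH (
        |  'connector' = 'jdbc',
        |  'url' = 'jdbc:postgresql://localhost/db',
        |  'table-name' = 'analytic_table',
        |  'sink.buffer-flush.max-rows' = '1000',
        |  'sink.buffer-flush.interval' = '1s',
        |  'sink.max-retries' = '10'
        |)
        |""".stripMargin)
  }
}
```

Once the table is registered, rows can be written to it with an
INSERT INTO statement passed to executeSql. The three sink.* options are
meant to mirror the batch size, batch interval and retry settings used in
the DataStream version below.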

On Wed, Feb 17, 2021 at 2:48 PM Clay Teeter <[email protected]> wrote:

> Yep, that was it!  thanks! And to complete the thread, this is the working
> revision.
>
> package com.maalka.flink.sinks
>
> import com.maalka.flink.models.{MaalkaDataRecord, SignableUpdate}
> import com.typesafe.scalalogging.LazyLogging
> import org.apache.flink.api.common.functions.RuntimeContext
> import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
> import org.apache.flink.connector.jdbc.internal.{GenericJdbcSinkFunction, JdbcBatchingOutputFormat}
> import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider
> import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor
> import org.apache.flink.connector.jdbc.internal.options.JdbcOptions
> import org.apache.flink.streaming.api.functions.sink.SinkFunction
> import org.apache.flink.streaming.api.scala._
> import com.maalka.flink.typeInformation.Implicits._
>
> import java.sql.PreparedStatement
> import java.util.function.Function
>
> object TestSink extends LazyLogging {
>   // START HERE
>   def process(messageStream: DataStream[MaalkaDataRecord],
>               signableUpdateStream: Option[DataStream[SignableUpdate]],
>               streamExecutionEnvironment: StreamExecutionEnvironment): Unit = {
>
>     insertAnalyticData("raw",
>       "insert into analytic_table ... ",
>       messageStream.map(_ => "A"))
>   }
>
>   // it is required that you explicitly create a new JdbcStatementBuilder
>   val statementBuilder: JdbcStatementBuilder[String] =
>     new JdbcStatementBuilder[String] {
>       override def accept(ps: PreparedStatement, t: String): Unit = {
>         ps.setString(1, t)
>       }
>     }
>
>
>   private def insertAnalyticData(
>                           interval: String,
>                           insertStatement: String,
>                           messageStream: DataStream[String]): Unit = {
>     val connectionString = s"jdbc:postgresql://localhost/db"
>     val sink: SinkFunction[String] = JdbcSink.sink(
>       insertStatement,
>       statementBuilder,
>       JdbcExecutionOptions.builder()
>         .withBatchIntervalMs(1000)
>         .withBatchSize(1000)
>         .withMaxRetries(10)
>         .build,
>       JdbcOptions.builder()
>         .setDBUrl(connectionString)
>         .setTableName("analytic_table")
>         .build
>     )
>
>     messageStream
>       .addSink(sink)
>   }
> }
>
>
>
>
> On Wed, Feb 17, 2021 at 2:24 PM Till Rohrmann <[email protected]>
> wrote:
>
>> I am not 100% sure, but maybe (_, _) => {} captures a reference to object
>> TestSink, which is not serializable. Maybe try to simply define a no-op
>> JdbcStatementBuilder and pass such an instance to JdbcSink.sink().
>>
>> Cheers,
>> Till
>>
>> On Wed, Feb 17, 2021 at 2:06 PM Clay Teeter <[email protected]>
>> wrote:
>>
>>> Ok, this is about as simple as I can get.
>>>
>>> package com.maalka.flink.sinks
>>>
>>> import com.maalka.flink.models.{MaalkaDataRecord, SignableUpdate}
>>> import com.typesafe.scalalogging.LazyLogging
>>> import org.apache.flink.api.common.functions.RuntimeContext
>>> import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
>>> import org.apache.flink.connector.jdbc.internal.{GenericJdbcSinkFunction, JdbcBatchingOutputFormat}
>>> import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider
>>> import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor
>>> import org.apache.flink.connector.jdbc.internal.options.JdbcOptions
>>> import org.apache.flink.streaming.api.functions.sink.SinkFunction
>>> import org.apache.flink.streaming.api.scala._
>>> import com.maalka.flink.typeInformation.Implicits._
>>>
>>> import java.util.function.Function
>>>
>>> object TestSink extends LazyLogging {
>>>   // START HERE
>>>   def process(messageStream: DataStream[MaalkaDataRecord],
>>>               signableUpdateStream: Option[DataStream[SignableUpdate]],
>>>               streamExecutionEnvironment: StreamExecutionEnvironment): Unit = {
>>>
>>>     insertAnalyticData("raw",
>>>       "insert into analytic_table ... ",
>>>       messageStream.map(_ => "A"))
>>>   }
>>>
>>>   private def insertAnalyticData(
>>>                           interval: String,
>>>                           insertStatement: String,
>>>                           messageStream: DataStream[String]): Unit = {
>>>     val connectionString = s"jdbc:postgresql://localhost/db"
>>>     val sink: SinkFunction[String] = JdbcSink.sink(
>>>       insertStatement,
>>>       (_, _) => {},    // I have a feeling that this is the lambda that can't serialize
>>>       JdbcExecutionOptions.builder()
>>>         .withBatchIntervalMs(1000)
>>>         .withBatchSize(1000)
>>>         .withMaxRetries(10)
>>>         .build,
>>>       JdbcOptions.builder()
>>>         .setDBUrl(connectionString)
>>>         .setTableName("analytic_table")
>>>         .build
>>>     )
>>>
>>>     messageStream
>>>       .addSink(sink)
>>>   }
>>> }
>>>
>>>
>>> On Wed, Feb 17, 2021 at 9:34 AM Till Rohrmann <[email protected]>
>>> wrote:
>>>
>>>> Hi Clay,
>>>>
>>>> could you maybe share the source code of
>>>> com.maalka.flink.sinks.MaalkaPostgresSink with us? It seems that this
>>>> sink uses a lambda which is not serializable. Maybe it holds a reference to
>>>> some non-Serializable class as part of its closure.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Feb 16, 2021 at 8:58 PM Clay Teeter <[email protected]>
>>>> wrote:
>>>>
>>>>> Thanks Till, the tickets and links were immensely useful.  With that I
>>>>> was able to make progress and even get things to compile.  However, when I
>>>>> run things a serialization exception is thrown. (see below)
>>>>>
>>>>> .addSink(JdbcSink.sink[SignableTableSchema](
>>>>>>   addIntervalToInsertStatement(insertStatement, interval),
>>>>>>   (ps: PreparedStatement, rd: SignableTableSchema) => {
>>>>>>     ps.setString(1, rd.data_processing_id)
>>>>>>     ps.setTimestamp(2, rd.crc)
>>>>>>     ps.setString(3, rd.command)
>>>>>>     ps.setString(4, rd.result)
>>>>>>     ps.setOptionalString(5, rd.message)
>>>>>>     ps.setString(6, rd.arguments)
>>>>>>     ps.setOptionalString(7, rd.validatorUUID)
>>>>>>   },
>>>>>>   getJdbcExecutionOptions,
>>>>>>   getJdbcOptions(interval, insertStatement) // <-- This is line 376
>>>>>> ))
>>>>>>
>>>>>>  Where I set the executionOptions to behave in a batch-oriented way.
>>>>>
>>>>> def getJdbcExecutionOptions: JdbcExecutionOptions = {
>>>>>   JdbcExecutionOptions.builder()
>>>>>     .withBatchIntervalMs(1000)
>>>>>     .withBatchSize(1000)
>>>>>     .withMaxRetries(10)
>>>>>     .build
>>>>> }
>>>>>
>>>>>
>>>>> Any suggestions?
>>>>>
>>>>> [info]   org.apache.flink.api.common.InvalidProgramException: The implementation of the AbstractJdbcOutputFormat is not serializable. The object probably contains or references non serializable fields.
>>>>>> [info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
>>>>>> [info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
>>>>>> [info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
>>>>>> [info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2000)
>>>>>> [info]   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:203)
>>>>>> [info]   at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1243)
>>>>>> [info]   at org.apache.flink.streaming.api.scala.DataStream.addSink(DataStream.scala:1110)
>>>>>> [info]   at com.maalka.flink.sinks.MaalkaPostgresSink$.insertAnalyticData(MaalkaPostgresSink.scala:376)
>>>>>> [info]   at com.maalka.flink.sinks.MaalkaPostgresSink.process(MaalkaPostgresSink.scala:262)
>>>>>> [info]   at com.maalka.flink.sinks.MaalkaPostgresSink.process$(MaalkaPostgresSink.scala:250)
>>>>>> [info]   ...
>>>>>> [info]   Cause: java.io.NotSerializableException: Non-serializable lambda
>>>>>> [info]   at com.maalka.flink.sinks.MaalkaPostgresSink$$$Lambda$22459/0x0000000809678c40.writeObject(Unknown Source)
>>>>>> [info]   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> [info]   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>> [info]   at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> [info]   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>>>> [info]   at java.base/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1145)
>>>>>> [info]   at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1497)
>>>>>> [info]   at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
>>>>>> [info]   at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
>>>>>> [info]   at java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1379)
>>>>>>
>>>>>
>>>>>
>>>>> On Tue, Feb 16, 2021 at 6:11 PM Till Rohrmann <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Clay,
>>>>>>
>>>>>> I am not a Table API expert but let me try to answer your question:
>>>>>>
>>>>>> With FLINK-17748 [1] the community removed the registerTableSink in
>>>>>> favour of the connect API. The connect API has been deprecated [2]
>>>>>> because it was not well maintained. Now the recommended way for
>>>>>> specifying sinks is to use Flink's DDL [3]. Unfortunately, I couldn't
>>>>>> find an easy example on how to use the DDL. Maybe Timo or Jark can
>>>>>> point you towards a good guide on how to register your jdbc table sink.
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-17748
>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-18416
>>>>>> [3]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sourceSinks.html
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Tue, Feb 16, 2021 at 4:42 PM Clay Teeter <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hey all.  Hopefully this is an easy question.  I'm porting my JDBC
>>>>>>> Postgres sink from 1.10 to 1.12.
>>>>>>>
>>>>>>> I'm using:
>>>>>>> * StreamTableEnvironment
>>>>>>> * JdbcUpsertTableSink
>>>>>>>
>>>>>>> What I'm having difficulty with is how to register the sink with the
>>>>>>> streaming table environment.
>>>>>>>
>>>>>>> In 1.10:
>>>>>>>
>>>>>>>     tableEnv.registerTableSink(
>>>>>>>>       s"${interval}_maalka_jdbc_output_table",
>>>>>>>>       jdbcTableSink)
>>>>>>>
>>>>>>>
>>>>>>> This method doesn't exist in 1.12. What is the equivalent?
>>>>>>>
>>>>>>> Thanks!
>>>>>>> Clay
>>>>>>>
>>>>>>>
