Re: flink jdbc connector does not support source

2022-04-10 Thread Qingsheng Ren
Hi,

The JDBC connector does support being used as a source; the latest documentation simply has not been translated into Chinese yet. Please refer to the English documentation [1].

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/
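
For reference, a minimal sketch of reading through the JDBC connector as a table source, based on the options described in the English documentation above. The MySQL URL, table name, and credentials are placeholders.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JdbcSourceExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

        // Register a JDBC table; option names follow the 1.13 JDBC connector docs [1].
        tEnv.executeSql(
                "CREATE TABLE orders (" +
                "  id BIGINT," +
                "  amount DECIMAL(10, 2)" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:mysql://localhost:3306/mydb'," +   // placeholder
                "  'table-name' = 'orders'," +                      // placeholder
                "  'username' = 'user'," +
                "  'password' = 'secret'" +
                ")");

        // The JDBC table can now be read as a bounded source.
        tEnv.executeSql("SELECT id, amount FROM orders").print();
    }
}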

Best regards

> On Apr 10, 2022, at 11:07, casel.chen  wrote:
> 
> I have a scenario where I need Flink to do a one-off batch sync of certain tables (with different schemas) in a MySQL database into Hudi tables. The official flink jdbc
> connector [1] documentation says it only supports sink, not source. Does the community have plans to support this? If not, how should I develop it myself? Could you give an example? Thanks!
> 
> 
> [1] 
> https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/jdbc/



Re: Re: flink jdbc source oom

2022-04-02 Thread r pp
In my view, stream processing emphasizes continuity, whether records are handled one by one or batch by batch. For a custom SQL source, the best way I can think of to guarantee that continuity is an auto-increment id
(which implies accepting only insert operations). Sorting and deduplicating within a batch is of uncertain benefit for the job as a whole, unless
every batch is strictly partitioned (for example by date); filtering, however, is clearly beneficial.

Michael Ran wrote on Fri, Apr 1, 2022 at 11:00:

> A custom SQL result set was proposed before, but the community rejected the idea. Functionally, though, we did implement a custom SQL result set ourselves and use it for joins
> and similar operations; it has some advantages for large data sets and for scenarios such as sorting and removing duplicates.
> On 2022-04-01 10:12:55, "Lincoln Lee" wrote:
> >@Peihui  The community's current jdbc table source implements these interfaces:
> >ScanTableSource,
> >LookupTableSource,
> >SupportsProjectionPushDown,
> >SupportsLimitPushDown
> >
> >The lookup table source is used for kv lookup queries against dimension tables, while the scan table source supports
> >projection and limit pushdown. If you need other pushdowns, you can try extending the connector yourself, for example implementing
> >filter/aggregate pushdown to satisfy pre-filtering needs.
> >
> >
> >Best,
> >Lincoln Lee
> >
> >
r pp wrote on Thu, Mar 31, 2022 at 18:40:
> >
> >> Hi, your question is not very clear to me. You say the data volume is large, but over what period: a day, or a second? And in what way does the source fall short?
> >>
>


-- 
Best,
  pp


Re: flink jdbc source oom

2022-03-31 Thread Guo Thompson
Don't the conditions get pushed down to the database?

Peihui He wrote on Thu, Mar 31, 2022 at 10:33:

> Hi, all
>
> Could anyone advise: when using flink jdbc to read data from TiDB, is there a way to do some filtering at the database level based on conditions in the query?
> When the data volume is very large, say tens of millions to hundreds of millions of rows, the flink jdbc source becomes quite powerless.
>
>
> Best Regards!
>
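
For reference, a minimal DataStream-API sketch of doing the filtering yourself by embedding the predicate in the query handed to JdbcInputFormat, so the database only ships matching rows. This is a pragmatic workaround rather than connector-level filter pushdown; the driver, URL, table, columns, and predicate are placeholders.

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;

public class FilteredJdbcRead {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The WHERE clause is evaluated by TiDB/MySQL, so only matching rows
        // are shipped to Flink instead of the full table.
        JdbcInputFormat format = JdbcInputFormat.buildJdbcInputFormat()
                .setDrivername("com.mysql.cj.jdbc.Driver")                        // placeholder driver
                .setDBUrl("jdbc:mysql://tidb-host:4000/mydb")                     // placeholder URL
                .setUsername("user")
                .setPassword("secret")
                .setQuery("SELECT id, name FROM big_table WHERE created_at >= '2022-03-01'") // placeholder query
                .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.LONG_TYPE_INFO,
                                                BasicTypeInfo.STRING_TYPE_INFO))
                .setFetchSize(1000) // stream rows in chunks instead of loading everything at once
                .finish();

        DataStream<Row> rows = env.createInput(format);
        rows.print();
        env.execute("filtered jdbc read");
    }
}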


Re: flink jdbc source oom

2022-03-31 Thread r pp
Hi, your question is not very clear to me. You say the data volume is large, but over what period: a day, or a second? And in what way does the source fall short?


Re: Flink JDBC connector behavior

2022-02-08 Thread Roman Khachatryan
Hi,

scan.partition.num (the number of partitions [1]) translates into
parallel queries to the database (with different to/from). Batch size
is further calculated from lower and upper bounds and the number of
partitions.

scan.fetch-size hints the JDBC driver to adjust the fetch size (see [2]).

The first thing I'd check is that you actually have the configured
number of parallel queries and then probably check if there is an
index on the partition column.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/jdbc/#partitioned-scan
[2]
https://docs.oracle.com/javase/7/docs/api/java/sql/Statement.html#setFetchSize(int)
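
To make the partitioned-scan behavior concrete, a small illustrative sketch of how the parallel range queries relate to the configured bounds and partition count (Flink's actual rounding may differ slightly; the numbers are placeholders):

public class PartitionedScanIllustration {
    public static void main(String[] args) {
        // Values corresponding to the partition lower bound, upper bound, and count.
        long lowerBound = 1L;
        long upperBound = 100_000_000L;
        int numPartitions = 1_000;

        // Each partition covers roughly (upper - lower + 1) / numPartitions values
        // of the partition column and becomes one parallel range query.
        long stride = (upperBound - lowerBound + 1) / numPartitions;
        for (int i = 0; i < numPartitions; i++) {
            long from = lowerBound + (long) i * stride;
            long to = (i == numPartitions - 1) ? upperBound : from + stride - 1;
            // Effectively: SELECT ... FROM table WHERE partition_column BETWEEN from AND to
            // The fetch size (scan.fetch-size / setFetchSize) only hints how many of those
            // rows the JDBC driver pulls per network round trip; it is not a row limit.
            System.out.printf("partition %d: BETWEEN %d AND %d%n", i, from, to);
        }
    }
}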

Regards,
Roman

On Tue, Feb 8, 2022 at 10:09 PM Qihua Yang  wrote:
>
> Hi,
>
> We are using the flink jdbc connector to read a whole database table row by row.
> A few things I don't quite understand.
> We configured BATCH_SIZE=100 and PARTITION_NUM=1000, and the table is pretty big.
> What is Flink's internal behavior when reading data from the table?
> Does Flink read BATCH_SIZE rows each time? Or (tableSize/PARTITION_NUM)
> rows each time? Or does it read the whole table into memory?
> Database metrics show the SQL latency is extremely high, almost 20s.
> Is there any way to optimize it?
>
> val query = String.format("SELECT * FROM %s", tableName)
>
> val options = JdbcOptions.builder()
> .setDBUrl(url)
> .setTableName(tableName)
> .setDriverName(DRIVER_NAME)
> .setUsername(userName)
> .setPassword(password)
> .build()
> val readOptions = JdbcReadOptions.builder()
> .setQuery(query)
> .setPartitionColumnName(PARTITION_KEY)
> .setPartitionLowerBound(esSinkConf.dbLowerBound)
> .setPartitionUpperBound(esSinkConf.dbUpperBound)
> .setNumPartitions(PARTITION_NUM)
> .setFetchSize(BATCH_SIZE)
> .build()
> val lookupOptions = JdbcLookupOptions.builder()
> .setCacheMaxSize(-1)
> .setCacheExpireMs(CACHE_SIZE)
> .setMaxRetryTimes(2)
> .build()
> val rawSource = JdbcTableSource.builder()
> .setOptions(options)
> .setReadOptions(readOptions)
> .setLookupOptions(lookupOptions)
> .setSchema(schema)
> .build().getDataStream(env)
>
>


Re: Flink jdbc Connector special type problem

2021-11-16 Thread Ada Luna
This is definitely not a bug. Flink SQL has a finite set of types, and that finite set cannot cover all the types of every JDBC data source.

Shengkai Fang wrote on Tue, Nov 16, 2021 at 12:38 PM:
>
> If it is a bug, I suggest opening an issue in the community to track it.
>
> Shengkai Fang wrote on Tue, Nov 16, 2021 at 12:37 PM:
>
> > Could you share what the exact error is?
> >
> > I looked at the code, and it does not seem easy to support. The concrete serializer is determined by
> > `AbstractJdbcRowConverter`#createExternalConverter.
> > Based on your description, the current serializer is not generic enough, which causes this problem; supporting it will probably require changes to the source code.
> >
> > Best,
> > Shengkai
> >
> > Ada Luna wrote on Fri, Nov 12, 2021 at 11:25 AM:
> >
> >> Oracle has both VARCHAR and CLOB.
> >> If I configure STRING in the Flink SQL JDBC Sink, I can only write VARCHAR; writing CLOB throws an error.
> >> Is there a way to extend the Flink SQL DDL types? Should I use the RAW type, or is there a better approach?
> >> In Oracle, VARCHAR and CLOB are two different kinds of string, and when the sink writes out I need to call different conversion methods depending on the DDL type.
> >>
> >> Ada Luna wrote on Fri, Nov 12, 2021 at 11:23 AM:
> >> >
> >> > Oracle has both VARCHAR and CLOB.
> >> > If I configure STRING in the Flink SQL JDBC Sink, I can only write VARCHAR; writing CLOB throws an error.
> >>
> >


Re: Flink jdbc Connector special type problem

2021-11-15 Thread Shengkai Fang
If it is a bug, I suggest opening an issue in the community to track it.

Shengkai Fang wrote on Tue, Nov 16, 2021 at 12:37 PM:

> Could you share what the exact error is?
>
> I looked at the code, and it does not seem easy to support. The concrete serializer is determined by
> `AbstractJdbcRowConverter`#createExternalConverter.
> Based on your description, the current serializer is not generic enough, which causes this problem; supporting it will probably require changes to the source code.
>
> Best,
> Shengkai
>
> Ada Luna wrote on Fri, Nov 12, 2021 at 11:25 AM:
>
>> Oracle has both VARCHAR and CLOB.
>> If I configure STRING in the Flink SQL JDBC Sink, I can only write VARCHAR; writing CLOB throws an error.
>> Is there a way to extend the Flink SQL DDL types? Should I use the RAW type, or is there a better approach?
>> In Oracle, VARCHAR and CLOB are two different kinds of string, and when the sink writes out I need to call different conversion methods depending on the DDL type.
>>
>> Ada Luna wrote on Fri, Nov 12, 2021 at 11:23 AM:
>> >
>> > Oracle has both VARCHAR and CLOB.
>> > If I configure STRING in the Flink SQL JDBC Sink, I can only write VARCHAR; writing CLOB throws an error.
>>
>


Re: Flink jdbc Connector special type problem

2021-11-15 Thread Shengkai Fang
Could you share what the exact error is?

I looked at the code, and it does not seem easy to support. The concrete serializer is determined by `AbstractJdbcRowConverter`#createExternalConverter.
Based on your description, the current serializer is not generic enough, which causes this problem; supporting it will probably require changes to the source code.
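
To illustrate why a single STRING converter falls short here, a plain-JDBC sketch of the two write paths an Oracle-aware converter (for example, a customized `AbstractJdbcRowConverter`) would need to choose between; the connection URL, table, and column names are placeholders.

import java.io.StringReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class ClobVsVarcharWrite {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:oracle:thin:@//db-host:1521/ORCL", "user", "secret")) { // placeholder URL

            // VARCHAR2 column: the generic STRING path (setString) works fine.
            try (PreparedStatement ps =
                     conn.prepareStatement("INSERT INTO t_demo (varchar_col) VALUES (?)")) {
                ps.setString(1, "short text");
                ps.executeUpdate();
            }

            // CLOB column: a type-aware converter would use a character stream
            // (or setClob) instead of setString for large values.
            String largeText = "...very large text...";
            try (PreparedStatement ps =
                     conn.prepareStatement("INSERT INTO t_demo (clob_col) VALUES (?)")) {
                ps.setCharacterStream(1, new StringReader(largeText), largeText.length());
                ps.executeUpdate();
            }
        }
    }
}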

Best,
Shengkai

Ada Luna wrote on Fri, Nov 12, 2021 at 11:25 AM:

> Oracle has both VARCHAR and CLOB.
> If I configure STRING in the Flink SQL JDBC Sink, I can only write VARCHAR; writing CLOB throws an error.
> Is there a way to extend the Flink SQL DDL types? Should I use the RAW type, or is there a better approach?
> In Oracle, VARCHAR and CLOB are two different kinds of string, and when the sink writes out I need to call different conversion methods depending on the DDL type.
>
> Ada Luna wrote on Fri, Nov 12, 2021 at 11:23 AM:
> >
> > Oracle has both VARCHAR and CLOB.
> > If I configure STRING in the Flink SQL JDBC Sink, I can only write VARCHAR; writing CLOB throws an error.
>


Re: Flink jdbc Connector special type problem

2021-11-11 Thread Ada Luna
Oracle has both VARCHAR and CLOB.
If I configure STRING in the Flink SQL JDBC Sink, I can only write VARCHAR; writing CLOB throws an error.
Is there a way to extend the Flink SQL DDL types? Should I use the RAW type, or is there a better approach?
In Oracle, VARCHAR and CLOB are two different kinds of string, and when the sink writes out I need to call different conversion methods depending on the DDL type.

Ada Luna wrote on Fri, Nov 12, 2021 at 11:23 AM:
>
> Oracle has both VARCHAR and CLOB.
> If I configure STRING in the Flink SQL JDBC Sink, I can only write VARCHAR; writing CLOB throws an error.


Re: Flink JDBC connect with secret

2021-10-26 Thread Qihua Yang
Hi Jing,

Thank you for your suggestion. I will check whether SSL parameters in the URL work.

Thanks,
Qihua


On Sat, Oct 23, 2021 at 8:37 PM JING ZHANG  wrote:

> Hi Qihua,
> I checked user documents of several database vendors(postgres, oracle,
> solidDB,SQL server)[1][2][3][4][5], and studied how to use JDBC Driver with
> SSL to connect to these databases.
> Most database vendors support two ways:
> 1. Option1: Use Connection url
> 2. Option2:  Define in Properties when call `DriverManager.getConnection`
>
> Url is exposed to users in JDBC SQL connector currently, while properties
> parameters are not exposed yet.
> Would you please check whether defining SSL parameters in the url could work
> first? If not, we will look for another solution.
>
> [1] https://jdbc.postgresql.org/documentation/head/connect.html
> [2]
> https://www.oracle.com/technetwork/topics/wp-oracle-jdbc-thin-ssl-130128.pdf
> [3]
> https://support.unicomsi.com/manuals/soliddb/100/index.html#page/Administrator_Guide/6_Managing_network.07.13.html
> [4]
> https://docs.microsoft.com/en-us/sql/connect/jdbc/connecting-with-ssl-encryption?view=sql-server-ver15
> [5]
> https://www.ibm.com/docs/ar/informix-servers/14.10?topic=options-connecting-jdbc-applications-ssl
>
> Best,
> JING ZHANG
>
>
Qihua Yang wrote on Sat, Oct 23, 2021 at 1:11 PM:
>
>> Hi,
>>
>> We plan to use the JDBC SQL connector to read/write a database. I saw the JDBC
>> connector uses a username and password. Is it possible to use a secret (*.crt) to
>> access the database? I didn't find a guideline on how to use it. How do I configure jdbc
>> with a secret?
>>
>> val jdbc: JdbcConnectionOptions = 
>> JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
>> .withUrl(url)
>> .withDriverName("org.postgresql.Driver")
>> .withUsername(userName)
>> .withPassword(password)
>> .build()
>>
>> Thanks,
>> Qihua
>>
>


Re: Flink JDBC connect with secret

2021-10-23 Thread JING ZHANG
Hi Qihua,
I checked user documents of several database vendors(postgres, oracle,
solidDB,SQL server)[1][2][3][4][5], and studied how to use JDBC Driver with
SSL to connect to these databases.
Most database vendors support two ways:
1. Option1: Use Connection url
2. Option2:  Define in Properties when call `DriverManager.getConnection`

Url is exposed to users in JDBC SQL connector currently, while properties
parameters are not exposed yet.
Would you please check whether defining SSL parameters in the url could work
first? If not, we will look for another solution.

[1] https://jdbc.postgresql.org/documentation/head/connect.html
[2]
https://www.oracle.com/technetwork/topics/wp-oracle-jdbc-thin-ssl-130128.pdf
[3]
https://support.unicomsi.com/manuals/soliddb/100/index.html#page/Administrator_Guide/6_Managing_network.07.13.html
[4]
https://docs.microsoft.com/en-us/sql/connect/jdbc/connecting-with-ssl-encryption?view=sql-server-ver15
[5]
https://www.ibm.com/docs/ar/informix-servers/14.10?topic=options-connecting-jdbc-applications-ssl
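
As an example of Option 1, a sketch of passing PostgreSQL SSL settings through the JDBC URL used by the connector; the parameter names come from the PostgreSQL JDBC driver documentation [1], and the certificate path is a placeholder.

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;

public class SslUrlExample {
    public static void main(String[] args) {
        // PostgreSQL driver SSL options appended to the URL: sslmode, sslrootcert,
        // and optionally sslcert/sslkey for client certificates [1].
        String url = "jdbc:postgresql://db-host:5432/mydb"
                + "?sslmode=verify-full"
                + "&sslrootcert=/path/to/root.crt";   // placeholder path

        JdbcConnectionOptions options =
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(url)
                        .withDriverName("org.postgresql.Driver")
                        .withUsername("user")
                        .withPassword("secret")
                        .build();

        // 'options' is then passed to the sink/source as usual, e.g. JdbcSink.sink(...).
    }
}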

Best,
JING ZHANG


Qihua Yang wrote on Sat, Oct 23, 2021 at 1:11 PM:

> Hi,
>
> We plan to use the JDBC SQL connector to read/write a database. I saw the JDBC
> connector uses a username and password. Is it possible to use a secret (*.crt) to
> access the database? I didn't find a guideline on how to use it. How do I configure jdbc
> with a secret?
>
> val jdbc: JdbcConnectionOptions = 
> JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
> .withUrl(url)
> .withDriverName("org.postgresql.Driver")
> .withUsername(userName)
> .withPassword(password)
> .build()
>
> Thanks,
> Qihua
>


Re: flink jdbc connector: how to ensure all buffered data is flushed to the database at checkpoint time

2020-12-09 Thread jie mei
Hi, Leonard


OK, I will open a PR to fix this issue.


Leonard Xu wrote on Thu, Dec 10, 2020 at 12:10 PM:

> Your analysis is correct; this is a bug. SinkFunctionProvider should be used here,
> wrapping with GenericJdbcSinkFunction, instead of OutputFormatProvider, because
> OutputFormatSinkFunction does not implement CheckpointedFunction, so there is no guarantee that the buffered data is flushed to the database at checkpoint time.
> In other words, OutputFormat does not participate in checkpointing, so not even at-least-once is guaranteed.
>
> The fix should be quite simple. @jie mei, are you interested in helping to fix it?
>
> Best,
> Leonard
>
>
>
> On Dec 10, 2020, at 11:22, jie mei wrote:
>
> Hi,Jark
>
> OK, I will create an issue for this.
>
> Jark Wu wrote on Thu, Dec 10, 2020 at 11:17 AM:
>
>> Hi Jie,
>>
>> That does indeed look like a problem.
>> The JdbcBatchingOutputFormat returned by the sink is not wrapped in a GenericJdbcSinkFunction.
>> Could you help create an issue?
>>
>> Best,
>> Jark
>>
>> On Thu, 10 Dec 2020 at 02:05, hailongwang <18868816...@163.com> wrote:
>>
>> > Hi,
>> >Yes, I think you are right.
>> >   `JdbcOutputFormat` is wrapped in `OutputFormatSinkFunction`, and
>> > `OutputFormatSinkFunction` does not implement `CheckpointedFunction`, so format.flush cannot be called in
>> snapshotState.
>> >WDYT @Jark @ Leonard
>> >
>> > Best,
>> > Hailong
>> >
>> >
>> > On 2020-12-09 17:13:14, "jie mei" wrote:
>> > >Hi, Community
>> > >
>> > >The JDBC connector does not seem to guarantee that all buffered data is written to the database at checkpoint time. This is because
>> > >OutputFormat has no interface that can be invoked at checkpoint time. Judging from the JDBC connector's
>> > >code, one can only set a timeout for flushing the data, so data may still be lost in some cases.
>> > >
>> > >My question is: is there a way to force the buffered data to be flushed to the database?
>> > >
>> > >
>> > >@Public
>> > >public interface OutputFormat extends Serializable {
>> > >
>> > >   /**
>> > >* Configures this output format. Since output formats are
>> > >instantiated generically and hence parameterless,
>> > >* this method is the place where the output formats set their
>> > >basic fields based on configuration values.
>> > >* 
>> > >* This method is always called first on a newly instantiated
>> output format.
>> > >*
>> > >* @param parameters The configuration with all parameters.
>> > >*/
>> > >   void configure(Configuration parameters);
>> > >
>> > >   /**
>> > >* Opens a parallel instance of the output format to store the
>> > >result of its parallel instance.
>> > >* 
>> > >* When this method is called, the output format it guaranteed to
>> > >be configured.
>> > >*
>> > >* @param taskNumber The number of the parallel instance.
>> > >* @param numTasks The number of parallel tasks.
>> > >* @throws IOException Thrown, if the output could not be opened
>> > >due to an I/O problem.
>> > >*/
>> > >   void open(int taskNumber, int numTasks) throws IOException;
>> > >
>> > >
>> > >   /**
>> > >* Adds a record to the output.
>> > >* 
>> > >* When this method is called, the output format it guaranteed to
>> be opened.
>> > >*
>> > >* @param record The records to add to the output.
>> > >* @throws IOException Thrown, if the records could not be added to
>> > >to an I/O problem.
>> > >*/
>> > >   void writeRecord(IT record) throws IOException;
>> > >
>> > >   /**
>> > >* Method that marks the end of the life-cycle of parallel output
>> > >instance. Should be used to close
>> > >* channels and streams and release resources.
>> > >* After this method returns without an error, the output is
>> > >assumed to be correct.
>> > >* 
>> > >* When this method is called, the output format it guaranteed to
>> be opened.
>> > >*
>> > >* @throws IOException Thrown, if the input could not be closed
>> properly.
>> > >*/
>> > >   void close() throws IOException;
>> > >}
>> > >
>> > >
>> > >--
>> > >
>> > >*Best Regards*
>> > >*Jeremy Mei*
>> >
>> >
>> >
>> >
>> >
>> >
>>
>
>
> --
>
> *Best Regards*
> *Jeremy Mei*
>
>
>

-- 

*Best Regards*
*Jeremy Mei*


Re: flink jdbc connector: how to ensure all buffered data is flushed to the database at checkpoint time

2020-12-09 Thread Leonard Xu
Your analysis is correct; this is a bug. SinkFunctionProvider should be used here,
wrapping with GenericJdbcSinkFunction, instead of OutputFormatProvider, because OutputFormatSinkFunction does not implement CheckpointedFunction,
so there is no guarantee that the buffered data is flushed to the database at checkpoint time.
In other words, OutputFormat does not participate in checkpointing, so not even at-least-once is guaranteed.

The fix should be quite simple. @jie mei, are you interested in helping to fix it?

Best,
Leonard
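
For readers following the thread, a simplified sketch of the idea behind the fix: wrap the buffering output format in a SinkFunction that implements CheckpointedFunction so the buffer is flushed on every checkpoint. This is only an illustration of what GenericJdbcSinkFunction does, not the actual Flink source; the FlushableOutputFormat interface below is a stand-in for a format that exposes flush(), such as JdbcBatchingOutputFormat.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.io.IOException;
import java.io.Serializable;

/** Stand-in for an output format that buffers records and can flush them on demand. */
interface FlushableOutputFormat<T> extends Serializable {
    void open(int taskNumber, int numTasks) throws IOException;
    void writeRecord(T record) throws IOException;
    void flush() throws IOException;
    void close() throws IOException;
}

/** Sketch of a checkpoint-aware wrapper, in the spirit of GenericJdbcSinkFunction. */
class CheckpointedFormatSink<T> extends RichSinkFunction<T> implements CheckpointedFunction {

    private final FlushableOutputFormat<T> format;

    CheckpointedFormatSink(FlushableOutputFormat<T> format) {
        this.format = format;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        format.open(getRuntimeContext().getIndexOfThisSubtask(),
                    getRuntimeContext().getNumberOfParallelSubtasks());
    }

    @Override
    public void invoke(T value, Context context) throws Exception {
        format.writeRecord(value); // may only buffer the record
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // The crucial part: flush the buffer as part of every checkpoint,
        // which OutputFormatSinkFunction never does.
        format.flush();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        // No managed state needed for this sketch.
    }

    @Override
    public void close() throws Exception {
        format.flush();
        format.close();
    }
}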



> On Dec 10, 2020, at 11:22, jie mei wrote:
> 
> Hi,Jark
> 
> OK, I will create an issue for this.
> 
> Jark Wu <imj...@gmail.com> wrote on Thu, Dec 10, 2020 at 11:17 AM:
> Hi Jie,
> 
> That does indeed look like a problem.
> The JdbcBatchingOutputFormat returned by the sink is not wrapped in a GenericJdbcSinkFunction.
> Could you help create an issue?
> 
> Best,
> Jark
> 
> On Thu, 10 Dec 2020 at 02:05, hailongwang <18868816...@163.com> wrote:
> 
> > Hi,
> >Yes, I think you are right.
> >   `JdbcOutputFormat` is wrapped in `OutputFormatSinkFunction`, and
> > `OutputFormatSinkFunction` does not implement `CheckpointedFunction`, so format.flush cannot be called in snapshotState.
> >WDYT @Jark @ Leonard
> >
> > Best,
> > Hailong
> >
> >
> > On 2020-12-09 17:13:14, "jie mei" wrote:
> > >Hi, Community
> > >
> > >The JDBC connector does not seem to guarantee that all buffered data is written to the database at checkpoint time. This is because
> > >OutputFormat has no interface that can be invoked at checkpoint time. Judging from the JDBC connector's
> > >code, one can only set a timeout for flushing the data, so data may still be lost in some cases.
> > >
> > >My question is: is there a way to force the buffered data to be flushed to the database?
> > >
> > >
> > >@Public
> > >public interface OutputFormat extends Serializable {
> > >
> > >   /**
> > >* Configures this output format. Since output formats are
> > >instantiated generically and hence parameterless,
> > >* this method is the place where the output formats set their
> > >basic fields based on configuration values.
> > >* 
> > >* This method is always called first on a newly instantiated output 
> > > format.
> > >*
> > >* @param parameters The configuration with all parameters.
> > >*/
> > >   void configure(Configuration parameters);
> > >
> > >   /**
> > >* Opens a parallel instance of the output format to store the
> > >result of its parallel instance.
> > >* 
> > >* When this method is called, the output format it guaranteed to
> > >be configured.
> > >*
> > >* @param taskNumber The number of the parallel instance.
> > >* @param numTasks The number of parallel tasks.
> > >* @throws IOException Thrown, if the output could not be opened
> > >due to an I/O problem.
> > >*/
> > >   void open(int taskNumber, int numTasks) throws IOException;
> > >
> > >
> > >   /**
> > >* Adds a record to the output.
> > >* 
> > >* When this method is called, the output format it guaranteed to be 
> > > opened.
> > >*
> > >* @param record The records to add to the output.
> > >* @throws IOException Thrown, if the records could not be added to
> > >to an I/O problem.
> > >*/
> > >   void writeRecord(IT record) throws IOException;
> > >
> > >   /**
> > >* Method that marks the end of the life-cycle of parallel output
> > >instance. Should be used to close
> > >* channels and streams and release resources.
> > >* After this method returns without an error, the output is
> > >assumed to be correct.
> > >* 
> > >* When this method is called, the output format it guaranteed to be 
> > > opened.
> > >*
> > >* @throws IOException Thrown, if the input could not be closed 
> > > properly.
> > >*/
> > >   void close() throws IOException;
> > >}
> > >
> > >
> > >--
> > >
> > >*Best Regards*
> > >*Jeremy Mei*
> >
> >
> >
> >
> >
> >
> 
> 
> -- 
> 
> Best Regards
> Jeremy Mei



Re: flink jdbc connector: how to ensure all buffered data is flushed to the database at checkpoint time

2020-12-09 Thread jie mei
Hi,Jark

OK, I will create an issue for this.

Jark Wu wrote on Thu, Dec 10, 2020 at 11:17 AM:

> Hi Jie,
>
> That does indeed look like a problem.
> The JdbcBatchingOutputFormat returned by the sink is not wrapped in a GenericJdbcSinkFunction.
> Could you help create an issue?
>
> Best,
> Jark
>
> On Thu, 10 Dec 2020 at 02:05, hailongwang <18868816...@163.com> wrote:
>
> > Hi,
> >Yes, I think you are right.
> >   `JdbcOutputFormat` is wrapped in `OutputFormatSinkFunction`, and
> > `OutputFormatSinkFunction` does not implement `CheckpointedFunction`, so format.flush cannot be called in
> snapshotState.
> >WDYT @Jark @ Leonard
> >
> > Best,
> > Hailong
> >
> >
> > On 2020-12-09 17:13:14, "jie mei" wrote:
> > >Hi, Community
> > >
> > >The JDBC connector does not seem to guarantee that all buffered data is written to the database at checkpoint time. This is because
> > >OutputFormat has no interface that can be invoked at checkpoint time. Judging from the JDBC connector's
> > >code, one can only set a timeout for flushing the data, so data may still be lost in some cases.
> > >
> > >My question is: is there a way to force the buffered data to be flushed to the database?
> > >
> > >
> > >@Public
> > >public interface OutputFormat extends Serializable {
> > >
> > >   /**
> > >* Configures this output format. Since output formats are
> > >instantiated generically and hence parameterless,
> > >* this method is the place where the output formats set their
> > >basic fields based on configuration values.
> > >* 
> > >* This method is always called first on a newly instantiated output
> format.
> > >*
> > >* @param parameters The configuration with all parameters.
> > >*/
> > >   void configure(Configuration parameters);
> > >
> > >   /**
> > >* Opens a parallel instance of the output format to store the
> > >result of its parallel instance.
> > >* 
> > >* When this method is called, the output format it guaranteed to
> > >be configured.
> > >*
> > >* @param taskNumber The number of the parallel instance.
> > >* @param numTasks The number of parallel tasks.
> > >* @throws IOException Thrown, if the output could not be opened
> > >due to an I/O problem.
> > >*/
> > >   void open(int taskNumber, int numTasks) throws IOException;
> > >
> > >
> > >   /**
> > >* Adds a record to the output.
> > >* 
> > >* When this method is called, the output format it guaranteed to be
> opened.
> > >*
> > >* @param record The records to add to the output.
> > >* @throws IOException Thrown, if the records could not be added to
> > >to an I/O problem.
> > >*/
> > >   void writeRecord(IT record) throws IOException;
> > >
> > >   /**
> > >* Method that marks the end of the life-cycle of parallel output
> > >instance. Should be used to close
> > >* channels and streams and release resources.
> > >* After this method returns without an error, the output is
> > >assumed to be correct.
> > >* 
> > >* When this method is called, the output format it guaranteed to be
> opened.
> > >*
> > >* @throws IOException Thrown, if the input could not be closed
> properly.
> > >*/
> > >   void close() throws IOException;
> > >}
> > >
> > >
> > >--
> > >
> > >*Best Regards*
> > >*Jeremy Mei*
> >
> >
> >
> >
> >
> >
>


-- 

*Best Regards*
*Jeremy Mei*


Re: flink jdbc connector: how to ensure all buffered data is flushed to the database at checkpoint time

2020-12-09 Thread Jark Wu
Hi Jie,

That does indeed look like a problem.
The JdbcBatchingOutputFormat returned by the sink is not wrapped in a GenericJdbcSinkFunction.
Could you help create an issue?

Best,
Jark

On Thu, 10 Dec 2020 at 02:05, hailongwang <18868816...@163.com> wrote:

> Hi,
>Yes, I think you are right.
>   `JdbcOutputFormat` is wrapped in `OutputFormatSinkFunction`, and
> `OutputFormatSinkFunction` does not implement `CheckpointedFunction`, so format.flush cannot be called in snapshotState.
>WDYT @Jark @ Leonard
>
> Best,
> Hailong
>
>
> On 2020-12-09 17:13:14, "jie mei" wrote:
> >Hi, Community
> >
> >The JDBC connector does not seem to guarantee that all buffered data is written to the database at checkpoint time. This is because
> >OutputFormat has no interface that can be invoked at checkpoint time. Judging from the JDBC connector's
> >code, one can only set a timeout for flushing the data, so data may still be lost in some cases.
> >
> >My question is: is there a way to force the buffered data to be flushed to the database?
> >
> >
> >@Public
> >public interface OutputFormat extends Serializable {
> >
> >   /**
> >* Configures this output format. Since output formats are
> >instantiated generically and hence parameterless,
> >* this method is the place where the output formats set their
> >basic fields based on configuration values.
> >* 
> >* This method is always called first on a newly instantiated output 
> > format.
> >*
> >* @param parameters The configuration with all parameters.
> >*/
> >   void configure(Configuration parameters);
> >
> >   /**
> >* Opens a parallel instance of the output format to store the
> >result of its parallel instance.
> >* 
> >* When this method is called, the output format it guaranteed to
> >be configured.
> >*
> >* @param taskNumber The number of the parallel instance.
> >* @param numTasks The number of parallel tasks.
> >* @throws IOException Thrown, if the output could not be opened
> >due to an I/O problem.
> >*/
> >   void open(int taskNumber, int numTasks) throws IOException;
> >
> >
> >   /**
> >* Adds a record to the output.
> >* 
> >* When this method is called, the output format it guaranteed to be 
> > opened.
> >*
> >* @param record The records to add to the output.
> >* @throws IOException Thrown, if the records could not be added to
> >to an I/O problem.
> >*/
> >   void writeRecord(IT record) throws IOException;
> >
> >   /**
> >* Method that marks the end of the life-cycle of parallel output
> >instance. Should be used to close
> >* channels and streams and release resources.
> >* After this method returns without an error, the output is
> >assumed to be correct.
> >* 
> >* When this method is called, the output format it guaranteed to be 
> > opened.
> >*
> >* @throws IOException Thrown, if the input could not be closed properly.
> >*/
> >   void close() throws IOException;
> >}
> >
> >
> >--
> >
> >*Best Regards*
> >*Jeremy Mei*
>
>
>
>
>
>


Re: Flink-JDBC JDBCUpsertTableSink keyFields Problem

2019-11-14 Thread Jingsong Li
Hi Polarisary:

Maybe I see what you mean. You want to use the upsert mode for an append
stream without keyFields.
In fact, both isAppend and keyFields are set automatically by the planner
framework. You can't control them.
So yes, it is related to sql, only upsert stream can be inserted into sink
with upsert mode.

Now whether the JDBCUpsertTableSink is an upsert mode is only controlled by
keyFields. So if you really want to support this scenario, you need to make
a small change to the JDBCUpsertTableSink:

@Override
public void setKeyFields(String[] keys) {
    if (keys != null) {
        this.keyFields = keys;
    }
}

And you need to set your keyFields on the JDBCUpsertTableSink.
You can give it a try.

Best,
Jingsong Lee

On Thu, Nov 14, 2019 at 4:44 PM 张万新  wrote:

> A typical use case that will generate updates (meaning not append-only) is
> a non-windowed group-by aggregation, like "select user, count(url) from
> clicks group by user".
>
> You can refer to the flink doc at
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/dynamic_tables.html
>
>
> Polarisary wrote on Thu, Nov 14, 2019 at 3:35 PM:
>
>> My sql is regular insert like “insert into sink_table select c1,c2,c3
>> from source_table”,
>> I want to know in which cases it will be judged append-only. Is there a doc
>> for this?
>>
>> Many thanks!
>>
>>
>>
>>
>>
>> On Nov 14, 2019, at 10:05 AM, 张万新 wrote:
>>
>> Yes it's related to your sql, flink checks the plan of your sql to judge
>> whether your job is append only or has updates. If your job is append only,
>> that means no result need to be updated.
>>
>> If you still have problems, please post your sql and complete error
>> message to help people understand your use case.
>>
>> Polarisary wrote on Wed, Nov 13, 2019 at 6:43 PM:
>>
>>> Hi
>>> When I use flink-jdbc JDBCUpsertTableSink for sink to mysql,
>>> the isAppendOnly is modified to true, and keyFields is modified to null by
>>> StreamExecSink, but I want to upsert.
>>> Is this related to the sql?
>>>
>>> the stack as follows:
>>> at sink.JDBCUpsertTableSink.setIsAppendOnly(JDBCUpsertTableSink.java:105)
>>> at
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:98)
>>> at
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
>>> at
>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>>> at
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
>>> at
>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
>>> at
>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>>> at
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>> at
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>> at
>>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60)
>>> at
>>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
>>> at
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
>>> at
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:348)
>>> at task.Device.main(Device.java:77)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>>>
>>> Hope to reply!
>>> many thanks
>>>
>>>
>>

-- 
Best, Jingsong Lee


Re: Flink-JDBC JDBCUpsertTableSink keyFields Problem

2019-11-14 Thread 张万新
A typical use case that will generate updates (meaning not append-only) is
a non-windowed group-by aggregation, like "select user, count(url) from
clicks group by user".

You can refer to the flink doc at
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/dynamic_tables.html


Polarisary wrote on Thu, Nov 14, 2019 at 3:35 PM:

> My sql is regular insert like “insert into sink_table select c1,c2,c3 from
> source_table”,
> I want to know in which cases it will be judged append-only. Is there a doc
> for this?
>
> Many thanks!
>
>
>
>
>
> On Nov 14, 2019, at 10:05 AM, 张万新 wrote:
>
> Yes it's related to your sql, flink checks the plan of your sql to judge
> whether your job is append only or has updates. If your job is append only,
> that means no result need to be updated.
>
> If you still have problems, please post your sql and complete error
> message to help people understand your use case.
>
> Polarisary wrote on Wed, Nov 13, 2019 at 6:43 PM:
>
>> Hi
>> When I use flink-jdbc JDBCUpsertTableSink for sink to mysql,
>> the isAppendOnly is modified to true, and keyFields is modified to null by
>> StreamExecSink, but I want to upsert.
>> Is this related to the sql?
>>
>> the stack as follows:
>> at sink.JDBCUpsertTableSink.setIsAppendOnly(JDBCUpsertTableSink.java:105)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:98)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
>> at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
>> at
>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
>> at
>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>> at
>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60)
>> at
>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:348)
>> at task.Device.main(Device.java:77)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>>
>> Hope to reply!
>> many thanks
>>
>>
>


Re: Flink-JDBC JDBCUpsertTableSink keyFields Problem

2019-11-13 Thread Polarisary
My sql is regular insert like “insert into sink_table select c1,c2,c3 from 
source_table”, 
I want to know in which cases it will be judged append-only. Is there a doc for
this?

Many thanks!





> On Nov 14, 2019, at 10:05 AM, 张万新 wrote:
> 
> Yes it's related to your sql, flink checks the plan of your sql to judge 
> whether your job is append only or has updates. If your job is append only, 
> that means no result need to be updated.
> 
> If you still have problems, please post your sql and complete error message 
> to help people understand your use case.
> 
> Polarisary <polaris...@gmail.com> wrote on Wed, Nov 13, 2019 at 6:43 PM:
> Hi
> When I use flink-jdbc JDBCUpsertTableSink for sink to mysql, the isAppendOnly 
> is modified to true, and keyFields is modified to null by StreamExecSink, but
> I want to upsert.
> Is this related to the sql?
> 
> the stack as follows:
> at sink.JDBCUpsertTableSink.setIsAppendOnly(JDBCUpsertTableSink.java:105)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:98)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
>   at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
>   at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:348)
>   at task.Device.main(Device.java:77)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
> 
> Hope to reply!
> many thanks
> 



Re: Flink-JDBC JDBCUpsertTableSink keyFields Problem

2019-11-13 Thread 张万新
Yes, it's related to your SQL: Flink checks the plan of your SQL to judge
whether your job is append-only or has updates. If your job is append-only,
that means no result needs to be updated.

If you still have problems, please post your sql and complete error message
to help people understand your use case.
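
To make the distinction concrete, a small fragment (not a complete program) assuming a Flink 1.9-style TableEnvironment named tableEnv with the tables below registered; sink_table/source_table and the clicks aggregation come from this thread, click_counts is made up:

// Append-only: a plain projection only ever produces insertions, so the planner
// marks the sink as append-only (isAppendOnly = true, keyFields = null).
tableEnv.sqlUpdate("INSERT INTO sink_table SELECT c1, c2, c3 FROM source_table");

// Updating: a non-windowed aggregation re-emits a new count for a user whenever
// another click arrives, so the result is an update stream and the sink needs
// upsert support with key fields (here: user).
tableEnv.sqlUpdate("INSERT INTO click_counts SELECT user, COUNT(url) FROM clicks GROUP BY user");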

Polarisary wrote on Wed, Nov 13, 2019 at 6:43 PM:

> Hi
> When I use flink-jdbc JDBCUpsertTableSink for sink to mysql,
> the isAppendOnly is modified to true, and keyFields is modified to null by
> StreamExecSink, but I want to upsert.
> Is this related to the sql?
>
> the stack as follows:
> at sink.JDBCUpsertTableSink.setIsAppendOnly(JDBCUpsertTableSink.java:105)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:98)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:348)
> at task.Device.main(Device.java:77)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>
> Hope to reply!
> many thanks
>
>


Re: Flink JDBC: Disable auto-commit mode

2019-04-15 Thread Rong Rong
+1, Thanks Konstantinos for opening the ticket.
This would definitely be a useful feature.

--
Rong

On Mon, Apr 15, 2019 at 7:34 AM Fabian Hueske  wrote:

> Great, thank you!
>
> On Mon, Apr 15, 2019 at 16:28, Papadopoulos, Konstantinos <
> konstantinos.papadopou...@iriworldwide.com> wrote:
>
>> Hi Fabian,
>>
>>
>>
>> I opened the following issue to track the improvement proposed:
>>
>> https://issues.apache.org/jira/browse/FLINK-12198
>>
>>
>>
>> Best,
>>
>> Konstantinos
>>
>>
>>
>> *From:* Papadopoulos, Konstantinos
>> 
>> *Sent:* Monday, April 15, 2019 12:30 PM
>> *To:* Fabian Hueske 
>> *Cc:* Rong Rong ; user 
>> *Subject:* RE: Flink JDBC: Disable auto-commit mode
>>
>>
>>
>> Hi Fabian,
>>
>>
>>
>> Glad to hear that you agree for such an improvement. Of course, I can
>> handle it.
>>
>>
>>
>> Best,
>>
>> Konstantinos
>>
>>
>>
>> *From:* Fabian Hueske 
>> *Sent:* Monday, April 15, 2019 11:56 AM
>> *To:* Papadopoulos, Konstantinos <
>> konstantinos.papadopou...@iriworldwide.com>
>> *Cc:* Rong Rong ; user 
>> *Subject:* Re: Flink JDBC: Disable auto-commit mode
>>
>>
>>
>> Hi Konstantinos,
>>
>>
>>
>> This sounds like a useful extension to me.
>>
>> Would you like to create a Jira issue and contribute the improvement?
>>
>>
>>
>> In the meantime, you can just fork the code of JDBCInputFormat and adjust
>> it to your needs.
>>
>>
>>
>> Best, Fabian
>>
>>
>>
>> On Mon, Apr 15, 2019 at 08:53, Papadopoulos, Konstantinos <
>> konstantinos.papadopou...@iriworldwide.com> wrote:
>>
>> Hi Rong,
>>
>>
>>
>> We have already tried to set the fetch size with no success. According to
>> PG documentation we have to set both configuration parameters (i.e.,
>> auto-commit to false and limit fetch) to achieve our purpose.
>>
>>
>>
>> Thanks,
>>
>> Konstantinos
>>
>>
>>
>> *From:* Rong Rong 
>> *Sent:* Friday, April 12, 2019 6:50 PM
>> *To:* Papadopoulos, Konstantinos <
>> konstantinos.papadopou...@iriworldwide.com>
>> *Cc:* user 
>> *Subject:* Re: Flink JDBC: Disable auto-commit mode
>>
>>
>>
>> Hi Konstantinos,
>>
>>
>>
>> Seems like setting for auto commit is not directly possible in the
>> current JDBCInputFormatBuilder.
>>
>> However there's a way to specify the fetch size [1] for your DB
>> round-trip, doesn't that resolve your issue?
>>
>>
>>
>> Similarly in JDBCOutputFormat, a batching mode was also used to stash
>> upload rows before flushing to DB.
>>
>>
>>
>> --
>>
>> Rong
>>
>>
>>
>> [1]
>> https://docs.oracle.com/cd/E18283_01/java.112/e16548/resltset.htm#insertedID4
>>
>>
>>
>> On Fri, Apr 12, 2019 at 6:23 AM Papadopoulos, Konstantinos <
>> konstantinos.papadopou...@iriworldwide.com> wrote:
>>
>> Hi all,
>>
>> We are facing an issue when trying to integrate PostgreSQL with Flink
>> JDBC. When you establish a connection to the PostgreSQL database, it is in
>> auto-commit mode. It means that each SQL statement is treated as a
>> transaction and is automatically committed, but this functionality results
>> in unexpected behavior (e.g., out-of-memory errors) when executed for large
>> result sets. In order to bypass such issues, we must disable the
>> auto-commit mode. To do this, in a simple Java application, we call the
>> setAutoCommit() method of the Connection object.
>>
>> So, my question is: How can we achieve this by using JDBCInputFormat of
>> Flink?
>>
>> Thanks in advance,
>>
>> Konstantinos
>>
>>


Re: Flink JDBC: Disable auto-commit mode

2019-04-15 Thread Fabian Hueske
Great, thank you!

On Mon, Apr 15, 2019 at 16:28, Papadopoulos, Konstantinos <
konstantinos.papadopou...@iriworldwide.com> wrote:

> Hi Fabian,
>
>
>
> I opened the following issue to track the improvement proposed:
>
> https://issues.apache.org/jira/browse/FLINK-12198
>
>
>
> Best,
>
> Konstantinos
>
>
>
> *From:* Papadopoulos, Konstantinos
> 
> *Sent:* Monday, April 15, 2019 12:30 PM
> *To:* Fabian Hueske 
> *Cc:* Rong Rong ; user 
> *Subject:* RE: Flink JDBC: Disable auto-commit mode
>
>
>
> Hi Fabian,
>
>
>
> Glad to hear that you agree for such an improvement. Of course, I can
> handle it.
>
>
>
> Best,
>
> Konstantinos
>
>
>
> *From:* Fabian Hueske 
> *Sent:* Monday, April 15, 2019 11:56 AM
> *To:* Papadopoulos, Konstantinos <
> konstantinos.papadopou...@iriworldwide.com>
> *Cc:* Rong Rong ; user 
> *Subject:* Re: Flink JDBC: Disable auto-commit mode
>
>
>
> Hi Konstantinos,
>
>
>
> This sounds like a useful extension to me.
>
> Would you like to create a Jira issue and contribute the improvement?
>
>
>
> In the meantime, you can just fork the code of JDBCInputFormat and adjust
> it to your needs.
>
>
>
> Best, Fabian
>
>
>
> On Mon, Apr 15, 2019 at 08:53, Papadopoulos, Konstantinos <
> konstantinos.papadopou...@iriworldwide.com> wrote:
>
> Hi Rong,
>
>
>
> We have already tried to set the fetch size with no success. According to
> PG documentation we have to set both configuration parameters (i.e.,
> auto-commit to false and limit fetch) to achieve our purpose.
>
>
>
> Thanks,
>
> Konstantinos
>
>
>
> *From:* Rong Rong 
> *Sent:* Friday, April 12, 2019 6:50 PM
> *To:* Papadopoulos, Konstantinos <
> konstantinos.papadopou...@iriworldwide.com>
> *Cc:* user 
> *Subject:* Re: Flink JDBC: Disable auto-commit mode
>
>
>
> Hi Konstantinos,
>
>
>
> Seems like setting for auto commit is not directly possible in the current
> JDBCInputFormatBuilder.
>
> However there's a way to specify the fetch size [1] for your DB
> round-trip, doesn't that resolve your issue?
>
>
>
> Similarly in JDBCOutputFormat, a batching mode was also used to stash
> upload rows before flushing to DB.
>
>
>
> --
>
> Rong
>
>
>
> [1]
> https://docs.oracle.com/cd/E18283_01/java.112/e16548/resltset.htm#insertedID4
>
>
>
> On Fri, Apr 12, 2019 at 6:23 AM Papadopoulos, Konstantinos <
> konstantinos.papadopou...@iriworldwide.com> wrote:
>
> Hi all,
>
> We are facing an issue when trying to integrate PostgreSQL with Flink
> JDBC. When you establish a connection to the PostgreSQL database, it is in
> auto-commit mode. It means that each SQL statement is treated as a
> transaction and is automatically committed, but this functionality results
> in unexpected behavior (e.g., out-of-memory errors) when executed for large
> result sets. In order to bypass such issues, we must disable the
> auto-commit mode. To do this, in a simple Java application, we call the
> setAutoCommit() method of the Connection object.
>
> So, my question is: How can we achieve this by using JDBCInputFormat of
> Flink?
>
> Thanks in advance,
>
> Konstantinos
>
>


RE: Flink JDBC: Disable auto-commit mode

2019-04-15 Thread Papadopoulos, Konstantinos
Hi Fabian,

I opened the following issue to track the improvement proposed:
https://issues.apache.org/jira/browse/FLINK-12198

Best,
Konstantinos

From: Papadopoulos, Konstantinos 
Sent: Monday, April 15, 2019 12:30 PM
To: Fabian Hueske 
Cc: Rong Rong ; user 
Subject: RE: Flink JDBC: Disable auto-commit mode

Hi Fabian,

Glad to hear that you agree for such an improvement. Of course, I can handle it.

Best,
Konstantinos

From: Fabian Hueske <fhue...@gmail.com>
Sent: Monday, April 15, 2019 11:56 AM
To: Papadopoulos, Konstantinos <konstantinos.papadopou...@iriworldwide.com>
Cc: Rong Rong <walter...@gmail.com>; user <user@flink.apache.org>
Subject: Re: Flink JDBC: Disable auto-commit mode

Hi Konstantinos,

This sounds like a useful extension to me.
Would you like to create a Jira issue and contribute the improvement?

In the meantime, you can just fork the code of JDBCInputFormat and adjust it to 
your needs.

Best, Fabian

On Mon, Apr 15, 2019 at 08:53, Papadopoulos, Konstantinos
<konstantinos.papadopou...@iriworldwide.com> wrote:
Hi Rong,

We have already tried to set the fetch size with no success. According to PG 
documentation we have to set both configuration parameters (i.e., auto-commit 
to false and limit fetch) to achieve our purpose.

Thanks,
Konstantinos

From: Rong Rong <walter...@gmail.com>
Sent: Friday, April 12, 2019 6:50 PM
To: Papadopoulos, Konstantinos <konstantinos.papadopou...@iriworldwide.com>
Cc: user <user@flink.apache.org>
Subject: Re: Flink JDBC: Disable auto-commit mode

Hi Konstantinos,

Seems like setting for auto commit is not directly possible in the current 
JDBCInputFormatBuilder.
However there's a way to specify the fetch size [1] for your DB round-trip, 
doesn't that resolve your issue?

Similarly in JDBCOutputFormat, a batching mode was also used to stash upload 
rows before flushing to DB.

--
Rong

[1] 
https://docs.oracle.com/cd/E18283_01/java.112/e16548/resltset.htm#insertedID4

On Fri, Apr 12, 2019 at 6:23 AM Papadopoulos, Konstantinos
<konstantinos.papadopou...@iriworldwide.com> wrote:
Hi all,

We are facing an issue when trying to integrate PostgreSQL with Flink JDBC. 
When you establish a connection to the PostgreSQL database, it is in 
auto-commit mode. It means that each SQL statement is treated as a transaction 
and is automatically committed, but this functionality results in unexpected 
behavior (e.g., out-of-memory errors) when executed for large result sets. In 
order to bypass such issues, we must disable the auto-commit mode. To do this, 
in a simple Java application, we call the setAutoCommit() method of the 
Connection object.

So, my question is: How can we achieve this by using JDBCInputFormat of Flink?

Thanks in advance,

Konstantinos


RE: Flink JDBC: Disable auto-commit mode

2019-04-15 Thread Papadopoulos, Konstantinos
Hi Fabian,

Glad to hear that you agree for such an improvement. Of course, I can handle it.

Best,
Konstantinos

From: Fabian Hueske 
Sent: Monday, April 15, 2019 11:56 AM
To: Papadopoulos, Konstantinos 
Cc: Rong Rong ; user 
Subject: Re: Flink JDBC: Disable auto-commit mode

Hi Konstantinos,

This sounds like a useful extension to me.
Would you like to create a Jira issue and contribute the improvement?

In the meantime, you can just fork the code of JDBCInputFormat and adjust it to 
your needs.

Best, Fabian

On Mon, Apr 15, 2019 at 08:53, Papadopoulos, Konstantinos
<konstantinos.papadopou...@iriworldwide.com> wrote:
Hi Rong,

We have already tried to set the fetch size with no success. According to PG 
documentation we have to set both configuration parameters (i.e., auto-commit 
to false and limit fetch) to achieve our purpose.

Thanks,
Konstantinos

From: Rong Rong <walter...@gmail.com>
Sent: Friday, April 12, 2019 6:50 PM
To: Papadopoulos, Konstantinos <konstantinos.papadopou...@iriworldwide.com>
Cc: user <user@flink.apache.org>
Subject: Re: Flink JDBC: Disable auto-commit mode

Hi Konstantinos,

Seems like setting for auto commit is not directly possible in the current 
JDBCInputFormatBuilder.
However there's a way to specify the fetch size [1] for your DB round-trip, 
doesn't that resolve your issue?

Similarly in JDBCOutputFormat, a batching mode was also used to stash upload 
rows before flushing to DB.

--
Rong

[1] 
https://docs.oracle.com/cd/E18283_01/java.112/e16548/resltset.htm#insertedID4

On Fri, Apr 12, 2019 at 6:23 AM Papadopoulos, Konstantinos
<konstantinos.papadopou...@iriworldwide.com> wrote:
Hi all,

We are facing an issue when trying to integrate PostgreSQL with Flink JDBC. 
When you establish a connection to the PostgreSQL database, it is in 
auto-commit mode. It means that each SQL statement is treated as a transaction 
and is automatically committed, but this functionality results in unexpected 
behavior (e.g., out-of-memory errors) when executed for large result sets. In 
order to bypass such issues, we must disable the auto-commit mode. To do this, 
in a simple Java application, we call the setAutoCommit() method of the 
Connection object.

So, my question is: How can we achieve this by using JDBCInputFormat of Flink?

Thanks in advance,

Konstantinos


Re: Flink JDBC: Disable auto-commit mode

2019-04-15 Thread Fabian Hueske
Hi Konstantinos,

This sounds like a useful extension to me.
Would you like to create a Jira issue and contribute the improvement?

In the meantime, you can just fork the code of JDBCInputFormat and adjust
it to your needs.

Best, Fabian

On Mon, Apr 15, 2019 at 08:53, Papadopoulos, Konstantinos <
konstantinos.papadopou...@iriworldwide.com> wrote:

> Hi Rong,
>
>
>
> We have already tried to set the fetch size with no success. According to
> PG documentation we have to set both configuration parameters (i.e.,
> auto-commit to false and limit fetch) to achieve our purpose.
>
>
>
> Thanks,
>
> Konstantinos
>
>
>
> *From:* Rong Rong 
> *Sent:* Friday, April 12, 2019 6:50 PM
> *To:* Papadopoulos, Konstantinos
> 
> *Cc:* user 
> *Subject:* Re: Flink JDBC: Disable auto-commit mode
>
>
>
> Hi Konstantinos,
>
>
>
> Seems like setting for auto commit is not directly possible in the current
> JDBCInputFormatBuilder.
>
> However there's a way to specify the fetch size [1] for your DB
> round-trip, doesn't that resolve your issue?
>
>
>
> Similarly in JDBCOutputFormat, a batching mode was also used to stash
> upload rows before flushing to DB.
>
>
>
> --
>
> Rong
>
>
>
> [1]
> https://docs.oracle.com/cd/E18283_01/java.112/e16548/resltset.htm#insertedID4
>
>
>
> On Fri, Apr 12, 2019 at 6:23 AM Papadopoulos, Konstantinos <
> konstantinos.papadopou...@iriworldwide.com> wrote:
>
> Hi all,
>
> We are facing an issue when trying to integrate PostgreSQL with Flink
> JDBC. When you establish a connection to the PostgreSQL database, it is in
> auto-commit mode. It means that each SQL statement is treated as a
> transaction and is automatically committed, but this functionality results
> in unexpected behavior (e.g., out-of-memory errors) when executed for large
> result sets. In order to bypass such issues, we must disable the
> auto-commit mode. To do this, in a simple Java application, we call the
> setAutoCommit() method of the Connection object.
>
> So, my question is: How can we achieve this by using JDBCInputFormat of
> Flink?
>
> Thanks in advance,
>
> Konstantinos
>
>


RE: Flink JDBC: Disable auto-commit mode

2019-04-15 Thread Papadopoulos, Konstantinos
Hi Rong,

We have already tried to set the fetch size with no success. According to PG 
documentation we have to set both configuration parameters (i.e., auto-commit 
to false and limit fetch) to achieve our purpose.

Thanks,
Konstantinos

From: Rong Rong 
Sent: Friday, April 12, 2019 6:50 PM
To: Papadopoulos, Konstantinos 
Cc: user 
Subject: Re: Flink JDBC: Disable auto-commit mode

Hi Konstantinos,

Seems like setting for auto commit is not directly possible in the current 
JDBCInputFormatBuilder.
However there's a way to specify the fetch size [1] for your DB round-trip, 
doesn't that resolve your issue?

Similarly in JDBCOutputFormat, a batching mode was also used to stash upload 
rows before flushing to DB.

--
Rong

[1] 
https://docs.oracle.com/cd/E18283_01/java.112/e16548/resltset.htm#insertedID4

On Fri, Apr 12, 2019 at 6:23 AM Papadopoulos, Konstantinos
<konstantinos.papadopou...@iriworldwide.com> wrote:
Hi all,

We are facing an issue when trying to integrate PostgreSQL with Flink JDBC. 
When you establish a connection to the PostgreSQL database, it is in 
auto-commit mode. It means that each SQL statement is treated as a transaction 
and is automatically committed, but this functionality results in unexpected 
behavior (e.g., out-of-memory errors) when executed for large result sets. In 
order to bypass such issues, we must disable the auto-commit mode. To do this, 
in a simple Java application, we call the setAutoCommit() method of the 
Connection object.

So, my question is: How can we achieve this by using JDBCInputFormat of Flink?

Thanks in advance,

Konstantinos


Re: Flink JDBC: Disable auto-commit mode

2019-04-12 Thread Rong Rong
Hi Konstantinos,

Seems like setting for auto commit is not directly possible in the current
JDBCInputFormatBuilder.
However there's a way to specify the fetch size [1] for your DB round-trip,
doesn't that resolve your issue?

Similarly in JDBCOutputFormat, a batching mode was also used to stash
upload rows before flushing to DB.

--
Rong

[1]
https://docs.oracle.com/cd/E18283_01/java.112/e16548/resltset.htm#insertedID4
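
For reference, a plain-JDBC sketch of the behavior being requested: with the PostgreSQL driver, cursor-based (streamed) fetching only kicks in when auto-commit is disabled and a fetch size is set, which is exactly the pair of settings discussed in this thread. Connection details and the query are placeholders.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class PgStreamingRead {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://db-host:5432/mydb", "user", "secret")) { // placeholders

            // Both settings are needed for the PostgreSQL driver to stream the
            // result set with a cursor instead of materializing it in memory.
            conn.setAutoCommit(false);
            try (PreparedStatement ps = conn.prepareStatement("SELECT * FROM big_table")) {
                ps.setFetchSize(1_000);
                try (ResultSet rs = ps.executeQuery()) {
                    while (rs.next()) {
                        // process row ...
                    }
                }
            }
            conn.commit();
        }
    }
}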

On Fri, Apr 12, 2019 at 6:23 AM Papadopoulos, Konstantinos <
konstantinos.papadopou...@iriworldwide.com> wrote:

> Hi all,
>
> We are facing an issue when trying to integrate PostgreSQL with Flink
> JDBC. When you establish a connection to the PostgreSQL database, it is
> in auto-commit mode. It means that each SQL statement is treated as a
> transaction and is automatically committed, but this functionality results
> in unexpected behavior (e.g., out-of-memory errors) when executed for large
> result sets. In order to bypass such issues, we must disable the
> auto-commit mode. To do this, in a simple Java application, we call the
> setAutoCommit() method of the Connection object.
>
> So, my question is: How can we achieve this by using JDBCInputFormat of
> Flink?
>
> Thanks in advance,
>
> Konstantinos
>
>


Re: Flink Jdbc streaming source support in 1.7.1 or in future?

2019-01-23 Thread Puneet Kinra
Then the common way is to read in the CDC stream; writing a generic operator won't be
easy.

On Wed, Jan 23, 2019 at 12:45 PM Manjusha Vuyyuru 
wrote:

> But 'JDBCInputFormat' will exit once its done reading all data.I need
> something like which keeps polling to mysql and fetch if there are any
> updates or changes.
>
> Thanks,
> manju
>
> On Wed, Jan 23, 2019 at 7:10 AM Zhenghua Gao  wrote:
>
>> Actually flink-connectors/flink-jdbc module provided a JDBCInputFormat to
>> read data from a database.
>> u can have a try.
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>

-- 
Cheers

Puneet Kinra

Mobile: +918800167808 | Skype: puneet.ki...@customercentria.com
e-mail: puneet.ki...@customercentria.com


Re: Flink Jdbc streaming source support in 1.7.1 or in future?

2019-01-23 Thread Fabian Hueske
I think this is very hard to build in a generic way.
The common approach here would be to get access to the changelog stream of
the table, write it to a message queue / event log (like Kafka, Pulsar,
Kinesis, ...), and ingest the changes from the event log into a Flink
application.

You can of course build a custom source that returns all added rows if you
have some meta data in the table, e.g., timestamps indicating when a row
was added.
However, this can easily become very complex, for example if you need to
handle deletes and updates.
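
As an illustration only, a rough sketch of such a timestamp-based polling source is shown
below; the table, column names, poll interval, and connection settings are assumptions, it
only picks up appended rows, and it does not deal with updates, deletes, or restoring its
position from a checkpoint.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Timestamp;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.types.Row;

public class PollingJdbcSource extends RichSourceFunction<Row> {

    private volatile boolean running = true;
    private transient Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Illustrative connection settings.
        connection = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/mydb", "user", "secret");
    }

    @Override
    public void run(SourceContext<Row> ctx) throws Exception {
        Timestamp lastSeen = new Timestamp(0L);
        String sql = "SELECT id, payload, created_at FROM my_table "
                + "WHERE created_at > ? ORDER BY created_at";
        try (PreparedStatement stmt = connection.prepareStatement(sql)) {
            while (running) {
                stmt.setTimestamp(1, lastSeen);
                try (ResultSet rs = stmt.executeQuery()) {
                    while (rs.next()) {
                        Timestamp createdAt = rs.getTimestamp("created_at");
                        Row row = Row.of(rs.getInt("id"), rs.getString("payload"), createdAt);
                        // Emit and advance the high-water mark under the checkpoint lock.
                        synchronized (ctx.getCheckpointLock()) {
                            ctx.collect(row);
                            lastSeen = createdAt;
                        }
                    }
                }
                Thread.sleep(5000L);  // poll interval (assumption)
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void close() throws Exception {
        if (connection != null) {
            connection.close();
        }
    }
}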

Best, Fabian

Am Mi., 23. Jan. 2019 um 08:14 Uhr schrieb Manjusha Vuyyuru <
vmanjusha@gmail.com>:

> But 'JDBCInputFormat' will exit once its done reading all data.I need
> something like which keeps polling to mysql and fetch if there are any
> updates or changes.
>
> Thanks,
> manju
>
> On Wed, Jan 23, 2019 at 7:10 AM Zhenghua Gao  wrote:
>
>> Actually flink-connectors/flink-jdbc module provided a JDBCInputFormat to
>> read data from a database.
>> u can have a try.
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: Flink Jdbc streaming source support in 1.7.1 or in future?

2019-01-22 Thread Manjusha Vuyyuru
But 'JDBCInputFormat' will exit once it is done reading all the data. I need
something that keeps polling MySQL and fetches any updates or changes.

Thanks,
manju

On Wed, Jan 23, 2019 at 7:10 AM Zhenghua Gao  wrote:

> Actually flink-connectors/flink-jdbc module provided a JDBCInputFormat to
> read data from a database.
> u can have a try.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Flink Jdbc streaming source support in 1.7.1 or in future?

2019-01-22 Thread Zhenghua Gao
Actually, the flink-connectors/flink-jdbc module provides a JDBCInputFormat to
read data from a database.
You can give it a try.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink jdbc

2017-02-23 Thread Punit Tandel

Hi,

Yes, I have written a custom JDBC sink function based on the JDBCOutputFormat for 
streaming, and it is working and writing records into a Postgres DB or an in-memory H2 
DB. However, I am trying to figure out how many times the open() method is called and 
establishes a database connection, because in my integration tests open() is called a 
couple of times, establishing a connection each time, and that is for only one single 
event coming from Kafka. So can you please tell me whether the default implementation 
establishes the connection multiple times? Or will the DB connection be taken from a 
connection pool if I run my app on a cluster? That would be a great help.


Thanks


On 02/22/2017 10:47 PM, Fabian Hueske wrote:

Hi,

I should also mention that the JdbcOutputFormat batches writes to the 
database. Since it is not integrated with the Flink's checkpointing 
mechanism, data might get lost in case of a failure.
I would recommend to implement a JdbcSinkFunction based on the code of 
the JdbcOutputFormat.
If you use the batch JdbcOutputFormat you might get duplicates or lose 
data.


Best, Fabian

2017-02-16 15:39 GMT+01:00 Punit Tandel >:


Thanks for the info, At the moment i used the flink-jdbc to write
the streaming data coming from kafka which i can process and write
those data in postgres or mysql database configured on cluster or
sandbox, However when trying to write integration tests i am using
in memory H2 database which some what acting strange as i can not
see any error being thrown by write record method but at the same
time nothing is written in database. So kinda a little hard to
figure whats going wrong here.

Thanks


On 02/16/2017 02:02 PM, Fabian Hueske wrote:

The JdbcOutputFormat was originally meant for batch jobs.
It should be possible to use it for streaming jobs as well,
however, you should be aware that it is not integrated with Flink
checkpointing mechanism.
So, you might have duplicate data in case of failures.

I also don't know if or how well it works with H2.

Best, Fabian

2017-02-16 11:06 GMT+01:00 Punit Tandel
>:

Yes  i have been following the tutorials and reading from H2
and writing to H2 works fine, But problem here is data coming
from kafka and writing them to h2 engine does not seems to
work and cant see any error thrown while writing into in
memory H2 database, So couldnt say whats the error and why
those data are not inserted.

Have been trying to find out cause and looking for logs while
flink processes the operations but couldnt find any error
being thrown at the time of writing data. Any where i can
check for logs ?

Thanks


On 02/16/2017 01:10 AM, Ted Yu wrote:

See the tutorial at the beginning of:


flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java

Looks like plugging in "org.h2.Driver" should do.

On Wed, Feb 15, 2017 at 4:59 PM, Punit Tandel
> wrote:

Hi All

Does flink jdbc support writing the data into H2 Database?

Thanks
Punit












Re: Flink jdbc

2017-02-22 Thread Fabian Hueske
Hi,

I should also mention that the JdbcOutputFormat batches writes to the
database. Since it is not integrated with Flink's checkpointing
mechanism, data might get lost in case of a failure.
I would recommend implementing a JdbcSinkFunction based on the code of the
JdbcOutputFormat.
If you use the batch JdbcOutputFormat you might get duplicates or lose data.
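
A bare-bones sketch of such a JdbcSinkFunction follows; the URL, credentials, and INSERT
statement are assumptions, and it carries the same caveat as the output format (no
batching, no integration with checkpointing), so treat it only as a starting point.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;

public class JdbcSinkFunction extends RichSinkFunction<Row> {

    private transient Connection connection;
    private transient PreparedStatement statement;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Illustrative connection settings and target table.
        connection = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/mydb", "user", "secret");
        statement = connection.prepareStatement(
                "INSERT INTO target_table (id, payload) VALUES (?, ?)");
    }

    @Override
    public void invoke(Row value) throws Exception {
        statement.setObject(1, value.getField(0));
        statement.setObject(2, value.getField(1));
        statement.executeUpdate();  // one statement per record; batch for throughput
    }

    @Override
    public void close() throws Exception {
        if (statement != null) {
            statement.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}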

Best, Fabian

2017-02-16 15:39 GMT+01:00 Punit Tandel :

> Thanks for the info, At the moment i used the flink-jdbc to write the
> streaming data coming from kafka which i can process and write those data
> in postgres or mysql database configured on cluster or sandbox, However
> when trying to write integration tests i am using in memory H2 database
> which some what acting strange as i can not see any error being thrown by
> write record method but at the same time nothing is written in database. So
> kinda a little hard to figure whats going wrong here.
>
> Thanks
>
> On 02/16/2017 02:02 PM, Fabian Hueske wrote:
>
> The JdbcOutputFormat was originally meant for batch jobs.
> It should be possible to use it for streaming jobs as well, however, you
> should be aware that it is not integrated with Flink checkpointing
> mechanism.
> So, you might have duplicate data in case of failures.
>
> I also don't know if or how well it works with H2.
>
> Best, Fabian
>
> 2017-02-16 11:06 GMT+01:00 Punit Tandel :
>
>> Yes  i have been following the tutorials and reading from H2 and writing
>> to H2 works fine, But problem here is data coming from kafka and writing
>> them to h2 engine does not seems to work and cant see any error thrown
>> while writing into in memory H2 database, So couldnt say whats the error
>> and why those data are not inserted.
>>
>> Have been trying to find out cause and looking for logs while flink
>> processes the operations but couldnt find any error being thrown at the
>> time of writing data. Any where i can check for logs ?
>>
>> Thanks
>>
>> On 02/16/2017 01:10 AM, Ted Yu wrote:
>>
>> See the tutorial at the beginning of:
>>
>> flink-connectors/flink-jdbc/src/main/java/org/apache/flink/
>> api/java/io/jdbc/JDBCInputFormat.java
>>
>> Looks like plugging in "org.h2.Driver" should do.
>>
>> On Wed, Feb 15, 2017 at 4:59 PM, Punit Tandel 
>> wrote:
>>
>>> Hi All
>>>
>>> Does flink jdbc support writing the data into H2 Database?
>>>
>>> Thanks
>>> Punit
>>>
>>>
>>
>>
>
>


Re: Flink jdbc

2017-02-16 Thread Punit Tandel
Thanks for the info. At the moment I use flink-jdbc to write the streaming data coming 
from Kafka, which I can process and write into a Postgres or MySQL database configured 
on a cluster or sandbox. However, when writing integration tests I am using an in-memory 
H2 database, which is acting somewhat strangely: I cannot see any error being thrown by 
the writeRecord() method, but at the same time nothing is written to the database. So it 
is a little hard to figure out what is going wrong here.


Thanks


On 02/16/2017 02:02 PM, Fabian Hueske wrote:

The JdbcOutputFormat was originally meant for batch jobs.
It should be possible to use it for streaming jobs as well, however, 
you should be aware that it is not integrated with Flink checkpointing 
mechanism.

So, you might have duplicate data in case of failures.

I also don't know if or how well it works with H2.

Best, Fabian

2017-02-16 11:06 GMT+01:00 Punit Tandel >:


Yes  i have been following the tutorials and reading from H2 and
writing to H2 works fine, But problem here is data coming from
kafka and writing them to h2 engine does not seems to work and
cant see any error thrown while writing into in memory H2
database, So couldnt say whats the error and why those data are
not inserted.

Have been trying to find out cause and looking for logs while
flink processes the operations but couldnt find any error being
thrown at the time of writing data. Any where i can check for logs ?

Thanks


On 02/16/2017 01:10 AM, Ted Yu wrote:

See the tutorial at the beginning of:


flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java

Looks like plugging in "org.h2.Driver" should do.

On Wed, Feb 15, 2017 at 4:59 PM, Punit Tandel
> wrote:

Hi All

Does flink jdbc support writing the data into H2 Database?

Thanks
Punit









Re: Flink jdbc

2017-02-16 Thread Fabian Hueske
The JdbcOutputFormat was originally meant for batch jobs.
It should be possible to use it for streaming jobs as well; however, you
should be aware that it is not integrated with Flink's checkpointing
mechanism.
So, you might have duplicate data in case of failures.

I also don't know if or how well it works with H2.

Best, Fabian

2017-02-16 11:06 GMT+01:00 Punit Tandel :

> Yes  i have been following the tutorials and reading from H2 and writing
> to H2 works fine, But problem here is data coming from kafka and writing
> them to h2 engine does not seems to work and cant see any error thrown
> while writing into in memory H2 database, So couldnt say whats the error
> and why those data are not inserted.
>
> Have been trying to find out cause and looking for logs while flink
> processes the operations but couldnt find any error being thrown at the
> time of writing data. Any where i can check for logs ?
>
> Thanks
>
> On 02/16/2017 01:10 AM, Ted Yu wrote:
>
> See the tutorial at the beginning of:
>
> flink-connectors/flink-jdbc/src/main/java/org/apache/
> flink/api/java/io/jdbc/JDBCInputFormat.java
>
> Looks like plugging in "org.h2.Driver" should do.
>
> On Wed, Feb 15, 2017 at 4:59 PM, Punit Tandel 
> wrote:
>
>> Hi All
>>
>> Does flink jdbc support writing the data into H2 Database?
>>
>> Thanks
>> Punit
>>
>>
>
>


Re: Flink jdbc

2017-02-16 Thread Punit Tandel
Yes, I have been following the tutorials, and reading from and writing to H2 works fine. 
But the problem here is that data coming from Kafka and written to the H2 engine does 
not seem to arrive, and I cannot see any error thrown while writing into the in-memory 
H2 database, so I cannot say what the error is or why the data is not inserted.

I have been trying to find the cause and looking at the logs while Flink processes the 
operations, but I could not find any error being thrown at the time of writing the data. 
Is there anywhere I can check the logs?


Thanks


On 02/16/2017 01:10 AM, Ted Yu wrote:

See the tutorial at the beginning of:

flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java

Looks like plugging in "org.h2.Driver" should do.

On Wed, Feb 15, 2017 at 4:59 PM, Punit Tandel 
> wrote:


Hi All

Does flink jdbc support writing the data into H2 Database?

Thanks
Punit






Re: Flink jdbc

2017-02-15 Thread Ted Yu
See the tutorial at the beginning of:

flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java

Looks like plugging in "org.h2.Driver" should do.
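
A small hedged sketch of what that configuration could look like is below; the builder
method names follow the flink-jdbc JDBCOutputFormat of that era, the table and URL are
assumptions, and note that an in-memory H2 URL usually needs DB_CLOSE_DELAY=-1 so the
database survives between connections in a test.

import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;

public class H2OutputFormatSketch {
    public static JDBCOutputFormat buildFormat() {
        return JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername("org.h2.Driver")
                .setDBUrl("jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1")  // assumed URL
                .setQuery("INSERT INTO events (id, payload) VALUES (?, ?)")
                .setBatchInterval(1)  // flush every record, convenient in tests
                .finish();
    }
}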

On Wed, Feb 15, 2017 at 4:59 PM, Punit Tandel 
wrote:

> Hi All
>
> Does flink jdbc support writing the data into H2 Database?
>
> Thanks
> Punit
>
>


Re: Flink JDBC JDBCOutputFormat Open

2016-09-13 Thread Swapnil Chougule
Thanks, Chesnay, for the update.

On Tue, Sep 13, 2016 at 12:13 AM, Chesnay Schepler 
wrote:

> Hello,
>
> the JDBC Sink completely ignores the taskNumber and parallelism.
>
> Regards,
> Chesnay
>
>
> On 12.09.2016 08:41, Swapnil Chougule wrote:
>
> Hi Team,
>
> I want to know how tasknumber & numtasks help in opening db connection in
> Flink JDBC JDBCOutputFormat Open. I checked with docs where it says:
> taskNumber - The number of the parallel instance. numTasks - The number
> of parallel tasks. But couldn't get clear idea among parallel instance &
> parallel tasks. How do they contribute in concurrency with JDBC Source/Sink?
>
> I also checked with code but couldn't drill down further
>
> /**
> * Connects to the target database and initializes the prepared statement.
> *
> * @param taskNumber The number of the parallel instance.
> * @throws IOException Thrown, if the output could not be opened due to an
> * I/O problem.
> */
> @Override
> public void open(int taskNumber, int numTasks) throws IOException {
> try {
> establishConnection();
> upload = dbConn.prepareStatement(query);
> } catch (SQLException sqe) {
> throw new IllegalArgumentException("open() failed.", sqe);
> } catch (ClassNotFoundException cnfe) {
> throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
> }
> }
>
> private void establishConnection() throws SQLException,
> ClassNotFoundException {
> Class.forName(drivername);
> if (username == null) {
> dbConn = DriverManager.getConnection(dbURL);
> } else {
> dbConn = DriverManager.getConnection(dbURL, username, password);
> }
> }
>
> Thanks,
> Swapnil
>
>
>
>


Re: Flink JDBC JDBCOutputFormat Open

2016-09-12 Thread Chesnay Schepler

Hello,

the JDBC Sink completely ignores the taskNumber and parallelism.
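
For what the two parameters are meant for in general, a purely illustrative sketch is
below: a parallelism-aware format can use them to pick its own slice of the work. The id
range and the sharding scheme are assumptions, and, as noted above, the flink-jdbc
classes in this thread do not do anything like this.

public class RangeShardingSketch {

    /**
     * Returns the [from, to] id range this parallel instance would handle,
     * splitting an assumed overall id range evenly across numTasks instances.
     */
    public static long[] sliceFor(int taskNumber, int numTasks, long minId, long maxId) {
        long span = (maxId - minId + 1 + numTasks - 1) / numTasks;  // ceiling division
        long from = minId + (long) taskNumber * span;
        long to = Math.min(from + span - 1, maxId);
        return new long[] {from, to};
    }

    public static void main(String[] args) {
        // e.g. parallel instance 2 of 4 over ids 0..999
        long[] slice = sliceFor(2, 4, 0, 999);
        System.out.println(slice[0] + " - " + slice[1]);  // prints 500 - 749
    }
}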

Regards,
Chesnay

On 12.09.2016 08:41, Swapnil Chougule wrote:

Hi Team,

I want to know how taskNumber & numTasks help in opening the DB connection 
in the Flink JDBC JDBCOutputFormat open() method. I checked the docs, where it says:

taskNumber - The number of the parallel instance.
numTasks - The number of parallel tasks.

But I couldn't get a clear idea of the difference between a parallel instance and 
parallel tasks. How do they contribute to concurrency with a JDBC source/sink?

I also checked the code but couldn't drill down further:

/**
 * Connects to the target database and initializes the prepared statement.
 *
 * @param taskNumber The number of the parallel instance.
 * @throws IOException Thrown, if the output could not be opened due to an
 *                     I/O problem.
 */
@Override
public void open(int taskNumber, int numTasks) throws IOException {
    try {
        establishConnection();
        upload = dbConn.prepareStatement(query);
    } catch (SQLException sqe) {
        throw new IllegalArgumentException("open() failed.", sqe);
    } catch (ClassNotFoundException cnfe) {
        throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
    }
}

private void establishConnection() throws SQLException, ClassNotFoundException {
    Class.forName(drivername);
    if (username == null) {
        dbConn = DriverManager.getConnection(dbURL);
    } else {
        dbConn = DriverManager.getConnection(dbURL, username, password);
    }
}

Thanks,
Swapnil