[jira] [Commented] (FLINK-17459) JDBCAppendTableSink not support flush by flushIntervalMills

2020-05-06 Thread ranqiqiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17100533#comment-17100533
 ] 

ranqiqiang commented on FLINK-17459:


Thanks [~jark], I get it.

Suppose I want the sink to write with a statement like this:
{code:sql}
insert
...
on duplicate key update
id = if(values(modiy_time) >= modiy_time, values(id), id),
name = if(values(modiy_time) >= modiy_time, values(name), name),
..
modiy_time = if(values(modiy_time) >= modiy_time, values(modiy_time), modiy_time)
{code}
 

Here modiy_time acts like a version field.
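The intent of the statement above, as I read it, is that an incoming row overwrites the stored row only when its modiy_time is not older than the stored one. A minimal in-memory sketch of that rule in Java follows; the class `VersionedUpsert` and the `Row` shape are hypothetical and are not part of Flink or JDBC.

```java
import java.util.HashMap;
import java.util.Map;

public class VersionedUpsert {
    /** Hypothetical row shape: id is the key, modiyTime plays the role of the version. */
    static final class Row {
        final long id;
        final String name;
        final long modiyTime;

        Row(long id, String name, long modiyTime) {
            this.id = id;
            this.name = name;
            this.modiyTime = modiyTime;
        }
    }

    private final Map<Long, Row> table = new HashMap<>();

    /** Insert the row, or replace the stored row only if the incoming version is not older. */
    void upsert(Row incoming) {
        table.merge(incoming.id, incoming,
                (stored, in) -> in.modiyTime >= stored.modiyTime ? in : stored);
    }

    Row get(long id) {
        return table.get(id);
    }

    public static void main(String[] args) {
        VersionedUpsert t = new VersionedUpsert();
        t.upsert(new Row(1, "a", 100));
        t.upsert(new Row(1, "b", 90));   // older version: ignored
        t.upsert(new Row(1, "c", 100));  // equal version: applied, because the guard uses >=
        System.out.println(t.get(1).name);
    }
}
```

In the SQL form, modiy_time is assigned last. MySQL generally evaluates such assignments left to right, so moving it earlier would probably change what the later comparisons see.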

 

 

> JDBCAppendTableSink not  support  flush  by flushIntervalMills
> --
>
> Key: FLINK-17459
> URL: https://issues.apache.org/jira/browse/FLINK-17459
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / JDBC
>Affects Versions: 1.10.0
>Reporter: ranqiqiang
>Priority: Major
>
> {{JDBCAppendTableSink only flushes by "JDBCAppendTableSinkBuilder#batchSize"; it does not support a time-based flush like "JDBCUpsertTableSink#flushIntervalMills".}}
>  
> {{If batchSize=5000 and my data has 5000*N+1 rows, then the last record is never appended!}}
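The situation described in the issue can be reproduced with a small simulation. This is only a sketch: `BatchBuffer` is a hypothetical stand-in for a sink that flushes when the batch is full, and the explicit `flush()` call stands in for what a flush interval timer would do. It is not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchFlushSketch {
    /** Hypothetical buffer that writes rows out only when batchSize rows have accumulated. */
    static final class BatchBuffer {
        private final int batchSize;
        private final List<Integer> pending = new ArrayList<>();
        private int written = 0;

        BatchBuffer(int batchSize) {
            this.batchSize = batchSize;
        }

        void add(int row) {
            pending.add(row);
            if (pending.size() >= batchSize) {
                flush();
            }
        }

        /** Writes out whatever is buffered; a periodic timer would call this. */
        void flush() {
            written += pending.size();
            pending.clear();
        }

        int written() {
            return written;
        }

        int pending() {
            return pending.size();
        }
    }

    public static void main(String[] args) {
        int batchSize = 5000;
        int n = 3;
        BatchBuffer buffer = new BatchBuffer(batchSize);
        for (int i = 0; i < batchSize * n + 1; i++) {
            buffer.add(i);
        }
        // One row is still buffered because the last batch never filled up.
        System.out.println("written=" + buffer.written() + " pending=" + buffer.pending());

        buffer.flush(); // simulated flush-interval tick
        System.out.println("written=" + buffer.written() + " pending=" + buffer.pending());
    }
}
```

With a size-only trigger the trailing row stays buffered until some other flush happens; a time-based trigger bounds how long it can wait.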



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17459) JDBCAppendTableSink not support flush by flushIntervalMills

2020-05-06 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17100519#comment-17100519
 ] 

Jark Wu commented on FLINK-17459:
-

The suggested way is to create the jdbc table using DDL. 
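A rough sketch of what such a DDL could look like, built as a plain Java string so it can be inspected without a Flink runtime. The column types, the URL, and the table name are assumptions for illustration. The property keys are the ones I recall from the Flink 1.10 JDBC connector documentation, so please verify them against the version in use.

```java
public class JdbcSinkDdl {
    /** Builds a CREATE TABLE statement for a JDBC sink with size- and time-based flushing. */
    static String jdbcSinkDdl(String url, String table, int flushMaxRows, String flushInterval) {
        return String.join("\n",
                "CREATE TABLE jdbc_sink (",
                "  order_id BIGINT,",   // column types are assumed, the thread does not state them
                "  user_id BIGINT,",
                "  status INT",
                ") WITH (",
                "  'connector.type' = 'jdbc',",
                "  'connector.url' = '" + url + "',",
                "  'connector.table' = '" + table + "',",
                "  'connector.write.flush.max-rows' = '" + flushMaxRows + "',",
                "  'connector.write.flush.interval' = '" + flushInterval + "'",
                ")");
    }

    public static void main(String[] args) {
        // Hypothetical connection values.
        String ddl = jdbcSinkDdl("jdbc:mysql://localhost:3306/demo", "orders", 5000, "3s");
        System.out.println(ddl);
        // In a Flink 1.10 job this string would presumably be passed to tableEnv.sqlUpdate(ddl),
        // followed by an "insert into jdbc_sink select ..." statement.
    }
}
```

Declaring the table this way avoids calling the deprecated registerTableSink, and the flush interval is set as a table property instead of on a builder.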



[jira] [Commented] (FLINK-17459) JDBCAppendTableSink not support flush by flushIntervalMills

2020-05-06 Thread ranqiqiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17100513#comment-17100513
 ] 

ranqiqiang commented on FLINK-17459:


Thanks [~jark].

I am using StreamTableEnvironment, but registerTableSink is deprecated:

{code:java}
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
// DemoSource extends RichSourceFunction {...}
DemoSource source = new DemoSource();

DataStreamSource streamSource = env.addSource(source);
tableEnv.createTemporaryView("test", streamSource);

// registerTableSink is marked @Deprecated
tableEnv.registerTableSink("jdbc_sink", sink);
// Is there another method I should use? Maybe I am using it wrong?
{code}
 



[jira] [Commented] (FLINK-17459) JDBCAppendTableSink not support flush by flushIntervalMills

2020-05-05 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17100442#comment-17100442
 ] 

Jark Wu commented on FLINK-17459:
-

Hi [~michael ran], here is an example:


{code:java}
tableEnv.createTemporaryView("test", streamSource);
JDBCUpsertTableSink sink = JDBCUpsertTableSink.builder()
    .setOptions(options)
    .setTableSchema(schema)
    .setFlushIntervalMills(3000) // flush buffered rows at least every 3 seconds
    .build();
tableEnv.registerTableSink("jdbc_sink", sink);
tableEnv.sqlUpdate("insert into jdbc_sink select order_id, user_id, status from test");
// execute(String) takes a job name; the name used here is arbitrary
tableEnv.execute("jdbc_sink_job");
{code}




[jira] [Commented] (FLINK-17459) JDBCAppendTableSink not support flush by flushIntervalMills

2020-05-05 Thread ranqiqiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17100433#comment-17100433
 ] 

ranqiqiang commented on FLINK-17459:


[~jark], can you give an example of using JDBCUpsertTableSink for append-only writes?



[jira] [Commented] (FLINK-17459) JDBCAppendTableSink not support flush by flushIntervalMills

2020-04-30 Thread ranqiqiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17096221#comment-17096221
 ] 

ranqiqiang commented on FLINK-17459:


Thanks. Maybe I am doing something wrong when using JDBCUpsertTableSink to append.

My code looks like this:

{code:java}
tableEnv.createTemporaryView("test", streamSource);
Table appendTable = tableEnv.sqlQuery("select order_id, user_id, status from test");
DataStream<Row> stream = tableEnv.toAppendStream(appendTable, Row.class);

JDBCUpsertTableSink sink = JDBCUpsertTableSink.builder()
    .setOptions(options)
    .setTableSchema(schema)
    .setFlushIntervalMills(3000)
    .build();

// Error: java.lang.UnsupportedOperationException: JDBCUpsertTableSink can not support
// If I add "sink.setIsAppendOnly(true)", I get a different error:
// org.apache.flink.types.Row cannot be cast to org.apache.flink.api.java.tuple.Tuple2

// So I did this instead:
DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(appendTable, Row.class);
sink.setIsAppendOnly(true);

// The result is OK! I just want to insert/append data.

// Where does the framework set the fields used in this check?
if (!isAppendOnly && (keyFields == null || keyFields.length == 0)) {
    throw new UnsupportedOperationException("JDBCUpsertTableSink can not support ");
}
{code}
 

 



[jira] [Commented] (FLINK-17459) JDBCAppendTableSink not support flush by flushIntervalMills

2020-04-29 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17096120#comment-17096120
 ] 

Jark Wu commented on FLINK-17459:
-


{code:java}
sink.setIsAppendOnly(true);
sink.setKeyFields(new String[]{"id"});
{code}

These methods are invoked by the framework; you don't need to call them yourself. You can use the sink in both append-only and updating queries.

Yes. JDBCAppendTableSink will be removed in the future. 
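To make "invoked by the framework" more concrete, here is a simplified model of the contract, under the assumption that the planner derives both properties from the query and passes them to the sink before any data is written. `SketchSink` and `plan` are hypothetical names and this is not Flink's code; only the validation condition is taken from the check quoted earlier in this thread.

```java
public class UpsertSinkContract {
    /** Hypothetical stand-in for an upsert sink; not Flink's JDBCUpsertTableSink. */
    static final class SketchSink {
        private boolean isAppendOnly;
        private String[] keyFields;

        void setIsAppendOnly(boolean isAppendOnly) {
            this.isAppendOnly = isAppendOnly;
        }

        void setKeyFields(String[] keyFields) {
            this.keyFields = keyFields;
        }

        /** Applies the check quoted in the thread, then reports how rows would be written. */
        String writeMode() {
            boolean noKeys = keyFields == null || keyFields.length == 0;
            if (!isAppendOnly && noKeys) {
                throw new UnsupportedOperationException("updating query without key fields");
            }
            // Assumption of this sketch: without keys rows are appended, with keys they are upserted.
            return noKeys ? "append" : "upsert by " + String.join(",", keyFields);
        }
    }

    /** Hypothetical planner step: the framework, not user code, calls the setters. */
    static String plan(SketchSink sink, boolean queryIsAppendOnly, String[] uniqueKeys) {
        sink.setKeyFields(uniqueKeys);
        sink.setIsAppendOnly(queryIsAppendOnly);
        return sink.writeMode();
    }

    public static void main(String[] args) {
        // A plain "select ... from test" is append-only and has no unique key.
        System.out.println(plan(new SketchSink(), true, null));
        // An aggregating query is updating and keyed by its grouping column.
        System.out.println(plan(new SketchSink(), false, new String[] {"user_id"}));
    }
}
```

In this model, calling the setters by hand on a stream converted outside the planner bypasses the derivation step, which may be why the manual attempts earlier in the thread ran into errors.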




[jira] [Commented] (FLINK-17459) JDBCAppendTableSink not support flush by flushIntervalMills

2020-04-29 Thread ranqiqiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17096029#comment-17096029
 ] 

ranqiqiang commented on FLINK-17459:


1. Will JDBCAppendTableSink be abandoned?

2. JDBCUpsertTableSink does not support setQuery. How can I use JDBCUpsertTableSink to append the way JDBCAppendTableSink does? I could not find any documentation.



[jira] [Commented] (FLINK-17459) JDBCAppendTableSink not support flush by flushIntervalMills

2020-04-29 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17095458#comment-17095458
 ] 

Jark Wu commented on FLINK-17459:
-

We suggest using {{JDBCUpsertTableSink}} in Table API & SQL; it supports both append-only queries and updating queries.
