+1 to this answer.

MERGE is the most compatible syntax I have found when dealing with upsert /
replace.

AFAIK, almost all DBMS have some kind of dialect regarding upsert
functionality, so following the SQL standard might be your best solution
here.
And yes, both the MERGE ingestion SQL and the execution logs are going to
be more complex.
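
Just as an illustration, a standard-form MERGE for the pageclicks table in
this thread could look roughly like the sketch below (untested; it assumes
a unique key on (window_start, window_end, username), and note that
individual DBMS, Derby included, restrict what may appear in the USING
clause and may require CASTs around the ? parameters):

String mergeStmt =
    "MERGE INTO chris.pageclicks t " +
    "USING (VALUES (?, ?, ?, ?)) " +
    "  AS s (window_start, window_end, username, viewcount) " +
    "ON t.window_start = s.window_start " +
    "  AND t.window_end = s.window_end " +
    "  AND t.username = s.username " +
    "WHEN MATCHED THEN UPDATE SET viewcount = s.viewcount " +
    "WHEN NOT MATCHED THEN INSERT (window_start, window_end, username, viewcount) " +
    "  VALUES (s.window_start, s.window_end, s.username, s.viewcount)";

Since the JDBCAppendTableSink essentially just executes whatever statement
you hand to setQuery() for each record, a statement like this can replace
the plain INSERT without other changes, as long as the ? parameters stay
in the same order as the tuple fields.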

--
Rong

On Wed, Jul 4, 2018 at 1:15 AM Fabian Hueske <fhue...@gmail.com> wrote:

> There is also the SQL:2003 MERGE statement that can be used to implement
> UPSERT logic.
> It is a bit verbose but supported by Derby [1].
>
> Best, Fabian
>
> [1] https://issues.apache.org/jira/browse/DERBY-3155
>
> 2018-07-04 10:10 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>
>> Hi Chris,
>>
>> MySQL (and maybe other DBMS as well) offers special syntax for upserts.
>>
>> The answers to this SO question [1] recommend "INSERT INTO ... ON
>> DUPLICATE KEY UPDATE ..." or "REPLACE INTO ...".
>> However, AFAIK this syntax is not standardized and might vary from DBMS
>> to DBMS.
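>>
>> For the table in this thread, the MySQL variant would look something
>> like the following (untested sketch; it assumes a UNIQUE key on
>> (window_start, window_end, username) so that MySQL can detect the
>> duplicate):
>>
>> String upsertStmt =
>>     "INSERT INTO pageclicks (window_start, window_end, username, viewcount) " +
>>     "VALUES (?, ?, ?, ?) " +
>>     "ON DUPLICATE KEY UPDATE viewcount = VALUES(viewcount)";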
>>
>> Best, Fabian
>>
>> [1]
>> https://stackoverflow.com/questions/4205181/insert-into-a-mysql-table-or-update-if-exists
>>
>> 2018-07-03 12:14 GMT+02:00 Chris Ruegger <chris.rueg...@gmail.com>:
>>
>>> Fabian, Rong:
>>> Thanks for the help, greatly appreciated.
>>>
>>> I am currently using a Derby database for the append-only JDBC sink.
>>> So far I don't see a way to use a JDBC/relational database solution for
>>> a retract/upsert use case.
>>> Is it possible to set up a JDBC sink with Derby or MySQL so that it goes
>>> back and updates or deletes/inserts previous rows while inserting new
>>> ones?
>>> I have not been able to find example source code that does that.
>>> Thanks again,
>>> Chris
>>>
>>>
>>> On Tue, Jul 3, 2018 at 5:24 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> In addition to what Rong said:
>>>>
>>>> - The types look OK.
>>>> - You can also use Types.STRING and Types.LONG instead of
>>>> BasicTypeInfo.xxx (see the sketch after this list).
>>>> - Beware that in the failure case, you might have multiple entries in
>>>> the database table. Some databases support an upsert syntax which (together
>>>> with key or uniqueness constraints) can ensure that each result is added
>>>> just once, even if the query recovers from a failure.
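>>>>
>>>> For example, the parameter types from the code below could be written
>>>> uniformly with the Types class (a sketch, assuming the Types class you
>>>> already import for Types.SQL_TIMESTAMP also provides STRING and LONG):
>>>>
>>>> .setParameterTypes(Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP,
>>>>         Types.STRING, Types.LONG)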
>>>>
>>>> Best, Fabian
>>>>
>>>> 2018-07-01 17:25 GMT+02:00 Rong Rong <walter...@gmail.com>:
>>>>
>>>>> Hi Chris,
>>>>>
>>>>> Looking at the code, it seems JDBCTypeUtil [1] is used for converting
>>>>> Flink TypeInformation into JDBC types (java.sql.Types), and
>>>>> SQL_TIMESTAMP and SQL_TIME are both listed in the conversion mapping.
>>>>> However, the JDBC types they map to are different.
>>>>>
>>>>> Regarding the question of whether your insert is correctly configured:
>>>>> that directly relates to how your DB executes the JDBC insert command.
>>>>> 1. Regarding type settings: Looking at the JDBCOutputFormat [2], it
>>>>> seems you can even execute your command without a type array, or when
>>>>> a type mapping cannot be found; in that case the PreparedStatement
>>>>> will be written with the plain Object type. I tried it on MySQL and it
>>>>> actually works pretty well.
>>>>> 2. Another question is whether your underlying DB can handle an
>>>>> "implicit type cast": for example, inserting an INTEGER type into a
>>>>> BIGINT column. AFAIK the JDBCAppendTableSink does not check
>>>>> compatibility before writeRecord, so it might be a good idea to
>>>>> include some sanity check beforehand (see the sketch below).
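>>>>>
>>>>> Something like this could serve as that sanity check (a hypothetical
>>>>> sketch, not existing Flink API, reusing resultDataStream and
>>>>> tupleTypeInfo from your code below; it fails fast if a field's runtime
>>>>> class does not match what the INSERT statement expects):
>>>>>
>>>>> // expected Java classes per statement parameter, in tuple-field order
>>>>> Class<?>[] expected = {Timestamp.class, Timestamp.class, String.class, Long.class};
>>>>> resultDataStream.map(record -> {
>>>>>     for (int i = 0; i < record.getArity(); i++) {
>>>>>         Object field = record.getField(i);
>>>>>         if (field != null && !expected[i].isInstance(field)) {
>>>>>             throw new IllegalStateException("Field " + i + " is "
>>>>>                 + field.getClass().getName() + ", expected " + expected[i].getName());
>>>>>         }
>>>>>     }
>>>>>     return record;
>>>>> }).returns(tupleTypeInfo);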
>>>>>
>>>>> Thanks,
>>>>> Rong
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtil.java
>>>>> [2]
>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java#L109
>>>>>
>>>>> On Sun, Jul 1, 2018 at 5:22 AM chrisr123 <chris.rueg...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>>
>>>>>> Full Source except for mapper and timestamp assigner.
>>>>>>
>>>>>> Sample Input Stream record:
>>>>>> 1530447316589,Mary,./home
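>>>>>>
>>>>>> The mapper is omitted, but for context it does roughly this
>>>>>> (hypothetical sketch, assuming the timestamp,username,url format of
>>>>>> the sample record above):
>>>>>>
>>>>>> public static class TableStreamMapper
>>>>>>         implements MapFunction<String, Tuple3<Long, String, String>> {
>>>>>>     @Override
>>>>>>     public Tuple3<Long, String, String> map(String line) {
>>>>>>         // "1530447316589,Mary,./home" -> (timestamp, username, url)
>>>>>>         String[] parts = line.split(",");
>>>>>>         return Tuple3.of(Long.parseLong(parts[0]), parts[1], parts[2]);
>>>>>>     }
>>>>>> }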
>>>>>>
>>>>>>
>>>>>> What are the correct parameters to pass for data types in the
>>>>>> JDBCAppendTableSink?
>>>>>> Am I doing this correctly?
>>>>>>
>>>>>>
>>>>>>                 // Get Execution Environment
>>>>>>                 StreamExecutionEnvironment env =
>>>>>>                         StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>                 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>>>                 StreamTableEnvironment tableEnvironment =
>>>>>>                         TableEnvironment.getTableEnvironment(env);
>>>>>>
>>>>>>                 // Get and set execution parameters
>>>>>>                 ParameterTool parms = ParameterTool.fromArgs(args);
>>>>>>                 env.getConfig().setGlobalJobParameters(parms);
>>>>>>
>>>>>>                 // Configure checkpoint and restart
>>>>>>                 // configureCheckpoint(env);
>>>>>>                 // configureRestart(env);
>>>>>>
>>>>>>                 // Get our data stream
>>>>>>                 DataStream<Tuple3<Long, String, String>> eventStream = env
>>>>>>                         .socketTextStream(parms.get("host"), parms.getInt("port"))
>>>>>>                         .map(new TableStreamMapper())
>>>>>>                         .assignTimestampsAndWatermarks(new MyEventTimestampAssigner());
>>>>>>
>>>>>>                 // Register dynamic table from stream
>>>>>>                 tableEnvironment.registerDataStream("pageViews", eventStream,
>>>>>>                         "pageViewTime.rowtime, username, url");
>>>>>>
>>>>>>                 // Continuous query
>>>>>>                 String continuousQuery =
>>>>>>                         "SELECT TUMBLE_START(pageViewTime, INTERVAL '1' MINUTE) as wstart, " +
>>>>>>                         "TUMBLE_END(pageViewTime, INTERVAL '1' MINUTE) as wend, " +
>>>>>>                         "username, COUNT(url) as viewcount FROM pageViews " +
>>>>>>                         "GROUP BY TUMBLE(pageViewTime, INTERVAL '1' MINUTE), username";
>>>>>>
>>>>>>                 // Dynamic table from continuous query
>>>>>>                 Table windowedTable = tableEnvironment.sqlQuery(continuousQuery);
>>>>>>                 windowedTable.printSchema();
>>>>>>
>>>>>>                 // Convert results to DataStream
>>>>>>                 Table resultTable = windowedTable
>>>>>>                         .select("wstart, wend, username, viewcount");
>>>>>>
>>>>>>                 TupleTypeInfo<Tuple4<Timestamp, Timestamp, String, Long>> tupleTypeInfo =
>>>>>>                         new TupleTypeInfo<>(
>>>>>>                                 Types.SQL_TIMESTAMP,
>>>>>>                                 Types.SQL_TIMESTAMP,
>>>>>>                                 Types.STRING,
>>>>>>                                 Types.LONG);
>>>>>>
>>>>>>                 DataStream<Tuple4<Timestamp, Timestamp, String, Long>> resultDataStream =
>>>>>>                         tableEnvironment.toAppendStream(resultTable, tupleTypeInfo);
>>>>>>                 resultDataStream.print();
>>>>>>
>>>>>>                 // Configure sink
>>>>>>                 JDBCAppendTableSink pageViewSink = JDBCAppendTableSink.builder()
>>>>>>                         .setDrivername("org.apache.derby.jdbc.ClientDriver")
>>>>>>                         .setDBUrl("jdbc:derby://captain:1527/rueggerllc")
>>>>>>                         .setUsername("chris")
>>>>>>                         .setPassword("xxxx")
>>>>>>                         .setBatchSize(1)
>>>>>>                         .setQuery("INSERT INTO chris.pageclicks " +
>>>>>>                                 "(window_start,window_end,username,viewcount) VALUES (?,?,?,?)")
>>>>>>                         .setParameterTypes(Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP,
>>>>>>                                 BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)
>>>>>>                         .build();
>>>>>>
>>>>>>                 // Write result table to sink
>>>>>>                 resultTable.writeToSink(pageViewSink);
>>>>>>                 System.out.println("WRITE TO SINK");
>>>>>>
>>>>>>                 // Execute
>>>>>>                 env.execute("PageViewsTumble");
>>>>>>         }
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Sent from:
>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>>
>>> ----------------------------------------------------------------------------------------------------------------------------------------
>>> Simplicity is the ultimate sophistication
>>> --Leonardo DaVinci
>>>
>>> www.rueggerconsultingllc.com
>>>
>>>
>>
>>
>
