[jira] [Comment Edited] (FLINK-8545) Implement upsert stream table source

2018-08-14 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579899#comment-16579899
 ] 

Hequn Cheng edited comment on FLINK-8545 at 8/14/18 3:20 PM:
-

[~pnowojski]
Great to have your response. {{UpsertStreamTable}} and
{{AppendStreamTable}} are internal classes that are used during
{{registerTableInternal}}.

A flag is a good choice, but it makes the DataStream type difficult to handle.
When upserting from a stream, the input type is always a tuple type (see
FLINK-8577). To solve the type problem, I created {{UpsertStreamTable}} to
ingest a DataStream of type Tuple2.

Regarding {{UpsertToRetractOperator}}, I think it should be an {{UpsertNode}},
which may be translated into an {{UpsertWithRetractNode}} by the current
RetractionRules.

Thanks again for your suggestions. I have just posted a simple design doc
in [FLINK-8577|https://issues.apache.org/jira/browse/FLINK-8577].
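
For illustration only, the Tuple2 input can be pictured as a change flag paired with a row. The sketch below assumes the same convention that {{UpsertStreamTableSink}} uses for its Tuple2 output (true = upsert, false = delete); whether the source ends up with exactly this encoding is an assumption. It is plain Java with no Flink classes, and {{UpsertMessage}} and {{applyAll}} are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; no Flink classes are used.
final class UpsertMessageSketch {

    // Hypothetical stand-in for Tuple2<Boolean, Row>: a flag plus a (key, value) row.
    static final class UpsertMessage {
        final boolean isUpsert; // assumed convention: true = upsert, false = delete
        final String key;
        final String value;

        UpsertMessage(boolean isUpsert, String key, String value) {
            this.isUpsert = isUpsert;
            this.key = key;
            this.value = value;
        }
    }

    // Materializes the table described by a sequence of upsert/delete messages.
    static Map<String, String> applyAll(List<UpsertMessage> messages) {
        Map<String, String> table = new LinkedHashMap<>();
        for (UpsertMessage m : messages) {
            if (m.isUpsert) {
                table.put(m.key, m.value); // insert or overwrite by key
            } else {
                table.remove(m.key);       // delete by key
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<UpsertMessage> input = new ArrayList<>();
        input.add(new UpsertMessage(true, "k1", "a"));
        input.add(new UpsertMessage(true, "k2", "b"));
        input.add(new UpsertMessage(true, "k1", "c"));   // overwrites k1
        input.add(new UpsertMessage(false, "k2", null)); // deletes k2
        System.out.println(applyAll(input)); // prints {k1=c}
    }
}
```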


was (Author: hequn8128):
[~pnowojski]
Great to have your response. The {{UpsertStreamTable}} and 
{{AppendStreamTable}} are internal classes which are used during 
{{registerTableInternal}}. 

Flag is a good choice, but it is difficult to handle the DataStream type. When 
upsert from stream, the input type is always tuple type(see FLINK-8577). To 
solve the type problem, I create the {{UpsertStreamTable}} to ingest datastream 
with type of tuple2. 

Considering {{UpsertToRetractOperator}}, I think it should be an {{UpsertNode}} 
and it may be translated into an {{UpsertWithRetractNode}} by the current 
RetractionRules. 

Thanks again for your suggestions. I will post a design doc ASAP. 

> Implement upsert stream table source
> ------------------------------------
>
>                 Key: FLINK-8545
>                 URL: https://issues.apache.org/jira/browse/FLINK-8545
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Hequn Cheng
>            Assignee: Hequn Cheng
>            Priority: Major
>
> As more and more users are eager to ingest data in upsert mode in Flink
> SQL/Table API, it is valuable to enable table sources with upsert mode. I will
> provide a design doc later and we can have more discussions. Any suggestions
> are warmly welcome!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-8545) Implement upsert stream table source

2018-08-10 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576235#comment-16576235
 ] 

Hequn Cheng edited comment on FLINK-8545 at 8/10/18 1:09 PM:
-

[~pnowojski] Hi, the current design is to add an {{UpsertStreamTable}} (and
rename the current DataStreamTable to AppendStreamTable) and use a rule to
generate a {{DataStreamLastRow}} node. The node will process the upsert stream
in a way similar to {{group by + last}}. Furthermore, we also need a
{{CalcLastRowTransposeRule}} to minimise the state.
It would be great to get your suggestions on it.
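
As a rough illustration of {{group by + last}}, the plain-Java sketch below keeps only the most recent row per key. It also shows one reading of why a Calc/LastRow transpose could reduce state: if a projection keeps the key, projecting before the last-row step gives the same result as projecting after it, while the state holds fewer fields. That reading of {{CalcLastRowTransposeRule}} is an assumption, and {{lastRowPerKey}}, {{project}}, {{lastThenProject}} and {{projectThenLast}} are hypothetical names, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; no Flink classes are used.
final class LastRowSketch {

    // "group by key + last": keep only the most recent row for each key.
    // Field 0 of every row is treated as the key (an assumption of this sketch).
    static Map<String, List<String>> lastRowPerKey(List<List<String>> rows) {
        Map<String, List<String>> state = new LinkedHashMap<>();
        for (List<String> row : rows) {
            state.put(row.get(0), row); // a later row overwrites an earlier one
        }
        return state;
    }

    // Projection that keeps only the first two fields (the key and one value).
    static List<String> project(List<String> row) {
        return new ArrayList<>(row.subList(0, 2));
    }

    // Last-row first, projection afterwards: the state holds full rows.
    static Map<String, List<String>> lastThenProject(List<List<String>> rows) {
        Map<String, List<String>> out = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : lastRowPerKey(rows).entrySet()) {
            out.put(e.getKey(), project(e.getValue()));
        }
        return out;
    }

    // Projection first, last-row afterwards: the state holds projected rows only.
    static Map<String, List<String>> projectThenLast(List<List<String>> rows) {
        List<List<String>> projected = new ArrayList<>();
        for (List<String> row : rows) {
            projected.add(project(row));
        }
        return lastRowPerKey(projected);
    }

    public static void main(String[] args) {
        List<List<String>> rows = Arrays.asList(
                Arrays.asList("k1", "a", "x"),
                Arrays.asList("k2", "b", "y"),
                Arrays.asList("k1", "c", "z"));
        System.out.println(projectThenLast(rows)); // prints {k1=[k1, c], k2=[k2, b]}
        System.out.println(lastThenProject(rows).equals(projectThenLast(rows))); // prints true
    }
}
```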


was (Author: hequn8128):
[~pnowojski] Hi, current design is to add an {{UpsertStreamTable}} and use a 
rule to generate a {{DataStreamLastRow}} node. The node will process the upsert 
stream similar to {{group by + last}}. Furthermore, we also need a 
{{CalcLastRowTransposeRule}} to minimise the state. 
It would be great to get your suggestions on it.



[jira] [Comment Edited] (FLINK-8545) Implement upsert stream table source

2018-02-02 Thread Hequn Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350548#comment-16350548
 ] 

Hequn Cheng edited comment on FLINK-8545 at 2/3/18 2:33 AM:


Hi, the {{Tuple}} input format is a good choice. It is consistent with
{{UpsertStreamTableSink}}.

Yes, the upsert stream source should keep state so that it can generate retract
messages. As for the problem of deletes on non-inserted keys: if I understand
correctly, do you mean caching an out-of-order delete message until the
corresponding insert message arrives? I was thinking of the scenario in which
ordered delete messages are ingested. For example, users run a Flink job to
ingest delete messages from a source database and write them to a target
database.
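
To make the role of the state concrete, here is a minimal plain-Java sketch of turning upsert and delete messages into retract-style messages. It remembers the last value per key so that an update can first retract the old row. The class and method names are hypothetical, and silently dropping a delete for a key that was never inserted is only one possible policy, not something decided in this issue.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; no Flink classes are used.
final class UpsertToRetractSketch {

    // State: the last value seen for each key.
    private final Map<String, String> lastValueByKey = new HashMap<>();

    // Returns the retract-style messages produced by one upsert or delete input.
    // "+(k,v)" adds a row, "-(k,v)" retracts a previously emitted row.
    List<String> process(boolean isUpsert, String key, String value) {
        List<String> out = new ArrayList<>();
        String previous = lastValueByKey.get(key);
        if (isUpsert) {
            if (previous != null) {
                out.add("-(" + key + "," + previous + ")"); // retract the old row first
            }
            out.add("+(" + key + "," + value + ")");
            lastValueByKey.put(key, value);
        } else if (previous != null) {
            out.add("-(" + key + "," + previous + ")");
            lastValueByKey.remove(key);
        }
        // A delete for a key that was never inserted emits nothing here
        // (an assumption of this sketch).
        return out;
    }

    public static void main(String[] args) {
        UpsertToRetractSketch op = new UpsertToRetractSketch();
        System.out.println(op.process(true, "k1", "a"));   // prints [+(k1,a)]
        System.out.println(op.process(true, "k1", "b"));   // prints [-(k1,a), +(k1,b)]
        System.out.println(op.process(false, "k1", null)); // prints [-(k1,b)]
        System.out.println(op.process(false, "k2", null)); // prints []
    }
}
```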



was (Author: hequn8128):
Hi, the {{Tuple}} input format is a good choice. It is consistent with the 
{{UpsertStreamTableSink}}.

Yes, upsert stream source should have state so that to generate retract 
messages. As for the problem of deletes on non-inserted keys, if i understand 
correctly, do you mean caching the unordered delete message until the 
corresponding insert message comes? I was thinking the scenario that only 
delete messages are ingested. For example, users use flink job to ingest delete 
messages from a source database and output to a target database.




[jira] [Comment Edited] (FLINK-8545) Implement upsert stream table source

2018-02-02 Thread Hequn Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350548#comment-16350548
 ] 

Hequn Cheng edited comment on FLINK-8545 at 2/2/18 4:01 PM:


Hi, the {{Tuple}} input format is a good choice. It is consistent with the 
{{UpsertStreamTableSink}}.

Yes, upsert stream source should have state so that to generate retract 
messages. As for the problem of deletes on non-inserted keys, if i understand 
correctly, do you mean caching the unordered delete message until the 
corresponding insert message comes? I was think the scenario that only delete 
messages are ingested. For example, users use flink job to sync delete messages 
from a source database to a target database.



was (Author: hequn8128):
Hi, the {{Tuple}} input format is a good choice. It is consistent with the 
\{{UpsertStreamTableSink}}.

Yes, upsert stream source should have state so that to generate retract 
messages. As for the problem of deletes on non-inserted keys, if i understand 
correctly, do you mean caching the unordered delete message until the 
corresponding insert message comes? I was think the scenario that only delete 
messages are ingested. 




[jira] [Comment Edited] (FLINK-8545) Implement upsert stream table source

2018-02-02 Thread Hequn Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350548#comment-16350548
 ] 

Hequn Cheng edited comment on FLINK-8545 at 2/2/18 4:09 PM:


Hi, the {{Tuple}} input format is a good choice. It is consistent with the 
{{UpsertStreamTableSink}}.

Yes, upsert stream source should have state so that to generate retract 
messages. As for the problem of deletes on non-inserted keys, if i understand 
correctly, do you mean caching the unordered delete message until the 
corresponding insert message comes? I was thinking the scenario that only 
delete messages are ingested. For example, users use flink job to ingest delete 
messages from a source database and output to a target database.



was (Author: hequn8128):
Hi, the {{Tuple}} input format is a good choice. It is consistent with the 
{{UpsertStreamTableSink}}.

Yes, upsert stream source should have state so that to generate retract 
messages. As for the problem of deletes on non-inserted keys, if i understand 
correctly, do you mean caching the unordered delete message until the 
corresponding insert message comes? I was think the scenario that only delete 
messages are ingested. For example, users use flink job to ingest delete 
messages from a source database and output to a target database.




[jira] [Comment Edited] (FLINK-8545) Implement upsert stream table source

2018-02-02 Thread Hequn Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350548#comment-16350548
 ] 

Hequn Cheng edited comment on FLINK-8545 at 2/2/18 4:08 PM:


Hi, the {{Tuple}} input format is a good choice. It is consistent with the 
{{UpsertStreamTableSink}}.

Yes, upsert stream source should have state so that to generate retract 
messages. As for the problem of deletes on non-inserted keys, if i understand 
correctly, do you mean caching the unordered delete message until the 
corresponding insert message comes? I was think the scenario that only delete 
messages are ingested. For example, users use flink job to ingest delete 
messages from a source database and output to a target database.



was (Author: hequn8128):
Hi, the {{Tuple}} input format is a good choice. It is consistent with the 
{{UpsertStreamTableSink}}.

Yes, upsert stream source should have state so that to generate retract 
messages. As for the problem of deletes on non-inserted keys, if i understand 
correctly, do you mean caching the unordered delete message until the 
corresponding insert message comes? I was think the scenario that only delete 
messages are ingested. For example, users use flink job to sync delete messages 
from a source database to a target database.




[jira] [Comment Edited] (FLINK-8545) Implement upsert stream table source

2018-02-01 Thread Hequn Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349724#comment-16349724
 ] 

Hequn Cheng edited comment on FLINK-8545 at 2/2/18 3:30 AM:


Hi [~fhueske], great to hear your suggestions. I agree with you, so let's
start with the {{DataStream}} to {{Table}} conversion first.

I like your proposal, especially the upsert check. Besides, we may also need
to support ingesting delete messages. Something like:
{code:scala}
// Proposed API sketch: upsertFromStream, upsertOrder, key and isDelete do not exist yet.
val input: DataStream[(String, Long, Int, Boolean)] = ???
val table: Table = tEnv.upsertFromStream(
  input, 'a, 'b.rowtime.upsertOrder, 'c.key, 'd.isDelete)
{code}
However, this may have side effects, because the current aggregations assume
that every delete removes an existing record. I don't have a complete solution
yet. We may need some refactoring to accommodate this. I will think more about
it. Thanks.
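
A small plain-Java sketch of the side effect in question: a count that decrements on every delete drifts away from the true number of records as soon as a delete arrives for a key that was never inserted. The names are hypothetical, and for brevity the naive variant treats every non-delete message as an insert of a new record.

```java
import java.util.HashSet;
import java.util.Set;

// Plain-Java illustration only; no Flink classes are used.
final class DeleteOnMissingKeySketch {

    // Naive count: trusts that every delete removes an existing record and
    // that every non-delete message inserts a new one.
    static long naiveCount(String[] keys, boolean[] isDelete) {
        long count = 0;
        for (int i = 0; i < keys.length; i++) {
            count += isDelete[i] ? -1 : 1;
        }
        return count;
    }

    // Guarded count: tracks which keys are present, so a delete for an
    // unknown key has no effect and a repeated upsert is counted once.
    static long guardedCount(String[] keys, boolean[] isDelete) {
        Set<String> present = new HashSet<>();
        for (int i = 0; i < keys.length; i++) {
            if (isDelete[i]) {
                present.remove(keys[i]);
            } else {
                present.add(keys[i]);
            }
        }
        return present.size();
    }

    public static void main(String[] args) {
        // k1 is inserted, then a delete arrives for k2, which was never inserted.
        String[] keys = {"k1", "k2"};
        boolean[] isDelete = {false, true};
        System.out.println(naiveCount(keys, isDelete));   // prints 0
        System.out.println(guardedCount(keys, isDelete)); // prints 1
    }
}
```

The guarded variant pays for its correctness with per-key state, which is the kind of trade-off the refactoring would have to weigh.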

 


was (Author: hequn8128):
Hi, [~fhueske]  Great to hear your suggestions. I agree with you, so let's 
start with the {{DataStream}} to {{Table}} conversion first. 

I like your proposal especially for the upsert check. Besides, we may also need 
to support ingesting with delete messages.  Something looks like:
{code:java}
DataStream[(String, Long, Int, Boolean)] input = ???
Table table = tEnv.upsertFromStream(input, 'a, 'b.rowtime.upsertOrder, 'c.key, 
'd.isDelete)
{code}
But, this may bring some side effects, as current aggregations assume that all 
deletes are on condition of deleting an existing record. I don't have a 
complete solution yet. Maybe we may need some refactors to adapt this. I will 
think more about it. Thanks.

 



[jira] [Comment Edited] (FLINK-8545) Implement upsert stream table source

2018-02-01 Thread Hequn Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349724#comment-16349724
 ] 

Hequn Cheng edited comment on FLINK-8545 at 2/2/18 3:29 AM:


Hi, [~fhueske]  Great to hear your suggestions. I agree with you, so let's 
start with the {{DataStream}} to {{Table}} conversion first. 

I like your proposal especially for the upsert check. Besides, we may also need 
to support ingesting with delete messages.  Something looks like:
{code:java}
DataStream[(String, Long, Int, Boolean)] input = ???
Table table = tEnv.upsertFromStream(input, 'a, 'b.rowtime.upsertOrder, 'c.key, 
'd.isDelete)
{code}
But, this may bring some side effects, as current aggregations assume that all 
deletes are on condition of deleting an existing record. I don't have a 
complete solution yet. Maybe we may need some refactors to adapt this. I will 
think more about it. Thanks.

 


was (Author: hequn8128):
Hi, [~fhueske]  Great to hear your suggestions. I agree with you, so let's 
start with the \{{DataStream}} to \{{Table}} conversion first. 

I like your proposal especially for the upsert check. Besides, we may also need 
to support ingesting with delete messages.  Something looks like:

 
{code:java}
DataStream[(String, Long, Int, Boolean)] input = ???
Table table = tEnv.upsertFromStream(input, 'a, 'b.rowtime.upsertOrder, 'c.key, 
'd.isDelete)
{code}
But, this may bring some side effects, as current aggregations assume that all 
deletes are on condition of deleting an existing record. I don't have a 
complete solution yet. Maybe we may need some refactors to adapt this. I will 
think more about it. Thanks.

 
