[jira] [Comment Edited] (FLINK-8545) Implement upsert stream table source
[ https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579899#comment-16579899 ]

Hequn Cheng edited comment on FLINK-8545 at 8/14/18 3:20 PM:
-------------------------------------------------------------

[~pnowojski] Great to have your response. The {{UpsertStreamTable}} and {{AppendStreamTable}} are internal classes used during {{registerTableInternal}}. A flag is a good choice, but it is difficult to handle the DataStream type: when upserting from a stream, the input type is always a tuple type (see FLINK-8577). To solve the type problem, I created the {{UpsertStreamTable}} to ingest a DataStream with a {{Tuple2}} type. As for {{UpsertToRetractOperator}}, I think it should be an {{UpsertNode}}, and it may be translated into an {{UpsertWithRetractNode}} by the current RetractionRules. Thanks again for your suggestions. I have just posted a simple design doc in [FLINK-8577|https://issues.apache.org/jira/browse/FLINK-8577].

was (Author: hequn8128):

[~pnowojski] Great to have your response. The {{UpsertStreamTable}} and {{AppendStreamTable}} are internal classes used during {{registerTableInternal}}. A flag is a good choice, but it is difficult to handle the DataStream type: when upserting from a stream, the input type is always a tuple type (see FLINK-8577). To solve the type problem, I created the {{UpsertStreamTable}} to ingest a DataStream with a {{Tuple2}} type. As for {{UpsertToRetractOperator}}, I think it should be an {{UpsertNode}}, and it may be translated into an {{UpsertWithRetractNode}} by the current RetractionRules. Thanks again for your suggestions. I will post a design doc ASAP.
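The upsert-to-retract translation discussed above can be illustrated without any Flink dependencies: keyed state keeps the last row per key, and each incoming upsert first retracts the previous version (if any) before accumulating the new one. A minimal sketch, assuming illustrative class and method names that are not part of any Flink API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative upsert-to-retract conversion; not a Flink operator. */
public class UpsertToRetract {
    /** A change message: accumulate (true) or retract (false) a row. */
    public record Change(boolean accumulate, String key, String row) {}

    private final Map<String, String> lastRow = new HashMap<>();

    /** Process one upsert and emit the equivalent retract-stream messages. */
    public List<Change> onUpsert(String key, String row) {
        List<Change> out = new ArrayList<>();
        String previous = lastRow.put(key, row);
        if (previous != null) {
            out.add(new Change(false, key, previous)); // retract the old version first
        }
        out.add(new Change(true, key, row));           // then accumulate the new version
        return out;
    }
}
```

In a real operator the per-key map would live in Flink keyed state; the point here is only the ordering guarantee (retract before accumulate) that downstream aggregations rely on.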
> Implement upsert stream table source
> ------------------------------------
>
>                 Key: FLINK-8545
>                 URL: https://issues.apache.org/jira/browse/FLINK-8545
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Hequn Cheng
>            Assignee: Hequn Cheng
>            Priority: Major
>
> As more and more users are eager to ingest data in upsert mode in Flink SQL / Table API, it is valuable to enable a table source with upsert mode. I will provide a design doc later and we can have more discussions. Any suggestions are warmly welcomed!

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Comment Edited] (FLINK-8545) Implement upsert stream table source
[ https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576235#comment-16576235 ]

Hequn Cheng edited comment on FLINK-8545 at 8/10/18 1:09 PM:
-------------------------------------------------------------

[~pnowojski] Hi, the current design is to add an {{UpsertStreamTable}} (and rename the current DataStreamTable to AppendStreamTable) and use a rule to generate a {{DataStreamLastRow}} node. The node will process the upsert stream similarly to {{group by + last}}. Furthermore, we also need a {{CalcLastRowTransposeRule}} to minimise the state. It would be great to get your suggestions on it.

was (Author: hequn8128):

[~pnowojski] Hi, the current design is to add an {{UpsertStreamTable}} and use a rule to generate a {{DataStreamLastRow}} node. The node will process the upsert stream similarly to {{group by + last}}. Furthermore, we also need a {{CalcLastRowTransposeRule}} to minimise the state. It would be great to get your suggestions on it.
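The "{{group by + last}}" semantics the proposed {{DataStreamLastRow}} node would implement amount to materializing the latest row per key. A minimal, framework-free Java sketch of that materialization (names are illustrative, not Flink identifiers):

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative "last row per key" materialization, mirroring GROUP BY key + last value. */
public class LastRow {
    /** Each upsert is a (key, row) pair; later pairs for a key overwrite earlier ones. */
    public static Map<String, String> materialize(String[][] upserts) {
        Map<String, String> state = new LinkedHashMap<>();
        for (String[] kv : upserts) {
            state.put(kv[0], kv[1]); // keep only the most recent row per key
        }
        return state;
    }
}
```

The {{CalcLastRowTransposeRule}} mentioned above would, per the comment, push projections below this node so that only the needed columns are held in state.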
[jira] [Comment Edited] (FLINK-8545) Implement upsert stream table source
[ https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350548#comment-16350548 ]

Hequn Cheng edited comment on FLINK-8545 at 2/3/18 2:33 AM:
------------------------------------------------------------

Hi, the {{Tuple}} input format is a good choice. It is consistent with the {{UpsertStreamTableSink}}. Yes, an upsert stream source should have state so that it can generate retract messages. As for the problem of deletes on non-inserted keys, if I understand correctly, do you mean caching the unordered delete message until the corresponding insert message comes? I was thinking of the scenario where ordered delete messages are ingested. For example, users use a Flink job to ingest delete messages from a source database and output them to a target database.

was (Author: hequn8128):

Hi, the {{Tuple}} input format is a good choice. It is consistent with the {{UpsertStreamTableSink}}. Yes, an upsert stream source should have state so that it can generate retract messages. As for the problem of deletes on non-inserted keys, if I understand correctly, do you mean caching the unordered delete message until the corresponding insert message comes? I was thinking of the scenario where only delete messages are ingested. For example, users use a Flink job to ingest delete messages from a source database and output them to a target database.
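The "caching the unordered delete message until the corresponding insert message comes" idea can be sketched as follows. This is a hypothetical, framework-free illustration of one possible policy (an out-of-order delete cancels the insert it precedes), not anything implemented in Flink:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Illustrative handling of deletes that arrive before a matching insert. */
public class DeleteBuffer {
    private final Map<String, String> state = new HashMap<>();
    private final Set<String> pendingDeletes = new HashSet<>();

    public void upsert(String key, String row) {
        if (pendingDeletes.remove(key)) {
            return; // an earlier out-of-order delete cancels this insert
        }
        state.put(key, row);
    }

    public void delete(String key) {
        if (state.remove(key) == null) {
            pendingDeletes.add(key); // cache the delete until the insert arrives
        }
    }

    public Map<String, String> snapshot() {
        return state;
    }
}
```

With ordered inputs (the scenario the comment describes) the pending-delete set stays empty and the class degenerates to plain upsert/delete state.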
[jira] [Comment Edited] (FLINK-8545) Implement upsert stream table source
[ https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349724#comment-16349724 ]

Hequn Cheng edited comment on FLINK-8545 at 2/2/18 3:30 AM:
------------------------------------------------------------

Hi, [~fhueske] Great to hear your suggestions. I agree with you, so let's start with the {{DataStream}} to {{Table}} conversion first. I like your proposal, especially the upsert check. Besides, we may also need to support ingesting delete messages. Something like:

{code:java}
DataStream[(String, Long, Int, Boolean)] input = ???
Table table = tEnv.upsertFromStream(input, 'a, 'b.rowtime.upsertOrder, 'c.key, 'd.isDelete)
{code}

But this may bring some side effects, as current aggregations assume that every delete removes an existing record. I don't have a complete solution yet. Maybe we need some refactoring to adapt to this. I will think more about it. Thanks.

was (Author: hequn8128):

Hi, [~fhueske] Great to hear your suggestions. I agree with you, so let's start with the {{DataStream}} to {{Table}} conversion first. I like your proposal, especially the upsert check. Besides, we may also need to support ingesting delete messages. Something like:

{code:java}
DataStream[(String, Long, Int, Boolean)] input = ???
Table table = tEnv.upsertFromStream(input, 'a, 'b.rowtime.upsertOrder, 'c.key, 'd.isDelete)
{code}

But this may bring some side effects, as current aggregations assume that every delete removes an existing record. I don't have a complete solution yet. Maybe we may need some refactoring to adapt to this. I will think more about it. Thanks.
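Note that {{tEnv.upsertFromStream}} and the {{isDelete}} attribute above are proposed, not existing Flink API. The intended semantics of the per-record delete flag can be sketched in plain Java (record and method names here are illustrative only):

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative interpretation of the proposed per-record isDelete flag. */
public class UpsertWithDeletes {
    /** One changelog message: an upsert value, or a delete when isDelete is set. */
    public record Message(String key, String value, boolean isDelete) {}

    public static Map<String, String> apply(Message[] messages) {
        Map<String, String> table = new HashMap<>();
        for (Message m : messages) {
            if (m.isDelete()) {
                table.remove(m.key()); // delete by key, even if the key was never inserted
            } else {
                table.put(m.key(), m.value());
            }
        }
        return table;
    }
}
```

The side effect the comment warns about is visible here: a delete for an absent key is silently a no-op, whereas retract-based aggregations assume each delete matches a previously inserted record.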