[jira] [Commented] (FLINK-3659) Add ConnectWithBroadcast Operation
[ https://issues.apache.org/jira/browse/FLINK-3659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15629167#comment-15629167 ]

Aljoscha Krettek commented on FLINK-3659:
-----------------------------------------

Yep ;-)

> Add ConnectWithBroadcast Operation
> ----------------------------------
>
>                 Key: FLINK-3659
>                 URL: https://issues.apache.org/jira/browse/FLINK-3659
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 1.0.0
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>
> We should add a new operation that has a main input that can be keyed (but
> doesn't have to be) and a second input that is always broadcast. This is
> similar to a {{CoFlatMap}} or {{CoMap}}, but there both inputs have to be
> either keyed or non-keyed.
> This builds on FLINK-4940, which aims at adding broadcast/global state. When
> processing an element from the broadcast input, only access to broadcast state
> is allowed. When processing an element from the main input, both the
> regular keyed state and the broadcast state can be accessed.
> I'm proposing this as an intermediate/low-level operation because it will
> probably take a while until we add support for side-inputs in the API. This
> new operation would allow expressing new patterns that cannot be expressed
> with the currently available operations.
> This is the new proposed API (names are non-final):
> 1) Add {{DataStream.connectWithBroadcast(DataStream)}} and
> {{KeyedStream.connectWithBroadcast(DataStream)}}
> 2) Add {{ConnectedWithBroadcastStream}}, akin to {{ConnectedStreams}}
> 3) Add {{BroadcastFlatMap}} and {{TimelyBroadcastFlatMap}} as the user
> functions.
> Sketch of the user function:
> {code}
> interface BroadcastFlatMapFunction<IN, BIN, OUT> {
>   public void flatMap(IN in, Collector<OUT> out);
>   public void processBroadcastInput(BIN in);
> }
> {code}
> The API names and function names are a bit verbose, and we have to add two new
> ones, but I don't see a way around this with the current way the
> Flink API works.
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
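
To make the proposed user function concrete, here is a minimal sketch of something that could implement the interface from the issue description. It is self-contained and does not use Flink: {{Collector}} is a local stand-in, the type parameters {{<IN, BIN, OUT>}} are assumed, {{BlocklistFilter}} is a hypothetical example, and a plain field stands in for the broadcast state that FLINK-4940 would provide.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Local stand-in for Flink's Collector so the sketch compiles on its own.
interface Collector<T> {
    void collect(T record);
}

// The interface from the issue description, with assumed type parameters.
interface BroadcastFlatMapFunction<IN, BIN, OUT> {
    void flatMap(IN in, Collector<OUT> out);
    void processBroadcastInput(BIN in);
}

// Hypothetical user function: drops main-input words that were seen on the broadcast input.
class BlocklistFilter implements BroadcastFlatMapFunction<String, String, String> {
    // Plain field standing in for the broadcast state proposed in FLINK-4940.
    private final Set<String> blocked = new HashSet<>();

    @Override
    public void flatMap(String word, Collector<String> out) {
        // Main input: forward the word unless it has been blocked.
        if (!blocked.contains(word)) {
            out.collect(word);
        }
    }

    @Override
    public void processBroadcastInput(String word) {
        // Broadcast input: only the broadcast-side data is updated here.
        blocked.add(word);
    }
}

class BlocklistFilterDemo {
    public static void main(String[] args) {
        BlocklistFilter filter = new BlocklistFilter();
        List<String> output = new ArrayList<>();
        filter.processBroadcastInput("spam");
        filter.flatMap("hello", output::add);
        filter.flatMap("spam", output::add);
        System.out.println(output); // prints [hello]
    }
}
```

In a real job every parallel instance would receive the same broadcast elements, which is why the blocklist can live in broadcast state rather than keyed state.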
[jira] [Commented] (FLINK-3659) Add ConnectWithBroadcast Operation
[ https://issues.apache.org/jira/browse/FLINK-3659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15628205#comment-15628205 ]

Gyula Fora commented on FLINK-3659:
-----------------------------------

I agree that we should try to keep the API as it is now (if possible), and just do runtime checks whether the user is reading/updating the correct types of states from the correct inputs. This goes back to the direction of your original PR with some additional runtime checks.
[jira] [Commented] (FLINK-3659) Add ConnectWithBroadcast Operation
[ https://issues.apache.org/jira/browse/FLINK-3659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15626962#comment-15626962 ]

Jamie Grier commented on FLINK-3659:
------------------------------------

I would like to suggest that rather than adding a new API method, connectWithBroadcast(), we just enable this functionality via the current API and check access to state variants at runtime. In other words, all of the following will work:

{code:java}
DataStream stream = new DataStream();
DataStream keyedStream = new DataStream().keyBy("...");
DataStream broadcastStream = new DataStream().broadcast();

stream.connect(stream);
stream.connect(keyedStream);
stream.connect(broadcastStream);

keyedStream.connect(stream);
keyedStream.connect(keyedStream);
keyedStream.connect(broadcastStream);

broadcastStream.connect(stream);
broadcastStream.connect(keyedStream);
broadcastStream.connect(broadcastStream);
{code}

... and based on the actual input types on the LHS and RHS of these connected streams, we check at runtime what they can do. For example:

{code:java}
keyedStream.connect(broadcastStream).flatMap(...)
{code}

In the above, the user can access the keyed and broadcast state from the flatMap1() method (LHS) and can access only broadcast state from the flatMap2() method (RHS).

The reason I suggest this is that it keeps the API simpler and more intuitive, and there aren't any new APIs to learn -- other than for the new broadcast state access itself. People are already building things exactly this way -- they are just being forced to use Checkpointed to make their state fault-tolerant. This allows the same API as before, just with some additional capabilities, and it will work properly with re-scalable state.

In a future version of Flink (2.0+), maybe we can start to think about @annotation-based APIs more like the current Beam approach, which I think is very nice. It allows both flexible and dynamic API evolution as well as "static" verification. Anyway, maybe in the future we could do something more like this:

{code:java}
class MyCoFlatMap { // doesn't even need to extend anything

    @FlatMap(input=KeyedStream)
    void dataFunc(@BroadcastState("name") String s, @KeyedState("name") Integer i) { … }

    @FlatMap(input=BroadcastStream)
    void controlFunc(@BroadcastState("name") String s) { … }
}
{code}
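
The rule in the comment above (what each side of a connected stream may access depends on the kind of input it is) can be written down as a small lookup. This is only a sketch of that rule, not Flink code: {{InputKind}}, {{StateKind}} and {{StateAccessRules}} are hypothetical names, and the row for a plain (neither keyed nor broadcast) input is an assumption, since the thread only spells out the keyed and broadcast cases.

```java
import java.util.EnumSet;
import java.util.Set;

// All names below are hypothetical; they are not Flink classes.
enum InputKind { PLAIN, KEYED, BROADCAST }

enum StateKind { KEYED_STATE, BROADCAST_STATE }

class StateAccessRules {
    // State a function may touch while processing an element from the given input.
    static Set<StateKind> allowedFor(InputKind input) {
        if (input == InputKind.KEYED) {
            // Keyed input: both keyed and broadcast state, as described for flatMap1().
            return EnumSet.of(StateKind.KEYED_STATE, StateKind.BROADCAST_STATE);
        }
        // Assumption: a plain or broadcast input has no current key,
        // so only broadcast state is available.
        return EnumSet.of(StateKind.BROADCAST_STATE);
    }
}

class StateAccessRulesDemo {
    public static void main(String[] args) {
        // Enumerate the nine connect() combinations listed in the comment.
        for (InputKind lhs : InputKind.values()) {
            for (InputKind rhs : InputKind.values()) {
                System.out.println(lhs + ".connect(" + rhs + "): flatMap1 -> "
                        + StateAccessRules.allowedFor(lhs) + ", flatMap2 -> "
                        + StateAccessRules.allowedFor(rhs));
            }
        }
    }
}
```

Because the rule depends only on the kind of each input, the existing connect() signature would not need to change; the check would happen when state is accessed.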
[jira] [Commented] (FLINK-3659) Add ConnectWithBroadcast Operation
[ https://issues.apache.org/jira/browse/FLINK-3659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15625748#comment-15625748 ]

Aljoscha Krettek commented on FLINK-3659:
-----------------------------------------

[~gyfora] Yes, access to keyed state would throw an exception when not processing elements from the keyed input. I would also assume that the broadcast state is the same on all nodes for checkpointing.
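
One way to picture the "throw an exception" behaviour is a guard that tracks whether a key is currently set. The sketch below is a plain-Java illustration under that assumption; {{KeyedStateGuard}} and its methods are hypothetical and do not correspond to any Flink class, and a per-key counter stands in for keyed value state.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical guard; none of these names come from Flink.
class KeyedStateGuard {
    private final Map<String, Integer> countsByKey = new HashMap<>();
    // Null while a broadcast element is being processed.
    private String currentKey;

    // Called (by an imagined runtime) before a main-input element is processed.
    void startKeyedElement(String key) {
        currentKey = key;
    }

    // Called before a broadcast-input element is processed.
    void startBroadcastElement() {
        currentKey = null;
    }

    // Keyed state access: increments and returns the counter for the current key.
    int incrementCount() {
        if (currentKey == null) {
            throw new IllegalStateException(
                    "Keyed state is not accessible while processing the broadcast input");
        }
        return countsByKey.merge(currentKey, 1, Integer::sum);
    }
}

class KeyedStateGuardDemo {
    public static void main(String[] args) {
        KeyedStateGuard guard = new KeyedStateGuard();
        guard.startKeyedElement("user-1");
        System.out.println(guard.incrementCount()); // prints 1

        guard.startBroadcastElement();
        try {
            guard.incrementCount();
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```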
[jira] [Commented] (FLINK-3659) Add ConnectWithBroadcast Operation
[ https://issues.apache.org/jira/browse/FLINK-3659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15625089#comment-15625089 ]

Gyula Fora commented on FLINK-3659:
-----------------------------------

(I have a workshop after lunch, so I might be slow to respond :()
[jira] [Commented] (FLINK-3659) Add ConnectWithBroadcast Operation
[ https://issues.apache.org/jira/browse/FLINK-3659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15625086#comment-15625086 ]

Gyula Fora commented on FLINK-3659:
-----------------------------------

Hi,

Would this work the way connected streams work now, but without the partitioning limitation, and with Value state access "blocked" by some flag while we are in the method that processes the broadcast input? Or do we assume the broadcast state to be the same on all nodes for checkpointing?
[jira] [Commented] (FLINK-3659) Add ConnectWithBroadcast Operation
[ https://issues.apache.org/jira/browse/FLINK-3659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15625003#comment-15625003 ]

Aljoscha Krettek commented on FLINK-3659:
-----------------------------------------

[~gyfora] I updated the issue, what do you think about this?