Hi James,

Can you elaborate on why the "Broadcast State Pattern"[1] does not work for you? I'd definitely recommend that approach.
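A minimal sketch of the broadcast state pattern from [1], applied to this use case, follows. Calling `broadcast()` with no arguments only changes the partitioning of the stream. `broadcast(descriptor)` instead returns a `BroadcastStream`, which a `KeyedStream` can connect to without the price stream having a key. The `RefData` and `Price` POJOs, their `accountId`/`instrument`/`value` fields, the idea that prices are looked up by instrument, and the `String` output are all hypothetical placeholders for your real types. `RefDataPriceJoiner` here extends `KeyedBroadcastProcessFunction` rather than `KeyedCoProcessFunction`.

```java
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastJoinSketch {

    // Hypothetical POJO: the high-volume stream, keyed on accountId.
    public static class RefData {
        public String accountId;
        public String instrument; // hypothetical field used to look up a price
        public RefData() {}
        public RefData(String accountId, String instrument) {
            this.accountId = accountId;
            this.instrument = instrument;
        }
    }

    // Hypothetical POJO: the small price stream, which has no accountId.
    public static class Price {
        public String instrument;
        public double value;
        public Price() {}
        public Price(String instrument, double value) {
            this.instrument = instrument;
            this.value = value;
        }
    }

    // Broadcast state shared by every parallel instance: instrument -> latest price.
    public static final MapStateDescriptor<String, Double> PRICES_DESCRIPTOR =
            new MapStateDescriptor<>("prices", Types.STRING, Types.DOUBLE);

    public static class RefDataPriceJoiner
            extends KeyedBroadcastProcessFunction<String, RefData, Price, String> {

        @Override
        public void processElement(RefData ref, ReadOnlyContext ctx, Collector<String> out)
                throws Exception {
            // The keyed side may only read the broadcast state.
            ReadOnlyBroadcastState<String, Double> prices = ctx.getBroadcastState(PRICES_DESCRIPTOR);
            Double latest = prices.get(ref.instrument);
            out.collect(ref.accountId + "/" + ref.instrument + " -> "
                    + (latest == null ? "no price yet" : latest));
        }

        @Override
        public void processBroadcastElement(Price price, Context ctx, Collector<String> out)
                throws Exception {
            // Every parallel instance receives every price and stores it locally.
            BroadcastState<String, Double> prices = ctx.getBroadcastState(PRICES_DESCRIPTOR);
            prices.put(price.instrument, price.value);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KeyedStream<RefData, String> keyedRefDataStream = env
                .fromElements(new RefData("acc-1", "XYZ"), new RefData("acc-2", "ABC"))
                .keyBy(r -> r.accountId, Types.STRING);

        DataStream<Price> prices = env.fromElements(new Price("XYZ", 101.5), new Price("ABC", 42.0));

        // No key needed on the price stream: it is replicated to all instances.
        BroadcastStream<Price> broadcastPrices = prices.broadcast(PRICES_DESCRIPTOR);

        keyedRefDataStream
                .connect(broadcastPrices)
                .process(new RefDataPriceJoiner())
                .print();

        env.execute("broadcast-state-sketch");
    }
}
```

The two inputs carry no ordering guarantee relative to each other, so a `RefData` element can arrive before the price it needs. A real job would probably buffer such elements in keyed state rather than emit "no price yet" as this sketch does.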
I highly discourage this usage, but if you insist, you could copy the ConnectedStreams#transform method and remove the check that ensures both inputs of the operator are either both keyed or both non-keyed.

Best,

Dawid

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/

On 06/09/2021 18:02, James Sandys-Lumsdaine wrote:
> Hello,
>
> I have a Flink workflow which is partitioned on a key common to all
> the stream objects and a key that is best suited to the high volume of
> data I am processing. I now want to add in a new stream of prices that
> I want to make available to all partitioned streams - however, this
> new stream of prices does not have this common keyBy value.
>
> I have tried writing a piece of code using the broadcast() method (no
> args) to get this new price stream to be sent to all the parallel
> instances of an operator. The code looks like this:
>
> KeyedStream<RefData> keyedRefDataStream = ....;
>
> DataStream<Price> prices = ....;
> DataStream<Price> broadcastPrices = prices.broadcast();
>
> keyedRefDataStream
>     .connect(broadcastPrices)
>     .process(new RefDataPriceJoiner()); // implements KeyedCoProcessFunction
>
> I then get an error saying the broadcastPrices stream must be keyed -
> but I can't key it on the same key as the refData stream because it
> lacks this field.
>
> I could reshuffle all my data by re-keying the ref data on a different
> field, but this would cause a huge amount of data to be sent over the
> network compared with broadcasting this much smaller amount of data to
> my keyed streams. Note I am assuming this isn't a "broadcast state"
> example - I assume the broadcast() method allows me to send data to
> all partitions.
>
> Is any of this possible? Any pointers would be very helpful, as I
> can't find an answer on the web or in the documentation.
>
> Many thanks,
>
> James.