Hi Dan,
If there is no way of setting a uid(), that sounds like a bug in the
API that should be fixed. Feel free to open an issue for it.
Regards,
Timo
On 13.12.21 08:19, Schwalbe Matthias wrote:
Hi Dan,
When I run into this kind of problem, I consider using the not-so-@Public API levels:
* First of all, uids are mainly needed for operators that hold state
and are less important for operators that don’t hold state
primitives; I am not sure of the implications created by
disableAutoGeneratedUIDs
* A DataStream is actually a Transformation assigned to the
StreamExecutionEnvironment (see DataStream#getTransformation())
* You can assign a name and a uid directly to Transformations
(setName() and setUid())
* Transformations expose their input transformations:
Transformation#getInputs()
* This way you can locate the two Map transformations and assign
uids to them
* However, the two maps are stateless and technically don’t need a uid
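A rough sketch of the walk described in the list above, in case it helps. To keep it runnable without a Flink dependency, it uses a hypothetical stand-in class `Node` instead of Flink's `Transformation`; `Node`, `UidAssigner`, `assignMissingUids`, and the uid prefix are all made up for illustration. In a real job the corresponding calls would presumably be `DataStream#getTransformation()` for the starting point, `Transformation#getInputs()` for the upstream step, and `Transformation#setUid(String)` for the assignment; please check them against your Flink version.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the small part of a transformation that the walk
// needs (name, uid, inputs). This is NOT Flink's API.
class Node {
    final String name;
    final List<Node> inputs;
    String uid; // null means "no uid assigned yet"

    Node(String name, String uid, Node... inputs) {
        this.name = name;
        this.uid = uid;
        this.inputs = List.of(inputs);
    }
}

class UidAssigner {
    // Walks upstream from `start` (depth-first, inputs in declaration order) and
    // assigns "<uidPrefix>-1", "<uidPrefix>-2", ... to every node that is named
    // `targetName` and has no uid yet. Returns the number of uids assigned.
    static int assignMissingUids(Node start, String targetName, String uidPrefix) {
        List<Node> matches = new ArrayList<>();
        collect(start, targetName, new HashSet<>(), matches);
        for (int i = 0; i < matches.size(); i++) {
            matches.get(i).uid = uidPrefix + "-" + (i + 1);
        }
        return matches.size();
    }

    private static void collect(Node node, String targetName, Set<Node> visited, List<Node> matches) {
        if (!visited.add(node)) {
            return; // already visited via another path
        }
        if (node.uid == null && node.name.equals(targetName)) {
            matches.add(node);
        }
        for (Node input : node.inputs) {
            collect(input, targetName, visited, matches);
        }
    }

    public static void main(String[] args) {
        // Assumed shape of a coGroup graph: two sources, two tagging maps,
        // a union, a keyed partition, and the window operator.
        Node sourceA = new Node("Source A", "source-a");
        Node sourceB = new Node("Source B", "source-b");
        Node mapA = new Node("Map", null, sourceA);
        Node mapB = new Node("Map", null, sourceB);
        Node union = new Node("Union", null, mapA, mapB);
        Node keyBy = new Node("Partition", null, union);
        Node window = new Node("Window", "cogroup-window", keyBy);

        int assigned = assignMissingUids(window, "Map", "cogroup-tag");
        System.out.println(assigned + " uids assigned: " + mapA.uid + ", " + mapB.uid);
    }
}
```

Matching on the operator name alone is a simplification: any other upstream operator that is called "Map" and has no uid would be picked up as well, so the walk should probably stop at the inputs you passed to coGroup.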
What do you think?
Thias
*From:* Dan Hill <quietgol...@gmail.com>
*Sent:* Monday, 13 December 2021 06:30
*To:* user <user@flink.apache.org>
*Subject:* CoGroupedStreams and disableAutoGeneratedUIDs
Hi. I tried to use CoGroupedStreams w/ disableAutoGeneratedUIDs.
CoGroupedStreams creates two map operators without the ability to set
uids on them. These appear as "Map" in my operator graph. I noticed
that the CoGroupedStreams.apply function has two map calls without
setting uids. If I try to run with disableAutoGeneratedUIDs, I get the
following error "java.lang.IllegalStateException: Auto generated UIDs
have been disabled but no UID or hash has been assigned to operator Map".
How can I fix this? Extend the base CoGroupedStreams class?
```
public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
    function = (CoGroupFunction) this.input1.getExecutionEnvironment().clean(function);
    CoGroupedStreams.UnionTypeInfo<T1, T2> unionType =
            new CoGroupedStreams.UnionTypeInfo(this.input1.getType(), this.input2.getType());
    CoGroupedStreams.UnionKeySelector<T1, T2, KEY> unionKeySelector =
            new CoGroupedStreams.UnionKeySelector(this.keySelector1, this.keySelector2);

    // First map() call: no uid is set on the resulting operator.
    DataStream<CoGroupedStreams.TaggedUnion<T1, T2>> taggedInput1 =
            this.input1
                    .map(new CoGroupedStreams.Input1Tagger())
                    .setParallelism(this.input1.getParallelism())
                    .returns(unionType);
    // Second map() call: no uid is set here either.
    DataStream<CoGroupedStreams.TaggedUnion<T1, T2>> taggedInput2 =
            this.input2
                    .map(new CoGroupedStreams.Input2Tagger())
                    .setParallelism(this.input2.getParallelism())
                    .returns(unionType);
    DataStream<CoGroupedStreams.TaggedUnion<T1, T2>> unionStream =
            taggedInput1.union(new DataStream[]{taggedInput2});

    this.windowedStream =
            (new KeyedStream(unionStream, unionKeySelector, this.keyType)).window(this.windowAssigner);
    if (this.trigger != null) {
        this.windowedStream.trigger(this.trigger);
    }
    if (this.evictor != null) {
        this.windowedStream.evictor(this.evictor);
    }
    if (this.allowedLateness != null) {
        this.windowedStream.allowedLateness(this.allowedLateness);
    }

    return this.windowedStream.apply(new CoGroupedStreams.CoGroupWindowFunction(function), resultType);
}
```