Thanks a lot for the replies.

Below I paste my code:


        DataStreamSource<Tuple> source = env.addSource(new MySource());
        KeyedStream<Tuple, Integer> keyedStream =
DataStreamUtils.reinterpretAsKeyedStream(source, new DummyKeySelector(),
TypeInformation.of(Integer.class) );
        keyedStream.timeWindow(Time.seconds(1)).apply(new
WindowFunction<Tuple, Object, Integer, TimeWindow>() {
            @Override
            public void apply(Integer integer, TimeWindow timeWindow,
Iterable<Tuple> iterable, Collector<Object> collector) throws Exception {
                collector.collect(1);
            }
        });
        env.execute("Test");

    static class DummyKeySelector implements KeySelector<Tuple, Integer> {

        @Override
        public Integer getKey(Tuple value) throws Exception {
            return value.getSourceID();
        }
    }

    static class MySource extends RichParallelSourceFunction<Tuple> {
        public MySource() {
            this.sourceID = sourceID;
        }
        @Override
        public void open(Configuration parameters) throws Exception {
            sourceID = sourceID +
getRuntimeContext().getIndexOfThisSubtask();
        }

        @Override
        public void run(SourceContext<Tuple> ctx) throws Exception {
            while (true) {
                Tuple tuple = new Tuple(sourceID);
                ctx.collect(tuple);
            }
        }

        @Override
        public void cancel() {

        }
    }


Whatever I do, I get
Caused by: java.lang.IllegalArgumentException
    at
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)

When I check the details from the source code, it seems that some keys are
not within allowed key range, that is why Flink throws an exception.
In this case, as Konstantin said, it is not possible to interpret source as
keyed.
Please correct me if I am wrong.


Thanks,
Adrienne







On Wed, Apr 3, 2019 at 8:08 PM Konstantin Knauf <konstan...@ververica.com>
wrote:

> Hi Adrienne,
>
> you can only use DataStream#reinterpretAsKeyedStream on a stream, which
> has previously been keyed/partitioned by Flink with exactly the same
> KeySelector as given to reinterpretAsKeyedStream. It does not work with a
> key-partitioned stream, which has been partitioned by any other process.
>
> Best,
>
> Konstantin
>
> On Fri, Mar 29, 2019 at 11:47 PM Rong Rong <walter...@gmail.com> wrote:
>
>> Hi Adrienne,
>>
>> I think you should be able to reinterpretAsKeyedStream by passing in a
>> DataStreamSource based on the ITCase example [1].
>> Can you share the full code/error logs or the IAE?
>>
>> --
>> Rong
>>
>> [1]
>> https://github.com/apache/flink/blob/release-1.7.2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/ReinterpretDataStreamAsKeyedStreamITCase.java#L98
>>
>> On Fri, Mar 29, 2019 at 6:09 AM Adrienne Kole <adrienneko...@gmail.com>
>> wrote:
>>
>>> Dear community,
>>>
>>> I have a use-case where sources are keyed.
>>> For example, there is a source function with parallelism 10, and each
>>> instance has its own key.
>>> I used reinterpretAsKeyedStream to convert source DataStream to
>>> KeyedStream, however, I get an IllegalArgument exception.
>>> Is reinterpretAsKeyedStream can be used with source operators as well,
>>> or should the operator to be used be already partitioned (by keyby(..)) ?
>>>
>>> Thanks,
>>> Adrienne
>>>
>>
>
> --
>
> Konstantin Knauf | Solutions Architect
>
> +49 160 91394525
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Data Artisans GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>

Reply via email to