There you have your explanation. A loop actually has to be a loop for it to
work in Flink.
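
In the code quoted below, newCentroids is built from points rather than from loop, so closeWith rejects it: the feedback stream has to be derived from the IterativeStream itself (for example by applying the map to loop). The feedback cycle that k-means needs can be sketched in plain Java, without Flink, roughly as follows. This is only an illustration under simplifying assumptions: the class KMeansFeedbackSketch, its step and iterate methods, and the one-dimensional sample points are hypothetical names made up for this example, not part of Flink or of the original StockAnalysis code.

```java
import java.util.Arrays;

// Hypothetical, Flink-free sketch of the k-means feedback cycle.
final class KMeansFeedbackSketch {

    // One iteration step: assign every point to its nearest centroid,
    // then return the mean of each cluster as the new centroids.
    static double[] step(double[] points, double[] centroids) {
        double[] sum = new double[centroids.length];
        int[] count = new int[centroids.length];
        for (double p : points) {
            int nearest = 0;
            for (int c = 1; c < centroids.length; c++) {
                if (Math.abs(p - centroids[c]) < Math.abs(p - centroids[nearest])) {
                    nearest = c;
                }
            }
            sum[nearest] += p;
            count[nearest]++;
        }
        double[] next = new double[centroids.length];
        for (int c = 0; c < centroids.length; c++) {
            // A centroid with no assigned points keeps its old position.
            next[c] = count[c] == 0 ? centroids[c] : sum[c] / count[c];
        }
        return next;
    }

    // The loop: the output of each step is fed back as the input of the next.
    // The new centroids are always computed from the current centroids.
    static double[] iterate(double[] points, double[] initial, int rounds) {
        double[] centroids = initial.clone();
        for (int i = 0; i < rounds; i++) {
            centroids = step(points, centroids);
        }
        return centroids;
    }

    public static void main(String[] args) {
        double[] points = {1.0, 2.0, 3.0, 10.0, 11.0, 12.0};
        double[] result = iterate(points, new double[] {0.0, 5.0}, 10);
        System.out.println(Arrays.toString(result));
    }
}
```

The point of the sketch is the data dependency in iterate: every new set of centroids is derived from the previous one. In the DataStream version that dependency is what makes the feedback stream "originate from" the iteration.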

On Sat, 14 May 2016 at 16:35 subash basnet <yasub...@gmail.com> wrote:

> Hello,
>
> I had to declare loop as a global variable,
> private static IterativeStream<Centroid> *loop*;
> because in the DataStream API I cannot broadcast it the way the DataSet
> API allows.
>
> I tried to use *closeWith* as in the DataSet API, as shown below, but in
> the DataStream API it throws an exception:
> DataStream<Centroid> finalCentroids = *loop*.closeWith(newCentroids);
>
>
> Exception in thread "main" java.lang.UnsupportedOperationException: *Cannot
> close an iteration with a feedback DataStream that does not originate from
> said iteration.*
> at org.apache.flink.streaming.api.datastream.IterativeStream.closeWith(IterativeStream.java:75)
> at wikiedits.StockAnalysis.main(StockAnalysis.java:64)
>
>
> Best Regards,
> Subash Basnet
>
>
> On Sat, May 14, 2016 at 4:26 PM, subash basnet <yasub...@gmail.com> wrote:
>
>> Hello Aljoscha,
>>
>> Below is the shortened version of the StockAnalysis class, which is a
>> DataStream adaptation of the *KMeans.java* DataSet code.
>>
>> public class StockAnalysis {
>>     public static void main(String args[]) {
>>         DataStream<Centroid> centroids = newCentroidDataStream.map(new TupleCentroidConverter());
>>         *loop* = centroids.iterate(10);
>>         DataStream<Centroid> newCentroids = points.map(new SelectNearestCenter())
>>                 .map(new CountAppender()).keyBy(0)
>>                 .reduce(new CentroidAccumulator()).map(new CentroidAverager());
>>
>>         public static final class SelectNearestCenter extends RichMapFunction<Point, Tuple2<String, Point>> {
>>             private Collection<Centroid> centroids;
>>
>>             @Override
>>             public void open(Configuration parameters) throws Exception {
>>                 Iterator<Centroid> iter = DataStreamUtils.collect(*loop*);
>>                 this.*centroids* = Lists.newArrayList(iter);
>>             }
>>
>>             @Override
>>             public Tuple2<String, Point> map(Point p) throws Exception {
>>                 for (Centroid centroid : *centroids*) {
>>                 }...................
>>             }
>>         }
>>     }
>>
>> }
>>
>>
>> On Sun, May 8, 2016 at 7:10 AM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>>
>>> Could you please post your code.
>>>
>>> On Sat, 7 May 2016 at 19:16 subash basnet <yasub...@gmail.com> wrote:
>>>
>>>> Hello all,
>>>>
>>>> I am getting the error below when I call execute() on the
>>>> StreamExecutionEnvironment.
>>>>
>>>>
>>>> *Caused by: java.lang.IllegalStateException: Iteration
>>>> FeedbackTransformation{id=15, name='Feedback',
>>>> outputType=PojoType<wikiedits.StockAnalysis$Centroid, fields = [id: String,
>>>> pt: BasicArrayTypeInfo<Double>]>, parallelism=4} does not have any feedback
>>>> edges.*
>>>> The run method inside the thread class of DataStreamUtils handles this
>>>> exception:
>>>> @Override
>>>> public void run() {
>>>>     try {
>>>>         stream.getExecutionEnvironment().execute();
>>>>     } catch (Exception e) {
>>>>         throw new RuntimeException("Exception in execute()", e);
>>>>     }
>>>> }
>>>>
>>>> I cannot work out what this error message is telling me or how to
>>>> fix it.
>>>>
>>>>
>>>> Best Regards,
>>>> Subash Basnet
>>>>
>>>
>>>
>>
>
