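There you have your explanation. A loop actually has to be a loop for it to work in Flink: the stream you pass to closeWith() has to be derived from the IterativeStream itself. In your code, newCentroids is computed from points and never from loop, so there is nothing to feed back.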
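As a rough illustration of that rule, here is a minimal plain-Java sketch. It is a toy model, not Flink code: ToyStream, ToyIteration, and IterationSketch are hypothetical names invented for this example, and the two checks only imitate the two error messages quoted in this thread. The point is that a feedback stream is accepted only if the iteration head is among its transitive inputs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy stand-in for a stream: it only remembers which streams it was derived from. */
class ToyStream {
    final String name;
    final List<ToyStream> inputs = new ArrayList<>();

    ToyStream(String name, ToyStream... inputs) {
        this.name = name;
        this.inputs.addAll(Arrays.asList(inputs));
    }

    /** Derive a new single-input stream, the way map() or reduce() would. */
    ToyStream derive(String opName) {
        return new ToyStream(opName, this);
    }

    /** Every stream this one transitively depends on. */
    Set<ToyStream> predecessors() {
        Set<ToyStream> seen = new HashSet<>();
        collect(this, seen);
        return seen;
    }

    private static void collect(ToyStream stream, Set<ToyStream> seen) {
        for (ToyStream input : stream.inputs) {
            if (seen.add(input)) {
                collect(input, seen);
            }
        }
    }
}

/** Toy stand-in for an iteration head (hypothetical, for illustration only). */
class ToyIteration extends ToyStream {
    final List<ToyStream> feedbackEdges = new ArrayList<>();

    ToyIteration(ToyStream input) {
        super("iterate", input);
    }

    /** Accept the feedback stream only if it was derived from this iteration. */
    ToyStream closeWith(ToyStream feedback) {
        if (!feedback.predecessors().contains(this)) {
            throw new UnsupportedOperationException(
                "Cannot close an iteration with a feedback stream that does not originate from said iteration.");
        }
        feedbackEdges.add(feedback);
        return feedback;
    }

    /** An iteration that was never closed has no feedback edges and is invalid. */
    void validate() {
        if (feedbackEdges.isEmpty()) {
            throw new IllegalStateException("Iteration does not have any feedback edges.");
        }
    }
}

class IterationSketch {
    public static void main(String[] args) {
        ToyStream points = new ToyStream("points");
        ToyIteration loop = new ToyIteration(new ToyStream("centroids"));

        // Shape of the posted job: newCentroids is computed from points only.
        ToyStream detached = points.derive("selectNearest").derive("average");
        try {
            loop.closeWith(detached);
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }

        // Making the loop an input of the computation turns it into a real cycle.
        ToyStream attached = new ToyStream("selectNearest", points, loop).derive("average");
        loop.closeWith(attached);
        loop.validate();
        System.out.println("feedback edges: " + loop.feedbackEdges.size());
    }
}
```

In a real job this would probably mean feeding the loop stream into the operator that picks the nearest centroid (for example by connecting points with the loop stream) instead of reading it through DataStreamUtils.collect() in open(). That is an assumption about your setup, not tested code.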

On Sat, 14 May 2016 at 16:35 subash basnet <yasub...@gmail.com> wrote:

> Hello,
>
> I had to declare loop as a global variable,
>     private static IterativeStream<Centroid> loop;
> because in the DataStream API I cannot broadcast it the way the DataSet
> API allows.
>
> I tried to use *closeWith* in the DataStream API as in the DataSet API,
> as shown below, but it gives me an exception:
>     DataStream<Centroid> finalCentroids = loop.closeWith(newCentroids);
>
> Exception in thread "main" java.lang.UnsupportedOperationException: *Cannot
> close an iteration with a feedback DataStream that does not originate from
> said iteration.*
>     at org.apache.flink.streaming.api.datastream.IterativeStream.closeWith(IterativeStream.java:75)
>     at wikiedits.StockAnalysis.main(StockAnalysis.java:64)
>
> Best Regards,
> Subash Basnet
>
> On Sat, May 14, 2016 at 4:26 PM, subash basnet <yasub...@gmail.com> wrote:
>
>> Hello Aljoscha,
>>
>> Below is the shortened version of the StockAnalysis class, which is a
>> DataStream adaptation of the *KMeans.java* DataSet code.
>>
>> public class StockAnalysis{
>>     public static void main(String args[]){
>>         DataStream<Centroid> centroids = newCentroidDataStream.map(new TupleCentroidConverter());
>>         loop = centroids.iterate(10);
>>         DataStream<Centroid> newCentroids = points.map(new SelectNearestCenter()).map(new CountAppender()).keyBy(0)
>>             .reduce(new CentroidAccumulator()).map(new CentroidAverager());
>>         public static final class SelectNearestCenter extends RichMapFunction<Point, Tuple2<String, Point>> {
>>             private Collection<Centroid> centroids;
>>             @Override
>>             public void open(Configuration parameters) throws Exception {
>>                 Iterator<Centroid> iter = DataStreamUtils.collect(loop);
>>                 this.centroids = Lists.newArrayList(iter);
>>             }
>>             @Override
>>             public Tuple2<String, Point> map(Point p) throws Exception {
>>                 for (Centroid centroid : centroids) {
>>                 }...................
>>             }
>>         }
>>     }
>>
>> }
>>
>> On Sun, May 8, 2016 at 7:10 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>>> Could you please post your code?
>>>
>>> On Sat, 7 May 2016 at 19:16 subash basnet <yasub...@gmail.com> wrote:
>>>
>>>> Hello all,
>>>>
>>>> I am getting the error below when executing the
>>>> StreamExecutionEnvironment.
>>>>
>>>> *Caused by: java.lang.IllegalStateException: Iteration
>>>> FeedbackTransformation{id=15, name='Feedback',
>>>> outputType=PojoType<wikiedits.StockAnalysis$Centroid, fields = [id: String,
>>>> pt: BasicArrayTypeInfo<Double>]>, parallelism=4} does not have any feedback
>>>> edges.*
>>>>
>>>> The run method inside the thread class of DataStreamUtils handles this
>>>> exception:
>>>>
>>>>     @Override
>>>>     public void run(){
>>>>         try {
>>>>             stream.getExecutionEnvironment().execute();
>>>>         } catch (Exception e) {
>>>>             throw new RuntimeException("Exception in execute()", e);
>>>>         }
>>>>     }
>>>>
>>>> I cannot work out what to infer from this error message in order to
>>>> solve it.
>>>>
>>>> Best Regards,
>>>> Subash Basnet