Hello Flinksters, I am trying to use Flinkspector in a Scala code snippet of mine and Flink is complaining. The code is here:
--------------------------------------------------------------------------------------------------------------- case class Reading(field1:String,field2:String,field3:Int) object MultiWindowing { def main(args: Array[String]) {} // WindowFunction<IN,OUT,KEY,W extends Window> class WindowPrinter extends WindowFunction[Reading, String, String, TimeWindow] { // ..... } } val env = DataStreamTestEnvironment.createTestEnvironment(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val input: EventTimeInput[Reading] = EventTimeInputBuilder .startWith(Reading("hans", "elephant", 15)) .emit(Reading("susi", "arctic", 20), After.period(30, TimeUnit.SECONDS)) .emit(Reading("pete", "elephant", 40), After.period(20, TimeUnit.SECONDS)) //acquire data source from input val stream = env.fromInput(input) //apply transformation val k = stream.keyBy(new KeySelector [Reading,String] { def getKey(r:Reading) = r.field2 }) .timeWindow(Time.of(5, TimeUnit.MINUTES), Time.of(1, TimeUnit.MINUTES)) k.sum(3) .print() env.execute() } --------------------------------------------------------------------------------------------------------------- And at runtime, I get this error: ---------------------------------------------------------------------------------------------------------------- Exception in thread "main" java.lang.ExceptionInInitializerError at org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing.main(MultiWindowing.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) Caused by: java.lang.IndexOutOfBoundsException: Not 0th field selected for a simple type (non-tuple, non-array). at org.apache.flink.streaming.util.FieldAccessor.create(FieldAccessor.java:76) at org.apache.flink.streaming.api.functions.aggregation.SumAggregator.<init>(SumAggregator.java:37) at org.apache.flink.streaming.api.datastream.WindowedStream.sum(WindowedStream.java:373) at org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing$.<init>(MultiWindowing.scala:63) at org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing$.<clinit>(MultiWindowing.scala) ... 6 more --------------------------------------------------------------------------------------------------------------- Can someone help me by pointing out the mistake I am making? -- Nirmalya -- Software Technologist http://www.linkedin.com/in/nirmalyasengupta "If you have built castles in the air, your work need not be lost. That is where they should be. Now put the foundation under them."