Hello Flinksters,

I am trying to use Flinkspector in a Scala code snippet of mine, and Flink
throws an exception at runtime. The code is here:

---------------------------------------------------------------------------------------------------------------

case class Reading(field1:String,field2:String,field3:Int)

object MultiWindowing {

  def main(args: Array[String]) {}

  //  WindowFunction<IN,OUT,KEY,W extends Window>

  class WindowPrinter extends WindowFunction[Reading, String, String, TimeWindow] {

      //  .....
    }
  }

  val env = DataStreamTestEnvironment.createTestEnvironment(1)

  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val input: EventTimeInput[Reading]  =
    EventTimeInputBuilder
    .startWith(Reading("hans", "elephant", 15))
    .emit(Reading("susi", "arctic", 20), After.period(30, TimeUnit.SECONDS))
    .emit(Reading("pete", "elephant", 40), After.period(20, TimeUnit.SECONDS))

  //acquire data source from input
  val stream = env.fromInput(input)

  //apply transformation
  val k = stream.keyBy(new KeySelector [Reading,String] {
    def getKey(r:Reading) =  r.field2
  })
    .timeWindow(Time.of(5, TimeUnit.MINUTES), Time.of(1, TimeUnit.MINUTES))

  k.sum(3)
    .print()

  env.execute()

}

---------------------------------------------------------------------------------------------------------------

And at runtime, I get this error:

----------------------------------------------------------------------------------------------------------------

Exception in thread "main" java.lang.ExceptionInInitializerError
at org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing.main(MultiWindowing.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.lang.IndexOutOfBoundsException: Not 0th field selected for a simple type (non-tuple, non-array).
at org.apache.flink.streaming.util.FieldAccessor.create(FieldAccessor.java:76)
at org.apache.flink.streaming.api.functions.aggregation.SumAggregator.<init>(SumAggregator.java:37)
at org.apache.flink.streaming.api.datastream.WindowedStream.sum(WindowedStream.java:373)
at org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing$.<init>(MultiWindowing.scala:63)
at org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing$.<clinit>(MultiWindowing.scala)
... 6 more


---------------------------------------------------------------------------------------------------------------

Can someone help me by pointing out the mistake I am making?

-- Nirmalya

-- 
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is
where they should be.
Now put the foundation under them."
