Till,

Thanks for looking into this.

I have removed the toList() call from the collect() function to align the code
with what I generally do in a Flink application. It throws an exception, and
I can't figure out why.

*Here's my code (shortened for brevity):*

import scala.io.Source
import org.apache.flink.api.scala._

case class BuildingInformation(buildingID: Int, buildingManager: Int,
                               buildingAge: Int, productID: String, country: String)

object HVACReadingsAnalysis {

  def main(args: Array[String]): Unit = {

    val envDefault = ExecutionEnvironment.getExecutionEnvironment

    val buildings = readBuildingInfo(envDefault, "./SensorFiles/building.csv")

    buildings.print()

    envDefault.execute("HVAC Simulation")
  }

  private def readBuildingInfo(env: ExecutionEnvironment, inputPath: String) = {

    // [NS]: I can see the lines read correctly from the CSV file here
    println("As read from CSV file")
    println(Source.fromFile(inputPath).getLines().toList.mkString("#\n"))

    // [NS]: Then, I read the same file using the library function
    env.readCsvFile[BuildingInformation](
      inputPath,
      ignoreFirstLine = true,
      pojoFields = Array("buildingID", "buildingManager", "buildingAge", "productID", "country")
    )
  }
}
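For comparison, here is a plain-Scala sketch (no Flink involved) of roughly what readCsvFile does with ignoreFirstLine = true: drop the header row, split each remaining line on commas, and map the fields by position onto the case class. HeaderSkipSketch and parse are hypothetical names for illustration, quoted fields are not handled, and the sketch declares buildingManager as a String because the CSV rows carry values such as M1 that cannot be parsed as an Int.

```scala
// Assumption: buildingManager is a String, since the sample rows hold values like "M1".
case class BuildingInformation(buildingID: Int, buildingManager: String,
                               buildingAge: Int, productID: String, country: String)

object HeaderSkipSketch {
  /** Optionally drops the header line, then maps each comma-separated row
    * onto BuildingInformation by field position. No quoting support. */
  def parse(lines: Seq[String], ignoreFirstLine: Boolean = true): Seq[BuildingInformation] = {
    val body = if (ignoreFirstLine) lines.drop(1) else lines
    body.map { line =>
      val f = line.split(",", -1)
      require(f.length == 5, s"expected 5 fields, got ${f.length}: $line")
      BuildingInformation(f(0).toInt, f(1), f(2).toInt, f(3), f(4))
    }
  }
}
```

Without the header skip, the first row would fail on "BuildingID".toInt, which is the case ignoreFirstLine = true is meant to avoid.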


*Relevant portion of the output:*
As read from CSV file
BuildingID,BuildingMgr,BuildingAge,HVACproduct,Country#
1,M1,25,AC1000,USA#
2,M2,27,FN39TG,France#
3,M3,28,JDNS77,Brazil#
4,M4,17,GG1919,Finland#
5,M5,3,ACMAX22,Hong Kong#
6,M6,9,AC1000,Singapore#
7,M7,13,FN39TG,South Africa#
8,M8,25,JDNS77,Australia#
9,M9,11,GG1919,Mexico#
10,M10,23,ACMAX22,China#
11,M11,14,AC1000,Belgium#
12,M12,26,FN39TG,Finland#
13,M13,25,JDNS77,Saudi Arabia#
14,M14,17,GG1919,Germany#
15,M15,19,ACMAX22,Israel#
16,M16,23,AC1000,Turkey#
17,M17,11,FN39TG,Egypt#
18,M18,25,JDNS77,Indonesia#
19,M19,14,GG1919,Canada#
20,M20,19,ACMAX22,Argentina
15:34:18,914 INFO  org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers
15:34:19,104 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster - Starting FlinkMiniCluster.
15:34:19,912 INFO  akka.event.slf4j.Slf4jLogger - Slf4jLogger started


// ..
// ... more log statements
// ..

Exception in thread "main" java.lang.RuntimeException: No new data sinks have been defined since the last execution. The last execution refers to the latest call to 'execute()', 'count()', 'collect()', or 'print()'.
        at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:979)
        at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:961)
        at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:84)
        at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:652)
        at main.scala.hortonworks.tutorial.HVACReadingsAnalysis$.main(HVACReadingsAnalysis.scala:60)
        at main.scala.hortonworks.tutorial.HVACReadingsAnalysis.main(HVACReadingsAnalysis.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

Process finished with exit code 1
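The exception message itself points at a likely cause: in the DataSet API, print() is an eager sink that already runs the job (as count() and collect() do), so the later envDefault.execute(...) finds no new sinks to run. A minimal sketch, assuming Flink's Scala DataSet API (flink-scala) is on the classpath: keep print() as the only sink and drop the trailing execute(). EagerPrintSketch is a hypothetical name, and buildingManager is declared as a String here (an assumption) because the sample rows contain values such as M1.

```scala
import org.apache.flink.api.scala._

// Assumption: buildingManager is a String, since the sample rows hold values like "M1".
case class BuildingInformation(buildingID: Int, buildingManager: String,
                               buildingAge: Int, productID: String, country: String)

object EagerPrintSketch {
  // Skips the header row; for a case class, CSV fields map onto its fields by position.
  def readBuildingInfo(env: ExecutionEnvironment, inputPath: String): DataSet[BuildingInformation] =
    env.readCsvFile[BuildingInformation](inputPath, ignoreFirstLine = true)

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val buildings = readBuildingInfo(env, "./SensorFiles/building.csv")

    // print() is an eager sink: it builds and executes the job immediately.
    buildings.print()

    // No env.execute(...) here: with no new sink defined since print(), it would
    // fail with "No new data sinks have been defined since the last execution".
  }
}
```

If the results should instead go to a lazy sink such as writeAsText, the usual pattern is the reverse: keep env.execute(...) and drop print().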




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Discarding-header-from-CSV-file-tp6474p6494.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.
