Hey Paul! Thanks for reporting the issue. I'm trying to reproduce the problem. I'll post the updates here.
Which version of Flink are you using? You probably meant that you were using Flink 0.8.1, not Maven 8.1, right? ;-)

On 13 May 2015, at 13:16, Pa Rö <paul.roewer1...@googlemail.com> wrote:

> my function code:
>
> private static DataSet<GeoTimeDataTupel> getPointDataSet(ExecutionEnvironment env) {
>     // load properties
>     Properties pro = new Properties();
>     try {
>         pro.load(new FileInputStream("./resources/config.properties"));
>     } catch (Exception e) {
>         e.printStackTrace();
>     }
>     String inputFile = pro.getProperty("input");
>     // map csv file
>     return env.readCsvFile(inputFile)
>         .ignoreInvalidLines()
>         .fieldDelimiter('\u0009')
>         .lineDelimiter("\n")
>         .includeFields(true, true, false, false, false, false, false, false, false, false, false
>             , false, false, false, false, false, false, false, false, false, false
>             , false, false, false, false, false, false, false, false, false, false
>             , false, false, false, false, false, false, false, false, true, true
>             , false, false, false, false, false, false, false, false, false, false
>             , false, false, false, false, false, false, false)
>         .types(String.class, Long.class, Double.class, Double.class)
>         .map(new TuplePointConverter());
> }
>
> and i use the GDELT data from here:
>
> http://data.gdeltproject.org/events/index.html
>
> 2015-05-13 13:09 GMT+02:00 Pa Rö <paul.roewer1...@googlemail.com>:
> hi,
>
> i read a csv file from disk with flink (java, maven version 8.1) and get the
> following exception:
>
> ERROR operators.DataSinkTask: Error in user code: Channel received an event
> before completing the current partial record.: DataSink(Print to System.out) (4/4)
> java.lang.IllegalStateException: Channel received an event before completing
> the current partial record.
>     at org.apache.flink.runtime.io.network.channels.InputChannel.readRecord(InputChannel.java:158)
>     at org.apache.flink.runtime.io.network.gates.InputGate.readRecord(InputGate.java:176)
>     at org.apache.flink.runtime.io.network.api.MutableRecordReader.next(MutableRecordReader.java:51)
>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:53)
>     at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:170)
>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
>     at java.lang.Thread.run(Thread.java:745)
>
> my code:
>
> public class FlinkMain {
>
>     public static void main(String[] args) {
>         // set up execution environment
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         //env.setDegreeOfParallelism(1);
>         // get input points
>         DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
>         points.print();
>         // execute program
>         try {
>             env.execute("KMeans Flink");
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>     }
>
> maybe someone has a solution?
>
> best regards
> paul