Hi, I assume the flatMap(new RecordSplit()) is emitting a RawRecord. Is it possible that you've also added an empty constructor to it while adding the compareTo() method?
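
For reference, here is a minimal sketch of what Flink expects from a POJO before field expressions such as keyBy("id", ...) can be used on it: a public class, a public no-argument constructor, and fields that are either public or reachable through getters and setters. The field types below (String id, long keyByHelper, String schema) are assumptions, since the thread does not show them, and looksLikePojo is a hypothetical helper, not part of Flink. It only approximates Flink's own analysis and accepts public fields only.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class PojoSketch {

    // Hypothetical record type; the field types are assumptions for illustration.
    public static class RawRecord implements Comparable<RawRecord> {
        public String id;
        public long keyByHelper;
        public String schema;

        // Flink needs a public no-argument constructor to treat the class as a POJO.
        public RawRecord() {}

        public RawRecord(String id, long keyByHelper, String schema) {
            this.id = id;
            this.keyByHelper = keyByHelper;
            this.schema = schema;
        }

        @Override
        public int compareTo(RawRecord o) {
            return Long.compare(this.keyByHelper, o.keyByHelper);
        }
    }

    // Counterexample: no no-argument constructor, so it would not qualify.
    public static class NoDefaultCtor {
        public String id;

        public NoDefaultCtor(String id) {
            this.id = id;
        }
    }

    // Simplified stand-in for Flink's POJO rules (hypothetical helper).
    // Flink also accepts non-public fields that have getters and setters;
    // this check deliberately ignores that case.
    public static boolean looksLikePojo(Class<?> type) {
        int classMods = type.getModifiers();
        if (!Modifier.isPublic(classMods)) {
            return false;
        }
        // Non-static inner classes cannot be instantiated on their own.
        if (type.isMemberClass() && !Modifier.isStatic(classMods)) {
            return false;
        }
        try {
            Constructor<?> ctor = type.getDeclaredConstructor();
            if (!Modifier.isPublic(ctor.getModifiers())) {
                return false;
            }
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field f : type.getDeclaredFields()) {
            int m = f.getModifiers();
            if (Modifier.isStatic(m) || Modifier.isTransient(m)) {
                continue; // static and transient fields are not part of the POJO
            }
            if (!Modifier.isPublic(m)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(looksLikePojo(RawRecord.class));     // true
        System.out.println(looksLikePojo(NoDefaultCtor.class)); // false
    }
}
```

If the real RawRecord fails a check like this, Flink will most likely fall back to treating it as a generic (non-composite) type, which matches the "non-composite types" wording in the exception.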
I think the problem is that one of your types (probably the schema) is recognized as a nested POJO. Check out this documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/api_concepts.html#define-keys-using-field-expressions

On Thu, Mar 23, 2017 at 4:35 AM, Telco Phone <tel...@yahoo.com> wrote:

> Getting this:
>
> DataStream<RawRecord> stream =
>         env.addSource(new FlinkKafkaConsumer08<>("raw", schema, properties))
>            .setParallelism(30)
>            .flatMap(new RecordSplit()).setParallelism(30)
>            .name("Raw splitter")
>            .keyBy("id", "keyByHelper", "schema");
>
> Field expression must be equal to '*' or '_' for non-composite types.
> org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:342)
> org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:273)
> com.company.ingest.stream.RawRecord.main(RawRecord.java:38)
>
> I did add a new long compare
>
> @Override
> public int compareTo(SchemaRecord o) {
>     return Long.compare(this.keyByHelper, o.keyByHelper);
>
> I can't seem to get by this error...