I’m sure this should work, but I’m missing something… I searched the archive 
first, but didn’t have much luck finding any insights there.

TL;DR: org.apache.flink.api.common.InvalidProgramException: This type 
(GenericType<com.newrelic.flink_aggregator.AggregatableTimeslice>) cannot be 
used as key.

I’m just getting started with a Flink 1.0 implementation of a new task. It’s a 
pretty straightforward reduce job, but I’m running into a snag when creating a 
KeyedStream.

Here’s the graph:
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<TimesliceData> dataStream = see.addSource(
                new FlinkKafkaConsumer08<>(
                        timesliceConstants.RESOLVED_TIMESLICE_DATA_KAFKA_TOPIC_NAME,
                        new TimesliceDeserializer(),
                        kafkaConsumerProperties));

        SingleOutputStreamOperator<AggregatableTimeslice> flattenedDataStream = dataStream
                .assignTimestampsAndWatermarks(new TimesliceTimestampExtractor())
                .flatMap(new TimesliceMapper());

        flattenedDataStream
                .keyBy("accountId", "agentId", "wideMetricId")
                .timeWindow(Time.seconds(60))
                .reduce(AggregatableTimeslice::aggregateWith)
                .print();

This fails on keyBy() with the message:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: This type (GenericType<com.newrelic.flink_aggregator.AggregatableTimeslice>) cannot be used as key.

TimesliceMapper is a concrete implementation of 
FlatMapFunction<TimesliceData, AggregatableTimeslice>, namely:

public class TimesliceMapper implements FlatMapFunction<TimesliceData, AggregatableTimeslice> {
    @Override
    public void flatMap(TimesliceData value, Collector<AggregatableTimeslice> out) throws Exception {
        for (Timeslice timeslice : value.getTimeslices()) {
            out.collect(new AggregatableTimesliceImpl(timeslice, value, value.getAgentId()));
        }
    }
}

AggregatableTimesliceImpl is a simple concrete implementation of the 
AggregatableTimeslice interface:

public interface AggregatableTimeslice {
    int getAccountId();
    int getAgentId();
    long getWideMetricId();
    AggregatableTimesliceStats getTimesliceStats();
}
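
One possible reading of the error, offered as a guess: keyBy() with field names needs an element type that Flink can analyze as a tuple or POJO, and because AggregatableTimeslice is an interface it is handled as a generic type instead. A workaround to try would be keying on an explicit key object built from the three getters. The sketch below is plain Java with no Flink dependency. TimesliceKey is a hypothetical class that is not part of the job, and the interface is trimmed (getTimesliceStats() omitted) so the block compiles on its own.

```java
import java.util.Objects;

// Trimmed copy of the interface from the post; getTimesliceStats() is
// omitted so that this sketch is self-contained.
interface AggregatableTimeslice {
    int getAccountId();
    int getAgentId();
    long getWideMetricId();
}

// Hypothetical composite key (an assumption, not part of the original job).
// It is immutable and defines equals/hashCode/compareTo over all three fields.
final class TimesliceKey implements Comparable<TimesliceKey> {
    final int accountId;
    final int agentId;
    final long wideMetricId;

    TimesliceKey(int accountId, int agentId, long wideMetricId) {
        this.accountId = accountId;
        this.agentId = agentId;
        this.wideMetricId = wideMetricId;
    }

    // Reads the three key fields through the interface getters.
    static TimesliceKey of(AggregatableTimeslice t) {
        return new TimesliceKey(t.getAccountId(), t.getAgentId(), t.getWideMetricId());
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof TimesliceKey)) return false;
        TimesliceKey k = (TimesliceKey) o;
        return accountId == k.accountId
                && agentId == k.agentId
                && wideMetricId == k.wideMetricId;
    }

    @Override
    public int hashCode() {
        return Objects.hash(accountId, agentId, wideMetricId);
    }

    // Orders by accountId, then agentId, then wideMetricId.
    @Override
    public int compareTo(TimesliceKey other) {
        int c = Integer.compare(accountId, other.accountId);
        if (c != 0) return c;
        c = Integer.compare(agentId, other.agentId);
        if (c != 0) return c;
        return Long.compare(wideMetricId, other.wideMetricId);
    }
}
```

In the job, such a key would presumably be returned from a KeySelector passed to keyBy() in place of the field names. That wiring is not shown here, and whether Flink 1.0 accepts this particular class as a key type is untested. Returning a Flink tuple of the three values from the KeySelector may be the safer variant.
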
Ron
—
Ron Crocker
Principal Engineer & Architect
( ( •)) New Relic
rcroc...@newrelic.com
M: +1 630 363 8835
