I’m sure this should work, but I’m missing something… I searched the archive
first, but didn’t have much luck finding any insights there.
TL;DR: org.apache.flink.api.common.InvalidProgramException: This type
(GenericType<com.newrelic.flink_aggregator.AggregatableTimeslice>) cannot be
used as key.
I’m just getting started with a 1.0 implementation of a new task. It’s a pretty
straightforward reduce job, but I’m running into a snag with creating a
KeyedStream.
Here’s the graph:
    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    DataStream<TimesliceData> dataStream = see.addSource(
        new FlinkKafkaConsumer08<>(
            timesliceConstants.RESOLVED_TIMESLICE_DATA_KAFKA_TOPIC_NAME,
            new TimesliceDeserializer(),
            kafkaConsumerProperties));

    SingleOutputStreamOperator<AggregatableTimeslice> flattenedDataStream = dataStream
        .assignTimestampsAndWatermarks(new TimesliceTimestampExtractor())
        .flatMap(new TimesliceMapper());

    flattenedDataStream
        .keyBy("accountId", "agentId", "wideMetricId")
        .timeWindow(Time.seconds(60))
        .reduce(AggregatableTimeslice::aggregateWith)
        .print();
This fails on keyBy() with the message:
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException:
This type (GenericType<com.newrelic.flink_aggregator.AggregatableTimeslice>)
cannot be used as key.
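One possible reading of the GenericType in that message, offered as an assumption rather than a confirmed diagnosis: field-name keys such as "accountId" need a type that Flink can analyze as a POJO with fields, and an interface declares none. The plain-Java sketch below (no Flink involved) only illustrates that difference using reflection. FieldCheck, TimeslicePojo and declaredFieldNames are hypothetical names made up for this illustration.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class FieldCheck {

    // Mirrors the interface from this post (stats getter left out for brevity).
    interface AggregatableTimeslice {
        int getAccountId();
        int getAgentId();
        long getWideMetricId();
    }

    // Hypothetical POJO-style class, for contrast only; not part of the original job.
    public static class TimeslicePojo {
        public int accountId;
        public int agentId;
        public long wideMetricId;

        public TimeslicePojo() {}
    }

    // Returns the names of the instance fields declared directly on the given type.
    static List<String> declaredFieldNames(Class<?> type) {
        List<String> names = new ArrayList<>();
        for (Field f : type.getDeclaredFields()) {
            if (!f.isSynthetic() && !Modifier.isStatic(f.getModifiers())) {
                names.add(f.getName());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // The interface only has getters, so there is no field called "accountId".
        System.out.println(declaredFieldNames(AggregatableTimeslice.class)); // prints []
        // The POJO-style class declares the three fields by name.
        System.out.println(declaredFieldNames(TimeslicePojo.class));
    }
}
```

If this reading is right, the stream's element type (the interface) is what matters for keyBy, not the concrete class that the mapper happens to emit.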
TimesliceMapper is a concrete implementation of FlatMapFunction<TimesliceData,
AggregatableTimeslice>, namely
    public class TimesliceMapper
            implements FlatMapFunction<TimesliceData, AggregatableTimeslice> {

        @Override
        public void flatMap(TimesliceData value, Collector<AggregatableTimeslice> out)
                throws Exception {
            for (Timeslice timeslice : value.getTimeslices()) {
                out.collect(new AggregatableTimesliceImpl(timeslice, value, value.getAgentId()));
            }
        }
    }
AggregatableTimesliceImpl is a simple concrete implementation of the
AggregatableTimeslice interface:
    public interface AggregatableTimeslice {
        int getAccountId();
        int getAgentId();
        long getWideMetricId();
        AggregatableTimesliceStats getTimesliceStats();
    }
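Since the interface exposes the three ids only through getters, one direction that might be worth trying is to compute the key from those getters instead of naming fields. The sketch below is plain Java and shows only the key computation. CompositeKeySketch, KEY_OF and slice are hypothetical names for this illustration. In Flink the same logic would presumably go into a KeySelector passed to keyBy, but that wiring is an assumption and has not been tested against this job.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class CompositeKeySketch {

    // Mirrors the interface from this post (stats getter left out for brevity).
    interface AggregatableTimeslice {
        int getAccountId();
        int getAgentId();
        long getWideMetricId();
    }

    // Builds a composite key from the three getters. List has value-based
    // equals/hashCode, so slices with equal ids produce equal keys.
    static final Function<AggregatableTimeslice, List<Long>> KEY_OF =
        t -> Arrays.asList((long) t.getAccountId(), (long) t.getAgentId(), t.getWideMetricId());

    // Hypothetical helper that creates a slice with fixed ids, for the demo only.
    static AggregatableTimeslice slice(int accountId, int agentId, long wideMetricId) {
        return new AggregatableTimeslice() {
            @Override public int getAccountId() { return accountId; }
            @Override public int getAgentId() { return agentId; }
            @Override public long getWideMetricId() { return wideMetricId; }
        };
    }

    public static void main(String[] args) {
        System.out.println(KEY_OF.apply(slice(1, 2, 3L))); // prints [1, 2, 3]
        // Two distinct objects with the same ids map to the same key.
        System.out.println(KEY_OF.apply(slice(1, 2, 3L)).equals(KEY_OF.apply(slice(1, 2, 3L)))); // prints true
    }
}
```

The point of the design is that the key depends only on the interface's methods, so it does not matter which concrete implementation the mapper emits.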
Ron
—
Ron Crocker
Principal Engineer & Architect
( ( •)) New Relic