I'm trying to build a sample application using Flink that does the following:
1. Reads a stream of stock symbols (e.g. 'CSCO', 'FB') from a Kafka queue
2. For each symbol performs a real-time lookup of current prices and streams
the values
The program compiles fine but I get the following run-time error message:
"The implementation of the MapFunction is not serializable. The object
probably contains or references non serializable fields".
I suspect the problem is due to the way I'm accessing the
StreamExecutionEnvironment. Could someone please provide pointers to how I
can use values from a data stream to create a new streaming data source? Any
response is appreciated.
Relevant code snippet is provided below:
public class RetrieveStockPrices {
@SuppressWarnings("serial")
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment streamExecEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "stocks");
DataStream<String> streamOfStockSymbols =
streamExecEnv.addSource(new
FlinkKafkaConsumer08<String>("stocksymbol", new SimpleStringSchema(),
properties));
streamOfStockSymbols.map(new MapFunction<String, String> () {
@Override
public String map(String stockSymbol) throws Exception {
DataStream<String> stockPrices =
streamExecEnv.addSource(new
LookupStockPrice(stockSymbol));
stockPrices.print();
return null;
}
});
streamExecEnv.execute("Retrieve Stock Prices");
}
}
public class LookupStockPrice extends RichSourceFunction<String> {
public String stockSymbol = null;
public boolean isRunning = true;
public LookupStockPrice(String inSymbol) {
stockSymbol = inSymbol;
}
@Override
public void open(Configuration parameters) throws Exception {
isRunning = true;
}
@Override
public void cancel() {
isRunning = false;
}
@Override
public void run(SourceFunction.SourceContext<String> ctx)
throws Exception {
while (isRunning) {
//TODO: query Google Finance API
ctx.collect("12.5");
}
}
}
--
View this message in context:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-do-I-use-values-from-a-data-stream-to-create-a-new-streaming-data-source-tp10680.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at
Nabble.com.