Hi, I'm following the pattern of filtering data by a certain criterion and then saving the results to a different table; the code below illustrates the idea. A simple integration test I wrote suggests it works: it asserts that the filtered data ends up in the respective tables. I may be adding more filters to this stream in the future. Is this a bad idea?

public void run() {
    JavaPairReceiverInputDStream<String, Raw> stream = KafkaUtils.createStream(...);

    // Save everything to the master table
    stream.map((rawTuple) -> {
        return rawTuple._2();
    }).foreachRDD((rawRDD) -> {
        javaFunctions(rawRDD)
            .writerBuilder(keyspace, rawTable, mapToRow(Raw.class))
            .saveToCassandra();
        return null;
    });

    // Process the stream again, filter by one criterion, and save to a different table
    stream.map((rawTuple) -> {
        return rawTuple._2();
    }).filter((raw) -> {
        // filter by some criterion
    }).map((raw) -> {
        return new Criteria1(raw.getSomeField());
    }).foreachRDD((rdd) -> {
        javaFunctions(rdd)
            .writerBuilder(keyspace, criteria1Table, mapToRow(Criteria1.class))
            .saveToCassandra();
        return null;
    });

    // Process the stream a third time, filter by a different criterion, and save to another table
    stream.map((rawTuple) -> {
        return rawTuple._2();
    }).filter((raw) -> {
        // filter by another criterion
    }).map((raw) -> {
        return new Criteria2(raw.getOtherField());
    }).foreachRDD((rdd) -> {
        javaFunctions(rdd)
            .writerBuilder(keyspace, criteria2Table, mapToRow(Criteria2.class))
            .saveToCassandra();
        return null;
    });

    streamingContext.start();
}
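In case it helps frame the question: one refactor I've been considering is to map the pair stream to its values once and cache that DStream, so each output branch reuses the same batch data instead of re-running the shared map. A minimal sketch, assuming the same DataStax spark-cassandra-connector Java API as above; matchesCriteria1 is a hypothetical predicate standing in for my elided filter logic:

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

public void run() {
    JavaPairReceiverInputDStream<String, Raw> stream = KafkaUtils.createStream(...);

    // Map to the value once and cache it, so every branch below shares
    // the same computed RDDs per batch instead of re-mapping the pairs.
    JavaDStream<Raw> rawStream = stream.map((rawTuple) -> rawTuple._2()).cache();

    // Branch 1: save everything to the master table
    rawStream.foreachRDD((rdd) -> {
        javaFunctions(rdd)
            .writerBuilder(keyspace, rawTable, mapToRow(Raw.class))
            .saveToCassandra();
        return null;
    });

    // Branch 2: first criterion (matchesCriteria1 is a hypothetical predicate)
    rawStream.filter((raw) -> matchesCriteria1(raw))
        .map((raw) -> new Criteria1(raw.getSomeField()))
        .foreachRDD((rdd) -> {
            javaFunctions(rdd)
                .writerBuilder(keyspace, criteria1Table, mapToRow(Criteria1.class))
                .saveToCassandra();
            return null;
        });

    // Additional criteria would be added as further branches on rawStream.

    streamingContext.start();
}

My understanding is that each foreachRDD registers a separate output operation on the same batch, so branching like this should be safe; the cache() just avoids repeating the shared map for every branch.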