public class SparkDriverOutline {

    private static JavaStreamingContext createContext() {
        SparkConf conf = new SparkConf().setAppName("MongoDB Event Stream");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(3));
        MongoDBReceiver receiver = new MongoDBReceiver(/* Some config */);
        JavaReceiverInputDStream<BSONObject> mongoDBReceiverStream = jssc.receiverStream(receiver);

        // Parse lines into JSONEvents
        JavaDStream<DBEvent> events = mongoDBReceiverStream.map(new Function<BSONObject, DBEvent>() {
            public DBEvent call(BSONObject bsonObject) throws Exception {
                // Parse the BSON to create a DBEvent
            }
        });

        // Make a Pair stream mapping id to event
        JavaPairDStream<String, DBEvent> event_tuples = events.mapToPair(new PairFunction<DBEvent, String, DBEvent>() {
            public Tuple2<String, DBEvent> call(DBEvent event) {
                // Map id to event
            }
        });

        // Partition the stream ready for the groupBy step
        JavaPairDStream<String, DBEvent> event_tuples_partitioned = event_tuples.transformToPair(new Function<JavaPairRDD<String, DBEvent>, JavaPairRDD<String, DBEvent>>() {
            public JavaPairRDD<String, DBEvent> call(JavaPairRDD<String, DBEvent> in) throws Exception {
                return in.partitionBy(new HashPartitioner(10));
            }
        });

        // Persist the DStream
        event_tuples_partitioned.persist();
        // Group by key
        JavaPairDStream<String, Iterable<DBEvent>> events_grouped_by_key = event_tuples_partitioned.groupByKey();

        // Process the RDDs
        events_grouped_by_key.foreachRDD(new Function<JavaPairRDD<String, Iterable<DBEvent>>, Void>() {
            public Void call(JavaPairRDD<String, Iterable<DBEvent>> rdd) throws Exception {
                rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Iterable<DBEvent>>>>() {
                    public void call(Iterator<Tuple2<String, Iterable<DBEvent>>> partitionRecords) throws Exception {
                        while(partitionRecords.hasNext()){
                            Tuple2<String, Iterable<DBEvent>> rec = partitionRecords.next();
                            // Do very little
                        }
                    }
                });
                rdd.unpersist();
                return null;
            }
        });
        return jssc;
    }

}
