The following code seems to do what I want. I repartition the RDDs, not the
DStream. I wonder if this has to do with the way windows work?
private static void saveTweetsCSV(JavaSparkContext jsc,
        JavaDStream<TidyPojo> tidy, String outputURI) {
    tidy.foreachRDD(new VoidFunction2<JavaRDD<TidyPojo>, Time>() {
        private static final long serialVersionUID = 1L;

        // Typically we use the CSV file format for data a human needs to
        // work with. We want to repartition the data so that we write the
        // smallest number of files possible, while keeping the max number
        // of rows in a given CSV file small enough for a human to work
        // with easily.
        final long maxNumRowsPerFile = 100;

        @Override
        public void call(JavaRDD<TidyPojo> rdd, Time time) throws Exception {
            long count = rdd.count();
            // if (!rdd.isEmpty()) {
            if (count > 0) {
                long numPartitions = count / maxNumRowsPerFile + 1;
                rdd = rdd.repartition((int) numPartitions);
                String dirPath = outputURI + "_CSV" + "-" + time.milliseconds();

                // http://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations
                // Get the singleton instance of SQLContext
                SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());
                DataFrame df = sqlContext.createDataFrame(rdd, TidyPojo.class);
                TidyPojo.saveCSV(df, dirPath);
            }
        }
    });
}
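One small note on the arithmetic: count / maxNumRowsPerFile + 1 allocates one extra, nearly empty partition whenever count is an exact multiple of maxNumRowsPerFile. If that matters, a ceiling division avoids it. A minimal sketch in plain Java (no Spark needed; the helper name partitionsFor is my own):

```java
public class PartitionMath {
    // Smallest number of partitions such that no partition holds more
    // than maxRowsPerFile rows: the ceiling of count / maxRowsPerFile.
    static int partitionsFor(long count, long maxRowsPerFile) {
        return (int) ((count + maxRowsPerFile - 1) / maxRowsPerFile);
    }

    public static void main(String[] args) {
        System.out.println(partitionsFor(1, 100));   // 1
        System.out.println(partitionsFor(100, 100)); // 1 (count/max + 1 would give 2)
        System.out.println(partitionsFor(101, 100)); // 2
    }
}
```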
From: Andrew Davidson <[email protected]>
Date: Friday, January 29, 2016 at 1:54 PM
To: "user @spark" <[email protected]>
Subject: How to use DStream<T> repartition() ?
> My streaming app has a requirement that my output be saved in the smallest
> number of files possible, such that each file does not exceed a max number of
> rows. Based on my experience, it appears that each partition is written to a
> separate output file.
>
> This was really easy to do in my batch processing using data frames and RDDs:
> it's easy to call count(), then decide how many partitions I want, and
> finally call repartition().
>
> I am having a heck of a time trying to figure out how to do the same thing
> using Spark streaming.
>
>
> JavaDStream<Pojo> tidy =
>
> JavaDStream<Long> counts = tidy.count();
>
>
>
> Below is the documentation for count(). I do not see how I can use it to
> figure out how many partitions I need. DStream does not provide a collect(),
> foreachRDD() cannot return a value, and I tried using an accumulator but that
> did not work.
>
>
>
> Any suggestions would be greatly appreciated.
>
>
> http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/streaming/api/java/JavaDStream.html
> count
> JavaDStream<java.lang.Long> count()
> Return a new DStream in which each RDD has a single element generated by
> counting each RDD of this DStream.
> Returns: (undocumented)
>