Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/2703#discussion_r22290121
--- Diff: streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java ---
@@ -1703,6 +1710,65 @@ public void testTextFileStream() {
JavaDStream<String> test = ssc.textFileStream("/tmp/foo");
}
+
+ @Test
+ public void testFileStream() throws Exception {
+ // Disable manual clock as FileInputDStream does not work with manual clock
+ System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock");
+ ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+ ssc.checkpoint("checkpoint");
+ // Set up some sequence files for streaming to read in
+ List<Tuple2<Long, Integer>> test_input = new ArrayList<Tuple2<Long, Integer> >();
+ test_input.add(new Tuple2(1L, 123456));
+ test_input.add(new Tuple2(2L, 123456));
+ JavaPairRDD<Long, Integer> rdd = ssc.sc().parallelizePairs(test_input);
+ File tempDir = Files.createTempDir();
+ JavaPairRDD<LongWritable, IntWritable> saveable = rdd.mapToPair(
+ new PairFunction<Tuple2<Long, Integer>, LongWritable, IntWritable>() {
+ public Tuple2<LongWritable, IntWritable> call(Tuple2<Long, Integer> record) {
+ return new Tuple2(new LongWritable(record._1), new IntWritable(record._2));
+ }});
+ saveable.saveAsNewAPIHadoopFile(tempDir.getAbsolutePath()+"/1/",
+ LongWritable.class, IntWritable.class,
+ SequenceFileOutputFormat.class);
+ saveable.saveAsNewAPIHadoopFile(tempDir.getAbsolutePath()+"/2/",
+ LongWritable.class, IntWritable.class,
+ SequenceFileOutputFormat.class);
+
+ // Construct a file stream from the above saved data
+ JavaPairDStream<LongWritable, IntWritable> testRaw = ssc.fileStream(
+ tempDir.getAbsolutePath() + "/" , SequenceFileInputFormat.class, LongWritable.class,
+ IntWritable.class, false);
+ JavaPairDStream<Long, Integer> test = testRaw.mapToPair(
+ new PairFunction<Tuple2<LongWritable, IntWritable>, Long, Integer>() {
+ public Tuple2<Long, Integer> call(Tuple2<LongWritable, IntWritable> input) {
+ return new Tuple2(input._1().get(), input._2().get());
+ }
+ });
+ final Accumulator<Integer> elem = ssc.sc().intAccumulator(0);
--- End diff --
Why is it not possible to just call rdd.count() and add up the counts in a
global counter?
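
To make the suggestion concrete, here is a minimal sketch of the counting pattern. It uses plain Java collections and no Spark dependency so it can run on its own. Each List stands in for one batch RDD, and batch.size() stands in for rdd.count(). In the actual test the body of onBatch would presumably live inside a foreachRDD callback, which runs on the driver, so an ordinary thread-safe counter should be enough there. The names BatchCountSketch and onBatch are hypothetical and do not appear in the PR.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class BatchCountSketch {
    // Stand-in for the body of a foreachRDD callback. With Spark this would
    // be roughly: counter.addAndGet(rdd.count());
    static void onBatch(AtomicLong counter, List<?> batch) {
        counter.addAndGet(batch.size());
    }

    public static void main(String[] args) {
        // "Global" counter shared across all batches of the stream.
        AtomicLong totalRecords = new AtomicLong(0);

        // Two simulated batches of two records each (hypothetical data,
        // loosely mirroring the values written by the test above).
        List<List<Integer>> batches = Arrays.asList(
            Arrays.asList(123456, 123456),
            Arrays.asList(123456, 123456));

        for (List<Integer> batch : batches) {
            onBatch(totalRecords, batch);
        }

        // The test would then assert on the accumulated total.
        System.out.println(totalRecords.get()); // prints 4
    }
}
```

Compared with the Accumulator in the diff, this keeps the bookkeeping on the driver side and asserts on a single total once the stream has processed its batches.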