Hi Akil,

It didn't work. Here is the code:


package com.paypal;

import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;

import com.google.common.collect.Lists;

import org.apache.spark.streaming.receiver.Receiver;
import scala.Tuple2;

import java.net.ConnectException;
import java.net.Socket;
import java.util.Arrays;
import java.util.regex.Pattern;
import java.io.*;
/**
 * Word count over a file stream with Spark Streaming.
 */
public class App3
{
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) {

        // Spark configuration (the context below is built from the master URL and app name instead)
        SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");

        // Create the context with a 5 second batch interval.
        // ******* always give local[4] to execute and see the output
        JavaStreamingContext ssc = new JavaStreamingContext("local[4]", "JavaNetworkWordCount", new Duration(5000));

        // throws an error saying requires JavaPairDStream and not JavaDStream.
        JavaDStream<String> lines = ssc.fileStream("/Users/../Desktop/alarms.log");
        JavaDStream<String> words = lines.flatMap(
                new FlatMapFunction<String, String>() {
                    public Iterable<String> call(String s) {
                        return Arrays.asList(s.split(" "));
                    }
                }
        );

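        // Pair each word with a count of 1 (mapToPair is needed to get a JavaPairDStream)
        JavaPairDStream<String, Integer> ones = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                }
        );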

        JavaPairDStream<String, Integer> counts = ones.reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                }
        );


        System.out.println("Hello world");
        counts.print();

        ssc.start();
        ssc.awaitTermination();
    }


}

I am not able to figure out how to convert an object of type
JavaPairDStream to JavaDStream. Could you provide working code for this?
Thanks in advance.

Regards
Aravindan





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-using-File-Stream-in-Java-tp9115p9204.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
