Hi, I'm using Spark Streaming to process data. I do a simple flatMapToPair on each record, as follows:
    package bb;

    import java.io.PrintWriter;
    import java.net.Socket;
    import java.util.ArrayList;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.PairFlatMapFunction;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.*;

    import scala.Tuple2;

    public class ReceiverTest {
        public static void main(String[] args) throws Exception {
            SparkConf sparkConf = new SparkConf().setAppName("Receiver Test");
            JavaSparkContext sc = new JavaSparkContext(sparkConf);
            JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(2000));

            //JavaReceiverInputDStream<String> inputStream =
            //        jssc.receiverStream(new JavaCustomReceiver("localhost", 3081));
            JavaReceiverInputDStream<String> inputStream =
                    jssc.socketTextStream("localhost", 3081);

            JavaPairDStream<String, String> bab = inputStream.flatMapToPair(
                    new PairFlatMapFunction<String, String, String>() {
                @Override
                public Iterable<Tuple2<String, String>> call(String t) {
                    ArrayList<Tuple2<String, String>> ret =
                            new ArrayList<Tuple2<String, String>>();
                    System.out.println(t);
                    // stop expanding records longer than 2 characters
                    if (t.length() > 2) return ret;
                    for (int i = 0; i < 5; ++i) {
                        String str = t + i;
                        ret.add(new Tuple2<String, String>("", str));
                        // feed the new record back to the socket server
                        Socket socket = null;
                        PrintWriter pw = null;
                        try {
                            socket = new Socket("spark000", 3081);
                            pw = new PrintWriter(socket.getOutputStream());
                            pw.println(str);
                            pw.flush();
                        } catch (Exception e) {
                            e.printStackTrace();
                        } finally {
                            try {
                                if (pw != null) pw.close();
                                if (socket != null) socket.close();
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    }
                    return ret;
                }
            });

            bab.print();
            jssc.start();
            jssc.awaitTermination();
            System.exit(0);
        }
    }

For each record s, {"s0", "s1",
"s2", "s3", "s4"} will be produced if length(s) <= 2. The new records are sent back to the socket server to be further processed. At the start, I send an "1" to the socket server. In the worker log (I'm using one machine so there is only one worker), I expect to see the following result (may appear in different order I think): 1 10 11 12 13 14 100 101 102 103 104 110 111 112 ... 143 144 However, the result is as follows 1 10 11 12 100 101 102 103 104 110 111 112 113 114 120 121 122 123 124 Namely "13" and "14" are not processed in flatMap. I've also tried a custom receiver(the first sample in https://spark.apache.org/docs/1.4.0/streaming-custom-receivers.html) instead of socketTextStream(). The guide says that store(single-record) is not reliable but store(multiple-records) is reliable, thus I changed that part to while (!isStopped() && (userInput = reader.readLine()) != null) { ArrayList<String> lt = new ArrayList<String>(); lt.add(userInput); store(lt.iterator()); } But I still see data loss. The socket server did receive all the data, but not all of them are stored to spark by store(). Can anyone spot the problem? Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Receiver-drops-data-tp23529.html Sent from the Apache Spark User List mailing list archive at Nabble.com. --------------------------------------------------------------------- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org