Hi,

I am trying out a simple piece of code by writing my own JavaNetworkCount app to test out Spark Streaming.
Here are the two versions of the code.

    // #1
    JavaReceiverInputDStream<String> lines = sctx.socketTextStream("127.0.0.1", 9999);

    // #2
    JavaReceiverInputDStream<String> lines = sctx.socketStream(
        "127.0.0.1", 9999,
        new Function<InputStream, Iterable<String>>() {
            @Override
            public Iterable<String> call(InputStream arg0) throws Exception {
                // TODO Auto-generated method stub
                if (arg0 != null)
                    System.out.println("CALL is called...");
                BufferedReader reader = new BufferedReader(new InputStreamReader(arg0));
                ArrayList<String> list = new ArrayList<String>();
                while (reader.ready()) {
                    String linetext = reader.readLine();
                    System.out.println(linetext);
                    list.add(linetext);
                }
                if (list.size() > 0)
                    System.out.println("ArrayList is not empty.");
                return list;
            }
        },
        StorageLevel.MEMORY_AND_DISK_SER_2()
    );

I wrote #2 to investigate another issue I am facing, where the text stream from the TCP host is not received, but that is not my main concern here.

What I am concerned about is this: with .socketTextStream() (#1), the code manages to keep a persistent connection to the TCP host, but with .socketStream() (#2), I am unable to maintain a persistent connection.

The following is the log printed when I run #2:

    14/07/10 01:55:42 INFO ReceiverSupervisorImpl: Receiver started again
    14/07/10 01:55:43 INFO SocketReceiver: Connected to 127.0.0.1:9999
    CALL is called...
    14/07/10 01:55:43 INFO SocketReceiver: Stopped receiving
    14/07/10 01:55:43 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Retrying connecting to 127.0.0.1:9999
    14/07/10 01:55:43 INFO SocketReceiver: Closed socket to 127.0.0.1:9999
    14/07/10 01:55:43 INFO ReceiverSupervisorImpl: Stopping receiver with message: Restarting receiver with delay 2000ms: Retrying connecting to 127.0.0.1:9999:
    14/07/10 01:55:43 INFO ReceiverSupervisorImpl: Called receiver onStop
    14/07/10 01:55:43 INFO ReceiverSupervisorImpl: Deregistering receiver 0
    14/07/10 01:55:43 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Retrying connecting to 127.0.0.1:9999
    14/07/10 01:55:43 INFO ReceiverSupervisorImpl: Stopped receiver 0
    14/07/10 01:55:45 INFO ReceiverTracker: Stream 0 received 0 blocks
    14/07/10 01:55:45 INFO JobScheduler: Added jobs for time 1404984705000 ms
    14/07/10 01:55:45 INFO ReceiverSupervisorImpl: Starting receiver again

Pastebin: http://pastebin.com/embed_iframe.php?i=KVWEC1kU

I am very new to cluster computing, Hadoop, Spark, and even streaming, so I may not have the whole concept right. May I ask: is there something wrong in my #2 code? Can I achieve the same thing with it that #1 does?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Getting-Persistent-Connection-using-socketStream-tp9285.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
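
One possible explanation, offered as a guess rather than a confirmed diagnosis: in #2, reader.ready() is false whenever no bytes have arrived yet, so call() can return an empty list right after connecting. If the receiver treats the end of the returned Iterable as the end of the stream (an assumption, though it would match "Stopped receiving" appearing immediately after "CALL is called..." in the log), it would then close the socket and retry. The sketch below shows an alternative in which the Iterable reads lazily and blocks in readLine() until the peer closes the connection. BlockingLineIterable is a hypothetical helper written for illustration, not part of Spark, and it has not been tested against socketStream().

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical helper (not a Spark class): yields lines lazily and only
// reports "no more elements" once the underlying stream reaches EOF.
class BlockingLineIterable implements Iterable<String> {
    private final BufferedReader reader;

    BlockingLineIterable(InputStream in) {
        this.reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
    }

    @Override
    public Iterator<String> iterator() {
        return new Iterator<String>() {
            private String next;       // line fetched by hasNext() but not yet returned
            private boolean finished;  // set once readLine() has returned null (EOF)

            @Override
            public boolean hasNext() {
                if (finished) {
                    return false;
                }
                if (next == null) {
                    try {
                        // Blocks until a full line arrives or the stream is closed,
                        // unlike ready(), which returns false when nothing is buffered.
                        next = reader.readLine();
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    if (next == null) {
                        finished = true;
                    }
                }
                return !finished;
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                String line = next;
                next = null;
                return line;
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }

    // Small standalone demo using an in-memory stream instead of a socket.
    public static void main(String[] args) {
        InputStream in = new ByteArrayInputStream(
                "first\nsecond\n".getBytes(StandardCharsets.UTF_8));
        for (String line : new BlockingLineIterable(in)) {
            System.out.println(line);
        }
    }
}
```

Under the assumption above, call() in #2 could return new BlockingLineIterable(arg0) instead of building an ArrayList up front, so that the iterator only ends when the TCP host closes the connection.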