Thank you for helping. Now I have modified my code, it also doesn’t work. Here
is my current code.
RawMessageSpout.java
public class RawMessageSpout extends BaseRichSpout {
SpoutOutputCollector _collector;
Queue<String> inQueue ;
ServerSocket serverSocket;
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("message"));
}
@Override
public void open(Map map, TopologyContext topologyContext,
SpoutOutputCollector spoutOutputCollector) {
_collector=spoutOutputCollector;
inQueue = new ConcurrentLinkedQueue<String>();
TCPReceiver tcpReceiver = new TCPReceiver(inQueue);
tcpReceiver.start();
}
@Override
public void nextTuple() {
//disable thread to send the string immediately
if (!inQueue.isEmpty()){
String readyString = inQueue.poll();
_collector.emit(new Values(readyString));
}
}
}
TCPReceiver.java
package communication;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Queue;
/**
* Created by cjf on 2014/9/15.
*/
public class TCPReceiver extends Thread {
private static Logger logger = Logger.getLogger(TCPReceiver.class);
private static final int BUFSIZE = 32;
private static final int PORT = 20001;
public Socket socket = null;
public Queue<String> inQueue;
ServerSocket serverSocket;
public TCPReceiver(Queue inQueue) {
//this.socket = socket;
this.inQueue = inQueue;
try {
serverSocket= new ServerSocket(PORT);
} catch (IOException e) {
logger.error(PORT+" 端口已被占用");
e.printStackTrace();
}
}
/**
* Package receiving method used for processing multi requests
*
* @param inQueue message in queue
*/
public void multiReceivePackage(Queue<String> inQueue) {
InputStream in = null;
try {
byte[] receiveBuf = new byte[BUFSIZE];
//socket = server.accept();
//System.out.println("Receive: Connected to socket:" +
socket.getRemoteSocketAddress() + "...sending echo string");
//System.out.println(socket.getRemoteSocketAddress());
logger.info("Receive: Connected to socket:" +
socket.getRemoteSocketAddress().toString() + "...sending echo string");
in = socket.getInputStream();
String temp = "";
while (in.read(receiveBuf) != -1) {
temp += new String(receiveBuf);
receiveBuf = new byte[BUFSIZE];
if (temp.indexOf("#E") >= 0) {
break;
}
}
logger.info("Message received: " + temp.trim());
inQueue.offer(temp.trim() + "|" +
socket.getRemoteSocketAddress().toString());
//System.out.println("Message and IP received: " + temp.trim() +
"|" + socket.getRemoteSocketAddress().toString());
logger.debug("Message and IP received: " + temp.trim() + "|" +
socket.getRemoteSocketAddress().toString());
socket.close();
//Thread.sleep(500);
} catch (IOException e) {
e.printStackTrace();
} finally {
}
}
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p/>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see Thread#run()
*/
@Override
public void run() {
//System.out.println("TCPReceiver is running");
logger.debug("TCPReceiver is running");
//receivePackage(DataHandler.inQueue);
while (true){
try {
socket = serverSocket.accept();
multiReceivePackage(this.inQueue);
} catch (Exception e) {
//System.out.println("[" + Thread.currentThread().getName() +
"] Receiver 线程内出错。");
logger.error("[" + Thread.currentThread().getName() + "]
Receiver 线程内出错。");
e.printStackTrace();
}
}
}
public void receiveData(){
//System.out.println("TCPReceiver is running");
logger.debug("TCPReceiver is running");
//receivePackage(DataHandler.inQueue);
try {
socket = serverSocket.accept();
multiReceivePackage(this.inQueue);
} catch (Exception e) {
//System.out.println("[" + Thread.currentThread().getName() + "]
Receiver 线程内出错。");
logger.error("[" + Thread.currentThread().getName() + "] Receiver
线程内出错。");
e.printStackTrace();
}
}
}
发件人: 임정택 [mailto:[email protected]]
发送时间: 2015年4月7日 11:44
收件人: [email protected]
主题: Re: Cannot connect to socket in spout in cluster mode
Hello.
Maybe it's up to serverSocket. Could you expose whole code for verifying?
Thanks
Regard.
Jungtaek Lim (HeartSaVioR)
2015-04-07 12:27 GMT+09:00 Junfeng Chen <[email protected]
<mailto:[email protected]> >:
I intend to establish Socket connection in spout. Here is my code :
@Override
public void nextTuple() {
Socket socket = null;
try {
socket = serverSocket.accept();
} catch (IOException e) {
e.printStackTrace();
}
TCPReceiver tcpReceiver = new TCPReceiver(socket,inQueue);
//disable thread to send the string immediately
tcpReceiver.receiveData();
if (!inQueue.isEmpty()){
String readyString = inQueue.poll();
_collector.emit(new Values(readyString));
}
}
The nextTuple() is blocked until new socket is established. It works under
LocalCluster mode, but in production envirionment, tcp client cannot connect to
this socket server. I use the command “nc -l 20000”(my listening port is 20000)
while the program is running, it can receive the data from tcp client. Can any
one help me? Thanks
Regard,
Junfeng Chen(陈俊峰)
--
Name : 임 정택
Blog : http://www.heartsavior.net / http://dev.heartsavior.net
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior