I created a new, separate project with an Adapter to send S4 events. How can I improve the code or the configuration? Please give me some suggestions. My Java code is here:

public class CsvFileAdapter extends AdapterApp {

    @Inject
    @Named("metrics.path")
    String mpath;

    @Inject
    @Named("file.csv")
    String csvFileName;

    private static Logger logger = LoggerFactory.getLogger(CsvFileAdapter.class);

    private Thread t;
    private StringBuffer sb = new StringBuffer();
    private long nEvents = 0;
    long tStop = 0;
    long tStart = 0;

    @Override
    protected void onClose() {
    }

    @Override
    protected void onInit() {
        super.onInit();
        t = new Thread();
    }

    public void openAndRead() throws Exception {
        if (mpath != null && !mpath.isEmpty()) {
            CsvReporter.enable(new File(mpath), 20, TimeUnit.SECONDS);
        }
        Reader inputReader = null;
        BufferedReader br = null;
        String kpi_name;
        String mo_name;
        long timestamp;
        double kpi_value;
        tStart = System.currentTimeMillis();
        String csvPath = csvFileName;
        if (mpath != null && !mpath.isEmpty()) {
            FileInputStream fis = new FileInputStream(csvPath);
            inputReader = new InputStreamReader(fis);
            br = new BufferedReader(inputReader);
            String delimiter = ",";
            String s;
            while ((s = br.readLine()) != null) {
                nEvents++;
                // strip quotes, then split the line into its four fields
                String[] temp = s.replace("\"", "").split(delimiter);
                kpi_name = temp[0];
                mo_name = temp[1];
                timestamp = Long.valueOf(temp[2]);
                kpi_value = Double.valueOf(temp[3]);
                sb.append("").append(kpi_name).append(",").append(mo_name)
                        .append(",").append(timestamp).append(",")
                        .append(kpi_value);
                // sending to s4
                DataEvent event = new DataEvent(kpi_name, mo_name, kpi_value, timestamp);
                event.put("seadata", String.class, sb.toString());
                sb.setLength(0);
                getRemoteStream().put(event);
            }
            fis.close();
            br.close();
            inputReader.close();
            tStop = System.currentTimeMillis();
            Gauge<Long> g2 = Metrics.newGauge(CsvFileAdapter.class,
                    "sea total event length", new Gauge<Long>() {
                        @Override
                        public Long value() {
                            return nEvents;
                        }
                    });
            Metrics.newGauge(CsvFileAdapter.class,
                    "sea total event time", new Gauge<Long>() {
                        @Override
                        public Long value() {
                            return (tStop - tStart);
                        }
                    });
        }
    }

    @Override
    protected void onStart() {
        try {
            t.start();
            openAndRead();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

From: Sky Zhao [mailto:sky.z...@ericsson.com]
Sent: Monday, June 24, 2013 5:20 PM
To: s4-user@incubator.apache.org
Subject: About 200,000 events/s

I am trying to use an Adapter to send S4 events. The metrics report shows:

20,10462,88.63259092217602,539.6449108859357,18.577650313690874,6.241814566462701
40,36006,417.83633322358764,914.1057643161282,97.55624823196746,33.40088245418529
60,63859,674.1012974987167,1075.2326549158463,176.33878995148274,61.646803531230724
80,97835,953.6282787690939,1232.2934375999696,271.48890371088254,96.56144395108957
100,131535,1162.2060916405578,1323.3704459079934,363.98505627735324,131.98430793014757
120,165282,1327.52314133145,1384.2675551261093,453.5195236495672,167.61679021575551
140,190776,1305.7285112621298,1368.4361242524062,504.7782182758366,191.36049732440895

20,000 events per 20 s => 1,000 events/s

This is very slow. I modified S4_HOME/subprojects/s4-comm/bin/default.s4.comm.properties as follows:

s4.comm.emitter.class=org.apache.s4.comm.tcp.TCPEmitter
s4.comm.emitter.remote.class=org.apache.s4.comm.tcp.TCPRemoteEmitter
s4.comm.listener.class=org.apache.s4.comm.tcp.TCPListener

# I/O channel connection timeout, when applicable (e.g. used by netty)
s4.comm.timeout=1000

# NOTE: the following numbers should be tuned according to the application, use case, and infrastructure

# how many threads to use for the sender stage (i.e. serialization)
#s4.sender.parallelism=1
s4.sender.parallelism=100

# maximum number of events in the buffer of the sender stage
#s4.sender.workQueueSize=10000
s4.sender.workQueueSize=100000

# maximum sending rate from a given node, in events / s (used with throttling sender executors)
s4.sender.maxRate=200000

# how many threads to use for the *remote* sender stage (i.e. serialization)
#s4.remoteSender.parallelism=1
s4.remoteSender.parallelism=100

# maximum number of events in the buffer of the *remote* sender stage
#s4.remoteSender.workQueueSize=10000
s4.remoteSender.workQueueSize=100000

# maximum *remote* sending rate from a given node, in events / s (used with throttling *remote* sender executors)
s4.remoteSender.maxRate=200000

# maximum number of pending writes to a given comm channel
#s4.emitter.maxPendingWrites=1000
s4.emitter.maxPendingWrites=10000

# maximum number of events in the buffer of the processing stage
#s4.stream.workQueueSize=10000
s4.stream.workQueueSize=100000

This only improved throughput from 500 to 1,000 events/s. Reading the 88 MB file takes only 8 s, but sending the events now takes 620 s for 1,237,632 events. Why is it so slow? S4 can trigger 200,000 events/s; how can I get up to that rate? Please give me detailed instructions.
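
As a quick sanity check on the figures quoted in this message, the sketch below recomputes the rates. ThroughputCheck and its methods are hypothetical names used only for this illustration and are not part of S4; the inputs are the numbers given above (1,237,632 events, 620 s to send, 8 s to read), and 200,000 is the configured maxRate.

```java
import java.util.Locale;

// Hypothetical helper, not part of S4: recomputes the rates quoted in this message.
final class ThroughputCheck {

    /** Average rate in events per second over an interval of the given length. */
    static double eventsPerSecond(long events, double seconds) {
        if (seconds <= 0) {
            throw new IllegalArgumentException("seconds must be positive");
        }
        return events / seconds;
    }

    /** How many times larger the target rate is than the observed rate. */
    static double shortfallFactor(double targetRate, double observedRate) {
        return targetRate / observedRate;
    }

    public static void main(String[] args) {
        long totalEvents = 1_237_632L;  // events sent, as quoted in the message
        double sendSeconds = 620.0;     // time to send all events
        double readSeconds = 8.0;       // time to read the file only
        double targetRate = 200_000.0;  // value set for s4.sender.maxRate

        double sendRate = eventsPerSecond(totalEvents, sendSeconds);
        double readRate = eventsPerSecond(totalEvents, readSeconds);

        System.out.printf(Locale.ROOT, "send rate: %.0f events/s%n", sendRate);
        System.out.printf(Locale.ROOT, "read rate: %.0f lines/s%n", readRate);
        System.out.printf(Locale.ROOT, "target is %.0fx the send rate%n",
                shortfallFactor(targetRate, sendRate));
    }
}
```

With these inputs the send rate comes out at about 2,000 events/s against about 155,000 lines/s for reading alone, so the gap to 200,000 events/s is roughly a factor of 100 and sits in the parse-and-send path rather than in raw file reading.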