This is the latest code (with some changes):
C#:
using Buddy;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace AsyncClient
{
static partial class Program
{
/// <summary>
/// Benchmark entry point: starts <c>taskCount</c> ping tasks, waits for all of
/// them, then logs the sent/received/failed percentages and the throughput.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
static void Main(string[] args)
{
    // Max: 10000 worker / 5000 completion-port threads; min: 2000 / 2500.
    // NOTE(review): presumably raised so the blocking ping tasks are not
    // throttled by the pool's thread-injection rate - confirm.
    ThreadPool.SetMaxThreads(10000, 5000);
    ThreadPool.SetMinThreads(2000, 2500);

    const int taskCount = 1000;
    var tasks = new List<Task>(taskCount);
    var time = Stopwatch.StartNew();
    for (int i = 0; i < taskCount; i++)
    {
        tasks.Add(StartPing());
    }
    Task.WaitAll(tasks.ToArray());
    time.Stop();

    // The format string must stay on one line: a regular string literal
    // cannot contain a raw line break.
    var l = string.Format(
        "sent: {0}% rcvd: {1}%, fail: {2}%, count: {3}",
        100d * SentCount / taskCount,
        100d * RcvdCount / taskCount,
        100d * FailCount / taskCount,
        taskCount);
    var l2 = string.Format("{0} op/sec", taskCount / time.Elapsed.TotalSeconds);
    Log(l);
    Log(l2);

    Console.WriteLine("press any key to exit");
    Console.ReadKey();
}
// Shared benchmark counters, updated from the ping tasks via Interlocked.Increment.
// SentCount: incremented in SampleSendH after the payload has been written and flushed.
public static long SentCount = 0;
// RcvdCount: incremented in SampleSendH when the echoed bytes equal the sent message.
public static long RcvdCount = 0;
// FailCount: incremented in SampleSendH's catch block when a ping throws.
public static long FailCount = 0;
// TaskCount: incremented at the start of every SampleSendH call.
public static long TaskCount = 0;
/// <summary>
/// Schedules one <c>SampleSendH</c> ping as an already-started task.
/// </summary>
/// <returns>The running task, created with <c>PreferFairness</c>.</returns>
static Task StartPing()
{
    // StartNew creates and starts the task in one step on the current scheduler.
    return Task.Factory.StartNew(SampleSendH, TaskCreationOptions.PreferFairness);
}
/// <summary>
/// Performs one ping: connects to 127.0.0.1:3000, sends a 512-byte ASCII
/// payload and reads the reply byte by byte until it equals the payload.
/// </summary>
/// <remarks>
/// Increments <c>TaskCount</c> on entry, <c>SentCount</c> after the payload is
/// flushed and <c>RcvdCount</c> when the echo matches. <c>FailCount</c> is
/// incremented when an exception is thrown or when the retries are exhausted
/// without a matching echo, so every sent ping ends up as received or failed.
/// Exceptions are logged through <c>LogError</c> and never propagate.
/// </remarks>
static void SampleSendH()
{
    Interlocked.Increment(ref TaskCount);
    TcpClient c = null;
    try
    {
        // TotalMilliseconds is the whole duration; the Milliseconds property is
        // only the sub-second component and matches here purely by coincidence.
        var timeoutMs = (int)TimeSpan.FromSeconds(0.5).TotalMilliseconds;

        c = new TcpClient();
        c.SendTimeout = timeoutMs;
        c.ReceiveTimeout = timeoutMs;
        c.Connect(IPAddress.Parse("127.0.0.1"), 3000);

        using (var strm = c.GetStream())
        {
            //var message = "9900095854,20130305075608,51.44650,35.74327,0,0,1307,4,160,0,0,0.000,0.000,20130305075747,444247,12074,4084";
            var message = new string('-', 512);
            //var message = "ping";
            var bytes = Encoding.ASCII.GetBytes(message);
            strm.Write(bytes, 0, bytes.Length);
            strm.Flush();
            Interlocked.Increment(ref SentCount);

            var rcvdBytes = new List<byte>(bytes.Length);
            var received = false;
            var retry = 0;
            do
            {
                int b;
                // ReadByte returns -1 once the peer has closed the connection.
                while ((b = strm.ReadByte()) > -1)
                {
                    rcvdBytes.Add((byte)(b & 0xFF));
                    // Compare only when the lengths match, instead of decoding
                    // the whole buffer to a string after every single byte.
                    if (rcvdBytes.Count == bytes.Length && rcvdBytes.SequenceEqual(bytes))
                    {
                        Interlocked.Increment(ref RcvdCount);
                        received = true;
                        break;
                    }
                }
                retry++;
                if (rcvdBytes.Count == 0)
                {
                    Thread.Sleep(TimeSpan.FromMilliseconds(200));
                }
            } while (retry < 10 && !received);

            if (!received)
            {
                // The echo never arrived: count it so sent = rcvd + fail holds.
                Interlocked.Increment(ref FailCount);
            }
        }
    }
    catch (Exception x)
    {
        LogError(x);
        Interlocked.Increment(ref FailCount);
    }
    finally
    {
        // c stays null when the TcpClient constructor itself threw.
        if (c != null)
        {
            try { c.Close(); } catch { }
        }
    }
}
#region tools
/// <summary>
/// Writes <paramref name="o"/> to the default trace output, prefixed with the
/// current local time as <c>[yyyy-MM-dd HH:mm:ss]</c>.
/// </summary>
/// <param name="o">Value to log; <c>null</c> is logged as an empty string.</param>
/// <remarks>
/// Any exception raised while formatting or writing is swallowed, so a logging
/// failure never reaches the caller.
/// </remarks>
public static void Log(object o)
{
    try
    {
        var entry = new StringBuilder();
        // The format string must stay on one line: a regular string literal
        // cannot contain a raw line break.
        entry.Append(string.Format("[{0:yyyy-MM-dd HH:mm:ss}] ", DateTime.Now));
        entry.Append((o ?? string.Empty).ToString());
        Trace.WriteLine(entry.ToString().Trim());
    }
    catch { }
}
/// <summary>
/// Formats a message with <c>string.Format</c> and writes it to the default
/// trace output, prefixed with the current local time as
/// <c>[yyyy-MM-dd HH:mm:ss]</c>.
/// </summary>
/// <param name="format">Composite format string.</param>
/// <param name="o">First format argument (<c>{0}</c>).</param>
/// <param name="rest">Remaining format arguments; may be <c>null</c>.</param>
/// <remarks>
/// Any exception raised while formatting or writing (including a malformed
/// <paramref name="format"/>) is swallowed, so a logging failure never reaches
/// the caller.
/// </remarks>
public static void Log(string format, object o, params object[] rest)
{
    try
    {
        var all = new List<object>();
        all.Add(o);
        // An explicit null for the params array would make AddRange throw and
        // the whole entry would be dropped silently.
        if (rest != null)
        {
            all.AddRange(rest);
        }
        var entry = new StringBuilder();
        entry.Append(string.Format("[{0:yyyy-MM-dd HH:mm:ss}] ", DateTime.Now));
        entry.Append(string.Format(format, all.ToArray()));
        Trace.WriteLine(entry.ToString().Trim());
    }
    catch { }
}
// Named trace source used by LogData.
// NOTE(review): listeners for "DataSource" are presumably configured outside this file - confirm.
static TraceSource __dataSource = new TraceSource("DataSource");
// Named trace source used by both LogError overloads.
// NOTE(review): listeners for "ErrorSource" are presumably configured outside this file - confirm.
static TraceSource __errorSource = new TraceSource("ErrorSource");
/// <summary>
/// Writes <paramref name="o"/> to the error trace source, prefixed with the
/// current local time as <c>[yyyy-MM-dd HH:mm:ss]</c>.
/// </summary>
/// <param name="o">Value to log (typically an exception); <c>null</c> is logged as an empty string.</param>
/// <remarks>
/// Any exception raised while formatting or writing is swallowed, so a logging
/// failure never reaches the caller.
/// </remarks>
public static void LogError(object o)
{
    try
    {
        var entry = new StringBuilder();
        // The format string must stay on one line: a regular string literal
        // cannot contain a raw line break.
        entry.Append(string.Format("[{0:yyyy-MM-dd HH:mm:ss}] ", DateTime.Now));
        entry.Append((o ?? string.Empty).ToString());
        // NOTE(review): WriteLine is not a TraceSource member; presumably an
        // extension method from the Buddy namespace - confirm.
        __errorSource.WriteLine(entry.ToString().Trim());
    }
    catch { }
}
/// <summary>
/// Formats a message with <c>string.Format</c> and writes it to the error
/// trace source, prefixed with the current local time as
/// <c>[yyyy-MM-dd HH:mm:ss]</c>.
/// </summary>
/// <param name="format">Composite format string.</param>
/// <param name="o">First format argument (<c>{0}</c>).</param>
/// <param name="rest">Remaining format arguments; may be <c>null</c>.</param>
/// <remarks>
/// Any exception raised while formatting or writing (including a malformed
/// <paramref name="format"/>) is swallowed, so a logging failure never reaches
/// the caller.
/// </remarks>
public static void LogError(string format, object o, params object[] rest)
{
    try
    {
        var all = new List<object>();
        all.Add(o);
        // An explicit null for the params array would make AddRange throw and
        // the whole entry would be dropped silently.
        if (rest != null)
        {
            all.AddRange(rest);
        }
        var entry = new StringBuilder();
        entry.Append(string.Format("[{0:yyyy-MM-dd HH:mm:ss}] ", DateTime.Now));
        entry.Append(string.Format(format, all.ToArray()));
        // NOTE(review): WriteLine is not a TraceSource member; presumably an
        // extension method from the Buddy namespace - confirm.
        __errorSource.WriteLine(entry.ToString().Trim());
    }
    catch { }
}
/// <summary>
/// Writes <paramref name="o"/> to the data trace source as a multi-line record:
/// a <c>[START [yyyy-MM-dd HH:mm:ss]</c> header line, the value, and a closing
/// <c>END]</c> line.
/// </summary>
/// <param name="o">Value to log; <c>null</c> is logged as an empty string.</param>
/// <remarks>
/// Any exception raised while formatting or writing is swallowed, so a logging
/// failure never reaches the caller.
/// </remarks>
public static void LogData(object o)
{
    try
    {
        var entry = new StringBuilder();
        // The format string must stay on one line: a regular string literal
        // cannot contain a raw line break.
        entry.AppendLine(string.Format("[START [{0:yyyy-MM-dd HH:mm:ss}] ", DateTime.Now));
        entry.AppendLine((o ?? string.Empty).ToString());
        entry.AppendLine("END]");
        // NOTE(review): WriteLine is not a TraceSource member; presumably an
        // extension method from the Buddy namespace - confirm.
        __dataSource.WriteLine(entry.ToString().Trim());
    }
    catch { }
}
#endregion
}
}
GoLang:
package main
import (
"log"
"net"
"os"
"runtime"
)
func main() {
ln, err := net.Listen("tcp", ":3000")
if err != nil {
log.Fatal(err)
os.Exit(1)
}
log.Println("listening on port 3000...")
for {
conn, err := ln.Accept()
if err != nil {
log.Println(err)
continue
}
go handleConn(&conn)
}
}
func handleConn(carg *net.Conn) {
c := *carg
buf := make([]byte, 1024*2)
for {
n, err := c.Read(buf)
if err != nil || n == 0 {
c.Close()
break
}
n, err = c.Write(buf[0:n])
if err != nil {
c.Close()
break
}
}
//log.Printf("conn %v closed", c.RemoteAddr())
}
// init configures the runtime and the standard logger before main runs:
// it sets GOMAXPROCS to 7, adds microsecond timestamps and the short
// file:line prefix to the log flags, and redirects log output to the
// append-only file "log.log" in the working directory.
func init() {
	runtime.GOMAXPROCS(7)

	// Lshortfile overrides Llongfile, so setting both was equivalent to
	// setting Lshortfile alone.
	log.SetFlags(log.Flags() | log.Lmicroseconds | log.Lshortfile)

	const fn = "log.log"
	// The third argument is the permission used when the file is created.
	// os.ModeAppend carries no permission bits, so the file used to be
	// created with mode 0000 and could not be reopened on the next run.
	f, err := os.OpenFile(fn, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0644)
	if err != nil {
		// Keep the default output rather than installing a nil *os.File,
		// which would discard every later log line.
		log.Println(err)
		return
	}
	log.SetOutput(f)
}
NodeJS:
// Single-process TCP echo server: every chunk received on a connection is
// written straight back to the same socket.
var net = require('net');
var fs = require('fs');

var HOST = '127.0.0.1';
var PORT = 3000;
var timeout = 420000; // msec

// Minimal logger.
var lg = function(message) {
    console.log(message);
};

var server = net.createServer();

server.on('listening', function() {
    lg('Server listening on ' + HOST + ':' + PORT);
});

server.on('connection', function(sock) {
    // Close connections that stay idle for `timeout` milliseconds.
    sock.setTimeout(timeout, function() {
        try {
            sock.end();
        }
        catch(x) {
            lg('on end' + x);
        }
    });

    sock.setNoDelay(true);

    // Deliberately no setEncoding(): 'data' then delivers raw Buffers, so the
    // echo is byte-exact. Decoding as 'ascii' clears the high bit of every
    // byte and would corrupt any non-ASCII payload.
    sock.on('data', function(data) {
        try {
            sock.write(data);
        }
        catch(x) {
            lg(x);
        }
    });

    // The peer finished sending: finish our side as well.
    sock.on('end', function() {
        try {
            sock.end();
        }
        catch(x) {
            lg('on end' + x);
        }
    });

    sock.on('error', function(err) {
        lg(err);
    });

    // The socket is already fully closed here; destroy() is only a safeguard.
    sock.on('close', function() {
        try {
            sock.destroy();
        }
        catch(x) {
            lg('on close' + x);
        }
    });
});

// Report server-level failures (for example the port already being in use)
// instead of swallowing them silently.
server.on('error', function(err) {
    lg(err);
});

server.listen(PORT, HOST);
NodeJS (clustered):
// Clustered TCP echo server: the master forks one worker per CPU and every
// worker runs an echo server listening on the same host and port.
var cluster = require('cluster');
var net = require('net');
var fs = require('fs');
var os = require('os');

var numCPUs = os.cpus().length;
var HOST = '127.0.0.1';
var PORT = 3000;
var timeout = 420000; // msec

if (cluster.isMaster) {
    // Fork workers.
    for (var i = 0; i < numCPUs; i++) {
        cluster.fork();
    }
    cluster.on('exit', function (worker, code, signal) {
        console.log('worker ' + worker.process.pid + ' died');
    });
} else {
    // Minimal logger.
    var lg = function (message) {
        console.log(message);
    };

    var server = net.createServer();

    server.on('listening', function () {
        lg('Server listening on ' + HOST + ':' + PORT);
    });

    server.on('connection', function (sock) {
        // Close connections that stay idle for `timeout` milliseconds.
        sock.setTimeout(timeout, function () {
            try {
                sock.end();
            }
            catch (x) {
                lg('on end' + x);
            }
        });

        sock.setNoDelay(true);

        // Deliberately no setEncoding(): 'data' then delivers raw Buffers, so
        // the echo is byte-exact. Decoding as 'binary' and writing the string
        // back re-encodes it as UTF-8, which turns every byte >= 0x80 into
        // two bytes.
        sock.on('data', function (data) {
            try {
                sock.write(data);
            }
            catch (x) {
                lg(x);
            }
        });

        // The peer finished sending: finish our side as well.
        sock.on('end', function () {
            try {
                sock.end();
            }
            catch (x) {
                lg('on end' + x);
            }
        });

        sock.on('error', function (err) {
            lg(err);
        });

        // The socket is already fully closed here; destroy() is only a safeguard.
        sock.on('close', function () {
            try {
                sock.destroy();
            }
            catch (x) {
                lg('on close' + x);
            }
        });
    });

    // Report server-level failures (for example the port already being in
    // use) instead of swallowing them silently.
    server.on('error', function (err) {
        lg(err);
    });

    server.listen(PORT, HOST);
}
>
--
--
Job Board: http://jobs.nodejs.org/
Posting guidelines:
https://github.com/joyent/node/wiki/Mailing-List-Posting-Guidelines
You received this message because you are subscribed to the Google
Groups "nodejs" group.
To post to this group, send email to [email protected]
To unsubscribe from this group, send email to
[email protected]
For more options, visit this group at
http://groups.google.com/group/nodejs?hl=en?hl=en
---
You received this message because you are subscribed to the Google Groups
"nodejs" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
For more options, visit https://groups.google.com/groups/opt_out.