This is an automated email from the git hooks/post-receive script. It was
generated because a ref change was pushed to the repository containing
the project "Tarantool -- an efficient key/value data store".
The branch perl-iproto-async has been updated
via 3d33690f89b75bacd47ca4e375cc8da3f7f3f619 (commit)
via d45f95ba1b5802ebfe759dc4a8a835ae13e62473 (commit)
from dd9c4eac159aa6b49b56d82e2444cd824ef3fa2b (commit)
Summary of changes:
mod/silverbox/client/perl/lib/MR/IProto.pm | 89 +++++++++++++++-----
.../client/perl/lib/MR/IProto/Cluster/Server.pm | 2 +-
.../client/perl/lib/MR/IProto/Connection/Sync.pm | 2 +
3 files changed, 71 insertions(+), 22 deletions(-)
commit 3d33690f89b75bacd47ca4e375cc8da3f7f3f619
Author: Aleksey Mashanov <[email protected]>
Date: Thu Dec 2 19:29:07 2010 +0300
Reuse connections
diff --git a/mod/silverbox/client/perl/lib/MR/IProto.pm
b/mod/silverbox/client/perl/lib/MR/IProto.pm
index 53e1e40..18125e3 100644
--- a/mod/silverbox/client/perl/lib/MR/IProto.pm
+++ b/mod/silverbox/client/perl/lib/MR/IProto.pm
@@ -127,13 +127,6 @@ has cluster => (
required => 1,
coerce => 1,
handles => [qw( timeout )],
- trigger => sub {
- my ($self, $new) = @_;
- foreach my $server ( @{$new->servers} ) {
- $server->debug_cb($self->debug_cb) unless $server->has_debug_cb();
- }
- return;
- },
);
=item max_parallel
@@ -375,6 +368,7 @@ See L<Mouse::Manual::Construction/BUILDARGS> for more
information.
=cut
+my %servers;
around BUILDARGS => sub {
my $orig = shift;
my $class = shift;
@@ -394,7 +388,7 @@ around BUILDARGS => sub {
$clusterargs{servers} = [
map {
my ($host, $port, $weight) = split /:/, $_;
- $server_class->new(
+ $servers{"$host:$port"} ||= $server_class->new(
%srvargs,
host => $host,
port => $port,
diff --git a/mod/silverbox/client/perl/lib/MR/IProto/Cluster/Server.pm
b/mod/silverbox/client/perl/lib/MR/IProto/Cluster/Server.pm
index a9d7371..d468d96 100644
--- a/mod/silverbox/client/perl/lib/MR/IProto/Cluster/Server.pm
+++ b/mod/silverbox/client/perl/lib/MR/IProto/Cluster/Server.pm
@@ -214,7 +214,7 @@ sub _build_debug_cb {
my ($self) = @_;
return sub {
my ($msg) = @_;
- warn "$msg\n";
+ warn "MR::IProto: $msg\n";
return;
};
}
diff --git a/mod/silverbox/client/perl/lib/MR/IProto/Connection/Sync.pm
b/mod/silverbox/client/perl/lib/MR/IProto/Connection/Sync.pm
index 5598bb6..5402284 100644
--- a/mod/silverbox/client/perl/lib/MR/IProto/Connection/Sync.pm
+++ b/mod/silverbox/client/perl/lib/MR/IProto/Connection/Sync.pm
@@ -113,6 +113,7 @@ sub set_timeout {
sub _build__socket {
my ($self) = @_;
+ $self->_debug(4, "connecting");
my $socket = IO::Socket::INET->new(
PeerHost => $self->host,
PeerPort => $self->port,
@@ -122,6 +123,7 @@ sub _build__socket {
$socket->sockopt(SO_KEEPALIVE, 1) if $self->tcp_keepalive;
$socket->setsockopt((getprotobyname('tcp'))[2], TCP_NODELAY, 1) if
$self->tcp_nodelay;
$self->_set_timeout($socket, $self->timeout) if $self->timeout;
+ $self->_debug(1, "connected");
return $socket;
}
commit d45f95ba1b5802ebfe759dc4a8a835ae13e62473
Author: Aleksey Mashanov <[email protected]>
Date: Thu Dec 2 18:56:41 2010 +0300
Less memory usage
diff --git a/mod/silverbox/client/perl/lib/MR/IProto.pm
b/mod/silverbox/client/perl/lib/MR/IProto.pm
index 19293b2..53e1e40 100644
--- a/mod/silverbox/client/perl/lib/MR/IProto.pm
+++ b/mod/silverbox/client/perl/lib/MR/IProto.pm
@@ -136,6 +136,18 @@ has cluster => (
},
);
+=item max_parallel
+
+Max amount of simultaneous request to all servers.
+
+=cut
+
+has max_parallel => (
+ is => 'ro',
+ isa => 'Int',
+ default => 1000,
+);
+
=item max_request_retries
Max amount of request retries which must be sent to different servers
@@ -195,6 +207,18 @@ has _reply_class => (
lazy_build => 1,
);
+has _queue => (
+ is => 'ro',
+ isa => 'ArrayRef',
+ lazy_build => 1,
+);
+
+has _in_progress => (
+ is => 'rw',
+ isa => 'Int',
+ default => 0,
+);
+
=head1 PUBLIC METHODS
=over
@@ -409,6 +433,11 @@ sub _build__reply_class {
return \%reply;
}
+sub _build__queue {
+ my ($self) = @_;
+ return [];
+}
+
=item _send( [ $message | \%args ], $callback? )
Pure asyncronious internal implementation of send.
@@ -457,7 +486,24 @@ sub _send {
my ($self, $message, $callback, $sync) = @_;
die "Callback must be specified" unless $callback;
die "Method must be called in void context" if defined wantarray;
+ return $self->_send_now($message, $callback, $sync) if $sync;
+ push @{$self->_queue}, [ $message, $callback ];
+ $self->_try_to_send();
+ return;
+}
+
+sub _try_to_send {
+ my ($self) = @_;
+ while( $self->_in_progress < $self->max_parallel && (my $task = shift @{
$self->_queue }) ) {
+ eval { $self->_send_now(@$task); 1 }
+ or $self->_report_error(@$task, $@);
+ }
+ return;
+}
+sub _send_now {
+ my ($self, $message, $callback, $sync) = @_;
+ $self->_in_progress( $self->_in_progress + 1 );
my ($req_msg, $key, $body, $unpack, $retry, $response_class, $no_reply);
# MR::IProto::Message OO-API
if( blessed($message) ) {
@@ -472,6 +518,7 @@ sub _send {
# Old-style compatible API
else {
$req_msg = $message->{msg};
+ $key = $message->{key};
$body = exists $message->{payload} ? $message->{payload}
: ref $message->{data} ? pack delete $message->{pack} || 'L*',
@{$message->{data}}
: $message->{data};
@@ -518,12 +565,7 @@ sub _send {
}
else {
undef $next_try;
- my $errobj = $unpack ? undef : MR::IProto::Error->new(
- request => $message,
- error => $error,
- errno => 0+$!,
- );
- $callback->($errobj, $error);
+ $self->_report_error($unpack ? undef : $message, $callback,
$error);
}
}
else {
@@ -544,18 +586,14 @@ sub _send {
}
else {
undef $next_try;
+ $self->_in_progress( $self->_in_progress - 1 );
+ $self->_try_to_send();
$callback->($data);
}
}
else {
undef $next_try;
- my $error = $@;
- my $errobj = $unpack ? undef : MR::IProto::Error->new(
- request => $message,
- error => $error,
- errno => 0+$!,
- );
- $callback->($errobj, $error);
+ $self->_report_error($unpack ? undef : $message, $callback,
$@);
}
}
return;
@@ -564,6 +602,21 @@ sub _send {
return;
}
+sub _report_error {
+ my ($self, $request, $callback, $error) = @_;
+ my $errobj = $request
+ ? MR::IProto::Error->new(
+ request => $request,
+ error => $error,
+ errno => 0+$!,
+ )
+ : undef;
+ $self->_in_progress( $self->_in_progress - 1 );
+ $self->_try_to_send();
+ $callback->($errobj, $error);
+ return;
+}
+
sub _debug {
my ($self, $level, $msg) = @_;
return if $self->debug < $level;
--
Tarantool -- an efficient key/value data store
_______________________________________________
Mailing list: https://launchpad.net/~tarantool-developers
Post to : [email protected]
Unsubscribe : https://launchpad.net/~tarantool-developers
More help : https://help.launchpad.net/ListHelp