This is an automated email from the git hooks/post-receive script. It was
generated because a ref change was pushed to the repository containing
the project "Tarantool -- an efficient key/value data store".

The branch perl-iproto-async has been updated
       via  3d33690f89b75bacd47ca4e375cc8da3f7f3f619 (commit)
       via  d45f95ba1b5802ebfe759dc4a8a835ae13e62473 (commit)
      from  dd9c4eac159aa6b49b56d82e2444cd824ef3fa2b (commit)

Summary of changes:
 mod/silverbox/client/perl/lib/MR/IProto.pm         |   89 +++++++++++++++-----
 .../client/perl/lib/MR/IProto/Cluster/Server.pm    |    2 +-
 .../client/perl/lib/MR/IProto/Connection/Sync.pm   |    2 +
 3 files changed, 71 insertions(+), 22 deletions(-)

commit 3d33690f89b75bacd47ca4e375cc8da3f7f3f619
Author: Aleksey Mashanov <[email protected]>
Date:   Thu Dec 2 19:29:07 2010 +0300

    Reuse connections

diff --git a/mod/silverbox/client/perl/lib/MR/IProto.pm b/mod/silverbox/client/perl/lib/MR/IProto.pm
index 53e1e40..18125e3 100644
--- a/mod/silverbox/client/perl/lib/MR/IProto.pm
+++ b/mod/silverbox/client/perl/lib/MR/IProto.pm
@@ -127,13 +127,6 @@ has cluster => (
     required => 1,
     coerce   => 1,
     handles  => [qw( timeout )],
-    trigger  => sub {
-        my ($self, $new) = @_;
-        foreach my $server ( @{$new->servers} ) {
-            $server->debug_cb($self->debug_cb) unless $server->has_debug_cb();
-        }
-        return;
-    },
 );
 
 =item max_parallel
@@ -375,6 +368,7 @@ See L<Mouse::Manual::Construction/BUILDARGS> for more information.
 
 =cut
 
+my %servers;
 around BUILDARGS => sub {
     my $orig = shift;
     my $class = shift;
@@ -394,7 +388,7 @@ around BUILDARGS => sub {
         $clusterargs{servers} = [
             map {
                 my ($host, $port, $weight) = split /:/, $_;
-                $server_class->new(
+                $servers{"$host:$port"} ||= $server_class->new(
                     %srvargs,
                     host => $host,
                     port => $port,
diff --git a/mod/silverbox/client/perl/lib/MR/IProto/Cluster/Server.pm b/mod/silverbox/client/perl/lib/MR/IProto/Cluster/Server.pm
index a9d7371..d468d96 100644
--- a/mod/silverbox/client/perl/lib/MR/IProto/Cluster/Server.pm
+++ b/mod/silverbox/client/perl/lib/MR/IProto/Cluster/Server.pm
@@ -214,7 +214,7 @@ sub _build_debug_cb {
     my ($self) = @_;
     return sub {
         my ($msg) = @_;
-        warn "$msg\n";
+        warn "MR::IProto: $msg\n";
         return;
     };
 }
diff --git a/mod/silverbox/client/perl/lib/MR/IProto/Connection/Sync.pm b/mod/silverbox/client/perl/lib/MR/IProto/Connection/Sync.pm
index 5598bb6..5402284 100644
--- a/mod/silverbox/client/perl/lib/MR/IProto/Connection/Sync.pm
+++ b/mod/silverbox/client/perl/lib/MR/IProto/Connection/Sync.pm
@@ -113,6 +113,7 @@ sub set_timeout {
 
 sub _build__socket {
     my ($self) = @_;
+    $self->_debug(4, "connecting");
     my $socket = IO::Socket::INET->new(
         PeerHost => $self->host,
         PeerPort => $self->port,
@@ -122,6 +123,7 @@ sub _build__socket {
     $socket->sockopt(SO_KEEPALIVE, 1) if $self->tcp_keepalive;
     $socket->setsockopt((getprotobyname('tcp'))[2], TCP_NODELAY, 1) if $self->tcp_nodelay;
     $self->_set_timeout($socket, $self->timeout) if $self->timeout;
+    $self->_debug(1, "connected");
     return $socket;
 }
 

commit d45f95ba1b5802ebfe759dc4a8a835ae13e62473
Author: Aleksey Mashanov <[email protected]>
Date:   Thu Dec 2 18:56:41 2010 +0300

    Less memory usage

diff --git a/mod/silverbox/client/perl/lib/MR/IProto.pm b/mod/silverbox/client/perl/lib/MR/IProto.pm
index 19293b2..53e1e40 100644
--- a/mod/silverbox/client/perl/lib/MR/IProto.pm
+++ b/mod/silverbox/client/perl/lib/MR/IProto.pm
@@ -136,6 +136,18 @@ has cluster => (
     },
 );
 
+=item max_parallel
+
+Max amount of simultaneous request to all servers.
+
+=cut
+
+has max_parallel => (
+    is  => 'ro',
+    isa => 'Int',
+    default => 1000,
+);
+
 =item max_request_retries
 
 Max amount of request retries which must be sent to different servers
@@ -195,6 +207,18 @@ has _reply_class => (
     lazy_build => 1,
 );
 
+has _queue => (
+    is  => 'ro',
+    isa => 'ArrayRef',
+    lazy_build => 1,
+);
+
+has _in_progress => (
+    is  => 'rw',
+    isa => 'Int',
+    default => 0,
+);
+
 =head1 PUBLIC METHODS
 
 =over
@@ -409,6 +433,11 @@ sub _build__reply_class {
     return \%reply;
 }
 
+sub _build__queue {
+    my ($self) = @_;
+    return [];
+}
+
 =item _send( [ $message | \%args ], $callback? )
 
 Pure asyncronious internal implementation of send.
@@ -457,7 +486,24 @@ sub _send {
     my ($self, $message, $callback, $sync) = @_;
     die "Callback must be specified" unless $callback;
     die "Method must be called in void context" if defined wantarray;
+    return $self->_send_now($message, $callback, $sync) if $sync;
+    push @{$self->_queue}, [ $message, $callback ];
+    $self->_try_to_send();
+    return;
+}
+
+sub _try_to_send {
+    my ($self) = @_;
+    while( $self->_in_progress < $self->max_parallel && (my $task = shift @{ $self->_queue }) ) {
+        eval { $self->_send_now(@$task); 1 }
+            or $self->_report_error(@$task, $@);
+    }
+    return;
+}
 
+sub _send_now {
+    my ($self, $message, $callback, $sync) = @_;
+    $self->_in_progress( $self->_in_progress + 1 );
     my ($req_msg, $key, $body, $unpack, $retry, $response_class, $no_reply);
     # MR::IProto::Message OO-API
     if( blessed($message) ) {
@@ -472,6 +518,7 @@ sub _send {
     # Old-style compatible API
     else {
         $req_msg = $message->{msg};
+        $key = $message->{key};
         $body = exists $message->{payload} ? $message->{payload}
             : ref $message->{data} ? pack delete $message->{pack} || 'L*', @{$message->{data}}
             : $message->{data};
@@ -518,12 +565,7 @@ sub _send {
             }
             else {
                 undef $next_try;
-                my $errobj = $unpack ? undef : MR::IProto::Error->new(
-                    request => $message,
-                    error   => $error,
-                    errno   => 0+$!,
-                );
-                $callback->($errobj, $error);
+                $self->_report_error($unpack ? undef : $message, $callback, $error);
             }
         }
         else {
@@ -544,18 +586,14 @@ sub _send {
                 }
                 else {
                     undef $next_try;
+                    $self->_in_progress( $self->_in_progress - 1 );
+                    $self->_try_to_send();
                     $callback->($data);
                 }
             }
             else {
                 undef $next_try;
-                my $error = $@;
-                my $errobj = $unpack ? undef : MR::IProto::Error->new(
-                    request => $message,
-                    error   => $error,
-                    errno   => 0+$!,
-                );
-                $callback->($errobj, $error);
+                $self->_report_error($unpack ? undef : $message, $callback, $@);
             }
         }
         return;
@@ -564,6 +602,21 @@ sub _send {
     return;
 }
 
+sub _report_error {
+    my ($self, $request, $callback, $error) = @_;
+    my $errobj = $request
+        ? MR::IProto::Error->new(
+            request => $request,
+            error   => $error,
+            errno   => 0+$!,
+        )
+        : undef;
+    $self->_in_progress( $self->_in_progress - 1 );
+    $self->_try_to_send();
+    $callback->($errobj, $error);
+    return;
+}
+
 sub _debug {
     my ($self, $level, $msg) = @_;
     return if $self->debug < $level;

-- 
Tarantool -- an efficient key/value data store

_______________________________________________
Mailing list: https://launchpad.net/~tarantool-developers
Post to     : [email protected]
Unsubscribe : https://launchpad.net/~tarantool-developers
More help   : https://help.launchpad.net/ListHelp

Reply via email to