package Netilla::CONDA::Component::StoreKeeper;

use strict;
use warnings;

use Error qw(:try);

use Net::NAD::Exception;
use Netilla::CONDA::Component::FileServer;
use Netilla::CONDA::Component::Replicator;
use Netilla::CONDA::Component::SOAP::TCP::Server;
use Netilla::CONDA::Component::Worker;
use Netilla::CONDA::FunctionQueue;
use Netilla::CONDA::Logger qw(:all);
use Netilla::CONDA::Request;
use Netilla::CONDA::ResultSet;

use Time::HiRes;
use POE::Component::Client::NAD::Exception;
use POE::Component::Client::NAD;
use POE::Component::Logger;


use POE;


use constant N_CONDA_LOG_CONFIG_FILE => '/etc/conda_log.conf';


sub new {
	my $type = shift;
	my $class = ref($type) || $type;
	my %params = @_;

	die $type . "->new() requires an even number of parameters"
		if (@_ & 1);

	my $self = bless {
		_worker_id => 0,
		_results => Netilla::CONDA::ResultSet->new(),
		_queue => Netilla::CONDA::FunctionQueue->new(),
	}, $class;

	$self->{_session} = POE::Session->create(
		object_states =>
			[ $self => {
				_start	=> "_poe_start",
				_default => "_poe_default",
				_child => "_poe_child",
				# DEPRECATED.  poe_signal should be split apart and registered
				#_signal => "poe_signal",
				_stop => "poe_shutdown",
				auth_client_request =>
					"poe_auth_client_request",
				entry_point => "poe_entry_point",
				request => "poe_request",
				mrequest => "poe_mrequest",
				fork_damnit => "poe_fork_damnit",
				execute_next => "poe_execute_next",
				got_response => "poe_got_response",
				got_blocking_response => "poe_got_blocking_response",
				get_result => "poe_get_result",
				shutdown => "poe_shutdown",
				# New Signal handler events
				sig_term => "poe_TERM",    
				sig_ttin => "poe_TTIN",    
				sig_ttout => "poe_TTOUT",    
				sig_usr1 => "log_level_up",
				sig_usr2 => "log_level_down",
				},
			],
		args => [
			Debug => $params{debug},
			replication => $params{replication},
			],
	);


	return $self;
}


########################################################################
#
# POE methods
#

# start as soon as possible
#
# parse commad line switch
#
# read configuration file 
#
# initialize soap server (maybe one, or two, both private and public,
# logger
#
sub _poe_start {
	my ($self, $kernel, $session, %params) =
	  @_[OBJECT, KERNEL, SESSION, ARG0 .. $#_ ];

	# initialize SOAP servers, first the private one, then the public one
	#  if necessary
	my %soap_params_common = (
		ClientRequest => sub {
			$kernel->call($session, 'entry_point', @_); 
	 	},
		Error => sub {
			splice(@_, OBJECT, 1, $self); # inject $self
			&poe_soap_error(@_);
		},
		Debug => 1,
	);

	Netilla::CONDA::Component::SOAP::TCP::Server->new(
		%soap_params_common,
		Alias => "conda",
	);

	Netilla::CONDA::Component::Replicator->new(
		replication => $params{replication},
	);

	POE::Component::Logger->spawn(ConfigFile => N_CONDA_LOG_CONFIG_FILE);

	$kernel->sig('SHUTDOWN', 'shutdown');

	# Assign events to signal handlers
	$kernel->sig('TERM','sig_term');
	$kernel->sig('USR1','sig_usr1');
	$kernel->sig('USR2','sig_usr2');
	$kernel->sig('TTIN','sig_ttin');
	$kernel->sig('TTOUT','sig_ttout');

	print_log(SYSTEM, "Starting StoreKeeper.\n");
}


sub _poe_default {
	my ($type, $params) = @_[ARG0, ARG1];

	my $response = $params->{response};
	
	$response->(Netilla::CONDA::Response->new(
		success => 0,
		error => "Invalid request type: '$type'",
	));

	return;
}


sub _poe_child {
}


sub poe_entry_point {
	use Time::Delta;
	my $log = Time::Delta->new();
	$log->delta("poe_entry_point called");
	print_log(REQUEST, "poe_entry_point called.\n");

	my ($kernel, $session, $heap, %params) = @_[KERNEL, SESSION, HEAP, ARG0 .. $#_ ];

	my $response = $params{'response'};
	my $token = $params{'type'} eq 'mrequest' ?
			$params{'requests'}->[0]->{'token'} : $params{'token'};

	my %nad_params = (
		token           => $token,
		ConnectError    => sub {
			my ($syscall_name, $error_number, $error_string)
				= @_[ARG0, ARG1, ARG2];

			my $message = "$syscall_name(): ($error_number) $error_string";
			print_log(AUTHZ, "ConnectError occured: $message\n");
			$response->(Netilla::CONDA::Response->new(
				success => 0,
				error => "Net::NAD::ConnectException: " .
				   Net::NAD::ConnectException->description,
				   ", $syscall_name $error_number $error_string"
			));
		},
		ServerError     => sub {
			my ($syscall_name, $error_number, $error_string)
				= @_[ARG0, ARG1, ARG2];

			my $message = "$syscall_name(): ($error_number) $error_string";
			print_log(AUTHZ, "ServerError occured: $message\n");
			$response->(Netilla::CONDA::Response->new(
				success => 0,
				error => "Net::NAD::ServerException " .
				   Net::NAD::ServerException->description,
				   ", $syscall_name $error_number $error_string"
			   ));
		},
		Exception       => sub {
			my $e = $_[ARG0];
			print_log(AUTHZ, ref($e) . " occured: $e\n");
			$response->(Netilla::CONDA::Response->new(
				success => 0,
				error => ref($e) . ": " . $e->message,
			));
		},
	);

	my $exception;
	try {
		$heap->{'nad'} = POE::Component::Client::NAD->new(%nad_params);
	} catch Exception::Class::Base with {
		my $e = shift;
		my $error_message =
			ref($e) .
			"cannot create POE::Component::Client::NAD: " .
			$e->message;

		$response->(Netilla::CONDA::Response->new(
			success => 0,
			error => $error_message,
		));

		return;
	};

	$kernel->post($heap->{'nad'}, "show_vars",
		response => sub {
			my %nad_vars = @_;

			$heap->{'nad_vars'} = \%nad_vars;

			$kernel->call(
				$session,
				($params{type} eq 'mrequest') ?
					'mrequest' : 'auth_client_request',
				%params,
			);
		},
	);
	$log->delta("poe_entry_point finished");
}


sub poe_auth_client_request {
	print_log(REQUEST, "poe_auth_client_request called.\n");

	my ($kernel, $session, $heap, %params) = @_[KERNEL, SESSION, HEAP, ARG0 .. $#_ ];

	my ($token, $response, $function, $type) =
		@params{qw(token response function type)};

	unless (exists($Netilla::CONDA::Request::functions{$function})) {
		print_log(REQUEST, "Function $function does not exist.\n");

		$response->(Netilla::CONDA::Response->new(
				success => 0,
				error => "Function '$function' does not exist",
		));

		return;
	}

	my @caps = (exists($Netilla::CONDA::Request::functions{$function}{caps})
		&& 'ARRAY' eq
		ref($Netilla::CONDA::Request::functions{$function}{caps}) ?
		@{$Netilla::CONDA::Request::functions{$function}{caps}} : ()
	);

	print_log(AUTHZ,
		"About to check caps (@caps) for function $function\n");

	# empty caps list means that request does not need any capabilities
	# to be run.  always good.
	unless (@caps) {
		$kernel->post($session, $type, %params);
		print_log(AUTHZ, "Function $function OK.\n");
		return;
	}

	my $nad_response = sub {
		my $result = shift;

		if (!is_exception($result)) {
			print_log(AUTHZ, "Function $function OK.\n");
			$kernel->post($session, $type, %params);
		} else {
			print_log(AUTHZ, "Function $function NOT OK.\n");

			# Log authorization failure to syslog
			my $username = $heap->{'nad_vars'}->{'QualifiedUsername'};
			Logger->log({ level => "error", "message" => "$username: $function(): authorization failure" });

			my $error_message =
			"Requested operation not authorized for current role";
			if ($result->isa('Net::NAD::CapabilityNotAuthorizedException')) {
					$response->(Netilla::CONDA::Response->new(
										success => 0,
										error => $error_message,
					));
			}
			else {
				$response->(Netilla::CONDA::Response->new(
					success => 0,
					error => ref($result) . ": " . $result->message,
				)) if (defined($response) && ref($response) eq 'CODE');
			}
		}
	};
	
	$kernel->post($heap->{'nad'}, "check_caps",
			caps => \@caps,
			response => $nad_response
	);
}


sub poe_mrequest {
	print_log(REQUEST, "poe_mrequest called.\n");

	my ($kernel, $session, %params) = @_[KERNEL, SESSION, ARG0 .. $#_ ];

	my ($requests, $response) = @params{qw(requests response)};

	unless (defined($requests) && ref($requests) eq 'ARRAY') {
		$response->(Netilla::CONDA::Response->new(
			success => 0,
			error => "CONDA: no requests specified",
		));
	}

	my @result;
	my $numreq = @$requests;

	for (my $i = 0; $i < $numreq && ref($requests->[$i]) eq 'HASH'; ++$i) {
		my $index = $i;

		$requests->[$i]{response} = sub {
			$result[$index] = shift;

			for (my $j = 0; $j < $numreq; ++$j) {
				if (!defined($result[$j])) {
					return;
				}
			}

			$response->(Netilla::CONDA::Response->new(
				success => 1,
				result => [@result]
			));

			# help the interpreter free up memory.
			undef @result;
			undef $response;
			undef $numreq;
		};

		$kernel->post($session, 'auth_client_request',
			%{$requests->[$i]}, type => 'request');
	}
}


sub poe_request {
	print_log(REQUEST, "poe_request called.\n");

	my ($self, $kernel, $session, $heap, %params) =
	@_[OBJECT, KERNEL, SESSION, HEAP, ARG0 .. $#_ ];

	my ($blocking, $token, $type, $args, $response, $funcname, $timeout) =
	    @params{qw(blocking token type args response function timeout)};

	my $username = $heap->{'nad_vars'}->{'QualifiedUsername'};
	Logger->log({ level => "info", "message" => "$username: $funcname()" });

	my ($function, @args, $function_id, $fileserver, $replicate);

	if ('ARRAY' eq ref($args)) {
		@args = @$args;
	} else {
		@args = ();
		$params{args} = [];
	}

	#
	# we do not check for the existence of
	# $Netilla::CONDA::Request::functions{$function}, because by this time
	# the check should have been berformed in poe_auth_client_request().
	#
	$function = $Netilla::CONDA::Request::functions{$funcname};
	$fileserver = $function->{fileserver};
	$replicate  = (Netilla::CONDA::Component::Replicator->run() ? $function->{replicate} : 0);
	$params{funcname} = $funcname;
	$params{locks} = $function->{locks};
	$params{timeout} = ($timeout || $function->{timeout});
	$params{function} = sub { $function->{function}->(@args) };
	$function_id = $params{function_id} = $self->{_worker_id}++;

	if ($blocking) {
		$params{response} = sub {
			my ($result) = @_;

			$kernel->post($session, 'got_blocking_response',
						result => $result,
						response => $response,
						function_id => $function_id,
						fileserver => $fileserver,
						replicate => $replicate,
						($replicate ? (repl_params => {
							funcname => $params{funcname},
							args => [@args],
							timeout => ($timeout || $function->{timeout}),
							blocking => 1,
						}) : ()),
						token => $token,
			);

			# undef all outside lexical variables so that
			# Perl releases memory.
			undef @args;
			undef $result;
			undef $response;
			undef $function_id;
			undef $function;
			undef $fileserver;
			undef $replicate;
			undef %params;
			undef $token;
			undef $timeout;
		};

		$kernel->yield('fork_damnit', %params);
	} else {
		my $req_id = $params{req_id} = $self->{_results}->get_new_id;
		my $token = $params{token};

		$params{response} = sub {
			my ($result, $e) = @_;

			$kernel->post($session, 'got_response',
			 			token => $token,
						req_id => $req_id,
						result => $result,
						function_id => $function_id,
						fileserver => $fileserver,
						replicate => $replicate,
						token => $token,
			);
		};

		$kernel->yield('fork_damnit', %params);

		return $req_id;
	}
}


sub poe_fork_damnit {
	print_log(REQUEST, "poe_fork_damnit called.\n");

	my ($self, $kernel, %params) = @_[OBJECT, KERNEL, ARG0 .. $#_ ];

	my ($response, $function, $function_id, $funcname, $locks) =
		@params{qw(response function function_id funcname locks)};

	my $alias = "worker" . $function_id;

	$self->{_queue}->add(
		id => $function_id,
		function => sub {
			Netilla::CONDA::Component::Worker->new(
				alias => $alias,
				debug => 0,
				prog => $function,
				%params
			);
		},
		name => $funcname,
		locks => $locks,
	);

	$kernel->yield('execute_next');

	return;
}


sub poe_execute_next {
	print_log(REQUEST, "poe_execute_next called.\n");

	$_[OBJECT]->{_queue}->execute();

	return;
}


sub poe_got_blocking_response {
	print_log(REQUEST, "poe_got_blocking_response called.\n");

	my ($self, $kernel, %params) = @_[OBJECT, KERNEL, ARG0 .. $#_ ];

	my ($function_id, $response, $result, $fileserver, $token, $replicate, $repl_params) =
		@params{qw(function_id response result fileserver token replicate repl_params)};

	$self->{_queue}->job_finished($function_id);

	if ($fileserver && $result->success) {
		my $server = Netilla::CONDA::Component::FileServer->new(
				alias => "server" . $function_id,
				token => $token,
				file => $result->result,
		);

		if (defined($server)) {
			$result->result($server->port);
		} else {
			$result->success(0);
			$result->error(
				"CONDA error: could not spawn file server"
			);
		}
	}

	if ($replicate && $result->success) {
		$$repl_params{token} = $token;

		$kernel->post('replicator', 'replicate', $repl_params);
	}

	$response->($result);

	$kernel->yield('execute_next');
}


sub poe_got_response {
	print_log(REQUEST, "poe_got_response called.\n");

	my ($self, $kernel, %params) = @_[OBJECT, KERNEL, ARG0 .. $#_ ];

	my ($token, $req_id, $result, $function_id, $fileserver) =
		@params{qw(token req_id result function_id fileserver)};

	$self->{_queue}->job_finished($function_id);

	if ($fileserver && $result->success) {
		my $server = Netilla::CONDA::Component::FileServer->new(
				alias => "server" . $function_id,
				token => $token,
				file => $result->result,
		);

		if (defined($server)) {
			$result->result($server->port);
		} else {
			$result->success(0);
			$result->error(
				"CONDA error: could not spawn file server"
			);
		}
	}

	$self->{_results}->add(
		result => $result,
		token => $token,
		id => $req_id,
	);

	$kernel->yield('execute_next');
}


sub poe_get_result {
	print_log(REQUEST, "poe_get_result called.\n");

	my ($self, %params) = @_[OBJECT, ARG0 .. $#_ ];

	my ($token, $args, $response) = @params{qw(token args response)};
	my %args = @$args;
	my $req_id = $args{req_id};

	my $result = $self->{_results}->get(id => $req_id, token => $token);

	unless (defined($result)) {
		$result = Netilla::CONDA::Response->new(
			success => 0,
			error => "Result with ID $req_id does not exist",
		);
	}

	&$response($result);
}


sub poe_shutdown {
	print_log(REQUEST|SYSTEM, __PACKAGE__, '(', $_[SESSION]->ID, '):',
		' shutting down...', "\n");

	$_[HEAP]->{shutdown} = 1;
}

sub poe_TERM {
	print_log(REQUEST, "got TERM signal...\n");
	$_[KERNEL]->yield('shutdown');
}
sub poe_TTIN {
	print_log(REQUEST, "got TTIN signal...\n");
	Netilla::CONDA::Component::Replicator->run(1);
}
sub poe_TTOUT {
	print_log(REQUEST, "got TTOU signal...\n");
	Netilla::CONDA::Component::Replicator->run(0);
}



sub poe_signal {
	my $signal = $_[ARG0];

	print_log(REQUEST, "got signal $signal...\n");

	if ($signal eq "TERM") { $_[KERNEL]->yield('shutdown') }
	elsif ($signal eq "USR1") { log_level_up() }
	elsif ($signal eq "USR2") { log_level_down() }
	elsif ($signal eq "TTIN") { Netilla::CONDA::Component::Replicator->run(1) }
	elsif ($signal eq "TTOU") { Netilla::CONDA::Component::Replicator->run(0) }
}


sub poe_soap_error {
        my ($syscall_name, $error_number, $error_string) = @_[ARG0 .. ARG2 ];

        warn(
                "poe_soap_error: syscall_name $syscall_name, ",
                "error_number $error_number, error_string $error_string"
        ) if $_[OBJECT]->debug;

        # "Address already in use" error
        if ($syscall_name eq 'bind' and $error_number == 98) {
                $_[OBJECT]->soap_server_port_in_use($_[HEAP]->{alias});
        }

        delete $_[HEAP]->{listener};
        $_[KERNEL]->post($_[SESSION], 'shutdown');
}


sub soap_server_port_in_use {
	my (undef, $alias) = @_;
	warn("$alias port is in use");
}


sub is_exception {
	UNIVERSAL::isa($_[0], 'Exception::Class::Base');
}
1;
