On Thu, Mar 26, 2009 at 03:22:59PM +0100, Jaroslav Hajek wrote:
> On Thu, Mar 26, 2009 at 3:00 PM, Jaroslav Hajek <[email protected]> wrote:
> > 2009/3/26 Olaf Till <[email protected]>:
> >> On Thu, Mar 26, 2009 at 09:21:38AM +0100, Olaf Till wrote:
> >>>
> >>> ...
> >>>
> >>> Looking at the parcellfun stuff, I noticed that there are still more
> >>> functions for sending and receiving Octave variables in octave-forge:
> >>> fsave and fload. Some time ago there was a discussion in the
> >>> octave-maintainers list on save'ing and load'ing over streams. After
> >>> getting some hints, I posted psend and prcv, which can install/read
> >>> Octave variables to/from memory as well as directly returning/taking
> >>> their contents, and which read/write a binary header also (to care for
> >>> endian'ness, as I understood), and which distinguish eof at start of
> >>> reading from eof later. The discussion dried out, probably the
> >>> solution was not general enough and not obviously the right one for
> >>> Octave. For the now contributed code, the functionality of fsave and
> >>> fload would probably be sufficient, if only endian'ness was cared for
> >>> by writing/reading a header (variables are sent between different
> >>> machines now). If this would be done, I could rewrite my code, and
> >>> then there is probably no reason to keep psend and prcv in the
> >>> package.
> >>
> >> Jaroslav,
> >>
> >> would you accept the attached small patches for fsave.cc and fload.cc,
> >> to make them robust for usage between different machines?
> >>
> >
> > I actually didn't intend fsave and fload to work across machines
> > (parcellfun uses them just for pipes). But I'm not against it. I want,
> > however, to avoid the overhead of sending the binary header with each
> > variable.
> >
> > I see several options:
> > 1. use explicit arch specs (like fwrite, fread)
> > 2. use explicit option to control whether headers are used (but then
> > you can as well use different functions)
> > 3. cache the stream pointer in both fload/fsave and only write the
> > header on first save / attempt to read it on first load. This is
> > possible because octave_stream has reference counting, ensuring a
> > cached object remains valid.
> > The price to pay is one dangling closed file (but no memory leaks),
> > which is easily acceptable.
> >
> 
> Umm, I just realized this is not really that easy if you
> simultaneously use more streams. So the question is still open. I
> don't see any reasonably simple way to achieve it, other than
> maintaining a map of open stream numbers which already received the
> header.
> Specifying architecture explicitly is another option - or just leaving
> everything in the present state.

Well, I didn't want to make things more difficult. But I thought there
may be more people wanting to save/load to/from streams, and something
general for all would be good. The names "fsave" and "fload" sound
very general. But maybe this is not so easy and we should use
different functions for different purposes.
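
To make the "map of open stream numbers which already received the header" idea concrete, here is a minimal sketch. It is not taken from fsave/fload and does not use the Octave API: the names header_tracker, write_record and host_is_little_endian are hypothetical, a std::set stands in for the map, and the "header" is assumed to be a single byte flagging the sender's byte order.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <set>
#include <string>

// All names below are hypothetical; none of them exist in fsave/fload.

// Detect the byte order of the machine this code runs on.
inline bool host_is_little_endian ()
{
  const std::uint16_t probe = 1;
  unsigned char first;
  std::memcpy (&first, &probe, 1);
  return first == 1;
}

// Remembers which stream numbers have already received a header.
class header_tracker
{
public:
  // True exactly once per stream number, until forget () is called.
  bool needs_header (int fid) { return seen.insert (fid).second; }

  // Has to be called when a stream is closed, since numbers get reused.
  void forget (int fid) { seen.erase (fid); }

private:
  std::set<int> seen;
};

// Append one variable's payload to 'out'. A one-byte endianness flag
// ('L' or 'B', an assumed format) is written only on the first call
// for a given stream number.
inline void write_record (header_tracker& tracker, int fid,
                          const std::string& payload, std::string& out)
{
  assert (fid >= 0);
  if (tracker.needs_header (fid))
    out.push_back (host_is_little_endian () ? 'L' : 'B');
  out += payload;
}
```

The weak point of this approach is visible in forget (): the bookkeeping has to be told when a stream is closed, otherwise a reused stream number would silently miss its header.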

select.cc: current code is attached.

_exit.cc: also attached, but maybe you expect too much, it's only a
trivial one-liner.

functions for advisory locking: I think I will postpone rewriting these
and include them in the package for the present. I still think that,
apart from using Octave streams, they should be more general to be a
solution for all.


The question is still open whether I should make scheduling remote
function execution a separate package or maybe combine it with
"parallel". Actually I think that "parallel" would also be a suitable
place for parcellfun, since users will look there first for such
functionality.

Olaf
// Copyright (C) 2008 Olaf Till <[email protected]>

// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA


#include <octave/oct.h>
#include <octave/oct-map.h>
#include <octave/parse.h>
#include <octave/sysdep.h>

#include <unistd.h>

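DEFUN_DLD (_exit, args, , "") {

  if (args.length () != 1) {
    error ("_exit: exactly one argument (the exit status) required.\n");
    return octave_value_list ();
  }

  // Terminates the process immediately; does not return.
  _exit (args(0).int_value ());

}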
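
For context, a small standalone sketch of what POSIX _exit does in a forked child: the child ends at once, without running atexit handlers or flushing stdio buffers inherited from the parent, and the parent picks up the status with waitpid. That this is the intended use of the wrapper above is an assumption, and the helper name child_exit_status is hypothetical.

```cpp
#include <cassert>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

// Hypothetical helper: fork a child that ends with _exit (code) and
// return the exit status as seen by the parent, or -1 on failure.
inline int child_exit_status (int code)
{
  assert (code >= 0 && code <= 255); // only the low 8 bits are reported

  pid_t pid = fork ();
  if (pid < 0)
    return -1;

  if (pid == 0)
    _exit (code); // child: no atexit handlers, no stdio flushing

  int status = 0;
  if (waitpid (pid, &status, 0) != pid)
    return -1;

  return WIFEXITED (status) ? WEXITSTATUS (status) : -1;
}
```

Calling child_exit_status (7) in a parent process should give back 7, since the child terminates normally with that status.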
// Copyright (C) 2007, 2009 Olaf Till <[email protected]>

// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

#include <octave/oct.h>
#include <octave/oct-stream.h>

#ifdef POSIX
#include <sys/select.h>
#else
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#endif
#include <errno.h>
#include <math.h>
#include <map>

DEFUN_DLD (select, args, nargout, 
  "-*- texinfo -*-\n\
@deftypefn {Loadable Function} {[@var{n}, @var{ridx}, @var{widx}, @var{eidx}] =} select (@var{read_fids}, @var{write_fids}, @var{except_fids}, @var{timeout}[, @var{nfds}])\n\
Calls Unix @code{select}, see the respective manual.\n\
The following interface was chosen:\n\
@var{read_fids}, @var{write_fids}, @var{except_fids}: vectors of stream-ids.\n\
@var{timeout}: seconds, negative for infinite.\n\
@var{nfds}: optional, default is Unix' FD_SETSIZE (platform specific).\n\
An error is returned if nfds or a filedescriptor belonging to a stream-id\n\
plus one exceeds FD_SETSIZE.\n\
Return values are:\n\
@var{n}: number of ready streams.\n\
@var{ridx}, @var{widx}, @var{eidx}: index vectors of ready streams in\n\
@var{read_fids}, @var{write_fids}, and @var{except_fids}, respectively.\n\
@end deftypefn")
{
	/* This routine assumes that Octave's stream-id and the
	   corresponding filedescriptor of the system are the same
	   number. This should be the case in Octave >= 3.0. */
	octave_value_list retval;
	int nargin = args.length ();
	int i, fd, fid, nfds, n, act, argc;
	double argtout, *fvec;
	timeval tout;
	timeval *timeout = &tout;
	ColumnVector read_fids, write_fids, except_fids;

	if (nargin == 4)
		nfds = FD_SETSIZE;
	else if (nargin == 5) {
		if (! args(4).is_real_scalar ()) {
			error ("select: 'nfds' must be a real scalar.\n");
			return retval;
		}
		nfds = args(4).int_value ();
		if (nfds <= 0) {
			error ("select: 'nfds' should be greater than zero.\n");
			return retval;
		}
		if (nfds > FD_SETSIZE) {
			error ("select: 'nfds' exceeds system's maximum given by FD_SETSIZE.\n");
			return retval;
		}
	}
	else {
		error ("select: four or five arguments required.\n");
		return retval;
	}
	if (! args(3).is_real_scalar ()) {
		error ("select: 'timeout' must be a real scalar.\n");
		return retval;
	}
	if ((argtout = args(3).double_value ()) < 0)
		timeout = NULL;
	else {
		double ipart, fpart;
		fpart = modf (argtout, &ipart);
		tout.tv_sec = lrint (ipart);
		tout.tv_usec = lrint (fpart * 1000000); /* tv_usec is in microseconds */
	}
	for (argc = 0; argc < 3; argc++) {
		if (! args(argc).is_empty ()) {
			if (! args(argc).is_real_type ()) {
				error ("select: first three arguments must be real vectors.\n");
				return retval;
			}
			if (args(argc).rows () != 1 &&
			    args(argc).columns () != 1) {
				error ("select: first three arguments must be real vectors.\n");
				return retval;
			}
			switch (argc) {
			case 0:	read_fids = ColumnVector
					(args(argc).vector_value ()); break;
			case 1: write_fids = ColumnVector
					(args(argc).vector_value ()); break;
			case 2: except_fids = ColumnVector
					(args(argc).vector_value ()); break;
			}
		}
	}

	fd_set rfds, wfds, efds;
	FD_ZERO (&rfds); FD_ZERO (&wfds); FD_ZERO (&efds);
	for (i = 0; i < read_fids.length (); i++) {
		fid = lrint (read_fids(i));
		fd = octave_stream_list::lookup (fid, "select") . 
			file_number ();
		if (error_state || fd < 0) {
			error ("select: invalid file-id");
			return retval;
		}
		if (fid >= FD_SETSIZE) {
			error ("select: filedescriptor >= FD_SETSIZE");
			return retval;
		}
		FD_SET (fid, &rfds);
	}
	for (i = 0; i < write_fids.length (); i++) {
		fid = lrint (write_fids(i));
		fd = octave_stream_list::lookup (fid, "select") . 
			file_number ();
		if (error_state || fd < 0) {
			error ("select: invalid file-id");
			return retval;
		}
		if (fid >= FD_SETSIZE) {
			error ("select: filedescriptor >= FD_SETSIZE");
			return retval;
		}
		FD_SET (fid, &wfds);
	}
	for (i = 0; i < except_fids.length (); i++) {
		fid = lrint (except_fids(i));
		fd = octave_stream_list::lookup (fid, "select") . 
			file_number ();
		if (error_state || fd < 0) {
			error ("select: invalid file-id");
			return retval;
		}
		if (fid >= FD_SETSIZE) {
			error ("select: filedescriptor >= FD_SETSIZE");
			return retval;
		}
		FD_SET (fid, &efds);
	}

	if ((n = select (nfds, &rfds, &wfds, &efds, timeout)) == -1) {
		std::string err;
		switch (errno) {
		case EBADF:
			err = "EBADF";
			break;
		case EINTR:
			err = "EINTR";
			break;
		case EINVAL:
			err = "EINVAL";
			break;
		default:
			err = "unknown error";
		}
		error ("select: unix select returned error: %s\n",
		       err.c_str ());
		return retval;
	}
	if (nargout > 3) {
		for (i = 0, act = 0; i < except_fids.length (); i++)
			if (FD_ISSET (lrint (except_fids(i)), &efds)) act++;
		RowVector eidx = RowVector (act);
		for (i = 0, fvec = eidx.fortran_vec ();
		     i < except_fids.length (); i++)
			if (FD_ISSET (lrint (except_fids(i)), &efds)) {
				*fvec = double (i + 1);
				fvec++;
			}
		retval(3) = eidx;
	}
	if (nargout > 2) {
		for (i = 0, act = 0; i < write_fids.length (); i++)
			if (FD_ISSET (lrint (write_fids(i)), &wfds)) act++;
		RowVector widx = RowVector (act);
		for (i = 0, fvec = widx.fortran_vec ();
		     i < write_fids.length (); i++)
			if (FD_ISSET (lrint (write_fids(i)), &wfds)) {
				*fvec = double (i + 1);
				fvec++;
			}
		retval(2) = widx;
	}
	if (nargout > 1) {
		for (i = 0, act = 0; i < read_fids.length (); i++)
			if (FD_ISSET (lrint (read_fids(i)), &rfds)) act++;
		RowVector ridx = RowVector (act);
		for (i = 0, fvec = ridx.fortran_vec ();
		     i < read_fids.length (); i++)
			if (FD_ISSET (lrint (read_fids(i)), &rfds)) {
				*fvec = double (i + 1);
				fvec++;
			}
		retval(1) = ridx;
	}

	retval(0) = n;

	return retval;
}
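
The core of the routine above, stripped of the Octave layer, can be sketched as a standalone program fragment: convert a timeout given in fractional seconds into a timeval (tv_usec counts microseconds), then wait on a single descriptor. The helper names seconds_to_timeval and wait_readable are hypothetical and only serve this illustration; a negative timeout means "wait forever", as in the interface documented above.

```cpp
#include <cassert>
#include <cmath>
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

// Hypothetical helper: split non-negative fractional seconds into a
// timeval. tv_usec is in microseconds, hence the factor 1e6.
inline timeval seconds_to_timeval (double seconds)
{
  double ipart;
  double fpart = std::modf (seconds, &ipart);
  timeval tv;
  tv.tv_sec = static_cast<time_t> (ipart);
  tv.tv_usec = static_cast<suseconds_t> (std::lrint (fpart * 1e6));
  if (tv.tv_usec >= 1000000) { // rounding may carry into the seconds
    tv.tv_sec += 1;
    tv.tv_usec -= 1000000;
  }
  return tv;
}

// Hypothetical helper: wait until 'fd' is readable. A negative timeout
// blocks indefinitely. Returns what select returns: 1 if ready,
// 0 on timeout, -1 on error.
inline int wait_readable (int fd, double seconds)
{
  assert (fd >= 0 && fd < FD_SETSIZE);

  fd_set rfds;
  FD_ZERO (&rfds);
  FD_SET (fd, &rfds);

  timeval tv = seconds_to_timeval (seconds < 0 ? 0 : seconds);
  return select (fd + 1, &rfds, NULL, NULL, seconds < 0 ? NULL : &tv);
}
```

On an empty pipe, wait_readable with a short timeout should return 0; after a byte has been written to the other end it should return 1.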
------------------------------------------------------------------------------
_______________________________________________
Octave-dev mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/octave-dev
