Hello there!
We have a situation where we run srun inside a batch job and need the
SPANK environment for some plugins to work, not only on the first node
(where slurmd received the batch job) but on every node allocated to
the job. Currently there is no way to get that SPANK environment,
because an srun started by slurmstepd on a node can never obtain any of
it. To resolve the issue I wrote a simple RPC implementation of two
calls, REQUEST_SPANK_ENVIRONMENT and RESPONCE_SPANK_ENVIRONMENT, with
the corresponding handling for sending, receiving and freeing messages
of that RPC type. To use the RPC, a plugin calls (as usual)
slurm_send_recv_controller_msg() with the appropriate data. The
implementation patch is attached. I hope to see it in a future version
of SLURM.
Andriy.
diff -udpr slurm-2.2.1/src/common/slurm_protocol_defs.c slurm-2.2.1.new/src/common/slurm_protocol_defs.c
--- slurm-2.2.1/src/common/slurm_protocol_defs.c 2010-12-09 22:02:23.000000000 +0200
+++ slurm-2.2.1.new/src/common/slurm_protocol_defs.c 2011-03-14 16:36:52.000000000 +0200
@@ -834,6 +834,33 @@ inline void slurm_free_suspend_msg(suspe
xfree(msg);
}
+/* Free a REQUEST_SPANK_ENVIRONMENT message body. */
+void slurm_free_spank_env_request_msg(spank_env_request_msg_t *msg)
+{
+	xfree(msg);
+}
+
+/*
+ * Free a RESPONCE_SPANK_ENVIRONMENT message body: every environment
+ * string, the array holding them, and the struct itself. NULL is a
+ * no-op.
+ */
+void slurm_free_spank_env_responce_msg(spank_env_responce_msg_t *msg)
+{
+	uint32_t i;
+
+	if (msg == NULL)
+		return;
+	/* Check the array separately: a failed unpack can presumably
+	 * leave the count set while the array was never allocated. */
+	if (msg->spank_job_env) {
+		for (i = 0; i < msg->spank_job_env_size; i++)
+			xfree(msg->spank_job_env[i]);
+		xfree(msg->spank_job_env);
+	}
+	xfree(msg);
+}
+
/* Given a job's reason for waiting, return a descriptive string */
extern char *job_reason_string(enum job_state_reason inx)
{
@@ -2285,6 +2300,13 @@ extern int slurm_free_msg_data(slurm_msg
break;
case REQUEST_UPDATE_JOB_STEP:
slurm_free_update_step_msg(data);
+ break; /* ends REQUEST_UPDATE_JOB_STEP; per the context lines it previously fell through to default */
+ case REQUEST_SPANK_ENVIRONMENT: /* data is spank_env_request_msg_t */
+ slurm_free_spank_env_request_msg(data);
+ break;
+ case RESPONCE_SPANK_ENVIRONMENT: /* data is spank_env_responce_msg_t; strings freed too */
+ slurm_free_spank_env_responce_msg(data);
+ break;
default:
error("invalid type trying to be freed %u", type);
break;
diff -udpr slurm-2.2.1/src/common/slurm_protocol_defs.h slurm-2.2.1.new/src/common/slurm_protocol_defs.h
--- slurm-2.2.1/src/common/slurm_protocol_defs.h 2010-11-09 18:23:40.000000000 +0200
+++ slurm-2.2.1.new/src/common/slurm_protocol_defs.h 2011-03-14 16:20:57.000000000 +0200
@@ -210,6 +210,8 @@ typedef enum {
REQUEST_TOPO_INFO,
RESPONSE_TOPO_INFO,
REQUEST_TRIGGER_PULL,
+ REQUEST_SPANK_ENVIRONMENT, /* body: spank_env_request_msg_t; placed before the explicit 3001 so no existing value shifts */
+ RESPONCE_SPANK_ENVIRONMENT, /* body: spank_env_responce_msg_t; NOTE(review): "RESPONCE" misspells RESPONSE (cf. RESPONSE_TOPO_INFO) */
REQUEST_UPDATE_JOB = 3001,
REQUEST_UPDATE_NODE,
@@ -908,6 +910,15 @@ typedef struct {
uint16_t rpc_version;
} accounting_update_msg_t;
+typedef struct { /* REQUEST_SPANK_ENVIRONMENT body */
+ uint32_t job_id; /* ID of the job whose SPANK environment is requested */
+} spank_env_request_msg_t;
+
+typedef struct { /* RESPONCE_SPANK_ENVIRONMENT body */
+ uint32_t spank_job_env_size; /* number of entries in spank_job_env */
+ char **spank_job_env; /* SPANK environment strings copied from the job record */
+} spank_env_responce_msg_t;
+
typedef struct slurm_ctl_conf slurm_ctl_conf_info_msg_t;
/*****************************************************************************\
* SLURM MESSAGE INITIALIZATION
@@ -1057,6 +1068,8 @@ inline void slurm_free_block_info_reques
inline void slurm_free_job_notify_msg(job_notify_msg_t * msg);
inline void slurm_free_accounting_update_msg(accounting_update_msg_t *msg);
+void slurm_free_spank_env_request_msg(spank_env_request_msg_t *msg); /* REQUEST_SPANK_ENVIRONMENT body */
+void slurm_free_spank_env_responce_msg(spank_env_responce_msg_t *msg); /* RESPONCE_SPANK_ENVIRONMENT body, including its strings */
extern int slurm_free_msg_data(slurm_msg_type_t type, void *data);
extern uint32_t slurm_get_return_code(slurm_msg_type_t type, void *data);
diff -udpr slurm-2.2.1/src/common/slurm_protocol_pack.c slurm-2.2.1.new/src/common/slurm_protocol_pack.c
--- slurm-2.2.1/src/common/slurm_protocol_pack.c 2010-11-09 18:23:40.000000000 +0200
+++ slurm-2.2.1.new/src/common/slurm_protocol_pack.c 2011-03-14 16:34:04.000000000 +0200
@@ -565,6 +565,16 @@ static void _pack_update_job_step_msg(st
static int _unpack_update_job_step_msg(step_update_request_msg_t ** msg_ptr,
Buf buffer, uint16_t protocol_version);
+static void _pack_spank_env_request_msg(spank_env_request_msg_t * msg,
+ Buf buffer, uint16_t protocol_version); /* REQUEST_SPANK_ENVIRONMENT (job ID) */
+static int _unpack_spank_env_request_msg(spank_env_request_msg_t ** msg_ptr,
+ Buf buffer, uint16_t protocol_version);
+
+static void _pack_spank_env_responce_msg(spank_env_responce_msg_t * msg,
+ Buf buffer, uint16_t protocol_version); /* RESPONCE_SPANK_ENVIRONMENT (string array) */
+static int _unpack_spank_env_responce_msg(spank_env_responce_msg_t ** msg_ptr,
+ Buf buffer, uint16_t protocol_version);
+
/* pack_header
* packs a slurm protocol header that proceeds every slurm message
* IN header - the header structure to pack
@@ -1096,6 +1106,16 @@ pack_msg(slurm_msg_t const *msg, Buf buf
(job_sbcast_cred_msg_t *)msg->data, buffer,
msg->protocol_version);
break;
+ case REQUEST_SPANK_ENVIRONMENT: /* packs the job ID only */
+ _pack_spank_env_request_msg(
+ (spank_env_request_msg_t *)msg->data, buffer,
+ msg->protocol_version);
+ break;
+ case RESPONCE_SPANK_ENVIRONMENT: /* packs the environment string array */
+ _pack_spank_env_responce_msg(
+ (spank_env_responce_msg_t *)msg->data, buffer,
+ msg->protocol_version);
+ break;
default:
debug("No pack method for msg type %u", msg->msg_type);
return EINVAL;
@@ -1607,6 +1627,16 @@ unpack_msg(slurm_msg_t * msg, Buf buffer
(job_sbcast_cred_msg_t **)&msg->data, buffer,
msg->protocol_version);
break;
+ case REQUEST_SPANK_ENVIRONMENT: /* on failure msg->data is left NULL */
+ rc = _unpack_spank_env_request_msg(
+ (spank_env_request_msg_t **)&msg->data, buffer,
+ msg->protocol_version);
+ break;
+ case RESPONCE_SPANK_ENVIRONMENT: /* on failure msg->data is left NULL */
+ rc = _unpack_spank_env_responce_msg(
+ (spank_env_responce_msg_t **)&msg->data, buffer,
+ msg->protocol_version);
+ break;
default:
debug("No unpack method for msg type %u", msg->msg_type);
return EINVAL;
@@ -8230,6 +8260,70 @@ unpack_error:
return SLURM_ERROR;
}
+/* Pack a REQUEST_SPANK_ENVIRONMENT body: just the job ID. */
+static void _pack_spank_env_request_msg(spank_env_request_msg_t * msg,
+					Buf buffer, uint16_t protocol_version)
+{
+	xassert(msg != NULL);
+
+	pack32(msg->job_id, buffer);
+}
+
+/* Unpack a REQUEST_SPANK_ENVIRONMENT body. On failure *msg_ptr is set
+ * to NULL and SLURM_ERROR is returned. */
+static int _unpack_spank_env_request_msg(spank_env_request_msg_t ** msg_ptr,
+					 Buf buffer, uint16_t protocol_version)
+{
+	spank_env_request_msg_t *msg;
+
+	xassert(msg_ptr != NULL);
+	msg = xmalloc(sizeof(spank_env_request_msg_t));
+	*msg_ptr = msg;
+
+	safe_unpack32(&msg->job_id, buffer);
+	return SLURM_SUCCESS;
+
+unpack_error:
+	slurm_free_spank_env_request_msg(msg);
+	*msg_ptr = NULL;
+	return SLURM_ERROR;
+}
+
+/* Pack a RESPONCE_SPANK_ENVIRONMENT body: the environment strings as a
+ * packed string array. */
+static void _pack_spank_env_responce_msg(spank_env_responce_msg_t * msg,
+					 Buf buffer, uint16_t protocol_version)
+{
+	xassert(msg != NULL);
+
+	packstr_array(msg->spank_job_env, msg->spank_job_env_size, buffer);
+}
+
+/* Unpack a RESPONCE_SPANK_ENVIRONMENT body. On failure *msg_ptr is set
+ * to NULL and SLURM_ERROR is returned. */
+static int _unpack_spank_env_responce_msg(spank_env_responce_msg_t ** msg_ptr,
+					  Buf buffer, uint16_t protocol_version)
+{
+	spank_env_responce_msg_t *msg;
+
+	xassert(msg_ptr != NULL);
+	msg = xmalloc(sizeof(spank_env_responce_msg_t));
+	*msg_ptr = msg;
+
+	safe_unpackstr_array(&msg->spank_job_env, &msg->spank_job_env_size,
+			     buffer);
+	return SLURM_SUCCESS;
+
+unpack_error:
+	/* A failed unpack can presumably leave the count set without an
+	 * array; zero it so the free routine never walks a NULL array. */
+	if (msg->spank_job_env == NULL)
+		msg->spank_job_env_size = 0;
+	slurm_free_spank_env_responce_msg(msg);
+	*msg_ptr = NULL;
+	return SLURM_ERROR;
+}
+
/* template
void pack_ ( * msg , Buf buffer )
{
diff -udpr slurm-2.2.1/src/slurmctld/proc_req.c slurm-2.2.1.new/src/slurmctld/proc_req.c
--- slurm-2.2.1/src/slurmctld/proc_req.c 2011-01-11 21:36:36.000000000 +0200
+++ slurm-2.2.1.new/src/slurmctld/proc_req.c 2011-03-14 16:43:35.000000000 +0200
@@ -149,6 +149,7 @@ inline static void _slurm_rpc_update_jo
inline static void _slurm_rpc_update_node(slurm_msg_t * msg);
inline static void _slurm_rpc_update_partition(slurm_msg_t * msg);
inline static void _slurm_rpc_update_block(slurm_msg_t * msg);
+inline static void _slurm_rpc_dump_spank(slurm_msg_t * msg); /* handles REQUEST_SPANK_ENVIRONMENT */
inline static void _update_cred_key(void);
@@ -398,6 +399,10 @@ void slurmctld_req (slurm_msg_t * msg)
_slurm_rpc_get_topo(msg);
/* No body to free */
break;
+ case REQUEST_SPANK_ENVIRONMENT:
+ _slurm_rpc_dump_spank(msg); /* sends its own reply on msg->conn_fd */
+ slurm_free_spank_env_request_msg(msg->data); /* the handler does not free the request body */
+ break;
default:
error("invalid RPC msg_type=%d", msg->msg_type);
slurm_send_rc_msg(msg, EINVAL);
@@ -3829,3 +3834,76 @@ inline static void _slurm_rpc_accountin
END_TIMER2("_slurm_rpc_accounting_first_reg");
}
+
+/*
+ * _slurm_rpc_dump_spank - process a REQUEST_SPANK_ENVIRONMENT RPC.
+ * On success, reply with RESPONCE_SPANK_ENVIRONMENT carrying a copy of
+ * the job's SPANK environment. On failure (unknown job ID, or a caller
+ * that is neither the job's owner nor SlurmUser/root), reply with a
+ * return code message so the caller can tell an error from an empty
+ * environment.
+ */
+inline static void _slurm_rpc_dump_spank(slurm_msg_t * msg)
+{
+	int rc = SLURM_SUCCESS;
+	spank_env_request_msg_t *spank_req_msg =
+		(spank_env_request_msg_t *) msg->data;
+	spank_env_responce_msg_t *spank_resp_msg = NULL;
+	struct job_record *job_ptr;
+	uint32_t i;
+	/* Locks: read job */
+	slurmctld_lock_t job_read_lock = {
+		NO_LOCK, READ_LOCK, NO_LOCK, NO_LOCK };
+	uid_t uid = g_slurm_auth_get_uid(msg->auth_cred, NULL);
+	slurm_msg_t response_msg;
+	DEF_TIMERS;
+
+	START_TIMER;
+	debug("Processing RPC: REQUEST_SPANK_ENVIRONMENT from uid=%u",
+	      (unsigned int) uid);
+
+	lock_slurmctld(job_read_lock);
+	job_ptr = find_job_record(spank_req_msg->job_id);
+	if (job_ptr == NULL) {
+		rc = ESLURM_INVALID_JOB_ID;
+	} else if ((job_ptr->user_id != uid) &&
+		   !validate_slurm_user(uid)) {
+		/* Presumably srun in a batch job runs as the job's
+		 * owner, so a SlurmUser/root-only check would reject
+		 * the callers this RPC is meant for. */
+		rc = ESLURM_USER_ID_MISSING;
+		error("Security violation, REQUEST_SPANK_ENVIRONMENT RPC "
+		      "from uid=%u", (unsigned int) uid);
+	} else {
+		/* Copy while the job read lock is held; job_ptr must
+		 * not be used after unlock_slurmctld(). */
+		spank_resp_msg = xmalloc(sizeof(spank_env_responce_msg_t));
+		spank_resp_msg->spank_job_env_size =
+			job_ptr->spank_job_env_size;
+		if (spank_resp_msg->spank_job_env_size) {
+			spank_resp_msg->spank_job_env = xmalloc(
+				spank_resp_msg->spank_job_env_size *
+				sizeof(char *));
+		}
+		for (i = 0; i < spank_resp_msg->spank_job_env_size; i++) {
+			spank_resp_msg->spank_job_env[i] =
+				xstrdup(job_ptr->spank_job_env[i]);
+		}
+	}
+	unlock_slurmctld(job_read_lock);
+	END_TIMER2("_slurm_rpc_dump_spank");
+
+	if (rc != SLURM_SUCCESS) {
+		slurm_send_rc_msg(msg, rc);
+		return;
+	}
+
+	slurm_msg_t_init(&response_msg);
+	response_msg.flags = msg->flags;
+	response_msg.protocol_version = msg->protocol_version;
+	response_msg.address = msg->address;
+	response_msg.msg_type = RESPONCE_SPANK_ENVIRONMENT;
+	response_msg.data = spank_resp_msg;
+	slurm_send_node_msg(msg->conn_fd, &response_msg);
+	slurm_free_spank_env_responce_msg(spank_resp_msg);
+}