This is an automated email from the ASF dual-hosted git repository. qkou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-mxnet.git
The following commit(s) were added to refs/heads/master by this push: new 5976973 [R] RNN bucketing with multiple devices. (#7315) 5976973 is described below commit 59769736a402b834b5d53bcdaae895ac4a069e12 Author: Qiang Kou (KK) <q...@qkou.info> AuthorDate: Wed Aug 2 21:51:34 2017 +0000 [R] RNN bucketing with multiple devices. (#7315) --- example/rnn/bucket_R/aclImdb_lstm_classification.R | 55 +++----- example/rnn/bucket_R/mx.io.bucket.iter.R | 6 +- example/rnn/bucket_R/rnn.R | 44 ++++--- example/rnn/bucket_R/rnn.train.R | 140 +++++++++++++++++---- 4 files changed, 157 insertions(+), 88 deletions(-) diff --git a/example/rnn/bucket_R/aclImdb_lstm_classification.R b/example/rnn/bucket_R/aclImdb_lstm_classification.R index aaa6d38..bb5eaac 100644 --- a/example/rnn/bucket_R/aclImdb_lstm_classification.R +++ b/example/rnn/bucket_R/aclImdb_lstm_classification.R @@ -11,51 +11,30 @@ vocab <- length(corpus_bucketed_test$dic) ### Create iterators batch.size <- 64 -train.data <- mx.io.bucket.iter(buckets = corpus_bucketed_train$buckets, - batch.size = batch.size, - data.mask.element = 0, - shuffle = TRUE) +num.round <- 16 -eval.data <- mx.io.bucket.iter(buckets = corpus_bucketed_test$buckets, - batch.size = batch.size, - data.mask.element = 0, - shuffle = FALSE) +train.data <- mx.io.bucket.iter(buckets = corpus_bucketed_train$buckets, batch.size = batch.size, + data.mask.element = 0, shuffle = TRUE) + +eval.data <- mx.io.bucket.iter(buckets = corpus_bucketed_test$buckets, batch.size = batch.size, + data.mask.element = 0, shuffle = FALSE) mx.set.seed(0) +optimizer <- mx.opt.create("adadelta", rho = 0.92, epsilon = 1e-06, wd = 2e-04, clip_gradient = NULL, + rescale.grad = 1/batch.size) + +model_sentiment_lstm <- mx.rnn.buckets(train.data = train.data, begin.round = 1, + num.round = num.round, ctx = mx.cpu(), metric = mx.metric.accuracy, optimizer = optimizer, + num.rnn.layer = 2, num.embed = 16, num.hidden = 24, num.label = 2, input.size = vocab, + initializer = 
mx.init.Xavier(rnd_type = "gaussian", factor_type = "in", magnitude = 2), + dropout = 0.25, config = "seq-to-one", batch.end.callback = mx.callback.log.train.metric(period = 50), + verbose = TRUE) -end.round <- 16 - -optimizer <- mx.opt.create("adadelta", - rho = 0.92, - epsilon = 1e-06, - wd = 2e-04, - clip_gradient = NULL, - rescale.grad = 1/batch.size) - -model_sentiment_lstm <- mx.rnn.buckets(train.data = train.data, - begin.round = 1, - end.round = end.round, - ctx = mx.cpu(), - metric = mx.metric.accuracy, - optimizer = optimizer, - num.rnn.layer = 2, - num.embed = 16, - num.hidden = 24, - num.label = 2, - input.size = vocab, - initializer = mx.init.Xavier(rnd_type = "gaussian", - factor_type = "in", - magnitude = 2), - dropout = 0.25, - config = "seq-to-one", - batch.end.callback = mx.callback.log.train.metric(period = 50), - verbose = TRUE) - -mx.model.save(model_sentiment_lstm, prefix = "model_sentiment_lstm", iteration = end.round) +mx.model.save(model_sentiment_lstm, prefix = "model_sentiment_lstm", iteration = num.round) source("rnn.infer.R") -model <- mx.model.load("model_sentiment_lstm", iteration = end.round) +model <- mx.model.load("model_sentiment_lstm", iteration = num.round) pred <- mx.rnn.infer.buckets(infer_iter = eval.data, model, "seq-to-one", ctx = mx.cpu()) diff --git a/example/rnn/bucket_R/mx.io.bucket.iter.R b/example/rnn/bucket_R/mx.io.bucket.iter.R index 887247a..61f8795 100644 --- a/example/rnn/bucket_R/mx.io.bucket.iter.R +++ b/example/rnn/bucket_R/mx.io.bucket.iter.R @@ -64,16 +64,14 @@ BucketIter <- setRefClass("BucketIter", fields = c("buckets", "bucket.names", "b # to appropriate sequence length) idx <- (.self$bucketID - 1) * (.self$batch.size) + (1:batch.size) data <- .self$buckets[[names(.self$bucketID)]]$data[, idx, drop = F] - data_mask <- as.integer(names(.self$bucketID)) - apply(data == .self$data.mask.element, - 2, sum) data_mask_array <- (!data == 0) if (length(dim(.self$buckets[[names(.self$bucketID)]]$label)) == 0) { 
label <- .self$buckets[[names(.self$bucketID)]]$label[idx] } else { label <- .self$buckets[[names(.self$bucketID)]]$label[, idx, drop = F] } - return(list(data = mx.nd.array(data), label = mx.nd.array(label), data.mask = mx.nd.array(data_mask), - data.mask.array = mx.nd.array(data_mask_array))) + return(list(data = mx.nd.array(data), data.mask.array = mx.nd.array(data_mask_array), + label = mx.nd.array(label))) }, finalize = function() { })) diff --git a/example/rnn/bucket_R/rnn.R b/example/rnn/bucket_R/rnn.R index f55272f..ea02b95 100644 --- a/example/rnn/bucket_R/rnn.R +++ b/example/rnn/bucket_R/rnn.R @@ -33,7 +33,6 @@ rnn.unroll <- function(num.rnn.layer, seq.len, input.size, num.embed, num.hidden # embeding layer label <- mx.symbol.Variable("label") data <- mx.symbol.Variable("data") - data_mask <- mx.symbol.Variable("data.mask") data_mask_array <- mx.symbol.Variable("data.mask.array") data_mask_array <- mx.symbol.stop_gradient(data_mask_array, name = "data.mask.array") @@ -112,8 +111,8 @@ rnn.unroll <- function(num.rnn.layer, seq.len, input.size, num.embed, num.hidden mx.rnn.buckets <- function(train.data, eval.data = NULL, num.rnn.layer, num.hidden, num.embed, num.label, input.size, ctx = NULL, num.round = 1, initializer = mx.init.uniform(0.01), dropout = 0, config = "one-to-one", optimizer = "sgd", batch.end.callback = NULL, - epoch.end.callback = NULL, begin.round = 1, end.round = 1, metric = mx.metric.rmse, - cell.type = "lstm", verbose = FALSE) { + epoch.end.callback = NULL, begin.round = 1, metric = mx.metric.rmse, cell.type = "lstm", + kvstore = "local", verbose = FALSE) { if (!train.data$iter.next()) { train.data$reset() @@ -131,8 +130,11 @@ mx.rnn.buckets <- function(train.data, eval.data = NULL, num.rnn.layer, num.hidd if (is.null(ctx)) ctx <- mx.ctx.default() - if (!is.mx.context(ctx)) - stop("ctx must be mx.context") + if (is.mx.context(ctx)) { + ctx <- list(ctx) + } + if (!is.list(ctx)) + stop("ctx must be mx.context or list of mx.context") if 
(is.character(optimizer)) { if (is.numeric(input.shape)) { ndim <- length(input.shape) @@ -155,17 +157,28 @@ mx.rnn.buckets <- function(train.data, eval.data = NULL, num.rnn.layer, num.hidd symbol <- sym_list[[names(train.data$bucketID)]] arg.names <- symbol$arguments - input.shape <- lapply(train.data$value(), dim) - input.shape <- input.shape[names(input.shape) %in% arg.names] + input.names <- c("data", "data.mask.array") + input.shape <- sapply(input.names, function(n) { + dim(train.data$value()[[n]]) + }, simplify = FALSE) + output.names <- "label" + output.shape <- sapply(output.names, function(n) { + dim(train.data$value()[[n]]) + }, simplify = FALSE) + + params <- mx.model.init.params(symbol, input.shape, output.shape, initializer, + mx.cpu()) - params <- mx.model.init.params(symbol, input.shape, NULL, initializer, mx.cpu()) + kvstore <- mxnet:::mx.model.create.kvstore(kvstore, params$arg.params, length(ctx), + verbose = verbose) ### Execute training - rnn.model.R model <- mx.model.train.rnn.buckets(sym_list = sym_list, input.shape = input.shape, - arg.params = params$arg.params, aux.params = params$aux.params, optimizer = optimizer, - train.data = train.data, eval.data = eval.data, verbose = verbose, begin.round = begin.round, - end.round = end.round, metric = metric, ctx = ctx, batch.end.callback = batch.end.callback, - epoch.end.callback = epoch.end.callback) + output.shape = output.shape, arg.params = params$arg.params, aux.params = params$aux.params, + optimizer = optimizer, train.data = train.data, eval.data = eval.data, verbose = verbose, + begin.round = begin.round, end.round = num.round, metric = metric, ctx = ctx, + batch.end.callback = batch.end.callback, epoch.end.callback = epoch.end.callback, + kvstore = kvstore) return(model) } @@ -193,10 +206,3 @@ mx.model.check.arguments <- function(symbol) { } return(c(data, label)) } - -# filter out null, keep the names -mx.util.filter.null <- function(lst) { - lst[!sapply(lst, is.null)] -} - - diff --git 
a/example/rnn/bucket_R/rnn.train.R b/example/rnn/bucket_R/rnn.train.R index 962430c..b833b2b 100644 --- a/example/rnn/bucket_R/rnn.train.R +++ b/example/rnn/bucket_R/rnn.train.R @@ -4,30 +4,57 @@ source("rnn.R") # Internal function to do multiple device training on RNN mx.model.train.rnn.buckets <- function(ctx, sym_list, arg.params, aux.params, input.shape, - begin.round, end.round, optimizer, train.data, eval.data, metric, epoch.end.callback, - batch.end.callback, verbose = TRUE) { + output.shape, begin.round, end.round, optimizer, train.data, eval.data, metric, + epoch.end.callback, batch.end.callback, kvstore, verbose = TRUE) { symbol <- sym_list[[names(train.data$bucketID)]] input.names <- names(input.shape) + output.names <- names(output.shape) arg.names <- names(arg.params) + ndevice <- length(ctx) + if (verbose) + message(paste0("Start training with ", ndevice, " devices")) + input_slice <- mxnet:::mx.model.slice.shape(input.shape, ndevice) + output_slice <- mxnet:::mx.model.slice.shape(output.shape, ndevice) + + # Grad request grad_req <- rep("write", length(symbol$arguments)) + # grad_null_idx <- match(c(input.names, output.names), symbol$arguments) grad_null_idx <- match(input.names, symbol$arguments) grad_req[grad_null_idx] <- "null" # Arg array order - update_names <- c(input.names, arg.names) + update_names <- c(input.names, output.names, arg.names) arg_update_idx <- match(symbol$arguments, update_names) - s <- sapply(input.shape, function(shape) { - mx.nd.zeros(shape = shape, ctx = mx.cpu()) + train.execs <- lapply(1:ndevice, function(i) { + s <- sapply(append(input_slice[[i]]$shape, output_slice[[i]]$shape), function(shape) { + mx.nd.zeros(shape = shape, ctx = mx.cpu()) + }) + mxnet:::mx.symbol.bind(symbol = symbol, arg.arrays = c(s, arg.params)[arg_update_idx], + aux.arrays = aux.params, ctx = mx.cpu(), grad.req = grad_req) }) - train.exec <- mxnet:::mx.symbol.bind(symbol = symbol, arg.arrays = c(s, arg.params)[arg_update_idx], - aux.arrays = 
aux.params, ctx = ctx, grad.req = grad_req) + # KVStore related stuffs + params.index <- as.integer(mxnet:::mx.util.filter.null(lapply(1:length(train.execs[[1]]$ref.grad.arrays), + function(k) { + if (!is.null(train.execs[[1]]$ref.grad.arrays[[k]])) k else NULL + }))) + update.on.kvstore <- FALSE + if (!is.null(kvstore) && kvstore$update.on.kvstore) { + update.on.kvstore <- TRUE + kvstore$set.optimizer(optimizer) + } else { + updaters <- lapply(1:ndevice, function(i) { + mx.opt.get.updater(optimizer, train.execs[[i]]$ref.arg.arrays) + }) + } - updaters <- mx.opt.get.updater(optimizer, train.exec$ref.arg.arrays) + if (!is.null(kvstore)) { + kvstore$init(params.index, train.execs[[1]]$ref.arg.arrays[params.index]) + } for (iteration in begin.round:end.round) { nbatch <- 0 @@ -36,26 +63,67 @@ mx.model.train.rnn.buckets <- function(ctx, sym_list, arg.params, aux.params, in } train.data$reset() while (train.data$iter.next()) { - dlist <- train.data$value()[input.names] + dlist <- train.data$value() #[input.names] symbol <- sym_list[[names(train.data$bucketID)]] + slices <- lapply(1:ndevice, function(i) { + s <- input_slice[[i]] + ret <- sapply(names(dlist), function(n) { + mxnet:::mx.nd.slice(dlist[[n]], s$begin, s$end) + }) + return(ret) + }) - train.exec <- mxnet:::mx.symbol.bind(symbol = symbol, arg.arrays = c(dlist, - train.exec$arg.arrays[arg.names])[arg_update_idx], aux.arrays = train.exec$aux.arrays, - ctx = ctx, grad.req = grad_req) + train.execs <- lapply(1:ndevice, function(i) { + s <- slices[[i]] + mxnet:::mx.symbol.bind(symbol = symbol, arg.arrays = c(s, train.execs[[i]]$arg.arrays[arg.names])[arg_update_idx], + aux.arrays = train.execs[[i]]$aux.arrays, ctx = ctx[[i]], grad.req = grad_req) + }) - mx.exec.forward(train.exec, is.train = TRUE) + for (texec in train.execs) { + mx.exec.forward(texec, is.train = TRUE) + } - # copy outputs to CPU - out.preds <- mx.nd.copyto(train.exec$ref.outputs[[1]], mx.cpu()) + out.preds <- lapply(train.execs, function(texec) { 
+ mx.nd.copyto(texec$ref.outputs[[1]], mx.cpu()) + }) - mx.exec.backward(train.exec) + for (texec in train.execs) { + mx.exec.backward(texec) + } - arg.blocks <- updaters(train.exec$ref.arg.arrays, train.exec$ref.grad.arrays) - mx.exec.update.arg.arrays(train.exec, arg.blocks, skip.null = TRUE) + if (!is.null(kvstore)) { + # push the gradient + kvstore$push(params.index, lapply(train.execs, function(texec) { + texec$ref.grad.arrays[params.index] + }), -params.index) + } + if (update.on.kvstore) { + # pull back weight + kvstore$pull(params.index, lapply(train.execs, function(texec) { + texec$ref.arg.arrays[params.index] + }), -params.index) + } else { + # pull back gradient sums + if (!is.null(kvstore)) { + kvstore$pull(params.index, lapply(train.execs, function(texec) { + texec$ref.grad.arrays[params.index] + }), -params.index) + } + arg.blocks <- lapply(1:ndevice, function(i) { + updaters[[i]](train.execs[[i]]$ref.arg.arrays, train.execs[[i]]$ref.grad.arrays) + }) + for (i in 1:ndevice) { + mx.exec.update.arg.arrays(train.execs[[i]], arg.blocks[[i]], skip.null = TRUE) + } + } # Update the evaluation metrics if (!is.null(metric)) { - train.metric <- metric$update(dlist$label, out.preds, train.metric) + # train.metric <- metric$update(dlist$label, out.preds, train.metric) + for (i in 1:ndevice) { + train.metric <- metric$update(slices[[i]][[length(slices[[i]])]], + out.preds[[i]], train.metric) + } } nbatch <- nbatch + 1 @@ -78,19 +146,37 @@ mx.model.train.rnn.buckets <- function(ctx, sym_list, arg.params, aux.params, in eval.data$reset() while (eval.data$iter.next()) { # Get input data slice - dlist <- eval.data$value()[input.names] + dlist <- eval.data$value() #[input.names] symbol <- sym_list[[names(eval.data$bucketID)]] - train.exec <- mxnet:::mx.symbol.bind(symbol = symbol, arg.arrays = c(dlist, - train.exec$arg.arrays[arg.names])[arg_update_idx], aux.arrays = train.exec$aux.arrays, - ctx = ctx, grad.req = grad_req) + slices <- lapply(1:ndevice, function(i) { + 
s <- input_slice[[i]] + ret <- sapply(names(dlist), function(n) { + mxnet:::mx.nd.slice(dlist[[n]], s$begin, s$end) + }) + return(ret) + }) - mx.exec.forward(train.exec, is.train = FALSE) + + train.execs <- lapply(1:ndevice, function(i) { + s <- slices[[i]] + mxnet:::mx.symbol.bind(symbol = symbol, arg.arrays = c(s, train.execs[[i]]$arg.arrays[arg.names])[arg_update_idx], + aux.arrays = train.execs[[i]]$aux.arrays, ctx = ctx[[i]], grad.req = grad_req) + }) + + for (texec in train.execs) { + mx.exec.forward(texec, is.train = FALSE) + } # copy outputs to CPU - out.preds <- mx.nd.copyto(train.exec$ref.outputs[[1]], mx.cpu()) + out.preds <- lapply(train.execs, function(texec) { + mx.nd.copyto(texec$ref.outputs[[1]], mx.cpu()) + }) if (!is.null(metric)) { - eval.metric <- metric$update(dlist$label, out.preds, eval.metric) + for (i in 1:ndevice) { + eval.metric <- metric$update(slices[[i]][[length(slices[[i]])]], + out.preds[[i]], eval.metric) + } } } @@ -105,7 +191,7 @@ mx.model.train.rnn.buckets <- function(ctx, sym_list, arg.params, aux.params, in eval.metric <- NULL } # get the model out - model <- mxnet:::mx.model.extract.model(symbol, list(train.exec)) + model <- mxnet:::mx.model.extract.model(symbol, train.execs) epoch_continue <- TRUE if (!is.null(epoch.end.callback)) { -- To stop receiving notification emails like this one, please contact comm...@mxnet.apache.org.