[GitHub] [systemds] sebwrede commented on a change in pull request #1141: [SYSTEMDS-2550] Batch scaling and weighing of imbalanced workers

2021-01-05 Thread GitBox


sebwrede commented on a change in pull request #1141:
URL: https://github.com/apache/systemds/pull/1141#discussion_r551816533



##
File path: 
src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/FederatedPSControlThread.java
##
@@ -492,8 +523,8 @@ public FederatedResponse execute(ExecutionContext ec, 
Data... data) {
ParamservUtils.cleanupData(ec, 
Statement.PS_FEATURES);
ParamservUtils.cleanupData(ec, 
Statement.PS_LABELS);

ec.removeVariable(ec.getVariable(Statement.PS_FED_BATCHCOUNTER_VARID).toString());
-   if( LOG.isInfoEnabled() )
-   LOG.info("[+]" + " completed batch " + 
localBatchNum);
+   /*if( LOG.isInfoEnabled() )
+   LOG.info("[+]" + " completed batch " + 
localBatchNum);*/

Review comment:
   If this is no longer needed, it should be deleted. 

##
File path: 
src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/FederatedPSControlThread.java
##
@@ -58,48 +61,68 @@
 public class FederatedPSControlThread extends PSWorker implements 
Callable {
private static final long serialVersionUID = 6846648059569648791L;
protected static final Log LOG = 
LogFactory.getLog(ParamServer.class.getName());
-   
-   Statement.PSRuntimeBalancing _runtimeBalancing;
+
FederatedData _featuresData;
FederatedData _labelsData;
final long _localStartBatchNumVarID;
final long _modelVarID;
-   int _numBatchesPerGlobalEpoch;
+
+   // runtime balancing
+   Statement.PSRuntimeBalancing _runtimeBalancing;
+   int _numBatchesPerEpoch;
int _possibleBatchesPerLocalEpoch;
+   boolean _weighing;
+   double _weighingFactor = 1;
boolean _cycleStartAt0 = false;
 
-   public FederatedPSControlThread(int workerID, String updFunc, 
Statement.PSFrequency freq, Statement.PSRuntimeBalancing runtimeBalancing, int 
epochs, long batchSize, int numBatchesPerGlobalEpoch, ExecutionContext ec, 
ParamServer ps) {
+   public FederatedPSControlThread(int workerID, String updFunc, 
Statement.PSFrequency freq,
+   
Statement.PSRuntimeBalancing runtimeBalancing, boolean weighing, int epochs, 
long batchSize,
+   int 
numBatchesPerGlobalEpoch, ExecutionContext ec, ParamServer ps) {
super(workerID, updFunc, freq, epochs, batchSize, ec, ps);
 
-   _numBatchesPerGlobalEpoch = numBatchesPerGlobalEpoch;
+   _numBatchesPerEpoch = numBatchesPerGlobalEpoch;
_runtimeBalancing = runtimeBalancing;
+   _weighing = weighing;
// generate the IDs for model and batch counter. These get 
overwritten on the federated worker each time
_localStartBatchNumVarID = FederationUtils.getNextFedDataID();
_modelVarID = FederationUtils.getNextFedDataID();
}
 
/**
 * Sets up the federated worker and control thread
+*
+* @param weighingFactor Gradients from this worker will be multiplied 
by this factor if weighing is enabled
 */
-   public void setup() {
+   public void setup(double weighingFactor) {
// prepare features and labels
_featuresData = (FederatedData) 
_features.getFedMapping().getMap().values().toArray()[0];
_labelsData = (FederatedData) 
_labels.getFedMapping().getMap().values().toArray()[0];
 
-   // calculate number of batches and get data size
+   // weighing factor is always set, but only used when weighing 
is specified
+   _weighingFactor = weighingFactor;
+
+   // different runtime balancing calculations
long dataSize = _features.getNumRows();
-   _possibleBatchesPerLocalEpoch = (int) Math.ceil((double) 
dataSize / _batchSize);
-   if(!(_runtimeBalancing == Statement.PSRuntimeBalancing.RUN_MIN 
-   || _runtimeBalancing == 
Statement.PSRuntimeBalancing.CYCLE_AVG 
-   || _runtimeBalancing == 
Statement.PSRuntimeBalancing.CYCLE_MAX)) {
-   _numBatchesPerGlobalEpoch = 
_possibleBatchesPerLocalEpoch;
+
+   // calculate scaled batch size if balancing via batch size.
+   // In some cases there will be some cycling
+   if(_runtimeBalancing == 
Statement.PSRuntimeBalancing.SCALE_BATCH) {
+   _batchSize = (int) Math.ceil((float) dataSize / 
_numBatchesPerEpoch);
}
 
-   if(_runtimeBalancing == 
Statement.PSRuntimeBalancing.SCALE_BATCH 
-   || _runtimeBalancing == 

[GitHub] [systemds] tobiasrieger commented on a change in pull request #1141: [SYSTEMDS-2550] Batch scaling and weighing of imbalanced workers

2021-01-05 Thread GitBox


tobiasrieger commented on a change in pull request #1141:
URL: https://github.com/apache/systemds/pull/1141#discussion_r551866215



##
File path: 
src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/FederatedPSControlThread.java
##
@@ -58,48 +61,68 @@
 public class FederatedPSControlThread extends PSWorker implements 
Callable {
private static final long serialVersionUID = 6846648059569648791L;
protected static final Log LOG = 
LogFactory.getLog(ParamServer.class.getName());
-   
-   Statement.PSRuntimeBalancing _runtimeBalancing;
+
FederatedData _featuresData;
FederatedData _labelsData;
final long _localStartBatchNumVarID;
final long _modelVarID;
-   int _numBatchesPerGlobalEpoch;
+
+   // runtime balancing
+   Statement.PSRuntimeBalancing _runtimeBalancing;
+   int _numBatchesPerEpoch;
int _possibleBatchesPerLocalEpoch;
+   boolean _weighing;
+   double _weighingFactor = 1;
boolean _cycleStartAt0 = false;
 
-   public FederatedPSControlThread(int workerID, String updFunc, 
Statement.PSFrequency freq, Statement.PSRuntimeBalancing runtimeBalancing, int 
epochs, long batchSize, int numBatchesPerGlobalEpoch, ExecutionContext ec, 
ParamServer ps) {
+   public FederatedPSControlThread(int workerID, String updFunc, 
Statement.PSFrequency freq,
+   
Statement.PSRuntimeBalancing runtimeBalancing, boolean weighing, int epochs, 
long batchSize,
+   int 
numBatchesPerGlobalEpoch, ExecutionContext ec, ParamServer ps) {
super(workerID, updFunc, freq, epochs, batchSize, ec, ps);
 
-   _numBatchesPerGlobalEpoch = numBatchesPerGlobalEpoch;
+   _numBatchesPerEpoch = numBatchesPerGlobalEpoch;
_runtimeBalancing = runtimeBalancing;
+   _weighing = weighing;
// generate the IDs for model and batch counter. These get 
overwritten on the federated worker each time
_localStartBatchNumVarID = FederationUtils.getNextFedDataID();
_modelVarID = FederationUtils.getNextFedDataID();
}
 
/**
 * Sets up the federated worker and control thread
+*
+* @param weighingFactor Gradients from this worker will be multiplied 
by this factor if weighing is enabled
 */
-   public void setup() {
+   public void setup(double weighingFactor) {
// prepare features and labels
_featuresData = (FederatedData) 
_features.getFedMapping().getMap().values().toArray()[0];
_labelsData = (FederatedData) 
_labels.getFedMapping().getMap().values().toArray()[0];
 
-   // calculate number of batches and get data size
+   // weighing factor is always set, but only used when weighing 
is specified
+   _weighingFactor = weighingFactor;
+
+   // different runtime balancing calculations
long dataSize = _features.getNumRows();
-   _possibleBatchesPerLocalEpoch = (int) Math.ceil((double) 
dataSize / _batchSize);
-   if(!(_runtimeBalancing == Statement.PSRuntimeBalancing.RUN_MIN 
-   || _runtimeBalancing == 
Statement.PSRuntimeBalancing.CYCLE_AVG 
-   || _runtimeBalancing == 
Statement.PSRuntimeBalancing.CYCLE_MAX)) {
-   _numBatchesPerGlobalEpoch = 
_possibleBatchesPerLocalEpoch;
+
+   // calculate scaled batch size if balancing via batch size.
+   // In some cases there will be some cycling
+   if(_runtimeBalancing == 
Statement.PSRuntimeBalancing.SCALE_BATCH) {
+   _batchSize = (int) Math.ceil((float) dataSize / 
_numBatchesPerEpoch);
}
 
-   if(_runtimeBalancing == 
Statement.PSRuntimeBalancing.SCALE_BATCH 
-   || _runtimeBalancing == 
Statement.PSRuntimeBalancing.SCALE_BATCH_AND_WEIGH) {
-   throw new NotImplementedException();
+   // Calculate possible batches with batch size
+   _possibleBatchesPerLocalEpoch = (int) Math.ceil((double) 
dataSize / _batchSize);
+
+   // If no runtime balancing is specified, just run possible 
number of batches
+   // WARNING: Will get stuck on miss match
+   if(_runtimeBalancing == Statement.PSRuntimeBalancing.NONE) {
+   _numBatchesPerEpoch = _possibleBatchesPerLocalEpoch;

Review comment:
   Thanks for taking a look! A mismatch occurs when you specify a batch 
size and two workers have different numbers of examples. For example, with a 
batch size of 1, Worker 1 holding 20 examples, and Worker 2 holding 30 
examples: without data partitioning or runtime balancing, Worker 2 will wait 
for Worker 1 to complete batch 21, which it never does.
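
The arithmetic behind this mismatch can be sketched as follows. This is a minimal, standalone illustration and not code from the PR: the class name `BatchMismatchSketch`, the helper names, and the target of 20 batches per epoch are assumptions made for the example. Only the two ceil formulas mirror the quoted diff.

```java
public class BatchMismatchSketch {
    // Number of batches a worker can form from its local data:
    // ceil(dataSize / batchSize), as in the quoted setup() code.
    public static int possibleBatches(long dataSize, long batchSize) {
        return (int) Math.ceil((double) dataSize / batchSize);
    }

    // Batch size that lets a worker cover its local data in roughly
    // numBatchesPerEpoch batches (the SCALE_BATCH idea from the diff).
    public static long scaledBatchSize(long dataSize, int numBatchesPerEpoch) {
        return (long) Math.ceil((double) dataSize / numBatchesPerEpoch);
    }

    public static void main(String[] args) {
        long batchSize = 1;
        int worker1 = possibleBatches(20, batchSize);
        int worker2 = possibleBatches(30, batchSize);
        // Without balancing, the workers disagree on the number of batches,
        // so the one with more batches waits for updates that never arrive.
        System.out.println("worker 1: " + worker1 + " batches, worker 2: " + worker2 + " batches");

        // Hypothetical target of 20 batches per epoch (an assumption for this sketch).
        int target = 20;
        long scaled = scaledBatchSize(30, target);
        // Rounding up can leave fewer possible batches than the target,
        // which is why the diff notes that some cycling may still occur.
        System.out.println("worker 2 scaled batch size: " + scaled
            + ", possible batches: " + possibleBatches(30, scaled));
    }
}
```

With the numbers from the comment, the two workers can form 20 and 30 batches respectively. Scaling Worker 2's batch size toward the assumed target of 20 batches yields a batch size of 2 and only 15 possible batches, which is consistent with the diff's note that some cycling can remain.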

[GitHub] [systemds] sebwrede commented on a change in pull request #1141: [SYSTEMDS-2550] Batch scaling and weighing of imbalanced workers

2021-01-05 Thread GitBox


sebwrede commented on a change in pull request #1141:
URL: https://github.com/apache/systemds/pull/1141#discussion_r551899677



##
File path: 
src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/FederatedPSControlThread.java
##

Review comment:
   OK, that sounds like a good idea. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[GitHub] [systemds] sebwrede commented on pull request #1141: [SYSTEMDS-2550] Batch scaling and weighing of imbalanced workers

2021-01-05 Thread GitBox


sebwrede commented on pull request #1141:
URL: https://github.com/apache/systemds/pull/1141#issuecomment-754607168


   LGTM!







[GitHub] [systemds] tobiasrieger commented on pull request #1141: [SYSTEMDS-2550] Batch scaling and weighing of imbalanced workers

2021-01-05 Thread GitBox


tobiasrieger commented on pull request #1141:
URL: https://github.com/apache/systemds/pull/1141#issuecomment-754582572


   @sebwrede I have addressed all other proposed changes in the new commit. 
Thanks!







[GitHub] [systemds] ywcb00 opened a new pull request #1143: [SYSTEMDS-2747] Federated Quaternary Operation WSLoss and WSigmoid

2021-01-05 Thread GitBox


ywcb00 opened a new pull request #1143:
URL: https://github.com/apache/systemds/pull/1143


   Hi,
   This is a PR for adding the federated quaternary operations WSLoss and 
WSigmoid.
   The PR contains the instructions and tests.
   
   Feel free to ask and comment :)
   Thanks for the review!







[GitHub] [systemds] dkerschbaumer opened a new pull request #1144: [SYSTEMDS-????] Disguised missing value imputation

2021-01-05 Thread GitBox


dkerschbaumer opened a new pull request #1144:
URL: https://github.com/apache/systemds/pull/1144


   This PR contains the dmv implementation and corresponding tests.


