Hello,
I am learning Flink and using the docker image along with the AMIDST
library for this.
Below is a sample task from AMIDST which provides INFO output up until I
reach updateModel(). I pasted the short method as well and wonder what
prevents the Logger from
//Set-up Flink session
env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
Logger LOG = LoggerFactory.getLogger(">>>>> ParallelMLExample");
//generate a random dataset
DataFlink<DataInstance> dataFlink = new
DataSetGenerator().generate(env, 1234, 1000, 5, 0);
//Creates a DAG with the NaiveBayes structure for the random
dataset
DAG dag =
DAGGenerator.getNaiveBayesStructure(dataFlink.getAttributes(),
"DiscreteVar4");
LOG.info(dag.toString());
//Create the Learner object
ParameterLearningAlgorithm learningAlgorithmFlink = new
ParallelMaximumLikelihood();
//Learning parameters
learningAlgorithmFlink.setBatchSize(10);
learningAlgorithmFlink.setDAG(dag);
//Initialize the learning process
learningAlgorithmFlink.initLearning();
//Learn from the flink data
LOG.info("BEFORE UPDATEMODEL");
learningAlgorithmFlink.updateModel(dataFlink);
LOG.info("AFTER UPDATEMODEL");
//Print the learnt Bayes Net
BayesianNetwork bn =
learningAlgorithmFlink.getLearntBayesianNetwork();
LOG.info(bn.toString());
Below is the updateModel method.
public double updateModel(DataFlink<DataInstance> dataUpdate) {
try {
Configuration config = new Configuration();
config.setString(BN_NAME, this.dag.getName());
config.setBytes(EFBN_NAME,
Serialization.serializeObject(efBayesianNetwork));
DataSet<DataInstance> dataset = dataUpdate.getDataSet();
this.sumSS = dataset.map(new SufficientSatisticsMAP())
.withParameters(config)
.reduce(new SufficientSatisticsReduce())
.collect().get(0);
//Add the prior
sumSS.sum(efBayesianNetwork.createInitSufficientStatistics());
JobExecutionResult result =
dataset.getExecutionEnvironment().getLastJobExecutionResult();
numInstances =
result.getAccumulatorResult(ParallelMaximumLikelihood.COUNTER_NAME+"_"+this.dag.getName());
numInstances++;//Initial counts
}catch(Exception ex){
throw new UndeclaredThrowableException(ex);
}
return this.getLogMarginalProbability();
}
Not sure why LOG.info past that method are not output to the console.
TIA
JP