Hello,
I am learning Flink and using the docker image along with the AMIDST library for this. Below is a sample task from AMIDST which provides INFO output up until I reach updateModel(). I pasted the short method as well and wonder what prevents the Logger from

        //Set-up Flink session
        env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        Logger LOG = LoggerFactory.getLogger(">>>>> ParallelMLExample");

        //generate a random dataset
DataFlink<DataInstance> dataFlink = new DataSetGenerator().generate(env, 1234, 1000, 5, 0);

//Creates a DAG with the NaiveBayes structure for the random dataset DAG dag = DAGGenerator.getNaiveBayesStructure(dataFlink.getAttributes(), "DiscreteVar4");
        LOG.info(dag.toString());

        //Create the Learner object
ParameterLearningAlgorithm learningAlgorithmFlink = new ParallelMaximumLikelihood();

        //Learning parameters
        learningAlgorithmFlink.setBatchSize(10);
        learningAlgorithmFlink.setDAG(dag);

        //Initialize the learning process
        learningAlgorithmFlink.initLearning();

        //Learn from the flink data
        LOG.info("BEFORE UPDATEMODEL");
        learningAlgorithmFlink.updateModel(dataFlink);
        LOG.info("AFTER UPDATEMODEL");

        //Print the learnt Bayes Net
BayesianNetwork bn = learningAlgorithmFlink.getLearntBayesianNetwork();
        LOG.info(bn.toString());


Below is the updateModel method.

    public double updateModel(DataFlink<DataInstance> dataUpdate) {
        try {
            Configuration config = new Configuration();
            config.setString(BN_NAME, this.dag.getName());
config.setBytes(EFBN_NAME, Serialization.serializeObject(efBayesianNetwork));

            DataSet<DataInstance> dataset = dataUpdate.getDataSet();
            this.sumSS = dataset.map(new SufficientSatisticsMAP())
                    .withParameters(config)
                    .reduce(new SufficientSatisticsReduce())
                    .collect().get(0);

            //Add the prior
sumSS.sum(efBayesianNetwork.createInitSufficientStatistics());

JobExecutionResult result = dataset.getExecutionEnvironment().getLastJobExecutionResult();

numInstances = result.getAccumulatorResult(ParallelMaximumLikelihood.COUNTER_NAME+"_"+this.dag.getName());
            numInstances++;//Initial counts

        }catch(Exception ex){
            throw new UndeclaredThrowableException(ex);
        }

        return this.getLogMarginalProbability();
    }


Not sure why LOG.info past that method are not output to the console.
TIA
JP

Reply via email to