Hi,

I am trying to figure out how to dynamically update the algorithm parameter
list. After training finishes, only the model is updated. The reason I need
this data to be updated is that I am building a data mapping from the
training data. Is there a way to update this data after training is done?
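
Concretely, the mapping I build from the training data has the shape
Map[String, Map[String, SparseVector]]. A minimal sketch of what it ends up
looking like (the column and value names below are made up, purely to
illustrate the shape):

import org.apache.spark.ml.linalg.SparseVector

// Illustrative only: one categorical column mapped to per-value sparse encodings.
val exampleMapping: Map[String, Map[String, SparseVector]] = Map(
  "country" -> Map(
    "US" -> new SparseVector(3, Array(0), Array(1.0)),
    "DE" -> new SparseVector(3, Array(1), Array(1.0))
  )
)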

Here is the code that I am using. The variable that should be updated after
training is algorithmParamsList (in particular its dataMapping); it is marked
with a comment below.

import io.prediction.controller.{EmptyParams, EngineParams}
import io.prediction.data.storage.EngineInstance
import io.prediction.workflow.CreateWorkflow.WorkflowConfig
import io.prediction.workflow._
import org.apache.spark.ml.linalg.SparseVector
import org.joda.time.DateTime
import org.json4s.JsonAST._

import scala.collection.mutable

object TrainApp extends App {

  val envs = Map("FOO" -> "BAR")

  val sparkEnv = Map("spark.master" -> "local")

  val sparkConf = Map("spark.executor.extraClassPath" -> ".")

  val engineFactoryName = "LogisticRegressionEngine"

  val workflowConfig = WorkflowConfig(
    engineId = EngineConfig.engineId,
    engineVersion = EngineConfig.engineVersion,
    engineVariant = EngineConfig.engineVariantId,
    engineFactory = engineFactoryName
  )

  val workflowParams = WorkflowParams(
    verbose = workflowConfig.verbosity,
    skipSanityCheck = workflowConfig.skipSanityCheck,
    stopAfterRead = workflowConfig.stopAfterRead,
    stopAfterPrepare = workflowConfig.stopAfterPrepare,
    sparkEnv = WorkflowParams().sparkEnv ++ sparkEnv
  )

  WorkflowUtils.modifyLogging(workflowConfig.verbose)

  val dataSourceParams = DataSourceParams(sys.env("APP_NAME"))
  val preparatorParams = EmptyParams()

  // The variable that should be updated after training: dataMapping is
  // built from the training data, but here it can only be passed in empty.
  val algorithmParamsList = Seq(
    "Logistic" -> LogisticParams(
      columns = Array[String](),
      dataMapping = Map[String, Map[String, SparseVector]]()
    )
  )
  val servingParams = EmptyParams()

  val engineInstance = EngineInstance(
    id = "",
    status = "INIT",
    startTime = DateTime.now,
    endTime = DateTime.now,
    engineId = workflowConfig.engineId,
    engineVersion = workflowConfig.engineVersion,
    engineVariant = workflowConfig.engineVariant,
    engineFactory = workflowConfig.engineFactory,
    batch = workflowConfig.batch,
    env = envs,
    sparkConf = sparkConf,
    dataSourceParams = JsonExtractor.paramToJson(workflowConfig.jsonExtractor,
      workflowConfig.engineParamsKey -> dataSourceParams),
    preparatorParams = JsonExtractor.paramToJson(workflowConfig.jsonExtractor,
      workflowConfig.engineParamsKey -> preparatorParams),
    algorithmsParams = JsonExtractor.paramsToJson(workflowConfig.jsonExtractor,
      algorithmParamsList),
    servingParams = JsonExtractor.paramToJson(workflowConfig.jsonExtractor,
      workflowConfig.engineParamsKey -> servingParams)
  )

  val (engineLanguage, engineFactory) =
    WorkflowUtils.getEngine(engineInstance.engineFactory, getClass.getClassLoader)

  val engine = engineFactory()

  val engineParams = EngineParams(
    dataSourceParams = dataSourceParams,
    preparatorParams = preparatorParams,
    algorithmParamsList = algorithmParamsList,
    servingParams = servingParams
  )

  val engineInstanceId = CreateServer.engineInstances.insert(engineInstance)

  CoreWorkflow.runTrain(
    env = envs,
    params = workflowParams,
    engine = engine,
    engineParams = engineParams,
    engineInstance = engineInstance.copy(id = engineInstanceId)
  )

  CreateServer.actorSystem.shutdown()
}
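
To make the intent concrete, this is roughly what I would like to be able to
do right after CoreWorkflow.runTrain, before shutting the actor system down.
This is only a sketch of what I am after: buildDataMapping is a placeholder
for whatever produces the mapping from the training data, and I am assuming
CreateServer.engineInstances has an update method that overwrites the stored
instance, which is exactly the part I am not sure about:

  // Hypothetical post-training step; this is what I am asking how to do.
  val learnedMapping: Map[String, Map[String, SparseVector]] =
    buildDataMapping() // placeholder, built from the training data

  val updatedAlgorithmParamsList = Seq(
    "Logistic" -> LogisticParams(
      columns = Array[String](),
      dataMapping = learnedMapping
    )
  )

  // Re-persist the params on the stored engine instance so the server picks them up
  // (assumes CreateServer.engineInstances.update exists and overwrites the row).
  CreateServer.engineInstances.update(
    engineInstance.copy(
      id = engineInstanceId,
      algorithmsParams = JsonExtractor.paramsToJson(
        workflowConfig.jsonExtractor, updatedAlgorithmParamsList)
    )
  )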


Thank you,
Tihomir
