Hi,
I am trying to figure out how to dynamically update the algorithm parameter
list. After training is finished, only the model is updated. The reason I
need this data to be updated is that I am creating a data mapping based on
the training data. Is there a way to update this data after training is
done?
Here is the code I am using. The variable that should be updated after
training is algorithmParamsList (marked with a comment in the listing); a
rough sketch of what I would like to do comes after the listing.
import io.prediction.controller.{EmptyParams, EngineParams}
import io.prediction.data.storage.EngineInstance
import io.prediction.workflow.CreateWorkflow.WorkflowConfig
import io.prediction.workflow._
import org.apache.spark.ml.linalg.SparseVector
import org.joda.time.DateTime
import org.json4s.JsonAST._
import scala.collection.mutable
object TrainApp extends App {
  val envs = Map("FOO" -> "BAR")
  val sparkEnv = Map("spark.master" -> "local")
  val sparkConf = Map("spark.executor.extraClassPath" -> ".")

  val engineFactoryName = "LogisticRegressionEngine"

  val workflowConfig = WorkflowConfig(
    engineId = EngineConfig.engineId,
    engineVersion = EngineConfig.engineVersion,
    engineVariant = EngineConfig.engineVariantId,
    engineFactory = engineFactoryName
  )

  val workflowParams = WorkflowParams(
    verbose = workflowConfig.verbosity,
    skipSanityCheck = workflowConfig.skipSanityCheck,
    stopAfterRead = workflowConfig.stopAfterRead,
    stopAfterPrepare = workflowConfig.stopAfterPrepare,
    sparkEnv = WorkflowParams().sparkEnv ++ sparkEnv
  )

  WorkflowUtils.modifyLogging(workflowConfig.verbose)

  val dataSourceParams = DataSourceParams(sys.env.get("APP_NAME").get)
  val preparatorParams = EmptyParams()
  // This is the value I would like to update after training finishes.
  val algorithmParamsList = Seq(
    "Logistic" -> LogisticParams(
      columns = Array[String](),
      dataMapping = Map[String, Map[String, SparseVector]]()
    )
  )
  val servingParams = EmptyParams()

  val engineInstance = EngineInstance(
    id = "",
    status = "INIT",
    startTime = DateTime.now,
    endTime = DateTime.now,
    engineId = workflowConfig.engineId,
    engineVersion = workflowConfig.engineVersion,
    engineVariant = workflowConfig.engineVariant,
    engineFactory = workflowConfig.engineFactory,
    batch = workflowConfig.batch,
    env = envs,
    sparkConf = sparkConf,
    dataSourceParams = JsonExtractor.paramToJson(workflowConfig.jsonExtractor,
      workflowConfig.engineParamsKey -> dataSourceParams),
    preparatorParams = JsonExtractor.paramToJson(workflowConfig.jsonExtractor,
      workflowConfig.engineParamsKey -> preparatorParams),
    algorithmsParams = JsonExtractor.paramsToJson(workflowConfig.jsonExtractor,
      algorithmParamsList),
    servingParams = JsonExtractor.paramToJson(workflowConfig.jsonExtractor,
      workflowConfig.engineParamsKey -> servingParams)
  )

  val (engineLanguage, engineFactory) = WorkflowUtils.getEngine(
    engineInstance.engineFactory, getClass.getClassLoader)

  val engine = engineFactory()

  val engineParams = EngineParams(
    dataSourceParams = dataSourceParams,
    preparatorParams = preparatorParams,
    algorithmParamsList = algorithmParamsList,
    servingParams = servingParams
  )

  val engineInstanceId = CreateServer.engineInstances.insert(engineInstance)

  CoreWorkflow.runTrain(
    env = envs,
    params = workflowParams,
    engine = engine,
    engineParams = engineParams,
    engineInstance = engineInstance.copy(id = engineInstanceId)
  )

  CreateServer.actorSystem.shutdown()
}
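To make the question concrete, here is roughly what I would like to be able
to do at the end of TrainApp, after CoreWorkflow.runTrain returns. This is
only a sketch, not working code: trainedMapping is a hypothetical value
standing in for the mapping built during training (I don't know how to get
it back out of runTrain), and I am assuming CreateServer.engineInstances
exposes get and update methods alongside insert.

  // Sketch only: trainedMapping is hypothetical; it stands for the mapping
  // the algorithm builds from the training data.
  val trainedMapping: Map[String, Map[String, SparseVector]] = ???

  // Rebuild the algorithm params with the mapping filled in.
  val updatedAlgorithmParamsList = Seq(
    "Logistic" -> LogisticParams(
      columns = Array[String](),
      dataMapping = trainedMapping
    )
  )

  // Persist the updated params back to the stored engine instance,
  // assuming EngineInstances has get/update in addition to insert.
  CreateServer.engineInstances.get(engineInstanceId).foreach { stored =>
    CreateServer.engineInstances.update(stored.copy(
      algorithmsParams = JsonExtractor.paramsToJson(
        workflowConfig.jsonExtractor, updatedAlgorithmParamsList)))
  }

Is something along these lines possible, or is there a better place to keep
data that is derived from the training data?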
Thank you,
Tihomir