Hi! I am trying to query Event Server with PEventStore api in predict method to fetch events per entity to create my features. PEventStore needs sc, and for this, I have:
- Extended PAlgorithm - Extended LocalFileSystemPersistentModel and LocalFileSystemPersistentModelLoader - Put a dummy emptyRDD into my model - Tried to access sc with model.dummyRDD.context to receive this error: org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063. Just like this user got it here <https://groups.google.com/forum/#!topic/predictionio-user/h4kIltGIIYE> in predictionio-user group. Any suggestions? Here's a more of my predict method: def predict(model: SomeModel, query: Query): PredictedResult = { def predict(model: SomeModel, query: Query): PredictedResult = { val appName = sys.env.getOrElse[String]("APP_NAME", ap.appName) var previousEvents = try { PEventStore.find( appName = appName, entityType = Some(ap.entityType), entityId = Some(query.entityId.getOrElse("")) )(model.dummyRDD.context).map(event => { Try(new CustomEvent( Some(event.event), Some(event.entityType), Some(event.entityId), Some(event.eventTime), Some(event.creationTime), Some(new Properties( *...* )) )) }).filter(_.isSuccess).map(_.get) } catch { case e: Exception => // fatal because of error, an empty query logger.error(s"Error when reading events: ${e}") throw e } ... }
