Hi Marcin!
Thank you for your answer.
I only need the SparkContext, but I have no idea:
1- How do I retrieve it from PersistentModelLoader?
2- How do I access sc in the predict method using the configuration below?
class SomeModel() extends LocalFileSystemPersistentModel[SomeAlgorithmParams] {
  override def save(id: String, params: SomeAlgorithmParams, sc: SparkContext): Boolean = {
    false
  }
}

object SomeModel extends LocalFileSystemPersistentModelLoader[SomeAlgorithmParams, SomeModel] {
  override def apply(id: String, params: SomeAlgorithmParams, sc: Option[SparkContext]): SomeModel = {
    new SomeModel() // HERE I TRAIN AND RETURN THE TRAINED MODEL
  }
}
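
To make sure I understand correctly, is something like the sketch below what you mean? It assumes the model can simply hold on to the SparkContext handed to the loader; the sc field on the model and the predict outline are my own guesses, and imports are omitted:

class SomeModel(@transient val sc: Option[SparkContext])
  extends LocalFileSystemPersistentModel[SomeAlgorithmParams] {

  // Returning false so nothing is persisted and the algorithm is retrained on deploy.
  override def save(id: String, params: SomeAlgorithmParams, sc: SparkContext): Boolean = false
}

object SomeModel extends LocalFileSystemPersistentModelLoader[SomeAlgorithmParams, SomeModel] {
  override def apply(id: String, params: SomeAlgorithmParams, sc: Option[SparkContext]): SomeModel = {
    // Train here using sc, then keep the context on the model instance.
    new SomeModel(sc)
  }
}

// In predict I would then use the stored context instead of dummyRDD.context:
// val events = PEventStore.find(appName = appName, ...)(model.sc.get)

Or would you keep the context somewhere else?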
Thank you very much, I really appreciate it!
Hasan
On Thu, Sep 22, 2016 at 7:05 PM, Marcin Ziemiński <[email protected]> wrote:
> Hi Hasan,
>
> I think that your problem comes from using a deserialized RDD, which has
> already lost its connection with the SparkContext.
> A similar case can be found here:
> http://stackoverflow.com/questions/29567247/serializing-rdd
>
> If you really only need the SparkContext, you could probably use the one
> provided to PersistentModelLoader, which would be implemented by your model.
> Alternatively, you could implement PersistentModel to return false from the
> save method. In that case your algorithm would be retrained on deploy, which
> would also provide you with an instance of SparkContext.
>
> Regards,
> Marcin
>
>
> On Thu, Sep 22, 2016 at 13:34, Hasan Can Saral <
> [email protected]> wrote:
>
>> Hi!
>>
>> I am trying to query the Event Server with the PEventStore API in the
>> predict method to fetch events per entity and build my features. PEventStore
>> needs sc, and for this I have:
>>
>> - Extended PAlgorithm
>> - Extended LocalFileSystemPersistentModel and
>> LocalFileSystemPersistentModelLoader
>> - Put a dummy emptyRDD into my model
>> - Tried to access sc with model.dummyRDD.context, only to receive this error:
>>
>> org.apache.spark.SparkException: RDD transformations and actions can
>> only be invoked by the driver, not inside of other transformations; for
>> example, rdd1.map(x => rdd2.values.count() * x) is invalid because the
>> values transformation and count action cannot be performed inside of the
>> rdd1.map transformation. For more information, see SPARK-5063.
>>
>> Just like the user in this thread
>> <https://groups.google.com/forum/#!topic/predictionio-user/h4kIltGIIYE>
>> on the predictionio-user group did. Any suggestions?
>>
>> Here's more of my predict method:
>>
>> def predict(model: SomeModel, query: Query): PredictedResult = {
>>
>>   val appName = sys.env.getOrElse[String]("APP_NAME", ap.appName)
>>
>>   var previousEvents = try {
>>     PEventStore.find(
>>       appName = appName,
>>       entityType = Some(ap.entityType),
>>       entityId = Some(query.entityId.getOrElse(""))
>>     )(model.dummyRDD.context).map(event => {
>>       Try(new CustomEvent(
>>         Some(event.event),
>>         Some(event.entityType),
>>         Some(event.entityId),
>>         Some(event.eventTime),
>>         Some(event.creationTime),
>>         Some(new Properties(
>>           ...
>>         ))
>>       ))
>>     }).filter(_.isSuccess).map(_.get)
>>   } catch {
>>     case e: Exception => // fatal because of error, an empty query
>>       logger.error(s"Error when reading events: ${e}")
>>       throw e
>>   }
>>
>>   ...
>>
>> }
>>
>>
--
Hasan Can Saral
[email protected]