Re: Distributing a FlatMap across a Spark Cluster

2021-06-08 Thread Lalwani, Jayesh
flatMap is supposed to return a Seq, not an Iterator. You are returning a class that 
implements Iterator. I have a hunch that's what's causing the confusion. 
flatMap is returning an RDD[FairFetcher], not an RDD[CrawlData]. Do you intend it to 
be RDD[CrawlData]? You might want to call toSeq on FairFetcher.
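
For illustration, a minimal sketch of that change, assuming FairFetcher implements
Iterator[CrawlData] (toSeq here is the standard Scala Iterator method, so the whole
crawl for a group is materialised before flatMap sees it):

val fetchedRdd = rdd.map(r => (r.getGroup, r))
  .groupByKey()
  .flatMap({ case (grp, rs) =>
    // toSeq drains the lazy iterator so flatMap receives a concrete Seq[CrawlData]
    new FairFetcher(job, rs.iterator, localFetchDelay,
      FetchFunction, ParseFunction, OutLinkFilterFunction,
      StatusUpdateSolrTransformer).toSeq
  })
  .persist()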

On 6/8/21, 10:10 PM, "Tom Barber"  wrote:

For anyone interested here's the execution logs up until the point where it 
actually kicks off the workload in question: 
https://gist.github.com/buggtb/a9e0445f24182bc8eedfe26c0f07a473

On 2021/06/09 01:52:39, Tom Barber  wrote:
> ExecutorID says driver, and looking at the IP addresses it's running on, 
it's not any of the worker IPs.
>
> I forcibly told it to create 50, but they'd all end up running in the 
same place.
>
> Working on some other ideas, I set spark.task.cpus to 16 to match the 
nodes whilst still forcing it to 50 partitions
>
> val m = 50
>
> val fetchedRdd = rdd.map(r => (r.getGroup, r))
> .groupByKey(m).flatMap({ case (grp, rs) => new FairFetcher(job, 
rs.iterator, localFetchDelay,
>   FetchFunction, ParseFunction, OutLinkFilterFunction, 
StatusUpdateSolrTransformer) })
> .persist()
>
> that sort of thing. But still the tasks are pinned to the driver executor 
and none of the workers, so I no longer saturate the master node, but I also 
have 3 workers just sat there doing nothing.
>
> On 2021/06/09 01:26:50, Sean Owen  wrote:
> > Are you sure it's on the driver? or just 1 executor?
> > how many partitions does the groupByKey produce? that would limit your
> > parallelism no matter what if it's a small number.
> >
> > On Tue, Jun 8, 2021 at 8:07 PM Tom Barber  
wrote:
> >
> > > Hi folks,
> > >
> > > Hopefully someone with more Spark experience than me can explain this 
a
> > > bit.
> > >
> > > I don't know if this is possible, impossible or just an old design 
that
> > > could be better.
> > >
> > > I'm running Sparkler as a spark-submit job on a Databricks Spark cluster
> > > and it's getting to this point in the code(
> > > 
https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L222-L226
> > > )
> > >
> > > val fetchedRdd = rdd.map(r => (r.getGroup, r))
> > > .groupByKey()
> > > .flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator,
> > > localFetchDelay,
> > >   FetchFunction, ParseFunction, OutLinkFilterFunction,
> > > StatusUpdateSolrTransformer) })
> > > .persist()
> > >
> > > This basically takes the RDD and then runs a web-based crawl over 
each entry in the RDD
> > > and returns the results. But when Spark executes it, it runs all the 
crawls
> > > on the driver node and doesn't distribute them.
> > >
> > > The key is pretty static in these tests, so I have also tried forcing 
the
> > > partition count (50 on a 16 core per node cluster) and also 
repartitioning,
> > > but every time all the jobs are scheduled to run on one node.
> > >
> > > What can I do better to distribute the tasks? Because the processing 
of
> > > the data in the RDD isn't the bottleneck, the fetching of the crawl 
data is
> > > the bottleneck, but that happens after the code has been assigned to 
a node.
> > >
> > > Thanks
> > >
> > > Tom
> > >
> > >
> > > -
> > > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> > >
> > >
> >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Distributing a FlatMap across a Spark Cluster

2021-06-08 Thread Sean Owen
Really weird. flatMap definitely doesn't happen on the driver. My only
long-shot theory, which I haven't thought through, is: what is FairFetcher
doing with 'job'? It kind of looks like it is submitting a (driver) Job
directly into its own scheduler, which could be... something, but maybe
that's totally wrong, and I'd much more expect that to fail if executed
this way.

On Tue, Jun 8, 2021 at 8:53 PM Tom Barber  wrote:

> ExecutorID says driver, and looking at the IP addresses it's running on, it's
> not any of the worker IPs.
>
> I forcibly told it to create 50, but they'd all end up running in the same
> place.
>
> Working on some other ideas, I set spark.task.cpus to 16 to match the
> nodes whilst still forcing it to 50 partitions
>
> val m = 50
>
> val fetchedRdd = rdd.map(r => (r.getGroup, r))
> .groupByKey(m).flatMap({ case (grp, rs) => new FairFetcher(job,
> rs.iterator, localFetchDelay,
>   FetchFunction, ParseFunction, OutLinkFilterFunction,
> StatusUpdateSolrTransformer) })
> .persist()
>
> that sort of thing. But still the tasks are pinned to the driver executor
> and none of the workers, so I no longer saturate the master node, but I
> also have 3 workers just sat there doing nothing.
>
> On 2021/06/09 01:26:50, Sean Owen  wrote:
> > Are you sure it's on the driver? or just 1 executor?
> > how many partitions does the groupByKey produce? that would limit your
> > parallelism no matter what if it's a small number.
> >
> > On Tue, Jun 8, 2021 at 8:07 PM Tom Barber 
> wrote:
> >
> > > Hi folks,
> > >
> > > Hopefully someone with more Spark experience than me can explain this a
> > > bit.
> > >
> > > I don't know if this is possible, impossible or just an old design that
> > > could be better.
> > >
> > > I'm running Sparkler as a spark-submit job on a Databricks Spark cluster
> > > and it's getting to this point in the code(
> > >
> https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L222-L226
> > > )
> > >
> > > val fetchedRdd = rdd.map(r => (r.getGroup, r))
> > > .groupByKey()
> > > .flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator,
> > > localFetchDelay,
> > >   FetchFunction, ParseFunction, OutLinkFilterFunction,
> > > StatusUpdateSolrTransformer) })
> > > .persist()
> > >
> > > This basically takes the RDD and then runs a web-based crawl over each
> entry in the RDD
> > > and returns the results. But when Spark executes it, it runs all the
> crawls
> > > on the driver node and doesn't distribute them.
> > >
> > > The key is pretty static in these tests, so I have also tried forcing
> the
> > > partition count (50 on a 16 core per node cluster) and also
> repartitioning,
> > > but every time all the jobs are scheduled to run on one node.
> > >
> > > What can I do better to distribute the tasks? Because the processing of
> > > the data in the RDD isn't the bottleneck, the fetching of the crawl
> data is
> > > the bottleneck, but that happens after the code has been assigned to a
> node.
> > >
> > > Thanks
> > >
> > > Tom
> > >
> > >
> > > -
> > > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> > >
> > >
> >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Distributing a FlatMap across a Spark Cluster

2021-06-08 Thread Tom Barber
For anyone interested here's the execution logs up until the point where it 
actually kicks off the workload in question: 
https://gist.github.com/buggtb/a9e0445f24182bc8eedfe26c0f07a473

On 2021/06/09 01:52:39, Tom Barber  wrote: 
> ExecutorID says driver, and looking at the IP addresses it's running on, it's 
> not any of the worker IPs.
> 
> I forcibly told it to create 50, but they'd all end up running in the same 
> place. 
> 
> Working on some other ideas, I set spark.task.cpus to 16 to match the nodes 
> whilst still forcing it to 50 partitions
> 
> val m = 50
> 
> val fetchedRdd = rdd.map(r => (r.getGroup, r))
> .groupByKey(m).flatMap({ case (grp, rs) => new FairFetcher(job, 
> rs.iterator, localFetchDelay,
>   FetchFunction, ParseFunction, OutLinkFilterFunction, 
> StatusUpdateSolrTransformer) })
> .persist()
> 
> that sort of thing. But still the tasks are pinned to the driver executor and 
> none of the workers, so I no longer saturate the master node, but I also have 
> 3 workers just sat there doing nothing.
> 
> On 2021/06/09 01:26:50, Sean Owen  wrote: 
> > Are you sure it's on the driver? or just 1 executor?
> > how many partitions does the groupByKey produce? that would limit your
> > parallelism no matter what if it's a small number.
> > 
> > On Tue, Jun 8, 2021 at 8:07 PM Tom Barber  wrote:
> > 
> > > Hi folks,
> > >
> > > Hopefully someone with more Spark experience than me can explain this a
> > > bit.
> > >
> > > I don't know if this is possible, impossible or just an old design that
> > > could be better.
> > >
> > > I'm running Sparkler as a spark-submit job on a Databricks Spark cluster
> > > and it's getting to this point in the code(
> > > https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L222-L226
> > > )
> > >
> > > val fetchedRdd = rdd.map(r => (r.getGroup, r))
> > > .groupByKey()
> > > .flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator,
> > > localFetchDelay,
> > >   FetchFunction, ParseFunction, OutLinkFilterFunction,
> > > StatusUpdateSolrTransformer) })
> > > .persist()
> > >
> > > This basically takes the RDD and then runs a web-based crawl over each entry in the RDD
> > > and returns the results. But when Spark executes it, it runs all the 
> > > crawls
> > > on the driver node and doesn't distribute them.
> > >
> > > The key is pretty static in these tests, so I have also tried forcing the
> > > partition count (50 on a 16 core per node cluster) and also 
> > > repartitioning,
> > > but every time all the jobs are scheduled to run on one node.
> > >
> > > What can I do better to distribute the tasks? Because the processing of
> > > the data in the RDD isn't the bottleneck, the fetching of the crawl data 
> > > is
> > > the bottleneck, but that happens after the code has been assigned to a 
> > > node.
> > >
> > > Thanks
> > >
> > > Tom
> > >
> > >
> > > -
> > > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> > >
> > >
> > 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Distributing a FlatMap across a Spark Cluster

2021-06-08 Thread Tom Barber
ExecutorID says driver, and looking at the IP addresses it's running on, it's not 
any of the worker IPs.

I forcibly told it to create 50, but they'd all end up running in the same 
place. 

Working on some other ideas, I set spark.task.cpus to 16 to match the nodes 
whilst still forcing it to 50 partitions

val m = 50

val fetchedRdd = rdd.map(r => (r.getGroup, r))
.groupByKey(m).flatMap({ case (grp, rs) => new FairFetcher(job, 
rs.iterator, localFetchDelay,
  FetchFunction, ParseFunction, OutLinkFilterFunction, 
StatusUpdateSolrTransformer) })
.persist()

that sort of thing. But still the tasks are pinned to the driver executor and 
none of the workers, so I no longer saturate the master node, but I also have 3 
workers just sat there doing nothing.
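
A side note on the settings above (illustrative, not from the original message):
spark.task.cpus is the number of cores each task reserves, so setting it to 16 on
16-core workers lets each executor run only one task at a time, and it has no effect
on which executor a task is placed on. A sketch of the alternative of keeping
per-task cores at 1 while still forcing 50 partitions, reusing the names from the
snippet above:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.task.cpus", "1")             // each crawl task reserves a single core
  .set("spark.default.parallelism", "50")  // default shuffle partition count

val m = 50
val fetchedRdd = rdd.map(r => (r.getGroup, r))
  .groupByKey(m)   // 50 shuffle partitions to spread across the registered executors
  .flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator, localFetchDelay,
    FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer) })
  .persist()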

On 2021/06/09 01:26:50, Sean Owen  wrote: 
> Are you sure it's on the driver? or just 1 executor?
> how many partitions does the groupByKey produce? that would limit your
> parallelism no matter what if it's a small number.
> 
> On Tue, Jun 8, 2021 at 8:07 PM Tom Barber  wrote:
> 
> > Hi folks,
> >
> > Hopefully someone with more Spark experience than me can explain this a
> > bit.
> >
> > I don't know if this is possible, impossible or just an old design that
> > could be better.
> >
> > I'm running Sparkler as a spark-submit job on a Databricks Spark cluster
> > and it's getting to this point in the code(
> > https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L222-L226
> > )
> >
> > val fetchedRdd = rdd.map(r => (r.getGroup, r))
> > .groupByKey()
> > .flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator,
> > localFetchDelay,
> >   FetchFunction, ParseFunction, OutLinkFilterFunction,
> > StatusUpdateSolrTransformer) })
> > .persist()
> >
> > This basically takes the RDD and then runs a web-based crawl over each entry in the RDD
> > and returns the results. But when Spark executes it, it runs all the crawls
> > on the driver node and doesn't distribute them.
> >
> > The key is pretty static in these tests, so I have also tried forcing the
> > partition count (50 on a 16 core per node cluster) and also repartitioning,
> > but every time all the jobs are scheduled to run on one node.
> >
> > What can I do better to distribute the tasks? Because the processing of
> > the data in the RDD isn't the bottleneck, the fetching of the crawl data is
> > the bottleneck, but that happens after the code has been assigned to a node.
> >
> > Thanks
> >
> > Tom
> >
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
> >
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Distributing a FlatMap across a Spark Cluster

2021-06-08 Thread Sean Owen
Are you sure it's on the driver, or just 1 executor?
How many partitions does the groupByKey produce? That would limit your
parallelism no matter what if it's a small number.
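
One quick way to check that (an illustrative sketch, reusing the rdd and fetchedRdd
names from the quoted code below; getNumPartitions is the standard RDD method):

// How many partitions did the shuffle actually produce?
println(s"fetchedRdd partitions: ${fetchedRdd.getNumPartitions}")

// groupByKey() with no argument falls back to the largest parent partition count
// (or spark.default.parallelism if set); passing a number forces it explicitly.
val grouped = rdd.map(r => (r.getGroup, r)).groupByKey(50)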

On Tue, Jun 8, 2021 at 8:07 PM Tom Barber  wrote:

> Hi folks,
>
> Hopefully someone with more Spark experience than me can explain this a
> bit.
>
> I don't know if this is possible, impossible or just an old design that
> could be better.
>
> I'm running Sparkler as a spark-submit job on a Databricks Spark cluster
> and it's getting to this point in the code(
> https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L222-L226
> )
>
> val fetchedRdd = rdd.map(r => (r.getGroup, r))
> .groupByKey()
> .flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator,
> localFetchDelay,
>   FetchFunction, ParseFunction, OutLinkFilterFunction,
> StatusUpdateSolrTransformer) })
> .persist()
>
> This basically takes the RDD and then runs a web-based crawl over each entry in the RDD
> and returns the results. But when Spark executes it, it runs all the crawls
> on the driver node and doesn't distribute them.
>
> The key is pretty static in these tests, so I have also tried forcing the
> partition count (50 on a 16 core per node cluster) and also repartitioning,
> but every time all the jobs are scheduled to run on one node.
>
> What can I do better to distribute the tasks? Because the processing of
> the data in the RDD isn't the bottleneck, the fetching of the crawl data is
> the bottleneck, but that happens after the code has been assigned to a node.
>
> Thanks
>
> Tom
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Distributing a FlatMap across a Spark Cluster

2021-06-08 Thread Tom Barber
Hi folks, 

Hopefully someone with more Spark experience than me can explain this a bit.

I don't know if this is possible, impossible or just an old design that could 
be better.

I'm running Sparkler as a spark-submit job on a Databricks Spark cluster and 
it's getting to this point in the 
code(https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L222-L226)

val fetchedRdd = rdd.map(r => (r.getGroup, r))
.groupByKey()
.flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator, 
localFetchDelay,
  FetchFunction, ParseFunction, OutLinkFilterFunction, 
StatusUpdateSolrTransformer) })
.persist()

This basically takes the RDD and then runs a web-based crawl over each entry in the RDD and 
returns the results. But when Spark executes it, it runs all the crawls on the 
driver node and doesn't distribute them.

The key is pretty static in these tests, so I have also tried forcing the 
partition count (50 on a 16 core per node cluster) and also repartitioning, but 
every time all the jobs are scheduled to run on one node.

What can I do better to distribute the tasks? Because the processing of the 
data in the RDD isn't the bottleneck, the fetching of the crawl data is the 
bottleneck, but that happens after the code has been assigned to a node.
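
For reference, a diagnostic sketch (illustrative only, reusing the fetchedRdd value
from the snippet above) that logs which host evaluates each partition; because the
RDD is computed lazily, running this will trigger the crawl itself:

import java.net.InetAddress

val hostsPerPartition = fetchedRdd
  .mapPartitionsWithIndex { (idx, it) =>
    // record the machine this partition runs on and how many records it held
    Iterator((idx, InetAddress.getLocalHost.getHostName, it.size))
  }
  .collect()

hostsPerPartition.foreach { case (idx, host, n) =>
  println(s"partition $idx -> $host ($n records)")
}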

Thanks

Tom


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Problem in Restoring ML Pipeline with UDF

2021-06-08 Thread Sean Owen
It's a little bit of a guess, but the class name
$line103090609224.$read$FeatureModder looks like something generated by the
shell. I think it's your 'real' class name in this case. If you redefined
this later and loaded it, you may find it doesn't match up. Can you declare
this in a package?
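
For illustration, roughly what that could look like (the package and jar names below
are hypothetical):

// FeatureModder.scala, compiled into its own jar instead of being defined in the shell
package com.example.ml

import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.{Param, ParamMap}
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable}

class FeatureModder extends Transformer
    with DefaultParamsWritable with DefaultParamsReadable[FeatureModder] {
  // ... body exactly as in the quoted class below ...
}

// Make the jar visible to the session that loads the model, e.g.
//   spark-submit --jars /path/to/feature-modder.jar ...
// (or add it to the notebook interpreter's dependencies), so the persisted class
// name com.example.ml.FeatureModder resolves instead of the shell-generated
// $line103090609224.$read$FeatureModder.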

On Tue, Jun 8, 2021 at 10:50 AM Artemis User  wrote:

> We have a feature engineering transformer defined as a custom class with
> UDF as follows:
>
> class FeatureModder extends Transformer with DefaultParamsWritable with
> DefaultParamsReadable[FeatureModder] {
> val uid: String = "FeatureModder"+randomUUID
>
> final val inputCol: Param[String] = new Param[String](this,
> "inputCos", "input column")
> final def setInputCol(col:String) = set(inputCol, col)
>
> final val outputCol: Param[String] = new Param[String](this,
> "outputCol", "output column")
> final def setOutputCol(col:String) = set(outputCol, col)
>
> final val size: Param[String] = new Param[String](this, "size",
> "length of output vector")
> final def setSize = (n:Int) => set(size, n.toString)
>
> override def transform(data: Dataset[_]) = {
> val modUDF = udf({n: Int => n % $(size).toInt})
> data.withColumn($(outputCol),
> modUDF(col($(inputCol)).cast(IntegerType)))
> }
>
> def transformSchema(schema: org.apache.spark.sql.types.StructType):
> org.apache.spark.sql.types.StructType = {
> val actualType = schema($(inputCol)).dataType
> require(actualType.equals(IntegerType) ||
> actualType.equals(DoubleType), s"Input column must be of numeric type")
> DataTypes.createStructType(schema.fields :+
> DataTypes.createStructField($(outputCol), IntegerType, false))
> }
>
> override def copy(extra: ParamMap): Transformer = defaultCopy(extra)
> }
>
> This was included in an ML pipeline, fitted into a model and persisted to
> a disk file.  When we try to load the pipeline model in a separate notebook
> (we use Zeppelin), an exception is thrown complaining that the class is not found.
>
> java.lang.ClassNotFoundException: $line103090609224.$read$FeatureModder at
> scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:72)
> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589) at
> java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522) at
> java.base/java.lang.Class.forName0(Native Method) at
> java.base/java.lang.Class.forName(Class.java:398) at
> org.apache.spark.util.Utils$.classForName(Utils.scala:207) at
> org.apache.spark.ml.util.DefaultParamsReader$.loadParamsInstanceReader(ReadWrite.scala:630)
> at
> org.apache.spark.ml.Pipeline$SharedReadWrite$.$anonfun$load$4(Pipeline.scala:276)
> at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
> at
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
> at
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) at
> scala.collection.TraversableLike.map(TraversableLike.scala:238) at
> scala.collection.TraversableLike.map$(TraversableLike.scala:231) at
> scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198) at
> org.apache.spark.ml.Pipeline$SharedReadWrite$.$anonfun$load$3(Pipeline.scala:274)
> at
> org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
> at scala.util.Try$.apply(Try.scala:213) at
> org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
> at org.apache.spark.ml.Pipeline$SharedReadWrite$.load(Pipeline.scala:268)
> at
> org.apache.spark.ml.PipelineModel$PipelineModelReader.$anonfun$load$7(Pipeline.scala:356)
> at org.apache.spark.ml.MLEvents.withLoadInstanceEvent(events.scala:160) at
> org.apache.spark.ml.MLEvents.withLoadInstanceEvent$(events.scala:155) at
> org.apache.spark.ml.util.Instrumentation.withLoadInstanceEvent(Instrumentation.scala:42)
> at
> org.apache.spark.ml.PipelineModel$PipelineModelReader.$anonfun$load$6(Pipeline.scala:355)
> at
> org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
> at scala.util.Try$.apply(Try.scala:213) at
> org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
> at
> org.apache.spark.ml.PipelineModel$PipelineModelReader.load(Pipeline.scala:355)
> at
> org.apache.spark.ml.PipelineModel$PipelineModelReader.load(Pipeline.scala:349)
> at org.apache.spark.ml.util.MLReadable.load(ReadWrite.scala:355) at
> org.apache.spark.ml.util.MLReadable.load$(ReadWrite.scala:355) at
> org.apache.spark.ml.PipelineModel$.load(Pipeline.scala:337) ... 40 elided 
> Could
> someone help explaining why?  My guess was the class definition is not in
> the classpath.  The question is how to include the class definition or
> class metadata as part of the pipeline model serialization? or include the
> class definition in a notebook (we did include the class definition in the
> notebook that

Problem in Restoring ML Pipeline with UDF

2021-06-08 Thread Artemis User
We have a feature engineering transformer defined as a custom class with 
UDF as follows:


class FeatureModder extends Transformer with DefaultParamsWritable with
    DefaultParamsReadable[FeatureModder] {

    val uid: String = "FeatureModder" + randomUUID

    final val inputCol: Param[String] = new Param[String](this,
      "inputCol", "input column")
    final def setInputCol(col: String) = set(inputCol, col)

    final val outputCol: Param[String] = new Param[String](this,
      "outputCol", "output column")
    final def setOutputCol(col: String) = set(outputCol, col)

    final val size: Param[String] = new Param[String](this, "size",
      "length of output vector")
    final def setSize = (n: Int) => set(size, n.toString)

    override def transform(data: Dataset[_]) = {
      val modUDF = udf({ n: Int => n % $(size).toInt })
      data.withColumn($(outputCol),
        modUDF(col($(inputCol)).cast(IntegerType)))
    }

    def transformSchema(schema: org.apache.spark.sql.types.StructType):
        org.apache.spark.sql.types.StructType = {
      val actualType = schema($(inputCol)).dataType
      require(actualType.equals(IntegerType) ||
        actualType.equals(DoubleType), s"Input column must be of numeric type")
      DataTypes.createStructType(schema.fields :+
        DataTypes.createStructField($(outputCol), IntegerType, false))
    }

    // defaultCopy avoids the infinite recursion of copy calling itself
    override def copy(extra: ParamMap): Transformer = defaultCopy(extra)
}

This was included in an ML pipeline, fitted into a model and persisted 
to a disk file.  When we try to load the pipeline model in a separate 
notebook (we use Zeppelin), an exception is thrown complaining that the class 
is not found.


java.lang.ClassNotFoundException: $line103090609224.$read$FeatureModder 
at 
scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:72) 
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589) at 
java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522) at 
java.base/java.lang.Class.forName0(Native Method) at 
java.base/java.lang.Class.forName(Class.java:398) at 
org.apache.spark.util.Utils$.classForName(Utils.scala:207) at 
org.apache.spark.ml.util.DefaultParamsReader$.loadParamsInstanceReader(ReadWrite.scala:630) 
at 
org.apache.spark.ml.Pipeline$SharedReadWrite$.$anonfun$load$4(Pipeline.scala:276) 
at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238) 
at 
scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) 
at 
scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) 
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) 
at scala.collection.TraversableLike.map(TraversableLike.scala:238) at 
scala.collection.TraversableLike.map$(TraversableLike.scala:231) at 
scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198) at 
org.apache.spark.ml.Pipeline$SharedReadWrite$.$anonfun$load$3(Pipeline.scala:274) 
at 
org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191) 
at scala.util.Try$.apply(Try.scala:213) at 
org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191) 
at 
org.apache.spark.ml.Pipeline$SharedReadWrite$.load(Pipeline.scala:268) 
at 
org.apache.spark.ml.PipelineModel$PipelineModelReader.$anonfun$load$7(Pipeline.scala:356) 
at org.apache.spark.ml.MLEvents.withLoadInstanceEvent(events.scala:160) 
at org.apache.spark.ml.MLEvents.withLoadInstanceEvent$(events.scala:155) 
at 
org.apache.spark.ml.util.Instrumentation.withLoadInstanceEvent(Instrumentation.scala:42) 
at 
org.apache.spark.ml.PipelineModel$PipelineModelReader.$anonfun$load$6(Pipeline.scala:355) 
at 
org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191) 
at scala.util.Try$.apply(Try.scala:213) at 
org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191) 
at 
org.apache.spark.ml.PipelineModel$PipelineModelReader.load(Pipeline.scala:355) 
at 
org.apache.spark.ml.PipelineModel$PipelineModelReader.load(Pipeline.scala:349) 
at org.apache.spark.ml.util.MLReadable.load(ReadWrite.scala:355) at 
org.apache.spark.ml.util.MLReadable.load$(ReadWrite.scala:355) at 
org.apache.spark.ml.PipelineModel$.load(Pipeline.scala:337) ... 40 elided

Could someone help explain why? My guess was that the class definition is not on 
the classpath. The question is how to include the class definition or class 
metadata as part of the pipeline model serialization, or how to include the class 
definition in a notebook (we did include the class definition in the notebook that 
loads the pipeline model).


Thanks a lot in advance for your help!

ND


Re: class KafkaCluster related errors

2021-06-08 Thread Mich Talebzadeh
Hi Kiran,

I don't seem to have a reference to handling offsets in my old code.

However, in Spark structured streaming (SSS) I handle it using a reference
to checkpointLocation as below: (this is in Python)

checkpoint_path = "file:///ssd/hduser/avgtemperature/chkpt"

result = resultMF.withColumn("uuid", uuidUdf()) \
    .selectExpr("CAST(uuid AS STRING) AS key",
                "to_json(struct(startOfWindow, endOfWindow, AVGTemperature)) AS value") \
    .writeStream \
    .outputMode('complete') \
    .format("kafka") \
    .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers'],) \
    .option("topic", "avgtemperature") \
    .option('checkpointLocation', checkpoint_path) \
    .queryName("avgtemperature") \
    .start()

Now within that checkpoint_path directory you have five sub-directories
containing all you need, including offsets:

/ssd/hduser/avgtemperature/chkpt> ls
commits  metadata  offsets  sources  state
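
As an illustrative Scala sketch (the broker and topic names are hypothetical),
restarting a query with the same checkpointLocation makes it resume from the
offsets stored there, so nothing has to be tracked by hand:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("avgtemperature").getOrCreate()

val resumed = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")   // hypothetical broker
  .option("subscribe", "temperature")                  // hypothetical source topic
  .load()
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("topic", "avgtemperature")
  .option("checkpointLocation", "file:///ssd/hduser/avgtemperature/chkpt")
  .start()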

HTH


   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.
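
For the question quoted below about handling offsets explicitly while staying on the
DStream API, a minimal sketch using the spark-streaming-kafka-0-10 integration
(illustrative only; stream is assumed to be the InputDStream returned by
KafkaUtils.createDirectStream from that package):

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, OffsetRange}

stream.foreachRDD { rdd =>
  // Capture the offset ranges before any transformation re-partitions the RDD
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process the batch here ...

  // Commit back to Kafka only after the batch has been processed successfully
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}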




On Tue, 8 Jun 2021 at 01:21, Kiran Biswal  wrote:

> Hello Mich
>
> Thanks a lot. Using code similar to yours I was able to compile.
>
> One outstanding question: in my older code, the older getConsumerOffsets
> method was handling offsets (latestLeaderOffsets, earliestLeaderOffsets,
> etc.) by calling KafkaCluster.
>
>  Will there be data loss if I don't handle offsets? In your example
> handling offsets was not required? If I were to handle offsets any examples
> you could share?
>
> Thanks a lot again and appreciate the great help.
> Regards
> Kiran
>
> On Mon, Jun 7, 2021 at 2:58 AM Mich Talebzadeh 
> wrote:
>
>> Hi Kiran,
>>
>> As you may be aware, createDirectStream is deprecated and you ought to use
>> Spark Structured Streaming, especially as you are moving to version 3.0.1.
>>
>> If you still want to use dstream then that page seems to be correct
>>
>> Looking at my old code I have
>>
>> import org.apache.spark.streaming._
>> import org.apache.spark.streaming.kafka._
>> import org.apache.spark.streaming.kafka.KafkaUtils
>>
>> val kafkaParams = Map[String, String](
>>   "bootstrap.servers" -> bootstrapServers,
>>   "schema.registry.url" -> schemaRegistryURL,
>>   "zookeeper.connect" -> zookeeperConnect,
>>   "group.id" -> sparkAppName,
>>   "zookeeper.connection.timeout.ms" -> zookeeperConnectionTimeoutMs,
>>   "rebalance.backoff.ms" -> rebalanceBackoffMS,
>>   "zookeeper.session.timeout.ms" -> zookeeperSessionTimeOutMs,
>>   "auto.commit.interval.ms" -> autoCommitIntervalMS
>> )
>> // val topicsSet = topics.split(",").toSet
>> val topicsValue = Set(topics)
>> val dstream = KafkaUtils.createDirectStream[String, String,
>>   StringDecoder, StringDecoder](streamingContext, kafkaParams, topicsValue)
>> dstream.cache()
>>
>> HTH,
>>
>>
>> Mich
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Mon, 7 Jun 2021 at 10:34, Kiran Biswal  wrote:
>>
>>> Hi Mich, thanks a lot for your response. I am basically trying to get
>>> some older code (a streaming job reading from Kafka) in Spark 2.0.1 to work in
>>> 3.0.1. The specific area where I am having a problem (KafkaCluster) most
>>> likely has to do with getting/setting commit offsets in Kafka.
>>>
>>>
>>>
>>> // Create message Dstream for each (topic, schema class)
>>>
>>>
>>> val msgStreams = config.getTopicSchemaClassMap.map {
>>>
>>>
>>>   case (kafkaTopic, schemaClass) => {
>>>
>>>
>>> val consumerOffsets = getConsumerOffsets(kafkaTopic)
>>>
>>>
>>> val msgDStream = (KafkaUtils.createDirectStream[Array[Byte],
>>> Array[Byte], DefaultDecoder, DefaultDecoder,
>>>
>>>   Tuple2[Array[Byte],Array[Byte]]]
>>>
>>>
>>>   (ssc, kafkaParams, consumerO