Re: intermittent Kryo serialization failures in Spark

2019-09-17 Thread Vadim Semenov
Pre-register your classes:

```
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(Class.forName("[[B")) // byte[][]
    kryo.register(classOf[java.lang.Class[_]])
  }
}
```

then run with the following settings:

'spark.kryo.referenceTracking': 'false',
'spark.kryo.registrationRequired': 'false',
'spark.kryo.registrator': 'com.datadog.spark.MyKryoRegistrator',
'spark.kryo.unsafe': 'false',
'spark.kryoserializer.buffer.max': '256m',
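
Or set the same thing programmatically, e.g. (a sketch; the registrator class name is a placeholder, point it at wherever your registrator actually lives):

```
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Sketch: the same Kryo settings wired into a SparkConf / SparkSession.
// "com.example.MyKryoRegistrator" is a placeholder for your own class.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.referenceTracking", "false")
  .set("spark.kryo.registrationRequired", "false")
  .set("spark.kryo.registrator", "com.example.MyKryoRegistrator")
  .set("spark.kryo.unsafe", "false")
  .set("spark.kryoserializer.buffer.max", "256m")

val spark = SparkSession.builder().config(conf).getOrCreate()
```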

On Tue, Sep 17, 2019 at 10:38 AM Jerry Vinokurov 
wrote:

> Hi folks,
>
> Posted this some time ago but the problem continues to bedevil us. I'm
> including a (slightly edited) stack trace that results from this error. If
> anyone can shed any light on what exactly is happening here and what we can
> do to avoid it, that would be much appreciated.
>
> org.apache.spark.SparkException: Failed to register classes with Kryo
>>  at 
>> org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:140)
>>  at 
>> org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:324)
>>  at 
>> org.apache.spark.serializer.KryoSerializerInstance.(KryoSerializer.scala:309)
>>  at 
>> org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:218)
>>  at 
>> org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:288)
>>  at 
>> org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127)
>>  at 
>> org.apache.spark.broadcast.TorrentBroadcast.(TorrentBroadcast.scala:88)
>>  at 
>> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>>  at 
>> org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>>  at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1489)
>>  at 
>> org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:103)
>>  at 
>> org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
>>  at 
>> org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165)
>>  at 
>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:309)
>>  at 
>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:305)
>>  at 
>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:327)
>>  at 
>> org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
>>  at 
>> org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>>  at 
>> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>>  at 
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>  at 
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>  at 
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>  at 
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>  at 
>> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>  at 
>> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
>>  at 
>> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
>>  at 
>> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
>>  at 
>> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
>>  at 
>> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
>>  at 
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>  at 
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>  at 
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>  at 
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>  at 
>> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>  at 
>> org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
>>  at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
>>  at 
>> 

Re: custom rdd - do I need a hadoop input format?

2019-09-17 Thread Arun Mahadevan
You can do it with a custom RDD implementation.
You will mainly implement "getPartitions" (the logic to split your input
into partitions) and "compute" (the logic to compute and return the values
on the executors).
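
A rough sketch of the shape of such an RDD (the partition type and block offsets are placeholders; the real "compute" would open the file and use your block-reading code):

```
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Placeholder partition: one partition per block of the input file.
case class BlockPartition(index: Int, startOffset: Long, length: Long) extends Partition

// Sketch of a custom RDD: getPartitions splits the input into blocks,
// compute reads and returns the records of one block on an executor.
class BlockRDD(sc: SparkContext, path: String, blockOffsets: Seq[(Long, Long)])
  extends RDD[String](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    blockOffsets.zipWithIndex.map { case ((start, len), i) =>
      BlockPartition(i, start, len): Partition
    }.toArray

  override def compute(split: Partition, context: TaskContext): Iterator[String] = {
    val part = split.asInstanceOf[BlockPartition]
    // Placeholder: open `path`, seek to part.startOffset, read part.length bytes
    // and return an iterator over the records of that block.
    Iterator.empty
  }
}
```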

On Tue, 17 Sep 2019 at 08:47, Marcelo Valle  wrote:

> Just to be more clear about my requirements, what I have is actually a
> custom format, with a header, a summary, and multi-line blocks. I want to
> create tasks per block and not per line. I already have a library that reads
> an InputStream and outputs an Iterator of Block, but now I need to integrate
> this with Spark.
>
> On Tue, 17 Sep 2019 at 16:28, Marcelo Valle 
> wrote:
>
>> Hi,
>>
>> I want to create a custom RDD which will read n lines in sequence from a
>> file, which I call a block, and each block should be converted to a spark
>> dataframe to be processed in parallel.
>>
>> Question - do I have to implement a custom hadoop input format to achieve
>> this? Or is it possible to do it only with RDD APIs?
>>
>> Thanks,
>> Marcelo.
>>
>


Re: custom rdd - do I need a hadoop input format?

2019-09-17 Thread Marcelo Valle
Just to be more clear about my requirements, what I have is actually a
custom format, with a header, a summary, and multi-line blocks. I want to
create tasks per block and not per line. I already have a library that reads
an InputStream and outputs an Iterator of Block, but now I need to integrate
this with Spark.

On Tue, 17 Sep 2019 at 16:28, Marcelo Valle  wrote:

> Hi,
>
> I want to create a custom RDD which will read n lines in sequence from a
> file, which I call a block, and each block should be converted to a spark
> dataframe to be processed in parallel.
>
> Question - do I have to implement a custom hadoop input format to achieve
> this? Or is it possible to do it only with RDD APIs?
>
> Thanks,
> Marcelo.
>



custom rdd - do I need a hadoop input format?

2019-09-17 Thread Marcelo Valle
Hi,

I want to create a custom RDD which will read n lines in sequence from a
file, which I call a block, and each block should be converted to a spark
dataframe to be processed in parallel.

Question - do I have to implement a custom hadoop input format to achieve
this? Or is it possible to do it only with RDD APIs?

Thanks,
Marcelo.



Can I set the Alluxio WriteType in Spark applications?

2019-09-17 Thread Mark Zhao
Hi,

If Spark applications write data into Alluxio, can the WriteType be configured?

Thanks,
Mark
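
A hedged sketch of one approach, assuming the Alluxio client picks up alluxio.user.file.writetype.default from the Hadoop configuration, which Spark can populate via its spark.hadoop.* prefix (host, port and path below are examples):

```
import org.apache.spark.sql.SparkSession

// Assumption: the Alluxio client honours alluxio.user.file.writetype.default
// from the Hadoop configuration, which Spark fills from spark.hadoop.* settings.
val spark = SparkSession.builder()
  .config("spark.hadoop.alluxio.user.file.writetype.default", "CACHE_THROUGH")
  .getOrCreate()

// Example write to an Alluxio path; master host/port are placeholders.
spark.range(10).write.parquet("alluxio://alluxio-master:19998/tmp/example")
```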


Re: intermittent Kryo serialization failures in Spark

2019-09-17 Thread Jerry Vinokurov
Hi folks,

Posted this some time ago but the problem continues to bedevil us. I'm
including a (slightly edited) stack trace that results from this error. If
anyone can shed any light on what exactly is happening here and what we can
do to avoid it, that would be much appreciated.

org.apache.spark.SparkException: Failed to register classes with Kryo
>   at 
> org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:140)
>   at 
> org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:324)
>   at 
> org.apache.spark.serializer.KryoSerializerInstance.(KryoSerializer.scala:309)
>   at 
> org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:218)
>   at 
> org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:288)
>   at 
> org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127)
>   at 
> org.apache.spark.broadcast.TorrentBroadcast.(TorrentBroadcast.scala:88)
>   at 
> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>   at 
> org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>   at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1489)
>   at 
> org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:103)
>   at 
> org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
>   at 
> org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165)
>   at 
> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:309)
>   at 
> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:305)
>   at 
> org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:327)
>   at 
> org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
>   at 
> org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>   at 
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
>   at 
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
>   at 
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
>   at 
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
>   at 
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>   at 
> org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
>   at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>   at 
> 

How to Integrate Spark mllib Streaming Training Models To Spark Structured Streaming

2019-09-17 Thread Praful Rana
The Spark MLlib streaming training models work with DStreams, so is there
any way to use them with Spark Structured Streaming?
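
For reference, the DStream-based API in question looks roughly like this (a sketch; the paths and feature count are placeholders, and `sc` is an existing SparkContext):

```
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch: streaming MLlib models train and predict on DStreams,
// not on Structured Streaming DataFrames.
val ssc = new StreamingContext(sc, Seconds(10))
val training = ssc.textFileStream("/data/train").map(LabeledPoint.parse)
val test = ssc.textFileStream("/data/test").map(LabeledPoint.parse)

val model = new StreamingLinearRegressionWithSGD()
  .setInitialWeights(Vectors.zeros(3)) // placeholder: 3 features

model.trainOn(training)
model.predictOnValues(test.map(lp => (lp.label, lp.features))).print()

ssc.start()
ssc.awaitTermination()
```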


How to integrates MLeap to Spark Structured Streaming

2019-09-17 Thread Praful Rana
I am trying to integrate MLeap with Spark Structured Streaming, but I am facing
a problem. Spark Structured Streaming with Kafka works with dataframes, while
MLeap requires a LeapFrame. I tried to convert the dataframe to a LeapFrame
using the MLeap Spark support library function (toSparkLeapFrame), but it
gives an error: during the conversion it turns the dataframe into an RDD and
performs an action, which is not allowed in Structured Streaming. If anyone
knows how to use MLeap in Spark Structured Streaming, please help.


Re: Re: how can I dynamic parse json in kafka when using Structured Streaming

2019-09-17 Thread lk_spark
I want to parse the structure of the data dynamically and then write the data
to Delta Lake; I think it can automatically merge the schema.
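
Roughly what I have in mind for the write side, assuming Delta's mergeSchema option handles the evolving columns (parsedDf stands for the parsed data):

```
// Assumption: Delta Lake merges newly appearing columns into the table
// when mergeSchema is enabled on the write.
parsedDf.write
  .format("delta")
  .mode("append")
  .option("mergeSchema", "true")
  .save("/delta/events")
```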

2019-09-17 

lk_spark 



From: Tathagata Das
Sent: 2019-09-17 16:13
Subject: Re: how can I dynamic parse json in kafka when using Structured Streaming
To: "lk_spark"
Cc: "user.spark"

You can use the from_json built-in SQL function to parse JSON.
https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html#from_json-org.apache.spark.sql.Column-org.apache.spark.sql.Column-



On Mon, Sep 16, 2019 at 7:39 PM lk_spark  wrote:

Hi all,
I'm using Structured Streaming to read from Kafka; the data type is a JSON
string. I want to parse it and convert it to a dataframe, but my code doesn't
compile and I don't know how to fix it:

val lines = messages.selectExpr("CAST(value AS STRING) as value").as[String]

val words = lines.map(line => {
  var json: JValue = null
  try {
json = parse(line)
  } catch {
case ex: Exception => { println(ex.getMessage + " " + line) }
  }
  //var result: scala.collection.mutable.Map[String,String] = 
scala.collection.mutable.Map()
  val jsonObj = json.values.asInstanceOf[Map[String, _]]
  val valuse = jsonObj.values.toArray
  val schema = StructType(List())
  for ((k, v) <- jsonObj){
//result += (k -> jsonObj.get(k).toString())

if(v.isInstanceOf[String]){
  schema.add(k,StringType)
}else if (v.isInstanceOf[Int]){
  schema.add(k,IntegerType)
}/*else if (v.isInstanceOf[Array[String]]){
  schema.add(k,ArrayType(StringType))
}else if (v.isInstanceOf[Map[String,String]]){
  schema.add(k,MapType(StringType,StringType))
}*/
  }
  val row =  new GenericRowWithSchema(valuse,schema)
  row
})

Error:(45, 26) Unable to find encoder for type 
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema. An implicit 
Encoder[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema] is 
needed to store org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema 
instances in a Dataset. Primitive types (Int, String, etc) and Product types 
(case classes) are supported by importing spark.implicits._  Support for 
serializing other types will be added in future releases.
val words = lines.map(line => {

Error:(45, 26) not enough arguments for method map: (implicit evidence$6: 
org.apache.spark.sql.Encoder[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema])org.apache.spark.sql.Dataset[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema].
Unspecified value parameter evidence$6.
val words = lines.map(line => {



2019-09-17


lk_spark 

Re: how can I dynamic parse json in kafka when using Structured Streaming

2019-09-17 Thread Tathagata Das
You can use the *from_json* built-in SQL function to parse JSON.
https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html#from_json-org.apache.spark.sql.Column-org.apache.spark.sql.Column-
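
For example, a minimal sketch against the `messages` dataframe from your code, with a hypothetical two-field schema (from_json needs the schema declared up front):

```
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

// Placeholder schema: replace with the real fields of your JSON records.
val schema = new StructType()
  .add("name", StringType)
  .add("age", IntegerType)

val parsed = messages
  .selectExpr("CAST(value AS STRING) AS value")
  .select(from_json(col("value"), schema).as("data"))
  .select("data.*")
```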

On Mon, Sep 16, 2019 at 7:39 PM lk_spark  wrote:

> Hi all,
> I'm using Structured Streaming to read from Kafka; the data type is a JSON
> string. I want to parse it and convert it to a dataframe, but my code
> doesn't compile and I don't know how to fix it:
>
>
> val lines = messages.selectExpr("CAST(value AS STRING) as value").as[
> String]
>
> val words = lines.map(line => {
>   var json: JValue = null
>   try {
> json = parse(line)
>   } catch {
> case ex: Exception => { println(ex.getMessage + " " + line) }
>   }
>   //var result: scala.collection.mutable.Map[String,String] =
> scala.collection.mutable.Map()
>   val jsonObj = json.values.asInstanceOf[Map[String, _]]
>   val valuse = jsonObj.values.toArray
>   val schema = StructType(List())
>   for ((k, v) <- jsonObj){
> //result += (k -> jsonObj.get(k).toString())
>
> if(v.isInstanceOf[String]){
>   schema.add(k,StringType)
> }else if (v.isInstanceOf[Int]){
>   schema.add(k,IntegerType)
> }/*else if (v.isInstanceOf[Array[String]]){
>   schema.add(k,ArrayType(StringType))
> }else if (v.isInstanceOf[Map[String,String]]){
>   schema.add(k,MapType(StringType,StringType))
> }*/
>   }
>   val row =  new GenericRowWithSchema(valuse,schema)
>   row
> })
>
>
> Error:(45, 26) Unable to find encoder for type
> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema. An implicit
> Encoder[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema] is
> needed to store
> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema instances in
> a Dataset. Primitive types (Int, String, etc) and Product types (case
> classes) are supported by importing spark.implicits._  Support for
> serializing other types will be added in future releases.
> val words = lines.map(line => {
>
> Error:(45, 26) not enough arguments for method map: (implicit evidence$6:
> org.apache.spark.sql.Encoder[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema])org.apache.spark.sql.Dataset[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema].
> Unspecified value parameter evidence$6.
> val words = lines.map(line => {
>
>
>
> 2019-09-17
> --
> lk_spark
>