Re: intermittent Kryo serialization failures in Spark
Pre-register your classes:

```
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(Class.forName("[[B")) // byte[][]
    kryo.register(classOf[java.lang.Class[_]])
  }
}
```

then run with:

'spark.kryo.referenceTracking': 'false',
'spark.kryo.registrationRequired': 'false',
'spark.kryo.registrator': 'com.datadog.spark.MyKryoRegistrator',
'spark.kryo.unsafe': 'false',
'spark.kryoserializer.buffer.max': '256m'

On Tue, Sep 17, 2019 at 10:38 AM Jerry Vinokurov wrote:
> Hi folks,
>
> Posted this some time ago but the problem continues to bedevil us. I'm
> including a (slightly edited) stack trace that results from this error. If
> anyone can shed any light on what exactly is happening here and what we can
> do to avoid it, that would be much appreciated.
>
> org.apache.spark.SparkException: Failed to register classes with Kryo
>> [stack trace snipped; it appears in full in the original message below]
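For reference, here is a sketch of how the settings listed above could be applied programmatically when building the session (they can equally be passed as --conf options to spark-submit); the property values and the registrator class name are the ones from the reply, while the explicit spark.serializer line is an assumption about how Kryo is enabled in this job:

```
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Wire the Kryo settings from the reply above into a SparkConf.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // assumes Kryo is enabled this way
  .set("spark.kryo.registrator", "com.datadog.spark.MyKryoRegistrator")
  .set("spark.kryo.referenceTracking", "false")
  .set("spark.kryo.registrationRequired", "false")
  .set("spark.kryo.unsafe", "false")
  .set("spark.kryoserializer.buffer.max", "256m")

val spark = SparkSession.builder().config(conf).getOrCreate()
```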
Re: custom rdd - do I need a hadoop input format?
You can do it with a custom RDD implementation. You will mainly implement "getPartitions" - the logic to split your input into partitions - and "compute" - to compute and return the values from the executors. On Tue, 17 Sep 2019 at 08:47, Marcelo Valle wrote: > Just to be more clear about my requirements, what I have is actually a > custom format, with header, summary and multi-line blocks. I want to create > tasks per block and not per line. I already have a library that reads an > InputStream and outputs an Iterator of Block, but now I need to integrate > this with Spark. > > On Tue, 17 Sep 2019 at 16:28, Marcelo Valle > wrote: > >> Hi, >> >> I want to create a custom RDD which will read n lines in sequence from a >> file, which I call a block, and each block should be converted to a Spark >> dataframe to be processed in parallel. >> >> Question - do I have to implement a custom Hadoop input format to achieve >> this? Or is it possible to do it only with RDD APIs? >> >> Thanks, >> Marcelo. >
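To make that concrete, here is a minimal sketch of such a custom RDD; BlockPartition, blockOffsets, and readBlock are illustrative stand-ins for the block-reading logic, not part of any existing library:

```
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Illustrative partition type: each partition covers one block (byte range) of the file.
case class BlockPartition(index: Int, start: Long, end: Long) extends Partition

// Hypothetical custom RDD that turns each block of the input into one partition/task.
class BlockRDD(sc: SparkContext, path: String, blockOffsets: Array[(Long, Long)])
  extends RDD[String](sc, Nil) {

  // Runs on the driver: split the input into one partition per block.
  override protected def getPartitions: Array[Partition] =
    blockOffsets.zipWithIndex.map { case ((start, end), i) =>
      BlockPartition(i, start, end): Partition
    }

  // Runs on the executors: read and return the records of this block.
  override def compute(split: Partition, context: TaskContext): Iterator[String] = {
    val p = split.asInstanceOf[BlockPartition]
    readBlock(path, p.start, p.end)
  }

  // Placeholder for the existing block-reading library: open the file, seek to
  // `start`, and iterate records until `end`.
  private def readBlock(path: String, start: Long, end: Long): Iterator[String] =
    Iterator.empty
}
```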
Re: custom rdd - do I need a hadoop input format?
Just to be more clear about my requirements, what I have is actually a custom format, with header, summary and multi-line blocks. I want to create tasks per block and not per line. I already have a library that reads an InputStream and outputs an Iterator of Block, but now I need to integrate this with Spark. On Tue, 17 Sep 2019 at 16:28, Marcelo Valle wrote: > Hi, > > I want to create a custom RDD which will read n lines in sequence from a > file, which I call a block, and each block should be converted to a Spark > dataframe to be processed in parallel. > > Question - do I have to implement a custom Hadoop input format to achieve > this? Or is it possible to do it only with RDD APIs? > > Thanks, > Marcelo.
custom rdd - do I need a hadoop input format?
Hi, I want to create a custom RDD which will read n lines in sequence from a file, which I call a block, and each block should be converted to a Spark dataframe to be processed in parallel. Question - do I have to implement a custom Hadoop input format to achieve this? Or is it possible to do it only with RDD APIs? Thanks, Marcelo.
Can I set the Alluxio WriteType in Spark applications?
Hi, If Spark applications write data into Alluxio, can the WriteType be configured? Thanks, Mark
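One approach worth trying is to pass the Alluxio client property through Spark's Hadoop configuration. In the sketch below, the property name alluxio.user.file.writetype.default, the value CACHE_THROUGH, and the paths are assumptions based on the Alluxio client documentation, not something confirmed in this thread:

```
import org.apache.spark.sql.SparkSession

// Sketch under assumptions: spark.hadoop.* settings are forwarded to the Hadoop
// configuration that the Alluxio client library reads, so the write type can be set per job.
val spark = SparkSession.builder()
  .appName("alluxio-writetype-sketch")
  .config("spark.hadoop.alluxio.user.file.writetype.default", "CACHE_THROUGH") // assumed property/value
  .getOrCreate()

// Writes to an alluxio:// path from this session should then use that write type.
spark.range(100).write.parquet("alluxio://alluxio-master:19998/tmp/writetype-example") // placeholder URI
```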
Re: intermittent Kryo serialization failures in Spark
Hi folks, Posted this some time ago but the problem continues to bedevil us. I'm including a (slightly edited) stack trace that results from this error. If anyone can shed any light on what exactly is happening here and what we can do to avoid it, that would be much appreciated. org.apache.spark.SparkException: Failed to register classes with Kryo > at > org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:140) > at > org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:324) > at > org.apache.spark.serializer.KryoSerializerInstance.(KryoSerializer.scala:309) > at > org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:218) > at > org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:288) > at > org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127) > at > org.apache.spark.broadcast.TorrentBroadcast.(TorrentBroadcast.scala:88) > at > org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34) > at > org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62) > at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1489) > at > org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:103) > at > org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129) > at > org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165) > at > org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:309) > at > org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:305) > at > org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:327) > at > org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121) > at > org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41) > at > org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) > at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) > at > org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92) > at > org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128) > at > org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119) > at > org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52) > at > org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156) > at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) > at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) > at > org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391) > at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121) > at > org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) > at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) > at >
How to Integrate Spark mllib Streaming Training Models To Spark Structured Streaming
The Spark MLlib streaming training models work with DStreams. So is there any way to use them with Spark Structured Streaming?
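For context, here is a minimal sketch of the DStream-based API in question, using StreamingKMeans as an example (the input source, batch interval, and model parameters are illustrative):

```
import org.apache.spark.SparkConf
import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.streaming.{Seconds, StreamingContext}

// MLlib's streaming trainers consume DStreams, not streaming DataFrames.
val conf = new SparkConf().setAppName("streaming-kmeans-dstream-sketch")
val ssc = new StreamingContext(conf, Seconds(10))

// Illustrative source: comma-separated feature vectors arriving as text files.
val trainingData = ssc.textFileStream("hdfs:///path/to/training") // placeholder path
  .map(line => Vectors.dense(line.split(',').map(_.toDouble)))

val model = new StreamingKMeans()
  .setK(3)
  .setDecayFactor(1.0)
  .setRandomCenters(4, 0.0)

// trainOn expects a DStream[Vector]; there is no variant that accepts a streaming Dataset.
model.trainOn(trainingData)

ssc.start()
ssc.awaitTermination()
```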
How to integrate MLeap with Spark Structured Streaming
I am trying to integrate MLeap with Spark Structured Streaming, but I am facing a problem. Spark Structured Streaming with Kafka works with DataFrames, while MLeap requires a LeapFrame. I tried to convert the DataFrame to a LeapFrame using the MLeap Spark support library function (toSparkLeapFrame), but it fails because the conversion goes through an RDD, which triggers an action that is not allowed in Structured Streaming. If anyone knows how to use MLeap in Spark Structured Streaming, please help me solve this.
Re: Re: how can I dynamically parse json in kafka when using Structured Streaming
I want to parse the struct of the data dynamically, then write the data to Delta Lake; I think it can automatically merge the schema. 2019-09-17 lk_spark From: Tathagata Das Sent: 2019-09-17 16:13 Subject: Re: how can I dynamically parse json in kafka when using Structured Streaming To: "lk_spark" Cc: "user.spark" You can use the from_json built-in SQL function to parse json. https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html#from_json-org.apache.spark.sql.Column-org.apache.spark.sql.Column- On Mon, Sep 16, 2019 at 7:39 PM lk_spark wrote: [original message and code snipped; quoted in full in the message below]
Re: how can I dynamically parse json in kafka when using Structured Streaming
You can use *from_json* built-in SQL function to parse json. https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html#from_json-org.apache.spark.sql.Column-org.apache.spark.sql.Column- On Mon, Sep 16, 2019 at 7:39 PM lk_spark wrote: > hi,all : > I'm using Structured Streaming to read kafka , the data type is json > String , I want to parse it and conver to a datafrme , my code can't pass > compile , I don't know how to fix it: > > > val lines = messages.selectExpr("CAST(value AS STRING) as value").as[ > String] > > val words = lines.map(line => { > var json: JValue = null > try { > json = parse(line) > } catch { > case ex: Exception => { println(ex.getMessage + " " + line) } > } > //var result: scala.collection.mutable.Map[String,String] = > scala.collection.mutable.Map() > val jsonObj = json.values.asInstanceOf[Map[String, _]] > val valuse = jsonObj.values.toArray > val schema = StructType(List()) > for ((k, v) <- jsonObj){ > //result += (k -> jsonObj.get(k).toString()) > > if(v.isInstanceOf[String]){ > schema.add(k,StringType) > }else if (v.isInstanceOf[Int]){ > schema.add(k,IntegerType) > }/*else if (v.isInstanceOf[Array[String]]){ > schema.add(k,ArrayType(StringType)) > }else if (v.isInstanceOf[Map[String,String]]){ > schema.add(k,MapType(StringType,StringType)) > }*/ > } > val row = new GenericRowWithSchema(valuse,schema) > row > }) > > > Error:(45, 26) Unable to find encoder for type > org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema. An implicit > Encoder[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema] is > needed to store > org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema instances in > a Dataset. Primitive types (Int, String, etc) and Product types (case > classes) are supported by importing spark.implicits._ Support for > serializing other types will be added in future releases. > val words = lines.map(line => { > > Error:(45, 26) not enough arguments for method map: (implicit evidence$6: > org.apache.spark.sql.Encoder[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema])org.apache.spark.sql.Dataset[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema]. > Unspecified value parameter evidence$6. > val words = lines.map(line => { > > > > 2019-09-17 > -- > lk_spark >
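For reference, a minimal sketch of the from_json approach suggested above; the broker, topic, and schema are illustrative rather than taken from the original code, and from_json does need the schema to be known up front:

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val spark = SparkSession.builder().appName("from-json-sketch").getOrCreate()
import spark.implicits._

// Illustrative schema; in practice it must describe the JSON expected in the Kafka value.
val schema = StructType(Seq(
  StructField("name", StringType),
  StructField("age", IntegerType)
))

val messages = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
  .option("subscribe", "events")                       // placeholder topic
  .load()

// Parse the JSON string into typed columns instead of building rows with GenericRowWithSchema.
val parsed = messages
  .selectExpr("CAST(value AS STRING) AS value")
  .select(from_json($"value", schema).as("data"))
  .select("data.*")
```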