Re: How to reissue a delegated token after max lifetime passes for a spark streaming application on a Kerberized cluster

2019-01-07 Thread Ali Nazemian
The submit command:

spark-submit \
 --master yarn \
 --deploy-mode cluster \
 --conf
"spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas.conf
-Dlog4j.configuration=xxx -Djava.util.Arrays.useLegacyMergeSort=true" \
 --conf
"spark.driver.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas.conf
-Dlog4j.configuration=xxx -Djava.util.Arrays.useLegacyMergeSort=true" \
 --conf spark.ui.port=18086 \
 --conf spark.executor.memory=${executor_memory} \
 --conf spark.executor.instances=${num_executors} \
 --conf spark.executor.cores=${executor_cores} \
 --conf spark.driver.memory=4g \
 --conf spark.driver.maxResultSize=3g \
 --conf spark.kafka.broker.ingest=xxx \
 --conf spark.kafka.zookeeper.ingest=xxx \
 --conf spark.kafka.broker.egest=xxx \
 --conf spark.kafka.topic.input=xxx \
 --conf spark.kafka.topic.output=xxx \
 --conf spark.kafka.input.interval=10 \
 --conf spark.kafka.group=xxx \
 --conf spark.streaming.kafka.maxRetries=10 \
 --conf spark.kafka.security.protocol.ingress=SASL_PLAINTEXT \
 --conf spark.kafka.security.protocol.egress=SASL_PLAINTEXT \
 --conf spark.fetch.message.max.bytes=104857600 \
 --conf spark.hive.enable.stats=true \
 --conf spark.streaming.backpressure.enabled=true \
 --conf spark.streaming.kafka.maxRatePerPartition=1 \
 --conf spark.streaming.receiver.maxRate=10 \
 --conf spark.executor.heartbeatInterval=120s \
 --conf spark.network.timeout=600s \
 --conf spark.yarn.scheduler.heartbeat.interval-ms=1000 \
 --conf spark.sql.parquet.compression.codec=snappy \
 --conf spark.scheduler.minRegisteredResourcesRatio=1 \
 --conf spark.yarn.maxAppAttempts=10 \
 --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
 --conf spark.yarn.max.executor.failures=$((8 * ${num_executors}))
`# Increase max executor failures (Default: max(numExecutors * 2, 3))` \
 --conf spark.yarn.executor.failuresValidityInterval=1h \
 --conf spark.task.maxFailures=8 \
 --conf spark.yarn.submit.waitAppCompletion=false \
 --conf spark.yarn.principal=xxx \
 --conf spark.yarn.keytab=xxx \
 --conf spark.hadoop.fs.hdfs.impl.disable.cache=true \
 --queue default \
 ${APP_HOME}/xxx.jar

The stack trace:

WARN Client: Exception encountered while connecting to the server :
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.Secret
Manager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 155456 for spark)
can't be found in cache Exception in thread "main"
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
token (HDFS_DELEGATION_TOKEN token 1 55456 for spark) can't be found in
cache at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1554) at
org.apache.hadoop.ipc.Client.call(Client.java:1498) at
org.apache.hadoop.ipc.Client.call(Client.java:1398) at
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
at com.sun.proxy.$Proxy10.getFileInfo(Unknown Source) at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:818)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498) at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:291)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:203)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:185)
at com.sun.proxy.$Proxy11.getFileInfo(Unknown Source) at
org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2165) at
org.apache.hadoop.hdfs.DistributedFileSystem$26.doCall(DistributedFileSystem.java:1442)
at
org.apache.hadoop.hdfs.DistributedFileSystem$26.doCall(DistributedFileSystem.java:1438)
at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at
org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1438)
at
org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$6.apply(ApplicationMaster.scala:160)
at
org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$6.apply(ApplicationMaster.scala:157)
at scala.Option.foreach(Option.scala:257) at
org.apache.spark.deploy.yarn.ApplicationMaster.(ApplicationMaster.scala:157)
at
org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:765)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:67)
at
org.apache.spark.deploy.SparkHadoopUtil$$an

[Spark-ml]Error in training ML models: Missing an output location for shuffle xxx

2019-01-07 Thread Pola Yao
Hi Spark Comminuty,

I was using XGBoost-spark to train a machine learning model. The dataset
was not large (around 1G). And I used the following command to submit my
application:
'''

./bin/spark-submit --master yarn --deploy-mode client --num-executors 50
--executor-cores 2 --executor-memory 3g --driver-memory 8g --conf
spark.executor.memoryOverhead=2g --conf spark.network.timeout=2000s --class
XXX --jars /path/to/jars /path/to/application
'''

And got the following errors:
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
location for shuffle 58

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an
output location for shuffle 58
at 
org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:867)
at 
org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:863)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at 
org.apache.spark.MapOutputTracker$.convertMapStatuses(MapOutputTracker.scala:863)
at 
org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:677)
at 
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at 
org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:100)
at 
org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:99)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
at 
scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
at 
ml.dmlc.xgboost4j.java.DataBatch$BatchIterator.hasNext(DataBatch.java:47)
at ml.dmlc.xgboost4j.java.XGBoostJNI.XGDMatrixCreateFromDataIter(Native 
Method)
at ml.dmlc.xgboost4j.java.DMatrix.(DMatrix.java:53)
at ml.dmlc.xgboost4j.scala.DMatrix.(DMatrix.scala:42)
at 
ml.dmlc.xgboost4j.scala.spark.Watches$.buildWatches(XGBoost.scala:436)
at 
ml.dmlc.xgboost4j.scala.spark.XGBoost$$anonfun$trainDistributed$4$$anonfun$12.apply(XGBoost.scala:276)
at 
ml.dmlc.xgboost4j.scala.spark.XGBoost$$anonfun$trainDistributed$4$$anonfun$12.apply(XGBoost.scala:275)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1092)
at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
at 
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:809)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

The error was occurred at foreachPartition at XGBoost.scala:287


Did anybody know what caused the error? Was it a memory issue?

Thanks!


Re: Re: Can an UDF return a custom class other than case class?

2019-01-07 Thread Muthu Jayakumar
Perhaps use of generic StructType may work in your situation of being
language agnostic? case-classes are backed by implicits to provide type
conversions into columnar.
My 2 cents.

Thanks,
Mutu


On Mon, Jan 7, 2019 at 4:13 AM yeikel valdes  wrote:

>
>
>  Forwarded Message 
> From : em...@yeikel.com
> To : kfehl...@gmail.com
> Date : Mon, 07 Jan 2019 04:11:22 -0800
> Subject : Re: Can an UDF return a custom class other than case class?
>
>
> In this case I am just curious because I'd like to know if it is possible.
>
> At the same time I will be interacting with external Java class files if
> that's allowed.
>
> Also, what are the equivalents for other languages like Java? I am not
> aware of anything similar to the case class in Java.
>
> I am currently using Scala but I might use PySpark or the Java apis in the
> future.
>
> Thank you
>
>  On Sun, 06 Jan 2019 22:06:28 -0800 * kfehl...@gmail.com
>  * wrote 
>
> Is there a reason why case classes won't work for your use case?
>
> On Sun, Jan 6, 2019 at 10:43 PM  wrote:
>
> Hi ,
>
>
>
> Is it possible to return a custom class from an UDF other than a case
> class?
>
>
>
> If so , how can we avoid this exception ? :
> java.lang.UnsupportedOperationException: Schema for type {custom type} is
> not supported
>
>
>
> Full Example :
>
>
>
> import spark.implicits._
>
> import org.apache.spark.sql.functions.udf
>
>
>
> class Person (val name : String)
>
>
>
> val toPerson = (s1 : String) => new Person(s1)
>
>
>
> val dataset = Seq("John Smith").toDF("name")
>
>
>
> val personUDF = udf(toPerson)
>
>
>
> java.lang.UnsupportedOperationException: Schema for type Person is not
> supported
>
>   at
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:780)
>
>   at
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:715)
>
>   at
> scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
>
>   at
> org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
>
>   at
> org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
>
>   at
> org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:714)
>
>   at
> org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:711)
>
>   at org.apache.spark.sql.functions$.udf(functions.scala:3340)
>
>
>
> dataset.withColumn("person", personUDF($"name"))
>
>
>
>
>
> Thank you.
>
>
>
>


Re:Parquet file number of columns

2019-01-07 Thread yeikel valdes
Not according to Parquet dev group

https://groups.google.com/forum/m/#!topic/parquet-dev/jj7TWPIUlYI

 On Mon, 07 Jan 2019 05:11:51 -0800 gourav.sengu...@gmail.com wrote 

Hi,

Is there any limit to the number of columns that we can have in Parquet file 
format? 


Thanks and Regards,
Gourav Sengupta

Parquet file number of columns

2019-01-07 Thread Gourav Sengupta
Hi,

Is there any limit to the number of columns that we can have in Parquet
file format?


Thanks and Regards,
Gourav Sengupta


RE: RE: Re: Spark Kinesis Connector SSL issue

2019-01-07 Thread Shashikant Bangera
Hi,

The issue is that the KCL inside the Spark Streaming connector does not provide 
a way to pass KCL configuration in, which means we can’t supply configuration 
to disable SSL cert checks. In a typical (non-Spark Streaming) KCL app, we can 
instantiate the KCL via:

   Worker worker = new Worker.Builder()
.config(kclConfig) , //we can specify to disable SSL certs here.
.kinesisClient(kinesisClient)
.recordProcessorFactory(processorFactory)
.build();

However with the Kinesis Spark Streaming consumer, we do not have the ability 
to set this. Our only interface with Kinesis is:

KinesisUtils.createStream(context, appName, streamName, serviceEndpoint, 
regionName,
InitialPositionInStream.TRIM_HORIZON, checkpoint, 
StorageLevel.MEMORY_AND_DISK_2())

and so we can’t pass config anywhere for the KCL to read.

Regards,

Ben


Shashikant Bangera | DevOps Engineer
Payment Services DevOps Engineering
Email: shashikantbang...@discover.com
Group email: eppdev...@discover.com
Tel: +44 (0)
Mob: +44 (0) 7440783885


From: Ben Watson
Sent: 07 January 2019 12:32
To: yeikel valdes ; Shashikant Bangera 

Cc: user@spark.apache.org
Subject: RE: [EXTERNAL] RE: Re: Spark Kinesis Connector SSL issue

Hi,

The issue is that the KCL inside the Spark Streaming connector does not provide 
a way to pass KCL configuration in, which means we can’t supply configuration 
to disable SSL cert checks. In a typical (non-Spark Streaming) KCL app, we can 
instantiate the KCL via:

   Worker worker = new Worker.Builder()
.config(kclConfig) , //we can specify to disable SSL certs here.
.kinesisClient(kinesisClient)
.recordProcessorFactory(processorFactory)
.build();

However with the Kinesis Spark Streaming consumer, we do not have the ability 
to set this. Our only interface with Kinesis is:

KinesisUtils.createStream(context, appName, streamName, serviceEndpoint, 
regionName,
InitialPositionInStream.TRIM_HORIZON, checkpoint, 
StorageLevel.MEMORY_AND_DISK_2())

and so we can’t pass config anywhere for the KCL to read.

Regards,

Ben

From: yeikel valdes [mailto:em...@yeikel.com]
Sent: 07 January 2019 12:21
To: Shashikant Bangera 
Cc: user@spark.apache.org; Ben Watson 
Subject: [EXTERNAL] RE: Re: Spark Kinesis Connector SSL issue

CAUTION EXTERNAL EMAIL
DO NOT open attachments or click on links from unknown senders or unexpected 
emails.


Any chance you can share a minimum example to replicate the issue?


 On Mon, 07 Jan 2019 04:17:44 -0800 shashikantbang...@discover.com wrote 

Hi Valdes,

Thank you for your response, to answer to your question. yes I can

@ben : correct me if I am wrong.

Cheers,
Shashi

Shashikant Bangera | DevOps Engineer
Payment Services DevOps Engineering
Email: shashikantbang...@discover.com
Group email: eppdev...@discover.com
Tel: +44 (0)
Mob: +44 (0) 7440783885


From: yeikel valdes [mailto:em...@yeikel.com]
Sent: 07 January 2019 12:15
To: Shashikant Bangera 
mailto:shashikantbang...@discover.com>>
Cc: user@spark.apache.org
Subject: [EXTERNAL] Re: Spark Kinesis Connector SSL issue

CAUTION EXTERNAL EMAIL
DO NOT open attachments or click on links from unknown senders or unexpected 
emails.


Can you call this service with regular code(No Spark)?


 On Mon, 07 Jan 2019 02:42:48 -0800 
shashikantbang...@discover.com wrote 
Hi team,

please help , we are kind of blocked here.

Cheers,
Shashi



--
Sent from: 
http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: 
user-unsubscr...@spark.apache.org




Re: Re:Writing RDDs to HDFS is empty

2019-01-07 Thread yeikel valdes
Ideally...we would like to copy paste and try in our end. A screenshot is not 
enough.

If you have private information just remove and create a minimum example we can 
use to replicate the issue.
I'd say similar to this :

https://stackoverflow.com/help/mcve

 On Mon, 07 Jan 2019 04:15:16 -0800 fyyleej...@163.com wrote 

Sorry,the code is too long,it is simple to say 
look at the photo 

 

i define a arrayBuffer ,there are "1 2", '' 2 3" ," 4 5" in it ,I want to 
save in hdfs ,so i make it to RDD, 
sc. pallelize(arraybuffeer) 
but when in idea,i use println(_),the value is right,but in distributed 
there is nothing 



-- 
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/ 

- 
To unsubscribe e-mail: user-unsubscr...@spark.apache.org 



RE: Re: Spark Kinesis Connector SSL issue

2019-01-07 Thread yeikel valdes
Any chance you can share a minimum example to replicate the issue?

 On Mon, 07 Jan 2019 04:17:44 -0800 shashikantbang...@discover.com wrote 


Hi Valdes,

 

Thank you for your response, to answer to your question. yes I can

 

@ben : correct me if I am wrong.

 

Cheers,

Shashi

 

Shashikant Bangera | DevOps Engineer

Payment Services DevOps Engineering

Email: shashikantbang...@discover.com

Group email: eppdev...@discover.com

Tel: +44 (0)

Mob: +44 (0) 7440783885

 

 

From: yeikel valdes [mailto:em...@yeikel.com] 
Sent: 07 January 2019 12:15
To: Shashikant Bangera 
Cc: user@spark.apache.org
Subject: [EXTERNAL] Re: Spark Kinesis Connector SSL issue

 

CAUTION EXTERNAL EMAIL 
DO NOT open attachments or click on links from unknown senders or unexpected 
emails.

 

Can you call this service with regular code(No Spark)?

 


 On Mon, 07 Jan 2019 02:42:48 -0800 shashikantbang...@discover.com wrote 


Hi team, 

please help , we are kind of blocked here. 

Cheers, 
Shashi 



-- 
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/ 

- 
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

 

RE: Re: Spark Kinesis Connector SSL issue

2019-01-07 Thread Shashikant Bangera
Hi Valdes,

Thank you for your response, to answer to your question. yes I can

@ben : correct me if I am wrong.

Cheers,
Shashi

Shashikant Bangera | DevOps Engineer
Payment Services DevOps Engineering
Email: shashikantbang...@discover.com
Group email: eppdev...@discover.com
Tel: +44 (0)
Mob: +44 (0) 7440783885


From: yeikel valdes [mailto:em...@yeikel.com]
Sent: 07 January 2019 12:15
To: Shashikant Bangera 
Cc: user@spark.apache.org
Subject: [EXTERNAL] Re: Spark Kinesis Connector SSL issue

CAUTION EXTERNAL EMAIL
DO NOT open attachments or click on links from unknown senders or unexpected 
emails.


Can you call this service with regular code(No Spark)?


 On Mon, 07 Jan 2019 02:42:48 -0800 shashikantbang...@discover.com wrote 

Hi team,

please help , we are kind of blocked here.

Cheers,
Shashi



--
Sent from: 
http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: 
user-unsubscr...@spark.apache.org



Re: Re:Writing RDDs to HDFS is empty

2019-01-07 Thread Jian Lee
Sorry,the code is too long,it is simple to say 
look at the photo

 

i define a arrayBuffer ,there are "1 2",  '' 2 3" ," 4 5" in it ,I want to
save in hdfs ,so i make it to RDD,
sc. pallelize(arraybuffeer)
but when in idea,i use println(_),the value is right,but in distributed
there is nothing 



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Kinesis Connector SSL issue

2019-01-07 Thread yeikel valdes
Can you call this service with regular code(No Spark)?

 On Mon, 07 Jan 2019 02:42:48 -0800 shashikantbang...@discover.com wrote 


Hi team, 

please help , we are kind of blocked here. 

Cheers, 
Shashi 



-- 
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/ 

- 
To unsubscribe e-mail: user-unsubscr...@spark.apache.org 



Fwd:Re: Can an UDF return a custom class other than case class?

2019-01-07 Thread yeikel valdes


 Forwarded Message 
>From : em...@yeikel.com
To : kfehl...@gmail.com
Date : Mon, 07 Jan 2019 04:11:22 -0800
Subject : Re: Can an UDF return a custom class other than case class?


In this case I am just curious because I'd like to know if it is possible. 

At the same time I will be interacting with external Java class files if that's 
allowed.

Also, what are the equivalents for other languages like Java? I am not aware of 
anything similar to the case class in Java.

I am currently using Scala but I might use PySpark or the Java apis in the 
future.

Thank you

 On Sun, 06 Jan 2019 22:06:28 -0800 kfehl...@gmail.com wrote 

Is there a reason why case classes won't work for your use case?

On Sun, Jan 6, 2019 at 10:43 PM  wrote:
Hi ,

 

Is it possible to return a custom class from an UDF other than a case class?

 

If so , how can we avoid this exception ? : 
java.lang.UnsupportedOperationException: Schema for type {custom type} is not 
supported

 

Full Example :

 

import spark.implicits._

import org.apache.spark.sql.functions.udf

 

class Person (val name : String)

 

val toPerson = (s1 : String) => new Person(s1)

 

val dataset = Seq("John Smith").toDF("name")

 

val personUDF = udf(toPerson)

 

java.lang.UnsupportedOperationException: Schema for type Person is not supported

  at 
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:780)

  at 
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:715)

  at 
scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)

  at 
org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)

  at 
org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)

  at 
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:714)

  at 
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:711)

  at org.apache.spark.sql.functions$.udf(functions.scala:3340)

 

dataset.withColumn("person", personUDF($"name"))

 

 

Thank you.




Re:Writing RDDs to HDFS is empty

2019-01-07 Thread yeikel valdes
Please share a minimum amount of code to try reproduce the issue...

 On Mon, 07 Jan 2019 00:46:42 -0800 fyyleej...@163.com wrote 

Hi all, 
In my experiment program,I used spark Graphx, 
when running on the Idea in windows,the result is right, 
but when runing on the linux distributed cluster,the result in hdfs is 
empty, 
why?how to solve? 

 

Thanks! 
Jian Li 



-- 
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/ 

- 
To unsubscribe e-mail: user-unsubscr...@spark.apache.org 



Re: Spark Kinesis Connector SSL issue

2019-01-07 Thread shzshi
Hi team, 

please help , we are kind of blocked here. 

Cheers,
Shashi



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Writing RDDs to HDFS is empty

2019-01-07 Thread Jian Lee
Hi all,
In  my experiment program,I used spark Graphx,
when running on the Idea in windows,the result is right,
but when runing  on the linux distributed cluster,the result in hdfs is
empty,
why?how to solve?

 

Thanks!
Jian Li



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org