Spark streaming filling the disk with logs

2019-02-13 Thread Deepak Sharma
Hi All
I am running a Spark Streaming job with the configuration below:

--conf "spark.executor.extraJavaOptions=-Droot.logger=WARN,console"

But it is still filling the disk with INFO logs.
If the logging level is set to WARN at the cluster level, then only WARN
logs are written, but that affects all jobs.

Is there any way to get rid of INFO-level logging at the level of a single
Spark Streaming job?
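
One per-job option worth noting (a minimal sketch, not a complete fix): the
driver-side log level can be lowered from the application itself, while
executor-side logging is typically controlled by shipping a custom
log4j.properties via --files and pointing spark.executor.extraJavaOptions at it.
Assuming a SparkSession named spark:

// Sketch: lower driver-side logging for this job only
spark.sparkContext.setLogLevel("WARN")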

Thanks
Deepak

-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


Re: "where" clause able to access fields not in its schema

2019-02-13 Thread Yeikel
It seems that we are using the function incorrectly. 


val a = Seq((1,10),(2,20)).toDF("foo","bar")

val b = a.select($"foo")

val c = b.where(b("bar") === 20)

c.show


  Exception in thread "main" org.apache.spark.sql.AnalysisException: Cannot
resolve column name "bar" among   (foo);









Re: Pyspark elementwise matrix multiplication

2019-02-13 Thread Yeikel
Elementwise product is described here : 

https://spark.apache.org/docs/latest/mllib-feature-extraction.html#elementwiseproduct

I don't know if it will work with your input, though.
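
For reference, a minimal sketch based on the MLlib page above (RDD-based API);
the vectors here are just placeholders:

import org.apache.spark.mllib.feature.ElementwiseProduct
import org.apache.spark.mllib.linalg.Vectors

val scalingVec  = Vectors.dense(0.0, 1.0, 2.0)
val transformer = new ElementwiseProduct(scalingVec)

// element-by-element multiplication of the input vector with the scaling vector
val transformed = transformer.transform(Vectors.dense(1.0, 2.0, 3.0))
// transformed: [0.0, 2.0, 6.0]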








Re: SparkR + binary type + how to get value

2019-02-13 Thread Felix Cheung
Please share your code



From: Thijs Haarhuis 
Sent: Wednesday, February 13, 2019 6:09 AM
To: user@spark.apache.org
Subject: SparkR + binary type + how to get value

Hi all,

Does anybody have any experience in accessing the data from a column which has 
a binary type in a Spark Data Frame in R?
I have a Spark Data Frame which has a column which is of a binary type. I want 
to access this data and process it.
In my case I collect the Spark data frame to an R data frame and access the
first row.
When I print this row to the console it does print all the hex values correctly.

However, when I access the column, it prints that it is a list of 1. When I
print the type of the child element, it again prints that it is a list.
I expected this value to be of a raw type.

Does anybody have experience with this?

Thanks
Thijs



Re: Got fatal error when running spark 2.4.0 on k8s

2019-02-13 Thread dawn breaks
It seems that the fabric8 Kubernetes client can't parse the caCertFile in the
default location /var/run/secrets/kubernetes.io/serviceaccount/ca.crt. Can
anybody give me some advice?

On Wed, 13 Feb 2019 at 16:21, dawn breaks <2005dawnbre...@gmail.com> wrote:

> We submit a Spark job to k8s with the following command, and the driver pod
> gets an error and exits. Can anybody help us solve it?
>
>  ./bin/spark-submit \
> --master k8s://https://172.21.91.48:6443 \
> --deploy-mode cluster \
> --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
> --name spark-pi \
> --class org.apache.spark.examples.SparkPi \
> --conf spark.executor.instances=1 \
> --conf spark.kubernetes.container.image=xxxRepo/spark:v2.4.0 \
> local:///opt/spark/examples/jars/spark-examples*.jar \
> 5
>
>
> The detailed error info is as follows:
>
> 2019-02-13 07:13:06 ERROR SparkContext:91 - Error initializing
> SparkContext.
> org.apache.spark.SparkException: External scheduler cannot be instantiated
> at
> org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2794)
> at org.apache.spark.SparkContext.(SparkContext.scala:493)
> at
> org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2520)
> at
> org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:935)
> at
> org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:926)
> at scala.Option.getOrElse(Option.scala:121)
> at
> org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:926)
> at org.apache.spark.examples.SparkPi$.main(SparkPi.scala:31)
> at org.apache.spark.examples.SparkPi.main(SparkPi.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
> at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
> at
> org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
> at
> org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
> at
> org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
> at
> org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: io.fabric8.kubernetes.client.KubernetesClientException: An
> error has occurred.
> at
> io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:62)
> at
> io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:53)
> at
> io.fabric8.kubernetes.client.utils.HttpClientUtils.createHttpClient(HttpClientUtils.java:167)
> at
> org.apache.spark.deploy.k8s.SparkKubernetesClientFactory$.createKubernetesClient(SparkKubernetesClientFactory.scala:84)
> at
> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager.createSchedulerBackend(KubernetesClusterManager.scala:64)
> at
> org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2788)
> ... 20 more
> Caused by: java.security.cert.CertificateException: Could not parse
> certificate: java.io.IOException: Empty input
> at
> sun.security.provider.X509Factory.engineGenerateCertificate(X509Factory.java:110)
> at
> java.security.cert.CertificateFactory.generateCertificate(CertificateFactory.java:339)
> at
> io.fabric8.kubernetes.client.internal.CertUtils.createTrustStore(CertUtils.java:93)
> at
> io.fabric8.kubernetes.client.internal.CertUtils.createTrustStore(CertUtils.java:71)
> at
> io.fabric8.kubernetes.client.internal.SSLUtils.trustManagers(SSLUtils.java:114)
> at
> io.fabric8.kubernetes.client.internal.SSLUtils.trustManagers(SSLUtils.java:93)
> at
> io.fabric8.kubernetes.client.utils.HttpClientUtils.createHttpClient(HttpClientUtils.java:63)
> ... 23 more
> Caused by: java.io.IOException: Empty input
> at
> sun.security.provider.X509Factory.engineGenerateCertificate(X509Factory.java:106)
> ... 29 more
>


Re: "where" clause able to access fields not in its schema

2019-02-13 Thread Vadim Semenov
Yeah, the filter gets placed in front of the select after analysis:

scala> b.where($"bar" === 20).explain(true)
== Parsed Logical Plan ==
'Filter ('bar = 20)
+- AnalysisBarrier
  +- Project [foo#6]
 +- Project [_1#3 AS foo#6, _2#4 AS bar#7]
+- SerializeFromObject [assertnotnull(assertnotnull(input[0,
scala.Tuple2, true]))._1 AS _1#3, assertnotnull(assertnotnull(input[0,
scala.Tuple2, true]))._2 AS _2#4]
   +- ExternalRDD [obj#2]

== Analyzed Logical Plan ==
foo: int
Project [foo#6]
+- Filter (bar#7 = 20)
   +- Project [foo#6, bar#7]
  +- Project [_1#3 AS foo#6, _2#4 AS bar#7]
 +- SerializeFromObject [assertnotnull(assertnotnull(input[0,
scala.Tuple2, true]))._1 AS _1#3, assertnotnull(assertnotnull(input[0,
scala.Tuple2, true]))._2 AS _2#4]
+- ExternalRDD [obj#2]

== Optimized Logical Plan ==
Project [_1#3 AS foo#6]
+- Filter (_2#4 = 20)
   +- SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1
AS _1#3, assertnotnull(input[0, scala.Tuple2, true])._2 AS _2#4]
  +- ExternalRDD [obj#2]

== Physical Plan ==
*(1) Project [_1#3 AS foo#6]
+- *(1) Filter (_2#4 = 20)
   +- *(1) SerializeFromObject [assertnotnull(input[0, scala.Tuple2,
true])._1 AS _1#3, assertnotnull(input[0, scala.Tuple2, true])._2 AS _2#4]
  +- Scan ExternalRDDScan[obj#2]

On Wed, Feb 13, 2019 at 8:04 PM Yeikel  wrote:

> This is indeed strange. To add to the question, I can see that if I use a
> filter I get an exception (as expected), so I am not sure what the
> difference is between the where clause and filter:
>
>
> b.filter(s=> {
> val bar : String = s.getAs("bar")
>
> bar.equals("20")
> }).show
>
> * java.lang.IllegalArgumentException: Field "bar" does not exist.*
>
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 
Sent from my iPhone


Re: Spark with Kubernetes connecting to pod ID, not address

2019-02-13 Thread Pat Ferrel
Hmm, I’m not asking about using k8s to control Spark as a job manager or
scheduler like YARN. We use the built-in standalone Spark job manager and
spark://spark-api:7077 as the master, not k8s.

The problem is using k8s to manage a cluster consisting of our app, some
databases, and Spark (one master, one driver, several executors). Some kind of
callback from Spark is trying to use the pod ID and is failing to connect
because of that. We have tried deployMode “client” and “cluster” but get the
same error.

The full trace is below but the important bit is:

    Failed to connect to harness-64d97d6d6-6n7nh:46337

This came from deployMode = “client”, and the port is the driver port, which
should be on the launching pod. For some reason it is using a pod ID instead of
a real address. Doesn’t the driver run in the launching app’s process? The
launching app is on the pod ID harness-64d97d6d6-6n7nh but it has the k8s DNS
address of harness-api. I can see the correct address for the launching pod
with "kubectl get services".
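
One thing that might be worth trying (a sketch only, not a confirmed fix) is to
override the address the driver advertises to executors via spark.driver.host,
pointing it at the k8s service name (harness-api above) instead of the pod
hostname; the other values below are placeholders:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("harness-job")                            // placeholder name
  .master("spark://spark-api:7077")
  .config("spark.driver.host", "harness-api")        // address executors connect back to
  .config("spark.driver.bindAddress", "0.0.0.0")     // bind address inside the pod
  .getOrCreate()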


The error is:

Spark Executor Command: "/usr/lib/jvm/java-1.8-openjdk/bin/java" "-cp" 
"/spark/conf/:/spark/jars/*:/etc/hadoop/" "-Xmx1024M" 
"-Dspark.driver.port=46337" 
"org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" 
"spark://CoarseGrainedScheduler@harness-64d97d6d6-6n7nh:46337" "--executor-id" 
"138" "--hostname" "10.31.31.174" "--cores" "8" "--app-id" 
"app-20190213210105-" "--worker-url" "spark://Worker@10.31.31.174:37609"


Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1713)
at 
org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:63)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:293)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult: 
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:101)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:201)
at 
org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:64)
at 
org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:63)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
... 4 more
Caused by: java.io.IOException: Failed to connect to 
harness-64d97d6d6-6n7nh:46337
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245)
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
at 
org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:198)
at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:194)
at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:190)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.UnknownHostException: harness-64d97d6d6-6n7nh
at java.net.InetAddress.getAllByName0(InetAddress.java:1281)
at java.net.InetAddress.getAllByName(InetAddress.java:1193)
at java.net.InetAddress.getAllByName(InetAddress.java:1127)
at java.net.InetAddress.getByName(InetAddress.java:1077)
at io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:146)
at io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:143)
at java.security.AccessController.doPrivileged(Native Method)
at 
io.netty.util.internal.SocketUtils.addressByName(SocketUtils.java:143)
at 
io.netty.resolver.DefaultNameResolver.doResolve(DefaultNameResolver.java:43)
at 
io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:63)
at 
io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:55)
at 
io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:57)
at 

Re: "where" clause able to access fields not in its schema

2019-02-13 Thread Yeikel
This is indeed strange. To add to the question, I can see that if I use a
filter I get an exception (as expected), so I am not sure what the
difference is between the where clause and filter:


b.filter(s=> {
val bar : String = s.getAs("bar")

bar.equals("20")
}).show

* java.lang.IllegalArgumentException: Field "bar" does not exist.*








"where" clause able to access fields not in its schema

2019-02-13 Thread Alex Nastetsky
I don't know if this is a bug or a feature, but it's a bit counter-intuitive 
when reading code.

The "b" dataframe does not have field "bar" in its schema, but is still able to 
filter on that field.

scala> val a = sc.parallelize(Seq((1,10),(2,20))).toDF("foo","bar")
a: org.apache.spark.sql.DataFrame = [foo: int, bar: int]

scala> a.show
+---+---+
|foo|bar|
+---+---+
|  1| 10|
|  2| 20|
+---+---+

scala> val b = a.select($"foo")
b: org.apache.spark.sql.DataFrame = [foo: int]

scala> b.schema
res3: org.apache.spark.sql.types.StructType = 
StructType(StructField(foo,IntegerType,false))

scala> b.select($"bar").show
org.apache.spark.sql.AnalysisException: cannot resolve '`bar`' given input 
columns: [foo];;
[...snip...]

scala> b.where($"bar" === 20).show
+---+
|foo|
+---+
|  2|
+---+



Re: Spark2 DataFrameWriter.saveAsTable defaults to external table if path is provided

2019-02-13 Thread Chris Teoh
Thanks Peter.

I'm not sure if that is possible yet. The closest I can think of to
achieve what you want is to try something like this:
df.registerTempTable("mytable")
sql("create table mymanagedtable as select * from mytable")

I haven't used CTAS in Spark SQL before, but I have heard it works. It would
infer the schema for you, and from what I have heard, CTAS creates managed
tables.
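
For reference, a slightly expanded sketch of the same idea using the
non-deprecated API names (createOrReplaceTempView instead of registerTempTable);
the table and view names are just examples, and the Type check mirrors the one
used earlier in this thread:

numbersDF.createOrReplaceTempView("numbers_view")
spark.sql("CREATE TABLE numbers_managed USING orc AS SELECT * FROM numbers_view")
spark.sql("describe formatted numbers_managed").filter(_.get(0).toString == "Type").show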

Let me know if this works for you.

Kind Regards
Chris

On Thu, 14 Feb 2019 at 03:08 Horváth Péter Gergely <
horvath.peter.gerg...@gmail.com> wrote:

> Hi Chris,
>
> Thank you for the input, I know I can always write the table DDL manually.
>
> But here I would like to rely on Spark generating the schema. What I don't
> understand is the change in the behaviour of Spark: having the storage path
> specified does not necessarily mean it should be an external table.
>
> Is there any way to control/override this?
>
> Thanks,
> Peter
>
>
> On Wed, Feb 13, 2019, 13:09 Chris Teoh 
>> Hey there,
>>
>> Could you not just create a managed table using the DDL in Spark SQL and
>> then write the data frame to the underlying folder, or use Spark SQL to do
>> an insert?
>>
>> Alternatively try create table as select. Iirc hive creates managed
>> tables this way.
>>
>> I've not confirmed this works but I think that might be worth trying.
>>
>> I hope that helps.
>>
>> Kind regards
>> Chris
>>
>> On Wed., 13 Feb. 2019, 10:44 pm Horváth Péter Gergely, <
>> horvath.peter.gerg...@gmail.com> wrote:
>>
>>> Dear All,
>>>
>>> I am facing a strange issue with Spark 2.3, where I would like to create
>>> a MANAGED table out of the content of a DataFrame with the storage path
>>> overridden.
>>>
>>> Apparently, when one tries to create a Hive table via
>>> DataFrameWriter.saveAsTable, supplying a "path" option causes Spark to
>>> automatically create an external table.
>>>
>>> This demonstrates the behaviour:
>>>
>>> scala> val numbersDF = sc.parallelize((1 to 100).toList).toDF("numbers")
>>> numbersDF: org.apache.spark.sql.DataFrame = [numbers: int]
>>>
>>> scala> numbersDF.write.format("orc").saveAsTable("numbers_table1")
>>>
>>> scala> spark.sql("describe formatted
>>> numbers_table1").filter(_.get(0).toString == "Type").show
>>> ++-+---+
>>> |col_name|data_type|comment|
>>> ++-+---+
>>> |Type|  MANAGED|   |
>>> ++-+---+
>>>
>>>
>>> scala> numbersDF.write.format("orc").option("path",
>>> "/user/foobar/numbers_table_data").saveAsTable("numbers_table2")
>>>
>>> scala> spark.sql("describe formatted
>>> numbers_table2").filter(_.get(0).toString == "Type").show
>>> ++-+---+
>>> |col_name|data_type|comment|
>>> ++-+---+
>>> |Type| EXTERNAL|   |
>>> ++-+---+
>>>
>>>
>>>
>>> I am wondering if there is any way to force creation of a managed table
>>> with a custom path (which as far as I know, should be possible via standard
>>> Hive commands).
>>>
>>> I often seem to have the problem that I cannot find the appropriate
>>> documentation for the option configuration of Spark APIs. Could someone
>>> please point me to the right direction and tell me where these things are
>>> documented?
>>>
>>> Thanks,
>>> Peter
>>>
>>>


Stage or Tasks level logs missing

2019-02-13 Thread Nirav Patel
Currently there seem to be three places to check task-level logs:
1) Using spark UI
2) `yarn application log`
3) log aggregation  on hdfs (if enabled)

All of the above only give you logs at the executor (container) level. However,
one executor can have multiple threads, each of which might be running parts of
different stages (stg1, stg2) and, within those, different tasks (tid1,
tid2, ...). It's hard to track a particular task's activity in executor logs.

It'd be nice to:
1) mark all log entries with stageId followed by taskId
2) have a separate log file for each task (taskId)
3) have a separate log file for stage-level logs

If I missed something, let me know.
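
As a stopgap, the stage and task ids are available inside a running task via
TaskContext, so custom log lines can at least be tagged manually. A minimal
sketch (rdd is just a placeholder):

import org.apache.spark.TaskContext

rdd.mapPartitions { iter =>
  val tc  = TaskContext.get()                              // only valid inside a task
  val tag = s"[stage=${tc.stageId()} task=${tc.taskAttemptId()}]"
  iter.map { record =>
    // prefix custom output so executor logs can be grepped per stage/task
    println(s"$tag processing $record")
    record
  }
}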

Thanks,
Nirav


Re: Spark2 DataFrameWriter.saveAsTable defaults to external table if path is provided

2019-02-13 Thread Horváth Péter Gergely
Hi Chris,

Thank you for the input, I know I can always write the table DDL manually.

But here I would like to rely on Spark generating the schema. What I don't
understand is the change in the behaviour of Spark: having the storage path
specified does not necessarily mean it should be an external table.

Is there any way to control/override this?

Thanks,
Peter


On Wed, Feb 13, 2019, 13:09 Chris Teoh 
> Hey there,
>
> Could you not just create a managed table using the DDL in Spark SQL and
> then write the data frame to the underlying folder, or use Spark SQL to do
> an insert?
>
> Alternatively try create table as select. Iirc hive creates managed tables
> this way.
>
> I've not confirmed this works but I think that might be worth trying.
>
> I hope that helps.
>
> Kind regards
> Chris
>
> On Wed., 13 Feb. 2019, 10:44 pm Horváth Péter Gergely, <
> horvath.peter.gerg...@gmail.com> wrote:
>
>> Dear All,
>>
>> I am facing a strange issue with Spark 2.3, where I would like to create
>> a MANAGED table out of the content of a DataFrame with the storage path
>> overridden.
>>
>> Apparently, when one tries to create a Hive table via
>> DataFrameWriter.saveAsTable, supplying a "path" option causes Spark to
>> automatically create an external table.
>>
>> This demonstrates the behaviour:
>>
>> scala> val numbersDF = sc.parallelize((1 to 100).toList).toDF("numbers")
>> numbersDF: org.apache.spark.sql.DataFrame = [numbers: int]
>>
>> scala> numbersDF.write.format("orc").saveAsTable("numbers_table1")
>>
>> scala> spark.sql("describe formatted
>> numbers_table1").filter(_.get(0).toString == "Type").show
>> ++-+---+
>> |col_name|data_type|comment|
>> ++-+---+
>> |Type|  MANAGED|   |
>> ++-+---+
>>
>>
>> scala> numbersDF.write.format("orc").option("path",
>> "/user/foobar/numbers_table_data").saveAsTable("numbers_table2")
>>
>> scala> spark.sql("describe formatted
>> numbers_table2").filter(_.get(0).toString == "Type").show
>> ++-+---+
>> |col_name|data_type|comment|
>> ++-+---+
>> |Type| EXTERNAL|   |
>> ++-+---+
>>
>>
>>
>> I am wondering if there is any way to force creation of a managed table
>> with a custom path (which as far as I know, should be possible via standard
>> Hive commands).
>>
>> I often seem to have the problem that I cannot find the appropriate
>> documentation for the option configuration of Spark APIs. Could someone
>> please point me to the right direction and tell me where these things are
>> documented?
>>
>> Thanks,
>> Peter
>>
>>


Design recommendation

2019-02-13 Thread Kumar sp
Hello, I need a design recommendation.

I need to perform a couple of calculations with minimal shuffling and better
performance. I have a nested structure where, say, a class has n students, and
the structure is similar to this:

{ classId: String,
  StudentId: String,
  Score: Int,
  AreaCode: String }

Now I have to validate two things: that all students in a class are part of
the same area code, and whether a student is taking more than one class.
I can group by classId and count(AreaCode) to get (classId, count), and
similarly group by StudentId and count(classId) to get an aggregated structure,
and then join the two, say on studentId. But this takes multiple shuffles and
the data is huge, so I can't really use a broadcast join.
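
For reference, a sketch of the two aggregations described above (df is assumed
to be the flattened DataFrame with the fields shown; countDistinct comes from
org.apache.spark.sql.functions):

import org.apache.spark.sql.functions.countDistinct

// one row per class: how many distinct area codes its students have (should be 1)
val classAreaCodes = df.groupBy("classId").agg(countDistinct("AreaCode").as("areaCodeCount"))

// one row per student: how many distinct classes they take (> 1 flags multi-class students)
val studentClasses = df.groupBy("StudentId").agg(countDistinct("classId").as("classCount"))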

Can you please suggest a better approach?

Regards,
sk


[no subject]

2019-02-13 Thread Kumar sp



SparkR + binary type + how to get value

2019-02-13 Thread Thijs Haarhuis
Hi all,

Does anybody have any experience in accessing the data from a column which has 
a binary type in a Spark Data Frame in R?
I have a Spark Data Frame which has a column which is of a binary type. I want 
to access this data and process it.
In my case I collect the Spark data frame to an R data frame and access the
first row.
When I print this row to the console it does print all the hex values correctly.

However, when I access the column, it prints that it is a list of 1. When I
print the type of the child element, it again prints that it is a list.
I expected this value to be of a raw type.

Does anybody have experience with this?

Thanks
Thijs



RE: Got fatal error when running spark 2.4.0 on k8s

2019-02-13 Thread Sinha, Breeta (Nokia - IN/Bangalore)
Hi Dawn,

Probably you are providing an incorrect image (it must be a Java image), an
incorrect master IP, or an incorrect service account. Please verify the pod’s
permissions for the service account (‘spark’ in your case).

I have tried executing the same program as below:

./spark-submit --master k8s://https:// --deploy-mode 
cluster --name spark-pi --class org.apache.spark.examples.SparkPi
 --conf spark.executor.instances=1 --conf 
spark.kubernetes.container.image= --conf 
spark.kubernetes.namespace= 
local:///opt/spark/examples/jars/spark-examples*.jar 5

And, I was able to see “Pi is roughly 3.139774279548559” in the pod’s output 
log.

Hope this will help! 

Regards,
Breeta


From: dawn breaks <2005dawnbre...@gmail.com>
Sent: Wednesday, February 13, 2019 1:52 PM
To: user@spark.apache.org
Subject: Got fatal error when running spark 2.4.0 on k8s

We submit a Spark job to k8s with the following command, and the driver pod gets
an error and exits. Can anybody help us solve it?

 ./bin/spark-submit \
--master k8s://https://172.21.91.48:6443 \
--deploy-mode cluster \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.executor.instances=1 \
--conf spark.kubernetes.container.image=xxxRepo/spark:v2.4.0 \
local:///opt/spark/examples/jars/spark-examples*.jar \
5


The detailed error info is as follows:

2019-02-13 07:13:06 ERROR SparkContext:91 - Error initializing SparkContext.
org.apache.spark.SparkException: External scheduler cannot be instantiated
at 
org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2794)
at org.apache.spark.SparkContext.(SparkContext.scala:493)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2520)
at 
org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:935)
at 
org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:926)
at scala.Option.getOrElse(Option.scala:121)
at 
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:926)
at org.apache.spark.examples.SparkPi$.main(SparkPi.scala:31)
at org.apache.spark.examples.SparkPi.main(SparkPi.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at 
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
at 
org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at 
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: An error has 
occurred.
at 
io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:62)
at 
io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:53)
at 
io.fabric8.kubernetes.client.utils.HttpClientUtils.createHttpClient(HttpClientUtils.java:167)
at 
org.apache.spark.deploy.k8s.SparkKubernetesClientFactory$.createKubernetesClient(SparkKubernetesClientFactory.scala:84)
at 
org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager.createSchedulerBackend(KubernetesClusterManager.scala:64)
at 
org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2788)
... 20 more
Caused by: java.security.cert.CertificateException: Could not parse 
certificate: java.io.IOException: Empty input
at 
sun.security.provider.X509Factory.engineGenerateCertificate(X509Factory.java:110)
at 
java.security.cert.CertificateFactory.generateCertificate(CertificateFactory.java:339)
at 
io.fabric8.kubernetes.client.internal.CertUtils.createTrustStore(CertUtils.java:93)
at 
io.fabric8.kubernetes.client.internal.CertUtils.createTrustStore(CertUtils.java:71)
at 
io.fabric8.kubernetes.client.internal.SSLUtils.trustManagers(SSLUtils.java:114)
at 
io.fabric8.kubernetes.client.internal.SSLUtils.trustManagers(SSLUtils.java:93)
at 

Re: Spark2 DataFrameWriter.saveAsTable defaults to external table if path is provided

2019-02-13 Thread Chris Teoh
Hey there,

Could you not just create a managed table using the DDL in Spark SQL and
then write the data frame to the underlying folder, or use Spark SQL to do
an insert?

Alternatively, try CREATE TABLE AS SELECT (CTAS). IIRC Hive creates managed
tables this way.

I've not confirmed this works but I think that might be worth trying.

I hope that helps.

Kind regards
Chris

On Wed., 13 Feb. 2019, 10:44 pm Horváth Péter Gergely, <
horvath.peter.gerg...@gmail.com> wrote:

> Dear All,
>
> I am facing a strange issue with Spark 2.3, where I would like to create a
> MANAGED table out of the content of a DataFrame with the storage path
> overridden.
>
> Apparently, when one tries to create a Hive table via
> DataFrameWriter.saveAsTable, supplying a "path" option causes Spark to
> automatically create an external table.
>
> This demonstrates the behaviour:
>
> scala> val numbersDF = sc.parallelize((1 to 100).toList).toDF("numbers")
> numbersDF: org.apache.spark.sql.DataFrame = [numbers: int]
>
> scala> numbersDF.write.format("orc").saveAsTable("numbers_table1")
>
> scala> spark.sql("describe formatted
> numbers_table1").filter(_.get(0).toString == "Type").show
> ++-+---+
> |col_name|data_type|comment|
> ++-+---+
> |Type|  MANAGED|   |
> ++-+---+
>
>
> scala> numbersDF.write.format("orc").option("path",
> "/user/foobar/numbers_table_data").saveAsTable("numbers_table2")
>
> scala> spark.sql("describe formatted
> numbers_table2").filter(_.get(0).toString == "Type").show
> ++-+---+
> |col_name|data_type|comment|
> ++-+---+
> |Type| EXTERNAL|   |
> ++-+---+
>
>
>
> I am wondering if there is any way to force creation of a managed table
> with a custom path (which as far as I know, should be possible via standard
> Hive commands).
>
> I often seem to have the problem that I cannot find the appropriate
> documentation for the option configuration of Spark APIs. Could someone
> please point me to the right direction and tell me where these things are
> documented?
>
> Thanks,
> Peter
>
>


Spark2 DataFrameWriter.saveAsTable defaults to external table if path is provided

2019-02-13 Thread Horváth Péter Gergely
Dear All,

I am facing a strange issue with Spark 2.3, where I would like to create a
MANAGED table out of the content of a DataFrame with the storage path
overridden.

Apparently, when one tries to create a Hive table via
DataFrameWriter.saveAsTable, supplying a "path" option causes Spark to
automatically create an external table.

This demonstrates the behaviour:

scala> val numbersDF = sc.parallelize((1 to 100).toList).toDF("numbers")
numbersDF: org.apache.spark.sql.DataFrame = [numbers: int]

scala> numbersDF.write.format("orc").saveAsTable("numbers_table1")

scala> spark.sql("describe formatted
numbers_table1").filter(_.get(0).toString == "Type").show
++-+---+
|col_name|data_type|comment|
++-+---+
|Type|  MANAGED|   |
++-+---+


scala> numbersDF.write.format("orc").option("path",
"/user/foobar/numbers_table_data").saveAsTable("numbers_table2")

scala> spark.sql("describe formatted
numbers_table2").filter(_.get(0).toString == "Type").show
++-+---+
|col_name|data_type|comment|
++-+---+
|Type| EXTERNAL|   |
++-+---+



I am wondering if there is any way to force creation of a managed table
with a custom path (which as far as I know, should be possible via standard
Hive commands).

I often seem to have the problem that I cannot find the appropriate
documentation for the option configuration of Spark APIs. Could someone
please point me in the right direction and tell me where these things are
documented?

Thanks,
Peter


Subscribe

2019-02-13 Thread Rafael Mendes



Re: Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Not authorized to access group: spark-kafka-source-060f3ceb-09f4-4e28-8210-3ef8a845fc92--2038748645-driver-2

2019-02-13 Thread Jungtaek Lim
Adding to Gabor's answer: in Spark 3.0 end users can even provide the full
group id (please refer to SPARK-26350 [1]), but you may find it more convenient
to use the group id prefix Gabor described (please refer to SPARK-26121 [2]) to
grant permission to a broader range of groups.

1. https://issues.apache.org/jira/browse/SPARK-26350
2. https://issues.apache.org/jira/browse/SPARK-26121
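
For reference, a sketch of how the two options look on the Kafka source in
Spark 3.0+ (the bootstrap servers and group names are placeholders; the topic is
the one mentioned later in this thread):

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "cla-claim-raw")
  .option("groupIdPrefix", "myteam-prefix")          // SPARK-26121: custom group id prefix
  // .option("kafka.group.id", "myteam-fixed-group") // SPARK-26350: full group id
  .load()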

Thanks,
Jungtaek Lim (HeartSaVioR)

2019년 2월 13일 (수) 오후 6:36, Gabor Somogyi 님이 작성:

> Hi Thomas,
>
> The issue occurs when the user does not have the READ permission on the
> consumer groups.
>
> In DStreams group ID is configured in application, for example:
> https://github.com/gaborgsomogyi/spark-dstream-secure-kafka-app/blob/161bf02eb3677aac604d63499041f72231d0e371/src/main/scala/com/cloudera/spark/examples/DirectKafkaWordCount.scala#L59
>
> In Structured Streaming the group ID is generated by Spark internally.
>
> Either one has to give access to the "spark-kafka-source-*" group, or in Spark
> 3.0 this prefix can be configured with the "groupIdPrefix" parameter.
>
> BR,
> G
>
>
> On Wed, Feb 13, 2019 at 3:58 AM Allu Thomas
>  wrote:
>
>> Hi There,
>>
>> My use case is to read a simple JSON message from a Kafka queue using Spark
>> Structured Streaming. But I’m getting the following error message when I
>> run my Kafka consumer. I don’t get this error when using a Spark direct
>> stream. The issue happens only with structured streaming. Any help
>> would be greatly appreciated.
>>
>>
>> Exception in thread "main"
>> org.apache.spark.sql.streaming.StreamingQueryException: Not authorized to
>> access group:
>> spark-kafka-source-060f3ceb-09f4-4e28-8210-3ef8a845fc92--2038748645-driver-2
>> === Streaming Query ===
>> Identifier: [id = 6ab10eab-4f71-435c-8705-820e66cee47e, runId =
>> 48430367-9e14-450b-b8e0-27199b536403]
>> Current Committed Offsets: {}
>> Current Available Offsets: {}
>>
>>
>> Current State: ACTIVE
>> Thread State: RUNNABLE
>>
>>
>> Logical Plan:
>> KafkaSource[Subscribe[cla-claim-raw]]
>> at org.apache.spark.sql.execution.streaming.StreamExecution.org
>> $apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
>> at
>> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
>> Caused by: org.apache.kafka.common.errors.GroupAuthorizationException:
>> Not authorized to access group:
>> spark-kafka-source-060f3ceb-09f4-4e28-8210-3ef8a845fc92--2038748645-driver-2
>>
>> Thanks,
>> Thomas Thomas
>>
>


Re: Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Not authorized to access group: spark-kafka-source-060f3ceb-09f4-4e28-8210-3ef8a845fc92--2038748645-driver-2

2019-02-13 Thread Gabor Somogyi
Hi Thomas,

The issue occurs when the user does not have the READ permission on the
consumer groups.

In DStreams group ID is configured in application, for example:
https://github.com/gaborgsomogyi/spark-dstream-secure-kafka-app/blob/161bf02eb3677aac604d63499041f72231d0e371/src/main/scala/com/cloudera/spark/examples/DirectKafkaWordCount.scala#L59

In Structured Streaming the group ID is generated by Spark internally.

Either one has to give access to the "spark-kafka-source-*" group, or in Spark
3.0 this prefix can be configured with the "groupIdPrefix" parameter.

BR,
G


On Wed, Feb 13, 2019 at 3:58 AM Allu Thomas
 wrote:

> Hi There,
>
> My use case is to read a simple JSON message from a Kafka queue using Spark
> Structured Streaming. But I’m getting the following error message when I
> run my Kafka consumer. I don’t get this error when using a Spark direct
> stream. The issue happens only with structured streaming. Any help
> would be greatly appreciated.
>
>
> Exception in thread "main"
> org.apache.spark.sql.streaming.StreamingQueryException: Not authorized to
> access group:
> spark-kafka-source-060f3ceb-09f4-4e28-8210-3ef8a845fc92--2038748645-driver-2
> === Streaming Query ===
> Identifier: [id = 6ab10eab-4f71-435c-8705-820e66cee47e, runId =
> 48430367-9e14-450b-b8e0-27199b536403]
> Current Committed Offsets: {}
> Current Available Offsets: {}
>
>
> Current State: ACTIVE
> Thread State: RUNNABLE
>
>
> Logical Plan:
> KafkaSource[Subscribe[cla-claim-raw]]
> at org.apache.spark.sql.execution.streaming.StreamExecution.org
> $apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
> Caused by: org.apache.kafka.common.errors.GroupAuthorizationException: Not
> authorized to access group:
> spark-kafka-source-060f3ceb-09f4-4e28-8210-3ef8a845fc92--2038748645-driver-2
>
> Thanks,
> Thomas Thomas
>


Got fatal error when running spark 2.4.0 on k8s

2019-02-13 Thread dawn breaks
We submit a Spark job to k8s with the following command, and the driver pod gets
an error and exits. Can anybody help us solve it?

 ./bin/spark-submit \
--master k8s://https://172.21.91.48:6443 \
--deploy-mode cluster \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.executor.instances=1 \
--conf spark.kubernetes.container.image=xxxRepo/spark:v2.4.0 \
local:///opt/spark/examples/jars/spark-examples*.jar \
5


The detailed error info is as follows:

2019-02-13 07:13:06 ERROR SparkContext:91 - Error initializing SparkContext.
org.apache.spark.SparkException: External scheduler cannot be instantiated
at
org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2794)
at org.apache.spark.SparkContext.(SparkContext.scala:493)
at
org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2520)
at
org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:935)
at
org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:926)
at scala.Option.getOrElse(Option.scala:121)
at
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:926)
at org.apache.spark.examples.SparkPi$.main(SparkPi.scala:31)
at org.apache.spark.examples.SparkPi.main(SparkPi.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
at
org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
at
org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: An error
has occurred.
at
io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:62)
at
io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:53)
at
io.fabric8.kubernetes.client.utils.HttpClientUtils.createHttpClient(HttpClientUtils.java:167)
at
org.apache.spark.deploy.k8s.SparkKubernetesClientFactory$.createKubernetesClient(SparkKubernetesClientFactory.scala:84)
at
org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager.createSchedulerBackend(KubernetesClusterManager.scala:64)
at
org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2788)
... 20 more
Caused by: java.security.cert.CertificateException: Could not parse
certificate: java.io.IOException: Empty input
at
sun.security.provider.X509Factory.engineGenerateCertificate(X509Factory.java:110)
at
java.security.cert.CertificateFactory.generateCertificate(CertificateFactory.java:339)
at
io.fabric8.kubernetes.client.internal.CertUtils.createTrustStore(CertUtils.java:93)
at
io.fabric8.kubernetes.client.internal.CertUtils.createTrustStore(CertUtils.java:71)
at
io.fabric8.kubernetes.client.internal.SSLUtils.trustManagers(SSLUtils.java:114)
at
io.fabric8.kubernetes.client.internal.SSLUtils.trustManagers(SSLUtils.java:93)
at
io.fabric8.kubernetes.client.utils.HttpClientUtils.createHttpClient(HttpClientUtils.java:63)
... 23 more
Caused by: java.io.IOException: Empty input
at
sun.security.provider.X509Factory.engineGenerateCertificate(X509Factory.java:106)
... 29 more