Re: [Spark Structured Streaming] Measure metrics from CsvSink for Rate source

2018-06-21 Thread Dhruv Kumar
Thanks a lot for your mail, Jungtaek. I added the StreamingQueryListener to my
code (updated code) and was able to see valid inputRowsPerSecond and
processedRowsPerSecond numbers. But it also shows zeros intermittently. Here is
the sample output. Could you explain why this is the case?
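For reference, a minimal sketch (Scala, Spark 2.3+, assuming a SparkSession named
spark) of the kind of listener involved; illustrative only, not the exact code in
the linked gist:

spark.streams.addListener(new org.apache.spark.sql.streaming.StreamingQueryListener {
  import org.apache.spark.sql.streaming.StreamingQueryListener._

  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // These rates are reported once per completed micro-batch.
    val p = event.progress
    println(s"batch=${p.batchId} numInputRows=${p.numInputRows} " +
      s"inputRowsPerSecond=${p.inputRowsPerSecond} " +
      s"processedRowsPerSecond=${p.processedRowsPerSecond}")
  }
})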
Unfortunately, the CSV files still show zeros except for a few non-zero values.
Do you know why this may be happening? (I changed metrics.properties to report
every second instead of every 10 seconds.)
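The relevant CsvSink entries in metrics.properties look roughly like the following
(a sketch with a placeholder directory, not the poster's exact file); note that
spark.sql.streaming.metricsEnabled=true must also be set on the session for
streaming query metrics to be reported to the sink:

*.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
*.sink.csv.period=1
*.sink.csv.unit=seconds
*.sink.csv.directory=/tmp/spark-metrics/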
Here is the output of the metrics log file 
(run_latest.driver.spark.streaming.aggregates.inputRate-total.csv)
t,value
1529645042,0.0
1529645043,0.0
1529645044,0.0
1529645045,NaN
1529645046,88967.97153024911
1529645047,100200.4008016032
1529645048,122100.12210012211
1529645049,0.0
1529645050,0.0
1529645051,0.0
1529645052,0.0
1529645053,0.0
1529645054,0.0
1529645055,0.0
1529645056,0.0
1529645057,0.0
1529645058,0.0
1529645059,0.0
1529645060,0.0
1529645061,0.0
1529645062,0.0
1529645063,0.0
1529645064,0.0
1529645065,0.0
1529645066,0.0
1529645067,0.0
1529645068,0.0
1529645069,0.0
1529645070,0.0
1529645071,0.0
1529645072,93808.63039399624
1529645073,0.0
1529645074,0.0
1529645075,0.0
1529645076,0.0
1529645077,0.0
1529645078,0.0



--
Dhruv Kumar
PhD Candidate
Department of Computer Science and Engineering
University of Minnesota
www.dhruvkumar.me

> On Jun 21, 2018, at 23:07, Jungtaek Lim  wrote:
> 
> I'm referring to 2.4.0-SNAPSHOT (not sure which commit exactly), but it 
> properly returns the input rate. 
> 
> $ tail -F 
> /tmp/spark-trial-metric/local-1529640063554.driver.spark.streaming.counts.inputRate-total.csv
> t,value
> 1529640073,0.0
> 1529640083,0.9411272613196695
> 1529640093,0.9430996541967934
> 1529640103,1.0606060606060606
> 1529640113,0.9997000899730081
> 
> Could you add a streaming query listener and check the values of sources -> 
> numInputRows, inputRowsPerSecond, and processedRowsPerSecond? They should 
> show some valid numbers. 
> 
> Thanks,
> Jungtaek Lim (HeartSaVioR)
> 
> On Fri, Jun 22, 2018 at 11:49 AM, Dhruv Kumar wrote:
> Hi
> 
> I was trying to measure the performance metrics for Spark Structured 
> Streaming, but I am unable to see any data in the metrics log files. My input 
> source is the Rate source, which generates data at the specified number of 
> rows per second. Here are the links to my code and metrics.properties file.
> 
> When I run the above-mentioned code using spark-submit, I see that the 
> metrics logs (for example, 
> run_1.driver.spark.streaming.aggregates.inputRate-total.csv) are created 
> under the specified directory, but most of the values are 0. 
> Below is a portion of the inputRate-total.csv file:
> 1529634585,0.0
> 1529634595,0.0
> 1529634605,0.0
> 1529634615,0.0
> 1529634625,0.0
> 1529634635,0.0
> 1529634645,0.0
> 1529634655,0.0
> 1529634665,0.0
> 1529634675,0.0
> 1529634685,0.0
> 1529634695,0.0
> 1529634705,0.0
> 1529634715,0.0
> 
> Any idea why this might be happening? Happy to share more information 
> if that helps. 
> 
> Thanks
> --
> Dhruv Kumar
> PhD Candidate
> Department of Computer Science and Engineering
> University of Minnesota
> www.dhruvkumar.me 



Re: [Spark Structured Streaming] Measure metrics from CsvSink for Rate source

2018-06-21 Thread Jungtaek Lim
I'm referring to 2.4.0-SNAPSHOT (not sure which commit exactly), but
it properly returns the input rate.

$ tail -F
/tmp/spark-trial-metric/local-1529640063554.driver.spark.streaming.counts.inputRate-total.csv
t,value
1529640073,0.0
1529640083,0.9411272613196695
1529640093,0.9430996541967934
1529640103,1.0606060606060606
1529640113,0.9997000899730081

Could you add a streaming query listener and check the values of sources ->
numInputRows, inputRowsPerSecond, and processedRowsPerSecond? They should
show some valid numbers.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Fri, Jun 22, 2018 at 11:49 AM, Dhruv Kumar wrote:

> Hi
>
> I was trying to measure the performance metrics for Spark Structured
> Streaming, but I am unable to see any data in the metrics log files. My
> input source is the Rate source, which generates data at the specified
> number of rows per second. Here are the links to my code and
> metrics.properties file.
>
> When I run the above-mentioned code using spark-submit, I see that the
> metrics logs (for example,
> run_1.driver.spark.streaming.aggregates.inputRate-total.csv) are created
> under the specified directory, but most of the values are 0.
> Below is a portion of the inputRate-total.csv file:
> 1529634585,0.0
> 1529634595,0.0
> 1529634605,0.0
> 1529634615,0.0
> 1529634625,0.0
> 1529634635,0.0
> 1529634645,0.0
> 1529634655,0.0
> 1529634665,0.0
> 1529634675,0.0
> 1529634685,0.0
> 1529634695,0.0
> 1529634705,0.0
> 1529634715,0.0
>
> Any idea why this might be happening? Happy to share more
> information if that helps.
>
> Thanks
> --
> *Dhruv Kumar*
> PhD Candidate
> Department of Computer Science and Engineering
> University of Minnesota
> www.dhruvkumar.me
>
>


[Spark Structured Streaming] Measure metrics from CsvSink for Rate source

2018-06-21 Thread Dhruv Kumar
Hi

I was trying to measure the performance metrics for Spark Structured Streaming,
but I am unable to see any data in the metrics log files. My input source is
the Rate source, which generates data at the specified number of rows per
second. Here are the links to my code and metrics.properties file.

When I run the above-mentioned code using spark-submit, I see that the metrics
logs (for example, run_1.driver.spark.streaming.aggregates.inputRate-total.csv)
are created under the specified directory, but most of the values are 0.
Below is a portion of the inputRate-total.csv file:
1529634585,0.0
1529634595,0.0
1529634605,0.0
1529634615,0.0
1529634625,0.0
1529634635,0.0
1529634645,0.0
1529634655,0.0
1529634665,0.0
1529634675,0.0
1529634685,0.0
1529634695,0.0
1529634705,0.0
1529634715,0.0

Any idea why this might be happening? Happy to share more information if
that helps.

Thanks
--
Dhruv Kumar
PhD Candidate
Department of Computer Science and Engineering
University of Minnesota
www.dhruvkumar.me



Re: RepartitionByKey Behavior

2018-06-21 Thread Jungtaek Lim
It is not possible in general because the cardinality of the partitioning key is
not known in advance, while the partition count must be fixed. If the
cardinality exceeds the partition count, the system cannot satisfy the
requirement.
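That said, when the distinct keys are known up front, an explicit key-to-partition
map with at least as many partitions as keys does guarantee one key per partition.
A Scala sketch of the idea (assuming a SparkContext named sc; the key list is
hard-coded for illustration):

import org.apache.spark.Partitioner

class ExplicitKeyPartitioner(keys: Seq[String]) extends Partitioner {
  private val index = keys.zipWithIndex.toMap        // one partition per distinct key
  override def numPartitions: Int = keys.size
  override def getPartition(key: Any): Int = index(key.asInstanceOf[String])
}

val rdd = sc.parallelize(Seq(("a", 5), ("d", 8), ("b", 6), ("a", 8), ("d", 9), ("b", 3), ("c", 8)))
// With 4 partitions for the 4 known keys, b and c can no longer collide.
val partitioned = rdd.partitionBy(new ExplicitKeyPartitioner(Seq("a", "b", "c", "d")))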

Thanks,
Jungtaek Lim (HeartSaVioR)

On Fri, Jun 22, 2018 at 8:55 AM, Chawla, Sumit wrote:

> Based on reading the code, it looks like Spark applies a modulo (by the
> partition count) to the value returned by the partition function, so the keys
> b and c end up in the same partition. What's the best partitioning scheme to
> deal with this?
>
> Regards
>
> Sumit Chawla
>
>
> On Thu, Jun 21, 2018 at 4:51 PM, Chawla,Sumit 
> wrote:
>
>> Hi
>>
>>  I have been trying to do this simple operation: I want to land all values
>> with one key in the same partition, and not have any other key in the same
>> partition. Is this possible? I keep getting b and c mixed up in the same
>> partition.
>>
>>
>> rdd = sc.parallelize([('a', 5), ('d', 8), ('b', 6), ('a', 8), ('d', 9),
>>                       ('b', 3), ('c', 8)])
>> from pyspark.rdd import portable_hash
>>
>> n = 4
>>
>> def partitioner(n):
>>     """Partition by the first item in the key tuple"""
>>     def partitioner_(x):
>>         val = x[0]
>>         key = portable_hash(x[0])
>>         print("Val %s Assigned Key %s" % (val, key))
>>         return key
>>     return partitioner_
>>
>> def validate(part):
>>     last_key = None
>>     for p in part:
>>         k = p[0]
>>         if not last_key:
>>             last_key = k
>>         if k != last_key:
>>             print("Mixed keys in partition %s %s" % (k, last_key))
>>
>> partioned = (rdd
>>              .keyBy(lambda kv: (kv[0], kv[1]))
>>              .repartitionAndSortWithinPartitions(
>>                  numPartitions=n, partitionFunc=partitioner(n),
>>                  ascending=False)).map(lambda x: x[1])
>>
>> print(partioned.getNumPartitions())
>> partioned.foreachPartition(validate)
>>
>>
>> Val a Assigned Key -7583489610679606711
>> Val a Assigned Key -7583489610679606711
>> Val d Assigned Key 2755936516345535118
>> Val b Assigned Key -1175849324817995036
>> Val c Assigned Key 1421958803217889556
>> Val d Assigned Key 2755936516345535118
>> Val b Assigned Key -1175849324817995036
>> Mixed keys in partition b c
>> Mixed keys in partition b c
>>
>>
>> Regards
>> Sumit Chawla
>>
>>
>


Re: RepartitionByKey Behavior

2018-06-21 Thread Chawla,Sumit
Based on reading the code, it looks like Spark applies a modulo (by the
partition count) to the value returned by the partition function, so the keys
b and c end up in the same partition. What's the best partitioning scheme to
deal with this?

Regards
Sumit Chawla


On Thu, Jun 21, 2018 at 4:51 PM, Chawla,Sumit 
wrote:

> Hi
>
>  I have been trying to do this simple operation: I want to land all values
> with one key in the same partition, and not have any other key in the same
> partition. Is this possible? I keep getting b and c mixed up in the same
> partition.
>
>
> rdd = sc.parallelize([('a', 5), ('d', 8), ('b', 6), ('a', 8), ('d', 9),
>                       ('b', 3), ('c', 8)])
> from pyspark.rdd import portable_hash
>
> n = 4
>
> def partitioner(n):
>     """Partition by the first item in the key tuple"""
>     def partitioner_(x):
>         val = x[0]
>         key = portable_hash(x[0])
>         print("Val %s Assigned Key %s" % (val, key))
>         return key
>     return partitioner_
>
> def validate(part):
>     last_key = None
>     for p in part:
>         k = p[0]
>         if not last_key:
>             last_key = k
>         if k != last_key:
>             print("Mixed keys in partition %s %s" % (k, last_key))
>
> partioned = (rdd
>              .keyBy(lambda kv: (kv[0], kv[1]))
>              .repartitionAndSortWithinPartitions(
>                  numPartitions=n, partitionFunc=partitioner(n),
>                  ascending=False)).map(lambda x: x[1])
>
> print(partioned.getNumPartitions())
> partioned.foreachPartition(validate)
>
>
> Val a Assigned Key -7583489610679606711
> Val a Assigned Key -7583489610679606711
> Val d Assigned Key 2755936516345535118
> Val b Assigned Key -1175849324817995036
> Val c Assigned Key 1421958803217889556
> Val d Assigned Key 2755936516345535118
> Val b Assigned Key -1175849324817995036
> Mixed keys in partition b c
> Mixed keys in partition b c
>
>
> Regards
> Sumit Chawla
>
>


RepartitionByKey Behavior

2018-06-21 Thread Chawla,Sumit
Hi

I have been trying to do this simple operation: I want to land all values
with one key in the same partition, and not have any other key in the same
partition. Is this possible? I keep getting b and c mixed up in the same
partition.


rdd = sc.parallelize([('a', 5), ('d', 8), ('b', 6), ('a', 8), ('d', 9),
                      ('b', 3), ('c', 8)])
from pyspark.rdd import portable_hash

n = 4

def partitioner(n):
    """Partition by the first item in the key tuple"""
    def partitioner_(x):
        val = x[0]
        key = portable_hash(x[0])
        print("Val %s Assigned Key %s" % (val, key))
        return key
    return partitioner_

def validate(part):
    last_key = None
    for p in part:
        k = p[0]
        if not last_key:
            last_key = k
        if k != last_key:
            print("Mixed keys in partition %s %s" % (k, last_key))

partioned = (rdd
             .keyBy(lambda kv: (kv[0], kv[1]))
             .repartitionAndSortWithinPartitions(
                 numPartitions=n, partitionFunc=partitioner(n),
                 ascending=False)).map(lambda x: x[1])

print(partioned.getNumPartitions())
partioned.foreachPartition(validate)


Val a Assigned Key -7583489610679606711
Val a Assigned Key -7583489610679606711
Val d Assigned Key 2755936516345535118
Val b Assigned Key -1175849324817995036
Val c Assigned Key 1421958803217889556
Val d Assigned Key 2755936516345535118
Val b Assigned Key -1175849324817995036
Mixed keys in partition b c
Mixed keys in partition b c


Regards
Sumit Chawla


Re: Spark 2.3.1 not working on Java 10

2018-06-21 Thread vaquar khan
Sure let me check Jira

Regards,
Vaquar khan

On Thu, Jun 21, 2018, 4:42 PM Takeshi Yamamuro 
wrote:

> The ambiguous statement in the doc has already been pointed out in ticket
> SPARK-24201.
> Could you make a PR for that?
>
> On Fri, Jun 22, 2018 at 6:17 AM, vaquar khan 
> wrote:
>
>> https://spark.apache.org/docs/2.3.0/
>>
>> To avoid confusion, we need to update the doc's supported Java version; the
>> "Java 8+" wording is confusing for users:
>>
>> Spark runs on Java 8+, Python 2.7+/3.4+ and R 3.1+. For the Scala API,
>> Spark 2.3.0 uses Scala 2.11. You will need to use a compatible Scala
>> version (2.11.x).
>>
>>
>> Regards,
>> Vaquar khan
>>
>> On Thu, Jun 21, 2018 at 11:56 AM, chriswakare <
>> chris.newski...@intellibridge.co> wrote:
>>
>>> Hi Rahul,
>>> This will work only in Java 8.
>>> Installation does not work with either version 9 or 10.
>>>
>>> Thanks,
>>> Christopher
>>>
>>>
>>>
>>> --
>>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>>
>> --
>> Regards,
>> Vaquar Khan
>> +1 -224-436-0783
>> Greater Chicago
>>
>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: Spark 2.3.1 not working on Java 10

2018-06-21 Thread Takeshi Yamamuro
The ambiguous statement in the doc has already been pointed out in ticket
SPARK-24201.
Could you make a PR for that?

On Fri, Jun 22, 2018 at 6:17 AM, vaquar khan  wrote:

> https://spark.apache.org/docs/2.3.0/
>
> To avoid confusion, we need to update the doc's supported Java version; the
> "Java 8+" wording is confusing for users:
>
> Spark runs on Java 8+, Python 2.7+/3.4+ and R 3.1+. For the Scala API,
> Spark 2.3.0 uses Scala 2.11. You will need to use a compatible Scala
> version (2.11.x).
>
>
> Regards,
> Vaquar khan
>
> On Thu, Jun 21, 2018 at 11:56 AM, chriswakare <chris.newski...@intellibridge.co> wrote:
>
>> Hi Rahul,
>> This will work only in Java 8.
>> Installation does not work with either version 9 or 10.
>>
>> Thanks,
>> Christopher
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
>
> --
> Regards,
> Vaquar Khan
> +1 -224-436-0783
> Greater Chicago
>



-- 
---
Takeshi Yamamuro


Re: Spark 2.3.1 not working on Java 10

2018-06-21 Thread vaquar khan
https://spark.apache.org/docs/2.3.0/

To avoid confusion, we need to update the doc's supported Java version; the
"Java 8+" wording is confusing for users:

Spark runs on Java 8+, Python 2.7+/3.4+ and R 3.1+. For the Scala API,
Spark 2.3.0 uses Scala 2.11. You will need to use a compatible Scala
version (2.11.x).


Regards,
Vaquar khan

On Thu, Jun 21, 2018 at 11:56 AM, chriswakare <
chris.newski...@intellibridge.co> wrote:

> Hi Rahul,
> This will work only in Java 8.
> Installation does not work with either version 9 or 10.
>
> Thanks,
> Christopher
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
Regards,
Vaquar Khan
+1 -224-436-0783
Greater Chicago


Re: Spark 2.3.0 and Custom Sink

2018-06-21 Thread Lalwani, Jayesh
Actually, you can do partition-level ingest using ForEachWriter: add each row to 
a list in the process method, and write the whole list to the data store in the 
close method.

I know it’s awkward. I don’t know why Spark doesn’t provide a 
ForEachPartitionWriter.
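A minimal sketch of that buffering pattern (Scala, Spark 2.3); BufferingKairosWriter
and KairosClient.sendBulk are hypothetical names, not an existing API:

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.{ForeachWriter, Row}

class BufferingKairosWriter extends ForeachWriter[Row] {
  private val buffer = ArrayBuffer[Row]()

  override def open(partitionId: Long, version: Long): Boolean = {
    buffer.clear()
    true                               // accept this partition/epoch
  }

  override def process(row: Row): Unit = {
    buffer += row                      // accumulate instead of writing per row
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (errorOrNull == null) {
      // KairosClient.sendBulk(buffer) // one bulk write per partition per trigger (hypothetical client)
    }
    buffer.clear()
  }
}

It would be wired in with something like df.writeStream.foreach(new BufferingKairosWriter).start().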

From: Yogesh Mahajan 
Date: Thursday, June 21, 2018 at 3:45 PM
To: subramgr 
Cc: user 
Subject: Re: Spark 2.3.0 and Custom Sink

Since ForeachWriter works at a record level, you cannot do bulk ingest into 
KairosDB, which supports bulk inserts; this will be slow.
Instead, you can have your own Sink implementation, which works at the batch 
(DataFrame) level.

Thanks,
http://www.snappydata.io/blog

On Thu, Jun 21, 2018 at 10:54 AM, subramgr <subramanian.gir...@gmail.com> wrote:
Hi Spark Mailing list,

We are looking for pushing the output of the structured streaming query
output to KairosDB. (time series database)

What would be the recommended way of doing this? Do we implement the *Sink*
trait, or do we use *ForEachWriter*?

At each trigger point, if I do a *dataset.collect()*, the size of the data is
not huge; it should be in the low tens of MBs.

Any suggestions?

Thanks
Girish



--
Sent from: 
http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: 
user-unsubscr...@spark.apache.org



The information contained in this e-mail is confidential and/or proprietary to 
Capital One and/or its affiliates and may only be used solely in performance of 
work or services for Capital One. The information transmitted herewith is 
intended only for use by the individual or entity to which it is addressed. If 
the reader of this message is not the intended recipient, you are hereby 
notified that any review, retransmission, dissemination, distribution, copying 
or other use of, or taking of any action in reliance upon this information is 
strictly prohibited. If you have received this communication in error, please 
contact the sender and delete the material from your computer.


Re: Spark 2.3.0 and Custom Sink

2018-06-21 Thread Yogesh Mahajan
Since ForeachWriter works at a record level, you cannot do bulk ingest into
KairosDB, which supports bulk inserts; this will be slow.
Instead, you can have your own Sink implementation, which works at the batch
(DataFrame) level.
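A minimal sketch of such a batch-level sink (Scala, Spark 2.3). Note that Sink lives
in org.apache.spark.sql.execution.streaming, an internal/evolving API, and hooking it
into writeStream additionally requires a StreamSinkProvider registration, which is
omitted here; KairosBulkSink and KairosClient.sendBulk are hypothetical names:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Sink

class KairosBulkSink extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    // The whole micro-batch arrives as one DataFrame, so it can be bulk-inserted.
    val rows = data.collect()          // acceptable for the low tens of MBs per trigger mentioned below
    // KairosClient.sendBulk(rows)     // hypothetical bulk-insert call
  }
}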

Thanks,
http://www.snappydata.io/blog 

On Thu, Jun 21, 2018 at 10:54 AM, subramgr 
wrote:

> Hi Spark Mailing list,
>
> We are looking for pushing the output of the structured streaming query
> output to KairosDB. (time series database)
>
> What would be the recommended way of doing this? Do we implement the *Sink*
> trait, or do we use *ForEachWriter*?
>
> At each trigger point, if I do a *dataset.collect()*, the size of the data is
> not huge; it should be in the low tens of MBs.
>
> Any suggestions?
>
> Thanks
> Girish
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Spark 2.3.0 and Custom Sink

2018-06-21 Thread subramgr
Hi Spark Mailing list,
 
We are looking for pushing the output of the structured streaming query
output to KairosDB. (time series database)
 
What would be the recommended way of doing this? Do we implement the *Sink*
trait, or do we use *ForEachWriter*?

At each trigger point, if I do a *dataset.collect()*, the size of the data is
not huge; it should be in the low tens of MBs.
 
Any suggestions?
 
Thanks
Girish



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark 2.3.1 not working on Java 10

2018-06-21 Thread chriswakare
Hi Rahul,
This will work only in Java 8. 
Installation does not work with either version 9 or 10.

Thanks,
Christopher



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark 2.3.1 not working on Java 10

2018-06-21 Thread Rahul Agrawal
Thanks Felix. Do you know if you support Java 9?

Thanks,
Rahul

On Thu, Jun 21, 2018 at 8:11 PM, Felix Cheung 
wrote:

> I’m not sure we have completed support for Java 10
>
> --
> *From:* Rahul Agrawal 
> *Sent:* Thursday, June 21, 2018 7:22:42 AM
> *To:* user@spark.apache.org
> *Subject:* Spark 2.3.1 not working on Java 10
>
> Dear Team,
>
> I have installed Java 10, Scala 2.12.6, and Spark 2.3.1 on my desktop
> running Ubuntu 16.04. I am getting an error when opening spark-shell.
>
> Failed to initialize compiler: object java.lang.Object in compiler mirror
> not found.
>
> Please let me know if there is any way to run spark in Java 10.
> Thanks,
> Rahul
>


Re: Spark 2.3.1 not working on Java 10

2018-06-21 Thread Felix Cheung
I'm not sure we have completed support for Java 10


From: Rahul Agrawal 
Sent: Thursday, June 21, 2018 7:22:42 AM
To: user@spark.apache.org
Subject: Spark 2.3.1 not working on Java 10

Dear Team,


I have installed Java 10, Scala 2.12.6, and Spark 2.3.1 on my desktop running 
Ubuntu 16.04. I am getting an error when opening spark-shell.

Failed to initialize compiler: object java.lang.Object in compiler mirror not 
found.

Please let me know if there is any way to run spark in Java 10.

Thanks,
Rahul


Re: Does Spark Structured Streaming have a JDBC sink or Do I need to use ForEachWriter?

2018-06-21 Thread kant kodali
From my experience so far, update mode in Structured Streaming is the most
useful of the three available output modes. But when it comes to an RDBMS there
isn't really a generic upsert, so if I go with ForEachWriter I wouldn't quite
know whether to do an insert or an update unless I tie it to my application
logic (in other words, it won't look generic or seamless). If Spark could
provide additional info on whether a given row is an update of an existing row
or a brand new row, it would be easy to decide between insert and update, and I
wouldn't need to tie it to my application logic.
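To make that concrete, this is roughly the kind of hand-rolled per-row upsert a
ForeachWriter ends up doing today (a sketch only: the table, columns, and the
primary key on action are hypothetical, and MySQL's INSERT ... ON DUPLICATE KEY
UPDATE is used to sidestep the insert-vs-update decision; Postgres would use
INSERT ... ON CONFLICT instead):

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.{ForeachWriter, Row}

class JdbcUpsertWriter(url: String) extends ForeachWriter[Row] {
  private var conn: Connection = _
  private var stmt: PreparedStatement = _

  override def open(partitionId: Long, version: Long): Boolean = {
    conn = DriverManager.getConnection(url)
    stmt = conn.prepareStatement(
      "INSERT INTO action_counts (action, cnt) VALUES (?, ?) " +
      "ON DUPLICATE KEY UPDATE cnt = VALUES(cnt)")
    true
  }

  override def process(row: Row): Unit = {
    stmt.setString(1, row.getAs[String]("action"))
    stmt.setLong(2, row.getAs[Long]("count"))
    stmt.executeUpdate()               // insert-or-update per row, regardless of what Spark knows
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (stmt != null) stmt.close()
    if (conn != null) conn.close()
  }
}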




On Thu, Jun 21, 2018 at 6:49 AM, Lalwani, Jayesh <
jayesh.lalw...@capitalone.com> wrote:

> Open source Spark Structured Streaming doesn’t have a JDBC sink.  You can
> implement your own ForEachWriter, or you can use my sink from here:
>
> https://github.com/GaalDornick/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/JdbcSink.scala
>
> https://github.com/GaalDornick/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/JDBCSinkLog.scala
>
>
>
>
>
>
>
> *From: *kant kodali 
> *Date: *Wednesday, June 20, 2018 at 9:10 PM
> *To: *"user @spark" 
> *Subject: *Does Spark Structured Streaming have a JDBC sink or Do I need
> to use ForEachWriter?
>
>
>
> Hi All,
>
>
>
> Does Spark Structured Streaming have a JDBC sink or Do I need to use
> ForEachWriter? I see the following code in this link
> 
>  and
> I can see that database name can be passed in the connection string,
> however, I wonder how to pass a table name?
>
>
>
> inputDF.groupBy($"action", window($"time", "1 hour")).count()
>
>.writeStream.format("jdbc")
>
>.save("jdbc:mysql//…")
>
>
>
> Thanks,
>
> Kant
>
> --
>
> The information contained in this e-mail is confidential and/or
> proprietary to Capital One and/or its affiliates and may only be used
> solely in performance of work or services for Capital One. The information
> transmitted herewith is intended only for use by the individual or entity
> to which it is addressed. If the reader of this message is not the intended
> recipient, you are hereby notified that any review, retransmission,
> dissemination, distribution, copying or other use of, or taking of any
> action in reliance upon this information is strictly prohibited. If you
> have received this communication in error, please contact the sender and
> delete the material from your computer.
>


Spark 2.3.1 not working on Java 10

2018-06-21 Thread Rahul Agrawal
Dear Team,

I have installed Java 10, Scala 2.12.6, and Spark 2.3.1 on my desktop running
Ubuntu 16.04. I am getting an error when opening spark-shell.

Failed to initialize compiler: object java.lang.Object in compiler mirror
not found.

Please let me know if there is any way to run spark in Java 10.
Thanks,
Rahul


Re: Does Spark Structured Streaming have a JDBC sink or Do I need to use ForEachWriter?

2018-06-21 Thread Lalwani, Jayesh
Open source Spark Structured Streaming doesn’t have a JDBC sink.  You can 
implement your own ForEachWriter, or you can use my sink from here
https://github.com/GaalDornick/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/JdbcSink.scala
https://github.com/GaalDornick/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/JDBCSinkLog.scala



From: kant kodali 
Date: Wednesday, June 20, 2018 at 9:10 PM
To: "user @spark" 
Subject: Does Spark Structured Streaming have a JDBC sink or Do I need to use 
ForEachWriter?

Hi All,

Does Spark Structured Streaming have a JDBC sink or Do I need to use 
ForEachWriter? I see the following code in this 
link
 and I can see that database name can be passed in the connection string, 
however, I wonder how to pass a table name?


inputDF.groupBy($"action", window($"time", "1 hour")).count()

   .writeStream.format("jdbc")

   .save("jdbc:mysql//…")

Thanks,
Kant


The information contained in this e-mail is confidential and/or proprietary to 
Capital One and/or its affiliates and may only be used solely in performance of 
work or services for Capital One. The information transmitted herewith is 
intended only for use by the individual or entity to which it is addressed. If 
the reader of this message is not the intended recipient, you are hereby 
notified that any review, retransmission, dissemination, distribution, copying 
or other use of, or taking of any action in reliance upon this information is 
strictly prohibited. If you have received this communication in error, please 
contact the sender and delete the material from your computer.


createorreplacetempview cause memory leak

2018-06-21 Thread onmstester onmstester
I'm loading some JSON files in a loop, deserializing each into a list of objects,
creating a temp table from the list, and running a select on the table (repeated
for every file):

for (jsonFile : allJsonFiles) {
    sqlcontext.sql("select * from mainTable").filter(...).createOrReplaceTempView("table1");
    sqlcontext.createDataFrame(serializedObjectList, MyObject.class).createOrReplaceTempView("table2");
    sqlcontext.sql("select * from table1 join table2 ...").collectAsList();
}

After processing 30 JSON files my application crashes with OOM. If I add these two
lines at the end of the for-loop:

    sqlcontext.dropTempTable("table1");
    sqlcontext.dropTempTable("table2");

then my app continues to process all 500 JSON files with no problem. I've used
createOrReplaceTempView many times in my program; should I drop temp tables
everywhere to free memory?

Sent using Zoho Mail
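As a side note, on Spark 2.x the same cleanup is also available through the Catalog
API; a minimal sketch, assuming a SparkSession named spark:

spark.catalog.dropTempView("table1")
spark.catalog.dropTempView("table2")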

restarting ranger kms causes spark thrift server to stop

2018-06-21 Thread quentinlam
Hi,

When one of the Ranger KMS instances is stopped, the Spark Thrift Server remains
fine, but when that Ranger KMS is started back up, the Spark Thrift Server stops
and throws the following errors:

java.lang.ClassCastException:
org.apache.hadoop.security.authentication.client.AuthenticationException
cannot be cast to java.security.GeneralSecurityException
at
org.apache.hadoop.crypto.key.kms.LoadBalancingKMSClientProvider.decryptEncryptedKey(LoadBalancingKMSClientProvider.java:189)
at
org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.decryptEncryptedKey(KeyProviderCryptoExtension.java:388)
at
org.apache.hadoop.hdfs.DFSClient.decryptEncryptedDataEncryptionKey(DFSClient.java:1381)
at
org.apache.hadoop.hdfs.DFSClient.createWrappedInputStream(DFSClient.java:1451)
at
org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:305)
at
org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299)
at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at
org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
at
org.apache.hadoop.mapred.LineRecordReader.<init>(LineRecordReader.java:109)
at
org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
at
org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:252)
at
org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:251)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:211)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:102)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

The cluster is in a kerberized environment, Ranger version 0.7.0, Spark 2.1.1.
Any ideas as to what is causing this problem? Thanks.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Does Spark Structured Streaming have a JDBC sink or Do I need to use ForEachWriter?

2018-06-21 Thread Tathagata Das
Actually, we do not support jdbc sink yet. The blog post was just an
example :) I agree it is misleading in hindsight.

On Wed, Jun 20, 2018 at 6:09 PM, kant kodali  wrote:

> Hi All,
>
> Does Spark Structured Streaming have a JDBC sink or Do I need to use
> ForEachWriter? I see the following code in this link
> 
>  and
> I can see that database name can be passed in the connection string,
> however, I wonder how to pass a table name?
>
> inputDF.groupBy($"action", window($"time", "1 hour")).count()
>.writeStream.format("jdbc")
>.save("jdbc:mysql//…")
>
>
> Thanks,
> Kant
>