Re: Dataframe from 1.5G json (non JSONL)

2018-06-05 Thread Nicolas Paris
IMO your JSON cannot be read in parallel at all, so Spark only lets you
play with memory.

I'd say at some point it has to fit both in one executor and in the driver.
I'd try something like 20 GB for both the driver and the executors, and use
a dynamic number of executors, so that you can then repartition that fat JSON.
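
For concreteness, a rough sketch of that suggestion (the memory sizes, submit
flags and partition count are illustrative assumptions, not tuned values; the
path is the one from the thread):

// submit with something like:
//   spark-submit --driver-memory 20g --executor-memory 20g \
//     --conf spark.dynamicAllocation.enabled=true ...
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("fat-json").getOrCreate()

// multiLine (Spark 2.2+) is needed for a single top-level JSON array, but the
// whole file is still parsed by a single task, hence the memory pressure.
val hdf = spark.read
  .option("multiLine", true)
  .json("/user/tmp/hugedatafile")

// Once parsed, spread the rows out before doing any heavy work on them.
val spread = hdf.repartition(200)  // partition count is a guess
spread.count()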




2018-06-05 22:40 GMT+02:00 raksja :

> Yes, I would say that's the first thing that I tried. The thing is, even
> though I provide more executors and more memory for each, this process gets
> an OOM in only one task, which is stuck and unfinished.
>
> I don't think it's splitting the load across other tasks.
>
> I had 11 blocks for that file stored in HDFS and I got 11 partitions in my
> DataFrame; when I did show(1), it spun up 11 tasks, 10 passed quickly, and 1
> got stuck and OOMed.
>
> Also, I repartitioned to 1000 and that didn't help either.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Apply Core Java Transformation UDF on DataFrame

2018-06-05 Thread Chetan Khatri
Can anyone throw some light on this? It would be helpful.

On Tue, Jun 5, 2018 at 1:41 AM, Chetan Khatri 
wrote:

> All,
>
> I would like to apply a core Java transformation UDF on a DataFrame created
> from a table or flat files and return a new DataFrame object. Any suggestions,
> with respect to Spark internals?
>
> Thanks.
>
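
One way this is commonly done is to wrap the Java method in a UDF and apply it
with withColumn; a minimal sketch in Scala (the JavaTransform object, the
column name and the file path are hypothetical stand-ins for the real
core-Java code):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder().appName("java-udf-example").getOrCreate()

// Stand-in for the core-Java transformation; any static method visible on the
// executors' classpath can be wrapped the same way.
object JavaTransform {
  def normalize(s: String): String = if (s == null) null else s.trim.toUpperCase
}

val normalizeUdf = udf((s: String) => JavaTransform.normalize(s))

val df  = spark.read.option("header", "true").csv("/tmp/flat_file.csv") // placeholder source
val out = df.withColumn("name_norm", normalizeUdf(df("name")))          // "name" column assumed
out.show(5)

Spark serializes the wrapped function, ships it to the executors and evaluates
it row by row, so the Java code only needs to be present on the executors'
classpath.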


Re: Dataframe from 1.5G json (non JSONL)

2018-06-05 Thread raksja
Yes, I would say that's the first thing that I tried. The thing is, even
though I provide more executors and more memory for each, this process gets
an OOM in only one task, which is stuck and unfinished.

I don't think it's splitting the load across other tasks.

I had 11 blocks for that file stored in HDFS and I got 11 partitions in my
DataFrame; when I did show(1), it spun up 11 tasks, 10 passed quickly, and 1
got stuck and OOMed.

Also, I repartitioned to 1000 and that didn't help either.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Dataframe from 1.5G json (non JSONL)

2018-06-05 Thread Nicolas Paris
Have you played with the driver/executor memory configuration?

Increasing them should avoid the OOM.

2018-06-05 22:30 GMT+02:00 raksja :

> Agreed that gzip is not splittable, but the question I have and the examples
> I posted above all refer to an uncompressed file: a single JSON file with an
> array of objects on one continuous line.
>
>
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Dataframe from 1.5G json (non JSONL)

2018-06-05 Thread raksja
Agreed that gzip is not splittable, but the question I have and the examples
I posted above all refer to an uncompressed file: a single JSON file with an
array of objects on one continuous line.






--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Dataframe from 1.5G json (non JSONL)

2018-06-05 Thread raksja
Yes, it's in the right format, as we are able to process it in Python.

Also, I agree that JSONL would work if we split the

[{},{},...]

array of objects into something like this:

{}
{}
{}

But since I get the data from another system in this form and cannot control
it, my question is whether it's possible to read a JSON file which has one
huge array of objects.
I can also see that this was not supported before Spark 2.2.0; I've included
the bug that was resolved/fixed in 2.2.0:
https://issues.apache.org/jira/browse/SPARK-20980
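
If the multiLine reader keeps running out of memory, one workaround (not from
the thread, just a sketch) is to stream-convert the array file into JSONL with
a streaming JSON parser before handing it to Spark, so that only one object is
ever held in memory at a time; the file paths below are placeholders:

import java.io.{File, PrintWriter}
import com.fasterxml.jackson.core.JsonToken
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}

val mapper = new ObjectMapper()
val parser = mapper.getFactory.createParser(new File("/tmp/hugedatafile.json"))
val out    = new PrintWriter("/tmp/hugedatafile.jsonl")

// Walk the top-level [{...},{...},...] array one object at a time.
require(parser.nextToken() == JsonToken.START_ARRAY, "expected a top-level JSON array")
while (parser.nextToken() == JsonToken.START_OBJECT) {
  val node: JsonNode = mapper.readTree(parser)   // consumes exactly one object
  out.println(mapper.writeValueAsString(node))   // one compact JSON object per line
}
out.close()
parser.close()

The resulting JSONL file splits on line boundaries, so spark.read.json can
then parallelize it across HDFS blocks.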




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Dataframe from 1.5G json (non JSONL)

2018-06-05 Thread Holden Karau
If it’s one 33 MB file which decompresses to 1.5 GB, then there is also a
chance you need to split the inputs, since gzip is a non-splittable
compression format.

On Tue, Jun 5, 2018 at 11:55 AM Anastasios Zouzias 
wrote:

> Are you sure that your JSON file has the right format?
>
> spark.read.json(...) expects a file where *each line is a json object*.
>
> My wild guess is that
>
> val hdf=spark.read.json("/user/tmp/hugedatafile")
> hdf.show(2) or hdf.take(1) gives OOM
>
> tries to fetch all the data into the driver. Can you reformat your input
> file and try again?
>
> Best,
> Anastasios
>
>
>
> On Tue, Jun 5, 2018 at 8:39 PM, raksja  wrote:
>
>> I have a json file which is a continuous array of objects of similar type
>> [{},{}...] for about 1.5GB uncompressed and 33MB gzip compressed.
>>
>> This is uploaded hugedatafile to hdfs and this is not a JSONL file, its a
>> whole regular json file.
>>
>>
>> [{"id":"1","entityMetadata":{"lastChange":"2018-05-11
>> 01:09:18.0","createdDateTime":"2018-05-11
>> 01:09:18.0","modifiedDateTime":"2018-05-11
>>
>> 01:09:18.0"},"type":"11"},{"id":"2","entityMetadata":{"lastChange":"2018-05-11
>> 01:09:18.0","createdDateTime":"2018-05-11
>> 01:09:18.0","modifiedDateTime":"2018-05-11
>>
>> 01:09:18.0"},"type":"11"},{"id":"3","entityMetadata":{"lastChange":"2018-05-11
>> 01:09:18.0","createdDateTime":"2018-05-11
>> 01:09:18.0","modifiedDateTime":"2018-05-11
>> 01:09:18.0"},"type":"11"}..]
>>
>>
>> I get OOM on executors whenever i try to load this into spark.
>>
>> Try 1
>> val hdf=spark.read.json("/user/tmp/hugedatafile")
>> hdf.show(2) or hdf.take(1) gives OOM
>>
>> Try 2
>> Took a small sampledatafile and got schema to avoid schema infering
>> val sampleSchema=spark.read.json("/user/tmp/sampledatafile").schema
>> val hdf=spark.read.schema(sampleSchema).json("/user/tmp/hugedatafile")
>> hdf.show(2) or hdf.take(1) stuck for 1.5 hrs and gives OOM
>>
>> Try 3
>> Repartition it after before performing action
>> gives OOM
>>
>> Try 4
>> Read about the https://issues.apache.org/jira/browse/SPARK-20980
>> completely
>> val hdf = spark.read.option("multiLine",
>> true).schema(sampleSchema).json("/user/tmp/hugedatafile")
>> hdf.show(1) or hdf.take(1) gives OOM
>>
>>
>> Can any one help me here?
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
>
> --
> -- Anastasios Zouzias
> 
>
-- 
Twitter: https://twitter.com/holdenkarau


Spark maxTaskFailures is not recognized with Cassandra

2018-06-05 Thread ravidspark
Hi All,

I configured the number of task failures using spark.task.maxFailures as 10
in my spark application which ingests data into Cassandra reading from
Kafka. I observed that when Cassandra service is down, it is not retrying
for the property I set i.e. 10. Instead it is retrying with the default
maxFailures which is 4. Is there something I need to do, to make Spark retry
to connect to Cassandra more than 4 times?

Thanks in Advance,
Ravi



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Dataframe from 1.5G json (non JSONL)

2018-06-05 Thread Anastasios Zouzias
Are you sure that your JSON file has the right format?

spark.read.json(...) expects a file where *each line is a json object*.

My wild guess is that

val hdf=spark.read.json("/user/tmp/hugedatafile")
hdf.show(2) or hdf.take(1) gives OOM

tries to fetch all the data into the driver. Can you reformat your input
file and try again?

Best,
Anastasios



On Tue, Jun 5, 2018 at 8:39 PM, raksja  wrote:

> I have a json file which is a continuous array of objects of similar type
> [{},{}...] for about 1.5GB uncompressed and 33MB gzip compressed.
>
> This is uploaded hugedatafile to hdfs and this is not a JSONL file, its a
> whole regular json file.
>
>
> [{"id":"1","entityMetadata":{"lastChange":"2018-05-11
> 01:09:18.0","createdDateTime":"2018-05-11
> 01:09:18.0","modifiedDateTime":"2018-05-11
> 01:09:18.0"},"type":"11"},{"id":"2","entityMetadata":{"
> lastChange":"2018-05-11
> 01:09:18.0","createdDateTime":"2018-05-11
> 01:09:18.0","modifiedDateTime":"2018-05-11
> 01:09:18.0"},"type":"11"},{"id":"3","entityMetadata":{"
> lastChange":"2018-05-11
> 01:09:18.0","createdDateTime":"2018-05-11
> 01:09:18.0","modifiedDateTime":"2018-05-11
> 01:09:18.0"},"type":"11"}..]
>
>
> I get OOM on executors whenever i try to load this into spark.
>
> Try 1
> val hdf=spark.read.json("/user/tmp/hugedatafile")
> hdf.show(2) or hdf.take(1) gives OOM
>
> Try 2
> Took a small sampledatafile and got schema to avoid schema infering
> val sampleSchema=spark.read.json("/user/tmp/sampledatafile").schema
> val hdf=spark.read.schema(sampleSchema).json("/user/tmp/hugedatafile")
> hdf.show(2) or hdf.take(1) stuck for 1.5 hrs and gives OOM
>
> Try 3
> Repartition it after before performing action
> gives OOM
>
> Try 4
> Read about the https://issues.apache.org/jira/browse/SPARK-20980
> completely
> val hdf = spark.read.option("multiLine",
> true).schema(sampleSchema).json("/user/tmp/hugedatafile")
> hdf.show(1) or hdf.take(1) gives OOM
>
>
> Can any one help me here?
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
-- Anastasios Zouzias



Dataframe from 1.5G json (non JSONL)

2018-06-05 Thread raksja
I have a JSON file which is one continuous array of objects of similar type,
[{},{}...], about 1.5 GB uncompressed and 33 MB gzip compressed.

This hugedatafile is uploaded to HDFS, and it is not a JSONL file; it's a
whole regular JSON file.


[{"id":"1","entityMetadata":{"lastChange":"2018-05-11
01:09:18.0","createdDateTime":"2018-05-11
01:09:18.0","modifiedDateTime":"2018-05-11
01:09:18.0"},"type":"11"},{"id":"2","entityMetadata":{"lastChange":"2018-05-11
01:09:18.0","createdDateTime":"2018-05-11
01:09:18.0","modifiedDateTime":"2018-05-11
01:09:18.0"},"type":"11"},{"id":"3","entityMetadata":{"lastChange":"2018-05-11
01:09:18.0","createdDateTime":"2018-05-11
01:09:18.0","modifiedDateTime":"2018-05-11
01:09:18.0"},"type":"11"}..]


I get an OOM on the executors whenever I try to load this into Spark.

Try 1
val hdf=spark.read.json("/user/tmp/hugedatafile")
hdf.show(2) or hdf.take(1) gives OOM

Try 2
Took a small sampledatafile and got the schema from it to avoid schema inferring
val sampleSchema=spark.read.json("/user/tmp/sampledatafile").schema
val hdf=spark.read.schema(sampleSchema).json("/user/tmp/hugedatafile")
hdf.show(2) or hdf.take(1) is stuck for 1.5 hrs and gives OOM

Try 3
Repartition it before performing the action
gives OOM

Try 4
Read about https://issues.apache.org/jira/browse/SPARK-20980 completely
val hdf = spark.read.option("multiLine",
true).schema(sampleSchema).json("/user/tmp/hugedatafile")
hdf.show(1) or hdf.take(1) gives OOM


Can anyone help me here?



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Writing custom Structured Streaming receiver

2018-06-05 Thread alz2
I'm implementing a simple Structured Streaming Source with the V2 API in
Java. I've taken the Offset logic (regarding startOffset, endOffset,
lastCommittedOffset, etc) from the socket source and also your receivers. 

However, upon startup, for some reason Spark says that the initial offset,
-1, is immediately available. Because -1 is available and not committed, my
streaming query gets triggered with an empty data buffer. After the query
runs, -1 is added to the StreamExecution's committedOffsets. The issue gets
worse from here: as new data is pushed into the internal data buffer, my
currentOffset gets immediately committed (it appears in the
StreamExecution's committedOffsets). So as my currentOffset changes because
new data is pushed into the data buffer, it appears both in availableOffsets
and committedOffsets, causing no new batches to run.

The interesting thing is that my commit function never gets called -- printing
from inside the function doesn't change the behavior, and even providing an
empty commit function doesn't change the behavior.

Any ideas where or why my Offsets are getting committed?

Any help would be appreciated!

Here are my relevant code snippets:

// instance variable declarations
private Offset startOffset = null;
private Offset endOffset = null;
private static volatile SocketOffset currentOffset = new SocketOffset(-1);
private SocketOffset lastOffsetCommitted = new SocketOffset(-1);

// getStartOffset is the same, except that it returns this.startOffset
public Offset getEndOffset() {
  if (this.endOffset == null) throw new IllegalStateException("end offset not set");
  return this.endOffset;
}

public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
  this.startOffset = start.orElse(new SocketOffset(-1));
  this.endOffset = end.orElse(currentOffset);
}





--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Using checkpoint much, much faster than cache. Why?

2018-06-05 Thread Phillip Henry
Hi, folks.

I am using Spark 2.2.0 and a combination of Spark ML's LinearSVC and
OneVsRest to classify some documents that are originally read from HDFS
using sc.wholeTextFiles.

When I use it on small documents, I get the results in a few minutes. When
I use it on the same number of large documents, it takes hours.

NOTE: I munge every document into a fixed-length vector which is the same
size irrespective of the size of the document.

Using jstat, I see all my executor threads in serialization code even
though all the data easily fits into the memory of the cluster ("Fraction
cached: 100%" everywhere).

I have called cache() on all my DataFrames with no effect. However, calling
checkpoint() on the DF fed to Spark's ML code solved the problem.
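
For reference, a minimal sketch of the two calls being compared (the checkpoint
directory and the feature-building step are placeholders):

// Reliable checkpointing needs a checkpoint directory, typically on HDFS.
spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoints")  // placeholder path

val features = buildFeatureVectors(docsDf)  // hypothetical munging step

// cache() keeps the partitions in memory but preserves the full lineage.
val cached = features.cache()

// checkpoint() materializes the data and truncates the lineage, so the
// iterative ML stages no longer drag the long upstream plan around.
val checkpointed = features.checkpoint()    // eager by default

// new OneVsRest().setClassifier(new LinearSVC()).fit(checkpointed) ...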

So, although the problem is fixed, I'd like to know why cache() did not
work when checkpoint() did.

Can anybody explain?

Thanks,

Phill


Re: [Spark Streaming] is spark.streaming.concurrentJobs a per node or a cluster global value ?

2018-06-05 Thread Saisai Shao
"dependent" I mean this batch's job relies on the previous batch's result.
So this batch should wait for the finish of previous batch, if you set "
spark.streaming.concurrentJobs" larger than 1, then the current batch could
start without waiting for the previous batch (if it is delayed), which will
lead to unexpected results.


thomas lavocat wrote on Tue, Jun 5, 2018 at 7:48 PM:

>
> On 05/06/2018 13:44, Saisai Shao wrote:
>
> You need to read the code, this is an undocumented configuration.
>
> I'm on it right now, but, Spark is a big piece of software.
>
> Basically this will break the ordering of Streaming jobs, AFAIK it may get
> unexpected results if you streaming jobs are not independent.
>
> What do you mean exactly by not independent ?
> Are several source joined together dependent ?
>
> Thanks,
> Thomas
>
>
> thomas lavocat wrote on Tue, Jun 5, 2018 at 7:17 PM:
>
>> Hello,
>>
>> Thank's for your answer.
>>
>> On 05/06/2018 11:24, Saisai Shao wrote:
>>
>> spark.streaming.concurrentJobs is a driver side internal configuration,
>> this means that how many streaming jobs can be submitted concurrently in
>> one batch. Usually this should not be configured by user, unless you're
>> familiar with Spark Streaming internals, and know the implication of this
>> configuration.
>>
>>
>> How can I find some documentation about those implications ?
>>
>> I've experimented some configuration of this parameters and found out
>> that my overall throughput is increased in correlation with this property.
>> But I'm experiencing scalability issues. With more than 16 receivers
>> spread over 8 executors, my executors no longer receive work from the
>> driver and fall idle.
>> Is there an explanation ?
>>
>> Thanks,
>> Thomas
>>
>>
>


Re: [Spark Streaming] is spark.streaming.concurrentJobs a per node or a cluster global value ?

2018-06-05 Thread thomas lavocat


On 05/06/2018 13:44, Saisai Shao wrote:

You need to read the code, this is an undocumented configuration.

I'm on it right now, but, Spark is a big piece of software.
Basically this will break the ordering of Streaming jobs, AFAIK it may 
get unexpected results if you streaming jobs are not independent.

What do you mean exactly by not independent ?
Are several source joined together dependent ?

Thanks,
Thomas


thomas lavocat wrote on Tue, Jun 5, 2018 at 7:17 PM:


Hello,

Thank's for your answer.


On 05/06/2018 11:24, Saisai Shao wrote:

spark.streaming.concurrentJobs is a driver side internal
configuration, this means that how many streaming jobs can be
submitted concurrently in one batch. Usually this should not be
configured by user, unless you're familiar with Spark Streaming
internals, and know the implication of this configuration.


How can I find some documentation about those implications ?

I've experimented some configuration of this parameters and found
out that my overall throughput is increased in correlation with
this property.
But I'm experiencing scalability issues. With more than 16
receivers spread over 8 executors, my executors no longer receive
work from the driver and fall idle.
Is there an explanation ?

Thanks,
Thomas





Re: [Spark Streaming] is spark.streaming.concurrentJobs a per node or a cluster global value ?

2018-06-05 Thread Saisai Shao
You need to read the code; this is an undocumented configuration.

Basically this will break the ordering of streaming jobs; AFAIK you may get
unexpected results if your streaming jobs are not independent.

thomas lavocat wrote on Tue, Jun 5, 2018 at 7:17 PM:

> Hello,
>
> Thank's for your answer.
>
> On 05/06/2018 11:24, Saisai Shao wrote:
>
> spark.streaming.concurrentJobs is a driver side internal configuration,
> this means that how many streaming jobs can be submitted concurrently in
> one batch. Usually this should not be configured by user, unless you're
> familiar with Spark Streaming internals, and know the implication of this
> configuration.
>
>
> How can I find some documentation about those implications ?
>
> I've experimented some configuration of this parameters and found out that
> my overall throughput is increased in correlation with this property.
> But I'm experiencing scalability issues. With more than 16 receivers
> spread over 8 executors, my executors no longer receive work from the
> driver and fall idle.
> Is there an explanation ?
>
> Thanks,
> Thomas
>
>


Re: [Spark Streaming] is spark.streaming.concurrentJobs a per node or a cluster global value ?

2018-06-05 Thread thomas lavocat

Hello,

Thank's for your answer.


On 05/06/2018 11:24, Saisai Shao wrote:
spark.streaming.concurrentJobs is a driver side internal 
configuration, this means that how many streaming jobs can be 
submitted concurrently in one batch. Usually this should not be 
configured by user, unless you're familiar with Spark Streaming 
internals, and know the implication of this configuration.


How can I find some documentation about those implications ?

I've experimented with some configurations of this parameter and found out
that my overall throughput increases in correlation with this property.
But I'm experiencing scalability issues: with more than 16 receivers
spread over 8 executors, my executors no longer receive work from the
driver and fall idle.

Is there an explanation?

Thanks,
Thomas



Strange codegen error for SortMergeJoin in Spark 2.2.1

2018-06-05 Thread Rico Bergmann
Hi!

I get a strange error when executing a complex SQL-query involving 4
tables that are left-outer-joined:

Caused by: org.codehaus.commons.compiler.CompileException: File 
'generated.java', Line 37, Column 18: failed to compile: 
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 37, 
Column 18: No applicable constructor/method found for actual parameters "int"; 
candidates are: 
"org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray(org.apache.spark.memory.TaskMemoryManager,
 org.apache.spark.storage.BlockManager, 
org.apache.spark.serializer.SerializerManager, org.apache.spark.TaskContext, 
int, long, int, int)", 
"org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray(int, int)"

...

/* 037 */ smj_matches = new org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray(2147483647);

The same query works with Spark 2.2.0.

I checked the Spark source code and saw that in
ExternalAppendOnlyUnsafeRowArray a second int was introduced into the
constructor in 2.2.1

But looking at the codegeneration part of SortMergeJoinExec:

// A list to hold all matched rows from right side.
val matches = ctx.freshName("matches")
val clsName = classOf[ExternalAppendOnlyUnsafeRowArray].getName

val spillThreshold = getSpillThreshold
val inMemoryThreshold = getInMemoryThreshold

ctx.addMutableState(clsName, matches,
  s"$matches = new $clsName($inMemoryThreshold, $spillThreshold);")

the constructor call should get two parameters, not just one.


Maybe someone has an idea?


Best,

Rico.



Reg:- Py4JError in Windows 10 with Spark

2018-06-05 Thread @Nandan@
Hi,
I am getting this error:

---
Py4JError Traceback (most recent call last)
 in ()
3 TOTAL = 100
4 dots = sc.parallelize([2.0 * np.random.random(2) - 1.0 for i in range(
TOTAL)]).cache()
> 5 print("Number of random points:", dots.count())
6
7 stats = dots.stats()
C:\opt\spark\python\pyspark\rdd.py in count(self)
1039 3
1040 """
-> 1041 return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
1042
1043 def stats(self):
C:\opt\spark\python\pyspark\rdd.py in sum(self)
1030 6.0
1031 """
-> 1032 return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
1033
1034 def count(self):
C:\opt\spark\python\pyspark\rdd.py in fold(self, zeroValue, op)
904 # zeroValue provided to each partition is unique from the one provided
905 # to the final reduce call
--> 906 vals = self.mapPartitions(func).collect()
907 return reduce(op, vals, zeroValue)
908
C:\opt\spark\python\pyspark\rdd.py in collect(self)
807 """
808 with SCCallSiteSync(self.context) as css:
--> 809 port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
810 return list(_load_from_socket(port, self._jrdd_deserializer))
811
C:\opt\spark\python\pyspark\rdd.py in _jrdd(self)
2453
2454 wrapped_func = _wrap_function(self.ctx, self.func,
self._prev_jrdd_deserializer,
-> 2455 self._jrdd_deserializer, profiler)
2456 python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
wrapped_func,
2457 self.preservesPartitioning)
C:\opt\spark\python\pyspark\rdd.py in _wrap_function(sc, func,
deserializer, serializer, profiler)
2388 pickled_command, broadcast_vars, env, includes =
_prepare_for_python_RDD(sc, command)
2389 return sc._jvm.PythonFunction(bytearray(pickled_command), env,
includes, sc.pythonExec,
-> 2390 sc.pythonVer, broadcast_vars, sc._javaAccumulator)
2391
2392
C:\ProgramData\Anaconda3\lib\site-packages\py4j\java_gateway.py in __call__(self, *args)
1426 answer = self._gateway_client.send_command(command)
1427 return_value = get_return_value(
-> 1428 answer, self._gateway_client, None, self._fqn)
1429
1430 for temp_arg in temp_args:
C:\opt\spark\python\pyspark\sql\utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
C:\ProgramData\Anaconda3\lib\site-packages\py4j\protocol.py in
get_return_value(answer, gateway_client, target_id, name)
322 raise Py4JError(
323 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
--> 324 format(target_id, ".", name, value))
325 else:
326 raise Py4JError(
Py4JError: An error occurred while calling
None.org.apache.spark.api.python.PythonFunction. Trace:
py4j.Py4JException: Constructor
org.apache.spark.api.python.PythonFunction([class [B, class
java.util.HashMap, class java.util.ArrayList, class java.lang.String, class
java.lang.String, class java.util.ArrayList, class
org.apache.spark.api.python.PythonAccumulatorV2]) does not exist
at
py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:179)
at
py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:196)
at py4j.Gateway.invoke(Gateway.java:235)
at
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)


I installed Spark 2.0.2 on Windows 10 and the code is as below:

> sc = SparkContext.getOrCreate()
> sc
> import numpy as np
> TOTAL = 100
> dots = sc.parallelize([2.0 * np.random.random(2) - 1.0 for i in
> range(TOTAL)]).cache()
> print("Number of random points:", dots.count())
> stats = dots.stats()
> print('Mean:', stats.mean())
> print('stdev:', stats.stdev())


I get the error when I run the NumPy code.
Please tell me what's wrong here.


Re: [Spark Streaming] is spark.streaming.concurrentJobs a per node or a cluster global value ?

2018-06-05 Thread Saisai Shao
spark.streaming.concurrentJobs is a driver-side internal configuration; it
controls how many streaming jobs can be submitted concurrently in one batch.
Usually this should not be configured by the user, unless you're familiar
with Spark Streaming internals and know the implications of this
configuration.
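
For context, it is set like any other Spark conf entry on the driver; a
minimal sketch (the batch interval and the value 2 are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Read once by the driver's job scheduler at startup: it is a single,
// driver-side (cluster-global) setting, not a per-node one.
val conf = new SparkConf()
  .setAppName("concurrent-jobs-example")
  .set("spark.streaming.concurrentJobs", "2")  // default is 1

val ssc = new StreamingContext(conf, Seconds(5))
// define the streams, then: ssc.start(); ssc.awaitTermination()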



thomas lavocat wrote on Tue, Jun 5, 2018 at 4:20 PM:

> Hi everyone,
>
> I'm wondering whether the property spark.streaming.concurrentJobs should
> reflect the total number of possible concurrent tasks on the cluster, or
> a local number of concurrent tasks on one compute node.
>
> Thanks for your help.
>
> Thomas
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


is there a way to parse and modify raw spark sql query?

2018-06-05 Thread kant kodali
Hi All,

Is there a way to parse and modify a raw Spark SQL query?

For example, given the following query

spark.sql("select hello from view")

I want to modify the query or the logical plan such that I get a result
equivalent to the query below.

spark.sql("select foo, hello from view")

Any sample code will be great!
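
Not a full plan-rewriting answer, but as a sketch: the Catalyst parser can be
used to inspect or transform the parsed plan, and for this particular example
the extra column can also be obtained at the DataFrame level (the view and
column names are the ones from the question; `spark` is assumed to be an
active SparkSession):

import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// Parse the raw SQL text into an unresolved logical plan that can be
// inspected or rewritten with transform { ... }.
val plan = CatalystSqlParser.parsePlan("select hello from view")
println(plan.treeString)

// Equivalent result for this specific query, without touching the plan:
val modified = spark.table("view").selectExpr("foo", "hello")
modified.show()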

Thanks


Re: Help explaining explain() after DataFrame join reordering

2018-06-05 Thread Matteo Cossu
Hello,

as explained here, the join order can be changed by the optimizer. The
difference introduced in Spark 2.2 is that the reordering is based on
statistics instead of heuristics, which can appear "random" and in some
cases decrease performance.
If you want more control over the join order, you can define your own Rule;
there is an example here.
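
For reference, a custom rule can be plugged in through
spark.experimental.extraOptimizations; a minimal do-nothing skeleton (the
rule body is a placeholder - a real join-ordering policy would go inside the
transform):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Skeleton of a custom optimizer rule.
object MyJoinOrderRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case p => p  // no-op placeholder; rewrite Join nodes here
  }
}

val spark = SparkSession.builder().getOrCreate()
spark.experimental.extraOptimizations = Seq(MyJoinOrderRule)

Rules registered this way run as an extra optimizer batch alongside the
built-in ones.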


Best,

Matteo


On 1 June 2018 at 18:31, Mohamed Nadjib MAMI 
wrote:

> Dear Sparkers,
>
> I'm loading into DataFrames data from 5 sources (using official
> connectors): Parquet, MongoDB, Cassandra, MySQL and CSV. I'm then joining
> those DataFrames in two different orders.
> - mongo * cassandra * jdbc * parquet * csv (random order).
> - parquet * csv * cassandra * jdbc * mongodb (optimized order).
>
> The first follows a random order, whereas the second I'm deciding based on
> some optimization techniques (can provide details for the interested
> readers or if needed here).
>
> After the evaluation on increasing sizes of data, the optimization
> techniques I developed didn't improve the performance very noticeably. I
> inspected the Logical/Physical plan of the final joined DataFrame (using
> `explain(true)`). The 1st order was respected, whereas the 2nd order, it
> turned out, wasn't respected, and MongoDB was queried first.
>
> However, that is only how it seemed to me; I'm not quite confident reading
> the plans (returned using explain(true)). Could someone help explain the
> `explain(true)` output? (pasted in this gist). Is there a way we could
> enforce the given order?
>
> I'm using Spark 2.1, so I think it doesn't include the new cost-based
> optimizations (introduced in Spark 2.2).
>
> *Regards, Grüße, **Cordialement,** Recuerdos, Saluti, προσρήσεις, 问候,
> تحياتي.*
> *Mohamed Nadjib Mami*
> *Research Associate @ Fraunhofer IAIS - PhD Student @ Bonn University*
> *About me! *
> *LinkedIn *
>


[Spark Streaming] is spark.streaming.concurrentJobs a per node or a cluster global value ?

2018-06-05 Thread thomas lavocat

Hi everyone,

I'm wondering whether the property spark.streaming.concurrentJobs should
reflect the total number of possible concurrent tasks on the cluster, or
a local number of concurrent tasks on one compute node.


Thanks for your help.

Thomas


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: spark partitionBy with partitioned column in json output

2018-06-05 Thread Elior Malul
I had the same issue myself. I was surprised at first as well, but I found
it useful - the amount of data saved for each partition decreased.
When I load the data from each partition, I add the partitioned columns back
with the lit function before I merge the frames from the different
partitions.
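
A sketch of that pattern, plus the alternative of letting Spark rediscover the
partition columns (the directory layout and values follow the bucket/actionType
example quoted below):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit

val spark = SparkSession.builder().getOrCreate()

// Option A (described above): load one partition directory and add the
// partition values back as literal columns before unioning the frames.
val b01a1 = spark.read
  .json("output.json/bucket=B01/actionType=A1")
  .withColumn("bucket", lit("B01"))
  .withColumn("actionType", lit("A1"))

// Option B: read the root path instead; partition discovery re-creates
// bucket and actionType as columns from the directory names.
val all = spark.read.json("output.json")
all.printSchema()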

On Tue, Jun 5, 2018 at 5:44 AM, Jay  wrote:

> The partitionBy clause is used to create hive folders so that you can
> point a hive partitioned table on the data .
>
> What are you using the partitionBy for ? What is the use case ?
>
>
> On Mon 4 Jun, 2018, 4:59 PM purna pradeep, 
> wrote:
>
>> im reading below json in spark
>>
>> {"bucket": "B01", "actionType": "A1", "preaction": "NULL",
>> "postaction": "NULL"}
>> {"bucket": "B02", "actionType": "A2", "preaction": "NULL",
>> "postaction": "NULL"}
>> {"bucket": "B03", "actionType": "A3", "preaction": "NULL",
>> "postaction": "NULL"}
>>
>> val df=spark.read.json("actions.json").toDF()
>>
>> Now im writing the same to a json output as below
>>
>> df.write. format("json"). mode("append"). 
>> partitionBy("bucket","actionType").
>> save("output.json")
>>
>>
>> and the output.json is as below
>>
>> {"preaction":"NULL","postaction":"NULL"}
>>
>> bucket,actionType columns are missing in the json output, i need
>> partitionby columns as well in the output
>>
>>