Spark Sparser library

2018-08-09 Thread umargeek
Hi Team,

Please let me know which Spark Sparser library/jar I need to include when submitting the
Spark application in order to use the format mentioned below,

val df = spark.read.format("edu.stanford.sparser.json")

When I use the above format it throws a ClassNotFoundException.
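
For reference, a custom data source format like this only resolves if the jar that
provides it is on the application classpath, so it usually has to be passed at submit
time. A minimal sketch, assuming a locally built Sparser jar whose path and file name
are hypothetical:

spark-submit --jars /path/to/sparser-spark-assembly.jar your-application.jar

--jars also ships the jar to the executors, so the same format string can then be used
on the driver and in tasks.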

Thanks,
Umar







MultilayerPerceptronClassifier

2018-08-09 Thread Mina Aslani
Hi,
I have a couple of questions regarding using MultilayerPerceptronClassifier
in pyspark.
- Do we have any options for the solver other than solver='l-bfgs'?
- Also, I tried to tune it using cross-validation and find the best model based
on the defined parameters (e.g. maxIter, layers, tol, seed). When I get
the best model and try to see the different parameters using getSeed()/
getTol()/getMaxIter(), etc., I get the error below:

'MultilayerPerceptronClassificationModel' object has no attribute
'getSeed'/'getTol'/'getMaxIter'.


The only attribute for the best model that I can get is the layers.


Any idea?
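
For reference, a minimal sketch of one way to recover the winning parameter
combination from the cross-validation itself, since the fitted
MultilayerPerceptronClassificationModel does not expose
getMaxIter()/getTol()/getSeed() here. The DataFrame name `train`, the layer sizes
and the grid values are placeholders:

import numpy as np
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

mlp = MultilayerPerceptronClassifier(layers=[4, 5, 3], seed=42)
grid = (ParamGridBuilder()
        .addGrid(mlp.maxIter, [50, 100])
        .addGrid(mlp.tol, [1e-6, 1e-4])
        .build())
cv = CrossValidator(estimator=mlp, estimatorParamMaps=grid,
                    evaluator=MulticlassClassificationEvaluator(),
                    numFolds=3)
cv_model = cv.fit(train)

# avgMetrics lines up with the param grid, so the best combination can be read
# from the grid instead of from the fitted MLP model (argmax assumes the
# evaluator's metric is larger-is-better, as for the default f1).
best_idx = int(np.argmax(cv_model.avgMetrics))
for param, value in cv.getEstimatorParamMaps()[best_idx].items():
    print(param.name, value)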


Best regards,

Mina


[Structured Streaming] Two watermarks and StreamingQueryListener

2018-08-09 Thread subramgr
Hi, 

We have two *flatMapGroupsWithState* calls in our job and two
*withWatermark* calls.

We are getting the event max time, event time and watermarks from
*QueryProgressEvent*. 

Right now it just returns one *watermark* value. 

Are two watermarks maintained by Spark, or just one?
If only one, which one?
If one watermark is maintained per *Dataframe*, how do I get the values for
each of them?
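
For reference, the progress object carries a single watermark value; a minimal
sketch of where it appears (PySpark shown here, the field names are the same in
the JSON payload a Scala StreamingQueryListener receives):

# 'query' is a started StreamingQuery with at least one withWatermark
progress = query.lastProgress  # same payload as QueryProgressEvent.progress
if progress is not None and "eventTime" in progress:
    # keys typically present: "avg", "max", "min", "watermark"
    print(progress["eventTime"].get("watermark"))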







How does mapPartitions function work in Spark streaming on DStreams?

2018-08-09 Thread zakhavan
Hello,

I am running a Spark Streaming program on a dataset which is a sequence of
numbers in text file format. I read the text file and load it into a Kafka
topic, then run the Spark Streaming program on the DStream, and finally
write the result into an output text file. But I'm getting an almost totally
different result compared to running the program without Spark Streaming.

I'm using mapPartitions and it seems it shuffles the data and messes it up.
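
For context, mapPartitions hands the function one iterator per partition and
processes partitions independently, so a computation that slides windows across
sample boundaries (like the STA/LTA below) never sees data from neighbouring
partitions, and output order is per partition rather than one global sequence.
A rough sketch of the call shape, assuming records are plain numeric strings and
using the function from the code below (names are illustrative):

def sta_lta_per_partition(records):
    values = np.array([float(r) for r in records])
    if values.size == 0:
        return iter([])
    # each partition computes its own characteristic function; partition
    # boundaries are not stitched together
    return iter(classic_sta_lta_py(values).tolist())

processed = dstream.mapPartitions(sta_lta_per_partition)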

Here is my code in Spark streaming and using Kafka:

from __future__ import print_function

import sys
from operator import add

from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import numpy as np
from collections import deque
import matplotlib.pyplot as plt
import pandas as pd

#---
def classic_sta_lta_py(a):
    nsta = 2
    nlta = 30
    #

    print("a=", a)

    sta = np.cumsum(a ** 2)
    print("sta1=", sta)
    # sta = np.cumsum(a * a, dtype=float)
    # print("{}. sta array is: ".format(sta))

    # Convert to float
    sta = np.require(sta, dtype=np.float)
    print("sta2=", sta)

    # Copy for LTA
    lta = sta.copy()
    print("lta=", lta)

    # Compute the STA and the LTA
    sta[nsta:] = sta[nsta:] - sta[:-nsta]
    sta /= nsta
    lta[nlta:] = lta[nlta:] - lta[:-nlta]
    lta /= nlta

    # Pad zeros
    sta[:nlta - 1] = 0

    # Avoid division by zero by setting zero values to tiny float
    dtiny = np.finfo(0.0).tiny
    idx = lta < dtiny
    lta[idx] = dtiny

    return sta / lta
#---

def trigger_onset(charfct):
    """
    Calculate trigger on and off times.

    Given thres1 and thres2 calculate trigger on and off times from
    characteristic function.

    This method is written in pure Python and gets slow as soon as there
    are more than 1e6 triggerings ("on" AND "off") in charfct --- normally
    this does not happen.

    :type charfct: NumPy :class:`~numpy.ndarray`
    :param charfct: Characteristic function of e.g. STA/LTA trigger
    :type thres1: float
    :param thres1: Value above which trigger (of characteristic function)
        is activated (higher threshold)
    :type thres2: float
    :param thres2: Value below which trigger (of characteristic function)
        is deactivated (lower threshold)
    :type max_len: int
    :param max_len: Maximum length of triggered event in samples. A new
        event will be triggered as soon as the signal reaches
        again above thres1.
    :type max_len_delete: bool
    :param max_len_delete: Do not write events longer than max_len into
        report file.
    :rtype: list
    :return: Nested list of trigger on and off times in samples
    """
    # 1) find indices of samples greater than threshold
    # 2) calculate trigger "off" times by the gap in trigger indices
    #    above the threshold, i.e. the difference of two following indices
    #    in ind is greater than 1
    # 3) in principle the same as for "off", just add one to the index to get
    #    start times; this operation is not supported on the compact
    #    syntax
    # 4) as long as there is an on time greater than the actual off time, find
    #    trigger on states which are greater than the last off state and the
    #    corresponding off state which is greater than the current on state
    # 5) if the signal stays above thres2 longer than max_len, an event
    #    is triggered and following a new event can be triggered as soon as
    #    the signal is above thres1
    thres1 = 4
    thres2 = 2
    max_len = 9e99
    max_len_delete = False
    # charfct = []
    # for x in iterator:
    #     print(x)
    #     charfct.append(x)
    ind1 = np.where(charfct > thres1)[0]
    if len(ind1) == 0:
        return []
    ind2 = np.where(charfct > thres2)[0]
    #
    on = deque([ind1[0]])
    of = deque([-1])
    # determine the indices where charfct falls below off-threshold
    ind2_ = np.empty_like(ind2, dtype=bool)
    ind2_[:-1] = np.diff(ind2) > 1
    # last occurrence is missed by the diff, add it manually
    ind2_[-1] = True
    of.extend(ind2[ind2_].tolist())
    on.extend(ind1[np.where(np.diff(ind1) > 1)[0] + 1].tolist())
    # include last pick if trigger is on or drop it
    if max_len_delete:
        # drop it
        of.extend([1e99])
        on.extend([on[-1]])
    else:
        # include it
        of.extend([ind2[-1]])
    #
    pick = []
    while on[-1] > of[0]:
        while on[0] <= of[0]:
            on.popleft()
        while of[0] < on[0]:
            of.popleft()
        if of[0] - on[0] > max_len:
            if max_len_delete:
                on.popleft()
                continue
            of.appendleft(on[0] + max_len)
        pick.append([on[0], of[0]])
    return np.array(pick, dtype=np.int64)
# #---

def saveRec(rdd):
    rdd.foreach(lambda rec:

Kryoserializer with pyspark

2018-08-09 Thread Hichame El Khalfi
Hello there !!!

Is there any benefit from tuning `spark.kryoserializer.buffer` and
`spark.kryoserializer.buffer.max` if we just use pyspark with no Java or Scala
classes?
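
For reference, these are the settings in question together with their default
values; a minimal sketch of how they would be applied from PySpark (whether they
matter for a pure-Python job is exactly the question here):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
         .config("spark.kryoserializer.buffer", "64k")      # default
         .config("spark.kryoserializer.buffer.max", "64m")  # default
         .getOrCreate())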

Thanks for your help,

Hichame



Re: Implementing .zip file codec

2018-08-09 Thread mytramesh
Spark doesn't support reading zip files directly, since zip is not a splittable
(distributable) file format.

Read it using the java.util.zip.ZipInputStream API and prepare an RDD (4 GB limit):

import java.util.zip.ZipInputStream
import scala.io.Source
import org.apache.spark.input.PortableDataStream

var zipPath = "s3:// ABC.zip"

val rdd = sc.binaryFiles(zipPath).flatMap((file: (String, PortableDataStream)) => {
  var zipStream = new ZipInputStream(file._2.open)
  val entry = zipStream.getNextEntry
  var iter: Iterator[String] = null

  iter = Source.fromInputStream(zipStream, "ISO_8859_1").getLines

  iter
})


If the zip file is more than 4 GB, use:

import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream






Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-09 Thread Koert Kuipers
thanks for that long reply jungtaek!

so when i set spark.sql.shuffle.partitions to 2048 i have 2048 data
partitions (or "partitions of state"). these are determined by a hashing
function. ok got it!

when i look at the application manager i also see 2048 "tasks" for the
relevant stage. so tasks here is not the same as parallelism, which is
defined by number of executors * number of cores. and i see these tasks are
being queued up to be processed. i have learned to watch the number of
tasks in shuffle carefully, since it's the unit of work, and because when i
have memory issues (OOM) it usually means i need to split the data up more,
so increase the tasks.

so is it reasonable to assume for the shuffle that a task maps to a single
data partition being processed?

if so, then when i do a coalesce(100) after a shuffle i see only 100 tasks
for the stage of shuffle. what does this mean? does this mean a task no
longer maps to a single data partition being processed, and i still have
2048 data partitions? if so, does every task process multiple data
partitions sequentially? and does this not increase my chances of OOM
because the data partitions are processed sequentially within a task?


On Thu, Aug 9, 2018 at 3:10 AM, Jungtaek Lim  wrote:

> I could be wrong so welcome anyone to correct me if I'm missing here.
>
> You can expect Spark operators in narrow dependency as applying wrapped
> functions to an iterator (like "op3(op2(op1(iter)))"), and with such
> expectation there's no way to achieve adjusting partitions. Each partition
> is independent from others and there's no communication between tasks.
>
> So if you have 1000 partitions (in terms of parallelism, not data
> partitions) and willing to reduce down (or scale out) to some arbitrary
> number of partitions, it would require moving of data and requires shuffle.
>
> The meaning of "spark.sql.shuffle.partitions" is especially important for
> structured streaming because it defines data partitions of state. For
> structured streaming, there're couple of operations which leverage state
> which is stored to the file system. The state is partitioned by key
> columns, and "spark.sql.shuffle.partitions" data partitions are generated.
> Due to the nature of hash function, once you run the streaming query,
> "spark.sql.shuffle.partitions" keeps unchanged (Spark doesn't allow
> reconfigure for the config).
>
> So the value of configuration represents data partitions of state, as well
> as max parallelism of stateful operators. If we want to have less
> parallelism (not same as number of partitions), we should apply coalesce to
> the operator and the number of partitions are still kept unchanged whereas
> it incurs less parallelism and also less tasks.
>
> We just can't apply coalesce to individual operator in narrow dependency.
>
> -Jungtaek Lim (HeartSaVioR)
> On Thu, Aug 9, 2018 at 3:07 PM, Koert Kuipers wrote:
>
>> well an interesting side effect of this is that i can now control the
>> number of partitions for every shuffle in a dataframe job, as opposed to
>> having a single setting for number of partitions across all shuffles.
>>
>> basically i can set spark.sql.shuffle.partitions to some huge number, and
>> then for every groupByKey (or any other shuffle operation) follow it up
>> with a coalesce to set the number of partitions. its like i have
>> numPartitions back from those good old RDD shuffle methods :)
>>
>>
>> On Thu, Aug 9, 2018 at 1:38 AM, Koert Kuipers  wrote:
>>
>>> an new map task after a shuffle is also a narrow dependency, isnt it?
>>> its narrow because data doesn't need to move, e.g. every partition depends
>>> on single partition, preferably on same machine.
>>>
>>> modifying a previous shuffle to avoid a shuffle strikes me as odd, and
>>> can potentially make a mess of performance, especially when no shuffle is
>>> needed. just a new map task.
>>>
>>>
>>> On Thu, Aug 9, 2018 at 1:15 AM, Jungtaek Lim  wrote:
>>>
 > shouldnt coalesce introduce a new map-phase with less tasks instead
 of changing the previous shuffle?

 The javadoc of Dataset.coalesce [1] describes such behavior clearly. It
 results in narrow dependency, hence no shuffle.

 So it is pretty clear that you need to use "repartition". Not sure
 there's any available trick to achieve it without calling repartition.

 Thanks,
 Jungtaek Lim (HeartSaVioR)

 1. https://github.com/apache/spark/blob/a40806d2bd84e9a0308165f0d6c97e
 9cf00aa4a3/sql/core/src/main/scala/org/apache/spark/sql/
 Dataset.scala#L2918-L2937


 On Thu, Aug 9, 2018 at 5:55 AM, Koert Kuipers wrote:

> sorry i meant to say:
> wit a checkpoint i get a map phase with lots of tasks to read the
> data, then a reduce phase with 2048 reducers, and then finally a map phase
> with 100 tasks.
>
> On Wed, Aug 8, 2018 at 4:54 PM, Koert Kuipers 
> wrote:
>
>> the only thing that seems to stop this so far is a checkpoint.
>>

Re: Structured Streaming doesn't write checkpoint log when I use coalesce

2018-08-09 Thread Jungtaek Lim
Which version do you use? The app above works with Spark 2.3.1; 200 partitions
are stored for state.

val queryStatusFile = conf.queryStatusFile()
val rateRowPerSecond = conf.rateRowPerSecond()
val rateRampUpTimeSecond = conf.rateRampUpTimeSecond()

val ss = SparkSession
  .builder()
  .master("local[3]")
  .appName("state coalesce test")
  .getOrCreate()

ss.streams.addListener(new QueryListenerWriteProgressToFile(queryStatusFile))

import ss.implicits._

val df = ss.readStream
  .format("rate")
  .option("rowsPerSecond", rateRowPerSecond)
  .option("rampUpTime", s"${rateRampUpTimeSecond}s")
  .load()

df.printSchema()

val outDf = df.withWatermark("timestamp", "10 seconds")
  .selectExpr(
    "timestamp", "mod(value, 100) as mod", "value",
    BenchmarkQueryHelper.createCaseExprStr(
      "mod(CAST(RANDN(0) * 1000 as INTEGER), 50)", 50, 10) + " as word")
  .groupBy(
    window($"timestamp", "1 minute", "10 seconds"),
    $"mod", $"word")
  .agg(max("value").as("max_value"), min("value").as("min_value"),
    avg("value").as("avg_value"))
  .coalesce(8)

val query = outDf.writeStream
  .format("memory")
  .option("queryName", "stateCoalesceTest")
  .option("checkpointLocation", "/tmp/state-coalesce-test")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .outputMode(OutputMode.Update())
  .start()

query.awaitTermination()

-Jungtaek Lim (HeartSaVioR)


On Thu, Aug 9, 2018 at 8:38 PM, WangXiaolong wrote:

> Hi,
>
>Lately, I encountered a problem, when I was writing as structured
> streaming job to write things into opentsdb.
>   The write-stream part looks something like
>
>   outputDs
>   .coalesce(14)
>   .writeStream
>   .outputMode("append")
>   .trigger(Trigger.ProcessingTime(s"$triggerSeconds seconds"))
>   .option("checkpointLocation",s"$checkpointDir/$appName/tsdb")
>   .foreach {
> TsdbWriter(
>   tsdbUrl,
>   MongoProp(mongoUrl, mongoPort, mongoUser, mongoPassword,
> mongoDatabase, mongoCollection,mongoAuthenticationDatabase)
> )(createMetricBuilder(tsdbMetricPrefix))
>   }
>   .start()
>
> And when I check the checkpoint dir, I discover that the
> "/checkpoint/state" dir  is empty. I looked into the executor's log and
> found that the HDFSBackedStateStoreProvider didn't write anything on the
> checkpoint dir.
>
>Strange thing is, when I replace the "coalesce" function into
> "repartition" function, the problem solved. Is there a difference between
> these two functions when using structured streaming?
>
>   Looking forward to you help, thanks.
>
>
>
>
>


Structured Streaming doesn't write checkpoint log when I use coalesce

2018-08-09 Thread WangXiaolong
Hi,


   Lately I encountered a problem when I was writing a Structured Streaming
job to write things into OpenTSDB.
  The write-stream part looks something like:


  outputDs
  .coalesce(14)
  .writeStream
  .outputMode("append")
  .trigger(Trigger.ProcessingTime(s"$triggerSeconds seconds"))
  .option("checkpointLocation",s"$checkpointDir/$appName/tsdb")
  .foreach {
TsdbWriter(
  tsdbUrl,
  MongoProp(mongoUrl, mongoPort, mongoUser, mongoPassword, 
mongoDatabase, mongoCollection,mongoAuthenticationDatabase)
)(createMetricBuilder(tsdbMetricPrefix))
  }
  .start()


And when I check the checkpoint dir, I discover that the
"/checkpoint/state" dir is empty. I looked into the executor's log and found
that the HDFSBackedStateStoreProvider didn't write anything to the checkpoint
dir.


   The strange thing is, when I replace the "coalesce" function with the "repartition"
function, the problem is solved. Is there a difference between these two functions
when using Structured Streaming?


  Looking forward to your help, thanks.



Understanding spark.executor.memoryOverhead

2018-08-09 Thread Akash Mishra
Hi *,

I would like to know which part of the Spark codebase
uses spark.executor.memoryOverhead. I have a job which has
spark.executor.memory=18g
but it requires spark.executor.memoryOverhead=4g for the same process,
otherwise I get a task error:

ExecutorLostFailure (executor 28 exited caused by one of the running tasks)
Reason: Container killed by YARN for exceeding memory limits. 23.7 GB of 22
GB physical memory used. Consider boosting
spark.yarn.executor.memoryOverhead.

There is only 1 task running on this executor and the JVM heap usage is around
14 GB. I am not able to understand what exactly is using 5.7 GB of
memory other than the Java heap?

Is it netty for block read or something else?
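
For reference, a minimal sketch of raising the overhead explicitly for a YARN job
(Spark 2.x property name, value in MiB, numbers taken from this thread):

spark-submit \
  --conf spark.executor.memory=18g \
  --conf spark.yarn.executor.memoryOverhead=4096 \
  ...

The overhead pool covers what lives in the container outside the JVM heap: metaspace
and thread stacks, off-heap/direct buffers (including Netty's buffers for shuffle and
block transfer), and the Python worker processes for PySpark jobs.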



-- 

Regards,
Akash Mishra.


"It's not our abilities that make us, but our decisions."--Albus Dumbledore


Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-09 Thread Jungtaek Lim
I could be wrong, so I welcome anyone to correct me if I'm missing something here.

You can think of Spark operators in a narrow dependency as applying wrapped
functions to an iterator (like "op3(op2(op1(iter)))"), and with such an
expectation there's no way to adjust the number of partitions. Each partition
is independent of the others and there's no communication between tasks.

So if you have 1000 partitions (in terms of parallelism, not data
partitions) and want to reduce down (or scale out) to some arbitrary
number of partitions, it requires moving data and therefore requires a shuffle.

The meaning of "spark.sql.shuffle.partitions" is especially important for
structured streaming because it defines the data partitions of state. For
structured streaming, there are a couple of operations which leverage state,
which is stored to the file system. The state is partitioned by the key
columns, and "spark.sql.shuffle.partitions" data partitions are generated.
Due to the nature of the hash function, once you run the streaming query,
"spark.sql.shuffle.partitions" stays unchanged (Spark doesn't allow
reconfiguring it).

So the value of the configuration represents the data partitions of state, as well
as the max parallelism of stateful operators. If we want less
parallelism (not the same as the number of partitions), we can apply coalesce to
the operator; the number of partitions is still kept unchanged, whereas
it incurs less parallelism and also fewer tasks.

We just can't apply coalesce to an individual operator in a narrow dependency.
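
A small sketch of the practical difference, assuming
spark.sql.shuffle.partitions=2048 and an arbitrary DataFrame df (PySpark syntax,
names are illustrative):

counts = df.groupBy("key").count()

# coalesce is a narrow dependency: it folds into the aggregation stage, so the
# groupBy itself runs with only 100 tasks (100 partitions of parallelism)
counts.coalesce(100).write.parquet("/tmp/out_coalesce")

# repartition adds an extra shuffle: the groupBy keeps its 2048 tasks and a
# separate 100-partition exchange follows, at the cost of moving the data again
counts.repartition(100).write.parquet("/tmp/out_repartition")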

-Jungtaek Lim (HeartSaVioR)
On Thu, Aug 9, 2018 at 3:07 PM, Koert Kuipers wrote:

> well an interesting side effect of this is that i can now control the
> number of partitions for every shuffle in a dataframe job, as opposed to
> having a single setting for number of partitions across all shuffles.
>
> basically i can set spark.sql.shuffle.partitions to some huge number, and
> then for every groupByKey (or any other shuffle operation) follow it up
> with a coalesce to set the number of partitions. its like i have
> numPartitions back from those good old RDD shuffle methods :)
>
>
> On Thu, Aug 9, 2018 at 1:38 AM, Koert Kuipers  wrote:
>
>> an new map task after a shuffle is also a narrow dependency, isnt it? its
>> narrow because data doesn't need to move, e.g. every partition depends on
>> single partition, preferably on same machine.
>>
>> modifying a previous shuffle to avoid a shuffle strikes me as odd, and
>> can potentially make a mess of performance, especially when no shuffle is
>> needed. just a new map task.
>>
>>
>> On Thu, Aug 9, 2018 at 1:15 AM, Jungtaek Lim  wrote:
>>
>>> > shouldnt coalesce introduce a new map-phase with less tasks instead of
>>> changing the previous shuffle?
>>>
>>> The javadoc of Dataset.coalesce [1] describes such behavior clearly. It
>>> results in narrow dependency, hence no shuffle.
>>>
>>> So it is pretty clear that you need to use "repartition". Not sure
>>> there's any available trick to achieve it without calling repartition.
>>>
>>> Thanks,
>>> Jungtaek Lim (HeartSaVioR)
>>>
>>> 1.
>>> https://github.com/apache/spark/blob/a40806d2bd84e9a0308165f0d6c97e9cf00aa4a3/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2918-L2937
>>>
>>>
>>> On Thu, Aug 9, 2018 at 5:55 AM, Koert Kuipers wrote:
>>>
 sorry i meant to say:
 wit a checkpoint i get a map phase with lots of tasks to read the data,
 then a reduce phase with 2048 reducers, and then finally a map phase with
 100 tasks.

 On Wed, Aug 8, 2018 at 4:54 PM, Koert Kuipers 
 wrote:

> the only thing that seems to stop this so far is a checkpoint.
>
> wit a checkpoint i get a map phase with lots of tasks to read the
> data, then a reduce phase with 2048 reducers, and then finally a map phase
> with 4 tasks.
>
> now i need to figure out how to do this without having to checkpoint.
> i wish i could insert something like a dummy operation that logical steps
> cannot jump over.
>
> On Wed, Aug 8, 2018 at 4:22 PM, Koert Kuipers 
> wrote:
>
>> ok thanks.
>>
>> mh. that seems odd. shouldnt coalesce introduce a new map-phase
>> with less tasks instead of changing the previous shuffle?
>>
>> using repartition seems too expensive just to keep the number of
>> files down. so i guess i am back to looking for another solution.
>>
>>
>>
>> On Wed, Aug 8, 2018 at 4:13 PM, Vadim Semenov 
>> wrote:
>>
>>> `coalesce` sets the number of partitions for the last stage, so you
>>> have to use `repartition` instead which is going to introduce an
>>> extra
>>> shuffle stage
>>>
>>> On Wed, Aug 8, 2018 at 3:47 PM Koert Kuipers 
>>> wrote:
>>> >
>>> > one small correction: lots of files leads to pressure on the spark
>>> driver program when reading this data in spark.
>>> >
>>> > On Wed, Aug 8, 2018 at 3:39 PM, Koert Kuipers 
>>> wrote:
>>> >>
>>> >> hi,
>>> 

Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-09 Thread Koert Kuipers
well an interesting side effect of this is that i can now control the
number of partitions for every shuffle in a dataframe job, as opposed to
having a single setting for number of partitions across all shuffles.

basically i can set spark.sql.shuffle.partitions to some huge number, and
then for every groupByKey (or any other shuffle operation) follow it up
with a coalesce to set the number of partitions. it's like i have
numPartitions back from those good old RDD shuffle methods :)


On Thu, Aug 9, 2018 at 1:38 AM, Koert Kuipers  wrote:

> an new map task after a shuffle is also a narrow dependency, isnt it? its
> narrow because data doesn't need to move, e.g. every partition depends on
> single partition, preferably on same machine.
>
> modifying a previous shuffle to avoid a shuffle strikes me as odd, and can
> potentially make a mess of performance, especially when no shuffle is
> needed. just a new map task.
>
>
> On Thu, Aug 9, 2018 at 1:15 AM, Jungtaek Lim  wrote:
>
>> > shouldnt coalesce introduce a new map-phase with less tasks instead of
>> changing the previous shuffle?
>>
>> The javadoc of Dataset.coalesce [1] describes such behavior clearly. It
>> results in narrow dependency, hence no shuffle.
>>
>> So it is pretty clear that you need to use "repartition". Not sure
>> there's any available trick to achieve it without calling repartition.
>>
>> Thanks,
>> Jungtaek Lim (HeartSaVioR)
>>
>> 1. https://github.com/apache/spark/blob/a40806d2bd84e9a03081
>> 65f0d6c97e9cf00aa4a3/sql/core/src/main/scala/org/apache/
>> spark/sql/Dataset.scala#L2918-L2937
>>
>>
>> On Thu, Aug 9, 2018 at 5:55 AM, Koert Kuipers wrote:
>>
>>> sorry i meant to say:
>>> wit a checkpoint i get a map phase with lots of tasks to read the data,
>>> then a reduce phase with 2048 reducers, and then finally a map phase with
>>> 100 tasks.
>>>
>>> On Wed, Aug 8, 2018 at 4:54 PM, Koert Kuipers  wrote:
>>>
 the only thing that seems to stop this so far is a checkpoint.

 wit a checkpoint i get a map phase with lots of tasks to read the data,
 then a reduce phase with 2048 reducers, and then finally a map phase with 4
 tasks.

 now i need to figure out how to do this without having to checkpoint. i
 wish i could insert something like a dummy operation that logical steps
 cannot jump over.

 On Wed, Aug 8, 2018 at 4:22 PM, Koert Kuipers 
 wrote:

> ok thanks.
>
> mh. that seems odd. shouldnt coalesce introduce a new map-phase
> with less tasks instead of changing the previous shuffle?
>
> using repartition seems too expensive just to keep the number of files
> down. so i guess i am back to looking for another solution.
>
>
>
> On Wed, Aug 8, 2018 at 4:13 PM, Vadim Semenov 
> wrote:
>
>> `coalesce` sets the number of partitions for the last stage, so you
>> have to use `repartition` instead which is going to introduce an extra
>> shuffle stage
>>
>> On Wed, Aug 8, 2018 at 3:47 PM Koert Kuipers 
>> wrote:
>> >
>> > one small correction: lots of files leads to pressure on the spark
>> driver program when reading this data in spark.
>> >
>> > On Wed, Aug 8, 2018 at 3:39 PM, Koert Kuipers 
>> wrote:
>> >>
>> >> hi,
>> >>
>> >> i am reading data from files into a dataframe, then doing a
>> groupBy for a given column with a count, and finally i coalesce to a
>> smaller number of partitions before writing out to disk. so roughly:
>> >>
>> >> spark.read.format(...).load(...).groupBy(column).count().coa
>> lesce(100).write.format(...).save(...)
>> >>
>> >> i have this setting: spark.sql.shuffle.partitions=2048
>> >>
>> >> i expect to see 2048 partitions in shuffle. what i am seeing
>> instead is a shuffle with only 100 partitions. it's like the coalesce has
>> taken over the partitioning of the groupBy.
>> >>
>> >> any idea why?
>> >>
>> >> i am doing coalesce because it is not helpful to write out 2048
>> files, lots of files leads to pressure down the line on executors reading
>> this data (i am writing to just one partition of a larger dataset), and
>> since i have less than 100 executors i expect it to be efficient. so 
>> sounds
>> like a good idea, no?
>> >>
>> >> but i do need 2048 partitions in my shuffle due to the operation i
>> am doing in the groupBy (in my real problem i am not just doing a 
>> count...).
>> >>
>> >> thanks!
>> >> koert
>> >>
>> >
>>
>>
>> --
>> Sent from my iPhone
>>
>
>

>>>
>


Error in java_gateway.py

2018-08-09 Thread ClockSlave
Following the code snippets on this thread, I got a working version of
XGBoost on pyspark. But one issue I am still facing is the following:
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/dummy_package/xgboost/xgboost.py", line 92, in __init__
    self._java_obj = self._new_java_obj("ml.dmlc.xgboost4j.scala.spark.XGBoostEstimator", self.uid)
  File "/Users/ultrauser/Downloads/spark/python/pyspark/ml/wrapper.py", line 61, in _new_java_obj
    java_obj = getattr(java_obj, name)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/py4j/java_gateway.py", line 1598, in __getattr__
    raise Py4JError("{0} does not exist in the JVM".format(new_fqn))
py4j.protocol.Py4JError: ml.dmlc.xgboost4j.scala.spark.XGBoostEstimator does not exist in the JVM
Exception ignored in: 
Traceback (most recent call last):
  File "/Users/ultrauser/Downloads/spark/python/pyspark/ml/wrapper.py", line 105, in __del__
    SparkContext._active_spark_context._gateway.detach(self._java_obj)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/py4j/java_gateway.py", line 2000, in detach
    java_object._detach()
AttributeError: 'NoneType' object has no attribute '_detach'
From what I read on StackOverflow and elsewhere, this looks like an issue of
jar locations. I have two jar files that are needed for this code to work:

  xgboost4j-0.72.jar
  xgboost4j-spark-0.72.jar

But I am not sure how to proceed. This is what I have tried so far:

1. Place the xgboost jar files in /Library/Java/Extensions
2. Set the environment variables:

   import os
   os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars
   /Users/ultrauser/Downloads/xgboost4j-0.72.jar,
   /Users/ultrauser/Downloads/xgboost4j-spark-0.72.jar pyspark-shell'

3. Place the jar files in $SPARK_HOME/jars

But the error still persists. Is there something I am missing here?
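
For what it's worth, a minimal sketch of the ordering that usually matters with the
environment-variable approach: PYSPARK_SUBMIT_ARGS is only read when the JVM gateway
is launched, so it has to be set before the first SparkContext/SparkSession is
created, and the --jars list is comma-separated with no spaces. Paths are the ones
from the post:

import os

os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--jars /Users/ultrauser/Downloads/xgboost4j-0.72.jar,"
    "/Users/ultrauser/Downloads/xgboost4j-spark-0.72.jar pyspark-shell"
)

# only after the environment variable is set:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()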


