Re: PyArrow Exception in Pandas UDF GROUPEDAGG()

2020-05-07 Thread ZHANG Wei
AFAICT, there might be data skew: some partitions may have received too
many rows, which would exceed the memory limit. Trying .groupBy().count()
or .aggregateByKey().count() may help check the data size of each
partition. If there is no data skew, increasing the .groupBy() parameter
`numPartitions` is worth a try.
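
To make the skew check concrete, here is a minimal Python sketch (not taken from the thread) that estimates how many rows a single group can hold before it would pass the 2 GB limit mentioned in the original post, and flags labels above that estimate. It assumes every numeric column is an 8-byte double and ignores any Arrow overhead; the names `max_rows_per_group` and `oversized_groups` and the sample labels are hypothetical. In PySpark, the per-label counts would come from something like `df.groupBy("label_col").count()`.

```python
from collections import Counter

# Assumed cap per group sent to the UDF, based on the 2 GB figure in the post.
ARROW_LIMIT_BYTES = 2**31 - 1


def max_rows_per_group(num_columns, bytes_per_value=8):
    """Rough upper bound on rows per group, assuming fixed-width values."""
    return ARROW_LIMIT_BYTES // (num_columns * bytes_per_value)


def oversized_groups(labels, num_columns):
    """Return {label: row_count} for groups estimated to exceed the cap."""
    limit = max_rows_per_group(num_columns)
    counts = Counter(labels)
    return {label: n for label, n in counts.items() if n > limit}


# 50,000 double columns means roughly 400 KB per row.
print(max_rows_per_group(50_000))
# Hypothetical label column: "a" is skewed, "b" is small.
print(oversized_groups(["a"] * 6000 + ["b"] * 10, 50_000))
```

Under these assumptions, any label with more than a few thousand rows would be a candidate for the error, even when the rows are spread evenly across labels.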

-- 
Cheers,
-z

On Wed, 6 May 2020 00:07:58 +
Gautham Acharya  wrote:

> Hi everyone,
> 
> I'm running a job that runs a Pandas UDF to GROUP BY a large matrix.
> 
> The GROUP BY function runs on a wide dataset. The first column of the dataset 
> contains string labels that are GROUPed on. The remaining columns are numeric 
> values that are aggregated in the Pandas UDF. The dataset is very wide, with 
> 50,000 columns and 3 million rows.
> 
> --
> | label_col | num_col_0 | num_col_1 | num_col_2 | --- | num_col_5|
> | label_a   | 2.0       | 5.6       | 7.123     |
> | label_b   | 11.0      | 1.4       | 2.345     |
> | label_a   | 3.1       | 6.2       | 5.444     |
> 
> 
> 
> My job runs fine on smaller datasets, with the same number of columns but 
> fewer rows. However, when run on a dataset with 3 million rows, I see the 
> following exception:
> 
> 20/05/05 23:36:27 ERROR Executor: Exception in task 66.1 in stage 12.0 (TID 2358)
> org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/worker.py", line 377, in main
>     process()
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/worker.py", line 372, in process
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/serializers.py", line 286, in dump_stream
>     for series in iterator:
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/serializers.py", line 303, in load_stream
>     for batch in reader:
>   File "pyarrow/ipc.pxi", line 266, in __iter__
>   File "pyarrow/ipc.pxi", line 282, in pyarrow.lib._CRecordBatchReader.read_next_batch
>   File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
> pyarrow.lib.ArrowIOError: read length must be positive or -1
> 
> Looking at this issue, it 
> looks like PyArrow has a 2GB limit for each shard that is sent to the 
> grouping function.
> 
> I'm currently running this job on 4 nodes with 16 cores and 64 GB of memory
> each.
> 
> I've attached the full error log here as well. What are some workarounds that 
> I can do to get this job running? Unfortunately, we are running up to a 
> production release and this is becoming a severe blocker.
> 
> Thanks,
> Gautham
> 
> 
> 
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



RE: [E] Re: Pyspark Kafka Structured Stream not working.

2020-05-07 Thread Vijayant Kumar
Hi Jungtaek,

Thanks for the response. It appears to be #1.
I would appreciate it if you could share a sample command to submit the Spark
application.

From: Jungtaek Lim [mailto:kabhwan.opensou...@gmail.com]
Sent: Wednesday, May 06, 2020 8:24 PM
To: Vijayant Kumar 
Cc: user@spark.apache.org
Subject: [E] Re: Pyspark Kafka Structured Stream not working.


Hi,

1. You seem to use DStream (Spark Streaming), not Structured Streaming.
2. I'm not familiar with pyspark, but it looks like the error message is very
clear - Kafka doesn't allow such a name for "client.id". The error message
spells out the naming rule, so you may need to adapt to that convention
(e.g. no spaces).
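
As an illustration of the naming rule quoted in the error message (ASCII alphanumerics, '.', '_' and '-'), here is a minimal Python sketch that checks a candidate name and replaces disallowed characters. The helper names `is_legal_client_id` and `sanitize_client_id` are hypothetical and are not part of Kafka or Spark.

```python
import re

# Anything outside the set listed in the error message:
# ASCII alphanumerics, '.', '_' and '-'.
_ILLEGAL = re.compile(r"[^A-Za-z0-9._-]")


def is_legal_client_id(name):
    """True if the name is non-empty and uses only the allowed characters."""
    return bool(name) and _ILLEGAL.search(name) is None


def sanitize_client_id(name, replacement="-"):
    """Replace every disallowed character with the given replacement."""
    return _ILLEGAL.sub(replacement, name)


print(is_legal_client_id("Python Kafka streamer"))  # spaces make it illegal
print(sanitize_client_id("Python Kafka streamer"))
```

The sanitized string could then be tried in place of "Python Kafka streamer" in the createStream call quoted below, on the assumption that this argument is what ends up as the client.id.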

Hope this helps,

Thanks,
Jungtaek Lim (HeartSaVioR)

On Wed, May 6, 2020 at 5:36 PM Vijayant Kumar
<vijayant.ku...@mavenir.com.invalid> wrote:
Hi All,

I am getting the below error while using Pyspark Structured Streaming from 
Kafka Producer.

20/05/06 11:51:16 ERROR ReceiverTracker: Deregistered receiver for stream 0: 
Error starting receiver 0 - kafka.common.InvalidConfigException: 
client.id Python Kafka streamer is illegal, contains a 
character other than ASCII alphanumerics, '.', '_' and '-'

I am using the below code to get the messages:

broker='vm105:2181'
topic='Hello-Kafka'
print 'broker topic is ',broker,topic
kvs = KafkaUtils.createStream(ssc, \
  broker, \
  "Python Kafka streamer",{topic:1})

And my Submit command is like below :-
spark-submit --jars spark-streaming-kafka-0-8-assembly_2.11-2.4.4.jar 
test_kafka.py vm105:2181 Hello-Kafka

Can anyone help me figure out what I am missing?

Thanks,
Vijayant

This e-mail message may contain confidential or proprietary information of 
Mavenir Systems, Inc. or its affiliates and is intended solely for the use of 
the intended recipient(s). If you are not the intended recipient of this 
message, you are hereby notified that any review, use or distribution of this 
information is absolutely prohibited and we request that you delete all copies 
in your control and contact us by e-mailing to 
secur...@mavenir.com. This message contains the 
views of its author and may not necessarily reflect the views of Mavenir 
Systems, Inc. or its affiliates, who employ systems to monitor email messages, 
but make no representation that such messages are authorized, secure, 
uncompromised, or free from computer viruses, malware, or other defects. Thank 
You



java.lang.OutOfMemoryError Spark Worker

2020-05-07 Thread Hrishikesh Mishra
Hi

I am getting an out-of-memory error in the worker log of my streaming jobs
every couple of hours, after which the worker dies. There is no shuffle, no
aggregation, and no caching in the job; it is just a transformation.
I'm not able to identify where the problem is, driver or executor, or why
the worker dies after the OOM when only the streaming job should die. Am I
missing something?

Driver Memory:  2g
Executor memory: 4g

Spark Version:  2.4
Kafka Direct Stream
Spark Standalone Cluster.


20/05/06 12:52:20 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
20/05/06 12:53:03 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[ExecutorRunner for app-20200506124717-10226/0,5,main]
java.lang.OutOfMemoryError: Java heap space
    at org.apache.xerces.util.XMLStringBuffer.append(Unknown Source)
    at org.apache.xerces.impl.XMLEntityScanner.scanData(Unknown Source)
    at org.apache.xerces.impl.XMLScanner.scanComment(Unknown Source)
    at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl.scanComment(Unknown Source)
    at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl$FragmentContentDispatcher.dispatch(Unknown Source)
    at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl.scanDocument(Unknown Source)
    at org.apache.xerces.parsers.XML11Configuration.parse(Unknown Source)
    at org.apache.xerces.parsers.XML11Configuration.parse(Unknown Source)
    at org.apache.xerces.parsers.XMLParser.parse(Unknown Source)
    at org.apache.xerces.parsers.DOMParser.parse(Unknown Source)
    at org.apache.xerces.jaxp.DocumentBuilderImpl.parse(Unknown Source)
    at javax.xml.parsers.DocumentBuilder.parse(DocumentBuilder.java:150)
    at org.apache.hadoop.conf.Configuration.parse(Configuration.java:2480)
    at org.apache.hadoop.conf.Configuration.parse(Configuration.java:2468)
    at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2539)
    at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2492)
    at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2405)
    at org.apache.hadoop.conf.Configuration.set(Configuration.java:1143)
    at org.apache.hadoop.conf.Configuration.set(Configuration.java:1115)
    at org.apache.spark.deploy.SparkHadoopUtil$.org$apache$spark$deploy$SparkHadoopUtil$$appendS3AndSparkHadoopConfigurations(SparkHadoopUtil.scala:464)
    at org.apache.spark.deploy.SparkHadoopUtil$.newConfiguration(SparkHadoopUtil.scala:436)
    at org.apache.spark.deploy.SparkHadoopUtil.newConfiguration(SparkHadoopUtil.scala:114)
    at org.apache.spark.SecurityManager.<init>(SecurityManager.scala:114)
    at org.apache.spark.deploy.worker.ExecutorRunner.org$apache$spark$deploy$worker$ExecutorRunner$$fetchAndRunExecutor(ExecutorRunner.scala:149)
    at org.apache.spark.deploy.worker.ExecutorRunner$$anon$1.run(ExecutorRunner.scala:73)
20/05/06 12:53:38 INFO DriverRunner: Worker shutting down, killing driver driver-20200505181719-1187
20/05/06 12:53:38 INFO DriverRunner: Killing driver process!




Regards
Hrishi


[Spark SQL][Beginner] Spark throw Catalyst error while writing the dataframe in ORC format

2020-05-07 Thread Deepak Garg
Hi,

I am getting the following error while running a Spark job. The error occurs
when Spark tries to write the DataFrame in ORC format. I am pasting the
error trace below.

Any help in resolving this would be appreciated.

Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(t_time#5, stg_name#842, access_ne#843, switch#844, 200)
+- *(13) HashAggregate(keys=[t_time#5, stg_name#842, access_ne#843, switch#844], functions=[partial_count(1)], output=[t_time#5, stg_name#842, access_ne#843, switch#844, count#985L])
   +- Union

    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
    at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:150)
    at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.joins.SortMergeJoinExec.doExecute(SortMergeJoinExec.scala:150)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
    at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:180)
    ... 28 more

Caused by: java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:136)
    at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:367)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:144)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:140)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:140)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:135)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:280)
    at

Re: [Spark SQL][Beginner] Spark throw Catalyst error while writing the dataframe in ORC format

2020-05-07 Thread Jeff Evans
You appear to be hitting the broadcast timeout.  See:
https://stackoverflow.com/a/41126034/375670
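
For reference, the two settings usually discussed for this timeout are `spark.sql.broadcastTimeout` (in seconds; the 300 in the trace matches its default) and `spark.sql.autoBroadcastJoinThreshold` (setting it to -1 disables automatic broadcast joins). The Python sketch below only renders them as `--conf` flags for spark-submit; the value 1200 and the helper name `to_conf_flags` are illustrative assumptions, not recommendations from the thread.

```python
def to_conf_flags(settings):
    """Render a dict of Spark settings as spark-submit --conf arguments."""
    flags = []
    for key, value in settings.items():
        flags += ["--conf", f"{key}={value}"]
    return flags


# Option A: give the broadcast more time (the value is illustrative).
longer_timeout = {"spark.sql.broadcastTimeout": 1200}
# Option B: stop Spark from choosing broadcast joins automatically.
no_auto_broadcast = {"spark.sql.autoBroadcastJoinThreshold": -1}

print(" ".join(to_conf_flags(longer_timeout)))
print(" ".join(to_conf_flags(no_auto_broadcast)))
```

Which option fits depends on whether the broadcast side is merely slow to build or is too large to broadcast at all; the trace alone does not settle that.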


Re: java.lang.OutOfMemoryError Spark Worker

2020-05-07 Thread Jeff Evans
You might want to double check your Hadoop config files.  From the stack
trace it looks like this is happening when simply trying to load
configuration (XML files).  Make sure they're well formed.
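
One way to run that check is sketched below in Python: it tries to parse every *.xml file in a configuration directory and reports each file's size alongside any parse error, since the trace shows the parser running out of heap while scanning a comment. The function name `check_xml_configs` is hypothetical, and the directory to pass in is whichever one the worker actually reads.

```python
import pathlib
import xml.etree.ElementTree as ET


def check_xml_configs(conf_dir):
    """Return a list of (file name, size in bytes, error message or None)."""
    results = []
    for path in sorted(pathlib.Path(conf_dir).glob("*.xml")):
        try:
            ET.parse(str(path))
            error = None
        except ET.ParseError as exc:
            # Malformed XML: keep the parser's message for the report.
            error = str(exc)
        results.append((path.name, path.stat().st_size, error))
    return results
```

Calling `check_xml_configs` on the Hadoop configuration directory and printing the tuples should show at a glance whether any file is malformed or unexpectedly large.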



RE: PyArrow Exception in Pandas UDF GROUPEDAGG()

2020-05-07 Thread Gautham Acharya
Thanks for the quick reply, Zhang.

I don't think that we have too much data skew, and if we do, there isn't much 
of a way around it - we need to groupby this specific column in order to run 
aggregates. 

I'm running this with PySpark, and it doesn't look like the groupBy() function
takes a numPartitions parameter. What other options can I explore?

--gautham





Re: java.lang.OutOfMemoryError Spark Worker

2020-05-07 Thread Hrishikesh Mishra
It's only happening for Hadoop config. The exception traces are different
each time it dies, and jobs run for a couple of hours before the worker dies.

Another Reason:

20/05/02 02:26:34 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[ExecutorRunner for app-20200501213234-9846/3,5,main]
java.lang.OutOfMemoryError: Java heap space
    at org.apache.xerces.xni.XMLString.toString(Unknown Source)
    at org.apache.xerces.parsers.AbstractDOMParser.characters(Unknown Source)
    at org.apache.xerces.xinclude.XIncludeHandler.characters(Unknown Source)
    at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl.scanContent(Unknown Source)
    at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl$FragmentContentDispatcher.dispatch(Unknown Source)
    at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl.scanDocument(Unknown Source)
    at org.apache.xerces.parsers.XML11Configuration.parse(Unknown Source)
    at org.apache.xerces.parsers.XML11Configuration.parse(Unknown Source)
    at org.apache.xerces.parsers.XMLParser.parse(Unknown Source)
    at org.apache.xerces.parsers.DOMParser.parse(Unknown Source)
    at org.apache.xerces.jaxp.DocumentBuilderImpl.parse(Unknown Source)
    at javax.xml.parsers.DocumentBuilder.parse(DocumentBuilder.java:150)
    at org.apache.hadoop.conf.Configuration.parse(Configuration.java:2480)
    at org.apache.hadoop.conf.Configuration.parse(Configuration.java:2468)
    at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2539)
    at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2492)
    at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2405)
    at org.apache.hadoop.conf.Configuration.set(Configuration.java:1143)
    at org.apache.hadoop.conf.Configuration.set(Configuration.java:1115)
    at org.apache.spark.deploy.SparkHadoopUtil$.org$apache$spark$deploy$SparkHadoopUtil$$appendS3AndSparkHadoopConfigurations(SparkHadoopUtil.scala:464)
    at org.apache.spark.deploy.SparkHadoopUtil$.newConfiguration(SparkHadoopUtil.scala:436)
    at org.apache.spark.deploy.SparkHadoopUtil.newConfiguration(SparkHadoopUtil.scala:114)
    at org.apache.spark.SecurityManager.<init>(SecurityManager.scala:114)
    at org.apache.spark.deploy.worker.ExecutorRunner.org$apache$spark$deploy$worker$ExecutorRunner$$fetchAndRunExecutor(ExecutorRunner.scala:149)
    at org.apache.spark.deploy.worker.ExecutorRunner$$anon$1.run(ExecutorRunner.scala:73)

20/05/02 02:26:37 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[dispatcher-event-loop-3,5,main]
java.lang.OutOfMemoryError: Java heap space
    at java.lang.Class.newInstance(Class.java:411)
    at sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:403)
    at sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:394)
    at java.security.AccessController.doPrivileged(Native Method)
    at sun.reflect.MethodAccessorGenerator.generate(MethodAccessorGenerator.java:393)
    at sun.reflect.MethodAccessorGenerator.generateSerializationConstructor(MethodAccessorGenerator.java:112)
    at sun.reflect.ReflectionFactory.generateConstructor(ReflectionFactory.java:398)
    at sun.reflect.ReflectionFactory.newConstructorForSerialization(ReflectionFactory.java:360)
    at java.io.ObjectStreamClass.getSerializableConstructor(ObjectStreamClass.java:1520)
    at java.io.ObjectStreamClass.access$1500(ObjectStreamClass.java:79)
    at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:507)
    at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:482)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:482)
    at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:379)
    at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:478)
    at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:379)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1134)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
    at org.apache.spark.rpc.netty.RequestMessage.serialize(NettyRpcEnv.scala:565)
    at org.apache.spark.rpc.netty.NettyRpcEnv.send(NettyRpcEnv.scala:193)
    at org.apache.spark.rpc.netty.NettyRpcEndpointRef.send(NettyRpcEnv.scala:528)
    at org.apache.spark.deploy.worker.Worker.org

Re: How to populate all possible combination values in columns using Spark SQL

2020-05-07 Thread Aakash Basu
Hi,

I've updated the SO question with masked data, the year column, and the
other requirements. Please take a look.

Hope this helps in solving the problem.

Thanks and regards,
AB

On Thu 7 May, 2020, 10:59 AM Sonal Goyal,  wrote:

> As mentioned in the comments on SO, can you provide a (masked) sample of
> the data? It will be easier to see what you are trying to do if you add the
> year column
>
> Thanks,
> Sonal
> Nube Technologies 
>
> On Thu, May 7, 2020 at 10:26 AM Aakash Basu 
> wrote:
>
>> Hi,
>>
>> I've described the problem in detail on Stack Overflow. Could you
>> kindly take a look and help if possible?
>>
>> https://stackoverflow.com/q/61643910/5536733
>>
>> I'd be absolutely fine if someone solves it using the Spark SQL APIs
>> rather than a plain Spark SQL query.
>>
>> Thanks,
>> Aakash.
>>
>


No. of active states?

2020-05-07 Thread Something Something
Is there a way to get the total no. of active states in memory at any given
point in a Stateful Spark Structured Streaming job? We are thinking of
using this metric for 'Auto Scaling' our Spark cluster.


Dynamically changing maxOffsetsPerTrigger

2020-05-07 Thread Something Something
Is there a way to dynamically modify the value of 'maxOffsetsPerTrigger' while
a Stateful Structured Streaming job is running?

We are thinking of auto-scaling our Spark cluster, but if we don't modify
the value of 'maxOffsetsPerTrigger' dynamically, would adding more VMs to
the cluster help? I don't think it would.

In other words, if I add 2 new VMs to the cluster but the value of
'maxOffsetsPerTrigger' stays the same, would performance improve? I would
think not. We would have to explicitly stop the job, add VMs, and then restart
the job after changing the value of 'maxOffsetsPerTrigger', which defeats
the purpose of auto-scaling.

Please tell me if my understanding is not correct. Thanks.


Re: No. of active states?

2020-05-07 Thread Jungtaek Lim
Have you looked through the metrics for state operators?

They have long reported the "total rows" of state, and starting from Spark 2.4
they also include additional metrics specific to HDFSBackedStateStoreProvider,
including an estimate of overall memory usage.

https://github.com/apache/spark/blob/24fac1e0c70a783b4d240607639ff20d7dd24191/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L168-L179
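As a rough sketch of reading those numbers: this assumes the progress report
is available as a dict (in PySpark, `query.lastProgress` returns one) with a
`stateOperators` list whose entries carry `numRowsTotal` and
`memoryUsedBytes`. The helper name `summarize_state` and the sample values are
made up for illustration; they are not part of Spark.

```python
def summarize_state(progress):
    """Sum state rows and estimated memory over all stateful operators."""
    # A query with no progress yet, or no stateful operators, yields zeros.
    operators = (progress or {}).get("stateOperators", [])
    return {
        "total_state_rows": sum(op.get("numRowsTotal", 0) for op in operators),
        "memory_used_bytes": sum(op.get("memoryUsedBytes", 0) for op in operators),
    }


# Hypothetical progress report; the numbers are invented for illustration.
sample_progress = {
    "numInputRows": 1000,
    "stateOperators": [
        {"numRowsTotal": 120, "numRowsUpdated": 15, "memoryUsedBytes": 40960},
        {"numRowsTotal": 30, "numRowsUpdated": 2, "memoryUsedBytes": 8192},
    ],
}

summary = summarize_state(sample_progress)
print(summary)
```

In a running job the same function could be fed the latest progress report on
a timer, and the totals exported to whatever drives the scaling decision.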


On Fri, May 8, 2020 at 11:30 AM Something Something <
mailinglist...@gmail.com> wrote:

> No. We are already capturing these metrics (e.g. numInputRows,
> inputRowsPerSecond).
>
> I am talking about "No. of States" in the memory at any given time.
>
> On Thu, May 7, 2020 at 4:31 PM Jungtaek Lim 
> wrote:
>
>> If you're referring total "entries" in all states in SS job, it's being
>> provided via StreamingQueryListener.
>>
>>
>> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#monitoring-streaming-queries
>>
>> Hope this helps.
>>
>> On Fri, May 8, 2020 at 3:26 AM Something Something <
>> mailinglist...@gmail.com> wrote:
>>
>>> Is there a way to get the total no. of active states in memory at any
>>> given point in a Stateful Spark Structured Streaming job? We are thinking
>>> of using this metric for 'Auto Scaling' our Spark cluster.
>>>
>>


Re: [E] Re: Pyspark Kafka Structured Stream not working.

2020-05-07 Thread Jungtaek Lim
It's not a matter of either 1 or 2; both points apply. I haven't played with
DStream + pyspark, but since the error message is clear, you'll probably want
to change the client.id "Python Kafka streamer" to follow the naming
convention described in the error message.
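To illustrate the naming rule, here is a minimal sketch that checks a name
against the character set quoted in the error message (ASCII alphanumerics,
'.', '_' and '-') and replaces anything else. The helper names
`is_legal_client_id` and `sanitize_client_id` are hypothetical, not part of
Spark or Kafka, and the choice of '-' as the replacement is arbitrary.

```python
import re

# Anything outside the set named in the error message:
# ASCII alphanumerics, '.', '_' and '-'.
_ILLEGAL = re.compile(r"[^A-Za-z0-9._-]")


def is_legal_client_id(name):
    """Return True if the name contains only the allowed characters."""
    return _ILLEGAL.search(name) is None


def sanitize_client_id(name, replacement="-"):
    """Replace every disallowed character with the replacement."""
    return _ILLEGAL.sub(replacement, name)


original = "Python Kafka streamer"
group_id = sanitize_client_id(original)
print(is_legal_client_id(original))  # False, because of the spaces
print(group_id)                      # Python-Kafka-streamer
```

The sanitized value could then be passed where the original code passes
"Python Kafka streamer" to KafkaUtils.createStream.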

On Thu, May 7, 2020 at 3:55 PM Vijayant Kumar 
wrote:

> Hi Jungtaek,
>
>
>
> Thanks for the response. It appears to be #1.
>
> I would appreciate it if you could share a sample command to submit the
> Spark application.
>
>
>
> *From:* Jungtaek Lim [mailto:kabhwan.opensou...@gmail.com]
> *Sent:* Wednesday, May 06, 2020 8:24 PM
> *To:* Vijayant Kumar 
> *Cc:* user@spark.apache.org
> *Subject:* [E] Re: Pyspark Kafka Structured Stream not working.
>
>
>
> Hi,
>
>
>
> 1. You seem to use DStream (Spark Streaming), not Structured Streaming.
>
> 2. I'm not familiar with pyspark, but the error message looks very clear:
> Kafka doesn't allow such a name for "client.id". The error message spells
> out the naming rule, so you may need to adapt the name to that convention
> (e.g. no spaces).
>
>
>
> Hope this helps,
>
>
>
> Thanks,
>
> Jungtaek Lim (HeartSaVioR)
>
>
>
> On Wed, May 6, 2020 at 5:36 PM Vijayant Kumar <
> vijayant.ku...@mavenir.com.invalid> wrote:
>
> Hi All,
>
>
>
> I am getting the error below while using PySpark Structured Streaming to
> read from a Kafka producer.
>
>
>
> 20/05/06 11:51:16 ERROR ReceiverTracker: Deregistered receiver for stream
> 0: Error starting receiver 0 - kafka.common.InvalidConfigException:
> client.id Python Kafka streamer is illegal, contains a character other
> than ASCII alphanumerics, '.', '_' and '-'
>
>
>
> I am using the below code to get the messages:
>
>
>
> broker='vm105:2181'
> topic='Hello-Kafka'
> print 'broker topic is ',broker,topic
> kvs = KafkaUtils.createStream(ssc, \
>   broker, \
>   "Python Kafka streamer",{topic:1})
>
>
>
> My submit command is as follows:
>
> *spark-submit --jars spark-streaming-kafka-0-8-assembly_2.11-2.4.4.jar
> test_kafka.py vm105:2181 Hello-Kafka*
>
>
>
> Can anyone help me figure out what I am missing?
>
>
>
> Thanks,
>
> Vijayant
> --
>
> This e-mail message may contain confidential or proprietary information of
> Mavenir Systems, Inc. or its affiliates and is intended solely for the use
> of the intended recipient(s). If you are not the intended recipient of this
> message, you are hereby notified that any review, use or distribution of
> this information is absolutely prohibited and we request that you delete
> all copies in your control and contact us by e-mailing to
> secur...@mavenir.com. This message contains the views of its author and
> may not necessarily reflect the views of Mavenir Systems, Inc. or its
> affiliates, who employ systems to monitor email messages, but make no
> representation that such messages are authorized, secure, uncompromised, or
> free from computer viruses, malware, or other defects. Thank You
>


Re: No. of active states?

2020-05-07 Thread Something Something
No. We are already capturing these metrics (e.g. numInputRows,
inputRowsPerSecond).

I am talking about "No. of States" in the memory at any given time.

On Thu, May 7, 2020 at 4:31 PM Jungtaek Lim 
wrote:

> If you're referring total "entries" in all states in SS job, it's being
> provided via StreamingQueryListener.
>
>
> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#monitoring-streaming-queries
>
> Hope this helps.
>
> On Fri, May 8, 2020 at 3:26 AM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> Is there a way to get the total no. of active states in memory at any
>> given point in a Stateful Spark Structured Streaming job? We are thinking
>> of using this metric for 'Auto Scaling' our Spark cluster.
>>
>


Re: No. of active states?

2020-05-07 Thread Jungtaek Lim
If you're referring to the total number of "entries" across all states in an
SS job, that is provided via StreamingQueryListener.

http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#monitoring-streaming-queries

Hope this helps.

On Fri, May 8, 2020 at 3:26 AM Something Something 
wrote:

> Is there a way to get the total no. of active states in memory at any
> given point in a Stateful Spark Structured Streaming job? We are thinking
> of using this metric for 'Auto Scaling' our Spark cluster.
>