Re: Use Arrow instead of Pickle without pandas_udf
Here is a link to the JIRA for adding StructType support to scalar pandas_udf: https://issues.apache.org/jira/browse/SPARK-24579

On Wed, Jul 25, 2018 at 3:36 PM, Hichame El Khalfi wrote:
> Hey Holden,
> Thanks for your reply.
>
> We are currently using a Python function that produces a Row(TS=LongType(), bin=BinaryType()).
> We use this function like this: dataframe.rdd.map(my_function).toDF().write.parquet()
>
> To reuse it in a pandas_udf, we changed the return type to
> StructType(StructField(Long), StructField(BinaryType)).
>
> 1) But we hit an issue: StructType is not supported by pandas_udf.
>
> So I was wondering whether we could keep using dataframe.rdd.map but get better serialization performance by using the Arrow format instead of Pickle.
>
> *From:* hol...@pigscanfly.ca
> *Sent:* July 25, 2018 4:41 PM
> *To:* hich...@elkhalfi.com
> *Cc:* user@spark.apache.org
> *Subject:* Re: Use Arrow instead of Pickle without pandas_udf
>
> Not currently. What's the problem with pandas_udf for your use case?
>
> On Wed, Jul 25, 2018 at 1:27 PM, Hichame El Khalfi wrote:
>
>> Hi there,
>>
>> Is there a way to use the Arrow format instead of Pickle, but without using pandas_udf?
>>
>> Thanks for your help,
>>
>> Hichame
>
> --
> Twitter: https://twitter.com/holdenkarau
Executor lost for unknown reasons error Spark 2.3 on kubernetes
Hello,

I'm getting the error below in the Spark driver pod logs. Executor pods are killed midway through while the job is running, and even the driver pod terminates with this intermittent error. It happens when I run multiple jobs in parallel. I can't see the executor logs because the executor pods are killed.

    org.apache.spark.SparkException: Job aborted due to stage failure: Task 23 in stage 36.0 failed 4 times, most recent failure: Lost task 23.3 in stage 36.0 (TID 1006, 10.10.125.119, executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Executor lost for unknown reasons.
    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:194)
        ... 42 more

Thanks,
Purna
Re: How to Create one DB connection per executor and close it after the job is done?
object MyDatabseSingleton {
  @transient lazy val dbConn = DB.connect(…)
}

`transient` marks the variable to be excluded from serialization, and `lazy` opens the connection only when it is first needed and also makes its initialization thread-safe.

http://fdahms.com/2015/10/14/scala-and-the-transient-lazy-val-pattern/
http://code-o-matic.blogspot.com/2009/05/double-checked-locking-idiom-sweet-in.html

On Mon, Jul 30, 2018 at 1:32 PM kant kodali wrote:
>
> Hi Patrick,
>
> This object must be serializable, right? I wonder whether I will have access to this object in my driver (since it is created on the executor side) so I can close it when I am done with my batch?
>
> Thanks!
>
> On Mon, Jul 30, 2018 at 7:37 AM, Patrick McGloin wrote:
>>
>> You could use an object in Scala, of which only one instance will be created on each JVM / executor. E.g.
>>
>> object MyDatabseSingleton {
>>   var dbConn = ???
>> }
>>
>> On Sat, 28 Jul 2018, 08:34 kant kodali, wrote:
>>>
>>> Hi All,
>>>
>>> I understand creating a connection in foreachPartition, but I am wondering whether I can create one DB connection per executor and close it after the job is done? Any sample code would help. You can imagine I am running a simple batch processing application.
>>>
>>> Thanks!
>
> -- Sent from my iPhone
- To unsubscribe e-mail: user-unsubscr...@spark.apache.org
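The `@transient lazy val` pattern boils down to "open the connection on first use, at most once per process, even if several task threads race for it", which the linked posts describe in terms of double-checked locking. A rough Python analogue for PySpark users is sketched below using only the standard library; `LazyConnection`, `CONNECTION` and `process_partition` are hypothetical names, and an in-memory `sqlite3` database stands in for the real `DB.connect(…)`. Note that in PySpark a module-level object like this gives one connection per Python worker process, and an executor may run several of those, so it is not strictly one per executor JVM.

```python
import sqlite3
import threading
from typing import Callable, Iterable, Iterator, Optional


class LazyConnection:
    """Hypothetical holder: opens a connection on first use, at most once per process."""

    def __init__(self, factory: Callable[[], sqlite3.Connection]) -> None:
        self._factory = factory            # callable that actually opens the connection
        self._conn: Optional[sqlite3.Connection] = None
        self._lock = threading.Lock()
        self.open_count = 0                # how many times the factory has run

    def get(self) -> sqlite3.Connection:
        # Fast path: no locking once the connection exists.
        if self._conn is None:
            with self._lock:
                # Re-check under the lock: another thread may have opened it first.
                if self._conn is None:
                    self._conn = self._factory()
                    self.open_count += 1
        return self._conn

    def close(self) -> None:
        with self._lock:
            if self._conn is not None:
                self._conn.close()
                self._conn = None


# Module-level instance: one per Python process that imports this module.
# An in-memory sqlite3 database is only a stand-in for the real database driver.
CONNECTION = LazyConnection(lambda: sqlite3.connect(":memory:", check_same_thread=False))


def process_partition(rows: Iterable[int]) -> Iterator[int]:
    # Meant to be passed to rdd.mapPartitions(...): every partition handled
    # by this process reuses the same connection instead of opening a new one.
    cur = CONNECTION.get().cursor()
    for row in rows:
        cur.execute("SELECT ?", (row,))
        yield cur.fetchone()[0]
```

Calling `CONNECTION.get()` repeatedly returns the same object, so the factory runs once per process no matter how many partitions that process handles.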
Re: How to Create one DB connection per executor and close it after the job is done?
Hi Patrick,

This object must be serializable, right? I wonder whether I will have access to this object in my driver (since it is created on the executor side) so I can close it when I am done with my batch?

Thanks!

On Mon, Jul 30, 2018 at 7:37 AM, Patrick McGloin wrote:
> You could use an object in Scala, of which only one instance will be created on each JVM / executor. E.g.
>
> object MyDatabseSingleton {
>   var dbConn = ???
> }
>
> On Sat, 28 Jul 2018, 08:34 kant kodali, wrote:
>
>> Hi All,
>>
>> I understand creating a connection in foreachPartition, but I am wondering whether I can create one DB connection per executor and close it after the job is done? Any sample code would help. You can imagine I am running a simple batch processing application.
>>
>> Thanks!
sorting on dataframe causes out of memory (java heap space)
While working with larger datasets I run into out-of-memory issues. Basically, a Hadoop sequence file is read, its contents are sorted, and a Hadoop map file is written back. The code works fine for workloads greater than 20 GB. Then I changed one column in my dataset to store a large object, and the size of a row object increased from 20 KB to about 4 MB. Now the same code runs into Java heap space issues, and the application is shut down with an out-of-memory exception.

It seems DataFrame sort operations cannot handle large objects. I took a heap dump and saw a large array of arrays. I would expect such an object from a collect() operation, when single-task results are collected into a large array. I know groupBy and collect() cause such problems on large datasets, but I expected that a single sort would not run into such issues. I switched from sort() to sortWithinPartitions() and the application did not crash. Of course, the result is not the same. But shouldn't a simple sort() work as well?

I created a simple test program that blows up a tiny Int RDD into a Row RDD with such large objects, and found that spilling to disk does not seem to work out of the box. The defaults and any MEMORY* StorageLevel run into this issue; only DISK_ONLY works, but it is very slow. I posted a question with example code on Stack Overflow: https://stackoverflow.com/questions/51546921/apache-spark-dataframe-causes-out-of-memory

My question to the community is: how can I sort a growing amount of data without increasing the heap size?

I found out the following facts:
* larger datasets require setting spark.driver.maxResultSize to greater values, or to 0 for no limit
* row object size seems to impact memory usage
* the G1 garbage collector may run into fragmentation issues with large objects, so I used ParallelGC instead. In my case this has no impact; after processing n tasks the heap runs full
* reducing driver and executor memory has no effect; the heap always fills up the same way
* persisting with a DISK* storage level is no guarantee that Spark spills data to disk
* using the Kryo serializer has little effect in my case; a few more tasks finish before the OOM occurs
* sortWithinPartitions works, but afterwards only the contents of each partition are sorted

I assume sortWithinPartitions combined with a merge-shuffle-join should be enough to sort the final result. But why does Spark combine the whole result set on the driver? That is not very scalable!?

So I dropped the large column, let Spark sort the other columns, and finally did a left join to combine the large data back. The code runs without OOM, but the left join loses the sort order. Any ideas?

My latest tests are not finished yet, but sorting on an RDD instead of a DataFrame seems to work better. The code is more complex, and I suspect the Catalyst optimizer does not choose optimal settings for DataFrames.

How can growing datasets be sorted without increasing memory? Is my code bad, or is it just a Spark bug?

My setup is:
- Windows 10, Java 1.8u144 (u171) with -Xms5g -Xmx5g and optionally -XX:+UseParallelOldGC
- Spark 2.3.1 in local mode (running as a single-node cluster on my workstation)
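The idea of sorting each partition independently and then merging is the merge phase of an external merge sort. A minimal pure-Python sketch, with plain lists standing in for partitions and hypothetical helper names, shows why a per-partition sort followed by a k-way merge yields a globally sorted result that can be streamed. It is not how Spark implements sort(): Spark's global sort range-partitions the data first, so that each sorted partition covers a disjoint key range and no final merge is needed.

```python
import heapq
from typing import Callable, Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def sort_within_partitions(partitions: List[List[T]], key: Callable[[T], object]) -> List[List[T]]:
    # Each partition is sorted on its own, like sortWithinPartitions():
    # no data moves between partitions, so the output is only locally sorted.
    return [sorted(p, key=key) for p in partitions]


def merge_sorted_partitions(sorted_partitions: Iterable[List[T]], key: Callable[[T], object]) -> Iterator[T]:
    # k-way merge: heapq.merge consumes its inputs lazily and keeps only one
    # head element per partition in its heap, so the output can be streamed.
    return heapq.merge(*sorted_partitions, key=key)


# Usage: rows are (sort_key, payload) tuples; three lists stand in for partitions.
partitions = [[(5, "e"), (1, "a")], [(4, "d"), (2, "b")], [(3, "c")]]
by_key = lambda row: row[0]
merged = list(merge_sorted_partitions(sort_within_partitions(partitions, by_key), by_key))
print(merged)  # [(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e')]
```

Because the merge only compares keys, a large payload is never copied into a combined in-memory array during the merge step itself.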
Re: Kafka backlog - spark structured streaming
Here's a proposal to add this: https://github.com/apache/spark/pull/21819

It's always good to set "maxOffsetsPerTrigger" unless you want Spark to process up to the end of the stream in each micro-batch. Even without "maxOffsetsPerTrigger", the lag can be non-zero by the time the micro-batch completes.

On 30 July 2018 at 08:50, Burak Yavuz wrote:
> If you don't set rate limiting through `maxOffsetsPerTrigger`, Structured Streaming will always process until the end of the stream. So the number of records waiting to be processed should be 0 at the start of each trigger.
>
> On Mon, Jul 30, 2018 at 8:03 AM, Kailash Kalahasti <kailash.kalaha...@gmail.com> wrote:
>
>> Is there any way to find out the backlog on a Kafka topic while using Spark Structured Streaming? I checked a few consumer APIs, but they require enabling a group.id for the stream, which does not seem to be allowed.
>>
>> Basically, I want to know the number of records waiting to be processed.
>>
>> Any suggestions?
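The backlog discussed here is the latest offset in Kafka minus the offset the query has processed up to. A minimal sketch of that arithmetic, assuming the processed end offsets are read from the query's progress report (for example the `endOffset` of a source in PySpark's `query.lastProgress`) in the `{topic: {partition: offset}}` shape the Kafka source uses for its offset JSON, and that the latest offsets are fetched separately from Kafka, which is not shown. `backlog` is a hypothetical helper, and the offset values are purely illustrative.

```python
import json
from typing import Dict

# Offsets keyed as {topic: {partition: offset}}.
Offsets = Dict[str, Dict[str, int]]


def backlog(processed_end: Offsets, latest: Offsets) -> Dict[str, int]:
    """Records waiting per 'topic-partition': latest offset minus last processed end offset."""
    lag = {}
    for topic, partitions in latest.items():
        done = processed_end.get(topic, {})
        for partition, latest_offset in partitions.items():
            # Assumption: a partition missing from processed_end counts as unread from offset 0.
            lag[f"{topic}-{partition}"] = max(0, latest_offset - done.get(partition, 0))
    return lag


# endOffset as it might appear in a progress report (illustrative values).
end_offset = json.loads('{"events": {"0": 1200, "1": 950}}')
# Latest offsets would come from a Kafka client outside Spark; hard-coded here.
latest = {"events": {"0": 1250, "1": 950}}
print(backlog(end_offset, latest))  # {'events-0': 50, 'events-1': 0}
```

As the reply notes, even without `maxOffsetsPerTrigger` this number can be non-zero right after a micro-batch finishes, since new records keep arriving while the batch runs.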
Re: Kafka backlog - spark structured streaming
If you don't set rate limiting through `maxOffsetsPerTrigger`, Structured Streaming will always process until the end of the stream. So the number of records waiting to be processed should be 0 at the start of each trigger.

On Mon, Jul 30, 2018 at 8:03 AM, Kailash Kalahasti <kailash.kalaha...@gmail.com> wrote:
> Is there any way to find out the backlog on a Kafka topic while using Spark Structured Streaming? I checked a few consumer APIs, but they require enabling a group.id for the stream, which does not seem to be allowed.
>
> Basically, I want to know the number of records waiting to be processed.
>
> Any suggestions?
Kafka backlog - spark structured streaming
Is there any way to find out the backlog on a Kafka topic while using Spark Structured Streaming? I checked a few consumer APIs, but they require enabling a group.id for the stream, which does not seem to be allowed.

Basically, I want to know the number of records waiting to be processed.

Any suggestions?
Re: How to Create one DB connection per executor and close it after the job is done?
You could use an object in Scala, of which only one instance will be created on each JVM / executor. E.g.

object MyDatabseSingleton {
  var dbConn = ???
}

On Sat, 28 Jul 2018, 08:34 kant kodali, wrote:
> Hi All,
>
> I understand creating a connection in foreachPartition, but I am wondering whether I can create one DB connection per executor and close it after the job is done? Any sample code would help. You can imagine I am running a simple batch processing application.
>
> Thanks!
Re: How to reduceByKeyAndWindow in Structured Streaming?
Thanks guys, it really helps. -- Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
Using Spark Streaming for analyzing changing data
We have a use case where there's a stream of events, and every event has an ID and its current state with a timestamp:

…
111,ready,1532949947
111,offline,1532949955
111,ongoing,1532949955
111,offline,1532949973
333,offline,1532949981
333,ongoing,1532949987
…

We want to ask questions about the current state of the *whole dataset*, from the beginning of time, such as: "how many items are now in the ongoing state?" (bear in mind that there are more complicated questions, and all of them ask about the _current_ state of the dataset, from the beginning of time).

I haven't found any simple, performant way of doing it. The ways I've found are:

1. Using mapGroupsWithState, where I groupByKey on the ID and always update the state to the latest event by timestamp
2. Using groupByKey on the ID, and keeping only the event with the latest timestamp

Neither method is good: the first involves state, which means checkpointing, memory, etc., and the second involves shuffling and sorting. We will have a lot of such queries in order to populate a real-time dashboard.

I wonder, as a general question, what is the correct way to process this type of data in Spark Streaming?
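Approach 1 (keep the latest event per ID, then answer questions from that per-ID state) can be modelled in plain Python to make the semantics concrete. This is only a single-process model of what mapGroupsWithState would keep per key, not Spark code, and `latest_state` / `count_by_state` are hypothetical names. The sample contains two events for ID 111 with the same timestamp (1532949955); the sketch assumes the later-arriving event wins such ties, a choice the real job would need to make explicitly.

```python
from collections import Counter
from typing import Dict, Iterable, Tuple

Event = Tuple[str, str, int]  # (id, state, epoch-seconds timestamp)


def latest_state(events: Iterable[Event]) -> Dict[str, Tuple[str, int]]:
    """Keep only the newest (state, timestamp) per ID."""
    latest: Dict[str, Tuple[str, int]] = {}
    for event_id, state, ts in events:
        current = latest.get(event_id)
        # '>=' lets a later-arriving event win a timestamp tie (assumption).
        if current is None or ts >= current[1]:
            latest[event_id] = (state, ts)
    return latest


def count_by_state(latest: Dict[str, Tuple[str, int]]) -> Counter:
    """Answer 'how many items are now in state X' from the per-ID latest state."""
    return Counter(state for state, _ in latest.values())


# The sample events from the message above.
events = [
    ("111", "ready", 1532949947),
    ("111", "offline", 1532949955),
    ("111", "ongoing", 1532949955),
    ("111", "offline", 1532949973),
    ("333", "offline", 1532949981),
    ("333", "ongoing", 1532949987),
]
print(count_by_state(latest_state(events))["ongoing"])  # 1
```

The per-ID map is exactly the state that approach 1 has to checkpoint; every dashboard query then becomes an aggregation over that map rather than over the full event history.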
How to add a new source, like a Kafka source, to an existing Structured Streaming application
How can I add a new source, such as a Kafka source, to an existing Structured Streaming application?
How to read a CSV file into a DataFrame
I am trying to read a CSV file into a Spark DataFrame. My OS is Ubuntu 18.04, Spark version 2.3.1, Python version 2.7.15.

My code:

    from pyspark import SparkConf
    from pyspark import SparkContext
    from pyspark import SQLContext
    from pyspark.sql import SparkSession

    conf = SparkConf()
    sc = SparkContext(conf=conf)
    spark = SparkSession.builder.config(conf=conf).appName('FinancialRecon').getOrCreate()
    sqlContext = SQLContext(sc)

    df = spark.read.csv('/home/iconnect4/finrecon/test2.csv')
    df.show()

Error:

    Traceback (most recent call last):
      File "/home/iconnect4/finrecon/scratch_3.py", line 43, in <module>
        df=sqlContext.read.csv('/home/iconnect4/finrecon/test2.csv')
      File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 441, in csv
      File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
      File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 79, in deco
    pyspark.sql.utils.IllegalArgumentException: 'Illegal pattern component: XXX'

Please help.