Why is the perf improvement small after enabling Arrow?
Could anything else be missing? Thanks.
On Sun, Oct 4, 2020 at 10:36 AM Lian Jiang wrote:
> Please ignore this question.
> https://kontext.tech/column/spark/370/improve-pyspark-performance-using-pandas-udf-with-apache-arrow
> shows p
I missed enabling spark.sql.execution.arrow.enabled. Thanks.
Regards.
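For reference, a minimal sketch of enabling Arrow before running pandas UDFs (the key below is the Spark 2.x name; in Spark 3.x it is spark.sql.execution.arrow.pyspark.enabled):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Enable Arrow-based columnar data transfer between the JVM and Python (Spark 2.x key).
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
# Optionally fall back to the non-Arrow path instead of failing if Arrow cannot be used.
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "true")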
On Sun, Oct 4, 2020 at 10:22 AM Lian Jiang wrote:
> Hi,
>
> I am using pyspark Grouped Map pandas UDF (
> https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html).
> Functionality wise it works
Hi,
I am using pyspark Grouped Map pandas UDF (
https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html).
Functionality-wise it works great. However, SerDe causes a lot of perf
hits. To optimize this UDF, can I do either of the below:
1. use a Java UDF to completely replace the Python Gr
Hi,
My Spark job failed when reading parquet files from S3 due to 503 Slow
Down errors. According to
https://docs.aws.amazon.com/AmazonS3/latest/dev/optimizing-performance.html,
I can use backoff to mitigate this issue. However, Spark seems to interrupt
the backoff sleep (see "sleep interrupted"). Is
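In case it is relevant, a sketch of the retry settings I would look at, assuming the Hadoop s3a connector (EMRFS has its own settings; availability of these keys depends on the Hadoop version):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         # total attempts per S3 request before giving up
         .config("spark.hadoop.fs.s3a.attempts.maximum", "20")
         # retry count and base interval for throttled responses (Hadoop 3.x keys)
         .config("spark.hadoop.fs.s3a.retry.limit", "10")
         .config("spark.hadoop.fs.s3a.retry.interval", "1s")
         .getOrCreate())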
you want your Pandas UDF to be PandasUDFType.GROUPED_AGG? Is your
> result the same?
>
> From: Lian Jiang
> Date: Sunday, April 5, 2020 at 3:28 AM
> To: user
> Subject: pandas_udf is very slow
>
> Hi,
>
> I am using pandas udf in pyspark 2.4.3 on EMR 5.21.
Hi,
I am using a pandas UDF in PySpark 2.4.3 on EMR 5.21.0. Pandas UDFs are
favored over non-pandas UDFs per
https://www.twosigma.com/wp-content/uploads/Jin_-_Improving_Python__Spark_Performance_-_Spark_Summit_West.pdf.
My data has about 250M records and the pandas UDF code is like:
def pd_udf_func(
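The original pd_udf_func is cut off above. Purely as an illustration of the GROUPED_AGG variant the reply asks about, a self-contained sketch with made-up column names:

from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0)], ["id", "value"])

@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def mean_value(v):
    # v is a pandas Series holding one group's values; return a single scalar
    return v.mean()

df.groupby("id").agg(mean_value(df["value"]).alias("mean_value")).show()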
I figured it out. Thanks.
On Mon, Oct 7, 2019 at 9:55 AM Lian Jiang wrote:
> Hi,
>
> from pyspark.sql.functions import pandas_udf, PandasUDFType
> import pyspark
> from pyspark.sql import SparkSession
> spark = SparkSession.builder.getOrCreate()
>
> df = spark.createDataFra
Hi,
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(1, True, 1.0, 'aa'), (1, False, 2.0, 'aa'), (2, True, 3.0, 'aa'),
     (2, True, 5.0, 'aa'), (2, True, 10.0,
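The snippet above is truncated by the archive; a self-contained grouped-map sketch along the same lines (column names are illustrative, not the original ones):

from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(1, True, 1.0, 'aa'), (1, False, 2.0, 'aa'), (2, True, 3.0, 'aa')],
    ["id", "flag", "value", "name"])

@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def center_value(pdf):
    # pdf is a pandas DataFrame holding one group; return a pandas DataFrame
    # with the same schema (here: value centered around the group mean).
    return pdf.assign(value=pdf.value - pdf.value.mean())

df.groupby("id").apply(center_value).show()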
file and using
>> oracle external table to do the insert.
>>
>> hope this could help.
>>
>> Dalin
>>
>>> On Thu, Apr 18, 2019 at 11:43 AM Jörn Franke wrote:
>>> What is the size of the data? How much time does it need on HDFS and how
>>
Hi,
My Spark job writes into an Oracle DB using:
df.coalesce(10).write.format("jdbc")
  .option("url", url)
  .option("driver", driver)
  .option("user", user)
  .option("password", password)
  .option("batchsize", 2000)
  .option("dbtable", tableName)
  .mode("append")
  .save()
It is much slower than writing int
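For what it's worth, a sketch (PySpark syntax) of the JDBC writer options I understand are available for batching and parallelism, reusing the url/driver/user/password/tableName variables from the snippet above; the values are illustrative:

(df.repartition(10)                       # number of partitions = number of parallel JDBC connections
   .write.format("jdbc")
   .option("url", url)
   .option("driver", driver)
   .option("user", user)
   .option("password", password)
   .option("dbtable", tableName)
   .option("batchsize", 10000)            # rows per JDBC batch round trip (default 1000)
   .option("isolationLevel", "NONE")      # skip transaction isolation overhead if acceptable
   .mode("append")
   .save())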
Hi,
Occasionally, Spark generates parquet files of only 4 bytes whose content
is just "PAR1". ETL Spark jobs cannot handle such corrupted files and
ignore the whole partition containing such poison-pill files, causing big
data loss.
Spark also generates 0-byte parquet files, but they can be ha
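Not a fix for the root cause, but a sketch of two mitigations on the read side (paths and the size threshold are illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Option 1: skip files Spark cannot read instead of failing the whole job.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

# Option 2: list the directory and keep only files larger than a bare "PAR1" header.
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
Path = spark.sparkContext._jvm.org.apache.hadoop.fs.Path
fs = Path("hdfs:///data/mytable").getFileSystem(hadoop_conf)
good_files = [s.getPath().toString()
              for s in fs.listStatus(Path("hdfs:///data/mytable"))
              if s.getLen() > 8]
df = spark.read.parquet(*good_files)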
it be possible that you share your solution (in case the project is
> open-sourced already) with us and then we can have a look at it?
>
> Many thanks in advance.
>
> Best regards,
> [1]. https://github.com/databricks/spark-csv
>
> On Tue, Mar 26, 2019 at 1:09 AM Lian Jian
coalesce instead?
>
> Kathleen
>
> On Fri, Mar 22, 2019 at 2:43 PM Lian Jiang wrote:
>
>> Hi,
>>
>> Writing a csv to HDFS takes about 1 hour:
>>
>>
>> df.repartition(1).write.format('com.databricks.spark.csv').mode('overwrite
Hi,
Writing a CSV to HDFS takes about 1 hour:
df.repartition(1).write.format('com.databricks.spark.csv').mode('overwrite').options(header='true').save(csv)
The generated CSV file is only about 150 KB. The job uses 3 containers (13
cores, 23 GB memory).
Other people have similar issues but I don't see
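Following the coalesce suggestion above, a sketch of what I would try (still writing a single file): coalesce(1) avoids the shuffle that repartition(1) triggers, though it can also shrink upstream parallelism, and the built-in csv source replaces com.databricks.spark.csv on Spark 2.x+:

# a sketch, reusing the df and csv path from the message above
(df.coalesce(1)
   .write.format("csv")
   .mode("overwrite")
   .option("header", "true")
   .save(csv))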
Hi,
In my Spark batch job:
Step 1: the driver assigns a partition of the JSON file path list to each
executor.
Step 2: each executor fetches its assigned JSON files from S3 and saves
them into HDFS.
Step 3: the driver reads these JSON files into a DataFrame and saves it as
parquet.
To improve performance by
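To make steps 1 and 2 concrete, a sketch of how the path list gets distributed (the copy helper is a stub, and the paths are illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

json_paths = ["s3://bucket/a.json", "s3://bucket/b.json"]   # illustrative

def copy_s3_object_to_hdfs(path):
    # placeholder: the real job streams the S3 object into HDFS here
    pass

def copy_partition(paths):
    # step 2: runs on an executor for its slice of the path list
    for p in paths:
        copy_s3_object_to_hdfs(p)

# step 1: the driver splits the path list across executors
sc.parallelize(json_paths, numSlices=8).foreachPartition(copy_partition)

# step 3: the driver reads the staged JSON and writes parquet
spark.read.json("hdfs:///staging/json/").write.mode("overwrite").parquet("hdfs:///out/parquet/")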
state management by setting the following
>>> configuration in the SparkSession before starting the streaming query.
>>>
>>> spark.conf.set(
>>> "spark.sql.streaming.stateStore.providerClass",
>>> "com.databricks.sql.streaming.state.RocksD
Hi,
I have a very simple SSS pipeline which does:
val query = df
  .dropDuplicates(Array("Id", "receivedAt"))
  .withColumn(timePartitionCol, timestamp_udfnc(col("receivedAt")))
  .writeStream
  .format("parquet")
  .partitionBy("availabilityDomain", timePartitionCol)
  .trigger(Trigger.Processin
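Separate from the state store provider question: without a watermark, dropDuplicates keeps every key in state forever. A PySpark sketch of the bounded-state variant, reusing the streaming df and column names from the query above (everything else is illustrative, and it assumes receivedAt is a timestamp column):

from pyspark.sql import functions as F

deduped = (df
    .withWatermark("receivedAt", "24 hours")     # lets Spark expire duplicate-tracking state older than 24h
    .dropDuplicates(["Id", "receivedAt"])
    .withColumn("timePartition", F.to_date(F.col("receivedAt"))))

query = (deduped.writeStream
    .format("parquet")
    .partitionBy("availabilityDomain", "timePartition")
    .option("path", "hdfs:///out/dedup/")
    .option("checkpointLocation", "hdfs:///chk/dedup/")
    .start())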
>
> For me this worked to ignore reading empty/partially uploaded gzip files
> in s3 bucket.
>
> Akshay Bhardwaj
> +91-97111-33849
>
>
> On Thu, Mar 7, 2019 at 11:28 AM Lian Jiang wrote:
>
>> Hi,
>>
>> I have a structured streaming job which liste
Hi,
I have a structured streaming job which listens to an HDFS folder
containing jsonl.gz files. The job crashed due to this error:
java.io.IOException: incorrect header check
at
org.apache.hadoop.io.compress.zlib.ZlibDecompressor.inflateBytesDirect(Native
Method)
at
org.apache.hadoop.io.compre
Hi,
We have a Spark structured streaming job monitoring a folder and
converting jsonl files into parquet. However, if some jsonl files already
exist before the first run of the streaming job (i.e. no checkpoint yet),
those files are not processed by the job when it runs.
Hi,
We have a structured streaming job that converts JSON into parquet. We
want to validate the JSON records. If a JSON record is not valid, we want
to log a message and refuse to write it into the parquet. Also, the JSON
has nested JSONs and we want to flatten the nested JSONs into other parq
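To make the question concrete, a sketch of the kind of flow I have in mind (schema, column names, and paths are illustrative): parse each line with from_json, divert records that fail to parse, and flatten the nested struct with a star select:

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType

spark = SparkSession.builder.getOrCreate()

schema = StructType([
    StructField("id", LongType()),
    StructField("payload", StructType([
        StructField("name", StringType()),
        StructField("city", StringType()),
    ])),
])

raw = spark.readStream.text("hdfs:///in/jsonl/")              # one JSON document per line
parsed = raw.withColumn("data", F.from_json(F.col("value"), schema))

invalid = parsed.filter(F.col("data").isNull())               # candidates for logging / a dead-letter sink
valid = parsed.filter(F.col("data").isNotNull())

# flatten: keep the top-level id and expand the nested payload struct into columns
flat = valid.select(F.col("data.id").alias("id"), "data.payload.*")

query = (flat.writeStream
    .format("parquet")
    .option("path", "hdfs:///out/flat/")
    .option("checkpointLocation", "hdfs:///chk/flat/")
    .start())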
I am using spark in Jupyter as below:
import findspark
findspark.init()
from pyspark import SQLContext, SparkContext
sqlCtx = SQLContext(sc)
df = sqlCtx.read.parquet("oci://mybucket@mytenant/myfile.parquet")
The error is:
Py4JJavaError: An error occurred while calling o198.parquet.
: org.apache
Any clue? Thanks.
On Wed, Oct 31, 2018 at 8:29 PM Lian Jiang wrote:
> We have jsonl files each of which is compressed as gz file. Is it possible
> to make SSS to handle such files? Appreciate any help!
>
We have jsonl files, each of which is compressed as a gz file. Is it
possible to make SSS handle such files? Appreciate any help!
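From what I understand (please correct me if wrong), the file source decompresses .gz transparently based on the file extension, so a sketch would just be:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.getOrCreate()

# streaming file sources need an explicit schema; the field is illustrative
schema = StructType([StructField("field1", StringType())])

stream = (spark.readStream
    .schema(schema)
    .json("hdfs:///in/jsonl-gz/"))   # .json reads one record per line; .gz is handled by the codec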
I solved this issue by using spark 2.3.1 jars copied from the HDP3.0
cluster. Thanks.
On Thu, Aug 30, 2018 at 10:18 AM Lian Jiang wrote:
> Per https://spark.apache.org/docs/latest/building-spark.html, spark 2.3.1
> is built with hadoop 2.6.X by default. This is why I see my fat jar
>
2.3.1 built
with hadoop 2.7. Where can I get spark 2.3.1 built with hadoop 3? Does
spark 2.3.1 support hadoop 3?
Appreciate your help.
On Thu, Aug 30, 2018 at 8:59 AM Lian Jiang wrote:
> Hi,
>
> I am using HDP3.0 which uses HADOOP3.1.0 and Spark 2.3.1. My spark
> streaming jobs run
Hi,
I am using HDP 3.0, which uses Hadoop 3.1.0 and Spark 2.3.1. My Spark
streaming jobs, which run fine in HDP 2.6.4 (Hadoop 2.7.3, Spark 2.2.0),
fail in HDP 3:
java.lang.IllegalAccessError: class
org.apache.hadoop.hdfs.web.HftpFileSystem cannot access its superinterface
org.apache.hadoop.hdfs.web.TokenA
Hi,
I am considering tools to load hbase data using spark. One choice is
https://github.com/Huawei-Spark/Spark-SQL-on-HBase. However, this seems to
be out-of-date (e.g. "This version of 1.0.0 requires Spark 1.4.0."). Which
tool should I use for this purpose? Thanks for any hint.
Thanks for any help!
On Mon, Apr 23, 2018 at 11:46 AM, Lian Jiang wrote:
> Hi,
>
> I am using structured spark streaming which reads jsonl files and writes
> into parquet files. I am wondering what's the process if jsonl files schema
> change.
>
> Suppose jsonl fil
Hi,
I am using structured Spark streaming, which reads jsonl files and writes
parquet files. I am wondering what the process is if the jsonl files'
schema changes.
Suppose jsonl files are generated in the \jsonl folder and the old schema
is {"field1": String}. My proposal is:
1. write the jsonl files
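On the read side, my understanding is that parquet files written with the old and new schemas can be read together with schema merging (option name from the Spark docs; the path and the added field are illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# files written with {"field1": String} and later with an extra field2 are read together;
# rows from the older files get null for the missing column
df = (spark.read
      .option("mergeSchema", "true")
      .parquet("hdfs:///out/parquet/"))
df.printSchema()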
Hi,
My Spark jobs need to talk to HBase and I am not sure which Spark-HBase
connector is recommended:
https://hortonworks.com/blog/spark-hbase-dataframe-based-hbase-connector/
https://phoenix.apache.org/phoenix_spark.html
Or is there another, better solution? Appreciate any guidance.
I have a parquet dataset with an id field that is the hash of the composite
key fields. Is it possible to maintain the uniqueness of the id field when
appending new data that may duplicate existing records in the parquet?
Thanks!
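A sketch of the only approach that comes to mind with plain parquet (no transactional table format): anti-join the new batch against the existing ids before appending. Paths and column names are illustrative:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

existing = spark.read.parquet("hdfs:///data/events/")
new_batch = spark.read.parquet("hdfs:///staging/new_events/")

# keep only rows whose id is not already present, then append
to_append = new_batch.join(existing.select("id"), on="id", how="left_anti")
# in practice it may be safer to materialize to_append to a temp location
# before appending to the directory that was just read
to_append.write.mode("append").parquet("hdfs:///data/events/")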
to drop
> data older than x days outside streaming job.
>
> Sunil Parmar
>
> On Wed, Mar 14, 2018 at 11:36 AM, Lian Jiang
> wrote:
>
>> I have a spark structured streaming job which dump data into a parquet
>> file. To avoid the parquet file grows infinitely, I want
I have a Spark structured streaming job which dumps data into a parquet
file. To avoid the parquet file growing infinitely, I want to discard
3-month-old data. Does Spark streaming support this? Or do I need to stop
the streaming job, trim the parquet file, and restart the streaming job?
Thanks for any h
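Following the suggestion above (handle retention outside the streaming query), a sketch: partition the sink by a date column, then have a small periodic job delete partitions older than 90 days. Paths and the partition column name are illustrative:

import datetime
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

base = "hdfs:///out/events"                      # sink written with .partitionBy("date")
cutoff = (datetime.date.today() - datetime.timedelta(days=90)).isoformat()

hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
Path = spark.sparkContext._jvm.org.apache.hadoop.fs.Path
fs = Path(base).getFileSystem(hadoop_conf)

for status in fs.listStatus(Path(base)):
    name = status.getPath().getName()            # e.g. "date=2018-01-01"
    if name.startswith("date=") and name[len("date="):] < cutoff:
        fs.delete(status.getPath(), True)        # recursively delete the expired partition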
ster$.main(ApplicationMaster.scala:766)
at
org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
Any idea?
On Tue, Mar 6, 2018 at 4:17 PM, Lian Jiang wrote:
> I am using HDP 2.6.4 and have followed https://docs.hortonworks.com/
> HDPDocuments/HDP2/HDP-2.6.1/bk
I am using HDP 2.6.4 and have followed
https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.1/bk_spark-component-guide/content/ch_oozie-spark-action.html
to make Oozie use Spark 2.
After this, I found there are still a bunch of issues:
1. Oozie and Spark try to add the same jars multiple time
Thanks Vijay. After changing the programming model (create a context class
for the workers), it finally worked for me. Cheers.
On Fri, Feb 23, 2018 at 5:42 PM, vijay.bvp wrote:
> when HTTP connection is opened you are opening a connection between
> specific
> machine (with IP and NIC card) to an
Hi Vijay,
Should HTTPConnection() (or any other object created per partition) be
serializable for your code to work? If so, the usage seems to be limited.
Sometimes, the error caused by a non-serializable object can be very
misleading (e.g. "Return statements aren't allowed in Spark closures")
i
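For anyone who hits the same thing, my understanding of the pattern Vijay described: don't create the connection on the driver at all; create it inside mapPartitions/foreachPartition so it is born on the executor and never serialized. A sketch (the host, endpoint, and symbols are illustrative):

from http.client import HTTPConnection
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
symbols = spark.sparkContext.parallelize(["AAPL", "MSFT", "GOOG"], 3)

def fetch_partition(rows):
    # created on the executor, once per partition, so it is never serialized
    conn = HTTPConnection("example.com")
    try:
        for symbol in rows:
            conn.request("GET", "/quote?symbol=" + symbol)
            yield (symbol, conn.getresponse().read())
    finally:
        conn.close()

results = symbols.mapPartitions(fetch_partition).collect()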
>
> ------
> *From:* Lian Jiang
> *Sent:* Wednesday, February 21, 2018 4:16:08 PM
> *To:* user
> *Subject:* Return statements aren't allowed in Spark closures
>
> I can run below code in spark-shel
I can run the code below in spark-shell using yarn client mode.
val csv = spark.read.option("header", "true").csv("my.csv")
def queryYahoo(row: Row) : Int = { return 10; }
csv.repartition(5).rdd.foreachPartition{ p => p.foreach(r => {
queryYahoo(r) })}
However, the same code failed when run using s
Thanks Vijay! This is very clear.
On Tue, Feb 20, 2018 at 12:47 AM, vijay.bvp wrote:
> I am assuming pullSymbolFromYahoo functions opens a connection to yahoo API
> with some token passed, in the code provided so far if you have 2000
> symbols, it will make 2000 new connections!! and 2000 API ca
example.
>
> https://www.nicolaferraro.me/2016/02/22/using-non-serializable-objects-in-
> apache-spark/
>
> In a nutshell, the non-serialisable code is available to all executors, so
> there is no need for Spark to serialise from the driver to the executors.
>
> Best regards,
e it on a
> distributed file system. Schedule to download newest information every day/
> hour etc. you can store it using a query optimized format such as ORC or
> Parquet. Then you can run queries over it.
>
> On 17. Feb 2018, at 01:10, Lian Jiang wrote:
>
> Hi,
>
> wrote:
>
>> I'm not sure what you mean by it could be hard to serialize complex
>> operations?
>>
>> Regardless I think the question is do you want to parallelize this on
>> multiple machines or just one?
>>
>> On Feb 17, 2018 4:20 PM, "Lian
wrote:
>>
>>> Do you only want to use Scala? Because otherwise, I think with pyspark
>>> and pandas read table you should be able to accomplish what you want to
>>> accomplish.
>>>
>>> Thank you,
>>>
>>> Irving Duran
>>>
Hi,
I have a use case:
I want to download S&P 500 stock data from the Yahoo API in parallel using
Spark. I have all the stock symbols as a Dataset. Then I used the below
code to call the Yahoo API for each symbol:
case class Symbol(symbol: String, sector: String)
case class Tick(symbol: String, sector: S
You might not have
> access to it that's causing problems.
>
>
>
> Thanks
> Prashanth Thipparthi
>
>
>
>
>
> On 11 Feb 2018 10:45 pm, "Lian Jiang" wrote:
>
> I started spark-shell with below command:
>
> spark-shell --master yarn --conf s
I started spark-shell with the below command:
spark-shell --master yarn --conf spark.sql.warehouse.dir="/user/spark"
In spark-shell, the below statement can create a managed table using the
/user/spark HDFS folder:
spark.sql("CREATE TABLE t5 (i int) USING PARQUET")
However, the below statements still use spark
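For reference, a PySpark sketch of setting the warehouse location programmatically; spark.sql.warehouse.dir is a static config, so it only takes effect if it is set before the first SparkSession is created:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.sql.warehouse.dir", "/user/spark")
         .enableHiveSupport()          # only if tables should go through the Hive metastore
         .getOrCreate())

spark.sql("CREATE TABLE t5 (i int) USING PARQUET")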
names had to be in small letters, but that was in MYSQL.
>>
>>
>> Regards,
>> Gourav
>>
>> On Sun, Feb 11, 2018 at 2:26 AM, Lian Jiang
>> wrote:
>>
>>> Hi,
>>>
>>> I am following https://spark.apache.org/docs/latest/sql-
Hi,
I am following
https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases
to query an Oracle 12.1 database from the Spark shell (Scala 2.11.8).
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:oracle:thin:@(DESCRIPTION = (ADDRESS = (PROTOCOL = TCP)(HOST = 129.106
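For comparison, a sketch (PySpark syntax) using the simpler thin-URL form; the host, port, service name, table, and credentials are placeholders:

jdbcDF = (spark.read
    .format("jdbc")
    .option("url", "jdbc:oracle:thin:@//dbhost:1521/ORCLPDB1")
    .option("driver", "oracle.jdbc.driver.OracleDriver")
    .option("dbtable", "myschema.mytable")
    .option("user", "scott")
    .option("password", "****")
    .load())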