Re: Using Spark Accumulators with Structured Streaming

2020-06-04 Thread ZHANG Wei
t(key, newState); } } ``` -- Cheers, -z On Tue, 2 Jun 2020 10:28:36 +0800 ZHANG Wei wrote: > Yes, verified on the cluster with 5 executors. > > -- > Cheers, > -z > > On Fri, 29 May 2020 11:16:12 -0700 > Something Something wrote: > > > Did you try this

Re: Using Spark Accumulators with Structured Streaming

2020-06-01 Thread ZHANG Wei
Yes, verified on the cluster with 5 executors. -- Cheers, -z On Fri, 29 May 2020 11:16:12 -0700 Something Something wrote: > Did you try this on the Cluster? Note: This works just fine under 'Local' > mode. > > On Thu, May 28, 2020 at 9:12 PM ZHANG Wei wrote:

Re: Using Spark Accumulators with Structured Streaming

2020-05-28 Thread ZHANG Wei
gger.info("Query made progress - batchId: {} > numInputRows:{} inputRowsPerSecond:{} processedRowsPerSecond:{} > durationMs:{}" , > queryProgress.progress().batchId(), > queryProgress.progress().numInputRows(), > queryProgress.progress().input
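The quoted fragment appears to come from a StreamingQueryListener.onQueryProgress callback. A minimal Scala sketch of such a listener is shown below; the class name and println are illustrative, not the poster's actual code, and registration assumes an existing SparkSession named `spark`:

```
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Illustrative listener that logs the same progress fields quoted above.
class ProgressLogger extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    println(s"Query made progress - batchId: ${p.batchId} " +
      s"numInputRows:${p.numInputRows} inputRowsPerSecond:${p.inputRowsPerSecond} " +
      s"processedRowsPerSecond:${p.processedRowsPerSecond} durationMs:${p.durationMs}")
  }
}

// Registration:
// spark.streams.addListener(new ProgressLogger())
```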

Re: Using Spark Accumulators with Structured Streaming

2020-05-28 Thread ZHANG Wei
nd' are > >>> getting populated correctly! > >>> > >>> On Mon, May 25, 2020 at 8:39 PM Srinivas V wrote: > >>> > >>>> Hello, > >>>> Even for me it comes as 0 when I print in OnQueryProgress. I use

Re: Unit testing Spark/Scala code with Mockito

2020-05-20 Thread ZHANG Wei
AFAICT, it depends on the testing goal: Unit Test, Integration Test or E2E Test. For a Unit Test, mostly, it tests an individual class or class methods; Mockito can help mock and verify dependent instances or methods. For an Integration Test, some Spark testing helper methods can set up the environment, such
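As a rough sketch of the unit-test level mentioned above (the class, trait, and test names here are made up for illustration), a dependency can be mocked with Mockito while the transformation itself runs against a local SparkSession:

```
import org.apache.spark.sql.SparkSession
import org.mockito.Mockito.{mock, verify}
import org.scalatest.funsuite.AnyFunSuite

// Hypothetical dependency to be mocked.
trait AuditLogger { def log(msg: String): Unit }

// Hypothetical unit under test: a small transformation plus a side-effecting dependency.
class WordCounter(logger: AuditLogger) {
  def count(spark: SparkSession, words: Seq[String]): Long = {
    import spark.implicits._
    val n = words.toDS().filter(_.nonEmpty).count()
    logger.log(s"counted $n words")
    n
  }
}

class WordCounterSuite extends AnyFunSuite {
  test("counts non-empty words and logs the result") {
    val spark = SparkSession.builder().master("local[2]").appName("unit-test").getOrCreate()
    try {
      val logger = mock(classOf[AuditLogger])   // mock the dependency
      val n = new WordCounter(logger).count(spark, Seq("a", "", "b"))
      assert(n == 2)
      verify(logger).log("counted 2 words")     // verify the interaction
    } finally {
      spark.stop()
    }
  }
}
```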

Re: CSV data source : Garbled Japanese text and handling multilines

2020-05-20 Thread ZHANG Wei
May I get the CSV file's encoding, which can be checked with the `file` command? -- Cheers, -z On Tue, 19 May 2020 09:24:24 +0900 Ashika Umagiliya wrote: > In my Spark job (spark 2.4.1), I am reading CSV files on S3. These files > contain Japanese characters. Also they can have the ^M character (u000D)
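If the file turns out to be in a non-UTF-8 Japanese encoding, a sketch along these lines may help; the encoding name and path are assumptions to be replaced with whatever `file` reports, and `spark` is the usual spark-shell session:

```
// Read a Shift_JIS-encoded CSV with embedded newlines/^M inside quoted fields.
val df = spark.read
  .option("header", "true")
  .option("encoding", "Shift_JIS")   // or "UTF-8", "EUC-JP" -- whatever the file actually is
  .option("multiLine", "true")       // keep quoted fields that span lines together
  .csv("s3a://my-bucket/path/to/file.csv")   // hypothetical path
```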

Re: Using Spark Accumulators with Structured Streaming

2020-05-15 Thread ZHANG Wei
There is a restriction in the AccumulatorV2 API [1]: the OUT type should be atomic or thread-safe. I'm wondering whether the implementation for `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace CollectionLongAccumulator with CollectionAccumulator[2] or LongAccumulator[3] and
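For reference, a minimal sketch of the suggested built-in alternatives (assuming an existing SparkSession named `spark`; the accumulator names and the toy job are illustrative):

```
// Built-in accumulators registered through SparkContext.
val recordCount = spark.sparkContext.longAccumulator("recordCount")
val oddValues   = spark.sparkContext.collectionAccumulator[Long]("oddValues")

spark.sparkContext.range(0, 100).foreach { n =>
  recordCount.add(1)
  if (n % 2 == 1) oddValues.add(n)
}

// Read back on the driver:
println(s"records=${recordCount.value}, odd values collected=${oddValues.value.size}")
```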

Re: [PySpark] Tagging descriptions

2020-05-13 Thread ZHANG Wei
o amazon as well as other vendors etc. > > Appreciate your response! > > > > On Tue, May 12, 2020 at 6:23 AM ZHANG Wei wrote: > > > May I get some requirement details? > > > > Such as: > > 1. The row count and one row data size > > 2. The av

Re: [PySpark] Tagging descriptions

2020-05-12 Thread ZHANG Wei
May I get some requirement details? Such as: 1. The row count and one row data size 2. The avg length of text to be parsed by RegEx 3. The sample format of text to be parsed 4. The sample of current RegEx -- Cheers, -z On Mon, 11 May 2020 18:40:49 -0400 Rishi Shah wrote: > Hi All, > > I

Re: PyArrow Exception in Pandas UDF GROUPEDAGG()

2020-05-07 Thread ZHANG Wei
AFAICT, there might be data skew: some partitions got too many rows, which hit the out-of-memory limit. Trying .groupBy().count() or .aggregateByKey().count() may help check the data size of each partition. If there is no data skew, increasing the .groupBy() parameter `numPartitions` is worth a try. --
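A quick skew check along the lines suggested above might look like this; `df` and the grouping column name are placeholders for the actual DataFrame and key:

```
import org.apache.spark.sql.functions.desc

// Count rows per grouping key and look at the heaviest keys first.
df.groupBy("someKey")      // hypothetical grouping column
  .count()
  .orderBy(desc("count"))
  .show(20, truncate = false)
```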

Re: [Structured Streaming] NullPointerException in long running query

2020-04-29 Thread ZHANG Wei
Is there any chance we can also print the least recent failure in the stage, like the following most recent failure, before the driver stacktrace? > >> Caused by: org.apache.spark.SparkException: Job aborted due to stage > >> failure: Task 10 in stage 1.0 failed 4 times, most recent failure: Lost > >> task

Re: Filtering on multiple columns in spark

2020-04-29 Thread ZHANG Wei
AFAICT, maybe Spark SQL built-in functions[1] can help, as below:

scala> df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

scala> df.filter("length(name) == 4 or substring(name, 1, 1) == 'J'").show()
+---+------+
|age|  name|

Re: Spark ORC store written timestamp as column

2020-04-24 Thread ZHANG Wei
From what I think I understand, the OrcOutputWriter leverages orc-core to write. I'm wondering whether ORC supports the row metadata or not. If not, maybe org.apache.orc.Writer::addRowBatch() can be overridden to record the metadata after the RowBatch is written. -- Cheers, -z On Thu, 16 Apr 2020

Re: Save Spark dataframe as dynamic partitioned table in Hive

2020-04-24 Thread ZHANG Wei
AFAICT, we can use spark.sql(s"select $name ..."), where `name` is a value in the Scala context[1]. -- Cheers, -z [1] https://docs.scala-lang.org/overviews/core/string-interpolation.html On Fri, 17 Apr 2020 00:10:59 +0100 Mich Talebzadeh wrote: > Thanks Patrick, > > The partition broadcastId is static
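A minimal sketch of that approach, with the database, table, and column names made up for illustration (only the s-interpolation itself comes from the suggestion above):

```
// A value computed in Scala, spliced into the SQL text with the s-interpolator.
val broadcastId = 123L   // hypothetical static partition value

val sqltext =
  s"""INSERT INTO mydb.target_tbl PARTITION (broadcastid = $broadcastId)
     |SELECT col1, col2 FROM mydb.source_tbl""".stripMargin

spark.sql(sqltext)
```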

Re: Spark hangs while reading from jdbc - does nothing Removing Guess work from trouble shooting

2020-04-23 Thread ZHANG Wei
-z [1] -- https://linux.die.net/man/2/accept From: Jungtaek Lim Sent: Wednesday, April 22, 2020 11:21 To: Ruijing Li Cc: Gabor Somogyi; Mich Talebzadeh; ZHANG Wei; user Subject: Re: Spark hangs while reading from jdbc - does nothing Removing Guess work from trouble shooting No, that's not a thin

Re: What is the best way to take the top N entries from a hive table/data source?

2020-04-22 Thread ZHANG Wei
The performance issue might be caused by the parquet table's partition count, which is only 3. The reader uses that partition count to parallelize the extraction. Refer to the log you provided: > spark.sql("select * from db.table limit 100").explain(false) > == Physical Plan == > CollectLimit 100 >
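A quick way to confirm that suspicion, assuming access to the same table, is to look at how many partitions the scan actually produces; the repartition target below is just an illustrative number:

```
// Number of input partitions the reader will parallelize over.
val df = spark.table("db.table")
println(df.rdd.getNumPartitions)

// If it really is only 3, repartitioning after the scan can restore
// parallelism for downstream work.
val wider = df.repartition(200)
```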

Re: What is the best way to take the top N entries from a hive table/data source?

2020-04-21 Thread ZHANG Wei
https://github.com/apache/spark/pull/7334 may answer the question, as quoted below: > This patch preserves this optimization by treating logical Limit operators > specially when they appear as the terminal operator in a query plan: if a > Limit is the final operator, then we will plan a special

Re: Cross Region Apache Spark Setup

2020-04-20 Thread ZHANG Wei
There might be 3 options: 1. Just as you expect: only ONE application and ONE RDD, with region-aware containers and executors automatically allocated and distributed; the ResourceProfile (https://issues.apache.org/jira/browse/SPARK-27495) may meet that requirement, treating Region as a type of

Re: Can I run Spark executors in a Hadoop cluster from a Kubernetes container

2020-04-20 Thread ZHANG Wei
Looks like you'd like to submit a Spark job from outside the Spark cluster. Apache Livy [https://livy.incubator.apache.org/] is worth a try; it provides a REST service for Spark in a Hadoop cluster. Cheers, -z From: mailford...@gmail.com Sent: Thursday, April 16,
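As a rough sketch of what a submission from outside the cluster could look like — the Livy host, jar path, and class name are all placeholders, and Java 11's HTTP client is assumed — a batch job can be posted to Livy's /batches REST endpoint:

```
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

// POST a batch submission to Livy's /batches endpoint.
val payload =
  """{"file": "hdfs:///jobs/my-app.jar", "className": "com.example.MyApp"}"""

val request = HttpRequest.newBuilder(URI.create("http://livy-host:8998/batches"))
  .header("Content-Type", "application/json")
  .POST(HttpRequest.BodyPublishers.ofString(payload))
  .build()

val response = HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
println(response.statusCode() + " " + response.body())
```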

Re: How does spark sql evaluate case statements?

2020-04-16 Thread ZHANG Wei
Are you looking for this: https://spark.apache.org/docs/2.4.0/api/sql/#when ? The code generated will look like this in a `do { ... } while (false)` loop: do { ${cond.code} if (!${cond.isNull} && ${cond.value}) { ${res.code} $resultState = (byte)(${res.isNull} ? $HAS_NULL :
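To see that generated code for a concrete query, something like the following sketch can be used; the DataFrame, column, and bucket labels are made up for illustration:

```
import org.apache.spark.sql.functions.{col, when}

val labeled = spark.range(0, 10).toDF("age")
  .withColumn("bucket",
    when(col("age") < 3, "low")
      .when(col("age") < 7, "mid")
      .otherwise("high"))

// Dump the whole-stage generated Java, including the do { ... } while (false)
// blocks produced for the CASE WHEN expression.
import org.apache.spark.sql.execution.debug._
labeled.debugCodegen()
```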

Re: Is there any way to set the location of the history for the spark-shell per session?

2020-04-16 Thread ZHANG Wei
You are welcome! It's not in the Spark source code; it's in the Scala source: https://github.com/scala/scala/blob/2.11.x/src/repl-jline/scala/tools/nsc/interpreter/jline/FileBackedHistory.scala#L26 Reference code: // For a history file in the standard location, always try to restrict permission,

Re: How to pass a constant value to a partitioned hive table in spark

2020-04-16 Thread ZHANG Wei
> scala> spark.sql($sqltext)
> :41: error: not found: value $sqltext
> spark.sql($sqltext)
            ^
            +-- should be Scala language

Try this:

scala> spark.sql(sqltext)

-- Cheers, -z On Thu, 16 Apr 2020 08:49:40 +0100 Mich Talebzadeh wrote: > I have
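In other words, the `$` prefix only has meaning inside an interpolated string. A minimal sketch of the working pattern follows; the SQL text and the value are just examples:

```
// Build the statement first, interpolating any Scala values inside s"...".
val broadcastValue = "abc"   // hypothetical value
val sqltext = s"SELECT * FROM mydb.mytable WHERE broadcastid = '$broadcastValue'"

// Then pass the plain Scala variable -- no leading $ -- to spark.sql.
spark.sql(sqltext).show()
```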

Re: [Spark Core]: Does an executor only cache the partitions it requires for its computations or always the full RDD?

2020-04-16 Thread ZHANG Wei
As far as I know, if you are talking about RDD.cache(), the answer is that the executor only caches the partitions it requires. Cheers, -z From: zwithouta Sent: Tuesday, April 14, 2020 18:28 To: user@spark.apache.org Subject: [Spark Core]: Does an executor

Re: Spark hangs while reading from jdbc - does nothing Removing Guess work from trouble shooting

2020-04-16 Thread ZHANG Wei
The Thread dump result table of the Spark UI can provide some clues to find out thread lock issues, such as:

Thread ID | Thread Name                  | Thread State | Thread Locks
13        | NonBlockingInputStreamThread | WAITING      | Blocked by Thread Some(48)

Re: Is there any way to set the location of the history for the spark-shell per session?

2020-04-16 Thread ZHANG Wei
From my understanding, you are talking about spark-shell command history, aren't you? If yes, you can try adding `--conf 'spark.driver.extraJavaOptions=-Dscala.shell.histfile=` into the spark-shell command arguments, since the Spark shell leverages the Scala REPL JLine file-backed history settings.

Re: Spark Streaming not working

2020-04-14 Thread ZHANG Wei
Here is the assertion error message format: s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout") You might have to check the kafka service with the error log: > 20/04/10 17:28:04 ERROR Executor: Exception in task 0.5 in stage 0.0 (TID 24) >

Re: Spark interrupts S3 request backoff

2020-04-14 Thread ZHANG Wei
I will make a guess: it's not interrupted, it's killed by the driver or the resource manager since the executor has been asleep for a long time. You may have to find the root cause in the driver and failed executor log contexts. -- Cheers, -z From:

Re: [External Sender] Re: Driver pods stuck in running state indefinitely

2020-04-12 Thread Zhang Wei
I would like to suggest double-checking the name resolution by logging into the failed node and trying the ping command: ping spark-1586333186571-driver-svc.fractal-segmentation.svc Just my 2 cents. -- Cheers, -z On Fri, 10 Apr 2020 13:03:46 -0400 "Prudhvi Chennuru (CONT)" wrote: > No, there