[SparkSQL] Full Join Returns Null Value For Function-Based Column

2021-01-17 Thread 刘 欢
Hi all,
I have two tables:

Table A

name  | num
tom   | 2
jerry | 3
jerry | 4
null  | null

Table B

name  | score
tom   | 12
jerry | 10
jerry | 8
null  | null

When I use spark.sql() to query A and B with the following SQL:


select
  a.name as aName,
  a.date,
  b.name as bName
from
(
select
  name,
  date_format(now(),'-MM-dd') AS date
from
  A
group by
  name
) a
FULL JOIN
(
select
  name
from
  B
group by
  name
) b
ON a.name = b.name

The result contains rows in which every column is null, like:

aName | date | bName
null  | null | null
…     | …    | …

Can anyone explain why this all-null row appears?
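Assuming standard SQL NULL semantics (which Spark SQL follows for `=`), the all-null row can be reproduced by simulating the two grouped subqueries in plain Python: `GROUP BY name` keeps one NULL group on each side, `NULL = NULL` is not true, so neither NULL group finds a partner, and the FULL JOIN emits each one padded with NULLs. The row coming from `b` has no `date`, because `date` belongs to `a`. This is a sketch, not Spark's join implementation; the helper names and the `<date>` placeholder are illustrative only.

```python
# Toy FULL OUTER JOIN with SQL NULL semantics; Python None stands in for NULL.

def sql_eq(x, y):
    """SQL `=`: unknown (None) if either side is NULL, so it never matches."""
    if x is None or y is None:
        return None
    return x == y

def full_outer_join(left, right, on, left_cols, right_cols):
    rows, matched_right = [], set()
    for l in left:
        matched = False
        for j, r in enumerate(right):
            if on(l, r) is True:  # only a TRUE condition produces a match
                rows.append(tuple(l[c] for c in left_cols) + tuple(r[c] for c in right_cols))
                matched_right.add(j)
                matched = True
        if not matched:  # unmatched left row: pad right columns with NULL
            rows.append(tuple(l[c] for c in left_cols) + (None,) * len(right_cols))
    for j, r in enumerate(right):
        if j not in matched_right:  # unmatched right row: pad left columns with NULL
            rows.append((None,) * len(left_cols) + tuple(r[c] for c in right_cols))
    return rows

DATE = "<date>"  # placeholder for date_format(now(), ...); its value is irrelevant here
a = [{"name": n, "date": DATE} for n in ("tom", "jerry", None)]  # subquery a after GROUP BY
b = [{"name": n} for n in ("tom", "jerry", None)]                # subquery b after GROUP BY

plain = full_outer_join(a, b, lambda l, r: sql_eq(l["name"], r["name"]), ["name", "date"], ["name"])
# Python's == treats None == None as True, like Spark SQL's null-safe `<=>`.
null_safe = full_outer_join(a, b, lambda l, r: l["name"] == r["name"], ["name", "date"], ["name"])

for row in plain:
    print(row)
```

In Spark SQL, joining with the null-safe operator (`ON a.name <=> b.name`) corresponds to the `null_safe` variant and yields a single NULL-name row that keeps its `date`; filtering `WHERE name IS NOT NULL` in both subqueries would drop the NULL groups altogether.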






Re: Correctness bug on Shuffle+Repartition scenario

2021-01-17 Thread Shiao-An Yuan
Hi,
I am using Spark 2.4.4 in standalone mode.

On Mon, Jan 18, 2021 at 4:26 AM Sean Owen  wrote:

> Hm, FWIW I can't reproduce that on Spark 3.0.1. What version are you using?
>
> On Sun, Jan 17, 2021 at 6:22 AM Shiao-An Yuan 
> wrote:
>
>> Hi folks,
>>
>> I finally found the root cause of this issue.
>> It can be easily reproduced by the following code.
>> We ran it in standalone mode on 4 instances with 4 cores each (16 cores
>> in total).
>>
>> ```
>> import org.apache.spark.TaskContext
>> import scala.sys.process._
>> import org.apache.spark.sql.functions._
>> import com.google.common.hash.Hashing
>> val murmur3 = Hashing.murmur3_32()
>>
>> // create a Dataset in which the second element has a cardinality of 5
>> val ds = spark.range(0, 10, 1, 130).map(i =>
>>   (murmur3.hashLong(i).asInt(), i / 2))
>>
>> ds.groupByKey(_._2)
>>   .agg(first($"_1").as[Long])
>>   .repartition(200)
>>   .map { x =>
>>     // in the first attempt of partition 100, kill the executor processes
>>     // on this host (via pkill) to force a stage retry
>>     if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId == 100 &&
>>         TaskContext.get.stageAttemptNumber == 0) {
>>       throw new Exception("pkill -f CoarseGrainedExecutorBackend".!!)
>>     }
>>     x
>>   }
>>   .map(_._2).distinct().count()   // the correct result is 5, but we always got a smaller number
>> ```
>>
>> The problem is that SPARK-23207 uses sorting to make RoundRobinPartitioning
>> always generate the same distribution, but the aggregate function `first`
>> may return non-deterministic results, which makes the sorting result
>> non-deterministic. Therefore, the first attempt and the retry of the stage
>> may produce different distributions, causing duplication and loss.
>>
>> Thanks,
>> Shiao-An Yuan
>>
>> On Tue, Dec 29, 2020 at 10:00 PM Shiao-An Yuan 
>> wrote:
>>
>>> Hi folks,
>>>
>>> We recently identified a data correctness issue in our pipeline.
>>>
>>> The data processing flow is as follows:
>>> 1. Read the current snapshot (use an empty one if it doesn't exist yet).
>>> 2. Read the unprocessed new data.
>>> 3. Union them and do a `reduceByKey` operation.
>>> 4. Output a new version of the snapshot.
>>> 5. Repeat steps 1-4.
>>>
>>> The simplified version of the code:
>>> ```
>>> import spark.implicits._  // encoders for Log and String
>>>
>>> // schema
>>> case class Log(pkey: Array[Byte], a: String, b: Int /* , 100+ more columns */)
>>>
>>> // function for reduce
>>> def merge(left: Log, right: Log): Log = {
>>>   Log(pkey = left.pkey,
>>>     a = if (left.a != null) left.a else right.a,
>>>     b = if (left.a != null) left.b else right.b,
>>>     ...
>>>   )
>>> }
>>>
>>> // a very large parquet file (>10G, 200 partitions)
>>> val currentSnapshot = spark.read.schema(schema).parquet(...).as[Log]
>>>
>>> // multiple small parquet files
>>> val newAddedLogs = spark.read.schema(schema).parquet(...).as[Log]
>>>
>>> val newSnapshot = currentSnapshot.union(newAddedLogs)
>>>   .groupByKey(log => new String(log.pkey))  // generate key
>>>   .reduceGroups(merge _)                    // spark.sql.shuffle.partitions=200
>>>   .map(_._2)                                // drop key
>>>
>>> newSnapshot
>>>   .repartition(60)  // (1)
>>>   .write.parquet(newPath)
>>> ```
>>>
>>> The issue is that some data were duplicated or lost, and the amounts of
>>> duplicated and lost data are similar.
>>>
>>> We also noticed that this only happens if some instances get preempted.
>>> Spark retries the stage, so some of the partitioned files are generated
>>> in the first attempt and others in the second (retry) attempt.
>>> Moreover, each duplicated log appears exactly twice: once in the first
>>> batch and once in the second.
>>>
>>> The input/output files are parquet on GCS. The Spark version is 2.4.4 with
>>> standalone deployment. Workers run on GCP preemptible instances, which
>>> are preempted very frequently.
>>>
>>> The pipeline runs in a single long-running process with multiple threads.
>>> Each snapshot represents an "hour" of data, and we do the
>>> "read-reduce-write" operations on multiple snapshots (hours) simultaneously.
>>> We are pretty sure the same snapshot (hour) is never processed in
>>> parallel, and the output path is always generated with a timestamp, so
>>> those jobs shouldn't affect each other.
>>>
>>> After changing line (1) to `coalesce` or `repartition(100, $"pkey")`, the
>>> issue was gone, but I believe there is still a correctness bug that hasn't
>>> been reported yet.
>>>
>>> We have tried to reproduce this bug on a smaller scale but haven't
>>> succeeded yet. I have read SPARK-23207 and SPARK-28699, but couldn't find
>>> the bug. Since this case uses a Dataset, I believe it is unrelated to
>>> SPARK-24243.
>>>
>>> Can anyone give me some advice on how to proceed?
>>> Thanks in advance.
>>>
>>> Shiao-An Yuan
>>>
>>
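The root-cause explanation quoted above (SPARK-23207 sorts rows so that round-robin repartitioning is repeatable, but a non-deterministic `first` changes the rows being sorted) can be illustrated with a toy model in plain Python. It is a sketch under simplifying assumptions, not Spark's implementation: rows are compared as plain tuples, only one output partition is recomputed on retry, and `key % n` stands in for the real hash used by `repartition(n, col)`. It also suggests why the `repartition(100, $"pkey")` workaround helped: hash placement depends only on the key, not on row order or on which value `first` picked.

```python
# Toy model (plain Python, no Spark) of a stage retry after repartitioning.
# Rows are (first_value, key); `first` is assumed to pick a different value
# for some keys when the upstream aggregation is recomputed.

def round_robin(rows, n):
    """Sort the rows, then deal them out: the i-th row goes to partition i % n."""
    parts = [[] for _ in range(n)]
    for i, row in enumerate(sorted(rows)):
        parts[i % n].append(row)
    return parts

def hash_partition(rows, n):
    """Place each row by its key only (key % n stands in for a real hash)."""
    parts = [[] for _ in range(n)]
    for row in rows:
        parts[row[1] % n].append(row)
    return parts

attempt1_rows = [(10, 0), (20, 1), (30, 2), (40, 3), (50, 4)]
retry_rows = [(25, 0), (5, 1), (30, 2), (55, 3), (45, 4)]  # `first` changed some values

def surviving_keys(partitioner, n=3, lost_partition=0):
    """Keys in the final output when only `lost_partition` is recomputed on retry."""
    first_attempt = partitioner(attempt1_rows, n)
    retry = partitioner(retry_rows, n)
    final = []
    for p in range(n):
        final.extend(retry[p] if p == lost_partition else first_attempt[p])
    return sorted(key for _, key in final)

print(surviving_keys(round_robin))     # [1, 1, 2, 4, 4]: keys 0 and 3 lost, 1 and 4 duplicated
print(surviving_keys(hash_partition))  # [0, 1, 2, 3, 4]: every key exactly once
```

As in the report, the round-robin case loses and duplicates the same number of rows, and each duplicate appears exactly twice (once from each attempt).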


Re: Correctness bug on Shuffle+Repartition scenario

2021-01-17 Thread Mich Talebzadeh
Hi Shiao-An,

With regard to your set-up below and I quote:

"The input/output files are parquet on GCS. The Spark version is 2.4.4
with standalone deployment. Workers running on GCP preemptible instances
and they being preempted very frequently."

Am I correct that you have chosen not to deploy Dataproc clusters on GCP, in
favour of provisioning some VM boxes and installing your own Spark cluster
running in standalone mode (presumably to save costs)? What is the
rationale behind this choice? According to the GCP documentation, *Compute
Engine might stop (preempt) these instances if it requires access to those
resources for other tasks. Preemptible instances are excess Compute Engine
capacity, so their availability varies with usage*. So what causes some VM
instances to be preempted? I have not used standalone mode for a couple of
years myself. As I understand it, your ETL process reads the raw snapshots,
does some joins and creates new hourly processed snapshots. There does not
seem to be an intermediate stage to verify the sanity of the data (data
lineage). Personally, I would deploy a database to do this ETL. That would
make it easier to look at your data and to store everything in a staging
area before the final push to the analytics layer.

HTH

LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw





*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.






Re: Correctness bug on Shuffle+Repartition scenario

2021-01-17 Thread Gourav Sengupta
Hi,

I may be wrong, but this looks like a massively complicated solution for
what could have been a simple SQL query.

It always seems sensible to me to first reduce the complexity and then solve
the problem, rather than solve a problem which should not even exist in the
first place.

Regards,
Gourav



Re: Correctness bug on Shuffle+Repartition scenario

2021-01-17 Thread Sean Owen
Hm, FWIW I can't reproduce that on Spark 3.0.1. What version are you using?



Re: Running pyspark job from virtual environment

2021-01-17 Thread Mich Talebzadeh
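When you or an application log in to a Linux host (whether a physical tin
box or a virtual node), the shell typically executes a startup script such
as .bashrc in the home directory.

For a scheduled job, whether .bashrc is executed depends on how the job is
launched (cron, for example, does not read it), so it is safer not to rely
on it there.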

In my Google Dataproc cluster of three nodes (one master and two workers),
I automatically activate the virtual environment on the master node as below:

cd /usr/src/Python-3.7.9/environments; source virtualenv/bin/activate

Then I execute spark-submit script as follows:

spark-submit \
 --master yarn \
 --deploy-mode client \
 --jars /home/hduser/jars/spark-bigquery-latest.jar \
   analyze_house_prices_GCP.py

Note that, as I understand it, the virtual environment is only needed on
the master node; I don't touch the worker nodes.

My suggestion is that you first test running a script interactively on your
master node, once you have activated your virtual environment, and see how
it goes.

HTH







On Sun, 17 Jan 2021 at 17:22, rajat kumar 
wrote:

> Hi Mich,
>
> Thanks for the response. I am running it through the CLI (on the cluster).
>
> Since this will be a scheduled job, I do not want to activate the
> environment manually. It should automatically use the virtual
> environment's path to run the job.
>
> For that, I found the 3 variables I mentioned. I think pointing some of
> them at the environment's binary will help to run the job from the venv.
>
> PYTHONPATH
> PYSPARK_DRIVER_PYTHON
> PYSPARK_PYTHON
>
> Also, should they be set in spark-env.sh or in .bashrc? What is the
> difference between spark-env.sh and .bashrc?
>
> Thanks
> Rajat
>
>
>
> On Sun, Jan 17, 2021 at 10:32 PM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Hi Rajat,
>>
>> Are you running this through an IDE like PyCharm or on CLI?
>>
>> If you already have a Python virtual environment, then just activate it.
>>
>> The only environment variable you need to set is PYTHONPATH, which you
>> can export in your startup shell script (.bashrc etc.).
>>
>> Once you are in the virtual environment, run:
>>
>> $SPARK_HOME/bin/spark-submit >
>> Alternatively you can chmod +x > to the file
>>
>> #! /usr/bin/env python3
>>
>> and then you can run it as.
>>
>> ./
>>
>> HTH
>>
>>
>>
>>
>>
>>
>>
>> On Sun, 17 Jan 2021 at 13:41, rajat kumar 
>> wrote:
>>
>>> Hello,
>>>
>>> Can anyone confirm here please?
>>>
>>> Regards
>>> Rajat
>>>
>>> On Sat, Jan 16, 2021 at 11:46 PM rajat kumar 
>>> wrote:
>>>
>>>> Hey Users,
>>>>
>>>> I want to run a Spark job from a virtual environment using Python.
>>>>
>>>> Please note that I am creating the virtual env with `python3 -m venv env`.
>>>>
>>>> I see that there are 3 Python-related variables that we have to set:
>>>> PYTHONPATH
>>>> PYSPARK_DRIVER_PYTHON
>>>> PYSPARK_PYTHON
>>>>
>>>> I have 2 questions:
>>>> 1. If I want to use a virtual env, do I need to point all these
>>>> variables at the virtual environment's Python path?
>>>> 2. Should I set these variables in spark-env.sh or with export
>>>> statements?
>>>>
>>>> Regards
>>>> Rajat
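For a scheduled run, the activate-then-spark-submit approach described above can be done without an interactive shell by handing spark-submit an explicit environment. The sketch below is a hypothetical launcher, not part of the thread: `venv_env`, `spark_submit_cmd` and `run` are made-up names, and it relies only on the documented `PYSPARK_PYTHON` (Python for workers, and for the driver unless overridden) and `PYSPARK_DRIVER_PYTHON` (driver only) variables. It assumes a POSIX venv layout (`<venv>/bin/python`).

```python
"""Hypothetical launcher for a scheduled PySpark job: it points Spark at a
virtualenv through environment variables instead of `source .../activate`
or ~/.bashrc, neither of which a scheduler is guaranteed to run."""
import os
import subprocess

def venv_env(venv_dir, base_env=None):
    """Copy of the environment with PySpark and PATH pointing at the venv."""
    env = dict(os.environ if base_env is None else base_env)
    venv_bin = os.path.join(venv_dir, "bin")
    python = os.path.join(venv_bin, "python")
    env["VIRTUAL_ENV"] = venv_dir                          # what `activate` exports
    env["PATH"] = venv_bin + os.pathsep + env.get("PATH", "")
    env["PYSPARK_DRIVER_PYTHON"] = python                  # Python for the driver
    env["PYSPARK_PYTHON"] = python                         # Python for workers; must exist on every node
    return env

def spark_submit_cmd(spark_home, app, options=()):
    """Build the spark-submit argument list; options come before the app."""
    return [os.path.join(spark_home, "bin", "spark-submit"), *options, app]

def run(spark_home, venv_dir, app, options=()):
    """Launch the job and return spark-submit's exit code."""
    cmd = spark_submit_cmd(spark_home, app, options)
    return subprocess.call(cmd, env=venv_env(venv_dir))
```

With the paths from the message above, a call might look like `run(os.environ["SPARK_HOME"], "/usr/src/Python-3.7.9/environments/virtualenv", "analyze_house_prices_GCP.py", ["--master", "yarn", "--deploy-mode", "client"])`. If the venv exists only on the master node, setting only `PYSPARK_DRIVER_PYTHON` may be the safer choice. On the spark-env.sh question: conf/spark-env.sh is sourced by Spark's own launch scripts such as spark-submit, whereas .bashrc is read only by interactive bash shells.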





Re: Running pyspark job from virtual environment

2021-01-17 Thread rajat kumar
Hi Mich,

Thanks for the response. I am running it through the CLI (on the cluster).

Since this will be a scheduled job, I do not want to activate the environment
manually. It should automatically use the virtual environment's path to run
the job.

For that, I found the 3 variables I mentioned. I think pointing some of them
at the environment's binary will help to run the job from the venv.

PYTHONPATH
PYSPARK_DRIVER_PYTHON
PYSPARK_PYTHON

Also, should they be set in spark-env.sh or in .bashrc? What is the difference
between spark-env.sh and .bashrc?

Thanks
Rajat





Re: Running pyspark job from virtual environment

2021-01-17 Thread Mich Talebzadeh
Hi Rajat,

Are you running this through an IDE like PyCharm or on CLI?

If you already have a Python virtual environment, then just activate it.

The only environment variable you need to set is PYTHONPATH, which you can
export in your startup shell script (.bashrc etc.).

Once you are in the virtual environment, run:

$SPARK_HOME/bin/spark-submit 

HTH









Re: Dynamic Spark metrics creation

2021-01-17 Thread Ivan Petrov
Would a custom accumulator work for you? It should be doable for
Map[String, Long] too:
https://stackoverflow.com/questions/42293798/how-to-create-custom-set-accumulator-i-e-setstring


On Sun, 17 Jan 2021 at 15:16, "Yuri Oleynikov (יורי אולייניקוב)" <
yur...@gmail.com> wrote:

> Hey Jacek, let me clarify a bit:
> Bottom line, I need the following metrics to be reported by Structured
> Streaming:
> Country-USA: 7
> Country-Poland: 23
> Country-Brazil: 56
>
> The country names come with the incoming events and are unknown at
> application startup.
>
> Thus, registering an accumulator and binding it to a metrics source on the
> driver side at application startup is impossible (unless you register all
> possible country names, which wastes Spark memory, pollutes the metrics
> namespace with metrics that are 99% zero, and wastes network bandwidth).
>
>
> Sent from iPhone
>
> On 17 Jan 2021, at 15:56, Jacek Laskowski wrote:
>
> 
> Hey Yurii,
>
> > which is unavailable from executors.
>
> Register it on the driver and use accumulators on executors to update the
> values (on the driver)?
>
> Regards,
> Jacek Laskowski
> 
> https://about.me/JacekLaskowski
> "The Internals Of" Online Books 
> Follow me on https://twitter.com/jaceklaskowski
>
> 
>
>
> On Sat, Jan 16, 2021 at 2:21 PM Yuri Oleynikov (יורי אולייניקוב) <
> yur...@gmail.com> wrote:
>
>> Hi all,
>> I have a Spark application with Arbitrary Stateful Aggregation
>> implemented with FlatMapGroupsWithStateFunction.
>>
>> I want to collect some statistics about incoming events inside
>> FlatMapGroupsWithStateFunction.
>> The statistics are computed from an event property that, on the one hand,
>> has dynamic values but, on the other hand, takes a small finite (though
>> unknown) set of values (e.g. country name).
>>
>> So I thought of registering dynamic metrics inside
>> FlatMapGroupsWithStateFunction, but as far as I understand, this requires
>> accessing MetricsSystem via SparkEnv.get(), which is unavailable from
>> executors.
>>
>> Any thoughts/suggestions?
>>
>> With best regards,
>> Yurii
>>
>>
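Ivan's suggestion of a map-valued accumulator comes down to a merge function over partial count maps in which keys (country names) appear on first use, so nothing has to be registered up front. The sketch below shows that logic in plain Python, following the two-method `zero`/`addInPlace` contract of PySpark's `AccumulatorParam`; `CountryCountsParam` is a hypothetical name, and a Scala version would instead extend `AccumulatorV2`. In real PySpark you would subclass `pyspark.accumulators.AccumulatorParam`, create it with `sc.accumulator({}, CountryCountsParam())`, and call `.add({country: 1})` on executors. Caveats: the value is readable only on the driver, so publishing it as metrics such as `Country-USA` still needs driver-side code not shown here, and updates made inside transformations (rather than actions) may be applied more than once if tasks are re-executed.

```python
class CountryCountsParam:
    """Hypothetical map-of-counters accumulator parameter (country -> count)."""

    def zero(self, initial_value):
        # Starting value for each task; no country names are needed up front.
        return {}

    def addInPlace(self, acc, other):
        # Add every count in `other` into `acc`; new keys appear on first use.
        for country, n in other.items():
            acc[country] = acc.get(country, 0) + n
        return acc

param = CountryCountsParam()

# Each simulated task starts from zero() and adds {country: 1} per event.
task_results = []
for events in (["USA", "Poland"], ["Brazil", "Poland", "Brazil"]):
    local = param.zero({})
    for country in events:
        local = param.addInPlace(local, {country: 1})
    task_results.append(local)

# The driver merges the partial maps with the same function.
total = param.zero({})
for partial in task_results:
    total = param.addInPlace(total, partial)
print(total)  # {'USA': 1, 'Poland': 2, 'Brazil': 2}
```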


Re: Dynamic Spark metrics creation

2021-01-17 Thread Yuri Oleynikov (‫יורי אולייניקוב‬‎)
Hey Jacek, let me clarify a bit:
Bottom line, I need the following metrics to be reported by Structured Streaming:
Country-USA: 7
Country-Poland: 23
Country-Brazil: 56

The country names come with the incoming events and are unknown at
application startup.

Thus, registering an accumulator and binding it to a metrics source on the
driver side at application startup is impossible (unless you register all
possible country names, which wastes Spark memory, pollutes the metrics
namespace with metrics that are 99% zero, and wastes network bandwidth).




Re: Spark Event Log Forwarding and Offset Tracking

2021-01-17 Thread Jacek Laskowski
Hi,

> Forwarding Spark event logs to identify critical events like job start,
> executor failures, job failures etc. to ElasticSearch via log4j. However, I
> could not find any way to forward the event log via log4j configuration. Is
> there any other recommended approach to track these application events?

I'd use the SparkListener API:
http://spark.apache.org/docs/latest/api/scala/org/apache/spark/scheduler/SparkListener.html

> 2 - For Spark streaming jobs, is there any way to identify that data from
> Kafka is not consumed for whatever reason, or that the offsets are not
> progressing as expected, and also forward that to ElasticSearch via log4j
> for monitoring?

I think the SparkListener API would help here too.

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books 
Follow me on https://twitter.com/jaceklaskowski




On Wed, Jan 13, 2021 at 5:15 PM raymond.tan 
wrote:

> Hello here, I am new to Spark and am trying to add some monitoring for
> Spark applications, specifically to handle the situations below:
>
> 1 - Forwarding Spark event logs to identify critical events like job start,
> executor failures, job failures, etc. to ElasticSearch via log4j. However, I
> could not find any way to forward the event log via log4j configuration. Is
> there any other recommended approach to track these application events?
>
> 2 - For Spark streaming jobs, is there any way to identify that data from
> Kafka is not being consumed for whatever reason, or that the offsets are not
> progressing as expected, and also forward that to ElasticSearch via log4j
> for monitoring?
>
> Thanks, Raymond
>


Re: understanding spark shuffle file re-use better

2021-01-17 Thread Jacek Laskowski
Hi,

An interesting question that I must admit I'm not sure how to answer myself
actually :)

Off the top of my head, I'd **guess** that unless you cache the first query,
these two queries would share nothing. With caching, there's a phase in
query execution when a canonicalized version of a query is used to look up
any cached queries.

Again, I'm not really sure, and if I had to answer it (e.g. as part of an
interview) I'd say nothing would be shared / re-used.
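One way to try out the caching path described above is a minimal sketch (the object and method names are hypothetical) that builds the same query twice as independent DataFrames. It then checks whether the second one's optimized plan was rewritten to read from an `InMemoryRelation`, which would mean the canonicalized-plan lookup matched the cached query. This only probes cache reuse. It says nothing direct about shuffle-file reuse without caching, which would have to be checked separately (e.g. via skipped stages in the Spark UI).

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.execution.columnar.InMemoryRelation

object CacheLookupSketch {
  // Each call builds a brand-new DataFrame; no objects are shared between calls.
  def buildQuery(spark: SparkSession): DataFrame =
    spark.range(0, 1000).selectExpr("id % 10 AS k").groupBy("k").count()

  // True if the optimized plan reads from a cached relation instead of recomputing.
  def usesCache(df: DataFrame): Boolean =
    df.queryExecution.optimizedPlan.collect { case r: InMemoryRelation => r }.nonEmpty

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("cache-lookup").getOrCreate()
    println(s"before cache: ${usesCache(buildQuery(spark))}")
    buildQuery(spark).cache() // registers this query's plan with the cache manager
    println(s"after cache:  ${usesCache(buildQuery(spark))}")
    spark.stop()
  }
}
```

If the lookup works as described, the first line should report `false` and the second `true`, even though the two DataFrames are distinct objects.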

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books 
Follow me on https://twitter.com/jaceklaskowski




On Wed, Jan 13, 2021 at 5:39 PM Koert Kuipers  wrote:

> is shuffle file re-use based on identity or equality of the dataframe?
>
> for example, if i run the exact same code twice to load data and do
> transforms (joins, aggregations, etc.) but without re-using any actual
> dataframes, will i still see skipped stages thanks to shuffle file re-use?
>
> thanks!
> koert
>


Re: Dynamic Spark metrics creation

2021-01-17 Thread Jacek Laskowski
Hey Yurii,

> which is unavailable from executors.

Register it on the driver and use accumulators on executors to update the
values (on the driver)?
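Here is a minimal sketch of the accumulator route. It assumes the per-country counts are what should become metrics; the class name `CountByKeyAccumulator` and any `country` field are hypothetical. The sketch is a custom `AccumulatorV2` that counts occurrences per key: it is registered on the driver, updated from executor code, and read back on the driver. Note that Spark only guarantees that each task's update is applied once for accumulators updated inside actions. Updates made in transformations (which is where a state function runs) may be applied more than once if tasks or stages are retried.

```scala
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable

// Hypothetical accumulator: counts events per key (e.g. per country name).
class CountByKeyAccumulator extends AccumulatorV2[String, Map[String, Long]] {
  private val counts = mutable.HashMap.empty[String, Long]

  override def isZero: Boolean = counts.isEmpty

  override def copy(): CountByKeyAccumulator = {
    val c = new CountByKeyAccumulator
    c.counts ++= counts
    c
  }

  override def reset(): Unit = counts.clear()

  // Called in tasks on executors: bump the counter for one observed key.
  override def add(key: String): Unit =
    counts(key) = counts.getOrElse(key, 0L) + 1L

  // Called on the driver: fold another task's partial counts into this one.
  override def merge(other: AccumulatorV2[String, Map[String, Long]]): Unit =
    other.value.foreach { case (k, n) => counts(k) = counts.getOrElse(k, 0L) + n }

  override def value: Map[String, Long] = counts.toMap
}
```

Usage would be along the lines of `val byCountry = new CountByKeyAccumulator; spark.sparkContext.register(byCountry, "eventsByCountry")`, passing `byCountry` into the state function so it can call `byCountry.add(event.country)`, and reading `byCountry.value` on the driver to update the dynamically created metrics there.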

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books 
Follow me on https://twitter.com/jaceklaskowski




On Sat, Jan 16, 2021 at 2:21 PM Yuri Oleynikov (יורי אולייניקוב) <
yur...@gmail.com> wrote:

> Hi all,
> I have a spark application with Arbitrary Stateful Aggregation implemented
> with FlatMapGroupsWithStateFunction.
>
> I want to collect some statistics about incoming events inside
> FlatMapGroupsWithStateFunction.
> The statistics are derived from an event property whose values are dynamic
> on the one hand but, on the other hand, form a small finite (though unknown)
> set (e.g. country name).
>
> So I thought of registering dynamic metrics inside
> FlatMapGroupsWithStateFunction, but as far as I understand, this requires
> accessing MetricsSystem via SparkEnv.get(), which is unavailable from
> executors.
>
> Any thoughts/suggestions?
>
> With best regards,
> Yurii
>
>


Re: Running pyspark job from virtual environment

2021-01-17 Thread rajat kumar
Hello,

Can anyone confirm here please?

Regards
Rajat

On Sat, Jan 16, 2021 at 11:46 PM rajat kumar 
wrote:

> Hey Users,
>
> I want to run a Spark job from a virtual environment using Python.
>
> Please note I am creating the virtual env using `python3 -m venv env`.
>
> I see that there are 3 Python-related variables which we have to set:
> PYTHONPATH
> PYSPARK_DRIVER_PYTHON
> PYSPARK_PYTHON
>
> I have 2 questions:
> 1. If I want to use a virtual env, do I need to point all these variables
> to the virtual environment's Python path?
> 2. Should I set these variables in spark-env.sh, or should I set them using
> export statements?
>
> Regards
> Rajat
>
>
>


Re: Correctness bug on Shuffle+Repartition scenario

2021-01-17 Thread Shiao-An Yuan
Hi folks,

I finally found the root cause of this issue.
It can easily be reproduced with the following code.
We ran it in standalone mode on 4 instances with 4 cores each (16 cores in
total).

```
import org.apache.spark.TaskContext
import scala.sys.process._
import org.apache.spark.sql.functions._
import com.google.common.hash.Hashing
import spark.implicits._  // already in scope in spark-shell
val murmur3 = Hashing.murmur3_32()

// create a Dataset whose second element has a cardinality of 5
val ds = spark.range(0, 10, 1, 130).map(i =>
  (murmur3.hashLong(i).asInt(), i / 2))

ds.groupByKey(_._2)
  .agg(first($"_1").as[Long])
  .repartition(200)
  .map { x =>
    // on the first attempt of partition 100, kill the executor processes on this host
    if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId == 100 &&
        TaskContext.get.stageAttemptNumber == 0) {
      throw new Exception("pkill -f CoarseGrainedExecutorBackend".!!)
    }
    x
  }
  .map(_._2).distinct().count()  // the correct result is 5, but we always get a smaller number
```

The problem is that SPARK-23207 uses sorting to make RoundRobinPartitioning
always generate the same distribution, but the aggregate function `first`
may return non-deterministic results, which makes the sort order
non-deterministic as well.
Therefore, the first attempt and the retried stage may produce different
distributions, which causes both duplicated and lost records.
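The mechanism can be illustrated with a deliberately simplified pure-Scala model. The object name and data are hypothetical, and Spark's real implementation differs in detail: for example, it picks a starting partition seeded by the task's partition id rather than always starting at 0. In the model, rows are sorted locally and then dealt out round-robin, which is the SPARK-23207 idea. The `value` column stands in for the non-deterministic result of `first`, so the retry sorts the rows differently. If partition 0 was committed by the first attempt and partition 1 by the retry, key `b` is written twice and key `c` is never written. This matches duplicates and losses appearing in similar amounts.

```scala
object RoundRobinRetrySketch {
  // (key, value): `value` stands in for the non-deterministic output of `first`
  type Row = (String, Int)

  // Sort locally, then deal rows out round-robin starting at partition 0.
  def assign(rows: Seq[Row], numPartitions: Int): Map[Int, Seq[String]] =
    rows.sortBy(_._2).zipWithIndex
      .groupBy { case (_, i) => i % numPartitions }
      .map { case (p, indexed) => p -> indexed.map { case ((key, _), _) => key } }

  // Partition 0 comes from the first attempt, partition 1 from the retry,
  // during which `first` returned a different value for key "a".
  def simulateRetry(): Seq[String] = {
    val attempt1 = assign(Seq(("a", 3), ("b", 1), ("c", 2)), numPartitions = 2)
    val attempt2 = assign(Seq(("a", 0), ("b", 1), ("c", 2)), numPartitions = 2)
    attempt1(0) ++ attempt2(1)
  }

  def main(args: Array[String]): Unit =
    println(simulateRetry())
}
```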

Thanks,
Shiao-An Yuan

On Tue, Dec 29, 2020 at 10:00 PM Shiao-An Yuan 
wrote:

> Hi folks,
>
> We recently identified a data correctness issue in our pipeline.
>
> The data processing flow is as follows:
> 1. read the current snapshot (use an empty one if it doesn't exist yet)
> 2. read unprocessed new data
> 3. union them and do a `reduceByKey` operation
> 4. output a new version of the snapshot
> 5. repeat steps 1-4
>
> The simplified version of the code:
> ```
> // schema
> case class Log(pkey: Array[Byte], a: String, b: Int /* , 100+ more columns */)
>
> // function for reduce
> def merge(left: Log, right: Log): Log = {
>   Log(
>     pkey = left.pkey,
>     a = if (left.a != null) left.a else right.a,
>     b = if (left.a != null) left.b else right.b
>     // ... remaining columns
>   )
> }
>
> // a very large parquet file (>10G, 200 partitions)
> val currentSnapshot = spark.read.schema(schema).parquet(...).as[Log]
>
> // multiple small parquet files
> val newAddedLogs = spark.read.schema(schema).parquet(...).as[Log]
>
> val newSnapshot = currentSnapshot.union(newAddedLogs)
>   .groupByKey(log => new String(log.pkey))  // generate key
>   .reduceGroups(merge _)                     // spark.sql.shuffle.partitions=200
>   .map(_._2)                                 // drop key
>
> newSnapshot
>   .repartition(60)  // (1)
>   .write.parquet(newPath)
> ```
>
> The issue is that some records were duplicated or lost, and the amounts of
> duplicated and lost data are similar.
>
> We also noticed that this only happens when some instances get preempted.
> Spark retries the stage, so some of the partition files are generated by
> the first attempt and others by the second (retry) attempt.
> Moreover, the duplicated logs are duplicated exactly twice, with one copy in
> each batch (one in the first batch and one in the second batch).
>
> The input/output files are parquet on GCS. The Spark version is 2.4.4 with
> standalone deployment. Workers run on GCP preemptible instances, which are
> preempted very frequently.
>
> The pipeline runs in a single long-running process with multiple threads.
> Each snapshot represents an "hour" of data, and we run the
> "read-reduce-write" operations on multiple snapshots (hours) simultaneously.
> We are pretty sure the same snapshot (hour) is never processed in parallel,
> and the output path is always generated with a timestamp, so those jobs
> shouldn't affect each other.
>
> After changing line (1) to `coalesce` or `repartition(100, $"pkey")`, the
> issue was gone, but I believe there is still a correctness bug that hasn't
> been reported yet.
>
> We have tried to reproduce this bug on a smaller scale but haven't
> succeeded yet. I have read SPARK-23207 and SPARK-28699, but couldn't find
> the bug. Since this case uses Datasets, I believe it is unrelated to
> SPARK-24243.
>
> Can anyone give me some advice on this?
> Thanks in advance.
>
> Shiao-An Yuan
>
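The `repartition(100, $"pkey")` workaround mentioned in the quoted message assigns each row by its key alone. A retry therefore puts every key back in the same partition even when the other columns come out differently. Below is a simplified pure-Scala model of that (hypothetical names). It uses `String.hashCode` with a non-negative modulo, whereas Spark SQL's hash partitioning uses Murmur3. The model shows that combining a partition committed by the first attempt with one produced by a retry still yields each key exactly once.

```scala
object HashPartitionRetrySketch {
  // (key, value): the value may differ between attempts; the key does not
  type Row = (String, Int)

  // Assign rows by key hash only, so the value column never affects placement.
  def assign(rows: Seq[Row], numPartitions: Int): Map[Int, Seq[String]] =
    rows.groupBy { case (key, _) => Math.floorMod(key.hashCode, numPartitions) }
      .map { case (p, rs) => p -> rs.map(_._1) }

  // Partition 0 is taken from the first attempt, partition 1 from a retry
  // in which the value for key "a" changed.
  def simulateRetry(): Seq[String] = {
    val attempt1 = assign(Seq(("a", 3), ("b", 1), ("c", 2)), numPartitions = 2)
    val attempt2 = assign(Seq(("a", 0), ("b", 1), ("c", 2)), numPartitions = 2)
    attempt1.getOrElse(0, Seq.empty) ++ attempt2.getOrElse(1, Seq.empty)
  }

  def main(args: Array[String]): Unit =
    println(simulateRetry().sorted)
}
```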