[jira] [Comment Edited] (AIRFLOW-5385) SparkSubmit status spend lot of time

2019-12-26 Thread t oo (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-5385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16986097#comment-16986097
 ] 

t oo edited comment on AIRFLOW-5385 at 12/26/19 4:56 PM:
-

[~sergio.soto] one issue is that if the response is like

 {
 "action" : "SubmissionStatusResponse",
 "serverSparkVersion" : "2.3.4",
 "submissionId" : "driver-20151202134243-0014",
 "success" : false
}

i.e. if the DNS name of the spark master has flipped, then it will keep polling 
forever (AIRFLOW-6229)
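
To make the failure mode concrete, here is a rough, hypothetical sketch (not the hook's actual code) of a guard that stops polling after repeated success=false responses; poll_status, max_failed_checks and the terminal state names are assumptions for illustration only:

{code:python}
import time

def track_driver_status(poll_status, max_failed_checks=10, poll_interval=10):
    # poll_status() is assumed to return the parsed SubmissionStatusResponse
    # dict shown above. Without a guard like this, a master that no longer
    # knows the submission ("success" : false) gets polled forever.
    failed_checks = 0
    while True:
        response = poll_status()
        if not response.get("success"):
            failed_checks += 1
            if failed_checks >= max_failed_checks:
                raise RuntimeError(
                    "giving up after %d unsuccessful status checks" % failed_checks)
        else:
            failed_checks = 0
            if response.get("driverState") in ("FINISHED", "FAILED", "ERROR", "KILLED"):
                return response["driverState"]
        time.sleep(poll_interval)
{code}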


was (Author: toopt4):
[~sergio.soto] one issue is that if the response is like

 {
 "action" : "SubmissionStatusResponse",
 "serverSparkVersion" : "2.3.4",
 "submissionId" : "driver-20151202134243-0014",
 "success" : false
}

ie if the DNS name of spark master has flipped then it will keep polling forever

> SparkSubmit status spend lot of time
> 
>
> Key: AIRFLOW-5385
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5385
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: contrib
>Affects Versions: 1.10.2
>Reporter: Sergio Soto
>Priority: Blocker
>
> Hello,
> we have an issue with SparkSubmitOperator. Airflow DAGs show that some 
> streaming applications break. I analyzed this behaviour. The 
> SparkSubmitHook is responsible for checking the driver status.
> We discovered some timeouts and tried to reproduce the status-check command. This is 
> an execution with `time`:
> {code:java}
> time /opt/java/jdk1.8.0_181/jre/bin/java -cp 
> /opt/shared/spark/client/conf/:/opt/shared/spark/client/jars/* -Xmx1g 
> org.apache.spark.deploy.SparkSubmit --master 
> spark://spark-master.corp.com:6066 --status driver-20190901180337-2749 
> Using Spark's default log4j profile: 
> org/apache/spark/log4j-defaults.properties
> 19/09/02 17:05:53 INFO RestSubmissionClient: Submitting a request for the 
> status of submission driver-20190901180337-2749 in 
> spark://lgmadbdtpspk01v.corp.logitravelgroup.com:6066.
> 19/09/02 17:05:59 INFO RestSubmissionClient: Server responded with 
> SubmissionStatusResponse:
> {
>   "action" : "SubmissionStatusResponse",
>   "driverState" : "RUNNING",
>   "serverSparkVersion" : "2.2.1",
>   "submissionId" : "driver-20190901180337-2749",
>   "success" : true,
>   "workerHostPort" : "172.25.10.194:45441",
>   "workerId" : "worker-20190821201014-172.25.10.194-45441"
> }
> real 0m11.598s 
> user 0m2.092s 
> sys 0m0.222s{code}
> We analyzed the Scala code and the Spark API. This spark-submit status command 
> ends with an HTTP GET request to a URL. Using curl, this is the time spent by 
> the spark master to return the status:
> {code:java}
>  time curl 
> "http://spark-master.corp.com:6066/v1/submissions/status/driver-20190901180337-2749"
> {
>   "action" : "SubmissionStatusResponse",
>   "driverState" : "RUNNING",
>   "serverSparkVersion" : "2.2.1",
>   "submissionId" : "driver-20190901180337-2749",
>   "success" : true,
>   "workerHostPort" : "172.25.10.194:45441",
>   "workerId" : "worker-20190821201014-172.25.10.194-45441"
> }
> real  0m0.011s
> user  0m0.000s
> sys   0m0.006s
> {code}
> The task spends 11.59 seconds with spark-submit versus 0.011 seconds with curl.
> How can this behaviour be explained?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (AIRFLOW-5385) SparkSubmit status spend lot of time

2019-12-26 Thread t oo (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-5385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17003406#comment-17003406
 ] 

t oo edited comment on AIRFLOW-5385 at 12/26/19 4:55 PM:
-

[~sergio.soto] [~Diego García] do u have a PR?

BEFORE
connection_cmd = self._get_spark_binary_path()

# The url to the spark master
connection_cmd += ["--master", self._connection['master']]

# The driver id so we can poll for its status
if self._driver_id:
    connection_cmd += ["--status", self._driver_id]
else:
    raise AirflowException(
        "Invalid status: attempted to poll driver " +
        "status but no driver id is known. Giving up.")

AFTER
#connection_cmd = self._get_spark_binary_path()
#SPARK-27491 - spark 2.3.x status does not work

# The url to the spark master
#connection_cmd += ["--master", self._connection['master']]

#https://jira.apache.org/jira/browse/AIRFLOW-5385
curl_max_wait_time = 30
spark_host = self._connection['master'].replace("spark://", "http://")
connection_cmd = ["/usr/bin/curl", "--max-time",
                  str(curl_max_wait_time),
                  "{host}/v1/submissions/status/{submission_id}".format(
                      host=spark_host, submission_id=self._driver_id)]
self.log.info(connection_cmd)

# The driver id so we can poll for its status
if self._driver_id:
    pass
    #connection_cmd += ["--status", self._driver_id]
else:
    raise AirflowException(
        "Invalid status: attempted to poll driver " +
        "status but no driver id is known. Giving up.")


another thing I notice is that polling every second is too frequent, so:

contrib/hooks/spark_submit_hook.py: poll the spark server at a custom interval 
instead of every second

 

*BEFORE*
# Sleep for 1 second as we do not want to spam the cluster
time.sleep(1)

*AFTER*
import airflow
from airflow import configuration as conf

# Sleep for n seconds as we do not want to spam the cluster
_poll_interval = conf.getint('sparksubmit', 'poll_interval')
time.sleep(_poll_interval)
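
For reference, that getint call would need a matching entry in airflow.cfg; the section and option names come from the snippet above, while the 10-second value is only a hypothetical example:

{code}
[sparksubmit]
poll_interval = 10
{code}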


was (Author: toopt4):
[~sergio.soto] [~Diego García] do u have a PR?

another thing I notice is the polling every second is too frequent so:

contrib/hooks/spark_submit_hook.py Poll spark server at a custom interval 
instead of every second

 

*BEFORE*
 # Sleep for 1 second as we do not want to spam the cluster
time.sleep(1)

 

*AFTER*

import airflow
from airflow import configuration as conf

# Sleep for n seconds as we do not want to spam the cluster
_poll_interval = conf.getint('sparksubmit', 'poll_interval')
time.sleep(_poll_interval)


[jira] [Comment Edited] (AIRFLOW-5385) SparkSubmit status spend lot of time

2019-09-17 Thread t oo (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-5385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931099#comment-16931099
 ] 

t oo edited comment on AIRFLOW-5385 at 9/17/19 10:21 PM:
-

i created SPARK-27491, i noticed airflow is always at 99% cpu and i have 40 
spark-submits running in parallel! thank u!


was (Author: toopt4):
i created spark-27491, i noticed airflow is always at 99%cpu and i have 40 
spark-submits running in parallel! thank u!

 

but i am facing this error with your patch:

[2019-09-17 19:06:01,135] \{__init__.py:1603} INFO - Marking task as 
UP_FOR_RETRY
[2019-09-17 19:06:01,195] \{base_task_runner.py:101} INFO - Job x Traceback 
(most recent call last):
[2019-09-17 19:06:01,195] \{base_task_runner.py:101} INFO - Job x File 
"/home/ec2-user/venv/bin/airflow", line 32, in 
[2019-09-17 19:06:01,195] \{base_task_runner.py:101} INFO - Job x 
args.func(args)
[2019-09-17 19:06:01,196] \{base_task_runner.py:101} INFO - Job x File 
"/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/utils/cli.py", 
line 74, in wrapper
[2019-09-17 19:06:01,196] \{base_task_runner.py:101} INFO - Job x return 
f(*args, **kwargs)
[2019-09-17 19:06:01,196] \{base_task_runner.py:101} INFO - Job x File 
"/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/bin/cli.py", 
line 523, in run
[2019-09-17 19:06:01,196] \{base_task_runner.py:101} INFO - Job x _run(args, 
dag, ti)
[2019-09-17 19:06:01,196] \{base_task_runner.py:101} INFO - Job x File 
"/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/bin/cli.py", 
line 442, in _run
[2019-09-17 19:06:01,196] \{base_task_runner.py:101} INFO - Job x 
pool=args.pool,
[2019-09-17 19:06:01,197] \{base_task_runner.py:101} INFO - Job x File 
"/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/utils/db.py", 
line 73, in wrapper
[2019-09-17 19:06:01,197] \{base_task_runner.py:101} INFO - Job x return 
func(*args, **kwargs)
[2019-09-17 19:06:01,197] \{base_task_runner.py:101} INFO - Job x File 
"/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/models/__init__.py",
 line 1441, in _run_raw_task
[2019-09-17 19:06:01,197] \{base_task_runner.py:101} INFO - Job x result = 
task_copy.execute(context=context)
[2019-09-17 19:06:01,197] \{base_task_runner.py:101} INFO - Job x File 
"/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/contrib/operators/spark_submit_operator.py",
 line 176, in execute
[2019-09-17 19:06:01,197] \{base_task_runner.py:101} INFO - Job x 
self._hook.submit(self._application)
[2019-09-17 19:06:01,198] \{base_task_runner.py:101} INFO - Job x File 
"/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/contrib/hooks/spark_submit_hook.py",
 line 384, in submit
[2019-09-17 19:06:01,198] \{base_task_runner.py:101} INFO - Job x 
self._start_driver_status_tracking()
[2019-09-17 19:06:01,198] \{base_task_runner.py:101} INFO - Job x File 
"/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/contrib/hooks/spark_submit_hook.py",
 line 514, in _start_driver_status_tracking
[2019-09-17 19:06:01,198] \{base_task_runner.py:101} INFO - Job x 
universal_newlines=True)
[2019-09-17 19:06:01,198] \{base_task_runner.py:101} INFO - Job x File 
"/usr/lib64/python2.7/subprocess.py", line 394, in __init__
[2019-09-17 19:06:01,198] \{base_task_runner.py:101} INFO - Job x errread, 
errwrite)
[2019-09-17 19:06:01,199] \{base_task_runner.py:101} INFO - Job x File 
"/usr/lib64/python2.7/subprocess.py", line 1047, in _execute_child
[2019-09-17 19:06:01,199] \{base_task_runner.py:101} INFO - Job x raise 
child_exception
[2019-09-17 19:06:01,199] \{base_task_runner.py:101} INFO - Job x OSError: 
[Errno 2] No such file or directory
[2019-09-17 19:06:02,898] \{logging_mixin.py:95} INFO - [2019-09-17 
19:06:02,897] \{jobs.py:2566} INFO - Task exited with return code 1


[jira] [Comment Edited] (AIRFLOW-5385) SparkSubmit status spend lot of time

2019-09-17 Thread t oo (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-5385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931099#comment-16931099
 ] 

t oo edited comment on AIRFLOW-5385 at 9/17/19 7:23 PM:


i created SPARK-27491, i noticed airflow is always at 99% cpu and i have 40 
spark-submits running in parallel! thank u!

 

but i am facing this error with your patch:

[2019-09-17 19:06:01,135] \{__init__.py:1603} INFO - Marking task as 
UP_FOR_RETRY
[2019-09-17 19:06:01,195] \{base_task_runner.py:101} INFO - Job x Traceback 
(most recent call last):
[2019-09-17 19:06:01,195] \{base_task_runner.py:101} INFO - Job x File 
"/home/ec2-user/venv/bin/airflow", line 32, in 
[2019-09-17 19:06:01,195] \{base_task_runner.py:101} INFO - Job x 
args.func(args)
[2019-09-17 19:06:01,196] \{base_task_runner.py:101} INFO - Job x File 
"/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/utils/cli.py", 
line 74, in wrapper
[2019-09-17 19:06:01,196] \{base_task_runner.py:101} INFO - Job x return 
f(*args, **kwargs)
[2019-09-17 19:06:01,196] \{base_task_runner.py:101} INFO - Job x File 
"/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/bin/cli.py", 
line 523, in run
[2019-09-17 19:06:01,196] \{base_task_runner.py:101} INFO - Job x _run(args, 
dag, ti)
[2019-09-17 19:06:01,196] \{base_task_runner.py:101} INFO - Job x File 
"/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/bin/cli.py", 
line 442, in _run
[2019-09-17 19:06:01,196] \{base_task_runner.py:101} INFO - Job x 
pool=args.pool,
[2019-09-17 19:06:01,197] \{base_task_runner.py:101} INFO - Job x File 
"/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/utils/db.py", 
line 73, in wrapper
[2019-09-17 19:06:01,197] \{base_task_runner.py:101} INFO - Job x return 
func(*args, **kwargs)
[2019-09-17 19:06:01,197] \{base_task_runner.py:101} INFO - Job x File 
"/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/models/__init__.py",
 line 1441, in _run_raw_task
[2019-09-17 19:06:01,197] \{base_task_runner.py:101} INFO - Job x result = 
task_copy.execute(context=context)
[2019-09-17 19:06:01,197] \{base_task_runner.py:101} INFO - Job x File 
"/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/contrib/operators/spark_submit_operator.py",
 line 176, in execute
[2019-09-17 19:06:01,197] \{base_task_runner.py:101} INFO - Job x 
self._hook.submit(self._application)
[2019-09-17 19:06:01,198] \{base_task_runner.py:101} INFO - Job x File 
"/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/contrib/hooks/spark_submit_hook.py",
 line 384, in submit
[2019-09-17 19:06:01,198] \{base_task_runner.py:101} INFO - Job x 
self._start_driver_status_tracking()
[2019-09-17 19:06:01,198] \{base_task_runner.py:101} INFO - Job x File 
"/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/contrib/hooks/spark_submit_hook.py",
 line 514, in _start_driver_status_tracking
[2019-09-17 19:06:01,198] \{base_task_runner.py:101} INFO - Job x 
universal_newlines=True)
[2019-09-17 19:06:01,198] \{base_task_runner.py:101} INFO - Job x File 
"/usr/lib64/python2.7/subprocess.py", line 394, in __init__
[2019-09-17 19:06:01,198] \{base_task_runner.py:101} INFO - Job x errread, 
errwrite)
[2019-09-17 19:06:01,199] \{base_task_runner.py:101} INFO - Job x File 
"/usr/lib64/python2.7/subprocess.py", line 1047, in _execute_child
[2019-09-17 19:06:01,199] \{base_task_runner.py:101} INFO - Job x raise 
child_exception
[2019-09-17 19:06:01,199] \{base_task_runner.py:101} INFO - Job x OSError: 
[Errno 2] No such file or directory
[2019-09-17 19:06:02,898] \{logging_mixin.py:95} INFO - [2019-09-17 
19:06:02,897] \{jobs.py:2566} INFO - Task exited with return code 1


was (Author: toopt4):
i created spark-27491, i noticed airflow is always at 99%cpu and i have 40 
spark-submits running in parallel! thank u!
