Albertus Kelvin created AIRFLOW-6214:
----------------------------------------
             Summary: Spark driver status tracking for standalone, YARN, Mesos and K8s with cluster deploy mode
                 Key: AIRFLOW-6214
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6214
             Project: Apache Airflow
          Issue Type: Improvement
          Components: hooks, operators
    Affects Versions: 1.10.6
            Reporter: Albertus Kelvin


Based on the following code snippet:

{code:python}
def _resolve_should_track_driver_status(self):
    return ('spark://' in self._connection['master'] and
            self._connection['deploy_mode'] == 'cluster')
{code}

It seems that the above code will always return *False*, because the master address for a standalone cluster doesn't contain *spark://*, as shown in the code snippet below.

{code:python}
conn = self.get_connection(self._conn_id)
if conn.port:
    conn_data['master'] = "{}:{}".format(conn.host, conn.port)
else:
    conn_data['master'] = conn.host
{code}

Additionally, I think this driver status tracker should also be enabled for Mesos and Kubernetes in cluster mode, since the *--status* argument supports all of these cluster managers. Refer to [this|https://github.com/apache/spark/blob/be867e8a9ee8fc5e4831521770f51793e9265550/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala#L543].

For YARN cluster mode, I think we can use the built-in commands from YARN itself, such as *yarn application -status <ApplicationID>*.

Therefore, *_build_track_driver_status_command* should be updated along the following lines to accommodate this.

{code:python}
def _build_track_driver_status_command(self):
    # The driver id so we can poll for its status
    if not self._driver_id:
        raise AirflowException(
            "Invalid status: attempted to poll driver " +
            "status but no driver id is known. Giving up.")

    master = self._connection['master']
    if master.startswith(("spark://", "mesos://", "k8s://")):
        # standalone, mesos, kubernetes: poll via spark-submit --status
        connection_cmd = self._get_spark_binary_path()
        connection_cmd += ["--master", master]
        connection_cmd += ["--status", self._driver_id]
    else:
        # yarn: pass each argument as a separate list item
        connection_cmd = ["yarn", "application", "-status"]
        connection_cmd += [self._driver_id]

    self.log.debug("Poll driver status cmd: %s", connection_cmd)

    return connection_cmd
{code}
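
To make the proposal easier to discuss, here is a minimal standalone sketch of the two decisions described above: when to track the driver, and which command to poll with. It is not the hook's actual code. The names `should_track_driver_status`, `build_status_command` and `SPARK_STATUS_PREFIXES`, the `spark_binary` parameter, and the assumption that a YARN master is given as the literal string `yarn` are all hypothetical and used only for illustration.

```python
from typing import Dict, List

# Master URL prefixes for which the issue proposes polling with
# `spark-submit --status` (standalone, Mesos, Kubernetes).
SPARK_STATUS_PREFIXES = ("spark://", "mesos://", "k8s://")


def should_track_driver_status(connection: Dict[str, str]) -> bool:
    """Return True only for cluster deploy mode on a supported master."""
    if connection.get("deploy_mode") != "cluster":
        return False
    master = connection.get("master") or ""
    # Assumption: a YARN master is configured as the literal string "yarn".
    return master.startswith(SPARK_STATUS_PREFIXES) or master == "yarn"


def build_status_command(connection: Dict[str, str],
                         driver_id: str,
                         spark_binary: str = "spark-submit") -> List[str]:
    """Build the argument list used to poll the driver status."""
    if not driver_id:
        raise ValueError("no driver id is known, cannot poll driver status")
    master = connection.get("master") or ""
    if master.startswith(SPARK_STATUS_PREFIXES):
        return [spark_binary, "--master", master, "--status", driver_id]
    # Otherwise assume YARN; every argument is its own list element so the
    # list can be handed to subprocess without a shell.
    return ["yarn", "application", "-status", driver_id]


if __name__ == "__main__":
    conn = {"master": "k8s://https://example:6443", "deploy_mode": "cluster"}
    print(should_track_driver_status(conn))
    print(build_status_command(conn, "driver-123"))
```

Keeping the prefix list in one tuple means the tracking check and the command builder cannot drift apart when another cluster manager is added.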