melin commented on issue #524:
URL: 
https://github.com/apache/spark-kubernetes-operator/issues/524#issuecomment-4427209326

   目前产品中的实现,希望对有需要的人提供帮助
   
   <img width="1191" height="187" alt="Image" src="https://github.com/user-attachments/assets/c72d71d2-ed89-4511-b50b-f4e304c6d861" />
   
   ```
   import static io.github.melin.jobserver.spark.submission.k8s.Constants.JOBSERVER_NAME;
   import static io.github.melin.jobserver.spark.submission.k8s.Constants.JOBSERVER_NAME_LABEL;
   import static io.github.melin.jobserver.spark.submission.k8s.Constants.SPARK_APP_ID_LABEL;
   import static io.github.melin.jobserver.spark.submission.k8s.Constants.SPARK_POD_DRIVER_ROLE;
   import static io.github.melin.jobserver.spark.submission.k8s.Constants.SPARK_POD_EXECUTOR_ROLE;
   import static io.github.melin.jobserver.spark.submission.k8s.Constants.SPARK_ROLE_LABEL;
   import static io.github.melin.jobserver.spark.support.leader.LeaderTypeEnum.COLLECT_FLINK_POD_LOG;
   
   import com.gitee.melin.bee.util.ThreadUtils;
   import io.fabric8.kubernetes.api.model.Pod;
   import io.fabric8.kubernetes.client.KubernetesClient;
   import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
   import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
   import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
   import io.github.melin.jobserver.spark.common.entity.CloudRegionEntity;
   import io.github.melin.jobserver.spark.common.entity.JobServerEntity;
   import io.github.melin.jobserver.spark.common.model.ClusterId;
   import io.github.melin.jobserver.spark.common.model.EngineId;
   import io.github.melin.jobserver.spark.common.util.DateUtils;
   import io.github.melin.jobserver.spark.common.util.FsUtils;
   import io.github.melin.jobserver.spark.service.CloudRegionService;
   import io.github.melin.jobserver.spark.service.JobServerService;
   import io.github.melin.jobserver.spark.support.cluster.ClusterEngineManager;
   import io.github.melin.jobserver.spark.support.cluster.ClusterManager;
   import io.github.melin.jobserver.spark.support.leader.RedisLeaderElection;
   import java.io.InputStream;
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.TimeUnit;
   import org.apache.commons.lang3.StringUtils;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   import org.springframework.beans.factory.InitializingBean;
   import org.springframework.beans.factory.annotation.Autowired;
   import org.springframework.stereotype.Service;
   
   @Service
   public class SparkPodLogController implements InitializingBean {
       private static final Logger LOG = 
LoggerFactory.getLogger(SparkPodLogController.class);
   
       @Autowired
       private ClusterManager clusterManager;
   
       @Autowired
       private ClusterEngineManager clusterEngineManager;
   
       @Autowired
       private CloudRegionService cloudRegionService;
   
       @Autowired
       private JobServerService jobServerService;
   
       @Autowired
       private RedisLeaderElection redisLeaderElection;
   
       private List<ClusterId> clusterIds = new ArrayList<>();
   
       private final ScheduledExecutorService executorService =
               
ThreadUtils.newDaemonSingleThreadScheduledExecutor("check-informer-register");
   
       @Override
       public void afterPropertiesSet() throws Exception {
           redisLeaderElection.buildLeader(COLLECT_FLINK_POD_LOG);
   
           executorService.scheduleAtFixedRate(
                   () -> {
                       clusterManager.getK8sClientCache().forEach((clusterId, 
kubernetesClient) -> {
                           try {
                               if 
(redisLeaderElection.checkLeader(COLLECT_FLINK_POD_LOG)) {
                                   if (!clusterIds.contains(clusterId)) {
                                       registInformer(kubernetesClient, 
clusterId);
                                   }
                               } else {
                                   
kubernetesClient.informers().stopAllRegisteredInformers();
                                   clusterIds.remove(clusterId);
                               }
                           } catch (Exception e) {
                               LOG.error("❌ 注册 informer 失败: {}, error: {}", 
clusterId, e.getMessage());
                           }
                       });
                   },
                   10,
                   60,
                   TimeUnit.SECONDS);
       }
   
       public synchronized void registInformer(KubernetesClient 
kubernetesClient, ClusterId clusterId) {
           if (redisLeaderElection.checkLeader(COLLECT_FLINK_POD_LOG)) {
               SharedInformerFactory factory = kubernetesClient.informers();
   
               // 2. 创建监听器,过滤带有 spark-role=driver 标签的 Pod
               SharedIndexInformer<Pod> informer = 
factory.sharedIndexInformerFor(Pod.class, 30 * 1000L);
   
               CloudRegionEntity region = 
cloudRegionService.queryRegion(clusterId.getRegionCode());
               String schema = region.getStorageType().getSchema();
               String prefix = schema + region.getLogBucket();
               informer.addEventHandler(new 
SparkResourceEventHandler(clusterId, kubernetesClient, prefix));
   
               LOG.info("🚀 {} 控制器已启动,正在监控 Spark Pods...", clusterId);
               factory.startAllRegisteredInformers();
           } else {
               kubernetesClient.informers().stopAllRegisteredInformers();
           }
       }
   
       private class SparkResourceEventHandler implements 
ResourceEventHandler<Pod> {
   
           private ClusterId clusterId;
   
           private KubernetesClient kubernetesClient;
   
           private String prefix;
   
           public SparkResourceEventHandler(ClusterId clusterId, 
KubernetesClient kubernetesClient, String prefix) {
               this.clusterId = clusterId;
               this.kubernetesClient = kubernetesClient;
               this.prefix = prefix;
           }
   
           @Override
           public void onAdd(Pod pod) {
               /* 暂不处理 */
           }
   
           @Override
           public void onUpdate(Pod oldPod, Pod newPod) {
               // 检查是否为 Spark Driver
               String role = 
newPod.getMetadata().getLabels().get(SPARK_ROLE_LABEL);
               if (!(SPARK_POD_DRIVER_ROLE.equals(role) || 
SPARK_POD_EXECUTOR_ROLE.equals(role))) {
                   return;
               }
   
               String jobserverName = 
newPod.getMetadata().getLabels().get(JOBSERVER_NAME_LABEL);
               if (!JOBSERVER_NAME.equals(jobserverName)) {
                   return;
               }
   
               String phase = newPod.getStatus().getPhase();
               String oldPhase = oldPod.getStatus().getPhase();
   
               // 只有当状态从非终态变为终态 (Succeeded/Failed) 时触发
               if (!phase.equals(oldPhase) && ("Succeeded".equals(phase) || 
"Failed".equals(phase))) {
                   handleFinishedPod(clusterId, kubernetesClient, newPod, role);
               }
           }
   
           @Override
           public void onDelete(Pod pod, boolean deletedFinalStateUnknown) {
               String podName = pod.getMetadata().getName();
               // 检查是否为 Spark Driver
               String role = 
pod.getMetadata().getLabels().get(SPARK_ROLE_LABEL);
               if (!(SPARK_POD_DRIVER_ROLE.equals(role) || 
SPARK_POD_EXECUTOR_ROLE.equals(role))) {
                   return;
               }
   
               String jobserverName = 
pod.getMetadata().getLabels().get(JOBSERVER_NAME_LABEL);
               if (!JOBSERVER_NAME.equals(jobserverName)) {
                   return;
               }
   
               LOG.info("Deleted Pod: {}, status: {}", podName, 
pod.getStatus().getPhase());
           }
   
           private void handleFinishedPod(ClusterId clusterId, KubernetesClient 
kubernetesClient, Pod pod, String role) {
               String podName = pod.getMetadata().getName();
               String namespace = pod.getMetadata().getNamespace();
               String engineCode = 
pod.getMetadata().getLabels().get("engineCode");
               EngineId engineId = EngineId.of(clusterId, engineCode);
               String applicationId = 
pod.getMetadata().getLabels().get(SPARK_APP_ID_LABEL);
               String podPhase = pod.getStatus().getPhase();
   
               JobServerEntity jobServer = 
jobServerService.queryJobServerByAppId(applicationId);
               if (jobServer == null) {
                   LOG.warn("engine: {}, pod name: {}, jobserver {} not 
exists", engineId, podName, applicationId);
                   return;
               }
   
               try {
                   // 3. 获取日志 (获取最后 1000 行防止数据过大)
                   String logs = kubernetesClient
                           .pods()
                           .inNamespace(namespace)
                           .withName(podName)
                           .tailingLines(1000)
                           .getLog();
   
                   String createdDate = 
DateUtils.formatDate(jobServer.getGmtCreated());
                   String path = prefix + "/spark_logs/" + createdDate + "/" + 
applicationId;
   
                   if (SPARK_POD_DRIVER_ROLE.equals(role)) {
                       clusterEngineManager.runSecured(engineId, (fileSystem) 
-> {
                           String logPath = path + "/driver.log";
                           InputStream logInputStream = kubernetesClient
                                   .pods()
                                   .inNamespace(namespace)
                                   .withName(podName)
                                   .getLogInputStream();
                           FsUtils.uploadFile(fileSystem, logInputStream, 
logPath);
                           LOG.info("Pod: {}, status: {}, log path: {}", 
podName, podPhase, logPath);
                           return null;
                       });
                   } else if (SPARK_POD_EXECUTOR_ROLE.equals(role)) {
                       String executorId = StringUtils.substringAfter(podName, 
"exec-");
                       clusterEngineManager.runSecured(engineId, (fileSystem) 
-> {
                           String logPath = path + "/executor/executor-" + 
executorId + ".log";
                           InputStream logInputStream = kubernetesClient
                                   .pods()
                                   .inNamespace(namespace)
                                   .withName(podName)
                                   .getLogInputStream();
                           FsUtils.uploadFile(fileSystem, logInputStream, 
logPath);
                           LOG.info("Pod: {}, status: {}, log path: {}", 
podName, podPhase, logPath);
                           return null;
                       });
                   }
               } catch (Exception e) {
                   LOG.error("❌ Pod: {} 处理日志失败: {}", podName, e.getMessage());
               }
           }
       }
   }
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to