This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch release-2.19.0 in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.19.0 by this push: new 7e2a165 [BEAM-8939] Cherry-pick: A bash script that cancels stale dataflow jobs new 5286261 Merge pull request #10695 from boyuanzz/cherry-pick 7e2a165 is described below commit 7e2a16533cda9151861a26f716a59c6994491e0c Author: Kamil Wasilewski <kamil.wasilew...@polidea.com> AuthorDate: Tue Jan 14 09:33:45 2020 +0100 [BEAM-8939] Cherry-pick: A bash script that cancels stale dataflow jobs (cherry picked from commit 4d3295c4279a853758e4218b437a92edba63bd89) --- .../jenkins/job_CancelStaleDataflowJobs.groovy | 1 - .test-infra/tools/build.gradle | 25 +---- .test-infra/tools/stale_dataflow_jobs_cleaner.go | 117 --------------------- .test-infra/tools/stale_dataflow_jobs_cleaner.sh | 23 ++++ .../tools/stale_dataflow_jobs_cleaner_test.go | 74 ------------- 5 files changed, 25 insertions(+), 215 deletions(-) diff --git a/.test-infra/jenkins/job_CancelStaleDataflowJobs.groovy b/.test-infra/jenkins/job_CancelStaleDataflowJobs.groovy index a03a1d0..e32d14a 100644 --- a/.test-infra/jenkins/job_CancelStaleDataflowJobs.groovy +++ b/.test-infra/jenkins/job_CancelStaleDataflowJobs.groovy @@ -37,7 +37,6 @@ job("beam_CancelStaleDataflowJobs") { steps { gradle { rootBuildScriptDir(commonJobProperties.checkoutDir) - tasks(':beam-test-tools:check') tasks(':beam-test-tools:cancelStaleDataflowJobs') commonJobProperties.setGradleSwitches(delegate) } diff --git a/.test-infra/tools/build.gradle b/.test-infra/tools/build.gradle index aabeca0..53445b6 100644 --- a/.test-infra/tools/build.gradle +++ b/.test-infra/tools/build.gradle @@ -16,27 +16,6 @@ * limitations under the License. */ -plugins { - id 'org.apache.beam.module' -} - -applyGoNature() - -repositories { mavenCentral() } - -clean { - delete '.gogradle' -} - -golang { - packagePath = 'github.com/apache/beam/.test-infra/tools' -} - -check.dependsOn goTest - -task cancelStaleDataflowJobs(type: com.github.blindpirate.gogradle.Go) { - dependsOn goVendor - go('get golang.org/x/oauth2/google') - go('get google.golang.org/api/dataflow/v1b3') - go('run stale_dataflow_jobs_cleaner.go') +task cancelStaleDataflowJobs(type: Exec) { + commandLine './stale_dataflow_jobs_cleaner.sh' } diff --git a/.test-infra/tools/stale_dataflow_jobs_cleaner.go b/.test-infra/tools/stale_dataflow_jobs_cleaner.go deleted file mode 100644 index 6361e27..0000000 --- a/.test-infra/tools/stale_dataflow_jobs_cleaner.go +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package main - -import ( - "context" - "log" - "strings" - "time" - - "golang.org/x/oauth2/google" - df "google.golang.org/api/dataflow/v1b3" -) - -const ( - longRunningPrefix = "long-running-" -) - -// client contains methods for listing and cancelling jobs, extracted to allow easier testing. -type client interface { - CurrentTime() time.Time - ListJobs(projectId string) ([]*df.Job, error) - CancelJob(job *df.Job) error -} - -// dataflowClient implements the client interface for Google Cloud Dataflow. -type dataflowClient struct { - s *df.ProjectsJobsService -} - -// newDataflowClient creates a new Dataflow ProjectsJobsService. -func newDataflowClient() (*dataflowClient, error) { - ctx := context.Background() - cl, err := google.DefaultClient(ctx, df.CloudPlatformScope) - if err != nil { - return nil, err - } - service, err := df.New(cl) - if err != nil { - return nil, err - } - return &dataflowClient{s: df.NewProjectsJobsService(service)}, nil -} - -// CurrentTime gets the time Now. -func (c dataflowClient) CurrentTime() time.Time { - return time.Now() -} - -// ListJobs lists the active Dataflow jobs for a project. -func (c dataflowClient) ListJobs(projectId string) ([]*df.Job, error) { - resp, err := c.s.Aggregated(projectId).Filter("ACTIVE").Fields("jobs(id,name,projectId,createTime)").Do() - if err != nil { - return nil, err - } - return resp.Jobs, nil -} - -// CancelJob requests the cancellation od a Dataflow job. -func (c dataflowClient) CancelJob(job *df.Job) error { - jobDone := df.Job{ - RequestedState: "JOB_STATE_DONE", - } - _, err := c.s.Update(job.ProjectId, job.Id, &jobDone).Do() - return err -} - -// cleanDataflowJobs cancels stale Dataflow jobs, excluding the longRunningPrefix prefixed jobs. -func cleanDataflowJobs(c client, projectId string, hoursStale float64) error { - now := c.CurrentTime() - jobs, err := c.ListJobs(projectId) - if err != nil { - return err - } - for _, j := range jobs { - t, err := time.Parse(time.RFC3339, j.CreateTime) - if err != nil { - return err - } - hoursSinceCreate := now.Sub(t).Hours() - log.Printf("Job %v %v %v %v %.2f\n", j.ProjectId, j.Id, j.Name, j.CreateTime, hoursSinceCreate) - if hoursSinceCreate > hoursStale && !strings.HasPrefix(j.Name, longRunningPrefix) { - log.Printf("Attempting to cancel %v\n", j.Id) - c.CancelJob(j) - } - } - return nil -} - -func main() { - client, err := newDataflowClient() - if err != nil { - log.Fatalf("Error creating dataflow client, %v", err) - } - // Cancel any jobs older than 3 hours. - err = cleanDataflowJobs(client, "apache-beam-testing", 3.0) - if err != nil { - log.Fatalf("Error cleaning dataflow jobs, %v", err) - } - log.Printf("Done") -} diff --git a/.test-infra/tools/stale_dataflow_jobs_cleaner.sh b/.test-infra/tools/stale_dataflow_jobs_cleaner.sh new file mode 100755 index 0000000..66bf880 --- /dev/null +++ b/.test-infra/tools/stale_dataflow_jobs_cleaner.sh @@ -0,0 +1,23 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Cancels active Dataflow jobs older than 3 hours. +# +set -euo pipefail + +gcloud dataflow jobs list --created-before=-P3H --format='value(JOB_ID)' \ +--status=active --region=us-central1 | xargs gcloud dataflow jobs cancel diff --git a/.test-infra/tools/stale_dataflow_jobs_cleaner_test.go b/.test-infra/tools/stale_dataflow_jobs_cleaner_test.go deleted file mode 100644 index 342052a..0000000 --- a/.test-infra/tools/stale_dataflow_jobs_cleaner_test.go +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package main - -import ( - df "google.golang.org/api/dataflow/v1b3" - "reflect" - "testing" - "time" -) - -var ( - currentTime time.Time = time.Now() - jobsReturned = []*df.Job{} - cancelledJobs = []*df.Job{} -) - -type fakeClient struct{} - -func (c fakeClient) ListJobs(projectId string) ([]*df.Job, error) { - return jobsReturned, nil -} - -func (c fakeClient) CancelJob(job *df.Job) error { - cancelledJobs = append(cancelledJobs, job) - return nil -} - -func (c fakeClient) CurrentTime() time.Time { - return currentTime -} - -func helperForJobCancel(t *testing.T, hoursStale float64, jobList []*df.Job, expectedCancelled []*df.Job) { - var c fakeClient - jobsReturned = jobList - cancelledJobs = []*df.Job{} - cleanDataflowJobs(c, "some-project-id", 2.0) - if !reflect.DeepEqual(cancelledJobs, expectedCancelled) { - t.Errorf("Cancelled arrays not as expected actual=%v, expected=%v", cancelledJobs, expectedCancelled) - } -} - -func TestEmptyJobList(t *testing.T) { - helperForJobCancel(t, 2.0, []*df.Job{}, []*df.Job{}) -} - -func TestNotExpiredJob(t *testing.T) { - // Just under 2 hours. - createTime := currentTime.Add(-(2*time.Hour - time.Second)) - helperForJobCancel(t, 2.0, []*df.Job{&df.Job{CreateTime: createTime.Format(time.RFC3339)}}, []*df.Job{}) -} - -func TestExpiredJob(t *testing.T) { - // Just over 2 hours. - createTime := currentTime.Add(-(2*time.Hour + time.Second)) - job := &df.Job{CreateTime: createTime.Format(time.RFC3339)} - helperForJobCancel(t, 2.0, []*df.Job{job}, []*df.Job{job}) -}