indygreg created this revision.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.

REVISION SUMMARY
  EC2 spot instance requests can fail. Often the root cause is a lack
  of spot instance availability in the given availability zone.
  
  This commit teaches our CI system to recognize when a spot instance
  request has failed and to retry it.
  
  To do this, we create a pair of new Lambda functions, "spot instance
  request monitor" and "start job." The former is invoked periodically
  via CloudWatch Event. It scans all jobs waiting on a spot instance
  request. If the spot instance request is cancelled, it calls into
  the "start job" function, which simply calls an internal API for
  trying to start a job given a job ID.
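  
  As a rough sketch of that hand-off (a minimal illustration assuming
  boto3 and a hypothetical retrigger_job() wrapper; the real logic
  lives in monitor_spot_instance_requests() in ci.py):
  
      import json
      import boto3
  
      lambda_client = boto3.client('lambda')
  
      def retrigger_job(job_id, start_job_function):
          # Fire-and-forget: InvocationType='Event' runs the target
          # Lambda function asynchronously, so the monitor doesn't
          # block on job startup.
          lambda_client.invoke(
              FunctionName=start_job_function,
              InvocationType='Event',
              Payload=json.dumps({'job_id': job_id}),
          )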
  
  Our strategy for retrying spot instance requests is to try the
  "next" availability zone for the given region. We cycle through
  availability zones until a spot instance request is acted on. In my
  testing, this always works, although it can take several minutes to
  cycle through all availability zones before one has instance
  availability.
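  
  A minimal sketch of that cycling logic (next_availability_zone is a
  hypothetical helper; the patch inlines the equivalent logic in
  start_pending_job()):
  
      def next_availability_zone(availability_zones, previous_az):
          # Advance to the zone after the one we last tried, wrapping
          # around to the first zone at the end of the list.
          previous_index = availability_zones.index(previous_az)
          try:
              return availability_zones[previous_index + 1]
          except IndexError:
              return availability_zones[0]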
  
  There are definitely other strategies we could try. For example, we
  could try another AWS region. Or we could launch an on-demand
  instance after N failures. Or we could launch a different EC2
  instance type - one with hopefully more availability. Or we could
  use spot fleet requests (which allow launching from multiple
  instance types). The easiest would probably be launching an
  on-demand instance. But until we need this functionality, I'm
  inclined not to build it, as I like the consistency of always using
  spot instances.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D6926

AFFECTED FILES
  contrib/ci/README.rst
  contrib/ci/lambda_functions/ci.py
  contrib/ci/lambda_functions/web.py
  contrib/ci/terraform/job_executor.tf
  contrib/ci/terraform/spot_instance_request_monitor.tf
  contrib/ci/terraform/storage.tf

CHANGE DETAILS

diff --git a/contrib/ci/terraform/storage.tf b/contrib/ci/terraform/storage.tf
--- a/contrib/ci/terraform/storage.tf
+++ b/contrib/ci/terraform/storage.tf
@@ -55,8 +55,21 @@
     name = "job_id"
     type = "S"
   }
+  attribute {
+    name = "execution_state"
+    type = "S"
+  }
 
   hash_key = "job_id"
+
+  # This allows us to easily query for jobs in a specific state.
+  global_secondary_index {
+    name = "execution_state"
+    hash_key = "execution_state"
+    range_key = "job_id"
+    projection_type = "ALL"
+  }
+
 }
 
 # Tracks results for individual tests in each job.
diff --git a/contrib/ci/terraform/spot_instance_request_monitor.tf b/contrib/ci/terraform/spot_instance_request_monitor.tf
new file mode 100644
--- /dev/null
+++ b/contrib/ci/terraform/spot_instance_request_monitor.tf
@@ -0,0 +1,94 @@
+# Defines resources to monitor spot instance requests.
+
+resource "aws_cloudwatch_log_group" "lambda_ci_spot_instance_request_monitor" {
+  name = "/aws/lambda/${aws_lambda_function.ci_spot_instance_request_monitor.function_name}"
+  retention_in_days = 7
+}
+
+resource "aws_iam_role" "lambda_ci_spot_instance_request_monitor" {
+  name = "lambda-ci-spot-instance-request-monitor"
+  description = "For Lambda function monitoring spot instance requests"
+  assume_role_policy = data.aws_iam_policy_document.assume_role_lambda.json
+}
+
+# Function that monitors spot instance requests and retries failed ones.
+resource "aws_lambda_function" "ci_spot_instance_request_monitor" {
+  function_name = "ci-spot-instance-request-monitor"
+  description = "Monitors spot instance requests and triggers activity"
+  filename = data.archive_file.lambda_ci.output_path
+  handler = "ci.handle_spot_instance_request_monitor"
+  source_code_hash = data.archive_file.lambda_ci.output_base64sha256
+  runtime = "python3.7"
+  timeout = 60
+  role = aws_iam_role.lambda_ci_spot_instance_request_monitor.arn
+  environment {
+    variables = {
+      DYNAMODB_JOB_TABLE = aws_dynamodb_table.ci_job.name
+      LAMBDA_START_JOB_FUNCTION = aws_lambda_function.ci_start_job.function_name
+    }
+  }
+}
+
+data "aws_iam_policy_document" "lambda_ci_spot_instance_request_monitor" {
+  # Allow Lambda function to write CloudWatch events.
+  statement {
+    effect = "Allow"
+    actions = [
+      "logs:CreateLogGroup",
+      "logs:CreateLogStream",
+      "logs:PutLogEvents",
+    ]
+    resources = [aws_cloudwatch_log_group.lambda_ci_spot_instance_request_monitor.arn]
+  }
+  # Allow querying spot instance requests.
+  statement {
+    effect = "Allow"
+    actions = [
+      "ec2:DescribeSpotInstanceRequests",
+    ]
+    resources = ["*"]
+  }
+  # Allow fetching job state from DynamoDB.
+  statement {
+    effect = "Allow"
+    actions = [
+      "dynamodb:Query",
+    ]
+    resources = [
+      aws_dynamodb_table.ci_job.arn,
+      "${aws_dynamodb_table.ci_job.arn}/*",
+    ]
+  }
+  # Allow invoking the start job Lambda function.
+  statement {
+    effect = "Allow"
+    actions = ["lambda:InvokeFunction"]
+    resources = [aws_lambda_function.ci_start_job.arn]
+  }
+}
+
+resource "aws_iam_role_policy" "lambda_ci_spot_instance_request_monitor" {
+  role = aws_iam_role.lambda_ci_spot_instance_request_monitor.name
+  name = aws_iam_role.lambda_ci_spot_instance_request_monitor.name
+  policy = data.aws_iam_policy_document.lambda_ci_spot_instance_request_monitor.json
+}
+
+# Periodically trigger the Lambda function so state is continuously monitored.
+resource "aws_cloudwatch_event_rule" 
"trigger_ci_spot_instance_request_monitor" {
+  name = "trigger-ci-spot-instance-request-monitor"
+  description = "Trigger monitoring of spot instance requests"
+  schedule_expression = "rate(1 minute)"
+}
+
+resource "aws_cloudwatch_event_target" "ci_spot_instance_request_monitor" {
+  rule = aws_cloudwatch_event_rule.trigger_ci_spot_instance_request_monitor.name
+  arn = aws_lambda_function.ci_spot_instance_request_monitor.arn
+}
+
+resource "aws_lambda_permission" 
"ci_spot_instance_request_monitor_allow_cloudwatch" {
+  statement_id = "AllowExecutionFromCloudWatch"
+  action = "lambda:InvokeFunction"
+  function_name = aws_lambda_function.ci_spot_instance_request_monitor.function_name
+  principal = "events.amazonaws.com"
+  source_arn = aws_cloudwatch_event_rule.trigger_ci_spot_instance_request_monitor.arn
+}
diff --git a/contrib/ci/terraform/job_executor.tf b/contrib/ci/terraform/job_executor.tf
--- a/contrib/ci/terraform/job_executor.tf
+++ b/contrib/ci/terraform/job_executor.tf
@@ -204,3 +204,60 @@
   principal = "events.amazonaws.com"
   source_arn = aws_cloudwatch_event_rule.trigger_instance_state_change.arn
 }
+
+resource "aws_cloudwatch_log_group" "lambda_ci_start_job" {
+  name = "/aws/lambda/${aws_lambda_function.ci_start_job.function_name}"
+  retention_in_days = 7
+}
+
+resource "aws_iam_role" "lambda_ci_start_job" {
+  name = "lambda-ci-start-job"
+  description = "For Lambda function to trigger job start"
+  assume_role_policy = data.aws_iam_policy_document.assume_role_lambda.json
+}
+
+# Lambda function for starting a job. This is similar to `run_pending_job`
+# except it isn't a handler for SQS events.
+resource "aws_lambda_function" "ci_start_job" {
+  function_name = "ci-start-job"
+  description = "Starts a scheduled CI job"
+  filename = data.archive_file.lambda_ci.output_path
+  handler = "ci.handle_start_job"
+  source_code_hash = data.archive_file.lambda_ci.output_base64sha256
+  runtime = "python3.7"
+  timeout = 60
+  role = aws_iam_role.lambda_ci_start_job.arn
+  environment {
+    variables = {
+      DYNAMODB_JOB_TABLE = aws_dynamodb_table.ci_job.name
+    }
+  }
+}
+
+# Inherit the policy from run_pending_job.
+resource "aws_iam_role_policy" "lambda_ci_start_job_run_pending_job" {
+  role = aws_iam_role.lambda_ci_start_job.name
+  name = "run-pending-job"
+  policy = data.aws_iam_policy_document.lambda_ci_run_pending_job.json
+}
+
+# Add supplement with additional policy.
+data "aws_iam_policy_document" "lambda_ci_start_job" {
+  # Allow Lambda function to write CloudWatch events.
+  statement {
+    effect = "Allow"
+    actions = [
+      "logs:CreateLogGroup",
+      "logs:CreateLogStream",
+      "logs:PutLogEvents",
+    ]
+    resources = [
+      aws_cloudwatch_log_group.lambda_ci_start_job.arn]
+  }
+}
+
+resource "aws_iam_role_policy" "lambda_ci_start_job" {
+  role = aws_iam_role.lambda_ci_start_job.name
+  name = aws_iam_role.lambda_ci_start_job.name
+  policy = data.aws_iam_policy_document.lambda_ci_start_job.json
+}
diff --git a/contrib/ci/lambda_functions/web.py b/contrib/ci/lambda_functions/web.py
--- a/contrib/ci/lambda_functions/web.py
+++ b/contrib/ci/lambda_functions/web.py
@@ -164,7 +164,8 @@
                     else:
                         skip_count = 'n/a'
 
-                    if job_info['execution_state'] in ('pending', 'spot-instance-requested', 'running'):
+                    if job_info['execution_state'] in (
+                        'pending', 'spot-instance-requested', 'cancelled', 'running'):
                         job_state = job_info['execution_state']
                     elif job_info['execution_state'] == 'done':
                         exit_clean = job_info.get('exit_clean')
diff --git a/contrib/ci/lambda_functions/ci.py b/contrib/ci/lambda_functions/ci.py
--- a/contrib/ci/lambda_functions/ci.py
+++ b/contrib/ci/lambda_functions/ci.py
@@ -68,6 +68,32 @@
         start_pending_job(ec2, job_table, job_id)
 
 
+def handle_start_job(event, context):
+    """Handler for ci-start-job function."""
+    ec2 = boto3.client('ec2')
+    dynamodb = boto3.resource('dynamodb')
+
+    job_table = dynamodb.Table(os.environ['DYNAMODB_JOB_TABLE'])
+
+    job_id = event['job_id']
+    print('received request to start job %s' % job_id)
+
+    start_pending_job(ec2, job_table, job_id)
+
+
+def handle_spot_instance_request_monitor(event, context):
+    """Handler to invoke spot instance request monitor."""
+    ec2 = boto3.client('ec2')
+    dynamodb = boto3.resource('dynamodb')
+    lambda_client = boto3.client('lambda')
+
+    job_table = dynamodb.Table(os.environ['DYNAMODB_JOB_TABLE'])
+    start_job_function = os.environ['LAMBDA_START_JOB_FUNCTION']
+
+    monitor_spot_instance_requests(ec2, job_table, lambda_client,
+                                   start_job_function)
+
+
 def handle_job_result_s3_artifact(event, context):
     """Handler called when a new S3 object job artifact is uploaded."""
     dynamodb = boto3.resource('dynamodb')
@@ -455,14 +481,69 @@
     job_id = job['job_id']
     print('requesting spot instance for job %s' % job_id)
 
+    availability_zones = [
+        az['ZoneName']
+        for az in ec2.describe_availability_zones()['AvailabilityZones']
+        if az['State'] == 'available']
+
     # Fresh job request.
     if job['execution_state'] == 'pending':
         # Pick an availability zone randomly.
-        availability_zones = [
-            az['ZoneName']
-            for az in ec2.describe_availability_zones()['AvailabilityZones']
-            if az['State'] == 'available']
         availability_zone = random.choice(availability_zones)
+
+    # Looks like we previously tried to launch a spot instance for this
+    # job. Examine the state of that request and make sure we can
+    # replace it.
+    elif job['execution_state'] == 'spot-instance-requested':
+        spot_instance_requests = ec2.describe_spot_instance_requests(
+            SpotInstanceRequestIds=[job['spot_instance_request_id']],
+        )['SpotInstanceRequests']
+
+        # This should never happen, assuming this code path doesn't run
+        # after the spot request was finalized (which it shouldn't, since
+        # we trigger things periodically).
+        if not spot_instance_requests:
+            print('unable to find spot instance request for job %s' % job_id)
+            return
+
+        sir = spot_instance_requests[0]
+
+        # If the spot request expired, try again in the next availability
+        # zone.
+        if (sir['State'] == 'cancelled'
+            and sir['Status']['Code'] == 'schedule-expired'):
+            print('previous spot request for job %s expired; '
+                  'trying different az' % job_id)
+
+            previous_az = sir['LaunchSpecification']['Placement']['AvailabilityZone']
+            previous_index = availability_zones.index(previous_az)
+            try:
+                availability_zone = availability_zones[previous_index + 1]
+            except IndexError:
+                availability_zone = availability_zones[0]
+        else:
+            print('unhandled spot instance request state for job %s: '
+                  '%s; %s: %s' % (
+                job_id,
+                sir['State'],
+                sir['Status']['Code'],
+                sir['Status']['Message']))
+            print('cancelling job %s' % job_id)
+            job_table.update_item(
+                Key={'job_id': job_id},
+                UpdateExpression=(
+                    'set execution_state = :state, '
+                    'cancelled_reason = :reason'
+                ),
+                ExpressionAttributeValues={
+                    ':state': 'cancelled',
+                    ':reason': 'spot state: %s; %s' % (
+                        sir['State'],
+                        sir['Status']['Code'],
+                    ),
+                },
+            )
+
     else:
         print('unhandled execution_state: %s' % job['execution_state'])
         return
@@ -485,10 +566,13 @@
     # we can just retry the job.
     #
     # The max bid price is the on-demand price. So in the typical case we save
-    # $$$. If we're unlucky we pay the on-demand rate. You can't lose.
+    # $$$. If we're unlucky we pay the on-demand rate. You can't lose. Unless
+    # there are no available spot instances. But we handle this by setting a
+    # short request validity window and retrying in a different availability
+    # zone. Eventually we should find someone willing to satisfy our request.
     res = ec2.request_spot_instances(
         BlockDurationMinutes=60,
-        ValidUntil=datetime.datetime.utcnow() + datetime.timedelta(minutes=10),
+        ValidUntil=datetime.datetime.utcnow() + datetime.timedelta(minutes=1),
         LaunchSpecification=launch_spec,
     )
 
@@ -511,6 +595,55 @@
     )
 
 
+def monitor_spot_instance_requests(ec2, job_table, lambda_client,
+                                   start_job_function):
+    """Looks for stale spot instance requests and updates state accordingly."""
+    print('monitoring for stale spot instance requests')
+
+    # We could call the EC2 API directly. But we want DynamoDB to be our
+    # source of truth. So key off its state.
+    res = job_table.query(
+        IndexName='execution_state',
+        KeyConditionExpression=Key('execution_state').eq('spot-instance-requested'),
+        Select='ALL_PROJECTED_ATTRIBUTES',
+    )
+    print('found %d jobs in spot-instance-requested state' % len(res['Items']))
+
+    for item in res['Items']:
+        print('job %s is in spot-instance-requested' % item['job_id'])
+
+        # If the spot instance request is expired, retrigger scheduling.
+        request_id = item['spot_instance_request_id']
+        print('checking state of %s' % request_id)
+        spot_instance_requests = ec2.describe_spot_instance_requests(
+            SpotInstanceRequestIds=[request_id],
+        )['SpotInstanceRequests']
+
+        if not spot_instance_requests:
+            print('could not find %s; weird' % request_id)
+            continue
+
+        sir = spot_instance_requests[0]
+
+        print('spot instance request %s is in state %s: %s' % (
+            request_id, sir['State'], sir['Status']['Code']))
+
+        if (sir['State'] == 'cancelled'
+            and sir['Status']['Code'] == 'schedule-expired'):
+            print('spot instance request %s for job %s has expired; '
+                  'retrying scheduling' % (request_id, item['job_id']))
+
+            payload = json.dumps({
+                'job_id': item['job_id'],
+            })
+
+            lambda_client.invoke(
+                FunctionName=start_job_function,
+                InvocationType='Event',
+                Payload=payload,
+            )
+
+
 def react_to_instance_state_change(job_table, instance, state):
     """React to a CI worker instance state change."""
     now = decimal.Decimal(time.time())
diff --git a/contrib/ci/README.rst b/contrib/ci/README.rst
--- a/contrib/ci/README.rst
+++ b/contrib/ci/README.rst
@@ -139,8 +139,36 @@
 This function will update job records in DynamoDB to record that a
 job has started/finished/aborted.
 
+This component contains a *start job* Lambda function, which can be
+invoked with the job ID of a job that someone wishes to start. It
+tries to ensure the job runs. In the case of an expired spot instance
+request, it will try again.
+
 The Terraform code for this component lives in ``job_executor.tf``.
 
+Spot Instance Request Monitor
+-----------------------------
+
+The *spot instance request monitor* is a component for monitoring the
+state of spot instance requests.
+
+We utilize EC2 spot instances to run jobs. Spot instances, unlike
+on-demand instances, can't be launched directly. Instead, you create
+a request for spot instances and this request is eventually fulfilled
+by EC2, if possible. Often, the request is fulfilled immediately and
+an EC2 instance launches within a few seconds.
+
+But sometimes a spot instance request fails. This is often because no
+spot instances are available at that time. This is where this
+component comes into play.
+
+The *spot instance request monitor* is a Lambda function that is
+periodically invoked via a CloudWatch Event. It scans all jobs
+currently waiting on a spot instance request. If the spot instance
+request couldn't be fulfilled, it calls out to the *start job* Lambda
+function to tell it to try to reschedule the job. Theoretically, the
+job should eventually start.
+
 Worker
 ------
 



To: indygreg, #hg-reviewers
Cc: mercurial-devel