When deploying an Amazon EMR cluster with CloudFormation, we wanted to configure two settings that the documentation didn't explain how to handle: enabling Consistent View and setting a value for Step Concurrency. This article explains how we made both settings work within a CloudFormation template.

Enabling Consistent View

Searching through the available properties in the CloudFormation documentation for EMR Cluster, it wasn't clear how to enable Consistent View, because there isn't a property specifically for the setting. After going back and forth between the Configure Consistent View instructions and the CloudFormation documentation, the Configurations property jumped out. Consistent View is controlled by properties in the emrfs-site configuration classification, and the Configurations property accepts exactly that kind of classification-plus-properties entry, so adding the following block to the cluster's Properties seemed like a logical thing to try. Thankfully, it worked. Here's that block in YAML with a few other EMRFS properties set explicitly; you could add any of the other properties listed in the Configure Consistent View document as well. The important line is "fs.s3.consistent": "true".


# Other stuff is happening above this to create the cluster...

  Configurations:
    # Configurations is a list, so each classification entry starts with "-"
    - Classification: emrfs-site
      ConfigurationProperties:
        "fs.s3.consistent.retryPeriodSeconds": "1"
        "fs.s3.consistent": "true"
        "fs.s3.consistent.retryCount": "10"
        "fs.s3.consistent.metadata.read.capacity": "500"
        "fs.s3.consistent.metadata.write.capacity": "100"
        "fs.s3.consistent.metadata.tableName": "EmrFSMetadata"

# And more fun after this to finish the cluster creation properties and any other items in the cfn template...
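For comparison, the EMR API's RunJobFlow action accepts the same emrfs-site classification, but each entry uses a key named Properties where CloudFormation uses ConfigurationProperties, and in both cases the values are strings. The sketch below is a hypothetical helper (emrfs_configurations is our own name, not part of any SDK) that builds the list in either shape from a single dict of settings, which makes the mapping between the two explicit.

```python
from typing import Dict, List


def emrfs_configurations(settings: Dict[str, object], for_cloudformation: bool = True) -> List[dict]:
    """Build an emrfs-site configuration list (hypothetical helper)."""
    # Configuration values are strings in both CloudFormation and the EMR API,
    # so stringify everything; Python booleans become "true"/"false".
    props = {
        key: str(value).lower() if isinstance(value, bool) else str(value)
        for key, value in settings.items()
    }
    # CloudFormation names the map ConfigurationProperties; the EMR API calls it Properties.
    props_key = "ConfigurationProperties" if for_cloudformation else "Properties"
    # Configurations is a list of classification entries in both shapes.
    return [{"Classification": "emrfs-site", props_key: props}]


consistent_view = {
    "fs.s3.consistent": True,
    "fs.s3.consistent.retryCount": 10,
    "fs.s3.consistent.retryPeriodSeconds": 1,
    "fs.s3.consistent.metadata.tableName": "EmrFSMetadata",
}

cfn_shape = emrfs_configurations(consistent_view)
api_shape = emrfs_configurations(consistent_view, for_cloudformation=False)
```

The api_shape list is what you would pass as the Configurations argument of boto3's run_job_flow if you were creating the cluster through the API instead of CloudFormation.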

Setting a custom value for Step Concurrency

This was another case where, based on the CloudFormation documentation for EMR Cluster, there did not appear to be a specific property for the setting. What's odd is that StepConcurrencyLevel is available as a parameter of the RunJobFlow action in the EMR API. Knowing that the API supports it gave us some hope, and after a little more searching we found that the level can also be set on an existing cluster with the API's ModifyCluster action. Well, that's something we can work with at least. CloudFormation Custom Resources to the rescue.
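Outside of CloudFormation, the ModifyCluster call itself is short in boto3. Below is a minimal sketch, assuming you already have a cluster ID; set_step_concurrency and FakeEmrClient are hypothetical names, and the client is passed in so the function can be exercised without AWS credentials. The 1 to 256 range check reflects the limit we understand the EMR API to document for StepConcurrencyLevel; treat it as an assumption and verify it against the current API reference.

```python
from typing import Any, List, Protocol


class EmrClient(Protocol):
    """The slice of boto3's EMR client this helper relies on."""

    def modify_cluster(self, **kwargs: Any) -> dict: ...


def set_step_concurrency(client: EmrClient, cluster_id: str, level: int) -> int:
    """Call ModifyCluster and return the level EMR reports back."""
    # Assumed bounds (1-256); check the current EMR API reference.
    if not 1 <= level <= 256:
        raise ValueError(f"StepConcurrencyLevel must be 1-256, got {level}")
    response = client.modify_cluster(ClusterId=cluster_id, StepConcurrencyLevel=level)
    # ModifyCluster responds with the step concurrency level now in effect.
    return response["StepConcurrencyLevel"]


class FakeEmrClient:
    """Offline stand-in for boto3.client("emr") that records its calls."""

    def __init__(self) -> None:
        self.calls: List[dict] = []

    def modify_cluster(self, **kwargs: Any) -> dict:
        self.calls.append(kwargs)
        return {"StepConcurrencyLevel": kwargs["StepConcurrencyLevel"]}


fake = FakeEmrClient()
applied = set_step_concurrency(fake, "j-EXAMPLE", 5)
```

Against a real cluster you would pass boto3.client("emr") as the client and a real cluster ID in place of the placeholder "j-EXAMPLE".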

The plan from there was to add a Custom Resource to the EMR cluster template that invokes a Lambda Function. The function uses boto3, the AWS SDK for Python, to pass the new cluster's ID to the ModifyCluster API along with the desired Step Concurrency Level. Because the code is small, it can be supplied inline via the 'ZipFile' property rather than from an S3 bucket, and supplying it inline is what makes the cfn-response module available for sending responses back to CloudFormation. Without a properly formatted response, the stack can hang for up to an hour waiting on the Custom Resource before it times out.
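To make "properly formatted" concrete: CloudFormation includes a pre-signed ResponseURL in the event it sends to the function, and the function must PUT a JSON document back to that URL. The sketch below only builds that document; the field names follow the custom resource response format, but build_cfn_response is a hypothetical helper that roughly approximates what cfn-response does, not its actual source, and the stack ARN, request ID and URL in the sample event are placeholders.

```python
import json
from typing import Optional


def build_cfn_response(event: dict, status: str, data: Optional[dict] = None,
                       physical_id: Optional[str] = None, reason: str = "") -> str:
    """Serialize the JSON document a custom resource PUTs to event["ResponseURL"]."""
    if status not in ("SUCCESS", "FAILED"):
        raise ValueError("status must be SUCCESS or FAILED")
    # Reuse the existing physical ID on Update/Delete unless a new one is given.
    pid = physical_id or event.get("PhysicalResourceId")
    if not pid:
        raise ValueError("a physical resource ID is required on Create")
    body = {
        "Status": status,
        "Reason": reason or "See the details in CloudWatch Logs",
        "PhysicalResourceId": pid,
        # StackId, RequestId and LogicalResourceId must echo the request.
        "StackId": event["StackId"],
        "RequestId": event["RequestId"],
        "LogicalResourceId": event["LogicalResourceId"],
        "NoEcho": False,
        "Data": data or {},
    }
    return json.dumps(body)


# Abbreviated Create event with placeholder values.
sample_event = {
    "RequestType": "Create",
    "ResponseURL": "https://example.invalid/presigned-url",
    "StackId": "arn:aws:cloudformation:us-east-1:123456789012:stack/emr-stack/example",
    "RequestId": "req-1",
    "LogicalResourceId": "customSetConcurrencyLevel",
    "ResourceProperties": {"ClusterId": "j-EXAMPLE", "StepConcurrencyLevel": "5"},
}
payload = json.loads(build_cfn_response(
    sample_event, "SUCCESS", {"StepConcurrencyLevel": 5}, physical_id="j-EXAMPLE"))
```

If no document like this ever reaches the ResponseURL, for example because the function crashes before sending it, CloudFormation keeps waiting until the custom resource times out.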

Here is the relevant YAML from the template that launches the Custom Resource, which passes the cluster ID and the desired Step Concurrency Level on to the Lambda. Because the Custom Resource references the cluster resource via ClusterId: !Ref cluster, CloudFormation sets the dependency for us, and the Lambda won't run until the cluster resource has finished creating.

# Other stuff is happening above this to define 
# the StepConcurrencyLevel property and create 
# the 'cluster' Resource...

  lambdaSetConcurrency:
    Type: AWS::Lambda::Function
    Properties:
      Description: Set the concurrency value in an EMR cluster
      Handler: index.lambda_handler
      Role: !GetAtt lambdaExecutionRole.Arn
      Runtime: python3.12
      Timeout: 30
      Code:
        ZipFile: |
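          import boto3
          import cfnresponse

          def lambda_handler(event, context):
              response_data = {}
              if event['RequestType'] == 'Delete':
                  # Nothing to undo on stack deletion; just acknowledge it.
                  print("Delete event received, nothing to do")
                  cfnresponse.send(event, context, cfnresponse.SUCCESS, response_data,
                                   event.get('PhysicalResourceId'))
                  return
              try:
                  client = boto3.client('emr')
                  cluster_id = event['ResourceProperties']['ClusterId']
                  # Custom resource properties arrive as strings, so convert before calling EMR.
                  concurrency_level = int(event['ResourceProperties']['StepConcurrencyLevel'])
                  print(f"Setting concurrency to {concurrency_level} for cluster {cluster_id}")
                  response = client.modify_cluster(
                      ClusterId=cluster_id,
                      StepConcurrencyLevel=concurrency_level
                  )
                  # Return only the applied level; the full API response is not needed.
                  response_data['StepConcurrencyLevel'] = response['StepConcurrencyLevel']
                  # Use the cluster ID as the physical ID so a replaced cluster reads as a new resource.
                  cfnresponse.send(event, context, cfnresponse.SUCCESS, response_data, cluster_id)
              except Exception as e:
                  # Always answer CloudFormation, otherwise the stack waits until the timeout.
                  print(str(e))
                  cfnresponse.send(event, context, cfnresponse.FAILED, response_data)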
  lambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: "/"
      Policies:
        - PolicyName: AllowModifyEmrClusterAndCloudWatchLogging
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: arn:aws:logs:*:*:*
              - Effect: Allow
                Action:
                  - elasticmapreduce:ModifyCluster
                Resource: !Sub "arn:aws:elasticmapreduce:${AWS::Region}:${AWS::AccountId}:cluster/${cluster}"
  customSetConcurrencyLevel:
    Type: Custom::SetConcurrencyLevel
    Properties:
      ServiceToken: !GetAtt lambdaSetConcurrency.Arn
      ClusterId: !Ref cluster
      StepConcurrencyLevel: !Ref StepConcurrencyLevel

# And more below for Outputs and possibly other Resources...
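Because the deployed handler imports cfnresponse, which is only provided inside Lambda when the code is supplied through ZipFile, the function is awkward to test locally. One option, sketched here as an assumption rather than what we deployed, is to pull the decision logic into a pure function that takes the event and an EMR client and returns what should be sent back; decide_response and StubEmr are hypothetical names, and the events are abbreviated placeholders.

```python
from typing import Any, Dict, List, Optional, Tuple


def decide_response(event: Dict[str, Any], emr_client: Any) -> Tuple[str, Dict[str, Any], Optional[str]]:
    """Return (status, data, physical_id) for a custom resource event."""
    if event["RequestType"] == "Delete":
        # Nothing to undo; echo the existing physical ID back.
        return "SUCCESS", {}, event.get("PhysicalResourceId")
    try:
        props = event["ResourceProperties"]
        cluster_id = props["ClusterId"]
        # Custom resource properties arrive as strings, hence int().
        level = int(props["StepConcurrencyLevel"])
        response = emr_client.modify_cluster(ClusterId=cluster_id, StepConcurrencyLevel=level)
        return "SUCCESS", {"StepConcurrencyLevel": response["StepConcurrencyLevel"]}, cluster_id
    except Exception as exc:  # report any failure to CloudFormation instead of hanging
        print(exc)
        return "FAILED", {}, event.get("PhysicalResourceId")


class StubEmr:
    """Offline stand-in for boto3.client("emr") that records its calls."""

    def __init__(self) -> None:
        self.calls: List[dict] = []

    def modify_cluster(self, **kwargs: Any) -> dict:
        self.calls.append(kwargs)
        return {"StepConcurrencyLevel": kwargs["StepConcurrencyLevel"]}


stub = StubEmr()
create = {"RequestType": "Create",
          "ResourceProperties": {"ClusterId": "j-EXAMPLE", "StepConcurrencyLevel": "3"}}
delete = {"RequestType": "Delete", "PhysicalResourceId": "j-EXAMPLE", "ResourceProperties": {}}
bad = {"RequestType": "Create",
       "ResourceProperties": {"ClusterId": "j-EXAMPLE", "StepConcurrencyLevel": "three"}}
```

With this split, the Lambda handler becomes a thin wrapper that calls decide_response(event, boto3.client('emr')) and forwards the result to cfnresponse.send(event, context, status, data, physical_id).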

Hopefully this saves someone out there a little time.