Amazon EMR CloudFormation for Consistent View and Step Concurrency
When deploying an Amazon EMR cluster via CloudFormation, there were two items we wanted to configure that the documentation didn’t make clear how to handle: setting a value for Step Concurrency and enabling Consistent View. This article explains how we made both settings work within a CloudFormation template.
Enabling Consistent View
Searching through the available properties in the CloudFormation Documentation for EMR Cluster, it was unclear how to enable Consistent View, as there isn’t a property specifically for the setting. After staring at the Configure Consistent View instructions and going back and forth to the CloudFormation document, the Configurations section jumped out. Drilling into that section, it appeared to match up with how Consistent View is configured, so adding the following section to the Properties of the cluster template seemed like a logical thing to try. Thankfully, it worked. Here’s that block in YAML with a few properties set to their default values. You could add other properties that are available from the Configure Consistent View document as well. The important line is "fs.s3.consistent": "true".
# Other stuff is happening above this to create the cluster...
      Configurations:
        - Classification: emrfs-site
          ConfigurationProperties:
            "fs.s3.consistent.retryPeriodSeconds": "1"
            "fs.s3.consistent": "true"
            "fs.s3.consistent.retryCount": "10"
            "fs.s3.consistent.metadata.read.capacity": "500"
            "fs.s3.consistent.metadata.write.capacity": "100"
            "fs.s3.consistent.metadata.tableName": "EmrFSMetadata"
# And more fun after this to finish the cluster creation properties and any other items in the cfn template...
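Once the stack is up, it can be reassuring to confirm that the setting actually landed on the cluster. The following is a minimal Python sketch, not part of the original template: the function name `consistent_view_enabled` and the sample response are made up for illustration, and the sample is hand-written rather than real API output. It assumes the response shape of the EMR DescribeCluster action, where the key is named `Properties` (CloudFormation names the same thing `ConfigurationProperties`).

```python
from typing import Any, Dict


def consistent_view_enabled(describe_cluster_response: Dict[str, Any]) -> bool:
    """Return True if the emrfs-site classification sets fs.s3.consistent to "true"."""
    cluster = describe_cluster_response.get("Cluster", {})
    for config in cluster.get("Configurations", []):
        if config.get("Classification") != "emrfs-site":
            continue
        # The EMR API calls this key "Properties"; the CloudFormation
        # template above calls it "ConfigurationProperties".
        props = config.get("Properties", {})
        if props.get("fs.s3.consistent", "").lower() == "true":
            return True
    return False


# Hand-written sample shaped like a DescribeCluster response (hypothetical values).
sample_response = {
    "Cluster": {
        "Id": "j-EXAMPLE",
        "Configurations": [
            {
                "Classification": "emrfs-site",
                "Properties": {
                    "fs.s3.consistent": "true",
                    "fs.s3.consistent.retryCount": "10",
                },
            }
        ],
    }
}

print(consistent_view_enabled(sample_response))
```

Against a real cluster, the response would come from `boto3.client("emr").describe_cluster(ClusterId=...)` instead of the sample dictionary.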
Setting a custom value for Step Concurrency
This was another case where there did not appear to be a specific property for the setting in the CloudFormation Documentation for EMR Cluster. What’s odd is that it is available as a parameter of the RunJobFlow action in the EMR API. Knowing that the API can handle it gave us some hope, and after a little more searching we found that the level can be set on an existing cluster with the ModifyCluster action of the API. Well, that’s something we can work with at least. CloudFormation Custom Resources to the rescue.
The plan from there is to use a Custom Resource to launch a Lambda Function within the EMR Cluster creation template. The Lambda Function will use boto3, the Python AWS SDK, to pass the created cluster ID into the ModifyCluster API, setting the desired Step Concurrency Level. Since this is a small amount of code, it can be passed in via ‘ZipFile’ rather than needing an S3 bucket, which also lets us use the cfn-response Module to send responses back to CloudFormation stacks. Without properly formatted responses, the stack can hang for an hour waiting on information from the Custom Resource.
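To make "properly formatted responses" a little more concrete, here is a rough Python sketch of the JSON body a Custom Resource is expected to send back. The cfn-response module builds something along these lines and PUTs it to the pre-signed URL in the event's `ResponseURL` field. The helper name `build_cfn_response`, the default reason text, and the sample event values are all illustrative assumptions, and the sketch deliberately skips the HTTP call so it can run anywhere.

```python
import json
from typing import Any, Dict, Optional


def build_cfn_response(
    event: Dict[str, Any],
    status: str,
    data: Optional[Dict[str, Any]] = None,
    physical_resource_id: str = "CustomResourcePhysicalID",
    reason: str = "See the Lambda function's CloudWatch logs",
) -> Dict[str, Any]:
    """Build the response body CloudFormation waits for from a Custom Resource."""
    if status not in ("SUCCESS", "FAILED"):
        raise ValueError("status must be SUCCESS or FAILED")
    return {
        "Status": status,
        "Reason": reason,
        "PhysicalResourceId": physical_resource_id,
        # These three values must be echoed back from the incoming event.
        "StackId": event["StackId"],
        "RequestId": event["RequestId"],
        "LogicalResourceId": event["LogicalResourceId"],
        "Data": data or {},
    }


# Hypothetical event, trimmed to the fields used here.
sample_event = {
    "RequestType": "Create",
    "StackId": "arn:aws:cloudformation:us-east-1:123456789012:stack/example/abc",
    "RequestId": "example-request-id",
    "LogicalResourceId": "customSetConcurrencyLevel",
}

body = build_cfn_response(sample_event, "SUCCESS", {"StepConcurrencyLevel": 5})
print(json.dumps(body, indent=2))
```

If the function exits without sending a body like this, CloudFormation has nothing to act on, which is where the long wait comes from.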
Here is the relevant YAML from the template that launches the Custom Resource and passes the cluster ID and the desired Step Concurrency Level on to the Lambda. Because the Custom Resource references the cluster resource via ClusterId: !Ref cluster, the dependency is set for us and the Lambda won’t run until the cluster resource has finished creating.
# Other stuff is happening above this to define
# the StepConcurrencyLevel parameter and create
# the 'cluster' Resource...
  lambdaSetConcurrency:
    Type: AWS::Lambda::Function
    Properties:
      Description: Set the concurrency value in an EMR cluster
      Handler: index.lambda_handler
      Role: !GetAtt lambdaExecutionRole.Arn
      Runtime: python3.7
      Code:
        ZipFile: |
          import boto3
          import cfnresponse
          def lambda_handler(event, context):
              responseData = {}
              if event['RequestType'] == 'Delete':
                  print("Delete event received, nothing to do")
                  cfnresponse.send(event, context, cfnresponse.SUCCESS, responseData)
                  return
              try:
                  client = boto3.client('emr')
                  cluster_id = event['ResourceProperties']['ClusterId']
                  concurrency_level = event['ResourceProperties']['StepConcurrencyLevel']
                  print(f"Setting concurrency to {concurrency_level} for cluster {cluster_id}")
                  response = client.modify_cluster(
                      ClusterId=cluster_id,
                      StepConcurrencyLevel=int(concurrency_level)
                  )
                  responseData['Data'] = response
                  cfnresponse.send(event, context, cfnresponse.SUCCESS, responseData, "CustomResourcePhysicalID")
                  return
              except Exception as e:
                  print(str(e))
                  cfnresponse.send(event, context, cfnresponse.FAILED, responseData)
                  return
  lambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: "/"
      Policies:
        - PolicyName: AllowModifyEmrClusterAndCloudWatchLogging
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: arn:aws:logs:*:*:*
              - Effect: Allow
                Action:
                  - elasticmapreduce:ModifyCluster
                Resource: !Sub "arn:aws:elasticmapreduce:${AWS::Region}:${AWS::AccountId}:cluster/${cluster}"
  customSetConcurrencyLevel:
    Type: Custom::SetConcurrencyLevel
    Properties:
      ServiceToken: !GetAtt lambdaSetConcurrency.Arn
      ClusterId: !Ref cluster
      StepConcurrencyLevel: !Ref StepConcurrencyLevel
# And more code below this for Outputs and possibly other Resources...
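One way to try the handler's core logic without deploying anything is to pull the ModifyCluster call into a small function that accepts the client as an argument. This is only a sketch: `set_step_concurrency` and `StubEmrClient` are hypothetical names that do not appear in the template, and the stub merely imitates the shape of a `modify_cluster` response. It also shows why the handler wraps the level in `int()`: Custom Resource property values appear to arrive as strings.

```python
from typing import Any, Dict, List


def set_step_concurrency(emr_client: Any, properties: Dict[str, Any]) -> int:
    """Apply the requested step concurrency level and return what EMR reports back."""
    cluster_id = properties["ClusterId"]
    # Convert explicitly, since the value is expected to arrive as a string.
    level = int(properties["StepConcurrencyLevel"])
    response = emr_client.modify_cluster(
        ClusterId=cluster_id,
        StepConcurrencyLevel=level,
    )
    return response["StepConcurrencyLevel"]


class StubEmrClient:
    """Stand-in for boto3.client('emr') that records calls instead of making them."""

    def __init__(self) -> None:
        self.calls: List[Dict[str, Any]] = []

    def modify_cluster(self, **kwargs: Any) -> Dict[str, Any]:
        self.calls.append(kwargs)
        # Imitates the real response, which echoes the applied level.
        return {"StepConcurrencyLevel": kwargs["StepConcurrencyLevel"]}


stub = StubEmrClient()
applied = set_step_concurrency(
    stub, {"ClusterId": "j-EXAMPLE", "StepConcurrencyLevel": "5"}
)
print(applied, stub.calls)
```

In the Lambda itself, the same function could be handed `boto3.client('emr')` and `event['ResourceProperties']`.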
Hopefully this saves someone out there a little time.