Monitoring Flink on AWS EMR
How to use Datadog (EU) to monitor your Flink cluster running on AWS EMR in a private VPC.
Brief intro
This is going to be a somewhat unusual post on this blog. It is about a problem I recently encountered while trying to improve the monitoring of a long-running Flink cluster we have on AWS EMR, following the official documentation from Datadog.
The EMR setup
Our EMR cluster consumes 4 Kinesis Data Streams, which are used to send S3 files in AVRO format for processing. When a new file arrives, the Flink job fetches it from S3, does some validation and filtering, then converts it to ORC format and saves it to a new location on S3. In early June we experienced a failure in one of the Flink jobs consuming a production stream. Sadly, we did not have adequate monitoring set up to detect this in time; we only learnt about it when we noticed that data was missing from the output bucket for certain dates. Our streams were configured with the maximum retention period of 7 days, and by the time we noticed, unprocessed data had already been piling up in the stream, with the oldest records close to half of this retention period. By the time we managed to find the root cause and deploy the fix to the Flink job, it was too late: some data had already expired from the stream.
The existing monitoring solution was implemented via AWS Lambda functions running every 8 hours, which made Athena queries to check whether any data had arrived in the S3 bucket during the last 48 hours. The problem with this approach was that, because the query used a sliding window of 2 days, we would not get alerted about missing data for up to 2 days.
The Flink cluster runs in a private VPC, so reaching the Flink Web UI to check the status of the jobs was quite difficult, to say the least. We either had to set up an SSH port forwarding session and use a FoxyProxy setup in Firefox, or set up a personal VM in the same private VPC via the AWS WorkSpaces managed service and then connect from that VM’s browser to the cluster’s Flink UI. Either way, it was quite cumbersome and still a manual process to connect to the Flink UI to check the cluster health. I wanted an automated way of gathering metrics and alerting if something went wrong, so I looked into how Flink could be monitored by Datadog.
Datadog ❤️ Flink
A quick Google search turned up the official documentation from Datadog, where I found really straightforward instructions for enabling the submission of Flink metrics to Datadog, which can then be instantly visualized in their default Flink dashboard. The main steps are:
- adding some new parameters to the flink-conf.yaml, such as the Datadog API/APP keys and custom tags
- copying the flink-metrics-datadog JAR to the active Flink installation path
The first step was quite easy. Our cluster is defined in CloudFormation using AWS::EMR::Cluster, which allows specifying the flink-conf.yaml content as below:
Cluster:
  Type: AWS::EMR::Cluster
  Properties:
    Name: Flink-Cluster
    Configurations:
      - Classification: flink-conf
        ConfigurationProperties:
          metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
          metrics.reporter.dghttp.apikey: '{{resolve:secretsmanager:datadog/api_key:SecretString}}'
          metrics.reporter.dghttp.tags: name:flink-cluster, app:flink-cluster, region:eu-central-1, env:prod
    [...]
The above CloudFormation snippet shows just the 3 most important lines of the flink-conf.yaml: (1) the fully qualified name of the Java class which implements the metric submission, (2) the Datadog API key loaded from AWS Secrets Manager and (3) a few custom tags which will be added to the metrics sent to Datadog.
To copy the necessary flink-metrics-datadog JAR to the path it is loaded from (/usr/lib/flink/lib), I added a new AWS::EMR::Step in CloudFormation, executed only on the EMR master node, in order to activate Datadog monitoring on the cluster via the Java class and API key supplied in the flink-conf.yaml.
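The step itself is not reproduced here, but a minimal sketch of such an AWS::EMR::Step could look like the following. The resource name, step name and bucket path are hypothetical, and copying into /usr/lib/flink/lib may require elevated permissions depending on the EMR release:

```yaml
CopyDatadogMetricsJar:
  Type: AWS::EMR::Step
  Properties:
    Name: copy-flink-metrics-datadog-jar
    JobFlowId: !Ref Cluster
    ActionOnFailure: CONTINUE
    HadoopJarStep:
      # command-runner.jar executes the given command on the master node
      Jar: command-runner.jar
      Args:
        - bash
        - -c
        - sudo aws s3 cp s3://my-team-artifacts/flink-metrics-datadog.jar /usr/lib/flink/lib/
```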
To test that it was working properly I just needed to redeploy the cluster, which was surprisingly easy thanks to the CloudFormation setup we had in place. But something was still not right.
Know your continent
After redeploying the cluster I waited and waited and waited a bit more but metrics were not showing up in the Flink dashboard. So I got in touch with Datadog support who were very helpful in figuring out what the issue was. After a few rounds of emails back and forth we quickly discovered why the metrics were not showing up.
The reason was that our Datadog account is set up in the EU region, not in the USA. Thus, all our metrics were supposed to flow to the EU endpoint at app.datadoghq.eu/api/ instead of the USA endpoint at app.datadoghq.com/api/. The difference is quite subtle: only the TLD changes from .com to .eu. The catch was that our EMR cluster was running Flink 1.9.1 (provided by EMR release 5.29.0), which had this API endpoint hardcoded, pointing to the USA data centre. The Datadog support engineer uncovered some extra instructions on how this can be solved by adding one more line to the flink-conf.yaml to switch the default US region to the EU instead:
Cluster:
  Type: AWS::EMR::Cluster
  Properties:
    Name: Flink-Cluster
    [...]
    Configurations:
      - Classification: flink-conf
        ConfigurationProperties:
          [...]
          metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
          metrics.reporter.dghttp.apikey: '{{resolve:secretsmanager:datadog/api_key:SecretString}}'
          metrics.reporter.dghttp.tags: name:flink-cluster, app:flink-cluster, region:eu-central-1, env:prod
          metrics.reporter.dghttp.dataCenter: EU # << points the metrics reporter to the EU region
    [...]
The problem was that this option only became available in Flink v1.11.0, while the highest version offered through the latest EMR release was v1.10.0, so this was not going to work for me. I almost gave up on the idea of monitoring Flink via Datadog when I had the idea to clone the official Flink repository from GitHub and tweak the code of v1.9.1, which we were running, to change the hardcoded API endpoint from .com to .eu. It was much easier than I expected; I just needed to slightly tweak this class: ./src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java:
/**
 * Http client talking to Datadog.
 */
public class DatadogHttpClient {
    /* Changed endpoint for metric submission to use .eu instead of .com */
    private static final String SERIES_URL_FORMAT = "https://app.datadoghq.eu/api/v1/series?api_key=%s";

    /* Changed endpoint for API key validation to use .eu instead of .com */
    private static final String VALIDATE_URL_FORMAT = "https://app.datadoghq.eu/api/v1/validate?api_key=%s";

    ...
}
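Before rebuilding the JAR, a tiny standalone check along these lines (not part of the Flink codebase, purely an illustration with copies of the patched format strings) confirms that the resulting URLs point at the EU domain:

```java
public class EndpointCheck {
    // Copies of the patched format strings from DatadogHttpClient
    private static final String SERIES_URL_FORMAT =
            "https://app.datadoghq.eu/api/v1/series?api_key=%s";
    private static final String VALIDATE_URL_FORMAT =
            "https://app.datadoghq.eu/api/v1/validate?api_key=%s";

    public static void main(String[] args) {
        String series = String.format(SERIES_URL_FORMAT, "dummy-key");
        String validate = String.format(VALIDATE_URL_FORMAT, "dummy-key");
        // Both endpoints must now use the .eu TLD, and no trace of .com remains
        System.out.println(series.contains("app.datadoghq.eu"));   // true
        System.out.println(validate.contains("app.datadoghq.eu")); // true
        System.out.println(series.contains(".com"));               // false
    }
}
```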
Once I made the above code changes, I built a new JAR via mvn clean package. The new JAR was produced at ./flink-metrics/flink-metrics-datadog/target/flink-metrics-datadog-1.9.1.jar, which I then uploaded to the S3 bucket where my team stores such files. Next, I slightly tweaked the AWS EMR step to load this JAR from S3 and redeployed the cluster once more. Finally, metrics started flowing! And it looked so nice; I was especially happy to see the TaskManager heap distribution, because the issue which sparked this whole endeavor was showing symptoms of heap memory problems.

Unfortunately, this default dashboard was not perfect, as some of its graphs failed to show any data. Maybe it was because we were using Flink v1.9.1 instead of v1.11.0; I am not sure. In any case, I ended up cloning the dashboard and fixing the graphs manually, while also adding a few extras to show data about the AWS Kinesis streams feeding into the Flink cluster.

Now it shows very nicely the age of each Flink job, which was not visible at all on the default dashboard. The end result is much better, in my opinion.
Conclusion
All in all, I am quite happy with how this whole story turned out in the end. Despite the issue with the API endpoints hardcoded to the USA region in v1.9.1 of Flink, I managed to implement a simple workaround thanks to the open source nature of the project. The result is that we have much better visibility and monitoring for our Flink cluster, which makes our lives in the DevOps world much better. I did not write much about it in this post, but once these metrics became available in our Datadog account it was trivial to set up a few Monitors which would alert us if, for example, one of the 4 Flink jobs failed. I will leave it up to the reader to imagine how that’s done.
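To give just one hedged sketch of what such a Monitor query might look like: assuming the scope formats recommended in Datadog's Flink integration docs (e.g. metrics.scope.jm: flink.jobmanager), under which the JobManager reports a flink.jobmanager.numRunningJobs metric, a metric monitor could alert whenever fewer than all 4 jobs are running:

```
avg(last_5m):sum:flink.jobmanager.numRunningJobs{env:prod} < 4
```

The exact metric name and threshold depend on your scope configuration and job count, so treat this as a starting point rather than a drop-in query.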