Automating AWS Infrastructure Change Notifications to Slack with AWS Lambda

In today’s dynamic cloud environments, staying informed about changes to your AWS infrastructure is critical. I recently enhanced a Lambda function that listens to AWS CloudTrail events (forwarded via EventBridge) and posts notifications to a Slack channel. Below, I’m excited to share the complete Python implementation along with an explanation of how it works.

Overview

The goal of this Lambda function is to notify our team about significant AWS infrastructure changes in near real-time. When CloudTrail logs an event (e.g., launching or terminating instances, modifying databases, etc.), the Lambda function extracts key details from the event, constructs a human-friendly message, and sends it to a pre-configured Slack channel via an incoming webhook.
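
To make that flow concrete, here is a minimal sketch of the kind of event the function receives and the fields it reads. The sample event is a trimmed-down, hypothetical example (every ID, name, and timestamp is made up), and summarize is an illustrative helper rather than part of the original code.

```python
# Hypothetical, trimmed-down EventBridge event wrapping a CloudTrail record.
# All values below are invented for illustration.
sample_event = {
    "detail-type": "AWS API Call via CloudTrail",
    "source": "aws.ec2",
    "detail": {
        "eventSource": "ec2.amazonaws.com",
        "eventName": "RunInstances",
        "awsRegion": "us-east-1",
        "eventTime": "2024-01-01T12:00:00Z",
        "userIdentity": {"principalId": "AROAEXAMPLEID:jane@xzy.com"},
        "responseElements": {
            "instancesSet": {"items": [{"instanceId": "i-0123456789abcdef0"}]}
        },
    },
}


def summarize(event):
    """Pull out the fields the notification needs, with safe defaults."""
    detail = event.get("detail") or {}
    identity = detail.get("userIdentity") or {}
    user = identity.get("userName") or identity.get("principalId", "UnknownUser")
    return {
        "event": detail.get("eventName", "UnknownEvent"),
        "source": detail.get("eventSource", "Unknown"),
        "region": detail.get("awsRegion", "Unknown"),
        "time": detail.get("eventTime", "Unknown"),
        # Assumed-role principals look like "<id>:<session name>"; keep the tail.
        "user": user.split(":")[-1],
    }


summary = summarize(sample_event)
print(summary)
```

Running a sample like this locally is a quick way to check field names before wiring the function to a real rule.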

The key improvements in this implementation include:

  • Robust Logging: Utilizing Python’s built-in logging for better traceability.

  • Environment Variable Usage: Reading the Slack webhook URL from the environment to enhance security and flexibility.

  • Modular Code: Refactoring the logic into a helper function to extract resource details based on the event type.

  • Error Handling: Providing clear error logs and structured responses for easier debugging.
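
As a small illustration of the environment-variable point, the sketch below reads the webhook URL and treats anything that is not an absolute https URL as unset. read_webhook_url is a hypothetical helper that is not in the original code, and the https check is an assumption about what a sensible validation might look like.

```python
import os
from urllib.parse import urlparse


def read_webhook_url(env=os.environ):
    """Return the Slack webhook URL from env, or "" if missing or unusable.

    Hypothetical helper for illustration; the original code reads the
    variable directly with os.environ.get.
    """
    url = env.get("SLACK_WEBHOOK_URL", "").strip()
    parsed = urlparse(url)
    # Accept only absolute https URLs; anything else is treated as unset.
    if parsed.scheme != "https" or not parsed.netloc:
        return ""
    return url


# Placeholder values, not real webhook URLs.
print(read_webhook_url({"SLACK_WEBHOOK_URL": "https://example.invalid/services/T000/B000/XXXX"}))
print(repr(read_webhook_url({"SLACK_WEBHOOK_URL": "not-a-url"})))
```

Passing the environment as a parameter keeps the helper easy to test without touching the real process environment.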

The Code

To get the latest version of the code, download it from GitHub: https://github.com/281992umeshsah/Automating-AWS-Infrastructure-Change-Notifications-to-Slack-with-AWS-Lambda

Below is the full Lambda function code:

import json
import logging
import os
import urllib.request

# Set up logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Read the Slack webhook URL from environment variables
SLACK_WEBHOOK_URL = os.environ.get("SLACK_WEBHOOK_URL", "")

def extract_resource_details(detail, event_name):
    """
    Extract resource details from the CloudTrail event detail based on event_name.
    Returns a tuple of (resourceKey, resourceValue). If not found, returns ("", "").
    """
    resourceKey = ""
    resourceValue = ""

    # CloudTrail can send these fields as null, so fall back to an empty dict
    # even when the key is present.
    response_elements = detail.get("responseElements") or {}
    request_parameters = detail.get("requestParameters") or {}

    if event_name in ["TerminateInstances", "RunInstances"]:
        instances_set = response_elements.get("instancesSet", {})
        items = instances_set.get("items", [])
        if items:
            # Only the first instance in the call is reported.
            resourceKey = "Instance_ID"
            resourceValue = items[0].get("instanceId", "")
    elif event_name in ["CreateDBInstance", "DeleteDBInstance"]:
        resourceKey = "DB_Instance_ID"
        resourceValue = response_elements.get("dBInstanceIdentifier", "")
    elif event_name in ["CreateLoadBalancer", "DeleteLoadBalancer"]:
        load_balancers = response_elements.get("loadBalancers", [])
        if load_balancers:
            resourceKey = "LoadBalancer_ID"
            resourceValue = load_balancers[0].get("loadBalancerName", "")
    elif event_name in ["CreateUser", "DeleteUser"]:
        user_info = response_elements.get("user", {})
        resourceKey = "User_ID"
        resourceValue = user_info.get("userName", "")
    elif event_name in ["CreateGroup", "DeleteGroup"]:
        group_info = response_elements.get("group", {})
        resourceKey = "Group"
        resourceValue = group_info.get("groupName", "")
    elif event_name in ["CreateRole", "DeleteRole"]:
        role_info = response_elements.get("role", {})
        resourceKey = "Role"
        resourceValue = role_info.get("roleName", "")
    elif event_name in ["CreatePolicy", "DeletePolicy"]:
        policy_info = response_elements.get("policy", {})
        resourceKey = "Policy"
        resourceValue = policy_info.get("policyName", "")
    elif event_name in ["CreateCluster", "DeleteCluster"]:
        cluster_info = response_elements.get("cluster", {})
        resourceKey = "Cluster"
        resourceValue = cluster_info.get("clusterName", "")
    elif event_name in ["CreateRestApi", "DeleteRestApi"]:
        resourceKey = "RestApi"
        resourceValue = response_elements.get("id", "")
    elif event_name in ["CreatePipeline", "DeletePipeline"]:
        pipeline_info = response_elements.get("pipeline", {})
        resourceKey = "Pipeline"
        resourceValue = pipeline_info.get("pipelineName", "")
    elif event_name in ["CreateProject", "DeleteProject", "UpdateProject"]:
        project_info = response_elements.get("project", {})
        resourceKey = "Project"
        resourceValue = project_info.get("projectName", "")
    elif event_name in ["CreateApplication", "DeleteApplication"]:
        application_info = response_elements.get("application", {})
        resourceKey = "Application"
        resourceValue = application_info.get("applicationName", "")
    elif event_name in ["CreateHostedZone", "DeleteHostedZone"]:
        hosted_zone_info = response_elements.get("hostedZone", {})
        resourceKey = "HostedZone"
        resourceValue = hosted_zone_info.get("id", "")
    elif event_name == "CreateSecret":
        resourceKey = "Secret_ID"
        resourceValue = request_parameters.get("name", "")
    elif event_name == "DeleteSecret":
        resourceKey = "Secret_ID"
        resourceValue = response_elements.get("name", "")
    elif event_name in ["CreateRepository", "DeleteRepository"]:
        repository_info = response_elements.get("repository", {})
        resourceKey = "Repository Name"
        resourceValue = repository_info.get("repositoryName", "")
    elif event_name in ["CreateAutoScalingGroup", "DeleteAutoScalingGroup"]:
        resourceKey = "AutoScalingGroup"
        resourceValue = response_elements.get("autoScalingGroupName", "")

    return resourceKey, resourceValue

def lambda_handler(event, context):
    """
    Lambda function triggered by AWS CloudTrail (via EventBridge) to send
    notifications to a Slack channel when an AWS infrastructure change occurs.
    """
    logger.info("Received event: %s", json.dumps(event))

    try:
        detail = event.get("detail", {})
        event_name = detail.get("eventName", "UnknownEvent")
        user_identity = detail.get("userIdentity", {})
        aws_region = detail.get("awsRegion", "Unknown")
        event_time = detail.get("eventTime", "Unknown")
        event_source = detail.get("eventSource", "Unknown")

        # Extract user details
        user_name = user_identity.get("userName") or user_identity.get("principalId", "UnknownUser")
        if ":" in user_name:
            user_name = user_name.split(":")[-1]

        # Filter notifications based on user_name (only notify if it contains '@xzy.com')
        if "@xzy.com" not in user_name:
            logger.info("User %s does not match notification criteria. Skipping.", user_name)
            return {"statusCode": 200, "body": "User not in notify list."}

        # Extract resource details using the helper function
        resourceKey, resourceValue = extract_resource_details(detail, event_name)

        # Build the optional resource line separately: a backslash inside an
        # f-string expression is a syntax error before Python 3.12.
        resource_line = (
            f"🛠 {resourceKey}: {resourceValue}\n"
            if resourceKey and resourceValue
            else ""
        )

        # Construct the Slack message payload
        slack_message = {
            "text": (
                "*AWS Infrastructure Change Detected! 🚨*\n"
                f"👤 User: {user_name}\n"
                f"🛠 Event Source: {event_source}\n"
                f"🛠 Event: {event_name}\n"
                f"{resource_line}"
                f"🌍 Region: {aws_region}\n"
                f"🕒 Time: {event_time}\n"
            )
        }

        logger.info("Sending Slack alert: %s", slack_message)

        # Send alert to Slack
        send_slack_notification(slack_message)

        return {
            "statusCode": 200,
            "body": json.dumps({
                "message": "Notification sent",
                "slack_message": slack_message
            })
        }

    except Exception as e:
        logger.exception("Error processing event: %s", str(e))
        return {"statusCode": 500, "body": f"Error processing event: {str(e)}"}

def send_slack_notification(message):
    """
    Sends the formatted message to Slack via the incoming webhook.
    """
    if not SLACK_WEBHOOK_URL:
        logger.error("SLACK_WEBHOOK_URL not set in environment variables.")
        return

    req = urllib.request.Request(
        SLACK_WEBHOOK_URL,
        data=json.dumps(message).encode("utf-8"),
        headers={"Content-Type": "application/json"}
    )

    try:
        with urllib.request.urlopen(req) as response:
            logger.info("Slack Notification Sent! Response Code: %s", response.status)
    except Exception as e:
        logger.error("Error sending Slack notification: %s", str(e))
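
The user filter inside lambda_handler is easy to get wrong, so it can help to isolate it for testing. The sketch below mirrors the handler's logic in two small functions; normalize_user and should_notify are hypothetical names that do not appear in the original code, and the identities shown are made up.

```python
def normalize_user(user_identity):
    """Mirror the handler: prefer userName, fall back to principalId,
    and keep only the part after the last colon."""
    name = user_identity.get("userName") or user_identity.get("principalId", "UnknownUser")
    return name.split(":")[-1]


def should_notify(user_name, domain="@xzy.com"):
    """Return True when the user name contains the given domain,
    which is the same substring check the handler uses."""
    return domain in user_name


# Invented identities covering an assumed-role session and an IAM user.
examples = [
    {"principalId": "AROAEXAMPLEID:jane@xzy.com"},
    {"userName": "ci-deployer"},
    {},
]
for identity in examples:
    user = normalize_user(identity)
    print(user, should_notify(user))
```

Because the check is a substring match, a name such as jane@xzy.com.example would also pass. A stricter variant could use str.endswith, at the cost of diverging from the original behavior.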

How It Works

  1. Logging and Environment Configuration: The function starts by setting up logging, which is essential for monitoring in production. It then reads the Slack webhook URL from the Lambda environment variables. This avoids hardcoding sensitive information directly in the code.

  2. Extracting Resource Details: The extract_resource_details helper function processes the CloudTrail event's details. Based on the type of event (e.g., launching an EC2 instance, creating a DB instance, etc.), it returns a label for the resource and its identifier. This modular approach makes it easy to add support for new event types in the future.

  3. Event Handling and Notification Filtering: The lambda_handler function is the entry point for the Lambda. It extracts common fields such as the event name, region, time, and source. It also pulls the user identity information and only sends notifications for users whose name contains a specific domain (in this case, @xzy.com).

  4. Constructing and Sending the Slack Message: After gathering all the relevant details, the handler constructs a Slack-friendly message payload. The send_slack_notification function then delivers this payload to Slack via an HTTP POST request to the incoming webhook.

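  5. Error Handling: Exceptions raised while processing an event are logged with a stack trace, and the handler returns a 500 response. Failures while posting to Slack are caught and logged inside send_slack_notification, so in that case the handler still returns 200 and the error is visible only in the logs. Both paths help in quickly diagnosing issues if the Lambda function fails to process an event.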
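
As an illustration of the extensibility point in step 2, the if/elif chain could also be expressed as a lookup table, so that supporting a new event type means adding one row. This is a sketch of an alternative design, not the code from the repository: RESOURCE_RULES and lookup_resource are hypothetical names, and only three of the original event types are included.

```python
# Each rule: (event names, label, section of the event, key path inside it).
# Only a few of the event types from the if/elif chain are shown here.
RESOURCE_RULES = [
    (("CreateDBInstance", "DeleteDBInstance"), "DB_Instance_ID",
     "responseElements", ("dBInstanceIdentifier",)),
    (("CreateUser", "DeleteUser"), "User_ID",
     "responseElements", ("user", "userName")),
    (("CreateSecret",), "Secret_ID",
     "requestParameters", ("name",)),
]


def lookup_resource(detail, event_name):
    """Return (label, value) for a known event, or ("", "") otherwise."""
    for names, label, section, path in RESOURCE_RULES:
        if event_name not in names:
            continue
        # The section may be missing or null in the event.
        node = detail.get(section) or {}
        for key in path:
            node = node.get(key, "") if isinstance(node, dict) else ""
        return (label, node) if node else ("", "")
    return "", ""


print(lookup_resource({"responseElements": {"user": {"userName": "alice"}}}, "CreateUser"))
print(lookup_resource({"responseElements": None}, "DeleteUser"))
```

One difference from the original helper: when the value is missing, this version returns an empty label as well. The handler only prints the resource line when both are non-empty, so the resulting Slack message would be the same.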

The implementation steps are described in the README.md file: https://github.com/281992umeshsah/Automating-AWS-Infrastructure-Change-Notifications-to-Slack-with-AWS-Lambda/blob/main/README.md
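
The README remains the reference for the actual setup. Purely as a rough illustration of the EventBridge side, the sketch below builds an event pattern of the general shape used for CloudTrail API calls and checks a sample event against it locally. build_pattern and matches are hypothetical helpers, the chosen service and event names are assumptions, and the matcher implements only exact-value matching, which is a small subset of what EventBridge supports.

```python
import json


def build_pattern(source, event_source, event_names):
    """Build an EventBridge-style pattern for CloudTrail API call events."""
    return {
        "source": [source],
        "detail-type": ["AWS API Call via CloudTrail"],
        "detail": {
            "eventSource": [event_source],
            "eventName": sorted(event_names),
        },
    }


def matches(pattern, event):
    """Simplified local check: each pattern key must exist in the event and
    the event value must equal one of the listed values. Real EventBridge
    matching has many more operators than this."""
    for key, expected in pattern.items():
        actual = event.get(key)
        if isinstance(expected, dict):
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif actual not in expected:
            return False
    return True


pattern = build_pattern("aws.ec2", "ec2.amazonaws.com", ["RunInstances", "TerminateInstances"])
print(json.dumps(pattern, indent=2))

# Invented sample event for the local check.
event = {
    "source": "aws.ec2",
    "detail-type": "AWS API Call via CloudTrail",
    "detail": {"eventSource": "ec2.amazonaws.com", "eventName": "RunInstances"},
}
print(matches(pattern, event))
```

Restricting the rule to the event names the helper understands keeps the Lambda from being invoked for calls it would only skip.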

Final Thoughts

This implementation is a powerful example of how to integrate AWS services (CloudTrail, EventBridge, Lambda) with external tools like Slack. Not only does it help in staying informed about critical changes in your infrastructure, but it also demonstrates the use of clean, maintainable Python code in a serverless environment.

If you’re looking to streamline your AWS operations and improve your incident response, consider giving this approach a try. Feel free to connect or comment below if you have any questions or would like to discuss further improvements.

Happy Cloud Monitoring!
