AWS Lambda for Flow Logs Processing

A previous post described how to use a pipeline with CloudWatch Logs, Kinesis, and Lambda to process virtual private cloud (VPC) flow logs. This post will describe a simple Lambda function for processing VPC flow logs.

Amazon Web Services (AWS) provides a good walkthrough for writing Lambda functions that process data delivered by Amazon Kinesis. Building on that example, we can develop a function that reads VPC flow logs in a few steps.

This function will take each IP address in the flow logs’ stream, count how many other IP addresses each saw in a time period, and post the results to a web endpoint.


At Observable, we use Python extensively, and since Lambda’s newest supported language is also Python, we’ll write this function in that language.

We’ll start by interpreting the data Kinesis has provided. Each event contains a list of records. Those records contain Base64-encoded data, which represents a gzip-compressed JSON stream:
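To see the encoding concretely, here is a small round-trip sketch. The event shape follows the standard Kinesis-to-Lambda payload; the log content itself is made up for illustration:

```python
from base64 import b64decode, b64encode
from json import dumps, loads
from zlib import decompress, MAX_WBITS
import gzip
import io

# Build a gzip-compressed, Base64-encoded payload the way
# CloudWatch Logs delivers it to a Kinesis stream.
payload = dumps({'logEvents': [{'message': 'example'}]}).encode()
buf = io.BytesIO()
with gzip.GzipFile(fileobj=buf, mode='wb') as gz:
    gz.write(payload)
encoded = b64encode(buf.getvalue()).decode()

# A Lambda event wraps one or more such records:
event = {'Records': [{'kinesis': {'data': encoded}}]}

# Reversing the process (decode, then decompress with the gzip
# header flag, 16 + MAX_WBITS) recovers the JSON stream:
raw = decompress(b64decode(event['Records'][0]['kinesis']['data']),
                 16 + MAX_WBITS)
assert loads(raw)['logEvents'][0]['message'] == 'example'
```

The `16 + MAX_WBITS` argument tells `zlib.decompress` to expect a gzip header rather than a bare zlib stream.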






We want to wind up with a Python dictionary, so we decode, uncompress, and parse using functions from the Python standard library:

# Declared earlier:
# from base64 import b64decode
# from json import loads
# from zlib import decompress, MAX_WBITS
records = event.get('Records', [])
for record in records:
    compressed_json = b64decode(record['kinesis']['data'])
    uncompressed_json = decompress(compressed_json, 16 + MAX_WBITS)
    input_data = loads(uncompressed_json)

After parsing the record’s data, we can separate and read the VPC flow log event details. We can parse them according to the AWS docs, or use Observable’s flowlogs-reader library.

For this function, we’ll parse them directly, pulling out the IP addresses and timestamp:

log_events = input_data.get('logEvents', [])
for log_event in log_events:
    # In the version-2 flow log format, srcaddr is field 3,
    # dstaddr is field 4, and the start timestamp is field 10.
    message = log_event['message'].split()
    srcaddr = message[3]
    dstaddr = message[4]
    start_time = int(message[10])
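To check the field positions against a concrete record, here is a sample version-2 flow log line (the account, interface ID, and addresses are made up for illustration; the field layout follows the AWS docs):

```python
# Fields: version account-id interface-id srcaddr dstaddr srcport
# dstport protocol packets bytes start end action log-status
sample = ('2 123456789010 eni-0abc123d 198.51.100.1 192.0.2.1 '
          '49152 443 6 10 840 1439387263 1439387264 ACCEPT OK')
fields = sample.split()
srcaddr = fields[3]
dstaddr = fields[4]
start_time = int(fields[10])
print(srcaddr, dstaddr, start_time)
# → 198.51.100.1 192.0.2.1 1439387263
```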

Once we have the flow data, we can collect the results. If we use a multidimensional dictionary, we can aggregate by time and by IP address:

# Declared earlier:
# BUCKET_SECONDS = 600
# connection_data = defaultdict(lambda: defaultdict(set))
bucket_time = (start_time // BUCKET_SECONDS) * BUCKET_SECONDS
connection_data[bucket_time][srcaddr].add(dstaddr)
connection_data[bucket_time][dstaddr].add(srcaddr)
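Because each bucket maps an IP address to a *set* of peers, duplicate flows between the same pair don't inflate the counts. A minimal sketch with made-up flows:

```python
from collections import defaultdict

BUCKET_SECONDS = 600
connection_data = defaultdict(lambda: defaultdict(set))

# Three illustrative flows; the second repeats a peer pair, so the
# set keeps that pair's contribution to the count at one.
flows = [
    ('192.0.2.1', '198.51.100.1', 1439387263),
    ('192.0.2.1', '198.51.100.1', 1439387299),
    ('192.0.2.2', '198.51.100.1', 1439387263),
]
for srcaddr, dstaddr, start_time in flows:
    # Round the start time down to its 10-minute bucket.
    bucket_time = (start_time // BUCKET_SECONDS) * BUCKET_SECONDS
    connection_data[bucket_time][srcaddr].add(dstaddr)
    connection_data[bucket_time][dstaddr].add(srcaddr)

bucket = connection_data[(1439387263 // BUCKET_SECONDS) * BUCKET_SECONDS]
print({k: len(v) for k, v in bucket.items()})
# → {'192.0.2.1': 1, '198.51.100.1': 2, '192.0.2.2': 1}
```

All three flows land in the same 10-minute bucket, and 198.51.100.1 is credited with two distinct peers.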

When all the data has been collected, we can post the results to our web application:

# Declared earlier:
# ENDPOINT = 'https://example.com/connection_sets/{bucket_time}'
for bucket_time in sorted(connection_data):
    output_url = ENDPOINT.format(bucket_time=bucket_time)
    output_data = {k: len(v) for k, v in connection_data[bucket_time].items()}
    requests.post(output_url, data=dumps(output_data))

Our web application will receive JSON data grouped by 10-minute intervals. A message might look like this:

{"198.51.100.1": 30, "192.0.2.1": 10, "192.0.2.2": 20}

Interested in learning more? You can see the full sample Lambda Python function for VPC Flow Logs here. Also, please stay tuned for future blog posts on technical topics like these.


Experience Dynamic Endpoint Modeling on your own network

Getting better visibility into your network and improving your security couldn’t be easier. Sign up for a free, no-risk trial of Observable’s Endpoint Modeling solution today, and change the way you see security.

Start Your Free Trial Today
