Processing CloudFront Logs with Rust

2025/02/07

#rust #polars #data-engineering

Processing data with Rust

In data processing, Rust has emerged as a compelling alternative to traditional ETL languages like Python or Java. Its unique combination of performance, safety, and modern features makes it an attractive choice for handling large-scale data streams and complex computational tasks. Libraries like Polars make Rust a good option for building data processing pipelines. With its growing ecosystem of libraries and frameworks, Rust is primed to play a significant role in shaping the future of data-intensive applications.

As a toy project, I explored how to use Rust to transform AWS CloudFront logs into Parquet format. As of February 2025 this need is largely obsolete: AWS has released a new logging stack that writes Parquet files directly to S3, so a converter is no longer required going forward. For the already existing log files, however, a converter might still be useful.

AWS CloudFront Logs

Amazon CloudFront is a content delivery network (CDN) service offered by Amazon Web Services (AWS). CloudFront delivers content to users by caching it at edge locations around the world, which reduces the distance the data must travel and makes communication faster. CloudFront has a feature to save its access logs to an S3 bucket in several formats, including TSV (tab-separated values).

The files delivered to the bucket look like this:

Jan 12:58 E2F28FYJOwT1P9.2024-01-23-11.84ac498a.gz
Jan 13:03 E2F28FYJOwT1P9.2024-01-23-11.c34e4413.gz
Jan 13:08 E2F28FYJOwT1P9.2024-01-23-12.db61b94a.gz
Jan 13:18 E2F28FYJOwT1P9.2024-01-23-12.46529b24.gz
Jan 13:28 E2F28FYJOwT1P9.2024-01-23-12.d77399a4.gz
Jan 13:33 E2F28FYJOwT1P9.2024-01-23-12.6f1c7b5e.gz
Jan 13:58 E2F28FYJOwT1P9.2024-01-23-12.a5e6ddca.gz
Jan 14:03 E2F28FYJOwT1P9.2024-01-23-12.4d5e6dbc.gz
Jan 14:18 E2F28FYJOwT1P9.2024-01-23-13.7125fd04.gz
Jan 14:38 E2F28FYJOwT1P9.2024-01-23-13.7d3dbdf0.gz

Looking into a log file, we find the following (long lines are broken up for better readability).

❯ gzcat gz/E3F26FYJOTTOP6.2024-01-23-12.4d5e6dbc.gz | tr '\t' ' '
#Version: 1.0
#Fields: date time x-edge-location sc-bytes c-ip cs-method cs(Host) cs-uri-stem 
 sc-status cs(Referer) cs(User-Agent) cs-uri-query cs(Cookie) x-edge-result-type 
 x-edge-request-id x-host-header cs-protocol cs-bytes time-taken x-forwarded-for 
 ssl-protocol ssl-cipher x-edge-response-result-type cs-protocol-version fle-status 
 fle-encrypted-fields c-port time-to-first-byte x-edge-detailed-result-type 
 sc-content-type sc-content-len sc-range-start sc-range-end

2024-01-23 12:59:15 LAX53-P1 1897 100.100.100.100 GET d197w9j7zcw6og.cloudfront.net 
/ai/riding-hood/ 200 https://dev.l1x.be/ai/riding-hood/ 
Mozilla/5.0%20(Macintosh;%20Intel%20Mac%20OS%20X%2010_15_7)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/114.0.0.0%20Safari/537.36 
- - RefreshHit UKrRsY4BzxJsshXB71YE7R2zxpoB3npsKy39TyIRKQJFw_DusKph4t== 
dev.l1x.be https 330 0.301 
- TLSv1.3 TLS_AES_128_GCM_SHA256 RefreshHit HTTP/1.1 - - 
48634 0.301 RefreshHit text/html - - -

It would be great to have a single file per day in a format that supports applying summary statistics to the dataset. Adding a new day of data is then simple with an ETL job that runs daily; in high-traffic environments such as e-commerce sites, this job might run hourly instead.

Using Parquet

After working with many data file formats over the years, I believe Parquet is an excellent option for long-term storage of data. It is a columnar storage format that is becoming increasingly popular in the data engineering world due to its numerous advantages.

Here are some of the key reasons why using Parquet is a good idea for data engineering:

  • Reduced Storage Requirements:

Parquet allows for more efficient data compression, resulting in significantly smaller file sizes compared to row-based formats like CSV. This is particularly beneficial for storing large datasets, as it can significantly reduce storage costs and the amount of disk space required. Combining this with compression, especially Zstandard (ZSTD), yields excellent results.

  • Faster Query Performance Compared to Text Data Formats:

Parquet enables faster query execution compared to text data formats, especially for complex analytical queries that involve aggregations or filtering on specific columns. Since a Parquet reader only reads the required data (the file metadata and the specific columns a query needs), it minimizes disk I/O and improves processing speed. This makes it well-suited for large-scale data analysis and data warehousing applications, as the short sketch below illustrates.
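To make the column-pruning point concrete, here is a minimal sketch of how the Parquet files produced later in this post could be queried lazily with Polars. It assumes the lazy and parquet features are enabled and that the column names match the ones used in the converter below.

use polars::prelude::*;

fn successful_requests(path: &str) -> PolarsResult<DataFrame> {
    // Lazily scan the Parquet file; only the referenced columns are read from disk.
    LazyFrame::scan_parquet(path, ScanArgsParquet::default())?
        .select([col("date"), col("cs-uri-stem"), col("sc-status")])
        .filter(col("sc-status").eq(lit("200")))
        .collect()
}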

Using the low-level Rust Parquet library directly might be a bit too complex for a simple use case like this one. Luckily we have Polars, an open-source data analysis and manipulation library written in Rust (it also has a Python interface). It is designed to be fast and efficient, and its LazyFrame API helps with larger datasets.

Creating a workflow

Reading the TSV file (actually reading and unzipping) is easy with Rust. One assumption I have is that the entire dataset fits in memory. In the future, I’d like to improve this by exploring lazy evaluation with iterators. For now, we can have a few hundred megabytes worth of TSV in memory. I believe it’s more important to understand when O(n) memory usage is acceptable rather than prematurely optimizing for stream-based processing.

Get a list of files with a glob

Rust offers two main ways (and possibly more that I’m unaware of) to work with files:

  • glob::glob
  • std::fs

For flexible patterns with wildcards, opt for glob. It efficiently iterates over matching files, letting you perform actions like printing paths or collecting them into a vector. If you need fine-grained control over individual entries and your matching patterns are simple, consider std::fs instead. It allows filtering directory entries based on criteria like file type or extension, giving you more granular control over the processing tasks. Both approaches handle errors and offer customization options. I opted for the glob crate.

The files have the date in their names (for example: E2F28FYJOwT1P9.2024-01-23-11.84ac498a.gz), so we can create a simple glob representing a single day in the dataset.

use glob::{glob, Paths, PatternError};
fn get_files(date: &str) -> Result<Paths, PatternError> {
    glob(&format!("gz/*.{}*.gz", &date))
}

The first approach is to simply iterate over the glob, log and skip the errors for now, and collect the file contents in an accumulator (hence the O(n) memory usage).

for path in paths {
    if path.is_err() {
        error!("Could not process path with error: {:?}", path);
        continue;
    }
    let path: PathBuf = path.unwrap();
    info!("Processing path: {:?}", path);
    // to_str().unwrap() panics if the path is not valid UTF-8; unzip errors are handled in the match below
    let tsv_maybe: Result<String, Error> = uncompress_gzip(path.to_str().unwrap());
    // the unzipped content is a potentially multiline TSV (tab separated values)
    match tsv_maybe {
        Ok(tsv) => {
            let entries: Vec<Vec<String>> = split_tsv(tsv);
            acc.extend(entries)
        }
        Err(e) => error!("{:?}", e),
    }
}
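The uncompress_gzip helper used in the loop is not shown in the post; a minimal sketch of what it could look like, assuming the flate2 crate and std::io::Error as the error type, is:

use std::fs::File;
use std::io::{self, Read};

use flate2::read::GzDecoder;

fn uncompress_gzip(path: &str) -> Result<String, io::Error> {
    // Open the gzipped log file and decompress it fully into a String in memory.
    let file = File::open(path)?;
    let mut decoder = GzDecoder::new(file);
    let mut contents = String::new();
    decoder.read_to_string(&mut contents)?;
    Ok(contents)
}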

Unzip and parse

The next phase is unzipping the files in memory and creating a Vec for each line. Assembling the lines into a Vec<Vec<String>> is simple, and this is exactly what split_tsv does:

fn split_tsv(tsv: String) -> Vec<Vec<String>> {
    //This is a bit unreadable
    // - splitting the TSV to lines
    // - removing lines starting with # or being empty
    // - split the lines by tab
    // - convert &str to String so it can be returned (maybe this is not necessary?)

    tsv.split('\n')
        .filter(|l| !l.starts_with('#'))
        .filter(|l| !l.is_empty())
        .map(|l| {
            l.split('\t')
                .map(|s| s.to_string())
                .collect::<Vec<String>>()
        })
        .collect()
}

This function produces the following:

[
  ["2024-01-23", "12:14:37", "LAX53-P1", ...],
  ["2024-01-23", "12:56:19", "SOF50-C1", ... ]
]

After having a 2D vector like the one above, we are almost ready to write the dataset to its permanent storage, a local folder in this case. There is a small problem with this dataset, though: the API Polars provides for creating a DataFrame does not match the row-oriented vector we got from processing the TSVs. Polars expects column-oriented data that looks like this:

let s1 = Series::new("Fruit", &["Apple", "Apple", "Pear"]);
let s2 = Series::new("Color", &["Red", "Yellow", "Green"]);

let df: PolarsResult<DataFrame> = DataFrame::new(vec![s1, s2]);

As you can see, the new DataFrame is built from a vector of Series, and each Series contains a single kind of data, for example fruit names or colors. We can simply transpose the 2D vector we have to produce something similar.

fn transpose<T>(v: Vec<Vec<T>>) -> Vec<Vec<T>> {
    // Transposing a 2D matrix
    // https://stackoverflow.com/questions/64498617/how-to-transpose-a-vector-of-vectors-in-rust

    assert!(!v.is_empty());
    let len = v[0].len();
    let mut iters: Vec<_> = v.into_iter().map(|n| n.into_iter()).collect();
    (0..len)
        .map(|_| {
            iters
                .iter_mut()
                .map(|n| n.next().unwrap())
                .collect::<Vec<T>>()
        })
        .collect()
}

I was not able to determine which Stack Overflow answer was better just by reading the source code, so I decided to check whether there is a significant performance difference between the two.

use criterion::{black_box, criterion_group, criterion_main, Criterion};

fn transpose1<T>(original: Vec<Vec<T>>) -> Vec<Vec<T>> {
    assert!(!original.is_empty());
    let mut transposed = (0..original[0].len()).map(|_| vec![]).collect::<Vec<_>>();

    for original_row in original {
        for (item, transposed_row) in original_row.into_iter().zip(&mut transposed) {
            transposed_row.push(item);
        }
    }

    transposed
}

fn transpose2<T>(v: Vec<Vec<T>>) -> Vec<Vec<T>> {
    assert!(!v.is_empty());
    let len = v[0].len();
    let mut iters: Vec<_> = v.into_iter().map(|n| n.into_iter()).collect();
    (0..len)
        .map(|_| {
            iters
                .iter_mut()
                .map(|n| n.next().unwrap())
                .collect::<Vec<T>>()
        })
        .collect()
}

fn criterion_benchmark(crit: &mut Criterion) {
    crit.bench_function("transpose1", |b| {
        b.iter(|| {
            let a: Vec<u64> = (0..1000).map(|v| v + 1000).collect();
            let b: Vec<u64> = (0..1000).map(|v| v + 1000).collect();
            let c: Vec<u64> = (0..1000).map(|v| v + 1000).collect();

            let vx: Vec<Vec<u64>> = vec![a, b, c];

            transpose1(black_box(vx))
        })
    });

    crit.bench_function("transpose2", |b| {
        b.iter(|| {
            let a: Vec<u64> = (0..1000).map(|v| v + 1000).collect();
            let b: Vec<u64> = (0..1000).map(|v| v + 1000).collect();
            let c: Vec<u64> = (0..1000).map(|v| v + 1000).collect();

            let vx: Vec<Vec<u64>> = vec![a, b, c];

            transpose2(black_box(vx))
        })
    });
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);

Invoking the benchmark with cargo bench is easy, and it produces an HTML report that is not hard to understand. The two functions are very close to each other in terms of performance, and I picked the slightly faster one.

 cargo bench
    Finished bench [optimized] target(s) in 0.13s
     Running unittests src/main.rs (target/release/deps/cf_logs-5058ea9e2adfad2d)

running 1 test
test tests::exploration_df ... ignored

test result: ok. 0 passed; 0 failed; 1 ignored; 0 measured; 0 filtered out; finished in 0.00s

     Running benches/bench.rs (target/release/deps/bench-199f64d850bf6279)
Gnuplot not found, using plotters backend
transpose1              time:   [34.802 µs 34.944 µs 35.087 µs]
                        change: [-0.2763% +0.0465% +0.3828%] (p = 0.79 > 0.05)
                        No change in performance detected.
Found 6 outliers among 100 measurements (6.00%)
  5 (5.00%) high mild
  1 (1.00%) high severe

transpose2              time:   [27.966 µs 28.068 µs 28.174 µs]
                        change: [-0.0055% +0.3648% +0.6945%] (p = 0.04 < 0.05)
                        Change within noise threshold.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild

Based on this I opted for the transpose2 function.

With this out of the way, it is time to create the function that produces the data we can feed to Polars.

use polars::{prelude::NamedFrom, series::Series};

pub(crate) fn get_columns(transposed: Vec<Vec<String>>) -> Vec<Series> {
    // this creates a vec of Series from a 2D vec

    vec![
        Series::new("date", &transposed[0]),                // date
        Series::new("time", &transposed[1]),                // time
        Series::new("x-edge-location", &transposed[2]),     // x-edge-location
        Series::new("sc-bytes", &transposed[3]),            // sc-bytes
        Series::new("c-ip", &transposed[4]),                // c-ip
        Series::new("cs-method", &transposed[5]),           // cs-method
        Series::new("cs(Host)", &transposed[6]),            // cs(Host)
        Series::new("cs-uri-stem", &transposed[7]),         // cs-uri-stem
        Series::new("sc-status", &transposed[8]),           // sc-status
        Series::new("cs(Referer)", &transposed[9]),         // cs(Referer)
        Series::new("cs(User-Agent)", &transposed[10]),     // cs(User-Agent)
        Series::new("cs-uri-query", &transposed[11]),       // cs-uri-query
        Series::new("cs(Cookie)", &transposed[12]),         // cs(Cookie)
        Series::new("x-edge-result-type", &transposed[13]), //  x-edge-result-type
        Series::new("x-edge-request-id", &transposed[14]),  // x-edge-request-id
        Series::new("x-host-header", &transposed[15]),      // x-host-header
        Series::new("cs-protocol", &transposed[16]),        // cs-protocol
        Series::new("cs-bytes", &transposed[17]),           // cs-bytes
        Series::new("time-taken", &transposed[18]),         // time-taken
        Series::new("x-forwarded-for", &transposed[19]),    // x-forwarded-for
        Series::new("ssl-protocol", &transposed[20]),       // ssl-protocol
        Series::new("ssl-cipher", &transposed[21]),         // ssl-cipher
        Series::new("x-edge-response-result-type", &transposed[22]), // x-edge-response-result-type
        Series::new("cs-protocol-version", &transposed[23]), // cs-protocol-version
        Series::new("fle-status", &transposed[24]),         // fle-status
        Series::new("fle-encrypted-fields", &transposed[25]), // fle-encrypted-fields
        Series::new("c-port", &transposed[26]),             // c-port
        Series::new("time-to-first-byte", &transposed[27]), // time-to-first-byte
        Series::new("x-edge-detailed-result-type", &transposed[28]), // x-edge-detailed-result-type
        Series::new("sc-content-type", &transposed[29]),    // sc-content-type
        Series::new("sc-content-len", &transposed[30]),     // sc-content-len
        Series::new("sc-range-start", &transposed[31]),     // sc-range-start
        Series::new("sc-range-end", &transposed[32]),       // sc-range-end
    ]
}
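One thing to note: with this approach every column ends up as a string (Utf8) Series. If numeric summaries over fields like sc-bytes or time-taken are wanted later, those Series could be cast before building the DataFrame. A small, hypothetical helper (not part of the original pipeline) might look like this:

use polars::prelude::*;

// Hypothetical helper: cast a couple of known-numeric columns before the
// DataFrame is built; every other column stays as a string Series.
fn cast_numeric(columns: Vec<Series>) -> PolarsResult<Vec<Series>> {
    columns
        .into_iter()
        .map(|s| match s.name() {
            "sc-bytes" | "cs-bytes" => s.cast(&DataType::Int64),
            "time-taken" | "time-to-first-byte" => s.cast(&DataType::Float64),
            _ => Ok(s),
        })
        .collect()
}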

There is only one step remaining in the project: writing the Parquet files to disk.

let mut df: DataFrame = DataFrame::new(columns).unwrap();
let file_name: String = format!("par/{}.zstd.parq", date);
let mut file: File = File::create(file_name).unwrap();
Ok(ParquetWriter::new(&mut file).finish(&mut df)?)

This snippet writes the DataFrame to a Parquet file compressed with Zstandard.
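The snippet above relies on the writer's default settings for the codec. To make the Zstandard choice explicit, the write could be wrapped in a small function that sets the compression, roughly like this (a sketch, assuming the ParquetWriter::with_compression builder and the Zstd variant of ParquetCompression are available in the Polars version in use):

use std::fs::File;
use polars::prelude::*;

fn write_parquet(mut df: DataFrame, date: &str) -> PolarsResult<()> {
    // Write the DataFrame to disk, explicitly requesting ZSTD compression.
    let file_name = format!("par/{}.zstd.parq", date);
    let mut file = File::create(file_name)?;
    ParquetWriter::new(&mut file)
        .with_compression(ParquetCompression::Zstd(None))
        .finish(&mut df)?;
    Ok(())
}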

Closing

Rust is a powerful language for data engineering with modern features and great libraries. By combining Rust and Polars (and the underlying great libraries), we can efficiently process and analyze datasets like AWS CloudFront logs. Give Rust a try for your next data engineering project—you won’t be disappointed!