Converting Amazon S3 logs to Avro

2016/02/03

#AWS #S3 #Clojure

Amazon S3 Website Hosting

Amazon S3 is an excellent resource for hosting static websites (HTML, CSS, JS) because it provides free SSL certificates and a fast content delivery network at a reasonable price. Hosting a website is trivial and well documented. Once everything is set up, we have a running website served over SSL, with geographically distributed edge caches for faster page loads.

S3 provides access logging for tracking requests to your bucket. Each access log entry (called a record) has information about a single request, including the requester, request time, response status, bucket, key, etc. The actual format is described in the Amazon S3 server access log format documentation, which explains each field in depth.

Example entry:

79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be mybucket [06/Feb/2014:00:00:38 +0000] 192.0.2.3 79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be 3E57427F3EXAMPLE REST.GET.VERSIONING - "GET /mybucket?versioning HTTP/1.1" 200 - 113 - 7 - "-" "S3Console/0.4" -

The problem with this log format is that we can’t access individual fields easily (without a regexp), and that the information is stored as human-friendly text. This is not optimal for storing and querying larger datasets. We need to transform it into a more space-efficient format that reduces IO when reading a large chunk of the data from disk or when using distributed analytical platforms like Hadoop.

Why Apache Avro

Apache Avro has a long track record of being used in production, and it can be queried on Hadoop with ease.

According to the documentation, Avro provides:

  • Rich data structures.
  • A compact, fast, binary data format.
  • A container file, to store persistent data.
  • and a few other things we don’t need right now

Avro also uses schemas, so we can trust our data while processing it. The alternative would be Apache ORC, which is even more suitable for analytical use. I am going with Avro this time because it is better supported than ORC in Clojure at the moment.

Why Clojure

My personal reasons for using Clojure for data projects like this are:

  • quick prototyping (REPL)
  • support for asynchronous programming
  • small code base, less verbose than Java yet more readable
  • access to all of the Java libraries

Most of the data services I work with on a daily basis have decent Java support, which means I can just as easily use those libraries in Clojure. I also like small nice things. :)

Getting Started

Just to summarise what we are trying to achieve with this project and article series:

  • reading text files from Amazon S3 and converting the data to Avro (part I)
  • converting single-threaded execution to asynchronous execution with core.async (part II)
  • building a simple DSL to query Avro files (part III)

To start, I am going through the major topics involved in the process: how to use the AWS S3 API, how to create Avro files, and finally how to process the lines of the log files.

Talking to S3

After some initial poking around with the libraries we need, I decided to use the raw Java S3 API. It is so well written that using it from Clojure is a breeze.

(defn create-basic-aws-credentials
  "Takes a hashmap with AWS security credentials and creates a BasicAWSCredentials"
  ^BasicAWSCredentials [^PersistentArrayMap credentials]
  ; guard function with both keys checked if present
  (BasicAWSCredentials.
    (:aws_access_key_id credentials)
    (:aws_secret_access_key credentials)))

(defn connect-with-basic-credentials
  "Connecting to S3 only with credentials"
  ^AmazonS3Client [^BasicAWSCredentials basic-aws-credentials]
  (AmazonS3Client. basic-aws-credentials))

Creating a credential and using it to create an AmazonS3Client is simple. We can use many S3 clients at the same time for better performance but for the initial version we are going to stick to a single connection.
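The guard mentioned in the comment inside create-basic-aws-credentials is not shown in this article. A minimal sketch of such a check, assuming the credentials map uses the same two keys as above (the name valid-credentials? is hypothetical and not taken from the project), could look like this:

```clojure
(require '[clojure.string :as string])

(defn valid-credentials?
  "Hypothetical guard: true only when both expected keys hold non-blank strings."
  [credentials]
  (every? (fn [k]
            (let [v (get credentials k)]
              (and (string? v) (not (string/blank? v)))))
          [:aws_access_key_id :aws_secret_access_key]))
```

A check like this could be called before constructing BasicAWSCredentials, so that a missing key fails early with a clear message instead of a NullPointerException inside the Java constructor.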

Log files are organised around dates, so keeping one file per day sounds reasonable. Each day has zero or many entries, where "many" is fewer than 10,000, so there is no need to split a day into smaller chunks. On average there are 1000–2000 files per day, depending on the number of access entries. We are going to process the data day by day, using a moving window. The size of the window and when it starts can be set in the configuration.

:days {
 :start 2 ; starts the processing x days ago
 :stop 12 ; stops the processing y days ago, processing 10 days worth of data this way
 }

Using the example configuration yields a list of dates like the following:

("2016-01-20" "2016-01-21" "2016-01-22" "2016-01-23"
  "2016-01-24" "2016-01-25" "2016-01-26" "2016-01-27"
  "2016-01-28" "2016-01-29" "2016-01-30" "2016-01-31")
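How the days function builds such a list is not shown in this article. A minimal sketch of the idea, assuming the window includes the :start day and excludes the :stop day (so 2 and 12 give ten days) and using java.time from Java 8, could look like this. The name days-window and the explicit today argument are hypothetical:

```clojure
(import '(java.time LocalDate))

(defn days-window
  "Hypothetical helper. Returns ISO date strings, oldest first, for the days
  from (stop - 1) days before `today` up to `start` days before `today`."
  [^LocalDate today start stop]
  (for [n (range (dec stop) (dec start) -1)]
    ; LocalDate prints in ISO format (yyyy-MM-dd), which matches the log key prefix
    (str (.minusDays today (long n)))))

; Example call with a fixed date instead of (LocalDate/now):
; (days-window (LocalDate/parse "2016-02-03") 2 12)
```

Passing today in as an argument keeps the function pure, which makes it easy to try out in the REPL with a fixed date.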

Fetching the actual file names for each day can look tricky at first sight, but we can use the truncated field of the listing to check whether there are more files for that day than fit in a single response (1000 by default).

(defn list-all-files-eager-blocking
  "Returns a sequence of the items in a bucket or bucket/folder  "
  [ ^AmazonS3Client amazon-s3-client ^String bucket-name ^String prefix
    ^String marker ^String delimiter ^Integer max-keys ^PersistentList acc]
  (log/debug bucket-name acc)
  (let [  ^ListObjectsRequest list-object-request (create-list-object-request
                                                    bucket-name
                                                    prefix marker
                                                    delimiter
                                                    max-keys)
          ^ObjectListing object-listing  (list-objects amazon-s3-client list-object-request)]
    (if-not (is-truncated? object-listing)
      ; return
      (flatten (concat acc (map get-s3-object-summary-clj
                      (get-object-summaries object-listing))))
      ; recur with the new request
      (recur  amazon-s3-client
              bucket-name
              prefix
              (get-next-marker object-listing)
              delimiter
              max-keys
              (conj acc (map get-s3-object-summary-clj
                          (get-object-summaries object-listing)))))))

This function is blocking, so it won’t return until all of the items are fetched; it is not recommended for processing 100,000+ files at once. For that many files we need to rewrite it to produce a lazy sequence, where the items are looked up only when needed (added to the TODO). The function that returns the Clojure representation (a hash-map) of an S3 object summary is get-s3-object-summary-clj.
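The lazy version is still on the TODO list, so what follows is only a sketch of the general shape it could take, not the project's code. It assumes a hypothetical fetch-page function that takes a marker and returns a map with :items, :truncated? and :next-marker; in the real code that function would wrap the list-objects call. A fake three-page listing with made-up data stands in for S3:

```clojure
(defn list-all-lazy
  "Sketch: lazily walks a paged listing. `fetch-page` (hypothetical) maps a
  marker to {:items [...] :truncated? bool :next-marker marker}."
  [fetch-page marker]
  (lazy-seq
    (let [{:keys [items truncated? next-marker]} (fetch-page marker)]
      (if truncated?
        ; the recursive call is itself lazy, so the next page is only
        ; requested when a consumer walks past the current one
        (concat items (list-all-lazy fetch-page next-marker))
        items))))

; Fake listing standing in for S3 (made-up data).
(def fake-pages
  {nil  {:items ["a" "b"] :truncated? true  :next-marker "m1"}
   "m1" {:items ["c" "d"] :truncated? true  :next-marker "m2"}
   "m2" {:items ["e"]     :truncated? false}})

(defn fake-fetch-page [marker]
  (get fake-pages marker))

; (list-all-lazy fake-fetch-page nil) walks all three pages on demand
```

Unlike the recur-based version, nothing is requested until the sequence is consumed, and a consumer that stops early never triggers the remaining page requests.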

(defn get-s3-object-summary-clj
  "Returns a Clojure representation of a S3ObjectSummary"
  [^S3ObjectSummary s3-object-summary]
  { :bucket-name    (.getBucketName     s3-object-summary)
    :e-tag          (.getETag           s3-object-summary)
    :key            (.getKey            s3-object-summary)
    :last-modified  (.getLastModified   s3-object-summary)
    :owner          (.getOwner          s3-object-summary)
    :size           (.getSize           s3-object-summary)
    :storage-class  (.getStorageClass   s3-object-summary) })

This way we have a list of entries that we are going to process later. To boot all this up in the REPL we can use the following few lines, assuming the configuration is correct and the credentials file is present and contains a valid access key and secret key.

(def config                           (cli/process-config "conf/app.edn"))
(def credentials                      (:ok (get-credentials (get-in config [:ok :aws :credentials-file]))))
(def bucket                           (name (get-in config [:ok :aws :s3 :bucket])))
(def aws-basic-cred                   (s4/create-basic-aws-credentials credentials))
(def aws-s3-connection                (s4/connect-with-basic-credentials aws-basic-cred))
(def processing-days                  (days 2 12))
(def s3-log-pattern                   (re-pattern (get-in config [:ok :aws :log-format])))
(def all-files-for-a-day              (s4/list-all-files-eager aws-s3-connection bucket (str "logs/" "2015-12-25-10") "" "" (int 1000) ()))
(def first-entry                      (first all-files-for-a-day))
(def object-content                   (s4/get-object-content-safe (s4/get-object aws-s3-connection bucket "logs/2015-12-25-00-23-17-8AC95FEBE0374F7B")))
(def schema-file                      "schema/amazon-log.avsc")
(def s3-log-avro-schema-json          (json/parse-stream (io/reader schema-file)))
(def s3-log-avro-schema-fields        (get-avro-schema-fields s3-log-avro-schema-json))
(def s3-log-avro-schema-fields-dash   (replace-field-names s3-log-avro-schema-fields "_" "-"))
(def s3-log-avro-schema               (avro-schema schema-file))
(def int-fields                        #{:turn-around-time :http-status :total-time :bytes-sent :object-size})

Once connected to S3 we can play with the log files. Checking the first entry (calling first on all-files-for-a-day):

{:bucket-name "www.streambrightdata.com", :e-tag "a4092cf1a282c3fb7d027ee56e4155d4",
 :key "logs/2016-02-01-10-22-42-07BA495229374DB4", :last-modified #inst "2016-02-01T10:22:43.000-00:00",
 :owner #object[com.amazonaws.services.s3.model.Owner 0x77314d5c "S3Owner [name=s3-log-service,id=3272e1]"],
 :size 398, :storage-class "STANDARD"}

Since Clojure keywords can be used as functions we can easily list all of the file names in the list we produced earlier.

;s3-logrotate.core=> (map :key all-files-for-a-day)
("logs/2016-02-01-10-22-42-07BA495229374DB4" "logs/2016-02-01-10-22-50-BD7DAC7BF88CBDC8"
"logs/2016-02-01-10-22-55-296AE2E3EBCDD3B6" "logs/2016-02-01-10-22-57-5853C8B48DC9163A"
"logs/2016-02-01-10-23-07-EDBAAA82DFF3B039" "logs/2016-02-01-10-23-07-F8325A25289E1015"
"logs/2016-02-01-10-23-11-E949D06BEFE68356" "logs/2016-02-01-10-23-14-90FFF938F152B30A"
"logs/2016-02-01-10-25-40-465E2F36B71741F9" "logs/2016-02-01-10-31-03-CA1510898F68F9FF"
"logs/2016-02-01-10-31-16-6B7F2165642094E5" "logs/2016-02-01-10-31-17-64F85764086EB154"
"logs/2016-02-01-10-39-57-E7C3302E2ECBAF51" "logs/2016-02-01-10-40-08-EB8A7B1695CECB67"
"logs/2016-02-01-10-40-13-A79F7E8E40B5151B" "logs/2016-02-01-10-40-56-F18AA7085783DF53"
"logs/2016-02-01-10-40-57-3B568EBF4995F2A5" "logs/2016-02-01-10-41-14-27A07093139561C9"
"logs/2016-02-01-10-41-29-B08373BAFE62149E" "logs/2016-02-01-10-41-44-DC32EBA17CF8604F" )
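The same trick composes with the rest of the sequence functions. A small sketch on two hand-written summaries (the first :key and :size come from the entry shown earlier, the second size is made up for illustration, and both helper names are hypothetical):

```clojure
(def sample-files
  [{:key "logs/2016-02-01-10-22-42-07BA495229374DB4" :size 398}
   {:key "logs/2016-02-01-10-22-50-BD7DAC7BF88CBDC8" :size 0}])  ; size 0 is made up

(defn total-size
  "Sums the :size of every summary, using the keyword as a function."
  [files]
  (reduce + (map :size files)))

(defn non-empty-keys
  "Keys of the summaries whose :size is positive."
  [files]
  (map :key (filter (comp pos? :size) files)))
```

Because the summaries are plain maps, no accessor functions or Java interop are needed once get-s3-object-summary-clj has done the conversion.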

Processing a single line

Unfortunately there is no better way of processing these lines than using a regular expression.

It is not pretty, but at least it gets the job done. I still need to run it on bigger data sets, but for our use case it works. When there are parenthesized groups in the pattern and re-find finds a match, it returns a vector: the first element is the whole matching string and the remaining elements are the individual groups. Here we need to pay attention not only to the fact that Amazon uses “-” for null values, but also to matching all of the possible values of the referer and user agent fields.

#"(\S+)
([a-z0-9][a-z0-9-.]+)
\[(.*\+.*)\]
(\b(?:\d{1,3}\.){3}\d{1,3}\b)
(\S+)
(\S+)
(\S+)
(\S+)
\"(\w+\ \S+ \S+)\"
(\d+|\-)
(\S+)
(\d+|\-)
(\d+|\-)
(\d+|\-)
(\d+|\-)
\"(https?\:\/\/.*\/?|\-)\"
\"(.*)\"
(\S+)"
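To show how the vector returned by re-find can be turned into a map, here is a minimal sketch on a deliberately tiny, made-up three-field pattern rather than the full one above. The names mini-pattern, mini-fields, mini-int-fields and parse-line are all hypothetical; the idea of a set of integer fields mirrors int-fields from the REPL session:

```clojure
; A made-up three-field line format: operation, status, quoted user agent.
(def mini-pattern #"(\S+) (\d+|-) \"(.*)\"")
(def mini-fields [:operation :http-status :user-agent])
(def mini-int-fields #{:http-status})

(defn parse-line
  "Returns a map of field -> value, or nil when the line does not match.
  A \"-\" becomes nil and integer fields are parsed into numbers."
  [line]
  (when-let [[_ & groups] (re-find mini-pattern line)]
    (into {}
          (map (fn [k v]
                 [k (cond
                      (= v "-")           nil
                      (mini-int-fields k) (Integer/parseInt v)
                      :else               v)])
               mini-fields
               groups))))

; (parse-line "REST.GET.OBJECT 200 \"-\"")
```

Dropping the first element of the match and pairing the remaining groups with a vector of field names keeps the regular expression and the field list as the only two things that need to stay in sync.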

This works reasonably well; I haven’t found a non-matching log entry yet. Now we can extend s3api with the ability to get an object’s content, which is required for downloading an object from S3.

(defn get-object-content-unsafe
  "Gets the input stream containing the contents of this object.
  This function returns an InputStream; holding onto it results in resource pool
  exhaustion"
  ^S3ObjectInputStream [^S3Object object]
  (.getObjectContent object))

(defn close-object
  "Closes object"
  [^S3Object object]
  (.close object))

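(defn get-object-content-safe
  "Reads the whole object content eagerly and returns its lines as a vector.
  The reader and the S3Object are both closed before returning, so no
  connection is held open."
  [^S3Object object]
  (let [^PersistentVector return (with-open
                                   [rdr (io/reader (get-object-content-unsafe object))]
                                   ; realise all lines while the reader is still open
                                   (into [] (line-seq rdr)))
        _                        (close-object object)]
    ; returning
    return))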
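The reason get-object-content-safe collects every line inside with-open is that line-seq is lazy: if the sequence escaped unrealised, the reader would already be closed by the time anything tried to read from it. A self-contained sketch of the same pattern, using a StringReader in place of an S3 stream (read-lines-eagerly is a hypothetical name):

```clojure
(require '[clojure.java.io :as io])
(import '(java.io StringReader))

(defn read-lines-eagerly
  "Reads every line from `source` into a vector before the reader is closed."
  [source]
  (with-open [rdr (io/reader source)]
    ; into forces the lazy line-seq while rdr is still open
    (into [] (line-seq rdr))))

; (read-lines-eagerly (StringReader. "first line\nsecond line"))
```

The trade-off is memory: the whole object is held at once, which is fine for log files of a few hundred bytes like the ones listed earlier.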

Creating an S3Object is easy, we just need to supply a connection, a bucket and a key.

(defn get-object
  "Gets S3 object"
  [^AmazonS3Client amazon-s3-client ^String bucket-name ^String s3-key]
  (.getObject amazon-s3-client bucket-name s3-key))

Now that we have the means to talk to S3 and read files from it, we can move on to a closer look at Avro files and how to write them in Clojure.

Working with Apache Avro in Clojure

Luckily there is a good library that we can use to work with Avro files in Clojure, so we don’t need to reinvent the wheel this time. Abracad provides serialization and deserialization of Clojure data structures with Avro; the result can be persisted to disk or used in message passing systems like Kafka. We are going to persist the data to disk this time.

Before we can write any Avro entry to disk we need a schema for the data we are collecting. There are some challenges in coming up with the right schema, but we can jump through these hoops.

{
  "type": "record",
  "name": "amazon-log",
  "namespace": "com.streambright.avro",
  "fields": [{
      "name": "bucket_owner",
      "type": "string",
      "doc": "The canonical user ID of the owner of the source bucket."
    }, {
      "name": "bucket",
      "type": "string",
      "doc": "The name of the bucket that the request was processed against."
    }, {
      "name": "time",
      "type": "string",
      "doc": "The time at which the request was received. The format, using strftime() terminology, is as follows: [%d/%b/%Y:%H:%M:%S %z]"
    }, {
      "name": "remote_ip",
      "type": "string",
      "doc": "The apparent Internet address of the requester. Intermediate proxies and firewalls might obscure the actual address of the machine making the request."
    }, {
      "name": "requester",
      "type": ["null", "string"],
      "doc": "The canonical user ID of the requester, or the string Anonymous  for unauthenticated requests."
    }, {
      "name": "request_id",
      "type": "string",
      "doc": "The request ID is a string generated by Amazon S3 to uniquely identify each request."
    }, {
      "name": "operation",
      "type": "string",
      "doc": "The operation listed here is declared as SOAP.operation, REST.HTTP_method.resource_type, WEBSITE.HTTP_method.resource_type, or BATCH.DELETE.OBJECT."
    }, {
      "name": "key",
      "type": ["null", "string"],
      "doc": "The key part of the request, URL encoded, or  -  if the operation does not take a key parameter."
    }, {
      "name": "request_uri",
      "type": "string",
      "doc": "The Request-URI part of the HTTP request message."
    }, {
      "name": "http_status",
      "type": "int",
      "doc": "The numeric HTTP status code of the response."
    }, {
      "name": "error_code",
      "type": ["null", "string"],
      "doc": "The Amazon S3 Error Code, or - if no error occurred."
    }, {
      "name": "bytes_sent",
      "type": "int",
      "doc": "The number of response bytes sent, excluding HTTP protocol overhead, or  -  if zero."
    }, {
      "name": "object_size",
      "type": "int",
      "doc": "The total size of the object in question."
    }, {
      "name": "total_time",
      "type": "int",
      "doc": "The number of milliseconds the request was in flight from the server's perspective. This value is measured from the time your request is received to the time that the last byte of the response is sent."
    }, {
      "name": "turn_around_time",
      "type": "int",
      "doc": "The number of milliseconds that Amazon S3 spent processing your request. This value is measured from the time the last byte of your request was received until the time the first byte of the response was sent."
    }, {
      "name": "referrer",
      "type": ["null", "string"],
      "doc": "The value of the HTTP Referrer header, if present. HTTP user-agents (e.g. browsers) typically set this header to the URL of the linking or embedding page when making a request."
    }, {
      "name": "user_agent",
      "type": ["null", "string"],
      "doc": "The value of the HTTP User-Agent header."
    }, {
      "name": "version_id",
      "type": ["null", "string"],
      "doc": "The version ID in the request, or - if the operation does not take a versionId parameter."
    }
  ],
  "doc": "Schema for Amazon log format - http://docs.aws.amazon.com/AmazonS3/latest/dev/LogFormat.html"
}

First and foremost, Avro does not allow “-” in field names, so we have to use “_” instead. Since Amazon allows null values for certain fields, we need to reflect that in our schema. Defining nullable fields is easy: we just use an array of possible types for the type, as in the schema above. There is no strong support for dates yet, so we are going to store the time as a string for now. Once we have the schema we can use it to create an Avro file.
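The replace-field-names helper used in the REPL session is not shown in this article. A minimal sketch of the underlying idea, turning the underscore names of the schema into the dashed keywords used on the Clojure side, might look like this (both function names are hypothetical):

```clojure
(require '[clojure.string :as string])

(defn field-name->keyword
  "Turns an Avro field name such as \"turn_around_time\" into :turn-around-time."
  [field-name]
  (keyword (string/replace field-name "_" "-")))

(defn field-names->keywords
  "Applies field-name->keyword to every name, keeping the schema order."
  [field-names]
  (mapv field-name->keyword field-names))

; (field-names->keywords ["bucket_owner" "bucket" "http_status"])
```

Keeping the order of the schema fields matters when the keywords are later paired with the groups captured by the regular expression.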

(def schema-file                      "schema/amazon-log.avsc")
(def s3-log-avro-schema-json          (json/parse-stream (io/reader schema-file)))
(def s3-log-avro-schema-fields        (get-avro-schema-fields s3-log-avro-schema-json))
(def s3-log-avro-schema-fields-dash   (replace-field-names s3-log-avro-schema-fields "_" "-"))
(def s3-log-avro-schema               (avro-schema schema-file))
(def int-fields                        #{:turn-around-time :http-status :total-time :bytes-sent :object-size})

s3-logrotate.core=> (def avr-file (avro/data-file-writer "deflate" s3-log-avro-schema (str "data/" "tt.avro")))
#'s3-logrotate.core/avr-file
s3-logrotate.core=> (for [_ (range 10)] (.append avr-file '{
:request-uri "PUT /www.streambrightdata.com/logs/2015-12-08-04-42-35-C1C3217A278399FA HTTP/1.1",
:request-id "BDE6E681EDC7FDB0", :user-agent "aws-internal/3", :remote-ip "10.194.229.49",
:key "logs/2015-12-08-04-42-35-C1C3217A278399FA", :version-id nil, :time "08/Dec/2015:04:42:35 +0000",
:operation "REST.PUT.OBJECT", :object-size 398, :error-code nil, :bytes-sent 0, :referrer nil,
:requester "3272ee65a9", :http-status 200, :turn-around-time 20, :total-time 36,
:bucket "www.streambrightdata.com", :bucket-owner "f2b98d9dd4d"
}))
s3-logrotate.core=> (.close avr-file)
nil
s3-logrotate.core=> (count (with-open [adf (avro/data-file-reader "data/tt.avro")] (doall (seq adf))))
10

Summarising

As you can see, Clojure provides pretty good tools for working with Amazon S3 and Avro files. The codebase is pretty small (559 LOC) and it already does a lot. In the next articles in the series I am going to make it asynchronous and faster with core.async (channels) and finish the code that uploads the converted files to S3 afterwards. Even though I have already processed two months’ worth of logs with s3-logrotate and it works reasonably well, it is just a prototype at this stage. I am going to improve it during the upcoming months.

The full code is available here, check it out:

https://github.com/StreamBright/s3-logrotate