#!/usr/bin/env python
# The MIT License (MIT)
# Copyright (c) 2016 Michael-Keith Bernard
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import re
import copy
import logging
import argparse
import sqlite3
import datetime
import calendar
import itertools

__author__ = "Michael-Keith Bernard (mkbernard@opentable.com)"


def with_match(name, pattern):
    """Wrap *pattern* in a named capture group called *name*.

    The parameter was originally named ``re``, which shadowed the imported
    ``re`` module; renamed to ``pattern`` (all in-file calls are positional).
    """
    return r'(?P<{}>{})'.format(name, pattern)


# Building blocks for the nginx access-log regex assembled in LOG_PARTS below.
IP_RE = r"\d+(?:\.\d+){3}"        # dotted-quad IPv4 (no per-octet range check)
QUOTED_STRING_RE = r'"[^\"]+"'    # double-quoted field, quotes included in the match
TIMESTAMP_RE = r'\[[^\]]+\]'      # e.g. [12/Mar/2016:01:02:03 +0000], brackets included
IP_LIST_RE = r'(?:{ip}(?:\,\s+{ip})*)|-'.format(ip=IP_RE)  # X-Forwarded-For list, or "-"
NUMERAL_RE = r'\d+(?:\.\d+)?'     # non-negative integer or decimal
# Ordered fields of one nginx access-log line; joined into LOG_RE below.
LOG_PARTS = [
    with_match('remote_addr', IP_RE),
    with_match('user', r'- -'),
    with_match('timestamp', TIMESTAMP_RE),
    with_match('request', QUOTED_STRING_RE),
    with_match('response_code', r'\d+'),
    with_match('response_size', r'\d+'),
    with_match('referer', QUOTED_STRING_RE),
    with_match('user_agent', QUOTED_STRING_RE),
    with_match('forwarded_for', IP_LIST_RE),
    with_match('nginx_time', NUMERAL_RE),
    with_match('upstream_time', NUMERAL_RE),
    with_match('pipelined', r'[.p]'),
]
# BUG FIX: the joiner was "\s+" in a non-raw string — "\s" is an invalid
# escape sequence (SyntaxWarning in modern CPython). Use a raw string.
LOG_RE = r"^{}$".format(r"\s+".join(LOG_PARTS))

CREATE_REQUESTS_TABLE = """
create table if not exists requests (
    id integer primary key,
    source text,
    upstream_time float,
    nginx_time float,
    response_size integer,
    response_code integer,
    user_agent text,
    referer text,
    remote_addr text,
    forwarded_for text,
    pipelined boolean,
    timestamp text,
    unix_epoch float,
    method text,
    path text,
    query_string text,
    protocol text
);
"""


def parse_args(args=None, parse=True):
    """Parse command-line arguments.

    Returns (parser, namespace); namespace is None when ``parse`` is False,
    which lets callers obtain the configured parser without parsing.
    """
    parser = argparse.ArgumentParser(
        description="Log processor for Graphite Web nginx")
    parser.add_argument("logfile",
        help="Path to nginx log file")
    parser.add_argument("-d", "--db", default=":memory:",
        help="Path to sqlite3 database")
    parser.add_argument("-s", "--source", default="graphite",
        help="Source of logs (eg which web server)")
    parser.add_argument("-b", "--batch", default=500, type=int,
        help="Batch size for inserts")
    res = parser.parse_args(args) if parse else None
    return parser, res


def migrate_db(db):
    """Run database migrations (create tables, etc)."""
    cur = db.cursor()
    cur.execute(CREATE_REQUESTS_TABLE)
    db.commit()


def setup_db(path, migrations=True):
    """Initialize a sqlite3 connection with Row access; optionally migrate."""
    db = sqlite3.connect(path)
    db.row_factory = sqlite3.Row
    if migrations:
        migrate_db(db)
    return db


def parse_log(log_line):
    """Parse a single log line; return a field dict or None on no match."""
    match = re.match(LOG_RE, log_line.strip())
    return match.groupdict() if match else None


def parse_date(timestamp):
    """Parse a bracketed nginx timestamp into (datetime, unix epoch).

    NOTE(review): the format hard-codes a +0000 offset, so this assumes the
    server logs in UTC — strptime raises ValueError for any other offset.
    """
    fmt = '[%d/%b/%Y:%H:%M:%S +0000]'
    dt = datetime.datetime.strptime(timestamp, fmt)
    return dt, calendar.timegm(dt.timetuple())


def parse_request(request):
    """Parse the request into method, path, query string, and HTTP protocol."""
    req = request[1:-1]  # strip the surrounding double quotes
    method, rest = req.split(" ", 1)
    full_path, protocol = rest.rsplit(" ", 1)
    # partition yields qs == "" when there is no "?" in the path
    path, _, qs = full_path.partition("?")
    return method, path, qs, protocol


def normalize_log(parsed):
    """Convert raw string fields from parse_log into typed values."""
    n = {}
    ts, epoch = parse_date(parsed['timestamp'])
    method, path, qs, protocol = parse_request(parsed['request'])
    n['upstream_time'] = float(parsed['upstream_time'])
    n['nginx_time'] = float(parsed['nginx_time'])
    n['response_size'] = int(parsed['response_size'])
    n['response_code'] = int(parsed['response_code'])
    n['user_agent'] = parsed['user_agent'][1:-1]   # drop surrounding quotes
    n['referer'] = parsed['referer'][1:-1]         # drop surrounding quotes
    n['remote_addr'] = parsed['remote_addr']
    n['forwarded_for'] = parsed['forwarded_for'].split(", ")
    n['pipelined'] = parsed['pipelined'] == 'p'
    n['timestamp'] = ts
    n['unix_epoch'] = epoch
    n['method'] = method
    n['path'] = path
    n['query_string'] = qs
    n['protocol'] = protocol
    return n


def prepare_log(log, **kwargs):
    """Prepare a normalized log for database insertion.

    Serializes datetime/list fields to text and merges extra columns from
    kwargs (e.g. source=...). The input dict is not mutated.
    """
    p = copy.deepcopy(log)
    p['timestamp'] = p['timestamp'].isoformat()
    p['forwarded_for'] = ",".join(p['forwarded_for'])
    p.update(kwargs)
    return p


def insert_log(cur, log):
    """Insert a prepared log line into the database.

    Column names come from internal code, not user input; values are bound
    with "?" placeholders, so no SQL injection through field values.
    """
    keys = list(log)
    values = [log[k] for k in keys]
    sql = """insert into requests ({}) values ({})""".format(
        ", ".join(keys), ", ".join(["?"] * len(keys)))
    cur.execute(sql, values)


def load_db(db, logs, **kwargs):
    """Load (raw, parsed, normalized) triples into the database, one commit."""
    cur = db.cursor()
    for (_raw, _parsed, normalized) in logs:
        prepared = prepare_log(normalized, **kwargs)
        insert_log(cur, prepared)
    db.commit()


def batches(l, length=500):
    """Yield successive lists of at most *length* items from iterable *l*."""
    it = iter(l)
    while True:
        b = list(itertools.islice(it, length))
        if not b:
            break
        yield b


def main():
    """Entry point: parse the log file and batch-insert rows into sqlite."""
    _parser, args = parse_args()
    db = setup_db(args.db)

    def process(f):
        # BUG FIX: the original iterated open(f) without closing it, leaking
        # the file handle; the with-block closes it deterministically.
        with open(f) as fh:
            for line in fh:
                log = line.strip()
                parsed = parse_log(log)
                if not parsed:
                    logging.debug("Invalid log: %s", log)
                    continue
                normalized = normalize_log(parsed)
                yield log, parsed, normalized

    for batch in batches(process(args.logfile), args.batch):
        load_db(db, batch, source=args.source)


if __name__ == "__main__":
    main()