diff options
Diffstat (limited to 'tools/cron.py')
-rwxr-xr-x | tools/cron.py | 405 |
1 files changed, 405 insertions, 0 deletions
#!/usr/bin/env python
# kernel-check -- Kernel security information
# Copyright 2009-2009 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2
#
# NOTE: reconstructed from the diff that adds tools/cron.py
# (new file, mode 100755, index 0000000..01e04af).

"""Cron job that builds the per-bug XML files consumed by kernel-check.

The script downloads the NVD CVE feed(s) and the Gentoo Bugzilla list of
kernel bugs, combines both and writes one ``<bugid>.xml`` file per bug into
``DIR['out']``.
"""

from contextlib import closing
import xml.etree.cElementTree as et
import cStringIO  # no longer used below; import retained deliberately
import datetime
import logging
import mmap  # no longer used below; import retained deliberately
import os
import portage
import re
import sys
import time
import urllib


class CronError(Exception):
    """Recoverable, per-bug error; ``value`` holds the message."""

    def __init__(self, value):
        # Initialise the base class as well so str(error) is meaningful.
        Exception.__init__(self, value)
        self.value = value


NOCVE = 'GENERIC-MAP-NOMATCH'
NOCVEDESC = ('This GENERIC identifier is not specific to any vulnerability. '
             'GENERIC-MAP-NOMATCH is used by products, databases, and '
             'services to specify when a particular vulnerability element '
             'does not map to a corresponding CVE entry.')
DELAY = 0.2        # seconds to wait after each per-bug request
SKIP = True        # when True, only the "recent" NVD feed is downloaded
MINYEAR = 2002
MAXYEAR = 2020
NVDURL = 'http://nvd.nist.gov/'
BZURL = 'https://bugs.gentoo.org/'
STATE = ['NEW', 'ASSIGNED', 'REOPENED', 'RESOLVED', 'VERIFIED', 'CLOSED']
RESOLUTION = ['FIXED', 'LATER', 'TEST-REQUEST', 'UPSTREAM', '---']
BUGORDER = ['bugid', 'reporter', 'reported', 'status', 'arch', 'affected']
CVEORDER = ['cve', 'published', 'desc', 'severity', 'vector', 'score', 'refs']
FILEPATH = os.path.dirname(os.path.realpath(__file__))
PORTDIR = portage.settings['PORTDIR']
LOGFILE = None #os.path.join(FILEPATH, 'cron.log')
DIR = {
    'tmp' : os.path.join(FILEPATH, 'tmp'),
    'out' : os.path.join(PORTDIR, 'metadata', 'kernel'),
    'bug' : os.path.join(FILEPATH, 'tmp', 'bug'),
    'nvd' : os.path.join(FILEPATH, 'tmp', 'nvd')
}
REGEX = {
    # bug ids as they appear in "...bug.cgi?id=<digits>" links
    'bugzilla' : re.compile(r'(?<=bug.cgi\?id=)\d*'),
    # "(CVE-<year>-<ids...>)" groups inside a bug summary
    'grp_all' : re.compile(r'(?<=\()[ (]*CVE-(\d{4})'
                           r'([-,(){}|, \d]+)(?=\))'),
    'm_nomatch' : re.compile(r'.*GENERIC-MAP-NOMATCH.*'),
    # the individual four digit ids inside one 'grp_all' group
    'grp_split' : re.compile(r'(?<=\D)(\d{4})(?=\D|$)'),
    # one "[name <op> version (<op> version)]" whiteboard entry + remainder
    'wb_match' : re.compile(r'\s*\[\s*([^ +<=>]+)\s*(['
                            r'<=>]{1,2})\s*([^ <=>\]]+'
                            r')\s*(?:([<=>]{1,2})\s*(['
                            r'^ \]]+))?\s*\]\s*(.*)'),
    'wb_version' : re.compile(r'^(?:\d{1,2}\.){0,3}\d{1,2}'
                              r'(?:[-_](?:r|rc)?\d{1,2})*$')
}
# Maps every CVE id seen so far to the id of the bug that claimed it.
CVES = dict()
logging.basicConfig(format='[%(asctime)s] %(levelname)-6s : %(message)s',
                    datefmt='%H:%M:%S', filename=LOGFILE, level=logging.DEBUG)


def _unique_bug_ids(text):
    """Return the bug ids found in *text*, without blanks or repeats."""

    seen = set()
    ordered = list()
    for bugid in REGEX['bugzilla'].findall(text):
        # The pattern uses ``\d*`` and may therefore yield empty strings.
        if bugid and bugid not in seen:
            seen.add(bugid)
            ordered.append(bugid)
    return ordered


def main(argv):
    """Download all sources and write one xml file per kernel bug.

    *argv* is accepted for the command line entry point but is not used.
    Bugs that cannot be processed raise CronError internally; the error is
    logged and the run continues with the next bug.
    """

    logging.info('Running cron...')

    current_year = datetime.datetime.now().year
    if current_year < MINYEAR or current_year > MAXYEAR:
        current_year = MAXYEAR

    for path in DIR.values():
        if not os.path.isdir(path):
            os.makedirs(path)

    logging.info('Receiving the latest xml file from the nvd')

    receive_file(DIR['nvd'], [NVDURL, 'download/'], 'nvdcve-recent.xml')

    if not SKIP:
        logging.info('Receiving earlier xml files from the nvd')

        for year in range(MINYEAR, current_year + 1):
            receive_file(DIR['nvd'], [NVDURL, 'download/'],
                         'nvdcve-%s.xml' % str(year))

    logging.info('Receiving the kernel vulnerability list from bugzilla')

    url = [BZURL, 'buglist.cgi?query_format=advanced&component=Kernel']
    url.extend('&bug_status=' + item for item in STATE)
    url.extend('&resolution=' + item for item in RESOLUTION)
    # receive_file() appends the file name to the url; behind the '#' it
    # becomes a fragment and does not alter the query.
    url.append('#')

    receive_file(DIR['tmp'], url, 'bugzilla.xml')

    filename = os.path.join(DIR['tmp'], 'bugzilla.xml')
    with open(filename, 'r') as buglist_file:
        buglist = _unique_bug_ids(buglist_file.read())

    logging.info('Found %i kernel vulnerabilities', len(buglist))

    logging.info('Creating the nvd dictionary')
    nvd_dict = parse_nvd_dict(DIR['nvd'])

    logging.info('Creating the xml files')

    created_files = 0
    for item in buglist:
        try:
            receive_file(DIR['bug'], [BZURL, 'show_bug.cgi?ctype=xml&id='],
                         item)

            vul = parse_bugzilla_dict(DIR['bug'], item)

            for cve in vul['cvelist']:
                if cve == NOCVE:
                    # A generic marker replaces every other CVE of the bug.
                    vul['cves'] = [NOCVE]
                    break #TODO
                try:
                    vul['cves'].append(nvd_dict[cve])
                except KeyError:
                    raise CronError('No Nvd entry: ' + cve)

            write_cve_file(DIR['out'], vul)
            created_files += 1

        except CronError as e:
            logging.error('[%s] %s', item, e.value)

        finally:
            # A request was sent whether or not the bug could be processed,
            # so the delay applies to failed bugs as well.
            time.sleep(DELAY)

    logging.info('Created %i xml files', created_files)


def receive_file(directory, url, xml_file):
    """Download ``''.join(url) + xml_file`` to *directory*/*xml_file*.

    *url* is a list of url fragments; it is not modified.  A failed
    download is logged and otherwise ignored, so an older copy of the file
    (if any) is left in place.

    Returns True when the file was downloaded and written, else False.
    """

    filename = os.path.join(directory, xml_file)
    address = ''.join(url) + xml_file

    try:
        # Read everything first so that a failing download does not
        # truncate a previously received file.
        with closing(urllib.urlopen(address)) as resource:
            payload = resource.read()

        with open(filename, 'w') as output:
            output.write(payload)

    except IOError:
        logging.error('File %s - Download failed!', filename)
        return False

    logging.debug('File %s - %sKB received',
                  filename, os.path.getsize(filename) // 1024)
    return True


def parse_nvd_dict(directory):
    """Return a dictionary built from the NVD xml files in *directory*.

    Keys are the ``name`` attributes of the entries (the CVE ids); each
    value is a dictionary with the keys listed in CVEORDER.  ``refs`` is an
    element whose tags have the namespace removed and which carries one
    additional ``ref`` pointing to Gentoo Bugzilla.  Files that cannot be
    read or parsed are logged and skipped.
    """

    nvd = dict()

    for nvdfile in os.listdir(directory):
        filename = os.path.join(directory, nvdfile)
        try:
            root = et.parse(filename).getroot()
        except (IOError, SyntaxError):
            # cElementTree reports malformed xml as (a subclass of)
            # SyntaxError.
            logging.warning('File %s - Not a readable xml file, skipped',
                            filename)
            continue

        # '{uri}' prefix of the root tag, or '' without a namespace.
        namespace = root.tag[:root.tag.rfind('}') + 1]

        for tree in root:
            cve = {
                'cve' : tree.get('name'),
                'published' : tree.get('published'),
                'severity' : tree.get('severity'),
                'vector' : tree.get('CVSS_vector'),
                'score' : tree.get('CVSS_score'),
                # Always present so that write_cve_file() can rely on it.
                'desc' : None
            }

            # No trailing '/': ElementTree versions disagree on its meaning.
            desc = tree.find('%sdesc/%sdescript' % (namespace, namespace))
            if desc is not None:
                cve['desc'] = desc.text

            reftree = tree.find(namespace + 'refs')
            if reftree is None:
                reftree = et.Element('refs')
            else:
                reftree.tag = reftree.tag.replace(namespace, '')
                for elem in reftree.findall('.//*'):
                    elem.tag = elem.tag.replace(namespace, '')

            bugref = et.SubElement(reftree, 'ref')
            bugref.set('source', 'GENTOO')
            bugref.set('url', '%sshow_bug.cgi?id=%s' % (BZURL, cve['cve']))
            bugref.text = 'Gentoo %s' % cve['cve']

            cve['refs'] = reftree

            nvd[cve['cve']] = cve

    return nvd


def _required_text(root, tag):
    """Return the text of child *tag* of *root* or raise CronError."""

    node = root.find(tag)
    if node is None or node.text is None:
        raise CronError('Missing field: ' + tag)
    return node.text


def parse_bugzilla_dict(directory, bugid):
    """Return a dictionary describing the bug stored in *directory*/*bugid*.

    The result has the keys 'bugid', 'cvelist' (CVE ids found in the
    summary), 'cves' (empty list, filled by the caller), 'arch',
    'reporter', 'reported', 'status' and 'affected' (see
    interval_from_whiteboard).

    Raises CronError when the file is unreadable or malformed, when a
    required field is missing, when one of the CVEs is already claimed by
    another bug, or when the whiteboard is empty or invalid.
    """

    filename = os.path.join(directory, bugid)

    try:
        # The first child of the document root is the bug itself.
        root = et.parse(filename).getroot()[0]
    except (IOError, SyntaxError, IndexError):
        raise CronError('Invalid bug file: ' + filename)

    summary_node = root.find('short_desc')
    if summary_node is None or summary_node.text is None:
        raise CronError('No Cve')

    summary = summary_node.text.replace('CAN', 'CVE')

    cvelist = list()
    if REGEX['m_nomatch'].search(summary):
        cvelist = [NOCVE]

    for (year, split_cves) in REGEX['grp_all'].findall(summary):
        for cve in REGEX['grp_split'].findall(split_cves):
            cvelist.append('CVE-%s-%s' % (year, cve))

    vul = {
        'bugid' : bugid,
        'cvelist' : cvelist,
        'cves' : list(),
        'arch' : _required_text(root, 'rep_platform').lower(),
        'reporter' : _required_text(root, 'reporter').lower(),
        'reported' : _required_text(root, 'creation_ts'),
        'status' : _required_text(root, 'bug_status').lower(),
    }

    for item in cvelist:
        if item == NOCVE:
            continue
        # The first bug naming a CVE owns it; a CVE repeated within the
        # same bug is not a duplicate.
        owner = CVES.setdefault(item, bugid)
        if owner != bugid:
            raise CronError('Duplicate: ' + owner)

    wb_node = root.find('status_whiteboard')
    wb = wb_node.text if wb_node is not None else None
    if wb is None:
        raise CronError('Empty whiteboard')

    vul['affected'] = interval_from_whiteboard(wb)
    if vul['affected'] is None:
        raise CronError('Invalid whiteboard: ' + wb)

    return vul


def interval_from_whiteboard(whiteboard):
    """Return the list of intervals described by a whiteboard string.

    Every ``[name <op> version]`` or ``[name <op> version <op> version]``
    entry yields one dictionary with the keys 'name', 'lower', 'upper',
    'lower_inc' and 'upper_inc'; bounds that are not given are None.
    Returns None when the string cannot be parsed or contains an invalid
    operator or version.
    """

    affected = list()

    while whiteboard.strip():
        match = REGEX['wb_match'].match(whiteboard)
        if not match:
            return None

        # The last group is the unparsed remainder of the whiteboard.
        name, comp1, vers1, comp2, vers2, whiteboard = match.groups()

        # Start every entry with open bounds; otherwise the bounds of the
        # previous entry would leak into this one.
        lower = upper = None
        lower_inc = upper_inc = None

        if comp1 in ('=', '=='):
            if not REGEX['wb_version'].match(vers1):
                return None
            lower = upper = vers1
            lower_inc = upper_inc = True
        else:
            for (char, version) in ((comp1, vers1), (comp2, vers2)):

                if char == '<':
                    upper_inc = False
                    upper = version
                elif char in ('<=', '=<'):
                    upper_inc = True
                    upper = version
                elif char == '>':
                    lower_inc = False
                    lower = version
                elif char in ('>=', '=>'):
                    lower_inc = True
                    lower = version
                elif char:
                    return None

                if version and not REGEX['wb_version'].match(version):
                    return None

        affected.append({
            'name' : name,
            'lower' : lower,
            'upper' : upper,
            'lower_inc' : lower_inc,
            'upper_inc' : upper_inc
        })

    return affected


def write_cve_file(directory, vul):
    """Write *directory*/<bugid>.xml with all data kernel-check needs.

    *vul* is a dictionary as returned by parse_bugzilla_dict() whose 'cves'
    list holds NVD entries (see parse_nvd_dict) or the NOCVE marker.
    """

    filename = os.path.join(directory, vul['bugid'] + '.xml')

    root = et.Element('vulnerability')
    bugroot = et.SubElement(root, 'bug')

    for element in BUGORDER:
        if element == 'affected':
            affectedroot = et.SubElement(bugroot, 'affected')
            for item in vul['affected']:
                intnode = et.Element('interval')
                intnode.set('source', item['name'])

                affectedroot.append(intnode)

                # Only bounds that are actually set are written.
                for i in ('lower', 'upper'):
                    if item[i]:
                        node = et.SubElement(intnode, i)
                        node.text = item[i]
                        node.set('inclusive',
                                 str(item[i + '_inc']).lower())
        else:
            node = et.SubElement(bugroot, element)
            node.text = vul[element]

    for cve in vul['cves']:
        cveroot = et.SubElement(root, 'cve')
        if cve == NOCVE:
            node = et.SubElement(cveroot, 'cve')
            node.text = NOCVE
            node = et.SubElement(cveroot, 'desc')
            node.text = NOCVEDESC
        else:
            for element in CVEORDER:
                if element == 'refs':
                    # 'refs' already is an element, not a text value.
                    cveroot.append(cve[element])
                else:
                    node = et.SubElement(cveroot, element)
                    node.text = cve[element]

    with open(filename, 'w') as xmlout:
        __indent__(root)
        doc = et.ElementTree(root)
        doc.write(xmlout, encoding='utf-8')


def __indent__(node, level=0):
    """Indent the xml tree below *node* in place (four spaces per level)."""

    i = '\n' + level * ' ' * 4
    if len(node):
        if not node.text or not node.text.strip():
            node.text = i + ' ' * 4
        if not node.tail or not node.tail.strip():
            node.tail = i
        child = None
        for child in node:
            __indent__(child, level + 1)
        # The last child closes the block, so it is followed by the
        # indentation of its parent.
        if not child.tail or not child.tail.strip():
            child.tail = i
    else:
        if level and (not node.tail or not node.tail.strip()):
            node.tail = i


if __name__ == '__main__':
    main(sys.argv[1:])