summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'tools/cron.py')
-rwxr-xr-xtools/cron.py405
1 files changed, 405 insertions, 0 deletions
diff --git a/tools/cron.py b/tools/cron.py
new file mode 100755
index 0000000..01e04af
--- /dev/null
+++ b/tools/cron.py
@@ -0,0 +1,405 @@
+#!/usr/bin/env python
+# kernel-check -- Kernel security information
+# Copyright 2009-2009 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+from contextlib import closing
+import xml.etree.cElementTree as et
+import cStringIO
+import datetime
+import logging
+import mmap
+import os
+import portage
+import re
+import sys
+import time
+import urllib
+
+
class CronError(Exception):
    """Error raised when a single bug cannot be processed.

    The human readable reason is stored in the ``value`` attribute, which
    is what the main loop writes to the log.
    """

    def __init__(self, value):
        # Initialise the base class explicitly; ``args`` ends up as (value,).
        Exception.__init__(self, value)
        self.value = value
+
# Placeholder identifier for bugs that have no matching CVE entry, and the
# description written next to it in the generated xml file.
NOCVE = 'GENERIC-MAP-NOMATCH'
NOCVEDESC = ('This GENERIC identifier is not specific to any vulnerability. '
             'GENERIC-MAP-NOMATCH is used by products, databases, and '
             'services to specify when a particular vulnerability element '
             'does not map to a corresponding CVE entry.')

# Seconds to sleep after each bug file that was written.
DELAY = 0.2
# When True only the "recent" nvd feed is downloaded, not the yearly files.
SKIP = True
# Range of the yearly nvd files.
MINYEAR = 2002
MAXYEAR = 2020

NVDURL = 'http://nvd.nist.gov/'
BZURL = 'https://bugs.gentoo.org/'

# Values used to build the bugzilla query for kernel bugs.
STATE = ['NEW', 'ASSIGNED', 'REOPENED', 'RESOLVED', 'VERIFIED', 'CLOSED']
RESOLUTION = ['FIXED', 'LATER', 'TEST-REQUEST', 'UPSTREAM', '---']

# Order in which the elements are written to the generated xml files.
BUGORDER = ['bugid', 'reporter', 'reported', 'status', 'arch', 'affected']
CVEORDER = ['cve', 'published', 'desc', 'severity', 'vector', 'score', 'refs']

FILEPATH = os.path.dirname(os.path.realpath(__file__))
PORTDIR = portage.settings['PORTDIR']
# None makes logging.basicConfig() log to the console instead of a file.
LOGFILE = None #os.path.join(FILEPATH, 'cron.log')

# Working and output directories; missing ones are created by main().
DIR = {
    'tmp' : os.path.join(FILEPATH, 'tmp'),
    'out' : os.path.join(PORTDIR, 'metadata', 'kernel'),
    'bug' : os.path.join(FILEPATH, 'tmp', 'bug'),
    'nvd' : os.path.join(FILEPATH, 'tmp', 'nvd')
}

REGEX = {
    # Bug ids in the bugzilla bug list.  At least one digit is required
    # (the former '\d*' also produced empty ids) and the dot is literal.
    'bugzilla' : re.compile(r'(?<=bug\.cgi\?id=)\d+'),
    # A parenthesised CVE group in a bug title: the year and the raw
    # list of numbers that follows it.
    'grp_all' : re.compile(r'(?<=\()[ (]*CVE-(\d{4})'
                           r'([-,(){}|, \d]+)(?=\))'),
    'm_nomatch' : re.compile(r'.*GENERIC-MAP-NOMATCH.*'),
    # The individual four digit numbers inside such a group.
    'grp_split' : re.compile(r'(?<=\D)(\d{4})(?=\D|$)'),
    # One whiteboard entry "[name op version (op version)]" and the rest
    # of the whiteboard string.
    'wb_match' : re.compile(r'\s*\[\s*([^ +<=>]+)\s*(['
                            r'<=>]{1,2})\s*([^ <=>\]]+'
                            r')\s*(?:([<=>]{1,2})\s*(['
                            r'^ \]]+))?\s*\]\s*(.*)'),
    # A version string as accepted inside a whiteboard entry.
    'wb_version' : re.compile(r'^(?:\d{1,2}\.){0,3}\d{1,2}'
                              r'(?:[-_](?:r|rc)?\d{1,2})*$')
}

# Maps a CVE identifier to the id of the bug that claimed it.
CVES = dict()

logging.basicConfig(format='[%(asctime)s] %(levelname)-6s : %(message)s',
                    datefmt='%H:%M:%S', filename=LOGFILE, level=logging.DEBUG)
+
+
def main(argv):
    """Run the complete cron job.

    Downloads the nvd feed(s) and the kernel bug list from bugzilla, then
    fetches every listed bug, combines it with its nvd entries and writes
    one xml file per bug into DIR['out'].  Bugs that cannot be processed
    are logged and skipped.

    argv -- command line arguments (currently unused)
    """

    logging.info('Running cron...')

    # Fall back to MAXYEAR when the clock is outside the supported range.
    current_year = datetime.datetime.now().year
    if current_year < MINYEAR or current_year > MAXYEAR:
        current_year = MAXYEAR

    for path in DIR.values():
        if not os.path.isdir(path):
            os.makedirs(path)

    logging.info('Receiving the latest xml file from the nvd')

    receive_file(DIR['nvd'], [NVDURL, 'download/'], 'nvdcve-recent.xml')

    if not SKIP:
        logging.info('Receiving earlier xml files from the nvd')

        for year in range(MINYEAR, current_year + 1):
            receive_file(DIR['nvd'], [NVDURL, 'download/'],
                         'nvdcve-%s.xml' % str(year))

    logging.info('Receiving the kernel vulnerability list from bugzilla')

    url = [BZURL, 'buglist.cgi?query_format=advanced&component=Kernel']
    url.extend('&bug_status=' + state for state in STATE)
    url.extend('&resolution=' + resolution for resolution in RESOLUTION)
    # receive_file() adds the file name to the address; behind the '#' it
    # is only a fragment and does not alter the query.
    url.append('#')

    receive_file(DIR['tmp'], url, 'bugzilla.xml')

    # A plain read replaces the former mmap: it needs no explicit cleanup
    # and also copes with an empty file.
    filename = os.path.join(DIR['tmp'], 'bugzilla.xml')
    with open(filename, 'r') as buglist_file:
        content = buglist_file.read()

    # Keep the order of appearance but drop empty and repeated ids.
    buglist = list()
    seen = set()
    for bugid in REGEX['bugzilla'].findall(content):
        if bugid and bugid not in seen:
            seen.add(bugid)
            buglist.append(bugid)

    logging.info('Found %i kernel vulnerabilities' % len(buglist))

    logging.info('Creating the nvd dictionary')
    nvd_dict = parse_nvd_dict(DIR['nvd'])

    logging.info('Creating the xml files')

    created_files = 0
    for item in buglist:
        try:
            receive_file(DIR['bug'], [BZURL, 'show_bug.cgi?ctype=xml&id='],
                         item)

            vul = parse_bugzilla_dict(DIR['bug'], item)
            if vul is None:
                # No usable bug file; report it like any other bad bug.
                raise CronError('Bug file could not be read')

            for cve in vul['cvelist']:
                if cve == NOCVE:
                    vul['cves'] = [NOCVE]
                    break #TODO
                if cve not in nvd_dict:
                    raise CronError('No Nvd entry: ' + cve)
                vul['cves'].append(nvd_dict[cve])

            write_cve_file(DIR['out'], vul)
            created_files += 1
            time.sleep(DELAY)

        except CronError as e:
            logging.error('[%s] %s' % (item, e.value))

    logging.info('Created %i xml files' % created_files)
+
+
def receive_file(directory, url, xml_file):
    """Download a file and store it in *directory*.

    directory -- directory the file is written to
    url       -- list of address fragments; they are joined and xml_file
                 is appended to form the complete address.  The list
                 itself is left unmodified.
    xml_file  -- name of the remote file, also used as local file name

    A failed download is logged and not raised; nothing is returned.
    """

    # urlopen moved to urllib.request in Python 3.
    try:
        from urllib.request import urlopen
    except ImportError:
        from urllib import urlopen

    filename = os.path.join(directory, xml_file)
    address = ''.join(url) + xml_file

    try:
        # Read everything before the output file is opened, so a failed
        # download does not truncate a file from an earlier run.
        with closing(urlopen(address)) as resource:
            data = resource.read()

        # Binary mode stores the received bytes verbatim.
        with open(filename, 'wb') as output:
            output.write(data)

    except IOError:
        logging.error('File %s - Download failed!' % filename)
        # The file may not exist, so there is no size to report.
        return

    logging.debug('File %s - %sKB received' %
                  (filename, os.path.getsize(filename) // 1024))
+
+
def parse_nvd_dict(directory):
    """Return a dictionary from the National Vulnerability Database.

    directory -- directory containing the downloaded nvd xml files

    Every file in the directory is parsed.  The result maps the ``name``
    attribute of each entry to a dictionary with the keys cve, published,
    severity, vector, score, desc and refs.  ``desc`` is None when the
    entry has no description; ``refs`` is an element holding the
    references of the entry plus one reference to the Gentoo bugzilla.
    Files that cannot be read or parsed are logged and skipped.
    """

    nvd = dict()

    for nvdfile in os.listdir(directory):
        filename = os.path.join(directory, nvdfile)
        try:
            root = et.parse(filename).getroot()
        except (IOError, SyntaxError):
            # The parse errors of ElementTree derive from SyntaxError.
            logging.warning('File %s - Parsing failed' % filename)
            continue

        # Everything in front of the last three characters of the root
        # tag is used as namespace prefix (presumably the root element is
        # named 'nvd' -- confirm against the feed).
        namespace = root.tag[:-3]

        for tree in root:
            cve = {
                'cve' : tree.get('name'),
                'published' : tree.get('published'),
                'severity' : tree.get('severity'),
                'vector' : tree.get('CVSS_vector'),
                'score' : tree.get('CVSS_score'),
                'desc' : None
            }

            # No trailing slash: newer ElementTree versions read 'x/' as
            # 'x/*' and would search the children of the description.
            desc = tree.find('%sdesc/%sdescript' % (namespace, namespace))
            if desc is not None:
                cve['desc'] = desc.text

            reftree = tree.find(namespace + 'refs')
            if reftree is None:
                # Entry without references: start from an empty list.
                reftree = et.Element('refs')
            else:
                # Drop the namespace so the tags are written unqualified.
                reftree.tag = reftree.tag.replace(namespace, '')
                for elem in reftree.findall('.//*'):
                    elem.tag = elem.tag.replace(namespace, '')

            bugref = et.SubElement(reftree, 'ref')
            bugref.set('source', 'GENTOO')
            bugref.set('url', '%sshow_bug.cgi?id=%s' % (BZURL, cve['cve']))
            bugref.text = 'Gentoo %s' % cve['cve']

            cve['refs'] = reftree

            nvd[cve['cve']] = cve

    return nvd
+
def parse_bugzilla_dict(directory, bugid):
    """Return a dictionary containing information about a bug.

    directory -- directory containing the downloaded bug files
    bugid     -- id of the bug, also the name of its file

    The result has the keys bugid, cvelist, cves (empty list, filled by
    the caller), arch, reporter, reported, status and affected.

    Raises CronError when the bug file cannot be read or parsed, when a
    required field is missing, when a CVE is already claimed by another
    bug or when the whiteboard is empty or invalid.
    """

    filename = os.path.join(directory, bugid)

    try:
        # The bug itself is the first child of the document root.
        root = et.parse(filename).getroot()[0]
    except (IOError, SyntaxError, IndexError):
        # Raised instead of returning None so the caller's CronError
        # handling can log and skip the bug.
        raise CronError('Bug file could not be read')

    summary = root.find('short_desc')
    if summary is None or summary.text is None:
        raise CronError('No Cve')

    # Old bugs use the former candidate prefix.
    string = summary.text.replace('CAN', 'CVE')

    cvelist = list()
    if REGEX['m_nomatch'].search(string):
        cvelist.append(NOCVE)

    for (year, split_cves) in REGEX['grp_all'].findall(string):
        for cve in REGEX['grp_split'].findall(split_cves):
            cvelist.append('CVE-%s-%s' % (year, cve))

    vul = {
        'bugid' : bugid,
        'cvelist' : cvelist,
        'cves' : list(),
    }

    fields = (('arch', 'rep_platform', True),
              ('reporter', 'reporter', True),
              ('reported', 'creation_ts', False),
              ('status', 'bug_status', True))
    for (key, tag, lowercase) in fields:
        node = root.find(tag)
        if node is None or node.text is None:
            raise CronError('Missing field: ' + tag)
        if lowercase:
            vul[key] = node.text.lower()
        else:
            vul[key] = node.text

    # Every CVE may be claimed by one bug only.
    for item in cvelist:
        if item != NOCVE:
            owner = CVES.setdefault(item, bugid)
            if owner != bugid:
                raise CronError('Duplicate: ' + owner)

    wb_node = root.find('status_whiteboard')
    if wb_node is None or wb_node.text is None:
        raise CronError('Empty whiteboard')

    wb = wb_node.text
    vul['affected'] = interval_from_whiteboard(wb)

    if vul['affected'] is None:
        raise CronError('Invalid whiteboard: ' + wb)

    return vul
+
+
def interval_from_whiteboard(whiteboard):
    """Return a list of intervals within a whiteboard string.

    whiteboard -- string of entries like "[linux >= 2.6.18 < 2.6.27.5]"

    Every entry becomes a dictionary with the keys name, lower, upper,
    lower_inc and upper_inc.  A bound that is not given in the entry is
    None, and so is its inclusive flag.  An empty or blank whiteboard
    yields an empty list.  None is returned as soon as an entry, an
    operator or a version is invalid.
    """

    affected = list()

    while whiteboard.strip():
        match = REGEX['wb_match'].match(whiteboard)
        if not match:
            return None

        # The last group is the unparsed rest of the whiteboard.
        name, comp1, vers1, comp2, vers2, whiteboard = match.groups()

        # Start every entry with fresh bounds, otherwise a bound of the
        # previous entry would show up in this one.
        interval = {
            'name' : name,
            'lower' : None,
            'upper' : None,
            'lower_inc' : None,
            'upper_inc' : None
        }

        if comp1 in ('=', '=='):
            if not REGEX['wb_version'].match(vers1):
                return None

            # An exact version is an interval closed on both sides.
            interval['lower'] = interval['upper'] = vers1
            interval['lower_inc'] = interval['upper_inc'] = True
        else:
            for (char, version) in ((comp1, vers1), (comp2, vers2)):
                if char in ('<', '<=', '=<'):
                    interval['upper'] = version
                    interval['upper_inc'] = char != '<'
                elif char in ('>', '>=', '=>'):
                    interval['lower'] = version
                    interval['lower_inc'] = char != '>'
                elif char:
                    # Operators such as '<>' or '>>'.
                    return None

                if version and not REGEX['wb_version'].match(version):
                    return None

        affected.append(interval)

    return affected
+
+
def write_cve_file(directory, vul):
    """Write a bug file containing all important information for kernel-check.

    directory -- directory the file "<bugid>.xml" is written to
    vul       -- bug dictionary with the keys listed in BUGORDER plus
                 'cves', a list whose items are either NOCVE or a
                 dictionary with the keys listed in CVEORDER

    The bug elements are written in BUGORDER, the elements of each CVE in
    CVEORDER.  A CVE value that is missing is written as an empty
    element; missing references are left out.
    """

    filename = os.path.join(directory, vul['bugid'] + '.xml')

    root = et.Element('vulnerability')
    bugroot = et.SubElement(root, 'bug')

    for element in BUGORDER:
        if element != 'affected':
            node = et.SubElement(bugroot, element)
            node.text = vul[element]
            continue

        affectedroot = et.SubElement(bugroot, 'affected')
        for item in vul['affected']:
            intnode = et.SubElement(affectedroot, 'interval')
            intnode.set('source', item['name'])

            # Only the bounds that are set get an element.
            for bound in ('lower', 'upper'):
                if item[bound]:
                    node = et.SubElement(intnode, bound)
                    node.text = item[bound]
                    node.set('inclusive',
                             str(item[bound + '_inc']).lower())

    for cve in vul['cves']:
        cveroot = et.SubElement(root, 'cve')
        if cve == NOCVE:
            node = et.SubElement(cveroot, 'cve')
            node.text = NOCVE
            node = et.SubElement(cveroot, 'desc')
            node.text = NOCVEDESC
            continue

        for element in CVEORDER:
            if element == 'refs':
                # The references are a ready-made element.
                refs = cve.get(element)
                if refs is not None:
                    cveroot.append(refs)
            else:
                node = et.SubElement(cveroot, element)
                # get() tolerates entries without e.g. a description.
                node.text = cve.get(element)

    # Finish the tree before the file is opened.
    __indent__(root)
    doc = et.ElementTree(root)

    # Binary mode, because the document is written already encoded.
    with open(filename, 'wb') as xmlout:
        doc.write(xmlout, encoding='utf-8')
+
+
+def __indent__(node, level=0):
+ 'Indents xml layout for printing'
+
+ i = '\n' + level * ' ' * 4
+ if len(node):
+ if not node.text or not node.text.strip():
+ node.text = i + ' ' * 4
+ if not node.tail or not node.tail.strip():
+ node.tail = i
+ for node in node:
+ __indent__(node, level + 1)
+ if not node.tail or not node.tail.strip():
+ node.tail = i
+ else:
+ if level and (not node.tail or not node.tail.strip()):
+ node.tail = i
+
+
# Run the cron job when the file is executed as a script.
if __name__ == '__main__':
    arguments = sys.argv[1:]
    main(arguments)
+