Mypal/security/nss/mach

601 lines
21 KiB
Python

#!/usr/bin/env python
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
##########################################################################
#
# This is a collection of helper tools to get stuff done in NSS.
#
import sys
import argparse
import fnmatch
import io
import subprocess
import os
import platform
import shutil
import tarfile
import tempfile
from hashlib import sha256
DEVNULL = open(os.devnull, 'wb')
cwd = os.path.dirname(os.path.abspath(__file__))
def run_tests(test, cycles="standard", env={}, silent=False):
domsuf = os.getenv('DOMSUF', "localdomain")
host = os.getenv('HOST', "localhost")
env = env.copy()
env.update({
"NSS_TESTS": test,
"NSS_CYCLES": cycles,
"DOMSUF": domsuf,
"HOST": host
})
os_env = os.environ
os_env.update(env)
command = cwd + "/tests/all.sh"
stdout = stderr = DEVNULL if silent else None
subprocess.check_call(command, env=os_env, stdout=stdout, stderr=stderr)
class coverityAction(argparse.Action):
def get_coverity_remote_cfg(self):
secret_name = 'project/relman/coverity-nss'
secrets_url = 'http://taskcluster/secrets/v1/secret/{}'.format(secret_name)
print('Using symbol upload token from the secrets service: "{}"'.
format(secrets_url))
import requests
res = requests.get(secrets_url)
res.raise_for_status()
secret = res.json()
cov_config = secret['secret'] if 'secret' in secret else None
if cov_config is None:
print('Ill formatted secret for Coverity. Aborting analysis.')
return None
return cov_config
def get_coverity_local_cfg(self, path):
try:
import yaml
file_handler = open(path)
config = yaml.safe_load(file_handler)
except Exception:
print('Unable to load coverity config from {}'.format(path))
return None
return config
def get_cov_config(self, path):
cov_config = None
if self.local_config:
cov_config = self.get_coverity_local_cfg(path)
else:
cov_config = self.get_coverity_remote_cfg()
if cov_config is None:
print('Unable to load Coverity config.')
return 1
self.cov_analysis_url = cov_config.get('package_url')
self.cov_package_name = cov_config.get('package_name')
self.cov_url = cov_config.get('server_url')
self.cov_port = cov_config.get('server_port')
self.cov_auth = cov_config.get('auth_key')
self.cov_package_ver = cov_config.get('package_ver')
self.cov_full_stack = cov_config.get('full_stack', False)
return 0
def download_coverity(self):
if self.cov_url is None or self.cov_port is None or self.cov_analysis_url is None or self.cov_auth is None:
print('Missing Coverity config options!')
return 1
COVERITY_CONFIG = '''
{
"type": "Coverity configuration",
"format_version": 1,
"settings": {
"server": {
"host": "%s",
"port": %s,
"ssl" : true,
"on_new_cert" : "trust",
"auth_key_file": "%s"
},
"stream": "NSS",
"cov_run_desktop": {
"build_cmd": ["%s"],
"clean_cmd": ["%s", "-cc"],
}
}
}
'''
# Generate the coverity.conf and auth files
build_cmd = os.path.join(cwd, 'build.sh')
cov_auth_path = os.path.join(self.cov_state_path, 'auth')
cov_setup_path = os.path.join(self.cov_state_path, 'coverity.conf')
cov_conf = COVERITY_CONFIG % (self.cov_url, self.cov_port, cov_auth_path, build_cmd, build_cmd)
def download(artifact_url, target):
import requests
resp = requests.get(artifact_url, verify=False, stream=True)
resp.raise_for_status()
# Extract archive into destination
with tarfile.open(fileobj=io.BytesIO(resp.content)) as tar:
tar.extractall(target)
download(self.cov_analysis_url, self.cov_state_path)
with open(cov_auth_path, 'w') as f:
f.write(self.cov_auth)
# Modify it's permission to 600
os.chmod(cov_auth_path, 0o600)
with open(cov_setup_path, 'a') as f:
f.write(cov_conf)
def setup_coverity(self, config_path, storage_path=None, force_download=True):
rc = self.get_cov_config(config_path)
if rc != 0:
return rc
if storage_path is None:
# If storage_path is None we set the context of the coverity into the cwd.
storage_path = cwd
self.cov_state_path = os.path.join(storage_path, "coverity")
if force_download is True or not os.path.exists(self.cov_state_path):
shutil.rmtree(self.cov_state_path, ignore_errors=True)
os.mkdir(self.cov_state_path)
# Download everything that we need for Coverity from out private instance
self.download_coverity()
self.cov_path = os.path.join(self.cov_state_path, self.cov_package_name)
self.cov_run_desktop = os.path.join(self.cov_path, 'bin', 'cov-run-desktop')
self.cov_translate = os.path.join(self.cov_path, 'bin', 'cov-translate')
self.cov_configure = os.path.join(self.cov_path, 'bin', 'cov-configure')
self.cov_work_path = os.path.join(self.cov_state_path, 'data-coverity')
self.cov_idir_path = os.path.join(self.cov_work_path, self.cov_package_ver, 'idir')
if not os.path.exists(self.cov_path) or \
not os.path.exists(self.cov_run_desktop) or \
not os.path.exists(self.cov_translate) or \
not os.path.exists(self.cov_configure):
print('Missing Coverity in {}'.format(self.cov_path))
return 1
return 0
def run_process(self, args, cwd=cwd):
proc = subprocess.Popen(args, cwd=cwd)
status = None
while status is None:
try:
status = proc.wait()
except KeyboardInterrupt:
pass
return status
def cov_is_file_in_source(self, abs_path):
if os.path.islink(abs_path):
abs_path = os.path.realpath(abs_path)
return abs_path
def dump_cov_artifact(self, cov_results, source, output):
import json
def relpath(path):
'''Build path relative to repository root'''
if path.startswith(cwd):
return os.path.relpath(path, cwd)
return path
# Parse Coverity json into structured issues
with open(cov_results) as f:
result = json.load(f)
# Parse the issues to a standard json format
issues_dict = {'files': {}}
files_list = issues_dict['files']
def build_element(issue):
# We look only for main event
event_path = next((event for event in issue['events'] if event['main'] is True), None)
dict_issue = {
'line': issue['mainEventLineNumber'],
'flag': issue['checkerName'],
'message': event_path['eventDescription'],
'extra': {
'category': issue['checkerProperties']['category'],
'stateOnServer': issue['stateOnServer'],
'stack': []
}
}
# Embed all events into extra message
for event in issue['events']:
dict_issue['extra']['stack'].append({'file_path': relpath(event['strippedFilePathname']),
'line_number': event['lineNumber'],
'path_type': event['eventTag'],
'description': event['eventDescription']})
return dict_issue
for issue in result['issues']:
path = self.cov_is_file_in_source(issue['strippedMainEventFilePathname'])
if path is None:
# Since we skip a result we should log it
print('Skipping CID: {0} from file: {1} since it\'s not related with the current patch.'.format(
issue['stateOnServer']['cid'], issue['strippedMainEventFilePathname']))
continue
# If path does not start with `cwd` skip it
if not path.startswith(cwd):
continue
path = relpath(path)
if path in files_list:
files_list[path]['warnings'].append(build_element(issue))
else:
files_list[path] = {'warnings': [build_element(issue)]}
with open(output, 'w') as f:
json.dump(issues_dict, f)
def mutate_paths(self, paths):
for index in xrange(len(paths)):
paths[index] = os.path.abspath(paths[index])
def __call__(self, parser, args, paths, option_string=None):
self.local_config = True
config_path = args.config
storage_path = args.storage
have_paths = True
if len(paths) == 0:
have_paths = False
print('No files have been specified for analysis, running Coverity on the entire project.')
self.mutate_paths(paths)
if config_path is None:
self.local_config = False
print('No coverity config path has been specified, so running in automation.')
if 'NSS_AUTOMATION' not in os.environ:
print('Coverity based static-analysis cannot be ran outside automation.')
return 1
rc = self.setup_coverity(config_path, storage_path, args.force)
if rc != 0:
return 1
# First run cov-run-desktop --setup in order to setup the analysis env
cmd = [self.cov_run_desktop, '--setup']
print('Running {} --setup'.format(self.cov_run_desktop))
rc = self.run_process(args=cmd, cwd=self.cov_path)
if rc != 0:
print('Running {} --setup failed!'.format(self.cov_run_desktop))
return rc
cov_result = os.path.join(self.cov_state_path, 'cov-results.json')
# Once the capture is performed we need to do the actual Coverity Desktop analysis
if have_paths:
cmd = [self.cov_run_desktop, '--json-output-v6', cov_result] + paths
else:
cmd = [self.cov_run_desktop, '--json-output-v6', cov_result, '--analyze-captured-source']
print('Running Coverity Analysis for {}'.format(cmd))
rc = self.run_process(cmd, cwd=self.cov_state_path)
if rc != 0:
print('Coverity Analysis failed!')
# On automation, like try, we want to build an artifact with the results.
if 'NSS_AUTOMATION' in os.environ:
self.dump_cov_artifact(cov_result, cov_result, "/home/worker/nss/coverity/coverity.json")
class cfAction(argparse.Action):
docker_command = None
restorecon = None
def __call__(self, parser, args, values, option_string=None):
self.setDockerCommand(args)
if values:
files = [os.path.relpath(os.path.abspath(x), start=cwd) for x in values]
else:
files = self.modifiedFiles()
# First check if we can run docker.
try:
with open(os.devnull, "w") as f:
subprocess.check_call(
self.docker_command + ["images"], stdout=f)
except:
self.docker_command = None
if self.docker_command is None:
print("warning: running clang-format directly, which isn't guaranteed to be correct")
command = [cwd + "/automation/clang-format/run_clang_format.sh"] + files
repr(command)
subprocess.call(command)
return
files = [os.path.join('/home/worker/nss', x) for x in files]
docker_image = 'clang-format-service:latest'
cf_docker_folder = cwd + "/automation/clang-format"
# Build the image if necessary.
if self.filesChanged(cf_docker_folder):
self.buildImage(docker_image, cf_docker_folder)
# Check if we have the docker image.
try:
command = self.docker_command + [
"image", "inspect", "clang-format-service:latest"
]
with open(os.devnull, "w") as f:
subprocess.check_call(command, stdout=f)
except:
print("I have to build the docker image first.")
self.buildImage(docker_image, cf_docker_folder)
command = self.docker_command + [
'run', '-v', cwd + ':/home/worker/nss:Z', '--rm', '-ti', docker_image
]
# The clang format script returns 1 if something's to do. We don't
# care.
subprocess.call(command + files)
if self.restorecon is not None:
subprocess.call([self.restorecon, '-R', cwd])
def filesChanged(self, path):
hash = sha256()
for dirname, dirnames, files in os.walk(path):
for file in files:
with open(os.path.join(dirname, file), "rb") as f:
hash.update(f.read())
chk_file = cwd + "/.chk"
old_chk = ""
new_chk = hash.hexdigest()
if os.path.exists(chk_file):
with open(chk_file) as f:
old_chk = f.readline()
if old_chk != new_chk:
with open(chk_file, "w+") as f:
f.write(new_chk)
return True
return False
def buildImage(self, docker_image, cf_docker_folder):
command = self.docker_command + [
"build", "-t", docker_image, cf_docker_folder
]
subprocess.check_call(command)
return
def setDockerCommand(self, args):
from distutils.spawn import find_executable
if platform.system() == "Linux":
self.restorecon = find_executable("restorecon")
dcmd = find_executable("docker")
if dcmd is not None:
self.docker_command = [dcmd]
if not args.noroot:
self.docker_command = ["sudo"] + self.docker_command
else:
self.docker_command = None
def modifiedFiles(self):
files = []
if os.path.exists(os.path.join(cwd, '.hg')):
st = subprocess.Popen(['hg', 'status', '-m', '-a'],
cwd=cwd, stdout=subprocess.PIPE, universal_newlines=True)
for line in iter(st.stdout.readline, ''):
files += [line[2:].rstrip()]
elif os.path.exists(os.path.join(cwd, '.git')):
st = subprocess.Popen(['git', 'status', '--porcelain'],
cwd=cwd, stdout=subprocess.PIPE)
for line in iter(st.stdout.readline, ''):
if line[1] == 'M' or line[1] != 'D' and \
(line[0] == 'M' or line[0] == 'A' or
line[0] == 'C' or line[0] == 'U'):
files += [line[3:].rstrip()]
elif line[0] == 'R':
files += [line[line.index(' -> ', beg=4) + 4:]]
else:
print('Warning: neither mercurial nor git detected!')
def isFormatted(x):
return x[-2:] == '.c' or x[-3:] == '.cc' or x[-2:] == '.h'
return [x for x in files if isFormatted(x)]
class buildAction(argparse.Action):
def __call__(self, parser, args, values, option_string=None):
subprocess.check_call([cwd + "/build.sh"] + values)
class testAction(argparse.Action):
def __call__(self, parser, args, values, option_string=None):
run_tests(values)
class covAction(argparse.Action):
def runSslGtests(self, outdir):
env = {
"GTESTFILTER": "*", # Prevent parallel test runs.
"ASAN_OPTIONS": "coverage=1:coverage_dir=" + outdir,
"NSS_DEFAULT_DB_TYPE": "dbm"
}
run_tests("ssl_gtests", env=env, silent=True)
def findSanCovFile(self, outdir):
for file in os.listdir(outdir):
if fnmatch.fnmatch(file, 'ssl_gtest.*.sancov'):
return os.path.join(outdir, file)
return None
def __call__(self, parser, args, values, option_string=None):
outdir = args.outdir
print("Output directory: " + outdir)
print("\nBuild with coverage sanitizers...\n")
sancov_args = "edge,no-prune,trace-pc-guard,trace-cmp"
subprocess.check_call([
os.path.join(cwd, "build.sh"), "-c", "--clang", "--asan", "--enable-legacy-db",
"--sancov=" + sancov_args
])
print("\nRun ssl_gtests to get a coverage report...")
self.runSslGtests(outdir)
print("Done.")
sancov_file = self.findSanCovFile(outdir)
if not sancov_file:
print("Couldn't find .sancov file.")
sys.exit(1)
symcov_file = os.path.join(outdir, "ssl_gtest.symcov")
out = open(symcov_file, 'wb')
# Don't exit immediately on error
symbol_retcode = subprocess.call([
"sancov",
"-blacklist=" + os.path.join(cwd, ".sancov-blacklist"),
"-symbolize", sancov_file,
os.path.join(cwd, "../dist/Debug/bin/ssl_gtest")
], stdout=out)
out.close()
print("\nCopying ssl_gtests to artifacts...")
shutil.copyfile(os.path.join(cwd, "../dist/Debug/bin/ssl_gtest"),
os.path.join(outdir, "ssl_gtest"))
print("\nCoverage report: " + symcov_file)
if symbol_retcode > 0:
print("sancov failed to symbolize with return code {}".format(symbol_retcode))
sys.exit(symbol_retcode)
class commandsAction(argparse.Action):
commands = []
def __call__(self, parser, args, values, option_string=None):
for c in commandsAction.commands:
print(c)
def parse_arguments():
parser = argparse.ArgumentParser(
description='NSS helper script. ' +
'Make sure to separate sub-command arguments with --.')
subparsers = parser.add_subparsers()
parser_build = subparsers.add_parser(
'build', help='All arguments are passed to build.sh')
parser_build.add_argument(
'build_args', nargs='*', help="build arguments", action=buildAction)
parser_cf = subparsers.add_parser(
'clang-format',
help="""
Run clang-format.
By default this runs against any files that you have modified. If
there are no modified files, it checks everything.
""")
parser_cf.add_argument(
'--noroot',
help='On linux, suppress the use of \'sudo\' for running docker.',
action='store_true')
parser_cf.add_argument(
'<file/dir>',
nargs='*',
help="Specify files or directories to run clang-format on",
action=cfAction)
parser_sa = subparsers.add_parser(
'static-analysis',
help="""
Run static-analysis tools based on coverity.
By default this runs only on automation and provides a list of issues that
are only present locally.
""")
parser_sa.add_argument(
'--config', help='Path to Coverity config file. Only used for local runs.',
default=None)
parser_sa.add_argument(
'--storage', help="""
Path where to store Coverity binaries and results. If none, the base repository will be used.
""",
default=None)
parser_sa.add_argument(
'--force', help='Force the re-download of the coverity artefact.',
action='store_true')
parser_sa.add_argument(
'<file>',
nargs='*',
help="Specify files to run Coverity on. If no files are specified the analysis will check the entire project.",
action=coverityAction)
parser_test = subparsers.add_parser(
'tests', help='Run tests through tests/all.sh.')
tests = [
"cipher", "lowhash", "chains", "cert", "dbtests", "tools", "fips",
"sdr", "crmf", "smime", "ssl", "ocsp", "merge", "pkits", "ec",
"gtests", "ssl_gtests", "bogo", "interop", "policy"
]
parser_test.add_argument(
'test', choices=tests, help="Available tests", action=testAction)
parser_cov = subparsers.add_parser(
'coverage', help='Generate coverage report')
cov_modules = ["ssl_gtests"]
parser_cov.add_argument(
'--outdir', help='Output directory for coverage report data.',
default=tempfile.mkdtemp())
parser_cov.add_argument(
'module', choices=cov_modules, help="Available coverage modules",
action=covAction)
parser_commands = subparsers.add_parser(
'mach-completion',
help="list commands")
parser_commands.add_argument(
'mach-completion',
nargs='*',
action=commandsAction)
commandsAction.commands = [c for c in subparsers.choices]
return parser.parse_args()
def main():
parse_arguments()
if __name__ == '__main__':
main()