# run_lint.py
import yaml
from argparse import ArgumentParser
import subprocess
from pathlib import Path
import re
import sys
from collections import defaultdict
def parse_argument(argv=None):
    """Parse the command line and make sure the log directory exists.

    Args:
        argv: Optional list of argument strings. When ``None`` (the default)
            argparse reads ``sys.argv[1:]``.

    Returns:
        The parsed ``argparse.Namespace`` with ``spec`` (str or None),
        ``targets`` (non-empty list of str) and ``logdir`` (``Path``).
    """
    # The text is a description; passed positionally it would become ``prog``
    # and show up as the program name in the usage line.
    parser = ArgumentParser(description="ansible lint runner with customized spec")
    parser.add_argument('--spec', type=str, nargs='?', help="yaml file which defines warning and error rules")
    parser.add_argument('targets', type=str, nargs='+', help="role or task to lint")
    parser.add_argument('--logdir', type=Path, default=Path('./ansiblelint/logdir'), help='log directory default to ./ansiblelint/logdir')

    args = parser.parse_args(argv)

    # The default log directory is nested, so missing parents must be created
    # as well; otherwise a fresh checkout fails with FileNotFoundError.
    args.logdir.mkdir(parents=True, exist_ok=True)
    return args
# One ``ansible-lint --parseable-severity`` violation line:
#   <file>.yml|.yaml:<line>: [<rule>] [<severity>] <description>
_RULE_LINE_RE = re.compile(r'(.*\.ya?ml):([0-9]+): \[(.*)\] \[(.*)\] (.*$)')
def parse_rule_output(line):
    """Split one parseable ansible-lint output line into its fields.

    Args:
        line: A single line of lint output, without the trailing newline.

    Returns:
        A 5-tuple of strings ``(filepath, line, rule, severity, rule_desc)``,
        or ``None`` when the line is not a rule violation (for example a blank
        line or a warning message).
    """
    matched = _RULE_LINE_RE.match(line)
    if matched is None:
        return None
    return matched.groups()
def group_by(output, idx):
    """Bucket records by the value found at position ``idx``.

    Args:
        output: Iterable of indexable records (e.g. tuples).
        idx: Index inside each record whose value is used as the group key.

    Returns:
        A ``defaultdict(list)`` mapping each key to the records carrying it,
        in their original order.
    """
    buckets = defaultdict(list)
    keyed_records = ((record[idx], record) for record in output)
    for key, record in keyed_records:
        buckets[key].append(record)
    return buckets
# Base ansible-lint command line; the trailing space lets further options be
# appended directly to the string.
cmd_template = "ansible-lint --parseable-severity --nocolor "
# Module-level store of parsed lint results, filled by main() with one entry
# per spec key. No default_factory is given, so missing keys still raise
# KeyError like a plain dict.
outputs = defaultdict()
def _run_ansible_lint(extra_args, targets, logfile):
    """Run ansible-lint once, save its raw output and return the parsed rules.

    Args:
        extra_args: Extra command-line options (list of str) placed before
            the targets.
        targets: Roles/playbooks to lint (list of str).
        logfile: ``Path`` that receives the combined stdout/stderr verbatim.

    Returns:
        A list of parsed rule tuples, one per violation line in the output.
    """
    # An argv list (no shell) keeps targets with spaces or shell
    # metacharacters from being re-interpreted.
    cmd = cmd_template.split() + list(extra_args) + list(targets)
    # ansible-lint's exit status is deliberately not checked; the caller
    # judges the run by the severities found in the output.
    completed = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    logfile.write_bytes(completed.stdout)

    records = []
    for line in completed.stdout.decode().splitlines():
        # Lines that are not rule violations (blank lines, warnings merged in
        # from stderr) do not parse; skip them instead of aborting the run.
        try:
            record = parse_rule_output(line)
        except AttributeError:
            continue
        if record is not None:
            records.append(record)
    return records
def main():
    """Entry point: lint the targets and write the results to the log dir.

    With ``--spec`` the YAML file is read as a mapping of name -> list of
    tags; ansible-lint is run once per name restricted to those tags, the raw
    output is saved to ``<logdir>/<name>`` and the parsed rules are stored in
    the module-level ``outputs``.

    Without ``--spec`` ansible-lint is run once, the raw output is saved to
    ``<logdir>/logfile``, and one tab-separated report per severity is written
    to ``<logdir>/<severity>``. The process then exits with status 1 if any
    ``HIGH`` or ``VERY_HIGH`` rule was reported, 0 otherwise.
    """
    args = parse_argument()

    if args.spec is not None:
        with open(args.spec, 'r') as f:
            # safe_load: the spec only needs plain mappings/lists, and
            # yaml.load without an explicit Loader is unsafe (and rejected by
            # current PyYAML releases). An empty file yields None.
            spec = yaml.safe_load(f) or {}
        # NOTE(review): this branch only collects results in ``outputs``; no
        # per-severity report is written and the exit status is not set.
        # Confirm whether that is intended.
        for name, tags in spec.items():
            tag_args = []
            for tag in tags or ():
                tag_args += ['-t', str(tag)]
            outputs[name] = _run_ansible_lint(
                tag_args, args.targets, args.logdir.joinpath(name))
        return

    records = _run_ansible_lint([], args.targets, args.logdir.joinpath('logfile'))

    # Group by severity (field 3 of each parsed rule tuple).
    by_severity = group_by(records, 3)

    exit_code = 0
    for severity, rules in by_severity.items():
        # Every group holds at least one rule, so the presence of the key is
        # enough to fail the run.
        if severity in ('HIGH', 'VERY_HIGH'):
            exit_code = 1
        current_log = args.logdir.joinpath(severity).resolve()

        with current_log.open(mode='w') as f:
            f.writelines(['filepath\tline\trule\tserverity\trule description\n'])
            f.writelines(['\t'.join(rule) + '\n' for rule in rules])
    sys.exit(exit_code)
if __name__ == "__main__":
    main()