# Committed by Andreas Hamacher.
import yaml
from argparse import ArgumentParser
import subprocess
from pathlib import Path
import re
import sys
import os
from collections import defaultdict
def parse_argument(argv=None):
    """Parse the command-line options and make sure the log directory exists.

    Args:
        argv: Optional list of argument strings to parse. ``None`` (the
            default) lets argparse read ``sys.argv[1:]``, which is what
            callers that pass no argument get.

    Returns:
        argparse.Namespace with two attributes:
            targets: list of str given to ``--targets``, or ``None`` when the
                option is absent.
            logdir: ``Path`` of the log directory. It is created (including
                missing parents) before this function returns.
    """
    # Default log directory: "logdir" next to this script.
    default_logdir = Path(__file__).resolve().parent / 'logdir'

    # The text is passed as ``description``; the first positional parameter of
    # ArgumentParser is ``prog`` and would replace the program name in usage.
    parser = ArgumentParser(
        description="ansible lint runner with customized spec")
    parser.add_argument('--targets', type=str, nargs='*',
                        help="path to roles or playbook targets")
    # nargs='?' allows a bare "--logdir"; ``const`` makes that case fall back
    # to the default directory instead of producing None.
    parser.add_argument('--logdir', type=Path, nargs='?',
                        default=default_logdir, const=default_logdir,
                        help='log directory default to ./ansiblelint/logdir')

    args = parser.parse_args(argv)
    # parents=True so a nested, not-yet-existing log path can be used directly.
    args.logdir.mkdir(parents=True, exist_ok=True)
    return args
# One parseable finding line has the layout
#   <path>.yml:<line>: [<rule>] [<severity>] <description>
# (".yaml" is accepted as well as ".yml").
_RULE_LINE_RE = re.compile(r'(.*\.ya?ml):([0-9]+): \[(.*)\] \[(.*)\] (.*$)')


def parse_rule_output(line):
    """Split one lint output line into its fields.

    Args:
        line: A single line of text, without a trailing newline.

    Returns:
        A tuple of five strings
        ``(filepath, line, rule, severity, rule_desc)`` when ``line`` has the
        finding layout, or ``None`` when it does not (for example an empty
        line or a warning message).
    """
    matched = _RULE_LINE_RE.match(line)
    if matched is None:
        return None
    return matched.groups()
def group_by(output, idx):
    """Group records by the value each one holds at position ``idx``.

    Args:
        output: Iterable of indexable records (e.g. tuples).
        idx: Index (or key) used to read the grouping value of a record.

    Returns:
        A ``defaultdict(list)`` mapping every distinct ``record[idx]`` to the
        list of records that carry it, in the order they were seen.
    """
    grouped = defaultdict(list)
    for record in output:
        grouped[record[idx]].append(record)
    return grouped
# Base ansible-lint command line. main() starts from this string and extends
# it with the lint targets.
# NOTE(review): the flags presumably make ansible-lint emit the
# "path:line: [rule] [severity] description" lines that parse_rule_output()
# matches -- confirm against the installed ansible-lint version.
cmd_template = "ansible-lint --parseable-severity --nocolor "
# Module-level mapping created without a default factory; nothing in the
# visible code reads or writes it.
outputs = defaultdict()
def main():
    """Run ansible-lint, write per-severity reports and exit.

    Steps:
      1. Parse the command line and empty the log directory.
      2. Lint the ``--targets`` given on the command line, or, when the option
         is absent, every directory under ``../plays/roles`` relative to this
         script's directory.
      3. Print the combined stdout/stderr of ansible-lint and store it in
         ``<logdir>/logfile``.
      4. Write one tab-separated file per severity into the log directory,
         named after the severity.

    Exits via ``sys.exit`` with status 1 when at least one finding has
    severity ``VERY_HIGH``, otherwise with status 0.
    """
    args = parse_argument()

    # Start from a clean log directory. Sub-directories are left alone:
    # unlink() cannot remove them and would abort the run.
    for item in args.logdir.iterdir():
        if item.is_file() or item.is_symlink():
            item.unlink()

    if args.targets is not None:
        targets = list(args.targets)
    else:
        roles_dir = Path(__file__).resolve().parent.parent / 'plays' / 'roles'
        targets = [str(entry.resolve())
                   for entry in roles_dir.iterdir() if entry.is_dir()]

    # Run without a shell: every target is passed as its own argument, so
    # paths containing spaces or shell metacharacters are handled verbatim.
    # The exit status of ansible-lint is deliberately not checked; findings
    # are evaluated from the parsed output below.
    command = cmd_template.split() + targets
    completed = subprocess.run(command,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT)

    raw_output = completed.stdout
    logfile = args.logdir.joinpath('logfile')
    logfile.write_bytes(raw_output)

    text = raw_output.decode()
    print(text)

    # Keep only the lines the parser recognised as findings; anything for
    # which it returns None is already part of the printed/raw log.
    findings = []
    for line in text.splitlines():
        parsed = parse_rule_output(line)
        if parsed is not None:
            findings.append(parsed)

    # Field 3 of a finding is its severity.
    by_severity = group_by(findings, 3)

    exit_code = 0
    for severity, records in by_severity.items():
        if severity == 'VERY_HIGH':
            exit_code = 1
        report = args.logdir.joinpath(severity).resolve()
        with report.open(mode='w') as f:
            # Header text is kept exactly as it has always been written,
            # since existing consumers of these files may rely on it.
            f.write('filepath\tline\trule\tserverity\trule description\n')
            f.writelines('\t'.join(record) + '\n' for record in records)

    sys.exit(exit_code)
# Run the linter only when executed as a script, not when imported.
if __name__ == "__main__":
    main()