|
| 1 | +#!/usr/bin/env python3 |
| 2 | +import sys |
| 3 | +import os |
| 4 | +import re |
| 5 | + |
| 6 | +# Premium styling terminal colors |
| 7 | +BLUE = '\033[94m' |
| 8 | +GREEN = '\033[92m' |
| 9 | +YELLOW = '\033[93m' |
| 10 | +RED = '\033[91m' |
| 11 | +BOLD = '\033[1m' |
| 12 | +NC = '\033[0m' |
| 13 | + |
| 14 | +# Default CRAP threshold limit |
| 15 | +DEFAULT_CRAP_LIMIT = 30.0 |
| 16 | + |
| 17 | +def calculate_crap(complexity, coverage): |
| 18 | + """ |
| 19 | + CRAP = C^2 * (1 - Cov)^3 + C |
| 20 | + complexity: estimated cyclomatic complexity (int) |
| 21 | + coverage: coverage fraction from 0.0 to 1.0 (float) |
| 22 | + """ |
| 23 | + return (complexity ** 2) * ((1.0 - coverage) ** 3) + complexity |
| 24 | + |
| 25 | +def estimate_complexity(file_path): |
| 26 | + """ |
| 27 | + Estimate cyclomatic complexity by counting control flow decision points. |
| 28 | + Decision points = 1 + counts of (if, for, case, &&, ||, select, catch, while) |
| 29 | + """ |
| 30 | + if not os.path.exists(file_path): |
| 31 | + return 0 |
| 32 | + |
| 33 | + complexity = 1 |
| 34 | + |
| 35 | + # Regular expressions for control flow keywords |
| 36 | + # Matches Go and TS/JS decision structures |
| 37 | + patterns = [ |
| 38 | + r'\bif\b', |
| 39 | + r'\bfor\b', |
| 40 | + r'\bwhile\b', |
| 41 | + r'\bcase\b', |
| 42 | + r'\bselect\b', |
| 43 | + r'\bcatch\b', |
| 44 | + r'&&', |
| 45 | + r'\|\|' |
| 46 | + ] |
| 47 | + |
| 48 | + combined_pattern = re.compile('|'.join(patterns)) |
| 49 | + |
| 50 | + try: |
| 51 | + with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: |
| 52 | + for line in f: |
| 53 | + # Strip comments to prevent false positives in text/docs |
| 54 | + line_stripped = strip_comments(line, file_path) |
| 55 | + matches = combined_pattern.findall(line_stripped) |
| 56 | + complexity += len(matches) |
| 57 | + except Exception as e: |
| 58 | + print(f"{RED}Error reading {file_path}: {e}{NC}") |
| 59 | + |
| 60 | + return complexity |
| 61 | + |
| 62 | +def strip_comments(line, file_path): |
| 63 | + """ |
| 64 | + Remove comments from line based on file type. |
| 65 | + """ |
| 66 | + if file_path.endswith(('.go', '.ts', '.tsx', '.js', '.jsx')): |
| 67 | + # Double slash comments |
| 68 | + idx = line.find('//') |
| 69 | + if idx != -1: |
| 70 | + return line[:idx] |
| 71 | + return line |
| 72 | + |
| 73 | +def parse_go_coverage(cover_file): |
| 74 | + """ |
| 75 | + Parse Go coverage profile (coverage.out) |
| 76 | + Returns a dict mapping filenames to their coverage fraction [0.0 - 1.0]. |
| 77 | + """ |
| 78 | + cov = {} |
| 79 | + if not cover_file or not os.path.exists(cover_file): |
| 80 | + return cov |
| 81 | + |
| 82 | + try: |
| 83 | + with open(cover_file, 'r') as f: |
| 84 | + for line in f: |
| 85 | + # Format: github.com/owner/repo/path/file.go:line.col,line.col statements count |
| 86 | + match = re.match(r'^([^:]+):(\d+)\.\d+,(\d+)\.\d+\s+(\d+)\s+(\d+)', line) |
| 87 | + if match: |
| 88 | + filepath, _, _, statements, count = match.groups() |
| 89 | + statements = int(statements) |
| 90 | + count = int(count) |
| 91 | + |
| 92 | + # Normalize local filename path (strip import path prefix) |
| 93 | + # e.g., github.com/cloud-shuttle/drover/pkg/db/db.go -> pkg/db/db.go |
| 94 | + # We will match by basename/suffix later if needed |
| 95 | + filename = filepath.split('/')[-1] |
| 96 | + |
| 97 | + if filepath not in cov: |
| 98 | + cov[filepath] = {"covered": 0, "total": 0} |
| 99 | + cov[filepath]["total"] += statements |
| 100 | + if count > 0: |
| 101 | + cov[filepath]["covered"] += statements |
| 102 | + except Exception as e: |
| 103 | + print(f"{YELLOW}Warning: Error parsing coverage file {cover_file}: {e}{NC}") |
| 104 | + |
| 105 | + # Convert statement counts to fractions |
| 106 | + result = {} |
| 107 | + for path, stats in cov.items(): |
| 108 | + if stats["total"] > 0: |
| 109 | + result[path] = stats["covered"] / stats["total"] |
| 110 | + else: |
| 111 | + result[path] = 1.0 |
| 112 | + return result |
| 113 | + |
| 114 | +def find_matching_coverage(local_file, coverage_dict): |
| 115 | + """ |
| 116 | + Match a local relative file path to the keys in the coverage dict. |
| 117 | + Go coverage paths use import names, e.g., github.com/cloud-shuttle/drover/pkg/db/db.go |
| 118 | + """ |
| 119 | + for cov_path, fraction in coverage_dict.items(): |
| 120 | + if cov_path.endswith(local_file): |
| 121 | + return fraction |
| 122 | + return 0.0 # Default to 0% coverage if not found in coverage profile |
| 123 | + |
| 124 | +def audit_directory(root_dir, coverage_file=None, crap_limit=DEFAULT_CRAP_LIMIT, file_extensions=('.go', '.ts', '.tsx')): |
| 125 | + """ |
| 126 | + Audit all code files in the directory. |
| 127 | + """ |
| 128 | + print(f"\n{BLUE}Scanning directory: {BOLD}{root_dir}{NC}") |
| 129 | + if coverage_file: |
| 130 | + print(f"{BLUE}Using coverage profile: {BOLD}{coverage_file}{NC}") |
| 131 | + else: |
| 132 | + print(f"{YELLOW}No coverage profile provided. Assuming 0% coverage for CRAP scoring.{NC}") |
| 133 | + |
| 134 | + coverage_dict = parse_go_coverage(coverage_file) |
| 135 | + |
| 136 | + files_audited = 0 |
| 137 | + violations = [] |
| 138 | + audit_results = [] |
| 139 | + |
| 140 | + for dirpath, _, filenames in os.walk(root_dir): |
| 141 | + # Exclude directories |
| 142 | + if any(part in dirpath for part in ['.git', 'node_modules', 'vendor', '.tmp', 'dist', 'testdata', '.drover-code-workers']): |
| 143 | + continue |
| 144 | + |
| 145 | + for filename in filenames: |
| 146 | + if filename.endswith(file_extensions) and not filename.endswith('_test.go'): |
| 147 | + full_path = os.path.join(dirpath, filename) |
| 148 | + rel_path = os.path.relpath(full_path, root_dir) |
| 149 | + |
| 150 | + # Estimate complexity |
| 151 | + complexity = estimate_complexity(full_path) |
| 152 | + |
| 153 | + # Retrieve coverage fraction |
| 154 | + cov_fraction = find_matching_coverage(rel_path, coverage_dict) |
| 155 | + |
| 156 | + # Compute CRAP score |
| 157 | + crap_score = calculate_crap(complexity, cov_fraction) |
| 158 | + |
| 159 | + files_audited += 1 |
| 160 | + |
| 161 | + result = { |
| 162 | + "file": rel_path, |
| 163 | + "complexity": complexity, |
| 164 | + "coverage": cov_fraction * 100.0, |
| 165 | + "crap": crap_score |
| 166 | + } |
| 167 | + audit_results.append(result) |
| 168 | + |
| 169 | + if crap_score > crap_limit: |
| 170 | + violations.append(result) |
| 171 | + |
| 172 | + # Sort results by CRAP score descending |
| 173 | + audit_results.sort(key=lambda x: x["crap"], reverse=True) |
| 174 | + |
| 175 | + # Print Premium Visual Report |
| 176 | + print(f"\n{BOLD}Audited {files_audited} files:{NC}") |
| 177 | + print(f"┌{'─'*60}┬{'─'*12}┬{'─'*10}┬{'─'*12}┬{'─'*10}┐") |
| 178 | + print(f"│ {'File Path':<58} │ {'Complexity':^10} │ {'Coverage':^8} │ {'CRAP Score':^10} │ {'Status':^8} │") |
| 179 | + print(f"├{'─'*60}┼{'─'*12}┼{'─'*10}┼{'─'*12}┼{'─'*10}┤") |
| 180 | + |
| 181 | + # Show top 15 highest risk files or those with violations |
| 182 | + show_limit = max(15, len(violations)) |
| 183 | + for r in audit_results[:show_limit]: |
| 184 | + status_str = f"{RED}{BOLD}FAIL{NC}" if r["crap"] > crap_limit else f"{GREEN}PASS{NC}" |
| 185 | + file_path_short = r["file"] |
| 186 | + if len(file_path_short) > 56: |
| 187 | + file_path_short = "..." + file_path_short[-53:] |
| 188 | + |
| 189 | + print(f"│ {file_path_short:<58} │ {r['complexity']:^10d} │ {r['coverage']:^6.1f}% │ {r['crap']:^10.2f} │ {status_str:^17} │") |
| 190 | + |
| 191 | + print(f"└{'─'*60}┴{'─'*12}┴{'─'*10}┴{'─'*12}┴{'─'*10}┘") |
| 192 | + |
| 193 | + # Print Summary statistics |
| 194 | + print(f"\n{BOLD}Summary Statistics:{NC}") |
| 195 | + print(f" • Total files audited: {files_audited}") |
| 196 | + print(f" • CRAP Limit allowed: {crap_limit:.2f}") |
| 197 | + if len(violations) > 0: |
| 198 | + print(f" • Violations detected: {RED}{BOLD}{len(violations)}{NC}") |
| 199 | + for v in violations: |
| 200 | + print(f" - {RED}{v['file']}{NC} (CRAP: {BOLD}{v['crap']:.2f}{NC}, Complexity: {v['complexity']}, Coverage: {v['coverage']:.1f}%)") |
| 201 | + else: |
| 202 | + print(f" • Violations detected: {GREEN}{BOLD}None{NC} ✨") |
| 203 | + |
| 204 | + return len(violations) |
| 205 | + |
| 206 | +def main(): |
| 207 | + print(f"\n{BLUE}{BOLD}🐂 Drover Platform CI Quality Gate — CRAP Scoring Engine{NC}") |
| 208 | + print("═" * 60) |
| 209 | + |
| 210 | + # Argument parser |
| 211 | + import argparse |
| 212 | + parser = argparse.ArgumentParser(description="Audit Drover files for Cyclomatic Complexity and CRAP scores.") |
| 213 | + parser.add_argument("directory", help="The target repository or package directory to scan.") |
| 214 | + parser.add_argument("--coverage", help="Path to Go coverage.out profile.") |
| 215 | + parser.add_argument("--limit", type=float, default=DEFAULT_CRAP_LIMIT, help=f"CRAP score threshold limit (default: {DEFAULT_CRAP_LIMIT})") |
| 216 | + |
| 217 | + args = parser.parse_args() |
| 218 | + |
| 219 | + if not os.path.exists(args.directory): |
| 220 | + print(f"{RED}Error: Target directory '{args.directory}' does not exist.{NC}") |
| 221 | + sys.exit(2) |
| 222 | + |
| 223 | + violations_count = audit_directory(args.directory, args.coverage, args.limit) |
| 224 | + |
| 225 | + if violations_count > 0: |
| 226 | + print(f"\n{RED}{BOLD}❌ Quality Gate Failed: CRAP violations detected!{NC}") |
| 227 | + print("Please refactor complex functions or increase test coverage to pass the gate.") |
| 228 | + sys.exit(1) |
| 229 | + |
| 230 | + print(f"\n{GREEN}{BOLD}✅ Quality Gate Passed successfully! All files within specifications.{NC}") |
| 231 | + sys.exit(0) |
| 232 | + |
| 233 | +if __name__ == '__main__': |
| 234 | + main() |
0 commit comments