Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions bedrock_keys_security/commands/scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,15 @@
@click.command()
@click.option('--json', 'output_json', is_flag=True, help='Output results as JSON')
@click.option('--csv', 'csv_file', default=None, metavar='FILE', help='Export results to CSV file')
@click.option('--verbose', '-v', is_flag=True, help='Show detailed per-user information')
@click.pass_context
def scan(ctx, output_json, csv_file):
def scan(ctx, output_json, csv_file, verbose):
"""Scan for phantom IAM users (default command)"""
# Merge: scan-level --verbose enables verbose on the scanner too
if verbose:
ctx.obj.verbose = True
ctx.obj._scanner = None # Reset so scanner picks up new verbose flag

scanner = ctx.obj.scanner

if not output_json:
Expand All @@ -22,6 +28,9 @@ def scan(ctx, output_json, csv_file):
else:
with output.spinner():
phantoms = scanner.find_phantom_users()
click.echo(scanner.generate_table_report(phantoms))
if verbose:
click.echo(scanner.generate_verbose_table_report(phantoms))
else:
click.echo(scanner.generate_table_report(phantoms))
if csv_file:
scanner.generate_csv_report(phantoms, csv_file)
95 changes: 95 additions & 0 deletions bedrock_keys_security/core/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,101 @@ def generate_table_report(self, phantoms: List[Dict]) -> str:

return '\n'.join(lines)

def generate_verbose_table_report(self, phantoms: List[Dict]) -> str:
"""Generate verbose report with detailed per-user information"""
if not phantoms:
return f"\n{output.green('No phantom users found in this account.')}\n"

total = len(phantoms)
active = len([u for u in phantoms if u['status'] == 'ACTIVE'])
orphaned = len([u for u in phantoms if u['status'] == 'ORPHANED'])
at_risk = len([u for u in phantoms if u['status'] == 'AT RISK'])

lines = []
lines.append(f"\n{output.bold(f'Found {total} phantom user(s)')}\n")

for i, user in enumerate(phantoms):
status = output.style_status(user['status'])
lines.append(output.bold('─' * 60))
lines.append(f" {output.bold(output.cyan(user['username']))} [{status}]")
lines.append(output.bold('─' * 60))

# Identity
lines.append(f" User ID: {user['user_id']}")
lines.append(f" ARN: {user['arn']}")
created = user['created']
if hasattr(created, 'strftime'):
lines.append(f" Created: {created.strftime('%Y-%m-%d %H:%M:%S UTC')}")
else:
lines.append(f" Created: {created}")
lines.append(f" Path: {user.get('path', '/')}")

# Bedrock credentials
active_creds = user.get('active_bedrock_credentials', 0)
total_creds = user.get('bedrock_credentials', 0)
cred_color = output.green if active_creds == 0 else output.yellow
lines.append(f"\n Bedrock API Keys: {cred_color(f'{active_creds} active')} / {total_creds} total")

for cred in user.get('credential_details', []):
cred_id = cred.get('ServiceSpecificCredentialId', 'N/A')
cred_status = cred.get('Status', 'N/A')
cred_created = cred.get('CreateDate', '')
if hasattr(cred_created, 'strftime'):
cred_created = cred_created.strftime('%Y-%m-%d %H:%M:%S UTC')
lines.append(f" • {cred_id} status={cred_status} created={cred_created}")

# Access keys
active_ak = user.get('active_access_keys', 0)
total_ak = user.get('access_keys', 0)
ak_color = output.green if active_ak == 0 else output.red
lines.append(f"\n IAM Access Keys: {ak_color(f'{active_ak} active')} / {total_ak} total")

for key_id in user.get('access_key_ids', []):
lines.append(f" • {output.red(key_id)}")

# Policies
attached = user.get('attached_policies', [])
inline = user.get('inline_policies', [])
lines.append(f"\n Policies: {user.get('total_policies', 0)} total")

if attached:
lines.append(" Managed:")
for p in attached:
lines.append(f" • {p}")
if inline:
lines.append(" Inline:")
for p in inline:
lines.append(f" • {p}")
if not attached and not inline:
lines.append(" (none)")

lines.append("")

# Summary
lines.append(output.bold('═' * 60))
lines.append(f"{output.bold('Summary:')}")
lines.append(f" Total phantom users: {output.cyan(str(total))}")
lines.append(f" Active: {output.green(str(active))}")
lines.append(f" Orphaned: {output.yellow(str(orphaned))} (safe to cleanup)")
lines.append(f" At Risk: {output.red(str(at_risk))} (IAM access keys found)")

if at_risk > 0:
lines.append(f"\n{click.style('AT RISK users detected:', fg='red', bold=True)}")
lines.append(output.red("These phantom users have IAM access keys (AKIA...) attached."))
lines.append(output.red("These keys grant bedrock:*, iam:ListRoles, kms:DescribeKey,"))
lines.append(output.red("ec2:Describe* and persist even if the API key is revoked. Investigate:"))
for user in phantoms:
if user['status'] == 'AT RISK':
lines.append(output.red(f" - {user['username']} ({user['active_access_keys']} access keys)"))
lines.append("")

if orphaned > 0:
lines.append(f"\n{output.yellow(f'{orphaned} orphaned phantom users can be cleaned up.')}")
lines.append(output.yellow("Run: bks cleanup --dry-run to preview, or cleanup to delete."))
lines.append("")

return '\n'.join(lines)

def generate_json_report(self, phantoms: List[Dict]) -> str:
"""Generate JSON report"""
for user in phantoms:
Expand Down