Skip to content
Open
Comment thread
exploreriii marked this conversation as resolved.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Comment thread
prajeeta15 marked this conversation as resolved.
Comment thread
prajeeta15 marked this conversation as resolved.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
120 changes: 120 additions & 0 deletions src/hiero_analytics/analysis/contributor_churn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import pandas as pd
from typing import List, Dict, Any
from hiero_analytics.domain.labels import DIFFICULTY_LEVELS

def compute_progression_stats(df: pd.DataFrame) -> pd.DataFrame:
    """
    Compute contributor-level progression statistics from PR records.

    Rows are deduplicated to one row per (author, PR) so that a PR closing
    several issues is not counted more than once; when a PR appears with
    several levels, the highest-ranked level is kept.

    Args:
        df: PR records with ``author``, ``pr_number``, ``pr_merged_at`` and
            ``level`` columns. ``pr_merged_at`` is assumed to be datetime-like
            (the tenure computation uses ``.dt.days``) -- confirm against the
            caller.

    Returns:
        A DataFrame indexed by ``author`` with the columns ``levels``
        (chronological list of per-PR levels), ``first_seen``, ``last_seen``,
        ``pr_count``, ``max_level``, ``start_level``, ``tenure_days`` and
        ``early_pr_count`` (distinct PRs merged within 30 days of the first
        one). Empty input yields an empty frame with the same columns.
    """
    result_columns = [
        "levels",
        "first_seen",
        "last_seen",
        "pr_count",
        "max_level",
        "start_level",
        "tenure_days",
        "early_pr_count",
    ]
    if df.empty:
        # Keep the schema stable so callers can filter on columns such as
        # "start_level" without hitting a KeyError on empty input.
        return pd.DataFrame(
            columns=result_columns,
            index=pd.Index([], name="author"),
        )

    level_order = {spec.name: i for i, spec in enumerate(DIFFICULTY_LEVELS)}
    level_order["Unknown"] = -1

    def rank_of(level) -> int:
        # Levels missing from the ordering rank below every known level.
        return level_order.get(level, -1)

    # One level per PR: highest difficulty across its closing issues.
    # Sorting by rank last and keeping the last duplicate retains the
    # highest-ranked row, while the frame stays in chronological order.
    pr_level = (
        df.assign(_rank=df["level"].map(rank_of))
        .sort_values(["author", "pr_merged_at", "_rank"])
        .drop_duplicates(subset=["author", "pr_number"], keep="last")
        .drop(columns="_rank")
    )

    # Per-author progression summary.
    progression = pr_level.groupby("author").agg(
        levels=("level", list),
        first_seen=("pr_merged_at", "min"),
        last_seen=("pr_merged_at", "max"),
        pr_count=("pr_number", "nunique"),
    )

    progression["max_level"] = progression["levels"].apply(
        lambda lvls: max(lvls, key=rank_of)
    )
    # pr_level is sorted by merge time, so the first entry is the earliest PR.
    progression["start_level"] = progression["levels"].apply(lambda lvls: lvls[0])
    progression["tenure_days"] = (
        progression["last_seen"] - progression["first_seen"]
    ).dt.days

    # Early activity (first 30 days) is computed separately so predictions can
    # rely on it without leaking information about the eventual outcome.
    early_window = pd.Timedelta(days=30)

    # Attach each author's first_seen to their deduplicated PR rows.
    df_with_start = pr_level.merge(progression[["first_seen"]], on="author")
    in_window = (
        df_with_start["pr_merged_at"] <= df_with_start["first_seen"] + early_window
    )
    early_stats = (
        df_with_start[in_window]
        .groupby("author")
        .agg(early_pr_count=("pr_number", "nunique"))
    )

    progression = progression.join(early_stats)
    # Authors without a row in the window get 0; keep the count integer-typed.
    progression["early_pr_count"] = (
        progression["early_pr_count"].fillna(0).astype(int)
    )

    return progression

def compute_transition_metrics(df: pd.DataFrame) -> pd.DataFrame:
    """
    Compute transition metrics between difficulty levels.

    PRs are deduplicated to one level per (author, PR) -- the highest-ranked
    one -- so that a PR linked to several issues cannot produce spurious
    transitions. Each author's PRs are then walked in merge order and every
    change from one level to a different one is counted.

    Args:
        df: PR records with ``author``, ``pr_number``, ``pr_merged_at`` and
            ``level`` columns.

    Returns:
        A DataFrame with the columns ``from``, ``to`` and ``count``, one row
        per observed (from, to) pair. The frame is empty, with the same
        columns, when the input is empty or no transition occurred.
    """
    result_columns = ["from", "to", "count"]
    if df.empty:
        # Same schema as the "no transitions" case below.
        return pd.DataFrame(columns=result_columns)

    level_order = {spec.name: i for i, spec in enumerate(DIFFICULTY_LEVELS)}
    level_order["Unknown"] = -1

    # Deduplicate to one level per PR (highest difficulty) before walking
    # transitions; the result stays ordered by author and merge time.
    per_pr = (
        df.assign(_rank=df["level"].map(lambda lvl: level_order.get(lvl, -1)))
        .sort_values(["author", "pr_merged_at", "_rank"])
        .drop_duplicates(subset=["author", "pr_number"], keep="last")
        .drop(columns="_rank")
    )

    transitions = []
    for _, group in per_pr.groupby("author"):
        previous = None
        for level in group["level"].tolist():
            # The first PR has no predecessor, so it cannot be a transition.
            if previous is not None and level != previous:
                transitions.append({"from": previous, "to": level})
            previous = level

    if not transitions:
        return pd.DataFrame(columns=result_columns)

    return (
        pd.DataFrame(transitions)
        .groupby(["from", "to"])
        .size()
        .reset_index(name="count")
    )

def run_prediction_analysis(df: pd.DataFrame) -> None:
    """
    Prediction analysis using features from early behavior to avoid leakage.
    Target: reached 'Advanced' level.

    The rows are shuffled with a fixed seed and split 80/20. No model is
    fitted: the "prediction" is a fixed rule (more than one PR in the first
    30 days => reaches Advanced) evaluated on the 20% hold-out, and the
    results are printed to stdout.

    Args:
        df: Per-contributor frame with ``max_level`` and ``early_pr_count``
            columns. It is not modified.
    """
    print("\n--- ML Prediction Analysis (80/20 Split) ---")

    if df.empty:
        print("No data for prediction.")
        return

    # Target: reached advanced. Built with assign() so the caller's frame
    # does not gain an "is_advanced" column as a side effect.
    labelled = df.assign(is_advanced=(df["max_level"] == "Advanced").astype(int))

    # Shuffle and split.
    shuffled = labelled.sample(frac=1, random_state=42).reset_index(drop=True)
    split_idx = int(len(shuffled) * 0.8)
    train_df = shuffled.iloc[:split_idx]
    test_df = shuffled.iloc[split_idx:]

    # Simple characteristic-based prediction using EARLY behavior only:
    # more than 1 PR in the first 30 days => predict progression to Advanced.
    # Total tenure and total PR count are deliberately not used because they
    # leak the outcome.
    prediction = (test_df["early_pr_count"] > 1).astype(int)

    accuracy = (prediction == test_df["is_advanced"]).mean()
    print(f"Training set size: {len(train_df)}")
    print(f"Test set size: {len(test_df)}")
    print(f"Prediction Accuracy (using features from first 30 days): {accuracy:.2f}")
1 change: 1 addition & 0 deletions src/hiero_analytics/plotting/scatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ def plot_scatter_with_regression(
# Layout polish
# -------------------------
ax.margins(x=0.05, y=0.08)
ax.set_ylim(bottom=0)

# -------------------------
# Finalize
Expand Down
119 changes: 119 additions & 0 deletions src/hiero_analytics/run_contributor_churn_analysis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import os
from datetime import datetime
import pandas as pd
from hiero_analytics.config.logging import setup_logging
from hiero_analytics.config.paths import ORG, ensure_repo_dirs
from hiero_analytics.data_sources.github_client import GitHubClient
from hiero_analytics.data_sources.github_ingest import fetch_repo_merged_pr_difficulty_graphql
from hiero_analytics.analysis.prs import prs_to_dataframe
from hiero_analytics.analysis.contributor_churn import (
compute_progression_stats,
compute_transition_metrics,
run_prediction_analysis
)
from hiero_analytics.domain.labels import DIFFICULTY_LEVELS
from hiero_analytics.plotting.bars import plot_bar
from hiero_analytics.plotting.lines import plot_line

# Configure logging at import time, before any of the code below runs.
setup_logging()

# Owner passed to the GitHub fetch; taken from the shared paths configuration.
ORG_NAME = ORG
# Repository (under ORG_NAME) whose merged PRs are analysed.
REPO = "hiero-sdk-python"
# Last "/"-separated component of REPO, used as the prefix of chart titles;
# identical to REPO when it contains no "/".
short_repo = REPO.split("/")[-1]

def get_contributor_level(labels: set[str]) -> str:
    """Return the name of the first difficulty spec, scanned in reverse, that matches.

    DIFFICULTY_LEVELS is walked from its last entry to its first (presumably
    hardest to easiest: advanced, intermediate, beginner, gfi), so when several
    specs match the label set, the one listed latest wins. "Unknown" is
    returned when no spec matches.
    """
    matching_names = (
        spec.name
        for spec in reversed(DIFFICULTY_LEVELS)
        if spec.matches(labels)
    )
    return next(matching_names, "Unknown")

def run():
    """Run the contributor churn analysis for ORG_NAME/REPO.

    Fetches merged-PR difficulty data, assigns each row a difficulty level,
    computes per-contributor progression, and then, for contributors whose
    first PR was a "Good First Issue", prints a progression funnel, the level
    transition counts and the rule-based prediction report. Finally it writes
    the funnel bar chart and the retention line chart to the repo charts
    directory.

    Raises:
        EnvironmentError: if GITHUB_TOKEN is not set.
        ValueError: if no PR data is returned, or if no row has both an
            author and a merge date.
    """
    _, repo_charts_dir = ensure_repo_dirs(f"{ORG_NAME}/{REPO}")

    if not os.getenv("GITHUB_TOKEN"):
        raise EnvironmentError("GITHUB_TOKEN not set. Real data is required for churn analysis.")

    client = GitHubClient()
    print(f"Fetching PR data for {ORG_NAME}/{REPO}...")
    prs = fetch_repo_merged_pr_difficulty_graphql(
        client,
        owner=ORG_NAME,
        repo=REPO,
        use_cache=True,
    )

    df = prs_to_dataframe(prs)
    if df.empty:
        raise ValueError(f"No PR data found for {ORG_NAME}/{REPO}. Cannot perform churn analysis.")

    df["level"] = df["issue_labels"].apply(lambda labels: get_contributor_level(set(labels or [])))

    df = df.dropna(subset=["author", "pr_merged_at"]).sort_values(["author", "pr_merged_at"])
    if df.empty:
        # Without this guard the progression frame comes back without columns
        # and the "start_level" filter below fails with an opaque KeyError.
        raise ValueError(
            f"No PRs with both an author and a merge date found for {ORG_NAME}/{REPO}. "
            "Cannot perform churn analysis."
        )

    # Core analysis logic lives in hiero_analytics.analysis.contributor_churn.
    progression = compute_progression_stats(df)

    # Filter to GFI starters.
    gfi_starters = progression[progression["start_level"] == "Good First Issue"].copy()
    total_gfi = len(gfi_starters)

    if total_gfi == 0:
        print("No GFI starters found.")
        return

    # Funnel: how many GFI starters eventually reached each level or above.
    max_level = gfi_starters["max_level"]
    reached_beginner = int(max_level.isin(["Beginner", "Intermediate", "Advanced"]).sum())
    reached_intermediate = int(max_level.isin(["Intermediate", "Advanced"]).sum())
    reached_advanced = int((max_level == "Advanced").sum())

    funnel_df = pd.DataFrame([
        {"stage": "GFI Starters", "count": total_gfi},
        {"stage": "Progressed to Beginner+", "count": reached_beginner},
        {"stage": "Progressed to Intermediate+", "count": reached_intermediate},
        {"stage": "Progressed to Advanced", "count": reached_advanced},
    ])

    print("\n--- Contributor Churn Analysis ---")
    for stage, count in zip(funnel_df["stage"], funnel_df["count"]):
        print(f"{stage}: {count} ({count/total_gfi*100:.1f}%)")

    # Transition metrics are computed over all contributors in df, not only
    # the GFI starters.
    print("\n--- Level Transition Metrics ---")
    transitions = compute_transition_metrics(df)
    if not transitions.empty:
        print(transitions.to_string(index=False))
    else:
        print("No transitions detected.")

    run_prediction_analysis(gfi_starters)

    # Visualizations using project utilities.
    plot_bar(
        df=funnel_df,
        x_col="stage",
        y_col="count",
        title=f"{short_repo}: Contributor Progression Funnel",
        output_path=repo_charts_dir / "contributor_churn_funnel.png",
    )

    # Retention chart: for every threshold from 1 up to the largest observed
    # PR count, the number of GFI starters with at least that many PRs.
    # gfi_starters is non-empty here because of the early return above.
    pr_counts = gfi_starters["pr_count"]
    max_prs = int(pr_counts.max())
    retention_df = pd.DataFrame([
        {"min_prs": threshold, "contributors": int((pr_counts >= threshold).sum())}
        for threshold in range(1, max_prs + 1)
    ])

    plot_line(
        df=retention_df,
        x_col="min_prs",
        y_col="contributors",
        title=f"{short_repo}: Contributor Retention by PR Count",
        output_path=repo_charts_dir / "contributor_retention.png",
    )


if __name__ == "__main__":
    run()