Skip to content
260 changes: 85 additions & 175 deletions client/src/cbltest/greenboarduploader.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,49 @@
import json
import os
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
from uuid import uuid4

import pytest
from _pytest.reports import TestReport
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.exceptions import DocumentNotFoundException
from couchbase.options import ClusterOptions
from packaging.version import InvalidVersion, Version

from cbltest.api.syncgateway import CouchbaseVersion
from cbltest.logging import cbl_info, cbl_warning


def _version_sort_key(v: str) -> tuple[Version, int]:
"""Sort key for SGW-style version strings of the form ``<semver>[-<build>]``.

Splits on the first ``-``: if the suffix is a pure integer, returns
``(Version(semver), int(build))`` so that ``3.3.0-99 < 3.3.0-100 <
3.3.0-1234``. Plain semver (no suffix) sorts as build ``0`` of that
semver. Non-numeric suffixes fall through to ``Version(v)`` so PEP 440
pre/post/rc tags still order correctly.
"""
semver, _, build = v.partition("-")
if build and build.isdigit():
return Version(semver), int(build)
return Version(v), 0


class GreenboardUploader:
"""
A class for uploading results to a specified greenboard server bucket.

Supports two modes:
- **Normal mode**: uploads results for regular test sessions.
- **Upgrade mode** (``SGW_UPGRADE_VERSIONS`` is set): uploads per-step
upgrade results with ``upgradePath``, ``upgradeFrom``, and ``upgradeTo``
fields under ``platform="sgw-upgrade"``.
Two upload paths are supported:

- **Normal mode** (:py:meth:`upload`) — writes a new per-session document
with a fresh ``uuid4`` doc id. Used by every regular (non-upgrade) test
session.
- **Upgrade matrix mode** (:py:meth:`upload_upgrade_result`) —
read-merge-upserts a single deterministic document per upgrade type
(``sgw-upgrade::waterfall`` and ``sgw-upgrade::rolling``). Each doc
holds a semver-sorted ``versions`` axis plus a ``matrix[from][to]``
nested map of pass/fail entries. Reruns of the same ``(from, to)``
pair override their prior entry.
"""

def __init__(self, url: str, username: str, password: str):
Expand All @@ -35,7 +56,6 @@ def __init__(self, url: str, username: str, password: str):
self.__fail_count = 0
self.__pass_count = 0
self.__overall_fail = False
self.__test_ran = False
self.__has_sgw_marker = False

@pytest.hookimpl(hookwrapper=True, tryfirst=True)
Expand All @@ -47,10 +67,6 @@ def pytest_runtest_makereport(self, item: pytest.Item, call: pytest.CallInfo[Non
self.__overall_fail = True
return

# Used by record_upgrade_step to skip iterations where pytest
# collected zero tests (and no setup crash occurred).
self.__test_ran = True

if self.__overall_fail:
return

Expand Down Expand Up @@ -133,179 +149,78 @@ def upload(
}
)

def record_upgrade_step(
def upload_upgrade_result(
self,
results_file: str,
sgw_version: CouchbaseVersion | None,
upgrade_versions_str: str,
phase: str | None,
node_index: str | None,
upgrade_type: str,
from_version: str,
to_version: str,
) -> None:
"""Append this iteration's result to a JSON state file.

The aggregated batch document is uploaded later (once per upgrade
run) by ``upload_upgrade_batch``. Failed iterations are recorded
too so the UI can surface where in the upgrade sequence the
failure occurred.

:param results_file: Path to the JSON state file to append to
:param sgw_version: Current SGW version (target of this step)
:param upgrade_versions_str: Comma-separated ordered version list
:param phase: SGW_UPGRADE_PHASE env value (e.g. "initial",
"rolling_node_0", "complete")
:param node_index: SGW_UPGRADED_NODE_INDEX env value, if any
"""
upgrade_path = [v.strip() for v in upgrade_versions_str.split(",") if v.strip()]
"""Upsert one pass/fail entry into the per-type SGW upgrade matrix doc.

# No tests collected AND no setup crash means nothing was ever
# attempted (e.g. wrong marker filter). Don't record an iteration
# — the chart shouldn't show a row for a run that never executed.
if not self.__test_ran and not self.__overall_fail:
Doc id is ``sgw-upgrade::{upgrade_type}``; entry is stored at
``matrix[from_version][to_version]`` and overrides any prior entry
for the same pair. ``versions`` axis is the union of all FROM/TO
versions ever seen, semver-sorted ascending for the UI to render
as both row and column labels of an N×N grid.
"""
if (
self.__pass_count == 0
and self.__fail_count == 0
and not self.__overall_fail
):
cbl_info(
f"No tests ran for phase={phase!r}; skipping iteration "
"record (no upload contribution)"
"No tests ran for upgrade upload; skipping "
f"({upgrade_type} {from_version} -> {to_version})"
)
return

# Resolve the destination version of this iteration. Live SGW is
# primary; on get_version() failure the caller passes None and we
# fall back to the shell-exported step target so the dot still
# maps to the right node on the chart. Last-resort is the planned
# final target.
target_build = 0
if sgw_version is not None and sgw_version.version:
current_version = sgw_version.version
target_build = sgw_version.build_number
else:
current_version = os.environ.get("SGW_VERSION_UNDER_TEST") or (
upgrade_path[-1] if upgrade_path else "0.0.0"
)

upgrade_from = "initial"
for i, v in enumerate(upgrade_path):
if v == current_version and i > 0:
upgrade_from = upgrade_path[i - 1]
break

# Any deviation from "tests ran and all passed" is a failure.
# Includes: a test failed (call phase) or setup/teardown crashed
# (overall_fail). Zero-collected was already short-circuited above.
had_test_failures = self.__fail_count > 0
setup_failure = self.__overall_fail
failed = had_test_failures or setup_failure

# Surface non-test-call failures as at least one failed count so
# the top-level batch doc's failCount is correctly 1, not 0.
fail_count = self.__fail_count
if failed and fail_count == 0:
fail_count = 1

iteration = {
"phase": phase,
"nodeIndex": int(node_index) if node_index is not None else None,
"upgradeFrom": upgrade_from,
"upgradeTo": current_version,
"build": target_build,
"passCount": self.__pass_count,
"failCount": fail_count,
"failed": failed,
passed = not (self.__overall_fail or self.__fail_count > 0)
now = datetime.now(timezone.utc)
unix_ts = (now - datetime(1970, 1, 1, tzinfo=timezone.utc)).total_seconds()
entry = {
"passed": passed,
"uploaded": unix_ts,
"date": now.strftime("%Y-%m-%d"),
}

path = Path(results_file)
if path.exists():
try:
state = json.loads(path.read_text())
except (json.JSONDecodeError, OSError) as e:
cbl_warning(f"Could not read existing results file {path}: {e}")
state = {}
else:
state = {}

state.setdefault("upgradePath", upgrade_path)
state.setdefault("iterations", []).append(iteration)
path.write_text(json.dumps(state, indent=2))
cbl_info(
f"Upgrade step recorded ({phase}): {upgrade_from} → {current_version} "
f"(pass={self.__pass_count}, fail={self.__fail_count})"
)

def upload_upgrade_batch(self, results_file: str) -> None:
"""Upload one aggregate document for the whole upgrade run.

Reads the iterations recorded by ``record_upgrade_step`` and emits
a single greenboard doc summarising the run. If any iteration
failed, ``failedAt`` points at the first failed iteration so the
UI can show where in the sequence the run broke.
"""
path = Path(results_file)
if not path.exists():
cbl_warning(f"No upgrade results file at {path}; nothing to upload")
return
doc_id = f"sgw-upgrade::{upgrade_type}"
coll = self._open_collection()

doc: dict[str, Any]
try:
state = json.loads(path.read_text())
except (json.JSONDecodeError, OSError) as e:
cbl_warning(f"Could not parse upgrade results file {path}: {e}")
return
doc = coll.get(doc_id).content_as[dict]
except DocumentNotFoundException:
doc = {
"type": upgrade_type,
"lastUpdated": 0.0,
"versions": [],
"matrix": {},
}

iterations = state.get("iterations", [])
if not iterations:
cbl_warning(
f"Upgrade results file {path} has no iterations; skipping upload"
)
return
versions = list({*doc.get("versions", []), from_version, to_version})
try:
versions.sort(key=_version_sort_key)
except InvalidVersion:
cbl_warning(f"Falling back to lexicographic sort for versions {versions}")
versions.sort()
doc["versions"] = versions

upgrade_path = state.get("upgradePath", [])
# version is always the planned final target so the UI's
# "filter by target version" picks up this run even when execution
# stopped early at an intermediate version.
target_version = (
upgrade_path[-1]
if upgrade_path
else iterations[-1].get("upgradeTo", "0.0.0")
)
doc.setdefault("matrix", {}).setdefault(from_version, {})[to_version] = entry
doc["lastUpdated"] = unix_ts

failed_at = None
for i in iterations:
if i.get("failed"):
failed_at = {
"phase": i.get("phase"),
"upgradeFrom": i.get("upgradeFrom"),
"upgradeTo": i.get("upgradeTo"),
"nodeIndex": i.get("nodeIndex"),
}
break

# One upgrade batch == one upgrade test from the UI's POV. Bars are
# built by the UI by aggregating across past runs that share
# (version, upgradePath); per-run pass/fail is therefore 1/0.
if failed_at is None:
pass_count, fail_count = 1, 0
target_build = iterations[-1].get("build", 0)
else:
pass_count, fail_count = 0, 1
# The planned target build was never reached; per-iteration
# `build` fields preserve what was actually running at each step.
target_build = 0

self._upload_document(
{
"build": target_build,
"version": target_version,
"upgradePath": upgrade_path,
"iterations": iterations,
"passCount": pass_count,
"failCount": fail_count,
"failedAt": failed_at,
"platform": "sgw-upgrade",
}
)
coll.upsert(doc_id, doc)
cbl_info(
f"Upgrade batch uploaded: path={'->'.join(upgrade_path)} "
f"target={target_version} pass={pass_count} fail={fail_count} "
f"failedAt={failed_at}"
f"Greenboard upgrade upload: {doc_id} "
f"{from_version} -> {to_version} ({'pass' if passed else 'fail'})"
)

def _open_collection(self):
auth = PasswordAuthenticator(self.__username, self.__password)
opts = ClusterOptions(auth)
cluster = Cluster(self.__url, opts)
cluster.wait_until_ready(timedelta(seconds=10))
return cluster.bucket("greenboard").default_collection()

def _upload_document(self, doc: dict) -> None:
"""Upload a document to the greenboard bucket with common fields added."""
now = datetime.now(timezone.utc)
Expand All @@ -316,9 +231,4 @@ def _upload_document(self, doc: dict) -> None:
doc["uploaded"] = unix_timestamp
doc["date"] = now.strftime("%Y-%m-%d")

auth = PasswordAuthenticator(self.__username, self.__password)
opts = ClusterOptions(auth)
cluster = Cluster(self.__url, opts)
cluster.wait_until_ready(timedelta(seconds=10))

cluster.bucket("greenboard").default_collection().upsert(str(uuid4()), doc)
self._open_collection().upsert(str(uuid4()), doc)
Loading
Loading