Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,18 @@
# retried indefinitely.
_TRANSIENT_RETRY_STATUSES = frozenset({408, 425, 429})

# Inline retry policy for transient webhook failures (5xx, the statuses in
# _TRANSIENT_RETRY_STATUSES, and any exception raised by the request). We
# retry inside process() rather than re-raising and leaning on bundle retry,
# because AsyncWrapper does not reliably re-deliver bundles whose DoFn
# raises -- so a raise would effectively drop the anomaly instead of
# retrying it. Backoff grows exponentially from _WEBHOOK_BASE_BACKOFF_SEC
# and is capped at _WEBHOOK_MAX_BACKOFF_SEC, yielding the sequence
# 0.5, 1, 2, 4, 8, 15, 15, 15, 15, 15 seconds across the 10 retries
# (90.5 seconds of backoff in total when every attempt fails).
# Number of retries made after the initial attempt, so a persistently
# failing webhook sees up to _WEBHOOK_MAX_RETRIES + 1 requests per anomaly.
_WEBHOOK_MAX_RETRIES = 10
# Delay in seconds before the first retry; it doubles on every later retry.
_WEBHOOK_BASE_BACKOFF_SEC = 0.5
# Upper bound in seconds on any single delay between two attempts.
_WEBHOOK_MAX_BACKOFF_SEC = 15.0

# Synthetic key attached to unkeyed pipelines' elements so they can flow
# through stateful DoFns. _unpack_result translates it back to None.
_UNKEYED_SENTINEL = '__bqm_unkeyed__'
Expand Down Expand Up @@ -460,18 +472,14 @@ class _PostAnomalyToWebhook(beam.DoFn):

String leaves in ``body`` and ``headers`` are format-substituted with
``anomaly fields | message_metadata | {anomaly_message}``.

Response handling (because streaming Dataflow retries bundles
indefinitely, we cannot blindly raise on every non-2xx — that would
block the pipeline on a misconfigured request):
* 2xx → success.
* 5xx, 408, 425, 429 → raise (transient; bundle retries).
* Other 4xx → log + drop + increment ``dropped_permanent_4xx`` metric.
Network errors propagate as-is so Beam's bundle retry handles them.
No exception is allowed to escape ``process``: anything reaching
``AsyncWrapper`` would crash-loop the bundle.
"""

_DROPPED_4XX_COUNTER = Metrics.counter(
'bqmonitor.webhook', 'dropped_permanent_4xx')
_DROPPED_RETRIES_COUNTER = Metrics.counter(
'bqmonitor.webhook', 'dropped_exhausted_retries')

def __init__(self, webhook_spec, message_format, message_metadata):
self._webhook_spec = webhook_spec
Expand All @@ -496,44 +504,60 @@ def process(self, element):
body = _substitute_template_tree(self._webhook_spec['body'], merged)
headers = _substitute_template_tree(self._webhook_spec['headers'], merged)

window_str = f"{fields['window_start']}/{fields['window_end']}"
model_id = fields['model_id'] or '<none>'

start_monotonic = time.monotonic()
resp = self._session.request(
method=self._webhook_spec['method'],
url=self._webhook_spec['endpoint'],
json=body,
headers=headers or None,
timeout=self._webhook_spec['timeout_seconds'])
elapsed_sec = time.monotonic() - start_monotonic

status = resp.status_code
if 200 <= status < 300:
_LOGGER.info(
'Webhook %s %s posted anomaly window=%s model_id=%s '
'in %.2fs (status=%d).',
self._webhook_spec['method'], self._webhook_spec['endpoint'],
window_str, model_id, elapsed_sec, status)
return

if status >= 500 or status in _TRANSIENT_RETRY_STATUSES:
_LOGGER.warning(
'Webhook %s %s returned transient status %d after %.2fs for '
'anomaly window=%s model_id=%s; bundle will retry. '
'Response: %s',
self._webhook_spec['method'], self._webhook_spec['endpoint'],
status, elapsed_sec, window_str, model_id, resp.text[:500])
raise RuntimeError(
f'Webhook returned transient status {status}; retrying bundle.')

_LOGGER.error(
'Webhook %s %s returned permanent status %d after %.2fs for '
'anomaly window=%s model_id=%s; dropping anomaly. '
'Response: %s',
self._webhook_spec['method'], self._webhook_spec['endpoint'],
status, elapsed_sec, window_str, model_id, resp.text[:500])
self._DROPPED_4XX_COUNTER.inc()
method = self._webhook_spec['method']
endpoint = self._webhook_spec['endpoint']
ctx = (
f"{method} {endpoint} "
f"window={fields['window_start']}/{fields['window_end']} "
f"model_id={fields['model_id'] or '<none>'}")

# Retry transient failures (5xx, 408/425/429, and any error raised by
# the request) inline with exponential backoff capped at
# _WEBHOOK_MAX_BACKOFF_SEC. We retry here instead of raising for Beam to
# retry the bundle, because AsyncWrapper enters a crash loop if we raise.
last_error = None
for attempt in range(_WEBHOOK_MAX_RETRIES + 1):
if attempt:
time.sleep(min(_WEBHOOK_MAX_BACKOFF_SEC,
_WEBHOOK_BASE_BACKOFF_SEC * 2 ** (attempt - 1)))

start = time.monotonic()
try:
resp = self._session.request(
method=method, url=endpoint, json=body,
headers=headers or None,
timeout=self._webhook_spec['timeout_seconds'])
except Exception as exc: # pylint: disable=broad-except
last_error = f'request error: {exc!r}'
_LOGGER.warning('Webhook %s attempt %d/%d: %s',
ctx, attempt, _WEBHOOK_MAX_RETRIES, last_error)
continue
Comment thread
claudevdm marked this conversation as resolved.

status = resp.status_code
if 200 <= status < 300:
_LOGGER.info('Webhook %s posted anomaly in %.2fs (status=%d, '
'attempt=%d).', ctx, time.monotonic() - start,
status, attempt)
return

# Permanent client error (bad URL/auth/payload): drop, don't retry.
if status < 500 and status not in _TRANSIENT_RETRY_STATUSES:
_LOGGER.error('Webhook %s returned permanent status %d; dropping '
'anomaly. Response: %s', ctx, status, resp.text[:500])
self._DROPPED_4XX_COUNTER.inc()
return

last_error = f'status {status}'
_LOGGER.warning('Webhook %s attempt %d/%d: transient status %d. '
'Response: %s', ctx, attempt, _WEBHOOK_MAX_RETRIES,
status, resp.text[:500])

# Exhausted all retries. Drop rather than raise: raising would feed
# Beam's bundle retry, which AsyncWrapper does not honor reliably, so
# it would risk wedging the bundle without ever delivering the anomaly.
_LOGGER.error('Webhook %s still failing (%s) after %d retries; '
'dropping anomaly.', ctx, last_error, _WEBHOOK_MAX_RETRIES)
self._DROPPED_RETRIES_COUNTER.inc()


class _FormatResultForBQ(beam.DoFn):
Expand Down
175 changes: 160 additions & 15 deletions python/src/test/python/bigquery-anomaly-detection/pipeline_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import logging
import time
import unittest
from unittest import mock

import requests

logging.basicConfig(level=logging.INFO)

Expand All @@ -42,6 +45,9 @@
from bqmonitor.pipeline import _parse_table_ref
from bqmonitor.pipeline import _parse_webhook_spec
from bqmonitor.pipeline import _PostAnomalyToWebhook
from bqmonitor.pipeline import _WEBHOOK_BASE_BACKOFF_SEC
from bqmonitor.pipeline import _WEBHOOK_MAX_BACKOFF_SEC
from bqmonitor.pipeline import _WEBHOOK_MAX_RETRIES
from bqmonitor.pipeline import _RateLimitAlerts
from bqmonitor.pipeline import _substitute_template_tree
from bqmonitor.pipeline import _ThresholdAlert
Expand Down Expand Up @@ -915,9 +921,51 @@ def request(self, method, url, json=None, headers=None, timeout=None):
return _StubResponse(status_code=self._status_code)


class _SequenceSession:
"""Session stub that returns a scripted outcome per request() call.

Each entry in ``outcomes`` is either an int HTTP status (returns a
``_StubResponse`` with that code) or an ``Exception`` instance (raised,
to emulate a network-level failure like a dropped connection). Once the
script is exhausted the final entry repeats, so an "always failing"
endpoint is expressed with a single-element script.
"""

def __init__(self, outcomes):
self._outcomes = list(outcomes)
self.calls = []

def request(self, method, url, json=None, headers=None, timeout=None):
self.calls.append({
'method': method, 'url': url, 'json': json,
'headers': headers, 'timeout': timeout,
})
idx = min(len(self.calls) - 1, len(self._outcomes) - 1)
outcome = self._outcomes[idx]
if isinstance(outcome, Exception):
raise outcome
return _StubResponse(status_code=outcome)


class PostAnomalyToWebhookTest(unittest.TestCase):
"""Tests for _PostAnomalyToWebhook DoFn (session stubbed; no network)."""

def setUp(self):
  """Swap ``time.sleep`` in the pipeline module for a mock.

  The DoFn sleeps between transient retries, so stubbing the sleep lets the
  retry/backoff tests finish instantly; tests that never reach a transient
  path simply leave the mock uncalled. The recorded call arguments are what
  the backoff-schedule test uses to assert the exact delay sequence.
  """
  sleep_patch = mock.patch('bqmonitor.pipeline.time.sleep')
  self.sleep_mock = sleep_patch.start()
  # Restore the real sleep even when the test fails or errors.
  self.addCleanup(sleep_patch.stop)

def _make_dofn_with_outcomes(self, outcomes, body=None):
  """Return a DoFn whose session is a ``_SequenceSession`` over ``outcomes``.

  ``body`` is the webhook body template; a falsy value selects the default
  ``{'q': '{value}'}`` template.
  """
  template = body or {'q': '{value}'}
  dofn = self._make_dofn(template)
  # Replace whatever session _make_dofn installed with the scripted stub.
  dofn._session = _SequenceSession(outcomes)
  return dofn

def _make_result(self, label, value=42.0, score=5.0, model_id='ZScore'):
row = beam.Row(
value=value,
Expand Down Expand Up @@ -1041,29 +1089,126 @@ def test_nested_body_substituted(self):
posted['dataAgentContext']['dataAgent'],
'projects/p/dataAgents/a')

def test_5xx_raises_for_retry(self):
"""5xx → transient; bundle retry covers server-side flapping."""
def test_persistent_5xx_retries_then_drops(self):
"""A persistently-5xx endpoint is retried inline _WEBHOOK_MAX_RETRIES
times (so 1 initial + N retries attempts) and then dropped -- NOT
raised. We retry inline rather than via Beam bundle retry because
AsyncWrapper does not reliably re-deliver a raising bundle, and for the
same reason we drop rather than raise once retries are exhausted
(raising would only risk wedging the bundle)."""
dofn = self._make_dofn({'q': '{value}'}, status_code=500)
with self.assertRaises(RuntimeError):
dofn.process(self._make_result(label=1))
# The POST was attempted even though it failed.
self.assertEqual(len(dofn._session.calls), 1)
# Must not raise.
dofn.process(self._make_result(label=1))
self.assertEqual(
len(dofn._session.calls), _WEBHOOK_MAX_RETRIES + 1)
self.assertEqual(self.sleep_mock.call_count, _WEBHOOK_MAX_RETRIES)

def test_503_raises_for_retry(self):
def test_503_retries_then_drops(self):
dofn = self._make_dofn({'q': '{value}'}, status_code=503)
with self.assertRaises(RuntimeError):
dofn.process(self._make_result(label=1))
dofn.process(self._make_result(label=1)) # must not raise
self.assertEqual(
len(dofn._session.calls), _WEBHOOK_MAX_RETRIES + 1)

def test_429_raises_for_retry(self):
def test_429_retries_then_drops(self):
"""429 Too Many Requests → transient; back off and retry."""
dofn = self._make_dofn({'q': '{value}'}, status_code=429)
with self.assertRaises(RuntimeError):
dofn.process(self._make_result(label=1))
dofn.process(self._make_result(label=1)) # must not raise
self.assertEqual(
len(dofn._session.calls), _WEBHOOK_MAX_RETRIES + 1)

def test_408_raises_for_retry(self):
def test_408_retries_then_drops(self):
dofn = self._make_dofn({'q': '{value}'}, status_code=408)
with self.assertRaises(RuntimeError):
dofn.process(self._make_result(label=1))
dofn.process(self._make_result(label=1)) # must not raise
self.assertEqual(
len(dofn._session.calls), _WEBHOOK_MAX_RETRIES + 1)

def test_transient_status_then_success_does_not_raise(self):
  """Three transient 503s followed by a 200: the anomaly is delivered
  without raising, and no request is issued after the success."""
  script = [503, 503, 503, 200]
  dofn = self._make_dofn_with_outcomes(script)
  dofn.process(self._make_result(label=1, value=99.0))  # must not raise

  recorded = dofn._session.calls
  # 3 failures + 1 success = 4 attempts, then it stops.
  self.assertEqual(len(recorded), 4)
  # A sleep precedes each of the 3 retries, never the first attempt.
  self.assertEqual(self.sleep_mock.call_count, 3)
  # The successful POST carried the substituted body.
  self.assertEqual(recorded[-1]['json'], {'q': '99.0'})

def test_backoff_schedule_is_exponential_and_capped(self):
  """Delays between retries double from the base delay and saturate at the
  max-backoff cap: 0.5, 1, 2, 4, 8, 15, 15, ... Exactly one sleep happens
  per retry, and none before the first attempt."""
  dofn = self._make_dofn({'q': '{value}'}, status_code=500)
  dofn.process(self._make_result(label=1))

  observed = [call.args[0] for call in self.sleep_mock.call_args_list]

  # Rebuild the schedule from the module constants, one delay per retry.
  expected = []
  for retry_index in range(_WEBHOOK_MAX_RETRIES):
    uncapped = _WEBHOOK_BASE_BACKOFF_SEC * (2 ** retry_index)
    expected.append(min(_WEBHOOK_MAX_BACKOFF_SEC, uncapped))
  self.assertEqual(observed, expected)

  # Sanity-check the literal schedule the constants are meant to produce.
  self.assertEqual(observed[:6], [0.5, 1.0, 2.0, 4.0, 8.0, 15.0])
  # From the first capped delay onwards every value sits exactly on the cap.
  for capped_delay in observed[5:]:
    self.assertEqual(capped_delay, _WEBHOOK_MAX_BACKOFF_SEC)

def test_network_error_retried_then_succeeds(self):
  """A single network-level failure (a ConnectionError here) is retried
  after one backoff sleep, and the following 200 delivers the anomaly
  without raising."""
  dropped_connection = requests.exceptions.ConnectionError(
      'connection reset')
  dofn = self._make_dofn_with_outcomes([dropped_connection, 200])
  dofn.process(self._make_result(label=1, value=7.0))
  # One failed attempt plus the successful retry.
  self.assertEqual(len(dofn._session.calls), 2)
  self.assertEqual(self.sleep_mock.call_count, 1)

def test_network_error_exhausts_retries_and_drops(self):
  """A connection that fails on every attempt uses up the whole retry
  budget and is then dropped: process() returns instead of raising."""
  always_failing = [requests.exceptions.ConnectionError('boom')]
  dofn = self._make_dofn_with_outcomes(always_failing)
  dofn.process(self._make_result(label=1))  # must not raise

  # One initial attempt plus every permitted retry.
  total_attempts = _WEBHOOK_MAX_RETRIES + 1
  self.assertEqual(len(dofn._session.calls), total_attempts)
  self.assertEqual(self.sleep_mock.call_count, _WEBHOOK_MAX_RETRIES)

def test_non_requests_exception_retried_then_succeeds(self):
  """An exception outside the requests hierarchy (a plain RuntimeError
  here) is caught and retried like any other request failure rather than
  escaping process(); the later 200 still delivers the anomaly."""
  script = [RuntimeError('auth refresh failed'), 200]
  dofn = self._make_dofn_with_outcomes(script)
  dofn.process(self._make_result(label=1, value=5.0))  # must not raise
  # One failed attempt plus the successful retry, with one sleep between.
  self.assertEqual(len(dofn._session.calls), 2)
  self.assertEqual(self.sleep_mock.call_count, 1)

def test_non_requests_exception_exhausts_retries_and_drops(self):
  """A persistently raised non-requests exception is retried to the
  budget and then dropped, never escaping process()."""
  outcomes = [RuntimeError('persistent boom')]
  dofn = self._make_dofn_with_outcomes(outcomes)
  dofn.process(self._make_result(label=1))  # must not raise
  self.assertEqual(
      len(dofn._session.calls), _WEBHOOK_MAX_RETRIES + 1)
  # Same check as the ConnectionError variant of this test: every retry
  # must be preceded by exactly one backoff sleep.
  self.assertEqual(self.sleep_mock.call_count, _WEBHOOK_MAX_RETRIES)

def test_permanent_4xx_not_retried(self):
  """A permanent 4xx ends processing on the first response: no retry and
  no sleep, even though the scripted second attempt would have returned
  200."""
  script = [400, 200]
  dofn = self._make_dofn_with_outcomes(script)
  dofn.process(self._make_result(label=1))
  # Only the 400 was requested; the scripted 200 is never reached.
  self.assertEqual(len(dofn._session.calls), 1)
  self.sleep_mock.assert_not_called()

def test_permanent_4xx_dropped(self):
"""Permanent 4xx (e.g. 400 bad request) is logged and dropped, not
Expand Down
Loading