Skip to content

Commit 3c54627

Browse files
authored
Dont throw exception in async dofn (#3928)
1 parent 65d422a commit 3c54627

2 files changed

Lines changed: 230 additions & 61 deletions

File tree

python/src/main/python/bigquery-anomaly-detection/src/bqmonitor/pipeline.py

Lines changed: 70 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,18 @@
9696
# retried indefinitely.
9797
_TRANSIENT_RETRY_STATUSES = frozenset({408, 425, 429})
9898

99+
# Inline retry policy for transient webhook failures (5xx, the statuses in
100+
# _TRANSIENT_RETRY_STATUSES, and any exception raised by the request). We
101+
# retry inside process() rather than re-raising and leaning on bundle retry,
102+
# because AsyncWrapper does not reliably re-deliver bundles whose DoFn
103+
# raises -- so a raise would effectively drop the anomaly instead of
104+
# retrying it. Backoff grows exponentially from _WEBHOOK_BASE_BACKOFF_SEC
105+
# and is capped at _WEBHOOK_MAX_BACKOFF_SEC, yielding the sequence
106+
# 0.5, 1, 2, 4, 8, 15, 15, 15, 15, 15 seconds across the 10 retries.
107+
_WEBHOOK_MAX_RETRIES = 10
108+
_WEBHOOK_BASE_BACKOFF_SEC = 0.5
109+
_WEBHOOK_MAX_BACKOFF_SEC = 15.0
110+
99111
# Synthetic key attached to unkeyed pipelines' elements so they can flow
100112
# through stateful DoFns. _unpack_result translates it back to None.
101113
_UNKEYED_SENTINEL = '__bqm_unkeyed__'
@@ -460,18 +472,14 @@ class _PostAnomalyToWebhook(beam.DoFn):
460472
461473
String leaves in ``body`` and ``headers`` are format-substituted with
462474
``anomaly fields | message_metadata | {anomaly_message}``.
463-
464-
Response handling (because streaming Dataflow retries bundles
465-
indefinitely, we cannot blindly raise on every non-2xx — that would
466-
block the pipeline on a misconfigured request):
467-
* 2xx → success.
468-
* 5xx, 408, 425, 429 → raise (transient; bundle retries).
469-
* Other 4xx → log + drop + increment ``dropped_permanent_4xx`` metric.
470-
Network errors propagate as-is so Beam's bundle retry handles them.
475+
No exception is allowed to escape ``process``: anything reaching
476+
``AsyncWrapper`` would crash-loop the bundle.
471477
"""
472478

473479
_DROPPED_4XX_COUNTER = Metrics.counter(
474480
'bqmonitor.webhook', 'dropped_permanent_4xx')
481+
_DROPPED_RETRIES_COUNTER = Metrics.counter(
482+
'bqmonitor.webhook', 'dropped_exhausted_retries')
475483

476484
def __init__(self, webhook_spec, message_format, message_metadata):
477485
self._webhook_spec = webhook_spec
@@ -496,44 +504,60 @@ def process(self, element):
496504
body = _substitute_template_tree(self._webhook_spec['body'], merged)
497505
headers = _substitute_template_tree(self._webhook_spec['headers'], merged)
498506

499-
window_str = f"{fields['window_start']}/{fields['window_end']}"
500-
model_id = fields['model_id'] or '<none>'
501-
502-
start_monotonic = time.monotonic()
503-
resp = self._session.request(
504-
method=self._webhook_spec['method'],
505-
url=self._webhook_spec['endpoint'],
506-
json=body,
507-
headers=headers or None,
508-
timeout=self._webhook_spec['timeout_seconds'])
509-
elapsed_sec = time.monotonic() - start_monotonic
510-
511-
status = resp.status_code
512-
if 200 <= status < 300:
513-
_LOGGER.info(
514-
'Webhook %s %s posted anomaly window=%s model_id=%s '
515-
'in %.2fs (status=%d).',
516-
self._webhook_spec['method'], self._webhook_spec['endpoint'],
517-
window_str, model_id, elapsed_sec, status)
518-
return
519-
520-
if status >= 500 or status in _TRANSIENT_RETRY_STATUSES:
521-
_LOGGER.warning(
522-
'Webhook %s %s returned transient status %d after %.2fs for '
523-
'anomaly window=%s model_id=%s; bundle will retry. '
524-
'Response: %s',
525-
self._webhook_spec['method'], self._webhook_spec['endpoint'],
526-
status, elapsed_sec, window_str, model_id, resp.text[:500])
527-
raise RuntimeError(
528-
f'Webhook returned transient status {status}; retrying bundle.')
529-
530-
_LOGGER.error(
531-
'Webhook %s %s returned permanent status %d after %.2fs for '
532-
'anomaly window=%s model_id=%s; dropping anomaly. '
533-
'Response: %s',
534-
self._webhook_spec['method'], self._webhook_spec['endpoint'],
535-
status, elapsed_sec, window_str, model_id, resp.text[:500])
536-
self._DROPPED_4XX_COUNTER.inc()
507+
method = self._webhook_spec['method']
508+
endpoint = self._webhook_spec['endpoint']
509+
ctx = (
510+
f"{method} {endpoint} "
511+
f"window={fields['window_start']}/{fields['window_end']} "
512+
f"model_id={fields['model_id'] or '<none>'}")
513+
514+
# Retry transient failures (5xx, 408/425/429, and any error raised by
515+
# the request) inline with exponential backoff capped at
516+
# _WEBHOOK_MAX_BACKOFF_SEC. We retry here instead of raising for Beam to
517+
# retry the bundle, because AsyncWrapper enters a crash loop if we raise.
518+
last_error = None
519+
for attempt in range(_WEBHOOK_MAX_RETRIES + 1):
520+
if attempt:
521+
time.sleep(min(_WEBHOOK_MAX_BACKOFF_SEC,
522+
_WEBHOOK_BASE_BACKOFF_SEC * 2 ** (attempt - 1)))
523+
524+
start = time.monotonic()
525+
try:
526+
resp = self._session.request(
527+
method=method, url=endpoint, json=body,
528+
headers=headers or None,
529+
timeout=self._webhook_spec['timeout_seconds'])
530+
except Exception as exc: # pylint: disable=broad-except
531+
last_error = f'request error: {exc!r}'
532+
_LOGGER.warning('Webhook %s attempt %d/%d: %s',
533+
ctx, attempt, _WEBHOOK_MAX_RETRIES, last_error)
534+
continue
535+
536+
status = resp.status_code
537+
if 200 <= status < 300:
538+
_LOGGER.info('Webhook %s posted anomaly in %.2fs (status=%d, '
539+
'attempt=%d).', ctx, time.monotonic() - start,
540+
status, attempt)
541+
return
542+
543+
# Permanent client error (bad URL/auth/payload): drop, don't retry.
544+
if status < 500 and status not in _TRANSIENT_RETRY_STATUSES:
545+
_LOGGER.error('Webhook %s returned permanent status %d; dropping '
546+
'anomaly. Response: %s', ctx, status, resp.text[:500])
547+
self._DROPPED_4XX_COUNTER.inc()
548+
return
549+
550+
last_error = f'status {status}'
551+
_LOGGER.warning('Webhook %s attempt %d/%d: transient status %d. '
552+
'Response: %s', ctx, attempt, _WEBHOOK_MAX_RETRIES,
553+
status, resp.text[:500])
554+
555+
# Exhausted all retries. Drop rather than raise: raising would feed
556+
# Beam's bundle retry, which AsyncWrapper does not honor reliably, so
557+
# it would risk wedging the bundle without ever delivering the anomaly.
558+
_LOGGER.error('Webhook %s still failing (%s) after %d retries; '
559+
'dropping anomaly.', ctx, last_error, _WEBHOOK_MAX_RETRIES)
560+
self._DROPPED_RETRIES_COUNTER.inc()
537561

538562

539563
class _FormatResultForBQ(beam.DoFn):

python/src/test/python/bigquery-anomaly-detection/pipeline_test.py

Lines changed: 160 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
import logging
2121
import time
2222
import unittest
23+
from unittest import mock
24+
25+
import requests
2326

2427
logging.basicConfig(level=logging.INFO)
2528

@@ -42,6 +45,9 @@
4245
from bqmonitor.pipeline import _parse_table_ref
4346
from bqmonitor.pipeline import _parse_webhook_spec
4447
from bqmonitor.pipeline import _PostAnomalyToWebhook
48+
from bqmonitor.pipeline import _WEBHOOK_BASE_BACKOFF_SEC
49+
from bqmonitor.pipeline import _WEBHOOK_MAX_BACKOFF_SEC
50+
from bqmonitor.pipeline import _WEBHOOK_MAX_RETRIES
4551
from bqmonitor.pipeline import _RateLimitAlerts
4652
from bqmonitor.pipeline import _substitute_template_tree
4753
from bqmonitor.pipeline import _ThresholdAlert
@@ -915,9 +921,51 @@ def request(self, method, url, json=None, headers=None, timeout=None):
915921
return _StubResponse(status_code=self._status_code)
916922

917923

924+
class _SequenceSession:
925+
"""Session stub that returns a scripted outcome per request() call.
926+
927+
Each entry in ``outcomes`` is either an int HTTP status (returns a
928+
``_StubResponse`` with that code) or an ``Exception`` instance (raised,
929+
to emulate a network-level failure like a dropped connection). Once the
930+
script is exhausted the final entry repeats, so an "always failing"
931+
endpoint is expressed with a single-element script.
932+
"""
933+
934+
def __init__(self, outcomes):
935+
self._outcomes = list(outcomes)
936+
self.calls = []
937+
938+
def request(self, method, url, json=None, headers=None, timeout=None):
939+
self.calls.append({
940+
'method': method, 'url': url, 'json': json,
941+
'headers': headers, 'timeout': timeout,
942+
})
943+
idx = min(len(self.calls) - 1, len(self._outcomes) - 1)
944+
outcome = self._outcomes[idx]
945+
if isinstance(outcome, Exception):
946+
raise outcome
947+
return _StubResponse(status_code=outcome)
948+
949+
918950
class PostAnomalyToWebhookTest(unittest.TestCase):
919951
"""Tests for _PostAnomalyToWebhook DoFn (session stubbed; no network)."""
920952

953+
def setUp(self):
954+
# The DoFn sleeps between transient retries; patch it out so the
955+
# retry/backoff tests run instantly. Tests that never hit a transient
956+
# path simply leave this mock uncalled. The captured call args also
957+
# let the backoff-schedule test assert the exact sleep sequence.
958+
patcher = mock.patch('bqmonitor.pipeline.time.sleep')
959+
self.sleep_mock = patcher.start()
960+
self.addCleanup(patcher.stop)
961+
962+
def _make_dofn_with_outcomes(self, outcomes, body=None):
963+
"""Build a DoFn whose session replays ``outcomes`` (see
964+
_SequenceSession) across successive request() calls."""
965+
dofn = self._make_dofn(body or {'q': '{value}'})
966+
dofn._session = _SequenceSession(outcomes)
967+
return dofn
968+
921969
def _make_result(self, label, value=42.0, score=5.0, model_id='ZScore'):
922970
row = beam.Row(
923971
value=value,
@@ -1041,29 +1089,126 @@ def test_nested_body_substituted(self):
10411089
posted['dataAgentContext']['dataAgent'],
10421090
'projects/p/dataAgents/a')
10431091

1044-
def test_5xx_raises_for_retry(self):
1045-
"""5xx → transient; bundle retry covers server-side flapping."""
1092+
def test_persistent_5xx_retries_then_drops(self):
1093+
"""A persistently-5xx endpoint is retried inline _WEBHOOK_MAX_RETRIES
1094+
times (so 1 initial + N retries attempts) and then dropped -- NOT
1095+
raised. We retry inline rather than via Beam bundle retry because
1096+
AsyncWrapper does not reliably re-deliver a raising bundle, and for the
1097+
same reason we drop rather than raise once retries are exhausted
1098+
(raising would only risk wedging the bundle)."""
10461099
dofn = self._make_dofn({'q': '{value}'}, status_code=500)
1047-
with self.assertRaises(RuntimeError):
1048-
dofn.process(self._make_result(label=1))
1049-
# The POST was attempted even though it failed.
1050-
self.assertEqual(len(dofn._session.calls), 1)
1100+
# Must not raise.
1101+
dofn.process(self._make_result(label=1))
1102+
self.assertEqual(
1103+
len(dofn._session.calls), _WEBHOOK_MAX_RETRIES + 1)
1104+
self.assertEqual(self.sleep_mock.call_count, _WEBHOOK_MAX_RETRIES)
10511105

1052-
def test_503_raises_for_retry(self):
1106+
def test_503_retries_then_drops(self):
10531107
dofn = self._make_dofn({'q': '{value}'}, status_code=503)
1054-
with self.assertRaises(RuntimeError):
1055-
dofn.process(self._make_result(label=1))
1108+
dofn.process(self._make_result(label=1)) # must not raise
1109+
self.assertEqual(
1110+
len(dofn._session.calls), _WEBHOOK_MAX_RETRIES + 1)
10561111

1057-
def test_429_raises_for_retry(self):
1112+
def test_429_retries_then_drops(self):
10581113
"""429 Too Many Requests → transient; back off and retry."""
10591114
dofn = self._make_dofn({'q': '{value}'}, status_code=429)
1060-
with self.assertRaises(RuntimeError):
1061-
dofn.process(self._make_result(label=1))
1115+
dofn.process(self._make_result(label=1)) # must not raise
1116+
self.assertEqual(
1117+
len(dofn._session.calls), _WEBHOOK_MAX_RETRIES + 1)
10621118

1063-
def test_408_raises_for_retry(self):
1119+
def test_408_retries_then_drops(self):
10641120
dofn = self._make_dofn({'q': '{value}'}, status_code=408)
1065-
with self.assertRaises(RuntimeError):
1066-
dofn.process(self._make_result(label=1))
1121+
dofn.process(self._make_result(label=1)) # must not raise
1122+
self.assertEqual(
1123+
len(dofn._session.calls), _WEBHOOK_MAX_RETRIES + 1)
1124+
1125+
def test_transient_status_then_success_does_not_raise(self):
1126+
"""A few transient 503s followed by a 200 succeeds without raising;
1127+
the anomaly is delivered on the first non-transient response and no
1128+
further attempts are made."""
1129+
dofn = self._make_dofn_with_outcomes([503, 503, 503, 200])
1130+
# Must not raise.
1131+
dofn.process(self._make_result(label=1, value=99.0))
1132+
# 3 failures + 1 success = 4 attempts, then it stops.
1133+
self.assertEqual(len(dofn._session.calls), 4)
1134+
# One sleep before each of the 3 retries.
1135+
self.assertEqual(self.sleep_mock.call_count, 3)
1136+
# The successful POST carried the substituted body.
1137+
self.assertEqual(dofn._session.calls[-1]['json'], {'q': '99.0'})
1138+
1139+
def test_backoff_schedule_is_exponential_and_capped(self):
1140+
"""The sleep between retries grows exponentially from the base delay
1141+
and saturates at the max-backoff cap: 0.5, 1, 2, 4, 8, 15, 15, ...
1142+
There is exactly one sleep per retry (none before the first attempt)."""
1143+
dofn = self._make_dofn({'q': '{value}'}, status_code=500)
1144+
dofn.process(self._make_result(label=1))
1145+
1146+
actual_delays = [c.args[0] for c in self.sleep_mock.call_args_list]
1147+
expected_delays = [
1148+
min(_WEBHOOK_MAX_BACKOFF_SEC,
1149+
_WEBHOOK_BASE_BACKOFF_SEC * (2 ** n))
1150+
for n in range(_WEBHOOK_MAX_RETRIES)
1151+
]
1152+
self.assertEqual(actual_delays, expected_delays)
1153+
# Sanity-check the literal schedule the constants are meant to produce.
1154+
self.assertEqual(
1155+
actual_delays[:6], [0.5, 1.0, 2.0, 4.0, 8.0, 15.0])
1156+
# Every later delay sits exactly on the cap.
1157+
for delay in actual_delays[5:]:
1158+
self.assertEqual(delay, _WEBHOOK_MAX_BACKOFF_SEC)
1159+
1160+
def test_network_error_retried_then_succeeds(self):
1161+
"""A transient network-level error (e.g. dropped connection) is
1162+
retried with the same backoff as a transient status, and a later
1163+
success delivers the anomaly without raising."""
1164+
outcomes = [
1165+
requests.exceptions.ConnectionError('connection reset'),
1166+
200,
1167+
]
1168+
dofn = self._make_dofn_with_outcomes(outcomes)
1169+
dofn.process(self._make_result(label=1, value=7.0))
1170+
self.assertEqual(len(dofn._session.calls), 2)
1171+
self.assertEqual(self.sleep_mock.call_count, 1)
1172+
1173+
def test_network_error_exhausts_retries_and_drops(self):
1174+
"""A persistently failing connection exhausts the retry budget and is
1175+
then dropped, not raised (a raising bundle is not reliably retried by
1176+
AsyncWrapper, so raising would only risk wedging the bundle)."""
1177+
outcomes = [requests.exceptions.ConnectionError('boom')]
1178+
dofn = self._make_dofn_with_outcomes(outcomes)
1179+
dofn.process(self._make_result(label=1)) # must not raise
1180+
self.assertEqual(
1181+
len(dofn._session.calls), _WEBHOOK_MAX_RETRIES + 1)
1182+
self.assertEqual(self.sleep_mock.call_count, _WEBHOOK_MAX_RETRIES)
1183+
1184+
def test_non_requests_exception_retried_then_succeeds(self):
1185+
"""Exceptions that are NOT requests.RequestException (e.g. a
1186+
google.auth RefreshError surfaces as a plain exception, SSL errors,
1187+
etc.) are also caught and retried -- otherwise they would escape
1188+
process() and crash-loop the bundle under AsyncWrapper. A later
1189+
success still delivers the anomaly."""
1190+
outcomes = [RuntimeError('auth refresh failed'), 200]
1191+
dofn = self._make_dofn_with_outcomes(outcomes)
1192+
dofn.process(self._make_result(label=1, value=5.0)) # must not raise
1193+
self.assertEqual(len(dofn._session.calls), 2)
1194+
self.assertEqual(self.sleep_mock.call_count, 1)
1195+
1196+
def test_non_requests_exception_exhausts_retries_and_drops(self):
1197+
"""A persistently raised non-requests exception is retried to the
1198+
budget and then dropped, never escaping process()."""
1199+
outcomes = [RuntimeError('persistent boom')]
1200+
dofn = self._make_dofn_with_outcomes(outcomes)
1201+
dofn.process(self._make_result(label=1)) # must not raise
1202+
self.assertEqual(
1203+
len(dofn._session.calls), _WEBHOOK_MAX_RETRIES + 1)
1204+
1205+
def test_permanent_4xx_not_retried(self):
1206+
"""A permanent 4xx is dropped on the first response with no retries
1207+
and no sleeps, even if later attempts would have succeeded."""
1208+
dofn = self._make_dofn_with_outcomes([400, 200])
1209+
dofn.process(self._make_result(label=1))
1210+
self.assertEqual(len(dofn._session.calls), 1)
1211+
self.sleep_mock.assert_not_called()
10671212

10681213
def test_permanent_4xx_dropped(self):
10691214
"""Permanent 4xx (e.g. 400 bad request) is logged and dropped, not

0 commit comments

Comments
 (0)