-
Notifications
You must be signed in to change notification settings - Fork 41
Expand file tree
/
Copy pathclick.py
More file actions
492 lines (407 loc) · 14.4 KB
/
Copy pathclick.py
File metadata and controls
492 lines (407 loc) · 14.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
import logging
import pwd
import textwrap
import zoneinfo
from abc import ABC, abstractmethod
from datetime import tzinfo
from pathlib import Path
from typing import (
Any,
Callable,
Generic,
Iterable,
Mapping,
Optional,
Protocol,
Type,
TypeVar,
Union,
)
import click
import daemon
import tomli
from gcm.exporters import registry
from gcm.monitoring.coerce import ensure_dict
from gcm.monitoring.features.features_config import FeaturesConfig
from gcm.monitoring.passwd import Passwd
from gcm.monitoring.sink.utils import (
Factory,
format_factory_docstrings,
get_factory_metadata,
)
from typeguard import typechecked
from typing_extensions import ParamSpec
logger = logging.getLogger(__name__)
class EpilogFormatter(Protocol):
    """Structural type for any object exposing a ``format_epilog()`` method."""

    def format_epilog(self) -> str:
        """Return the epilog text as a string."""
        ...
# Type variable for objects that satisfy the EpilogFormatter protocol; used to
# parameterize DynamicEpilogCommand.
_Object = TypeVar("_Object", bound=EpilogFormatter)
# General-purpose typing helpers. P and R are not referenced in the visible part
# of this file; T is the return type of the function wrapped by click_default_cmd.
P = ParamSpec("P")
R = TypeVar("R")
T = TypeVar("T")
# Target of an option decorator: either a plain callable or a click.Command.
FC = TypeVar("FC", bound=Union[Callable[..., Any], click.Command])
class DaemonGroup(click.Group):
    """Command group that optionally runs its invocation inside a daemon context.

    If the parsed parameters hold a truthy ``detach`` value, the group is invoked
    within ``daemon.DaemonContext()``; otherwise it is invoked directly.
    """

    def invoke(self, ctx: click.Context) -> None:
        """Invoke the group, wrapping the call in a daemon context on request."""
        wants_detach = ctx.params.get("detach", False)
        if not wants_detach:
            return super().invoke(ctx)

        with daemon.DaemonContext():
            return super().invoke(ctx)
# Boolean ``--detach`` / ``-d`` flag (off by default). DaemonGroup.invoke reads the
# resulting ``detach`` parameter to decide whether to run inside a daemon context.
detach_option = click.option(
    "--detach",
    "-d",
    is_flag=True,
    default=False,
    help=("Exit immediately instead of waiting for GCM to run. "),
)
class IntWithSISymbol(click.ParamType):
    """Click parameter type for integers with an optional SI multiplier suffix.

    Accepts plain integers (``"42"``) as well as integers followed by one of the
    symbols in ``_symbol_map`` (``"2k"`` -> 2000, ``"1M"`` -> 1000000).
    """

    name = "integer_si"
    # Supported single-character suffixes and the factor each one stands for.
    _symbol_map = {"k": 1000, "M": 1_000_000}

    def convert(
        self, value: Any, param: Optional[click.Parameter], ctx: Optional[click.Context]
    ) -> int:
        """Convert ``value`` to an int, applying an SI multiplier if present.

        Parameters:
            value: The raw command-line string, or an already-converted int (for
                example an integer default or a value read from a config file).
            param: The parameter being converted, used for error reporting.
            ctx: The active click context, used for error reporting.

        Returns:
            The integer value after applying the multiplier.

        Raises:
            click.BadParameter: (via ``self.fail``) if the value is not a string or
                int, is empty, carries an unknown suffix, or is not a valid integer.
        """
        # Click may hand ``convert`` a value that already has the target type;
        # indexing into it below would otherwise blow up with an uncaught TypeError.
        if isinstance(value, int) and not isinstance(value, bool):
            return value
        if not isinstance(value, str):
            self.fail(
                f"Expected string, but got {value!r} of type {type(value).__name__}",
                param,
                ctx,
            )
        # Guard the ``value[-1]`` lookup below against an empty string.
        if not value:
            self.fail(f"{value!r} is not a valid integer", param, ctx)

        multiplier = 1
        extracted_value = value
        suffix = value[-1]
        if not suffix.isdigit():
            if suffix not in self._symbol_map:
                allowed = ", ".join(self._symbol_map.keys())
                self.fail(
                    f"Unrecognized SI symbol '{suffix}'. Allowed symbols are: {allowed}",
                    param,
                    ctx,
                )
            multiplier = self._symbol_map[suffix]
            extracted_value = value[:-1]

        try:
            return int(extracted_value) * multiplier
        except ValueError:
            self.fail(f"{value!r} is not a valid integer", param, ctx)
# Reusable click option decorators shared by the commands in this package.

# ``--cluster``: optional cluster name; no default is set here.
cluster_option = click.option(
    "--cluster",
    help=(
        "Which cluster to collect stats for (usually the cluster where the script is running). "
        "If omitted, then the cluster is inferred (may fail)."
    ),
)
# ``--sink``: name of the sink to publish to; defaults to "stdout".
sink_option = click.option(
    "--sink",
    default="stdout",
    help="The sink where data should be published.",
)
# ``-o`` / ``--sink-opt``: repeatable option collected into the ``sink_opts``
# parameter. The "[1]" in the help text points at a numbered reference list such
# as the one produced by get_docs_for_references.
sink_opts_option = click.option(
    "-o",
    "--sink-opt",
    "sink_opts",
    multiple=True,
    help="Sink instantiation customization using OmegaConf dot-list syntax. See [1]",
)
# ``--log-level``: one of the standard logging level names; defaults to "INFO".
log_level_option = click.option(
    "--log-level",
    type=click.Choice(["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]),
    default="INFO",
    show_default=True,
    help="Logging verbosity level.",
)
# ``--log-folder``: a directory path (plain files are rejected by ``file_okay=False``).
log_folder_option = click.option(
    "--log-folder",
    type=click.Path(file_okay=False),
    default="sacct_running_logs",
    help="The directory where logs will be stored.",
)
# ``--stdout``: boolean flag, off by default.
stdout_option = click.option(
    "--stdout",
    is_flag=True,
    default=False,
    help="Whether to display logs to stdout.",
)
# ``--once``: boolean flag, off by default.
once_option = click.option(
    "--once",
    is_flag=True,
    default=False,
    help="Do only one round of data collection and publishing",
)
# ``--retries``: non-negative integer; defaults to 2.
retries_option = click.option(
    "--retries",
    type=click.IntRange(min=0),
    default=2,
    show_default=True,
    help="The maximum number of times to retry writing to sink before failing.",
)
# ``--dry-run`` / ``-n``: boolean flag; no explicit default is given here.
dry_run_option = click.option(
    "--dry-run",
    "-n",
    is_flag=True,
    help="Print logs to STDOUT as JSON.",
)
# ``--chunk-size``: integer parsed by IntWithSISymbol, so "1k"/"1M" style values
# are accepted; the default "1M" is given as a string and converted by that type.
chunk_size_option = click.option(
    "--chunk-size",
    type=IntWithSISymbol(),
    default="1M",
    show_default=True,
    help=(
        "The maximum size in bytes of each chunk when writing data to sink. "
        "Recognizes a subset of SI symbols for multiples for shorthand, e.g. 1k for "
        "1000, 1M for 1,000,000. Pass 0 to disable chunking."
    ),
)
def interval_option(default: int) -> Callable[[FC], FC]:
    """Build an ``--interval`` option decorator with a caller-chosen default.

    Parameters:
        default: The value used when ``--interval`` is not supplied.

    Returns:
        A click option decorator accepting non-negative integers.
    """
    non_negative = click.IntRange(min=0)
    decorator = click.option(
        "--interval",
        type=non_negative,
        default=default,
        show_default=True,
        help="The interval in seconds for collecting data.",
    )
    return decorator
# ``--heterogeneous-cluster-v1``: boolean flag, off by default. The help text is
# assembled from adjacent string literals, so every fragment except the last must
# end with a space; otherwise sentences run together in the ``--help`` output.
heterogeneous_cluster_v1_option = click.option(
    "--heterogeneous-cluster-v1",
    is_flag=True,
    default=False,
    help=(
        "Compute 'derived cluster' attribute for heterogeneous GPU SLURM clusters. "
        "Use when multiple types of GPUs are managed by separate partitions, e.g., "
        "NVIDIA H100 and H200 GPUs in separate 'h100' and 'h200' partitions. "
        "This flag when passed will result in the 'derived cluster' attribute being: <cluster_name>.<partition_name>. "
        "For QoS-based commands, this flag generates the <partition_name> by extracting characters from the QoS name string up to the first underscore (_) or using all characters if no underscore is present. For example, a QoS named: h100_myqos would result in `derived_cluster = <cluster_name>.h100`."
    ),
)
def click_default_cmd(
    epilog: str = "",
    cls: Type[click.Command] = click.Command,
    context_settings: Optional[dict[str, Any]] = None,
) -> Callable[[Callable[..., T]], click.Command]:
    """Return a ``click.command`` decorator with a generated help epilog.

    The epilog consists of the caller-supplied ``epilog`` text, followed by the
    indented documentation of every factory in the exporter ``registry``, followed
    by a numbered reference list.

    Parameters:
        epilog: Extra text placed at the start of the generated epilog.
        cls: The command class passed to ``click.command``.
        context_settings: Context settings passed to ``click.command``.
    """
    header = f"\b{epilog}\nSink documentation:\n\n\b\n"
    sink_docs = textwrap.indent(
        format_factory_docstrings(get_factory_metadata(registry)),
        prefix=" " * 2,
        # Indent every line, including blank ones.
        predicate=lambda _: True,
    )
    references = get_docs_for_references(
        [
            "https://omegaconf.readthedocs.io/en/2.2_branch/usage.html#from-a-dot-list",
        ]
    )
    return click.command(
        cls=cls,
        context_settings=context_settings,
        epilog=header + sink_docs + references,
    )
class DynamicEpilogCommand(Generic[_Object], click.Command):
    """Command whose help epilog is built by an object found on its click context.

    Subclasses declare the type of that object through the ``obj_cls`` class
    keyword. When help is rendered and no epilog has been set on the command, the
    epilog is taken from the object's ``format_epilog()`` method.

    Example:
        >>> class Obj:
        ...     def format_epilog(self):
        ...         return "hello, world!"
        ...
        >>> class MyCommand(DynamicEpilogCommand[Obj], obj_cls=Obj):
        ...     pass
        ...
        >>> @click.command(cls=MyCommand, context_settings={"obj": Obj()})
        ... def main():
        ...     pass
        ...

    Now ``--help`` should yield::

        Usage: scratch.py [OPTIONS]

        Options:
          --help  Show this message and exit.

          hello, world!
    """

    # Type looked up on the context; assigned per subclass in __init_subclass__.
    obj_cls: Type[_Object]

    def __init_subclass__(cls, *, obj_cls: Type[_Object]) -> None:
        """Record the ``obj_cls`` class keyword on the new subclass."""
        super().__init_subclass__()
        cls.obj_cls = obj_cls

    def format_epilog(self, ctx: click.Context, formatter: click.HelpFormatter) -> None:
        """Write the epilog, generating it from the context object when possible.

        The epilog is generated only if an instance of ``obj_cls`` is found on the
        context and ``self.epilog`` is still ``None``. A generated epilog is stored
        on ``self.epilog``, so it is reused on later calls.
        """
        expected_cls = self.obj_cls
        obj = ctx.find_object(expected_cls)

        if not isinstance(obj, expected_cls):
            logger.debug(
                f"{obj} does not appear to be an instance of {expected_cls.__name__}"
            )
        elif self.epilog is not None:
            logger.debug("Epilog is set; not generating it dynamically.")
        else:
            logger.debug("Generating epilog dynamically")
            self.epilog = obj.format_epilog()

        # Every branch ends by delegating the actual rendering to click.
        return super().format_epilog(ctx, formatter)
def get_docs_for_registry(r: Mapping[str, Factory]) -> str:
    """Return click-formatted documentation for the factories in registry ``r``.

    The factory docstrings are rendered with ``"\\b"`` as the paragraph marker,
    indented by two spaces, and placed under a "Sink documentation:" heading.
    """
    metadata = get_factory_metadata(r)
    rendered = format_factory_docstrings(metadata, paragraph_marker="\b")
    indented = textwrap.indent(
        rendered,
        prefix=" " * 2,
        # Indent every line, including blank ones.
        predicate=lambda _: True,
    )
    return "\b\nSink documentation:\n" + indented
def get_docs_for_references(refs: Iterable[str]) -> str:
    """Return a click-formatted, numbered list of references.

    Entries are numbered from 1 in iteration order and indented by two spaces
    under a "References:" heading.

    Examples:
        >>> get_docs_for_references(["r1", "r2"])
        '\\x08\\nReferences:\\n  [1]: r1\\n  [2]: r2'
    """
    numbered_entries = []
    for position, reference in enumerate(refs, start=1):
        numbered_entries.append(f"[{position}]: {reference}")

    listing = textwrap.indent(
        "\n".join(numbered_entries),
        prefix=" " * 2,
        # Indent every line, including blank ones.
        predicate=lambda _: True,
    )
    return "\b\nReferences:\n" + listing
# Type of the value a click callback receives / a typed ParamType produces.
_Tv = TypeVar("_Tv")
# Signature of a click option callback: (context, parameter, value) -> None.
_ClickCallback = Callable[[click.Context, click.Parameter, _Tv], None]
def _set_default_map(name: str) -> _ClickCallback[Path]:
    """Create an option callback that loads defaults from a table in a TOML file.

    Parameters:
        name: The top-level table whose contents are merged into the context's
            ``default_map``.

    Returns:
        A click callback taking ``(ctx, param, path)``. It does nothing when
        ``path`` does not exist or equals ``/dev/null``. Otherwise it parses the
        file and overlays table ``name`` on top of any existing ``default_map``.
        The callback raises ``click.BadParameter`` if the file is not valid TOML
        or has no top-level table called ``name``.
    """

    @typechecked
    def cb(ctx: click.Context, param: click.Parameter, path: Path) -> None:
        is_missing = not path.exists()
        if is_missing or path == Path("/dev/null"):
            return

        logger.info(f"Reading config from {path}...")
        with path.open("rb") as toml_file:
            try:
                parsed = tomli.load(toml_file)
            except tomli.TOMLDecodeError as decode_error:
                # TODO: treat default differently?
                raise click.BadParameter(
                    f"{path} does not contain valid TOML.",
                    ctx=ctx,
                    param=param,
                ) from decode_error

        try:
            table = ensure_dict(parsed[name])
        except KeyError as missing_table:
            raise click.BadParameter(
                f"'{name}' is not a top-level table name in {path}. Valid names: {list(parsed.keys())}",
                ctx=ctx,
                param=param,
            ) from missing_table

        logger.info(f"Loaded table '{name}'.")
        # Values from the file take precedence over an existing default_map.
        existing_defaults = ctx.default_map or {}
        ctx.default_map = {**existing_defaults, **table}

    return cb
# Parameter specification and return type of a decorated function; used so that
# the decorators below are typed as returning the callable's signature unchanged.
_P = ParamSpec("_P")
_R = TypeVar("_R")
def toml_config_option(
    name: str,
    *,
    default_config_path: Union[str, Path] = "/etc/fb-gcm/config.toml",
) -> Callable[[Callable[_P, _R]], Callable[_P, _R]]:
    """Shared decorator for loading default option values from a TOML config file.

    Adds a ``--config`` option to the decorated command. The option takes a path;
    a non-existent path or ``/dev/null`` is treated as an empty dictionary.

    Precedence (lowest to highest):

    * ``default`` argument to ``click.option``
    * the context's ``default_map`` setting (unless it is a subcommand; see below)
    * the value in the config file
    * value passed at the command line

    If used on a command group, subtables will configure subcommands, recursively.
    However, if a subcommand sets ``default_map`` explicitly
    (e.g. ``@click.command(context_settings={"default_map": ...})``), this will
    stop the propagation.

    Parameters:
        name: The top-level table name in the config file containing the default
            values to use.
        default_config_path: The path from which to load the config if the option
            is omitted at the command line.
    """
    help_text = (
        f"Load option values from table '{name}' in the given TOML config file. "
        "A non-existent path or '/dev/null' are ignored and treated as empty tables."
    )

    def decorator(f: Callable[_P, _R]) -> Callable[_P, _R]:
        config_option = click.option(
            "--config",
            type=click.Path(dir_okay=False, path_type=Path),
            callback=_set_default_map(name),
            default=default_config_path,
            show_default=True,
            # The path is consumed by the callback and not passed to the command.
            expose_value=False,
            help=help_text,
        )
        return config_option(f)

    return decorator
class TypedParamType(click.ParamType, ABC, Generic[_Tv]):
    """Typesafe ``click.ParamType`` that is generic in the return type of ``convert``."""

    @abstractmethod
    def convert(
        self,
        value: Any,
        param: Optional[click.Parameter],
        ctx: Optional[click.Context],
    ) -> _Tv:
        """Convert ``value`` to ``_Tv``; concrete subclasses must implement this."""
        ...
class UserOrUid(TypedParamType[Passwd]):
    """Convert an integer user id or user name into a passwd structure."""

    name = "user"

    def __init__(
        self,
        *,
        from_uid: Callable[[int], Passwd] = pwd.getpwuid,
        from_name: Callable[[str], Passwd] = pwd.getpwnam,
    ):
        """Store the lookup functions used to resolve ids and names.

        Parameters:
            from_uid: Resolves a numeric user id; defaults to ``pwd.getpwuid``.
            from_name: Resolves a user name; defaults to ``pwd.getpwnam``.
        """
        self.__from_uid = from_uid
        self.__from_name = from_name

    def convert(
        self,
        value: Any,
        param: Optional[click.Parameter],
        ctx: Optional[click.Context],
    ) -> Passwd:
        """Resolve ``value`` to a passwd entry.

        Integers are looked up as user ids. Strings that parse as an integer are
        looked up as user ids as well; any other string is looked up as a user
        name. Any other type, or a failed lookup, is reported via ``self.fail``.
        """
        if not isinstance(value, (int, str)):
            self.fail(
                f"User must be either a numeric user ID (int) or user name (str), but got {type(value).__name__}",
                param,
                ctx,
            )

        if isinstance(value, int):
            try:
                return self.__from_uid(value)
            except Exception:
                self.fail(f"Invalid user id: {value}.", param, ctx)

        # From here on the value is a string: numeric strings are treated as ids.
        try:
            numeric_id = int(value)
        except ValueError:
            numeric_id = None
        if numeric_id is not None:
            return self.convert(numeric_id, param, ctx)

        try:
            return self.__from_name(value)
        except Exception:
            self.fail(f"Invalid user name: {value}.", param, ctx)
class Timezone(click.ParamType):
    """Click parameter type that converts a timezone key into a ``tzinfo`` object."""

    name = "timezone"

    def convert(
        self, value: Any, param: Optional[click.Parameter], ctx: Optional[click.Context]
    ) -> tzinfo:
        """Convert ``value`` to a ``tzinfo`` using ``zoneinfo.ZoneInfo``.

        Parameters:
            value: A timezone key string, or an already-converted ``tzinfo``.
            param: The parameter being converted, used for error reporting.
            ctx: The active click context, used for error reporting.

        Returns:
            The matching ``tzinfo`` instance.

        Raises:
            click.BadParameter: (via ``self.fail``) if ``value`` is neither a string
                nor a ``tzinfo``, or does not name a known timezone.
        """
        # Click may call ``convert`` with a value that already has the target type.
        if isinstance(value, tzinfo):
            return value
        try:
            # SAFETY: https://github.com/pganssle/zoneinfo/issues/125
            return zoneinfo.ZoneInfo(value)  # type: ignore[abstract]
        except TypeError:
            self.fail(
                f"Expected string, but got {value!r} of type {type(value).__name__}",
                param,
                ctx,
            )
        except (ValueError, zoneinfo.ZoneInfoNotFoundError, OSError):
            # ValueError: malformed key. ZoneInfoNotFoundError (a KeyError
            # subclass): unknown key. OSError: NOTE(review) some Python versions
            # reportedly raise e.g. IsADirectoryError for keys naming a directory
            # in the tz database -- confirm against the supported versions.
            self.fail(f"{value!r} is not a valid timezone", param, ctx)
def set_features_config_path(config: Type[FeaturesConfig]) -> _ClickCallback:
    """Create an option callback that stores a config path on ``config``.

    Parameters:
        config: The class whose ``config_path`` attribute is assigned.

    Returns:
        A click callback taking ``(ctx, param, path)``. A ``None`` path is ignored.
        An existing path is assigned to ``config.config_path``. A path that does
        not exist makes the callback raise ``FileNotFoundError``.
    """

    def set_path(
        ctx: click.Context, param: click.Parameter, path: Optional[Path]
    ) -> None:
        if path is None:
            return
        if not path.exists():
            raise FileNotFoundError(f"features_config path doesn't exist: {path}")
        config.config_path = path

    return set_path
def feature_flags_config(
    config: Type[FeaturesConfig],
) -> Callable[[Callable[_P, _R]], Callable[_P, _R]]:
    """Return a decorator that adds a ``--features-config`` option to a command.

    The option takes a file path. Its value is not passed to the command; it is
    handed to the callback built by ``set_features_config_path(config)``.

    Parameters:
        config: The features config class passed on to the callback factory.
    """

    def decorator(func: Callable[_P, _R]) -> Callable[_P, _R]:
        features_option = click.option(
            "--features-config",
            type=click.Path(dir_okay=False, path_type=Path),
            callback=set_features_config_path(config),
            expose_value=False,
            help="Path parameter for the features config file, to load feature values.",
        )
        return features_option(func)

    return decorator