-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcodex_switch.py
More file actions
executable file
·923 lines (763 loc) · 31.4 KB
/
Copy pathcodex_switch.py
File metadata and controls
executable file
·923 lines (763 loc) · 31.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
#!/usr/bin/env python3
"""Small local CODEX_HOME shortcut switcher."""
from __future__ import annotations
import argparse
import json
import os
import re
import shlex
import shutil
import stat
import subprocess
import sys
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional
from urllib import error, request
ALIAS_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]{0,63}$")
CHATGPT_USAGE_URL = "https://chatgpt.com/backend-api/wham/usage"
CODEX_USER_AGENT = "codex-cli/1.0.0"
DEFAULT_STORE_DIR = ".codex-shortcut-switcher"
EXPIRED_ACCESS_TOKEN_MESSAGE = "access token expired"
RECOMMENDED_MARKER = "*"
PRESERVE_PRIMARY = object()
class ConfigError(Exception):
"""Raised for user-fixable configuration errors."""
@dataclass(frozen=True)
class AccountAlias:
alias: str
codex_home: Path
created_at: str
@classmethod
def from_dict(cls, data: Mapping[str, str]) -> "AccountAlias":
return cls(
alias=str(data["alias"]),
codex_home=Path(str(data["codex_home"])).expanduser(),
created_at=str(data["created_at"]),
)
def to_dict(self) -> Dict[str, str]:
return {
"alias": self.alias,
"codex_home": str(self.codex_home),
"created_at": self.created_at,
}
def to_public_dict(self) -> Dict[str, str]:
return self.to_dict()
@dataclass(frozen=True)
class UsageAuth:
access_token: str
account_id: Optional[str]
def __repr__(self) -> str:
return f"UsageAuth(access_token=<hidden>, account_id={self.account_id!r})"
@dataclass(frozen=True)
class UsageWindow:
remaining_percent: float
used_percent: float
reset_at: Optional[int]
@dataclass(frozen=True)
class UsageSummary:
five_hour: UsageWindow
weekly: Optional[UsageWindow]
class CodexAliasStore:
def __init__(self, root: Optional[Path] = None):
if root is None:
configured = os.environ.get("CODEX_SWITCH_HOME")
root = Path(configured).expanduser() if configured else default_store_root()
self.root = root
self.homes_dir = self.root / "homes"
self.config_path = self.root / "aliases.json"
def add_from_current(self, alias: str, replace: bool = False) -> AccountAlias:
return self.add_from_auth(alias, current_codex_home() / "auth.json", replace=replace)
def add_from_auth(self, alias: str, auth_path: Path, replace: bool = False) -> AccountAlias:
alias = validate_alias(alias)
auth_path = auth_path.expanduser()
if not auth_path.is_file():
raise ConfigError(f"auth.json not found: {auth_path}")
accounts = {account.alias: account for account in self.list_accounts()}
if alias in accounts and not replace:
raise ConfigError(f"alias already exists: {alias} (use --replace to overwrite)")
codex_home = self.homes_dir / alias
codex_home.mkdir(parents=True, exist_ok=True)
set_private_dir(codex_home)
target = codex_home / "auth.json"
shutil.copyfile(auth_path, target)
os.chmod(target, 0o600)
account = AccountAlias(
alias=alias,
codex_home=codex_home,
created_at=accounts.get(alias, new_account(alias, codex_home)).created_at,
)
accounts[alias] = account
self._save_accounts(accounts.values())
return account
def remove(self, alias: str, delete_home: bool = False) -> AccountAlias:
alias = validate_alias(alias)
accounts = {account.alias: account for account in self.list_accounts()}
primary_alias = self.primary_alias()
account = accounts.pop(alias, None)
if account is None:
raise ConfigError(f"unknown alias: {alias}")
self._save_accounts(
accounts.values(),
primary_alias=None if primary_alias == alias else primary_alias,
)
if delete_home:
shutil.rmtree(account.codex_home, ignore_errors=True)
return account
def remove_all(self, confirm: bool = False, delete_homes: bool = False) -> List[AccountAlias]:
if not confirm:
raise ConfigError("remove-all requires --yes")
accounts = self.list_accounts()
self._save_accounts([], primary_alias=None)
if delete_homes:
for account in accounts:
shutil.rmtree(account.codex_home, ignore_errors=True)
return accounts
def remove_expired(
self,
confirm: bool = False,
delete_homes: bool = False,
usage_checker: Optional[Callable[[AccountAlias], str]] = None,
) -> List[AccountAlias]:
if not confirm:
raise ConfigError("remove-expired requires --yes")
checker = fetch_usage_label if usage_checker is None else usage_checker
expired = []
kept = []
primary_alias = self.primary_alias()
for account in self.list_accounts():
try:
checker(account)
except ConfigError as exc:
if is_expired_access_token_error(exc):
expired.append(account)
else:
kept.append(account)
else:
kept.append(account)
expired_aliases = {account.alias for account in expired}
self._save_accounts(
kept,
primary_alias=None if primary_alias in expired_aliases else primary_alias,
)
if delete_homes:
for account in expired:
shutil.rmtree(account.codex_home, ignore_errors=True)
return expired
def set_primary(self, alias: str) -> AccountAlias:
account = self.get(alias)
self._save_accounts(self.list_accounts(), primary_alias=account.alias)
return account
def clear_primary(self) -> Optional[str]:
old_primary = self.primary_alias()
self._save_accounts(self.list_accounts(), primary_alias=None)
return old_primary
def primary_alias(self) -> Optional[str]:
data = self._load_config()
primary_alias = data.get("primary_alias")
if not isinstance(primary_alias, str) or not primary_alias:
return None
try:
self.get(primary_alias)
except ConfigError:
return None
return primary_alias
def primary_account(self) -> AccountAlias:
primary_alias = self.primary_alias()
if primary_alias is None:
raise ConfigError("no main account configured")
return self.get(primary_alias)
def switch_primary(self, target_codex_home: Optional[Path] = None) -> AccountAlias:
return self.switch_alias(self.primary_account().alias, target_codex_home)
def select_auto_account(
self,
usage_checker: Optional[Callable[[AccountAlias], UsageSummary]] = None,
) -> AccountAlias:
checker = fetch_usage_summary if usage_checker is None else usage_checker
candidates = []
primary_candidate = None
primary_alias = self.primary_alias()
for account in self.list_accounts():
try:
usage = checker(account)
except ConfigError:
continue
if not usage_has_5h_and_weekly_remaining(usage):
continue
if account.alias == primary_alias:
primary_candidate = account
continue
candidates.append((account, usage))
if not candidates:
if primary_candidate is not None:
return primary_candidate
raise ConfigError("no account has both 5h and weekly usage remaining")
candidates.sort(
key=lambda item: (
weekly_reset_sort_key(item[1]),
five_hour_reset_sort_key(item[1]),
-(item[1].weekly.remaining_percent if item[1].weekly is not None else 0),
-item[1].five_hour.remaining_percent,
item[0].alias,
)
)
return candidates[0][0]
def switch_auto(self, target_codex_home: Optional[Path] = None) -> AccountAlias:
return self.switch_alias(self.select_auto_account().alias, target_codex_home)
def get(self, alias: str) -> AccountAlias:
alias = validate_alias(alias)
for account in self.list_accounts():
if account.alias == alias:
return account
raise ConfigError(f"unknown alias: {alias}")
def list_accounts(self) -> List[AccountAlias]:
data = self._load_config()
return sorted(
[AccountAlias.from_dict(item) for item in data.get("accounts", [])],
key=lambda account: account.alias,
)
def alias_names(self) -> List[str]:
return [account.alias for account in self.list_accounts()]
def command_env(
self,
alias: str,
base_env: Optional[Mapping[str, str]] = None,
) -> Dict[str, str]:
account = self.get(alias)
auth_path = account.codex_home / "auth.json"
if not auth_path.is_file():
raise ConfigError(f"auth.json missing for alias: {alias}")
env = dict(os.environ if base_env is None else base_env)
env["CODEX_HOME"] = str(account.codex_home)
return env
def switch_alias(self, alias: str, target_codex_home: Optional[Path] = None) -> AccountAlias:
account = self.get(alias)
source = account.codex_home / "auth.json"
if not source.is_file():
raise ConfigError(f"auth.json missing for alias: {alias}")
target_home = (target_codex_home or default_codex_home()).expanduser()
target_home.mkdir(parents=True, exist_ok=True)
set_private_dir(target_home)
target = target_home / "auth.json"
backup = target_home / "auth.json.codex-switch-backup"
if target.exists():
shutil.copyfile(target, backup)
os.chmod(backup, 0o600)
temp_target = target_home / "auth.json.codex-switch-tmp"
shutil.copyfile(source, temp_target)
os.chmod(temp_target, 0o600)
temp_target.replace(target)
os.chmod(target, 0o600)
return account
def _load_config(self) -> Dict[str, Any]:
if not self.config_path.exists():
return {"version": 1, "accounts": []}
try:
data = json.loads(self.config_path.read_text(encoding="utf-8"))
except json.JSONDecodeError as exc:
raise ConfigError(f"invalid config JSON: {self.config_path}: {exc}") from exc
if not isinstance(data, dict):
raise ConfigError(f"invalid config JSON: {self.config_path}: expected object")
return data
def _save_accounts(
self,
accounts: Iterable[AccountAlias],
primary_alias: object = PRESERVE_PRIMARY,
) -> None:
self.root.mkdir(parents=True, exist_ok=True)
set_private_dir(self.root)
if primary_alias is PRESERVE_PRIMARY:
primary_alias = self.primary_alias()
data = {
"version": 1,
"accounts": [
account.to_dict()
for account in sorted(accounts, key=lambda item: item.alias)
],
}
if isinstance(primary_alias, str):
data["primary_alias"] = primary_alias
temp_path = self.config_path.with_suffix(".tmp")
temp_path.write_text(json.dumps(data, indent=2) + "\n", encoding="utf-8")
os.chmod(temp_path, 0o600)
temp_path.replace(self.config_path)
def validate_alias(alias: str) -> str:
if not ALIAS_RE.fullmatch(alias):
raise ConfigError(
"alias must be 1-64 chars and use only letters, digits, '.', '_', or '-' "
"with a letter or digit first"
)
return alias
def current_codex_home() -> Path:
configured = os.environ.get("CODEX_HOME")
return Path(configured).expanduser() if configured else Path.home() / ".codex"
def default_codex_home() -> Path:
return Path.home() / ".codex"
def default_store_root() -> Path:
return Path.home() / DEFAULT_STORE_DIR
def set_private_dir(path: Path) -> None:
if os.name == "posix":
os.chmod(path, stat.S_IRWXU)
def new_account(alias: str, codex_home: Path) -> AccountAlias:
return AccountAlias(
alias=alias,
codex_home=codex_home,
created_at=datetime.now(timezone.utc).replace(microsecond=0).isoformat(),
)
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
prog="codex-switch",
description="Use separate local CODEX_HOME directories by account alias.",
)
parser.add_argument(
"--store",
type=Path,
default=None,
help="state directory (default: $CODEX_SWITCH_HOME or ~/.codex-shortcut-switcher)",
)
sub = parser.add_subparsers(dest="command", required=True)
add = sub.add_parser("add", help="add or replace an alias from an auth.json")
add.add_argument("alias")
source = add.add_mutually_exclusive_group()
source.add_argument("--from-auth", type=Path, help="path to an auth.json file")
source.add_argument("--from-current", action="store_true", help="copy auth.json from current CODEX_HOME")
add.add_argument("--replace", action="store_true", help="replace an existing alias")
list_parser = sub.add_parser("list", help="list configured aliases")
list_parser.add_argument(
"--usage",
action="store_true",
help="also show remaining 5-hour and weekly usage for OAuth aliases",
)
list_parser.add_argument(
"--path",
action="store_true",
help="also show the stored CODEX_HOME path for each alias",
)
sub.add_parser("aliases", help="print aliases only, one per line, for macOS Shortcuts")
path = sub.add_parser("path", help="print the CODEX_HOME path for an alias")
path.add_argument("alias", nargs="?")
env = sub.add_parser("env", help="print a shell export for an alias")
env.add_argument("alias", nargs="?")
switch = sub.add_parser("switch", help="copy an alias auth.json into ~/.codex/auth.json")
switch.add_argument("alias", nargs="?")
switch.add_argument(
"--target-codex-home",
type=Path,
default=None,
help="target Codex home to overwrite (default: ~/.codex)",
)
switch_main = sub.add_parser("switch-main", help="copy the main account auth.json into ~/.codex/auth.json")
switch_main.add_argument(
"--target-codex-home",
type=Path,
default=None,
help="target Codex home to overwrite (default: ~/.codex)",
)
switch_auto = sub.add_parser(
"switch-auto",
help="automatically switch to an account with 5-hour and weekly usage remaining",
)
switch_auto.add_argument(
"--target-codex-home",
type=Path,
default=None,
help="target Codex home to overwrite (default: ~/.codex)",
)
main_account = sub.add_parser("main", help="manage the main account alias")
main_sub = main_account.add_subparsers(dest="main_command", required=True)
main_set = main_sub.add_parser("set", help="mark an alias as the main account")
main_set.add_argument("alias")
main_sub.add_parser("show", help="print the main account alias")
main_sub.add_parser("clear", help="clear the main account marker")
run = sub.add_parser("run", help="run a command with CODEX_HOME set to an alias")
run.add_argument("alias", nargs="?")
run.add_argument("argv", nargs=argparse.REMAINDER, help="command to run after -- (default: codex)")
remove = sub.add_parser("remove", help="remove an alias")
remove.add_argument("alias", nargs="?")
remove.add_argument("--delete-home", action="store_true", help="also delete the alias CODEX_HOME directory")
remove_all = sub.add_parser("remove-all", help="remove all aliases")
remove_all.add_argument("--yes", action="store_true", help="confirm removing every configured alias")
remove_all.add_argument(
"--delete-homes",
action="store_true",
help="also delete every stored alias CODEX_HOME directory",
)
remove_expired = sub.add_parser("remove-expired", help="remove aliases with expired access tokens")
remove_expired.add_argument("--yes", action="store_true", help="confirm removing expired aliases")
remove_expired.add_argument(
"--delete-homes",
action="store_true",
help="also delete stored CODEX_HOME directories for expired aliases",
)
return parser
def main(argv: Optional[List[str]] = None) -> int:
parser = build_parser()
args = parser.parse_args(argv)
store = CodexAliasStore(args.store)
try:
if args.command == "add":
auth_path = args.from_auth
account = (
store.add_from_current(args.alias, replace=args.replace)
if auth_path is None
else store.add_from_auth(args.alias, auth_path, replace=args.replace)
)
print(f"added {account.alias}: {account.codex_home}")
return 0
if args.command == "list":
accounts = store.list_accounts()
if not accounts:
print("no aliases configured")
return 0
usage_by_alias = None
recommended_alias = None
if args.usage:
usage_summaries, usage_by_alias = fetch_usage_summaries_and_labels(accounts)
recommended_alias = recommended_alias_from_usage_summaries(
store,
usage_summaries,
)
for row in format_account_rows(
accounts,
usage_by_alias=usage_by_alias,
include_path=args.path,
recommended_alias=recommended_alias,
):
print(row)
return 0
if args.command == "aliases":
for alias in store.alias_names():
print(alias)
return 0
if args.command == "path":
alias = resolve_alias_arg(args.alias)
print(store.get(alias).codex_home)
return 0
if args.command == "env":
alias = resolve_alias_arg(args.alias)
account = store.get(alias)
print(f"export CODEX_HOME={shlex.quote(str(account.codex_home))}")
return 0
if args.command == "switch":
alias = resolve_alias_arg(args.alias)
account = store.switch_alias(alias, args.target_codex_home)
print(f"switched to {account.alias}")
return 0
if args.command == "switch-main":
account = store.switch_primary(args.target_codex_home)
print(f"switched to main account {account.alias}")
return 0
if args.command == "switch-auto":
account = store.switch_auto(args.target_codex_home)
print(f"auto-switched to {account.alias}")
return 0
if args.command == "main":
if args.main_command == "set":
account = store.set_primary(args.alias)
print(f"main account set to {account.alias}")
return 0
if args.main_command == "show":
print(store.primary_account().alias)
return 0
if args.main_command == "clear":
store.clear_primary()
print("main account cleared")
return 0
if args.command == "run":
alias = resolve_alias_arg(args.alias)
command = normalize_run_argv(args.argv)
env = store.command_env(alias)
return subprocess.run(command, env=env, check=False).returncode
if args.command == "remove":
alias = resolve_alias_arg(args.alias)
account = store.remove(alias, delete_home=args.delete_home)
suffix = " and deleted home" if args.delete_home else ""
print(f"removed {account.alias}{suffix}")
return 0
if args.command == "remove-all":
accounts = store.remove_all(confirm=args.yes, delete_homes=args.delete_homes)
suffix = " and deleted homes" if args.delete_homes else ""
print(f"removed {len(accounts)} aliases{suffix}")
return 0
if args.command == "remove-expired":
accounts = store.remove_expired(confirm=args.yes, delete_homes=args.delete_homes)
if accounts:
names = ", ".join(account.alias for account in accounts)
print(f"removed expired aliases: {names}")
else:
print("removed 0 expired aliases")
return 0
parser.error(f"unsupported command: {args.command}")
return 2
except ConfigError as exc:
print(f"error: {exc}", file=sys.stderr)
return 2
def normalize_run_argv(argv: List[str]) -> List[str]:
if not argv:
return ["codex"]
if argv[0] == "--":
argv = argv[1:]
return argv or ["codex"]
def resolve_alias_arg(alias: Optional[str], stdin_text: Optional[str] = None) -> str:
if alias:
return normalize_alias_input(alias)
text = sys.stdin.read() if stdin_text is None else stdin_text
for line in text.splitlines():
candidate = line.strip()
if candidate:
return normalize_alias_input(candidate)
raise ConfigError("missing alias")
def normalize_alias_input(value: str) -> str:
candidate = value.strip()
if not candidate:
return candidate
if candidate.startswith(f"{RECOMMENDED_MARKER} "):
candidate = candidate[len(RECOMMENDED_MARKER) + 1 :].lstrip()
if ALIAS_RE.fullmatch(candidate):
return candidate
return candidate.split()[0]
def format_account_rows(
accounts: List[AccountAlias],
usage_by_alias: Optional[Mapping[str, str]] = None,
include_path: bool = False,
recommended_alias: Optional[str] = None,
) -> List[str]:
if not accounts:
return []
width = max(
len(display_alias_for_row(account.alias, recommended_alias))
for account in accounts
)
formatted_usage = format_usage_labels_for_rows(usage_by_alias) if usage_by_alias is not None else None
status_by_alias = {}
status_width = 0
if formatted_usage is None:
status_by_alias = {
account.alias: "ok" if (account.codex_home / "auth.json").is_file() else "missing-auth"
for account in accounts
}
status_width = max(len(status) for status in status_by_alias.values())
rows = []
for account in accounts:
alias = display_alias_for_row(account.alias, recommended_alias)
if formatted_usage is None:
status = status_by_alias[account.alias]
row = f"{alias:<{width}} {status:<{status_width}}"
else:
usage = formatted_usage.get(account.alias, "usage n/a")
row = f"{alias:<{width}} {usage:<8}"
if include_path:
row = f"{row} {account.codex_home}"
rows.append(row.rstrip())
return rows
def display_alias_for_row(alias: str, recommended_alias: Optional[str] = None) -> str:
if alias == recommended_alias:
return f"{RECOMMENDED_MARKER} {alias}"
return alias
def format_usage_labels_for_rows(usage_by_alias: Mapping[str, str]) -> Dict[str, str]:
parts_by_alias = {}
widths = [0, 0, 0, 0]
for alias, label in usage_by_alias.items():
parts = split_usage_label(label)
parts_by_alias[alias] = parts
if len(parts) != 4:
continue
for index, part in enumerate(parts):
widths[index] = max(widths[index], len(part))
formatted = {}
for alias, parts in parts_by_alias.items():
if len(parts) != 4:
formatted[alias] = " ".join(parts)
continue
first_percent, first_reset, weekly_percent, weekly_reset = parts
formatted[alias] = (
f"{first_percent:>{widths[0]}} "
f"{first_reset:<{widths[1]}} | "
f"{weekly_percent:>{widths[2]}} "
f"{weekly_reset:<{widths[3]}}"
).rstrip()
return formatted
def split_usage_label(label: str) -> List[str]:
sections = [section.strip() for section in label.split("|", 1)]
if len(sections) != 2:
return [label]
first = sections[0].split()
weekly = sections[1].split()
if len(first) != 2 or len(weekly) != 2:
return label.split()
return [first[0], first[1], weekly[0], weekly[1]]
def fetch_usage_labels(accounts: List[AccountAlias]) -> Dict[str, str]:
_, labels = fetch_usage_summaries_and_labels(accounts)
return labels
def fetch_usage_summaries_and_labels(
accounts: List[AccountAlias],
) -> tuple[Dict[str, UsageSummary], Dict[str, str]]:
summaries = {}
labels = {}
for account in accounts:
try:
summary = fetch_usage_summary(account)
except ConfigError as exc:
labels[account.alias] = f"error: {exc}"
continue
summaries[account.alias] = summary
labels[account.alias] = format_usage_label(summary)
return summaries, labels
def recommended_alias_from_usage_summaries(
store: CodexAliasStore,
usage_summaries: Mapping[str, UsageSummary],
) -> Optional[str]:
def cached_usage(account: AccountAlias) -> UsageSummary:
summary = usage_summaries.get(account.alias)
if summary is None:
raise ConfigError("usage unavailable")
return summary
try:
return store.select_auto_account(usage_checker=cached_usage).alias
except ConfigError:
return None
def fetch_usage_label(account: AccountAlias) -> str:
usage = fetch_usage_summary(account)
return format_usage_label(usage)
def fetch_usage_summary(account: AccountAlias) -> UsageSummary:
auth_path = account.codex_home / "auth.json"
usage_auth = extract_usage_auth(auth_path)
try:
payload = fetch_usage_payload(usage_auth)
except ConfigError as exc:
if "HTTP 401" not in str(exc):
raise
raise ConfigError(EXPIRED_ACCESS_TOKEN_MESSAGE) from exc
return parse_usage_summary(payload)
def usage_has_5h_and_weekly_remaining(usage: UsageSummary) -> bool:
return (
usage.five_hour.remaining_percent > 0
and usage.weekly is not None
and usage.weekly.remaining_percent > 0
)
def weekly_reset_sort_key(usage: UsageSummary) -> int:
if usage.weekly is None or usage.weekly.reset_at is None:
return sys.maxsize
return usage.weekly.reset_at
def five_hour_reset_sort_key(usage: UsageSummary) -> int:
if usage.five_hour.reset_at is None:
return sys.maxsize
return usage.five_hour.reset_at
def is_expired_access_token_error(exc: ConfigError) -> bool:
return str(exc) == EXPIRED_ACCESS_TOKEN_MESSAGE
def extract_usage_auth(auth_path: Path) -> UsageAuth:
try:
data = json.loads(auth_path.read_text(encoding="utf-8"))
except FileNotFoundError as exc:
raise ConfigError("missing auth.json") from exc
except json.JSONDecodeError as exc:
raise ConfigError("auth.json is not valid JSON") from exc
if data.get("OPENAI_API_KEY"):
raise ConfigError("api-key auth has no ChatGPT usage")
tokens = data.get("tokens")
if not isinstance(tokens, dict):
raise ConfigError("auth.json has no OAuth tokens")
access_token = tokens.get("access_token")
if not isinstance(access_token, str) or not access_token:
raise ConfigError("auth.json has no access token")
account_id = tokens.get("account_id")
if account_id is not None and not isinstance(account_id, str):
account_id = None
return UsageAuth(access_token=access_token, account_id=account_id)
def fetch_usage_payload(usage_auth: UsageAuth) -> Mapping[str, object]:
headers = {
"Authorization": f"Bearer {usage_auth.access_token}",
"User-Agent": CODEX_USER_AGENT,
}
if usage_auth.account_id:
headers["chatgpt-account-id"] = usage_auth.account_id
req = request.Request(CHATGPT_USAGE_URL, headers=headers, method="GET")
try:
with request.urlopen(req, timeout=15) as response:
body = response.read().decode("utf-8")
except error.HTTPError as exc:
raise ConfigError(f"usage API HTTP {exc.code}") from exc
except error.URLError as exc:
raise ConfigError(f"usage API unavailable: {exc.reason}") from exc
except TimeoutError as exc:
raise ConfigError("usage API timed out") from exc
try:
payload = json.loads(body)
except json.JSONDecodeError as exc:
raise ConfigError("usage API returned invalid JSON") from exc
if not isinstance(payload, dict):
raise ConfigError("usage API returned unexpected payload")
return payload
def parse_usage_summary(payload: Mapping[str, object]) -> UsageSummary:
rate_limit = payload.get("rate_limit")
if not isinstance(rate_limit, dict):
raise ConfigError("usage response has no rate_limit")
windows = [
rate_limit.get("primary_window"),
rate_limit.get("secondary_window"),
]
five_hour = None
weekly = None
for window in windows:
if not isinstance(window, dict):
continue
parsed = parse_usage_window(window)
if window.get("limit_window_seconds") == 18_000:
five_hour = parsed
elif window.get("limit_window_seconds") == 604_800:
weekly = parsed
elif five_hour is None:
five_hour = parsed
if five_hour is None:
raise ConfigError("usage response has no 5-hour window")
return UsageSummary(five_hour=five_hour, weekly=weekly)
def parse_usage_window(window: Mapping[str, object]) -> UsageWindow:
used = window.get("used_percent")
if not isinstance(used, (int, float)):
raise ConfigError("usage response has no used_percent")
used_float = float(used)
remaining = max(0.0, min(100.0, 100.0 - used_float))
reset_at = window.get("reset_at")
if not isinstance(reset_at, int):
reset_at = None
return UsageWindow(
remaining_percent=remaining,
used_percent=used_float,
reset_at=reset_at,
)
def format_usage_label(usage: UsageSummary, now: Optional[int] = None) -> str:
parts = [format_window_label("5h", usage.five_hour, now=now)]
if usage.weekly is not None:
parts.append(format_window_label("wk", usage.weekly, now=now))
return " | ".join(parts)
def format_window_label(name: str, window: UsageWindow, now: Optional[int] = None) -> str:
remaining = format_percent(window.remaining_percent)
label = remaining
if window.reset_at is not None:
label = f"{label} {format_reset_countdown(window.reset_at, now=now)}"
return label
def format_reset_countdown(reset_at: int, now: Optional[int] = None) -> str:
now = int(datetime.now(timezone.utc).timestamp()) if now is None else now
seconds = max(0, int(reset_at) - int(now))
days, remainder = divmod(seconds, 86_400)
hours, remainder = divmod(remainder, 3_600)
minutes = (remainder + 59) // 60
if minutes == 60:
hours += 1
minutes = 0
if hours == 24:
days += 1
hours = 0
if days:
return f"{days}d{hours}h" if hours else f"{days}d"
if hours:
return f"{hours}h{minutes}m" if minutes else f"{hours}h"
return f"{minutes}m"
def format_percent(value: float) -> str:
value_float = float(value)
if value_float.is_integer():
return f"{int(value_float)}%"
return f"{value_float:.1f}%"
if __name__ == "__main__":
raise SystemExit(main())