CPA 401 一键删除,无配置自动移出认证文件夹脚本
- 内容介绍
- 文章标签
- 相关推荐
image838×372 20 KB
最近在各位佬那里白嫖了很多 codex 账号,也算是 gpt 自由了。
根据某个佬的 401 一键删除脚本,让 codex 优化了。
1.一键删除所有 401 认证文件
2.自动检查所有认证文件是否达到限额,如果限额,则移出该认证文件到同级目录。下一次运行会检测该目录下的限额认证文件,如果不限额了,则自动移动回来。
3.多个并发线程
python3 tp.py --delete-401 --workers 50
image1158×704 368 KB
脚本如下:
#!/usr/bin/env python3
"""Scan Codex auth files and report HTTP 401 and no-limit credentials.
This script ports key parts from CLIProxyAPI's Codex implementation:
- Default Codex base URL from `internal/runtime/executor/codex_executor.go`
- Codex request headers style from `applyCodexHeaders`
- Refresh-token flow from `internal/auth/codex/openai_auth.go`
Usage examples:
python scripts/codex_quota_401_scanner.py --auth-dir ./auths
python scripts/codex_quota_401_scanner.py --auth-dir ~/.cli-proxy-api --refresh-before-check
python scripts/codex_quota_401_scanner.py --auth-dir ./auths --output-json
python scripts/codex_quota_401_scanner.py --auth-dir ./auths --delete-401
python scripts/codex_quota_401_scanner.py --auth-dir ./auths --delete-401 --yes
"""
from __future__ import annotations
import argparse
from concurrent.futures import ThreadPoolExecutor, as_completed
import json
import os
import shutil
import sys
import time
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Any, Callable, Iterable
from urllib import error, parse, request
DEFAULT_CODEX_BASE_URL = "https://chatgpt.com/backend-api/codex"
DEFAULT_REFRESH_URL = "https://auth.openai.com/oauth/token"
DEFAULT_AUTH_DIR = "~/.cli-proxy-api"
DEFAULT_CLIENT_ID = "app_EMoamEEZ73f0CkXaXp7hrann"
DEFAULT_VERSION = "0.98.0"
DEFAULT_USER_AGENT = "codex_cli_rs/0.98.0 (python-port)"
DEFAULT_WORKERS = min(32, max(4, (os.cpu_count() or 1) * 4))
DEFAULT_RETRY_ATTEMPTS = 3
DEFAULT_RETRY_BACKOFF = 0.6
ANSI_RESET = "\033[0m"
ANSI_BOLD = "\033[1m"
ANSI_DIM = "\033[2m"
ANSI_RED = "\033[31m"
ANSI_GREEN = "\033[32m"
ANSI_YELLOW = "\033[33m"
ANSI_CYAN = "\033[36m"
ANSI_MAGENTA = "\033[35m"
DEFAULT_EXCEEDED_DIR_NAME = "exceeded"
@dataclass
class CheckResult:
file: str
provider: str
email: str
account_id: str
status_code: int | None
unauthorized_401: bool
no_limit_unlimited: bool
quota_exceeded: bool
quota_resets_at: int | None
error: str
response_preview: str
@dataclass
class DeleteError:
file: str
error: str
def _is_tty_stdout() -> bool:
return hasattr(sys.stdout, "isatty") and sys.stdout.isatty()
def _supports_color(disabled: bool) -> bool:
return (not disabled) and _is_tty_stdout() and ("NO_COLOR" not in os.environ)
def _paint(text: str, *codes: str, enabled: bool) -> str:
if not enabled or not codes:
return text
return "".join(codes) + text + ANSI_RESET
def _truncate(text: str, limit: int) -> str:
if limit <= 0 or len(text) <= limit:
return text
if limit <= 3:
return "." * limit
return text[: limit - 3] + "..."
class _ProgressDisplay:
def __init__(self, enabled: bool) -> None:
self.enabled = enabled
self._last_len = 0
self._finished = False
def update(self, current: int, total: int, path: Path) -> None:
if not self.enabled or total <= 0:
return
width = shutil.get_terminal_size(fallback=(100, 20)).columns
bar_width = max(12, min(30, width - 52))
percent = int((current * 100) / total)
filled = int((current * bar_width) / total)
bar = "#" * filled + "-" * (bar_width - filled)
message = f"[{bar}] {current}/{total} {percent:>3}% {_truncate(path.name, 28)}"
message = _truncate(message, max(10, width - 1))
padding = " " * max(0, self._last_len - len(message))
sys.stdout.write(f"\r{message}{padding}")
sys.stdout.flush()
self._last_len = len(message)
def finish(self) -> None:
if not self.enabled or self._finished:
return
self._finished = True
sys.stdout.write("\n")
sys.stdout.flush()
def _first_non_empty_str(values: Iterable[Any]) -> str:
for value in values:
if isinstance(value, str):
stripped = value.strip()
if stripped:
return stripped
return ""
def _dot_get(data: Any, dotted_key: str) -> Any:
current = data
for key in dotted_key.split("."):
if not isinstance(current, dict):
return None
current = current.get(key)
return current
def _pick(data: dict[str, Any], candidates: list[str]) -> str:
values = [_dot_get(data, key) for key in candidates]
return _first_non_empty_str(values)
def _looks_like_codex(path: Path, payload: dict[str, Any]) -> bool:
provider = _pick(payload, ["type", "provider", "metadata.type"])
if provider:
return provider.lower() == "codex"
name = path.name.lower()
if name.startswith("codex-"):
return True
access_token = _pick(
payload,
[
"access_token",
"accessToken",
"token.access_token",
"token.accessToken",
"metadata.access_token",
"metadata.accessToken",
"metadata.token.access_token",
"metadata.token.accessToken",
"attributes.api_key",
],
)
refresh_token = _pick(
payload,
[
"refresh_token",
"refreshToken",
"token.refresh_token",
"token.refreshToken",
"metadata.refresh_token",
"metadata.refreshToken",
"metadata.token.refresh_token",
"metadata.token.refreshToken",
],
)
account_id = _pick(
payload,
["account_id", "accountId", "metadata.account_id", "metadata.accountId"],
)
return bool(access_token and (refresh_token or account_id))
def _extract_auth_fields(payload: dict[str, Any]) -> dict[str, str]:
return {
"provider": _pick(payload, ["type", "provider", "metadata.type"]) or "codex",
"email": _pick(payload, ["email", "metadata.email", "attributes.email"]),
"access_token": _pick(
payload,
[
"access_token",
"accessToken",
"token.access_token",
"token.accessToken",
"metadata.access_token",
"metadata.accessToken",
"metadata.token.access_token",
"metadata.token.accessToken",
"attributes.api_key",
],
),
"refresh_token": _pick(
payload,
[
"refresh_token",
"refreshToken",
"token.refresh_token",
"token.refreshToken",
"metadata.refresh_token",
"metadata.refreshToken",
"metadata.token.refresh_token",
"metadata.token.refreshToken",
],
),
"account_id": _pick(
payload,
["account_id", "accountId", "metadata.account_id", "metadata.accountId"],
),
"base_url": _pick(
payload,
[
"base_url",
"baseUrl",
"metadata.base_url",
"metadata.baseUrl",
"attributes.base_url",
"attributes.baseUrl",
],
),
}
def _http_request(
*,
url: str,
method: str,
headers: dict[str, str],
body: bytes | None,
timeout: float,
) -> tuple[int, bytes]:
req = request.Request(url=url, data=body, method=method.upper())
for key, value in headers.items():
req.add_header(key, value)
try:
with request.urlopen(req, timeout=timeout) as resp:
return int(resp.status), resp.read()
except error.HTTPError as exc:
return int(exc.code), exc.read()
def _http_request_with_retry(
*,
url: str,
method: str,
headers: dict[str, str],
body: bytes | None,
timeout: float,
retry_attempts: int,
retry_backoff: float,
) -> tuple[int, bytes]:
last_exc: Exception | None = None
for attempt in range(1, retry_attempts + 1):
try:
return _http_request(
url=url,
method=method,
headers=headers,
body=body,
timeout=timeout,
)
except error.URLError as exc:
last_exc = exc
if attempt >= retry_attempts:
break
if retry_backoff > 0:
sleep_seconds = retry_backoff * (2 ** (attempt - 1))
time.sleep(sleep_seconds)
if last_exc is not None:
raise last_exc
raise RuntimeError("request failed without a captured exception")
_UNLIMITED_TEXT_MARKERS = (
"unlimited",
"no limit",
"no-limit",
"without limit",
"limitless",
"不限额",
"无限额",
"无限制",
)
_UNLIMITED_KEY_HINTS = (
"unlimited",
"no_limit",
"nolimit",
"limitless",
)
_LIMIT_LIKE_KEY_HINTS = (
"quota",
"limit",
"cap",
)
def _looks_unlimited_from_response(status_code: int | None, response_text: str) -> bool:
if status_code is None or status_code < 200 or status_code >= 300:
return False
lowered = (response_text or "").lower()
if any(marker in lowered for marker in _UNLIMITED_TEXT_MARKERS):
return True
try:
parsed = json.loads(response_text)
except json.JSONDecodeError:
return False
stack: list[Any] = [parsed]
while stack:
current = stack.pop()
if isinstance(current, dict):
for key, value in current.items():
key_lc = str(key).lower()
if any(hint in key_lc for hint in _UNLIMITED_KEY_HINTS):
if isinstance(value, bool) and value:
return True
if isinstance(value, str) and value.strip().lower() in {
"1",
"true",
"yes",
"unlimited",
"no_limit",
"nolimit",
}:
return True
if isinstance(value, (int, float)) and value == -1:
return True
if any(hint in key_lc for hint in _LIMIT_LIKE_KEY_HINTS):
if value is None:
return True
if isinstance(value, (int, float)) and (
value == -1 or value >= 9999
):
return True
if isinstance(value, str) and value.strip().lower() in {
"none",
"null",
"unlimited",
"no limit",
"no-limit",
"无限",
"不限额",
"无限额",
}:
return True
if isinstance(value, (dict, list)):
stack.append(value)
elif isinstance(value, str):
text_value = value.lower()
if any(marker in text_value for marker in _UNLIMITED_TEXT_MARKERS):
return True
elif isinstance(current, list):
stack.extend(current)
return False
_QUOTA_EXCEEDED_TEXT_MARKERS = (
"usage_limit_reached",
"usage limit has been reached",
"quota exceeded",
"limit exceeded",
"超出配额",
"额度已用完",
)
def _detect_quota_exceeded(response_text: str) -> tuple[bool, int | None]:
"""Return (is_exceeded, resets_at_unix_or_None).
Primary detection: ``error.type == 'usage_limit_reached'`` in JSON body.
Secondary: text marker scan for common quota-exceeded phrases.
"""
if not response_text:
return False, None
try:
parsed = json.loads(response_text)
except json.JSONDecodeError:
parsed = None
if isinstance(parsed, dict):
err = parsed.get("error")
if isinstance(err, dict):
if err.get("type") == "usage_limit_reached":
resets_at = err.get("resets_at")
if isinstance(resets_at, (int, float)):
return True, int(resets_at)
return True, None
lowered = response_text.lower()
if any(marker in lowered for marker in _QUOTA_EXCEEDED_TEXT_MARKERS):
return True, None
return False, None
def _refresh_access_token(
refresh_url: str, refresh_token: str, timeout: float
) -> tuple[str, str]:
body = parse.urlencode(
{
"client_id": DEFAULT_CLIENT_ID,
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"scope": "openid profile email",
}
).encode("utf-8")
status, resp_body = _http_request(
url=refresh_url,
method="POST",
headers={
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json",
},
body=body,
timeout=timeout,
)
if status != 200:
msg = resp_body.decode("utf-8", errors="replace")[:300]
raise RuntimeError(f"refresh failed with {status}: {msg}")
try:
parsed = json.loads(resp_body.decode("utf-8", errors="replace"))
except json.JSONDecodeError as exc:
raise RuntimeError(f"refresh response is not valid JSON: {exc}") from exc
new_token = _first_non_empty_str([parsed.get("access_token")])
new_refresh = _first_non_empty_str([parsed.get("refresh_token")])
if not new_token:
raise RuntimeError("refresh succeeded but access_token missing")
return new_token, new_refresh
def _build_probe_headers(access_token: str, account_id: str) -> dict[str, str]:
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
"Accept": "application/json",
"Version": DEFAULT_VERSION,
"Openai-Beta": "responses=experimental",
"User-Agent": DEFAULT_USER_AGENT,
"Originator": "codex_cli_rs",
}
if account_id:
headers["Chatgpt-Account-Id"] = account_id
return headers
def _build_probe_body(model: str) -> bytes:
payload = {
"model": model,
"stream": True,
"store": False,
"instructions": "",
# Some proxies strictly validate `input` as a list for /responses.
"input": [
{
"role": "user",
"content": [
{
"type": "input_text",
"text": "ping",
}
],
}
],
}
return json.dumps(payload, ensure_ascii=False).encode("utf-8")
def _load_json(path: Path) -> dict[str, Any]:
raw = path.read_text(encoding="utf-8-sig")
obj = json.loads(raw)
if not isinstance(obj, dict):
raise ValueError("root JSON value is not an object")
return obj
def _scan_single_file(path: Path, args: argparse.Namespace) -> list[CheckResult]:
try:
payload = _load_json(path)
except Exception as exc: # noqa: BLE001
return [
CheckResult(
file=str(path),
provider="unknown",
email="",
account_id="",
status_code=None,
unauthorized_401=False,
no_limit_unlimited=False,
quota_exceeded=False,
quota_resets_at=None,
error=f"parse error: {exc}",
response_preview="",
)
]
if not _looks_like_codex(path, payload):
return []
fields = _extract_auth_fields(payload)
access_token = fields["access_token"]
refresh_token = fields["refresh_token"]
try:
if args.refresh_before_check and refresh_token:
access_token, _ = _refresh_access_token(
args.refresh_url, refresh_token, args.timeout
)
except Exception as exc: # noqa: BLE001
return [
CheckResult(
file=str(path),
provider=fields["provider"],
email=fields["email"],
account_id=fields["account_id"],
status_code=None,
unauthorized_401=False,
no_limit_unlimited=False,
quota_exceeded=False,
quota_resets_at=None,
error=str(exc),
response_preview="",
)
]
if not access_token:
return [
CheckResult(
file=str(path),
provider=fields["provider"],
email=fields["email"],
account_id=fields["account_id"],
status_code=None,
unauthorized_401=False,
no_limit_unlimited=False,
quota_exceeded=False,
quota_resets_at=None,
error="missing access token",
response_preview="",
)
]
base_url = fields["base_url"] or args.base_url
probe_url = base_url.rstrip("/") + "/" + args.quota_path.lstrip("/")
headers = _build_probe_headers(access_token, fields["account_id"])
body = _build_probe_body(args.model)
try:
status, resp_body = _http_request_with_retry(
url=probe_url,
method="POST",
headers=headers,
body=body,
timeout=args.timeout,
retry_attempts=args.retry_attempts,
retry_backoff=args.retry_backoff,
)
response_text = resp_body.decode("utf-8", errors="replace")
preview = response_text[:300]
_quota_exceeded, _resets_at = _detect_quota_exceeded(response_text)
return [
CheckResult(
file=str(path),
provider=fields["provider"],
email=fields["email"],
account_id=fields["account_id"],
status_code=status,
unauthorized_401=(status == 401),
no_limit_unlimited=_looks_unlimited_from_response(
status, response_text
),
quota_exceeded=_quota_exceeded,
quota_resets_at=_resets_at,
error="",
response_preview=preview,
)
]
except error.URLError as exc:
return [
CheckResult(
file=str(path),
provider=fields["provider"],
email=fields["email"],
account_id=fields["account_id"],
status_code=None,
unauthorized_401=False,
no_limit_unlimited=False,
quota_exceeded=False,
quota_resets_at=None,
error=f"network error: {exc}",
response_preview="",
)
]
def scan_auth_files(
args: argparse.Namespace,
progress_callback: Callable[[int, int, Path], None] | None = None,
) -> list[CheckResult]:
auth_dir = Path(args.auth_dir).expanduser().resolve()
if not auth_dir.exists() or not auth_dir.is_dir():
raise FileNotFoundError(f"auth directory not found: {auth_dir}")
json_files = sorted(auth_dir.rglob("*.json"))
total_json = len(json_files)
indexed_results: list[tuple[int, list[CheckResult]]] = []
if total_json == 0:
return []
workers = min(args.workers, total_json)
if workers <= 1:
for index, path in enumerate(json_files, start=1):
if progress_callback is not None:
progress_callback(index, total_json, path)
file_results = _scan_single_file(path, args)
if file_results:
indexed_results.append((index, file_results))
else:
with ThreadPoolExecutor(max_workers=workers) as pool:
future_map = {
pool.submit(_scan_single_file, path, args): (index, path)
for index, path in enumerate(json_files, start=1)
}
completed = 0
for future in as_completed(future_map):
completed += 1
index, path = future_map[future]
if progress_callback is not None:
progress_callback(completed, total_json, path)
try:
file_results = future.result()
except Exception as exc: # noqa: BLE001
file_results = [
CheckResult(
file=str(path),
provider="unknown",
email="",
account_id="",
status_code=None,
unauthorized_401=False,
no_limit_unlimited=False,
quota_exceeded=False,
quota_resets_at=None,
error=f"internal error: {exc}",
response_preview="",
)
]
if file_results:
indexed_results.append((index, file_results))
indexed_results.sort(key=lambda item: item[0])
return [row for _, group in indexed_results for row in group]
def _status_label(item: CheckResult, use_color: bool) -> str:
if item.unauthorized_401:
return _paint("401", ANSI_BOLD, ANSI_RED, enabled=use_color)
if item.quota_exceeded:
return _paint("LIM", ANSI_BOLD, ANSI_MAGENTA, enabled=use_color)
if item.status_code is None:
return _paint("ERR", ANSI_BOLD, ANSI_YELLOW, enabled=use_color)
code = str(item.status_code)
if 200 <= item.status_code < 300:
return _paint(code, ANSI_GREEN, enabled=use_color)
if 400 <= item.status_code < 500:
return _paint(code, ANSI_YELLOW, enabled=use_color)
if item.status_code >= 500:
return _paint(code, ANSI_RED, enabled=use_color)
return code
def _print_table(results: list[CheckResult], use_color: bool) -> None:
if not results:
print(_paint("No codex auth files found.", ANSI_YELLOW, enabled=use_color))
return
unauthorized = [r for r in results if r.unauthorized_401]
quota_exceeded_list = [r for r in results if r.quota_exceeded and not r.unauthorized_401]
no_limit_unlimited = [r for r in results if r.no_limit_unlimited]
ok_count = sum(
1
for item in results
if item.status_code is not None and 200 <= item.status_code < 300
)
failed_count = len(results) - ok_count
print(_paint("Scan Summary", ANSI_BOLD, ANSI_CYAN, enabled=use_color))
print(f" checked codex files : {len(results)}")
print(f" unauthorized (401) : {len(unauthorized)}")
print(f" quota-exceeded : {len(quota_exceeded_list)}")
print(f" no-limit/unlimited : {len(no_limit_unlimited)}")
print(f" non-2xx or errors : {failed_count}")
print()
if unauthorized:
print(_paint("401 Files", ANSI_BOLD, ANSI_RED, enabled=use_color))
for item in unauthorized:
email = f" ({item.email})" if item.email else ""
print(f" [{_status_label(item, use_color)}] {item.file}{email}")
print()
if quota_exceeded_list:
print(_paint("Quota-Exceeded Files", ANSI_BOLD, ANSI_MAGENTA, enabled=use_color))
for item in quota_exceeded_list:
email = f" ({item.email})" if item.email else ""
if item.quota_resets_at is not None:
import datetime as _dt
resets_str = _dt.datetime.fromtimestamp(
item.quota_resets_at, tz=_dt.timezone.utc
).strftime("%Y-%m-%d %H:%M UTC")
suffix = f" [resets {resets_str}]"
else:
suffix = ""
print(f" [{_status_label(item, use_color)}] {item.file}{email}{suffix}")
print()
others = [
r
for r in results
if (not r.unauthorized_401)
and (not r.quota_exceeded)
and (not r.no_limit_unlimited)
]
if no_limit_unlimited:
print(
_paint("No-limit/Unlimited Files", ANSI_BOLD, ANSI_GREEN, enabled=use_color)
)
for item in no_limit_unlimited:
email = f" ({item.email})" if item.email else ""
print(f" [{_status_label(item, use_color)}] {item.file}{email}")
print()
if others:
print(_paint("Other Results", ANSI_BOLD, ANSI_CYAN, enabled=use_color))
for item in others:
status = _status_label(item, use_color)
reason = item.error or item.response_preview.replace("\n", " ")[:120]
reason = reason.strip() or "-"
print(f" [{status}] {item.file} :: {_truncate(reason, 120)}")
def _move_file_safely(
src: Path, dst_dir: Path
) -> tuple[str | None, str | None]:
"""Move *src* into *dst_dir*, creating dst_dir if necessary.
Returns ``(dst_path, None)`` on success or ``(None, error_message)`` on failure.
"""
try:
dst_dir.mkdir(parents=True, exist_ok=True)
dst = dst_dir / src.name
# If a file with the same name already exists, append a counter suffix.
counter = 1
while dst.exists():
dst = dst_dir / f"{src.stem}_{counter}{src.suffix}"
counter += 1
shutil.move(str(src), str(dst))
return str(dst), None
except Exception as exc: # noqa: BLE001
return None, str(exc)
def _scan_dir_flat(
dir_path: Path,
args: argparse.Namespace,
progress_callback: Callable[[int, int, Path], None] | None = None,
) -> list[CheckResult]:
"""Scan *dir_path* non-recursively (flat) for Codex JSON files."""
if not dir_path.exists() or not dir_path.is_dir():
return []
json_files = sorted(dir_path.glob("*.json"))
total = len(json_files)
if total == 0:
return []
results: list[CheckResult] = []
workers = min(args.workers, total)
if workers <= 1:
for index, path in enumerate(json_files, start=1):
if progress_callback is not None:
progress_callback(index, total, path)
results.extend(_scan_single_file(path, args))
else:
from concurrent.futures import ThreadPoolExecutor, as_completed as _as_completed
indexed: list[tuple[int, list[CheckResult]]] = []
with ThreadPoolExecutor(max_workers=workers) as pool:
future_map = {
pool.submit(_scan_single_file, path, args): (index, path)
for index, path in enumerate(json_files, start=1)
}
completed = 0
for future in _as_completed(future_map):
completed += 1
index, path = future_map[future]
if progress_callback is not None:
progress_callback(completed, total, path)
try:
file_results = future.result()
except Exception as exc: # noqa: BLE001
file_results = [
CheckResult(
file=str(path),
provider="unknown",
email="",
account_id="",
status_code=None,
unauthorized_401=False,
no_limit_unlimited=False,
quota_exceeded=False,
quota_resets_at=None,
error=f"internal error: {exc}",
response_preview="",
)
]
indexed.append((index, file_results))
indexed.sort(key=lambda item: item[0])
results = [row for _, group in indexed for row in group]
return results
def _confirm_deletion(targets: list[str], assume_yes: bool) -> bool:
if not targets:
return False
if assume_yes:
return True
if not sys.stdin.isatty():
print(
"No interactive terminal for confirmation; deletion cancelled. Use --yes to force."
)
return False
print()
print(f"Delete {len(targets)} files with 401? This action cannot be undone.")
answer = input("Confirm deletion? [y/N]: ").strip().lower()
return answer in {"y", "yes"}
def _delete_files(paths: list[str]) -> tuple[list[str], list[DeleteError]]:
deleted: list[str] = []
errors: list[DeleteError] = []
seen: set[str] = set()
for raw_path in paths:
path = Path(raw_path)
normalized = str(path.resolve())
if normalized in seen:
continue
seen.add(normalized)
try:
path.unlink()
deleted.append(str(path))
except Exception as exc: # noqa: BLE001
errors.append(DeleteError(file=str(path), error=str(exc)))
return deleted, errors
def _print_deletion_summary(
*,
requested: bool,
target_count: int,
confirmed: bool,
deleted_files: list[str],
errors: list[DeleteError],
use_color: bool,
) -> None:
if not requested:
return
if target_count == 0:
print()
print(
_paint(
"Delete mode enabled, but no 401 files found.",
ANSI_DIM,
enabled=use_color,
)
)
return
print()
if not confirmed:
print(_paint("Deletion cancelled by user.", ANSI_YELLOW, enabled=use_color))
return
print(
_paint(
f"Deletion completed: {len(deleted_files)}/{target_count} removed.",
ANSI_BOLD,
ANSI_GREEN,
enabled=use_color,
)
)
for path in deleted_files:
print(f"[{_paint('deleted', ANSI_GREEN, enabled=use_color)}] {path}")
for item in errors:
print(
f"[{_paint('delete-failed', ANSI_RED, enabled=use_color)}] {item.file} :: {item.error}"
)
def _build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="Traverse auth folder and detect Codex auth files returning 401 or no-limit markers during quota probe."
)
parser.add_argument(
"--auth-dir",
default=DEFAULT_AUTH_DIR,
help=f"Folder containing auth JSON files (default: {DEFAULT_AUTH_DIR}).",
)
parser.add_argument(
"--base-url",
default=DEFAULT_CODEX_BASE_URL,
help=f"Codex base URL (default: {DEFAULT_CODEX_BASE_URL})",
)
parser.add_argument(
"--quota-path",
default="/responses",
help="API path used for quota/auth probe (default: /responses)",
)
parser.add_argument(
"--model",
default="gpt-5",
help="Model used in probe request body (default: gpt-5)",
)
parser.add_argument(
"--timeout",
type=float,
default=20,
help="HTTP timeout in seconds (default: 20)",
)
parser.add_argument(
"--workers",
type=int,
default=DEFAULT_WORKERS,
help=f"Number of concurrent workers for file scan/probe (default: {DEFAULT_WORKERS})",
)
parser.add_argument(
"--retry-attempts",
type=int,
default=DEFAULT_RETRY_ATTEMPTS,
help=f"Total attempts for network errors per file (default: {DEFAULT_RETRY_ATTEMPTS})",
)
parser.add_argument(
"--retry-backoff",
type=float,
default=DEFAULT_RETRY_BACKOFF,
help=f"Base seconds for exponential retry backoff (default: {DEFAULT_RETRY_BACKOFF})",
)
parser.add_argument(
"--refresh-before-check",
action="store_true",
help="Refresh access token with refresh_token before probe.",
)
parser.add_argument(
"--refresh-url",
default=DEFAULT_REFRESH_URL,
help=f"Token refresh endpoint (default: {DEFAULT_REFRESH_URL})",
)
parser.add_argument(
"--output-json",
action="store_true",
help="Print full results as JSON instead of table view.",
)
parser.add_argument(
"--no-progress",
action="store_true",
help="Disable live scan progress in terminal output.",
)
parser.add_argument(
"--no-color",
action="store_true",
help="Disable ANSI color output.",
)
parser.add_argument(
"--delete-401",
action="store_true",
help="Delete auth files that returned HTTP 401 after confirmation.",
)
parser.add_argument(
"--yes",
action="store_true",
help="Skip deletion confirmation prompt (only applies with --delete-401).",
)
parser.add_argument(
"--exceeded-dir",
default=None,
help=(
"Directory to move quota-exceeded auth files into. "
f"Defaults to a sibling '{DEFAULT_EXCEEDED_DIR_NAME}/' folder next to --auth-dir."
),
)
parser.add_argument(
"--no-quarantine",
action="store_true",
help="Disable automatic quarantine: do not move quota-exceeded files and skip recovery scan.",
)
return parser
def main() -> int:
parser = _build_parser()
args = parser.parse_args()
if args.workers < 1:
parser.error("--workers must be >= 1")
if args.retry_attempts < 1:
parser.error("--retry-attempts must be >= 1")
if args.retry_backoff < 0:
parser.error("--retry-backoff must be >= 0")
use_color = _supports_color(args.no_color) and (not args.output_json)
progress_enabled = (
_is_tty_stdout() and (not args.no_progress) and (not args.output_json)
)
progress = _ProgressDisplay(progress_enabled)
auth_dir = Path(args.auth_dir).expanduser().resolve()
exceeded_dir = (
Path(args.exceeded_dir).expanduser().resolve()
if args.exceeded_dir
else auth_dir.parent / DEFAULT_EXCEEDED_DIR_NAME
)
if progress_enabled:
print(_paint("Scanning auth JSON files...", ANSI_DIM, enabled=use_color))
try:
results = scan_auth_files(
args,
progress_callback=progress.update if progress_enabled else None,
)
except Exception as exc: # noqa: BLE001
progress.finish()
print(f"Error: {exc}", file=sys.stderr)
return 2
progress.finish()
# --- Quarantine: move exceeded files out of auth_dir ---
moved_to_exceeded: list[str] = [] # dst paths
move_to_exceeded_errors: list[DeleteError] = []
if not args.no_quarantine:
for item in results:
if item.quota_exceeded:
dst, err = _move_file_safely(Path(item.file), exceeded_dir)
if err:
move_to_exceeded_errors.append(DeleteError(file=item.file, error=err))
else:
if dst is not None:
moved_to_exceeded.append(dst)
# --- Recovery: scan exceeded_dir, move back files that are no longer limited ---
exceeded_results: list[CheckResult] = []
moved_from_exceeded: list[str] = [] # dst paths (back in auth_dir)
move_from_exceeded_errors: list[DeleteError] = []
if not args.no_quarantine and exceeded_dir.exists():
if progress_enabled:
print(
_paint(
f"Scanning exceeded dir: {exceeded_dir} ...",
ANSI_DIM,
enabled=use_color,
)
)
exceeded_results = _scan_dir_flat(exceeded_dir, args)
for item in exceeded_results:
recovered = (
not item.quota_exceeded
and item.status_code is not None
and 200 <= item.status_code < 300
)
if recovered:
dst, err = _move_file_safely(Path(item.file), auth_dir)
if err:
move_from_exceeded_errors.append(
DeleteError(file=item.file, error=err)
)
else:
if dst is not None:
moved_from_exceeded.append(dst)
# --- Delete-401 flow ---
unauthorized_files = [item.file for item in results if item.unauthorized_401]
delete_confirmed = False
deleted_files: list[str] = []
delete_errors: list[DeleteError] = []
if args.delete_401 and unauthorized_files:
delete_confirmed = _confirm_deletion(unauthorized_files, args.yes)
if delete_confirmed:
deleted_files, delete_errors = _delete_files(unauthorized_files)
if args.output_json:
print(
json.dumps(
{
"results": [asdict(item) for item in results],
"exceeded_dir_results": [asdict(item) for item in exceeded_results],
"quarantine": {
"enabled": not args.no_quarantine,
"exceeded_dir": str(exceeded_dir),
"moved_to_exceeded": moved_to_exceeded,
"moved_to_exceeded_errors": [
asdict(e) for e in move_to_exceeded_errors
],
"moved_from_exceeded": moved_from_exceeded,
"moved_from_exceeded_errors": [
asdict(e) for e in move_from_exceeded_errors
],
},
"deletion": {
"requested": args.delete_401,
"target_count": len(unauthorized_files),
"confirmed": delete_confirmed,
"deleted_count": len(deleted_files),
"deleted_files": deleted_files,
"errors": [asdict(item) for item in delete_errors],
},
},
ensure_ascii=False,
indent=2,
)
)
else:
_print_table(results, use_color=use_color)
if exceeded_results:
print(
_paint(
f"Exceeded Dir Scan ({exceeded_dir})",
ANSI_BOLD,
ANSI_MAGENTA,
enabled=use_color,
)
)
_print_table(exceeded_results, use_color=use_color)
if moved_to_exceeded or move_to_exceeded_errors:
print()
print(_paint("Quarantine Moves (auth-dir → exceeded)", ANSI_BOLD, ANSI_MAGENTA, enabled=use_color))
for dst in moved_to_exceeded:
print(f" [{_paint('moved', ANSI_MAGENTA, enabled=use_color)}] → {dst}")
for e in move_to_exceeded_errors:
print(f" [{_paint('move-failed', ANSI_RED, enabled=use_color)}] {e.file} :: {e.error}")
if moved_from_exceeded or move_from_exceeded_errors:
print()
print(_paint("Recovery Moves (exceeded → auth-dir)", ANSI_BOLD, ANSI_GREEN, enabled=use_color))
for dst in moved_from_exceeded:
print(f" [{_paint('recovered', ANSI_GREEN, enabled=use_color)}] → {dst}")
for e in move_from_exceeded_errors:
print(f" [{_paint('recover-failed', ANSI_RED, enabled=use_color)}] {e.file} :: {e.error}")
_print_deletion_summary(
requested=args.delete_401,
target_count=len(unauthorized_files),
confirmed=delete_confirmed,
deleted_files=deleted_files,
errors=delete_errors,
use_color=use_color,
)
has_401 = any(item.unauthorized_401 for item in results)
return 1 if has_401 else 0
if __name__ == "__main__":
raise SystemExit(main())
网友解答:
--【壹】--:
熬,不会用。。。我在1panel里安装的,这脚本放进去运行没反应qaq
--【贰】--:
谢谢分享!
--【叁】--:
–auth-dir 指定你的认证文件夹
实在不行,问 ai 怎么用
python tp.py -h
usage: tp.py [-h] [--url CPA_URL] [--key MANAGEMENT_KEY] [--auth-dir AUTH_DIR] [--base-url BASE_URL] [--quota-path QUOTA_PATH] [--model MODEL] [--timeout TIMEOUT] [--workers WORKERS]
[--retry-attempts RETRY_ATTEMPTS] [--retry-backoff RETRY_BACKOFF] [--refresh-before-check] [--refresh-url REFRESH_URL] [--output-json] [--no-progress] [--no-color] [--delete-401] [--yes]
[--exceeded-dir EXCEEDED_DIR] [--no-quarantine]
Detect Codex credentials returning 401/no-limit markers during quota probe. Supports local auth-dir mode and CPA remote mode (--url/--key).
options:
-h, --help show this help message and exit
--url CPA_URL, --cpa-url CPA_URL
CLIProxyAPI base URL for CPA mode, e.g. http://127.0.0.1:8317
--key MANAGEMENT_KEY, --management-key MANAGEMENT_KEY
CLIProxyAPI management key for CPA mode.
--auth-dir AUTH_DIR Folder containing auth JSON files (default: ~/.cli-proxy-api).
--base-url BASE_URL Codex base URL (default: https://chatgpt.com/backend-api/codex)
--quota-path QUOTA_PATH
API path used for quota/auth probe (default: /responses)
--model MODEL Model used in probe request body (default: gpt-5)
--timeout TIMEOUT HTTP timeout in seconds (default: 20)
--workers WORKERS Number of concurrent workers for file scan/probe (default: 32)
--retry-attempts RETRY_ATTEMPTS
Total attempts for network errors per file (default: 3)
--retry-backoff RETRY_BACKOFF
Base seconds for exponential retry backoff (default: 0.6)
--refresh-before-check
Refresh access token with refresh_token before probe.
--refresh-url REFRESH_URL
Token refresh endpoint (default: https://auth.openai.com/oauth/token)
--output-json Print full results as JSON instead of table view.
--no-progress Disable live scan progress in terminal output.
--no-color Disable ANSI color output.
--delete-401 Delete auth files that returned HTTP 401 after confirmation.
--yes Skip deletion confirmation prompt (only applies with --delete-401).
--exceeded-dir EXCEEDED_DIR
Directory to move quota-exceeded auth files into. Defaults to a sibling 'exceeded/' folder next to --auth-dir.
--no-quarantine Disable automatic quarantine: do not move quota-exceeded files and skip recovery scan.
--【肆】--:
感谢佬友二开!
--【伍】--:
感谢大佬分享 很实用!
--【陆】--:
谢谢佬 好用
--【柒】--:
谢谢佬友,这个清理很方便
--【捌】--:
老师这个在哪里用呢
--【玖】--:
好资源,值得分享,多谢多谢
--【拾】--:
你可以让 codex 根据 cpa 的认证机制,在这个脚本上二开。效果类似于:
python3 tp.py --delete-401 --workers 50 --url http://xxx.com --key xxx
大概几分钟就能写好。我主要是本机在跑.
--【拾壹】--:
感谢大佬
--【拾贰】--:
这个是在本地跑的脚本吗?把cpa部署到了zeabur上,怎么在zeabur上跑?
--【拾叁】--:
感谢分享刚刚还在想怎么解决这个额度用完的问题
--【拾肆】--:
真不错,已经用上了
--【拾伍】--:
image1788×634 120 KB
明明很多401的,为啥检测出来全部都可用?
image700×156 8.08 KB
--【拾陆】--:
python 脚本哇,直接在你安装了 cpa 的机器上用,默认检测~/.cli-proxy-api 路径,你也可以指定你自己的认证文件文件夹
--【拾柒】--:
测401试了很好使,不过和这个测额度移动文件夹好像没变化
image838×372 20 KB
最近在各位佬那里白嫖了很多 codex 账号,也算是 gpt 自由了。
根据某个佬的 401 一键删除脚本,让 codex 优化了。
1.一键删除所有 401 认证文件
2.自动检查所有认证文件是否达到限额,如果限额,则移出该认证文件到同级目录。下一次运行会检测该目录下的限额认证文件,如果不限额了,则自动移动回来。
3.多个并发线程
python3 tp.py --delete-401 --workers 50
image1158×704 368 KB
脚本如下:
#!/usr/bin/env python3
"""Scan Codex auth files and report HTTP 401 and no-limit credentials.
This script ports key parts from CLIProxyAPI's Codex implementation:
- Default Codex base URL from `internal/runtime/executor/codex_executor.go`
- Codex request headers style from `applyCodexHeaders`
- Refresh-token flow from `internal/auth/codex/openai_auth.go`
Usage examples:
python scripts/codex_quota_401_scanner.py --auth-dir ./auths
python scripts/codex_quota_401_scanner.py --auth-dir ~/.cli-proxy-api --refresh-before-check
python scripts/codex_quota_401_scanner.py --auth-dir ./auths --output-json
python scripts/codex_quota_401_scanner.py --auth-dir ./auths --delete-401
python scripts/codex_quota_401_scanner.py --auth-dir ./auths --delete-401 --yes
"""
from __future__ import annotations
import argparse
from concurrent.futures import ThreadPoolExecutor, as_completed
import json
import os
import shutil
import sys
import time
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Any, Callable, Iterable
from urllib import error, parse, request
DEFAULT_CODEX_BASE_URL = "https://chatgpt.com/backend-api/codex"
DEFAULT_REFRESH_URL = "https://auth.openai.com/oauth/token"
DEFAULT_AUTH_DIR = "~/.cli-proxy-api"
DEFAULT_CLIENT_ID = "app_EMoamEEZ73f0CkXaXp7hrann"
DEFAULT_VERSION = "0.98.0"
DEFAULT_USER_AGENT = "codex_cli_rs/0.98.0 (python-port)"
DEFAULT_WORKERS = min(32, max(4, (os.cpu_count() or 1) * 4))
DEFAULT_RETRY_ATTEMPTS = 3
DEFAULT_RETRY_BACKOFF = 0.6
ANSI_RESET = "\033[0m"
ANSI_BOLD = "\033[1m"
ANSI_DIM = "\033[2m"
ANSI_RED = "\033[31m"
ANSI_GREEN = "\033[32m"
ANSI_YELLOW = "\033[33m"
ANSI_CYAN = "\033[36m"
ANSI_MAGENTA = "\033[35m"
DEFAULT_EXCEEDED_DIR_NAME = "exceeded"
@dataclass
class CheckResult:
file: str
provider: str
email: str
account_id: str
status_code: int | None
unauthorized_401: bool
no_limit_unlimited: bool
quota_exceeded: bool
quota_resets_at: int | None
error: str
response_preview: str
@dataclass
class DeleteError:
file: str
error: str
def _is_tty_stdout() -> bool:
return hasattr(sys.stdout, "isatty") and sys.stdout.isatty()
def _supports_color(disabled: bool) -> bool:
return (not disabled) and _is_tty_stdout() and ("NO_COLOR" not in os.environ)
def _paint(text: str, *codes: str, enabled: bool) -> str:
if not enabled or not codes:
return text
return "".join(codes) + text + ANSI_RESET
def _truncate(text: str, limit: int) -> str:
if limit <= 0 or len(text) <= limit:
return text
if limit <= 3:
return "." * limit
return text[: limit - 3] + "..."
class _ProgressDisplay:
def __init__(self, enabled: bool) -> None:
self.enabled = enabled
self._last_len = 0
self._finished = False
def update(self, current: int, total: int, path: Path) -> None:
if not self.enabled or total <= 0:
return
width = shutil.get_terminal_size(fallback=(100, 20)).columns
bar_width = max(12, min(30, width - 52))
percent = int((current * 100) / total)
filled = int((current * bar_width) / total)
bar = "#" * filled + "-" * (bar_width - filled)
message = f"[{bar}] {current}/{total} {percent:>3}% {_truncate(path.name, 28)}"
message = _truncate(message, max(10, width - 1))
padding = " " * max(0, self._last_len - len(message))
sys.stdout.write(f"\r{message}{padding}")
sys.stdout.flush()
self._last_len = len(message)
def finish(self) -> None:
if not self.enabled or self._finished:
return
self._finished = True
sys.stdout.write("\n")
sys.stdout.flush()
def _first_non_empty_str(values: Iterable[Any]) -> str:
for value in values:
if isinstance(value, str):
stripped = value.strip()
if stripped:
return stripped
return ""
def _dot_get(data: Any, dotted_key: str) -> Any:
current = data
for key in dotted_key.split("."):
if not isinstance(current, dict):
return None
current = current.get(key)
return current
def _pick(data: dict[str, Any], candidates: list[str]) -> str:
values = [_dot_get(data, key) for key in candidates]
return _first_non_empty_str(values)
def _looks_like_codex(path: Path, payload: dict[str, Any]) -> bool:
provider = _pick(payload, ["type", "provider", "metadata.type"])
if provider:
return provider.lower() == "codex"
name = path.name.lower()
if name.startswith("codex-"):
return True
access_token = _pick(
payload,
[
"access_token",
"accessToken",
"token.access_token",
"token.accessToken",
"metadata.access_token",
"metadata.accessToken",
"metadata.token.access_token",
"metadata.token.accessToken",
"attributes.api_key",
],
)
refresh_token = _pick(
payload,
[
"refresh_token",
"refreshToken",
"token.refresh_token",
"token.refreshToken",
"metadata.refresh_token",
"metadata.refreshToken",
"metadata.token.refresh_token",
"metadata.token.refreshToken",
],
)
account_id = _pick(
payload,
["account_id", "accountId", "metadata.account_id", "metadata.accountId"],
)
return bool(access_token and (refresh_token or account_id))
def _extract_auth_fields(payload: dict[str, Any]) -> dict[str, str]:
return {
"provider": _pick(payload, ["type", "provider", "metadata.type"]) or "codex",
"email": _pick(payload, ["email", "metadata.email", "attributes.email"]),
"access_token": _pick(
payload,
[
"access_token",
"accessToken",
"token.access_token",
"token.accessToken",
"metadata.access_token",
"metadata.accessToken",
"metadata.token.access_token",
"metadata.token.accessToken",
"attributes.api_key",
],
),
"refresh_token": _pick(
payload,
[
"refresh_token",
"refreshToken",
"token.refresh_token",
"token.refreshToken",
"metadata.refresh_token",
"metadata.refreshToken",
"metadata.token.refresh_token",
"metadata.token.refreshToken",
],
),
"account_id": _pick(
payload,
["account_id", "accountId", "metadata.account_id", "metadata.accountId"],
),
"base_url": _pick(
payload,
[
"base_url",
"baseUrl",
"metadata.base_url",
"metadata.baseUrl",
"attributes.base_url",
"attributes.baseUrl",
],
),
}
def _http_request(
*,
url: str,
method: str,
headers: dict[str, str],
body: bytes | None,
timeout: float,
) -> tuple[int, bytes]:
req = request.Request(url=url, data=body, method=method.upper())
for key, value in headers.items():
req.add_header(key, value)
try:
with request.urlopen(req, timeout=timeout) as resp:
return int(resp.status), resp.read()
except error.HTTPError as exc:
return int(exc.code), exc.read()
def _http_request_with_retry(
*,
url: str,
method: str,
headers: dict[str, str],
body: bytes | None,
timeout: float,
retry_attempts: int,
retry_backoff: float,
) -> tuple[int, bytes]:
last_exc: Exception | None = None
for attempt in range(1, retry_attempts + 1):
try:
return _http_request(
url=url,
method=method,
headers=headers,
body=body,
timeout=timeout,
)
except error.URLError as exc:
last_exc = exc
if attempt >= retry_attempts:
break
if retry_backoff > 0:
sleep_seconds = retry_backoff * (2 ** (attempt - 1))
time.sleep(sleep_seconds)
if last_exc is not None:
raise last_exc
raise RuntimeError("request failed without a captured exception")
_UNLIMITED_TEXT_MARKERS = (
"unlimited",
"no limit",
"no-limit",
"without limit",
"limitless",
"不限额",
"无限额",
"无限制",
)
_UNLIMITED_KEY_HINTS = (
"unlimited",
"no_limit",
"nolimit",
"limitless",
)
_LIMIT_LIKE_KEY_HINTS = (
"quota",
"limit",
"cap",
)
def _looks_unlimited_from_response(status_code: int | None, response_text: str) -> bool:
if status_code is None or status_code < 200 or status_code >= 300:
return False
lowered = (response_text or "").lower()
if any(marker in lowered for marker in _UNLIMITED_TEXT_MARKERS):
return True
try:
parsed = json.loads(response_text)
except json.JSONDecodeError:
return False
stack: list[Any] = [parsed]
while stack:
current = stack.pop()
if isinstance(current, dict):
for key, value in current.items():
key_lc = str(key).lower()
if any(hint in key_lc for hint in _UNLIMITED_KEY_HINTS):
if isinstance(value, bool) and value:
return True
if isinstance(value, str) and value.strip().lower() in {
"1",
"true",
"yes",
"unlimited",
"no_limit",
"nolimit",
}:
return True
if isinstance(value, (int, float)) and value == -1:
return True
if any(hint in key_lc for hint in _LIMIT_LIKE_KEY_HINTS):
if value is None:
return True
if isinstance(value, (int, float)) and (
value == -1 or value >= 9999
):
return True
if isinstance(value, str) and value.strip().lower() in {
"none",
"null",
"unlimited",
"no limit",
"no-limit",
"无限",
"不限额",
"无限额",
}:
return True
if isinstance(value, (dict, list)):
stack.append(value)
elif isinstance(value, str):
text_value = value.lower()
if any(marker in text_value for marker in _UNLIMITED_TEXT_MARKERS):
return True
elif isinstance(current, list):
stack.extend(current)
return False
_QUOTA_EXCEEDED_TEXT_MARKERS = (
"usage_limit_reached",
"usage limit has been reached",
"quota exceeded",
"limit exceeded",
"超出配额",
"额度已用完",
)
def _detect_quota_exceeded(response_text: str) -> tuple[bool, int | None]:
"""Return (is_exceeded, resets_at_unix_or_None).
Primary detection: ``error.type == 'usage_limit_reached'`` in JSON body.
Secondary: text marker scan for common quota-exceeded phrases.
"""
if not response_text:
return False, None
try:
parsed = json.loads(response_text)
except json.JSONDecodeError:
parsed = None
if isinstance(parsed, dict):
err = parsed.get("error")
if isinstance(err, dict):
if err.get("type") == "usage_limit_reached":
resets_at = err.get("resets_at")
if isinstance(resets_at, (int, float)):
return True, int(resets_at)
return True, None
lowered = response_text.lower()
if any(marker in lowered for marker in _QUOTA_EXCEEDED_TEXT_MARKERS):
return True, None
return False, None
def _refresh_access_token(
refresh_url: str, refresh_token: str, timeout: float
) -> tuple[str, str]:
body = parse.urlencode(
{
"client_id": DEFAULT_CLIENT_ID,
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"scope": "openid profile email",
}
).encode("utf-8")
status, resp_body = _http_request(
url=refresh_url,
method="POST",
headers={
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json",
},
body=body,
timeout=timeout,
)
if status != 200:
msg = resp_body.decode("utf-8", errors="replace")[:300]
raise RuntimeError(f"refresh failed with {status}: {msg}")
try:
parsed = json.loads(resp_body.decode("utf-8", errors="replace"))
except json.JSONDecodeError as exc:
raise RuntimeError(f"refresh response is not valid JSON: {exc}") from exc
new_token = _first_non_empty_str([parsed.get("access_token")])
new_refresh = _first_non_empty_str([parsed.get("refresh_token")])
if not new_token:
raise RuntimeError("refresh succeeded but access_token missing")
return new_token, new_refresh
def _build_probe_headers(access_token: str, account_id: str) -> dict[str, str]:
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
"Accept": "application/json",
"Version": DEFAULT_VERSION,
"Openai-Beta": "responses=experimental",
"User-Agent": DEFAULT_USER_AGENT,
"Originator": "codex_cli_rs",
}
if account_id:
headers["Chatgpt-Account-Id"] = account_id
return headers
def _build_probe_body(model: str) -> bytes:
payload = {
"model": model,
"stream": True,
"store": False,
"instructions": "",
# Some proxies strictly validate `input` as a list for /responses.
"input": [
{
"role": "user",
"content": [
{
"type": "input_text",
"text": "ping",
}
],
}
],
}
return json.dumps(payload, ensure_ascii=False).encode("utf-8")
def _load_json(path: Path) -> dict[str, Any]:
raw = path.read_text(encoding="utf-8-sig")
obj = json.loads(raw)
if not isinstance(obj, dict):
raise ValueError("root JSON value is not an object")
return obj
def _scan_single_file(path: Path, args: argparse.Namespace) -> list[CheckResult]:
try:
payload = _load_json(path)
except Exception as exc: # noqa: BLE001
return [
CheckResult(
file=str(path),
provider="unknown",
email="",
account_id="",
status_code=None,
unauthorized_401=False,
no_limit_unlimited=False,
quota_exceeded=False,
quota_resets_at=None,
error=f"parse error: {exc}",
response_preview="",
)
]
if not _looks_like_codex(path, payload):
return []
fields = _extract_auth_fields(payload)
access_token = fields["access_token"]
refresh_token = fields["refresh_token"]
try:
if args.refresh_before_check and refresh_token:
access_token, _ = _refresh_access_token(
args.refresh_url, refresh_token, args.timeout
)
except Exception as exc: # noqa: BLE001
return [
CheckResult(
file=str(path),
provider=fields["provider"],
email=fields["email"],
account_id=fields["account_id"],
status_code=None,
unauthorized_401=False,
no_limit_unlimited=False,
quota_exceeded=False,
quota_resets_at=None,
error=str(exc),
response_preview="",
)
]
if not access_token:
return [
CheckResult(
file=str(path),
provider=fields["provider"],
email=fields["email"],
account_id=fields["account_id"],
status_code=None,
unauthorized_401=False,
no_limit_unlimited=False,
quota_exceeded=False,
quota_resets_at=None,
error="missing access token",
response_preview="",
)
]
base_url = fields["base_url"] or args.base_url
probe_url = base_url.rstrip("/") + "/" + args.quota_path.lstrip("/")
headers = _build_probe_headers(access_token, fields["account_id"])
body = _build_probe_body(args.model)
try:
status, resp_body = _http_request_with_retry(
url=probe_url,
method="POST",
headers=headers,
body=body,
timeout=args.timeout,
retry_attempts=args.retry_attempts,
retry_backoff=args.retry_backoff,
)
response_text = resp_body.decode("utf-8", errors="replace")
preview = response_text[:300]
_quota_exceeded, _resets_at = _detect_quota_exceeded(response_text)
return [
CheckResult(
file=str(path),
provider=fields["provider"],
email=fields["email"],
account_id=fields["account_id"],
status_code=status,
unauthorized_401=(status == 401),
no_limit_unlimited=_looks_unlimited_from_response(
status, response_text
),
quota_exceeded=_quota_exceeded,
quota_resets_at=_resets_at,
error="",
response_preview=preview,
)
]
except error.URLError as exc:
return [
CheckResult(
file=str(path),
provider=fields["provider"],
email=fields["email"],
account_id=fields["account_id"],
status_code=None,
unauthorized_401=False,
no_limit_unlimited=False,
quota_exceeded=False,
quota_resets_at=None,
error=f"network error: {exc}",
response_preview="",
)
]
def scan_auth_files(
args: argparse.Namespace,
progress_callback: Callable[[int, int, Path], None] | None = None,
) -> list[CheckResult]:
auth_dir = Path(args.auth_dir).expanduser().resolve()
if not auth_dir.exists() or not auth_dir.is_dir():
raise FileNotFoundError(f"auth directory not found: {auth_dir}")
json_files = sorted(auth_dir.rglob("*.json"))
total_json = len(json_files)
indexed_results: list[tuple[int, list[CheckResult]]] = []
if total_json == 0:
return []
workers = min(args.workers, total_json)
if workers <= 1:
for index, path in enumerate(json_files, start=1):
if progress_callback is not None:
progress_callback(index, total_json, path)
file_results = _scan_single_file(path, args)
if file_results:
indexed_results.append((index, file_results))
else:
with ThreadPoolExecutor(max_workers=workers) as pool:
future_map = {
pool.submit(_scan_single_file, path, args): (index, path)
for index, path in enumerate(json_files, start=1)
}
completed = 0
for future in as_completed(future_map):
completed += 1
index, path = future_map[future]
if progress_callback is not None:
progress_callback(completed, total_json, path)
try:
file_results = future.result()
except Exception as exc: # noqa: BLE001
file_results = [
CheckResult(
file=str(path),
provider="unknown",
email="",
account_id="",
status_code=None,
unauthorized_401=False,
no_limit_unlimited=False,
quota_exceeded=False,
quota_resets_at=None,
error=f"internal error: {exc}",
response_preview="",
)
]
if file_results:
indexed_results.append((index, file_results))
indexed_results.sort(key=lambda item: item[0])
return [row for _, group in indexed_results for row in group]
def _status_label(item: CheckResult, use_color: bool) -> str:
if item.unauthorized_401:
return _paint("401", ANSI_BOLD, ANSI_RED, enabled=use_color)
if item.quota_exceeded:
return _paint("LIM", ANSI_BOLD, ANSI_MAGENTA, enabled=use_color)
if item.status_code is None:
return _paint("ERR", ANSI_BOLD, ANSI_YELLOW, enabled=use_color)
code = str(item.status_code)
if 200 <= item.status_code < 300:
return _paint(code, ANSI_GREEN, enabled=use_color)
if 400 <= item.status_code < 500:
return _paint(code, ANSI_YELLOW, enabled=use_color)
if item.status_code >= 500:
return _paint(code, ANSI_RED, enabled=use_color)
return code
def _print_table(results: list[CheckResult], use_color: bool) -> None:
if not results:
print(_paint("No codex auth files found.", ANSI_YELLOW, enabled=use_color))
return
unauthorized = [r for r in results if r.unauthorized_401]
quota_exceeded_list = [r for r in results if r.quota_exceeded and not r.unauthorized_401]
no_limit_unlimited = [r for r in results if r.no_limit_unlimited]
ok_count = sum(
1
for item in results
if item.status_code is not None and 200 <= item.status_code < 300
)
failed_count = len(results) - ok_count
print(_paint("Scan Summary", ANSI_BOLD, ANSI_CYAN, enabled=use_color))
print(f" checked codex files : {len(results)}")
print(f" unauthorized (401) : {len(unauthorized)}")
print(f" quota-exceeded : {len(quota_exceeded_list)}")
print(f" no-limit/unlimited : {len(no_limit_unlimited)}")
print(f" non-2xx or errors : {failed_count}")
print()
if unauthorized:
print(_paint("401 Files", ANSI_BOLD, ANSI_RED, enabled=use_color))
for item in unauthorized:
email = f" ({item.email})" if item.email else ""
print(f" [{_status_label(item, use_color)}] {item.file}{email}")
print()
if quota_exceeded_list:
print(_paint("Quota-Exceeded Files", ANSI_BOLD, ANSI_MAGENTA, enabled=use_color))
for item in quota_exceeded_list:
email = f" ({item.email})" if item.email else ""
if item.quota_resets_at is not None:
import datetime as _dt
resets_str = _dt.datetime.fromtimestamp(
item.quota_resets_at, tz=_dt.timezone.utc
).strftime("%Y-%m-%d %H:%M UTC")
suffix = f" [resets {resets_str}]"
else:
suffix = ""
print(f" [{_status_label(item, use_color)}] {item.file}{email}{suffix}")
print()
others = [
r
for r in results
if (not r.unauthorized_401)
and (not r.quota_exceeded)
and (not r.no_limit_unlimited)
]
if no_limit_unlimited:
print(
_paint("No-limit/Unlimited Files", ANSI_BOLD, ANSI_GREEN, enabled=use_color)
)
for item in no_limit_unlimited:
email = f" ({item.email})" if item.email else ""
print(f" [{_status_label(item, use_color)}] {item.file}{email}")
print()
if others:
print(_paint("Other Results", ANSI_BOLD, ANSI_CYAN, enabled=use_color))
for item in others:
status = _status_label(item, use_color)
reason = item.error or item.response_preview.replace("\n", " ")[:120]
reason = reason.strip() or "-"
print(f" [{status}] {item.file} :: {_truncate(reason, 120)}")
def _move_file_safely(
src: Path, dst_dir: Path
) -> tuple[str | None, str | None]:
"""Move *src* into *dst_dir*, creating dst_dir if necessary.
Returns ``(dst_path, None)`` on success or ``(None, error_message)`` on failure.
"""
try:
dst_dir.mkdir(parents=True, exist_ok=True)
dst = dst_dir / src.name
# If a file with the same name already exists, append a counter suffix.
counter = 1
while dst.exists():
dst = dst_dir / f"{src.stem}_{counter}{src.suffix}"
counter += 1
shutil.move(str(src), str(dst))
return str(dst), None
except Exception as exc: # noqa: BLE001
return None, str(exc)
def _scan_dir_flat(
dir_path: Path,
args: argparse.Namespace,
progress_callback: Callable[[int, int, Path], None] | None = None,
) -> list[CheckResult]:
"""Scan *dir_path* non-recursively (flat) for Codex JSON files."""
if not dir_path.exists() or not dir_path.is_dir():
return []
json_files = sorted(dir_path.glob("*.json"))
total = len(json_files)
if total == 0:
return []
results: list[CheckResult] = []
workers = min(args.workers, total)
if workers <= 1:
for index, path in enumerate(json_files, start=1):
if progress_callback is not None:
progress_callback(index, total, path)
results.extend(_scan_single_file(path, args))
else:
from concurrent.futures import ThreadPoolExecutor, as_completed as _as_completed
indexed: list[tuple[int, list[CheckResult]]] = []
with ThreadPoolExecutor(max_workers=workers) as pool:
future_map = {
pool.submit(_scan_single_file, path, args): (index, path)
for index, path in enumerate(json_files, start=1)
}
completed = 0
for future in _as_completed(future_map):
completed += 1
index, path = future_map[future]
if progress_callback is not None:
progress_callback(completed, total, path)
try:
file_results = future.result()
except Exception as exc: # noqa: BLE001
file_results = [
CheckResult(
file=str(path),
provider="unknown",
email="",
account_id="",
status_code=None,
unauthorized_401=False,
no_limit_unlimited=False,
quota_exceeded=False,
quota_resets_at=None,
error=f"internal error: {exc}",
response_preview="",
)
]
indexed.append((index, file_results))
indexed.sort(key=lambda item: item[0])
results = [row for _, group in indexed for row in group]
return results
def _confirm_deletion(targets: list[str], assume_yes: bool) -> bool:
if not targets:
return False
if assume_yes:
return True
if not sys.stdin.isatty():
print(
"No interactive terminal for confirmation; deletion cancelled. Use --yes to force."
)
return False
print()
print(f"Delete {len(targets)} files with 401? This action cannot be undone.")
answer = input("Confirm deletion? [y/N]: ").strip().lower()
return answer in {"y", "yes"}
def _delete_files(paths: list[str]) -> tuple[list[str], list[DeleteError]]:
deleted: list[str] = []
errors: list[DeleteError] = []
seen: set[str] = set()
for raw_path in paths:
path = Path(raw_path)
normalized = str(path.resolve())
if normalized in seen:
continue
seen.add(normalized)
try:
path.unlink()
deleted.append(str(path))
except Exception as exc: # noqa: BLE001
errors.append(DeleteError(file=str(path), error=str(exc)))
return deleted, errors
def _print_deletion_summary(
*,
requested: bool,
target_count: int,
confirmed: bool,
deleted_files: list[str],
errors: list[DeleteError],
use_color: bool,
) -> None:
if not requested:
return
if target_count == 0:
print()
print(
_paint(
"Delete mode enabled, but no 401 files found.",
ANSI_DIM,
enabled=use_color,
)
)
return
print()
if not confirmed:
print(_paint("Deletion cancelled by user.", ANSI_YELLOW, enabled=use_color))
return
print(
_paint(
f"Deletion completed: {len(deleted_files)}/{target_count} removed.",
ANSI_BOLD,
ANSI_GREEN,
enabled=use_color,
)
)
for path in deleted_files:
print(f"[{_paint('deleted', ANSI_GREEN, enabled=use_color)}] {path}")
for item in errors:
print(
f"[{_paint('delete-failed', ANSI_RED, enabled=use_color)}] {item.file} :: {item.error}"
)
def _build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="Traverse auth folder and detect Codex auth files returning 401 or no-limit markers during quota probe."
)
parser.add_argument(
"--auth-dir",
default=DEFAULT_AUTH_DIR,
help=f"Folder containing auth JSON files (default: {DEFAULT_AUTH_DIR}).",
)
parser.add_argument(
"--base-url",
default=DEFAULT_CODEX_BASE_URL,
help=f"Codex base URL (default: {DEFAULT_CODEX_BASE_URL})",
)
parser.add_argument(
"--quota-path",
default="/responses",
help="API path used for quota/auth probe (default: /responses)",
)
parser.add_argument(
"--model",
default="gpt-5",
help="Model used in probe request body (default: gpt-5)",
)
parser.add_argument(
"--timeout",
type=float,
default=20,
help="HTTP timeout in seconds (default: 20)",
)
parser.add_argument(
"--workers",
type=int,
default=DEFAULT_WORKERS,
help=f"Number of concurrent workers for file scan/probe (default: {DEFAULT_WORKERS})",
)
parser.add_argument(
"--retry-attempts",
type=int,
default=DEFAULT_RETRY_ATTEMPTS,
help=f"Total attempts for network errors per file (default: {DEFAULT_RETRY_ATTEMPTS})",
)
parser.add_argument(
"--retry-backoff",
type=float,
default=DEFAULT_RETRY_BACKOFF,
help=f"Base seconds for exponential retry backoff (default: {DEFAULT_RETRY_BACKOFF})",
)
parser.add_argument(
"--refresh-before-check",
action="store_true",
help="Refresh access token with refresh_token before probe.",
)
parser.add_argument(
"--refresh-url",
default=DEFAULT_REFRESH_URL,
help=f"Token refresh endpoint (default: {DEFAULT_REFRESH_URL})",
)
parser.add_argument(
"--output-json",
action="store_true",
help="Print full results as JSON instead of table view.",
)
parser.add_argument(
"--no-progress",
action="store_true",
help="Disable live scan progress in terminal output.",
)
parser.add_argument(
"--no-color",
action="store_true",
help="Disable ANSI color output.",
)
parser.add_argument(
"--delete-401",
action="store_true",
help="Delete auth files that returned HTTP 401 after confirmation.",
)
parser.add_argument(
"--yes",
action="store_true",
help="Skip deletion confirmation prompt (only applies with --delete-401).",
)
parser.add_argument(
"--exceeded-dir",
default=None,
help=(
"Directory to move quota-exceeded auth files into. "
f"Defaults to a sibling '{DEFAULT_EXCEEDED_DIR_NAME}/' folder next to --auth-dir."
),
)
parser.add_argument(
"--no-quarantine",
action="store_true",
help="Disable automatic quarantine: do not move quota-exceeded files and skip recovery scan.",
)
return parser
def main() -> int:
parser = _build_parser()
args = parser.parse_args()
if args.workers < 1:
parser.error("--workers must be >= 1")
if args.retry_attempts < 1:
parser.error("--retry-attempts must be >= 1")
if args.retry_backoff < 0:
parser.error("--retry-backoff must be >= 0")
use_color = _supports_color(args.no_color) and (not args.output_json)
progress_enabled = (
_is_tty_stdout() and (not args.no_progress) and (not args.output_json)
)
progress = _ProgressDisplay(progress_enabled)
auth_dir = Path(args.auth_dir).expanduser().resolve()
exceeded_dir = (
Path(args.exceeded_dir).expanduser().resolve()
if args.exceeded_dir
else auth_dir.parent / DEFAULT_EXCEEDED_DIR_NAME
)
if progress_enabled:
print(_paint("Scanning auth JSON files...", ANSI_DIM, enabled=use_color))
try:
results = scan_auth_files(
args,
progress_callback=progress.update if progress_enabled else None,
)
except Exception as exc: # noqa: BLE001
progress.finish()
print(f"Error: {exc}", file=sys.stderr)
return 2
progress.finish()
# --- Quarantine: move exceeded files out of auth_dir ---
moved_to_exceeded: list[str] = [] # dst paths
move_to_exceeded_errors: list[DeleteError] = []
if not args.no_quarantine:
for item in results:
if item.quota_exceeded:
dst, err = _move_file_safely(Path(item.file), exceeded_dir)
if err:
move_to_exceeded_errors.append(DeleteError(file=item.file, error=err))
else:
if dst is not None:
moved_to_exceeded.append(dst)
# --- Recovery: scan exceeded_dir, move back files that are no longer limited ---
exceeded_results: list[CheckResult] = []
moved_from_exceeded: list[str] = [] # dst paths (back in auth_dir)
move_from_exceeded_errors: list[DeleteError] = []
if not args.no_quarantine and exceeded_dir.exists():
if progress_enabled:
print(
_paint(
f"Scanning exceeded dir: {exceeded_dir} ...",
ANSI_DIM,
enabled=use_color,
)
)
exceeded_results = _scan_dir_flat(exceeded_dir, args)
for item in exceeded_results:
recovered = (
not item.quota_exceeded
and item.status_code is not None
and 200 <= item.status_code < 300
)
if recovered:
dst, err = _move_file_safely(Path(item.file), auth_dir)
if err:
move_from_exceeded_errors.append(
DeleteError(file=item.file, error=err)
)
else:
if dst is not None:
moved_from_exceeded.append(dst)
# --- Delete-401 flow ---
unauthorized_files = [item.file for item in results if item.unauthorized_401]
delete_confirmed = False
deleted_files: list[str] = []
delete_errors: list[DeleteError] = []
if args.delete_401 and unauthorized_files:
delete_confirmed = _confirm_deletion(unauthorized_files, args.yes)
if delete_confirmed:
deleted_files, delete_errors = _delete_files(unauthorized_files)
if args.output_json:
print(
json.dumps(
{
"results": [asdict(item) for item in results],
"exceeded_dir_results": [asdict(item) for item in exceeded_results],
"quarantine": {
"enabled": not args.no_quarantine,
"exceeded_dir": str(exceeded_dir),
"moved_to_exceeded": moved_to_exceeded,
"moved_to_exceeded_errors": [
asdict(e) for e in move_to_exceeded_errors
],
"moved_from_exceeded": moved_from_exceeded,
"moved_from_exceeded_errors": [
asdict(e) for e in move_from_exceeded_errors
],
},
"deletion": {
"requested": args.delete_401,
"target_count": len(unauthorized_files),
"confirmed": delete_confirmed,
"deleted_count": len(deleted_files),
"deleted_files": deleted_files,
"errors": [asdict(item) for item in delete_errors],
},
},
ensure_ascii=False,
indent=2,
)
)
else:
_print_table(results, use_color=use_color)
if exceeded_results:
print(
_paint(
f"Exceeded Dir Scan ({exceeded_dir})",
ANSI_BOLD,
ANSI_MAGENTA,
enabled=use_color,
)
)
_print_table(exceeded_results, use_color=use_color)
if moved_to_exceeded or move_to_exceeded_errors:
print()
print(_paint("Quarantine Moves (auth-dir → exceeded)", ANSI_BOLD, ANSI_MAGENTA, enabled=use_color))
for dst in moved_to_exceeded:
print(f" [{_paint('moved', ANSI_MAGENTA, enabled=use_color)}] → {dst}")
for e in move_to_exceeded_errors:
print(f" [{_paint('move-failed', ANSI_RED, enabled=use_color)}] {e.file} :: {e.error}")
if moved_from_exceeded or move_from_exceeded_errors:
print()
print(_paint("Recovery Moves (exceeded → auth-dir)", ANSI_BOLD, ANSI_GREEN, enabled=use_color))
for dst in moved_from_exceeded:
print(f" [{_paint('recovered', ANSI_GREEN, enabled=use_color)}] → {dst}")
for e in move_from_exceeded_errors:
print(f" [{_paint('recover-failed', ANSI_RED, enabled=use_color)}] {e.file} :: {e.error}")
_print_deletion_summary(
requested=args.delete_401,
target_count=len(unauthorized_files),
confirmed=delete_confirmed,
deleted_files=deleted_files,
errors=delete_errors,
use_color=use_color,
)
has_401 = any(item.unauthorized_401 for item in results)
return 1 if has_401 else 0
if __name__ == "__main__":
raise SystemExit(main())
网友解答:
--【壹】--:
熬,不会用。。。我在1panel里安装的,这脚本放进去运行没反应qaq
--【贰】--:
谢谢分享!
--【叁】--:
–auth-dir 指定你的认证文件夹
实在不行,问 ai 怎么用
python tp.py -h
usage: tp.py [-h] [--url CPA_URL] [--key MANAGEMENT_KEY] [--auth-dir AUTH_DIR] [--base-url BASE_URL] [--quota-path QUOTA_PATH] [--model MODEL] [--timeout TIMEOUT] [--workers WORKERS]
[--retry-attempts RETRY_ATTEMPTS] [--retry-backoff RETRY_BACKOFF] [--refresh-before-check] [--refresh-url REFRESH_URL] [--output-json] [--no-progress] [--no-color] [--delete-401] [--yes]
[--exceeded-dir EXCEEDED_DIR] [--no-quarantine]
Detect Codex credentials returning 401/no-limit markers during quota probe. Supports local auth-dir mode and CPA remote mode (--url/--key).
options:
-h, --help show this help message and exit
--url CPA_URL, --cpa-url CPA_URL
CLIProxyAPI base URL for CPA mode, e.g. http://127.0.0.1:8317
--key MANAGEMENT_KEY, --management-key MANAGEMENT_KEY
CLIProxyAPI management key for CPA mode.
--auth-dir AUTH_DIR Folder containing auth JSON files (default: ~/.cli-proxy-api).
--base-url BASE_URL Codex base URL (default: https://chatgpt.com/backend-api/codex)
--quota-path QUOTA_PATH
API path used for quota/auth probe (default: /responses)
--model MODEL Model used in probe request body (default: gpt-5)
--timeout TIMEOUT HTTP timeout in seconds (default: 20)
--workers WORKERS Number of concurrent workers for file scan/probe (default: 32)
--retry-attempts RETRY_ATTEMPTS
Total attempts for network errors per file (default: 3)
--retry-backoff RETRY_BACKOFF
Base seconds for exponential retry backoff (default: 0.6)
--refresh-before-check
Refresh access token with refresh_token before probe.
--refresh-url REFRESH_URL
Token refresh endpoint (default: https://auth.openai.com/oauth/token)
--output-json Print full results as JSON instead of table view.
--no-progress Disable live scan progress in terminal output.
--no-color Disable ANSI color output.
--delete-401 Delete auth files that returned HTTP 401 after confirmation.
--yes Skip deletion confirmation prompt (only applies with --delete-401).
--exceeded-dir EXCEEDED_DIR
Directory to move quota-exceeded auth files into. Defaults to a sibling 'exceeded/' folder next to --auth-dir.
--no-quarantine Disable automatic quarantine: do not move quota-exceeded files and skip recovery scan.
--【肆】--:
感谢佬友二开!
--【伍】--:
感谢大佬分享 很实用!
--【陆】--:
谢谢佬 好用
--【柒】--:
谢谢佬友,这个清理很方便
--【捌】--:
老师这个在哪里用呢
--【玖】--:
好资源,值得分享,多谢多谢
--【拾】--:
你可以让 codex 根据 cpa 的认证机制,在这个脚本上二开。效果类似于:
python3 tp.py --delete-401 --workers 50 --url http://xxx.com --key xxx
大概几分钟就能写好。我主要是本机在跑.
--【拾壹】--:
感谢大佬
--【拾贰】--:
这个是在本地跑的脚本吗?把cpa部署到了zeabur上,怎么在zeabur上跑?
--【拾叁】--:
感谢分享刚刚还在想怎么解决这个额度用完的问题
--【拾肆】--:
真不错,已经用上了
--【拾伍】--:
image1788×634 120 KB
明明很多401的,为啥检测出来全部都可用?
image700×156 8.08 KB
--【拾陆】--:
python 脚本哇,直接在你安装了 cpa 的机器上用,默认检测~/.cli-proxy-api 路径,你也可以指定你自己的认证文件文件夹
--【拾柒】--:
测401试了很好使,不过和这个测额度移动文件夹好像没变化

