【CPA】Codex 401账号清理与额度耗尽停用脚本

2026-04-11 14:201阅读0评论SEO基础
  • 内容介绍
  • 文章标签
  • 相关推荐
问题描述:

找不到是哪个佬写的脚本改的。。。

修改内容与功能

  1. 新增了 api-call 主动探测。
    全量探测,等同于CPA中的 配额管理 刷新,本次脚本运行只会完整探测一遍
    默认是关闭的,只有加 --enable-api-call-check 才会跑,且只跑一遍
  2. api-call探测方式:
    一批同时探测 9 个,整批跑完以后,随机等待 5 到 10 秒,再继续下一批。这样比一个个串行扫。
  3. 额度耗尽的处理:
    通过调用/v0/management/auth-files/status禁用账号
  4. 原有功能:
    自动删除请求中401的账号。

使用说明

  • ‘–base-url’, default=‘http://127.0.0.1:8317’
  • ‘–management-key’, default=‘’ 写你的cpa管理密钥
  • ‘–timeout’, type=int, default=‘20’
  • ‘–enable-api-call-check’, action=‘store_true’ 开启 /api-call主动探测
  • ‘–api-call-url’, default=os.environ.get(‘CLIPROXY_API_CALL_URL’, DEFAULT_API_CALL_URL), help=‘主动探测时调用的上游 URL’
  • ‘–api-call-method’, default=os.environ.get(‘CLIPROXY_API_CALL_METHOD’, ‘GET’), help=‘主动探测时使用的 HTTP 方法’
  • ‘–api-call-account-id’, default=os.environ.get(‘CLIPROXY_API_CALL_ACCOUNT_ID’, DEFAULT_API_CALL_ACCOUNT_ID), help=‘主动探测时附带的 Chatgpt-Account-Id’
  • ‘–api-call-user-agent’, default=os.environ.get(‘CLIPROXY_API_CALL_USER_AGENT’, DEFAULT_API_CALL_USER_AGENT), help=‘主动探测时附带的 User-Agent’
  • ‘–api-call-body’, default=os.environ.get(‘CLIPROXY_API_CALL_BODY’, ‘’), help=‘主动探测时透传到 api-call 的 data 字段’
  • ‘–api-call-providers’, default=os.environ.get(‘CLIPROXY_API_CALL_PROVIDERS’, DEFAULT_API_CALL_PROVIDERS), help=‘哪些 provider 需要做 /api-call 主动探测,逗号分隔;留空表示全部’
  • ‘–api-call-max-per-run’, type=int, default=int(os.environ.get(‘CLIPROXY_API_CALL_MAX_PER_RUN’, str(DEFAULT_API_CALL_MAX_PER_RUN))), help=‘每批最多探测多少个账号,最大 9’
  • ‘–api-call-sleep’, type=float, default=None, help=‘固定批次等待秒数;如不设置则使用随机等待’
  • ‘–api-call-sleep-min’, type=float, default=float(os.environ.get(‘CLIPROXY_API_CALL_SLEEP_MIN’, str(DEFAULT_API_CALL_SLEEP_MIN))), help=‘批次随机等待最小秒数’
  • ‘–api-call-sleep-max’, type=float, default=float(os.environ.get(‘CLIPROXY_API_CALL_SLEEP_MAX’, str(DEFAULT_API_CALL_SLEEP_MAX))), help=‘批次随机等待最大秒数’
  • ‘–dry-run’, action=‘store_true’, help=‘模拟运行,不实际删除或禁用’
  • ‘–interval’, type=int, default=60, help=‘检测间隔时间(秒),默认60秒’
  • ‘–once’, action=‘store_true’, help=‘只执行一次,不循环’

脚本

import os, sys, re, json, argparse, time, random from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path from urllib import request, parse, error from datetime import datetime, timezone P401 = re.compile(r'(^|\D)401(\D|$)|unauthorized|unauthenticated|token\s+expired|login\s+required|authentication\s+failed', re.I) PQUOTA = re.compile(r'(^|\D)(402|403|429)(\D|$)|quota|insufficient\s*quota|resource\s*exhausted|rate\s*limit|too\s+many\s+requests|payment\s+required|billing|credit|额度|用完|超限|上限|usage_limit_reached', re.I) DEFAULT_API_CALL_PROVIDERS = 'codex,openai,chatgpt' DEFAULT_API_CALL_URL = 'https://chatgpt.com/backend-api/wham/usage' DEFAULT_API_CALL_USER_AGENT = 'codex_cli_rs/0.76.0 (Debian 13.0.0; x86_64) WindowsTerminal' DEFAULT_API_CALL_ACCOUNT_ID = '141c5c10-0993-45c2-ad18-9a01ba2ab3e0' DEFAULT_API_CALL_MAX_PER_RUN = 9 DEFAULT_API_CALL_SLEEP_MIN = 5.0 DEFAULT_API_CALL_SLEEP_MAX = 10.0 AUTH_FILE_STATUS_METHODS = ('PATCH', 'PUT', 'POST') def get_current_time(): return datetime.now().astimezone().strftime('%Y-%m-%d %H:%M:%S %z') def run_id(): return datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ') def api(base, key, method, path, timeout=20, query=None, expect_json=True, body=None, extra_headers=None): url = base.rstrip('/') + '/v0/management' + path if query: url += '?' + parse.urlencode(query) headers = { 'Authorization': 'Bearer ' + key, 'Accept': 'application/json', 'User-Agent': 'cliproxyapi-cleaner/1.0', } if extra_headers: headers.update(extra_headers) data = None if body is not None: if isinstance(body, (dict, list)): data = json.dumps(body, ensure_ascii=False).encode('utf-8') headers.setdefault('Content-Type', 'application/json') elif isinstance(body, bytes): data = body else: data = str(body).encode('utf-8') headers.setdefault('Content-Type', 'application/json') req = request.Request(url, data=data, headers=headers, method=method.upper()) try: with request.urlopen(req, timeout=timeout) as resp: raw = resp.read() code = resp.getcode() except error.HTTPError as e: raw = e.read() code = e.code except error.URLError as e: raise RuntimeError('请求管理 API 失败: %s' % e) if expect_json: try: payload = json.loads(raw.decode('utf-8')) if raw else {} except Exception: payload = {'raw': raw.decode('utf-8', errors='replace')} return code, payload return code, raw def extract_error_message(msg): """从状态消息中提取错误信息""" try: if isinstance(msg, str) and msg.strip().startswith('{'): error_data = json.loads(msg) if 'error' in error_data: error_obj = error_data['error'] if isinstance(error_obj, dict): error_type = error_obj.get('type', '') error_message = error_obj.get('message', '') return error_type, error_message if isinstance(error_obj, str): return 'error', error_obj return None, msg except Exception: pass return None, msg def parse_csv_set(value): if not value: return set() return {x.strip().lower() for x in str(value).split(',') if x.strip()} def simplify_reason(reason): text = str(reason or '').strip() if not text: return '' if not text.startswith('{'): return text[:120] try: error_data = json.loads(text) except Exception: return text[:120] if 'error' not in error_data: return text[:120] error_obj = error_data['error'] if isinstance(error_obj, dict): error_type = str(error_obj.get('type') or '').strip() error_message = str(error_obj.get('message') or '').strip() if error_type == 'usage_limit_reached': return ('usage_limit_reached: ' + error_message)[:120] return (error_type or error_message or text)[:120] return str(error_obj)[:120] def classify(item): status = str(item.get('status', '')).strip().lower() msg = str(item.get('status_message', '') or '').strip() error_type, error_msg = extract_error_message(msg) text = (status + '\n' + msg).lower() if P401.search(text): return 'delete_401', msg or status or '401/unauthorized' if error_type == 'usage_limit_reached' or 'usage_limit_reached' in text: return 'quota_exhausted', msg or status or 'usage_limit_reached' if PQUOTA.search(text): return 'quota_exhausted', msg or status or 'quota' if bool(item.get('disabled', False)) or status == 'disabled': return 'disabled', msg or status or 'disabled' if bool(item.get('unavailable', False)) or status == 'error': return 'unavailable', msg or status or 'error' return 'available', msg or status or 'active' def should_probe_api_call(item, args): if not args.enable_api_call_check: return False auth_index = str(item.get('auth_index') or '').strip() if not auth_index: return False provider = str(item.get('provider') or item.get('type') or '').strip().lower() if args.api_call_provider_set and provider not in args.api_call_provider_set: return False initial_kind, _ = classify(item) return initial_kind == 'available' def api_call_item_key(item): auth_index = str(item.get('auth_index') or '').strip() if auth_index: return 'auth_index:' + auth_index return 'name:' + str(item.get('name') or item.get('id') or '').strip() def build_api_call_payload(item, args): headers = { 'Authorization': 'Bearer $TOKEN$', 'Content-Type': 'application/json', 'User-Agent': args.api_call_user_agent, } if args.api_call_account_id.strip(): headers['Chatgpt-Account-Id'] = args.api_call_account_id.strip() payload = { 'authIndex': str(item.get('auth_index') or '').strip(), 'method': args.api_call_method.upper(), 'url': args.api_call_url.strip(), 'header': headers, } if args.api_call_body: payload['data'] = args.api_call_body return payload def normalize_api_call_body(body): if body is None: return '', None if isinstance(body, str): text = body trimmed = text.strip() if not trimmed: return text, None try: return text, json.loads(trimmed) except Exception: return text, text try: return json.dumps(body, ensure_ascii=False), body except Exception: return str(body), body def is_limit_reached_window(value): if not isinstance(value, dict): return False if value.get('allowed') is False: return True if value.get('limit_reached') is True: return True return False def classify_api_call_response(payload): nested_status = payload.get('status_code', payload.get('statusCode', 0)) try: nested_status = int(nested_status) except Exception: nested_status = 0 header = payload.get('header') or payload.get('headers') or {} body_text, body = normalize_api_call_body(payload.get('body')) try: header_text = json.dumps(header, ensure_ascii=False) except Exception: header_text = str(header) if isinstance(body, (dict, list)): try: body_signal = json.dumps(body, ensure_ascii=False) except Exception: body_signal = body_text else: body_signal = body_text if nested_status == 401: return 'delete_401', body_signal or ('api-call status_code=%s' % nested_status) if nested_status in (402, 403, 429): return 'quota_exhausted', body_signal or ('api-call status_code=%s' % nested_status) if isinstance(body, dict): error_obj = body.get('error') if isinstance(error_obj, dict): error_type = str(error_obj.get('type') or '').strip().lower() error_message = str(error_obj.get('message') or '').strip() error_text = (error_type + '\n' + error_message).lower() if error_type == 'usage_limit_reached' or PQUOTA.search(error_text): return 'quota_exhausted', body_signal or error_message or error_type if P401.search(error_text): return 'delete_401', body_signal or error_message or error_type rate_limit = body.get('rate_limit') code_review_rate_limit = body.get('code_review_rate_limit') if is_limit_reached_window(rate_limit) or is_limit_reached_window(code_review_rate_limit): return 'quota_exhausted', body_signal or 'rate_limit_reached' if nested_status == 200: return None, body_signal or 'ok' fallback_text = ('%s\n%s\n%s' % (nested_status, header_text, body_signal)).lower() if P401.search(fallback_text): return 'delete_401', body_signal or ('api-call status_code=%s' % nested_status) if nested_status != 200 and PQUOTA.search(fallback_text): return 'quota_exhausted', body_signal or ('api-call status_code=%s' % nested_status) return None, body_signal or ('api-call status_code=%s' % nested_status if nested_status else 'ok') def run_api_call_probe(args, item): request_payload = build_api_call_payload(item, args) code, payload = api( args.base_url, args.management_key, 'POST', '/api-call', args.timeout, expect_json=True, body=request_payload, ) if code != 200: raise RuntimeError('调用 /api-call 失败: HTTP %s %s' % (code, payload)) kind, reason = classify_api_call_response(payload) return { 'request': request_payload, 'response': payload, 'classification': kind, 'reason': reason, 'status_code': payload.get('status_code', payload.get('statusCode')), } def pick_api_call_sleep_seconds(args): fixed_sleep = getattr(args, 'api_call_sleep', None) if fixed_sleep is not None: return max(0.0, float(fixed_sleep)) return random.uniform(args.api_call_sleep_min, args.api_call_sleep_max) def run_api_call_full_scan(args, files, counts): if not args.enable_api_call_check: counts['api-call候选数'] = 0 counts['api-call批次数'] = 0 return {} if getattr(args, 'api_call_scan_completed', False): counts['api-call候选数'] = 0 counts['api-call批次数'] = 0 print('[api-call] 当前进程已完成一次全量探测,本轮跳过', flush=True) return {} eligible = [item for item in files if should_probe_api_call(item, args)] counts['api-call候选数'] = len(eligible) if not eligible: counts['api-call批次数'] = 0 args.api_call_scan_completed = True print('[api-call] 没有需要探测的候选账号,本次进程不再执行 api-call', flush=True) return {} batch_size = max(1, min(int(args.api_call_max_per_run), DEFAULT_API_CALL_MAX_PER_RUN)) batch_count = (len(eligible) + batch_size - 1) // batch_size counts['api-call批次数'] = batch_count sleep_desc = '固定 %.1f 秒' % args.api_call_sleep if args.api_call_sleep is not None else '随机 %.1f-%.1f 秒' % (args.api_call_sleep_min, args.api_call_sleep_max) print('[api-call] 已开启全量探测,本次运行将探测 %s 个候选账号,共 %s 批,每批最多 %s 个,批次间隔 %s' % ( len(eligible), batch_count, batch_size, sleep_desc, ), flush=True) probe_results = {} probed_total = 0 for batch_index in range(batch_count): batch = eligible[batch_index * batch_size:(batch_index + 1) * batch_size] print('[api-call批次 %s/%s] 并发探测 %s 个账号' % (batch_index + 1, batch_count, len(batch)), flush=True) with ThreadPoolExecutor(max_workers=len(batch)) as executor: future_to_item = {executor.submit(run_api_call_probe, args, item): item for item in batch} for future in as_completed(future_to_item): item = future_to_item[future] counts['api-call已探测'] += 1 probed_total += 1 name = str(item.get('name') or item.get('id') or '').strip() provider = str(item.get('provider') or item.get('type') or '').strip() auth_index = item.get('auth_index') try: probe = future.result() probe_results[api_call_item_key(item)] = probe classification = probe.get('classification') or 'ok' if classification == 'delete_401': counts['api-call发现401'] += 1 elif classification == 'quota_exhausted': counts['api-call发现配额耗尽'] += 1 print(' [api-call完成 %s/%s] %s provider=%s auth_index=%s result=%s' % ( probed_total, len(eligible), name, provider, auth_index, classification, ), flush=True) except Exception as e: counts['api-call探测失败'] += 1 probe_results[api_call_item_key(item)] = {'error': str(e)} print(' [api-call完成 %s/%s] %s provider=%s auth_index=%s result=error error=%s' % ( probed_total, len(eligible), name, provider, auth_index, e, ), flush=True) if batch_index + 1 < batch_count: sleep_seconds = pick_api_call_sleep_seconds(args) if sleep_seconds > 0: print('[api-call批次 %s/%s] 整批完成,等待 %.1f 秒后继续下一批' % ( batch_index + 1, batch_count, sleep_seconds, ), flush=True) time.sleep(sleep_seconds) args.api_call_scan_completed = True print('[api-call] 本次运行已完成全部候选账号探测,后续轮次不再重复探测', flush=True) return probe_results def disable_auth_file(args, name): payload = {'name': name, 'disabled': True} attempts = [] for method in AUTH_FILE_STATUS_METHODS: code, resp = api( args.base_url, args.management_key, method, '/auth-files/status', args.timeout, expect_json=True, body=payload, ) attempts.append({'method': method, 'code': code, 'response': resp}) if 200 <= code < 300: return attempts[-1] if code not in (404, 405, 501): break raise RuntimeError('更新 auth-files/status 失败: %s' % attempts) def run_check(args): """执行一次检查""" code, payload = api(args.base_url, args.management_key, 'GET', '/auth-files', args.timeout) if code != 200: print('[错误] 获取 auth-files 失败: HTTP %s %s' % (code, payload), file=sys.stderr) return None files = payload.get('files') or [] if not isinstance(files, list): print('[错误] auth-files 返回异常: %s' % payload, file=sys.stderr) return None rid = run_id() backup_root = Path('./backups/cliproxyapi-auth-cleaner') / rid report_root = Path('./reports/cliproxyapi-auth-cleaner') report_root.mkdir(parents=True, exist_ok=True) counts = { '检查总数': 0, '可用账号': 0, '配额耗尽': 0, '已禁用': 0, '不可用': 0, 'api-call候选数': 0, 'api-call批次数': 0, 'api-call已探测': 0, 'api-call发现401': 0, 'api-call发现配额耗尽': 0, 'api-call探测失败': 0, '待删除401': 0, '已删除': 0, '备份失败': 0, '删除失败': 0, '额度账号已禁用': 0, '禁用失败': 0, } results = [] print('[%s] 开始检查 %s 个账号' % (get_current_time(), len(files)), flush=True) probe_results = run_api_call_full_scan(args, files, counts) for idx, item in enumerate(files): counts['检查总数'] += 1 name = str(item.get('name') or item.get('id') or '').strip() provider = str(item.get('provider') or item.get('type') or '').strip() kind, reason = classify(item) row = { 'name': name, 'provider': provider, 'auth_index': item.get('auth_index'), 'status': item.get('status'), 'status_message': item.get('status_message'), 'disabled': item.get('disabled'), 'unavailable': item.get('unavailable'), 'runtime_only': item.get('runtime_only'), 'source': item.get('source'), } probe = probe_results.get(api_call_item_key(item)) if probe is not None: row['api_call_probe'] = probe if probe.get('classification') == 'delete_401': kind = 'delete_401' reason = probe.get('reason') or reason elif probe.get('classification') == 'quota_exhausted': kind = 'quota_exhausted' reason = probe.get('reason') or reason row['final_classification'] = kind row['reason'] = reason display_reason = simplify_reason(reason) if kind == 'available': counts['可用账号'] += 1 elif kind == 'quota_exhausted': counts['配额耗尽'] += 1 print('[配额耗尽] %s provider=%s reason=%s' % (name, provider, display_reason), flush=True) if args.dry_run: row['disable_result'] = 'dry_run_skip' print(' [模拟运行] 将调用 /auth-files/status 设置 disabled=true', flush=True) elif not name: counts['禁用失败'] += 1 row['disable_result'] = 'skip_no_name' row['disable_error'] = '缺少 name,无法调用 /auth-files/status' print(' [禁用失败] 缺少 name,无法更新状态', flush=True) else: try: disable_resp = disable_auth_file(args, name) counts['额度账号已禁用'] += 1 row['disable_result'] = 'disabled_true' row['disable_response'] = disable_resp print(' [已禁用] method=%s HTTP %s' % (disable_resp['method'], disable_resp['code']), flush=True) except Exception as e: counts['禁用失败'] += 1 row['disable_result'] = 'disable_failed' row['disable_error'] = str(e) print(' [禁用失败] %s' % e, flush=True) elif kind == 'disabled': counts['已禁用'] += 1 print('[已禁用-不删除] %s provider=%s' % (name, provider), flush=True) elif kind == 'unavailable': counts['不可用'] += 1 print('[不可用-不删除] %s provider=%s reason=%s' % (name, provider, display_reason), flush=True) elif kind == 'delete_401': counts['待删除401'] += 1 print('[待删除-401认证失败] %s provider=%s reason=%s' % (name, provider, display_reason), flush=True) if args.dry_run: row['delete_result'] = 'dry_run_skip' print(' [模拟运行] 将删除此文件', flush=True) else: runtime_only = bool(item.get('runtime_only', False)) source = str(item.get('source') or '').strip().lower() if runtime_only or (source and source != 'file'): counts['备份失败'] += 1 row['delete_result'] = 'skip_runtime_only' row['delete_error'] = 'runtime_only/source!=file,管理 API 无法删除' print(' [跳过] runtime_only 或非磁盘文件,无法通过 /auth-files 删除', flush=True) elif not name.lower().endswith('.json'): counts['备份失败'] += 1 row['delete_result'] = 'skip_no_json_name' row['delete_error'] = '不是标准 .json 文件名,默认不删' print(' [跳过] 不是 .json 文件', flush=True) else: try: code, raw = api(args.base_url, args.management_key, 'GET', '/auth-files/download', args.timeout, {'name': name}, False) if code != 200: raise RuntimeError('下载 auth 文件失败: %s HTTP %s' % (name, code)) backup_root.mkdir(parents=True, exist_ok=True) backup_path = backup_root / Path(name).name backup_path.write_bytes(raw) row['backup_path'] = str(backup_path) code, payload = api(args.base_url, args.management_key, 'DELETE', '/auth-files', args.timeout, {'name': name}, True) if code != 200: raise RuntimeError('删除 auth 文件失败: %s HTTP %s %s' % (name, code, payload)) counts['已删除'] += 1 row['delete_result'] = 'deleted' row['delete_response'] = payload print(' [已删除] 备份路径: %s' % row['backup_path'], flush=True) except Exception as e: counts['删除失败'] += 1 row['delete_result'] = 'delete_failed' row['delete_error'] = str(e) print(' [删除失败] %s' % e, flush=True) results.append(row) report = { 'run_id': rid, 'base_url': args.base_url, 'dry_run': args.dry_run, 'api_call': { 'enabled': args.enable_api_call_check, 'completed_in_this_process': bool(getattr(args, 'api_call_scan_completed', False)), 'providers': args.api_call_providers, 'url': args.api_call_url, 'batch_size': args.api_call_max_per_run, 'sleep_fixed': args.api_call_sleep, 'sleep_min': args.api_call_sleep_min, 'sleep_max': args.api_call_sleep_max, }, 'results': results, 'summary': counts, } report_path = report_root / ('report-' + rid + '.json') report_path.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding='utf-8') print('\n' + '=' * 60) print('【统计结果】') print('=' * 60) for key, value in counts.items(): print(' %s: %d' % (key, value)) print('\n【操作说明】') if args.dry_run: print(' ✅ 模拟运行模式 - 没有实际删除或禁用任何账号') if counts['待删除401'] > 0: print(' 📝 发现 %d 个 401 认证失败账号(去掉 --dry-run 后会备份并删除)' % counts['待删除401']) if counts['配额耗尽'] > 0: print(' 📝 发现 %d 个额度耗尽账号(去掉 --dry-run 后会调用 /auth-files/status 禁用)' % counts['配额耗尽']) else: print(' ✅ 已删除 %d 个 401 认证失败账号' % counts['已删除']) print(' ✅ 已禁用 %d 个额度耗尽账号' % counts['额度账号已禁用']) if counts['删除失败'] > 0: print(' ⚠️ 有 %d 个账号删除失败,请查看报告' % counts['删除失败']) if counts['禁用失败'] > 0: print(' ⚠️ 有 %d 个额度账号禁用失败,请查看报告' % counts['禁用失败']) print('\n【报告文件】') print(' 📄 %s' % report_path) print('=' * 60, flush=True) return counts def main(): ap = argparse.ArgumentParser(description='CLIProxyAPI 清理工具 - 删除 401 账号并禁用额度耗尽账号') ap.add_argument('--base-url', default='http://127.0.0.1:8317') ap.add_argument('--management-key', default='') ap.add_argument('--timeout', type=int, default=int(os.environ.get('CLIPROXY_TIMEOUT', '20'))) ap.add_argument('--enable-api-call-check', action='store_true', help='开启 /api-call 全量探测;本次脚本运行只会完整探测一遍') ap.add_argument('--api-call-url', default=os.environ.get('CLIPROXY_API_CALL_URL', DEFAULT_API_CALL_URL), help='主动探测时调用的上游 URL') ap.add_argument('--api-call-method', default=os.environ.get('CLIPROXY_API_CALL_METHOD', 'GET'), help='主动探测时使用的 HTTP 方法') ap.add_argument('--api-call-account-id', default=os.environ.get('CLIPROXY_API_CALL_ACCOUNT_ID', DEFAULT_API_CALL_ACCOUNT_ID), help='主动探测时附带的 Chatgpt-Account-Id') ap.add_argument('--api-call-user-agent', default=os.environ.get('CLIPROXY_API_CALL_USER_AGENT', DEFAULT_API_CALL_USER_AGENT), help='主动探测时附带的 User-Agent') ap.add_argument('--api-call-body', default=os.environ.get('CLIPROXY_API_CALL_BODY', ''), help='主动探测时透传到 api-call 的 data 字段') ap.add_argument('--api-call-providers', default=os.environ.get('CLIPROXY_API_CALL_PROVIDERS', DEFAULT_API_CALL_PROVIDERS), help='哪些 provider 需要做 /api-call 主动探测,逗号分隔;留空表示全部') ap.add_argument('--api-call-max-per-run', type=int, default=int(os.environ.get('CLIPROXY_API_CALL_MAX_PER_RUN', str(DEFAULT_API_CALL_MAX_PER_RUN))), help='每批最多探测多少个账号,最大 9') ap.add_argument('--api-call-sleep', type=float, default=None, help='固定批次等待秒数;如不设置则使用随机等待') ap.add_argument('--api-call-sleep-min', type=float, default=float(os.environ.get('CLIPROXY_API_CALL_SLEEP_MIN', str(DEFAULT_API_CALL_SLEEP_MIN))), help='批次随机等待最小秒数') ap.add_argument('--api-call-sleep-max', type=float, default=float(os.environ.get('CLIPROXY_API_CALL_SLEEP_MAX', str(DEFAULT_API_CALL_SLEEP_MAX))), help='批次随机等待最大秒数') ap.add_argument('--dry-run', action='store_true', help='模拟运行,不实际删除或禁用') ap.add_argument('--interval', type=int, default=60, help='检测间隔时间(秒),默认60秒') ap.add_argument('--once', action='store_true', help='只执行一次,不循环') args = ap.parse_args() args.api_call_provider_set = parse_csv_set(args.api_call_providers) args.api_call_max_per_run = max(0, min(int(args.api_call_max_per_run), DEFAULT_API_CALL_MAX_PER_RUN)) if args.api_call_sleep is not None: args.api_call_sleep = max(0.0, float(args.api_call_sleep)) args.api_call_sleep_min = max(0.0, float(args.api_call_sleep_min)) args.api_call_sleep_max = max(args.api_call_sleep_min, float(args.api_call_sleep_max)) args.api_call_scan_completed = False if not args.management_key.strip(): print('❌ 缺少 management key:请先设置 CLIPROXY_MANAGEMENT_KEY', file=sys.stderr) return 2 print('\n' + '=' * 60) print('【CLIProxyAPI 清理工具】') print('=' * 60) print(' 🎯 清理目标: 删除 401 认证失败账号 + 禁用额度耗尽账号') if args.enable_api_call_check: providers_desc = args.api_call_providers if args.api_call_providers.strip() else '全部 provider' sleep_desc = '固定 %.1f 秒' % args.api_call_sleep if args.api_call_sleep is not None else '随机 %.1f-%.1f 秒' % (args.api_call_sleep_min, args.api_call_sleep_max) print(' 🔎 主动探测: 已开启 /v0/management/api-call') print(' - 上游 URL: %s' % args.api_call_url) print(' - 适用 provider: %s' % providers_desc) print(' - 单批最多: %s 个' % args.api_call_max_per_run) print(' - 批次间隔: %s' % sleep_desc) print(' - 探测策略: 本次运行只完整扫描一遍,后续轮次不再重复') else: print(' 🔎 主动探测: 未开启 /v0/management/api-call') print(' 🛡️ 保护机制: 不会删除配额耗尽、禁用、不可用账号,只会禁用额度耗尽账号') if args.dry_run: print(' 🔍 运行模式: 模拟运行(不会实际删除或禁用)') else: print(' ⚡ 运行模式: 实际运行(将删除/禁用符合条件的账号)') print('=' * 60 + '\n') if args.once: run_check(args) return 0 print('🔄 自动循环检测模式,间隔 %d 秒' % args.interval) print('💡 提示: 按 Ctrl+C 停止程序\n') loop_count = 0 try: while True: loop_count += 1 print('\n' + '🔵' * 30) print('【第 %d 次检测】%s' % (loop_count, get_current_time())) print('🔵' * 30) try: run_check(args) except Exception as e: print('❌ 检测过程中发生异常: %s' % e, flush=True) print('\n⏰ 等待 %d 秒后进行下一次检测...' % args.interval) time.sleep(args.interval) except KeyboardInterrupt: print('\n\n🛑 用户中断程序,共执行 %d 次检测' % loop_count) return 0 if __name__ == '__main__': raise SystemExit(main())

清理效果

ScreenShot_2026-03-25_120509_516481×433 124 KB

网友解答:
--【壹】--:

-enable-api-call-check 一周一次感觉就够了


--【贰】--:

还是自动的好啊。


--【叁】--:

是的。如果要刷新配额检查的话,再加上-enable-api-call-check


--【肆】--:

定时任务执行,是不是加个–once参数就行了?
image1675×352 25.5 KB


--【伍】--:

这个可以在青龙里跑吗


--【陆】--:

主动测活会导致号死的更快


--【柒】--:

来一个js版本的,没有依赖库,直接

  • node check.js --help

就行。

const fs = require('fs/promises'); const http = require('http'); const https = require('https'); const path = require('path'); const P401 = /(^|\D)401(\D|$)|unauthorized|unauthenticated|token\s+expired|login\s+required|authentication\s+failed/i; const PQUOTA = /(^|\D)(402|403|429)(\D|$)|quota|insufficient\s*quota|resource\s*exhausted|rate\s*limit|too\s+many\s+requests|payment\s+required|billing|credit|额度|用完|超限|上限|usage_limit_reached/i; const DEFAULT_API_CALL_PROVIDERS = 'codex,openai,chatgpt'; const DEFAULT_API_CALL_URL = 'https://chatgpt.com/backend-api/wham/usage'; const DEFAULT_API_CALL_USER_AGENT = 'codex_cli_rs/0.76.0 (Debian 13.0.0; x86_64) WindowsTerminal'; const DEFAULT_API_CALL_ACCOUNT_ID = '141c5c10-0993-45c2-ad18-9a01ba2ab3e0'; const DEFAULT_API_CALL_MAX_PER_RUN = 9; const DEFAULT_API_CALL_SLEEP_MIN = 5.0; const DEFAULT_API_CALL_SLEEP_MAX = 10.0; const AUTH_FILE_STATUS_METHODS = ['PATCH', 'PUT', 'POST']; function pad2(value) { return String(value).padStart(2, '0'); } function formatOffset(date) { const offsetMinutes = -date.getTimezoneOffset(); const sign = offsetMinutes >= 0 ? '+' : '-'; const absolute = Math.abs(offsetMinutes); const hours = Math.floor(absolute / 60); const minutes = absolute % 60; return `${sign}${pad2(hours)}${pad2(minutes)}`; } function getCurrentTime() { const now = new Date(); return `${now.getFullYear()}-${pad2(now.getMonth() + 1)}-${pad2(now.getDate())} ${pad2(now.getHours())}:${pad2(now.getMinutes())}:${pad2(now.getSeconds())} ${formatOffset(now)}`; } function runId() { const now = new Date(); return `${now.getUTCFullYear()}${pad2(now.getUTCMonth() + 1)}${pad2(now.getUTCDate())}T${pad2(now.getUTCHours())}${pad2(now.getUTCMinutes())}${pad2(now.getUTCSeconds())}Z`; } function stringifySafe(value) { try { return JSON.stringify(value); } catch (_error) { return String(value); } } function sleep(ms) { return new Promise((resolve) => setTimeout(resolve, ms)); } function requestRaw(urlObject, { method, headers, body, timeoutSeconds }) { return new Promise((resolve, reject) => { const transport = urlObject.protocol === 'https:' ? https : http; const req = transport.request( urlObject, { method, headers, }, (res) => { const chunks = []; res.on('data', (chunk) => chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk))); res.on('end', () => { resolve({ statusCode: res.statusCode || 0, headers: res.headers || {}, body: Buffer.concat(chunks), }); }); } ); req.setTimeout(timeoutSeconds * 1000, () => { req.destroy(new Error(`request timeout after ${timeoutSeconds}s`)); }); req.on('error', (error) => reject(error)); if (body !== null && body !== undefined) { req.write(body); } req.end(); }); } async function api(base, key, method, apiPath, timeout = 20, query = null, expectJson = true, body = null, extraHeaders = null) { const urlObject = new URL(`${String(base).replace(/\/+$/, '')}/v0/management${apiPath}`); if (query && typeof query === 'object') { for (const [name, value] of Object.entries(query)) { if (value !== undefined && value !== null) { urlObject.searchParams.append(name, String(value)); } } } const headers = { Authorization: `Bearer ${key}`, Accept: 'application/json', 'User-Agent': 'cliproxyapi-cleaner/1.0', ...(extraHeaders || {}), }; let payloadBuffer = null; if (body !== null && body !== undefined) { if (Buffer.isBuffer(body)) { payloadBuffer = body; } else if (typeof body === 'object') { payloadBuffer = Buffer.from(JSON.stringify(body), 'utf8'); if (!headers['Content-Type']) { headers['Content-Type'] = 'application/json'; } } else { payloadBuffer = Buffer.from(String(body), 'utf8'); if (!headers['Content-Type']) { headers['Content-Type'] = 'application/json'; } } headers['Content-Length'] = String(payloadBuffer.length); } let response; try { response = await requestRaw(urlObject, { method: String(method).toUpperCase(), headers, body: payloadBuffer, timeoutSeconds: timeout, }); } catch (error) { throw new Error(`请求管理 API 失败: ${error.message || error}`); } if (expectJson) { const text = response.body.length ? response.body.toString('utf8') : ''; let payload; try { payload = text ? JSON.parse(text) : {}; } catch (_error) { payload = { raw: text }; } return [response.statusCode, payload]; } return [response.statusCode, response.body]; } function extractErrorMessage(message) { try { if (typeof message === 'string' && message.trim().startsWith('{')) { const errorData = JSON.parse(message); if (Object.prototype.hasOwnProperty.call(errorData, 'error')) { const errorObject = errorData.error; if (errorObject && typeof errorObject === 'object' && !Array.isArray(errorObject)) { const errorType = errorObject.type || ''; const errorMessage = errorObject.message || ''; return [errorType, errorMessage]; } if (typeof errorObject === 'string') { return ['error', errorObject]; } } return [null, message]; } } catch (_error) { return [null, message]; } return [null, message]; } function parseCsvSet(value) { if (!value) { return new Set(); } return new Set( String(value) .split(',') .map((item) => item.trim().toLowerCase()) .filter(Boolean) ); } function simplifyReason(reason) { const text = String(reason || '').trim(); if (!text) { return ''; } if (!text.startsWith('{')) { return text.slice(0, 120); } let errorData; try { errorData = JSON.parse(text); } catch (_error) { return text.slice(0, 120); } if (!Object.prototype.hasOwnProperty.call(errorData, 'error')) { return text.slice(0, 120); } const errorObject = errorData.error; if (errorObject && typeof errorObject === 'object' && !Array.isArray(errorObject)) { const errorType = String(errorObject.type || '').trim(); const errorMessage = String(errorObject.message || '').trim(); if (errorType === 'usage_limit_reached') { return `usage_limit_reached: ${errorMessage}`.slice(0, 120); } return (errorType || errorMessage || text).slice(0, 120); } return String(errorObject).slice(0, 120); } function classify(item) { const status = String(item.status || '').trim().toLowerCase(); const message = String(item.status_message || '').trim(); const [errorType] = extractErrorMessage(message); const text = `${status}\n${message}`.toLowerCase(); if (P401.test(text)) { return ['delete_401', message || status || '401/unauthorized']; } if (errorType === 'usage_limit_reached' || text.includes('usage_limit_reached')) { return ['quota_exhausted', message || status || 'usage_limit_reached']; } if (PQUOTA.test(text)) { return ['quota_exhausted', message || status || 'quota']; } if (Boolean(item.disabled) || status === 'disabled') { return ['disabled', message || status || 'disabled']; } if (Boolean(item.unavailable) || status === 'error') { return ['unavailable', message || status || 'error']; } return ['available', message || status || 'active']; } function shouldProbeApiCall(item, args) { if (!args.enableApiCallCheck) { return false; } const authIndex = String(item.auth_index || '').trim(); if (!authIndex) { return false; } const provider = String(item.provider || item.type || '').trim().toLowerCase(); if (args.apiCallProviderSet.size > 0 && !args.apiCallProviderSet.has(provider)) { return false; } const [initialKind] = classify(item); return initialKind === 'available'; } function apiCallItemKey(item) { const authIndex = String(item.auth_index || '').trim(); if (authIndex) { return `auth_index:${authIndex}`; } return `name:${String(item.name || item.id || '').trim()}`; } function buildApiCallPayload(item, args) { const headers = { Authorization: 'Bearer $TOKEN$', 'Content-Type': 'application/json', 'User-Agent': args.apiCallUserAgent, }; if (args.apiCallAccountId.trim()) { headers['Chatgpt-Account-Id'] = args.apiCallAccountId.trim(); } const payload = { authIndex: String(item.auth_index || '').trim(), method: args.apiCallMethod.toUpperCase(), url: args.apiCallUrl.trim(), header: headers, }; if (args.apiCallBody) { payload.data = args.apiCallBody; } return payload; } function normalizeApiCallBody(body) { if (body === null || body === undefined) { return ['', null]; } if (typeof body === 'string') { const text = body; const trimmed = text.trim(); if (!trimmed) { return [text, null]; } try { return [text, JSON.parse(trimmed)]; } catch (_error) { return [text, text]; } } try { return [JSON.stringify(body), body]; } catch (_error) { return [String(body), body]; } } function isLimitReachedWindow(value) { if (!value || typeof value !== 'object' || Array.isArray(value)) { return false; } if (value.allowed === false) { return true; } if (value.limit_reached === true) { return true; } return false; } function classifyApiCallResponse(payload) { let nestedStatus = payload.status_code ?? payload.statusCode ?? 0; nestedStatus = Number.parseInt(nestedStatus, 10); if (Number.isNaN(nestedStatus)) { nestedStatus = 0; } const header = payload.header || payload.headers || {}; const [bodyText, body] = normalizeApiCallBody(payload.body); let headerText; try { headerText = JSON.stringify(header); } catch (_error) { headerText = String(header); } let bodySignal = bodyText; if (body && typeof body === 'object') { try { bodySignal = JSON.stringify(body); } catch (_error) { bodySignal = bodyText; } } if (nestedStatus === 401) { return ['delete_401', bodySignal || `api-call status_code=${nestedStatus}`]; } if (nestedStatus === 402 || nestedStatus === 403 || nestedStatus === 429) { return ['quota_exhausted', bodySignal || `api-call status_code=${nestedStatus}`]; } if (body && typeof body === 'object' && !Array.isArray(body)) { const errorObject = body.error; if (errorObject && typeof errorObject === 'object' && !Array.isArray(errorObject)) { const errorType = String(errorObject.type || '').trim().toLowerCase(); const errorMessage = String(errorObject.message || '').trim(); const errorText = `${errorType}\n${errorMessage}`.toLowerCase(); if (errorType === 'usage_limit_reached' || PQUOTA.test(errorText)) { return ['quota_exhausted', bodySignal || errorMessage || errorType]; } if (P401.test(errorText)) { return ['delete_401', bodySignal || errorMessage || errorType]; } } const rateLimit = body.rate_limit; const codeReviewRateLimit = body.code_review_rate_limit; if (isLimitReachedWindow(rateLimit) || isLimitReachedWindow(codeReviewRateLimit)) { return ['quota_exhausted', bodySignal || 'rate_limit_reached']; } if (nestedStatus === 200) { return [null, bodySignal || 'ok']; } } const fallbackText = `${nestedStatus}\n${headerText}\n${bodySignal}`.toLowerCase(); if (P401.test(fallbackText)) { return ['delete_401', bodySignal || `api-call status_code=${nestedStatus}`]; } if (nestedStatus !== 200 && PQUOTA.test(fallbackText)) { return ['quota_exhausted', bodySignal || `api-call status_code=${nestedStatus}`]; } return [null, bodySignal || (nestedStatus ? `api-call status_code=${nestedStatus}` : 'ok')]; } async function runApiCallProbe(args, item) { const requestPayload = buildApiCallPayload(item, args); const [code, payload] = await api( args.baseUrl, args.managementKey, 'POST', '/api-call', args.timeout, null, true, requestPayload ); if (code !== 200) { throw new Error(`调用 /api-call 失败: HTTP ${code} ${stringifySafe(payload)}`); } const [kind, reason] = classifyApiCallResponse(payload); return { request: requestPayload, response: payload, classification: kind, reason, status_code: payload.status_code ?? payload.statusCode, }; } function pickApiCallSleepSeconds(args) { if (args.apiCallSleep !== null && args.apiCallSleep !== undefined) { return Math.max(0, Number(args.apiCallSleep)); } return args.apiCallSleepMin + Math.random() * (args.apiCallSleepMax - args.apiCallSleepMin); } async function runApiCallFullScan(args, files, counts) { if (!args.enableApiCallCheck) { counts['api-call候选数'] = 0; counts['api-call批次数'] = 0; return {}; } if (args.apiCallScanCompleted) { counts['api-call候选数'] = 0; counts['api-call批次数'] = 0; console.log('[api-call] 当前进程已完成一次全量探测,本轮跳过'); return {}; } const eligible = files.filter((item) => shouldProbeApiCall(item, args)); counts['api-call候选数'] = eligible.length; if (eligible.length === 0) { counts['api-call批次数'] = 0; args.apiCallScanCompleted = true; console.log('[api-call] 没有需要探测的候选账号,本次进程不再执行 api-call'); return {}; } const batchSize = Math.max(1, Math.min(Number.parseInt(args.apiCallMaxPerRun, 10), DEFAULT_API_CALL_MAX_PER_RUN)); const batchCount = Math.ceil(eligible.length / batchSize); counts['api-call批次数'] = batchCount; const sleepDesc = args.apiCallSleep !== null && args.apiCallSleep !== undefined ? `固定 ${Number(args.apiCallSleep).toFixed(1)} 秒` : `随机 ${Number(args.apiCallSleepMin).toFixed(1)}-${Number(args.apiCallSleepMax).toFixed(1)} 秒`; console.log( `[api-call] 已开启全量探测,本次运行将探测 ${eligible.length} 个候选账号,共 ${batchCount} 批,每批最多 ${batchSize} 个,批次间隔 ${sleepDesc}` ); const probeResults = {}; let probedTotal = 0; for (let batchIndex = 0; batchIndex < batchCount; batchIndex += 1) { const start = batchIndex * batchSize; const batch = eligible.slice(start, start + batchSize); console.log(`[api-call批次 ${batchIndex + 1}/${batchCount}] 并发探测 ${batch.length} 个账号`); await Promise.all( batch.map(async (item) => { const name = String(item.name || item.id || '').trim(); const provider = String(item.provider || item.type || '').trim(); const authIndex = item.auth_index; try { const probe = await runApiCallProbe(args, item); counts['api-call已探测'] += 1; probedTotal += 1; const currentIndex = probedTotal; probeResults[apiCallItemKey(item)] = probe; const classification = probe.classification || 'ok'; if (classification === 'delete_401') { counts['api-call发现401'] += 1; } else if (classification === 'quota_exhausted') { counts['api-call发现配额耗尽'] += 1; } console.log( ` [api-call完成 ${currentIndex}/${eligible.length}] ${name} provider=${provider} auth_index=${authIndex} result=${classification}` ); } catch (error) { counts['api-call已探测'] += 1; probedTotal += 1; const currentIndex = probedTotal; counts['api-call探测失败'] += 1; probeResults[apiCallItemKey(item)] = { error: error.message || String(error) }; console.log( ` [api-call完成 ${currentIndex}/${eligible.length}] ${name} provider=${provider} auth_index=${authIndex} result=error error=${error.message || error}` ); } }) ); if (batchIndex + 1 < batchCount) { const sleepSeconds = pickApiCallSleepSeconds(args); if (sleepSeconds > 0) { console.log(`[api-call批次 ${batchIndex + 1}/${batchCount}] 整批完成,等待 ${sleepSeconds.toFixed(1)} 秒后继续下一批`); await sleep(sleepSeconds * 1000); } } } args.apiCallScanCompleted = true; console.log('[api-call] 本次运行已完成全部候选账号探测,后续轮次不再重复探测'); return probeResults; } async function disableAuthFile(args, name) { const payload = { name, disabled: true }; const attempts = []; for (const method of AUTH_FILE_STATUS_METHODS) { const [code, response] = await api( args.baseUrl, args.managementKey, method, '/auth-files/status', args.timeout, null, true, payload ); attempts.push({ method, code, response }); if (code >= 200 && code < 300) { return attempts[attempts.length - 1]; } if (![404, 405, 501].includes(code)) { break; } } throw new Error(`更新 auth-files/status 失败: ${stringifySafe(attempts)}`); } async function runCheck(args) { const [code, payload] = await api(args.baseUrl, args.managementKey, 'GET', '/auth-files', args.timeout); if (code !== 200) { console.error(`[错误] 获取 auth-files 失败: HTTP ${code} ${stringifySafe(payload)}`); return null; } const files = payload.files || []; if (!Array.isArray(files)) { console.error(`[错误] auth-files 返回异常: ${stringifySafe(payload)}`); return null; } const rid = runId(); const backupRoot = path.join('backups', 'cliproxyapi-auth-cleaner', rid); const reportRoot = path.join('reports', 'cliproxyapi-auth-cleaner'); await fs.mkdir(reportRoot, { recursive: true }); const counts = { '检查总数': 0, '可用账号': 0, '配额耗尽': 0, '已禁用': 0, '不可用': 0, 'api-call候选数': 0, 'api-call批次数': 0, 'api-call已探测': 0, 'api-call发现401': 0, 'api-call发现配额耗尽': 0, 'api-call探测失败': 0, '待删除401': 0, '已删除': 0, '备份失败': 0, '删除失败': 0, '额度账号已禁用': 0, '禁用失败': 0, }; const results = []; console.log(`[${getCurrentTime()}] 开始检查 ${files.length} 个账号`); const probeResults = await runApiCallFullScan(args, files, counts); for (const item of files) { counts['检查总数'] += 1; const name = String(item.name || item.id || '').trim(); const provider = String(item.provider || item.type || '').trim(); let [kind, reason] = classify(item); const row = { name, provider, auth_index: item.auth_index, status: item.status, status_message: item.status_message, disabled: item.disabled, unavailable: item.unavailable, runtime_only: item.runtime_only, source: item.source, }; const probe = probeResults[apiCallItemKey(item)]; if (probe !== undefined) { row.api_call_probe = probe; if (probe.classification === 'delete_401') { kind = 'delete_401'; reason = probe.reason || reason; } else if (probe.classification === 'quota_exhausted') { kind = 'quota_exhausted'; reason = probe.reason || reason; } } row.final_classification = kind; row.reason = reason; const displayReason = simplifyReason(reason); if (kind === 'available') { counts['可用账号'] += 1; } else if (kind === 'quota_exhausted') { counts['配额耗尽'] += 1; console.log(`[配额耗尽] ${name} provider=${provider} reason=${displayReason}`); if (args.dryRun) { row.disable_result = 'dry_run_skip'; console.log(' [模拟运行] 将调用 /auth-files/status 设置 disabled=true'); } else if (!name) { counts['禁用失败'] += 1; row.disable_result = 'skip_no_name'; row.disable_error = '缺少 name,无法调用 /auth-files/status'; console.log(' [禁用失败] 缺少 name,无法更新状态'); } else { try { const disableResponse = await disableAuthFile(args, name); counts['额度账号已禁用'] += 1; row.disable_result = 'disabled_true'; row.disable_response = disableResponse; console.log(` [已禁用] method=${disableResponse.method} HTTP ${disableResponse.code}`); } catch (error) { counts['禁用失败'] += 1; row.disable_result = 'disable_failed'; row.disable_error = error.message || String(error); console.log(` [禁用失败] ${error.message || error}`); } } } else if (kind === 'disabled') { counts['已禁用'] += 1; console.log(`[已禁用-不删除] ${name} provider=${provider}`); } else if (kind === 'unavailable') { counts['不可用'] += 1; console.log(`[不可用-不删除] ${name} provider=${provider} reason=${displayReason}`); } else if (kind === 'delete_401') { counts['待删除401'] += 1; console.log(`[待删除-401认证失败] ${name} provider=${provider} reason=${displayReason}`); if (args.dryRun) { row.delete_result = 'dry_run_skip'; console.log(' [模拟运行] 将删除此文件'); } else { const runtimeOnly = Boolean(item.runtime_only); const source = String(item.source || '').trim().toLowerCase(); if (runtimeOnly || (source && source !== 'file')) { counts['备份失败'] += 1; row.delete_result = 'skip_runtime_only'; row.delete_error = 'runtime_only/source!=file,管理 API 无法删除'; console.log(' [跳过] runtime_only 或非磁盘文件,无法通过 /auth-files 删除'); } else if (!name.toLowerCase().endsWith('.json')) { counts['备份失败'] += 1; row.delete_result = 'skip_no_json_name'; row.delete_error = '不是标准 .json 文件名,默认不删'; console.log(' [跳过] 不是 .json 文件'); } else { try { const [downloadCode, raw] = await api( args.baseUrl, args.managementKey, 'GET', '/auth-files/download', args.timeout, { name }, false ); if (downloadCode !== 200) { throw new Error(`下载 auth 文件失败: ${name} HTTP ${downloadCode}`); } await fs.mkdir(backupRoot, { recursive: true }); const backupPath = path.join(backupRoot, path.basename(name)); await fs.writeFile(backupPath, raw); row.backup_path = backupPath; const [deleteCode, deletePayload] = await api( args.baseUrl, args.managementKey, 'DELETE', '/auth-files', args.timeout, { name }, true ); if (deleteCode !== 200) { throw new Error(`删除 auth 文件失败: ${name} HTTP ${deleteCode} ${stringifySafe(deletePayload)}`); } counts['已删除'] += 1; row.delete_result = 'deleted'; row.delete_response = deletePayload; console.log(` [已删除] 备份路径: ${row.backup_path}`); } catch (error) { counts['删除失败'] += 1; row.delete_result = 'delete_failed'; row.delete_error = error.message || String(error); console.log(` [删除失败] ${error.message || error}`); } } } } results.push(row); } const report = { run_id: rid, base_url: args.baseUrl, dry_run: args.dryRun, api_call: { enabled: args.enableApiCallCheck, completed_in_this_process: Boolean(args.apiCallScanCompleted), providers: args.apiCallProviders, url: args.apiCallUrl, batch_size: args.apiCallMaxPerRun, sleep_fixed: args.apiCallSleep, sleep_min: args.apiCallSleepMin, sleep_max: args.apiCallSleepMax, }, results, summary: counts, }; const reportPath = path.join(reportRoot, `report-${rid}.json`); await fs.writeFile(reportPath, `${JSON.stringify(report, null, 2)}\n`, 'utf8'); console.log(`\n${'='.repeat(60)}`); console.log('【统计结果】'); console.log('='.repeat(60)); for (const [key, value] of Object.entries(counts)) { console.log(` ${key}: ${value}`); } console.log('\n【操作说明】'); if (args.dryRun) { console.log(' ✅ 模拟运行模式 - 没有实际删除或禁用任何账号'); if (counts['待删除401'] > 0) { console.log(` 📝 发现 ${counts['待删除401']} 个 401 认证失败账号(去掉 --dry-run 后会备份并删除)`); } if (counts['配额耗尽'] > 0) { console.log(` 📝 发现 ${counts['配额耗尽']} 个额度耗尽账号(去掉 --dry-run 后会调用 /auth-files/status 禁用)`); } } else { console.log(` ✅ 已删除 ${counts['已删除']} 个 401 认证失败账号`); console.log(` ✅ 已禁用 ${counts['额度账号已禁用']} 个额度耗尽账号`); if (counts['删除失败'] > 0) { console.log(` ⚠️ 有 ${counts['删除失败']} 个账号删除失败,请查看报告`); } if (counts['禁用失败'] > 0) { console.log(` ⚠️ 有 ${counts['禁用失败']} 个额度账号禁用失败,请查看报告`); } } console.log('\n【报告文件】'); console.log(` 📄 ${reportPath}`); console.log('='.repeat(60)); return counts; } const OPTION_DEFS = { '--base-url': { key: 'baseUrl', type: 'string', defaultValue: 'http://127.0.0.1:8317', help: '管理 API 的基础地址', }, '--management-key': { key: 'managementKey', type: 'string', defaultValue: '', help: '管理 API 的 Bearer token', }, '--timeout': { key: 'timeout', type: 'int', defaultValue: Number.parseInt(process.env.CLIPROXY_TIMEOUT || '20', 10), help: '管理 API 请求超时秒数', }, '--enable-api-call-check': { key: 'enableApiCallCheck', type: 'boolean', defaultValue: false, help: '开启 /api-call 全量探测;本次脚本运行只会完整探测一遍', }, '--api-call-url': { key: 'apiCallUrl', type: 'string', defaultValue: process.env.CLIPROXY_API_CALL_URL || DEFAULT_API_CALL_URL, help: '主动探测时调用的上游 URL', }, '--api-call-method': { key: 'apiCallMethod', type: 'string', defaultValue: process.env.CLIPROXY_API_CALL_METHOD || 'GET', help: '主动探测时使用的 HTTP 方法', }, '--api-call-account-id': { key: 'apiCallAccountId', type: 'string', defaultValue: process.env.CLIPROXY_API_CALL_ACCOUNT_ID || DEFAULT_API_CALL_ACCOUNT_ID, help: '主动探测时附带的 Chatgpt-Account-Id', }, '--api-call-user-agent': { key: 'apiCallUserAgent', type: 'string', defaultValue: process.env.CLIPROXY_API_CALL_USER_AGENT || DEFAULT_API_CALL_USER_AGENT, help: '主动探测时附带的 User-Agent', }, '--api-call-body': { key: 'apiCallBody', type: 'string', defaultValue: process.env.CLIPROXY_API_CALL_BODY || '', help: '主动探测时透传到 api-call 的 data 字段', }, '--api-call-providers': { key: 'apiCallProviders', type: 'string', defaultValue: process.env.CLIPROXY_API_CALL_PROVIDERS || DEFAULT_API_CALL_PROVIDERS, help: '哪些 provider 需要做 /api-call 主动探测,逗号分隔;留空表示全部', }, '--api-call-max-per-run': { key: 'apiCallMaxPerRun', type: 'int', defaultValue: Number.parseInt(process.env.CLIPROXY_API_CALL_MAX_PER_RUN || String(DEFAULT_API_CALL_MAX_PER_RUN), 10), help: '每批最多探测多少个账号,最大 9', }, '--api-call-sleep': { key: 'apiCallSleep', type: 'float', defaultValue: null, help: '固定批次等待秒数;如不设置则使用随机等待', }, '--api-call-sleep-min': { key: 'apiCallSleepMin', type: 'float', defaultValue: Number.parseFloat(process.env.CLIPROXY_API_CALL_SLEEP_MIN || String(DEFAULT_API_CALL_SLEEP_MIN)), help: '批次随机等待最小秒数', }, '--api-call-sleep-max': { key: 'apiCallSleepMax', type: 'float', defaultValue: Number.parseFloat(process.env.CLIPROXY_API_CALL_SLEEP_MAX || String(DEFAULT_API_CALL_SLEEP_MAX)), help: '批次随机等待最大秒数', }, '--dry-run': { key: 'dryRun', type: 'boolean', defaultValue: false, help: '模拟运行,不实际删除或禁用', }, '--interval': { key: 'interval', type: 'int', defaultValue: 60, help: '检测间隔时间(秒),默认 60 秒', }, '--once': { key: 'once', type: 'boolean', defaultValue: false, help: '只执行一次,不循环', }, }; function parseValue(rawValue, type, flag) { if (type === 'string') { return rawValue; } if (type === 'int') { const value = Number.parseInt(rawValue, 10); if (Number.isNaN(value)) { throw new Error(`${flag} 需要整数值`); } return value; } if (type === 'float') { const value = Number.parseFloat(rawValue); if (Number.isNaN(value)) { throw new Error(`${flag} 需要数字值`); } return value; } throw new Error(`未知参数类型: ${type}`); } function buildDefaultArgs() { const args = {}; for (const option of Object.values(OPTION_DEFS)) { args[option.key] = option.defaultValue; } return args; } function printUsage() { console.log('用法: node check.js [options]'); console.log(''); console.log('选项:'); console.log(' -h, --help 显示帮助'); for (const [flag, option] of Object.entries(OPTION_DEFS)) { const suffix = option.type === 'boolean' ? '' : ` <${option.type}>`; console.log(` ${flag.padEnd(30)}${suffix.padEnd(10)}${option.help}`); } } function parseCliArgs(argv) { const args = buildDefaultArgs(); for (let index = 0; index < argv.length; index += 1) { const token = argv[index]; if (token === '--help' || token === '-h') { args.help = true; continue; } if (!token.startsWith('--')) { throw new Error(`未知参数: ${token}`); } let flag = token; let inlineValue; const equalIndex = token.indexOf('='); if (equalIndex !== -1) { flag = token.slice(0, equalIndex); inlineValue = token.slice(equalIndex + 1); } const definition = OPTION_DEFS[flag]; if (!definition) { throw new Error(`未知参数: ${flag}`); } if (definition.type === 'boolean') { args[definition.key] = true; continue; } let rawValue = inlineValue; if (rawValue === undefined) { index += 1; if (index >= argv.length) { throw new Error(`${flag} 缺少参数值`); } rawValue = argv[index]; } args[definition.key] = parseValue(rawValue, definition.type, flag); } args.apiCallProviderSet = parseCsvSet(args.apiCallProviders); args.apiCallMaxPerRun = Math.max(0, Math.min(Number.parseInt(args.apiCallMaxPerRun, 10), DEFAULT_API_CALL_MAX_PER_RUN)); if (args.apiCallSleep !== null) { args.apiCallSleep = Math.max(0, Number(args.apiCallSleep)); } args.apiCallSleepMin = Math.max(0, Number(args.apiCallSleepMin)); args.apiCallSleepMax = Math.max(args.apiCallSleepMin, Number(args.apiCallSleepMax)); args.apiCallScanCompleted = false; return args; } async function main() { let args; try { args = parseCliArgs(process.argv.slice(2)); } catch (error) { console.error(`❌ ${error.message || error}`); console.error('使用 --help 查看可用参数'); return 2; } if (args.help) { printUsage(); return 0; } if (!String(args.managementKey || '').trim()) { console.error('❌ 缺少 management key:请先设置 CLIPROXY_MANAGEMENT_KEY'); return 2; } console.log(`\n${'='.repeat(60)}`); console.log('【CLIProxyAPI 清理工具】'); console.log('='.repeat(60)); console.log(' 🎯 清理目标: 删除 401 认证失败账号 + 禁用额度耗尽账号'); if (args.enableApiCallCheck) { const providersDesc = args.apiCallProviders.trim() ? args.apiCallProviders : '全部 provider'; const sleepDesc = args.apiCallSleep !== null ? `固定 ${Number(args.apiCallSleep).toFixed(1)} 秒` : `随机 ${Number(args.apiCallSleepMin).toFixed(1)}-${Number(args.apiCallSleepMax).toFixed(1)} 秒`; console.log(' 🔎 主动探测: 已开启 /v0/management/api-call'); console.log(` - 上游 URL: ${args.apiCallUrl}`); console.log(` - 适用 provider: ${providersDesc}`); console.log(` - 单批最多: ${args.apiCallMaxPerRun} 个`); console.log(` - 批次间隔: ${sleepDesc}`); console.log(' - 探测策略: 本次运行只完整扫描一遍,后续轮次不再重复'); } else { console.log(' 🔎 主动探测: 未开启 /v0/management/api-call'); } console.log(' 🛡️ 保护机制: 不会删除配额耗尽、禁用、不可用账号,只会禁用额度耗尽账号'); if (args.dryRun) { console.log(' 🔍 运行模式: 模拟运行(不会实际删除或禁用)'); } else { console.log(' ⚡ 运行模式: 实际运行(将删除/禁用符合条件的账号)'); } console.log(`${'='.repeat(60)}\n`); if (args.once) { await runCheck(args); return 0; } console.log(`🔄 自动循环检测模式,间隔 ${args.interval} 秒`); console.log('💡 提示: 按 Ctrl+C 停止程序\n'); let loopCount = 0; try { while (true) { loopCount += 1; console.log(`\n${'🔵'.repeat(30)}`); console.log(`【第 ${loopCount} 次检测】${getCurrentTime()}`); console.log('🔵'.repeat(30)); try { await runCheck(args); } catch (error) { console.log(`❌ 检测过程中发生异常: ${error.message || error}`); } console.log(`\n⏰ 等待 ${args.interval} 秒后进行下一次检测...`); await sleep(args.interval * 1000); } } catch (error) { if (error && error.name === 'AbortError') { throw error; } if (error && error.code === 'SIGINT') { console.log(`\n\n🛑 用户中断程序,共执行 ${loopCount} 次检测`); return 0; } throw error; } } process.on('SIGINT', () => { console.log('\n\n🛑 用户中断程序'); process.exit(0); }); main() .then((code) => { process.exitCode = code; }) .catch((error) => { console.error(`❌ ${error.message || error}`); process.exitCode = 1; });

5.4 vibe的产品,一次成型的,不保证准确


--【捌】--:

感谢大佬


--【玖】--:

应该没啥问题


--【拾】--:

加了参数启用,只执行一遍

问题描述:

找不到是哪个佬写的脚本改的。。。

修改内容与功能

  1. 新增了 api-call 主动探测。
    全量探测,等同于CPA中的 配额管理 刷新,本次脚本运行只会完整探测一遍
    默认是关闭的,只有加 --enable-api-call-check 才会跑,且只跑一遍
  2. api-call探测方式:
    一批同时探测 9 个,整批跑完以后,随机等待 5 到 10 秒,再继续下一批。这样比一个个串行扫。
  3. 额度耗尽的处理:
    通过调用/v0/management/auth-files/status禁用账号
  4. 原有功能:
    自动删除请求中401的账号。

使用说明

  • ‘–base-url’, default=‘http://127.0.0.1:8317’
  • ‘–management-key’, default=‘’ 写你的cpa管理密钥
  • ‘–timeout’, type=int, default=‘20’
  • ‘–enable-api-call-check’, action=‘store_true’ 开启 /api-call主动探测
  • ‘–api-call-url’, default=os.environ.get(‘CLIPROXY_API_CALL_URL’, DEFAULT_API_CALL_URL), help=‘主动探测时调用的上游 URL’
  • ‘–api-call-method’, default=os.environ.get(‘CLIPROXY_API_CALL_METHOD’, ‘GET’), help=‘主动探测时使用的 HTTP 方法’
  • ‘–api-call-account-id’, default=os.environ.get(‘CLIPROXY_API_CALL_ACCOUNT_ID’, DEFAULT_API_CALL_ACCOUNT_ID), help=‘主动探测时附带的 Chatgpt-Account-Id’
  • ‘–api-call-user-agent’, default=os.environ.get(‘CLIPROXY_API_CALL_USER_AGENT’, DEFAULT_API_CALL_USER_AGENT), help=‘主动探测时附带的 User-Agent’
  • ‘–api-call-body’, default=os.environ.get(‘CLIPROXY_API_CALL_BODY’, ‘’), help=‘主动探测时透传到 api-call 的 data 字段’
  • ‘–api-call-providers’, default=os.environ.get(‘CLIPROXY_API_CALL_PROVIDERS’, DEFAULT_API_CALL_PROVIDERS), help=‘哪些 provider 需要做 /api-call 主动探测,逗号分隔;留空表示全部’
  • ‘–api-call-max-per-run’, type=int, default=int(os.environ.get(‘CLIPROXY_API_CALL_MAX_PER_RUN’, str(DEFAULT_API_CALL_MAX_PER_RUN))), help=‘每批最多探测多少个账号,最大 9’
  • ‘–api-call-sleep’, type=float, default=None, help=‘固定批次等待秒数;如不设置则使用随机等待’
  • ‘–api-call-sleep-min’, type=float, default=float(os.environ.get(‘CLIPROXY_API_CALL_SLEEP_MIN’, str(DEFAULT_API_CALL_SLEEP_MIN))), help=‘批次随机等待最小秒数’
  • ‘–api-call-sleep-max’, type=float, default=float(os.environ.get(‘CLIPROXY_API_CALL_SLEEP_MAX’, str(DEFAULT_API_CALL_SLEEP_MAX))), help=‘批次随机等待最大秒数’
  • ‘–dry-run’, action=‘store_true’, help=‘模拟运行,不实际删除或禁用’
  • ‘–interval’, type=int, default=60, help=‘检测间隔时间(秒),默认60秒’
  • ‘–once’, action=‘store_true’, help=‘只执行一次,不循环’

脚本

import os, sys, re, json, argparse, time, random from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path from urllib import request, parse, error from datetime import datetime, timezone P401 = re.compile(r'(^|\D)401(\D|$)|unauthorized|unauthenticated|token\s+expired|login\s+required|authentication\s+failed', re.I) PQUOTA = re.compile(r'(^|\D)(402|403|429)(\D|$)|quota|insufficient\s*quota|resource\s*exhausted|rate\s*limit|too\s+many\s+requests|payment\s+required|billing|credit|额度|用完|超限|上限|usage_limit_reached', re.I) DEFAULT_API_CALL_PROVIDERS = 'codex,openai,chatgpt' DEFAULT_API_CALL_URL = 'https://chatgpt.com/backend-api/wham/usage' DEFAULT_API_CALL_USER_AGENT = 'codex_cli_rs/0.76.0 (Debian 13.0.0; x86_64) WindowsTerminal' DEFAULT_API_CALL_ACCOUNT_ID = '141c5c10-0993-45c2-ad18-9a01ba2ab3e0' DEFAULT_API_CALL_MAX_PER_RUN = 9 DEFAULT_API_CALL_SLEEP_MIN = 5.0 DEFAULT_API_CALL_SLEEP_MAX = 10.0 AUTH_FILE_STATUS_METHODS = ('PATCH', 'PUT', 'POST') def get_current_time(): return datetime.now().astimezone().strftime('%Y-%m-%d %H:%M:%S %z') def run_id(): return datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ') def api(base, key, method, path, timeout=20, query=None, expect_json=True, body=None, extra_headers=None): url = base.rstrip('/') + '/v0/management' + path if query: url += '?' + parse.urlencode(query) headers = { 'Authorization': 'Bearer ' + key, 'Accept': 'application/json', 'User-Agent': 'cliproxyapi-cleaner/1.0', } if extra_headers: headers.update(extra_headers) data = None if body is not None: if isinstance(body, (dict, list)): data = json.dumps(body, ensure_ascii=False).encode('utf-8') headers.setdefault('Content-Type', 'application/json') elif isinstance(body, bytes): data = body else: data = str(body).encode('utf-8') headers.setdefault('Content-Type', 'application/json') req = request.Request(url, data=data, headers=headers, method=method.upper()) try: with request.urlopen(req, timeout=timeout) as resp: raw = resp.read() code = resp.getcode() except error.HTTPError as e: raw = e.read() code = e.code except error.URLError as e: raise RuntimeError('请求管理 API 失败: %s' % e) if expect_json: try: payload = json.loads(raw.decode('utf-8')) if raw else {} except Exception: payload = {'raw': raw.decode('utf-8', errors='replace')} return code, payload return code, raw def extract_error_message(msg): """从状态消息中提取错误信息""" try: if isinstance(msg, str) and msg.strip().startswith('{'): error_data = json.loads(msg) if 'error' in error_data: error_obj = error_data['error'] if isinstance(error_obj, dict): error_type = error_obj.get('type', '') error_message = error_obj.get('message', '') return error_type, error_message if isinstance(error_obj, str): return 'error', error_obj return None, msg except Exception: pass return None, msg def parse_csv_set(value): if not value: return set() return {x.strip().lower() for x in str(value).split(',') if x.strip()} def simplify_reason(reason): text = str(reason or '').strip() if not text: return '' if not text.startswith('{'): return text[:120] try: error_data = json.loads(text) except Exception: return text[:120] if 'error' not in error_data: return text[:120] error_obj = error_data['error'] if isinstance(error_obj, dict): error_type = str(error_obj.get('type') or '').strip() error_message = str(error_obj.get('message') or '').strip() if error_type == 'usage_limit_reached': return ('usage_limit_reached: ' + error_message)[:120] return (error_type or error_message or text)[:120] return str(error_obj)[:120] def classify(item): status = str(item.get('status', '')).strip().lower() msg = str(item.get('status_message', '') or '').strip() error_type, error_msg = extract_error_message(msg) text = (status + '\n' + msg).lower() if P401.search(text): return 'delete_401', msg or status or '401/unauthorized' if error_type == 'usage_limit_reached' or 'usage_limit_reached' in text: return 'quota_exhausted', msg or status or 'usage_limit_reached' if PQUOTA.search(text): return 'quota_exhausted', msg or status or 'quota' if bool(item.get('disabled', False)) or status == 'disabled': return 'disabled', msg or status or 'disabled' if bool(item.get('unavailable', False)) or status == 'error': return 'unavailable', msg or status or 'error' return 'available', msg or status or 'active' def should_probe_api_call(item, args): if not args.enable_api_call_check: return False auth_index = str(item.get('auth_index') or '').strip() if not auth_index: return False provider = str(item.get('provider') or item.get('type') or '').strip().lower() if args.api_call_provider_set and provider not in args.api_call_provider_set: return False initial_kind, _ = classify(item) return initial_kind == 'available' def api_call_item_key(item): auth_index = str(item.get('auth_index') or '').strip() if auth_index: return 'auth_index:' + auth_index return 'name:' + str(item.get('name') or item.get('id') or '').strip() def build_api_call_payload(item, args): headers = { 'Authorization': 'Bearer $TOKEN$', 'Content-Type': 'application/json', 'User-Agent': args.api_call_user_agent, } if args.api_call_account_id.strip(): headers['Chatgpt-Account-Id'] = args.api_call_account_id.strip() payload = { 'authIndex': str(item.get('auth_index') or '').strip(), 'method': args.api_call_method.upper(), 'url': args.api_call_url.strip(), 'header': headers, } if args.api_call_body: payload['data'] = args.api_call_body return payload def normalize_api_call_body(body): if body is None: return '', None if isinstance(body, str): text = body trimmed = text.strip() if not trimmed: return text, None try: return text, json.loads(trimmed) except Exception: return text, text try: return json.dumps(body, ensure_ascii=False), body except Exception: return str(body), body def is_limit_reached_window(value): if not isinstance(value, dict): return False if value.get('allowed') is False: return True if value.get('limit_reached') is True: return True return False def classify_api_call_response(payload): nested_status = payload.get('status_code', payload.get('statusCode', 0)) try: nested_status = int(nested_status) except Exception: nested_status = 0 header = payload.get('header') or payload.get('headers') or {} body_text, body = normalize_api_call_body(payload.get('body')) try: header_text = json.dumps(header, ensure_ascii=False) except Exception: header_text = str(header) if isinstance(body, (dict, list)): try: body_signal = json.dumps(body, ensure_ascii=False) except Exception: body_signal = body_text else: body_signal = body_text if nested_status == 401: return 'delete_401', body_signal or ('api-call status_code=%s' % nested_status) if nested_status in (402, 403, 429): return 'quota_exhausted', body_signal or ('api-call status_code=%s' % nested_status) if isinstance(body, dict): error_obj = body.get('error') if isinstance(error_obj, dict): error_type = str(error_obj.get('type') or '').strip().lower() error_message = str(error_obj.get('message') or '').strip() error_text = (error_type + '\n' + error_message).lower() if error_type == 'usage_limit_reached' or PQUOTA.search(error_text): return 'quota_exhausted', body_signal or error_message or error_type if P401.search(error_text): return 'delete_401', body_signal or error_message or error_type rate_limit = body.get('rate_limit') code_review_rate_limit = body.get('code_review_rate_limit') if is_limit_reached_window(rate_limit) or is_limit_reached_window(code_review_rate_limit): return 'quota_exhausted', body_signal or 'rate_limit_reached' if nested_status == 200: return None, body_signal or 'ok' fallback_text = ('%s\n%s\n%s' % (nested_status, header_text, body_signal)).lower() if P401.search(fallback_text): return 'delete_401', body_signal or ('api-call status_code=%s' % nested_status) if nested_status != 200 and PQUOTA.search(fallback_text): return 'quota_exhausted', body_signal or ('api-call status_code=%s' % nested_status) return None, body_signal or ('api-call status_code=%s' % nested_status if nested_status else 'ok') def run_api_call_probe(args, item): request_payload = build_api_call_payload(item, args) code, payload = api( args.base_url, args.management_key, 'POST', '/api-call', args.timeout, expect_json=True, body=request_payload, ) if code != 200: raise RuntimeError('调用 /api-call 失败: HTTP %s %s' % (code, payload)) kind, reason = classify_api_call_response(payload) return { 'request': request_payload, 'response': payload, 'classification': kind, 'reason': reason, 'status_code': payload.get('status_code', payload.get('statusCode')), } def pick_api_call_sleep_seconds(args): fixed_sleep = getattr(args, 'api_call_sleep', None) if fixed_sleep is not None: return max(0.0, float(fixed_sleep)) return random.uniform(args.api_call_sleep_min, args.api_call_sleep_max) def run_api_call_full_scan(args, files, counts): if not args.enable_api_call_check: counts['api-call候选数'] = 0 counts['api-call批次数'] = 0 return {} if getattr(args, 'api_call_scan_completed', False): counts['api-call候选数'] = 0 counts['api-call批次数'] = 0 print('[api-call] 当前进程已完成一次全量探测,本轮跳过', flush=True) return {} eligible = [item for item in files if should_probe_api_call(item, args)] counts['api-call候选数'] = len(eligible) if not eligible: counts['api-call批次数'] = 0 args.api_call_scan_completed = True print('[api-call] 没有需要探测的候选账号,本次进程不再执行 api-call', flush=True) return {} batch_size = max(1, min(int(args.api_call_max_per_run), DEFAULT_API_CALL_MAX_PER_RUN)) batch_count = (len(eligible) + batch_size - 1) // batch_size counts['api-call批次数'] = batch_count sleep_desc = '固定 %.1f 秒' % args.api_call_sleep if args.api_call_sleep is not None else '随机 %.1f-%.1f 秒' % (args.api_call_sleep_min, args.api_call_sleep_max) print('[api-call] 已开启全量探测,本次运行将探测 %s 个候选账号,共 %s 批,每批最多 %s 个,批次间隔 %s' % ( len(eligible), batch_count, batch_size, sleep_desc, ), flush=True) probe_results = {} probed_total = 0 for batch_index in range(batch_count): batch = eligible[batch_index * batch_size:(batch_index + 1) * batch_size] print('[api-call批次 %s/%s] 并发探测 %s 个账号' % (batch_index + 1, batch_count, len(batch)), flush=True) with ThreadPoolExecutor(max_workers=len(batch)) as executor: future_to_item = {executor.submit(run_api_call_probe, args, item): item for item in batch} for future in as_completed(future_to_item): item = future_to_item[future] counts['api-call已探测'] += 1 probed_total += 1 name = str(item.get('name') or item.get('id') or '').strip() provider = str(item.get('provider') or item.get('type') or '').strip() auth_index = item.get('auth_index') try: probe = future.result() probe_results[api_call_item_key(item)] = probe classification = probe.get('classification') or 'ok' if classification == 'delete_401': counts['api-call发现401'] += 1 elif classification == 'quota_exhausted': counts['api-call发现配额耗尽'] += 1 print(' [api-call完成 %s/%s] %s provider=%s auth_index=%s result=%s' % ( probed_total, len(eligible), name, provider, auth_index, classification, ), flush=True) except Exception as e: counts['api-call探测失败'] += 1 probe_results[api_call_item_key(item)] = {'error': str(e)} print(' [api-call完成 %s/%s] %s provider=%s auth_index=%s result=error error=%s' % ( probed_total, len(eligible), name, provider, auth_index, e, ), flush=True) if batch_index + 1 < batch_count: sleep_seconds = pick_api_call_sleep_seconds(args) if sleep_seconds > 0: print('[api-call批次 %s/%s] 整批完成,等待 %.1f 秒后继续下一批' % ( batch_index + 1, batch_count, sleep_seconds, ), flush=True) time.sleep(sleep_seconds) args.api_call_scan_completed = True print('[api-call] 本次运行已完成全部候选账号探测,后续轮次不再重复探测', flush=True) return probe_results def disable_auth_file(args, name): payload = {'name': name, 'disabled': True} attempts = [] for method in AUTH_FILE_STATUS_METHODS: code, resp = api( args.base_url, args.management_key, method, '/auth-files/status', args.timeout, expect_json=True, body=payload, ) attempts.append({'method': method, 'code': code, 'response': resp}) if 200 <= code < 300: return attempts[-1] if code not in (404, 405, 501): break raise RuntimeError('更新 auth-files/status 失败: %s' % attempts) def run_check(args): """执行一次检查""" code, payload = api(args.base_url, args.management_key, 'GET', '/auth-files', args.timeout) if code != 200: print('[错误] 获取 auth-files 失败: HTTP %s %s' % (code, payload), file=sys.stderr) return None files = payload.get('files') or [] if not isinstance(files, list): print('[错误] auth-files 返回异常: %s' % payload, file=sys.stderr) return None rid = run_id() backup_root = Path('./backups/cliproxyapi-auth-cleaner') / rid report_root = Path('./reports/cliproxyapi-auth-cleaner') report_root.mkdir(parents=True, exist_ok=True) counts = { '检查总数': 0, '可用账号': 0, '配额耗尽': 0, '已禁用': 0, '不可用': 0, 'api-call候选数': 0, 'api-call批次数': 0, 'api-call已探测': 0, 'api-call发现401': 0, 'api-call发现配额耗尽': 0, 'api-call探测失败': 0, '待删除401': 0, '已删除': 0, '备份失败': 0, '删除失败': 0, '额度账号已禁用': 0, '禁用失败': 0, } results = [] print('[%s] 开始检查 %s 个账号' % (get_current_time(), len(files)), flush=True) probe_results = run_api_call_full_scan(args, files, counts) for idx, item in enumerate(files): counts['检查总数'] += 1 name = str(item.get('name') or item.get('id') or '').strip() provider = str(item.get('provider') or item.get('type') or '').strip() kind, reason = classify(item) row = { 'name': name, 'provider': provider, 'auth_index': item.get('auth_index'), 'status': item.get('status'), 'status_message': item.get('status_message'), 'disabled': item.get('disabled'), 'unavailable': item.get('unavailable'), 'runtime_only': item.get('runtime_only'), 'source': item.get('source'), } probe = probe_results.get(api_call_item_key(item)) if probe is not None: row['api_call_probe'] = probe if probe.get('classification') == 'delete_401': kind = 'delete_401' reason = probe.get('reason') or reason elif probe.get('classification') == 'quota_exhausted': kind = 'quota_exhausted' reason = probe.get('reason') or reason row['final_classification'] = kind row['reason'] = reason display_reason = simplify_reason(reason) if kind == 'available': counts['可用账号'] += 1 elif kind == 'quota_exhausted': counts['配额耗尽'] += 1 print('[配额耗尽] %s provider=%s reason=%s' % (name, provider, display_reason), flush=True) if args.dry_run: row['disable_result'] = 'dry_run_skip' print(' [模拟运行] 将调用 /auth-files/status 设置 disabled=true', flush=True) elif not name: counts['禁用失败'] += 1 row['disable_result'] = 'skip_no_name' row['disable_error'] = '缺少 name,无法调用 /auth-files/status' print(' [禁用失败] 缺少 name,无法更新状态', flush=True) else: try: disable_resp = disable_auth_file(args, name) counts['额度账号已禁用'] += 1 row['disable_result'] = 'disabled_true' row['disable_response'] = disable_resp print(' [已禁用] method=%s HTTP %s' % (disable_resp['method'], disable_resp['code']), flush=True) except Exception as e: counts['禁用失败'] += 1 row['disable_result'] = 'disable_failed' row['disable_error'] = str(e) print(' [禁用失败] %s' % e, flush=True) elif kind == 'disabled': counts['已禁用'] += 1 print('[已禁用-不删除] %s provider=%s' % (name, provider), flush=True) elif kind == 'unavailable': counts['不可用'] += 1 print('[不可用-不删除] %s provider=%s reason=%s' % (name, provider, display_reason), flush=True) elif kind == 'delete_401': counts['待删除401'] += 1 print('[待删除-401认证失败] %s provider=%s reason=%s' % (name, provider, display_reason), flush=True) if args.dry_run: row['delete_result'] = 'dry_run_skip' print(' [模拟运行] 将删除此文件', flush=True) else: runtime_only = bool(item.get('runtime_only', False)) source = str(item.get('source') or '').strip().lower() if runtime_only or (source and source != 'file'): counts['备份失败'] += 1 row['delete_result'] = 'skip_runtime_only' row['delete_error'] = 'runtime_only/source!=file,管理 API 无法删除' print(' [跳过] runtime_only 或非磁盘文件,无法通过 /auth-files 删除', flush=True) elif not name.lower().endswith('.json'): counts['备份失败'] += 1 row['delete_result'] = 'skip_no_json_name' row['delete_error'] = '不是标准 .json 文件名,默认不删' print(' [跳过] 不是 .json 文件', flush=True) else: try: code, raw = api(args.base_url, args.management_key, 'GET', '/auth-files/download', args.timeout, {'name': name}, False) if code != 200: raise RuntimeError('下载 auth 文件失败: %s HTTP %s' % (name, code)) backup_root.mkdir(parents=True, exist_ok=True) backup_path = backup_root / Path(name).name backup_path.write_bytes(raw) row['backup_path'] = str(backup_path) code, payload = api(args.base_url, args.management_key, 'DELETE', '/auth-files', args.timeout, {'name': name}, True) if code != 200: raise RuntimeError('删除 auth 文件失败: %s HTTP %s %s' % (name, code, payload)) counts['已删除'] += 1 row['delete_result'] = 'deleted' row['delete_response'] = payload print(' [已删除] 备份路径: %s' % row['backup_path'], flush=True) except Exception as e: counts['删除失败'] += 1 row['delete_result'] = 'delete_failed' row['delete_error'] = str(e) print(' [删除失败] %s' % e, flush=True) results.append(row) report = { 'run_id': rid, 'base_url': args.base_url, 'dry_run': args.dry_run, 'api_call': { 'enabled': args.enable_api_call_check, 'completed_in_this_process': bool(getattr(args, 'api_call_scan_completed', False)), 'providers': args.api_call_providers, 'url': args.api_call_url, 'batch_size': args.api_call_max_per_run, 'sleep_fixed': args.api_call_sleep, 'sleep_min': args.api_call_sleep_min, 'sleep_max': args.api_call_sleep_max, }, 'results': results, 'summary': counts, } report_path = report_root / ('report-' + rid + '.json') report_path.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding='utf-8') print('\n' + '=' * 60) print('【统计结果】') print('=' * 60) for key, value in counts.items(): print(' %s: %d' % (key, value)) print('\n【操作说明】') if args.dry_run: print(' ✅ 模拟运行模式 - 没有实际删除或禁用任何账号') if counts['待删除401'] > 0: print(' 📝 发现 %d 个 401 认证失败账号(去掉 --dry-run 后会备份并删除)' % counts['待删除401']) if counts['配额耗尽'] > 0: print(' 📝 发现 %d 个额度耗尽账号(去掉 --dry-run 后会调用 /auth-files/status 禁用)' % counts['配额耗尽']) else: print(' ✅ 已删除 %d 个 401 认证失败账号' % counts['已删除']) print(' ✅ 已禁用 %d 个额度耗尽账号' % counts['额度账号已禁用']) if counts['删除失败'] > 0: print(' ⚠️ 有 %d 个账号删除失败,请查看报告' % counts['删除失败']) if counts['禁用失败'] > 0: print(' ⚠️ 有 %d 个额度账号禁用失败,请查看报告' % counts['禁用失败']) print('\n【报告文件】') print(' 📄 %s' % report_path) print('=' * 60, flush=True) return counts def main(): ap = argparse.ArgumentParser(description='CLIProxyAPI 清理工具 - 删除 401 账号并禁用额度耗尽账号') ap.add_argument('--base-url', default='http://127.0.0.1:8317') ap.add_argument('--management-key', default='') ap.add_argument('--timeout', type=int, default=int(os.environ.get('CLIPROXY_TIMEOUT', '20'))) ap.add_argument('--enable-api-call-check', action='store_true', help='开启 /api-call 全量探测;本次脚本运行只会完整探测一遍') ap.add_argument('--api-call-url', default=os.environ.get('CLIPROXY_API_CALL_URL', DEFAULT_API_CALL_URL), help='主动探测时调用的上游 URL') ap.add_argument('--api-call-method', default=os.environ.get('CLIPROXY_API_CALL_METHOD', 'GET'), help='主动探测时使用的 HTTP 方法') ap.add_argument('--api-call-account-id', default=os.environ.get('CLIPROXY_API_CALL_ACCOUNT_ID', DEFAULT_API_CALL_ACCOUNT_ID), help='主动探测时附带的 Chatgpt-Account-Id') ap.add_argument('--api-call-user-agent', default=os.environ.get('CLIPROXY_API_CALL_USER_AGENT', DEFAULT_API_CALL_USER_AGENT), help='主动探测时附带的 User-Agent') ap.add_argument('--api-call-body', default=os.environ.get('CLIPROXY_API_CALL_BODY', ''), help='主动探测时透传到 api-call 的 data 字段') ap.add_argument('--api-call-providers', default=os.environ.get('CLIPROXY_API_CALL_PROVIDERS', DEFAULT_API_CALL_PROVIDERS), help='哪些 provider 需要做 /api-call 主动探测,逗号分隔;留空表示全部') ap.add_argument('--api-call-max-per-run', type=int, default=int(os.environ.get('CLIPROXY_API_CALL_MAX_PER_RUN', str(DEFAULT_API_CALL_MAX_PER_RUN))), help='每批最多探测多少个账号,最大 9') ap.add_argument('--api-call-sleep', type=float, default=None, help='固定批次等待秒数;如不设置则使用随机等待') ap.add_argument('--api-call-sleep-min', type=float, default=float(os.environ.get('CLIPROXY_API_CALL_SLEEP_MIN', str(DEFAULT_API_CALL_SLEEP_MIN))), help='批次随机等待最小秒数') ap.add_argument('--api-call-sleep-max', type=float, default=float(os.environ.get('CLIPROXY_API_CALL_SLEEP_MAX', str(DEFAULT_API_CALL_SLEEP_MAX))), help='批次随机等待最大秒数') ap.add_argument('--dry-run', action='store_true', help='模拟运行,不实际删除或禁用') ap.add_argument('--interval', type=int, default=60, help='检测间隔时间(秒),默认60秒') ap.add_argument('--once', action='store_true', help='只执行一次,不循环') args = ap.parse_args() args.api_call_provider_set = parse_csv_set(args.api_call_providers) args.api_call_max_per_run = max(0, min(int(args.api_call_max_per_run), DEFAULT_API_CALL_MAX_PER_RUN)) if args.api_call_sleep is not None: args.api_call_sleep = max(0.0, float(args.api_call_sleep)) args.api_call_sleep_min = max(0.0, float(args.api_call_sleep_min)) args.api_call_sleep_max = max(args.api_call_sleep_min, float(args.api_call_sleep_max)) args.api_call_scan_completed = False if not args.management_key.strip(): print('❌ 缺少 management key:请先设置 CLIPROXY_MANAGEMENT_KEY', file=sys.stderr) return 2 print('\n' + '=' * 60) print('【CLIProxyAPI 清理工具】') print('=' * 60) print(' 🎯 清理目标: 删除 401 认证失败账号 + 禁用额度耗尽账号') if args.enable_api_call_check: providers_desc = args.api_call_providers if args.api_call_providers.strip() else '全部 provider' sleep_desc = '固定 %.1f 秒' % args.api_call_sleep if args.api_call_sleep is not None else '随机 %.1f-%.1f 秒' % (args.api_call_sleep_min, args.api_call_sleep_max) print(' 🔎 主动探测: 已开启 /v0/management/api-call') print(' - 上游 URL: %s' % args.api_call_url) print(' - 适用 provider: %s' % providers_desc) print(' - 单批最多: %s 个' % args.api_call_max_per_run) print(' - 批次间隔: %s' % sleep_desc) print(' - 探测策略: 本次运行只完整扫描一遍,后续轮次不再重复') else: print(' 🔎 主动探测: 未开启 /v0/management/api-call') print(' 🛡️ 保护机制: 不会删除配额耗尽、禁用、不可用账号,只会禁用额度耗尽账号') if args.dry_run: print(' 🔍 运行模式: 模拟运行(不会实际删除或禁用)') else: print(' ⚡ 运行模式: 实际运行(将删除/禁用符合条件的账号)') print('=' * 60 + '\n') if args.once: run_check(args) return 0 print('🔄 自动循环检测模式,间隔 %d 秒' % args.interval) print('💡 提示: 按 Ctrl+C 停止程序\n') loop_count = 0 try: while True: loop_count += 1 print('\n' + '🔵' * 30) print('【第 %d 次检测】%s' % (loop_count, get_current_time())) print('🔵' * 30) try: run_check(args) except Exception as e: print('❌ 检测过程中发生异常: %s' % e, flush=True) print('\n⏰ 等待 %d 秒后进行下一次检测...' % args.interval) time.sleep(args.interval) except KeyboardInterrupt: print('\n\n🛑 用户中断程序,共执行 %d 次检测' % loop_count) return 0 if __name__ == '__main__': raise SystemExit(main())

清理效果

ScreenShot_2026-03-25_120509_516481×433 124 KB

网友解答:
--【壹】--:

-enable-api-call-check 一周一次感觉就够了


--【贰】--:

还是自动的好啊。


--【叁】--:

是的。如果要刷新配额检查的话,再加上-enable-api-call-check


--【肆】--:

定时任务执行,是不是加个–once参数就行了?
image1675×352 25.5 KB


--【伍】--:

这个可以在青龙里跑吗


--【陆】--:

主动测活会导致号死的更快


--【柒】--:

来一个js版本的,没有依赖库,直接

  • node check.js --help

就行。

const fs = require('fs/promises'); const http = require('http'); const https = require('https'); const path = require('path'); const P401 = /(^|\D)401(\D|$)|unauthorized|unauthenticated|token\s+expired|login\s+required|authentication\s+failed/i; const PQUOTA = /(^|\D)(402|403|429)(\D|$)|quota|insufficient\s*quota|resource\s*exhausted|rate\s*limit|too\s+many\s+requests|payment\s+required|billing|credit|额度|用完|超限|上限|usage_limit_reached/i; const DEFAULT_API_CALL_PROVIDERS = 'codex,openai,chatgpt'; const DEFAULT_API_CALL_URL = 'https://chatgpt.com/backend-api/wham/usage'; const DEFAULT_API_CALL_USER_AGENT = 'codex_cli_rs/0.76.0 (Debian 13.0.0; x86_64) WindowsTerminal'; const DEFAULT_API_CALL_ACCOUNT_ID = '141c5c10-0993-45c2-ad18-9a01ba2ab3e0'; const DEFAULT_API_CALL_MAX_PER_RUN = 9; const DEFAULT_API_CALL_SLEEP_MIN = 5.0; const DEFAULT_API_CALL_SLEEP_MAX = 10.0; const AUTH_FILE_STATUS_METHODS = ['PATCH', 'PUT', 'POST']; function pad2(value) { return String(value).padStart(2, '0'); } function formatOffset(date) { const offsetMinutes = -date.getTimezoneOffset(); const sign = offsetMinutes >= 0 ? '+' : '-'; const absolute = Math.abs(offsetMinutes); const hours = Math.floor(absolute / 60); const minutes = absolute % 60; return `${sign}${pad2(hours)}${pad2(minutes)}`; } function getCurrentTime() { const now = new Date(); return `${now.getFullYear()}-${pad2(now.getMonth() + 1)}-${pad2(now.getDate())} ${pad2(now.getHours())}:${pad2(now.getMinutes())}:${pad2(now.getSeconds())} ${formatOffset(now)}`; } function runId() { const now = new Date(); return `${now.getUTCFullYear()}${pad2(now.getUTCMonth() + 1)}${pad2(now.getUTCDate())}T${pad2(now.getUTCHours())}${pad2(now.getUTCMinutes())}${pad2(now.getUTCSeconds())}Z`; } function stringifySafe(value) { try { return JSON.stringify(value); } catch (_error) { return String(value); } } function sleep(ms) { return new Promise((resolve) => setTimeout(resolve, ms)); } function requestRaw(urlObject, { method, headers, body, timeoutSeconds }) { return new Promise((resolve, reject) => { const transport = urlObject.protocol === 'https:' ? https : http; const req = transport.request( urlObject, { method, headers, }, (res) => { const chunks = []; res.on('data', (chunk) => chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk))); res.on('end', () => { resolve({ statusCode: res.statusCode || 0, headers: res.headers || {}, body: Buffer.concat(chunks), }); }); } ); req.setTimeout(timeoutSeconds * 1000, () => { req.destroy(new Error(`request timeout after ${timeoutSeconds}s`)); }); req.on('error', (error) => reject(error)); if (body !== null && body !== undefined) { req.write(body); } req.end(); }); } async function api(base, key, method, apiPath, timeout = 20, query = null, expectJson = true, body = null, extraHeaders = null) { const urlObject = new URL(`${String(base).replace(/\/+$/, '')}/v0/management${apiPath}`); if (query && typeof query === 'object') { for (const [name, value] of Object.entries(query)) { if (value !== undefined && value !== null) { urlObject.searchParams.append(name, String(value)); } } } const headers = { Authorization: `Bearer ${key}`, Accept: 'application/json', 'User-Agent': 'cliproxyapi-cleaner/1.0', ...(extraHeaders || {}), }; let payloadBuffer = null; if (body !== null && body !== undefined) { if (Buffer.isBuffer(body)) { payloadBuffer = body; } else if (typeof body === 'object') { payloadBuffer = Buffer.from(JSON.stringify(body), 'utf8'); if (!headers['Content-Type']) { headers['Content-Type'] = 'application/json'; } } else { payloadBuffer = Buffer.from(String(body), 'utf8'); if (!headers['Content-Type']) { headers['Content-Type'] = 'application/json'; } } headers['Content-Length'] = String(payloadBuffer.length); } let response; try { response = await requestRaw(urlObject, { method: String(method).toUpperCase(), headers, body: payloadBuffer, timeoutSeconds: timeout, }); } catch (error) { throw new Error(`请求管理 API 失败: ${error.message || error}`); } if (expectJson) { const text = response.body.length ? response.body.toString('utf8') : ''; let payload; try { payload = text ? JSON.parse(text) : {}; } catch (_error) { payload = { raw: text }; } return [response.statusCode, payload]; } return [response.statusCode, response.body]; } function extractErrorMessage(message) { try { if (typeof message === 'string' && message.trim().startsWith('{')) { const errorData = JSON.parse(message); if (Object.prototype.hasOwnProperty.call(errorData, 'error')) { const errorObject = errorData.error; if (errorObject && typeof errorObject === 'object' && !Array.isArray(errorObject)) { const errorType = errorObject.type || ''; const errorMessage = errorObject.message || ''; return [errorType, errorMessage]; } if (typeof errorObject === 'string') { return ['error', errorObject]; } } return [null, message]; } } catch (_error) { return [null, message]; } return [null, message]; } function parseCsvSet(value) { if (!value) { return new Set(); } return new Set( String(value) .split(',') .map((item) => item.trim().toLowerCase()) .filter(Boolean) ); } function simplifyReason(reason) { const text = String(reason || '').trim(); if (!text) { return ''; } if (!text.startsWith('{')) { return text.slice(0, 120); } let errorData; try { errorData = JSON.parse(text); } catch (_error) { return text.slice(0, 120); } if (!Object.prototype.hasOwnProperty.call(errorData, 'error')) { return text.slice(0, 120); } const errorObject = errorData.error; if (errorObject && typeof errorObject === 'object' && !Array.isArray(errorObject)) { const errorType = String(errorObject.type || '').trim(); const errorMessage = String(errorObject.message || '').trim(); if (errorType === 'usage_limit_reached') { return `usage_limit_reached: ${errorMessage}`.slice(0, 120); } return (errorType || errorMessage || text).slice(0, 120); } return String(errorObject).slice(0, 120); } function classify(item) { const status = String(item.status || '').trim().toLowerCase(); const message = String(item.status_message || '').trim(); const [errorType] = extractErrorMessage(message); const text = `${status}\n${message}`.toLowerCase(); if (P401.test(text)) { return ['delete_401', message || status || '401/unauthorized']; } if (errorType === 'usage_limit_reached' || text.includes('usage_limit_reached')) { return ['quota_exhausted', message || status || 'usage_limit_reached']; } if (PQUOTA.test(text)) { return ['quota_exhausted', message || status || 'quota']; } if (Boolean(item.disabled) || status === 'disabled') { return ['disabled', message || status || 'disabled']; } if (Boolean(item.unavailable) || status === 'error') { return ['unavailable', message || status || 'error']; } return ['available', message || status || 'active']; } function shouldProbeApiCall(item, args) { if (!args.enableApiCallCheck) { return false; } const authIndex = String(item.auth_index || '').trim(); if (!authIndex) { return false; } const provider = String(item.provider || item.type || '').trim().toLowerCase(); if (args.apiCallProviderSet.size > 0 && !args.apiCallProviderSet.has(provider)) { return false; } const [initialKind] = classify(item); return initialKind === 'available'; } function apiCallItemKey(item) { const authIndex = String(item.auth_index || '').trim(); if (authIndex) { return `auth_index:${authIndex}`; } return `name:${String(item.name || item.id || '').trim()}`; } function buildApiCallPayload(item, args) { const headers = { Authorization: 'Bearer $TOKEN$', 'Content-Type': 'application/json', 'User-Agent': args.apiCallUserAgent, }; if (args.apiCallAccountId.trim()) { headers['Chatgpt-Account-Id'] = args.apiCallAccountId.trim(); } const payload = { authIndex: String(item.auth_index || '').trim(), method: args.apiCallMethod.toUpperCase(), url: args.apiCallUrl.trim(), header: headers, }; if (args.apiCallBody) { payload.data = args.apiCallBody; } return payload; } function normalizeApiCallBody(body) { if (body === null || body === undefined) { return ['', null]; } if (typeof body === 'string') { const text = body; const trimmed = text.trim(); if (!trimmed) { return [text, null]; } try { return [text, JSON.parse(trimmed)]; } catch (_error) { return [text, text]; } } try { return [JSON.stringify(body), body]; } catch (_error) { return [String(body), body]; } } function isLimitReachedWindow(value) { if (!value || typeof value !== 'object' || Array.isArray(value)) { return false; } if (value.allowed === false) { return true; } if (value.limit_reached === true) { return true; } return false; } function classifyApiCallResponse(payload) { let nestedStatus = payload.status_code ?? payload.statusCode ?? 0; nestedStatus = Number.parseInt(nestedStatus, 10); if (Number.isNaN(nestedStatus)) { nestedStatus = 0; } const header = payload.header || payload.headers || {}; const [bodyText, body] = normalizeApiCallBody(payload.body); let headerText; try { headerText = JSON.stringify(header); } catch (_error) { headerText = String(header); } let bodySignal = bodyText; if (body && typeof body === 'object') { try { bodySignal = JSON.stringify(body); } catch (_error) { bodySignal = bodyText; } } if (nestedStatus === 401) { return ['delete_401', bodySignal || `api-call status_code=${nestedStatus}`]; } if (nestedStatus === 402 || nestedStatus === 403 || nestedStatus === 429) { return ['quota_exhausted', bodySignal || `api-call status_code=${nestedStatus}`]; } if (body && typeof body === 'object' && !Array.isArray(body)) { const errorObject = body.error; if (errorObject && typeof errorObject === 'object' && !Array.isArray(errorObject)) { const errorType = String(errorObject.type || '').trim().toLowerCase(); const errorMessage = String(errorObject.message || '').trim(); const errorText = `${errorType}\n${errorMessage}`.toLowerCase(); if (errorType === 'usage_limit_reached' || PQUOTA.test(errorText)) { return ['quota_exhausted', bodySignal || errorMessage || errorType]; } if (P401.test(errorText)) { return ['delete_401', bodySignal || errorMessage || errorType]; } } const rateLimit = body.rate_limit; const codeReviewRateLimit = body.code_review_rate_limit; if (isLimitReachedWindow(rateLimit) || isLimitReachedWindow(codeReviewRateLimit)) { return ['quota_exhausted', bodySignal || 'rate_limit_reached']; } if (nestedStatus === 200) { return [null, bodySignal || 'ok']; } } const fallbackText = `${nestedStatus}\n${headerText}\n${bodySignal}`.toLowerCase(); if (P401.test(fallbackText)) { return ['delete_401', bodySignal || `api-call status_code=${nestedStatus}`]; } if (nestedStatus !== 200 && PQUOTA.test(fallbackText)) { return ['quota_exhausted', bodySignal || `api-call status_code=${nestedStatus}`]; } return [null, bodySignal || (nestedStatus ? `api-call status_code=${nestedStatus}` : 'ok')]; } async function runApiCallProbe(args, item) { const requestPayload = buildApiCallPayload(item, args); const [code, payload] = await api( args.baseUrl, args.managementKey, 'POST', '/api-call', args.timeout, null, true, requestPayload ); if (code !== 200) { throw new Error(`调用 /api-call 失败: HTTP ${code} ${stringifySafe(payload)}`); } const [kind, reason] = classifyApiCallResponse(payload); return { request: requestPayload, response: payload, classification: kind, reason, status_code: payload.status_code ?? payload.statusCode, }; } function pickApiCallSleepSeconds(args) { if (args.apiCallSleep !== null && args.apiCallSleep !== undefined) { return Math.max(0, Number(args.apiCallSleep)); } return args.apiCallSleepMin + Math.random() * (args.apiCallSleepMax - args.apiCallSleepMin); } async function runApiCallFullScan(args, files, counts) { if (!args.enableApiCallCheck) { counts['api-call候选数'] = 0; counts['api-call批次数'] = 0; return {}; } if (args.apiCallScanCompleted) { counts['api-call候选数'] = 0; counts['api-call批次数'] = 0; console.log('[api-call] 当前进程已完成一次全量探测,本轮跳过'); return {}; } const eligible = files.filter((item) => shouldProbeApiCall(item, args)); counts['api-call候选数'] = eligible.length; if (eligible.length === 0) { counts['api-call批次数'] = 0; args.apiCallScanCompleted = true; console.log('[api-call] 没有需要探测的候选账号,本次进程不再执行 api-call'); return {}; } const batchSize = Math.max(1, Math.min(Number.parseInt(args.apiCallMaxPerRun, 10), DEFAULT_API_CALL_MAX_PER_RUN)); const batchCount = Math.ceil(eligible.length / batchSize); counts['api-call批次数'] = batchCount; const sleepDesc = args.apiCallSleep !== null && args.apiCallSleep !== undefined ? `固定 ${Number(args.apiCallSleep).toFixed(1)} 秒` : `随机 ${Number(args.apiCallSleepMin).toFixed(1)}-${Number(args.apiCallSleepMax).toFixed(1)} 秒`; console.log( `[api-call] 已开启全量探测,本次运行将探测 ${eligible.length} 个候选账号,共 ${batchCount} 批,每批最多 ${batchSize} 个,批次间隔 ${sleepDesc}` ); const probeResults = {}; let probedTotal = 0; for (let batchIndex = 0; batchIndex < batchCount; batchIndex += 1) { const start = batchIndex * batchSize; const batch = eligible.slice(start, start + batchSize); console.log(`[api-call批次 ${batchIndex + 1}/${batchCount}] 并发探测 ${batch.length} 个账号`); await Promise.all( batch.map(async (item) => { const name = String(item.name || item.id || '').trim(); const provider = String(item.provider || item.type || '').trim(); const authIndex = item.auth_index; try { const probe = await runApiCallProbe(args, item); counts['api-call已探测'] += 1; probedTotal += 1; const currentIndex = probedTotal; probeResults[apiCallItemKey(item)] = probe; const classification = probe.classification || 'ok'; if (classification === 'delete_401') { counts['api-call发现401'] += 1; } else if (classification === 'quota_exhausted') { counts['api-call发现配额耗尽'] += 1; } console.log( ` [api-call完成 ${currentIndex}/${eligible.length}] ${name} provider=${provider} auth_index=${authIndex} result=${classification}` ); } catch (error) { counts['api-call已探测'] += 1; probedTotal += 1; const currentIndex = probedTotal; counts['api-call探测失败'] += 1; probeResults[apiCallItemKey(item)] = { error: error.message || String(error) }; console.log( ` [api-call完成 ${currentIndex}/${eligible.length}] ${name} provider=${provider} auth_index=${authIndex} result=error error=${error.message || error}` ); } }) ); if (batchIndex + 1 < batchCount) { const sleepSeconds = pickApiCallSleepSeconds(args); if (sleepSeconds > 0) { console.log(`[api-call批次 ${batchIndex + 1}/${batchCount}] 整批完成,等待 ${sleepSeconds.toFixed(1)} 秒后继续下一批`); await sleep(sleepSeconds * 1000); } } } args.apiCallScanCompleted = true; console.log('[api-call] 本次运行已完成全部候选账号探测,后续轮次不再重复探测'); return probeResults; } async function disableAuthFile(args, name) { const payload = { name, disabled: true }; const attempts = []; for (const method of AUTH_FILE_STATUS_METHODS) { const [code, response] = await api( args.baseUrl, args.managementKey, method, '/auth-files/status', args.timeout, null, true, payload ); attempts.push({ method, code, response }); if (code >= 200 && code < 300) { return attempts[attempts.length - 1]; } if (![404, 405, 501].includes(code)) { break; } } throw new Error(`更新 auth-files/status 失败: ${stringifySafe(attempts)}`); } async function runCheck(args) { const [code, payload] = await api(args.baseUrl, args.managementKey, 'GET', '/auth-files', args.timeout); if (code !== 200) { console.error(`[错误] 获取 auth-files 失败: HTTP ${code} ${stringifySafe(payload)}`); return null; } const files = payload.files || []; if (!Array.isArray(files)) { console.error(`[错误] auth-files 返回异常: ${stringifySafe(payload)}`); return null; } const rid = runId(); const backupRoot = path.join('backups', 'cliproxyapi-auth-cleaner', rid); const reportRoot = path.join('reports', 'cliproxyapi-auth-cleaner'); await fs.mkdir(reportRoot, { recursive: true }); const counts = { '检查总数': 0, '可用账号': 0, '配额耗尽': 0, '已禁用': 0, '不可用': 0, 'api-call候选数': 0, 'api-call批次数': 0, 'api-call已探测': 0, 'api-call发现401': 0, 'api-call发现配额耗尽': 0, 'api-call探测失败': 0, '待删除401': 0, '已删除': 0, '备份失败': 0, '删除失败': 0, '额度账号已禁用': 0, '禁用失败': 0, }; const results = []; console.log(`[${getCurrentTime()}] 开始检查 ${files.length} 个账号`); const probeResults = await runApiCallFullScan(args, files, counts); for (const item of files) { counts['检查总数'] += 1; const name = String(item.name || item.id || '').trim(); const provider = String(item.provider || item.type || '').trim(); let [kind, reason] = classify(item); const row = { name, provider, auth_index: item.auth_index, status: item.status, status_message: item.status_message, disabled: item.disabled, unavailable: item.unavailable, runtime_only: item.runtime_only, source: item.source, }; const probe = probeResults[apiCallItemKey(item)]; if (probe !== undefined) { row.api_call_probe = probe; if (probe.classification === 'delete_401') { kind = 'delete_401'; reason = probe.reason || reason; } else if (probe.classification === 'quota_exhausted') { kind = 'quota_exhausted'; reason = probe.reason || reason; } } row.final_classification = kind; row.reason = reason; const displayReason = simplifyReason(reason); if (kind === 'available') { counts['可用账号'] += 1; } else if (kind === 'quota_exhausted') { counts['配额耗尽'] += 1; console.log(`[配额耗尽] ${name} provider=${provider} reason=${displayReason}`); if (args.dryRun) { row.disable_result = 'dry_run_skip'; console.log(' [模拟运行] 将调用 /auth-files/status 设置 disabled=true'); } else if (!name) { counts['禁用失败'] += 1; row.disable_result = 'skip_no_name'; row.disable_error = '缺少 name,无法调用 /auth-files/status'; console.log(' [禁用失败] 缺少 name,无法更新状态'); } else { try { const disableResponse = await disableAuthFile(args, name); counts['额度账号已禁用'] += 1; row.disable_result = 'disabled_true'; row.disable_response = disableResponse; console.log(` [已禁用] method=${disableResponse.method} HTTP ${disableResponse.code}`); } catch (error) { counts['禁用失败'] += 1; row.disable_result = 'disable_failed'; row.disable_error = error.message || String(error); console.log(` [禁用失败] ${error.message || error}`); } } } else if (kind === 'disabled') { counts['已禁用'] += 1; console.log(`[已禁用-不删除] ${name} provider=${provider}`); } else if (kind === 'unavailable') { counts['不可用'] += 1; console.log(`[不可用-不删除] ${name} provider=${provider} reason=${displayReason}`); } else if (kind === 'delete_401') { counts['待删除401'] += 1; console.log(`[待删除-401认证失败] ${name} provider=${provider} reason=${displayReason}`); if (args.dryRun) { row.delete_result = 'dry_run_skip'; console.log(' [模拟运行] 将删除此文件'); } else { const runtimeOnly = Boolean(item.runtime_only); const source = String(item.source || '').trim().toLowerCase(); if (runtimeOnly || (source && source !== 'file')) { counts['备份失败'] += 1; row.delete_result = 'skip_runtime_only'; row.delete_error = 'runtime_only/source!=file,管理 API 无法删除'; console.log(' [跳过] runtime_only 或非磁盘文件,无法通过 /auth-files 删除'); } else if (!name.toLowerCase().endsWith('.json')) { counts['备份失败'] += 1; row.delete_result = 'skip_no_json_name'; row.delete_error = '不是标准 .json 文件名,默认不删'; console.log(' [跳过] 不是 .json 文件'); } else { try { const [downloadCode, raw] = await api( args.baseUrl, args.managementKey, 'GET', '/auth-files/download', args.timeout, { name }, false ); if (downloadCode !== 200) { throw new Error(`下载 auth 文件失败: ${name} HTTP ${downloadCode}`); } await fs.mkdir(backupRoot, { recursive: true }); const backupPath = path.join(backupRoot, path.basename(name)); await fs.writeFile(backupPath, raw); row.backup_path = backupPath; const [deleteCode, deletePayload] = await api( args.baseUrl, args.managementKey, 'DELETE', '/auth-files', args.timeout, { name }, true ); if (deleteCode !== 200) { throw new Error(`删除 auth 文件失败: ${name} HTTP ${deleteCode} ${stringifySafe(deletePayload)}`); } counts['已删除'] += 1; row.delete_result = 'deleted'; row.delete_response = deletePayload; console.log(` [已删除] 备份路径: ${row.backup_path}`); } catch (error) { counts['删除失败'] += 1; row.delete_result = 'delete_failed'; row.delete_error = error.message || String(error); console.log(` [删除失败] ${error.message || error}`); } } } } results.push(row); } const report = { run_id: rid, base_url: args.baseUrl, dry_run: args.dryRun, api_call: { enabled: args.enableApiCallCheck, completed_in_this_process: Boolean(args.apiCallScanCompleted), providers: args.apiCallProviders, url: args.apiCallUrl, batch_size: args.apiCallMaxPerRun, sleep_fixed: args.apiCallSleep, sleep_min: args.apiCallSleepMin, sleep_max: args.apiCallSleepMax, }, results, summary: counts, }; const reportPath = path.join(reportRoot, `report-${rid}.json`); await fs.writeFile(reportPath, `${JSON.stringify(report, null, 2)}\n`, 'utf8'); console.log(`\n${'='.repeat(60)}`); console.log('【统计结果】'); console.log('='.repeat(60)); for (const [key, value] of Object.entries(counts)) { console.log(` ${key}: ${value}`); } console.log('\n【操作说明】'); if (args.dryRun) { console.log(' ✅ 模拟运行模式 - 没有实际删除或禁用任何账号'); if (counts['待删除401'] > 0) { console.log(` 📝 发现 ${counts['待删除401']} 个 401 认证失败账号(去掉 --dry-run 后会备份并删除)`); } if (counts['配额耗尽'] > 0) { console.log(` 📝 发现 ${counts['配额耗尽']} 个额度耗尽账号(去掉 --dry-run 后会调用 /auth-files/status 禁用)`); } } else { console.log(` ✅ 已删除 ${counts['已删除']} 个 401 认证失败账号`); console.log(` ✅ 已禁用 ${counts['额度账号已禁用']} 个额度耗尽账号`); if (counts['删除失败'] > 0) { console.log(` ⚠️ 有 ${counts['删除失败']} 个账号删除失败,请查看报告`); } if (counts['禁用失败'] > 0) { console.log(` ⚠️ 有 ${counts['禁用失败']} 个额度账号禁用失败,请查看报告`); } } console.log('\n【报告文件】'); console.log(` 📄 ${reportPath}`); console.log('='.repeat(60)); return counts; } const OPTION_DEFS = { '--base-url': { key: 'baseUrl', type: 'string', defaultValue: 'http://127.0.0.1:8317', help: '管理 API 的基础地址', }, '--management-key': { key: 'managementKey', type: 'string', defaultValue: '', help: '管理 API 的 Bearer token', }, '--timeout': { key: 'timeout', type: 'int', defaultValue: Number.parseInt(process.env.CLIPROXY_TIMEOUT || '20', 10), help: '管理 API 请求超时秒数', }, '--enable-api-call-check': { key: 'enableApiCallCheck', type: 'boolean', defaultValue: false, help: '开启 /api-call 全量探测;本次脚本运行只会完整探测一遍', }, '--api-call-url': { key: 'apiCallUrl', type: 'string', defaultValue: process.env.CLIPROXY_API_CALL_URL || DEFAULT_API_CALL_URL, help: '主动探测时调用的上游 URL', }, '--api-call-method': { key: 'apiCallMethod', type: 'string', defaultValue: process.env.CLIPROXY_API_CALL_METHOD || 'GET', help: '主动探测时使用的 HTTP 方法', }, '--api-call-account-id': { key: 'apiCallAccountId', type: 'string', defaultValue: process.env.CLIPROXY_API_CALL_ACCOUNT_ID || DEFAULT_API_CALL_ACCOUNT_ID, help: '主动探测时附带的 Chatgpt-Account-Id', }, '--api-call-user-agent': { key: 'apiCallUserAgent', type: 'string', defaultValue: process.env.CLIPROXY_API_CALL_USER_AGENT || DEFAULT_API_CALL_USER_AGENT, help: '主动探测时附带的 User-Agent', }, '--api-call-body': { key: 'apiCallBody', type: 'string', defaultValue: process.env.CLIPROXY_API_CALL_BODY || '', help: '主动探测时透传到 api-call 的 data 字段', }, '--api-call-providers': { key: 'apiCallProviders', type: 'string', defaultValue: process.env.CLIPROXY_API_CALL_PROVIDERS || DEFAULT_API_CALL_PROVIDERS, help: '哪些 provider 需要做 /api-call 主动探测,逗号分隔;留空表示全部', }, '--api-call-max-per-run': { key: 'apiCallMaxPerRun', type: 'int', defaultValue: Number.parseInt(process.env.CLIPROXY_API_CALL_MAX_PER_RUN || String(DEFAULT_API_CALL_MAX_PER_RUN), 10), help: '每批最多探测多少个账号,最大 9', }, '--api-call-sleep': { key: 'apiCallSleep', type: 'float', defaultValue: null, help: '固定批次等待秒数;如不设置则使用随机等待', }, '--api-call-sleep-min': { key: 'apiCallSleepMin', type: 'float', defaultValue: Number.parseFloat(process.env.CLIPROXY_API_CALL_SLEEP_MIN || String(DEFAULT_API_CALL_SLEEP_MIN)), help: '批次随机等待最小秒数', }, '--api-call-sleep-max': { key: 'apiCallSleepMax', type: 'float', defaultValue: Number.parseFloat(process.env.CLIPROXY_API_CALL_SLEEP_MAX || String(DEFAULT_API_CALL_SLEEP_MAX)), help: '批次随机等待最大秒数', }, '--dry-run': { key: 'dryRun', type: 'boolean', defaultValue: false, help: '模拟运行,不实际删除或禁用', }, '--interval': { key: 'interval', type: 'int', defaultValue: 60, help: '检测间隔时间(秒),默认 60 秒', }, '--once': { key: 'once', type: 'boolean', defaultValue: false, help: '只执行一次,不循环', }, }; function parseValue(rawValue, type, flag) { if (type === 'string') { return rawValue; } if (type === 'int') { const value = Number.parseInt(rawValue, 10); if (Number.isNaN(value)) { throw new Error(`${flag} 需要整数值`); } return value; } if (type === 'float') { const value = Number.parseFloat(rawValue); if (Number.isNaN(value)) { throw new Error(`${flag} 需要数字值`); } return value; } throw new Error(`未知参数类型: ${type}`); } function buildDefaultArgs() { const args = {}; for (const option of Object.values(OPTION_DEFS)) { args[option.key] = option.defaultValue; } return args; } function printUsage() { console.log('用法: node check.js [options]'); console.log(''); console.log('选项:'); console.log(' -h, --help 显示帮助'); for (const [flag, option] of Object.entries(OPTION_DEFS)) { const suffix = option.type === 'boolean' ? '' : ` <${option.type}>`; console.log(` ${flag.padEnd(30)}${suffix.padEnd(10)}${option.help}`); } } function parseCliArgs(argv) { const args = buildDefaultArgs(); for (let index = 0; index < argv.length; index += 1) { const token = argv[index]; if (token === '--help' || token === '-h') { args.help = true; continue; } if (!token.startsWith('--')) { throw new Error(`未知参数: ${token}`); } let flag = token; let inlineValue; const equalIndex = token.indexOf('='); if (equalIndex !== -1) { flag = token.slice(0, equalIndex); inlineValue = token.slice(equalIndex + 1); } const definition = OPTION_DEFS[flag]; if (!definition) { throw new Error(`未知参数: ${flag}`); } if (definition.type === 'boolean') { args[definition.key] = true; continue; } let rawValue = inlineValue; if (rawValue === undefined) { index += 1; if (index >= argv.length) { throw new Error(`${flag} 缺少参数值`); } rawValue = argv[index]; } args[definition.key] = parseValue(rawValue, definition.type, flag); } args.apiCallProviderSet = parseCsvSet(args.apiCallProviders); args.apiCallMaxPerRun = Math.max(0, Math.min(Number.parseInt(args.apiCallMaxPerRun, 10), DEFAULT_API_CALL_MAX_PER_RUN)); if (args.apiCallSleep !== null) { args.apiCallSleep = Math.max(0, Number(args.apiCallSleep)); } args.apiCallSleepMin = Math.max(0, Number(args.apiCallSleepMin)); args.apiCallSleepMax = Math.max(args.apiCallSleepMin, Number(args.apiCallSleepMax)); args.apiCallScanCompleted = false; return args; } async function main() { let args; try { args = parseCliArgs(process.argv.slice(2)); } catch (error) { console.error(`❌ ${error.message || error}`); console.error('使用 --help 查看可用参数'); return 2; } if (args.help) { printUsage(); return 0; } if (!String(args.managementKey || '').trim()) { console.error('❌ 缺少 management key:请先设置 CLIPROXY_MANAGEMENT_KEY'); return 2; } console.log(`\n${'='.repeat(60)}`); console.log('【CLIProxyAPI 清理工具】'); console.log('='.repeat(60)); console.log(' 🎯 清理目标: 删除 401 认证失败账号 + 禁用额度耗尽账号'); if (args.enableApiCallCheck) { const providersDesc = args.apiCallProviders.trim() ? args.apiCallProviders : '全部 provider'; const sleepDesc = args.apiCallSleep !== null ? `固定 ${Number(args.apiCallSleep).toFixed(1)} 秒` : `随机 ${Number(args.apiCallSleepMin).toFixed(1)}-${Number(args.apiCallSleepMax).toFixed(1)} 秒`; console.log(' 🔎 主动探测: 已开启 /v0/management/api-call'); console.log(` - 上游 URL: ${args.apiCallUrl}`); console.log(` - 适用 provider: ${providersDesc}`); console.log(` - 单批最多: ${args.apiCallMaxPerRun} 个`); console.log(` - 批次间隔: ${sleepDesc}`); console.log(' - 探测策略: 本次运行只完整扫描一遍,后续轮次不再重复'); } else { console.log(' 🔎 主动探测: 未开启 /v0/management/api-call'); } console.log(' 🛡️ 保护机制: 不会删除配额耗尽、禁用、不可用账号,只会禁用额度耗尽账号'); if (args.dryRun) { console.log(' 🔍 运行模式: 模拟运行(不会实际删除或禁用)'); } else { console.log(' ⚡ 运行模式: 实际运行(将删除/禁用符合条件的账号)'); } console.log(`${'='.repeat(60)}\n`); if (args.once) { await runCheck(args); return 0; } console.log(`🔄 自动循环检测模式,间隔 ${args.interval} 秒`); console.log('💡 提示: 按 Ctrl+C 停止程序\n'); let loopCount = 0; try { while (true) { loopCount += 1; console.log(`\n${'🔵'.repeat(30)}`); console.log(`【第 ${loopCount} 次检测】${getCurrentTime()}`); console.log('🔵'.repeat(30)); try { await runCheck(args); } catch (error) { console.log(`❌ 检测过程中发生异常: ${error.message || error}`); } console.log(`\n⏰ 等待 ${args.interval} 秒后进行下一次检测...`); await sleep(args.interval * 1000); } } catch (error) { if (error && error.name === 'AbortError') { throw error; } if (error && error.code === 'SIGINT') { console.log(`\n\n🛑 用户中断程序,共执行 ${loopCount} 次检测`); return 0; } throw error; } } process.on('SIGINT', () => { console.log('\n\n🛑 用户中断程序'); process.exit(0); }); main() .then((code) => { process.exitCode = code; }) .catch((error) => { console.error(`❌ ${error.message || error}`); process.exitCode = 1; });

5.4 vibe的产品,一次成型的,不保证准确


--【捌】--:

感谢大佬


--【玖】--:

应该没啥问题


--【拾】--:

加了参数启用,只执行一遍