分享一个 CPA 的 Codex 转 sub2api 的 py 脚本

2026-04-13 12:03 · 1 阅读 · 0 评论 · SEO问题
  • 内容介绍
  • 文章标签
  • 相关推荐
问题描述:

如图所示:


image402×290 13.9 KB
image986×268 11.4 KB


在 L 站 和 Github 上找了一圈都没找到想要的,干脆直接让 AI 做一个了。分享出来吧

# Convert CPA "codex-*.json" auth files into sub2api account-import payloads
# and POST each one to the sub2api admin API.
#
# Usage: put the CPA auth files in an "auths" directory next to this script,
# set API_URL and the bearer token in HEADERS below, then run the script.
# Exit code: 0 = every file uploaded, 1 = nothing to do, 2 = some files failed.
from __future__ import annotations

import base64
import json
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
from urllib import error, request

AUTHS_DIR = Path(__file__).resolve().parent / "auths"
FILE_PATTERN = "codex-*.json"  # matches JSON files whose names start with "codex-"
# Placeholder URL: replace the host part with the real sub2api domain.
API_URL = "http://sub2api的域名/api/v1/admin/accounts/data"
SHANGHAI_TZ = timezone(timedelta(hours=8))
HEADERS = {
    "Authorization": "Bearer ",  # append the sub2api JWT token here
    "Content-Type": "application/json",
}


def decode_jwt_payload(token: str) -> dict[str, Any]:
    """Return the claims object stored in the payload segment of a JWT.

    The signature is NOT verified; only the middle segment is decoded.

    Raises:
        ValueError: if the token does not have exactly three dot-separated
            segments, the payload is not valid base64url/JSON, or the decoded
            JSON is not an object.
    """
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("JWT 格式不正确")
    payload = parts[1]
    # JWT segments drop base64 padding; restore it to a multiple of four.
    padded = payload + "=" * (-len(payload) % 4)
    decoded = base64.urlsafe_b64decode(padded.encode("utf-8"))
    data = json.loads(decoded.decode("utf-8"))
    if not isinstance(data, dict):
        raise ValueError("JWT payload 不是对象")
    return data


def parse_datetime(value: str) -> datetime:
    """Parse an ISO-8601 timestamp into a timezone-aware datetime.

    "Z" is rewritten to "+00:00" before parsing, and a timestamp without any
    offset is treated as UTC.
    """
    normalized = value.replace("Z", "+00:00")
    parsed = datetime.fromisoformat(normalized)
    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=timezone.utc)
    return parsed


def compact_dict(value: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of *value* without the entries whose value is None."""
    return {key: item for key, item in value.items() if item is not None}


def build_account_payload(source: dict[str, Any]) -> dict[str, Any]:
    """Build the import payload for one account from a parsed auth file.

    Args:
        source: parsed auth-file JSON. "access_token", "id_token" and
            "expired" are required; "email", "account_id" and
            "refresh_token" are optional.

    Returns:
        A dict with "exported_at", an empty "proxies" list and a one-element
        "accounts" list describing the OpenAI OAuth account.

    Raises:
        KeyError: if a required key is missing from *source*.
        ValueError: if a token or the "expired" timestamp cannot be parsed.
    """
    access_token = source["access_token"]
    id_token = source["id_token"]
    access_claims = decode_jwt_payload(access_token)
    id_claims = decode_jwt_payload(id_token)

    # Both tokens may carry a namespaced auth claim; tolerate a missing or
    # malformed one by falling back to an empty dict.
    auth_claims = access_claims.get("https://api.openai.com/auth", {})
    if not isinstance(auth_claims, dict):
        auth_claims = {}
    id_auth_claims = id_claims.get("https://api.openai.com/auth", {})
    if not isinstance(id_auth_claims, dict):
        id_auth_claims = {}

    # Use the id of the first listed organization, if there is one.
    organizations = id_auth_claims.get("organizations", [])
    organization_id = None
    if isinstance(organizations, list) and organizations:
        first_organization = organizations[0]
        if isinstance(first_organization, dict):
            organization_id = first_organization.get("id")

    expired_at = parse_datetime(source["expired"])

    token_issued_at = access_claims.get("iat")
    token_expires_at = access_claims.get("exp")
    expires_in = None
    if isinstance(token_issued_at, int) and isinstance(token_expires_at, int):
        expires_in = token_expires_at - token_issued_at

    # "email" is a standard OIDC ID-token claim, so consult the ID token as a
    # last resort instead of leaving the account name empty.
    # NOTE(review): confirm the Codex ID token actually carries this claim.
    email = (
        source.get("email")
        or auth_claims.get("email")
        or id_claims.get("email")
    )

    credentials = compact_dict(
        {
            # "iat" is in seconds; the version field is sent in milliseconds.
            "_token_version": token_issued_at * 1000
            if isinstance(token_issued_at, int)
            else None,
            "access_token": access_token,
            "chatgpt_account_id": source.get("account_id")
            or auth_claims.get("chatgpt_account_id"),
            "chatgpt_user_id": auth_claims.get("chatgpt_user_id")
            or auth_claims.get("user_id"),
            "email": email,
            "expires_at": expired_at.astimezone(SHANGHAI_TZ)
            .replace(microsecond=0)
            .isoformat(),
            "expires_in": expires_in,
            "id_token": id_token,
            "organization_id": organization_id,
            "refresh_token": source.get("refresh_token"),
        }
    )
    account = {
        "name": email,
        "platform": "openai",
        "type": "oauth",
        "credentials": credentials,
        "extra": compact_dict({"email": email}),
        "concurrency": 10,
        "priority": 1,
        "rate_multiplier": 1,
        "auto_pause_on_expired": True,
    }
    return {
        "exported_at": datetime.now(timezone.utc)
        .replace(microsecond=0)
        .strftime("%Y-%m-%dT%H:%M:%SZ"),
        "proxies": [],
        "accounts": [account],
    }


def upload_payload(payload: dict[str, Any]) -> tuple[int, str]:
    """POST *payload* to API_URL and return (HTTP status, response text).

    Raises:
        urllib.error.HTTPError: for a non-success HTTP response.
        urllib.error.URLError: if the server cannot be reached.
    """
    body = json.dumps(
        {"data": payload, "skip_default_group_bind": True},
        ensure_ascii=False,
    ).encode("utf-8")
    req = request.Request(API_URL, data=body, headers=HEADERS, method="POST")
    with request.urlopen(req, timeout=30) as response:
        raw = response.read().decode("utf-8", errors="replace")
        return response.status, raw


def process_file(file_path: Path) -> bool:
    """Convert and upload one auth file, printing the outcome.

    Returns True on a successful upload. Every failure is reported on stdout
    and turned into False so the remaining files are still processed.
    """
    try:
        source = json.loads(file_path.read_text(encoding="utf-8"))
        if not isinstance(source, dict):
            raise ValueError("JSON 根节点不是对象")
        payload = build_account_payload(source)
        status_code, response_text = upload_payload(payload)
        print(f"[OK] {file_path.name} -> HTTP {status_code}")
        print(response_text)
        return True
    except error.HTTPError as exc:
        # HTTPError subclasses URLError, so it has to be handled first.
        response_text = exc.read().decode("utf-8", errors="replace")
        print(f"[HTTP ERROR] {file_path.name} -> HTTP {exc.code}")
        print(response_text)
    except error.URLError as exc:
        print(f"[URL ERROR] {file_path.name} -> {exc.reason}")
    except Exception as exc:
        # Per-file boundary: one bad file must not abort the whole batch.
        print(f"[ERROR] {file_path.name} -> {exc}")
    return False


def main() -> int:
    """Upload every matching auth file and return the process exit code.

    Returns 1 when the directory or matching files are missing, 0 when every
    file was uploaded, and 2 when at least one file failed.
    """
    if not AUTHS_DIR.exists():
        print(f"未找到目录: {AUTHS_DIR}")
        return 1
    files = sorted(path for path in AUTHS_DIR.glob(FILE_PATTERN) if path.is_file())
    if not files:
        print(f"未找到文件: {AUTHS_DIR / FILE_PATTERN}")
        return 1
    success_count = 0
    for file_path in files:
        if process_file(file_path):
            success_count += 1
    print(f"完成: 成功 {success_count} / 总计 {len(files)}")
    return 0 if success_count == len(files) else 2


if __name__ == "__main__":
    raise SystemExit(main())

# 网友解答:  (forum heading "Replies" that shared this line with the script; kept as a comment)


--【壹】--:

感谢分享


--【贰】--: 還記得你說家是唯一的城堡 隨著稻香河流繼續奔跑 微微笑 小時候的夢我知道:

感谢分享(*ゝω・)

感谢分享(*ゝω・) 刚好用上了,维护两个网站有点累


--【叁】--:

是啊,具体有什么用?sub2api 是干嘛的?


--【肆】--:

cpa和sub2api有什么区别吗?


--【伍】--:

感谢分享(*ゝω・)


--【陆】--:

哇,感谢大佬


--【柒】--:

感谢分享


--【捌】--:

同问,我也想知道,我目前感觉好像都是可以反代中转,但是sub2api好像可以开多账户,分组这样用,适合多人企业用,CPA好像更适合个人,我是这样理解的


--【玖】--:

cpa怎么转sub2api,我没用过sub2api,佬能指点一下吗?

标签:人工智能
问题描述:

如图所示:


image402×290 13.9 KB
image986×268 11.4 KB


在 L 站 和 Github 上找了一圈都没找到想要的,干脆直接让 AI 做一个了。分享出来吧

# Convert CPA "codex-*.json" auth files into sub2api account-import payloads
# and POST each one to the sub2api admin API.
#
# Usage: put the CPA auth files in an "auths" directory next to this script,
# set API_URL and the bearer token in HEADERS below, then run the script.
# Exit code: 0 = every file uploaded, 1 = nothing to do, 2 = some files failed.
from __future__ import annotations

import base64
import json
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
from urllib import error, request

AUTHS_DIR = Path(__file__).resolve().parent / "auths"
FILE_PATTERN = "codex-*.json"  # matches JSON files whose names start with "codex-"
# Placeholder URL: replace the host part with the real sub2api domain.
API_URL = "http://sub2api的域名/api/v1/admin/accounts/data"
SHANGHAI_TZ = timezone(timedelta(hours=8))
HEADERS = {
    "Authorization": "Bearer ",  # append the sub2api JWT token here
    "Content-Type": "application/json",
}


def decode_jwt_payload(token: str) -> dict[str, Any]:
    """Return the claims object stored in the payload segment of a JWT.

    The signature is NOT verified; only the middle segment is decoded.

    Raises:
        ValueError: if the token does not have exactly three dot-separated
            segments, the payload is not valid base64url/JSON, or the decoded
            JSON is not an object.
    """
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("JWT 格式不正确")
    payload = parts[1]
    # JWT segments drop base64 padding; restore it to a multiple of four.
    padded = payload + "=" * (-len(payload) % 4)
    decoded = base64.urlsafe_b64decode(padded.encode("utf-8"))
    data = json.loads(decoded.decode("utf-8"))
    if not isinstance(data, dict):
        raise ValueError("JWT payload 不是对象")
    return data


def parse_datetime(value: str) -> datetime:
    """Parse an ISO-8601 timestamp into a timezone-aware datetime.

    "Z" is rewritten to "+00:00" before parsing, and a timestamp without any
    offset is treated as UTC.
    """
    normalized = value.replace("Z", "+00:00")
    parsed = datetime.fromisoformat(normalized)
    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=timezone.utc)
    return parsed


def compact_dict(value: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of *value* without the entries whose value is None."""
    return {key: item for key, item in value.items() if item is not None}


def build_account_payload(source: dict[str, Any]) -> dict[str, Any]:
    """Build the import payload for one account from a parsed auth file.

    Args:
        source: parsed auth-file JSON. "access_token", "id_token" and
            "expired" are required; "email", "account_id" and
            "refresh_token" are optional.

    Returns:
        A dict with "exported_at", an empty "proxies" list and a one-element
        "accounts" list describing the OpenAI OAuth account.

    Raises:
        KeyError: if a required key is missing from *source*.
        ValueError: if a token or the "expired" timestamp cannot be parsed.
    """
    access_token = source["access_token"]
    id_token = source["id_token"]
    access_claims = decode_jwt_payload(access_token)
    id_claims = decode_jwt_payload(id_token)

    # Both tokens may carry a namespaced auth claim; tolerate a missing or
    # malformed one by falling back to an empty dict.
    auth_claims = access_claims.get("https://api.openai.com/auth", {})
    if not isinstance(auth_claims, dict):
        auth_claims = {}
    id_auth_claims = id_claims.get("https://api.openai.com/auth", {})
    if not isinstance(id_auth_claims, dict):
        id_auth_claims = {}

    # Use the id of the first listed organization, if there is one.
    organizations = id_auth_claims.get("organizations", [])
    organization_id = None
    if isinstance(organizations, list) and organizations:
        first_organization = organizations[0]
        if isinstance(first_organization, dict):
            organization_id = first_organization.get("id")

    expired_at = parse_datetime(source["expired"])

    token_issued_at = access_claims.get("iat")
    token_expires_at = access_claims.get("exp")
    expires_in = None
    if isinstance(token_issued_at, int) and isinstance(token_expires_at, int):
        expires_in = token_expires_at - token_issued_at

    # "email" is a standard OIDC ID-token claim, so consult the ID token as a
    # last resort instead of leaving the account name empty.
    # NOTE(review): confirm the Codex ID token actually carries this claim.
    email = (
        source.get("email")
        or auth_claims.get("email")
        or id_claims.get("email")
    )

    credentials = compact_dict(
        {
            # "iat" is in seconds; the version field is sent in milliseconds.
            "_token_version": token_issued_at * 1000
            if isinstance(token_issued_at, int)
            else None,
            "access_token": access_token,
            "chatgpt_account_id": source.get("account_id")
            or auth_claims.get("chatgpt_account_id"),
            "chatgpt_user_id": auth_claims.get("chatgpt_user_id")
            or auth_claims.get("user_id"),
            "email": email,
            "expires_at": expired_at.astimezone(SHANGHAI_TZ)
            .replace(microsecond=0)
            .isoformat(),
            "expires_in": expires_in,
            "id_token": id_token,
            "organization_id": organization_id,
            "refresh_token": source.get("refresh_token"),
        }
    )
    account = {
        "name": email,
        "platform": "openai",
        "type": "oauth",
        "credentials": credentials,
        "extra": compact_dict({"email": email}),
        "concurrency": 10,
        "priority": 1,
        "rate_multiplier": 1,
        "auto_pause_on_expired": True,
    }
    return {
        "exported_at": datetime.now(timezone.utc)
        .replace(microsecond=0)
        .strftime("%Y-%m-%dT%H:%M:%SZ"),
        "proxies": [],
        "accounts": [account],
    }


def upload_payload(payload: dict[str, Any]) -> tuple[int, str]:
    """POST *payload* to API_URL and return (HTTP status, response text).

    Raises:
        urllib.error.HTTPError: for a non-success HTTP response.
        urllib.error.URLError: if the server cannot be reached.
    """
    body = json.dumps(
        {"data": payload, "skip_default_group_bind": True},
        ensure_ascii=False,
    ).encode("utf-8")
    req = request.Request(API_URL, data=body, headers=HEADERS, method="POST")
    with request.urlopen(req, timeout=30) as response:
        raw = response.read().decode("utf-8", errors="replace")
        return response.status, raw


def process_file(file_path: Path) -> bool:
    """Convert and upload one auth file, printing the outcome.

    Returns True on a successful upload. Every failure is reported on stdout
    and turned into False so the remaining files are still processed.
    """
    try:
        source = json.loads(file_path.read_text(encoding="utf-8"))
        if not isinstance(source, dict):
            raise ValueError("JSON 根节点不是对象")
        payload = build_account_payload(source)
        status_code, response_text = upload_payload(payload)
        print(f"[OK] {file_path.name} -> HTTP {status_code}")
        print(response_text)
        return True
    except error.HTTPError as exc:
        # HTTPError subclasses URLError, so it has to be handled first.
        response_text = exc.read().decode("utf-8", errors="replace")
        print(f"[HTTP ERROR] {file_path.name} -> HTTP {exc.code}")
        print(response_text)
    except error.URLError as exc:
        print(f"[URL ERROR] {file_path.name} -> {exc.reason}")
    except Exception as exc:
        # Per-file boundary: one bad file must not abort the whole batch.
        print(f"[ERROR] {file_path.name} -> {exc}")
    return False


def main() -> int:
    """Upload every matching auth file and return the process exit code.

    Returns 1 when the directory or matching files are missing, 0 when every
    file was uploaded, and 2 when at least one file failed.
    """
    if not AUTHS_DIR.exists():
        print(f"未找到目录: {AUTHS_DIR}")
        return 1
    files = sorted(path for path in AUTHS_DIR.glob(FILE_PATTERN) if path.is_file())
    if not files:
        print(f"未找到文件: {AUTHS_DIR / FILE_PATTERN}")
        return 1
    success_count = 0
    for file_path in files:
        if process_file(file_path):
            success_count += 1
    print(f"完成: 成功 {success_count} / 总计 {len(files)}")
    return 0 if success_count == len(files) else 2


if __name__ == "__main__":
    raise SystemExit(main())

# 网友解答:  (forum heading "Replies" that shared this line with the script; kept as a comment)


--【壹】--:

感谢分享


--【贰】--: 還記得你說家是唯一的城堡 隨著稻香河流繼續奔跑 微微笑 小時候的夢我知道:

感谢分享(*ゝω・)

感谢分享(*ゝω・) 刚好用上了,维护两个网站有点累


--【叁】--:

是啊,具体有什么用?sub2api 是干嘛的?


--【肆】--:

cpa和sub2api有什么区别吗?


--【伍】--:

感谢分享(*ゝω・)


--【陆】--:

哇,感谢大佬


--【柒】--:

感谢分享


--【捌】--:

同问,我也想知道,我目前感觉好像都是可以反代中转,但是sub2api好像可以开多账户,分组这样用,适合多人企业用,CPA好像更适合个人,我是这样理解的


--【玖】--:

cpa怎么转sub2api,我没用过sub2api,佬能指点一下吗?

标签:人工智能