分享一个 CPA 的 Codex 转 sub2api 的 py 脚本
- 内容介绍
- 文章标签
- 相关推荐
如图所示:
image402×290 13.9 KB
image986×268 11.4 KB
在 L 站 和 Github 上找了一圈都没找到想要的,干脆直接让 AI 做一个了。分享出来吧
from __future__ import annotations
import base64
import json
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
from urllib import error, request
AUTHS_DIR = Path(__file__).resolve().parent / "auths"
FILE_PATTERN = "codex-*.json" # 匹配 codex- 开头的 JSON 文件
API_URL = "http://sub2api的域名/api/v1/admin/accounts/data"
SHANGHAI_TZ = timezone(timedelta(hours=8))
HEADERS = {
"Authorization": "Bearer " , # 在这里填入 sub2api 的 JWT Token
"Content-Type": "application/json",
}
def decode_jwt_payload(token: str) -> dict[str, Any]:
parts = token.split(".")
if len(parts) != 3:
raise ValueError("JWT 格式不正确")
payload = parts[1]
padded = payload + "=" * (-len(payload) % 4)
decoded = base64.urlsafe_b64decode(padded.encode("utf-8"))
data = json.loads(decoded.decode("utf-8"))
if not isinstance(data, dict):
raise ValueError("JWT payload 不是对象")
return data
def parse_datetime(value: str) -> datetime:
normalized = value.replace("Z", "+00:00")
parsed = datetime.fromisoformat(normalized)
if parsed.tzinfo is None:
return parsed.replace(tzinfo=timezone.utc)
return parsed
def compact_dict(value: dict[str, Any]) -> dict[str, Any]:
return {key: item for key, item in value.items() if item is not None}
def build_account_payload(source: dict[str, Any]) -> dict[str, Any]:
access_token = source["access_token"]
id_token = source["id_token"]
access_claims = decode_jwt_payload(access_token)
id_claims = decode_jwt_payload(id_token)
auth_claims = access_claims.get("https://api.openai.com/auth", {})
if not isinstance(auth_claims, dict):
auth_claims = {}
id_auth_claims = id_claims.get("https://api.openai.com/auth", {})
if not isinstance(id_auth_claims, dict):
id_auth_claims = {}
organizations = id_auth_claims.get("organizations", [])
organization_id = None
if isinstance(organizations, list) and organizations:
first_organization = organizations[0]
if isinstance(first_organization, dict):
organization_id = first_organization.get("id")
expired_at = parse_datetime(source["expired"])
token_issued_at = access_claims.get("iat")
token_expires_at = access_claims.get("exp")
expires_in = None
if isinstance(token_issued_at, int) and isinstance(token_expires_at, int):
expires_in = token_expires_at - token_issued_at
email = source.get("email") or auth_claims.get("email")
credentials = compact_dict(
{
"_token_version": token_issued_at * 1000 if isinstance(token_issued_at, int) else None,
"access_token": access_token,
"chatgpt_account_id": source.get("account_id") or auth_claims.get("chatgpt_account_id"),
"chatgpt_user_id": auth_claims.get("chatgpt_user_id") or auth_claims.get("user_id"),
"email": email,
"expires_at": expired_at.astimezone(SHANGHAI_TZ).replace(microsecond=0).isoformat(),
"expires_in": expires_in,
"id_token": id_token,
"organization_id": organization_id,
"refresh_token": source.get("refresh_token"),
}
)
account = {
"name": email,
"platform": "openai",
"type": "oauth",
"credentials": credentials,
"extra": compact_dict({"email": email}),
"concurrency": 10,
"priority": 1,
"rate_multiplier": 1,
"auto_pause_on_expired": True,
}
return {
"exported_at": datetime.now(timezone.utc).replace(microsecond=0).strftime("%Y-%m-%dT%H:%M:%SZ"),
"proxies": [],
"accounts": [account],
}
def upload_payload(payload: dict[str, Any]) -> tuple[int, str]:
body = json.dumps(
{"data": payload, "skip_default_group_bind": True},
ensure_ascii=False,
).encode("utf-8")
req = request.Request(API_URL, data=body, headers=HEADERS, method="POST")
with request.urlopen(req, timeout=30) as response:
raw = response.read().decode("utf-8", errors="replace")
return response.status, raw
def process_file(file_path: Path) -> bool:
try:
source = json.loads(file_path.read_text(encoding="utf-8"))
if not isinstance(source, dict):
raise ValueError("JSON 根节点不是对象")
payload = build_account_payload(source)
status_code, response_text = upload_payload(payload)
print(f"[OK] {file_path.name} -> HTTP {status_code}")
print(response_text)
return True
except error.HTTPError as exc:
response_text = exc.read().decode("utf-8", errors="replace")
print(f"[HTTP ERROR] {file_path.name} -> HTTP {exc.code}")
print(response_text)
except error.URLError as exc:
print(f"[URL ERROR] {file_path.name} -> {exc.reason}")
except Exception as exc:
print(f"[ERROR] {file_path.name} -> {exc}")
return False
def main() -> int:
if not AUTHS_DIR.exists():
print(f"未找到目录: {AUTHS_DIR}")
return 1
files = sorted(path for path in AUTHS_DIR.glob(FILE_PATTERN) if path.is_file())
if not files:
print(f"未找到文件: {AUTHS_DIR / FILE_PATTERN}")
return 1
success_count = 0
for file_path in files:
if process_file(file_path):
success_count += 1
print(f"完成: 成功 {success_count} / 总计 {len(files)}")
return 0 if success_count == len(files) else 2
if __name__ == "__main__":
raise SystemExit(main())
网友解答:
--【壹】--:
感分享谢
--【贰】--: 還記得你說家是唯一的城堡 隨著稻香河流繼續奔跑 微微笑 小時候的夢我知道:
感谢分享(*ゝω・)
感谢分享(*ゝω・) 刚好用上了,维护两个网站有点累
--【叁】--:
是啊,具体有什么用?sub2api干嘛 的
--【肆】--:
cpa和sub2api有什么区别吗?
--【伍】--:
感谢分享(*ゝω・)
--【陆】--:
哇,感谢大佬
--【柒】--:
感谢分享
--【捌】--:
同问,我也想知道,我目前感觉好像都是可以反代中转,但是sub2api好像可以开多账户,分组这样用,适合多人企业用,CPA好像更适合个人,我是这样理解的
--【玖】--:
cpa怎么转sub2api,我没用过sub2api,佬能指点一下吗?
如图所示:
image402×290 13.9 KB
image986×268 11.4 KB
在 L 站 和 Github 上找了一圈都没找到想要的,干脆直接让 AI 做一个了。分享出来吧
from __future__ import annotations
import base64
import json
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
from urllib import error, request
AUTHS_DIR = Path(__file__).resolve().parent / "auths"
FILE_PATTERN = "codex-*.json" # 匹配 codex- 开头的 JSON 文件
API_URL = "http://sub2api的域名/api/v1/admin/accounts/data"
SHANGHAI_TZ = timezone(timedelta(hours=8))
HEADERS = {
"Authorization": "Bearer " , # 在这里填入 sub2api 的 JWT Token
"Content-Type": "application/json",
}
def decode_jwt_payload(token: str) -> dict[str, Any]:
parts = token.split(".")
if len(parts) != 3:
raise ValueError("JWT 格式不正确")
payload = parts[1]
padded = payload + "=" * (-len(payload) % 4)
decoded = base64.urlsafe_b64decode(padded.encode("utf-8"))
data = json.loads(decoded.decode("utf-8"))
if not isinstance(data, dict):
raise ValueError("JWT payload 不是对象")
return data
def parse_datetime(value: str) -> datetime:
normalized = value.replace("Z", "+00:00")
parsed = datetime.fromisoformat(normalized)
if parsed.tzinfo is None:
return parsed.replace(tzinfo=timezone.utc)
return parsed
def compact_dict(value: dict[str, Any]) -> dict[str, Any]:
return {key: item for key, item in value.items() if item is not None}
def build_account_payload(source: dict[str, Any]) -> dict[str, Any]:
access_token = source["access_token"]
id_token = source["id_token"]
access_claims = decode_jwt_payload(access_token)
id_claims = decode_jwt_payload(id_token)
auth_claims = access_claims.get("https://api.openai.com/auth", {})
if not isinstance(auth_claims, dict):
auth_claims = {}
id_auth_claims = id_claims.get("https://api.openai.com/auth", {})
if not isinstance(id_auth_claims, dict):
id_auth_claims = {}
organizations = id_auth_claims.get("organizations", [])
organization_id = None
if isinstance(organizations, list) and organizations:
first_organization = organizations[0]
if isinstance(first_organization, dict):
organization_id = first_organization.get("id")
expired_at = parse_datetime(source["expired"])
token_issued_at = access_claims.get("iat")
token_expires_at = access_claims.get("exp")
expires_in = None
if isinstance(token_issued_at, int) and isinstance(token_expires_at, int):
expires_in = token_expires_at - token_issued_at
email = source.get("email") or auth_claims.get("email")
credentials = compact_dict(
{
"_token_version": token_issued_at * 1000 if isinstance(token_issued_at, int) else None,
"access_token": access_token,
"chatgpt_account_id": source.get("account_id") or auth_claims.get("chatgpt_account_id"),
"chatgpt_user_id": auth_claims.get("chatgpt_user_id") or auth_claims.get("user_id"),
"email": email,
"expires_at": expired_at.astimezone(SHANGHAI_TZ).replace(microsecond=0).isoformat(),
"expires_in": expires_in,
"id_token": id_token,
"organization_id": organization_id,
"refresh_token": source.get("refresh_token"),
}
)
account = {
"name": email,
"platform": "openai",
"type": "oauth",
"credentials": credentials,
"extra": compact_dict({"email": email}),
"concurrency": 10,
"priority": 1,
"rate_multiplier": 1,
"auto_pause_on_expired": True,
}
return {
"exported_at": datetime.now(timezone.utc).replace(microsecond=0).strftime("%Y-%m-%dT%H:%M:%SZ"),
"proxies": [],
"accounts": [account],
}
def upload_payload(payload: dict[str, Any]) -> tuple[int, str]:
body = json.dumps(
{"data": payload, "skip_default_group_bind": True},
ensure_ascii=False,
).encode("utf-8")
req = request.Request(API_URL, data=body, headers=HEADERS, method="POST")
with request.urlopen(req, timeout=30) as response:
raw = response.read().decode("utf-8", errors="replace")
return response.status, raw
def process_file(file_path: Path) -> bool:
try:
source = json.loads(file_path.read_text(encoding="utf-8"))
if not isinstance(source, dict):
raise ValueError("JSON 根节点不是对象")
payload = build_account_payload(source)
status_code, response_text = upload_payload(payload)
print(f"[OK] {file_path.name} -> HTTP {status_code}")
print(response_text)
return True
except error.HTTPError as exc:
response_text = exc.read().decode("utf-8", errors="replace")
print(f"[HTTP ERROR] {file_path.name} -> HTTP {exc.code}")
print(response_text)
except error.URLError as exc:
print(f"[URL ERROR] {file_path.name} -> {exc.reason}")
except Exception as exc:
print(f"[ERROR] {file_path.name} -> {exc}")
return False
def main() -> int:
if not AUTHS_DIR.exists():
print(f"未找到目录: {AUTHS_DIR}")
return 1
files = sorted(path for path in AUTHS_DIR.glob(FILE_PATTERN) if path.is_file())
if not files:
print(f"未找到文件: {AUTHS_DIR / FILE_PATTERN}")
return 1
success_count = 0
for file_path in files:
if process_file(file_path):
success_count += 1
print(f"完成: 成功 {success_count} / 总计 {len(files)}")
return 0 if success_count == len(files) else 2
if __name__ == "__main__":
raise SystemExit(main())
网友解答:
--【壹】--:
感分享谢
--【贰】--: 還記得你說家是唯一的城堡 隨著稻香河流繼續奔跑 微微笑 小時候的夢我知道:
感谢分享(*ゝω・)
感谢分享(*ゝω・) 刚好用上了,维护两个网站有点累
--【叁】--:
是啊,具体有什么用?sub2api干嘛 的
--【肆】--:
cpa和sub2api有什么区别吗?
--【伍】--:
感谢分享(*ゝω・)
--【陆】--:
哇,感谢大佬
--【柒】--:
感谢分享
--【捌】--:
同问,我也想知道,我目前感觉好像都是可以反代中转,但是sub2api好像可以开多账户,分组这样用,适合多人企业用,CPA好像更适合个人,我是这样理解的
--【玖】--:
cpa怎么转sub2api,我没用过sub2api,佬能指点一下吗?

