grok oss批量测活py

2026-04-11 08:221阅读0评论SEO教程
  • 内容介绍
  • 文章标签
  • 相关推荐
问题描述:

用的grok2api那个项目改的,scripts/check_alive.py路径

python3 check_alive.py

import asyncio import sys import time import uuid from pathlib import Path import orjson from curl_cffi.requests import AsyncSession INPUT_FILE = "tokens.txt" OUTPUT_OK = "result_ok.txt" OUTPUT_4XX = "result_4xx.txt" OUTPUT_5XX = "result_5xx.txt" CONCURRENCY = 50 PROXY_URL = "" BROWSER = "chrome136" USER_AGENT = ( "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/136.0.0.0 Safari/537.36" ) TIMEOUT = 30 RATE_LIMITS_API = "https://grok.com/rest/rate-limits" MODEL_NAME = "grok-4-1-thinking-1129" def _build_headers(token: str) -> dict: raw = token[4:] if token.startswith("sso=") else token cookie = f"sso={raw}; sso-rw={raw}" return { "Accept": "*/*", "Accept-Encoding": "gzip, deflate, br, zstd", "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", "Content-Type": "application/json", "Cookie": cookie, "Origin": "https://grok.com", "Referer": "https://grok.com/", "Sec-Ch-Ua": '"Google Chrome";v="136", "Chromium";v="136", "Not(A:Brand";v="24"', "Sec-Ch-Ua-Mobile": "?0", "Sec-Ch-Ua-Platform": '"macOS"', "Sec-Fetch-Dest": "empty", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": "same-origin", "User-Agent": USER_AGENT, "x-statsig-id": f"v3_{uuid.uuid4().hex}", "x-xai-request-id": str(uuid.uuid4()), } def _classify(code: int) -> str: if code == 200: return "ok" if 400 <= code < 500: return "4xx" return "5xx" async def check_one(token: str, session: AsyncSession, semaphore: asyncio.Semaphore): raw = token[4:] if token.startswith("sso=") else token async with semaphore: try: proxies = {"http": PROXY_URL, "https": PROXY_URL} if PROXY_URL else None resp = await session.post( RATE_LIMITS_API, headers=_build_headers(raw), data=orjson.dumps({"requestKind": "DEFAULT", "modelName": MODEL_NAME}), timeout=TIMEOUT, proxies=proxies, impersonate=BROWSER, ) code = resp.status_code try: body = resp.text[:300] except Exception: body = "N/A" return raw, code, _classify(code), body except asyncio.TimeoutError: return raw, 0, "5xx", "timeout" except Exception as e: return raw, 0, "5xx", str(e)[:300] def _load_tokens(path: str) -> list[str]: p = Path(path) if not p.exists(): print(f"[错误] 输入文件不存在: {path}") sys.exit(1) lines = p.read_text(encoding="utf-8").splitlines() tokens, seen = [], set() for line in lines: t = line.strip() if not t: continue raw = t[4:] if t.startswith("sso=") else t if raw in seen: continue seen.add(raw) tokens.append(raw) return tokens def _write_results(path: str, entries: list): with open(path, "w", encoding="utf-8") as f: for raw, code, body in entries: if body and code != 200: f.write(f"{raw}\t{code}\t{body}\n") else: f.write(f"{raw}\n") async def main(): tokens = _load_tokens(INPUT_FILE) total = len(tokens) print(f"[信息] 读取 token: {total} 条(已去重)") print(f"[信息] 并发数: {CONCURRENCY} 代理: {PROXY_URL or '无'}") print(f"[信息] 模型: {MODEL_NAME}") print(f"[信息] 输出: {OUTPUT_OK} / {OUTPUT_4XX} / {OUTPUT_5XX}") print("-" * 70) semaphore = asyncio.Semaphore(CONCURRENCY) ok_list, xx4_list, xx5_list = [], [], [] done = 0 start_time = time.monotonic() lock = asyncio.Lock() async def _task(token: str, session: AsyncSession): nonlocal done raw, code, category, body = await check_one(token, session, semaphore) entry = (raw, code, body) async with lock: if category == "ok": ok_list.append(entry) tag = "[正常]" elif category == "4xx": xx4_list.append(entry) tag = "[4xx] " else: xx5_list.append(entry) tag = "[5xx] " done += 1 elapsed = time.monotonic() - start_time speed = done / elapsed if elapsed > 0 else 0 eta = (total - done) / speed if speed > 0 else 0 print( f"{tag} {done:>5}/{total} code={code} " f"ok={len(ok_list)} 4xx={len(xx4_list)} 5xx={len(xx5_list)} " f"{speed:.1f}条/s 剩余≈{eta:.0f}s" ) print(f" token={raw[:20]}...") print(f" 响应 : {body[:200]}") async with AsyncSession(impersonate=BROWSER) as session: tasks = [_task(t, session) for t in tokens] await asyncio.gather(*tasks) _write_results(OUTPUT_OK, ok_list) _write_results(OUTPUT_4XX, xx4_list) _write_results(OUTPUT_5XX, xx5_list) elapsed = time.monotonic() - start_time print("\n" + "=" * 70) print(f" 总计: {total} 耗时: {elapsed:.1f}s 平均: {total/elapsed:.1f}条/s") print(f" [正常] {len(ok_list)} 条 → {OUTPUT_OK}") print(f" [4xx] {len(xx4_list)} 条 → {OUTPUT_4XX}") print(f" [5xx] {len(xx5_list)} 条 → {OUTPUT_5XX}") print("=" * 70) if __name__ == "__main__": asyncio.run(main())

简单测活和额度,可能有误测的情况可以换ip重复测一下,我自己几十万的无所谓哈哈哈
image1264×991 68.9 KB

网友解答:
--【壹】--:

真的是节点的问题啊,真不错啊,而且这个号也不容易被封啊,3400多号检出来300个左右!


--【贰】--:

谢谢佬友分享,待会试一下


--【叁】--:

换个好IP cf的盾不一直都有的么?


--【肆】--:

====================================================================== 总计: 5000 耗时: 233.4s 平均: 21.4条/s [正常] 1543 条 → result_ok.txt [4xx] 3456 条 → result_4xx.txt [5xx] 1 条 → result_5xx.txt ======================================================================

惨啊


--【伍】--:

image1117×585 23.8 KB
是不是已经不行了呀 现在加了盾了


--【陆】--:


4w存活3000


--【柒】--:

我的是干公益站,几B的token使用量没办法

问题描述:

用的grok2api那个项目改的,scripts/check_alive.py路径

python3 check_alive.py

import asyncio import sys import time import uuid from pathlib import Path import orjson from curl_cffi.requests import AsyncSession INPUT_FILE = "tokens.txt" OUTPUT_OK = "result_ok.txt" OUTPUT_4XX = "result_4xx.txt" OUTPUT_5XX = "result_5xx.txt" CONCURRENCY = 50 PROXY_URL = "" BROWSER = "chrome136" USER_AGENT = ( "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/136.0.0.0 Safari/537.36" ) TIMEOUT = 30 RATE_LIMITS_API = "https://grok.com/rest/rate-limits" MODEL_NAME = "grok-4-1-thinking-1129" def _build_headers(token: str) -> dict: raw = token[4:] if token.startswith("sso=") else token cookie = f"sso={raw}; sso-rw={raw}" return { "Accept": "*/*", "Accept-Encoding": "gzip, deflate, br, zstd", "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", "Content-Type": "application/json", "Cookie": cookie, "Origin": "https://grok.com", "Referer": "https://grok.com/", "Sec-Ch-Ua": '"Google Chrome";v="136", "Chromium";v="136", "Not(A:Brand";v="24"', "Sec-Ch-Ua-Mobile": "?0", "Sec-Ch-Ua-Platform": '"macOS"', "Sec-Fetch-Dest": "empty", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": "same-origin", "User-Agent": USER_AGENT, "x-statsig-id": f"v3_{uuid.uuid4().hex}", "x-xai-request-id": str(uuid.uuid4()), } def _classify(code: int) -> str: if code == 200: return "ok" if 400 <= code < 500: return "4xx" return "5xx" async def check_one(token: str, session: AsyncSession, semaphore: asyncio.Semaphore): raw = token[4:] if token.startswith("sso=") else token async with semaphore: try: proxies = {"http": PROXY_URL, "https": PROXY_URL} if PROXY_URL else None resp = await session.post( RATE_LIMITS_API, headers=_build_headers(raw), data=orjson.dumps({"requestKind": "DEFAULT", "modelName": MODEL_NAME}), timeout=TIMEOUT, proxies=proxies, impersonate=BROWSER, ) code = resp.status_code try: body = resp.text[:300] except Exception: body = "N/A" return raw, code, _classify(code), body except asyncio.TimeoutError: return raw, 0, "5xx", "timeout" except Exception as e: return raw, 0, "5xx", str(e)[:300] def _load_tokens(path: str) -> list[str]: p = Path(path) if not p.exists(): print(f"[错误] 输入文件不存在: {path}") sys.exit(1) lines = p.read_text(encoding="utf-8").splitlines() tokens, seen = [], set() for line in lines: t = line.strip() if not t: continue raw = t[4:] if t.startswith("sso=") else t if raw in seen: continue seen.add(raw) tokens.append(raw) return tokens def _write_results(path: str, entries: list): with open(path, "w", encoding="utf-8") as f: for raw, code, body in entries: if body and code != 200: f.write(f"{raw}\t{code}\t{body}\n") else: f.write(f"{raw}\n") async def main(): tokens = _load_tokens(INPUT_FILE) total = len(tokens) print(f"[信息] 读取 token: {total} 条(已去重)") print(f"[信息] 并发数: {CONCURRENCY} 代理: {PROXY_URL or '无'}") print(f"[信息] 模型: {MODEL_NAME}") print(f"[信息] 输出: {OUTPUT_OK} / {OUTPUT_4XX} / {OUTPUT_5XX}") print("-" * 70) semaphore = asyncio.Semaphore(CONCURRENCY) ok_list, xx4_list, xx5_list = [], [], [] done = 0 start_time = time.monotonic() lock = asyncio.Lock() async def _task(token: str, session: AsyncSession): nonlocal done raw, code, category, body = await check_one(token, session, semaphore) entry = (raw, code, body) async with lock: if category == "ok": ok_list.append(entry) tag = "[正常]" elif category == "4xx": xx4_list.append(entry) tag = "[4xx] " else: xx5_list.append(entry) tag = "[5xx] " done += 1 elapsed = time.monotonic() - start_time speed = done / elapsed if elapsed > 0 else 0 eta = (total - done) / speed if speed > 0 else 0 print( f"{tag} {done:>5}/{total} code={code} " f"ok={len(ok_list)} 4xx={len(xx4_list)} 5xx={len(xx5_list)} " f"{speed:.1f}条/s 剩余≈{eta:.0f}s" ) print(f" token={raw[:20]}...") print(f" 响应 : {body[:200]}") async with AsyncSession(impersonate=BROWSER) as session: tasks = [_task(t, session) for t in tokens] await asyncio.gather(*tasks) _write_results(OUTPUT_OK, ok_list) _write_results(OUTPUT_4XX, xx4_list) _write_results(OUTPUT_5XX, xx5_list) elapsed = time.monotonic() - start_time print("\n" + "=" * 70) print(f" 总计: {total} 耗时: {elapsed:.1f}s 平均: {total/elapsed:.1f}条/s") print(f" [正常] {len(ok_list)} 条 → {OUTPUT_OK}") print(f" [4xx] {len(xx4_list)} 条 → {OUTPUT_4XX}") print(f" [5xx] {len(xx5_list)} 条 → {OUTPUT_5XX}") print("=" * 70) if __name__ == "__main__": asyncio.run(main())

简单测活和额度,可能有误测的情况可以换ip重复测一下,我自己几十万的无所谓哈哈哈
image1264×991 68.9 KB

网友解答:
--【壹】--:

真的是节点的问题啊,真不错啊,而且这个号也不容易被封啊,3400多号检出来300个左右!


--【贰】--:

谢谢佬友分享,待会试一下


--【叁】--:

换个好IP cf的盾不一直都有的么?


--【肆】--:

====================================================================== 总计: 5000 耗时: 233.4s 平均: 21.4条/s [正常] 1543 条 → result_ok.txt [4xx] 3456 条 → result_4xx.txt [5xx] 1 条 → result_5xx.txt ======================================================================

惨啊


--【伍】--:

image1117×585 23.8 KB
是不是已经不行了呀 现在加了盾了


--【陆】--:


4w存活3000


--【柒】--:

我的是干公益站,几B的token使用量没办法