grok sso 批量测活 py
- 内容介绍
- 文章标签
- 相关推荐
基于 grok2api 项目修改,脚本路径为 scripts/check_alive.py,运行方式:
python3 check_alive.py
import asyncio
import sys
import time
import uuid
from pathlib import Path
import orjson
from curl_cffi.requests import AsyncSession
# Input file read by _load_tokens: one token per line; a leading "sso=" is
# stripped and duplicates are dropped.
INPUT_FILE = "tokens.txt"
# Output files written by main(), one per bucket returned by _classify.
OUTPUT_OK = "result_ok.txt"
OUTPUT_4XX = "result_4xx.txt"
OUTPUT_5XX = "result_5xx.txt"
# Size of the asyncio.Semaphore that caps in-flight requests.
CONCURRENCY = 50
# Proxy used for both http and https; an empty string means no proxy is passed.
PROXY_URL = ""
# Value passed as `impersonate=` to the curl_cffi session and each request.
BROWSER = "chrome136"
# Sent as the User-Agent header by _build_headers.
USER_AGENT = (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/136.0.0.0 Safari/537.36"
)
# Passed as `timeout=` on every request (presumably seconds — confirm against
# the curl_cffi documentation).
TIMEOUT = 30
# Endpoint that check_one POSTs to.
RATE_LIMITS_API = "https://grok.com/rest/rate-limits"
# Sent as "modelName" in the JSON request body.
MODEL_NAME = "grok-4-1-thinking-1129"
def _build_headers(token: str) -> dict:
    """Build the HTTP headers for one rate-limit request.

    Args:
        token: SSO token, with or without a leading ``sso=`` prefix.

    Returns:
        A new header dict. The token is sent as both the ``sso`` and the
        ``sso-rw`` cookie, and ``x-statsig-id`` / ``x-xai-request-id`` are
        freshly generated random values on every call.
    """
    # Accept tokens copied together with their cookie name.
    raw = token.removeprefix("sso=")
    cookie = f"sso={raw}; sso-rw={raw}"
    return {
        "Accept": "*/*",
        "Accept-Encoding": "gzip, deflate, br, zstd",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
        "Content-Type": "application/json",
        "Cookie": cookie,
        "Origin": "https://grok.com",
        "Referer": "https://grok.com/",
        "Sec-Ch-Ua": '"Google Chrome";v="136", "Chromium";v="136", "Not(A:Brand";v="24"',
        "Sec-Ch-Ua-Mobile": "?0",
        "Sec-Ch-Ua-Platform": '"macOS"',
        "Sec-Fetch-Dest": "empty",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Site": "same-origin",
        "User-Agent": USER_AGENT,
        "x-statsig-id": f"v3_{uuid.uuid4().hex}",
        "x-xai-request-id": str(uuid.uuid4()),
    }
def _classify(code: int) -> str:
if code == 200:
return "ok"
if 400 <= code < 500:
return "4xx"
return "5xx"
async def check_one(
    token: str, session: AsyncSession, semaphore: asyncio.Semaphore
) -> tuple[str, int, str, str]:
    """Query the rate-limit endpoint once for a single token.

    Args:
        token: SSO token, with or without a leading ``sso=`` prefix.
        session: Shared curl_cffi session used to send the request.
        semaphore: Held for the duration of the request to cap concurrency.

    Returns:
        ``(raw_token, status_code, category, body)``. ``category`` comes from
        ``_classify``; ``body`` is at most the first 300 characters of the
        response text. If the request raises, the status code is 0, the
        category is ``"5xx"`` and ``body`` holds ``"timeout"`` or the
        truncated exception message. This function never raises for a failed
        request.
    """
    raw = token.removeprefix("sso=")
    async with semaphore:
        try:
            proxies = {"http": PROXY_URL, "https": PROXY_URL} if PROXY_URL else None
            resp = await session.post(
                RATE_LIMITS_API,
                headers=_build_headers(raw),
                data=orjson.dumps({"requestKind": "DEFAULT", "modelName": MODEL_NAME}),
                timeout=TIMEOUT,
                proxies=proxies,
                impersonate=BROWSER,
            )
            code = resp.status_code
            try:
                body = resp.text[:300]
            except Exception:
                # Best effort: an undecodable body must not discard the status code.
                body = "N/A"
            return raw, code, _classify(code), body
        except asyncio.TimeoutError:
            return raw, 0, "5xx", "timeout"
        except Exception as e:
            # NOTE(review): the HTTP client presumably raises its own timeout
            # type, which would land here rather than above — confirm.
            return raw, 0, "5xx", str(e)[:300]
def _load_tokens(path: str) -> list[str]:
p = Path(path)
if not p.exists():
print(f"[错误] 输入文件不存在: {path}")
sys.exit(1)
lines = p.read_text(encoding="utf-8").splitlines()
tokens, seen = [], set()
for line in lines:
t = line.strip()
if not t:
continue
raw = t[4:] if t.startswith("sso=") else t
if raw in seen:
continue
seen.add(raw)
tokens.append(raw)
return tokens
def _write_results(path: str, entries: list):
with open(path, "w", encoding="utf-8") as f:
for raw, code, body in entries:
if body and code != 200:
f.write(f"{raw}\t{code}\t{body}\n")
else:
f.write(f"{raw}\n")
async def main():
    """Check every token from INPUT_FILE and write the per-bucket result files.

    Loads the tokens, runs ``check_one`` for each of them over one shared
    session (concurrency is capped by the semaphore), prints a progress line
    per finished token, then writes the three output files and a summary.
    """
    tokens = _load_tokens(INPUT_FILE)
    total = len(tokens)
    print(f"[信息] 读取 token: {total} 条(已去重)")
    print(f"[信息] 并发数: {CONCURRENCY} 代理: {PROXY_URL or '无'}")
    print(f"[信息] 模型: {MODEL_NAME}")
    print(f"[信息] 输出: {OUTPUT_OK} / {OUTPUT_4XX} / {OUTPUT_5XX}")
    print("-" * 70)
    semaphore = asyncio.Semaphore(CONCURRENCY)
    ok_list, xx4_list, xx5_list = [], [], []
    done = 0
    start_time = time.monotonic()
    lock = asyncio.Lock()

    async def _task(token: str, session: AsyncSession):
        """Check one token, file it into its bucket and print progress."""
        nonlocal done
        raw, code, category, body = await check_one(token, session, semaphore)
        entry = (raw, code, body)
        # Bookkeeping and the progress printout are kept together under the lock.
        async with lock:
            if category == "ok":
                ok_list.append(entry)
                tag = "[正常]"
            elif category == "4xx":
                xx4_list.append(entry)
                tag = "[4xx] "
            else:
                xx5_list.append(entry)
                tag = "[5xx] "
            done += 1
            elapsed = time.monotonic() - start_time
            speed = done / elapsed if elapsed > 0 else 0
            eta = (total - done) / speed if speed > 0 else 0
            print(
                f"{tag} {done:>5}/{total} code={code} "
                f"ok={len(ok_list)} 4xx={len(xx4_list)} 5xx={len(xx5_list)} "
                f"{speed:.1f}条/s 剩余≈{eta:.0f}s"
            )
            print(f" token={raw[:20]}...")
            print(f" 响应 : {body[:200]}")

    async with AsyncSession(impersonate=BROWSER) as session:
        tasks = [_task(t, session) for t in tokens]
        await asyncio.gather(*tasks)
    _write_results(OUTPUT_OK, ok_list)
    _write_results(OUTPUT_4XX, xx4_list)
    _write_results(OUTPUT_5XX, xx5_list)
    elapsed = time.monotonic() - start_time
    # With no tokens and a coarse clock, elapsed can be exactly 0; avoid
    # dividing by it in the summary.
    average = total / elapsed if elapsed > 0 else 0.0
    print("\n" + "=" * 70)
    print(f" 总计: {total} 耗时: {elapsed:.1f}s 平均: {average:.1f}条/s")
    print(f" [正常] {len(ok_list)} 条 → {OUTPUT_OK}")
    print(f" [4xx] {len(xx4_list)} 条 → {OUTPUT_4XX}")
    print(f" [5xx] {len(xx5_list)} 条 → {OUTPUT_5XX}")
    print("=" * 70)
# Run the checker only when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())
只做简单的测活和额度查询;可能存在误判,遇到时可以换 IP 重新测一遍。我自己有几十万个,误判一些也无所谓,哈哈哈。
image1264×991 68.9 KB
--【壹】--:
真的是节点的问题啊,真不错啊,而且这个号也不容易被封啊,3400多号检出来300个左右!
--【贰】--:
谢谢佬友分享,待会试一下
--【叁】--:
换个好IP cf的盾不一直都有的么?
--【肆】--:
======================================================================
总计: 5000 耗时: 233.4s 平均: 21.4条/s
[正常] 1543 条 → result_ok.txt
[4xx] 3456 条 → result_4xx.txt
[5xx] 1 条 → result_5xx.txt
======================================================================
惨啊
--【伍】--:
image1117×585 23.8 KB
是不是已经不行了呀 现在加了盾了
--【陆】--:
4w存活3000
--【柒】--:
我的是干公益站,几B的token使用量没办法
基于 grok2api 项目修改,脚本路径为 scripts/check_alive.py,运行方式:
python3 check_alive.py
import asyncio
import sys
import time
import uuid
from pathlib import Path
import orjson
from curl_cffi.requests import AsyncSession
# Input file read by _load_tokens: one token per line; a leading "sso=" is
# stripped and duplicates are dropped.
INPUT_FILE = "tokens.txt"
# Output files written by main(), one per bucket returned by _classify.
OUTPUT_OK = "result_ok.txt"
OUTPUT_4XX = "result_4xx.txt"
OUTPUT_5XX = "result_5xx.txt"
# Size of the asyncio.Semaphore that caps in-flight requests.
CONCURRENCY = 50
# Proxy used for both http and https; an empty string means no proxy is passed.
PROXY_URL = ""
# Value passed as `impersonate=` to the curl_cffi session and each request.
BROWSER = "chrome136"
# Sent as the User-Agent header by _build_headers.
USER_AGENT = (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/136.0.0.0 Safari/537.36"
)
# Passed as `timeout=` on every request (presumably seconds — confirm against
# the curl_cffi documentation).
TIMEOUT = 30
# Endpoint that check_one POSTs to.
RATE_LIMITS_API = "https://grok.com/rest/rate-limits"
# Sent as "modelName" in the JSON request body.
MODEL_NAME = "grok-4-1-thinking-1129"
def _build_headers(token: str) -> dict:
    """Build the HTTP headers for one rate-limit request.

    Args:
        token: SSO token, with or without a leading ``sso=`` prefix.

    Returns:
        A new header dict. The token is sent as both the ``sso`` and the
        ``sso-rw`` cookie, and ``x-statsig-id`` / ``x-xai-request-id`` are
        freshly generated random values on every call.
    """
    # Accept tokens copied together with their cookie name.
    raw = token.removeprefix("sso=")
    cookie = f"sso={raw}; sso-rw={raw}"
    return {
        "Accept": "*/*",
        "Accept-Encoding": "gzip, deflate, br, zstd",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
        "Content-Type": "application/json",
        "Cookie": cookie,
        "Origin": "https://grok.com",
        "Referer": "https://grok.com/",
        "Sec-Ch-Ua": '"Google Chrome";v="136", "Chromium";v="136", "Not(A:Brand";v="24"',
        "Sec-Ch-Ua-Mobile": "?0",
        "Sec-Ch-Ua-Platform": '"macOS"',
        "Sec-Fetch-Dest": "empty",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Site": "same-origin",
        "User-Agent": USER_AGENT,
        "x-statsig-id": f"v3_{uuid.uuid4().hex}",
        "x-xai-request-id": str(uuid.uuid4()),
    }
def _classify(code: int) -> str:
if code == 200:
return "ok"
if 400 <= code < 500:
return "4xx"
return "5xx"
async def check_one(
    token: str, session: AsyncSession, semaphore: asyncio.Semaphore
) -> tuple[str, int, str, str]:
    """Query the rate-limit endpoint once for a single token.

    Args:
        token: SSO token, with or without a leading ``sso=`` prefix.
        session: Shared curl_cffi session used to send the request.
        semaphore: Held for the duration of the request to cap concurrency.

    Returns:
        ``(raw_token, status_code, category, body)``. ``category`` comes from
        ``_classify``; ``body`` is at most the first 300 characters of the
        response text. If the request raises, the status code is 0, the
        category is ``"5xx"`` and ``body`` holds ``"timeout"`` or the
        truncated exception message. This function never raises for a failed
        request.
    """
    raw = token.removeprefix("sso=")
    async with semaphore:
        try:
            proxies = {"http": PROXY_URL, "https": PROXY_URL} if PROXY_URL else None
            resp = await session.post(
                RATE_LIMITS_API,
                headers=_build_headers(raw),
                data=orjson.dumps({"requestKind": "DEFAULT", "modelName": MODEL_NAME}),
                timeout=TIMEOUT,
                proxies=proxies,
                impersonate=BROWSER,
            )
            code = resp.status_code
            try:
                body = resp.text[:300]
            except Exception:
                # Best effort: an undecodable body must not discard the status code.
                body = "N/A"
            return raw, code, _classify(code), body
        except asyncio.TimeoutError:
            return raw, 0, "5xx", "timeout"
        except Exception as e:
            # NOTE(review): the HTTP client presumably raises its own timeout
            # type, which would land here rather than above — confirm.
            return raw, 0, "5xx", str(e)[:300]
def _load_tokens(path: str) -> list[str]:
p = Path(path)
if not p.exists():
print(f"[错误] 输入文件不存在: {path}")
sys.exit(1)
lines = p.read_text(encoding="utf-8").splitlines()
tokens, seen = [], set()
for line in lines:
t = line.strip()
if not t:
continue
raw = t[4:] if t.startswith("sso=") else t
if raw in seen:
continue
seen.add(raw)
tokens.append(raw)
return tokens
def _write_results(path: str, entries: list):
with open(path, "w", encoding="utf-8") as f:
for raw, code, body in entries:
if body and code != 200:
f.write(f"{raw}\t{code}\t{body}\n")
else:
f.write(f"{raw}\n")
async def main():
    """Check every token from INPUT_FILE and write the per-bucket result files.

    Loads the tokens, runs ``check_one`` for each of them over one shared
    session (concurrency is capped by the semaphore), prints a progress line
    per finished token, then writes the three output files and a summary.
    """
    tokens = _load_tokens(INPUT_FILE)
    total = len(tokens)
    print(f"[信息] 读取 token: {total} 条(已去重)")
    print(f"[信息] 并发数: {CONCURRENCY} 代理: {PROXY_URL or '无'}")
    print(f"[信息] 模型: {MODEL_NAME}")
    print(f"[信息] 输出: {OUTPUT_OK} / {OUTPUT_4XX} / {OUTPUT_5XX}")
    print("-" * 70)
    semaphore = asyncio.Semaphore(CONCURRENCY)
    ok_list, xx4_list, xx5_list = [], [], []
    done = 0
    start_time = time.monotonic()
    lock = asyncio.Lock()

    async def _task(token: str, session: AsyncSession):
        """Check one token, file it into its bucket and print progress."""
        nonlocal done
        raw, code, category, body = await check_one(token, session, semaphore)
        entry = (raw, code, body)
        # Bookkeeping and the progress printout are kept together under the lock.
        async with lock:
            if category == "ok":
                ok_list.append(entry)
                tag = "[正常]"
            elif category == "4xx":
                xx4_list.append(entry)
                tag = "[4xx] "
            else:
                xx5_list.append(entry)
                tag = "[5xx] "
            done += 1
            elapsed = time.monotonic() - start_time
            speed = done / elapsed if elapsed > 0 else 0
            eta = (total - done) / speed if speed > 0 else 0
            print(
                f"{tag} {done:>5}/{total} code={code} "
                f"ok={len(ok_list)} 4xx={len(xx4_list)} 5xx={len(xx5_list)} "
                f"{speed:.1f}条/s 剩余≈{eta:.0f}s"
            )
            print(f" token={raw[:20]}...")
            print(f" 响应 : {body[:200]}")

    async with AsyncSession(impersonate=BROWSER) as session:
        tasks = [_task(t, session) for t in tokens]
        await asyncio.gather(*tasks)
    _write_results(OUTPUT_OK, ok_list)
    _write_results(OUTPUT_4XX, xx4_list)
    _write_results(OUTPUT_5XX, xx5_list)
    elapsed = time.monotonic() - start_time
    # With no tokens and a coarse clock, elapsed can be exactly 0; avoid
    # dividing by it in the summary.
    average = total / elapsed if elapsed > 0 else 0.0
    print("\n" + "=" * 70)
    print(f" 总计: {total} 耗时: {elapsed:.1f}s 平均: {average:.1f}条/s")
    print(f" [正常] {len(ok_list)} 条 → {OUTPUT_OK}")
    print(f" [4xx] {len(xx4_list)} 条 → {OUTPUT_4XX}")
    print(f" [5xx] {len(xx5_list)} 条 → {OUTPUT_5XX}")
    print("=" * 70)
# Run the checker only when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())
只做简单的测活和额度查询;可能存在误判,遇到时可以换 IP 重新测一遍。我自己有几十万个,误判一些也无所谓,哈哈哈。
image1264×991 68.9 KB
--【壹】--:
真的是节点的问题啊,真不错啊,而且这个号也不容易被封啊,3400多号检出来300个左右!
--【贰】--:
谢谢佬友分享,待会试一下
--【叁】--:
换个好IP cf的盾不一直都有的么?
--【肆】--:
======================================================================
总计: 5000 耗时: 233.4s 平均: 21.4条/s
[正常] 1543 条 → result_ok.txt
[4xx] 3456 条 → result_4xx.txt
[5xx] 1 条 → result_5xx.txt
======================================================================
惨啊
--【伍】--:
image1117×585 23.8 KB
是不是已经不行了呀 现在加了盾了
--【陆】--:
4w存活3000
--【柒】--:
我的是干公益站,几B的token使用量没办法

