如何通过实例详细解析Python RPC的实现机制与技巧?

2026-05-22 03:161阅读0评论SEO基础
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计991个文字,预计阅读时间需要4分钟。

如何通过实例详细解析Python RPC的实现机制与技巧?

使用单线程同步,通过socket传输数据,使用JSON序列化消息体,用struct将消息编码为二进制字符串,进行网络传输。采用消息协议:输入{in: ping, params: ireader 0},输出{out: pong, result: ireader 0},客户端。

单线程同步

如何通过实例详细解析Python RPC的实现机制与技巧?

  • 使用socket传输数据
  • 使用json序列化消息体
  • struct将消息编码为二进制字节串,进行网络传输

消息协议

// 输入 { in: "ping", params: "ireader 0" } // 输出 { out: "pong", result: "ireader 0" }

客户端 client.py

# coding: utf-8 # client.py import json import time import struct import socket def rpc(sock, in_, params): response = json.dumps({"in": in_, "params": params}) # 请求消息体 length_prefix = struct.pack("I", len(response)) # 请求长度前缀 sock.sendall(length_prefix) sock.sendall(response) length_prefix = sock.recv(4) # 响应长度前缀 length, = struct.unpack("I", length_prefix) body = sock.recv(length) # 响应消息体 response = json.loads(body) return response["out"], response["result"] # 返回响应类型和结果 if __name__ == '__main__': s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect(("localhost", 8080)) for i in range(10): # 连续发送10个rpc请求 out, result = rpc(s, "ping", "ireader %d" % i) print out, result time.sleep(1) # 休眠1s,便于观察 s.close() # 关闭连接

服务端 blocking_single.py

# coding: utf8 # blocking_single.py import json import struct import socket def handle_conn(conn, addr, handlers): print addr, "comes" while True: # 循环读写 length_prefix = conn.recv(4) # 请求长度前缀 if not length_prefix: # 连接关闭了 print addr, "bye" conn.close() break # 退出循环,处理下一个连接 length, = struct.unpack("I", length_prefix) body = conn.recv(length) # 请求消息体 request = json.loads(body) in_ = request['in'] params = request['params'] print in_, params handler = handlers[in_] # 查找请求处理器 handler(conn, params) # 处理请求 def loop(sock, handlers): while True: conn, addr = sock.accept() # 接收连接 handle_conn(conn, addr, handlers) # 处理连接 def ping(conn, params): send_result(conn, "pong", params) def send_result(conn, out, result): response = json.dumps({"out": out, "result": result}) # 响应消息体 length_prefix = struct.pack("I", len(response)) # 响应长度前缀 conn.sendall(length_prefix) conn.sendall(response) if __name__ == '__main__': sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 创建一个TCP套接字 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 打开reuse addr选项 sock.bind(("localhost", 8080)) # 绑定端口 sock.listen(1) # 监听客户端连接 handlers = { # 注册请求处理器 "ping": ping } loop(sock, handlers) # 进入服务循环

多线程同步

  • 使用线程库thread创建原生线程
  • 服务器可并行处理多个客户端

服务端 multithread.py

多进程同步

  • Python的GIL导致单个进程只能占满一个CPU核心,多线程无法利用多核优势
  • os.fork()会生成子进程
  • 子进程退出后,父进程需使用waitpid系统调用收割子进程,防止其称为僵尸资源
  • 在子进程中关闭服务器套接字后,在父进程中也要关闭服务器套接字
  • 因为进程fork后,父子进程都有自己的套接字引用指向内核的同一份套接字对象,套接字引用计数为2,对套接字进程close,即将套接字对象的引用计数减1

PreForking同步

  • 进程比线程耗费资源,通过PreForking进程池模型对服务器开辟的进程数量进行限制,避免服务器负载过重
  • 如果并行的连接数量超过了prefork进程数量,后来的客户端请求将会阻塞

单进程异步

  • 通过事件轮询API,查询相关套接字是否有响应的读写事件,有则携带事件列表返回,没有则阻塞
  • 拿到读写事件后,可对事件相关的套接字进行读写操作
  • 设置读写缓冲区
  • Nginx/Nodejs/Redis都是基于异步模型
  • 异步模型编码成本高,易出错,通常在公司业务代码中采用同步模型,仅在讲究高并发高性能的场合才使用异步模型

PreForking异步

Tornado/Nginx采用了多进程PreForking异步模型,具有良好的高并发处理能力

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持易盾网络。

本文共计991个文字,预计阅读时间需要4分钟。

如何通过实例详细解析Python RPC的实现机制与技巧?

使用单线程同步,通过socket传输数据,使用JSON序列化消息体,用struct将消息编码为二进制字符串,进行网络传输。采用消息协议:输入{in: ping, params: ireader 0},输出{out: pong, result: ireader 0},客户端。

单线程同步

如何通过实例详细解析Python RPC的实现机制与技巧?

  • 使用socket传输数据
  • 使用json序列化消息体
  • struct将消息编码为二进制字节串,进行网络传输

消息协议

// 输入 { in: "ping", params: "ireader 0" } // 输出 { out: "pong", result: "ireader 0" }

客户端 client.py

# coding: utf-8 # client.py import json import time import struct import socket def rpc(sock, in_, params): response = json.dumps({"in": in_, "params": params}) # 请求消息体 length_prefix = struct.pack("I", len(response)) # 请求长度前缀 sock.sendall(length_prefix) sock.sendall(response) length_prefix = sock.recv(4) # 响应长度前缀 length, = struct.unpack("I", length_prefix) body = sock.recv(length) # 响应消息体 response = json.loads(body) return response["out"], response["result"] # 返回响应类型和结果 if __name__ == '__main__': s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect(("localhost", 8080)) for i in range(10): # 连续发送10个rpc请求 out, result = rpc(s, "ping", "ireader %d" % i) print out, result time.sleep(1) # 休眠1s,便于观察 s.close() # 关闭连接

服务端 blocking_single.py

# coding: utf8 # blocking_single.py import json import struct import socket def handle_conn(conn, addr, handlers): print addr, "comes" while True: # 循环读写 length_prefix = conn.recv(4) # 请求长度前缀 if not length_prefix: # 连接关闭了 print addr, "bye" conn.close() break # 退出循环,处理下一个连接 length, = struct.unpack("I", length_prefix) body = conn.recv(length) # 请求消息体 request = json.loads(body) in_ = request['in'] params = request['params'] print in_, params handler = handlers[in_] # 查找请求处理器 handler(conn, params) # 处理请求 def loop(sock, handlers): while True: conn, addr = sock.accept() # 接收连接 handle_conn(conn, addr, handlers) # 处理连接 def ping(conn, params): send_result(conn, "pong", params) def send_result(conn, out, result): response = json.dumps({"out": out, "result": result}) # 响应消息体 length_prefix = struct.pack("I", len(response)) # 响应长度前缀 conn.sendall(length_prefix) conn.sendall(response) if __name__ == '__main__': sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 创建一个TCP套接字 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 打开reuse addr选项 sock.bind(("localhost", 8080)) # 绑定端口 sock.listen(1) # 监听客户端连接 handlers = { # 注册请求处理器 "ping": ping } loop(sock, handlers) # 进入服务循环

多线程同步

  • 使用线程库thread创建原生线程
  • 服务器可并行处理多个客户端

服务端 multithread.py

多进程同步

  • Python的GIL导致单个进程只能占满一个CPU核心,多线程无法利用多核优势
  • os.fork()会生成子进程
  • 子进程退出后,父进程需使用waitpid系统调用收割子进程,防止其称为僵尸资源
  • 在子进程中关闭服务器套接字后,在父进程中也要关闭服务器套接字
  • 因为进程fork后,父子进程都有自己的套接字引用指向内核的同一份套接字对象,套接字引用计数为2,对套接字进程close,即将套接字对象的引用计数减1

PreForking同步

  • 进程比线程耗费资源,通过PreForking进程池模型对服务器开辟的进程数量进行限制,避免服务器负载过重
  • 如果并行的连接数量超过了prefork进程数量,后来的客户端请求将会阻塞

单进程异步

  • 通过事件轮询API,查询相关套接字是否有响应的读写事件,有则携带事件列表返回,没有则阻塞
  • 拿到读写事件后,可对事件相关的套接字进行读写操作
  • 设置读写缓冲区
  • Nginx/Nodejs/Redis都是基于异步模型
  • 异步模型编码成本高,易出错,通常在公司业务代码中采用同步模型,仅在讲究高并发高性能的场合才使用异步模型

PreForking异步

Tornado/Nginx采用了多进程PreForking异步模型,具有良好的高并发处理能力

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持易盾网络。