Python如何实现异步并发查询MySQL并调用API灌数据,同时确保数据安全?

2026-05-21 20:112阅读0评论SEO基础
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计1143个文字,预计阅读时间需要5分钟。

Python如何实现异步并发查询MySQL并调用API灌数据,同时确保数据安全?

回顾上一篇文章,我们已经编写了异步并发API请求的代码。接下来,我们将继续编写异步并发加锁的代码,以确保数据安全。由于服务架构的重构,本次优化带来了性能提升和重构。带来的任务需求如下:需确保数据安全。


前情回顾

上一篇文章已经编写了异步并发API请求灌数据,那么本章节我们来继续编写异步并发加锁,保证数据安全

实战任务

本次因为服务架构重构,表优化、重构,带来的任务就是需要从原来的mysql数据库中,读取原表数据(部分存在多张关联查询)然后通过调用API的服务方式灌入新的数据库表中(包含mysql、mongodb)。

执行流程如下



那么根据流程所需要的功能,需要以下的实例进行支撑:
1.并发实例
2.查询数据实例
3.执行post请求实例

目标:循环查询处理并发数据,并且加锁保证数据安全

给查询数据表添加​​is_import​​​字段,在mysql表中添加查询标识,插入成功则为​​1​​​,无插入则为​​0​​



然后初始化 ​​is_import = 0​​ 即可,下面来给我们之前的model方法的查询中添加条件查询。

编写model类中selectTable方法,增加条件查询



# 根据设置的旧表字段,查询旧库的数据库数据
def selectTable(self,DB_NAME,TABLE_NAME,fields,order,cond_dict=''):
# 选择数据库
self.mydb.selectDataBase(DB_NAME)
# 数据查询
result = self.mydb.select(TABLE_NAME, fields=fields,order=order,cond_dict=cond_dict)
# 关闭连接
self.mydb.close()
# 返回查询的数据
return result

增加条件查询cond_dict字典,测试使用。






测试成功之后,就要在model方法中增加一个更新​​is_import​​为1的方法了。

在model类中增加更新​​is_import​​为1的方法

有些时候,因为传入的可能字段名不是​​is_import​​​,可能是​​is_import_xxx​​。那么就要根据传入的字典获取字段名称了。

# 更新is_import字段为1的方法
def updateIsImport(self, TABLE_NAME, attrs_dict, cond_dict):
"""更新数据

args:
tablename :表名字
attrs_dict :更新属性键值对字典
cond_dict :更新条件字典

example:
params = {"name" : "caixinglong", "age" : "38"}
cond_dict = {"name" : "liuqiao", "age" : "18"}
mydb.update(table, params, cond_dict)

"""
# 选择数据库
result = self.mydb.update(TABLE_NAME, attrs_dict=attrs_dict, cond_dict=cond_dict)
return result

写好了,更新字段的方法之后,下面我们在API请求成功之后进行使用。

在消费者方法中引用更新方法

Python如何实现异步并发查询MySQL并调用API灌数据,同时确保数据安全?




此时消费者已经在上一个篇章中写了异步并发的方法,但是这样调用的话,会导致mysql更新的时候报错。
为了保证数据安全,我只能降低效率,增加锁了。

首先先看一个线程加锁的伪代码

#-* coding: utf-8 -*
import threading
import time
import os

def func1(k):
global lock
while True:
lock.acquire() # 开始锁进程
.... 执行任务 ...
lock.release() # 释放进程锁

if __name__=='__main__':

# 初始化进程锁
lock = threading.Lock()

# 使用4个CPU开启进程并发
for k in range(4):
new_thread = threading.Thread(target=func1,args=(k,)) # 开启一个进程调用func1,并且传入参数k
new_thread.start()

从示例代码可以看出,进程锁的基本使用方法。下面我们来使用一下进程锁来保证数据安全。

使用进程锁

result_row = []
lock = threading.Lock() # 初始化进程锁
for row in select_result:
lock.acquire() # 开启进程锁
consume(row, url, model,lock=lock) # 消费请求API



初步代码基本就写到这里了。后面肯定有很多需要优化的地方。
例如:
1、使用查询分页再开启线程并发处理。
2、拆分生产者与消费者,加入rabbitmq等中间件来对付异常处理



关注微信公众号,回复、Python、PHP、JAVA、web,则可获得Python、PHP、JAVA、前端等视频资料。


本文共计1143个文字,预计阅读时间需要5分钟。

Python如何实现异步并发查询MySQL并调用API灌数据,同时确保数据安全?

回顾上一篇文章,我们已经编写了异步并发API请求的代码。接下来,我们将继续编写异步并发加锁的代码,以确保数据安全。由于服务架构的重构,本次优化带来了性能提升和重构。带来的任务需求如下:需确保数据安全。


前情回顾

上一篇文章已经编写了异步并发API请求灌数据,那么本章节我们来继续编写异步并发加锁,保证数据安全

实战任务

本次因为服务架构重构,表优化、重构,带来的任务就是需要从原来的mysql数据库中,读取原表数据(部分存在多张关联查询)然后通过调用API的服务方式灌入新的数据库表中(包含mysql、mongodb)。

执行流程如下



那么根据流程所需要的功能,需要以下的实例进行支撑:
1.并发实例
2.查询数据实例
3.执行post请求实例

目标:循环查询处理并发数据,并且加锁保证数据安全

给查询数据表添加​​is_import​​​字段,在mysql表中添加查询标识,插入成功则为​​1​​​,无插入则为​​0​​



然后初始化 ​​is_import = 0​​ 即可,下面来给我们之前的model方法的查询中添加条件查询。

编写model类中selectTable方法,增加条件查询



# 根据设置的旧表字段,查询旧库的数据库数据
def selectTable(self,DB_NAME,TABLE_NAME,fields,order,cond_dict=''):
# 选择数据库
self.mydb.selectDataBase(DB_NAME)
# 数据查询
result = self.mydb.select(TABLE_NAME, fields=fields,order=order,cond_dict=cond_dict)
# 关闭连接
self.mydb.close()
# 返回查询的数据
return result

增加条件查询cond_dict字典,测试使用。






测试成功之后,就要在model方法中增加一个更新​​is_import​​为1的方法了。

在model类中增加更新​​is_import​​为1的方法

有些时候,因为传入的可能字段名不是​​is_import​​​,可能是​​is_import_xxx​​。那么就要根据传入的字典获取字段名称了。

# 更新is_import字段为1的方法
def updateIsImport(self, TABLE_NAME, attrs_dict, cond_dict):
"""更新数据

args:
tablename :表名字
attrs_dict :更新属性键值对字典
cond_dict :更新条件字典

example:
params = {"name" : "caixinglong", "age" : "38"}
cond_dict = {"name" : "liuqiao", "age" : "18"}
mydb.update(table, params, cond_dict)

"""
# 选择数据库
result = self.mydb.update(TABLE_NAME, attrs_dict=attrs_dict, cond_dict=cond_dict)
return result

写好了,更新字段的方法之后,下面我们在API请求成功之后进行使用。

在消费者方法中引用更新方法

Python如何实现异步并发查询MySQL并调用API灌数据,同时确保数据安全?




此时消费者已经在上一个篇章中写了异步并发的方法,但是这样调用的话,会导致mysql更新的时候报错。
为了保证数据安全,我只能降低效率,增加锁了。

首先先看一个线程加锁的伪代码

#-* coding: utf-8 -*
import threading
import time
import os

def func1(k):
global lock
while True:
lock.acquire() # 开始锁进程
.... 执行任务 ...
lock.release() # 释放进程锁

if __name__=='__main__':

# 初始化进程锁
lock = threading.Lock()

# 使用4个CPU开启进程并发
for k in range(4):
new_thread = threading.Thread(target=func1,args=(k,)) # 开启一个进程调用func1,并且传入参数k
new_thread.start()

从示例代码可以看出,进程锁的基本使用方法。下面我们来使用一下进程锁来保证数据安全。

使用进程锁

result_row = []
lock = threading.Lock() # 初始化进程锁
for row in select_result:
lock.acquire() # 开启进程锁
consume(row, url, model,lock=lock) # 消费请求API



初步代码基本就写到这里了。后面肯定有很多需要优化的地方。
例如:
1、使用查询分页再开启线程并发处理。
2、拆分生产者与消费者,加入rabbitmq等中间件来对付异常处理



关注微信公众号,回复、Python、PHP、JAVA、web,则可获得Python、PHP、JAVA、前端等视频资料。