如何用Python的ch-orm模块进行ClickHouse的简易查询与数据写入?

2026-05-05 18:451阅读0评论SEO基础
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计883个文字,预计阅读时间需要4分钟。

如何用Python的ch-orm模块进行ClickHouse的简易查询与数据写入?

简介:新项目中如需使用ClickHouse,作为一个合适的Python程序员,首先应寻找合适的轮子。浏览一圈后,发现infi.clickhouse_orm在功能和易用性上没有明显的短板,其ORM API对后端来说也较为简洁。

引言

前不久新项目中需要用到ClickHouse,作为一个合格的Python程序员,首先当然是找找有没有合适的轮子。

翻了一圈,infi.clickhouse_orm在功能和易用性上没有明显的短板,其ORM API对后端程序员格外亲切。可惜主分支已经八个月没有更新了,据闻核心开发者已离职,而infi.clickhouse_orm尚不支持一些我需要的新功能如Geo类型和函数,基于这些原因,这篇文章的主角ch-orm也就诞生了。

如何用Python的ch-orm模块进行ClickHouse的简易查询与数据写入?

ch-orm库fork自infi.clickhouse_orm(v2.1.1)。

与infi相比,ch-orm支持同步和异步两种方式与ClickHouse服务器交互,它添加了一些新功能:

  • 异步支持(AioDatabase)

    • 为所有同步API提供async接口
  • 类型注解

    • 大部分对外API实现了类型注解
  • 新的类型支持

    • Tuple
    • Geo类型;Point、Ring等
  • 新的函数支持

    • Geo函数等
  • 支持创建临时表(TemporaryModel)

    • session会话

需要提醒的是,ch-orm仅使用ClickHouse的localhost:8123/') sync_db.create_table(Residence) # 以异步方式创建数据库 async def main(): async_db = AioDatabase('db-test', db_url='localhost:8123/') # 异步模型下需要主动执行init方法初始化 await async_db.init() await async_db.create_table(Residence)

此时,db-test库内应当出现了一个名为residence的表。

插入数据

ClickHouse在数据写入性能表现十分优异,ch-orm能轻易处理写入数据需求

以写入100万条数据为例,使用生成器创建100万个Residence随机实例

import uuid from clickhouse_orm.contrib.geo.fields import Point # 同步写入100万条residence sync_db.insert( (Residence(uuid=str(uuid.uuid4()), geo=Point(120, 20)) for _ in range(1000000)), batch_size=10000 ) # 异步写入100万条residence async def insert(): ... await async_db.insert( (Residence(uuid=str(uuid.uuid4()), geo=Point(120, 20)) for _ in range(1000000)), batch_size=10000 )

示例中我们仅对uuidgeo列进行赋值,其他字段会被设置为默认值(而非None值)

可以看看residence表中有多少条数据

# 同步方式查询Residence行数 Residence.objects_in(sync_db).count() # 异步方式查询Residence行数 async def read_count(): ... await Residence.objects_in(async_db).count() 查询API

ch-orm实现了QuerySet,暴露API基本参照Django设计的,如前述的获取表行数的count()方法就来自QuerySet

与Django不同的是,ch-orm仅将QuerySet作为查询实例,不具备查询结果缓存功能,这代表如果对一个QuerySet对象执行两次迭代,与后端数据库的交互将变成两次而非一次。

可以通过Model的类方法objects_in获得一个QuerySet实例,接着来查询uuid="48d75e4d-8e6f-4acd-a2e9-f4c3059b5b30"的数据

# 同步API queryset = Residence.objects_in(sync_db) queryset = queryset.filter(Residence.uuid == "48d75e4d-8e6f-4acd-a2e9-f4c3059b5b30") result = list(queryset) # 对于异步API queryset = Residence.objects_in(async_db) queryset = queryset.filter(Residence.uuid == "48d75e4d-8e6f-4acd-a2e9-f4c3059b5b30") result = [_ async for _ in queryset]

真正的查询请求是在对queryset迭代时处理的,因此下列两行代码不会与数据库后端进行交互

queryset = Residence.objects_in(sync_db) queryset = queryset.filter(Residence.uuid == "48d75e4d-8e6f-4acd-a2e9-f4c3059b5b30")

最终得到一个由Residence实例的组成的结果列表result。

3. 略微复杂功能

ch-orm具备日常使用的大多数场景功能

  • 执行原生查询并创建动态对象

  • 现有表生成模型类

  • F函数

  • Q查询

  • Aggregation 聚合查询

  • order_by 查询排序

  • distinct 结果去重

  • Pagination 查询分页

  • 表引擎

  • ...

这些内容Github仓库有相应的文档,限于本文篇幅这里就不再过多介绍。

本文共计883个文字,预计阅读时间需要4分钟。

如何用Python的ch-orm模块进行ClickHouse的简易查询与数据写入?

简介:新项目中如需使用ClickHouse,作为一个合适的Python程序员,首先应寻找合适的轮子。浏览一圈后,发现infi.clickhouse_orm在功能和易用性上没有明显的短板,其ORM API对后端来说也较为简洁。

引言

前不久新项目中需要用到ClickHouse,作为一个合格的Python程序员,首先当然是找找有没有合适的轮子。

翻了一圈,infi.clickhouse_orm在功能和易用性上没有明显的短板,其ORM API对后端程序员格外亲切。可惜主分支已经八个月没有更新了,据闻核心开发者已离职,而infi.clickhouse_orm尚不支持一些我需要的新功能如Geo类型和函数,基于这些原因,这篇文章的主角ch-orm也就诞生了。

如何用Python的ch-orm模块进行ClickHouse的简易查询与数据写入?

ch-orm库fork自infi.clickhouse_orm(v2.1.1)。

与infi相比,ch-orm支持同步和异步两种方式与ClickHouse服务器交互,它添加了一些新功能:

  • 异步支持(AioDatabase)

    • 为所有同步API提供async接口
  • 类型注解

    • 大部分对外API实现了类型注解
  • 新的类型支持

    • Tuple
    • Geo类型;Point、Ring等
  • 新的函数支持

    • Geo函数等
  • 支持创建临时表(TemporaryModel)

    • session会话

需要提醒的是,ch-orm仅使用ClickHouse的localhost:8123/') sync_db.create_table(Residence) # 以异步方式创建数据库 async def main(): async_db = AioDatabase('db-test', db_url='localhost:8123/') # 异步模型下需要主动执行init方法初始化 await async_db.init() await async_db.create_table(Residence)

此时,db-test库内应当出现了一个名为residence的表。

插入数据

ClickHouse在数据写入性能表现十分优异,ch-orm能轻易处理写入数据需求

以写入100万条数据为例,使用生成器创建100万个Residence随机实例

import uuid from clickhouse_orm.contrib.geo.fields import Point # 同步写入100万条residence sync_db.insert( (Residence(uuid=str(uuid.uuid4()), geo=Point(120, 20)) for _ in range(1000000)), batch_size=10000 ) # 异步写入100万条residence async def insert(): ... await async_db.insert( (Residence(uuid=str(uuid.uuid4()), geo=Point(120, 20)) for _ in range(1000000)), batch_size=10000 )

示例中我们仅对uuidgeo列进行赋值,其他字段会被设置为默认值(而非None值)

可以看看residence表中有多少条数据

# 同步方式查询Residence行数 Residence.objects_in(sync_db).count() # 异步方式查询Residence行数 async def read_count(): ... await Residence.objects_in(async_db).count() 查询API

ch-orm实现了QuerySet,暴露API基本参照Django设计的,如前述的获取表行数的count()方法就来自QuerySet

与Django不同的是,ch-orm仅将QuerySet作为查询实例,不具备查询结果缓存功能,这代表如果对一个QuerySet对象执行两次迭代,与后端数据库的交互将变成两次而非一次。

可以通过Model的类方法objects_in获得一个QuerySet实例,接着来查询uuid="48d75e4d-8e6f-4acd-a2e9-f4c3059b5b30"的数据

# 同步API queryset = Residence.objects_in(sync_db) queryset = queryset.filter(Residence.uuid == "48d75e4d-8e6f-4acd-a2e9-f4c3059b5b30") result = list(queryset) # 对于异步API queryset = Residence.objects_in(async_db) queryset = queryset.filter(Residence.uuid == "48d75e4d-8e6f-4acd-a2e9-f4c3059b5b30") result = [_ async for _ in queryset]

真正的查询请求是在对queryset迭代时处理的,因此下列两行代码不会与数据库后端进行交互

queryset = Residence.objects_in(sync_db) queryset = queryset.filter(Residence.uuid == "48d75e4d-8e6f-4acd-a2e9-f4c3059b5b30")

最终得到一个由Residence实例的组成的结果列表result。

3. 略微复杂功能

ch-orm具备日常使用的大多数场景功能

  • 执行原生查询并创建动态对象

  • 现有表生成模型类

  • F函数

  • Q查询

  • Aggregation 聚合查询

  • order_by 查询排序

  • distinct 结果去重

  • Pagination 查询分页

  • 表引擎

  • ...

这些内容Github仓库有相应的文档,限于本文篇幅这里就不再过多介绍。