Citus 分布式 PostgreSQL 集群中如何设置实时仪表盘示例?
- 内容介绍
- 文章标签
- 相关推荐
本文共计2308个文字,预计阅读时间需要10分钟。
Citus 提供对大型数据集的实时查询。在 Citus 中,常见的工作负载包括为事务数据表提供实时副本支持。例如,您可以协助其他企业监控其 HTTP 流量的云服务提供商。每次查询时,您都可能帮助其实现高效的数据处理。
Citus 提供对大型数据集的实时查询。我们在 Citus 常见的一项工作负载涉及为事件数据的实时仪表板提供支持。
例如,您可以是帮助其他企业监控其 HTTP 流量的云服务提供商。每次您的一个客户端收到 HTTP 请求时,您的服务都会收到一条日志记录。您想要摄取所有这些记录并创建一个 HTTP 分析仪表板,为您的客户提供洞察力,例如他们的网站服务的 HTTP 错误数量。 重要的是,这些数据以尽可能少的延迟显示出来,这样您的客户就可以解决他们网站的问题。 仪表板显示历史趋势图也很重要。
或者,也许您正在建立一个广告网络,并希望向客户展示其广告系列的点击率。 在此示例中,延迟也很关键,原始数据量也很高,历史数据和实时数据都很重要。
在本节中,我们将演示如何构建第一个示例的一部分,但该架构同样适用于第二个和许多其他用例。
- real-time-analytics-Hands-On-Lab-Hyperscale-Citus
- github.com/citusdata/real-time-analytics-Hands-On-Lab-Hyperscale-Citus
- Architecting Real-Time Analytics for your Customers
- github.com/citusdata/postgres-analytics-tutorial
我们正在处理的数据是不可变的日志数据流。我们将直接插入 Citus,但这些数据首先通过 Kafka 之类的东西进行路由也很常见。 这样做具有通常的优势,并且一旦数据量变得难以管理,就可以更容易地预先聚合数据。
我们将使用一个简单的 schema 来摄取 HTTP 事件数据。 这个 schema 作为一个例子来展示整体架构;一个真实的系统可能会使用额外的列。
有了这个,系统就可以接受数据并提供查询了! 在继续执行本文中的其他命令时,让以下循环在后台的 摄取数据后,您可以运行仪表板查询,例如: 为了填充 之前的仪表板查询现在好多了: 这些是基础!我们提供了一种架构,可以摄取 接下来的部分将扩展基本架构,并向您展示如何解决经常出现的问题。 一种称为 如果您要运行全局查询,则会出现类似的问题,例如在上个月访问您客户的任何站点的唯一 首先你必须安装 这在 现在我们准备好在 首先,将新列添加到我们的汇总表中: 接下来,通过修改汇总函数将其包含在汇总中: 现在,如果您想在仪表板中获取来自美国的请求数量,您可以将仪表板查询修改为如下所示:-- this is run on the coordinator
CREATE TABLE docs.citusdata.com/en/v10.2/develop/api_udf.html#create-distributed-table
UDF 使用分片计数的默认配置值。我们建议在集群中使用 2-4 倍于 CPU 核的分片。使用这么多分片可以让您在添加新的工作节点后重新平衡集群中的数据。
2-4 倍于 CPU 核的分片
Azure Database for PostgreSQL — 超大规模 (Citus) 使用流式复制来实现高可用性,因此维护分片副本将是多余的。在任何流复制不可用的生产环境中,您应该将 citus.shard_replication_factor 设置为 2 或更高以实现容错。
psql 控制台中运行。 它每隔一两秒就会生成假数据。DO $$
BEGIN LOOP
INSERT INTO example.com/', md5(random()::text)),
('{China,India,USA,Indonesia}'::text[])[ceil(random()*4)],
concat(
trunc(random()*250 + 2), '.',
trunc(random()*250 + 2), '.',
trunc(random()*250 + 2), '.',
trunc(random()*250 + 2)
)::inet,
('{200,404}'::int[])[ceil(random()*2)],
5+trunc(random()*150)
);
COMMIT;
PERFORM pg_sleep(random() * 0.25);
END LOOP;
END $$;
SELECT
site_id,
date_trunc('minute', ingest_time) as minute,
COUNT(1) AS request_count,
SUM(CASE WHEN (status_code between 200 and 299) THEN 1 ELSE 0 END) as success_count,
SUM(CASE WHEN (status_code between 200 and 299) THEN 0 ELSE 1 END) as error_count,
SUM(response_time_msec) / COUNT(1) AS average_response_time_msec
FROM docs.citusdata.com/en/v10.2/sharding/data_modeling.html#colocation
github.com/citusdata/pg_cron
SELECT site_id, ingest_time as minute, request_count,
success_count, error_count, average_response_time_msec
FROM docs.citusdata.com/en/v10.2/use_cases/timeseries.html#timeseries
HTTP 事件,然后将这些事件汇总到它们的预聚合形式中。 这样,您既可以存储原始事件,也可以通过亚秒级查询为您的分析仪表板提供动力。HTTP 分析中的一个常见问题涉及近似的不同计数:上个月有多少独立访问者访问了您的网站?准确地回答这个问题需要将所有以前见过的访问者的列表存储在汇总表中,这是一个令人望而却步的数据量。然而,一个近似的答案更易于管理。
hyperloglog 或 HLL 的数据类型可以近似地回答查询; 要告诉您一个集合中大约有多少个独特元素,需要的空间非常小。其精度可以调整。 我们将使用仅使用 1280 字节的那些,将能够以最多 2.2% 的错误计算多达数百亿的唯一访问者。IP 地址的数量。在没有 HLL 的情况下,此查询涉及将 IP 地址列表从 worker 传送到 coordinator 以进行重复数据删除。这既是大量的网络流量,也是大量的计算。 通过使用 HLL,您可以大大提高查询速度。HLL 扩展;github repo 有说明。接下来,您必须启用它:
CREATE EXTENSION hll;
Hyperscale 上是不必要的,它已经安装了 HLL 以及其他有用的扩展。HLL 汇总中跟踪 IP 地址。首先向汇总表添加一列。ALTER TABLE github.com/aggregateknowledge/postgresql-hll
使用 JSONB 的非结构化数据
Citus 与 Postgres 对非结构化数据类型的内置支持配合得很好。 为了证明这一点,让我们跟踪来自每个国家/地区的访客数量。 使用半结构数据类型可以让您不必为每个国家添加一列,并最终得到具有数百个稀疏填充列的行。我们有一篇博文解释了半结构化数据使用哪种格式。这篇文章推荐使用 JSONB,在这里我们将演示如何将 JSONB 列合并到您的数据模型中。
ALTER TABLE http_request_1min ADD COLUMN country_counters JSONB;
@@ -1,14 +1,19 @@
INSERT INTO http_request_1min (
site_id, ingest_time, request_count,
success_count, error_count, average_response_time_msec
+ , country_counters
) SELECT
site_id,
minute,
COUNT(1) as request_count,
SUM(CASE WHEN (status_code between 200 and 299) THEN 1 ELSE 0 END) as success_count
SUM(CASE WHEN (status_code between 200 and 299) THEN 0 ELSE 1 END) as error_count
SUM(response_time_msec) / COUNT(1) AS average_response_time_msec
- FROM http_request
+ , jsonb_object_agg(request_country, country_count) AS country_counters
+ FROM (
+ SELECT *,
+ count(1) OVER (
+ PARTITION BY site_id, date_trunc('minute', ingest_time), request_country
+ ) AS country_count
+ FROM http_request
+ ) h
SELECT
request_count, success_count, error_count, average_response_time_msec,
COALESCE(country_counters->>'USA', '0')::int AS american_visitors
FROM http_request_1min
WHERE ingest_time > date_trunc('minute', now()) - '5 minutes'::interval;
更多
本文共计2308个文字,预计阅读时间需要10分钟。
Citus 提供对大型数据集的实时查询。在 Citus 中,常见的工作负载包括为事务数据表提供实时副本支持。例如,您可以协助其他企业监控其 HTTP 流量的云服务提供商。每次查询时,您都可能帮助其实现高效的数据处理。
Citus 提供对大型数据集的实时查询。我们在 Citus 常见的一项工作负载涉及为事件数据的实时仪表板提供支持。
例如,您可以是帮助其他企业监控其 HTTP 流量的云服务提供商。每次您的一个客户端收到 HTTP 请求时,您的服务都会收到一条日志记录。您想要摄取所有这些记录并创建一个 HTTP 分析仪表板,为您的客户提供洞察力,例如他们的网站服务的 HTTP 错误数量。 重要的是,这些数据以尽可能少的延迟显示出来,这样您的客户就可以解决他们网站的问题。 仪表板显示历史趋势图也很重要。
或者,也许您正在建立一个广告网络,并希望向客户展示其广告系列的点击率。 在此示例中,延迟也很关键,原始数据量也很高,历史数据和实时数据都很重要。
在本节中,我们将演示如何构建第一个示例的一部分,但该架构同样适用于第二个和许多其他用例。
- real-time-analytics-Hands-On-Lab-Hyperscale-Citus
- github.com/citusdata/real-time-analytics-Hands-On-Lab-Hyperscale-Citus
- Architecting Real-Time Analytics for your Customers
- github.com/citusdata/postgres-analytics-tutorial
我们正在处理的数据是不可变的日志数据流。我们将直接插入 Citus,但这些数据首先通过 Kafka 之类的东西进行路由也很常见。 这样做具有通常的优势,并且一旦数据量变得难以管理,就可以更容易地预先聚合数据。
我们将使用一个简单的 schema 来摄取 HTTP 事件数据。 这个 schema 作为一个例子来展示整体架构;一个真实的系统可能会使用额外的列。
有了这个,系统就可以接受数据并提供查询了! 在继续执行本文中的其他命令时,让以下循环在后台的 摄取数据后,您可以运行仪表板查询,例如: 为了填充 之前的仪表板查询现在好多了: 这些是基础!我们提供了一种架构,可以摄取 接下来的部分将扩展基本架构,并向您展示如何解决经常出现的问题。 一种称为 如果您要运行全局查询,则会出现类似的问题,例如在上个月访问您客户的任何站点的唯一 首先你必须安装 这在 现在我们准备好在 首先,将新列添加到我们的汇总表中: 接下来,通过修改汇总函数将其包含在汇总中: 现在,如果您想在仪表板中获取来自美国的请求数量,您可以将仪表板查询修改为如下所示:-- this is run on the coordinator
CREATE TABLE docs.citusdata.com/en/v10.2/develop/api_udf.html#create-distributed-table
UDF 使用分片计数的默认配置值。我们建议在集群中使用 2-4 倍于 CPU 核的分片。使用这么多分片可以让您在添加新的工作节点后重新平衡集群中的数据。
2-4 倍于 CPU 核的分片
Azure Database for PostgreSQL — 超大规模 (Citus) 使用流式复制来实现高可用性,因此维护分片副本将是多余的。在任何流复制不可用的生产环境中,您应该将 citus.shard_replication_factor 设置为 2 或更高以实现容错。
psql 控制台中运行。 它每隔一两秒就会生成假数据。DO $$
BEGIN LOOP
INSERT INTO example.com/', md5(random()::text)),
('{China,India,USA,Indonesia}'::text[])[ceil(random()*4)],
concat(
trunc(random()*250 + 2), '.',
trunc(random()*250 + 2), '.',
trunc(random()*250 + 2), '.',
trunc(random()*250 + 2)
)::inet,
('{200,404}'::int[])[ceil(random()*2)],
5+trunc(random()*150)
);
COMMIT;
PERFORM pg_sleep(random() * 0.25);
END LOOP;
END $$;
SELECT
site_id,
date_trunc('minute', ingest_time) as minute,
COUNT(1) AS request_count,
SUM(CASE WHEN (status_code between 200 and 299) THEN 1 ELSE 0 END) as success_count,
SUM(CASE WHEN (status_code between 200 and 299) THEN 0 ELSE 1 END) as error_count,
SUM(response_time_msec) / COUNT(1) AS average_response_time_msec
FROM docs.citusdata.com/en/v10.2/sharding/data_modeling.html#colocation
github.com/citusdata/pg_cron
SELECT site_id, ingest_time as minute, request_count,
success_count, error_count, average_response_time_msec
FROM docs.citusdata.com/en/v10.2/use_cases/timeseries.html#timeseries
HTTP 事件,然后将这些事件汇总到它们的预聚合形式中。 这样,您既可以存储原始事件,也可以通过亚秒级查询为您的分析仪表板提供动力。HTTP 分析中的一个常见问题涉及近似的不同计数:上个月有多少独立访问者访问了您的网站?准确地回答这个问题需要将所有以前见过的访问者的列表存储在汇总表中,这是一个令人望而却步的数据量。然而,一个近似的答案更易于管理。
hyperloglog 或 HLL 的数据类型可以近似地回答查询; 要告诉您一个集合中大约有多少个独特元素,需要的空间非常小。其精度可以调整。 我们将使用仅使用 1280 字节的那些,将能够以最多 2.2% 的错误计算多达数百亿的唯一访问者。IP 地址的数量。在没有 HLL 的情况下,此查询涉及将 IP 地址列表从 worker 传送到 coordinator 以进行重复数据删除。这既是大量的网络流量,也是大量的计算。 通过使用 HLL,您可以大大提高查询速度。HLL 扩展;github repo 有说明。接下来,您必须启用它:
CREATE EXTENSION hll;
Hyperscale 上是不必要的,它已经安装了 HLL 以及其他有用的扩展。HLL 汇总中跟踪 IP 地址。首先向汇总表添加一列。ALTER TABLE github.com/aggregateknowledge/postgresql-hll
使用 JSONB 的非结构化数据
Citus 与 Postgres 对非结构化数据类型的内置支持配合得很好。 为了证明这一点,让我们跟踪来自每个国家/地区的访客数量。 使用半结构数据类型可以让您不必为每个国家添加一列,并最终得到具有数百个稀疏填充列的行。我们有一篇博文解释了半结构化数据使用哪种格式。这篇文章推荐使用 JSONB,在这里我们将演示如何将 JSONB 列合并到您的数据模型中。
ALTER TABLE http_request_1min ADD COLUMN country_counters JSONB;
@@ -1,14 +1,19 @@
INSERT INTO http_request_1min (
site_id, ingest_time, request_count,
success_count, error_count, average_response_time_msec
+ , country_counters
) SELECT
site_id,
minute,
COUNT(1) as request_count,
SUM(CASE WHEN (status_code between 200 and 299) THEN 1 ELSE 0 END) as success_count
SUM(CASE WHEN (status_code between 200 and 299) THEN 0 ELSE 1 END) as error_count
SUM(response_time_msec) / COUNT(1) AS average_response_time_msec
- FROM http_request
+ , jsonb_object_agg(request_country, country_count) AS country_counters
+ FROM (
+ SELECT *,
+ count(1) OVER (
+ PARTITION BY site_id, date_trunc('minute', ingest_time), request_country
+ ) AS country_count
+ FROM http_request
+ ) h
SELECT
request_count, success_count, error_count, average_response_time_msec,
COALESCE(country_counters->>'USA', '0')::int AS american_visitors
FROM http_request_1min
WHERE ingest_time > date_trunc('minute', now()) - '5 minutes'::interval;
更多

