V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
hezijiangjiang
V2EX  ›  推广

用 RisingWave 和 ClickHouse 实现实时数据扩充与分析

  •  
  •   hezijiangjiang · 2023-12-01 19:18:14 +08:00 · 1123 次点击
    这是一个创建于 386 天前的主题,其中的信息可能已经有所发展或是发生改变。

    本文将介绍一个可以无缝进行实时数据导入、转换、分析的强大组合:RisingWave 和 ClickHouse 。ClickHouse 是一款高性能列式数据库管理系统 (DBMS),用于联机分析 (OLAP),可以处理大量数据和复杂分析查询。

    随着各种事件型数据事件生成的数据呈现出指数级增长,实时分析越来越重要。然而,实时分析究竟是什么?它仅仅是更快的数据分析吗?那快速的历史数据分析是否可以视为实时分析?

    实时分析的关键点并不在于分析数据的速度,而在于快速分析的是新鲜且格式良好的数据。也就是说,在进行分析之前,数据就需要实时导入和转换。

    对于实时数据导入、转换与分析,有一个值得一试的强大组合: RisingWave 和 ClickHouse。RisingWave 是专为流处理设计的数据库。它与 PostgreSQL 兼容,支持导入实时数据流,可以执行多样化转换,并能够立即查询结果。ClickHouse® 是一款高性能列式数据库管理系统 (DBMS),用于联机分析 (OLAP),可以处理大量数据和复杂分析查询。

    在将数据导入 ClickHouse 之前,您可以通过 RisingWave 的实时数据转换功能,对数据进行预处理与扩充,从而确保导入的数据能满足精确分析的需求。

    用例展示:为购物车事件扩充产品信息

    在本文中,我们将以在线零售为例,展示如何构建一个实时导入、转换和分析数据的系统。

    我们有一条数据流,用于记录客户将商品添加到购物车时触发的事件 (Event)。

    以下是一个典型的购物车事件:

    (customerId, eventTime, itemId)
    --------------------------------
    ("1234","2023-02-01 10:01:00","P001")
    ("1234","2023-02-01 10:05:00","P002")
    

    由于这条数据流中没有具体产品信息,仅根据它进行分析是很困难的。要使其有用,有多种办法。我们可以将其与订单流 join ,用于分析已添加到购物车但尚未付款的商品,或者将其与产品目录表 join ,从而形成一个扩充的数据流。

    在接下来的演示中,我们就把它与产品目录表进行 join ,并将扩充后的数据流输出到 ClickHouse 做进一步分析。

    我们先假设产品目录表如下:

    itemId, name, price, category
    -------------------------------------
     ("P001","Red T-Shirt",9.99,"Apparel"),
      ("P002","Blue Jeans",39.95,"Apparel")
    

    架构概览

    我们先使用 RisingWave 进行实时数据导入和扩充,然后将扩充后的数据输出到 ClickHouse ,后者将进一步分析数据。下图展示了该用例的架构: Untitled

    准备工作

    • 安装 psql。如需了解如何只安装 psql,不安装其他 PostgreSQL 组件,请参阅 《在不安装 PostgreSQL 的情况下安装 psql 》

    • 启动并运行 Kafka Producer 。我们使用 KRaft 来启动 Kafka ,具体方式请参阅《APACHE KAFKA 快速入门》。

    • 安装并连接 RisingWave 。

      # 安装 RisingWave
      brew tap risingwavelabs/risingwave
      brew install risingwave
      # 启动 RisingWave
      risingwave playground
      
      # 在一个新的终端窗口连接 RisingWave
      psql -h localhost -p 4566 -d dev -U root
      
    • 安装并连接 ClickHouse

      # 下载 ClickHouse 的二进制文件
      curl <https://clickhouse.com/> | sh
      # 启动服务
      ./clickhouse server
      # 在一个新的终端窗口启动客户端
      ./clickhouse client
      

    向 Kafka 写入事件

    现在,让我们创建一个 topic ,并插入一些事件:

    # 创建一个 topic
    bin/kafka-topics.sh --create --topic cart-events --bootstrap-server localhost:9092
    # 写入三条事件
    bin/kafka-console-producer.sh --topic cart-events --bootstrap-server localhost:9092
    
    {"cust_id": "1234", "event_time": "2023-02-01 10:01:00", "item_id": "P001"}
    {"cust_id": "1232","event_time": "2023-02-01 10:05:00", "item_id": "P002"}
    {"cust_id": "1235","event_time": "2023-02-01 10:10:00","item_id": "P003"}
    
    # 请不要关闭 producer ,稍后我们还要写入更多事件
    

    RisingWave 导入数据

    现在,我们在 RisingWave 中创建一个表来导入这些事件。在 RisingWave 中,您可以创建源 (Source) 或表 (Table) 来导入事件。这两者的不同之处在于,如果使用表,导入的事件将储存在 RisingWave 。

    CREATE TABLE IF NOT EXISTS cart_event (
    cust_id VARCHAR,
    event_time TIMESTAMP,
    item_id VARCHAR
    )
    WITH (
       connector='kafka',
       topic='cart-events',
       properties.bootstrap.server='localhost:9092',
       scan.startup.mode='earliest',
    ) FORMAT PLAIN ENCODE JSON;
    

    然后在 RisingWave 中创建一个本地表,用来存储产品目录,并往表里插入一些数据,这样,我们就可以用产品目录信息来扩充购物车事件。

    CREATE TABLE product_catalog (
    item_id varchar,
    name varchar,
    price double precision,
    category varchar
    );
    
    INSERT INTO product_catalog (item_id, name, price, category)
    VALUES 
      ('P001','Red T-Shirt',9.99,'Apparel'),
      ('P002','Blue Jeans',39.95,'Apparel'),
      ('P003','Smart Watch',199.99,'Electronics'),
      ('P004','Yoga Mat',29.95,'Fitness'), 
      ('P005','Wireless Headphones',99.99,'Electronics'),
      ('P006','Coffee Mug',5.99,'Kitchen');
    

    将数据流与表 join

    现在,我们将 cart_event 流与 product_catalog 表 join 起来,形成一条扩充后的数据流。接下来,如果我们还想进行其他一些转换操作,可以使用物化视图 (Materialized View) 来执行这个“流-表”join 。如果我们只想使用产品目录来扩充这个数据流,那可以只简单地创建一个 sink 来执行连接。在这个用例中,我们将使用物化视图。

    RisingWave 的流式物化视图创新性地通过增量更新实时反映结果。

    CREATE MATERIALIZED VIEW data_enrichment AS SELECT 
      c.cust_id,
      c.event_time,
      p.name,
      p.price,
      p.category
    FROM
      cart_event c
    JOIN
      product_catalog p 
    ON 
      c.item_id = p.item_id;
    

    通过这个“流-表”join ,每当原始事件进入 RisingWave ,就会产生一个扩充事件。

    将扩充后的数据流传输到 ClickHouse

    接下来,我们可以将扩充后的数据流传输到 ClickHouse ,以做进一步分析。为了实现这一点,我们需要在 ClickHouse 中创建一个表,这个表要与 RisingWave 中的表具有相同的 schema 。我们希望将数据从物化视图 data_enrichment 中传输出来,所以要创建一个与 data_enrichment 具有相同 schema 的表。

    ---在 ClickHouse 中运行以下代码
    CREATE TABLE enriched_cart_events
    (
    cust_id String,
    event_time DateTime64,
    name String,
    price Float64,
    category String
    )
    ENGINE = ReplacingMergeTree()
    ORDER BY (cust_id, event_time);
    

    当传输目标就位后,我们可以创建一个 sink ,并开始将数据从 RisingWave 传输到 ClickHouse 。

    ---在 RisingWave 中运行以下代码
    CREATE SINK sink_to_clickhouse
    FROM
        data_enrichment WITH (
        connector = 'clickhouse',
    		type='append-only',
    		force_append_only='true',
        clickhouse.url = '<http://0.0.0.0:8123>',
        clickhouse.user = 'default',
        clickhouse.password = '',
        clickhouse.database = 'default',
        clickhouse.table='enriched_cart_events',
    );
    

    现在,查询 ClickHouse 的表,看看数据是否已传输。

    SELECT * from enriched_cart_events;
    
    ------ 结果
    ┌─cust_id┬──────event_time───┬─name────┬─price─┬category─┐
    │ 1234    │ 2023-02-01 18:01:00.000 │ Red T-Shirt │  9.99 │ Apparel     │
    │ 1232    │ 2023-02-01 18:05:00.000 │ Blue Jeans  │ 39.95 │ Apparel     │
    │ 1235    │ 2023-02-01 18:10:00.000 │ Smart Watch │ 199.99│ Electronics 
    └─────────┴─────────────────────────┴───────┘
    

    可以看到,三条事件都已经被扩充,可以用于 ClickHouse 的分析。

    接下来,我们来模拟一条数据流,一次写入一个事件,并立即在 ClickHouse 中查询表。

    --- 每次写入一个事件
    {"cust_id": "1236","event_time": "2023-02-01 10:15:00","item_id": "P001"}
    {"cust_id": "1237","event_time": "2023-02-01 10:20:00","item_id": "P004"}
    {"cust_id": "1238", "event_time": "2023-02-01 10:25:00", "item_id": "P002"}
    {"cust_id": "1239", "event_time": "2023-02-01 10:30:00", "item_id": "P005"}
    {"cust_id": "1240", "event_time": "2023-02-01 10:35:00", "item_id": "P003"}
    {"cust_id": "1241", "event_time": "2023-02-01 10:40:00", "item_id": "P006"}
    {"cust_id": "1242", "event_time": "2023-02-01 10:45:00", "item_id": "P007"}
    

    每当我们将一条事件写入 Kafka ,扩充后的数据就会立即出现在 ClickHouse 的表中,每次查询表时,您都将看到新增了一行数据。

    当将所有上述信息 (Message) 写入 Kafka 后,您应该会看到类似如下内容。

    SELECT * FROM enriched_cart_events;
    
    ------ RESULTS
    
    ┌custid┬─────eventtime──┬─name───────┬─price─┬─category─┐
    │ 1232 │ 23-02-01 18:05:00.00│ Blue Jeans      │  39.95 │ Apparel     │
    │ 1234 │ 23-02-01 18:01:00.00│ Red T-Shirt     │   9.99 │ Apparel     │
    │ 1235 │ 23-02-01 18:10:00.00│ Smart Watch     │ 199.99 │ Electronics │
    │ 1236 │ 23-02-01 18:15:00.00│ Red T-Shirt     │   9.99 │ Apparel     │
    │ 1237 │ 23-02-01 18:20:00.00│ Yoga Mat        │  29.95 │ Fitness     │
    │ 1238 │ 23-02-01 18:25:00.00│ Blue Jeans      │  39.95 │ Apparel     │
    │ 1239 │ 23-02-01 18:30:00.00│ Wireless Phones │  99.99 │ Electronics │
    │ 1240 │ 23-02-01 18:35:00.00│ Smart Watch     │ 199.99 │ Electronics │
    │ 1241 │ 23-02-01 18:40:00.00│ Coffee Mug      │   5.99 │ Kitchen     │
    └───┴─────────────┴──────────┴─────┴───────┘
    

    总结

    在本文中,为简单起见,我们以一个流-表 join 为例,演示如何扩充数据。而在实际场景中,在数据传输到 ClickHouse 前,我们可以进行多流 join ,并对流中的数据进行过滤和聚合。通过 RisingWave 的实时数据导入和转换能力,您可以确保 ClickHouse 接收到的是可立即用于深入分析的高质量数据。这两者是一个完美组合。

    关于 RisingWave

    RisingWave 是一款基于 Apache 2.0 协议开源的分布式流数据库,致力于降低流计算使用门槛。RisingWave 采用存算分离架构,实现了高效的复杂查询、瞬时动态扩缩容以及快速故障恢复,帮助用户轻松快速搭建稳定高效的流计算系统。使用 RisingWave 处理流数据的方式类似使用 PostgreSQL ,通过创建实时物化视图,让用户能够轻松编写流计算逻辑,并通过访问物化视图来进行即时、一致的查询流计算结果。了解更多:

    官网:risingwave.com

    文档:risingwave.dev

    GitHub:risingwave.com/github

    Slack:risingwave.com/slack

    使用教程: https://www.risingwavetutorial.com/

    微信公众号:RisingWave 中文开源社区

    社区用户交流群:risingwave_assistant

    3 条回复    2023-12-02 10:32:06 +08:00
    ijk0
        1
    ijk0  
       2023-12-01 22:02:58 +08:00 via iPhone
    学习学习,前几天才知道这个项目
    liprais
        2
    liprais  
       2023-12-01 22:29:22 +08:00   ❤️ 1
    @livid 恶意推广
    Livid
        3
    Livid  
    MOD
       2023-12-02 10:32:06 +08:00
    @hezijiangjiang 这个主题已经被移动到 /go/promotions

    请阅读 V2EX 的节点使用指南 https://www.v2ex.com/help/node
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1691 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 26ms · UTC 16:29 · PVG 00:29 · LAX 08:29 · JFK 11:29
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.