Pulsar特性在AI场景中的使用
创始人
2025-12-22 19:17:51

作者 | 谙流科技魏祥臣

没有意外,随着模型规模的持续增长和应用场景的日益复杂,AI Infra 也自然的从"单体架构" -> "分布式架构"进行演进,例如:

  • 在大模型训练和推理阶段,随着模型规模的增长,需要通过 多维度并行技术(数据并行、张量并行、流水线并行等)并发使用数百甚至数千个 GPU 才能满足训练需求;

  • 在智能体应用阶段,从能对话、写文案的 Chatbot 到如今能自主规划、工具调用、多 Agent 协作,工具越来越智能, 调用链也越来越复杂

  • 再到各行业落地时,应用的业务主路径开始集成 AI 能力,也对部署架构本身的 高可靠、高可用及高性能提出了更多的要求

然而这个从单体架构到分布式架构的升级,最核心的变化就是通过消息中间件让 数据、模型、服务之间能够 异步、可靠、松耦合地协同工作,从而构建可扩展、可维护、可演进的 AI 平台的基础设施。

Pulsar 作为消息中间件的中流砥柱,以其更鲜明的存算分离、云原生特性,可发挥着更大的价值。

1多模态:让 Pulsar 直接吞进 超大消息,多模态训练“零”切片

多模态 AI 系统处理的数据类型远超传统文本,包含了图像、视频、音频、3D 点云等大体积的非结构化数据。这些数据单个文件的大小就可能从几 MB 到几 GB 不等。其他的消息队列系统往往对单条消息的大小有严格限制(例如,Kafka 默认单条消息上限约 1 MB,调参后虽可放大,但需权衡副本同步压力。),这迫使开发者在传输大文件时采用复杂的变通方案,如将文件存储在对象存储中,然后在消息中只传递文件的路径或 URL。

这种方式虽然可行,但增加了系统的复杂性和处理延迟,并且无法充分利用消息队列在数据流管理和处理方面的优势 。

然而 Pulsar 原生支持超大消息体,即 pulsar 的 chunk message,Pulsar 的 Chunk Message 是多模态训练的 数据管道利器,它解决了大消息传输的完整性、顺序性、简化性三大问题,可显著降低多模态数据管道的工程负担,使开发者聚焦模型逻辑而非传输细节。

Pulsar Chunk Message(分块消息)是 Apache Pulsar 提供的一种用于透明处理超大消息(>5MB)的机制。它允许生产者端自动将大消息拆分为多个小块传输,并在消费者端自动重组,业务层无需感知分块细节。

2多模态:用 Pulsar 把文本、图像、音频流绑定到一起

多模态 AI 需要处理和融合的数据类型极其多样化。系统需要同时处理文本(自然语言描述)、图像(像素矩阵)、音频(波形信号)、视频(图像序列和音频流的组合)等多种异构数据。

在许多场景中,不同模态的数据在时间上存在紧密的依赖关系。例如,在视频理解任务中,音频中的对话内容需要与视频中人物的口型、动作在时间上精确对齐;在自动驾驶场景中,激光雷达的点云数据、摄像头的图像数据和 GPS 的定位数据必须在同一时间点或时间窗口内进行融合,才能构建出对周围环境准确的感知。因此,消息中间件不仅要能传输数据,还需要提供机制来保证跨模态数据的时间同步和顺序性 。

利用 Pulsar 的 keyshare 消费模型,可以将同一 key 的数据总是被路由分配到同一实例完成聚合,方案如下:

  • 时间同步:选定一个物理时钟源(PTP/NTP/ 帧时钟),所有模态 Producer 在本地打时间桶 ID(t-bucket),粒度 = 1 ms 或 1 帧间隔。

  • Produce 发送:每条消息把 t-bucket 放在 Pulsar 的事件时间( eventTime ,SDK 原生字段)里,同时作为路由 Key。

  • 收到模态 A、B、C 的同一桶消息后,再打包成一条 MultiModalFrame 喂给模型;

3模型训练:用好 Pulsar 压缩 Topic,实现 Checkpoint 秒级断点续训

模型训练周期长、数据量大、集群规模大,出现中断的概率显著提高,且重启代价高昂;

所以通常会使用 Checkpoint 机制来加速恢复的过程,但保存 Checkpoint 耗时较高,若存储服务瞬时故障,写入请求直接丢失,导致训练状态丢失。

引入 pulsar 作为中间层后,可以将异常数据跳过、Checkpoint 异步缓存、任务级重试等操作都交给 pulsar 的特性来解决,方案如下:

Checkpoint 数据具有明显的历史消息无效的特性,如果发生积压时,只有最新的一条 checkpoint 才有价值,这时可以使用 Pulsar 的压缩 Topic (Compaction Topic)压缩 topic 将 Checkpoint Topic 从日志流变为 KV 存储,仅保留每个 Key 的最新消息,自动清理历史版本,这样对比传统方案(扫描 S3 文件列表 → 排序 → 下载)需要耗时 3-5 分钟到直接接收最新 Key 的方案,耗时<1S;

Compaction Topic是 Apache Pulsar 提供的一种 基于消息 Key 的日志压缩机制,它会自动清理主题中每个 Key 的旧版本消息,仅保留 最新版本,从而显著减少主题体积、加速消费回溯,适用于"只关心最终状态"的场景。

4模型训练:以 Pulsar 为“输油管”:优化模型训练中的 GPU 饥饿

在大规模模型训练中,数据是驱动整个训练过程的“燃料”,特别是针对拥有数十亿甚至万亿级参数的深度学习模型,能高效且稳定的确保“燃料”能够持续、稳定地供应给计算引擎(如 GPU 集群)是关键所在。

训练这些庞然大物需要海量的训练数据,这些数据通常以 TB 甚至 PB 计。数据加载和预处理的速度直接决定了 GPU 这一昂贵计算资源的利用率。有数据表明 I/O 延迟使 GPU 每步等待数百毫秒,空闲率可达 30-50%。为了充分利用昂贵的计算资源,必须确保数据能够以足够快的速度被加载到每个计算节点的内存中,如果数据供给速度跟不上模型消耗数据的速度,就会有大量时间浪费在等待数据上,即所谓的“数据饥饿”问题。

历史的架构中,数据预处理模块与训练模块存在耦合的情况,然而耦合的模块可能相互影响从而降低了 GPU 的读取效率;

这种架构中,非常适合引入 Pulsar 在其中作为缓冲层,在数据平面预处理服务独立扩展,训练节点只专注消费,利用 Pulsar 的 高吞吐特性,“喂养”GPU 的数据高速且稳定;

并且当 GPU 消费慢时,还可以利用 Pulsar 的 背压机制,预处理消费时自动降低预取速率,避免 OOM,从而让整个链路更加健壮;

不止如此,还可以继续针对 topic 的消费进行积压监控,如果出现积压,辅以 K8S 的 KEDA 机制+ Pulsar 的 S hare 消费类型可以整个扩缩容过程更加平滑和稳定;

  • 背压(Backpressure) 是 Pulsar 中用于 防止生产者过载消费者的流量控制机制。当消费者处理速度跟不上生产者发送速度时,系统通过 多级反馈控制主动减缓上游生产速率,避免内存溢出、数据丢失和系统崩溃。

  • KEDA(Kubernetes Event-driven Autoscaling),是一种基于事件驱动的自动扩容解决方案,支持通过外部事件源动态调整 Pod 副本数;

5智能体:利用 Pulsar 轻量化主题(non-persistent)解决 AI 应用的异步通信难题

模型迭代日新月异,企业正在积极把 AI 能力嵌入业务流程。然而,企业应用从调用传统微服务应用 API 接口 到 调用大模型“生成式”的 API 接口过程中,一个显著的特征是任务处理时耗变的很长,传统微服务应用通常能实现毫秒级响应,而 AI 应用的处理周期跨度极大——从几分钟到数小时不等;

这就意味着原本微服务间的同步调用就不再适用,可将同步调用改为 异步通知来解决长耗时的阻塞;改为异步通知后,那又如何能实现同步调用的即时通信呐,可以采取以下模型:

  • Agent1 在启动时注册一个专属于自己的用于接收回包的非持久化 Topic(non-persistent Topic),非持久化 topic 非常轻量化,数据不落盘存储,生命周期可由 TTL 自动 或人工回收,Agent1 可使用独占消费模型进行消费该 topic

  • 当 Agent1 有长耗时的调用模型请求时,向正常 topic 发送请求,并由模型处理模块处理;该 Topic 为常规 topic,具备消息持久化、消息回放、海量积压等队列特性

  • 当 LLM 处理模块完成后,根据请求包中的回包地址进行回包投递

基于此模型,可以利用 Pulsar 的 Persistent-topic,将长时耗任务进行异步化处理,利用 pulsar 的高可用、低延时的特性来保障请求任务的可靠、解耦和削峰填谷;又可以利用 pulsar 的 Non-Persistent-topic 的轻量化,实现百万级创建,快速回收等能力

Non-persistent Topic:是 pulsar 的一种 topic 类型,是“不落盘、纯内存” 的消息通道——数据不会写入磁盘、不会做副本复制,Broker 宕机或进程重启即丢失,因此极致轻量、低延迟,适合“可丢、可重试、要快、要大量”的短时消息场景。

6智能体:Pulsar 可为事件驱动的智能体提供“新基建”

AI Agent 的概念正在经历一场深刻的变革,从简单的对话式 AI(Chatbot)向复杂的独立实体转变。AI Agent 就是将一个大模型(大脑)和一系列工具(感官与四肢)组装起来,形成的一个能够感知和改变外部环境的智能程序。

以创建一个营销 Agent 为例,采用 ReAct 的模型,Agent 可能首先从 CRM 中提取客户数据,使用 API 收集市场趋势,并在新信息出现时不断调整策略。通过通过记忆保留上下文并迭代查询,Agent 能够生成更准确、更相关的输出。

当外部接口越来越丰富,Agent 需要不断的扩展收集信息来源,包括其他 Agent、工具和外部系统等等,以便做出更精准的决策。

而这,从系统架构设计的角度上讲,就是一个 分布式系统问题。这和微服务时代面临的挑战相似,因为在微服务中,各个组件必须高效地进行通信,而不产生瓶颈或僵化的依赖关系。也和微服务架构系统一样,它们的输出不仅仅应该回流到 AI 应用程序中,它们还应该流入其他关键系统,如数据仓库、CRM、CDP 和客户成功平台。所以完全可以将 Agent 理解为: 有“大脑”的微服务

从微服务的架构演进来看, Agent 的未来是事件驱动的,事件驱动的架构需要一个高效的消息中间件作为“基建”,因为消息中间的特性可以很好的匹配事件驱动需要的横向扩展性、低延迟、松耦合、事件持久化等诉求

Pulsar 除了以上在消息中间件的优势外,还提供了 Function Mesh 的能力,利用 Function 的能力可以更近一步简化 AI Agent 的架构:

  • ReAct 模式:ReAct(Reasoning and Action)是目前应用最广泛、最经典的 AI Agent 运行模式之一 。其核心思想是模拟人类解决复杂问题的过程,通过一个 “思考(Thought)→ 行动(Action)→ 观察(Observation)”的循环来逐步推进任务 。

  • Pulsar Function :Pulsar 提供的轻量级、Serverless 流处理框架,定位是“用写普通函数的代码量,完成 ETL、过滤、聚合、打标签等实时计算”。它把“消息 → 计算 → 消息”的闭环直接跑在 Pulsar 集群内部,简单场景不需要额外部署 Flink、Storm 等重型流处理引擎。

7智能体:具身智能需要“传感器流 + 任务队列”

在具身智能的场景中,既需要处理传感器读数流(连续、有序的数据),也需要处理独立的指令或任务(这些任务需要独立处理);

例如:一个机器人 agent 在处理任务时,首先机器人的视觉或遥测传感器持续发布事件流,这些事件需要按顺序来处理或者来理解当前所处环境的变化;然后当机器人的 AI 决定采取行动,例如“拾取物体”或“导航到位置”时,这些任务会被添加到工作队列中。这些行动消息可能需要多个执行器模块(消费者)会分担这些任务。每个任务消息会被分配给一个执行器,执行器在完成任务后会进行确认。如果任务失败,执行器可以发送负向确认(表示失败),然后另一个实例可以重试。

我们回顾上述的过程,虽然都是利用消息管道进行消息传递,但是这是两种不同的数据类型:

  • 类似传感器流,生产者将数据追加到一个无界、有序的日志(即流)中。消费者随后按顺序从这个日志中读取数据,并维护流中的偏移量(offset)。每个分区内的顺序是有保障的,消息在消费时不会被移除,这就是 kafka 专注的 stream(流)场景,它提供了高吞吐量和分区的严格排序,这使得它非常适合处理有序的事件流。

  • 类似任务消息,生产者将消息发送到队列,每条消息只由一个消费者处理(即使有多个消费者在监听)。消费者从队列中拉取消息,并在处理完成后确认每条消息,消息随后会从队列中移除。队列擅长分发可以并行处理且无需全局排序要求的任务或工作。这就是 rabbitmq、rocketmq 专注的 queues(消息)场景,专注于每个消费者只处理一条消息,并具备消息重试和死信队列等能力。

参考

https://huggingface.co/spaces/nanotron/ultrascale-playbook

https://www.linkedin.com/pulse/kafkas-role-powering-next-wave-event-driven-agentic-ai-jeyaraman-xq0kc

https://streamnative.io/blog/streams-vs-queues-why-your-agents-need-both--and-why-pulsar-protocol-delivers

https://dzone.com/articles/agentic-ai-using-apache-kafka-as-event-broker-with-agent2agent-protocol

相关内容

热门资讯

百千万·山海间|五口人,50年 全国每5个柚子,就有1个来自广东梅州。梅州梅县区雁洋镇的南福村是一个种柚大村。最近收获季刚过,柚农黄...
受益于“AI+黄金”,中国矿业...   炒股就看金麒麟分析师研报,权威,专业,及时,全面,助您挖掘潜力主题机会! (来源:泡财经)12...
中电电机(603988.SH)... 中电电机(603988.SH)发布公告,根据公司的业务特点和股东情况,结合国家和相关地方对“十五五”...
七年来首次!三星、海力士毛利率... 《科创板日报》12月22日讯 美光之后,存储另外两家龙头三星电子和SK海力士将交出财报成绩单。在存储...
我们的“十四五”丨贵州:残疾人...   12月22日,记者从省政府新闻办召开的新闻发布会上获悉,贵州立足实际,推动残健共融,倡导平等、融...