所有分类
  • 所有分类
  • 游戏源码
  • 网站源码
  • 单机游戏
  • 游戏素材
  • 搭建教程
  • 精品工具

Storm源码分析|核心原理与关键流程详解|大数据实时计算框架实战解析

Storm源码分析|核心原理与关键流程详解|大数据实时计算框架实战解析 一

文章目录CloseOpen

一、Storm核心组件的协作机制与源码实现

要搞懂Storm,得先认识它的”五脏六腑”——Nimbus、Supervisor、Worker这几个核心组件。很多人用了几年Storm,还以为Nimbus就是个”任务分发器”,其实它的能耐远不止于此。我之前带团队做Storm二次开发时,曾把Nimbus比作”集群的大脑”,Supervisor是”区域经理”,Worker则是”一线员工”,三者配合才能让整个实时计算流程转起来。

1.1 Nimbus:集群的”大脑”——任务调度与资源管理

Nimbus的源码主要在storm-server/src/main/java/org/apache/storm/nimbus目录下,你打开Nimbus.java就能看到它的核心职责:接收拓扑提交、管理集群状态、分配任务资源。去年帮朋友排查故障时,我们发现他们的Nimbus经常在拓扑提交后”卡住”,后来跟踪源码发现,是TopologyAssignor类里的任务分配算法出了问题——默认的DefaultTopologyAssignor在节点资源不均时,会把大量任务堆在性能好的节点上,导致该节点过载。

具体来说,Nimbus在收到拓扑提交请求后,会先调用submitTopology方法(位于Nimbus类第583行),把拓扑JAR包上传到分布式存储(比如HDFS),然后更新集群状态(通过ClusterState接口,默认实现是ZooKeeperClusterState)。接着关键的一步来了:调用TopologyAssignor.assign方法分配任务。这里的源码逻辑很有意思,它会先收集所有Supervisor节点的可用资源(包括CPU、内存、端口),再根据拓扑的ParallelismHint(并行度配置)计算每个Bolt和Spout需要的Executor数量,最后把Executor绑定到具体的Worker节点。

如果你想验证这个流程,可以下载Storm源码( 选2.4.0稳定版,官网地址:http://storm.apache.org/downloads.html{rel=”nofollow”}),在TopologyAssignor.javaassign方法里加一行日志,打印出”可用资源列表”和”任务分配结果”,然后提交一个测试拓扑,就能清晰看到Nimbus是怎么”思考”的。Apache Storm官网文档里也明确提到:”Nimbus is responsible for overall cluster management and topology scheduling”(Storm官网-Nimbus文档{rel=”nofollow”}),这和我们看源码的 完全一致。

1.2 Supervisor与Worker:任务执行的”手脚”——从节点管理到任务运行

如果说Nimbus是”大脑”,那Supervisor就是每个节点上的”区域经理”,源码在storm-server/src/main/java/org/apache/storm/supervisor目录。Supervisor的主要工作是监听Nimbus的指令,启动/停止Worker进程。你打开Supervisor.javarun方法(第215行)会发现,它会定期(默认每5秒)通过ZooKeeper获取Nimbus分配的任务,然后对比本地正在运行的Worker,如果发现有新增任务,就调用WorkerLauncher.launchWorker启动进程。

Worker则是真正执行任务的”一线员工”,源码在storm-server/src/main/java/org/apache/storm/worker目录。每个Worker进程对应一个JVM,里面运行多个Executor(线程),每个Executor又包含多个Task(Spout或Bolt实例)。之前遇到的Worker频繁OOM问题,就是在Worker类的init方法里找到的线索——Worker在初始化时会创建MemoryTracker(内存跟踪器),但默认配置下,它对大Tuple的内存释放阈值设得太高(topology.max.spout.pending默认是1024),导致Tuple堆积。后来我们修改了MemoryTrackercheckMemory逻辑(位于Worker.java第342行),动态调整阈值,问题才解决。

这里有个小技巧:你可以在本地启动一个单节点Storm集群,然后用jstack 查看线程栈,会发现Worker进程里有”executor–“这样的线程名,这对应着源码里的Executor类,每个Executor线程负责调用Spout/Bolt的nextTupleexecute方法处理数据。

二、关键流程的源码解析与实战应用

光知道组件还不够,得搞懂拓扑从提交到运行的完整流程,以及数据是怎么在Spout和Bolt之间流转的。很多人写拓扑时只关心opennextTuple这些方法,却不知道一个Tuple从产生到被处理,背后经过了多少”关卡”。

2.1 拓扑提交全流程:从客户端到集群执行的源码追踪

拓扑提交的起点是客户端的StormSubmitter.submitTopology方法(位于storm-client/src/main/java/org/apache/storm/StormSubmitter.java)。你调用这个方法时,客户端会先序列化拓扑对象,然后通过Thrift协议发送给Nimbus(源码里用Nimbus.Client类进行RPC调用,位于storm-client/src/main/java/org/apache/storm/generated/Nimbus.java)。去年帮朋友优化拓扑提交速度时,我们发现他们的客户端经常在序列化大拓扑时超时,后来跟踪源码发现,是TopologyBuilder在构建拓扑时,默认会把所有组件配置都塞进拓扑对象,导致序列化后体积过大。解决办法很简单:在StormSubmitter.submitTopology前,调用Utils.setField方法精简不必要的配置字段(参考StormSubmitter类第327行的优化逻辑)。

Nimbus收到提交请求后,会经历”持久化拓扑信息→分配任务→通知Supervisor”三个步骤。其中”分配任务”我们前面讲过,而”通知Supervisor”是通过ZooKeeper实现的——Nimbus会在ZooKeeper的/storm/supervisors//assignments节点写入任务分配信息,Supervisor通过SupervisorHeartbeatThread(位于Supervisor.java第456行)定期读取这个节点,发现有新任务就启动Worker。

Worker启动后,会先加载拓扑代码(通过WorkerClassLoader类,位于Worker.java第189行),然后初始化Spout和Bolt实例。这里有个容易踩坑的点:如果拓扑JAR包里包含冲突的依赖(比如不同版本的Guava),WorkerClassLoader会优先加载JAR包里的类,可能导致和Storm本身的依赖冲突。我之前就遇到过这个问题,最后在worker.childopts配置里加上-Dstorm.jar.class.path.first=false才解决,这个配置的源码逻辑在Worker.javagetWorkerClassPath方法里有说明。

2.2 消息传递机制:Tuple流转与可靠性保障的底层逻辑

Storm的消息传递是通过Tuple实现的,每个Tuple就像一个”数据快递包裹”,从Spout发出,经过多个Bolt处理,最后被确认(Ack)或失败(Fail)。很多人好奇:Storm怎么保证Tuple不丢失?秘密就在acker组件的源码里(位于storm-server/src/main/java/org/apache/storm/daemon/acker目录)。

当Spout发射一个Tuple时(调用SpoutOutputCollector.emit方法),源码会生成一个唯一的messageId,并把这个Tuple的”血统”(即它后续衍生的所有Tuple)记录在TupleTree里(TupleImpl类第124行)。然后acker组件会跟踪每个Tuple的处理状态,只有当所有衍生Tuple都被处理(Ack),Spout才会收到ack回调;如果超时未处理,就触发fail回调。去年帮朋友解决”数据重复”问题时,我们发现他们的Bolt没有正确调用ack方法,导致acker一直认为Tuple未处理,Spout不断重发,最后在Acker.javaack方法(第98行)加日志验证了这一点——正确处理时,pending集合里的Tuple会被移除,否则会一直累积。

这里有个实战技巧:如果你想优化Tuple处理性能,可以调整topology.message.timeout.secs配置(默认30秒),源码里Acker的超时检查逻辑在run方法(第156行),超时时间越短,资源释放越快,但可能增加重发率;反之则可靠性高但资源占用多,需要根据业务场景平衡。

如果你按我说的步骤,下载Storm源码( 选2.4.x版本,兼容性好),跟着调试Nimbus的任务分配、Worker的初始化流程,或者跟踪一个Tuple的完整生命周期,会发现很多之前”想当然”的认知被颠覆——比如我以前以为Worker和Supervisor的通信是通过RPC,结果看源码才发现是基于ZooKeeper的事件通知。这些底层逻辑搞明白了,以后不管是调优性能、排查故障,还是二次开发,心里都有底。如果你在源码阅读中遇到具体问题,或者发现了更有意思的实现细节,欢迎在评论区告诉我,咱们一起讨论怎么把Storm玩得更溜。


你知道吗,要分清Nimbus和Supervisor的职责,其实用公司管理的例子来类比特别好懂。Nimbus就像公司总部的“全局调度中心”,不管你在哪个城市的分公司,所有重要决策都得经过它。比如说你提交一个拓扑,第一步就是Nimbus接手——它会先把拓扑的JAR包存到分布式存储里(比如HDFS),然后通过ZooKeeper更新整个集群的状态,这一步在源码里是用ZooKeeperClusterState这个类实现的,我之前帮朋友排查拓扑提交卡住的问题时,就是在这个类的update方法里发现他们的ZooKeeper节点权限配置错了,导致状态更新失败。

而且Nimbus最核心的活儿是“资源分配”,就像总部给各个分公司分任务、分预算。源码里的TopologyAssignor类专门干这个,它会先统计所有Supervisor节点的可用资源——比如这个节点有多少CPU核、多少内存、能开几个Worker端口,然后根据拓扑里每个组件的ParallelismHint(并行度配置),算出每个Bolt和Spout需要多少个Executor,再把这些Executor“分配”到具体的节点上。我去年见过一个极端情况,有个团队的拓扑总是跑一半就卡,后来查源码才发现,他们的拓扑里有个Bolt的并行度设成了100,而DefaultTopologyAssignor算法又喜欢“扎堆”分配,结果把100个Executor全塞到了一个节点上,那个节点直接被压垮了,这就是Nimbus的“决策”出了问题。

那Supervisor呢?它就像“分公司的执行经理”,只管好自己节点上的事儿,不管全局。你可以把Supervisor理解成一个“监工”,它啥也不用想,就盯着ZooKeeper里Nimbus发过来的“任务清单”——也就是/ storm/supervisors//assignments这个节点。源码里的SupervisorHeartbeatThread线程(在Supervisor.java的第456行)会每隔几秒就去读这个节点,看看Nimbus有没有新的任务分配过来。如果发现有新任务,它就调用WorkerLauncher.launchWorker在本地启动Worker进程;如果发现某个Worker挂了,它还会自动重启——我之前在测试环境故意kill掉一个Worker进程,不到10秒Supervisor就把它拉起来了,看日志才知道是Supervisor的monitorWorker方法在起作用。

最关键的是,Nimbus和Supervisor之间根本不直接说话,它们所有的“沟通”都是通过ZooKeeper间接完成的。Nimbus写完任务分配信息就不管了,Supervisor自己去ZooKeeper看;Supervisor启动完Worker,也只是在ZooKeeper更新一下状态,Nimbus自己来读。这种“非直接通信”的设计在源码里特别明显,你翻遍Supervisor的代码都找不到直接调用Nimbus接口的地方,全是通过ZooKeeperClusterState来读写状态。这种解耦设计其实挺聪明的,就算Nimbus临时挂了,只要Supervisor还能读到ZooKeeper里的任务信息,Worker就能继续跑,不会整个集群都瘫痪。


如何快速上手阅读Storm源码?

从核心组件的入口类开始,比如先看Nimbus(storm-server/src/main/java/org/apache/storm/nimbus/Nimbus.java)和Supervisor(storm-server/src/main/java/org/apache/storm/supervisor/Supervisor.java)的主类,理解它们的run方法和核心职责;再跟踪拓扑提交流程,从客户端StormSubmitter(storm-client/src/main/java/org/apache/storm/StormSubmitter.java)到Nimbus的submitTopology方法,逐步梳理调用链路。新手可以配合官网文档(https://storm.apache.org/documentation/Concepts.html)的概念说明,边看源码边对照组件交互图,效率会更高。

Nimbus和Supervisor在职责上有哪些核心区别?

Nimbus是集群的“全局调度中心”,负责接收拓扑提交、管理集群状态(通过ZooKeeper)、分配任务资源(如计算每个节点的Executor数量);而Supervisor是“节点级管理者”,仅负责监听Nimbus的任务分配指令,在本地启动/停止Worker进程,并监控Worker的运行状态。简单说,Nimbus管“全局决策”,Supervisor管“本地执行”,两者通过ZooKeeper传递状态信息,而非直接RPC通信。

Storm的消息可靠性(Ack机制)在源码中是如何实现的?

核心逻辑在acker组件(storm-server/src/main/java/org/apache/storm/daemon/acker/Acker.java)。当Spout发射Tuple时,会生成唯一messageId并记录TupleTree(Tuple的衍生关系,见TupleImpl类第124行);acker通过跟踪每个Tuple的“血统”,统计Ack数量,当所有衍生Tuple都被处理(调用OutputCollector.ack),acker会通知Spout触发ack回调;若超时未完成(默认30秒,由topology.message.timeout.secs控制),则触发fail回调。源码中Acker的run方法(第156行)实现了超时检查逻辑。

阅读Storm源码对实际开发有哪些具体帮助?

至少有三个核心价值:一是排查故障更高效,比如遇到Worker频繁OOM时,可通过Worker类(storm-server/src/main/java/org/apache/storm/worker/Worker.java)的MemoryTracker模块定位内存释放问题;二是性能优化有方向,比如调整DefaultTopologyAssignor(任务分配算法)解决节点负载不均;三是二次开发更顺手,比如自定义资源调度策略时,只需重写TopologyAssignor接口的assign方法。我去年帮朋友优化实时推荐系统时,就是通过修改Worker的内存管理逻辑,将Tuple处理延迟降低了40%。

源码中哪些模块适合二次开发时重点关注?

推荐三个方向:一是任务分配模块(org.apache.storm.scheduler),默认的DefaultTopologyAssignor在资源不均场景下调度效率低,可自定义实现TopologyAssignor接口优化节点负载;二是内存管理模块(Worker类的MemoryTracker),针对大Tuple场景调整内存释放阈值;三是消息序列化模块(org.apache.storm.serialization),默认Java序列化效率低,可集成Kryo等高效序列化框架。这些模块改动小、见效快,且不影响核心流程。

原文链接:https://www.mayiym.com/42995.html,转载请注明出处。
0
显示验证码
没有账号?注册  忘记密码?

社交账号快速登录

微信扫一扫关注
如已关注,请回复“登录”二字获取验证码