关注公众号
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
智能制造云办公系统 [SpringBoot2 – 快速开发平台],适用于制造业、建筑业、汽车行业、互联网、教育、政府机关等机构的管理。包含文件在线操作、工作日志、多班次考勤、CRM、ERP 进销存、项目管理、EHR、拖拽式生成问卷、日程、笔记、工作计划、行政办公、薪资模块、动态表单、知识库、公告模块、企业论坛、云售后模块、生产模块、系统模块化同步模块等多种复杂业务功能。
智能制造云办公系统 skyeye v3.7.23 发布 ,更新内容如下:
- 菜单权限管理新增数据权限控制,支持多个分组设定
- 角色权限分配新增数据权限分配,支持不同的人同一个页面查看到不同的数据
- 跟单信息删除 create_name 字段
- CRM列表查询整改,删除我创建的客户信息,删除我负责的客户信息
- 表格插件升级
- PC端管理APP菜单功能整改,按照模块新增所属模块属性
erp: https://gitee.com/doc_wei01/erp-pro
OA: https://gitee.com/doc_wei01/skyeye
报表:https://gitee.com/doc_wei01/skyeye-report
企业版信息: https://sigusoft.com/doc/DQlRxcVRMWWVjbU1i?_from=1&disableReturnList=1 ,有问题可以联系作者,详情请看开发计划。
效果图
Eurynome Cloud 是一款企业级微服务架构和服务能力开发平台。首个全面拥抱 Spring Authorization Server 的版本,基于Spring Boot 2.7.2、Spring Cloud 2021.0.3、Spring Cloud Alibaba 2021.0.1.0、 Spring Authorization Server 0.3.1、Nacos 2.1.0 等最新版本开发,遵循SpringBoot编程思想,高度模块化和可配置化。具备服务发现、配置、熔断、限流、降级、监控、多级缓存、分布式事务、工作流等功能
平台定位
- 构建成熟的、完善的、全面的,基于 OAuth2 的、前后端分离的微服务架构解决方案。
- 面向企业级应用和互联网应用设计开发,既兼顾传统项目的微服务化,又满足互联网应用开发建设、快速迭代的使用需求。
- 平台架构使用微服务领域及周边相关的各类新兴技术或主流技术进行建设,是帮助快速跨越架构技术选型、研究探索阶段的利器。
- 代码简洁规范、结构合理清晰,是新技术开发应用的典型的、综合性案例,助力开发人员对新兴技术的学习和掌握。
[1]、更新内容
- 主要更新
- Spring Boot 版本升级至 2.7.2
- Spring Boot Admin 版本升级至 2.7.3
- Vite 版本升级至 3.0.2
- 使用 JetCache 重构已有自定义 JPA 多级二级缓存代码,将系统中所有多级缓存统一为使用 JetCache。也为后续升级 JetCache 2.7.X,实现服务本地缓存自动同步更新功能做铺垫。
- 新增极简 CMDB 配置管理服务及前端管理功能,用于管理应用系统、服务器、数据库等基础配置信息。
- 其它更新
- [修复] 修复用户密码过期状态判断条件使用错误,fix #I5I7ZZ
- [修复] 修复后端配置前端菜单角色信息,前端工程读取错误,导致菜单无法过滤问题。
- [新增] 优化数据库初始化脚本,增加菜单默认初始化数据以及测试用切换账号数据。
- [新增] 增加后端菜单数据节点排序功能,可通过 Ranking 字段进行配置。
- [优化] 优化服务间权限数据同步代码逻辑,删除不合理代码、简化同步实现逻辑。
- [删除] 删除 cache-sdk-layer 和 cache-layer-spring-boot-starter 两个模块
- 依赖更新
- redisson 版本升级至 3.17.5
- tencentcloud-sdk-java-sms 版本升级至 3.1.557
- alipay-sdk-java 版本升级至 4.31.72.ALL
- com.baidu.aip 版本升级至 4.16.10
[2]、界面预览
[3]、重要说明
由于 Spring Authorization Server 0.3.0 版本,使用 Java 11 进行代码编译。导致使用该版本在 Java 8 下代码已无法编译成功,所以必须要升级 JDK 版本。同时,考虑到 2022 年 11 月,Spring Boot 3 将会发布,最低版本要求 Java 17。因此,直接将 Java 版本升级至 17。Eurynome Cloud 2.7.0.20 ~ 2.7.0.50 均是采用 Java 17 编译运行,同时不兼容 Java 8。
不管是 Spring Authorization Server 还是本项目 Eurynome Cloud,各路网友均不主张在现阶段直接将 Java 升级 17,而是希望继续兼容 Java 8,在 Spring Boot 3 发布以后再统一升级为默认使用 Java 17
因此 Spring Authorization Server 0.3.1 版本,代码降级兼容了 Java 8。Eurynome Cloud 也同步进行了代码的降级兼容处理,以兼容 Java 8。经过验证,目前 Erurynom Cloud 在 Java 8、11、17 环境下均可以正常稳定运行
Spring Authorization Server 发布两个版本,Eurynome Cloud 使用的 Java 版本就跟着变,升到 Java 17 又跟着降回 Java 8,折腾一圈浪费功夫。看似折腾实则不然,经此一役,Eurynome Cloud 已经完全支持 Java 8 Java 11 和 Java 17,未来升级使用 Spring Boot 3 也不是问题。验证了那句话“用心认真走过的每条路都不会白走”
[4]、Eurynome Cloud 2.7.X 主要变化
-
基于 深度定制:
- 基于 ,重新构建 基础数据存储代码,替代原有 JDBC 数据访问方式,破除 原有数据存储局限,扩展为更符合实际应用的方式和设计。
- 基于 ,在 OAuth 2.1 规范基础之上,增加自定义“密码”认证模式,以兼容现有基于 OAuth 2 规范的、前后端分离的应用。
- 基于 ,在 OAuth 2.1 规范基础之上,增加自定义Social Credentials 认证模式,支持手机短信验证码、微信小程序、第三方应用登录。
- 遵照 以及 的代码规范,进行 OAuth2 认证服务器核心代码的开发,遵照其使用 Jackson 反序列化的方式, 增加大量自定义 Jackson Module。
- 支持 的标准的Token加密校验方式外,还了增加支持自定义证书的 Token 加密方式,可通过配置动态修改
- 支持 OAuth2 OIDC 认证模式,补充前端 OIDC 认证相关配置操作,以及对应的 /userinfo 接口调用支持 和 客户端注册支持
- 支持 OAuth2 Authorization Code PKCE 认证模式
- 扩展 默认的 模式,实现 Refresh Token 的创建。
- 扩展 默认的 模式,实现真正的使用 Scope 权限对接口进行验证。 增加客户端 Scope 的权限配置功能,并与已有的用户权限体系解耦
- 自定义 授权码模式登录认证页面和授权确认页面,授权码模式登录采用数据加密传输。支持多种验证码类型,暂不支持行为验证码。
-
代码结构的大规模调整和优化:
- 对原有代码进行了深度的“庖丁解牛”,严格遵照“单一职责”原则,根据各个组件的职责以及用途,将整个工程拆解细化为多个各自独立组件模块,在最大程度上降低代码间的耦合,也更容易聚焦和定位问题。
- 将通用化组件提取为独立工程,独立编译、按需选用,极大的降低系统主工程代码量。相关组件也已上传至 Maven 中央仓库,降低系统主工程工程代码编译耗时,改进和提升 CICD 效率,
- 原有主工程代码结构也进行了深化调整,代码分包更加合理,代码逻辑也更加清晰。
[5]、额外说明
- 本项目以后将主要维护 `Spring Authorization Server` 版本,原有基于 `Spring Security OAuth2` 的版本已经移至 spring-security-oauth2 分支,可以从该分支或发行版页面获取历史版本继续使用。后期会根据 ISSUE 以及使用用户反馈情况,再行决定是否继续维护 `Spring Security OAuth2` 版本。
- 基于 Vue3、Vite2、Vuetify3、Pinia 等新版前端已发布,原有基于 Vue2、Vuetify2、Typescript 开发的前端代码已移至 vue2+vuetify2+typescript 分支
前言
三此君看了好几本书,看了很多遍源码整理的 一张图进阶 RocketMQ 图片,关于 RocketMQ 你只需要记住这张图!觉得不错的话,记得点赞关注哦。
【重要】视频在 B 站同步更新,欢迎围观,轻轻松松涨姿势。一张图进阶 RocketMQ-消息存储(视频版) 查看【bilibili】
本文是“一张图进阶 RocketMQ”第 5 篇,对 RocketMQ 不了解的同学可以先看看前面 4 期:
- 一张图进阶 RocketMQ-整体架构
- 一张图进阶 RocketMQ – NameServer
- 一张图进阶 RocketMQ – 消息发送
- 一张图进阶 RocketMQ – 通信机制
前面两期我们主要分享了 RocketMQ 是如何将消息发送出去的,现在消息已经被 Netty 送上路了,接力棒已经交给了 Broker。如果我们自己来实现 Broker 会怎么实现呢?首先肯定得把消息存起来吧,不然宕机了,消息丢失了,那就离大谱了。
可是消息要以什么结构存储呢?二进制、JSON、PB?从功能上来看肯定都是可以的,那 RocketMQ 到底是怎么搞的?
解决了存储结构问题,那消息存到哪里呢?数据库,本地文件,还是对象存储服务器?从功能的角度肯定也都是可以的。可是,哪家数据库可以支持单机十万级吞吐量?那我直接统统存到数据库得了,瞎折腾些啥。难道存在本地文件就可以了?我们自己实现不可以,但是 RocketMQ 可以,那 RocketMQ 有什么黑科技呢?
所以我们今天就来聊一聊 Broker 如何存储消息,【首先明确我们的目标】我们需要先了解 RocketMQ 的存储结构,也就是消息是如何组织的。了解了存储结构,我们才能更好的理解存储流程,不然我们不知道为什么流程是这样的。最后我们需要了解有哪些机制支撑 RocketMQ 单机十万级吞吐量。
存储架构
消息在 Broker 上的存储结构如上图,所有相关文件放在 ROCKETMQ_HOME 下,有哪些文件呢?存放消息本身的 CommitLog,以及消息的索引文件 ConsumeQueue 和 IndexFile:
- CommitLog
从物理结构上来看,所有的消息都存储在 CommitLog 里面,其实就是所有的消息按照“消息在 CommitLog 各字段示意图”所示,挨个按顺序存储到文件中。
单个 CommitLog 文件大小默认 1G ,文件名长度为 20 位,左边补零,剩余为起始偏移量。比如 00000000000000000000 代表了第一个文件,起始偏移量为 0,文件大小为 1G=;当第一个文件写满了,第二个文件为 0000000000,起始偏移量为 ,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件。CommitLog 顺序写,可以大大提高写入效率。
但是问题来了,消息发送的时候我们指定了 Topic,现在所有 Topic 都顺序个写入到 CommitLog,存入的时候是安逸了(顺序写),但是获取消息可就麻烦了。如果我要获取某个 Topic 的消息,需要遍历 commitlog 文件,根据 topic 过滤消息。CommitLog 这个渣男,只管自己爽。有什么办法可以提高消息查询效率呢?
- ConsumeQueue
我们再回忆一下,消息存入的时候是指定了 Topic,同时我们也说了每个 Topic 会对应多个 ConsumeQueue( queueId 标识)。关键就在 ConsumeQueue 上,ConsumeQueue 是指定 Topic 消息的索引文件,怎么理解呢?从“消息在 ConsumeQueue 各字段示意图”可知,每个条目共 20 个字节,分别为 8 字节的 commitlog 物理偏移量、4 字节的消息长度、8 字节 tag hashcode,单个文件由 30W 个条目组成,可以像数组一样随机访问每一个条目,每个 ConsumeQueue 文件大小约 5.72M。ConsumeQueue 文件可以看成是基于 topic 的 commitlog 索引文件。Consumer 即可根据 ConsumeQueue 来查找待消费的消息。
因为 ConsumeQueue 里只存偏移量信息,所以尺寸是有限的,在实际情况中,大部分的 ConsumeQueue 能够被全部读入内存,所以这个中间结构的操作速度很快,可以认为是内存读取的速度。此外为了保证 CommitLog 和 ConsumeQueue 的一致性,CommitLog 里存储了 ConsumeQueues、Message Key、Tag 等所有信息,即使 ConsumeQueue 丢失,也可以通过 CommitLog 完全恢复出来。 ConsumeQueue 文件夹的组织方式如下:topic/queue/file 三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。
- IndexFile
IndexFile 是另一种可选索引文件,提供了一种可以通过 key 或时间区间来查询消息的方法。 IndexFile 索引文件其底层实现为 hash 索引,类似于 Java 1.7 HashMap,计算 Key 的 hashcode,hashcode 取余得到 hash 槽,拉链法解决哈希冲突。Index 文件的存储位置是:$HOME storeindex${fileName},文件名 fileName 是以创建时的时间戳命名的,固定的单个 IndexFile 文件大小约为 400M,一个 IndexFile 可以保存 2000W 个索引。
所以,RocketMQ 消息存储架构主要有 CommitLog,ConsumeQueue,IndexFile 构成。我们发送一条消息,会先格式化成“消息在 CommitLog 各字段示意图”中的样子,顺序写入 CommitLog 中,然后 Broker 会按照 ”消息在 ConsumeQueue 各字段示意图“所示构建一条索引记录,存入该消息所属 Topic 的 ConsumeQueue 索引文件中。如果有 IndexFile,还会构建 IndexFile。
现在我们已经知道了 RocketMQ 消息的存储结构,接下来我们的就要了解 RocketMQ 是如何构建 CommitLog、ConsumeQueue 和 IndexFile,以及 RocketMQ 如何保证性能,支撑单机十万级吞吐量的?这是本文的主要目标,一定要抓住主要目标,不要走丢咯。
启动流程
了解了 RocketMQ 消息在磁盘中是怎么存储的,我们就可以来看看具体的存储流程了。首先,还是先来看看 Broker 的启动流程。初始化过程都是这个鸟样,只看初始化过程完全不知所云,但是不看初始化过程,直接看具体执行流程也是摸不着头脑,一堆组件不知道从哪里来的,所以我们还是先耐着性子大致看看。但这并不是我们关注的重点,注意几个关键点即可。
- 初始化启动环境。部署好 RocketMQ 后,执行/bin/mqbroker 脚本,主要用于设置 RocketMQ 目录环境变量,例如 ROCKETMQ_HOME 。然后调用 https://www.xujun.org/bin/runbroker.sh 进入 RocketMQ 的启动入口,主要设置了 JVM 启动参数,比如 JAVA_HOME、Xms、Xmx。执行 main 函数。
- 初始化 BrokerController。该初始化主要包含 RocketMQ 启动命令行参数解析、NettyRemotingServer 初始化、Broker 各个模块配置参数解析、Broker 各个模块初始化、进程关机 Hook 初始化等过程。
- 启动 RocketMQ 的各个组件。但是这些组件并不是每一个都是核心组件,部分组件会在后面的流程中使用,这里混个眼熟,如果后面流程没有提及的大家可以暂且跳过,我们的目标是把握 RocketMQ 的核心内容,而不是每个细节。
- MessageStore:存储层服务,比如 CommitLog、ConsumeQueue 存储管理,消息刷盘,索引构建等。
- RemotingServer:普通通道请求处理服务。一般的请求都是在这里被处理的。
- FastRemotingServer:VIP 通道请求处理服务。如果普通通道比较忙,那么可以使用 VIP 通道,一般作为客户端降级使用。
- BrokerOuterAPI:Broker 访问对外接口的封装对象。
- PullRequestHoldService:Pull 长轮询服务。
- ClientHousekeepingService:清理心跳超时的生产者、消费者、过滤服务器。
- FilterServerManager:过滤服务器管理。
存储流程
在前面 RocketMQ 存储结构中我们了解了 RocketMQ 将所有消息顺序写入 CommitLog,然后构建 ConsumeQueue/IndexFile 索引文件,所以这个小结我们主要的目标就是看看这些文件是如何构建的。
-
Broker 启动流程中很关键的一点是启动了 NettyRemotingServer,在 RocketMQ 通信机制(视频) 中我们介绍过 Broker(NettyRemotingServer) 初始化会监听端口等待客户端连接,当客户端发送请求的时,NettyRemotingServer WorkerGroup 处理可读事件,调用 NettyServerHandler.channelRead0() 处理数据。 接着调用链到 processRequestCommand 方法,这个方法主要是根据请求中的 RequestCode,从本地缓存 processorTable 中获取相应的 Processor 来执行后续逻辑。处理器是什么?处理器的缓存从哪里来?
Processor 就是用来处理特定请求的执行者,例如,生产者存入消息使用 SendMessageProcessor,查询消息使用 QueryMessageProcessor,拉取消息使用 PullMessageProcessor。在 Broker 启动流程中有一步是注册 Processor,以 RequestCode 为 Key ,Processor 为值,添加到 processorTable 缓存中。接着 RocketMQ 消息发送(视频) 流程来看,当生产者的请求达到 Broker,Broker 获取的 Processor 应为 SendMessageProcessor。封装一个 Runable 对象,run 方法内调用 SendMessageProcessor.processRequest ,提交到线程池,继续后面的处理。
-
SendMessageProcessor.processRequest 调用 sendMessage 方法,主要包含消息的校验及重试逻辑处理,然后调用存储模块 DefaultMessageStore 存储消息。 消息校验:校验 Broker 是否配置可写,校验 Topic 名字是否为默认值,获取或创建 topicConfig,判断 queueId 是否超过限制。 重试消息处理:消费者消费失败后会将消息发回给 Broker,这里我们暂且认为就是生产者发送的请求,先看下面的流程。
-
DefaultMessageStore.putMessage 只是做了很多的校验,简单看看即可。包括:如果当前 Broker 停止工作则拒绝消息写入、Broker 为 SLAVE 角色则拒绝消息写入、当前 RocketMQ 不支持写入则拒绝消息写入、主题长度超过 256 个字符则拒绝消息写入、消息属性长度超过 65536 个字符则拒绝消息写入、PageCache 忙则报错。然后调用 CommitLog.putMessage 存入消息。
-
看到这里应该稍微熟悉一些了,终于到我们期待已久的 CommitLog 出场了。主要是延迟消息处理,然后获取可以写入的 CommitLog 进行写入。 延迟消息处理:如果消息的延迟级别大于 0,将消息的原主题名称与原消息队列 ID 存入消息属性中,用延迟消息主题 SCHEDULE_TOPIC、消息队列 ID 更新原先消息的主题与队列,这是并发消息消费重试关键的一步。但不是这个本节的主要目标,后文会进一步分析。 关键点在如何获取可以写入的 CommitLog。存储结构小节里面有提到每个 CommitLog 默认大小 1G,写完一个文件,以偏移量命名创建下一个文件。每个 1G 大小 CommitLog 的在代码层面对应的是 MappedFile,而多个 MappedFiled 组成 MappedFileQueue。逻辑上的 CommitLog 通过持有 MappedFileQueue 管理多个 MappedFile。所以,获取可以写入的 CommitLog 也就是获取 MappedFileQueue 最后一个 MappedFile,为什么是最后一个,因为前面的已经写完了呀。来看看 RocketMQ 逻辑与物理存储的对应关系应该能够更直观的理解。
-
获取到最后一个 MappedFile 后,调用 MappedFile.appendMessage 将消息追加到该文件中。可是尽管是顺序写入,但是连小学生都知道写磁盘还是很慢,难道想这样支撑 RocketMQ 单机十万吞吐量?too young too simple!从逻辑存储结构和物理存储结构的映射关系来看,MappedFile 持有物理 CommitLog 的 fileChannel (Java NIO 文件读写的通道),通过 fileChannel 可以访问物理 CommitLog 文件,但是 RocketMQ 并没有直接使用 fileChannel,而是映射到一个 MappedByteBuffer,我们的目的就是把消息写入这个 ByteBuffer 中,进而写入 MappedFile 对应的 CommitLog 文件。为什么需要这样做,还有哪些细节,会在”文件内存映射“小结中为大家解答。
-
继续看流程,得到 MappedFile 对应的 ByteBuffer,我们需要将消息序列化,写入 ByteBuffer 中。
- 构建消息 id, createMessageId
- 获取该消息在消息队列的偏移量,CommitLog 中保存了当前所有消息队列的当前待写入偏移量。
- 判断是否是事务消息:这里主要处理 Prepared 类型和 Rollback 类型的消息,设置消息 queueOffset 为 0
- 计算消息总大小,calMsgLength。
- 判断文件的剩余空间,是否足够写入当条消息,如果不可以,则将文件末尾写入剩余空间大小+固定魔数;然后返回一个 END_OF_FILE 的结果
- 如果空间足够,这将这条消息写入之前得到的 MappedFile 的 ByteBuffer 中。
- 将各字段按照”消息在 CommitLog 各字段示意图“存入 Bytebuffer,然后返回 PUT_OK 结果
总结
以上就是今天 RocketMQ 消息存储的主要内容,消息只是写入到 CommitLog 对应的 ByteBuffer中,下一期就是我们重要的零拷贝即将登场。我们简单总结一下今天的内容:
- 要理解消息的存储流程需要先知道消息的存储结构:在物理上消息挨个顺序写入 CommitLog,为了提升消息查询效率需要构建消息的索引文件 ConsumeQueue/IndexFile;
- Broker 启动时进行参数解析,并初始化了 NettyRemotingServer,启动存储服务用于消息存储及索引构建等;
- Broker 收到消息存储请求,经过层层校验,获取 CommitLog 对应的 MappedFile,将消息写入MappedFile 对应的内存映射ByteBuffer;
以上就是今天全部的内容,如果觉得本期的内容对你有用的话记得点赞、关注、转发、收藏,这将是对我最大的支持。
如果你需要 RocketMQ 相关的所有资料,可以评论区留言,或者关注公众号:三此君。回复:mq,即可。
消息已经写入 ByteBuffer,写入 ByteBuffer 就可以了吗?那收到消息直接丢弃岂不是更好。消息要落在磁盘上才不会丢失,所以下一期我们要分享的就是消息的刷盘及索引构建,PageCache 及零拷贝也将闪亮登场。感谢观看,下期不见不散。 PhpStorm激活2022.1.4
参考文献
- RocketMQ 官方文档
- RocketMQ 源码
- 丁威, 周继锋. RocketMQ技术内幕:RocketMQ架构设计与实现原理. 机械工业出版社, 2019-01.
- 李伟. RocketMQ分布式消息中间件:核心原理与最佳实践. 电子工业出版社, 2020-08.
- 杨开PhpStorm激活2022.1.4. RocketMQ实战与原理解析. 机械工业出版社, 2018-06.
2024最新激活全家桶教程,稳定运行到2099年,请移步至置顶文章:https://sigusoft.com/99576.html
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。 文章由激活谷谷主-小谷整理,转载请注明出处:https://sigusoft.com/178413.html