1.背景
对于一个生产企业来说,有几个核心的系统必不可少:ERP(企业资源计划)、MES(制造执行系统)、PCS(过程控制系统)。这三大系统构成了生产制造过程中的计划排产、生产制造、过程控制,三者独立存在,但又相互依赖。
它们之间的依赖关系如下:
- ERP向MES输送生产计划
- MES向PCS发送生产指令
- PCS向MES反馈指令执行结果
- MES向ERP反馈完工状态
假设这是一个最简单的生产制造体系,三个系统,四个调用,由于足够简单,我们完全可以通过API的方式来实现这些系统之间的数据交换。
但实际生产制造所涉及的系统要多不少,比如WMS(仓储管理系统)、PLM(产品生命周期管理)、SCM(供应链管理)、CRM(客户关系管理)、HR(人力资源)等等。这么多的系统之间如果都要互相交换数据,那么有没有一种比较简洁、依赖更小、扩展性更好的方式呢?答案是肯定的,我们先来讨论几种数据交换方式。
2.数据交换方式
2.1基于数据库
数据一般都是存储在数据库中的,那么两个系统如果想要交换数据,那么最直接的方式可能就是直接读取数据库了。比如A系统有产品数据,B系统有设备数据,A系统想要设备数据就直接去B系统的设备表里取,同样的,B系统想要产品数据就直接去A系统的产品表里取。
这样的做法确实很直接,但这种方式的问题也不少:
- 虽然可以控制表的访问权限,但无法精确到某个字段
- 相互过度依赖,自己系统调整业务表结果,还需要通知其它系统
- 无法实时感知数据的变化,需要定期全量更新到自己的数据库
- 对接的系统多了之后,对原系统的数据库访问会造成很大的压力
- 对性能的优化有限,无法在应用层做数据缓存
- 不同系统所用的数据库可能各不相同,需要适配各种类型的数据库
这些问题不仅对于提供数据库的系统不利,还对需要访问数据库的系统不友好,因此我们绝大多数情况下不会选择基于数据库的数据交换方式。
2.2基于API
基于API的数据交换方式要求每个订阅数据的系统,需要按数据提供方要求的方式定义API。比如A系统有产品数据,B系统和C系统需要产品数据,那么它们就需要按A系统的要求,定义产品新增、产品更新、产品删除等API,当A系统有产品新增、更新、删除时,同时调用B系统和C系统事先定义好的API通知对方,这种方式解决了很多基于数据库进行数据交换的问题:
- 对于不想让其它系统知道的字段可以选择不输出
- 自己系统的业务表结构调整时,保证API格式不变即可将影响降到最低
- 数据实时变化时通知其它系统
- 可以在应用层对数据做缓存,以降低对数据库的压力
- 不同系统之间不再关心数据库的类型,API的定义决定了系统之间的交互方式
许多问题通过基于API的数据交换方式都得到了很好的解决,但是当系统达到一定规模之后,API的调用就变成了一张错综复杂的网,原来简洁的方式突然变得复杂且不利于维护和扩展了,因此我们需要一种更加解耦的方式来处理各个系统之间的数据交换。
2.3基于实体+消息
ERP有订单数据,MES有物料清单、工艺数据,PCS有设备数据,WMS有仓储数据,PLM有产品数据,SCM有物流数据,CRM有客户数据,HR有员工信息,对于这样的跨系统数据共享的问题,我们首先需要定义主数据,如:
- 订单主数据
- 物料主数据
- 工艺主数据
- 设备主数据
- 库存主数据
- 产品主数据
- 物流主数据
- 客户主数据
- 员工主数据
- 组织架构主数据
有了主数据之后,我们将这份主数据存储在一个公共区域,由所有系统一同来维护和共享,我们给它一个定义——实体。
当实体发生变更时,由引起该变更的系统进行通知,消息会发送到订阅该主数据的所有系统,收到消息的系统去实体数据库获取数据并更新自己的业务系统。
通过这种实体+消息的方式,使得系统之间的数据交换彻底解耦,引起变更的系统仅维护实体和发送消息,并不直接和订阅该实体的系统进行交互。这种方式就能到达简洁、无依赖、扩展性好的目的。
3.实体
3.1主数据
3.2MongoDB
我们使用高性能、分布式文档数据库来存储实体。
4.消息
4.1消息队列
在基于实体+消息的数据交换方式中,我们实际是通过消息队列来解耦各个系统的。除了降低各个系统之间的耦合,消息队列的引入,使得系统之间的数据交换方式异步化,即发送消息的系统不需要一直等待接收消息的系统接收完消息,而是发送完消息随即就能去处理其它事情了,消息的可靠性由消息队列负责。
在这样一个系统中有两个主要的角色:
- 生产者:并将数据发送到消息队列
- 消费者:从自己关心的消息队列中获取数据
实际在我们的系统中,一个系统可能即是生产者,又是消费者,只是生产和消费的数据不同,但两者都不与其它生产者或消费者直接相连,因此即使系统多了之后也不会相互产生耦合,系统的上线、下线都不会影响到其它系统,我们只要保证消息队列是可靠的就行了。
对于消息队列,我们有许多选择,如ActiveMQ、RabbitMQ、ZeroMQ、MetaMQ、RocketMQ、Kafka等,从时效性、可用性、功能性,我们选择RabbitMQ作为消息队列组件。
4.2RabbitMQ
RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在不同的应用之间共享数据(跨平台跨语言)。RabbitMQ是使用Erlang语言编写,并且基于AMQP协议实现。
RabbitMQ的优势
- 可靠性:使用持久化、传输确认、发布确认等机制来保证可靠性
- 多种消息模式:支持多种消息模式,包括简单模式、工作模式、发布/订阅模式和主题模式,以满足不同的业务需求
- 高并发:能够处理大量的并发消息,适用于高并发场景和大规模的消息处理
- 水平扩展:将多个节点组成集群,可以实现水平扩展和负载均衡,以提高消息处理的吞吐量和可靠性
- 多种协议支持:AMQP、MQTT、STOMP等
- 丰富的客户端:Java、Python、Go等,几乎支持所有常用语言
- 管理界面:简单易用的用户界面,用于监控和管理消息
RabbitMQ的消息模式
简单模式:一个生产者一个消费者绑定一个队列
工作模式:一个生产者多个消费者消费同一个队列
点对点模式:一个生产者多个消费者,通过exchange及routingkey绑定特定的queue
发布/订阅模式:一个生产者多个消费者,通过exchange绑定多个queue
主题模式:一个生产者多个消费者,通过exchange绑定多个queue,同时增加topic通配符*和#
4.3消息格式
由于我们多个系统之间需要交换数据,一份数据可能会被多个系统订阅,为了增加系统的扩展性,我们选择主题模式。
订单消息
routing key:{系统ID}.orders
示例:erp.orders
说明:表示ERP发出的订单消息
字段 | 说明 |
---|---|
order_no | 订单编号 |
operation | 操作(create:创建、update:更新、delete:删除) |
msg_id | 消息id(UUID) |
msg_time | 消息时间 |
示例
{
"order_no": "D0020230101230851",
"operation": "create",
"msg_id": "7eab737960b14130a6856158133a0789",
"msg_time": "2023-01-01 09:00:00"
}
物料消息
routing key:{系统ID}.materials
示例:mes.materials
说明:表示MES发出的物料消息
字段 | 说明 |
---|---|
material_no | 物料编号 |
operation | 操作(create:创建、update:更新、delete:删除) |
msg_id | 消息id(UUID) |
msg_time | 消息时间 示例 |
示例
{
"order_no": "M00237730",
"operation": "update",
"msg_id": "a4a0d700565b465eafe40247279106a9",
"msg_time": "2023-01-10 20:00:00"
}
其它消息
格式类似,其中系统ID,会有统一的定义,并且全局唯一;routing key也有一定的规范。
5.规范
5.1系统
系统ID | 系统名称 |
---|---|
erp | 企业资源计划 |
mes | 制造执行系统 |
wms | 仓储管理系统 |
plm | 产品生命周期管理 |
scm | 供应链管理 |
crm | 客户关系管理 |
hr | 人力资源 |
5.2消息队列
交换机
名称 | 说明 |
---|---|
topic_orders_exchange | Topic订单交换机 |
topic_materials_exchange | Topic物料交换机 |
topic_device_exchange | Topic设备交换机 |
topic_products_exchange | Topic产品交换机 |
topic_employees_exchange | Topic员工交换机 |
队列
每个系统都可以定义自己的队列
名称 | 说明 |
---|---|
{系统ID}_orders_queue | 订单队列 |
{系统ID}_materials_queue | 物料队列 |
{系统ID}_device_queue | 设备队列 |
{系统ID}_products_queue | 产品队列 |
{系统ID}_employees_queue | 员工队列 |
绑定
每个系统都可以订阅自己关心的key
交换机 | 队列 | binding key |
---|---|---|
topic_orders_exchange | {系统ID}_orders_queue | *.orders |
topic_materials_exchange | {系统ID}_materials_queue | *.materials |
topic_device_exchange | {系统ID}_device_queue | *.devices |
topic_products_exchange | {系统ID}_products_queue | *.products |
topic_employees_exchange | {系统ID}_employees_queue | *.employees |
5.3流程
当源系统有数据创建、更新、删除时,会发送消息,订阅了该消息的目标系统会收到消息并从实体中获取数据,流程如下:
- 【源系统】在实体库中创建/更新/删除实体信息
- 【源系统】推送创建/更新/删除消息
- 【目标系统】接收消息
- 【目标系统】从实体库中获取实体信息
6.扩展
6.1新增订阅
当有新系统想要订阅实体时,流程如下:
- 定义系统ID
- 创建队列
- 绑定交换机
示例:
# 新系统:数据分析系统
# 订阅实体:订单实体、产品实体
1.定义系统ID:bi
2.创建队列:
bi_orders_queue
bi_products_queue
3.绑定交换机:
bi_orders_queue 绑定 topic_orders_exchange
bi_products_queue 绑定 topic_products_exchange
# 完成后,之后其它系统发送的订单消息和产品消息都会推送到数据分析系统
6.2取消订阅
如果想取消订阅,只需要将队列解绑对应的交换机即可。