基于实体+消息的跨系统数据交换

2023-10-22 16:33 杨晓琪 798

1.背景

对于一个生产企业来说,有几个核心的系统必不可少:ERP(企业资源计划)、MES(制造执行系统)、PCS(过程控制系统)。这三大系统构成了生产制造过程中的计划排产、生产制造、过程控制,三者独立存在,但又相互依赖。

它们之间的依赖关系如下:

  • ERP向MES输送生产计划
  • MES向PCS发送生产指令
  • PCS向MES反馈指令执行结果
  • MES向ERP反馈完工状态

假设这是一个最简单的生产制造体系,三个系统,四个调用,由于足够简单,我们完全可以通过API的方式来实现这些系统之间的数据交换。

但实际生产制造所涉及的系统要多不少,比如WMS(仓储管理系统)、PLM(产品生命周期管理)、SCM(供应链管理)、CRM(客户关系管理)、HR(人力资源)等等。这么多的系统之间如果都要互相交换数据,那么有没有一种比较简洁、依赖更小、扩展性更好的方式呢?答案是肯定的,我们先来讨论几种数据交换方式。


2.数据交换方式

2.1基于数据库

数据一般都是存储在数据库中的,那么两个系统如果想要交换数据,那么最直接的方式可能就是直接读取数据库了。比如A系统有产品数据,B系统有设备数据,A系统想要设备数据就直接去B系统的设备表里取,同样的,B系统想要产品数据就直接去A系统的产品表里取。

这样的做法确实很直接,但这种方式的问题也不少:

  • 虽然可以控制表的访问权限,但无法精确到某个字段
  • 相互过度依赖,自己系统调整业务表结果,还需要通知其它系统
  • 无法实时感知数据的变化,需要定期全量更新到自己的数据库
  • 对接的系统多了之后,对原系统的数据库访问会造成很大的压力
  • 对性能的优化有限,无法在应用层做数据缓存
  • 不同系统所用的数据库可能各不相同,需要适配各种类型的数据库

这些问题不仅对于提供数据库的系统不利,还对需要访问数据库的系统不友好,因此我们绝大多数情况下不会选择基于数据库的数据交换方式。

2.2基于API

基于API的数据交换方式要求每个订阅数据的系统,需要按数据提供方要求的方式定义API。比如A系统有产品数据,B系统和C系统需要产品数据,那么它们就需要按A系统的要求,定义产品新增、产品更新、产品删除等API,当A系统有产品新增、更新、删除时,同时调用B系统和C系统事先定义好的API通知对方,这种方式解决了很多基于数据库进行数据交换的问题:

  • 对于不想让其它系统知道的字段可以选择不输出
  • 自己系统的业务表结构调整时,保证API格式不变即可将影响降到最低
  • 数据实时变化时通知其它系统
  • 可以在应用层对数据做缓存,以降低对数据库的压力
  • 不同系统之间不再关心数据库的类型,API的定义决定了系统之间的交互方式

许多问题通过基于API的数据交换方式都得到了很好的解决,但是当系统达到一定规模之后,API的调用就变成了一张错综复杂的网,原来简洁的方式突然变得复杂且不利于维护和扩展了,因此我们需要一种更加解耦的方式来处理各个系统之间的数据交换。

2.3基于实体+消息

ERP有订单数据,MES有物料清单、工艺数据,PCS有设备数据,WMS有仓储数据,PLM有产品数据,SCM有物流数据,CRM有客户数据,HR有员工信息,对于这样的跨系统数据共享的问题,我们首先需要定义主数据,如:

  • 订单主数据
  • 物料主数据
  • 工艺主数据
  • 设备主数据
  • 库存主数据
  • 产品主数据
  • 物流主数据
  • 客户主数据
  • 员工主数据
  • 组织架构主数据

有了主数据之后,我们将这份主数据存储在一个公共区域,由所有系统一同来维护和共享,我们给它一个定义——实体。

当实体发生变更时,由引起该变更的系统进行通知,消息会发送到订阅该主数据的所有系统,收到消息的系统去实体数据库获取数据并更新自己的业务系统。

通过这种实体+消息的方式,使得系统之间的数据交换彻底解耦,引起变更的系统仅维护实体和发送消息,并不直接和订阅该实体的系统进行交互。这种方式就能到达简洁、无依赖、扩展性好的目的。


3.实体

3.1主数据

3.2MongoDB

我们使用高性能、分布式文档数据库来存储实体。


4.消息

4.1消息队列

在基于实体+消息的数据交换方式中,我们实际是通过消息队列来解耦各个系统的。除了降低各个系统之间的耦合,消息队列的引入,使得系统之间的数据交换方式异步化,即发送消息的系统不需要一直等待接收消息的系统接收完消息,而是发送完消息随即就能去处理其它事情了,消息的可靠性由消息队列负责。

在这样一个系统中有两个主要的角色:

  • 生产者:并将数据发送到消息队列
  • 消费者:从自己关心的消息队列中获取数据

实际在我们的系统中,一个系统可能即是生产者,又是消费者,只是生产和消费的数据不同,但两者都不与其它生产者或消费者直接相连,因此即使系统多了之后也不会相互产生耦合,系统的上线、下线都不会影响到其它系统,我们只要保证消息队列是可靠的就行了。

对于消息队列,我们有许多选择,如ActiveMQ、RabbitMQ、ZeroMQ、MetaMQ、RocketMQ、Kafka等,从时效性、可用性、功能性,我们选择RabbitMQ作为消息队列组件。

4.2RabbitMQ

RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在不同的应用之间共享数据(跨平台跨语言)。RabbitMQ是使用Erlang语言编写,并且基于AMQP协议实现。

RabbitMQ的优势

  • 可靠性:使用持久化、传输确认、发布确认等机制来保证可靠性
  • 多种消息模式:支持多种消息模式,包括简单模式、工作模式、发布/订阅模式和主题模式,以满足不同的业务需求
  • 高并发:能够处理大量的并发消息,适用于高并发场景和大规模的消息处理
  • 水平扩展:将多个节点组成集群,可以实现水平扩展和负载均衡,以提高消息处理的吞吐量和可靠性
  • 多种协议支持:AMQP、MQTT、STOMP等
  • 丰富的客户端:Java、Python、Go等,几乎支持所有常用语言
  • 管理界面:简单易用的用户界面,用于监控和管理消息

RabbitMQ的消息模式

简单模式:一个生产者一个消费者绑定一个队列

工作模式:一个生产者多个消费者消费同一个队列

点对点模式:一个生产者多个消费者,通过exchange及routingkey绑定特定的queue

发布/订阅模式:一个生产者多个消费者,通过exchange绑定多个queue

主题模式:一个生产者多个消费者,通过exchange绑定多个queue,同时增加topic通配符*和#

4.3消息格式

由于我们多个系统之间需要交换数据,一份数据可能会被多个系统订阅,为了增加系统的扩展性,我们选择主题模式。

订单消息

routing key:{系统ID}.orders

示例:erp.orders

说明:表示ERP发出的订单消息

字段 说明
order_no 订单编号
operation 操作(create:创建、update:更新、delete:删除)
msg_id 消息id(UUID)
msg_time 消息时间

示例

{
  "order_no": "D0020230101230851",
  "operation": "create",
  "msg_id": "7eab737960b14130a6856158133a0789",
  "msg_time": "2023-01-01 09:00:00"
}

物料消息

routing key:{系统ID}.materials

示例:mes.materials

说明:表示MES发出的物料消息

字段 说明
material_no 物料编号
operation 操作(create:创建、update:更新、delete:删除)
msg_id 消息id(UUID)
msg_time 消息时间
示例

示例

{
  "order_no": "M00237730",
  "operation": "update",
  "msg_id": "a4a0d700565b465eafe40247279106a9",
  "msg_time": "2023-01-10 20:00:00"
}

其它消息

格式类似,其中系统ID,会有统一的定义,并且全局唯一;routing key也有一定的规范。

5.规范

5.1系统

统ID 系统名称
erp 企业资源计划
mes 制造执行系统
wms 仓储管理系统
plm 产品生命周期管理
scm 供应链管理
crm 客户关系管理
hr 人力资源

5.2消息队列

交换机

名称 说明
topic_orders_exchange Topic订单交换机
topic_materials_exchange Topic物料交换机
topic_device_exchange Topic设备交换机
topic_products_exchange Topic产品交换机
topic_employees_exchange Topic员工交换机

队列

每个系统都可以定义自己的队列

名称 说明
{系统ID}_orders_queue 订单队列
{系统ID}_materials_queue 物料队列
{系统ID}_device_queue 设备队列
{系统ID}_products_queue 产品队列
{系统ID}_employees_queue 员工队列

绑定

每个系统都可以订阅自己关心的key

交换机 队列 binding key
topic_orders_exchange {系统ID}_orders_queue *.orders
topic_materials_exchange {系统ID}_materials_queue *.materials
topic_device_exchange {系统ID}_device_queue *.devices
topic_products_exchange {系统ID}_products_queue *.products
topic_employees_exchange {系统ID}_employees_queue *.employees

5.3流程

当源系统有数据创建、更新、删除时,会发送消息,订阅了该消息的目标系统会收到消息并从实体中获取数据,流程如下:

  1. 【源系统】在实体库中创建/更新/删除实体信息
  2. 【源系统】推送创建/更新/删除消息
  3. 【目标系统】接收消息
  4. 【目标系统】从实体库中获取实体信息

6.扩展

6.1新增订阅

当有新系统想要订阅实体时,流程如下:

  1. 定义系统ID
  2. 创建队列
  3. 绑定交换机

示例:

# 新系统:数据分析系统
# 订阅实体:订单实体、产品实体
1.定义系统ID:bi
2.创建队列:
  bi_orders_queue
  bi_products_queue
3.绑定交换机:
  bi_orders_queue 绑定 topic_orders_exchange
  bi_products_queue 绑定 topic_products_exchange

# 完成后,之后其它系统发送的订单消息和产品消息都会推送到数据分析系统

6.2取消订阅

如果想取消订阅,只需要将队列解绑对应的交换机即可。