你好,我是yes。
这次事故是有关消息的场景,实际也涉及到分布式事务的内容,我们先简单来回顾下分布式事务。
之前我写过分布式事务相关的文章,基本上把常见的几个分布式事务都捋了捋。
其中我分析了挺多,例如 2pc、TCC、本地消息表、事务消息等等,更多的可以看之前的那篇文章。
这次事故主要是有关本地消息表的实现,借着这篇我也给个实战代码,拿来即用,这里我先再简单把原理介绍一遍。

本地消息表原理

本地消息就是利用了本地事务,会在数据库中存放一张本地事务消息表,在进行本地事务操作中加入了本地消息的插入,即将业务的执行和将消息放入消息表中的操作放在同一个事务中提交,
这样本地事务执行成功的话,消息肯定也插入成功,然后再调用其他服务,如果调用成功就修改这条本地消息的状态。
如果失败也不要紧,会有一个后台线程扫描,发现这些状态的消息,会一直调用相应的服务,一般会设置重试的次数,如果一直不行则特殊记录,待人工介入处理。
可以看到还是很简单的,也是一种最大努力通知思想。

实战

上面的原理大家应该都清晰了,这篇我就基于本地消息表补偿消息来实现事务和消息发送的事务一致性
在我们日常业务中,MQ的应用必不可少,相信你肯定会遇到这个场景:一个 service 方法里执行一些业务,修改了一些数据落库,然后再发送一条MQ消息,触发下一个流程。
那么问题来了,如何保证当前 service 修改的数据事务提交了,消息一定就发出去了呢?
这个简单,将同步发送消息的逻辑写在事务内部,就能保证发送失败,事务不会提交。
那么问题又来了,如果消息发送成功了,最后事务提交失败了呢?那发出去的消息还能追回吗?
因此我们要解决的第一个问题其实是:当前 service 事务提交后,才能发送消息,不然就可能导致消息发出去了,实际事务是没执行成功的。
而上述的操作使得我们兜兜转转又回到第一个问题:如何保证当前 service 修改的数据事务提交了,消息一定就发出去了呢?,万一事务提交了应用就挂了呢?消息不就没了,后续的流程也就中断了。
这归根结底是分布式事务问题,是数据库操作跟MQ消息的爱恨情仇,关于这个 RocketMQ 提供了解决方案即事务消息,但是它的侵入性比较大,需要修改接口适配事务消息的实现。
而本地消息表则非常简单,接下来我们开始操作!
首先我们需要建立一张本地消息表(当前这个设计主要是为了MQ消息的事务场景):
CREATETABLE`message`
 (

`id`bigintNOTNULL
 AUTO_INCREMENT 
COMMENT'id'
,

`create_time`
 datetime 
NOTNULLDEFAULTCURRENT_TIMESTAMPCOMMENT'创建时间'
,

`update_time`
 datetime 
NOTNULLDEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMPCOMMENT'修改时间'
,

`status_delete`tinyintNOTNULLDEFAULT'0'COMMENT'删除标记 0正常 1删除'
,

`topic`varchar
(
64
CHARACTERSET
 utf8mb4 
COLLATE
 utf8mb4_general_ci 
NOTNULLCOMMENT'topic'
,

`tag`varchar
(
128
CHARACTERSET
 utf8mb4 
COLLATE
 utf8mb4_general_ci 
NOTNULLCOMMENT'tag'
,

`msg_id`varchar
(
64
CHARACTERSET
 utf8mb4 
COLLATE
 utf8mb4_general_ci 
NOTNULLDEFAULT''COMMENT'消息id'
,

`msg_key`varchar
(
64
COLLATE
 utf8mb4_general_ci 
DEFAULTNULLCOMMENT'消息key'
,

`data`textCHARACTERSET
 utf8mb4 
COLLATE
 utf8mb4_general_ci 
NOTNULLCOMMENT'消息json串'
,

`try_num`intNOTNULLDEFAULT'0'COMMENT'重试次数'
,

`status`tinyintNOTNULLDEFAULT'0'COMMENT'发送状态 0-未发送 1-已发送'
,

`next_time`
 datetime 
NOTNULLDEFAULTCURRENT_TIMESTAMPCOMMENT'下次驱动开始时间'
  PRIMARY 
KEY
 (
`id`
),

KEY`idx_key`
 (
`msg_key`
),

KEY`idx_nexttime_status`
 (
`next_time`
,
`status`
),

KEY`idx_msgid`
 (
`msg_id`
)

ENGINE
=
InnoDBDEFAULTCHARSET
=utf8mb4 
COLLATE
=utf8mb4_general_ci ROW_FORMAT=DYNAMIC 
COMMENT
=
'本地消息记录表'
;

然后再写个 MessageService 来包装下消息的发送流程,把本地消息记录保存封装在里面。
@Service
publicclassMessageServiceimplementsIMessageService
{

@Resource
private
 Producer producer;

@Resource
private
 MessageMapper messageMapper;


@Override
@Transactional
(rollbackFor = Exception
.
class
)

publicvoidsend
(
Stringtopic
Stringtag
Stringkey
Objectobj
{

        sendDelay(topic, tag, key, obj, 
0L
);

    }


@Override
@Transactional
(rollbackFor = Exception
.
class
)

publicvoidsendDelay
(
Stringtopic
Stringtag
Stringkey
Objectobj
Longperiod
{

//计算时间,防止定时任务扫描将还在正常流程中的消息进行重试
int
 time = (period == 
0L
 ? 
10
 : period.intValue() / 
1000
);

        Date nextTime = DateUtil.getAfterNewDateSecond(
new
 Date(), time);

        String data = JSON.toJSONString(obj);

        Message message = 
new
 Message()

                .setStatusDelete(
0
)

                .setTopic(topic)

                .setTag(tag)

                .setMsgId(
""
)

                .setMsgKey(key)

                .setData(data)

                .setTryNum(
0
)

                .setStatus(
0
)

                .setNextTime(nextTime);

// 保存本地消息记录
        messageMapper.save(message);


// 当前事务提交后,再执行发送消息和更改本地消息记录状态
        TransactionSynchronizationManager.registerSynchronization(

new
 TransactionSynchronizationAdapter() {

@Override
publicvoidafterCommit()
{

                        String messageId;

try
 {

if
 (period == 
0L
) {

                                messageId = producer.send(topic, tag, key, data);

                            } 
else
 {

                                messageId = producer.sendDelay(topic, tag, key, data, period);

                            }

                            Message update = 
new
 Message()

                                    .setId(message.getId())

                                    .setMsgId(messageId)

                                    .setStatus(
1
);

                            messageMapper.updateById(update);

                        } 
catch
 (Exception e) {

                            log.error(
".."
);

                        }

                    }

                }

        );

    }

}


定时任务的逻辑就很简单了,就是扫描 nextTime 到期且未发送的消息,重新发送即可,这里不多赘述。
最终的使用就非常简单了:
@Transactional
(rollbackFor = Exception
.
class
)

publicvoiddoSth
(
xx
{

    saveA();

    saveB();

    messageService.send(xxx);

}

我们来分析一下:
  1. 假设数据库事务提交失败,那么无事发生,消息也没发出去,此时业务正常。
  2. 假设数据库操作成功,但是数据库事务提交后,服务宕机了,那么消息没发出去,此时 saveA 和 saveB 都保存成功,那么 message 肯定也插入了(它们在同一个事务中),message 的 status 是 0 ,那么我们有个定时任务,根据 nextTime 和 status 来扫描得到未成功发送的消息,进行重试即可,后续消息可正常发送
  3. 假设数据库操作成功,但是数据库事务提交了,MQ有问题,使得消息发不出去,同理第二条,后续定时任务扫描重试即可。
就在两个月前,公司用的阿里云的MQ故障,导致消息发送频繁超时,就是因为我们的消息发送都做了以上的改造,因此没有影响业务(数据都正常落库,事务正常提交),部分消息发送超时,由后续补偿任务自动补偿重试。
可以想象,如果没有这个机制可能会发送两种情况:
  1. 如果消息在事务内发送,由于消息发送出错,那么事务提交失败,业务会直接受到影响,线上频繁报错(还解决不了,因为这是阿里云MQ底层升级导致的问题),妥妥P0故障。
  2. 如果消息在事务提交后发送,又没落库记录,那么消息发送超时,后续流程中断,后续需要手动补数据,能累死个人。

简要分析

一般 service 事务相关方法都用 @Transactional 修饰, messageService.send 也被 @Transactional,默认事务传播级别是 PROPAGATION_REQUIRED,继承外部事务,因此它们处于同一个事务。
然后 TransactionSynchronizationManager 可以管理当前线程的事务,内部的 TransactionSynchronizationAdapter 是一个抽象类
可以看到,它能让我们在事务提交前、后、暂停等各阶段实现一些自己的逻辑。

最后

具体操作还是很简单的,仅需一张表,一个服务的所有消息发送都能复用。
回头看看现在的业务代码,看看是不是有业务执行了但是消息没发送成功的风险?小心遇到我之前的问题,有的话赶紧改造吧!
其实这篇也呼应了我之前的这篇文章,RocketMQ事务消息和QMQ(去哪儿)事务消息实现的对比,有兴趣的朋友可以看看。
我是yes,从一点点我亿点点我们下篇见~
继续阅读
阅读原文