事情是这样的。
我正躺在床上看《说唱听我的2》,突然微信发来一个消息,我一看是艿艿。
艿艿突然发来这一段,没有上下文,我懵了一下,发生什么事了?
但是,我没有惊慌,以我沉着稳重的人设,淡定地回了个:啊?去哪儿的那个?
但是,艿艿没有理我,紧接着来了一拨输出:
当时,我的脑子里就只有rap:“不是吧不是吧,难道单押也算押,无视他无视他,蓝调专家也算家。”
现在来看这个问题,肯定是 mysql 重要。
因为艿艿强调的是成功,我当时没注意到成功二字,认为事务如果涉及 mysql 和 mq 这两者,那肯定是要重点关注 mq ,因为 mysql 对事务是有保障的,对于已经稳了的东西,不需要花费太多心思在上面。
所以,在这个错误的认知前提下,下面的讨论就没啥意义,所以就不贴了。
到后面,我也搞懂了艿艿发问的起因,因为艿艿最近做了次 mq 的分享,然后回想起我之前写的那篇文章,他比较推荐 qmq 的事务消息方案,而不是 RocketMQ 的。
因为他之前用过类似的方案,然后当时线上 mq 集群网络故障,导致发消息失败,当时 mysql 虽然还是活着的,但是无法进行事务。
这其实就回到前面抛出的问题了:一个事务涉及 mysql 和 mq,到底哪个写入成功重要?
我翻译一下,其实这个问题讲的是:mysql 和 mq 之间的写入顺序。
在 RocketMQ 中,事务消息的实现方案是先发半消息(半消息对消费者不可见),待半消息发送成功之后,才能执行本地事务,等本地事务执行成功之后,再向 Broker 发送请求将半消息转成正常消息,这样消费者就可以消费此消息。
这种顺序等于先得成功写入 mq,然后再写入数据库,这样的模式会出现一个问题:即 mq 集群挂了,事务就无法继续进行了,等于整个应用无法正常执行了
看一下我之前画的 RocketMQ 事务消息流程图:
第一步是需要等待半消息的响应,如果响应失败就无法执行本地事务。
看下伪代码,可能更清晰:
result = sendHalfMsg();
//发送半消息
if
(result.success) {

  执行本地事务

else
 {

  回滚此次事务

}

具体 RocketMQ 事务细节看我这篇:RocketMQ与Kafka之事务消息
所以,先写 mq 后写 mysql 就会发生 mysql 还好好的,但是 mq 挂了事务就无法正常执行的情况。

那 qmq 怎么做的呢?

PS: QMQ是去哪儿网内部广泛使用的消息中间件,自2012年诞生以来在去哪儿网所有业务场景中广泛的应用,包括跟交易息息相关的订单场景;也包括报价搜索等高吞吐量场景。目前在公司内部日常消息qps在60W左右,生产上承载将近4W+消息topic,消息的端到端延迟可以控制在10ms以内。
在说 qmq 的事务消息之前,先来说下本地消息表这个分布式事务实现方式。
本地消息就是利用了关系型数据库的事务能力,会在数据库中存放一张本地事务消息表,在进行本地事务操作中加入了本地消息表的插入,即将业务的执行和将消息放入到消息表中的操作放在同一个事务中提交。
这样本地事务执行成功的话,消息肯定也插入成功,然后再调用其他服务,如果其他服务调用成功就修改这条本地消息的状态。
如果失败也不要紧,会有一个后台线程扫描,发现这些状态的消息,会一直调用相应的服务,一般会设置重试的次数,如果一直不行则特殊记录,待人工介入处理。
可以看到,本地事务消息表还是很简单的,也是一种最大努力通知的思想。
在理解本地消息表之后,我们再来看一下 qmq 的事务消息是如何设计的。
首先,想要用 qmq 的事务消息,需要在数据库中建一张表,就是如下这样的表:
是不是有本地消息表那味儿了?
没错核心思想就是本地消息表!利用关系型数据库的事务能力,将业务的写入和消息表的写入融在一个事务中,这样业务成功则消息表肯定写入成功。
然后在事务提交之后,立刻发送事务消息,如果发送成功,则删除本地消息表中的记录,来看一下伪代码的实现,应该就很清晰了:
@Transactional// 在一个事务中
publicvoidyes()
{

    Order order = buildOrder();

    orderDao.insert(order);

    Message message = buildMessage(order);

    messageDao.insert(message);

//异步,在事务提交后执行
    triggerAfterTransactionCommit(()->{

        messageClient.send(message);

        messageDao.delete(message);

    });

}

当然,这是我剖开来写的实现思路,qmq的使用没这么麻烦,直接在 sendMessage 里把上面的逻辑都包装好了,所以使用起来直接就是一个发送消息:
@Transactional// 在一个事务中
publicvoidyes()
{

    Order order = buildOrder();

    orderDao.insert(order);

//封装插入消息、发送消息、删除消息的逻辑
    producer.sendMessage(buildMessage(order)); 

}

如果消息发送失败,也就是比如 mq 集群挂了,并不会影响事务的执行,业务的执行和事务消息的插入都已经成功了,那此时待消息已经安安静静的在消息库里等着,后台能会有一个补偿任务,会将这些消息捞出来重新发送,直到发送成功。
想必,现在你应该对 qmq 的事务消息流程应该很清晰了,它的顺序就属于先写数据库,再发mq,即使 mq 集群挂了,也不会影响事务的进行,不会导致应用无法正常执行了。
这也是艿艿说的,qmq 更适合业务平台的原因。
这里可能有人会问,那如果 mysql 挂了呢?
我只能说数据库都挂了,那就都没了,别想啥别的了。

再来看 RocketMQ 和 QMQ

至此,想必你已经清楚 RocketMQ 和 QMQ 事务消息的区别,我们再来盘下 QMQ 事务消息更优的原因。
RocketMQ 只支持单事务消息,也就是无法在一个事务内发送多种事务消息
而 QMQ 可以在一次事务中发多个消息,来看下伪代码:
@Transactional// 在一个事务中
publicvoidyes()
{

    Order order = buildOrder();

    orderDao.insert(order);

    producer.sendMessage(buildMessageA(order));

    producer.sendMessage(buildMessageB(order));

    producer.sendMessage(buildMessageC(order));

}

这样的实现就比 RocketMQ 灵活多了。
然后 RocketMQ 事务消息的实现还需要提供一个反查机制,因为RocketMQ 事务消息的提交是 oneway 的发送方式,有可能 Broker 没有接收到事务提交的消息。
所以 Broker 会定时去生产者那边查看事务是否已经执行完成,因此生产者需要保存本地事务执行结果,简单的就是用一个 map 保存,让 Broker 可以通过消息的事务 id 查找到事务执行的结果。
如果还要考虑发送事务消息的生产者挂了,那么 Broker 会找同个生产组的其他生产者来查询事务结果,所以这个存储还得提出来放到第三方,而不是本地内存保存。
因此,RocketMQ 得多维护一个本地事务执行结果,是稍微有点麻烦的。
当然,QMQ 还得建表呢,不过按照 QMQ 说的:如果公司方便的话,可以直接合并进DBA的初始化数据库的自动化流程中,这样就透明了。
还有一点 RocketMQ 的 api 不太友好,改造有点大,之后的迁移不太方便。
贴一下完整的使用 RocketMQ 事务消息的代码:
可以看到,如果想要搞事务消息,首先新建 transationMQproducer,然后再新建一个 transcationListenerImpl,再覆盖 listener 执行事务的方法和回查事务的方法,等于你得把业务逻辑实现在 transcationListenerImpl 内部,这和我们平日里在 service 里面实现事务的差距就有点大了。
而 QMQ 提供了内置 Spring 事务的方式,所以就直接在 service 实现就行了。
@Transactional// 在一个事务中
publicvoidyes()
{

    Order order = buildOrder();

    orderDao.insert(order);

    producer.sendMessage(buildMessageA(order));

}

这就很贴合我们平日的使用方式了,这样对业务的改造很小,并且迁移也很方便。

最后

暂时就分析这么多了,对 QMQ 有兴趣的同学可以再自己研究一下,之后有机会之后我再写一篇盘一盘。
再贴一下我之前写过的事务消息文章:RocketMQ与Kafka之事务消息,结合者这两篇看,应该会这事务消息的实现有更深入的理解。
参考:https://github.com/qunarcorp

我是yes,从一点点到亿点点,我们下篇见。
继续阅读
阅读原文