意见箱
恒创运营部门将仔细参阅您的意见和建议,必要时将通过预留邮箱与您保持联络。感谢您的支持!
意见/建议
提交建议

第七章 交易性能优化技术之事务型消息

来源:恒创科技 编辑:恒创科技编辑部
2024-02-03 19:09:59


第七章 交易性能优化技术之事务型消息事务型消息(上)回顾整个下单流程,我们之前做了下单减缓存库存优化以及回补库存的操作。但是因为整个下单操作是属于同一个 transaction 事务的,如果用户下单成功,但是之后订单入库或返回前端的过程中发生错误了,事务回滚,会导致少卖的现象,有可能造成库存堆积少卖也就是事务 (​​@Transactional​​) 回滚只针对mysql数据库,而对redis不起作用,就会出现redis减库存了,而mysql数据库事务回滚后,库存并没有减少,这种情况叫做少卖。我们的解决方法就是异步消息的发送要在整个事务提交成功后再发送。

具体操作流程:

修改​​OrderServiceImpl.java​
@Override
@Transactional
public OrderModel createOrder(Integer userId, Integer itemId, Integer promoId, Integer amount) throws BusinessException {

//2.落单减库存(用户下单但还没有付款,锁定库存给当前用户)
boolean result = itemService.decreaseStock(itemId, amount);
if (!result) {
throw new BusinessException(EmBusinessError.STOCK_NOT_ENOUGH);
}

// 该方法在最近一次事务(@Transactional)提交成功之后,才会执行
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
// 等到事务(@Transactional)内的所有操作都完成并且成功提交之后,在返回前端之前,异步更新mysql库存
boolean mqResult = itemService.asyncDecreaseStock(itemId, amount);
}
});

//4.返回前端
return orderModel;
}
修改​​ItemServiceImpl.java​
// redis库存扣减
@Transactional // 涉及到库存减的操作,所以要保证事务的一致性,加上@Transactional
public boolean decreaseStock(Integer itemId, Integer amount) throws BusinessException {

//将stock库存表中对应的商品id的对应库存数量减去购买数量
//increment(-3)等于减去3,返回的是计算后的值
long result = redisTemplate.opsForValue().increment("promo_item_stock_" + itemId, amount.intValue() * -1);
if (result >= 0) {
//redis更新库存成功,直接返回true
return true;
} else {
// 更新库存失败,需要回滚redis库存,将库存重新补回去
increaseStock(itemId, amount);
return false;
}
}

// 异步更新库存,使用rocketmq实现
public boolean asyncDecreaseStock(Integer itemId, Integer amount) {
//更新库存成功,发送消息,减数据库的库存
boolean mqResult = mqProducer.asyncReduceStock(itemId, amount);
return mqResult;
}

// redis库存回补
public boolean increaseStock(Integer itemId, Integer amount) throws BusinessException {
redisTemplate.opsForValue().increment("promo_item_stock_" + itemId, amount.intValue());
return true;
}
事务型消息(下)上面用到了 TransactionSynchronizationManager 来保证异步消息在事务提交后再发送。我们同样可以用 rocketmq 自带的​​transactionMQProducer​​ 来发送事务型消息RocketMQ 事务型消息

在分布式系统中,我们常会遇到分布式事务的问题,除了之前用到的方法,我们还可以利用RocketMQ的事务型消息来解决分布式事务问题。


第七章 交易性能优化技术之事务型消息

首先来看 RocketMQ 消息的事务架构设计:

生产者执行本地事务,修改订单支付状态(下单),并且提交事务生产者发送事务消息到 broker 上,消息发送到 broker 上在没有​​确认​​​之前,消息对于consumer 是不可见状态(​​prepare​​状态)生产者确认事务消息,使得发送到 broker 上的事务消息对于消费者可见消费者获取到消息进行消费,消费完之后执行 ack 进行确认

这中间可能会存在一个问题,生产者本地事务成功后,发送事务确认消息到 broker 上失败了怎么办 ?

这个时候意味着消费者无法正常消费到这个消息。所以 RocketMQ 提供了消息回查机制​​LocalTransactionState checkLocalTransaction(MessageExt messageExt)​​ 方法,如果事务消息一直处于中间状态,broker 会发起重试去查询 broker 上这个事务的处理状态。一旦发现事务处理成功,则把当前这条消息设置为可见。

第七章 交易性能优化技术之事务型消息_事务

RocketMQ 事务消息有三种状态:

ROLLBACK_MESSAGE:回滚事务COMMIT_MESSAGE:提交事务UNKNOW:broker 会定时回查 Producer 消息状态,直到彻底成功或失败代码实现首先,在 OrderController 中先开启异步发送事务型消息的操作
// 创建订单,开启异步发送事务型消息的操作
if (!mqProducer.transactionAsyncReduceStock(userModel.getId(), itemId, promoId, amount)) {
throw new BusinessException(EmBusinessError.UNKNOWN_ERROR, "下单失败");
}
然后在​​MqProducer​​​ 类中实现​​transactionAsyncReduceStock​​ 方法,投递prepare消息
// 事务型同步库存扣减消息
public boolean transactionAsyncReduceStock(Integer userId, Integer itemId, Integer promoId, Integer amount) {
Map<String, Object> bodyMap = new HashMap<>();
bodyMap.put("itemId", itemId);
bodyMap.put("amount", amount);

Map<String, Object> argsMap = new HashMap<>();
argsMap.put("itemId", itemId);
argsMap.put("amount", amount);
argsMap.put("userId", userId);
argsMap.put("promoId", promoId);

// 投放消息
Message message = new Message(topicName, "increase", JSON.toJSON(bodyMap).toString().getBytes(Charset.forName("UTF-8")));
TransactionSendResult sendResult = null;
try {
// 投递prepare消息,sendMessageInTransaction方法内部,就会往消息中间件投递一个对应的prepare消息,同时回调一个executeLocalTransaction方法
// 注意这个argsMap传进去后,就会被上面的executeLocalTransaction方法的arg所接收
sendResult = transactionMQProducer.sendMessageInTransaction(message, argsMap);
} catch (MQClientException e) {
e.printStackTrace();
return false;
}
if (sendResult.getLocalTransactionState() == LocalTransactionState.ROLLBACK_MESSAGE) {
return false;
} else if (sendResult.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE) {
return true;
} else {
return false;
}
}
在​​MqProducer​​​ 类的​​init​​​ 初始化方法中实现​​transactionMQProducer​
@PostConstruct
public void init() throws MQClientException {
// 做mq producer的初始化
producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr(nameAddr);
producer.start();

transactionMQProducer = new TransactionMQProducer("transaction_producer_group");
transactionMQProducer.setNamesrvAddr(nameAddr);
transactionMQProducer.start();
transactionMQProducer.setTransactionListener(new TransactionListener() {

// 发送事务型消息,消息的类型是prepare,不会被consumer立即执行
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object arg) {
// 真正要做的事,创建订单
Integer userId = (Integer) ((Map) arg).get("userId");
Integer itemId = (Integer) ((Map) arg).get("itemId");
Integer promoId = (Integer) ((Map) arg).get("promoId");
Integer amount = (Integer) ((Map) arg).get("amount");
try {
// 这里进行订单的创建
orderService.createOrder(userId, itemId, promoId, amount);
} catch (BusinessException e) {
e.printStackTrace();
// 如果订单创建失败则事务回滚
// ROLLBACK_MESSAGE表示将之前的prepare消息撤回,相当于没发送消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
// 订单创建成功,可以让消息被consumer消费
// COMMIT_MESSAGE表示将之前的prepare设置为commit,给对应的consumer去消费
return LocalTransactionState.COMMIT_MESSAGE;
}

@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 根据是否扣减库存成功,来判断要返回COMMIT,ROLLBACK还是继续UNKNOW
String jsonString = new String(msg.getBody());
Map<String, Object> map = JSON.parseObject(jsonString);
Integer itemId = (Integer) map.get("itemId");
Integer amount = (Integer) map.get("amount");
return null;
}
});
}
库存流水状态

上面有个问题就是回调 ​​checkLocalTransaction​​​ 方法时,无法仅仅通过 ​​itemId​​​ 和 ​​amount​​​ 来确定库存是否扣减成功,所有要引入​​库存流水​​的概念

操作流水的数据类型:

主业务数据:master data ,比如商品模型​​itemModel​​操作型数据:log data

新建表 stock_log,生成表结构,ItemService 接口中实现初始化库存流水

​MqProducer.java​
@Component
public class MqProducer {

private DefaultMQProducer producer;

private TransactionMQProducer transactionMQProducer;

//声明value注解,引入配置变量
@Value("${mq.nameserver.addr}")
private String nameAddr;

@Value("${mq.topicname}")
private String topicName;

@Autowired
private OrderService orderService;

@Autowired
private StockLogDOMapper stockLogDOMapper;

@PostConstruct
public void init() throws MQClientException {
// 做mq producer的初始化
producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr(nameAddr);
producer.start();

transactionMQProducer = new TransactionMQProducer("transaction_producer_group");
transactionMQProducer.setNamesrvAddr(nameAddr);
transactionMQProducer.start();
transactionMQProducer.setTransactionListener(new TransactionListener() {

// 发送事务型消息,消息的类型是prepare,不会被consumer立即执行
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object arg) {
// 真正要做的事,创建订单
Integer userId = (Integer) ((Map) arg).get("userId");
Integer itemId = (Integer) ((Map) arg).get("itemId");
Integer promoId = (Integer) ((Map) arg).get("promoId");
Integer amount = (Integer) ((Map) arg).get("amount");
String stockLogId = (String) ((Map) arg).get("stockLogId");
try {
// 这里进行订单的创建
orderService.createOrder(userId, itemId, promoId, amount, stockLogId);
} catch (BusinessException e) {
e.printStackTrace();
// 设置对应的stockLog为回滚状态
StockLogDO stockLogDO = stockLogDOMapper.selectByPrimaryKey(stockLogId);
stockLogDO.setStatus(3);
stockLogDOMapper.updateByPrimaryKeySelective(stockLogDO);

// 如果订单创建失败则事务回滚
// ROLLBACK_MESSAGE表示将之前的prepare消息撤回,相当于没发送消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
// 订单创建成功,可以让消息被consumer消费
// COMMIT_MESSAGE表示将之前的prepare设置为commit,给对应的consumer去消费
return LocalTransactionState.COMMIT_MESSAGE;
}

// 如果上面订单的创建一直没有返回结果,那么该方法就会被调用
// 如果prepare状态一直没有更新,broker 会定时回查 Producer 消息状态
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 根据是否扣减库存成功,来判断要返回COMMIT,ROLLBACK还是继续UNKNOW
String jsonString = new String(msg.getBody());
Map<String, Object> map = JSON.parseObject(jsonString);
Integer itemId = (Integer) map.get("itemId");
Integer amount = (Integer) map.get("amount");
String stockLogId = (String) map.get("stockLogId");
StockLogDO stockLogDO = stockLogDOMapper.selectByPrimaryKey(stockLogId);
if (stockLogDO == null) {
return LocalTransactionState.UNKNOW;
}
// 2表示下单扣减库存成功
if (stockLogDO.getStatus().intValue() == 2) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (stockLogDO.getStatus().intValue() == 1) { //1表示初始状态
return LocalTransactionState.UNKNOW;
}
// 其他情况表示下单扣减库存失败
return LocalTransactionState.ROLLBACK_MESSAGE;
}
});
}

// 事务型同步库存扣减消息
public boolean transactionAsyncReduceStock(Integer userId, Integer itemId, Integer promoId, Integer amount, String stockLogId) {
Map<String, Object> bodyMap = new HashMap<>();
bodyMap.put("itemId", itemId);
bodyMap.put("amount", amount);
bodyMap.put("stockLogId", stockLogId);

Map<String, Object> argsMap = new HashMap<>();
argsMap.put("itemId", itemId);
argsMap.put("amount", amount);
argsMap.put("userId", userId);
argsMap.put("promoId", promoId);
argsMap.put("stockLogId", stockLogId);

// 投放消息
Message message = new Message(topicName, "increase", JSON.toJSON(bodyMap).toString().getBytes(Charset.forName("UTF-8")));
TransactionSendResult sendResult = null;
try {
// 投递prepare消息,sendMessageInTransaction方法内部,就会往消息中间件投递一个对应的prepare消息,同时回调一个executeLocalTransaction方法
// 注意这个argsMap传进去后,就会被上面的executeLocalTransaction方法的arg所接收
sendResult = transactionMQProducer.sendMessageInTransaction(message, argsMap);
} catch (MQClientException e) {
e.printStackTrace();
return false;
}
if (sendResult.getLocalTransactionState() == LocalTransactionState.ROLLBACK_MESSAGE) {
return false;
} else if (sendResult.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE) {
return true;
} else {
return false;
}
}

}
业务场景决定高可用技术实现

设计原则:

宁可少卖,不能超卖

实现方案:

redis 可以比实际mysql数据库少超时释放(针对消息一直卡死在初始状态,会造成订单大量废弃,设置超时时间)库存售罄处理方案

之前的设计还存在一个问题:当库存售罄时,还会初始化库存流水这个操作,导致之后下单失败,所以对库存售罄的情况做一个处理:

库存售罄标识售罄后不去操作后续流程售罄后通知各系统售罄回补上新

在 ​​ItemServiceImpl​​​ 的减缓存库存中,若 ​​result == 0​​ ,redis 内打上已售罄标识。在之后初始化库存流水之前,判断 redis 内是否有此key,如果有,直接返回库存不足。

具体操作流程

​ItemServiceImpl.java​
// redis库存扣减
@Transactional // 涉及到库存减的操作,所以要保证事务的一致性,加上@Transactional
public boolean decreaseStock(Integer itemId, Integer amount) throws BusinessException {
// 将stock库存表中对应的商品id的对应库存数量减去购买数量
// increment(-3)等于减去3,返回的是计算后的值
long result = redisTemplate.opsForValue().increment("promo_item_stock_" + itemId, amount.intValue() * -1);
if (result > 0) {
// 若redis更新库存成功,直接返回true
return true;
} else if (result == 0) { // 商品已售完
// 打上库存已售罄的标识
redisTemplate.opsForValue().set("promo_item_stock_invalid_" + itemId, "true");
// 返回更新库存成功
return true;
} else {
// 若redis更新库存失败,需要回滚redis库存,将库存重新补回去
increaseStock(itemId, amount);
return false;
}
}
在​​OrderController​​ 加入库存流水 init 状态前判断是否已售罄
// 判断是否库存已售罄,若对应的售罄key存在,则直接返回下单失败
if (redisTemplate.hasKey("promo_item_stock_invalid_" + itemId)) {
throw new BusinessException(EmBusinessError.STOCK_NOT_ENOUGH);
}

// 先加入库存流水init状态
String stockLogId = itemService.initStockLog(itemId, amount);

// 再去完成对应的下单事务型消息机制
if (!mqProducer.transactionAsyncReduceStock(userModel.getId(), itemId, promoId, amount, stockLogId)) {
throw new BusinessException(EmBusinessError.UNKNOWN_ERROR, "下单失败");
}
后置流程总结

销售逻辑异步化

销量与库存模型一样,存在数据库加行锁并加1的操作,所以也可以用类似方法优化

交易单逻辑异步化

生成交易单 sequence 后直接异步返回前端轮询异步单状态


上一篇: 第十九章 为什么我只查一行的语句,也执行这么慢? 下一篇: 手机怎么远程登录云服务器?