使用 RabbitMq 异步通知的方式,解决数据同步问题。

(图片来源网络,侵删)
- 消息的提供者和消费者导入相关的依赖和配置文件
org.springframework.boot spring-boot-starter-amqp
spring: rabbitmq: virtual-host: / host: 192.168.72.101 port: 5672 username: root password: 123
public class MqConstants { /** * 交换机 */ public static final String HOTEL_Exchange = "hotel.topic"; /** * 监听新增和修改的队列 */ public static final String HOTEL_INSERT_QUEUE = "hotel.insert.queue"; /** * 监听删除的队列 */ public static final String HOTEL_DELETE_QUEUE = "hotel.delete.queue"; /** * 新增或修改的 RoutingKey */ public static final String HOTEL_INSERT_KEY = "hotel.insert"; /** * 删除的RoutingKey */ public static final String HOTEL_DELETE_KEY = "hotel.delete"; }
对 insert, update, delete 请求发送消息
@Autowired private IHotelService hotelService; @Autowired private RabbitTemplate rabbitTemplate; @PostMapping public void saveHotel(@RequestBody Hotel hotel){ hotelService.save(hotel); rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId()); } @PutMapping public void updateById(@RequestBody Hotel hotel){ if (hotel.getId() == null) { throw new InvalidParameterException("id不能为空"); } hotelService.updateById(hotel); rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId()); } @DeleteMapping("/{id}") public void deleteById(@PathVariable("id") Long id) { hotelService.removeById(id); rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_DELETE_KEY, id); }
- elasticsearch 模块,通过注解的方式接收发送方的消息
// 编写监听器 @Component public class HotelListener { @Autowired private IHotelService hotelService; @RabbitListener(bindings = @QueueBinding( value = @Queue(MqConstants.HOTEL_INSERT_QUEUE), exchange = @Exchange(value = MqConstants.HOTEL_EXCHANGE, type = ExchangeTypes.TOPIC), key = MqConstants.HOTEL_INSERT_KEY )) public void listenHotelInsertOrUpdate(Long id){ hotelService.insertById(id); } @RabbitListener(bindings = @QueueBinding( value = @Queue(MqConstants.HOTEL_DELETE_QUEUE), exchange = @Exchange(value = MqConstants.HOTEL_EXCHANGE, type = ExchangeTypes.TOPIC), key = MqConstants.HOTEL_DELETE_KEY )) public void listenHotelDelete(Long id){ hotelService.deleteById(id); } }
在 com.mysite.hotel.service.impl.HotelService 中实现业务

(图片来源网络,侵删)
@Autowired private RestHighLevelClient client; @Override public void deleteById(Long id) { try { DeleteRequest request = new DeleteRequest("hotel", id.toString()); client.delete(request, RequestOptions.DEFAULT); } catch (IOException e) { throw new RuntimeException(e); } } @Override public void insertById(Long id) { try { Hotel hotel = getById(id); // 转换为文档类型 HotelDoc hotelDoc = new HotelDoc(hotel); // 准备 request 对象 IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString()); // 准备 JSON 文档 request.source(JSON.toJSONString(hotelDoc), XContentType.JSON); // 发送请求 client.index(request, RequestOptions.DEFAULT); } catch (IOException e) { throw new RuntimeException(e); } }
- 优点:低耦合,实现难度一般
- 缺点:依赖mq的可靠性