1、为何用mq
合作优势
次要就有3个:
应用范畴解耦(削减微办事项目间的干系)、
触发器加速(微办事项目领到mq最新动静后同时组织工做)、
削峰Viluppuram(能最新动静堆积)下风
控造系统易用性削减(MQ一旦无法拜候整个控造系统不成用)
维数进步(必要处理控造系统最新动静持续性、反复消费需求…)
持续性问题(不异控造系管辖到mq中的最新动静后,部门控造系统处置失败怎么办)2、rocketmq软件财产组织工做流程
由上图能看出,rocketMQ软件财产=最新动静伺服器软件财产+从头定名伺服器软件财产,此中最新动静伺服器软件财产=商品和办事项目软件财产+broker软件财产+顾客软件财产。
从头定名伺服器软件财产(nameserver cluster)
● 从头定名伺服器软件财产是办理商品和办事项目、broker、顾客的联合,哪个商品和办事项目/broker/顾客可用都是通过从头定名伺服器得知其信息,因而商品和办事项目/broker/顾客都必要间歇推送心跳给从头定名伺服器
● 从头定名伺服器与商品和办事项目标亲密关系:从头定名伺服器汗青记录有良多broker的ip门商标,每一商品和办事项目推送最新动静到broker前都必要先去从头定名伺服器以获取某一broker的ip,接着再推送最新动静到broker
● 从头定名伺服器和最新动静者的亲密关系:从头定名伺服器汗青记录有良多broker的ip门商标,顾客想窃听broker中的最新动静,必要先去从头定名伺服器以获取某一broker的ip,接着再窃听broker中的最新动静商品和办事项目软件财产(producer cluster)
● 每一商品和办事项目布署在不异的IP上逐渐构成了软件财产
● 商品和办事项目标最新动静=topic+tag,topic用来界定最新动静类别,一种topic类此外最新动静能散布在数个不异的broker中,同类此外最新动静就用tag界定,如他们控造系统里的手续费宝的topic是"topic-yjb",接动手续费宝下面能划分数个tag顾客软件财产(consumer cluster)
● 每一顾客布署在不异的IP上逐渐构成了软件财产
● 顾客以获取某一broker中的最新动静理论上有两种办法:
○ pull拉取贸易形式:顾客开启缓存间歇出访broker,若有最新动静存在则拉取,缺点是太消耗顾客的资本了,不管有没有最新动静单项去出访broker
○ push推送贸易形式:顾客起两个窃听器窃听broker(与broker成立两个长镜像),若broker中有最新动静,则broker会主动推送最新动静给顾客,一般用那种。此中push贸易形式的下层也是通过顾客主动拉取的体例来实现的,只不外它的名字叫push罢了,意思是Broker尽可能实时的推送最新动静给顾客,和pull贸易形式比拟,push贸易形式都帮他们PCB了下层,而pull贸易形式就要本身写标识符去手动拉取最新动静,因而pull贸易形式更像拉取,而PCB好的push更像是推送。3、最新动静类别
并行/触发器/双向最新动静
并行:推送最新动静是顺次序推送
触发器:推送最新动静是触发器的,商品和办事项目推送完最新动静就干其它工作,顾客接著会在商品和办事项目标回调函数中返回消费需求成果【new SendCallback(){} 】,时效性差
大项最新动静:商品和办事项目尽管发进来,其实不转交codice多量量最新动静
特点:
● 统一批最新动静的topic应该不异;
● 最新动静内容大小=(topic+body+其它key/value属性+条记固定20二进造)<4M
● 不是延迟时间最新动静tag过滤器最新动静
顾客指定特定的tag,则只转交该tag的最新动静sql过滤器最新动静
顾客撑持近似于sql查询句法那样的最新动静过滤器 //商品和办事项目 String msg="hello,小华同窗"; Message message=new Message("topic",msg.getBytes("UTF-8")); message.putUserProperty("name","xiaoming"); message.putUserProperty("age","27"); SendResult sendResult1 =defaultMQProducer.send(message); //顾客用sql句法过滤器出age>26岁的最新动静,即只承受age>26岁的最新动静 DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("group1"); defaultMQPushConsumer.subscribe("topic", MessageSelector.bySql("age>26"));4、最新动静的特殊处置
次序最新动静
场景:在某些营业控造系统中,一些营业流程处置的次序必需是顺次序的,好比顾客退款:创建订货 -> 退款 -> 推送最新动静 -> 订货完成,在mammalian情况下,不成能只要两个顾客退款,当数个顾客退款时,他们的那四种最新动静有可能是紊乱的。
软件系统:rocketmq预设每一topic在broker中单项有四个仓库放置此类数据,仓库是FIFO性量的,他们能操纵仓库去XC610PA放置那些最新动静以到达XC610PA消费需求的目标。
商品和办事项目次要就标识符:
DefaultMQProducer defaultMQProducer=new DefaultMQProducer("group1"); defaultMQProducer.setNamesrvAddr("127.0.0.1:9876"); try { defaultMQProducer.start(); List<Order> list=new ArrayList<>(); //模仿营业流程乱序提交:单个订货最新动静有序,数个订货间最新动静无序 Order order01=new Order(0,"创建订货"); Order order11=new Order(1,"创建订货"); Order order02=new Order(0,"退款"); Order order03=new Order(0,"推送"); Order order21=new Order(2,"创建订货"); Order order12=new Order(1,"退款"); Order order04=new Order(0,"完成"); Order order13=new Order(1,"推送"); Order order22=new Order(2,"退款"); Order order14=new Order(1,"完成"); Order order23=new Order(2,"推送"); Order order24=new Order(2,"完成"); list.addAll(new ArrayList<Order>(Arrays.asList(order01,order11,order02,order03,order21,order12,order13,order22,order23,order04,order14,order24))); for(Order order:list){ Message message=new Message("topic-order",order.toString().getBytes()); /* *每一topic预设创建4个仓库,defaultMQProducer能通过MessageQueueSelector的select办法设置 *当前Message推送到"topic-order"的哪个仓库:通过订货的独一属性值,如orderId,对topic中的queue仓库数取模, *如许同两个订货的不异最新动静就会被XC610PA放进同两个queue中 */ SendResult sendResult=defaultMQProducer.send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> queueList, Message msg, Object o) { System.out.println("仓库数:"+queueList.size()); //以获取仓库下标 int size=order.getOrderId()%queueList.size(); //计算该message放在"topic-order"哪个仓库中 MessageQueue mq=queueList.get(size); return mq; } },null); System.out.println(sendResult); } } catch (Exception e) { e.printStackTrace(); }顾客次要就标识符:
//利用MessageListenerConcurrently则数个缓存办事项目两个仓库,而MessageListenerOrderly是两个缓存办事项目两个仓库(topic预设四个仓库就是四个缓存) defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) { System.out.println("缓存:"+Thread.currentThread().getName()+",仓库:"+consumeOrderlyContext.getMessageQueue().getQueueId()+",该仓库最新动静数量:"+list.size()); for(MessageExt messageExt:list){ System.out.println(new String(messageExt.getBody())); } return ConsumeOrderlyStatus.SUCCESS; } });施行成果:
能看出两个缓存办事项目两个仓库,将同类营业的最新动静都推送到同两个仓库中,是能实现最新动静的次序推送的.
事务最新动静
1. 为何要用事务最新动静?
仍是以用户退款为例,用户在producer中创建订货(但未提交事务到mysql),接着把退款最新动静推送给broker(即MQ伺服器),MQ伺服器再把该最新动静发给所有订阅了此类topic的顾客,可能呈现如下情况:(1)producer胜利停止了数据库操做(即提交事务到mysql),且MQ伺服器转交最新动静胜利,接着被顾客消费需求 -->大快人心
(2)producer胜利停止了数据库操做(即提交事务到mysql),但发到MQ伺服器失败,进而顾客不克不及消费需求此类最新动静 -->不一般
(3)producer停止数据库操做的时候发作了不测招致数据库操做失败(即提交事务到mysql),但发到MQ伺服器胜利,进而顾客会去消费需求此类最新动静 -->不一般
上面第2、3种情况都是不一般的,处理法子就是引入事务最新动静,事务最新动静的过程如下:
第一阶段producer先推送一条"half"型最新动静到MQ伺服器,MQ伺服器收到后随即返回两个推送胜利标识 ->
producer停止数据库操做施行事务,施行胜利则推送二次确认(Commit或Rollback)最新动静给伺服器 ->
MQ伺服器收到Commit则将第一阶段的"half"型最新动静标识表记标帜为可送达,顾客若订阅了该topic则能收到该最新动静;MQ伺服器收到Rollback则删除第一阶段的最新动静,顾客将转交不到该最新动静,就当什么事也没发作过 ->
MQ伺服器会有两个事务抵偿机造:若伺服器很久都没有收到producer返回的二次确认commit/rollback,则会主动去挪用producer的接口停止回查,接着producer再去数据库中查看事务能否施行胜利 ,如胜利/失败,则推送commit/rollback给MQ伺服器,接着后面的操做同上一步
2. 事务最新动静和一般最新动静的区别
(1)事务最新动静有三种形态:commit形态、回滚形态、中间形态(producer推送了half型最新动静但未推送commit给到伺服器,即未对);commit形态的最新动静等价于一般最新动静(能被顾客感知),但后两种形态的最新动静关于顾客是不成见的
(2)事务最新动静仅与商品和办事项目有关,仅当事务最新动静处于commit形态时与一般最新动静一样能被顾客感知3. 标识符实现事务最新动静
商品和办事项目次要就标识符: //事务最新动静利用的商品和办事项目是TransactionMQProducer TransactionMQProducer transactionMQProducer=new TransactionMQProducer("group1"); transactionMQProducer.setNamesrvAddr("127.0.0.1:9876"); try { //添加事务窃听 transactionMQProducer.setTransactionListener(new TransactionListener(){ //事务最新动静过程中包罗一般事务(数据库操做)、事务抵偿,一般事务在该办法施行 @Override @Transactional public LocalTransactionState executeLocalTransaction(Message message, Object o) { try{ //模仿数据库操做事务一般提交:insert delete... Long orderId= Long.valueOf(new String(message.getBody())); insert(orderId); System.out.println("当地数据库事务提交胜利!"); //当地施行完数据库操做后(一般事务),事务最新动静有三种形态:COMMIT_MESSAGE/ROLLBACK_MESSAGE/UNKNOW return LocalTransactionState.COMMIT_MESSAGE; }catch(Exception ex){ //模仿数据库操做事务提交失败:insert delete... System.out.println("当地数据库事务提交失败,事务最新动静rollback:"+ex); //当地施行完数据库操做后(一般事务),事务最新动静有三种形态:COMMIT_MESSAGE/ROLLBACK_MESSAGE/UNKNOW return LocalTransactionState.ROLLBACK_MESSAGE; } //营业胜利后,也能间接返回UNKNOW,制止极端的情况下数据库并没有保留胜利 //return LocalTransactionState.UNKNOW; } //事务抵偿在该办法施行:若executeLocalTransaction返回UNKNOW或长时间没有返回最新动静给伺服器 @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { System.out.println("施行事务抵偿..."); Long orderId= Long.valueOf(new String(messageExt.getBody())); if(null!=selectOne(orderId)){ return LocalTransactionState.COMMIT_MESSAGE; }else{ //若事务不胜利能返回UNKNOW而不是ROLLBACK_MESSAGE,因为伺服器预设回查15次后都是UNKNOW,则会主动回滚haLf型最新动静 return LocalTransactionState.UNKNOW; } } }); transactionMQProducer.start(); Order order=new Order(0,"创建订货"); Message message=new Message("topic-transaction",String.valueOf(order.getOrderId()).getBytes()); SendResult sendResult=transactionMQProducer.sendMessageInTransaction(message,null); System.out.println("sendResult:"+sendResult);由事务最新动静的整个流程可知,先施行executeLocalTransaction,再打印推送成果:
若是executeLocalTransaction返回UNKNOW,则伺服器不竭测验考试事务抵偿:
顾客次要就标识符:
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("group1"); defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876"); defaultMQPushConsumer.subscribe("topic-transaction", "*"); defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { list.forEach(msg->{ System.out.println("收到最新动静:"+new String(msg.getBody())); }); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); defaultMQPushConsumer.start();施行成果如下: