<input id="ohw05"></input>
  • <table id="ohw05"><menu id="ohw05"></menu></table>
  • <var id="ohw05"></var>
  • <code id="ohw05"><cite id="ohw05"></cite></code>
    <label id="ohw05"></label>
    <var id="ohw05"></var>
  • 基于long pull實現簡易的消息中心MQ參考

      我們都用過消息中間件,它的作用自不必多說。但對于消費者卻一直有一些權衡,就是使用push,還是pull模式的問題,這當然是各有優劣。當然,這并不是本文想討論的問題。我們想在不使用長連接的情意下,如何實現實時的消息消費,而不至于讓server端壓力過大。大體上來說,這是一種主動拉取pull的方式。具體情況如何,且看且聽。

     

    1. 架構示意圖

      既然是一個消息中間的作用,我們必須得模擬一個生產消費者模型,如下:

     

     

       生產者集群->消息中心集群->消費者集群

      只是這里的生產和消息中心也許我們可以合二為一,為簡單起見,可能我們消費者只是想知道數據發生了變化。

      以上是一個通用模型,接下來再說說我如何以long pull消息消費,其流程圖如下:

     

     

       消費者一直請求連接->消息中心->有數據到來或者超時->消費者處理數據->發送ack確認->繼續請求連接

      如此一來,我們基本上就實現了一個消費模型了。但是有個問題,我們一直在不停地請求server,這會不會讓server疲于奔命?是的,如果按照正常的http請求,就是不停地建立連接,處理數據,關閉連接等等。在沒有消息到來之前,可以說,server會一直被這無用功跑死,它的qps越高,壓力也越大。所以,我們使用了一種long pull的方式,讓server端不要那么快返回沒有意義的數據。但,這可能不是一件容易的事。

     

    2. long pull的實現方式

      long pull從原理上來說就是,必要的時候hold住連接,直到某個時機才返回。這和長鏈接有點類似。

      至于為什么不用長連接實現,我想至少有兩個原因:一是long pull一般基于http協議,實現簡單且通用,而如果要基于長鏈接則需要了解太多的通信細節太復雜;二是端口復用問題,long pull可以直接基于業務端口實現,而長連接則必須要另外開一個通信端口,這在實際運維過程中也許不那么好操作,主要原因可能是我們往往不是真正的中間件,還達不到與架構或運維pk端口標準的資本。

      說回正題,如何實現long pull?這其實和你使用的框架有關。但簡單來說都可以這樣干,請求進來后,我只要一直不返回即可。而且這也許是許多框架或語言的唯一選擇。

      如果咱們是java語言且基于spring系列框架,則可以用另外一種異步的方式。用上一種通用的實現方式的缺點是:當一個請求一直不返回后,必然占用主連接池,從而影響其他業務接口的請求處理,就是說只要你多接入幾個這種請求,業務就別想有好日子過了。所以,我們選擇異步的方式。異步,聽起來是個好名詞,但又該如何實現呢?我們普通異步,可能是直接丟到一個隊列去,然后由后臺線程一直處理即可,聽起來不錯。但這種請求至少兩個問題:一是當我們提交到任務隊列之后,連接還存在嗎?二是我們敢讓請求排隊嗎?因為如果排隊有新數據進來,可就不面對實時的承諾了。

      所以,針對上面的問題,spring系列有了解決方案。使用異步 servlet(async servlet),其操作步驟如下:

    1 controller中返回異步實例callable;
    2 在servlet中配置異步支持標識(統一配置);

     

      比如下面的demo:

    // controller
        @GetMapping(value = "/consumeData")
        public Object consumeData(@RequestParam String topicName,
                                  @RequestParam Long offset,
                                  @RequestParam Long maxWait) {
            // 必要的時候需要在 web.xml中配置 <async-supported>true</async-supported>
            Callable<String> callable = () -> {
                SleepUtil.sleepMillis(10_000L);
                System.out.println("data come in, got out.");
                return "ok";
            };
    
            return callable;
        }
    // web.xml
        // 所有需要的filter和servlet中,添加
        <async-supported>true</async-supported>

      具體的框架版本各自具體配置可能不一樣,自行查找資料即可。

      以上,就解決了long pull的問題了。

     

    3. 主鍵id的實現

      主鍵id至少有兩個作用:一是可用于唯一定位一條消息;二是可以用于去重做冪等;其實一般還有一個目的就是用于確認消息的先后順序;

      所以主鍵id很重要,往往需要經過精心的設計。但,我們這里可以簡單的基于redis的自增key來處理即可。既保證了性能,又保證了唯一性,還保證了先后順序問題。這就為后續消息的存儲帶來了方便。比如可以用zset存儲這個消息id。

     

    4. 數據到來的檢測實現

      在server端hold連接的同時,它又是如何發現數據已經到來了呢?

      最簡單的,可以讓每個請求每隔一定時間,去查詢一次數據,如果有則返回。但這個實現既不優雅也不經濟也不實時,但是簡單,可以適當考慮。

      好點的方式,使用wait/notify機制,簡單來說比如使用一個CountDownLatch,沒有數據時則進行wait,數據到來時進行notify。這樣下不來,不用每個請求反復查詢數據,導致server壓力變大,同時也讓系統調度壓力減小了,而且能夠做到實時感知數據,可以說是很棒的選擇。只是,這必然有很多的細節問題需要處理,稍有不慎,可能就是一個坑。比如:死鎖問題,多節點問題,網絡問題。。。 隨便來一個,也許就jj了。

      好好處理這個問題,總是好的。

     

    5. 消息中心實現demo

     

    5.1. 消費者生產者controller

      兩個簡單方法入口,生產+消費 。

    @RestController
    @RequestMapping("/simpleMessageCenter")
    public class SimpleMessageCenterController {
    
        @Resource
        private MessageService messageService;
    
        // 消費消息
        @GetMapping(value = "/consumeData")
        public Object consumeData(@RequestParam String topicName,
                                  @RequestParam Long offset,
                                  @RequestParam Long maxWait) {
            // 必要的時候需要在 web.xml中配置 <async-supported>true</async-supported>
            Callable<String> callable = () -> {
                try {
                    Object data = messageService.consumeData(topicName, offset, maxWait);
                    return JSONObject.toJSONString(data);
                }
                catch (Exception e){
                    e.printStackTrace();
                    return "error";
                }
            };
    
            return callable;
        }
    
        // 發送消息
        @GetMapping(value = "/sendMsg")
        public Object sendMsg(@RequestParam String topicName,
                              @RequestParam String extraId,
                              @RequestParam String data) {
            messageService.sendMsg(topicName, extraId, data);
            return "ok";
        }
    }

     

    5.2. 核心service簡化版

      由redis作為存儲,展示各模塊間的協作。

    @Service
    public class MessageService {
    
        @Resource
        private RedisTemplate<String, String> redisTemplate;
    
        // 消費閉鎖
        private volatile ConcurrentHashMap<String, CountDownLatch>
                consumeLatchContainer = new ConcurrentHashMap<>();
    
        // 消費數據接口
        public List<Map<String, Object>> consumeData(String topic,
                                                     Long offset,
                                                     Long maxWait) throws InterruptedException {
            long startTime = System.currentTimeMillis();
            final CountDownLatch myLatch = getOrCreateConsumeLatch(topic);
            List<Map<String, Object>> result = new ArrayList<>();
            do {
                ZSetOperations<String, String> queueHolder
                        = redisTemplate.opsForZSet();
                Set<ZSetOperations.TypedTuple<String>> nextData
                        = queueHolder.rangeByScoreWithScores(topic, offset, offset + 100);
                if(nextData == null || nextData.isEmpty()) {
                    long timeRemain = maxWait - (System.currentTimeMillis() - startTime);
                    myLatch.await(timeRemain, TimeUnit.MILLISECONDS);
                    continue;
                }
                for (ZSetOperations.TypedTuple<String> queue1 : nextData) {
                    Map<String, Object> queueWrapped = new HashMap<>();
                    queueWrapped.put(queue1.getValue(), queue1.getScore());
                    result.add(queueWrapped);
                }
                break;
            } while (System.currentTimeMillis() - startTime <= maxWait);
            return result;
        }
    
        // 獲取topic級別的鎖
        private CountDownLatch getOrCreateConsumeLatch(String topicName) {
            return consumeLatchContainer.computeIfAbsent(
                        topicName, k -> new CountDownLatch(1));
        }
    
        // 接收到消息存儲請求
        public void sendMsg(String topic, String extraIdSign, String data) {
            ValueOperations<String, String> strOp = redisTemplate.opsForValue();
            Long msgId = strOp.increment(topic + ".counter");
            // todo: 1. save real data
            // 2. 加入通知隊列
            ZSetOperations<String, String> zsetOp = redisTemplate.opsForZSet();
            zsetOp.add(topic, extraIdSign, msgId);
            wakeupConsumers(topic, extraIdSign);
        }
    
        // 喚醒消費者,一般是有新數據到來
        private void wakeupConsumers(String topic, String extraIdSign) {
            CountDownLatch consumeLatch = getOrCreateConsumeLatch(topic);
            consumeLatch.countDown();
            rolloverConsumeLatch(topic, extraIdSign);
        }
    
        // 產生新一輪的鎖
        private void rolloverConsumeLatch(String topic, String extraIdSign) {
            consumeLatchContainer.put(topic, new CountDownLatch(1));
        }
    }

      

    5.3. 功能測試

      因為是使用http接口實現,所以,可以直接通過瀏覽器實現功能測試。一個地址打開生產者鏈接,一個打開消費者鏈接。

    // 1. 先訪問消費者
    http://localhost:8081/simpleMessageCenter/consumeData?topicName=q&offset=19&maxWait=50000
    // 2. 再訪問生產者
    http://localhost:8081/simpleMessageCenter/sendMsg?topicName=q&extraId=d3&data=aaaaaaaaaaa

      在生產者沒有數據進來前,消費者會一直在等待,而生產者產生數據后,消費者就立即展示結果了。我們要實現的,不就是這個效果嗎?

     

    5.4. 消費者一直請求樣例

      在瀏覽器上我們看到的只是一次請求,但如果真正想實現,一直消費數據,則必須有一種訂閱的感覺。其實就是不停的請求,處理,再請求的過程。

    public class SimpleMessageCenterTest {
    
        @Test
        public void testConsumerSubscribe() {
            long offset = 0;
            String urlPrefix = "http://localhost:8081/simpleMessageCenter/consumeData?topicName=q&maxWait=50000&offset=";
            while (!Thread.interrupted()) {
                String dataListStr = HttpUtils.doGet(urlPrefix + offset);
                System.out.println("offsetStart: " + offset + ", got data:" + dataListStr);
                List<Object> dataListParsed = JSONObject.parseArray(dataListStr);
                // 不解析最終的offset了,大概就是根據最后一次offset再發起請求即可
                offset += dataListParsed.size();
            }
        }
    }

      以上,就是本次分享的小輪子了。我們拋卻了消息系統中的一個重要且復雜的環節:存儲。供參考。

    posted @ 2022-03-24 16:51  等你歸去來  閱讀(194)  評論(0編輯  收藏  舉報
    国产美女a做受大片观看