1. 主页 > 大智慧

Java多频道路由控制实战:WebSocket与观察者模式应用


浣犵殑鐩存挱闂翠负浠€涔堟€诲湪鍏抽敭鏃跺埢鎺夐摼瀛愶紵

涓婂懆閬囧埌涓摥绗戜笉寰楃殑浜嬧€斺€旀湅鍙嬪叕鍙稿紑鍙戠殑鍦ㄧ嚎璇惧爞绯荤粺锛屾瘡褰撹€佸笀鍒囨崲"鐧芥澘/鎽勫儚澶?璇句欢"棰戦亾鏃讹紝鎬绘湁瀛︾敓鍚愭Ы锛?鎴戣繖杈规樉绀鸿€佸笀鍗″湪榛戞澘閲屼簡锛? 杩欒鎴戞兂璧峰幓骞村府鏌愮洿鎾钩鍙板仛鐨勬敼閫狅細鈥?strong>鈥嬬敤WebSocket+瑙傚療鑰呮ā寮忥紝纭槸鎶婇閬撳垏鎹㈡垚鍔熺巼浠?8%鎻愬埌浜?9.6%鈥?/strong>鈥嬨€備粖澶╁氨甯︿綘浠湅鐪嬭繖缁勫悎鎷虫€庝箞鎵撱€?/p>


鈻峎ebSocket鏄數绾挎潌涓婄殑澶у枃鍙紵

浼犵粺HTTP杞灏卞儚娲惧皬寮熸瘡闅?绉掑幓闂細"鏈夋柊娑堟伅娌★紵" 鑰學ebSocket鏄洿鎺ユ壇鐫€鍡撳瓙鍠婏細"閮芥敞鎰忥紒瑕佸垏棰戦亾浜嗭紒" 鐪嬩釜瀵规瘮锛?/p>

鈥?strong>鈥嬩紶缁熸柟妗堢棝鐐光€?/strong>鈥嬶細

  1. 瀛︾敓鐐瑰垏鎹㈡寜閽悗锛屾渶涔呰绛?绉掓墠鑳界湅鍒版柊棰戦亾
  2. 30%鐨勫垏鎹㈣姹備細鍥犱负缃戠粶娉㈠姩涓㈠け
  3. 鏈嶅姟鍣–PU琚疆璇㈣姹傚崰鎺?0%

鈥?strong>鈥媁ebSocket鏀归€犲悗鈥?/strong>鈥嬶細

java澶嶅埗
// WebSocket閰嶇疆鏍稿績浠g爜锛圫pring Boot鐗堬級
@Configuration
@EnableWebSocket
public class ChannelWebSocketConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(new ChannelSwitchHandler(), "/live-channel")
                .setAllowedOrigins("*");
    }

    // 鍏抽敭鐐癸細璁剧疆蹇冭烦妫€娴嬮槻鏂嚎
    @Bean
    public ServletServerContainerFactoryBean createWebSocketContainer() {
        ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
        container.setMaxSessionIdleTimeout(300000L); // 5鍒嗛挓蹇冭烦
        return container;
    }
}

鈿狅笍 鈥?strong>鈥嬩釜浜鸿鐐光€?/strong>鈥嬶細寰堝鏂版墜瑙夊緱WebSocket閰嶇疆楹荤儲锛屼絾浣犵湅杩欎唬鐮侀噺鈥斺€斾笉鍒?0琛屽氨鎼炲畾闀胯繛鎺ョ鐞嗭紝杩欎拱鍗栦笉浜忥紒


鈻嶈瀵熻€呮ā寮忎笉鏄洃瑙嗙媯榄?/h3>

鎯宠薄浣犵彮涓讳换鍦ㄦ暀瀹ゅ悗闂ㄥ伔鐪嬶紝杩欏氨鏄渶鍘熷鐨勮瀵熻€呮ā寮忥紙绗戯級銆傚簲鐢ㄥ埌棰戦亾鍒囨崲鐨勫満鏅細

鈥?strong>鈥嬬粡鍏搁棶棰樷€?/strong>鈥嬶細
瀛︾敓A鍒囧埌鐧芥澘棰戦亾鏃讹紝涓轰粈涔堝叾浠栧悓瀛︾殑灞忓箷涔熶細璺熺潃鍙橈紵

鈥?strong>鈥嬭В鍐虫柟妗堚€?/strong>鈥嬶細

java澶嶅埗
// 棰戦亾绠$悊鍣紙琚瀵熻€咃級
public class ChannelManager extends Observable {
    private String currentChannel;

    public void switchChannel(String newChannel) {
        this.currentChannel = newChannel;
        setChanged(); // 杩欎釜鍗冧竾鍒繕锛?/span>
        notifyObservers(newChannel); // 閫氱煡鎵€鏈夎瀵熻€?/span>
    }
}

// 瀛︾敓绔紙瑙傚療鑰咃級
public class StudentClient implements Observer {
    @Override
    public void update(Observable o, Object channel) {
        if (channel instanceof String) {
            System.out.println("鏀跺埌鍒囨崲鎸囦护锛? + channel);
            reloadChannel((String) channel);
        }
    }
}

馃挕 鈥?strong>鈥嬮伩鍧戞寚鍗椻€?/strong>鈥嬶細

  1. 璁板緱鍏堣皟鐢?code>setChanged()鍐嶉€氱煡锛屽惁鍒欑瓑浜庣櫧鍠?/li>
  2. 瑙傚療鑰呭垪琛ㄨ鐢?code>CopyOnWriteArrayList閬垮厤骞跺彂闂
  3. 寤鸿鐢?code>@PreDestroy娉ㄨВ娓呯悊娉ㄥ唽淇℃伅

鈻嶅綋WebSocket閬囦笂瑙傚療鑰咃細1+1>2鐨勫寲瀛﹀弽搴?/h3>

鍘诲勾缁欐煇鐢电珵鐩存挱骞冲彴鍋氫紭鍖栨椂鍙戠幇锛氣€?strong>鈥嬬函鐢ㄨ瀵熻€呮ā寮忥紝涓囦汉鍚屾椂鍦ㄧ嚎鍒囨崲浼氭湁3绉掑欢杩熲€?/strong>鈥嬨€傜洿鍒板姞鍏ebSocket鎵嶈В鍐筹細

鈥?strong>鈥嬪弻鍓戝悎鐠ф灦鏋勨€?/strong>鈥嬶細

  1. 鑰佸笀绔Е鍙戝垏鎹?鈫?2. ChannelManager閫氱煡WebSocket澶勭悊鍣?鈫?3. WebSocket骞挎挱缁欐墍鏈夊鐢?/li>
java澶嶅埗
// WebSocket澶勭悊鍣ㄦ敼閫犵増
public class ChannelSwitchHandler extends TextWebSocketHandler {

    // 鍏抽敭锛氱淮鎶ゅ湪绾垮鐢熷垪琛?/span>
    private static final List sessions = new CopyOnWriteArrayList<>();

    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        sessions.add(session);
        // 娉ㄥ唽鍒拌瀵熻€?/span>
        ChannelManager.getInstance().addObserver((o, arg) -> {
            try {
                session.sendMessage(new TextMessage((String) arg));
            } catch (IOException e) {
                // 鏂嚎鑷姩绉婚櫎
                sessions.remove(session);
            }
        });
    }
}

鈥?strong>鈥嬩紭鍖栨晥鏋溾€?/strong>鈥嬶細

  • 鍒囨崲鍝嶅簲鏃堕棿浠?000ms鈫?20ms
  • 鏈嶅姟鍣ㄨ礋杞戒笅闄?0%
  • 涓㈠寘鐜囦粠15%鈫?.3%

鈻嶅疄鎴樹腑鐨勫购铔惧瓙锛氭垜韪╄繃鐨勪笁涓ぇ鍧?/h3>
  1. 鈥?strong>鈥嬪菇鐏靛垏鎹㈤棶棰樷€?/strong>鈥?br/> 鏄庢槑娌′汉鎿嶄綔锛岄閬撹嚜宸卞彉鏉ュ彉鍘汇€傛渶鍚庡彂鐜版槸鍓嶇娌″叧WebSocket閲嶈繛鏈哄埗锛岃В鍐虫柟妗堬細

    java澶嶅埗
    // 瀹㈡埛绔噸杩炵瓥鐣?/span>
    webSocketClient.setReconnectStrategy((attempt, maxAttempts) -> {
        return attempt * 1000L; // 姣忔閲嶈繛闂撮殧閫掑
    });
  2. 鈥?strong>鈥嬪唴瀛樻硠婕忔儴妗堚€?/strong>鈥?br/> 瑙傚療鑰呭繕璁版敞閿€锛屽鑷村崄涓囦釜鍍靛案瀵硅薄鍦ㄥ唴瀛樺紑party銆傚悗鏉ュ姞涓婏細

    java澶嶅埗
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
        ChannelManager.getInstance().deleteObserver(session);
    }
  3. 鈥?strong>鈥嬭法棰戦亾姹℃煋鈥?/strong>鈥?br/> 鏁板璇鹃閬撶殑鍒囨崲鎸囦护璺戝埌鑻辫璇惧爞浜嗐€傛敼閫犳柟妗堬細

    java澶嶅埗
    // 甯﹂閬撹矾鐢辩殑瑙傚療鑰呮ā寮?/span>
    public void notifyObservers(String channelId, Object data) {
        observers.stream()
            .filter(o -> o.getSubscribedChannels().contains(channelId))
            .forEach(o -> o.update(data));
    }

鈻嶄釜浜虹鎴垮缓璁?/h3>

鍋氫簡杩欎箞澶氬勾绯荤粺鏋舵瀯锛屽彂鐜板緢澶氬洟闃熸妸WebSocket褰撻摱寮光€斺€斿叾瀹炲畠鏇撮€傚悎鈥?strong>鈥嬮珮棰戙€佸疄鏃躲€佸弻鍚戔€?/strong>鈥嬬殑鍦烘櫙銆傝鏄仛涓櫘閫氭柊闂荤綉绔欙紝鐢ㄨ疆璇㈠弽鑰屾洿鍒掔畻銆?/p>

杩樻湁涓皬绐嶉棬锛氣€?strong>鈥嬪埆鐩存帴鐢ㄥ師鐢熺殑Observable绫烩€?/strong>鈥嬨€傛帹鑽怗uava鐨凟ventBus鎴栬€匰pring鐨凙pplicationEvent锛岃繖浜涙鏋跺凡缁忔妸绾跨▼瀹夊叏銆佸紓甯稿鐞嗚繖浜涜剰娲荤疮娲绘悶瀹氫簡銆傚氨鍍忓仛楗埆浠庣灏忛害寮€濮嬶紝鐩存帴鐢ㄧ幇鎴愮殑闈㈢矇澶氶鍟婏紒

鏈€鍚庤鍙ユ帍蹇冪獫鐨勮瘽锛氭妧鏈柟妗堟病鏈夌粷瀵圭殑濂藉潖锛屽氨鍍忓幓骞寸粰灏戝効缂栫▼璇惧仛鐨勭郴缁燂紝鑰冭檻鍒板瀛愬彲鑳界敤鑰佹棫骞虫澘涓婅锛屾垜浠弽鑰岄€€鍥炲埌HTTP闀胯疆璇⑩€斺€斺€?strong>鈥嬪悎閫傜殑鎵嶆槸鏈€濂界殑鈥?/strong>鈥嬶紝鍒鎶€鏈疆娴佺壍鐫€榧诲瓙璧般€?/p>

本文由嘻道妙招独家原创,未经允许,严禁转载