Spring websocket chatting server(3) – 여러대의 채팅서버간에 메시지 공유하기 by Redis pub/sub

Spring websocket chatting server(3) – 여러대의 채팅서버간에 메시지 공유하기 by Redis pub/sub

이 연재글은 웹소켓(websocket)으로 채팅서버 만들기의 3번째 글입니다.

앞 장에서 실습을 통해 채팅을 구현해 보았습니다. websocket과 Stomp를 이용한 구현만으로도 채팅의 기본 기능은 충분히 구현할 수 있는 것을 확인할 수 있었습니다. 하지만 서비스에 사용하려면 좀 더 쓸만하게 변경이 필요합니다. 앞장에서 만든 채팅 서비스는 몇 가지 문제가 있습니다. 

서버를 재시작 할때마다 채팅방 정보들이 리셋됨

채팅방의 메인 저장소가 없으므로 서버의 메모리에 적재된 채팅방은 서버를 재시작할 때마다 초기화되는 이슈가 있습니다. DB를 이용하거나 다른 저장소를 이용하여 채팅방이 계속 유지되도록 처리가 필요합니다. 여기서는 Redis를 저장소로 이용해 보겠습니다. 

채팅서버가 여러대이면 서버간 채팅방을 공유할수가 없음

현재는 채팅방을 websocket과 Stomp pub/sub를 이용하여 구현하였습니다. 그런데 이러한 구조는 pub/sub가 발생한 서버 내에서만 메시지를 주고받는 것이 가능합니다. 즉 구독 대상인 채팅방(topic)이 생성된 서버 안에서만 유효하므로 다른 서버로 접속한 클라이언트는 해당 채팅방이 보이지도 않고, 채팅방(topic) 구독도 불가능합니다. 즉 구독 대상(채팅방 : topic)이 여러 서버에서 접근할 수 있도록 개선이 필요합니다. 요구조건을 해결하려면 공통으로 사용할 수 있는 pub/sub 시스템을 구축하고 모든 서버들이 해당 시스템을 통하여 pub/sub 메시지를 주고받도록 변경해야 합니다. 

마침 Redis가 pub/sub를 지원하고 있으며 공통 pub/sub 채널로 이용하기 알맞습니다. Redis를 활용하여 서로 다른 서버에 접속해 있는 클라이언트가 채팅방을 통해 다른 서버의 클라이언트와 메시지를 주고받을 수 있도록 구현해보겠습니다.

Redis pub/sub를 이용한 채팅룸 고도화

build.gradle

dependencies에 다음 라이브러리를 추가합니다. local에서는 Redis 설치없이 간단하게 Embedded Redis를 사용하여 환경을 구축해보겠습니다.

    implementation 'org.springframework.boot:spring-boot-starter-data-redis'
    //embedded-redis
    compile group: 'it.ozimov', name: 'embedded-redis', version: '0.7.2'

Embedded Redis 서버 사용을 위한 설정

채팅 서버가 실행될때 Embedded Redis 서버도 동시에 실행 될수 있도록 아래 설정을 추가합니다. local 환경에서만 실행되도록 @Profile(“local”)을 상단에 선언합니다.

package com.websocket.chat.config;
// import 생략...

/**
 * 로컬 환경일경우 내장 레디스가 실행됩니다.
 */
@Profile("local")
@Configuration
public class EmbeddedRedisConfig {

    @Value("${spring.redis.port}")
    private int redisPort;

    private RedisServer redisServer;

    @PostConstruct
    public void redisServer() {
        redisServer = new RedisServer(redisPort);
        redisServer.start();
    }

    @PreDestroy
    public void stopRedis() {
        if (redisServer != null) {
            redisServer.stop();
        }
    }
}

Redis 설정

redis의 pub/sub 기능을 이용할 것이므로 MessageListener 설정을 추가합니다. 그리고 어플리케이션에서 redis 사용을 위해 redisTemplate 설정도 추가합니다.

package com.websocket.chat.config;

// import 생략...

@Configuration
public class RedisConfig {

    /**
     * redis pub/sub 메시지를 처리하는 listener 설정
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListener(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        return container;
    }

    /**
     * 어플리케이션에서 사용할 redisTemplate 설정
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(connectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(String.class));
        return redisTemplate;
    }
}

환경 설정 수정 및 추가

application.yml

기본 프로파일을 local로 설정합니다.

spring:
  profiles:
    active: local

application-local.yml

local 환경 설정 파일을 추가합니다. Embedded Redis는 local에 실행되므로 아래와 같이 추가합니다.

spring:
  profiles:
    active: local
  redis:
    host: localhost
    port: 6379

application-alpha.yml

alpha 서버용 환경 설정 파일입니다. 로컬에서만 테스트할 것이면 굳이 추가할 필요가 없지만. 다른 환경의 테스트가 필요할 경우 세팅합니다. Redis 정보는 서버 환경에 설치된 Redis 정보로 대체합니다.

spring:
  profiles:
    active: alpha
  redis:
    host: redis가 설치된 서버 호스트
    port: redis가 설치된 서버 포트

Redis 발행/구독 모델 구현을 위한 서비스 생성

Redis에서는 공통 주제(Topic)에 대하여 구독자(subscriber)에게 메시지를 발행(publish)할 수 있는 기능이 있습니다. 통칭하여 pub/sub라고 하며 채팅방에서는 redis의 topic을 채팅방이라고 가정하고, pub/sub는 대화를 하거나/대화를 보는 행위라고 생각하면 됩니다. Spring에서는 redis topic에 대하여 구독 및 발행을 처리할 수 있도록 다음과 같이 방법을 제공하고 있습니다.

Redis 발행 서비스 구현

채팅방에 입장하여 메시지를 작성하면 해당 메시지를 Redis Topic에 발행하는 기능의 서비스입니다. 이 서비스를 통해 메시지를 발행하면 대기하고 있던 redis 구독 서비스가 메시지를 처리합니다.

package com.websocket.chat.pubsub;
// import 생략...
@RequiredArgsConstructor
@Service
public class RedisPublisher {
    private final RedisTemplate<String, Object> redisTemplate;

    public void publish(ChannelTopic topic, ChatMessage message) {
        redisTemplate.convertAndSend(topic.getTopic(), message);
    }
}

Redis 구독 서비스 구현

Redis에 메시지 발행이 될 때까지 대기하였다가 메시지가 발행되면 해당 메시지를 읽어 처리하는 리스너입니다. MessageListener를 상속받아 onMessage 메서드를 재작성합니다. 아래에서는 Redis에 메시지가 발행되면 해당 메시지를 ChatMessage로 변환하고 messaging Template을 이용하여 채팅방의 모든 websocket 클라이언트들에게 메시지를 전달하도록 구현되어 있습니다. 

@Slf4j
@RequiredArgsConstructor
@Service
public class RedisSubscriber implements MessageListener {

    private final ObjectMapper objectMapper;
    private final RedisTemplate redisTemplate;
    private final SimpMessageSendingOperations messagingTemplate;

    /**
     * Redis에서 메시지가 발행(publish)되면 대기하고 있던 onMessage가 해당 메시지를 받아 처리한다.
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        try {
            // redis에서 발행된 데이터를 받아 deserialize
            String publishMessage = (String) redisTemplate.getStringSerializer().deserialize(message.getBody());
            // ChatMessage 객채로 맵핑
            ChatMessage roomMessage = objectMapper.readValue(publishMessage, ChatMessage.class);
            // Websocket 구독자에게 채팅 메시지 Send
            messagingTemplate.convertAndSend("/sub/chat/room/" + roomMessage.getRoomId(), roomMessage);
        } catch (Exception e) {
            log.error(e.getMessage());
        }
    }
}

ChatController 수정

클라이언트가 채팅방 입장시 채팅방(topic)에서 대화가 가능하도록 리스너를 연동하는 enterChatRoom 메서드를 세팅합니다. 채팅방에 발행된 메시지는 서로 다른 서버에 공유하기 위해 redis의 Topic으로 발행합니다. 

package com.websocket.chat.controller;

// import 생략...

@RequiredArgsConstructor
@Controller
public class ChatController {

    private final RedisPublisher redisPublisher;
    private final ChatRoomRepository chatRoomRepository;

    /**
     * websocket "/pub/chat/message"로 들어오는 메시징을 처리한다.
     */
    @MessageMapping("/chat/message")
    public void message(ChatMessage message) {
        if (ChatMessage.MessageType.ENTER.equals(message.getType())) {
            chatRoomRepository.enterChatRoom(message.getRoomId());
            message.setMessage(message.getSender() + "님이 입장하셨습니다.");
        }
        // Websocket에 발행된 메시지를 redis로 발행한다(publish)
        redisPublisher.publish(chatRoomRepository.getTopic(message.getRoomId()), message);
    }
}

ChatRoomRepository 수정

채팅방 정보는 초기화 되지 않도록 생성 시 Redis Hash에 저장하도록 처리합니다. 채팅방 정보를 조회할 때는 Redis Hash에 저장된 데이터를 불러오도록 메서드 내용을 수정합니다. 채팅방 입장 시에는 채팅방 id로 Redis topic을 조회하여 pub/sub 메시지 리스너와 연동 합니다. 

@RequiredArgsConstructor
@Repository
public class ChatRoomRepository {
    // 채팅방(topic)에 발행되는 메시지를 처리할 Listner
    private final RedisMessageListenerContainer redisMessageListener;
    // 구독 처리 서비스
    private final RedisSubscriber redisSubscriber;
    // Redis
    private static final String CHAT_ROOMS = "CHAT_ROOM";
    private final RedisTemplate<String, Object> redisTemplate;
    private HashOperations<String, String, ChatRoom> opsHashChatRoom;
    // 채팅방의 대화 메시지를 발행하기 위한 redis topic 정보. 서버별로 채팅방에 매치되는 topic정보를 Map에 넣어 roomId로 찾을수 있도록 한다.
    private Map<String, ChannelTopic> topics;

    @PostConstruct
    private void init() {
        opsHashChatRoom = redisTemplate.opsForHash();
        topics = new HashMap<>();
    }

    public List<ChatRoom> findAllRoom() {
        return opsHashChatRoom.values(CHAT_ROOMS);
    }

    public ChatRoom findRoomById(String id) {
        return opsHashChatRoom.get(CHAT_ROOMS, id);
    }

    /**
     * 채팅방 생성 : 서버간 채팅방 공유를 위해 redis hash에 저장한다.
     */
    public ChatRoom createChatRoom(String name) {
        ChatRoom chatRoom = ChatRoom.create(name);
        opsHashChatRoom.put(CHAT_ROOMS, chatRoom.getRoomId(), chatRoom);
        return chatRoom;
    }

    /**
     * 채팅방 입장 : redis에 topic을 만들고 pub/sub 통신을 하기 위해 리스너를 설정한다.
     */
    public void enterChatRoom(String roomId) {
        ChannelTopic topic = topics.get(roomId);
        if (topic == null) {
            topic = new ChannelTopic(roomId);
            redisMessageListener.addMessageListener(redisSubscriber, topic);
            topics.put(roomId, topic);
        }
    }

    public ChannelTopic getTopic(String roomId) {
        return topics.get(roomId);
    }
}

ChatRoom Serialize

Redis에 저장되는 객체들은 Serialize가능해야 하므로 Serializable을 참조하도록 선언하고 serialVersionUID를 세팅해 줍니다.

@Getter
@Setter
public class ChatRoom implements Serializable {

    private static final long serialVersionUID = 6494678977089006639L;

    private String roomId;
    private String name;

    public static ChatRoom create(String name) {
        ChatRoom chatRoom = new ChatRoom();
        chatRoom.roomId = UUID.randomUUID().toString();
        chatRoom.name = name;
        return chatRoom;
    }
}

단일 서버 테스트

채팅 서버를 실행하고 브라우저 창 2개를 띄워서 서로 채팅을 주고받아 봅니다. 여기까지는 Stomp를 통해 구현한 채팅 서버와 별다른 차이가 없습니다. 

채팅방 개설
채팅방 진입 – 행인1
채팅방 진입 – 학생 1
채팅 리스트
대화 진행

다중 서버 채팅 테스트

local 환경에서 서버를 2대 띄우면 같은 port로 redis가 두 개가 실행되므로 테스트가 불가능합니다. 실습에서는 외부 Redis를 설치하고 profile을 alpha로 지정한 후 테스트합니다. 로컬 PC나 특정 서버에 Redis가 설치돼있다는 가정하에 아래 테스트를 진행합니다. 

application-alpha.yml

Redis서버 내용을 설정합니다.

spring:
  profiles:
    active: alpha
  redis:
    host: Standalone Redis 호스트
    port: Standalone Redis 포트

다른 포트로 서버를 2대 띄울 것이므로 터미널을 띄우고 프로젝트 root에서 다음 명령어로 executable jar를 생성합니다. 

$ ./gradlew bootJar 
// 윈도우의 경우는 ./gradlew.bat bootJar

빌드가 완료되면 /build/libs에 jar가 생성됩니다. 8080, 8090 두 개 포트로 서버를 2개 띄웁니다. jar실행 시에는 각각 다른 터미널에서 띄우던지 nohup을 통해 데몬으로 띄우면 됩니다. 

$ cd build/libs
$ ls
chat-0.0.1-SNAPSHOT.jar
$ java -jar -Dserver.port=8080 -Dspring.profiles.active=alpha chat-0.0.1-SNAPSHOT.jar
// 다른 터미널에서 실행
$ java -jar -Dserver.port=8090 -Dspring.profiles.active=alpha chat-0.0.1-SNAPSHOT.jar 

서버 2대가 실행되면 브라우저에서 각각 http://localhost:8080/chat/room, http://localhost:8090/chat/room으로 접속하여 서로 채팅이 되는지 테스트 합니다.

서로 다른 서버에 접속한 클라이언트들 간에 채팅 메시지를 공유하고 메시지를 주고받을 수 있게 되었습니다. 

채팅방 개설
대화명 입력 및 채팅방 입장
대화명 입력 및 채팅방입장
대화진행
대화진행

서버를 8070 port로 1대 더 띄워보겠습니다. 이제 총 3대의 채팅 서버가 존재하게 됩니다. 

$ java -jar -Dserver.port=8070 -Dspring.profiles.active=alpha chat-0.0.1-SNAPSHOT.jar

localhost:8070/chat/room으로 접속하여 채팅을 진행해 봅니다.

다른서버에서 생성된 채팅방도 화면에 표시되며 채팅방에 들어가서 대화를 하면 서로 다른 서버에서 접속한 클라이언트라도 채팅방의 대화내용이 모두 표시됩니다. 이제 채팅서버가 아무리 많아도 동일한 채팅방에만 있으면 메시지를 교환할 수 있게 되었습니다.

Redis에서 직접 메시지를 보내고 받기

redis의 pub/sub를 이용한 채팅이므로 redis client를 통해서도 메시지를 보내거나 받을수 있습니다.

redis가 6379 port로 떠있다고 가정하고 다음과 같이 redis-cli로 접속하여 pub/sub를 테스트해봅니다. 
CHAT_ROOM안에 채팅룸 id가 있으므로 검색해 봅니다. 검색되는 채팅룸 id로 채팅룸을 구독합니다. 
$ subscribe 3f0f893a-5849-4028-9755-8c6c8 ab1846b 
이 상황에서 브라우저의 채팅룸에서 메시지를 입력하면 터미널에 메시지가 출력됩니다. 객체는 Serialize 되어있어 알아보기 힘든 특수문자로 표시됩니다. 

$ redis-cli -p 6379
$ exists CHAT_ROOM
(integer) 1
$ hkeys CHAT_ROOM
1) "\xac\xed\x00\x05t\x00$3f0f893a-5849-4028-9755-8c6c8ab1846b"
$ subscribe 3f0f893a-5849-4028-9755-8c6c8ab1846b
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "3f0f893a-5849-4028-9755-8c6c8ab1846b"
3) (integer) 1
1) "message"
2) "3f0f893a-5849-4028-9755-8c6c8ab1846b"
3) "{\"type\":\"TALK\",\"roomId\":\"3f0f893a-5849-4028-9755-8c6c8ab1846b\",\"sender\":\"\xeb\xb6\x81\xed\x95\x9c\xec\xa3\xbc\xeb\xaf\xbc\",\"message\":\"\xea\xb2\xbd\xea\xb8\xb0\xea\xb0\x80 \xec\x96\xb4\xeb\x96\xbb\xec\x8a\xb5\xeb\x8b\x88\xea\xb9\x8c?\"}"

메시지 발송

발송할 message를 담는 json을 만드는 것이 좀 힘들지만 다음과 같이 영문을 이용하여 json message를 구성하고 메시지를 발행할 수 있습니다. 

$ publish "3f0f893a-5849-4028-9755-8c6c8ab1846b" "{\"type\":\"TALK\",
\"roomId\":\"3f0f893a-5849-4028-9755-8c6c8ab1846b\",
\"sender\":\"God!\",
\"message\":\"Hi EveryOne!!!\"}"

브라우저의 채팅창에서 다음과 같이 메시지를 수신받을 수 있습니다. redis console에서 메시지를 발행했지만 roomId로 메시지를 발행하면 서로 다른 서버의 채팅방에도 메시지를 발송할 수 있음을 확인할 수 있습니다. 

이러한 pub/sub의 특징을 활용하면 채팅방에 들어가지 않더라도 전체 메시지를 보낼수 있습니다.

여기까지 Redis pub/sub를 이용하여 채팅 서버를 고도화해보았습니다. 눈치 채신분들도 있겠지만 pub/sub를 제공하는 다른 메시징 플랫폼(RabbitMQ, Kafka)을 사용하여 Redis의 pub/sub를 대체하는 것도 가능합니다. 

이번장에서 사용한 소스는 다음 Github에서 확인 가능합니다.

https://github.com/codej99/websocket-chat-server/tree/feature/redis-pub-sub

연재글 이동<< Spring websocket chatting server(2) – Stomp로 채팅서버 고도화하기
Spring websocket chatting server(4) – SpringSecurity + Jwt를 적용하여 보안강화하기 >>
공유

댓글 남기기

Close Menu