Redis – spring-data-redis : 발행/구독(pub/sub) 모델의 구현

Redis – spring-data-redis : 발행/구독(pub/sub) 모델의 구현

이 연재글은 Redis 알아보기의 5번째 글입니다.

이번장에서는 Redis를 이용한 발행(publish)및 구독(subscribe)모델의 구현 방법에 대해 설명하겠습니다. (이하 pub/sub) pub/sub는 특정한 주제(topic)에 대하여 해당 topic을 구독한 모두에게 메시지를 발행하는 통신 방법입니다.
이를테면 날씨정보를 구독한 사람에게 주기적으로 날씨정보를 보내거나. 특정한 작업을 반복 수행하는 작업자에게 비동기적으로 작업을 보내 처리하도록 하거나, 또는 현재 앱에 로그인한 유저에게 푸시를 발송하는 활동들이 모두 pub/sub의 원리로 만들어 진다고 보면 됩니다.

Redis는 RabbitMQ나 Kafka같이 전문적인 메시징시스템의 pub/sub처럼 고도화된 기능을 제공하지는 않지만 MemoryDB의 특성을 살려 단순하지만 가볍고 빠른 pub/sub기능을 제공하고 있습니다. 스프링에서는 Spring-Data-Redis 모듈을 로드함으로써 쉽게 pub/sub를 구현 할 수 있습니다.

build.gradle

이전 장에서 spring-boot-starter-data-redis를 추가했다면 spring-data-redis 모듈도 같이 로드됩니다.

implementation 'org.springframework.boot:spring-boot-starter-data-redis'

Redis Config 내용 추가

Redis Config에 다음과 같이 두개의 Bean을 등록합니다. pub/sub는 기존의 http통신처럼 통신을 맺고 – 작업을 수행하고 – 통신을 끝는것이 아니라 항상 레디스에 발행된 데이터가 있는지 확인하고 있어야 하므로 Listener를 등록해줘야 합니다.
그리고 pub/sub통신시 redis 서버와 메시지 교환시 사용할 Serializer를 등록해줍니다. Json이 사용하기 편리하므로 ValueSerializer를 Jackson2JsonRedisSerializer로 등록합니다.

@Configuration
@EnableCaching
public class RedisCacheConfig {

// 다른 설정은 생략

// redis를 경청하고 있다가 메시지 발행(publish)이 오면 Listener가 처리합니다.    
@Bean
    public RedisMessageListenerContainer RedisMessageListener(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        return container;
    }

// pub/sub 통신에 사용할 RedisTemplate 설정 
    @Bean
    public RedisTemplate<String, Object> redisTemplateForObject(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(connectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(String.class));
        return redisTemplate;
    }

메시지 교환 모델 추가

메시지 교환을 위한 객체를 추가합니다. topic에 메시지 발행시 해당 객체에 데이터를 담아 발송합니다.

@Getter
@Builder
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class RoomMessage implements Serializable {
    private static final long serialVersionUID = 2082503192322391880L;
    private String roomId;
    private String name;
    private String message;
}

발행자(Publisher) 추가

메시지를 Redis 서버로 발행하는 Publisher를 추가합니다. 위에서 설정한 redisTemplate을 주입하고 발행을 위한 메서드를 구현합니다. topic에 구독자(subscriber)가 있는경우 발행자(publisher)가 메시지를 발행하면 구독자에게 메시지가 전달됩니다.

@Service
public class RedisPublisher {
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    public void publish(ChannelTopic topic, RoomMessage message) {
        redisTemplate.convertAndSend(topic.getTopic(), message);
    }
}

구독자(Subscriber) 추가

MessageListener를 상속받아 onMessage를 구현합니다. 여기서 발행된 메시지를 읽어 추가 작업을 하면 됩니다. 예제에서는 전달된 객체를 Deserialize하여 메시지를 로그로 출력합니다.

@Slf4j
@RequiredArgsConstructor
@Service
public class RedisSubscriber implements MessageListener {

    private final ObjectMapper objectMapper;
    private final RedisTemplate redisTemplate;

    @Override
    public void onMessage(Message message, byte[] pattern) {
        try {
            String body = (String) redisTemplate.getStringSerializer().deserialize(message.getBody());
            RoomMessage roomMessage = objectMapper.readValue(body, RoomMessage.class);
            log.info("Room - Message : {}", roomMessage.toString());
        } catch (Exception e) {
            log.error(e.getMessage());
        }
    }
}

Topic 생성 및 발행을 위한 Controller생성

여러개의 Topic이 생성될 수 있고 해당 Topic을 구독한 유저에게 메시지가 발송될 수 있도록 아래와 같이 구현합니다.

@RequiredArgsConstructor
@RequestMapping("/pubsub")
@RestController
public class PubSubController {
    // topic에 메시지 발행을 기다리는 Listner
    private final RedisMessageListenerContainer redisMessageListener;
    // 발행자
    private final RedisPublisher redisPublisher;
    // 구독자
    private final RedisSubscriber redisSubscriber;
    // topic 이름으로 topic정보를 가져와 메시지를 발송할 수 있도록 Map에 저장
    private Map<String, ChannelTopic> channels;

    @PostConstruct
    public void init() {
        // topic 정보를 담을 Map을 초기화
        channels = new HashMap<>();
    }

    // 유효한 Topic 리스트 반환
    @GetMapping("/room")
    public Set<String> findAllRoom() {
        return channels.keySet();
    }

    // 신규 Topic을 생성하고 Listener등록 및 Topic Map에 저장
    @PutMapping("/room/{roomId}")
    public void createRoom(@PathVariable String roomId) {
        ChannelTopic channel = new ChannelTopic(roomId);
        redisMessageListener.addMessageListener(redisSubscriber, channel);
        channels.put(roomId, channel);
    }

    // 특정 Topic에 메시지 발행
    @PostMapping("/room/{roomId}")
    public void pushMessage(@PathVariable String roomId, @RequestParam String name, @RequestParam String message) {
        ChannelTopic channel = channels.get(roomId);
        redisPublisher.publish(channel, RoomMessage.builder().name(name).roomId(roomId).message(message).build());
    }

    // Topic 삭제 후 Listener 해제, Topic Map에서 삭제
    @DeleteMapping("/room/{roomId}")
    public void deleteRoom(@PathVariable String roomId) {
        ChannelTopic channel = channels.get(roomId);
        redisMessageListener.removeMessageListener(redisSubscriber, channel);
        channels.remove(roomId);
    }
}

단일 Topic에 메시지 발행

Boot를 실행하고 콘솔에서 Controller를 호출하여 topic 생성 및 메시지 발송을 테스트 합니다.

# free_1, free_2 Topic 생성
$ curl -XPUT localhost:8081/pubsub/room/free_1
$ curl -XPUT localhost:8081/pubsub/room/free_2
# 등록된 Topic 리스트 확인
$ curl localhost:8081/pubsub/room
["free_1","free_2"]
# Topic에 메시지 발송
$ curl -XPOST localhost:8081/pubsub/room/free_1 -d 'name=daddy&amp;message=helloworld!!'

위에서 POST로 메시지를 보낼경우 Boot log에는 다음과 같이 출력됩니다.

Room - Message : RoomMessage(roomId=free_1, name=daddy, message=helloworld!!)

여러개의 Topic에 메시지 발행

콘솔 2개를 열고 redis-cli로 redis에 접속하여 하나는 free_1, 하나는 free_2로 topic을 구독합니다.
그리고 boot 서버로 free_1, free_2로 메시지를 발행합니다. 결과를 보면 해당 topic을 구독한 subscriber에게 각각 메시지가 전달되는것을 확인 할 수 있습니다.

free_1 토픽 구독 및 결과

$ redis-cli -c -p 6300
127.0.0.1:6300> subscribe "free_1"
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "free_1"
3) (integer) 1


1) "message"
2) "free_1"
3) "{\"@class\":\"com.redis.cluster.pubsub.RoomMessage\",\"roomId\":\"free_1\",\"name\":\"daddy\",\"message\":\"helloworld!!\"}"

free_2 토픽 구독 및 결과

$ redis-cli -c -p 6300
127.0.0.1:6300> subscribe "free_2"
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "free_2"
3) (integer) 1


1) "message"
2) "free_2"
3) "{\"@class\":\"com.redis.cluster.pubsub.RoomMessage\",\"roomId\":\"free_2\",\"name\":\"mommy\",\"message\":\"Welcome!!\"}"

메시지 발행

$ curl -XPOST localhost:8081/pubsub/room/free_1 -d 'name=daddy&amp;message=helloworld!!'
$ curl -XPOST localhost:8081/pubsub/room/free_2 -d 'name=mommy&amp;message=Welcome!!'

소스는 아래 깃허브에서 확인하시면 됩니다.
https://github.com/codej99/SpringRedisCluster/tree/feature/pubsub

연재글 이동<< Redis – Spring-data-redis : @Cacheable, @CachePut, @CacheEvict, @RedisHash
Redis – Reactive redis >>
공유
Close Menu