Redis – Reactive redis

Redis – Reactive redis

이 연재글은 Redis 알아보기의 6번째 글입니다.

이번 장에서는 Spring2.0의 주요 feature중 하나인 reactor(reactive)가 spring-data-redis에 어떻게 적용되어 있는지 실습을 통해 확인해 보겠습니다.

redis에서 reactive를 사용하면 대용량의 요청을 처리할때 non block으로 처리하므로 blocking 메서드로 처리할때보다 성능을 높일수 있습니다. 하지만 비동기 처리로 인하여 데이터 저장시 순서가 변경될수도 있으므로, 순서가 중요한 프로세스에 적용하려면 reactive redis를 신중히 사용해야 합니다.

build.gradle

reactive redis는 현재 spring-data-redis에 통합되어 있지 않고 지원하는 라이브러리가 별도로 존재합니다. 아래와 같이 spring-boot-starter-data-redis-reactive를 추가하면 사용할수 있습니다. 그리고 reactive는 비동기 처리를 하기 때문에 기존의 junit 테스트로는 처리할수가 없어 별도의 테스트 라이브러리인 io.projectreactor:reactor-test:3.1.0.RELEASE를 추가해야 합니다.

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    implementation 'org.springframework.boot:spring-boot-starter-freemarker'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.boot:spring-boot-starter-data-redis'
    implementation 'org.springframework.boot:spring-boot-starter-cache'
    implementation 'org.springframework.boot:spring-boot-starter-data-redis-reactive'
    implementation 'com.google.code.gson:gson'
    implementation 'com.h2database:h2'
    compileOnly 'org.projectlombok:lombok'
    runtimeOnly 'mysql:mysql-connector-java'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'io.projectreactor:reactor-test:3.1.0.RELEASE'
    testCompileOnly 'org.projectlombok:lombok'
}

Reactive RedisTemplate  추가

reactive방식으로 redis command를 사용하기 위해 아래와 같이 설정을 추가합니다.

@Configuration
@EnableCaching
public class RedisCacheConfig {

//.... 설정 생략

    @Bean
    public ReactiveRedisTemplate<String, String> reactiveRedisTemplate(ReactiveRedisConnectionFactory connectionFactory) {
        RedisSerializer<String> serializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(String.class);
        RedisSerializationContext serializationContext = RedisSerializationContext
                .<String, String>newSerializationContext()
                .key(serializer)
                .value(jackson2JsonRedisSerializer)
                .hashKey(serializer)
                .hashValue(jackson2JsonRedisSerializer)
                .build();

        return new ReactiveRedisTemplate<>(connectionFactory, serializationContext);
    }
}

테스트 코드 작성

위에서 설정한 reactiveRedisTemplate을 사용하는 테스트 코드를 작성합니다. reactive로 작성할경우 기존의 테스트 코드로는 비동기를 처리할수가 없으므로 StepVerifier를 통해 결과를 검증합니다.

String structure command

redis string command set, get에 대한 내용입니다. 동기방식의 redis를 사용할때와 달리 reactive를 사용하면 blocking 되지 않고 결과를 리턴합니다. 아래 소스에서는 Step-1 ~ Step-3의 수행시간을 보면 blocking없이 수행되는것을 확인 할 수 있습니다.(reactive를 사용하지 않으면 처리 완료될때까지 block 됩니다.) StepVerifier에 결과를 주입하면 비동기 처리 완료 후 결과를 확인할 수 있습니다. StepVerifier는 쓰레드 개발시 Future와 비슷한 기능을 하는것으로 보입니다.

@Test
public void opsValue() {
        ReactiveValueOperations<String, String> valueOps = reactiveRedisTemplate.opsForValue();
        Set<String> cacheKeys = new HashSet<>();
        // async process
        log.info("Step-1");
        for (int i = 0; i < 5000; i++) {
            String key = "value_" + i;
            cacheKeys.add(key);
            valueOps.set(key, String.valueOf(i));
        }
        log.info("Step-2");
        Mono<List<String>> values = valueOps.multiGet(cacheKeys);
        log.info("Step-3");
        StepVerifier.create(values)
                .expectNextMatches(x -> x.size() == 5000).verifyComplete();
        log.info("Step-4");
    }

console output

INFO  15:04:48 c.r.c.ReactiveRedisClusterTest.opsValue:36 - Step-1
INFO  15:04:48 c.r.c.ReactiveRedisClusterTest.opsValue:42 - Step-2
INFO  15:04:48 c.r.c.ReactiveRedisClusterTest.opsValue:44 - Step-3
INFO  15:05:13 c.r.c.ReactiveRedisClusterTest.opsValue:47 - Step-4

Lists structure command – 순서 있음. value 중복 허용

redis list command lpush, llen, lrange, lpop, rpop에 대한 내용입니다. reactive로 redis 명령어 처리시 non blocking으로 처리됩니다.

주의할점은 아래 예제에서는 leftPushAll을 써서 한번에 데이터를 저장해서 문제가 없지만 leftPush 명령을 이용하여 단건으로 여러번 입력할경우엔 저장되는 순서가 뒤바뀔 수 있습니다. 순서가 중요하다면 List에 데이터를 저장할때 reactive의 사용은 주의가 필요합니다.

    @Test
    public void opsList() {
        ReactiveListOperations<String, String> listOps = reactiveRedisTemplate.opsForList();
        String cacheKey = "valueList";

        // previous key delete - sync process
        redisTemplate.delete(cacheKey);

        // async process
        Mono<Long> results = listOps.leftPushAll(cacheKey, "0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
        StepVerifier.create(results).expectNext(10L).verifyComplete();
        StepVerifier.create(reactiveRedisTemplate.type(cacheKey)).expectNext(DataType.LIST).verifyComplete();
        StepVerifier.create(listOps.size(cacheKey)).expectNext(10L).verifyComplete();
        StepVerifier.create(listOps.rightPop(cacheKey)).expectNext("0").verifyComplete();
        StepVerifier.create(listOps.leftPop(cacheKey)).expectNext("9").verifyComplete();
    }

Hashes structure command – 순서 없음. key 중복 허용안함, value 중복 허용

redis hash command hset, hget, hlen, hdel에 대한 내용입니다. reactive로 redis 명령어 처리시 non blocking으로 처리됩니다. 순서가 보장되지 않는 타입이므로 reactive를 통해 순서가 바뀌어 저장되도 별다른 문제는 없습니다.

    @Test
    public void opsHash() {
        ReactiveHashOperations<String, String, String> hashOps = reactiveRedisTemplate.opsForHash();
        String cacheKey = "valueHash";
        Map<String, String> setDatas = new HashMap<>();
        for (int i = 0; i < 10; i++) {
            setDatas.put("key_" + i, "value_" + i);
        }

        // previous key delete - sync
        redisTemplate.delete(cacheKey);

        // async process
        StepVerifier.create(hashOps.putAll(cacheKey, setDatas)).expectNext(true).verifyComplete();
        StepVerifier.create(reactiveRedisTemplate.type(cacheKey)).expectNext(DataType.HASH).verifyComplete();
        StepVerifier.create(hashOps.size(cacheKey)).expectNext(10L).verifyComplete();
        StepVerifier.create(hashOps.get(cacheKey, "key_5")).expectNext("value_5").verifyComplete();
        StepVerifier.create(hashOps.remove(cacheKey, "key_5")).expectNext(1L).verifyComplete();
    }

Set structure command – 순서 없음, value 중복 허용안함

redis set command sadd, scard, smembers, sismember에 대한 내용입니다. reactive로 redis 명령어 처리시 논 블로킹으로 처리됩니다. 순서가 보장되지 않는 타입이므로 reactive를 통해 순서가 바뀌어서 저장되도 별다른 문제는 없을거 같습니다.

    @Test
    public void opsSet() {
        ReactiveSetOperations<String, String> setOps = reactiveRedisTemplate.opsForSet();
        String cacheKey = "valueSet";

        // previous key delete - sync process
        redisTemplate.delete(cacheKey);

        // async process
        StepVerifier.create(setOps.add(cacheKey, "0", "1", "2", "3", "4", "5", "6", "7", "8", "9")).expectNext(10L).verifyComplete();
        StepVerifier.create(reactiveRedisTemplate.type(cacheKey)).expectNext(DataType.SET).verifyComplete();
        StepVerifier.create(setOps.size(cacheKey)).expectNext(10L).verifyComplete();
        StepVerifier.create(setOps.isMember(cacheKey, "5")).expectNext(true).verifyComplete();
    }

SortedSet structure command – 순서 있음, value 중복 허용안함

redis sortedset command zadd, zcard, zrange, zrank에 대한 내용입니다. reactive로 redis 명령어 처리시 non blocking으로 처리됩니다.

순서가 보장되는 타입이므로 reactive를 통해 저장시 순서가 바뀌면 문제가 될거 같지만 저장시 입력받는 score값으로 정렬할 경우엔 입력순서는 문제는 없습니다. 그러나 index로 정렬할 경우는 순서가 문제가 될 수 있으므로 상황에 따라 reactive 사용을 고려해봐야 합니다.

    @Test
    public void opsSortedSet() {
        ReactiveZSetOperations<String, String> zsetOps = reactiveRedisTemplate.opsForZSet();
        String cacheKey = "valueZSet";

        // previous key delete - sync process
        redisTemplate.delete(cacheKey);

        // async process
        List<ZSetOperations.TypedTuple<String>> tuples = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            tuples.add(new DefaultTypedTuple<>(String.valueOf(i), (double) i));
        }
        StepVerifier.create(zsetOps.addAll(cacheKey, tuples)).expectNext(10L).verifyComplete();
        StepVerifier.create(reactiveRedisTemplate.type(cacheKey)).expectNext(DataType.ZSET).verifyComplete();
        StepVerifier.create(zsetOps.size(cacheKey)).expectNext(10L).verifyComplete();
        StepVerifier.create(zsetOps.reverseRank(cacheKey, "9")).expectNext(0L).verifyComplete();
    }

Geo structure command – 좌표 정보 처리, 타입은 zset으로 저장.

redis geo command geoadd, geodist, geopos에 대한 내용입니다. reactive로 redis 명령어 처리시 non blocking으로 처리됩니다. 좌표 정보는 순서와 상관없고 고정된 데이터일 확률이 높으므로 reactive redis를 사용하여 대량의 좌표 데이터를 저장하는데 효과적일것 같습니다.

    @Test
    public void opsGeo() {
        ReactiveGeoOperations<String, String> geoOps = reactiveRedisTemplate.opsForGeo();
        String[] cities = {"서울", "부산"};
        String[][] gu = {{"강남구", "서초구", "관악구", "동작구", "마포구"}, {"사하구", "해운대구", "영도구", "동래구", "수영구"}};
        Point[][] pointGu = {{new Point(10, -10), new Point(11, -20), new Point(13, 10), new Point(14, 30), new Point(15, 40)}, {new Point(-100, 10), new Point(-110, 20), new Point(-130, 80), new Point(-140, 60), new Point(-150, 30)}};
        String cacheKey = "valueGeo";

        // previous key delete - sync process
        redisTemplate.delete(cacheKey);

        // async process
        Map<String, Point> memberCoordiateMap = new HashMap<>();
        for (int x = 0; x < cities.length; x++) {
            for (int y = 0; y < 5; y++) {
                memberCoordiateMap.put(gu[x][y], pointGu[x][y]);
            }
        }
        StepVerifier.create(geoOps.add(cacheKey, memberCoordiateMap)).expectNext(10L).verifyComplete();
        StepVerifier.create(geoOps.distance(cacheKey, "강남구", "동작구")).expectNextMatches(x -> x.getValue() == 4469610.0767).verifyComplete();
        StepVerifier.create(geoOps.position(cacheKey, "동작구")).expectNextMatches(x -> x.getX() == 14.000001847743988 &amp;&amp; x.getY() == 30.000000249977013).verifyComplete();
    }

HyperLogLog structure command – 집합의 원소의 개수 추정, 타입은 string으로 저장.

redis hyperloglog command pfadd, pfcount에 대한 내용입니다. reactive로 redis 명령어 처리시 non blocking으로 처리됩니다. 메서드의 목적이 집합의 원소 개수 추정이므로, 데이터가 모두 저장되기만 하면 되므로 대량건 처리시 reactive를 활용하여 성능을 높일수 있습니다.

    @Test
    public void opsHyperLogLog() {
        ReactiveHyperLogLogOperations<String, String> hyperLogLogOps = reactiveRedisTemplate.opsForHyperLogLog();
        String cacheKey = "valueHyperLogLog";

        // previous key delete - sync process
        redisTemplate.delete(cacheKey);

        // async process
        String[] arr = {"1", "2", "2", "3", "4", "5", "5", "5", "5", "6", "7", "7", "7"};
        StepVerifier.create(hyperLogLogOps.add(cacheKey, arr)).expectNext(1L).verifyComplete();
        StepVerifier.create(hyperLogLogOps.size(cacheKey)).expectNext(7L).verifyComplete();
    }

테스트 소스는 아래 github에서 확인하실 수 있습니다.
https://github.com/codej99/SpringRedisCluster/tree/feature/reactive

연재글 이동<< Redis – spring-data-redis : 발행/구독(pub/sub) 모델의 구현
공유
Close Menu