이번 실습에서는 SpringBoot 프로젝트에서 SQS에 메시지를 전달하고 받는 실습을 진행해 보겠습니다. SQS는 AWS에서 제공하는 메시지 대기열 서비스로서 대기열(Queue)을 이용하면 메시지를 순차적으로 저장할 수 있으며 대기열에 저장된 메시지는 별도의 프로세스에서 작업을 진행할 수 있습니다.
주로 1:1의 관계로 맵핑되는 작업에서 많이 사용됩니다. 예를 들면 회원 가입 후 가입완료 메일 발송, 고객 주문 완료 후 배송 처리와 같이 주 프로세스가 완료된 이후 추가적으로 발생하는 작업 처리를 위해 많이 사용됩니다. 또는 서비스에 대량의 이벤트가 발생할 때 Queue를 이용하면 작업이 예약되어 처리되기 때문에 대량의 요청에서도 안정적으로 시스템을 운용할 수 있게 됩니다.
참고로 동일 메시지를 다수가 받아야 하는(1:N) 작업일 경우엔 AWS에서 제공하는 아래 서비스 중 하나를 사용하면 됩니다.
- SNS(Simple Notification Service)
- MSK(Amazon Managed Streaming for Apache Kafka)
- Kinesis(Amazon Kinesis Data Streams)
SQS와 관련된 좀더 자세한 내용은 아래 포스팅을 참고하시면 됩니다.
SpringBoot에서 SQS 사용하기
Spring 프로젝트에서 SQS를 사용하기 위해서는 AWS에서 제공하는 Java 라이브러리를 사용해도 되고 본 글에서 사용할 Spring Cloud Messaging 라이브러리를 사용해도 됩니다.
build.gradle
프로젝트에 org.springframework.cloud:spring-cloud-starter-aws-messaging 라이브러리를 추가합니다.
plugins { id 'org.springframework.boot' version '2.3.9.RELEASE' id 'io.spring.dependency-management' version '1.0.11.RELEASE' id 'java' } group = 'com.daddyprogrammer' version = '0.0.1-SNAPSHOT' sourceCompatibility = '11' configurations { compileOnly { extendsFrom annotationProcessor } } repositories { mavenCentral() } ext { set('springCloudVersion', "Hoxton.SR10") } dependencies { implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'org.springframework.boot:spring-boot-starter' implementation 'org.springframework.cloud:spring-cloud-starter-aws-messaging' compileOnly 'org.projectlombok:lombok' annotationProcessor 'org.projectlombok:lombok' testImplementation('org.springframework.boot:spring-boot-starter-test') { exclude group: 'org.junit.vintage', module: 'junit-vintage-engine' } } dependencyManagement { imports { mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}" } } test { useJUnitPlatform() }
application.yml 설정 추가
cloud: aws: region: static: 리전 정보(ex:ap-northeast-2) stack: auto: false credentials: access-key: IAM에서 발급받은 엑세스키 secret-key: IAM에서 발급받은 시크릿키
Message Sender/Listener 구현
Spring Cloud 라이브러리를 이용하면 예제의 코드처럼 간단하게 Sender, Listener 구현이 가능합니다.
import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSAsync; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate; import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; @Slf4j @Component public class MessageProcessor { private final String queueName = "event-collect"; private QueueMessagingTemplate queueMessagingTemplate; @Autowired public MessageProcessor(AmazonSQS amazonSqs) { this.queueMessagingTemplate = new QueueMessagingTemplate((AmazonSQSAsync) amazonSqs); } public void send(String data) { Message<String> message = MessageBuilder.withPayload(data).build(); queueMessagingTemplate.send(queueName, message); } @SqsListener(value = queueName) public void receive(String message) { log.info("Event : {}", message); } }
@SqsListener 메시지 삭제 정책
Listener에는 4가지 메시지 삭제 정책(SqsMessageDeletionPolicy)이 있으며 내용은 다음과 같습니다.
- ALWAYS – 리스너 메서드에 의한 메시지 처리 중 성공 (예외 발생 없음) 또는 실패 (예외 발생)시 항상 메시지를 삭제합니다.
- NEVER – 메시지를 자동으로 삭제하지 않습니다. 수신 확인(Acknowledgment)로 명시적으로 삭제가 가능합니다.
- NO_REDRIVE – Redrive policy(DeadLetterQueue)가 정의되지 않은 경우 메시지를 삭제합니다.
- ON_SUCCESS – 리스너 메서드에 의해 성공적으로 실행되면 메시지를 삭제합니다.
정책을 적용하려면 다음과 같이 @SqsListener에 설정을 추가하면 됩니다. 명시적으로 선언하지 않으면 NO_REDRIVE가 기본 정책으로 사용됩니다.
@SqsListener(value = queueName, deletionPolicy = SqsMessageDeletionPolicy.NEVER) public void receive(String message) { log.info("Event : {}", message); }
테스트를 위한 Controller 작성
간단히 Queue에 메시지를 넣기 위해 GetMapping으로 endpoint를 만듭니다. 서버를 실행하고 메시지를 발송하면 Listener가 메시지를 받아 콘솔에 로그를 출력하는 것을 확인할 수 있습니다.
@RequiredArgsConstructor @RestController public class MessagingController { private final MessageProcessor messageProcessor; @GetMapping("/send") public void sendMessage(@RequestParam String message) { messageProcessor.send(message); } }
localhost:8080/send?message=helloSQS
2021-03-13 00:16:36.608 INFO 83435 --- [enerContainer-2] c.d.messaging.MessageProcessor : Event : helloSQS
성능 개선
대기열의 메시지 처리중 시간이 오래 걸리는 작업이 발생하면 이후로 대기중인 메시지들은 처리가 지연될 수 있습니다. 병목 현상을 최대한 줄이기 위해서 다음과 같이 동시에 처리할 수 있는 쓰레드 풀의 개수를 설정할 수 있습니다. 해당 쓰레드가 어떤것인지 판별하기 위해 ThreadNamePrefix를 지정하여 로그상으로 확인이 가능합니다.
@EnableSqs @Configuration @EnableAutoConfiguration(exclude = {ContextInstanceDataAutoConfiguration.class}) public class SQSConfiguration { @Bean public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSqs, AsyncTaskExecutor asyncTaskExecutor) { SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory(); factory.setAmazonSqs(amazonSqs); factory.setTaskExecutor(asyncTaskExecutor); return factory; } @Bean public AsyncTaskExecutor asyncTaskExecutor() { ThreadPoolTaskExecutor asyncTaskExecutor = new ThreadPoolTaskExecutor(); asyncTaskExecutor.setCorePoolSize(20); asyncTaskExecutor.setMaxPoolSize(50); asyncTaskExecutor.setQueueCapacity(5); asyncTaskExecutor.setThreadNamePrefix("jobThread-"); asyncTaskExecutor.initialize(); return asyncTaskExecutor; } }
corePoolSize : 최초 생성되는 스레드 사이즈이며 해당 사이즈로 최소 스레드가 유지.
maximumPoolSize : 해당 풀에 최대로 유지할 수 있는 스레드 개수.
queueCapacity : corePoolSize보다 스레드가 많아졌을 경우, 남는 스레드가 없을때 대기하는 큐. 해당 큐에 담을공간이 꽉차면 maximumPoolSize까지 점차 스레드 개수가 늘어난다.
설정 적용 전
쓰레드(enerContainer-번호)를 보면 10개 정도의 쓰레드가 생성되어 작업을 처리하는 것을 확인할 수 있습니다.
2021-03-13 01:55:03.824 INFO 97718 --- [enerContainer-9] c.d.messaging.MessageProcessor : Event : helloSQS-18 2021-03-13 01:55:03.824 INFO 97718 --- [enerContainer-5] c.d.messaging.MessageProcessor : Event : helloSQS-3 2021-03-13 01:55:08.877 INFO 97718 --- [enerContainer-5] c.d.messaging.MessageProcessor : Event : helloSQS-14 2021-03-13 01:55:08.877 INFO 97718 --- [enerContainer-3] c.d.messaging.MessageProcessor : Event : helloSQS-30 2021-03-13 01:55:08.877 INFO 97718 --- [enerContainer-7] c.d.messaging.MessageProcessor : Event : helloSQS-18 2021-03-13 01:55:08.877 INFO 97718 --- [enerContainer-9] c.d.messaging.MessageProcessor : Event : helloSQS-3 2021-03-13 01:55:13.923 INFO 97718 --- [enerContainer-4] c.d.messaging.MessageProcessor : Event : helloSQS-7 2021-03-13 01:55:13.923 INFO 97718 --- [enerContainer-9] c.d.messaging.MessageProcessor : Event : helloSQS-4 2021-03-13 01:55:13.923 INFO 97718 --- [enerContainer-7] c.d.messaging.MessageProcessor : Event : helloSQS-27 2021-03-13 01:55:13.923 INFO 97718 --- [enerContainer-3] c.d.messaging.MessageProcessor : Event : helloSQS-29 2021-03-13 01:55:13.923 INFO 97718 --- [enerContainer-5] c.d.messaging.MessageProcessor : Event : helloSQS-22
설정 적용 후
기본 20개의 쓰레드로 시작하며 maximumPoolSize까지 쓰레드 개수가 점차 증가되되면서 메시지를 처리하는 것을 확인할 수 있습니다.
2021-03-13 01:58:57.143 INFO 98274 --- [ jobThread-10] c.d.messaging.MessageProcessor : Event : helloSQS-11 2021-03-13 01:58:57.143 INFO 98274 --- [ jobThread-16] c.d.messaging.MessageProcessor : Event : helloSQS-10 2021-03-13 01:58:57.143 INFO 98274 --- [ jobThread-22] c.d.messaging.MessageProcessor : Event : helloSQS-2 2021-03-13 01:58:57.143 INFO 98274 --- [ jobThread-18] c.d.messaging.MessageProcessor : Event : helloSQS-25 2021-03-13 01:58:57.143 INFO 98274 --- [ jobThread-23] c.d.messaging.MessageProcessor : Event : helloSQS-16 2021-03-13 01:58:57.143 INFO 98274 --- [ jobThread-19] c.d.messaging.MessageProcessor : Event : helloSQS-20 2021-03-13 01:58:57.143 INFO 98274 --- [ jobThread-15] c.d.messaging.MessageProcessor : Event : helloSQS-9 2021-03-13 01:58:57.143 INFO 98274 --- [ jobThread-6] c.d.messaging.MessageProcessor : Event : helloSQS-15
실습에서 사용한 소스는 아래 Github에서 자세하게 확인할 수 있습니다.
https://github.com/codej99/spring-sqs-messaging.git