이 연재글은 Google Cloud Platform의 4번째 글입니다.

이번 실습에서는 Spring Cloud GCP Pub/Sub Starter 모듈을 이용하여 Spring application에서 PubSub를 연동하는 방법에 대해 실습하겠습니다.

시작하기 전에 GCP PubSub를 로컬에서 접근해 사용하려면 사전 작업이 필요합니다.

사전 작업

실습을 위해 프로젝트에 PubSub Topic을 생성합니다. 이때 프로젝트에 Pub/Sub API가 비활성화 되어있으면 활성화 합니다. 아래와 같은 순서로 서비스 계정을 만들고 비공개 키(JSON)를 로컬에 다운로드 받습니다.

서비스 계정 생성

  • IAM – Service Accounts – + CREATE SERVICE ACCOUNT
    • 생성시 Role은 Pub/Sub Publisher, Pub/Sub Subscriber를 부여합니다.

비공개 Key 다운로드

  • IAM – Service Accounts – 생성한 어카운트 클릭 – KEYS Tab 선택
    • ADD KEY – Create new key – JSON 선택 후 CREATE하면 새로 생성된 Service Account Key가 파일로 다운로드 됩니다.

Spring Cloud GCP Pub/Sub Starter 사용

build.gradle에 dependency를 추가

  • com.google.cloud:spring-cloud-gcp-starter-pubsub
  • org.springframework.integration:spring-integration-core
// .. 생략
ext {
    set('springCloudGcpVersion', "3.3.0")
    set('springCloudVersion', "2021.0.3")
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'com.google.cloud:spring-cloud-gcp-starter-pubsub'
    implementation 'org.springframework.integration:spring-integration-core'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
}

dependencyManagement {
    imports {
        mavenBom "com.google.cloud:spring-cloud-gcp-dependencies:${springCloudGcpVersion}"
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}

// .. 생략

application.yaml에 gcp credential 정보 세팅

스프링 애플리케이션에서 GCP PubSub를 사용할 수 있도록 프로젝트ID 및 사전작업에서 생성한 Service Account Key 파일의 내용을 base64 인코딩하여 application.yaml에 세팅합니다.

spring:
  cloud:
    gcp:
      project-id: [프로젝트명]
      credentials:
        encoded-key: [Service Account Key Base64 인코딩한 값]

메시지를 전달할 객체 생성

import lombok.Builder;
import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
@NoArgsConstructor
@ToString
public class Messaging {
    private int id;
    private String name;
}

Message Publish/Subscribe 컨버터 설정

메시지 객체가 PubSub를 통해 전달될때 사용될 Converter를 설정해 줍니다. 예제에서는 default 설정을 사용합니다. 서비스에서 필요한 설정이 있다면 추가합니다.

@Configuration
public class PubSubConfiguration {
    @Bean
    public ObjectMapper objectMapper() {
        return new ObjectMapper();
    }

    @Bean
    public JacksonPubSubMessageConverter jacksonPubSubMessageConverter(ObjectMapper objectMapper) {
        return new JacksonPubSubMessageConverter(objectMapper);
    }
}

Pub/Sub에서 메시지 수신

인바운드 채널 어댑터는 수신되는 Pub/Sub 메시지를 POJO로 변환하고 POJO를 메시지 채널로 전달합니다. Topic에 등록된 구독ID(SubScriptionID)를 설정하면 MessageReceiver를 통해 메시지를 전달 받을수 있습니다. AckMode가 Manual일 경우 작업 처리 후엔 명시적으로 Acknowledgement 응답(message.ack())을 해야 메시지가 큐에서 사라집니다.

@Slf4j
@RequiredArgsConstructor
@Component
public class EventMessageHandlerAdapter {

    private final String SUBSCRIPTION_ID = "messaging-queue-sub";

    @Bean
    public MessageChannel inboundMessageChannel() {
        return new PublishSubscribeChannel();
    }

    @Bean
    public PubSubInboundChannelAdapter inboundMessageChannelAdapter(
            @Qualifier("inboundMessageChannel") MessageChannel messageChannel,
            PubSubTemplate pubSubTemplate) {
        PubSubInboundChannelAdapter adapter =
                new PubSubInboundChannelAdapter(pubSubTemplate, SUBSCRIPTION_ID);
        adapter.setOutputChannel(messageChannel);
        adapter.setAckMode(AckMode.MANUAL);
        adapter.setPayloadType(Messaging.class);
        return adapter;
    }

    @ServiceActivator(inputChannel = "inboundMessageChannel")
    public void messageReceiver(
            Messaging messaging,
            @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
        log.info("Message: {}", messaging);
        // TODO: implement your code here
        // 메시지 처리 완료 응답
        message.ack();
    }
}

Pub/Sub에 메시지 게시

pubSubTemplate.publish 메서드를 이용하여 topic에 메시지를 게시합니다. 해당 메서드는 ListenableFuture객체를 반환하기 때문에 비동기 처리를 할 수 있습니다. 예제에서는 get() 메서드를 호출하여 발송이 완료될때까지 기다리므로 동기식으로 처리됩니다. 이 부분을 동기/비동기로 처리할지는 제공하는 서비스 환경에 따라 선택하여 구현하도록 합니다.

@Component
@RequiredArgsConstructor
public class PubSubMessagePublisher {

    private final PubSubTemplate pubSubTemplate;

    public void sendMessage(String destination, Messaging message) throws ExecutionException, InterruptedException {
        pubSubTemplate.publish(destination, message).get();
    }
}

Test

메시지를 발송할 수 있게 간단한 Controller를 생성합니다.

@RestController
@RequiredArgsConstructor
public class MessagingController {

    private final PubSubMessagePublisher pubSubMessagePublisher;

    private static final String MESSAGE_QUEUE = "messaging-queue";

    @PostMapping("/publishMessage")
    public void getContentNoticeList(@RequestBody Messaging messaging) throws Exception {
        pubSubMessagePublisher.sendMessage(MESSAGE_QUEUE, messaging);
    }
}

스프링 서버를 실행한 후 아래와 같이 터미널에서 curl로 메시지를 발송합니다.

$ curl --location --request POST 'http://localhost:8080/publishMessage' \
--header 'Content-Type: application/json' \
--data-raw '{
    "id": 777,
    "name": "leesunsin"
}'

터미널에 출력되는 콘솔 로그를 통해 구독한 메시지가 출력되는것을 확인할 수 있습니다.

2022-07-15 18:13:28.872  INFO 45512 --- [sub-subscriber1] c.a.s.EventMessageHandlerAdapter         : Message: Messaging(id=777, name=leesunsin)
연재글 이동[이전글] Google Cloud Platform – GCP PubSub