티스토리 뷰

이번 프로젝트에서 MSK를 사용하게 되었습니다. MSK는 Serverless 로 생성되었는데 이와 SpringBoot 프로젝트를 연동하는 방식에 대해 살펴보고자 합니다.

 

먼저 spring-kafka dependencies를 추가해줍니다.

// build.gradle
dependencies {
    ...
    // kafka
    implementation 'org.springframework.kafka:spring-kafka'
    ...
}

 

여기서 MSK 생성에 대해서는 다루지 않겠습니다. MSK를 Serverless로 생성하고 나서 클라이언트 정보 보기를 하면 아래와 같이 부트스트랩 서버의 엔드포인트를 확인할 수 있습니다. Serverless 의 경우 현재 인증 유형은 IAM 밖에 존재하지 않습니다.

 

 

이를 SpringBoot 프로젝트의 application.yml 파일에 설정을 해줍니다. 그리고 별도로 작성한 Configuration 파일에서 가져다 쓸 수 있게 적어줍니다. 여기서는 Consumer 관련 설정만 해보겠습니다.

// application.yml
spring:
  kafka:
    consumer:
      bootstrap-servers: BOOTSTRAP_SERVER_URL:PORT  // 예) boot-xxxxx.amazonaws.com:9098
      group-id: CONSUMER_GROUP_ID
// KafkaConsumerConfig.class

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String BOOTSTRAP_SERVER;

    @Value("${spring.kafka.consumer.group-id}")
    private String GROUP_ID;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, EventPayload> iropsContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, EventPayload> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(getConfig(), new StringDeserializer(), new JsonDeserializer<>(EventPayload.class)));
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }

    private Map<String, Object> getConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);

        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        return props;
    }
}

 

❗️중요

이후 MSK Serverless인 경우 추가 작업이 필요한데요. 먼저 build.gradle 파일에 iam 관련 dependencies를 추가해주어야 합니다.

// build.gradle
dependencies {
    ...
    // kafka
    implementation 'org.springframework.kafka:spring-kafka'
    implementation 'software.amazon.msk:aws-msk-iam-auth:1.1.7'  // 여기 추가
    ...
}

 

그리고 나서 Configuration 파일에 설정을 추가해줍니다. application.yml 파일에 설정한 뒤, @Value 애노테이션으로 configuration 파일에서 가져다 사용할 수 있도록 처리를 해도 됩니다.

// KafkaConsumerConfig.class

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    private final List<String> LOCAL_PROFILES = List.of("local", "default");

    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String BOOTSTRAP_SERVER;

    @Value("${spring.kafka.consumer.group-id}")
    private String GROUP_ID;

    @Value("${spring.config.activate.on-profile}")
    private String profile;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, EventPayload> iropsContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, EventPayload> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(getConfig(), new StringDeserializer(), new JsonDeserializer<>(EventPayload.class)));
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }

    private Map<String, Object> getConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);

        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        // 여기 추가 (local, default 프로파일이 아닌 경우에만 설정 추가)
        if (!LOCAL_PROFILES.contains(profile)) {
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            props.put(SaslConfigs.SASL_MECHANISM, "AWS_MSK_IAM");
            props.put(SaslConfigs.SASL_JAAS_CONFIG, "software.amazon.msk.auth.iam.IAMLoginModule required;");
            props.put(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
        }

        return props;
    }
}

 

이렇게 하고 만들어둔 SpringBoot 프로젝트를 MSK와 같은 vpc 내에 띄우게 되면 연결이 가능합니다.

 

이 설정을 하지 않고 접근을 시도(예. Producer로 send 하는 등) 하는 경우 java.lang.OutOfMemoryError: Java heap space 가 발생하면서 애플리케이션 서버가 다운되는 현상이 발생하기 때문에 꼭 올바른 인증유형을 사용한 접근이 필요합니다 🙏


추가로 MSK에 토픽이 생성되어 있지 않은 경우 토픽을 찾을 수 없다는 DEBUG 레벨의 로그가 계속 남을 수 있습니다. 현재 기준으로 MSK의 토픽을 생성하는 방법은 MSK와 같은 VPC 내에 클라이언트 서버(아무 EC2)를 하나 생성하고 그곳에 접근하여 kafka 설치 후 명령으로 토픽을 생성하는 방법 뿐입니다...🥲

 

서버리스 가이드를 참고하여 클라이언트 서버를 생성한 뒤 다음 명령을 통해 토픽을 생성할 수 있습니다.

<path-to-your-kafka-installation>/bin/kafka-topics.sh --bootstrap-server $BS --command-config client.properties --create --topic <your-topic-name> --partitions 2

 

참고

댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/05   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함