티스토리 뷰
이번 프로젝트에서 MSK를 사용하게 되었습니다. MSK는 Serverless 로 생성되었는데 이와 SpringBoot 프로젝트를 연동하는 방식에 대해 살펴보고자 합니다.
먼저 spring-kafka dependencies를 추가해줍니다.
// build.gradle
dependencies {
...
// kafka
implementation 'org.springframework.kafka:spring-kafka'
...
}
여기서 MSK 생성에 대해서는 다루지 않겠습니다. MSK를 Serverless로 생성하고 나서 클라이언트 정보 보기를 하면 아래와 같이 부트스트랩 서버의 엔드포인트를 확인할 수 있습니다. Serverless 의 경우 현재 인증 유형은 IAM 밖에 존재하지 않습니다.
이를 SpringBoot 프로젝트의 application.yml 파일에 설정을 해줍니다. 그리고 별도로 작성한 Configuration 파일에서 가져다 쓸 수 있게 적어줍니다. 여기서는 Consumer 관련 설정만 해보겠습니다.
// application.yml
spring:
kafka:
consumer:
bootstrap-servers: BOOTSTRAP_SERVER_URL:PORT // 예) boot-xxxxx.amazonaws.com:9098
group-id: CONSUMER_GROUP_ID
// KafkaConsumerConfig.class
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.consumer.bootstrap-servers}")
private String BOOTSTRAP_SERVER;
@Value("${spring.kafka.consumer.group-id}")
private String GROUP_ID;
@Bean
public ConcurrentKafkaListenerContainerFactory<String, EventPayload> iropsContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, EventPayload> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(getConfig(), new StringDeserializer(), new JsonDeserializer<>(EventPayload.class)));
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;
}
private Map<String, Object> getConfig() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return props;
}
}
❗️중요
이후 MSK Serverless인 경우 추가 작업이 필요한데요. 먼저 build.gradle 파일에 iam 관련 dependencies를 추가해주어야 합니다.
// build.gradle
dependencies {
...
// kafka
implementation 'org.springframework.kafka:spring-kafka'
implementation 'software.amazon.msk:aws-msk-iam-auth:1.1.7' // 여기 추가
...
}
그리고 나서 Configuration 파일에 설정을 추가해줍니다. application.yml 파일에 설정한 뒤, @Value 애노테이션으로 configuration 파일에서 가져다 사용할 수 있도록 처리를 해도 됩니다.
// KafkaConsumerConfig.class
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
private final List<String> LOCAL_PROFILES = List.of("local", "default");
@Value("${spring.kafka.consumer.bootstrap-servers}")
private String BOOTSTRAP_SERVER;
@Value("${spring.kafka.consumer.group-id}")
private String GROUP_ID;
@Value("${spring.config.activate.on-profile}")
private String profile;
@Bean
public ConcurrentKafkaListenerContainerFactory<String, EventPayload> iropsContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, EventPayload> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(getConfig(), new StringDeserializer(), new JsonDeserializer<>(EventPayload.class)));
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;
}
private Map<String, Object> getConfig() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// 여기 추가 (local, default 프로파일이 아닌 경우에만 설정 추가)
if (!LOCAL_PROFILES.contains(profile)) {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "AWS_MSK_IAM");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "software.amazon.msk.auth.iam.IAMLoginModule required;");
props.put(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
}
return props;
}
}
이렇게 하고 만들어둔 SpringBoot 프로젝트를 MSK와 같은 vpc 내에 띄우게 되면 연결이 가능합니다.
이 설정을 하지 않고 접근을 시도(예. Producer로 send 하는 등) 하는 경우 java.lang.OutOfMemoryError: Java heap space 가 발생하면서 애플리케이션 서버가 다운되는 현상이 발생하기 때문에 꼭 올바른 인증유형을 사용한 접근이 필요합니다 🙏
추가로 MSK에 토픽이 생성되어 있지 않은 경우 토픽을 찾을 수 없다는 DEBUG 레벨의 로그가 계속 남을 수 있습니다. 현재 기준으로 MSK의 토픽을 생성하는 방법은 MSK와 같은 VPC 내에 클라이언트 서버(아무 EC2)를 하나 생성하고 그곳에 접근하여 kafka 설치 후 명령으로 토픽을 생성하는 방법 뿐입니다...🥲
서버리스 가이드를 참고하여 클라이언트 서버를 생성한 뒤 다음 명령을 통해 토픽을 생성할 수 있습니다.
<path-to-your-kafka-installation>/bin/kafka-topics.sh --bootstrap-server $BS --command-config client.properties --create --topic <your-topic-name> --partitions 2
참고
'AWS' 카테고리의 다른 글
[AWS APIGateway] APIGateway WebSocket API 사용하기 2 (0) | 2024.03.05 |
---|---|
[AWS APIGateway] APIGateway WebSocket API 사용하기 1 (1) | 2023.12.06 |
[AWS PreSignedUrl] PreSignedUrl로 S3 Bucket에 파일 업로드 (0) | 2023.11.07 |
[AWS KMS] KMS를 통한 암복호화 (2) | 2022.04.26 |
[Terraform] Terraform 사용하기 (사전 준비) (0) | 2022.01.04 |
- Total
- Today
- Yesterday
- search
- Dynamic Programming
- CodeDeploy
- 프로그래머스
- CodeCommit
- 에라토스테네스의 체
- BFS
- cloudfront
- sort
- string
- map
- Baekjoon
- Combination
- 소수
- 수학
- java
- ECR
- permutation
- 조합
- SWIFT
- programmers
- Algorithm
- ionic
- DFS
- 순열
- EC2
- spring
- array
- CodePipeline
- AWS
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |