반응형
Spring boot 기반의 웹 어플리케이션에서 Amazon MSK 을 IAM 인증방식으로 연동하려고 함
MSK 클러스터 접속에 사용할 IAM 계정 : test-user ( MSK 관련 권한이 부여되어 있어야 함)
사용한 라이브러리
implementation 'org.springframework.kafka:spring-kafka:3.0.12'
implementation 'software.amazon.msk:aws-msk-iam-auth:2.0.3'
메세지 전송용 ProducerFactory 생성
KafkaAdmin 클라이언트는 생성하지 않았음
* ProducerFactory : Kafka Producer 인스턴스를 생성하는 팩토리 빈
* Kafka Producer : Kafaka 브로커에 메시지를 전송하는 역할을 담당
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
//연결할 kafka 브로커설정(MSK 서버)
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
//보내는 메시지 타입 설정
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//IAM인증 설정
configProps.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
configProps.put(SaslConfigs.SASL_MECHANISM, "AWS_MSK_IAM");
//awsProfileName 으로 계정명 명시
configProps.put(SaslConfigs.SASL_JAAS_CONFIG,"software.amazon.msk.auth.iam.IAMLoginModule required awsProfileName=\"test-user\";");
configProps.put(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
return new DefaultKafkaProducerFactory<>(configProps);
}
//위에서 생성한 ProducerFactory로 KafkaTemplate 생성
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
토픽 생성은 하지 않았음. 특정 토픽으로 메세지를 보낼 때 해당토픽이 존재하지 않으면 자동으로 토픽이 생성됨
auto.create.topics.enable 설정이 디폴트로 true 값을 가짐
메세지 전송용 Service 생성
@Service
public class KafkaProduceService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
//이 메소드를 호출해서 메세지를 보내면 됨
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
}
반응형
메세지 수신용 ConsumerFactory 생성
* ConsumerFactory : Kafka Consumer 인스턴스를 생성하기 위한 팩토리 빈
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
//consumer Group Id 부여해주기
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup1");
//MSK 브로커 설정(MSK서버)
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
//IAM 인증
configProps.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
configProps.put(SaslConfigs.SASL_MECHANISM, "AWS_MSK_IAM");
configProps.put(SaslConfigs.SASL_JAAS_CONFIG,"software.amazon.msk.auth.iam.IAMLoginModule required awsProfileName=\"test-user\";");
configProps.put(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
return new DefaultKafkaConsumerFactory<>(configProps);
}
//kafka 메시지를 수신하는 리스너 컨테이너를 생성하는데 사용되는 인터페이스
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
메세지 수신용 Service 생성
@Service
public class KafkaConsumeService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@KafkaListener(topics = "testTopic", groupId = "testGroup1")
public void listen(ConsumerRecord<String, String> record) {
System.out.println(record.value());
System.out.println(record.toString());
}
}
MSK 에 IAM 인증 시도시 오류 정리
1. IAM 계정의 access key와 secret key를 찾지 못하는 경우
com.amazonaws.SdkClientException: Unable to load AWS credentials from any provider in the chain: [software.amazon.msk.auth.iam.internals.EnhancedProfileCredentialsProvider@3bf917a2: Profile file contained no credentials for profile 'testUser': ProfileFile(profilesAndSectionsMap=[]), cohttp://m.amazonaws.auth.AWSCredentialsProviderChain@2dadd688: Unable to load AWS credentials from any provider in the chain: [EnvironmentVariableCredentialsProvider: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY)), SystemPropertiesCredentialsProvider: Unable to load AWS credentials from Java system properties (aws.accessKeyId and aws.secretKey), WebIdentityTokenCredentialsProvider: You must specify a value for roleArn and roleSessionName, software.amazon.msk.auth.iam.internals.EnhancedProfileCredentialsProvider@68aac71d: Profile file contained no credentials for profile 'default': ProfileFile(profilesAndSectionsMap=[]), cohttp://m.amazonaws.auth.EC2ContainerCredentialsProviderWrapper@69883287: Failed to connect to service endpoint: ]] |
시스템 환경변수에 AWS_ACCESS_KEY_ID(혹은 AWS_ACCESS_KEY ) 와 AWS_SECRET_KEY (혹은 AWS_SECRET_ACCESS_KEY) 이름으로 access key와 secret key를 세팅해준다.
혹은 producerFactory와 consumerFactory 내에 System.setProperty로 키 세팅해주면 됨.
System.setProperty("aws.accessKeyId", awsAccessKey);
System.setProperty("aws.secretKey", awsSecretKey);
2. IAM인증 설정이 바르지 않을때 난 에러
Node -1 disconnected. 2024-01-31 10:16:24,725 [INFO ] NetworkClient.cancelInFlightRequests(NetworkClient.java:344) - [Producer clientId=producer-1] Cancelled in-flight API_VERSIONS request with correlation id 1 due to node -1 being disconnected (elapsed time since creation: 98ms, elapsed time since send: 98ms, request timeout: 30000ms) 2024-01-31 10:16:24,726 [WARN ] NetworkClient$DefaultMetadataUpdater.handleServerDisconnect(NetworkClient.java:1105) - [Producer clientId=producer-1] Bootstrap broker sdfssdfdf.kafka.ap-northeast-2.amazonaws.com:9098 (id: -1 rack: null) disconnected |
Factory 생성 시에 sasl_jaas_config 값에 IAM로그인모듈 설정과 IAM계정의 이름을 적어주자.
configs.put(SaslConfigs.SASL_JAAS_CONFIG,"software.amazon.msk.auth.iam.IAMLoginModule required awsProfileName=\"testUser\";");
반응형
'코딩 관련 > AWS' 카테고리의 다른 글
SpringBoot에서 SNS SQS 연동하기 (0) | 2023.02.02 |
---|---|
software.amazon.awssdk.services.sns.model.InvalidParameterException: Invalid parameter: TopicArn (0) | 2023.02.02 |
AWS SNS SQS 사용법 / SNS SQS 연동 (0) | 2023.02.01 |