반응형

Spring boot 기반의 웹 어플리케이션에서 Amazon MSK 을 IAM 인증방식으로 연동하려고 함

MSK 클러스터 접속에 사용할 IAM 계정 : test-user ( MSK 관련 권한이 부여되어 있어야 함)

 

사용한 라이브러리

implementation 'org.springframework.kafka:spring-kafka:3.0.12'
implementation 'software.amazon.msk:aws-msk-iam-auth:2.0.3'

 

메세지 전송용 ProducerFactory 생성

KafkaAdmin 클라이언트는 생성하지 않았음
 * ProducerFactory : Kafka Producer 인스턴스를 생성하는 팩토리 빈
* Kafka Producer : Kafaka 브로커에 메시지를 전송하는 역할을 담당

@Bean
public ProducerFactory<String, String> producerFactory() {
     
        Map<String, Object> configProps = new HashMap<>();
        
        //연결할 kafka 브로커설정(MSK 서버)
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        
        //보내는 메시지 타입 설정     
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);  

        //IAM인증 설정
        configProps.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        configProps.put(SaslConfigs.SASL_MECHANISM, "AWS_MSK_IAM");
        //awsProfileName 으로 계정명 명시
        configProps.put(SaslConfigs.SASL_JAAS_CONFIG,"software.amazon.msk.auth.iam.IAMLoginModule required awsProfileName=\"test-user\";");
        configProps.put(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
                
        return new DefaultKafkaProducerFactory<>(configProps);
}

//위에서 생성한 ProducerFactory로 KafkaTemplate 생성
@Bean
 public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
}

토픽 생성은 하지 않았음. 특정 토픽으로 메세지를 보낼 때 해당토픽이 존재하지 않으면 자동으로 토픽이 생성됨
auto.create.topics.enable 설정이 디폴트로 true 값을 가짐 

 

메세지 전송용 Service 생성

@Service
public class KafkaProduceService {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

	//이 메소드를 호출해서 메세지를 보내면 됨
    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }  

}

 

 

 

반응형

 

 

메세지 수신용 ConsumerFactory 생성

 * ConsumerFactory : Kafka Consumer 인스턴스를 생성하기 위한 팩토리 빈

 @Bean
 public ConsumerFactory<String, String> consumerFactory() {

        Map<String, Object> configProps = new HashMap<>();
        //consumer Group Id 부여해주기
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup1");
        
        //MSK 브로커 설정(MSK서버)
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);

	//IAM 인증 
        configProps.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        configProps.put(SaslConfigs.SASL_MECHANISM, "AWS_MSK_IAM");
        configProps.put(SaslConfigs.SASL_JAAS_CONFIG,"software.amazon.msk.auth.iam.IAMLoginModule required awsProfileName=\"test-user\";");
        configProps.put(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, "software.amazon.msk.auth.iam.IAMClientCallbackHandler");

        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        
        return new DefaultKafkaConsumerFactory<>(configProps);
 }

   
//kafka 메시지를 수신하는 리스너 컨테이너를 생성하는데 사용되는 인터페이스
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
}

 

 

메세지 수신용 Service 생성

@Service
public class KafkaConsumeService {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    @KafkaListener(topics = "testTopic", groupId = "testGroup1")
    public void listen(ConsumerRecord<String, String> record) {
        System.out.println(record.value());
        System.out.println(record.toString());
    }

}

 


MSK 에 IAM 인증 시도시 오류 정리

1. IAM 계정의 access key와 secret key를 찾지 못하는 경우

com.amazonaws.SdkClientException: Unable to load AWS credentials from any provider in the chain: [software.amazon.msk.auth.iam.internals.EnhancedProfileCredentialsProvider@3bf917a2: Profile file contained no credentials for profile 'testUser': ProfileFile(profilesAndSectionsMap=[]), cohttp://m.amazonaws.auth.AWSCredentialsProviderChain@2dadd688: Unable to load AWS credentials from any provider in the chain: [EnvironmentVariableCredentialsProvider: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY)), SystemPropertiesCredentialsProvider: Unable to load AWS credentials from Java system properties (aws.accessKeyId and aws.secretKey), WebIdentityTokenCredentialsProvider: You must specify a value for roleArn and roleSessionName, software.amazon.msk.auth.iam.internals.EnhancedProfileCredentialsProvider@68aac71d: Profile file contained no credentials for profile 'default': ProfileFile(profilesAndSectionsMap=[]), cohttp://m.amazonaws.auth.EC2ContainerCredentialsProviderWrapper@69883287: Failed to connect to service endpoint: ]]

시스템 환경변수에 AWS_ACCESS_KEY_ID(혹은 AWS_ACCESS_KEY ) 와 AWS_SECRET_KEY (혹은 AWS_SECRET_ACCESS_KEY) 이름으로 access key와 secret key를 세팅해준다. 

혹은 producerFactory와 consumerFactory 내에 System.setProperty로 키 세팅해주면 됨.

System.setProperty("aws.accessKeyId", awsAccessKey);
System.setProperty("aws.secretKey", awsSecretKey);

 

 

2. IAM인증 설정이 바르지 않을때 난 에러 

Node -1 disconnected.
2024-01-31 10:16:24,725 [INFO  ] NetworkClient.cancelInFlightRequests(NetworkClient.java:344) - [Producer clientId=producer-1] Cancelled in-flight API_VERSIONS request with correlation id 1 due to node -1 being disconnected (elapsed time since creation: 98ms, elapsed time since send: 98ms, request timeout: 30000ms)
2024-01-31 10:16:24,726 [WARN  ] NetworkClient$DefaultMetadataUpdater.handleServerDisconnect(NetworkClient.java:1105) - [Producer clientId=producer-1] Bootstrap broker sdfssdfdf.kafka.ap-northeast-2.amazonaws.com:9098 (id: -1 rack: null) disconnected

Factory 생성 시에 sasl_jaas_config 값에 IAM로그인모듈 설정과 IAM계정의 이름을 적어주자.

configs.put(SaslConfigs.SASL_JAAS_CONFIG,"software.amazon.msk.auth.iam.IAMLoginModule required awsProfileName=\"testUser\";");

 

반응형

+ Recent posts