반응형

Spring boot 기반의 웹 어플리케이션에서 Amazon MSK 을 IAM 인증방식으로 연동하려고 함

MSK 클러스터 접속에 사용할 IAM 계정 : test-user ( MSK 관련 권한이 부여되어 있어야 함)

 

사용한 라이브러리

implementation 'org.springframework.kafka:spring-kafka:3.0.12'
implementation 'software.amazon.msk:aws-msk-iam-auth:2.0.3'

 

메세지 전송용 ProducerFactory 생성

KafkaAdmin 클라이언트는 생성하지 않았음
 * ProducerFactory : Kafka Producer 인스턴스를 생성하는 팩토리 빈
* Kafka Producer : Kafaka 브로커에 메시지를 전송하는 역할을 담당

@Bean
public ProducerFactory<String, String> producerFactory() {
     
        Map<String, Object> configProps = new HashMap<>();
        
        //연결할 kafka 브로커설정(MSK 서버)
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        
        //보내는 메시지 타입 설정     
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);  

        //IAM인증 설정
        configProps.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        configProps.put(SaslConfigs.SASL_MECHANISM, "AWS_MSK_IAM");
        //awsProfileName 으로 계정명 명시
        configProps.put(SaslConfigs.SASL_JAAS_CONFIG,"software.amazon.msk.auth.iam.IAMLoginModule required awsProfileName=\"test-user\";");
        configProps.put(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
                
        return new DefaultKafkaProducerFactory<>(configProps);
}

//위에서 생성한 ProducerFactory로 KafkaTemplate 생성
@Bean
 public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
}

토픽 생성은 하지 않았음. 특정 토픽으로 메세지를 보낼 때 해당토픽이 존재하지 않으면 자동으로 토픽이 생성됨
auto.create.topics.enable 설정이 디폴트로 true 값을 가짐 

 

메세지 전송용 Service 생성

@Service
public class KafkaProduceService {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

	//이 메소드를 호출해서 메세지를 보내면 됨
    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }  

}

 

 

 

반응형

 

 

메세지 수신용 ConsumerFactory 생성

 * ConsumerFactory : Kafka Consumer 인스턴스를 생성하기 위한 팩토리 빈

 @Bean
 public ConsumerFactory<String, String> consumerFactory() {

        Map<String, Object> configProps = new HashMap<>();
        //consumer Group Id 부여해주기
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup1");
        
        //MSK 브로커 설정(MSK서버)
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);

	//IAM 인증 
        configProps.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        configProps.put(SaslConfigs.SASL_MECHANISM, "AWS_MSK_IAM");
        configProps.put(SaslConfigs.SASL_JAAS_CONFIG,"software.amazon.msk.auth.iam.IAMLoginModule required awsProfileName=\"test-user\";");
        configProps.put(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, "software.amazon.msk.auth.iam.IAMClientCallbackHandler");

        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        
        return new DefaultKafkaConsumerFactory<>(configProps);
 }

   
//kafka 메시지를 수신하는 리스너 컨테이너를 생성하는데 사용되는 인터페이스
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
}

 

 

메세지 수신용 Service 생성

@Service
public class KafkaConsumeService {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    @KafkaListener(topics = "testTopic", groupId = "testGroup1")
    public void listen(ConsumerRecord<String, String> record) {
        System.out.println(record.value());
        System.out.println(record.toString());
    }

}

 


MSK 에 IAM 인증 시도시 오류 정리

1. IAM 계정의 access key와 secret key를 찾지 못하는 경우

com.amazonaws.SdkClientException: Unable to load AWS credentials from any provider in the chain: [software.amazon.msk.auth.iam.internals.EnhancedProfileCredentialsProvider@3bf917a2: Profile file contained no credentials for profile 'testUser': ProfileFile(profilesAndSectionsMap=[]), cohttp://m.amazonaws.auth.AWSCredentialsProviderChain@2dadd688: Unable to load AWS credentials from any provider in the chain: [EnvironmentVariableCredentialsProvider: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY)), SystemPropertiesCredentialsProvider: Unable to load AWS credentials from Java system properties (aws.accessKeyId and aws.secretKey), WebIdentityTokenCredentialsProvider: You must specify a value for roleArn and roleSessionName, software.amazon.msk.auth.iam.internals.EnhancedProfileCredentialsProvider@68aac71d: Profile file contained no credentials for profile 'default': ProfileFile(profilesAndSectionsMap=[]), cohttp://m.amazonaws.auth.EC2ContainerCredentialsProviderWrapper@69883287: Failed to connect to service endpoint: ]]

시스템 환경변수에 AWS_ACCESS_KEY_ID(혹은 AWS_ACCESS_KEY ) 와 AWS_SECRET_KEY (혹은 AWS_SECRET_ACCESS_KEY) 이름으로 access key와 secret key를 세팅해준다. 

혹은 producerFactory와 consumerFactory 내에 System.setProperty로 키 세팅해주면 됨.

System.setProperty("aws.accessKeyId", awsAccessKey);
System.setProperty("aws.secretKey", awsSecretKey);

 

 

2. IAM인증 설정이 바르지 않을때 난 에러 

Node -1 disconnected.
2024-01-31 10:16:24,725 [INFO  ] NetworkClient.cancelInFlightRequests(NetworkClient.java:344) - [Producer clientId=producer-1] Cancelled in-flight API_VERSIONS request with correlation id 1 due to node -1 being disconnected (elapsed time since creation: 98ms, elapsed time since send: 98ms, request timeout: 30000ms)
2024-01-31 10:16:24,726 [WARN  ] NetworkClient$DefaultMetadataUpdater.handleServerDisconnect(NetworkClient.java:1105) - [Producer clientId=producer-1] Bootstrap broker sdfssdfdf.kafka.ap-northeast-2.amazonaws.com:9098 (id: -1 rack: null) disconnected

Factory 생성 시에 sasl_jaas_config 값에 IAM로그인모듈 설정과 IAM계정의 이름을 적어주자.

configs.put(SaslConfigs.SASL_JAAS_CONFIG,"software.amazon.msk.auth.iam.IAMLoginModule required awsProfileName=\"testUser\";");

 

반응형
반응형

SpringBoot에서 SNS로 메세지 발행하기

SQS로 받은 메시지 Springboot에서 받아보기

개발환경 : Intellij, java17, gradle

 

1. 의존성 설정

    implementation 'org.springframework.boot:spring-boot-starter-data-redis'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    developmentOnly 'org.springframework.boot:spring-boot-devtools'
    annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'


    implementation 'com.google.code.gson:gson:2.9.0'
    implementation platform('software.amazon.awssdk:bom:2.15.0')
    implementation group: 'org.springframework.cloud', name: 'spring-cloud-aws-messaging', version: '2.2.1.RELEASE'
    implementation 'software.amazon.awssdk:sns'
    implementation 'software.amazon.awssdk:sqs'
    implementation 'org.springframework.cloud:spring-cloud-starter-aws:2.2.6.RELEASE'

 

 

2. application.yml

${AWS_ACCESSKEY} 는 intellij 환경변수에 등록해서 사용함

intellij 환경변수  추가는 https://mchch.tistory.com/282 참고

cloud:
  aws:
    credentials:
      access-key: ${AWS_ACCESSKEY}
      secret-key: ${AWS_SECRETKEY}
    region:
      static: ap-northeast-1
      auto: false
    stack:
      auto: false

    sns:
      topic:
        arn: arn:aws:sns:ap-northeast-1:34243243247:MyTopic2

    sqs:
      queue:
        name: MyQueue2
        url: https://sqs.ap-northeast-1.amazonaws.com/34243243247/MyQueue2

 

 

3. AWSConfig.java

@Getter
@Configuration
public class AWSConfig {
    @Value("${cloud.aws.credentials.access-key}")
    private String awsAccessKey;

    @Value("${cloud.aws.credentials.secret-key}")
    private String awsSecretKey;

    @Value("${cloud.aws.region.static}")
    private String awsRegion;

    @Value("${cloud.aws.sns.topic.arn}")
    private String snsTopicARN;

    @Bean // SNS Client 세팅
    public SnsClient getSnsClient() {
        return SnsClient.builder()
                .credentialsProvider(
                        getAwsCredentials(this.awsAccessKey, this.awsSecretKey)
                ).region(Region.of(this.awsRegion))
                .build();
    }

	//aws credential 세팅
    public AwsCredentialsProvider getAwsCredentials(String accessKeyID, String secretAccessKey) {
        AwsBasicCredentials awsBasicCredentials = AwsBasicCredentials.create(accessKeyID, secretAccessKey);
        return () -> awsBasicCredentials;
    }

    @Bean // SQS Client 세팅
    public AmazonSQS amazonSQS() {
        AWSCredentials credentials = new BasicAWSCredentials(awsAccessKey, awsSecretKey);
        return AmazonSQSAsyncClientBuilder
                .standard()
                .withCredentials(new AWSStaticCredentialsProvider(credentials))
                .withRegion(awsRegion)
                .build();
    }
}

 

 

4. SnsService.java

지정한 topic으로 메세지 발행하는 서비스

@Service
@RequiredArgsConstructor
public class SnsService {
    private final AWSSnsConfig awsConfig;

    public PublishResponse awsSnsPublishTest(Map<String,Object> scriptData) {
        PublishRequest publishRequest = PublishRequest.builder()
                .topicArn(awsConfig.getSnsTopicARN())
                .subject("TEST 제목")
                .message(scriptData.toString())
                .build();

        SnsClient snsClient = awsConfig.getSnsClient();
        PublishResponse publishResponse = snsClient.publish(publishRequest);

        snsClient.close();
        return publishResponse;
    }
}

 

 

5. SqsService.java

sns 토픽에서 메세지 가져오는 sqs서비스

@Service
public class SqsService {
    private final QueueMessagingTemplate queueMessagingTemplate;

    @Autowired
    public SqsService(AmazonSQS amazonSqs) {
        this.queueMessagingTemplate = new QueueMessagingTemplate((AmazonSQSAsync) amazonSqs);
    }

    public void getMessage() {
        String rr = queueMessagingTemplate.receiveAndConvert("MyQueue2", String.class);
        System.out.println("queue message :: " + rr );

    }
}

 

 

6. SqsListener.java

@Component
public class SqlListener {
    @SqsListener(value = "MyQueue2", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
    public void listen(@Payload String sst, Acknowledgment ack) {
        log.info("{}", sst);
        ack.acknowledge();
    }
}

 

 

7. Controller.java 

서비스 호출 controller.

@RestController
@RequiredArgsConstructor
@RequestMapping("api")
public class SnsController {
    private final SnsService snsService;
    private final SqsService sqsService;

	//메세지 발행 메소드
    @PostMapping("/publish")
    public void publish(@RequestBody Map<String, Object> scriptData) {
        PublishResponse pr = snsService.awsSnsPublishTest(scriptData);
        System.out.println(pr);
        //PublishResponse(MessageId=80da361c-4ec9-55f5-9aa8-6a49037995af)

    }

	//메세지 가져오는 메소드
    @GetMapping("/subscribe")
    public void getMessage() {
        sqsService.getMessage();
    }
}

 

가져온 메세지 콘솔출력

반응형
반응형

software.amazon.awssdk.services.sns.model.InvalidParameterException: Invalid parameter: TopicArn

 

원인 : 프로젝트에 topic region설정을 잘못함

내 토픽 region은 ap-northeast-1인데

프로젝트 설정은 2로 해놓음 ㅎㅎ;;

 

반응형
반응형

SNS와 SQS 정리
SNS : Publisher가 Subscriber에게 메시지를 전송하는 서비스
SQS : 메세지 대기열 서비스

SNS 역할
특정 서비스에서 SNS로 지정한 주제(topic)에 대한 메시지를 발행한다. 예)회원가입정보
그럼 이 SNS는 이 topic에 대한 메시지를 받아보기로 한 (subscribe한) 서비스들에게 메시지를 전달해준다.
subscribe를 하는 주체는 SQS, Lambda, HTTP, SMS, 이메일, 모바일 애플리케이션 엔드포인트가 될 수 있다.

SQS 역할
SQS는 Simple Queue Service로, 메세지 대기열 서비스라고 함.
SNS로부터 특정 주제의 메세지를 구독하여 받아볼 수 있다.
그럼 특정 서비스나 시스템에서 SQS로 받은 메시지를 사용하면 되는거임

흐름은 이렇게 됨
서비스 -> SNS -> SQS -> 다른 서비스

즉 SNS와 SQS를 활용하면 엔드포인트에서 엔드포인트로 데이터를 직접 때려박지 않아도 되므로 데이터 안전성과 관리 오버헤드 제거에 있어서 용이하겠다. . . 

1. SNS 주제 생성하기


https://ap-northeast-1.console.aws.amazon.com/sns/v3/

주제 이름 적고 다음단계로 ㄱㄱ

표준으로 ㄱㄱ 하고 다른 옵션들은 모두 주어진대로 ㄱㄱ

생성되었다.

 


2. SQS 대기열 생성하기


https://ap-northeast-1.console.aws.amazon.com/sqs/v2/

 

대기열 생성 ㄱㄱ

표준으로 하고 다른 옵션들은 기본으로 ㄱㄱ

생성되었다.

 

 


3. SNS 구독하기

생성한 대기열(MyQueue)이 만들었던 SNS의 주제를 바라보도록 설정해야한다. (publish)

'Amazon SNS 주체 구독' 선택

내가 만들었던 Topic을 선택해줌.

SQS에서 설정된것 확인

SNS에서도 설정된것 확인

 


4. SNS 메시지발행

구독 설정이 잘 되었나 확인해볼것.

SNS에서 메시지 게시

제목과 메시지 본문을 적고 기본 옵션으로 전송

 

5. 메시지 수신

MyQueue에서 메시지를 받아보자

메시지 폴링을 누릅니다.

 

SNS 에서 발행한 메시지가 보입니다.

메시지 아이디를 눌러보면 아래와 같은 메시지가 표시됨.  성공~ 

{
  "Type" : "Notification",
  "MessageId" : "d118fffff-dddd-cccc-bbbbb-aaaaaaaa",
  "TopicArn" : ":MyTopic",
  "Subject" : "TEST~~TITLE~~",
  "Message" : "TEST~~PAYLOAD~~~",
  "Timestamp" : "2023-02-02T00:11:52.300Z",
  "SignatureVersion" : "1",
  "Signature" : "==",
  "SigningCertURL" : "",
  "UnsubscribeURL" : ""
}

 

다음은 SNS 와 SQS 를 Spring Boot에 연동해서 메세지 보내고 받아보기~

https://mchch.tistory.com/281

 

SpringBoot에서 SNS SQS 연동하기

SpringBoot에서 SNS로 메세지 발행하기 SQS로 받은 메시지 Springboot에서 받아보기 개발환경 : Intellij, java17, gradle 1. 의존성 설정 implementation 'org.springframework.boot:spring-boot-starter-data-redis' implementation 'org.sp

mchch.tistory.com

 

반응형

+ Recent posts