지난번에는 rabbitMQ를 docker에 설치하고 rabbitMQ 관리 콘솔로 실행되는 것 까지 해보았다.
오늘은 rabbitMQ를 위한 config파일을 만들고 코드를 이용해 메시지를 발행해보고자 한다.
RabbitMqConfig
/**
* 실행하면 RabbitMQ에 메시지 큐를 생성하고 큐를 통해 메시지 송신,
* <p>
* 큐에 쌓인 메시지를 받아 출력하는 로그를 확인할 수 있다.
*/
@Configuration
public class RabbitMqConfig {
@Value("${rabbitmq.topic-exchange.name}")
private String topicExchangeName;
@Value("${rabbitmq.queue.name}")
private String queueName;
@Value("${rabbitmq.routing-key.name}")
private String routingKey;
/**
* 지정된 큐 이름으로 Queue 빈을 생성
*
* @return 메시지를 저장하는 버퍼
*/
@Bean
Queue queue() {
return new Queue(queueName, false); // Queue이름 지정
}
/**
* 발생된 메시지를 Queue에 저장하는 모듈
*
* @return 메시지를 여러 큐로 전송하고 싶을 때 또는 메시지를 특정 패턴에 따라 라우팅하고 싶을 때 사용
*/
@Bean
TopicExchange exchange() { //
return new TopicExchange(topicExchangeName); // Topic을 지정
}
/**
* queue와 topic 그리고 이를 연결할 Route Key인 "fitmate.rabbit.#"으로 연결
*
* @param queue 메시지를 저장하는 버퍼
* @param exchange 발생된 메시지를 Queue에 저장하는 모듈
* @return
*/
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange)
.with(routingKey); // '#': 여러 단어를 대체할 수 있는 와일드 카드
}
/**
* RabbitTemplate을 생성하여 반환
*
* @param connectionFactory RabbitMQ와의 연결을 위한 ConnectionFactory 객체
* @return RabbitTemplate 객체
*/
@Bean
public RabbitTemplate rabbitTemplate(
ConnectionFactory connectionFactory, MessageConverter messageConverter) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// JSON 형식의 메시지를 직렬화하고 역직렬할 수 있도록 설정
rabbitTemplate.setMessageConverter(messageConverter);
return rabbitTemplate;
}
/**
* Jackson 라이브러리를 사용하여 메시지를 JSON 형식으로 변환하는 MessageConverter 빈을 생성
*
* @return MessageConverter 객체
*/
@Bean
public MessageConverter jackson2JsonMessageConverter() {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.registerModule(new JavaTimeModule()); // JavaTimeModule 추가
return new Jackson2JsonMessageConverter(objectMapper);
}
}
Queue, Exchange, Binding, RabbitTemplate, MessageConverter를 등록해주었다.
Queue와 Exchange의 이름을 지정해 주었고, 등록한 Queue와 Exchange를 연결해주며 routingKey를 지정해주었다.
여기서 Exchange는 TopicExchange를 사용했는데 TopicExchange 말고도 DirectExchange 도 있었다.
@Bean
public DirectExchange exchange() {
return new DirectExchange(exchangeName);
}
둘의 주요 차이점은 메시지의 라우팅 방식에 있다.
- DirectExchange:
- DirectExchange는 라우팅 키와 메시지의 라우팅 키가 정확히 일치할 때만 메시지를 큐로 전달한다.
- 즉, 메시지의 라우팅 키와 큐를 바인딩한 Exchange의 라우팅 키가 정확히 일치해야만 해당 큐로 메시지가 전송된다.
- 이 Exchange는 일반적으로 한 대의 큐에 메시지를 직접 전달할 때 사용된다.
- TopicExchange:
- TopicExchange는 라우팅 키에 패턴을 사용하여 메시지를 라우팅한다.
- 라우팅 키는 여러 단어로 구성될 수 있으며, 라우팅 키의 각 단어는 '.'로 구분된다.
- '*'는 하나의 단어를 대체할 수 있는 와일드 카드이며, '#'은 여러 단어를 대체할 수 있는 와일드 카드다.
- TopicExchange는 라우팅 키와 큐를 바인딩할 때 패턴을 사용하여 여러 큐에 메시지를 라우팅할 수 있다.
- 이 Exchange는 메시지를 여러 큐로 전송하고 싶을 때 또는 메시지를 특정 패턴에 따라 라우팅하고 싶을 때 사용된다.
요약하면, DirectExchange는 라우팅 키의 일치를 기반으로 메시지를 전달하고 TopicExchange는 패턴을 사용하여 메시지를 전달한다.
마지막으로 RabbitTemplate에 json형식으로 변환하는 MessageConverter를 설정해주었다.
Consumer
@Slf4j
@Service
public class Receiver { // 메시지 구독
private CountDownLatch latch = new CountDownLatch(1);
@RabbitListener(queues = "${rabbitmq.queue.name}")
public void receiveMessage(LocalTime hi) {
log.info("[RabbitMQ][Receiver]Received <{}>", hi);
latch.countDown(); // 메시지 처리가 완료될 때까지 대기
}
public CountDownLatch getLatch() {
return latch;
}
}
@RabbitListener 어노테이션를 이용해 큐이름을 지정하하고 메시지를 받고 처리할 메서드를 만들어줬다.
(LocalTime 파라미터는 그냥 의미없이 생각하는 Object를 사용했다.)
간단한 TEST
CommandLineRunner을 구현하면 쉽게 테스트가 가능하다.
@Slf4j
@Service
@RequiredArgsConstructor
public class Runner implements CommandLineRunner { // 발행
private final RabbitTemplate rabbitTemplate;
private final Receiver receiver;
private static final int TIME_OUT = 10_000;
@Value("${rabbitmq.topic-exchange.name}")
private String topicExchangeName;
@Override
public void run(String... args) throws Exception {
log.info("[RabbitMQ][Runner]Sending message...");
rabbitTemplate.convertAndSend(topicExchangeName, "fitmate.rabbit.baz",
LocalTime.of(1, 1));
receiver.getLatch().await(TIME_OUT, TimeUnit.MILLISECONDS);
}
}
CommandLineRunner 인터페이스는 Spring Boot 애플리케이션이 시작될 때 특정한 작업을 수행할 수 있도록 하는 스프링 프레임워크의 콜백 인터페이스다. 이 인터페이스를 구현한 클래스는 run 메서드를 오버라이드하여 시작 시에 실행할 코드를 정의할 수 있다.
에플리케이션을 실행하면 콘솔창에서 로그가 찍히는 것을 확인할 수 있었다.
참고
위의 설정말고도 아래와 같은 설정을 하는 곳도 있었다.
1. MessageListenerAdapter 설정해주기
/**
* container() 메서드가 호출되어, RabbitMQ서버와 연결
*
* @param connectionFactory RabbitMQ 연결을 위한 ConnectionFactory
* @param listenerAdapter
* @return RabbitMQ에서 메시지를 수신하고 처리하는 데 사용,
* <p>
* (컨슈머(메시지를 받는 쪽) 측에서 사용), 큐를 구독하여, 메시지가 도착하면 자동으로 메시지를 가져와서 처리
*/
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName); // 메세시 수신할 큐
container.setMessageListener(listenerAdapter); // 메시지 수신 시 실행할 리스너 메서드
return container;
}
/**
* 메세시 수신 시, 응답할 리스너 메서드를 등록(메시지를 처리하는 방식을 정의)
*
* @param receiver 메시지 구독
* @return Spring AMQP에서 메시지를 수신하는 데 사용되는 어댑터
*/
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
// RabbitMQ에서 메시지가 수신될 때마다 Receiver 클래스의 receiveMessage 메서드가 호출되어 메시지를 처리
return new MessageListenerAdapter(receiver, "receiveMessage");
}
해당 코드는 MessageListenerAdapter에 consumer로 사용될 클래스(Receiver)를 지정하고
MessageListenerContainer에 해당 MessageListenerAdapter를 설정시켜주는 내용이다.
이는 메시지를 수신하면 처리할 메서드를 등록해주는 내용으로 이해했다.
필자는 이렇게 설정하지 않고 Consumer 클래스(Reciver)에서 @RabbitListener(queues = "${rabbitmq.queue.name}")를 사용해주었다.
2. ConnectionFactory
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(rabbitmqHost);
connectionFactory.setPort(rabbitmqPort);
connectionFactory.setUsername(rabbitmqUsername);
connectionFactory.setPassword(rabbitmqPassword);
return connectionFactory;
}
연결을 설정한다. 이는 application.yaml 파일에 설정해준대로 자동 되는 부분인듯해서 Config파일에서는 직접 설정하진 않았다.
참고 블로그
'spring > spring' 카테고리의 다른 글
[Spring][MSA] Spring에서 MSA (0) | 2024.08.30 |
---|---|
[Spring][RabbitMQ][Exception] Message로 인터페이스 사용하기 (0) | 2024.04.29 |
[Spring][Exception] Controller Ambiguous mapping (0) | 2024.04.15 |
[Spring][스프링 부트 핵심 가이드] 스프링 시큐리티: 서비스의 인증과 권한 부여 (0) | 2024.03.13 |
[Spring][스프링 부트 핵심 가이드] 액추에이터 활용하기 (0) | 2024.03.06 |