지난번에는 rabbitMQ를 docker에 설치하고 rabbitMQ 관리 콘솔로 실행되는 것 까지 해보았다.
[RabbitMQ] docker에 설치하고 springboot에서 사용하기
내가 RabbitMQ를 사용하고자 하는 이유 프로젝트에서 알림의 기능을 구현하고자 했다. 특정 시간마다 스케줄링으로 여러명에게 알림을 보낼 생각이기 때문에 여러명에서 순차적으로 알림을 보내
jepa.tistory.com
오늘은 rabbitMQ를 위한 config파일을 만들고 코드를 이용해 메시지를 발행해보고자 한다.
RabbitMqConfig
/**
* 실행하면 RabbitMQ에 메시지 큐를 생성하고 큐를 통해 메시지 송신,
* <p>
* 큐에 쌓인 메시지를 받아 출력하는 로그를 확인할 수 있다.
*/
@Configuration
public class RabbitMqConfig {
@Value("${rabbitmq.topic-exchange.name}")
private String topicExchangeName;
@Value("${rabbitmq.queue.name}")
private String queueName;
@Value("${rabbitmq.routing-key.name}")
private String routingKey;
/**
* 지정된 큐 이름으로 Queue 빈을 생성
*
* @return 메시지를 저장하는 버퍼
*/
@Bean
Queue queue() {
return new Queue(queueName, false); // Queue이름 지정
}
/**
* 발생된 메시지를 Queue에 저장하는 모듈
*
* @return 메시지를 여러 큐로 전송하고 싶을 때 또는 메시지를 특정 패턴에 따라 라우팅하고 싶을 때 사용
*/
@Bean
TopicExchange exchange() { //
return new TopicExchange(topicExchangeName); // Topic을 지정
}
/**
* queue와 topic 그리고 이를 연결할 Route Key인 "fitmate.rabbit.#"으로 연결
*
* @param queue 메시지를 저장하는 버퍼
* @param exchange 발생된 메시지를 Queue에 저장하는 모듈
* @return
*/
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange)
.with(routingKey); // '#': 여러 단어를 대체할 수 있는 와일드 카드
}
/**
* RabbitTemplate을 생성하여 반환
*
* @param connectionFactory RabbitMQ와의 연결을 위한 ConnectionFactory 객체
* @return RabbitTemplate 객체
*/
@Bean
public RabbitTemplate rabbitTemplate(
ConnectionFactory connectionFactory, MessageConverter messageConverter) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// JSON 형식의 메시지를 직렬화하고 역직렬할 수 있도록 설정
rabbitTemplate.setMessageConverter(messageConverter);
return rabbitTemplate;
}
/**
* Jackson 라이브러리를 사용하여 메시지를 JSON 형식으로 변환하는 MessageConverter 빈을 생성
*
* @return MessageConverter 객체
*/
@Bean
public MessageConverter jackson2JsonMessageConverter() {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.registerModule(new JavaTimeModule()); // JavaTimeModule 추가
return new Jackson2JsonMessageConverter(objectMapper);
}
}
Queue, Exchange, Binding, RabbitTemplate, MessageConverter를 등록해주었다.
Queue와 Exchange의 이름을 지정해 주었고, 등록한 Queue와 Exchange를 연결해주며 routingKey를 지정해주었다.
여기서 Exchange는 TopicExchange를 사용했는데 TopicExchange 말고도 DirectExchange 도 있었다.
@Bean
public DirectExchange exchange() {
return new DirectExchange(exchangeName);
}
둘의 주요 차이점은 메시지의 라우팅 방식에 있다.
- DirectExchange:
- DirectExchange는 라우팅 키와 메시지의 라우팅 키가 정확히 일치할 때만 메시지를 큐로 전달한다.
- 즉, 메시지의 라우팅 키와 큐를 바인딩한 Exchange의 라우팅 키가 정확히 일치해야만 해당 큐로 메시지가 전송된다.
- 이 Exchange는 일반적으로 한 대의 큐에 메시지를 직접 전달할 때 사용된다.
- TopicExchange:
- TopicExchange는 라우팅 키에 패턴을 사용하여 메시지를 라우팅한다.
- 라우팅 키는 여러 단어로 구성될 수 있으며, 라우팅 키의 각 단어는 '.'로 구분된다.
- '*'는 하나의 단어를 대체할 수 있는 와일드 카드이며, '#'은 여러 단어를 대체할 수 있는 와일드 카드다.
- TopicExchange는 라우팅 키와 큐를 바인딩할 때 패턴을 사용하여 여러 큐에 메시지를 라우팅할 수 있다.
- 이 Exchange는 메시지를 여러 큐로 전송하고 싶을 때 또는 메시지를 특정 패턴에 따라 라우팅하고 싶을 때 사용된다.
요약하면, DirectExchange는 라우팅 키의 일치를 기반으로 메시지를 전달하고 TopicExchange는 패턴을 사용하여 메시지를 전달한다.
마지막으로 RabbitTemplate에 json형식으로 변환하는 MessageConverter를 설정해주었다.
Consumer
@Slf4j
@Service
public class Receiver { // 메시지 구독
private CountDownLatch latch = new CountDownLatch(1);
@RabbitListener(queues = "${rabbitmq.queue.name}")
public void receiveMessage(LocalTime hi) {
log.info("[RabbitMQ][Receiver]Received <{}>", hi);
latch.countDown(); // 메시지 처리가 완료될 때까지 대기
}
public CountDownLatch getLatch() {
return latch;
}
}
@RabbitListener 어노테이션를 이용해 큐이름을 지정하하고 메시지를 받고 처리할 메서드를 만들어줬다.
(LocalTime 파라미터는 그냥 의미없이 생각하는 Object를 사용했다.)
간단한 TEST
CommandLineRunner을 구현하면 쉽게 테스트가 가능하다.
@Slf4j
@Service
@RequiredArgsConstructor
public class Runner implements CommandLineRunner { // 발행
private final RabbitTemplate rabbitTemplate;
private final Receiver receiver;
private static final int TIME_OUT = 10_000;
@Value("${rabbitmq.topic-exchange.name}")
private String topicExchangeName;
@Override
public void run(String... args) throws Exception {
log.info("[RabbitMQ][Runner]Sending message...");
rabbitTemplate.convertAndSend(topicExchangeName, "fitmate.rabbit.baz",
LocalTime.of(1, 1));
receiver.getLatch().await(TIME_OUT, TimeUnit.MILLISECONDS);
}
}
CommandLineRunner 인터페이스는 Spring Boot 애플리케이션이 시작될 때 특정한 작업을 수행할 수 있도록 하는 스프링 프레임워크의 콜백 인터페이스다. 이 인터페이스를 구현한 클래스는 run 메서드를 오버라이드하여 시작 시에 실행할 코드를 정의할 수 있다.
에플리케이션을 실행하면 콘솔창에서 로그가 찍히는 것을 확인할 수 있었다.

참고
위의 설정말고도 아래와 같은 설정을 하는 곳도 있었다.
1. MessageListenerAdapter 설정해주기
/**
* container() 메서드가 호출되어, RabbitMQ서버와 연결
*
* @param connectionFactory RabbitMQ 연결을 위한 ConnectionFactory
* @param listenerAdapter
* @return RabbitMQ에서 메시지를 수신하고 처리하는 데 사용,
* <p>
* (컨슈머(메시지를 받는 쪽) 측에서 사용), 큐를 구독하여, 메시지가 도착하면 자동으로 메시지를 가져와서 처리
*/
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName); // 메세시 수신할 큐
container.setMessageListener(listenerAdapter); // 메시지 수신 시 실행할 리스너 메서드
return container;
}
/**
* 메세시 수신 시, 응답할 리스너 메서드를 등록(메시지를 처리하는 방식을 정의)
*
* @param receiver 메시지 구독
* @return Spring AMQP에서 메시지를 수신하는 데 사용되는 어댑터
*/
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
// RabbitMQ에서 메시지가 수신될 때마다 Receiver 클래스의 receiveMessage 메서드가 호출되어 메시지를 처리
return new MessageListenerAdapter(receiver, "receiveMessage");
}
해당 코드는 MessageListenerAdapter에 consumer로 사용될 클래스(Receiver)를 지정하고
MessageListenerContainer에 해당 MessageListenerAdapter를 설정시켜주는 내용이다.
이는 메시지를 수신하면 처리할 메서드를 등록해주는 내용으로 이해했다.
필자는 이렇게 설정하지 않고 Consumer 클래스(Reciver)에서 @RabbitListener(queues = "${rabbitmq.queue.name}")를 사용해주었다.
2. ConnectionFactory
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(rabbitmqHost);
connectionFactory.setPort(rabbitmqPort);
connectionFactory.setUsername(rabbitmqUsername);
connectionFactory.setPassword(rabbitmqPassword);
return connectionFactory;
}
연결을 설정한다. 이는 application.yaml 파일에 설정해준대로 자동 되는 부분인듯해서 Config파일에서는 직접 설정하진 않았다.
참고 블로그
'spring > spring' 카테고리의 다른 글
| [Spring][MSA] Spring에서 MSA (0) | 2024.08.30 |
|---|---|
| [Spring][RabbitMQ][Exception] Message로 인터페이스 사용하기 (0) | 2024.04.29 |
| [Spring][Exception] Controller Ambiguous mapping (0) | 2024.04.15 |
| [Spring][스프링 부트 핵심 가이드] 스프링 시큐리티: 서비스의 인증과 권한 부여 (0) | 2024.03.13 |
| [Spring][스프링 부트 핵심 가이드] 액추에이터 활용하기 (0) | 2024.03.06 |