ApiApplication
package com.bx.api;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class ApiApplication {
public static void main(String[] args) {
SpringApplication.run(ApiApplication.class, args);
}
}
application.properties
# RabbitMQ (RK3588 IP)
spring.rabbitmq.host=192.168.0.24
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=password
Topic의 핵심: 와일드카드(Wildcards)
Topic 방식의 가장 큰 특징은 **점(.)**으로 구분된 라우팅 키와 두 가지 특수 기호를 사용할 수 있다는 점입니다.
- * (별표): 정확히 단어 하나를 대체합니다. (예: *.orange.*)
- # (우물 정): 0개 이상의 단어를 대체합니다. (예: lazy.#)
RabbitTopicConfig
Spring Boot에서 Topic 방식을 쓰려면 TopicExchange와 Binding을 설정
package com.bx.api.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitTopicConfig {
// 1. Topic 타입의 Exchange 생성
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("log.exchange");
}
// 2. 큐 생성
@Bean
public Queue errorQueue() {
return new Queue("error.queue");
}
// 3. 바인딩 (에러 로그만 이 큐로 오게 설정)
@Bean
public Binding bindingError(Queue errorQueue, TopicExchange topicExchange) {
// "#.error" 패턴을 가진 메시지만 errorQueue로 연결
return BindingBuilder.bind(errorQueue).to(topicExchange).with("#.error");
}
@Bean
public MessageConverter jsonMessageConverter() {
// 이 설정이 있어야 객체 <-> JSON 변환이 가능합니다.
return new Jackson2JsonMessageConverter();
}
}
TopicConfig
Topic 방식은 Exchange, Queue, Binding 정의
package com.bx.api.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TopicConfig {
public static final String EXCHANGE_NAME = "log.topic.exchange";
public static final String ALL_LOG_QUEUE = "all.log.queue";
public static final String ERROR_LOG_QUEUE = "error.log.queue";
// 1. Topic Exchange 선언
@Bean
public TopicExchange logExchange() {
return new TopicExchange(EXCHANGE_NAME);
}
// 2. 큐 선언 (모든 로그용 / 에러 전용)
@Bean
public Queue allLogQueue() {
return new Queue(ALL_LOG_QUEUE);
}
@Bean
public Queue errorLogQueue() {
return new Queue(ERROR_LOG_QUEUE);
}
// 3. 바인딩 (와일드카드 사용)
@Bean
public Binding bindAll(Queue allLogQueue, TopicExchange logExchange) {
// "seoul.#" -> 서울에서 발생하는 모든 로그(info, warn, error 등) 수집
return BindingBuilder.bind(allLogQueue).to(logExchange).with("seoul.#");
}
@Bean
public Binding bindError(Queue errorLogQueue, TopicExchange logExchange) {
// "#.error" -> 지역 상관없이 모든 에러 로그만 수집
return BindingBuilder.bind(errorLogQueue).to(logExchange).with("#.error");
}
}
메시지 보내기 (Producer) : LogProducer
package com.bx.api.service;
import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import com.bx.api.config.TopicConfig;
import lombok.RequiredArgsConstructor;
@Service
@RequiredArgsConstructor
public class LogProducer {
private final RabbitTemplate rabbitTemplate;
public void sendLog(String location, String level, String message) {
// 라우팅 키 생성 예: "seoul.info" 또는 "busan.error"
String routingKey = location + "." + level;
Map<String, String> logData = new HashMap<>();
logData.put("location", location);
logData.put("level", level);
logData.put("message", message);
rabbitTemplate.convertAndSend(TopicConfig.EXCHANGE_NAME, routingKey, logData);
System.out.println("Sent Log with Key [" + routingKey + "]: " + message);
}
}
메시지 받기 (Consumer) : LogConsumer
package com.bx.api.service;
import java.util.Map;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import com.bx.api.config.TopicConfig;
@Service
public class LogConsumer {
// 서울 지역의 모든 로그 처리
@RabbitListener(queues = TopicConfig.ALL_LOG_QUEUE)
public void consumeAllSeoulLog(Map<String, String> message) {
System.out.println("[서울 통합 관제센터] 수신: " + message);
}
// 전 지역의 에러 로그만 처리
@RabbitListener(queues = TopicConfig.ERROR_LOG_QUEUE)
public void consumeErrorLog(Map<String, String> message) {
System.out.println("[긴급 에러 알람] 수신: " + message);
}
}
로그 메시지 전송용 Controller 구현
package com.bx.api.controller;
import java.util.HashMap;
import java.util.Map;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.bx.api.service.LogProducer;
import lombok.RequiredArgsConstructor;
@RestController
@RequiredArgsConstructor
public class RabbitController {
private final LogProducer logProducer;
/**
* HTTP GET 호출을 받아 RabbitMQ로 메시지 전송 실행 주소:
* http://localhost:8080/send?loc=seoul&lvl=info&msg=test_message
*/
@GetMapping("/send")
public Map<String, Object> sendMessage(@RequestParam(value = "loc") String location,
@RequestParam(value = "lvl") String level, @RequestParam(value = "msg") String message) {
// 이전에 만든 Producer의 메서드 호출
logProducer.sendLog(location, level, message);
Map<String, Object> result = new HashMap<>();
result.put("status", "success");
result.put("routingKey", location + "." + level);
result.put("payload", message);
return result;
}
}
실행





메시지 전송
호출 주소 예제
http://192.168.0.4:8080/send?loc=seoul&lvl=info&msg=test_message
서버 로그
Sent Log with Key [seoul.info]: test_message
[서울 통합 관제센터] 수신: {level=info, location=seoul, message=test_message}


















