관리 메뉴

nalaolla

Spring Boot - RabbitMQ 본문

RabbitMQ

Spring Boot - RabbitMQ

날아올라↗↗ 2020. 2. 20. 17:01
728x90

1. 어떻게 RabbitMQ를 접하게 되었는가..?

이번에 사내 프로젝트를 진행하면서, 많은 데이터 처리가 웹서버만으로 힘들기 때문에 여러 방법을 구상 중 이였습니다. 

 

앞단에 로드밸런싱을 해볼까?

서버사이드에 분산은 되겠지만, DB가 버텨줄지 의문이였습니다. 기존 시스템 또한 DB가 말썽이였기 때문입니다. 또한 DB 사이드에 session을 무한정 늘릴 수도 없습니다.

 

그래서 선택하게 된 것이 메시지큐입니다. 이미 몇 년전부터 많은 기업에서 사용 중 이였고, DB 과부하는 충분히 커버되리라 생각되어 검토 후 사용해보기로 했습니다.

 

 

2. 왜 RabbitMQ인가?

* RabbitMQ에서 플러그인 추가만으로 모니터링이 가능하다.

* 빠른 편에 속한다.

* 다양한 언어 지원하다.

 

JAVA만이 아닌 다른 언어로도 시스템이 돌고 있는 회사 시스템에 적용하기 위해서 RabbitMQ를 선택하게 되었습니다.

 

 

3. 설치 과정

① 다운로드 URL

https://www.rabbitmq.com/download.html

 

Default User : guest/guest

 

② 모니터링 플러그인 추가(rabbitmq cmd )

rabbitmq-plugins enable rabbitmq-management

 

③ rabbitmq 서비스 재시작

이로써 http://localhost:15672으로 모니터링이 가능합니다.

 

 

 

4. 개념 정리

 

* RabbitMQ 기본개념

 

1) Message

: 일정 구조를 지닙니다.

- header

 ① message id

 ② timestamp

 ③ contenttype

- body

 ① data

 

2) Exchange

: Producer가 메시지를 보낼 때, AMQP로 교환하는 과정을 나타냅니다.

 

종류

① Direct(default)

- Routing Key와 Exchange이 같을 때 입니다.

 

② Topic

- Routing Key와 Exchange이 일정 패턴이 동일할 때 입니다.

 

③ Fanout

- 무관하게 넣습니다.

 

3) Queue

: Message가 쌓이는 구성요소. 즉, Consumer가 Message를 받는 구성요소를 나타냅니다.

 

- Name : Queue 이름

- Durable : 대기열을 유지할지를 정할 플래그

- Exclusive : 선언된 연결에 의해서만 사용할지 정할 플래그

- Auto-Delete : 더 이상 사용되지 않는 큐를 삭제할지 정할 플래그

 

4) Binding

: 메시지를 주고 받을 수 있게 도와주는 교환기

 

5) Connection

: 실질적인 TCP 연결

 

6) Channel

: 가상연결, 메시지에 대한 작업 단위

 

 

* Spring RabbitMQ

 

1) ConnectionFactory : RabbitMQ를 연결하고 관리하는 인터페이스

- CachingConnectionFactory : Spring에서 제공하는 ConnectionFactory 객체

- SingleConnectionFactory : Spring에서 제공하는 Test ConnectionFactory 객체

 

2) SSL 적용

- RabbitConnectionFactoryBean 이용하여 가능합니다.

 

3) Send Message

- ...Send() : 메소드로 보낼 수 있습니다.

- v1.4.+ : MessagingMessageConverter 로 Coverter 추가가 가능합니다.( ex) Jackson2MessageConverter )

- v.1.4.2 : batchRabbitTemplate로 배치형태의 메시지 Send 가능합니다.

- v1.6.+ : Validate User Id

 

4) Receive Message

1) Polling Consumer

: 한번에 하나의 Message를 가져옵니다.

- receive...()

 

2) Asynchronous Consumer

: 비동기적으로 요청시 메시지를 수신하는 수신기를 등록합니다.

 

- SimpleRabbitListenerContainerFactory로 커스텀 수신이 가능합니다.

(Default Consumer = min : 3 / max : 10)

 

① MesageListener 를 상속받는 객체 생성 및 등록합니다.

② 실행될 메소드에 @RabbitListener 를 추가합니다

 

※ @Configuration에 @EableRabbit 추가해줘야 합니다.

 

 

5. 예졔

 

① Gradle 설정

② Config 설정

③ Producer 설정

④ Consumer 설정

 

① Gradle 설정

 

dependencies {

    compile('org.springframework.boot:spring-boot-starter-amqp')

    testCompile('org.springframework.boot:spring-boot-starter-test')

}

Colored by Color Scripter

 

② Config 설정

@Configuration

@EnableRabbit

public class RabbitMQConfig {

    

    public static final String QUEUE_NAME = "queue";

    

    private static final String EXCHANGE = QUEUE_NAME + "-exchange";

    

    @Bean

    public RabbitTemplate rabbitTemplate() {

        RabbitTemplate template = new RabbitTemplate(connectionFactory());

        template.setRoutingKey(QUEUE_NAME);

        template.setMessageConverter(jsonMessageConverter());

        return template;

    }

    

    @Bean

    public SimpleMessageListenerContainer container() {

        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();

        container.setConnectionFactory(connectionFactory());

        container.setQueueNames(QUEUE_NAME);

//        container.setMessageListener(baseMesage());

        container.setMessageConverter(jsonMessageConverter());

        return container;

    }

    

    @Bean

    public Queue queue() {

        return new Queue(QUEUE_NAME, false);

    }

    

    @Bean

    public TopicExchange exchange() {

        return new TopicExchange(EXCHANGE);

    }

    

    @Bean

    public Binding binding(Queue queue, TopicExchange exchange) {

        return BindingBuilder.bind(queue).to(exchange).with(QUEUE_NAME);

    }

    

    @Bean

    public Jackson2JsonMessageConverter jsonMessageConverter() {

        return new Jackson2JsonMessageConverter();

    }

    

    @Bean

    public BaseMesage baseMesage() {

        return new BaseMesage();

    }

    

    @Bean

    public ConnectionFactory connectionFactory() {

        CachingConnectionFactory factory = new CachingConnectionFactory();

        factory.setHost("localhost");

        factory.setUsername("rabbitmq");

        factory.setPassword("rabbitmq");

        return factory;

    }

}

Colored by Color Scripter

 

③ Producer 설정

@Component

public class SendSchedler {

 

    @Autowired

    private RabbitTemplate rabbitTemplate;

    

    private static final Logger logger = Logger.getLogger(SendSchedler.class);

    

    @Scheduled(cron = "0/3 * * * * *")

    public void onSend() {

        logger.info("Sending message... Start");

        

        StopWatch stopWatch = new StopWatch();

        stopWatch.start();

        IntStream.range(115000)

                    .parallel()

                    .forEach(val -> {

                        rabbitTemplate.convertAndSend(RabbitMQConfig.QUEUE_NAME, new Base(val, "Hello, RabbitMQ! 1"));

                    });

        stopWatch.stop();

        logger.info(stopWatch.toString());

        logger.info("Sending message... End");

    }

}

Colored by Color Scripter

 

④ Consumer 설정

@Component

public class BaseJsonMessage {

 

    private static final Logger logger = Logger.getLogger(BaseJsonMessage.class);

    

    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)

    public void onMessage(Base base) {

        logger.info("Received < " + base.toString() + " >");

    }

}

Colored by Color Scripter

 

참고

깃헙소스

https://spring.io/guides/gs/messaging-rabbitmq/



출처: https://heowc.tistory.com/36 [허원철의 개발 블로그]

728x90

'RabbitMQ' 카테고리의 다른 글

windows에서 rabbitMQ 설치하기  (0) 2020.02.20