1.메시지 브로커(Message Broker)란?
시스템 간에 메시지를 전송하고 중개하는 역할을 하는 소프트웨어 시스템
생산자와 소비자 간의 의존성을 줄이고 시스템 간의 통신을 원활하게 처리할 수 있게한다.
2. 왜 필요할까?
2-1.메시지 브로커가 없을 때
메시지 브로커가 없다면 시스템 간에 직접적인 연결을 하게되고 시스템 구조가 간단하다.
이경우 동기식 통신을 사용하므로, 데이터가 직접적으로 전달되어 즉시 처리된다.
하지만 시스템 간에 직접적인 연결이 필요하고, 각 시스템은 서로에 대한 의존성이 높아지기 때문에 결합도가 증가한다.
그렇기 때문에 하나의 시스템이 장애를 일으킬 경우, 다른 시스템에도 연쇄적인 장애가 일어난다.
또한 직접 통신하기에 모니터링이나 트래픽 처리가 상대적으로 어려울수있다.
[제공자] -> [소비자]
2-2.메시지 브로커가 있을 때
여러 제공자가 서로 독립적으로 작동하며, 각각 메시지를 브로커에 보낸다.
메시지 브로커는 생산자와 소비자가 독립적으로 동작하게 해주므로, 비동기 처리가 가능하고, 시스템의 응답성이 향상된다.
메시지 브로커는 메시지를 큐에 저장하거나 영구적으로 보관하여 시스템 장애 시에도 메시지를 잃지 않고 다시 처리할수도있다.
제공자와 소비자가 독립적이므로 확장성에서도 용이하다.
[제공자1] -> -> [소비자1]
[제공자2] -> [메시지 브로커] -> [소비자2]
[제공자3] -> -> [소비자3]
3.메시지 브로커 대표적인 종류
3-1.RabbitMQ
AMQP(Advanced Message Queuing Protocol)를 기반으로 하는 메시지 브로커
큐 기반 메시징을 제공하고, 다양한 클러스터링 및 부하 분산 방식이 지원한다.
3-2.Apache Kafka
분산 스트리밍 플랫폼으로, 대규모 데이터 스트리밍을 처리하는 데 최적화된 메시지 브로커
데이터를 토픽 단위로 관리하고, 장기 저장이 가능하다.
4.RabbitMQ 사용해보기
4-1.RabbitMQ Docker로 실행하기
RabbitMQ와 Management Plugin을 Docker로 활성화해준다.
Management Plugin은 RabbitMQ을 웹 기반 UI로 관리할수있게 해준다.
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
4-2.RabbitMQ 관리 UI
위에 docker로 활성화해줬다면 다음 링크에 들어간다.
http://localhost:15672/
아이디 비밀번호는 기본적으로 guest이다.
4-3. 벡엔드 서버 만들기 (FASTAPI)
간단하게 만들기 위해 backend 프레임워크는 FastAPI를 사용하였다.
pika는 RabbitMQ 클라이언트 라이브러리다.
pip install fastapi pika uvicorn
from fastapi import FastAPI
import pika
import json
app = FastAPI()
# RabbitMQ 연결 설정
RABBITMQ_HOST = 'localhost' # Docker에서 실행 중이므로 localhost로 설정
QUEUE_NAME = 'task_queue'
# RabbitMQ 연결 및 채널 생성
def get_rabbitmq_channel():
connection = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST))
channel = connection.channel()
# 큐가 없으면 생성 (내구성 설정)
channel.queue_declare(queue=QUEUE_NAME, durable=True)
return channel
# 메시지 전송 엔드포인트
@app.post("/send_message/")
async def send_message(message: str):
# RabbitMQ 채널 가져오기
channel = get_rabbitmq_channel()
# 메시지를 큐에 발행
channel.basic_publish(
exchange='',
routing_key=QUEUE_NAME,
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # 메시지 내구성 설정 (큐에 남아 있음)
)
)
return {"message": "Message sent to RabbitMQ", "data": message}
# 메시지 수신 엔드포인트
@app.get("/receive_message/")
async def receive_message():
# RabbitMQ 채널 가져오기
channel = get_rabbitmq_channel()
# 메시지를 큐에서 소비하는 콜백 함수 정의
def callback(ch, method, properties, body):
print(f"Received: {body.decode()}")
ch.basic_ack(delivery_tag=method.delivery_tag)
# 메시지 수신 대기
channel.basic_consume(queue=QUEUE_NAME, on_message_callback=callback)
# 메시지를 계속 받기 위한 무한 대기
print("Waiting for messages. To exit press CTRL+C")
channel.start_consuming()
4-3-1.RabbitMQ 연결 설정
RabbitMQ 호스트와 메시지를 저장할 큐의 이름을 설정 해준다.
이때 나는 Docker로 실행했기때문에 RabbitMQ 호스트이름은 local이다.
RABBITMQ_HOST = 'localhost'
QUEUE_NAME = 'task_queue'
4-3-2.RabbitMQ 채널 생성 함수
RabbitMQ 서버와 연결을 설정하고, 채널을 생성하는 함수를 만들었다.
def get_rabbitmq_channel():
connection = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST))
channel = connection.channel()
# 큐가 없으면 생성 (내구성 설정)
channel.queue_declare(queue=QUEUE_NAME, durable=True)
return channel
4-3-3.메시지 전송 엔드포인트
POST 요청으로 요청 본문에 포함된 message를 RabbitMQ 큐에 전송하는 엔드포인트다.
async def send_message(message: str):
# RabbitMQ 채널 가져오기
channel = get_rabbitmq_channel()
# 메시지를 큐에 발행
channel.basic_publish(
exchange='',
routing_key=QUEUE_NAME,
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # 메시지 내구성 설정 (큐에 남아 있음)
)
)
return {"message": "Message sent to RabbitMQ", "data": message}
4-3-4.메시지 수신 엔드포인트
GET 요청을 처리하며, 큐에서 메시지를 소비하는 기능을 수행한다.
# 메시지 수신 엔드포인트
@app.get("/receive_message/")
async def receive_message():
# RabbitMQ 채널 가져오기
channel = get_rabbitmq_channel()
# 메시지를 큐에서 소비하는 콜백 함수 정의
def callback(ch, method, properties, body):
print(f"Received: {body.decode()}")
ch.basic_ack(delivery_tag=method.delivery_tag)
# 메시지 수신 대기
channel.basic_consume(queue=QUEUE_NAME, on_message_callback=callback)
# 메시지를 계속 받기 위한 무한 대기
print("Waiting for messages. To exit press CTRL+C")
channel.start_consuming()
4-4.메시지 전송
Postman 을 사용해서 메시지 전송 엔드포인트로 메시지를 전송하였다.
RabbitMq UI에서 2개의 준비된 메시지를 확인할수있다.
4-5. 메시지 받기
이제 receive_message 엔드 포인트에 접속해보자
들어가자마자 큐에 쌓여있던 모든 메시지들을 받은 것을 확인할수있다.
반응형