WebSocketClient 메시지 누락을 막기 위한 ExecutorService, BlockingQueue 도입
해당 포스팅은 최근 주식 관련 프로그램을 개발하는 과정에서 실시간 주가 정보를 받아오기 위해 'WebSocketClient'를 사용하며 발생했던 메시지 누락과 메시지 누락 문제를 해결하기 위해 도입한 'ExecutorService', 'BlockingQueue'에 대해 정리한 내용입니다.
1. WebSocketClient 데이터 누락 발생
증권사 API 구조 상 하나의 소켓에서 여러 종목의 실시간 주가 정보 데이터 및 거래 데이터를 모두 처리해야 했는데, 순간적으로 소켓 서버로부터 들어오는 메시지가 많아지는 경우 'onMessage()' 메서드 내에서의 처리 지연으로 인해 메시지가 누락되는 상황이 발생했는데요.
시스템 로직상 누락되는 메시지가 거래 요청 및 체결과 관련된 경우 이후 로직이 문제가 되기 때문에 메시지 누락 방지가 필수적이었습니다.
2. BlockingQueue 역할
public class TradingSystemSocketClient extends WebSocketClient {
private final BlockingQueue<String> messageQueue = new LinkedBlockingQueue<>(5000);
...
@Override
public void onMessage(String message) {
if (!messageQueue.offer(message)) {
log.info("블로킹 큐가 가득 차 수신된 데이터 손실");
}
}
...
}
(LinkedBlockingQueue 코드 예시)
BlockingQueue는 소켓 서버로부터 들어오는 메시지를 임시로 저장하는 버퍼 역할을 하게 됩니다.
onMessage()에서의 처리 지연으로 인한 메시지 누락을 막기 위해 일단은 메시지를 큐에 저장해 두고 나중에 처리하는 방식을 적용하였습니다.
LinkedBlockingQueue의 경우 스레드 간 안전(Thread-safe)하며, FIFO 구조로 되어 있는데요.
offer() 메서드의 경우, 큐에 메시지를 추가하는 역할을 하며 추가에 성공하면 true, 실패하면 false를 반환하는데 논블로킹 방식으로 동작한다는 특징이 있습니다.
예시에서는 5000으로 설정했지만, 큐의 크기의 경우 메시지 수신 속도, 메시지 처리 속도, 데이터 유실 허용 여부 등이 고려되어야 하며, 큐의 크기가 너무 크면 메모리의 사용량이 증가되기 때문에 리소스적인 문제가 발생될 수 있습니다.
3. ExecutorService 역할
public class TradingSystemSocketClient extends WebSocketClient {
private final ExecutorService executor = Executors.newFixedThreadPool(2);
...
private void startMessageConsumers() {
for (int i = 0; i < 2; i++) {
executor.submit(() -> {
while (true) {
try {
String message = messageQueue.take();
Map<String, Object> resultMap = objectMapper.readValue(message, Map.class);
// 메시지 처리
} catch (Exception e) {
log.error("Exception: " + e);
}
}
});
}
}
...
}
(ExecutorService 코드 예시)
ExecutorService는 BlockingQueue에 쌓인 메시지를 병렬로 처리하는 작업자 스레드 풀입니다.
'newFixedThreadPool(2)'를 사용하여 고정된 2개의 스레드를 생성하며, 각 스레드는 큐에서 메시지를 take() 하여 처리합니다.
고정된 스레드 개수로 인해 메시지 처리를 위한 과도한 스레드 생성을 방지할 수 있다는 장점이 있습니다.
onMessage()에서 소켓 서버의 메시지를 바로 처리하지 않고 ExecutorService를 통한 별도의 스레드를 통해 메시지를 처리함으로써 메시지 수신 작업은 빠르게 처리하고 세부적인 로직은 뒤에서 수행할 수 있게 됩니다.
4. 최종 구조 및 흐름 정리
public class TradingSystemSocketClient extends WebSocketClient {
private final BlockingQueue<String> messageQueue = new LinkedBlockingQueue<>(5000);
private final ExecutorService executor = Executors.newFixedThreadPool(2);
public TradingSystemSocketClient() {
...
// 생성자를 통해 메시지 큐의 메시지 읽기 작업 시작
startMessageConsumers();
}
@Override
public void onMessage(String message) {
if (!messageQueue.offer(message)) {
log.info("블로킹 큐가 가득 차 수신된 데이터 손실");
}
}
private void startMessageConsumers() {
for (int i = 0; i < 2; i++) {
executor.submit(() -> {
while (true) {
try {
String message = messageQueue.take();
Map<String, Object> resultMap = objectMapper.readValue(message, Map.class);
// 메시지 처리 로직
} catch (Exception e) {
log.error("Exception: " + e);
}
}
});
}
}
}
(최종 구조 소스코드 예시)
소켓 서버로부터의 메시지 누락을 방지하기 위해 ExecutorService, BlockingQueue를 도입한 최종 구조 및 흐름은 다음과 같습니다.
1. WebSocket 서버로부터 메시지 수신
2. onMessage(String message) 호출
3. messageQueue.offer(message) 코드를 통한 메시지 저장
4. ExecutorService 스레드 풀의 두 개의 executor가 병렬로 메시지 큐의 메시지를 꺼내서 처리
'Programming > Java' 카테고리의 다른 글
(java) Wilder RSI(Relative Strength Index) 계산하는 방법 (2) | 2025.07.13 |
---|---|
Java 디컴파일 도구 JD-GUI 설치 및 사용 방법(.class .jar 디컴파일) (0) | 2025.03.18 |
(java) ZipArchiveOutputStream, ZipArchiveEntry 클래스를 통한 파일 압축 방법 (3) | 2024.11.13 |
윈도우 서버에서 jar 파일 백그라운드 실행하기 (0) | 2024.03.09 |
Java 운영체제(윈도우, 리눅스) 프로세스 상태 확인하는 방법 (0) | 2024.02.11 |