2024. 9. 12. 11:57ㆍServer/Spring
개요
2024년도 7월부터 8월간, SOPT 34기를 진행하며, 프로젝트를 진행했습니다.
한줄로 요약하자면, '답변을 제공받는 감사일기' 입니다. 답변은 가상의 캐릭터 '로디' 가 제공합니다.
AI를 활용하여 일기를 작성하면, 일기에 대한 긍정적인 답변을 받을 수 있는 서비스 입니다.
팀의 여건 상, AI 요청을 담당하는 서버와 인증 및 비즈니스 로직을 담당하는 백엔드 서버를 나누어 개발을 진행하게 되었습니다.
따라서 사용자가 저희 백엔드 서버에 일기를 작성하면, AI 서버로 요청을 보내는 방식의 간단한 로직이 핵심 비즈니스 로직 이었습니다. 하지만, 요구사항이 점점 추가되었습니다.
최종적으로 결정된 일기 작성에 관련한 요구사항은 다음과 같습니다.
'일기에 대한 답장은 하루에 한번 받을 수 있다.',
'첫 일기 작성시, 답장이 1분만에 제공되고, 그 이후부터는 12시간 뒤에 답장이 제공된다.'
'일기는 작성한 이후 2시간동안은 삭제/수정이 가능하다.'
첫 번째 해결 방법 - Redis Key Event Notification
분리된 서버 환경에서 위와 같은 요구사항을 만족하기 위해 물심양면으로 고민했습니다.
MessageQueue를 사용해, 지연처리 큐를 구현해야 하나 ?, 주기적으로 Polling을 시도해야 하나 ?, SSE 같은 방식을 사용해야 하나 ?
등, 처음 다루는 방식을 해결하기 위해 다양한 솔루션을 지속적으로 생각했습니다.
고민 끝에 생각한 첫 번째 방법은, Redis 의 Key Event Notificaiton 입니다.
Redis Key Event Notification 이란 ?
Redis는 InMemory에서 작동하는 Key:Value 를 지원하는 NoSQL로써,
간단한 Pub/Sub을 지원합니다.
Key Event Notification은 값을 삽입할 때, Expire를 설정하여 등록합니다.
Expire가 만료될 경우, Listener가 해당 만료 이벤트를 감지하여 'Key' 값을 받아들입니다.
어떻게 해당 방식을 이용해 요구사항을 구현했는지 말씀드리겠습니다.
다음을 참고했습니다.
https://redis.io/docs/latest/develop/use/keyspace-notifications/
https://px201226.github.io/redis-event-notifications/
Redis는 지원하는 Notification 유형이 있습니다.
Key Event Notification에서 사용하는 옵션은 주로 K 와 E 옵션입니다.
K ( Keyspace Event)
KeySpace Event는 특정 키에 발생하는 이벤트를 추적합니다.
따라서, 해당 옵션을 활성화 하면, 어떤 키에서 변경이 발생했는지에 대한 알림을 보내게 됩니다.
즉, myKey에서 Del 명령이 실행되면, Redis는 keyspace@0:mykey del 와 같은 형식으로 알림을 발생시킵니다.
E (Key event notifications):
Key event 알림은 특정 명령이 실행된 이벤트를 추적합니다.
즉, Del 명령이 실행되면, Redis 는 Del 이벤트 자체에 대해 알림을 발생시킵니다.
해당 기능들을 사용하기 위해서는 몇가지 설정이 필요합니다. 기본적으로 메모리 자원을 소모하는 기능이기 때문에, Default 값은 Off 입니다. Config에 접근하여 수정이 필요합니다.
# redis-cli config set notify-keyspace-events "AKE" (모든 K, E 알림 수신)
얼마나 부하가 생기나요 ?
- Key Event Notification On
- Key Event Notification Off
다음과 같이 redis-benchmark -q
로 성능을 측정 해보았습니다.
확실히 성능의 차이가 존재하긴 하지만, 유의미한 차이는 아니라고 판단하여, 도입을 진행했습니다.
스프링 부트 설정
build.gradle.kts
implementation("org.springframework.boot:spring-boot-starter-data-redis")
RedisConfig.java
@Configuration
@EnableRedisRepositories
@RequiredArgsConstructor
public class RedisConfig {
private static final String EXPIRED_EVENT_PATTERN = "__keyevent@0__:expired";
private final ObjectMapper objectMapper;
@Value("${spring.data.redis.host}")
private String host;
@Value("${spring.data.redis.port}")
private int port;
@Bean
public RedisConnectionFactory redisConnectionFactory() {
return new LettuceConnectionFactory(host, port);
}
@Bean(name = "redisMessageTaskExecutor")
public Executor redisMessageTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(2);
threadPoolTaskExecutor.setMaxPoolSize(4);
return threadPoolTaskExecutor;
}
@Bean
@Primary
public RedisTemplate<String, String> redisStringTemplate(
RedisConnectionFactory connectionFactory) {
RedisTemplate<String, String> redisTemplate = new RedisTemplate<>();
redisTemplate.setDefaultSerializer(new StringRedisSerializer());
redisTemplate.setConnectionFactory(connectionFactory);
return redisTemplate;
}
@Bean
public RedisTemplate<String, Object> redisObjectTemplate(
RedisConnectionFactory connectionFactory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(connectionFactory);
template.setKeySerializer(new StringRedisSerializer());
template.setHashKeySerializer(new StringRedisSerializer());
// 객체를 직렬화하는 방법 설정
template.setValueSerializer(new StringRedisSerializer());
template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
template.setDefaultSerializer(new StringRedisSerializer());
return template;
}
RedisConfig.java 를 통해, 다음과 같이 Configuration 클래스를 설정하였습니다.
각 요소들을 하나씩 점검하겠습니다.
- redisConnectionFactory:
Spring Data Redis는 내부적으로 Redis Library들인 Jedis, Lettuce 중, Lettuce 구현체를 사용합니다. 또한 Test 와 Dev 상의 Redis 주소가 다르기 때문에, 프로필에 따라 동적인 값을 주입하여 Connection을 생성하기 위하여 수동으로 Connection Factory를 설정하였습니다. - redisStringTemplate, redisObjectTemplate :
Spring에서 Redis를 다루는 방법은 Redis Repository 방식과, Redis Template 두가지 방식이 존재합니다.
Repository를 사용하는 방식은 기존 Spring Data JPA와 유사한 방식으로 통합될 수 있지만, Pub/Sub 기능의 활용에 제한이 발생합니다.
따라서 Template를 사용하여 객체를 직렬화하고, Redis에 직접 삽입하는 방식을 선택하였습니다.
<K,V> 제네릭을 통해, Key와 Value에서 다룰 타입을 명시하고, 해당 타입에 대한 직렬화 도구를 설정할 수 있습니다.
Template를 설정하고, 각 요소와 연산에 대해 Serializer를 정의합니다. Key 값의 Serializer를 GenericJackson2JsonRedisSerializer() 로 정의할 경우, 객체로 파싱이 가능하지만
저장 시 클래스 패키지 타입 전체를 저장하기 때문에, 해당 패키지의 DTO가 아닐경우, 해당 Key값을 조회할 수 없습니다. - 따라서 StringRedisSerializer() 를 Key값의 Serializer 로 설정하였습니다.
- redisConnectionFactory
Redis에서 키가 만료될 경우, 이벤트를 발생시키게 됩니다. 이벤트를 처리하기 위해서는 리스너가 필요합니다. 각 이벤트는 새로운 스레드를 통해 처리되기 때문에 발생하는 이벤트가 많아질 경우 스레드 풀을 생성하지 않으면 매 이벤트마다 스레드를 생성하게 되고, 성능상 문제를 유발할 수 있습니다. - 따라서 스레드풀을 생성하여 대기 스레드를 생성하고, 이벤트가 발생할 때마다 대기 중인 스레드가 이벤트를처리하도록 구성하였습니다. CorePoolSize(2)와 MaxPoolSize(4)로 설정하여 기본적으로 2개의 스레드가 항상 대기하고, 필요시 4개까지 확장하여 이벤트를 처리할 수 있도록 구현하였습니다.
DiaryPublisher
public interface DiaryPublisher {
void publishDiaryMessage(DiaryMessage message);
void removeDiary(Long diaryId);
boolean containsKey(Long diaryId);
}
DIP 를 적용하기 위해, DiaryPublisher라는 이름의 Interface를 정의하였습니다.
순서대로 일기를 발행하고, 요청을 제거하고, 일기가 요청중인지 확인하는 메서드가 될 것입니다.
RedisDiaryPublisher
@Component
@RequiredArgsConstructor
public class RedisDiaryPublisher implements DiaryPublisher {
private static final Duration TTL = Duration.ofHours(2);
private static final Duration TEST_TTL = Duration.ofSeconds(10);
private static final Logger log = LoggerFactory.getLogger(RedisDiaryPublisher.class);
private final RedisTemplate<String, Object> redisTemplate;
private final String eventPrefix = "__keyevent@0__:expired ";
@Override
public void publishDiaryMessage(DiaryMessage message) {
String key = "diaryMessage:"+message.userId();
String expiryKey = "diaryMessage_expiry:"+message.userId();
String serializedMessage = serializaeMessageToString(message);
redisTemplate.opsForValue().set(key,serializedMessage);
redisTemplate.opsForValue().set(expiryKey,"", 25, TimeUnit.SECONDS);
}
@Override
public boolean containsKey(Long userID){
String key = "diaryMessage:"+userID;
return Boolean.TRUE.equals(redisTemplate.hasKey(key));
}
@Override
public void removeDiary(Long userId){
String key = "diaryMessage:"+userId;
String expiryKey = "diaryMessage_expiry:"+userId;
redisTemplate.delete(key);
redisTemplate.delete(expiryKey);
}
}
다음은 DiaryPublisher
를 구현한 RedisDiaryPublisher
입니다.
TTL 설정
사용자의 일기 작성 상태에 따라 TTL(Time to Live)을 다르게 설정했습니다. 새로 작성한 일기에는 2시간의 TTL을 부여하고, 테스트 목적으로는 10초의 TTL을 설정했습니다.
직렬화
DiaryMessage
DTO를 사용하여 일기 데이터를 직렬화하여 Redis에 저장합니다. 이때, redisTemplate
을 주입받아 데이터를 Redis에 저장하고 관리합니다. serializaeMessageToString
메서드를 통해 데이터를 문자열로 직렬화한 후 Redis에 저장하는데, 이 방식은 Jackson과 같은 직렬화 라이브러리로 구현될 수 있습니다.
Key Event Notification
Redis의 Key Event Notification은 key가 만료되었을 때 이벤트를 발생시키지만, key의 value는 알 수 없습니다. 예를 들어, __keyevent@0__:expired 3
이 발행되면, value 없이 key 값인 3
만 얻을 수 있습니다. 이 한계를 해결하기 위해, 메세지를 발행할 때 동일한 userId
에 value를 담아 별도의 key로 저장합니다. 따라서 value 조회가 필요할 때는 추가로 저장한 expiryKey
에서 값을 가져오도록 설계했습니다.
다음은 Message를 처리하는 AI 서버의 Redis Config 입니다.
// 나머지는 백엔드의 Config과 동일
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(
RedisConnectionFactory redisConnectionFactory,
MessageListenerAdapter listenerAdapter, Executor commonTaskExecutor) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
container.addMessageListener(listenerAdapter, new PatternTopic(EXPIRED_EVENT_PATTERN));
container.setTaskExecutor(commonTaskExecutor);
return container;
}
@Bean
public MessageListenerAdapter listenerAdapter(RedisMessageListener listener) {
return new MessageListenerAdapter(listener);
}
추가되는 부분만 작성하였습니다. redisMessageListenerContainer
와 redisMessageListenerContainer
가 추가적으로 작성된 모습을 확인할 수 있습니다.
우선 MessageListenerAdapter
를 반환하는 listenerAdapter 코드입니다.
@Bean
public MessageListenerAdapter listenerAdapter(RedisMessageListener listener) {
return new MessageListenerAdapter(listener);
}
다음과 같이 RedisMessageListener
를 주입받고 있는 모습입니다. RedisMessageListener
는 메세지 수신을 위해 제가 구현한 서비스 코드입니다.
아래에도 나오겠지만, MessageListener를 구현하고 있기 때문에, 해당 객체를 Adapter를 통해 감싸는 형태로 반환됩니다.
그 다음은 RedisMessageListenerContainer
를 반환하는 redisMessageListenerContainer
입니다.
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(
RedisConnectionFactory redisConnectionFactory,
MessageListenerAdapter listenerAdapter, Executor commonTaskExecutor) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer(); //1
container.setConnectionFactory(redisConnectionFactory); //2
container.addMessageListener(listenerAdapter, new PatternTopic(EXPIRED_EVENT_PATTERN)); //3
container.setTaskExecutor(commonTaskExecutor); //4
return container;
}
- 새로운 메세지 리스너 컨테이너를 생성합니다.
- 컨테이너에 기존 Config에서 설정한
ConnectionFactory
를 설정합니다. messageListener
에 위에서 생성한 서비스 어댑터(MessageListenerAdapter)와, 패턴을 설정합니다.
저희는 이전에 설정했던 패턴인 "keyevent@0:expired" 와 같은 패턴을 추가하겠습니다.
이렇게RedisMessageListenerContainer
를 생성하게 되면 , Redis의 특정 채널과 연결을 유지하고 블로킹된 상태로 유지됩니다.
Redis 서버에서 구독 중인 클라이언트에게 메세지를 전달하면, RedisMessageListenerContainer
는 해당 메세지를 수신합니다.RedisMessageListenerContainer
는 전달받은 메세지를 미리 등록된 MessageListenerAdapter
에 전달합니다.
해당 어댑터는 실제 메세지를 처리하는 MessageListener
구현체에게 전달하고, onMessagee()
를 호출하여 메세지를 처리하는 방식입니다.
AI 요청을 처리하는 MessageListener
를 확인하겠습니다.
@Service
@RequiredArgsConstructor
@Slf4j
public class RedisMessageListener implements MessageListener {
private final ObjectMapper objectMapper;
private final ChatService chatService;
private final RedisTemplate<String, String> redisTemplate;
private final RedisLockService redisLockService;
private final String EXPIRED_MESSAGE_KEY_PREFIX = "diaryMessage_expiry:";
private final String MESSAGE_KEY_PREFIX = "diaryMessage:";
@Override
public void onMessage(Message message, byte[] pattern) {
String expiredKey = new String(message.getBody());
log.info("########## listen succeed ##########");
long diaryId = Long.parseLong(expiredKey.split(":")[1]);
log.info("expiredKey : {}", diaryId);
String lockKey = "lock:" + expiredKey;
if (redisLockService.tryLock(lockKey, 10, TimeUnit.SECONDS)) {
try {
String nonExpiredKey = MESSAGE_KEY_PREFIX + diaryId;
String value = redisTemplate.opsForValue().get(nonExpiredKey);
validateMessagePrefix(expiredKey);
DiaryListenedMessage listenedMessage = parseStringToDiaryListenedMessage(value);
log.info("listenedMessage : userId = {}, message = {}, date = {}", listenedMessage.userId(),
listenedMessage.message(), listenedMessage.date().toString());
CompletableFuture<String> future = chatService.getChatResponse(listenedMessage.message(), listenedMessage.userId(),
listenedMessage.date().toString()).toFuture();
future.join();
} finally {
redisLockService.unlock(lockKey);
}
} else {
log.warn("키 락을 해제할 수 없습니다. : {}", expiredKey);
}
}
}
다음과 같이, MessageListener를 구현하고 있습니다.
MessageListener는 Spring Redis 에서 제공하는 Functional Interface입니다.
@FunctionalInterface
public interface MessageListener {
void onMessage(Message message, @Nullable byte[] pattern);
}
onMessage를 구현하여, 발생하는 수신한 이벤트를 실행하게 됩니다. 하지만 이 상황에서 발생할 수 있는 문제 상황이 존재합니다.
이전 코드에서, Redis Listener ThreadPool을 2개 이상으로 설정하였습니다.
때문에 메세지 수신이 두번 이루어져, 요청이 두번 전송되는 에러가 발생하였는데요.
바로 다음과 같은 상황입니다. 동일한 자원을 두 스레드가 접근하여, 요청을 두번 보내는 상황을 마주하게 되었습니다.
처음에는 요청이 두번 전송되기 때문에, 수신 처리를 잘못 했는지 의심이 들었지만, Pool size를 1로 줄여보니 해당 문제가 발생하지 않았고, 수신 동기화 문제임을 알 수 있었습니다.
락킹 서비스
일반적인 3-tier-architecture 에서 이러한 동기화 문제를 해결하기 위한 방식은 크게 세가지가 있습니다.
- 클라이언트 단의 동시성 제어
- 백엔드 어플리케이션 단의 동시성 제어
- 데이터베이스 단의 동시성 제어
현재 문제가 발생하는 지점은 Redis와 Server 간에서 발생하는 동시성 문제입니다. 여러 스레드가 DB 자원에 접근하여 발생하는 문제를 해결하기 위해, 간단한 락킹 서비스를 구현하여 동시성 문제를 방지하려고 합니다.
Redis에서의 Lock
Redis는 여러 노드에서 작동할 수 있으며, Master-Slave replication을 사용하여 서버를 구성할 수도 있습니다. 이때는 RedLock 알고리즘을 사용해 분산 환경에서 락을 관리할 수 있습니다.
하지만 현재 시나리오는 Key Event Notification을 사용 중입니다. 또한 공식 문서에는 다음과 같이 나와있습니다.
Events in a cluster Every node of a Redis cluster generates events about its own subset of the keyspace as described above. However, unlike regular Pub/Sub communication in a cluster, events' notifications are not broadcasted to all nodes. Put differently, keyspace events are node-specific. This means that to receive all keyspace events of a cluster, clients need to subscribe to each of the nodes.
번역하자면, Keyspace Events는 노드별로 동작합니다. Redis 클러스터 환경에서 각 노드는 자신의 키 공간에 대해서만 이벤트를 생성하고 관리합니다.
Keyspace 클러스터의 이벤트는 다른 노드들로 전파되지 않습니다. 따라서, 전체 클러스터에서 모든 키 이벤트를 감지하려면, 각 노드에 대해 별도로 구독을 해야 합니다.
즉, Key-Event-Notification
는 노드 하나에서만 이벤트를 처리하기 때문에, 분산된 이벤트 처리가 어렵습니다. 그렇기 때문에, Redis에서 제공하는 분산 락을 사용할 필요는 없다고 판단했습니다.
@Service
@RequiredArgsConstructor
public class RedisLockService {
private final StringRedisTemplate redisTemplate;
public boolean tryLock(String key, long timeout, TimeUnit unit) {
Boolean success = redisTemplate.opsForValue().setIfAbsent(key, "locked", timeout, unit);
return success != null && success;
}
public void unlock(String key) {
redisTemplate.delete(key);
}
}
다음과 같이 간단한 LockService
를 구현하였습니다.tryLock
메서드는 key
, timeout
, TimeUnit
을 입력받아 Redis의 setIfAbsent
를 사용해 락을 설정합니다. 만약 해당 키가 이미 존재하지 않으면, 성공적으로 락이 설정되고 true
가 반환됩니다.
또한, unlock()은 매개변수로 입력된 키를 삭제합니다.
Key Event Notification과 Redis 이벤트 처리
Redis의 Key Event Notification은 하나의 스레드에서 이벤트 처리를 관리하도록 구현할 수 있습니다. tryLock
에서 설정한 타임아웃을 통해 여러 스레드가 동시에 접근하지 않도록 하며, 락을 획득한 스레드가 비즈니스 로직을 실행하게 됩니다.
하지만 비동기 방식으로 API 요청을 처리하는 경우, 외부 API의 응답 지연으로 인해 성능 저하가 발생할 수 있습니다. 실제로 동시 OpenAI API call 시, 6초 간격을 두고 응답이 반환됨을 확인할 수 있었고,
이를 해결하기 위해 리팩토링을 계획하였습니다.
Redis의 Key Event Notification을 통해 일종의 이벤트 버스로 활용하면서, 다양한 상황을 마주했습니다.
어찌되었든 구현은 완료하였지만, 정말 효율적인 방식인가?
를 생각하면 물음표가 드는 것도 사실입니다.
일기의 지연처리는 프로젝트의 핵심 비즈니스 요구사항인데, Redis의 Pub/Sub은 수신을 보장하지 않습니다.
또한 분산 환경에서 구현되지 않기 때문에, 양 서버중 하나라도 불안정하거나 문제가 생길 경우, 상대방은 메세지를 전달받지 못하게 됩니다.
이러한 단점은 프로젝트 운영에 치명적이라고 생각이 되어, 앱잼 기간이 끝나는 대로 코드의 리팩토링을 계획하였습니다.
따라서 다음 글은, 코드의 리팩토링과 변경된 지연처리 구현을 주제로 찾아뵙겠습니다.
감사합니다.