kiwi

DB 동시성 문제 해결법

by 키위먹고싶다

병행 제어

멀티 스레드 환경인 경우 동시 요청 작업이 병행적으로 이루어지는데 이때 데이터베이스의 일관성을 해지지 않기 위해 트랜잭션을 제어하는 것을 병행 제어, 동시성 제어라고 한다. 병행 제어가 안될 경우 갱신 손실의 문제점이나 하나의 트랜잭션이 여러 데이터 갱신 연산을 수행할 때 일관성 없는 데이터를 가져옴으로써 모순된 결과가 발생하는 문제가 발생한다.

결국 일관성을 해치지 않고 연산하기 위해 트랜잭션을 순차적으로 실행하는 것과 같은 결과를 얻어야 하는데 가장 대표적인 방법으로 Locking 방법이 있다.

 

문제 배경

@Transactional
public Member updateMemberInner(String memberId, ProfileUpdateRequest profileUpdateRequest) {

    Member member = findMember(memberId);

    duplicateIdChecking(profileUpdateRequest.getMemberId());

    member.updateProfile(profileUpdateRequest);

    return member;
}
private void duplicateIdChecking(String memberId) {

    if (memberRepository.existsByMemberId(memberId)) {
        throw new DuplicationException(ErrorCode.ID_DUPLICATE_PREVENTION, memberId);
    }
}

1. 요청 A가 변경할 아이디를 사용하고 있는 다른 사용자가 있는지 확인하기 위해 duplicateIdCheking() 메서드 호출

2. 요청 A가 끝나기 전에 요청 B가 들어와서 같은 아이디로 duplicateIdCheking() 메서드 호출

3. 요청 A가 성공적으로 프로필을 업데이트 완료

4. 요청 B가 성공적으로 프로필을 업데이트 완료

 

[기대 결과] : 요청 A의 업데이트 성공, 요청 B의 업데이트 실패

[실제] : 요청 A의 업데이트 성공, 요청 B의 업데이트 성공

 

원인

의도한 바로는 순차적으로 결과가 실행되며 A트랜잭션이 끝난 후 B작업에서는 예외가 발생해야 하는데  실제 결과는 각 요청이 비동기 처리되며 읽어간 값을 기준으로 업데이트했다.

 

해결방법 

1. 격리 수준을 serializer로 올리기

강도 높은 락으로 완전한 일관성을 유지할 수 있지만 다른 요청들도 모두 취소된다. 취소된 작업들은 모두 재시도 해야한다. 잠금 단위가 너무 커진다.

 

2. select for update 사용

특정 row에 lock을 잡는 행위인데 여러 트랜잭션에서 SELECT 한 후 INSERT, UPDATE, DELETE 하는 로직이 있을 때 SELECT 한 리소스에 대해 lock을 잡고 commit, rollback하기 전에 다른 세션에서 select 하지 못하게 하는 방법이다. 실제로 count가 중요한 서비스(영화예매 할때, 선착순 이벤트) 에서 이 방법을 많이 사용하는데 데이터 수준에서 엔티티 잠금을 하므로 좋은 방법인것같다. 

@Lock(LockModeType.PESSIMISTIC_WRITE)

jpa에서 사용하는 @Lock 은 트랜잭션 내부에서만 사용가능하고 데이터베이스에 따라서 timeout이 걸리지 않을 수도 있다. DB row에 직접 Lock을 건다. 

 

3. 분산락 사용

분산락은 데이터베이스 등 공통된 저장소를 이용하여 자원이 사용중인지 체크하고 모든 서버에서 동기화된 처리를 구현하는 것이다.   

MySQL의 Named Lock

네임드 락(Named Lock)은 GET_LOCK() 함수를 이용하여 임의의 문자열에 대한 잠금을 설정할 수 있는 LOCK기능이다.

 

이 잠금의 특징은 테이블이나 레코드와 같은 실제 DB의 객체가 아닌, 단순히 사용자가 지정한 문자열(String) 에 대한 잠금을 획득하고 반납하는 기능이다. 또한 timeout을 설정할 수 있어서 timeout시간 내에서 동시요청에 의한 작업들이 누락되지 않고 정삭적으로 처리 될 수 있다. 

 

나같은 경우는 트랜잭션 시작전에 락을 잡고 로직 처리 후 다시 락을 반환하여 동시성제어가 필요한 부분에 메서드 + 변경 아이디 이름으로 락을 걸어주었다. 

 

코드

package individual.freshplace.util.lock;

import com.zaxxer.hikari.HikariDataSource;
import individual.freshplace.util.ErrorCode;
import individual.freshplace.util.exception.StringLockException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.function.Supplier;

@Slf4j
@Component
@RequiredArgsConstructor
public class UserLevelLock {

    private static final String GET_LOCK = "SELECT GET_LOCK(?, ?)";
    private static final String RELEASE_LOCK = "SELECT RELEASE_LOCK(?)";
    private static final String IS_USED_LOCK = "SELECT IS_USED_LOCK(?)";
    private static final long ACQUIRER_IN_SECONDS = 1L;
    private static final int SUCCESS_QUERY_VALUE = 1;
    private static final String SQLEXCEPTION_MESSAGE = "쿼리 실행 오류";

    private final HikariDataSource hikariDataSource;

    public <T> T LockProcess(String lockName, Supplier<T> supplier) {

        try (Connection connection = hikariDataSource.getConnection()) {
            try {
                getLock(connection, lockName);
                return supplier.get();
            } finally {
                releaseLock(connection, lockName);
            }
        } catch (SQLException e) {
            log.error(SQLEXCEPTION_MESSAGE);
            throw new RuntimeException(SQLEXCEPTION_MESSAGE);
        }
    }

    public void LockProcess(String lockName, Runnable runnable) {
        LockProcess(lockName, () -> {
            runnable.run();
            return null;
        });
    }

    private void getLock(Connection connection, String lockName) throws SQLException {

        try (PreparedStatement preparedStatement = connection.prepareStatement(GET_LOCK)) {

            preparedStatement.setString(1, lockName);
            preparedStatement.setLong(2, ACQUIRER_IN_SECONDS);

            resultQuery(lockName, preparedStatement, "getLock()");
        }
    }

    private void releaseLock(Connection connection, String lockName) throws SQLException {

        try (PreparedStatement preparedStatement = connection.prepareStatement(RELEASE_LOCK)){

            preparedStatement.setString(1, lockName);

            resultQuery(lockName, preparedStatement, "releaseLock()");
        }
    }

    private void resultQuery(String lockName, PreparedStatement preparedStatement, String methodType) throws SQLException {

        try (ResultSet resultSet = preparedStatement.executeQuery()){

            if (!resultSet.next()) {
                log.error(methodType + SQLEXCEPTION_MESSAGE);
                throw new RuntimeException(SQLEXCEPTION_MESSAGE);
            }

            int resultSetInt = resultSet.getInt(1);

            if (resultSetInt != SUCCESS_QUERY_VALUE) {
                log.error("lockName={} 에 대한 methodType={} 실패", lockName, methodType);
                throw new StringLockException(ErrorCode.UN_AVAILABLE_ID, lockName);
            }

            if (resultSetInt == SUCCESS_QUERY_VALUE) {
                log.info("lockName={} 에 대한 methodType={} 성공", lockName, methodType);
            }

        }

    }

}
@Test
@DisplayName("일반 회원가입")
void non_lock_join() throws InterruptedException {

    final int threadCount = 5;
    AtomicInteger throwCount = new AtomicInteger(0);

    ExecutorService service = Executors.newFixedThreadPool(threadCount);
    CountDownLatch countDownLatch = new CountDownLatch(threadCount);

    for (int i = 0; i < threadCount; i++) {

        service.execute(() -> {

            try {
                memberService.signupInner(SIGNUP_REQUEST);
            } catch (DuplicationException e) {
                throwCount.getAndIncrement();
            }

            countDownLatch.countDown();

        });
    }

    countDownLatch.await();
    Assertions.assertEquals(throwCount.get(), 0);
}

@Test
@DisplayName("분산 락 적용한 회원가입")
void distribution_lock_join() throws InterruptedException {

    final int threadCount = 5;
    AtomicInteger throwCount = new AtomicInteger(0);

    ExecutorService service = Executors.newFixedThreadPool(threadCount);
    CountDownLatch countDownLatch = new CountDownLatch(threadCount);

    for (int i = 0; i < threadCount; i++) {

        service.execute(() -> {

            try {
                memberService.signup(SIGNUP_REQUEST);
            } catch (DuplicationException e) {
                throwCount.getAndIncrement();
            }

            countDownLatch.countDown();

        });
    }

    countDownLatch.await();
    Assertions.assertEquals(throwCount.get(), threadCount-1);
}

 

'project' 카테고리의 다른 글

자바의 비동기 병럴 처리 연산을 통해 이미지 업로드하기  (0) 2023.06.25
heap dump 살펴보기  (1) 2023.01.29
GC 모니터링 하기  (1) 2023.01.29
이미지 최적화에 대하여  (0) 2022.09.23

블로그의 정보

kiwi

키위먹고싶다

활동하기