티스토리 뷰

반응형
💡 최상용님의 재고시스템으로 알아보는 동시성이슈 해결방법 강의를 듣고 정리한 내용입니다.

 

목차

     
    글 목록

    배경

    지난 [시스템 디자인] 재고시스템으로 알아보는 동시성이슈 해결방법 (1/3) - 동시성 이슈와 Application Level로 해결하기 글에서 멀티 스레드 환경에서 개발할 때 발생할 수 있는 경합 상태(Race condition)를 비롯한 동시성 이슈를 알아보았다. 이를 해결하기 위해서는 공유 데이터에 접근할때 한 번에 한 스레드만 접근하고 작업할 수 있도록 제어를 해야한다는 사실을 알았고 자바의 synchronized 키워드를 먼저 적용하여 해결하였다. 하지만 자바의 synchronized 키워드는 한 프로세스 내에서만 스레드 안전성을 제공하며 스프링의 @Transactional 어노테이션과 함께 사용하기에 한계가 있었다.
     
    결국 동시성 이슈를 해결하기 위해 별도의 동기화 메커니즘의 필요성을 느꼈고, 이번 글에서는 데이터베이스에서 제공하는 Lock을 활용하여 문제를 해결해본다.
     

    데이터베이스 락(Database Lock)

    데이터베이스는 락(Lock)이라는 장치로 데이터에 접근 제약을 두어 데이터 정합성을 맞추는 다양한 방법을 제공하는데, 대표적으로 비관적 락(Pessmistic Lock), 낙관적 락(Optimistic LocK), 네임드 락(Named Lock)이 있다. 각각의 특성에 대해 간략히 살펴보면 다음과 같다.
     

    비관적 락(Pessimistic Lock)

    실제로 데이터베이스 테이블 혹은 데이터베이스 로우(Row)에 Lock을 걸어서 데이터 접근 제약을 두어 정합성을 맞추는 방법이다. 이러한 방식으로 Exclusive lock을 걸게되면 다른 트랜잭션에서는 Lock이 해제되기 전에 데이터를 가져갈 수 없다.
    가장 직관적으로(데이터에 접근 자체를 제한) 접근 제약을 둘 수 있지만, 두 개 이상의 스레드가 서로 락을 얻기 위해 무한정 대기하는 데드락(Deadlock, 교착상태)에 빠질 수 있으므로 주의해서 사용해야한다.
     

    낙관적 락(Optimistic Lock)

     
    실제로 데이터 자체에 락(Lock)을 두는 것이 아니라 데이터의 버전(Version)을 이용하여 정합성을 맞추는 방법이다. 데이터를 읽을 때 데이터의 버전을 함께 조회한다. 그리고 업데이트하기 전에 업데이트하려는 데이터의 버전이 처음 조회했을 당시 버전과 동일한지 확인한다.

    만일 버전이 같다면 업데이트에 성공하고, 버전이 다르다면 업데이트에 실패한다. 이 경우 다시 데이터를 조회해서 업데이트를 재수행해야한다.
     

    네임드 락(Named Lock)

     
    이름을 가진 Metadata lock기법이다. 이름을 가진 Lock을 한 세션이 획득한 후 해제하기 전까지 다른 세션은 이 Lock을 획득할 수 없다. Transaction이 종료될 때 Lock이 자동으로 해제되지 않기 때문에 별도의 명령어로 해제를 수행해야한다. 혹은 TTL(Time to live)를 두어 선점시간이 끝나면 해제하도록 개발자가 별도로 설정해야한다.
    Pessimistic Lock과 유사하나 Pessimistic Lock은 로우나 테이블에 대해서 락을 거는 반면 네임드 락은 metadata lock에 대해 락을 건다는 차이가 있다.
     

    Pessimistic Lock 활용해보기

    앞서 설명한 3가지 락 기법(비관적 락, 낙관적 락, 네임드 락)을 직접 재고 관리 시스템에 적용하여 Race condition을 해결해본다. 먼저 비관적 락(Pessimistic Lock)을 먼저 적용해본다.
    앞서 설명했듯 낙관적 락 은 실제 테이블 혹은 Row에 락을 걸어 동시성 문제를 해결하고 정합성을 맞춘다. 실제 데이터에 Exclusive Lock을 걸면 다른 트랜잭션에서는 해당 데이터에 대해 Lock을 획득하고 작업하기 전까지 대기해야한다.
     
    비관적 락(Pessimistic Lock)을 사용하는 경우 타임라인 별로 두개의 스레드가 공유 데이터인 Stock에 어떻게 접근하여 동시성 이슈를 해결하는지 그 과정을 살펴보면 다음과 같다.
     

    TimeThread-1StockThread-2
    10:00:00

    Thread1이 락을 획득하고
    데이터를 조회한다.
    Thread2은 락을 대기한다.
    select *
    from stock
    where id 1
    {id : 1, quantity : 5}Lock 대기
    10:00:01

    Thread1이 락을 해제한다.
    update set quantity = 4
    from stock
    where id = 1
    {id : 1, quantity : 4} 
    10:00:05

    Thread2가 락을 획득하면
    Lock을 걸고 데이터를 조회한다.
     {id : 1, quantity : 4}select *
    from stock
    where id 1
    10:00:06

    Thread2가 락을 해제한다.
     {id : 1, quantity : 3}update set quantity = 3
    from stock
    where id = 1

     

    StockRepository

    재고 로우에 대해 Lock을 걸고 데이터를 조회하기 위해 findByIdWithPessimisticLock() 메서드를 추가한다.
    이때 Spring Data JPA에서 제공하는 @Lock 어노테이션을 활용하여 LockModeType을 PESSIMISTIC_WRITE로 두어 간단하게 Pessimistic Lock을 구현할 수 있다.

    package com.example.stock.repository;
    
    import org.springframework.data.jpa.repository.JpaRepository;
    import org.springframework.data.jpa.repository.Lock;
    import org.springframework.data.jpa.repository.Query;
    
    import com.example.stock.domain.Stock;
    
    import jakarta.persistence.LockModeType;
    
    // StockRepository : 재고(Stock)에 대한 데이터베이스와의 CRUD 기능 제공
    public interface StockRepository extends JpaRepository<Stock, Long> {
    	// Spring Data Jpa에서는 @Lock 어노테이션을 활용하여 손쉽게 Pessimistic Lock을 구현할 수 있다.
    	@Lock(LockModeType.PESSIMISTIC_WRITE)
    	@Query("select s from Stock s where s.id = :id")
    	Stock findByIdWithPessimisticLock(Long id);
    }

     

    PessimisticLockStockService

    재고 감소 로직을 수행할때 Pessimistic Lock을 제공하는 로직을 구현한다. decrease() 메서드를 보면 이전과 달리 락을 걸고 데이터를 조회하는 것을 볼 수 있다.

    package com.example.stock.service;
    
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Transactional;
    
    import com.example.stock.domain.Stock;
    import com.example.stock.repository.StockRepository;
    
    @Service
    public class PessimisticLockStockService {
    	private final StockRepository stockRepository;
    
    	public PessimisticLockStockService(StockRepository stockRepository) {
    		this.stockRepository = stockRepository;
    	}
    
    	@Transactional
    	public void decrease(Long id, Long quantity) {
    		// 락을 걸고 재고 데이터를 가져온다.
    		Stock stock = stockRepository.findByIdWithPessimisticLock(id);
    
    		stock.decrease(quantity);
    		stockRepository.save(stock);
    	}
    }

     

    StockServiceTest

    비관적 락을 사용하는 PessimisticLockStockService 클래스의 decrease() 메서드를 테스트하는 코드를 작성한다.
    테스트 코드를 새로 작성할 필요 없이 기존의 테스트 코드에서 @Autowired로 StockService 대신 PessimisticLockStockService 클래스를 의존관계로 주입하기만 하면 된다.

    package com.example.stock.service;
    
    import static org.junit.jupiter.api.Assertions.*;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import org.junit.jupiter.api.AfterEach;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    import com.example.stock.domain.Stock;
    import com.example.stock.repository.StockRepository;
    
    @SpringBootTest
    class StockServiceTest {
    	@Autowired
    	//private StockService stockService;
    	private PessimisticLockStockService stockService;
    
    	@Autowired
    	private StockRepository stockRepository;
    
    	// 각 테스트가 실행되기 전에 데이터베이스에 테스트 데이터 생성
    	@BeforeEach
    	public void before() {
    		stockRepository.saveAndFlush(new Stock(1L, 100L));
    	}
    
    	// 각 테스트를 실행한 후에 데이터베이스에 테스트 데이터 삭제
    	@AfterEach
    	public void after() {
    		stockRepository.deleteAll();
    	}
    
    	@Test
    	public void 동시에_100개의_요청() throws InterruptedException {
    		// when
    		// 100개의 쓰레드 사용(멀티스레드)
    		int threadCount = 100;
    
    		// ExecutorService : 비동기로 실행하는 작업을 간단하게 실행할 수 있도록 자바에서 제공하는 API
    		ExecutorService executorService = Executors.newFixedThreadPool(32);
    
    		// CountDownLatch : 작업을 진행중인 다른 스레드가 작업을 완료할때까지 대기할 수 있도록 도와주는 클래스
    		CountDownLatch latch = new CountDownLatch(threadCount);
    		// 100개의 작업 요청
    		for(int i = 0; i < threadCount; i++) {
    			executorService.submit(() -> {
    				try {
    					stockService.decrease(1L, 1L);
    				} finally {
                   	 	// CountDownLatch 1 감소
    					latch.countDown();
    				}
    			});
    		}
            // CountDownLatch이 0이 될때까지 스레드 대기 - await() 이후 로직은 CountDownLatch이 0이 되고나서 수행된다.
    		latch.await();
    
    		// then
    		Stock stock = stockRepository.findById(1L).orElseThrow();
    		assertEquals(0, stock.getQuantity());
    	}
    }

     
    테스트를 실행하기 전에 JPA가 생성하는 쿼리를 로그로 출력하여 확인할 수 있도록 application.yml을 다음과 같이 수정한다.

    spring:
      application:
        name:stock
      jpa:
        hibernate:
          ddl-auto: create
        show-sql: true
    
    # JPA에서 쿼리가 어떻게 나가는지 확인하기 위함
    logging:
      level:
        org:
          hibernate:
            SQL: DEBUG
            type:
              descriptor:
                sql:
                  BasicBinder: TRACE

    테스트 결과

    테스트가 성공하는 것을 볼 수 있다.
     

     
    로그를 확인해보면 다음과 같이 조회 쿼리를 호출할때  select for update 쿼리를 호출하는 것을 볼 수 있다.이 부분에 데이터에 대해 Lock을 걸고 데이터를 조회하는 부분에 해당한다. 즉, 비관적 락을 사용한다는 것은 조회 시점에 select for update 쿼리를 호출하여 조회 데이터에 대해 Lock을 건다는 것을 알 수 있다.
     

     
    이처럼 Pessimistic Lock은 조회시점에 Lock을 통해 업데이트를 제어하여 데이터 정합성을 보장한다. 하지만 데이터 조회 시점에 별도의 Lock을 잡아버리기 때문에 (충돌이 잦지 않다면) 성능 감소가 발생한다.
    (바로 다음에 살펴볼) Optimistic Lock은 충돌이 일어나는 경우 데이터를 다시 조회해야하므로 충돌이 빈번하게 일어난다면 성능이 저하된다. 따라서 충돌이 빈번하게 일어나는 데이터인 경우 Pessimistic Lock은 Optimistic Lock에 비해 뛰어난 성능을 제공한다.
     

    Optimistic Lock 활용해보기

    이번에는 낙관적 락(Optimistic Lock)으로 동시성을 제어하여 정합성 문제를 해결해본다.
    낙관적 락(Optimistic Lock)은 실제 데이터베이스의 락을 이용하지 않고 version을 이용하여 정합성을 맞추는 방법이다.
     

    그림으로 살펴보면 위와 같은데, 이 케이스에서 server 2가 읽은 버전에서 수정사항이 생긴 경우(다른 세션에서 재고수량이 업데이트 되었다면) 업데이트가 실패하고 애플리케이션 레벨에서 다시 데이터를 조회한 후 업데이트를 수행해야 한다.
     

    Stock

    낙관적 락은 레코드(로우)의 버전을 확인하여 충돌을 감지하는 방식으로 동작한다. 이를 구현하기 위해 Stock 클래스에 version 필드를 추가하고 해당 필드에 @Version 어노테이션을 두어 특정 레코드에 대해 수정이 일어날 때마다 자동으로 증가하고 충돌 감지에 사용할 수 있도록 한다.

    package com.example.stock.domain;
    
    import jakarta.persistence.Entity;
    import jakarta.persistence.GeneratedValue;
    import jakarta.persistence.GenerationType;
    import jakarta.persistence.Id;
    import jakarta.persistence.Version;
    
    @Entity // DB의 테이블로 매핑
    public class Stock {
    
    	@Id @GeneratedValue(strategy = GenerationType.IDENTITY)
    	private Long id;
    
    	private Long productId;
    	private Long quantity;
    
    	@Version
    	private Long version; // Optimistic Lock을 위함
    
    	public Stock() {
    	}
    
    	public Stock(Long productId, Long quantity) {
    		this.productId = productId;
    		this.quantity = quantity;
    	}
    
    	public Long getQuantity() {
    		return quantity;
    	}
    
    	public void decrease(Long quantity){
    		if(this.quantity - quantity < 0){
    			throw new RuntimeException("재고는 0개 미만이 될 수 없습니다.");
    		}
    
    		// 현재 수량을 갱신
    		this.quantity -= quantity;
    	}
    }

     
    이 때 @Version 어노테이션은 jakarta.persistence를 import한다.
     

     
    Stock 테이블을 조회해보면 다음과 같이 version 컬럼이 추가된 것을 볼 수 있다.
     

    StockRepository

    Optimistic Lock을 사용하는 findByIdWithOptimisticLock() 메서드를 추가한다. 이때에도 @Lock 어노테이션을 사용하는데 이번에는 LockModeType를 OPTIMISTIC로 둔다.

    package com.example.stock.repository;
    
    import org.springframework.data.jpa.repository.JpaRepository;
    import org.springframework.data.jpa.repository.Lock;
    import org.springframework.data.jpa.repository.Query;
    
    import com.example.stock.domain.Stock;
    
    import jakarta.persistence.LockModeType;
    
    // StockRepository : 재고(Stock)에 대한 데이터베이스와의 CRUD 기능 제공
    public interface StockRepository extends JpaRepository<Stock, Long> {
    	@Lock(LockModeType.OPTIMISTIC) // Optimistic Lock을 위함
    	@Query("select s from Stock s where s.id = :id")
    	Stock findByIdWithOptimisticLock(Long id);
    }

     

    OptimisticLockStockService

    findByIdWithOptimisticLock()를 호출하여 재고를 조회하고 업데이트하는 서비스 클래스를 추가한다.

    package com.example.stock.service;
    
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Transactional;
    
    import com.example.stock.domain.Stock;
    import com.example.stock.repository.StockRepository;
    
    @Service
    public class OptimisticLockStockService {
    	private final StockRepository stockRepository;
    
    	public OptimisticLockStockService(StockRepository stockRepository) {
    		this.stockRepository = stockRepository;
    	}
    
    	@Transactional
    	public void decrease(Long id, Long quantity) {
    		// Optimistic Lock으로 재고(Stock) 조회
    		Stock stock = stockRepository.findByIdWithOptimisticLock(id);
    
    		// 재고 수량 감소
    		stock.decrease(quantity);
    		stockRepository.save(stock);
    	}
    }
    
    

     
    테스트 코드를 실행해보면 실패하는 것을 볼 수 있다.

     
    실행 로그를 확인해보면 1번 스레드(pool-3-thread-1)가 update 쿼리를 호출하여 재고 수량을 업데이트하고 나서, 2번 스레드(pool-3-thread-2)가 재고 수량을 감소하기 위해 update 쿼리를 호출했으나 ObjectOptimisticLockingFailureException 예외가 발생하여 실패한 것을 볼 수 있다. 
     

     
    앞서 낙관적 락은 데이터의 버전을 확인하여 충돌을 감지하고 데이터 정합성을 유지한다고 하였다. 즉 이미 1번 스레드(pool-3-thread-1)가 업데이트한 데이터(= 조회한 시점과 버전이 다른 데이터)에 2번 스레드(pool-3-thread-2)기 업데이트를 시도하면 업데이트에 실패하고 로직이 중단된 것인데, 이 과정을 디버깅을 통해 눈으로 직접 확인해보자.
     

    멀티 스레드 디버깅

    먼저 2개의 스레드가 OptimisticLockStockService의 decrease() 메서드를 호출하는 테스트 업데이트_실패_테스트()를 작성한다.

    package com.example.stock.service;
    
    import static org.junit.jupiter.api.Assertions.*;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import org.junit.jupiter.api.AfterEach;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.orm.ObjectOptimisticLockingFailureException;
    
    import com.example.stock.domain.Stock;
    import com.example.stock.repository.StockRepository;
    
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j
    @SpringBootTest
    public class OptimisticLockStockServiceTest {
    
    	@Autowired
    	private OptimisticLockStockService stockService;
    
    	@Autowired
    	private StockRepository stockRepository;
    
    	// 각 테스트가 실행되기 전에 데이터베이스에 테스트 데이터 생성
    	@BeforeEach
    	public void before() {
    		stockRepository.saveAndFlush(new Stock(1L, 100L));
    	}
    
    	// 각 테스트를 실행한 후에 데이터베이스에 테스트 데이터 삭제
    	@AfterEach
    	public void after() {
    		stockRepository.deleteAll();
    	}
    
    	@Test
    	public void 업데이트_실패_테스트() throws InterruptedException {
    		final Long stockId = 1L; // 테스트할 재고 ID
    		final Long decreaseAmount = 1L;
    		final int threadCount = 2;
    
    		// ExecutorService : 비동기로 실행하는 작업을 간단하게 실행할 수 있도록 자바에서 제공하는 API
    		ExecutorService executorService = Executors.newFixedThreadPool(32);
    
    		// CountDownLatch : 작업을 진행중인 다른 스레드가 작업을 완료할때까지 대기할 수 있도록 도와주는 클래스
    		CountDownLatch latch = new CountDownLatch(2);
    		// 2개 이상의 쓰레드에 작업 요청
    		for(int i = 0; i < threadCount; i++) {
    			executorService.submit(() -> {
    				try {
    					stockService.decrease(stockId, decreaseAmount);
    				} catch (RuntimeException e){
    					log.error("{} 발생, 업데이트 실패", e.getClass().getSimpleName(), e);
    				} finally {
    					// CountDownLatch 1 감소
    					latch.countDown();
    				}
    			});
    		}
    
    		// CountDownLatch이 0이 될때까지 스레드 대기 - await() 이후 로직은 CountDownLatch이 0이 되고나서 수행된다.
    		latch.await();
    
    		// then
    		Stock stock = stockRepository.findById(1L).orElseThrow();
    		// 처음에 100개로 저장하고 한개씩 2번 감소하기 때문에 98이 될 것을 예상
    		// 별도의 재처리 로직이 없으므로 테스트는 실패한다.
    		assertEquals(98, stock.getQuantity());
    	}
    }

     
    재고 감소 로직인 decrease() 메서드에 브레이크 포인트를 지정하는데, 마우스 우클릭하여 Suspend를 Thread로 지정한다.
    멀티 스레드 환경에서 동시성 이슈를 디버깅하여 확인해야하는데 이를 통해 모든 스레드에 대해 브레이크 포인트를 적용할 수 있다.
     

     
    이제 스레드별로 지정한 브레이크 포인트에 도달했을때 해당 라인에서 일시중지 될 것이다. 별도로 Suspend를 수정하지 않았다면 Suspend가 All로 설정되어 있는데 이는 어떠한 스레드가 해당 브레이크포인트에 도착하고 나서부터는 해당 스레드만 컨트롤할 수 있다. (즉 2개 이상의 스레드를 제어하면서 디버깅을 할 수 없다.)
     
    Suspend를 Thread로 지정하고 나면 OptimisticLockStockService의 업데이트_실패_테스트()를 디버깅 모드로 수행해본다.
     

     
    먼저 테스트를 실행하고나서 decrease() 메서드를 호출하기전에 Stock 로우를 확인해보면, before() 메서드로 작성한 것과 같이 id = 1, product = 1, quantity = 100인 재고 로우가 Stock 테이블에 생성된 것을 볼 수 있다. 이때 이 로우를 아무런 스레드도 수정하지 않았으므로 version = 0이 된다.
     

     
    decrease() 메서드에 도달하고 나면 아래 Debug 창에 다음과 같이 현재 사용중인 스레드 목록을 볼 수 있다. 멀티스레드 환경에서는 이처럼 두 스레드를 상황에 맞게 변경해가면서 로직을 추적할 수 있다.
     

     
    먼저 pool-3-thread-1에서 재고 감소 로직인 decrease() 메서드를 디버깅한다.
    Stock 테이블에서 재고 수량을 조회해온다. 아직 아무런 스레드에서 재고 수량을 변경하지 않았으므로 최초 상태인 quantity = 100, version = 0인 것을 볼 수 있다.
     

     
    Stock 도메인 클래스의 decrease() 메서드를 호출하여 수량을 1 감소한다. quantity = 99로 변경된 것을 볼 수 있다. (아직 재고 수량을 업데이트하기 직전이므로 version = 0이다.)
     

     
    save() 메서드를 호출하여 데이터베이스에 변경사항을 반영하기 전에 작업을 멈추고 pool-3-thread-2로 작업 스레드를 변경한다.
     

     
    위와 동일하게 Stock 테이블의 로우를 조회한다. pool-3-thread-1에서 재고 수량을 1 감소한 사항을 데이터베이스에 반영하기 전이므로 quantity = 100, version = 0인 것을 볼 수 있다.
     

     
    마찬가지로 Stock 도메인 클래스의 decrease() 메서드를 호출하여 수량을 1 감소한다. quantity = 99로 변경된 것을 볼 수 있다. (마찬가지로 아직 재고 수량을 업데이트하기 직전이므로 version = 0이다.)
     

     
    pool-3-thread-2에서도 save() 메서드를 호출하여 데이터베이스에 변경사항을 반영하기 전에 작업을 멈추고 pool-3-thread-1로 작업 스레드를 다시 변경한다. 이후 pool-3-thread-1 스레드에서 save() 메서드를 호출하여 변경사항을 DB에 반영한다.
     

     
     
    save() 메서드를 호출하면 EntityUpdateAction의 execute() 메서드를 호출한다.
    이때 version과 관련된 내용을 확인할 수 있는데 다음과 같이 previousVersion과 nextVersion 값을 확인할 수 있다.
     

     
    또한 현재 변경사항을 반영할 Entity 객체와 버전 컬럼의 이름이 정의된 것을 확인할 수 있다.

     
     
    execute() 메서드는 UpdateCoordinatorStandard의 update()를 호출하고 최종적으로 performUpdate()를 호출하여 실제 업데이트 쿼리 호출한다.
     

     
    하지만 이 시점에는 데이터베이스에 변경사항이 반영되어있지 않다.(version = 0이다.) 
     

     
    트랜잭션을 커밋하기 전에 버전을 다시 체크하는데, AbstractEntityPersister.getCurrentVersion()의 versionSelectString 주목하면
     

     
    위와 같이 쿼리를 호출하여 버전 컬럼을 조회한다. 이를 통해 다른 트랜잭션에서 데이터 변경이 일어났는지를 다시 한 번 확인한다. 따라서 로그에는 update 쿼리와 select 쿼리가 남게된다. 이후 버전에 문제가 없을 경우 변경사항을 반영하여 데이터 일관성을 유지한다.
    (디버깅하는 과정에서 캡쳐 실수로 2번 스레드가 로그를 남기고 있는데 pool-3-thread-1에서 update를 하고, select 쿼리를 호출해서 버전을 조회했다고 보시면 됩니다.)
     

     
     
    변경사항을 데이터베이스에 반영하고 나서 테이블을 조회해보면 pool-3-thread-1 스레드의 작업 사항이 반영되어 quantity = 99, version = 1로 업데이트 된 것을 볼 수 있다.
     

     
    이후 다시 pool-3-thread-2 스레드로 작업 스레드를 변경한다.
     

     
    pool-3-thread-2 스레드에서도 동일하게 save() 메서드를 호출하여 마저 변경사항을 DB에 반영한다.
    1번 스레드와 동일하게 UpdateCoordinatorStandard 클래스의 update() 메서드를 호출하여 2번 스레드도 update 쿼리 호출한다.
     

     
    하지만 update 쿼리 실행에 실패하고 CglibAopProxy.proceed()에서 다음과 같이 OptimisticLockException이 발생한 것을 볼 수 있다.
     


    UpdateCoordinateStandard 클래스의 performUpdate() 메서드가 최종적으로 SQL UPDATE 쿼리를 실행하는데, 위 로그에서 확인한 바와 같이 이 쿼리의 WHERE절에는 버전 필드를 포함한다. 이 버전 필드를 통해 데이터베이스에서 업데이트된 행의 수를 확인하여 버전 충돌을 검사한다. (위 EntityUpdateAction의 execute() 메서드를 디버깅하는 과정에서 버전 필드명과 버전 정보를 확인할 수 있었다.)
     
    이때 update 쿼리를 호출하는데 만약 WHERE 절의 버전 값이 일치하지 않으면, 데이터베이스에서 영향을 미친 행의 수가 0이 된다. 이는 버전 충돌을 의미하고(UPDATE 쿼리 조건에 id와 version만 있었고, id는 항상 조건을 만족하므로 영향받은 행의 수가 0이면 version이 조건을 불만족했다는 것을 의미한다.) 이 경우 OptimisticLockException이 발생한다.
     
    그리고 이 OptimisticLockException 예외는 스프링의 TransactionInterceptor(또는 PersistenceExceptionTranslationPostProcessor)에서 스프링의 예외 처리 메커니즘에 의해 ObjectOptimisticLockingFailureException으로 Wrapping 된다.

     

     
    아래 이미지 중 Server 2(예시 코드에서는 pool-3-thread-2 스레드)가 변경사항을 반영하는데 pool-3-thread-1 스레드가 Stock의 로우를 업데이트하면서 이미 version을 1로 갱신하였다. 따라서 pool-3-thread-2 스레드가 업데이트하는 대상인 version = 0인 데이터가 없으므로 ObjectOptimisticLockingFailureException 예외가 발생하여 업데이트에 실패한 것이다.
     

     
    에러 로그를 확인해보면 다른 곳에서 이미 로우를 업데이트 해서 변경사항을 반영할 수 없다는 내용을 확인할 수 있다. Row was updated or deleted by another transaction(or unsaved-value mapping was incorrect)
     
     

     
    이처럼 Optimistic Lock을 사용는 경우 공유 데이터를 다른 스레드에서 이미 업데이트해버렸다면, 이후 이전 버전에 대해 업데이트를 수행한다면 ObjectOptimisticLockingFailureException이 발생하여 실패한다는 내용을 확인할 수 있다.
     

    OptimisticLockStockFacade

     앞서 낙관적 락을 구현할 때 한 스레드가 작업한 내용에 대해 다른 스레드에서 수정사항이 생긴 경우(다른 세션에서 재고수량이 업데이트 되었다면) 업데이트가 실패하고 애플리케이션 레벨에서 다시 데이터를 조회한 후 업데이트를 수행해야 한다. 고 소개하였다.
    따라서 다시 재고 데이터를 조회하고 업데이트를 수행해줄, 업데이트 실패시 재시도(버전을 다시 조회 후 업데이트)를 위한 객체를 구현한다.

    package com.example.stock.facade;
    
    import org.springframework.orm.ObjectOptimisticLockingFailureException;
    import org.springframework.stereotype.Component;
    
    import com.example.stock.service.OptimisticLockStockService;
    
    import jakarta.persistence.OptimisticLockException;
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j
    @Component
    // Optimistic Lock을 사용할때 업데이트 실패시 재시도(버전을 다시 조회)를 위한 객체
    public class OptimisticLockStockFacade {
    	private final OptimisticLockStockService optimisticLockStockService;
    
    	public OptimisticLockStockFacade(OptimisticLockStockService optimisticLockStockService) {
    		this.optimisticLockStockService = optimisticLockStockService;
    	}
    
    	public void decrease(Long id, Long quantity) throws InterruptedException {
    		while(true) {
    			try {
    				optimisticLockStockService.decrease(id, quantity);
    
    				// 재고 차감에 성공하였다면 while loop 종료
    				break;
    			} catch (ObjectOptimisticLockingFailureException e) {
    				// 재고 업데이트 실패시 50ms간 대기 후 재시도
    				log.error("{} 발생, 업데이트 실패", e.getClass().getSimpleName(), e);
    				Thread.sleep(50);
    			}
    		}
    	}
    }

     
     
    Optimistic Lock을 사용할때 업데이트를 실패하고 ObjectOptimisticLockingFailureException 예외가 발생하는 경우 50ms간 대기 후 version을 다시 조회하고 재고차감하는 로직을 구현한다.
     

    OptimisticLockStockFacadeService

    OptimisticLockStockFacade를 테스트한다.

    package com.example.stock.facade;
    
    import static org.junit.jupiter.api.Assertions.*;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import org.junit.jupiter.api.AfterEach;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    import com.example.stock.domain.Stock;
    import com.example.stock.repository.StockRepository;
    import com.example.stock.service.PessimisticLockStockService;
    
    @SpringBootTest
    class OptimisticLockStockFacadeTest {
    	@Autowired
    	private OptimisticLockStockFacade optimisticLockStockFacade;
    
    	@Autowired
    	private StockRepository stockRepository;
    
    	// 각 테스트가 실행되기 전에 데이터베이스에 테스트 데이터 생성
    	@BeforeEach
    	public void before() {
    		stockRepository.saveAndFlush(new Stock(1L, 100L));
    	}
    
    	// 각 테스트를 실행한 후에 데이터베이스에 테스트 데이터 삭제
    	@AfterEach
    	public void after() {
    		stockRepository.deleteAll();
    	}
    
    	@Test
    	public void 동시에_100개의_요청() throws InterruptedException {
    		// when
    		// 100개의 쓰레드 사용(멀티스레드)
    		int threadCount = 100;
    
    		// ExecutorService : 비동기로 실행하는 작업을 간단하게 실행할 수 있도록 자바에서 제공하는 API
    		ExecutorService executorService = Executors.newFixedThreadPool(32);
    
    		// CountDownLatch : 작업을 진행중인 다른 스레드가 작업을 완료할때까지 대기할 수 있도록 도와주는 클래스
    		CountDownLatch latch = new CountDownLatch(threadCount);
    		// 100개의 작업 요청
    		for(int i = 0; i < threadCount; i++) {
    			executorService.submit(() -> {
    				try {
    					optimisticLockStockFacade.decrease(1L, 1L);
    				} catch (InterruptedException e) {
    					throw new RuntimeException(e);
    				} finally {
    					latch.countDown();
    				}
    			});
    		}
    		latch.await();
    
    		// then
    		Stock stock = stockRepository.findById(1L).orElseThrow();
    		// 처음에 100개로 저장하고 한개씩 100번 감속하기때문에 0이 될 것을 예상
    		assertEquals(0, stock.getQuantity());
    
    	}
    }
    

     
    테스트가 성공하는 것을 볼 수 있다.
     
    참고로 예시와 같이 충돌이 자주 발생하는 경우 50ms 대기하고 재시도(다시 재고를 조회하여 업데이트)하는 로직이 있어 Pessmistic Lock에 비해 속도가 느리다.
     

     
    Pessmistic Lock으로 구현했을때 테스트 수행 속도는 다음과 같다. (참고)
     

     
    데이터베이스에 직접 락을 걸지 않으므로 성능상 이점이 있다. 하지만 예시와 같이 충돌이 빈번하게 일어나거나 빈번할 것이라고 예상된다면 Pessimistic Lock에 비해 성능이 저하된다.(이 경우 Pessimistic Lock을 사용한다.)
    또한, 실패했을때 재시도 로직을 개발자가 애플리케이션 레벨에 직접 작성해주어야하기 때문에 충분한 이해가 필요하다.
     

    Named Lock 활용해보기

    마지막으로 Named Lock을 활용하여 동시성 문제를 해결해본다.
    Named Lock은 이름을 갖는 메타데이터(Meta data) 락인데, 한 세션이 Named Lock을 획득하면 이를 반납하기 전까지 다른 세션에서는 락을 획득할 수 없다.
     
    주의할 점은 앞서 살펴본 Pessimistic Lock과 Optimistic Lock과 달리 트랜잭션이 종료될때 락을 자동으로 반납하지 않는다. 따라서 개발자가 별도의 명령어로 Lock을 반납하는 로직을 작성해주어야 한다. 혹은 선점시간이 끝나면 락이 해제되도록 타임아웃을 걸어야한다.
     
    Mysql에서는 getLock 명령어로 Named Lock을 획득하고 releaseLock 명령어로 해제할 수 있다.

    위 이미지에서 Session-1에서 ‘1’이라는 이름에 Lock을 걸면 Session-2는 Session-1이 ‘1’이라는 이름의 Lock을 반납해야 이를 획득할 수 있다. (두번째 인자인 1000은 타임아웃을 걸기위한 시간에 해당한다.)
     

    LockRepository

    Named Lock을 위한 리포지토리를 구현한다.

    package com.example.stock.repository;
    
    import org.springframework.data.jpa.repository.JpaRepository;
    import org.springframework.data.jpa.repository.Query;
    
    import com.example.stock.domain.Stock;
    
    public interface LockRepository extends JpaRepository<Stock, Long> {
    	@Query(value = "select get_lock(:key, 3000)", nativeQuery = true)
    	void getLock(String key);
    
    	@Query(value = "select release_lock(:key)", nativeQuery = true)
    	void releaseLock(String key);
    }
    

     
    편의를 위해 JPA의 Native Query 기능을 활용하여 Named Lock을 얻는 getLock 메서드와 반납하는 releaseLock을 구현한다.
    또한 비즈니스 로직과 동일한 DataSource로 Named Lock을 구현한다. (참고로 실무에서는 Named Lock이 비즈니스 로직과 동일한 DataSource를 사용하게 되면 커넥션 풀(Connection Pool)이 부족해져 다른 서비스에 영향을 줄 수 있다. 따라서 DataSource를 분리하여 사용하는 것을 추천한다.)
     

    NamedLockStockFacade

    실제 비즈니스 로직 전후로 Named Lock을 획득하고 로직 수행 후 해제를 수행하는 클래스이다.
    비즈니스 로직 수행 전후로 getLock과 releaseLock을 호출하여 Named Lock을 획득/반납한다. 이때 Lock 이름은 row의 id로 지정하였다.

    package com.example.stock.facade;
    
    import org.springframework.stereotype.Component;
    import org.springframework.transaction.annotation.Transactional;
    
    import com.example.stock.repository.LockRepository;
    import com.example.stock.service.StockService;
    
    @Component
    public class NamedLockStockFacade {
    	private final LockRepository lockRepository;
    	private final StockService stockService;
    
    	public NamedLockStockFacade(LockRepository lockRepository, StockService stockService) {
    		this.lockRepository = lockRepository;
    		this.stockService = stockService;
    	}
    
    	@Transactional
    	public void decrease(Long id, Long quantity){
    		try {
    			// id 값을 기준으로 Named Lock 획득
    			lockRepository.getLock(id.toString());
    
    			// 재고 감소 로직
    			stockService.decrease(id, quantity);
    		} finally {
    			// 모든 비즈니스 로직을 수행하고 나면 lock 해제
    			lockRepository.releaseLock(id.toString());
    		}
    	}
    }
    

     

    StockService

    전체 로직은 Pessimistic Lock, Optimistic Lock과 동일하다. 차이점으로는 재고 감소를 위한 decrease() 메서드의 Propagation 수준을 REQUIRES_NEW로 지정한다.
    만약 부모와 같은 트랜잭션에서 Named Lock 조회 로직을 사용한다면 비즈니스 로직 수행중에 예외가 발생하여 실패하는 경우, 비즈니스 로직을 롤백하면서 Lock 해제까지 롤백되어버린다. Lock은 여전히 이 스레드가 획득한 상태로 남아있게되고, 다른 스레드는 Lock을 획득하기 위해 무한정 대기에 빠지게 있다. 따라서 비즈니스 로직은 Named Lock을 획득하는 부모와 별도의 트랜잭션에서 수행한다. 이를 위해 Propagation 수준을 REQUIRES_NEW로 지정한다.

    package com.example.stock.service;
    
    import java.util.Optional;
    
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Propagation;
    import org.springframework.transaction.annotation.Transactional;
    
    import com.example.stock.domain.Stock;
    import com.example.stock.repository.StockRepository;
    
    @Service
    public class StockService {
    	// StockService : 재고(Stock)에 대한 CRUD 기능을 통한 비즈니스 로직 제공
    	private final StockRepository stockRepository;
    
    	public StockService(StockRepository stockRepository) {
    		this.stockRepository = stockRepository;
    	}
    
    	// 재고 감소 로직 구현
    	// Named Lock을 사용하는 경우 부모와 별도의 트랜잭션에서 수행되어야하므로 Propagation 수준을 REQUIRES_NEW로 지정한다.
    	@Transactional(propagation = Propagation.REQUIRES_NEW)
    	public synchronized void decrease(Long id, Long quantity) {
    		// Stock 조회
    		Optional<Stock> stock = stockRepository.findById(id);
    
    		// 재고 감소
    		stock.orElseThrow(() -> new RuntimeException("해당하는 재고가 없습니다."))
    			.decrease(quantity);
    
    		// 갱신된 값을 저장
    		stockRepository.save(stock.get());
    	}
    }

     

    NamedLockStockFacadeTest

    NamedLockStockFacade을 테스트하여 Named Lock이 잘 동작하여 동시성 이슈를 해결하고 재고 수량을 성공적으로 차감하는지 테스트 한다.

    package com.example.stock.facade;
    
    import static org.junit.jupiter.api.Assertions.*;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import org.junit.jupiter.api.AfterEach;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    import com.example.stock.domain.Stock;
    import com.example.stock.repository.StockRepository;
    
    @SpringBootTest
    class NamedLockStockFacadeTest {
    	@Autowired
    	private NamedLockStockFacade namedLockStockFacade;
    
    	@Autowired
    	private StockRepository stockRepository;
    
    	// 각 테스트가 실행되기 전에 데이터베이스에 테스트 데이터 생성
    	@BeforeEach
    	public void before() {
    		stockRepository.saveAndFlush(new Stock(1L, 100L));
    	}
    
    	// 각 테스트를 실행한 후에 데이터베이스에 테스트 데이터 삭제
    	@AfterEach
    	public void after() {
    		stockRepository.deleteAll();
    	}
    
    	@Test
    	public void 동시에_100개의_요청() throws InterruptedException {
    		// when
    		// 100개의 쓰레드 사용(멀티스레드)
    		int threadCount = 100;
    
    		// ExecutorService : 비동기로 실행하는 작업을 간단하게 실행할 수 있도록 자바에서 제공하는 API
    		ExecutorService executorService = Executors.newFixedThreadPool(32);
    
    		// CountDownLatch : 작업을 진행중인 다른 스레드가 작업을 완료할때까지 대기할 수 있도록 도와주는 클래스
    		CountDownLatch latch = new CountDownLatch(threadCount);
    		// 100개의 작업 요청
    		for(int i = 0; i < threadCount; i++) {
    			executorService.submit(() -> {
    				try {
    					namedLockStockFacade.decrease(1L, 1L);
    				} finally {
    					latch.countDown();
    				}
    			});
    		}
    		latch.await();
    
    		// then
    		Stock stock = stockRepository.findById(1L).orElseThrow();
    		assertEquals(0, stock.getQuantity());
    
    	}
    }

     
    테스트가 성공적으로 수행되는 것을 볼 수 있다.
     

     
    작업 스레드 중 하나인 pool-2-thread-13의 로그를 추적해보면 다음과 같이 수행되는 것을 볼 수 있다.
     
    1. Named Lock 획득
     

     
    2. 비즈니스 로직 수행(재고 차감) → Named Lock 해제
     

     
    Named Lock은 주로 분산락(Distributed lock)을 구현할때 사용한다. 또한 타임아웃을 구현하기 까다로운 Pessimistic Lock과 달리 Named Lock은 손쉽게 이를 구현할 수 있다. 이외에도 데이터 적재시 정합성이 중요한 경우에도 사용할 수 있다.
    하지만 트랜잭션 종료시 Lock을 꼭 해제해줘야하므로 주의해서 사용한다. 따라서 실제로는 구현방법이 복잡해질 수 있다.
     
    이로써 데이터베이스에서 제공하는 3가지 방식으로 동시성 이슈를 확인해보았다. 다음은 레디스(Redis)로 분산 락(Distributed Lock)을 구현하여 동시성 이슈를 해결해본다.

    참고한 곳

    - https://kimjongmo.github.io/intellij/multi-thread-debugging

    반응형
    Comments