자바의 비동기 병럴 처리 연산을 통해 이미지 업로드하기
by 키위먹고싶다관리자가 썸네일을 제작하기 위한 과정에 대해 도식화 한 그림이다.
하나의 트랜잭션 메서드에서 원본과 썸네일 처리 과정을 모두 수행했더니, 4초 이상 걸리는 문제가 발생했고, 원본 저장 작업과 썸네일 처리와 저장 작업을 분리하여 비동기로 작업하기로 했다.
Future와 CompleteFuture
Future는 비동기 연산 결과를 표현한다. 연산 작업이 완료되었는지 확인하고, 완료될 때까지 대기, 완료된 결과를 조회하는 기능을 제공한다. 연산 중에 생성되는 데이터에 대해서는 Future 인터페이스로 조회할 수 없다.
이러한 점을 보안하기 위해 CompleteFuture를 사용하는데 Future 인터페이스의 기능인 연산의 완료 여부를 판단하고 결과 얻기 + 비동기 연산 관계를 정의하거나 연산 결과를 수집, 조합하는 등의 작업이 추가적으로 적용된다.
CompleteFuture 장점
스레드의 선언 없이 비동기 연산 작업을 구현할 수 있고 병렬 프로그래밍이 가능하다.
자바 8에서 추가된 기능이므로 람다 표현식, 함수형 프로그래밍을 사용할 수 있어 코드의 양을 줄일 수 있다.
파이프라인(명령어를 서로 연결해 주는 역할) 형태로 작업들을 연결할 수 있어서 비동기 작업의 순서를 정의하고 관리할 수 있다.
주요 메서드
메서드 | 설명 |
runAsync | Runnable 객체를 구현해서 비동기 연산 작업을 하기 위한 새로운 CompletableFuture객체를 리턴한다. |
supplyAsync | Supplier 함수형 인터페이스의 구현체를 이용해서 비동기 연산 작업을 위한 새로운 CompletableFuture 객체를 리턴한다. |
thenAccept | 현재 단계가 성공적으로 종료되면, 파라미터로 전달된 Consumer함수형 인터페이스의 구현체를 실행하기 위한 CompletionStage 객체를 리턴한다. |
thenRun | 현재 단계가 성공적으로 종료되면, 메서드의 파라미터로 전달된 Runnable 구현체를 실행하기 위한 CompletionState객체를 리턴한다. |
complete | 현재 테스크를 종료하며 만일 태스크가 동작 중이라면, get 메서드와 동일하게 종료될 때까지 대기하고 최종 테스크 결과를 리턴한다. |
아래 코드는 원본 저장과 별개로 리사이즈 이미지 저장 과정에서 필요한 작업을 CompletableFuture를 사용한 결과이다.
첫 번째, 이미지를 줄이는 작업과 두 번째, S3저장소에 저장, 세 번째 DB저장소에 저장등의 작업을 체이닝을 통해 작업한 결과를 어떤 스레드를 사용하는지 확인하기 위한 결과이다.
가정 1.
스레드 풀을 만들고, supplyAsync()를 통해 결과 값을 넘겨받아 다음 작업들을 계속해서 진행하게 한다.
ExecutorService executorService = Executors.newFixedThreadPool(2);
System.out.println(Thread.currentThread().getName() + " 메서드 진입");
CompletableFuture
.supplyAsync(() -> {
System.out.println("supplyAsync : " + Thread.currentThread().getName());
return multipartFile.getContentType().equals(Folder.IMAGE.getDirectoryName() + SLASH + ImageFormat.LETTERS.getExtension()) ? ImageUtils.pngToJpeg(multipartFile) : multipartFile;
}, executorService)
.thenApplyAsync((file) -> {
System.out.println("imageUtil 로 줄이기 : " + Thread.currentThread().getName());
return ImageUtils.imageResize(file, ImageFormat.STANDARD.getWeight(), ImageFormat.STANDARD.getHeight());
})
.thenApplyAsync((resizingImage) -> {
System.out.println("저장소에 업로드 : " + Thread.currentThread().getName());
return s3Uploader.upload(getDirectoryName(), itemId, Folder.RESIZE.getDirectoryName(), resizingImage);
}, executorService)
.thenAccept((resizingItemImageUrl) -> {
System.out.println("DB에 업로드 : " + Thread.currentThread().getName());
createItemEntityAndInsert(resizingItemImageUrl);
});
System.out.println(Thread.currentThread().getName() + " 메서드 종료");
http-nio-8080-exec-2 메서드 진입
supplyAsync : pool-1-thread-1
http-nio-8080-exec-2 메서드 종료
imageUtil로 줄이기 : ForkJoinPool.commonPool-worker-3
저장소에 업로드 : pool-1-thread-2
DB에 업로드 : pool-1-thread-2
하나의 메서드에 있는 내용이지만 총 4개의 스레드를 사용하는데 각 스레드의 역할은 다음과 같다.
1. 기존 tomcat에서 사용하던 원래 메서드의 스레드 풀인 http-nio-8080-exec-2
2. 스레드 풀에서 생성된 pool-1-thread-1
3. ForkJoinPool의 worker-3
4. 스레드 풀에서 생성된 pool-1-thread-2
1번 작업은 기존의 메서드를 수행하던 스레드이다. 2, 3번은 생성된 스레드 풀의 작업 스레드이지만 3번만 ForkJoinPool의 워커 스레드인 이유는 생성한 스레드 풀을 넘겨주지 않았기 때문이고 이를 통해 스레드 풀을 생성하지 않는다면 thenApplyAsync()에서 내부적으로 기본 스레드 풀을 ForkJoinPool을 생성한다는 것을 알 수 있다.
private static final Executor ASYNC_POOL = USE_COMMON_POOL ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
가정 2.
만약 생성한 스레드 풀을 통해 모든 체이닝 작업을 수행하면 어떨까?
ExecutorService executorService = Executors.newFixedThreadPool(3);
System.out.println(Thread.currentThread().getName() + " 메서드 진입");
CompletableFuture
.supplyAsync(() -> {
System.out.println("supplyAsync : " + Thread.currentThread().getName());
return multipartFile.getContentType().equals(Folder.IMAGE.getDirectoryName() + SLASH + ImageFormat.LETTERS.getExtension()) ? ImageUtils.pngToJpeg(multipartFile) : multipartFile;
}, executorService)
.thenApplyAsync((file) -> {
System.out.println("imageUtil 로 줄이기 : " + Thread.currentThread().getName());
return ImageUtils.imageResize(file, ImageFormat.STANDARD.getWeight(), ImageFormat.STANDARD.getHeight());
}, executorService)
.thenApplyAsync((resizingImage) -> {
System.out.println("저장소에 업로드 : " + Thread.currentThread().getName());
return s3Uploader.upload(getDirectoryName(), itemId, Folder.RESIZE.getDirectoryName(), resizingImage);
}, executorService)
.thenAccept((resizingItemImageUrl) -> {
System.out.println("DB에 업로드 : " + Thread.currentThread().getName());
createItemEntityAndInsert(resizingItemImageUrl);
});
System.out.println(Thread.currentThread().getName() + " 메서드 종료");
http-nio-8080-exec-5 메서드 진입
supplyAsync : pool-1-thread-1
http-nio-8080-exec-5 메서드 종료
imageUtil로 줄이기 : pool-1-thread-2
저장소에 업로드 : pool-1-thread-3
DB에 업로드 : pool-1-thread-3
명확한 결과 확인을 위해 스레드 풀의 크기를 작업 개수만큼 늘리고 확인한 결과 예상한 것처럼 생성한 스레드 풀의 스레드들이 작업한다.
가정 3.
supplyAsync()를 스레드 풀로 생성하지 않고 뒤의 3 작업 중 첫 번째 작업만 생성한 스레드 풀을 사용할 때는 어떨까?
ExecutorService executorService = Executors.newFixedThreadPool(3);
System.out.println(Thread.currentThread().getName() + " 메서드 진입");
CompletableFuture
.supplyAsync(() -> {
System.out.println("supplyAsync : " + Thread.currentThread().getName());
return multipartFile.getContentType().equals(Folder.IMAGE.getDirectoryName() + SLASH + ImageFormat.LETTERS.getExtension()) ? ImageUtils.pngToJpeg(multipartFile) : multipartFile;
})
.thenApplyAsync((file) -> {
System.out.println("imageUtil 로 줄이기 : " + Thread.currentThread().getName());
return ImageUtils.imageResize(file, ImageFormat.STANDARD.getWeight(), ImageFormat.STANDARD.getHeight());
}, executorService)
.thenApply((resizingImage) -> {
System.out.println("저장소에 업로드 : " + Thread.currentThread().getName());
return s3Uploader.upload(getDirectoryName(), itemId, Folder.RESIZE.getDirectoryName(), resizingImage);
})
.thenAccept((resizingItemImageUrl) -> {
System.out.println("DB에 업로드 : " + Thread.currentThread().getName());
createItemEntityAndInsert(resizingItemImageUrl);
});
System.out.println(Thread.currentThread().getName() + " 메서드 종료");
http-nio-8080-exec-1 메서드 진입
supplyAsync : ForkJoinPool.commonPool-worker-3
http-nio-8080-exec-1 메서드 종료
imageUtil로 줄이기 : pool-1-thread-1
저장소에 업로드 : pool-1-thread-1
DB에 업로드 : pool-1-thread-1
당연히 아무런 스레드 풀을 넘기지 않는 supplyAsync는 ForkJoinPool을 사용하지만 한 번의 thenApplyAsync()에서 사용된 스레드 풀은 쭉 이어서 뒷 작업까지 수행한다.
최종 작업
@Service
@RequiredArgsConstructor
public class ImageUploadService {
private final static String SLASH = "/";
private final S3Uploader s3Uploader;
private final ItemRepository itemRepository;
private final ImageRepository imageRepository;
@Transactional
public ImageUploadResponse saveItemImage(final Long itemId, final MultipartFile multipartFile) {
if (!itemRepository.existsById(itemId)) {
throw new NonExistentException(ErrorCode.BAD_VALUE, new ExceptionValue<>(itemId));
}
resizingUpload(itemId, multipartFile);
return new ImageUploadResponse(originalUpload(itemId, multipartFile));
}
private void resizingUpload(final Long itemId, final MultipartFile multipartFile) {
CompletableFuture
.supplyAsync(() -> multipartFile.getContentType().equals(Folder.IMAGE.getDirectoryName() + SLASH + ImageFormat.LETTERS.getExtension()) ? ImageUtils.pngToJpeg(multipartFile) : multipartFile)
.thenApply((file) -> ImageUtils.imageResize(file, ImageFormat.STANDARD.getWeight(), ImageFormat.STANDARD.getHeight()))
.thenApply((resizingImage) -> s3Uploader.upload(getDirectoryName(), itemId, Folder.RESIZE.getDirectoryName(), resizingImage))
.thenAccept((resizingItemImageUrl) -> createItemEntityAndInsert(resizingItemImageUrl));
}
private String originalUpload(final Long itemId, final MultipartFile multipartFile) {
String originalItemImageUrl = s3Uploader.upload(getDirectoryName(), itemId, Folder.ORIGIN.getDirectoryName(), multipartFile);
createItemEntityAndInsert(originalItemImageUrl);
return originalItemImageUrl;
}
private String getDirectoryName() {
return Folder.IMAGE.getDirectoryName() + SLASH + Folder.GOODS.getDirectoryName();
}
private void createItemEntityAndInsert(final String url) {
final Image image = Image.builder().imagePath(url).build();
imageRepository.save(image);
}
}
물론 여러 supplyAsync()나 runAsync()를 사용할 경우 스레드 풀을 사용해서 병렬 처리하는 게 좋지만 DB저장소에 URL
저장 시 데이터베이스의 롤백 가능성 작업 순서에 영향이 가기 때문에 하나의 CompletableFuture 인스턴스만 사용해서 스레드 풀은 사용하지 않았다. 만약 병렬처리를 위한 다른 작업이 필요하다면 스레드 풀을 통해 여러 CompletableFuture 인스턴스를 만들어 병렬 작업하고 Future에서 사용할 수 없는 중간의 연산 결과를 얻을 수 있는 등의 작업을 수행하는 게 효율적이다.
결과적으론 이미지 처리 시간이 4초에서 1-2초로 줄어들었다.
'project' 카테고리의 다른 글
heap dump 살펴보기 (1) | 2023.01.29 |
---|---|
GC 모니터링 하기 (1) | 2023.01.29 |
이미지 최적화에 대하여 (0) | 2022.09.23 |
DB 동시성 문제 해결법 (0) | 2022.07.02 |
블로그의 정보
kiwi
키위먹고싶다