-
멀티스레드 환경에서의 데이터 전송에 대한 안정성 향상 방법프로그래밍 2024. 10. 10. 13:08
멀티스레드 환경에서 우리는 데이터를 처리해야 할 경우가 있다.
서버 측의 경우 kafka, rabbitmq, redis 등 대기열로 처리할 수 있는 충분한 솔루션이 있으므로 이슈가 많지는 않지만, 클라이언트, 서버 모두 이슈가 있을 수 있다.
먼저, 멀티스레드는 이벤트(코루틴) 스레드라던지, 직접 생성한 Background Thread가 될 수 도 있도 있다.
모두 비동기적 작업이라는 면에서 동일하게 보아도 되겠다.
따라서 동기 작업이 아니므로 절차적으로 진행되지 않고, 여러 스레드가 동시적으로 같은 메모리를 점유할 수 있으므로 항상 경쟁상태가 발생할 수 있는 것을 염두에 두어야 한다.
클라이언트에서 한가지 예를 보자.
파일을 공유하기 위해 서버에 연속적으로 파일 전송을 하는 http 리퀘스트가 있다고 가정하자.
로컬 파일이 변경되면 변경된 파일 이벤트가 발생하고 이는 동시다발적으로 발생할 수 있다.
httpclient가 서버 쪽에 해당 이벤트 스레드를 통하여 연속적으로 파일을 전송할 경우 동시성 문제가 발생할 수 있고 할당되는 소켓 소모로 예기치 않은 오류가 발생할 수 있다.
따라서, 이를 처리하기 위해 개발자는 대기열을 생성해서 처리하는 것을 고려해야 한다.
이벤트 스레드에서 Queue에 할당하고 할당된 Queue를 처리하는 별도의 Background Thread를 이용하여 순차적으로 전송한다면 이러한 이슈를 피할 수 있다.
C#에서는 ConcurrentQueue나 BlockingCollection을 사용할 수 있겠다.
서버 쪽에서 같은 방법을 고려할 수 있겠으나 서버가 상태를 저장하는 역할을 하기에 인프라 활용을 최우선으로 고려하는 게 좋다. 서버 쪽에 같은 방식을 적용할 경우 발생할 수 있는 문제는 서버가 다운될 경우 서버 측에 만들어진 Queue의 내역이 전부 초기화되므로 데이터 누락으로 연결되기 때문이다.
누군가 클라이언트의 논리를 서버측 구현으로 넘기려 한다면 이러한 문제를 파악하는 게 도움이 되겠다.
아래 코드는 예시 코드로 해당 논리를 구현한 코드이다.
public class JobHandler<T> where T : class { private static Lazy<JobHandler<T>> _instance = new Lazy<JobHandler<T>>(() => new JobHandler<T>()); public static JobHandler<T> Instance => _instance.Value; private readonly ConcurrentQueue<T> _queue; private JobHandler() { _queue = new ConcurrentQueue<T>(); } public void Enqueue(T item) => _queue.Enqueue(item); public T Dequeue() => _queue.TryDequeue(out var result) ? result : default; } public class JobProcessorAsync<T> : IDisposable where T : class { private JobHandler<T> _handler; private Func<T, Task> _func; private readonly CancellationTokenSource _cts = new CancellationTokenSource(); private Task _task; public JobProcessorAsync() { _task = Task.Run(Start); } public void SetProcess(JobHandler<T> jobHandler, Func<T, Task> callback) { _handler = jobHandler; _func = callback; } private async Task Start() { while (!_cts.Token.IsCancellationRequested) { try { if (_handler.xIsNotEmpty()) { if (_func.xIsNotEmpty()) { var item = _handler.Dequeue(); if (item.xIsNotNull()) { await _func(item); } } } // Instead of Thread.Sleep, we use Task.Delay to make this asynchronous await Task.Delay(10); } catch (Exception e) { Console.WriteLine(e); break; } } } public void Stop() { _cts.Cancel(); _task.Wait(); // Wait for the task to finish gracefully } public void Dispose() { _cts?.Cancel(); _task?.Wait(); } }
위와 같이 구현할 수 있겠다.
위 코드상에서 JobProcessorAsync는 단독으로 Thread 처리를 하게 하였지만 asp.net core 8 기준으로 BackgroundService를 이용하여 처리할 수도 있겠다.
도움이 되길 바라며.
'프로그래밍' 카테고리의 다른 글
예약, 예매 시스템에 대한 고찰-2 (6) 2024.10.16 Entity Framework는 Unit of Work가 필요한가? (0) 2024.10.14 테이블 캐시를 관리하는 방법 (0) 2024.10.04 EF Core Code First Entity Model을 작업하는 방법 (0) 2024.09.24 python + .net core integration, use pythonnet (0) 2024.07.26