-
Kafka Consumer를 조금 더 효율적으로 처리하는 방법프로그래밍 2024. 11. 13. 16:42
Kafka를 개발하다보면 consumer를 어떻게 효율적으로 처리할지 고민하게 된다.
필자는 consumer를 처리할 때 아래의 조건에 따라 개발해 보았다.
1. producer에서는 사용자ID에 따라 publish하고 있다.
2. consumer는 최대한 많은 처리를 해야 하고 consumer 처리에 따라 집계가 수행되어야 한다.
3. 집계는 10분, 1시간, 1일, 1주, 1월이다.
먼저 producer에서 발행하는 값는 KEY가 주어진다. 즉, KEY에 따라 할당된 파티션에 순서적으로 데이터가 쌓이는 것을 보장하고 있다.
이에 따라 consumer를 생성할 때 다중 consumer를 생성할 수 있는 조건이 성립되었다.
그림은 아래와 같다.
아래의 코드를 보자.
public class ParallelConsumerWorker : BackgroundService { private ILogger _logger; private IServiceScopeFactory _serviceScopeFactory; /// <summary> /// ctor /// </summary> /// <param name="logger"></param> /// <param name="serviceServiceScopeFactory"></param> public ParallelConsumerWorker(ILogger<ParallelConsumerWorker> logger, IServiceScopeFactory serviceServiceScopeFactory) { this._logger = logger; this._serviceScopeFactory = serviceServiceScopeFactory; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { if (_logger.IsEnabled(LogLevel.Information)) { _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now); } try { var envTaskNum = Environment.GetEnvironmentVariable("ENV_TASK_NUM").xValue<int>(); var taskNum = envTaskNum <= 0 ? Environment.ProcessorCount / 2 : envTaskNum; var tasks = new Task[taskNum]; for (var i = 0; i < tasks.Length; i++) { var ii = i; tasks[i] = Task.Run(async () => { var consumerExecutor = new ParallelConsumerExecutor(ii, _logger, _serviceScopeFactory, stoppingToken); try { await consumerExecutor.Execute(); } catch (OperationCanceledException) { _logger.LogInformation("Task {taskIndex} was canceled.", ii); } catch (Exception ex) { _logger.LogError(ex, "Error in task {taskIndex}", ii); } }, stoppingToken); } await Task.WhenAll(tasks); // 비동기적으로 Task 실행 } catch (Exception e) { _logger.LogError(e, "{name} Error: {message}", nameof(ParallelConsumerWorker), e.Message); } } }
위 코드는 backgroundservice를 상속받아 구현한 것으로
Task를 설정에 따라 생성하여 consumer executor를 생성하는 코드이다.
즉, Consumer를 설정에 따라 Task로 생성해서 처리하고자 하는 것으로 해당 코드가 동작할 경우 설정만큼 Task를 생성하고 계속적으로 대기하게 된다.
그럼 구현 코드를 보자.
public class ParallelConsumerExecutor { private readonly int _num; private readonly ILogger _logger; private readonly IServiceScopeFactory _serviceScopeFactory; private readonly CancellationToken _stoppingToken; /// <summary> /// ctor /// </summary> /// <param name="num"></param> /// <param name="logger"></param> /// <param name="serviceScopeFactory"></param> /// <param name="stoppingToken"></param> public ParallelConsumerExecutor(int num, ILogger logger, IServiceScopeFactory serviceScopeFactory, CancellationToken stoppingToken) { _num = num; _logger = logger; _serviceScopeFactory = serviceScopeFactory; _stoppingToken = stoppingToken; } public async Task Execute() { if (_logger.IsEnabled(LogLevel.Information)) { _logger.LogInformation("{Name} {N} running at: {time}", nameof(ParallelConsumerExecutor), _num, DateTimeOffset.Now); } await using var scope = _serviceScopeFactory.CreateAsyncScope(); var staticsStatus = scope.ServiceProvider.GetRequiredService<IStaticsStatusService>(); var consumerConfig = scope.ServiceProvider.GetRequiredService<ConsumerConfig>(); consumerConfig.ClientId = $"{Environment.MachineName}-{_num}"; using var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build(); consumer.Subscribe(ENUM_KAFKA_TOPIC_TYPE.Healthcare.Name); var client = scope.ServiceProvider.GetRequiredService<IMongoClient>(); var database = client.GetDatabase(ENUM_MONGODB_DATABASE_TYPE.HealthCare.Name); try { while (!_stoppingToken.IsCancellationRequested) { var consumeResult = consumer.Consume(_stoppingToken); try { await using var dbContext = MongoDbContext.Create(database); var payload = consumeResult.Message.Value.ToDeserialize<Payload>(); this._logger.LogDebug("{Name} {N} Received key:{Key}", nameof(ParallelConsumerExecutor), _num, consumeResult.Message.Key); } catch (HicardiConverterException e) { _logger.LogError(e, "{name} Error: {message}", nameof(ParallelConsumerExecutor), e.Message); } catch (Exception e) { _logger.LogError(e, "{name} Error: {message}", nameof(ParallelConsumerExecutor), e.Message); } finally { consumer.Commit(consumeResult); } } } catch (ConsumeException e) { _logger.LogError(e, "{name} Kafka Consume Error: {message}", nameof(ParallelConsumerExecutor), e.Message); } finally { consumer.Close(); } } }
위 코드에서 consume을 수행하고 전달된 string을 payload라는 객체로 변환하여 처리하고 있다.
이렇게 처리할 경우 논리는 아래와 같다.
카프카 파티션은 KEY에 의해 hash 값 할당으로 순차적으로 데이터가 저장된다.
즉, 5개의 파티션이 있다고 가정하면 key hash 값 계산에 따라 하나의 파티션에 할당 된다면 계속적으로 해당 파티션으로 할당되는 것을 보장한다.
컨슈머의 경우도 하나의 파티션에 연결될 경우 해당 파티션에서 계속적으로 데이터를 수신하는 것을 보장한다.
따라서, 컨슈머를 파티션만큼 만들 수 있다면 하나의 컨슈머는 하나의 파티션과 지속적으로 연결되고 순서를 보장 받을 수 있게 된다. 물론 해당 사항이 깨질 수 있다. 파티션이 증가하거나 컨슈머가 변동될 경우 재할당 작업이 진행되고 일시적으로 깨질 수 있지만 적어도 일시적이라고 말할 수 있겠다.
위와 같이 구성할 경우 높은 처리량을 보장할 수 있다.
또한 카프카 컨슈머를 이용해서 데이터를 처리할 때 아래 처럼 고려해 보자.
public class StatisticsStatusService: IStaticsStatusService { private ConcurrentQueue<StatisticsMinHourItem> _workItems = new(); private List<PayloadObject> _minuteStatus = new(); private List<PayloadObject> _hourStatus = new(); private readonly ILogger _logger; public StatisticsStatusService(ILogger<StatisticsStatusService> logger) { _logger = logger; } public void GetOrAddForMinutes(PayloadObject payloadObject) { var minExists = _minuteStatus.FirstOrDefault(m => m.AccountId == payloadObject.AccountId && m.ProfileId == payloadObject.ProfileId && m.MetadataId == payloadObject.MetadataId); if (minExists.xIsEmpty()) { _minuteStatus.Add(payloadObject); } else { if ((payloadObject.Timestamp - minExists.Timestamp) >= 600000) { _workItems.Enqueue(new StatisticsMinHourItem() { Type = ENUM_STATISTICS_TYPE.Minute, From = minExists, To = payloadObject }); _minuteStatus.Remove(minExists); _minuteStatus.Add(payloadObject); } } } public void EndAddForMinutes(PayloadObject payloadObject) { var exists = _minuteStatus.FirstOrDefault(m => m.AccountId == payloadObject.AccountId && m.ProfileId == payloadObject.ProfileId && m.MetadataId == payloadObject.MetadataId); if (exists.xIsEmpty()) { _logger.LogCritical("Not exists for this minute item: {Item}", payloadObject.xToJson()); return; } _workItems.Enqueue(new StatisticsMinHourItem() { Type = ENUM_STATISTICS_TYPE.Minute, From = exists, To = payloadObject }); _minuteStatus.Remove(exists); } public void GetOrAddForHour(PayloadObject payloadObject) { var hourExists = _hourStatus.FirstOrDefault(m => m.AccountId == payloadObject.AccountId && m.ProfileId == payloadObject.ProfileId && m.MetadataId == payloadObject.MetadataId); if (hourExists.xIsEmpty()) { _hourStatus.Add(payloadObject); } else { if ((payloadObject.Timestamp - hourExists.Timestamp) >= 3600000) { _workItems.Enqueue(new StatisticsMinHourItem() { Type = ENUM_STATISTICS_TYPE.Hour, From = hourExists, To = payloadObject }); _hourStatus.Remove(hourExists); _hourStatus.Add(payloadObject); } } } public void EndAddForHour(PayloadObject payloadObject) { var exists = _hourStatus.FirstOrDefault(m => m.AccountId == payloadObject.AccountId && m.ProfileId == payloadObject.ProfileId && m.MetadataId == payloadObject.MetadataId); if (exists.xIsEmpty()) { _logger.LogCritical("Not exists for this hour item: {Item}", payloadObject.xToJson()); return; } _workItems.Enqueue(new StatisticsMinHourItem() { Type = ENUM_STATISTICS_TYPE.Hour, From = exists, To = payloadObject }); _hourStatus.Remove(exists); } public StatisticsMinHourItem GetItem() { return _workItems.TryDequeue(out var item) ? item : null; } }
위 코드의 논리는 이전 데이터와 현재 데이터를 비교하여 일정 시간에 도달할 경우 대기열에 실행 데이터를 넣어 처리하고자 하는 것이다. 해당 인스턴스는 싱글톤으로 선언되어 있다.
그럼 해당 대기열을 사용하는 코드를 보자.
public class MinHourStatisticsWorker : BackgroundService { private readonly ILogger _logger; private readonly IServiceScopeFactory _scopeFactory; public MinHourStatisticsWorker(ILogger<MinHourStatisticsWorker> logger, IServiceScopeFactory serviceScopeFactory) { _logger = logger; _scopeFactory = serviceScopeFactory; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { await using var scope = _scopeFactory.CreateAsyncScope(); var service = scope.ServiceProvider.GetRequiredService<IStaticsStatusService>(); var client = scope.ServiceProvider.GetRequiredService<IMongoClient>(); var database = client.GetDatabase(ENUM_MONGODB_DATABASE_TYPE.HealthCare.Name); var status = new Dictionary<ENUM_STATISTICS_TYPE, Func<IMongoDatabase, MeasurementPayloadObject, MeasurementPayloadObject , CancellationToken, Task>>() { { ENUM_STATISTICS_TYPE.Minute, async (db, from, to, token) => await MinutesStatics(db, from, to, token) }, { ENUM_STATISTICS_TYPE.Hour, async (db, from, to, token) => await HourStatics(db, from, to, token) } }; while (!stoppingToken.IsCancellationRequested) { if (_logger.IsEnabled(LogLevel.Information)) { _logger.LogInformation("{Name} running at: {time}", nameof(MinHourStatisticsWorker), DateTimeOffset.Now); } var item = service.GetItem(); if(item.xIsNotEmpty()) { await status[item.Type](database, item.From, item.To, stoppingToken); } await Task.Delay(1000, stoppingToken); } } private async Task MinutesStatics(IMongoDatabase database, MeasurementPayloadObject fromObject, MeasurementPayloadObject toObject, CancellationToken stoppingToken) { //db 조회 및 통계 처리 } private async Task HourStatics(IMongoDatabase database, MeasurementPayloadObject fromObject, MeasurementPayloadObject toObject, CancellationToken stoppingToken) { //db 조회 및 통계 처리 } }
위 코드는 StatisticsService의 대기열에 쌓은 데이터를 순차적으로 확인하여 별도의 집계 처리를 하는 코드이다.
위와 같이 구성할 경우 컨슈머 처리에 또 다시 프로듀서를 갖고 발행할 필요없이 비동기적으로 최소 지연으로 처리할 수 있다.
순차처리를 보장하여 처리할 수 있다.
만약, 메모리 대기열을 사용하지 않는다면 Hangfire를 사용하여 동일하게 구현할 수 있다.
이러한 처리는 다소 오버 스펙일 수 있으나 만약, 컨슈머에서 다시 프로듀서로 처리해야 할 경우 이것을 생략하고 처리한다면 고민해 볼만한 코드라고 생각한다.
도움이 되길 바라며...
'프로그래밍' 카테고리의 다른 글
c# 에서 mongodb를 다루는 방법 (2) (0) 2024.11.20 c# 에서 mongodb를 다루는 방법 (0) 2024.11.19 시계열 데이터를 전송하는 방법 (0) 2024.10.25 사용자 Session을 처리하는 방법 (1) 2024.10.24 GUID대신 ULID를 사용해 보자. (2) 2024.10.23