Гость
Форумы / WinForms, .Net Framework [игнор отключен] [закрыт для гостей] / Как избежать race condition? / 21 сообщений из 21, страница 1 из 1
10.01.2016, 16:13
    #39144090
Newbie001
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Как избежать race condition?
Есть очередь, в которую разные потоки параллельно пишут некие запросы на обработку. Есть отдельный таск, который разгребает эту очередь и возвращает результаты обработки:
Код: c#
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
public class BufferedConnection
    {  
        ...

        private BlockingCollection<NetworkRequest> _queue;
        private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
       
        ...

        // отправка запроса - ставит запрос в очередь и блокирует вызывающий поток до получения ответа либо тайм-аута:
        public byte[] Send(byte[] requestBytes)
        {
            var request = new NetworkRequest(requestBytes);
             _queue.Add(request);            
            var waitResult = request.ProcessedWaitEvent.Wait(TimeSpan.FromMilliseconds(ReceiveTimeoutMilliseconds));
            if (!waitResult)
            {
                // не дождались - говорим, что отправлять запрос не нужно
                request.ShouldBeProcessed = false;
            }
            return waitResult ? request.ResponseBytes : null;
        }               

        //  обработка очереди:
        private void ProcessorTaskMethod()
        {
            var token = _cancellationTokenSource.Token;
            while (!_cancellationTokenSource.IsCancellationRequested)
            {
                try
                {
                    var request = _queue.Take(token);
                    if (!request.ShouldBeProcessed) continue;   
                    request.ResponseBytes = SomeProcessingLogic(...);
                    request.ProcessedWaitEvent.Set();
                }
                catch (OperationCanceledException oce)
                {
                    break;
                }
            }
        }        
        
        ...

    }



В случае, если в методе Send произошел тайм-аут:
Код: c#
1.
var waitResult = request.ProcessedWaitEvent.Wait(TimeSpan.FromMilliseconds(ReceiveTimeoutMilliseconds));


нужно как-то сказать потоку-обработчику, что этот запрос уже не нужно обрабатывать. Я правильно понимаю, что текущий вариант подвержен race condition? Т.е. когда в отправляющем потоке мы говорим
Код: c#
1.
request.ShouldBeProcessed = false;


поток-обработчик мог уже вытащить это сообщение из очереди и пройти дальше, за проверку условия
Код: c#
1.
 if (!request.ShouldBeProcessed) continue;  


Никак не соображу, как это можно корректно переписать?
...
Рейтинг: 0 / 0
11.01.2016, 09:03
    #39144231
zz118
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Как избежать race condition?
...
Рейтинг: 0 / 0
11.01.2016, 12:50
    #39144485
ProBiotek
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Как избежать race condition?
Newbie001,

Из кода следует, что взяв элемент из очереди, если его не нужно обрабатывать - он просто выкидывается (continue). Следовательно, почему бы просто не класть его в очередь, если его не нужно обрабатывать ?

Код: c#
1.
2.
3.
4.
5.
6.
7.
8.
9.
           var request = new NetworkRequest(requestBytes);
            var waitResult = request.ProcessedWaitEvent.Wait(TimeSpan.FromMilliseconds(ReceiveTimeoutMilliseconds));

            if (waitResult)
            {
                _queue.Add(request);            // перенесли сюда
                request.ShouldBeProcessed = true;  // формально, данное свойство больше и не нужно. Если только оно не используется где-то еще
            }
...



Иначе гонка будет конечно. Ведь вы кладете элемент в очередь, а потом у этого элемента меняете свойство. Но между тем моментом когда элемент положен в очередь и у него изменилось свойство - его мог взять другой поток, и обработать.
...
Рейтинг: 0 / 0
11.01.2016, 14:15
    #39144563
Newbie001
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Как избежать race condition?
ProBiotek,

Так я не могу не класть его в очередь - иначе как обработчик узнает, что ему нужно что-то обрабатывать?
Он ведь ждет на _queue.Take(token) и как только в очереди появляется элемент - отлипает.

Смысл в том, что метод Send должен отрабатывать за время не более ReceiveTimeoutMilliseconds и если таймаут наступил - обрабатывать этот конкретный запрос не нужно.
...
Рейтинг: 0 / 0
11.01.2016, 14:40
    #39144595
Dima T
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Как избежать race condition?
ИМХУ надо добавить состояние "в обработке" проверять его перед отменой, если еще ждет, то "отмена", если обрабатывается (возможно уже обработался), то дожидаться окончания обработки и убивать результат обработки.
...
Рейтинг: 0 / 0
11.01.2016, 15:38
    #39144676
Newbie001
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Как избежать race condition?
Dima TИМХУ надо добавить состояние "в обработке" проверять его перед отменой, если еще ждет, то "отмена", если обрабатывается (возможно уже обработался), то дожидаться окончания обработки и убивать результат обработки.
Да, только я туплю - как его добавить, какими объектами синхронизации воспользоваться?
...
Рейтинг: 0 / 0
11.01.2016, 15:40
    #39144683
ProBiotek
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Как избежать race condition?
Newbie001ProBiotek,

Так я не могу не класть его в очередь - иначе как обработчик узнает, что ему нужно что-то обрабатывать?
Он ведь ждет на _queue.Take(token) и как только в очереди появляется элемент - отлипает.

Смысл в том, что метод Send должен отрабатывать за время не более ReceiveTimeoutMilliseconds и если таймаут наступил - обрабатывать этот конкретный запрос не нужно.

Что значит " иначе как обработчик узнает, что ему нужно что-то обрабатывать " ?
Из кода следует, что если "!request.ShouldBeProcessed", то ничего и не обрабатывается. Я не вижу какой-то полезной работы в этом случае, а потому предложил не добавлять пустой элемент в очередь.

В итоге метод ProcessorTaskMethod будет обрабатывать только реальные запросы.

Вместо такого цикла выполнения в ProcessorTaskMethod:

Код: c#
1.
2.
3.
4.
5.
- item1. ShouldBeProcessed==false, Отбросили
- item2. ShouldBeProcessed==false, Отбросили
- item3. ShouldBeProcessed==true, Обработали
- item4. ShouldBeProcessed==false, Отбросили
- item5. ShouldBeProcessed==true, Обработали



будет такое:
Код: c#
1.
2.
- item3. ShouldBeProcessed==true, Обработали
- item5. ShouldBeProcessed==true, Обработали



Без холостых проходов.
...
Рейтинг: 0 / 0
11.01.2016, 15:47
    #39144694
ProBiotek
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Как избежать race condition?
ой. Кажется я не так все понял.

У вас метод Send ставит задачу методу ProcessorTaskMethod, а потом возвращает результат его работы ?

Судя по всему эта строка указывает, что работа была завершена:
request.ProcessedWaitEvent.Set();

а в свойстве request.ResponseBytes заложено блокирование потока, до момента получения результата.

Интересная схема работы, но что-то мне подсказывает, что она весьма не эффективна. Получается, на то время, пока работает ProcessorTaskMethod, поток в методе Send блокируется намертво.

Огромная трата системных ресурсов !
Зачем тут тогда вообще нужны два потока и блокируемая очередь. Не проще ли было бы сделать все в одном потоке: в методе Send вызвать метод ProcessorTaskMethod.
...
Рейтинг: 0 / 0
11.01.2016, 15:51
    #39144698
Dima T
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Как избежать race condition?
Newbie001Dima TИМХУ надо добавить состояние "в обработке" проверять его перед отменой, если еще ждет, то "отмена", если обрабатывается (возможно уже обработался), то дожидаться окончания обработки и убивать результат обработки.
Да, только я туплю - как его добавить, какими объектами синхронизации воспользоваться?
Замени request.ShouldBeProcessed на request.ProcessState:
0 - ожидает обработки
1 - отменен
2 - в процессе обработки
3 - обработан

объяви ProcessState как volatile и меняй c помощью Intelocked

или просто в Lock оберни чтение и изменение
...
Рейтинг: 0 / 0
11.01.2016, 16:03
    #39144712
Newbie001
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Как избежать race condition?
ProBiotekой. Кажется я не так все понял.

У вас метод Send ставит задачу методу ProcessorTaskMethod, а потом возвращает результат его работы ?

Судя по всему эта строка указывает, что работа была завершена:
request.ProcessedWaitEvent.Set();

Да, именно так.
ProBiotekа в свойстве request.ResponseBytes заложено блокирование потока, до момента получения результата.

Ну не в свойстве ResponseBytes , но рядом. Извиняюсь, что сразу не выложил имплементацию NetworkRequest:
Код: c#
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
public class NetworkRequest 
    {
        public byte[] RequestBytes { get; private set; }
        public byte[] ResponseBytes { get;  set; }
        public bool ShouldBeProcessed { get; set; }
        private ManualResetEventSlim _resetEvent = new ManualResetEventSlim(false);

        public ManualResetEventSlim ProcessedWaitEvent
        {
            get { return _resetEvent; }
        }

        public NetworkRequest(byte[] requestDataBytes)
        {
            RequestBytes = requestDataBytes;
            ShouldBeProcessed = true;
        }
       
    }


ProBiotekИнтересная схема работы, но что-то мне подсказывает, что она весьма не эффективна.

Неэффективна - возможно, но требуется именно она... Метод Send вызывается параллельно разными потоками, и требуется, чтобы запросы обрабатывались в порядке их поступления. Можно конечно пометить метод атрибутом:
Код: c#
1.
2.
3.
4.
5.
    [MethodImpl(MethodImplOptions.Synchronized)]
    public byte[] Send(byte[] requestBytes)
    {
        ...
    }


и вроде как, будет такая же очередь. Но у меня уже проснулся академический интерес:)
...
Рейтинг: 0 / 0
11.01.2016, 16:06
    #39144716
Newbie001
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Как избежать race condition?
Dima T,
Спасибо за наводку! До volatile с interlocked я еще не дочитал, буду изучать:)
...
Рейтинг: 0 / 0
11.01.2016, 16:12
    #39144724
Dima T
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Как избежать race condition?
Как вариант: два флага
Код: c#
1.
2.
request.ShouldBeProcessed // должен быть обработан
request.Commit // Подтвержден


Основной поток:
ShouldBeProcessed = true; Commit = false;
ставит в очередь
по окончанию ставит или Commit = true или ShouldBeProcessed = false

Обработчик:
если ShouldBeProcessed == false игнорирует (как сейчас у тебя написано)
иначе обрабатывает, если в конце Commit = true то удаляет
в конце проверяет Commit == true (удаляет задание) или ShouldBeProcessed == false (отказывает результат)
иначе ставит в доп.очередь, периодически ее проверяет и дожидается Commit == true или ShouldBeProcessed == false

так никакой синхронизации не надо. Достаточно оба флага чтобы были volatile
...
Рейтинг: 0 / 0
11.01.2016, 16:33
    #39144747
ProBiotek
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Как избежать race condition?
Newbie001,

Тогда тоже дам наводку. Возможно раннюю, а может уже чувствуете ее необходимость.
Если уж занимаетесь академическим интересом и хотите эффективности. Рассмотрите возможность перевести схему работы в настоящую многопоточность, с использованием async/await. Это потребует существенной доработки. Но это стоит делать, только если у вас в настоящий момент к методу Send может быть одновременных обращений больше чем процессоров в системе - в этом случае будет создано "бутылочное горлышко", когда потоки простаивают, ожидая ответа.

В этом случае в методе Send вы сможете вместо
var waitResult = request.ProcessedWaitEvent.Wait(TimeSpan.FromMilliseconds(ReceiveTimeoutMilliseconds));

сделать что-то вроде
var waitResult = request.ProcessedWaitEvent. WaitAsync (TimeSpan.FromMilliseconds(ReceiveTimeoutMilliseconds));

WaitAsync это не стандартный метод.Есть одна сторонняя библиотечка, которая делает множество сущностей NetFramework async-friendly . Т.е. добавляет к ним поддержку асинхронности.

Вот эта библиотечка
https://github.com/StephenCleary/AsyncEx

а вот описание AsyncManualResetEvent - асинхронной версией ManualResetEventSlim
https://github.com/StephenCleary/AsyncEx/wiki/AsyncManualResetEvent

В этом случае можно существенно увеличить производительность (опять же - только если вас беспокоит бутылочное горлышко).
...
Рейтинг: 0 / 0
11.01.2016, 16:43
    #39144757
Newbie001
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Как избежать race condition?
ProBiotek,
Перевод на TPL - это будет следующий шаг. Сейчас хотя бы так заставить работать:)
...
Рейтинг: 0 / 0
11.01.2016, 19:19
    #39144911
Dima T
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Как избежать race condition?
Смотрю исходный код. Как понимаю логика такая: надо выполнить какую-то задачу за конкретное время, ждем, если не уложились - считаем что задача не решена.

Как понял один поток обработки очереди заданий, которую наполняют несколько потоков.

Если так, то тут все идеологически неверно. Т.е. стоит одному заданию завесить обработку и все запрашивающие тупо начнут отпадывать по таймауту, независимо от того долгий у них был запрос или нет. Главный косяк: ожидание ответа, т.е. синхронность при асинхронных операциях.
Должно быть примерно так: если не наступило время Ч, то запускаем задание, если по итогу выполнения задания наступило время Ч, то считаем результат недействительным. Тут создателю задания надо только указать это время Ч и забыть о результатах. Дальше вопрос что за задание и как несколько заданий параллелить, если можно, то каждое задание в свой таск, а дальше пусть менеджер потоков разбирается как это исполнять.

PS Если хочешь что-то ассинхронно делать - забудь про синхронность. Везде. Строй логику, интерфейс и т.д. исходя из того что результат будет в будущем и непонятно когда, а может и не будет. Тогда будет реально асинхронно.

PPS Самое сложное в асинхронном коде - отмена какой-либо операции, ну и остановка приложения (что есть куча отмен). Тут гораздо проще предусмотреть отдельную операцию под названием "отмена", т.е. не отмена операции, а операция "отмена".
...
Рейтинг: 0 / 0
11.01.2016, 21:01
    #39144971
Newbie001
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Как избежать race condition?
Dima T,
Нормальную асинхронность к сожалению не получится сделать.
Обработка на самом деле выполняется не в этом классе - есть внешний сервис, общение с которым идет по собственному протоколу поверх tcp. Каждый поток при обращении к сервису открывал отдельное tcp-соединение, засылал запрос и получал ответ, все работало. Сейчас товарищи с другой стороны просят нас не делать кучу одновременных подключений, а всегда держать одно активное подключение и через него гонять запросы. Такая вот бредовая постановка...
...
Рейтинг: 0 / 0
12.01.2016, 06:45
    #39145076
Dima T
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Как избежать race condition?
Каждый раз открыл/закрыл тоже нехорошо. Кроме затрат на установку соединения наверно еще есть затраты на авторизацию. Все это бесполезная нагрузка. Плюс выделение памяти для буфера каждого соединения.

По-хорошему надо пул соединений. Ограничить минимальное и максимальное количество одновременных соединений. Взял свободное и через него обратился. Свободных нет - открыл новое или подождал пока освободится. Если много соединений долго простаивает - закрыть часть.
...
Рейтинг: 0 / 0
12.01.2016, 10:29
    #39145186
zz118
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Как избежать race condition?
Dima TКаждый раз открыл/закрыл тоже нехорошо. Кроме затрат на установку соединения наверно еще есть затраты на авторизацию. Все это бесполезная нагрузка. Плюс выделение памяти для буфера каждого соединения.

рискну быть навязчивым....

Код: c#
1.
2.
3.
4.
5.
var requestsSubject = new Subject<Unit>();
var x = requestsSubject.ObserveOn(Scheduler.DispatcherScheduler)
                                .Select(x => tcpService.Do())
                                .SubscribeOn(Scheduler.NewThread)
                                .Subscribe(x => Handle());
...
Рейтинг: 0 / 0
12.01.2016, 12:14
    #39145331
ProBiotek
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Как избежать race condition?
Dima TPPS Самое сложное в асинхронном коде - отмена какой-либо операции, ну и остановка приложения (что есть куча отмен). Тут гораздо проще предусмотреть отдельную операцию под названием "отмена", т.е. не отмена операции, а операция "отмена".

Это делается через CancellationToken. Весьма удобно.
Нужно в каждый свой асинхронный метод передавать и эту структуру. И везде не забывать ее периодически отслеживать - не дана ли команда останова. Сейчас уже все ОРМы, вроде, прекрасно поддерживают асинхронность, и отмену. Так что программу обычно можно остановить очень быстро.
...
Рейтинг: 0 / 0
12.01.2016, 12:20
    #39145342
ProBiotek
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Как избежать race condition?
Newbie001,

zz118 +1.
zz118 дело говорит. RX (Reactive Extensions) как раз и предназначен для организации очередей для их последовательной обработки. Причем гибкость настройки у него большая. Там можно и пакетность настроить и задержки всякие, ограничить параллельность и т.д.. Обратите внимание, если захотите дорабатывать свою программу. RX может выступать ядром менеджера задач.
...
Рейтинг: 0 / 0
12.01.2016, 15:46
    #39145590
mikron
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Как избежать race condition?
Newbie001Dima TИМХУ надо добавить состояние "в обработке" проверять его перед отменой, если еще ждет, то "отмена", если обрабатывается (возможно уже обработался), то дожидаться окончания обработки и убивать результат обработки.
Да, только я туплю - как его добавить, какими объектами синхронизации воспользоваться?
Код: sql
1.
2.
3.
4.
5.
6.
// In Send
if(InterlockedExchange(request.Status, SKIP, AWAITING) != AWAITING)
  xxx ; // already in processing.  
// In worker thread
if(InterlockedExchange(request.Status, PROCESSING, AWAITING)!=AWAITING)
  xxx; // skip processing
...
Рейтинг: 0 / 0
Форумы / WinForms, .Net Framework [игнор отключен] [закрыт для гостей] / Как избежать race condition? / 21 сообщений из 21, страница 1 из 1
Целевая тема:
Создать новую тему:
Автор:
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]