powered by simpleCommunicator - 2.0.38     © 2025 Programmizd 02
Форумы / WinForms, .Net Framework [игнор отключен] [закрыт для гостей] / Отловить ошибки при многопоточном выполнении
13 сообщений из 13, страница 1 из 1
Отловить ошибки при многопоточном выполнении
    #40063935
vb_sub
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Всем привет, у меня есть обработчик, для которого нужно выполнить следующую функциональность.
1) Получить список ID-документов через http-запрос
2) Для каждого ID документа сделать http-запрос и распарсить его.
3) Получить распарсенные данные из всех документов.

Я реализую многопоточную обработку посредством использования System.Threading.Channels.
Код: c#
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
103.
104.
105.
106.
107.
108.
109.
110.
111.
112.
113.
114.
115.
       // Главная точка входа
	public async Task ProcessDocsAsync()
        {
	    //канал для получения и отправки ID-документов
            var docIDChannel = Channel.CreateUnbounded<Doc>(new UnboundedChannelOptions() { SingleReader = false,SingleWriter = true });
			
	    //канал, куда складываем значения из распарсенных JSON-документов
            var parseChannel = Channel.CreateUnbounded<(Doc docID, string uid)>();
			
            var taskList = new List<Task>();

            //читаем номера документов посредством http-запроса 
            taskList
            .Add(Task.Run(async () => await GetDocsNumbersAsync(docIDChannel.Writer, _apiProcessorSettings.GetDocsRoute()))
            .ContinueWith(t =>
            {
                docIDChannel.Writer.Complete();
                Console.ForegroundColor = ConsoleColor.Green;
                Console.WriteLine("DocReading Complete");
                Console.ForegroundColor = ConsoleColor.White;
            }));

           //создаем 5 задач, которые будут распарсивать документы
            var processList = Enumerable
                .Range(1, 5)
                .Select(_ => Task.Run(async () => await ParseDocByIsAsync(docIDChannel.Reader, parseChannel.Writer)));

            //информируем канал о том, что больше добавления элементов не произойдет
            var procTask = Task.WhenAll(processList).ContinueWith(t =>
            {
                parseChannel.Writer.Complete();
                Console.ForegroundColor = ConsoleColor.Green;
                Console.WriteLine("Parsing Complete");
                Console.ForegroundColor = ConsoleColor.White;
            });

            taskList.Add(procTask);

            // читаем распарсенные документы
            taskList
            .Add(Task.Run(async () => await ConsumeOutputProductsInConsoleAsync(parseChannel.Reader)));
			
            await Task.WhenAll(taskList);
            Console.WriteLine("Done");
        }
		
		
	//Получение документов
        private async Task GetDocsNumbersAsync(ChannelWriter<Doc> channelWriter, string route)
        {
            var сlient = _httpClientFactory.CreateClient("сlient");

            using var docStream = await сlient.GetStreamAsync(route);

            using JsonDocument jDocument = await JsonDocument.ParseAsync(docStream);
            var root = jDocument.RootElement.GetProperty("docs").EnumerateArray();

            foreach (var item in root)
            {
                await channelWriter.WriteAsync(new Doc(
                    item.GetProperty("number").GetString()));
            }
        }
			
	//парсинг json-документов	
        private async Task ParseDocByIsAsync(ChannelReader<Doc> docChannel,
            ChannelWriter<(Doc doc, string uid)> uitChannel)
        {
            while (await docChannel.WaitToReadAsync())
            {
                while (docChannel.TryRead(out var doc))
                {
                    try
                    {
                        Console.WriteLine($"start processing doc:{doc.DocID}, thread:{Thread.CurrentThread.ManagedThreadId}");

                        int accumulateCount = 0;
                        var сlient = _httpClientFactory.CreateClient("сlient");

                        var response = await сlient.GetAsync($"route/{doc.DocID}/", HttpCompletionOption.ResponseHeadersRead);

                        using Stream streamToReadFrom = await response.Content.ReadAsStreamAsync();
                        using JsonDocument jDocument = await JsonDocument.ParseAsync(streamToReadFrom);
                        var root = jDocument.RootElement;

                        //количество
                        var allCount = root.GetProperty("total").GetInt32(); /// вот здесь ошибка при многопоточном использовании

                        //парсим документ и в итоге отправляем распарсенные значение в канал
			///...
			await uitChannel.WriteAsync((doc, uit));
                    }
                    catch (Exception ex)
                    {
                        Console.ForegroundColor = ConsoleColor.Red;
                        Console.WriteLine("Error: " + ex.Message);
                        Console.ForegroundColor = ConsoleColor.White;
                    }
                }
            }
        }
			
        // чтение распарсенных документов			
        private async Task ConsumeOutputProductsInConsoleAsync(ChannelReader<(Doc doc, string uid)> uitChannel)
        {
            long cnt = 0;
            while (await uitChannel.WaitToReadAsync())
            {
                if (uitChannel.TryRead(out var doc))
                {
                    cnt++;
                }
            }
            Console.WriteLine($"Count is:{cnt}");
        }



Для вышеуказаного сниппета я получаю нежелательные side-эффекты, которые отсутствуют если я запускаю парсинг в режиме одной задачи
Код: c#
1.
2.
3.
 var processList = Enumerable
                .Range(1, 1)
                .Select(_ => Task.Run(async () => await ParseDocByIsAsync(docIDChannel.Reader, parseChannel.Writer)));


В методе ParseDocByIsAsync я делаю http-запрос, в результате которого возвращается JSON- документ, у которого в обязательном порядке есть свойство "total"- проверял много раз через Postman.
В результате выполнения кода я попадаю в секцию "Exception" с текстом ошибки об отсутствии свойства/токена "total" у Json-документа.
Я проверяю url, по которому запрашивается Json- он приходит корректный. Проверяю значение переменной "root" при возникновении ошибки- действительно именно это свойство отсутствует. Как будто один поток прочитал свойство, закорраптил его, положил в пул для дальнейшего выполнения, другой поток подхватил таску и у же не может его нормально прочитать.
Ошибка носит вероятностный характер и её вероятность возрастает с увеличением количества Task для обработки парсинга. При 5 тасках на парсинг вероятность ошибки около 90%, хотя бывало и успешное выполнение.
Также заметил, что один документ может обрабатываться несколькими потоками- возможно это просто перекладывание таски из одного в другой, но использование Channels обещает потокобезопасное получение элементов из канала и то что элемент не может быть получен срузу несколькими потоками. Хоть бери и на BlockCollection откатывайся. Подскажите, кто с такой ситуацией сталкивался. Спасибо
...
Рейтинг: 0 / 0
Отловить ошибки при многопоточном выполнении
    #40064036
Фотография Где-то в степи
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
vb_sub,
Зачем вы нам это показываете? Вы эксгибионист?
Зачем тут таски, при долгоиграющих операциях, в контексте их умопомрачительных колличествах?
У вас есть процедура загрузки, напишите ее тест и тестируйте, не разбираясь, глядя на хрустальный шар ... - чуйкой чувствую не реентерабельность(загадочно так)
...
Рейтинг: 0 / 0
Отловить ошибки при многопоточном выполнении
    #40064042
fkthat
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Для чего так люто смешивать async/await и ContinueWith. Для чего заворачивать вызовы асинхронных методов в асинхронные длегаты, а потом еще и вызывать их через Task.Run. Это не код, а говнокод.
...
Рейтинг: 0 / 0
Отловить ошибки при многопоточном выполнении
    #40064057
vb_sub
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Где-то в степи,
создаю таски, чтобы каждый json-документ парсился в отдельной задаче - они друг от друга не зависят, поэтому хочу сделать их выполнение параллельным.
Рентабельность здесь очень-высока-выполнение множества независимых задач, если запускаешь в отдельных тасках, то время выполнения уменьшается кратно.
...
Рейтинг: 0 / 0
Отловить ошибки при многопоточном выполнении
    #40064058
vb_sub
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
fkthat,
continueWith нужно для того, чтобы по завершению Task или группы Task закрыть канал и уведомить слушателей, что больше элементов не будет. Если это 1 Task, то канал можно закрыть прямо из метода, если же это группа Task, то необходимо дожидаться пока все задачи не завершатся.
Заворачивать в асинхронные делегаты приходится потому что внутри методов(ParseDocByIsAsync), для которых они вызываются есть await, соответственно я не могу сделать метод просто Task- приходится делать async Task.
...
Рейтинг: 0 / 0
Отловить ошибки при многопоточном выполнении
    #40064112
Фотография Где-то в степи
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
vb_sub,
ну если Вы не охотно создаете свои потоки.
воспользуйтесь WebClient
там множество изощренных рализаций, и есть неблокирующие выполнения, с возможностью отмены, событийная модель.
ваш код бы сократился до 10 строк, без вычурных каналов.
...
Рейтинг: 0 / 0
Отловить ошибки при многопоточном выполнении
    #40064312
Фотография hVostt
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
vb_sub,

Лютая забористая дичь :)
...
Рейтинг: 0 / 0
Отловить ошибки при многопоточном выполнении
    #40064341
vb_sub
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
hVostt,
какой вариант не дичь?
...
Рейтинг: 0 / 0
Отловить ошибки при многопоточном выполнении
    #40064360
Фотография Где-то в степи
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
vb_sub,
все придумали до нас, программировать не надо, просто копипаст и вперде.
код набранный копипастом

Код: c#
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
static void Main(string[] args)
        {
            MyDownloader myDownloader=new MyDownloader();
            myDownloader.
                AddUrl("http://soft.eurodir.ru/test-speed-100Mb.bin").
                AddUrl("http://soft-ru.eurodir.ru/test-speed-ru-100Mb.bin?w=22222").
                AddUrl("http://soft-ru.eurodir.ru/test-speed-ru-100Mb.bin?w=8888888888888").
                OnComplete+=(sender, dictionary) =>Console.WriteLine("************* download done ***********"); 
          
            myDownloader.Run();
            Console.ReadKey();
        }

        private sealed class MyDownloader
        {
            public event EventDownloadComplete OnComplete;
            private readonly Dictionary<string,string> _dictionaryResult=new Dictionary<string, string>();
            public MyDownloader AddUrl(string url) { 
                _dictionaryResult.Add(url,null);
                return this;
            }
            public void Run()
            {
                foreach (var keyValuePair in _dictionaryResult)
                   InnerDownloadWorker(keyValuePair.Key);
            }

            private void InnerDownloadWorker(string url)
            {
                var webClient = new WebClient();
                webClient.DownloadStringCompleted += (sender, args) =>
                {
                    lock (this)
                    {
                        _dictionaryResult = args.Result;
                        if (_dictionaryResult.All(a => a.Value != null))
                        {
                            OnCompleteCore(_dictionaryResult);
                        }
                    }
                };
                webClient.DownloadProgressChanged += (sender, args) => { Console.WriteLine($"{url} .............................. {args.ProgressPercentage}"); }; 
                webClient.DownloadStringAsync(new Uri($"{url}"));
            }

            private void OnCompleteCore(Dictionary<string, string> args)
            {
                OnComplete?.Invoke(this, args);
            }
            public delegate void EventDownloadComplete(object sender, Dictionary<string,string> args);
        }



...
Рейтинг: 0 / 0
Отловить ошибки при многопоточном выполнении
    #40064386
fkthat
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Где-то в степи
код набранный копипастом

Оно и видно. Код из начала двухтысячных.
...
Рейтинг: 0 / 0
Отловить ошибки при многопоточном выполнении
    #40064579
Фотография hVostt
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
vb_sub
hVostt,
какой вариант не дичь?


Попробуйте Dataflow , может понравится :)
...
Рейтинг: 0 / 0
Отловить ошибки при многопоточном выполнении
    #40064879
love_bach
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
fkthat
Где-то в степи
код набранный копипастом

Оно и видно. Код из начала двухтысячных.


и что с ним не так?
...
Рейтинг: 0 / 0
Отловить ошибки при многопоточном выполнении
    #40064931
Roman Mejtes
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
hVostt,
я вообще с нее балдею, так удобно и просто делать многопоточные комбайны, просто сказка. для интеграции, трансформации данные, просто сказка. на днях в 1 проекте внедрил, надо было 100500 файлов обработать, трансформировать данные и объединить в файлах порциями. Лучше не придумаешь.
...
Рейтинг: 0 / 0
13 сообщений из 13, страница 1 из 1
Форумы / WinForms, .Net Framework [игнор отключен] [закрыт для гостей] / Отловить ошибки при многопоточном выполнении
Целевая тема:
Создать новую тему:
Автор:
Закрыть
Цитировать
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]