powered by simpleCommunicator - 2.0.61     © 2026 Programmizd 02
Целевая тема:
Создать новую тему:
Автор:
Закрыть
Цитировать
Форумы / Программирование [игнор отключен] [закрыт для гостей] / c# 4.0. Многопоточность и блокировки
25 сообщений из 39, страница 1 из 2
c# 4.0. Многопоточность и блокировки
    #37624839
Testor1
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
Всем привет!

Помогите разобраться с многопоточностью и блокировками на c#.

Есть
1. массив строк StringArray
2. функция StringParser, которая парсит строку и записывает ее в DataTable.

В цикле для элементов массива StringArray вызываем StringParse.
Все работает корректно, если в функции прописана блокировка lock на весь код функции
StringParse(String myString)
{
lock (lockObject)
{
Код функции
}
}

Это означает, что функция StringParse не выполняется в цикле параллельно, а выполняется последовательно.
Как добиться параллельного выполнения StringParse ?
...
Рейтинг: 0 / 0
c# 4.0. Многопоточность и блокировки
    #37624879
Фотография ZyK_BotaN
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Testor1Всем привет!

Помогите разобраться с многопоточностью и блокировками на c#.

Есть
1. массив строк StringArray
2. функция StringParser, которая парсит строку и записывает ее в DataTable.

В цикле для элементов массива StringArray вызываем StringParse.
Все работает корректно, если в функции прописана блокировка lock на весь код функции
StringParse(String myString)
{
lock (lockObject)
{
Код функции
}
}

Это означает, что функция StringParse не выполняется в цикле параллельно, а выполняется последовательно.
Как добиться параллельного выполнения StringParse ?

ты сначала определись что не_должно работать параллельно, там и делай лок.
...
Рейтинг: 0 / 0
c# 4.0. Многопоточность и блокировки
    #37624889
b4
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
b4
Гость
...
Рейтинг: 0 / 0
c# 4.0. Многопоточность и блокировки
    #37625082
Testor1
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
ZyK_BotaNты сначала определись что не_должно работать параллельно, там и делай лок.

текст функции приблизительно такой
DataTable table - глобальная переменная

{
DataRow row = table.NewRow()
String[] a = myString.split('|');

row[0] = a[1];
row[1] = Decimal.parse([5]);
row[2] = Datetime.parse([10]);

String[] b = a[20].split(':');


row[3] = b[0];
row[4] = b[30];

table.Add(row);
}

Что здесь можно не лочить ?
...
Рейтинг: 0 / 0
c# 4.0. Многопоточность и блокировки
    #37625206
Фотография ZyK_BotaN
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Testor1ZyK_BotaNты сначала определись что не_должно работать параллельно, там и делай лок.

текст функции приблизительно такой
DataTable table - глобальная переменная

{
DataRow row = table.NewRow()
String[] a = myString.split('|');

row[0] = a[1];
row[1] = Decimal.parse([5]);
row[2] = Datetime.parse([10]);

String[] b = a[20].split(':');


row[3] = b[0];
row[4] = b[30];

table.Add(row);
}

Что здесь можно не лочить ?
не уверен на 100% в своей правоте, но считаю что NewRow лочить не нужно, а потому лочить только Add .


но даже если вздумаешь лочить NewRow , то лочь его отдельно.
...
Рейтинг: 0 / 0
c# 4.0. Многопоточность и блокировки
    #37625223
Testor1
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
ZyK_BotaNне уверен на 100% в своей правоте, но считаю что NewRow лочить не нужно, а потому лочить только Add .


но даже если вздумаешь лочить NewRow , то лочь его отдельно.

Выигрыш от разлочивания - NewRow - минимальный.

Самые тяжелые операции - это split и присвоение текстовых строк.
...
Рейтинг: 0 / 0
c# 4.0. Многопоточность и блокировки
    #37625227
Фотография ZyK_BotaN
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Testor1ZyK_BotaNне уверен на 100% в своей правоте, но считаю что NewRow лочить не нужно, а потому лочить только Add .


но даже если вздумаешь лочить NewRow , то лочь его отдельно.

Выигрыш от разлочивания - NewRow - минимальный.

Самые тяжелые операции - это split и присвоение текстовых строк.

дак сплит то лочить и не нужно.

у тебя доступ к общему ресурсу в двух строках

NewRow
Add

остальной код может работать параллельно.
...
Рейтинг: 0 / 0
c# 4.0. Многопоточность и блокировки
    #37625294
CTPAHHOE MECTO
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
См хелп по datatable & datarow

MSDNThread Safety

--------------------------------------------------------------------------------

This type is safe for multithreaded read operations. You must synchronize any write operations.

Надо определиться является ли
создание строчки
добавление ее в таблицу
заполнение ее данными
модифицирующими операциями по отношению к DataTable. Самое осторожное - считать все. Самое неосторожное - считать только добавление.

Я бы сделал сначала разбор строчки, а добавление в конце в синхросекции

Код: c#
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
String[] a = myString.split('|');

String[] b = a[20].split(':');

var row1 = Decimal.parse([5]);
var row2 = Datetime.parse([10]);

lock (lockObject)
{
DataRow row = table.NewRow() 
row[0] = a[1];
row[1] = row1;
row[2] = row2;

row[3] = b[0];
row[4] = b[30];

table.Add(row);
}
...
Рейтинг: 0 / 0
c# 4.0. Многопоточность и блокировки
    #37625322
Фотография ZyK_BotaN
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
CTPAHHOE MECTOСм хелп по datatable & datarow

MSDNThread Safety

--------------------------------------------------------------------------------

This type is safe for multithreaded read operations. You must synchronize any write operations.

Надо определиться является ли
создание строчки
добавление ее в таблицу
заполнение ее данными
модифицирующими операциями по отношению к DataTable. Самое осторожное - считать все. Самое неосторожное - считать только добавление.

Я бы сделал сначала разбор строчки, а добавление в конце в синхросекции

Код: c#
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
String[] a = myString.split('|');

String[] b = a[20].split(':');

var row1 = Decimal.parse([5]);
var row2 = Datetime.parse([10]);

lock (lockObject)
{
DataRow row = table.NewRow() 
row[0] = a[1];
row[1] = row1;
row[2] = row2;

row[3] = b[0];
row[4] = b[30];

table.Add(row);
}



согласен. главное с блока блокировки - вынести парсинг.
...
Рейтинг: 0 / 0
c# 4.0. Многопоточность и блокировки
    #37625334
Testor1
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
ZyK_BotaNCTPAHHOE MECTOСм хелп по datatable & datarow

пропущено...


Надо определиться является ли
создание строчки
добавление ее в таблицу
заполнение ее данными
модифицирующими операциями по отношению к DataTable. Самое осторожное - считать все. Самое неосторожное - считать только добавление.

Я бы сделал сначала разбор строчки, а добавление в конце в синхросекции

Код: c#
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
String[] a = myString.split('|');

String[] b = a[20].split(':');

var row1 = Decimal.parse([5]);
var row2 = Datetime.parse([10]);

lock (lockObject)
{
DataRow row = table.NewRow() 
row[0] = a[1];
row[1] = row1;
row[2] = row2;

row[3] = b[0];
row[4] = b[30];

table.Add(row);
}



согласен. главное с блока блокировки - вынести парсинг.


Попробую твой вариант.

Почему ты сплит вынес из лока? и почему ты использовал var а не Decimal ?

Кстати в реале я передаю не строку, а индекс от массива строк.
Строки в среднем - 5 Килобайт.
Подозреваю, что передача столь длинной строки параметров в функцию, требует дополнительныйх рессурсов.
В моей задаче нужно импортировать 20 000 000 строк.

По этой причине, я создал глобальный массив строк (N-строк).
Я подгружаю новые строки в массив и параллельно делаю парсинг строк.


---
Хочу попробывать другую фишку
Создать массив Action на N элементов - но в каждом function вызывать не функцию, а указывать тело функции целиком.
Полагаю, что для каждого элемента Action будет сохранен свой код. В этом случае, мне не потребуется использовать лок, хотя мне придется за это заплатить "дополнительной памятью"
...
Рейтинг: 0 / 0
c# 4.0. Многопоточность и блокировки
    #37625344
Фотография ZyK_BotaN
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Testor1
Почему ты сплит вынес из лока? и почему ты использовал var а не Decimal ?


что-бі он віполнялся параллельно.


Testor1
Кстати в реале я передаю не строку, а индекс от массива строк.
Строки в среднем - 5 Килобайт.
Подозреваю, что передача столь длинной строки параметров в функцию, требует дополнительныйх рессурсов.

нет, строка копироваться не будет - передается только ссылка на нее. смело передавай в качестве параметра.

Testor1Полагаю, что для каждого элемента Action будет сохранен свой код. В этом случае, мне не потребуется использовать лок, хотя мне придется за это заплатить "дополнительной памятью"

[/quote]
используй пул, что-бы не выйти за рамки используемой памяти.
...
Рейтинг: 0 / 0
c# 4.0. Многопоточность и блокировки
    #37625423
CTPAHHOE MECTO
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
Testor1Почему ты сплит вынес из лока?


Чтобы парзить строчку не надо ждать освобождения. Парзить можно и параллельно. Интересно также, откуде берется myString - может его выборку тоже надо лочить.

Локи используется тогда, когда есть доступ к общм данным. Так как каждый тред парзит свою строку, то сплит можно делать пареллельно. DataTable же общий - вставку в него надо синхронизировать.

и почему ты использовал var а не Decimal ?


мне было влом смотреть возвращаемый тип функции и его писать.

Кстати в реале я передаю не строку, а индекс от массива строк.
Строки в среднем - 5 Килобайт.
Подозреваю, что передача столь длинной строки параметров в функцию, требует дополнительныйх рессурсов.
В моей задаче нужно импортировать 20 000 000 строк.


Во всех нормальных языках строки передаются по ссылке и копирование происходит только при получении измененной строки (lда и то не всегда).

По этой причине, я создал глобальный массив строк (N-строк).
Я подгружаю новые строки в массив и параллельно делаю парсинг строк.


не стоит ли воспрользоваться Queue ?

Обратите внимание, что можно использовать PLinq

Обратите внимание, на все методs и свойства в название которых входит подстрока "sync"
Хочу попробывать другую фишку
Создать массив Action на N элементов - но в каждом function вызывать не функцию, а указывать тело функции целиком.
Полагаю, что для каждого элемента Action будет сохранен свой код. В этом случае, мне не потребуется использовать лок, хотя мне придется за это заплатить "дополнительной памятью"

Я не понимаю, как связан лок и тело функции - лок возникает тогда, когда есть общие данные для нескольких тредов. Вам в лбом случае придется составлять из строчек DataTable, а это придется синхронизировать.

Я бы на вашем месте изучил Task Parallel Library, в частности, Plinq
...
Рейтинг: 0 / 0
c# 4.0. Многопоточность и блокировки
    #37625428
Фотография ZyK_BotaN
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
CTPAHHOE MECTOне стоит ли воспрользоваться Queue ?


тоже хотел это предложить.
...
Рейтинг: 0 / 0
c# 4.0. Многопоточность и блокировки
    #37625429
Testor1
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
ZyK_BotaN,

Пока пошел по твоему методу. Попозже опробую свой.
Есть еще один вопрос.

Я хочу, чтобы bulkInsert в базу происходит в асинхронном режиме.
Можно ли обойтись без создания массива DataTable[] table?
То есть я планирую каждую порцию данных обработанных данных записывать в отдельную таблицу, перед записью в базу.

Текущий код, без массива.


Код: c#
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
                    using (StreamReader sr = new StreamReader(args[4]))
                    {

                        StartTimer = DateTime.Now;
                        IndexOfRecords = 0;

                        Action<object> taskAction = (object obj) =>
                        {
                            importRecord((Int32)obj);
                        };
                        

                        while ((Records[IndexOfRecords] = sr.ReadLine()) != null)
                        {

                            tasks[IndexOfRecords] = Task.Factory.StartNew(taskAction, IndexOfRecords);


                            if ((IndexOfRecords + 1) % ReadBatchSize == 0)
                            {
                                Task.WaitAll(tasks);
                                Records = new String[ReadBatchSize];
                                IndexOfRecords = 0;
                            }
                            else
                            {
                                IndexOfRecords++;
                            }

                            if ((++TotalRecords) % WriteBatchSize == 0)
                            {
                                Task.WaitAll(tasks);
                                bulkCopy.WriteToServer(table);
                                table.Clear();
                                Console.WriteLine("Records: {0} CurrentTime: {1}", TotalRecords, DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"));
                            }
                        }
                    }
                    if (IndexOfRecords % ReadBatchSize != 0)
                    {
                        Task.WaitAll(tasks);
                        bulkCopy.WriteToServer(table);
                    }

                    bulkCopy.Close();
                    table.Clear();

                    EndTimer = DateTime.Now;
...
Рейтинг: 0 / 0
c# 4.0. Многопоточность и блокировки
    #37625503
Testor1
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
Testor1,

Я не понял смысла использования Queue ?
Массив вроде бы работает быстрее и он меня полностью устраивает.

Код: c#
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
        private static void importRecord(Int32 Index)
        {
            String[] Columns = new String[11];
            Int32[] Value_ID = new Int32[20];
            Decimal[] Value_Changes = new Decimal[20];

            fSplit(Records[Index], '|', Columns);
            Records[Index] = null;
            
            String[] Values = Columns[10].Split(']'); 

            Int32 Id = 0;

            foreach(String Value in Values)
            {
                if(!String.IsNullOrEmpty(Value))
                {
                    String[] Value_structure = Value.Split('~');
                    Value_ID[Id] = Int32.Parse(Value_structure[0].Remove(0, 1));
                    Value_Changes[Id] = Decimal.Parse(Value_structure[2], ci);
                    Id++;
                }
            }

            var row1 = Convert.ToDateTime(Columns[1], ruRuCulture);
            var row4 = Decimal.Parse(Columns[4], ci);
            var row5 = Decimal.Parse(Columns[5], ci);
            var row6 = Decimal.Parse(Columns[6], ci);
            var row9 = String.IsNullOrEmpty(Columns[9]) ? Convert.DBNull : Decimal.Parse(Columns[9], ci);

            lock (ObjectLock)
            {
                
                DataRow row = table.NewRow();
                row[0] = Columns[0]; 
                row[1] = row1; 
                row[2] = Columns[2]; 
                row[3] = Columns[3]; 
                row[4] = row4; 
                row[5] = row5; 
                row[6] = row6; 
                row[7] = Columns[7]; 
                row[8] = Columns[8]; 
                row[9] = row9; 

                for (Int32 i = 0; i < Id; i++)
                {
                    row[i + 10] = Value_ID[i];
                    row[i + 30] = Value_Changes[i];
                } 
                
                table.Rows.Add(row);
            }
        }

        private static void fSplit(String src, char delim, String[] output)
        {
            int index = 0;
            int lindex = 0;
            int columnID = 0;
            int arrayID = 0;
            int[] array = { 0, 1, 3, 4, 5, 12, 16, 23, 25, 27, 30 };
            int i;

            while ((index = src.IndexOf(delim, lindex)) != -1)
            {
                for (i = 0; i < 11 && array[i] != columnID; i++) ;
                if (i < 11) output[arrayID++] = src.Substring(lindex, index - lindex);
                if (columnID >= 30) break;
                lindex = index + 1;
                index++;
                columnID++;
            }
        } 
...
Рейтинг: 0 / 0
c# 4.0. Многопоточность и блокировки
    #37625508
Testor1
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
CTPAHHOE MECTO


Я не понимаю, как связан лок и тело функции - лок возникает тогда, когда есть общие данные для нескольких тредов. Вам в лбом случае придется составлять из строчек DataTable, а это придется синхронизировать.

Я бы на вашем месте изучил Task Parallel Library, в частности, Plinq

Согласен. Разобрался с блокировкой. Мой метод бессмысленен из-за лока при добавлению в таблицу.

Попытаюсь разобраться с plinq, хотя не очень понимаю, как он в конкретном случае может помочь. Где гарантия, что он сможет более эфективно решить мою задачу, чем мой код?
...
Рейтинг: 0 / 0
c# 4.0. Многопоточность и блокировки
    #37625530
локфри-кюю
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
ZyK_BotaNCTPAHHOE MECTOне стоит ли воспрользоваться Queue ?


тоже хотел это предложить.

тут не кюю нужен, а локфри-кюю
читать про Interlocked
...
Рейтинг: 0 / 0
c# 4.0. Многопоточность и блокировки
    #37625560
CTPAHHOE MECTO
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
Testor1Попытаюсь разобраться с plinq, хотя не очень понимаю, как он в конкретном случае может помочь. Где гарантия, что он сможет более эфективно решить мою задачу, чем мой код?

Никакой гарантии нет, просто мне кажется это инструмент более высокого уровня и код может быть компактнее.

По поводу Queue. У вас, насколько я понял происходит чтение пакета строк с параллельной обработкой, а затем происходит ожидание их разбора и сброс в БД. При этом может быть простой, когда, например, нитью которая разбирала первую строку уже справилась со своей работой и, теоретически, могла заняться следующей строкой. Но, так как, происходит ожидание окончания работы всех нитей пула, она простаивает.

Можно было бы сделать примерно так Читалка |> Парсилки |> Писалка, где |> это очереди. Читалка закидывает строки во входную очередь для нитей разбора, они разбирают и перекидывают результат в очередь для писалки, которая, в свою очередь, чиатет результат разбора и кладет в базу.

Тогда не будел лишних синхронизаций.

PS. Я никакой не эксперт в параллельной обработке, просто развлекаюсь
...
Рейтинг: 0 / 0
c# 4.0. Многопоточность и блокировки
    #37626116
Testor1
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
CTPAHHOE MECTO,

Изменил код, но есть не понимание.
Я создал глобальный массив rows. Не могу понять почему мне нужно лочить его на момент записи? Ведь данные записываются в разные "ячейки" разными тасками. Если не лочить массив, то в базу попадают искаженные данные.

--
Я вынужден загружать и обрабатывать строки порциями
1. мне нужно использовать BulkCopy поскольку он самый быстрый способ для загрузки данных в базу. А он в свою очередь будет корректно работать только с набором не изменяющихся записей.
2. Чем больше массив tasks - тем больше тратиться времени и рессурсов на его обслуживание.
--

Кстати, по производительности я пока особого выигрыша не получил, если сравнивать со стандартным подходом без использования task.


Код: c#
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
103.
104.
105.
106.
107.
108.
109.
110.
111.
112.
113.
114.
115.
116.
117.
118.
119.
120.
121.
122.
123.
124.
125.
126.
127.
128.
129.
130.
131.
132.
133.
134.
135.
136.
137.
138.
139.
140.
141.
142.
143.
144.
145.
146.
147.
148.
149.
150.
151.
152.
153.
154.
155.
156.
157.
158.
159.
160.
161.
162.
163.
164.
165.
166.
167.
168.
169.
170.
171.
172.
173.
174.
175.
176.
177.
178.
179.
180.
181.
182.
183.
184.
185.
186.
187.
188.
189.
190.
191.
192.
193.
194.
195.
196.
197.
198.
199.
200.
201.
202.
203.
204.
205.
206.
207.
class Program
{
    private static Object ObjectLock = new Object();

    private static DataTable table;
    private static SqlBulkCopy bulkCopy;
    private static String[] Records;
    private static CultureInfo ruRuCulture;
    private static CultureInfo ci;

    private static DataRow[] rows;

    private class Variables
    {
        public Int32 IndexOfRead { get; set; }
        public Int32 IndexOfWrite { get; set; }
    }


    static void Main(string[] args)
    {

        String SqlServer, SqlDatabase;

        Int32 ReadBatchSize, WriteBatchSize;
        Int32 IndexOfReadRecords, IndexOfWriteRecords;
        Int64 TotalRecords;
        Task[] tasks;

        DateTime StartTimer;
        DateTime EndTimer;

        try
        {

            ci = CultureInfo.InvariantCulture.Clone() as CultureInfo;
            ci.NumberFormat.NumberDecimalSeparator = ".";
            ruRuCulture = CultureInfo.GetCultureInfo("ru-Ru");

            SqlServer = args[0];
            SqlDatabase = args[1];

            table = new DataTable();
            bulkCopy = new SqlBulkCopy(GetConnectionString(SqlServer, SqlDatabase));

            SqlServer = args[0];
            SqlDatabase = args[1];

            ReadBatchSize = Convert.ToInt32(args[2]);
            WriteBatchSize = Convert.ToInt32(args[3]);
            Records = new String[ReadBatchSize];
            tasks = new Task[ReadBatchSize];

            MakeTable(ref table);
            ColumnMappings(ref bulkCopy);

            TotalRecords = 0;

            using (bulkCopy)
            {

                using (StreamReader sr = new StreamReader(args[4]))
                {

                    StartTimer = DateTime.Now;
                    IndexOfReadRecords = 0;
                    IndexOfWriteRecords = 0;

                    Action<object> taskAction = (object obj) => { importRecord((Variables)obj); };

                    rows = new DataRow[WriteBatchSize];
                    for (Int32 i = 0; i < WriteBatchSize; i++) { rows[i] = table.NewRow(); }

                    while ((Records[IndexOfReadRecords] = sr.ReadLine()) != null)
                    {

                        Variables Indexes = new Variables();
                        Indexes.IndexOfRead = IndexOfReadRecords;
                        Indexes.IndexOfWrite = IndexOfWriteRecords;

                        tasks[IndexOfReadRecords] = Task.Factory.StartNew(taskAction, Indexes);

                        if ((IndexOfReadRecords + 1) >= ReadBatchSize)
                        {
                            Task.WaitAll(tasks);
                            Records = new String[ReadBatchSize];
                            IndexOfReadRecords = 0;
                        }
                        else
                            IndexOfReadRecords++;

                        TotalRecords++;

                        if ((IndexOfWriteRecords + 1) >= WriteBatchSize)
                        {
                            Task.WaitAll(tasks);
                            bulkCopy.WriteToServer(rows);
                            Console.WriteLine("Records: {0} CurrentTime: {1}", TotalRecords, DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"));
                            IndexOfWriteRecords = 0;
                        }
                        else
                            IndexOfWriteRecords++;

                    }

                }
                if (IndexOfWriteRecords > 0)
                {
                    Task.WaitAll(tasks);
                    Array.Resize(ref rows, IndexOfWriteRecords);
                    bulkCopy.WriteToServer(rows);
                }

                bulkCopy.Close();

                EndTimer = DateTime.Now;
                Console.WriteLine("Records: {0}\nStartTime: {1}\nExecution Time: {2}", TotalRecords, StartTimer.ToString("yyyy-MM-dd HH:mm:ss"), new TimeSpan(EndTimer.Ticks - StartTimer.Ticks));
                Console.ReadKey();

            }
        }
        catch (AggregateException ae)
        {
            foreach (var e in ae.InnerExceptions)
            {
                Console.WriteLine("Import error:");
                Console.WriteLine(e.Message);
                Console.ReadKey();
            }
        }
        catch (Exception e)
        {
            Console.WriteLine("Import error:");
            Console.WriteLine(e.Message);
            Console.ReadKey();
        }

    }


    private static void importRecord(Variables objIndexes)
    {
        Int32 IndexRead = objIndexes.IndexOfRead;
        Int32 IndexWrite = objIndexes.IndexOfWrite;

        Object[] rowArray = new Object[50];
        String[] Columns = new String[11];
        Int32[] Value_ID = new Int32[20];
        Decimal[] Value_Changes = new Decimal[20];

        fSplit(Records[IndexRead], '|', Columns);
        Records[IndexRead] = null;

        rowArray[0] = Columns[0]; 
        rowArray[1] = Convert.ToDateTime(Columns[1], ruRuCulture); 
        rowArray[2] = Columns[2]; 
        rowArray[3] = Columns[3]; 
        rowArray[4] = Decimal.Parse(Columns[4], ci); 
        rowArray[5] = Decimal.Parse(Columns[5], ci); 
        rowArray[6] = Decimal.Parse(Columns[6], ci); 
        rowArray[7] = Columns[7]; 
        rowArray[8] = Columns[8]; 
        rowArray[9] = String.IsNullOrEmpty(Columns[9]) ? Convert.DBNull : Decimal.Parse(Columns[9], ci); 

        String[] Values = Columns[10].Split(']'); 

        Int32 Id = 10;

        foreach (String Value in Values)
        {
            if (!String.IsNullOrEmpty(Value))
            {
                String[] Value_structure = Value.Split('~');
                rowArray[Id + 0] = Int32.Parse(Value_structure[0].Remove(0, 1));
                rowArray[Id + 20] = Decimal.Parse(Value_structure[2], ci);
                Id++;
            }
        }

        lock (ObjectLock)
        {
            rows[IndexWrite].ItemArray = rowArray;
        }
    }

    private static void fSplit(String src, char delim, String[] output)
    {
        int index = 0;
        int lindex = 0;
        int columnID = 0;
        int arrayID = 0;
        int[] array = { 0, 1, 3, 4, 5, 12, 16, 23, 25, 27, 30 };
        int i;

        while ((index = src.IndexOf(delim, lindex)) != -1)
        {
            for (i = 0; i < 11 && array[i] != columnID; i++) ;
            if (i < 11) output[arrayID++] = src.Substring(lindex, index - lindex);
            if (columnID >= 30) break;
            lindex = index + 1;
            index++;
            columnID++;
        }
    }


}
...
Рейтинг: 0 / 0
c# 4.0. Многопоточность и блокировки
    #37626199
CTPAHHOE MECTO
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
Testor1CTPAHHOE MECTO,

Изменил код, но есть не понимание.
Я создал глобальный массив rows. Не могу понять почему мне нужно лочить его на момент записи? Ведь данные записываются в разные "ячейки" разными тасками. Если не лочить массив, то в базу попадают искаженные данные.


Ну мы не знаем как оно внутри устроено. Например для того, чтобы добавить строку может быть надо
- прочитать счетчик строк
- увеличить его на единиуцу
- записать обратьно

Если перед вторым шагом другая нить прочитает счетчик строк с той же целью то обе нити запишут в одну и ту же строку.

Я вынужден загружать и обрабатывать строки порциями
1. мне нужно использовать BulkCopy поскольку он самый быстрый способ для загрузки данных в базу. А он в свою очередь будет корректно работать только с набором не изменяющихся записей.


Ну как только обнаруживается что в DataTable записали достаточно строчек можно создавать новую DataTable а старую передавать записывающей нити.

2. Чем больше массив tasks - тем больше тратиться времени и рессурсов на его обслуживание.
--

Кстати, по производительности я пока особого выигрыша не получил, если сравнивать со стандартным подходом без использования task.


Имеется ввиду без распараллеливания или с распараллеливанием через ручное содание нитей и т.д.?

Можено попробовать накапливать строки порциями - чтобы каждый таск обрабатывал пачку строк.

Вообще, уже можно посмотреть, как оно работает - сколько процентов жрет разбор, сколько чтение и сколько запись
...
Рейтинг: 0 / 0
c# 4.0. Многопоточность и блокировки
    #37626237
Кстати, можно запись в базу отдельно вынести. Не уверен что прокатит. Но создать отдельную таску для записи базу и связать с таксами по разбору при помощи ContinueWhenAll
...
Рейтинг: 0 / 0
c# 4.0. Многопоточность и блокировки
    #37627854
Testor1
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
СТРАННОЕ МЕСТОКстати, можно запись в базу отдельно вынести. Не уверен что прокатит. Но создать отдельную таску для записи базу и связать с таксами по разбору при помощи ContinueWhenAll

Чем дальше в лес тем злее волки ....

Судя по экспериментам выгоды от асинхронной обработки и загрузки данных в базу минимальны или их вообще нет.
Слабое место - это получения результатов из нескольких потоков. Блокировки сводят на нет, все преймущества PPL при решении данной задачи.

Постараюсь, ускорить алгоритм парсинга строки и отказываюсь от асинхронной загрузки. Возможно придется переключиться на C++
...
Рейтинг: 0 / 0
c# 4.0. Многопоточность и блокировки
    #37627974
Testor1,

я не знаю какого размера строчки, но имхо одна строчка - слишком мало для того,чтобы выделять в отдельную таску. Попробуйте тупо прочитать побольше строчек, обработать их одним таском и вставить балк инсертом в отдельном рамке

Впрочем, я не знаю
...
Рейтинг: 0 / 0
c# 4.0. Многопоточность и блокировки
    #37628027
Testor1
Судя по экспериментам выгоды от асинхронной обработки и загрузки данных в базу минимальны или их вообще нет.


Тут не очень много асинхронности

Код: c#
1.
2.
Task.WaitAll(tasks); // подождем пока все разберется, при этом ничего другого не делаем (почему)
bulkCopy.WriteToServer(rows); // теперь запишем все на сервер, в это время ничего другого не делаем (почему)



Фактически, распаралелен только разбор разных строчек и немного разбор с чтением (и то не до конца - как только наьирается первый пакет, чтение останавливается и ждет до полной записи). Я думаю, если распаралеллить разбор запись и чтение и увеличить количество строчек, разбираемых зараз, можно увидеть выигрыш. Хотя точно не скажу
...
Рейтинг: 0 / 0
c# 4.0. Многопоточность и блокировки
    #37628071
Testor1
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
СТРАННОЕ МЕСТОTestor1
Судя по экспериментам выгоды от асинхронной обработки и загрузки данных в базу минимальны или их вообще нет.


Тут не очень много асинхронности

Код: c#
1.
2.
Task.WaitAll(tasks); // подождем пока все разберется, при этом ничего другого не делаем (почему)
bulkCopy.WriteToServer(rows); // теперь запишем все на сервер, в это время ничего другого не делаем (почему)



Фактически, распаралелен только разбор разных строчек и немного разбор с чтением (и то не до конца - как только наьирается первый пакет, чтение останавливается и ждет до полной записи). Я думаю, если распаралеллить разбор запись и чтение и увеличить количество строчек, разбираемых зараз, можно увидеть выигрыш. Хотя точно не скажу

Я вынужден ждать завершения Task.WaitAll(tasks);, поскольку bulkCopy.WriteToServer(rows); не работает корректно с массивом rows, который меняется в ассинхроном режиме.

Была идея создать двумерный массив DataRows[индекс набора записей][индекс записи].
Я полагал, что пока один набор записей копируется на сервер, можно грузить в следующий набор записей. НО, это не работает корректно без lock :(

Если поштучно грузить строку в базу в ассинхроном режиме, то есть вероятность просто уложить базу или сделать ее недоступной.

В последнем эксперименте, я отказался от записи в базу данных и просто сравнил работу загрузки и обработки данных.
В классическом режиме - загрузка и обработка 100 000 строк занимала 3-4 секунды, а в ассинхроном 5-6 секунд. Функция обработки строк оставалась не изменной за исключением в синхроном режим я удалял определение lock (object).

Средний размер строки 700 байт.
Загружаю не все поля, а только нужные.
Одно из полей составное и требует отдельного парсинга.
...
Рейтинг: 0 / 0
25 сообщений из 39, страница 1 из 2
Форумы / Программирование [игнор отключен] [закрыт для гостей] / c# 4.0. Многопоточность и блокировки
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]