Гость
Целевая тема:
Создать новую тему:
Автор:
Форумы / Программирование [игнор отключен] [закрыт для гостей] / c# 4.0. Многопоточность и блокировки / 25 сообщений из 39, страница 1 из 2
20.01.2012, 22:30
    #37624839
Testor1
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
c# 4.0. Многопоточность и блокировки
Всем привет!

Помогите разобраться с многопоточностью и блокировками на c#.

Есть
1. массив строк StringArray
2. функция StringParser, которая парсит строку и записывает ее в DataTable.

В цикле для элементов массива StringArray вызываем StringParse.
Все работает корректно, если в функции прописана блокировка lock на весь код функции
StringParse(String myString)
{
lock (lockObject)
{
Код функции
}
}

Это означает, что функция StringParse не выполняется в цикле параллельно, а выполняется последовательно.
Как добиться параллельного выполнения StringParse ?
...
Рейтинг: 0 / 0
20.01.2012, 23:24
    #37624879
ZyK_BotaN
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
c# 4.0. Многопоточность и блокировки
Testor1Всем привет!

Помогите разобраться с многопоточностью и блокировками на c#.

Есть
1. массив строк StringArray
2. функция StringParser, которая парсит строку и записывает ее в DataTable.

В цикле для элементов массива StringArray вызываем StringParse.
Все работает корректно, если в функции прописана блокировка lock на весь код функции
StringParse(String myString)
{
lock (lockObject)
{
Код функции
}
}

Это означает, что функция StringParse не выполняется в цикле параллельно, а выполняется последовательно.
Как добиться параллельного выполнения StringParse ?

ты сначала определись что не_должно работать параллельно, там и делай лок.
...
Рейтинг: 0 / 0
20.01.2012, 23:32
    #37624889
b4
b4
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
c# 4.0. Многопоточность и блокировки
...
Рейтинг: 0 / 0
21.01.2012, 12:52
    #37625082
Testor1
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
c# 4.0. Многопоточность и блокировки
ZyK_BotaNты сначала определись что не_должно работать параллельно, там и делай лок.

текст функции приблизительно такой
DataTable table - глобальная переменная

{
DataRow row = table.NewRow()
String[] a = myString.split('|');

row[0] = a[1];
row[1] = Decimal.parse([5]);
row[2] = Datetime.parse([10]);

String[] b = a[20].split(':');


row[3] = b[0];
row[4] = b[30];

table.Add(row);
}

Что здесь можно не лочить ?
...
Рейтинг: 0 / 0
21.01.2012, 16:45
    #37625206
ZyK_BotaN
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
c# 4.0. Многопоточность и блокировки
Testor1ZyK_BotaNты сначала определись что не_должно работать параллельно, там и делай лок.

текст функции приблизительно такой
DataTable table - глобальная переменная

{
DataRow row = table.NewRow()
String[] a = myString.split('|');

row[0] = a[1];
row[1] = Decimal.parse([5]);
row[2] = Datetime.parse([10]);

String[] b = a[20].split(':');


row[3] = b[0];
row[4] = b[30];

table.Add(row);
}

Что здесь можно не лочить ?
не уверен на 100% в своей правоте, но считаю что NewRow лочить не нужно, а потому лочить только Add .


но даже если вздумаешь лочить NewRow , то лочь его отдельно.
...
Рейтинг: 0 / 0
21.01.2012, 17:22
    #37625223
Testor1
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
c# 4.0. Многопоточность и блокировки
ZyK_BotaNне уверен на 100% в своей правоте, но считаю что NewRow лочить не нужно, а потому лочить только Add .


но даже если вздумаешь лочить NewRow , то лочь его отдельно.

Выигрыш от разлочивания - NewRow - минимальный.

Самые тяжелые операции - это split и присвоение текстовых строк.
...
Рейтинг: 0 / 0
21.01.2012, 17:26
    #37625227
ZyK_BotaN
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
c# 4.0. Многопоточность и блокировки
Testor1ZyK_BotaNне уверен на 100% в своей правоте, но считаю что NewRow лочить не нужно, а потому лочить только Add .


но даже если вздумаешь лочить NewRow , то лочь его отдельно.

Выигрыш от разлочивания - NewRow - минимальный.

Самые тяжелые операции - это split и присвоение текстовых строк.

дак сплит то лочить и не нужно.

у тебя доступ к общему ресурсу в двух строках

NewRow
Add

остальной код может работать параллельно.
...
Рейтинг: 0 / 0
21.01.2012, 19:19
    #37625294
CTPAHHOE MECTO
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
c# 4.0. Многопоточность и блокировки
См хелп по datatable & datarow

MSDNThread Safety

--------------------------------------------------------------------------------

This type is safe for multithreaded read operations. You must synchronize any write operations.

Надо определиться является ли
создание строчки
добавление ее в таблицу
заполнение ее данными
модифицирующими операциями по отношению к DataTable. Самое осторожное - считать все. Самое неосторожное - считать только добавление.

Я бы сделал сначала разбор строчки, а добавление в конце в синхросекции

Код: c#
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
String[] a = myString.split('|');

String[] b = a[20].split(':');

var row1 = Decimal.parse([5]);
var row2 = Datetime.parse([10]);

lock (lockObject)
{
DataRow row = table.NewRow() 
row[0] = a[1];
row[1] = row1;
row[2] = row2;

row[3] = b[0];
row[4] = b[30];

table.Add(row);
}
...
Рейтинг: 0 / 0
21.01.2012, 20:05
    #37625322
ZyK_BotaN
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
c# 4.0. Многопоточность и блокировки
CTPAHHOE MECTOСм хелп по datatable & datarow

MSDNThread Safety

--------------------------------------------------------------------------------

This type is safe for multithreaded read operations. You must synchronize any write operations.

Надо определиться является ли
создание строчки
добавление ее в таблицу
заполнение ее данными
модифицирующими операциями по отношению к DataTable. Самое осторожное - считать все. Самое неосторожное - считать только добавление.

Я бы сделал сначала разбор строчки, а добавление в конце в синхросекции

Код: c#
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
String[] a = myString.split('|');

String[] b = a[20].split(':');

var row1 = Decimal.parse([5]);
var row2 = Datetime.parse([10]);

lock (lockObject)
{
DataRow row = table.NewRow() 
row[0] = a[1];
row[1] = row1;
row[2] = row2;

row[3] = b[0];
row[4] = b[30];

table.Add(row);
}



согласен. главное с блока блокировки - вынести парсинг.
...
Рейтинг: 0 / 0
21.01.2012, 20:25
    #37625334
Testor1
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
c# 4.0. Многопоточность и блокировки
ZyK_BotaNCTPAHHOE MECTOСм хелп по datatable & datarow

пропущено...


Надо определиться является ли
создание строчки
добавление ее в таблицу
заполнение ее данными
модифицирующими операциями по отношению к DataTable. Самое осторожное - считать все. Самое неосторожное - считать только добавление.

Я бы сделал сначала разбор строчки, а добавление в конце в синхросекции

Код: c#
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
String[] a = myString.split('|');

String[] b = a[20].split(':');

var row1 = Decimal.parse([5]);
var row2 = Datetime.parse([10]);

lock (lockObject)
{
DataRow row = table.NewRow() 
row[0] = a[1];
row[1] = row1;
row[2] = row2;

row[3] = b[0];
row[4] = b[30];

table.Add(row);
}



согласен. главное с блока блокировки - вынести парсинг.


Попробую твой вариант.

Почему ты сплит вынес из лока? и почему ты использовал var а не Decimal ?

Кстати в реале я передаю не строку, а индекс от массива строк.
Строки в среднем - 5 Килобайт.
Подозреваю, что передача столь длинной строки параметров в функцию, требует дополнительныйх рессурсов.
В моей задаче нужно импортировать 20 000 000 строк.

По этой причине, я создал глобальный массив строк (N-строк).
Я подгружаю новые строки в массив и параллельно делаю парсинг строк.


---
Хочу попробывать другую фишку
Создать массив Action на N элементов - но в каждом function вызывать не функцию, а указывать тело функции целиком.
Полагаю, что для каждого элемента Action будет сохранен свой код. В этом случае, мне не потребуется использовать лок, хотя мне придется за это заплатить "дополнительной памятью"
...
Рейтинг: 0 / 0
21.01.2012, 20:34
    #37625344
ZyK_BotaN
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
c# 4.0. Многопоточность и блокировки
Testor1
Почему ты сплит вынес из лока? и почему ты использовал var а не Decimal ?


что-бі он віполнялся параллельно.


Testor1
Кстати в реале я передаю не строку, а индекс от массива строк.
Строки в среднем - 5 Килобайт.
Подозреваю, что передача столь длинной строки параметров в функцию, требует дополнительныйх рессурсов.

нет, строка копироваться не будет - передается только ссылка на нее. смело передавай в качестве параметра.

Testor1Полагаю, что для каждого элемента Action будет сохранен свой код. В этом случае, мне не потребуется использовать лок, хотя мне придется за это заплатить "дополнительной памятью"

[/quote]
используй пул, что-бы не выйти за рамки используемой памяти.
...
Рейтинг: 0 / 0
21.01.2012, 22:33
    #37625423
CTPAHHOE MECTO
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
c# 4.0. Многопоточность и блокировки
Testor1Почему ты сплит вынес из лока?


Чтобы парзить строчку не надо ждать освобождения. Парзить можно и параллельно. Интересно также, откуде берется myString - может его выборку тоже надо лочить.

Локи используется тогда, когда есть доступ к общм данным. Так как каждый тред парзит свою строку, то сплит можно делать пареллельно. DataTable же общий - вставку в него надо синхронизировать.

и почему ты использовал var а не Decimal ?


мне было влом смотреть возвращаемый тип функции и его писать.

Кстати в реале я передаю не строку, а индекс от массива строк.
Строки в среднем - 5 Килобайт.
Подозреваю, что передача столь длинной строки параметров в функцию, требует дополнительныйх рессурсов.
В моей задаче нужно импортировать 20 000 000 строк.


Во всех нормальных языках строки передаются по ссылке и копирование происходит только при получении измененной строки (lда и то не всегда).

По этой причине, я создал глобальный массив строк (N-строк).
Я подгружаю новые строки в массив и параллельно делаю парсинг строк.


не стоит ли воспрользоваться Queue ?

Обратите внимание, что можно использовать PLinq

Обратите внимание, на все методs и свойства в название которых входит подстрока "sync"
Хочу попробывать другую фишку
Создать массив Action на N элементов - но в каждом function вызывать не функцию, а указывать тело функции целиком.
Полагаю, что для каждого элемента Action будет сохранен свой код. В этом случае, мне не потребуется использовать лок, хотя мне придется за это заплатить "дополнительной памятью"

Я не понимаю, как связан лок и тело функции - лок возникает тогда, когда есть общие данные для нескольких тредов. Вам в лбом случае придется составлять из строчек DataTable, а это придется синхронизировать.

Я бы на вашем месте изучил Task Parallel Library, в частности, Plinq
...
Рейтинг: 0 / 0
21.01.2012, 22:36
    #37625428
ZyK_BotaN
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
c# 4.0. Многопоточность и блокировки
CTPAHHOE MECTOне стоит ли воспрользоваться Queue ?


тоже хотел это предложить.
...
Рейтинг: 0 / 0
21.01.2012, 22:37
    #37625429
Testor1
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
c# 4.0. Многопоточность и блокировки
ZyK_BotaN,

Пока пошел по твоему методу. Попозже опробую свой.
Есть еще один вопрос.

Я хочу, чтобы bulkInsert в базу происходит в асинхронном режиме.
Можно ли обойтись без создания массива DataTable[] table?
То есть я планирую каждую порцию данных обработанных данных записывать в отдельную таблицу, перед записью в базу.

Текущий код, без массива.


Код: c#
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
                    using (StreamReader sr = new StreamReader(args[4]))
                    {

                        StartTimer = DateTime.Now;
                        IndexOfRecords = 0;

                        Action<object> taskAction = (object obj) =>
                        {
                            importRecord((Int32)obj);
                        };
                        

                        while ((Records[IndexOfRecords] = sr.ReadLine()) != null)
                        {

                            tasks[IndexOfRecords] = Task.Factory.StartNew(taskAction, IndexOfRecords);


                            if ((IndexOfRecords + 1) % ReadBatchSize == 0)
                            {
                                Task.WaitAll(tasks);
                                Records = new String[ReadBatchSize];
                                IndexOfRecords = 0;
                            }
                            else
                            {
                                IndexOfRecords++;
                            }

                            if ((++TotalRecords) % WriteBatchSize == 0)
                            {
                                Task.WaitAll(tasks);
                                bulkCopy.WriteToServer(table);
                                table.Clear();
                                Console.WriteLine("Records: {0} CurrentTime: {1}", TotalRecords, DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"));
                            }
                        }
                    }
                    if (IndexOfRecords % ReadBatchSize != 0)
                    {
                        Task.WaitAll(tasks);
                        bulkCopy.WriteToServer(table);
                    }

                    bulkCopy.Close();
                    table.Clear();

                    EndTimer = DateTime.Now;
...
Рейтинг: 0 / 0
21.01.2012, 23:42
    #37625503
Testor1
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
c# 4.0. Многопоточность и блокировки
Testor1,

Я не понял смысла использования Queue ?
Массив вроде бы работает быстрее и он меня полностью устраивает.

Код: c#
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
        private static void importRecord(Int32 Index)
        {
            String[] Columns = new String[11];
            Int32[] Value_ID = new Int32[20];
            Decimal[] Value_Changes = new Decimal[20];

            fSplit(Records[Index], '|', Columns);
            Records[Index] = null;
            
            String[] Values = Columns[10].Split(']'); 

            Int32 Id = 0;

            foreach(String Value in Values)
            {
                if(!String.IsNullOrEmpty(Value))
                {
                    String[] Value_structure = Value.Split('~');
                    Value_ID[Id] = Int32.Parse(Value_structure[0].Remove(0, 1));
                    Value_Changes[Id] = Decimal.Parse(Value_structure[2], ci);
                    Id++;
                }
            }

            var row1 = Convert.ToDateTime(Columns[1], ruRuCulture);
            var row4 = Decimal.Parse(Columns[4], ci);
            var row5 = Decimal.Parse(Columns[5], ci);
            var row6 = Decimal.Parse(Columns[6], ci);
            var row9 = String.IsNullOrEmpty(Columns[9]) ? Convert.DBNull : Decimal.Parse(Columns[9], ci);

            lock (ObjectLock)
            {
                
                DataRow row = table.NewRow();
                row[0] = Columns[0]; 
                row[1] = row1; 
                row[2] = Columns[2]; 
                row[3] = Columns[3]; 
                row[4] = row4; 
                row[5] = row5; 
                row[6] = row6; 
                row[7] = Columns[7]; 
                row[8] = Columns[8]; 
                row[9] = row9; 

                for (Int32 i = 0; i < Id; i++)
                {
                    row[i + 10] = Value_ID[i];
                    row[i + 30] = Value_Changes[i];
                } 
                
                table.Rows.Add(row);
            }
        }

        private static void fSplit(String src, char delim, String[] output)
        {
            int index = 0;
            int lindex = 0;
            int columnID = 0;
            int arrayID = 0;
            int[] array = { 0, 1, 3, 4, 5, 12, 16, 23, 25, 27, 30 };
            int i;

            while ((index = src.IndexOf(delim, lindex)) != -1)
            {
                for (i = 0; i < 11 && array[i] != columnID; i++) ;
                if (i < 11) output[arrayID++] = src.Substring(lindex, index - lindex);
                if (columnID >= 30) break;
                lindex = index + 1;
                index++;
                columnID++;
            }
        } 
...
Рейтинг: 0 / 0
21.01.2012, 23:51
    #37625508
Testor1
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
c# 4.0. Многопоточность и блокировки
CTPAHHOE MECTO


Я не понимаю, как связан лок и тело функции - лок возникает тогда, когда есть общие данные для нескольких тредов. Вам в лбом случае придется составлять из строчек DataTable, а это придется синхронизировать.

Я бы на вашем месте изучил Task Parallel Library, в частности, Plinq

Согласен. Разобрался с блокировкой. Мой метод бессмысленен из-за лока при добавлению в таблицу.

Попытаюсь разобраться с plinq, хотя не очень понимаю, как он в конкретном случае может помочь. Где гарантия, что он сможет более эфективно решить мою задачу, чем мой код?
...
Рейтинг: 0 / 0
22.01.2012, 00:19
    #37625530
локфри-кюю
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
c# 4.0. Многопоточность и блокировки
ZyK_BotaNCTPAHHOE MECTOне стоит ли воспрользоваться Queue ?


тоже хотел это предложить.

тут не кюю нужен, а локфри-кюю
читать про Interlocked
...
Рейтинг: 0 / 0
22.01.2012, 01:38
    #37625560
CTPAHHOE MECTO
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
c# 4.0. Многопоточность и блокировки
Testor1Попытаюсь разобраться с plinq, хотя не очень понимаю, как он в конкретном случае может помочь. Где гарантия, что он сможет более эфективно решить мою задачу, чем мой код?

Никакой гарантии нет, просто мне кажется это инструмент более высокого уровня и код может быть компактнее.

По поводу Queue. У вас, насколько я понял происходит чтение пакета строк с параллельной обработкой, а затем происходит ожидание их разбора и сброс в БД. При этом может быть простой, когда, например, нитью которая разбирала первую строку уже справилась со своей работой и, теоретически, могла заняться следующей строкой. Но, так как, происходит ожидание окончания работы всех нитей пула, она простаивает.

Можно было бы сделать примерно так Читалка |> Парсилки |> Писалка, где |> это очереди. Читалка закидывает строки во входную очередь для нитей разбора, они разбирают и перекидывают результат в очередь для писалки, которая, в свою очередь, чиатет результат разбора и кладет в базу.

Тогда не будел лишних синхронизаций.

PS. Я никакой не эксперт в параллельной обработке, просто развлекаюсь
...
Рейтинг: 0 / 0
22.01.2012, 21:09
    #37626116
Testor1
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
c# 4.0. Многопоточность и блокировки
CTPAHHOE MECTO,

Изменил код, но есть не понимание.
Я создал глобальный массив rows. Не могу понять почему мне нужно лочить его на момент записи? Ведь данные записываются в разные "ячейки" разными тасками. Если не лочить массив, то в базу попадают искаженные данные.

--
Я вынужден загружать и обрабатывать строки порциями
1. мне нужно использовать BulkCopy поскольку он самый быстрый способ для загрузки данных в базу. А он в свою очередь будет корректно работать только с набором не изменяющихся записей.
2. Чем больше массив tasks - тем больше тратиться времени и рессурсов на его обслуживание.
--

Кстати, по производительности я пока особого выигрыша не получил, если сравнивать со стандартным подходом без использования task.


Код: c#
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
103.
104.
105.
106.
107.
108.
109.
110.
111.
112.
113.
114.
115.
116.
117.
118.
119.
120.
121.
122.
123.
124.
125.
126.
127.
128.
129.
130.
131.
132.
133.
134.
135.
136.
137.
138.
139.
140.
141.
142.
143.
144.
145.
146.
147.
148.
149.
150.
151.
152.
153.
154.
155.
156.
157.
158.
159.
160.
161.
162.
163.
164.
165.
166.
167.
168.
169.
170.
171.
172.
173.
174.
175.
176.
177.
178.
179.
180.
181.
182.
183.
184.
185.
186.
187.
188.
189.
190.
191.
192.
193.
194.
195.
196.
197.
198.
199.
200.
201.
202.
203.
204.
205.
206.
207.
class Program
{
    private static Object ObjectLock = new Object();

    private static DataTable table;
    private static SqlBulkCopy bulkCopy;
    private static String[] Records;
    private static CultureInfo ruRuCulture;
    private static CultureInfo ci;

    private static DataRow[] rows;

    private class Variables
    {
        public Int32 IndexOfRead { get; set; }
        public Int32 IndexOfWrite { get; set; }
    }


    static void Main(string[] args)
    {

        String SqlServer, SqlDatabase;

        Int32 ReadBatchSize, WriteBatchSize;
        Int32 IndexOfReadRecords, IndexOfWriteRecords;
        Int64 TotalRecords;
        Task[] tasks;

        DateTime StartTimer;
        DateTime EndTimer;

        try
        {

            ci = CultureInfo.InvariantCulture.Clone() as CultureInfo;
            ci.NumberFormat.NumberDecimalSeparator = ".";
            ruRuCulture = CultureInfo.GetCultureInfo("ru-Ru");

            SqlServer = args[0];
            SqlDatabase = args[1];

            table = new DataTable();
            bulkCopy = new SqlBulkCopy(GetConnectionString(SqlServer, SqlDatabase));

            SqlServer = args[0];
            SqlDatabase = args[1];

            ReadBatchSize = Convert.ToInt32(args[2]);
            WriteBatchSize = Convert.ToInt32(args[3]);
            Records = new String[ReadBatchSize];
            tasks = new Task[ReadBatchSize];

            MakeTable(ref table);
            ColumnMappings(ref bulkCopy);

            TotalRecords = 0;

            using (bulkCopy)
            {

                using (StreamReader sr = new StreamReader(args[4]))
                {

                    StartTimer = DateTime.Now;
                    IndexOfReadRecords = 0;
                    IndexOfWriteRecords = 0;

                    Action<object> taskAction = (object obj) => { importRecord((Variables)obj); };

                    rows = new DataRow[WriteBatchSize];
                    for (Int32 i = 0; i < WriteBatchSize; i++) { rows[i] = table.NewRow(); }

                    while ((Records[IndexOfReadRecords] = sr.ReadLine()) != null)
                    {

                        Variables Indexes = new Variables();
                        Indexes.IndexOfRead = IndexOfReadRecords;
                        Indexes.IndexOfWrite = IndexOfWriteRecords;

                        tasks[IndexOfReadRecords] = Task.Factory.StartNew(taskAction, Indexes);

                        if ((IndexOfReadRecords + 1) >= ReadBatchSize)
                        {
                            Task.WaitAll(tasks);
                            Records = new String[ReadBatchSize];
                            IndexOfReadRecords = 0;
                        }
                        else
                            IndexOfReadRecords++;

                        TotalRecords++;

                        if ((IndexOfWriteRecords + 1) >= WriteBatchSize)
                        {
                            Task.WaitAll(tasks);
                            bulkCopy.WriteToServer(rows);
                            Console.WriteLine("Records: {0} CurrentTime: {1}", TotalRecords, DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"));
                            IndexOfWriteRecords = 0;
                        }
                        else
                            IndexOfWriteRecords++;

                    }

                }
                if (IndexOfWriteRecords > 0)
                {
                    Task.WaitAll(tasks);
                    Array.Resize(ref rows, IndexOfWriteRecords);
                    bulkCopy.WriteToServer(rows);
                }

                bulkCopy.Close();

                EndTimer = DateTime.Now;
                Console.WriteLine("Records: {0}\nStartTime: {1}\nExecution Time: {2}", TotalRecords, StartTimer.ToString("yyyy-MM-dd HH:mm:ss"), new TimeSpan(EndTimer.Ticks - StartTimer.Ticks));
                Console.ReadKey();

            }
        }
        catch (AggregateException ae)
        {
            foreach (var e in ae.InnerExceptions)
            {
                Console.WriteLine("Import error:");
                Console.WriteLine(e.Message);
                Console.ReadKey();
            }
        }
        catch (Exception e)
        {
            Console.WriteLine("Import error:");
            Console.WriteLine(e.Message);
            Console.ReadKey();
        }

    }


    private static void importRecord(Variables objIndexes)
    {
        Int32 IndexRead = objIndexes.IndexOfRead;
        Int32 IndexWrite = objIndexes.IndexOfWrite;

        Object[] rowArray = new Object[50];
        String[] Columns = new String[11];
        Int32[] Value_ID = new Int32[20];
        Decimal[] Value_Changes = new Decimal[20];

        fSplit(Records[IndexRead], '|', Columns);
        Records[IndexRead] = null;

        rowArray[0] = Columns[0]; 
        rowArray[1] = Convert.ToDateTime(Columns[1], ruRuCulture); 
        rowArray[2] = Columns[2]; 
        rowArray[3] = Columns[3]; 
        rowArray[4] = Decimal.Parse(Columns[4], ci); 
        rowArray[5] = Decimal.Parse(Columns[5], ci); 
        rowArray[6] = Decimal.Parse(Columns[6], ci); 
        rowArray[7] = Columns[7]; 
        rowArray[8] = Columns[8]; 
        rowArray[9] = String.IsNullOrEmpty(Columns[9]) ? Convert.DBNull : Decimal.Parse(Columns[9], ci); 

        String[] Values = Columns[10].Split(']'); 

        Int32 Id = 10;

        foreach (String Value in Values)
        {
            if (!String.IsNullOrEmpty(Value))
            {
                String[] Value_structure = Value.Split('~');
                rowArray[Id + 0] = Int32.Parse(Value_structure[0].Remove(0, 1));
                rowArray[Id + 20] = Decimal.Parse(Value_structure[2], ci);
                Id++;
            }
        }

        lock (ObjectLock)
        {
            rows[IndexWrite].ItemArray = rowArray;
        }
    }

    private static void fSplit(String src, char delim, String[] output)
    {
        int index = 0;
        int lindex = 0;
        int columnID = 0;
        int arrayID = 0;
        int[] array = { 0, 1, 3, 4, 5, 12, 16, 23, 25, 27, 30 };
        int i;

        while ((index = src.IndexOf(delim, lindex)) != -1)
        {
            for (i = 0; i < 11 && array[i] != columnID; i++) ;
            if (i < 11) output[arrayID++] = src.Substring(lindex, index - lindex);
            if (columnID >= 30) break;
            lindex = index + 1;
            index++;
            columnID++;
        }
    }


}
...
Рейтинг: 0 / 0
22.01.2012, 23:10
    #37626199
CTPAHHOE MECTO
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
c# 4.0. Многопоточность и блокировки
Testor1CTPAHHOE MECTO,

Изменил код, но есть не понимание.
Я создал глобальный массив rows. Не могу понять почему мне нужно лочить его на момент записи? Ведь данные записываются в разные "ячейки" разными тасками. Если не лочить массив, то в базу попадают искаженные данные.


Ну мы не знаем как оно внутри устроено. Например для того, чтобы добавить строку может быть надо
- прочитать счетчик строк
- увеличить его на единиуцу
- записать обратьно

Если перед вторым шагом другая нить прочитает счетчик строк с той же целью то обе нити запишут в одну и ту же строку.

Я вынужден загружать и обрабатывать строки порциями
1. мне нужно использовать BulkCopy поскольку он самый быстрый способ для загрузки данных в базу. А он в свою очередь будет корректно работать только с набором не изменяющихся записей.


Ну как только обнаруживается что в DataTable записали достаточно строчек можно создавать новую DataTable а старую передавать записывающей нити.

2. Чем больше массив tasks - тем больше тратиться времени и рессурсов на его обслуживание.
--

Кстати, по производительности я пока особого выигрыша не получил, если сравнивать со стандартным подходом без использования task.


Имеется ввиду без распараллеливания или с распараллеливанием через ручное содание нитей и т.д.?

Можено попробовать накапливать строки порциями - чтобы каждый таск обрабатывал пачку строк.

Вообще, уже можно посмотреть, как оно работает - сколько процентов жрет разбор, сколько чтение и сколько запись
...
Рейтинг: 0 / 0
22.01.2012, 23:57
    #37626237
c# 4.0. Многопоточность и блокировки
Кстати, можно запись в базу отдельно вынести. Не уверен что прокатит. Но создать отдельную таску для записи базу и связать с таксами по разбору при помощи ContinueWhenAll
...
Рейтинг: 0 / 0
23.01.2012, 20:16
    #37627854
Testor1
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
c# 4.0. Многопоточность и блокировки
СТРАННОЕ МЕСТОКстати, можно запись в базу отдельно вынести. Не уверен что прокатит. Но создать отдельную таску для записи базу и связать с таксами по разбору при помощи ContinueWhenAll

Чем дальше в лес тем злее волки ....

Судя по экспериментам выгоды от асинхронной обработки и загрузки данных в базу минимальны или их вообще нет.
Слабое место - это получения результатов из нескольких потоков. Блокировки сводят на нет, все преймущества PPL при решении данной задачи.

Постараюсь, ускорить алгоритм парсинга строки и отказываюсь от асинхронной загрузки. Возможно придется переключиться на C++
...
Рейтинг: 0 / 0
23.01.2012, 22:14
    #37627974
c# 4.0. Многопоточность и блокировки
Testor1,

я не знаю какого размера строчки, но имхо одна строчка - слишком мало для того,чтобы выделять в отдельную таску. Попробуйте тупо прочитать побольше строчек, обработать их одним таском и вставить балк инсертом в отдельном рамке

Впрочем, я не знаю
...
Рейтинг: 0 / 0
23.01.2012, 23:18
    #37628027
c# 4.0. Многопоточность и блокировки
Testor1
Судя по экспериментам выгоды от асинхронной обработки и загрузки данных в базу минимальны или их вообще нет.


Тут не очень много асинхронности

Код: c#
1.
2.
Task.WaitAll(tasks); // подождем пока все разберется, при этом ничего другого не делаем (почему)
bulkCopy.WriteToServer(rows); // теперь запишем все на сервер, в это время ничего другого не делаем (почему)



Фактически, распаралелен только разбор разных строчек и немного разбор с чтением (и то не до конца - как только наьирается первый пакет, чтение останавливается и ждет до полной записи). Я думаю, если распаралеллить разбор запись и чтение и увеличить количество строчек, разбираемых зараз, можно увидеть выигрыш. Хотя точно не скажу
...
Рейтинг: 0 / 0
24.01.2012, 00:10
    #37628071
Testor1
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
c# 4.0. Многопоточность и блокировки
СТРАННОЕ МЕСТОTestor1
Судя по экспериментам выгоды от асинхронной обработки и загрузки данных в базу минимальны или их вообще нет.


Тут не очень много асинхронности

Код: c#
1.
2.
Task.WaitAll(tasks); // подождем пока все разберется, при этом ничего другого не делаем (почему)
bulkCopy.WriteToServer(rows); // теперь запишем все на сервер, в это время ничего другого не делаем (почему)



Фактически, распаралелен только разбор разных строчек и немного разбор с чтением (и то не до конца - как только наьирается первый пакет, чтение останавливается и ждет до полной записи). Я думаю, если распаралеллить разбор запись и чтение и увеличить количество строчек, разбираемых зараз, можно увидеть выигрыш. Хотя точно не скажу

Я вынужден ждать завершения Task.WaitAll(tasks);, поскольку bulkCopy.WriteToServer(rows); не работает корректно с массивом rows, который меняется в ассинхроном режиме.

Была идея создать двумерный массив DataRows[индекс набора записей][индекс записи].
Я полагал, что пока один набор записей копируется на сервер, можно грузить в следующий набор записей. НО, это не работает корректно без lock :(

Если поштучно грузить строку в базу в ассинхроном режиме, то есть вероятность просто уложить базу или сделать ее недоступной.

В последнем эксперименте, я отказался от записи в базу данных и просто сравнил работу загрузки и обработки данных.
В классическом режиме - загрузка и обработка 100 000 строк занимала 3-4 секунды, а в ассинхроном 5-6 секунд. Функция обработки строк оставалась не изменной за исключением в синхроном режим я удалял определение lock (object).

Средний размер строки 700 байт.
Загружаю не все поля, а только нужные.
Одно из полей составное и требует отдельного парсинга.
...
Рейтинг: 0 / 0
Форумы / Программирование [игнор отключен] [закрыт для гостей] / c# 4.0. Многопоточность и блокировки / 25 сообщений из 39, страница 1 из 2
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]