powered by simpleCommunicator - 2.0.61     © 2026 Programmizd 02
Целевая тема:
Создать новую тему:
Автор:
Закрыть
Цитировать
Форумы / Программирование [игнор отключен] [закрыт для гостей] / c# 4.0. Многопоточность и блокировки
14 сообщений из 39, страница 2 из 2
c# 4.0. Многопоточность и блокировки
    #37628108
Testor1
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
Testor1,

Есть еще один вариант.
Создать не двумерный массив DataRow, а создать к слову 10 переменных типа DataTable.

Каждый блок данных загружать в отдельную переменную (DataTable), и в ассинхронном режиме записывать данные на сервер из "загруженной" таблицы и так по кругу :)
...
Рейтинг: 0 / 0
c# 4.0. Многопоточность и блокировки
    #37630073
Testor1,

имхо 700 байт слишком мало, чтобы создавать отдельную тачку. Я бы давал порциями по 1000 строк на тачку

Попробовать не добавлять в datatable а пусть каждая таска накапливает в список структур результат разбора. И возвращает его через result вставку в базу производить в отдельной нити

еще вопрос - какое соотношение времени на 10000 строчках чтение/разбор/вставка?
...
Рейтинг: 0 / 0
c# 4.0. Многопоточность и блокировки
    #37630075
странное место,

тачку -> таску
...
Рейтинг: 0 / 0
c# 4.0. Многопоточность и блокировки
    #37636013
Testor1
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
странное место,

Можешь создать код ? Не совсем понял твою идею.
Еще учти - что "лишние" операция присваивания - это дополнительные операции с памятью и менеджером памяти. все это замедляет работу.

Я достал perfomance анализатор. по его результатм - 60% уходит на запись в базу.
...
Рейтинг: 0 / 0
c# 4.0. Многопоточность и блокировки
    #37636721
Testor1
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
Testor1странное место,

Можешь создать код ? Не совсем понял твою идею.
Еще учти - что "лишние" операция присваивания - это дополнительные операции с памятью и менеджером памяти. все это замедляет работу.

Я достал perfomance анализатор. по его результатм - 60% уходит на запись в базу.

Давай на простом примере попробуем твою идею?

Создай проект
пусть в цикле передается одна и та же строка "1|22|333|4444|{A:one:B}{C:two:D}|666666"

в таблицу в отдельные столбцы должны быть загружены
22,4444,A,B,C,D

Кол-во строк 80 000 000 - то есть цикл на это число
...
Рейтинг: 0 / 0
c# 4.0. Многопоточность и блокировки
    #37637608
belugin
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Testor1,

попробую на досуге
...
Рейтинг: 0 / 0
c# 4.0. Многопоточность и блокировки
    #37640023
Пока не запускал, общая идея см под спойлером. Я не очень хорошо знаю C# и TPL, так что кое-что тут коряво

Можно исходники вот этих функций и create table для таблички-получателя?

Код: c#
1.
2.
3.
static void MakeTable(DataTable table)
private static void ColumnMappings(SqlBulkCopy bulkCopy)
private static string GetConnectionString(string SqlServer, string SqlDatabase)





Код: c#
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
103.
104.
105.
106.
107.
108.
109.
110.
111.
112.
113.
114.
115.
116.
117.
118.
119.
120.
121.
122.
123.
124.
125.
126.
127.
128.
129.
130.
131.
132.
133.
134.
135.
136.
137.
138.
139.
140.
141.
142.
143.
144.
145.
146.
147.
148.
149.
150.
151.
152.
153.
154.
155.
156.
157.
158.
159.
160.
161.
162.
163.
164.
165.
166.
167.
168.
169.
170.
171.
172.
173.
174.
175.
176.
177.
178.
179.
180.
181.
182.
183.
184.
185.
186.
187.
188.
189.
190.
191.
192.
193.
194.
195.
196.
197.
198.
199.
200.
201.
202.
203.
204.
205.
206.
207.
208.
209.
210.
211.
212.
213.
214.
215.
216.
217.
218.
219.
220.
221.
222.
223.
224.
225.
226.
227.
228.
229.
230.
231.
232.
233.
234.
235.
236.
237.
238.
239.
240.
241.
242.
243.
244.
245.
246.
247.
248.
249.
250.
251.
252.
253.
254.
255.
256.
257.
258.
259.
260.
261.
262.
263.
264.
265.
266.
267.
268.
class Program
        {
            private static Object ObjectLock = new Object();

            private static DataTable table;
            private static SqlBulkCopy bulkCopy;
            private static String[] Records;
            private static CultureInfo ruRuCulture;
            private static CultureInfo ci;

            private static DataRow[] rows;
            private static Semaphore semaphore = new Semaphore(2, 2);

            private class Variables
            {
                public Int32 IndexOfRead { get; set; }
                public Int32 IndexOfWrite { get; set; }
            }

            static void MakeTable(DataTable table)
            {

            }
            
            static object [][] parsePortion(object arg)
            {
                var lines = arg as string[];
                var result = new object[lines.Length][];
                int i=0;
                foreach (var line in lines)
                {
                    result[i] = importRecord(line);
                    i++;
                }
                return result;
            }
            static Task<object[][]> processParsePortion(StreamReader reader, int portionSize, TaskFactory<object [][]> taskFactory, ref bool hasMore)
            {
                var portion = new string[portionSize];
                int i = 0;
                var s = reader.ReadLine();
                while (s!=null && i<portionSize)
                {
                    portion[i] = s;
                    i++;
                    s = reader.ReadLine();
                }
                             
                hasMore =  s != null;
                return (i > 0) ?
                    taskFactory.StartNew(parsePortion, portion)
                    : null;
            }
            static DataTable makeDataTable(Task<object [][]>[] parseTasks)
            {
                DataTable result = new DataTable();
                MakeTable(result);
                foreach (var task in parseTasks)
                {
                    foreach (var row in task.Result)
                    {
                        var resultRow = result.NewRow();
                        resultRow.ItemArray = row;
                        result.Rows.Add(resultRow);
                    }
                }
                return result;
            }
            static void writeToDB(Task<DataTable> task)
            {
                lock (ObjectLock)
                {
                    bulkCopy.WriteToServer(task.Result);
                }
                semaphore.Release();
            }
            static Boolean processWritePortion(StreamReader reader, int writePortionSize, int portionSize)
            {
                bool hasMore = true; 
                semaphore.WaitOne();
                var taskFactory = new TaskFactory<object [][]>();
                var parseTasks = new Task<object [][]>[writePortionSize];
                int i=0;
                while (i < writePortionSize && hasMore)
                {
                    var currentTask = processParsePortion(reader, portionSize, taskFactory, ref hasMore);
                    if (currentTask!=null)
                    {
                        parseTasks[i] = currentTask;
                        i++;
                    }
                }
                if (i > 0)
                {
                    if (writePortionSize > i)
                    {
                        Array.Resize<Task<object[][]>>(ref parseTasks, i);
                    }
                    var dataTaskFactory = new TaskFactory<DataTable>();
                    dataTaskFactory.ContinueWhenAll<object[][]>(parseTasks, makeDataTable)
                                   .ContinueWith(writeToDB).Start();
                }
                else 
                {
                    semaphore.Release();
                }
                return hasMore;
            }
            static void Main(string[] args)
            {

                String SqlServer, SqlDatabase;

                Int32 ReadBatchSize, WriteBatchSize;
                Int32 IndexOfReadRecords, IndexOfWriteRecords;
                Int64 TotalRecords;
                Task[] tasks;

                DateTime StartTimer;
                DateTime EndTimer;

                try
                {

                    ci = CultureInfo.InvariantCulture.Clone() as CultureInfo;
                    ci.NumberFormat.NumberDecimalSeparator = ".";
                    ruRuCulture = CultureInfo.GetCultureInfo("ru-Ru");

                    SqlServer = args[0];
                    SqlDatabase = args[1];

                    table = new DataTable();
                    bulkCopy = new SqlBulkCopy(GetConnectionString(SqlServer, SqlDatabase));

                    SqlServer = args[0];
                    SqlDatabase = args[1];

                    ReadBatchSize = Convert.ToInt32(args[2]);
                    WriteBatchSize = Convert.ToInt32(args[3]);
                    Records = new String[ReadBatchSize];
                    tasks = new Task[ReadBatchSize];

                    
                    
                    ColumnMappings(bulkCopy);
                    

                    TotalRecords = 0;

                    using (bulkCopy)
                    {

                        using (var sr = new StreamReader(args[4]))
                        {

                            StartTimer = DateTime.Now;
                            IndexOfReadRecords = 0;
                            IndexOfWriteRecords = 0;
                            while (processWritePortion(sr, 2, 1000))
                            {
                                Console.Write("Portion");
                            }

                        }
                        for (var i = 0; i < 2; i++)
                        {
                            semaphore.WaitOne();
                        }
                        semaphore.Dispose();

                        bulkCopy.Close();

                        EndTimer = DateTime.Now;
                        Console.WriteLine("Records: {0}\nStartTime: {1}\nExecution Time: {2}", TotalRecords, StartTimer.ToString("yyyy-MM-dd HH:mm:ss"), new TimeSpan(EndTimer.Ticks - StartTimer.Ticks));
                        Console.ReadKey();

                    }
                }
                catch (AggregateException ae)
                {
                    foreach (var e in ae.InnerExceptions)
                    {
                        Console.WriteLine("Import error:");
                        Console.WriteLine(e.Message);
                        Console.ReadKey();
                    }
                }
                catch (Exception e)
                {
                    Console.WriteLine("Import error:");
                    Console.WriteLine(e.Message);
                    Console.ReadKey();
                }

            }

            private static void ColumnMappings(SqlBulkCopy bulkCopy)
            {
                throw new NotImplementedException();
            }

            private static string GetConnectionString(string SqlServer, string SqlDatabase)
            {
                throw new NotImplementedException();
            }


            private static object[] importRecord(string line)
            {

                Object[] rowArray = new Object[50];
                String[] Columns = new String[11];
                Int32[] Value_ID = new Int32[20];
                Decimal[] Value_Changes = new Decimal[20];

                fSplit(line, '|', Columns);

                rowArray[0] = Columns[0];
                rowArray[1] = Convert.ToDateTime(Columns[1], ruRuCulture);
                rowArray[2] = Columns[2];
                rowArray[3] = Columns[3];
                rowArray[4] = Decimal.Parse(Columns[4], ci);
                rowArray[5] = Decimal.Parse(Columns[5], ci);
                rowArray[6] = Decimal.Parse(Columns[6], ci);
                rowArray[7] = Columns[7];
                rowArray[8] = Columns[8];
                rowArray[9] = String.IsNullOrEmpty(Columns[9]) ? Convert.DBNull : Decimal.Parse(Columns[9], ci);

                String[] Values = Columns[10].Split(']');

                Int32 Id = 10;

                foreach (String Value in Values)
                {
                    if (!String.IsNullOrEmpty(Value))
                    {
                        String[] Value_structure = Value.Split('~');
                        rowArray[Id + 0] = Int32.Parse(Value_structure[0].Remove(0, 1));
                        rowArray[Id + 20] = Decimal.Parse(Value_structure[2], ci);
                        Id++;
                    }
                }

                return rowArray;
            }

            private static void fSplit(String src, char delim, String[] output)
            {
                int index = 0;
                int lindex = 0;
                int columnID = 0;
                int arrayID = 0;
                int[] array = { 0, 1, 3, 4, 5, 12, 16, 23, 25, 27, 30 };
                int i;

                while ((index = src.IndexOf(delim, lindex)) != -1)
                {
                    for (i = 0; i < 11 && array[i] != columnID; i++) ;
                    if (i < 11) output[arrayID++] = src.Substring(lindex, index - lindex);
                    if (columnID >= 30) break;
                    lindex = index + 1;
                    index++;
                    columnID++;
                }
            }


        }

...
Рейтинг: 0 / 0
c# 4.0. Многопоточность и блокировки
    #37641300
Testor1
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
Адаптируй ранее высланый код под конкретную задачу (из последних сообщений)

Код: c#
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
        private static void MakeTable(ref DataTable table)
        {

            table.Columns.Add("Col1", System.Type.GetType("System.String"));
            table.Columns.Add("Col2", System.Type.GetType("System.Int32"));
            table.Columns.Add("Col3", System.Type.GetType("System.Decimal"));
            table.Columns.Add("Col4", System.Type.GetType("System.String"));
            table.Columns.Add("Col5", System.Type.GetType("System.String"));
            table.Columns.Add("Col6", System.Type.GetType("System.String"));

        }

        private static string GetConnectionString(String SqlServer, String SqlDatabase)
        {
            return "Data Source=" + SqlServer + "; " +
                " Integrated Security=true;" +
                "Initial Catalog=" + SqlDatabase + ";";
        }


        private static void ColumnMappings(ref SqlBulkCopy bulkCopy)
        {
                bulkCopy.DestinationTableName = "tb_test";
                bulkCopy.ColumnMappings.Add("col1", "col1");
                bulkCopy.ColumnMappings.Add("col2", "col2");
                bulkCopy.ColumnMappings.Add("col3", "col3");
                bulkCopy.ColumnMappings.Add("col4", "col4");
                bulkCopy.ColumnMappings.Add("col5", "col5");
                bulkCopy.ColumnMappings.Add("col6", "col6");
       }
...
Рейтинг: 0 / 0
c# 4.0. Многопоточность и блокировки
    #37647563
Testor1,

Разбор писал с 0
(importRecord vs importRecord_old)

твои параметры параллелизма не использую, а захардкодил:

processWritePortion(sr, 2, 5000, ref currentTask)

2 - количество параллельных задач разбора

5000 - количество строчек в одной задаче

То есть, в данном случае, разберет 2мя нитями 10000 строк, а потом их вставит.




Код: c#
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
103.
104.
105.
106.
107.
108.
109.
110.
111.
112.
113.
114.
115.
116.
117.
118.
119.
120.
121.
122.
123.
124.
125.
126.
127.
128.
129.
130.
131.
132.
133.
134.
135.
136.
137.
138.
139.
140.
141.
142.
143.
144.
145.
146.
147.
148.
149.
150.
151.
152.
153.
154.
155.
156.
157.
158.
159.
160.
161.
162.
163.
164.
165.
166.
167.
168.
169.
170.
171.
172.
173.
174.
175.
176.
177.
178.
179.
180.
181.
182.
183.
184.
185.
186.
187.
188.
189.
190.
191.
192.
193.
194.
195.
196.
197.
198.
199.
200.
201.
202.
203.
204.
205.
206.
207.
208.
209.
210.
211.
212.
213.
214.
215.
216.
217.
218.
219.
220.
221.
222.
223.
224.
225.
226.
227.
228.
229.
230.
231.
232.
233.
234.
235.
236.
237.
238.
239.
240.
241.
242.
243.
244.
245.
246.
247.
248.
249.
250.
251.
252.
253.
254.
255.
256.
257.
258.
259.
260.
261.
262.
263.
264.
265.
266.
267.
268.
269.
270.
271.
272.
273.
274.
275.
276.
277.
278.
279.
280.
281.
282.
283.
284.
285.
286.
287.
288.
289.
290.
291.
292.
293.
294.
295.
296.
297.
298.
299.
300.
301.
302.
303.
304.
305.
306.
307.
308.
309.
310.
311.
312.
313.
314.
315.
316.
317.
318.
319.
320.
321.
322.
323.
324.
325.
326.
327.
328.
329.
330.
331.
332.
333.
334.
335.
336.
337.
338.
339.
340.
341.
342.
343.
344.
345.
346.
347.
348.
349.
350.
351.
352.
353.
354.
355.
356.
357.
358.
359.
360.
361.
362.
363.
364.
365.
366.
367.
368.
369.
370.
371.
using System;
using System.Threading.Tasks;
using System.IO;
using System.Threading;
using System.Data;
using System.Data.SqlClient;
using System.Collections.Generic;
using System.Collections;
using System.Linq;
using System.Globalization;


namespace ConsoleApplication4
{
         
        class VirtualTextReader: TextReader
        {
            int line;
            int count;
            public VirtualTextReader(int count)
            {
                this.count = count;
            }
            public override string ReadLine()
            {
                line++;
                return line > count ? null : "1|2222|333|4444|{A:one:B}{C:two:D}|666666";
            }
        }
        class Program
        {
            private static Object ObjectLock = new Object();

            private static DataTable table;
            private static SqlBulkCopy bulkCopy;
            private static String[] Records;
            private static CultureInfo ruRuCulture;
            private static CultureInfo ci;

            
            
            private class Variables
            {
                public Int32 IndexOfRead { get; set; }
                public Int32 IndexOfWrite { get; set; }
            }

 
            
            static object [][] parsePortion(object arg)
            {
                var lines = arg as string[];
                var result = new object[lines.Length][];
                int i=0;
                foreach (var line in lines)
                {
                    result[i] = importRecord(line);
                    i++;
                }
                return result;
            }
            static Task<object[][]> processParsePortion(TextReader reader, int portionSize, TaskFactory<object [][]> taskFactory, ref bool hasMore)
            {
                var portion = new string[portionSize];
                int i = 0;
                string s=null;
                do
                {
                    s = reader.ReadLine();
                    if (s != null)
                    {
                        portion[i] = s;
                        i++;
                    }
                } while (s != null && i < portionSize);

                if (i < portionSize)
                {
                    Array.Resize<string>(ref portion, i);
                }
                             
                hasMore =  s != null;
                return (i > 0) ?
                    taskFactory.StartNew(parsePortion, portion)
                    : null;
            }
            
            static DataTable makeDataTable(Task<object [][]>[] parseTasks)
            {
                // rethrow exceptions 
                // http://reedcopsey.com/2010/07/19/parallelism-in-net-part-18-task-continuations-with-multiple-tasks/
                Task.WaitAll(parseTasks);
                DataTable result = new DataTable();
                MakeTable(result);
                foreach (var task in parseTasks)
                {
                    foreach (var row in task.Result)
                    {
                        var resultRow = result.NewRow();
                        resultRow.ItemArray = row;
                        result.Rows.Add(resultRow);
                    }
                }
                return result;
            }
            static void writeToDB(Task<DataTable> task)
            {
                task.Wait();
                lock (ObjectLock)
                {
                    bulkCopy.WriteToServer(task.Result);
                }
            }
            static Boolean processWritePortion(TextReader reader, int writePortionSize, int portionSize, ref Task task)
            {
                bool hasMore = true; 
                var taskFactory = new TaskFactory<object [][]>();
                var parseTasks = new Task<object [][]>[writePortionSize];
                int i=0;
                do
                {
                    var currentTask = processParsePortion(reader, portionSize, taskFactory, ref hasMore);
                    if (currentTask != null)
                    {
                        parseTasks[i] = currentTask;
                        i++;
                    }
                }
                while (i < writePortionSize && hasMore);

                if (i > 0)
                {
                    if (writePortionSize > i)
                    {
                        Array.Resize<Task<object[][]>>(ref parseTasks, i);
                    }
                    var dataTaskFactory = new TaskFactory<DataTable>();
                    task = dataTaskFactory.ContinueWhenAll<object[][]>(parseTasks, makeDataTable)
                                   .ContinueWith(writeToDB);
                }
                else 
                {
                    task = null;
                }
                return hasMore;
            }
            static void Main(string[] args)
            {

                String SqlServer, SqlDatabase;

                Int32 ReadBatchSize, WriteBatchSize;
                Int64 TotalRecords;
                HashSet<Task> tasks = new HashSet<Task>();

                DateTime StartTimer;
                DateTime EndTimer;

                try
                {

                    ci = CultureInfo.InvariantCulture.Clone() as CultureInfo;
                    ci.NumberFormat.NumberDecimalSeparator = ".";
                    ruRuCulture = CultureInfo.GetCultureInfo("ru-Ru");

                    SqlServer = args[0];
                    SqlDatabase = args[1];
                    SqlServer = args[0];
                    SqlDatabase = args[1];

                    bulkCopy = new SqlBulkCopy(GetConnectionString(SqlServer, SqlDatabase));


                    ReadBatchSize = Convert.ToInt32(args[2]);
                    WriteBatchSize = Convert.ToInt32(args[3]);
                    Records = new String[ReadBatchSize];

                    
                    
                    ColumnMappings(bulkCopy);
                    

                    TotalRecords = 0;

                    using (bulkCopy)
                    {
                        
                        using (var sr = new VirtualTextReader(8000000))//new StreamReader(args[4]))
                        {

                            StartTimer = DateTime.Now;

                            Task currentTask = null;
                            while (processWritePortion(sr, 2, 5000, ref currentTask))
                            {
                                if (currentTask != null)
                                {
                                    tasks.Add(currentTask);
                                    currentTask = null;
                                }
                                if (tasks.Count > 1)
                                {
                                    Task.WaitAny(tasks.ToArray());
                                    Task.WaitAll(tasks.Where(t => t.IsFaulted).ToArray());
                                    tasks.RemoveWhere(task => task.IsCompleted);
                                }
                            }
                            
                            if (currentTask != null)
                            {
                                tasks.Add(currentTask);
                            }
                            Task.WaitAll(tasks.ToArray());
                        }

                        bulkCopy.Close();

                        EndTimer = DateTime.Now;
                        Console.WriteLine("Records: {0}\nStartTime: {1}\nExecution Time: {2}", TotalRecords, StartTimer.ToString("yyyy-MM-dd HH:mm:ss"), new TimeSpan(EndTimer.Ticks - StartTimer.Ticks));
                        Console.ReadKey();

                    }
                }
                catch (AggregateException ae)
                {
                    foreach (var e in ae.Flatten().InnerExceptions)
                    {
                        Console.WriteLine("Import error:");
                        Console.WriteLine(e.Message);                        
                        Console.ReadKey();
                    }
                }
                catch (Exception e)
                {
                    Console.WriteLine("Import error:");
                    Console.WriteLine(e.Message);
                    Console.ReadKey();
                }

            }

            private static void MakeTable(DataTable table)
            {

                table.Columns.Add("Col1", System.Type.GetType("System.String"));
                table.Columns.Add("Col2", System.Type.GetType("System.Int32"));
                table.Columns.Add("Col3", System.Type.GetType("System.Decimal"));
                table.Columns.Add("Col4", System.Type.GetType("System.String"));
                table.Columns.Add("Col5", System.Type.GetType("System.String"));
                table.Columns.Add("Col6", System.Type.GetType("System.String"));

            }

            private static string GetConnectionString(String SqlServer, String SqlDatabase)
            {
                return "Data Source=" + SqlServer + "; " +
                    " Integrated Security=true;" +
                    "Initial Catalog=" + SqlDatabase + ";";
            }


            private static void ColumnMappings(SqlBulkCopy bulkCopy)
            {
                bulkCopy.DestinationTableName = "tb_test";
                bulkCopy.ColumnMappings.Add("col1", "col1");
                bulkCopy.ColumnMappings.Add("col2", "col2");
                bulkCopy.ColumnMappings.Add("col3", "col3");
                bulkCopy.ColumnMappings.Add("col4", "col4");
                bulkCopy.ColumnMappings.Add("col5", "col5");
                bulkCopy.ColumnMappings.Add("col6", "col6");
            }

            private static String[] parseComplexColumn(string value)
            {
                var g = System.Text.RegularExpressions.Regex.Match(value, 
                    @"\{(.*):.*:(.*)}{(.*):.*:(.*)\}").Groups;
                return new String[] { g[1].Value, g[2].Value, g[3].Value, g[4].Value };
            }

            /// <summary>
            /// Создай проект
            /// пусть в цикле передается одна и та же строка "1|22|333|4444|{A:one:B}{C:two:D}|666666"
            /// в таблицу в отдельные столбцы должны быть загружены
            /// 22,4444,A,B,C,D
            /// </summary>
            /// <param name="line"></param>
            /// <returns></returns>
            private static object[] importRecord(string line)
            {
                var columns = line.Split('|');
                var result = new Object[6];
                result[0] = columns[1];
                result[1] = Int32.Parse(columns[2]);
                result[2] = Decimal.Parse(columns[3], ci);

                var complex = parseComplexColumn(columns[4]);
                result[3] = complex[0];
                result[4] = complex[1];
                result[5] = complex[2];
                //result[6] = complex[3];

                return result;
            }
            
            private static object[] importRecord_old(string line)
            {

                Object[] rowArray = new Object[50];
                String[] Columns = new String[11];
                Int32[] Value_ID = new Int32[20];
                Decimal[] Value_Changes = new Decimal[20];

                fSplit(line, '|', Columns);


                rowArray[0] = Columns[0];
                rowArray[1] = Convert.ToDateTime(Columns[1], ruRuCulture);
                rowArray[2] = Columns[2];
                rowArray[3] = Columns[3];
                rowArray[4] = Decimal.Parse(Columns[4], ci);
                rowArray[5] = Decimal.Parse(Columns[5], ci);
                rowArray[6] = Decimal.Parse(Columns[6], ci);
                rowArray[7] = Columns[7];
                rowArray[8] = Columns[8];
                rowArray[9] = String.IsNullOrEmpty(Columns[9]) ? Convert.DBNull : Decimal.Parse(Columns[9], ci);

                String[] Values = Columns[10].Split(']');

                Int32 Id = 10;

                foreach (String Value in Values)
                {
                    if (!String.IsNullOrEmpty(Value))
                    {
                        String[] Value_structure = Value.Split('~');
                        rowArray[Id + 0] = Int32.Parse(Value_structure[0].Remove(0, 1));
                        rowArray[Id + 20] = Decimal.Parse(Value_structure[2], ci);
                        Id++;
                    }
                }

                return rowArray;
            }

            private static void fSplit(String src, char delim, String[] output)
            {
                int index = 0;
                int lindex = 0;
                int columnID = 0;
                int arrayID = 0;
                int[] array = { 0, 1, 3, 4, 5, 12, 16, 23, 25, 27, 30 };
                int i;

                while ((index = src.IndexOf(delim, lindex)) != -1)
                {
                    for (i = 0; i < 11 && array[i] != columnID; i++) ;
                    
                    if (i < 11) 
                        output[arrayID++] = src.Substring(lindex, index - lindex);
                    
                    if (columnID >= 30) 
                        break;
                    
                    lindex = index + 1;
                    index++;
                    columnID++;
                }
            }
        }

}


...
Рейтинг: 0 / 0
c# 4.0. Многопоточность и блокировки
    #37647683
Testor1
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
СТРАННОЕ МЕСТОTestor1,

Разбор писал с 0
(importRecord vs importRecord_old)

твои параметры параллелизма не использую, а захардкодил:

processWritePortion(sr, 2, 5000, ref currentTask)

2 - количество параллельных задач разбора

5000 - количество строчек в одной задаче

То есть, в данном случае, разберет 2мя нитями 10000 строк, а потом их вставит.


1. Проверил, что загруженные данные не искаженны?
2. Какова скорость загрузки ?
...
Рейтинг: 0 / 0
c# 4.0. Многопоточность и блокировки
    #37647882
Testor1,

Мельком глянул - похоже количество, количество строчек - совпадает
Скорость не мерял. В процессе глянул на перформанс монитор - узкое место - диск. Но, может не попал на что-то. Теоретически оно должно писать и готовить следующую порцию одновременно, но если есть кто поопытнее - пусть поправит
...
Рейтинг: 0 / 0
c# 4.0. Многопоточность и блокировки
    #37648233
Testor1
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
странное местоTestor1,

Мельком глянул - похоже количество, количество строчек - совпадает
Скорость не мерял. В процессе глянул на перформанс монитор - узкое место - диск. Но, может не попал на что-то. Теоретически оно должно писать и готовить следующую порцию одновременно, но если есть кто поопытнее - пусть поправит

Проверь что данные не исказились
Что во всех строчках они одинаковые.
У меня с этим была проблема.
...
Рейтинг: 0 / 0
c# 4.0. Многопоточность и блокировки
    #37649622
CTPAHHOE MECTO
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
Testor1Проверь что данные не исказились
Что во всех строчках они одинаковые.
У меня с этим была проблема.
Проверил на 80к - все нормально
...
Рейтинг: 0 / 0
c# 4.0. Многопоточность и блокировки
    #37651750
Testor1
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
CTPAHHOE MECTOTestor1Проверь что данные не исказились
Что во всех строчках они одинаковые.
У меня с этим была проблема.
Проверил на 80к - все нормально

На днях проверю твой вариант.
Спасибо за внимание !!!
...
Рейтинг: 0 / 0
14 сообщений из 39, страница 2 из 2
Форумы / Программирование [игнор отключен] [закрыт для гостей] / c# 4.0. Многопоточность и блокировки
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]