Гость
Целевая тема:
Создать новую тему:
Автор:
Форумы / Delphi [игнор отключен] [закрыт для гостей] / Многопоточный парсинг / 25 сообщений из 29, страница 1 из 2
01.07.2017, 09:23:39
    #39480794
applauser
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточный парсинг
Здравствуйте.

Делаю парсинг страниц. Для каждой страницы запускаю новый поток:
Код: pascal
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
procedure TForm1.Button1Click(Sender: TObject);
var
  i, PageCount: integer;
begin
  PageCount := 120;
  for i := 0 to 49 do
    begin
      tThread := Thread.Create(true);
      tThread.tPage := IntToStr(i + 1);
      tThread.Resume;
    end;
end;


Запускаю 50 потоков в цикле (например), иначе сайт банит. Итерации соответствуют номерам страниц.
Но количество страниц на сайте может быть больше 50, могу определить это количество.
Нужен совет, как запускать цикл по 50 потоков до тех пор, пока не исчерпаются количества страниц.
Спасибо.
...
Рейтинг: 0 / 0
01.07.2017, 10:18:53
    #39480806
schi
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточный парсинг
applauserЗдравствуйте.

Делаю парсинг страниц. Для каждой страницы запускаю новый поток:
Код: pascal
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
procedure TForm1.Button1Click(Sender: TObject);
var
  i, PageCount: integer;
begin
  PageCount := 120;
  for i := 0 to 49 do
    begin
      tThread := Thread.Create(true);
      tThread.tPage := IntToStr(i + 1);
      tThread.Resume;
    end;
end;


Запускаю 50 потоков в цикле (например), иначе сайт банит. Итерации соответствуют номерам страниц.
Но количество страниц на сайте может быть больше 50, могу определить это количество.
Нужен совет, как запускать цикл по 50 потоков до тех пор, пока не исчерпаются количества страниц.
Спасибо.

Например создать семафор на 50 и перед запуском нового потока подождать его. Каждый поток, начинаясь, блокирует семафор, а завершаясь - освобождает.
Дел на 10 минут.
...
Рейтинг: 0 / 0
01.07.2017, 11:16:57
    #39480830
applauser
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточный парсинг
schi,

Накидал на скорую руку. Всё ли верно? Просто под рукой Delphi нет.
Код: pascal
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
var
  hSem: THandle = 0;

procedure TForm1.Button1Click(Sender: TObject);
var
  i, PageCount: integer;
begin
  PageCount := 120;
  for i := 1 to PageCount do
    begin
      hSem := CreateSemaphore(nil, 50, 50, nil);
      tThread := Thread.Create(true);
      tThread.tPage := IntToStr(i);
      tThread.Resume;
    end;
end;

procedure Thread.Execute;
var
  WaitReturn: DWORD;
begin
  while not Terminated do
    begin
      WaitReturn := WaitForSingleObject(hSem, INFINITE);
      if WaitReturn = WAIT_OBJECT_0 then
        begin
          //Выполняем действия;
          Synchronize(процедура);
          ReleaseSemaphore(hSem, 1, nil);
        end;
    end;
end;
...
Рейтинг: 0 / 0
01.07.2017, 12:23:39
    #39480861
SOFT FOR YOU
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточный парсинг
Ну вы гоните что ли
В новых версиях есть Parallel.For
...
Рейтинг: 0 / 0
01.07.2017, 12:24:48
    #39480862
defecator
Модератор форума
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточный парсинг
SOFT FOR YOUНу вы гоните что ли
В новых версиях есть Parallel.For
и что ? Тредами всё, уже нельзя пользоваться ?
...
Рейтинг: 0 / 0
01.07.2017, 12:44:40
    #39480869
SOFT FOR YOU
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточный парсинг
defecator,

Нельзя )
...
Рейтинг: 0 / 0
01.07.2017, 12:47:29
    #39480872
SOFT FOR YOU
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточный парсинг
defecator,

На самом деле дело не в тредах. А в коде, который можно подсмотреть там. Если я ничего не путаю, в стандартной реализации используется AtomicIncrement, а никакой не семафор. И тем более не for i
...
Рейтинг: 0 / 0
01.07.2017, 12:55:18
    #39480875
alekcvp
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточный парсинг
applauserschi,

Накидал на скорую руку. Всё ли верно? Просто под рукой Delphi нет.
Код: pascal
1.
2.
  for i := 1 to PageCount do
     hSem := CreateSemaphore(nil, 50, 50, nil);


Мне кажется что нет, зачем вам дохреналиард семафоров?
...
Рейтинг: 0 / 0
01.07.2017, 13:09:25
    #39480881
applauser
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточный парсинг
alekcvp,

Прошу прощения, семафор должен создаваться вне цикла, разумеется:
Код: pascal
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
var
  hSem: THandle = 0;

procedure TForm1.Button1Click(Sender: TObject);
var
  i, PageCount: integer;
begin
  PageCount := 120;
  hSem := CreateSemaphore(nil, 50, 50, nil);
  for i := 1 to PageCount do
    begin
      tThread := Thread.Create(true);
      tThread.tPage := IntToStr(i);
      tThread.Resume;
    end;
end;

procedure Thread.Execute;
var
  WaitReturn: DWORD;
begin
  while not Terminated do
    begin
      WaitReturn := WaitForSingleObject(hSem, INFINITE);
      if WaitReturn = WAIT_OBJECT_0 then
        begin
          //Выполняем действия;
          Synchronize(процедура);
          ReleaseSemaphore(hSem, 1, nil);
        end;
    end;
end;
...
Рейтинг: 0 / 0
01.07.2017, 13:27:43
    #39480892
wadman
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточный парсинг
Создал список страниц, создал пул потоков и по факту передачи ссылки потоку на страницу удаляешь её из списка.
Делов-то...
...
Рейтинг: 0 / 0
01.07.2017, 13:53:05
    #39480897
defecator
Модератор форума
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточный парсинг
SOFT FOR YOUdefecator,

На самом деле дело не в тредах. А в коде, который можно подсмотреть там. Если я ничего не путаю, в стандартной реализации используется AtomicIncrement, а никакой не семафор. И тем более не for i

Я использую библиотеку MtxVec, там реализован параллельный for и много чего ещё,
и гораздо круче, чем в Parallel.

В Паралель я не могу запустить сотни четыре потоков, приложение выходит из-под контроля,
а в MtxVec он ещё и по ядрам сам раскладывает нагрузку.
...
Рейтинг: 0 / 0
01.07.2017, 14:19:00
    #39480905
schi
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточный парсинг
applauserschi,

Накидал на скорую руку. Всё ли верно? Просто под рукой Delphi нет.
Код: pascal
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
var
  hSem: THandle = 0;

procedure TForm1.Button1Click(Sender: TObject);
var
  i, PageCount: integer;
begin
  PageCount := 120;
  for i := 1 to PageCount do
    begin
      hSem := CreateSemaphore(nil, 50, 50, nil);
      tThread := Thread.Create(true);
      tThread.tPage := IntToStr(i);
      tThread.Resume;
    end;
end;

procedure Thread.Execute;
var
  WaitReturn: DWORD;
begin
  while not Terminated do
    begin
      WaitReturn := WaitForSingleObject(hSem, INFINITE);
      if WaitReturn = WAIT_OBJECT_0 then
        begin
          //Выполняем действия;
          Synchronize(процедура);
          ReleaseSemaphore(hSem, 1, nil);
        end;
    end;
end;



Нет, ждать семафора следует ПЕРЕД созданием потока, а не внутри него.
Ну и вынести создание семафора за цикл, уже написали.
...
Рейтинг: 0 / 0
01.07.2017, 15:14:37
    #39480914
YuRock
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточный парсинг
applauser,

Смысла в этих потоках не будет никакого все равно, раз "процедура" через synchronize вызывается. В лучшем случае будет просто медленнее, чем если в главном потоке по очереди в цикле по одной странице обрабатывать.
...
Рейтинг: 0 / 0
01.07.2017, 17:53:42
    #39480940
Bred eFeM
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточный парсинг
defecatorЯ использую библиотеку MtxVec, там реализован параллельный for и много чего ещё,
и гораздо круче, чем в Parallel. и что ещё, например, - исходники есть?
В Паралель я не могу запустить сотни четыре потоков зачем ?!
приложение выходит из-под контроля синей изолентой примотай
а в MtxVec он ещё и по ядрам сам раскладывает нагрузку. а гипертрейдинговую пару как одно ядро считает или как?
...
Рейтинг: 0 / 0
01.07.2017, 17:58:02
    #39480942
Bred eFeM
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточный парсинг
wadmanСоздал список страниц, создал пул потоков и по факту передачи ссылки потоку на страницу удаляешь её из списка.
Делов-то...у человека проблема как 120 на 50 поровну разделить авторНужен совет, как запускать цикл по 50 потоков до тех пор, пока не исчерпаются количества страниц.а тут списки, пулл ...
...
Рейтинг: 0 / 0
01.07.2017, 21:42:20
    #39480979
чччД
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточный парсинг
applauser,

используйте ZeroMQ, эта задача как раз для него: 16591809
Никаких мьютексов и семафором (и, следовательно, никаких дедлоков), только сообщения.

Вот твоя задачка, решенная в рамках нитей, не процессов. Но с ZMQ ты легко ее можешь масштабировать, вместо нитей создавая процессы, в том числе и на других компьютерах сети: достаточно лишь изменить протокол с inproc на tcp и изменить строку коннекта.

В zmq уже встроены очереди и алгоритмы распределения заданий (сообщений) свободным клиентам.

Весь код
Код: pascal
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
program MultiParsing;

{$APPTYPE CONSOLE}

uses
  FastMM4,
  SysUtils,
  zmq_h in '..\libZMQ\zmq_h.pas',
  zmq_classes in '..\libZMQ\zmq_classes.pas',
  Math;

const
  cMaxThreads = 5;
  cPagesCount = 20;
  cMasterEndPint = 'inproc://Parsers';


procedure Parser(aArgs: Pointer; aContext: TzmqContext; aInnerPipe: TzmqSocket);
var
  fMsg: TzmqMessage;
  fMasterSocket: TzmqSocket;
  fStr: string;
  fInnerNumber: Integer;
begin
  fInnerNumber := pInteger(aArgs)^; // Номер парасера
  fMasterSocket := aContext.Socket(stRouter);
  fMasterSocket.Connect(cMasterEndPint);
  fMsg := TzmqMessage.Create;
  try
    fMsg.Add().AsString := 'Ready!';
    aInnerPipe.Send(fMsg);
    while fMasterSocket.Recv(fMsg) >= 0 do begin; // Получение задания на парсинг;
      fStr := fMsg[1].AsString; // Текст задания
      if fStr = 'Parse' then begin
        sleep(Random(1000));// кагбэ.. выполняем задание, "парсим"
        // Для сокета ROUTER в fMsg[0] хранится "обратный адрес"
        fMsg[1].AsString := 'OK'; //  Сообщаем о результате
        fMsg[2].AsInteger := fInnerNumber; //  Идентификатор парсера
        fMasterSocket.Send(fMsg)
      end;
    end;
  finally
    FreeAndNil(fMsg);
  end;
end;

procedure doIt();
var
  fCtx: TzmqContext;
  i: Integer;
  fOuterPipe: TzmqSocket;
  fParserSocket: TzmqSocket;
  fMsg: TzmqMessage;
begin
  fCtx := TzmqContext.Create();
  fParserSocket := fCtx.Socket(stDealer);
  fParserSocket.Bind(cMasterEndPint);
  fMsg := nil;
  try
    // Создание пула парсеров:
    for i := 0 to Pred(Min(cMaxThreads, cPagesCount)) do begin
      fOuterPipe := fCtx.Fork_Attached_Thread(@i, @Parser); // Создаем нить - парсер
      fOuterPipe.Recv(fMsg); // Дожидаемся запуска
      Writeln('Parser ', i, ':', fMsg[0].AsString);
    end;
    FreeAndNil(fMsg);
    // // Отправка заданий через очередь  сообщений
    for i := 0 to Pred(cPagesCount) do begin
      fMsg := TzmqMessage.Create;
      fMsg.Add.AsString := 'Parse'; // "Задание"
      fMsg.Add.AsInteger := i; // Параметр "Задания", в данном случае - № страницы
      fParserSocket.Send(fMsg);
    end;
    for i := 0 to Pred(cPagesCount) do begin // Отправка задания
      fMsg := TzmqMessage.Create;
      fMsg.Add.AsString := 'Parse'; // "Задание"
      fMsg.Add.AsInteger := i; //Параметр - № страницы
      fParserSocket.Send(fMsg);
    end;
    // Ждем завершения работы по парсингу страниц
    for i := Pred(cPagesCount) downto 0 do begin 
     fParserSocket.Recv(fMsg);
      Writeln('Parser ', fMsg[1].AsInteger, ' - ', fMsg[0].AsString );
    end;

  finally
    FreeAndNil(fMsg);
    FreeAndNil(fCtx);
  end;

end;

begin
  doIt();
  Readln;
end.




Сперва создаем столько нитей-парсеров, сколько нужно. Минимум из максимально допустимого числа нитей и общего числа страниц.
Почему не раздаем задания сразу? Потому, что в реальных системах коннект может выполняться не мгновенно, а очередь существует как на приеме, так и на передаче (длина очереди регулируется). Как только появится готовая к работе нить, все задания могу быть отправлены ей, так как другие нити еще не запустились и не приконнектились к серверу.
Поэтому сперва запускаем все парсеры, они сообщают о готовности и начинаю ждать получение задания:

Код: pascal
1.
2.
3.
4.
5.
6.
    // Создание пула парсеров:
    for i := 0 to Pred(Min(cMaxThreads, cPagesCount)) do begin
      fOuterPipe := fCtx.Fork_Attached_Thread(@i, @Parser); // Создаем нить - парсер
      fOuterPipe.Recv(fMsg); // Дожидаемся запуска
      Writeln('Parser ', i, ':', fMsg[0].AsString);
    end;



Вот сам парсер:

Код: pascal
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
procedure Parser(aArgs: Pointer; aContext: TzmqContext; aInnerPipe: TzmqSocket);
var
  fMsg: TzmqMessage;
  fMasterSocket: TzmqSocket;
  fStr: string;
  fInnerNumber: Integer;
begin
  fInnerNumber := pInteger(aArgs)^; // Номер парасера
  fMasterSocket := aContext.Socket(stRouter);
  fMasterSocket.Connect(cMasterEndPint);
  fMsg := TzmqMessage.Create;
  try
    fMsg.Add().AsString := 'Ready!'; // Сообщение... 
    aInnerPipe.Send(fMsg); // ...о готовности к работе.

    // Рабочий цикл: ждем приема команды на работу. Если <0 -  значит, приложение завершается
    while fMasterSocket.Recv(fMsg) >= 0 do begin; // Получение задания на парсинг;
      fStr := fMsg[1].AsString; // Текст задания
      if fStr = 'Parse' then begin
        sleep(Random(1000));// кагбэ.. выполняем задание, "парсим"
        // Для сокета ROUTER в fMsg[0] хранится "обратный адрес"
        fMsg[1].AsString := 'OK'; //  Сообщаем о результате
        fMsg[2].AsInteger := fInnerNumber; //  Идентификатор парсера
        fMasterSocket.Send(fMsg)
      end;
    end;
  finally
    FreeAndNil(fMsg);
  end;
end;


...
Ну вот, можно напихать заданий в очередь. По мере готовности парсеров задание будет извлекаться из очереди и отправляться свободному парсеру. Если очередь переполнится, программа тормознет на этом месте до момента освобождения ресурса:
Код: pascal
1.
2.
3.
4.
5.
6.
    for i := 0 to Pred(cPagesCount) do begin // Отправка задания
      fMsg := TzmqMessage.Create;
      fMsg.Add.AsString := 'Parse'; // "Задание"
      fMsg.Add.AsInteger := i; //Параметр - № страницы
      fParserSocket.Send(fMsg);
    end;



Парсеры шлют результат работы, которые накапливаются в приемной очереди.
Мы знаем, что результатов должно быть столько же, сколько и страниц. Дожидаемся этих результатов:
Код: pascal
1.
2.
3.
4.
5.
    // Ждем завершения работы по парсингу страниц
    for i := Pred(cPagesCount) downto 0 do begin 
     fParserSocket.Recv(fMsg);
      Writeln('Parser ', fMsg[1].AsInteger, ' - ', fMsg[0].AsString );
    end;
...
Рейтинг: 0 / 0
01.07.2017, 21:42:52
    #39480980
чччД
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточный парсинг
...
Рейтинг: 0 / 0
01.07.2017, 21:52:00
    #39480984
чччД
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточный парсинг
чччД...с ZMQ ты легко ее можешь масштабировать, вместо нитей создавая процессы, в том числе и на других компьютерах сети: достаточно лишь изменить протокол с inproc на tcp и изменить строку коннекта...

Более того, так как
Код: pascal
1.
       // Для сокета ROUTER в fMsg[0] хранится "обратный адрес"

- данный парсер всегда отсылает результат именно отправителю задания. Таким образом, задания на обработку ("парсинг") могут поступать откуда угодно, из разных нитей, процессов и компьютеров-узлов сети.
...
Рейтинг: 0 / 0
02.07.2017, 00:14:39
    #39481001
defecator
Модератор форума
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточный парсинг
Bred eFeMdefecatorЯ использую библиотеку MtxVec, там реализован параллельный for и много чего ещё,
и гораздо круче, чем в Parallel. и что ещё, например, - исходники есть?
В Паралель я не могу запустить сотни четыре потоков зачем ?!
приложение выходит из-под контроля синей изолентой примотай
а в MtxVec он ещё и по ядрам сам раскладывает нагрузку. а гипертрейдинговую пару как одно ядро считает или как?
наверное, ты сам себе сейчас показался самым умным
...
Рейтинг: 0 / 0
02.07.2017, 00:18:10
    #39481002
rgreat
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточный парсинг
А действительно, зачем 400 потоков. У тебя 200 ядерный сервер?
...
Рейтинг: 0 / 0
02.07.2017, 00:23:39
    #39481004
defecator
Модератор форума
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточный парсинг
rgreatА действительно, зачем 400 потоков. У тебя 200 ядерный сервер?
тут я уже создавал тему насчёт создания тысяч элементарных потоков
...
Рейтинг: 0 / 0
02.07.2017, 00:36:53
    #39481007
rgreat
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточный парсинг
Понятней не стало.
...
Рейтинг: 0 / 0
02.07.2017, 00:58:49
    #39481014
fd00ch
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточный парсинг
rgreatА действительно, зачем 400 потоков. У тебя 200 ядерный сервер?потому что при скачивании страниц из веба нагрузка на проц ~ 0, узкое место не в вычислительных мощностях
...
Рейтинг: 0 / 0
02.07.2017, 02:00:25
    #39481031
rgreat
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточный парсинг
Хм, а разве для 400-т парралельных HTTP запросов разве обязательно надо 400 потоков?
...
Рейтинг: 0 / 0
02.07.2017, 02:17:24
    #39481033
чччД
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточный парсинг
rgreatХм, а разве для 400-т парралельных HTTP запросов разве обязательно надо 400 потоков?
Что ж ты их так боишься?
...
Рейтинг: 0 / 0
Форумы / Delphi [игнор отключен] [закрыт для гостей] / Многопоточный парсинг / 25 сообщений из 29, страница 1 из 2
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]