powered by simpleCommunicator - 2.0.59     © 2025 Programmizd 02
Целевая тема:
Создать новую тему:
Автор:
Закрыть
Цитировать
Форумы / C++ [игнор отключен] [закрыт для гостей] / с++11: future, async, barrier
14 сообщений из 14, страница 1 из 1
с++11: future, async, barrier
    #38571735
Фотография Анатолий Широков
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Набросал пример многопоточного получения суммы ряда

Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
#include <mutex>
#include <future>
#include <iostream>
#include <algorithm>

int main()
{
    const size_t size = 100000;
    const size_t N = 10;
    int a[size];
    int c = 0;
    // генерируем ряд по арифметической прогрессии с шагом 1, для которого мы отлично знаем аналитический способ получения суммы
    std::generate(a, a + 100000, [&c]() {return c++;});
    std::future<int> f[N];
    for(size_t i = 0; i<N; i++ ) {
        f[i] = std::async([](const int*b, const int*e){return std::accumulate(b, e, 0);}, a+i*size/N, a+(i+1)*size/N);
    }
    long long s = 0;
    for(size_t i = 0; i<10; i++ ) {
        s += f[i].get();
    }
    // сумма получена расчетным путем
    std::cout << s << '\n';
    // сумма получена аналитическим путем
    std::cout << (((long long)size)*(a[0]+a[size-1]))/2 << '\n';
    return 0;
}



Не нравится вот этот цикл из за возможности непроизводительной блокировки на потоках, которые еще выполняются.

Код: plaintext
1.
2.
3.
4.
    long long s = 0;
    for(size_t i = 0; i<10; i++ ) {
        s += f[i].get();
    }



По сути, напрашивается барьер, на который вешать ожидание завершения всех future.

Попробовал было написать, но на моей реализации mingw4.8 wair_for постоянно возвращает timeout и в результате циклится:

Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
 size_t done = 0;
    while( done < N ) {
        for( size_t i = 0; i < N; i++ ) {
            if( (f[i].wait_for(std::chrono::milliseconds(0)) == std::future_status::ready) ) {
                s += f[i].get();
                done += 1;
            }
        }
    }



В связи с этим вопрос, как это грамотно делается в современном С++?
...
Рейтинг: 0 / 0
с++11: future, async, barrier
    #38571757
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Синхронизирован-ли f[i].get() ?
...
Рейтинг: 0 / 0
с++11: future, async, barrier
    #38571764
Фотография Анатолий Широков
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton,

да, если нет результата, он его ждет.
...
Рейтинг: 0 / 0
с++11: future, async, barrier
    #38571833
Фотография Анатолий Широков
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
причину зависания нашел, надо запускать async с в флагом async

Код: plaintext
1.
        f[i] = std::async(std::launch::async, [](const int*b, const int*e){return std::accumulate(b, e, 0);}, a+i*size/N, a+(i+1)*size/N);



пересмотрел код

Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
    size_t done = 0;
    while( done < N ) {
        for( size_t i = 0; i < N; i++ ) {
            if( (f[i].wait_for(std::chrono::milliseconds(0)) == std::future_status::ready) ) {
                s += f[i].get();
                done += 1;
            }
        }
    }



ерунду написал. переписал

Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
    long long s = 0;
    size_t done = 0;
    bool ready[N] = {false};
    while( done < N ) {
        for( size_t i = 0; i < N; i++ ) {
            if( !ready[i] && (f[i].wait_for(std::chrono::milliseconds(0)) == std::future_status::ready) ) {
                s += f[i].get();
                done += 1;
                ready[i] = true;
            }
        }
    }



теперь этот код работает как и ожидалось.
...
Рейтинг: 0 / 0
с++11: future, async, barrier
    #38571849
Вася Уткин
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
Анатолий ШироковНе нравится вот этот цикл из за возможности непроизводительной блокировки на потоках, которые еще выполняются.

Код: plaintext
1.
2.
3.
4.
    long long s = 0;
    for(size_t i = 0; i<10; i++ ) {
        s += f[i].get();
    }



По сути, напрашивается барьер, на который вешать ожидание завершения всех future.
Откуда тут непроизвольная блокировка на потоках?

Анатолий ШироковПопробовал было написать, но на моей реализации mingw4.8 wair_for постоянно возвращает timeout и в результате циклится:

Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
 size_t done = 0;
    while( done < N ) {
        for( size_t i = 0; i < N; i++ ) {
            if( (f[i].wait_for(std::chrono::milliseconds(0)) == std::future_status::ready) ) { 
                s += f[i].get();
                done += 1;
            }
        }
    }



В связи с этим вопрос, как это грамотно делается в современном С++?std::chrono::milliseconds(0) - как бы и говорит выдать timeout сразу.

И если уж хочется сразу загрузить все ядра в вашем примере, то передавайте явно std::launch::async
Код: plaintext
1.
f[i] = std::async(std::launch::async, [](const int*b, const int*e){return std::accumulate(b, e, 0);}, a+i*size/N, a+(i+1)*size/N);


Потому что по умолчанию идет launch::async | launch::deferred - а это значит, что поток запустится не в момент вызова std::async, а в момент вызова f[i].get(); - т.е. будет загружать ядра по одному.

Ещё как бы вот так делают, как раз в документации на std::async та же самая сумма ряда, попробуйте сравнить скорости:
http://en.cppreference.com/w/cpp/thread/async

Вообще вот так на GPU это делается:

http://back40computing.googlecode.com/svn-history/r1023/branches/cub/cub/docs/html/classcub_1_1_warp_scan.html
...
Рейтинг: 0 / 0
с++11: future, async, barrier
    #38571870
Фотография Анатолий Широков
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Вася Уткин,

Да, я уже понял свою ошибку на счет запуска async. Но теперь когда понял, то все остальное честно:

Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
long long s = 0;
    size_t done = 0;
    bool ready[N] = {false};
    while( done < N ) {
        for( size_t i = 0; i < N; i++ ) {
            // если результат еще не получен, то пытаемся проверить статус передавая нулей таймаут
            // в этом случае мгновенно получу либо ready, либо timeout
            if( !ready[i] && (f[i].wait_for(std::chrono::milliseconds(0)) == std::future_status::ready) ) {
                s += f[i].get();
                done += 1;
                ready[i] = true;
            }
        }
    }
...
Рейтинг: 0 / 0
с++11: future, async, barrier
    #38571921
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Анатолий Широков, а какое макс. число потоков было замечено? И есть ли вообще ограничения?
...
Рейтинг: 0 / 0
с++11: future, async, barrier
    #38571944
Вася Уткин
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
Анатолий ШироковВася Уткин,

Да, я уже понял свою ошибку на счет запуска async. Но теперь когда понял, то все остальное честно:

Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
long long s = 0;
    size_t done = 0;
    bool ready[N] = {false};
    while( done < N ) {
        for( size_t i = 0; i < N; i++ ) {
            // если результат еще не получен, то пытаемся проверить статус передавая нулей таймаут
            // в этом случае мгновенно получу либо ready, либо timeout
            if( !ready[i] && (f[i].wait_for(std::chrono::milliseconds(0)) == std::future_status::ready) ) {
                s += f[i].get();
                done += 1;
                ready[i] = true;
            }
        }
    }


Если вы используете std::launch::async, то в этом коде нету смысла. Берите первый вариант:
(только замените там 10, на N)
Код: plaintext
1.
2.
3.
for(size_t i = 0; i<N; i++ ) {
        s += f[i].get();
    }



С std::launch::async - все потоки запущены сразу, и вам в любом случае надо дождаться завершения последнего из них.
...
Рейтинг: 0 / 0
с++11: future, async, barrier
    #38571977
Фотография Анатолий Широков
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Вася Уткин,

да, спасибо большое! овчинка выделки не стоит, прогнал тесты на 4-х ядерной машине

Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
#include "futuretest.h"

#include <mutex>
#include <future>
#include <iostream>
#include <algorithm>

#include <QtTest/QtTest>

FutureTest::FutureTest(QObject *parent) :
    QObject(parent)
{
}

void FutureTest::simpleTestCase_data()
{
    QTest::addColumn<int>("size");
    QTest::addColumn<int>("N");
    QTest::newRow("1000000/10") << 1000000 << 10;
    QTest::newRow("1000000/5") << 1000000 << 5;
    QTest::newRow("100000/10") << 100000 << 10;
    QTest::newRow("100000/5") << 100000 << 5;
}

void FutureTest::simpleTestCase()
{
    QFETCH(int, size);
    QFETCH(int, N);
    std::vector<int> v(size);
    std::vector<std::future<int> > f(N);

    int* a = &v[0];
    int c = 0;
    // генерируем ряд по арифметической прогрессии с шагом 1, для которого мы отлично знаем аналитический способ получения суммы
    std::generate(a, a + size, [&c]() {return c++;});
    QBENCHMARK {
        for(size_t i = 0; i<N; i++ ) {
            f[i] = std::async(std::launch::async, [](const int*b, const int*e){return std::accumulate(b, e, 0);}, a+i*size/N, a+(i+1)*size/N);
        }
        long long s = 0;
        for( size_t i = 0; i < N; i++ ) {
           s += f[i].get();
        }
    }
}

void FutureTest::barrierTestCase_data()
{
    simpleTestCase_data();
}

void FutureTest::barrierTestCase()
{
    QFETCH(int, size);
    QFETCH(int, N);
    std::vector<int> v(size);
    std::vector<bool> ready(N, false);
    std::vector<std::future<int> > f(N);
    int* a = &v[0];
    int c = 0;
    // генерируем ряд по арифметической прогрессии с шагом 1, для которого мы отлично знаем аналитический способ получения суммы
    std::generate(a, a + size, [&c]() {return c++;});
    QBENCHMARK {
        for(size_t i = 0; i<N; i++ ) {
            f[i] = std::async(std::launch::async, [](const int*b, const int*e){return std::accumulate(b, e, 0);}, a+i*size/N, a+(i+1)*size/N);
        }
        long long s = 0;
        size_t done = 0;
        std::fill(ready.begin(), ready.end(), false);
        while( done < N ) {
            for( size_t i = 0; i < N; i++ ) {
                if( !ready[i] && (f[i].wait_for(std::chrono::milliseconds(0)) == std::future_status::ready) ) {
                    s += f[i].get();
                    done += 1;
                    ready[i] = true;
                }
            }
        }
    }
}



Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
********* Start testing of FutureTest *********
Config: Using QtTest library 5.2.1, Qt 5.2.1
PASS   : FutureTest::initTestCase()
PASS   : FutureTest::simpleTestCase(1000000/10)
RESULT : FutureTest::simpleTestCase():"1000000/10":
     0.84 msecs per iteration (total: 54, iterations: 64)
PASS   : FutureTest::simpleTestCase(1000000/5)
RESULT : FutureTest::simpleTestCase():"1000000/5":
     0.74 msecs per iteration (total: 95, iterations: 128)
PASS   : FutureTest::simpleTestCase(100000/10)
RESULT : FutureTest::simpleTestCase():"100000/10":
     0.55 msecs per iteration (total: 71, iterations: 128)
PASS   : FutureTest::simpleTestCase(100000/5)
RESULT : FutureTest::simpleTestCase():"100000/5":
     0.34 msecs per iteration (total: 89, iterations: 256)
PASS   : FutureTest::barrierTestCase(1000000/10)
RESULT : FutureTest::barrierTestCase():"1000000/10":
     0.82 msecs per iteration (total: 53, iterations: 64)
PASS   : FutureTest::barrierTestCase(1000000/5)
RESULT : FutureTest::barrierTestCase():"1000000/5":
     0.71 msecs per iteration (total: 91, iterations: 128)
PASS   : FutureTest::barrierTestCase(100000/10)
RESULT : FutureTest::barrierTestCase():"100000/10":
     0.57 msecs per iteration (total: 73, iterations: 128)
PASS   : FutureTest::barrierTestCase(100000/5)
RESULT : FutureTest::barrierTestCase():"100000/5":
     0.31 msecs per iteration (total: 81, iterations: 256)
PASS   : FutureTest::cleanupTestCase()
Totals: 10 passed, 0 failed, 0 skipped
********* Finished testing of FutureTest *********
...
Рейтинг: 0 / 0
с++11: future, async, barrier
    #38571999
Фотография Анатолий Широков
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton,

поток в любом случае создается на каждый async, ограничения я думаю уже OS related.
...
Рейтинг: 0 / 0
с++11: future, async, barrier
    #38572019
Вася Уткин
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
Анатолий ШироковВася Уткин,

да, спасибо большое! овчинка выделки не стоит, прогнал тесты на 4-х ядерной машине
Да, т.к. особо не имеет смысла ставить N намного больше числа CPU-Cores, то в ближайшие лет 10 это N не будет значительным, чтобы не дожидаясь всех потоков, пытаться сложить уже полученные результаты.

А через 10 лет, будет выгодней использовать подход на подобии этого с O(logN):
Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
#include <iostream>
#include <vector>
#include <algorithm>
#include <numeric>
#include <future>
 
template <typename RAIter>
int parallel_sum(RAIter beg, RAIter end)
{
    typename RAIter::difference_type len = end-beg;
    if(len < 1000)
        return std::accumulate(beg, end, 0);
 
    RAIter mid = beg + len/2;
    auto handle = std::async(std::launch::async,
                              parallel_sum<RAIter>, mid, end);
    int sum = parallel_sum(beg, mid);
    return sum + handle.get();
}
 
int main()
{
    std::vector<int> v(10000, 1);
    std::cout << "The sum is " << parallel_sum(v.begin(), v.end()) << '\n';
}



Собственно в будущее позволяет заглянуть GPU, где уже выбран оптимальный путь для десятков тысяч виртуальных ядер:
(в моем первом сообщении я картинкой ошибся, там для scan, а вот эта как раз для reduce)
http://back40computing.googlecode.com/svn-history/r1023/branches/cub/cub/docs/html/classcub_1_1_cta_reduce.html
...
Рейтинг: 0 / 0
с++11: future, async, barrier
    #38572047
Фотография Анатолий Широков
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Вася Уткин,

да, алгоритм изящный, но показатели производительности хромают

Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
template <typename RAIter>
int parallel_sum(RAIter beg, RAIter end)
{
    size_t len = end-beg;
    if(len < 1000)
        return std::accumulate(beg, end, 0);

    RAIter mid = beg + len/2;
    auto handle = std::async(std::launch::async,
                              parallel_sum<RAIter>, mid, end);
    int sum = parallel_sum(beg, mid);
    return sum + handle.get();
}

void FutureTest::parallelSumTestCase()
{
    QFETCH(int, size);
    QFETCH(int, N);
    std::vector<int> v(size);
    int* a = &v[0];
    int c = 0;
    std::generate(a, a + size, [&c]() {return c++;});
    QBENCHMARK {
        parallel_sum(a, a + size);
    }
}



Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
PASS   : FutureTest::parallelSumTestCase(1000000/10)
RESULT : FutureTest::parallelSumTestCase():"1000000/10":
     90 msecs per iteration (total: 90, iterations: 1)
PASS   : FutureTest::parallelSumTestCase(1000000/5)
RESULT : FutureTest::parallelSumTestCase():"1000000/5":
     101 msecs per iteration (total: 101, iterations: 1)
PASS   : FutureTest::parallelSumTestCase(100000/10)
RESULT : FutureTest::parallelSumTestCase():"100000/10":
     8.3 msecs per iteration (total: 67, iterations: 8)
PASS   : FutureTest::parallelSumTestCase(100000/5)
RESULT : FutureTest::parallelSumTestCase():"100000/5":
     9.8 msecs per iteration (total: 79, iterations: 8)
PASS   : FutureTest::cleanupTestCase()
...
Рейтинг: 0 / 0
с++11: future, async, barrier
    #38572110
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Если считать не SUM а AVG то можно заранее выдавать пользователю прибл. значение.
еще до того как все потоки посчитаны. Удобно с точки зрения GUI или графиков и диаграм.
...
Рейтинг: 0 / 0
с++11: future, async, barrier
    #38572132
Фотография Анатолий Широков
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
maytonЕсли считать не SUM а AVG то можно заранее выдавать пользователю прибл. значение.
еще до того как все потоки посчитаны. Удобно с точки зрения GUI или графиков и диаграм.

да, если нормальное распределение, то приблизительная оценка будет недалека от истины.
...
Рейтинг: 0 / 0
14 сообщений из 14, страница 1 из 1
Форумы / C++ [игнор отключен] [закрыт для гостей] / с++11: future, async, barrier
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]