powered by simpleCommunicator - 2.0.27     © 2024 Programmizd 02
Map
Форумы / NoSQL, Big Data [игнор отключен] [закрыт для гостей] / Шаблон MapReduce
3 сообщений из 3, страница 1 из 1
Шаблон MapReduce
    #38263214
Фотография ZeD.ORA
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Доброго времени суток, пытаюсь понять суть структуры шаблона MapReduce, прочитал достаточно материалов по этой технологии в частности и про Hadoop и как он устроен, также поэкспериментировал над MapReduce фреймворками от MapR и Hadoop, но хотелось бы все попробовать, как говорится пощупать руками :).

От себя скажу что "теоретически" что есть MapReduce понял, коротко говоря это параллельный алгоритм для равномерного распределения(map) больших объемов данных и передача их в один конечный поток(reduce).

Создал примитивный пример на Java, прошу строго не судить структуру программы, все делалось в спешке дабы понять суть алгоритма :)

Main класс:
Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
public class MainApp
{
    public static void main(String[] args) throws Exception {

        StopWatch stopWatch = new StopWatch();
        CityTemperature ct= new CityTemperature();
        System.out.println(ct.getReduceDone());
        stopWatch.stop();
        System.out.println("Elapsed Time: " + stopWatch.getElapsedTime());
    }

}


Основной класс реализации метода Map/Reduce
Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
103.
104.
105.
106.
107.
108.
109.
110.
111.
112.
113.
114.
115.
116.
117.
118.
119.
120.
121.
122.
123.
124.
125.
126.
127.
128.
129.
130.
131.
132.
133.
134.
135.
136.
137.
138.
139.
140.
141.
142.
143.
144.
145.
146.
147.
148.
149.
150.
151.
152.
153.
154.
155.
156.
157.
158.
159.
160.
161.
162.
163.
164.
public class CityTemperature
{
    private HashMap<String,Integer> redu= new HashMap<>();
    private HashMap<String,List<Integer>> mapOut = new HashMap<>();

    public CityTemperature() throws InterruptedException
    {
        List<Map<String, String>> states = new ArrayList<>();
        Map mp=new LinkedHashMap<String,String[]>();

        mp.put("City11", "MP,77");
        mp.put("City12", "MP,72");
        mp.put("City13", "MP,76");
        states.clear();
        states.add(mp);
        new StartMapThread(shuffleOut(map(states))).run();        
        mp.clear();

        mp.put("City31", "OR,69");
        mp.put("City32", "OR,71");
        mp.put("City33", "OR,76");
        states.clear();
        states.add(mp);
        new StartMapThread(shuffleOut(map(states))).run();
        mp.clear();

        mp.put("City21", "CG,70");
        mp.put("City22", "CG,72");
        mp.put("City23", "CG,75");
        states.clear();
        states.add(mp);
        new StartMapThread(shuffleOut(map(states))).run();
        mp.clear();

        mp.put("City31", "QT,69");
        mp.put("City32", "QT,71");
        mp.put("City33", "QT,76");
        states.clear();
        states.add(mp);
        new StartMapThread(shuffleOut(map(states))).run();

    }

    public HashMap<String,Integer> getReduceDone()
    {
        return redu;
    }

    private List<String> map(List<Map<String,String>> mapIn)
    {
        List<String> list = new ArrayList<>();
        for (Map<String,String> mp:mapIn)
        {
            list.addAll(mp.values());
        }

        return list;
    }

    private HashMap<String,List<Integer>> shuffleOut(List<String> randIn)
    {
        List<Integer> list = new ArrayList<>();
        HashMap<String,List<Integer>> map = new HashMap<>();

        String temp="";
        for (String mp:randIn)
        {
            if(isValueValid(mp))
            {
                String[] spl=mp.split(",");
                if(!temp.equals(spl[0]))
                {
                    temp=spl[0];
                    list=new ArrayList<>();
                }
                list.add(Integer.valueOf(spl[1]));
                map.put(temp,list);
            }

        }

        //добавил 200млн. цикл чтобы определить производительность параллельного вычисления
        for(int i = 0; i<=200000000;i++)
        {
            //любая операция
        }

        return map;

    }
    
    //хотя пишут Map далее Reduce, но как я понимаю между Map и Reduce 
    //существует невидимая операция которая собирает все входные данные для передачи в reduce
    private HashMap<String,List<Integer>> shuffleOutTemp(List<String> randIn)
    {
        List<Integer> list = new ArrayList<>();
        HashMap<String,List<Integer>> map = new HashMap<>();

        String temp="";
        for (String mp:randIn)
        {
            if(isValueValid(mp))
            {
                String[] spl=mp.split(",");
                if(!temp.equals(spl[0]))
                {
                    temp=spl[0];
                    list=new ArrayList<>();
                }
                list.add(Integer.valueOf(spl[1]));
                map.put(temp,list);
            }

        }

        return map;
    }

    private synchronized void reduce(HashMap<String,List<Integer>> redOut)
    {
        HashMap<String,Integer> mapR=new HashMap<>();
        for(Map.Entry<String,List<Integer>> red:redOut.entrySet())
        {
            mapR.put(red.getKey(),avgFunc(red.getValue()));
        }

        redu = mapR;

    }

    private class StartMapThread extends Thread
    {
        private HashMap<String,List<Integer>> tempList = new HashMap<>();

        public StartMapThread(HashMap<String,List<Integer>> list)
        {
            tempList = list;
        }

        public void run()
        {
            mapOut.putAll(tempList);//System.out.println(reduce(tempList));
            reduce(mapOut);
        }

    }


    private Integer avgFunc(List<Integer> vals)
    {
        Integer sumAvg=0;
        for(Integer vl:vals)
        {
            sumAvg+=vl;
        }

        return sumAvg/(vals.size()==0?1:vals.size());
    }

    private boolean isValueValid(final String value) {        
        Pattern p = Pattern.compile("\\S\\S\\,\\d+");
        Matcher m = p.matcher(value);
        return m.matches();
    }


Смысл программы в том, что надо получить среднюю температуру городов по районам.

Теперь сам вопрос :) правильно ли я реализовал структуру MapReduce, на выходе получаю идентичные значения что добавляй отдельный поток для каждого Map, что не добавляй. Если все это делается как то по другом буду весьма благодарен за пример или "тынц"на полезный ресурс. Спасибо.
...
Рейтинг: 0 / 0
Шаблон MapReduce
    #38263246
servit
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
ZeD.ORAТеперь сам вопрос :) правильно ли я реализовал структуру MapReduceЭто Вам лучше подскажут в разделе Java .
ZeD.ORAЕсли все это делается как то по другом буду весьма благодарен за пример или "тынц"на полезный ресурс. Спасибо. Пример (но для СУБД Caché)
...
Рейтинг: 0 / 0
Шаблон MapReduce
    #38282293
serega_sh
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
,
Есть много тонкостей и возможностей для оптимизации. У вас, например, нет ни слова про combiner, aka локальный reduce шаг.
ZeD.ORA//хотя пишут Map далее Reduce, но как я понимаю между Map и Reduce
//существует невидимая операция которая собирает все входные данные для передачи в reduce

Предлагаю прочитать настольную книгу Тома Вайта Hadoop: The Definitive Guide .
Операция несколько сложнее, чем вы про нее написали. Промежуточный "результат" на Map-фазе партиционируется с целью распределения по reder-ам. На стороне reducer-а происходит сортировка и группировка по ключу. Благодаря тому, что все три фазы могут быть определны разработчиком, можно добиваться secindaey-sort'a, когда значения ключа отсортированы в порядке. В общем случае, порядок значений ключа не гарантируется.

Предлагаю еще раз обратиться к матчасти :)
...
Рейтинг: 0 / 0
3 сообщений из 3, страница 1 из 1
Форумы / NoSQL, Big Data [игнор отключен] [закрыт для гостей] / Шаблон MapReduce
Целевая тема:
Создать новую тему:
Автор:
Закрыть
Цитировать
Найденые пользователи ...
Разблокировать пользователей ...
Читали тему (0):
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]