Programowanie w hadoopie
1 Informacje wstępne
Będziemy wykorzystywać "stare" (oryginalne) API (pakiet org.apache.hadoop.mapred); nowe API jest w pakiecie org.apache.hadoop.mapreduce. Różnice między API są w prezentacji: http://www.slideshare.net/sh1mmer/upgrading-to-the-new-map-reduce-api
Pliki:
- http://mimuw.edu.pl/~krzadca/mimuw-hadoop-1.2.1.tar.gz
- http://mimuw.edu.pl/~krzadca/hadoop-programy.tar.gz
(po rozpakowaniu, umieść katalog programy jako podkatalog dystrybucji hadoopa).
2 Podstawowe działanie
2.1 Map
public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); output.collect(word, one); } } }
2.2 Reduce
public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } }
2.3 Główny program
public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); JobConf job = new JobConf(conf, MyWordCount.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setOutputFormat(TextOutputFormat.class); job.setInputFormat(TextInputFormat.class); job.setMapperClass(MapClass.class); job.setReducerClass(Reduce.class); job.setCombinerClass(Reduce.class); FileInputFormat.setInputPaths( job, new Path( args[0] ) ); FileOutputFormat.setOutputPath( job, new Path( args[1] ) ); job.setJobName( "MyWordCount" ); // Run the job JobClient.runJob(job);
3 Optymalizacje
3.1 Combiner
Gdy funkcja reduce jest przemienna i łączna (co zachodzi np. w programie MyWordCount.java), możemy usprawnić nasze programy, wykonując część operacji reduce na węzłach odpowiedzialnych za map. W ten sposób zmniejszamy ilość danych przesyłanych po sieci. Formalnie proces wykonujący część operacji reduce nazywa się Combiner. Model MapReduce wymaga aby Combiner i Reduce miały ten sam typ argumentów i wyników. Dlatego najczęściej proces Combiner jest funkcjonalnie identyczny z Reduce. Aby użyć dodatkowego procesu Combiner wystarczy zaznaczyć w kodzie naszego programu:
conf.setCombinerClass(Reduce.class);
3.2 Counter
W niektórych przypadkach chcielibyśmy policzyć pewne globalne wartości (np. statystyki) w programach MapReduce. W tym celu architektura MapReduce została wzbogacona o objekty Counter. Aby użyć tych obiektów, należy najpierw zadeklarować swój własny counter:
public static enum MY_COUNTER { LONG_WORDS_NUM, SHORT_WORDS_NUM };
A następnie można go modyfikować w metodach map i reduce:
reporter.getCounter(MY_COUNTER.LONG_WORDS_NUM).increment(1); reporter.getCounter(MY_COUNTER.SHORT_WORDS_NUM).increment(1);
3.3 Przekazywanie parametrów do klas Map/Reduce
Czasami chcemy przekazać parametr do klasy MapClass i Reduce z funkcji main. Można to zrobić wpisując parametr do konfiguracji:
JobConf job = new JobConf(conf, Extract.class); job.set("nazwa_parametru", "wartosc_parametru");
W wypadku metody set możemy przekazywać wartości typu String. Jednak klasa JobConf posiada również metody setBoolean, setFloat, setInt, itp. Aby odczytać wartość parametru do klasy MapClass (ewentualnie Reduce) należy dodać metodę configure:
public void configure(JobConf conf) { String wartosc = new String(conf.get("nazwa_parametru")); }
z której mamy dostęp do parametrów.
4 Zadania
4.1 statystyki liczbowe
Napisz program który z (potencjalnie bardzo dużego) zbioru liczb całkowitych wyznaczy:
- Największą liczbę.
- Średnią arytmetyczną.
- Średnią geometryczną.
- Medianę.
- Liczbę różnych liczb.
W których przypadkach możemy użyć Combinera?