Hadoop MapReduce Word Count Implementation


1. The Map and Reduce Phases

1.1 The Map Phase

First, Hadoop divides the input data into fixed-length input splits and hands each split to MapReduce. Hadoop creates one map task per split, and that task runs the user-defined map function over every record in the split. In our word count example the input consists of several files; typically each file corresponds to one split, and a file that is too large is divided into multiple splits. The map function receives its input as <key, value> pairs, where value is one line of the file and key is that line's byte offset within the file (which we usually ignore). The map function's job here is to tokenize each line into words and write <word, 1> to the context for every occurrence. For example, the line "hello world hello" produces <hello, 1>, <world, 1>, <hello, 1>.

(Figure: schematic of the map process.)

The Mapper code is as follows:

public static class TokenizerMapper
        extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // Each call handles one line: value is the line, key is its byte offset in the file
        StringTokenizer iter = new StringTokenizer(value.toString());
        while (iter.hasMoreTokens()) {
            word.set(iter.nextToken());
            // Write <word, 1> to the context
            context.write(word, one);
            System.out.println(word);
        }
    }
}

If we can process the splits in parallel (not necessarily fully in parallel) and each split is a reasonably small chunk of data, the job achieves good load balancing. But if the splits are too small, the overhead of managing splits and creating map tasks starts to dominate. For most jobs the ideal split size is one HDFS block, which is 128 MB by default in Hadoop 2 and later (64 MB in the original Hadoop releases). A sketch of how split sizes can be bounded is shown below.
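
If you ever need to influence the split size yourself, FileInputFormat exposes minimum and maximum split-size hints (the corresponding configuration keys are mapreduce.input.fileinputformat.split.minsize and mapreduce.input.fileinputformat.split.maxsize). A minimal sketch for the job driver; the 64 MB / 128 MB values are purely illustrative and are not part of the word-count program:

// Illustrative only: bound the split size computed for this job.
// "job" is the org.apache.hadoop.mapreduce.Job built in the driver (main).
FileInputFormat.setMinInputSplitSize(job, 64L * 1024 * 1024);   // at least 64 MB per split
FileInputFormat.setMaxInputSplitSize(job, 128L * 1024 * 1024);  // at most 128 MB per split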

Hadoop performs best when the node running a map task is the same node that stores its input data; this is what is known in computer systems as the data locality optimization. The reason the optimal split size equals the block size is that a block is the largest unit of input guaranteed to be stored on a single node; anything larger may span blocks on different nodes.

1.2 The Reduce Phase

Next we look at the reducer. The number of reduce tasks is not determined by the input size; it has to be specified explicitly (the default is 1). Unlike the map phase, reduce tasks cannot rely on local reads: a single reduce task's input typically comes from the output of all the mappers, which is why the data flow between map and reduce is called the shuffle. Hadoop sorts the map output by key, transfers the sorted output over the network to the nodes running the reduce tasks, merges it there, and then passes each key with its list of values to the user-defined reduce function. In our example, the key hello arrives with the values [1, 1, 1] and the reducer emits <hello, 3>.
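
The reducer count is set on the Job object in the driver. A one-line sketch, where the value 2 is purely illustrative and not part of the word-count program:

// Illustrative only: request two reduce tasks instead of the default single one.
job.setNumReduceTasks(2);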

(Figure: schematic of the reduce function.)

The Reducer code is as follows:

public static class IntSumReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum all the counts emitted for this word
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        // Write <word, total count> to the context
        context.write(key, result);
    }
}

 

2. Complete Code

2.1 Project Structure

For setting up a VSCode + Java + Maven + Hadoop development environment, see my earlier post "VSCode + Maven + Hadoop Development Environment Setup"; I will not repeat it here. The project structure looks like this:

word-count-hadoop
├─ input
│  ├─ file1
│  ├─ file2
│  └─ file3
├─ output
├─ pom.xml
├─ src
│  └─ main
│     └─ java
│        └─ WordCount.java
└─ target

The WordCount.java code is as follows:

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Each call handles one line: value is the line, key is its byte offset in the file
            StringTokenizer iter = new StringTokenizer(value.toString());
            while (iter.hasMoreTokens()) {
                word.set(iter.nextToken());
                // Write <word, 1> to the context
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Sum all the counts emitted for this word
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word_count");

        job.setJarByClass(WordCount.class);

        job.setMapperClass(TokenizerMapper.class);
        // The combiner performs a local reduce on each mapper's output before the final global reduce
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // args[0] is the input directory, args[1] is the output directory
        // Check whether the output path already exists; if so, delete it
        Path path = new Path(args[1]);
        FileSystem fileSystem = path.getFileSystem(conf);
        if (fileSystem.exists(path)) {
            fileSystem.delete(path, true);
        }

        // Set the input and output directories
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
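
One common refinement, seen in the stock Hadoop WordCount example but not required here, is to run the arguments through GenericOptionsParser so that generic Hadoop options (-D key=value, -files, and so on) are consumed before the input and output paths are read. A sketch of how the start of main() could look under that assumption:

// Sketch: strip generic Hadoop options before treating the rest as paths.
// Requires: import org.apache.hadoop.util.GenericOptionsParser;
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
    System.err.println("Usage: WordCount <input path> <output path>");
    System.exit(2);
}
// ...then use otherArgs[0] and otherArgs[1] instead of args[0] and args[1].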

Remember to configure the Hadoop dependencies in pom.xml:

...
<!-- Centralized version definitions -->
<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <maven.compiler.source>17</maven.compiler.source>
  <maven.compiler.target>17</maven.compiler.target>
  <hadoop.version>3.3.1</hadoop.version>
</properties>
<dependencies>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
    <scope>test</scope>
  </dependency>
  <!-- Hadoop dependencies -->
  <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>${hadoop.version}</version>
  </dependency>
  <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>${hadoop.version}</version>
  </dependency>
  <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>${hadoop.version}</version>
  </dependency>
  <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
  </dependency>
  <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-yarn-api</artifactId>
      <version>${hadoop.version}</version>
  </dependency>
</dependencies>
...
</project>

此外,因?yàn)槲覀兊某绦蜃詭л斎雲(yún)?shù),我們還需要在vscode的launch.json中配置輸入?yún)?shù)intput(代表輸入目錄)和output(代表輸出目錄):

...
"args": [
  "input",
  "output"
],
...
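
If you would rather not rely on launch.json, one alternative (a sketch, not part of the original program) is to fall back to default local directories when no arguments are supplied, replacing the two path lines in main(); the output-path cleanup earlier in main() would need the same treatment:

// Hypothetical fallback: default to local "input"/"output" directories when no args are given.
String inputDir  = (args.length > 0) ? args[0] : "input";
String outputDir = (args.length > 1) ? args[1] : "output";
FileInputFormat.addInputPath(job, new Path(inputDir));
FileOutputFormat.setOutputPath(job, new Path(outputDir));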

After compiling and running the job, check the part-r-00000 file in the output folder:

david 1
goodbye 1
hello 3
tom 1
world 2

As we can see, the program counts the words correctly.

This concludes the walkthrough of implementing word count with Hadoop MapReduce.
