跳至主要內容

MapReduce 基础入门

Vingkin...大约 8 分钟

1. MapReduce相关介绍

MapReduce是一个离线计算框架,针对静态数据集,不支持动态的离线数据

1.1 Hadoop Writable序列化机制

序列化 (Serialization)是将结构化对象转换成字节流以便于进行网络传输或写入持久存储的过程。

反序列化(Deserialization)是将字节流转换为一系列结构化对象的过程,重新创建该对象。

Hadoop通过Writable接口实现的序列化机制,接口提供两个方法writereadFields

  • write叫做序列化方法,用于把对象指定的字段写出去;

  • readFields叫做反序列化方法,用于从字节流中读取字段重构对象;

Hadoop没有提供对象比较功能,所以和java中的Comparable接口合并,提供一个接口WritableComparable

WritableComparable接口可用于用户自定义对象的比较规则。

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface WritableComparable<T> extends Writable, Comparable<T> {
}
Hadoop数据类型Java数据类型
BooleanWritableboolean
ByteWritablebyte
IntWritableint
FloatWritablefloat
LongWritablelong
DoubleWritabledouble
TextString
MapWritablemap
ArrayWritablearray
NullWritablenull

如果想自定义Hadoop数据类型

  • 自定义对象必须实现Hadoop的序列化机制Writable
  • 如果需要将自定义的对象作为key传递,则还需要实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序。你得指定对象的排序规则是什么。

2. WordCount

所需要的依赖以及maven打包插件

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>3.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>3.1.4</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.32</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
            <version>2.4</version>
            <configuration>
                <archive>
                    <manifest>
                        <addClasspath>true</addClasspath>
                        <classpathPrefix>lib/</classpathPrefix>
                        <mainClass>com.vingkin.mapreduce.wordcount.WordCountDriver_v2</mainClass>
                    </manifest>
                </archive>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
    </plugins>
</build>

Map阶段

WordCountMapper.java

Map阶段的处理类,对应着MapTask

public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    private Text outkey = new Text();
    private final static LongWritable outvalue = new LongWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        // 拿取一行数据转换为String
        String line = value.toString();
        // 根据分隔符进行切割
        String[] words = line.split("\\s+");
        // 遍历数组
        for (String word : words) {
            outkey.set(word);
            // 输出数据 吧每个单词都标记1 输出的结果<单词,1>
            context.write(outkey, outvalue);
        }
    }
}

相关解析

Mapper的四个泛型分别为:

  • KEYIN:LongWritable
  • VALUEIN:Text
  • KEYOUT:Text
  • VALUEOUT:LongWritable

在map阶段之前,MapReduce程序已采用其默认的读取数据组件TextInputFormat对文本数据进行了读取,每一行返回一个kv键值对(map方法基于行来调用,每一行调用一次map方法进行业务处理),分别表示每一行起始位置的偏移量和这一行的文本内容。因此,Mapper的KEYIN和VALUEIN分别对应着TextInputFormat返回的键值对,其Hadoop数据类型分别为LongWritable和Text。

在执行map的时候,先将LongWritable类型的value转换成String类型,再通过正则对一行文本进行分隔。最后通过context上下文对象将每个单词都组成<单词, 1>形式的键值对进行输出。因此KEYOUT和VALUEOUT的数据类型分别为Text和LongWritable。

Reduce阶段

WordCountReducer.java

Reduce阶段的处理类,对应着ReduceTask

public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    private LongWritable outValue = new LongWritable();

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        // 统计变量
        long count = 0;
        for (LongWritable value : values) { // <hadoop,Iterable[1,1,1]>
            count += value.get();
        }
        outValue.set(count);
        // 最终使用上下文对此项输出结果
        context.write(key, outValue);
    }
}

相关解析

Reducer的四个泛型分别为:

  • KEYIN
  • VALUEIN
  • KEYOUT
  • VALUEOUT

Reducer的四个泛型很好确定,Reducer的KEYIN和VALUEIN对应着Mapper的KEYOUT和VALUEOUT,因此为Text和LongWritable。对于KEYOUT和VALUEOUT要根据具体的业务进行判断,此业务是统计单词个数,因而KEYOUT为单词Text,VALUEOUT为单词个数LongWritable。

在Map阶段和Reduce阶段之间MapReduce程序默认对数据进行了相关的处理。对于Map阶段的输出结果我们可以假设为<hadoop,1><hadoop,1><hello,1><hello,1><hadoop,1>,MapReduce程序会对数据先按key的字典序对键值对进行排序,然后再以key相同的为一组,分组之后,同一组数据组成一个新的kv键值对,调用一次reduce(reduce方法基于分组调用,一个分组调用一次)。

具体的数据操作如下:

  • 排序:<hadoop,1><hadoop,1><hadoop,1><hello,1><hello,1>
  • 分组:
    • <hadoop,1><hadoop,1><hadoop,1>
    • <hello,1><hello,1>
  • 分组后新的kv键值对:
    • <hadoop,Iterable[1,1,1]>
    • <hello,Iterable[1,1]>

map的执行流程为:对于每一个分组,对values进行迭代遍历加出该key的总个数,再将key作为输出的key,个数作为输出的value通过context上下文对象进行输出。

Driver阶段

WordCountDriver_v1.java

/**
 * @Author: Vingkin
 * @Date: Create in 14:44 2021/8/1
 * @Description: MapReduce客户端程序驱动类 构造Job对象实例
 * 指定各种组件属性 包括:mapper reducer类、输入输出的数据类型、输入输出的数据路径
 * 提交job作业 job.submit()
 */
public class WordCountDriver_v1 {
    public static void main(String[] args) throws Exception {
        // 创建配置对象
        Configuration conf = new Configuration();
        // 构建Job作业的实例 参数(配置对象,Job名字)
        Job job = Job.getInstance(conf, WordCountDriver_v1.class.getSimpleName());
        // 设置MR程序运行的主类
        job.setJarByClass(WordCountDriver_v1.class);
        // 设置MR程序的mapper类 reducer类
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // 指定mapper阶段输出的key value类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // 指定reducer阶段输出的key value类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // 配置本次作业的输入数据路径 输出数据路径
        // todo 默认组件 TextInputFormat TextOutputFormat
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 提交作业
        // 采用waitForCompletion参数表示是否开启实时监视追踪作业的执行情况
        boolean resultFlag = job.waitForCompletion(true);
        // 退出程序 和job结果进行绑定
        System.exit(resultFlag ? 0 : 1);
    }
}

WordCountDriver_v2.java(效率更高,优先使用)

/**
 * @Author: Vingkin
 * @Date: Create in 14:44 2021/8/1
 * @Description: 使用工具类ToolRunner提交MapReduce作业
 */
public class WordCountDriver_v2 extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        // 创建配置对象
        Configuration conf = new Configuration();

        // todo 使用工具类ToolRunner提交程序
        int status = ToolRunner.run(conf, new WordCountDriver_v2(), args);
        // 退出客户端
        System.exit(status);
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), WordCountDriver_v2.class.getSimpleName());
        // 设置MR程序运行的主类
        job.setJarByClass(WordCountDriver_v2.class);
        // 设置MR程序的mapper类 reducer类
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // 指定mapper阶段输出的key value类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // 指定reducer阶段输出的key value类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // 配置本次作业的输入数据路径 输出数据路径
        // todo 默认组件 TextInputFormat TextOutputFormat
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }
}

打包方式:先clean再package

hadoop jar WordCount-1.0-SNAPSHOT.jar /vingkin/input /vingkin/output

默认输入组件为TextInputFormat,通过指定input文件夹,就处理该目录所有的文件,把所有文件当成整体来处理。默认输出组件为*TextOutputFormat,output文件夹应该不存在,否则报错。

3. MapReduce程序运行模式

所谓运行模式是指MapReduce程序是单机运行还是分布式运行,程序所需要的运算资源是YARN分配还是本机系统自己分配

  • yarn:YARN集群模式
  • local:本地模式(默认)

在mapred-default.xml中可以进行相应的配置,需要配置的参数如下:

  • mapreduce.framework.name=yarn
  • yarn.resourcemanager.hostname=node1

但是如果在代码中(conf.set())或者运行的环境中有配置(mapred-site.xml),则会覆盖default的配置。

conf.set("mapreduce.framework.name", "yarn")

4. MapReduce流程梳理

4.1 Map阶段执行过程

  • 第一阶段:把输入目录下文件按照一定的标准逐个进行逻辑切片,形成切片规划。

    默认Split size = Block size,每一个切片由一个MapTask处理。(getSplits)

  • 第二阶段:对切片中的数据按照一定的规则读取解析返回<key,value>对。

    默认是按行读取数据。key是每一行的起始位置偏移量,value是本行的文本内容。(TextInputFormat)

  • 第三阶段:调用Mapper类中的map方法处理数据。

    每读取解析出来的一个<key,value> ,调用一次map方法。

  • 第四阶段:按照一定的规则对Map输出的键值对进行分区partition。默认不分区,因为只有一个reducetask。

    分区的数量就是reducetask运行的数量。

  • 第五阶段:Map输出数据写入内存缓冲区,达到比例溢出到磁盘上。溢出spill的时候根据key进行排序sort

    默认根据key字典序排序。

  • 第六阶段:对所有溢出文件进行最终的merge合并,成为一个文件。

4.2 Reduce阶段执行过程

  • 第一阶段:ReduceTask会主动从MapTask复制拉取其输出的键值对。

  • 第二阶段:把复制到Reducer本地数据,全部进行合并merge,即把分散的数据合并成一个大的数据。再对合并后的数据排序

  • 第三阶段是对排序后的键值对调用reduce方法

    键相等的键值对调用一次reduce方法。最后把这些输出的键值对写入到HDFS文件中。

评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v2.15.8