hadoop MapReduce Yarn(四)

hadoop运行jar包

运行一个mapreduce程序job(打包成一个jar包)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
hadoop jar
cd /root/hadoop-2.8.1/share/hadoop/mapreduce
# 用mapreduce计算圆周率 map的任务数量 每一map的取样数
hadoop jar hadoop-mapreduce-examples-2.8.1.jar pi 5 5
# yarn 创建了tmp ,同时mapreduce程序创建了user目录
# 运行计算单词出现的次数
# wordcount 将此目录下的所有文件进行统计 结果输出到此目录下
vim test.txt
hello world
hello femn
hello leipengkai
hello friend
hadoop fs -put test.txt /wordcount/input
hadoop jar hadoop-mapreduce-examples-2.8.1.jar wordcount \
/wordcount/input /wordcount/output

MapReduce

处理海量数据的运算, 即使是一个很简单的逻辑,要把它变成分布式运行的程序将会面临很多的其它问题:

运算代码程序的资源分发和启动程序的环境配置,以及代码分发到哪些datanode上,并且还得监控datanode运行状态是否正常 以及datanode的调试汇总

解决思路:运算往数据方移动,而不是数据移动到运算这方来.

MapReduce分成两个步骤去完成业务逻辑:
Map逻辑和Reduce逻辑都会在分布在datanode中,先执行Map,再执行Reduce.在Map程序时可高并发执行

实例代码

vim WCMAP.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package cn.itcast.hadoop.wcmapreduce;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;
// 4个泛型中,前两个是指定map输入数据类型,KEY是输入的key类型,
// VALUEIN是输入的value的类型,后面两个是map输出给reduce的输出数据类型
// 默认情况下,框架传递给我们的map的输入数据中,
// key是要处理的文本(block)中一行的起始偏移量,这一行的内容作为value
public class WCMap extends Mapper<LongWritable, Text, Text, LongWritable>{
// Long,String,String,Long等内存对象 经过序列化之后再通过网络传递到节点中
// hadoop实现了自己的序列化机制 去掉多余的java序列化信息
private final static LongWritable one =new LongWritable(1);
private Text word = new Text();
// map()框架第每读一行数据就调用一次该方法,
// 对节点中的文本处理完所有的map之后才进入reduce
protected void map(LongWritable key,Text value,Context context)
throws IOException, InterruptedException{
// 具体业务逻辑就写在这个方法体中,而且我们业务要处理的数据已经被框架传递进来
// 在方法的参数中key-value中
// key是这一行数据的起始偏移量 value是这一行的文本内容
// context传到reduce的工具不用自己去找节点
String line = value.toString();
char split = ' ';
String[] words =StringUtils.split(line, split);
//
//遍历这个单词数组输出为k-v形式 k:单词 v:1
for (String word: words) {
context.write(new Text(word), new LongWritable(1));
}
// StringTokenizer itr = new StringTokenizer(value.toString());
// while(itr.hasMoreTokens()) {
// word.set(itr.nextToken());
// context.write(word, one);
// }
}
}
vim WCReduce.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package cn.itcast.hadoop.wcmapreduce;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WCReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
// 框架在map处理完之后,将所有的k-v缓存起来,进行分级,然后传递一个组,<k,vs{}>
//调用一次reduce方法 <hello,{1,1...}>
@Override
protected void reduce(Text key, Iterable<LongWritable> values,Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
// super.reduce(key, values, context);
long count = 0;
for (LongWritable v:values) {
count += v.get();
}
context.write(key, new LongWritable(count));
}
}
vim WCRunner.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package cn.itcast.hadoop.wcmapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
//import org.apache.hadoop.mapred.jobcontrol.Job;
public class WCRunner {
//用来描述一个特定的作业
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// TODO Auto-generated method stub
Configuration conf = new Configuration();
conf.set("mapreduce.job.jar","/root/wc.jar");
Job job = Job.getInstance(conf);
//设置整个job手忙脚乱的那些类在哪个jar包中
job.setJarByClass(WCRunner.class);
//该作业使用哪个类作为逻辑处理中的map,哪个作为reduce
job.setMapperClass(WCMap.class);
job.setReducerClass(WCReduce.class);
//指定map输出数据类型k-v
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//指定reduce输出数据类型k-v
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//该作业要处理的数据所在的路径以及输出结果存放路径
// hdfs只有一个test.txt文件
FileInputFormat.setInputPaths(job, new Path("hdfs://cluster1:9000/"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://cluster1:9000/wc/output"));
//提交集群运行
//yarn机制
job.waitForCompletion(true);
}
}

在本地测试可以启动Bebug模式,直接在虚拟机下用eclipse运行WCRunner

注意:如果是eclipse它会自动加载core-site.xml和hdfs-site.xml的配置文件,
所以可以写成如下,也是读取hdfs文件系统

1
2
FileInputFormat.setInputPaths(job, new Path("/"));
FileOutputFormat.setOutputPath(job, new Path("/wc/output"));

使用dhfs的话,可以不用启动yarn,只启动dhfs,这时的Mapreduce程序将跑在本机
不通过yarn分配

如果想要yarn分配在Node Manage运行Mapreduce,则在src下加上mapred-site.xml和yarn-site.xml文件
并加上如下代码,再运行

1
conf.set("mapreduce.job.jar","/root/wc.jar");

也可以不用从hdfs中读写文件:

1
2
FileInputFormat.setInputPaths(job, new Path("/root/test.txt"));
FileOutputFormat.setOutputPath(job, new Path("/root/output"));

在集群中运行,将将整个项目打包成jar,让hadoop yarn分发运行Mapreduce程序

1
2
3
4
hadoop fs -rm -f -R hdfs://cluster1:9000/*
hadoop fs -put test.txt hdfs://cluster1:9000/
hadoop jar wc.jar cn.itcast.hadoop.wcmapreduce.WCRunner
# -Dmapreduce.input.fileinputformat.input.dir.recursive=true

注意:如果使用jar包运行的话,必须得指定成hdfs的文件系统格式

1
2
FileInputFormat.setInputPaths(job, new Path("hdfs://cluster1:9000/"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://cluster1:9000/wc/output"));

查看运行的结果

1
hadoop fs -cat hdfs://cluster1:9000/wc/output/part-r-00000

将信息打印到控制台

1
export HADOOP_ROOT_LOGGER=DEBUG,console

总结MR程序的提交运行模式

本地模式

在eclipse里面直接运行main方法,不添加mapred-site.xml和yarn-site.xml
也会提交给localjobnumber执行,输出输出数据按如下设置保存在对应路径下:

1
2
FileInputFormat.setInputPaths(job, new Path("/"));
FileOutputFormat.setOutputPath(job, new Path("/wc/output"));

集群模式运行
  1. 将项目打成jar包,上传到服务器上,然后用hadoop命令提交:

    1
    hadoop jar wc.jar cn.itcast.hadoop.wcmapreduce.WCRunner
  2. 在eclipse中运行main,在src下加上mapred-site.xml和yarn-site.xml文件
    并加上如下代码,再运行

    1
    conf.set("mapreduce.job.jar","/root/wc.jar");

Map进程数不是由block大小决定的,而是由一个切片(split)对应一个Map进程
Map task的并发数量是由切片的数量决定的,有多少个切片就启动多少个Map task

切片是个逻辑概念,指文件中数据中偏移量范围,block是物理概念
切片的具体大小应该根据所处理的文件大小来调整

MapReduce,Shuffle:分组,排序以及各种内存和磁盘缓存机制 yarn的资源分配和调度
Share Comments