收藏私塾在线
 

欢迎您来到私塾在线网!   

请登录! 

免费注册 


嘟嘟是个宝的笔记
状态: 离线
人气:11143
访问用户量:68
笔记经验:
总积分:17329
级别:VIP3
搜索本笔记
ta的交流分类
ta的交流主题贴(1)
ta的所有交流贴(3)
ta的全部笔记
全部笔记(8)
未分类笔记(0)
课程笔记(8)
存档
2017-07(2)
2013-12(1)
2012-12(4)
2012-11(1)

2017-07-17 09:47:23
Hadoop实战-中高级部分视频教程-视频教程学习笔记
浏览(466)|评论(95)   交流分类:学习问题讨论|笔记分类: 课程笔记

整体课程概览 视频教程学习笔记
视频课程地址:Hadoop实战-中高级部分视频教程
当前播放的视频地址:整体课程概览

1. hadoop的io,
2.3. mr的原理与编程,
4. hdfs原理,
5. hdfs与mr的管理以及集群的监控与管理,
6. hadoop的ha,
7.hadoop的rpc机制,
8 hadoop的restful api
9. 调优,
10. 集群安装

Hadoop IO:数据完整性 笔记
概览
1.数据完整性
2.压缩
3.序列化
4.基于文件的数据结构



Hadoop IO:数据完整性 笔记
如何保证?
1.写没有问题
2.读没有问题
3.数据放上去之后,时间长了之后,消磁,有没有问题



Hadoop IO:数据完整性 笔记
数据完整性采用的技术:
保证数据在传输过程中不损坏。常见的技术
1.奇偶校验
2.ecc校验纠错技术
3.crc-32循环冗余校验技术



Hadoop IO:数据完整性 笔记
ecc校验纠错技术: 适用于缓存里保存数据,保证内存里的数据不会丢失,校验完整性

crc-32循环冗余校验技术: hadoop使用的就是这个

Hadoop IO:数据完整性 笔记
写入的流程:client往datanode上写文件的同时,每512字节会生成crc校验码,同时存到datanode上,datanode会根据接受到的数据生成crc校验码,跟client传输过来的校验码进行对比,如果不一样,抛出异常:CheckSumException,client根据异常做出下一步决定

读取:client从datanode上读文件的同时,datanode会将数据和对应的crc校验码发过来,
client会根据读到的数据生成一个crc校验码,与从datanode上度过来的crc对比,如果不一致,就从其他机器上读,如果所有机器都不一致,就失败了


持久存放:一个叫DataBlockScanner的后台线程定期检查

如何修复损坏的数据:一个叫DataBlockScanner的后台线程定期检查,发现损坏的块,会从其他副本复制一个新的副本

Hadoop IO:数据完整性 笔记
hadoop fs 中的
-get
-copoLocal
有-ignoreCrc -crc机制,
指定是否使用crc校验机制

Hadoop IO:数据完整性 笔记
.filename.crc

hdoop的文件文件系统会执行客户端校验。也就是,get一个名为filename的文件时,文件系统客户端会创建一个名为.filename.crc的隐藏文件:记录每个文件快的校验和

数据块大小有io.bytes.per.checksum属性控制,块的大小作为元数据存在.crc文件中。

底层文件原生支持校验和,也可以通过fs.file.impl值为org.apache.hadoop.fs.RawLocalFileSystem来禁用校验和,或者通过代码

Configueration conf =...
FileSystem fs = new RawLocalFileSystem();
fs.initialize(null, conf);


Hadoop IO:数据完整性 笔记
fs.file.impl的默认值是org.apache.hadoop.fs.LocalFileSystem

数据块大小由io.bytes.per.checksum属性控制

Hadoop IO:数据完整性 笔记
LocalFileSystem使用CheckSumFileSystem(校验和文件系统)为自己工作。

Hadoop IO:数据完整性 笔记
docs/api/index.html中查看ChecksumFileSystem

//在ubuntu中测试代码:
LocalFileSystem fs = ChecksumFileSystem.getLocal(configuration);
fs.getChecksumFile(new Path crcPath = Path("testchecksum/testcheck.txt"))//拿到crc文件
syso(crcPath);//打印出路径为:testchecksum/testcheck.txt.crc



Hadoop IO:压缩 笔记
压缩的好处:
1. 存得跟多
2. 移动文件的时间减少

本节课程主要学习的内容
1:编码与解码
介绍了Hadoop支持的压缩格式,解码器编码器,了解了如何使用。
2:压缩与输入分割
介绍了哪些格式支持输入分割,为什么要考虑输入分割
3:在M/R中使用压缩
介绍了如何在MR中使用压缩

Hadoop IO:压缩 笔记
编码、解码器:压缩,解压算法的java代码实现类

1.deflate: org.apache.hadoop.io.compress.DefaultDodec
2.gzip:org.apache.hadoop.io.compress.GzipCodec
3.gzip2:org.apache.hadoop.io.compress.BZip2Codec
4.lzo:org.apache.hadoop.io.compress.LzopCodec
5.


Hadoop IO:压缩 笔记
s修正:
LZO: com.hadoop.compress.lzo.LzopCodec
LZO是基于GPL协议的,不是apache协议的,所以所在的包路径不一样

Hadoop IO:压缩 笔记
CompressionCodec 压缩解压缩超类
CompressionCodecFactory: 根据文件扩展名得到文件使用了哪种编码、解码器

Hadoop IO:压缩 笔记
项目TestCompression
src目录:
com.test.StreamCompressor
com.test.FileDeCompressor

resources目录:
core-site:xml:配
fs.default.name = hdfs://locahost:9000告诉fs在哪儿


mapred-site:xml配
mapred.job.tracker = localhost:9001告诉jobtracker在哪儿


Hadoop IO:压缩 笔记
com.test.StreamCompressor


Class coedecClass = class.forName(args[0]);
CopressionCodec ccodec = (CopressionCodec)ReflectionUtils.newInstance(codecClass, conf);
IOUtils.copyBytes(System.in, ccodec.createOutputStream(system.out) 4096,false);







Hadoop IO:压缩 笔记
1.com.test.StreamCompressor

Class coedecClass = class.forName(args[0]);
CompressionCodec ccodec = (CopressionCodec)ReflectionUtils.newInstance(codecClass, conf);
CompressionOutputStream cos = ccodec.createOutputStream(system.out);
IOUtils.copyBytes(System.in, cos 4096,false);

cos.finish();

2. 导出为jar文件
3. echo "hello" | hadoop jar com.test.StreamCompressor org.apache.hadoop.io.compress.GzipCodec || gunzip -c

Hadoop IO:压缩 笔记
演示解压缩
FileDecompressor.java
Path myPath = new Path("压缩文件");
factory = new CompressionCodecFactory(conf);
codec = factory.getCodec(myPath);
is = codec.createInputStream(fs.open(myPath));
os = fs.create(new Path(outputuristring));
IOUtils.copyBytes(is, os, conf);

IOUtils.closeStream(is);
IOUtils.closeStream(os);


Hadoop IO:压缩 笔记
发布运行解压缩

1.gzip test.txt
2.hadoop fs -mkdir testfactory
3.hadoop fs -put test.txt.gz testfactory
4.hadoop jar compressFactory.jar com.test.FileDecompressor testfactory/test.txt.gz
5.hadoop fs -cat testfactory/test.txt

看到gzip对小文件的压缩反而会把文件变大
源文件6B, gzip压缩后35B



Hadoop IO:压缩 笔记
.deflate,.gz,.bz2,.lzo

1. 将多个文件压缩成一个文件:不支持
2. 压缩后再切割:.bz2,.lzo
3. giz使用的压缩算法和默认的算分即deflate一样,只不过文件扩展名不一样
4. lzo支持分割是有条件的:需要在文件中有索引


Hadoop IO:压缩 笔记
core-site.xml中

io.compression.codes属性用来配置编解码器,多个的情况用逗号分割




Hadoop IO:压缩 笔记
hadoop里面默认,如果有本地库的话,会先去找本地库的压缩解压算分实现

bzip2默认没有本地库的
lzo默认没有java库

Hadoop IO:压缩 笔记
2:压缩与输入分割
介绍了哪些格式支持输入分割,为什么要考虑输入分割

1. 因为1G的文件在64M大小的快上可以分为16块,就可以启16个map

2. 如果使用bzip2或者lzo压缩算法,可以继续细分,启动更多map

Hadoop IO:压缩 笔记
在mr中使用压缩

输出

配置文件中配置:永久性的
1.配置mapred.output.compress属性为true
2.配置mapred.output.compression.codec属性为自己需要的压缩解编解码器的类名
.

或者
在代码中实现:临时性的
1 jobConf = new JobConf(conf, WordCount.class);.
2.jobConf.setBoolean("mapred.output.compress" true);
3.jobConf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);



Hadoop IO:压缩 笔记
在m中使用压缩

设置属性
1.mapred.compress.map.output

代码示例:
conf.setCompressMapOutput();
conf.setMapOutputCompressorClass(GzipCodec.class);



Hadoop IO:压缩 笔记
代码:

jobConf.setBoolean("mapred.compress.map.output",true);
jobConf.setMapOutputCompressorClass(GzipCodec.class);


Hadoop IO:序列化 笔记
1. 什么是hadoop的序列化?
2. hadoop的序列化接口?
3.工作中自定义hadoop的序列化接口

Hadoop IO:序列化 笔记
序列化
1.什么是序列化?
结构化对象 转为 字节流

2 作用:进程间通信 或者 持久存储
.
3.特点:
1. compact紧凑,更好的利用空间
2. fast,序列化,反序列化性能好
3. extensible扩展性,协议有变化,可以支持新的需求
4. inter operable 互操作性: 客户端与服务端不依赖语言的实现

hadoop满足compact,fast,不满足扩展性以及互操作性

Hadoop IO:序列化 笔记
hadoop新的框架中阿芙罗满足了 扩展性 互操作性

Hadoop IO:序列化 笔记
java的序列化
1. 保存新类的所有信息
2. 不能复用对象的结构
3. hadoop自己写序列化的逻辑比较容易

hadoop序列化相关的接口:Writable,Comparable和WritableComprable


Hadoop IO:序列化 笔记
Writable接口:
1. write(dos)方法写到二进制流
2. readFields(dis)方法读

Hadoop IO:序列化 笔记
1. WritableComparator比较器继承了RawComparator和WritableComparable
2. WritableComparable继承了Writable和Comparable
3. RawComparator继承了Comparator,他比Comparator的好处是可以直接在流里面比较,而不需要创建对象比较

Hadoop IO:序列化 笔记
WritableComparator是RawComparator的一个通用实现,它做了两件事:
1. 实现了compare方法
2. 充当RawComparator的工厂类


Hadoop IO:序列化 笔记
Text与String不同的是,他是可变的,set方法改变。最大存2GB

ObjectWritable适用于字段可以使用多种类型时。

Hadoop IO:序列化 笔记
实现自己的序列化接口
1.实现 WritableComparable接口
2.实现方法write(),readfields(),compare()方法



Hadoop IO:序列化 笔记
public class Person {

Text name = new Text();
Text sex = new Text();
IntWritable age = new Intwritable();

@Override
public void write(DataOutput do) throws IOException{
name.write(do);
sex.write(do);
age.write(do);
}
@Override
public void readFields(DataInput di) throws IOException{
name.readFields(di);
sex.readFields(di);
age.readFields(di);
}
@Override
public int compareTo(Person o){
int i = name.compareTo(o.name);
if(i != 0){
return i
}
i = sex.compareTo(o.sex);
if(i != 0){
return i
}
i = age.compareTo(o.age);
if(i != 0){
return i
}

}

Hadoop IO:序列化 笔记
实现equals和hashcode方法

Hadoop IO:序列化 笔记
// 序列号自定义类
Person person =
byte[] values = HadoopSerializationUtil.serialize(person);
sout(values)
sout(StringUtils.bytesToHexString(values))

// 反序列化


Hadoop IO:序列化 笔记
byte[] values = HadoopSerialization(person)

//反序列化
Person p1 = new Person();
HadoopSerializationUtil.deserialization(p1,values);
sout(p1.toString());

Hadoop IO:基于文件的数据结构 笔记
什么是SequenceFile?
1.保存key value对的二进制文件
2 mapfile是经过排序,且带有索引的sequenceFile

SequenceFile文件格式的好处:
1.支持压缩(基于Record压缩和基于Block的压缩),压缩完了还支持分割 :
2.支持切割,所以可以启更多的map
3.编码难度低:hadoop有api支持,在org.apache.hadoop.io包下面,比自己写序列化容易
.


Hadoop IO:基于文件的数据结构 笔记
写SequenceFile步骤
1.创建Configuration
2.获取FileSystem
3.设置文件输出路径
4.SequenceFile.createwriter创建SequenceFile.Writer,然后写入
5.调用SequenceFile.Writer的append方法追加写入
6. 关闭流


Hadoop IO:基于文件的数据结构 笔记
写SequenceFile步骤
1.创建Configuration
2.获取FileSystem
3.设置文件输出路径
4.SequenceFile.createwriter创建SequenceFile.Writer,然后写入
5.调用SequenceFile.Writer的append方法追加写入
6. 关闭流


Hadoop IO:基于文件的数据结构 笔记
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);
Path path = new Path("myseq.seq");
IntWritable key = new IntW..
Text value = new Text();
Writer w = SequenceFile.createWriter(fs, conf, path, key.getClass(),value.getClass());

// 追加
for i = 0 ~ 10
key.set(10-i); //随机写点数据
value.set(str[i%strs.llength]);
w.append(key, value);

// 关闭流
IOUtils.closeStream(w);

Hadoop IO:基于文件的数据结构 笔记
读SequenceFile
1.
Configuration conf = new Configuration();
2.
FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);

3. 定义要读的文件的路径
Path path = new Path("myseq.seq");

4.
SequenceFile.Reader r = new SequenceFile.Reader(fs, path, conf);

5.
Writable key = ReflectionUtils.newInstance(r.getKeyClass(),conf);

Writable value = ReflectionUtils.newInstance(r.getValueClass(),conf);


// 追加
while(r.next(key, value)){
syso(key, value, r.getPosition());
}

// 关闭流
IOUtils.closeStream(r);

Hadoop IO:基于文件的数据结构 笔记
在Seq..eFile中使用压缩
1. 写: 分为record和block
2. 读: 自动解压

代码:
1. SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass(), SequenceFile.CompressionType.RECORD, new Bzip2Codec());


Hadoop IO:基于文件的数据结构 笔记
MapFile
1.是经过排序的带index的SequenceFile,可以根据key进行查找
2.由两部分组成: data + index(key,文件中的偏移位置)
3.在MapFile被访问的时候,index会被加载到内存中,通过index可以快速定位到指定的recode所在文件位置

与SequenceFile比较:
1.MapFile的检索效率高
2.MapFile会消耗一部分内存来存储index数据


Hadoop IO:基于文件的数据结构 笔记
MapFile练习

1.
Configuration conf = new Con..
2.
FileSystem fs = FileSystem.get(conf);
3. //Path path = new Path("myMapFile.map");
4.
Text key = new Text();
key.set("xxx");
Text value = new Text();
value.set("xxx");
MapFile.Writer w = new MapFile.Writer(conf, fs, "myMapFile.map", key.getClass(),value.getClass());
5.w.append(key, value);
6. IOUtils.closeStream(w);

Hadoop IO:基于文件的数据结构 笔记
读MapFile
1.
Configuration conf = new Configuration();
2.
FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);

3. 定义要读的文件的路径
Path path = new Path("myseq.seq");

4.
MapFile.Reader r = new MapFile.Reader(fs, path, conf);

5. 反射出一对空白的key,value对象
Writable key = ReflectionUtils.newInstance(r.getKeyClass(),conf);

Writable value = ReflectionUtils.newInstance(r.getValueClass(),conf);


// 追加
while(r.next((WritableComparable)key, value)){
syso(key, value, r.getPosition());
}

// 关闭流
IOUtils.closeStream(r);

MapReduce工作原理概述 笔记
mr工作原理:
1.job提交的流程
2.错误处理:mr的容错机制
3.job调度:3个调度方法
4.shuffle(讲数据从map发送到reduce)与排序:mr的核心
5.任务执行是一些特有的概念:比如jvm重用
6. mr的类型(输入、出类型)与格式(输入、出格式)


MapReduce工作原理概述 笔记
mr工作原理:
mr的job提交的流程

1.写代码
2.作业配置
3.作业提交
4.作业初始化
5.任务分配
6.任务执行
7.进度与状态的更新
8.作业的完成


MapReduce工作原理概述 笔记
mr的执行流程:

input:file -> split -> part1,2,3,4

part1 -> map
part2 -> map
part3 -> map
part4 -> map

map1,map2->shuffle与排序 ->reduce1 -> output
map3,map4->shuffle与排序 ->reduce2 -> output

MapReduce工作原理概述 笔记
mr角色:
1.client: 作业提交发起者
2. jobtracker:初始化作业,分配作业,与taskTracker通信,协调整个作业
3. tasktracker: 与jobTracker通信,在分配的数据片段上执行mr
4. hdfs:保存作业的数据,配置信息等,保存作业结果

mr的job提交的流程
1.写代码
2.作业配置 hdfs???
3.作业提交 clent
4.作业初始化 jobtracker
5.任务分配 jobtracker
6.任务执行 tasktracker
7.进度与状态的更新 ???
8.作业的完成 hdfs???




MapReduce工作原理概述 笔记
作业提交
1.写代码 : 写mapreduce代码
2.作业配置 : 配置输入输出路径,其他配置,如输出压缩等。
3.作业提交

MapReduce工作原理概述 笔记
2.作业配置 : 配置输入输出路径,其他配置,如输出压缩等。

// 配置输入输出路径,
FilInputFormat.setInputPath(jobConf, new Path(args[0]);
FilOutputFormat.setOutputPath(jobConf, new Path(args[1]);
//其他配置,
jobConf.setOutKeyClass(Text.class);
jobConf.setOutValueClass(IntWritable.class;
//如输出压缩等。
jobConf.setBoolean("mapred.compress.map.output", true);
jobConf.setMapOutputCompressorClass(GZipCodec.class);

3.作业提交 clent
JobClient.runJob(jobConf);



MapReduce工作原理概述 笔记
JobClient.runJob(jobConf)里面做了啥?

1. get job id

2. 检查作业的相关输入,配置,同时也将Job 一些文件 传到JobTracker

job.jar -libjars

3.create splits for the job //计算输入分片,并将分片信息写入到job.split中


4. 把job.xml传到服务器上

5.Now, actually submit the job (using the submit name)


MapReduce工作原理概述 笔记
4.作业初始化 jobtracker

JobTracker.class中的代码分析:
1.startTracker方法

作业初始化其实是在JobInProgress.class中做的:
1.initTasks()

2.
3.
4.
5.



MapReduce工作原理概述 笔记
4.作业初始化: 主要是在JobInProgress.class的nitTasks()方法中做的:
1. 读取作业的分片信息
2. 创建map任务(为每个map task生成一个TaskInProgress nonRunningTask) 和 reduce任务
3.创建两个初始化的Task,根据个数和分片信息,初始化Map和Reduce


5.任务分配
TaskTracker transmitHeartBeat //如果TaskTracker拿到任务,拷贝所有的信息到本地(包括代码,任务的配置信息,数据分片索引)

6.任务执行: 主要是调用TaskTracker.localizeJob()方法来实现的
launchTaskForJob
launchTask()
启动TaskRunner


7.进度与状态的更新
1.Task会去跟TaskTracker汇报自己的执行情况
2.TaskTracker会周期收集自己机器上的所有任务的信息,向jobTracker汇报
3.jobTracker会根据所有的task trackers汇报上来的数据进行汇总


MapReduce工作原理:错误处理 笔记
1:任务(Map或者Reduce)的失败


2:TaskTracker 失败


3:jobTracker 失败


MapReduce工作原理:错误处理 笔记
任务(Map或者Reduce)的失败
1.一种情况: 子任务(map或者reduce)失败
2.另一种情况: 子任务的 jvm突然退出
3.任务的挂起:等待用户控制台输入???

tasktracker 在一段时间(10mins)没有收到子任务的 更新,就将改子任务的进程标记为 false,该子进程杀死,tt并报告jt, jt尝试(一般最多4次)尽量在别的机器上启动新的任务。

MapReduce工作原理:错误处理 笔记
TaskTracker 失败
1.tt崩溃后会停止向jt发送心跳信息
2.jt会将该tt从等待的任务池中移除,并将该tt上的t移动到其他地方去运行;
3.tt可以被jt放入到黑名单,即使tt没有失败。





MapReduce工作原理:错误处理 笔记
jobTracker 失败
1.单点故障,hadoop版0.23版本解决了这个问题(可以同时启好多个jt来做这个事情);


MapReduce工作原理:作业调度 笔记
作业调度: hadoop支持的调度器
1.fifo先进先出调度器,hadoop中默认的调度器
2.公平调度器
3.容量调度器


MapReduce工作原理:作业调度 笔记
fifo调度器
1.hadoop默认调度器

fifo悬着被执行的作业的顺序
1.先按作业“优先级”的高低
2.再按照“到达时间”的先后悬着被执行的作业




MapReduce工作原理:作业调度 笔记
fifo悬着被执行的作业的顺序
1.先按作业“优先级”的高低
2.再按照“作业到达时间”的先后悬着被执行的作业

作业是放在一个队列里,先进先出

MapReduce工作原理:作业调度 笔记
公平调度器
1.起码这些作业都能起来

pool1:作业A
pool2:作业B
池子3:


MapReduce工作原理:作业调度 笔记
公平调度器:1个队列中有多个池子
容量调度器:多个队列,每个队列中有多个池子

MapReduce工作原理:作业调度 笔记
配置公平调度器 步骤
1.在mapred.xml中加入如下内容:

调度器

mapred.jobtracker.taskScheduler
org.apache.hadoop.mapred.FairScheduler


mapred.fairscheduler.allocation.file
opt/hadoop/conf/allocations.xml


mapred.fairscheduler.poolnameproperty
pool.name



2.在hadoop conf下创建allocations.xml,内容为:



5
5
2.0表示权重


哪些用户可以在这个池子里运行

6

3


3.重启 jobtracker

4访问 http://jobTracker:50030/schduler查看FairScheduler的UI.

5.提示测试任务


MapReduce工作原理:作业调度 笔记
allocations.xml里面
里面默认什么都不写也是可以的

MapReduce工作原理:作业调度 笔记
5.提示测试任务

同时提交好多个job,看看UI
hadoop jar hadoop-exaplexxxx.jar wordcount 输入路径 输出路径
hadoop jar hadoop-exaplexxxx.jar wordcount 输入路径 输出路径
hadoop jar hadoop-exaplexxxx.jar wordcount 输入路径 输出路径

MapReduce工作原理:Shuffle与排序 笔记
1. Shuffle:将map结果数据重新组织,作为reduce输入的过程叫做shuffle,洗牌
2. 而数组在map与reduce端都会做排序

MapReduce工作原理:Shuffle与排序 笔记
shuffle优化
1.map端
io.sort.mb
2.reduce端:
mapred.job.reduce.input.bufer.percent

MapReduce工作原理:任务的执行时的一些特有的概念 笔记
推测 式 执行
1. 某些任务可能会特别慢
2. 此时mr会尝试在别的机器上重启慢的任务,与原有任务同时运行
3. 该属性是默认启用的,机器整体性能差的情况下,尝试关闭此属性

MapReduce工作原理:任务的执行时的一些特有的概念 笔记
JVM重用
1.由于启动jvm非常耗时,
2.所以在mr中有jvm重用的机制,条件是 同一个作业的任务
3 可以通过mapred.job.reuse.jvm.num.tasks定义重用次数,如果属性是-1那么为无限制.


MapReduce工作原理:任务的执行时的一些特有的概念 笔记
跳过 坏记录
1.数据的 一些记录 不符合规范,处理时抛出异常,mr可以将 此记录 标为 坏记录。重启任务时跳过改记录
2.默认情况下改属性是关闭的。


MapReduce工作原理:任务的执行时的一些特有的概念 笔记
任务执行环境

关于任务执行的环境,可能会有这样那样的问题,hadoop提供了一些支持:

问题1,多个任务可能会同时写一个文件
1.解决方法: 将输出写到任务的临时文件夹,目录为: {mapred.out.put.dir}/temp/${mapred.task.id}


MapReduce工作原理:任务的执行时的一些特有的概念 笔记
运行环境获取

Mapper里

@Override
public void config(JobConf jobConf){
通过jobConf可以拿到任务的运行环境
jobConf.getXXX。。
}

MapReduce工作原理:MapReduce的类型与格式 笔记
类型
1.mr的类型 使用 key-value对 作为输入类型
2.输入输出的数据类型 是通过 输入输出的格式 进行 设定的。


MapReduce工作原理:MapReduce的类型与格式 笔记
输入格式
1.输入分片 与 记录
2.文件输入
3.文本输入
4.二进制输入
5.多文件输入
6. 数据库格式的输入

MapReduce工作原理:MapReduce的类型与格式 笔记
输入分片与记录
1.inputFormat接口负责生成分片
2.hadoop通过inputSplit表示分片
3.一个分片并不是数据本身,而是对分片数据的引用

MapReduce工作原理:MapReduce的类型与格式 笔记
1. 1个map只操作1个分片,同时一个分片也只属于1个map
2. map是一行一行去处理分片里的数据的。
3. 分片是逻辑上的分片,他有可能是文件中的一部分,1个分片有可能在两台机器上???

MapReduce工作原理:MapReduce的类型与格式 笔记
文件输入格式
1.实现类: FileInputFormat
2.通过文件作为其输入源的积累
四个方法:

addInputPath()
addInputPaths()
setInputPath()
setInputPaths()

MapReduce工作原理:MapReduce的类型与格式 笔记
FileInputFormat默认会按HDFS块的大小来分割文件

避免分割文件
方法1,增加分割的大小
方法2,禁止分割
1. 继承FileInputFormat后重装isSplitable()
2. return false;

MapReduce工作原理:MapReduce的类型与格式 笔记
package com.test.nosplit;

public class NonSplitFileInputFormt extends FileInputFormat {

@Override
protected boolean isSplitable(FileSystem fs, Path path){
return false;
}
}


MapReduce工作原理:MapReduce的类型与格式 笔记
文本输入
1.实现类: TextInputFormat
2.TextInputFormat是默认的输入格式
3.包括:
KeyValueTexInputFormat
NLineInputFormat
XML
4.输入分片 与 HDFS块 之间的关系
TextInputFormat的某一条记录可能跨块存在



MapReduce工作原理:MapReduce的类型与格式 笔记
二进制输入
1.实现类: SequenceFileInputFormat
2.处理二进制数据
3.包括
SequenceFileAsTextInputFormat
SequenceFileAsBinaryInputFormat



MapReduce工作原理:MapReduce的类型与格式 笔记
TestSeqInputOutput.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;


TestSeqInputOutput extends Configured implements Tool {

@Override
public int run(String[] args) throws Exception {
for(Entry e: args){
syso(e.getKey() + e.getValue());
}

1. 给jobConf设名字,
JobConf conf = new JobConf(getConf(), TestSeqInputOutput.class);
conf.setJobName("job的名字");

2. 设输入输出class
conf.setOutputKeyClass(LongWritable.class);
conf.setOutputValueClass(Text.class);
*3. 设置Mapper Reducer class
conf.setMapper(SeqMapper.class);
conf.setReducer(SeqReducer.class);
*4.输入、出格式设置输入、出路径
SequenceFileInputFormat.setInputPaths(conf, new Paths(args[0]));
FileOutputFormat.setOutputPath(conf, new Paths(args[1]));

JobClient.rnJob(conf);

return 0;
}


public static void main(){
int exitCode = ToolRunner.run(new TestSeqInputOutput(),args);
System.exit(exitCode);
}
}

MapReduce工作原理:MapReduce的类型与格式 笔记
2. 导jar: eclipse上导出jar文件 /home/test/Destop/seq.jar
3. 跑jar:
1.首先有一个.seq文件
hadoop fs -cat /user/test/first.seq 里面是乱码。。。
2. hadoop jar seq.jar comtest.testseqinput.TestSeqInputOutput /user/test/first.seq output8
3. hadoop fs -cat output8/part-00000

MapReduce工作原理:MapReduce的类型与格式 笔记
多文件输入
1.实现类: MultipleInputs
2.处理多种文件的输入
3.包括:
addInputPath


MapReduce工作原理:MapReduce的类型与格式 笔记
多文件的输入
实现类: MultipInputs
处理多种文件输入
包括: addInputPath

当你有两个文件,需要两个mapper来分别同时处理的时候

1.
public class MultipleMapper extends MapReduceBase implements Mapper
@Override
public void map(LongWritable key, Text value,OutputCollector oCollector, Reporter r){
oCollector.collect(key, value);
}
}

2.
public class MultipleMapper2 extends MapReduceBase implements Mapper
@Override
public void map(LongWritable key, Text value,OutputCollector oCollector, Reporter r){
oCollector.collect(key, value);
}
}

3.
public class MultipleReducer extends MapReduceBase implements Reducer
@Override
public void reduce(LongWritable key, Iterator values,OutputCollector oCollector, Reporter r)throws IOException{
while(values.hasNext()){
oCollector.collect(key, new Text(values.next()));
}}
}


MultipleInputsTest.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;


class MultipleInputsTest extends Configured implements Tool {

@Override
public int run(String[] args) throws Exception {
1. 给jobConf设名字,
JobConf conf = new JobConf(getConf(), TestSeqInputOutput.class);
conf.setJobName("job的名字");

String[] otherArgs = new GenericOptionParser(conf, args).getRemainingArgs();
if(otherArags.length < 3)
{
System.err.println("usage: 参数1, 2, 3");
System.exit(2);
}

*2. 设置输出class
conf.setOutputKeyClass(LongWritable.class)
conf.setOutputValueClass(Text.class);

*3
MultipleInputs.addInputPath(conf,
new Path(args[0]),
TextInputFormat.class, MultipleMapper.class);

MultipleInputs.addInputPath(conf,
new Path(args[1]),
TextInputFormat.class, MultipleMapper2.class);

*4
FileOutputFormat.setOutputPath(conf, new Path(args[2]));

JobClient.rnJob(conf);

return 0;
}


public static void main(){
try{
int exitCode = ToolRunner.run(new TestSeqInputOutput(),args);
System.exit(exitCode);
}catch(e){...}}}



MapReduce工作原理:MapReduce的类型与格式 笔记
数据库输入
实现类: DBInputFormat
注意使用: 因为连接过多,数据库无法承受

MapReduce工作原理:MapReduce的类型与格式 笔记
输出格式:
文本输出,二进制输出,多文件输出,数据库格式的输出

MapReduce工作原理:MapReduce的类型与格式 笔记
//设置reducerClass
conf.setReducerClass(SeqReducer.class);

//设置???InputFormat.Class和???mapper.class
MultipleInputs.addInputPath(conf, path1, TextInputFormat.class, MultipleMapper1.class);
//设置???InputFormat.Class和???mapper.class
MultipleInputs.addInputPath(conf, path2, TextInputFormat.class, MultipleMapper2.class);
//设置???OutputFormat.Class
FileOutputFormat.setOutputPath(conf, path3);



MapReduce工作原理:MapReduce的类型与格式 笔记
输出格式:
1. 文本输出,tOF
2. 二进制输出,sfOF: 实现类:sfAsTextOF ,sfAsBinaryOF,mapFileOF
3. 多文件输出,MOF MOs
4. 数据库格式的输出 dbOF

文本输出: 默认的
1.实现类: TextOutputFormat
2.默认的输出格式: 以 “key \t value"的方式输出行到文件中

二进制输出: 输出sequence格式的文件
1.基类:SequenceFileOutputFormat
2.实现类:
1)SequenceFileAsTextOutputFormat
2)MapFileOutputFormat: 使用了索引,实际上是空间换时间,查找更快
3)SequenceFileAsBinaryOutputFormat

多文件输出
1.MultipeOutputFormat
2.MultipeOutputs
3.两者的区别在于: MOs可以产生不同类型的输出


MapReduce工作原理:MapReduce的类型与格式 笔记
注意: 输出比较复杂,需要引入其他的概念,这里先不多做介绍。

MapReduce工作原理:MapReduce的类型与格式 笔记
mark 2017-07-19 00:28:46
精品视频课程推荐

Java数据结构和算法精讲版
本课程专注于数据结构和算法的内容,使用Java来进行代码示例,不空洞的讲解概念和理论,重点放在代码的实现和示例上。 从零开始、全面系统、成体系的讲解数据结构和基本算法,循序渐进的讲述构建软件系统所常见的数据结构和算法。

深入浅出学Shrio视频教程
内容概述:Shiro是目前最热门、最易用、功能超强大的Java权限管理框架,强烈推荐,每个项目都必备的权限管理技术!通过本课程,你将从零开始直到彻底掌握Shiro的相关开发知识,达到可以进行实际项目开发的能力。包括:权限管理基础、Shiro入门、配置、身份认证、授权、Realms、Session管理、和Spring的集成、Web、Cache等众多开发细节技术 技术要点:源码级分析Shiro的授权过程、自定义开发Realm、多个Realms的开发配置、自定义开发AuthenticationStrategy、自定义开发自定义SessionDAO、和Struts2+Spring3的集成(包括修正struts2的bug)、Shiro和SpringMVC+Spring3的集成、包装使用其他的Cache框架、缓存数据同步更新的解决方案等等实际开发中常用的内容

深入浅出学Zookeeper
内容概述:本课程深入浅出的讲解Hadoop 体系下的分布式协调组件-Zookeeper,向大家介绍了Zookeeper的原理与使用,深入浅出,结合生动实例,具有很高的实战意味 技术要点:Zookeepr 原理,Zookeeper 架构,Zookeeper 简单使用,Zookeeper Watch等

云计算综合实战项目视频教程(更新版)
内容概述:通过一个真实完整的项目——某大型互联网企业用户上网行为日志分析系统,系统学习Hadoop、Hive,HBase、Zookeeper等云计算开发技术在真实商业系统中到底如何使用。具有极高的学习和参考价值。 该系统是一个每天5亿条数据,上百个节点的实际商业项目,该系统将海量日志数据进行分布式存储,并通过分布式算法和网络爬虫技术形成标签化的用户模型,最终实现人与内容、人与行为、人与商品的智能配对。 技术要点:Hadoop、Hive、HBase、Zookeeper、爬虫技术等

深入浅出学Spring Web MVC视频教程
系统、完整的学习Spring Web MVC开发的知识。包括:Spring Web MVC入门;理解DispatcherServlet;注解式控制器开发详解;数据类型转换;数据格式化;数据验证; 拦截器;对Ajax的支持;文件上传下载;表单标签等内容;最后以一个综合的CRUD带翻页的应用示例来综合所学的知识

浏览(466)|评论(95)   交流分类:学习问题讨论|笔记分类: 课程笔记

评论(95)
请登录后评论 登录

关于我们 | 联系我们 | 用户协议 | 私塾在线服务协议 | 版权声明 | 隐私保护

版权所有 Copyright(C)2009-2012 私塾在线学习网