|
2017-07-17 09:47:23
Hadoop实战-中高级部分视频教程-视频教程学习笔记 浏览(5974)|评论(95) 交流分类:学习问题讨论|笔记分类: 课程笔记
整体课程概览 视频教程学习笔记
视频课程地址:Hadoop实战-中高级部分视频教程 当前播放的视频地址:整体课程概览 1. hadoop的io, 2.3. mr的原理与编程, 4. hdfs原理, 5. hdfs与mr的管理以及集群的监控与管理, 6. hadoop的ha, 7.hadoop的rpc机制, 8 hadoop的restful api 9. 调优, 10. 集群安装 Hadoop IO:数据完整性 笔记 概览 1.数据完整性 2.压缩 3.序列化 4.基于文件的数据结构 Hadoop IO:数据完整性 笔记 如何保证? 1.写没有问题 2.读没有问题 3.数据放上去之后,时间长了之后,消磁,有没有问题 Hadoop IO:数据完整性 笔记 数据完整性采用的技术: 保证数据在传输过程中不损坏。常见的技术 1.奇偶校验 2.ecc校验纠错技术 3.crc-32循环冗余校验技术 Hadoop IO:数据完整性 笔记 ecc校验纠错技术: 适用于缓存里保存数据,保证内存里的数据不会丢失,校验完整性 crc-32循环冗余校验技术: hadoop使用的就是这个 Hadoop IO:数据完整性 笔记 写入的流程:client往datanode上写文件的同时,每512字节会生成crc校验码,同时存到datanode上,datanode会根据接受到的数据生成crc校验码,跟client传输过来的校验码进行对比,如果不一样,抛出异常:CheckSumException,client根据异常做出下一步决定 读取:client从datanode上读文件的同时,datanode会将数据和对应的crc校验码发过来, client会根据读到的数据生成一个crc校验码,与从datanode上度过来的crc对比,如果不一致,就从其他机器上读,如果所有机器都不一致,就失败了 持久存放:一个叫DataBlockScanner的后台线程定期检查 如何修复损坏的数据:一个叫DataBlockScanner的后台线程定期检查,发现损坏的块,会从其他副本复制一个新的副本 Hadoop IO:数据完整性 笔记 hadoop fs 中的 -get -copoLocal 有-ignoreCrc -crc机制, 指定是否使用crc校验机制 Hadoop IO:数据完整性 笔记 .filename.crc hdoop的文件文件系统会执行客户端校验。也就是,get一个名为filename的文件时,文件系统客户端会创建一个名为.filename.crc的隐藏文件:记录每个文件快的校验和 数据块大小有io.bytes.per.checksum属性控制,块的大小作为元数据存在.crc文件中。 底层文件原生支持校验和,也可以通过fs.file.impl值为org.apache.hadoop.fs.RawLocalFileSystem来禁用校验和,或者通过代码 Configueration conf =... FileSystem fs = new RawLocalFileSystem(); fs.initialize(null, conf); Hadoop IO:数据完整性 笔记 fs.file.impl的默认值是org.apache.hadoop.fs.LocalFileSystem 数据块大小由io.bytes.per.checksum属性控制 Hadoop IO:数据完整性 笔记 LocalFileSystem使用CheckSumFileSystem(校验和文件系统)为自己工作。 Hadoop IO:数据完整性 笔记 docs/api/index.html中查看ChecksumFileSystem //在ubuntu中测试代码: LocalFileSystem fs = ChecksumFileSystem.getLocal(configuration); fs.getChecksumFile(new Path crcPath = Path("testchecksum/testcheck.txt"))//拿到crc文件 syso(crcPath);//打印出路径为:testchecksum/testcheck.txt.crc Hadoop IO:压缩 笔记 压缩的好处: 1. 存得跟多 2. 移动文件的时间减少 本节课程主要学习的内容 1:编码与解码 介绍了Hadoop支持的压缩格式,解码器编码器,了解了如何使用。 2:压缩与输入分割 介绍了哪些格式支持输入分割,为什么要考虑输入分割 3:在M/R中使用压缩 介绍了如何在MR中使用压缩 Hadoop IO:压缩 笔记 编码、解码器:压缩,解压算法的java代码实现类 1.deflate: org.apache.hadoop.io.compress.DefaultDodec 2.gzip:org.apache.hadoop.io.compress.GzipCodec 3.gzip2:org.apache.hadoop.io.compress.BZip2Codec 4.lzo:org.apache.hadoop.io.compress.LzopCodec 5. Hadoop IO:压缩 笔记 s修正: LZO: com.hadoop.compress.lzo.LzopCodec LZO是基于GPL协议的,不是apache协议的,所以所在的包路径不一样 Hadoop IO:压缩 笔记 CompressionCodec 压缩解压缩超类 CompressionCodecFactory: 根据文件扩展名得到文件使用了哪种编码、解码器 Hadoop IO:压缩 笔记 项目TestCompression src目录: com.test.StreamCompressor com.test.FileDeCompressor resources目录: core-site:xml:配 fs.default.name = hdfs://locahost:9000告诉fs在哪儿 mapred-site:xml配 mapred.job.tracker = localhost:9001告诉jobtracker在哪儿 Hadoop IO:压缩 笔记 com.test.StreamCompressor Class coedecClass = class.forName(args[0]); CopressionCodec ccodec = (CopressionCodec)ReflectionUtils.newInstance(codecClass, conf); IOUtils.copyBytes(System.in, ccodec.createOutputStream(system.out) 4096,false); Hadoop IO:压缩 笔记 1.com.test.StreamCompressor Class coedecClass = class.forName(args[0]); CompressionCodec ccodec = (CopressionCodec)ReflectionUtils.newInstance(codecClass, conf); CompressionOutputStream cos = ccodec.createOutputStream(system.out); IOUtils.copyBytes(System.in, cos 4096,false); cos.finish(); 2. 导出为jar文件 3. echo "hello" | hadoop jar com.test.StreamCompressor org.apache.hadoop.io.compress.GzipCodec || gunzip -c Hadoop IO:压缩 笔记 演示解压缩 FileDecompressor.java Path myPath = new Path("压缩文件"); factory = new CompressionCodecFactory(conf); codec = factory.getCodec(myPath); is = codec.createInputStream(fs.open(myPath)); os = fs.create(new Path(outputuristring)); IOUtils.copyBytes(is, os, conf); IOUtils.closeStream(is); IOUtils.closeStream(os); Hadoop IO:压缩 笔记 发布运行解压缩 1.gzip test.txt 2.hadoop fs -mkdir testfactory 3.hadoop fs -put test.txt.gz testfactory 4.hadoop jar compressFactory.jar com.test.FileDecompressor testfactory/test.txt.gz 5.hadoop fs -cat testfactory/test.txt 看到gzip对小文件的压缩反而会把文件变大 源文件6B, gzip压缩后35B Hadoop IO:压缩 笔记 .deflate,.gz,.bz2,.lzo 1. 将多个文件压缩成一个文件:不支持 2. 压缩后再切割:.bz2,.lzo 3. giz使用的压缩算法和默认的算分即deflate一样,只不过文件扩展名不一样 4. lzo支持分割是有条件的:需要在文件中有索引 Hadoop IO:压缩 笔记 core-site.xml中 io.compression.codes属性用来配置编解码器,多个的情况用逗号分割 Hadoop IO:压缩 笔记 hadoop里面默认,如果有本地库的话,会先去找本地库的压缩解压算分实现 bzip2默认没有本地库的 lzo默认没有java库 Hadoop IO:压缩 笔记 2:压缩与输入分割 介绍了哪些格式支持输入分割,为什么要考虑输入分割 1. 因为1G的文件在64M大小的快上可以分为16块,就可以启16个map 2. 如果使用bzip2或者lzo压缩算法,可以继续细分,启动更多map Hadoop IO:压缩 笔记 在mr中使用压缩 输出 配置文件中配置:永久性的 1.配置mapred.output.compress属性为true 2.配置mapred.output.compression.codec属性为自己需要的压缩解编解码器的类名 . 或者 在代码中实现:临时性的 1 jobConf = new JobConf(conf, WordCount.class);. 2.jobConf.setBoolean("mapred.output.compress" true); 3.jobConf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class); Hadoop IO:压缩 笔记 在m中使用压缩 设置属性 1.mapred.compress.map.output 代码示例: conf.setCompressMapOutput(); conf.setMapOutputCompressorClass(GzipCodec.class); Hadoop IO:压缩 笔记 代码: jobConf.setBoolean("mapred.compress.map.output",true); jobConf.setMapOutputCompressorClass(GzipCodec.class); Hadoop IO:序列化 笔记 1. 什么是hadoop的序列化? 2. hadoop的序列化接口? 3.工作中自定义hadoop的序列化接口 Hadoop IO:序列化 笔记 序列化 1.什么是序列化? 结构化对象 转为 字节流 2 作用:进程间通信 或者 持久存储 . 3.特点: 1. compact紧凑,更好的利用空间 2. fast,序列化,反序列化性能好 3. extensible扩展性,协议有变化,可以支持新的需求 4. inter operable 互操作性: 客户端与服务端不依赖语言的实现 hadoop满足compact,fast,不满足扩展性以及互操作性 Hadoop IO:序列化 笔记 hadoop新的框架中阿芙罗满足了 扩展性 互操作性 Hadoop IO:序列化 笔记 java的序列化 1. 保存新类的所有信息 2. 不能复用对象的结构 3. hadoop自己写序列化的逻辑比较容易 hadoop序列化相关的接口:Writable,Comparable和WritableComprable Hadoop IO:序列化 笔记 Writable接口: 1. write(dos)方法写到二进制流 2. readFields(dis)方法读 Hadoop IO:序列化 笔记 1. WritableComparator比较器继承了RawComparator和WritableComparable 2. WritableComparable继承了Writable和Comparable 3. RawComparator继承了Comparator,他比Comparator的好处是可以直接在流里面比较,而不需要创建对象比较 Hadoop IO:序列化 笔记 WritableComparator是RawComparator的一个通用实现,它做了两件事: 1. 实现了compare方法 2. 充当RawComparator的工厂类 Hadoop IO:序列化 笔记 Text与String不同的是,他是可变的,set方法改变。最大存2GB ObjectWritable适用于字段可以使用多种类型时。 Hadoop IO:序列化 笔记 实现自己的序列化接口 1.实现 WritableComparable接口 2.实现方法write(),readfields(),compare()方法 Hadoop IO:序列化 笔记 public class Person { Text name = new Text(); Text sex = new Text(); IntWritable age = new Intwritable(); @Override public void write(DataOutput do) throws IOException{ name.write(do); sex.write(do); age.write(do); } @Override public void readFields(DataInput di) throws IOException{ name.readFields(di); sex.readFields(di); age.readFields(di); } @Override public int compareTo(Person o){ int i = name.compareTo(o.name); if(i != 0){ return i } i = sex.compareTo(o.sex); if(i != 0){ return i } i = age.compareTo(o.age); if(i != 0){ return i } } Hadoop IO:序列化 笔记 实现equals和hashcode方法 Hadoop IO:序列化 笔记 // 序列号自定义类 Person person = byte[] values = HadoopSerializationUtil.serialize(person); sout(values) sout(StringUtils.bytesToHexString(values)) // 反序列化 Hadoop IO:序列化 笔记 byte[] values = HadoopSerialization(person) //反序列化 Person p1 = new Person(); HadoopSerializationUtil.deserialization(p1,values); sout(p1.toString()); Hadoop IO:基于文件的数据结构 笔记 什么是SequenceFile? 1.保存key value对的二进制文件 2 mapfile是经过排序,且带有索引的sequenceFile SequenceFile文件格式的好处: 1.支持压缩(基于Record压缩和基于Block的压缩),压缩完了还支持分割 : 2.支持切割,所以可以启更多的map 3.编码难度低:hadoop有api支持,在org.apache.hadoop.io包下面,比自己写序列化容易 . Hadoop IO:基于文件的数据结构 笔记 写SequenceFile步骤 1.创建Configuration 2.获取FileSystem 3.设置文件输出路径 4.SequenceFile.createwriter创建SequenceFile.Writer,然后写入 5.调用SequenceFile.Writer的append方法追加写入 6. 关闭流 Hadoop IO:基于文件的数据结构 笔记 写SequenceFile步骤 1.创建Configuration 2.获取FileSystem 3.设置文件输出路径 4.SequenceFile.createwriter创建SequenceFile.Writer,然后写入 5.调用SequenceFile.Writer的append方法追加写入 6. 关闭流 Hadoop IO:基于文件的数据结构 笔记 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf); Path path = new Path("myseq.seq"); IntWritable key = new IntW.. Text value = new Text(); Writer w = SequenceFile.createWriter(fs, conf, path, key.getClass(),value.getClass()); // 追加 for i = 0 ~ 10 key.set(10-i); //随机写点数据 value.set(str[i%strs.llength]); w.append(key, value); // 关闭流 IOUtils.closeStream(w); Hadoop IO:基于文件的数据结构 笔记 读SequenceFile 1. Configuration conf = new Configuration(); 2. FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf); 3. 定义要读的文件的路径 Path path = new Path("myseq.seq"); 4. SequenceFile.Reader r = new SequenceFile.Reader(fs, path, conf); 5. Writable key = ReflectionUtils.newInstance(r.getKeyClass(),conf); Writable value = ReflectionUtils.newInstance(r.getValueClass(),conf); // 追加 while(r.next(key, value)){ syso(key, value, r.getPosition()); } // 关闭流 IOUtils.closeStream(r); Hadoop IO:基于文件的数据结构 笔记 在Seq..eFile中使用压缩 1. 写: 分为record和block 2. 读: 自动解压 代码: 1. SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass(), SequenceFile.CompressionType.RECORD, new Bzip2Codec()); Hadoop IO:基于文件的数据结构 笔记 MapFile 1.是经过排序的带index的SequenceFile,可以根据key进行查找 2.由两部分组成: data + index(key,文件中的偏移位置) 3.在MapFile被访问的时候,index会被加载到内存中,通过index可以快速定位到指定的recode所在文件位置 与SequenceFile比较: 1.MapFile的检索效率高 2.MapFile会消耗一部分内存来存储index数据 Hadoop IO:基于文件的数据结构 笔记 MapFile练习 1. Configuration conf = new Con.. 2. FileSystem fs = FileSystem.get(conf); 3. //Path path = new Path("myMapFile.map"); 4. Text key = new Text(); key.set("xxx"); Text value = new Text(); value.set("xxx"); MapFile.Writer w = new MapFile.Writer(conf, fs, "myMapFile.map", key.getClass(),value.getClass()); 5.w.append(key, value); 6. IOUtils.closeStream(w); Hadoop IO:基于文件的数据结构 笔记 读MapFile 1. Configuration conf = new Configuration(); 2. FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf); 3. 定义要读的文件的路径 Path path = new Path("myseq.seq"); 4. MapFile.Reader r = new MapFile.Reader(fs, path, conf); 5. 反射出一对空白的key,value对象 Writable key = ReflectionUtils.newInstance(r.getKeyClass(),conf); Writable value = ReflectionUtils.newInstance(r.getValueClass(),conf); // 追加 while(r.next((WritableComparable)key, value)){ syso(key, value, r.getPosition()); } // 关闭流 IOUtils.closeStream(r); MapReduce工作原理概述 笔记 mr工作原理: 1.job提交的流程 2.错误处理:mr的容错机制 3.job调度:3个调度方法 4.shuffle(讲数据从map发送到reduce)与排序:mr的核心 5.任务执行是一些特有的概念:比如jvm重用 6. mr的类型(输入、出类型)与格式(输入、出格式) MapReduce工作原理概述 笔记 mr工作原理: mr的job提交的流程 1.写代码 2.作业配置 3.作业提交 4.作业初始化 5.任务分配 6.任务执行 7.进度与状态的更新 8.作业的完成 MapReduce工作原理概述 笔记 mr的执行流程: input:file -> split -> part1,2,3,4 part1 -> map part2 -> map part3 -> map part4 -> map map1,map2->shuffle与排序 ->reduce1 -> output map3,map4->shuffle与排序 ->reduce2 -> output MapReduce工作原理概述 笔记 mr角色: 1.client: 作业提交发起者 2. jobtracker:初始化作业,分配作业,与taskTracker通信,协调整个作业 3. tasktracker: 与jobTracker通信,在分配的数据片段上执行mr 4. hdfs:保存作业的数据,配置信息等,保存作业结果 mr的job提交的流程 1.写代码 2.作业配置 hdfs??? 3.作业提交 clent 4.作业初始化 jobtracker 5.任务分配 jobtracker 6.任务执行 tasktracker 7.进度与状态的更新 ??? 8.作业的完成 hdfs??? MapReduce工作原理概述 笔记 作业提交 1.写代码 : 写mapreduce代码 2.作业配置 : 配置输入输出路径,其他配置,如输出压缩等。 3.作业提交 MapReduce工作原理概述 笔记 2.作业配置 : 配置输入输出路径,其他配置,如输出压缩等。 // 配置输入输出路径, FilInputFormat.setInputPath(jobConf, new Path(args[0]); FilOutputFormat.setOutputPath(jobConf, new Path(args[1]); //其他配置, jobConf.setOutKeyClass(Text.class); jobConf.setOutValueClass(IntWritable.class; //如输出压缩等。 jobConf.setBoolean("mapred.compress.map.output", true); jobConf.setMapOutputCompressorClass(GZipCodec.class); 3.作业提交 clent JobClient.runJob(jobConf); MapReduce工作原理概述 笔记 JobClient.runJob(jobConf)里面做了啥? 1. get job id 2. 检查作业的相关输入,配置,同时也将Job 一些文件 传到JobTracker job.jar -libjars 3.create splits for the job //计算输入分片,并将分片信息写入到job.split中 4. 把job.xml传到服务器上 5.Now, actually submit the job (using the submit name) MapReduce工作原理概述 笔记 4.作业初始化 jobtracker JobTracker.class中的代码分析: 1.startTracker方法 作业初始化其实是在JobInProgress.class中做的: 1.initTasks() 2. 3. 4. 5. MapReduce工作原理概述 笔记 4.作业初始化: 主要是在JobInProgress.class的nitTasks()方法中做的: 1. 读取作业的分片信息 2. 创建map任务(为每个map task生成一个TaskInProgress nonRunningTask) 和 reduce任务 3.创建两个初始化的Task,根据个数和分片信息,初始化Map和Reduce 5.任务分配 TaskTracker transmitHeartBeat //如果TaskTracker拿到任务,拷贝所有的信息到本地(包括代码,任务的配置信息,数据分片索引) 6.任务执行: 主要是调用TaskTracker.localizeJob()方法来实现的 launchTaskForJob launchTask() 启动TaskRunner 7.进度与状态的更新 1.Task会去跟TaskTracker汇报自己的执行情况 2.TaskTracker会周期收集自己机器上的所有任务的信息,向jobTracker汇报 3.jobTracker会根据所有的task trackers汇报上来的数据进行汇总 MapReduce工作原理:错误处理 笔记 1:任务(Map或者Reduce)的失败 2:TaskTracker 失败 3:jobTracker 失败 MapReduce工作原理:错误处理 笔记 任务(Map或者Reduce)的失败 1.一种情况: 子任务(map或者reduce)失败 2.另一种情况: 子任务的 jvm突然退出 3.任务的挂起:等待用户控制台输入??? tasktracker 在一段时间(10mins)没有收到子任务的 更新,就将改子任务的进程标记为 false,该子进程杀死,tt并报告jt, jt尝试(一般最多4次)尽量在别的机器上启动新的任务。 MapReduce工作原理:错误处理 笔记 TaskTracker 失败 1.tt崩溃后会停止向jt发送心跳信息 2.jt会将该tt从等待的任务池中移除,并将该tt上的t移动到其他地方去运行; 3.tt可以被jt放入到黑名单,即使tt没有失败。 MapReduce工作原理:错误处理 笔记 jobTracker 失败 1.单点故障,hadoop版0.23版本解决了这个问题(可以同时启好多个jt来做这个事情); MapReduce工作原理:作业调度 笔记 作业调度: hadoop支持的调度器 1.fifo先进先出调度器,hadoop中默认的调度器 2.公平调度器 3.容量调度器 MapReduce工作原理:作业调度 笔记 fifo调度器 1.hadoop默认调度器 fifo悬着被执行的作业的顺序 1.先按作业“优先级”的高低 2.再按照“到达时间”的先后悬着被执行的作业 MapReduce工作原理:作业调度 笔记 fifo悬着被执行的作业的顺序 1.先按作业“优先级”的高低 2.再按照“作业到达时间”的先后悬着被执行的作业 作业是放在一个队列里,先进先出 MapReduce工作原理:作业调度 笔记 公平调度器 1.起码这些作业都能起来 pool1:作业A pool2:作业B 池子3: MapReduce工作原理:作业调度 笔记 公平调度器:1个队列中有多个池子 容量调度器:多个队列,每个队列中有多个池子 MapReduce工作原理:作业调度 笔记 配置公平调度器 步骤 1.在mapred.xml中加入如下内容: 调度器 mapred.jobtracker.taskScheduler org.apache.hadoop.mapred.FairScheduler mapred.fairscheduler.allocation.file opt/hadoop/conf/allocations.xml mapred.fairscheduler.poolnameproperty pool.name 2.在hadoop conf下创建allocations.xml,内容为: 5 5 2.0表示权重 哪些用户可以在这个池子里运行 6 3 3.重启 jobtracker 4访问 http://jobTracker:50030/schduler查看FairScheduler的UI. 5.提示测试任务 MapReduce工作原理:作业调度 笔记 allocations.xml里面 里面默认什么都不写也是可以的 MapReduce工作原理:作业调度 笔记 5.提示测试任务 同时提交好多个job,看看UI hadoop jar hadoop-exaplexxxx.jar wordcount 输入路径 输出路径 hadoop jar hadoop-exaplexxxx.jar wordcount 输入路径 输出路径 hadoop jar hadoop-exaplexxxx.jar wordcount 输入路径 输出路径 MapReduce工作原理:Shuffle与排序 笔记 1. Shuffle:将map结果数据重新组织,作为reduce输入的过程叫做shuffle,洗牌 2. 而数组在map与reduce端都会做排序 MapReduce工作原理:Shuffle与排序 笔记 shuffle优化 1.map端 io.sort.mb 2.reduce端: mapred.job.reduce.input.bufer.percent MapReduce工作原理:任务的执行时的一些特有的概念 笔记 推测 式 执行 1. 某些任务可能会特别慢 2. 此时mr会尝试在别的机器上重启慢的任务,与原有任务同时运行 3. 该属性是默认启用的,机器整体性能差的情况下,尝试关闭此属性 MapReduce工作原理:任务的执行时的一些特有的概念 笔记 JVM重用 1.由于启动jvm非常耗时, 2.所以在mr中有jvm重用的机制,条件是 同一个作业的任务 3 可以通过mapred.job.reuse.jvm.num.tasks定义重用次数,如果属性是-1那么为无限制. MapReduce工作原理:任务的执行时的一些特有的概念 笔记 跳过 坏记录 1.数据的 一些记录 不符合规范,处理时抛出异常,mr可以将 此记录 标为 坏记录。重启任务时跳过改记录 2.默认情况下改属性是关闭的。 MapReduce工作原理:任务的执行时的一些特有的概念 笔记 任务执行环境 关于任务执行的环境,可能会有这样那样的问题,hadoop提供了一些支持: 问题1,多个任务可能会同时写一个文件 1.解决方法: 将输出写到任务的临时文件夹,目录为: {mapred.out.put.dir}/temp/${mapred.task.id} MapReduce工作原理:任务的执行时的一些特有的概念 笔记 运行环境获取 Mapper里 @Override public void config(JobConf jobConf){ 通过jobConf可以拿到任务的运行环境 jobConf.getXXX。。 } MapReduce工作原理:MapReduce的类型与格式 笔记 类型 1.mr的类型 使用 key-value对 作为输入类型 2.输入输出的数据类型 是通过 输入输出的格式 进行 设定的。 MapReduce工作原理:MapReduce的类型与格式 笔记 输入格式 1.输入分片 与 记录 2.文件输入 3.文本输入 4.二进制输入 5.多文件输入 6. 数据库格式的输入 MapReduce工作原理:MapReduce的类型与格式 笔记 输入分片与记录 1.inputFormat接口负责生成分片 2.hadoop通过inputSplit表示分片 3.一个分片并不是数据本身,而是对分片数据的引用 MapReduce工作原理:MapReduce的类型与格式 笔记 1. 1个map只操作1个分片,同时一个分片也只属于1个map 2. map是一行一行去处理分片里的数据的。 3. 分片是逻辑上的分片,他有可能是文件中的一部分,1个分片有可能在两台机器上??? MapReduce工作原理:MapReduce的类型与格式 笔记 文件输入格式 1.实现类: FileInputFormat 2.通过文件作为其输入源的积累 四个方法: addInputPath() addInputPaths() setInputPath() setInputPaths() MapReduce工作原理:MapReduce的类型与格式 笔记 FileInputFormat默认会按HDFS块的大小来分割文件 避免分割文件 方法1,增加分割的大小 方法2,禁止分割 1. 继承FileInputFormat后重装isSplitable() 2. return false; MapReduce工作原理:MapReduce的类型与格式 笔记 package com.test.nosplit; public class NonSplitFileInputFormt extends FileInputFormat { @Override protected boolean isSplitable(FileSystem fs, Path path){ return false; } } MapReduce工作原理:MapReduce的类型与格式 笔记 文本输入 1.实现类: TextInputFormat 2.TextInputFormat是默认的输入格式 3.包括: KeyValueTexInputFormat NLineInputFormat XML 4.输入分片 与 HDFS块 之间的关系 TextInputFormat的某一条记录可能跨块存在 MapReduce工作原理:MapReduce的类型与格式 笔记 二进制输入 1.实现类: SequenceFileInputFormat 2.处理二进制数据 3.包括 SequenceFileAsTextInputFormat SequenceFileAsBinaryInputFormat MapReduce工作原理:MapReduce的类型与格式 笔记 TestSeqInputOutput.java import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.Tool; TestSeqInputOutput extends Configured implements Tool { @Override public int run(String[] args) throws Exception { for(Entry e: args){ syso(e.getKey() + e.getValue()); } 1. 给jobConf设名字, JobConf conf = new JobConf(getConf(), TestSeqInputOutput.class); conf.setJobName("job的名字"); 2. 设输入输出class conf.setOutputKeyClass(LongWritable.class); conf.setOutputValueClass(Text.class); *3. 设置Mapper Reducer class conf.setMapper(SeqMapper.class); conf.setReducer(SeqReducer.class); *4.输入、出格式设置输入、出路径 SequenceFileInputFormat.setInputPaths(conf, new Paths(args[0])); FileOutputFormat.setOutputPath(conf, new Paths(args[1])); JobClient.rnJob(conf); return 0; } public static void main(){ int exitCode = ToolRunner.run(new TestSeqInputOutput(),args); System.exit(exitCode); } } MapReduce工作原理:MapReduce的类型与格式 笔记 2. 导jar: eclipse上导出jar文件 /home/test/Destop/seq.jar 3. 跑jar: 1.首先有一个.seq文件 hadoop fs -cat /user/test/first.seq 里面是乱码。。。 2. hadoop jar seq.jar comtest.testseqinput.TestSeqInputOutput /user/test/first.seq output8 3. hadoop fs -cat output8/part-00000 MapReduce工作原理:MapReduce的类型与格式 笔记 多文件输入 1.实现类: MultipleInputs 2.处理多种文件的输入 3.包括: addInputPath MapReduce工作原理:MapReduce的类型与格式 笔记 多文件的输入 实现类: MultipInputs 处理多种文件输入 包括: addInputPath 当你有两个文件,需要两个mapper来分别同时处理的时候 1. public class MultipleMapper extends MapReduceBase implements Mapper @Override public void map(LongWritable key, Text value,OutputCollector oCollector, Reporter r){ oCollector.collect(key, value); } } 2. public class MultipleMapper2 extends MapReduceBase implements Mapper @Override public void map(LongWritable key, Text value,OutputCollector oCollector, Reporter r){ oCollector.collect(key, value); } } 3. public class MultipleReducer extends MapReduceBase implements Reducer @Override public void reduce(LongWritable key, Iterator values,OutputCollector oCollector, Reporter r)throws IOException{ while(values.hasNext()){ oCollector.collect(key, new Text(values.next())); }} } MultipleInputsTest.java import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.Tool; class MultipleInputsTest extends Configured implements Tool { @Override public int run(String[] args) throws Exception { 1. 给jobConf设名字, JobConf conf = new JobConf(getConf(), TestSeqInputOutput.class); conf.setJobName("job的名字"); String[] otherArgs = new GenericOptionParser(conf, args).getRemainingArgs(); if(otherArags.length < 3) { System.err.println("usage: 参数1, 2, 3"); System.exit(2); } *2. 设置输出class conf.setOutputKeyClass(LongWritable.class) conf.setOutputValueClass(Text.class); *3 MultipleInputs.addInputPath(conf, new Path(args[0]), TextInputFormat.class, MultipleMapper.class); MultipleInputs.addInputPath(conf, new Path(args[1]), TextInputFormat.class, MultipleMapper2.class); *4 FileOutputFormat.setOutputPath(conf, new Path(args[2])); JobClient.rnJob(conf); return 0; } public static void main(){ try{ int exitCode = ToolRunner.run(new TestSeqInputOutput(),args); System.exit(exitCode); }catch(e){...}}} MapReduce工作原理:MapReduce的类型与格式 笔记 数据库输入 实现类: DBInputFormat 注意使用: 因为连接过多,数据库无法承受 MapReduce工作原理:MapReduce的类型与格式 笔记 输出格式: 文本输出,二进制输出,多文件输出,数据库格式的输出 MapReduce工作原理:MapReduce的类型与格式 笔记 //设置reducerClass conf.setReducerClass(SeqReducer.class); //设置???InputFormat.Class和???mapper.class MultipleInputs.addInputPath(conf, path1, TextInputFormat.class, MultipleMapper1.class); //设置???InputFormat.Class和???mapper.class MultipleInputs.addInputPath(conf, path2, TextInputFormat.class, MultipleMapper2.class); //设置???OutputFormat.Class FileOutputFormat.setOutputPath(conf, path3); MapReduce工作原理:MapReduce的类型与格式 笔记 输出格式: 1. 文本输出,tOF 2. 二进制输出,sfOF: 实现类:sfAsTextOF ,sfAsBinaryOF,mapFileOF 3. 多文件输出,MOF MOs 4. 数据库格式的输出 dbOF 文本输出: 默认的 1.实现类: TextOutputFormat 2.默认的输出格式: 以 “key \t value"的方式输出行到文件中 二进制输出: 输出sequence格式的文件 1.基类:SequenceFileOutputFormat 2.实现类: 1)SequenceFileAsTextOutputFormat 2)MapFileOutputFormat: 使用了索引,实际上是空间换时间,查找更快 3)SequenceFileAsBinaryOutputFormat 多文件输出 1.MultipeOutputFormat 2.MultipeOutputs 3.两者的区别在于: MOs可以产生不同类型的输出 MapReduce工作原理:MapReduce的类型与格式 笔记 注意: 输出比较复杂,需要引入其他的概念,这里先不多做介绍。 MapReduce工作原理:MapReduce的类型与格式 笔记 mark 2017-07-19 00:28:46 相关笔记推荐
精品视频课程推荐
Hadoop实战-初级部分视频教程
深入浅出学Shrio视频教程
深入浅出学Zookeeper
Hadoop实战-中高级部分视频教程
深入浅出学Spring Web MVC视频教程
评论(95)
请登录后评论 登录
|