收藏私塾在线
 

欢迎您来到私塾在线网!   

请登录! 

免费注册 

交流首页 » Java »如何将 MongoDB MapReduce 速度提升 20 倍  XML
发表人 内容
zxb_fisha
[头像]

交流经验:
总积分:100
级别:普通会员
注册时间: 2013-12-01
文章: 6
离线

如何将 MongoDB MapReduce 速度提升 20 倍

分析在MongoDB中正成为越来越重要的话题,因为它在越来越多的大型项目中使用。人们厌倦了使用不同的软件来做分析(包括Hadoop),它们显然需要传输大量开销的数据。

MongoDB提供了两种内置分析数据的方法:Map Reduce和Aggregation框架。MR非常灵活,很容易部署。它通过分区工作良好,并允许大量输出。MR在MongoDB v2.4中,通过使用JavaScript引擎把Spider Monkey替换成V8,性能提升很多。老板抱怨它太慢了,尤其是和Agg框架(使用C++)相比。让我们看看能否从中榨出点果汁。

练习

让我们插入1千万条文档,每个文档包含一个从0到1000000的整数。这意味着平均有10个文档会具有相同的值。

> for (var i = 0; i < 10000000; ++i) db.uniques.insert( dim0: Math.floor(Math.random()*1000000) );
> db.uniques.findOne()
 "_id" : ObjectId("51d3c386acd412e22c188dec"), "dim0" : 570859 
> db.uniques.ensureIndex(dim0: 1)
> db.uniques.stats()

        "ns" : "test.uniques",
        "count" : 10000000,
        "size" : 360000052,
        "avgObjSize" : 36.0000052,
        "storageSize" : 582864896,
        "numExtents" : 18,
        "nindexes" : 2,
        "lastExtentSize" : 153874432,
        "paddingFactor" : 1,
        "systemFlags" : 1,
        "userFlags" : 0,
        "totalIndexSize" : 576040080,
        "indexSizes" : 
                "_id_" : 324456384,
                "dim0_1" : 251583696
        ,
        "ok" : 1

从这其中,我们想要计算出现的不同值的个数。可以用下列MR任务轻松完成这个工作:

> db.runCommand(
 mapreduce: "uniques", 
map: function ()  emit(this.dim0, 1); , 
reduce: function (key, values)  return Array.sum(values); , 
out: "mrout" )

        "result" : "mrout",
        "timeMillis" : 1161960,
        "counts" : 
                "input" : 10000000,
                "emit" : 10000000,
                "reduce" : 1059138,
                "output" : 999961
        ,
        "ok" : 1

正如你在输出内容中看到的,这耗费了大概1200秒(在EC2 M3实例上进行的测试)。有1千万个map,1百万个reduce,输出了999961个文档。结果就像下面这样:

> db.mrout.find()
 "_id" : 1, "value" : 10 
 "_id" : 2, "value" : 5 
 "_id" : 3, "value" : 6 
 "_id" : 4, "value" : 10 
 "_id" : 5, "value" : 9 
 "_id" : 6, "value" : 12 
 "_id" : 7, "value" : 5 
 "_id" : 8, "value" : 16 
 "_id" : 9, "value" : 10 
 "_id" : 10, "value" : 13 
...

使用排序

我在上一篇博文中提到了在MR中使用排序多么有益。这个特性很少被理解。在这个例子中,处理未排序的输入意味着MR引擎将得到随机顺序的值,在RAM中根本无法reduce。相反,它将不得不把所有文章写入一个临时收集的磁盘,然后按顺序读取并reduce。让我们看看使用排序是否有助:

> db.runCommand(
 mapreduce: "uniques", 
map: function ()  emit(this.dim0, 1); , 
reduce: function (key, values)  return Array.sum(values); , 
out: "mrout", 
sort: dim0: 1 )

        "result" : "mrout",
        "timeMillis" : 192589,
        "counts" : 
                "input" : 10000000,
                "emit" : 10000000,
                "reduce" : 1000372,
                "output" : 999961
        ,
        "ok" : 1

确实大有助益!我们下降到192秒,已经提升了6倍。reduce的数量基本相同,但现在它们在写入磁盘前,可以在RAM内完成。

使用多线程

MongoDB对单独的MR作业并不使用多线程——它仅仅对多作业使用多线程。但通过多核CPU,在单个服务器使用Hadoop风格来并行作业非常有优势。我们需要做的是把输入分成几块,通过各个块来加速一个MR作业。也许数据集有简单的方法来分割,但其他使用splitVector命令(不明确)可以使你很快的找到分割点:

> db.runCommand(splitVector: "test.uniques", keyPattern: dim0: 1, maxChunkSizeBytes: 32000000)

    "timeMillis" : 6006,
	"splitKeys" : [
		
			"dim0" : 18171
		,
		
			"dim0" : 36378
		,
		
			"dim0" : 54528
		,
		
			"dim0" : 72717
		,
…
		
			"dim0" : 963598
		,
		
			"dim0" : 981805
		
	],
	"ok" : 1
这个命令在超过1千万个文档中找到分割点仅仅需要花费5秒,很快!那么现在我们仅仅需要一个方法来创建多个MR作业。从一个应用服务器,使用多线程和为MR命令使用$gt/$It查询 相当简单。通过shell,你可以使用ScopedThread,使用方法如下:

> var t = new ScopedThread(mapred, 963598, 981805)
> t.start()
> t.join()

现在我们把一些快速运行的js代码放在一起,它们会产生4个线程(或者更多的线程),执行后呈现出下面的结果:

> var res = db.runCommand(splitVector: "test.uniques", keyPattern: dim0: 1, maxChunkSizeBytes: 32 *1024 * 1024 )
> var keys = res.splitKeys
> keys.length
39
> var mapred = function(min, max)  
return db.runCommand( mapreduce: "uniques", 
map: function ()  emit(this.dim0, 1); , 
reduce: function (key, values)  return Array.sum(values); , 
out: "mrout" + min, 
sort: dim0: 1, 
query:  dim0:  $gte: min, $lt: max   ) 
> var numThreads = 4
> var inc = Math.floor(keys.length / numThreads) + 1
> threads = []; for (var i = 0; i < numThreads; ++i)  var min = (i == 0) ? 0 : keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print("min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() 
min:0 max:274736
min:274736 max:524997
min:524997 max:775025
min:775025 max: "$maxKey" : 1 
connecting to: test
connecting to: test
connecting to: test
connecting to: test
> for (var i in threads)  var t = threads[i]; t.join(); printjson(t.returnData()); 
 
        "result" : "mrout0",
        "timeMillis" : 205790,
        "counts" : 
                "input" : 2750002,
                "emit" : 2750002,
                "reduce" : 274828,
                "output" : 274723
        ,
        "ok" : 1

 
        "result" : "mrout274736",
        "timeMillis" : 189868,
        "counts" : 
                "input" : 2500013,
                "emit" : 2500013,
                "reduce" : 250364,
                "output" : 250255
        ,
        "ok" : 1
 

        "result" : "mrout524997",
        "timeMillis" : 191449,
        "counts" : 
                "input" : 2500014,
                "emit" : 2500014,
                "reduce" : 250120,
                "output" : 250019
        ,
"ok" : 1


        "result" : "mrout775025",
        "timeMillis" : 184945,
        "counts" : 
                "input" : 2249971,
                "emit" : 2249971,
                "reduce" : 225057,
                "output" : 224964
        ,
        "ok" : 1

         "ok" : 1


        "result" : "mrout775025",
        "timeMillis" : 184945,
        "counts" : 
                "input" : 2249971,
                "emit" : 2249971,
                "reduce" : 225057,
                "output" : 224964
        ,
        "ok" : 1


第一个线程时间确实超过了其他的线程,但是平均每个线程仍然用了大约190s的时间.这意味着并没有一个线程快!这有点奇怪,自从用了‘top’,在某种程度上,你可以看到所有的内核运行情况。

使用多数据库

问题是在多线程之间会有很多锁竞争。在上锁时,MR并不是那么无私的(它每1000次读操作就会产生一次锁定),而且MR任务还会执行许多写操作,导致线程最终都会在等待另一个线程。由于每个MongoDB数据库都有私有锁,让我们尝试为每一个线程使用一个不同的输出数据库:

> var mapred = function(min, max)  
return db.runCommand( mapreduce: "uniques", 
map: function ()  emit(this.dim0, 1); , 
reduce: function (key, values)  return Array.sum(values); , 
out:  replace: "mrout" + min, db: "mrdb" + min , 
sort: dim0: 1, 
query:  dim0:  $gte: min, $lt: max   ) 
> threads = []; for (var i = 0; i < numThreads; ++i)  var min = (i == 0) ? 0 : keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print("min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() 
min:0 max:274736
min:274736 max:524997
min:524997 max:775025
min:775025 max: "$maxKey" : 1 
connecting to: test
connecting to: test
connecting to: test
connecting to: test
> for (var i in threads)  var t = threads[i]; t.join(); printjson(t.returnData()); 
...
 
        "result" : 
                "db" : "mrdb274736",
                "collection" : "mrout274736"
        ,
        "timeMillis" : 105821,
        "counts" : 
                "input" : 2500013,
                "emit" : 2500013,
                "reduce" : 250364,
                "output" : 250255
        ,
        "ok" : 1

...

这才像话!我们现在降到了100秒,这意味着相比一个线程而言已经提升了2倍。还算差强人意吧。现在我们只有4个核所以只快了2倍,要是在8核CPU上将会快4倍,以此类推。

使用纯JavaScript模式

当把输入数据拆分到不同线程上去的时候,发生了一些有趣的事情:每个线程现在有大约250000个不同的值来输出,而不是1百万。这意味着我们可以使用“纯JS模式”,它可以通过使用jsMode:true来开启。开启后,MongoDB在处理时将不会把对象在JS和BSON之间来回翻译,相反,它使用一个限额500000个key的内部JS字典来化简所有对象。让我们看看这是否有用:

> var mapred = function(min, max)  
return db.runCommand( mapreduce: "uniques", 
map: function ()  emit(this.dim0, 1); , 
reduce: function (key, values)  return Array.sum(values); , 
out:  replace: "mrout" + min, db: "mrdb" + min , 
sort: dim0: 1, 
query:  dim0:  $gte: min, $lt: max  , 
jsMode: true ) 
> threads = []; for (var i = 0; i < numThreads; ++i)  var min = (i == 0) ? 0 : keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print("min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() 
min:0 max:274736
min:274736 max:524997
min:524997 max:775025
min:775025 max: "$maxKey" : 1 
connecting to: test
connecting to: test
connecting to: test
connecting to: test
> for (var i in threads)  var t = threads[i]; t.join(); printjson(t.returnData()); 
...
 
        "result" : 
                "db" : "mrdb274736",
                "collection" : "mrout274736"
        ,
        "timeMillis" : 70507,
        "counts" : 
                "input" : 2500013,
                "emit" : 2500013,
                "reduce" : 250156,
                "output" : 250255
        ,
        "ok" : 1

...

现在我们降到了70秒,就搞定了任务!jsMode真心有用,尤其是当对象有很多字段的时候。这里只有一个数字字段就已经下降了30%。

MongoDB在2.6版本上的改进

在很早的2.6版本中,在任何的js函数调用的时候,我们就通过一段代码设置一个可选参数”args“。这种做法并不标准,不在使用。但是它确有留下来的原因(查看 SERVER-4654)。让我们从Git资源库中导入MongoDB,编译并运行进行测试:

...
 
        "result" : 
                "db" : "mrdb274736",
                "collection" : "mrout274736"
        ,
        "timeMillis" : 62785,
        "counts" : 
                "input" : 2500013,
                "emit" : 2500013,
                "reduce" : 250156,
                "output" : 250255
        ,
        "ok" : 1

...

这是明显的提高了3倍的运行速度,时间降低到了60s,大约10-15%。这种变化也提高了整体JS引擎的堆消耗。

结语

回顾一下,对于同一个MR作业,我们开始时花费1200秒,最后花费60秒,提升了20倍!这项提高应该对大部分应用都有效,即使有些trick不太理想(例如,使用多种输出dbs/collections)。至少这能提供给人们思路,如何加速他们的MR作业,希望这些特征在将来会更加易于使用。接下来的将使得‘splitVector’命令更加有用,这张将在同一个数据库中提升多MR作业。干杯!


推广链接
精品视频课程推荐

Java数据结构和算法精讲版
本课程专注于数据结构和算法的内容,使用Java来进行代码示例,不空洞的讲解概念和理论,重点放在代码的实现和示例上。 从零开始、全面系统、成体系的讲解数据结构和基本算法,循序渐进的讲述构建软件系统所常见的数据结构和算法。

Hadoop实战-初级部分视频教程
Hadoop初级精品课程,帮助学员快速掌握Hadoop入门到上手开发,并掌握一定的开发技巧。通过Hadoop初级课程,学员可以掌握基本的Hadoop 原理,Hadoop环境搭建,Hadoop Shell,Hadoop HDFS基本操作和编程,Hadoop Mapreduce编程。

深入浅出学Shrio视频教程
内容概述:Shiro是目前最热门、最易用、功能超强大的Java权限管理框架,强烈推荐,每个项目都必备的权限管理技术!通过本课程,你将从零开始直到彻底掌握Shiro的相关开发知识,达到可以进行实际项目开发的能力。包括:权限管理基础、Shiro入门、配置、身份认证、授权、Realms、Session管理、和Spring的集成、Web、Cache等众多开发细节技术 技术要点:源码级分析Shiro的授权过程、自定义开发Realm、多个Realms的开发配置、自定义开发AuthenticationStrategy、自定义开发自定义SessionDAO、和Struts2+Spring3的集成(包括修正struts2的bug)、Shiro和SpringMVC+Spring3的集成、包装使用其他的Cache框架、缓存数据同步更新的解决方案等等实际开发中常用的内容

Hadoop实战-中高级部分视频教程
Hadoop中高级精品课程,帮助学员快速掌握Hadoop HDFS的原理;MapReduce的原理;MapReduce高级编程;Hadoop的IO机制,如序列化、压缩;Hadoop的RPC,RESTFul API等高级特性;彻底理解Hadoop,成为一名合格的云计算开发者。并掌握一些云端基本的运维知识,从而实现开发和运维兼修的高级人才。

Ajax+JSON基础实战视频教程
数据校验、Javascript模拟多线程、下拉列表联动、操作XML、AJAX结合JSON的操作、Json-lib的使用

 
交流首页 » Java
前往:   

关于我们 | 联系我们 | 用户协议 | 私塾在线服务协议 | 版权声明 | 隐私保护

版权所有 Copyright(C)2009-2012 私塾在线学习网