Scaling Distributed Machine Learning with the Parameter Server
Mu Li, David G. Andersen, Jun Woo Park, ALexander J. Smola, Amr Ahmed, Vanja Josifovski, James Long, Eugene J. Sita, Bor-Yiing Su
1 介绍
实际数据都是TB和PB级别,复杂模型的参数可以到达,全局共享的参数如果在所以worker机器之间传递,频繁的IO会带来的问题:占用大量带宽、sequential的算法会受到barrier的影响、容错性要求高。
1.1 贡献
除了第2点,其余都已经在2013年NIPS workshop那篇paper里面提到了,可以转到我那篇的分析。
- Efficient communication:节点间的通信是异步的,同步(迭代的操作)也不会阻塞运算。
- Flexible consistency models:一致性模型,放宽的一致性更好的因此了同步的代价和延迟。允许算法设计者可以在convergence rate和system efficiency之间做一个trade-off。
- Elastic Scalability:新增加一个节点不用重启整个系统,用分布式的哈希表来使得节点可以动态加入。
- Fault Tolerance and Durability:宕机是不可避免的,调度器的强制回收会加剧这个现象。用优化的数据复制架构,将数据copy保存在不同的服务器节点来提供快速恢复。(小于1s)而且client是相互独立的,新的client可以在某一个宕机之后自动启动,和MapReduce重新安排mapper一样。
- Ease of use:全剧共享参数是以向量和矩阵的形式标记的,对ml应用比以键值对或者表格形式存储的数据结构好用。高性能的多线程操作,比如向量-矩阵乘法,都能用来进行很好的支持。
1.2 工程上的难度
(稍微有点疑惑)每一个parameter server node保存部分的parameter,而每一个worker node只需要这些parameters的subset。
几个主要问题:
Communication通信:首先数值一般比较小,其次用键值对传输数据开销比较大。改进的方法就是用vector和matrix,或者tensor。 Fault tolerance容错:有一张图展示了在不同系统上跑不同算法的数量级,parameter server性能最好,比其他系统以数量级的优势超过。
1.3 相关工作
大公司都有在做parameter server框架,Amazon,Baidu,Facebook,Google,Microsoft和Yahoo!开源的项目则有YahooLDA,Petuum(Eric Xing)。Graphlab支持的在一个最佳模型上进行parameter synchronization。
第一代parameter server:提出来memcached的分布式同步存储机制。缺少灵活性和性能。仅使用键值对存储作为同步机制。YahooLDA增加了一个专门的服务器来改进这个机制,是用户能自定义地更新操作。
第二代parameter server:第二代则特定针对了应用application specific parameter server。Pentuum是第一个将这个平台一般化的。Petuum用了bounded delay model改进YahooLDA,但在线程模型上有了更多的限制。
第三代parameter server:性能、灵活性、容错性都能够做到。
已经有了很多解决方案,比如Mahout(基于Hadoop)MLI(基于Spark)两个都采纳了迭代的MapReduce框架,也都使用了同步的迭代模式,使得两个方法在迭代的机器学习算法中都会产生不一直的性能分布。为了克服这个缺点,distributed GraphLab通过图抽象的方法用异步通信的调度。但是这个方法缺少MapReduce框架弹性的拓展性,并且依赖于粗粒度coarse-grained的快照来恢复。
2 机器学习
training data可以分作三个部分:feature extraction,(工业界有专门叫feature engineering的东西)objective function和learning。
2.1 Goal
许多机器学习的算法都能通过objective function来表示,SVM、logistic regression、linear regression等等(Deep Learning有没有我还不知道,看到的小伙伴有知道的不?)
下面主要通过两个机器学习技术来展示parameter server的高效性。
2.2 Risk Minimization
其实一个式子就能表示清楚:
期中是loss function,是参数,是regularizer。然后Algorithm 1详细描述了Task Scheduler、Worker和Server之间是如何合作的。
每一个worker用自己分配的参数求解gradient,然后传给server来更新。
2.3 Generative Model
机器学习的第二大类算法中,训练样例的标签是未知的,即unsupervised learning。比如topic modeling:给定一个文档集合,推断每一个文档包含的topic。
实际问题中,数据量会非常大:上亿的用户和上十亿的文档。因为数据量大,这些算法在第一代parameter server上经济性比较能接受。topic model的一个挑战是:描述当前文档生成的estimate必须要共享。
一个topic modeling的解决方案是LDA。尽管统计模型会不一样,但是算法的描述还是和Algorithm 1一样。主要区别是,跟新的步骤不是gradient computation,而是文档可以被当前的模型多大程度描述的估计。这个计算需要访问辅助的元数据,这个元数据是在文档更新的时候会写到硬盘上。
辅助性的元数据里面是topic集合,是这个文档的每个单词对于的topic集合。参数是单词出现的相对频率。然后和前面描述的一样,每一个worker只需要存储它处理的文档里面所包含的单词的参数。
3 架构
parameter server由一个server group和多个worker group组成。每一个server node都保存部分的全局共享参数,然后server node相互之间会通信,来复制和(或)转移参数,一个server manager node监察每个server node。
每一个worker group跑一个应用,一个worker存储部分的训练数据,只和server node交流。每一个worker group有一个scheduler,负责分配任务和监察进程。
parameter server支持独立的参数命名空间。这使得一个worker group可以将他们的共享参数和其他group独立开来。几个worker groups可能有一样的namespace,这就表示他们几个group一起出来一个应用,比如deep learning来增加并行性。另外有可能是一个模型是online services consuming,然后新的数据加进来之后,就在其他worker group上进行训练。
3.1 (key,value)向量
可以用(key,value)向量来表示一个模型的参数。比如loss minimization问题,这个pair就是(feature ID, weight);对LDA,就是word ID、topic ID和count的结合。每一个条目都可以通过key本地、远程地读写。
parameter server通过这个提升:机器学习通常将模型当作一个线性代数对象,通过将这些对象当作稀疏的,parameter server可以提供和(key,value)一样的抽象函数,但是允许重要的优化操作,比如矩阵和向量的加法、乘法、求2范数等。
3.2 Range Push and Pull
数据是通过push和pull来传递的。在Algorithm 1中,每一个worker都push gradient到server上,然后从server那里pull更新后的数据。更高级的算法描述在Algorithm 3中,模式几乎一样,除了每次只通信一个范围内的key。
w.push(R,dest)是将在key range 内,把数据push到dest的node或者group。w.pull(R,dest)也类似。但是为什么range-based的方法能提速没有介绍。猜测是类似于batch的概念。
3.3 User-Defined Functions on the Server
server节点可以执行用户自定义的函数,比如proximal gradient method。
3.4 Asynchronous Tasks and Dependency
任务是由RPC发出的,可以是worker想server发出的push或者pull,也可以是scheduler向任意节点发送的用户自定义函数。task由subtask组成,比如Algorithm 1中的WorkerIterate就是包含了一个push和一个pull。
task的异步执行:caller可以在发出任务之后立即进行计算。caller在收到callee的回复后一个任务标记为完成。一个回复可以是用户自定义的函数回复、由pull请求的键值对、或者一个空的确认。callee会在subtask完成之后,task的call返回,标记为完成。
callee会并行计算tasks。caller则在task之间加入execute-after-finished的依赖。paper中的Figure 5给了一个很好的例子。
3.5 Flexible Consistency
上面提到的依赖关系图可以用于放宽一致性要求。独立的任务通过并行(CPU,disk,network)来提高效率,但也会带来节点间的数据不一致性。还是图中的例子。图中的例子会使得两次迭代的结果一样,但是有的算法会对数据不一致性不那么敏感,比如每次迭代只使用one block of data,那么只有一部分收到了影响。
算法的效率和系统的性能依赖于许多因素,比如特征关系、硬件容量、数据中心负载等等。parameter server提供很高的灵活性,通过给不同的一致性模型建立不同的任务依赖图,这个依赖图是有tasks之间的依赖性定义的DAG。这么三个例子:
- sequential consistency:其实就是同步的任务,只有上一个完成才能进行下一个
- eventual consistency:所有任务相互独立,那么就可以同时开始
- bounded delay:介于sequential和eventual中间,设置了一个最大延迟时间,只有之前的任务在时间完成了,才能开始新的任务。section 4会介绍这个方法的应用。这个是可以动态改变的。
3.6 用户自定义过滤器实现的一致性模型
对优化问题,比如梯度下降,并不是每次计算的梯度对最优化都是有价值的,可以通过自定义的filter过滤掉一些不必要的传输,压缩带宽cost。
用户自定义的过滤器可以更加细粒度地控制一致性。一个filter可以改变和有选择性的同步键值对。几个可以合起来提升data compression的filter:
- significantly modified filter:只有改变超过了阈值,才push
- random skip filter:随机选择
- KKT filter:采用了optimality condition,只在非最优情况下发送
- key chaching filter
- compressing filter:因为大部分传送的数据都是0、small int、和float,可以进行无损压缩
4 实现
4.1 Vector Clock
给定一个任务,它的依赖图很复杂,并且需要快速恢复,每一个键值对与一个vector clock联系。vector clock记录了每一个每一个节点包含这个键值对的时间。他用了追踪聚合状态或者拒接重复发送数据比较方便。但一个比较直接的实现方式是用O(nm)的空间,n个节点,m个参数,当n和m特别大的时候,十分消耗空间和带宽。
幸运的是,因为用来range-based通信,许多参数都有一样的时间戳:如果一个节点push了某个range内的参数给其他节点,那么这些range内的数据时间戳是一样的,也就可以将他们压缩到一个vector clock。比如,就表示, ,下标i表示第i个节点。
最初每一个节点只有一个range vector clock,所有在这个key space的参数都以0作为初始timesstamp。每一个range set最多将range划分成三份,可以参考Algorithm 2。
4.2 Messages
节点会发送信息给其他节点或者群组。一份message包含一个链表的键值对和相对应的range vector block:
因为有很多0的参数是可以不发送的(尤其是sparse情况),加上可以发送哈希后的list,而不是list本身,以此来进行压缩。用了snappy压缩库来压缩message。
4.3 Consistent Hashing
parameter server将key划分的范数和传统的分布式哈希表一样:key和server ID都被插入到了哈希环内。一个物理节点在环内通常有好几个“虚拟”服务器,来提高负载均衡和回复。(为毛?)
server manager负责ring management,所以其他的节点在本地高速缓存key划分,这样就能相互之间知道server的状态以及数据传给谁。
4.4 Replication and Consistency
每一个server节点都会保持它顺时针方向的k个邻居的key range。每次server从worker那里拿到数据并进行更新的时候,(大部分是aggrega)不妨更新完之后再replication,从而减少带宽使用。
注:这里有一个master和slave概念,对于任意一个server,保存的属于自己的数据就是master,保存的k分邻居的数据,那对于那几个邻居来说它就是slave。
4.5 Server Management
为了增加容错性,要支持节点的增加和删除功能。
从其他节点S将某一个范围内的数据取出需要两个步骤:首先S将所有在这个范围的键值对和对应的vector clock都复制;然后第二阶段,S再也不接受范围内的message,然后同时,将第一步发生期间的更新发送给这个新的节点。(有点类似于分布式数据库更新的base和delta表)
节点离开也类似,server manager将这个离开节点的数据复制到另外一个节点上。有意思的是,当前还没有将整个管理部分和YARN或者Mesos类似的资源管理器结合,如果结合起来很期待。这里应该是更加大量的工程性问题。
4.6 Worker Management
增加一个worker节点类似
- task scheduler给新的worker分配一个范围的数据
- 然后这个节点从文件系统或者其他worker那里读取数据(training data),并从server那里pull共享参数
- task scheduler会将新加的worker广播给其他worker,使他们释放一些数据
当一个worker离开的时候,task scheduler可能会用replacement。首先training instances特别多的时候,恢复一个worker比server复杂多,而且一个worker的一点数据对于最终机器学习的结果没有影响。因此算法设计者可能有时会倾向于继续而不管这个离开的worker。
5 Evaluation
Appendix
这一篇既然是发在OSDI上的,和NIPS相比,更偏向于系统和框架。