Hong's Notes

High Performance, High Availability and Scalable Architecture, ML, AI and Big Data

Yarn简述

Hadoop2.0引入了yarn(Yet Another Resource Negotiator)资源管理框架。1.0中的MapReduce计算框架变为运行在yarn上的一种application。

Yarn依然采用了master/slave结构,master是ResourceManager,负责整个集群的资源管理和调度,并且支持HA,slave是NodeManager,负责管理各子节点上的资源和任务。每个MapReduce作业提交给ResourceManager并被接受后,ResourceManager会通知某个NodeManager启动一个ApplicationMaster管理此作业的生命周期。

ResourceManager中的模块划分

Yarn中的大多数服务都是带状态的service实现,并通过事件驱动机制实现服务的状态转换和服务之间的交互。ResourceManager是yarn的核心组件,与NodeManager、ApplicationMaster、Client都有交互,提供了非常多的功能,下面基于hadoop2.7版本的实现,梳理一下ResourceManager中的重要service组件及其功能。

ResourceManager中按功能划分的service模块如下图所示。

ResourceManager中核心模块主要包括客户端交互模块、NodeManager管理模块、ApplicationMaster管理模块、Application管理模块、安全管理模块、以及资源管理模块(调度、预留)等。

各模块中的服务介绍

客户端交互模块:

  • AdminService

    • 管理员可通过此接口管理集群,如更新节点、更新ACL、更新队列等。内部有个EmbeddedElectorService,如果RM启用了自动HA,则通过这个service做leader election。
  • ClientRMService

    • 负责为客户端提供服务,是ApplicationClientProtocol协议的服务端。负责处理来自客户端的RPC请求,包括提交app、查询app运行状态、终止app等。
  • Webapp

    • 提供web页面服务,展示集群状态和资源使用情况。

NodeManager管理模块

  • NMLivelinessMonitor

    • 用于监控NM是否存活,若NM在一定时间内(默认10分钟)未上报心跳,则认为其挂了。
  • NodesListManager

    • 负责维护节点列表,并动态加载白名单(yarn.resourcemanager.nodes.include-path)和黑名单(yarn.resourcemanager.nodes.exlude-path)节点。
  • RMNodeLabelsManager

    • 负责节点的标签管理。
  • ResourceTrackerService

    • 负责与NodeManager通信,处理来自NodeManager的请求,包括注册NodeManager和节点心跳两种。接口定义在ResourceTracker中。

ApplicationMaster管理模块

  • AMLivelinessMonitor:两个实例

    • 用于监控ApplicationMaster是否正常,如果在指定时间内(默认10分钟)未收到AM的心跳,则认为其死掉了。
  • ApplicationMasterLauncher

    • 负责通知某个NodeManager启动或销毁ApplicationMaster。在app请求被接受后,与某个NodeManager通信,告知其为此app启动相应的ApplicationMaster。若app运行结束或被kill,则通知app所在NodeManager销毁ApplicationMaster。其内部也维护了一个阻塞队列,并有一个后台线程异步处理提交进来的启动ApplicationMaster的请求。
  • ApplicationMasterService

    • 负责与ApplicationMaster通信,是ApplicationMasterProtocol协议的服务端,ApplicationMaster在NodeManager上启动后通过此协议向ResourceManager注册自己,运行过程中向ResourceManager发送心跳,以及app运行结束后告知RM自己所在的container可以被释放了。

Application管理模块

  • RMAppManager

    • ResourceManager接受客户端提交的app后,会通过RMAppManager来触发启动app的事件RMAppEventType.START,具体启动app的工作由RMAppImpl实现。
  • ApplicationACLsManager

    • 负责app权限控制,包括查看和修改权限。
  • ContainerAllocationExpirer

    • 用于监听NodeManager上是否正常启动了分配给ApplicationMaster的container,若在指定时间未启动(默认10分钟),ResourceManager会强制回收该container。
  • RMApplicationHistoryWriter

    • 负责异步持久化Application运行中的相关日志,主要是Container、Application、ApplicationAttempt在启动和结束时的日志信息。

安全管理模块

  • RMSecretManagerService
    • 负责管理各种通信密钥,包括:
      • RM与NM通信的NMTokenSecretManagerInRM
      • RM与container通信的RMContainerTokenSecretManager
      • 客户端与AM通信的ClientToAMTokenSecretManagerInRM
      • AM与RM通信的AMRMTokenSecretManager
      • DelegationTokenRenewer
      • 启用了安全时,负责定时更新认证token。

资源管理模块

  • ResourceScheduler

    • 资源调度器,可通过yarn.resourcemanager.scheduler.class指定,ResourceManager默认使用的是org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler。
  • SchedulerEventDispatcher

    • 用于处理SchedulerEventType类型的事件,其内部维护了一个存储SchedulerEvent的阻塞队列,并由一个后台线程从队列中取出资源请求事件,再调用ResourceScheduler进行处理。
  • ReservationSystem

    • 资源预留系统,对应的实现有CapacityReservationSystem和FairReservationSystem。

此外,SystemMetricsPublisher负责发布RM的系统统计信息。AsyncDispatcher是中央事件处理分发器,ResourceManager启动时,通过它绑定了几种类型的事件的处理器,包括SchedulerEventType、RMAppEventType、ApplicationAttempt、RMAppAttemptEventType、RMNodeEventType、RMAppManagerEventType、AMLaunchEventType等。

上述各service在ResourceManager中的启动顺序为:

  1. AsyncDispatcher

  2. AdminService

  3. RMActiveServices:是个CompositeService(即service列表,ResourceManager本身就是一个CompositeService),用于管理ResourceManager中的“活动”服务(必须在active的ResourceManager上启动的服务,启用HA时,备份ResourceManager上不启动这些服务),包括以下(按启动顺序):

    • RMSecretManagerService

    • ContainerAllocationExpirer

    • AMLivelinessMonitor

    • RMNodeLabelsManager

    • RMApplicationHistoryWriter

    • SystemMetricsPublisher

    • NodesListManager

    • ResourceScheduler

    • SchedulerEventDispatcher

    • NMLivelinessMonitor

    • ResourceTrackerService

    • ApplicationMasterService

    • ClientRMService

    • ApplicationMasterLauncher

    • DelegationTokenRenewer

问题现象

前段时间,由于线上redis服务器的内存使用率达到了机器总内存的50%以上,导致内存数据的dump持久化一直失败。扩展到多台redis后,应用系统访问redis时,在业务量较少时,时不时会出现以下异常,当业务量较大,redis访问频率很高时,却不会发生这个异常,一时觉得很诡异。

redis.clients.jedis.exceptions.JedisConnectionException: It seems like server has closed the connection.
at redis.clients.util.RedisInputStream.readLine(RedisInputStream.java:90) ~[jedis-2.1.0.jar:na]
at redis.clients.jedis.Protocol.processInteger(Protocol.java:110) ~[jedis-2.1.0.jar:na]
at redis.clients.jedis.Protocol.process(Protocol.java:70) ~[jedis-2.1.0.jar:na]
at redis.clients.jedis.Protocol.read(Protocol.java:131) ~[jedis-2.1.0.jar:na]
at redis.clients.jedis.Connection.getIntegerReply(Connection.java:188) ~[jedis-2.1.0.jar:na]
at redis.clients.jedis.Jedis.sismember(Jedis.java:1266) ~[jedis-2.1.0.jar:na]

看提示,应该是服务端主动关闭了连接。查看了新上线的redis服务器的配置,有这么一项:

# Close the connection after a client is idle for N seconds (0 to disable)
timeout 120

这项配置指的是客户端连接空闲超过多少秒后,服务端主动关闭连接,默认值0表示服务端永远不主动关闭。而op人员把服务器端的超时时间设置为了120秒。

这就解释了发生这个异常的原因。客户端使用了一个连接池管理访问redis的所有连接,这些连接是长连接,当业务量较小时,客户端部分连接使用率较低,当两次使用之间的间隔超过120秒时,redis服务端就主动关闭了这个连接,而等客户端下次再使用这个连接对象时,发现服务端已经关闭了连接,进而报错。

于是,再查看访问redis的系统(客户端)的配置:

客户端使用的是jedis内置的连接池,看其源码本质上是基于apache commons-pool实现的,其中有一个eviction线程,用于回收idle对象,对于redis连接池来说,也就是回收空闲连接。

JedisPoolConfig类继承自GenericObjectPoolConfig并覆盖了几项关于eviction线程的配置,具体如下:

_timeBetweenEvictionRunsMillis:eviction线程的运行周期。默认是-1,表示不启动eviction线程。这里设置为30秒。

_minEvictableIdleTimeMillis:对象处于idle状态的最长时间,默认是30分钟,这里设置为60秒。

通过客户端的默认配置看,对象的最大空闲时长是小于服务端的配置的,应该不是配置上的问题了。

于是,继续看是不是客户端代码使用上的问题。追踪到客户端代码如下:

可见,客户端首先尝试从本线程的ThreadLocal对象中获取jedis对象,若获取不到,再从masterJedisPool中取得jedis对象并放入ThreadLocal对象以便下次使用,并且jedis对象使用完毕后,没有从ThreadLocal中清除,也没有returnResource给masterJedisPool。

因此,问题产生的原因就在于此。ThreadLocal中的这个jedis对象被取出后没有return,对于对象池来说是处于非idle状态,因此不会被对象池evict。当业务量大时,这个jedis会被频繁使用,服务端认为这个jedis对应的连接是非空闲的,或者空闲时间达不到120秒,不会主动关闭,所以没什么问题。然而当业务量小时,这个jedis使用频率很低,当两次之间的使用间隔超出120秒时,服务端会主动把这个jedis的连接关闭,第二次调用时,就会出现上面的报错。

从代码开发者的角度来说,这么做的目的是避免频繁从pool中获取jedis对象和return jedis对象以提高性能。

解决方案有两个:

  1. 在redis-cli下在线修改redis 的配置,把timeout改回为0,无需重启redis即可直接生效,但redis若重启,配置会恢复。

  2. 修改客户端代码,使用完jedis对象后,从ThreadLocal中清除,再返回给连接池。

出于改动成本考虑,先采用了第一种方案,在线修改redis配置后,报错不再出现。

组内的一个系统新上线了通过图片url上传图片到图片存储平台的功能。其中使用了httpclient,通过向图片存储平台发送MultipartPostMethod上传图片。当业务量较大时,10个处理线程满负荷运行,上传图片时,发现应用系统服务器的tcp连接数陡然升高,峰值能达到几万个tcp连接数!

排查系统代码并结合分析httpclient的源码发现,应用系统每次上传图片时,都会做new HttpClient()操作,这个操作内部默认使用的是SimpleHttpConnectionManager来管理http连接,而SimpleHttpConnectionManager有个默认字段alwaysClose=false,表示当外部程序调用了HttpMethod.releaseConnection()时并不会立即释放连接,而是保持这个连接并尝试用于后续的请求,在连接空闲一段时间后(默认3秒)才真正释放。

因此,当业务量较大,系统高并发发送post请求时,new出来的HttpClient对象会很多,而这个对象使用完毕后,而当中建立的client对象在短时间内并不会立即释放连接,因此,随着时间的积累,tcp连接数保持居高不下。

通过查看官方文档,建议在高并发环境下使用MultiThreadedHttpConnectionManager来管理httpclient,因此,我们将httpclient改为单例后,tcp连接数回复正常水平。

通过管理httpclient的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private static HttpClient initHttpClient()
{
HttpConnectionManagerParams params = new HttpConnectionManagerParams();
//指定向每个host发起的最大连接数,默认是2,太少了
params.setDefaultMaxConnectionsPerHost(1000);
//指定总共发起的最大连接数,默认是20,太少了
params.setMaxTotalConnections(5000);
//连接超时时间-10s
params.setConnectionTimeout(60*1000);
//读取数据超时时间-60s
params.setSoTimeout(60*1000);

MultiThreadedHttpConnectionManager manager = new MultiThreadedHttpConnectionManager();
manager.setParams(params);
return new HttpClient(manager);
}

背景

在分布式系统中,经常遇到这样一种场景:选举一个节点执行某一个任务,当此节点宕机后,其他节点可以接管并继续执行这个任务。由于各个节点运行的代码是一样的,彼此之间也是平等的,各个节点如何可以知道自己是否可以执行这个任务呢?当有节点宕机时,又如何判断自己是否可以接管任务呢?在我们的分布式任务调度系统中,需要选取调度器集群中的一个节点进行轮询任务状态,这里使用了zookeeper来实现一个统一的分布式锁,从而选出轮询节点。

原理

如图所示,每台服务器启动后,都在同一目录下建一个临时顺序节点(EPHEMERAL_SEQUENTIAL),并获取此目录下的所有节点信息,如果自己的序号是最小的,就认为获取到了锁,可以执行任务。若自己的节点不是最小的,就认为自己没有获取到锁,不执行任务,同时,在比自己小1个序号的节点上增加监听。当比自己小1个序号的节点发生变化的时候,再次检查自己是否是最小序号的节点,如果是则获取锁,否则继续监听比自己小1个序号的节点。

实现

以下是一个demo实现程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
public class DistributedExclusiveLock implements Watcher
{
private ZooKeeper zk;
private String lockDir = "/testlock";//锁节点所在zk的目录
private String lockSymbol = "_lock_";//锁节点标志
private String lockName;//锁节点前缀,构造锁时由外部传入
private String waitNodePath;//等待的前一个锁的节点名称
private String myNodePath;//当前锁
private CountDownLatch latch;//计数器
private String threadId;

/**
* 创建分布式锁
* @param lockName 竞争资源标志,lockName中不能包含单词lock
* @throws Exception
*/
public DistributedExclusiveLock(String zkServers, String lockName) throws Exception
{
//简单校验lockDir路径
if (!lockDir.startsWith("/"))
throw new Exception("LockDir Path must start with / character! lockDir=" + lockDir);
if (lockDir.endsWith("/"))
throw new Exception("LockDir Path must not end with / character! lockDir=" + lockDir);

this.lockName = lockName;
this.threadId = getThreadId();
// 创建一个与服务器的连接
try
{
zk = new ZooKeeper(zkServers, 3000, this);
createLockDirIfNecessary(lockDir);
} catch (Exception e) {
throw new Exception("Error while initializing DistributedExclusiveLock!" + e.getMessage(), e);
}
}

private String getThreadId()
{
return "Thread-" + Thread.currentThread().getId();
}

/**
* 在zk上建立lock目录,如果目录不存在,逐级创建节点
*/
private synchronized void createLockDirIfNecessary(String zkDir) throws KeeperException, InterruptedException
{
//zkDir是一级节点,如/cloudscheduler
if (zkDir.indexOf("/") == zkDir.lastIndexOf("/"))
{
Stat stat = zk.exists(zkDir, false);
if(stat == null){
// 创建一级节点
zk.create(zkDir, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
else //zkDir非一级节点
{
String parentDir = zkDir.substring(0, zkDir.lastIndexOf("/"));
if (zk.exists(parentDir, false) != null)
{ //如果父节点存在,建当前节点
Stat stat = zk.exists(zkDir, false);
if(stat == null){
// 创建非一级节点
zk.create(zkDir, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
else
{ //否则,先建父节点,再建当前节点
createLockDirIfNecessary(parentDir);
createLockDirIfNecessary(zkDir);
}
}
}

/**
* zookeeper节点的监视器
*/
@Override
public void process(WatchedEvent event)
{
if (event.getType() == EventType.NodeDeleted)
{
if (this.latch!=null)
this.latch.countDown();
try
{
List<String> childrenNodes = zk.getChildren(lockDir, false);
// 排序
Collections.sort(childrenNodes);
System.out.println("Node: " + event.getPath()
+ " change event is deleted! Current locked nodes:\n\t"
+ StringUtils.join(childrenNodes,"\n\t"));
}
catch (KeeperException e)
{
e.printStackTrace();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
}

public boolean tryLock()
{
try
{
if(tryLockInner())
return true;
else
return waitForLockInner(waitNodePath);
}
catch (Exception e)
{
e.printStackTrace();
return false;
}
}

private boolean tryLockInner() throws Exception
{
try
{
if(lockName.contains(lockSymbol))
throw new Exception("lockName can not contains " + lockSymbol);
//创建临时子节点
myNodePath = zk.create(lockDir + "/" + lockName + lockSymbol, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(threadId + " created " + myNodePath);
//取出所有子节点
List<String> subNodes = zk.getChildren(lockDir, false);
//取出所有lockName的锁
List<String> lockedNodes = new ArrayList<String>();
for (String node : subNodes) {
String nodePrefix = node.split(lockSymbol)[0];
if(nodePrefix.equals(lockName)){//对锁名做个判断,前缀相同即为同一组锁
lockedNodes.add(node);
}
}
Collections.sort(lockedNodes);
System.out.println("Current locked nodes: \n\t" + StringUtils.join(lockedNodes, "\n\t"));
if(myNodePath.equals(lockDir + "/" + lockedNodes.get(0))){
//如果是最小的节点,则表示取得锁
return true;
}
//如果不是最小的节点,找到比自己小1的节点,在List中的位置是自己的前一位
String myZnodeName = myNodePath.substring(myNodePath.lastIndexOf("/") + 1);
waitNodePath = lockDir + "/" + lockedNodes.get(lockedNodes.indexOf(myZnodeName)-1);
}
catch (KeeperException e)
{
e.printStackTrace();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
return false;
}

private boolean waitForLockInner(String waitPath) throws InterruptedException, KeeperException {
Stat stat = zk.exists(waitPath, true);
//判断比自己小一个数的节点是否存在,如果存在则需等待锁,同时注册监听
if (stat != null)
{
System.out.println(threadId + " waiting for " + waitPath);
this.latch = new CountDownLatch(1);
this.latch.await(); //不加超时时间,无限等待
//
//waiting
//Zzzzz...
//still waiting
//
// 探测到节点变化,刷新节点信息
this.latch = null;
try
{
// 确认myNodePath是否真的是列表中的最小节点
List<String> childrenNodes = zk.getChildren(lockDir, false);
// 排序
Collections.sort(childrenNodes);
if(myNodePath.equals(lockDir + "/" + childrenNodes.get(0)))
return true;
else
{
// 说明waitNodePath是由于出现异常而挂掉的 , 更新waitNodePath
String thisNodeName = myNodePath.substring(myNodePath.lastIndexOf("/") + 1);
int index = childrenNodes.indexOf(thisNodeName);
waitNodePath = lockDir + "/" + childrenNodes.get(index - 1);
//重新等待锁
return waitForLockInner(waitNodePath);
}
}
catch (KeeperException e)
{
e.printStackTrace();
return false;
}
catch (InterruptedException e)
{
e.printStackTrace();
return false;
}
}
return true;
}

public void unlock() throws Exception
{
try
{
System.out.println(threadId + " unlock " + myNodePath);
zk.delete(myNodePath,-1);
myNodePath = null;
zk.close();
}
catch (InterruptedException e)
{
throw new Exception("Error while releasing lock! " + e.getMessage(), e);
}
catch (KeeperException e)
{
throw new Exception("Error while releasing lock! " + e.getMessage(), e);
}
}

public static void main(String[] args) throws Exception
{
//一个简单的测试
List<Thread> workers = new ArrayList<Thread>(10);
for (int i=1; i<10; ++i)
{
Thread thread = new Thread(new Runnable()
{
String zk = "10.12.10.169:2181,10.12.139.141:2181";
@Override
public void run()
{
try
{
DistributedExclusiveLock lock = new DistributedExclusiveLock(zk, "zkLock");
if (lock.tryLock());
{
String tid = "Thread-" + Thread.currentThread().getId();
int time = new Random().nextInt(5000);
System.out.println(tid + " gets lock and is working, sleep for " + time + " ms");
Thread.sleep(time);
lock.unlock();
System.out.println(tid + " releases lock");
}
}
catch (Exception e)
{
e.printStackTrace();
}
}
});
thread.setDaemon(true);
workers.add(thread);
}

for (Thread t : workers)
{
t.start();
}
Thread.sleep(100000);
}
}

背景

在hadoop中,主从节点之间保持着心跳通信,用于传输节点状态信息、任务调度信息以及节点动作信息等等。 hdfs的namenode与datanode,mapreduce的jobtracker与tasktracker,hbase的hmaster与 regionserver之间的通信,都是基于hadoop RPC。Hadoop RPC是hadoop里非常基础的通信框架。hadoop 2.0以前hadoop RPC的数据序列化是通过实现自己定义的Writable接口实现,而从hadoop 2.0开始,数据的序列化工作交给了ProtocolBuffer去做。关于Hadoop RPC的实现原理已经有很多文章进行了详细的介绍(源码级强力分析hadoop的RPC机制Hadoop基于Protocol Buffer的RPC实现代码分析-Server端带有HA功能的Hadoop Client端RPC实现原理与代码分析),这里就不在赘述了。下面就直接引入问题和方案吧。

问题

工作中经常需要在定时任务系统上写一些定时任务,随着业务规模的增长和扩大,需要定时处理的任务越来越多,任务之间的执行间隔越来越小,某一时间段内(比如0点、整点或半点)执行的任务会越来越密集,只在一台机器上执行这些任务的话,会出现较大的风险:

  • 任务并发度较高时,单机的系统资源将成为瓶颈
  • 如果一个任务的运行占用了整个机器的大部分资源,比如sql查询耗费巨大内存和CPU资源,将直接影响其他任务的运行
  • 任务失败后,如果仍然在同一台节点自动重新执行,失败率较高
  • 机器宕机后,必须第一时间重启机器或重新部署定时任务系统,所有任务都不能按时执行
  • 等等

方案

可想而知的是,可以通过将定时任务系统进行分布式改造,使用多个节点执行任务,将任务分发到不同节点上进行处理,并且完善失败重试机制,从而提高系统稳定性,实现任务系统的高可靠。
既然是在多个节点之间分发任务,肯定得有个任务的管理者(主节点),在我们现有的系统中,也就是一套可以部署定时任务的web系统,任务代码更新后,部署好这套web系统,即可通过web页面设置定时任务并且进行调度(在单个节点上执行)。执行任务的节点(子节点)有多个以后,如何分发任务到子节点呢,我们可以把任务的信息封装成一个bean,通过RPC发布给子节点,子节点通过这个任务bean获得任务信息,并在指定的时刻执行任务。同时,子节点可以通过与主节点的心跳通信将节点状态和执行任务的情况告诉主节点。
这样其实就与hadoop mapreduce分发任务有点相似了,呵呵,这里主节点与子节点之间的通信,我们就可以通过Hadoop RPC框架来实现了,不同的是,我们分发的任务是定时任务,发布任务时需要将任务的定时信息一并发给子节点。

实现

单点的定时任务系统是基于Quartz的,在分布式环境下,可以继续基于Quartz进行改造,任务的定时信息可以通过Quartz中的JobDetail和Trigger对象来描述并封装,加上任务执行的入口类信息,再通过RPC由主节点发给子节点。子节点收到封装好的任务信息对象后,再构造JobDetail和Trigger,设置好启动时间后,通过入口类启动任务。下面是一个简单的demo。

Read more »

背景

Zookeeper是hadoop的子项目,是google的chubby的开源实现,是一个针对大规模分布式系统的可靠的分布式协调系统。Zookeeper一般部署在一个集群上,通过在集群间维护一个数据树,使得连接到集群的client能够获得统一的数据信息,比如系统公共配置信息、节点存活状态等等。因此,在互联网公司中,zookeeper被广泛运用于统一配置管理、名字服务、分布式同步等。

问题

我们看下这样一种场景:
前台系统每时每刻都生成大量数据,这些原生数据由后台系统处理完毕后再作他用,我们暂且不谈这些数据的存储形式,只关注如何能够尽可能高效的处理。举个例子,前台系统可能是微博的前端发布系统、搜索引擎上的广告投放系统,或者是任务发布系统,后台系统则可能是对微博和广告信息的审查系统,比如用户发的微博如果包含近期敏感信息则不予显示,若是任务,后台系统则负责处理任务具体的执行。
若数据量和任务量较小,单节点的后台系统或许可以处理得过来,但是如果数据量和任务量很大(比如新浪微博,龙年正月初一0点0分0秒,共有32312条微博同时发布),单节点的后台系统肯定吃不消,这时候,可想而知的是多节点同时处理前台过来的数据。
最简单的方法是,按消息id对后台节点数取模(msgid%server_num=mod),每个后台节点取自己那份数据进行处理,这就需要每个节点都知晓当前有多少个后台节点以及本节点所应取的mod数。但是,当某个节点宕机时,这个节点所应处理的数据无法被继续处理了,势必会造成阻塞,除非重新配置各节点上的参数,将节点数server_num减1,并修改各节点取数据的mod数。
毋庸置疑,这样非常麻烦!如果能够将这种配置信息(实际上是数据在节点间分配的控制信息)统一管理起来,在配置信息发生变化时,各个后台节点能够及时知晓其变化,就可以避免上述情况的发生。
因此,采用多节点处理数据时,有两个问题:
1.避免多个节点重复处理同一条数据,否则造成资源浪费。
2.不能有数据被遗漏处理,尤其是在有后台节点down掉的时候。
也就是说,采用多节点同时处理数据时,需要将数据隔离开,分别给不同的节点处理,而且在有节点宕机的情况下,所有数据也必须可以无误的被其他可用节点处理。如何做到这一点呢,使用zookeeper吧!

Read more »
0%