ketama一致性哈希
哈希算法是一种从任何一种数据中创建小的数字“指纹”的方法,解决如何将数据映射到固定槽问题。而一致性哈希算法是为了解决当槽数目变化的时候如何将映射结果的变化降到最小。那么为什么要将变化降到最小呢?可以考虑分布式缓存的场景,一个key对应一台server,如果一台server挂掉或者需要扩展一台新server,那么传统的哈希就会使原先的映射关系全部失效,而一致性哈希就是为了将影响降至最低,从而缓存没必要全部失效。
一致性hash算法 - consistent hashing图文并茂的描述了一致性哈希算法的出发点及解决方案,主要是使传统的哈希算法同时满足单调性和平衡性。
- 单调性是指当新加槽位后,key还是映射到原先的槽或是新加的槽,这样就将影响范围缩小到一个槽位。
- 平衡性是指尽可能平衡的分布到所有槽位中去。
Kentama
kentama是一致性哈希的一种实现,主要思想如下:
- 将server和key同时映射到环形连续统(0~2^32)
- 为了将key->server,找到第一个比key的映射值大的server的映射值,则key就映射到这台server上,如果没找到,则映射至第一台server
- 为了平衡性,可以添加一层虚拟节点到物理节点的映射,将key首先映射到虚拟节点,然后再映射到物理节点
其中的策略就是将哈希空间固定并分段,段分割点就是新的映射值,将映射到段中间的所有key都映射到段分割点。这样段分割点如果失效,那么只影响映射到该段分割点的key,而不影响其他key,添加段分割点是同样的逻辑。
memcache的client就是通过一致性哈希进行server的选择的,下面介绍一下其源码的实现,代码摘自xmemcached
public class KetamaMemcachedSessionLocator extends
AbstractMemcachedSessionLocator {
// 每一个物理节点的虚拟节点副本个数
static final int NUM_REPS = 160;
// 有序的段分割点哈希值与物理节点的映射关系
private transient volatile TreeMap<Long, List<Session>> ketamaSessions = new TreeMap<Long, List<Session>>();
// 具体的哈希算法
private final HashAlgorithm hashAlg;
// 最大重试次数
private volatile int maxTries;
private final Random random = new Random();
/**
* compatible with nginx-upstream-consistent,patched by wolfg1969
*/
static final int DEFAULT_PORT = 11211;
private final boolean cwNginxUpstreamConsistent;
public KetamaMemcachedSessionLocator() {
this.hashAlg = HashAlgorithm.KETAMA_HASH;
this.cwNginxUpstreamConsistent = false;
}
public KetamaMemcachedSessionLocator(boolean cwNginxUpstreamConsistent) {
this.hashAlg = HashAlgorithm.KETAMA_HASH;
this.cwNginxUpstreamConsistent = cwNginxUpstreamConsistent;
}
public KetamaMemcachedSessionLocator(HashAlgorithm alg) {
this.hashAlg = alg;
this.cwNginxUpstreamConsistent = false;
}
public KetamaMemcachedSessionLocator(HashAlgorithm alg,
boolean cwNginxUpstreamConsistent) {
this.hashAlg = alg;
this.cwNginxUpstreamConsistent = cwNginxUpstreamConsistent;
}
public KetamaMemcachedSessionLocator(List<Session> list, HashAlgorithm alg) {
super();
this.hashAlg = alg;
this.cwNginxUpstreamConsistent = false;
this.buildMap(list, alg);
}
// 构建虚拟节点、物理节点的映射关系
private final void buildMap(Collection<Session> list, HashAlgorithm alg) {
TreeMap<Long, List<Session>> sessionMap = new TreeMap<Long, List<Session>>();
for (Session session : list) {
String sockStr = null;
if (this.cwNginxUpstreamConsistent) {
InetSocketAddress serverAddress = session
.getRemoteSocketAddress();
sockStr = serverAddress.getAddress().getHostAddress();
if (serverAddress.getPort() != DEFAULT_PORT) {
sockStr = sockStr + ":" + serverAddress.getPort();
}
} else {
if (session instanceof MemcachedTCPSession) {
// Always use the first time resolved address.
sockStr = ((MemcachedTCPSession) session)
.getInetSocketAddressWrapper()
.getRemoteAddressStr();
}
if (sockStr == null) {
sockStr = String.valueOf(session.getRemoteSocketAddress());
}
}
// 160个虚拟节点副本
int numReps = NUM_REPS;
if (session instanceof MemcachedTCPSession) {
numReps *= ((MemcachedSession) session).getWeight();
}
// 关键过程
if (alg == HashAlgorithm.KETAMA_HASH) {
for (int i = 0; i < numReps / 4; i++) {
// 计算MD5摘要
byte[] digest = HashAlgorithm.computeMd5(sockStr + "-" + i);
for (int h = 0; h < 4; h++) {
// 计算虚拟节点的环形hash值
long k = (long) (digest[3 + h * 4] & 0xFF) << 24
| (long) (digest[2 + h * 4] & 0xFF) << 16
| (long) (digest[1 + h * 4] & 0xFF) << 8
| digest[h * 4] & 0xFF;
// 建立物理节点与虚拟节点映射关系
this.getSessionList(sessionMap, k).add(session);
}
}
} else {
for (int i = 0; i < numReps; i++) {
long key = alg.hash(sockStr + "-" + i);
this.getSessionList(sessionMap, key).add(session);
}
}
}
this.ketamaSessions = sessionMap;
this.maxTries = list.size();
}
private List<Session> getSessionList(
TreeMap<Long, List<Session>> sessionMap, long k) {
List<Session> sessionList = sessionMap.get(k);
if (sessionList == null) {
sessionList = new ArrayList<Session>();
sessionMap.put(k, sessionList);
}
return sessionList;
}
// 通过key找到真正的物理节点
public final Session getSessionByKey(final String key) {
if (this.ketamaSessions == null || this.ketamaSessions.size() == 0) {
return null;
}
// 同样的hash计算方式
long hash = this.hashAlg.hash(key);
Session rv = this.getSessionByHash(hash);
int tries = 0;
while (!this.failureMode && (rv == null || rv.isClosed())
&& tries++ < this.maxTries) {
hash = this.nextHash(hash, key, tries);
rv = this.getSessionByHash(hash);
}
return rv;
}
public final Session getSessionByHash(final long hash) {
TreeMap<Long, List<Session>> sessionMap = this.ketamaSessions;
if (sessionMap.size() == 0) {
return null;
}
Long resultHash = hash;
// 如果key的值没找到,则顺序找到最大的虚拟节点值
if (!sessionMap.containsKey(hash)) {
// Java 1.6 adds a ceilingKey method, but xmemcached is compatible
// with jdk5,So use tailMap method to do this.
// 由于sessionMap是有序的treeMap,可以高效的找到对应的虚拟节点
SortedMap<Long, List<Session>> tailMap = sessionMap.tailMap(hash);
if (tailMap.isEmpty()) {
// 如果对应的物理节点为空,则选择第一个虚拟节点
resultHash = sessionMap.firstKey();
} else {
resultHash = tailMap.firstKey();
}
}
List<Session> sessionList = sessionMap.get(resultHash);
if (sessionList == null || sessionList.size() == 0) {
return null;
}
int size = sessionList.size();
return sessionList.get(this.random.nextInt(size));
}
public final long nextHash(long hashVal, String key, int tries) {
long tmpKey = this.hashAlg.hash(tries + key);
hashVal += (int) (tmpKey ^ tmpKey >>> 32);
hashVal &= 0xffffffffL; /* truncate to 32-bits */
return hashVal;
}
public final void updateSessions(final Collection<Session> list) {
this.buildMap(list, this.hashAlg);
}
}
- 上一篇 Hbase架构
- 下一篇 分布式消息系统的若干问题