# Leaf:简述

说明,本文基于谢照东的《Leaf:美团点评分布式 ID 生成系统》,之所以有这样文章,是因为笔者发现谢照东的这篇文章和美团开源的 leaf(GitHub 地址:https://github.com/Meituan-Dianping/Leaf (opens new window))是有一些非常重要的出入的,尤其在涉及时钟回拨等问题。所以,笔者根据美团开源的 leaf 源码,写下了这篇文章。

为什么叫 leaf?因为天底下没有两片完全一样的树叶(德国哲学家、数学家莱布尼茨:There are no two identical leaves in the world),意味着每次通过 leaf 获取的 ID 肯定是唯一的。

首先,简单介绍一下如何使用 leaf。

  • 配置

leaf 是基于 springboot、以 HTTP 协议的方式提供获取分布式唯一 ID 的服务。

总计有两种模式:SnowflakeSegment

我们通过它的核心配置文件 leaf.properties 可知:

leaf.name=afei
# segment模式开关,这种模式依赖数据库
leaf.segment.enable=true
leaf.jdbc.url=jdbc:mysql://localhost:3306/leaf
leaf.jdbc.username=afei
leaf.jdbc.password=afei

# snowflake模式,这种模式依赖zookeeper
leaf.snowflake.enable=true
leaf.snowflake.zk.address=127.0.0.1:2181
leaf.snowflake.port=8686
  • DML

首先创建表 leaf_alloc(DDL 语句在 GitHub 首页可以找到):

CREATE DATABASE leaf
CREATE TABLE `leaf_alloc` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `biz_tag` varchar(128)  NOT NULL DEFAULT '',
  `max_id` bigint(20) NOT NULL DEFAULT '1',
  `step` int(11) NOT NULL,
  `description` varchar(256)  DEFAULT NULL,
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY (`biz_tag`)
) ENGINE=InnoDB;

然后需要插入几条初始化数据,假设支付服务和账户服务需要调用 leaf 获取唯一 ID,那么初始化数据的 SQL 如下:

insert into leaf_alloc(biz_tag, max_id, step, description) values('pay', 1, 2000, 'leaf'),('account', 1, 2000, 'leaf');
  • 启动

由于 leaf 基于 springboot 开发,所以启动它非常简单,运行 com.sankuai.inf.leaf.server.LeafServerApplication 即可。

  • 访问

Snowflake 和 Segment 两种获取唯一 ID 的模式的访问方式如下:

# Segment模式获取唯一ID
http://localhost:8080/api/segment/get/pay
http://localhost:8080/api/segment/get/account
# Snowflake模式获取唯一ID
http://localhost:8080/api/snowflake/get/pay
http://localhost:8080/api/snowflake/get/account
  • 监控

leaf 提供了非常简单的监控页面,能够让用户看到 Segment 模式下有哪些服务,以及这些服务当前获取唯一 ID 的位置。这个监控页面事实上就是可视化本地内存中的数据,所以一定要在 leaf 启动后获取过至少一次唯一 ID,其对应的数据才能正常展示,否则都是 0。访问地址是 http://localhost:8080/cache (opens new window),访问结果如下:

leaf-cache监控

接下来分别对 leaf 提供的两种模式进行深入剖析。

# Segment 模式

前面已经提到,Segment 模式依赖数据库。认识 leaf 的 Segment 模式之前,我们假设在不考虑并发能力的情况下如何通过数据库获取唯一 ID:利用 MySQL 的自增主键,而且 MyBatis 可以获取每次 insert 的逐渐 ID。

这种方案的优点如下:

  • 非常简单,利用现有数据库系统的功能实现,成本小,有 DBA 专业维护。
  • ID 号单调自增,可以实现一些对 ID 有特殊要求的业务。

缺点如下:

  • 强依赖 DB,当 DB 异常时整个系统不可用,属于致命问题。配置主从复制可以尽可能的增加可用性,但是数据一致性在特殊情况下难以保证。主从切换时的不一致可能会导致重复发号。
  • ID 发号性能瓶颈限制在单台 MySQL 的读写性能。

Segment 有“”的意思,leaf 这种模式意味着并不是每次获取唯一 ID 都需要操作数据库。所以,Segment 模式就是在前面提到的数据库方案基础之上进行了优化:

既然每次操作数据库性能有问题,那么我就每过 N 次才操作一次数据库。在这 N 次以内的访问,都只需要通过操作本地缓存获取。如此一来,性能就很高了。这个 N 值表示的范围就是 Segment 的意思。

不过还是有点瑕疵,因为每过 N 次都要操作一次数据库。如果恰好在这个时候并发较高,那么数据库操作就会阻塞,甚至出现超时,从而形成性能毛刺甚至降低 SLA。

怎么办?

leaf 的做法是将这个步骤提前并异步化,leaf 的 Segment 模式用一张图表示如下:

 leaf segment

如图所示:

  1. 设置 step 为 1000(可通过 DML 配置);
  2. 当发号期已经分发到 100 的时候(即取了步长 step 的 10%),就会触发一个异步操作去初始化下一个号段;
  3. 当 1 ~ 1000 用尽并且分配到 1100 的时候,又会触发异步操作去初始化又 1 个号段(2001 ~ 3000),以此类推;

通过这种方案设计,每次获取唯一 ID 都只需要操作内存即可。至于对数据库的操作,完全异步完成。

一些疑问点

这种方案是如何避免重启后分配重复的 ID 呢?

我们假设已经分配到 1122,这个 1122 又没有持久化到任何一个地方,数据库中只保存了(max_id=2001, step=1000),如果这时候 leaf 宕机。leaf 的做法是在第一次获取唯一 ID 的时候,会首先更新数据库跳到下一个号段(max_id=3001, step=1000),那么这时候获取的唯一 ID 就是 2001,至于 1123 ~ 2000 之前的 ID 全部被抛弃,不会被分配了。

Segment 模式有时钟回拨问题吗?

很明显没有,因为通过这种模式获取的 ID 没有任何时间属性,所以不存在时钟回拨问题。

新增服务需要多久生效?

假设我们通过 SQL 语句插入一个订单服务,那么要过多久订单服务才能通过请求地址http://localhost:8080/api/segment/get/order (opens new window)向 leaf 获取唯一 ID 呢?

insert into afei.leaf_alloc values('order', 1, 1000, 'leaf', now());

答案是最多 60s,因为 leaf 有一个 schedule 任务,间隔 60s 刷新本地缓存中的服务信息,假设刷新前只有[pay,account]两个服务,那么刷新后就有[pay,account,order]三个服务。核心源码如下:

private void updateCacheFromDbAtEveryMinute() {
    ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("check-idCache-thread");
            t.setDaemon(true);
            return t;
        }
    });
    service.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            updateCacheFromDb();
        }
    }, 60, 60, TimeUnit.SECONDS);
}

private void updateCacheFromDb() {
    logger.info("update cache from db");
    try {
        List<String> dbTags = dao.getAllTags();
        // 这里就会更新本地缓存中的接入服务列表信息(即leaf_alloc表中的biz_tag字段)
        ...
    }
    ...
}

# Snowflake 模式

leaf 的 Snowflake 模式,顾名思义,源自于 twitter 的 Snowflake 算法,

其 64 位构成如下,leaf 的 Snowflake 模式与原生 Snowflake 模式完全一致,都是采用 1+41+10+12 的模式,且不可配置,除非修改源码:

 snowflake mode

leaf 的 Snowflake 模式核心源码在com.sankuai.inf.leaf.snowflake.SnowflakeIDGenImpl中。接下来,我们看看该模式下,获取唯一 id 时调用的方法 get(String)的核心源码(部分省略):

// synchronized保证线程安全问题
public synchronized Result get(String key) {
    long timestamp = System.currentTimeMillis();
    // 如果时钟发生了回拨
    if (timestamp < lastTimestamp) {
        long offset = lastTimestamp - timestamp;
        if (offset <= 5) {
            // 如果回拨的时间在5ms以内,那么直接等待
            wait(offset << 1);
            timestamp = System.currentTimeMillis();
        } else {
            // 如果超过5ms,那么直接抛出异常
            return new Result(-3, Status.EXCEPTION);
        }
    }
    // 如果和上一次请求是同一毫秒以内,那么sequence+1
    if (lastTimestamp == timestamp) {
        sequence = (sequence + 1) & sequenceMask;
        if (sequence == 0) {
            //sequence为0的时候表示这一毫秒请求量超过1024,那么自旋等待下一毫秒
            sequence = RANDOM.nextInt(100);
            timestamp = tilNextMillis(lastTimestamp);
        }
    } else {
        //如果是新的一毫秒,那么从一个[0, 100)的随机数开始,之所以不是每次都从0开始,是因为防止低并发时获取的唯一ID都是偶数,如果用唯一ID作为分片键,可能导致数据倾斜
        sequence = RANDOM.nextInt(100);
    }
    lastTimestamp = timestamp;
    // 通过位运算计算此次生成的唯一ID
    long id = ((timestamp - twepoch) << timestampLeftShift) | (workerId << workerIdShift) | sequence;
    return new Result(id, Status.SUCCESS);

}

由这段源码我们可知,leaf 的 Snowflake 模式并没有彻底解决时钟回拨的问题。当运行过程中,如果时钟回拨超过 5ms,依然会抛出异常。

那么,Snowflake 模式主要解决什么问题?很明显,是snowflake 中的 workerId 部分。当需要启动的 leaf 服务越来越多时,对其分配 workerId 是一件非常令人头疼的事情。我们要做的是,尽量让一件事情简单化,让用户无感知。百度的 UID 做到了(文末有相关阅读链接),leaf 也做到了!

leaf 的 Snowflake 模式是怎么做到的呢?很简单,通过 zookeeper 的PERSISTENT_SEQUENTIAL类型节点为每一个 leaf 实例生成一个递增的 workerId。以总计部署 4 个 leaf 实例为例:第 1 个 leaf 实例的 workerId 为 0,且根据该实例的 IP 地址和配置的 Port 值,即使接下来重启,workerId 仍然为 0;第 2 个 leaf 实例的 workerId 为 1;第 3 个 leaf 实例的 workerId 为 2;第 4 个 leaf 实例的 workerId 为 3,以此类推。leaf 持久化在 zookeeper 中的数据如下所示:

/snowflake/afei/forever/
    |--192.168.0.1
        |--0
    |--192.168.0.2
        |--1
    |--192.168.0.3
        |--2

我们可以看到这些数据的路径是:/snowflake/${leaf.name}/forever/${ip}:${port}。如此一来,对于所有部署的 leaf 实例,其获取到的 workerId 只跟它的 ip 和 port 有关。当然,由于其 workerId 占 10 位,所以,理论上 Leaf 服务实例数可以达到 1024 个(很明显,这个实例数上限几乎能够满足任何业务场景)。

最后,leaf 会定期(间隔周期是 3 秒)上报更新 timestamp。并且上报时,如果发现当前时间戳少于最后一次上报的时间戳,那么会放弃上报。之所以这么做的原因是,防止在 leaf 实例重启过程中,由于时钟回拨导致可能产生重复 ID 的问题

private void ScheduledUploadData(final CuratorFramework curator, final String zk_AddressNode) {
    Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r, "schedule-upload-time");
            thread.setDaemon(true);
            return thread;
        }
    }).scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            updateNewData(curator, zk_AddressNode);
        }
    }, 1L, 3L, TimeUnit.SECONDS);//每3s上报数据

}

产生重复 ID 的场景:假设leaf 不定期上报当前时间戳,那么可能会发生这种情况导致产生重复的 ID。假设 leaf 重启前最后一次生成的唯一 ID 是(2019-05-31 08:15:00 + 12 + 168),如果这时候 leaf 重启,并且在启动之前,发生了时钟回拨(假设回拨了 20s,并且 leaf 实例启动花了 10s),那么在该 leaf 实例重启后,生成的 ID 是(2019-05-31 08:14:50 + 12 + 168)。很明显的是,这个 ID 以及接下来 10 秒内生成的 ID,都很可能是之前已经生成过的 ID。

我们再看一下谢照东在文章《Leaf:美团点评分布式 ID 生成系统》中,leaf 在 snowflake 模式下的流程图:

 leaf with zk

前面一部分逻辑(即 leaf 启动时,根据自己的 ip 和 port 从 zk 上获取 workerId)是和 GitHub 上开源代码一致的,但是笔者在图中用红色框住的就略有不同的:

  1. 校验每次周期性上传的本机时间必须大于以前上传的时间,如果失败,并没有任何动作(只是简单的 return)。
  2. 校验每次周期性上传的本机时间必须大于以前上传的时间,如果成功,直接上报。并不会得到 leaf_temporary 下所有节点,然后得到各个正在服务的机器时间并做平均值校验。

这块逻辑对应的源码就是 ScheduledUploadData()中调用的方法 updateNewData(curator, zk_AddressNode),其源码如下:

private void updateNewData(CuratorFramework curator, String path) {
    try {
        if (System.currentTimeMillis() < lastUpdateTime) {
            return;
        }
        curator.setData().forPath(path, buildData().getBytes());
        lastUpdateTime = System.currentTimeMillis();
    } catch (Exception e) {
        LOGGER.info("update init data error path is {} error is {}", path, e);
    }
}

笔者尝试找谢照东找到存在这样差异性的答案,因为笔者猜测可能是开源版本和美团内部版本可能存在出入。但是,毕竟时间过了很久,谢照东大神也记不大清晰了,可惜可惜!

作者:阿飞的博客

链接:https://www.jianshu.com/p/bd6b00e5f5ac (opens new window)