Nacos Naming Server源码分析-distro协议实现

向Nacos注册服务,存在两种类型。临时(Ephemeral)服务和持久(Persistent)服务。服务信息的存储和节点间同步,分别基于Distro协议和Raft协议。

Distro协议

可以从DistroClientComponentRegistry入手去阅读源代码,下面这些成员变量,都是用来实现协议的。

  1. ServerMemberManager 服务端成员管理器,有3种模式分别是:从磁盘文件读取,从指定地址读取,单点模式
  2. DistroProtocol
  3. DistroComponentHolder 各组件组合器
  4. DistroTaskEngineHolder task任务异步执行器

DistroProtocol

看下来主要有两个功能点

1
2
3
4
5
6
7
8
private void startDistroTask() {
if (EnvUtil.getStandaloneMode()) {
isInitialized = true;
return;
}
startVerifyTask(); // 1
startLoadTask(); // 2
}

startVerifyTask()

startVerifyTask()
-> DistroVerifyTimedTask#run()
-> DistroVerifyExecuteTask#run()

发送方,发送:
获取当前节点所有客户端信息,获取所有Nacos Server节点。将客户端信息发送到所有节点,进行校验。我们看下具体校验逻辑是啥?

接收方,校验逻辑:
DistroDataRequestHandler#handle()
->handleVerify(
)
->DistroProtocol#onVerify()
->DistroClientDataProcessor#processVerifyData(
)
->EphemeralIpPortClientManager#verifyClient(*)

比较一下内存中版本号,如果版本号不一致,就更新内存中服务实例信息。

发送方,处理校验结果:
DistroVerifyCallbackWrapper#onResponse(*)

1
2
3
4
5
6
7
8
9
10
11
public void onResponse(Response response) {
if (checkResponse(response)) {
NamingTpsMonitor.distroVerifySuccess(member.getAddress(), member.getIp());
distroCallback.onSuccess();
} else {
Loggers.DISTRO.info("Target {} verify client {} failed, sync new client", targetServer, clientId);
NotifyCenter.publishEvent(new ClientEvent.ClientVerifyFailedEvent(clientId, targetServer)); //往下看
NamingTpsMonitor.distroVerifyFail(member.getAddress(), member.getIp());
distroCallback.onFailed(null);
}
}

除了记录log和metric,就是发布了一个ClientEvent.ClientVerifyFailedEvent事件。该事件最终触发同步数据请求,发送到校验失败的服务器。
收到请求后的处理逻辑在这里:
DistroDataRequestHandler#handle()
->handleSyncData(
)
-> DistroProtocol#onReceive()
-> DistroClientDataProcessor#handlerClientSyncData(
) ->upgradeClient(*)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void upgradeClient(Client client, ClientSyncData clientSyncData) {
Set<Service> syncedService = new HashSet<>();
// process batch instance sync logic
processBatchInstanceDistroData(syncedService, client, clientSyncData);
List<String> namespaces = clientSyncData.getNamespaces();
List<String> groupNames = clientSyncData.getGroupNames();
List<String> serviceNames = clientSyncData.getServiceNames();
List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();

for (int i = 0; i < namespaces.size(); i++) {
Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
Service singleton = ServiceManager.getInstance().getSingleton(service);
syncedService.add(singleton);
InstancePublishInfo instancePublishInfo = instances.get(i);
if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {
client.addServiceInstance(singleton, instancePublishInfo);
NotifyCenter.publishEvent(
new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));
NotifyCenter.publishEvent(
new MetadataEvent.InstanceMetadataEvent(singleton, instancePublishInfo.getMetadataId(), false));
}
}
for (Service each : client.getAllPublishedService()) {
if (!syncedService.contains(each)) {
client.removeServiceInstance(each);
NotifyCenter.publishEvent(
new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));
}
}
client.setRevision(clientSyncData.getAttributes().<Integer>getClientAttribute(ClientConstants.REVISION, 0));//更新客户端信息版本号
}

这里显示将收到的数据进行封装,然后维护更新内存种数据。同时发出了3个事件:

  1. ClientRegisterServiceEvent //通知订阅者更新服务实例信息
  2. InstanceMetadataEvent //维护更新内存的Metadata
  3. ClientDeregisterServiceEvent //维护内存数据,通知订阅者更新服务信息

startLoadTask()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private void load() throws Exception {
while (memberManager.allMembersWithoutSelf().isEmpty()) {
Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
TimeUnit.SECONDS.sleep(1);
}
while (distroComponentHolder.getDataStorageTypes().isEmpty()) {
Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
TimeUnit.SECONDS.sleep(1);
}
for (String each : distroComponentHolder.getDataStorageTypes()) {
if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));//从远端拉取一次全量数据
}
}
}

Nacos Naming Server源码分析-服务订阅

服务订阅的目的是为了,能够第一时间感知到被调用服务的实例变化,从而能够及时更新本地缓存,避免调用失败。

subscribe和unsubscribe的逻辑,只有临时实例才有,永久实例是不存在订阅逻辑的。所以还是比较推荐用临时类型的服务注册,这也是官方客户端默认的类型

看一下服务调用的逻辑如何实现的:

SubscribeServiceRequestHandler#handle(*)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public SubscribeServiceResponse handle(SubscribeServiceRequest request, RequestMeta meta) throws NacosException {
String namespaceId = request.getNamespace();
String serviceName = request.getServiceName();
String groupName = request.getGroupName();
String app = request.getHeader("app", "unknown");
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
Service service = Service.newService(namespaceId, groupName, serviceName, true);
Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app, meta.getClientIp(),
namespaceId, groupedServiceName, 0, request.getClusters());
ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service),
metadataManager.getServiceMetadata(service).orElse(null), subscriber.getCluster(), false,
true, subscriber.getIp());//做了按照cluster过滤。并且还存在防止服务实例过少,被调用方打爆的保护措施
if (request.isSubscribe()) {
clientOperationService.subscribeService(service, subscriber, meta.getConnectionId()); //继续往下看
NotifyCenter.publishEvent(new SubscribeServiceTraceEvent(System.currentTimeMillis(),
meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));
} else {
clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());
NotifyCenter.publishEvent(new UnsubscribeServiceTraceEvent(System.currentTimeMillis(),
meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));
}
return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
}

-> EphemeralClientOperationServiceImpl#subscribe(*)

1
2
3
4
5
6
7
8
public void subscribeService(Service service, Subscriber subscriber, String clientId) {
Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);
Client client = clientManager.getClient(clientId);
checkClientIsLegal(client, clientId);
client.addServiceSubscriber(singleton, subscriber); //继续往下看
client.setLastUpdatedTime();
NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId));
}

和上一篇一样,大部头的逻辑都是通过事件机制驱动的:

  1. SubscribeServiceTraceEvent //处理逻辑留给用户进行扩展,所以没啥好讲的
  2. ClientOperationEvent.ClientSubscribeServiceEvent 处理函数如下
    1
    2
    3
    4
    5
    6
    7
    8
    //ClientServiceIndexesManager.class
    private void addSubscriberIndexes(Service service, String clientId) {
    Set<String> clientIds = subscriberIndexes.computeIfAbsent(service, key -> new ConcurrentHashSet<>());
    // Fix #5404, Only first time add need notify event.
    if (clientIds.add(clientId)) {//客户端第一次订阅服务,才会触发事件
    NotifyCenter.publishEvent(new ServiceEvent.ServiceSubscribedEvent(service, clientId));
    }
    }
  3. ServiceEvent.ServiceSubscribedEvent
    这里也就是客户端第一次订阅该服务,才会触发该事件,做一次服务实例信息推送。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    //NamingSubscriberServiceV2Impl.class
    public void onEvent(Event event) {
    if (event instanceof ServiceEvent.ServiceChangedEvent) {
    // If service changed, push to all subscribers.
    ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
    Service service = serviceChangedEvent.getService();
    delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
    MetricsMonitor.incrementServiceChangeCount(service);
    } else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
    // If service is subscribed by one client, only push this client.
    ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event;
    Service service = subscribedEvent.getService();
    delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(),
    subscribedEvent.getClientId()));
    }
    }

总结:

  1. 服务订阅功能的目的是避免服务调用失败
  2. 服务订阅第一次发起,服务端会全量推一次订阅服务所有实例信息

Nacos Naming Server源码分析-服务注册注销

之前分析客户端的代码,发现Naming相关的后端调用主要有这几个,代码为NamingClientProxyDelegate这个class:

  1. registerService, deregisterService (grpc和http 两种实现)
  2. subscribe, unsubscribe grpc实现
  3. getServiceList grpc实现

我们先去看Naming Server的grpc实现

registerService

代码调用链路,grpc方式注册的服务,默认为临时服务。

  1. InstanceRequestHandler#handle(InstanceRequest request, RequestMeta meta)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
    Service service = Service.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
    InstanceUtil.setInstanceIdIfEmpty(request.getInstance(), service.getGroupedServiceName());
    switch (request.getType()) {
    case NamingRemoteConstants.REGISTER_INSTANCE:
    return registerInstance(service, request, meta); // 此处继续看下去
    case NamingRemoteConstants.DE_REGISTER_INSTANCE:
    return deregisterInstance(service, request, meta);
    default:
    throw new NacosException(NacosException.INVALID_PARAM,
    String.format("Unsupported request type %s", request.getType()));
    }
    }
  2. InstanceRequestHandler#registerInstance(Service service, InstanceRequest request, RequestMeta meta)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta)
    throws NacosException {
    clientOperationService
    .registerInstance(service, request.getInstance(), meta.getConnectionId()); // 此处继续看下去
    NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(),
    meta.getClientIp(), true, service.getNamespace(), service.getGroup(), service.getName(),
    request.getInstance().getIp(), request.getInstance().getPort()));
    return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
    }
  3. EphemeralClientOperationServiceImpl#registerInstance(Service service, Instance instance, String clientId)
    默认注册的临时服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void registerInstance(Service service, Instance instance, String clientId) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);//校验实例是否过期

Service singleton = ServiceManager
.getInstance().getSingleton(service); //获取单例服务
if (!singleton.isEphemeral()) {
throw new NacosRuntimeException(NacosException.INVALID_PARAM,
String.format("Current service %s is persistent service, can't register ephemeral instance.",
singleton.getGroupedServiceName()));
}
Client client = clientManager.getClient(clientId);//获取已经连接的客户端信息
checkClientIsLegal(client, clientId); //校验是否是临时实例
InstancePublishInfo instanceInfo = getPublishInfo(instance);//增加一些额外信息,对象转换
client.addServiceInstance(singleton, instanceInfo);// 此处继续看下去
client.setLastUpdatedTime();
client.recalculateRevision();
NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}
  1. AbstractClient#addServiceInstance(Service service, InstancePublishInfo instanceInfo)
1
2
3
4
5
6
7
8
9
10
11
12
13
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
if (instancePublishInfo instanceof BatchInstancePublishInfo) {
InstancePublishInfo old = publishers.put(service, instancePublishInfo);
MetricsMonitor.incrementIpCountWithBatchRegister(old, (BatchInstancePublishInfo) instancePublishInfo);
} else {
if (null == publishers.put(service, instancePublishInfo)) {
MetricsMonitor.incrementInstanceCount();
}
}
NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
return true;
}

这里最终将实例信息,保存到一个map,做了metric统计和发布一个事件,上述列出的代码总共发布了4个事件:

  1. RegisterInstanceTraceEvent 这种TraceEvent类事件作为Nacos的扩展机制使用,需要自己定制处理逻辑
  2. ClientOperationEvent.ClientRegisterServiceEvent
  3. MetadataEvent.InstanceMetadataEvent
  4. ClientEvent.ClientChangedEvent

Read More

Nacos权限模块整合方案

Nacos是阿里巴巴开源的,用于服务发现和配置管理的中间件。配置中心已经使用Apollo,所以我们只需要使用服务发现能力即可。

问题

学习研究Nacos过程中,发现如下几点:

  1. Nacos的支持插件机制,包括权限模块
  2. Nacos管理后台,支持使用LDAP的方式对接已有权限系统
  3. Spring-cloud-nacos客户端需要配置用户名和密码,才能正常注册

第2点是为了用户方便使用公司内部账号访问Nacos。但这结合第3点使用就出现了冲突。假设一个部门多个人同时进行开发,这样某个人的用户名和密码就会暴露给了其他开发人员。

1
2
3
spring.cloud.nacos.discovery.namespace=provider
spring.cloud.nacos.discovery.username=test
spring.cloud.nacos.discovery.password=123456

Nacos本地身份验证和Ldap身份验证的区别

翻了一下源码,这两个方式,主要区别在于,ldap方式会先用Nacos本地身份验证,如果验证失败,才会使用ldap方式验证。

manager类涉及到身份认证,区别在上述已经说明。authPluginService涉及权限验证,两种方式代码完全是一样的,不存在差别。
鉴权模块类图

Read More

Go语言协程调度器GPM模型

为了提高Go程序对于CPU的利用率。Go语言的设计者,设计了GPM模型,并且实现了一个高效的协程调度器。

协程Goroutine

从操作系统层面去看,一般可以把线程分为“用户态”和“内核态”。“用户态”可以理解为编程语言实现的线程,而“内核态”可以理解为操作系统实现的线程。“用户态”线程必须和“内核态”线程绑定,才能正常执行。
一个线程中的用户态和内核态

GPM模型

Go的Goroutine即为“用户态”线程的Go语言实现。而管理Goroutine和“内核态”线程绑定关系的管理器,被称为“协程调度器”。“协程调度器”的模型有三个组件组成:G(协程Goroutine)、M(线程Thread)、P(处理器Processor)。
GPM模型

Read More

Loki日志平台

Loki是一个Grafana的开源项目,用以低成本的构建日志系统。Loki只对日志的MetaData构建索引,日志的内容则会进行压缩存储。后端支持多种存储,官方比较推荐使用分布式对象存储MinIO。

部署方式

支持多种部署方式:包括k8s、docker部署,还有windows、linux、mac二进制部署。
还支持多种部署架构:单节点单磁盘、单节点多磁盘、多节点多磁盘。正式环境首选多节点多磁盘部署,具有高可用,企业级性能和可扩展性。由于高可用的需要,需要磁盘的数量需要为4的整数倍。
如下步骤都在centos7.9发行版进行

Read More

[ RocketMQ源码阅读 7 ] CommitLog文件刷盘方式

之前我们进行RocketMQ的搭建,其中有一个参数是用来配置刷盘方式的。存在“同步”和“异步”两种方式。

1
flushDiskType=ASYNC_FLUSH|SYNC_FLUSH

和刷新磁盘逻辑相关的代码可以从这里开始看DefaultFlushManager

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class DefaultFlushManager implements FlushManager {
private final FlushCommitLogService flushCommitLogService;
//If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
private final FlushCommitLogService commitRealTimeService;

public DefaultFlushManager() {
if (FlushDiskType.SYNC_FLUSH == CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
this.flushCommitLogService = new CommitLog.GroupCommitService();//同步
} else {
this.flushCommitLogService = new CommitLog.FlushRealTimeService();//异步
}
this.commitRealTimeService = new CommitLog.CommitRealTimeService();
}

@Override public void start() {
this.flushCommitLogService.start();
if (defaultMessageStore.isTransientStorePoolEnable()) {
this.commitRealTimeService.start();
}
}

从构造函数可以看到,要理解刷盘的行为,需要搞懂GroupCommitService同步刷盘,FlushRealTimeService 异步刷盘和CommitRealTimeService这三个类的名称取的不是很便于记忆和理解。

为了理解刷盘操作,我们去看更上一层的逻辑。在Broker接收到客户端的消息写入请求,并完成一系列的解析、校验放入内存等工作后。后续就需要将消息持久化到磁盘。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public CompletableFuture<PutMessageStatus> handleDiskFlush(AppendMessageResult result, MessageExt messageExt) {
// Synchronization flush
if (FlushDiskType.SYNC_FLUSH == CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {//同步刷盘
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) {//消息中是否存在需要等待消息落盘的属性
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(), CommitLog.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
flushDiskWatcher.add(request);
service.putRequest(request);//提交刷盘请求,将request存入队列后,会立马调用一次wakeup()
return request.future(); //返回刷盘请求,异步等待句柄
} else {
service.wakeup();//唤醒刷盘线程
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
// Asynchronous flush
else {
if (!CommitLog.this.defaultMessageStore.isTransientStorePoolEnable()) {//按照配置不同,分别唤醒不同的刷盘线程
flushCommitLogService.wakeup();
} else {
commitRealTimeService.wakeup();
}
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}

GroupCommitService和FlushRealTimeService的主要区别在于调用flush传入的刷新数据页数(RocketMQ内部逻辑概念,和计算机系统的页无关)。GroupCommitService每次都做刷新都传入0,FlushRealTimeService则按照规则进行计算出页数。我这边按照原逻辑改写的容易理解的伪代码:

1
2
3
4
5
int flushPhysicQueueLeastPages = 4;
if (currentTimeMillis >= (lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
flushPhysicQueueLeastPages = 0;
}
CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);

这段代码避免了短时间内进行多次全量刷盘,从而提高了刷盘效率和性能呢。正因为异步刷盘不是每次都是全量刷盘,从这个角度来看才会被称为异步刷盘,其实本质上都是异步进行刷盘的。

transientStorePoolEnable

我们在上述刷盘代码中看到了此配置项。该配置项是为了提高IO性能,但是在Broker JVM进程异常退出的时候增加丢消息的风险。感兴趣的同学可以看这篇文章

[ RocketMQ源码阅读 6 ] Broker磁盘文件格式与作用

Broker的功能点很多,安装程序启动的顺序去看源码,发现代码量比之前的组件要大很多。阅读过程中发现Broker会去持久化一些配置,并且会将消息数据存储在磁盘上。
整理和检索了网上的一些资料,列出了这些文件和相应的作用,如下。

  • store
    • commitlog
      • 000000000
      • xxxxxxxxxx
    • compaction
      • compactionLog
        • {topic}
          • 0
          • 1
          • {queueId}
      • compactionCq
        • {topic}
          • 0
          • 1
          • {queueId}
    • config
      • delayOffset.json
      • broker.properties
      • topics.json
      • topicQueueMapping.json
      • consumerOffset.json
      • lmqConsumerOffset.json
      • consumerOrderInfo.json
      • subscriptionGroup.json
      • timercheck
      • timermetrics
      • consumerFilter.json
      • messageRequestMode.json
      • tieredStoreMetadata.json
    • consumequeue
      • {topic}
        • 0
          • 00000000000000000000
        • 1
        • {queueId}
    • index
      • 20240305101010000
    • abort
    • checkpoint
    • lock
    • timerwheel
    • timerlog
      • 00000000000000000000

Read More